[jira] [Commented] (FLINK-32807) When I use emitUpdateWithRetract of a UDTAGG, an error occurs

2023-11-08 Thread yong yang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32807?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17784307#comment-17784307
 ] 

yong yang commented on FLINK-32807:
---

Yes, it is a duplicate.
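
For context, `emitUpdateWithRetract` on a `TableAggregateFunction` lets the function retract previously emitted rows before emitting updated ones. A minimal sketch in the spirit of the retraction example linked in the issue description below (class name and accumulator layout are illustrative, not the reporter's attached code):

{code:java}
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.functions.TableAggregateFunction;

// Simplified Top-2 table aggregate that emits incremental updates with retractions.
public class Top2WithRetractSketch
        extends TableAggregateFunction<Tuple2<Integer, Integer>, Top2WithRetractSketch.Acc> {

    // Current top-2 values plus the values emitted last time.
    public static class Acc {
        public Integer first;
        public Integer second;
        public Integer oldFirst;
        public Integer oldSecond;
    }

    @Override
    public Acc createAccumulator() {
        return new Acc();
    }

    public void accumulate(Acc acc, Integer value) {
        if (acc.first == null || value > acc.first) {
            acc.second = acc.first;
            acc.first = value;
        } else if (acc.second == null || value > acc.second) {
            acc.second = value;
        }
    }

    // Called by the planner instead of emitValue: first retract stale rows, then emit new ones.
    public void emitUpdateWithRetract(Acc acc, RetractableCollector<Tuple2<Integer, Integer>> out) {
        if (acc.first != null && !acc.first.equals(acc.oldFirst)) {
            if (acc.oldFirst != null) {
                out.retract(Tuple2.of(acc.oldFirst, 1)); // withdraw the stale rank-1 row
            }
            out.collect(Tuple2.of(acc.first, 1));
            acc.oldFirst = acc.first;
        }
        if (acc.second != null && !acc.second.equals(acc.oldSecond)) {
            if (acc.oldSecond != null) {
                out.retract(Tuple2.of(acc.oldSecond, 2)); // withdraw the stale rank-2 row
            }
            out.collect(Tuple2.of(acc.second, 2));
            acc.oldSecond = acc.second;
        }
    }
}
{code}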

> When I use emitUpdateWithRetract of a UDTAGG, an error occurs
> 
>
> Key: FLINK-32807
> URL: https://issues.apache.org/jira/browse/FLINK-32807
> Project: Flink
>  Issue Type: Bug
>  Components: API / Scala, Table SQL / Planner
>Affects Versions: 1.17.1
> Environment: <project xmlns="http://maven.apache.org/POM/4.0.0"
> xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
> xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
> http://maven.apache.org/xsd/maven-4.0.0.xsd">
>   <modelVersion>4.0.0</modelVersion>
>   <groupId>org.example</groupId>
>   <artifactId>FlinkLocalDemo</artifactId>
>   <version>1.0-SNAPSHOT</version>
>   <packaging>jar</packaging>
>   <name>FlinkLocalDemo</name>
>   <url>http://maven.apache.org</url>
>   <properties>
>     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
>     <flink.version>1.17.1</flink.version>
>     <scala.binary.version>2.12</scala.binary.version>
>     <scala.version>2.12.8</scala.version>
>   </properties>
>   <dependencies>
>     <dependency>
>       <groupId>com.chuusai</groupId>
>       <artifactId>shapeless_${scala.binary.version}</artifactId>
>       <version>2.3.10</version>
>     </dependency>
>     <dependency>
>       <groupId>joda-time</groupId>
>       <artifactId>joda-time</artifactId>
>       <version>2.12.5</version>
>     </dependency>
>     <dependency>
>       <groupId>org.apache.flink</groupId>
>       <artifactId>flink-avro</artifactId>
>       <version>${flink.version}</version>
>     </dependency>
>     <dependency>
>       <groupId>org.apache.flink</groupId>
>       <artifactId>flink-runtime-web</artifactId>
>       <version>${flink.version}</version>
>     </dependency>
>     <dependency>
>       <groupId>com.alibaba.fastjson2</groupId>
>       <artifactId>fastjson2</artifactId>
>       <version>2.0.33</version>
>     </dependency>
>     <dependency>
>       <groupId>com.alibaba</groupId>
>       <artifactId>fastjson</artifactId>
>       <version>1.2.83</version>
>     </dependency>
>     <dependency>
>       <groupId>junit</groupId>
>       <artifactId>junit</artifactId>
>       <version>3.8.1</version>
>       <scope>test</scope>
>     </dependency>
>     <dependency>
>       <groupId>org.apache.flink</groupId>
>       <artifactId>flink-table-common</artifactId>
>       <version>${flink.version}</version>
>     </dependency>
>     <dependency>
>       <groupId>org.apache.flink</groupId>
>       <artifactId>flink-connector-kafka</artifactId>
>       <version>${flink.version}</version>
>       <scope>provided</scope>
>     </dependency>
>     <dependency>
>       <groupId>org.apache.flink</groupId>
>       <artifactId>flink-json</artifactId>
>       <version>${flink.version}</version>
>     </dependency>
>     <dependency>
>       <groupId>org.apache.flink</groupId>
>       <artifactId>flink-scala_${scala.binary.version}</artifactId>
>       <version>${flink.version}</version>
>       <scope>provided</scope>
>     </dependency>
>     <dependency>
>       <groupId>org.apache.flink</groupId>
>       <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
>       <version>${flink.version}</version>
>     </dependency>
>     <dependency>
>       <groupId>org.apache.flink</groupId>
>       <artifactId>flink-csv</artifactId>
>       <version>${flink.version}</version>
>     </dependency>
>     <dependency>
>       <groupId>org.apache.flink</groupId>
>       <artifactId>flink-table-api-java-bridge</artifactId>
>       <version>${flink.version}</version>
>     </dependency>
>     <dependency>
>       <groupId>org.apache.flink</groupId>
>       <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
>       <version>${flink.version}</version>
>     </dependency>
>     <dependency>
>       <groupId>org.apache.flink</groupId>
>       <artifactId>flink-table-planner-loader</artifactId>
>       <version>${flink.version}</version>
>     </dependency>
>     <dependency>
>       <groupId>org.apache.flink</groupId>
>       <artifactId>flink-table-runtime</artifactId>
>       <version>${flink.version}</version>
>       <scope>provided</scope>
>     </dependency>
>     <dependency>
>       <groupId>org.apache.flink</groupId>
>       <artifactId>flink-connector-files</artifactId>
>       <version>${flink.version}</version>
>     </dependency>
>     <dependency>
>       <groupId>org.apache.flink</groupId>
>       <artifactId>flink-clients</artifactId>
>       <version>${flink.version}</version>
>     </dependency>
>     <dependency>
>       <groupId>org.apache.flink</groupId>
>       <artifactId>flink-connector-jdbc</artifactId>
>       <version>3.1.0-1.17</version>
>       <scope>provided</scope>
>     </dependency>
>     <dependency>
>       <groupId>mysql</groupId>
>       <artifactId>mysql-connector-java</artifactId>
>       <version>8.0.11</version>
>     </dependency>
>   </dependencies>
>   <build>
>     <plugins>
>       <plugin>
>         <groupId>org.apache.maven.plugins</groupId>
>         <artifactId>maven-shade-plugin</artifactId>
>         <version>2.4.3</version>
>         <executions>
>           <execution>
>             <phase>package</phase>
>             <goals>
>               <goal>shade</goal>
>             </goals>
>             <configuration>
>               <filters>
>                 <filter>
>                   <artifact>*:*</artifact>
>                   <excludes>
>                     <exclude>META-INF/*.SF</exclude>
>                     <exclude>META-INF/*.DSA</exclude>
>                     <exclude>META-INF/*.RSA</exclude>
>                   </excludes>
>                 </filter>
>               </filters>
>             </configuration>
>           </execution>
>         </executions>
>       </plugin>
>       <plugin>
>         <groupId>org.scala-tools</groupId>
>         <artifactId>maven-scala-plugin</artifactId>
>         <version>2.15.2</version>
>         <executions>
>           <execution>
>             <goals>
>               <goal>compile</goal>
>               <goal>testCompile</goal>
>             </goals>
>           </execution>
>         </executions>
>       </plugin>
>       <plugin>
>         <groupId>net.alchim31.maven</groupId>
>         <artifactId>scala-maven-plugin</artifactId>
>         <version>3.2.2</version>
>         <executions>
>           <execution>
>             <id>scala-compile-first</id>
>             <phase>process-resources</phase>
>             <goals>
>               <goal>add-source</goal>
>               <goal>compile</goal>
>             </goals>
>           </execution>
>         </executions>
>         <configuration>
>           <scalaVersion>${scala.version}</scalaVersion>
>         </configuration>
>       </plugin>
>       <plugin>
>         <groupId>org.apache.maven.plugins</groupId>
>         <artifactId>maven-assembly-plugin</artifactId>
>         <version>2.5.5</version>
>         <configuration>
>           <descriptorRefs>
>             <descriptorRef>jar-with-dependencies</descriptorRef>
>           </descriptorRefs>
>         </configuration>
>       </plugin>
>       <plugin>
>         <groupId>org.apache.maven.plugins</groupId>
>         <artifactId>maven-compiler-plugin</artifactId>
>         <version>3.1</version>
>         <configuration>
>           <source>11</source>
>           <target>11</target>
>         </configuration>
>       </plugin>
>     </plugins>
>   </build>
> </project>
> 
>Reporter: yong yang
>Priority: Major
> Attachments: Top2WithRetract.scala, UdtaggDemo3.scala
>
>
> Reference:
> [https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/table/functions/udfs/#retraction-example]
> My code:
> [^Top2WithRetract.scala]
>  
> The error shown is:
> {code:java}
> // code placeholder
> /Users/thomas990p/Library/Java/JavaVirtualMachines/corretto-11.0.20/Contents/Home/bin/java
>  -javaagent:/Users/thomas990p/Library/Application 
> Support/JetBrains/Toolbox/apps/IDEA-U/ch-0/231.9161.38/IntelliJ 
> IDEA.app/Contents/lib/idea_rt.jar=56941:/Users/thomas990p/Library/Application 
> Support/JetBrains/Toolbox/apps/IDEA-U/ch-0/231.9161.38/IntelliJ 
> IDEA.app/Contents/bin -Dfile.encoding=UTF-8 -classpath 
> /Users/thomas990p/IdeaProjects/FlinkLocalDemo/target/classes:/Users/thomas990p/.m2/repository/com/chuusai/shapeless_2.12/2.3.10/shapeless_2.12-2.3.10.jar:/Users/thomas990p/.m2/repository/org/scala-lang/scala-library/2.12.15/scala-library-2.12.15.jar:/Users/thomas990p/.m2/repository/joda-time/joda-time/2.12.5/joda-time-2.12.5.jar:/Users/thomas990p/.m2/repository/org/apache/flink/flink-avro/1.17.1/flink-avro-1.17.1.jar:/Users/thomas990p/.m2/repository/org/apache/avro/avro/1.11.1/avro-1.11.1.jar:/Users/thomas990p/.m2/repository/com/fasterxml/jackson/core/jackson-core/2.12.7/jackson-core-2.12.7.jar:/Users/thomas990p/.m2/repository/com/fasterxml/jackson/core/jackson-databind/2.12.7/jackson-databind-2.12.7.jar:/Users/thomas990p/.m2/repository/com/fasterxml/jackson/core/jackson-annotations/2.12.7/jackson-annotations-2.12.7.jar:/Users/thomas990p/.m2/repository/org/apache/commons/commons-compress/1.21/commons-compress-1.21.jar:/Users/thomas990p/.m2/repository/com/google/code/findbugs/jsr305/1.3.9/jsr305-1.3.9.jar:/Users/thomas990p/.m2/repository/org/apache/flink/flink-runtime-web/1.17.1/flink-runtime-web-1.17.1.jar:/Users/thomas990p/.m2/repository/org/

Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]

2023-11-08 Thread via GitHub


KarmaGYZ commented on code in PR #23635:
URL: https://github.com/apache/flink/pull/23635#discussion_r1383243917


##
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultResourceAllocationStrategy.java:
##
@@ -95,7 +96,7 @@ public DefaultResourceAllocationStrategy(
                 SlotManagerUtils.generateDefaultSlotResourceProfile(
                         totalResourceProfile, numSlotsPerWorker);
         this.availableResourceMatchingStrategy =
-                evenlySpreadOutSlots
+                taskManagerLoadBalanceMode == TaskManagerLoadBalanceMode.SLOTS

Review Comment:
   What about tasks?



##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionSlotSharingGroup.java:
##
@@ -31,22 +34,36 @@ class ExecutionSlotSharingGroup {
 
     private final Set<ExecutionVertexID> executionVertexIds;
 
-    private ResourceProfile resourceProfile = ResourceProfile.UNKNOWN;
+    private @Nonnull SlotSharingGroup slotSharingGroup;
 
+    /** @deprecated Only for test classes. */
+    @Deprecated

Review Comment:
   `@VisibleForTesting`



##
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfiguration.java:
##
@@ -260,10 +261,10 @@ public static SlotManagerConfiguration fromConfiguration(
                 configuration.getBoolean(
                         ResourceManagerOptions.TASK_MANAGER_RELEASE_WHEN_RESULT_CONSUMED);
 
-        boolean evenlySpreadOutSlots =
-                configuration.getBoolean(ClusterOptions.EVENLY_SPREAD_OUT_SLOTS_STRATEGY);
+        TaskManagerLoadBalanceMode taskManagerLoadBalanceMode =
+                TaskManagerLoadBalanceMode.loadFromConfiguration(configuration);
         final SlotMatchingStrategy slotMatchingStrategy =
-                evenlySpreadOutSlots
+                taskManagerLoadBalanceMode == TaskManagerLoadBalanceMode.SLOTS

Review Comment:
   ditto.



##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/LocalInputPreferredSlotSharingStrategy.java:
##
@@ -42,69 +43,23 @@
 import java.util.Set;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
-import static org.apache.flink.util.Preconditions.checkState;
 
 /**
  * This strategy tries to reduce remote data exchanges. Execution vertices, which are connected
  * and belong to the same SlotSharingGroup, tend to be put in the same ExecutionSlotSharingGroup.
  * Co-location constraints will be respected.
  */
-class LocalInputPreferredSlotSharingStrategy
+class LocalInputPreferredSlotSharingStrategy extends AbstractSlotSharingStrategy
         implements SlotSharingStrategy, SchedulingTopologyListener {

Review Comment:
   No need to implement them again.



##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/LocalInputPreferredSlotSharingStrategy.java:
##
@@ -42,69 +43,23 @@
 import java.util.Set;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
-import static org.apache.flink.util.Preconditions.checkState;
 
 /**
  * This strategy tries to reduce remote data exchanges. Execution vertices, which are connected
  * and belong to the same SlotSharingGroup, tend to be put in the same ExecutionSlotSharingGroup.
  * Co-location constraints will be respected.
  */
-class LocalInputPreferredSlotSharingStrategy
+class LocalInputPreferredSlotSharingStrategy extends AbstractSlotSharingStrategy
         implements SlotSharingStrategy, SchedulingTopologyListener {
 
-    private final Map<ExecutionVertexID, ExecutionSlotSharingGroup> executionSlotSharingGroupMap;
-
-    private final Set<SlotSharingGroup> logicalSlotSharingGroups;
-
-    private final Set<CoLocationGroup> coLocationGroups;
+    public static final Logger LOG =
+            LoggerFactory.getLogger(LocalInputPreferredSlotSharingStrategy.class);

Review Comment:
   Where do we use it?



##
flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotSharingGroup.java:
##
@@ -46,6 +47,12 @@ public class SlotSharingGroup implements java.io.Serializable {
 
     // ------------------------------------------------------------------------
 
+    public SlotSharingGroup() {}
+
+    public SlotSharingGroup(ResourceProfile resourceProfile) {

Review Comment:
   Seems we don't need to do that. Just instantiate a SlotSharingGroup and set 
the resource profile.



##
flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotSharingGroup.java:
##
@@ -46,6 +47,12 @@ public class SlotSharingGroup implements java.io.Serializable {
 
     // ------------------------------------------------------------------------
 
+    public SlotSharingGroup() {}
+
+    public SlotSharingGroup(ResourceProfile resourceProfile) {

Review Comment:
   Only visible for testing?




Re: [PR] [FLINK-33023][table-planner][JUnit5 Migration] Module: flink-table-planner (TableTestBase) [flink]

2023-11-08 Thread via GitHub


xuyangzhong commented on PR #23349:
URL: https://github.com/apache/flink/pull/23349#issuecomment-1803283116

   LGTM. cc @leonardBang for the final check.





Re: [PR] [FLINK-6755][CLI] Support manual checkpoints triggering [flink]

2023-11-08 Thread via GitHub


Zakelly commented on code in PR #23679:
URL: https://github.com/apache/flink/pull/23679#discussion_r1387577165


##
flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java:
##
@@ -842,6 +844,85 @@ private void disposeSavepoint(
         logAndSysout("Savepoint '" + savepointPath + "' disposed.");
     }
 
+    /**
+     * Executes the CHECKPOINT action.
+     *
+     * @param args Command line arguments for the checkpoint action.
+     */
+    protected void checkpoint(String[] args) throws Exception {
+        LOG.info("Running 'checkpoint' command.");
+
+        final Options commandOptions = CliFrontendParser.getCheckpointCommandOptions();
+
+        final CommandLine commandLine = getCommandLine(commandOptions, args, false);
+
+        final CheckpointOptions checkpointOptions = new CheckpointOptions(commandLine);
+
+        // evaluate help flag
+        if (checkpointOptions.isPrintHelp()) {
+            CliFrontendParser.printHelpForCheckpoint(customCommandLines);
+            return;
+        }
+
+        final CustomCommandLine activeCommandLine = validateAndGetActiveCommandLine(commandLine);
+
+        String[] cleanedArgs = checkpointOptions.getArgs();
+
+        final JobID jobId;
+
+        if (cleanedArgs.length >= 1) {
+            String jobIdString = cleanedArgs[0];
+
+            jobId = parseJobId(jobIdString);
+        } else {
+            throw new CliArgsException(
+                    "Missing JobID. " + "Specify a Job ID to manipulate a checkpoint.");
+        }
+        runClusterAction(
+                activeCommandLine,
+                commandLine,
+                (clusterClient, effectiveConfiguration) ->
+                        triggerCheckpoint(
+                                clusterClient,
+                                jobId,
+                                checkpointOptions.getCheckpointType(),
+                                getClientTimeout(effectiveConfiguration)));
+    }
+
+    /** Sends a CheckpointTriggerMessage to the job manager. */
+    private void triggerCheckpoint(
+            ClusterClient<?> clusterClient,
+            JobID jobId,
+            CheckpointType checkpointType,
+            Duration clientTimeout)
+            throws FlinkException {
+        logAndSysout("Triggering checkpoint for job " + jobId + '.');
+
+        CompletableFuture<Long> checkpointFuture =
+                clusterClient.triggerCheckpoint(jobId, checkpointType);
+
+        logAndSysout("Waiting for response...");
+
+        try {
+            final long checkpointId =
+                    checkpointFuture.get(clientTimeout.toMillis(), TimeUnit.MILLISECONDS);
+
+            logAndSysout(
+                    "Checkpoint("
+                            + checkpointType
+                            + ") "
+                            + checkpointId
+                            + " for job "
+                            + jobId
+                            + " completed.");
+            logAndSysout("You can resume your program from this checkpoint with the run command.");
+        } catch (Exception e) {
+            Throwable cause = ExceptionUtils.stripExecutionException(e);
+            throw new FlinkException(
+                    "Triggering a checkpoint for the job " + jobId + " failed.", cause);

Review Comment:
   👍  I also changed the same message in `savepoint` command.




[jira] [Updated] (FLINK-33490) Validate the name conflicts when creating view

2023-11-08 Thread Shengkai Fang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33490?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shengkai Fang updated FLINK-33490:
--
Description: 
We should forbid 

```
CREATE VIEW id_view AS
SELECT id, uid AS id FROM id_table
```

As the SQL standard states:

If <view column list> is specified, then:
i) If any two columns in the table specified by the <query expression> have
equivalent <column name>s, or if any column of that table has an
implementation-dependent name, then a <view column list> shall be specified.
ii) Equivalent <column name>s shall not be specified more than once in the
<view column list>.

Many databases, e.g. MySQL and PostgreSQL, also throw an exception when the
view's column names conflict.



  was:
When should forbid 

```
CREATE VIEW id_view AS
SELECT id, uid AS id FROM id_table
```

As the SQL standard states:

If <view column list> is specified, then:
i) If any two columns in the table specified by the <query expression> have
equivalent <column name>s, or if any column of that table has an
implementation-dependent name, then a <view column list> shall be specified.
ii) Equivalent <column name>s shall not be specified more than once in the
<view column list>.

Many databases, e.g. MySQL and PostgreSQL, also throw an exception when the
view's column names conflict.




> Validate the name conflicts when creating view
> --
>
> Key: FLINK-33490
> URL: https://issues.apache.org/jira/browse/FLINK-33490
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.19.0
>Reporter: Shengkai Fang
>Priority: Major
>
> We should forbid 
> ```
> CREATE VIEW id_view AS
> SELECT id, uid AS id FROM id_table
> ```
> As the SQL standard states:
> If <view column list> is specified, then:
> i) If any two columns in the table specified by the <query expression> have
> equivalent <column name>s, or if any column of that table has an
> implementation-dependent name, then a <view column list> shall be specified.
> ii) Equivalent <column name>s shall not be specified more than once in the
> <view column list>.
> Many databases, e.g. MySQL and PostgreSQL, also throw an exception when the
> view's column names conflict.





[jira] [Created] (FLINK-33490) Validate the name conflicts when creating view

2023-11-08 Thread Shengkai Fang (Jira)
Shengkai Fang created FLINK-33490:
-

 Summary: Validate the name conflicts when creating view
 Key: FLINK-33490
 URL: https://issues.apache.org/jira/browse/FLINK-33490
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Affects Versions: 1.19.0
Reporter: Shengkai Fang


We should forbid 

```
CREATE VIEW id_view AS
SELECT id, uid AS id FROM id_table
```

As the SQL standard states:

If <view column list> is specified, then:
i) If any two columns in the table specified by the <query expression> have
equivalent <column name>s, or if any column of that table has an
implementation-dependent name, then a <view column list> shall be specified.
ii) Equivalent <column name>s shall not be specified more than once in the
<view column list>.

Many databases, e.g. MySQL and PostgreSQL, also throw an exception when the
view's column names conflict.
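
A minimal sketch of the proposed validation from the user's side (the table definition is illustrative, and the `ValidationException` is the expected behavior once this issue is fixed, not Flink's current behavior):

{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.ValidationException;

public class DuplicateViewColumnCheck {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        tEnv.executeSql(
                "CREATE TABLE id_table (id INT, uid INT) WITH ('connector' = 'datagen')");
        try {
            // Both output columns are named `id`; the proposed check should reject this.
            tEnv.executeSql("CREATE VIEW id_view AS SELECT id, uid AS id FROM id_table");
        } catch (ValidationException e) {
            System.out.println("Rejected as expected: " + e.getMessage());
        }
    }
}
{code}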







Re: [PR] [FLINK-6755][CLI] Support manual checkpoints triggering [flink]

2023-11-08 Thread via GitHub


Zakelly commented on code in PR #23679:
URL: https://github.com/apache/flink/pull/23679#discussion_r1387573143


##
flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java:
##
@@ -611,6 +636,22 @@ public static void printHelpForSavepoint(Collection<CustomCommandLine> customCom
         System.out.println();
     }
 
+    public static void printHelpForCheckpoint(Collection<CustomCommandLine> customCommandLines) {
+        HelpFormatter formatter = new HelpFormatter();
+        formatter.setLeftPadding(5);
+        formatter.setWidth(80);
+
+        System.out.println(
+                "\nAction \"checkpoint\" triggers checkpoints for a running job or disposes existing ones.");

Review Comment:
   Yeah, sorry for the mistake 😆 









Re: [PR] [FLINK-6755][CLI] Support manual checkpoints triggering [flink]

2023-11-08 Thread via GitHub


Zakelly commented on code in PR #23679:
URL: https://github.com/apache/flink/pull/23679#discussion_r1387572052


##
docs/content.zh/docs/deployment/cli.md:
##
@@ -153,6 +153,35 @@ $ ./bin/flink savepoint \
 Triggering the savepoint disposal through the `savepoint` action does not only remove the data from
 the storage but makes Flink clean up the savepoint-related metadata as well.
 
+### Creating a Checkpoint
+[Checkpoints]({{< ref "docs/ops/state/checkpoints" >}}) can also be manually created to save the
+current state. To get the difference between checkpoint and savepoint, please refer to
+[Checkpoints vs. Savepoints]({{< ref "docs/ops/state/checkpoints_vs_savepoints" >}}). All that's
+needed to trigger a checkpoint manually is the JobID:
+```bash
+$ ./bin/flink checkpoint \
+  $JOB_ID
+```
+```
+Triggering checkpoint for job 99c59fead08c613763944f533bf90c0f.
+Waiting for response...
+Checkpoint(CONFIGURED) 26 for job 99c59fead08c613763944f533bf90c0f completed.
+You can resume your program from this checkpoint with the run command.
+```
+If you want to trigger a full checkpoint while the job is periodically triggering incremental
+checkpoints, please use the `--full` option.

Review Comment:
   After some digging, I found that when a job is configured to run with RocksDB and
incremental checkpointing disabled, the `--incremental` flag cannot take effect. That is
because the `RocksNativeFullSnapshotStrategy` does not take `CheckpointOptions` into
account when taking async snapshots. So since only the `--full` flag works when
incremental checkpointing is enabled, I only describe this usage here. But I'm OK with
your phrasing.
   
   Also, we should reconsider the incremental file-sharing chain in this scenario. How
does a manually triggered incremental checkpoint reuse the files from previous full
checkpoints, which exist in `chk-x` instead of the `shared` directory? I think we need
a more flexible checkpoint directory layout. WDYT?






[jira] [Commented] (FLINK-32807) When I use emitUpdateWithRetract of a UDTAGG, an error occurs

2023-11-08 Thread xuyang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32807?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17784297#comment-17784297
 ] 

xuyang commented on FLINK-32807:


Hi, [~luca.yang]. I think this issue is a duplicate of
https://issues.apache.org/jira/browse/FLINK-31788?

> When I use emitUpdateWithRetract of a UDTAGG, an error occurs
> 
>
> Key: FLINK-32807
> URL: https://issues.apache.org/jira/browse/FLINK-32807
> Project: Flink
>  Issue Type: Bug
>  Components: API / Scala, Table SQL / Planner
>Affects Versions: 1.17.1
> Environment: <project xmlns="http://maven.apache.org/POM/4.0.0"
> xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
> xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
> http://maven.apache.org/xsd/maven-4.0.0.xsd">
>   <modelVersion>4.0.0</modelVersion>
>   <groupId>org.example</groupId>
>   <artifactId>FlinkLocalDemo</artifactId>
>   <version>1.0-SNAPSHOT</version>
>   <packaging>jar</packaging>
>   <name>FlinkLocalDemo</name>
>   <url>http://maven.apache.org</url>
>   <properties>
>     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
>     <flink.version>1.17.1</flink.version>
>     <scala.binary.version>2.12</scala.binary.version>
>     <scala.version>2.12.8</scala.version>
>   </properties>
>   <dependencies>
>     <dependency>
>       <groupId>com.chuusai</groupId>
>       <artifactId>shapeless_${scala.binary.version}</artifactId>
>       <version>2.3.10</version>
>     </dependency>
>     <dependency>
>       <groupId>joda-time</groupId>
>       <artifactId>joda-time</artifactId>
>       <version>2.12.5</version>
>     </dependency>
>     <dependency>
>       <groupId>org.apache.flink</groupId>
>       <artifactId>flink-avro</artifactId>
>       <version>${flink.version}</version>
>     </dependency>
>     <dependency>
>       <groupId>org.apache.flink</groupId>
>       <artifactId>flink-runtime-web</artifactId>
>       <version>${flink.version}</version>
>     </dependency>
>     <dependency>
>       <groupId>com.alibaba.fastjson2</groupId>
>       <artifactId>fastjson2</artifactId>
>       <version>2.0.33</version>
>     </dependency>
>     <dependency>
>       <groupId>com.alibaba</groupId>
>       <artifactId>fastjson</artifactId>
>       <version>1.2.83</version>
>     </dependency>
>     <dependency>
>       <groupId>junit</groupId>
>       <artifactId>junit</artifactId>
>       <version>3.8.1</version>
>       <scope>test</scope>
>     </dependency>
>     <dependency>
>       <groupId>org.apache.flink</groupId>
>       <artifactId>flink-table-common</artifactId>
>       <version>${flink.version}</version>
>     </dependency>
>     <dependency>
>       <groupId>org.apache.flink</groupId>
>       <artifactId>flink-connector-kafka</artifactId>
>       <version>${flink.version}</version>
>       <scope>provided</scope>
>     </dependency>
>     <dependency>
>       <groupId>org.apache.flink</groupId>
>       <artifactId>flink-json</artifactId>
>       <version>${flink.version}</version>
>     </dependency>
>     <dependency>
>       <groupId>org.apache.flink</groupId>
>       <artifactId>flink-scala_${scala.binary.version}</artifactId>
>       <version>${flink.version}</version>
>       <scope>provided</scope>
>     </dependency>
>     <dependency>
>       <groupId>org.apache.flink</groupId>
>       <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
>       <version>${flink.version}</version>
>     </dependency>
>     <dependency>
>       <groupId>org.apache.flink</groupId>
>       <artifactId>flink-csv</artifactId>
>       <version>${flink.version}</version>
>     </dependency>
>     <dependency>
>       <groupId>org.apache.flink</groupId>
>       <artifactId>flink-table-api-java-bridge</artifactId>
>       <version>${flink.version}</version>
>     </dependency>
>     <dependency>
>       <groupId>org.apache.flink</groupId>
>       <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
>       <version>${flink.version}</version>
>     </dependency>
>     <dependency>
>       <groupId>org.apache.flink</groupId>
>       <artifactId>flink-table-planner-loader</artifactId>
>       <version>${flink.version}</version>
>     </dependency>
>     <dependency>
>       <groupId>org.apache.flink</groupId>
>       <artifactId>flink-table-runtime</artifactId>
>       <version>${flink.version}</version>
>       <scope>provided</scope>
>     </dependency>
>     <dependency>
>       <groupId>org.apache.flink</groupId>
>       <artifactId>flink-connector-files</artifactId>
>       <version>${flink.version}</version>
>     </dependency>
>     <dependency>
>       <groupId>org.apache.flink</groupId>
>       <artifactId>flink-clients</artifactId>
>       <version>${flink.version}</version>
>     </dependency>
>     <dependency>
>       <groupId>org.apache.flink</groupId>
>       <artifactId>flink-connector-jdbc</artifactId>
>       <version>3.1.0-1.17</version>
>       <scope>provided</scope>
>     </dependency>
>     <dependency>
>       <groupId>mysql</groupId>
>       <artifactId>mysql-connector-java</artifactId>
>       <version>8.0.11</version>
>     </dependency>
>   </dependencies>
>   <build>
>     <plugins>
>       <plugin>
>         <groupId>org.apache.maven.plugins</groupId>
>         <artifactId>maven-shade-plugin</artifactId>
>         <version>2.4.3</version>
>         <executions>
>           <execution>
>             <phase>package</phase>
>             <goals>
>               <goal>shade</goal>
>             </goals>
>             <configuration>
>               <filters>
>                 <filter>
>                   <artifact>*:*</artifact>
>                   <excludes>
>                     <exclude>META-INF/*.SF</exclude>
>                     <exclude>META-INF/*.DSA</exclude>
>                     <exclude>META-INF/*.RSA</exclude>
>                   </excludes>
>                 </filter>
>               </filters>
>             </configuration>
>           </execution>
>         </executions>
>       </plugin>
>       <plugin>
>         <groupId>org.scala-tools</groupId>
>         <artifactId>maven-scala-plugin</artifactId>
>         <version>2.15.2</version>
>         <executions>
>           <execution>
>             <goals>
>               <goal>compile</goal>
>               <goal>testCompile</goal>
>             </goals>
>           </execution>
>         </executions>
>       </plugin>
>       <plugin>
>         <groupId>net.alchim31.maven</groupId>
>         <artifactId>scala-maven-plugin</artifactId>
>         <version>3.2.2</version>
>         <executions>
>           <execution>
>             <id>scala-compile-first</id>
>             <phase>process-resources</phase>
>             <goals>
>               <goal>add-source</goal>
>               <goal>compile</goal>
>             </goals>
>           </execution>
>         </executions>
>         <configuration>
>           <scalaVersion>${scala.version}</scalaVersion>
>         </configuration>
>       </plugin>
>       <plugin>
>         <groupId>org.apache.maven.plugins</groupId>
>         <artifactId>maven-assembly-plugin</artifactId>
>         <version>2.5.5</version>
>         <configuration>
>           <descriptorRefs>
>             <descriptorRef>jar-with-dependencies</descriptorRef>
>           </descriptorRefs>
>         </configuration>
>       </plugin>
>       <plugin>
>         <groupId>org.apache.maven.plugins</groupId>
>         <artifactId>maven-compiler-plugin</artifactId>
>         <version>3.1</version>
>         <configuration>
>           <source>11</source>
>           <target>11</target>
>         </configuration>
>       </plugin>
>     </plugins>
>   </build>
> </project>
> 
>Reporter: yong yang
>Priority: Major
> Attachments: Top2WithRetract.scala, UdtaggDemo3.scala
>
>
> Reference:
> [https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/table/functions/udfs/#retraction-example]
> My code:
> [^Top2WithRetract.scala]
>  
> The error shown is:
> {code:java}
> // code placeholder
> /Users/thomas990p/Library/Java/JavaVirtualMachines/corretto-11.0.20/Contents/Home/bin/java
>  -javaagent:/Users/thomas990p/Library/Application 
> Support/JetBrains/Toolbox/apps/IDEA-U/ch-0/231.9161.38/IntelliJ 
> IDEA.app/Contents/lib/idea_rt.jar=56941:/Users/thomas990p/Library/Application 
> Support/JetBrains/Toolbox/apps/IDEA-U/ch-0/231.9161.38/IntelliJ 
> IDEA.app/Contents/bin -Dfile.encoding=UTF-8 -classpath 
> /Users/thomas990p/IdeaProjects/FlinkLocalDemo/target/classes:/Users/thomas990p/.m2/repository/com/chuusai/shapeless_2.12/2.3.10/shapeless_2.12-2.3.10.jar:/Users/thomas990p/.m2/repository/org/scala-lang/scala-library/2.12.15/scala-library-2.12.15.jar:/Users/thomas990p/.m2/repository/joda-time/joda-time/2.12.5/joda-time-2.12.5.jar:/Users/thomas990p/.m2/repository/org/apache/flink/flink-avro/1.17.1/flink-avro-1.17.1.jar:/Users/thomas990p/.m2/repository/org/apache/avro/avro/1.11.1/avro-1.11.1.jar:/Users/thomas990p/.m2/repository/com/fasterxml/jackson/core/jackson-core/2.12.7/jackson-core-2.12.7.jar:/Users/thomas990p/.m2/repository/com/fasterxml/jackson/core/jackson-databind/2.12.7/jackson-databind-2.12.7.jar:/Users/thomas990p/.m2/repository/com/fasterxml/jackson/core/jackson-annotations/2.12.7/jackson-annotations-2.12.7.jar:/Users/thomas990p/.m2/repository/org/apache/commons/commons-compress/1.21/commons-compress-1.21.jar:/Users/thomas990p/.m2/repository/com/google/code/findbugs/jsr305/1.3.9/jsr305-1.3.9.jar:/Users/thomas990p/.m2/repository/org/apache/flink/flink-ru

Re: [PR] [FLINK-33358][sql] Fix Flink SQL Client fail to start in Flink on YARN [flink]

2023-11-08 Thread via GitHub


PrabhuJoseph commented on PR #23629:
URL: https://github.com/apache/flink/pull/23629#issuecomment-1803208444

   @fsk119 Please review this patch when you have time. Thanks.





[jira] [Resolved] (FLINK-33262) Extend source provider interfaces with the new parallelism provider interface

2023-11-08 Thread Benchao Li (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33262?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Benchao Li resolved FLINK-33262.

Fix Version/s: 1.19.0
   Resolution: Implemented

merged via 55162dcc5cca6db6aeedddb30d80dd9f9b8d5202 (1.19.0)

> Extend source provider interfaces with the new parallelism provider interface
> -
>
> Key: FLINK-33262
> URL: https://issues.apache.org/jira/browse/FLINK-33262
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: Zhanghao Chen
>Assignee: Zhanghao Chen
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.19.0
>
>






Re: [PR] [FLINK-33262][table-api] Extend source provider interfaces with the new parallelism provider interface [flink]

2023-11-08 Thread via GitHub


libenchao closed pull request #23663: [FLINK-33262][table-api] Extend source 
provider interfaces with the new parallelism provider interface
URL: https://github.com/apache/flink/pull/23663





Re: [PR] [FLINK-30613] Improve resolving schema compatibility -- Milestone one [flink]

2023-11-08 Thread via GitHub


masteryhx commented on PR #21635:
URL: https://github.com/apache/flink/pull/21635#issuecomment-1803157270

   Rebased.
   @Zakelly Could you also help to take a look? Thanks a lot!





[jira] [Commented] (FLINK-20672) notifyCheckpointAborted RPC failure can fail JM

2023-11-08 Thread Zakelly Lan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20672?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17784274#comment-17784274
 ] 

Zakelly Lan commented on FLINK-20672:
-

[~yunta] A fatal exit for an uncaught exception is a relatively "safe" option,
since the executor service may not know what to do when encountering errors. If
we could strictly limit its use and stipulate that it should not affect the job
running, we could use another handler without failing the process.

After reading some code, IIUC, the
{{[DefaultJobMasterServiceFactory|https://github.com/apache/flink/blob/eb4ae5d4e7d517300e98e632de95249dbdd22192/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobMasterServiceLeadershipRunnerFactory.java#L101C1-L101C1]}}
is using the io-executor to
{{[createJobMasterService|https://github.com/apache/flink/blob/eb4ae5d4e7d517300e98e632de95249dbdd22192/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/factories/DefaultJobMasterServiceFactory.java#L101]}},
which is essential for job running. And it leaves all exceptions uncaught, which
should also be changed if we decide to change the behavior of the io-executor.

Actually I have no preference about changing this behavior or not, since maybe
some "io" operations are fatal and most are not. This is a matter of regulations
and contracts. I suggest we do our best to catch the exception within a runnable
task if we are sure it should not have any side effects on the job. WDYT?

> notifyCheckpointAborted RPC failure can fail JM
> ---
>
> Key: FLINK-20672
> URL: https://issues.apache.org/jira/browse/FLINK-20672
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.11.3, 1.12.0
>Reporter: Roman Khachatryan
>Assignee: Zakelly Lan
>Priority: Not a Priority
>  Labels: auto-deprioritized-major, auto-deprioritized-minor, 
> pull-request-available
>
> Introduced in FLINK-8871, aborted RPC notifications are done asynchronously:
>  
> {code}
>   private void sendAbortedMessages(long checkpointId, long timeStamp) {
>       // send notification of aborted checkpoints asynchronously.
>       executor.execute(() -> {
>           // send the "abort checkpoint" messages to necessary vertices.
>           // ..
>       });
>   }
> {code}
> However, the executor that eventually executes this request is created as 
> follows
> {code}
>   final ScheduledExecutorService futureExecutor =
>       Executors.newScheduledThreadPool(
>           Hardware.getNumberCPUCores(),
>           new ExecutorThreadFactory("jobmanager-future"));
> {code}
> ExecutorThreadFactory uses UncaughtExceptionHandler that exits JVM on error.
> cc: [~yunta]
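
A minimal sketch of the failure mode and the mitigation discussed above (plain `java.util.concurrent` only; the task bodies are illustrative, a plain fixed pool is used for the demonstration, and Flink's `ExecutorThreadFactory` behavior is paraphrased in the comments):

{code:java}
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

public class UncaughtExceptionDemo {
    public static void main(String[] args) {
        // Mimics ExecutorThreadFactory: Flink installs an uncaught-exception handler
        // that calls System.exit, taking down the whole JobManager process.
        ThreadFactory fatalOnError = runnable -> {
            Thread worker = new Thread(runnable, "jobmanager-future");
            worker.setUncaughtExceptionHandler(
                    (thread, error) ->
                            System.err.println(
                                    "FATAL (Flink would System.exit here): " + error));
            return worker;
        };
        ExecutorService executor = Executors.newFixedThreadPool(1, fatalOnError);

        // Unsafe: an exception escaping the task reaches the uncaught-exception handler.
        executor.execute(
                () -> {
                    throw new RuntimeException("notifyCheckpointAborted RPC failed");
                });

        // Mitigation: catch inside the task when its failure must not affect the job.
        executor.execute(
                () -> {
                    try {
                        throw new RuntimeException("notifyCheckpointAborted RPC failed");
                    } catch (Throwable t) {
                        System.err.println("Swallowed non-critical failure: " + t.getMessage());
                    }
                });
        executor.shutdown();
    }
}
{code}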





Re: [PR] [FLINK-33489][table-planner] forbid generating partial-final agg with LISTAGG to avoid wrong result [flink]

2023-11-08 Thread via GitHub


flinkbot commented on PR #23688:
URL: https://github.com/apache/flink/pull/23688#issuecomment-1803149601

   
   ## CI report:
   
   * 36da81aa605a7d8f29f80ceac070b7f673ccead6 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   





[jira] [Updated] (FLINK-33489) LISTAGG with generating partial-final agg will cause wrong result

2023-11-08 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33489?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-33489:
---
Labels: pull-request-available  (was: )

> LISTAGG with generating partial-final agg will cause wrong result
> -
>
> Key: FLINK-33489
> URL: https://issues.apache.org/jira/browse/FLINK-33489
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.9.0, 1.10.0, 1.11.0, 1.12.0, 1.13.0, 1.14.0, 1.15.0, 
> 1.16.0, 1.17.0, 1.18.0
>Reporter: xuyang
>Assignee: xuyang
>Priority: Major
>  Labels: pull-request-available
>
> Adding the following test cases in SplitAggregateITCase will reproduce this 
> bug:
>  
> {code:java}
> // code placeholder
> @Test
> def testListAggWithDistinctMultiArgs(): Unit = {
>   val t1 = tEnv.sqlQuery(s"""
> |SELECT
> |  a,
> |  LISTAGG(DISTINCT c, '#')
> |FROM T
> |GROUP BY a
>  """.stripMargin)
>   val sink = new TestingRetractSink
>   t1.toRetractStream[Row].addSink(sink)
>   env.execute()
>   val expected = Map[String, List[String]](
> "1" -> List("Hello 0", "Hello 1"),
> "2" -> List("Hello 0", "Hello 1", "Hello 2", "Hello 3", "Hello 4"),
> "3" -> List("Hello 0", "Hello 1"),
> "4" -> List("Hello 1", "Hello 2", "Hello 3")
>   )
>   val actualData = sink.getRetractResults.sorted
>   println(actualData)
> } {code}
> The `actualData` is `List(1,Hello 0,Hello 1, 2,Hello 2,Hello 4,Hello 3,Hello 
> 1,Hello 0, 3,Hello 1,Hello 0, 4,Hello 2,Hello 3,Hello 1)`, and the delimiter 
> `#` will be ignored.
> Let's take its plan:
> {code:java}
> // code placeholder
> LegacySink(name=[DataStreamTableSink], fields=[a, EXPR$1])
> +- GroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, 
> LISTAGG_RETRACT($f3_0) AS $f1])
>    +- Exchange(distribution=[hash[a]])
>       +- GroupAggregate(groupBy=[a, $f3, $f4], partialFinalType=[PARTIAL], 
> select=[a, $f3, $f4, LISTAGG(DISTINCT c, $f2) AS $f3_0])
>          +- Exchange(distribution=[hash[a, $f3, $f4]])
>             +- Calc(select=[a, c, _UTF-16LE'#' AS $f2, MOD(HASH_CODE(c), 
> 1024) AS $f3, MOD(HASH_CODE(_UTF-16LE'#'), 1024) AS $f4])
>                +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])
>                   +- DataStreamScan(table=[[default_catalog, 
> default_database, T]], fields=[a, b, c]) {code}
> The final `GroupAggregate` is missing the delimiter argument, so the default
> delimiter `,` is used.
>  
>  





[PR] [FLINK-33489][table-planner] forbid generating partial-final agg with LISTAGG to avoid wrong result [flink]

2023-11-08 Thread via GitHub


xuyangzhong opened a new pull request, #23688:
URL: https://github.com/apache/flink/pull/23688

   ## What is the purpose of the change
   
   Currently, splitting the original LISTAGG aggregate into partial-final aggregates
causes wrong results. This PR is a quick fix that avoids splitting it.
   
   ## Brief change log
   
 - *Avoid splitting LISTAGG in SplitAggregateRule*
 - *Add UT and IT*
   
   ## Verifying this change
   
   Some tests are added.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
 - If yes, how is the feature documented? no





[jira] [Assigned] (FLINK-33489) LISTAGG with generating partial-final agg will cause wrong result

2023-11-08 Thread Shengkai Fang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33489?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shengkai Fang reassigned FLINK-33489:
-

Assignee: xuyang

> LISTAGG with generating partial-final agg will cause wrong result
> -
>
> Key: FLINK-33489
> URL: https://issues.apache.org/jira/browse/FLINK-33489
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.9.0, 1.10.0, 1.11.0, 1.12.0, 1.13.0, 1.14.0, 1.15.0, 
> 1.16.0, 1.17.0, 1.18.0
>Reporter: xuyang
>Assignee: xuyang
>Priority: Major
>
> Adding the following test cases in SplitAggregateITCase will reproduce this 
> bug:
>  
> {code:java}
> // code placeholder
> @Test
> def testListAggWithDistinctMultiArgs(): Unit = {
>   val t1 = tEnv.sqlQuery(s"""
> |SELECT
> |  a,
> |  LISTAGG(DISTINCT c, '#')
> |FROM T
> |GROUP BY a
>  """.stripMargin)
>   val sink = new TestingRetractSink
>   t1.toRetractStream[Row].addSink(sink)
>   env.execute()
>   val expected = Map[String, List[String]](
> "1" -> List("Hello 0", "Hello 1"),
> "2" -> List("Hello 0", "Hello 1", "Hello 2", "Hello 3", "Hello 4"),
> "3" -> List("Hello 0", "Hello 1"),
> "4" -> List("Hello 1", "Hello 2", "Hello 3")
>   )
>   val actualData = sink.getRetractResults.sorted
>   println(actualData)
> } {code}
> The `actualData` is `List(1,Hello 0,Hello 1, 2,Hello 2,Hello 4,Hello 3,Hello 
> 1,Hello 0, 3,Hello 1,Hello 0, 4,Hello 2,Hello 3,Hello 1)`, and the delimiter 
> `#` will be ignored.
> Let's take its plan:
> {code:java}
> // code placeholder
> LegacySink(name=[DataStreamTableSink], fields=[a, EXPR$1])
> +- GroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, 
> LISTAGG_RETRACT($f3_0) AS $f1])
>    +- Exchange(distribution=[hash[a]])
>       +- GroupAggregate(groupBy=[a, $f3, $f4], partialFinalType=[PARTIAL], 
> select=[a, $f3, $f4, LISTAGG(DISTINCT c, $f2) AS $f3_0])
>          +- Exchange(distribution=[hash[a, $f3, $f4]])
>             +- Calc(select=[a, c, _UTF-16LE'#' AS $f2, MOD(HASH_CODE(c), 
> 1024) AS $f3, MOD(HASH_CODE(_UTF-16LE'#'), 1024) AS $f4])
>                +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])
>                   +- DataStreamScan(table=[[default_catalog, 
> default_database, T]], fields=[a, b, c]) {code}
> The final `GroupAggregate` is missing the delimiter argument, so the default
> delimiter `,` is used.
>  
>  





Re: [PR] [FLINK-33023][table-planner][JUnit5 Migration] Module: flink-table-planner (TableTestBase) [flink]

2023-11-08 Thread via GitHub


Jiabao-Sun commented on PR #23349:
URL: https://github.com/apache/flink/pull/23349#issuecomment-1803143808

   > Thanks for your huge contribution on replacing JUnit 4 with JUnit 5 in the
table-planner module. The whole PR looks good to me. I mainly have two
questions.
   > 
   > 1. Is it a good time to remove the dependency in module `table-planner` to
prevent other contributors from continuing to use JUnit 4?
   > 2. I found that you spent some time removing `public` from each test
function. But it is a weak constraint for contributors to follow this
implicit rule. How can we make this rule more visible to contributors? (maybe
using a custom checkstyle rule or something else)
   
   Thanks @xuyangzhong for the thorough review.
   1. Currently we can't remove the JUnit 4 dependencies from table-planner,
because there are still some tests using JUnit 4 that have not been fully
migrated. After all the tests are migrated to JUnit 5, we can do this.
   2. I think we can add this check to the ArchUnit tests, but it may still need
to wait until the JUnit 5 migration is complete.
   
   Please help check it again when you have time.
   Thanks again.
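
A minimal sketch of what such an architecture check could look like (assuming the ArchUnit library; the package scope and rule text are illustrative):

```java
import com.tngtech.archunit.core.domain.JavaClasses;
import com.tngtech.archunit.core.importer.ClassFileImporter;
import com.tngtech.archunit.lang.ArchRule;

import static com.tngtech.archunit.lang.syntax.ArchRuleDefinition.noClasses;

public class NoJUnit4Check {
    public static void main(String[] args) {
        JavaClasses classes =
                new ClassFileImporter().importPackages("org.apache.flink.table.planner");
        // "org.junit" (without "..") matches only the JUnit 4 entry points such as
        // org.junit.Test, not org.junit.jupiter; "junit.framework" covers JUnit 3.
        ArchRule rule =
                noClasses()
                        .should()
                        .dependOnClassesThat()
                        .resideInAnyPackage("org.junit", "junit.framework")
                        .because("table-planner tests should use JUnit 5 (org.junit.jupiter)");
        rule.check(classes);
    }
}
```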





[jira] [Updated] (FLINK-33489) LISTAGG with generating partial-final agg will cause wrong result

2023-11-08 Thread xuyang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33489?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

xuyang updated FLINK-33489:
---
Summary: LISTAGG with generating partial-final agg will cause wrong result  
(was: LISTAGG with generating partial-final agg will case wrong result)

> LISTAGG with generating partial-final agg will cause wrong result
> -
>
> Key: FLINK-33489
> URL: https://issues.apache.org/jira/browse/FLINK-33489
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.9.0, 1.10.0, 1.11.0, 1.12.0, 1.13.0, 1.14.0, 1.15.0, 
> 1.16.0, 1.17.0, 1.18.0
>Reporter: xuyang
>Priority: Major
>
> Adding the following test cases in SplitAggregateITCase will reproduce this 
> bug:
>  
> {code:java}
> // code placeholder
> @Test
> def testListAggWithDistinctMultiArgs(): Unit = {
>   val t1 = tEnv.sqlQuery(s"""
> |SELECT
> |  a,
> |  LISTAGG(DISTINCT c, '#')
> |FROM T
> |GROUP BY a
>  """.stripMargin)
>   val sink = new TestingRetractSink
>   t1.toRetractStream[Row].addSink(sink)
>   env.execute()
>   val expected = Map[String, List[String]](
> "1" -> List("Hello 0", "Hello 1"),
> "2" -> List("Hello 0", "Hello 1", "Hello 2", "Hello 3", "Hello 4"),
> "3" -> List("Hello 0", "Hello 1"),
> "4" -> List("Hello 1", "Hello 2", "Hello 3")
>   )
>   val actualData = sink.getRetractResults.sorted
>   println(actualData)
> } {code}
> The `actualData` is `List(1,Hello 0,Hello 1, 2,Hello 2,Hello 4,Hello 3,Hello 
> 1,Hello 0, 3,Hello 1,Hello 0, 4,Hello 2,Hello 3,Hello 1)`, and the delimiter 
> `#` will be ignored.
> Let's take its plan:
> {code:java}
> // code placeholder
> LegacySink(name=[DataStreamTableSink], fields=[a, EXPR$1])
> +- GroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, 
> LISTAGG_RETRACT($f3_0) AS $f1])
>    +- Exchange(distribution=[hash[a]])
>       +- GroupAggregate(groupBy=[a, $f3, $f4], partialFinalType=[PARTIAL], 
> select=[a, $f3, $f4, LISTAGG(DISTINCT c, $f2) AS $f3_0])
>          +- Exchange(distribution=[hash[a, $f3, $f4]])
>             +- Calc(select=[a, c, _UTF-16LE'#' AS $f2, MOD(HASH_CODE(c), 
> 1024) AS $f3, MOD(HASH_CODE(_UTF-16LE'#'), 1024) AS $f4])
>                +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])
>                   +- DataStreamScan(table=[[default_catalog, 
> default_database, T]], fields=[a, b, c]) {code}
> The final `GroupAggregate` is missing the delimiter argument, so the default
> delimiter `,` is used.
>  
>  





[jira] [Commented] (FLINK-33489) LISTAGG with generating partial-final agg will cause wrong result

2023-11-08 Thread xuyang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33489?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17784273#comment-17784273
 ] 

xuyang commented on FLINK-33489:


For a quick fix, we can forbid generating partial-final aggs for this function.

For the long term, we could use only distinct as the partial agg and then use
the real agg function as the final agg. But first we must check the other agg
functions.

 

I'll try to fix it.

> LISTAGG with generating partial-final agg will cause wrong result
> 
>
> Key: FLINK-33489
> URL: https://issues.apache.org/jira/browse/FLINK-33489
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.9.0, 1.10.0, 1.11.0, 1.12.0, 1.13.0, 1.14.0, 1.15.0, 
> 1.16.0, 1.17.0, 1.18.0
>Reporter: xuyang
>Priority: Major
>
> Adding the following test cases in SplitAggregateITCase will reproduce this 
> bug:
>  
> {code:java}
> // code placeholder
> @Test
> def testListAggWithDistinctMultiArgs(): Unit = {
>   val t1 = tEnv.sqlQuery(s"""
> |SELECT
> |  a,
> |  LISTAGG(DISTINCT c, '#')
> |FROM T
> |GROUP BY a
>  """.stripMargin)
>   val sink = new TestingRetractSink
>   t1.toRetractStream[Row].addSink(sink)
>   env.execute()
>   val expected = Map[String, List[String]](
> "1" -> List("Hello 0", "Hello 1"),
> "2" -> List("Hello 0", "Hello 1", "Hello 2", "Hello 3", "Hello 4"),
> "3" -> List("Hello 0", "Hello 1"),
> "4" -> List("Hello 1", "Hello 2", "Hello 3")
>   )
>   val actualData = sink.getRetractResults.sorted
>   println(actualData)
> } {code}
> The `actualData` is `List(1,Hello 0,Hello 1, 2,Hello 2,Hello 4,Hello 3,Hello 
> 1,Hello 0, 3,Hello 1,Hello 0, 4,Hello 2,Hello 3,Hello 1)`, and the delimiter 
> `#` will be ignored.
> Let's take its plan:
> {code:java}
> // code placeholder
> LegacySink(name=[DataStreamTableSink], fields=[a, EXPR$1])
> +- GroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, 
> LISTAGG_RETRACT($f3_0) AS $f1])
>    +- Exchange(distribution=[hash[a]])
>       +- GroupAggregate(groupBy=[a, $f3, $f4], partialFinalType=[PARTIAL], 
> select=[a, $f3, $f4, LISTAGG(DISTINCT c, $f2) AS $f3_0])
>          +- Exchange(distribution=[hash[a, $f3, $f4]])
>             +- Calc(select=[a, c, _UTF-16LE'#' AS $f2, MOD(HASH_CODE(c), 
> 1024) AS $f3, MOD(HASH_CODE(_UTF-16LE'#'), 1024) AS $f4])
>                +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])
>                   +- DataStreamScan(table=[[default_catalog, 
> default_database, T]], fields=[a, b, c]) {code}
> The final `GroupAggregate` is missing the delimiter argument, so the default
> delimiter `,` is used.
>  
>  





[jira] [Created] (FLINK-33489) LISTAGG with generating partial-final agg will cause wrong result

2023-11-08 Thread xuyang (Jira)
xuyang created FLINK-33489:
--

 Summary: LISTAGG with generating partial-final agg will cause wrong result
 Key: FLINK-33489
 URL: https://issues.apache.org/jira/browse/FLINK-33489
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Affects Versions: 1.18.0, 1.17.0, 1.16.0, 1.15.0, 1.14.0, 1.13.0, 1.12.0, 
1.11.0, 1.10.0, 1.9.0
Reporter: xuyang


Adding the following test cases in SplitAggregateITCase will reproduce this bug:

 
{code:java}
// code placeholder
@Test
def testListAggWithDistinctMultiArgs(): Unit = {
  val t1 = tEnv.sqlQuery(s"""
|SELECT
|  a,
|  LISTAGG(DISTINCT c, '#')
|FROM T
|GROUP BY a
 """.stripMargin)

  val sink = new TestingRetractSink
  t1.toRetractStream[Row].addSink(sink)
  env.execute()

  val expected = Map[String, List[String]](
"1" -> List("Hello 0", "Hello 1"),
"2" -> List("Hello 0", "Hello 1", "Hello 2", "Hello 3", "Hello 4"),
"3" -> List("Hello 0", "Hello 1"),
"4" -> List("Hello 1", "Hello 2", "Hello 3")
  )
  val actualData = sink.getRetractResults.sorted
  println(actualData)
} {code}
The `actualData` is `List(1,Hello 0,Hello 1, 2,Hello 2,Hello 4,Hello 3,Hello 
1,Hello 0, 3,Hello 1,Hello 0, 4,Hello 2,Hello 3,Hello 1)`, and the delimiter 
`#` will be ignored.

Let's take its plan:
{code:java}
// code placeholder
LegacySink(name=[DataStreamTableSink], fields=[a, EXPR$1])
+- GroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, 
LISTAGG_RETRACT($f3_0) AS $f1])
   +- Exchange(distribution=[hash[a]])
      +- GroupAggregate(groupBy=[a, $f3, $f4], partialFinalType=[PARTIAL], 
select=[a, $f3, $f4, LISTAGG(DISTINCT c, $f2) AS $f3_0])
         +- Exchange(distribution=[hash[a, $f3, $f4]])
            +- Calc(select=[a, c, _UTF-16LE'#' AS $f2, MOD(HASH_CODE(c), 1024) 
AS $f3, MOD(HASH_CODE(_UTF-16LE'#'), 1024) AS $f4])
               +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])
                  +- DataStreamScan(table=[[default_catalog, default_database, 
T]], fields=[a, b, c]) {code}
The final `GroupAggregate` is missing the delimiter argument, so the default
delimiter `,` is used.
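
Until a fix lands, a sketch of a possible workaround: the faulty plan only appears when the partial-final split optimization is on (as in `SplitAggregateITCase`), so disabling it (assuming the existing `table.optimizer.distinct-agg.split.enabled` option) keeps `LISTAGG(DISTINCT c, '#')` in a single `GroupAggregate` and preserves the delimiter:

{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.config.OptimizerConfigOptions;

public class DisableSplitAggWorkaround {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // Turn off the distinct-agg split so LISTAGG keeps its delimiter argument.
        tEnv.getConfig()
                .set(OptimizerConfigOptions.TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED, false);
    }
}
{code}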

 

 





Re: [PR] [FLINK-19931] Do not emit intermediate results for reduce operation BATCH execution mode [flink]

2023-11-08 Thread via GitHub


WencongLiu commented on code in PR #13890:
URL: https://github.com/apache/flink/pull/13890#discussion_r1387456047


##
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/BatchGroupedReduceOperator.java:
##
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+/**
+ * A {@link StreamOperator} for executing a {@link ReduceFunction} on a
+ * {@link org.apache.flink.streaming.api.datastream.KeyedStream} in a
+ * {@link RuntimeExecutionMode#BATCH} mode.
+ */
+@Internal
+public class BatchGroupedReduceOperator<IN, KEY>
+   extends AbstractUdfStreamOperator<IN, ReduceFunction<IN>>
+   implements OneInputStreamOperator<IN, IN>, Triggerable<KEY, VoidNamespace> {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final String STATE_NAME = "_op_state";
+
+   private transient ValueState<IN> values;
+
+   private final TypeSerializer<IN> serializer;
+
+   private InternalTimerService<VoidNamespace> timerService;
+
+   public BatchGroupedReduceOperator(ReduceFunction<IN> reducer, TypeSerializer<IN> serializer) {
+       super(reducer);
+       this.serializer = serializer;
+   }
+
+   @Override
+   public void open() throws Exception {
+       super.open();
+       ValueStateDescriptor<IN> stateId = new ValueStateDescriptor<>(STATE_NAME, serializer);
+       values = getPartitionedState(stateId);

Review Comment:
   Thanks for the great work on this. @dawidwys 
   I have a question about the implementation of `BatchGroupedReduceOperator`.
Why do we store the last reduced record in the state obtained from
`getPartitionedState(stateId)`? Could we just use a field in the class?
   
   In the case of batch processing, `BatchExecutionKeyValueState` provides
the ability to get a value by namespace. But this functionality is not
needed by Reduce. WDYT? 🤔️
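
For reference, a sketch of the part the hunk cuts off: how the keyed `ValueState` would be read and updated per record (illustrative; the actual PR may differ). It also suggests why a plain class field would not work: keyed state scopes the "last reduced value" to the current key, while a single field would be shared across all keys processed by the subtask.

```java
@Override
public void processElement(StreamRecord<IN> element) throws Exception {
    IN newValue = element.getValue();
    IN currentValue = values.value(); // value state is scoped to the current key
    if (currentValue == null) {
        values.update(newValue);
    } else {
        values.update(userFunction.reduce(currentValue, newValue));
    }
    // In BATCH mode the result is emitted once per key, e.g. from an
    // event-time timer firing at MAX_WATERMARK, not after every record.
}
```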




Re: [PR] [FLINK-26694][table] Support lookup join via a multi-level inheritance of TableFunction [flink]

2023-11-08 Thread via GitHub


YesOrNo828 commented on PR #23684:
URL: https://github.com/apache/flink/pull/23684#issuecomment-1803119191

   @flinkbot run azure





Re: [PR] [FLINK-20539][table-planner] fix type mismatch when using ROW in computed column [flink]

2023-11-08 Thread via GitHub


xuyangzhong commented on PR #23519:
URL: https://github.com/apache/flink/pull/23519#issuecomment-1803115628

   @snuyanzin Hi, could you do a final check on this PR if you have time?





Re: [PR] [FLINK-33488] Implement restore tests for Deduplicate node [flink]

2023-11-08 Thread via GitHub


jnh5y commented on code in PR #23686:
URL: https://github.com/apache/flink/pull/23686#discussion_r1387440826


##
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/DeduplicationTestPrograms.java:
##
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import org.apache.flink.table.test.program.SinkTestStep;
+import org.apache.flink.table.test.program.SourceTestStep;
+import org.apache.flink.table.test.program.TableTestProgram;
+import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
+
+/** {@link TableTestProgram} definitions for testing {@link 
StreamExecDeduplicate}. */
+public class DeduplicationTestPrograms {
+
+static final Row[] DATA1 = {
+Row.of(1L, "terry", "pen", 1000L),
+Row.of(2L, "alice", "pen", 2000L),
+Row.of(3L, "bob", "pen", 3000L),
+Row.of(4L, "bob", "apple", 4000L),
+Row.of(5L, "fish", "apple", 5000L)
+};
+
+static final Row[] DATA2 = {
+Row.of(6L, "jerry", "pen", 6000L),
+Row.of(7L, "larry", "apple", 7000L),
+Row.of(8L, "bill", "banana", 8000L),
+Row.of(9L, "carol", "apple", 9000L)
+};
+
+    static final TableTestProgram DEDUPLICATE =
+            TableTestProgram.of("deduplicate-asc", "validates deduplication in ascending")
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("MyTable")
+                                    .addSchema(
+                                            "order_id bigint",
+                                            "`user` varchar",
+                                            "product varchar",
+                                            "order_time bigint ",
+                                            "event_time as TO_TIMESTAMP(FROM_UNIXTIME(order_time)) ",
+                                            "watermark for event_time as event_time - INTERVAL '5' second ")
+                                    .producedBeforeRestore(DATA1)
+                                    .producedAfterRestore(DATA2)
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("MySink")
+                                    .addSchema(
+                                            "order_id bigint",
+                                            "`user` varchar",
+                                            "product varchar",
+                                            "order_time bigint",
+                                            "primary key(product) not enforced")
+                                    .consumedBeforeRestore(
+                                            Row.of(1, "terry", "pen", 1000),
+                                            Row.of(4, "bob", "apple", 4000))
+                                    .consumedAfterRestore(Row.of(8L, "bill", "banana", 8000L))
+                                    .build())
+                    .runSql(
+                            "insert into MySink "
+                                    + "select order_id, user, product, order_time \n"
+                                    + "FROM ("
+                                    + "  SELECT *,"
+                                    + "ROW_NUMBER() OVER (PARTITION BY product ORDER BY event_time ASC) AS row_num\n"
+                                    + "  FROM MyTable)"
+                                    + "WHERE row_num = 1")
+                    .build();
+
+    static final TableTestProgram DEDUPLICATE_DESC =

Review Comment:
   I'm not completely sold on this test.  I added it to see an example with `Deduplicate(keep=[LastRow]...`.
   
   If we want to remove it, I'm fine with that.  If the example needs an addition to make more sense, I'm open to suggestions.
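For context on `Deduplicate(keep=[LastRow]...)`: a descending variant of the query in the diff above is what typically produces that plan; the only difference from the ascending program is the sort direction. A hedged sketch (the actual DEDUPLICATE_DESC program is truncated in this message):

```sql
-- Keeps the most recent row per product; ORDER BY ... DESC is what makes the
-- planner emit Deduplicate(keep=[LastRow]) instead of keep=[FirstRow].
SELECT order_id, `user`, product, order_time
FROM (
    SELECT *,
        ROW_NUMBER() OVER (PARTITION BY product ORDER BY event_time DESC) AS row_num
    FROM MyTable)
WHERE row_num = 1
```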



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Re: [PR] [FLINK-33398][runtime] Support switching from batch to stream mode for one input stream operator [flink]

2023-11-08 Thread via GitHub


yunfengzhou-hub commented on code in PR #23521:
URL: https://github.com/apache/flink/pull/23521#discussion_r1387397501


##
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordAttributesValve.java:
##
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput.DataOutput;
+import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
+import org.apache.flink.streaming.runtime.streamrecord.RecordAttributesBuilder;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+
+/**
+ * RecordAttributesValve combines RecordAttributes from different input channels. If any of the
+ * input channels is non-backlog, the combined RecordAttributes is non-backlog.
+ *
+ * Currently, we only support switching the backlog status from null to backlog and from
+ * backlog to non-backlog. Switching from non-backlog to backlog is not supported at the
+ * moment and will be ignored.

Review Comment:
   It might be better to throw an exception instead of ignoring illegal switches.
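The combination rule in the Javadoc above (the combined status stays backlog only while every channel reports backlog) can be pictured with a small self-contained sketch; the class and method names here are illustrative, not Flink's actual RecordAttributesValve:

```java
import java.util.Arrays;

// Illustrative sketch of the rule described in the Javadoc above; this is NOT
// Flink's RecordAttributesValve, just the combination logic in isolation.
class BacklogValveSketch {
    private final boolean[] channelBacklog; // last status reported per input channel
    private Boolean lastEmitted;            // null until the first combined emission

    BacklogValveSketch(int numChannels) {
        channelBacklog = new boolean[numChannels];
        Arrays.fill(channelBacklog, true);  // treat channels as backlog until told otherwise
    }

    /** A channel reported a new backlog status; recombine and emit on change. */
    void onStatus(int channel, boolean backlog) {
        channelBacklog[channel] = backlog;
        boolean combined = true;
        for (boolean b : channelBacklog) {
            combined &= b; // any non-backlog channel makes the combined result non-backlog
        }
        if (lastEmitted == null || combined != lastEmitted.booleanValue()) {
            lastEmitted = combined;
            System.out.println("emit combined RecordAttributes, backlog=" + combined);
        }
    }
}
```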



##
flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java:
##
@@ -629,6 +641,10 @@ private void sendCachedSplitsToNewlyRegisteredReader(int subtaskIndex, int attem
 }
 }
 
+public Boolean isBacklog() {

Review Comment:
   It might be better to improve the naming of this method and add JavaDoc explaining when a `null` would be returned.
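One possible shape for that suggestion, as a sketch only (the method name and the backing field are hypothetical; only the null-means-unknown semantics come from the diff):

```java
import javax.annotation.Nullable;

// Sketch of the suggested rename plus documentation; not the actual coordinator code.
class BacklogStatusHolder {
    @Nullable private Boolean backlog; // stays null until the first status event arrives

    /**
     * Returns whether the source is currently processing backlog data, or
     * {@code null} if no backlog status has been reported yet.
     */
    @Nullable
    Boolean getBacklogStatus() {
        return backlog;
    }
}
```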



##
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalBacklogAwareTimerServiceManagerImpl.java:
##
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream;
+import org.apache.flink.runtime.state.PriorityQueueSetFactory;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.streaming.runtime.tasks.StreamTaskCancellationContext;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.WrappingRuntimeException;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * InternalBacklogAwareTimerServiceManagerImpl keeps track of all the {@link
+ * InternalBacklogAwareTimerServiceImpl}.
+ */
+@Internal
+public class InternalBacklogAwareTimerServiceManagerImpl<K>
+        extends InternalTimeServiceManagerImpl<K>
+        implements InternalTimeServiceManager<K>, KeyedStateBackend.KeySelectionListener<K> {
+
+    private final Map<String, InternalBacklogAwareTimerServiceImpl<K, ?>> timerServices =
+            new HashMap<>();
+
+private boolean backlog = false;
+
+InternalBacklogAwareTimerServiceManagerImpl(
+KeyGroupRange localKeyGroupRange,
+KeyContext keyContext,
+PriorityQueueSetFactory priorityQueueSetFactory,
+ProcessingTimeService processingTimeService,

[jira] [Resolved] (FLINK-26585) State Processor API: Loading a state set buffers the whole state set in memory before starting to process

2023-11-08 Thread Hangxiang Yu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-26585?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hangxiang Yu resolved FLINK-26585.
--
Fix Version/s: 1.19.0
   Resolution: Fixed

merged eb4ae5d4 into master

> State Processor API: Loading a state set buffers the whole state set in 
> memory before starting to process
> -
>
> Key: FLINK-26585
> URL: https://issues.apache.org/jira/browse/FLINK-26585
> Project: Flink
>  Issue Type: Improvement
>  Components: API / State Processor
>Affects Versions: 1.13.0, 1.14.0, 1.15.0
>Reporter: Matthias Schwalbe
>Assignee: Matthias Schwalbe
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.19.0
>
> Attachments: MultiStateKeyIteratorNoStreams.java
>
>
> * When loading a state, MultiStateKeyIterator loads and buffers the whole 
> state in memory before it even processes a single data point 
>  ** This is absolutely no problem for small state (hence the unit tests work 
> fine)
>  ** The MultiStateKeyIterator ctor sets up a java Stream that iterates all state 
> descriptors and flattens all datapoints contained within
>  ** The java.util.stream.Stream#flatMap function causes the buffering of the 
> whole data set when enumerated later on
>  ** See call stack [1] 
>  *** In our case this is 150e6 data points (> 1GiB just for the pointers to 
> the data, let alone the data itself ~30GiB)
>  ** I'm not aware of any instrumentation of Stream that avoids the 
> problem, hence
>  ** I coded an alternative implementation of MultiStateKeyIterator that 
> avoids using java Stream,
>  ** I can contribute our implementation (MultiStateKeyIteratorNoStreams)
> [1]
> Streams call stack:
> hasNext:77, RocksStateKeysIterator 
> (org.apache.flink.contrib.streaming.state.iterator)
> next:82, RocksStateKeysIterator 
> (org.apache.flink.contrib.streaming.state.iterator)
> forEachRemaining:116, Iterator (java.util)
> forEachRemaining:1801, Spliterators$IteratorSpliterator (java.util)
> forEach:580, ReferencePipeline$Head (java.util.stream)
> accept:270, ReferencePipeline$7$1 (java.util.stream)  
>  #  Stream<R> flatMap(final Function<? super P_OUT, ? extends Stream<? extends R>> var1)
> accept:373, ReferencePipeline$11$1 (java.util.stream) 
>  # Stream<P_OUT> peek(final Consumer<? super P_OUT> var1)
> accept:193, ReferencePipeline$3$1 (java.util.stream)      
>  #  Stream<R> map(final Function<? super P_OUT, ? extends R> var1)
> tryAdvance:1359, ArrayList$ArrayListSpliterator (java.util)
> lambda$initPartialTraversalState$0:294, 
> StreamSpliterators$WrappingSpliterator (java.util.stream)
> getAsBoolean:-1, 1528195520 
> (java.util.stream.StreamSpliterators$WrappingSpliterator$$Lambda$57)
> fillBuffer:206, StreamSpliterators$AbstractWrappingSpliterator 
> (java.util.stream)
> doAdvance:161, StreamSpliterators$AbstractWrappingSpliterator 
> (java.util.stream)
> tryAdvance:300, StreamSpliterators$WrappingSpliterator (java.util.stream)
> hasNext:681, Spliterators$1Adapter (java.util)
> hasNext:83, MultiStateKeyIterator (org.apache.flink.state.api.input)
> hasNext:162, KeyedStateReaderOperator$NamespaceDecorator 
> (org.apache.flink.state.api.input.operator)
> reachedEnd:215, KeyedStateInputFormat (org.apache.flink.state.api.input)
> invoke:191, DataSourceTask (org.apache.flink.runtime.operators)
> doRun:776, Task (org.apache.flink.runtime.taskmanager)
> run:563, Task (org.apache.flink.runtime.taskmanager)
> run:748, Thread (java.lang)
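The buffering can be reproduced outside Flink: pulling a single element from a flatMapped stream via its iterator forces the whole current inner stream through the WrappingSpliterator's buffer, which is exactly what happens to the per-descriptor key streams in MultiStateKeyIterator. A self-contained demonstration with hypothetical data standing in for state descriptors and keys (not the Flink code):

{code:java}
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.IntStream;
import java.util.stream.Stream;

public class FlatMapBufferingDemo {
    public static void main(String[] args) {
        AtomicLong produced = new AtomicLong();
        // Outer stream: two "state descriptors"; inner streams: 1,000,000 "keys" each.
        Iterator<String> it =
                Stream.of("descriptor-1", "descriptor-2")
                        .flatMap(
                                d ->
                                        IntStream.range(0, 1_000_000)
                                                .mapToObj(i -> d + "-key-" + i)
                                                .peek(k -> produced.incrementAndGet()))
                        .iterator();
        it.next(); // ask for a single element ...
        // ... yet the whole first inner stream was drained into the spliterator's buffer:
        System.out.println("produced for one next(): " + produced.get()); // prints 1000000
    }
}
{code}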



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-26585][state-processor-api] replace implementation of MultiStateKeyIterator with Stream-free implementation [flink]

2023-11-08 Thread via GitHub


masteryhx closed pull request #23239: [FLINK-26585][state-processor-api] 
replace implementation of MultiStateKeyIterator with Stream-free implementation
URL: https://github.com/apache/flink/pull/23239


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] fixup! [FLINK-33060][state] Fix the javadoc of ListState interfaces about not allowing null value [flink]

2023-11-08 Thread via GitHub


masteryhx closed pull request #23683: fixup! [FLINK-33060][state] Fix the 
javadoc of ListState interfaces about not allowing null value
URL: https://github.com/apache/flink/pull/23683


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (FLINK-25538) [JUnit5 Migration] Module: flink-connector-kafka

2023-11-08 Thread xiang1 yu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17783891#comment-17783891
 ] 

xiang1 yu edited comment on FLINK-25538 at 11/9/23 2:09 AM:


Hi [~mapohl],  I would be interested to work on this one. Could you please 
assign it to me?  I just submitted a PR for this issue:  
[https://github.com/apache/flink-connector-kafka/pull/66]


was (Author: JIRAUSER302279):
Hi [~mapohl],  I would be interested to work on this one. Could you please 
assign it to me? 

> [JUnit5 Migration] Module: flink-connector-kafka
> 
>
> Key: FLINK-25538
> URL: https://issues.apache.org/jira/browse/FLINK-25538
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Kafka
>Reporter: Qingsheng Ren
>Assignee: Ashmeet Kandhari
>Priority: Minor
>  Labels: pull-request-available, stale-assigned, starter
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] (FLINK-25538) [JUnit5 Migration] Module: flink-connector-kafka

2023-11-08 Thread xiang1 yu (Jira)


[ https://issues.apache.org/jira/browse/FLINK-25538 ]


xiang1 yu deleted comment on FLINK-25538:
---

was (Author: JIRAUSER302279):
 I just submitted a PR for this issue. If you have time, could you please help 
review it?  https://github.com/apache/flink-connector-kafka/pull/66

> [JUnit5 Migration] Module: flink-connector-kafka
> 
>
> Key: FLINK-25538
> URL: https://issues.apache.org/jira/browse/FLINK-25538
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Kafka
>Reporter: Qingsheng Ren
>Assignee: Ashmeet Kandhari
>Priority: Minor
>  Labels: pull-request-available, stale-assigned, starter
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-27286] Add infra to support training high dimension models [flink-ml]

2023-11-08 Thread via GitHub


zhipeng93 closed pull request #251: [FLINK-27286] Add infra to support training 
high dimension models
URL: https://github.com/apache/flink-ml/pull/251


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-25538) [JUnit5 Migration] Module: flink-connector-kafka

2023-11-08 Thread xiang1 yu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17784245#comment-17784245
 ] 

xiang1 yu commented on FLINK-25538:
---

 I just submitted a PR for this issue. If you have time, could you please help 
review it?  https://github.com/apache/flink-connector-kafka/pull/66

> [JUnit5 Migration] Module: flink-connector-kafka
> 
>
> Key: FLINK-25538
> URL: https://issues.apache.org/jira/browse/FLINK-25538
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Kafka
>Reporter: Qingsheng Ren
>Assignee: Ashmeet Kandhari
>Priority: Minor
>  Labels: pull-request-available, stale-assigned, starter
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [FLINK-25538][flink-connector-kafka] JUnit5 Migration [flink-connector-kafka]

2023-11-08 Thread via GitHub


victor09091 opened a new pull request, #66:
URL: https://github.com/apache/flink-connector-kafka/pull/66

   
   
   ## What is the purpose of the change
   
   * [JUnit5 Migration] Module: flink-connector-kafka. *
   
   
   ## Brief change log
   
 - *Updated simple junit 4 test packages to junit 5 test packages*
 
   ## Verifying this change
   
  - *This change is a trivial rework / code cleanup without any test 
coverage.*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable)
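   The mechanical core of such a migration is the package and annotation swap; a generic sketch of a migrated test class follows (the class name is hypothetical, not a diff from this PR):

```java
// Before (JUnit 4):
//   import org.junit.Before;
//   import org.junit.Test;
//   import static org.junit.Assert.assertEquals;

// After (JUnit 5 / Jupiter):
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertEquals;

class SomeKafkaConnectorTest { // hypothetical class name

    @BeforeEach // replaces JUnit 4's @Before; classes and methods no longer need to be public
    void setUp() {}

    @Test // now org.junit.jupiter.api.Test
    void roundTrip() {
        assertEquals(4, 2 + 2);
    }
}
```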
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-25538][flink-connector-kafka] JUnit5 Migration [flink-connector-kafka]

2023-11-08 Thread via GitHub


boring-cyborg[bot] commented on PR #66:
URL: 
https://github.com/apache/flink-connector-kafka/pull/66#issuecomment-1803027215

   Thanks for opening this pull request! Please check out our contributing 
guidelines. (https://flink.apache.org/contributing/how-to-contribute.html)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33402] Hybrid Source Concurrency Race Condition Fixes and Related Bugs Results in Data Loss [flink]

2023-11-08 Thread via GitHub


varun1729DD commented on PR #23687:
URL: https://github.com/apache/flink/pull/23687#issuecomment-1802979312

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-33402) Hybrid Source Concurrency Race Condition Fixes and Related Bugs Results in Data Loss

2023-11-08 Thread Varun Narayanan Chakravarthy (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33402?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17784240#comment-17784240
 ] 

Varun Narayanan Chakravarthy commented on FLINK-33402:
--

Created a PR: https://github.com/apache/flink/pull/23687

> Hybrid Source Concurrency Race Condition Fixes and Related Bugs Results in 
> Data Loss
> 
>
> Key: FLINK-33402
> URL: https://issues.apache.org/jira/browse/FLINK-33402
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / HybridSource
>Affects Versions: 1.16.1
> Environment: Apache Flink 1.16.1
> Mac OSX, Linux etc. 
>Reporter: Varun Narayanan Chakravarthy
>Priority: Critical
>  Labels: pull-request-available
> Attachments: hybridSourceEnumeratorAndReaderFixes.patch
>
>   Original Estimate: 2h
>  Remaining Estimate: 2h
>
> Hello Team,
> I noticed that there is data loss when using Hybrid Source. We are reading 
> from a series of ~100 concrete File Sources. All these locations are chained 
> together using the Hybrid Source.
> The issue stems from a race condition in the Flink Hybrid Source code. The 
> Hybrid Source switches to the next source before the current source is 
> complete, and similarly for the Hybrid Source readers. I have also shared the 
> patch file that fixes the issue.
> From the logs:
> *Task Manager logs:* 
> 2023-10-10 17:46:23.577 [Source: parquet-source (1/2)#0|#0] INFO  
> o.apache.flink.connector.base.source.reader.SourceReaderBase  - Adding 
> split(s) to reader: [FileSourceSplit: s3://REDACTED/part-1-13189.snappy [0, 
> 94451)  hosts=[localhost] ID=000229 position=null] 2023-10-10 
> 17:46:23.715 [Source Data Fetcher for Source: parquet-source (1/2)#0|#0] INFO 
>  org.apache.hadoop.fs.s3a.S3AInputStream  - Switching to Random IO seek 
> policy 2023-10-10 17:46:23.715 [Source Data Fetcher for Source: 
> parquet-source (1/2)#0|#0] INFO  org.apache.hadoop.fs.s3a.S3AInputStream  - 
> Switching to Random IO seek policy 2023-10-10 17:46:24.012 [Source: 
> parquet-source (1/2)#0|#0] INFO  
> o.apache.flink.connector.base.source.reader.SourceReaderBase  - Finished 
> reading split(s) [000154] 2023-10-10 17:46:24.012 [Source Data Fetcher 
> for Source: parquet-source (1/2)#0|#0] INFO  
> o.a.flink.connector.base.source.reader.fetcher.SplitFetcher  - Finished 
> reading from splits [000154] 2023-10-10 17:46:24.014 [Source: 
> parquet-source (1/2)#0|#0] INFO  
> o.apache.flink.connector.base.source.reader.SourceReaderBase  - Reader 
> received NoMoreSplits event. 2023-10-10 17:46:24.014 [Source: parquet-source 
> (1/2)#0|#0] DEBUG o.a.flink.connector.base.source.hybrid.HybridSourceReader  
> - No more splits for subtask=0 sourceIndex=11 
> currentReader=org.apache.flink.connector.file.src.impl.FileSourceReader@59620ef8
>  2023-10-10 17:46:24.116 [Source Data Fetcher for Source: parquet-source 
> (1/2)#0|#0] INFO  org.apache.hadoop.fs.s3a.S3AInputStream  - Switching to 
> Random IO seek policy 2023-10-10 17:46:24.116 [Source Data Fetcher for 
> Source: parquet-source (1/2)#0|#0] INFO  
> org.apache.hadoop.fs.s3a.S3AInputStream  - Switching to Random IO seek policy 
> 2023-10-10 17:46:24.116 [Source: parquet-source (1/2)#0|#0] INFO  
> o.a.flink.connector.base.source.hybrid.HybridSourceReader  - Switch source 
> event: subtask=0 sourceIndex=12 
> source=org.apache.flink.connector.kafka.source.KafkaSource@7849da7e 
> 2023-10-10 17:46:24.116 [Source: parquet-source (1/2)#0|#0] INFO  
> o.apache.flink.connector.base.source.reader.SourceReaderBase  - Closing 
> Source Reader. 2023-10-10 17:46:24.116 [Source: parquet-source (1/2)#0|#0] 
> INFO  o.a.flink.connector.base.source.reader.fetcher.SplitFetcher  - Shutting 
> down split fetcher 0 2023-10-10 17:46:24.198 [Source Data Fetcher for Source: 
> parquet-source (1/2)#0|#0] INFO  
> o.a.flink.connector.base.source.reader.fetcher.SplitFetcher  - Split fetcher 
> 0 exited. 2023-10-10 17:46:24.198 [Source: parquet-source (1/2)#0|#0] DEBUG 
> o.a.flink.connector.base.source.hybrid.HybridSourceReader  - Reader closed: 
> subtask=0 sourceIndex=11 
> currentReader=org.apache.flink.connector.file.src.impl.FileSourceReader@59620ef8
> We identified that data from `s3://REDACTED/part-1-13189.snappy` is missing.  
> This is assigned to Reader with ID 000229. Now, we can see from the logs 
> this split is added after the no-more splits event and is NOT read.
> *Job Manager logs:*
> 2023-10-10 17:46:23.576 [SourceCoordinator-Source: parquet-source] INFO  
> o.a.f.c.file.src.assigners.LocalityAwareSplitAssigner  - Assigning remote 
> split to requesting host '10': Optional[FileSourceSplit: 
> s3://REDACTED/part-1-13189.snappy [0, 94451)  hosts=[localhost] ID=000229 
> position=null]
> 2023-10-10 17:46:2
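In enumerator terms, the fix direction described above amounts to guarding the switch until the current source is truly drained. A hedged, self-contained sketch of such a guard (all names are hypothetical; see the attached patch and PR for the real change):

{code:java}
import java.util.Set;

// Illustrates only the ordering constraint described in the report; not the patch.
final class HybridSwitchGuard {
    /** Switching to the next underlying source is only safe once the current
     *  source has no splits left to assign, none in flight, and all readers done. */
    static boolean safeToSwitch(
            boolean noMoreSplitsInCurrentSource,
            Set<String> pendingSplitIds,
            Set<Integer> finishedReaders,
            Set<Integer> registeredReaders) {
        return noMoreSplitsInCurrentSource
                && pendingSplitIds.isEmpty()
                && finishedReaders.containsAll(registeredReaders);
    }
}
{code}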

Re: [PR] [FLINK-33402] Hybrid Source Concurrency Race Condition Fixes and Related Bugs Results in Data Loss [flink]

2023-11-08 Thread via GitHub


flinkbot commented on PR #23687:
URL: https://github.com/apache/flink/pull/23687#issuecomment-1802964699

   
   ## CI report:
   
   * 49b72ea82160e6c20daf10c4391ace307349317c UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-33402) Hybrid Source Concurrency Race Condition Fixes and Related Bugs Results in Data Loss

2023-11-08 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33402?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-33402:
---
Labels: pull-request-available  (was: )

> Hybrid Source Concurrency Race Condition Fixes and Related Bugs Results in 
> Data Loss
> 
>
> Key: FLINK-33402
> URL: https://issues.apache.org/jira/browse/FLINK-33402
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / HybridSource
>Affects Versions: 1.16.1
> Environment: Apache Flink 1.16.1
> Mac OSX, Linux etc. 
>Reporter: Varun Narayanan Chakravarthy
>Priority: Critical
>  Labels: pull-request-available
> Attachments: hybridSourceEnumeratorAndReaderFixes.patch
>
>   Original Estimate: 2h
>  Remaining Estimate: 2h
>
> Hello Team,
> I noticed that there is data loss when using Hybrid Source. We are reading 
> from a series of ~100 concrete File Sources. All these locations are chained 
> together using the Hybrid Source.
> The issue stems from a race condition in the Flink Hybrid Source code. The 
> Hybrid Source switches to the next source before the current source is 
> complete, and similarly for the Hybrid Source readers. I have also shared the 
> patch file that fixes the issue.
> From the logs:
> *Task Manager logs:* 
> 2023-10-10 17:46:23.577 [Source: parquet-source (1/2)#0|#0] INFO  
> o.apache.flink.connector.base.source.reader.SourceReaderBase  - Adding 
> split(s) to reader: [FileSourceSplit: s3://REDACTED/part-1-13189.snappy [0, 
> 94451)  hosts=[localhost] ID=000229 position=null] 2023-10-10 
> 17:46:23.715 [Source Data Fetcher for Source: parquet-source (1/2)#0|#0] INFO 
>  org.apache.hadoop.fs.s3a.S3AInputStream  - Switching to Random IO seek 
> policy 2023-10-10 17:46:23.715 [Source Data Fetcher for Source: 
> parquet-source (1/2)#0|#0] INFO  org.apache.hadoop.fs.s3a.S3AInputStream  - 
> Switching to Random IO seek policy 2023-10-10 17:46:24.012 [Source: 
> parquet-source (1/2)#0|#0] INFO  
> o.apache.flink.connector.base.source.reader.SourceReaderBase  - Finished 
> reading split(s) [000154] 2023-10-10 17:46:24.012 [Source Data Fetcher 
> for Source: parquet-source (1/2)#0|#0] INFO  
> o.a.flink.connector.base.source.reader.fetcher.SplitFetcher  - Finished 
> reading from splits [000154] 2023-10-10 17:46:24.014 [Source: 
> parquet-source (1/2)#0|#0] INFO  
> o.apache.flink.connector.base.source.reader.SourceReaderBase  - Reader 
> received NoMoreSplits event. 2023-10-10 17:46:24.014 [Source: parquet-source 
> (1/2)#0|#0] DEBUG o.a.flink.connector.base.source.hybrid.HybridSourceReader  
> - No more splits for subtask=0 sourceIndex=11 
> currentReader=org.apache.flink.connector.file.src.impl.FileSourceReader@59620ef8
>  2023-10-10 17:46:24.116 [Source Data Fetcher for Source: parquet-source 
> (1/2)#0|#0] INFO  org.apache.hadoop.fs.s3a.S3AInputStream  - Switching to 
> Random IO seek policy 2023-10-10 17:46:24.116 [Source Data Fetcher for 
> Source: parquet-source (1/2)#0|#0] INFO  
> org.apache.hadoop.fs.s3a.S3AInputStream  - Switching to Random IO seek policy 
> 2023-10-10 17:46:24.116 [Source: parquet-source (1/2)#0|#0] INFO  
> o.a.flink.connector.base.source.hybrid.HybridSourceReader  - Switch source 
> event: subtask=0 sourceIndex=12 
> source=org.apache.flink.connector.kafka.source.KafkaSource@7849da7e 
> 2023-10-10 17:46:24.116 [Source: parquet-source (1/2)#0|#0] INFO  
> o.apache.flink.connector.base.source.reader.SourceReaderBase  - Closing 
> Source Reader. 2023-10-10 17:46:24.116 [Source: parquet-source (1/2)#0|#0] 
> INFO  o.a.flink.connector.base.source.reader.fetcher.SplitFetcher  - Shutting 
> down split fetcher 0 2023-10-10 17:46:24.198 [Source Data Fetcher for Source: 
> parquet-source (1/2)#0|#0] INFO  
> o.a.flink.connector.base.source.reader.fetcher.SplitFetcher  - Split fetcher 
> 0 exited. 2023-10-10 17:46:24.198 [Source: parquet-source (1/2)#0|#0] DEBUG 
> o.a.flink.connector.base.source.hybrid.HybridSourceReader  - Reader closed: 
> subtask=0 sourceIndex=11 
> currentReader=org.apache.flink.connector.file.src.impl.FileSourceReader@59620ef8
> We identified that data from `s3://REDACTED/part-1-13189.snappy` is missing.  
> This is assigned to Reader with ID 000229. Now, we can see from the logs 
> this split is added after the no-more splits event and is NOT read.
> *Job Manager logs:*
> 2023-10-10 17:46:23.576 [SourceCoordinator-Source: parquet-source] INFO  
> o.a.f.c.file.src.assigners.LocalityAwareSplitAssigner  - Assigning remote 
> split to requesting host '10': Optional[FileSourceSplit: 
> s3://REDACTED/part-1-13189.snappy [0, 94451)  hosts=[localhost] ID=000229 
> position=null]
> 2023-10-10 17:46:23.576 [SourceCoordinator-Source: parquet-source] INFO  
> o.a.flink.connector.file.src.impl.StaticF

[PR] [FLINK-33402] Hybrid Source Concurrency Race Condition Fixes and Related Bugs Results in Data Loss [flink]

2023-11-08 Thread via GitHub


varun1729DD opened a new pull request, #23687:
URL: https://github.com/apache/flink/pull/23687

   
   
   ## What is the purpose of the change
   
   *(For example: This pull request makes task deployment go through the blob 
server, rather than through RPC. That way we avoid re-transferring them on each 
deployment (during recovery).)*
   
   
   ## Brief change log
   
   *(for example:)*
 - *The TaskInfo is stored in the blob store on job creation time as a 
persistent artifact*
 - *Deployments RPC transmits only the blob storage reference*
 - *TaskManagers retrieve the TaskInfo from the blob cache*
   
   
   ## Verifying this change
   
   Please make sure both new and modified tests in this PR follows the 
conventions defined in our code quality guide: 
https://flink.apache.org/contributing/code-style-and-quality-common.html#testing
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe 
tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
 - *Added integration tests for end-to-end deployment with large payloads 
(100MB)*
 - *Extended integration test for recovery after master (JobManager) 
failure*
 - *Added test that validates that TaskInfo is transferred only once across 
recoveries*
 - *Manually verified the change by running a 4 node cluster with 2 
JobManagers and 4 TaskManagers, a stateful streaming program, and killing one 
JobManager and two TaskManagers during the execution, verifying that recovery 
happens correctly.*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): No
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: No
 - The serializers: No
 - The runtime per-record code paths (performance sensitive): No
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: No
 - The S3 file system connector: No
   
   ## Documentation
   
 - Does this pull request introduce a new feature? No
 - If yes, how is the feature documented? Not applicable
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-33132) Flink Connector Redshift Sink Implementation

2023-11-08 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33132?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-33132:
---
Labels: in-progress pull-request-available  (was: in-progress)

> Flink Connector Redshift Sink Implementation 
> -
>
> Key: FLINK-33132
> URL: https://issues.apache.org/jira/browse/FLINK-33132
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / AWS
>Reporter: Samrat Deb
>Assignee: Samrat Deb
>Priority: Major
>  Labels: in-progress, pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [WIP][FLINK-33132] Flink Connector Redshift Sink Implementation [flink-connector-aws]

2023-11-08 Thread via GitHub


Samrat002 opened a new pull request, #114:
URL: https://github.com/apache/flink-connector-aws/pull/114

   ## Purpose of the change
   
   Flink Connector Redshift Sink Implementation
   
   ## Verifying this change
   
   - To Do
   
   ## Significant changes
   *(Please check any boxes [x] if the answer is "yes". You can first publish 
the PR and check them afterwards, for convenience.)*
   - [x] Dependencies have been added or upgraded
   - [x] Public API has been changed (Public API is any class annotated with 
`@Public(Evolving)`)
   - [ ] Serializers have been changed
   - [x] New feature has been introduced
 - If yes, how is this documented? (not applicable / docs / JavaDocs / not 
documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-33132) Flink Connector Redshift Sink Implementation

2023-11-08 Thread Samrat Deb (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33132?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Samrat Deb updated FLINK-33132:
---
Labels:   (was: pull-request-available)

> Flink Connector Redshift Sink Implementation 
> -
>
> Key: FLINK-33132
> URL: https://issues.apache.org/jira/browse/FLINK-33132
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Samrat Deb
>Assignee: Samrat Deb
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33132) Flink Connector Redshift Sink Implementation

2023-11-08 Thread Samrat Deb (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33132?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Samrat Deb updated FLINK-33132:
---
Labels: in-progress  (was: )

> Flink Connector Redshift Sink Implementation 
> -
>
> Key: FLINK-33132
> URL: https://issues.apache.org/jira/browse/FLINK-33132
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Samrat Deb
>Assignee: Samrat Deb
>Priority: Major
>  Labels: in-progress
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33132) Flink Connector Redshift Sink Implementation

2023-11-08 Thread Samrat Deb (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33132?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Samrat Deb updated FLINK-33132:
---
Component/s: Connectors / AWS

> Flink Connector Redshift Sink Implementation 
> -
>
> Key: FLINK-33132
> URL: https://issues.apache.org/jira/browse/FLINK-33132
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / AWS
>Reporter: Samrat Deb
>Assignee: Samrat Deb
>Priority: Major
>  Labels: in-progress
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-33488] Implement restore tests for Deduplicate node [flink]

2023-11-08 Thread via GitHub


flinkbot commented on PR #23686:
URL: https://github.com/apache/flink/pull/23686#issuecomment-1802808057

   
   ## CI report:
   
   * 9a3424757010142c5a5cea666861b31277a06f41 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-33488) Implement restore tests for Deduplicate node

2023-11-08 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33488?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-33488:
---
Labels: pull-request-available  (was: )

> Implement restore tests for Deduplicate node
> 
>
> Key: FLINK-33488
> URL: https://issues.apache.org/jira/browse/FLINK-33488
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Jim Hughes
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [FLINK-33488] Implement restore tests for Deduplicate node [flink]

2023-11-08 Thread via GitHub


jnh5y opened a new pull request, #23686:
URL: https://github.com/apache/flink/pull/23686

   ## What is the purpose of the change
   
   Implement restore tests for Deduplicate node
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
   * Added restore tests for Deduplicate node which verifies the generated 
compiled plan with the saved compiled plan
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-33402) Hybrid Source Concurrency Race Condition Fixes and Related Bugs Results in Data Loss

2023-11-08 Thread Varun Narayanan Chakravarthy (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33402?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Varun Narayanan Chakravarthy updated FLINK-33402:
-
Priority: Critical  (was: Blocker)

> Hybrid Source Concurrency Race Condition Fixes and Related Bugs Results in 
> Data Loss
> 
>
> Key: FLINK-33402
> URL: https://issues.apache.org/jira/browse/FLINK-33402
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / HybridSource
>Affects Versions: 1.16.1
> Environment: Apache Flink 1.16.1
> Mac OSX, Linux etc. 
>Reporter: Varun Narayanan Chakravarthy
>Priority: Critical
> Attachments: hybridSourceEnumeratorAndReaderFixes.patch
>
>   Original Estimate: 2h
>  Remaining Estimate: 2h
>
> Hello Team,
> I noticed that there is data loss when using Hybrid Source. We are reading 
> from a series of ~100 concrete File Sources. All these locations are chained 
> together using the Hybrid Source.
> The issue stems from a race condition in the Flink Hybrid Source code. The 
> Hybrid Source switches to the next source before the current source is 
> complete, and similarly for the Hybrid Source readers. I have also shared the 
> patch file that fixes the issue.
> From the logs:
> *Task Manager logs:* 
> 2023-10-10 17:46:23.577 [Source: parquet-source (1/2)#0|#0] INFO  
> o.apache.flink.connector.base.source.reader.SourceReaderBase  - Adding 
> split(s) to reader: [FileSourceSplit: s3://REDACTED/part-1-13189.snappy [0, 
> 94451)  hosts=[localhost] ID=000229 position=null] 2023-10-10 
> 17:46:23.715 [Source Data Fetcher for Source: parquet-source (1/2)#0|#0] INFO 
>  org.apache.hadoop.fs.s3a.S3AInputStream  - Switching to Random IO seek 
> policy 2023-10-10 17:46:23.715 [Source Data Fetcher for Source: 
> parquet-source (1/2)#0|#0] INFO  org.apache.hadoop.fs.s3a.S3AInputStream  - 
> Switching to Random IO seek policy 2023-10-10 17:46:24.012 [Source: 
> parquet-source (1/2)#0|#0] INFO  
> o.apache.flink.connector.base.source.reader.SourceReaderBase  - Finished 
> reading split(s) [000154] 2023-10-10 17:46:24.012 [Source Data Fetcher 
> for Source: parquet-source (1/2)#0|#0] INFO  
> o.a.flink.connector.base.source.reader.fetcher.SplitFetcher  - Finished 
> reading from splits [000154] 2023-10-10 17:46:24.014 [Source: 
> parquet-source (1/2)#0|#0] INFO  
> o.apache.flink.connector.base.source.reader.SourceReaderBase  - Reader 
> received NoMoreSplits event. 2023-10-10 17:46:24.014 [Source: parquet-source 
> (1/2)#0|#0] DEBUG o.a.flink.connector.base.source.hybrid.HybridSourceReader  
> - No more splits for subtask=0 sourceIndex=11 
> currentReader=org.apache.flink.connector.file.src.impl.FileSourceReader@59620ef8
>  2023-10-10 17:46:24.116 [Source Data Fetcher for Source: parquet-source 
> (1/2)#0|#0] INFO  org.apache.hadoop.fs.s3a.S3AInputStream  - Switching to 
> Random IO seek policy 2023-10-10 17:46:24.116 [Source Data Fetcher for 
> Source: parquet-source (1/2)#0|#0] INFO  
> org.apache.hadoop.fs.s3a.S3AInputStream  - Switching to Random IO seek policy 
> 2023-10-10 17:46:24.116 [Source: parquet-source (1/2)#0|#0] INFO  
> o.a.flink.connector.base.source.hybrid.HybridSourceReader  - Switch source 
> event: subtask=0 sourceIndex=12 
> source=org.apache.flink.connector.kafka.source.KafkaSource@7849da7e 
> 2023-10-10 17:46:24.116 [Source: parquet-source (1/2)#0|#0] INFO  
> o.apache.flink.connector.base.source.reader.SourceReaderBase  - Closing 
> Source Reader. 2023-10-10 17:46:24.116 [Source: parquet-source (1/2)#0|#0] 
> INFO  o.a.flink.connector.base.source.reader.fetcher.SplitFetcher  - Shutting 
> down split fetcher 0 2023-10-10 17:46:24.198 [Source Data Fetcher for Source: 
> parquet-source (1/2)#0|#0] INFO  
> o.a.flink.connector.base.source.reader.fetcher.SplitFetcher  - Split fetcher 
> 0 exited. 2023-10-10 17:46:24.198 [Source: parquet-source (1/2)#0|#0] DEBUG 
> o.a.flink.connector.base.source.hybrid.HybridSourceReader  - Reader closed: 
> subtask=0 sourceIndex=11 
> currentReader=org.apache.flink.connector.file.src.impl.FileSourceReader@59620ef8
> We identified that data from `s3://REDACTED/part-1-13189.snappy` is missing.  
> This is assigned to Reader with ID 000229. Now, we can see from the logs 
> this split is added after the no-more splits event and is NOT read.
> *Job Manager logs:*
> 2023-10-10 17:46:23.576 [SourceCoordinator-Source: parquet-source] INFO  
> o.a.f.c.file.src.assigners.LocalityAwareSplitAssigner  - Assigning remote 
> split to requesting host '10': Optional[FileSourceSplit: 
> s3://REDACTED/part-1-13189.snappy [0, 94451)  hosts=[localhost] ID=000229 
> position=null]
> 2023-10-10 17:46:23.576 [SourceCoordinator-Source: parquet-source] INFO  
> o.a.flink.connector.file.src.impl.StaticFileSplitEnumerator  - A

Re: [PR] [Flink 31966] Flink Kubernetes operator lacks TLS support [flink-kubernetes-operator]

2023-11-08 Thread via GitHub


tagarr commented on PR #689:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/689#issuecomment-1802587767

   Hi @gaborgsomogyi, I didn't appreciate that there could be hundreds or more 
FlinkDeployments running on a cluster. If that's the case, then my solution 
wouldn't be the best. What if I provide an optional secret mount for the 
truststore and an optional secretKeyRef env var for the store password, then 
modify the config for creating the rest client for the operator to point to 
this store?
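   A minimal sketch of what pointing the operator's rest client at such a mounted truststore could look like; the mount path, env var name, and class are assumptions chosen for illustration, not operator code:

```java
import java.io.FileInputStream;
import java.security.KeyStore;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManagerFactory;

// Hypothetical illustration of the proposal: truststore from a secret mount,
// password from a secretKeyRef-backed env var.
final class OperatorTlsSketch {
    static SSLContext sslContextFromMountedSecret() throws Exception {
        String storePath = "/opt/flink/tls/truststore.jks";              // assumed mount path
        String password = System.getenv("OPERATOR_TRUSTSTORE_PASSWORD"); // assumed env var
        KeyStore trustStore = KeyStore.getInstance("JKS");
        try (FileInputStream in = new FileInputStream(storePath)) {
            trustStore.load(in, password == null ? null : password.toCharArray());
        }
        TrustManagerFactory tmf =
                TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
        tmf.init(trustStore);
        SSLContext ctx = SSLContext.getInstance("TLS");
        ctx.init(null, tmf.getTrustManagers(), null);
        return ctx; // the rest client would then be built against this context
    }
}
```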
   
   Additionally, I only created the OperatorKubernetesClusterDescriptor class 
so that the call to deployClusterInternal didn't throw an exception, as the 
config for the actual cluster was being used. If, instead of doing this, I 
caught the exception and checked that it was a ClusterRetrieveException, I 
would be able to reduce the changes considerably. Do you think this would be 
acceptable?
   
   For reference the exception thrown by the operator is:
   ```
   2023-11-08 15:50:27,797 o.a.f.k.o.l.AuditUtils [INFO ][flink/basic-secure] >>> Event  | Warning | CLUSTERDEPLOYMENTEXCEPTION | org.apache.flink.client.deployment.ClusterRetrieveException: Could not create the RestClusterClient.
   2023-11-08 15:50:27,800 o.a.f.k.o.r.ReconciliationUtils 
[WARN ][flink/basic-secure] Attempt count: 0, last attempt: false
   2023-11-08 15:50:27,886 o.a.f.k.o.l.AuditUtils 
[INFO ][flink/basic-secure] >>> Status | Error   | UPGRADING   | 
{"type":"org.apache.flink.kubernetes.operator.exception.ReconciliationException","message":"java.lang.RuntimeException:
 org.apache.flink.client.deployment.ClusterRetrieveException: Could not create 
the 
RestClusterClient.","additionalMetadata":{},"throwableList":[{"type":"java.lang.RuntimeException","message":"org.apache.flink.client.deployment.ClusterRetrieveException:
 Could not create the 
RestClusterClient.","additionalMetadata":{}},{"type":"org.apache.flink.client.deployment.ClusterRetrieveException","message":"Could
 not create the RestClusterClient.","additionalMetadata":{}}]} 
   2023-11-08 15:50:27,890 
i.j.o.p.e.ReconciliationDispatcher [ERROR][flink/basic-secure] 
Error during event processing ExecutionScope{ resource id: 
ResourceID{name='basic-secure', namespace='flink'}, version: 7366433} failed.
   org.apache.flink.kubernetes.operator.exception.ReconciliationException: 
java.lang.RuntimeException: 
org.apache.flink.client.deployment.ClusterRetrieveException: Could not create 
the RestClusterClient.
at 
org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.reconcile(FlinkDeploymentController.java:148)
at 
org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.reconcile(FlinkDeploymentController.java:56)
at 
io.javaoperatorsdk.operator.processing.Controller$1.execute(Controller.java:138)
at 
io.javaoperatorsdk.operator.processing.Controller$1.execute(Controller.java:96)
at 
org.apache.flink.kubernetes.operator.metrics.OperatorJosdkMetrics.timeControllerExecution(OperatorJosdkMetrics.java:80)
at 
io.javaoperatorsdk.operator.processing.Controller.reconcile(Controller.java:95)
at 
io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.reconcileExecution(ReconciliationDispatcher.java:139)
at 
io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleReconcile(ReconciliationDispatcher.java:119)
at 
io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleDispatch(ReconciliationDispatcher.java:89)
at 
io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleExecution(ReconciliationDispatcher.java:62)
at 
io.javaoperatorsdk.operator.processing.event.EventProcessor$ReconcilerExecutor.run(EventProcessor.java:414)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown 
Source)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown 
Source)
at java.base/java.lang.Thread.run(Unknown Source)
   Caused by: java.lang.RuntimeException: 
org.apache.flink.client.deployment.ClusterRetrieveException: Could not create 
the RestClusterClient.
at 
org.apache.flink.kubernetes.KubernetesClusterDescriptor.lambda$createClusterClientProvider$1(KubernetesClusterDescriptor.java:121)
at 
org.apache.flink.kubernetes.KubernetesClusterDescriptor.deployApplicationCluster(KubernetesClusterDescriptor.java:217)
at 
org.apache.flink.client.deployment.application.cli.ApplicationClusterDeployer.run(ApplicationClusterDeployer.java:67)
at 
org.apache.flink.kubernetes.operator.service.NativeFlinkService.deployApplicationCluster(NativeFlinkService.java:104)
at 
org.apache.flink.kubernetes.operator.service.AbstractFlinkService.submitApplicationCluster(AbstractFlinkService.java:189)
at 
org.a

[jira] [Created] (FLINK-33488) Implement restore tests for Deduplicate node

2023-11-08 Thread Jim Hughes (Jira)
Jim Hughes created FLINK-33488:
--

 Summary: Implement restore tests for Deduplicate node
 Key: FLINK-33488
 URL: https://issues.apache.org/jira/browse/FLINK-33488
 Project: Flink
  Issue Type: Sub-task
Reporter: Jim Hughes






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-29288) Can't start a job with a jar in the system classpath

2023-11-08 Thread Trystan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17784174#comment-17784174
 ] 

Trystan edited comment on FLINK-29288 at 11/8/23 7:37 PM:
--

Yes, two different kinds of errors. flink-kubernetes-operator v1.6.0, flink 
version 1.16.1

If I do not include *jarURI* the job immediately goes into {*}Job Status: 
FINISHED / Lifecycle State: UPGRADING{*}.

If I include *jarURI* and point it at /opt/flink/lib/myjob.jar, I get linkage 
errors around the kafka source classes (specifically OffsetResetStrategy).

 

Edit: for the case where I'm not including {*}jarURI{*}, I did find an 
exception in the operator logs. It seems like it may be related to the pipeline 
options being null, which I thought should be optional based on the linked PR.
{code:java}
{"type":"org.apache.flink.kubernetes.operator.exception.ReconciliationException","message":"java.lang.NullPointerException","stackTrace":"org.apache.flink.kubernetes.operator.exception.ReconciliationException:
 java.lang.NullPointerException\n\tat 
org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.reconcile(FlinkDeploymentController.java:148)\n\tat
 
org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.reconcile(FlinkDeploymentController.java:56)\n\tat
 
io.javaoperatorsdk.operator.processing.Controller$1.execute(Controller.java:138)\n\tat
 
io.javaoperatorsdk.operator.processing.Controller$1.execute(Controller.java:96)\n\tat
 
org.apache.flink.kubernetes.operator.metrics.OperatorJosdkMetrics.timeControllerExecution(OperatorJosdkMetrics.java:80)\n\tat
 
io.javaoperatorsdk.operator.processing.Controller.reconcile(Controller.java:95)\n\tat
 
io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.reconcileExecution(ReconciliationDispatcher.java:139)\n\tat
 
io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleReconcile(ReconciliationDispatcher.java:119)\n\tat
 
io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleDispatch(ReconciliationDispatcher.java:89)\n\tat
 
io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleExecution(ReconciliationDispatcher.java:62)\n\tat
 
io.javaoperatorsdk.operator.processing.event.EventProcessor$ReconcilerExecutor.run(EventProcessor.java:414)\n\tat
 java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown 
Source)\n\tat 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown 
Source)\n\tat java.base/java.lang.Thread.run(Unknown Source)\nCaused by: 
java.lang.NullPointerException\n\tat 
org.apache.flink.kubernetes.utils.KubernetesUtils.checkJarFileForApplicationMode(KubernetesUtils.java:407)\n\tat
 
org.apache.flink.kubernetes.KubernetesClusterDescriptor.deployApplicationCluster(KubernetesClusterDescriptor.java:207)\n\tat
 
org.apache.flink.client.deployment.application.cli.ApplicationClusterDeployer.run(ApplicationClusterDeployer.java:67)\n\tat
 
org.apache.flink.kubernetes.operator.service.NativeFlinkService.deployApplicationCluster(Native","additionalMetadata":{},"throwableList":[{"type":"java.lang.NullPointerException","additionalMetadata":{}}]}
 {code}


was (Author: trystan):
Yes, two different kinds of errors. flink-kubernetes-operator v1.6.0

If I do not include *jarURI* the job immediately goes into {*}Job Status: 
FINISHED / Lifecycle State: UPGRADING{*}.

If I include *jarURI* and point it at /opt/flink/lib/myjob.jar, I get linkage 
errors around the kafka source classes (specifically OffsetResetStrategy).

 

Edit: for the case where i'm not including {*}jarURI{*}, I did find an 
exception in the operator logs. It seems like it may be related to the pipeline 
options being null? Which I thought should be optional based on the linked PR.
{code:java}
{"type":"org.apache.flink.kubernetes.operator.exception.ReconciliationException","message":"java.lang.NullPointerException","stackTrace":"org.apache.flink.kubernetes.operator.exception.ReconciliationException:
 java.lang.NullPointerException\n\tat 
org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.reconcile(FlinkDeploymentController.java:148)\n\tat
 
org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.reconcile(FlinkDeploymentController.java:56)\n\tat
 
io.javaoperatorsdk.operator.processing.Controller$1.execute(Controller.java:138)\n\tat
 
io.javaoperatorsdk.operator.processing.Controller$1.execute(Controller.java:96)\n\tat
 
org.apache.flink.kubernetes.operator.metrics.OperatorJosdkMetrics.timeControllerExecution(OperatorJosdkMetrics.java:80)\n\tat
 
io.javaoperatorsdk.operator.processing.Controller.reconcile(Controller.java:95)\n\tat
 
io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.reconcileExecution(ReconciliationDispatcher.java:139)\n\tat
 
io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.h

[jira] [Comment Edited] (FLINK-29288) Can't start a job with a jar in the system classpath

2023-11-08 Thread Trystan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17784174#comment-17784174
 ] 

Trystan edited comment on FLINK-29288 at 11/8/23 7:36 PM:
--

Yes, two different kinds of errors. flink-kubernetes-operator v1.6.0

If I do not include *jarURI* the job immediately goes into {*}Job Status: 
FINISHED / Lifecycle State: UPGRADING{*}.

If I include *jarURI* and point it at /opt/flink/lib/myjob.jar, I get linkage 
errors around the kafka source classes (specifically OffsetResetStrategy).

 

Edit: for the case where I'm not including {*}jarURI{*}, I did find an 
exception in the operator logs. It seems like it may be related to the pipeline 
options being null, which I thought should be optional based on the linked PR.
{code:java}
{"type":"org.apache.flink.kubernetes.operator.exception.ReconciliationException","message":"java.lang.NullPointerException","stackTrace":"org.apache.flink.kubernetes.operator.exception.ReconciliationException:
 java.lang.NullPointerException\n\tat 
org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.reconcile(FlinkDeploymentController.java:148)\n\tat
 
org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.reconcile(FlinkDeploymentController.java:56)\n\tat
 
io.javaoperatorsdk.operator.processing.Controller$1.execute(Controller.java:138)\n\tat
 
io.javaoperatorsdk.operator.processing.Controller$1.execute(Controller.java:96)\n\tat
 
org.apache.flink.kubernetes.operator.metrics.OperatorJosdkMetrics.timeControllerExecution(OperatorJosdkMetrics.java:80)\n\tat
 
io.javaoperatorsdk.operator.processing.Controller.reconcile(Controller.java:95)\n\tat
 
io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.reconcileExecution(ReconciliationDispatcher.java:139)\n\tat
 
io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleReconcile(ReconciliationDispatcher.java:119)\n\tat
 
io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleDispatch(ReconciliationDispatcher.java:89)\n\tat
 
io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleExecution(ReconciliationDispatcher.java:62)\n\tat
 
io.javaoperatorsdk.operator.processing.event.EventProcessor$ReconcilerExecutor.run(EventProcessor.java:414)\n\tat
 java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown 
Source)\n\tat 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown 
Source)\n\tat java.base/java.lang.Thread.run(Unknown Source)\nCaused by: 
java.lang.NullPointerException\n\tat 
org.apache.flink.kubernetes.utils.KubernetesUtils.checkJarFileForApplicationMode(KubernetesUtils.java:407)\n\tat
 
org.apache.flink.kubernetes.KubernetesClusterDescriptor.deployApplicationCluster(KubernetesClusterDescriptor.java:207)\n\tat
 
org.apache.flink.client.deployment.application.cli.ApplicationClusterDeployer.run(ApplicationClusterDeployer.java:67)\n\tat
 
org.apache.flink.kubernetes.operator.service.NativeFlinkService.deployApplicationCluster(Native","additionalMetadata":{},"throwableList":[{"type":"java.lang.NullPointerException","additionalMetadata":{}}]}
 {code}


was (Author: trystan):
Yes, two different kinds of errors. flink-kubernetes-operator v1.6.0

If I do not include *jarURI* the job immediately goes into {*}Job Status: 
FINISHED / Lifecycle State: UPGRADING{*}.

If I include *jarURI* and point it at /opt/flink/lib/myjob.jar, I get linkage 
errors around the kafka source classes (specifically OffsetResetStrategy).

> Can't start a job with a jar in the system classpath
> 
>
> Key: FLINK-29288
> URL: https://issues.apache.org/jira/browse/FLINK-29288
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.1.0
>Reporter: Yaroslav Tkachenko
>Priority: Major
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.2.0
>
>
> I'm using the latest (unreleased) version of the Kubernetes operator.
> It looks like currently, it's impossible to use it with a job jar file in the 
> system classpath (/opt/flink/lib). *jarURI* is required and it's always 
> passed as a *pipeline.jars* parameter to the Flink process. In practice, it 
> means that the same class is loaded twice: once by the system classloader and 
> another time by the user classloader. This leads to exceptions like this:
> {quote}java.lang.LinkageError: loader constraint violation: when resolving 
> method 'XXX' the class loader org.apache.flink.util.ChildFirstClassLoader 
> @47a5b70d of the current class, YYY, and the class loader 'app' for the 
> method's defining class, ZZZ, have different Class objects for the type AAA 
> used in the signature
> {quote}
> In my opinion, jarURI must be made optional even for the application mode. In 
> this case, it's assumed that it's already available in the system classpath.

[jira] [Created] (FLINK-33487) Add the new Snowflake connector to supported list

2023-11-08 Thread Mohsen Rezaei (Jira)
Mohsen Rezaei created FLINK-33487:
-

 Summary: Add the new Snowflake connector to supported list
 Key: FLINK-33487
 URL: https://issues.apache.org/jira/browse/FLINK-33487
 Project: Flink
  Issue Type: New Feature
  Components: Documentation
Affects Versions: 1.17.1, 1.18.0
Reporter: Mohsen Rezaei
 Fix For: 1.17.2, 1.18.1


Code was contributed in FLINK-32737.

Add this new connector to the list of supported ones in the documentation with 
a corresponding sub-page with the details of the sink:
https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/connectors/datastream/overview/



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33487) Add the new Snowflake connector to supported list

2023-11-08 Thread Mohsen Rezaei (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33487?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mohsen Rezaei updated FLINK-33487:
--
Description: 
Code was contributed in FLINK-32737.

Add this new connector to the list of supported ones in the documentation with 
a corresponding sub-page for the details of the connector:
https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/connectors/datastream/overview/

  was:
Code was contributed in FLINK-32737.

Add this new connector to the list of supported ones in the documentation with 
a corresponding sub-page with the details of the sink:
https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/connectors/datastream/overview/


> Add the new Snowflake connector to supported list
> -
>
> Key: FLINK-33487
> URL: https://issues.apache.org/jira/browse/FLINK-33487
> Project: Flink
>  Issue Type: New Feature
>  Components: Documentation
>Affects Versions: 1.18.0, 1.17.1
>Reporter: Mohsen Rezaei
>Priority: Major
> Fix For: 1.17.2, 1.18.1
>
>
> Code was contributed in FLINK-32737.
> Add this new connector to the list of supported ones in the documentation 
> with a corresponding sub-page for the details of the connector:
> https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/connectors/datastream/overview/



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-29288) Can't start a job with a jar in the system classpath

2023-11-08 Thread Trystan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17784174#comment-17784174
 ] 

Trystan commented on FLINK-29288:
-

Yes, two different kinds of errors. flink-kubernetes-operator v1.6.0

If I do not include *jarURI* the job immediately goes into {*}Job Status: 
FINISHED / Lifecycle State: UPGRADING{*}.

If I include *jarURI* and point it at /opt/flink/lib/myjob.jar, I get linkage 
errors around the kafka source classes (specifically OffsetResetStrategy).

> Can't start a job with a jar in the system classpath
> 
>
> Key: FLINK-29288
> URL: https://issues.apache.org/jira/browse/FLINK-29288
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.1.0
>Reporter: Yaroslav Tkachenko
>Priority: Major
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.2.0
>
>
> I'm using the latest (unreleased) version of the Kubernetes operator.
> It looks like currently, it's impossible to use it with a job jar file in the 
> system classpath (/opt/flink/lib). *jarURI* is required and it's always 
> passed as a *pipeline.jars* parameter to the Flink process. In practice, it 
> means that the same class is loaded twice: once by the system classloader and 
> another time by the user classloader. This leads to exceptions like this:
> {quote}java.lang.LinkageError: loader constraint violation: when resolving 
> method 'XXX' the class loader org.apache.flink.util.ChildFirstClassLoader 
> @47a5b70d of the current class, YYY, and the class loader 'app' for the 
> method's defining class, ZZZ, have different Class objects for the type AAA 
> used in the signature
> {quote}
> In my opinion, jarURI must be made optional even for the application mode. In 
> this case, it's assumed that it's already available in the system classpath.
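
To make the linkage failure concrete: two classloaders that both contain the same 
class, with no delegation between them, yield distinct Class objects. A minimal, 
self-contained sketch (the jar path and class name are assumptions for 
illustration):
{code:java}
import java.net.URL;
import java.net.URLClassLoader;

// Sketch: two loaders with no parent delegation between them, standing in for
// the 'app' loader and Flink's ChildFirstClassLoader. Assumes a class
// com.example.Foo exists in /opt/flink/lib/myjob.jar (illustrative names).
public class DuplicateClassDemo {
    public static void main(String[] args) throws Exception {
        URL jar = new URL("file:///opt/flink/lib/myjob.jar");

        ClassLoader loaderA = new URLClassLoader(new URL[] {jar}, null);
        ClassLoader loaderB = new URLClassLoader(new URL[] {jar}, null);

        Class<?> a = loaderA.loadClass("com.example.Foo");
        Class<?> b = loaderB.loadClass("com.example.Foo");

        System.out.println(a.getName().equals(b.getName())); // true
        System.out.println(a == b); // false: two Class objects -> LinkageError risk
    }
}
{code}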



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-33470] Implement restore tests for Join node [flink]

2023-11-08 Thread via GitHub


jnh5y commented on code in PR #23680:
URL: https://github.com/apache/flink/pull/23680#discussion_r1386993678


##
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/JoinTestPrograms.java:
##
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.testutils;
+
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecJoin;
+import org.apache.flink.table.test.program.SinkTestStep;
+import org.apache.flink.table.test.program.SourceTestStep;
+import org.apache.flink.table.test.program.TableTestProgram;
+import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
+
+/** {@link TableTestProgram} definitions for testing {@link StreamExecJoin}. */
+public class JoinTestPrograms {
+
+static final TableTestProgram NON_WINDOW_INNER_JOIN;
+static final TableTestProgram NON_WINDOW_INNER_JOIN_WITH_NULL;
+static final TableTestProgram JOIN;
+static final TableTestProgram INNER_JOIN;
+static final TableTestProgram JOIN_WITH_FILTER;
+static final TableTestProgram INNER_JOIN_WITH_DUPLICATE_KEY;
+static final TableTestProgram INNER_JOIN_WITH_NON_EQUI_JOIN;
+static final TableTestProgram INNER_JOIN_WITH_EQUAL_PK;
+static final TableTestProgram INNER_JOIN_WITH_PK;
+
+static final SourceTestStep SOURCE_A =
+SourceTestStep.newBuilder("A")
+.addSchema("a1 int", "a2 bigint", "a3 varchar")
+.producedBeforeRestore(
+Row.of(1, 1L, "Hi"),
+Row.of(2, 2L, "Hello"),
+Row.of(3, 2L, "Hello world"))
+.producedAfterRestore(Row.of(4, 3L, "Hello there"))
+.build();
+
+static final SourceTestStep SOURCE_B =
+SourceTestStep.newBuilder("B")
+.addSchema("b1 int", "b2 bigint", "b3 int", "b4 varchar", 
"b5 bigint")
+.producedBeforeRestore(
+Row.of(1, 1L, 0, "Hallo", 1L),
+Row.of(2, 2L, 1, "Hallo Welt", 2L),
+Row.of(2, 3L, 2, "Hallo Welt wie", 1L),
+Row.of(3, 1L, 2, "Hallo Welt wie gehts", 1L))
+.producedAfterRestore(Row.of(2, 4L, 3, "Hallo Welt wie 
gehts", 4L))
+.build();
+static final SourceTestStep SOURCE_T1 =
+SourceTestStep.newBuilder("T1")
+.addSchema("a int", "b bigint", "c varchar")
+.producedBeforeRestore(
+Row.of(1, 1L, "Hi1"),
+Row.of(1, 2L, "Hi2"),
+Row.of(1, 2L, "Hi2"),
+Row.of(1, 5L, "Hi3"),
+Row.of(2, 7L, "Hi5"),
+Row.of(1, 9L, "Hi6"),
+Row.of(1, 8L, "Hi8"),
+Row.of(3, 8L, "Hi9"))
+.producedAfterRestore(Row.of(1, 1L, "PostRestore"))
+.build();
+static final SourceTestStep SOURCE_T2 =
+SourceTestStep.newBuilder("T2")
+.addSchema("a int", "b bigint", "c varchar")
+.producedBeforeRestore(
+Row.of(1, 1L, "HiHi"), Row.of(2, 2L, "HeHe"), 
Row.of(3, 2L, "HeHe"))
+.producedAfterRestore(Row.of(2, 1L, "PostRestoreRight"))
+.build();
+
+static {
+NON_WINDOW_INNER_JOIN =
+TableTestProgram.of("non-window-inner-join", "test non-window 
inner join")
+.setupTableSource(SOURCE_T1)
+.setupTableSource(SOURCE_T2)
+.setupTableSink(
+SinkTestStep.newBuilder("MySink")
+.addSchema("a int", "c1 varchar", "c2 
varchar")
+.consumedBeforeRestore(
+Row.of(1, "HiHi", "Hi2"),
+Row.of(1, "

Re: [PR] [FLINK-33470] Implement restore tests for Join node [flink]

2023-11-08 Thread via GitHub


jnh5y commented on code in PR #23680:
URL: https://github.com/apache/flink/pull/23680#discussion_r1386985040


##
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/JoinTestPrograms.java:
##
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.testutils;
+
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecJoin;
+import org.apache.flink.table.test.program.SinkTestStep;
+import org.apache.flink.table.test.program.SourceTestStep;
+import org.apache.flink.table.test.program.TableTestProgram;
+import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
+
+/** {@link TableTestProgram} definitions for testing {@link StreamExecJoin}. */
+public class JoinTestPrograms {
+
+static final TableTestProgram NON_WINDOW_INNER_JOIN;
+static final TableTestProgram NON_WINDOW_INNER_JOIN_WITH_NULL;
+static final TableTestProgram JOIN;
+static final TableTestProgram INNER_JOIN;
+static final TableTestProgram JOIN_WITH_FILTER;
+static final TableTestProgram INNER_JOIN_WITH_DUPLICATE_KEY;
+static final TableTestProgram INNER_JOIN_WITH_NON_EQUI_JOIN;
+static final TableTestProgram INNER_JOIN_WITH_EQUAL_PK;
+static final TableTestProgram INNER_JOIN_WITH_PK;
+
+static final SourceTestStep SOURCE_A =
+SourceTestStep.newBuilder("A")
+.addSchema("a1 int", "a2 bigint", "a3 varchar")
+.producedBeforeRestore(
+Row.of(1, 1L, "Hi"),
+Row.of(2, 2L, "Hello"),
+Row.of(3, 2L, "Hello world"))
+.producedAfterRestore(Row.of(4, 3L, "Hello there"))
+.build();
+
+static final SourceTestStep SOURCE_B =
+SourceTestStep.newBuilder("B")
+.addSchema("b1 int", "b2 bigint", "b3 int", "b4 varchar", 
"b5 bigint")
+.producedBeforeRestore(
+Row.of(1, 1L, 0, "Hallo", 1L),
+Row.of(2, 2L, 1, "Hallo Welt", 2L),
+Row.of(2, 3L, 2, "Hallo Welt wie", 1L),
+Row.of(3, 1L, 2, "Hallo Welt wie gehts", 1L))
+.producedAfterRestore(Row.of(2, 4L, 3, "Hallo Welt wie 
gehts", 4L))
+.build();
+static final SourceTestStep SOURCE_T1 =
+SourceTestStep.newBuilder("T1")
+.addSchema("a int", "b bigint", "c varchar")
+.producedBeforeRestore(
+Row.of(1, 1L, "Hi1"),
+Row.of(1, 2L, "Hi2"),
+Row.of(1, 2L, "Hi2"),
+Row.of(1, 5L, "Hi3"),
+Row.of(2, 7L, "Hi5"),
+Row.of(1, 9L, "Hi6"),
+Row.of(1, 8L, "Hi8"),
+Row.of(3, 8L, "Hi9"))
+.producedAfterRestore(Row.of(1, 1L, "PostRestore"))
+.build();
+static final SourceTestStep SOURCE_T2 =
+SourceTestStep.newBuilder("T2")
+.addSchema("a int", "b bigint", "c varchar")
+.producedBeforeRestore(
+Row.of(1, 1L, "HiHi"), Row.of(2, 2L, "HeHe"), 
Row.of(3, 2L, "HeHe"))
+.producedAfterRestore(Row.of(2, 1L, "PostRestoreRight"))
+.build();
+
+static {
+NON_WINDOW_INNER_JOIN =
+TableTestProgram.of("non-window-inner-join", "test non-window 
inner join")
+.setupTableSource(SOURCE_T1)
+.setupTableSource(SOURCE_T2)
+.setupTableSink(
+SinkTestStep.newBuilder("MySink")
+.addSchema("a int", "c1 varchar", "c2 
varchar")
+.consumedBeforeRestore(
+Row.of(1, "HiHi", "Hi2"),
+Row.of(1, "

Re: [PR] [FLINK-33469] Implement restore tests for Limit node [flink]

2023-11-08 Thread via GitHub


jnh5y commented on PR #23675:
URL: https://github.com/apache/flink/pull/23675#issuecomment-1802331893

   Responded to comments and moved the new tests to the package 
`org.apache.flink.table.planner.plan.nodes.exec.stream` (following the 
discussion over on https://github.com/apache/flink/pull/23660).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33455] Implement restore tests for SortLimit node [flink]

2023-11-08 Thread via GitHub


jnh5y commented on PR #23660:
URL: https://github.com/apache/flink/pull/23660#issuecomment-1802323981

   > > Can we reuse org.apache.flink.table.planner.plan.nodes.exec.stream since 
we are removing tests from that package?
   > 
   > Yes, let's do that
   
   Should the Programs and Tests go into separate packages?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-33486) Pulsar Client Send Timeout Terminates TaskManager

2023-11-08 Thread Jason Kania (Jira)
Jason Kania created FLINK-33486:
---

 Summary: Pulsar Client Send Timeout Terminates TaskManager
 Key: FLINK-33486
 URL: https://issues.apache.org/jira/browse/FLINK-33486
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Pulsar
Affects Versions: 1.17.1
Reporter: Jason Kania


Currently, when the Pulsar producer encounters a timeout while attempting to 
send data, it generates an unhandled TimeoutException. This is not a reasonable 
way to handle the timeout. The situation should be handled gracefully, either 
through additional parameters that put the reaction under the user's control, 
or through some callback mechanism that the user can hook handling code into. 
Unfortunately, right now, this causes a termination of the task manager, which 
then leads to other issues.

Increasing the timeout period merely makes the issue less likely; it does not 
provide proper handling in the event that the situation does occur.
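
For comparison, the plain Pulsar client already exposes both a configurable send 
timeout and a future-based callback where a timeout can be handled without killing 
the process. A minimal sketch of the kind of user-controlled handling being asked 
for (the service URL and topic are placeholders; this is not the Flink sink's 
actual code path):
{code:java}
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;

import java.util.concurrent.TimeUnit;

// Sketch only: the Pulsar client's own timeout/callback handling that a
// graceful sink integration could build on (URL and topic are placeholders).
public class GracefulSendSketch {
    public static void main(String[] args) throws PulsarClientException {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")
                .build();
        Producer<String> producer = client.newProducer(Schema.STRING)
                .topic("persistent://public/default/myproducer")
                .sendTimeout(10, TimeUnit.SECONDS) // instead of the default 30s
                .create();

        producer.sendAsync("payload").exceptionally(err -> {
            // User-controlled reaction: log / retry / dead-letter instead of
            // letting the TimeoutException terminate the TaskManager.
            System.err.println("send failed: " + err);
            return null;
        });

        producer.flush();
        producer.close();
        client.close();
    }
}
{code}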

The exception is as follows:

org.apache.flink.util.FlinkRuntimeException: Failed to send data to Pulsar: 
persistent://public/default/myproducer-partition-0
        at 
org.apache.flink.connector.pulsar.sink.writer.PulsarWriter.throwSendingException(PulsarWriter.java:182)
 ~[flink-connector-pulsar-4.0.0-1.17.jar:4.0.0-1.17]
        at 
org.apache.flink.connector.pulsar.sink.writer.PulsarWriter.lambda$write$0(PulsarWriter.java:172)
 ~[flink-connector-pulsar-4.0.0-1.17.jar:4.0.0-1.17]
        at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
 ~[flink-dist-1.17.1.jar:1.17.1]
        at 
org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) 
~[flink-dist-1.17.1.jar:1.17.1]
        at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMail(MailboxProcessor.java:398)
 ~[flink-dist-1.17.1.jar:1.17.1]
        at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:367)
 ~[flink-dist-1.17.1.jar:1.17.1]
        at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:352)
 ~[flink-dist-1.17.1.jar:1.17.1]
        at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:229)
 ~[flink-dist-1.17.1.jar:1.17.1]
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:839)
 ~[flink-dist-1.17.1.jar:1.17.1]
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:788) 
~[flink-dist-1.17.1.jar:1.17.1]
        at 
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
 ~[flink-dist-1.17.1.jar:1.17.1]
        at 
org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931) 
[flink-dist-1.17.1.jar:1.17.1]
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745) 
[flink-dist-1.17.1.jar:1.17.1]
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) 
[flink-dist-1.17.1.jar:1.17.1]
        at java.lang.Thread.run(Thread.java:829) [?:?]
Caused by: org.apache.pulsar.client.api.PulsarClientException$TimeoutException: 
The producer myproducer- f4b1580b-1ea8-4c21-9d0b-da4d12ca6f93 can not send 
message to the topic persistent://public/default/myproducer-partition-0 within 
given timeout
        at 
org.apache.pulsar.client.impl.ProducerImpl.run(ProducerImpl.java:1993) 
~[pulsar-client-all-2.11.2.jar:2.11.2]
        at 
org.apache.pulsar.shade.io.netty.util.HashedWheelTimer$HashedWheelTimeout.run(HashedWheelTimer.java:715)
 ~[pulsar-client-all-2.11.2.jar:2.11.2]
        at 
org.apache.pulsar.shade.io.netty.util.concurrent.ImmediateExecutor.execute(ImmediateExecutor.java:34)
 ~[pulsar-client-all-2.11.2.jar:2.11.2]
        at 
org.apache.pulsar.shade.io.netty.util.HashedWheelTimer$HashedWheelTimeout.expire(HashedWheelTimer.java:703)
 ~[pulsar-client-all-2.11.2.jar:2.11.2]
        at 
org.apache.pulsar.shade.io.netty.util.HashedWheelTimer$HashedWheelBucket.expireTimeouts(HashedWheelTimer.java:790)
 ~[pulsar-client-all-2.11.2.jar:2.11.2]
        at 
org.apache.pulsar.shade.io.netty.util.HashedWheelTimer$Worker.run(HashedWheelTimer.java:503)
 ~[pulsar-client-all-2.11.2.jar:2.11.2]
        at 
org.apache.pulsar.shade.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
 ~[pulsar-client-all-2.11.2.jar:2.11.2]
        ... 1 more

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-33295) Separate SinkV2 and SinkV1Adapter tests

2023-11-08 Thread Jira


 [ 
https://issues.apache.org/jira/browse/FLINK-33295?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Márton Balassi closed FLINK-33295.
--
Resolution: Implemented

[{{92951a0}}|https://github.com/apache/flink/commit/92951a05127f1e0e2ab0ea04ae022659fc5276ab]
 in master

> Separate SinkV2 and SinkV1Adapter tests
> ---
>
> Key: FLINK-33295
> URL: https://issues.apache.org/jira/browse/FLINK-33295
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / DataStream, Connectors / Common
>Reporter: Peter Vary
>Assignee: Peter Vary
>Priority: Major
>  Labels: pull-request-available
>
> Current SinkV2 tests are based on the sink generated by the 
> _org.apache.flink.streaming.runtime.operators.sink.TestSink_ test class. This 
> test class does not generate the SinkV2 directly, but generates a SinkV1 and 
> wraps it with an 
> _org.apache.flink.streaming.api.transformations.SinkV1Adapter._ This 
> tests the SinkV2, but only insofar as it is aligned with SinkV1 and the 
> SinkV1Adapter.
> We should have tests where we create a SinkV2 directly and the functionality 
> is tested without the adapter.
>  
>  
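
A direct SinkV2 implementation is small; for illustration, a minimal adapter-free 
sink of the kind such tests could instantiate (a sketch, not the test code added 
by this issue):
{code:java}
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;

import java.io.IOException;

// Sketch: a trivial SinkV2 implemented directly, with no SinkV1 or
// SinkV1Adapter involved (illustrative, not the issue's actual test code).
public class PrintSinkV2 implements Sink<String> {
    @Override
    public SinkWriter<String> createWriter(InitContext context) throws IOException {
        return new SinkWriter<String>() {
            @Override
            public void write(String element, Context context) {
                System.out.println(element); // stands in for real I/O
            }

            @Override
            public void flush(boolean endOfInput) {}

            @Override
            public void close() {}
        };
    }
}
{code}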



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33295) Separate SinkV2 and SinkV1Adapter tests

2023-11-08 Thread Jira


 [ 
https://issues.apache.org/jira/browse/FLINK-33295?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Márton Balassi updated FLINK-33295:
---
Fix Version/s: 1.19.0

> Separate SinkV2 and SinkV1Adapter tests
> ---
>
> Key: FLINK-33295
> URL: https://issues.apache.org/jira/browse/FLINK-33295
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / DataStream, Connectors / Common
>Reporter: Peter Vary
>Assignee: Peter Vary
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.19.0
>
>
> Current SinkV2 tests are based on the sink generated by the 
> _org.apache.flink.streaming.runtime.operators.sink.TestSink_ test class. This 
> test class does not generate the SinkV2 directly, but generates a SinkV1 and 
> wraps it with an 
> _org.apache.flink.streaming.api.transformations.SinkV1Adapter._ This 
> tests the SinkV2, but only insofar as it is aligned with SinkV1 and the 
> SinkV1Adapter.
> We should have tests where we create a SinkV2 directly and the functionality 
> is tested without the adapter.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-33295] Separate SinkV2 and SinkV1Adapter tests [flink]

2023-11-08 Thread via GitHub


mbalassi merged PR #23541:
URL: https://github.com/apache/flink/pull/23541


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33059] Support transparent compression for file-connector for all file input formats [flink]

2023-11-08 Thread via GitHub


echauchot commented on PR #23443:
URL: https://github.com/apache/flink/pull/23443#issuecomment-1802266251

   @ferenc-csaky thanks for reviewing this PR! I have addressed your comments; 
do I have your LGTM once the tests pass?
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33059] Support transparent compression for file-connector for all file input formats [flink]

2023-11-08 Thread via GitHub


echauchot commented on PR #23443:
URL: https://github.com/apache/flink/pull/23443#issuecomment-1802261114

   > The added logic makes sense IMO.
   > 
   > Added 2 comments, but in general I would consider separating the whole 
   > `INFLATER_INPUT_STREAM_FACTORIES` into a new class, as there are a couple of 
   > functions that use it and it seems quite detachable from `FileInputFormat`.
   > 
   > After a quick peek I think something like the following could work:
   > 
   > ```java
   > public class InflaterInputStreamFactories {
   > 
   >   public static void register(String fileExt, InflaterInputStreamFactory<?> factory) { ... }
   > 
   >   public static InflaterInputStreamFactory<?> get(Path path) { ... }
   > 
   >   private static InflaterInputStreamFactory<?> get(String fileExt) { ... }
   > 
   >   @VisibleForTesting
   >   public static Set<String> getSupportedCompressionFormats() { ... }
   > }
   > ```
   > 
   > Also, `ConcurrentHashMap` can be utilized instead of the `synchronized` 
   > block, but other than that the current logic could be moved over as is.
   > 
   > This probably goes beyond the current PR, but I think it is worth noting. 
   > WDYT?
   
   I agree with the ConcurrentHashMap suggestion. Regarding creating a class 
just for wrapping a map that is used only in FileInputFormat, it seems 
overkill to me. And anyway, it is indeed outside the scope of this 
filesize-fix PR.
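
   For what it's worth, the ConcurrentHashMap variant would be a small swap; a 
rough sketch under assumed names (illustrative, not the actual `FileInputFormat` 
internals):

```java
import org.apache.flink.api.common.io.compression.InflaterInputStreamFactory;

import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Rough sketch of the ConcurrentHashMap suggestion above (illustrative names,
// not the actual FileInputFormat code).
final class CompressionRegistry {
    private static final Map<String, InflaterInputStreamFactory<?>> FACTORIES =
            new ConcurrentHashMap<>();

    // Thread-safe without a synchronized block.
    static void register(String fileExtension, InflaterInputStreamFactory<?> factory) {
        FACTORIES.put(fileExtension, factory);
    }

    static InflaterInputStreamFactory<?> forExtension(String fileExtension) {
        return FACTORIES.get(fileExtension);
    }

    static Set<String> supportedFormats() {
        return FACTORIES.keySet();
    }
}
```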


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-26203] Add docs for the table connector [flink-connector-pulsar]

2023-11-08 Thread via GitHub


tisonkun merged PR #63:
URL: https://github.com/apache/flink-connector-pulsar/pull/63


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33059] Support transparent compression for file-connector for all file input formats [flink]

2023-11-08 Thread via GitHub


echauchot commented on code in PR #23443:
URL: https://github.com/apache/flink/pull/23443#discussion_r1386887535


##
flink-core/src/test/java/org/apache/flink/testutils/TestFileUtils.java:
##
@@ -136,6 +138,26 @@ public static String createTempFileDirExtension(
 return f.toURI().toString();
 }
 
+public static String createTempTextFileDirForAllCompressionFormats(File 
tempDir)

Review Comment:
   Agree, better to be more generic here, thx!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33059] Support transparent compression for file-connector for all file input formats [flink]

2023-11-08 Thread via GitHub


echauchot commented on code in PR #23443:
URL: https://github.com/apache/flink/pull/23443#discussion_r1386874406


##
flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java:
##
@@ -157,6 +157,10 @@ protected static InflaterInputStreamFactory<?> getInflaterInputStreamFactory(
 }
 }
 
+public static Set<String> getSupportedCompressionFormats() {

Review Comment:
   :+1: thanks for pointing out



##
flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java:
##
@@ -157,6 +157,10 @@ protected static InflaterInputStreamFactory<?> getInflaterInputStreamFactory(
 }
 }
 
+public static Set<String> getSupportedCompressionFormats() {

Review Comment:
   :+1: thanks for pointing out



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33059] Support transparent compression for file-connector for all file input formats [flink]

2023-11-08 Thread via GitHub


echauchot commented on code in PR #23443:
URL: https://github.com/apache/flink/pull/23443#discussion_r1386875062


##
flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java:
##
@@ -157,6 +157,10 @@ protected static InflaterInputStreamFactory<?> getInflaterInputStreamFactory(
 }
 }
 
+public static Set<String> getSupportedCompressionFormats() {

Review Comment:
   :+1: thx for pointing out



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33059] Support transparent compression for file-connector for all file input formats [flink]

2023-11-08 Thread via GitHub


ferenc-csaky commented on code in PR #23443:
URL: https://github.com/apache/flink/pull/23443#discussion_r1386659760


##
flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java:
##
@@ -157,6 +157,10 @@ protected static InflaterInputStreamFactory<?> getInflaterInputStreamFactory(
 }
 }
 
+public static Set<String> getSupportedCompressionFormats() {

Review Comment:
   I'd mark this `@VisibleForTesting`, because only tests use this function.



##
flink-core/src/test/java/org/apache/flink/testutils/TestFileUtils.java:
##
@@ -136,6 +138,26 @@ public static String createTempFileDirExtension(
 return f.toURI().toString();
 }
 
+public static String createTempTextFileDirForAllCompressionFormats(File 
tempDir)

Review Comment:
   I think instead of bringing the specific `FileInputFormat` into this general 
utility, it would be cleaner to pass `Set<String> extensions` as a parameter.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33455] Implement restore tests for SortLimit node [flink]

2023-11-08 Thread via GitHub


dawidwys commented on PR #23660:
URL: https://github.com/apache/flink/pull/23660#issuecomment-1801918003

   > Can we reuse org.apache.flink.table.planner.plan.nodes.exec.stream since 
we are removing tests from that package?
   
   Yes, let's do that


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33470] Implement restore tests for Join node [flink]

2023-11-08 Thread via GitHub


dawidwys commented on code in PR #23680:
URL: https://github.com/apache/flink/pull/23680#discussion_r1386592611


##
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/JoinJsonPlanTest.java:
##
@@ -1,144 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.planner.plan.nodes.exec.stream;
-
-import org.apache.flink.table.api.TableConfig;
-import org.apache.flink.table.api.TableEnvironment;
-import org.apache.flink.table.planner.utils.StreamTableTestUtil;
-import org.apache.flink.table.planner.utils.TableTestBase;
-
-import org.junit.Before;
-import org.junit.Test;
-
-/** Test json serialization/deserialization for join. */
-public class JoinJsonPlanTest extends TableTestBase {
-
-private StreamTableTestUtil util;
-private TableEnvironment tEnv;
-
-@Before
-public void setup() {

Review Comment:
   Can we remove the json plan files as well?



##
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/JoinTestPrograms.java:
##
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.testutils;
+
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecJoin;
+import org.apache.flink.table.test.program.SinkTestStep;
+import org.apache.flink.table.test.program.SourceTestStep;
+import org.apache.flink.table.test.program.TableTestProgram;
+import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
+
+/** {@link TableTestProgram} definitions for testing {@link StreamExecJoin}. */
+public class JoinTestPrograms {
+
+static final TableTestProgram NON_WINDOW_INNER_JOIN;
+static final TableTestProgram NON_WINDOW_INNER_JOIN_WITH_NULL;
+static final TableTestProgram JOIN;
+static final TableTestProgram INNER_JOIN;
+static final TableTestProgram JOIN_WITH_FILTER;
+static final TableTestProgram INNER_JOIN_WITH_DUPLICATE_KEY;
+static final TableTestProgram INNER_JOIN_WITH_NON_EQUI_JOIN;
+static final TableTestProgram INNER_JOIN_WITH_EQUAL_PK;
+static final TableTestProgram INNER_JOIN_WITH_PK;
+
+static final SourceTestStep SOURCE_A =
+SourceTestStep.newBuilder("A")
+.addSchema("a1 int", "a2 bigint", "a3 varchar")
+.producedBeforeRestore(
+Row.of(1, 1L, "Hi"),
+Row.of(2, 2L, "Hello"),
+Row.of(3, 2L, "Hello world"))
+.producedAfterRestore(Row.of(4, 3L, "Hello there"))
+.build();
+
+static final SourceTestStep SOURCE_B =
+SourceTestStep.newBuilder("B")
+.addSchema("b1 int", "b2 bigint", "b3 int", "b4 varchar", 
"b5 bigint")
+.producedBeforeRestore(
+Row.of(1, 1L, 0, "Hallo", 1L),
+Row.of(2, 2L, 1, "Hallo Welt", 2L),
+Row.of(2, 3L, 2, "Hallo Welt wie", 1L),
+Row.of(3, 1L, 2, "Hallo Welt wie gehts", 1L))
+.producedAfterRestore(Row.of(2, 4L, 3, "Hallo Welt wie 
gehts", 4L))
+.build();
+static final SourceTestStep SOURCE_T1 =
+SourceTestStep.newBuilder("T1")
+.addSchema("a int", "b b

Re: [PR] [FLINK-33485][table] Optimize exists subqueries by looking at metadata rowcount [flink]

2023-11-08 Thread via GitHub


flinkbot commented on PR #23685:
URL: https://github.com/apache/flink/pull/23685#issuecomment-1801904915

   
   ## CI report:
   
   * 2bcb47ade877cb2fba533001c3ce7c9d3788c9dc UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (FLINK-33483) Why is “UNDEFINED” defined in the Flink task status?

2023-11-08 Thread Xin Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17784007#comment-17784007
 ] 

Xin Chen edited comment on FLINK-33483 at 11/8/23 1:33 PM:
---

But in another scenario in production practice, UNDEFINED also appears. The jm 
log can be found in the attached file 
[^container_e15_1693914709123_8498_01_01_8042], but I have not fully 
reproduced this scenario. Based on the key information in the log:

{code:java}
15:00:57.657 State change: SUSPENDED
Connection to ZooKeeper suspended, waiting for reconnection.

15:00:54.754  org.apache.flink.util.FlinkException: ResourceManager 
leader changed to new address null
15:00:54.759  Job DataDistribution$ (281592085ed7f391ab59b83a53c40db3) 
switched from state RUNNING to RESTARTING.
15:00:54.771  Job DataDistribution$ (281592085ed7f391ab59b83a53c40db3) 
switched from state RESTARTING to SUSPENDED.
org.apache.flink.util.FlinkException: JobManager is no longer the leader.
Unable to canonicalize address zookeeper:2181 because it's not resolvable.

15:00:55.694   closing socket connection and attempting reconnect
15:00:57.657   State change: RECONNECTED
15:00:57.739  Connection to ZooKeeper was reconnected. Leader retrieval can be 
restarted.  
15:00:57.740   Connection to ZooKeeper was reconnected. Leader election can be 
restarted.  

15:00:57.741 Job 281592085ed7f391ab59b83a53c40db3 was not finished by 
JobManager.
15:00:57.742 Shutting down cluster because job not finished
15:00:57.742 Shutting YarnJobClusterEntrypoint down with application status 
UNKNOWN. Diagnostics null. 
15:00:57.767 Unregister application from the YARN Resource Manager with final 
status UNDEFINED.
{code}

From the logs, it can be seen that there was a disconnection of zk for a few 
seconds. During the disconnection period, rm (resourcemanager) was affected and 
the Flink task was suspended, attempting to reconnect to zk. The most important 
thing is that after zk reconnects, for some unknown reason, jm directly 
determines that the task is in an UNKNOWN state: *Shutting 
YarnJobClusterEntrypoint down with application status UNKNOWN. Diagnostics 
null*. During this process, jm did not restart, which is different from 
FLINK-12302.

So how exactly is “UNKNOWN” determined here? Is it also determined after 
reading from the RunningJobRegistry in zk? I have also tried many times and 
did not reproduce this scenario. The reproduction log is attached: 
[^reproduce.log]. I think the reason it is hard to reproduce is: when I 
disconnect all zk nodes, jm quickly goes down and restarts, and the log shows 
an error.

{code:java}
org.apache.flink.runtime.rpc.exceptions.FencingTokenException: Fencing token 
not set: Ignoring message LocalFencedMessage(null, 
LocalRpcInvocation(deregisterApplication(ApplicationStatus, String))) sent to 
akka.tcp://flink@192.168.22.121:42347/user/rpc/resourcemanager_0 because the 
fencing token is null.
at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:67)
 ~
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
 ~
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) ~
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) ~
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) ~
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) ~
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) ~
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) ~
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) ~
at akka.actor.Actor$class.aroundReceive(Actor.scala:517) ~
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) ~
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) ~
at akka.actor.ActorCell.invoke(ActorCell.scala:561) ~
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) ~
at akka.dispatch.Mailbox.run(Mailbox.scala:225) ~
at akka.dispatch.Mailbox.exec(Mailbox.scala:235) ~
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) ~
at 
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) ~
at 
akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) ~
at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) ~
{code}
Here, I speculate that the zk disconnection may have affected rm's leadership, 
leading to issues when jm deregisters from rm with an ApplicationStatus of 
UNDEFINED. However, in the actual production scenario, jm did not fail and 
continued execution.
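
For context on where UNDEFINED comes from: Flink's internal ApplicationStatus has 
no YARN counterpart for UNKNOWN, so deregistration reports 
FinalApplicationStatus.UNDEFINED. A sketch of that mapping as understood from the 
logs above (illustrative, not the exact Flink source):
{code:java}
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;

// Sketch of the status translation (illustrative, not the exact Flink source):
// UNKNOWN has no YARN counterpart, so it surfaces as UNDEFINED on the YARN UI.
class YarnStatusMappingSketch {
    static FinalApplicationStatus toYarnStatus(ApplicationStatus status) {
        switch (status) {
            case SUCCEEDED:
                return FinalApplicationStatus.SUCCEEDED;
            case FAILED:
                return FinalApplicationStatus.FAILED;
            case CANCELED:
                return FinalApplicationStatus.KILLED;
            default: // UNKNOWN lands here
                return FinalApplicationStatus.UNDEFINED;
        }
    }
}
{code}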


was (Author: JIRAUSER298666):
But in another scenario in production practice, UNDEFINED als

[jira] [Updated] (FLINK-33483) Why is “UNDEFINED” defined in the Flink task status?

2023-11-08 Thread Xin Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33483?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xin Chen updated FLINK-33483:
-
Attachment: reproduce.log

> Why is “UNDEFINED” defined in the Flink task status?
> 
>
> Key: FLINK-33483
> URL: https://issues.apache.org/jira/browse/FLINK-33483
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / RPC, Runtime / Task
>Affects Versions: 1.12.2
>Reporter: Xin Chen
>Priority: Major
> Attachments: container_e15_1693914709123_8498_01_01_8042, 
> reproduce.log
>
>
> In the Flink on Yarn mode, if an unknown status appears in the Flink log, 
> jm(jobmanager) will report the task status as undefined. The Yarn page will 
> display the state as FINISHED, but the final status is *UNDEFINED*. In terms 
> of business, it is unknown whether the task has failed or succeeded, and 
> whether to retry. It has a certain impact. Why should we design UNDEFINED? 
> Usually, this situation occurs due to zk(zookeeper) disconnection or jm 
> abnormality, etc. Since the abnormality is present, why not use FAILED?
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [FLINK-33485][table] Optimize exists subqueries by looking at metadata rowcount [flink]

2023-11-08 Thread via GitHub


snuyanzin opened a new pull request, #23685:
URL: https://github.com/apache/flink/pull/23685

   ## What is the purpose of the change
   
   The idea is to look at the metadata rowcount for `EXISTS` subqueries and, 
based on this, decide whether the subquery can be optimized to `TRUE`/`FALSE`.
   
   ## Brief change log
   
   FlinkSubQueryRemoveRule.java
   Adapted tests, since the optimization changed execution plans
   
   ## Verifying this change
   
   This change is already covered by existing tests, such as 
   CalcPruneAggregateCallRuleTest
   ProjectPruneAggregateCallRuleTest
   FlinkAggregateRemoveRuleTest
   FlinkLimit0RemoveRuleTest
   ProjectPruneAggregateCallRuleTest
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): ( no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no )
 - The runtime per-record code paths (performance sensitive): (no )
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no )
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-33485) Optimize the EXISTS sub-query by Metadata RowCount

2023-11-08 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33485?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-33485:
---
Labels: pull-request-available  (was: )

> Optimize the EXISTS sub-query by Metadata RowCount
> --
>
> Key: FLINK-33485
> URL: https://issues.apache.org/jira/browse/FLINK-33485
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API
>Affects Versions: 1.18.0
>Reporter: Sergey Nuyanzin
>Assignee: Sergey Nuyanzin
>Priority: Major
>  Labels: pull-request-available
>
> If the sub-query is guaranteed to produce at least one row, just return TRUE. 
> If the sub-query is guaranteed to produce no rows, just return FALSE.
> Inspired by CALCITE-5117; however, since there is {{FlinkSubQueryRemoveRule}}, 
> it should be adapted accordingly.
> Examples:
> {code:sql}
> SELECT * FROM T2 WHERE EXISTS (SELECT SUM(a1), COUNT(*) FROM T1 WHERE 1=2)
> {code}
> Aggregation functions always return one row even for an empty table, so we 
> could just replace this query with 
> {code:sql}
> SELECT * FROM T2 
> {code}
> Another example:
> {code:sql}
> SELECT * FROM MyTable WHERE NOT EXISTS (SELECT a FROM MyTable LIMIT 0)
> {code}
> {{LIMIT 0}} means no rows, so it could be optimized to
> {code:sql}
> SELECT * FROM MyTable
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-33485) Optimize the EXISTS sub-query by Metadata RowCount

2023-11-08 Thread Sergey Nuyanzin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33485?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergey Nuyanzin reassigned FLINK-33485:
---

Assignee: Sergey Nuyanzin

> Optimize the EXISTS sub-query by Metadata RowCount
> --
>
> Key: FLINK-33485
> URL: https://issues.apache.org/jira/browse/FLINK-33485
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API
>Affects Versions: 1.18.0
>Reporter: Sergey Nuyanzin
>Assignee: Sergey Nuyanzin
>Priority: Major
>
> If the sub-query is guaranteed to produce at least one row, just return TRUE. 
> If the sub-query is guaranteed to produce no rows, just return FALSE.
> Inspired by CALCITE-5117; however, since there is {{FlinkSubQueryRemoveRule}}, 
> it should be adapted accordingly.
> Examples:
> {code:sql}
> SELECT * FROM T2 WHERE EXISTS (SELECT SUM(a1), COUNT(*) FROM T1 WHERE 1=2)
> {code}
> Aggregation functions always return one row even for an empty table, so we 
> could just replace this query with 
> {code:sql}
> SELECT * FROM T2 
> {code}
> Another example:
> {code:sql}
> SELECT * FROM MyTable WHERE NOT EXISTS (SELECT a FROM MyTable LIMIT 0)
> {code}
> {{LIMIT 0}} means no rows, so it could be optimized to
> {code:sql}
> SELECT * FROM MyTable
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33485) Optimize the EXISTS sub-query by Metadata RowCount

2023-11-08 Thread Sergey Nuyanzin (Jira)
Sergey Nuyanzin created FLINK-33485:
---

 Summary: Optimize the EXISTS sub-query by Metadata RowCount
 Key: FLINK-33485
 URL: https://issues.apache.org/jira/browse/FLINK-33485
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / API
Affects Versions: 1.18.0
Reporter: Sergey Nuyanzin


If the sub-query is guaranteed to produce at least one row, just return TRUE. 
If the sub-query is guaranteed to produce no rows, just return FALSE.

Inspired by CALCITE-5117; however, since there is {{FlinkSubQueryRemoveRule}}, 
it should be adapted accordingly.

Examples:
{code:sql}
SELECT * FROM T2 WHERE EXISTS (SELECT SUM(a1), COUNT(*) FROM T1 WHERE 1=2)
{code}

Aggregation functions always return one row even for an empty table, so we 
could just replace this query with 
{code:sql}
SELECT * FROM T2 
{code}

Another example:
{code:sql}
SELECT * FROM MyTable WHERE NOT EXISTS (SELECT a FROM MyTable LIMIT 0)
{code}

{{LIMIT 0}} means no rows, so it could be optimized to

{code:sql}
SELECT * FROM MyTable
{code}
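
The decision can be phrased directly against Calcite's metadata query; a minimal 
sketch of the intended check (illustrative names, not the actual 
FlinkSubQueryRemoveRule code):
{code:java}
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.metadata.RelMetadataQuery;

// Sketch of the rowcount-based decision (illustrative, not the actual rule):
// a null result means the metadata is inconclusive and the sub-query is kept.
class ExistsRowCountSketch {
    static Boolean tryDecideExists(RelNode subQuery, RelMetadataQuery mq) {
        Double minRowCount = mq.getMinRowCount(subQuery);
        if (minRowCount != null && minRowCount >= 1.0D) {
            return Boolean.TRUE; // guaranteed at least one row -> EXISTS is TRUE
        }
        Double maxRowCount = mq.getMaxRowCount(subQuery);
        if (maxRowCount != null && maxRowCount <= 0.0D) {
            return Boolean.FALSE; // guaranteed no rows -> EXISTS is FALSE
        }
        return null;
    }
}
{code}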





--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-33483) Why is “UNDEFINED” defined in the Flink task status?

2023-11-08 Thread Xin Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17784027#comment-17784027
 ] 

Xin Chen edited comment on FLINK-33483 at 11/8/23 12:50 PM:


The biggest confusion is why Flink needs to define UNDEFINED. From the 
perspective of these scenarios, UNDEFINED is always due to exceptions (zk or jm 
exceptions, after all). Why can't we define it as failed? Defining FAILED allows 
us to detect and retry tasks, but UNDEFINED here has no meaning at all. Is there 
a better solution to this problem in subsequent versions of Flink, or a way to 
better reproduce this scenario? My solution is the same as FLINK-12302: hoping 
to report a FAILED finalStatus to the resourcemanager in this case, giving the 
user the clearest possible signal. Even in this case, the task may have 
actually run successfully in TM (taskmanager), but after all, an exception (zk 
disconnection) has occurred. Anyway, executing a task and ultimately giving the 
user an UNDEFINED state can be very confusing.


was (Author: JIRAUSER298666):
The biggest confusion is why Flink needs to define UNDEFINED. From the 
perspective of these scenarios, UNDEFINED is always due to exceptions (zk or jm 
exceptions, after all). Why can't we define it as failed? Defining FAILED allows 
us to detect and retry tasks, but UNDEFINED here has no meaning at all. Is there 
a better solution to this problem in subsequent versions of Flink, or a way to 
better reproduce this scenario? My solution is the same as FLINK-12302: hoping 
to report a FAILED finalStatus to the resourcemanager in this case, giving the 
user the clearest possible signal. Even in this case, the task may have 
actually run successfully in TM (taskmanager), but after all, an exception (zk 
disconnection) has occurred. Anyway, executing a task and ultimately giving the 
user an UNDEFINED state can be confusing.

> Why is “UNDEFINED” defined in the Flink task status?
> 
>
> Key: FLINK-33483
> URL: https://issues.apache.org/jira/browse/FLINK-33483
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / RPC, Runtime / Task
>Affects Versions: 1.12.2
>Reporter: Xin Chen
>Priority: Major
> Attachments: container_e15_1693914709123_8498_01_01_8042
>
>
> In the Flink on Yarn mode, if an unknown status appears in the Flink log, 
> jm(jobmanager) will report the task status as undefined. The Yarn page will 
> display the state as FINISHED, but the final status is *UNDEFINED*. In terms 
> of business, it is unknown whether the task has failed or succeeded, and 
> whether to retry. It has a certain impact. Why should we design UNDEFINED? 
> Usually, this situation occurs due to zk(zookeeper) disconnection or jm 
> abnormality, etc. Since the abnormality is present, why not use FAILED?
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-20672) notifyCheckpointAborted RPC failure can fail JM

2023-11-08 Thread Yun Tang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20672?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17784035#comment-17784035
 ] 

Yun Tang commented on FLINK-20672:
--

[~Zakelly] Thanks for the information. If so, I have another question: do we 
really need the {{io-executor}} to work with {{FatalExitExceptionHandler}}? 
From my point of view, if we fail to delete a savepoint correctly (as deletion 
is also executed on the {{io-executor}}), do we really need to fail the whole 
JobManager?

If the correct behavior of the {{io-executor}}'s exception handler is not to 
exit fatally, I think we should correct that behavior first.
[~Zakelly], [~roman], [~srichter] WDYT?

> notifyCheckpointAborted RPC failure can fail JM
> ---
>
> Key: FLINK-20672
> URL: https://issues.apache.org/jira/browse/FLINK-20672
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.11.3, 1.12.0
>Reporter: Roman Khachatryan
>Assignee: Zakelly Lan
>Priority: Not a Priority
>  Labels: auto-deprioritized-major, auto-deprioritized-minor, 
> pull-request-available
>
> Introduced in FLINK-8871, aborted RPC notifications are done asynchronously:
>  
> {code}
>   private void sendAbortedMessages(long checkpointId, long timeStamp) {
>   // send notification of aborted checkpoints asynchronously.
>   executor.execute(() -> {
>   // send the "abort checkpoint" messages to necessary 
> vertices.
> // ..
>   });
>   }
> {code}
> However, the executor that eventually executes this request is created as 
> follows
> {code}
>   final ScheduledExecutorService futureExecutor = 
> Executors.newScheduledThreadPool(
>   Hardware.getNumberCPUCores(),
>   new ExecutorThreadFactory("jobmanager-future"));
> {code}
> ExecutorThreadFactory uses UncaughtExceptionHandler that exits JVM on error.
> cc: [~yunta]
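
The failure mode is easy to reproduce in isolation. A self-contained sketch (the 
handler below imitates, but is not, Flink's FatalExitExceptionHandler; a plain 
fixed pool is used so the exception demonstrably reaches the handler):
{code:java}
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Self-contained sketch: any uncaught throwable from execute() kills the
// worker thread, invokes the thread's uncaught-exception handler, and the
// exit-on-error handler then takes the whole JVM down.
public class FatalHandlerDemo {
    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(1, runnable -> {
            Thread t = new Thread(runnable, "jobmanager-future-demo");
            t.setUncaughtExceptionHandler((thread, error) -> {
                System.err.println("FATAL in " + thread.getName() + ": " + error);
                Runtime.getRuntime().halt(-17); // process exits here
            });
            return t;
        });
        pool.execute(() -> {
            throw new RuntimeException("simulated notifyCheckpointAborted RPC failure");
        });
    }
}
{code}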



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-33483) Why is “UNDEFINED” defined in the Flink task status?

2023-11-08 Thread Xin Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17784007#comment-17784007
 ] 

Xin Chen edited comment on FLINK-33483 at 11/8/23 12:48 PM:


But in another scenario in production practice, UNDEFINED also appears. The jm 
log can be found in the attached file 
[^container_e15_1693914709123_8498_01_01_8042], but I have not fully 
reproduced this scenario. Based on the key information in the log:

{code:java}
15:00:57.657 State change: SUSPENDED
Connection to ZooKeeper suspended, waiting for reconnection.

15:00:54.754  org.apache.flink.util.FlinkException: ResourceManager 
leader changed to new address null
15:00:54.759  Job DataDistribution$ (281592085ed7f391ab59b83a53c40db3) 
switched from state RUNNING to RESTARTING.
15:00:54.771  Job DataDistribution$ (281592085ed7f391ab59b83a53c40db3) 
switched from state RESTARTING to SUSPENDED.
org.apache.flink.util.FlinkException: JobManager is no longer the leader.
Unable to canonicalize address zookeeper:2181 because it's not resolvable.

15:00:55.694   closing socket connection and attempting reconnect
15:00:57.657   State change: RECONNECTED
15:00:57.739  Connection to ZooKeeper was reconnected. Leader retrieval can be 
restarted.  
15:00:57.740   Connection to ZooKeeper was reconnected. Leader election can be 
restarted.  

15:00:57.741 Job 281592085ed7f391ab59b83a53c40db3 was not finished by 
JobManager.
15:00:57.742 Shutting down cluster because job not finished
15:00:57.742 Shutting YarnJobClusterEntrypoint down with application status 
UNKNOWN. Diagnostics null. 
15:00:57.767 Unregister application from the YARN Resource Manager with final 
status UNDEFINED.
{code}

From the logs, it can be seen that there was a disconnection of zk for a few 
seconds. During the disconnection period, rm (resourcemanager) was affected and 
the Flink task was suspended, attempting to reconnect to zk. The most important 
thing is that after zk reconnects, for some unknown reason, jm directly 
determines that the task is in an UNKNOWN state: *Shutting 
YarnJobClusterEntrypoint down with application status UNKNOWN. Diagnostics 
null*. During this process, jm did not restart, which is different from 
FLINK-12302.

So how exactly is “UNKNOWN” determined here? Is it also determined after 
reading from the RunningJobRegistry in zk? I have also tried many times and 
did not reproduce this scenario. The reproduction log is attached. I think the 
reason it is hard to reproduce is: when I disconnect all zk nodes, jm quickly 
goes down and restarts, and the log shows an error.

{code:java}
org.apache.flink.runtime.rpc.exceptions.FencingTokenException: Fencing token 
not set: Ignoring message LocalFencedMessage(null, 
LocalRpcInvocation(deregisterApplication(ApplicationStatus, String))) sent to 
akka.tcp://flink@192.168.22.121:42347/user/rpc/resourcemanager_0 because the 
fencing token is null.
at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:67)
 ~
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
 ~
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) ~
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) ~
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) ~
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) ~
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) ~
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) ~
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) ~
at akka.actor.Actor$class.aroundReceive(Actor.scala:517) ~
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) ~
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) ~
at akka.actor.ActorCell.invoke(ActorCell.scala:561) ~
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) ~
at akka.dispatch.Mailbox.run(Mailbox.scala:225) ~
at akka.dispatch.Mailbox.exec(Mailbox.scala:235) ~
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) ~
at 
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) ~
at 
akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) ~
at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) ~
{code}
Here, I speculate that the zk disconnection may have affected rm's leadership, leading to issues when jm deregistered from rm with ApplicationStatus UNDEFINED. However, in the actual production scenario, jm did not fail and continued with the execution.
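
As a side note on the FencingTokenException above: Flink's fenced RPC endpoints drop any fenced message whose token does not match the endpoint's current token, and during a leadership change the token is temporarily null. The following is only a simplified, self-contained illustration of that guard (invented types; not the actual FencedAkkaRpcActor code):

{code:java}
import java.util.UUID;

/** Simplified illustration of the fencing-token guard in a fenced RPC endpoint. */
class FencedEndpoint {
    // null while the component holds no confirmed leadership
    // (e.g. right after a zk session loss)
    private volatile UUID fencingToken;

    void grantLeadership(UUID newToken) { this.fencingToken = newToken; }

    void revokeLeadership() { this.fencingToken = null; }

    /** Each fenced message carries the token of the leader the sender believes in. */
    void handleFencedMessage(UUID senderToken, Runnable action) {
        if (fencingToken == null) {
            // corresponds to "Fencing token not set: Ignoring message ...
            // because the fencing token is null" in the stack trace above
            throw new IllegalStateException("Fencing token not set: ignoring message");
        }
        if (!fencingToken.equals(senderToken)) {
            throw new IllegalStateException("Fencing token mismatch: ignoring message");
        }
        action.run(); // tokens match: the message is processed
    }
}
{code}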


was (Author: JIRAUSER298666):
But in another scenario in production practice, UNDEFINED also appears. The J

[jira] [Comment Edited] (FLINK-33483) Why is “UNDEFINED” defined in the Flink task status?

2023-11-08 Thread Xin Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17784007#comment-17784007
 ] 

Xin Chen edited comment on FLINK-33483 at 11/8/23 12:47 PM:


But in another scenario in production practice, UNDEFINED also appears. The jm log can be found in the file [^container_e15_1693914709123_8498_01_01_8042], but I have not fully reproduced this scene. Based on the key information in the log:

{code:java}
15:00:57.657 State change: SUSPENDED
Connection to ZooKeeper suspended, waiting for reconnection.

15:00:54.754  org.apache.flink.util.FlinkException: ResourceManager 
leader changed to new address null
15:00:54.759  Job DataDistribution$ (281592085ed7f391ab59b83a53c40db3) 
switched from state RUNNING to RESTARTING.
15:00:54.771  Job DataDistribution$ (281592085ed7f391ab59b83a53c40db3) 
switched from state RESTARTING to SUSPENDED.
org.apache.flink.util.FlinkException: JobManager is no longer the leader.
Unable to canonicalize address zookeeper:2181 because it's not resolvable.

15:00:55.694   closing socket connection and attempting reconnect
15:00:57.657   State change: RECONNECTED
15:00:57.739  Connection to ZooKeeper was reconnected. Leader retrieval can be 
restarted.  
15:00:57.740   Connection to ZooKeeper was reconnected. Leader election can be 
restarted.  

15:00:57.741 Job 281592085ed7f391ab59b83a53c40db3 was not finished by 
JobManager.
15:00:57.742 Shutting down cluster because job not finished
15:00:57.742 Shutting YarnJobClusterEntrypoint down with application status 
UNKNOWN. Diagnostics null. 
15:00:57.767 Unregister application from the YARN Resource Manager with final 
status UNDEFINED.
{code}

From the logs, it can be seen that zk was disconnected for a few seconds. During the disconnection, the rm (resourcemanager) was affected and the Flink task was suspended while attempting to reconnect to zk. Most importantly, after zk reconnected, for some unknown reason jm directly determined that the task was in an UNKNOWN state: *Shutting YarnJobClusterEntrypoint down with application status UNKNOWN. Diagnostics null*. During this process, jm did not restart, which is different from FLINK-12302.

So how exactly is “UNKNOWN” determined here? Is it also derived after reading from the RunningJobRegistry in zk? I have also tried many times and did not reproduce this scene. The reproduction log is attached. I think the reason it is hard to reproduce is: when I disconnect all zks, jm quickly goes down and restarts, and the log shows an error.

{code:java}
org.apache.flink.runtime.rpc.exceptions.FencingTokenException: Fencing token 
not set: Ignoring message LocalFencedMessage(null, 
LocalRpcInvocation(deregisterApplication(ApplicationStatus, String))) sent to 
akka.tcp://flink@192.168.22.121:42347/user/rpc/resourcemanager_0 because the 
fencing token is null.
at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:67)
 ~
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
 ~
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) ~
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) ~
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) ~
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) ~
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) ~
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) ~
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) ~
at akka.actor.Actor$class.aroundReceive(Actor.scala:517) ~
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) ~
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) ~
at akka.actor.ActorCell.invoke(ActorCell.scala:561) ~
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) ~
at akka.dispatch.Mailbox.run(Mailbox.scala:225) ~
at akka.dispatch.Mailbox.exec(Mailbox.scala:235) ~
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) ~
at 
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) ~
at 
akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) ~
at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) ~
{code}
Here, I speculate that the zk disconnection may have affected rm's leadership, leading to issues when jm deregistered from rm with ApplicationStatus UNDEFINED. However, in the actual production scenario, jm did not fail, and this has not been reproduced.
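
For context on how UNKNOWN ends up as UNDEFINED on the YARN side: when the entrypoint deregisters, Flink's internal application status has to be translated into YARN's final application status, and only the clean terminal states have direct counterparts. A self-contained sketch of that translation (the enums below merely mirror org.apache.flink.runtime.clusterframework.ApplicationStatus and org.apache.hadoop.yarn.api.records.FinalApplicationStatus; this is not the actual Flink mapping code):

{code:java}
/** Illustrative stand-in for Flink's internal ApplicationStatus. */
enum AppStatus { SUCCEEDED, FAILED, CANCELED, UNKNOWN }

/** Illustrative stand-in for YARN's FinalApplicationStatus. */
enum YarnFinalStatus { SUCCEEDED, FAILED, KILLED, UNDEFINED }

public class StatusMapping {

    static YarnFinalStatus toYarn(AppStatus status) {
        switch (status) {
            case SUCCEEDED: return YarnFinalStatus.SUCCEEDED;
            case FAILED:    return YarnFinalStatus.FAILED;
            case CANCELED:  return YarnFinalStatus.KILLED;
            default:        return YarnFinalStatus.UNDEFINED; // UNKNOWN has no counterpart
        }
    }

    public static void main(String[] args) {
        // "Shutting YarnJobClusterEntrypoint down with application status UNKNOWN"
        // therefore surfaces on the YARN page as:
        System.out.println(toYarn(AppStatus.UNKNOWN)); // UNDEFINED
    }
}
{code}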


was (Author: JIRAUSER298666):
But in another scenario in production practice, UNDEFINED also appears. The J

[jira] [Comment Edited] (FLINK-33483) Why is “UNDEFINED” defined in the Flink task status?

2023-11-08 Thread Xin Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17784007#comment-17784007
 ] 

Xin Chen edited comment on FLINK-33483 at 11/8/23 12:45 PM:


But in another scenario in production practice, UNDEFINED also appears. The jm log can be found in the file [^container_e15_1693914709123_8498_01_01_8042], but I have not fully reproduced this scene. Based on the key information in the log:

{code:java}
15:00:57.657 State change: SUSPENDED
Connection to ZooKeeper suspended, waiting for reconnection.

15:00:54.754  org.apache.flink.util.FlinkException: ResourceManager 
leader changed to new address null
15:00:54.759  Job DataDistribution$ (281592085ed7f391ab59b83a53c40db3) 
switched from state RUNNING to RESTARTING.
15:00:54.771  Job DataDistribution$ (281592085ed7f391ab59b83a53c40db3) 
switched from state RESTARTING to SUSPENDED.
org.apache.flink.util.FlinkException: JobManager is no longer the leader.
Unable to canonicalize address zookeeper:2181 because it's not resolvable.

15:00:55.694   closing socket connection and attempting reconnect
15:00:57.657   State change: RECONNECTED
15:00:57.739  Connection to ZooKeeper was reconnected. Leader retrieval can be 
restarted.  
15:00:57.740   Connection to ZooKeeper was reconnected. Leader election can be 
restarted.  

15:00:57.741 Job 281592085ed7f391ab59b83a53c40db3 was not finished by 
JobManager.
15:00:57.742 Shutting down cluster because job not finished
15:00:57.742 Shutting YarnJobClusterEntrypoint down with application status 
UNKNOWN. Diagnostics null. 
15:00:57.767 Unregister application from the YARN Resource Manager with final 
status UNDEFINED.
{code}

From the logs, it can be seen that zk was disconnected for a few seconds. During the disconnection, the rm (resourcemanager) was affected and the Flink task was suspended while attempting to reconnect to zk. Most importantly, after zk reconnected, for some unknown reason jm directly determined that the task was in an UNKNOWN state: *Shutting YarnJobClusterEntrypoint down with application status UNKNOWN. Diagnostics null*. During this process, jm did not restart, which is different from FLINK-12302.

So how exactly is “UNKNOWN” determined here? Is it also derived after reading from the RunningJobRegistry in zk? I have also tried many times and did not reproduce this scene. The reproduction log is attached. I think the reason it is hard to reproduce is: when I disconnect all zks, jm quickly goes down and restarts, and the log shows an error.

{code:java}
org.apache.flink.runtime.rpc.exceptions.FencingTokenException: Fencing token 
not set: Ignoring message LocalFencedMessage(null, 
LocalRpcInvocation(deregisterApplication(ApplicationStatus, String))) sent to 
akka.tcp://flink@192.168.22.121:42347/user/rpc/resourcemanager_0 because the 
fencing token is null.
at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:67)
 ~
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
 ~
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) ~
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) ~
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) ~
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) ~
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) ~
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) ~
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) ~
at akka.actor.Actor$class.aroundReceive(Actor.scala:517) ~
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) ~
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) ~
at akka.actor.ActorCell.invoke(ActorCell.scala:561) ~
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) ~
at akka.dispatch.Mailbox.run(Mailbox.scala:225) ~
at akka.dispatch.Mailbox.exec(Mailbox.scala:235) ~
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) ~
at 
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) ~
at 
akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) ~
at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) ~
{code}
Here, I speculate that the zk disconnection may have affected rm's leadership, leading to issues when jm deregistered from rm with ApplicationStatus UNDEFINED. However, in the actual production scenario, jm did not fail, and this has not been reproduced.


was (Author: JIRAUSER298666):
But in another scenario in production practice, UN also appears. The Jm log can 
be

[jira] [Comment Edited] (FLINK-33483) Why is “UNDEFINED” defined in the Flink task status?

2023-11-08 Thread Xin Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17784001#comment-17784001
 ] 

Xin Chen edited comment on FLINK-33483 at 11/8/23 12:44 PM:


Links to FLINK-12302. Including that issue, two scenarios were discovered in Flink 1.12.2 with the issue of reporting *UNDEFINED* to the yarn resourcemanager. I have reproduced that scenario: when the task completes in tm and the global terminal state (FINISHED or FAILED) is reached, the jm log shows "Job 65ccc2410d4554553225889dbea552d7 reached global terminal state {}". Killing the jm process (am) or disconnecting the zk connection then causes a new jm to be pulled up. The new jm assigned “UNKNOWN” based on the task's status "DONE" recorded in the RunningJobRegistry in zk, and ultimately reported “UNDEFINED”.
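
To make that recovery decision concrete: the registry only records a coarse scheduling status per job, so a newly started jm cannot tell whether a DONE job actually FINISHED or FAILED. A simplified sketch of that reasoning (hypothetical types loosely modeled on the 1.12-era RunningJobsRegistry; not the actual recovery code):

{code:java}
/** Coarse per-job status kept in zk, as in the 1.12-era RunningJobsRegistry. */
enum JobSchedulingStatus { PENDING, RUNNING, DONE }

enum AppStatus { SUCCEEDED, FAILED, CANCELED, UNKNOWN }

public class RecoveryDecision {

    /** What a freshly started jm can conclude from the registry alone. */
    static AppStatus deriveStatusAfterRestart(JobSchedulingStatus registryStatus) {
        if (registryStatus == JobSchedulingStatus.DONE) {
            // The job reached a global terminal state before the old jm died,
            // but DONE does not record whether that state was FINISHED or FAILED,
            // so the new jm can only report UNKNOWN (surfaced as UNDEFINED on YARN).
            return AppStatus.UNKNOWN;
        }
        // PENDING/RUNNING jobs would be recovered and resumed instead.
        throw new IllegalStateException("job should be recovered, not finalized");
    }

    public static void main(String[] args) {
        System.out.println(deriveStatusAfterRestart(JobSchedulingStatus.DONE)); // UNKNOWN
    }
}
{code}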


was (Author: JIRAUSER298666):
Links to FLINK-12302. Including that issue, two scenarios were discovered in Flink 1.12.2 with the issue of reporting *UNDEFINED* to the yarn resourcemanager. I have replicated that scenario: when the task completes in tm and the global terminal state (FINISHED or FAILED) is reached, the jm log shows "Job 65ccc2410d4554553225889dbea552d7 reached global terminal state {}". Killing the jm process (am) or disconnecting the zk connection then causes a new jm to be pulled up. The new jm assigned “UNKNOWN” based on the task's status "DONE" recorded in the RunningJobRegistry in zk, and ultimately reported “UNDEFINED”.

> Why is “UNDEFINED” defined in the Flink task status?
> 
>
> Key: FLINK-33483
> URL: https://issues.apache.org/jira/browse/FLINK-33483
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / RPC, Runtime / Task
>Affects Versions: 1.12.2
>Reporter: Xin Chen
>Priority: Major
> Attachments: container_e15_1693914709123_8498_01_01_8042
>
>
> In the Flink on Yarn mode, if an unknown status appears in the Flink log, 
> jm(jobmanager) will report the task status as undefined. The Yarn page will 
> display the state as FINISHED, but the final status is *UNDEFINED*. In terms 
> of business, it is unknown whether the task has failed or succeeded, and 
> whether to retry. It has a certain impact. Why should we design UNDEFINED? 
> Usually, this situation occurs due to zk(zookeeper) disconnection or jm 
> abnormality, etc. Since the abnormality is present, why not use FAILED?
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-33483) Why is “UNDEFINED” defined in the Flink task status?

2023-11-08 Thread Xin Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17784027#comment-17784027
 ] 

Xin Chen edited comment on FLINK-33483 at 11/8/23 12:41 PM:


The biggest confusion is why Flink needs to design UNDEFINED. From the perspective of these scenarios, UNDEFINED always stems from an exception (zk or jm exceptions). Why can't we define it as FAILED? Defining FAILED would allow us to detect and retry tasks, whereas UNDEFINED here carries no meaning at all. Is there a better solution to this problem in subsequent versions of Flink, or a better way to reproduce this scenario? My solution is the same as FLINK-12302: give a FAILED finalStatus to report to the resourcemanager in this case, providing the user with the clearest possible signal. Even in this case, the task may have actually run successfully in TM (taskmanager), but after all, an exception (zk disconnection) has occurred. In any case, executing a task and ultimately giving the user an UNDEFINED state is confusing.


was (Author: JIRAUSER298666):
*The biggest confusion is why Flink needs to design UNDEFINED*. From the perspective of these scenarios, UNDEFINED always stems from an exception (zk or jm exceptions). Why can't we define it as FAILED? Defining FAILED would allow us to detect and retry tasks, whereas UNDEFINED here carries no meaning at all. Is there a better solution to this problem in subsequent versions of Flink, or a better way to reproduce this scenario? My solution is the same as FLINK-12302: give a FAILED finalStatus to report to the resourcemanager in this case, providing the user with the clearest possible signal. Even in this case, the task may have actually run successfully in TM (taskmanager), but after all, an exception (zk disconnection) has occurred. In any case, executing a task and ultimately giving the user an UNDEFINED state is confusing.

> Why is “UNDEFINED” defined in the Flink task status?
> 
>
> Key: FLINK-33483
> URL: https://issues.apache.org/jira/browse/FLINK-33483
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / RPC, Runtime / Task
>Affects Versions: 1.12.2
>Reporter: Xin Chen
>Priority: Major
> Attachments: container_e15_1693914709123_8498_01_01_8042
>
>
> In the Flink on Yarn mode, if an unknown status appears in the Flink log, 
> jm(jobmanager) will report the task status as undefined. The Yarn page will 
> display the state as FINISHED, but the final status is *UNDEFINED*. In terms 
> of business, it is unknown whether the task has failed or succeeded, and 
> whether to retry. It has a certain impact. Why should we design UNDEFINED? 
> Usually, this situation occurs due to zk(zookeeper) disconnection or jm 
> abnormality, etc. Since the abnormality is present, why not use FAILED?
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33483) Why is “UNDEFINED” defined in the Flink task status?

2023-11-08 Thread Xin Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17784027#comment-17784027
 ] 

Xin Chen commented on FLINK-33483:
--

*The biggest confusion is why Flink needs to design UNDEFINED*. From the perspective of these scenarios, UNDEFINED always stems from an exception (zk or jm exceptions). Why can't we define it as FAILED? Defining FAILED would allow us to detect and retry tasks, whereas UNDEFINED here carries no meaning at all. Is there a better solution to this problem in subsequent versions of Flink, or a better way to reproduce this scenario? My solution is the same as FLINK-12302: give a FAILED finalStatus to report to the resourcemanager in this case, providing the user with the clearest possible signal. Even in this case, the task may have actually run successfully in TM (taskmanager), but after all, an exception (zk disconnection) has occurred. In any case, executing a task and ultimately giving the user an UNDEFINED state is confusing.
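
From the business side, the practical consequence is that a driver polling YARN has to treat UNDEFINED as "outcome unknown" and make the retry decision itself. A minimal sketch of such a check using the standard YarnClient API (the application id and the retry-policy comments are assumptions):

{code:java}
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.client.api.YarnClient;

public class FinalStatusCheck {
    public static void main(String[] args) throws Exception {
        YarnClient yarnClient = YarnClient.createYarnClient();
        yarnClient.init(new Configuration());
        yarnClient.start();

        // Hypothetical application id of the finished Flink job.
        ApplicationId appId = ApplicationId.newInstance(1693914709123L, 8498);

        FinalApplicationStatus finalStatus =
                yarnClient.getApplicationReport(appId).getFinalApplicationStatus();

        switch (finalStatus) {
            case SUCCEEDED:
                break; // definitely succeeded: nothing to do
            case FAILED:
            case KILLED:
                break; // definitely failed or killed: safe to retry
            case UNDEFINED:
                // outcome unknown (e.g. zk loss during shutdown): the caller must
                // decide whether a retry is safe or could duplicate completed work
                break;
        }
        yarnClient.stop();
    }
}
{code}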

> Why is “UNDEFINED” defined in the Flink task status?
> 
>
> Key: FLINK-33483
> URL: https://issues.apache.org/jira/browse/FLINK-33483
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / RPC, Runtime / Task
>Affects Versions: 1.12.2
>Reporter: Xin Chen
>Priority: Major
> Attachments: container_e15_1693914709123_8498_01_01_8042
>
>
> In the Flink on Yarn mode, if an unknown status appears in the Flink log, 
> jm(jobmanager) will report the task status as undefined. The Yarn page will 
> display the state as FINISHED, but the final status is *UNDEFINED*. In terms 
> of business, it is unknown whether the task has failed or succeeded, and 
> whether to retry. It has a certain impact. Why should we design UNDEFINED? 
> Usually, this situation occurs due to zk(zookeeper) disconnection or jm 
> abnormality, etc. Since the abnormality is present, why not use FAILED?
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-33469] Implement restore tests for Limit node [flink]

2023-11-08 Thread via GitHub


dawidwys commented on code in PR #23675:
URL: https://github.com/apache/flink/pull/23675#discussion_r1386539479


##
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/LimitTestPrograms.java:
##
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.testutils;
+
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecLimit;
+import org.apache.flink.table.test.program.SinkTestStep;
+import org.apache.flink.table.test.program.SourceTestStep;
+import org.apache.flink.table.test.program.TableTestProgram;
+import org.apache.flink.types.Row;
+
+/** {@link TableTestProgram} definitions for testing {@link StreamExecLimit}. 
*/
+public class LimitTestPrograms {
+
+static final Row[] DATA =
+new Row[] {
+Row.of(2, "a", 6),
+Row.of(4, "b", 8),
+Row.of(6, "c", 10),
+Row.of(1, "a", 5),
+Row.of(3, "b", 7),
+Row.of(5, "c", 9)
+};
+static final TableTestProgram LIMIT =
+TableTestProgram.of("limit", "validates limit node")
+.setupTableSource(
+SourceTestStep.newBuilder("source_t")
+.addSchema("a INT", "b VARCHAR", "c INT")
+.producedBeforeRestore(DATA)
+.producedAfterRestore(DATA)
+.build())
+.setupTableSink(
+SinkTestStep.newBuilder("sink_t")
+.addSchema("a INT", "b VARCHAR", "c 
BIGINT")
+.addOption("sink-insert-only", "false")

Review Comment:
   This is set by the `RestoreTestBase`



##
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/LimitJsonPlanTest.java:
##
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.planner.plan.nodes.exec.stream;
-
-import org.apache.flink.table.api.TableConfig;
-import org.apache.flink.table.api.TableEnvironment;
-import org.apache.flink.table.planner.utils.StreamTableTestUtil;
-import org.apache.flink.table.planner.utils.TableTestBase;
-
-import org.junit.Before;
-import org.junit.Test;
-
-/** Test json serialization for sort limit. */
-public class LimitJsonPlanTest extends TableTestBase {
-
-private StreamTableTestUtil util;
-private TableEnvironment tEnv;
-
-@Before
-public void setup() {
-util = streamTestUtil(TableConfig.getDefault());
-tEnv = util.getTableEnv();
-
-String srcTableDdl =
-"CREATE TABLE MyTable (\n"
-+ "  a bigint,\n"
-+ "  b int not null,\n"
-+ "  c varchar,\n"
-+ "  d timestamp(3)\n"
-+ ") with (\n"
-+ "  'connector' = 'values',\n"
-+ "  'bounded' = 'false')";
-tEnv.executeSql(srcTableDdl);
-}
-
-@Test
-public void testLimit() {
-String sinkTableDdl =
-"CREATE TABLE MySink (\n"
-+ "  a bigint,\n"
-+ "  b bigint\n"
- 

[jira] [Comment Edited] (FLINK-33483) Why is “UNDEFINED” defined in the Flink task status?

2023-11-08 Thread Xin Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17784007#comment-17784007
 ] 

Xin Chen edited comment on FLINK-33483 at 11/8/23 12:28 PM:


But in another scenario in production practice, UNDEFINED also appears. The jm log can be found in the file [^container_e15_1693914709123_8498_01_01_8042], but I have not fully reproduced this scene. Based on the key information in the log:

{code:java}
15:00:57.657 State change: SUSPENDED
Connection to ZooKeeper suspended, waiting for reconnection.

15:00:54.754  org.apache.flink.util.FlinkException: ResourceManager 
leader changed to new address null
15:00:54.759  Job DataDistribution$ (281592085ed7f391ab59b83a53c40db3) 
switched from state RUNNING to RESTARTING.
15:00:54.771  Job DataDistribution$ (281592085ed7f391ab59b83a53c40db3) 
switched from state RESTARTING to SUSPENDED.
org.apache.flink.util.FlinkException: JobManager is no longer the leader.
Unable to canonicalize address zookeeper:2181 because it's not resolvable.

15:00:55.694   closing socket connection and attempting reconnect
15:00:57.657   State change: RECONNECTED
15:00:57.739  Connection to ZooKeeper was reconnected. Leader retrieval can be 
restarted.  
15:00:57.740   Connection to ZooKeeper was reconnected. Leader election can be 
restarted.  

15:00:57.741 Job 281592085ed7f391ab59b83a53c40db3 was not finished by 
JobManager.
15:00:57.742 Shutting down cluster because job not finished
15:00:57.742 Shutting YarnJobClusterEntrypoint down with application status 
UNKNOWN. Diagnostics null. 
15:00:57.767 Unregister application from the YARN Resource Manager with final 
status UNDEFINED.
{code}

From the logs, it can be seen that zk was disconnected for a few seconds. During the disconnection, the rm (resourcemanager) was affected and the Flink task was suspended while attempting to reconnect to zk. Most importantly, after zk reconnected, for some unknown reason jm directly determined that the task was in an UNKNOWN state: *Shutting YarnJobClusterEntrypoint down with application status UNKNOWN. Diagnostics null*. During this process, jm did not restart, which is different from FLINK-12302.

So how exactly is “UNKNOWN” determined here? Is it also derived after reading from the RunningJobRegistry in zk? I have also tried many times and did not reproduce this scene. The reproduction log is attached. I think the reason it is hard to reproduce is: when I disconnect all zks, jm quickly goes down and restarts, and the log shows an error.

{code:java}
org.apache.flink.runtime.rpc.exceptions.FencingTokenException: Fencing token 
not set: Ignoring message LocalFencedMessage(null, 
LocalRpcInvocation(deregisterApplication(ApplicationStatus, String))) sent to 
akka.tcp://flink@192.168.22.121:42347/user/rpc/resourcemanager_0 because the 
fencing token is null.
at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:67)
 ~
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
 ~
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) ~
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) ~
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) ~
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) ~
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) ~
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) ~
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) ~
at akka.actor.Actor$class.aroundReceive(Actor.scala:517) ~
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) ~
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) ~
at akka.actor.ActorCell.invoke(ActorCell.scala:561) ~
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) ~
at akka.dispatch.Mailbox.run(Mailbox.scala:225) ~
at akka.dispatch.Mailbox.exec(Mailbox.scala:235) ~
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) ~
at 
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) ~
at 
akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) ~
at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) ~
{code}
Here, I speculate that the zk disconnection may have affected rm's leadership, leading to issues when jm deregistered from rm with ApplicationStatus UNDEFINED. However, in the actual production scenario, jm did not fail, and this has not been reproduced.


was (Author: JIRAUSER298666):
But in another scenario in production practice, UN also appears. The Jm log can 
be found i

[jira] [Comment Edited] (FLINK-33483) Why is “UNDEFINED” defined in the Flink task status?

2023-11-08 Thread Xin Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17784007#comment-17784007
 ] 

Xin Chen edited comment on FLINK-33483 at 11/8/23 12:27 PM:


But in another scenario in production practice, UNDEFINED also appears. The jm log can be found in the file [^container_e15_1693914709123_8498_01_01_8042], but I have not fully reproduced this scene. Based on the key information in the log:

{code:java}
15:00:57.657 State change: SUSPENDED
Connection to ZooKeeper suspended, waiting for reconnection.

15:00:54.754  org.apache.flink.util.FlinkException: ResourceManager 
leader changed to new address null
15:00:54.759  Job DataDistribution$ (281592085ed7f391ab59b83a53c40db3) 
switched from state RUNNING to RESTARTING.
15:00:54.771  Job DataDistribution$ (281592085ed7f391ab59b83a53c40db3) 
switched from state RESTARTING to SUSPENDED.
org.apache.flink.util.FlinkException: JobManager is no longer the leader.
Unable to canonicalize address zookeeper:2181 because it's not resolvable.

15:00:55.694   closing socket connection and attempting reconnect
15:00:57.657   State change: RECONNECTED
15:00:57.739  Connection to ZooKeeper was reconnected. Leader retrieval can be 
restarted.  
15:00:57.740   Connection to ZooKeeper was reconnected. Leader election can be 
restarted.  

15:00:57.741 Job 281592085ed7f391ab59b83a53c40db3 was not finished by 
JobManager.
15:00:57.742 Shutting down cluster because job not finished
15:00:57.742 Shutting YarnJobClusterEntrypoint down with application status 
UNKNOWN. Diagnostics null. 
15:00:57.767 Unregister application from the YARN Resource Manager with final 
status UNDEFINED.
{code}

From the logs, it can be seen that zk was disconnected for a few seconds. During the disconnection, the rm (resourcemanager) was affected and the Flink task was suspended while attempting to reconnect to zk. Most importantly, after zk reconnected, for some unknown reason jm directly determined that the task was in an UNKNOWN state: *Shutting YarnJobClusterEntrypoint down with application status UNKNOWN. Diagnostics null*. During this process, jm did not restart, which is different from FLINK-12302.

So how exactly is “UNKNOWN” determined here? Is it also derived after reading from the RunningJobRegistry in zk? I have also tried many times and did not reproduce this scene. The reproduction log is attached. I think the reason it is hard to reproduce is: when I disconnect all zks, jm quickly goes down and restarts, and the log shows an error.

{code:java}
org.apache.flink.runtime.rpc.exceptions.FencingTokenException: Fencing token 
not set: Ignoring message LocalFencedMessage(null, 
LocalRpcInvocation(deregisterApplication(ApplicationStatus, String))) sent to 
akka.tcp://flink@192.168.22.121:42347/user/rpc/resourcemanager_0 because the 
fencing token is null.
at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:67)
 ~
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
 ~
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) ~
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) ~
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) ~
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) ~
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) ~
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) ~
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) ~
at akka.actor.Actor$class.aroundReceive(Actor.scala:517) ~
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) ~
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) ~
at akka.actor.ActorCell.invoke(ActorCell.scala:561) ~
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) ~
at akka.dispatch.Mailbox.run(Mailbox.scala:225) ~
at akka.dispatch.Mailbox.exec(Mailbox.scala:235) ~
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) ~
at 
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) ~
at 
akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) ~
at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) ~
{code}
Here, I speculate that the zk disconnection may have affected rm's leadership, leading to issues when jm deregistered from rm with ApplicationStatus UNDEFINED. However, in the actual production scenario, jm did not fail, and this has not been reproduced.


was (Author: JIRAUSER298666):
But in another scenario in production practice, UN also appears. The Jm log can 
be 

Re: [PR] [FLINK-33455] Implement restore tests for SortLimit node [flink]

2023-11-08 Thread via GitHub


dawidwys commented on code in PR #23660:
URL: https://github.com/apache/flink/pull/23660#discussion_r1386534891


##
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/SortTestPrograms.java:
##
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.testutils;
+
+import 
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSortLimit;
+import org.apache.flink.table.test.program.SinkTestStep;
+import org.apache.flink.table.test.program.SourceTestStep;
+import org.apache.flink.table.test.program.TableTestProgram;
+import org.apache.flink.types.Row;
+
+/** {@link TableTestProgram} definitions for testing {@link 
StreamExecSortLimit}. */
+public class SortTestPrograms {
+
+static final TableTestProgram SORT_LIMIT_ASC =
+TableTestProgram.of(
+"sort-limit-asc",
+"validates sort limit node by sorting integers in 
asc mode")
+.setupTableSource(
+SourceTestStep.newBuilder("source_t")
+.addSchema("a INT", "b VARCHAR", "c INT")
+.producedBeforeRestore(
+Row.of(2, "a", 6),
+Row.of(4, "b", 8),
+Row.of(6, "c", 10),
+Row.of(1, "a", 5),
+Row.of(3, "b", 7),
+Row.of(5, "c", 9))
+.producedAfterRestore(
+Row.of(2, "a", 6),
+Row.of(4, "b", 8),
+Row.of(6, "c", 10),
+Row.of(1, "a", 5),
+Row.of(3, "b", 7),
+Row.of(5, "c", 9))

Review Comment:
   Personally, I find this confusing. Having the exact same data as input twice is an artificial case; I think it rarely occurs in production use cases. Bear in mind this does not model the "replaying" case of reading from the same offset before and after restore. The API models a perfect source that stores the offsets in the checkpoint. Currently you model an input like (using only the first column for simplicity):
   
   ```
   offset  | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 | 10 | 11
   data.   | 2 | 4 | 6 | 1 | 3 | 5 | 2 | 4 | 6 | 1 | 3  | 5
   ```
   with a savepoint at offset `5`.
   
   In other words. If you were testing `SELECT a, COUNT(*) FROM t GROUP BY a;` 
you'd get (after materializing all updates of course):
   ```
   a | COUNT(*)
   --
   2 | 2
   4 | 2
   6 | 2
   1 | 2
   3 | 2
   5 | 2
   ```
   
   
   Having said all that, I am not sure which property having the same data twice showcases.
   
   I find it even more confusing because I am starting to see a pattern of using the same data twice in more PRs: https://github.com/apache/flink/pull/23675
   
   Such visible patterns easily catch the attention and make people wonder. If we want to test a particular behaviour, could we add a comment on the role of each row, e.g.
   ```
   .producedBeforeRestore(
   Row.of(2, "a", 6),
   Row.of(4, "b", 8),
   Row.of(6, "c", 10),
   Row.of(1, "a", 5),
   Row.of(3, "b", 7),
   Row.of(5, "c", 9))
   .producedAfterRestore(
   Row.of(2, "a", 6), # should 
replace [3, b, 7] from before the restore
   Row.of(4, "b", 8), # should be 
ignored as it is "bigger" than the lowest [2, "a", 6]
   Row.o

[jira] [Comment Edited] (FLINK-33483) Why is “UNDEFINED” defined in the Flink task status?

2023-11-08 Thread Xin Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17784007#comment-17784007
 ] 

Xin Chen edited comment on FLINK-33483 at 11/8/23 12:13 PM:


But in another scenario in production practice, UNDEFINED also appears. The jm log can be found in the file [^container_e15_1693914709123_8498_01_01_8042], but I have not fully reproduced this scene. Based on the key information in the log:

{code:java}
15:00:57.657 State change: SUSPENDED
Connection to ZooKeeper suspended, waiting for reconnection.

15:00:54.754  org.apache.flink.util.FlinkException: ResourceManager 
leader changed to new address null
15:00:54.759  Job DataDistribution$ (281592085ed7f391ab59b83a53c40db3) 
switched from state RUNNING to RESTARTING.
15:00:54.771  Job DataDistribution$ (281592085ed7f391ab59b83a53c40db3) 
switched from state RESTARTING to SUSPENDED.
org.apache.flink.util.FlinkException: JobManager is no longer the leader.
Unable to canonicalize address zookeeper:2181 because it's not resolvable.

15:00:55.694   closing socket connection and attempting reconnect
15:00:57.657   State change: RECONNECTED
15:00:57.739  Connection to ZooKeeper was reconnected. Leader retrieval can be 
restarted.  
15:00:57.740   Connection to ZooKeeper was reconnected. Leader election can be 
restarted.  

15:00:57.741 Job 281592085ed7f391ab59b83a53c40db3 was not finished by 
JobManager.
15:00:57.742 Shutting down cluster because job not finished
15:00:57.742 Shutting YarnJobClusterEntrypoint down with application status 
UNKNOWN. Diagnostics null. 
15:00:57.767 Unregister application from the YARN Resource Manager with final 
status UNDEFINED.
{code}

From the logs, it can be seen that zk was disconnected for a few seconds. During the disconnection, the rm (resourcemanager) was affected and the Flink task was suspended while attempting to reconnect to zk. Most importantly, after zk reconnected, for some unknown reason jm directly determined that the task was in an UNKNOWN state: *Shutting YarnJobClusterEntrypoint down with application status UNKNOWN. Diagnostics null*. During this process, jm did not restart, which is different from FLINK-12302.



was (Author: JIRAUSER298666):
But in another scenario in production practice, UNDEFINED also appears. The jm log can be found in the file [^container_e15_1693914709123_8498_01_01_8042], but I have not fully reproduced this scene. Based on the key information in the log:

{code:java}
15:00:57.657 State change: SUSPENDED
Connection to ZooKeeper suspended, waiting for reconnection.

15:00:54.754  org.apache.flink.util.FlinkException: ResourceManager 
leader changed to new address null
15:00:54.759  Job DataDistribution$ (281592085ed7f391ab59b83a53c40db3) 
switched from state RUNNING to RESTARTING.
15:00:54.771  Job DataDistribution$ (281592085ed7f391ab59b83a53c40db3) 
switched from state RESTARTING to SUSPENDED.
org.apache.flink.util.FlinkException: JobManager is no longer the leader.
Unable to canonicalize address zookeeper:2181 because it's not resolvable.

15:00:55.694   closing socket connection and attempting reconnect
15:00:57.657   State change: RECONNECTED
15:00:57.739  Connection to ZooKeeper was reconnected. Leader retrieval can be 
restarted.  
15:00:57.740   Connection to ZooKeeper was reconnected. Leader election can be 
restarted.  

15:00:57.741 Job 281592085ed7f391ab59b83a53c40db3 was not finished by 
JobManager.
15:00:57.742 Shutting down cluster because job not finished
15:00:57.742 Shutting YarnJobClusterEntrypoint down with application status 
UNKNOWN. Diagnostics null. 
{code}

From the logs, it can be seen that zk was disconnected for a few seconds. During the disconnection, the rm (resourcemanager) was affected and the Flink task was suspended while attempting to reconnect to zk. Most importantly, after zk reconnected, for some unknown reason jm directly determined that the task was in an UNKNOWN state: *Shutting YarnJobClusterEntrypoint down with application status UNKNOWN. Diagnostics null*. During this process, jm did not restart, which is different from FLINK-12302.


> Why is “UNDEFINED” defined in the Flink task status?
> 
>
> Key: FLINK-33483
> URL: https://issues.apache.org/jira/browse/FLINK-33483
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / RPC, Runtime / Task
>Affects Versions: 1.12.2
>Reporter: Xin Chen
>Priority: Major
> Attachments: container_e15_1693914709123_8498_01_01_8042
>
>
> In the Flink on Yarn mode, if an unknown status appears in the Flink log, 
> jm(jobmanager) will report the task status as undefined. The Yarn page will 
>

[jira] [Comment Edited] (FLINK-33483) Why is “UNDEFINED” defined in the Flink task status?

2023-11-08 Thread Xin Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17784007#comment-17784007
 ] 

Xin Chen edited comment on FLINK-33483 at 11/8/23 12:11 PM:


But in another scenario in production practice, UNDEFINED also appears. The jm log can be found in the file [^container_e15_1693914709123_8498_01_01_8042], but I have not fully reproduced this scene. Based on the key information in the log:

{code:java}
15:00:57.657 State change: SUSPENDED
Connection to ZooKeeper suspended, waiting for reconnection.

15:00:54.754  org.apache.flink.util.FlinkException: ResourceManager 
leader changed to new address null
15:00:54.759  Job DataDistribution$ (281592085ed7f391ab59b83a53c40db3) 
switched from state RUNNING to RESTARTING.
15:00:54.771  Job DataDistribution$ (281592085ed7f391ab59b83a53c40db3) 
switched from state RESTARTING to SUSPENDED.
org.apache.flink.util.FlinkException: JobManager is no longer the leader.
Unable to canonicalize address zookeeper:2181 because it's not resolvable.

15:00:55.694   closing socket connection and attempting reconnect
15:00:57.657   State change: RECONNECTED
15:00:57.739  Connection to ZooKeeper was reconnected. Leader retrieval can be 
restarted.  
15:00:57.740   Connection to ZooKeeper was reconnected. Leader election can be 
restarted.  

15:00:57.741 Job 281592085ed7f391ab59b83a53c40db3 was not finished by 
JobManager.
15:00:57.742 Shutting down cluster because job not finished
15:00:57.742 Shutting YarnJobClusterEntrypoint down with application status 
UNKNOWN. Diagnostics null. 
{code}

From the logs, it can be seen that zk was disconnected for a few seconds. During the disconnection, the rm (resourcemanager) was affected and the Flink task was suspended while attempting to reconnect to zk. Most importantly, after zk reconnected, for some unknown reason jm directly determined that the task was in an UNKNOWN state: *Shutting YarnJobClusterEntrypoint down with application status UNKNOWN. Diagnostics null*. During this process, jm did not restart, which is different from FLINK-12302.



was (Author: JIRAUSER298666):
But in another scenario in production practice, UNDEFINED also appears. The jm log can be found in the file [^container_e15_1693914709123_8498_01_01_8042], but I have not fully reproduced this scene. Based on the key information in the log:

{code:java}
15:00:57.657 State change: SUSPENDED
Connection to ZooKeeper suspended, waiting for reconnection.

15:00:54.754  org.apache.flink.util.FlinkException: ResourceManager 
leader changed to new address null
15:00:54.759  Job DataDistribution$ (281592085ed7f391ab59b83a53c40db3) 
switched from state RUNNING to RESTARTING.
15:00:54.771  Job DataDistribution$ (281592085ed7f391ab59b83a53c40db3) 
switched from state RESTARTING to SUSPENDED.
org.apache.flink.util.FlinkException: JobManager is no longer the leader.
Unable to canonicalize address zookeeper:2181 because it's not resolvable.

15:00:55.694   closing socket connection and attempting reconnect
15:00:57.657   State change: RECONNECTED
15:00:57.739  Connection to ZooKeeper was reconnected. Leader retrieval can be 
restarted.  
15:00:57.740   Connection to ZooKeeper was reconnected. Leader election can be 
restarted.  

15:00:57.741 Job 281592085ed7f391ab59b83a53c40db3 was not finished by 
JobManager.
15:00:57.742 Shutting down cluster because job not finished
15:00:57.742 Shutting YarnJobClusterEntrypoint down with application status 
UNKNOWN. Diagnostics null. 
{code}

From the logs, it can be seen that zk was disconnected for a few seconds. During the disconnection, the rm (resourcemanager) was affected and the Flink task was suspended while attempting to reconnect to zk.


> Why is “UNDEFINED” defined in the Flink task status?
> 
>
> Key: FLINK-33483
> URL: https://issues.apache.org/jira/browse/FLINK-33483
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / RPC, Runtime / Task
>Affects Versions: 1.12.2
>Reporter: Xin Chen
>Priority: Major
> Attachments: container_e15_1693914709123_8498_01_01_8042
>
>
> In the Flink on Yarn mode, if an unknown status appears in the Flink log, 
> jm(jobmanager) will report the task status as undefined. The Yarn page will 
> display the state as FINISHED, but the final status is *UNDEFINED*. In terms 
> of business, it is unknown whether the task has failed or succeeded, and 
> whether to retry. It has a certain impact. Why should we design UNDEFINED? 
> Usually, this situation occurs due to zk(zookeeper) disconnection or jm 
> abnormality, etc. Since the abnormality is present, why not use FAILED?
>  



--
This message was sent by A

Re: [PR] [FLINK-32738][formats] PROTOBUF format supports projection push down [flink]

2023-11-08 Thread via GitHub


zhougit86 commented on PR #23323:
URL: https://github.com/apache/flink/pull/23323#issuecomment-1801762669

   > @zhougit86 Sorry, I'm a little busy recently. You can take a look at this 
[PR](https://github.com/apache/flink/pull/23162). There are some modifications 
to the logic of codegen and the code will be split. You may need to rebase this 
for compatibility.
   
   @ljw-hit OK, I will wait for your PR to be modified first, and then rebase my part on yours. Let me know once yours is merged.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (FLINK-33483) Why is “UNDEFINED” defined in the Flink task status?

2023-11-08 Thread Xin Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17784007#comment-17784007
 ] 

Xin Chen edited comment on FLINK-33483 at 11/8/23 12:06 PM:


But in another scenario in production practice, UNDEFINED also appears. The jm log can be found in the file [^container_e15_1693914709123_8498_01_01_8042], but I have not fully reproduced this scene. Based on the key information in the log:

{code:java}
15:00:57.657 State change: SUSPENDED
Connection to ZooKeeper suspended, waiting for reconnection.

15:00:54.754  org.apache.flink.util.FlinkException: ResourceManager 
leader changed to new address null
15:00:54.759  Job DataDistribution$ (281592085ed7f391ab59b83a53c40db3) 
switched from state RUNNING to RESTARTING.
15:00:54.771  Job DataDistribution$ (281592085ed7f391ab59b83a53c40db3) 
switched from state RESTARTING to SUSPENDED.
org.apache.flink.util.FlinkException: JobManager is no longer the leader.
Unable to canonicalize address zookeeper:2181 because it's not resolvable.

15:00:55.694   closing socket connection and attempting reconnect
15:00:57.657   State change: RECONNECTED
15:00:57.739  Connection to ZooKeeper was reconnected. Leader retrieval can be 
restarted.  
15:00:57.740   Connection to ZooKeeper was reconnected. Leader election can be 
restarted.  

15:00:57.741 Job 281592085ed7f391ab59b83a53c40db3 was not finished by 
JobManager.
15:00:57.742 Shutting down cluster because job not finished
15:00:57.742 Shutting YarnJobClusterEntrypoint down with application status 
UNKNOWN. Diagnostics null. 
{code}

From the logs, it can be seen that zk was disconnected for a few seconds. During the disconnection, the rm (resourcemanager) was affected and the Flink task was suspended while attempting to reconnect to zk.



was (Author: JIRAUSER298666):
But in another scenario in production practice, UNDEFINED also appears. The jm log can be found in the file [^container_e15_1693914709123_8498_01_01_8042], but I have not fully reproduced this scene. Based on the key information in the log, it can be seen that:

{code:java}
15:00:57.657 State change: SUSPENDED
Connection to ZooKeeper suspended, waiting for reconnection.

15:00:54.754  org.apache.flink.util.FlinkException: ResourceManager 
leader changed to new address null
15:00:54.759  Job DataDistribution$ (281592085ed7f391ab59b83a53c40db3) 
switched from state RUNNING to RESTARTING.
15:00:54.771  Job DataDistribution$ (281592085ed7f391ab59b83a53c40db3) 
switched from state RESTARTING to SUSPENDED.
org.apache.flink.util.FlinkException: JobManager is no longer the leader.
Unable to canonicalize address zookeeper:2181 because it's not resolvable.

15:00:55.694   closing socket connection and attempting reconnect
15:00:57.657   State change: RECONNECTED
15:00:57.739  Connection to ZooKeeper was reconnected. Leader retrieval can be 
restarted.  
15:00:57.740   Connection to ZooKeeper was reconnected. Leader election can be 
restarted.   
{code}


> Why is “UNDEFINED” defined in the Flink task status?
> 
>
> Key: FLINK-33483
> URL: https://issues.apache.org/jira/browse/FLINK-33483
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / RPC, Runtime / Task
>Affects Versions: 1.12.2
>Reporter: Xin Chen
>Priority: Major
> Attachments: container_e15_1693914709123_8498_01_01_8042
>
>
> In the Flink on Yarn mode, if an unknown status appears in the Flink log, 
> jm(jobmanager) will report the task status as undefined. The Yarn page will 
> display the state as FINISHED, but the final status is *UNDEFINED*. In terms 
> of business, it is unknown whether the task has failed or succeeded, and 
> whether to retry. It has a certain impact. Why should we design UNDEFINED? 
> Usually, this situation occurs due to zk(zookeeper) disconnection or jm 
> abnormality, etc. Since the abnormality is present, why not use FAILED?
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33483) Why is “UNDEFINED” defined in the Flink task status?

2023-11-08 Thread Xin Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17784007#comment-17784007
 ] 

Xin Chen commented on FLINK-33483:
--

But in another scenario in production practice, UNDEFINED also appears. The jm log can be found in the file [^container_e15_1693914709123_8498_01_01_8042], but I have not fully reproduced this scene. Based on the key information in the log, it can be seen that:

{code:java}
15:00:57.657 State change: SUSPENDED
Connection to ZooKeeper suspended, waiting for reconnection.

15:00:54.754  org.apache.flink.util.FlinkException: ResourceManager 
leader changed to new address null
15:00:54.759  Job DataDistribution$ (281592085ed7f391ab59b83a53c40db3) 
switched from state RUNNING to RESTARTING.
15:00:54.771  Job DataDistribution$ (281592085ed7f391ab59b83a53c40db3) 
switched from state RESTARTING to SUSPENDED.
org.apache.flink.util.FlinkException: JobManager is no longer the leader.
Unable to canonicalize address zookeeper:2181 because it's not resolvable.

15:00:55.694   closing socket connection and attempting reconnect
15:00:57.657   State change: RECONNECTED
15:00:57.739  Connection to ZooKeeper was reconnected. Leader retrieval can be 
restarted.  
15:00:57.740   Connection to ZooKeeper was reconnected. Leader election can be 
restarted.   
{code}


> Why is “UNDEFINED” defined in the Flink task status?
> 
>
> Key: FLINK-33483
> URL: https://issues.apache.org/jira/browse/FLINK-33483
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / RPC, Runtime / Task
>Affects Versions: 1.12.2
>Reporter: Xin Chen
>Priority: Major
> Attachments: container_e15_1693914709123_8498_01_01_8042
>
>
> In the Flink on Yarn mode, if an unknown status appears in the Flink log, 
> jm(jobmanager) will report the task status as undefined. The Yarn page will 
> display the state as FINISHED, but the final status is *UNDEFINED*. In terms 
> of business, it is unknown whether the task has failed or succeeded, and 
> whether to retry. It has a certain impact. Why should we design UNDEFINED? 
> Usually, this situation occurs due to zk(zookeeper) disconnection or jm 
> abnormality, etc. Since the abnormality is present, why not use FAILED?
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33483) Why is “UNDEFINED” defined in the Flink task status?

2023-11-08 Thread Xin Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33483?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xin Chen updated FLINK-33483:
-
Attachment: container_e15_1693914709123_8498_01_01_8042

> Why is “UNDEFINED” defined in the Flink task status?
> 
>
> Key: FLINK-33483
> URL: https://issues.apache.org/jira/browse/FLINK-33483
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / RPC, Runtime / Task
>Affects Versions: 1.12.2
>Reporter: Xin Chen
>Priority: Major
> Attachments: container_e15_1693914709123_8498_01_01_8042
>
>
> In the Flink on Yarn mode, if an unknown status appears in the Flink log, 
> jm(jobmanager) will report the task status as undefined. The Yarn page will 
> display the state as FINISHED, but the final status is *UNDEFINED*. In terms 
> of business, it is unknown whether the task has failed or succeeded, and 
> whether to retry. It has a certain impact. Why should we design UNDEFINED? 
> Usually, this situation occurs due to zk(zookeeper) disconnection or jm 
> abnormality, etc. Since the abnormality is present, why not use FAILED?
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-33257][connectors/mongodb] Support filter pushdown in MongoDB connector [flink-connector-mongodb]

2023-11-08 Thread via GitHub


Jiabao-Sun commented on PR #17:
URL: 
https://github.com/apache/flink-connector-mongodb/pull/17#issuecomment-1801742466

   Thanks @xuyangzhong and @leonardBang for the review.
   Could you help review it again when you have time?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


