[GitHub] [flink-connector-aws] PatrickRen commented on a diff in pull request #49: [FLINK-24438] Add Kinesis connector using FLIP-27

2023-05-15 Thread via GitHub


PatrickRen commented on code in PR #49:
URL: https://github.com/apache/flink-connector-aws/pull/49#discussion_r1194559931


##
flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/enumerator/assigner/ShardAssignerFactory.java:
##
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kinesis.source.enumerator.assigner;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.connector.kinesis.source.enumerator.KinesisShardAssigner;
+
+/**
+ * Factory that provides an instance of {@link KinesisShardAssigner} pre-packaged with the
+ * connector.
+ */
+@Experimental
+public class ShardAssignerFactory {
+    private ShardAssignerFactory() {
+        // No-op private constructor to prevent instantiation of class
+    }
+
+    public static KinesisShardAssigner uniformShardAssigner() {
+        return new UniformShardAssigner();
+    }
+
+    public static KinesisShardAssigner hashShardAssigner() {

Review Comment:
   It looks like the `HashShardAssigner` is only used in tests. What about 
marking it as `@VisibleForTesting`, or exposing it to users via a configuration option?
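   
   For illustration, a rough sketch of the `@VisibleForTesting` option (a sketch only, assuming the factory shape in this diff):
   
   ```java
   // Hypothetical sketch: keep the hash assigner out of the public factory
   // surface and mark it as test-only instead.
   @VisibleForTesting
   static KinesisShardAssigner hashShardAssigner() {
       return new HashShardAssigner();
   }
   ```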



##
flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/PollingKinesisShardSplitReader.java:
##
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kinesis.source.reader;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
+import org.apache.flink.connector.kinesis.source.proxy.StreamProxy;
+import org.apache.flink.connector.kinesis.source.split.KinesisShardSplit;
+import org.apache.flink.connector.kinesis.source.split.KinesisShardSplitState;
+import org.apache.flink.connector.kinesis.source.split.StartingPosition;
+
+import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableSet;
+
+import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
+import software.amazon.awssdk.services.kinesis.model.Record;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.Iterator;
+import java.util.Set;
+
+/**
+ * An implementation of the SplitReader that periodically polls the Kinesis stream to retrieve
+ * records.
+ */
+@Internal
+public class PollingKinesisShardSplitReader implements SplitReader<Record, KinesisShardSplit> {
+
+    private static final RecordsWithSplitIds<Record> INCOMPLETE_SHARD_EMPTY_RECORDS =
+            new KinesisRecordsWithSplitIds(Collections.emptyIterator(), null, false);
+
+    private final StreamProxy kinesis;
+    private final Deque<KinesisShardSplitState> assignedSplits = new ArrayDeque<>();
+
+    public PollingKinesisShardSplitReader(StreamProxy kinesisProxy) {
+        this.kinesis = kinesisProxy;
+    }
+
+    @Override
+    public RecordsWithSplitIds<Record> fetch() throws IOException {
+        KinesisShardSplitState splitState = assignedSplits.poll();
+        if (splitState == null) {
+            return INCOMPLETE_SHARD_EMPTY_RECORDS;
+        }
+
+        GetRecordsResponse getRecordsResponse =
+                kinesis.getRecords(
+

[GitHub] [flink] XComp commented on pull request #22380: [FLINK-31773][runtime] Separate DefaultLeaderElectionService.start(LeaderContender) into two separate methods for starting the driver and regis

2023-05-15 Thread via GitHub


XComp commented on PR #22380:
URL: https://github.com/apache/flink/pull/22380#issuecomment-1549085766

   The e2e test failed due to some (temporary) network connectivity issues 
within the Docker image (see 
[logs](https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=49011&view=logs&j=bea52777-eaf8-5663-8482-18fbc3630e81&t=43ba8ce7-ebbf-57cd-9163-444305d74117&l=10378))


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] XComp commented on pull request #22380: [FLINK-31773][runtime] Separate DefaultLeaderElectionService.start(LeaderContender) into two separate methods for starting the driver and regis

2023-05-15 Thread via GitHub


XComp commented on PR #22380:
URL: https://github.com/apache/flink/pull/22380#issuecomment-1549084955

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (FLINK-32069) jobClient.getJobStatus() can return status RUNNING for finished insert operation

2023-05-15 Thread Matthias Pohl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17723006#comment-17723006
 ] 

Matthias Pohl edited comment on FLINK-32069 at 5/16/23 6:48 AM:


From the 1.15 finding above, the issue might be that we're relying on the 
{{ExecutionGraphCache}} in the {{JobDetailsHandler}}, which might have an 
outdated version of the {{ExecutionGraph}} when returning the {{JobStatus}}. 
Does that sound reasonable?

The cache TTL is configurable through 
[web.refresh-interval|https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/config/#web-refresh-interval]
 which is set to 3s by default.

Relying on the cache was removed in 1.16.0 with FLINK-26641. Can you confirm 
that you've seen this behavior as well in {{1.16.1}}? Because that would mean 
that my conclusion doesn't hold.
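
A hedged sketch of tightening that TTL programmatically (assuming the {{WebOptions}} key; the flink-conf.yaml entry {{web.refresh-interval}} is equivalent):

{code:java}
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.WebOptions;

Configuration config = new Configuration();
// Default is 3000 ms; a smaller value narrows (but does not close) the
// window in which the cached ExecutionGraph can report a stale JobStatus.
config.set(WebOptions.REFRESH_INTERVAL, 1000L);
{code}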


was (Author: mapohl):
From the 1.15 finding above, the issue might be that we're relying on the 
{{ExecutionGraphCache}} in the {{JobDetailsHandler}}, which might have an 
outdated version of the {{ExecutionGraph}} when returning the {{JobStatus}}. 
Does that sound reasonable?

The cache TTL is configurable through 
[web.refresh-interval|https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/config/#web-refresh-interval]
 which is set to 3s by default.

> jobClient.getJobStatus() can return status RUNNING for finished insert 
> operation
> 
>
> Key: FLINK-32069
> URL: https://issues.apache.org/jira/browse/FLINK-32069
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.16.1, 1.15.4
>Reporter: Aleksandr Iushmanov
>Priority: Major
>
> Using Zeppelin with a remote cluster, I came across a race-condition issue 
> leading to failed expectations for SQL insert operations. 
>  
> Below is an example of Zeppelin code that is failing because 
> jobClient.getJobStatus() returns RUNNING even after the job has finished. I 
> have verified that the same failure can happen if I use 
> jobClient.getJobExecutionResult().get() (the job execution result is "Program 
> execution finished", but the job status is not consistently FINISHED).
> {code:java}
> TableResult tableResult = ((TableEnvironmentInternal) 
> tbenv).executeInternal(operations);
> checkState(tableResult.getJobClient().isPresent());
> try {
>   tableResult.await();
>   JobClient jobClient = tableResult.getJobClient().get();
>   if (jobClient.getJobStatus().get() == JobStatus.FINISHED) {
> context.out.write("Insertion successfully.\n");
>   } else {
> throw new IOException("Job is failed, " + 
> jobClient.getJobExecutionResult().get().toString());
>   }
> } catch (InterruptedException e) {
>   throw new IOException("Flink job is interrupted", e);
> } catch (ExecutionException e) {
>   throw new IOException("Flink job is failed", e);
> } {code}
>  ZeppelinCode: 
> [https://github.com/apache/zeppelin/blob/master/flink/flink1.15-shims/src/main/java/org/apache/zeppelin/flink/Flink115SqlInterpreter.java#L384]
> I suspect that the job status is returned based on the runningJobsRegistry, 
> and since 1.15 this registry is not updated with the FINISHED status prior to 
> completion of the job-result future; see this change in 
> {{JobMasterServiceLeadershipRunner.java}}: 
> [https://github.com/apache/flink/pull/18189/files#diff-3eb433f18b85c0f5329a4b312a219583189d777fe9bdd547f1114f4a22989f8bL387]
>  
>  
> It looks like a race condition that is hard to reproduce on a lightweight 
> setup. I was reproducing this by running a Zeppelin notebook against a remote 
> Flink cluster and triggering a SQL insert operation. If I find a smaller setup 
> to reproduce it on a small local cluster with a lightweight client, I will 
> update this ticket when I have more input. I am open to suggestions on how to 
> fix this.
>  
> For Zeppelin I have a separate ticket, because Flink 1.15 is not going to be 
> fixed; but this issue, if I understand it correctly, should be common to all 
> versions starting with 1.15, so it makes sense to address it starting with 
> 1.16. https://issues.apache.org/jira/browse/ZEPPELIN-5909
>  
> [~mapohl], thank you for the assistance in Slack. I have created this ticket 
> to back our conversation; could you please add your thoughts on this failure 
> mode?
>  
> One possible solution would be to add a check for the presence of a JobResult 
> in the result store before returning the job status (if there is a result, 
> the job shouldn't be reported as RUNNING, based on this documentation: 
> [https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/table/api/TableResult.html#await--])



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32069) jobClient.getJobStatus() can return status RUNNING for finished insert operation

2023-05-15 Thread Matthias Pohl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17723006#comment-17723006
 ] 

Matthias Pohl commented on FLINK-32069:
---

From the 1.15 finding above, the issue might be that we're relying on the 
{{ExecutionGraphCache}} in the {{JobDetailsHandler}}, which might have an 
outdated version of the {{ExecutionGraph}} when returning the {{JobStatus}}. 
Does that sound reasonable?

The cache TTL is configurable through 
[web.refresh-interval|https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/config/#web-refresh-interval]
 which is set to 3s by default.

> jobClient.getJobStatus() can return status RUNNING for finished insert 
> operation
> 
>
> Key: FLINK-32069
> URL: https://issues.apache.org/jira/browse/FLINK-32069
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.16.1, 1.15.4
>Reporter: Aleksandr Iushmanov
>Priority: Major
>
> Using Zeppelin with a remote cluster, I came across a race-condition issue 
> leading to failed expectations for SQL insert operations. 
>  
> Below is an example of Zeppelin code that is failing because 
> jobClient.getJobStatus() returns RUNNING even after the job has finished. I 
> have verified that the same failure can happen if I use 
> jobClient.getJobExecutionResult().get() (the job execution result is "Program 
> execution finished", but the job status is not consistently FINISHED).
> {code:java}
> TableResult tableResult = ((TableEnvironmentInternal) 
> tbenv).executeInternal(operations);
> checkState(tableResult.getJobClient().isPresent());
> try {
>   tableResult.await();
>   JobClient jobClient = tableResult.getJobClient().get();
>   if (jobClient.getJobStatus().get() == JobStatus.FINISHED) {
> context.out.write("Insertion successfully.\n");
>   } else {
> throw new IOException("Job is failed, " + 
> jobClient.getJobExecutionResult().get().toString());
>   }
> } catch (InterruptedException e) {
>   throw new IOException("Flink job is interrupted", e);
> } catch (ExecutionException e) {
>   throw new IOException("Flink job is failed", e);
> } {code}
>  ZeppelinCode: 
> [https://github.com/apache/zeppelin/blob/master/flink/flink1.15-shims/src/main/java/org/apache/zeppelin/flink/Flink115SqlInterpreter.java#L384]
> I suspect that the job status is returned based on the runningJobsRegistry, 
> and since 1.15 this registry is not updated with the FINISHED status prior to 
> completion of the job-result future; see this change in 
> {{JobMasterServiceLeadershipRunner.java}}: 
> [https://github.com/apache/flink/pull/18189/files#diff-3eb433f18b85c0f5329a4b312a219583189d777fe9bdd547f1114f4a22989f8bL387]
>  
>  
> It looks like a race condition that is hard to reproduce on a lightweight 
> setup. I was reproducing this by running a Zeppelin notebook against a remote 
> Flink cluster and triggering a SQL insert operation. If I find a smaller setup 
> to reproduce it on a small local cluster with a lightweight client, I will 
> update this ticket when I have more input. I am open to suggestions on how to 
> fix this.
>  
> For Zeppelin I have a separate ticket, because Flink 1.15 is not going to be 
> fixed; but this issue, if I understand it correctly, should be common to all 
> versions starting with 1.15, so it makes sense to address it starting with 
> 1.16. https://issues.apache.org/jira/browse/ZEPPELIN-5909
>  
> [~mapohl], thank you for the assistance in Slack. I have created this ticket 
> to back our conversation; could you please add your thoughts on this failure 
> mode?
>  
> One possible solution would be to add a check for the presence of a JobResult 
> in the result store before returning the job status (if there is a result, 
> the job shouldn't be reported as RUNNING, based on this documentation: 
> [https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/table/api/TableResult.html#await--])



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-31759) Update some external connectors to use sql_connector_download_table and connector_artifact shortcode

2023-05-15 Thread Weijie Guo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31759?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weijie Guo resolved FLINK-31759.

Resolution: Fixed

> Update some external connectors to use sql_connector_download_table and 
> connector_artifact shortcode
> 
>
> Key: FLINK-31759
> URL: https://issues.apache.org/jira/browse/FLINK-31759
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Connectors / ElasticSearch, Connectors / Google Cloud 
> PubSub, Connectors / HBase, Connectors / JDBC, Connectors / RabbitMQ
>Reporter: Weijie Guo
>Assignee: Weijie Guo
>Priority: Major
>  Labels: pull-request-available
> Fix For: jdbc-3.2.0
>
>
> # FLINK-30291 introduced a new shortcode ({{sql_connector_download_table}}) 
> for the table documentation of externalized connectors. Some externalized 
> connectors do not use it yet, including: elasticsearch, hbase, jdbc.
>  # Some externalized connectors do not use the {{connector_artifact}} 
> shortcode for the DataStream documentation, including: jdbc, rabbitmq, 
> gcp-pubsub.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-32069) jobClient.getJobStatus() can return status RUNNING for finished insert operation

2023-05-15 Thread Matthias Pohl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17722998#comment-17722998
 ] 

Matthias Pohl edited comment on FLINK-32069 at 5/16/23 6:18 AM:


Ok, I was too lazy to switch to the 1.15 branch because I assumed that the code 
didn't change that much (and I wanted to avoid running into compilation issues 
again locally due to switching branches). But it looks like the code did change. 
I'm going to do the code analysis once more for {{release-1.15.4}} (assuming 
that that's the version you're using).

{{jobClient.getJobStatus}}:
* Client side:
** {{ClusterClientJobClientAdapter.getJobStatus}} > 
{{RestClusterClient.getJobStatus}} > {{RestClusterClient.requestJobStatus}} > 
{{RestClusterClient.getJobDetails}} > {{JobDetailsHeaders}} {{GET /jobs/:jobId}}
* Server side:
** {{JobDetailsHandler.handleRequest}} is called which relies on the 
{{ExecutionGraphCache}} instead of accessing the {{Dispatcher}} directly (like 
it's done in the {{master}} code with {{JobStatusHandler}})

For {{jobClient.getJobExecutionResult()}}, I find the following (same as in 
{{master}}):
* Client side:
** {{ClusterClientJobClientAdapter.getJobExecutionResult}} > 
{{RestClusterClient.requestJobResult}} > 
{{RestClusterClient.requestJobResultInternal}} > {{JobExecutionResultHeaders}} 
{{GET /jobs/:jobId/execution-result}}
* Server side:
** {{JobExecutionResultHandler.handleRequest}} > {{Dispatcher.requestJobStatus}}

I'm missing the path, though, where we use {{requestMultipleJobDetails}}. Am I 
still looking at the wrong version of Flink?
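
A hedged sketch of the client-side entry point under discussion (cluster and client setup omitted):

{code:java}
// jobClient.getJobStatus() boils down to this REST round trip, which on
// 1.15 is served from the ExecutionGraphCache on the server side.
CompletableFuture<JobStatus> status = restClusterClient.getJobStatus(jobId);
{code}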


was (Author: mapohl):
Ok, I was too lazy to switch to the 1.15 branch because I assumed that the code 
didn't change that much (and I wanted to avoid running into compilation issues 
again locally due to switching branches). But it looks like the code did change. 
I'm going to do the code analysis once more for {{release-1.15.4}} (assuming 
that that's the version you're using).

> jobClient.getJobStatus() can return status RUNNING for finished insert 
> operation
> 
>
> Key: FLINK-32069
> URL: https://issues.apache.org/jira/browse/FLINK-32069
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.16.1, 1.15.4
>Reporter: Aleksandr Iushmanov
>Priority: Major
>
> Using Zeppelin with a remote cluster, I came across a race-condition issue 
> leading to failed expectations for SQL insert operations. 
>  
> Below is an example of Zeppelin code that is failing because 
> jobClient.getJobStatus() returns RUNNING even after the job has finished. I 
> have verified that the same failure can happen if I use 
> jobClient.getJobExecutionResult().get() (the job execution result is "Program 
> execution finished", but the job status is not consistently FINISHED).
> {code:java}
> TableResult tableResult = ((TableEnvironmentInternal) 
> tbenv).executeInternal(operations);
> checkState(tableResult.getJobClient().isPresent());
> try {
>   tableResult.await();
>   JobClient jobClient = tableResult.getJobClient().get();
>   if (jobClient.getJobStatus().get() == JobStatus.FINISHED) {
> context.out.write("Insertion successfully.\n");
>   } else {
> throw new IOException("Job is failed, " + 
> jobClient.getJobExecutionResult().get().toString());
>   }
> } catch (InterruptedException e) {
>   throw new IOException("Flink job is interrupted", e);
> } catch (ExecutionException e) {
>   throw new IOException("Flink job is failed", e);
> } {code}
>  ZeppelinCode: 
> [https://github.com/apache/zeppelin/blob/master/flink/flink1.15-shims/src/main/java/org/apache/zeppelin/flink/Flink115SqlInterpreter.java#L384]
> I suspect that the job status is returned based on the runningJobsRegistry, 
> and since 1.15 this registry is not updated with the FINISHED status prior to 
> completion of the job-result future; see this change in 
> {{JobMasterServiceLeadershipRunner.java}}: 
> [https://github.com/apache/flink/pull/18189/files#diff-3eb433f18b85c0f5329a4b312a219583189d777fe9bdd547f1114f4a22989f8bL387]
>  
>  
> It looks like a race condition that is hard to reproduce on a lightweight 
> setup. I was reproducing this by running a Zeppelin notebook against a remote 
> Flink cluster and triggering a SQL insert operation. If I find a smaller setup 
> to reproduce it on a small local cluster with a lightweight client, I will 
> update this ticket when I have more input. I am open to suggestions on how to 
> fix this.
>  
> For Zeppelin I have a separate ticket, because Flink 1.15 is not going to be 
> fixed; but this issue, if I understand it correctly, should be common to all 
> versions starting with 1.15, so it makes sense to address it starting with 
> 1.16. https://issues.apache.org/jira/b

[GitHub] [flink-connector-jdbc] reswqa merged pull request #45: [hotfix] Fix incorrect sql_url in jdbc.yml.

2023-05-15 Thread via GitHub


reswqa merged PR #45:
URL: https://github.com/apache/flink-connector-jdbc/pull/45


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (FLINK-32069) jobClient.getJobStatus() can return status RUNNING for finished insert operation

2023-05-15 Thread Matthias Pohl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17722998#comment-17722998
 ] 

Matthias Pohl edited comment on FLINK-32069 at 5/16/23 5:57 AM:


Ok, I was too lazy to switch to the 1.15 branch because I assumed that the code 
didn't change that much (and I wanted to avoid running into compilation issues 
again locally due to switching branches). But it looks like the code did change. 
I'm going to do the code analysis once more for {{release-1.15.4}} (assuming 
that that's the version you're using).


was (Author: mapohl):
Ok, I was too lazy to switch to the 1.15 branch because I assumed that the code 
didn't change that much (and I wanted to avoid running into compilation issues 
again). But looks like the code did change. I'm going to do the code analysis 
once more for {{release-1.15.4}} (assuming that that's the version you're using.

> jobClient.getJobStatus() can return status RUNNING for finished insert 
> operation
> 
>
> Key: FLINK-32069
> URL: https://issues.apache.org/jira/browse/FLINK-32069
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.16.1, 1.15.4
>Reporter: Aleksandr Iushmanov
>Priority: Major
>
> Using Zeppelin with a remote cluster, I came across a race-condition issue 
> leading to failed expectations for SQL insert operations. 
>  
> Below is an example of Zeppelin code that is failing because 
> jobClient.getJobStatus() returns RUNNING even after the job has finished. I 
> have verified that the same failure can happen if I use 
> jobClient.getJobExecutionResult().get() (the job execution result is "Program 
> execution finished", but the job status is not consistently FINISHED).
> {code:java}
> TableResult tableResult = ((TableEnvironmentInternal) 
> tbenv).executeInternal(operations);
> checkState(tableResult.getJobClient().isPresent());
> try {
>   tableResult.await();
>   JobClient jobClient = tableResult.getJobClient().get();
>   if (jobClient.getJobStatus().get() == JobStatus.FINISHED) {
> context.out.write("Insertion successfully.\n");
>   } else {
> throw new IOException("Job is failed, " + 
> jobClient.getJobExecutionResult().get().toString());
>   }
> } catch (InterruptedException e) {
>   throw new IOException("Flink job is interrupted", e);
> } catch (ExecutionException e) {
>   throw new IOException("Flink job is failed", e);
> } {code}
>  ZeppelinCode: 
> [https://github.com/apache/zeppelin/blob/master/flink/flink1.15-shims/src/main/java/org/apache/zeppelin/flink/Flink115SqlInterpreter.java#L384]
> I suspect that the job status is returned based on the runningJobsRegistry, 
> and since 1.15 this registry is not updated with the FINISHED status prior to 
> completion of the job-result future; see this change in 
> {{JobMasterServiceLeadershipRunner.java}}: 
> [https://github.com/apache/flink/pull/18189/files#diff-3eb433f18b85c0f5329a4b312a219583189d777fe9bdd547f1114f4a22989f8bL387]
>  
>  
> It looks like a race condition that is hard to reproduce on a lightweight 
> setup. I was reproducing this by running a Zeppelin notebook against a remote 
> Flink cluster and triggering a SQL insert operation. If I find a smaller setup 
> to reproduce it on a small local cluster with a lightweight client, I will 
> update this ticket when I have more input. I am open to suggestions on how to 
> fix this.
>  
> For Zeppelin I have a separate ticket, because Flink 1.15 is not going to be 
> fixed; but this issue, if I understand it correctly, should be common to all 
> versions starting with 1.15, so it makes sense to address it starting with 
> 1.16. https://issues.apache.org/jira/browse/ZEPPELIN-5909
>  
> [~mapohl], thank you for the assistance in Slack. I have created this ticket 
> to back our conversation; could you please add your thoughts on this failure 
> mode?
>  
> One possible solution would be to add a check for the presence of a JobResult 
> in the result store before returning the job status (if there is a result, 
> the job shouldn't be reported as RUNNING, based on this documentation: 
> [https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/table/api/TableResult.html#await--])



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-32104) stop-with-savepoint fails and times out with simple reproducible example

2023-05-15 Thread Weijie Guo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32104?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17722993#comment-17722993
 ] 

Weijie Guo edited comment on FLINK-32104 at 5/16/23 5:55 AM:
-

Hi Kurt, 

I have tested the uploaded job without any code change; the native savepoint 
completed successfully on my own MacBook. I suspect that the timeout comes from 
the prolonged sleep in {{DemoMapFunction}}.

{{NumberSequenceSource}} will finish soon after the job is submitted, and the 
records (i.e. 1...20) will accumulate in the downstream's input buffers. The 
savepoint will be inserted as the last record, so the preceding data processing 
takes {{20 * 5 = 100s}}, while {{client.timeout}} defaults to only 60s. If the 
savepoint is submitted before most of the data has been processed, there is a 
high probability that it will simply time out. Have you tried increasing the 
timeout here?
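
A hedged sketch of raising it (plain string key; the same value can be set as {{client.timeout}} in flink-conf.yaml):

{code:java}
import org.apache.flink.configuration.Configuration;

Configuration config = new Configuration();
// Allow more than the expected drain time of 20 records * 5s sleep = 100s
// before the stop-with-savepoint call times out on the client.
config.setString("client.timeout", "180 s");
{code}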



was (Author: weijie guo):
Hi Kurt, 

I have tested the uploaded job without any code change; the native savepoint 
completed successfully on my own MacBook. I suspect that the timeout comes from 
the prolonged sleep in {{DemoMapFunction}}.

{{NumberSequenceSource}} will finish soon after the job is submitted, and the 
records (i.e. 1...20) will accumulate in the downstream's input buffers. The 
savepoint will be inserted as the last record, so the preceding data processing 
takes {{20 * 5 = 100s}}, while {{client.timeout}} defaults to only 60s. If the 
savepoint is submitted before most of the data has been processed, there is a 
high probability that it will simply time out. Have you tried increasing the 
timeout here?


> stop-with-savepoint fails and times out with simple reproducible example
> 
>
> Key: FLINK-32104
> URL: https://issues.apache.org/jira/browse/FLINK-32104
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Affects Versions: 1.17.0
>Reporter: Kurt Ostfeld
>Priority: Major
>
> I've put together a simple demo app that reproduces the issue with 
> instructions on how to reproduce:
> [https://github.com/kurtostfeld/flink-stop-issue]
>  
> The issue is that with a very simple Flink DataStream API application, the 
> `stop-with-savepoint` fails and times out like this:
>  
> {code:java}
> ./bin/flink stop --type native --savepointPath ../savepoints 
> d69a952625497cca0665dfdcdb9f4718
> Suspending job "d69a952625497cca0665dfdcdb9f4718" with a NATIVE savepoint.
> 
>  The program finished with the following exception:
> org.apache.flink.util.FlinkException: Could not stop with a savepoint job 
> "d69a952625497cca0665dfdcdb9f4718".
> at 
> org.apache.flink.client.cli.CliFrontend.lambda$stop$4(CliFrontend.java:595)
> at 
> org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:1041)
> at org.apache.flink.client.cli.CliFrontend.stop(CliFrontend.java:578)
> at 
> org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1110)
> at 
> org.apache.flink.client.cli.CliFrontend.lambda$mainInternal$9(CliFrontend.java:1189)
> at 
> org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
> at 
> org.apache.flink.client.cli.CliFrontend.mainInternal(CliFrontend.java:1189)
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1157)
> Caused by: java.util.concurrent.TimeoutException
> at 
> java.base/java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1886)
> at 
> java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2021)
> at 
> org.apache.flink.client.cli.CliFrontend.lambda$stop$4(CliFrontend.java:591)
> ... 7 more {code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32069) jobClient.getJobStatus() can return status RUNNING for finished insert operation

2023-05-15 Thread Matthias Pohl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17722998#comment-17722998
 ] 

Matthias Pohl commented on FLINK-32069:
---

Ok, I was too lazy to switch to the 1.15 branch because I assumed that the code 
didn't change that much (and I wanted to avoid running into compilation issues 
again). But it looks like the code did change. I'm going to do the code analysis 
once more for {{release-1.15.4}} (assuming that that's the version you're using).

> jobClient.getJobStatus() can return status RUNNING for finished insert 
> operation
> 
>
> Key: FLINK-32069
> URL: https://issues.apache.org/jira/browse/FLINK-32069
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.16.1, 1.15.4
>Reporter: Aleksandr Iushmanov
>Priority: Major
>
> Using Zeppelin with a remote cluster, I came across a race-condition issue 
> leading to failed expectations for SQL insert operations. 
>  
> Below is an example of Zeppelin code that is failing because 
> jobClient.getJobStatus() returns RUNNING even after the job has finished. I 
> have verified that the same failure can happen if I use 
> jobClient.getJobExecutionResult().get() (the job execution result is "Program 
> execution finished", but the job status is not consistently FINISHED).
> {code:java}
> TableResult tableResult = ((TableEnvironmentInternal) 
> tbenv).executeInternal(operations);
> checkState(tableResult.getJobClient().isPresent());
> try {
>   tableResult.await();
>   JobClient jobClient = tableResult.getJobClient().get();
>   if (jobClient.getJobStatus().get() == JobStatus.FINISHED) {
> context.out.write("Insertion successfully.\n");
>   } else {
> throw new IOException("Job is failed, " + 
> jobClient.getJobExecutionResult().get().toString());
>   }
> } catch (InterruptedException e) {
>   throw new IOException("Flink job is interrupted", e);
> } catch (ExecutionException e) {
>   throw new IOException("Flink job is failed", e);
> } {code}
>  ZeppelinCode: 
> [https://github.com/apache/zeppelin/blob/master/flink/flink1.15-shims/src/main/java/org/apache/zeppelin/flink/Flink115SqlInterpreter.java#L384]
> I suspect that the job status is returned based on the runningJobsRegistry, 
> and since 1.15 this registry is not updated with the FINISHED status prior to 
> completion of the job-result future; see this change in 
> {{JobMasterServiceLeadershipRunner.java}}: 
> [https://github.com/apache/flink/pull/18189/files#diff-3eb433f18b85c0f5329a4b312a219583189d777fe9bdd547f1114f4a22989f8bL387]
>  
>  
> It looks like a race condition that is hard to reproduce on a lightweight 
> setup. I was reproducing this by running a Zeppelin notebook against a remote 
> Flink cluster and triggering a SQL insert operation. If I find a smaller setup 
> to reproduce it on a small local cluster with a lightweight client, I will 
> update this ticket when I have more input. I am open to suggestions on how to 
> fix this.
>  
> For Zeppelin I have a separate ticket, because Flink 1.15 is not going to be 
> fixed; but this issue, if I understand it correctly, should be common to all 
> versions starting with 1.15, so it makes sense to address it starting with 
> 1.16. https://issues.apache.org/jira/browse/ZEPPELIN-5909
>  
> [~mapohl], thank you for the assistance in Slack. I have created this ticket 
> to back our conversation; could you please add your thoughts on this failure 
> mode?
>  
> One possible solution would be to add a check for the presence of a JobResult 
> in the result store before returning the job status (if there is a result, 
> the job shouldn't be reported as RUNNING, based on this documentation: 
> [https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/table/api/TableResult.html#await--])



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-32104) stop-with-savepoint fails and times out with simple reproducible example

2023-05-15 Thread Weijie Guo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32104?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17722993#comment-17722993
 ] 

Weijie Guo edited comment on FLINK-32104 at 5/16/23 5:49 AM:
-

Hi Kurt, 

I have tested the uploaded job without any code change; the native savepoint 
completed successfully on my own MacBook. I suspect that the timeout comes from 
the prolonged sleep in {{DemoMapFunction}}.

{{NumberSequenceSource}} will finish soon after the job is submitted, and the 
records (i.e. 1...20) will accumulate in the downstream's input buffers. The 
savepoint will be inserted as the last record, so the preceding data processing 
takes {{20 * 5 = 100s}}, while {{client.timeout}} defaults to only 60s. If the 
savepoint is submitted before most of the data has been processed, there is a 
high probability that it will simply time out. Have you tried increasing the 
timeout here?



was (Author: weijie guo):
Hi Kurt, 

I have tested the uploaded job without any code change; the native savepoint 
completed successfully on my own MacBook. I suspect that the timeout comes from 
the prolonged sleep in {{DemoMapFunction}}.

{{NumberSequenceSource}} will finish soon after the job is submitted, and the 
records (i.e. 1...20) will accumulate in the downstream's input buffers. The 
savepoint will be inserted as the last record, so the preceding data processing 
takes {{20 * 5 = 100s}}, while {{client.timeout}} defaults to only 60s. If the 
savepoint is submitted before most of the data has been processed, there is a 
high probability that it will simply time out.


> stop-with-savepoint fails and times out with simple reproducible example
> 
>
> Key: FLINK-32104
> URL: https://issues.apache.org/jira/browse/FLINK-32104
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Affects Versions: 1.17.0
>Reporter: Kurt Ostfeld
>Priority: Major
>
> I've put together a simple demo app that reproduces the issue with 
> instructions on how to reproduce:
> [https://github.com/kurtostfeld/flink-stop-issue]
>  
> The issue is that with a very simple Flink DataStream API application, the 
> `stop-with-savepoint` fails and times out like this:
>  
> {code:java}
> ./bin/flink stop --type native --savepointPath ../savepoints 
> d69a952625497cca0665dfdcdb9f4718
> Suspending job "d69a952625497cca0665dfdcdb9f4718" with a NATIVE savepoint.
> 
>  The program finished with the following exception:
> org.apache.flink.util.FlinkException: Could not stop with a savepoint job 
> "d69a952625497cca0665dfdcdb9f4718".
> at 
> org.apache.flink.client.cli.CliFrontend.lambda$stop$4(CliFrontend.java:595)
> at 
> org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:1041)
> at org.apache.flink.client.cli.CliFrontend.stop(CliFrontend.java:578)
> at 
> org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1110)
> at 
> org.apache.flink.client.cli.CliFrontend.lambda$mainInternal$9(CliFrontend.java:1189)
> at 
> org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
> at 
> org.apache.flink.client.cli.CliFrontend.mainInternal(CliFrontend.java:1189)
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1157)
> Caused by: java.util.concurrent.TimeoutException
> at 
> java.base/java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1886)
> at 
> java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2021)
> at 
> org.apache.flink.client.cli.CliFrontend.lambda$stop$4(CliFrontend.java:591)
> ... 7 more {code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32104) stop-with-savepoint fails and times out with simple reproducible example

2023-05-15 Thread Weijie Guo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32104?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17722993#comment-17722993
 ] 

Weijie Guo commented on FLINK-32104:


Hi Kurt, 

I have tested the uploaded job without any code change; the native savepoint 
completed successfully on my own MacBook. I suspect that the timeout comes from 
the prolonged sleep in {{DemoMapFunction}}.

{{NumberSequenceSource}} will finish soon after the job is submitted, and the 
records (i.e. 1...20) will accumulate in the downstream's input buffers. The 
savepoint will be inserted as the last record, so the preceding data processing 
takes {{20 * 5 = 100s}}, while {{client.timeout}} defaults to only 60s. If the 
savepoint is submitted before most of the data has been processed, there is a 
high probability that it will simply time out.


> stop-with-savepoint fails and times out with simple reproducible example
> 
>
> Key: FLINK-32104
> URL: https://issues.apache.org/jira/browse/FLINK-32104
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Affects Versions: 1.17.0
>Reporter: Kurt Ostfeld
>Priority: Major
>
> I've put together a simple demo app that reproduces the issue with 
> instructions on how to reproduce:
> [https://github.com/kurtostfeld/flink-stop-issue]
>  
> The issue is that with a very simple Flink DataStream API application, the 
> `stop-with-savepoint` fails and times out like this:
>  
> {code:java}
> ./bin/flink stop --type native --savepointPath ../savepoints 
> d69a952625497cca0665dfdcdb9f4718
> Suspending job "d69a952625497cca0665dfdcdb9f4718" with a NATIVE savepoint.
> 
>  The program finished with the following exception:
> org.apache.flink.util.FlinkException: Could not stop with a savepoint job 
> "d69a952625497cca0665dfdcdb9f4718".
> at 
> org.apache.flink.client.cli.CliFrontend.lambda$stop$4(CliFrontend.java:595)
> at 
> org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:1041)
> at org.apache.flink.client.cli.CliFrontend.stop(CliFrontend.java:578)
> at 
> org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1110)
> at 
> org.apache.flink.client.cli.CliFrontend.lambda$mainInternal$9(CliFrontend.java:1189)
> at 
> org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
> at 
> org.apache.flink.client.cli.CliFrontend.mainInternal(CliFrontend.java:1189)
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1157)
> Caused by: java.util.concurrent.TimeoutException
> at 
> java.base/java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1886)
> at 
> java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2021)
> at 
> org.apache.flink.client.cli.CliFrontend.lambda$stop$4(CliFrontend.java:591)
> ... 7 more {code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-ml] lindong28 commented on a diff in pull request #237: [Flink-27826] Support training very high dimensional logistic regression

2023-05-15 Thread via GitHub


lindong28 commented on code in PR #237:
URL: https://github.com/apache/flink-ml/pull/237#discussion_r1194570809


##
flink-ml-lib/src/main/java/org/apache/flink/ml/common/updater/ModelUpdater.java:
##
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.common.updater;
+
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+
+import java.io.Serializable;
+import java.util.Iterator;
+
+/**
+ * A model updater that could be used to handle push/pull request from workers.
+ *
+ * <p>Note that model updater should also ensure that model data is robust to failures.
+ */
+public interface ModelUpdater extends Serializable {
+
+    /** Initialize the model data. */
+    void open(long startFeatureIndex, long endFeatureIndex);
+
+    /** Applies the push to update the model data, e.g., using gradient to update model. */
+    void handlePush(long[] keys, double[] values);
+
+    /** Applies the pull and return the retrieved model data. */
+    double[] handlePull(long[] keys);
+
+    /** Returns model pieces with the format of (startFeatureIdx, endFeatureIdx, modelValues). */
+    Iterator<Tuple3<Long, Long, double[]>> getModelPieces();
+
+    /** Recover the model data from state. */

Review Comment:
   It would be useful to make the comment style consistent. E.g. Recover -> 
Recovers.
   
   Same for other comments.
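   
   A sketch of the suggested style (the wording and the method signature below are assumed, based on the imports in this file):
   
   ```java
   /** Recovers the model data from state. */
   void initializeState(StateInitializationContext context) throws Exception;
   ```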



##
flink-ml-lib/src/main/java/org/apache/flink/ml/common/updater/ModelUpdater.java:
##
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.common.updater;
+
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+
+import java.io.Serializable;
+import java.util.Iterator;
+
+/**
+ * A model updater that could be used to handle push/pull request from workers.
+ *
+ * <p>Note that model updater should also ensure that model data is robust to failures.
+ */
+public interface ModelUpdater extends Serializable {
+
+    /** Initialize the model data. */
+    void open(long startFeatureIndex, long endFeatureIndex);
+
+    /** Applies the push to update the model data, e.g., using gradient to update model. */
+    void handlePush(long[] keys, double[] values);
+
+    /** Applies the pull and return the retrieved model data. */
+    double[] handlePull(long[] keys);
+
+    /** Returns model pieces with the format of (startFeatureIdx, endFeatureIdx, modelValues). */
+    Iterator<Tuple3<Long, Long, double[]>> getModelPieces();

Review Comment:
   It would be useful to document what the expected output of this API is 
w.r.t. the invocation of other APIs (e.g. handlePush).
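   
   For example, a hypothetical wording (sketch only):
   
   ```java
   /**
    * Returns the model pieces as (startFeatureIdx, endFeatureIdx, modelValues).
    *
    * <p>The returned iterator must reflect all updates applied by preceding
    * {@code handlePush} invocations on this updater.
    */
   Iterator<Tuple3<Long, Long, double[]>> getModelPieces();
   ```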



##
flink-ml-servable-lib/src/main/java/org/apache/flink/ml/classification/logisticregression/LogisticRegressionModelServable.java:
##
@@ -81,11 +82,49 @@ public DataFrame transform(DataFrame input) {
 public LogisticRegressionModelServable setModelData(InputStream... modelDataInputs)
         throws IOException {
     Preconditions.checkArgument(modelDataInputs.length == 1);
+    List<LogisticRegressionModelData> modelPieces = new ArrayList<>();
+    while (true) {
+        try {
+            LogisticRegressionModelData piece =
+                    LogisticRegressionModelData.decode(modelDataInputs[0]);

[jira] [Comment Edited] (FLINK-32069) jobClient.getJobStatus() can return status RUNNING for finished insert operation

2023-05-15 Thread Matthias Pohl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17722985#comment-17722985
 ] 

Matthias Pohl edited comment on FLINK-32069 at 5/16/23 5:43 AM:


For the client side, I'm not 100% sure whether your call hierarchy is correct. 
Based on the (master) code, I can find the following call hierarchies for the 
{{jobClient.getJobStatus()}}:
 * Client side: 
 ** {{ClusterClientJobClientAdapter.getJobStatus}} > 
{{RestClusterClient.getJobStatus}} > {{RestClusterClient.requestJobStatus}} > 
{{GET /jobs/:jobId/status}}
 * Server side:
 ** {{JobStatusHandler.handleRequest}} > {{Dispatcher.requestJobStatus}}

For the {{jobClient.getJobExecutionResult()}} call, I find the following call 
hierarchy:
* Client side:
** {{ClusterClientJobClientAdapter.getJobExecutionResult}} > 
{{RestClusterClient.requestJobResult}} > 
{{RestClusterClient.requestJobResultInternal}} > {{GET 
/jobs/:jobId/execution-result}}
* Server side:
** {{JobExecutionResultHandler.handleRequest}} > {{Dispatcher.requestJobStatus}}

Both call hierarchies end up in {{Dispatcher.requestJobStatus}}, which looks 
for the {{JobManagerRunner}} and uses the {{ExecutionGraphInfoStore}} as a 
fallback.
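
As a hedged sketch (names simplified; not the actual {{Dispatcher}} code, and assuming a {{jobManagerRunners}} map plus an {{executionGraphInfoStore}} field), that fallback looks roughly like:

{code:java}
CompletableFuture<JobStatus> requestJobStatus(JobID jobId, Time timeout) {
    JobManagerRunner runner = jobManagerRunners.get(jobId);
    if (runner != null) {
        // Running job: ask the JobManagerRunner directly.
        return runner.requestJobStatus(timeout);
    }
    // Completed job: fall back to the store of archived execution graphs.
    ExecutionGraphInfo info = executionGraphInfoStore.get(jobId);
    return info == null
            ? FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobId))
            : CompletableFuture.completedFuture(
                    info.getArchivedExecutionGraph().getState());
}
{code}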

Can you provide debug logs of the case? Additionally, could you share what 
input (i.e. {{operations}} and {{tbenv}}) you use in your runs?


was (Author: mapohl):
Can you provide debug logs of the case? Additionally, could you share what 
input (i.e. {{operations}} and {{tbenv}}) you use in your runs?

> jobClient.getJobStatus() can return status RUNNING for finished insert 
> operation
> 
>
> Key: FLINK-32069
> URL: https://issues.apache.org/jira/browse/FLINK-32069
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.16.1, 1.15.4
>Reporter: Aleksandr Iushmanov
>Priority: Major
>
> Using Zeppelin with a remote cluster, I came across a race-condition issue 
> leading to failed expectations for SQL insert operations. 
>  
> Below is an example of Zeppelin code that is failing because 
> jobClient.getJobStatus() returns RUNNING even after the job has finished. I 
> have verified that the same failure can happen if I use 
> jobClient.getJobExecutionResult().get() (the job execution result is "Program 
> execution finished", but the job status is not consistently FINISHED).
> {code:java}
> TableResult tableResult = ((TableEnvironmentInternal) 
> tbenv).executeInternal(operations);
> checkState(tableResult.getJobClient().isPresent());
> try {
>   tableResult.await();
>   JobClient jobClient = tableResult.getJobClient().get();
>   if (jobClient.getJobStatus().get() == JobStatus.FINISHED) {
> context.out.write("Insertion successfully.\n");
>   } else {
> throw new IOException("Job is failed, " + 
> jobClient.getJobExecutionResult().get().toString());
>   }
> } catch (InterruptedException e) {
>   throw new IOException("Flink job is interrupted", e);
> } catch (ExecutionException e) {
>   throw new IOException("Flink job is failed", e);
> } {code}
>  ZeppelinCode: 
> [https://github.com/apache/zeppelin/blob/master/flink/flink1.15-shims/src/main/java/org/apache/zeppelin/flink/Flink115SqlInterpreter.java#L384]
> I suspect that the job status is returned based on the runningJobsRegistry, 
> and since 1.15 this registry is not updated with the FINISHED status prior to 
> completion of the job-result future; see this change in 
> {{JobMasterServiceLeadershipRunner.java}}: 
> [https://github.com/apache/flink/pull/18189/files#diff-3eb433f18b85c0f5329a4b312a219583189d777fe9bdd547f1114f4a22989f8bL387]
>  
>  
> It looks like a race condition that is hard to reproduce on a lightweight 
> setup. I was reproducing this by running a Zeppelin notebook against a remote 
> Flink cluster and triggering a SQL insert operation. If I find a smaller setup 
> to reproduce it on a small local cluster with a lightweight client, I will 
> update this ticket when I have more input. I am open to suggestions on how to 
> fix this.
>  
> For Zeppelin I have a separate ticket, because Flink 1.15 is not going to be 
> fixed; but this issue, if I understand it correctly, should be common to all 
> versions starting with 1.15, so it makes sense to address it starting with 
> 1.16. https://issues.apache.org/jira/browse/ZEPPELIN-5909
>  
> [~mapohl], thank you for the assistance in Slack. I have created this ticket 
> to back our conversation; could you please add your thoughts on this failure 
> mode?
>  
> One possible solution would be to add a check for the presence of a JobResult 
> in the result store before returning the job status (if there is a result, 
> the job shouldn't be reported as RUNNING, based on this documentation: 
> [https://nightlies.apache.

[jira] [Comment Edited] (FLINK-32069) jobClient.getJobStatus() can return status RUNNING for finished insert operation

2023-05-15 Thread Matthias Pohl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17722985#comment-17722985
 ] 

Matthias Pohl edited comment on FLINK-32069 at 5/16/23 5:03 AM:


Can you provide debug logs of the case? Additionally, could you share what 
input (i.e. {{operations}} and {{tbenv}}) you use in your runs?


was (Author: mapohl):
Can you provide debug logs of the case?

> jobClient.getJobStatus() can return status RUNNING for finished insert 
> operation
> 
>
> Key: FLINK-32069
> URL: https://issues.apache.org/jira/browse/FLINK-32069
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.16.1, 1.15.4
>Reporter: Aleksandr Iushmanov
>Priority: Major
>
> Using Zeppelin with a remote cluster, I came across a race-condition issue 
> leading to failed expectations for SQL insert operations. 
>  
> Below is an example of Zeppelin code that is failing because 
> jobClient.getJobStatus() returns RUNNING even after the job has finished. I 
> have verified that the same failure can happen if I use 
> jobClient.getJobExecutionResult().get() (the job execution result is "Program 
> execution finished", but the job status is not consistently FINISHED).
> {code:java}
> TableResult tableResult = ((TableEnvironmentInternal) 
> tbenv).executeInternal(operations);
> checkState(tableResult.getJobClient().isPresent());
> try {
>   tableResult.await();
>   JobClient jobClient = tableResult.getJobClient().get();
>   if (jobClient.getJobStatus().get() == JobStatus.FINISHED) {
> context.out.write("Insertion successfully.\n");
>   } else {
> throw new IOException("Job is failed, " + 
> jobClient.getJobExecutionResult().get().toString());
>   }
> } catch (InterruptedException e) {
>   throw new IOException("Flink job is interrupted", e);
> } catch (ExecutionException e) {
>   throw new IOException("Flink job is failed", e);
> } {code}
>  ZeppelinCode: 
> [https://github.com/apache/zeppelin/blob/master/flink/flink1.15-shims/src/main/java/org/apache/zeppelin/flink/Flink115SqlInterpreter.java#L384]
> I suspect that the job status is returned based on the runningJobsRegistry, 
> and since 1.15 this registry is not updated with the FINISHED status prior to 
> completion of the job-result future; see this change in 
> {{JobMasterServiceLeadershipRunner.java}}: 
> [https://github.com/apache/flink/pull/18189/files#diff-3eb433f18b85c0f5329a4b312a219583189d777fe9bdd547f1114f4a22989f8bL387]
>  
>  
> It looks like a race condition that is hard to reproduce on a lightweight 
> setup. I was reproducing this by running a Zeppelin notebook against a remote 
> Flink cluster and triggering a SQL insert operation. If I find a smaller setup 
> to reproduce it on a small local cluster with a lightweight client, I will 
> update this ticket when I have more input. I am open to suggestions on how to 
> fix this.
>  
> For Zeppelin I have a separate ticket, because Flink 1.15 is not going to be 
> fixed; but this issue, if I understand it correctly, should be common to all 
> versions starting with 1.15, so it makes sense to address it starting with 
> 1.16. https://issues.apache.org/jira/browse/ZEPPELIN-5909
>  
> [~mapohl], thank you for the assistance in Slack. I have created this ticket 
> to back our conversation; could you please add your thoughts on this failure 
> mode?
>  
> One possible solution would be to add a check for the presence of a JobResult 
> in the result store before returning the job status (if there is a result, 
> the job shouldn't be reported as RUNNING, based on this documentation: 
> [https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/table/api/TableResult.html#await--])



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32069) jobClient.getJobStatus() can return status RUNNING for finished insert operation

2023-05-15 Thread Matthias Pohl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17722985#comment-17722985
 ] 

Matthias Pohl commented on FLINK-32069:
---

Can you provide debug logs of the case?

> jobClient.getJobStatus() can return status RUNNING for finished insert 
> operation
> 
>
> Key: FLINK-32069
> URL: https://issues.apache.org/jira/browse/FLINK-32069
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.16.1, 1.15.4
>Reporter: Aleksandr Iushmanov
>Priority: Major
>
> Using Zeppelin with a remote cluster, I came across a race-condition issue 
> leading to failed expectations for SQL insert operations. 
>  
> Below is an example of Zeppelin code that is failing because 
> jobClient.getJobStatus() returns RUNNING even after the job has finished. I 
> have verified that the same failure can happen if I use 
> jobClient.getJobExecutionResult().get() (the job execution result is "Program 
> execution finished", but the job status is not consistently FINISHED).
> {code:java}
> TableResult tableResult = ((TableEnvironmentInternal) 
> tbenv).executeInternal(operations);
> checkState(tableResult.getJobClient().isPresent());
> try {
>   tableResult.await();
>   JobClient jobClient = tableResult.getJobClient().get();
>   if (jobClient.getJobStatus().get() == JobStatus.FINISHED) {
> context.out.write("Insertion successfully.\n");
>   } else {
> throw new IOException("Job is failed, " + 
> jobClient.getJobExecutionResult().get().toString());
>   }
> } catch (InterruptedException e) {
>   throw new IOException("Flink job is interrupted", e);
> } catch (ExecutionException e) {
>   throw new IOException("Flink job is failed", e);
> } {code}
>  ZeppelinCode: 
> [https://github.com/apache/zeppelin/blob/master/flink/flink1.15-shims/src/main/java/org/apache/zeppelin/flink/Flink115SqlInterpreter.java#L384]
> I suspect that job status is returned based on runningJobsRegistry and since 
> 1.15 this registry is not updated with FINISHED status prior to job result 
> future completion, see this change: {{JobMasterServiceLeadershipRunner.java}} 
> [https://github.com/apache/flink/pull/18189/files#diff-3eb433f18b85c0f5329a4b312a219583189d777fe9bdd547f1114f4a22989f8bL387]
>  
>  
> It looks like a race condition that is hard to reproduce in a lightweight 
> setup. I reproduced it by running a Zeppelin notebook against a remote Flink 
> cluster and triggering a SQL insert operation. If I find a smaller setup that 
> reproduces it on a local cluster with a lightweight client, I will update this 
> ticket. I am open to suggestions on how to fix this. 
>  
> For Zeppelin I have a separate ticket, because Flink 1.15 is not going to be 
> fixed; but if I understand it correctly, this issue should be common to all 
> versions starting with 1.15, so it makes sense to address it starting with 
> 1.16. https://issues.apache.org/jira/browse/ZEPPELIN-5909
>  
> [~mapohl], thank you for the assistance in Slack. I have created this ticket to 
> back our conversation; could you please add your thoughts on this failure 
> mode? 
>  
> One possible solution would be an additional check for the presence of a 
> JobResult in the result store before returning the job status (if there is a 
> result, the job shouldn't be reported as running, based on this documentation: 
> [https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/table/api/TableResult.html#await--])



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] flinkbot commented on pull request #22588: [FLINK-31663][table] Add ARRAY_EXCEPT function

2023-05-15 Thread via GitHub


flinkbot commented on PR #22588:
URL: https://github.com/apache/flink/pull/22588#issuecomment-1548979423

   
   ## CI report:
   
   * 965fc11fe20d8b962b2a4c2ca67342051b130201 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-pulsar] reswqa merged pull request #50: [BP-4.0][FLINK-32024][docs] Short code related to externalized connector retrieve version from its own data yaml

2023-05-15 Thread via GitHub


reswqa merged PR #50:
URL: https://github.com/apache/flink-connector-pulsar/pull/50


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] bvarghese1 opened a new pull request, #22588: [FLINK-31663][table] Add ARRAY_EXCEPT function

2023-05-15 Thread via GitHub


bvarghese1 opened a new pull request, #22588:
URL: https://github.com/apache/flink/pull/22588

   
   
   ## What is the purpose of the change
   This is an implementation of ARRAY_EXCEPT
   
   
   ## Brief change log
   ARRAY_EXCEPT for Table API and SQL
   ```
   Flink SQL> SELECT array_except(array[1,2,2], array[2,3,4]);
   [1]
   
   Flink SQL> SELECT array_except(array[1,2,2], array[1]);
   [2]
   
   Flink SQL> SELECT array_except(array[1,2,2], array[42]);
   [1, 2]
   
   Flink SQL> SELECT array_except(array[1,2,2], cast(null as array<int>));
   [1, 2]
   
   Flink SQL> SELECT array_except(array[1,2,2], array[null,2]);
   [1]
   
   Flink SQL> SELECT array_except(cast(null as array<int>), array[1,2,3]);
   
   
   Flink SQL> SELECT array_except(array[null,null,1], array[42]);
   [NULL, 1]
   
   Flink SQL> SELECT array_except(array[null,null,1], array[null, 42]);
   [1]
   
   Flink SQL> SELECT array_except(array[(TRUE, DATE '2022-04-20'), (TRUE, DATE 
'1990-10-14'), null], array[(TRUE, DATE '1990-10-14')]);
   [(TRUE, 2022-04-20), NULL]
   
   Flink SQL> SELECT array_except(array[(TRUE, DATE '2022-04-20'), (TRUE, DATE 
'1990-10-14'), null], cast(null as array<row<boolean, date>>));
   [(TRUE, 2022-04-20), (TRUE, 1990-10-14), NULL]
   
   Flink SQL> SELECT array_except(array[array[1,null,3], array[0], array[1]], 
array[array[0]]);
   [[1, NULL, 3], [1]]
   
   Flink SQL> SELECT array_except(array[map[1, 'a', 2, 'b'], map[3, 'c', 4, 
'd']], array[map[3, 'c', 4, 'd']]);
   [{1=a, 2=b}]
   ```
   See also https://spark.apache.org/docs/latest/api/sql/index.html#array_except
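   
   For reference, a minimal, self-contained way to try the function (a sketch 
against a build that contains this PR; only the SQL surface shown above is 
assumed):
   
   ```
   import org.apache.flink.table.api.EnvironmentSettings;
   import org.apache.flink.table.api.TableEnvironment;

   public class ArrayExceptExample {
       public static void main(String[] args) {
           // A plain TableEnvironment is enough to evaluate a constant expression.
           TableEnvironment tEnv =
                   TableEnvironment.create(EnvironmentSettings.inStreamingMode());
           // Distinct elements of the first array that do not appear in the
           // second, matching the examples in the change log above.
           tEnv.executeSql("SELECT array_except(array[1,2,2], array[2,3,4])").print();
           // prints a single row containing [1]
       }
   }
   ```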
   
   ## Verifying this change
   - This change added tests in CollectionFunctionsITCase.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes)
 - If yes, how is the feature documented? (docs)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-32104) stop-with-savepoint fails and times out with simple reproducible example

2023-05-15 Thread Kurt Ostfeld (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32104?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kurt Ostfeld updated FLINK-32104:
-
Description: 
I've put together a simple demo app that reproduces the issue with instructions 
on how to reproduce:

[https://github.com/kurtostfeld/flink-stop-issue]

 

The issue is that with a very simple Flink DataStream API application, the 
`stop-with-savepoint` fails and times out like this:

 
{code:java}
./bin/flink stop --type native --savepointPath ../savepoints 
d69a952625497cca0665dfdcdb9f4718

Suspending job "d69a952625497cca0665dfdcdb9f4718" with a NATIVE savepoint.


 The program finished with the following exception:

org.apache.flink.util.FlinkException: Could not stop with a savepoint job 
"d69a952625497cca0665dfdcdb9f4718".
at 
org.apache.flink.client.cli.CliFrontend.lambda$stop$4(CliFrontend.java:595)
at 
org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:1041)
at org.apache.flink.client.cli.CliFrontend.stop(CliFrontend.java:578)
at 
org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1110)
at 
org.apache.flink.client.cli.CliFrontend.lambda$mainInternal$9(CliFrontend.java:1189)
at 
org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
at 
org.apache.flink.client.cli.CliFrontend.mainInternal(CliFrontend.java:1189)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1157)
Caused by: java.util.concurrent.TimeoutException
at 
java.base/java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1886)
at 
java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2021)
at 
org.apache.flink.client.cli.CliFrontend.lambda$stop$4(CliFrontend.java:591)
... 7 more {code}
 

  was:
I've put together a simple demo app that reproduces the issue with instructions 
on how to reproduce:

[https://github.com/kurtostfeld/flink-stop-issue]

 

The issue is with a very simple application written with the Flink DataStream 
API, `stop-with-savepoint` fails and times out like this:

 
{code:java}
./bin/flink stop --type native --savepointPath ../savepoints 
d69a952625497cca0665dfdcdb9f4718

Suspending job "d69a952625497cca0665dfdcdb9f4718" with a NATIVE savepoint.


 The program finished with the following exception:

org.apache.flink.util.FlinkException: Could not stop with a savepoint job 
"d69a952625497cca0665dfdcdb9f4718".
at 
org.apache.flink.client.cli.CliFrontend.lambda$stop$4(CliFrontend.java:595)
at 
org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:1041)
at org.apache.flink.client.cli.CliFrontend.stop(CliFrontend.java:578)
at 
org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1110)
at 
org.apache.flink.client.cli.CliFrontend.lambda$mainInternal$9(CliFrontend.java:1189)
at 
org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
at 
org.apache.flink.client.cli.CliFrontend.mainInternal(CliFrontend.java:1189)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1157)
Caused by: java.util.concurrent.TimeoutException
at 
java.base/java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1886)
at 
java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2021)
at 
org.apache.flink.client.cli.CliFrontend.lambda$stop$4(CliFrontend.java:591)
... 7 more {code}


> stop-with-savepoint fails and times out with simple reproducible example
> 
>
> Key: FLINK-32104
> URL: https://issues.apache.org/jira/browse/FLINK-32104
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Affects Versions: 1.17.0
>Reporter: Kurt Ostfeld
>Priority: Major
>
> I've put together a simple demo app that reproduces the issue with 
> instructions on how to reproduce:
> [https://github.com/kurtostfeld/flink-stop-issue]
>  
> The issue is that with a very simple Flink DataStream API application, the 
> `stop-with-savepoint` fails and times out like this:
>  
> {code:java}
> ./bin/flink stop --type native --savepointPath ../savepoints 
> d69a952625497cca0665dfdcdb9f4718
> Suspending job "d69a952625497cca0665dfdcdb9f4718" with a NATIVE savepoint.
> 
>  The program finished with the following exception:
> org.apache.flink.util.FlinkException: Could not stop with a savepoint job 
> "d69a952625497cca0665dfdcdb9f4718".
> at 
> org.apache.flink.client.cli.CliFrontend.lambda$stop$4(CliFrontend.java:595)
> at 
> org.apache.flink.client.cli.C

[jira] [Created] (FLINK-32104) stop-with-savepoint fails and times out with simple reproducible example

2023-05-15 Thread Kurt Ostfeld (Jira)
Kurt Ostfeld created FLINK-32104:


 Summary: stop-with-savepoint fails and times out with simple 
reproducible example
 Key: FLINK-32104
 URL: https://issues.apache.org/jira/browse/FLINK-32104
 Project: Flink
  Issue Type: Bug
  Components: API / DataStream
Affects Versions: 1.17.0
Reporter: Kurt Ostfeld


I've put together a simple demo app that reproduces the issue with instructions 
on how to reproduce:

[https://github.com/kurtostfeld/flink-stop-issue]

 

The issue is with a very simple application written with the Flink DataStream 
API, `stop-with-savepoint` fails and times out like this:

 
{code:java}
./bin/flink stop --type native --savepointPath ../savepoints 
d69a952625497cca0665dfdcdb9f4718

Suspending job "d69a952625497cca0665dfdcdb9f4718" with a NATIVE savepoint.


 The program finished with the following exception:

org.apache.flink.util.FlinkException: Could not stop with a savepoint job 
"d69a952625497cca0665dfdcdb9f4718".
at 
org.apache.flink.client.cli.CliFrontend.lambda$stop$4(CliFrontend.java:595)
at 
org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:1041)
at org.apache.flink.client.cli.CliFrontend.stop(CliFrontend.java:578)
at 
org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1110)
at 
org.apache.flink.client.cli.CliFrontend.lambda$mainInternal$9(CliFrontend.java:1189)
at 
org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
at 
org.apache.flink.client.cli.CliFrontend.mainInternal(CliFrontend.java:1189)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1157)
Caused by: java.util.concurrent.TimeoutException
at 
java.base/java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1886)
at 
java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2021)
at 
org.apache.flink.client.cli.CliFrontend.lambda$stop$4(CliFrontend.java:591)
... 7 more {code}
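
For reference, the programmatic equivalent of the CLI call above (a sketch; 
{{jobClient}} would come from {{StreamExecutionEnvironment#executeAsync()}} in 
the demo app, and the savepoint path is illustrative):

{code:java}
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.SavepointFormatType;

// Equivalent of `flink stop --type native --savepointPath ../savepoints <jobId>`.
static String stopWithNativeSavepoint(JobClient jobClient) throws Exception {
    return jobClient
            .stopWithSavepoint(
                    /* advanceToEndOfEventTime = */ false,
                    "../savepoints",
                    SavepointFormatType.NATIVE)
            .get(); // this get() is where the CLI's TimeoutException surfaces
}
{code}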



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] flinkbot commented on pull request #22587: [FLINK-31892][runtime] Introduce AdaptiveScheduler global failure enrichment/labeling

2023-05-15 Thread via GitHub


flinkbot commented on PR #22587:
URL: https://github.com/apache/flink/pull/22587#issuecomment-1548949084

   
   ## CI report:
   
   * 99f41cfb94b20162cce28f7e9ec5662afe385689 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] Myracle commented on a diff in pull request #22560: [FLINK-32023][API / DataStream] Support negative Duration for special usage, such as -1ms for execution.buffer-timeout

2023-05-15 Thread via GitHub


Myracle commented on code in PR #22560:
URL: https://github.com/apache/flink/pull/22560#discussion_r1194578196


##
flink-core/src/main/java/org/apache/flink/configuration/ExecutionOptions.java:
##
@@ -92,6 +92,17 @@ public class ExecutionOptions {
 .withDescription(
 "Tells if we should use compression for the state 
snapshot data or not");
 
+public static final ConfigOption<Boolean> BUFFER_TIMEOUT_ENABLED =
+ConfigOptions.key("execution.buffer-timeout.enabled")

Review Comment:
   How about changing the config execution.buffer-timeout.enabled to 
execution.flush-on-buffer-full?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-pulsar] reswqa commented on pull request #50: [BP-4.0][FLINK-32024][docs] Short code related to externalized connector retrieve version from its own data yaml

2023-05-15 Thread via GitHub


reswqa commented on PR #50:
URL: 
https://github.com/apache/flink-connector-pulsar/pull/50#issuecomment-1548938191

   Reviewed in #49.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-pulsar] reswqa opened a new pull request, #50: [BP-4.0][FLINK-32024][docs] Short code related to externalized connector retrieve version from its own data yaml

2023-05-15 Thread via GitHub


reswqa opened a new pull request, #50:
URL: https://github.com/apache/flink-connector-pulsar/pull/50

   Backport to `v4.0`, as Flink tracks this branch to render the documentation.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] flinkbot commented on pull request #22586: [FLINK-31880][tests] Fixes unit test so it will pass regardless of ti…

2023-05-15 Thread via GitHub


flinkbot commented on PR #22586:
URL: https://github.com/apache/flink/pull/22586#issuecomment-1548929471

   
   ## CI report:
   
   * 6e8d876beb9ff0f2c5eddf4833ec572068629616 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-kubernetes-operator] yangjf2019 opened a new pull request, #601: [hotfix][docs] fix some typos

2023-05-15 Thread via GitHub


yangjf2019 opened a new pull request, #601:
URL: https://github.com/apache/flink-kubernetes-operator/pull/601

   
   
   ## What is the purpose of the change
   
   * I have fixed some typos in the 
`docs/content/docs/custom-resource/autoscaler.md` file.
   
   ## Verifying this change
   
   * I checked the changes. 
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changes to the `CustomResourceDescriptors`: 
(no)
 - Core observer or reconciler logic that is regularly executed: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   
   Hi, @mbalassi please take a look in your free time, thanks!
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-31880) Bad Test in OrcColumnarRowSplitReaderTest

2023-05-15 Thread Kurt Ostfeld (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31880?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17722973#comment-17722973
 ] 

Kurt Ostfeld commented on FLINK-31880:
--

Updated PR

https://github.com/apache/flink/pull/22586

> Bad Test in OrcColumnarRowSplitReaderTest
> -
>
> Key: FLINK-31880
> URL: https://issues.apache.org/jira/browse/FLINK-31880
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / ORC, Formats (JSON, Avro, Parquet, ORC, 
> SequenceFile)
>Reporter: Kurt Ostfeld
>Priority: Minor
>  Labels: pull-request-available
>
> This is a development issue with what looks like a buggy unit test.
>  
> I tried to build Flink with a clean copy of the repository and I get:
>  
> ```
> [INFO] Results:
> [INFO]
> [ERROR] Failures:
> [ERROR] OrcColumnarRowSplitReaderTest.testReadFileWithTypes:365
> expected: "1969-12-31"
> but was: "1970-01-01"
> [INFO]
> [ERROR] Tests run: 26, Failures: 1, Errors: 0, Skipped: 0
> ```
>  
> I see the test exercises the Date data type with `new Date(562423)`, which is 
> 9 minutes and 22 seconds after the epoch, i.e. 1970-01-01 in UTC time; when I 
> run that on my laptop in the CST time zone, I get `Wed Dec 31 18:09:22 CST 
> 1969`.
>  
> I have a simple pull request ready that fixes this issue by using the Java 8 
> LocalDate API instead, which avoids time zones entirely.
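
A sketch of the time-zone-safe construction described above (the concrete date 
is hypothetical; see the PR for the actual change):

{code:java}
import java.sql.Date;
import java.time.LocalDate;

// new Date(562423) is 562,423 ms after the epoch: 1970-01-01 in UTC but
// 1969-12-31 in any zone west of UTC, so the expected string depends on the
// machine's time zone. Building the value from a calendar date avoids that.
LocalDate expected = LocalDate.of(1969, 12, 31); // hypothetical test date
Date value = Date.valueOf(expected);             // prints "1969-12-31" in every zone
{code}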



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] kurtostfeld opened a new pull request, #22586: [FLINK-31880][tests] Fixes unit test so it will pass regardless of ti…

2023-05-15 Thread via GitHub


kurtostfeld opened a new pull request, #22586:
URL: https://github.com/apache/flink/pull/22586

   Fixes the unit test so it passes regardless of the time zone it is run in. 
Also uses a better test date.
   
   I submitted this before; I'm resubmitting with the review issues fixed. This 
version should follow the commit and PR style guides.
   
   This is a low-priority issue: the Flink project only needs unit tests to pass 
in the CI system, not in other environments in other time zones. Still, having 
the test pass locally helps some developers, the change is very low risk, and 
it also uses a better test value. The previous code used `new Date(562423)`, 
which is 9 minutes after the epoch in UTC; in a UTC-5 time zone that falls 
before the 1970-01-01 epoch, which makes it a poor test value.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-31893) Placeholder

2023-05-15 Thread Panagiotis Garefalakis (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31893?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Panagiotis Garefalakis updated FLINK-31893:
---
Summary: Placeholder  (was: Introduce AdaptiveScheduler global failure 
enrichment/labeling)

> Placeholder
> ---
>
> Key: FLINK-31893
> URL: https://issues.apache.org/jira/browse/FLINK-31893
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Panagiotis Garefalakis
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] 1996fanrui commented on a diff in pull request #22560: [FLINK-32023][API / DataStream] Support negative Duration for special usage, such as -1ms for execution.buffer-timeout

2023-05-15 Thread via GitHub


1996fanrui commented on code in PR #22560:
URL: https://github.com/apache/flink/pull/22560#discussion_r1194556471


##
flink-core/src/main/java/org/apache/flink/configuration/ExecutionOptions.java:
##
@@ -92,6 +92,17 @@ public class ExecutionOptions {
 .withDescription(
 "Tells if we should use compression for the state 
snapshot data or not");
 
+public static final ConfigOption<Boolean> BUFFER_TIMEOUT_ENABLED =
+ConfigOptions.key("execution.buffer-timeout.enabled")

Review Comment:
   FLINK-29372 introduced the restriction that an option key cannot be a prefix 
of another option's key; however, `execution.buffer-timeout` is a prefix of 
`execution.buffer-timeout.enabled`, so the CI fails.
   
   Hi @zentol , could we update the option key for `BUFFER_TIMEOUT` and mark 
`"execution.buffer-timeout"` as a deprecated key? How about updating it to 
`execution.buffer-timeout.interval`? Or do you have other suggestions? 
Looking forward to your opinion.
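   
   A sketch of that renaming (hypothetical, not a merged change; the 100 ms 
default mirrors the current `execution.buffer-timeout` default):
   
   ```
   // Hypothetical rename: the new key no longer clashes with the prefix of
   // execution.buffer-timeout.enabled, and the old key stays usable as a
   // deprecated fallback.
   public static final ConfigOption<Duration> BUFFER_TIMEOUT =
           ConfigOptions.key("execution.buffer-timeout.interval")
                   .durationType()
                   .defaultValue(Duration.ofMillis(100))
                   .withDeprecatedKeys("execution.buffer-timeout")
                   .withDescription(
                           "The maximum time frequency for flushing buffered output records.");
   ```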



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-31892) Introduce AdaptiveScheduler global failure enrichment/labeling

2023-05-15 Thread Panagiotis Garefalakis (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31892?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Panagiotis Garefalakis updated FLINK-31892:
---
Summary: Introduce AdaptiveScheduler global failure enrichment/labeling  
(was: Introduce DefaultScheduler global failure enrichment/labeling)

> Introduce AdaptiveScheduler global failure enrichment/labeling
> --
>
> Key: FLINK-31892
> URL: https://issues.apache.org/jira/browse/FLINK-31892
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Panagiotis Garefalakis
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] kurtostfeld closed pull request #22451: FLINK-31880: Fixed broken date test. It was failing when run in CST time zone.

2023-05-15 Thread via GitHub


kurtostfeld closed pull request #22451: FLINK-31880: Fixed broken date test. It 
was failing when run in CST time zone.
URL: https://github.com/apache/flink/pull/22451


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] JunRuiLee commented on a diff in pull request #16593: [FLINK-23425][streaming-java] The impact of cpu cores on test results…

2023-05-15 Thread via GitHub


JunRuiLee commented on code in PR #16593:
URL: https://github.com/apache/flink/pull/16593#discussion_r1194546629


##
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java:
##
@@ -1291,7 +1291,7 @@ public void 
testSlotSharingResourceConfigurationWithDefaultSlotSharingGroup() {
 StreamGraphGenerator.DEFAULT_SLOT_SHARING_GROUP, 
resourceProfile);
 
 final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-env.fromElements(1, 2, 3).map(x -> x + 1);
+env.fromElements(1, 2, 3).map(x -> x + 1).setParallelism(2);
 

Review Comment:
   I suggest adding some comments explaining why this change is needed.
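   
   For instance (hypothetical wording; the motivation is inferred from this 
PR's goal of removing the influence of CPU cores on the test result):
   
   ```
   final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
   env.fromElements(1, 2, 3)
           .map(x -> x + 1)
           // Pin the parallelism explicitly: the default parallelism is derived
           // from the number of CPU cores, so without this the generated stream
           // graph would differ between machines and the assertions could flip
           // depending on where the test runs.
           .setParallelism(2);
   ```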



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] reswqa commented on pull request #22574: [FLINK-32060][test] Migrate subclasses of BatchAbstractTestBase in table and other modules to JUnit5

2023-05-15 Thread via GitHub


reswqa commented on PR #22574:
URL: https://github.com/apache/flink/pull/22574#issuecomment-1548891764

   Rebased on `origin/master`; this will be merged once AZP is green.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-32101) FlinkKafkaInternalProducerITCase.testInitTransactionId test failed

2023-05-15 Thread Yuxin Tan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17722966#comment-17722966
 ] 

Yuxin Tan commented on FLINK-32101:
---

[~mapohl] Yes, exactly. Thanks for tracking it.

> FlinkKafkaInternalProducerITCase.testInitTransactionId test failed
> --
>
> Key: FLINK-32101
> URL: https://issues.apache.org/jira/browse/FLINK-32101
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.18.0
>Reporter: Yuxin Tan
>Priority: Major
>  Labels: test
>
> FlinkKafkaInternalProducerITCase.testInitTransactionId test failed.
>   Caused by: org.apache.kafka.common.KafkaException: Unexpected error in 
> InitProducerIdResponse; The request timed out.
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=48990&view=logs&j=aa18c3f6-13b8-5f58-86bb-c1cffb239496&t=502fb6c0-30a2-5e49-c5c2-a00fa3acb203&l=22973
> {code:java}
> Caused by: org.apache.kafka.common.KafkaException: 
> org.apache.kafka.common.KafkaException: Unexpected error in 
> InitProducerIdResponse; The request timed out.
>   at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>   at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>   at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>   at 
> java.util.concurrent.ForkJoinTask.getThrowableException(ForkJoinTask.java:593)
>   at 
> java.util.concurrent.ForkJoinTask.reportException(ForkJoinTask.java:677)
>   at java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:735)
>   at 
> java.util.stream.ForEachOps$ForEachOp.evaluateParallel(ForEachOps.java:159)
>   at 
> java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateParallel(ForEachOps.java:173)
>   at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233)
>   at 
> java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485)
>   at 
> java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:650)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.abortTransactions(FlinkKafkaProducer.java:1290)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initializeState(FlinkKafkaProducer.java:1216)
>   at 
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:189)
>   at 
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:171)
>   at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:95)
>   at 
> org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:122)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:274)
>   at 
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:747)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.call(StreamTaskActionExecutor.java:100)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:722)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:688)
>   at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
>   at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:921)
>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.kafka.common.KafkaException: Unexpected error in 
> InitProducerIdResponse; The request timed out.
>   at 
> org.apache.kafka.clients.producer.internals.TransactionManager$InitProducerIdHandler.handleResponse(TransactionManager.java:1418)
>   at 
> org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1322)
>   at 
> org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
>   at 
> org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:583)
>   at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:575)
>   at 
> org.apache.kafka.clients.produce

[jira] [Updated] (FLINK-32101) FlinkKafkaInternalProducerITCase.testInitTransactionId test failed

2023-05-15 Thread Yuxin Tan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32101?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yuxin Tan updated FLINK-32101:
--
Labels: test-stability  (was: test)

> FlinkKafkaInternalProducerITCase.testInitTransactionId test failed
> --
>
> Key: FLINK-32101
> URL: https://issues.apache.org/jira/browse/FLINK-32101
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.18.0
>Reporter: Yuxin Tan
>Priority: Major
>  Labels: test-stability
>
> FlinkKafkaInternalProducerITCase.testInitTransactionId test failed.
>   Caused by: org.apache.kafka.common.KafkaException: Unexpected error in 
> InitProducerIdResponse; The request timed out.
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=48990&view=logs&j=aa18c3f6-13b8-5f58-86bb-c1cffb239496&t=502fb6c0-30a2-5e49-c5c2-a00fa3acb203&l=22973
> {code:java}
> Caused by: org.apache.kafka.common.KafkaException: 
> org.apache.kafka.common.KafkaException: Unexpected error in 
> InitProducerIdResponse; The request timed out.
>   at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>   at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>   at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>   at 
> java.util.concurrent.ForkJoinTask.getThrowableException(ForkJoinTask.java:593)
>   at 
> java.util.concurrent.ForkJoinTask.reportException(ForkJoinTask.java:677)
>   at java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:735)
>   at 
> java.util.stream.ForEachOps$ForEachOp.evaluateParallel(ForEachOps.java:159)
>   at 
> java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateParallel(ForEachOps.java:173)
>   at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233)
>   at 
> java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485)
>   at 
> java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:650)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.abortTransactions(FlinkKafkaProducer.java:1290)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initializeState(FlinkKafkaProducer.java:1216)
>   at 
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:189)
>   at 
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:171)
>   at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:95)
>   at 
> org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:122)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:274)
>   at 
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:747)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.call(StreamTaskActionExecutor.java:100)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:722)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:688)
>   at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
>   at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:921)
>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.kafka.common.KafkaException: Unexpected error in 
> InitProducerIdResponse; The request timed out.
>   at 
> org.apache.kafka.clients.producer.internals.TransactionManager$InitProducerIdHandler.handleResponse(TransactionManager.java:1418)
>   at 
> org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1322)
>   at 
> org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
>   at 
> org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:583)
>   at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:575)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(

[GitHub] [flink] godfreyhe commented on a diff in pull request #22566: [FLINK-32052][table-runtime] Introduce left and right state retention time to StreamingJoinOperator

2023-05-15 Thread via GitHub


godfreyhe commented on code in PR #22566:
URL: https://github.com/apache/flink/pull/22566#discussion_r1194510238


##
flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/stream/StreamingJoinOperatorTestBase.java:
##
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.join.stream;
+
+import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.generated.GeneratedJoinCondition;
+import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
+import 
org.apache.flink.table.runtime.operators.join.stream.state.JoinInputSideSpec;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.runtime.util.RowDataHarnessAssertor;
+import org.apache.flink.table.types.logical.CharType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.table.utils.HandwrittenSelectorUtil;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestInfo;
+
+/** Base test class for {@link AbstractStreamingJoinOperator}. */
+public abstract class StreamingJoinOperatorTestBase {
+
+protected final InternalTypeInfo leftTypeInfo =
+InternalTypeInfo.of(
+RowType.of(
+new LogicalType[] {
+new CharType(false, 20),
+new CharType(false, 20),
+VarCharType.STRING_TYPE
+},
+new String[] {"order_id", "line_order_id", 
"shipping_address"}));
+
+protected final InternalTypeInfo rightTypeInfo =
+InternalTypeInfo.of(
+RowType.of(
+new LogicalType[] {new CharType(false, 20), new 
CharType(true, 10)},
+new String[] {"line_order_id0", 
"line_order_ship_mode"}));
+
+protected final RowDataKeySelector leftKeySelector =
+HandwrittenSelectorUtil.getRowDataSelector(
+new int[] {1},
+leftTypeInfo.toRowType().getChildren().toArray(new 
LogicalType[0]));
+protected final RowDataKeySelector rightKeySelector =
+HandwrittenSelectorUtil.getRowDataSelector(
+new int[] {0},
+rightTypeInfo.toRowType().getChildren().toArray(new 
LogicalType[0]));
+
+protected final JoinInputSideSpec leftInputSpec =
+JoinInputSideSpec.withUniqueKeyContainedByJoinKey(leftTypeInfo, 
leftKeySelector);
+protected final JoinInputSideSpec rightInputSpec =
+JoinInputSideSpec.withUniqueKeyContainedByJoinKey(rightTypeInfo, 
rightKeySelector);
+
+protected final InternalTypeInfo joinKeyTypeInfo =
+InternalTypeInfo.of(new CharType(false, 20));
+
+protected final String funcCode =
+"public class ConditionFunction extends 
org.apache.flink.api.common.functions.AbstractRichFunction "
++ "implements 
org.apache.flink.table.runtime.generated.JoinCondition {\n"
++ "\n"
++ "public ConditionFunction(Object[] reference) {\n"
++ "}\n"
++ "\n"
++ "@Override\n"
++ "public boolean 
apply(org.apache.flink.table.data.RowData in1, 
org.apache.flink.table.data.RowData in2) {\n"
++ "return true;\n"
++ "}\n"
++ "\n"
++ "@Override\n"
++ "public void close() throws Exception {\n"
++ "super.close();\n"
++ "}"
++ "}\n";
+protected final GeneratedJoinCondition joinCondition =
+new GeneratedJoinCondition("ConditionFunction", funcCode, new 
Object[0]);
+
+protected f

[GitHub] [flink] pgaref commented on pull request #22506: [FLINK-31890][runtime] Introduce DefaultScheduler failure enrichment/labeling

2023-05-15 Thread via GitHub


pgaref commented on PR #22506:
URL: https://github.com/apache/flink/pull/22506#issuecomment-1548830252

   Thanks for the comments once again @zhuzhurk !
   PTAL on latest PR:
   
   * introducing async task failure labeling as part of 
ExecutionFailureHandler#handleFailure (for local and global failures)
   * transient CompletableFuture with serializable labels as part of ErrorInfo
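   
   Roughly, the second bullet implies a shape like the following (field names 
assumed for illustration, not the exact diff):
   
   ```
   import org.apache.flink.util.SerializedThrowable;

   import java.io.Serializable;
   import java.util.Map;
   import java.util.concurrent.CompletableFuture;

   public class ErrorInfo implements Serializable {
       private final SerializedThrowable exception;
       private final long timestamp;

       // Labels are produced asynchronously by the failure enrichers. The future
       // itself is transient, so only a completed, serializable map of labels is
       // ever carried across serialization boundaries.
       private transient CompletableFuture<Map<String, String>> failureLabels;

       public ErrorInfo(SerializedThrowable exception, long timestamp) {
           this.exception = exception;
           this.timestamp = timestamp;
           this.failureLabels = new CompletableFuture<>();
       }
   }
   ```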
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (FLINK-32103) RBAC flinkdeployments/finalizers missing for OpenShift Deployment

2023-05-15 Thread James Busche (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32103?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17722952#comment-17722952
 ] 

James Busche edited comment on FLINK-32103 at 5/16/23 1:07 AM:
---

Can fix this on an existing OpenShift install by adding "- 
flinkdeployments/finalizers" to the flink.apache.org resources like this:
{quote}oc edit clusterrole flink-operator
 - apiGroups:

  - flink.apache.org

  resources:

  - flinkdeployments

  - flinkdeployments/status

  - flinkdeployments/finalizers

  - flinksessionjobs

  - flinksessionjobs/status

  verbs:

  - '*'
{quote}


was (Author: JIRAUSER287279):
Can fix this on an existing OpenShift install by adding "- 
flinkdeployments/finalizers" to the flink.apache.org resources like this:

{quote}oc edit clusterrole flink-operator

- apiGroups:

  - flink.apache.org

  resources:

  - flinkdeployments

  - flinkdeployments/status

  - flinkdeployments/finalizers

  - flinksessionjobs

  - flinksessionjobs/status

  verbs:

  - '*'
{quote}

> RBAC flinkdeployments/finalizers missing for OpenShift Deployment
> -
>
> Key: FLINK-32103
> URL: https://issues.apache.org/jira/browse/FLINK-32103
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.5.0
>Reporter: James Busche
>Priority: Major
>
> In OpenShift 4.10 and above, I'm noticing with the Flink 1.5.0 RC release 
> that there's an issue with flinkdeployments on OpenShift.  Flinkdeployments 
> are stuck in upgrading:
> {quote}oc get flinkdep
> NAME                                    JOB STATUS   LIFECYCLE STATE
> basic-example                                        UPGRADING
> {quote}
>  
> The error message looks like:
> {quote}oc describe flinkdep basic-example
> 
> Error:                          
> {"type":"org.apache.flink.kubernetes.operator.exception.ReconciliationException","message":"org.apache.flink.client.deployment.ClusterDeploymentException:
>  Could not create Kubernetes cluster 
> \"basic-example\".","throwableList":[\{"type":"org.apache.flink.client.deployment.ClusterDeploymentException","message":"Could
>  not create Kubernetes cluster 
> \"basic-example\"."},\{"type":"org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.client.KubernetesClientException","message":"Failure
>  executing: POST at: 
> https://172.30.0.1/apis/apps/v1/namespaces/default/deployments. Message: 
> Forbidden!Configured service account doesn't have access. Service account may 
> have been revoked. deployments.apps \"basic-example\" is forbidden: cannot 
> set blockOwnerDeletion if an ownerReference refers to a resource you can't 
> set finalizers on: , ."}]}
>  
>  Job Manager Deployment Status:  MISSING
> {quote}
>  
> The solution is to fix it in the rbac.yaml of the helm template, adding a "  
> - flinkdeployments/finalizers" line to the flink.apache.org apiGroup.
>  
> If the Operator is already running and flinkdeployments are having trouble on 
> OpenShift, then someone can manually edit the 
> flink-kubernetes-operator.v1.5.0 clusterrole and add the
> "  - flinkdeployments/finalizers" in the flink.apache.org apiGroup.
>  
> I'll create a PR that addresses this.
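
Concretely, the rule in the helm chart's rbac.yaml ends up looking like this 
(reconstructed from the clusterrole edit quoted in the comments; the added line 
is {{flinkdeployments/finalizers}}):

{code:yaml}
- apiGroups:
    - flink.apache.org
  resources:
    - flinkdeployments
    - flinkdeployments/status
    - flinkdeployments/finalizers
    - flinksessionjobs
    - flinksessionjobs/status
  verbs:
    - "*"
{code}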



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32103) RBAC flinkdeployments/finalizers missing for OpenShift Deployment

2023-05-15 Thread James Busche (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32103?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17722952#comment-17722952
 ] 

James Busche commented on FLINK-32103:
--

Can fix this on an existing OpenShift install by adding "- 
flinkdeployments/finalizers" to the flink.apache.org resources like this:

{quote}oc edit clusterrole flink-operator

- apiGroups:

  - flink.apache.org

  resources:

  - flinkdeployments

  - flinkdeployments/status

  - flinkdeployments/finalizers

  - flinksessionjobs

  - flinksessionjobs/status

  verbs:

  - '*'
{quote}

> RBAC flinkdeployments/finalizers missing for OpenShift Deployment
> -
>
> Key: FLINK-32103
> URL: https://issues.apache.org/jira/browse/FLINK-32103
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.5.0
>Reporter: James Busche
>Priority: Major
>
> In OpenShift 4.10 and above, I'm noticing with the Flink 1.5.0 RC release 
> that there's an issue with flinkdeployments on OpenShift.  Flinkdeployments 
> are stuck in upgrading:
> {quote}oc get flinkdep
> NAME                                    JOB STATUS   LIFECYCLE STATE
> basic-example                                        UPGRADING
> {quote}
>  
> The error message looks like:
> {quote}oc describe flinkdep basic-example
> 
> Error:                          
> {"type":"org.apache.flink.kubernetes.operator.exception.ReconciliationException","message":"org.apache.flink.client.deployment.ClusterDeploymentException:
>  Could not create Kubernetes cluster 
> \"basic-example\".","throwableList":[\{"type":"org.apache.flink.client.deployment.ClusterDeploymentException","message":"Could
>  not create Kubernetes cluster 
> \"basic-example\"."},\{"type":"org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.client.KubernetesClientException","message":"Failure
>  executing: POST at: 
> https://172.30.0.1/apis/apps/v1/namespaces/default/deployments. Message: 
> Forbidden!Configured service account doesn't have access. Service account may 
> have been revoked. deployments.apps \"basic-example\" is forbidden: cannot 
> set blockOwnerDeletion if an ownerReference refers to a resource you can't 
> set finalizers on: , ."}]}
>  
>  Job Manager Deployment Status:  MISSING
> {quote}
>  
> The solution is to fix it in the rbac.yaml of the helm template, adding a "  
> - flinkdeployments/finalizers" line to the flink.apache.org apiGroup.
>  
> If the Operator is already running and flinkdeployments are having trouble on 
> OpenShift, then someone can manually edit the 
> flink-kubernetes-operator.v1.5.0 clusterrole and add the
> "  - flinkdeployments/finalizers" in the flink.apache.org apiGroup.
>  
> I'll create a PR that addresses this.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32103) RBAC flinkdeployments/finalizers missing for OpenShift Deployment

2023-05-15 Thread James Busche (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32103?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17722951#comment-17722951
 ] 

James Busche commented on FLINK-32103:
--

Created PR [https://github.com/apache/flink-kubernetes-operator/pull/600] to 
address this

> RBAC flinkdeployments/finalizers missing for OpenShift Deployment
> -
>
> Key: FLINK-32103
> URL: https://issues.apache.org/jira/browse/FLINK-32103
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.5.0
>Reporter: James Busche
>Priority: Major
>
> In OpenShift 4.10 and above, I'm noticing with the Flink 1.5.0 RC release 
> that there's an issue with flinkdeployments on OpenShift.  Flinkdeployments 
> are stuck in upgrading:
> {quote}oc get flinkdep
> NAME                                    JOB STATUS   LIFECYCLE STATE
> basic-example                                        UPGRADING
> {quote}
>  
> The error message looks like:
> {quote}oc describe flinkdep basic-example
> 
> Error:                          
> {"type":"org.apache.flink.kubernetes.operator.exception.ReconciliationException","message":"org.apache.flink.client.deployment.ClusterDeploymentException:
>  Could not create Kubernetes cluster 
> \"basic-example\".","throwableList":[\{"type":"org.apache.flink.client.deployment.ClusterDeploymentException","message":"Could
>  not create Kubernetes cluster 
> \"basic-example\"."},\{"type":"org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.client.KubernetesClientException","message":"Failure
>  executing: POST at: 
> https://172.30.0.1/apis/apps/v1/namespaces/default/deployments. Message: 
> Forbidden!Configured service account doesn't have access. Service account may 
> have been revoked. deployments.apps \"basic-example\" is forbidden: cannot 
> set blockOwnerDeletion if an ownerReference refers to a resource you can't 
> set finalizers on: , ."}]}
>  
>  Job Manager Deployment Status:  MISSING
> {quote}
>  
> The solution is to fix it in the rbac.yaml of the helm template, adding a "  
> - flinkdeployments/finalizers" line to the flink.apache.org apiGroup.
>  
> If the Operator is already running and flinkdeployments are having trouble on 
> OpenShift, then someone can manually edit the 
> flink-kubernetes-operator.v1.5.0 clusterrole and add the
> "  - flinkdeployments/finalizers" in the flink.apache.org apiGroup.
>  
> I'll create a PR that addresses this.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-kubernetes-operator] jbusche commented on pull request #600: FLINK-32103 Update RBAC for OpenShift Deployment

2023-05-15 Thread via GitHub


jbusche commented on PR #600:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/600#issuecomment-1548743480

   OpenShift 4.10.54 is looking good with the PR:
   ```
   oc get pods,flinkdeployments -n default
   api.jim410.cp.fyre.ibm.com: Mon May 15 16:16:24 2023

   NAME                                                        READY   STATUS    RESTARTS   AGE
   pod/basic-example-5b547c9669-cj8pz                          1/1     Running   0          5m37s
   pod/basic-example-taskmanager-1-1                           1/1     Running   0          5m8s
   pod/basic-session-deployment-only-example-77957ff659-rvp5h 1/1     Running   0          3m23s
   pod/basic-session-deployment-only-example-taskmanager-1-1  1/1     Running   0          2m39s
   pod/basic-session-deployment-only-example-taskmanager-1-2  1/1     Running   0          2m39s
   pod/flink-kubernetes-operator-66f8544889-sjjm5              2/2     Running   0          8m38s

   NAME                                                                      JOB STATUS   LIFECYCLE STATE
   flinkdeployment.flink.apache.org/basic-example                            RUNNING      STABLE
   flinkdeployment.flink.apache.org/basic-session-deployment-only-example                 STABLE
   ```
   and the clusterrole looks good:
   ```
   oc describe clusterrole flink-kubernetes-operator.v1.5.0-866bfd99dd | grep flinkdeployments
     flinkdeployments.flink.apache.org/finalizers   []   []   [*]
     flinkdeployments.flink.apache.org/status       []   []   [*]
     flinkdeployments.flink.apache.org              []   []   [*]
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-kafka] RamanVerma commented on a diff in pull request #29: [FLINK-32021][Connectors/Kafka] Improvement the Javadoc for SpecifiedOffsetsInitializer and TimestampOffsetsInitial

2023-05-15 Thread via GitHub


RamanVerma commented on code in PR #29:
URL: 
https://github.com/apache/flink-connector-kafka/pull/29#discussion_r119366


##
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/initializer/SpecifiedOffsetsInitializer.java:
##
@@ -38,6 +38,10 @@
  * An implementation of {@link OffsetsInitializer} which initializes the 
offsets of the partition
  * according to the user specified offsets.
  *
+ * Use specified offsets for specified partitions while use commit offsets 
or earliest for

Review Comment:
   @loserwang1024 I have a question.
   When you mention `specified offsets`, do you mean the ones provided in the 
map `initialOffsets`.
   If that is true, these unspecified partitions are initialized to be consumed 
according to `offsetResetStrategy`, which can be `earliest` or `latest`. So, 
not necessarily `earliest`
   
   Also, could you point me to the code (or code change) that explains this
   > Specified partition's offset should be less than its latest offset,
 otherwise it will start from the earliest.
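   
   For readers following the thread, this is how the initializer under 
discussion is typically constructed (standard KafkaSource builder API; topic, 
broker, and offsets here are illustrative):
   
   ```
   import org.apache.flink.api.common.serialization.SimpleStringSchema;
   import org.apache.flink.connector.kafka.source.KafkaSource;
   import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

   import org.apache.kafka.clients.consumer.OffsetResetStrategy;
   import org.apache.kafka.common.TopicPartition;

   import java.util.HashMap;
   import java.util.Map;

   Map<TopicPartition, Long> specifiedOffsets = new HashMap<>();
   specifiedOffsets.put(new TopicPartition("orders", 0), 42L); // start partition 0 at offset 42

   KafkaSource<String> source =
           KafkaSource.<String>builder()
                   .setBootstrapServers("broker:9092")
                   .setTopics("orders")
                   // Partitions absent from the map are resolved by the
                   // initializer's fallback behavior; that fallback is exactly
                   // what this review thread is clarifying.
                   .setStartingOffsets(
                           OffsetsInitializer.offsets(
                                   specifiedOffsets, OffsetResetStrategy.EARLIEST))
                   .setValueOnlyDeserializer(new SimpleStringSchema())
                   .build();
   ```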



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-kubernetes-operator] jbusche commented on pull request #600: FLINK-32103 Update RBAC for OpenShift Deployment

2023-05-15 Thread via GitHub


jbusche commented on PR #600:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/600#issuecomment-1548682654

   OK - the KIND cluster looks good - still deploying the samples fine.
   ```
   NAME                                                      READY   STATUS    RESTARTS   AGE
   basic-example-5bbd68657d-4dgj5                            1/1     Running   0          3m44s
   basic-example-taskmanager-1-1                             1/1     Running   0          3m14s
   basic-session-deployment-only-example-57994c7d96-bmlr6   1/1     Running   0          3m19s
   basic-session-deployment-only-example-taskmanager-1-1    1/1     Running   0          2m49s
   basic-session-deployment-only-example-taskmanager-1-2    1/1     Running   0          2m49s
   flink-kubernetes-operator-6b659d866-4t8g7                 2/2     Running   0          4m26s
   ```
   The flink clusterrole looks good:
   ```
   oc describe clusterrole flink-operator | grep flinkdeployments
     flinkdeployments.flink.apache.org/finalizers   []   []   [*]
     flinkdeployments.flink.apache.org/status       []   []   [*]
     flinkdeployments.flink.apache.org              []   []   [*]
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-kubernetes-operator] jbusche commented on pull request #600: FLINK-32103 Update RBAC for OpenShift Deployment

2023-05-15 Thread via GitHub


jbusche commented on PR #600:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/600#issuecomment-1548632061

   Trying a new deployment on a KIND cluster (non-OpenShift) as well as on two 
OpenShift clusters to verify the change. Will post results here.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-kubernetes-operator] jbusche opened a new pull request, #600: FLINK-32103 Update RBAC for OpenShift Deployment

2023-05-15 Thread via GitHub


jbusche opened a new pull request, #600:
URL: https://github.com/apache/flink-kubernetes-operator/pull/600

   
   
   ## What is the purpose of the change
   
   Noticing in 1.5.0-rc1 and 1.5.0-rc2 that we're having a problem deploying on 
OpenShift. One workaround is to manually update the 
flink-kubernetes-operator.v1.5.0 clusterrole, but the better fix is to update 
the rbac.yaml. In either case, a `- flinkdeployments/finalizers` entry is 
needed in the flink.apache.org apiGroup.
   
   This PR adds that finalizer resource so the deployment succeeds.
   
   ## Brief change log
   
 - *the `- flinkdeployments/finalizers` resource is added to the  
flink.apache.org apiGroup*
   
   ## Verifying this change
   This change is a trivial rework / code cleanup without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): No
 - The public API, i.e., is any changes to the `CustomResourceDescriptors`: 
No
 - Core observer or reconciler logic that is regularly executed: No
   
   ## Documentation
   
 - Does this pull request introduce a new feature? No
 - If yes, how is the feature documented? not applicable
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-31890) Introduce DefaultScheduler failure enrichment/labeling

2023-05-15 Thread Panagiotis Garefalakis (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31890?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Panagiotis Garefalakis updated FLINK-31890:
---
Summary: Introduce DefaultScheduler failure enrichment/labeling  (was: 
Introduce SchedulerBase per-task failure enrichment/labeling)

> Introduce DefaultScheduler failure enrichment/labeling
> --
>
> Key: FLINK-31890
> URL: https://issues.apache.org/jira/browse/FLINK-31890
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: Panagiotis Garefalakis
>Assignee: Panagiotis Garefalakis
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32103) RBAC flinkdeployments/finalizers missing for OpenShift Deployment

2023-05-15 Thread James Busche (Jira)
James Busche created FLINK-32103:


 Summary: RBAC flinkdeployments/finalizers missing for OpenShift 
Deployment
 Key: FLINK-32103
 URL: https://issues.apache.org/jira/browse/FLINK-32103
 Project: Flink
  Issue Type: Bug
  Components: Kubernetes Operator
Affects Versions: kubernetes-operator-1.5.0
Reporter: James Busche


In OpenShift 4.10 and above, I'm noticing with the Flink 1.5.0 RC releases that 
there's an issue with flinkdeployments on OpenShift. Flinkdeployments are stuck 
in UPGRADING:
{quote}oc get flinkdep

NAME                                    JOB STATUS   LIFECYCLE STATE

basic-example                                        UPGRADING
{quote}
 

The error message looks like:
{quote}oc describe flinkdep basic-example



Error:                          
{"type":"org.apache.flink.kubernetes.operator.exception.ReconciliationException","message":"org.apache.flink.client.deployment.ClusterDeploymentException:
 Could not create Kubernetes cluster 
\"basic-example\".","throwableList":[\{"type":"org.apache.flink.client.deployment.ClusterDeploymentException","message":"Could
 not create Kubernetes cluster 
\"basic-example\"."},\{"type":"org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.client.KubernetesClientException","message":"Failure
 executing: POST at: 
https://172.30.0.1/apis/apps/v1/namespaces/default/deployments. Message: 
Forbidden!Configured service account doesn't have access. Service account may 
have been revoked. deployments.apps \"basic-example\" is forbidden: cannot set 
blockOwnerDeletion if an ownerReference refers to a resource you can't set 
finalizers on: , ."}]}

 

 Job Manager Deployment Status:  MISSING
{quote}
 

The solution is to fix it in the rbac.yaml of the helm template, adding a "  - 
flinkdeployments/finalizers" line to the flink.apache.org apiGroup.

 

If the Operator is already running and flinkdeployments are having trouble on 
OpenShift, then someone can manually edit the flink-kubernetes-operator.v1.5.0 
clusterrole and add the "  - flinkdeployments/finalizers" line in the 
flink.apache.org apiGroup.

 

I'll create a PR that addresses this.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] reswqa commented on pull request #22574: [FLINK-32060][test] Migrate subclasses of BatchAbstractTestBase in table and other modules to JUnit5

2023-05-15 Thread via GitHub


reswqa commented on PR #22574:
URL: https://github.com/apache/flink/pull/22574#issuecomment-1548347105

   Thanks @snuyanzin very much for your patient review 👍. Without you, I 
wouldn't even have realized that the number of tests was wrong (my mistake) 🙇 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] snuyanzin commented on pull request #22574: [FLINK-32060][test] Migrate subclasses of BatchAbstractTestBase in table and other modules to JUnit5

2023-05-15 Thread via GitHub


snuyanzin commented on PR #22574:
URL: https://github.com/apache/flink/pull/22574#issuecomment-1548342955

   I checked that the number of tests before the change is now equal to the 
number of tests after, per changed class.
   No more findings from my side.
   LGTM


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] snuyanzin commented on a diff in pull request #22574: [FLINK-32060][test] Migrate subclasses of BatchAbstractTestBase in table and other modules to JUnit5

2023-05-15 Thread via GitHub


snuyanzin commented on code in PR #22574:
URL: https://github.com/apache/flink/pull/22574#discussion_r1194200879


##
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/StreamFileSystemITCaseBase.scala:
##
@@ -23,17 +23,20 @@ import org.apache.flink.table.api.TableEnvironment
 import org.apache.flink.table.api.bridge.scala._
 import org.apache.flink.table.planner.runtime.FileSystemITCaseBase
 import org.apache.flink.table.planner.runtime.utils.{AbstractExactlyOnceSink, 
StreamingTestBase, TestingAppendSink, TestSinkUtil}
+import 
org.apache.flink.testutils.junit.extensions.parameterized.NoOpTestExtension
 import org.apache.flink.types.Row
 
-import org.junit.{Assert, Before, Test}
-import org.junit.Assert.assertEquals
+import org.assertj.core.api.Assertions.assertThat
+import org.junit.jupiter.api.{BeforeEach, Test}
+import org.junit.jupiter.api.extension.ExtendWith
 
 import scala.collection.{mutable, Seq}
 
 /** Streaming [[FileSystemITCaseBase]]. */
+@ExtendWith(Array(classOf[NoOpTestExtension]))
 abstract class StreamFileSystemITCaseBase extends StreamingTestBase with 
FileSystemITCaseBase {

Review Comment:
   thanks for clarification



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] reswqa commented on a diff in pull request #22574: [FLINK-32060][test] Migrate subclasses of BatchAbstractTestBase in table and other modules to JUnit5

2023-05-15 Thread via GitHub


reswqa commented on code in PR #22574:
URL: https://github.com/apache/flink/pull/22574#discussion_r1194197548


##
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/StreamFileSystemITCaseBase.scala:
##
@@ -23,17 +23,20 @@ import org.apache.flink.table.api.TableEnvironment
 import org.apache.flink.table.api.bridge.scala._
 import org.apache.flink.table.planner.runtime.FileSystemITCaseBase
 import org.apache.flink.table.planner.runtime.utils.{AbstractExactlyOnceSink, 
StreamingTestBase, TestingAppendSink, TestSinkUtil}
+import 
org.apache.flink.testutils.junit.extensions.parameterized.NoOpTestExtension
 import org.apache.flink.types.Row
 
-import org.junit.{Assert, Before, Test}
-import org.junit.Assert.assertEquals
+import org.assertj.core.api.Assertions.assertThat
+import org.junit.jupiter.api.{BeforeEach, Test}
+import org.junit.jupiter.api.extension.ExtendWith
 
 import scala.collection.{mutable, Seq}
 
 /** Streaming [[FileSystemITCaseBase]]. */
+@ExtendWith(Array(classOf[NoOpTestExtension]))
 abstract class StreamFileSystemITCaseBase extends StreamingTestBase with 
FileSystemITCaseBase {

Review Comment:
   To some extent, yes, because we manually set the parallelism in the `before` 
method:
   ```
   env.setParallelism(4)
   ```
   This is the same as `DEFAULT_PARALLELISM` in `AbstractTestBase`.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] reswqa commented on a diff in pull request #22574: [FLINK-32060][test] Migrate subclasses of BatchAbstractTestBase in table and other modules to JUnit5

2023-05-15 Thread via GitHub


reswqa commented on code in PR #22574:
URL: https://github.com/apache/flink/pull/22574#discussion_r1194197548


##
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/StreamFileSystemITCaseBase.scala:
##
@@ -23,17 +23,20 @@ import org.apache.flink.table.api.TableEnvironment
 import org.apache.flink.table.api.bridge.scala._
 import org.apache.flink.table.planner.runtime.FileSystemITCaseBase
 import org.apache.flink.table.planner.runtime.utils.{AbstractExactlyOnceSink, 
StreamingTestBase, TestingAppendSink, TestSinkUtil}
+import 
org.apache.flink.testutils.junit.extensions.parameterized.NoOpTestExtension
 import org.apache.flink.types.Row
 
-import org.junit.{Assert, Before, Test}
-import org.junit.Assert.assertEquals
+import org.assertj.core.api.Assertions.assertThat
+import org.junit.jupiter.api.{BeforeEach, Test}
+import org.junit.jupiter.api.extension.ExtendWith
 
 import scala.collection.{mutable, Seq}
 
 /** Streaming [[FileSystemITCaseBase]]. */
+@ExtendWith(Array(classOf[NoOpTestExtension]))
 abstract class StreamFileSystemITCaseBase extends StreamingTestBase with 
FileSystemITCaseBase {

Review Comment:
   To some extent, yes, because we manually set the parallelism in the `before` 
method:
   ```
   env.setParallelism(4)
   ```
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] snuyanzin commented on a diff in pull request #22574: [FLINK-32060][test] Migrate subclasses of BatchAbstractTestBase in table and other modules to JUnit5

2023-05-15 Thread via GitHub


snuyanzin commented on code in PR #22574:
URL: https://github.com/apache/flink/pull/22574#discussion_r1194178109


##
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/StreamFileSystemITCaseBase.scala:
##
@@ -23,17 +23,20 @@ import org.apache.flink.table.api.TableEnvironment
 import org.apache.flink.table.api.bridge.scala._
 import org.apache.flink.table.planner.runtime.FileSystemITCaseBase
 import org.apache.flink.table.planner.runtime.utils.{AbstractExactlyOnceSink, 
StreamingTestBase, TestingAppendSink, TestSinkUtil}
+import 
org.apache.flink.testutils.junit.extensions.parameterized.NoOpTestExtension
 import org.apache.flink.types.Row
 
-import org.junit.{Assert, Before, Test}
-import org.junit.Assert.assertEquals
+import org.assertj.core.api.Assertions.assertThat
+import org.junit.jupiter.api.{BeforeEach, Test}
+import org.junit.jupiter.api.extension.ExtendWith
 
 import scala.collection.{mutable, Seq}
 
 /** Streaming [[FileSystemITCaseBase]]. */
+@ExtendWith(Array(classOf[NoOpTestExtension]))
 abstract class StreamFileSystemITCaseBase extends StreamingTestBase with 
FileSystemITCaseBase {

Review Comment:
   Agree about the rules and the `@After`/`@Before` annotations.
   Another point is that `StreamingTestBase` extends another JUnit 4-based class, 
`AbstractTestBase`, which has this rule:
   ```java
   @ClassRule
   public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE =
           new MiniClusterWithClientResource(
                   new MiniClusterResourceConfiguration.Builder()
                           .setNumberTaskManagers(1)
                           .setNumberSlotsPerTaskManager(DEFAULT_PARALLELISM)
                           .build());
   ```
   It looks like this rule is not required for streaming tests 
(`before`/`after` is enough).
   Please correct me if I'm wrong.
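   
   For reference, if the mini cluster does turn out to be needed, a sketch of 
the JUnit 5 counterpart (assuming the `MiniClusterExtension` from 
flink-test-utils; imports omitted like in the snippet above):
   ```java
   // Hypothetical JUnit 5 equivalent of the @ClassRule above.
   @RegisterExtension
   public static final MiniClusterExtension MINI_CLUSTER_EXTENSION =
           new MiniClusterExtension(
                   new MiniClusterResourceConfiguration.Builder()
                           .setNumberTaskManagers(1)
                           .setNumberSlotsPerTaskManager(DEFAULT_PARALLELISM)
                           .build());
   ```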



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] reswqa commented on a diff in pull request #22574: [FLINK-32060][test] Migrate subclasses of BatchAbstractTestBase in table and other modules to JUnit5

2023-05-15 Thread via GitHub


reswqa commented on code in PR #22574:
URL: https://github.com/apache/flink/pull/22574#discussion_r1194170872


##
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/StreamFileSystemITCaseBase.scala:
##
@@ -23,17 +23,20 @@ import org.apache.flink.table.api.TableEnvironment
 import org.apache.flink.table.api.bridge.scala._
 import org.apache.flink.table.planner.runtime.FileSystemITCaseBase
 import org.apache.flink.table.planner.runtime.utils.{AbstractExactlyOnceSink, 
StreamingTestBase, TestingAppendSink, TestSinkUtil}
+import 
org.apache.flink.testutils.junit.extensions.parameterized.NoOpTestExtension
 import org.apache.flink.types.Row
 
-import org.junit.{Assert, Before, Test}
-import org.junit.Assert.assertEquals
+import org.assertj.core.api.Assertions.assertThat
+import org.junit.jupiter.api.{BeforeEach, Test}
+import org.junit.jupiter.api.extension.ExtendWith
 
 import scala.collection.{mutable, Seq}
 
 /** Streaming [[FileSystemITCaseBase]]. */
+@ExtendWith(Array(classOf[NoOpTestExtension]))
 abstract class StreamFileSystemITCaseBase extends StreamingTestBase with 
FileSystemITCaseBase {

Review Comment:
   Of course, in the long run we should also migrate it to JUnit 5. Perhaps we 
can create a new ticket to track that, because it is also a significant amount 
of work.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] reswqa commented on a diff in pull request #22574: [FLINK-32060][test] Migrate subclasses of BatchAbstractTestBase in table and other modules to JUnit5

2023-05-15 Thread via GitHub


reswqa commented on code in PR #22574:
URL: https://github.com/apache/flink/pull/22574#discussion_r1194169663


##
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/StreamFileSystemITCaseBase.scala:
##
@@ -23,17 +23,20 @@ import org.apache.flink.table.api.TableEnvironment
 import org.apache.flink.table.api.bridge.scala._
 import org.apache.flink.table.planner.runtime.FileSystemITCaseBase
 import org.apache.flink.table.planner.runtime.utils.{AbstractExactlyOnceSink, 
StreamingTestBase, TestingAppendSink, TestSinkUtil}
+import 
org.apache.flink.testutils.junit.extensions.parameterized.NoOpTestExtension
 import org.apache.flink.types.Row
 
-import org.junit.{Assert, Before, Test}
-import org.junit.Assert.assertEquals
+import org.assertj.core.api.Assertions.assertThat
+import org.junit.jupiter.api.{BeforeEach, Test}
+import org.junit.jupiter.api.extension.ExtendWith
 
 import scala.collection.{mutable, Seq}
 
 /** Streaming [[FileSystemITCaseBase]]. */
+@ExtendWith(Array(classOf[NoOpTestExtension]))
 abstract class StreamFileSystemITCaseBase extends StreamingTestBase with 
FileSystemITCaseBase {

Review Comment:
   And these two `Rule`s are not used in the subclasses touched by this PR:
   ```scala
   @Rule
   def thrown: ExpectedException = expectedException
   @Rule
   def tempFolder: TemporaryFolder = _tempFolder
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] reswqa commented on a diff in pull request #22574: [FLINK-32060][test] Migrate subclasses of BatchAbstractTestBase in table and other modules to JUnit5

2023-05-15 Thread via GitHub


reswqa commented on code in PR #22574:
URL: https://github.com/apache/flink/pull/22574#discussion_r1194168520


##
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/StreamFileSystemITCaseBase.scala:
##
@@ -23,17 +23,20 @@ import org.apache.flink.table.api.TableEnvironment
 import org.apache.flink.table.api.bridge.scala._
 import org.apache.flink.table.planner.runtime.FileSystemITCaseBase
 import org.apache.flink.table.planner.runtime.utils.{AbstractExactlyOnceSink, 
StreamingTestBase, TestingAppendSink, TestSinkUtil}
+import 
org.apache.flink.testutils.junit.extensions.parameterized.NoOpTestExtension
 import org.apache.flink.types.Row
 
-import org.junit.{Assert, Before, Test}
-import org.junit.Assert.assertEquals
+import org.assertj.core.api.Assertions.assertThat
+import org.junit.jupiter.api.{BeforeEach, Test}
+import org.junit.jupiter.api.extension.ExtendWith
 
 import scala.collection.{mutable, Seq}
 
 /** Streaming [[FileSystemITCaseBase]]. */
+@ExtendWith(Array(classOf[NoOpTestExtension]))
 abstract class StreamFileSystemITCaseBase extends StreamingTestBase with 
FileSystemITCaseBase {

Review Comment:
   I did notice this issue, but I found that lifecycle methods like `before` in 
`StreamingTestBase` are doubly annotated with `@Before` and `@BeforeEach`:
   ```scala
   @Before
   @BeforeEach
   def before(): Unit = {
     this.env = StreamExecutionEnvironment.getExecutionEnvironment
     env.setParallelism(4)
     if (enableObjectReuse) {
       this.env.getConfig.enableObjectReuse()
     }
     val setting = EnvironmentSettings.newInstance().inStreamingMode().build()
     this.tEnv = StreamTableEnvironment.create(env, setting)
   }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] reswqa commented on a diff in pull request #22574: [FLINK-32060][test] Migrate subclasses of BatchAbstractTestBase in table and other modules to JUnit5

2023-05-15 Thread via GitHub


reswqa commented on code in PR #22574:
URL: https://github.com/apache/flink/pull/22574#discussion_r1194168520


##
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/StreamFileSystemITCaseBase.scala:
##
@@ -23,17 +23,20 @@ import org.apache.flink.table.api.TableEnvironment
 import org.apache.flink.table.api.bridge.scala._
 import org.apache.flink.table.planner.runtime.FileSystemITCaseBase
 import org.apache.flink.table.planner.runtime.utils.{AbstractExactlyOnceSink, 
StreamingTestBase, TestingAppendSink, TestSinkUtil}
+import 
org.apache.flink.testutils.junit.extensions.parameterized.NoOpTestExtension
 import org.apache.flink.types.Row
 
-import org.junit.{Assert, Before, Test}
-import org.junit.Assert.assertEquals
+import org.assertj.core.api.Assertions.assertThat
+import org.junit.jupiter.api.{BeforeEach, Test}
+import org.junit.jupiter.api.extension.ExtendWith
 
 import scala.collection.{mutable, Seq}
 
 /** Streaming [[FileSystemITCaseBase]]. */
+@ExtendWith(Array(classOf[NoOpTestExtension]))
 abstract class StreamFileSystemITCaseBase extends StreamingTestBase with 
FileSystemITCaseBase {

Review Comment:
   I did notice this issue, but I found that lifecycle methods like `before` in 
`StreamingTestBase` are doubly annotated with `@Before` and `@BeforeEach`:
   ```scala
   @Before
   @BeforeEach
   def before(): Unit = {
     this.env = StreamExecutionEnvironment.getExecutionEnvironment
     env.setParallelism(4)
     if (enableObjectReuse) {
       this.env.getConfig.enableObjectReuse()
     }
     val setting = EnvironmentSettings.newInstance().inStreamingMode().build()
     this.tEnv = StreamTableEnvironment.create(env, setting)
   }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] reswqa commented on a diff in pull request #22574: [FLINK-32060][test] Migrate subclasses of BatchAbstractTestBase in table and other modules to JUnit5

2023-05-15 Thread via GitHub


reswqa commented on code in PR #22574:
URL: https://github.com/apache/flink/pull/22574#discussion_r1194167000


##
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/StreamFileSystemITCaseBase.scala:
##
@@ -23,17 +23,20 @@ import org.apache.flink.table.api.TableEnvironment
 import org.apache.flink.table.api.bridge.scala._
 import org.apache.flink.table.planner.runtime.FileSystemITCaseBase
 import org.apache.flink.table.planner.runtime.utils.{AbstractExactlyOnceSink, 
StreamingTestBase, TestingAppendSink, TestSinkUtil}
+import 
org.apache.flink.testutils.junit.extensions.parameterized.NoOpTestExtension
 import org.apache.flink.types.Row
 
-import org.junit.{Assert, Before, Test}
-import org.junit.Assert.assertEquals
+import org.assertj.core.api.Assertions.assertThat
+import org.junit.jupiter.api.{BeforeEach, Test}
+import org.junit.jupiter.api.extension.ExtendWith
 
 import scala.collection.{mutable, Seq}
 
 /** Streaming [[FileSystemITCaseBase]]. */
+@ExtendWith(Array(classOf[NoOpTestExtension]))
 abstract class StreamFileSystemITCaseBase extends StreamingTestBase with 
FileSystemITCaseBase {

Review Comment:
   Emmm,
   ```scala
   @Before
   @BeforeEach
   def before(): Unit = {
     this.env = StreamExecutionEnvironment.getExecutionEnvironment
     env.setParallelism(4)
     if (enableObjectReuse) {
       this.env.getConfig.enableObjectReuse()
     }
     val setting = EnvironmentSettings.newInstance().inStreamingMode().build()
     this.tEnv = StreamTableEnvironment.create(env, setting)
   }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] snuyanzin commented on a diff in pull request #22574: [FLINK-32060][test] Migrate subclasses of BatchAbstractTestBase in table and other modules to JUnit5

2023-05-15 Thread via GitHub


snuyanzin commented on code in PR #22574:
URL: https://github.com/apache/flink/pull/22574#discussion_r1194163579


##
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/StreamFileSystemITCaseBase.scala:
##
@@ -23,17 +23,20 @@ import org.apache.flink.table.api.TableEnvironment
 import org.apache.flink.table.api.bridge.scala._
 import org.apache.flink.table.planner.runtime.FileSystemITCaseBase
 import org.apache.flink.table.planner.runtime.utils.{AbstractExactlyOnceSink, 
StreamingTestBase, TestingAppendSink, TestSinkUtil}
+import 
org.apache.flink.testutils.junit.extensions.parameterized.NoOpTestExtension
 import org.apache.flink.types.Row
 
-import org.junit.{Assert, Before, Test}
-import org.junit.Assert.assertEquals
+import org.assertj.core.api.Assertions.assertThat
+import org.junit.jupiter.api.{BeforeEach, Test}
+import org.junit.jupiter.api.extension.ExtendWith
 
 import scala.collection.{mutable, Seq}
 
 /** Streaming [[FileSystemITCaseBase]]. */
+@ExtendWith(Array(classOf[NoOpTestExtension]))
 abstract class StreamFileSystemITCaseBase extends StreamingTestBase with 
FileSystemITCaseBase {

Review Comment:
   Couldn't it be another issue that `StreamingTestBase` is JUnit 4-based?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] reswqa commented on a diff in pull request #22574: [FLINK-32060][test] Migrate subclasses of BatchAbstractTestBase in table and other modules to JUnit5

2023-05-15 Thread via GitHub


reswqa commented on code in PR #22574:
URL: https://github.com/apache/flink/pull/22574#discussion_r1194160609


##
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/FileSystemTestCsvITCase.scala:
##
@@ -32,12 +35,11 @@ class FileSystemTestCsvITCase extends 
BatchFileSystemITCaseBase {
 
   override def getScheme: String = "test"
 
-  @After
+  @AfterEach
   def close(): Unit = {
 val path = new Path(resultPath)
-assertEquals(
-  s"File $resultPath is not closed",
-  0,
-  TestFileSystem.getNumberOfUnclosedOutputStream(path))
+if (TestFileSystem.getNumberOfUnclosedOutputStream(path) != 0) {
+  Assertions.fail(s"File $resultPath is not closed");
+}

Review Comment:
   I spent some time trying something similar, but I didn't come to any useful 
conclusions. Feeling disappointed about this 😞.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] snuyanzin commented on a diff in pull request #22574: [FLINK-32060][test] Migrate subclasses of BatchAbstractTestBase in table and other modules to JUnit5

2023-05-15 Thread via GitHub


snuyanzin commented on code in PR #22574:
URL: https://github.com/apache/flink/pull/22574#discussion_r1194131282


##
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/FileSystemTestCsvITCase.scala:
##
@@ -32,12 +35,11 @@ class FileSystemTestCsvITCase extends 
BatchFileSystemITCaseBase {
 
   override def getScheme: String = "test"
 
-  @After
+  @AfterEach
   def close(): Unit = {
 val path = new Path(resultPath)
-assertEquals(
-  s"File $resultPath is not closed",
-  0,
-  TestFileSystem.getNumberOfUnclosedOutputStream(path))
+if (TestFileSystem.getNumberOfUnclosedOutputStream(path) != 0) {
+  Assertions.fail(s"File $resultPath is not closed");
+}

Review Comment:
   yeah.. that's weird...
   There is a workaround (WA) for that:
   ```scala
   assertThat(TestFileSystem.getNumberOfUnclosedOutputStream(path))
     .as(s"File $resultPath is not closed")
     .asInstanceOf[AbstractIntegerAssert[_]]
     .isEqualTo(0)
   ```
   
   However, there is an issue with this workaround: if there is more than one 
such WA per class, it leads to issues in IntelliJ IDEA (mvn works fine).
   I've filed a Scala plugin issue: https://youtrack.jetbrains.com/issue/SCL-20679



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] snuyanzin commented on a diff in pull request #22574: [FLINK-32060][test] Migrate subclasses of BatchAbstractTestBase in table and other modules to JUnit5

2023-05-15 Thread via GitHub


snuyanzin commented on code in PR #22574:
URL: https://github.com/apache/flink/pull/22574#discussion_r1194131282


##
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/FileSystemTestCsvITCase.scala:
##
@@ -32,12 +35,11 @@ class FileSystemTestCsvITCase extends 
BatchFileSystemITCaseBase {
 
   override def getScheme: String = "test"
 
-  @After
+  @AfterEach
   def close(): Unit = {
 val path = new Path(resultPath)
-assertEquals(
-  s"File $resultPath is not closed",
-  0,
-  TestFileSystem.getNumberOfUnclosedOutputStream(path))
+if (TestFileSystem.getNumberOfUnclosedOutputStream(path) != 0) {
+  Assertions.fail(s"File $resultPath is not closed");
+}

Review Comment:
   yeah.. that's weird...
   There is a workaround (WA) for that:
   ```scala
   assertThat(TestFileSystem.getNumberOfUnclosedOutputStream(path))
     .as(s"File $resultPath is not closed")
     .asInstanceOf[AbstractIntegerAssert[_]]
     .isEqualTo(0)
   ```
   
   However, there is an issue with this workaround: if there is more than one 
such WA per class, it leads to issues in IntelliJ IDEA (mvn works fine).
   I've filed a Scala plugin issue: https://youtrack.jetbrains.com/issue/SCL-20679



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] snuyanzin commented on a diff in pull request #22574: [FLINK-32060][test] Migrate subclasses of BatchAbstractTestBase in table and other modules to JUnit5

2023-05-15 Thread via GitHub


snuyanzin commented on code in PR #22574:
URL: https://github.com/apache/flink/pull/22574#discussion_r1194131282


##
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/FileSystemTestCsvITCase.scala:
##
@@ -32,12 +35,11 @@ class FileSystemTestCsvITCase extends 
BatchFileSystemITCaseBase {
 
   override def getScheme: String = "test"
 
-  @After
+  @AfterEach
   def close(): Unit = {
 val path = new Path(resultPath)
-assertEquals(
-  s"File $resultPath is not closed",
-  0,
-  TestFileSystem.getNumberOfUnclosedOutputStream(path))
+if (TestFileSystem.getNumberOfUnclosedOutputStream(path) != 0) {
+  Assertions.fail(s"File $resultPath is not closed");
+}

Review Comment:
   yeah.. that's weird...
   There is a workaround (WA) for that:
   ```scala
   assertThat(TestFileSystem.getNumberOfUnclosedOutputStream(path))
     .as(s"File $resultPath is not closed")
     .asInstanceOf[AbstractIntegerAssert[_]]
     .isEqualTo(0)
   ```
   
   However, there is an issue with this workaround: if there is more than one 
such WA per class, it leads to issues in IntelliJ IDEA (mvn works fine).
   I've filed a Scala plugin issue: https://youtrack.jetbrains.com/issue/SCL-20679



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] reswqa commented on a diff in pull request #22574: [FLINK-32060][test] Migrate subclasses of BatchAbstractTestBase in table and other modules to JUnit5

2023-05-15 Thread via GitHub


reswqa commented on code in PR #22574:
URL: https://github.com/apache/flink/pull/22574#discussion_r1194117257


##
flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcFileSystemITCase.java:
##
@@ -69,22 +72,18 @@
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** ITCase for {@link OrcFileFormatFactory}. */
-@RunWith(Parameterized.class)
+@ExtendWith(ParameterizedTestExtension.class)
 public class OrcFileSystemITCase extends BatchFileSystemITCaseBase {
 
-@ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new 
TemporaryFolder();
+@TempDir public static java.nio.file.Path temporaryFolder;
 
-private final boolean configure;
+@Parameter public boolean configure;
 
-@Parameterized.Parameters(name = "{0}")
+@Parameters(name = "{0}")

Review Comment:
   Somehow I forgot to synchronize the changes here 😞 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (FLINK-32069) jobClient.getJobStatus() can return status RUNNING for finished insert operation

2023-05-15 Thread Aleksandr Iushmanov (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17722858#comment-17722858
 ] 

Aleksandr Iushmanov edited comment on FLINK-32069 at 5/15/23 4:47 PM:
--

Thanks a lot [~mapohl] for this detailed analysis. I have only one question 
left around this dispatcher method that is responsible for job status:
{code:java}
@Override
public CompletableFuture<MultipleJobsDetails> requestMultipleJobDetails(Time timeout) {
    List<CompletableFuture<Optional<JobDetails>>> individualOptionalJobDetails =
            queryJobMastersForInformation(
                    jobManagerRunner -> jobManagerRunner.requestJobDetails(timeout));

    CompletableFuture<Collection<Optional<JobDetails>>> optionalCombinedJobDetails =
            FutureUtils.combineAll(individualOptionalJobDetails);

    CompletableFuture<Collection<JobDetails>> combinedJobDetails =
            optionalCombinedJobDetails.thenApply(this::flattenOptionalCollection);

    final Collection<JobDetails> completedJobDetails =
            executionGraphInfoStore.getAvailableJobDetails();

    return combinedJobDetails.thenApply(
            (Collection<JobDetails> runningJobDetails) -> {
                final Map<JobID, JobDetails> deduplicatedJobs = new HashMap<>();

                completedJobDetails.forEach(job -> deduplicatedJobs.put(job.getJobId(), job));
                runningJobDetails.forEach(job -> deduplicatedJobs.put(job.getJobId(), job));

                return new MultipleJobsDetails(new HashSet<>(deduplicatedJobs.values()));
            });
}
{code}

I am sorry if this is naive, but I wonder if the following is possible:
1. {{optionalCombinedJobDetails}} contains the job status as RUNNING right after 
we complete the future in onJobCompletion.
2. In parallel, the job transitions to the FINISHED state, and 
{{completedJobDetails}} already has the correct execution graph with the 
FINISHED state.
3. {{combinedJobDetails}} is compiled from both results, but 
{{runningJobDetails}} has priority over {{completedJobDetails}} and overrides 
it (see the toy sketch below).
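
To make the override order concrete, here is a toy sketch (hypothetical values, 
not Flink code) of the deduplication step in the method quoted above: entries 
from runningJobDetails are put last, so they win for the same job id.
{code:java}
import java.util.HashMap;
import java.util.Map;

class DedupOrderDemo {
    public static void main(String[] args) {
        Map<String, String> deduplicatedJobs = new HashMap<>();
        // from completedJobDetails (ExecutionGraphInfoStore)
        deduplicatedJobs.put("job-1", "FINISHED");
        // from runningJobDetails (JobManagerRunnerRegistry), put last
        deduplicatedJobs.put("job-1", "RUNNING");
        System.out.println(deduplicatedJobs.get("job-1")); // prints RUNNING
    }
}
{code}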

I will continue to look into this issue on my side; I will try to intercept the 
JM in the {{onJobCompletion}} method and request the job status in parallel.

If what I think is not possible or not causing the issue, I will pivot and have 
a look at the InsertResultProvider side of things, to check whether another 
promise is broken (that the insert operation has only one row in its result).




was (Author: izeren):
Thanks a lot [~mapohl] for this detailed analysis. I have only one question 
left around this dispatcher method that is responsible for job status:
{code:java}
@Override
public CompletableFuture<MultipleJobsDetails> requestMultipleJobDetails(Time timeout) {
    List<CompletableFuture<Optional<JobDetails>>> individualOptionalJobDetails =
            queryJobMastersForInformation(
                    jobManagerRunner -> jobManagerRunner.requestJobDetails(timeout));

    CompletableFuture<Collection<Optional<JobDetails>>> optionalCombinedJobDetails =
            FutureUtils.combineAll(individualOptionalJobDetails);

    CompletableFuture<Collection<JobDetails>> combinedJobDetails =
            optionalCombinedJobDetails.thenApply(this::flattenOptionalCollection);

    final Collection<JobDetails> completedJobDetails =
            executionGraphInfoStore.getAvailableJobDetails();

    return combinedJobDetails.thenApply(
            (Collection<JobDetails> runningJobDetails) -> {
                final Map<JobID, JobDetails> deduplicatedJobs = new HashMap<>();

                completedJobDetails.forEach(job -> deduplicatedJobs.put(job.getJobId(), job));
                runningJobDetails.forEach(job -> deduplicatedJobs.put(job.getJobId(), job));

                return new MultipleJobsDetails(new HashSet<>(deduplicatedJobs.values()));
            });
}
{code}

I am sorry if this is naive, but I wonder if the following is possible?
1. {{optionalCombinedJobDetails}} contains job status as RUNNING
2. in parallel job transitioned to FINISHED state and {{completedJobDetails}} 
already has correct execution graph with FINISHED state
3. {{combinedJobDetails}} are compiled with both results but 
{{runningJobDetails}} has priority over {{completedJobDetails}} and overrides.

I will continue to look into this issue on my side, I will try to intercept JM 
on {{onJobCompletion}} method and in parallel request job status.

If what I think is not possible or not causing the issue, I will pivot and have 
a look at InsertResultProvider side of things to check if another promise is 
broken (that insert operation has only one row in result)



> jobClient.getJobStatus() can return status RUNNING for finished insert 
> operation
> 
>
> Key: FLINK-32069
> URL: https://issues.apache.org/jira/browse/FLINK-32069
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.16.1, 1.15.4
>Reporter: Aleksandr Iushmanov
>Priority: Major
>
> Using zeppelin with remote cluster I came across some race condition issue 
> leading to failed

[jira] [Commented] (FLINK-32069) jobClient.getJobStatus() can return status RUNNING for finished insert operation

2023-05-15 Thread Aleksandr Iushmanov (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17722858#comment-17722858
 ] 

Aleksandr Iushmanov commented on FLINK-32069:
-

Thanks a lot [~mapohl] for this detailed analysis. I have only one question 
left around this dispatcher method that is responsible for job status:
{code:java}
@Override
public CompletableFuture<MultipleJobsDetails> requestMultipleJobDetails(Time timeout) {
    List<CompletableFuture<Optional<JobDetails>>> individualOptionalJobDetails =
            queryJobMastersForInformation(
                    jobManagerRunner -> jobManagerRunner.requestJobDetails(timeout));

    CompletableFuture<Collection<Optional<JobDetails>>> optionalCombinedJobDetails =
            FutureUtils.combineAll(individualOptionalJobDetails);

    CompletableFuture<Collection<JobDetails>> combinedJobDetails =
            optionalCombinedJobDetails.thenApply(this::flattenOptionalCollection);

    final Collection<JobDetails> completedJobDetails =
            executionGraphInfoStore.getAvailableJobDetails();

    return combinedJobDetails.thenApply(
            (Collection<JobDetails> runningJobDetails) -> {
                final Map<JobID, JobDetails> deduplicatedJobs = new HashMap<>();

                completedJobDetails.forEach(job -> deduplicatedJobs.put(job.getJobId(), job));
                runningJobDetails.forEach(job -> deduplicatedJobs.put(job.getJobId(), job));

                return new MultipleJobsDetails(new HashSet<>(deduplicatedJobs.values()));
            });
}
{code}

I am sorry if this is naive, but I wonder if the following is possible:
1. {{optionalCombinedJobDetails}} contains the job status as RUNNING.
2. In parallel, the job transitions to the FINISHED state, and 
{{completedJobDetails}} already has the correct execution graph with the 
FINISHED state.
3. {{combinedJobDetails}} is compiled from both results, but 
{{runningJobDetails}} has priority over {{completedJobDetails}} and overrides 
it.

I will continue to look into this issue on my side; I will try to intercept the 
JM in the {{onJobCompletion}} method and request the job status in parallel.

If what I think is not possible or not causing the issue, I will pivot and have 
a look at the InsertResultProvider side of things, to check whether another 
promise is broken (that the insert operation has only one row in its result).



> jobClient.getJobStatus() can return status RUNNING for finished insert 
> operation
> 
>
> Key: FLINK-32069
> URL: https://issues.apache.org/jira/browse/FLINK-32069
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.16.1, 1.15.4
>Reporter: Aleksandr Iushmanov
>Priority: Major
>
> Using Zeppelin with a remote cluster, I came across a race condition issue 
> leading to failed expectations for SQL insert operations. 
>  
> Below is an example of Zeppelin code that is failing because 
> jobClient.getJobStatus() returns RUNNING even after the job has finished. I 
> have verified that the same failure can happen if I use 
> jobClient.getJobExecutionResult().get() (the job execution result is "Program 
> execution finished", but the job status is not consistently FINISHED).
> {code:java}
> TableResult tableResult = ((TableEnvironmentInternal) 
> tbenv).executeInternal(operations);
> checkState(tableResult.getJobClient().isPresent());
> try {
>   tableResult.await();
>   JobClient jobClient = tableResult.getJobClient().get();
>   if (jobClient.getJobStatus().get() == JobStatus.FINISHED) {
> context.out.write("Insertion successfully.\n");
>   } else {
> throw new IOException("Job is failed, " + 
> jobClient.getJobExecutionResult().get().toString());
>   }
> } catch (InterruptedException e) {
>   throw new IOException("Flink job is interrupted", e);
> } catch (ExecutionException e) {
>   throw new IOException("Flink job is failed", e);
> } {code}
>  ZeppelinCode: 
> [https://github.com/apache/zeppelin/blob/master/flink/flink1.15-shims/src/main/java/org/apache/zeppelin/flink/Flink115SqlInterpreter.java#L384]
> I suspect that job status is returned based on runningJobsRegistry and since 
> 1.15 this registry is not updated with FINISHED status prior to job result 
> future completion, see this change: {{JobMasterServiceLeadershipRunner.java}} 
> [https://github.com/apache/flink/pull/18189/files#diff-3eb433f18b85c0f5329a4b312a219583189d777fe9bdd547f1114f4a22989f8bL387]
>  
>  
> It looks like a race condition that is hard to reproduce on a lightweight 
> setup. I was reproducing this by running a Zeppelin notebook with a remote 
> Flink cluster and triggering a SQL insert operation. If I find a smaller setup 
> to reproduce it on a small local cluster with a lightweight client, I will 
> update this ticket when I have more input. I am open to suggestions on how to 
> fix this. 
>  
> For Zeppelin I have a separate ticket because Flink 1.15 is not g

[jira] [Commented] (FLINK-31946) DynamoDB Sink Allow Multiple Item Writes

2023-05-15 Thread Hong Liang Teoh (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17722855#comment-17722855
 ] 

Hong Liang Teoh commented on FLINK-31946:
-

Hi [~cjensen]! Thanks for creating this issue. 

Trying to understand the use case better here. If we want one record to map to 
two WriteRequests, would doing a FlatMap before the sink operator work? A 
sketch of that idea is below.

If the intention is to have more efficient reads, could we instead use a GSI 
(global secondary index)?
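
For illustration, a minimal sketch of the FlatMap idea (hypothetical record 
format and keys; not tied to the actual DynamoDB sink API): each input element 
fans out to one element per target partition key before the sink, so the 
ElementConverter still produces exactly one WriteRequest per element.
{code:java}
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class DenormalizeBeforeSink {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Hypothetical input: "userId,itemId" strings standing in for real records.
        DataStream<String> orders = env.fromElements("u1,i1", "u2,i2");

        // Fan each element out to two elements, one per partition key.
        DataStream<String> denormalized =
                orders.flatMap(
                        (String order, Collector<String> out) -> {
                            String[] parts = order.split(",");
                            out.collect("USER#" + parts[0] + "|" + order);
                            out.collect("ITEM#" + parts[1] + "|" + order);
                        },
                        Types.STRING);

        denormalized.print(); // replace with the DynamoDB sink in a real job
        env.execute("denormalize-before-sink");
    }
}
{code}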

> DynamoDB Sink Allow Multiple Item Writes
> 
>
> Key: FLINK-31946
> URL: https://issues.apache.org/jira/browse/FLINK-31946
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / DynamoDB
>Reporter: Curtis Jensen
>Priority: Minor
>
> In some cases, it is desirable to be able to write aggregation data to 
> multiple partition keys.  This supports the case of denormalizing data to 
> facilitate more efficient read operations.
> However, the DynamoDBSink allows for only a single DynamoDB item to be 
> generated for each Flink element.  This appears to be a limitation of the 
> ElementConverter more than the DynamoDBSink.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-31810) RocksDBException: Bad table magic number on checkpoint rescale

2023-05-15 Thread David Artiga (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31810?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17722771#comment-17722771
 ] 

David Artiga edited comment on FLINK-31810 at 5/15/23 4:22 PM:
---

Not sure what you mean by "enable local recovery", but in any case we just 
restore from an older checkpoint/savepoint as a workaround.

Edit: Found we have {{state.backend.local-recovery: true}}


was (Author: david.artiga):
Not sure what you mean by "enable local recovery" but in any case we just 
restore from older checkpoint/savepoint as a workaround.

> RocksDBException: Bad table magic number on checkpoint rescale
> --
>
> Key: FLINK-31810
> URL: https://issues.apache.org/jira/browse/FLINK-31810
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.15.2
>Reporter: Robert Metzger
>Priority: Major
>
> While rescaling a job from checkpoint, I ran into this exception:
> {code:java}
> SinkMaterializer[7] -> rob-result[7]: Writer -> rob-result[7]: Committer 
> (4/4)#3 (c1b348f7eed6e1ce0e41ef75338ae754) switched from INITIALIZING to 
> FAILED with failure cause: java.lang.Exception: Exception while creating 
> StreamOperatorStateContext.
>   at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:255)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:265)
>   at 
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:703)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:679)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:646)
>   at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
>   at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:917)
>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
>   at java.base/java.lang.Thread.run(Unknown Source)
> Caused by: org.apache.flink.util.FlinkException: Could not restore keyed 
> state backend for 
> SinkUpsertMaterializer_7d9b7588bc2ff89baed50d7a4558caa4_(4/4) from any of the 
> 1 provided restore options.
>   at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160)
>   at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:346)
>   at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:164)
>   ... 11 more
> Caused by: org.apache.flink.runtime.state.BackendBuildingException: Caught 
> unexpected exception.
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:395)
>   at 
> org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:483)
>   at 
> org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:97)
>   at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:329)
>   at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
>   at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
>   ... 13 more
> Caused by: java.io.IOException: Error while opening RocksDB instance.
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.openDB(RocksDBOperationUtils.java:92)
>   at 
> org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreDBInstanceFromStateHandle(RocksDBIncrementalRestoreOperation.java:465)
>   at 
> org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreWithRescaling(RocksDBIncrementalRestoreOperation.java:321)
>   at 
> org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restore(RocksDBIncrementalRestoreOperation.java:164)
>   at 
> or

[GitHub] [flink-kubernetes-operator] mxm opened a new pull request, #598: [FLINK-32102] Aggregate multiple pendingRecords metric per source if present

2023-05-15 Thread via GitHub


mxm opened a new pull request, #598:
URL: https://github.com/apache/flink-kubernetes-operator/pull/598

   Some sources expose multiple `.pendingRecords` metrics. If that is the case, 
we must sum up these metrics to yield the correct internal pending records 
count.
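   
   For illustration, a minimal sketch (a hypothetical helper, not the operator's 
actual code) of the intended aggregation:
   ```java
   import java.util.List;

   final class PendingRecordsAggregation {

       // When a source reports several ".pendingRecords" gauges, sum them
       // instead of picking an arbitrary one.
       static long totalPendingRecords(List<Long> pendingRecordsGauges) {
           return pendingRecordsGauges.stream().mapToLong(Long::longValue).sum();
       }

       public static void main(String[] args) {
           // e.g. a source exposing one pendingRecords gauge per topic
           System.out.println(totalPendingRecords(List.of(1200L, 300L, 0L))); // 1500
       }
   }
   ```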


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (FLINK-32069) jobClient.getJobStatus() can return status RUNNING for finished insert operation

2023-05-15 Thread Matthias Pohl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17722820#comment-17722820
 ] 

Matthias Pohl edited comment on FLINK-32069 at 5/15/23 3:25 PM:


Thanks for sharing your train of thought here, [~izeren]. Don't be overwhelmed 
by the following lines. I'm writing it down for documentation purposes. It 
would be good if you could verify my findings.
{quote}My line of thinking was that the job result future is returned based on 
the registry and the job status based on the result store, and it is not in 
sync the same way as it used to be.
{quote}
The {{JobManagerRunnerRegistry}} holds the {{JobManagerRunner}} instances. The 
cleanup of the job is triggered (in 
[Dispatcher:681|https://github.com/apache/flink/blob/ee912b55f9405a41c391f942636b83c6f2c968f3/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java#L681])
 as soon as the result of the {{JobManagerRunner}} is completed (which happens 
in 
[JobMasterServiceLeadershipRunner#onJobCompletion:384|https://github.com/apache/flink/blob/4882fbd9744d456e09ca60b6c7cf7a5b60326c73/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunner.java#L384]).
 At that moment, the {{JobManagerRunnerRegistry}} is the source of truth when it 
comes to the {{JobStatus}}. The next step is writing the 
{{ExecutionGraphInfo}} of the terminated job into the 
{{ExecutionGraphInfoStore}} in 
[Dispatcher#jobReachedTerminalState:1334|https://github.com/apache/flink/blob/ee912b55f9405a41c391f942636b83c6f2c968f3/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java#L1334].
 After that, we create a dirty entry in the {{JobResultStore}} in 
[Dispatcher#jobReachedTerminalState:1347|https://github.com/apache/flink/blob/ee912b55f9405a41c391f942636b83c6f2c968f3/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java#L1347]
 which runs the operation asynchronously in the {{ioExecutor}} (see 
[Dispatcher#registerGloballyTerminatedJobInJobResultStore:1365|https://github.com/apache/flink/blob/ee912b55f9405a41c391f942636b83c6f2c968f3/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java#L1365]).
 The future is forwarded to 
[Dispatcher#runJob:678|https://github.com/apache/flink/blob/ee912b55f9405a41c391f942636b83c6f2c968f3/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java#L678]
 where it will trigger the {{Dispatcher#globalResourceCleaner}} through 
[Dispatcher#removeJob:1245|https://github.com/apache/flink/blob/ee912b55f9405a41c391f942636b83c6f2c968f3/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java#L1245].

We have the job's status stored in the {{JobManagerRunnerRegistry}}, the 
{{ExecutionGraphInfoStore}}, and the {{JobResultStore}}, persisted based on 
the same {{ExecutionGraph}}. Any job status call should still rely on the 
{{JobManagerRunnerRegistry}}.

The {{Dispatcher#globalResourceCleaner}} is configured in 
[DispatcherResourceCleanerFactory#createGlobalResourceCleaner:106ff|https://github.com/apache/flink/blob/c5352fc55972420ed5bf1afdfd97834540b1407a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/DispatcherResourceCleanerFactory.java#L106].
 It removes the job from the {{JobManagerRunnerRegistry}} before cleaning up 
all the other artifacts. This cleanup procedure is triggered after the 
{{JobResultStore}} has the job's dirty entry written to disk/memory (and 
consequently also after the {{ExecutionGraphInfo}} has been persisted in the 
{{ExecutionGraphInfoStore}}).

The cleanup results in the job's {{JobManagerRunner}} no longer being present 
in the {{JobManagerRunnerRegistry}}. The {{Dispatcher}} then starts to rely on 
the {{ExecutionGraphInfoStore}}, which has the same {{JobStatus}} present as 
the {{JobManagerRunnerRegistry}}.

From these findings, I would conclude that there is no race condition possible. 
WDYT?


was (Author: mapohl):
Thanks for sharing your train of thought here, [~izeren]. Don't be overwhelmed 
by the following lines. I'm writing it down for documentation purposes. It 
would be good if you could verify my findings.
{quote}My line of thinking was that job result future is returned based on 
registry and job status based on result store and it it is not in sync same way 
as it used to be.
{quote}
The {{JobManagerRunnerRegistry}} holds the {{JobManagerRunner}} instances. The 
cleanup of the job is triggered (in 
[Dispatcher:681|https://github.com/apache/flink/blob/ee912b55f9405a41c391f942636b83c6f2c968f3/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java#L681])
 as soon as the result of the {{JobManagerRunner}} is completed (which happens 
in 
[JobMasterServiceLeadershipRunner#onJobCompletion:384|https://github.com/apac

[jira] [Comment Edited] (FLINK-32069) jobClient.getJobStatus() can return status RUNNING for finished insert operation

2023-05-15 Thread Matthias Pohl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17722820#comment-17722820
 ] 

Matthias Pohl edited comment on FLINK-32069 at 5/15/23 3:24 PM:


Thanks for sharing your train of thought here, [~izeren]. Don't be overwhelmed 
by the following lines. I'm writing it down for documentation purposes. It 
would be good if you could verify my findings.
{quote}My line of thinking was that job result future is returned based on 
registry and job status based on result store and it it is not in sync same way 
as it used to be.
{quote}
The {{JobManagerRunnerRegistry}} holds the {{JobManagerRunner}} instances. The 
cleanup of the job is triggered (in 
[Dispatcher:681|https://github.com/apache/flink/blob/ee912b55f9405a41c391f942636b83c6f2c968f3/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java#L681])
 as soon as the result of the {{JobManagerRunner}} is completed (which happens 
in 
[JobMasterServiceLeadershipRunner#onJobCompletion:384|https://github.com/apache/flink/blob/4882fbd9744d456e09ca60b6c7cf7a5b60326c73/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunner.java#L384]).
 In that moment, {{JobManagerRunnerRegistry}} is the source of truth when it 
comes to the {{{}JobStatus{}}}. The next step is writing the 
{{ExecutionGraphInfo}} of the terminated job into the 
{{ExecutionGraphInfoStore}} in 
[Dispatcher#jobReachedTerminalState:1334|https://github.com/apache/flink/blob/ee912b55f9405a41c391f942636b83c6f2c968f3/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java#L1334].
 After that, we create a dirty entry in the {{JobResultStore}} in 
[Dispatcher#jobReachedTerminalState:1347|https://github.com/apache/flink/blob/ee912b55f9405a41c391f942636b83c6f2c968f3/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java#L1347]
 which runs the operation asynchronously in the {{ioExecutor}} (see 
[Dispatcher#registerGloballyTerminatedJobInJobResultStore:1365|https://github.com/apache/flink/blob/ee912b55f9405a41c391f942636b83c6f2c968f3/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java#L1365].
 The future is forwarded to 
[Dispatcher#runJob:678|https://github.com/apache/flink/blob/ee912b55f9405a41c391f942636b83c6f2c968f3/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java#L678]
 where it will trigger the {{Dispatcher#globalResourceCleaner}} through 
[Dispatcher#removeJob:1245|https://github.com/apache/flink/blob/ee912b55f9405a41c391f942636b83c6f2c968f3/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java#L1245].

We have the job's status stored in the {{{}JobManagerRunnerRegistry{}}}, the 
{{{}ExecutionGraphInfoStore{}}}, and the {{JobResultStore}}, all persisted based 
on the same {{{}ExecutionGraph{}}}. Any job status call should still rely on the 
{{{}JobManagerRunnerRegistry{}}}.

The {{Dispatcher#globalResourceCleaner}} is configured in 
[DispatcherResourceCleanerFactory#createGlobalResourceCleaner:106ff|https://github.com/apache/flink/blob/c5352fc55972420ed5bf1afdfd97834540b1407a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/DispatcherResourceCleanerFactory.java#L106].
 It removes the job from the {{JobManagerRunnerRegistry}} before cleaning up 
all the other artifacts. This cleanup procedure is triggered after the 
{{JobResultStore}} has the job's dirty entry written to disk/memory (and 
consequently also after the {{ExecutionGraphInfo}} has been persisted in the 
{{{}ExecutionGraphInfoStore{}}}).

The cleanup results in the job's {{JobManagerRunner}} no longer being present in 
the {{{}JobManagerRunnerRegistry{}}}. From that moment on, the {{Dispatcher}} 
relies on the {{ExecutionGraphInfoStore}}, which has the same {{JobStatus}} 
present as the {{{}JobManagerRunnerRegistry{}}} had.


was (Author: mapohl):
Thanks for sharing your train of thought here, [~izeren]. Don't be overwhelmed 
by the following lines. I'm writing it down for documentation purposes. It 
would be good if you could verify my findings.
{quote}My line of thinking was that job result future is returned based on 
registry and job status based on result store and it is not in sync the same 
way as it used to be.
{quote}
The {{JobManagerRunnerRegistry}} holds the {{JobManagerRunner}} instances. The 
cleanup of the job is triggered (in 
[Dispatcher:681|https://github.com/apache/flink/blob/ee912b55f9405a41c391f942636b83c6f2c968f3/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java#L681])
 as soon as the result of the {{JobManagerRunner}} is completed (which happens 
in 
[JobMasterServiceLeadershipRunner#onJobCompletion:384|https://github.com/apache/flink/blob/4882fbd9744d456e09ca60b6c7cf7a5b60326c73/flink-runtime/src/main/java/org/ap

[jira] [Comment Edited] (FLINK-32069) jobClient.getJobStatus() can return status RUNNING for finished insert operation

2023-05-15 Thread Matthias Pohl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17722820#comment-17722820
 ] 

Matthias Pohl edited comment on FLINK-32069 at 5/15/23 3:23 PM:


Thanks for sharing your train of thought here, [~izeren]. Don't be overwhelmed 
by the following lines. I'm writing it down for documentation purposes. It 
would be good if you could verify my findings.
{quote}My line of thinking was that job result future is returned based on 
registry and job status based on result store and it is not in sync the same 
way as it used to be.
{quote}
The {{JobManagerRunnerRegistry}} holds the {{JobManagerRunner}} instances. The 
cleanup of the job is triggered (in 
[Dispatcher:681|https://github.com/apache/flink/blob/ee912b55f9405a41c391f942636b83c6f2c968f3/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java#L681])
 as soon as the result of the {{JobManagerRunner}} is completed (which happens 
in 
[JobMasterServiceLeadershipRunner#onJobCompletion:384|https://github.com/apache/flink/blob/4882fbd9744d456e09ca60b6c7cf7a5b60326c73/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunner.java#L384]).
 At that moment, {{JobManagerRunnerRegistry}} is the source of truth when it 
comes to the {{{}JobStatus{}}}. The next step is writing the 
{{ExecutionGraphInfo}} of the terminated job into the 
{{ExecutionGraphInfoStore}} in 
[Dispatcher#jobReachedTerminalState:1334|https://github.com/apache/flink/blob/ee912b55f9405a41c391f942636b83c6f2c968f3/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java#L1334].
 After that, we create a dirty entry in the {{JobResultStore}} in 
[Dispatcher#jobReachedTerminalState:1347|https://github.com/apache/flink/blob/ee912b55f9405a41c391f942636b83c6f2c968f3/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java#L1347]
 which runs the operation asynchronously in the {{ioExecutor}} (see 
[Dispatcher#registerGloballyTerminatedJobInJobResultStore:1365|https://github.com/apache/flink/blob/ee912b55f9405a41c391f942636b83c6f2c968f3/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java#L1365]).
 The future is forwarded to 
[Dispatcher#runJob:678|https://github.com/apache/flink/blob/ee912b55f9405a41c391f942636b83c6f2c968f3/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java#L678]
 where it will trigger the {{Dispatcher#globalResourceCleaner}} through 
[Dispatcher#removeJob:1245|https://github.com/apache/flink/blob/ee912b55f9405a41c391f942636b83c6f2c968f3/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java#L1245].

We have the job's status stored in the {{{}JobManagerRunnerRegistry{}}}, the 
{{{}ExecutionGraphInfoStore{}}}, and the {{JobResultStore}}, all persisted based 
on the same {{{}ExecutionGraph{}}}. Any job status call should still rely on the 
{{{}JobManagerRunnerRegistry{}}}.

The {{Dispatcher#globalResourceCleaner}} is configured in 
[DispatcherResourceCleanerFactory#createGlobalResourceCleaner:106ff|https://github.com/apache/flink/blob/c5352fc55972420ed5bf1afdfd97834540b1407a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/DispatcherResourceCleanerFactory.java#L106].
 It removes the job from the {{JobManagerRunnerRegistry}} before cleaning up 
all the other artifacts. This cleanup procedure is triggered after the 
{{JobResultStore}} has the job's dirty entry written to disk/memory.

The cleanup results in the job's {{JobManagerRunner}} no longer being present in 
the {{{}JobManagerRunnerRegistry{}}}. From that moment on, the {{Dispatcher}} 
relies on the {{ExecutionGraphInfoStore}}, which has the same {{JobStatus}} 
present as the {{{}JobManagerRunnerRegistry{}}} had.


was (Author: mapohl):
work-in-progress

> jobClient.getJobStatus() can return status RUNNING for finished insert 
> operation
> 
>
> Key: FLINK-32069
> URL: https://issues.apache.org/jira/browse/FLINK-32069
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.16.1, 1.15.4
>Reporter: Aleksandr Iushmanov
>Priority: Major
>
> Using zeppelin with remote cluster I came across some race condition issue 
> leading to failed expectations for SQL insert operations. 
>  
> Below is an example of zeppelin code that is failing because 
> jobClient.getJobStatus() returns running even after job has finished. I have 
> verified that same failover can happen if I use 
> jobClient.getJobExecutionResult().get() (Job execution result is: "Program 
> execution finished" but job status is not consistently finished)
> {code:java}
> TableResult tableResult = ((TableEnvironmentInter

[jira] [Commented] (FLINK-32069) jobClient.getJobStatus() can return status RUNNING for finished insert operation

2023-05-15 Thread Matthias Pohl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17722820#comment-17722820
 ] 

Matthias Pohl commented on FLINK-32069:
---

Thanks for sharing your train of thought here, [~izeren]. Don't be overwhelmed 
by the following lines. I'm writing it down for documentation purposes.

{quote}
My line of thinking was that job result future is returned based on registry 
and job status based on result store and it is not in sync the same way as it 
used to be.
{quote}

The {{JobManagerRunnerRegistry}} holds the {{JobManagerRunner}} instances. The 
cleanup of the job is triggered (in 
[Dispatcher:681|https://github.com/apache/flink/blob/ee912b55f9405a41c391f942636b83c6f2c968f3/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java#L681])
 as soon as the result of the {{JobManagerRunner}} is completed (which happens 
in 
[JobMasterServiceLeadershipRunner#onJobCompletion:384|https://github.com/apache/flink/blob/4882fbd9744d456e09ca60b6c7cf7a5b60326c73/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunner.java#L384]).
 At that moment, {{JobManagerRunnerRegistry}} is the source of truth when it 
comes to the {{JobStatus}}. The next step is writing the {{ExecutionGraphInfo}} 
of the terminated job into the {{ExecutionGraphInfoStore}} in 
[Dispatcher#jobReachedTerminalState:1334|https://github.com/apache/flink/blob/ee912b55f9405a41c391f942636b83c6f2c968f3/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java#L1334].
 After that, we create a dirty entry in the {{JobResultStore}} in 
[Dispatcher#jobReachedTerminalState:1347|https://github.com/apache/flink/blob/ee912b55f9405a41c391f942636b83c6f2c968f3/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java#L1347]
 which runs the operation asynchronously in the {{ioExecutor}} (see 
[Dispatcher#registerGloballyTerminatedJobInJobResultStore:1365|https://github.com/apache/flink/blob/ee912b55f9405a41c391f942636b83c6f2c968f3/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java#L1365]).
 The future is forwarded to 
[Dispatcher#runJob:678|https://github.com/apache/flink/blob/ee912b55f9405a41c391f942636b83c6f2c968f3/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java#L678]
 where it will trigger the {{Dispatcher#globalResourceCleaner}} through 
[Dispatcher#removeJob:1245|https://github.com/apache/flink/blob/ee912b55f9405a41c391f942636b83c6f2c968f3/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java#L1245].

The 

> jobClient.getJobStatus() can return status RUNNING for finished insert 
> operation
> 
>
> Key: FLINK-32069
> URL: https://issues.apache.org/jira/browse/FLINK-32069
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.16.1, 1.15.4
>Reporter: Aleksandr Iushmanov
>Priority: Major
>
> Using zeppelin with remote cluster I came across some race condition issue 
> leading to failed expectations for SQL insert operations. 
>  
> Below is an example of zeppelin code that is failing because 
> jobClient.getJobStatus() returns running even after job has finished. I have 
> verified that same failover can happen if I use 
> jobClient.getJobExecutionResult().get() (Job execution result is: "Program 
> execution finished" but job status is not consistently finished)
> {code:java}
> TableResult tableResult = ((TableEnvironmentInternal) 
> tbenv).executeInternal(operations);
> checkState(tableResult.getJobClient().isPresent());
> try {
>   tableResult.await();
>   JobClient jobClient = tableResult.getJobClient().get();
>   if (jobClient.getJobStatus().get() == JobStatus.FINISHED) {
> context.out.write("Insertion successfully.\n");
>   } else {
> throw new IOException("Job is failed, " + 
> jobClient.getJobExecutionResult().get().toString());
>   }
> } catch (InterruptedException e) {
>   throw new IOException("Flink job is interrupted", e);
> } catch (ExecutionException e) {
>   throw new IOException("Flink job is failed", e);
> } {code}
>  ZeppelinCode: 
> [https://github.com/apache/zeppelin/blob/master/flink/flink1.15-shims/src/main/java/org/apache/zeppelin/flink/Flink115SqlInterpreter.java#L384]
> I suspect that job status is returned based on runningJobsRegistry and since 
> 1.15 this registry is not updated with FINISHED status prior to job result 
> future completion, see this change: {{JobMasterServiceLeadershipRunner.java}} 
> [https://github.com/apache/flink/pull/18189/files#diff-3eb433f18b85c0f5329a4b312a219583189d777fe9bdd547f1114f4a22989f8bL387]
>  
>  
> It looks like as race

[jira] [Created] (FLINK-32102) Aggregate multiple pendingRecords metric per source if present

2023-05-15 Thread Maximilian Michels (Jira)
Maximilian Michels created FLINK-32102:
--

 Summary: Aggregate multiple pendingRecords metric per source if 
present
 Key: FLINK-32102
 URL: https://issues.apache.org/jira/browse/FLINK-32102
 Project: Flink
  Issue Type: Improvement
  Components: Autoscaler, Kubernetes Operator
Affects Versions: kubernetes-operator-1.4.0
Reporter: Maximilian Michels
Assignee: Maximilian Michels
 Fix For: kubernetes-operator-1.6.0


Some sources expose multiple {{.pendingRecords}} metrics. If that is the case, 
we must sum up these metrics to yield the correct internal pending records 
count.
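
The aggregation itself is simple; a minimal sketch (hypothetical helper and 
metric names, not the actual autoscaler wiring):
{code:java}
import java.util.Map;

class PendingRecordsAggregator {
    /** Sums all pendingRecords gauges reported for a single source vertex. */
    static long totalPendingRecords(Map<String, Long> metrics) {
        return metrics.entrySet().stream()
                .filter(entry -> entry.getKey().endsWith(".pendingRecords"))
                .mapToLong(Map.Entry::getValue)
                .sum();
    }
}
{code}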



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-32069) jobClient.getJobStatus() can return status RUNNING for finished insert operation

2023-05-15 Thread Matthias Pohl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17722820#comment-17722820
 ] 

Matthias Pohl edited comment on FLINK-32069 at 5/15/23 3:15 PM:


work-in-progress


was (Author: mapohl):
Thanks for sharing your train of thought here, [~izeren]. Don't be overwhelmed 
by the following lines. I'm writing it down for documentation purposes.

{quote}
My line of thinking was that job result future is returned based on registry 
and job status based on result store and it is not in sync the same way as it 
used to be.
{quote}

The {{JobManagerRunnerRegistry}} holds the {{JobManagerRunner}} instances. The 
cleanup of the job is triggered (in 
[Dispatcher:681|https://github.com/apache/flink/blob/ee912b55f9405a41c391f942636b83c6f2c968f3/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java#L681])
 as soon as the result of the {{JobManagerRunner}} is completed (which happens 
in 
[JobMasterServiceLeadershipRunner#onJobCompletion:384|https://github.com/apache/flink/blob/4882fbd9744d456e09ca60b6c7cf7a5b60326c73/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunner.java#L384]).
 At that moment, {{JobManagerRunnerRegistry}} is the source of truth when it 
comes to the {{JobStatus}}. The next step is writing the {{ExecutionGraphInfo}} 
of the terminated job into the {{ExecutionGraphInfoStore}} in 
[Dispatcher#jobReachedTerminalState:1334|https://github.com/apache/flink/blob/ee912b55f9405a41c391f942636b83c6f2c968f3/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java#L1334].
 After that, we create a dirty entry in the {{JobResultStore}} in 
[Dispatcher#jobReachedTerminalState:1347|https://github.com/apache/flink/blob/ee912b55f9405a41c391f942636b83c6f2c968f3/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java#L1347]
 which runs the operation asynchronously in the {{ioExecutor}} (see 
[Dispatcher#registerGloballyTerminatedJobInJobResultStore:1365|https://github.com/apache/flink/blob/ee912b55f9405a41c391f942636b83c6f2c968f3/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java#L1365]).
 The future is forwarded to 
[Dispatcher#runJob:678|https://github.com/apache/flink/blob/ee912b55f9405a41c391f942636b83c6f2c968f3/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java#L678]
 where it will trigger the {{Dispatcher#globalResourceCleaner}} through 
[Dispatcher#removeJob:1245|https://github.com/apache/flink/blob/ee912b55f9405a41c391f942636b83c6f2c968f3/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java#L1245].

The 

> jobClient.getJobStatus() can return status RUNNING for finished insert 
> operation
> 
>
> Key: FLINK-32069
> URL: https://issues.apache.org/jira/browse/FLINK-32069
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.16.1, 1.15.4
>Reporter: Aleksandr Iushmanov
>Priority: Major
>
> Using zeppelin with remote cluster I came across some race condition issue 
> leading to failed expectations for SQL insert operations. 
>  
> Below is an example of zeppelin code that is failing because 
> jobClient.getJobStatus() returns running even after job has finished. I have 
> verified that same failover can happen if I use 
> jobClient.getJobExecutionResult().get() (Job execution result is: "Program 
> execution finished" but job status is not consistently finished)
> {code:java}
> TableResult tableResult = ((TableEnvironmentInternal) 
> tbenv).executeInternal(operations);
> checkState(tableResult.getJobClient().isPresent());
> try {
>   tableResult.await();
>   JobClient jobClient = tableResult.getJobClient().get();
>   if (jobClient.getJobStatus().get() == JobStatus.FINISHED) {
> context.out.write("Insertion successfully.\n");
>   } else {
> throw new IOException("Job is failed, " + 
> jobClient.getJobExecutionResult().get().toString());
>   }
> } catch (InterruptedException e) {
>   throw new IOException("Flink job is interrupted", e);
> } catch (ExecutionException e) {
>   throw new IOException("Flink job is failed", e);
> } {code}
>  ZeppelinCode: 
> [https://github.com/apache/zeppelin/blob/master/flink/flink1.15-shims/src/main/java/org/apache/zeppelin/flink/Flink115SqlInterpreter.java#L384]
> I suspect that job status is returned based on runningJobsRegistry and since 
> 1.15 this registry is not updated with FINISHED status prior to job result 
> future completion, see this change: {{JobMasterServiceLeadershipRunner.java}} 
> [https://github.com/apache/flink/pull/18189/files#diff-3eb433f18b8

[jira] [Closed] (FLINK-31609) Fatal error in ResourceManager caused YARNSessionFIFOSecuredITCase.testDetachedMode to fail

2023-05-15 Thread Gabor Somogyi (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31609?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gabor Somogyi closed FLINK-31609.
-

> Fatal error in ResourceManager caused 
> YARNSessionFIFOSecuredITCase.testDetachedMode to fail
> ---
>
> Key: FLINK-31609
> URL: https://issues.apache.org/jira/browse/FLINK-31609
> Project: Flink
>  Issue Type: Bug
>  Components: Deployment / YARN
>Affects Versions: 1.18.0
>Reporter: Matthias Pohl
>Assignee: Ferenc Csaky
>Priority: Critical
>  Labels: pull-request-available, test-stability
> Fix For: 1.18.0
>
>
> This looks like FLINK-30908. I created a follow-up ticket because we reached 
> a new minor version.
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=47547&view=logs&j=fc5181b0-e452-5c8f-68de-1097947f6483&t=995c650b-6573-581c-9ce6-7ad4cc038461
> {code}
> Mar 24 09:32:29 2023-03-24 09:31:50,001 ERROR 
> org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl [] - 
> Exception on heartbeat
> Mar 24 09:32:29 java.io.InterruptedIOException: Interrupted waiting to send 
> RPC request to server
> Mar 24 09:32:29 java.io.InterruptedIOException: Interrupted waiting to send 
> RPC request to server
> Mar 24 09:32:29   at org.apache.hadoop.ipc.Client.call(Client.java:1461) 
> ~[hadoop-common-2.10.2.jar:?]
> Mar 24 09:32:29   at org.apache.hadoop.ipc.Client.call(Client.java:1403) 
> ~[hadoop-common-2.10.2.jar:?]
> Mar 24 09:32:29   at 
> org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:230)
>  ~[hadoop-common-2.10.2.jar:?]
> Mar 24 09:32:29   at 
> org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:118)
>  ~[hadoop-common-2.10.2.jar:?]
> Mar 24 09:32:29   at com.sun.proxy.$Proxy33.allocate(Unknown Source) 
> ~[?:?]
> Mar 24 09:32:29   at 
> org.apache.hadoop.yarn.api.impl.pb.client.ApplicationMasterProtocolPBClientImpl.allocate(ApplicationMasterProtocolPBClientImpl.java:77)
>  ~[hadoop-yarn-common-2.10.2.jar:?]
> Mar 24 09:32:29   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native 
> Method) ~[?:1.8.0_292]
> Mar 24 09:32:29   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
> ~[?:1.8.0_292]
> Mar 24 09:32:29   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  ~[?:1.8.0_292]
> Mar 24 09:32:29   at java.lang.reflect.Method.invoke(Method.java:498) 
> ~[?:1.8.0_292]
> Mar 24 09:32:29   at 
> org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:433)
>  ~[hadoop-common-2.10.2.jar:?]
> Mar 24 09:32:29   at 
> org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:166)
>  ~[hadoop-common-2.10.2.jar:?]
> Mar 24 09:32:29   at 
> org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:158)
>  ~[hadoop-common-2.10.2.jar:?]
> Mar 24 09:32:29   at 
> org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:96)
>  ~[hadoop-common-2.10.2.jar:?]
> Mar 24 09:32:29   at 
> org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:362)
>  ~[hadoop-common-2.10.2.jar:?]
> Mar 24 09:32:29   at com.sun.proxy.$Proxy34.allocate(Unknown Source) 
> ~[?:?]
> Mar 24 09:32:29   at 
> org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl.allocate(AMRMClientImpl.java:297)
>  ~[hadoop-yarn-client-2.10.2.jar:?]
> Mar 24 09:32:29   at 
> org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl$HeartbeatThread.run(AMRMClientAsyncImpl.java:274)
>  [hadoop-yarn-client-2.10.2.jar:?]
> Mar 24 09:32:29 Caused by: java.lang.InterruptedException
> Mar 24 09:32:29   at 
> java.util.concurrent.FutureTask.awaitDone(FutureTask.java:404) ~[?:1.8.0_292]
> Mar 24 09:32:29   at 
> java.util.concurrent.FutureTask.get(FutureTask.java:191) ~[?:1.8.0_292]
> Mar 24 09:32:29   at 
> org.apache.hadoop.ipc.Client$Connection.sendRpcRequest(Client.java:1177) 
> ~[hadoop-common-2.10.2.jar:?]
> Mar 24 09:32:29   at org.apache.hadoop.ipc.Client.call(Client.java:1456) 
> ~[hadoop-common-2.10.2.jar:?]
> Mar 24 09:32:29   ... 17 more
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-31609) Fatal error in ResourceManager caused YARNSessionFIFOSecuredITCase.testDetachedMode to fail

2023-05-15 Thread Gabor Somogyi (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31609?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gabor Somogyi resolved FLINK-31609.
---
Fix Version/s: 1.18.0
   Resolution: Fixed

4415c5150eda071b219db5532c359ca29730a378 on master

> Fatal error in ResourceManager caused 
> YARNSessionFIFOSecuredITCase.testDetachedMode to fail
> ---
>
> Key: FLINK-31609
> URL: https://issues.apache.org/jira/browse/FLINK-31609
> Project: Flink
>  Issue Type: Bug
>  Components: Deployment / YARN
>Affects Versions: 1.18.0
>Reporter: Matthias Pohl
>Assignee: Ferenc Csaky
>Priority: Critical
>  Labels: pull-request-available, test-stability
> Fix For: 1.18.0
>
>
> This looks like FLINK-30908. I created a follow-up ticket because we reached 
> a new minor version.
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=47547&view=logs&j=fc5181b0-e452-5c8f-68de-1097947f6483&t=995c650b-6573-581c-9ce6-7ad4cc038461
> {code}
> Mar 24 09:32:29 2023-03-24 09:31:50,001 ERROR 
> org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl [] - 
> Exception on heartbeat
> Mar 24 09:32:29 java.io.InterruptedIOException: Interrupted waiting to send 
> RPC request to server
> Mar 24 09:32:29 java.io.InterruptedIOException: Interrupted waiting to send 
> RPC request to server
> Mar 24 09:32:29   at org.apache.hadoop.ipc.Client.call(Client.java:1461) 
> ~[hadoop-common-2.10.2.jar:?]
> Mar 24 09:32:29   at org.apache.hadoop.ipc.Client.call(Client.java:1403) 
> ~[hadoop-common-2.10.2.jar:?]
> Mar 24 09:32:29   at 
> org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:230)
>  ~[hadoop-common-2.10.2.jar:?]
> Mar 24 09:32:29   at 
> org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:118)
>  ~[hadoop-common-2.10.2.jar:?]
> Mar 24 09:32:29   at com.sun.proxy.$Proxy33.allocate(Unknown Source) 
> ~[?:?]
> Mar 24 09:32:29   at 
> org.apache.hadoop.yarn.api.impl.pb.client.ApplicationMasterProtocolPBClientImpl.allocate(ApplicationMasterProtocolPBClientImpl.java:77)
>  ~[hadoop-yarn-common-2.10.2.jar:?]
> Mar 24 09:32:29   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native 
> Method) ~[?:1.8.0_292]
> Mar 24 09:32:29   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
> ~[?:1.8.0_292]
> Mar 24 09:32:29   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  ~[?:1.8.0_292]
> Mar 24 09:32:29   at java.lang.reflect.Method.invoke(Method.java:498) 
> ~[?:1.8.0_292]
> Mar 24 09:32:29   at 
> org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:433)
>  ~[hadoop-common-2.10.2.jar:?]
> Mar 24 09:32:29   at 
> org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:166)
>  ~[hadoop-common-2.10.2.jar:?]
> Mar 24 09:32:29   at 
> org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:158)
>  ~[hadoop-common-2.10.2.jar:?]
> Mar 24 09:32:29   at 
> org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:96)
>  ~[hadoop-common-2.10.2.jar:?]
> Mar 24 09:32:29   at 
> org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:362)
>  ~[hadoop-common-2.10.2.jar:?]
> Mar 24 09:32:29   at com.sun.proxy.$Proxy34.allocate(Unknown Source) 
> ~[?:?]
> Mar 24 09:32:29   at 
> org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl.allocate(AMRMClientImpl.java:297)
>  ~[hadoop-yarn-client-2.10.2.jar:?]
> Mar 24 09:32:29   at 
> org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl$HeartbeatThread.run(AMRMClientAsyncImpl.java:274)
>  [hadoop-yarn-client-2.10.2.jar:?]
> Mar 24 09:32:29 Caused by: java.lang.InterruptedException
> Mar 24 09:32:29   at 
> java.util.concurrent.FutureTask.awaitDone(FutureTask.java:404) ~[?:1.8.0_292]
> Mar 24 09:32:29   at 
> java.util.concurrent.FutureTask.get(FutureTask.java:191) ~[?:1.8.0_292]
> Mar 24 09:32:29   at 
> org.apache.hadoop.ipc.Client$Connection.sendRpcRequest(Client.java:1177) 
> ~[hadoop-common-2.10.2.jar:?]
> Mar 24 09:32:29   at org.apache.hadoop.ipc.Client.call(Client.java:1456) 
> ~[hadoop-common-2.10.2.jar:?]
> Mar 24 09:32:29   ... 17 more
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] gaborgsomogyi merged pull request #22550: [FLINK-31609][yarn][test] Extend log whitelist for expected AMRM heartbeat interrupt

2023-05-15 Thread via GitHub


gaborgsomogyi merged PR #22550:
URL: https://github.com/apache/flink/pull/22550


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] tweise commented on a diff in pull request #22556: [FLINK-32030][sql-client] Add URLs support for SQL Client gateway mode

2023-05-15 Thread via GitHub


tweise commented on code in PR #22556:
URL: https://github.com/apache/flink/pull/22556#discussion_r1193954446


##
flink-core/src/main/java/org/apache/flink/util/NetUtils.java:
##
@@ -120,6 +120,18 @@ private static URL validateHostPortString(String hostPort) {
         }
     }
 
+/**
+ * Converts an InetSocketAddress to a URL. This method assigns the "http://" scheme to the URL

Review Comment:
   So the existing implementation does not support https at all? I know that 
you are going to add support for the https scheme later; I just wanted to 
clarify the current state for host:port?
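
For context, a minimal sketch of the conversion under discussion, assuming the 
"http://" scheme is hard-coded as the javadoc describes (the actual 
{{NetUtils}} method may differ):
{code:java}
import java.net.InetSocketAddress;
import java.net.MalformedURLException;
import java.net.URL;

class SocketAddressToUrl {
    static URL toUrl(InetSocketAddress address) {
        try {
            // Hard-codes the http scheme; https support would have to be added later.
            return new URL("http://" + address.getHostString() + ":" + address.getPort());
        } catch (MalformedURLException e) {
            throw new IllegalArgumentException("Cannot convert " + address + " to a URL", e);
        }
    }
}
{code}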



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (FLINK-32069) jobClient.getJobStatus() can return status RUNNING for finished insert operation

2023-05-15 Thread Matthias Pohl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17722691#comment-17722691
 ] 

Matthias Pohl edited comment on FLINK-32069 at 5/15/23 2:42 PM:


Thanks for documenting this issue in Jira, [~izeren]. Here are my findings so 
far:

I struggle to find a connection between the {{RunningJobsRegistry}} and the 
{{getJobStatus}} call of the client (which calls 
{{Dispatcher.requestJobStatus}} in the end). [~izeren] is right in claiming 
that we did a slight modification of the code when removing the 
{{RunningJobsRegistry}} in 
[JobMasterServiceLeadershipRunner:385ff|https://github.com/apache/flink/commit/01b14fc4b9a9487a144f515bb7d4f6ad14cbe013#diff-3eb433f18b85c0f5329a4b312a219583189d777fe9bdd547f1114f4a22989f8bL385-L395].
 Marking the job as done (through the {{{}RunningJobsRegistry{}}}) happened 
before completing the {{JobMasterServiceLeadershipRunner#resultFuture}}. In the 
current code, we mark the job as completed (through the {{{}JobResultStore{}}}) 
after completing the {{JobMasterServiceLeadershipRunner#resultFuture}}.
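
To make the ordering change concrete, a rough sketch of the two variants 
(illustrative call order only, with simplified signatures):
{code:java}
// Pre-1.15 (RunningJobsRegistry): the job was marked as done BEFORE the
// runner's result future completed.
runningJobsRegistry.setJobFinished(jobId);
resultFuture.complete(jobManagerRunnerResult);

// Current code (JobResultStore): the dirty result entry is persisted AFTER the
// runner's result future has completed.
resultFuture.complete(jobManagerRunnerResult);
jobResultStore.createDirtyResult(new JobResultEntry(jobResult));
{code}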

My issue is, though, that we're not relying on the {{RunningJobsRegistry}} in 
any way for the {{Dispatcher#requestJob}} call. The {{RunningJobsRegistry}} was 
only used for leader recovery in 
[JobMasterServiceLeadershipRunner.verifyJobSchedulingStatusAndCreateJobMasterServiceProcess:272ff|https://github.com/apache/flink/commit/01b14fc4b9a9487a144f515bb7d4f6ad14cbe013#diff-3eb433f18b85c0f5329a4b312a219583189d777fe9bdd547f1114f4a22989f8bL272-L278]
 and when submitting a job through 
[Dispatcher#isInGloballyTerminalState:375ff|https://github.com/apache/flink/pull/18189/files#diff-a4b690fb2c4975d25b05eb4161617af0d704a85ff7b1cad19d3c817c12f1e29cL375]
 in {{{}Dispatcher#submitJob{}}}. I would have expected to find 
{{Dispatcher#requestJobStatus}} somewhere in the call hierarchy of 
{{{}RunningJobsRegistry#getJobSchedulingStatus{}}} if it had had an influence 
on {{{}Dispatcher#requestJobStatus{}}}.

I don't want to say yet that [~izeren]'s conclusion is wrong. It just doesn't 
match my findings in the code. It could also be that I'm missing a code path 
here.

[~dmvk], do you have something to add? 


was (Author: mapohl):
Thanks for documenting this issue in Jira, [~izeren]. Here are my findings so 
far:

I struggle to find a connection between the {{RunningJobsRegistry}} and the 
{{getJobStatus}} call of the client (which calls 
{{Dispatcher.requestJobStatus}} in the end). [~izeren] is right in claiming 
that we did a slight modification of the code when removing the 
{{RunningJobsRegistry}} in 
[JobMasterServiceLeadershipRunner:385ff|https://github.com/apache/flink/commit/01b14fc4b9a9487a144f515bb7d4f6ad14cbe013#diff-3eb433f18b85c0f5329a4b312a219583189d777fe9bdd547f1114f4a22989f8bL385-L395].
 Marking the job as done (through the {{{}RunningJobsRegistry{}}}) happened 
before completing the {{JobMasterServiceLeadershipRunner#resultFuture}}. In the 
current code, we mark the job as completed (through the {{{}JobResultStore{}}}) 
after completing the {{JobMasterServiceLeadershipRunner#resultFuture}}.

My issue is, though, that we're not relying on the {{RunningJobsRegistry}} in 
any way for the {{Dispatcher#requestJob}} call. The {{RunningJobsRegistry}} was 
only used for leader recovery in 
[JobMasterServiceLeadershipRunner.verifyJobSchedulingStatusAndCreateJobMasterServiceProcess:272ff|https://github.com/apache/flink/commit/01b14fc4b9a9487a144f515bb7d4f6ad14cbe013#diff-3eb433f18b85c0f5329a4b312a219583189d777fe9bdd547f1114f4a22989f8bL272-L278]
 and when submitting a job through 
[Dispatcher#isInGloballyTerminalState:375ff|https://github.com/apache/flink/pull/18189/files#diff-a4b690fb2c4975d25b05eb4161617af0d704a85ff7b1cad19d3c817c12f1e29cL375]
 in {{{}Dispatcher#submitJob{}}}. I would have expected to find 
{{Dispatcher#requestJobStatus}} somewhere in the call hierarchy of 
{{{}RunningJobsRegistry#getJobSchedulingStatus{}}} if it had had an influence 
on {{{}Dispatcher#requestJobStatus{}}}.

I don't want to say yet that [~izeren]'s conclusion is wrong. It just doesn't 
match my findings in the code. It could also be that I'm missing a code path 
here.

[~dmvk], do you have something to add? 

> jobClient.getJobStatus() can return status RUNNING for finished insert 
> operation
> 
>
> Key: FLINK-32069
> URL: https://issues.apache.org/jira/browse/FLINK-32069
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.16.1, 1.15.4
>Reporter: Aleksandr Iushmanov
>Priority: Major
>
> Using zeppelin with remote cluster I came across some race condition issue 
> leading to failed expectations for SQL insert opera

[jira] [Assigned] (FLINK-24696) Translate how to configure unaligned checkpoints into Chinese

2023-05-15 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24696?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski reassigned FLINK-24696:
--

Assignee: lijie

> Translate how to configure unaligned checkpoints into Chinese
> -
>
> Key: FLINK-24696
> URL: https://issues.apache.org/jira/browse/FLINK-24696
> Project: Flink
>  Issue Type: Improvement
>  Components: chinese-translation, Documentation
>Affects Versions: 1.14.1, 1.15.0
>Reporter: Piotr Nowojski
>Assignee: lijie
>Priority: Not a Priority
>
> As part of FLINK-24695 
> {{docs/content/docs/ops/state/checkpointing_under_backpressure.md}} and 
> {{docs/content/docs/dev/datastream/fault-tolerance/checkpointing.md}} were 
> modified. Those modifications should be translated into Chinese



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-24696) Translate how to configure unaligned checkpoints into Chinese

2023-05-15 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24696?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17722802#comment-17722802
 ] 

Piotr Nowojski commented on FLINK-24696:


Sure! Thanks :)

> Translate how to configure unaligned checkpoints into Chinese
> -
>
> Key: FLINK-24696
> URL: https://issues.apache.org/jira/browse/FLINK-24696
> Project: Flink
>  Issue Type: Improvement
>  Components: chinese-translation, Documentation
>Affects Versions: 1.14.1, 1.15.0
>Reporter: Piotr Nowojski
>Assignee: lijie
>Priority: Not a Priority
>
> As part of FLINK-24695 
> {{docs/content/docs/ops/state/checkpointing_under_backpressure.md}} and 
> {{docs/content/docs/dev/datastream/fault-tolerance/checkpointing.md}} were 
> modified. Those modifications should be translated into Chinese



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] pnowojski commented on a diff in pull request #22584: [FLINK-31963][state] Fix rescaling bug in recovery from unaligned checkpoints.

2023-05-15 Thread via GitHub


pnowojski commented on code in PR #22584:
URL: https://github.com/apache/flink/pull/22584#discussion_r1193934653


##
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperationTest.java:
##
@@ -785,6 +789,130 @@ public void testOnlyUpstreamChannelStateAssignment()
         }
     }
 
+    /** FLINK-31963: Tests rescaling for stateless operators and upstream result partition state. */
+    @Test
+    public void testOnlyUpstreamChannelRescaleStateAssignment()
+            throws JobException, JobExecutionException {
+        Random random = new Random();
+        OperatorSubtaskState upstreamOpState =
+                OperatorSubtaskState.builder()
+                        .setResultSubpartitionState(
+                                new StateObjectCollection<>(
+                                        asList(
+                                                createNewResultSubpartitionStateHandle(10, random),
+                                                createNewResultSubpartitionStateHandle(10, random))))
+                        .build();
+        testOnlyUpstreamOrDownstreamRescalingInternal(upstreamOpState, null, 5, 7);
+    }
+
+    /** FLINK-31963: Tests rescaling for stateless operators and downstream input channel state. */
+    @Test
+    public void testOnlyDownstreamChannelRescaleStateAssignment()
+            throws JobException, JobExecutionException {
+        Random random = new Random();
+        OperatorSubtaskState downstreamOpState =
+                OperatorSubtaskState.builder()
+                        .setInputChannelState(
+                                new StateObjectCollection<>(
+                                        asList(
+                                                createNewInputChannelStateHandle(10, random),
+                                                createNewInputChannelStateHandle(10, random))))
+                        .build();
+        testOnlyUpstreamOrDownstreamRescalingInternal(null, downstreamOpState, 5, 5);
+    }
+
+    private void testOnlyUpstreamOrDownstreamRescalingInternal(
+            @Nullable OperatorSubtaskState upstreamOpState,
+            @Nullable OperatorSubtaskState downstreamOpState,
+            int expectedUpstreamCount,
+            int expectedDownstreamCount)
+            throws JobException, JobExecutionException {
+
+        if ((upstreamOpState == null && downstreamOpState == null)
+                || (upstreamOpState != null && downstreamOpState != null)) {
+            // Either upstream or downstream state must exist, but not both.
+            return;
+        }
+
+        // Start from parallelism 5 for both operators
+        int upstreamParallelism = 5;
+        int downstreamParallelism = 5;
+
+        // Build states
+        List<OperatorID> operatorIds = buildOperatorIds(2);
+        Map<OperatorID, OperatorState> states = new HashMap<>();
+        OperatorState upstreamState =
+                new OperatorState(operatorIds.get(0), upstreamParallelism, MAX_P);
+        OperatorState downstreamState =
+                new OperatorState(operatorIds.get(1), downstreamParallelism, MAX_P);
+
+        states.put(operatorIds.get(0), upstreamState);
+        states.put(operatorIds.get(1), downstreamState);
+
+        if (upstreamOpState != null) {
+            upstreamState.putState(0, upstreamOpState);
+            // rescale downstream 5 -> 3
+            downstreamParallelism = 3;
+        }
+
+        if (downstreamOpState != null) {
+            downstreamState.putState(0, downstreamOpState);
+            // rescale upstream 5 -> 3
+            upstreamParallelism = 3;
+        }
+
+        List<OperatorIdWithParallelism> opIdWithParallelism = new ArrayList<>(2);
+        opIdWithParallelism.add(
+                new OperatorIdWithParallelism(operatorIds.get(0), upstreamParallelism));
+        opIdWithParallelism.add(
+                new OperatorIdWithParallelism(operatorIds.get(1), downstreamParallelism));
+
+        Map<OperatorID, ExecutionJobVertex> vertices =
+                buildVertices(opIdWithParallelism, RANGE, ROUND_ROBIN);
+
+        // Run state assignment
+        new StateAssignmentOperation(0, new HashSet<>(vertices.values()), states, false)
+                .assignStates();
+
+        // Check results
+        ExecutionJobVertex upstreamExecutionJobVertex = vertices.get(operatorIds.get(0));
+        ExecutionJobVertex downstreamExecutionJobVertex = vertices.get(operatorIds.get(1));
+
+        List<TaskStateSnapshot> upstreamRescalingDescriptors =
+                getRescalingDescriptorsFromVertex(upstreamExecutionJobVertex);
+        List<TaskStateSnapshot> downstreamRescalingDescriptors =
+                getRescalingDescriptorsFromVertex(downstreamExecutionJobVertex);
+
+        checkMappings(
+                upstreamRescalingDescriptors,
+                TaskStateSnapshot::getOutputRescalingDescriptor,
+                expectedUpstreamCount);
+
+        checkMappings(
+                downstreamRescaling

[GitHub] [flink] StefanRRichter commented on a diff in pull request #22584: [FLINK-31963][state] Fix rescaling bug in recovery from unaligned checkpoints.

2023-05-15 Thread via GitHub


StefanRRichter commented on code in PR #22584:
URL: https://github.com/apache/flink/pull/22584#discussion_r1193925009


##
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java:
##
@@ -136,19 +136,24 @@ public void assignStates() {
 
         // repartition state
         for (TaskStateAssignment stateAssignment : vertexAssignments.values()) {
-            if (stateAssignment.hasNonFinishedState) {
+            if (stateAssignment.hasNonFinishedState
+                    // FLINK-31963: We need to run repartitioning for stateless operators that
+                    // have upstream output or downstream input states.
+                    || stateAssignment.hasUpstreamOutputStates()
+                    || stateAssignment.hasDownstreamInputStates()) {
                 assignAttemptState(stateAssignment);
             }
         }
 
         // actually assign the state
         for (TaskStateAssignment stateAssignment : vertexAssignments.values()) {
-            // If upstream has output states, even the empty task state should be assigned for the
-            // current task in order to notify this task that the old states will send to it which
-            // likely should be filtered.
+            // If upstream has output states or downstream has input states, even the empty task
+            // state should be assigned for the current task in order to notify this task that the
+            // old states will send to it which likely should be filtered.
             if (stateAssignment.hasNonFinishedState
                     || stateAssignment.isFullyFinished
-                    || stateAssignment.hasUpstreamOutputStates()) {
+                    || stateAssignment.hasUpstreamOutputStates()
+                    || stateAssignment.hasDownstreamInputStates()) {

Review Comment:
   I was considering that, and then I saw that most of the heavy work for the 
functions is already cached. Do you think it's still worth it?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (FLINK-32047) Fix args in JobSpec not being passed through to Flink in Standalone mode - 1.4.0

2023-05-15 Thread Gil Shmaya (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32047?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17722785#comment-17722785
 ] 

Gil Shmaya edited comment on FLINK-32047 at 5/15/23 2:26 PM:
-

We run with 1.14.6, sorry for the confusion.
I see the same error while trying to run your test with 1.14.6.

However, I can't find an explanation for the fact that we see the args passed 
with operator version 1.2.0 but not with 1.4.0. Is it possible that there was a 
change in the way the args should be configured in 1.4.0?


was (Author: JIRAUSER299921):
We run with 1.14.6, sorry for the confusion.
I see the same error while trying to run your test with 1.14.6.

However, I can't find an explanation for the fact that we see the args passed 
with operator version 1.2.0 but not with 1.4.0.

> Fix args in JobSpec not being passed through to Flink in Standalone mode - 
> 1.4.0
> 
>
> Key: FLINK-32047
> URL: https://issues.apache.org/jira/browse/FLINK-32047
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Reporter: Gil Shmaya
>Priority: Major
> Fix For: 1.14.0
>
> Attachments: image-2023-04-30-18-54-22-291.png, 
> image-2023-04-30-19-56-30-150.png, image-2023-04-30-19-56-57-680.png
>
>
> This issue is related to a previously fixed bug in version 1.2.0 -  
> FLINK-29388
> I have noticed that while the args are successfully being passed when using 
> version 1.2.0, this is not the case with version 1.4.0.
> {+}Scenario{+}:
> I added a log that prints the argument array length at the beginning of the 
> main  function of the flink job:
> !image-2023-04-30-18-54-22-291.png!
> The result when running with 1.2.0:
> !image-2023-04-30-19-56-30-150.png!
> The result when running with 1.4.0:
> !image-2023-04-30-19-56-57-680.png!
> h4.  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-32047) Fix args in JobSpec not being passed through to Flink in Standalone mode - 1.4.0

2023-05-15 Thread Gil Shmaya (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32047?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17722785#comment-17722785
 ] 

Gil Shmaya edited comment on FLINK-32047 at 5/15/23 2:25 PM:
-

We run with 1.14.6, sorry for the confusion.
I see the same error while trying to run your test with 1.14.6.

However, I can't find an explanation for the fact that we see the args passed 
with operator version 1.2.0 but not with 1.4.0.


was (Author: JIRAUSER299921):
We run with 1.14.6, sorry for the confusion.

> Fix args in JobSpec not being passed through to Flink in Standalone mode - 
> 1.4.0
> 
>
> Key: FLINK-32047
> URL: https://issues.apache.org/jira/browse/FLINK-32047
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Reporter: Gil Shmaya
>Priority: Major
> Fix For: 1.14.0
>
> Attachments: image-2023-04-30-18-54-22-291.png, 
> image-2023-04-30-19-56-30-150.png, image-2023-04-30-19-56-57-680.png
>
>
> This issue is related to a previously fixed bug in version 1.2.0 -  
> FLINK-29388
> I have noticed that while the args are successfully being passed when using 
> version 1.2.0, this is not the case with version 1.4.0.
> {+}Scenario{+}:
> I added a log that prints the argument array length at the beginning of the 
> main  function of the flink job:
> !image-2023-04-30-18-54-22-291.png!
> The result when running with 1.2.0:
> !image-2023-04-30-19-56-30-150.png!
> The result when running with 1.4.0:
> !image-2023-04-30-19-56-57-680.png!
> h4.  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] rkhachatryan commented on a diff in pull request #22584: [FLINK-31963][state] Fix rescaling bug in recovery from unaligned checkpoints.

2023-05-15 Thread via GitHub


rkhachatryan commented on code in PR #22584:
URL: https://github.com/apache/flink/pull/22584#discussion_r1193902234


##
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java:
##
@@ -136,19 +136,24 @@ public void assignStates() {
 
         // repartition state
         for (TaskStateAssignment stateAssignment : vertexAssignments.values()) {
-            if (stateAssignment.hasNonFinishedState) {
+            if (stateAssignment.hasNonFinishedState
+                    // FLINK-31963: We need to run repartitioning for stateless operators that
+                    // have upstream output or downstream input states.
+                    || stateAssignment.hasUpstreamOutputStates()
+                    || stateAssignment.hasDownstreamInputStates()) {
                 assignAttemptState(stateAssignment);
             }
         }
 
         // actually assign the state
         for (TaskStateAssignment stateAssignment : vertexAssignments.values()) {
-            // If upstream has output states, even the empty task state should be assigned for the
-            // current task in order to notify this task that the old states will send to it which
-            // likely should be filtered.
+            // If upstream has output states or downstream has input states, even the empty task
+            // state should be assigned for the current task in order to notify this task that the
+            // old states will send to it which likely should be filtered.
             if (stateAssignment.hasNonFinishedState
                     || stateAssignment.isFullyFinished
-                    || stateAssignment.hasUpstreamOutputStates()) {
+                    || stateAssignment.hasUpstreamOutputStates()
+                    || stateAssignment.hasDownstreamInputStates()) {

Review Comment:
   Should we compute `hasUpstreamOutputStates` and `hasDownstreamInputStates` 
once, similar to `isFullyFinished`?
   (that would at least be consistent, and also faster)
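
A sketch of what that memoization could look like (hypothetical names, not the 
actual {{TaskStateAssignment}} code):
{code:java}
// Computed lazily once and then reused, mirroring how isFullyFinished is a
// plain field that is computed up front.
private Boolean hasUpstreamOutputStates; // null until first requested

public boolean hasUpstreamOutputStates() {
    if (hasUpstreamOutputStates == null) {
        hasUpstreamOutputStates = computeHasUpstreamOutputStates(); // hypothetical helper
    }
    return hasUpstreamOutputStates;
}
{code}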



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] rkhachatryan commented on a diff in pull request #22584: [FLINK-31963][state] Fix rescaling bug in recovery from unaligned checkpoints.

2023-05-15 Thread via GitHub


rkhachatryan commented on code in PR #22584:
URL: https://github.com/apache/flink/pull/22584#discussion_r1193908276


##
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperationTest.java:
##
@@ -1029,6 +1192,15 @@ private JobVertex createJobVertex(
         return jobVertex;
     }
 
+    private List<TaskStateSnapshot> getRescalingDescriptorsFromVertex(

Review Comment:
   `getTaskStateSnapshotFromVertex`?



##
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java:
##
@@ -136,19 +136,24 @@ public void assignStates() {
 
         // repartition state
         for (TaskStateAssignment stateAssignment : vertexAssignments.values()) {
-            if (stateAssignment.hasNonFinishedState) {
+            if (stateAssignment.hasNonFinishedState
+                    // FLINK-31963: We need to run repartitioning for stateless operators that
+                    // have upstream output or downstream input states.
+                    || stateAssignment.hasUpstreamOutputStates()
+                    || stateAssignment.hasDownstreamInputStates()) {
                 assignAttemptState(stateAssignment);
             }
         }
 
         // actually assign the state
         for (TaskStateAssignment stateAssignment : vertexAssignments.values()) {
-            // If upstream has output states, even the empty task state should be assigned for the
-            // current task in order to notify this task that the old states will send to it which
-            // likely should be filtered.
+            // If upstream has output states or downstream has input states, even the empty task
+            // state should be assigned for the current task in order to notify this task that the
+            // old states will send to it which likely should be filtered.
             if (stateAssignment.hasNonFinishedState
                     || stateAssignment.isFullyFinished
-                    || stateAssignment.hasUpstreamOutputStates()) {
+                    || stateAssignment.hasUpstreamOutputStates()
+                    || stateAssignment.hasDownstreamInputStates()) {

Review Comment:
   Should we compute `hasUpstreamOutputStates` and `hasDownstreamInputStates` 
once, similar to `isFullyFinished`?
   (that would at least be consistent, and also faster)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (FLINK-32069) jobClient.getJobStatus() can return status RUNNING for finished insert operation

2023-05-15 Thread Matthias Pohl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17722691#comment-17722691
 ] 

Matthias Pohl edited comment on FLINK-32069 at 5/15/23 2:05 PM:


Thanks for documenting this issue in Jira, [~izeren]. Here are my findings so 
far:

I struggle to find a connection between the {{RunningJobsRegistry}} and the 
{{getJobStatus}} call of the client (which calls 
{{Dispatcher.requestJobStatus}} in the end). [~izeren] is right in claiming 
that we did a slight modification of the code when removing the 
{{RunningJobsRegistry}} in 
[JobMasterServiceLeadershipRunner:385ff|https://github.com/apache/flink/commit/01b14fc4b9a9487a144f515bb7d4f6ad14cbe013#diff-3eb433f18b85c0f5329a4b312a219583189d777fe9bdd547f1114f4a22989f8bL385-L395].
 Marking the job as done (through the {{{}RunningJobsRegistry{}}}) happened 
before completing the {{JobMasterServiceLeadershipRunner#resultFuture}}. In the 
current code, we mark the job as completed (through the {{{}JobResultStore{}}}) 
after completing the {{JobMasterServiceLeadershipRunner#resultFuture}}.

My issue is, though, that we're not relying on the {{RunningJobsRegistry}} in 
any way for the {{Dispatcher#requestJob}} call. The {{RunningJobsRegistry}} was 
only used for leader recovery in 
[JobMasterServiceLeadershipRunner.verifyJobSchedulingStatusAndCreateJobMasterServiceProcess:272ff|https://github.com/apache/flink/commit/01b14fc4b9a9487a144f515bb7d4f6ad14cbe013#diff-3eb433f18b85c0f5329a4b312a219583189d777fe9bdd547f1114f4a22989f8bL272-L278]
 and when submitting a job through 
[Dispatcher#isInGloballyTerminalState:375ff|https://github.com/apache/flink/pull/18189/files#diff-a4b690fb2c4975d25b05eb4161617af0d704a85ff7b1cad19d3c817c12f1e29cL375]
 in {{{}Dispatcher#submitJob{}}}. I would have expected to find 
{{Dispatcher#requestJobStatus}} somewhere in the call hierarchy of 
{{{}RunningJobsRegistry#getJobSchedulingStatus{}}} if it had had an influence 
on {{{}Dispatcher#requestJobStatus{}}}.

I don't want to say yet that [~izeren]'s conclusion is wrong. It just doesn't 
match my findings in the code. It could also be that I'm missing a code path 
here.

[~dmvk], do you have something to add? 


was (Author: mapohl):
Thanks for documenting this issue in Jira, [~izeren]. Here are my findings so 
far:

I struggle to find a connection between the {{RunningJobsRegistry}} and the 
{{getJobStatus}} call of the client (which calls 
{{Dispatcher.requestJobStatus}} in the end). [~izeren] is right in claiming 
that we did a slight modification of the code when removing the 
{{RunningJobsRegistry}} in 
[JobMasterServiceLeadershipRunner:385ff|https://github.com/apache/flink/commit/01b14fc4b9a9487a144f515bb7d4f6ad14cbe013#diff-3eb433f18b85c0f5329a4b312a219583189d777fe9bdd547f1114f4a22989f8bL385-L395].
 Marking the job as done (through the {{{}RunningJobsRegistry{}}}) happened 
before completing the {{JobMasterServiceLeadershipRunner#resultFuture}}. In the 
current code, we mark the job as completed (through the {{{}JobResultStore{}}}) 
after completing the {{JobMasterServiceLeadershipRunner#resultFuture}}.

My issue is, though, that we're not relying on the {{RunningJobsRegistry}} in 
any way for the {{Dispatcher#requestJob}} call. The {{RunningJobsRegistry}} was 
only used for leader recovery in 
[JobMasterServiceLeadershipRunner.verifyJobSchedulingStatusAndCreateJobMasterServiceProcess:272ff|https://github.com/apache/flink/commit/01b14fc4b9a9487a144f515bb7d4f6ad14cbe013#diff-3eb433f18b85c0f5329a4b312a219583189d777fe9bdd547f1114f4a22989f8bL272-L278]
 and when submitting a job through 
[Dispatcher#isInGloballyTerminalState:375ff|https://github.com/apache/flink/pull/18189/files#diff-a4b690fb2c4975d25b05eb4161617af0d704a85ff7b1cad19d3c817c12f1e29cL375]
 in {{{}Dispatcher#submitJob{}}}. I would have expected to find 
{{Dispatcher#requestJobStatus}} somewhere in the call hierarchy of 
{{{}RunningJobsRegistry#getJobSchedulingStatus{}}} if it had had an influence 
on {{{}Dispatcher#requestJobStatus{}}}.

I don't want to say yet that [~izeren]'s conclusion is wrong. It just doesn't 
match my findings in the code. It could perfectly be that I'm missing a code 
path here.

[~dmvk], do you have something to add? 

> jobClient.getJobStatus() can return status RUNNING for finished insert 
> operation
> 
>
> Key: FLINK-32069
> URL: https://issues.apache.org/jira/browse/FLINK-32069
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.16.1, 1.15.4
>Reporter: Aleksandr Iushmanov
>Priority: Major
>
> Using zeppelin with remote cluster I came across some race condition issue 
> leading to failed expectations for SQL insert ope

[jira] [Commented] (FLINK-32101) FlinkKafkaInternalProducerITCase.testInitTransactionId test failed

2023-05-15 Thread Matthias Pohl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17722790#comment-17722790
 ] 

Matthias Pohl commented on FLINK-32101:
---

[~tanyuxin] this looks like a Kafka-internal instability, doesn't it? We're 
collecting these kinds of errors under the umbrella issue FLINK-31145. I'm going 
to link FLINK-31145 at least, in case it is related.

> FlinkKafkaInternalProducerITCase.testInitTransactionId test failed
> --
>
> Key: FLINK-32101
> URL: https://issues.apache.org/jira/browse/FLINK-32101
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.18.0
>Reporter: Yuxin Tan
>Priority: Major
>  Labels: test
>
> FlinkKafkaInternalProducerITCase.testInitTransactionId test failed.
>   Caused by: org.apache.kafka.common.KafkaException: Unexpected error in 
> InitProducerIdResponse; The request timed out.
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=48990&view=logs&j=aa18c3f6-13b8-5f58-86bb-c1cffb239496&t=502fb6c0-30a2-5e49-c5c2-a00fa3acb203&l=22973
> {code:java}
> Caused by: org.apache.kafka.common.KafkaException: 
> org.apache.kafka.common.KafkaException: Unexpected error in 
> InitProducerIdResponse; The request timed out.
>   at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>   at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>   at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>   at 
> java.util.concurrent.ForkJoinTask.getThrowableException(ForkJoinTask.java:593)
>   at 
> java.util.concurrent.ForkJoinTask.reportException(ForkJoinTask.java:677)
>   at java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:735)
>   at 
> java.util.stream.ForEachOps$ForEachOp.evaluateParallel(ForEachOps.java:159)
>   at 
> java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateParallel(ForEachOps.java:173)
>   at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233)
>   at 
> java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485)
>   at 
> java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:650)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.abortTransactions(FlinkKafkaProducer.java:1290)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initializeState(FlinkKafkaProducer.java:1216)
>   at 
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:189)
>   at 
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:171)
>   at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:95)
>   at 
> org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:122)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:274)
>   at 
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:747)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.call(StreamTaskActionExecutor.java:100)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:722)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:688)
>   at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
>   at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:921)
>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.kafka.common.KafkaException: Unexpected error in 
> InitProducerIdResponse; The request timed out.
>   at 
> org.apache.kafka.clients.producer.internals.TransactionManager$InitProducerIdHandler.handleResponse(TransactionManager.java:1418)
>   at 
> org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1322)
>   at 
> org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
>   at 
> org.apache.kafka.clients.NetworkClient.

[jira] [Commented] (FLINK-32069) jobClient.getJobStatus() can return status RUNNING for finished insert operation

2023-05-15 Thread Aleksandr Iushmanov (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17722788#comment-17722788
 ] 

Aleksandr Iushmanov commented on FLINK-32069:
-

I can provide more details on the code path used in my tests:

On the Zeppelin side of things it goes to:
ClusterClientJobClientAdapter -> RestClusterClient -> getJobStatus -> 
getJobDetails
getJobDetails is a REST API call to /jobs/:jobid
 
On the Flink cluster side:
this request is handled by the JobOverviewHandler and processed with 
Dispatcher::requestMultipleJobDetails
this method requests info on both running and completed jobs
for completed jobs it relies on the ExecutionGraphInfoStore; in my case it is 
FileExecutionGraphInfoStore::getAvailableJobDetails(), which is backed by the 
local file-based jobDetailCache
if I get it right, this cache is updated in Dispatcher::jobReachedTerminalState
 
For the job result it goes to /jobs/:jobid/execution-result
Dispatcher::requestJobResult checks whether the job is in the 
jobManagerRunnerRegistry; if it is there, it returns a result future based on 
the job instance in the registry, otherwise it picks up the result from the 
executionGraphInfoStore (which should be the same source as for the status)
 
My line of thinking was that the job result future is returned based on the 
registry while the job status is based on the executionGraphInfoStore, and the 
two are not kept in sync the same way they used to be.
 
I am not 100% insisting, though, that the bug is there. It could also be in 
InsertResultProvider. I was originally looking at the TableResult 
implementation and noticed that the "firstRow" logic is not new. Also, for 
insert operations you get "Program execution finished" when you request the 
result. I would expect the job to have status FINISHED in this case (as per 
the documentation). But it is true that this could be on the ResultProvider 
side as well (if it doesn't ensure that the job status is FINISHED).
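
To make the failure mode concrete, here is a minimal client-side sketch of the 
pattern I am describing (a hypothetical probe class, not code from Zeppelin or 
Flink; only the JobClient calls are existing API):

{code:java}
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.core.execution.JobClient;

/**
 * Hypothetical probe for the suspected race: the execution-result call can
 * complete while /jobs/:jobid still reports RUNNING.
 */
public final class StatusRaceProbe {

    private StatusRaceProbe() {}

    public static void probe(JobClient jobClient) throws Exception {
        // Completes once the Dispatcher can serve the result, either from the
        // jobManagerRunnerRegistry or from the ExecutionGraphInfoStore.
        jobClient.getJobExecutionResult().get();

        // Queried right afterwards, the status may still come from a code path
        // that has not observed the terminal state yet.
        JobStatus status = jobClient.getJobStatus().get();
        if (status != JobStatus.FINISHED) {
            System.out.println("Race observed: result available, status = " + status);
        }
    }
}
{code}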

> jobClient.getJobStatus() can return status RUNNING for finished insert 
> operation
> 
>
> Key: FLINK-32069
> URL: https://issues.apache.org/jira/browse/FLINK-32069
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.16.1, 1.15.4
>Reporter: Aleksandr Iushmanov
>Priority: Major
>
> Using zeppelin with remote cluster I came across some race condition issue 
> leading to failed expectations for SQL insert operations. 
>  
> Below is an example of zeppelin code that is failing because 
> jobClient.getJobStatus() returns RUNNING even after the job has finished. I 
> have verified that the same failure can happen if I use 
> jobClient.getJobExecutionResult().get() (the job execution result is "Program 
> execution finished" but the job status is not consistently FINISHED).
> {code:java}
> TableResult tableResult = ((TableEnvironmentInternal) 
> tbenv).executeInternal(operations);
> checkState(tableResult.getJobClient().isPresent());
> try {
>   tableResult.await();
>   JobClient jobClient = tableResult.getJobClient().get();
>   if (jobClient.getJobStatus().get() == JobStatus.FINISHED) {
> context.out.write("Insertion successfully.\n");
>   } else {
> throw new IOException("Job is failed, " + 
> jobClient.getJobExecutionResult().get().toString());
>   }
> } catch (InterruptedException e) {
>   throw new IOException("Flink job is interrupted", e);
> } catch (ExecutionException e) {
>   throw new IOException("Flink job is failed", e);
> } {code}
>  ZeppelinCode: 
> [https://github.com/apache/zeppelin/blob/master/flink/flink1.15-shims/src/main/java/org/apache/zeppelin/flink/Flink115SqlInterpreter.java#L384]
> I suspect that job status is returned based on runningJobsRegistry and since 
> 1.15 this registry is not updated with FINISHED status prior to job result 
> future completion, see this change: {{JobMasterServiceLeadershipRunner.java}} 
> [https://github.com/apache/flink/pull/18189/files#diff-3eb433f18b85c0f5329a4b312a219583189d777fe9bdd547f1114f4a22989f8bL387]
>  
>  
> It looks like a race condition that is hard to reproduce on a lightweight 
> setup.
> cluster and triggering SQL insert operation. If I find a smaller setup to 
> reproduce on small local cluster with lightweight client, I will update this 
> ticket when I have more input. I am open to suggestions on how to fix this. 
>  
> For Zeppelin I have a separate ticket because Flink 1.15 is not going to be 
> fixed, but this issue, if I understand it correctly, should be common to all 
> versions starting with 1.15, therefore it makes sense to address it starting 
> with 1.16. https://issues.apache.org/jira/browse/ZEPPELIN-5909
>  
> [~mapohl], Thank you for assistance in slack, I have created this ticket to 
> back our conversation, could you please add your thoughts o

[jira] [Commented] (FLINK-32047) Fix args in JobSpec not being passed through to Flink in Standalone mode - 1.4.0

2023-05-15 Thread Gil Shmaya (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32047?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17722785#comment-17722785
 ] 

Gil Shmaya commented on FLINK-32047:


We run with 1.14.6, sorry for the confusion.

> Fix args in JobSpec not being passed through to Flink in Standalone mode - 
> 1.4.0
> 
>
> Key: FLINK-32047
> URL: https://issues.apache.org/jira/browse/FLINK-32047
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Reporter: Gil Shmaya
>Priority: Major
> Fix For: 1.14.0
>
> Attachments: image-2023-04-30-18-54-22-291.png, 
> image-2023-04-30-19-56-30-150.png, image-2023-04-30-19-56-57-680.png
>
>
> This issue is related to a previously fixed bug in version 1.2.0 -  
> FLINK-29388
> I have noticed that while the args are successfully being passed when using 
> version 1.2.0, this is not the case with version 1.4.0.
> {+}Scenario{+}:
> I added a log that prints the argument array length at the beginning of the 
> main function of the Flink job:
> !image-2023-04-30-18-54-22-291.png!
> The result when running with 1.2.0:
> !image-2023-04-30-19-56-30-150.png!
> The result when running with 1.4.0:
> !image-2023-04-30-19-56-57-680.png!





[GitHub] [flink] XComp commented on a diff in pull request #22380: [FLINK-31773][runtime] Separate DefaultLeaderElectionService.start(LeaderContender) into two separate methods for starting the driver

2023-05-15 Thread via GitHub


XComp commented on code in PR #22380:
URL: https://github.com/apache/flink/pull/22380#discussion_r1193867705


##
flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderBase.java:
##
@@ -80,6 +84,20 @@ public Throwable getError() {
         return error == null ? errorQueue.poll() : error;
     }
 
+    /**
+     * Method for exposing errors that were caught during the test execution and need to be
+     * exposed within the test.
+     */
+    public void throwErrorIfPresent() throws Throwable {
+        if (error != null) {
+            throw error;
+        }
+
+        if (!errorQueue.isEmpty()) {
+            throw errorQueue.poll();
+        }
+    }

Review Comment:
   fair point. That reduces the diff for `DefaultLeaderElectionServiceTest`. 
:+1: I wrapped it into an `AssertionError` now.
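
For readers following along, a minimal sketch of the wrapped variant (assuming 
the `error` and `errorQueue` fields from the diff above; the actual commit may 
look slightly different):

```java
/** Rethrows captured errors wrapped in an AssertionError, so callers don't need a throws-Throwable signature. */
public void throwErrorIfPresent() {
    if (error != null) {
        throw new AssertionError(error);
    }

    if (!errorQueue.isEmpty()) {
        throw new AssertionError(errorQueue.poll());
    }
}
```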






[GitHub] [flink] XComp commented on a diff in pull request #22380: [FLINK-31773][runtime] Separate DefaultLeaderElectionService.start(LeaderContender) into two separate methods for starting the driver

2023-05-15 Thread via GitHub


XComp commented on code in PR #22380:
URL: https://github.com/apache/flink/pull/22380#discussion_r1193866384


##
flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java:
##
@@ -270,65 +390,73 @@ public void onLeaderInformationChange(LeaderInformation leaderInformation) {
         runInLeaderEventThread(() -> onLeaderInformationChangeInternal(leaderInformation));
     }
 
+    @GuardedBy("lock")
     private void onLeaderInformationChangeInternal(LeaderInformation leaderInformation) {
-        synchronized (lock) {
-            if (running) {
-                LOG.trace(
-                        "Leader node changed while {} is the leader with session ID {}. New leader information {}.",
-                        leaderContender.getDescription(),
-                        confirmedLeaderInformation.getLeaderSessionID(),
-                        leaderInformation);
-                if (!confirmedLeaderInformation.isEmpty()) {
-                    final LeaderInformation confirmedLeaderInfo = this.confirmedLeaderInformation;
-                    if (leaderInformation.isEmpty()) {
-                        LOG.debug(
-                                "Writing leader information by {} since the external storage is empty.",
-                                leaderContender.getDescription());
-                        leaderElectionDriver.writeLeaderInformation(confirmedLeaderInfo);
-                    } else if (!leaderInformation.equals(confirmedLeaderInfo)) {
-                        // the data field does not correspond to the expected leader information
-                        LOG.debug(
-                                "Correcting leader information by {}.",
-                                leaderContender.getDescription());
-                        leaderElectionDriver.writeLeaderInformation(confirmedLeaderInfo);
-                    }
+        if (leaderContender != null) {
+            LOG.trace(
+                    "Leader node changed while {} is the leader with {}. New leader information {}.",
+                    leaderContender.getDescription(),
+                    LeaderElectionUtils.convertToString(confirmedLeaderInformation),
+                    LeaderElectionUtils.convertToString(leaderInformation));
+            if (!confirmedLeaderInformation.isEmpty()) {
+                final LeaderInformation confirmedLeaderInfo = this.confirmedLeaderInformation;
+                if (leaderInformation.isEmpty()) {
+                    LOG.debug(
+                            "Writing leader information by {} since the external storage is empty.",
+                            leaderContender.getDescription());
+                    leaderElectionDriver.writeLeaderInformation(confirmedLeaderInfo);
+                } else if (!leaderInformation.equals(confirmedLeaderInfo)) {
+                    // the data field does not correspond to the expected leader information
+                    LOG.debug(
+                            "Correcting leader information by {}.",
+                            leaderContender.getDescription());
+                    leaderElectionDriver.writeLeaderInformation(confirmedLeaderInfo);
                 }
-            } else {
-                LOG.debug(
-                        "Ignoring change notification since the {} has already been closed.",
-                        leaderElectionDriver);
             }
+        } else {
+            LOG.debug(
+                    "Ignoring change notification since the {} has already been stopped.",
+                    leaderElectionDriver);
         }
     }
 
     private void runInLeaderEventThread(Runnable callback) {
-        if (running) {
+        if (leadershipOperationExecutor != null) {

Review Comment:
   No it's not: I missed that one (even with passing over the PR last week 
-.-). I added a lock around the execute call and added the @GuardedBy 
annotation to the `leadershipOperationExecutor`. Additionally, I removed the 
`volatile` from `leaderElectionDriver`. That seems to have survived previous 
changes and is not necessary anymore.
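
A minimal sketch of the guarded version described here (assuming the `lock` 
and `leadershipOperationExecutor` names from the diff; the actual change may 
differ):

```java
private void runInLeaderEventThread(Runnable callback) {
    synchronized (lock) {
        // Null check and execute are guarded by the same lock that protects
        // leadershipOperationExecutor, so a concurrent close() cannot race in
        // between them.
        if (leadershipOperationExecutor != null) {
            leadershipOperationExecutor.execute(callback);
        }
    }
}
```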






[GitHub] [flink] XComp commented on a diff in pull request #22380: [FLINK-31773][runtime] Separate DefaultLeaderElectionService.start(LeaderContender) into two separate methods for starting the driver

2023-05-15 Thread via GitHub


XComp commented on code in PR #22380:
URL: https://github.com/apache/flink/pull/22380#discussion_r1193863651


##
flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java:
##
@@ -32,49 +31,73 @@
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.GuardedBy;
 
+import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * Default implementation for leader election service. Composed with different {@link
  * LeaderElectionDriver}, we could perform a leader election for the contender, and then persist the
  * leader information to various storage.
+ *
+ * <p>{@code DefaultLeaderElectionService} handles a single {@link LeaderContender}.
  */
 public class DefaultLeaderElectionService
-        implements LeaderElectionService, LeaderElectionEventHandler {
+        implements LeaderElectionService, LeaderElectionEventHandler, AutoCloseable {
 
     private static final Logger LOG = LoggerFactory.getLogger(DefaultLeaderElectionService.class);
 
     private final Object lock = new Object();
 
     private final LeaderElectionDriverFactory leaderElectionDriverFactory;
 
-    /** The leader contender which applies for leadership. */
+    /**
+     * {@code leaderContender} being {@code null} indicates that no {@link LeaderContender} is
+     * registered that participates in the leader election, yet. See {@link #start(LeaderContender)}
+     * and {@link #stop()} for lifecycle management.
+     *
+     * <p>{@code @Nullable} isn't used here to avoid having multiple warnings spread over this
+     * class in a supporting IDE.
+     */
     @GuardedBy("lock")
-    // @Nullable is commented-out to avoid having multiple warnings spread over this class
-    // this.running=true ensures that leaderContender != null
-    private volatile LeaderContender leaderContender;
+    private LeaderContender leaderContender;
 
+    /**
+     * Saves the session ID which was issued by the {@link LeaderElectionDriver} if and only if the
+     * leadership is acquired by this service. {@code issuedLeaderSessionID} being {@code null}
+     * indicates that this service isn't the leader right now (i.e. {@link
+     * #onGrantLeadership(UUID)}) wasn't called, yet (independently of what {@code
+     * leaderElectionDriver#hasLeadership()} returns).
+     */
     @GuardedBy("lock")
     @Nullable
-    private volatile UUID issuedLeaderSessionID;
+    private UUID issuedLeaderSessionID;
 
+    /**
+     * Saves the leader information for a registered {@link LeaderContender} after this contender
+     * confirmed the leadership.
+     */
     @GuardedBy("lock")
-    private volatile LeaderInformation confirmedLeaderInformation;
+    private LeaderInformation confirmedLeaderInformation;
 
+    /**
+     * {@code leaderElectionDriver} being {@code null} indicates that the connection to the
+     * LeaderElection backend isn't established, yet. See {@link #startLeaderElectionBackend()} and
+     * {@link #close()} for lifecycle management. The lifecycle of the driver should have been
+     * established before registering a {@link LeaderContender} and stopped after the contender has
+     * been removed.
+     *
+     * <p>{@code @Nullable} isn't used here to avoid having multiple warnings spread over this
+     * class in a supporting IDE.
+     */
     @GuardedBy("lock")
-    private volatile boolean running;
+    private volatile LeaderElectionDriver leaderElectionDriver;
 
-    // @Nullable is commented-out to avoid having multiple warnings spread over this class
-    // this.running=true ensures that leaderContender != null
-    private LeaderElectionDriver leaderElectionDriver;
-
-    private final ExecutorService leadershipOperationExecutor;
+    @Nullable private ExecutorService leadershipOperationExecutor;

Review Comment:
   Argh, good idea - I should have gone through the `ExecutorService` interface 
for such a check.






[GitHub] [flink] StefanRRichter commented on a diff in pull request #22584: [FLINK-31963][state] Fix rescaling bug in recovery from unaligned checkpoints.

2023-05-15 Thread via GitHub


StefanRRichter commented on code in PR #22584:
URL: https://github.com/apache/flink/pull/22584#discussion_r1193856106


##
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperationTest.java:
##
@@ -785,6 +789,130 @@ public void testOnlyUpstreamChannelStateAssignment()
         }
     }
 
+    /** FLINK-31963: Tests rescaling for stateless operators and upstream result partition state. */
+    @Test
+    public void testOnlyUpstreamChannelRescaleStateAssignment()
+            throws JobException, JobExecutionException {
+        Random random = new Random();
+        OperatorSubtaskState upstreamOpState =
+                OperatorSubtaskState.builder()
+                        .setResultSubpartitionState(
+                                new StateObjectCollection<>(
+                                        asList(
+                                                createNewResultSubpartitionStateHandle(10, random),
+                                                createNewResultSubpartitionStateHandle(
+                                                        10, random))))
+                        .build();
+        testOnlyUpstreamOrDownstreamRescalingInternal(upstreamOpState, null, 5, 7);
+    }
+
+    /** FLINK-31963: Tests rescaling for stateless operators and downstream input channel state. */
+    @Test
+    public void testOnlyDownstreamChannelRescaleStateAssignment()
+            throws JobException, JobExecutionException {
+        Random random = new Random();
+        OperatorSubtaskState downstreamOpState =
+                OperatorSubtaskState.builder()
+                        .setInputChannelState(
+                                new StateObjectCollection<>(
+                                        asList(
+                                                createNewInputChannelStateHandle(10, random),
+                                                createNewInputChannelStateHandle(10, random))))
+                        .build();
+        testOnlyUpstreamOrDownstreamRescalingInternal(null, downstreamOpState, 5, 5);
+    }
+
+    private void testOnlyUpstreamOrDownstreamRescalingInternal(
+            @Nullable OperatorSubtaskState upstreamOpState,
+            @Nullable OperatorSubtaskState downstreamOpState,
+            int expectedUpstreamCount,
+            int expectedDownstreamCount)
+            throws JobException, JobExecutionException {
+
+        if ((upstreamOpState == null && downstreamOpState == null)
+                || (upstreamOpState != null && downstreamOpState != null)) {
+            // Either upstream or downstream state must exist, but not both.
+            return;
+        }
+
+        // Start from parallelism 5 for both operators
+        int upstreamParallelism = 5;
+        int downstreamParallelism = 5;
+
+        // Build states
+        List<OperatorID> operatorIds = buildOperatorIds(2);
+        Map<OperatorID, OperatorState> states = new HashMap<>();
+        OperatorState upstreamState =
+                new OperatorState(operatorIds.get(0), upstreamParallelism, MAX_P);
+        OperatorState downstreamState =
+                new OperatorState(operatorIds.get(1), downstreamParallelism, MAX_P);
+
+        states.put(operatorIds.get(0), upstreamState);
+        states.put(operatorIds.get(1), downstreamState);
+
+        if (upstreamOpState != null) {
+            upstreamState.putState(0, upstreamOpState);
+            // rescale downstream 5 -> 3
+            downstreamParallelism = 3;
+        }
+
+        if (downstreamOpState != null) {
+            downstreamState.putState(0, downstreamOpState);
+            // rescale upstream 5 -> 3
+            upstreamParallelism = 3;
+        }
+
+        List<OperatorIdWithParallelism> opIdWithParallelism = new ArrayList<>(2);
+        opIdWithParallelism.add(
+                new OperatorIdWithParallelism(operatorIds.get(0), upstreamParallelism));
+        opIdWithParallelism.add(
+                new OperatorIdWithParallelism(operatorIds.get(1), downstreamParallelism));
+
+        Map<JobVertexID, ExecutionJobVertex> vertices =
+                buildVertices(opIdWithParallelism, RANGE, ROUND_ROBIN);
+
+        // Run state assignment
+        new StateAssignmentOperation(0, new HashSet<>(vertices.values()), states, false)
+                .assignStates();
+
+        // Check results
+        ExecutionJobVertex upstreamExecutionJobVertex = vertices.get(operatorIds.get(0));
+        ExecutionJobVertex downstreamExecutionJobVertex = vertices.get(operatorIds.get(1));
+
+        List<TaskStateSnapshot> upstreamRescalingDescriptors =
+                getRescalingDescriptorsFromVertex(upstreamExecutionJobVertex);
+        List<TaskStateSnapshot> downstreamRescalingDescriptors =
+                getRescalingDescriptorsFromVertex(downstreamExecutionJobVertex);
+
+        checkMappings(
+                upstreamRescalingDescriptors,
+                TaskStateSnapshot::getOutputRescalingDescriptor,
+                expectedUpstreamCount);
+
+        checkMappings(
+                downstreamResc

[jira] [Commented] (FLINK-32094) startScheduling.BATCH performance regression since May 11th

2023-05-15 Thread Weijie Guo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32094?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17722782#comment-17722782
 ] 

Weijie Guo commented on FLINK-32094:


master (1.18) via 1a3b539fa43b4e1c8f07accf2d4aa352b7f63858.

> startScheduling.BATCH performance regression since May 11th
> ---
>
> Key: FLINK-32094
> URL: https://issues.apache.org/jira/browse/FLINK-32094
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.18.0
>Reporter: Martijn Visser
>Assignee: Yuxin Tan
>Priority: Blocker
> Fix For: 1.18.0
>
> Attachments: image-2023-05-14-22-58-00-886.png, 
> image-2023-05-15-12-33-56-319.png
>
>
> http://codespeed.dak8s.net:8000/timeline/#/?exe=5&ben=startScheduling.BATCH&extr=on&quarts=on&equid=off&env=2&revs=200




