[GitHub] [flink-connector-aws] PatrickRen commented on a diff in pull request #49: [FLINK-24438] Add Kinesis connector using FLIP-27
PatrickRen commented on code in PR #49:
URL: https://github.com/apache/flink-connector-aws/pull/49#discussion_r1194559931

##
flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/enumerator/assigner/ShardAssignerFactory.java:
##
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kinesis.source.enumerator.assigner;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.connector.kinesis.source.enumerator.KinesisShardAssigner;
+
+/**
+ * Factory that provides an instance of {@link KinesisShardAssigner} pre-packaged with the
+ * connector.
+ */
+@Experimental
+public class ShardAssignerFactory {
+    private ShardAssignerFactory() {
+        // No-op private constructor to prevent instantiation of class
+    }
+
+    public static KinesisShardAssigner uniformShardAssigner() {
+        return new UniformShardAssigner();
+    }
+
+    public static KinesisShardAssigner hashShardAssigner() {

Review Comment:
It looks like the `HashShardAssigner` is only used in tests. What about marking it as `@VisibleForTesting`, or exposing it to users through a configuration option?
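To make the second option in the review comment concrete, here is a minimal sketch of selecting an assigner from a user-supplied configuration value. It assumes a hypothetical option that accepts `uniform` or `hash`; `ShardAssigner`, `ShardAssignerType`, `RoundRobinAssigner`, and `ShardAssigners.fromConfig` are illustrative stand-ins, not the connector's API, and the round-robin class is only a simple placeholder for a "uniform" strategy.

```java
import java.util.Locale;

/** Hypothetical stand-in for KinesisShardAssigner: picks a subtask index for a shard. */
interface ShardAssigner {
    int assign(String shardId, int parallelism);
}

/** Hypothetical values for an assumed user-facing option; not an existing connector key. */
enum ShardAssignerType {
    UNIFORM,
    HASH
}

/** Illustrative round-robin assigner standing in for a "uniform" strategy. */
final class RoundRobinAssigner implements ShardAssigner {
    private int next = 0;

    @Override
    public int assign(String shardId, int parallelism) {
        int subtask = next % parallelism;
        next++;
        return subtask;
    }
}

final class ShardAssigners {
    private ShardAssigners() {
        // Static factory only.
    }

    /** Resolves an assigner from a config value such as "uniform" or "hash" (case-insensitive). */
    static ShardAssigner fromConfig(String value) {
        // valueOf throws IllegalArgumentException for unknown values, so typos fail fast.
        ShardAssignerType type = ShardAssignerType.valueOf(value.trim().toUpperCase(Locale.ROOT));
        switch (type) {
            case HASH:
                // Deterministic: the same shard id always maps to the same subtask.
                return (shardId, parallelism) -> Math.floorMod(shardId.hashCode(), parallelism);
            case UNIFORM:
                return new RoundRobinAssigner();
            default:
                throw new IllegalStateException("Unhandled assigner type: " + type);
        }
    }
}
```

Keeping the hash variant behind an option like this would let tests and users share one code path, whereas `@VisibleForTesting` would instead signal that it is not meant for production use.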
##
flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/PollingKinesisShardSplitReader.java:
##
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kinesis.source.reader;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
+import org.apache.flink.connector.kinesis.source.proxy.StreamProxy;
+import org.apache.flink.connector.kinesis.source.split.KinesisShardSplit;
+import org.apache.flink.connector.kinesis.source.split.KinesisShardSplitState;
+import org.apache.flink.connector.kinesis.source.split.StartingPosition;
+
+import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableSet;
+
+import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
+import software.amazon.awssdk.services.kinesis.model.Record;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.Iterator;
+import java.util.Set;
+
+/**
+ * An implementation of the SplitReader that periodically polls the Kinesis stream to retrieve
+ * records.
+ */
+@Internal
+public class PollingKinesisShardSplitReader implements SplitReader<Record, KinesisShardSplit> {
+
+    private static final RecordsWithSplitIds<Record> INCOMPLETE_SHARD_EMPTY_RECORDS =
+            new KinesisRecordsWithSplitIds(Collections.emptyIterator(), null, false);
+
+    private final StreamProxy kinesis;
+    private final Deque<KinesisShardSplitState> assignedSplits = new ArrayDeque<>();
+
+    public PollingKinesisShardSplitReader(StreamProxy kinesisProxy) {
+        this.kinesis = kinesisProxy;
+    }
+
+    @Override
+    public RecordsWithSplitIds<Record> fetch() throws IOException {
+        KinesisShardSplitState splitState = assignedSplits.poll();
+        if (splitState == null) {
+            return INCOMPLETE_SHARD_EMPTY_RECORDS;
+        }
+
+        GetRecordsResponse getRecordsResponse =
+                kinesis.getRecords(
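The quoted `fetch()` is cut off above, but its opening shows the pattern: each call takes one split state from the `assignedSplits` deque and returns an empty batch when nothing is assigned. A minimal sketch of how such a deque gives round-robin reading across shards, assuming the split is put back at the tail while its shard still has data (the truncated diff does not show this part). `ShardState`, `RoundRobinShardReader`, and the in-memory record lists are hypothetical stand-ins for `KinesisShardSplitState` and `StreamProxy.getRecords`; "no pending records" stands in for a closed shard.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.List;

/** Hypothetical per-shard state: the shard id plus records not yet handed out. */
final class ShardState {
    final String shardId;
    final Deque<String> pending;

    ShardState(String shardId, List<String> records) {
        this.shardId = shardId;
        this.pending = new ArrayDeque<>(records);
    }
}

/** Hypothetical reader: each fetch() serves one shard, then moves it to the back of the queue. */
final class RoundRobinShardReader {
    private final Deque<ShardState> assignedSplits = new ArrayDeque<>();
    private final int maxRecordsPerFetch;

    RoundRobinShardReader(int maxRecordsPerFetch) {
        this.maxRecordsPerFetch = maxRecordsPerFetch;
    }

    void addSplit(ShardState split) {
        assignedSplits.add(split);
    }

    List<String> fetch() {
        ShardState split = assignedSplits.poll();
        if (split == null) {
            // Nothing assigned: return an empty batch instead of blocking.
            return Collections.emptyList();
        }
        // Stand-in for one remote read (e.g. a single GetRecords call), capped per fetch.
        List<String> batch = new ArrayList<>();
        while (batch.size() < maxRecordsPerFetch && !split.pending.isEmpty()) {
            batch.add(split.pending.poll());
        }
        if (!split.pending.isEmpty()) {
            // Shard still has data: re-queue at the tail so other shards are served first.
            assignedSplits.add(split);
        }
        return batch;
    }
}
```

Serving one shard per `fetch()` keeps each call bounded, and re-queuing at the tail prevents one busy shard from starving the others.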
[GitHub] [flink] XComp commented on pull request #22380: [FLINK-31773][runtime] Separate DefaultLeaderElectionService.start(LeaderContender) into two separate methods for starting the driver and regis
XComp commented on PR #22380: URL: https://github.com/apache/flink/pull/22380#issuecomment-1549085766 The e2e test failed due to some (temporary) network connectivity issues within the Docker image (see [logs](https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=49011&view=logs&j=bea52777-eaf8-5663-8482-18fbc3630e81&t=43ba8ce7-ebbf-57cd-9163-444305d74117&l=10378)) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] XComp commented on pull request #22380: [FLINK-31773][runtime] Separate DefaultLeaderElectionService.start(LeaderContender) into two separate methods for starting the driver and regis
XComp commented on PR #22380: URL: https://github.com/apache/flink/pull/22380#issuecomment-1549084955 @flinkbot run azure
[jira] [Comment Edited] (FLINK-32069) jobClient.getJobStatus() can return status RUNNING for finished insert operation
[ https://issues.apache.org/jira/browse/FLINK-32069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17723006#comment-17723006 ]

Matthias Pohl edited comment on FLINK-32069 at 5/16/23 6:48 AM:

From the 1.15 finding above, the issue might be that we're relying on the {{ExecutionGraphCache}} in the {{JobDetailsHandler}}, which might have an outdated version of the {{ExecutionGraph}} when returning the {{JobStatus}}. Does that sound reasonable? The cache TTL is configurable through [web.refresh-interval|https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/config/#web-refresh-interval], which is set to 3s by default.

Relying on the cache was removed in 1.16.0 with FLINK-26641. Can you confirm that you've seen this behavior in {{1.16.1}} as well? If so, my conclusion doesn't hold.

> jobClient.getJobStatus() can return status RUNNING for finished insert operation
>
> Key: FLINK-32069
> URL: https://issues.apache.org/jira/browse/FLINK-32069
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Coordination
> Affects Versions: 1.16.1, 1.15.4
> Reporter: Aleksandr Iushmanov
> Priority: Major
>
> Using Zeppelin with a remote cluster, I came across a race condition that leads to failed expectations for SQL insert operations.
>
> Below is an example of Zeppelin code that fails because jobClient.getJobStatus() returns RUNNING even after the job has finished. I have verified that the same failure can happen if I use jobClient.getJobExecutionResult().get() (the job execution result is "Program execution finished", but the job status is not consistently FINISHED).
> {code:java}
> TableResult tableResult = ((TableEnvironmentInternal) tbenv).executeInternal(operations);
> checkState(tableResult.getJobClient().isPresent());
> try {
>     tableResult.await();
>     JobClient jobClient = tableResult.getJobClient().get();
>     if (jobClient.getJobStatus().get() == JobStatus.FINISHED) {
>         context.out.write("Insertion successfully.\n");
>     } else {
>         throw new IOException("Job is failed, " + jobClient.getJobExecutionResult().get().toString());
>     }
> } catch (InterruptedException e) {
>     throw new IOException("Flink job is interrupted", e);
> } catch (ExecutionException e) {
>     throw new IOException("Flink job is failed", e);
> } {code}
> ZeppelinCode:
> [https://github.com/apache/zeppelin/blob/master/flink/flink1.15-shims/src/main/java/org/apache/zeppelin/flink/Flink115SqlInterpreter.java#L384]
> I suspect that the job status is returned based on the runningJobsRegistry, and that since 1.15 this registry is not updated with the FINISHED status before the job result future completes; see this change in {{JobMasterServiceLeadershipRunner.java}}:
> [https://github.com/apache/flink/pull/18189/files#diff-3eb433f18b85c0f5329a4b312a219583189d777fe9bdd547f1114f4a22989f8bL387]
>
> It looks like a race condition that is hard to reproduce on a lightweight setup. I was reproducing it by running a Zeppelin notebook against a remote Flink cluster and triggering an SQL insert operation. If I find a smaller setup that reproduces it on a small local cluster with a lightweight client, I will update this ticket. I am open to suggestions on how to fix this.
>
> For Zeppelin I have a separate ticket, because Flink 1.15 is not going to be fixed. However, if I understand it correctly, this issue should be common to all versions starting from 1.15, so it makes sense to address it starting from 1.16. https://issues.apache.org/jira/browse/ZEPPELIN-5909
>
> [~mapohl], thank you for your assistance in Slack. I have created this ticket to back our conversation; could you please add your thoughts on this failure mode?
>
> One possible solution would be an additional check for the presence of a JobResult in the result store before returning the jobStatus (if there is a result, the job shouldn't be reported as running, based on this documentation: [https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/table/api/TableResult.html#await--])

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
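The "possible solution" in the description (prefer a stored JobResult over the reported status) can be sketched as follows. This is a minimal illustration under assumptions: `SketchJobState` and `ResultAwareStatusLookup` are hypothetical and are not Flink's `JobStatus`, `JobResultStore`, or `Dispatcher`; the second map merely stands in for any status source that can lag behind.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical subset of job states (not Flink's JobStatus enum). */
enum SketchJobState {
    RUNNING,
    FINISHED,
    FAILED,
    CANCELED
}

/**
 * Hypothetical status lookup: a terminal result, once stored, wins over a status coming
 * from a source that may lag behind (for example a cache).
 */
final class ResultAwareStatusLookup {
    private final Map<String, SketchJobState> resultStore = new ConcurrentHashMap<>();
    private final Map<String, SketchJobState> laggingStatusSource = new ConcurrentHashMap<>();

    void storeResult(String jobId, SketchJobState terminalState) {
        resultStore.put(jobId, terminalState);
    }

    void reportStatus(String jobId, SketchJobState state) {
        laggingStatusSource.put(jobId, state);
    }

    Optional<SketchJobState> getJobStatus(String jobId) {
        SketchJobState result = resultStore.get(jobId);
        if (result != null) {
            // A stored result means the job is terminal, whatever the other source says.
            return Optional.of(result);
        }
        return Optional.ofNullable(laggingStatusSource.get(jobId));
    }
}
```

The ordering of the two lookups is the whole point: once a result exists, a stale RUNNING from the other source can no longer be observed.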
[jira] [Commented] (FLINK-32069) jobClient.getJobStatus() can return status RUNNING for finished insert operation
[ https://issues.apache.org/jira/browse/FLINK-32069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17723006#comment-17723006 ] Matthias Pohl commented on FLINK-32069: --- From the 1.15 finding above, the issue might be that we're relying on the {{ExecutionGraphCache}} in the {{JobDetailsHandler}}, which might have an outdated version of the {{ExecutionGraph}} when returning the {{JobStatus}}. Does that sound reasonable? The cache TTL is configurable through [web.refresh-interval|https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/config/#web-refresh-interval], which is set to 3s by default.
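The comment attributes the stale RUNNING status to a cached {{ExecutionGraph}} that is only refreshed after a TTL (3s by default via web.refresh-interval). A minimal sketch of that effect with a generic single-entry TTL cache and an injectable clock; `TtlCache` is hypothetical and is not Flink's `ExecutionGraphCache`.

```java
import java.util.function.LongSupplier;
import java.util.function.Supplier;

/**
 * Hypothetical single-entry cache with a time-to-live. It only illustrates why a cached
 * snapshot can still report RUNNING after the job has already finished.
 */
final class TtlCache<T> {
    private final Supplier<T> loader;
    private final LongSupplier clockMillis;
    private final long ttlMillis;
    private T cached;
    private long loadedAtMillis;
    private boolean loaded;

    TtlCache(Supplier<T> loader, LongSupplier clockMillis, long ttlMillis) {
        this.loader = loader;
        this.clockMillis = clockMillis;
        this.ttlMillis = ttlMillis;
    }

    synchronized T get() {
        long now = clockMillis.getAsLong();
        if (!loaded || now - loadedAtMillis >= ttlMillis) {
            // Entry missing or expired: reload from the source of truth.
            cached = loader.get();
            loadedAtMillis = now;
            loaded = true;
        }
        // Within the TTL the previously loaded value is returned, even if the source changed.
        return cached;
    }
}
```

With a 3000 ms TTL, a status read 1 s after the job finished can still return the RUNNING snapshot loaded earlier; only a read at or after the 3 s mark reloads FINISHED.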
[jira] [Resolved] (FLINK-31759) Update some external connectors to use sql_connector_download_table and connector_artifact shortcode
[ https://issues.apache.org/jira/browse/FLINK-31759?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Weijie Guo resolved FLINK-31759. Resolution: Fixed
> Update some external connectors to use sql_connector_download_table and connector_artifact shortcode
>
> Key: FLINK-31759
> URL: https://issues.apache.org/jira/browse/FLINK-31759
> Project: Flink
> Issue Type: Technical Debt
> Components: Connectors / ElasticSearch, Connectors / Google Cloud PubSub, Connectors / HBase, Connectors / JDBC, Connectors / RabbitMQ
> Reporter: Weijie Guo
> Assignee: Weijie Guo
> Priority: Major
> Labels: pull-request-available
> Fix For: jdbc-3.2.0
>
> # FLINK-30291 introduced a new shortcode ({{sql_connector_download_table}}) for the table documentation of externalized connectors. Some externalized connectors do not use it yet, including elasticsearch, hbase, and jdbc.
> # Some externalized connectors do not use the {{connector_artifact}} shortcode in their DataStream documentation, including jdbc, rabbitmq, and gcp-pubsub.
[jira] [Comment Edited] (FLINK-32069) jobClient.getJobStatus() can return status RUNNING for finished insert operation
[ https://issues.apache.org/jira/browse/FLINK-32069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17722998#comment-17722998 ]

Matthias Pohl edited comment on FLINK-32069 at 5/16/23 6:18 AM:

Ok, I was too lazy to switch to the 1.15 branch because I assumed that the code didn't change that much (and I wanted to avoid running into local compilation issues again due to switching branches). But it looks like the code did change. I'm going to do the code analysis once more for {{release-1.15.4}} (assuming that that's the version you're using).

{{jobClient.getJobStatus}}:
* Client side:
** {{ClusterClientJobClientAdapter.getJobStatus}} > {{RestClusterClient.getJobStatus}} > {{RestClusterClient.requestJobStatus}} > {{RestClusterClient.getJobDetails}} > {{JobDetailsHeaders}} {{GET /jobs/:jobId}}
* Server side:
** {{JobDetailsHandler.handleRequest}} is called, which relies on the {{ExecutionGraphCache}} instead of accessing the {{Dispatcher}} directly (as is done in the {{master}} code with {{JobStatusHandler}})

For {{jobClient.getJobExecutionResult()}}, I find the following (same as in {{master}}):
* Client side:
** {{ClusterClientJobClientAdapter.getJobExecutionResult}} > {{RestClusterClient.requestJobResult}} > {{RestClusterClient.requestJobResultInternal}} > {{JobExecutionResultHeaders}} {{GET /jobs/:jobId/execution-result}}
* Server side:
** {{JobExecutionResultHandler.handleRequest}} > {{Dispatcher.requestJobStatus}}

I'm missing the path, though, where we use {{requestMultipleJobDetails}}. Am I still looking at the wrong version of Flink?
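Given this analysis (the status path goes through a cache, while the execution-result path reaches the {{Dispatcher}}), one hedged client-side workaround is to re-query the status for a bounded time after the execution result is available, since a cached value should catch up once the refresh interval passes. A minimal sketch; `StatusPolling` is hypothetical, and `statusSource` would wrap something like `() -> jobClient.getJobStatus().join().name()` in real code.

```java
import java.util.Set;
import java.util.function.Supplier;

final class StatusPolling {
    private StatusPolling() {}

    /**
     * Calls statusSource until it reports one of terminalStates or maxAttempts calls have been
     * made, sleeping sleepMillis between calls. Returns the last status observed.
     */
    static String awaitTerminalStatus(
            Supplier<String> statusSource, Set<String> terminalStates, int maxAttempts, long sleepMillis) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        String status = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            status = statusSource.get();
            if (terminalStates.contains(status) || attempt == maxAttempts) {
                break;
            }
            try {
                Thread.sleep(sleepMillis);
            } catch (InterruptedException e) {
                // Preserve the interrupt flag and stop polling early.
                Thread.currentThread().interrupt();
                break;
            }
        }
        return status;
    }
}
```

This only papers over the symptom on the client; it does not fix the server-side inconsistency discussed in the ticket.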
[GitHub] [flink-connector-jdbc] reswqa merged pull request #45: [hotfix] Fix incorrect sql_url in jdbc.yml.
reswqa merged PR #45: URL: https://github.com/apache/flink-connector-jdbc/pull/45
[jira] [Comment Edited] (FLINK-32069) jobClient.getJobStatus() can return status RUNNING for finished insert operation
[ https://issues.apache.org/jira/browse/FLINK-32069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17722998#comment-17722998 ] Matthias Pohl edited comment on FLINK-32069 at 5/16/23 5:57 AM: Ok, I was too lazy to switch to the 1.15 branch because I assumed that the code didn't change that much (and I wanted to avoid running into local compilation issues again due to switching branches). But it looks like the code did change. I'm going to do the code analysis once more for {{release-1.15.4}} (assuming that that's the version you're using).
[jira] [Comment Edited] (FLINK-32104) stop-with-savepoint fails and times out with simple reproducible example
[ https://issues.apache.org/jira/browse/FLINK-32104?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17722993#comment-17722993 ] Weijie Guo edited comment on FLINK-32104 at 5/16/23 5:55 AM: Hi Kurt, I have tested the uploaded job without any code changes. However, the native savepoint completed successfully on my own MacBook. I suspect that the timeout comes from the prolonged sleep in {{DemoMapFunction}}. {{NumberSequenceSource}} finishes soon after the job is submitted, and the records (i.e. 1...20) accumulate in the downstream's receive buffers. The savepoint is inserted after the last record, so processing the preceding data takes {{20 * 5 = 100s}}, but {{client.timeout}} defaults to only 60s. If a savepoint is triggered before most of the data has been processed, it is very likely to time out. Have you tried increasing the timeout here?
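The estimate in the comment can be written as a small calculation. A minimal sketch, assuming (as the comment does) that the 20 buffered records are processed one at a time at about 5 s each before the savepoint is reached, compared against the 60 s default `client.timeout`; `SavepointBacklogEstimate` is a hypothetical helper.

```java
import java.time.Duration;

final class SavepointBacklogEstimate {
    private SavepointBacklogEstimate() {}

    /** Time to drain the buffered records ahead of the savepoint, one record at a time. */
    static Duration drainTime(int bufferedRecords, Duration perRecord) {
        return perRecord.multipliedBy(bufferedRecords);
    }

    /** True if draining the backlog alone already exceeds the client-side wait. */
    static boolean exceedsTimeout(int bufferedRecords, Duration perRecord, Duration clientTimeout) {
        return drainTime(bufferedRecords, perRecord).compareTo(clientTimeout) > 0;
    }
}
```

For 20 records at 5 s each this gives 100 s, which exceeds a 60 s timeout but fits within 120 s, matching the suggestion to raise the timeout.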
> stop-with-savepoint fails and times out with simple reproducible example
>
> Key: FLINK-32104
> URL: https://issues.apache.org/jira/browse/FLINK-32104
> Project: Flink
> Issue Type: Bug
> Components: API / DataStream
> Affects Versions: 1.17.0
> Reporter: Kurt Ostfeld
> Priority: Major
>
> I've put together a simple demo app that reproduces the issue, with instructions on how to reproduce it:
> [https://github.com/kurtostfeld/flink-stop-issue]
>
> The issue is that with a very simple Flink DataStream API application, `stop-with-savepoint` fails and times out like this:
>
> {code:java}
> ./bin/flink stop --type native --savepointPath ../savepoints d69a952625497cca0665dfdcdb9f4718
> Suspending job "d69a952625497cca0665dfdcdb9f4718" with a NATIVE savepoint.
>
> The program finished with the following exception:
> org.apache.flink.util.FlinkException: Could not stop with a savepoint job "d69a952625497cca0665dfdcdb9f4718".
>     at org.apache.flink.client.cli.CliFrontend.lambda$stop$4(CliFrontend.java:595)
>     at org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:1041)
>     at org.apache.flink.client.cli.CliFrontend.stop(CliFrontend.java:578)
>     at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1110)
>     at org.apache.flink.client.cli.CliFrontend.lambda$mainInternal$9(CliFrontend.java:1189)
>     at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
>     at org.apache.flink.client.cli.CliFrontend.mainInternal(CliFrontend.java:1189)
>     at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1157)
> Caused by: java.util.concurrent.TimeoutException
>     at java.base/java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1886)
>     at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2021)
>     at org.apache.flink.client.cli.CliFrontend.lambda$stop$4(CliFrontend.java:591)
>     ... 7 more {code}
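The trace shows the CLI blocking on a `CompletableFuture` with a timed `get` (`CompletableFuture.timedGet` in the frames) and receiving `java.util.concurrent.TimeoutException`. A minimal JDK-only sketch of that behavior; `TimedWaitDemo` is hypothetical and is not `CliFrontend`'s code. In the JDK, a timed `get` that expires leaves the future pending, so a client-side timeout by itself does not indicate whether the underlying operation eventually completes.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class TimedWaitDemo {
    private TimedWaitDemo() {}

    /** Waits up to timeoutMillis and describes the outcome instead of propagating exceptions. */
    static String describeWait(CompletableFuture<String> future, long timeoutMillis) {
        try {
            return "completed: " + future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // Same exception type as in the CLI trace; the future itself stays uncompleted.
            return "timed out";
        } catch (ExecutionException e) {
            return "failed: " + e.getCause();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }
}
```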
[jira] [Commented] (FLINK-32069) jobClient.getJobStatus() can return status RUNNING for finished insert operation
[ https://issues.apache.org/jira/browse/FLINK-32069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17722998#comment-17722998 ] Matthias Pohl commented on FLINK-32069: --- Ok, I was too lazy to switch to the 1.15 branch because I assumed that the code didn't change that much (and I wanted to avoid running into compilation issues again). But looks like the code did change. I'm going to do the code analysis once more for {{release-1.15.4}} (assuming that that's the version you're using. > jobClient.getJobStatus() can return status RUNNING for finished insert > operation > > > Key: FLINK-32069 > URL: https://issues.apache.org/jira/browse/FLINK-32069 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Affects Versions: 1.16.1, 1.15.4 >Reporter: Aleksandr Iushmanov >Priority: Major > > Using zeppelin with remote cluster I came across some race condition issue > leading to failed expectations for SQL insert operations. > > Below is an example of zeppelin code that is failing because > jobClient.getJobStatus() returns running even after job has finished. 
I have > verified that same failover can happen if I use > jobClient.getJobExecutionResult().get() (Job execution result is: "Program > execution finished" but job status is not consistently finished) > {code:java} > TableResult tableResult = ((TableEnvironmentInternal) > tbenv).executeInternal(operations); > checkState(tableResult.getJobClient().isPresent()); > try { > tableResult.await(); > JobClient jobClient = tableResult.getJobClient().get(); > if (jobClient.getJobStatus().get() == JobStatus.FINISHED) { > context.out.write("Insertion successfully.\n"); > } else { > throw new IOException("Job is failed, " + > jobClient.getJobExecutionResult().get().toString()); > } > } catch (InterruptedException e) { > throw new IOException("Flink job is interrupted", e); > } catch (ExecutionException e) { > throw new IOException("Flink job is failed", e); > } {code} > ZeppelinCode: > [https://github.com/apache/zeppelin/blob/master/flink/flink1.15-shims/src/main/java/org/apache/zeppelin/flink/Flink115SqlInterpreter.java#L384] > I suspect that job status is returned based on runningJobsRegistry and since > 1.15 this registry is not updated with FINISHED status prior to job result > future completion, see this change: {{JobMasterServiceLeadershipRunner.java}} > [https://github.com/apache/flink/pull/18189/files#diff-3eb433f18b85c0f5329a4b312a219583189d777fe9bdd547f1114f4a22989f8bL387] > > > It looks like as race condition that is hard to reproduce on lightweight > setup. I was reproducing this running zeppelin notebook with remote flink > cluster and triggering SQL insert operation. If I find a smaller setup to > reproduce on small local cluster with lightweight client, I will update this > ticket when I have more input. I am open to suggestions on how to fix this. 
> > For Zeppelin I have a separate ticket because Flink 1.15 is not going to be > fixed but this issue if I understand it correctly should be common for all > versions starting 1.15, therefore it makes sense to address this starting > 1.16. https://issues.apache.org/jira/browse/ZEPPELIN-5909 > > [~mapohl], Thank you for assistance in slack, I have created this ticket to > back our conversation, could you please add your thoughts on this failure > mode? > > One possible solution would be to have additional check for presence of > JobResult in Result store before returning jobStatus (if there is a result, > job shouldn't be reported as running based on this documentation: > [https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/table/api/TableResult.html#await--]) -- This message was sent by Atlassian Jira (v8.20.10#820010)
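Until the server-side race is understood, a client can avoid trusting a single `getJobStatus()` call made right after `await()`. The sketch below polls a status supplier until it reports a globally terminal state or a deadline passes. It is self-contained and does not depend on Flink. `SimpleJobStatus` is a hypothetical stand-in for a subset of `org.apache.flink.api.common.JobStatus` (mirroring the idea of `isGloballyTerminalState()`), and the `Supplier<CompletableFuture<...>>` stands in for `JobClient::getJobStatus`. The polling interval and timeout are illustrative assumptions.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

/** Hypothetical subset of Flink's JobStatus, for illustration only. */
enum SimpleJobStatus {
    RUNNING(false), FINISHED(true), FAILED(true), CANCELED(true);

    private final boolean globallyTerminal;

    SimpleJobStatus(boolean globallyTerminal) {
        this.globallyTerminal = globallyTerminal;
    }

    boolean isGloballyTerminal() {
        return globallyTerminal;
    }
}

final class JobStatusPoller {
    private JobStatusPoller() {}

    /**
     * Polls the supplier until it reports a globally terminal status.
     * Throws TimeoutException if no terminal status is observed before the deadline.
     */
    static SimpleJobStatus awaitTerminalStatus(
            Supplier<CompletableFuture<SimpleJobStatus>> statusSupplier,
            Duration pollInterval,
            Duration timeout)
            throws InterruptedException, ExecutionException, TimeoutException {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            SimpleJobStatus status = statusSupplier.get().get();
            if (status.isGloballyTerminal()) {
                return status;
            }
            // Overflow-safe deadline check.
            if (System.nanoTime() - deadline >= 0) {
                throw new TimeoutException("Last observed status: " + status);
            }
            Thread.sleep(pollInterval.toMillis());
        }
    }
}
```

Polling only narrows the window in which a stale `RUNNING` can be observed; it does not fix the underlying ordering between result publication and status reporting that the ticket describes.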
[jira] [Comment Edited] (FLINK-32104) stop-with-savepoint fails and times out with simple reproducible example
[ https://issues.apache.org/jira/browse/FLINK-32104?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17722993#comment-17722993 ] Weijie Guo edited comment on FLINK-32104 at 5/16/23 5:49 AM: - Hi Kurt, I have tested the uploaded job without any code change. However, the native savepoint completed successfully on my own MacBook. I suspect that the timeout comes from the prolonged sleep in {{DemoMapFunction}}. {{NumberSequenceSource}} finishes soon after the job is submitted, and the records (i.e. 1...20) accumulate in the downstream's receiving buffers. The savepoint barrier is queued behind the last record, so the preceding data processing takes {{20 * 5 = 100 s}}, but {{client.timeout}} defaults to only 60 s. If a savepoint is submitted before most of the data is processed, it is very likely to time out. Have you tried increasing the timeout here? was (Author: weijie guo): Hi Kurt, I have tested the uploaded job without any code change. However, the native savepoint completed successfully on my own MacBook. I suspect that the timeout comes from the prolonged sleep in {{DemoMapFunction}}. {{NumberSequenceSource}} finishes soon after the job is submitted, and the records (i.e. 1...20) accumulate in the downstream's receiving buffers. The savepoint barrier is queued behind the last record, so the preceding data processing takes {{20 * 5 = 100 s}}, but {{client.timeout}} defaults to only 60 s. If a savepoint is submitted before most of the data is processed, it is very likely to time out.
> stop-with-savepoint fails and times out with simple reproducible example > > > Key: FLINK-32104 > URL: https://issues.apache.org/jira/browse/FLINK-32104 > Project: Flink > Issue Type: Bug > Components: API / DataStream >Affects Versions: 1.17.0 >Reporter: Kurt Ostfeld >Priority: Major > > I've put together a simple demo app that reproduces the issue with > instructions on how to reproduce: > [https://github.com/kurtostfeld/flink-stop-issue] > > The issue is that with a very simple Flink DataStream API application, the > `stop-with-savepoint` fails and times out like this: > > {code:java} > ./bin/flink stop --type native --savepointPath ../savepoints > d69a952625497cca0665dfdcdb9f4718 > Suspending job "d69a952625497cca0665dfdcdb9f4718" with a NATIVE savepoint. > > The program finished with the following exception: > org.apache.flink.util.FlinkException: Could not stop with a savepoint job > "d69a952625497cca0665dfdcdb9f4718". > at > org.apache.flink.client.cli.CliFrontend.lambda$stop$4(CliFrontend.java:595) > at > org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:1041) > at org.apache.flink.client.cli.CliFrontend.stop(CliFrontend.java:578) > at > org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1110) > at > org.apache.flink.client.cli.CliFrontend.lambda$mainInternal$9(CliFrontend.java:1189) > at > org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28) > at > org.apache.flink.client.cli.CliFrontend.mainInternal(CliFrontend.java:1189) > at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1157) > Caused by: java.util.concurrent.TimeoutException > at > java.base/java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1886) > at > java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2021) > at > org.apache.flink.client.cli.CliFrontend.lambda$stop$4(CliFrontend.java:591) > ... 
7 more {code} >
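The timing argument in the comment can be made explicit. This is a minimal sketch that assumes, as the comment does, that all 20 records are already buffered downstream when the savepoint is triggered and that `DemoMapFunction` sleeps 5 s per record. `client.timeout` is taken at its 60 s default. The class and method names are illustrative only, and the time needed to write the savepoint itself is ignored.

```java
import java.time.Duration;

final class SavepointTimeoutEstimate {
    private SavepointTimeoutEstimate() {}

    /** Time for the queued savepoint barrier to get through the records buffered ahead of it. */
    static Duration backlogDrainTime(long bufferedRecords, Duration perRecordDelay) {
        return perRecordDelay.multipliedBy(bufferedRecords);
    }

    /** True if draining the backlog alone already exceeds the client's wait limit. */
    static boolean likelyTimesOut(long bufferedRecords, Duration perRecordDelay, Duration clientTimeout) {
        return backlogDrainTime(bufferedRecords, perRecordDelay).compareTo(clientTimeout) > 0;
    }
}
```

With 20 records at 5 s each, the drain time is 100 s, which exceeds 60 s. Under these assumptions, either a larger `client.timeout` or a shorter per-record sleep should let the CLI wait long enough for the savepoint to complete.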
[jira] [Commented] (FLINK-32104) stop-with-savepoint fails and times out with simple reproducible example
[ https://issues.apache.org/jira/browse/FLINK-32104?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17722993#comment-17722993 ] Weijie Guo commented on FLINK-32104: Hi Kurt, I have tested the uploaded job without any code change. However, the native savepoint completed successfully on my own MacBook. I suspect that the timeout comes from the prolonged sleep in {{DemoMapFunction}}. {{NumberSequenceSource}} finishes soon after the job is submitted, and the records (i.e. 1...20) accumulate in the downstream's receiving buffers. The savepoint barrier is queued behind the last record, so the preceding data processing takes {{20 * 5 = 100 s}}, but {{client.timeout}} defaults to only 60 s. If a savepoint is submitted before most of the data is processed, it is very likely to time out. > stop-with-savepoint fails and times out with simple reproducible example > > > Key: FLINK-32104 > URL: https://issues.apache.org/jira/browse/FLINK-32104 > Project: Flink > Issue Type: Bug > Components: API / DataStream >Affects Versions: 1.17.0 >Reporter: Kurt Ostfeld >Priority: Major > > I've put together a simple demo app that reproduces the issue with > instructions on how to reproduce: > [https://github.com/kurtostfeld/flink-stop-issue] > > The issue is that with a very simple Flink DataStream API application, the > `stop-with-savepoint` fails and times out like this: > > {code:java} > ./bin/flink stop --type native --savepointPath ../savepoints > d69a952625497cca0665dfdcdb9f4718 > Suspending job "d69a952625497cca0665dfdcdb9f4718" with a NATIVE savepoint. > > The program finished with the following exception: > org.apache.flink.util.FlinkException: Could not stop with a savepoint job > "d69a952625497cca0665dfdcdb9f4718".
> at > org.apache.flink.client.cli.CliFrontend.lambda$stop$4(CliFrontend.java:595) > at > org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:1041) > at org.apache.flink.client.cli.CliFrontend.stop(CliFrontend.java:578) > at > org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1110) > at > org.apache.flink.client.cli.CliFrontend.lambda$mainInternal$9(CliFrontend.java:1189) > at > org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28) > at > org.apache.flink.client.cli.CliFrontend.mainInternal(CliFrontend.java:1189) > at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1157) > Caused by: java.util.concurrent.TimeoutException > at > java.base/java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1886) > at > java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2021) > at > org.apache.flink.client.cli.CliFrontend.lambda$stop$4(CliFrontend.java:591) > ... 7 more {code} >
[GitHub] [flink-ml] lindong28 commented on a diff in pull request #237: [Flink-27826] Support training very high dimensional logistic regression
lindong28 commented on code in PR #237: URL: https://github.com/apache/flink-ml/pull/237#discussion_r1194570809 ## flink-ml-lib/src/main/java/org/apache/flink/ml/common/updater/ModelUpdater.java: ## @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.common.updater; + +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.runtime.state.StateSnapshotContext; + +import java.io.Serializable; +import java.util.Iterator; + +/** + * A model updater that could be used to handle push/pull request from workers. + * + * Note that model updater should also ensure that model data is robust to failures. + */ +public interface ModelUpdater extends Serializable { + +/** Initialize the model data. */ +void open(long startFeatureIndex, long endFeatureIndex); + +/** Applies the push to update the model data, e.g., using gradient to update model. */ +void handlePush(long[] keys, double[] values); + +/** Applies the pull and return the retrieved model data. */ +double[] handlePull(long[] keys); + +/** Returns model pieces with the format of (startFeatureIdx, endFeatureIdx, modelValues). 
*/ +Iterator> getModelPieces(); + +/** Recover the model data from state. */ Review Comment: It would be useful to make the comment style consistent. E.g. Recover -> Recovers. Same for other comments. ## flink-ml-lib/src/main/java/org/apache/flink/ml/common/updater/ModelUpdater.java: ## @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.common.updater; + +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.runtime.state.StateSnapshotContext; + +import java.io.Serializable; +import java.util.Iterator; + +/** + * A model updater that could be used to handle push/pull request from workers. + * + * Note that model updater should also ensure that model data is robust to failures. + */ +public interface ModelUpdater extends Serializable { + +/** Initialize the model data. */ +void open(long startFeatureIndex, long endFeatureIndex); + +/** Applies the push to update the model data, e.g., using gradient to update model. */ +void handlePush(long[] keys, double[] values); + +/** Applies the pull and return the retrieved model data. 
*/ +double[] handlePull(long[] keys); + +/** Returns model pieces with the format of (startFeatureIdx, endFeatureIdx, modelValues). */ +Iterator> getModelPieces(); Review Comment: It would be useful to know what the expected output of this API is w.r.t. the invocation of other APIs (e.g. handlePush). ## flink-ml-servable-lib/src/main/java/org/apache/flink/ml/classification/logisticregression/LogisticRegressionModelServable.java: ## @@ -81,11 +82,49 @@ public DataFrame transform(DataFrame input) { public LogisticRegressionModelServable setModelData(InputStream... modelDataInputs) throws IOException { Preconditions.checkArgument(modelDataInputs.length == 1); +List modelPieces = new ArrayList<>(); +while (true) { +try { +LogisticRegressionModelData piece = +LogisticRegressionModelData.decode(modelDataInputs[0]);
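The review question about how `getModelPieces()` relates to `handlePush` can be made concrete with a toy implementation. The sketch below is hypothetical and Flink-free. `InMemoryModelUpdater` and `ModelPiece` (a stand-in for the `Tuple3` in the interface) are illustrative names, the state-recovery methods are omitted, and the plain SGD rule with a fixed learning rate is an assumption, not what the PR implements. It documents one possible contract: the returned pieces reflect every `handlePush` applied before the call, and they are copies, so later pushes do not mutate them.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;

/** Hypothetical stand-in for Tuple3<startFeatureIdx, endFeatureIdx, modelValues>. */
final class ModelPiece {
    final long startFeatureIdx;
    final long endFeatureIdx;
    final double[] modelValues;

    ModelPiece(long startFeatureIdx, long endFeatureIdx, double[] modelValues) {
        this.startFeatureIdx = startFeatureIdx;
        this.endFeatureIdx = endFeatureIdx;
        this.modelValues = modelValues;
    }
}

/** Toy updater owning the dense feature range [start, end); illustrative only. */
final class InMemoryModelUpdater {
    private final double learningRate;
    private long start;
    private long end;
    private double[] model;

    InMemoryModelUpdater(double learningRate) {
        this.learningRate = learningRate;
    }

    /** Initializes a zero model for [startFeatureIndex, endFeatureIndex). */
    void open(long startFeatureIndex, long endFeatureIndex) {
        this.start = startFeatureIndex;
        this.end = endFeatureIndex;
        this.model = new double[Math.toIntExact(endFeatureIndex - startFeatureIndex)];
    }

    /** Applies gradients: model[key] -= learningRate * gradient. */
    void handlePush(long[] keys, double[] values) {
        for (int i = 0; i < keys.length; i++) {
            model[Math.toIntExact(keys[i] - start)] -= learningRate * values[i];
        }
    }

    /** Returns the current model values for the requested keys. */
    double[] handlePull(long[] keys) {
        double[] result = new double[keys.length];
        for (int i = 0; i < keys.length; i++) {
            result[i] = model[Math.toIntExact(keys[i] - start)];
        }
        return result;
    }

    /** Returns one piece holding a snapshot of the model after all pushes so far. */
    Iterator<ModelPiece> getModelPieces() {
        return Collections.singletonList(
                        new ModelPiece(start, end, Arrays.copyOf(model, model.length)))
                .iterator();
    }
}
```

Returning a copy rather than the live array is a design choice made here for clarity. An implementation that streams large models piece by piece might reasonably choose differently, which is exactly what the Javadoc would need to state.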
[jira] [Comment Edited] (FLINK-32069) jobClient.getJobStatus() can return status RUNNING for finished insert operation
[ https://issues.apache.org/jira/browse/FLINK-32069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17722985#comment-17722985 ] Matthias Pohl edited comment on FLINK-32069 at 5/16/23 5:43 AM: For the client side, I'm not 100% sure whether your call hierarchy is correct. Based on the (master) code, I can find the following call hierarchies for {{jobClient.getJobStatus()}}: * Client side: ** {{ClusterClientJobClientAdapter.getJobStatus}} > {{RestClusterClient.getJobStatus}} > {{RestClusterClient.requestJobStatus}} > {{GET /jobs/:jobId/status}} * Server side: ** {{JobStatusHandler.handleRequest}} > {{Dispatcher.requestJobStatus}} For the {{jobClient.getJobExecutionResult()}} call, I find the following call hierarchy: * Client side: ** {{ClusterClientJobClientAdapter.getJobExecutionResult}} > {{RestClusterClient.requestJobResult}} > {{RestClusterClient.requestJobResultInternal}} > {{GET /jobs/:jobId/execution-result}} * Server side: ** {{JobExecutionResultHandler.handleRequest}} > {{Dispatcher.requestJobStatus}} Both call hierarchies end up in {{Dispatcher.requestJobStatus}}, which looks up the {{JobManagerRunner}} and uses the {{ExecutionGraphInfoStore}} as a fallback. Can you provide debug logs of the case? Additionally: Could you share what input (i.e. {{operations}} and {{tbenv}}) you use in your runs? was (Author: mapohl): Can you provide debug logs of the case? Additionally: Could you share what input (i.e. {{operations}} and {{tbenv}}) you use in your runs? > jobClient.getJobStatus() can return status RUNNING for finished insert > operation > > > Key: FLINK-32069 > URL: https://issues.apache.org/jira/browse/FLINK-32069 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Affects Versions: 1.16.1, 1.15.4 >Reporter: Aleksandr Iushmanov >Priority: Major > > Using zeppelin with remote cluster I came across some race condition issue > leading to failed expectations for SQL insert operations.
> > Below is an example of zeppelin code that is failing because > jobClient.getJobStatus() returns running even after job has finished. I have > verified that same failover can happen if I use > jobClient.getJobExecutionResult().get() (Job execution result is: "Program > execution finished" but job status is not consistently finished) > {code:java} > TableResult tableResult = ((TableEnvironmentInternal) > tbenv).executeInternal(operations); > checkState(tableResult.getJobClient().isPresent()); > try { > tableResult.await(); > JobClient jobClient = tableResult.getJobClient().get(); > if (jobClient.getJobStatus().get() == JobStatus.FINISHED) { > context.out.write("Insertion successfully.\n"); > } else { > throw new IOException("Job is failed, " + > jobClient.getJobExecutionResult().get().toString()); > } > } catch (InterruptedException e) { > throw new IOException("Flink job is interrupted", e); > } catch (ExecutionException e) { > throw new IOException("Flink job is failed", e); > } {code} > ZeppelinCode: > [https://github.com/apache/zeppelin/blob/master/flink/flink1.15-shims/src/main/java/org/apache/zeppelin/flink/Flink115SqlInterpreter.java#L384] > I suspect that job status is returned based on runningJobsRegistry and since > 1.15 this registry is not updated with FINISHED status prior to job result > future completion, see this change: {{JobMasterServiceLeadershipRunner.java}} > [https://github.com/apache/flink/pull/18189/files#diff-3eb433f18b85c0f5329a4b312a219583189d777fe9bdd547f1114f4a22989f8bL387] > > > It looks like as race condition that is hard to reproduce on lightweight > setup. I was reproducing this running zeppelin notebook with remote flink > cluster and triggering SQL insert operation. If I find a smaller setup to > reproduce on small local cluster with lightweight client, I will update this > ticket when I have more input. I am open to suggestions on how to fix this. 
> > For Zeppelin I have a separate ticket because Flink 1.15 is not going to be > fixed but this issue if I understand it correctly should be common for all > versions starting 1.15, therefore it makes sense to address this starting > 1.16. https://issues.apache.org/jira/browse/ZEPPELIN-5909 > > [~mapohl], Thank you for assistance in slack, I have created this ticket to > back our conversation, could you please add your thoughts on this failure > mode? > > One possible solution would be to have additional check for presence of > JobResult in Result store before returning jobStatus (if there is a result, > job shouldn't be reported as running based on this documentation: > [https://nightlies.apache.
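The lookup order described in the comment, together with the check proposed in the description, can be modeled in a few lines. This is a simplified, hypothetical sketch, not the actual `Dispatcher` code. The three maps stand in for the registered `JobManagerRunner`s, a store of already-published job results, and the `ExecutionGraphInfoStore` fallback, and `Status` is a reduced stand-in for `JobStatus`. The state in which a runner still reports `RUNNING` although the result is already published is the race window the ticket hypothesizes. With `checkResultStoreFirst` enabled, such a job is reported with its final status instead.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

final class JobStatusLookupModel {
    enum Status { RUNNING, FINISHED, FAILED }

    /** Status reported by a runner that may not yet have been deregistered. */
    final Map<String, Status> runnerStatus = new ConcurrentHashMap<>();
    /** Final status of jobs whose result has already been published. */
    final Map<String, Status> completedResults = new ConcurrentHashMap<>();
    /** Fallback archive of finished jobs. */
    final Map<String, Status> archivedStatus = new ConcurrentHashMap<>();

    private final boolean checkResultStoreFirst;

    JobStatusLookupModel(boolean checkResultStoreFirst) {
        this.checkResultStoreFirst = checkResultStoreFirst;
    }

    Optional<Status> requestJobStatus(String jobId) {
        // Proposed check: a published result takes precedence over the runner's view.
        if (checkResultStoreFirst && completedResults.containsKey(jobId)) {
            return Optional.of(completedResults.get(jobId));
        }
        // Described order: ask the runner first, then fall back to the archive.
        Status fromRunner = runnerStatus.get(jobId);
        if (fromRunner != null) {
            return Optional.of(fromRunner);
        }
        return Optional.ofNullable(archivedStatus.get(jobId));
    }
}
```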
[jira] [Comment Edited] (FLINK-32069) jobClient.getJobStatus() can return status RUNNING for finished insert operation
[ https://issues.apache.org/jira/browse/FLINK-32069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17722985#comment-17722985 ] Matthias Pohl edited comment on FLINK-32069 at 5/16/23 5:03 AM: Can you provide debug logs of the case? Additionally: Could you share what input (i.e. {{operations}} and {{tbenv}}) you use in your runs? was (Author: mapohl): Can you provide debug logs of the case? > jobClient.getJobStatus() can return status RUNNING for finished insert > operation > > > Key: FLINK-32069 > URL: https://issues.apache.org/jira/browse/FLINK-32069 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Affects Versions: 1.16.1, 1.15.4 >Reporter: Aleksandr Iushmanov >Priority: Major > > Using zeppelin with remote cluster I came across some race condition issue > leading to failed expectations for SQL insert operations. > > Below is an example of zeppelin code that is failing because > jobClient.getJobStatus() returns running even after job has finished.
I have > verified that same failover can happen if I use > jobClient.getJobExecutionResult().get() (Job execution result is: "Program > execution finished" but job status is not consistently finished) > {code:java} > TableResult tableResult = ((TableEnvironmentInternal) > tbenv).executeInternal(operations); > checkState(tableResult.getJobClient().isPresent()); > try { > tableResult.await(); > JobClient jobClient = tableResult.getJobClient().get(); > if (jobClient.getJobStatus().get() == JobStatus.FINISHED) { > context.out.write("Insertion successfully.\n"); > } else { > throw new IOException("Job is failed, " + > jobClient.getJobExecutionResult().get().toString()); > } > } catch (InterruptedException e) { > throw new IOException("Flink job is interrupted", e); > } catch (ExecutionException e) { > throw new IOException("Flink job is failed", e); > } {code} > ZeppelinCode: > [https://github.com/apache/zeppelin/blob/master/flink/flink1.15-shims/src/main/java/org/apache/zeppelin/flink/Flink115SqlInterpreter.java#L384] > I suspect that job status is returned based on runningJobsRegistry and since > 1.15 this registry is not updated with FINISHED status prior to job result > future completion, see this change: {{JobMasterServiceLeadershipRunner.java}} > [https://github.com/apache/flink/pull/18189/files#diff-3eb433f18b85c0f5329a4b312a219583189d777fe9bdd547f1114f4a22989f8bL387] > > > It looks like as race condition that is hard to reproduce on lightweight > setup. I was reproducing this running zeppelin notebook with remote flink > cluster and triggering SQL insert operation. If I find a smaller setup to > reproduce on small local cluster with lightweight client, I will update this > ticket when I have more input. I am open to suggestions on how to fix this. 
> > For Zeppelin I have a separate ticket because Flink 1.15 is not going to be > fixed but this issue if I understand it correctly should be common for all > versions starting 1.15, therefore it makes sense to address this starting > 1.16. https://issues.apache.org/jira/browse/ZEPPELIN-5909 > > [~mapohl], Thank you for assistance in slack, I have created this ticket to > back our conversation, could you please add your thoughts on this failure > mode? > > One possible solution would be to have additional check for presence of > JobResult in Result store before returning jobStatus (if there is a result, > job shouldn't be reported as running based on this documentation: > [https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/table/api/TableResult.html#await--])
[jira] [Commented] (FLINK-32069) jobClient.getJobStatus() can return status RUNNING for finished insert operation
[ https://issues.apache.org/jira/browse/FLINK-32069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17722985#comment-17722985 ] Matthias Pohl commented on FLINK-32069: --- Can you provide debug logs of the case? > jobClient.getJobStatus() can return status RUNNING for finished insert > operation > > > Key: FLINK-32069 > URL: https://issues.apache.org/jira/browse/FLINK-32069 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Affects Versions: 1.16.1, 1.15.4 >Reporter: Aleksandr Iushmanov >Priority: Major > > Using zeppelin with remote cluster I came across some race condition issue > leading to failed expectations for SQL insert operations. > > Below is an example of zeppelin code that is failing because > jobClient.getJobStatus() returns running even after job has finished. I have > verified that same failover can happen if I use > jobClient.getJobExecutionResult().get() (Job execution result is: "Program > execution finished" but job status is not consistently finished) > {code:java} > TableResult tableResult = ((TableEnvironmentInternal) > tbenv).executeInternal(operations); > checkState(tableResult.getJobClient().isPresent()); > try { > tableResult.await(); > JobClient jobClient = tableResult.getJobClient().get(); > if (jobClient.getJobStatus().get() == JobStatus.FINISHED) { > context.out.write("Insertion successfully.\n"); > } else { > throw new IOException("Job is failed, " + > jobClient.getJobExecutionResult().get().toString()); > } > } catch (InterruptedException e) { > throw new IOException("Flink job is interrupted", e); > } catch (ExecutionException e) { > throw new IOException("Flink job is failed", e); > } {code} > ZeppelinCode: > [https://github.com/apache/zeppelin/blob/master/flink/flink1.15-shims/src/main/java/org/apache/zeppelin/flink/Flink115SqlInterpreter.java#L384] > I suspect that job status is returned based on runningJobsRegistry and since > 1.15 this registry is not updated with 
FINISHED status prior to job result > future completion, see this change: {{JobMasterServiceLeadershipRunner.java}} > [https://github.com/apache/flink/pull/18189/files#diff-3eb433f18b85c0f5329a4b312a219583189d777fe9bdd547f1114f4a22989f8bL387] > > > It looks like as race condition that is hard to reproduce on lightweight > setup. I was reproducing this running zeppelin notebook with remote flink > cluster and triggering SQL insert operation. If I find a smaller setup to > reproduce on small local cluster with lightweight client, I will update this > ticket when I have more input. I am open to suggestions on how to fix this. > > For Zeppelin I have a separate ticket because Flink 1.15 is not going to be > fixed but this issue if I understand it correctly should be common for all > versions starting 1.15, therefore it makes sense to address this starting > 1.16. https://issues.apache.org/jira/browse/ZEPPELIN-5909 > > [~mapohl], Thank you for assistance in slack, I have created this ticket to > back our conversation, could you please add your thoughts on this failure > mode? > > One possible solution would be to have additional check for presence of > JobResult in Result store before returning jobStatus (if there is a result, > job shouldn't be reported as running based on this documentation: > [https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/table/api/TableResult.html#await--])
[GitHub] [flink] flinkbot commented on pull request #22588: [FLINK-31663][table] Add ARRAY_EXCEPT function
flinkbot commented on PR #22588: URL: https://github.com/apache/flink/pull/22588#issuecomment-1548979423 ## CI report: * 965fc11fe20d8b962b2a4c2ca67342051b130201 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-connector-pulsar] reswqa merged pull request #50: [BP-4.0][FLINK-32024][docs] Short code related to externalized connector retrieve version from its own data yaml
reswqa merged PR #50: URL: https://github.com/apache/flink-connector-pulsar/pull/50
[GitHub] [flink] bvarghese1 opened a new pull request, #22588: [FLINK-31663][table] Add ARRAY_EXCEPT function
bvarghese1 opened a new pull request, #22588: URL: https://github.com/apache/flink/pull/22588

## What is the purpose of the change

This is an implementation of ARRAY_EXCEPT.

## Brief change log

ARRAY_EXCEPT for Table API and SQL

```
Flink SQL> SELECT array_except(array[1,2,2], array[2,3,4]);
[1]
Flink SQL> SELECT array_except(array[1,2,2], array[1]);
[2]
Flink SQL> SELECT array_except(array[1,2,2], array[42]);
[1, 2]
Flink SQL> SELECT array_except(array[1,2,2], cast(null as array));
[1, 2]
Flink SQL> SELECT array_except(array[1,2,2], array[null,2]);
[1]
Flink SQL> SELECT array_except(cast(null as array), array[1,2,3]);

Flink SQL> SELECT array_except(array[null,null,1], array[42]);
[NULL, 1]
Flink SQL> SELECT array_except(array[null,null,1], array[null, 42]);
[1]
Flink SQL> SELECT array_except(array[(TRUE, DATE '2022-04-20'), (TRUE, DATE '1990-10-14'), null], array[(TRUE, DATE '1990-10-14')]);
[(TRUE, 2022-04-20), NULL]
Flink SQL> SELECT array_except(array[(TRUE, DATE '2022-04-20'), (TRUE, DATE '1990-10-14'), null], cast(null as array>));
[(TRUE, 2022-04-20), (TRUE, 1990-10-14), NULL]
Flink SQL> SELECT array_except(array[array[1,null,3], array[0], array[1]], array[array[0]]);
[[1, NULL, 3], [1]]
Flink SQL> SELECT array_except(array[map[1, 'a', 2, 'b'], map[3, 'c', 4, 'd']], array[map[3, 'c', 4, 'd']]);
[{1=a, 2=b}]
```

See also https://spark.apache.org/docs/latest/api/sql/index.html#array_except

## Verifying this change

- This change added tests in CollectionFunctionsITCase.
## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
- The S3 file system connector: (no)

## Documentation

- Does this pull request introduce a new feature? (yes)
- If yes, how is the feature documented? (docs)
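The examples in the PR description suggest these semantics. The result contains the distinct elements of the first array, in first-occurrence order, that do not appear in the second array. `NULL` elements compare equal to each other, and a `NULL` second array behaves like an empty one. A `NULL` first array produces an empty output line in the example, which the sketch models as `null`. The Java sketch below reproduces those examples on `java.util.List` values. It is inferred from the listed outputs, not taken from the PR's implementation, and `arrayExcept` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

final class ArrayExceptSketch {
    private ArrayExceptSketch() {}

    /**
     * Distinct elements of {@code left}, in first-occurrence order, that are not in {@code right}.
     * Null elements are compared like any other value; a null {@code right} acts as empty.
     */
    static <T> List<T> arrayExcept(List<T> left, Collection<T> right) {
        if (left == null) {
            return null; // models a NULL first argument
        }
        // LinkedHashSet keeps insertion order, drops duplicates, and accepts null elements.
        Set<T> result = new LinkedHashSet<>(left);
        if (right != null) {
            // Copy into a HashSet so that null lookups are always supported.
            result.removeAll(new HashSet<>(right));
        }
        return new ArrayList<>(result);
    }
}
```

Because element equality is delegated to `equals`, nested lists and maps behave like the nested `ARRAY` and `MAP` examples above.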
[jira] [Updated] (FLINK-32104) stop-with-savepoint fails and times out with simple reproducible example
[ https://issues.apache.org/jira/browse/FLINK-32104?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kurt Ostfeld updated FLINK-32104: - Description: I've put together a simple demo app that reproduces the issue with instructions on how to reproduce: [https://github.com/kurtostfeld/flink-stop-issue] The issue is that with a very simple Flink DataStream API application, the `stop-with-savepoint` fails and times out like this: {code:java} ./bin/flink stop --type native --savepointPath ../savepoints d69a952625497cca0665dfdcdb9f4718 Suspending job "d69a952625497cca0665dfdcdb9f4718" with a NATIVE savepoint. The program finished with the following exception: org.apache.flink.util.FlinkException: Could not stop with a savepoint job "d69a952625497cca0665dfdcdb9f4718". at org.apache.flink.client.cli.CliFrontend.lambda$stop$4(CliFrontend.java:595) at org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:1041) at org.apache.flink.client.cli.CliFrontend.stop(CliFrontend.java:578) at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1110) at org.apache.flink.client.cli.CliFrontend.lambda$mainInternal$9(CliFrontend.java:1189) at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28) at org.apache.flink.client.cli.CliFrontend.mainInternal(CliFrontend.java:1189) at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1157) Caused by: java.util.concurrent.TimeoutException at java.base/java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1886) at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2021) at org.apache.flink.client.cli.CliFrontend.lambda$stop$4(CliFrontend.java:591) ... 
7 more {code} was: I've put together a simple demo app that reproduces the issue with instructions on how to reproduce: [https://github.com/kurtostfeld/flink-stop-issue] The issue is with a very simple application written with the Flink DataStream API, `stop-with-savepoint` fails and times out like this: {code:java} ./bin/flink stop --type native --savepointPath ../savepoints d69a952625497cca0665dfdcdb9f4718 Suspending job "d69a952625497cca0665dfdcdb9f4718" with a NATIVE savepoint. The program finished with the following exception: org.apache.flink.util.FlinkException: Could not stop with a savepoint job "d69a952625497cca0665dfdcdb9f4718". at org.apache.flink.client.cli.CliFrontend.lambda$stop$4(CliFrontend.java:595) at org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:1041) at org.apache.flink.client.cli.CliFrontend.stop(CliFrontend.java:578) at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1110) at org.apache.flink.client.cli.CliFrontend.lambda$mainInternal$9(CliFrontend.java:1189) at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28) at org.apache.flink.client.cli.CliFrontend.mainInternal(CliFrontend.java:1189) at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1157) Caused by: java.util.concurrent.TimeoutException at java.base/java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1886) at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2021) at org.apache.flink.client.cli.CliFrontend.lambda$stop$4(CliFrontend.java:591) ... 
7 more {code} > stop-with-savepoint fails and times out with simple reproducible example > > > Key: FLINK-32104 > URL: https://issues.apache.org/jira/browse/FLINK-32104 > Project: Flink > Issue Type: Bug > Components: API / DataStream >Affects Versions: 1.17.0 >Reporter: Kurt Ostfeld >Priority: Major > > I've put together a simple demo app that reproduces the issue with > instructions on how to reproduce: > [https://github.com/kurtostfeld/flink-stop-issue] > > The issue is that with a very simple Flink DataStream API application, the > `stop-with-savepoint` fails and times out like this: > > {code:java} > ./bin/flink stop --type native --savepointPath ../savepoints > d69a952625497cca0665dfdcdb9f4718 > Suspending job "d69a952625497cca0665dfdcdb9f4718" with a NATIVE savepoint. > > The program finished with the following exception: > org.apache.flink.util.FlinkException: Could not stop with a savepoint job > "d69a952625497cca0665dfdcdb9f4718". > at > org.apache.flink.client.cli.CliFrontend.lambda$stop$4(CliFrontend.java:595) > at > org.apache.flink.client.cli.C
[jira] [Created] (FLINK-32104) stop-with-savepoint fails and times out with simple reproducible example
Kurt Ostfeld created FLINK-32104: Summary: stop-with-savepoint fails and times out with simple reproducible example Key: FLINK-32104 URL: https://issues.apache.org/jira/browse/FLINK-32104 Project: Flink Issue Type: Bug Components: API / DataStream Affects Versions: 1.17.0 Reporter: Kurt Ostfeld I've put together a simple demo app that reproduces the issue, with instructions on how to reproduce it: [https://github.com/kurtostfeld/flink-stop-issue] The issue is that, with a very simple application written with the Flink DataStream API, `stop-with-savepoint` fails and times out like this: {code:java} ./bin/flink stop --type native --savepointPath ../savepoints d69a952625497cca0665dfdcdb9f4718 Suspending job "d69a952625497cca0665dfdcdb9f4718" with a NATIVE savepoint. The program finished with the following exception: org.apache.flink.util.FlinkException: Could not stop with a savepoint job "d69a952625497cca0665dfdcdb9f4718". at org.apache.flink.client.cli.CliFrontend.lambda$stop$4(CliFrontend.java:595) at org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:1041) at org.apache.flink.client.cli.CliFrontend.stop(CliFrontend.java:578) at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1110) at org.apache.flink.client.cli.CliFrontend.lambda$mainInternal$9(CliFrontend.java:1189) at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28) at org.apache.flink.client.cli.CliFrontend.mainInternal(CliFrontend.java:1189) at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1157) Caused by: java.util.concurrent.TimeoutException at java.base/java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1886) at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2021) at org.apache.flink.client.cli.CliFrontend.lambda$stop$4(CliFrontend.java:591) ... 7 more {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
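The `Caused by` frames show the CLI (`CliFrontend.lambda$stop$4`) calling a timed `CompletableFuture.get`, which throws `TimeoutException` from `timedGet`: the client stopped waiting before the stop-with-savepoint future completed. The stdlib-only sketch below reproduces only that client-side symptom, not the root cause of the hang. The `waitForSavepoint` helper, the 200 ms timeout and the savepoint path are hypothetical and are not Flink's client code.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class TimedGetDemo {

    /** Hypothetical helper: waits for a savepoint-path future with a client-side timeout. */
    static String waitForSavepoint(CompletableFuture<String> savepointPath, long timeoutMillis) {
        try {
            // Timed get: the same call path as the CompletableFuture.timedGet frame in the trace.
            return "completed: " + savepointPath.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // The caller stops waiting; nothing here cancels the underlying operation.
            return "timed out after " + timeoutMillis + " ms";
        } catch (ExecutionException e) {
            // The future completed exceptionally; report the original cause.
            return "failed: " + e.getCause();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        // Never completed: stands in for a stop-with-savepoint that does not finish in time.
        System.out.println(waitForSavepoint(new CompletableFuture<>(), 200));
        // prints "timed out after 200 ms"

        // Already completed (hypothetical path): returns immediately.
        System.out.println(
                waitForSavepoint(CompletableFuture.completedFuture("file:/tmp/savepoint-example"), 200));
        // prints "completed: file:/tmp/savepoint-example"
    }
}
```

A longer client timeout would only help if the savepoint eventually completes; if the job never acknowledges the stop, the future in this sketch would simply time out later.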
[GitHub] [flink] flinkbot commented on pull request #22587: [FLINK-31892][runtime] Introduce AdaptiveScheduler global failure enrichment/labeling
flinkbot commented on PR #22587: URL: https://github.com/apache/flink/pull/22587#issuecomment-1548949084 ## CI report: * 99f41cfb94b20162cce28f7e9ec5662afe385689 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] Myracle commented on a diff in pull request #22560: [FLINK-32023][API / DataStream] Support negative Duration for special usage, such as -1ms for execution.buffer-timeout
Myracle commented on code in PR #22560: URL: https://github.com/apache/flink/pull/22560#discussion_r1194578196 ## flink-core/src/main/java/org/apache/flink/configuration/ExecutionOptions.java: ## @@ -92,6 +92,17 @@ public class ExecutionOptions { .withDescription( "Tells if we should use compression for the state snapshot data or not"); +public static final ConfigOption BUFFER_TIMEOUT_ENABLED = +ConfigOptions.key("execution.buffer-timeout.enabled") Review Comment: How about changing the config execution.buffer-timeout.enabled to execution.flush-on-buffer-full?
[GitHub] [flink-connector-pulsar] reswqa commented on pull request #50: [BP-4.0][FLINK-32024][docs] Short code related to externalized connector retrieve version from its own data yaml
reswqa commented on PR #50: URL: https://github.com/apache/flink-connector-pulsar/pull/50#issuecomment-1548938191 Reviewed in #49.
[GitHub] [flink-connector-pulsar] reswqa opened a new pull request, #50: [BP-4.0][FLINK-32024][docs] Short code related to externalized connector retrieve version from its own data yaml
reswqa opened a new pull request, #50: URL: https://github.com/apache/flink-connector-pulsar/pull/50 Backport to `v4.0`, as Flink tracks this branch to render its documentation.
[GitHub] [flink] flinkbot commented on pull request #22586: [FLINK-31880][tests] Fixes unit test so it will pass regardless of ti…
flinkbot commented on PR #22586: URL: https://github.com/apache/flink/pull/22586#issuecomment-1548929471 ## CI report: * 6e8d876beb9ff0f2c5eddf4833ec572068629616 UNKNOWN
[GitHub] [flink-kubernetes-operator] yangjf2019 opened a new pull request, #601: [hotfix][docs] fix some typos
yangjf2019 opened a new pull request, #601: URL: https://github.com/apache/flink-kubernetes-operator/pull/601 ## What is the purpose of the change * I have fixed some typos in the `docs/content/docs/custom-resource/autoscaler.md` file. ## Verifying this change * I checked the changes. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., are there any changes to the `CustomResourceDescriptors`: (no) - Core observer or reconciler logic that is regularly executed: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented) Hi @mbalassi, please take a look when you have time, thanks!
[jira] [Commented] (FLINK-31880) Bad Test in OrcColumnarRowSplitReaderTest
[ https://issues.apache.org/jira/browse/FLINK-31880?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17722973#comment-17722973 ] Kurt Ostfeld commented on FLINK-31880: -- Updated PR https://github.com/apache/flink/pull/22586 > Bad Test in OrcColumnarRowSplitReaderTest > - > > Key: FLINK-31880 > URL: https://issues.apache.org/jira/browse/FLINK-31880 > Project: Flink > Issue Type: Bug > Components: Connectors / ORC, Formats (JSON, Avro, Parquet, ORC, > SequenceFile) >Reporter: Kurt Ostfeld >Priority: Minor > Labels: pull-request-available > > This is a development issue with what looks like a buggy unit test. > > I tried to build Flink from a clean copy of the repository and I get: > > ``` > [INFO] Results: > [INFO] > [ERROR] Failures: > [ERROR] OrcColumnarRowSplitReaderTest.testReadFileWithTypes:365 > expected: "1969-12-31" > but was: "1970-01-01" > [INFO] > [ERROR] Tests run: 26, Failures: 1, Errors: 0, Skipped: 0 > ``` > > The test exercises Date data types with `new Date(562423)`, which is 9 > minutes and 22 seconds after the epoch (1970-01-01 00:00 UTC). When I run it on my laptop in the CST time zone, that instant is `Wed Dec 31 18:09:22 CST > 1969`. > > I have a simple pull request ready that fixes this issue by using the Java 8 > LocalDate API instead, which avoids time zones entirely.
[GitHub] [flink] kurtostfeld opened a new pull request, #22586: [FLINK-31880][tests] Fixes unit test so it will pass regardless of ti…
kurtostfeld opened a new pull request, #22586: URL: https://github.com/apache/flink/pull/22586 Fixes the unit test so it passes regardless of the time zone it is run in, and uses a better test date. I submitted this before; I'm resubmitting it with the issues fixed, and it should now follow the commit and PR style guides. This is a low-priority issue, as the Flink project only needs unit tests to pass in the CI system, not in environments in other time zones, but having the test pass locally helps some developers, the change is very low risk, and it also uses a better test value. The previous code uses `new Date(562423)`, which is about 9 minutes after the epoch in UTC; in a UTC-5 time zone that instant falls on 1969-12-31, before the 1970-01-01 epoch date, which makes it a poor test value.
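The time-zone dependence is easy to see with `java.time`: the same epoch-millisecond value maps to different calendar dates depending on the zone it is viewed from. The sketch below uses `America/Chicago` to stand in for the CST laptop; the `dateInZone` helper and the `LocalDate` test value are illustrative assumptions, not the code from the PR.

```java
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneId;

public class EpochDateDemo {

    /** Calendar date of an epoch-millisecond instant as seen from the given zone. */
    static LocalDate dateInZone(long epochMillis, ZoneId zone) {
        return Instant.ofEpochMilli(epochMillis).atZone(zone).toLocalDate();
    }

    public static void main(String[] args) {
        long millis = 562423L; // the value passed to new Date(...) in the old test: 00:09:22.423 UTC

        System.out.println(dateInZone(millis, ZoneId.of("UTC")));             // 1970-01-01
        System.out.println(dateInZone(millis, ZoneId.of("America/Chicago"))); // 1969-12-31

        // A LocalDate carries no time zone, so this value is identical on every machine.
        LocalDate fixed = LocalDate.of(2020, 5, 17); // hypothetical test value
        System.out.println(fixed); // 2020-05-17
    }
}
```

Building the expected value directly as a `LocalDate` removes the dependency on the JVM's default time zone that a millisecond-based `Date` introduces.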
[jira] [Updated] (FLINK-31893) Placeholder
[ https://issues.apache.org/jira/browse/FLINK-31893?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Panagiotis Garefalakis updated FLINK-31893: --- Summary: Placeholder (was: Introduce AdaptiveScheduler global failure enrichment/labeling) > Placeholder > --- > > Key: FLINK-31893 > URL: https://issues.apache.org/jira/browse/FLINK-31893 > Project: Flink > Issue Type: Sub-task >Reporter: Panagiotis Garefalakis >Priority: Major
[GitHub] [flink] 1996fanrui commented on a diff in pull request #22560: [FLINK-32023][API / DataStream] Support negative Duration for special usage, such as -1ms for execution.buffer-timeout
1996fanrui commented on code in PR #22560: URL: https://github.com/apache/flink/pull/22560#discussion_r1194556471 ## flink-core/src/main/java/org/apache/flink/configuration/ExecutionOptions.java: ## @@ -92,6 +92,17 @@ public class ExecutionOptions { .withDescription( "Tells if we should use compression for the state snapshot data or not"); +public static final ConfigOption BUFFER_TIMEOUT_ENABLED = +ConfigOptions.key("execution.buffer-timeout.enabled") Review Comment: FLINK-29372 introduced a restriction that an option key must not be a prefix of another option's key; however, `execution.buffer-timeout` is a prefix of `execution.buffer-timeout.enabled`, so the CI fails. Hi @zentol, could we change the option key for `BUFFER_TIMEOUT` and keep `"execution.buffer-timeout"` as a deprecated key? How about renaming it to `execution.buffer-timeout.interval`? Or do you have other suggestions? Looking forward to your opinion.
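To see why the two keys clash and why the suggested rename would avoid it, here is a hypothetical, stdlib-only sketch of a prefix check. It assumes that "prefix" means a whole dot-separated prefix (so `a.b` does not clash with `a.bc`). `findPrefixConflicts` is illustrative and is not the check that FLINK-29372 actually added. One plausible motivation for such a rule is that, in a nested YAML file, a key like `a.b` cannot both hold a value and be the parent of `a.b.c`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class OptionKeyPrefixCheck {

    /** Returns one entry "a -> b" for every pair where key a is a dot-separated prefix of key b. */
    static List<String> findPrefixConflicts(List<String> keys) {
        List<String> conflicts = new ArrayList<>();
        for (String a : keys) {
            for (String b : keys) {
                // Require the '.' separator so that "a.b" is not reported against "a.bc".
                if (!a.equals(b) && b.startsWith(a + ".")) {
                    conflicts.add(a + " -> " + b);
                }
            }
        }
        return conflicts;
    }

    public static void main(String[] args) {
        // Keys as in the PR: the existing key is a prefix of the new one.
        System.out.println(findPrefixConflicts(Arrays.asList(
                "execution.buffer-timeout", "execution.buffer-timeout.enabled")));
        // prints [execution.buffer-timeout -> execution.buffer-timeout.enabled]

        // With the rename suggested in the review comment, neither key is a prefix of the other.
        System.out.println(findPrefixConflicts(Arrays.asList(
                "execution.buffer-timeout.interval", "execution.buffer-timeout.enabled")));
        // prints []
    }
}
```

With the rename, the old `execution.buffer-timeout` name would only remain as a deprecated fallback key rather than as a separately declared option, so it would no longer be a prefix of another option's key.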
[jira] [Updated] (FLINK-31892) Introduce AdaptiveScheduler global failure enrichment/labeling
[ https://issues.apache.org/jira/browse/FLINK-31892?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Panagiotis Garefalakis updated FLINK-31892: --- Summary: Introduce AdaptiveScheduler global failure enrichment/labeling (was: Introduce DefaultScheduler global failure enrichment/labeling) > Introduce AdaptiveScheduler global failure enrichment/labeling > -- > > Key: FLINK-31892 > URL: https://issues.apache.org/jira/browse/FLINK-31892 > Project: Flink > Issue Type: Sub-task >Reporter: Panagiotis Garefalakis >Priority: Major
[GitHub] [flink] kurtostfeld closed pull request #22451: FLINK-31880: Fixed broken date test. It was failing when run in CST time zone.
kurtostfeld closed pull request #22451: FLINK-31880: Fixed broken date test. It was failing when run in CST time zone. URL: https://github.com/apache/flink/pull/22451
[GitHub] [flink] JunRuiLee commented on a diff in pull request #16593: [FLINK-23425][streaming-java] The impact of cpu cores on test results…
JunRuiLee commented on code in PR #16593: URL: https://github.com/apache/flink/pull/16593#discussion_r1194546629 ## flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java: ## @@ -1291,7 +1291,7 @@ public void testSlotSharingResourceConfigurationWithDefaultSlotSharingGroup() { StreamGraphGenerator.DEFAULT_SLOT_SHARING_GROUP, resourceProfile); final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); -env.fromElements(1, 2, 3).map(x -> x + 1); +env.fromElements(1, 2, 3).map(x -> x + 1).setParallelism(2); Review Comment: I suggest adding some comments explaining why this change is needed.
[GitHub] [flink] reswqa commented on pull request #22574: [FLINK-32060][test] Migrate subclasses of BatchAbstractTestBase in table and other modules to JUnit5
reswqa commented on PR #22574: URL: https://github.com/apache/flink/pull/22574#issuecomment-1548891764 Rebased on `origin/master`; will merge once AZP is green.
[jira] [Commented] (FLINK-32101) FlinkKafkaInternalProducerITCase.testInitTransactionId test failed
[ https://issues.apache.org/jira/browse/FLINK-32101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17722966#comment-17722966 ] Yuxin Tan commented on FLINK-32101: --- [~mapohl] Yes, exactly. Thanks for tracking it. > FlinkKafkaInternalProducerITCase.testInitTransactionId test failed > -- > > Key: FLINK-32101 > URL: https://issues.apache.org/jira/browse/FLINK-32101 > Project: Flink > Issue Type: Bug > Components: Connectors / Kafka >Affects Versions: 1.18.0 >Reporter: Yuxin Tan >Priority: Major > Labels: test > > FlinkKafkaInternalProducerITCase.testInitTransactionId test failed. > Caused by: org.apache.kafka.common.KafkaException: Unexpected error in > InitProducerIdResponse; The request timed out. > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=48990&view=logs&j=aa18c3f6-13b8-5f58-86bb-c1cffb239496&t=502fb6c0-30a2-5e49-c5c2-a00fa3acb203&l=22973 > {code:java} > Caused by: org.apache.kafka.common.KafkaException: > org.apache.kafka.common.KafkaException: Unexpected error in > InitProducerIdResponse; The request timed out. 
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) > at > sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) > at > sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) > at java.lang.reflect.Constructor.newInstance(Constructor.java:423) > at > java.util.concurrent.ForkJoinTask.getThrowableException(ForkJoinTask.java:593) > at > java.util.concurrent.ForkJoinTask.reportException(ForkJoinTask.java:677) > at java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:735) > at > java.util.stream.ForEachOps$ForEachOp.evaluateParallel(ForEachOps.java:159) > at > java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateParallel(ForEachOps.java:173) > at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233) > at > java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485) > at > java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:650) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.abortTransactions(FlinkKafkaProducer.java:1290) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initializeState(FlinkKafkaProducer.java:1216) > at > org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:189) > at > org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:171) > at > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:95) > at > org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:122) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:274) > at > org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106) > at > 
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:747) > at > org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.call(StreamTaskActionExecutor.java:100) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:722) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:688) > at > org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952) > at > org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:921) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) > at java.lang.Thread.run(Thread.java:748) > Caused by: org.apache.kafka.common.KafkaException: Unexpected error in > InitProducerIdResponse; The request timed out. > at > org.apache.kafka.clients.producer.internals.TransactionManager$InitProducerIdHandler.handleResponse(TransactionManager.java:1418) > at > org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1322) > at > org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109) > at > org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:583) > at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:575) > at > org.apache.kafka.clients.produce
[jira] [Updated] (FLINK-32101) FlinkKafkaInternalProducerITCase.testInitTransactionId test failed
[ https://issues.apache.org/jira/browse/FLINK-32101?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yuxin Tan updated FLINK-32101: -- Labels: test-stability (was: test) > FlinkKafkaInternalProducerITCase.testInitTransactionId test failed > -- > > Key: FLINK-32101 > URL: https://issues.apache.org/jira/browse/FLINK-32101 > Project: Flink > Issue Type: Bug > Components: Connectors / Kafka >Affects Versions: 1.18.0 >Reporter: Yuxin Tan >Priority: Major > Labels: test-stability > > FlinkKafkaInternalProducerITCase.testInitTransactionId test failed. > Caused by: org.apache.kafka.common.KafkaException: Unexpected error in > InitProducerIdResponse; The request timed out. > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=48990&view=logs&j=aa18c3f6-13b8-5f58-86bb-c1cffb239496&t=502fb6c0-30a2-5e49-c5c2-a00fa3acb203&l=22973 > {code:java} > Caused by: org.apache.kafka.common.KafkaException: > org.apache.kafka.common.KafkaException: Unexpected error in > InitProducerIdResponse; The request timed out. 
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) > at > sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) > at > sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) > at java.lang.reflect.Constructor.newInstance(Constructor.java:423) > at > java.util.concurrent.ForkJoinTask.getThrowableException(ForkJoinTask.java:593) > at > java.util.concurrent.ForkJoinTask.reportException(ForkJoinTask.java:677) > at java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:735) > at > java.util.stream.ForEachOps$ForEachOp.evaluateParallel(ForEachOps.java:159) > at > java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateParallel(ForEachOps.java:173) > at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233) > at > java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485) > at > java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:650) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.abortTransactions(FlinkKafkaProducer.java:1290) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initializeState(FlinkKafkaProducer.java:1216) > at > org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:189) > at > org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:171) > at > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:95) > at > org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:122) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:274) > at > org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106) > at > 
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:747) > at > org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.call(StreamTaskActionExecutor.java:100) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:722) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:688) > at > org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952) > at > org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:921) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) > at java.lang.Thread.run(Thread.java:748) > Caused by: org.apache.kafka.common.KafkaException: Unexpected error in > InitProducerIdResponse; The request timed out. > at > org.apache.kafka.clients.producer.internals.TransactionManager$InitProducerIdHandler.handleResponse(TransactionManager.java:1418) > at > org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1322) > at > org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109) > at > org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:583) > at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:575) > at > org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(
[GitHub] [flink] godfreyhe commented on a diff in pull request #22566: [FLINK-32052][table-runtime] Introduce left and right state retention time to StreamingJoinOperator
godfreyhe commented on code in PR #22566: URL: https://github.com/apache/flink/pull/22566#discussion_r1194510238 ## flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/stream/StreamingJoinOperatorTestBase.java: ## @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.operators.join.stream; + +import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.runtime.generated.GeneratedJoinCondition; +import org.apache.flink.table.runtime.keyselector.RowDataKeySelector; +import org.apache.flink.table.runtime.operators.join.stream.state.JoinInputSideSpec; +import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; +import org.apache.flink.table.runtime.util.RowDataHarnessAssertor; +import org.apache.flink.table.types.logical.CharType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.logical.VarCharType; +import org.apache.flink.table.utils.HandwrittenSelectorUtil; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.TestInfo; + +/** Base test class for {@link AbstractStreamingJoinOperator}. 
*/ +public abstract class StreamingJoinOperatorTestBase { + +protected final InternalTypeInfo leftTypeInfo = +InternalTypeInfo.of( +RowType.of( +new LogicalType[] { +new CharType(false, 20), +new CharType(false, 20), +VarCharType.STRING_TYPE +}, +new String[] {"order_id", "line_order_id", "shipping_address"})); + +protected final InternalTypeInfo rightTypeInfo = +InternalTypeInfo.of( +RowType.of( +new LogicalType[] {new CharType(false, 20), new CharType(true, 10)}, +new String[] {"line_order_id0", "line_order_ship_mode"})); + +protected final RowDataKeySelector leftKeySelector = +HandwrittenSelectorUtil.getRowDataSelector( +new int[] {1}, +leftTypeInfo.toRowType().getChildren().toArray(new LogicalType[0])); +protected final RowDataKeySelector rightKeySelector = +HandwrittenSelectorUtil.getRowDataSelector( +new int[] {0}, +rightTypeInfo.toRowType().getChildren().toArray(new LogicalType[0])); + +protected final JoinInputSideSpec leftInputSpec = +JoinInputSideSpec.withUniqueKeyContainedByJoinKey(leftTypeInfo, leftKeySelector); +protected final JoinInputSideSpec rightInputSpec = +JoinInputSideSpec.withUniqueKeyContainedByJoinKey(rightTypeInfo, rightKeySelector); + +protected final InternalTypeInfo joinKeyTypeInfo = +InternalTypeInfo.of(new CharType(false, 20)); + +protected final String funcCode = +"public class ConditionFunction extends org.apache.flink.api.common.functions.AbstractRichFunction " ++ "implements org.apache.flink.table.runtime.generated.JoinCondition {\n" ++ "\n" ++ "public ConditionFunction(Object[] reference) {\n" ++ "}\n" ++ "\n" ++ "@Override\n" ++ "public boolean apply(org.apache.flink.table.data.RowData in1, org.apache.flink.table.data.RowData in2) {\n" ++ "return true;\n" ++ "}\n" ++ "\n" ++ "@Override\n" ++ "public void close() throws Exception {\n" ++ "super.close();\n" ++ "}" ++ "}\n"; +protected final GeneratedJoinCondition joinCondition = +new GeneratedJoinCondition("ConditionFunction", funcCode, new Object[0]); + +protected f
[GitHub] [flink] pgaref commented on pull request #22506: [FLINK-31890][runtime] Introduce DefaultScheduler failure enrichment/labeling
pgaref commented on PR #22506: URL: https://github.com/apache/flink/pull/22506#issuecomment-1548830252 Thanks again for the comments, @zhuzhurk! PTAL at the latest PR: * introducing async task failure labeling as part of ExecutionFailureHandler#handleFailure (for local and global failures) * transient CompletableFuture with serializable labels as part of ErrorInfo
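The pattern in the second bullet, a `transient` future stored next to serializable labels, can be illustrated in plain Java. This is a hypothetical sketch: `LabeledError`, `labelAsync` and the labeler functions are invented names, not Flink's `ErrorInfo` or its failure-labeling API. Labelers run asynchronously and their maps are merged; the future is dropped on serialization, while the label map it produced travels with the object.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Function;

public class FailureLabelSketch {

    /** Hypothetical ErrorInfo-like holder; illustrative only, not Flink's ErrorInfo. */
    static class LabeledError implements Serializable {
        private static final long serialVersionUID = 1L;

        private final String message;
        // Transient: the future is only meaningful in the JVM that started the labeling.
        private final transient CompletableFuture<Map<String, String>> labelsFuture;
        // Serializable snapshot, filled in when labeling completes (volatile: may be set by another thread).
        private volatile HashMap<String, String> labels = new HashMap<>();

        LabeledError(String message, CompletableFuture<Map<String, String>> labelsFuture) {
            this.message = message;
            this.labelsFuture = labelsFuture;
            labelsFuture.thenAccept(result -> this.labels = new HashMap<>(result));
        }

        Map<String, String> getLabels() {
            return Collections.unmodifiableMap(labels);
        }

        boolean hasPendingFuture() {
            return labelsFuture != null; // null after deserialization, since the field is transient
        }
    }

    /** Runs each hypothetical labeler asynchronously and merges their label maps. */
    static CompletableFuture<Map<String, String>> labelAsync(
            Throwable cause, List<Function<Throwable, Map<String, String>>> labelers, Executor executor) {
        CompletableFuture<Map<String, String>> merged = CompletableFuture.completedFuture(Collections.emptyMap());
        for (Function<Throwable, Map<String, String>> labeler : labelers) {
            CompletableFuture<Map<String, String>> one =
                    CompletableFuture.supplyAsync(() -> labeler.apply(cause), executor);
            merged = merged.thenCombine(one, (acc, next) -> {
                Map<String, String> out = new HashMap<>(acc);
                out.putAll(next);
                return out;
            });
        }
        return merged;
    }

    /** Serializes and deserializes an error, as would happen when shipping it elsewhere. */
    static LabeledError roundTrip(LabeledError error) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(error);
            }
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (LabeledError) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("serialization round trip failed", e);
        }
    }

    public static void main(String[] args) {
        List<Function<Throwable, Map<String, String>>> labelers = Arrays.asList(
                t -> Collections.singletonMap("type", t.getClass().getSimpleName()),
                t -> Collections.singletonMap("hasMessage", String.valueOf(t.getMessage() != null)));
        Executor direct = Runnable::run; // runs labelers on the calling thread, for a deterministic demo

        LabeledError original =
                new LabeledError("boom", labelAsync(new IllegalStateException("boom"), labelers, direct));
        LabeledError copy = roundTrip(original);
        System.out.println(copy.getLabels().get("type")); // IllegalStateException
        System.out.println(copy.hasPendingFuture());      // false
    }
}
```

If labeling has not finished before the object is serialized, this sketch simply ships whatever labels are present at that moment (possibly an empty map); a real implementation would need an explicit policy for that case.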
[jira] [Comment Edited] (FLINK-32103) RBAC flinkdeployments/finalizers missing for OpenShift Deployment
[ https://issues.apache.org/jira/browse/FLINK-32103?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17722952#comment-17722952 ] James Busche edited comment on FLINK-32103 at 5/16/23 1:07 AM: --- Can fix this on an existing OpenShift install by adding "- flinkdeployments/finalizers" to the flink.apache.org resources like this: {quote}oc edit clusterrole flink-operator - apiGroups: - flink.apache.org resources: - flinkdeployments - flinkdeployments/status - flinkdeployments/finalizers - flinksessionjobs - flinksessionjobs/status verbs: - '*' {quote} was (Author: JIRAUSER287279): Can fix this on an existing OpenShift install by adding "- flinkdeployments/finalizers" to the flink.apache.org resources like this: {{}} {quote}oc edit clusterrole flink-operator - apiGroups: - flink.apache.org resources: - flinkdeployments - flinkdeployments/status - flinkdeployments/finalizers - flinksessionjobs - flinksessionjobs/status verbs: - '*' {quote} > RBAC flinkdeployments/finalizers missing for OpenShift Deployment > - > > Key: FLINK-32103 > URL: https://issues.apache.org/jira/browse/FLINK-32103 > Project: Flink > Issue Type: Bug > Components: Kubernetes Operator >Affects Versions: kubernetes-operator-1.5.0 >Reporter: James Busche >Priority: Major > > In OpenShift 4.10 and above, I'm noticing with the Flink 1.5.0 RC release > that there's an issue with flinkdeployments on OpenShift. 
Flinkdeployments > are stuck in upgrading: > {quote}oc get flinkdep > NAME JOB STATUS LIFECYCLE STATE > basic-example UPGRADING > {quote} > > The error message looks like: > {quote}oc describe flinkdep basic-example > > Error: > {"type":"org.apache.flink.kubernetes.operator.exception.ReconciliationException","message":"org.apache.flink.client.deployment.ClusterDeploymentException: > Could not create Kubernetes cluster > \"basic-example\".","throwableList":[\{"type":"org.apache.flink.client.deployment.ClusterDeploymentException","message":"Could > not create Kubernetes cluster > \"basic-example\"."},\{"type":"org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.client.KubernetesClientException","message":"Failure > executing: POST at: > https://172.30.0.1/apis/apps/v1/namespaces/default/deployments. Message: > Forbidden!Configured service account doesn't have access. Service account may > have been revoked. deployments.apps \"basic-example\" is forbidden: cannot > set blockOwnerDeletion if an ownerReference refers to a resource you can't > set finalizers on: , ."}]} > > Job Manager Deployment Status: MISSING > {quote} > > The solution is to fix it in the rbac.yaml of the helm template, adding a " > - flinkdeployments/finalizers" line to the flink.apache.org apiGroup. > > If the Operator is already running and flinkdeployments are having trouble on > OpenShift, then someone can manually edit the > flink-kubernetes-operator.v1.5.0 clusterrole and add the > " - flinkdeployments/finalizers" in the flink.apache.org apiGroup. > > I'll create a PR that addresses this.
[jira] [Commented] (FLINK-32103) RBAC flinkdeployments/finalizers missing for OpenShift Deployment
[ https://issues.apache.org/jira/browse/FLINK-32103?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17722951#comment-17722951 ] James Busche commented on FLINK-32103: -- Created PR [https://github.com/apache/flink-kubernetes-operator/pull/600] to address this
[GitHub] [flink-kubernetes-operator] jbusche commented on pull request #600: FLINK-32103 Update RBAC for OpenShift Deployment
jbusche commented on PR #600: URL: https://github.com/apache/flink-kubernetes-operator/pull/600#issuecomment-1548743480 OpenShift 4.10.54 is looking good with the PR:
```
oc get pods,flinkdeployments -n default    api.jim410.cp.fyre.ibm.com: Mon May 15 16:16:24 2023

NAME                                                          READY   STATUS    RESTARTS   AGE
pod/basic-example-5b547c9669-cj8pz                            1/1     Running   0          5m37s
pod/basic-example-taskmanager-1-1                             1/1     Running   0          5m8s
pod/basic-session-deployment-only-example-77957ff659-rvp5h    1/1     Running   0          3m23s
pod/basic-session-deployment-only-example-taskmanager-1-1     1/1     Running   0          2m39s
pod/basic-session-deployment-only-example-taskmanager-1-2     1/1     Running   0          2m39s
pod/flink-kubernetes-operator-66f8544889-sjjm5                2/2     Running   0          8m38s

NAME                                                                     JOB STATUS   LIFECYCLE STATE
flinkdeployment.flink.apache.org/basic-example                           RUNNING      STABLE
flinkdeployment.flink.apache.org/basic-session-deployment-only-example                STABLE
```
and the clusterrole looks good:
```
oc describe clusterrole flink-kubernetes-operator.v1.5.0-866bfd99dd |grep flinkdeployments
  flinkdeployments.flink.apache.org/finalizers  []  []  [*]
  flinkdeployments.flink.apache.org/status      []  []  [*]
  flinkdeployments.flink.apache.org             []  []  [*]
```
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
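To make the `|grep flinkdeployments` check above scriptable, here is a minimal sketch of a hypothetical helper (`ClusterRoleDescribeParser` is illustrative only, not an existing operator or `oc` tool). It assumes each grepped row has the resource name as its first token and the verbs in the last bracketed column, as in the output above. Header and separator lines without brackets are skipped.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical parser for rows of `oc describe clusterrole` output (assumed layout). */
final class ClusterRoleDescribeParser {
    private ClusterRoleDescribeParser() {}

    /** Maps each resource (first column) to its verbs (last bracketed column). */
    static Map<String, List<String>> parseVerbs(String describeOutput) {
        Map<String, List<String>> result = new LinkedHashMap<>();
        for (String raw : describeOutput.split("\n")) {
            String line = raw.strip();
            int open = line.lastIndexOf('[');
            int close = line.lastIndexOf(']');
            if (line.isEmpty() || open < 0 || close < open) {
                continue; // header, separator, or unrelated line
            }
            String resource = line.split("\\s+")[0];
            String inner = line.substring(open + 1, close).strip();
            // Verbs such as "[get list watch]" are space-separated inside the brackets.
            List<String> verbs = inner.isEmpty() ? List.of() : Arrays.asList(inner.split("\\s+"));
            result.put(resource, verbs);
        }
        return result;
    }

    /** True if the resource row grants the "*" verb. */
    static boolean grantsAll(Map<String, List<String>> verbs, String resource) {
        return verbs.getOrDefault(resource, List.of()).contains("*");
    }
}
```

A check like `grantsAll(parseVerbs(output), "flinkdeployments.flink.apache.org/finalizers")` could then gate a CI step, assuming the describe format stays as shown.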
[GitHub] [flink-connector-kafka] RamanVerma commented on a diff in pull request #29: [FLINK-32021][Connectors/Kafka] Improvement the Javadoc for SpecifiedOffsetsInitializer and TimestampOffsetsInitial
RamanVerma commented on code in PR #29: URL: https://github.com/apache/flink-connector-kafka/pull/29#discussion_r119366 ## flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/initializer/SpecifiedOffsetsInitializer.java: ## @@ -38,6 +38,10 @@ * An implementation of {@link OffsetsInitializer} which initializes the offsets of the partition * according to the user specified offsets. * + * Use specified offsets for specified partitions while use commit offsets or earliest for Review Comment: @loserwang1024 I have a question. When you mention `specified offsets`, do you mean the ones provided in the map `initialOffsets`? If so, the unspecified partitions are initialized according to `offsetResetStrategy`, which can be `earliest` or `latest`, so not necessarily `earliest`. Also, could you point me to the code (or code change) that explains this: > Specified partition's offset should be less than its latest offset, otherwise it will start from the earliest.
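A minimal Java sketch of the per-partition decision under discussion, written against plain maps rather than the connector's API. It assumes the order implied by the proposed Javadoc and the review comment: a partition present in `initialOffsets` uses that offset; otherwise a committed offset is used if one exists; otherwise the reset strategy picks the earliest or latest offset. `ResetStrategy` and `SpecifiedOffsetsSketch` are hypothetical names, and the "less than its latest offset" rule the reviewer asks about is deliberately not modeled.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

/** Hypothetical stand-in for Kafka's OffsetResetStrategy; only the two cases discussed. */
enum ResetStrategy { EARLIEST, LATEST }

/** Sketch (not the connector's code) of choosing a starting offset per partition. */
final class SpecifiedOffsetsSketch {
    private SpecifiedOffsetsSketch() {}

    static Map<String, Long> resolve(
            Set<String> partitions,
            Map<String, Long> initialOffsets,
            Map<String, Long> committedOffsets,
            ResetStrategy strategy,
            Map<String, Long> earliestOffsets,
            Map<String, Long> latestOffsets) {
        Map<String, Long> result = new HashMap<>();
        for (String partition : partitions) {
            if (initialOffsets.containsKey(partition)) {
                // A user-specified offset always wins.
                result.put(partition, initialOffsets.get(partition));
            } else if (committedOffsets.containsKey(partition)) {
                // Assumed from the proposed Javadoc: fall back to the committed offset.
                result.put(partition, committedOffsets.get(partition));
            } else if (strategy == ResetStrategy.EARLIEST) {
                result.put(partition, earliestOffsets.get(partition));
            } else {
                result.put(partition, latestOffsets.get(partition));
            }
        }
        return result;
    }
}
```

With `LATEST`, an unspecified partition without a committed offset starts at its latest offset, which is the reviewer's point that the fallback is "not necessarily `earliest`".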
[GitHub] [flink-kubernetes-operator] jbusche commented on pull request #600: FLINK-32103 Update RBAC for OpenShift Deployment
jbusche commented on PR #600: URL: https://github.com/apache/flink-kubernetes-operator/pull/600#issuecomment-1548682654 OK - the KIND cluster looks good - still deploying the samples fine.
```
NAME                                                     READY   STATUS    RESTARTS   AGE
basic-example-5bbd68657d-4dgj5                           1/1     Running   0          3m44s
basic-example-taskmanager-1-1                            1/1     Running   0          3m14s
basic-session-deployment-only-example-57994c7d96-bmlr6   1/1     Running   0          3m19s
basic-session-deployment-only-example-taskmanager-1-1    1/1     Running   0          2m49s
basic-session-deployment-only-example-taskmanager-1-2    1/1     Running   0          2m49s
flink-kubernetes-operator-6b659d866-4t8g7                2/2     Running   0          4m26s
```
The flink clusterrole looks good:
```
oc describe clusterrole flink-operator |grep flinkdeployments
  flinkdeployments.flink.apache.org/finalizers  []  []  [*]
  flinkdeployments.flink.apache.org/status      []  []  [*]
  flinkdeployments.flink.apache.org             []  []  [*]
```
[GitHub] [flink-kubernetes-operator] jbusche commented on pull request #600: FLINK-32103 Update RBAC for OpenShift Deployment
jbusche commented on PR #600: URL: https://github.com/apache/flink-kubernetes-operator/pull/600#issuecomment-1548632061 Trying a new deployment on a KIND cluster (non-OpenShift) as well as on two OpenShift clusters to verify the change. Will post results here.
[GitHub] [flink-kubernetes-operator] jbusche opened a new pull request, #600: FLINK-32103 Update RBAC for OpenShift Deployment
jbusche opened a new pull request, #600: URL: https://github.com/apache/flink-kubernetes-operator/pull/600 ## What is the purpose of the change Noticing in 1.5.0-rc1 and 1.5.0-rc2 that we're having a problem deploying on OpenShift. One workaround is to manually update the flink-kubernetes-operator.v1.5.0 clusterrole, but a better fix is to update the rbac.yaml. In either case, a `- flinkdeployments/finalizers` entry is needed in the flink.apache.org apiGroup. This PR adds that finalizers resource so that the deployment succeeds. ## Brief change log - *the `- flinkdeployments/finalizers` resource is added to the flink.apache.org apiGroup* ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): No - The public API, i.e., is any changes to the `CustomResourceDescriptors`: No - Core observer or reconciler logic that is regularly executed: No ## Documentation - Does this pull request introduce a new feature? No - If yes, how is the feature documented? not applicable
[jira] [Updated] (FLINK-31890) Introduce DefaultScheduler failure enrichment/labeling
[ https://issues.apache.org/jira/browse/FLINK-31890?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Panagiotis Garefalakis updated FLINK-31890: --- Summary: Introduce DefaultScheduler failure enrichment/labeling (was: Introduce SchedulerBase per-task failure enrichment/labeling) > Introduce DefaultScheduler failure enrichment/labeling > -- > > Key: FLINK-31890 > URL: https://issues.apache.org/jira/browse/FLINK-31890 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Coordination >Reporter: Panagiotis Garefalakis >Assignee: Panagiotis Garefalakis >Priority: Major > Labels: pull-request-available
[jira] [Created] (FLINK-32103) RBAC flinkdeployments/finalizers missing for OpenShift Deployment
James Busche created FLINK-32103: Summary: RBAC flinkdeployments/finalizers missing for OpenShift Deployment Key: FLINK-32103 URL: https://issues.apache.org/jira/browse/FLINK-32103 Project: Flink Issue Type: Bug Components: Kubernetes Operator Affects Versions: kubernetes-operator-1.5.0 Reporter: James Busche In OpenShift 4.10 and above, I'm noticing with the Flink Kubernetes Operator 1.5.0 RC release that there's an issue with FlinkDeployments on OpenShift: they are stuck in UPGRADING: {quote}oc get flinkdep NAME JOB STATUS LIFECYCLE STATE basic-example UPGRADING {quote} The error message looks like: {quote}oc describe flinkdep basic-example Error: {"type":"org.apache.flink.kubernetes.operator.exception.ReconciliationException","message":"org.apache.flink.client.deployment.ClusterDeploymentException: Could not create Kubernetes cluster \"basic-example\".","throwableList":[\{"type":"org.apache.flink.client.deployment.ClusterDeploymentException","message":"Could not create Kubernetes cluster \"basic-example\"."},\{"type":"org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.client.KubernetesClientException","message":"Failure executing: POST at: https://172.30.0.1/apis/apps/v1/namespaces/default/deployments. Message: Forbidden!Configured service account doesn't have access. Service account may have been revoked. deployments.apps \"basic-example\" is forbidden: cannot set blockOwnerDeletion if an ownerReference refers to a resource you can't set finalizers on: , ."}]} Job Manager Deployment Status: MISSING {quote} The fix is to add a " - flinkdeployments/finalizers" line to the flink.apache.org apiGroup in the rbac.yaml of the Helm template. If the Operator is already running and FlinkDeployments are having trouble on OpenShift, you can manually edit the flink-kubernetes-operator.v1.5.0 clusterrole and add " - flinkdeployments/finalizers" to the flink.apache.org apiGroup. I'll create a PR that addresses this.
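The error above matches the message of Kubernetes' OwnerReferencesPermissionEnforcement admission plugin, which (per the Kubernetes admission-controller documentation) only lets a client set `blockOwnerDeletion` on an owner reference if it may `update` the owner's `finalizers` subresource. The error text suggests the plugin is active on these OpenShift clusters. As a minimal sketch of why the extra rule fixes it, the hypothetical `PolicyRule` and `OwnerReferenceCheck` classes below (not part of the operator or any Kubernetes client library) model a ClusterRole rule with exact-name and `*` matching only.

```java
import java.util.List;

/**
 * Hypothetical, simplified model of one Kubernetes RBAC PolicyRule. Only exact names and
 * the "*" wildcard are handled; resourceNames, nonResourceURLs and aggregation are ignored.
 */
final class PolicyRule {
    private final List<String> apiGroups;
    private final List<String> resources;
    private final List<String> verbs;

    PolicyRule(List<String> apiGroups, List<String> resources, List<String> verbs) {
        this.apiGroups = List.copyOf(apiGroups);
        this.resources = List.copyOf(resources);
        this.verbs = List.copyOf(verbs);
    }

    private static boolean matches(List<String> allowed, String requested) {
        return allowed.contains("*") || allowed.contains(requested);
    }

    boolean allows(String apiGroup, String resource, String verb) {
        return matches(apiGroups, apiGroup) && matches(resources, resource) && matches(verbs, verb);
    }
}

final class OwnerReferenceCheck {
    private OwnerReferenceCheck() {}

    /**
     * Setting blockOwnerDeletion=true on an ownerReference to a FlinkDeployment needs
     * "update" on flinkdeployments/finalizers in the flink.apache.org group when the
     * OwnerReferencesPermissionEnforcement admission plugin is enabled.
     */
    static boolean canSetBlockOwnerDeletion(List<PolicyRule> rules) {
        return rules.stream()
                .anyMatch(r -> r.allows("flink.apache.org", "flinkdeployments/finalizers", "update"));
    }
}
```

With the rule from the report (no `flinkdeployments/finalizers` entry), `canSetBlockOwnerDeletion` returns `false`; adding that single resource makes it `true`, mirroring the rbac.yaml change. Note that `verbs: - '*'` already covers `update`, so only the resource list needs to change.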
[GitHub] [flink] reswqa commented on pull request #22574: [FLINK-32060][test] Migrate subclasses of BatchAbstractTestBase in table and other modules to JUnit5
reswqa commented on PR #22574: URL: https://github.com/apache/flink/pull/22574#issuecomment-1548347105 Thanks very much @snuyanzin for your patient review 👍. Without you, I wouldn't even have realized that the number of tests was wrong (my mistake) 🙇
[GitHub] [flink] snuyanzin commented on pull request #22574: [FLINK-32060][test] Migrate subclasses of BatchAbstractTestBase in table and other modules to JUnit5
snuyanzin commented on PR #22574: URL: https://github.com/apache/flink/pull/22574#issuecomment-1548342955 I checked that the number of tests per changed class is now the same before and after the change. No more findings from my side, LGTM.
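The per-class test-count comparison described above can be scripted so that a migration which silently drops tests is caught. A minimal sketch with a hypothetical `TestCountDiff` helper: it takes per-class counts from two runs, however they were collected (for example from test reports; the collection step is an assumption and is not shown), and reports every class whose count changed.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

/** Hypothetical helper comparing per-class test counts before and after a migration. */
final class TestCountDiff {
    private TestCountDiff() {}

    /** Returns class -> "before -> after" for every class whose count differs. */
    static Map<String, String> changedClasses(Map<String, Integer> before, Map<String, Integer> after) {
        // Union of class names, so classes that vanished or appeared are reported too.
        Set<String> classes = new TreeSet<>(before.keySet());
        classes.addAll(after.keySet());
        Map<String, String> changed = new TreeMap<>();
        for (String testClass : classes) {
            int countBefore = before.getOrDefault(testClass, 0);
            int countAfter = after.getOrDefault(testClass, 0);
            if (countBefore != countAfter) {
                changed.put(testClass, countBefore + " -> " + countAfter);
            }
        }
        return changed;
    }
}
```

An empty result corresponds to the "number of tests is equal per changed class" condition.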
[GitHub] [flink] snuyanzin commented on a diff in pull request #22574: [FLINK-32060][test] Migrate subclasses of BatchAbstractTestBase in table and other modules to JUnit5
snuyanzin commented on code in PR #22574: URL: https://github.com/apache/flink/pull/22574#discussion_r1194200879 ## flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/StreamFileSystemITCaseBase.scala: ## @@ -23,17 +23,20 @@ import org.apache.flink.table.api.TableEnvironment import org.apache.flink.table.api.bridge.scala._ import org.apache.flink.table.planner.runtime.FileSystemITCaseBase import org.apache.flink.table.planner.runtime.utils.{AbstractExactlyOnceSink, StreamingTestBase, TestingAppendSink, TestSinkUtil} +import org.apache.flink.testutils.junit.extensions.parameterized.NoOpTestExtension import org.apache.flink.types.Row -import org.junit.{Assert, Before, Test} -import org.junit.Assert.assertEquals +import org.assertj.core.api.Assertions.assertThat +import org.junit.jupiter.api.{BeforeEach, Test} +import org.junit.jupiter.api.extension.ExtendWith import scala.collection.{mutable, Seq} /** Streaming [[FileSystemITCaseBase]]. */ +@ExtendWith(Array(classOf[NoOpTestExtension])) abstract class StreamFileSystemITCaseBase extends StreamingTestBase with FileSystemITCaseBase { Review Comment: Thanks for the clarification.
[GitHub] [flink] reswqa commented on a diff in pull request #22574: [FLINK-32060][test] Migrate subclasses of BatchAbstractTestBase in table and other modules to JUnit5
reswqa commented on code in PR #22574: URL: https://github.com/apache/flink/pull/22574#discussion_r1194197548 Review Comment: To some extent, yes, because we manually set the parallelism in the `before` method:
```
env.setParallelism(4)
```
This is the same as `DEFAULT_PARALLELISM` in `AbstractTestBase`.
[GitHub] [flink] snuyanzin commented on a diff in pull request #22574: [FLINK-32060][test] Migrate subclasses of BatchAbstractTestBase in table and other modules to JUnit5
snuyanzin commented on code in PR #22574: URL: https://github.com/apache/flink/pull/22574#discussion_r1194178109 Review Comment: Agreed about the rules and the `@After`/`@Before` annotations. Another point is that `StreamingTestBase` extends `AbstractTestBase`, which is also JUnit 4 based and has this rule:
```java
@ClassRule
public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE =
        new MiniClusterWithClientResource(
                new MiniClusterResourceConfiguration.Builder()
                        .setNumberTaskManagers(1)
                        .setNumberSlotsPerTaskManager(DEFAULT_PARALLELISM)
                        .build());
```
It looks like this rule is not required for streaming tests (`before`/`after` is enough). Please correct me if I'm wrong.
[GitHub] [flink] reswqa commented on a diff in pull request #22574: [FLINK-32060][test] Migrate subclasses of BatchAbstractTestBase in table and other modules to JUnit5
reswqa commented on code in PR #22574: URL: https://github.com/apache/flink/pull/22574#discussion_r1194170872 Review Comment: Of course, in the long run we should also migrate it to JUnit 5. Perhaps we can create a new ticket to track it, because it is also a significant amount of work.
[GitHub] [flink] reswqa commented on a diff in pull request #22574: [FLINK-32060][test] Migrate subclasses of BatchAbstractTestBase in table and other modules to JUnit5
reswqa commented on code in PR #22574: URL: https://github.com/apache/flink/pull/22574#discussion_r1194169663 Review Comment: And these two `Rule`s are not used in the subclasses related to this PR:
```
@Rule
def thrown: ExpectedException = expectedException

@Rule
def tempFolder: TemporaryFolder = _tempFolder
```
[GitHub] [flink] reswqa commented on a diff in pull request #22574: [FLINK-32060][test] Migrate subclasses of BatchAbstractTestBase in table and other modules to JUnit5
reswqa commented on code in PR #22574: URL: https://github.com/apache/flink/pull/22574#discussion_r1194168520 Review Comment: I did notice this issue, but I found that lifecycle methods such as `before` in `StreamingTestBase` are double-annotated with `@Before` and `@BeforeEach`:
```
@Before
@BeforeEach
def before(): Unit = {
  this.env = StreamExecutionEnvironment.getExecutionEnvironment
  env.setParallelism(4)
  if (enableObjectReuse) {
    this.env.getConfig.enableObjectReuse()
  }
  val setting = EnvironmentSettings.newInstance().inStreamingMode().build()
  this.tEnv = StreamTableEnvironment.create(env, setting)
}
```
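The double annotation works because each engine only looks for its own lifecycle annotation: JUnit 4 runs `@Before` methods and ignores `@BeforeEach`, while JUnit Jupiter does the opposite. A single method carrying both is therefore picked up by whichever engine runs the subclass. The sketch below models this with two hypothetical stand-in annotations and a toy reflective runner instead of real JUnit, so it runs without dependencies. It illustrates the dispatch idea only and is not how either engine is implemented.

```java
import java.lang.annotation.Annotation;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for JUnit 4's @Before. */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface Junit4Before {}

/** Hypothetical stand-in for JUnit 5's @BeforeEach. */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface Junit5BeforeEach {}

/** Mimics a base class whose setup method carries both annotations. */
class DualLifecycleBase {
    final List<String> setupCalls = new ArrayList<>();

    @Junit4Before
    @Junit5BeforeEach
    public void before() {
        setupCalls.add("before");
    }
}

/** Toy "engine": invokes every public method carrying this engine's setup marker. */
final class ToyEngine {
    private ToyEngine() {}

    static int runSetup(Object testInstance, Class<? extends Annotation> setupMarker) {
        int invoked = 0;
        for (Method method : testInstance.getClass().getMethods()) {
            if (method.isAnnotationPresent(setupMarker)) {
                try {
                    method.invoke(testInstance);
                } catch (ReflectiveOperationException e) {
                    throw new IllegalStateException(e);
                }
                invoked++;
            }
        }
        return invoked;
    }
}
```

Either "engine" finds exactly one setup method, which is why `StreamingTestBase` keeps working for JUnit 4 and JUnit 5 subclasses during the migration.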
[GitHub] [flink] reswqa commented on a diff in pull request #22574: [FLINK-32060][test] Migrate subclasses of BatchAbstractTestBase in table and other modules to JUnit5
reswqa commented on code in PR #22574: URL: https://github.com/apache/flink/pull/22574#discussion_r1194167000 Review Comment: Emmm, @Before @BeforeEach def before(): Unit = { this.env = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(4) if (enableObjectReuse) { this.env.getConfig.enableObjectReuse() } val setting = EnvironmentSettings.newInstance().inStreamingMode().build() this.tEnv = StreamTableEnvironment.create(env, setting) }
[GitHub] [flink] snuyanzin commented on a diff in pull request #22574: [FLINK-32060][test] Migrate subclasses of BatchAbstractTestBase in table and other modules to JUnit5
snuyanzin commented on code in PR #22574: URL: https://github.com/apache/flink/pull/22574#discussion_r1194163579 ## flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/StreamFileSystemITCaseBase.scala: ## @@ -23,17 +23,20 @@ import org.apache.flink.table.api.TableEnvironment import org.apache.flink.table.api.bridge.scala._ import org.apache.flink.table.planner.runtime.FileSystemITCaseBase import org.apache.flink.table.planner.runtime.utils.{AbstractExactlyOnceSink, StreamingTestBase, TestingAppendSink, TestSinkUtil} +import org.apache.flink.testutils.junit.extensions.parameterized.NoOpTestExtension import org.apache.flink.types.Row -import org.junit.{Assert, Before, Test} -import org.junit.Assert.assertEquals +import org.assertj.core.api.Assertions.assertThat +import org.junit.jupiter.api.{BeforeEach, Test} +import org.junit.jupiter.api.extension.ExtendWith import scala.collection.{mutable, Seq} /** Streaming [[FileSystemITCaseBase]]. */ +@ExtendWith(Array(classOf[NoOpTestExtension])) abstract class StreamFileSystemITCaseBase extends StreamingTestBase with FileSystemITCaseBase { Review Comment: Couldn't it be a different issue, namely that `StreamingTestBase` is JUnit 4-based?
[GitHub] [flink] reswqa commented on a diff in pull request #22574: [FLINK-32060][test] Migrate subclasses of BatchAbstractTestBase in table and other modules to JUnit5
reswqa commented on code in PR #22574: URL: https://github.com/apache/flink/pull/22574#discussion_r1194160609 ## flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/FileSystemTestCsvITCase.scala: ## @@ -32,12 +35,11 @@ class FileSystemTestCsvITCase extends BatchFileSystemITCaseBase { override def getScheme: String = "test" - @After + @AfterEach def close(): Unit = { val path = new Path(resultPath) -assertEquals( - s"File $resultPath is not closed", - 0, - TestFileSystem.getNumberOfUnclosedOutputStream(path)) +if (TestFileSystem.getNumberOfUnclosedOutputStream(path) != 0) { + Assertions.fail(s"File $resultPath is not closed"); +} Review Comment: I spent some time trying something similar, but I didn't come to any useful conclusions. Feeling disappointed about this 😞.
[GitHub] [flink] snuyanzin commented on a diff in pull request #22574: [FLINK-32060][test] Migrate subclasses of BatchAbstractTestBase in table and other modules to JUnit5
snuyanzin commented on code in PR #22574: URL: https://github.com/apache/flink/pull/22574#discussion_r1194131282 ## flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/FileSystemTestCsvITCase.scala: ## @@ -32,12 +35,11 @@ class FileSystemTestCsvITCase extends BatchFileSystemITCaseBase { override def getScheme: String = "test" - @After + @AfterEach def close(): Unit = { val path = new Path(resultPath) -assertEquals( - s"File $resultPath is not closed", - 0, - TestFileSystem.getNumberOfUnclosedOutputStream(path)) +if (TestFileSystem.getNumberOfUnclosedOutputStream(path) != 0) { + Assertions.fail(s"File $resultPath is not closed"); +} Review Comment: Yeah, that's weird. There is a workaround (WA) for that:
```scala
assertThat(TestFileSystem.getNumberOfUnclosedOutputStream(path))
  .as(s"File $resultPath is not closed")
  .asInstanceOf[AbstractIntegerAssert[_]]
  .isEqualTo(0)
```
However, there is an issue with this WA: if a class contains more than one such WA, it causes problems in IntelliJ IDEA (mvn works fine). I've filed a Scala plugin issue: https://youtrack.jetbrains.com/issue/SCL-20679
[GitHub] [flink] reswqa commented on a diff in pull request #22574: [FLINK-32060][test] Migrate subclasses of BatchAbstractTestBase in table and other modules to JUnit5
reswqa commented on code in PR #22574: URL: https://github.com/apache/flink/pull/22574#discussion_r1194117257 ## flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcFileSystemITCase.java: ## @@ -69,22 +72,18 @@ import static org.assertj.core.api.Assertions.assertThat; /** ITCase for {@link OrcFileFormatFactory}. */ -@RunWith(Parameterized.class) +@ExtendWith(ParameterizedTestExtension.class) public class OrcFileSystemITCase extends BatchFileSystemITCaseBase { -@ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder(); +@TempDir public static java.nio.file.Path temporaryFolder; -private final boolean configure; +@Parameter public boolean configure; -@Parameterized.Parameters(name = "{0}") +@Parameters(name = "{0}") Review Comment: Somehow I forgot to synchronize the changes here 😞
[jira] [Comment Edited] (FLINK-32069) jobClient.getJobStatus() can return status RUNNING for finished insert operation
[ https://issues.apache.org/jira/browse/FLINK-32069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17722858#comment-17722858 ] Aleksandr Iushmanov edited comment on FLINK-32069 at 5/15/23 4:47 PM: -- Thanks a lot, [~mapohl], for this detailed analysis. I have only one question left, about this dispatcher method that is responsible for the job status: {code:java}
@Override
public CompletableFuture<MultipleJobsDetails> requestMultipleJobDetails(Time timeout) {
    List<CompletableFuture<Optional<JobDetails>>> individualOptionalJobDetails =
            queryJobMastersForInformation(
                    jobManagerRunner -> jobManagerRunner.requestJobDetails(timeout));
    CompletableFuture<Collection<Optional<JobDetails>>> optionalCombinedJobDetails =
            FutureUtils.combineAll(individualOptionalJobDetails);
    CompletableFuture<Collection<JobDetails>> combinedJobDetails =
            optionalCombinedJobDetails.thenApply(this::flattenOptionalCollection);
    final Collection<JobDetails> completedJobDetails =
            executionGraphInfoStore.getAvailableJobDetails();
    return combinedJobDetails.thenApply(
            (Collection<JobDetails> runningJobDetails) -> {
                final Map<JobID, JobDetails> deduplicatedJobs = new HashMap<>();
                completedJobDetails.forEach(job -> deduplicatedJobs.put(job.getJobId(), job));
                runningJobDetails.forEach(job -> deduplicatedJobs.put(job.getJobId(), job));
                return new MultipleJobsDetails(new HashSet<>(deduplicatedJobs.values()));
            });
}
{code}
Sorry if this is naive, but I wonder whether the following is possible:
1. {{optionalCombinedJobDetails}} contains the job status RUNNING right after we complete the future in onJobCompletion.
2. In parallel, the job transitions to FINISHED and {{completedJobDetails}} already has the correct execution graph with the FINISHED state.
3. {{combinedJobDetails}} is compiled from both results, but {{runningJobDetails}} has priority over {{completedJobDetails}} and overrides it.
I will continue to look into this issue on my side: I will try to intercept the JM in the {{onJobCompletion}} method and request the job status in parallel.
If this is not possible or is not what causes the issue, I will pivot and look at the InsertResultProvider side of things to check whether another promise is broken (that an insert operation has only one row in its result). was (Author: izeren): Thanks a lot, [~mapohl], for this detailed analysis. I have only one question left, about this dispatcher method that is responsible for the job status: {code:java}
@Override
public CompletableFuture<MultipleJobsDetails> requestMultipleJobDetails(Time timeout) {
    List<CompletableFuture<Optional<JobDetails>>> individualOptionalJobDetails =
            queryJobMastersForInformation(
                    jobManagerRunner -> jobManagerRunner.requestJobDetails(timeout));
    CompletableFuture<Collection<Optional<JobDetails>>> optionalCombinedJobDetails =
            FutureUtils.combineAll(individualOptionalJobDetails);
    CompletableFuture<Collection<JobDetails>> combinedJobDetails =
            optionalCombinedJobDetails.thenApply(this::flattenOptionalCollection);
    final Collection<JobDetails> completedJobDetails =
            executionGraphInfoStore.getAvailableJobDetails();
    return combinedJobDetails.thenApply(
            (Collection<JobDetails> runningJobDetails) -> {
                final Map<JobID, JobDetails> deduplicatedJobs = new HashMap<>();
                completedJobDetails.forEach(job -> deduplicatedJobs.put(job.getJobId(), job));
                runningJobDetails.forEach(job -> deduplicatedJobs.put(job.getJobId(), job));
                return new MultipleJobsDetails(new HashSet<>(deduplicatedJobs.values()));
            });
}
{code}
Sorry if this is naive, but I wonder whether the following is possible:
1. {{optionalCombinedJobDetails}} contains the job status RUNNING.
2. In parallel, the job transitions to FINISHED and {{completedJobDetails}} already has the correct execution graph with the FINISHED state.
3. {{combinedJobDetails}} is compiled from both results, but {{runningJobDetails}} has priority over {{completedJobDetails}} and overrides it.
I will continue to look into this issue on my side: I will try to intercept the JM in the {{onJobCompletion}} method and request the job status in parallel.
If this is not possible or is not what causes the issue, I will pivot and look at the InsertResultProvider side of things to check whether another promise is broken (that an insert operation has only one row in its result). > jobClient.getJobStatus() can return status RUNNING for finished insert > operation > > > Key: FLINK-32069 > URL: https://issues.apache.org/jira/browse/FLINK-32069 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Affects Versions: 1.16.1, 1.15.4 >Reporter: Aleksandr Iushmanov >Priority: Major > > Using zeppelin with remote cluster I came across some race condition issue > leading to failed
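The ordering question in step 3 can be illustrated in isolation. A minimal sketch, assuming plain strings stand in for `JobID` and `JobStatus` and a hypothetical `JobDetailsMergeSketch` class stands in for the `Dispatcher`: because running jobs are put into the map after completed ones, a stale RUNNING snapshot from the runner side overwrites a FINISHED entry for the same job ID. It only shows what the merge does with such inputs; it does not show that those inputs can actually occur together.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class JobDetailsMergeSketch {
    /** Hypothetical stand-in for Flink's JobDetails, reduced to job ID and status. */
    static final class Details {
        final String jobId;
        final String status;

        Details(String jobId, String status) {
            this.jobId = jobId;
            this.status = status;
        }
    }

    /**
     * Same merge order as the quoted requestMultipleJobDetails: completed jobs are put
     * first, running jobs second, so for a duplicate job ID the running entry wins.
     */
    static Map<String, String> merge(Collection<Details> completed, Collection<Details> running) {
        Map<String, Details> deduplicated = new HashMap<>();
        completed.forEach(job -> deduplicated.put(job.jobId, job));
        running.forEach(job -> deduplicated.put(job.jobId, job)); // overwrites completed entry
        Map<String, String> statusById = new HashMap<>();
        for (Details job : deduplicated.values()) {
            statusById.put(job.jobId, job.status);
        }
        return statusById;
    }

    public static void main(String[] args) {
        // Step 1: stale snapshot from the runner side; step 2: store already has FINISHED.
        List<Details> running = List.of(new Details("job-1", "RUNNING"));
        List<Details> completed = List.of(new Details("job-1", "FINISHED"));
        System.out.println(merge(completed, running)); // prints {job-1=RUNNING}
    }
}
```

Reversing the two `forEach` calls would let the completed entry win instead; whether that matters depends on whether a RUNNING snapshot can still be in flight once the store holds FINISHED.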
[jira] [Commented] (FLINK-32069) jobClient.getJobStatus() can return status RUNNING for finished insert operation
[ https://issues.apache.org/jira/browse/FLINK-32069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17722858#comment-17722858 ] Aleksandr Iushmanov commented on FLINK-32069: - Thanks a lot, [~mapohl], for this detailed analysis. I have only one question left, about this dispatcher method that is responsible for the job status: {code:java}
@Override
public CompletableFuture<MultipleJobsDetails> requestMultipleJobDetails(Time timeout) {
    List<CompletableFuture<Optional<JobDetails>>> individualOptionalJobDetails =
            queryJobMastersForInformation(
                    jobManagerRunner -> jobManagerRunner.requestJobDetails(timeout));
    CompletableFuture<Collection<Optional<JobDetails>>> optionalCombinedJobDetails =
            FutureUtils.combineAll(individualOptionalJobDetails);
    CompletableFuture<Collection<JobDetails>> combinedJobDetails =
            optionalCombinedJobDetails.thenApply(this::flattenOptionalCollection);
    final Collection<JobDetails> completedJobDetails =
            executionGraphInfoStore.getAvailableJobDetails();
    return combinedJobDetails.thenApply(
            (Collection<JobDetails> runningJobDetails) -> {
                final Map<JobID, JobDetails> deduplicatedJobs = new HashMap<>();
                completedJobDetails.forEach(job -> deduplicatedJobs.put(job.getJobId(), job));
                runningJobDetails.forEach(job -> deduplicatedJobs.put(job.getJobId(), job));
                return new MultipleJobsDetails(new HashSet<>(deduplicatedJobs.values()));
            });
}
{code}
Sorry if this is naive, but I wonder whether the following is possible:
1. {{optionalCombinedJobDetails}} contains the job status RUNNING.
2. In parallel, the job transitions to FINISHED and {{completedJobDetails}} already has the correct execution graph with the FINISHED state.
3. {{combinedJobDetails}} is compiled from both results, but {{runningJobDetails}} has priority over {{completedJobDetails}} and overrides it.
I will continue to look into this issue on my side: I will try to intercept the JM in the {{onJobCompletion}} method and request the job status in parallel.
If this is not possible or is not what causes the issue, I will pivot and look at the InsertResultProvider side of things to check whether another promise is broken (that an insert operation has only one row in its result). > jobClient.getJobStatus() can return status RUNNING for finished insert > operation > > > Key: FLINK-32069 > URL: https://issues.apache.org/jira/browse/FLINK-32069 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Affects Versions: 1.16.1, 1.15.4 >Reporter: Aleksandr Iushmanov >Priority: Major > > Using zeppelin with remote cluster I came across some race condition issue > leading to failed expectations for SQL insert operations. > > Below is an example of zeppelin code that is failing because > jobClient.getJobStatus() returns running even after job has finished. I have > verified that the same failure can happen if I use > jobClient.getJobExecutionResult().get() (Job execution result is: "Program > execution finished" but job status is not consistently finished) > {code:java} > TableResult tableResult = ((TableEnvironmentInternal) > tbenv).executeInternal(operations); > checkState(tableResult.getJobClient().isPresent()); > try { > tableResult.await(); > JobClient jobClient = tableResult.getJobClient().get(); > if (jobClient.getJobStatus().get() == JobStatus.FINISHED) { > context.out.write("Insertion successfully.\n"); > } else { > throw new IOException("Job is failed, " + > jobClient.getJobExecutionResult().get().toString()); > } > } catch (InterruptedException e) { > throw new IOException("Flink job is interrupted", e); > } catch (ExecutionException e) { > throw new IOException("Flink job is failed", e); > } {code} > ZeppelinCode: > [https://github.com/apache/zeppelin/blob/master/flink/flink1.15-shims/src/main/java/org/apache/zeppelin/flink/Flink115SqlInterpreter.java#L384] > I suspect that job status is returned based on runningJobsRegistry and since > 1.15 this registry is not updated with FINISHED status prior to job result >
future completion, see this change: {{JobMasterServiceLeadershipRunner.java}} > [https://github.com/apache/flink/pull/18189/files#diff-3eb433f18b85c0f5329a4b312a219583189d777fe9bdd547f1114f4a22989f8bL387] > > > It looks like a race condition that is hard to reproduce on a lightweight > setup. I was reproducing this running zeppelin notebook with remote flink > cluster and triggering SQL insert operation. If I find a smaller setup to > reproduce on small local cluster with lightweight client, I will update this > ticket when I have more input. I am open to suggestions on how to fix this. > > For Zeppelin I have a separate ticket because Flink 1.15 is not g
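The ordering the reporter suspects (the job result future completing before the status registry is switched to FINISHED) can be modelled with plain `CompletableFuture`s. A minimal sketch, assuming a single `AtomicReference` stands in for the status registry; the class and method names are hypothetical and this is not Flink's actual code path. A client stage chained on the result reads the registry as soon as the result completes, so it observes RUNNING if the registry is updated afterwards.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

class ResultBeforeStatusSketch {
    /**
     * Returns the status a client observes when it reads the (hypothetical) status
     * registry as soon as the job result future completes.
     *
     * @param updateRegistryFirst true: registry is set to FINISHED before the result
     *     completes; false: the suspected order, the result completes first
     */
    static String statusSeenByClient(boolean updateRegistryFirst) {
        AtomicReference<String> registryStatus = new AtomicReference<>("RUNNING");
        CompletableFuture<String> jobResult = new CompletableFuture<>();

        // Client side: read the status once the result is available, similar in spirit to
        // tableResult.await() followed by jobClient.getJobStatus() in the snippet above.
        CompletableFuture<String> observed = jobResult.thenApply(result -> registryStatus.get());

        if (updateRegistryFirst) {
            registryStatus.set("FINISHED");
            jobResult.complete("Program execution finished");
        } else {
            // Completing the future runs the already-registered dependent stage on this
            // thread, so the client reads the registry before it is updated.
            jobResult.complete("Program execution finished");
            registryStatus.set("FINISHED");
        }
        return observed.join();
    }

    public static void main(String[] args) {
        System.out.println(statusSeenByClient(false)); // prints RUNNING
        System.out.println(statusSeenByClient(true)); // prints FINISHED
    }
}
```

In a real cluster the client and the JobManager run in different processes, so the window is a matter of timing rather than deterministic as here; the sketch only makes the ordering dependency explicit.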
[jira] [Commented] (FLINK-31946) DynamoDB Sink Allow Multiple Item Writes
[ https://issues.apache.org/jira/browse/FLINK-31946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17722855#comment-17722855 ] Hong Liang Teoh commented on FLINK-31946: - Hi [~cjensen]! Thanks for creating this issue. Trying to understand the use case better here. If we want 1 record to map to 2 WriteRequests, would doing a FlatMap before the sink operator work? If the intention is to have more efficient reads, could we instead use a GSI (global secondary index)? > DynamoDB Sink Allow Multiple Item Writes > > > Key: FLINK-31946 > URL: https://issues.apache.org/jira/browse/FLINK-31946 > Project: Flink > Issue Type: Improvement > Components: Connectors / DynamoDB >Reporter: Curtis Jensen >Priority: Minor > > In some cases, it is desirable to be able to write aggregation data to > multiple partition keys. This supports the case of denormalizing data to > facilitate more efficient read operations. > However, the DynamoDBSink allows for only a single DynamoDB item to be > generated for each Flink Element. This appears to be a limitation of the > ElementConverter more than DynamoDBSink. -- This message was sent by Atlassian Jira (v8.20.10#820010)
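The FlatMap suggestion amounts to fanning one aggregate out into several elements before they reach the sink, so that the `ElementConverter` still only has to turn each element into a single item. A plain-Java sketch under stated assumptions: `FanOutSketch`, `Aggregate`, the `pk`/`total` attribute names and the `CUSTOMER#`/`REGION#` key prefixes are all hypothetical, and a `java.util.function.Consumer` stands in for Flink's `Collector`, so this runs without Flink on the classpath.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

class FanOutSketch {
    /** Hypothetical aggregate produced upstream of the DynamoDB sink. */
    static final class Aggregate {
        final String customerId;
        final String region;
        final long total;

        Aggregate(String customerId, String region, long total) {
            this.customerId = customerId;
            this.region = region;
            this.total = total;
        }
    }

    /**
     * Emits one item per partition key for a single aggregate. In a Flink job this body
     * would sit in FlatMapFunction#flatMap, with the Consumer replaced by the Collector.
     */
    static void fanOut(Aggregate agg, Consumer<Map<String, String>> out) {
        String total = Long.toString(agg.total);
        out.accept(Map.of("pk", "CUSTOMER#" + agg.customerId, "total", total));
        out.accept(Map.of("pk", "REGION#" + agg.region, "total", total));
    }

    public static void main(String[] args) {
        List<Map<String, String>> items = new ArrayList<>();
        fanOut(new Aggregate("c1", "eu", 42L), items::add);
        System.out.println(items.size()); // prints 2
    }
}
```

One trade-off of this approach is that the denormalized items are written as independent elements, so the sink gives no guarantee that both copies are written together.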
[jira] [Comment Edited] (FLINK-31810) RocksDBException: Bad table magic number on checkpoint rescale
[ https://issues.apache.org/jira/browse/FLINK-31810?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17722771#comment-17722771 ] David Artiga edited comment on FLINK-31810 at 5/15/23 4:22 PM: --- Not sure what you mean by "enable local recovery", but in any case we just restore from an older checkpoint/savepoint as a workaround. Edit: Found that we have {{state.backend.local-recovery: true}} set. was (Author: david.artiga): Not sure what you mean by "enable local recovery", but in any case we just restore from an older checkpoint/savepoint as a workaround. > RocksDBException: Bad table magic number on checkpoint rescale > -- > > Key: FLINK-31810 > URL: https://issues.apache.org/jira/browse/FLINK-31810 > Project: Flink > Issue Type: Bug > Components: Runtime / State Backends >Affects Versions: 1.15.2 >Reporter: Robert Metzger >Priority: Major > > While rescaling a job from checkpoint, I ran into this exception: > {code:java} > SinkMaterializer[7] -> rob-result[7]: Writer -> rob-result[7]: Committer > (4/4)#3 (c1b348f7eed6e1ce0e41ef75338ae754) switched from INITIALIZING to > FAILED with failure cause: java.lang.Exception: Exception while creating > StreamOperatorStateContext.
> at > org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:255) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:265) > at > org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:703) > at > org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:679) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:646) > at > org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948) > at > org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:917) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563) > at java.base/java.lang.Thread.run(Unknown Source) > Caused by: org.apache.flink.util.FlinkException: Could not restore keyed > state backend for > SinkUpsertMaterializer_7d9b7588bc2ff89baed50d7a4558caa4_(4/4) from any of the > 1 provided restore options. > at > org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160) > at > org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:346) > at > org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:164) > ... 11 more > Caused by: org.apache.flink.runtime.state.BackendBuildingException: Caught > unexpected exception. 
> at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:395) > at > org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:483) > at > org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:97) > at > org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:329) > at > org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168) > at > org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135) > ... 13 more > Caused by: java.io.IOException: Error while opening RocksDB instance. > at > org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.openDB(RocksDBOperationUtils.java:92) > at > org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreDBInstanceFromStateHandle(RocksDBIncrementalRestoreOperation.java:465) > at > org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreWithRescaling(RocksDBIncrementalRestoreOperation.java:321) > at > org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restore(RocksDBIncrementalRestoreOperation.java:164) > at > or
[GitHub] [flink-kubernetes-operator] mxm opened a new pull request, #598: [FLINK-32102] Aggregate multiple pendingRecords metric per source if present
mxm opened a new pull request, #598: URL: https://github.com/apache/flink-kubernetes-operator/pull/598 Some sources expose multiple `.pendingRecords` metrics. In that case, we must sum them up to yield the correct internal pending-records count.
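The aggregation described in the PR can be sketched as a suffix match plus a sum. A minimal sketch, assuming metrics arrive as a name-to-value map per source and that every metric whose name ends with `.pendingRecords` should be counted; `PendingRecordsSketch`, `aggregatePendingRecords` and the metric names are hypothetical, not the operator's actual code.

```java
import java.util.Map;
import java.util.OptionalLong;

class PendingRecordsSketch {
    /**
     * Sums all metrics of one source whose name ends with ".pendingRecords".
     * Returns an empty OptionalLong if the source exposes no such metric.
     */
    static OptionalLong aggregatePendingRecords(Map<String, Long> sourceMetrics) {
        long sum = 0;
        boolean found = false;
        for (Map.Entry<String, Long> metric : sourceMetrics.entrySet()) {
            if (metric.getKey().endsWith(".pendingRecords")) {
                sum += metric.getValue();
                found = true;
            }
        }
        return found ? OptionalLong.of(sum) : OptionalLong.empty();
    }

    public static void main(String[] args) {
        // Hypothetical metric names: two pendingRecords metrics plus an unrelated one.
        Map<String, Long> metrics =
                Map.of(
                        "Source__orders.0.pendingRecords", 120L,
                        "Source__orders.1.pendingRecords", 30L,
                        "Source__orders.numRecordsIn", 5000L);
        System.out.println(aggregatePendingRecords(metrics).getAsLong()); // prints 150
    }
}
```

Returning an empty `OptionalLong` instead of 0 keeps "no backlog" distinguishable from "backlog unknown", which a scaling decision may need to treat differently.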
[jira] [Comment Edited] (FLINK-32069) jobClient.getJobStatus() can return status RUNNING for finished insert operation
[ https://issues.apache.org/jira/browse/FLINK-32069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17722820#comment-17722820 ] Matthias Pohl edited comment on FLINK-32069 at 5/15/23 3:25 PM: Thanks for sharing your train of thought here, [~izeren]. Don't be overwhelmed by the following lines; I'm writing them down for documentation purposes. It would be good if you could verify my findings. {quote}My line of thinking was that job result future is returned based on registry and job status based on result store and it is not in sync same way as it used to be. {quote} The {{JobManagerRunnerRegistry}} holds the {{JobManagerRunner}} instances. The cleanup of the job is triggered (in [Dispatcher:681|https://github.com/apache/flink/blob/ee912b55f9405a41c391f942636b83c6f2c968f3/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java#L681]) as soon as the result of the {{JobManagerRunner}} is completed (which happens in [JobMasterServiceLeadershipRunner#onJobCompletion:384|https://github.com/apache/flink/blob/4882fbd9744d456e09ca60b6c7cf7a5b60326c73/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunner.java#L384]). At that moment, the {{JobManagerRunnerRegistry}} is the source of truth when it comes to the {{JobStatus}}. The next step is writing the {{ExecutionGraphInfo}} of the terminated job into the {{ExecutionGraphInfoStore}} in [Dispatcher#jobReachedTerminalState:1334|https://github.com/apache/flink/blob/ee912b55f9405a41c391f942636b83c6f2c968f3/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java#L1334].
After that, we create a dirty entry in the {{JobResultStore}} in [Dispatcher#jobReachedTerminalState:1347|https://github.com/apache/flink/blob/ee912b55f9405a41c391f942636b83c6f2c968f3/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java#L1347], which runs the operation asynchronously in the {{ioExecutor}} (see [Dispatcher#registerGloballyTerminatedJobInJobResultStore:1365|https://github.com/apache/flink/blob/ee912b55f9405a41c391f942636b83c6f2c968f3/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java#L1365]). The future is forwarded to [Dispatcher#runJob:678|https://github.com/apache/flink/blob/ee912b55f9405a41c391f942636b83c6f2c968f3/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java#L678], where it triggers the {{Dispatcher#globalResourceCleaner}} through [Dispatcher#removeJob:1245|https://github.com/apache/flink/blob/ee912b55f9405a41c391f942636b83c6f2c968f3/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java#L1245]. We now have the job's status stored in the {{JobManagerRunnerRegistry}}, the {{ExecutionGraphInfoStore}}, and the {{JobResultStore}}, all based on the same {{ExecutionGraph}}. Any job status call should still rely on the {{JobManagerRunnerRegistry}}, though. The {{Dispatcher#globalResourceCleaner}} is configured in [DispatcherResourceCleanerFactory#createGlobalResourceCleaner:106ff|https://github.com/apache/flink/blob/c5352fc55972420ed5bf1afdfd97834540b1407a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/DispatcherResourceCleanerFactory.java#L106]. It removes the job from the {{JobManagerRunnerRegistry}} before cleaning up all the other artifacts. This cleanup procedure is triggered after the {{JobResultStore}} has the job's dirty entry written to disk/memory (and consequently also after the {{ExecutionGraphInfo}} has been persisted in the {{ExecutionGraphInfoStore}}).
The cleanup results in the job's {{JobManagerRunner}} no longer being present in the {{JobManagerRunnerRegistry}}. From that moment on, the {{Dispatcher}} relies on the {{ExecutionGraphInfoStore}}, which holds the same {{JobStatus}} as the {{JobManagerRunnerRegistry}} did. From these findings, I would conclude that no race condition is possible. WDYT? was (Author: mapohl): Thanks for sharing your train of thought here, [~izeren]. Don't be overwhelmed by the following lines; I'm writing them down for documentation purposes. It would be good if you could verify my findings. {quote}My line of thinking was that job result future is returned based on registry and job status based on result store and it is not in sync same way as it used to be. {quote} The {{JobManagerRunnerRegistry}} holds the {{JobManagerRunner}} instances. The cleanup of the job is triggered (in [Dispatcher:681|https://github.com/apache/flink/blob/ee912b55f9405a41c391f942636b83c6f2c968f3/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java#L681]) as soon as the result of the {{JobManagerRunner}} is completed (which happens in [JobMasterServiceLeadershipRunner#onJobCompletion:384|https://github.com/apac
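The ordering argument above can be replayed step by step. A minimal model, assuming status lookups go to the runner registry first and fall back to the `ExecutionGraphInfoStore` once the runner is gone, as the comment describes; `DispatcherStatusLookupSketch` and its maps are hypothetical simplifications, not Flink's `Dispatcher`, and the asynchronous `ioExecutor` and any concurrent callers are not modelled.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

class DispatcherStatusLookupSketch {
    // Hypothetical, heavily simplified stores keyed by job ID and holding only a status.
    private final Map<String, String> runnerRegistry = new HashMap<>();
    private final Map<String, String> executionGraphInfoStore = new HashMap<>();
    private final Map<String, String> dirtyJobResults = new HashMap<>();

    /** Status lookup as described: runner registry first, ExecutionGraphInfoStore after. */
    Optional<String> requestJobStatus(String jobId) {
        String fromRegistry = runnerRegistry.get(jobId);
        if (fromRegistry != null) {
            return Optional.of(fromRegistry);
        }
        return Optional.ofNullable(executionGraphInfoStore.get(jobId));
    }

    /** Replays the described order and records the status visible after each step. */
    static List<String> replay(String jobId) {
        DispatcherStatusLookupSketch d = new DispatcherStatusLookupSketch();
        List<String> seen = new ArrayList<>();
        d.runnerRegistry.put(jobId, "RUNNING"); // job is running
        seen.add(d.requestJobStatus(jobId).orElse("UNKNOWN"));
        d.runnerRegistry.put(jobId, "FINISHED"); // onJobCompletion: runner result completes
        seen.add(d.requestJobStatus(jobId).orElse("UNKNOWN"));
        d.executionGraphInfoStore.put(jobId, "FINISHED"); // jobReachedTerminalState
        seen.add(d.requestJobStatus(jobId).orElse("UNKNOWN"));
        d.dirtyJobResults.put(jobId, "FINISHED"); // dirty JobResultStore entry
        seen.add(d.requestJobStatus(jobId).orElse("UNKNOWN"));
        d.runnerRegistry.remove(jobId); // global cleanup removes the runner first
        seen.add(d.requestJobStatus(jobId).orElse("UNKNOWN"));
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(replay("job-1"));
        // prints [RUNNING, FINISHED, FINISHED, FINISHED, FINISHED]
    }
}
```

Under this sequential model, once FINISHED is visible the status never falls back to RUNNING, because the store is written before the runner is removed; a snapshot taken concurrently with these steps is outside what the model covers.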
[jira] [Comment Edited] (FLINK-32069) jobClient.getJobStatus() can return status RUNNING for finished insert operation
[ https://issues.apache.org/jira/browse/FLINK-32069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17722820#comment-17722820 ] Matthias Pohl edited comment on FLINK-32069 at 5/15/23 3:24 PM: Thanks for sharing your train of thought here, [~izeren]. Don't be overwhelmed by the following lines; I'm writing them down for documentation purposes. It would be good if you could verify my findings. {quote}My line of thinking was that job result future is returned based on registry and job status based on result store and it is not in sync same way as it used to be. {quote} The {{JobManagerRunnerRegistry}} holds the {{JobManagerRunner}} instances. The cleanup of the job is triggered (in [Dispatcher:681|https://github.com/apache/flink/blob/ee912b55f9405a41c391f942636b83c6f2c968f3/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java#L681]) as soon as the result of the {{JobManagerRunner}} is completed (which happens in [JobMasterServiceLeadershipRunner#onJobCompletion:384|https://github.com/apache/flink/blob/4882fbd9744d456e09ca60b6c7cf7a5b60326c73/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunner.java#L384]). At that moment, the {{JobManagerRunnerRegistry}} is the source of truth when it comes to the {{JobStatus}}. The next step is writing the {{ExecutionGraphInfo}} of the terminated job into the {{ExecutionGraphInfoStore}} in [Dispatcher#jobReachedTerminalState:1334|https://github.com/apache/flink/blob/ee912b55f9405a41c391f942636b83c6f2c968f3/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java#L1334].
After that, we create a dirty entry in the {{JobResultStore}} in [Dispatcher#jobReachedTerminalState:1347|https://github.com/apache/flink/blob/ee912b55f9405a41c391f942636b83c6f2c968f3/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java#L1347], which runs the operation asynchronously in the {{ioExecutor}} (see [Dispatcher#registerGloballyTerminatedJobInJobResultStore:1365|https://github.com/apache/flink/blob/ee912b55f9405a41c391f942636b83c6f2c968f3/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java#L1365]). The future is forwarded to [Dispatcher#runJob:678|https://github.com/apache/flink/blob/ee912b55f9405a41c391f942636b83c6f2c968f3/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java#L678], where it triggers the {{Dispatcher#globalResourceCleaner}} through [Dispatcher#removeJob:1245|https://github.com/apache/flink/blob/ee912b55f9405a41c391f942636b83c6f2c968f3/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java#L1245]. We now have the job's status stored in the {{JobManagerRunnerRegistry}}, the {{ExecutionGraphInfoStore}}, and the {{JobResultStore}}, all based on the same {{ExecutionGraph}}. Any job status call should still rely on the {{JobManagerRunnerRegistry}}, though. The {{Dispatcher#globalResourceCleaner}} is configured in [DispatcherResourceCleanerFactory#createGlobalResourceCleaner:106ff|https://github.com/apache/flink/blob/c5352fc55972420ed5bf1afdfd97834540b1407a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/DispatcherResourceCleanerFactory.java#L106]. It removes the job from the {{JobManagerRunnerRegistry}} before cleaning up all the other artifacts. This cleanup procedure is triggered after the {{JobResultStore}} has the job's dirty entry written to disk/memory (and consequently also after the {{ExecutionGraphInfo}} has been persisted in the {{ExecutionGraphInfoStore}}).
After the cleanup, the job's {{JobManagerRunner}} is no longer present in the {{JobManagerRunnerRegistry}}. From that moment on, the {{Dispatcher}} relies on the {{ExecutionGraphInfoStore}}, which holds the same {{JobStatus}} as the {{JobManagerRunnerRegistry}} did.
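The ordering described above can be modelled roughly as follows. This is a minimal sketch under heavy simplifying assumptions: plain maps stand in for the JobManagerRunnerRegistry, the ExecutionGraphInfoStore and the JobResultStore, and the names DispatcherStatusSketch and SketchJobStatus are hypothetical, not Flink classes. It illustrates why a status lookup that checks the registry first and falls back to the archive never misses the job, as long as the archive is written before the registry entry is removed.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;

/** Hypothetical stand-in for Flink's JobStatus; only the values this sketch needs. */
enum SketchJobStatus { RUNNING, FINISHED }

/** Hypothetical, heavily simplified model of the Dispatcher's termination steps. */
class DispatcherStatusSketch {
    // Stand-in for JobManagerRunnerRegistry: jobs whose runner is still registered.
    final Map<String, SketchJobStatus> runnerRegistry = new ConcurrentHashMap<>();
    // Stand-in for ExecutionGraphInfoStore: archived status of terminated jobs.
    final Map<String, SketchJobStatus> graphInfoStore = new ConcurrentHashMap<>();
    // Stand-in for JobResultStore: jobs with a dirty (not yet cleaned up) result entry.
    final Map<String, SketchJobStatus> dirtyResults = new ConcurrentHashMap<>();

    /** Status lookup: prefer the runner registry, fall back to the archive. */
    Optional<SketchJobStatus> requestJobStatus(String jobId) {
        SketchJobStatus live = runnerRegistry.get(jobId);
        return live != null ? Optional.of(live) : Optional.ofNullable(graphInfoStore.get(jobId));
    }

    /** Applies the steps in the order described in the comment above. */
    CompletableFuture<Void> onJobReachedTerminalState(
            String jobId, SketchJobStatus terminalStatus, Executor ioExecutor) {
        // The runner's result is complete; the registry now reports the terminal status.
        runnerRegistry.put(jobId, terminalStatus);
        // Step 1: archive the ExecutionGraphInfo synchronously.
        graphInfoStore.put(jobId, terminalStatus);
        // Step 2: write the dirty JobResultStore entry asynchronously on the I/O executor.
        return CompletableFuture.runAsync(() -> dirtyResults.put(jobId, terminalStatus), ioExecutor)
                // Step 3: only afterwards does the global cleanup remove the runner from the registry.
                .thenRun(() -> runnerRegistry.remove(jobId));
    }
}
```

Because step 1 happens before step 3, every lookup finds the job either in the registry or in the archive, both holding the same terminal status. Swapping those two steps would open a window in which the lookup returns nothing.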
[jira] [Comment Edited] (FLINK-32069) jobClient.getJobStatus() can return status RUNNING for finished insert operation
[ https://issues.apache.org/jira/browse/FLINK-32069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17722820#comment-17722820 ] Matthias Pohl edited comment on FLINK-32069 at 5/15/23 3:23 PM: Thanks for sharing your train of thought here, [~izeren]. Don't be overwhelmed by the following lines. I'm writing it down for documentation purposes. It would be good if you could verify my findings. {quote}My line of thinking was that job result future is returned based on registry and job status based on result store and it is not in sync same way as it used to be. {quote} The {{JobManagerRunnerRegistry}} holds the {{JobManagerRunner}} instances. The cleanup of the job is triggered (in [Dispatcher:681|https://github.com/apache/flink/blob/ee912b55f9405a41c391f942636b83c6f2c968f3/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java#L681]) as soon as the result of the {{JobManagerRunner}} is completed (which happens in [JobMasterServiceLeadershipRunner#onJobCompletion:384|https://github.com/apache/flink/blob/4882fbd9744d456e09ca60b6c7cf7a5b60326c73/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunner.java#L384]). At that moment, the {{JobManagerRunnerRegistry}} is the source of truth for the {{JobStatus}}. The next step is writing the {{ExecutionGraphInfo}} of the terminated job into the {{ExecutionGraphInfoStore}} in [Dispatcher#jobReachedTerminalState:1334|https://github.com/apache/flink/blob/ee912b55f9405a41c391f942636b83c6f2c968f3/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java#L1334]. 
After that, we create a dirty entry in the {{JobResultStore}} in [Dispatcher#jobReachedTerminalState:1347|https://github.com/apache/flink/blob/ee912b55f9405a41c391f942636b83c6f2c968f3/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java#L1347], which runs the operation asynchronously in the {{ioExecutor}} (see [Dispatcher#registerGloballyTerminatedJobInJobResultStore:1365|https://github.com/apache/flink/blob/ee912b55f9405a41c391f942636b83c6f2c968f3/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java#L1365]). The future is forwarded to [Dispatcher#runJob:678|https://github.com/apache/flink/blob/ee912b55f9405a41c391f942636b83c6f2c968f3/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java#L678], where it triggers the {{Dispatcher#globalResourceCleaner}} through [Dispatcher#removeJob:1245|https://github.com/apache/flink/blob/ee912b55f9405a41c391f942636b83c6f2c968f3/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java#L1245]. At this point, the job's status is stored in the {{JobManagerRunnerRegistry}}, the {{ExecutionGraphInfoStore}}, and the {{JobResultStore}}, all persisted based on the same {{ExecutionGraph}}. Job status requests should still rely on the {{JobManagerRunnerRegistry}}. The {{Dispatcher#globalResourceCleaner}} is configured in [DispatcherResourceCleanerFactory#createGlobalResourceCleaner:106ff|https://github.com/apache/flink/blob/c5352fc55972420ed5bf1afdfd97834540b1407a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/DispatcherResourceCleanerFactory.java#L106]. It removes the job from the {{JobManagerRunnerRegistry}} before cleaning up all the other artifacts. This cleanup procedure is triggered after the {{JobResultStore}} has written the job's dirty entry to disk/memory. After the cleanup, the job's {{JobManagerRunner}} is no longer present in the {{JobManagerRunnerRegistry}}. 
From that moment on, the {{Dispatcher}} relies on the {{ExecutionGraphInfoStore}}, which holds the same {{JobStatus}} as the {{JobManagerRunnerRegistry}} did. was (Author: mapohl): work-in-progress > jobClient.getJobStatus() can return status RUNNING for finished insert > operation > > > Key: FLINK-32069 > URL: https://issues.apache.org/jira/browse/FLINK-32069 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Affects Versions: 1.16.1, 1.15.4 >Reporter: Aleksandr Iushmanov >Priority: Major > > Using zeppelin with remote cluster I came across some race condition issue > leading to failed expectations for SQL insert operations. > > Below is an example of zeppelin code that is failing because > jobClient.getJobStatus() returns running even after job has finished. I have > verified that same failover can happen if I use > jobClient.getJobExecutionResult().get() (Job execution result is: "Program > execution finished" but job status is not consistently finished) > {code:java} > TableResult tableResult = ((TableEnvironmentInter
[jira] [Commented] (FLINK-32069) jobClient.getJobStatus() can return status RUNNING for finished insert operation
[ https://issues.apache.org/jira/browse/FLINK-32069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17722820#comment-17722820 ] Matthias Pohl commented on FLINK-32069: --- Thanks for sharing your train of thought here, [~izeren]. Don't be overwhelmed by the following lines. I'm writing it down for documentation purposes. {quote} My line of thinking was that job result future is returned based on registry and job status based on result store and it is not in sync same way as it used to be. {quote} The {{JobManagerRunnerRegistry}} holds the {{JobManagerRunner}} instances. The cleanup of the job is triggered (in [Dispatcher:681|https://github.com/apache/flink/blob/ee912b55f9405a41c391f942636b83c6f2c968f3/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java#L681]) as soon as the result of the {{JobManagerRunner}} is completed (which happens in [JobMasterServiceLeadershipRunner#onJobCompletion:384|https://github.com/apache/flink/blob/4882fbd9744d456e09ca60b6c7cf7a5b60326c73/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunner.java#L384]). At that moment, the {{JobManagerRunnerRegistry}} is the source of truth for the {{JobStatus}}. The next step is writing the {{ExecutionGraphInfo}} of the terminated job into the {{ExecutionGraphInfoStore}} in [Dispatcher#jobReachedTerminalState:1334|https://github.com/apache/flink/blob/ee912b55f9405a41c391f942636b83c6f2c968f3/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java#L1334]. 
After that, we create a dirty entry in the {{JobResultStore}} in [Dispatcher#jobReachedTerminalState:1347|https://github.com/apache/flink/blob/ee912b55f9405a41c391f942636b83c6f2c968f3/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java#L1347], which runs the operation asynchronously in the {{ioExecutor}} (see [Dispatcher#registerGloballyTerminatedJobInJobResultStore:1365|https://github.com/apache/flink/blob/ee912b55f9405a41c391f942636b83c6f2c968f3/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java#L1365]). The future is forwarded to [Dispatcher#runJob:678|https://github.com/apache/flink/blob/ee912b55f9405a41c391f942636b83c6f2c968f3/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java#L678], where it triggers the {{Dispatcher#globalResourceCleaner}} through [Dispatcher#removeJob:1245|https://github.com/apache/flink/blob/ee912b55f9405a41c391f942636b83c6f2c968f3/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java#L1245]. The > jobClient.getJobStatus() can return status RUNNING for finished insert > operation > > > Key: FLINK-32069 > URL: https://issues.apache.org/jira/browse/FLINK-32069 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Affects Versions: 1.16.1, 1.15.4 >Reporter: Aleksandr Iushmanov >Priority: Major > > Using zeppelin with remote cluster I came across some race condition issue > leading to failed expectations for SQL insert operations. > > Below is an example of zeppelin code that is failing because > jobClient.getJobStatus() returns running even after job has finished. 
I have > verified that same failover can happen if I use > jobClient.getJobExecutionResult().get() (Job execution result is: "Program > execution finished" but job status is not consistently finished) > {code:java} > TableResult tableResult = ((TableEnvironmentInternal) > tbenv).executeInternal(operations); > checkState(tableResult.getJobClient().isPresent()); > try { > tableResult.await(); > JobClient jobClient = tableResult.getJobClient().get(); > if (jobClient.getJobStatus().get() == JobStatus.FINISHED) { > context.out.write("Insertion successfully.\n"); > } else { > throw new IOException("Job is failed, " + > jobClient.getJobExecutionResult().get().toString()); > } > } catch (InterruptedException e) { > throw new IOException("Flink job is interrupted", e); > } catch (ExecutionException e) { > throw new IOException("Flink job is failed", e); > } {code} > ZeppelinCode: > [https://github.com/apache/zeppelin/blob/master/flink/flink1.15-shims/src/main/java/org/apache/zeppelin/flink/Flink115SqlInterpreter.java#L384] > I suspect that job status is returned based on runningJobsRegistry and since > 1.15 this registry is not updated with FINISHED status prior to job result > future completion, see this change: {{JobMasterServiceLeadershipRunner.java}} > [https://github.com/apache/flink/pull/18189/files#diff-3eb433f18b85c0f5329a4b312a219583189d777fe9bdd547f1114f4a22989f8bL387] > > > It looks like as race
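The reporter's snippet reads the job status once, right after await(). As a client-side mitigation only (not the fix for FLINK-32069), a caller could re-query the status a bounded number of times until it is terminal. A minimal sketch, assuming a local ClientJobStatus enum in place of Flink's JobStatus (which, to my knowledge, provides isGloballyTerminalState() for the same check) and a Supplier standing in for something like jobClient::getJobStatus; TerminalStatusPoller and its parameters are hypothetical.

```java
import java.time.Duration;
import java.util.EnumSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

/** Local stand-in for Flink's JobStatus so the sketch compiles without Flink. */
enum ClientJobStatus { RUNNING, FINISHED, FAILED, CANCELED }

/** Hypothetical helper that tolerates a briefly stale status after the result is available. */
final class TerminalStatusPoller {
    private static final Set<ClientJobStatus> TERMINAL =
            EnumSet.of(ClientJobStatus.FINISHED, ClientJobStatus.FAILED, ClientJobStatus.CANCELED);

    private TerminalStatusPoller() {}

    /**
     * Queries the status until it is terminal or maxAttempts queries have been made.
     * Returns the last observed status; on interruption, restores the flag and returns early.
     */
    static ClientJobStatus awaitTerminalStatus(
            Supplier<CompletableFuture<ClientJobStatus>> statusQuery,
            int maxAttempts,
            Duration pause) {
        ClientJobStatus status = statusQuery.get().join();
        for (int attempt = 1; attempt < maxAttempts && !TERMINAL.contains(status); attempt++) {
            try {
                Thread.sleep(pause.toMillis());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return status;
            }
            status = statusQuery.get().join();
        }
        return status;
    }
}
```

Bounding the attempts keeps a genuinely long-running job from blocking the caller forever; the caller still has to decide what a non-terminal result after the last attempt means.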
[jira] [Created] (FLINK-32102) Aggregate multiple pendingRecords metric per source if present
Maximilian Michels created FLINK-32102: -- Summary: Aggregate multiple pendingRecords metric per source if present Key: FLINK-32102 URL: https://issues.apache.org/jira/browse/FLINK-32102 Project: Flink Issue Type: Improvement Components: Autoscaler, Kubernetes Operator Affects Versions: kubernetes-operator-1.4.0 Reporter: Maximilian Michels Assignee: Maximilian Michels Fix For: kubernetes-operator-1.6.0 Some sources expose multiple {{.pendingRecords}} metrics. If that is the case, we must sum up these metrics to yield the correct internal pending records count. -- This message was sent by Atlassian Jira (v8.20.10#820010)
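The aggregation requested in FLINK-32102 amounts to grouping metrics by source and summing every value whose name ends in .pendingRecords. A minimal sketch, assuming metrics arrive as a map from source vertex ID to (metric name to value); PendingRecordsAggregator and the metric names in the test are hypothetical, not the operator's actual autoscaler code.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper: sums all ".pendingRecords" metrics reported for each source. */
final class PendingRecordsAggregator {
    private static final String SUFFIX = ".pendingRecords";

    private PendingRecordsAggregator() {}

    /**
     * @param metricsPerSource source vertex ID -> (metric name -> value)
     * @return source vertex ID -> summed pending records, only for sources reporting at least one
     */
    static Map<String, Double> sumPendingRecords(Map<String, Map<String, Double>> metricsPerSource) {
        Map<String, Double> result = new HashMap<>();
        metricsPerSource.forEach((source, metrics) ->
                metrics.forEach((name, value) -> {
                    if (name.endsWith(SUFFIX)) {
                        // merge() adds to any value already recorded for this source.
                        result.merge(source, value, Double::sum);
                    }
                }));
        return result;
    }
}
```

Taking only one of several matching metrics would under-report backlog for such sources, which is the miscount the ticket describes.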
[jira] [Comment Edited] (FLINK-32069) jobClient.getJobStatus() can return status RUNNING for finished insert operation
[ https://issues.apache.org/jira/browse/FLINK-32069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17722820#comment-17722820 ] Matthias Pohl edited comment on FLINK-32069 at 5/15/23 3:15 PM: work-in-progress
[jira] [Closed] (FLINK-31609) Fatal error in ResourceManager caused YARNSessionFIFOSecuredITCase.testDetachedMode to fail
[ https://issues.apache.org/jira/browse/FLINK-31609?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gabor Somogyi closed FLINK-31609. - > Fatal error in ResourceManager caused > YARNSessionFIFOSecuredITCase.testDetachedMode to fail > --- > > Key: FLINK-31609 > URL: https://issues.apache.org/jira/browse/FLINK-31609 > Project: Flink > Issue Type: Bug > Components: Deployment / YARN >Affects Versions: 1.18.0 >Reporter: Matthias Pohl >Assignee: Ferenc Csaky >Priority: Critical > Labels: pull-request-available, test-stability > Fix For: 1.18.0 > > > This looks like FLINK-30908. I created a follow-up ticket because we reached > a new minor version. > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=47547&view=logs&j=fc5181b0-e452-5c8f-68de-1097947f6483&t=995c650b-6573-581c-9ce6-7ad4cc038461 > {code} > Mar 24 09:32:29 2023-03-24 09:31:50,001 ERROR > org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl [] - > Exception on heartbeat > Mar 24 09:32:29 java.io.InterruptedIOException: Interrupted waiting to send > RPC request to server > Mar 24 09:32:29 java.io.InterruptedIOException: Interrupted waiting to send > RPC request to server > Mar 24 09:32:29 at org.apache.hadoop.ipc.Client.call(Client.java:1461) > ~[hadoop-common-2.10.2.jar:?] > Mar 24 09:32:29 at org.apache.hadoop.ipc.Client.call(Client.java:1403) > ~[hadoop-common-2.10.2.jar:?] > Mar 24 09:32:29 at > org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:230) > ~[hadoop-common-2.10.2.jar:?] > Mar 24 09:32:29 at > org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:118) > ~[hadoop-common-2.10.2.jar:?] > Mar 24 09:32:29 at com.sun.proxy.$Proxy33.allocate(Unknown Source) > ~[?:?] > Mar 24 09:32:29 at > org.apache.hadoop.yarn.api.impl.pb.client.ApplicationMasterProtocolPBClientImpl.allocate(ApplicationMasterProtocolPBClientImpl.java:77) > ~[hadoop-yarn-common-2.10.2.jar:?] 
> Mar 24 09:32:29 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native > Method) ~[?:1.8.0_292] > Mar 24 09:32:29 at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > ~[?:1.8.0_292] > Mar 24 09:32:29 at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > ~[?:1.8.0_292] > Mar 24 09:32:29 at java.lang.reflect.Method.invoke(Method.java:498) > ~[?:1.8.0_292] > Mar 24 09:32:29 at > org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:433) > ~[hadoop-common-2.10.2.jar:?] > Mar 24 09:32:29 at > org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:166) > ~[hadoop-common-2.10.2.jar:?] > Mar 24 09:32:29 at > org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:158) > ~[hadoop-common-2.10.2.jar:?] > Mar 24 09:32:29 at > org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:96) > ~[hadoop-common-2.10.2.jar:?] > Mar 24 09:32:29 at > org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:362) > ~[hadoop-common-2.10.2.jar:?] > Mar 24 09:32:29 at com.sun.proxy.$Proxy34.allocate(Unknown Source) > ~[?:?] > Mar 24 09:32:29 at > org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl.allocate(AMRMClientImpl.java:297) > ~[hadoop-yarn-client-2.10.2.jar:?] > Mar 24 09:32:29 at > org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl$HeartbeatThread.run(AMRMClientAsyncImpl.java:274) > [hadoop-yarn-client-2.10.2.jar:?] > Mar 24 09:32:29 Caused by: java.lang.InterruptedException > Mar 24 09:32:29 at > java.util.concurrent.FutureTask.awaitDone(FutureTask.java:404) ~[?:1.8.0_292] > Mar 24 09:32:29 at > java.util.concurrent.FutureTask.get(FutureTask.java:191) ~[?:1.8.0_292] > Mar 24 09:32:29 at > org.apache.hadoop.ipc.Client$Connection.sendRpcRequest(Client.java:1177) > ~[hadoop-common-2.10.2.jar:?] 
> Mar 24 09:32:29 at org.apache.hadoop.ipc.Client.call(Client.java:1456) > ~[hadoop-common-2.10.2.jar:?] > Mar 24 09:32:29 ... 17 more > {code}
[jira] [Resolved] (FLINK-31609) Fatal error in ResourceManager caused YARNSessionFIFOSecuredITCase.testDetachedMode to fail
[ https://issues.apache.org/jira/browse/FLINK-31609?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gabor Somogyi resolved FLINK-31609. --- Fix Version/s: 1.18.0 Resolution: Fixed 4415c5150eda071b219db5532c359ca29730a378 on master
[GitHub] [flink] gaborgsomogyi merged pull request #22550: [FLINK-31609][yarn][test] Extend log whitelist for expected AMRM heartbeat interrupt
gaborgsomogyi merged PR #22550: URL: https://github.com/apache/flink/pull/22550 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
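PR #22550 addresses the failure by whitelisting the expected AMRM heartbeat interrupt in the test's log check. The idea can be sketched as: flag a log line as unexpected only if it looks like an error and no whitelist entry matches it. This is a minimal, hypothetical illustration; the markers, the patterns and LogWhitelistSketch itself are assumptions, and the actual whitelist in Flink's YARN test base and the exact entries added by the PR may differ.

```java
import java.util.List;
import java.util.regex.Pattern;

/** Hypothetical sketch of a log check with a whitelist for expected, harmless errors. */
final class LogWhitelistSketch {
    // Markers that make a log line suspicious (assumed for illustration).
    private static final List<String> SUSPICIOUS_MARKERS = List.of("ERROR", "Exception");

    // Hypothetical whitelist entries for the expected AMRM heartbeat interrupt during shutdown.
    private static final List<Pattern> WHITELIST = List.of(
            Pattern.compile(Pattern.quote("Exception on heartbeat")),
            Pattern.compile(Pattern.quote(
                    "java.io.InterruptedIOException: Interrupted waiting to send RPC request to server")));

    private LogWhitelistSketch() {}

    /** Returns true if the line looks like an error and no whitelist entry covers it. */
    static boolean isUnexpected(String logLine) {
        boolean suspicious = SUSPICIOUS_MARKERS.stream().anyMatch(logLine::contains);
        return suspicious && WHITELIST.stream().noneMatch(p -> p.matcher(logLine).find());
    }
}
```

Quoting the patterns keeps the whitelist entries literal, so characters such as "." or "$" in class names cannot accidentally widen what is accepted.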
[GitHub] [flink] tweise commented on a diff in pull request #22556: [FLINK-32030][sql-client] Add URLs support for SQL Client gateway mode
tweise commented on code in PR #22556: URL: https://github.com/apache/flink/pull/22556#discussion_r1193954446 ## flink-core/src/main/java/org/apache/flink/util/NetUtils.java: ## @@ -120,6 +120,18 @@ private static URL validateHostPortString(String hostPort) { } } +/** + * Converts an InetSocketAddress to a URL. This method assigns the "http://" schema to the URL Review Comment: So the existing implementation does not support https at all? I know that you are going to add support for the https scheme later; I just wanted to clarify the current state for host:port.
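The conversion under review, turning an InetSocketAddress into a URL with a fixed http scheme, can be sketched with the JDK alone. This is a minimal sketch, not the actual NetUtils code from the PR; SocketAddressToUrl and toHttpUrl are hypothetical names. Going through java.net.URI takes care of bracketing IPv6 literals, and getHostString() avoids a reverse DNS lookup.

```java
import java.net.InetSocketAddress;
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;

/** Hypothetical helper illustrating an InetSocketAddress -> http:// URL conversion. */
final class SocketAddressToUrl {
    private SocketAddressToUrl() {}

    static URL toHttpUrl(InetSocketAddress address) {
        try {
            // The scheme is fixed to "http", mirroring the Javadoc under review; https is not covered.
            return new URI("http", null, address.getHostString(), address.getPort(), null, null, null)
                    .toURL();
        } catch (URISyntaxException | MalformedURLException e) {
            throw new IllegalArgumentException("Cannot convert " + address + " to a URL", e);
        }
    }
}
```

Making the scheme a parameter instead of a constant would be the natural extension point for the https support mentioned in the review.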
[jira] [Comment Edited] (FLINK-32069) jobClient.getJobStatus() can return status RUNNING for finished insert operation
[ https://issues.apache.org/jira/browse/FLINK-32069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17722691#comment-17722691 ] Matthias Pohl edited comment on FLINK-32069 at 5/15/23 2:42 PM: Thanks for documenting this issue in Jira, [~izeren]. Here are my findings so far: I struggle to find a connection between the {{RunningJobsRegistry}} and the {{getJobStatus}} call of the client (which calls {{Dispatcher.requestJobStatus}} in the end). [~izeren] is right in claiming that we slightly modified the code when removing the {{RunningJobsRegistry}} in [JobMasterServiceLeadershipRunner:385ff|https://github.com/apache/flink/commit/01b14fc4b9a9487a144f515bb7d4f6ad14cbe013#diff-3eb433f18b85c0f5329a4b312a219583189d777fe9bdd547f1114f4a22989f8bL385-L395]. Previously, the job was marked as done through the {{RunningJobsRegistry}} before the {{JobMasterServiceLeadershipRunner#resultFuture}} was completed. In the current code, we mark the job as completed through the {{JobResultStore}} after completing {{JobMasterServiceLeadershipRunner#resultFuture}}. My issue, though, is that we're not relying on the {{RunningJobsRegistry}} in any way for the {{Dispatcher#requestJob}} call. The {{RunningJobsRegistry}} was only used for leader recovery in [JobMasterServiceLeadershipRunner.verifyJobSchedulingStatusAndCreateJobMasterServiceProcess:272ff|https://github.com/apache/flink/commit/01b14fc4b9a9487a144f515bb7d4f6ad14cbe013#diff-3eb433f18b85c0f5329a4b312a219583189d777fe9bdd547f1114f4a22989f8bL272-L278] and when submitting a job through [Dispatcher#isInGloballyTerminalState:375ff|https://github.com/apache/flink/pull/18189/files#diff-a4b690fb2c4975d25b05eb4161617af0d704a85ff7b1cad19d3c817c12f1e29cL375] in {{Dispatcher#submitJob}}. 
I would have expected to find {{Dispatcher#requestJobStatus}} somewhere in the call hierarchy of {{RunningJobsRegistry#getJobSchedulingStatus}} if it had an influence on {{Dispatcher#requestJobStatus}}. I'm not saying that [~izeren]'s conclusion is wrong. It just doesn't match my findings in the code. It could also be that I'm missing a code path here. [~dmvk], do you have something to add?
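The ordering change discussed above (marking the job as done before versus after completing the result future) matters for anything that is triggered by that future. A minimal sketch of the general effect, not a claim about the root cause, which the comment leaves open: CompletionOrderSketch is hypothetical, and the status strings stand in for whichever store records completion. A dependent action registered on a not-yet-completed CompletableFuture runs on the thread that calls complete(), so it sees only the writes made before that call.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical illustration of "mark done" vs "complete the result future" ordering. */
final class CompletionOrderSketch {
    private CompletionOrderSketch() {}

    /** Returns the status that a callback registered on the result future observes. */
    static String observedByCallback(boolean markDoneBeforeCompleting) {
        AtomicReference<String> status = new AtomicReference<>("RUNNING");
        AtomicReference<String> observed = new AtomicReference<>();
        CompletableFuture<Void> resultFuture = new CompletableFuture<>();
        // Registered before completion, so it runs synchronously inside complete().
        resultFuture.thenRun(() -> observed.set(status.get()));

        if (markDoneBeforeCompleting) {
            status.set("DONE");          // old ordering: mark done first
            resultFuture.complete(null);
        } else {
            resultFuture.complete(null); // new ordering: complete first
            status.set("DONE");
        }
        return observed.get();
    }
}
```

observedByCallback(true) returns "DONE", while observedByCallback(false) returns "RUNNING". Whether Dispatcher#requestJobStatus actually reads such a store is exactly what the comment questions.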
[jira] [Assigned] (FLINK-24696) Translate how to configure unaligned checkpoints into Chinese
[ https://issues.apache.org/jira/browse/FLINK-24696?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Piotr Nowojski reassigned FLINK-24696: -- Assignee: lijie > Translate how to configure unaligned checkpoints into Chinese > - > > Key: FLINK-24696 > URL: https://issues.apache.org/jira/browse/FLINK-24696 > Project: Flink > Issue Type: Improvement > Components: chinese-translation, Documentation >Affects Versions: 1.14.1, 1.15.0 >Reporter: Piotr Nowojski >Assignee: lijie >Priority: Not a Priority > > As part of FLINK-24695 > {{docs/content/docs/ops/state/checkpointing_under_backpressure.md}} and > {{docs/content/docs/dev/datastream/fault-tolerance/checkpointing.md}} were > modified. Those modifications should be translated into Chinese
[jira] [Commented] (FLINK-24696) Translate how to configure unaligned checkpoints into Chinese
[ https://issues.apache.org/jira/browse/FLINK-24696?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17722802#comment-17722802 ] Piotr Nowojski commented on FLINK-24696: Sure! Thanks :)
[GitHub] [flink] pnowojski commented on a diff in pull request #22584: [FLINK-31963][state] Fix rescaling bug in recovery from unaligned checkpoints.
pnowojski commented on code in PR #22584: URL: https://github.com/apache/flink/pull/22584#discussion_r1193934653 ## flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperationTest.java: ## @@ -785,6 +789,130 @@ public void testOnlyUpstreamChannelStateAssignment() } } +/** FLINK-31963: Tests rescaling for stateless operators and upstream result partition state. */ +@Test +public void testOnlyUpstreamChannelRescaleStateAssignment() +throws JobException, JobExecutionException { +Random random = new Random(); +OperatorSubtaskState upstreamOpState = +OperatorSubtaskState.builder() +.setResultSubpartitionState( +new StateObjectCollection<>( +asList( + createNewResultSubpartitionStateHandle(10, random), + createNewResultSubpartitionStateHandle( +10, random +.build(); +testOnlyUpstreamOrDownstreamRescalingInternal(upstreamOpState, null, 5, 7); +} + +/** FLINK-31963: Tests rescaling for stateless operators and downstream input channel state. */ +@Test +public void testOnlyDownstreamChannelRescaleStateAssignment() +throws JobException, JobExecutionException { +Random random = new Random(); +OperatorSubtaskState downstreamOpState = +OperatorSubtaskState.builder() +.setInputChannelState( +new StateObjectCollection<>( +asList( + createNewInputChannelStateHandle(10, random), + createNewInputChannelStateHandle(10, random +.build(); +testOnlyUpstreamOrDownstreamRescalingInternal(null, downstreamOpState, 5, 5); +} + +private void testOnlyUpstreamOrDownstreamRescalingInternal( +@Nullable OperatorSubtaskState upstreamOpState, +@Nullable OperatorSubtaskState downstreamOpState, +int expectedUpstreamCount, +int expectedDownstreamCount) +throws JobException, JobExecutionException { + +if ((upstreamOpState == null && downstreamOpState == null) +|| (upstreamOpState != null && downstreamOpState != null)) { +// Either upstream or downstream state must exist, but not both. 
+return; +} + +// Start from parallelism 5 for both operators +int upstreamParallelism = 5; +int downstreamParallelism = 5; + +// Build states +List operatorIds = buildOperatorIds(2); +Map states = new HashMap<>(); +OperatorState upstreamState = +new OperatorState(operatorIds.get(0), upstreamParallelism, MAX_P); +OperatorState downstreamState = +new OperatorState(operatorIds.get(1), downstreamParallelism, MAX_P); + +states.put(operatorIds.get(0), upstreamState); +states.put(operatorIds.get(1), downstreamState); + +if (upstreamOpState != null) { +upstreamState.putState(0, upstreamOpState); +// rescale downstream 5 -> 3 +downstreamParallelism = 3; +} + +if (downstreamOpState != null) { +downstreamState.putState(0, downstreamOpState); +// rescale upstream 5 -> 3 +upstreamParallelism = 3; +} + +List opIdWithParallelism = new ArrayList<>(2); +opIdWithParallelism.add( +new OperatorIdWithParallelism(operatorIds.get(0), upstreamParallelism)); +opIdWithParallelism.add( +new OperatorIdWithParallelism(operatorIds.get(1), downstreamParallelism)); + +Map vertices = +buildVertices(opIdWithParallelism, RANGE, ROUND_ROBIN); + +// Run state assignment +new StateAssignmentOperation(0, new HashSet<>(vertices.values()), states, false) +.assignStates(); + +// Check results +ExecutionJobVertex upstreamExecutionJobVertex = vertices.get(operatorIds.get(0)); +ExecutionJobVertex downstreamExecutionJobVertex = vertices.get(operatorIds.get(1)); + +List upstreamRescalingDescriptors = +getRescalingDescriptorsFromVertex(upstreamExecutionJobVertex); +List downstreamRescalingDescriptors = + getRescalingDescriptorsFromVertex(downstreamExecutionJobVertex); + +checkMappings( +upstreamRescalingDescriptors, +TaskStateSnapshot::getOutputRescalingDescriptor, +expectedUpstreamCount); + +checkMappings( +downstreamRescaling
[GitHub] [flink] StefanRRichter commented on a diff in pull request #22584: [FLINK-31963][state] Fix rescaling bug in recovery from unaligned checkpoints.
StefanRRichter commented on code in PR #22584: URL: https://github.com/apache/flink/pull/22584#discussion_r1193925009 ## flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java: ## @@ -136,19 +136,24 @@ public void assignStates() { // repartition state for (TaskStateAssignment stateAssignment : vertexAssignments.values()) { -if (stateAssignment.hasNonFinishedState) { +if (stateAssignment.hasNonFinishedState +// FLINK-31963: We need to run repartitioning for stateless operators that have +// upstream output or downstream input states. +|| stateAssignment.hasUpstreamOutputStates() +|| stateAssignment.hasDownstreamInputStates()) { assignAttemptState(stateAssignment); } } // actually assign the state for (TaskStateAssignment stateAssignment : vertexAssignments.values()) { -// If upstream has output states, even the empty task state should be assigned for the -// current task in order to notify this task that the old states will send to it which -// likely should be filtered. +// If upstream has output states or downstream has input states, even the empty task +// state should be assigned for the current task in order to notify this task that the +// old states will send to it which likely should be filtered. if (stateAssignment.hasNonFinishedState || stateAssignment.isFullyFinished -|| stateAssignment.hasUpstreamOutputStates()) { +|| stateAssignment.hasUpstreamOutputStates() +|| stateAssignment.hasDownstreamInputStates()) { Review Comment: I was considering that, and then I saw that most of the heavy work for the functions is already cached. Do you think it's still worth it? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
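For readers following the diff, the two loop conditions changed in `assignStates()` can be read as two predicates over per-vertex flags. The sketch below is a simplified, hypothetical model: `AssignmentFlags`, `needsRepartition` and `needsStateAssignment` are illustrative names, not Flink API, and the class only restates the boolean logic from the diff so the effect of the new `hasDownstreamInputStates()` term is easy to see.

```java
/**
 * Hypothetical model of the per-vertex flags checked in StateAssignmentOperation#assignStates.
 * Field names mirror the diff; this is not the real TaskStateAssignment class.
 */
final class AssignmentFlags {
    final boolean hasNonFinishedState;
    final boolean isFullyFinished;
    final boolean hasUpstreamOutputStates;
    final boolean hasDownstreamInputStates;

    AssignmentFlags(
            boolean hasNonFinishedState,
            boolean isFullyFinished,
            boolean hasUpstreamOutputStates,
            boolean hasDownstreamInputStates) {
        this.hasNonFinishedState = hasNonFinishedState;
        this.isFullyFinished = isFullyFinished;
        this.hasUpstreamOutputStates = hasUpstreamOutputStates;
        this.hasDownstreamInputStates = hasDownstreamInputStates;
    }

    /** First loop: stateless vertices with channel state on either side are repartitioned too. */
    boolean needsRepartition() {
        return hasNonFinishedState || hasUpstreamOutputStates || hasDownstreamInputStates;
    }

    /** Second loop: assign (possibly empty) state so the task knows recovered data may need filtering. */
    boolean needsStateAssignment() {
        return hasNonFinishedState
                || isFullyFinished
                || hasUpstreamOutputStates
                || hasDownstreamInputStates;
    }
}
```

Under this reading, a stateless vertex whose only relevant state is recovered input-channel data on the downstream side is now both repartitioned and assigned, which is the FLINK-31963 case named in the diff's comment.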
[jira] [Comment Edited] (FLINK-32047) Fix args in JobSpec not being passed through to Flink in Standalone mode - 1.4.0
[ https://issues.apache.org/jira/browse/FLINK-32047?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17722785#comment-17722785 ] Gil Shmaya edited comment on FLINK-32047 at 5/15/23 2:26 PM: - We run with 1.14.6, sorry for the confusion. I see the same error while trying to run your test with 1.14.6. However, I can't explain why the args are passed with operator version 1.2.0 but not with 1.4.0. Is it possible that there was a change in the way we should configure the args in 1.4.0? was (Author: JIRAUSER299921): We run with 1.14.6, sorry for the confusion. I see the same error while trying to run your test with 1.14.6. However, I can't find an explanation for the fact we see the args pass in the version 1.2.0 operator version but not with 1.4.0. > Fix args in JobSpec not being passed through to Flink in Standalone mode - > 1.4.0 > > > Key: FLINK-32047 > URL: https://issues.apache.org/jira/browse/FLINK-32047 > Project: Flink > Issue Type: Bug > Components: Kubernetes Operator >Reporter: Gil Shmaya >Priority: Major > Fix For: 1.14.0 > > Attachments: image-2023-04-30-18-54-22-291.png, > image-2023-04-30-19-56-30-150.png, image-2023-04-30-19-56-57-680.png > > > This issue is related to a previously fixed bug in version 1.2.0 - > FLINK-29388 > I have noticed that while the args are successfully being passed when using > version 1.2.0, this is not the case with version 1.4.0. > {+}Scenario{+}: > I added a log that prints the argument array length at the beginning of the > main function of the flink job: > !image-2023-04-30-18-54-22-291.png! > The result when running with 1.2.0: > !image-2023-04-30-19-56-30-150.png! > The result when running with 1.4.0: > !image-2023-04-30-19-56-57-680.png!
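The diagnostic from the scenario (logging the argument count at the start of the job's `main`) can be sketched as below. This is a minimal, hypothetical example: `ArgsProbe` and `describe` are illustrative names, and it prints to standard output rather than using the job's logger, so the actual job setup is left as a placeholder comment.

```java
import java.util.Arrays;

/** Hypothetical job entry point that reports the arguments it received before doing anything else. */
final class ArgsProbe {
    /** Formats the argument count and values, e.g. for a log line at the top of main(). */
    static String describe(String[] args) {
        return "Received " + args.length + " argument(s): " + Arrays.toString(args);
    }

    public static void main(String[] args) {
        // Print first, so the output shows exactly what the deployment passed in.
        System.out.println(describe(args));
        // ... build and execute the actual Flink job here ...
    }
}
```

Comparing this line between the two operator versions shows directly whether `args` arrive empty, independent of what the job does with them afterwards.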
[jira] [Comment Edited] (FLINK-32047) Fix args in JobSpec not being passed through to Flink in Standalone mode - 1.4.0
[ https://issues.apache.org/jira/browse/FLINK-32047?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17722785#comment-17722785 ] Gil Shmaya edited comment on FLINK-32047 at 5/15/23 2:25 PM: - We run with 1.14.6, sorry for the confusion. I see the same error while trying to run your test with 1.14.6. However, I can't explain why the args are passed with operator version 1.2.0 but not with 1.4.0. was (Author: JIRAUSER299921): We run with 1.14.6, sorry for the confusion.
[GitHub] [flink] rkhachatryan commented on a diff in pull request #22584: [FLINK-31963][state] Fix rescaling bug in recovery from unaligned checkpoints.
rkhachatryan commented on code in PR #22584: URL: https://github.com/apache/flink/pull/22584#discussion_r1193902234 ## flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java: ## @@ -136,19 +136,24 @@ public void assignStates() { // repartition state for (TaskStateAssignment stateAssignment : vertexAssignments.values()) { -if (stateAssignment.hasNonFinishedState) { +if (stateAssignment.hasNonFinishedState +// FLINK-31963: We need to run repartitioning for stateless operators that have +// upstream output or downstream input states. +|| stateAssignment.hasUpstreamOutputStates() +|| stateAssignment.hasDownstreamInputStates()) { assignAttemptState(stateAssignment); } } // actually assign the state for (TaskStateAssignment stateAssignment : vertexAssignments.values()) { -// If upstream has output states, even the empty task state should be assigned for the -// current task in order to notify this task that the old states will send to it which -// likely should be filtered. +// If upstream has output states or downstream has input states, even the empty task +// state should be assigned for the current task in order to notify this task that the +// old states will send to it which likely should be filtered. if (stateAssignment.hasNonFinishedState || stateAssignment.isFullyFinished -|| stateAssignment.hasUpstreamOutputStates()) { +|| stateAssignment.hasUpstreamOutputStates() +|| stateAssignment.hasDownstreamInputStates()) { Review Comment: Should we compute `hasUpstreamOutputStates` and `hasDownstreamInputStates` once, similar to `isFullyFinished`? (that would at least be consistent, and also faster)
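One way to read this suggestion is to evaluate both flags once, when the assignment object is built, and keep them in final fields, the same way `isFullyFinished` is read as a field in the diff. The sketch below is hypothetical: `CachedAssignment` and its constructor parameters are illustrative and not the real `TaskStateAssignment` constructor, and the two predicates stand in for whatever per-subtask check the real methods perform.

```java
import java.util.List;
import java.util.function.Predicate;

/**
 * Hypothetical sketch: the two channel-state flags are computed once in the constructor and then
 * read as plain fields, instead of re-running the check in every loop of assignStates().
 */
final class CachedAssignment<S> {
    final boolean hasUpstreamOutputStates;
    final boolean hasDownstreamInputStates;

    CachedAssignment(
            List<S> upstreamSubtaskStates,
            List<S> downstreamSubtaskStates,
            Predicate<S> hasOutputChannelState,
            Predicate<S> hasInputChannelState) {
        // Evaluated exactly once per assignment object; later reads are plain field accesses.
        this.hasUpstreamOutputStates = upstreamSubtaskStates.stream().anyMatch(hasOutputChannelState);
        this.hasDownstreamInputStates = downstreamSubtaskStates.stream().anyMatch(hasInputChannelState);
    }
}
```

Whether this pays off depends on how expensive the underlying checks are; as StefanRRichter's reply in this thread points out, most of that work may already be cached, so the main gain could be consistency rather than speed.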
[GitHub] [flink] rkhachatryan commented on a diff in pull request #22584: [FLINK-31963][state] Fix rescaling bug in recovery from unaligned checkpoints.
rkhachatryan commented on code in PR #22584: URL: https://github.com/apache/flink/pull/22584#discussion_r1193908276 ## flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperationTest.java: ## @@ -1029,6 +1192,15 @@ private JobVertex createJobVertex( return jobVertex; } +private List getRescalingDescriptorsFromVertex( Review Comment: `getTaskStateSnapshotFromVertex`? ## flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java: ## @@ -136,19 +136,24 @@ public void assignStates() { // repartition state for (TaskStateAssignment stateAssignment : vertexAssignments.values()) { -if (stateAssignment.hasNonFinishedState) { +if (stateAssignment.hasNonFinishedState +// FLINK-31963: We need to run repartitioning for stateless operators that have +// upstream output or downstream input states. +|| stateAssignment.hasUpstreamOutputStates() +|| stateAssignment.hasDownstreamInputStates()) { assignAttemptState(stateAssignment); } } // actually assign the state for (TaskStateAssignment stateAssignment : vertexAssignments.values()) { -// If upstream has output states, even the empty task state should be assigned for the -// current task in order to notify this task that the old states will send to it which -// likely should be filtered. +// If upstream has output states or downstream has input states, even the empty task +// state should be assigned for the current task in order to notify this task that the +// old states will send to it which likely should be filtered. if (stateAssignment.hasNonFinishedState || stateAssignment.isFullyFinished -|| stateAssignment.hasUpstreamOutputStates()) { +|| stateAssignment.hasUpstreamOutputStates() +|| stateAssignment.hasDownstreamInputStates()) { Review Comment: Should we compute once `hasUpstreamOutputStates` and `hasDownstreamInputStates` similar to `isFullyFinished`? 
(that would at least be consistent, and also faster)
[jira] [Comment Edited] (FLINK-32069) jobClient.getJobStatus() can return status RUNNING for finished insert operation
[ https://issues.apache.org/jira/browse/FLINK-32069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17722691#comment-17722691 ] Matthias Pohl edited comment on FLINK-32069 at 5/15/23 2:05 PM: Thanks for documenting this issue in Jira, [~izeren]. Here are my findings so far: I struggle to find a connection between the {{RunningJobRegistry}} and the {{getJobStatus}} call of the client (which calls {{Dispatcher.requestJobStatus}} in the end). [~izeren] is right in claiming that we slightly modified the code when removing the {{RunningJobRegistry}} in [JobMasterServiceLeadershipRunner:385ff|https://github.com/apache/flink/commit/01b14fc4b9a9487a144f515bb7d4f6ad14cbe013#diff-3eb433f18b85c0f5329a4b312a219583189d777fe9bdd547f1114f4a22989f8bL385-L395]. Previously, the job was marked as done through the {{{}RunningJobsRegistry{}}} before {{JobMasterServiceLeadershipRunner#resultFuture}} was completed. In the current code, we mark the job as completed through the {{{}JobResultStore{}}} after completing {{JobMasterServiceLeadershipRunner#resultFuture}}. My issue, though, is that we're not relying on the {{RunningJobsRegistry}} in any way for the {{Dispatcher#requestJob}} call. The {{RunningJobsRegistry}} was only used for leader recovery in [JobMasterServiceLeadershipRunner.verifyJobSchedulingStatusAndCreateJobMasterServiceProcess:272ff|https://github.com/apache/flink/commit/01b14fc4b9a9487a144f515bb7d4f6ad14cbe013#diff-3eb433f18b85c0f5329a4b312a219583189d777fe9bdd547f1114f4a22989f8bL272-L278] and when submitting a job through [Dispatcher#isInGloballyTerminalState:375ff|https://github.com/apache/flink/pull/18189/files#diff-a4b690fb2c4975d25b05eb4161617af0d704a85ff7b1cad19d3c817c12f1e29cL375] in {{{}Dispatcher#submitJob{}}}.
If the registry had an influence on {{{}Dispatcher#requestJobStatus{}}}, I would have expected to find {{Dispatcher#requestJobStatus}} somewhere in the call hierarchy of {{{}RunningJobsRegistry#getJobSchedulingStatus{}}}. I don't want to say that [~izeren]'s conclusion is wrong, yet. It just doesn't match my findings in the code. It could also be that I'm missing a code path here. [~dmvk], do you have something to add? was (Author: mapohl): Thanks for documenting this issue in Jira, [~izeren]. Here are my findings so far: I struggle to find a connection between the {{RunningJobRegistry}} and the {{getJobStatus}} call of the client (which calls {{Dispatcher.requestJobStatus}} in the end. [~izeren] is right with claiming that we did a slight modification of the code when removing the {{RunningJobRegistry}} in [JobMasterServiceLeadershipRunner:385ff|https://github.com/apache/flink/commit/01b14fc4b9a9487a144f515bb7d4f6ad14cbe013#diff-3eb433f18b85c0f5329a4b312a219583189d777fe9bdd547f1114f4a22989f8bL385-L395]. Marking this job as done happened before completing the {{JobMasterServiceLeadershipRunner#resultFuture}} through the {{{}RunningJobsRegistry{}}}. In the current code, we mark the job as completed after completing {{JobMasterServiceLeadershipRunner#resultFuture}} through the {{{}JobResultStore{}}}. My issue is, though, that we're not relying on the {{RunningJobsRegistry}} in any way for the {{Dispatcher#requestJob}} call.
The {{RunningJobsRegistry}} was only used for leader recovery in [JobMasterServiceLeadershipRunner.verifyJobSchedulingStatusAndCreateJobMasterServiceProcess:272ff|https://github.com/apache/flink/commit/01b14fc4b9a9487a144f515bb7d4f6ad14cbe013#diff-3eb433f18b85c0f5329a4b312a219583189d777fe9bdd547f1114f4a22989f8bL272-L278] and when submitting a job through [Dispatcher#isInGloballyTerminalState:375ff|https://github.com/apache/flink/pull/18189/files#diff-a4b690fb2c4975d25b05eb4161617af0d704a85ff7b1cad19d3c817c12f1e29cL375] in {{{}Dispatcher#submitJob{}}}. I would have expected that we would find {{Dispatcher#requestJobStatus}} somewhere in the call hierarchy of {{{}RunningJobsRegistry#getJobSchedulingStatus{}}}, if it would have had an influence on {{{}Dispatcher#requestJobStatus{}}}. I don't want to say that [~izeren]'s conclusion is wrong, yet. It just doesn't match my findings in the code. I could perfectly be that I'm missing a codepath here. [~dmvk], do you have something to add?
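The ordering question raised here (updating the job's bookkeeping before versus after completing the result future) can be illustrated in isolation. The sketch below is a hypothetical model, not Flink code: a map stands in for whichever store a status query reads (the comment above questions whether `RunningJobsRegistry` is actually that store for `requestJobStatus`), and a dependent stage on the result future plays the role of a client reacting to job completion. Non-async `CompletableFuture` stages registered before completion run in the thread that calls `complete`, so the outcome here is deterministic.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Hypothetical model, not Flink code: an observer chained on the result future reads a status
 * map. Whether it sees RUNNING or FINISHED depends only on the order of the two updates.
 */
final class OrderingDemo {
    enum Status { RUNNING, FINISHED }

    static Status statusSeenOnCompletion(boolean markFinishedBeforeCompletingFuture) {
        Map<String, Status> statusStore = new ConcurrentHashMap<>();
        statusStore.put("job", Status.RUNNING);
        CompletableFuture<String> resultFuture = new CompletableFuture<>();

        // Registered before completion, so it runs synchronously inside complete().
        CompletableFuture<Status> seen = resultFuture.thenApply(result -> statusStore.get("job"));

        if (markFinishedBeforeCompletingFuture) {
            statusStore.put("job", Status.FINISHED); // bookkeeping first, then the future
            resultFuture.complete("done");
        } else {
            resultFuture.complete("done"); // future first: the observer still sees RUNNING
            statusStore.put("job", Status.FINISHED);
        }
        return seen.join();
    }
}
```

In a real cluster the reader would typically be a separate REST request rather than a chained stage, so the window is a race instead of a certainty, which matches the report that the issue is hard to reproduce.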
[jira] [Commented] (FLINK-32101) FlinkKafkaInternalProducerITCase.testInitTransactionId test failed
[ https://issues.apache.org/jira/browse/FLINK-32101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17722790#comment-17722790 ] Matthias Pohl commented on FLINK-32101: --- [~tanyuxin], this looks like a Kafka-internal instability, doesn't it? We're collecting this kind of error under the umbrella issue FLINK-31145. I'm going to link FLINK-31145 in case it is related. > FlinkKafkaInternalProducerITCase.testInitTransactionId test failed > -- > > Key: FLINK-32101 > URL: https://issues.apache.org/jira/browse/FLINK-32101 > Project: Flink > Issue Type: Bug > Components: Connectors / Kafka >Affects Versions: 1.18.0 >Reporter: Yuxin Tan >Priority: Major > Labels: test > > FlinkKafkaInternalProducerITCase.testInitTransactionId test failed. > Caused by: org.apache.kafka.common.KafkaException: Unexpected error in > InitProducerIdResponse; The request timed out. > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=48990&view=logs&j=aa18c3f6-13b8-5f58-86bb-c1cffb239496&t=502fb6c0-30a2-5e49-c5c2-a00fa3acb203&l=22973 > {code:java} > Caused by: org.apache.kafka.common.KafkaException: > org.apache.kafka.common.KafkaException: Unexpected error in > InitProducerIdResponse; The request timed out.
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) > at > sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) > at > sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) > at java.lang.reflect.Constructor.newInstance(Constructor.java:423) > at > java.util.concurrent.ForkJoinTask.getThrowableException(ForkJoinTask.java:593) > at > java.util.concurrent.ForkJoinTask.reportException(ForkJoinTask.java:677) > at java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:735) > at > java.util.stream.ForEachOps$ForEachOp.evaluateParallel(ForEachOps.java:159) > at > java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateParallel(ForEachOps.java:173) > at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233) > at > java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485) > at > java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:650) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.abortTransactions(FlinkKafkaProducer.java:1290) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initializeState(FlinkKafkaProducer.java:1216) > at > org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:189) > at > org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:171) > at > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:95) > at > org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:122) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:274) > at > org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106) > at > 
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:747) > at > org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.call(StreamTaskActionExecutor.java:100) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:722) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:688) > at > org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952) > at > org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:921) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) > at java.lang.Thread.run(Thread.java:748) > Caused by: org.apache.kafka.common.KafkaException: Unexpected error in > InitProducerIdResponse; The request timed out. > at > org.apache.kafka.clients.producer.internals.TransactionManager$InitProducerIdHandler.handleResponse(TransactionManager.java:1418) > at > org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1322) > at > org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109) > at > org.apache.kafka.clients.NetworkClient.
[jira] [Commented] (FLINK-32069) jobClient.getJobStatus() can return status RUNNING for finished insert operation
[ https://issues.apache.org/jira/browse/FLINK-32069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17722788#comment-17722788 ] Aleksandr Iushmanov commented on FLINK-32069: - I can provide more details on the code path used in my tests. On the Zeppelin side, the call goes ClusterClientJobClientAdapter -> RestClusterClient -> getJobStatus -> getJobDetails; getJobDetails is a REST API call to /jobs/:jobid. On the Flink cluster side, this request is handled by JobOwerviewHandler and processed by Dispatcher::requestMultipleJobDetails. This method requests info on both running and completed jobs. For completed jobs, it relies on the ExecutionGraphInfoStore; in my case that is FileExecutionGraphInfoStore::getAvailableJobDetails(), which, if I get it right, is the local file-based jobDetailCache. This cache is updated in Dispatcher::jobReachedTerminalState. For the job result, the call goes to /jobs/:jobid/execution-result: Dispatcher::requestJobResult checks whether the job is in the jobManagerRunnerRegistry; if it is, it returns the result future of the job instance in the registry, otherwise it picks up the result from the executionGraphInfoStore (which should be the same source as for the status). My line of thinking was that the job result future is returned based on the registry and the job status based on the result store, and that the two are no longer in sync the way they used to be. I am not insisting, though, that the bug is definitely there. It could also be in InsertResultProvider. I was originally looking at the TableResult implementation and noticed that the "firstRow" logic is not new. Also, for insert operations you get "Program execution finished" when you request the result, so I would expect the job to have status finished in this case (as per the documentation).
But it is true that the problem could also be on the ResultProvider side (if it doesn't ensure that the job status is finished) > jobClient.getJobStatus() can return status RUNNING for finished insert > operation > > > Key: FLINK-32069 > URL: https://issues.apache.org/jira/browse/FLINK-32069 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Affects Versions: 1.16.1, 1.15.4 >Reporter: Aleksandr Iushmanov >Priority: Major > > Using zeppelin with remote cluster I came across some race condition issue > leading to failed expectations for SQL insert operations. > > Below is an example of zeppelin code that is failing because > jobClient.getJobStatus() returns running even after job has finished. I have > verified that same failover can happen if I use > jobClient.getJobExecutionResult().get() (Job execution result is: "Program > execution finished" but job status is not consistently finished) > {code:java} > TableResult tableResult = ((TableEnvironmentInternal) > tbenv).executeInternal(operations); > checkState(tableResult.getJobClient().isPresent()); > try { > tableResult.await(); > JobClient jobClient = tableResult.getJobClient().get(); > if (jobClient.getJobStatus().get() == JobStatus.FINISHED) { > context.out.write("Insertion successfully.\n"); > } else { > throw new IOException("Job is failed, " + > jobClient.getJobExecutionResult().get().toString()); > } > } catch (InterruptedException e) { > throw new IOException("Flink job is interrupted", e); > } catch (ExecutionException e) { > throw new IOException("Flink job is failed", e); > } {code} > ZeppelinCode: > [https://github.com/apache/zeppelin/blob/master/flink/flink1.15-shims/src/main/java/org/apache/zeppelin/flink/Flink115SqlInterpreter.java#L384] > I suspect that job status is returned based on runningJobsRegistry and since > 1.15 this registry is not updated with FINISHED status prior to job result > future completion, see this change: {{JobMasterServiceLeadershipRunner.java}} >
[https://github.com/apache/flink/pull/18189/files#diff-3eb433f18b85c0f5329a4b312a219583189d777fe9bdd547f1114f4a22989f8bL387] > > > It looks like a race condition that is hard to reproduce on a lightweight > setup. I was reproducing this by running a Zeppelin notebook against a remote Flink > cluster and triggering an SQL insert operation. If I find a smaller setup to > reproduce it on a small local cluster with a lightweight client, I will update this > ticket when I have more input. I am open to suggestions on how to fix this. > > For Zeppelin I have a separate ticket because Flink 1.15 is not going to be > fixed, but this issue, if I understand it correctly, should be common to all > versions starting with 1.15; therefore, it makes sense to address it starting with > 1.16. https://issues.apache.org/jira/browse/ZEPPELIN-5909 > > [~mapohl], thank you for the assistance in Slack. I have created this ticket to > back our conversation, could you please add your thoughts o
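Until the root cause is clear, a client that needs a terminal status once the result is available could re-query the status a bounded number of times instead of trusting the first answer. This is a hedged sketch of such a client-side workaround under stated assumptions: `StatusPolling`, `awaitTerminalStatus` and the local `Status` enum are hypothetical stand-ins. In real Flink code, the supplier would presumably wrap `JobClient#getJobStatus()` (which returns a `CompletableFuture<JobStatus>`), and the terminal check would use `JobStatus#isGloballyTerminalState()`.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

/** Hypothetical client-side helper: re-query the job status until it is terminal or attempts run out. */
final class StatusPolling {
    /** Local stand-in for org.apache.flink.api.common.JobStatus (only the values needed here). */
    enum Status {
        RUNNING(false), FINISHED(true), FAILED(true), CANCELED(true);

        final boolean terminal;

        Status(boolean terminal) {
            this.terminal = terminal;
        }
    }

    /**
     * Returns the first terminal status seen, or the last observed status if none was terminal
     * within maxAttempts (null if maxAttempts is not positive).
     */
    static Status awaitTerminalStatus(
            Supplier<CompletableFuture<Status>> statusQuery, int maxAttempts, Duration pause) {
        Status last = null;
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            last = statusQuery.get().join();
            if (last.terminal) {
                return last;
            }
            if (attempt + 1 < maxAttempts) {
                try {
                    Thread.sleep(pause.toMillis());
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // preserve the interrupt and give up
                    return last;
                }
            }
        }
        return last;
    }
}
```

Bounding the attempts keeps the caller from hanging if the status really never becomes terminal; the caller still has to decide what a non-terminal last status means.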
[jira] [Commented] (FLINK-32047) Fix args in JobSpec not being passed through to Flink in Standalone mode - 1.4.0
[ https://issues.apache.org/jira/browse/FLINK-32047?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17722785#comment-17722785 ] Gil Shmaya commented on FLINK-32047: We run with 1.14.6, sorry for the confusion.
[GitHub] [flink] XComp commented on a diff in pull request #22380: [FLINK-31773][runtime] Separate DefaultLeaderElectionService.start(LeaderContender) into two separate methods for starting the driver
XComp commented on code in PR #22380: URL: https://github.com/apache/flink/pull/22380#discussion_r1193867705 ## flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderBase.java: ## @@ -80,6 +84,20 @@ public Throwable getError() { return error == null ? errorQueue.poll() : error; } +/** + * Method for exposing errors that were caught during the test execution and need to be exposed + * within the test. + */ +public void throwErrorIfPresent() throws Throwable { +if (error != null) { +throw error; +} + +if (!errorQueue.isEmpty()) { +throw errorQueue.poll(); +} +} Review Comment: fair point. That reduces the diff for `DefaultLeaderElectionServiceTest`. :+1: I wrapped it into an `AssertionError` now.
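A minimal sketch of what wrapping the collected error into an `AssertionError` could look like, so that test methods can call the helper without declaring `throws Throwable`. Apart from `throwErrorIfPresent`, the names here (`ErrorCollectingTestHelper`, `handleError`, `setError`) are hypothetical, and the single `poll()` replaces the `isEmpty()`/`poll()` pair from the diff so that the emptiness check and the removal happen in one step.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

/** Hypothetical, simplified error collector in the spirit of TestingLeaderBase (not the real class). */
class ErrorCollectingTestHelper {
    private volatile Throwable error;
    private final Queue<Throwable> errorQueue = new ConcurrentLinkedQueue<>();

    /** Called from callbacks (possibly on other threads) to record a non-fatal error. */
    void handleError(Throwable t) {
        errorQueue.offer(t);
    }

    /** Records an error that takes precedence over queued ones. */
    void setError(Throwable t) {
        error = t;
    }

    /**
     * Rethrows the first recorded error wrapped in an AssertionError, so test methods do not need
     * to declare "throws Throwable".
     */
    void throwErrorIfPresent() {
        final Throwable cause = error != null ? error : errorQueue.poll();
        if (cause != null) {
            throw new AssertionError("An error was reported during the test execution.", cause);
        }
    }
}
```

Keeping the original throwable as the cause preserves its stack trace in the test report.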
[GitHub] [flink] XComp commented on a diff in pull request #22380: [FLINK-31773][runtime] Separate DefaultLeaderElectionService.start(LeaderContender) into two separate methods for starting the driver
XComp commented on code in PR #22380: URL: https://github.com/apache/flink/pull/22380#discussion_r1193866384 ## flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java: ## @@ -270,65 +390,73 @@ public void onLeaderInformationChange(LeaderInformation leaderInformation) { runInLeaderEventThread(() -> onLeaderInformationChangeInternal(leaderInformation)); } +@GuardedBy("lock") private void onLeaderInformationChangeInternal(LeaderInformation leaderInformation) { -synchronized (lock) { -if (running) { -LOG.trace( -"Leader node changed while {} is the leader with session ID {}. New leader information {}.", -leaderContender.getDescription(), -confirmedLeaderInformation.getLeaderSessionID(), -leaderInformation); -if (!confirmedLeaderInformation.isEmpty()) { -final LeaderInformation confirmedLeaderInfo = this.confirmedLeaderInformation; -if (leaderInformation.isEmpty()) { -LOG.debug( -"Writing leader information by {} since the external storage is empty.", -leaderContender.getDescription()); - leaderElectionDriver.writeLeaderInformation(confirmedLeaderInfo); -} else if (!leaderInformation.equals(confirmedLeaderInfo)) { -// the data field does not correspond to the expected leader information -LOG.debug( -"Correcting leader information by {}.", -leaderContender.getDescription()); - leaderElectionDriver.writeLeaderInformation(confirmedLeaderInfo); -} +if (leaderContender != null) { +LOG.trace( +"Leader node changed while {} is the leader with {}. 
New leader information {}.", +leaderContender.getDescription(), + LeaderElectionUtils.convertToString(confirmedLeaderInformation), +LeaderElectionUtils.convertToString(leaderInformation)); +if (!confirmedLeaderInformation.isEmpty()) { +final LeaderInformation confirmedLeaderInfo = this.confirmedLeaderInformation; +if (leaderInformation.isEmpty()) { +LOG.debug( +"Writing leader information by {} since the external storage is empty.", +leaderContender.getDescription()); + leaderElectionDriver.writeLeaderInformation(confirmedLeaderInfo); +} else if (!leaderInformation.equals(confirmedLeaderInfo)) { +// the data field does not correspond to the expected leader information +LOG.debug( +"Correcting leader information by {}.", +leaderContender.getDescription()); + leaderElectionDriver.writeLeaderInformation(confirmedLeaderInfo); } -} else { -LOG.debug( -"Ignoring change notification since the {} has already been closed.", -leaderElectionDriver); } +} else { +LOG.debug( +"Ignoring change notification since the {} has already been stopped.", +leaderElectionDriver); } } private void runInLeaderEventThread(Runnable callback) { -if (running) { +if (leadershipOperationExecutor != null) { Review Comment: No, it's not: I missed that one (even with passing over the PR last week -.-). I added a lock around the execute call and added the `@GuardedBy` annotation to the `leadershipOperationExecutor`. Additionally, I removed the `volatile` from `leaderElectionDriver`. That seems to have survived previous changes and is not necessary anymore.
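The updated `runInLeaderEventThread` is not part of the quoted diff. A minimal sketch of the pattern described ("a lock around the execute call" plus `@GuardedBy` on `leadershipOperationExecutor`) might look like the following. `LeaderEventDispatcher` is a hypothetical, simplified class rather than `DefaultLeaderElectionService`, and the JSR-305 annotation is replaced by a comment so the example compiles with the JDK alone.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical, simplified model of the locking pattern; not the actual Flink class.
class LeaderEventDispatcher implements AutoCloseable {
    private final Object lock = new Object();

    // Guarded by "lock" (Flink annotates this with @GuardedBy("lock")).
    // null means: not started yet, or already closed.
    private ExecutorService leadershipOperationExecutor;

    void start() {
        synchronized (lock) {
            leadershipOperationExecutor = Executors.newSingleThreadExecutor();
        }
    }

    /** Returns true if the callback was handed to the executor, false if it was dropped. */
    boolean runInLeaderEventThread(Runnable callback) {
        synchronized (lock) {
            // Checking and using the field under the same lock closes the race in which
            // close() clears and shuts down the executor between the null check and execute().
            if (leadershipOperationExecutor != null) {
                leadershipOperationExecutor.execute(callback);
                return true;
            }
            return false;
        }
    }

    @Override
    public void close() throws InterruptedException {
        final ExecutorService executor;
        synchronized (lock) {
            executor = leadershipOperationExecutor;
            leadershipOperationExecutor = null;
        }
        // Shut down outside the lock; already-queued callbacks still run to completion.
        if (executor != null) {
            executor.shutdown();
            executor.awaitTermination(10, TimeUnit.SECONDS);
        }
    }
}
```

Without the lock, a `close()` racing with the null check could leave `execute()` running against a shut-down executor, which would throw a `RejectedExecutionException`.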
[GitHub] [flink] XComp commented on a diff in pull request #22380: [FLINK-31773][runtime] Separate DefaultLeaderElectionService.start(LeaderContender) into two separate methods for starting the driver
XComp commented on code in PR #22380: URL: https://github.com/apache/flink/pull/22380#discussion_r1193863651 ## flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java: ## @@ -32,49 +31,73 @@ import javax.annotation.Nullable; import javax.annotation.concurrent.GuardedBy; +import java.util.List; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; import static org.apache.flink.util.Preconditions.checkNotNull; /** * Default implementation for leader election service. Composed with different {@link * LeaderElectionDriver}, we could perform a leader election for the contender, and then persist the * leader information to various storage. + * + * {@code DefaultLeaderElectionService} handles a single {@link LeaderContender}. */ public class DefaultLeaderElectionService -implements LeaderElectionService, LeaderElectionEventHandler { +implements LeaderElectionService, LeaderElectionEventHandler, AutoCloseable { private static final Logger LOG = LoggerFactory.getLogger(DefaultLeaderElectionService.class); private final Object lock = new Object(); private final LeaderElectionDriverFactory leaderElectionDriverFactory; -/** The leader contender which applies for leadership. */ +/** + * {@code leaderContender} being {@code null} indicates that no {@link LeaderContender} is + * registered that participates in the leader election, yet. See {@link #start(LeaderContender)} + * and {@link #stop()} for lifecycle management. + * + * {@code @Nullable} isn't used here to avoid having multiple warnings spread over this class + * in a supporting IDE. 
+ */ @GuardedBy("lock") -// @Nullable is commented-out to avoid having multiple warnings spread over this class -// this.running=true ensures that leaderContender != null -private volatile LeaderContender leaderContender; +private LeaderContender leaderContender; +/** + * Saves the session ID which was issued by the {@link LeaderElectionDriver} if and only if the + * leadership is acquired by this service. {@code issuedLeaderSessionID} being {@code null} + * indicates that this service isn't the leader right now (i.e. {@link + * #onGrantLeadership(UUID)}) wasn't called, yet (independently of what {@code + * leaderElectionDriver#hasLeadership()} returns). + */ @GuardedBy("lock") @Nullable -private volatile UUID issuedLeaderSessionID; +private UUID issuedLeaderSessionID; +/** + * Saves the leader information for a registered {@link LeaderContender} after this contender + * confirmed the leadership. + */ @GuardedBy("lock") -private volatile LeaderInformation confirmedLeaderInformation; +private LeaderInformation confirmedLeaderInformation; +/** + * {@code leaderElectionDriver} being {@code null} indicates that the connection to the + * LeaderElection backend isn't established, yet. See {@link #startLeaderElectionBackend()} and + * {@link #close()} for lifecycle management. The lifecycle of the driver should have been + * established before registering a {@link LeaderContender} and stopped after the contender has + * been removed. + * + * {@code @Nullable} isn't used here to avoid having multiple warnings spread over this class + * in a supporting IDE. 
+ */ @GuardedBy("lock") -private volatile boolean running; +private volatile LeaderElectionDriver leaderElectionDriver; -// @Nullable is commented-out to avoid having multiple warnings spread over this class -// this.running=true ensures that leaderContender != null -private LeaderElectionDriver leaderElectionDriver; - -private final ExecutorService leadershipOperationExecutor; +@Nullable private ExecutorService leadershipOperationExecutor; Review Comment: Argh, good idea. I should have gone through the `ExecutorService` interface for such a check.
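The field Javadocs in this diff encode lifecycle state as nullness. `leaderElectionDriver == null` means the backend connection is not established, and `leaderContender == null` means no contender is registered. The driver must be running before a contender registers and must stay up until the contender is removed. The sketch below models that ordering. `LifecycleModel`, `Driver`, and `Contender` are hypothetical stand-ins, and enforcing the order with `IllegalStateException` is an assumption: the diff does not show how the real class reacts to misuse.

```java
// Hypothetical, simplified model of the lifecycle described in the field Javadocs;
// method names mirror the diff, but the bodies are illustrative only.
class LifecycleModel implements AutoCloseable {
    interface Driver { void close(); }                // stand-in for LeaderElectionDriver
    interface Contender { String getDescription(); }  // stand-in for LeaderContender

    private final Object lock = new Object();
    private Driver leaderElectionDriver; // null: backend connection not established
    private Contender leaderContender;   // null: no contender registered

    void startLeaderElectionBackend(Driver driver) {
        synchronized (lock) {
            if (leaderElectionDriver != null) throw new IllegalStateException("Backend already started.");
            leaderElectionDriver = driver;
        }
    }

    void start(Contender contender) {
        synchronized (lock) {
            // The driver has to be running before a contender can participate.
            if (leaderElectionDriver == null) throw new IllegalStateException("Backend not started.");
            // The service handles a single contender.
            if (leaderContender != null) throw new IllegalStateException("A contender is already registered.");
            leaderContender = contender;
        }
    }

    void stop() {
        synchronized (lock) {
            leaderContender = null;
        }
    }

    @Override
    public void close() {
        synchronized (lock) {
            // The contender has to be removed before the backend connection is closed.
            if (leaderContender != null) throw new IllegalStateException("Contender still registered.");
            if (leaderElectionDriver != null) {
                leaderElectionDriver.close();
                leaderElectionDriver = null;
            }
        }
    }
}
```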
[GitHub] [flink] StefanRRichter commented on a diff in pull request #22584: [FLINK-31963][state] Fix rescaling bug in recovery from unaligned checkpoints.
StefanRRichter commented on code in PR #22584: URL: https://github.com/apache/flink/pull/22584#discussion_r1193856106 ## flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperationTest.java: ## @@ -785,6 +789,130 @@ public void testOnlyUpstreamChannelStateAssignment() } } +/** FLINK-31963: Tests rescaling for stateless operators and upstream result partition state. */ +@Test +public void testOnlyUpstreamChannelRescaleStateAssignment() +throws JobException, JobExecutionException { +Random random = new Random(); +OperatorSubtaskState upstreamOpState = +OperatorSubtaskState.builder() +.setResultSubpartitionState( +new StateObjectCollection<>( +asList( + createNewResultSubpartitionStateHandle(10, random), + createNewResultSubpartitionStateHandle( +10, random)))) +.build(); +testOnlyUpstreamOrDownstreamRescalingInternal(upstreamOpState, null, 5, 7); +} + +/** FLINK-31963: Tests rescaling for stateless operators and downstream input channel state. */ +@Test +public void testOnlyDownstreamChannelRescaleStateAssignment() +throws JobException, JobExecutionException { +Random random = new Random(); +OperatorSubtaskState downstreamOpState = +OperatorSubtaskState.builder() +.setInputChannelState( +new StateObjectCollection<>( +asList( + createNewInputChannelStateHandle(10, random), + createNewInputChannelStateHandle(10, random)))) +.build(); +testOnlyUpstreamOrDownstreamRescalingInternal(null, downstreamOpState, 5, 5); +} + +private void testOnlyUpstreamOrDownstreamRescalingInternal( +@Nullable OperatorSubtaskState upstreamOpState, +@Nullable OperatorSubtaskState downstreamOpState, +int expectedUpstreamCount, +int expectedDownstreamCount) +throws JobException, JobExecutionException { + +if ((upstreamOpState == null && downstreamOpState == null) +|| (upstreamOpState != null && downstreamOpState != null)) { +// Either upstream or downstream state must exist, but not both.
+return; +} + +// Start from parallelism 5 for both operators +int upstreamParallelism = 5; +int downstreamParallelism = 5; + +// Build states +List operatorIds = buildOperatorIds(2); +Map states = new HashMap<>(); +OperatorState upstreamState = +new OperatorState(operatorIds.get(0), upstreamParallelism, MAX_P); +OperatorState downstreamState = +new OperatorState(operatorIds.get(1), downstreamParallelism, MAX_P); + +states.put(operatorIds.get(0), upstreamState); +states.put(operatorIds.get(1), downstreamState); + +if (upstreamOpState != null) { +upstreamState.putState(0, upstreamOpState); +// rescale downstream 5 -> 3 +downstreamParallelism = 3; +} + +if (downstreamOpState != null) { +downstreamState.putState(0, downstreamOpState); +// rescale upstream 5 -> 3 +upstreamParallelism = 3; +} + +List opIdWithParallelism = new ArrayList<>(2); +opIdWithParallelism.add( +new OperatorIdWithParallelism(operatorIds.get(0), upstreamParallelism)); +opIdWithParallelism.add( +new OperatorIdWithParallelism(operatorIds.get(1), downstreamParallelism)); + +Map vertices = +buildVertices(opIdWithParallelism, RANGE, ROUND_ROBIN); + +// Run state assignment +new StateAssignmentOperation(0, new HashSet<>(vertices.values()), states, false) +.assignStates(); + +// Check results +ExecutionJobVertex upstreamExecutionJobVertex = vertices.get(operatorIds.get(0)); +ExecutionJobVertex downstreamExecutionJobVertex = vertices.get(operatorIds.get(1)); + +List upstreamRescalingDescriptors = +getRescalingDescriptorsFromVertex(upstreamExecutionJobVertex); +List downstreamRescalingDescriptors = + getRescalingDescriptorsFromVertex(downstreamExecutionJobVertex); + +checkMappings( +upstreamRescalingDescriptors, +TaskStateSnapshot::getOutputRescalingDescriptor, +expectedUpstreamCount); + +checkMappings( +downstreamResc
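The guard at the top of `testOnlyUpstreamOrDownstreamRescalingInternal` accepts exactly one non-null state and silently returns otherwise. The sketch below isolates that condition as an XOR over nullness and adds a fail-fast variant. `ExactlyOneGuard` is a hypothetical helper that does not appear in the PR, and whether failing fast is preferable to returning early is a judgment call, not something the review states.

```java
// Hypothetical helper isolating the precondition from the test above; not part of the PR.
final class ExactlyOneGuard {
    private ExactlyOneGuard() {}

    /** True iff exactly one of the two references is non-null. */
    static boolean exactlyOneNonNull(Object upstream, Object downstream) {
        // (a == null) != (b == null) is an XOR over nullness, equivalent to
        // !((a == null && b == null) || (a != null && b != null)).
        return (upstream == null) != (downstream == null);
    }

    /** Fail-fast variant: rejects a misconfigured call instead of silently returning. */
    static void checkExactlyOneNonNull(Object upstream, Object downstream) {
        if (!exactlyOneNonNull(upstream, downstream)) {
            throw new IllegalArgumentException(
                    "Either upstream or downstream state must exist, but not both.");
        }
    }
}
```

A silent `return` lets a misconfigured call pass without asserting anything. In Flink's own test code, `Preconditions.checkArgument(boolean, Object)` would be the natural way to express the fail-fast check.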
[jira] [Commented] (FLINK-32094) startScheduling.BATCH performance regression since May 11th
[ https://issues.apache.org/jira/browse/FLINK-32094?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17722782#comment-17722782 ] Weijie Guo commented on FLINK-32094: master(1.18) via 1a3b539fa43b4e1c8f07accf2d4aa352b7f63858. > startScheduling.BATCH performance regression since May 11th > --- > > Key: FLINK-32094 > URL: https://issues.apache.org/jira/browse/FLINK-32094 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Affects Versions: 1.18.0 >Reporter: Martijn Visser >Assignee: Yuxin Tan >Priority: Blocker > Fix For: 1.18.0 > > Attachments: image-2023-05-14-22-58-00-886.png, > image-2023-05-15-12-33-56-319.png > > > http://codespeed.dak8s.net:8000/timeline/#/?exe=5&ben=startScheduling.BATCH&extr=on&quarts=on&equid=off&env=2&revs=200