[jira] [Commented] (FLINK-24229) [FLIP-171] DynamoDB implementation of Async Sink
[ https://issues.apache.org/jira/browse/FLINK-24229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17566180#comment-17566180 ]

Danny Cranmer commented on FLINK-24229:
---

[~prasaanth07] Flink connectors are being moved to new repositories with dedicated release lifecycles and independent versioning strategies. I plan to propose targeting Flink 1.15+ with this connector, so once it is released you can start using it right away on older Flink versions.

> [FLIP-171] DynamoDB implementation of Async Sink
>
>
> Key: FLINK-24229
> URL: https://issues.apache.org/jira/browse/FLINK-24229
> Project: Flink
> Issue Type: New Feature
> Components: Connectors / Common
> Reporter: Zichen Liu
> Assignee: Yuri Gusev
> Priority: Major
> Labels: pull-request-available, stale-assigned
> Fix For: 1.16.0
>
>
> h2. Motivation
> *User stories:*
> As a Flink user, I’d like to use DynamoDB as a sink for my data pipeline.
> *Scope:*
> * Implement an asynchronous sink for DynamoDB by inheriting the AsyncSinkBase class. The implementation can for now reside in its own module in flink-connectors.
> * Implement an asynchronous sink writer for DynamoDB by extending the AsyncSinkWriter. The implementation must deal with failed requests and retry them using the {{requeueFailedRequestEntry}} method. If possible, the implementation should batch multiple requests (PutRecordsRequestEntry objects) to Firehose for increased throughput. The implemented Sink Writer will be used by the Sink class that will be created as part of this story.
> * Java / code-level docs.
> * End to end testing: add tests that hit a real AWS instance. (How to best donate resources to the Flink project to allow this to happen?)
> h2. References
> More details to be found [https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink]

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
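The scope above asks the writer to batch entries and retry the ones that fail. The following is a minimal, self-contained Java sketch of that batch-and-requeue loop. It deliberately does not use Flink's `AsyncSinkWriter` or the AWS SDK. `BatchingWriterSketch` and its `sendBatch` function are hypothetical stand-ins for the real sink writer and a DynamoDB batch-write call that reports which entries failed.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Function;

/**
 * Hypothetical batch-and-requeue writer (NOT Flink's AsyncSinkWriter API).
 * sendBatch stands in for a DynamoDB batch write and returns the entries that failed.
 */
class BatchingWriterSketch<T> {
    private final Deque<T> buffer = new ArrayDeque<>();
    private final int maxBatchSize;
    private final Function<List<T>, List<T>> sendBatch;

    BatchingWriterSketch(int maxBatchSize, Function<List<T>, List<T>> sendBatch) {
        this.maxBatchSize = maxBatchSize;
        this.sendBatch = sendBatch;
    }

    /** Buffers an entry and sends a batch once enough entries have accumulated. */
    void write(T entry) {
        buffer.addLast(entry);
        if (buffer.size() >= maxBatchSize) {
            flushOnce();
        }
    }

    /** Sends up to maxBatchSize entries; failed ones go back to the head of the buffer. */
    void flushOnce() {
        List<T> batch = new ArrayList<>();
        while (!buffer.isEmpty() && batch.size() < maxBatchSize) {
            batch.add(buffer.pollFirst());
        }
        if (batch.isEmpty()) {
            return;
        }
        List<T> failed = sendBatch.apply(batch);
        // Iterate backwards so the failed entries keep their relative order at the head.
        for (int i = failed.size() - 1; i >= 0; i--) {
            buffer.addFirst(failed.get(i));
        }
    }

    int pending() {
        return buffer.size();
    }
}
```

Requeuing at the head, rather than the tail, means failed entries are retried before newer ones, which keeps ordering closer to arrival order. A real implementation would also need back-off and a bound on retries.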
[GitHub] [flink] trushev commented on pull request #20261: [FLINK-28530] Improvement of extraction of conditions that can be pushed into join inputs
trushev commented on PR #20261:
URL: https://github.com/apache/flink/pull/20261#issuecomment-1182786958

@flinkbot run azure

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] trushev commented on pull request #20261: [FLINK-28530] Improvement of extraction of conditions that can be pushed into join inputs
trushev commented on PR #20261:
URL: https://github.com/apache/flink/pull/20261#issuecomment-1182786906

`CassandraConnectorITCase` failed on CI, but it shouldn't have been affected by these changes, and it works fine locally. Triggering the pipeline again.
[GitHub] [flink-kubernetes-operator] morhidi commented on a diff in pull request #311: [FLINK-28479] Add metrics for resource lifecycle state transitions
morhidi commented on code in PR #311:
URL: https://github.com/apache/flink-kubernetes-operator/pull/311#discussion_r919663834

##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/LifecycleMetrics.java:
##
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.metrics.lifecycle;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
+import org.apache.flink.kubernetes.operator.crd.AbstractFlinkResource;
+import org.apache.flink.kubernetes.operator.metrics.KubernetesOperatorMetricGroup;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.metrics.DescriptiveStatisticsHistogram;
+
+import lombok.RequiredArgsConstructor;
+import lombok.ToString;
+
+import java.time.Clock;
+import java.time.Instant;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Function;
+
+import static org.apache.flink.kubernetes.operator.metrics.lifecycle.ResourceLifecycleState.CREATED;
+import static org.apache.flink.kubernetes.operator.metrics.lifecycle.ResourceLifecycleState.DEPLOYED;
+import static org.apache.flink.kubernetes.operator.metrics.lifecycle.ResourceLifecycleState.ROLLED_BACK;
+import static org.apache.flink.kubernetes.operator.metrics.lifecycle.ResourceLifecycleState.ROLLING_BACK;
+import static org.apache.flink.kubernetes.operator.metrics.lifecycle.ResourceLifecycleState.STABLE;
+import static org.apache.flink.kubernetes.operator.metrics.lifecycle.ResourceLifecycleState.SUSPENDED;
+import static org.apache.flink.kubernetes.operator.metrics.lifecycle.ResourceLifecycleState.UPGRADING;
+
+/**
+ * Utility for tracking resource lifecycle metrics globally and per namespace.
+ *
+ * @param Flink resource type.
+ */
+public class LifecycleMetrics> {
+
+    private static final String TRANSITION_FIRST_DEPLOYMENT = "FirstDeployment";
+    private static final String TRANSITION_RESUME = "Resume";
+    private static final String TRANSITION_UPGRADE = "Upgrade";
+    private static final String TRANSITION_SUSPEND = "Suspend";
+    private static final String TRANSITION_SUBMISSION = "Submission";
+    private static final String TRANSITION_STABILIZATION = "Stabilization";
+    private static final String TRANSITION_ROLLBACK = "Rollback";
+
+    public static final List TRACKED_TRANSITIONS = getTrackedTransitions();
+
+    private final Map, ResourceLifecycleMetricTracker> lifecycleTrackers =
+            new ConcurrentHashMap<>();
+    private final Set namespaces = Collections.newSetFromMap(new ConcurrentHashMap<>());
+
+    private final FlinkConfigManager configManager;
+    private final Clock clock;
+    private final KubernetesOperatorMetricGroup operatorMetricGroup;
+
+    private Map>> transitionMetrics;
+    private Function metricGroupFunction;
+
+    public LifecycleMetrics(
+            FlinkConfigManager configManager,
+            Clock clock,
+            KubernetesOperatorMetricGroup operatorMetricGroup) {
+        this.configManager = configManager;
+        this.clock = clock;
+        this.operatorMetricGroup = operatorMetricGroup;
+    }
+
+    public void onUpdate(CR cr) {
+        getLifecycleMetricTracker(cr).onUpdate(cr.getStatus().getLifecycleState(), clock.instant());
+    }
+
+    public void onRemove(CR cr) {
+        lifecycleTrackers.remove(
+                Tuple2.of(cr.getMetadata().getNamespace(), cr.getMetadata().getName()));
+    }
+
+    private ResourceLifecycleMetricTracker getLifecycleMetricTracker(CR cr) {
+        init(cr);
+        createNamespaceStateCountIfMissing(cr.getMetadata().getNamespace());
+        return lifecycleTrackers.computeIfAbsent(
+                Tuple2.of(cr.getMetadata().getNamespace(), cr.getMetadata().getName()),
+                k -> {
+                    var initialState = cr.getStatus().getLifecycleState();
+                    var time =
+                            initialState ==
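The class above keeps one tracker per resource, keyed by namespace and name through `computeIfAbsent`. It feeds each tracker lifecycle-state updates stamped with `clock.instant()`. The following is a simplified, hypothetical Java sketch of that idea. It uses a `"namespace/name"` string key instead of `Tuple2` and a made-up `LifecycleStateSketch` enum, and it records only the first time each state was entered. It is not the operator's actual `ResourceLifecycleMetricTracker`.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.EnumMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical subset of lifecycle states, for illustration only. */
enum LifecycleStateSketch { CREATED, DEPLOYED, STABLE, SUSPENDED }

/** Hypothetical per-resource tracker: remembers when each state was first entered. */
class ResourceTrackerSketch {
    private final Map<LifecycleStateSketch, Instant> firstEntered =
            new EnumMap<>(LifecycleStateSketch.class);

    synchronized void onUpdate(LifecycleStateSketch state, Instant now) {
        firstEntered.putIfAbsent(state, now);
    }

    /** Time between first entering {@code from} and first entering {@code to}, if both happened. */
    synchronized Optional<Duration> transitionTime(
            LifecycleStateSketch from, LifecycleStateSketch to) {
        Instant start = firstEntered.get(from);
        Instant end = firstEntered.get(to);
        if (start == null || end == null) {
            return Optional.empty();
        }
        return Optional.of(Duration.between(start, end));
    }
}

/** Hypothetical registry keyed by "namespace/name", mirroring computeIfAbsent in the diff. */
class LifecycleMetricsSketch {
    private final Map<String, ResourceTrackerSketch> trackers = new ConcurrentHashMap<>();

    ResourceTrackerSketch tracker(String namespace, String name) {
        return trackers.computeIfAbsent(namespace + "/" + name, k -> new ResourceTrackerSketch());
    }

    void onRemove(String namespace, String name) {
        trackers.remove(namespace + "/" + name);
    }

    int trackedResources() {
        return trackers.size();
    }
}
```

Because only the first entry into each state is kept, repeated cycles such as several upgrades would need a richer model, for example one histogram sample per completed transition.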
[GitHub] [flink] wanglijie95 commented on a diff in pull request #20222: [FLINK-28137] Introduce SpeculativeScheduler
wanglijie95 commented on code in PR #20222:
URL: https://github.com/apache/flink/pull/20222#discussion_r919661044

##
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java:
##
@@ -385,7 +386,8 @@ private SchedulerNG createScheduler(
                 initializationTimestamp,
                 getMainThreadExecutor(),
                 fatalErrorHandler,
-                jobStatusListener);
+                jobStatusListener,
+                new NoOpBlocklistHandler());

Review Comment:
Maybe abstract a `BlocklistOperations` interface and only expose `addNewBlockedNodes` to the scheduler.
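The suggestion above is an interface-segregation refactoring: hand the scheduler a narrow interface instead of the full blocklist handler. Here is a hypothetical sketch of the idea. `BlocklistOperationsSketch`, `BlocklistHandlerSketch`, and the use of plain `String` node IDs are illustrative assumptions, not Flink's actual blocklist types.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical narrow view for the scheduler: it may only add blocked nodes. */
interface BlocklistOperationsSketch {
    void addNewBlockedNodes(Collection<String> nodeIds);
}

/** Hypothetical full handler; lookups and cleanup stay hidden from the scheduler. */
class BlocklistHandlerSketch implements BlocklistOperationsSketch {
    private final Set<String> blocked = new HashSet<>();

    @Override
    public void addNewBlockedNodes(Collection<String> nodeIds) {
        blocked.addAll(nodeIds);
    }

    boolean isBlocked(String nodeId) {
        return blocked.contains(nodeId);
    }

    void clear() {
        blocked.clear();
    }
}

/** Hypothetical no-op variant for setups that do not use blocklisting. */
class NoOpBlocklistOperationsSketch implements BlocklistOperationsSketch {
    @Override
    public void addNewBlockedNodes(Collection<String> nodeIds) {}
}

/** The scheduler depends only on the narrow interface. */
class SchedulerSketch {
    private final BlocklistOperationsSketch blocklist;

    SchedulerSketch(BlocklistOperationsSketch blocklist) {
        this.blocklist = blocklist;
    }

    void onSlowTaskDetected(String nodeId) {
        blocklist.addNewBlockedNodes(List.of(nodeId));
    }
}
```

With this shape, the scheduler cannot call `isBlocked` or `clear`, and a no-op implementation can be passed wherever blocklisting is disabled.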
[GitHub] [flink] dinggege1024 commented on pull request #20255: [FLINK-28486][docs-zh] Flink FileSystem SQL Connector Doc is not right
dinggege1024 commented on PR #20255:
URL: https://github.com/apache/flink/pull/20255#issuecomment-1182777301

Hi @JingsongLi, could you please review this? I have updated it according to your suggestion.
[GitHub] [flink] wanglijie95 commented on a diff in pull request #20222: [FLINK-28137] Introduce SpeculativeScheduler
wanglijie95 commented on code in PR #20222:
URL: https://github.com/apache/flink/pull/20222#discussion_r919645557

##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java:
##
@@ -324,13 +332,24 @@ private void restartTasksWithDelay(final FailureHandlingResult failureHandlingRe
                         .values());
         final boolean globalRecovery = failureHandlingResult.isGlobalFailure();
+        if (globalRecovery) {
+            log.info(
+                    "{} tasks will be restarted to recover from a global failure.",
+                    verticesToRestart.size());
+        } else {
+            checkArgument(failureHandlingResult.getFailedExecution().isPresent());
+            log.info(
+                    "{} tasks will be restarted to recover the failed task {}.",

Review Comment:
The printed task ID changed from `ExecutionVertexID` to `ExecutionAttemptID`, but I think that is more reasonable than before.

##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java:
##
@@ -220,46 +217,57 @@ protected void startSchedulingInternal() {
     }
     @Override
-    protected void updateTaskExecutionStateInternal(
-            final ExecutionVertexID executionVertexId,
-            final TaskExecutionStateTransition taskExecutionState) {
+    protected void onTaskExecutionStateUpdate(final Execution execution) {
+        switch (execution.getState()) {
+            case FINISHED:
+                onTaskFinished(execution);
+                break;
+            case FAILED:
+                onTaskFailed(execution);
+                break;
+            default:
+                throw new IllegalArgumentException(
+                        String.format(
+                                "State %s should not be notified to DefaultScheduler.",
+                                execution.getState()));
+        }
+    }
+    protected void onTaskFinished(final Execution execution) {
+        checkState(execution.getState() == ExecutionState.FINISHED);
+
+        final ExecutionVertexID executionVertexId = execution.getVertex().getID();
         // once a task finishes, it will release the assigned allocation/slot and no longer
         // needs it. Therefore, it should stop reserving the slot so that other tasks are
         // possible to use the slot. Ideally, the `stopReserveAllocation` should happen
         // along with the release slot process. However, that process is hidden in the depth
         // of the ExecutionGraph, so we currently do it in DefaultScheduler after that process
         // is done.
-        if (taskExecutionState.getExecutionState() == ExecutionState.FINISHED) {
-            stopReserveAllocation(executionVertexId);
-        }
+        stopReserveAllocation(executionVertexId);
-        schedulingStrategy.onExecutionStateChange(
-                executionVertexId, taskExecutionState.getExecutionState());
-        maybeHandleTaskFailure(taskExecutionState, getCurrentExecutionOfVertex(executionVertexId));
+        schedulingStrategy.onExecutionStateChange(executionVertexId, ExecutionState.FINISHED);

Review Comment:
`SchedulingStrategy#onExecutionStateChange` is currently only called when a task finishes. How about renaming it to `onExecutionFinish`, or calling it for all state changes? Otherwise, it may confuse future developers.

##
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/SpeculativeExecutionVertex.java:
##
@@ -85,6 +85,10 @@ public Collection getCurrentExecutions() {
         return Collections.unmodifiableCollection(currentExecutions.values());
     }
+    public Execution getCurrentExecutionOrThrow(final ExecutionAttemptID attemptId) {

Review Comment:
This method is never used.

##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeScheduler.java:
##
@@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptivebatch;
+
+import org.apache.flink.api.common.time.Time;
+import
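The new `onTaskExecutionStateUpdate` in the diff dispatches only `FINISHED` and `FAILED` and rejects every other state. The review also asks whether the strategy callback should be named after the one event it actually receives. The hypothetical Java sketch below combines both ideas. `ExecStateSketch` and `SchedulingStrategySketch` are made-up types, and the recorded action strings merely stand in for `stopReserveAllocation` and failure handling.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical execution states, for illustration only. */
enum ExecStateSketch { CREATED, RUNNING, FINISHED, FAILED }

/** Hypothetical strategy callback named after the only event it receives. */
interface SchedulingStrategySketch {
    void onExecutionFinish(String executionVertexId);
}

class SchedulerDispatchSketch {
    /** Records the side effects that a real scheduler would perform. */
    final List<String> actions = new ArrayList<>();
    private final SchedulingStrategySketch strategy;

    SchedulerDispatchSketch(SchedulingStrategySketch strategy) {
        this.strategy = strategy;
    }

    /** Only FINISHED and FAILED updates are expected; anything else is a programming error. */
    void onTaskExecutionStateUpdate(String vertexId, ExecStateSketch state) {
        switch (state) {
            case FINISHED:
                actions.add("stopReserveAllocation:" + vertexId);
                strategy.onExecutionFinish(vertexId);
                break;
            case FAILED:
                actions.add("handleTaskFailure:" + vertexId);
                break;
            default:
                throw new IllegalArgumentException(
                        String.format("State %s should not be notified to the scheduler.", state));
        }
    }
}
```

Naming the callback `onExecutionFinish` makes the contract match the single call site. The alternative raised in the review, notifying on every state change, would keep the generic name but widen what the strategy must handle.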
[jira] [Commented] (FLINK-28531) Shutdown cluster after history server archive finished
[ https://issues.apache.org/jira/browse/FLINK-28531?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17566162#comment-17566162 ]

Xintong Song commented on FLINK-28531:
--

[~aitozi], which version of Flink were you using when you encountered this problem? Could it already be fixed by FLINK-24491?

> Shutdown cluster after history server archive finished
> --
>
> Key: FLINK-28531
> URL: https://issues.apache.org/jira/browse/FLINK-28531
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Coordination
> Reporter: Aitozi
> Priority: Major
>
>
> I ran into a problem where the job cluster may be shut down before the history server archive file upload has finished.
> After some research, it may be caused by two things.
> First, the cluster does not wait for {{HistoryServerArchivist#archiveExecutionGraph}} to complete.
> Second, the deregisterApp step in {{KubernetesResourceManagerDriver#deregisterApplication}} directly removes the deployment. So the shutdown flow in ClusterEntrypoint first triggers the deployment deletion, which deletes the master pod before some operations/futures can finish.
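Both causes described above are ordering problems: deregistration removes the deployment, and with it the master pod, before the archive upload has completed. One way to express the desired ordering is to chain deregistration onto the archive future. The sketch below is hypothetical. It does not reflect the actual `ClusterEntrypoint` or `HistoryServerArchivist` code, and its method names are illustrative only.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

/**
 * Hypothetical shutdown sequence: deregistration (which on Kubernetes removes the
 * deployment and therefore the master pod) only starts after archiving has completed.
 */
class ShutdownSequenceSketch {
    final List<String> events = Collections.synchronizedList(new ArrayList<>());

    /** Stand-in for an asynchronous archive upload. */
    CompletableFuture<Void> archiveExecutionGraph(Executor ioExecutor) {
        return CompletableFuture.runAsync(
                () -> {
                    sleepQuietly(50); // simulate a slow upload
                    events.add("archived");
                },
                ioExecutor);
    }

    void deregisterApplication() {
        events.add("deregistered");
    }

    CompletableFuture<Void> shutDown(Executor ioExecutor) {
        return archiveExecutionGraph(ioExecutor)
                // Swallow an archiving failure so shutdown still proceeds, but only once it is done.
                .handle((ignored, error) -> null)
                .thenRun(this::deregisterApplication);
    }

    private static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

The `handle` stage makes deregistration wait for the archive attempt whether it succeeds or fails. A production version would probably also bound the wait with a timeout.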
[GitHub] [flink-kubernetes-operator] morhidi commented on a diff in pull request #316: [FLINK-28517] Bump Flink version to 1.15.1
morhidi commented on code in PR #316:
URL: https://github.com/apache/flink-kubernetes-operator/pull/316#discussion_r919652468

##
flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/FlinkServiceTest.java:
##
@@ -293,7 +293,7 @@ public void testClusterInfoRestCompatibility() throws JsonProcessingException {
         String flink14Response =
                 "{\"refresh-interval\":3000,\"timezone-name\":\"Coordinated Universal Time\",\"timezone-offset\":0,\"flink-version\":\"1.14.4\",\"flink-revision\":\"895c609 @ 2022-02-25T11:57:14+01:00\",\"features\":{\"web-submit\":false,\"web-cancel\":false}}";
         String flink15Response =
-                "{\"refresh-interval\":3000,\"timezone-name\":\"Coordinated Universal Time\",\"timezone-offset\":0,\"flink-version\":\"1.15.0\",\"flink-revision\":\"3a4c113 @ 2022-04-20T19:50:32+02:00\",\"features\":{\"web-submit\":false,\"web-cancel\":false}}";
+                "{\"refresh-interval\":3000,\"timezone-name\":\"Coordinated Universal Time\",\"timezone-offset\":0,\"flink-version\":\"1.15.1\",\"flink-revision\":\"3a4c113 @ 2022-04-20T19:50:32+02:00\",\"features\":{\"web-submit\":false,\"web-cancel\":false}}";

Review Comment:
see my comment above
[GitHub] [flink-kubernetes-operator] morhidi commented on a diff in pull request #316: [FLINK-28517] Bump Flink version to 1.15.1
morhidi commented on code in PR #316:
URL: https://github.com/apache/flink-kubernetes-operator/pull/316#discussion_r919652210

##
flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/FlinkServiceTest.java:
##
@@ -293,7 +293,7 @@ public void testClusterInfoRestCompatibility() throws JsonProcessingException {
         String flink14Response =
                 "{\"refresh-interval\":3000,\"timezone-name\":\"Coordinated Universal Time\",\"timezone-offset\":0,\"flink-version\":\"1.14.4\",\"flink-revision\":\"895c609 @ 2022-02-25T11:57:14+01:00\",\"features\":{\"web-submit\":false,\"web-cancel\":false}}";
         String flink15Response =
-                "{\"refresh-interval\":3000,\"timezone-name\":\"Coordinated Universal Time\",\"timezone-offset\":0,\"flink-version\":\"1.15.0\",\"flink-revision\":\"3a4c113 @ 2022-04-20T19:50:32+02:00\",\"features\":{\"web-submit\":false,\"web-cancel\":false}}";
+                "{\"refresh-interval\":3000,\"timezone-name\":\"Coordinated Universal Time\",\"timezone-offset\":0,\"flink-version\":\"1.15.1\",\"flink-revision\":\"3a4c113 @ 2022-04-20T19:50:32+02:00\",\"features\":{\"web-submit\":false,\"web-cancel\":false}}";

Review Comment:
We could potentially remove this test, since the e2e tests would cover the functionality. I guess I'll just remove the flink15Response check, since the schema didn't change between 1.14 and 1.15. It's good to have a test showing the schema changes, though.
[GitHub] [flink-table-store] LadyForest commented on a diff in pull request #210: [FLINK-28511] Improve Spark dependencies and document
LadyForest commented on code in PR #210:
URL: https://github.com/apache/flink-table-store/pull/210#discussion_r919649696

##
docs/content/docs/engines/spark.md:
##
@@ -41,17 +41,33 @@ Download [flink-table-store-spark-{{< version >}}.jar](https://repo.maven.apache
 You are using an unreleased version of Table Store, you need to manually [Build Spark Bundled Jar]({{< ref "docs/engines/build" >}}) from the source code.
 {{< /unstable >}}
-Copy Table Store Spark bundle jar to `spark/jars`.
+Use `--jars` in spark-sql:
+```bash
+spark-sql ... --jars flink-table-store-spark-{{< version >}}.jar
+```
-## Table Store Catalog
+You can also copy `flink-table-store-spark-{{< version >}}.jar` to `spark/jars` in your Spark installation.
+
+## Catalog
 The following command registers the Table Store's Spark catalog with the name `table_store`:
 ```bash
-spark-sql --conf spark.sql.catalog.table_store=org.apache.flink.table.store.spark.SparkCatalog \
+spark-sql ... \
+--conf spark.sql.catalog.table_store=org.apache.flink.table.store.spark.SparkCatalog \
 --conf spark.sql.catalog.table_store.warehouse=file:/tmp/warehouse
 ```
+If you are using the Hive Metastore, you will need to add some configuration:

Review Comment:
```suggestion
Some extra configurations are needed if your Spark application uses the Hive Metastore to manage metadata.
```
[GitHub] [flink] liuzhuang2017 commented on pull request #20253: [hotfix][docs-zh] Add missing the working_directory.md file to the standalone part.
liuzhuang2017 commented on PR #20253:
URL: https://github.com/apache/flink/pull/20253#issuecomment-1182767837

@MartijnVisser, sorry to bother you again, but could you help me review this PR? Thank you.
[jira] [Commented] (FLINK-24229) [FLIP-171] DynamoDB implementation of Async Sink
[ https://issues.apache.org/jira/browse/FLINK-24229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17566154#comment-17566154 ]

Prasaanth commented on FLINK-24229:
---

Hi all, excited to see Flink supporting a DynamoDB sink natively. Is 1.17 the Flink version where we can expect this? It looks like Flink 1.16 has a release freeze on the 25th of July this year.
[GitHub] [flink-table-store] LadyForest commented on a diff in pull request #210: [FLINK-28511] Improve Spark dependencies and document
LadyForest commented on code in PR #210:
URL: https://github.com/apache/flink-table-store/pull/210#discussion_r919642060

##
docs/content/docs/engines/spark.md:
##
@@ -41,17 +41,33 @@ Download [flink-table-store-spark-{{< version >}}.jar](https://repo.maven.apache
 You are using an unreleased version of Table Store, you need to manually [Build Spark Bundled Jar]({{< ref "docs/engines/build" >}}) from the source code.
 {{< /unstable >}}
-Copy Table Store Spark bundle jar to `spark/jars`.
+Use `--jars` in spark-sql:
+```bash
+spark-sql ... --jars flink-table-store-spark-{{< version >}}.jar
+```
-## Table Store Catalog
+You can also copy `flink-table-store-spark-{{< version >}}.jar` to `spark/jars` in your Spark installation.

Review Comment:
```suggestion
Alternatively, you can copy `flink-table-store-spark-{{< version >}}.jar` under `spark/jars` in your Spark installation.
```
[GitHub] [flink] swuferhong commented on a diff in pull request #20008: [FLINK-27990][table-planner] Parquet format supports reporting statis…
swuferhong commented on code in PR #20008:
URL: https://github.com/apache/flink/pull/20008#discussion_r919636056

##
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/StatisticsReportTestBase.java:
##
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.utils;
+
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.internal.TableEnvironmentImpl;
+import org.apache.flink.table.planner.delegation.PlannerBase;
+import org.apache.flink.table.planner.factories.TestValuesTableFactory;
+import org.apache.flink.table.planner.plan.schema.TableSourceTable;
+import org.apache.flink.table.planner.plan.stats.FlinkStatistic;
+import org.apache.flink.table.utils.DateTimeUtils;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelVisitor;
+import org.apache.calcite.rel.core.TableScan;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.math.BigDecimal;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Stream;
+
+import static java.util.stream.Collectors.toList;
+
+/** The base class for statistics report testing. */
+public abstract class StatisticsReportTestBase extends TestLogger {
+
+    protected TableEnvironment tEnv;
+    protected File folder;
+
+    @BeforeEach
+    public void setup(@TempDir File file) throws Exception {
+        folder = file;
+        tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
+    }
+
+    @AfterEach
+    public void after() {
+        TestValuesTableFactory.clearAllData();
+    }
+
+    protected void createFileSystemSource(String format) {
+        // now format can be parquet, orc and csv.
+        String[] properties;
+        if (format.equals("orc")) {
+            properties = formatOrcProperties();
+        } else if (format.equals("parquet")) {
+            properties = formatParquetProperties();
+        } else {
+            // default is Csv format
+            properties = formatCsvProperties();
+        }
+
+        String ddl1 =
+                String.format(
+                        "CREATE TABLE sourceTable (\n"
+                                + "%s"
+                                + ") with (\n"
+                                + " 'connector' = 'filesystem',"
+                                + " 'path' = '%s',"
+                                + "%s )",
+                        String.join(",\n", ddlTypesMapToStringList(ddlTypesMap())),
+                        folder,
+                        String.join(",\n", properties));
+        tEnv.executeSql(ddl1);
+    }
+
+    private String[] formatParquetProperties() {
+        List ret = new ArrayList<>();
+        ret.add("'format'='parquet'");
+        ret.add("'parquet.utc-timezone'='true'");
+        ret.add("'parquet.compression'='gzip'");
+        return ret.toArray(new String[0]);
+    }
+
+    private String[] formatOrcProperties() {
+        List ret = new ArrayList<>();
+        ret.add("'format'='orc'");
+        ret.add("'orc.compress'='snappy'");
+        return ret.toArray(new String[0]);
+    }
+
+    private String[] formatCsvProperties() {
+        List ret = new ArrayList<>();
+        ret.add("'format' = 'csv'");
+        return ret.toArray(new String[0]);
+    }
+
+    protected Map ddlTypesMap() {
+        Map ddlTypesMap = new LinkedHashMap<>();
+        ddlTypesMap.put("boolean", "a");
+        ddlTypesMap.put("tinyint", "b");
+        ddlTypesMap.put("smallint", "c");
+        ddlTypesMap.put("int", "d");
+        ddlTypesMap.put("bigint", "e");
+        ddlTypesMap.put("float", "f");
+        ddlTypesMap.put("double", "g");
+        ddlTypesMap.put("string", "h");
+
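`createFileSystemSource` picks format-specific options and splices them, together with the column list and the path, into a `CREATE TABLE ... with ('connector' = 'filesystem', ...)` statement. Below is a stand-alone Java sketch of the same string assembly. `FileSystemDdlSketch` is hypothetical, and it maps column name to type. The test base's `ddlTypesMap()` instead maps type to column name and delegates to `ddlTypesMapToStringList`, whose body is not shown here.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/** Hypothetical stand-alone version of the DDL assembly in createFileSystemSource. */
class FileSystemDdlSketch {
    /** Format options copied from the test base; unknown formats fall back to CSV. */
    static List<String> formatProperties(String format) {
        switch (format) {
            case "orc":
                return List.of("'format'='orc'", "'orc.compress'='snappy'");
            case "parquet":
                return List.of(
                        "'format'='parquet'",
                        "'parquet.utc-timezone'='true'",
                        "'parquet.compression'='gzip'");
            default:
                return List.of("'format' = 'csv'");
        }
    }

    /** columnsToTypes maps column name to SQL type, in declaration order. */
    static String createTableDdl(
            String table, Map<String, String> columnsToTypes, String path, String format) {
        String columns =
                columnsToTypes.entrySet().stream()
                        .map(e -> e.getKey() + " " + e.getValue())
                        .collect(Collectors.joining(",\n"));
        return String.format(
                "CREATE TABLE %s (\n%s\n) WITH (\n'connector' = 'filesystem',\n'path' = '%s',\n%s\n)",
                table, columns, path, String.join(",\n", formatProperties(format)));
    }
}
```

Using a `switch` on the format string keeps the fallback to CSV explicit, mirroring the `else` branch in the original helper. Passing a `LinkedHashMap` preserves column order in the generated DDL.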
[GitHub] [flink] gaoyunhaii commented on a diff in pull request #20223: [FLINK-28457][runtime] Introduce JobStatusHook
gaoyunhaii commented on code in PR #20223: URL: https://github.com/apache/flink/pull/20223#discussion_r919606689 ## flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/JobStatusHook.java: ## @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.executiongraph; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.JobID; + +import java.io.Serializable; + +/** + * Hooks provided by users on job status changing. Triggered at the initial(CREATED) and final + * state(FINISHED/CANCELED/FAILED) of the job. + * + * Usage examples: + * StreamGraph streamGraph = env.getStreamGraph(); + * streamGraph.registerJobStatusHook(myJobStatusHook); + * streamGraph.setJobName("my_flink"); + * env.execute(streamGraph); + * + */ +@Internal +public interface JobStatusHook extends Serializable { + +/** When Job become {@link JobStatus#CREATED} status, it would only be called one time. 
*/ Review Comment: `become` -> `becomes` nit: import JobStatus to avoid warnings ## flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java: ## @@ -1618,6 +1621,141 @@ public void testCheckpointCleanerIsClosedAfterCheckpointServices() throws Except } } +@Test +public void testJobStatusHookWithJobFailed() throws Exception { Review Comment: The three test methods seem quite similar; could we extract a common helper method? ## flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java: ## @@ -628,4 +632,12 @@ public void writeUserArtifactEntriesToConfiguration() { userArtifact.getKey(), userArtifact.getValue(), jobConfiguration); } } + +public void setJobStatusHooks(List<JobStatusHook> hooks) { +this.jobStatusHooks = hooks; Review Comment: Should we also `checkNotNull` here? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
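A minimal sketch of the `checkNotNull` suggestion, under stated assumptions. `SimpleJobStatusHook` is a simplified stand-in for the hook interface in the diff: it has only the CREATED callback quoted above and uses a `String` job id instead of `JobID`. `JobGraphSketch` is a hypothetical holder, not Flink's `JobGraph`. The sketch uses `java.util.Objects.requireNonNull` so it has no Flink dependency; inside Flink the usual equivalent is `Preconditions.checkNotNull`.

```java
import java.io.Serializable;
import java.util.Collections;
import java.util.List;
import java.util.Objects;

// Simplified stand-in for the hook interface in the diff (CREATED callback only).
interface SimpleJobStatusHook extends Serializable {
    void onCreated(String jobId);
}

// Hypothetical holder mirroring JobGraph#setJobStatusHooks; not Flink's JobGraph.
final class JobGraphSketch {
    private List<SimpleJobStatusHook> jobStatusHooks = Collections.emptyList();

    // Fail fast on a null list, as suggested in the review, instead of
    // failing later when the hooks are invoked.
    void setJobStatusHooks(List<SimpleJobStatusHook> hooks) {
        this.jobStatusHooks = Objects.requireNonNull(hooks, "hooks must not be null");
    }

    // Invoked once when the job reaches CREATED.
    void notifyCreated(String jobId) {
        for (SimpleJobStatusHook hook : jobStatusHooks) {
            hook.onCreated(jobId);
        }
    }
}
```

With the check in the setter, a bad caller is reported at registration time, where the stack trace points at the actual mistake.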
[GitHub] [flink] gaoyunhaii commented on pull request #20239: [FLINK-27703][core] Extend timeout and add logs for FileChannelManagerImplTest
gaoyunhaii commented on PR #20239: URL: https://github.com/apache/flink/pull/20239#issuecomment-1182736598 @flinkbot run azure
[GitHub] [flink-kubernetes-operator] morhidi commented on a diff in pull request #316: [FLINK-28517] Bump Flink version to 1.15.1
morhidi commented on code in PR #316: URL: https://github.com/apache/flink-kubernetes-operator/pull/316#discussion_r919620212 ## flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/FlinkServiceTest.java: ## @@ -293,7 +293,7 @@ public void testClusterInfoRestCompatibility() throws JsonProcessingException { String flink14Response = "{\"refresh-interval\":3000,\"timezone-name\":\"Coordinated Universal Time\",\"timezone-offset\":0,\"flink-version\":\"1.14.4\",\"flink-revision\":\"895c609 @ 2022-02-25T11:57:14+01:00\",\"features\":{\"web-submit\":false,\"web-cancel\":false}}"; String flink15Response = -"{\"refresh-interval\":3000,\"timezone-name\":\"Coordinated Universal Time\",\"timezone-offset\":0,\"flink-version\":\"1.15.0\",\"flink-revision\":\"3a4c113 @ 2022-04-20T19:50:32+02:00\",\"features\":{\"web-submit\":false,\"web-cancel\":false}}"; +"{\"refresh-interval\":3000,\"timezone-name\":\"Coordinated Universal Time\",\"timezone-offset\":0,\"flink-version\":\"1.15.1\",\"flink-revision\":\"3a4c113 @ 2022-04-20T19:50:32+02:00\",\"features\":{\"web-submit\":false,\"web-cancel\":false}}"; Review Comment: good point, I'll have a look
[GitHub] [flink] DavidLiu001 commented on pull request #20143: [FLINK-25735][content.zh] Chinese Translation - Add documentation for…
DavidLiu001 commented on PR #20143: URL: https://github.com/apache/flink/pull/20143#issuecomment-1182728130 @MartijnVisser The issues raised in the review comments have been resolved. If there are no further comments, please help merge this PR. Thanks
[GitHub] [flink-table-store] SteNicholas commented on a diff in pull request #211: [FLINK-28465] commit.force-compact should always be true under batch mode
SteNicholas commented on code in PR #211: URL: https://github.com/apache/flink-table-store/pull/211#discussion_r919616462 ## flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/AbstractTableStoreFactory.java: ## @@ -132,9 +133,14 @@ private static void validateFileStoreContinuous(Configuration options) { } static FileStoreTable buildFileStoreTable(DynamicTableFactory.Context context) { -FileStoreTable table = -FileStoreTableFactory.create( - Configuration.fromMap(context.getCatalogTable().getOptions())); +boolean batch = +context.getConfiguration().get(ExecutionOptions.RUNTIME_MODE) +== RuntimeExecutionMode.BATCH; +Map<String, String> options = new HashMap<>(context.getCatalogTable().getOptions()); +if (batch) { +options.put(CoreOptions.COMMIT_FORCE_COMPACT.key(), String.valueOf(true)); Review Comment: Should the description of the config option `COMMIT_FORCE_COMPACT` explain that the default value is true in BATCH mode?
[jira] [Commented] (FLINK-28515) The files in local recovery directory hasn't be clean up properly after checkpoint abort
[ https://issues.apache.org/jira/browse/FLINK-28515?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17566131#comment-17566131 ] Jinzhong Li commented on FLINK-28515: - [~roman] [~yunta] Could you please take a look at this ticket? > The files in local recovery directory hasn't be clean up properly after > checkpoint abort > > > Key: FLINK-28515 > URL: https://issues.apache.org/jira/browse/FLINK-28515 > Project: Flink > Issue Type: Bug > Components: Runtime / Checkpointing, Runtime / State Backends >Reporter: Jinzhong Li >Priority: Major > Labels: pull-request-available > Attachments: C7245668-CE31-4F56-B9CB-12E2F1E900C5.png, image.png > > > In my case, I found that some files in the local recovery directory were not > cleaned up properly after a checkpoint was aborted (as shown in the attached picture). > By analyzing the Flink log, I found that when the stateBackend completes the local > snapshot but the task has not completed the whole snapshot, > and the checkpoint is then aborted (due to a checkpoint timeout or a network error), > files in the local directory may not be cleaned up properly. > I think the local snapshot files are left behind for the following reasons: > (1) In org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable, > the completed localSnapshot info is registered with > org.apache.flink.runtime.state.TaskLocalStateStoreImpl only after the task has > completed the whole snapshot. > ([AsyncCheckpointRunnable.java#L136|https://github.com/apache/flink/blob/3ec376601f836df6314e771b243ca6f896a7f642/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnable.java#L136]). > (2) If the stateBackend has completed the local snapshot but the task has not > completed the entire snapshot when the checkpoint abort is triggered, the > TaskLocalStateStore can't clean up the unregistered localSnapshot files.
> ([TaskLocalStateStoreImpl.java#L301|https://github.com/apache/flink/blob/3ec376601f836df6314e771b243ca6f896a7f642/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java#L301]) > To fix this problem, I think that when TaskLocalStateStoreImpl aborts a checkpoint, > we can try to delete the corresponding localRecovery directory, even if the > checkpoint has not been registered with TaskLocalStateStoreImpl. > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
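The proposed fix can be sketched as follows. On abort, the store deletes the checkpoint's local directory whether or not a snapshot was ever registered for it. This is a self-contained illustration, not `TaskLocalStateStoreImpl`. The class `LocalStateStoreSketch` and the `chk_<id>` directory layout are assumptions made for the example.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical stand-in for a task-local state store; not Flink's TaskLocalStateStoreImpl.
final class LocalStateStoreSketch {
    private final Path localRecoveryRoot;
    // Snapshots registered only once the task has completed the whole checkpoint.
    private final Map<Long, Path> registered = new HashMap<>();

    LocalStateStoreSketch(Path localRecoveryRoot) {
        this.localRecoveryRoot = localRecoveryRoot;
    }

    // Assumed layout: one sub-directory per checkpoint, named chk_<id>.
    Path checkpointDir(long checkpointId) {
        return localRecoveryRoot.resolve("chk_" + checkpointId);
    }

    void register(long checkpointId) {
        registered.put(checkpointId, checkpointDir(checkpointId));
    }

    // Proposed behaviour: on abort, delete the checkpoint's directory even if it
    // was never registered, so files written before registration are removed too.
    void abortCheckpoint(long checkpointId) throws IOException {
        registered.remove(checkpointId);
        deleteRecursively(checkpointDir(checkpointId));
    }

    private static void deleteRecursively(Path dir) throws IOException {
        if (!Files.exists(dir)) {
            return; // nothing was written locally for this checkpoint
        }
        List<Path> paths;
        try (Stream<Path> walk = Files.walk(dir)) {
            // Reverse order visits children before their parent directories.
            paths = walk.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
        }
        for (Path p : paths) {
            Files.delete(p);
        }
    }
}
```

The key point is that cleanup is keyed by the checkpoint's directory rather than by the registry. A snapshot that was written but never registered is therefore still removed.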
[GitHub] [flink-table-store] SteNicholas commented on a diff in pull request #211: [FLINK-28465] commit.force-compact should always be true under batch mode
SteNicholas commented on code in PR #211: URL: https://github.com/apache/flink-table-store/pull/211#discussion_r919615647 ## flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/AbstractTableStoreFactory.java: ## @@ -132,9 +133,14 @@ private static void validateFileStoreContinuous(Configuration options) { } static FileStoreTable buildFileStoreTable(DynamicTableFactory.Context context) { -FileStoreTable table = -FileStoreTableFactory.create( - Configuration.fromMap(context.getCatalogTable().getOptions())); +boolean batch = +context.getConfiguration().get(ExecutionOptions.RUNTIME_MODE) +== RuntimeExecutionMode.BATCH; +Map<String, String> options = new HashMap<>(context.getCatalogTable().getOptions()); Review Comment:
```suggestion
Configuration configuration = Configuration.fromMap(context.getCatalogTable().getOptions());
configuration.setBoolean(CoreOptions.COMMIT_FORCE_COMPACT, batch);
FileStoreTable table = FileStoreTableFactory.create(configuration);
```
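The PR code and the suggestion behave differently in streaming mode. The PR only sets the option in batch mode, leaving any user-provided value untouched otherwise. The suggestion writes `batch` unconditionally, which forces `false` in streaming mode. The sketch below shows both on a plain `Map`, assuming the option key is `commit.force-compact` as in the PR title. `ForceCompactSketch` and its methods are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical comparison of the two variants; not table-store code.
final class ForceCompactSketch {
    // Assumed key, taken from the PR title.
    static final String COMMIT_FORCE_COMPACT = "commit.force-compact";

    // PR variant: override only in batch mode; streaming keeps the user's value.
    static Map<String, String> overrideInBatchOnly(Map<String, String> tableOptions, boolean batch) {
        Map<String, String> options = new HashMap<>(tableOptions);
        if (batch) {
            options.put(COMMIT_FORCE_COMPACT, String.valueOf(true));
        }
        return options;
    }

    // Suggested variant: always set the option to the batch flag.
    static Map<String, String> alwaysSet(Map<String, String> tableOptions, boolean batch) {
        Map<String, String> options = new HashMap<>(tableOptions);
        options.put(COMMIT_FORCE_COMPACT, String.valueOf(batch));
        return options;
    }
}
```

Both variants copy the catalog options before modifying them, so the table's original options map is never mutated.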
[jira] [Comment Edited] (FLINK-28515) The files in local recovery directory hasn't be clean up properly after checkpoint abort
[ https://issues.apache.org/jira/browse/FLINK-28515?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17566127#comment-17566127 ] Yanfei Lei edited comment on FLINK-28515 at 7/13/22 3:20 AM: - I think this is reasonable. Although the TM will delete other checkpoints [when recovering|#L199],] , if the JM re-allocates the task, the files of the aborted checkpoint in the local recovery directory would not be cleaned up properly, so it's right to delete them early. was (Author: yanfei lei): I think this is reasonable. Although the TM will delete other checkpoints [when recovering|[https://github.com/apache/flink/blob/3ec376601f836df6314e771b243ca6f896a7f642/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManagerImpl.java#L199],] if the JM re-allocates the task, the files of the aborted checkpoint in the local recovery directory would not be cleaned up properly, so it's right to delete them early.
[jira] [Comment Edited] (FLINK-28515) The files in local recovery directory hasn't be clean up properly after checkpoint abort
[ https://issues.apache.org/jira/browse/FLINK-28515?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17566127#comment-17566127 ] Yanfei Lei edited comment on FLINK-28515 at 7/13/22 3:20 AM: - I think this is reasonable. Although the TM will delete other checkpoints [when recovering|#L199], if the JM re-allocates the task, the files of the aborted checkpoint in the local recovery directory would not be cleaned up properly, so it's right to delete them early. was (Author: yanfei lei): I think this is reasonable. Although the TM will delete other checkpoints [when recovering|#L199],] , if the JM re-allocates the task, the files of the aborted checkpoint in the local recovery directory would not be cleaned up properly, so it's right to delete them early.
[GitHub] [flink] Aitozi commented on pull request #20256: [FLINK-24713][Runtime/Coordination] Postpone resourceManager serving
Aitozi commented on PR #20256: URL: https://github.com/apache/flink/pull/20256#issuecomment-1182719574 @flinkbot run azure
[jira] [Commented] (FLINK-28515) The files in local recovery directory hasn't be clean up properly after checkpoint abort
[ https://issues.apache.org/jira/browse/FLINK-28515?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17566127#comment-17566127 ] Yanfei Lei commented on FLINK-28515: I think this is reasonable. Although the TM will delete other checkpoints [when recovering|[https://github.com/apache/flink/blob/3ec376601f836df6314e771b243ca6f896a7f642/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManagerImpl.java#L199],] if the JM re-allocates the task, the files of the aborted checkpoint in the local recovery directory would not be cleaned up properly, so it's right to delete them early.
[GitHub] [flink] leozhangsr commented on pull request #20234: [FLINK-28475] [Connector/kafka] Stopping offset can be 0
leozhangsr commented on PR #20234: URL: https://github.com/apache/flink/pull/20234#issuecomment-1182716948 @flinkbot run azure
[jira] [Commented] (FLINK-28531) Shutdown cluster after history server archive finished
[ https://issues.apache.org/jira/browse/FLINK-28531?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17566124#comment-17566124 ] Aitozi commented on FLINK-28531: cc [~wangyang0918] [~xtsong] > Shutdown cluster after history server archive finished > -- > > Key: FLINK-28531 > URL: https://issues.apache.org/jira/browse/FLINK-28531 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Reporter: Aitozi >Priority: Major > > I ran into a problem where the job cluster may be shut down before the history server > archive file upload has finished. > After some research, I think it may be caused by two things. > First, {{HistoryServerArchivist#archiveExecutionGraph}} is not waited on to > complete. > Second, the deregisterApp step in > {{KubernetesResourceManagerDriver#deregisterApplication}} directly > removes the deployment. So in the ClusterEntrypoint shutdown flow, the deployment > deletion is triggered first, which causes the master pod to be deleted > while some operations/futures have not finished.
[jira] [Assigned] (FLINK-28122) Translate "Overview " and "Project Configuration" in "User-defined Sources & Sinks" page
[ https://issues.apache.org/jira/browse/FLINK-28122?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jark Wu reassigned FLINK-28122: --- Assignee: hunter > Translate "Overview " and "Project Configuration" in "User-defined Sources & > Sinks" page > - > > Key: FLINK-28122 > URL: https://issues.apache.org/jira/browse/FLINK-28122 > Project: Flink > Issue Type: Improvement > Components: chinese-translation, Documentation >Reporter: Chengkai Yang >Assignee: hunter >Priority: Minor > > The links are > https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sourcessinks/#overview > and > https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sourcessinks/#project-configuration
[jira] [Commented] (FLINK-28531) Shutdown cluster after history server archive finished
[ https://issues.apache.org/jira/browse/FLINK-28531?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17566123#comment-17566123 ] Aitozi commented on FLINK-28531: I propose to fix this in two ways. First, in the Dispatcher, we also add the archive future to the jobTerminationFuture so that archiving has finished before shutdown. Second, we avoid deleting the master pod in deregisterApp, and delete the cluster only after the ClusterEntrypoint terminationFuture has finished.
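The first part of the proposal, making job termination wait for the archiving future, can be sketched with `CompletableFuture`. The names `TerminationSketch` and `jobTerminationFuture` are illustrative only. They follow the wording of the comment, not Flink's actual Dispatcher code. The sketch also assumes that a failed archive upload should delay shutdown, not fail it.

```java
import java.util.concurrent.CompletableFuture;

// Illustrative only; not Flink's Dispatcher.
final class TerminationSketch {

    // The returned future completes only after the job cleanup AND the history
    // server archiving have finished, so a shutdown that waits on it cannot
    // outrun the archive upload.
    static CompletableFuture<Void> jobTerminationFuture(
            CompletableFuture<Void> jobCleanupFuture, CompletableFuture<Void> archiveFuture) {
        // Assumption: an archiving failure is swallowed here, so it delays
        // termination until the attempt ends but does not fail it.
        CompletableFuture<Void> archiveDone = archiveFuture.handle((ignored, error) -> null);
        return CompletableFuture.allOf(jobCleanupFuture, archiveDone);
    }
}
```

Whether archive failures should instead propagate is a design choice. If they should, pass `archiveFuture` to `allOf` directly.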
[jira] [Commented] (FLINK-28529) ChangelogPeriodicMaterializationSwitchStateBackendITCase.testSwitchFromDisablingToEnablingInClaimMode failed with CheckpointException: Checkpoint expired before comple
[ https://issues.apache.org/jira/browse/FLINK-28529?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17566122#comment-17566122 ] Yanfei Lei commented on FLINK-28529: Sure, it looks like it's caused by the checkpoint expiring; I will check it. > ChangelogPeriodicMaterializationSwitchStateBackendITCase.testSwitchFromDisablingToEnablingInClaimMode > failed with CheckpointException: Checkpoint expired before completing > --- > > Key: FLINK-28529 > URL: https://issues.apache.org/jira/browse/FLINK-28529 > Project: Flink > Issue Type: Bug > Components: Runtime / Checkpointing, Runtime / State Backends >Affects Versions: 1.16.0 >Reporter: Huang Xingbo >Assignee: Yanfei Lei >Priority: Critical > Labels: test-stability > > {code:java} > 2022-07-12T04:30:49.9912088Z Jul 12 04:30:49 [ERROR] > ChangelogPeriodicMaterializationSwitchStateBackendITCase.testSwitchFromDisablingToEnablingInClaimMode > Time elapsed: 617.048 s <<< ERROR! > 2022-07-12T04:30:49.9913108Z Jul 12 04:30:49 > java.util.concurrent.ExecutionException: > org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint expired > before completing.
> 2022-07-12T04:30:49.9913880Z Jul 12 04:30:49 at > java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) > 2022-07-12T04:30:49.9914606Z Jul 12 04:30:49 at > java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908) > 2022-07-12T04:30:49.9915572Z Jul 12 04:30:49 at > org.apache.flink.test.checkpointing.ChangelogPeriodicMaterializationSwitchStateBackendITCase.testSwitchFromDisablingToEnablingInClaimMode(ChangelogPeriodicMaterializationSwitchStateBackendITCase.java:125) > 2022-07-12T04:30:49.9916483Z Jul 12 04:30:49 at > sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > 2022-07-12T04:30:49.9917377Z Jul 12 04:30:49 at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > 2022-07-12T04:30:49.9918121Z Jul 12 04:30:49 at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > 2022-07-12T04:30:49.9918788Z Jul 12 04:30:49 at > java.lang.reflect.Method.invoke(Method.java:498) > 2022-07-12T04:30:49.9919456Z Jul 12 04:30:49 at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) > 2022-07-12T04:30:49.9920193Z Jul 12 04:30:49 at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > 2022-07-12T04:30:49.9920923Z Jul 12 04:30:49 at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) > 2022-07-12T04:30:49.9921630Z Jul 12 04:30:49 at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > 2022-07-12T04:30:49.9922326Z Jul 12 04:30:49 at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > 2022-07-12T04:30:49.9923023Z Jul 12 04:30:49 at > org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) > 2022-07-12T04:30:49.9923708Z Jul 12 04:30:49 at > org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54) > 2022-07-12T04:30:49.9924449Z Jul 12 04:30:49 at > 
org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45) > 2022-07-12T04:30:49.9925124Z Jul 12 04:30:49 at > org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61) > 2022-07-12T04:30:49.9925912Z Jul 12 04:30:49 at > org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) > 2022-07-12T04:30:49.9926742Z Jul 12 04:30:49 at > org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100) > 2022-07-12T04:30:49.9928142Z Jul 12 04:30:49 at > org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366) > 2022-07-12T04:30:49.9928715Z Jul 12 04:30:49 at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103) > 2022-07-12T04:30:49.9929311Z Jul 12 04:30:49 at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63) > 2022-07-12T04:30:49.9929863Z Jul 12 04:30:49 at > org.junit.runners.ParentRunner$4.run(ParentRunner.java:331) > 2022-07-12T04:30:49.9930376Z Jul 12 04:30:49 at > org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79) > 2022-07-12T04:30:49.9930911Z Jul 12 04:30:49 at > org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329) > 2022-07-12T04:30:49.9931441Z Jul 12 04:30:49 at > org.junit.runners.ParentRunner.access$100(ParentRunner.java:66) > 2022-07-12T04:30:49.9931975Z Jul 12 04:30:49 at > org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293) > 2022-07-12T04:30:49.9932493Z Jul 12 04:30:49 at >
[GitHub] [flink-kubernetes-operator] bgeng777 commented on a diff in pull request #316: [FLINK-28517] Bump Flink version to 1.15.1
bgeng777 commented on code in PR #316: URL: https://github.com/apache/flink-kubernetes-operator/pull/316#discussion_r919604652 ## flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/FlinkServiceTest.java: ## @@ -293,7 +293,7 @@ public void testClusterInfoRestCompatibility() throws JsonProcessingException { String flink14Response = "{\"refresh-interval\":3000,\"timezone-name\":\"Coordinated Universal Time\",\"timezone-offset\":0,\"flink-version\":\"1.14.4\",\"flink-revision\":\"895c609 @ 2022-02-25T11:57:14+01:00\",\"features\":{\"web-submit\":false,\"web-cancel\":false}}"; String flink15Response = -"{\"refresh-interval\":3000,\"timezone-name\":\"Coordinated Universal Time\",\"timezone-offset\":0,\"flink-version\":\"1.15.0\",\"flink-revision\":\"3a4c113 @ 2022-04-20T19:50:32+02:00\",\"features\":{\"web-submit\":false,\"web-cancel\":false}}"; +"{\"refresh-interval\":3000,\"timezone-name\":\"Coordinated Universal Time\",\"timezone-offset\":0,\"flink-version\":\"1.15.1\",\"flink-revision\":\"3a4c113 @ 2022-04-20T19:50:32+02:00\",\"features\":{\"web-submit\":false,\"web-cancel\":false}}"; Review Comment: Maybe the flink-revision number/date should be changed to 1.15.1's as well. The revision number should be [f494be6](https://github.com/apache/flink/commit/f494be6956e850d4d1c9fd50b79e5a8dd5b53e47).
[jira] [Created] (FLINK-28531) Shutdown cluster after history server archive finished
Aitozi created FLINK-28531: -- Summary: Shutdown cluster after history server archive finished Key: FLINK-28531 URL: https://issues.apache.org/jira/browse/FLINK-28531 Project: Flink Issue Type: Bug Components: Runtime / Coordination Reporter: Aitozi I ran into a problem where the job cluster may be shut down before the history server archive file upload has finished. After some research, I think it may be caused by two things. First, {{HistoryServerArchivist#archiveExecutionGraph}} is not waited on to complete. Second, the deregisterApp step in {{KubernetesResourceManagerDriver#deregisterApplication}} directly removes the deployment. So in the ClusterEntrypoint shutdown flow, the deployment deletion is triggered first, which causes the master pod to be deleted while some operations/futures have not finished.
[GitHub] [flink-kubernetes-operator] wangyang0918 commented on a diff in pull request #316: [FLINK-28517] Bump Flink version to 1.15.1
wangyang0918 commented on code in PR #316: URL: https://github.com/apache/flink-kubernetes-operator/pull/316#discussion_r919604132 ## flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/FlinkServiceTest.java: ## @@ -293,7 +293,7 @@ public void testClusterInfoRestCompatibility() throws JsonProcessingException { String flink14Response = "{\"refresh-interval\":3000,\"timezone-name\":\"Coordinated Universal Time\",\"timezone-offset\":0,\"flink-version\":\"1.14.4\",\"flink-revision\":\"895c609 @ 2022-02-25T11:57:14+01:00\",\"features\":{\"web-submit\":false,\"web-cancel\":false}}"; String flink15Response = -"{\"refresh-interval\":3000,\"timezone-name\":\"Coordinated Universal Time\",\"timezone-offset\":0,\"flink-version\":\"1.15.0\",\"flink-revision\":\"3a4c113 @ 2022-04-20T19:50:32+02:00\",\"features\":{\"web-submit\":false,\"web-cancel\":false}}"; +"{\"refresh-interval\":3000,\"timezone-name\":\"Coordinated Universal Time\",\"timezone-offset\":0,\"flink-version\":\"1.15.1\",\"flink-revision\":\"3a4c113 @ 2022-04-20T19:50:32+02:00\",\"features\":{\"web-submit\":false,\"web-cancel\":false}}"; Review Comment: Do we really need to bump the version in the tests every time?
[GitHub] [flink] flinkbot commented on pull request #20261: [FLINK-28530] Improvement of extraction of conditions that can be pushed into join inputs
flinkbot commented on PR #20261: URL: https://github.com/apache/flink/pull/20261#issuecomment-1182705940 ## CI report: * 221a9ecd46069d80583c766bf39f095714801f88 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
[GitHub] [flink] Tartarus0zm commented on pull request #20252: [FLINK-28463][flink-sql-parser] Flink dialect supports CREATE TABLE AS SELECT(CTAS) syntax
Tartarus0zm commented on PR #20252: URL: https://github.com/apache/flink/pull/20252#issuecomment-1182705141 @wuchong @godfreyhe @lsyldliu @luoyuxia @beyond1920 please take a look, if you have time, thanks
[jira] [Commented] (FLINK-28122) Translate "Overview " and "Project Configuration" in "User-defined Sources & Sinks" page
[ https://issues.apache.org/jira/browse/FLINK-28122?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17566118#comment-17566118 ] hunter commented on FLINK-28122: [~jark] Hi Jark, I am very interested in this and can do the work if needed. > Translate "Overview " and "Project Configuration" in "User-defined Sources & > Sinks" page > - > > Key: FLINK-28122 > URL: https://issues.apache.org/jira/browse/FLINK-28122 > Project: Flink > Issue Type: Improvement > Components: chinese-translation, Documentation >Reporter: Chengkai Yang >Priority: Minor > > The links are > https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sourcessinks/#overview > and > https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sourcessinks/#project-configuration
[GitHub] [flink] godfreyhe commented on a diff in pull request #20246: [FLINK-28074][table-planner] show statistics details for DESCRIBE EXT…
godfreyhe commented on code in PR #20246: URL: https://github.com/apache/flink/pull/20246#discussion_r919600208 ## flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala: ## @@ -397,7 +398,7 @@ class TableEnvironmentTest { () => tableEnv.executeSql("SELECT c FROM my_view /*+ OPTIONS('is-bounded' = 'true') */")) .hasMessageContaining("View '`default_catalog`.`default_database`.`my_view`' " + "cannot be enriched with new options. Hints can only be applied to tables.") - .isInstanceOf(classOf[ValidationException]) + .isInstanceOf[ValidationException] Review Comment: `isInstanceOf` here is the AssertJ `AbstractAssert` method, not Scala's `isInstanceOf`; please revert the change. ## flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/print/DescExtendedStyle.java: ## @@ -0,0 +1,243 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.utils.print; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.utils.EncodingUtils; + +import java.io.PrintWriter; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.stream.Stream; + +/** + * Print desc extended result. + * + * For example: (printRowKind is false) + * + * + * col_name data_type hasNull + * first    STRING    TRUE + * second   INT       TRUE + * third    STRING    TRUE + * + * # Detailed Table Information + * Table Statistics: + *     rowCount:    10 + *     fileCount:   2 + *     rawDataSize: 100 + *     totalSize:   10 + * + * + * From this example, we can see that this print style can print different contents in different + * parts and calculate the column widths for different parts respectively. + */ +@Internal +public final class DescExtendedStyle implements PrintStyle { Review Comment: I would like to introduce a combined `PrintStyle` that contains multiple `PrintStyle`s, each of which can have its own style.
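The combined `PrintStyle` suggested in the review can be sketched with the composite pattern: each section computes its own column widths, and the combined style prints its children in order. This is a self-contained toy under that assumption; `SectionStyle`, `ColumnSection` and `CombinedStyle` are hypothetical names, and Flink's actual `PrintStyle` interface (which prints `RowData` rows) is not used.

```java
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical, simplified stand-in for a print style: a section that knows how to print itself. */
interface SectionStyle {
    void print(PrintWriter out);
}

/** Prints rows as space-separated columns; widths come from this section's rows only. */
final class ColumnSection implements SectionStyle {
    private final List<String[]> rows; // assumes every row has the same number of columns

    ColumnSection(List<String[]> rows) {
        this.rows = rows;
    }

    @Override
    public void print(PrintWriter out) {
        int[] widths = new int[rows.get(0).length];
        for (String[] row : rows) {
            for (int i = 0; i < row.length; i++) {
                widths[i] = Math.max(widths[i], row[i].length());
            }
        }
        for (String[] row : rows) {
            StringBuilder line = new StringBuilder();
            for (int i = 0; i < row.length; i++) {
                if (i > 0) {
                    line.append(' ');
                }
                line.append(row[i]);
                for (int pad = row[i].length(); pad < widths[i]; pad++) {
                    line.append(' ');
                }
            }
            out.println(line.toString().replaceAll("\\s+$", "")); // drop trailing padding
        }
    }
}

/** Composite style: prints each child section in order, separated by a blank line. */
final class CombinedStyle implements SectionStyle {
    private final List<SectionStyle> sections;

    CombinedStyle(SectionStyle... sections) {
        this.sections = new ArrayList<>(Arrays.asList(sections));
    }

    @Override
    public void print(PrintWriter out) {
        for (int i = 0; i < sections.size(); i++) {
            if (i > 0) {
                out.println();
            }
            sections.get(i).print(out);
        }
    }

    public static void main(String[] args) {
        SectionStyle schema = new ColumnSection(Arrays.asList(
                new String[] {"col_name", "data_type", "hasNull"},
                new String[] {"first", "STRING", "TRUE"},
                new String[] {"second", "INT", "TRUE"}));
        SectionStyle stats = new ColumnSection(Arrays.asList(
                new String[] {"rowCount:", "10"},
                new String[] {"totalSize:", "10"}));
        StringWriter buffer = new StringWriter();
        PrintWriter out = new PrintWriter(buffer);
        new CombinedStyle(schema, stats).print(out);
        out.flush();
        System.out.print(buffer);
    }
}
```

Because each section measures only its own rows, the wide schema columns do not stretch the statistics block, which is the property the Javadoc example above describes.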
[jira] [Updated] (FLINK-28530) Improvement of extraction of conditions that can be pushed into join inputs
[ https://issues.apache.org/jira/browse/FLINK-28530?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-28530: --- Labels: pull-request-available (was: ) > Improvement of extraction of conditions that can be pushed into join inputs > --- > > Key: FLINK-28530 > URL: https://issues.apache.org/jira/browse/FLINK-28530 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Planner >Reporter: Alexander Trushev >Priority: Major > Labels: pull-request-available > > Condition extraction was introduced for batch mode in FLINK-12509 and for > stream mode in FLINK-24139. > h2. Proposal > This ticket aims to replace the current extraction algorithm with a new one > that covers a more complex case with a deeply nested predicate: > for all n > 0 > ((((a0 and b0) or a1) and b1) or a2) and b2) or a3) ... and bn-1) or an) > => (a0 or a1 or ... or an) > *Example.* For n = 3, Flink does not extract (a0 or a1 or a2 or a3): > {code:java} > FlinkSQL> explain select * from A join B on (((((a0=0 and b0=0) or a1=0) and > b1=0) or a2=0) and b2=0) or a3=0; > == Optimized Physical Plan == > Join(joinType=[InnerJoin], where=[OR(AND(OR(AND(OR(AND(=(a0, 0), =(b0, 0)), > =(a1, 0)), =(b1, 0)), =(a2, 0)), =(b2, 0)), =(a3, 0))], select=[a0, a1, a2, > a3, a4, b0, b1, b2, b3, b4], leftInputSpec=[NoUniqueKey], > rightInputSpec=[NoUniqueKey]) > :- Exchange(distribution=[single]) > : +- TableSourceScan(table=[[default_catalog, default_database, A]], > fields=[a0, a1, a2, a3, a4]) > +- Exchange(distribution=[single]) >+- TableSourceScan(table=[[default_catalog, default_database, B]], > fields=[b0, b1, b2, b3, b4]) > {code} > while PostgreSQL does: > {code:java} > postgres=# explain select * from A join B on ((((((a0=0 and b0=0) or a1=0) > and b1=0) or a2=0) and b2=0) or a3=0); > QUERY PLAN > -- > Nested Loop (cost=0.00..1805.09 rows=14632 width=40) >Join Filter: (((((((a.a0 = 0) AND (b.b0 = 0)) OR (a.a1 = 0)) AND (b.b1 = > 0)) OR (a.a2 = 0)) AND (b.b2 = 0)) OR (a.a3 = 0)) >-> Seq Scan on b (cost=0.00..27.00 rows=1700 width=20) >-> Materialize (cost=0.00..44.17 rows=34 width=20) > -> Seq Scan on a (cost=0.00..44.00 rows=34 width=20) >Filter: ((a0 = 0) OR (a1 = 0) OR (a2 = 0) OR (a3 = 0)) > {code} > h2. Details > Pseudocode of the new algorithm: > f – predicate > rel – table > var(rel) – columns > {code:java} > extract(f, rel) > if f = AND(left, right) > return AND(extract(left, rel), extract(right, rel)) > if f = OR(left, right) > return OR(extract(left, rel), extract(right, rel)) > if var(f) subsetOf var(rel) > return f > return True > AND(f, True) = AND(True, f) = f > OR(f, True) = OR(True, f) = True > {code} > This algorithm covers deeply nested predicates and does not use CNF, which > can increase the length of the predicate to O(n * e^n) in the worst case. > The same recursive approach is used in [PostgreSQL > orclauses.c|https://github.com/postgres/postgres/blob/164d174bbf9a3aba719c845497863cd3c49a3ad0/src/backend/optimizer/util/orclauses.c#L151-L252] > and [Apache Spark > predicates.scala|https://github.com/apache/spark/blob/v3.3.0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala#L227-L272]
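The extraction pseudocode can be sketched in Java over a toy predicate tree. `Pred`, `Leaf`, `BinaryPred` and `PushDownExtractor` are hypothetical names for illustration only; this is not Flink's `RexNode` handling nor the PR's `FlinkRexExtract`. Each recursive call extracts from its own operand (`left` and `right`), and the simplification rules `AND(f, True) = f` and `OR(f, True) = True` are applied while rebuilding. The result is implied by the join condition, so it could be pushed to one input as an extra filter while the full condition stays on the join.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

/** Hypothetical minimal predicate tree; Pred.TRUE stands for "no restriction". */
abstract class Pred {
    static final Pred TRUE = new Pred() {
        @Override Set<String> vars() { return Collections.emptySet(); }
        @Override public String toString() { return "TRUE"; }
    };

    /** Columns referenced by this predicate. */
    abstract Set<String> vars();
}

/** An atomic condition such as "a0=0" together with the columns it references. */
final class Leaf extends Pred {
    private final String text;
    private final Set<String> columns;

    Leaf(String text, String... columns) {
        this.text = text;
        this.columns = new HashSet<>(Arrays.asList(columns));
    }

    @Override Set<String> vars() { return columns; }
    @Override public String toString() { return text; }
}

/** A binary AND or OR node. */
final class BinaryPred extends Pred {
    final boolean isAnd;
    final Pred left;
    final Pred right;

    private BinaryPred(boolean isAnd, Pred left, Pred right) {
        this.isAnd = isAnd;
        this.left = left;
        this.right = right;
    }

    static Pred and(Pred left, Pred right) { return new BinaryPred(true, left, right); }
    static Pred or(Pred left, Pred right) { return new BinaryPred(false, left, right); }

    @Override
    Set<String> vars() {
        Set<String> all = new HashSet<>(left.vars());
        all.addAll(right.vars());
        return all;
    }

    @Override
    public String toString() { return "(" + left + (isAnd ? " and " : " or ") + right + ")"; }
}

final class PushDownExtractor {
    /** Returns a predicate over relColumns that is implied by f, or Pred.TRUE if there is none. */
    static Pred extract(Pred f, Set<String> relColumns) {
        if (f instanceof BinaryPred) {
            BinaryPred node = (BinaryPred) f;
            Pred left = extract(node.left, relColumns);
            Pred right = extract(node.right, relColumns);
            return node.isAnd ? conj(left, right) : disj(left, right);
        }
        // Leaf (or TRUE): keep it only if all of its columns belong to rel.
        return relColumns.containsAll(f.vars()) ? f : Pred.TRUE;
    }

    // AND(f, True) = AND(True, f) = f
    private static Pred conj(Pred l, Pred r) {
        if (l == Pred.TRUE) {
            return r;
        }
        return r == Pred.TRUE ? l : BinaryPred.and(l, r);
    }

    // OR(f, True) = OR(True, f) = True
    private static Pred disj(Pred l, Pred r) {
        return (l == Pred.TRUE || r == Pred.TRUE) ? Pred.TRUE : BinaryPred.or(l, r);
    }

    public static void main(String[] args) {
        // n = 3 example: (((((a0 and b0) or a1) and b1) or a2) and b2) or a3
        Pred f = BinaryPred.or(BinaryPred.and(BinaryPred.or(BinaryPred.and(BinaryPred.or(BinaryPred.and(
                new Leaf("a0=0", "a0"), new Leaf("b0=0", "b0")),
                new Leaf("a1=0", "a1")), new Leaf("b1=0", "b1")),
                new Leaf("a2=0", "a2")), new Leaf("b2=0", "b2")),
                new Leaf("a3=0", "a3"));
        Set<String> a = new HashSet<>(Arrays.asList("a0", "a1", "a2", "a3", "a4"));
        Set<String> b = new HashSet<>(Arrays.asList("b0", "b1", "b2", "b3", "b4"));
        System.out.println(extract(f, a)); // (((a0=0 or a1=0) or a2=0) or a3=0)
        System.out.println(extract(f, b)); // TRUE
    }
}
```

For the n = 3 example this yields a filter on `A` equivalent to `a0=0 or a1=0 or a2=0 or a3=0` and nothing for `B`, which matches the `Filter` line in the PostgreSQL plan.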
[GitHub] [flink] trushev opened a new pull request, #20261: [FLINK-28530] Improvement of extraction of conditions that can be pushed into join inputs
trushev opened a new pull request, #20261: URL: https://github.com/apache/flink/pull/20261 ## What is the purpose of the change This PR aims to replace the current extraction algorithm with a new one that covers a more complex case with a deeply nested predicate: for all n > 0 ((((a0 and b0) or a1) and b1) or a2) and b2) or a3) ... and bn-1) or an) => (a0 or a1 or ... or an) ## Brief change log - Introduced the new algorithm in `FlinkRexExtract` ## Verifying this change This change added tests and can be verified as follows: - Added unit tests for the extraction in `FlinkRexExtractTest` - Added a test to `JoinTest` to verify the changed SQL plan ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
[jira] [Commented] (FLINK-28265) Inconsistency in Kubernetes HA service: broken state handle
[ https://issues.apache.org/jira/browse/FLINK-28265?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17566116#comment-17566116 ] Yang Wang commented on FLINK-28265: --- Since we always add the new checkpoint first and then subsume the oldest one, I am curious how it could happen that we only have one checkpoint and it is invalid. If adding the new checkpoint failed, we should still have the old successful checkpoint. Conversely, if subsuming the oldest one failed, we should still have the newly added checkpoint. Could you please verify whether checkpoint 9701 or 9703 exists on S3? I believe the logs of the previous run (e.g. kubectl logs --previous) and the Kubernetes APIServer audit log will help a lot to debug the root cause. > Inconsistency in Kubernetes HA service: broken state handle > --- > > Key: FLINK-28265 > URL: https://issues.apache.org/jira/browse/FLINK-28265 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Affects Versions: 1.14.4 >Reporter: Robert Metzger >Priority: Major > Attachments: flink_checkpoint_issue.txt > > > I have a JobManager, which at some point failed to acknowledge a checkpoint: > {code} > Error while processing AcknowledgeCheckpoint message > org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete > the pending checkpoint 193393. Failure reason: Failure to finalize checkpoint.
> at > org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:1255) > at > org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:1100) > at > org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$acknowledgeCheckpoint$1(ExecutionGraphHandler.java:89) > at > org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$processCheckpointCoordinatorMessage$3(ExecutionGraphHandler.java:119) > at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown > Source) > at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown > Source) > at java.base/java.lang.Thread.run(Unknown Source) > Caused by: > org.apache.flink.runtime.persistence.StateHandleStore$AlreadyExistException: > checkpointID-0193393 already exists in ConfigMap > cm--jobmanager-leader > at > org.apache.flink.kubernetes.highavailability.KubernetesStateHandleStore.getKeyAlreadyExistException(KubernetesStateHandleStore.java:534) > at > org.apache.flink.kubernetes.highavailability.KubernetesStateHandleStore.lambda$addAndLock$0(KubernetesStateHandleStore.java:155) > at > org.apache.flink.kubernetes.kubeclient.Fabric8FlinkKubeClient.lambda$attemptCheckAndUpdateConfigMap$11(Fabric8FlinkKubeClient.java:316) > at > java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown > Source) > ... 3 common frames omitted > {code} > the JobManager creates subsequent checkpoints successfully. > Upon failure, it tries to recover this checkpoint (0193393), but > fails to do so because of: > {code} > Caused by: org.apache.flink.util.FlinkException: Could not retrieve > checkpoint 193393 from state handle under checkpointID-0193393. > This indicates that the retrieved state handle is broken. Try cleaning the > state handle store ... 
Caused by: java.io.FileNotFoundException: No such file > or directory: s3://xxx/flink-ha/xxx/completedCheckpoint72e30229420c > {code} > I'm running Flink 1.14.4. > Note: This issue was first discussed here: https://github.com/apache/flink/pull/15832#pullrequestreview-1005973050
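Yang Wang's argument rests on an ordering invariant: because the new checkpoint is added before the oldest one is subsumed, a failure in either step should still leave at least one checkpoint behind. The toy model below illustrates only that ordering; `RetainedCheckpoints` and its failure flags are hypothetical and are not Flink's `CompletedCheckpointStore` or `KubernetesStateHandleStore`. It says nothing about whether the state handle a retained entry points to still exists on S3, which is the situation the report describes.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

/** Hypothetical in-memory model of "add the new checkpoint first, then subsume the oldest". */
final class RetainedCheckpoints {
    private final Deque<Long> retained = new ArrayDeque<>();
    private final int maxRetained;

    RetainedCheckpoints(int maxRetained) {
        this.maxRetained = maxRetained;
    }

    /** failAdd / failSubsume simulate a failure in the respective step. */
    void addAndSubsume(long checkpointId, boolean failAdd, boolean failSubsume) {
        if (failAdd) {
            return; // the new checkpoint never got in; older ones are untouched
        }
        retained.addLast(checkpointId);
        if (failSubsume) {
            return; // nothing was removed; the newly added checkpoint is still there
        }
        while (retained.size() > maxRetained) {
            retained.removeFirst(); // subsume the oldest only after the new one is stored
        }
    }

    List<Long> ids() {
        return new ArrayList<>(retained);
    }

    public static void main(String[] args) {
        RetainedCheckpoints store = new RetainedCheckpoints(1);
        store.addAndSubsume(1L, false, false);
        store.addAndSubsume(2L, true, false); // adding fails
        System.out.println(store.ids());      // [1]
        store.addAndSubsume(3L, false, true); // subsuming fails
        System.out.println(store.ids());      // [1, 3]
    }
}
```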
[GitHub] [flink] luoyuxia commented on pull request #20247: introduce operation execution plugin
luoyuxia commented on PR #20247: URL: https://github.com/apache/flink/pull/20247#issuecomment-1182702339 @flinkbot run azure
[GitHub] [flink] luoyuxia commented on pull request #20251: [FLINK-26412][hive] Hive dialect supports "create function using jar"
luoyuxia commented on PR #20251: URL: https://github.com/apache/flink/pull/20251#issuecomment-1182701721 @flinkbot run azure
[jira] [Updated] (FLINK-26412) Hive dialect supports "CREATE FUNCTION USING xxx.jar"
[ https://issues.apache.org/jira/browse/FLINK-26412?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] luoyuxia updated FLINK-26412: - Summary: Hive dialect supports "CREATE FUNCTION USING xxx.jar" (was: Hive dialect supports "CREATE TEMPORARY FUNCTION USING xxx.jar") > Hive dialect supports "CREATE FUNCTION USING xxx.jar" > - > > Key: FLINK-26412 > URL: https://issues.apache.org/jira/browse/FLINK-26412 > Project: Flink > Issue Type: Sub-task >Reporter: luoyuxia >Priority: Major > Labels: pull-request-available > > In Hive, SQL such as > {code:java} > CREATE TEMPORARY FUNCTION USING xxx.jar > {code} > can be used to create a UDF. > The Hive dialect in Flink needs to support it as well.
[jira] [Updated] (FLINK-26412) Hive dialect supports "CREATE FUNCTION USING xxx.jar"
[ https://issues.apache.org/jira/browse/FLINK-26412?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] luoyuxia updated FLINK-26412: - Description: In Hive, SQL such as {code:java} CREATE FUNCTION USING xxx.jar {code} can be used to create a UDF. The Hive dialect in Flink needs to support it as well. was: In Hive, SQL such as {code:java} CREATE TEMPORARY FUNCTION USING xxx.jar {code} can be used to create a UDF. The Hive dialect in Flink needs to support it as well. > Hive dialect supports "CREATE FUNCTION USING xxx.jar" > - > > Key: FLINK-26412 > URL: https://issues.apache.org/jira/browse/FLINK-26412 > Project: Flink > Issue Type: Sub-task >Reporter: luoyuxia >Priority: Major > Labels: pull-request-available > > In Hive, SQL such as > {code:java} > CREATE FUNCTION USING xxx.jar > {code} > can be used to create a UDF. > The Hive dialect in Flink needs to support it as well.
[GitHub] [flink] flinkbot commented on pull request #20260: [FLINK-28315][runtime-web] introduce aggregate stats in tables of the subtasks and taskmanagers
flinkbot commented on PR #20260: URL: https://github.com/apache/flink/pull/20260#issuecomment-1182700946 ## CI report: * 0c6876ba81e8834bf4846331783638f29b619884 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
[jira] [Updated] (FLINK-24900) Support to run multiple shuffle plugins in one session cluster
[ https://issues.apache.org/jira/browse/FLINK-24900?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yingjie Cao updated FLINK-24900: Fix Version/s: 1.17.0 > Support to run multiple shuffle plugins in one session cluster > -- > > Key: FLINK-24900 > URL: https://issues.apache.org/jira/browse/FLINK-24900 > Project: Flink > Issue Type: Sub-task >Reporter: Yingjie Cao >Priority: Major > Labels: pull-request-available > Fix For: 1.17.0 > > > Currently, one Flink cluster can only use one shuffle plugin. However, there > are cases where different jobs may need different shuffle implementations. By > loading shuffle plugins with the plugin manager and letting jobs select their > shuffle service freely, Flink can support running multiple shuffle plugins in > one session cluster.
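The per-job selection described in the issue can be sketched as a registry of shuffle service factories keyed by name, with a cluster-wide default for jobs that do not choose. Every name here (`ShuffleService`, `NamedShuffleService`, `ShuffleServiceRegistry`, and the `netty`/`remote` keys) is hypothetical; the sketch does not use Flink's plugin manager or its shuffle service factory interfaces, and it leaves open how a job would declare its choice.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

/** Hypothetical shuffle service handle; not Flink's shuffle API. */
interface ShuffleService {
    String name();
}

final class NamedShuffleService implements ShuffleService {
    private final String name;

    NamedShuffleService(String name) {
        this.name = name;
    }

    @Override
    public String name() {
        return name;
    }
}

/** Hypothetical registry: several shuffle plugins loaded at once, selected per job. */
final class ShuffleServiceRegistry {
    private final Map<String, Supplier<ShuffleService>> factories = new HashMap<>();
    private final String defaultName;

    ShuffleServiceRegistry(String defaultName) {
        this.defaultName = defaultName;
    }

    /** In a real cluster, the factories would come from plugins loaded by the plugin manager. */
    void register(String name, Supplier<ShuffleService> factory) {
        factories.put(name, factory);
    }

    /** Returns the shuffle service a job asked for, or the cluster default if it asked for none. */
    ShuffleService forJob(String requestedName) {
        String name = requestedName != null ? requestedName : defaultName;
        Supplier<ShuffleService> factory = factories.get(name);
        if (factory == null) {
            throw new IllegalArgumentException("No shuffle plugin registered under '" + name + "'");
        }
        return factory.get();
    }

    public static void main(String[] args) {
        ShuffleServiceRegistry registry = new ShuffleServiceRegistry("netty");
        registry.register("netty", () -> new NamedShuffleService("netty"));
        registry.register("remote", () -> new NamedShuffleService("remote"));
        System.out.println(registry.forJob(null).name());     // netty
        System.out.println(registry.forJob("remote").name()); // remote
    }
}
```

Failing fast on an unknown name, rather than silently falling back to the default, is a design choice of this sketch; it keeps a misconfigured job from running on a shuffle implementation it did not ask for.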
[GitHub] [flink] flinkbot commented on pull request #20259: [FLINK-28525][hive] Fix unstable test for HiveDialectITCase#testTableWithSubDirsInPartitionDir
flinkbot commented on PR #20259: URL: https://github.com/apache/flink/pull/20259#issuecomment-1182700677 ## CI report: * af4fd0821d53c752d5fe88982e17af7cfd0cbb6b UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
[jira] [Commented] (FLINK-28525) HiveDialectITCase.testTableWithSubDirsInPartitionDir failed with AssertJMultipleFailuresError
[ https://issues.apache.org/jira/browse/FLINK-28525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17566114#comment-17566114 ] Huang Xingbo commented on FLINK-28525: -- Thanks for the quick fix. > HiveDialectITCase.testTableWithSubDirsInPartitionDir failed with > AssertJMultipleFailuresError > - > > Key: FLINK-28525 > URL: https://issues.apache.org/jira/browse/FLINK-28525 > Project: Flink > Issue Type: Bug > Components: Connectors / Hive >Affects Versions: 1.16.0 >Reporter: Huang Xingbo >Assignee: luoyuxia >Priority: Critical > Labels: pull-request-available, test-stability > > {code:java} > 2022-07-12T04:20:27.9597185Z Jul 12 04:20:27 [ERROR] > org.apache.flink.connectors.hive.HiveDialectITCase.testTableWithSubDirsInPartitionDir > Time elapsed: 16.31 s <<< FAILURE! > 2022-07-12T04:20:27.9598238Z Jul 12 04:20:27 > org.assertj.core.error.AssertJMultipleFailuresError: > 2022-07-12T04:20:27.9598978Z Jul 12 04:20:27 > 2022-07-12T04:20:27.9599419Z Jul 12 04:20:27 Multiple Failures (1 failure) > 2022-07-12T04:20:27.9600269Z Jul 12 04:20:27 -- failure 1 -- > 2022-07-12T04:20:27.9601093Z Jul 12 04:20:27 [Any cause contains message 'Not > a file: file:/tmp/junit297605250552556431/hive_warehouse/fact_tz/ds=1/hr=2'] > 2022-07-12T04:20:27.9602679Z Jul 12 04:20:27 Expecting any element of: > 2022-07-12T04:20:27.9603145Z Jul 12 04:20:27 > [org.apache.flink.connectors.hive.FlinkHiveException: java.io.IOException: > Fail to create input splits. 
> 2022-07-12T04:20:27.9603766Z Jul 12 04:20:27 at > org.apache.flink.connectors.hive.HiveParallelismInference.infer(HiveParallelismInference.java:98) > 2022-07-12T04:20:27.9604403Z Jul 12 04:20:27 at > org.apache.flink.connectors.hive.HiveTableSource.getDataStream(HiveTableSource.java:153) > 2022-07-12T04:20:27.9605039Z Jul 12 04:20:27 at > org.apache.flink.connectors.hive.HiveTableSource$1.produceDataStream(HiveTableSource.java:117) > 2022-07-12T04:20:27.9605902Z Jul 12 04:20:27 ...(75 remaining lines not > displayed - this can be changed with > Assertions.setMaxStackTraceElementsDisplayed), > 2022-07-12T04:20:27.9606408Z Jul 12 04:20:27 java.io.IOException: Fail to > create input splits. > 2022-07-12T04:20:27.9606952Z Jul 12 04:20:27 at > org.apache.flink.connectors.hive.MRSplitsGetter.getHiveTablePartitionMRSplits(MRSplitsGetter.java:84) > 2022-07-12T04:20:27.960Z Jul 12 04:20:27 at > org.apache.flink.connectors.hive.HiveSourceFileEnumerator.createInputSplits(HiveSourceFileEnumerator.java:69) > 2022-07-12T04:20:27.9608438Z Jul 12 04:20:27 at > org.apache.flink.connectors.hive.HiveTableSource.lambda$getDataStream$1(HiveTableSource.java:158) > 2022-07-12T04:20:27.9609261Z Jul 12 04:20:27 ...(79 remaining lines not > displayed - this can be changed with > Assertions.setMaxStackTraceElementsDisplayed), > 2022-07-12T04:20:27.9609899Z Jul 12 04:20:27 > java.util.concurrent.ExecutionException: java.io.IOException: Not a file: > file:/tmp/junit297605250552556431/hive_warehouse/fact_tz/ds=1/hr=1 > 2022-07-12T04:20:27.9610503Z Jul 12 04:20:27 at > java.util.concurrent.FutureTask.report(FutureTask.java:122) > 2022-07-12T04:20:27.9610998Z Jul 12 04:20:27 at > java.util.concurrent.FutureTask.get(FutureTask.java:192) > 2022-07-12T04:20:27.9611574Z Jul 12 04:20:27 at > org.apache.flink.connectors.hive.MRSplitsGetter.getHiveTablePartitionMRSplits(MRSplitsGetter.java:79) > 2022-07-12T04:20:27.9612390Z Jul 12 04:20:27 ...(81 remaining lines not > displayed - this can be changed 
with > Assertions.setMaxStackTraceElementsDisplayed), > 2022-07-12T04:20:27.9612953Z Jul 12 04:20:27 java.io.IOException: Not a > file: file:/tmp/junit297605250552556431/hive_warehouse/fact_tz/ds=1/hr=1 > 2022-07-12T04:20:27.9613522Z Jul 12 04:20:27 at > org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:322) > 2022-07-12T04:20:27.9614119Z Jul 12 04:20:27 at > org.apache.flink.connectors.hive.MRSplitsGetter$MRSplitter.call(MRSplitsGetter.java:134) > 2022-07-12T04:20:27.9614731Z Jul 12 04:20:27 at > org.apache.flink.connectors.hive.MRSplitsGetter$MRSplitter.call(MRSplitsGetter.java:96) > 2022-07-12T04:20:27.9615535Z Jul 12 04:20:27 ...(4 remaining lines not > displayed - this can be changed with > Assertions.setMaxStackTraceElementsDisplayed)] > 2022-07-12T04:20:27.9616042Z Jul 12 04:20:27 to satisfy the given assertions > requirements but none did: > 2022-07-12T04:20:27.9616381Z Jul 12 04:20:27 > 2022-07-12T04:20:27.9616795Z Jul 12 04:20:27 > org.apache.flink.connectors.hive.FlinkHiveException: java.io.IOException: > Fail to create input splits. > 2022-07-12T04:20:27.9617407Z Jul 12 04:20:27 at >
[GitHub] [flink] yangjunhan commented on pull request #20260: [FLINK-28315][runtime-web] introduce aggregate stats in tables of the subtasks and taskmanagers
yangjunhan commented on PR #20260: URL: https://github.com/apache/flink/pull/20260#issuecomment-1182699144 Hi, @simplejason. Could you please help review this PR regarding the runtime-web changes?
[jira] [Commented] (FLINK-28525) HiveDialectITCase.testTableWithSubDirsInPartitionDir failed with AssertJMultipleFailuresError
[ https://issues.apache.org/jira/browse/FLINK-28525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17566112#comment-17566112 ] luoyuxia commented on FLINK-28525: -- Thanks for reporting it. I'll fix it as soon as possible. The cause is that any of the paths may be read first, but the test assumes that a specific path is always read first, which makes the test unstable.
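Following luoyuxia's explanation, one way to make such an assertion independent of listing order is to accept the "Not a file" message for any of the candidate sub-directories. This is a minimal plain-Java sketch; `CauseChecks.anyCauseMentionsAny` is a hypothetical helper, the real test uses AssertJ and may have been fixed differently, and the paths are shortened illustrations of the ones in the stack trace.

```java
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Set;

/** Hypothetical helper for an order-independent check on an exception's cause chain. */
final class CauseChecks {
    /** True if some throwable in t's cause chain has a message containing any candidate. */
    static boolean anyCauseMentionsAny(Throwable t, List<String> candidates) {
        Set<Throwable> seen = Collections.newSetFromMap(new IdentityHashMap<>()); // guards against cycles
        for (Throwable cur = t; cur != null && seen.add(cur); cur = cur.getCause()) {
            String message = cur.getMessage();
            if (message == null) {
                continue;
            }
            for (String candidate : candidates) {
                if (message.contains(candidate)) {
                    return true;
                }
            }
        }
        return false;
    }

    public static void main(String[] args) {
        Throwable failure = new RuntimeException("Fail to create input splits.",
                new IOException("Not a file: file:/tmp/warehouse/fact_tz/ds=1/hr=1"));
        // Either sub-directory may be listed first, so accept both.
        List<String> expected = Arrays.asList("fact_tz/ds=1/hr=1", "fact_tz/ds=1/hr=2");
        System.out.println(anyCauseMentionsAny(failure, expected)); // true
    }
}
```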
[jira] [Updated] (FLINK-28530) Improvement of extraction of conditions that can be pushed into join inputs
[ https://issues.apache.org/jira/browse/FLINK-28530?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Alexander Trushev updated FLINK-28530: -- Summary: Improvement of extraction of conditions that can be pushed into join inputs (was: Improvement of conditions extraction that can be pushed into join inputs) > Improvement of extraction of conditions that can be pushed into join inputs > --- > > Key: FLINK-28530 > URL: https://issues.apache.org/jira/browse/FLINK-28530 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Planner >Reporter: Alexander Trushev >Priority: Major > > Condition extraction was introduced for batch mode in FLINK-12509 and for > stream mode in FLINK-24139. > h2. Proposal > This ticket aims to replace the current extraction algorithm with a new one > that also covers a more complex case, a deeply nested predicate: > for all n > 0, with p(0) = a0 and p(k) = (p(k-1) and b(k-1)) or a(k): > p(n) => (a0 or a1 or ... or an) > *Example.* For n = 3, Flink does not extract (a0 or a1 or a2 or a3): > {code:java} > FlinkSQL> explain select * from A join B on (((((a0=0 and b0=0) or a1=0) and > b1=0) or a2=0) and b2=0) or a3=0; > == Optimized Physical Plan == > Join(joinType=[InnerJoin], where=[OR(AND(OR(AND(OR(AND(=(a0, 0), =(b0, 0)), > =(a1, 0)), =(b1, 0)), =(a2, 0)), =(b2, 0)), =(a3, 0))], select=[a0, a1, a2, > a3, a4, b0, b1, b2, b3, b4], leftInputSpec=[NoUniqueKey], > rightInputSpec=[NoUniqueKey]) > :- Exchange(distribution=[single]) > : +- TableSourceScan(table=[[default_catalog, default_database, A]], > fields=[a0, a1, a2, a3, a4]) > +- Exchange(distribution=[single]) >+- TableSourceScan(table=[[default_catalog, default_database, B]], > fields=[b0, b1, b2, b3, b4]) > {code} > while PostgreSQL does: > {code:java} > postgres=# explain select * from A join B on ((((((a0=0 and b0=0) or a1=0) > and b1=0) or a2=0) and b2=0) or a3=0); > QUERY PLAN > -- > Nested Loop (cost=0.00..1805.09 rows=14632 width=40) >Join Filter: (((((((a.a0 
= 0) AND (b.b0 = 0)) OR (a.a1 = 0)) AND (b.b1 = > 0)) OR (a.a2 = 0)) AND (b.b2 = 0)) OR (a.a3 = 0)) >-> Seq Scan on b (cost=0.00..27.00 rows=1700 width=20) >-> Materialize (cost=0.00..44.17 rows=34 width=20) > -> Seq Scan on a (cost=0.00..44.00 rows=34 width=20) >Filter: ((a0 = 0) OR (a1 = 0) OR (a2 = 0) OR (a3 = 0)) > {code} > h2. Details > Pseudocode of the new algorithm: > f – predicate > rel – table > var(rel) – columns of rel, var(f) – columns referenced by f > {code:java} > extract(f, rel) > if f = AND(left, right) > return AND(extract(left, rel), extract(right, rel)) > if f = OR(left, right) > return OR(extract(left, rel), extract(right, rel)) > if var(f) subsetOf var(rel) > return f > return True > AND(f, True) = AND(True, f) = f > OR(f, True) = OR(True, f) = True > {code} > This algorithm handles deeply nested predicates and does not rely on conversion > to CNF, which can increase the length of a predicate to O(n * e^n) in the worst case. > The same recursive approach is used in [PostgreSQL > orclauses.c|https://github.com/postgres/postgres/blob/164d174bbf9a3aba719c845497863cd3c49a3ad0/src/backend/optimizer/util/orclauses.c#L151-L252] > and [Apache Spark > predicates.scala|https://github.com/apache/spark/blob/v3.3.0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala#L227-L272] -- This message was sent by Atlassian Jira (v8.20.10#820010)
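A minimal Python sketch of the proposed `extract` pseudocode, assuming predicates are modelled as a small binary tree of `Leaf`/`And`/`Or` nodes (hypothetical types for illustration, not Calcite's `RexNode` or any Flink planner class) and using `None` to stand for the constant True. It rebuilds the n = 3 example and checks that `a0 or a1 or a2 or a3` can be derived for table A, while nothing survives for table B.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import FrozenSet, Optional, Union


@dataclass(frozen=True)
class Leaf:
    """An atomic comparison such as a0=0, plus the columns it references."""
    text: str
    cols: FrozenSet[str]


@dataclass(frozen=True)
class And:
    left: Pred
    right: Pred


@dataclass(frozen=True)
class Or:
    left: Pred
    right: Pred


Pred = Union[Leaf, And, Or]


def var(f: Pred) -> FrozenSet[str]:
    """Columns referenced anywhere in f."""
    if isinstance(f, Leaf):
        return f.cols
    return var(f.left) | var(f.right)


def extract(f: Pred, rel_cols: FrozenSet[str]) -> Optional[Pred]:
    """Weaker predicate over rel_cols implied by f; None stands for True."""
    if isinstance(f, And):
        left, right = extract(f.left, rel_cols), extract(f.right, rel_cols)
        if left is None:   # AND(True, x) = x
            return right
        if right is None:  # AND(x, True) = x
            return left
        return And(left, right)
    if isinstance(f, Or):
        left, right = extract(f.left, rel_cols), extract(f.right, rel_cols)
        if left is None or right is None:  # OR(x, True) = True
            return None
        return Or(left, right)
    return f if var(f) <= rel_cols else None


def render(f: Optional[Pred]) -> str:
    if f is None:
        return "TRUE"
    if isinstance(f, Leaf):
        return f.text
    op = "AND" if isinstance(f, And) else "OR"
    return f"({render(f.left)} {op} {render(f.right)})"


def leaf(col: str) -> Leaf:
    return Leaf(f"{col}=0", frozenset({col}))


def nested(n: int) -> Pred:
    """p(0) = a0, p(k) = (p(k-1) AND b(k-1)) OR a(k), as in the ticket."""
    p: Pred = leaf("a0")
    for k in range(1, n + 1):
        p = Or(And(p, leaf(f"b{k - 1}")), leaf(f"a{k}"))
    return p


A_COLS = frozenset(f"a{i}" for i in range(5))
B_COLS = frozenset(f"b{i}" for i in range(5))

print(render(extract(nested(3), A_COLS)))  # (((a0=0 OR a1=0) OR a2=0) OR a3=0)
print(render(extract(nested(3), B_COLS)))  # TRUE
```

Because each recursive call either keeps a subtree or replaces it with True, the result is never larger than the input predicate, which is the property that distinguishes this approach from a CNF rewrite.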
[jira] [Updated] (FLINK-28315) [UI] Introduce aggregate stats in tables of the subtasks and taskmanagers
[ https://issues.apache.org/jira/browse/FLINK-28315?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-28315: --- Labels: pull-request-available (was: ) > [UI] Introduce aggregate stats in tables of the subtasks and taskmanagers > - > > Key: FLINK-28315 > URL: https://issues.apache.org/jira/browse/FLINK-28315 > Project: Flink > Issue Type: Sub-task >Affects Versions: 1.16.0 >Reporter: Junhan Yang >Assignee: Junhan Yang >Priority: Major > Labels: pull-request-available > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] yangjunhan opened a new pull request, #20260: [FLINK-28315][runtime-web] introduce aggregate stats in tables of the subtasks and taskmanagers
yangjunhan opened a new pull request, #20260: URL: https://github.com/apache/flink/pull/20260 ## What is the purpose of the change A subtask of [FLIP-241](https://cwiki.apache.org/confluence/display/FLINK/FLIP-241%3A+Completed+Jobs+Information+Enhancement); the referenced issue is FLINK-28315. ## Brief change log Split the per-subtask metrics and aggregated metrics into tabs, and add a table of aggregated metrics. Extend the original subtasks table with extra columns: - "Accumulated Time" - "Status Durations" Under the TaskManagers tab, each TaskManager row offers an action to view its "Aggregated Metrics". The table in the resulting modal is identical to the one under the Subtasks tab. ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? yes - If yes, how is the feature documented? not applicable -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-28524) Translate "User-defined Sources & Sinks"
[ https://issues.apache.org/jira/browse/FLINK-28524?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17566111#comment-17566111 ] Jark Wu commented on FLINK-28524: - Sorry, there is already an issue for this: FLINK-16105. > Translate "User-defined Sources & Sinks" > > > Key: FLINK-28524 > URL: https://issues.apache.org/jira/browse/FLINK-28524 > Project: Flink > Issue Type: Improvement >Reporter: hunter >Priority: Minor > > The file is docs/content.zh/docs/dev/table/sourcesSinks.md > The link is > [User-defined Sources (用户自定义Sources)|https://nightlies.apache.org/flink/flink-docs-release-1.15/zh/docs/dev/table/sourcessinks/] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Closed] (FLINK-28524) Translate "User-defined Sources & Sinks"
[ https://issues.apache.org/jira/browse/FLINK-28524?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jark Wu closed FLINK-28524. --- Resolution: Duplicate > Translate "User-defined Sources & Sinks" > > > Key: FLINK-28524 > URL: https://issues.apache.org/jira/browse/FLINK-28524 > Project: Flink > Issue Type: Improvement >Reporter: hunter >Priority: Minor > > The file is docs/content.zh/docs/dev/table/sourcesSinks.md > The link is > [User-defined Sources (用户自定义Sources)|https://nightlies.apache.org/flink/flink-docs-release-1.15/zh/docs/dev/table/sourcessinks/] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-28530) Improvement of conditions extraction that can be pushed into join inputs
Alexander Trushev created FLINK-28530: - Summary: Improvement of conditions extraction that can be pushed into join inputs Key: FLINK-28530 URL: https://issues.apache.org/jira/browse/FLINK-28530 Project: Flink Issue Type: Improvement Components: Table SQL / Planner Reporter: Alexander Trushev Condition extraction was introduced for batch mode in FLINK-12509 and for stream mode in FLINK-24139. h2. Proposal This ticket aims to replace the current extraction algorithm with a new one that also covers a more complex case, a deeply nested predicate: for all n > 0, with p(0) = a0 and p(k) = (p(k-1) and b(k-1)) or a(k): p(n) => (a0 or a1 or ... or an) *Example.* For n = 3, Flink does not extract (a0 or a1 or a2 or a3): {code:java} FlinkSQL> explain select * from A join B on (((((a0=0 and b0=0) or a1=0) and b1=0) or a2=0) and b2=0) or a3=0; == Optimized Physical Plan == Join(joinType=[InnerJoin], where=[OR(AND(OR(AND(OR(AND(=(a0, 0), =(b0, 0)), =(a1, 0)), =(b1, 0)), =(a2, 0)), =(b2, 0)), =(a3, 0))], select=[a0, a1, a2, a3, a4, b0, b1, b2, b3, b4], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) :- Exchange(distribution=[single]) : +- TableSourceScan(table=[[default_catalog, default_database, A]], fields=[a0, a1, a2, a3, a4]) +- Exchange(distribution=[single]) +- TableSourceScan(table=[[default_catalog, default_database, B]], fields=[b0, b1, b2, b3, b4]) {code} while PostgreSQL does: {code:java} postgres=# explain select * from A join B on ((((((a0=0 and b0=0) or a1=0) and b1=0) or a2=0) and b2=0) or a3=0); QUERY PLAN -- Nested Loop (cost=0.00..1805.09 rows=14632 width=40) Join Filter: (((((((a.a0 = 0) AND (b.b0 = 0)) OR (a.a1 = 0)) AND (b.b1 = 0)) OR (a.a2 = 0)) AND (b.b2 = 0)) OR (a.a3 = 0)) -> Seq Scan on b (cost=0.00..27.00 rows=1700 width=20) -> Materialize (cost=0.00..44.17 rows=34 width=20) -> Seq Scan on a (cost=0.00..44.00 rows=34 width=20) Filter: ((a0 = 0) OR (a1 = 0) OR (a2 = 0) OR (a3 = 0)) {code} h2.
Details Pseudocode of the new algorithm: f – predicate rel – table var(rel) – columns of rel, var(f) – columns referenced by f {code:java} extract(f, rel) if f = AND(left, right) return AND(extract(left, rel), extract(right, rel)) if f = OR(left, right) return OR(extract(left, rel), extract(right, rel)) if var(f) subsetOf var(rel) return f return True AND(f, True) = AND(True, f) = f OR(f, True) = OR(True, f) = True {code} This algorithm handles deeply nested predicates and does not rely on conversion to CNF, which can increase the length of a predicate to O(n * e^n) in the worst case. The same recursive approach is used in [PostgreSQL orclauses.c|https://github.com/postgres/postgres/blob/164d174bbf9a3aba719c845497863cd3c49a3ad0/src/backend/optimizer/util/orclauses.c#L151-L252] and [Apache Spark predicates.scala|https://github.com/apache/spark/blob/v3.3.0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala#L227-L272] -- This message was sent by Atlassian Jira (v8.20.10#820010)
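To illustrate the CNF growth the ticket wants to avoid, here is a minimal sketch of naive CNF conversion by distributing OR over AND. It is applied to a worst-case shape chosen here for illustration, a disjunction of n two-literal conjunctions with distinct literals; this is not the predicate from the ticket, and real planners apply simplifications and size limits that are not modelled here. Every clause picks one literal from each conjunction, so the output has 2^n clauses of n literals each (n * 2^n literals), consistent with the exponential worst case the ticket mentions.

```python
from itertools import product
from typing import FrozenSet, List, Sequence, Tuple


def cnf_by_distribution(dnf: Sequence[Tuple[str, ...]]) -> List[FrozenSet[str]]:
    """CNF of OR(AND(t1), AND(t2), ...) obtained by distributing OR over AND.

    Every clause picks one literal from each conjunction, so the number of
    clauses is the product of the conjunction sizes.
    """
    return [frozenset(choice) for choice in product(*dnf)]


def worst_case(n: int) -> List[Tuple[str, str]]:
    """(x1 AND y1) OR (x2 AND y2) OR ... OR (xn AND yn), with distinct literals."""
    return [(f"x{i}", f"y{i}") for i in range(1, n + 1)]


for n in (2, 4, 8, 12):
    clauses = cnf_by_distribution(worst_case(n))
    literals = sum(len(c) for c in clauses)
    print(f"n={n}: {len(clauses)} clauses, {literals} literals")
```

This prints 4, 16, 256 and 4096 clauses for n = 2, 4, 8 and 12. The recursive extraction, by contrast, only keeps or drops existing subtrees, so its output never exceeds the size of its input.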
[jira] [Commented] (FLINK-26993) CheckpointCoordinatorTest#testMinCheckpointPause
[ https://issues.apache.org/jira/browse/FLINK-26993?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17566110#comment-17566110 ] Huang Xingbo commented on FLINK-26993: -- instance in release-1.14: https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=38058=logs=b0a398c0-685b-599c-eb57-c8c2a771138e=747432ad-a576-5911-1e2a-68c6bedc248a > CheckpointCoordinatorTest#testMinCheckpointPause > > > Key: FLINK-26993 > URL: https://issues.apache.org/jira/browse/FLINK-26993 > Project: Flink > Issue Type: Technical Debt > Components: Runtime / Checkpointing, Tests >Affects Versions: 1.15.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Major > Labels: pull-request-available > Fix For: 1.15.1, 1.16.0 > > > The test triggers checkpoints, waits for the CC to have stored a pending > checkpoint, and then sends an acknowledge. > The acknowledge can fail with an NPE because the > PendingCheckpoint#checkpointTargetLocation hasn't been set yet. This doesn't > happen synchronously with the PendingCheckpoint being added to > CheckpointCoordinator#pendingCheckpoints. > {code} > Apr 01 19:57:36 [ERROR] > org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTest.testMinCheckpointPause > Time elapsed: 0.012 s <<< ERROR! > Apr 01 19:57:36 org.apache.flink.runtime.checkpoint.CheckpointException: > Could not finalize the pending checkpoint 1. Failure reason: Failure to > finalize checkpoint. > Apr 01 19:57:36 at > org.apache.flink.runtime.checkpoint.CheckpointCoordinator.finalizeCheckpoint(CheckpointCoordinator.java:1354) > Apr 01 19:57:36 at > org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:1241) > Apr 01 19:57:36 at > org.junit.vintage.engine.VintageTestEngine.execute(VintageTestEngine.java:72) > ... 
> Apr 01 19:57:36 Caused by: java.lang.NullPointerException > Apr 01 19:57:36 at > org.apache.flink.runtime.checkpoint.PendingCheckpoint.finalizeCheckpoint(PendingCheckpoint.java:327) > Apr 01 19:57:36 at > org.apache.flink.runtime.checkpoint.CheckpointCoordinator.finalizeCheckpoint(CheckpointCoordinator.java:1337) > Apr 01 19:57:36 ... 50 more > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
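A minimal Python sketch of the ordering problem described in this issue, under a simplified model: `Coordinator`, `PendingCheckpoint`, `target_location` and `initialized` are hypothetical names that only mirror the roles of `CheckpointCoordinator#pendingCheckpoints` and `PendingCheckpoint#checkpointTargetLocation`. It is not Flink code and does not describe how the actual fix was implemented. A test that waits only for the map entry can acknowledge too early; waiting on an explicit initialization signal removes the race.

```python
import threading
from typing import Dict, Optional


class PendingCheckpoint:
    """Hypothetical stand-in; only the ordering problem is modelled."""

    def __init__(self, checkpoint_id: int) -> None:
        self.checkpoint_id = checkpoint_id
        self.target_location: Optional[str] = None  # assigned later, off-thread
        self.initialized = threading.Event()

    def finalize(self) -> str:
        if self.target_location is None:
            # Plays the role of the NullPointerException in the trace above.
            raise RuntimeError("checkpoint target location not set yet")
        return f"completed {self.checkpoint_id} at {self.target_location}"


class Coordinator:
    def __init__(self) -> None:
        self.pending: Dict[int, PendingCheckpoint] = {}

    def trigger(self, checkpoint_id: int, gate: threading.Event) -> threading.Thread:
        pc = PendingCheckpoint(checkpoint_id)
        self.pending[checkpoint_id] = pc  # published before it is fully initialized

        def init_location() -> None:
            gate.wait()  # the gate makes the "slow" initialization deterministic
            pc.target_location = f"/tmp/chk-{checkpoint_id}"
            pc.initialized.set()

        worker = threading.Thread(target=init_location)
        worker.start()
        return worker


coordinator = Coordinator()
gate = threading.Event()
worker = coordinator.trigger(1, gate)

pc = coordinator.pending[1]  # waiting for the map entry alone is not enough
try:
    pc.finalize()
    early_ack_failed = False
except RuntimeError:
    early_ack_failed = True

gate.set()
pc.initialized.wait(timeout=5)  # wait for initialization instead
result = pc.finalize()
worker.join()
print(early_ack_failed, result)  # True completed 1 at /tmp/chk-1
```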
[jira] [Updated] (FLINK-28525) HiveDialectITCase.testTableWithSubDirsInPartitionDir failed with AssertJMultipleFailuresError
[ https://issues.apache.org/jira/browse/FLINK-28525?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-28525: --- Labels: pull-request-available test-stability (was: test-stability) > HiveDialectITCase.testTableWithSubDirsInPartitionDir failed with > AssertJMultipleFailuresError > - > > Key: FLINK-28525 > URL: https://issues.apache.org/jira/browse/FLINK-28525 > Project: Flink > Issue Type: Bug > Components: Connectors / Hive >Affects Versions: 1.16.0 >Reporter: Huang Xingbo >Assignee: luoyuxia >Priority: Critical > Labels: pull-request-available, test-stability > > {code:java} > 2022-07-12T04:20:27.9597185Z Jul 12 04:20:27 [ERROR] > org.apache.flink.connectors.hive.HiveDialectITCase.testTableWithSubDirsInPartitionDir > Time elapsed: 16.31 s <<< FAILURE! > 2022-07-12T04:20:27.9598238Z Jul 12 04:20:27 > org.assertj.core.error.AssertJMultipleFailuresError: > 2022-07-12T04:20:27.9598978Z Jul 12 04:20:27 > 2022-07-12T04:20:27.9599419Z Jul 12 04:20:27 Multiple Failures (1 failure) > 2022-07-12T04:20:27.9600269Z Jul 12 04:20:27 -- failure 1 -- > 2022-07-12T04:20:27.9601093Z Jul 12 04:20:27 [Any cause contains message 'Not > a file: file:/tmp/junit297605250552556431/hive_warehouse/fact_tz/ds=1/hr=2'] > 2022-07-12T04:20:27.9602679Z Jul 12 04:20:27 Expecting any element of: > 2022-07-12T04:20:27.9603145Z Jul 12 04:20:27 > [org.apache.flink.connectors.hive.FlinkHiveException: java.io.IOException: > Fail to create input splits. 
> 2022-07-12T04:20:27.9603766Z Jul 12 04:20:27 at > org.apache.flink.connectors.hive.HiveParallelismInference.infer(HiveParallelismInference.java:98) > 2022-07-12T04:20:27.9604403Z Jul 12 04:20:27 at > org.apache.flink.connectors.hive.HiveTableSource.getDataStream(HiveTableSource.java:153) > 2022-07-12T04:20:27.9605039Z Jul 12 04:20:27 at > org.apache.flink.connectors.hive.HiveTableSource$1.produceDataStream(HiveTableSource.java:117) > 2022-07-12T04:20:27.9605902Z Jul 12 04:20:27 ...(75 remaining lines not > displayed - this can be changed with > Assertions.setMaxStackTraceElementsDisplayed), > 2022-07-12T04:20:27.9606408Z Jul 12 04:20:27 java.io.IOException: Fail to > create input splits. > 2022-07-12T04:20:27.9606952Z Jul 12 04:20:27 at > org.apache.flink.connectors.hive.MRSplitsGetter.getHiveTablePartitionMRSplits(MRSplitsGetter.java:84) > 2022-07-12T04:20:27.960Z Jul 12 04:20:27 at > org.apache.flink.connectors.hive.HiveSourceFileEnumerator.createInputSplits(HiveSourceFileEnumerator.java:69) > 2022-07-12T04:20:27.9608438Z Jul 12 04:20:27 at > org.apache.flink.connectors.hive.HiveTableSource.lambda$getDataStream$1(HiveTableSource.java:158) > 2022-07-12T04:20:27.9609261Z Jul 12 04:20:27 ...(79 remaining lines not > displayed - this can be changed with > Assertions.setMaxStackTraceElementsDisplayed), > 2022-07-12T04:20:27.9609899Z Jul 12 04:20:27 > java.util.concurrent.ExecutionException: java.io.IOException: Not a file: > file:/tmp/junit297605250552556431/hive_warehouse/fact_tz/ds=1/hr=1 > 2022-07-12T04:20:27.9610503Z Jul 12 04:20:27 at > java.util.concurrent.FutureTask.report(FutureTask.java:122) > 2022-07-12T04:20:27.9610998Z Jul 12 04:20:27 at > java.util.concurrent.FutureTask.get(FutureTask.java:192) > 2022-07-12T04:20:27.9611574Z Jul 12 04:20:27 at > org.apache.flink.connectors.hive.MRSplitsGetter.getHiveTablePartitionMRSplits(MRSplitsGetter.java:79) > 2022-07-12T04:20:27.9612390Z Jul 12 04:20:27 ...(81 remaining lines not > displayed - this can be changed 
with > Assertions.setMaxStackTraceElementsDisplayed), > 2022-07-12T04:20:27.9612953Z Jul 12 04:20:27 java.io.IOException: Not a > file: file:/tmp/junit297605250552556431/hive_warehouse/fact_tz/ds=1/hr=1 > 2022-07-12T04:20:27.9613522Z Jul 12 04:20:27 at > org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:322) > 2022-07-12T04:20:27.9614119Z Jul 12 04:20:27 at > org.apache.flink.connectors.hive.MRSplitsGetter$MRSplitter.call(MRSplitsGetter.java:134) > 2022-07-12T04:20:27.9614731Z Jul 12 04:20:27 at > org.apache.flink.connectors.hive.MRSplitsGetter$MRSplitter.call(MRSplitsGetter.java:96) > 2022-07-12T04:20:27.9615535Z Jul 12 04:20:27 ...(4 remaining lines not > displayed - this can be changed with > Assertions.setMaxStackTraceElementsDisplayed)] > 2022-07-12T04:20:27.9616042Z Jul 12 04:20:27 to satisfy the given assertions > requirements but none did: > 2022-07-12T04:20:27.9616381Z Jul 12 04:20:27 > 2022-07-12T04:20:27.9616795Z Jul 12 04:20:27 > org.apache.flink.connectors.hive.FlinkHiveException: java.io.IOException: > Fail to create input splits. > 2022-07-12T04:20:27.9617407Z Jul 12 04:20:27 at >
[GitHub] [flink] luoyuxia opened a new pull request, #20259: [FLINK-28525][hive] Fix unstable test for HiveDialectITCase#testTableWithSubDirsInPartitionDir
luoyuxia opened a new pull request, #20259: URL: https://github.com/apache/flink/pull/20259 ## What is the purpose of the change To fix the unstable test `HiveDialectITCase#testTableWithSubDirsInPartitionDir`. ## Brief change log - Change `satisfies` to `satisfiesAnyOf`, because the files may be read in any order, so the thrown exception can refer to whichever one is read first. ## Verifying this change This change is already covered by existing tests. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? no - If yes, how is the feature documented? N/A -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
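The FLINK-28525 failure log expects a cause mentioning `hr=2`, but the run failed on `hr=1`, so a check pinned to a single message depends on listing order. A minimal Python analogue of the assertion change, assuming hypothetical helpers `any_cause_contains`, `satisfies_any_of` and `simulate_split_failure` (this is not AssertJ's API, and the paths are shortened for illustration): accepting any of the expected messages makes the check order-independent.

```python
from typing import Iterable, Iterator, Optional

# Illustrative paths, shortened from the FLINK-28525 log.
EXPECTED = [
    "Not a file: file:/tmp/hive_warehouse/fact_tz/ds=1/hr=1",
    "Not a file: file:/tmp/hive_warehouse/fact_tz/ds=1/hr=2",
]


def causes(exc: Optional[BaseException]) -> Iterator[BaseException]:
    """Yield exc and every exception reachable through __cause__."""
    seen = set()
    while exc is not None and id(exc) not in seen:
        seen.add(id(exc))
        yield exc
        exc = exc.__cause__


def any_cause_contains(exc: BaseException, message: str) -> bool:
    return any(message in str(e) for e in causes(exc))


def satisfies_any_of(exc: BaseException, messages: Iterable[str]) -> bool:
    """Pass if at least one expected message appears in the cause chain."""
    return any(any_cause_contains(exc, m) for m in messages)


def simulate_split_failure(first_dir: str) -> RuntimeError:
    """Hypothetical: wrap an OSError naming whichever subdirectory was read first."""
    try:
        try:
            raise OSError(f"Not a file: file:/tmp/hive_warehouse/fact_tz/ds=1/{first_dir}")
        except OSError as inner:
            raise RuntimeError("Fail to create input splits.") from inner
    except RuntimeError as outer:
        return outer


for first in ("hr=1", "hr=2"):
    err = simulate_split_failure(first)
    print(first, any_cause_contains(err, EXPECTED[1]), satisfies_any_of(err, EXPECTED))
```

The pinned check passes only when `hr=2` happens to be read first, while the any-of check passes in both orders.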
[jira] [Commented] (FLINK-28305) SQL Client end-to-end test failed with ElasticsearchException
[ https://issues.apache.org/jira/browse/FLINK-28305?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17566109#comment-17566109 ] Huang Xingbo commented on FLINK-28305: -- https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=38058=logs=c88eea3b-64a0-564d-0031-9fdcd7b8abee=070ff179-953e-5bda-71fa-d6599415701c > SQL Client end-to-end test failed with ElasticsearchException > - > > Key: FLINK-28305 > URL: https://issues.apache.org/jira/browse/FLINK-28305 > Project: Flink > Issue Type: Bug > Components: Table SQL / Client >Affects Versions: 1.14.5 >Reporter: Huang Xingbo >Priority: Critical > Labels: test-stability > > {code:java} > 2022-06-29T19:14:39.0384925Z Jun 29 19:14:38 java.lang.RuntimeException: An > error occurred in ElasticsearchSink. > 2022-06-29T19:14:39.0385547Z Jun 29 19:14:38 at > org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.checkErrorAndRethrow(ElasticsearchSinkBase.java:427) > ~[?:?] > 2022-06-29T19:14:39.0386327Z Jun 29 19:14:38 at > org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.checkAsyncErrorsAndRequests(ElasticsearchSinkBase.java:432) > ~[?:?] > 2022-06-29T19:14:39.0387078Z Jun 29 19:14:38 at > org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.invoke(ElasticsearchSinkBase.java:329) > ~[?:?] 
> 2022-06-29T19:14:39.0388071Z Jun 29 19:14:38 at > org.apache.flink.table.runtime.operators.sink.SinkOperator.processElement(SinkOperator.java:65) > ~[flink-table_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] > 2022-06-29T19:14:39.0389189Z Jun 29 19:14:38 at > org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233) > ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] > 2022-06-29T19:14:39.0390354Z Jun 29 19:14:38 at > org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134) > ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] > 2022-06-29T19:14:39.0391515Z Jun 29 19:14:38 at > org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105) > ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] > 2022-06-29T19:14:39.0392712Z Jun 29 19:14:38 at > org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) > ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] > 2022-06-29T19:14:39.0393822Z Jun 29 19:14:38 at > org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:495) > ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] > 2022-06-29T19:14:39.0394861Z Jun 29 19:14:38 at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203) > ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] > 2022-06-29T19:14:39.0395885Z Jun 29 19:14:38 at > org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:806) > ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] > 2022-06-29T19:14:39.0396858Z Jun 29 19:14:38 at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:758) > ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] > 2022-06-29T19:14:39.0397824Z Jun 29 19:14:38 at > 
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958) > ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] > 2022-06-29T19:14:39.0398882Z Jun 29 19:14:38 at > org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937) > ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] > 2022-06-29T19:14:39.0400103Z Jun 29 19:14:38 at > org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766) > ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] > 2022-06-29T19:14:39.0401000Z Jun 29 19:14:38 at > org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) > ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] > 2022-06-29T19:14:39.0401575Z Jun 29 19:14:38 at > java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_332] > 2022-06-29T19:14:39.0402051Z Jun 29 19:14:38 Suppressed: > java.lang.RuntimeException: An error occurred in ElasticsearchSink. > 2022-06-29T19:14:39.0402682Z Jun 29 19:14:38 at > org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.checkErrorAndRethrow(ElasticsearchSinkBase.java:427) > ~[?:?] > 2022-06-29T19:14:39.0403421Z Jun 29 19:14:38 at > org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.close(ElasticsearchSinkBase.java:366) > ~[?:?] > 2022-06-29T19:14:39.0404395Z Jun 29 19:14:38 at > org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41) >
[jira] [Closed] (FLINK-28527) Fail to lateral join with UDTF from Table with timstamp column
[ https://issues.apache.org/jira/browse/FLINK-28527?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Huang Xingbo closed FLINK-28527. Resolution: Duplicate > Fail to lateral join with UDTF from Table with timstamp column > -- > > Key: FLINK-28527 > URL: https://issues.apache.org/jira/browse/FLINK-28527 > Project: Flink > Issue Type: Bug > Components: API / Python >Affects Versions: 1.15.1 >Reporter: Xuannan Su >Priority: Major > > The bug can be reproduced with the following test > {code:python} > def test_flink(self): > env = StreamExecutionEnvironment.get_execution_environment() > t_env = StreamTableEnvironment.create(env) > table = t_env.from_descriptor( > TableDescriptor.for_connector("filesystem") > .schema( > Schema.new_builder() > .column("name", DataTypes.STRING()) > .column("cost", DataTypes.INT()) > .column("distance", DataTypes.INT()) > .column("time", DataTypes.TIMESTAMP(3)) > .watermark("time", "`time` - INTERVAL '60' SECOND") > .build() > ) > .format("csv") > .option("path", "./input.csv") > .build() > ) > @udtf(result_types=DataTypes.INT()) > def table_func(row: Row): > return row.cost + row.distance > table = table.join_lateral(table_func.alias("cost_times_distance")) > table.execute().print() > {code} > It causes the following exception > {code:none} > E pyflink.util.exceptions.TableException: > org.apache.flink.table.api.TableException: Unsupported Python SqlFunction > CAST. 
> E at > org.apache.flink.table.planner.plan.nodes.exec.utils.CommonPythonUtil.createPythonFunctionInfo(CommonPythonUtil.java:146) > E at > org.apache.flink.table.planner.plan.nodes.exec.utils.CommonPythonUtil.createPythonFunctionInfo(CommonPythonUtil.java:429) > E at > org.apache.flink.table.planner.plan.nodes.exec.utils.CommonPythonUtil.createPythonFunctionInfo(CommonPythonUtil.java:135) > E at > org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecPythonCorrelate.extractPythonTableFunctionInfo(CommonExecPythonCorrelate.java:133) > E at > org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecPythonCorrelate.createPythonOneInputTransformation(CommonExecPythonCorrelate.java:106) > E at > org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecPythonCorrelate.translateToPlanInternal(CommonExecPythonCorrelate.java:95) > E at > org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:148) > E at > org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:249) > E at > org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.java:136) > E at > org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:148) > E at > org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:79) > E at > scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233) > E at scala.collection.Iterator.foreach(Iterator.scala:937) > E at scala.collection.Iterator.foreach$(Iterator.scala:937) > E at > scala.collection.AbstractIterator.foreach(Iterator.scala:1425) > E at scala.collection.IterableLike.foreach(IterableLike.scala:70) > E at scala.collection.IterableLike.foreach$(IterableLike.scala:69) > E at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > E at > scala.collection.TraversableLike.map(TraversableLike.scala:233) > E at > 
scala.collection.TraversableLike.map$(TraversableLike.scala:226) > E at > scala.collection.AbstractTraversable.map(Traversable.scala:104) > E at > org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:78) > E at > org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:181) > E at > org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1656) > E at > org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:828) > E at > org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1317) > E at > org.apache.flink.table.api.internal.TableImpl.execute(TableImpl.java:605) >
[jira] [Commented] (FLINK-28198) CassandraConnectorITCase fails with timeout
[ https://issues.apache.org/jira/browse/FLINK-28198?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17566106#comment-17566106 ] Huang Xingbo commented on FLINK-28198: -- https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=38057=logs=dbe51908-4958-5c8c-9557-e10952d4259d=55d11a16-067d-538d-76a3-4c096a3a8e24 > CassandraConnectorITCase fails with timeout > --- > > Key: FLINK-28198 > URL: https://issues.apache.org/jira/browse/FLINK-28198 > Project: Flink > Issue Type: Bug > Components: Connectors / Cassandra >Affects Versions: 1.16.0 >Reporter: Martijn Visser >Assignee: Etienne Chauchot >Priority: Critical > Labels: pull-request-available, test-stability > Fix For: 1.16.0 > > > {code:java} > Jun 22 07:57:37 [ERROR] > org.apache.flink.streaming.connectors.cassandra.CassandraConnectorITCase.testRaiseCassandraRequestsTimeouts > Time elapsed: 12.067 s <<< ERROR! > Jun 22 07:57:37 > com.datastax.driver.core.exceptions.OperationTimedOutException: > [/172.17.0.1:59915] Timed out waiting for server response > Jun 22 07:57:37 at > com.datastax.driver.core.exceptions.OperationTimedOutException.copy(OperationTimedOutException.java:43) > Jun 22 07:57:37 at > com.datastax.driver.core.exceptions.OperationTimedOutException.copy(OperationTimedOutException.java:25) > Jun 22 07:57:37 at > com.datastax.driver.core.DriverThrowables.propagateCause(DriverThrowables.java:35) > Jun 22 07:57:37 at > com.datastax.driver.core.DefaultResultSetFuture.getUninterruptibly(DefaultResultSetFuture.java:293) > Jun 22 07:57:37 at > com.datastax.driver.core.AbstractSession.execute(AbstractSession.java:58) > Jun 22 07:57:37 at > com.datastax.driver.core.AbstractSession.execute(AbstractSession.java:39) > {code} > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=37037=logs=4eda0b4a-bd0d-521a-0916-8285b9be9bb5=2ff6d5fa-53a6-53ac-bff7-fa524ea361a9=13736 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (FLINK-28529) ChangelogPeriodicMaterializationSwitchStateBackendITCase.testSwitchFromDisablingToEnablingInClaimMode failed with CheckpointException: Checkpoint expired before complet
[ https://issues.apache.org/jira/browse/FLINK-28529?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Huang Xingbo reassigned FLINK-28529: Assignee: Yanfei Lei > ChangelogPeriodicMaterializationSwitchStateBackendITCase.testSwitchFromDisablingToEnablingInClaimMode > failed with CheckpointException: Checkpoint expired before completing > --- > > Key: FLINK-28529 > URL: https://issues.apache.org/jira/browse/FLINK-28529 > Project: Flink > Issue Type: Bug > Components: Runtime / Checkpointing, Runtime / State Backends >Affects Versions: 1.16.0 >Reporter: Huang Xingbo >Assignee: Yanfei Lei >Priority: Critical > Labels: test-stability > > {code:java} > 2022-07-12T04:30:49.9912088Z Jul 12 04:30:49 [ERROR] > ChangelogPeriodicMaterializationSwitchStateBackendITCase.testSwitchFromDisablingToEnablingInClaimMode > Time elapsed: 617.048 s <<< ERROR! > 2022-07-12T04:30:49.9913108Z Jul 12 04:30:49 > java.util.concurrent.ExecutionException: > org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint expired > before completing. 
> 2022-07-12T04:30:49.9913880Z Jul 12 04:30:49 at > java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) > 2022-07-12T04:30:49.9914606Z Jul 12 04:30:49 at > java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908) > 2022-07-12T04:30:49.9915572Z Jul 12 04:30:49 at > org.apache.flink.test.checkpointing.ChangelogPeriodicMaterializationSwitchStateBackendITCase.testSwitchFromDisablingToEnablingInClaimMode(ChangelogPeriodicMaterializationSwitchStateBackendITCase.java:125) > 2022-07-12T04:30:49.9916483Z Jul 12 04:30:49 at > sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > 2022-07-12T04:30:49.9917377Z Jul 12 04:30:49 at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > 2022-07-12T04:30:49.9918121Z Jul 12 04:30:49 at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > 2022-07-12T04:30:49.9918788Z Jul 12 04:30:49 at > java.lang.reflect.Method.invoke(Method.java:498) > 2022-07-12T04:30:49.9919456Z Jul 12 04:30:49 at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) > 2022-07-12T04:30:49.9920193Z Jul 12 04:30:49 at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > 2022-07-12T04:30:49.9920923Z Jul 12 04:30:49 at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) > 2022-07-12T04:30:49.9921630Z Jul 12 04:30:49 at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > 2022-07-12T04:30:49.9922326Z Jul 12 04:30:49 at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > 2022-07-12T04:30:49.9923023Z Jul 12 04:30:49 at > org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) > 2022-07-12T04:30:49.9923708Z Jul 12 04:30:49 at > org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54) > 2022-07-12T04:30:49.9924449Z Jul 12 04:30:49 at > 
org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45) > 2022-07-12T04:30:49.9925124Z Jul 12 04:30:49 at > org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61) > 2022-07-12T04:30:49.9925912Z Jul 12 04:30:49 at > org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) > 2022-07-12T04:30:49.9926742Z Jul 12 04:30:49 at > org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100) > 2022-07-12T04:30:49.9928142Z Jul 12 04:30:49 at > org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366) > 2022-07-12T04:30:49.9928715Z Jul 12 04:30:49 at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103) > 2022-07-12T04:30:49.9929311Z Jul 12 04:30:49 at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63) > 2022-07-12T04:30:49.9929863Z Jul 12 04:30:49 at > org.junit.runners.ParentRunner$4.run(ParentRunner.java:331) > 2022-07-12T04:30:49.9930376Z Jul 12 04:30:49 at > org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79) > 2022-07-12T04:30:49.9930911Z Jul 12 04:30:49 at > org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329) > 2022-07-12T04:30:49.9931441Z Jul 12 04:30:49 at > org.junit.runners.ParentRunner.access$100(ParentRunner.java:66) > 2022-07-12T04:30:49.9931975Z Jul 12 04:30:49 at > org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293) > 2022-07-12T04:30:49.9932493Z Jul 12 04:30:49 at > org.junit.runners.ParentRunner.run(ParentRunner.java:413) > 2022-07-12T04:30:49.9932966Z Jul 12 04:30:49 at >
[jira] [Commented] (FLINK-28529) ChangelogPeriodicMaterializationSwitchStateBackendITCase.testSwitchFromDisablingToEnablingInClaimMode failed with CheckpointException: Checkpoint expired before comple
[ https://issues.apache.org/jira/browse/FLINK-28529?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17566105#comment-17566105 ] Huang Xingbo commented on FLINK-28529: -- Hi [~Yanfei Lei], could you help take a look? > ChangelogPeriodicMaterializationSwitchStateBackendITCase.testSwitchFromDisablingToEnablingInClaimMode > failed with CheckpointException: Checkpoint expired before completing > --- > > Key: FLINK-28529 > URL: https://issues.apache.org/jira/browse/FLINK-28529 > Project: Flink > Issue Type: Bug > Components: Runtime / Checkpointing, Runtime / State Backends >Affects Versions: 1.16.0 >Reporter: Huang Xingbo >Priority: Critical > Labels: test-stability > > {code:java} > 2022-07-12T04:30:49.9912088Z Jul 12 04:30:49 [ERROR] > ChangelogPeriodicMaterializationSwitchStateBackendITCase.testSwitchFromDisablingToEnablingInClaimMode > Time elapsed: 617.048 s <<< ERROR! > 2022-07-12T04:30:49.9913108Z Jul 12 04:30:49 > java.util.concurrent.ExecutionException: > org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint expired > before completing. 
[jira] [Assigned] (FLINK-28525) HiveDialectITCase.testTableWithSubDirsInPartitionDir failed with AssertJMultipleFailuresError
[ https://issues.apache.org/jira/browse/FLINK-28525?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Huang Xingbo reassigned FLINK-28525: Assignee: luoyuxia > HiveDialectITCase.testTableWithSubDirsInPartitionDir failed with > AssertJMultipleFailuresError > - > > Key: FLINK-28525 > URL: https://issues.apache.org/jira/browse/FLINK-28525 > Project: Flink > Issue Type: Bug > Components: Connectors / Hive >Affects Versions: 1.16.0 >Reporter: Huang Xingbo >Assignee: luoyuxia >Priority: Critical > Labels: test-stability > > {code:java} > 2022-07-12T04:20:27.9597185Z Jul 12 04:20:27 [ERROR] > org.apache.flink.connectors.hive.HiveDialectITCase.testTableWithSubDirsInPartitionDir > Time elapsed: 16.31 s <<< FAILURE! > 2022-07-12T04:20:27.9598238Z Jul 12 04:20:27 > org.assertj.core.error.AssertJMultipleFailuresError: > 2022-07-12T04:20:27.9598978Z Jul 12 04:20:27 > 2022-07-12T04:20:27.9599419Z Jul 12 04:20:27 Multiple Failures (1 failure) > 2022-07-12T04:20:27.9600269Z Jul 12 04:20:27 -- failure 1 -- > 2022-07-12T04:20:27.9601093Z Jul 12 04:20:27 [Any cause contains message 'Not > a file: file:/tmp/junit297605250552556431/hive_warehouse/fact_tz/ds=1/hr=2'] > 2022-07-12T04:20:27.9602679Z Jul 12 04:20:27 Expecting any element of: > 2022-07-12T04:20:27.9603145Z Jul 12 04:20:27 > [org.apache.flink.connectors.hive.FlinkHiveException: java.io.IOException: > Fail to create input splits. 
> 2022-07-12T04:20:27.9603766Z Jul 12 04:20:27 at > org.apache.flink.connectors.hive.HiveParallelismInference.infer(HiveParallelismInference.java:98) > 2022-07-12T04:20:27.9604403Z Jul 12 04:20:27 at > org.apache.flink.connectors.hive.HiveTableSource.getDataStream(HiveTableSource.java:153) > 2022-07-12T04:20:27.9605039Z Jul 12 04:20:27 at > org.apache.flink.connectors.hive.HiveTableSource$1.produceDataStream(HiveTableSource.java:117) > 2022-07-12T04:20:27.9605902Z Jul 12 04:20:27 ...(75 remaining lines not > displayed - this can be changed with > Assertions.setMaxStackTraceElementsDisplayed), > 2022-07-12T04:20:27.9606408Z Jul 12 04:20:27 java.io.IOException: Fail to > create input splits. > 2022-07-12T04:20:27.9606952Z Jul 12 04:20:27 at > org.apache.flink.connectors.hive.MRSplitsGetter.getHiveTablePartitionMRSplits(MRSplitsGetter.java:84) > 2022-07-12T04:20:27.960Z Jul 12 04:20:27 at > org.apache.flink.connectors.hive.HiveSourceFileEnumerator.createInputSplits(HiveSourceFileEnumerator.java:69) > 2022-07-12T04:20:27.9608438Z Jul 12 04:20:27 at > org.apache.flink.connectors.hive.HiveTableSource.lambda$getDataStream$1(HiveTableSource.java:158) > 2022-07-12T04:20:27.9609261Z Jul 12 04:20:27 ...(79 remaining lines not > displayed - this can be changed with > Assertions.setMaxStackTraceElementsDisplayed), > 2022-07-12T04:20:27.9609899Z Jul 12 04:20:27 > java.util.concurrent.ExecutionException: java.io.IOException: Not a file: > file:/tmp/junit297605250552556431/hive_warehouse/fact_tz/ds=1/hr=1 > 2022-07-12T04:20:27.9610503Z Jul 12 04:20:27 at > java.util.concurrent.FutureTask.report(FutureTask.java:122) > 2022-07-12T04:20:27.9610998Z Jul 12 04:20:27 at > java.util.concurrent.FutureTask.get(FutureTask.java:192) > 2022-07-12T04:20:27.9611574Z Jul 12 04:20:27 at > org.apache.flink.connectors.hive.MRSplitsGetter.getHiveTablePartitionMRSplits(MRSplitsGetter.java:79) > 2022-07-12T04:20:27.9612390Z Jul 12 04:20:27 ...(81 remaining lines not > displayed - this can be changed 
with > Assertions.setMaxStackTraceElementsDisplayed), > 2022-07-12T04:20:27.9612953Z Jul 12 04:20:27 java.io.IOException: Not a > file: file:/tmp/junit297605250552556431/hive_warehouse/fact_tz/ds=1/hr=1 > 2022-07-12T04:20:27.9613522Z Jul 12 04:20:27 at > org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:322) > 2022-07-12T04:20:27.9614119Z Jul 12 04:20:27 at > org.apache.flink.connectors.hive.MRSplitsGetter$MRSplitter.call(MRSplitsGetter.java:134) > 2022-07-12T04:20:27.9614731Z Jul 12 04:20:27 at > org.apache.flink.connectors.hive.MRSplitsGetter$MRSplitter.call(MRSplitsGetter.java:96) > 2022-07-12T04:20:27.9615535Z Jul 12 04:20:27 ...(4 remaining lines not > displayed - this can be changed with > Assertions.setMaxStackTraceElementsDisplayed)] > 2022-07-12T04:20:27.9616042Z Jul 12 04:20:27 to satisfy the given assertions > requirements but none did: > 2022-07-12T04:20:27.9616381Z Jul 12 04:20:27 > 2022-07-12T04:20:27.9616795Z Jul 12 04:20:27 > org.apache.flink.connectors.hive.FlinkHiveException: java.io.IOException: > Fail to create input splits. > 2022-07-12T04:20:27.9617407Z Jul 12 04:20:27 at > org.apache.flink.connectors.hive.HiveParallelismInference.infer(HiveParallelismInference.java:98) >
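In the log above, the assertion looks for the message naming `ds=1/hr=2`, while every cause that was actually thrown names `ds=1/hr=1`. One possible reading, assuming (as the test name suggests) that `hr=1` and `hr=2` are plain sub-directories inside the `ds=1` partition directory: a non-recursive listing rejects whichever sub-directory it meets first, and directory-listing order is not guaranteed, so the reported path may differ between runs. The sketch below models this with the Python standard library only; `list_input_files` and `not_a_file_error_matches` are hypothetical helpers, not Flink or Hadoop APIs.

```python
import os
import tempfile


def list_input_files(partition_dir):
    """Non-recursive listing: every direct child must be a regular file.

    Hypothetical stand-in for the "Not a file: ..." check seen in the trace.
    os.listdir() returns entries in arbitrary order, so which sub-directory
    is reported first is not guaranteed.
    """
    files = []
    for name in os.listdir(partition_dir):
        path = os.path.join(partition_dir, name)
        if os.path.isdir(path):
            raise IOError("Not a file: " + path)
        files.append(path)
    return files


def not_a_file_error_matches(message, partition_dir):
    """Order-independent check: accept the error for ANY sub-directory."""
    subdirs = [
        os.path.join(partition_dir, n)
        for n in os.listdir(partition_dir)
        if os.path.isdir(os.path.join(partition_dir, n))
    ]
    return any(message == "Not a file: " + d for d in subdirs)


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as root:
        part = os.path.join(root, "fact_tz", "ds=1")
        for hr in ("hr=1", "hr=2"):
            os.makedirs(os.path.join(part, hr))
        try:
            list_input_files(part)
        except IOError as e:
            # Passes whichever sub-directory happened to be listed first.
            print(not_a_file_error_matches(str(e), part))
```

Comparing against the set of offending sub-directories, rather than one fixed path, keeps such an assertion independent of listing order.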
[jira] [Created] (FLINK-28529) ChangelogPeriodicMaterializationSwitchStateBackendITCase.testSwitchFromDisablingToEnablingInClaimMode failed with CheckpointException: Checkpoint expired before completi
Huang Xingbo created FLINK-28529: Summary: ChangelogPeriodicMaterializationSwitchStateBackendITCase.testSwitchFromDisablingToEnablingInClaimMode failed with CheckpointException: Checkpoint expired before completing Key: FLINK-28529 URL: https://issues.apache.org/jira/browse/FLINK-28529 Project: Flink Issue Type: Bug Components: Runtime / Checkpointing, Runtime / State Backends Affects Versions: 1.16.0 Reporter: Huang Xingbo {code:java} 2022-07-12T04:30:49.9912088Z Jul 12 04:30:49 [ERROR] ChangelogPeriodicMaterializationSwitchStateBackendITCase.testSwitchFromDisablingToEnablingInClaimMode Time elapsed: 617.048 s <<< ERROR! 2022-07-12T04:30:49.9913108Z Jul 12 04:30:49 java.util.concurrent.ExecutionException: org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint expired before completing. 2022-07-12T04:30:49.9913880Z Jul 12 04:30:49at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) 2022-07-12T04:30:49.9914606Z Jul 12 04:30:49at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908) 2022-07-12T04:30:49.9915572Z Jul 12 04:30:49at org.apache.flink.test.checkpointing.ChangelogPeriodicMaterializationSwitchStateBackendITCase.testSwitchFromDisablingToEnablingInClaimMode(ChangelogPeriodicMaterializationSwitchStateBackendITCase.java:125) 2022-07-12T04:30:49.9916483Z Jul 12 04:30:49at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 2022-07-12T04:30:49.9917377Z Jul 12 04:30:49at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 2022-07-12T04:30:49.9918121Z Jul 12 04:30:49at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 2022-07-12T04:30:49.9918788Z Jul 12 04:30:49at java.lang.reflect.Method.invoke(Method.java:498) 2022-07-12T04:30:49.9919456Z Jul 12 04:30:49at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) 2022-07-12T04:30:49.9920193Z Jul 12 04:30:49at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) 2022-07-12T04:30:49.9920923Z Jul 12 04:30:49at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) 2022-07-12T04:30:49.9921630Z Jul 12 04:30:49at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) 2022-07-12T04:30:49.9922326Z Jul 12 04:30:49at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) 2022-07-12T04:30:49.9923023Z Jul 12 04:30:49at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) 2022-07-12T04:30:49.9923708Z Jul 12 04:30:49at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54) 2022-07-12T04:30:49.9924449Z Jul 12 04:30:49at org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45) 2022-07-12T04:30:49.9925124Z Jul 12 04:30:49at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61) 2022-07-12T04:30:49.9925912Z Jul 12 04:30:49at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) 2022-07-12T04:30:49.9926742Z Jul 12 04:30:49at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100) 2022-07-12T04:30:49.9928142Z Jul 12 04:30:49at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366) 2022-07-12T04:30:49.9928715Z Jul 12 04:30:49at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103) 2022-07-12T04:30:49.9929311Z Jul 12 04:30:49at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63) 2022-07-12T04:30:49.9929863Z Jul 12 04:30:49at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331) 2022-07-12T04:30:49.9930376Z Jul 12 04:30:49at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79) 2022-07-12T04:30:49.9930911Z Jul 12 04:30:49at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329) 2022-07-12T04:30:49.9931441Z Jul 12 04:30:49at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66) 
2022-07-12T04:30:49.9931975Z Jul 12 04:30:49at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293) 2022-07-12T04:30:49.9932493Z Jul 12 04:30:49at org.junit.runners.ParentRunner.run(ParentRunner.java:413) 2022-07-12T04:30:49.9932966Z Jul 12 04:30:49at org.junit.runners.Suite.runChild(Suite.java:128) 2022-07-12T04:30:49.9933427Z Jul 12 04:30:49at org.junit.runners.Suite.runChild(Suite.java:27) 2022-07-12T04:30:49.9933911Z Jul 12 04:30:49at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331) 2022-07-12T04:30:49.9934424Z Jul 12 04:30:49at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79) 2022-07-12T04:30:49.9934951Z Jul 12 04:30:49at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
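The failing run reports `Time elapsed: 617.048 s`. Flink's default checkpoint timeout is 10 minutes (600 s), so, assuming the test does not override it, the duration is consistent with a checkpoint that never collected all acknowledgements and was expired by the timeout; the `ExecutionException` then surfaces from the `CompletableFuture.get()` call in the test. The Python sketch below is only a toy model of that behaviour; `await_checkpoint` and `CheckpointExpiredError` are hypothetical names, not Flink APIs.

```python
import threading


class CheckpointExpiredError(Exception):
    """Hypothetical stand-in for a checkpoint that hit its timeout."""


def await_checkpoint(ack_delay_s, timeout_s):
    """Toy model of one pending checkpoint.

    The checkpoint completes once all (simulated) tasks acknowledge it after
    ack_delay_s seconds; if timeout_s elapses first, it is declared expired.
    """
    all_acked = threading.Event()
    acks = threading.Timer(ack_delay_s, all_acked.set)
    acks.daemon = True  # do not keep the interpreter alive for a late ack
    acks.start()
    if all_acked.wait(timeout_s):
        return "completed"
    acks.cancel()  # the late acknowledgement no longer matters
    raise CheckpointExpiredError("Checkpoint expired before completing.")


if __name__ == "__main__":
    print(await_checkpoint(ack_delay_s=0.01, timeout_s=1.0))
    try:
        await_checkpoint(ack_delay_s=2.0, timeout_s=0.05)
    except CheckpointExpiredError as e:
        print(e)
```

In Flink itself the corresponding setting is `execution.checkpointing.timeout`.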
[jira] [Updated] (FLINK-28528) Table.getSchema fails on table with watermark
[ https://issues.apache.org/jira/browse/FLINK-28528?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xuannan Su updated FLINK-28528: --- Description: The bug can be reproduced with the following test. The test can pass if we use the commented way to define the watermark.
{code:python}
def test_flink_2(self):
    env = StreamExecutionEnvironment.get_execution_environment()
    t_env = StreamTableEnvironment.create(env)
    table = t_env.from_descriptor(
        TableDescriptor.for_connector("filesystem")
        .schema(
            Schema.new_builder()
            .column("name", DataTypes.STRING())
            .column("cost", DataTypes.INT())
            .column("distance", DataTypes.INT())
            .column("time", DataTypes.TIMESTAMP(3))
            .watermark("time", expr.col("time") - expr.lit(60).seconds)
            # .watermark("time", "`time` - INTERVAL '60' SECOND")
            .build()
        )
        .format("csv")
        .option("path", "./input.csv")
        .build()
    )
    print(table.get_schema())
{code}
It causes the following exception
{code:none}
E pyflink.util.exceptions.TableException: org.apache.flink.table.api.TableException: Expression 'minus(time, 6)' is not string serializable. Currently, only expressions that originated from a SQL expression have a well-defined string representation.
E at org.apache.flink.table.expressions.ResolvedExpression.asSerializableString(ResolvedExpression.java:51) E at org.apache.flink.table.api.TableSchema.lambda$fromResolvedSchema$13(TableSchema.java:455) E at java.util.Collections$SingletonList.forEach(Collections.java:4824) E at org.apache.flink.table.api.TableSchema.fromResolvedSchema(TableSchema.java:451) E at org.apache.flink.table.api.Table.getSchema(Table.java:101) E at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) E at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) E at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) E at java.lang.reflect.Method.invoke(Method.java:498) E at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) E at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) E at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282) E at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) E at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79) E at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238) E at java.lang.Thread.run(Thread.java:748) {code} was: The bug can be reproduced with the following test. The test can pass if we use the commented way to define the watermark. 
{code:python}
def test_flink_2(self):
    env = StreamExecutionEnvironment.get_execution_environment()
    t_env = StreamTableEnvironment.create(env)
    table = t_env.from_descriptor(
        TableDescriptor.for_connector("filesystem")
        .schema(
            Schema.new_builder()
            .column("name", DataTypes.STRING())
            .column("cost", DataTypes.INT())
            .column("distance", DataTypes.INT())
            .column("time", DataTypes.TIMESTAMP(3))
            .watermark("time", expr.col("time") - expr.lit(60).seconds)
            # .watermark("time", "`time` - INTERVAL '60' SECOND")
            .build()
        )
        .format("csv")
        .option("path", "./input.csv")
        .build()
    )
    print(table.get_schema())
{code}
It causes the following exception
{code:none}
// Some comments here
E pyflink.util.exceptions.TableException: org.apache.flink.table.api.TableException: Expression 'minus(time, 6)' is not string serializable. Currently, only expressions that originated from a SQL expression have a well-defined string representation.
E at org.apache.flink.table.expressions.ResolvedExpression.asSerializableString(ResolvedExpression.java:51) E at org.apache.flink.table.api.TableSchema.lambda$fromResolvedSchema$13(TableSchema.java:455) E at java.util.Collections$SingletonList.forEach(Collections.java:4824) E at org.apache.flink.table.api.TableSchema.fromResolvedSchema(TableSchema.java:451) E at org.apache.flink.table.api.Table.getSchema(Table.java:101) E at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) E at
[jira] [Created] (FLINK-28528) Table.getSchema fails on table with watermark
Xuannan Su created FLINK-28528: -- Summary: Table.getSchema fails on table with watermark Key: FLINK-28528 URL: https://issues.apache.org/jira/browse/FLINK-28528 Project: Flink Issue Type: Bug Components: API / Python Affects Versions: 1.15.1 Reporter: Xuannan Su The bug can be reproduced with the following test. The test can pass if we use the commented way to define the watermark.
{code:python}
def test_flink_2(self):
    env = StreamExecutionEnvironment.get_execution_environment()
    t_env = StreamTableEnvironment.create(env)
    table = t_env.from_descriptor(
        TableDescriptor.for_connector("filesystem")
        .schema(
            Schema.new_builder()
            .column("name", DataTypes.STRING())
            .column("cost", DataTypes.INT())
            .column("distance", DataTypes.INT())
            .column("time", DataTypes.TIMESTAMP(3))
            .watermark("time", expr.col("time") - expr.lit(60).seconds)
            # .watermark("time", "`time` - INTERVAL '60' SECOND")
            .build()
        )
        .format("csv")
        .option("path", "./input.csv")
        .build()
    )
    print(table.get_schema())
{code}
It causes the following exception
{code:none}
E pyflink.util.exceptions.TableException: org.apache.flink.table.api.TableException: Expression 'minus(time, 6)' is not string serializable. Currently, only expressions that originated from a SQL expression have a well-defined string representation.
E at org.apache.flink.table.expressions.ResolvedExpression.asSerializableString(ResolvedExpression.java:51) E at org.apache.flink.table.api.TableSchema.lambda$fromResolvedSchema$13(TableSchema.java:455) E at java.util.Collections$SingletonList.forEach(Collections.java:4824) E at org.apache.flink.table.api.TableSchema.fromResolvedSchema(TableSchema.java:451) E at org.apache.flink.table.api.Table.getSchema(Table.java:101) E at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) E at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) E at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) E at java.lang.reflect.Method.invoke(Method.java:498) E at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) E at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) E at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282) E at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) E at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79) E at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238) E at java.lang.Thread.run(Thread.java:748) {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
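Per the exception text, only expressions that originated from a SQL expression have a well-defined string representation, and the trace shows `Table.getSchema` going through `TableSchema.fromResolvedSchema` to `ResolvedExpression.asSerializableString` for the watermark. The Python toy model below mirrors that rule and shows why the commented-out SQL-string watermark passes while the Table API expression fails. All names in it are hypothetical and are not the PyFlink or Flink API.

```python
class NotStringSerializable(Exception):
    pass


class SqlExpression:
    """Hypothetical: an expression parsed from SQL text remembers that text."""

    def __init__(self, sql):
        self.sql = sql

    def as_serializable_string(self):
        return self.sql


class CallExpression:
    """Hypothetical: an expression built programmatically, e.g. col - lit."""

    def __init__(self, name, *args):
        self.name = name
        self.args = args

    def __str__(self):
        return "%s(%s)" % (self.name, ", ".join(str(a) for a in self.args))

    def as_serializable_string(self):
        # No original SQL text to fall back on, so refuse (as in the trace).
        raise NotStringSerializable(
            "Expression '%s' is not string serializable." % self
        )


def legacy_watermark_spec(rowtime, watermark_expr):
    """Mimics a legacy schema view that stores the watermark as a string."""
    return rowtime, watermark_expr.as_serializable_string()


if __name__ == "__main__":
    print(legacy_watermark_spec("time", SqlExpression("`time` - INTERVAL '60' SECOND")))
    try:
        legacy_watermark_spec("time", CallExpression("minus", "time", 60))
    except NotStringSerializable as e:
        print(e)  # Expression 'minus(time, 60)' is not string serializable.
```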
[jira] [Assigned] (FLINK-28267) KafkaSourceLegacyITCase.testBrokerFailure hang on azure
[ https://issues.apache.org/jira/browse/FLINK-28267?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Qingsheng Ren reassigned FLINK-28267: - Assignee: Qingsheng Ren > KafkaSourceLegacyITCase.testBrokerFailure hang on azure > --- > > Key: FLINK-28267 > URL: https://issues.apache.org/jira/browse/FLINK-28267 > Project: Flink > Issue Type: Bug > Components: Connectors / Kafka >Affects Versions: 1.16.0 >Reporter: Zhu Zhu >Assignee: Qingsheng Ren >Priority: Critical > > "main" #1 prio=5 os_prio=0 tid=0x7f5ae000b800 nid=0x22c2 waiting on > condition [0x7f5ae8398000] >java.lang.Thread.State: WAITING (parking) > at sun.misc.Unsafe.park(Native Method) > - parking to wait for <0xa5565010> (a > java.util.concurrent.CompletableFuture$Signaller) > at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) > at > java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707) > at > java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323) > at > java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742) > at > java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908) > at org.apache.flink.test.util.TestUtils.tryExecute(TestUtils.java:67) > at > org.apache.flink.streaming.connectors.kafka.KafkaConsumerTestBase.runBrokerFailureTest(KafkaConsumerTestBase.java:1506) > at > org.apache.flink.connector.kafka.source.KafkaSourceLegacyITCase.testBrokerFailure(KafkaSourceLegacyITCase.java:94) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=37247=logs=aa18c3f6-13b8-5f58-86bb-c1cffb239496=502fb6c0-30a2-5e49-c5c2-a00fa3acb203=41526
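The thread dump shows `main` parked in `CompletableFuture.get()` (reached via `waitingGet`, i.e. the variant without a timeout) under `TestUtils.tryExecute`, so a job that never finishes shows up as a hang rather than as a test failure. As a hedged illustration in Python rather than Flink's test utilities (`wait_for_job` is a hypothetical helper), a bounded wait turns the same situation into an explicit error; the Java counterpart would be `CompletableFuture.get(long, TimeUnit)`.

```python
import concurrent.futures


def wait_for_job(job_future, timeout_s):
    """Wait for a job result, but fail loudly instead of parking forever."""
    try:
        return job_future.result(timeout=timeout_s)
    except concurrent.futures.TimeoutError:
        raise RuntimeError(
            "job did not finish within %.2f s" % timeout_s
        ) from None


if __name__ == "__main__":
    # Futures are created directly here only to simulate job outcomes.
    done = concurrent.futures.Future()
    done.set_result("FINISHED")
    print(wait_for_job(done, timeout_s=1.0))  # FINISHED

    stuck = concurrent.futures.Future()  # never completed, like the hung job
    try:
        wait_for_job(stuck, timeout_s=0.05)
    except RuntimeError as e:
        print(e)  # job did not finish within 0.05 s
```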
[jira] [Created] (FLINK-28527) Fail to lateral join with UDTF from Table with timestamp column
Xuannan Su created FLINK-28527: -- Summary: Fail to lateral join with UDTF from Table with timestamp column Key: FLINK-28527 URL: https://issues.apache.org/jira/browse/FLINK-28527 Project: Flink Issue Type: Bug Components: API / Python Affects Versions: 1.15.1 Reporter: Xuannan Su The bug can be reproduced with the following test
{code:python}
def test_flink(self):
    env = StreamExecutionEnvironment.get_execution_environment()
    t_env = StreamTableEnvironment.create(env)
    table = t_env.from_descriptor(
        TableDescriptor.for_connector("filesystem")
        .schema(
            Schema.new_builder()
            .column("name", DataTypes.STRING())
            .column("cost", DataTypes.INT())
            .column("distance", DataTypes.INT())
            .column("time", DataTypes.TIMESTAMP(3))
            .watermark("time", "`time` - INTERVAL '60' SECOND")
            .build()
        )
        .format("csv")
        .option("path", "./input.csv")
        .build()
    )

    @udtf(result_types=DataTypes.INT())
    def table_func(row: Row):
        return row.cost + row.distance

    table = table.join_lateral(table_func.alias("cost_times_distance"))
    table.execute().print()
{code}
It causes the following exception
{code:none}
E pyflink.util.exceptions.TableException: org.apache.flink.table.api.TableException: Unsupported Python SqlFunction CAST.
E at org.apache.flink.table.planner.plan.nodes.exec.utils.CommonPythonUtil.createPythonFunctionInfo(CommonPythonUtil.java:146) E at org.apache.flink.table.planner.plan.nodes.exec.utils.CommonPythonUtil.createPythonFunctionInfo(CommonPythonUtil.java:429) E at org.apache.flink.table.planner.plan.nodes.exec.utils.CommonPythonUtil.createPythonFunctionInfo(CommonPythonUtil.java:135) E at org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecPythonCorrelate.extractPythonTableFunctionInfo(CommonExecPythonCorrelate.java:133) E at org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecPythonCorrelate.createPythonOneInputTransformation(CommonExecPythonCorrelate.java:106) E at org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecPythonCorrelate.translateToPlanInternal(CommonExecPythonCorrelate.java:95) E at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:148) E at org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:249) E at org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.java:136) E at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:148) E at org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:79) E at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233) E at scala.collection.Iterator.foreach(Iterator.scala:937) E at scala.collection.Iterator.foreach$(Iterator.scala:937) E at scala.collection.AbstractIterator.foreach(Iterator.scala:1425) E at scala.collection.IterableLike.foreach(IterableLike.scala:70) E at scala.collection.IterableLike.foreach$(IterableLike.scala:69) E at scala.collection.AbstractIterable.foreach(Iterable.scala:54) E at scala.collection.TraversableLike.map(TraversableLike.scala:233) E at scala.collection.TraversableLike.map$(TraversableLike.scala:226) E at 
scala.collection.AbstractTraversable.map(Traversable.scala:104) E at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:78) E at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:181) E at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1656) E at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:828) E at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1317) E at org.apache.flink.table.api.internal.TableImpl.execute(TableImpl.java:605) E at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) E at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) E at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) E at
[jira] [Created] (FLINK-28526) Fail to lateral join with UDTF from Table with timestamp column
Xuannan Su created FLINK-28526: -- Summary: Fail to lateral join with UDTF from Table with timestamp column Key: FLINK-28526 URL: https://issues.apache.org/jira/browse/FLINK-28526 Project: Flink Issue Type: Bug Components: API / Python Affects Versions: 1.15.1 Reporter: Xuannan Su The bug can be reproduced with the following test
{code:python}
def test_flink(self):
    env = StreamExecutionEnvironment.get_execution_environment()
    t_env = StreamTableEnvironment.create(env)
    table = t_env.from_descriptor(
        TableDescriptor.for_connector("filesystem")
        .schema(
            Schema.new_builder()
            .column("name", DataTypes.STRING())
            .column("cost", DataTypes.INT())
            .column("distance", DataTypes.INT())
            .column("time", DataTypes.TIMESTAMP(3))
            .watermark("time", "`time` - INTERVAL '60' SECOND")
            .build()
        )
        .format("csv")
        .option("path", "./input.csv")
        .build()
    )

    @udtf(result_types=DataTypes.INT())
    def table_func(row: Row):
        return row.cost + row.distance

    table = table.join_lateral(table_func.alias("cost_times_distance"))
    table.execute().print()
{code}
It causes the following exception
{code:none}
E pyflink.util.exceptions.TableException: org.apache.flink.table.api.TableException: Unsupported Python SqlFunction CAST.
E   at org.apache.flink.table.planner.plan.nodes.exec.utils.CommonPythonUtil.createPythonFunctionInfo(CommonPythonUtil.java:146)
E   at org.apache.flink.table.planner.plan.nodes.exec.utils.CommonPythonUtil.createPythonFunctionInfo(CommonPythonUtil.java:429)
E   at org.apache.flink.table.planner.plan.nodes.exec.utils.CommonPythonUtil.createPythonFunctionInfo(CommonPythonUtil.java:135)
E   at org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecPythonCorrelate.extractPythonTableFunctionInfo(CommonExecPythonCorrelate.java:133)
E   at org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecPythonCorrelate.createPythonOneInputTransformation(CommonExecPythonCorrelate.java:106)
E   at org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecPythonCorrelate.translateToPlanInternal(CommonExecPythonCorrelate.java:95)
E   at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:148)
E   at org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:249)
E   at org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.java:136)
E   at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:148)
E   at org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:79)
E   at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
E   at scala.collection.Iterator.foreach(Iterator.scala:937)
E   at scala.collection.Iterator.foreach$(Iterator.scala:937)
E   at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
E   at scala.collection.IterableLike.foreach(IterableLike.scala:70)
E   at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
E   at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
E   at scala.collection.TraversableLike.map(TraversableLike.scala:233)
E   at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
E   at scala.collection.AbstractTraversable.map(Traversable.scala:104)
E   at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:78)
E   at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:181)
E   at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1656)
E   at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:828)
E   at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1317)
E   at org.apache.flink.table.api.internal.TableImpl.execute(TableImpl.java:605)
E   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
E   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
E   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
E   at
[jira] [Commented] (FLINK-28525) HiveDialectITCase.testTableWithSubDirsInPartitionDir failed with AssertJMultipleFailuresError
[ https://issues.apache.org/jira/browse/FLINK-28525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17566100#comment-17566100 ] Huang Xingbo commented on FLINK-28525: -- Hi [~luoyuxia] , could you help take a look? > HiveDialectITCase.testTableWithSubDirsInPartitionDir failed with > AssertJMultipleFailuresError > - > > Key: FLINK-28525 > URL: https://issues.apache.org/jira/browse/FLINK-28525 > Project: Flink > Issue Type: Bug > Components: Connectors / Hive >Affects Versions: 1.16.0 >Reporter: Huang Xingbo >Priority: Critical > Labels: test-stability
[jira] [Created] (FLINK-28525) HiveDialectITCase.testTableWithSubDirsInPartitionDir failed with AssertJMultipleFailuresError
Huang Xingbo created FLINK-28525:
Summary: HiveDialectITCase.testTableWithSubDirsInPartitionDir failed with AssertJMultipleFailuresError
Key: FLINK-28525
URL: https://issues.apache.org/jira/browse/FLINK-28525
Project: Flink
Issue Type: Bug
Components: Connectors / Hive
Affects Versions: 1.16.0
Reporter: Huang Xingbo

{code:java}
2022-07-12T04:20:27.9597185Z Jul 12 04:20:27 [ERROR] org.apache.flink.connectors.hive.HiveDialectITCase.testTableWithSubDirsInPartitionDir Time elapsed: 16.31 s <<< FAILURE!
2022-07-12T04:20:27.9598238Z Jul 12 04:20:27 org.assertj.core.error.AssertJMultipleFailuresError:
2022-07-12T04:20:27.9598978Z Jul 12 04:20:27
2022-07-12T04:20:27.9599419Z Jul 12 04:20:27 Multiple Failures (1 failure)
2022-07-12T04:20:27.9600269Z Jul 12 04:20:27 -- failure 1 --
2022-07-12T04:20:27.9601093Z Jul 12 04:20:27 [Any cause contains message 'Not a file: file:/tmp/junit297605250552556431/hive_warehouse/fact_tz/ds=1/hr=2']
2022-07-12T04:20:27.9602679Z Jul 12 04:20:27 Expecting any element of:
2022-07-12T04:20:27.9603145Z Jul 12 04:20:27 [org.apache.flink.connectors.hive.FlinkHiveException: java.io.IOException: Fail to create input splits.
2022-07-12T04:20:27.9603766Z Jul 12 04:20:27     at org.apache.flink.connectors.hive.HiveParallelismInference.infer(HiveParallelismInference.java:98)
2022-07-12T04:20:27.9604403Z Jul 12 04:20:27     at org.apache.flink.connectors.hive.HiveTableSource.getDataStream(HiveTableSource.java:153)
2022-07-12T04:20:27.9605039Z Jul 12 04:20:27     at org.apache.flink.connectors.hive.HiveTableSource$1.produceDataStream(HiveTableSource.java:117)
2022-07-12T04:20:27.9605902Z Jul 12 04:20:27     ...(75 remaining lines not displayed - this can be changed with Assertions.setMaxStackTraceElementsDisplayed),
2022-07-12T04:20:27.9606408Z Jul 12 04:20:27 java.io.IOException: Fail to create input splits.
2022-07-12T04:20:27.9606952Z Jul 12 04:20:27     at org.apache.flink.connectors.hive.MRSplitsGetter.getHiveTablePartitionMRSplits(MRSplitsGetter.java:84)
2022-07-12T04:20:27.960Z Jul 12 04:20:27     at org.apache.flink.connectors.hive.HiveSourceFileEnumerator.createInputSplits(HiveSourceFileEnumerator.java:69)
2022-07-12T04:20:27.9608438Z Jul 12 04:20:27     at org.apache.flink.connectors.hive.HiveTableSource.lambda$getDataStream$1(HiveTableSource.java:158)
2022-07-12T04:20:27.9609261Z Jul 12 04:20:27     ...(79 remaining lines not displayed - this can be changed with Assertions.setMaxStackTraceElementsDisplayed),
2022-07-12T04:20:27.9609899Z Jul 12 04:20:27 java.util.concurrent.ExecutionException: java.io.IOException: Not a file: file:/tmp/junit297605250552556431/hive_warehouse/fact_tz/ds=1/hr=1
2022-07-12T04:20:27.9610503Z Jul 12 04:20:27     at java.util.concurrent.FutureTask.report(FutureTask.java:122)
2022-07-12T04:20:27.9610998Z Jul 12 04:20:27     at java.util.concurrent.FutureTask.get(FutureTask.java:192)
2022-07-12T04:20:27.9611574Z Jul 12 04:20:27     at org.apache.flink.connectors.hive.MRSplitsGetter.getHiveTablePartitionMRSplits(MRSplitsGetter.java:79)
2022-07-12T04:20:27.9612390Z Jul 12 04:20:27     ...(81 remaining lines not displayed - this can be changed with Assertions.setMaxStackTraceElementsDisplayed),
2022-07-12T04:20:27.9612953Z Jul 12 04:20:27 java.io.IOException: Not a file: file:/tmp/junit297605250552556431/hive_warehouse/fact_tz/ds=1/hr=1
2022-07-12T04:20:27.9613522Z Jul 12 04:20:27     at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:322)
2022-07-12T04:20:27.9614119Z Jul 12 04:20:27     at org.apache.flink.connectors.hive.MRSplitsGetter$MRSplitter.call(MRSplitsGetter.java:134)
2022-07-12T04:20:27.9614731Z Jul 12 04:20:27     at org.apache.flink.connectors.hive.MRSplitsGetter$MRSplitter.call(MRSplitsGetter.java:96)
2022-07-12T04:20:27.9615535Z Jul 12 04:20:27     ...(4 remaining lines not displayed - this can be changed with Assertions.setMaxStackTraceElementsDisplayed)]
2022-07-12T04:20:27.9616042Z Jul 12 04:20:27 to satisfy the given assertions requirements but none did:
2022-07-12T04:20:27.9616381Z Jul 12 04:20:27
2022-07-12T04:20:27.9616795Z Jul 12 04:20:27 org.apache.flink.connectors.hive.FlinkHiveException: java.io.IOException: Fail to create input splits.
2022-07-12T04:20:27.9617407Z Jul 12 04:20:27     at org.apache.flink.connectors.hive.HiveParallelismInference.infer(HiveParallelismInference.java:98)
2022-07-12T04:20:27.9618034Z Jul 12 04:20:27     at org.apache.flink.connectors.hive.HiveTableSource.getDataStream(HiveTableSource.java:153)
2022-07-12T04:20:27.9618650Z Jul 12 04:20:27     at org.apache.flink.connectors.hive.HiveTableSource$1.produceDataStream(HiveTableSource.java:117)
2022-07-12T04:20:27.9619455Z Jul 12 04:20:27     ...(75 remaining lines not displayed - this can be changed with Assertions.setMaxStackTraceElementsDisplayed)
[jira] [Commented] (FLINK-28524) Translate "User-defined Sources & Sinks"
[ https://issues.apache.org/jira/browse/FLINK-28524?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17566098#comment-17566098 ] hunter commented on FLINK-28524: I am very interested in this part; please assign it to me. > Translate "User-defined Sources & Sinks" > > > Key: FLINK-28524 > URL: https://issues.apache.org/jira/browse/FLINK-28524 > Project: Flink > Issue Type: Improvement >Reporter: hunter >Priority: Minor > > The file is docs/content.zh/docs/dev/table/sourcesSinks.md > The link is > [用户自定义Sources|https://nightlies.apache.org/flink/flink-docs-release-1.15/zh/docs/dev/table/sourcessinks/]
[jira] [Created] (FLINK-28524) Translate "User-defined Sources & Sinks"
hunter created FLINK-28524: -- Summary: Translate "User-defined Sources & Sinks" Key: FLINK-28524 URL: https://issues.apache.org/jira/browse/FLINK-28524 Project: Flink Issue Type: Improvement Reporter: hunter The file is docs/content.zh/docs/dev/table/sourcesSinks.md The link is [用户自定义Sources|https://nightlies.apache.org/flink/flink-docs-release-1.15/zh/docs/dev/table/sourcessinks/]
[jira] [Commented] (FLINK-28267) KafkaSourceLegacyITCase.testBrokerFailure hang on azure
[ https://issues.apache.org/jira/browse/FLINK-28267?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17566096#comment-17566096 ]

Huang Xingbo commented on FLINK-28267:
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=38057=logs=aa18c3f6-13b8-5f58-86bb-c1cffb239496=502fb6c0-30a2-5e49-c5c2-a00fa3acb203

> KafkaSourceLegacyITCase.testBrokerFailure hang on azure
> ---
>
> Key: FLINK-28267
> URL: https://issues.apache.org/jira/browse/FLINK-28267
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kafka
> Affects Versions: 1.16.0
> Reporter: Zhu Zhu
> Priority: Critical
>
> "main" #1 prio=5 os_prio=0 tid=0x7f5ae000b800 nid=0x22c2 waiting on condition [0x7f5ae8398000]
>    java.lang.Thread.State: WAITING (parking)
>     at sun.misc.Unsafe.park(Native Method)
>     - parking to wait for <0xa5565010> (a java.util.concurrent.CompletableFuture$Signaller)
>     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>     at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
>     at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
>     at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
>     at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
>     at org.apache.flink.test.util.TestUtils.tryExecute(TestUtils.java:67)
>     at org.apache.flink.streaming.connectors.kafka.KafkaConsumerTestBase.runBrokerFailureTest(KafkaConsumerTestBase.java:1506)
>     at org.apache.flink.connector.kafka.source.KafkaSourceLegacyITCase.testBrokerFailure(KafkaSourceLegacyITCase.java:94)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=37247=logs=aa18c3f6-13b8-5f58-86bb-c1cffb239496=502fb6c0-30a2-5e49-c5c2-a00fa3acb203=41526
[jira] [Created] (FLINK-28523) LocalRecoveryITCase failed with AssertionError
Huang Xingbo created FLINK-28523:
Summary: LocalRecoveryITCase failed with AssertionError
Key: FLINK-28523
URL: https://issues.apache.org/jira/browse/FLINK-28523
Project: Flink
Issue Type: Bug
Components: Runtime / Checkpointing
Affects Versions: 1.16.0
Reporter: Huang Xingbo

{code:java}
2022-07-12T02:28:07.2243130Z Jul 12 02:28:07 [ERROR] LocalRecoveryITCase.executeTest Time elapsed: 29.154 s <<< FAILURE!
2022-07-12T02:28:07.2243760Z Jul 12 02:28:07 java.lang.AssertionError: Job completed with illegal application status: UNKNOWN.
2022-07-12T02:28:07.2244333Z Jul 12 02:28:07     at org.junit.Assert.fail(Assert.java:89)
2022-07-12T02:28:07.2245097Z Jul 12 02:28:07     at org.apache.flink.test.checkpointing.EventTimeWindowCheckpointingITCase.testSlidingTimeWindow(EventTimeWindowCheckpointingITCase.java:529)
2022-07-12T02:28:07.2245983Z Jul 12 02:28:07     at org.apache.flink.test.checkpointing.LocalRecoveryITCase.executeTest(LocalRecoveryITCase.java:84)
2022-07-12T02:28:07.2246802Z Jul 12 02:28:07     at org.apache.flink.test.checkpointing.LocalRecoveryITCase.executeTest(LocalRecoveryITCase.java:66)
2022-07-12T02:28:07.2247680Z Jul 12 02:28:07     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
2022-07-12T02:28:07.2248340Z Jul 12 02:28:07     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
2022-07-12T02:28:07.2249573Z Jul 12 02:28:07     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
2022-07-12T02:28:07.2250788Z Jul 12 02:28:07     at java.lang.reflect.Method.invoke(Method.java:498)
2022-07-12T02:28:07.2251836Z Jul 12 02:28:07     at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
2022-07-12T02:28:07.2253000Z Jul 12 02:28:07     at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
2022-07-12T02:28:07.2253985Z Jul 12 02:28:07     at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
2022-07-12T02:28:07.2254718Z Jul 12 02:28:07     at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
2022-07-12T02:28:07.2255385Z Jul 12 02:28:07     at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
2022-07-12T02:28:07.2256041Z Jul 12 02:28:07     at org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45)
2022-07-12T02:28:07.2256709Z Jul 12 02:28:07     at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
2022-07-12T02:28:07.2257481Z Jul 12 02:28:07     at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
2022-07-12T02:28:07.2258155Z Jul 12 02:28:07     at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
2022-07-12T02:28:07.2258837Z Jul 12 02:28:07     at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
2022-07-12T02:28:07.2259514Z Jul 12 02:28:07     at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
2022-07-12T02:28:07.2260373Z Jul 12 02:28:07     at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
2022-07-12T02:28:07.2261087Z Jul 12 02:28:07     at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
2022-07-12T02:28:07.2261755Z Jul 12 02:28:07     at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
2022-07-12T02:28:07.2262398Z Jul 12 02:28:07     at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
2022-07-12T02:28:07.2263045Z Jul 12 02:28:07     at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
2022-07-12T02:28:07.2263680Z Jul 12 02:28:07     at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
2022-07-12T02:28:07.2264302Z Jul 12 02:28:07     at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
2022-07-12T02:28:07.2265018Z Jul 12 02:28:07     at org.junit.runners.Suite.runChild(Suite.java:128)
2022-07-12T02:28:07.2265581Z Jul 12 02:28:07     at org.junit.runners.Suite.runChild(Suite.java:27)
2022-07-12T02:28:07.2266314Z Jul 12 02:28:07     at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
2022-07-12T02:28:07.2266941Z Jul 12 02:28:07     at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
2022-07-12T02:28:07.2267694Z Jul 12 02:28:07     at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
2022-07-12T02:28:07.2268376Z Jul 12 02:28:07     at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
2022-07-12T02:28:07.2269147Z Jul 12 02:28:07     at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
2022-07-12T02:28:07.2269785Z Jul 12 02:28:07     at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
2022-07-12T02:28:07.2270449Z Jul 12 02:28:07     at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
2022-07-12T02:28:07.2271039Z Jul
[GitHub] [flink] gaoyunhaii commented on a diff in pull request #19983: [FLINK-27878][datastream] Add Retry Support For Async I/O In DataStream API
gaoyunhaii commented on code in PR #19983:
URL: https://github.com/apache/flink/pull/19983#discussion_r919556536

## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java:
##
@@ -128,6 +156,15 @@ public AsyncWaitOperator(
         this.timeout = timeout;
+        this.asyncRetryStrategy = asyncRetryStrategy;
+
+        this.retryEnabled =
+                // construct from utility class
+                asyncRetryStrategy != NO_RETRY_STRATEGY
+                        // construct from api
+                        || asyncRetryStrategy.getRetryPredicate().resultPredicate().isPresent()

Review Comment:
Should it be `asyncRetryStrategy != NO_RETRY_STRATEGY && (asyncRetryStrategy.getRetryPredicate().resultPredicate().isPresent() || asyncRetryStrategy.getRetryPredicate().exceptionPredicate().isPresent())`? Otherwise, if a user constructs a new strategy for which both methods return empty, retry should not actually be enabled, right?

## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java:
##
@@ -154,6 +190,14 @@ public void setup(
             default:
                 throw new IllegalStateException("Unknown async mode: " + outputMode + '.');
         }
+        if (asyncRetryStrategy.getRetryPredicate().resultPredicate().isPresent()) {

Review Comment:
I would prefer not to leave retryResultPredicate / retryExceptionPredicate as null, to avoid possible errors.
It could be
```
retryResultPredicate = asyncRetryStrategy.getRetryPredicate().resultPredicate()
        .orElse(ignored -> false);
retryExceptionPredicate = asyncRetryStrategy.getRetryPredicate().exceptionPredicate()
        .orElse(ignored -> false);
```
Then we could also simplify the test to
```
boolean satisfy =
        (null != results && retryResultPredicate.test(results))
                || (null != error && retryExceptionPredicate.test(error));
```

## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java:
##
@@ -318,6 +389,143 @@ private void outputCompletedElement() {
         }
     }
+    /**
+     * Besides doRetry, the cleanup work will be done after retry fired, includes reset retry
+     * in-flight flag and remove retry handler from the incomplete retry handlers.
+     */
+    private void doRetryWithCleanup(RetryableResultHandlerDelegator resultHandlerDelegator)
+            throws Exception {
+        doRetry(resultHandlerDelegator);
+
+        // reset retryInFlight for next possible retry
+        resultHandlerDelegator.retryInFlight.set(false);
+        // remove from incomplete retry handlers
+        inFlightDelayRetryHandlers.remove(resultHandlerDelegator);
+    }
+
+    /** Increments number of attempts and fire the attempt. */
+    private void doRetry(RetryableResultHandlerDelegator resultHandlerDelegator) throws Exception {
+        // increment current attempt number
+        resultHandlerDelegator.currentAttempts++;
+
+        // fire a new attempt
+        userFunction.asyncInvoke(
+                resultHandlerDelegator.resultHandler.inputRecord.getValue(),
+                resultHandlerDelegator);
+    }
+
+    /** A delegator holds the real {@link ResultHandler} to handle retries. */
+    private class RetryableResultHandlerDelegator implements ResultFuture {
+
+        private final ResultHandler resultHandler;
+        private final ProcessingTimeService processingTimeService;
+
+        private ScheduledFuture delayedRetryTimer;
+
+        /** start from 1, when this entry created, the first attempt will happen. */
+        private int currentAttempts = 1;
+
+        /**
+         * A guard similar to ResultHandler.complete to prevent repeated complete calls from
+         * ill-written AsyncFunction. This flag indicates a retry is in-flight, will reject new
+         * retry request if true. And wil be reset to false after the retry fired.
+         */
+        private final AtomicBoolean retryInFlight = new AtomicBoolean(false);
+
+        public RetryableResultHandlerDelegator(
+                StreamRecord inputRecord,
+                ResultFuture resultFuture,
+                ProcessingTimeService processingTimeService) {
+            this.resultHandler = new ResultHandler(inputRecord, resultFuture);
+            this.processingTimeService = processingTimeService;
+        }
+
+        public void registerTimeout(long timeout) {
+            resultHandler.registerTimeout(processingTimeService, timeout);
+        }
+
+        @Override
+        public void complete(Collection results) {
+            Preconditions.checkNotNull(
+                    results, "Results must not be null, use empty collection to emit nothing");
+            if (retryEnabled
+                    &&
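The reviewer makes two suggestions. First, treat retry as enabled only when the strategy is not the `NO_RETRY_STRATEGY` sentinel *and* at least one predicate is present. Second, default a missing predicate to an always-false one, the `Optional.orElse(ignored -> false)` idea, rather than keeping `null`. The snippet below is a minimal Python analogue of both, not Flink's Java API. `RetryPredicate`, `RetryStrategy`, `NO_RETRY_STRATEGY`, `is_retry_enabled`, `should_retry` and `RetryGuard` are hypothetical stand-ins, and `None` models `Optional.empty()`. The lock-based `RetryGuard` approximates the `AtomicBoolean retryInFlight` flag described in the diff's Javadoc.

```python
import threading
from dataclasses import dataclass
from typing import Any, Callable, Collection, Optional

ResultPredicate = Callable[[Collection[Any]], bool]
ExceptionPredicate = Callable[[BaseException], bool]


@dataclass(frozen=True)
class RetryPredicate:
    # Hypothetical stand-in: None plays the role of Optional.empty().
    result_predicate: Optional[ResultPredicate] = None
    exception_predicate: Optional[ExceptionPredicate] = None


@dataclass(frozen=True)
class RetryStrategy:
    retry_predicate: RetryPredicate


# Sentinel compared by identity, like `!= NO_RETRY_STRATEGY` in the Java code.
NO_RETRY_STRATEGY = RetryStrategy(RetryPredicate())


def is_retry_enabled(strategy: RetryStrategy) -> bool:
    """Enabled only if not the sentinel AND at least one predicate is present."""
    p = strategy.retry_predicate
    return strategy is not NO_RETRY_STRATEGY and (
        p.result_predicate is not None or p.exception_predicate is not None
    )


def should_retry(
    strategy: RetryStrategy,
    results: Optional[Collection[Any]] = None,
    error: Optional[BaseException] = None,
) -> bool:
    """Missing predicates default to always-false, so no null checks on predicates."""
    p = strategy.retry_predicate
    result_pred = p.result_predicate or (lambda _results: False)
    exc_pred = p.exception_predicate or (lambda _error: False)
    return (results is not None and result_pred(results)) or (
        error is not None and exc_pred(error)
    )


class RetryGuard:
    """At most one retry in flight, like AtomicBoolean.compareAndSet(false, true)."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._in_flight = False

    def try_begin(self) -> bool:
        with self._lock:
            if self._in_flight:
                return False  # a retry is already pending: reject this request
            self._in_flight = True
            return True

    def reset(self) -> None:
        # Called once the retry has fired, like retryInFlight.set(false).
        with self._lock:
            self._in_flight = False
```

Once missing predicates default to always-false, the identity check against the sentinel is only a shortcut. The sentinel's own predicates are both empty, so the second clause already rejects it.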
[GitHub] [flink-kubernetes-operator] wangyang0918 merged pull request #306: [FLINK-28364] Add Python Job example using Kubernetes Operator
wangyang0918 merged PR #306: URL: https://github.com/apache/flink-kubernetes-operator/pull/306
[GitHub] [flink] luoyuxia commented on a diff in pull request #20227: [FLINK-28471]Translate hive_read_write.md into Chinese
luoyuxia commented on code in PR #20227:
URL: https://github.com/apache/flink/pull/20227#discussion_r919570445

## docs/content.zh/docs/connectors/table/hive/hive_read_write.md:
##
@@ -102,124 +91,119 @@ FROM hive_table
 ```
-**Notes**
+**注意**
-- Monitor strategy is to scan all directories/files currently in the location path. Many partitions may cause performance degradation.
-- Streaming reads for non-partitioned tables requires that each file be written atomically into the target directory.
-- Streaming reading for partitioned tables requires that each partition should be added atomically in the view of hive metastore. If not, new data added to an existing partition will be consumed.
-- Streaming reads do not support watermark grammar in Flink DDL. These tables cannot be used for window operators.
+- 监控策略是扫描当前位置路径中的所有目录/文件,分区太多可能导致性能下降。
+- 流读非分区表时要求每个文件应原子地写入目标目录。
+- 流读分区表要求每个分区应该被原子地添加进 Hive metastore 中。如果不是的话,只有添加到现有分区的新数据会被消费。
+- 流读 Hive 表不支持 Flink DDL 的 watermark 语法。这些表不能被用于窗口算子。
-### Reading Hive Views
+### 读取 Hive Views
-Flink is able to read from Hive defined views, but some limitations apply:
+Flink 能够读取 Hive 中已经定义的视图。但是也有一些限制:
-1) The Hive catalog must be set as the current catalog before you can query the view.
-This can be done by either `tableEnv.useCatalog(...)` in Table API or `USE CATALOG ...` in SQL Client.
+1) Hive catalog 必须设置成当前的 catalog 才能查询视图。在 Table API 中使用 `tableEnv.useCatalog(...)`,或者在 SQL 客户端使用 `USE CATALOG ...` 来改变当前 catalog。
-2) Hive and Flink SQL have different syntax, e.g. different reserved keywords and literals.
-Make sure the view’s query is compatible with Flink grammar.
+2) Hive 和 Flink SQL 的语法不同, 比如不同的关键字和字面值。确保查询视图与 Flink 语法兼容。

Review Comment:
```suggestion
2) Hive 和 Flink SQL 的语法不同, 比如不同的关键字和字面值。请确保对视图的查询语法与 Flink 语法兼容。
```

## docs/content.zh/docs/connectors/table/hive/hive_read_write.md:
##
@@ -25,74 +25,63 @@ specific language governing permissions and limitations
 under the License.
 -->
-# Hive Read & Write
+# Hive 读 & 写
-Using the `HiveCatalog`, Apache Flink can be used for unified `BATCH` and `STREAM` processing of Apache
-Hive Tables. This means Flink can be used as a more performant alternative to Hive’s batch engine,
-or to continuously read and write data into and out of Hive tables to power real-time data
-warehousing applications.
+通过使用 `HiveCatalog`,Apache Flink 可以对 Apache Hive 表做统一的批和流处理。这意味着 Flink 可以成为 Hive 批处理引擎的一个性能更好的选择,或者连续读写 Hive 表中的数据以支持实时数据仓库应用。
-## Reading
+## 读
-Flink supports reading data from Hive in both `BATCH` and `STREAMING` modes. When run as a `BATCH`
-application, Flink will execute its query over the state of the table at the point in time when the
-query is executed. `STREAMING` reads will continuously monitor the table and incrementally fetch
-new data as it is made available. Flink will read tables as bounded by default.
+Flink 支持以批和流两种模式从 Hive 表中读取数据。批读的时候,Flink 会基于执行查询时表的状态进行查询。流读时将持续监控表,并在表中新数据可用时进行增量获取,默认情况下,Flink 将以批模式读取数据。
-`STREAMING` reads support consuming both partitioned and non-partitioned tables.
-For partitioned tables, Flink will monitor the generation of new partitions, and read
-them incrementally when available. For non-partitioned tables, Flink will monitor the generation
-of new files in the folder and read new files incrementally.
+流读支持消费分区表和非分区表。对于分区表,Flink 会监控新分区的生成,并且在数据可用的情况下增量获取数据。对于非分区表,Flink 将监控文件夹中新文件的生成,并增量地读取新文件。
-Key
-Default
-Type
-Description
+键
+默认值
+类型
+描述
 streaming-source.enable
 false
 Boolean
-Enable streaming source or not. NOTES: Please make sure that each partition/file should be written atomically, otherwise the reader may get incomplete data.
+是否启动流读。注意:请确保每个分区/文件都应该原子地写入,否则读取不到完整的数据。
 streaming-source.partition.include
 all
 String
-Option to set the partitions to read, the supported option are `all` and `latest`, the `all` means read all partitions; the `latest` means read latest partition in order of 'streaming-source.partition.order', the `latest` only works` when the streaming hive source table used as temporal table. By default the option is `all`.
-Flink supports temporal join the latest hive partition by enabling 'streaming-source.enable' and setting 'streaming-source.partition.include' to 'latest', at the same time, user can assign the partition compare order and data update interval by configuring following partition-related options.
+选择读取的分区,可选项为 `all` 和 `latest`,`all` 读取所有分区;`latest` 读取按照 'streaming-source.partition.order' 排序后的最新分区,`latest` 仅在流模式的 Hive 源表作为时态表时有效。默认的选项是 `all`。在开启 'streaming-source.enable' 并设置 'streaming-source.partition.include' 为 'latest' 时,Flink 支持 temporal join 最新的 Hive 分区,同时,用户可以通过配置分区相关的选项来配置分区比较顺序和数据更新时间间隔。
[GitHub] [flink-kubernetes-operator] wangyang0918 commented on pull request #306: [FLINK-28364] Add Python Job example using Kubernetes Operator
wangyang0918 commented on PR #306: URL: https://github.com/apache/flink-kubernetes-operator/pull/306#issuecomment-1182672242 Merging now.
[GitHub] [flink] sunshineJK commented on pull request #20127: [FLINK-26270] Flink SQL write data to kafka by CSV format , whether d…
sunshineJK commented on PR #20127: URL: https://github.com/apache/flink/pull/20127#issuecomment-1182668416 Hello @MartijnVisser @afedulov @PatrickRen, do you have any suggestions on the latest revision? Thank you.
[GitHub] [flink] gaoyunhaii commented on pull request #20239: [FLINK-27703][core] Extend timeout and add logs for FileChannelManagerImplTest
gaoyunhaii commented on PR #20239: URL: https://github.com/apache/flink/pull/20239#issuecomment-1182650857 @flinkbot run azure
[GitHub] [flink] rkhachatryan commented on a diff in pull request #20217: [FLINK-27693][state] Support local recovery for non-materialized part
rkhachatryan commented on code in PR #20217: URL: https://github.com/apache/flink/pull/20217#discussion_r919544947 ## flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/ChangelogStateBackendLocalHandle.java: ## @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.state.changelog; + +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.SharedStateRegistry; +import org.apache.flink.runtime.state.StateHandleID; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.util.List; + +public class ChangelogStateBackendLocalHandle implements ChangelogStateBackendHandle { +private static final long serialVersionUID = 1L; +private static final Logger LOG = +LoggerFactory.getLogger(ChangelogStateBackendLocalHandle.class); +private final List localMaterialized; +private final List localNonMaterialized; +private final ChangelogStateBackendHandleImpl remoteHandle; + +public ChangelogStateBackendLocalHandle( +List localMaterialized, +List localNonMaterialized, +ChangelogStateBackendHandleImpl remoteHandle) { +this.localMaterialized = localMaterialized; +this.localNonMaterialized = localNonMaterialized; +this.remoteHandle = remoteHandle; +} + +@Override +public List getMaterializedStateHandles() { +return localMaterialized; +} + +@Override +public List getNonMaterializedStateHandles() { +return localNonMaterialized; +} + +@Override +public long getMaterializationID() { +return remoteHandle.getMaterializationID(); +} + +@Override +public ChangelogStateBackendHandle rebound(long checkpointId) { +return remoteHandle.rebound(checkpointId); +} + +public List getRemoteMaterializedStateHandles() { +return remoteHandle.getMaterializedStateHandles(); +} + +public List getRemoteNonMaterializedStateHandles() { +return remoteHandle.getNonMaterializedStateHandles(); +} + +@Override +public long getCheckpointId() { +return remoteHandle.getCheckpointId(); +} + +@Override +public void registerSharedStates(SharedStateRegistry stateRegistry, long checkpointID) { +remoteHandle.registerSharedStates(stateRegistry, checkpointID); +} + +@Override +public long 
getCheckpointedSize() { +return remoteHandle.getCheckpointedSize(); +} + +@Override +public KeyGroupRange getKeyGroupRange() { +return remoteHandle.getKeyGroupRange(); +} + +@Nullable +@Override +public KeyedStateHandle getIntersection(KeyGroupRange keyGroupRange) { +throw new UnsupportedOperationException( +"This is a local state handle for the TM side only."); +} + +@Override +public StateHandleID getStateHandleId() { +return remoteHandle.getStateHandleId(); +} + +@Override +public void discardState() throws Exception {} Review Comment: This case is probably not handled: what if the last checkpoint(s) were aborted and then the job terminated? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] rkhachatryan commented on a diff in pull request #20217: [FLINK-27693][state] Support local recovery for non-materialized part
rkhachatryan commented on code in PR #20217: URL: https://github.com/apache/flink/pull/20217#discussion_r919537669 ## flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/ChangelogStateBackendLocalHandle.java: ## @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.state.changelog; + +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.SharedStateRegistry; +import org.apache.flink.runtime.state.StateHandleID; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.util.List; + +public class ChangelogStateBackendLocalHandle implements ChangelogStateBackendHandle { +private static final long serialVersionUID = 1L; +private static final Logger LOG = +LoggerFactory.getLogger(ChangelogStateBackendLocalHandle.class); +private final List localMaterialized; +private final List localNonMaterialized; +private final ChangelogStateBackendHandleImpl remoteHandle; + +public ChangelogStateBackendLocalHandle( +List localMaterialized, +List localNonMaterialized, +ChangelogStateBackendHandleImpl remoteHandle) { +this.localMaterialized = localMaterialized; +this.localNonMaterialized = localNonMaterialized; +this.remoteHandle = remoteHandle; +} + +@Override +public List getMaterializedStateHandles() { +return localMaterialized; +} + +@Override +public List getNonMaterializedStateHandles() { +return localNonMaterialized; +} + +@Override +public long getMaterializationID() { +return remoteHandle.getMaterializationID(); +} + +@Override +public ChangelogStateBackendHandle rebound(long checkpointId) { +return remoteHandle.rebound(checkpointId); +} + +public List getRemoteMaterializedStateHandles() { +return remoteHandle.getMaterializedStateHandles(); +} + +public List getRemoteNonMaterializedStateHandles() { +return remoteHandle.getNonMaterializedStateHandles(); +} + +@Override +public long getCheckpointId() { +return remoteHandle.getCheckpointId(); +} + +@Override +public void registerSharedStates(SharedStateRegistry stateRegistry, long checkpointID) { +remoteHandle.registerSharedStates(stateRegistry, checkpointID); +} + +@Override +public long 
getCheckpointedSize() { +return remoteHandle.getCheckpointedSize(); +} + +@Override +public KeyGroupRange getKeyGroupRange() { +return remoteHandle.getKeyGroupRange(); +} + +@Nullable +@Override +public KeyedStateHandle getIntersection(KeyGroupRange keyGroupRange) { +throw new UnsupportedOperationException( +"This is a local state handle for the TM side only."); +} + +@Override +public StateHandleID getStateHandleId() { +return remoteHandle.getStateHandleId(); +} + +@Override +public void discardState() throws Exception {} Review Comment: :+1: IIUC, the discard logic is correct:
1. `(Changelog)TaskLocalStateStore` calls this no-op method.
2. `LocalStateRegistry` calls `discardState()` on the "low-level" handle (`FileStateHandle`) on checkpoint confirmation.
3. `TaskChangelogRegistry` calls `discardState()` on the "low-level" handle (`FileStateHandle`) when the upload is no longer needed.
4. As opposed to "normal" local state, local changelog state is *not* discarded on recovery (because of 1), but it will be discarded on the first checkpoint confirmation.

Can you confirm that? I found it tricky to understand, though, so it makes sense to document it somewhere (and also to cover it with tests), WDYT?
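The ownership split described in this comment can be modeled in a few lines. This is a hypothetical sketch, not Flink's API: `LowLevelHandle`, `CompositeLocalHandle`, `Registry` and `onCheckpointConfirmed` are illustrative stand-ins, and the sketch assumes a low-level handle may be shared by several checkpoints and becomes garbage once a newer checkpoint that no longer references it is confirmed.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical sketch (not Flink's API) of the discard ownership discussed in the review:
 * discarding the composite local handle is a no-op, while a registry discards low-level
 * handles once a newer checkpoint that no longer uses them is confirmed.
 */
public class LocalDiscardSketch {

    /** Stand-in for a low-level handle, e.g. one local changelog file. */
    static final class LowLevelHandle {
        final String path;
        boolean discarded;

        LowLevelHandle(String path) {
            this.path = path;
        }

        void discardState() {
            discarded = true; // a real handle would delete the local file here
        }
    }

    /** Stand-in for the composite local handle: it owns nothing, so discarding it does nothing. */
    static final class CompositeLocalHandle {
        final List<LowLevelHandle> parts;

        CompositeLocalHandle(List<LowLevelHandle> parts) {
            this.parts = parts;
        }

        void discardState() {
            // Intentionally empty: the registry decides when the parts are deleted.
        }
    }

    /** Stand-in registry: remembers the newest checkpoint that used each low-level handle. */
    static final class Registry {
        private final Map<LowLevelHandle, Long> lastUsedBy = new HashMap<>();

        void register(long checkpointId, List<LowLevelHandle> handles) {
            for (LowLevelHandle h : handles) {
                lastUsedBy.merge(h, checkpointId, Long::max);
            }
        }

        /** Discards every handle whose newest user is older than the confirmed checkpoint. */
        void onCheckpointConfirmed(long checkpointId) {
            Iterator<Map.Entry<LowLevelHandle, Long>> it = lastUsedBy.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<LowLevelHandle, Long> entry = it.next();
                if (entry.getValue() < checkpointId) {
                    entry.getKey().discardState();
                    it.remove();
                }
            }
        }
    }
}
```

In this model nothing is deleted when the composite handle is discarded or on recovery; a file only goes away when a later checkpoint that no longer references it is confirmed. It also shows why the aborted-checkpoint case raised earlier matters: if no further checkpoint is ever confirmed, `onCheckpointConfirmed` never runs and the local files stay behind.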
[GitHub] [flink] rkhachatryan commented on a diff in pull request #20217: [FLINK-27693][state] Support local recovery for non-materialized part
rkhachatryan commented on code in PR #20217: URL: https://github.com/apache/flink/pull/20217#discussion_r919521941 ## flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/DuplicatingOutputStreamWithPos.java: ## @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.changelog.fs; + +import org.apache.flink.runtime.state.DuplicatingCheckpointOutputStream; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.OutputStream; + +/** + * A DuplicatingOutputStreamWithPos is similar to {@link DuplicatingCheckpointOutputStream} which + * wraps a primary and a secondary OutputStream and duplicates all writes into both streams. The + * difference is that this stream does not delete the file when {@link #close()}. 
+ */ +class DuplicatingOutputStreamWithPos extends OutputStreamWithPos { +private static final Logger LOG = LoggerFactory.getLogger(DuplicatingOutputStreamWithPos.class); + +private final OutputStream secondaryStream; + +public DuplicatingOutputStreamWithPos(OutputStream primaryStream, OutputStream secondaryStream) +throws IOException { +super(primaryStream); +this.secondaryStream = Preconditions.checkNotNull(secondaryStream); +} + +@Override +public void write(int b) throws IOException { +outputStream.write(b); +try { +secondaryStream.write(b); +} catch (Exception ex) { +LOG.warn("Exception encountered during write to secondary stream"); +} Review Comment: If this class remains: I think such error handling might result in data loss on recovery. Ditto for the other methods.
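One way to address this concern is to stop treating a secondary-stream failure as a log-only event. The sketch below is hypothetical: `SecondaryTrackingOutputStream` and `isSecondaryComplete()` are illustrative names built only on `java.io`, not an existing Flink class. It still propagates primary failures, but records the first secondary failure and stops writing to the secondary afterwards, so a local copy that silently skipped bytes can be detected instead of being used for recovery.

```java
import java.io.IOException;
import java.io.OutputStream;

/**
 * Hypothetical sketch: duplicates writes into a primary and a secondary stream. Primary failures
 * propagate; the first secondary failure is recorded instead of only being logged, and no further
 * bytes go to the secondary, so a partially written local copy can be detected and ignored.
 */
public class SecondaryTrackingOutputStream extends OutputStream {

    /** An I/O action on the secondary stream. */
    @FunctionalInterface
    private interface IoAction {
        void run() throws IOException;
    }

    private final OutputStream primary;
    private final OutputStream secondary;
    private IOException secondaryFailure; // null while the secondary copy is still complete

    public SecondaryTrackingOutputStream(OutputStream primary, OutputStream secondary) {
        this.primary = primary;
        this.secondary = secondary;
    }

    @Override
    public void write(int b) throws IOException {
        primary.write(b);
        onSecondary(() -> secondary.write(b));
    }

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
        primary.write(b, off, len);
        onSecondary(() -> secondary.write(b, off, len));
    }

    @Override
    public void flush() throws IOException {
        primary.flush();
        onSecondary(secondary::flush);
    }

    @Override
    public void close() throws IOException {
        try {
            primary.close();
        } finally {
            try {
                secondary.close(); // always release the secondary, even after a failure
            } catch (IOException e) {
                if (secondaryFailure == null) {
                    secondaryFailure = e;
                }
            }
        }
    }

    /** True only if every byte written to the primary also reached the secondary. */
    public boolean isSecondaryComplete() {
        return secondaryFailure == null;
    }

    /** Runs an action on the secondary unless it already failed; records the first failure. */
    private void onSecondary(IoAction action) {
        if (secondaryFailure != null) {
            return; // writing after a failure would leave a silent gap in the local copy
        }
        try {
            action.run();
        } catch (IOException e) {
            secondaryFailure = e;
        }
    }
}
```

Under this assumption, the writer would report the local copy for local recovery only when `isSecondaryComplete()` returns true, and otherwise rely on the primary (remote) copy alone.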
[GitHub] [flink] rkhachatryan commented on a diff in pull request #20217: [FLINK-27693][state] Support local recovery for non-materialized part
rkhachatryan commented on code in PR #20217: URL: https://github.com/apache/flink/pull/20217#discussion_r919521374 ## flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/DuplicatingOutputStreamWithPos.java: ## @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.changelog.fs; + +import org.apache.flink.runtime.state.DuplicatingCheckpointOutputStream; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.OutputStream; + +/** + * A DuplicatingOutputStreamWithPos is similar to {@link DuplicatingCheckpointOutputStream} which + * wraps a primary and a secondary OutputStream and duplicates all writes into both streams. The + * difference is that this stream does not delete the file when {@link #close()}. + */ +class DuplicatingOutputStreamWithPos extends OutputStreamWithPos { Review Comment: Is this class necessary? Can't the existing `DuplicatingCheckpointOutputStream` be used instead? 
## flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/DuplicatingOutputStreamWithPos.java: ## @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.changelog.fs; + +import org.apache.flink.runtime.state.DuplicatingCheckpointOutputStream; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.OutputStream; + +/** + * A DuplicatingOutputStreamWithPos is similar to {@link DuplicatingCheckpointOutputStream} which + * wraps a primary and a secondary OutputStream and duplicates all writes into both streams. The + * difference is that this stream does not delete the file when {@link #close()}. 
+ */ +class DuplicatingOutputStreamWithPos extends OutputStreamWithPos { +private static final Logger LOG = LoggerFactory.getLogger(DuplicatingOutputStreamWithPos.class); + +private final OutputStream secondaryStream; + +public DuplicatingOutputStreamWithPos(OutputStream primaryStream, OutputStream secondaryStream) +throws IOException { +super(primaryStream); +this.secondaryStream = Preconditions.checkNotNull(secondaryStream); +} + +@Override +public void write(int b) throws IOException { +outputStream.write(b); +try { +secondaryStream.write(b); +} catch (Exception ex) { +LOG.warn("Exception encountered during write to secondary stream"); +} Review Comment: If this class remains: I think such error handling might result in data loss on recovery. Ditto for the other methods. ## flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeFsUploader.java: ## @@ -19,50 +19,30 @@ import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.changelog.fs.StateChangeUploadScheduler.UploadTask; -import org.apache.flink.core.fs.FSDataOutputStream; import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.FileSystem.WriteMode; import org.apache.flink.core.fs.Path; -import org.apache.flink.runtime.state.SnappyStreamCompressionDecorator; -import org.apache.flink.runtime.state.StreamCompressionDecorator; import org.apache.flink.runtime.state.StreamStateHandle; -import
[GitHub] [flink-kubernetes-operator] tedhtchang commented on pull request #313: [FLINK-27852][docs] OLM installation and development documentation
tedhtchang commented on PR #313: URL: https://github.com/apache/flink-kubernetes-operator/pull/313#issuecomment-1182614400 @mbalassi Done! I also added some information about the webhook certificate.
[GitHub] [flink] flinkbot commented on pull request #20258: [FLINK-28522][tests][JUnit5 migration] flink-sequence-file
flinkbot commented on PR #20258: URL: https://github.com/apache/flink/pull/20258#issuecomment-1182611408 ## CI report: * 455af0ef4247c0f4fc6751c16fc483750ad703de UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
[jira] [Updated] (FLINK-28522) [JUnit5 Migration] Module: flink-sequence-file
[ https://issues.apache.org/jira/browse/FLINK-28522?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-28522: --- Labels: pull-request-available (was: ) > [JUnit5 Migration] Module: flink-sequence-file > -- > > Key: FLINK-28522 > URL: https://issues.apache.org/jira/browse/FLINK-28522 > Project: Flink > Issue Type: Sub-task > Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) >Reporter: Ryan Skraba >Priority: Major > Labels: pull-request-available > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] RyanSkraba opened a new pull request, #20258: [FLINK-28522][tests][JUnit5 migration] flink-sequence-file
RyanSkraba opened a new pull request, #20258: URL: https://github.com/apache/flink/pull/20258 ## What is the purpose of the change Update the `flink-formats/flink-sequence-file` module to AssertJ and JUnit 5, following the [JUnit 5 Migration Guide](https://docs.google.com/document/d/1514Wa_aNB9bJUen4xm5uiuXOooOJTtXqS_Jqk9KJitU/edit) ## Brief change log * Removed dependencies on JUnit 4, JUnit 5 Assertions and Hamcrest where possible. ## Verifying this change This change is a code cleanup without any test coverage. I verified that the same 4 tests ran before and after the change. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? no
[jira] [Created] (FLINK-28522) [JUnit5 Migration] Module: flink-sequence-file
Ryan Skraba created FLINK-28522: --- Summary: [JUnit5 Migration] Module: flink-sequence-file Key: FLINK-28522 URL: https://issues.apache.org/jira/browse/FLINK-28522 Project: Flink Issue Type: Sub-task Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) Reporter: Ryan Skraba
[jira] [Commented] (FLINK-28522) [JUnit5 Migration] Module: flink-sequence-file
[ https://issues.apache.org/jira/browse/FLINK-28522?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17566073#comment-17566073 ] Ryan Skraba commented on FLINK-28522: - Can this be assigned to me, please? > [JUnit5 Migration] Module: flink-sequence-file > -- > > Key: FLINK-28522 > URL: https://issues.apache.org/jira/browse/FLINK-28522 > Project: Flink > Issue Type: Sub-task > Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) >Reporter: Ryan Skraba >Priority: Major >
[jira] [Updated] (FLINK-27918) SavepointITCase.testStopWithSavepointFailingAfterSnapshotCreation failed with Expected RuntimeException after snapshot creation
[ https://issues.apache.org/jira/browse/FLINK-27918?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Flink Jira Bot updated FLINK-27918: --- Labels: stale-critical test-stability (was: test-stability) I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help the community manage its development. I see this issue has been marked as Critical but is unassigned and neither itself nor its Sub-Tasks have been updated for 14 days. I have gone ahead and marked it "stale-critical". If this ticket is critical, please either assign yourself or give an update. Afterwards, please remove the label or in 7 days the issue will be deprioritized. > SavepointITCase.testStopWithSavepointFailingAfterSnapshotCreation failed with > Expected RuntimeException after snapshot creation > --- > > Key: FLINK-27918 > URL: https://issues.apache.org/jira/browse/FLINK-27918 > Project: Flink > Issue Type: Bug > Components: Runtime / Checkpointing >Affects Versions: 1.16.0 >Reporter: Huang Xingbo >Priority: Critical > Labels: stale-critical, test-stability > > {code:java} > 2022-06-06T03:13:54.0165829Z Jun 06 03:13:54 [ERROR] > org.apache.flink.test.checkpointing.SavepointITCase.testStopWithSavepointFailingAfterSnapshotCreation > Time elapsed: 0.242 s <<< ERROR! > 2022-06-06T03:13:54.0167256Z Jun 06 03:13:54 > java.util.concurrent.ExecutionException: > org.apache.flink.util.FlinkException: Stop with savepoint operation could not > be completed.
> 2022-06-06T03:13:54.0173825Z Jun 06 03:13:54 at > java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) > 2022-06-06T03:13:54.0174662Z Jun 06 03:13:54 at > java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908) > 2022-06-06T03:13:54.0180645Z Jun 06 03:13:54 at > org.apache.flink.test.checkpointing.SavepointITCase.testStopWithFailingSourceInOnePipeline(SavepointITCase.java:1175) > 2022-06-06T03:13:54.0181702Z Jun 06 03:13:54 at > org.apache.flink.test.checkpointing.SavepointITCase.testStopWithSavepointFailingAfterSnapshotCreation(SavepointITCase.java:1020) > 2022-06-06T03:13:54.0182472Z Jun 06 03:13:54 at > sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > 2022-06-06T03:13:54.0184012Z Jun 06 03:13:54 at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > 2022-06-06T03:13:54.0185109Z Jun 06 03:13:54 at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > 2022-06-06T03:13:54.0185907Z Jun 06 03:13:54 at > java.lang.reflect.Method.invoke(Method.java:498) > 2022-06-06T03:13:54.0187049Z Jun 06 03:13:54 at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) > 2022-06-06T03:13:54.0188081Z Jun 06 03:13:54 at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > 2022-06-06T03:13:54.0189241Z Jun 06 03:13:54 at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) > 2022-06-06T03:13:54.0190002Z Jun 06 03:13:54 at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > 2022-06-06T03:13:54.0190704Z Jun 06 03:13:54 at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > 2022-06-06T03:13:54.0191400Z Jun 06 03:13:54 at > org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54) > 2022-06-06T03:13:54.0192051Z Jun 06 03:13:54 at > 
org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54) > 2022-06-06T03:13:54.0192883Z Jun 06 03:13:54 at > org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45) > 2022-06-06T03:13:54.0194154Z Jun 06 03:13:54 at > org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61) > 2022-06-06T03:13:54.0195096Z Jun 06 03:13:54 at > org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) > 2022-06-06T03:13:54.0196226Z Jun 06 03:13:54 at > org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100) > 2022-06-06T03:13:54.0197088Z Jun 06 03:13:54 at > org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366) > 2022-06-06T03:13:54.0198037Z Jun 06 03:13:54 at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103) > 2022-06-06T03:13:54.0199186Z Jun 06 03:13:54 at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63) > 2022-06-06T03:13:54.0200147Z Jun 06 03:13:54 at > org.junit.runners.ParentRunner$4.run(ParentRunner.java:331) > 2022-06-06T03:13:54.0200956Z Jun 06 03:13:54 at > org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79) > 2022-06-06T03:13:54.0201666Z Jun 06 03:13:54 at >
[jira] [Updated] (FLINK-26907) RMQSourceITCase failed on azure due to ContainerLaunchException: Container startup failed
[ https://issues.apache.org/jira/browse/FLINK-26907?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Flink Jira Bot updated FLINK-26907: --- Labels: auto-deprioritized-major test-stability (was: stale-major test-stability) Priority: Minor (was: Major) This issue was labeled "stale-major" 7 days ago and has not received any updates so it is being deprioritized. If this ticket is actually Major, please raise the priority and ask a committer to assign you the issue or revive the public discussion. > RMQSourceITCase failed on azure due to ContainerLaunchException: Container > startup failed > - > > Key: FLINK-26907 > URL: https://issues.apache.org/jira/browse/FLINK-26907 > Project: Flink > Issue Type: Bug > Components: Build System, Connectors/ RabbitMQ >Affects Versions: 1.15.0 >Reporter: Yun Gao >Priority: Minor > Labels: auto-deprioritized-major, test-stability > > {code:java} > 2-03-28T09:41:01.3374229Z Mar 28 09:41:01 [ERROR] Tests run: 1, Failures: 0, > Errors: 1, Skipped: 0, Time elapsed: 91.834 s <<< FAILURE! - in > org.apache.flink.streaming.connectors.rabbitmq.RMQSourceITCase > 2022-03-28T09:41:01.3375722Z Mar 28 09:41:01 [ERROR] > org.apache.flink.streaming.connectors.rabbitmq.RMQSourceITCase Time elapsed: > 91.834 s <<< ERROR! 
> 2022-03-28T09:41:01.3376743Z Mar 28 09:41:01 > org.testcontainers.containers.ContainerLaunchException: Container startup > failed > 2022-03-28T09:41:01.3378470Z Mar 28 09:41:01 at > org.testcontainers.containers.GenericContainer.doStart(GenericContainer.java:336) > 2022-03-28T09:41:01.3379355Z Mar 28 09:41:01 at > org.testcontainers.containers.GenericContainer.start(GenericContainer.java:317) > 2022-03-28T09:41:01.3380117Z Mar 28 09:41:01 at > org.testcontainers.containers.GenericContainer.starting(GenericContainer.java:1066) > 2022-03-28T09:41:01.3381076Z Mar 28 09:41:01 at > org.testcontainers.containers.FailureDetectingExternalResource$1.evaluate(FailureDetectingExternalResource.java:29) > 2022-03-28T09:41:01.3382198Z Mar 28 09:41:01 at > org.junit.rules.RunRules.evaluate(RunRules.java:20) > 2022-03-28T09:41:01.3383575Z Mar 28 09:41:01 at > org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) > 2022-03-28T09:41:01.3384717Z Mar 28 09:41:01 at > org.junit.runners.ParentRunner.run(ParentRunner.java:413) > 2022-03-28T09:41:01.3385671Z Mar 28 09:41:01 at > org.junit.runner.JUnitCore.run(JUnitCore.java:137) > 2022-03-28T09:41:01.3386611Z Mar 28 09:41:01 at > org.junit.runner.JUnitCore.run(JUnitCore.java:115) > 2022-03-28T09:41:01.3387691Z Mar 28 09:41:01 at > org.junit.vintage.engine.execution.RunnerExecutor.execute(RunnerExecutor.java:42) > 2022-03-28T09:41:01.3388981Z Mar 28 09:41:01 at > org.junit.vintage.engine.VintageTestEngine.executeAllChildren(VintageTestEngine.java:80) > 2022-03-28T09:41:01.3390250Z Mar 28 09:41:01 at > org.junit.vintage.engine.VintageTestEngine.execute(VintageTestEngine.java:72) > 2022-03-28T09:41:01.3391619Z Mar 28 09:41:01 at > org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:107) > 2022-03-28T09:41:01.3393437Z Mar 28 09:41:01 at > org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:88) > 2022-03-28T09:41:01.3394826Z Mar 28 
09:41:01 at > org.junit.platform.launcher.core.EngineExecutionOrchestrator.lambda$execute$0(EngineExecutionOrchestrator.java:54) > 2022-03-28T09:41:01.3396333Z Mar 28 09:41:01 at > org.junit.platform.launcher.core.EngineExecutionOrchestrator.withInterceptedStreams(EngineExecutionOrchestrator.java:67) > 2022-03-28T09:41:01.3397800Z Mar 28 09:41:01 at > org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:52) > 2022-03-28T09:41:01.3399166Z Mar 28 09:41:01 at > org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:114) > 2022-03-28T09:41:01.3400315Z Mar 28 09:41:01 at > org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:86) > 2022-03-28T09:41:01.3401636Z Mar 28 09:41:01 at > org.junit.platform.launcher.core.DefaultLauncherSession$DelegatingLauncher.execute(DefaultLauncherSession.java:86) > 2022-03-28T09:41:01.3403403Z Mar 28 09:41:01 at > org.junit.platform.launcher.core.SessionPerRequestLauncher.execute(SessionPerRequestLauncher.java:53) > 2022-03-28T09:41:01.3404823Z Mar 28 09:41:01 at > org.apache.maven.surefire.junitplatform.JUnitPlatformProvider.execute(JUnitPlatformProvider.java:188) > 2022-03-28T09:41:01.3406517Z Mar 28 09:41:01 at > org.apache.maven.surefire.junitplatform.JUnitPlatformProvider.invokeAllTests(JUnitPlatformProvider.java:154) > 2022-03-28T09:41:01.3407936Z Mar 28 09:41:01 at >
[jira] [Updated] (FLINK-26772) Application and Job Mode does not wait for job cleanup during shutdown
[ https://issues.apache.org/jira/browse/FLINK-26772?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Flink Jira Bot updated FLINK-26772: --- Labels: auto-deprioritized-critical pull-request-available (was: pull-request-available stale-critical) Priority: Major (was: Critical) This issue was labeled "stale-critical" 7 days ago and has not received any updates so it is being deprioritized. If this ticket is actually Critical, please raise the priority and ask a committer to assign you the issue or revive the public discussion. > Application and Job Mode does not wait for job cleanup during shutdown > -- > > Key: FLINK-26772 > URL: https://issues.apache.org/jira/browse/FLINK-26772 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Affects Versions: 1.15.0 >Reporter: Mika Naylor >Priority: Major > Labels: auto-deprioritized-critical, pull-request-available > Attachments: FLINK-26772.standalone-job.log, > testcluster-599f4d476b-bghw5_log.txt > > > We discovered that in Application Mode, when the application has completed, > the cluster is shut down even if there are ongoing resource cleanup events > happening in the background. For example, if HA cleanup fails, further > retries are not attempted because the cluster is shut down before this can happen. > > We should also add a flag for the shutdown that will prevent further jobs > from being submitted.
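The behavior this ticket asks for can be sketched with `CompletableFuture`: shutdown first stops accepting new jobs, then completes only after every in-flight cleanup has finished. The class and method names (`ShutdownAfterCleanupSketch`, `trySubmitJob`, `registerCleanup`, `shutDown`) are hypothetical and not Flink's dispatcher API; the sketch assumes each cleanup future already encapsulates its own retries (for example, retrying HA cleanup), so waiting for it gives those retries a chance to run.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/**
 * Hypothetical sketch: a cluster component that refuses new jobs once shutdown starts and whose
 * shutdown future completes only after all registered cleanup futures have completed.
 */
public class ShutdownAfterCleanupSketch {
    private final List<CompletableFuture<Void>> pendingCleanups = new ArrayList<>();
    private boolean shuttingDown; // the "no further submissions" flag proposed in the ticket

    /** Accepts a job only while the cluster is not shutting down. */
    public synchronized boolean trySubmitJob(String jobName) {
        // A real implementation would enqueue the job here when accepted.
        return !shuttingDown;
    }

    /** Tracks a background cleanup (e.g. HA data removal, including its own retries). */
    public synchronized void registerCleanup(CompletableFuture<Void> cleanup) {
        pendingCleanups.add(cleanup);
    }

    /** Starts shutdown; the returned future completes once every tracked cleanup is done. */
    public synchronized CompletableFuture<Void> shutDown() {
        shuttingDown = true;
        return CompletableFuture.allOf(pendingCleanups.toArray(new CompletableFuture<?>[0]));
    }
}
```

`CompletableFuture.allOf` completes exceptionally if any cleanup ultimately fails, so the caller can still decide whether to exit with an error; the point of the design is that the process is not terminated while cleanups are still running.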
[jira] [Updated] (FLINK-15123) remove uniqueKeys from FlinkStatistic in blink planner
[ https://issues.apache.org/jira/browse/FLINK-15123?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Flink Jira Bot updated FLINK-15123: --- Labels: stale-major starter (was: starter) I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help the community manage its development. I see this issue has been marked as Major but is unassigned and neither itself nor its Sub-Tasks have been updated for 60 days. I have gone ahead and added a "stale-major" label to the issue. If this ticket is Major, please either assign yourself or give an update. Afterwards, please remove the label or in 7 days the issue will be deprioritized. > remove uniqueKeys from FlinkStatistic in blink planner > --- > > Key: FLINK-15123 > URL: https://issues.apache.org/jira/browse/FLINK-15123 > Project: Flink > Issue Type: Technical Debt > Components: Table SQL / Planner >Reporter: godfrey he >Priority: Major > Labels: stale-major, starter > Attachments: b_5.txt > > > {{uniqueKeys}} is a kind of constraint, so it's unreasonable for {{uniqueKeys}} > to be a kind of statistic. We should therefore remove {{uniqueKeys}} from > {{FlinkStatistic}} in the blink planner. Some temporary solutions (e.g. > {{RichTableSourceQueryOperation}}) should also be resolved after primaryKey > is introduced in {{TableSchema}}.