[jira] [Closed] (FLINK-28265) Inconsistency in Kubernetes HA service: broken state handle
[ https://issues.apache.org/jira/browse/FLINK-28265?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Yang Wang closed FLINK-28265.
-----------------------------
    Fix Version/s: 1.16.0
                   1.15.3
       Resolution: Fixed

Fixed via:
* master (1.16): aae96d0c9d1768c396bdf2ee6510677fbb8f317a
* release-1.15: fcff4903c8d625edb8f4e33b03bfded52c3deba8

> Inconsistency in Kubernetes HA service: broken state handle
> -----------------------------------------------------------
>
>                 Key: FLINK-28265
>                 URL: https://issues.apache.org/jira/browse/FLINK-28265
>             Project: Flink
>          Issue Type: Bug
>          Components: Deployment / Kubernetes, Runtime / Coordination
>    Affects Versions: 1.14.4
>            Reporter: Robert Metzger
>            Assignee: Yang Wang
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.16.0, 1.15.3
>
>      Attachments: flink_checkpoint_issue.txt
>
>
> I have a JobManager, which at some point failed to acknowledge a checkpoint:
> {code}
> Error while processing AcknowledgeCheckpoint message
> org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete the pending checkpoint 193393. Failure reason: Failure to finalize checkpoint.
>     at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:1255)
>     at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:1100)
>     at org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$acknowledgeCheckpoint$1(ExecutionGraphHandler.java:89)
>     at org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$processCheckpointCoordinatorMessage$3(ExecutionGraphHandler.java:119)
>     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
>     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
>     at java.base/java.lang.Thread.run(Unknown Source)
> Caused by: org.apache.flink.runtime.persistence.StateHandleStore$AlreadyExistException: checkpointID-0193393 already exists in ConfigMap cm--jobmanager-leader
>     at org.apache.flink.kubernetes.highavailability.KubernetesStateHandleStore.getKeyAlreadyExistException(KubernetesStateHandleStore.java:534)
>     at org.apache.flink.kubernetes.highavailability.KubernetesStateHandleStore.lambda$addAndLock$0(KubernetesStateHandleStore.java:155)
>     at org.apache.flink.kubernetes.kubeclient.Fabric8FlinkKubeClient.lambda$attemptCheckAndUpdateConfigMap$11(Fabric8FlinkKubeClient.java:316)
>     at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown Source)
>     ... 3 common frames omitted
> {code}
> After this failure, the JobManager creates subsequent checkpoints successfully.
> Upon failover, it tries to recover this checkpoint (0193393), but fails to do so because of:
> {code}
> Caused by: org.apache.flink.util.FlinkException: Could not retrieve checkpoint 193393 from state handle under checkpointID-0193393. This indicates that the retrieved state handle is broken. Try cleaning the state handle store ...
> Caused by: java.io.FileNotFoundException: No such file or directory: s3://xxx/flink-ha/xxx/completedCheckpoint72e30229420c
> {code}
> I'm running Flink 1.14.4.
> Note: This issue was first discussed here: https://github.com/apache/flink/pull/15832#pullrequestreview-1005973050

-- This message was sent by Atlassian Jira (v8.20.10#820010)
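The fix referenced above makes `KubernetesStateHandleStore#addEntry` idempotent by comparing the stored contents when the key already exists. The following is a minimal sketch of that idea only — class and method names are illustrative, not Flink's actual API: a retried add with byte-identical contents succeeds, while genuinely conflicting contents still fail.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch, not Flink's KubernetesStateHandleStore. The point of the
// fix: a retried add with byte-identical contents is treated as success, so a
// lost response to an earlier add no longer poisons checkpoint recovery.
public class IdempotentStore {

    private final Map<String, byte[]> configMap = new HashMap<>();

    /** Returns true if the entry was added, or an identical entry already existed. */
    public synchronized boolean addEntry(String key, byte[] serializedHandle) {
        byte[] existing = configMap.get(key);
        if (existing == null) {
            configMap.put(key, serializedHandle.clone());
            return true;
        }
        if (Arrays.equals(existing, serializedHandle)) {
            return true; // retry of the same add: no-op success
        }
        // Genuinely conflicting contents still fail, as before the fix.
        throw new IllegalStateException(key + " already exists with different contents");
    }

    public static void main(String[] args) {
        IdempotentStore store = new IdempotentStore();
        store.addEntry("checkpointID-0193393", new byte[] {1, 2, 3});
        // Before the fix this retry threw AlreadyExistException; now it succeeds.
        store.addEntry("checkpointID-0193393", new byte[] {1, 2, 3});
        System.out.println("retried add accepted");
    }
}
```

This is why the backport PR below is titled "Make KubernetesStateHandleStore#addEntry idempotent": the retry no longer leaves a ConfigMap entry whose backing file was never written.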
[GitHub] [flink] wangyang0918 merged pull request #20673: [BP-1.15][FLINK-28265][k8s] Make KubernetesStateHandleStore#addEntry idempotent
wangyang0918 merged PR #20673:
URL: https://github.com/apache/flink/pull/20673

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] wangyang0918 commented on pull request #20673: [BP-1.15][FLINK-28265][k8s] Make KubernetesStateHandleStore#addEntry idempotent
wangyang0918 commented on PR #20673:
URL: https://github.com/apache/flink/pull/20673#issuecomment-1229137353

Merging now.
[GitHub] [flink] wangyang0918 commented on pull request #20684: [FLINK-29105][k8s] Fix the unstable k8s test 'testAddAndLockShouldNotThrowAlreadyExistExceptionWithSameContents'
wangyang0918 commented on PR #20684:
URL: https://github.com/apache/flink/pull/20684#issuecomment-1229137197

@flinkbot run azure
[jira] [Updated] (FLINK-29124) Redundant checkNotNull in cli Package
[ https://issues.apache.org/jira/browse/FLINK-29124?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

ivan Qing updated FLINK-29124:
------------------------------
    Description: 
Redundant NotNull checks in function CliFrontend#*getEffectiveConfiguration*():
{code:java}
final ExecutionConfigAccessor executionParameters =
        ExecutionConfigAccessor.fromProgramOptions(
                checkNotNull(programOptions), checkNotNull(jobJars));
{code}
while *ExecutionConfigAccessor.fromProgramOptions* already performs the notNull check:
{code:java}
public static ExecutionConfigAccessor fromProgramOptions(
        final ProgramOptions options, final List<URL> jobJars) {
    checkNotNull(options);
    checkNotNull(jobJars);
    ...
}
{code}
I have checked the other *ExecutionConfigAccessor.fromProgramOptions()* usages, and none of them wrap the arguments in checkNotNull at the invocation site.

    was:
Redundant NotNull checks in function CliFrontend#*getEffectiveConfiguration*():
{code:java}
final ExecutionConfigAccessor executionParameters =
        ExecutionConfigAccessor.fromProgramOptions(
                checkNotNull(programOptions), checkNotNull(jobJars));
{code}
while *ExecutionConfigAccessor.fromProgramOptions* already performs the notNull check:
{code:java}
public static ExecutionConfigAccessor fromProgramOptions(
        final ProgramOptions options, final List<URL> jobJars) {
    checkNotNull(options);
    checkNotNull(jobJars);
    ...
}
{code}
I have checked the other *ExecutionConfigAccessor.fromProgramOptions()* usages, and none of them wrap the arguments in checkNotNull at the invocation site.

> Redundant checkNotNull in cli Package
> -------------------------------------
>
>                 Key: FLINK-29124
>                 URL: https://issues.apache.org/jira/browse/FLINK-29124
>             Project: Flink
>          Issue Type: Improvement
>          Components: Command Line Client
>            Reporter: ivan Qing
>            Priority: Not a Priority
>
> Redundant NotNull checks in function CliFrontend#*getEffectiveConfiguration*():
> {code:java}
> final ExecutionConfigAccessor executionParameters =
>         ExecutionConfigAccessor.fromProgramOptions(
>                 checkNotNull(programOptions), checkNotNull(jobJars));
> {code}
> while *ExecutionConfigAccessor.fromProgramOptions* already performs the notNull check:
> {code:java}
> public static ExecutionConfigAccessor fromProgramOptions(
>         final ProgramOptions options, final List<URL> jobJars) {
>     checkNotNull(options);
>     checkNotNull(jobJars);
>     ...
> }
> {code}
> I have checked the other *ExecutionConfigAccessor.fromProgramOptions()* usages, and none of them wrap the arguments in checkNotNull at the invocation site.
[jira] [Created] (FLINK-29124) Redundant checkNotNull in cli Package
ivan Qing created FLINK-29124:
---------------------------------

             Summary: Redundant checkNotNull in cli Package
                 Key: FLINK-29124
                 URL: https://issues.apache.org/jira/browse/FLINK-29124
             Project: Flink
          Issue Type: Improvement
          Components: Command Line Client
            Reporter: ivan Qing

Redundant NotNull checks in function CliFrontend#*getEffectiveConfiguration*():
{code:java}
final ExecutionConfigAccessor executionParameters =
        ExecutionConfigAccessor.fromProgramOptions(
                checkNotNull(programOptions), checkNotNull(jobJars));
{code}
while *ExecutionConfigAccessor.fromProgramOptions* already performs the notNull check:
{code:java}
public static ExecutionConfigAccessor fromProgramOptions(
        final ProgramOptions options, final List<URL> jobJars) {
    checkNotNull(options);
    checkNotNull(jobJars);
    ...
}
{code}
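Since the issue turns on which side performs the null check, a self-contained sketch makes the redundancy concrete. The classes below are hypothetical stand-ins (the real `ExecutionConfigAccessor` lives in flink-clients, and `checkNotNull` mirrors Flink's `Preconditions#checkNotNull`): because the factory validates its own arguments, wrapping them again at the call site adds nothing.

```java
import java.util.List;

// Hypothetical stand-ins: a Preconditions-style checkNotNull and a factory
// that, like ExecutionConfigAccessor.fromProgramOptions, null-checks its
// own inputs.
public class CheckNotNullDemo {

    public static <T> T checkNotNull(T reference) {
        if (reference == null) {
            throw new NullPointerException();
        }
        return reference;
    }

    public static class Accessor {
        public final Object options;
        public final List<String> jobJars;

        private Accessor(Object options, List<String> jobJars) {
            this.options = options;
            this.jobJars = jobJars;
        }

        // The factory already validates, so callers need no extra checkNotNull.
        public static Accessor fromProgramOptions(Object options, List<String> jobJars) {
            checkNotNull(options);
            checkNotNull(jobJars);
            return new Accessor(options, jobJars);
        }
    }

    public static void main(String[] args) {
        // Caller-side wrapping is redundant: passing the arguments directly
        // still fails fast on null, inside the factory itself.
        Accessor ok = Accessor.fromProgramOptions(new Object(), List.of("job.jar"));
        System.out.println(ok.jobJars);

        try {
            Accessor.fromProgramOptions(null, List.of("job.jar"));
        } catch (NullPointerException expected) {
            System.out.println("null rejected by the factory itself");
        }
    }
}
```

The proposed change is therefore purely a call-site cleanup; the null-safety guarantee is unchanged.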
[GitHub] [flink] SwimSweet commented on a diff in pull request #20671: [FLINK-28915] Flink Native k8s mode jar localtion support s3 schema.
SwimSweet commented on code in PR #20671:
URL: https://github.com/apache/flink/pull/20671#discussion_r956250930

## docs/content.zh/docs/deployment/resource-providers/native_kubernetes.md:

@@ -97,14 +97,34 @@ COPY /path/of/my-flink-job.jar $FLINK_HOME/usrlib/my-flink-job.jar
 After creating and publishing the Docker image under `custom-image-name`, you can start an Application cluster with the following command:
 ```bash
+# Local Schema
 $ ./bin/flink run-application \
 --target kubernetes-application \
 -Dkubernetes.cluster-id=my-first-application-cluster \
 -Dkubernetes.container.image=custom-image-name \
 local:///opt/flink/usrlib/my-flink-job.jar
+
+# FileSystem
+$ ./bin/flink run-application \
+--target kubernetes-application \
+-Dkubernetes.cluster-id=my-first-application-cluster \
+-Dkubernetes.container.image=custom-image-name \
+s3://my-bucket/my-flink-job.jar
+
+# Http/Https Schema
+$ ./bin/flink run-application \
+--target kubernetes-application \
+-Dkubernetes.cluster-id=my-first-application-cluster \
+-Dkubernetes.container.image=custom-image-name \
+http://ip:port/my-flink-job.jar
 ```
+{{< hint info >}}
+Now, the jar package supports reading from the [flink filesystem]({{< ref "docs/deployment/filesystems/overview" >}}#docker-hub-flink-images) or Http/Https in Application Mode.
+The jar package will be downloaded from filesystem to
+[kubernetes.user.artifacts.base.dir]({{< ref "docs/deployment/config" >}}#kubernetes-user-artifacts-base-dir)/[kubernetes.namespace]({{< ref "docs/deployment/config" >}}#kubernetes-namespace)/[kubernetes.cluster-id]({{< ref "docs/deployment/config" >}}#kubernetes-cluster-id) path in image.
+{{< /hint >}}
+Note `local` schema is also supported.

Review Comment:
Exactly as you said, `/opt/job.jar` is equivalent to `file:///opt/job.jar`. This makes it difficult for users to distinguish whether the jar package is on the client machine or in the container. BTW, are there plans to support submitting user jars from the client? If `kubernetes.rest-service.exposed.type` is set to ClusterIP, it is difficult for clients outside k8s to connect to the jobmanager deployed in k8s. Please take a look @wangyang0918

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
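The scheme dispatch being discussed can be summarized with a small, hypothetical helper (not Flink code; the option names mirror `kubernetes.user.artifacts.base.dir`, `kubernetes.namespace`, and `kubernetes.cluster-id` from the doc hunk above): a `local:` URI points inside the image, `file`/`s3`/`http(s)` URIs are fetched into the artifact directory, and — as the review comment notes — a bare path is indistinguishable from `file://`.

```java
import java.net.URI;
import java.nio.file.Paths;

// Hypothetical helper illustrating the scheme dispatch under review.
// "local" means the jar already lives in the container image; other schemes
// are downloaded into <base-dir>/<namespace>/<cluster-id> before job start.
public class UserJarResolver {

    public static String resolve(String jarUri, String baseDir, String namespace, String clusterId) {
        URI uri = URI.create(jarUri);
        // A bare path like /opt/job.jar has no scheme and defaults to file://,
        // which is exactly the ambiguity raised in the review comment.
        String scheme = uri.getScheme() == null ? "file" : uri.getScheme();
        switch (scheme) {
            case "local":
                return uri.getPath(); // already present inside the image
            case "file":
            case "s3":
            case "http":
            case "https":
                String fileName = Paths.get(uri.getPath()).getFileName().toString();
                return String.join("/", baseDir, namespace, clusterId, fileName);
            default:
                throw new IllegalArgumentException("unsupported scheme: " + scheme);
        }
    }

    public static void main(String[] args) {
        System.out.println(resolve("local:///opt/flink/usrlib/job.jar", "/opt/flink/artifacts", "default", "c1"));
        System.out.println(resolve("s3://my-bucket/my-flink-job.jar", "/opt/flink/artifacts", "default", "c1"));
    }
}
```

With this framing, the reviewer's point is that `/opt/job.jar` and `file:///opt/job.jar` take the same branch, so the URI alone cannot tell the user whether the jar sits on the client machine or in the container.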
[GitHub] [flink] liuzhuang2017 commented on a diff in pull request #20660: [FLINK-28998] Translate'Fine-Grained Resource Management' page into Chinese
liuzhuang2017 commented on code in PR #20660: URL: https://github.com/apache/flink/pull/20660#discussion_r952051966 ## docs/content.zh/docs/deployment/finegrained_resource.md: ## @@ -23,97 +23,86 @@ specific language governing permissions and limitations under the License. --> -# Fine-Grained Resource Management -Apache Flink works hard to auto-derive sensible default resource requirements for all applications out of the box. -For users who wish to fine-tune their resource consumption, based on knowledge of their specific scenarios, Flink offers **fine-grained resource management**. +# 细粒度资源管理 -This page describes the fine-grained resource management’s usage, applicable scenarios, and how it works. +Apache Flink 努力为所有开箱即用的应用程序自动派生合理的默认资源需求。对于希望更精细化调节资源消耗的用户,基于对特定场景的了解,Flink 提供了**细粒度资源管理**。 +本文介绍了细粒度资源管理的使用、适用场景以及工作原理。 {{< hint warning >}} -**Note:** This feature is currently an MVP (“minimum viable product”) feature and only available to [DataStream API]({{< ref "docs/dev/datastream/overview" >}}). +**注意:** 本特性是当前的一个最简化产品(版本)的特性,它支持只在 DataStream API [DataStream API]({{< ref "docs/dev/datastream/overview" >}})中使用。 {{< /hint >}} -## Applicable Scenarios +## 使用场景 -Typical scenarios that potentially benefit from fine-grained resource management are where: +可能从细粒度资源管理中受益的典型场景包括: - - Tasks have significantly different parallelisms. +- Tasks 有显著不同的并行度的场景。 - - The resource needed for an entire pipeline is too much to fit into a single slot/task manager. +- 整个pipeline需要的资源太大了以致不能和单一的slot/task Manager相适应的场景。 - - Batch jobs where resources needed for tasks of different stages are significantly different +- 批处理作业,其中不同stage的task所需的资源差异明显。 -An in-depth discussion on why fine-grained resource management can improve resource efficiency for the above scenarios is presented in [How it improves resource efficiency](#how-it-improves-resource-efficiency). 
+在它如何提高资源利用率 [How it improves resource efficiency](#how-it-improves-resource-efficiency)部分将会对细粒度资源管理为什么在以上使用场景中可以提高资源利用率作深入的讨论。 -## How it works -As described in [Flink Architecture]({{< ref "docs/concepts/flink-architecture" >}}#anatomy-of-a-flink-cluster), -task execution resources in a TaskManager are split into many slots. -The slot is the basic unit of both resource scheduling and resource requirement in Flink's runtime. +## 工作原理 +如Flink架构 [Flink Architecture]({{< ref "docs/concepts/flink-architecture" >}}#anatomy-of-a-flink-cluster)中描述, +在一个TaskManager中,执行task时使用的资源被分割成许多个slots. +slot既是资源调度的基本单元,又是flink运行时申请资源的基本单元. {{< img src="/fig/dynamic_slot_alloc.png" class="center" >}} -With fine-grained resource management, the slots requests contain specific resource profiles, which users can specify. -Flink will respect those user-specified resource requirements and dynamically cut an exactly-matched slot out of the TaskManager’s available -resources. As shown above, there is a requirement for a slot with 0.25 Core and 1GB memory, and Flink allocates *Slot 1* for it. +对于细粒度资源管理,Slot资源请求包含用户指定的特定的资源配置文件。Flink会遵从这些用户指定的资源请求并从TaskManager可用的资源中动态地切分出精确匹配的slot。如上图所示,对于一个slot,0.25core和1G内存的资源申请,Flink为它分配一个slot。 {{< hint info >}} -Previously in Flink, the resource requirement only contained the required slots, without fine-grained resource -profiles, namely **coarse-grained resource management**. The TaskManager had a fixed number of identical slots to fulfill those requirements. +Flink之前的资源申请只包含必须指定的slots,但没有精细化的资源配置,这是一种粗粒度的资源管理.在这种管理方式下, TaskManager以固定相同的slots的个数的方式来满足资源需求。 {{< /hint >}} -For the resource requirement without a specified resource profile, Flink will automatically decide a resource profile. 
-Currently, the resource profile of it is calculated from [TaskManager’s total resource]({{< ref "docs/deployment/memory/mem_setup_tm" >}}) -and [taskmanager.numberOfTaskSlots]({{< ref "docs/deployment/config" >}}#taskmanager-numberoftaskslots), just -like in coarse-grained resource management. As shown above, the total resource of TaskManager is 1 Core and 4 GB memory and the number of task slots -is set to 2, *Slot 2* is created with 0.5 Core and 2 GB memory for the requirement without a specified resource profile. +对于没有指定资源配置的资源请求,Flink会自动决定资源配置。粗粒度资源管理当前被计算的资源来自TaskManager总资源[TaskManager’s total resource]({{< ref "docs/deployment/memory/mem_setup_tm" >}})和TaskManager的总slot数[taskmanager.numberOfTaskSlots]({{< ref "docs/deployment/config" >}}#taskmanager-numberoftaskslots)。 +如上所示,TaskManager的总资源是1Core和4G内存,task的slot数设置为2,*Slot 2* 被创建,并申请0.5core和2G的内存而没有指定资源配置。 +在分配slot1和slot2后,在TaskManager留下0.25核和1G的内存作为未使用资源. -After the allocation of *Slot 1* and *Slot 2*, there is 0.25 Core and 1 GB memory remaining as the free resources in the -TaskManager. These free resources can be further partitioned to fulfill the following resource requirements. +详情请参考资源分配策略 [Resource Allocation Strategy](#resource-allocation-strategy)。 -Please refer to [Resource Allocation Strategy](#resource-allocation
[GitHub] [flink] xintongsong commented on a diff in pull request #20622: [FLINK-28974][sql-gateway]Add doc for the API and Option of sql gateway
xintongsong commented on code in PR #20622: URL: https://github.com/apache/flink/pull/20622#discussion_r956517011 ## flink-docs/pom.xml: ## @@ -173,6 +173,19 @@ under the License. ${project.version} + + org.apache.flink + flink-sql-gateway + ${project.version} + + + + org.apache.flink + flink-sql-gateway + ${project.version} + test-jar Review Comment: Which part of the test jar do we need? ## docs/layouts/shortcodes/generated/rest_v1_dispatcher.html: ## @@ -4151,6 +4151,13 @@ } } }, + "other-concurrent-attempts" : { +"type" : "array", +"items" : { + "type" : "object", + "$ref" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:SubtaskExecutionAttemptDetailsInfo" +} + }, Review Comment: Changes in `rest_v1_dispatcher.html` and `rest_v1_dispatcher.yml` seem unrelated. I assume these are fixes for previous changes that forget to regenerate the open-api & doc files. Therefore, they should be placed in a separate hotfix commit. ## flink-docs/src/main/java/org/apache/flink/docs/rest/SqlGatewayRestAPIDocGenerator.java: ## @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.docs.rest; + +import org.apache.flink.runtime.rest.RestServerEndpoint; +import org.apache.flink.table.gateway.rest.util.DocumentingSqlGatewayRestEndpoint; +import org.apache.flink.table.gateway.rest.util.SqlGatewayRestAPIVersion; +import org.apache.flink.util.ConfigurationException; + +import java.io.IOException; +import java.nio.file.Paths; + +import static org.apache.flink.docs.rest.RestAPIDocGenerator.createHtmlFile; + +/** + * Generator for the Sql Gateway Rest API documentation. + * + * One HTML file is generated for each {@link RestServerEndpoint} implementation that can be + * embedded into .md files using {@code {% include ${generated.docs.dir}/file.html %}}. Each file + * contains a series of HTML tables, one for each REST call. + * + * The generated table for each REST call looks like this: + * + * + * -- + * | URL| + * -- + * | Verb: verb (GET|POST|...) | Response code: responseCode| + * -- + * | Path parameters (if any are defined) | + * -- + * | - parameterName: description | + * | ... | + * -- + * | Query parameters (if any are defined) | + * -- + * | - parameterName (requisiteness): description | + * | ... | + * -- + * | Request json schema (a collapsible "Request" button) | + * -- + * | Response json schema (a collapsible "Response" button) | + * -- + * + */ +public class SqlGatewayRestAPIDocGenerator { + +/** + * Generates the Sql Gateway REST API documentation. + * + * @param args args[0] contains the directory into which the generated files are placed + * @throws IOException if any file operation failed + */ +public static void main(String[] args) throws IOException, ConfigurationException { +String outputDirectory = args[0]; +for (final SqlGatewayRestAPIVersion apiVersion : SqlGatewayRestAPIVersion.values()) { +if (apiVersion == SqlGatewayRestAPIVersion.V0) { +// this version exists only for testing purposes +
[jira] [Updated] (FLINK-23633) HybridSource: Support dynamic stop position in FileSource
[ https://issues.apache.org/jira/browse/FLINK-23633?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Flink Jira Bot updated FLINK-23633:
-----------------------------------
    Labels: pull-request-available stale-assigned (was: pull-request-available)

I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help the community manage its development. I see this issue is assigned but has not received an update in 30 days, so it has been labeled "stale-assigned". If you are still working on the issue, please remove the label and add a comment updating the community on your progress. If this issue is waiting on feedback, please consider this a reminder to the committer/reviewer. Flink is a very active project, and so we appreciate your patience. If you are no longer working on the issue, please unassign yourself so someone else may work on it.

> HybridSource: Support dynamic stop position in FileSource
> ----------------------------------------------------------
>
>                 Key: FLINK-23633
>                 URL: https://issues.apache.org/jira/browse/FLINK-23633
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Connectors / FileSystem
>            Reporter: Thomas Weise
>            Assignee: Xinbin Huang
>            Priority: Major
>              Labels: pull-request-available, stale-assigned
>
> As of FLINK-22670, FileSource can be used with HybridSource with a fixed end position. To support the scenario where the switch position isn't known ahead of time, FileSource needs a hook to decide when it is time to stop continuous polling, and must then expose the end position through the enumerator.
[GitHub] [flink] flinkbot commented on pull request #20691: [hotfix][Tests] Replace deprecated AbstractThrowableAssert#getRootCause with AbstractThrowableAssert#rootCause
flinkbot commented on PR #20691:
URL: https://github.com/apache/flink/pull/20691#issuecomment-1228967801

## CI report:

* cff97f48587863eed84c38b694acb8845c4fd44b UNKNOWN

Bot commands

The @flinkbot bot supports the following commands:
- `@flinkbot run azure` re-run the last Azure build
[GitHub] [flink] snuyanzin opened a new pull request, #20691: [hotfix][Tests] Replace deprecated AbstractThrowableAssert#getRootCause with AbstractThrowableAssert#rootCause
snuyanzin opened a new pull request, #20691:
URL: https://github.com/apache/flink/pull/20691

## What is the purpose of the change

This is a trivial PR replacing the deprecated `AbstractThrowableAssert#getRootCause` with `AbstractThrowableAssert#rootCause`.

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
- The S3 file system connector: (no)

## Documentation

- Does this pull request introduce a new feature? (no)
- If yes, how is the feature documented? (not applicable)
[GitHub] [flink] snuyanzin commented on pull request #20663: [FLINK-29067][Table SQL/API] Replace deprecated SqlParser#configBuilder with SqlParser#config
snuyanzin commented on PR #20663:
URL: https://github.com/apache/flink/pull/20663#issuecomment-1228763271

@flinkbot run azure
[GitHub] [flink-kubernetes-operator] tedhtchang commented on pull request #313: [FLINK-27852][docs] OLM installation and development documentation
tedhtchang commented on PR #313: URL: https://github.com/apache/flink-kubernetes-operator/pull/313#issuecomment-1228733052 @shalberd Yes, I had considered putting it in the [redhat marketplace community-operators](https://github.com/redhat-openshift-ecosystem/community-operators-prod/tree/main/operators) before I started this PR. I have installed the operator on OCP 4.8 to 4.11 from my own catalogsource image before, so it should work. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-29123) Dynamic parameters are not pushed when working with kubernetes
[ https://issues.apache.org/jira/browse/FLINK-29123?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Peter Vary updated FLINK-29123: --- Summary: Dynamic parameters are not pushed when working with kubernetes (was: Dynamic paramters are not pushed to working with kubertenes) > Dynamic parameters are not pushed when working with kubernetes > --- > > Key: FLINK-29123 > URL: https://issues.apache.org/jira/browse/FLINK-29123 > Project: Flink > Issue Type: Bug > Components: Deployment / Kubernetes >Affects Versions: 1.15.2 >Reporter: Peter Vary >Priority: Major > > It is not possible to push dynamic parameters to the kubernetes deployments -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-29123) Dynamic paramters are not pushed to working with kubertenes
Peter Vary created FLINK-29123: -- Summary: Dynamic paramters are not pushed to working with kubertenes Key: FLINK-29123 URL: https://issues.apache.org/jira/browse/FLINK-29123 Project: Flink Issue Type: Bug Components: Deployment / Kubernetes Affects Versions: 1.15.2 Reporter: Peter Vary It is not possible to push dynamic parameters for the kubernetes deployments -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] SwimSweet commented on a diff in pull request #20671: [FLINK-28915] Flink Native k8s mode jar localtion support s3 schema.
SwimSweet commented on code in PR #20671: URL: https://github.com/apache/flink/pull/20671#discussion_r956232987 ## flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java: ## @@ -498,6 +498,20 @@ public class KubernetesConfigOptions { "Whether to enable HostNetwork mode. " + "The HostNetwork allows the pod could use the node network namespace instead of the individual pod network namespace. Please note that the JobManager service account should have the permission to update Kubernetes service."); +public static final ConfigOption<Map<String, String>> KUBERNETES_USER_JAR_ARTIFACT_HTTP_HEADER = +ConfigOptions.key("kubernetes.user.artifacts.http.header") +.mapType() +.noDefaultValue() +.withDescription( +"Custom HTTP header for HttpArtifactFetcher. The header will be applied when getting the application job artifacts. " ++ "Expected format: headerKey1:headerValue1,headerKey2:headerValue2."); + +public static final ConfigOption<String> KUBERNETES_USER_ARTIFACTS_BASE_DIR = +ConfigOptions.key("kubernetes.user.artifacts.base.dir") +.stringType() +.defaultValue("/opt/flink/artifacts") Review Comment: Good idea. It is a good way to keep the jar resource without declaring a PV and PVC. But I think we need to add a configuration to control whether `emptyDir` is enabled. As far as I know, `emptyDir` is forbidden in some users' Kubernetes clusters. I will create a ticket and attach a PR for this feature after finishing the FLINK-28915 PR. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
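The `kubernetes.user.artifacts.http.header` option quoted above documents its expected format as `headerKey1:headerValue1,headerKey2:headerValue2`. A minimal sketch of parsing that format into a map (the class and method names are hypothetical, not Flink's actual parser, and values containing commas are not handled):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class HeaderParseSketch {
    // Parses "k1:v1,k2:v2" into an ordered map. A value containing a comma
    // would be split incorrectly, so this only illustrates the documented format.
    static Map<String, String> parseHeaders(String raw) {
        Map<String, String> headers = new LinkedHashMap<>();
        for (String pair : raw.split(",")) {
            String[] kv = pair.split(":", 2); // split on the first ':' only
            headers.put(kv[0].trim(), kv[1].trim());
        }
        return headers;
    }

    public static void main(String[] args) {
        Map<String, String> h =
                parseHeaders("Authorization:Bearer abc,Accept:application/java-archive");
        System.out.println(h); // {Authorization=Bearer abc, Accept=application/java-archive}
    }
}
```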
[jira] [Commented] (FLINK-29108) Kubernetes operator: Support queryable state
[ https://issues.apache.org/jira/browse/FLINK-29108?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17585531#comment-17585531 ] Ron Crocker commented on FLINK-29108: - [~masteryhx] In a word, yes. There are two things in play: # While it's "reaching end of life," Queryable State remains a supported capability in all of the Flink versions that the operator currently supports. My company would like to use this Kubernetes operator to manage our Flink jobs, and some of those jobs require Queryable State. We can't use this operator for those jobs until it supports Queryable State. # I'm trying to rescue Queryable State from deprecation. In [my recent presentation at Flink Forward|https://www.slideshare.net/FlinkForward/using-queryable-state-for-fun-and-profit] I made what I'd claim is a fairly strong argument for keeping Queryable State in the Flink feature set. ({_}TL;DR: Using Flink Queryable State, I can save >90% of the cost of the equivalent Redis-based solution{_}) I'm looking for allies in the fight to keep Queryable State alive. > Kubernetes operator: Support queryable state > > > Key: FLINK-29108 > URL: https://issues.apache.org/jira/browse/FLINK-29108 > Project: Flink > Issue Type: New Feature > Components: Kubernetes Operator >Reporter: Ron Crocker >Priority: Minor > > Enable the kubernetes operator to deploy jobs where queryable state is > desired. > When queryable state is desired, the operator should configure the deployed > job with > # The deployed job has {{queryable-state.enabled:}} {{true}} applied to it. > # Configure the Queryable State proxy and Queryable State server (via the > {{queryable-state.proxy}} and {{queryable-state.server}} configuration > sections respectively). If these sections aren't provided, then the default > configuration is used. 
> The operator will need to create a Kubernetes service fronting the Task > Managers {{QueryableStateClientProxy}} port (as configured by the above). > Tearing down the job also tears down the service. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] snuyanzin commented on pull request #20663: [FLINK-29067][Table SQL/API] Replace deprecated SqlParser#configBuilder with SqlParser#config
snuyanzin commented on PR #20663: URL: https://github.com/apache/flink/pull/20663#issuecomment-1228695193 @flinkbot run azure -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] xintongsong commented on a diff in pull request #20647: [FLINK-29053] Hybrid shuffle has concurrent modification of buffer when compression is enabled
xintongsong commented on code in PR #20647: URL: https://github.com/apache/flink/pull/20647#discussion_r956152093 ## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/ReadOnlySlicedNetworkBuffer.java: ## @@ -141,7 +141,9 @@ public ReadOnlySlicedNetworkBuffer readOnlySlice() { @Override public ReadOnlySlicedNetworkBuffer readOnlySlice(int index, int length) { -checkState(!isCompressed, "Unable to slice a compressed buffer."); +checkState( +!isCompressed || index + length == writerIndex(), +"Unable to slice a partial compressed buffer."); return new ReadOnlySlicedNetworkBuffer( super.unwrap(), index, length, memorySegmentOffset, false); Review Comment: ```suggestion super.unwrap(), index, length, memorySegmentOffset, isCompressed); ``` We should do the same for `NetworkBuffer#readOnlySlice`, by adding a new argument to the constructor of`ReadOnlySlicedNetworkBuffer`. Then you won't need to call `setCompressed` in `HsSubpartitionMemoryDataManager#getSliceBuffer`. ## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBuffer.java: ## @@ -173,7 +173,9 @@ public ReadOnlySlicedNetworkBuffer readOnlySlice() { @Override public ReadOnlySlicedNetworkBuffer readOnlySlice(int index, int length) { -checkState(!isCompressed, "Unable to slice a compressed buffer."); +checkState( +!isCompressed || index + length != writerIndex(), +"Unable to slice a partial compressed buffer."); Review Comment: ```suggestion "Unable to partially slice a compressed buffer."); ``` ## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionMemoryDataManager.java: ## @@ -359,13 +364,29 @@ private void finishCurrentWritingBuffer() { Buffer buffer = bufferConsumer.build(); currentWritingBuffer.close(); bufferConsumer.close(); - +// TODO support buffer compression Review Comment: What is this TODO for? 
## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/ReadOnlySlicedNetworkBuffer.java: ## @@ -141,7 +141,9 @@ public ReadOnlySlicedNetworkBuffer readOnlySlice() { @Override public ReadOnlySlicedNetworkBuffer readOnlySlice(int index, int length) { -checkState(!isCompressed, "Unable to slice a compressed buffer."); +checkState( +!isCompressed || index + length == writerIndex(), +"Unable to slice a partial compressed buffer."); Review Comment: ```suggestion "Unable to partially slice a compressed buffer."); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] gaborgsomogyi commented on pull request #20265: [FLINK-25910][runtime][security] Propagate obtained delegation tokens to TaskManagers
gaborgsomogyi commented on PR #20265: URL: https://github.com/apache/flink/pull/20265#issuecomment-1228604965 Just resolved the conflicts. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Closed] (FLINK-29056) Throw PartitionNotFoundException if the partition file is not readable for hybrid shuffle.
[ https://issues.apache.org/jira/browse/FLINK-29056?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xintong Song closed FLINK-29056. Resolution: Fixed master (1.16): c643a2953ba44b3b316ba52983932329dc0162e4 > Throw PartitionNotFoundException if the partition file is not readable for > hybrid shuffle. > -- > > Key: FLINK-29056 > URL: https://issues.apache.org/jira/browse/FLINK-29056 > Project: Flink > Issue Type: Bug > Components: Runtime / Network >Affects Versions: 1.16.0 >Reporter: Weijie Guo >Assignee: Weijie Guo >Priority: Major > Labels: pull-request-available > Fix For: 1.16.0 > > > If the data file is not readable, especially after data loss, throw a > PartitionNotFoundException to mark this result partition as failed. Otherwise, > the partition data is not regenerated, so failover cannot recover the job. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] xintongsong closed pull request #20666: [FLINK-29056] Throw PartitionNotFoundException if the partition file is not readable for hybrid shuffle
xintongsong closed pull request #20666: [FLINK-29056] Throw PartitionNotFoundException if the partition file is not readable for hybrid shuffle URL: https://github.com/apache/flink/pull/20666 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (FLINK-29101) PipelinedRegionSchedulingStrategy benchmark shows performance degradation
[ https://issues.apache.org/jira/browse/FLINK-29101?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xintong Song reassigned FLINK-29101: Assignee: Weijie Guo > PipelinedRegionSchedulingStrategy benchmark shows performance degradation > - > > Key: FLINK-29101 > URL: https://issues.apache.org/jira/browse/FLINK-29101 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Affects Versions: 1.16.0 >Reporter: Weijie Guo >Assignee: Weijie Guo >Priority: Major > Fix For: 1.16.0 > > > Through TPC-DS and flink-benchmark testing, we found that > PipelinedRegionSchedulingStrategy has a performance degradation. By > investigation, I can confirm that this was introduced by FLINK-28799, which > introduced HYBRID type edge support for the scheduling strategy. > The key to the problem is that blocking ALL_TO_ALL type edges should only > enter the scheduling method when the last execution becomes finished, but the > current implementation ignores this fact, resulting in O(n^2) complexity in > this case. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-29122) Improve robustness of FileUtils.expandDirectory()
[ https://issues.apache.org/jira/browse/FLINK-29122?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17585389#comment-17585389 ] Robert Metzger commented on FLINK-29122: Draft: https://github.com/rmetzger/flink/pull/new/expand_dir CI: https://dev.azure.com/rmetzger/Flink/_build/results?buildId=9224&view=results > Improve robustness of FileUtils.expandDirectory() > -- > > Key: FLINK-29122 > URL: https://issues.apache.org/jira/browse/FLINK-29122 > Project: Flink > Issue Type: Bug > Components: API / Core >Affects Versions: 1.16.0, 1.17.0 >Reporter: Robert Metzger >Assignee: Robert Metzger >Priority: Major > > `FileUtils.expandDirectory()` can potentially write to invalid locations if > the zip file is invalid (contains entry names with ../). -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (FLINK-29122) Improve robustness of FileUtils.expandDirectory()
[ https://issues.apache.org/jira/browse/FLINK-29122?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger reassigned FLINK-29122: -- Assignee: Robert Metzger > Improve robustness of FileUtils.expandDirectory() > -- > > Key: FLINK-29122 > URL: https://issues.apache.org/jira/browse/FLINK-29122 > Project: Flink > Issue Type: Bug > Components: API / Core >Affects Versions: 1.16.0, 1.17.0 >Reporter: Robert Metzger >Assignee: Robert Metzger >Priority: Major > > `FileUtils.expandDirectory()` can potentially write to invalid locations if > the zip file is invalid (contains entry names with ../). -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-29122) Improve robustness of FileUtils.expandDirectory()
Robert Metzger created FLINK-29122: -- Summary: Improve robustness of FileUtils.expandDirectory() Key: FLINK-29122 URL: https://issues.apache.org/jira/browse/FLINK-29122 Project: Flink Issue Type: Bug Components: API / Core Affects Versions: 1.16.0, 1.17.0 Reporter: Robert Metzger `FileUtils.expandDirectory()` can potentially write to invalid locations if the zip file is invalid (contains entry names with ../). -- This message was sent by Atlassian Jira (v8.20.10#820010)
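The `../` problem described above is the classic "zip slip" issue: an archive entry named like `../evil.txt` escapes the extraction directory. A minimal sketch of the usual guard, canonicalizing each entry's destination and checking that it stays under the target directory (the class and method names are illustrative, not Flink's actual `FileUtils` code):

```java
import java.io.File;
import java.io.IOException;

public class ZipSlipCheckSketch {
    // Returns true only if the entry's resolved destination stays inside targetDir.
    static boolean isEntrySafe(File targetDir, String entryName) throws IOException {
        File dest = new File(targetDir, entryName);
        String targetPrefix = targetDir.getCanonicalPath() + File.separator;
        // getCanonicalPath resolves "..", so a traversal entry falls outside the prefix
        return dest.getCanonicalPath().startsWith(targetPrefix);
    }

    public static void main(String[] args) throws IOException {
        File target = new File("/tmp/extract");
        System.out.println(isEntrySafe(target, "dir/file.txt")); // true
        System.out.println(isEntrySafe(target, "../evil.txt"));  // false
    }
}
```

Checking every entry name this way before writing makes the expansion reject malicious archives instead of writing outside the target directory.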
[GitHub] [flink] fsk119 commented on pull request #20678: [FLINK-29097][sql-gateway]Moving json se/deserializers from sql-gateway-api to sql-gateway
fsk119 commented on PR #20678: URL: https://github.com/apache/flink/pull/20678#issuecomment-1228426125 I added a commit to 1. move the serializer and 2. modify the ColumnInfo. I found that `Column` also has a comment field. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Closed] (FLINK-28883) Fix HiveTableSink failed to report metrics to hive metastore
[ https://issues.apache.org/jira/browse/FLINK-28883?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jark Wu closed FLINK-28883. --- Release Note: In batch mode, the Hive sink will now report statistics for written tables and partitions to the Hive metastore by default. This might be time-consuming when there are many written files. You can disable this feature by setting `table.exec.hive.sink.statistic-auto-gather.enable` to `false`. Resolution: Fixed Fixed in master: 4399b3fc40d11c2083197b6a505c23c4fcfec6df > Fix HiveTableSink failed to report metrics to hive metastore > > > Key: FLINK-28883 > URL: https://issues.apache.org/jira/browse/FLINK-28883 > Project: Flink > Issue Type: Bug > Components: Connectors / Hive >Affects Versions: 1.16.0 >Reporter: Liu >Assignee: luoyuxia >Priority: Critical > Labels: pull-request-available > Fix For: 1.16.0 > > > Currently, HiveTableSink fails to report statistics to the metastore, such as > the file count, total row count, and total size. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] wuchong merged pull request #20549: [FLINK-28883][hive] Fix HiveTableSink failed to report statistic to hive metastore in batch mode
wuchong merged PR #20549: URL: https://github.com/apache/flink/pull/20549 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] HuangXingBo commented on pull request #20685: [FLINK-28429][python] Optimize PyFlink tests
HuangXingBo commented on PR #20685: URL: https://github.com/apache/flink/pull/20685#issuecomment-1228417670 Due to the varying performance of the Azure machines, there is actually a deviation of up to about 30~40 minutes. Sometimes the test time on master can exceed 1.5 hours. Overall, the average test time for running three versions of Python can be kept within 2 hours, which effectively avoids the situation where the nightly test exceeds 4 hours https://dev.azure.com/hxbks2ks/FLINK-TEST/_build/results?buildId=2036&view=logs&j=fba17979-6d2e-591d-72f1-97cf42797c11 . However, I will try again to see if the test time can be kept within 30 minutes, so as to deal with timeouts after more tests are added in the future. The test time is longer in this release than in the last one because many new tests were added. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Closed] (FLINK-27033) YARNITCase failed due to OOM
[ https://issues.apache.org/jira/browse/FLINK-27033?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler closed FLINK-27033. Resolution: Fixed > YARNITCase failed due to OOM > > > Key: FLINK-27033 > URL: https://issues.apache.org/jira/browse/FLINK-27033 > Project: Flink > Issue Type: Bug > Components: Deployment / YARN >Affects Versions: 1.16.0 >Reporter: Matthias Pohl >Priority: Minor > Labels: auto-deprioritized-major, test-stability > > We experienced a 137 exit code in [this > build|https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=34124&view=logs&j=fc5181b0-e452-5c8f-68de-1097947f6483&t=995c650b-6573-581c-9ce6-7ad4cc038461&l=33678] > while executing {{YARNITCase}}: > {code} > ##[error]Exit code 137 returned from process: file name '/bin/docker', > arguments 'exec -i -u 1004 -w /home/agent05_azpcontainer > bb00bf8c80330d042d18da617194edc1ff1a8bf5f73851d8786eb6675d13b5f2 > /__a/externals/node/bin/node /__w/_temp/containerHandlerInvoker.js'. > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-29006) PulsarSourceITCase failed with Could not acquire the minimum required resources.
[ https://issues.apache.org/jira/browse/FLINK-29006?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17585328#comment-17585328 ] Yufan Sheng commented on FLINK-29006: - [~hxbks2ks] The root cause should be the bug on testing tools. Maybe you could ask others for help? > PulsarSourceITCase failed with Could not acquire the minimum required > resources. > > > Key: FLINK-29006 > URL: https://issues.apache.org/jira/browse/FLINK-29006 > Project: Flink > Issue Type: Bug > Components: Connectors / Pulsar >Affects Versions: 1.15.1 >Reporter: Huang Xingbo >Priority: Critical > Labels: test-stability > > {code:java} > 2022-08-17T01:58:54.4397238Z Aug 17 01:58:54 [ERROR] > PulsarSourceITCase>SourceTestSuiteBase.testScaleDown:280->SourceTestSuiteBase.restartFromSavepoint:330->SourceTestSuiteBase.checkResultWithSemantic:744 > > 2022-08-17T01:58:54.4397969Z Aug 17 01:58:54 Expecting > 2022-08-17T01:58:54.4398407Z Aug 17 01:58:54the following stack trace: > 2022-08-17T01:58:54.4399009Z Aug 17 01:58:54 java.lang.RuntimeException: > Failed to fetch next result > 2022-08-17T01:58:54.4399720Z Aug 17 01:58:54 at > org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:109) > 2022-08-17T01:58:54.4400608Z Aug 17 01:58:54 at > org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80) > 2022-08-17T01:58:54.4401505Z Aug 17 01:58:54 at > org.apache.flink.connector.testframe.utils.CollectIteratorAssert.compareWithExactlyOnceSemantic(CollectIteratorAssert.java:116) > 2022-08-17T01:58:54.4402417Z Aug 17 01:58:54 at > org.apache.flink.connector.testframe.utils.CollectIteratorAssert.matchesRecordsFromSource(CollectIteratorAssert.java:71) > 2022-08-17T01:58:54.4403459Z Aug 17 01:58:54 at > org.apache.flink.connector.testframe.testsuites.SourceTestSuiteBase.lambda$checkResultWithSemantic$3(SourceTestSuiteBase.java:741) > 
2022-08-17T01:58:54.4404435Z Aug 17 01:58:54 at > java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604) > 2022-08-17T01:58:54.4405324Z Aug 17 01:58:54 at > java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1596) > 2022-08-17T01:58:54.4406006Z Aug 17 01:58:54 at > java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) > 2022-08-17T01:58:54.4406645Z Aug 17 01:58:54 at > java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) > 2022-08-17T01:58:54.4407305Z Aug 17 01:58:54 at > java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) > 2022-08-17T01:58:54.4407974Z Aug 17 01:58:54 at > java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175) > 2022-08-17T01:58:54.4408686Z Aug 17 01:58:54 Caused by: java.io.IOException: > Failed to fetch job execution result > 2022-08-17T01:58:54.4409432Z Aug 17 01:58:54 at > org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:184) > 2022-08-17T01:58:54.4410300Z Aug 17 01:58:54 at > org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:121) > 2022-08-17T01:58:54.4411158Z Aug 17 01:58:54 at > org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106) > 2022-08-17T01:58:54.4411842Z Aug 17 01:58:54 ... 10 more > 2022-08-17T01:58:54.4412708Z Aug 17 01:58:54 Caused by: > java.util.concurrent.ExecutionException: > org.apache.flink.runtime.client.JobExecutionException: Job execution failed. 
> 2022-08-17T01:58:54.4413686Z Aug 17 01:58:54 at > java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) > 2022-08-17T01:58:54.4414572Z Aug 17 01:58:54 at > java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928) > 2022-08-17T01:58:54.4415394Z Aug 17 01:58:54 at > org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:182) > 2022-08-17T01:58:54.4416024Z Aug 17 01:58:54 ... 12 more > 2022-08-17T01:58:54.4416508Z Aug 17 01:58:54 Caused by: > org.apache.flink.runtime.client.JobExecutionException: Job execution failed. > 2022-08-17T01:58:54.4417327Z Aug 17 01:58:54 at > org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144) > 2022-08-17T01:58:54.4418138Z Aug 17 01:58:54 at > org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:141) > 2022-08-17T01:58:54.4419016Z Aug 17 01:58:54 at > java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616) > 2022-08-17T01:58:54.4419715Z Aug 17 01
[GitHub] [flink] wuchong commented on a diff in pull request #20549: [FLINK-28883][hive] Fix HiveTableSink failed to report statistic to hive metastore in batch mode
wuchong commented on code in PR #20549: URL: https://github.com/apache/flink/pull/20549#discussion_r955971797 ## flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveOptions.java: ## @@ -134,6 +134,17 @@ public class HiveOptions { public static final ConfigOption SINK_PARTITION_COMMIT_SUCCESS_FILE_NAME = FileSystemConnectorOptions.SINK_PARTITION_COMMIT_SUCCESS_FILE_NAME; +public static final ConfigOption TABLE_EXEC_HIVE_SINK_STATISTIC_AUTO_GATHER_ENABLE = +key("table.exec.hive.sink.statistic-auto-gather.enable") Review Comment: OK -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Closed] (FLINK-28976) Changelog 1st materialization delayed unnecessarily
[ https://issues.apache.org/jira/browse/FLINK-28976?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Roman Khachatryan closed FLINK-28976. - Resolution: Fixed Merged into master as 91e1291e942afc69779f09ead549352d5d357f22 .. a38b852bbbdb812aa404c226717a2fa3bdd89665, into release-1.15 as 493a1aa8556038283e256efc5368bd319bd06d17 .. 258c3e35265bb3a966bd317340f2a5fe7cfd7364. > Changelog 1st materialization delayed unnecessarily > --- > > Key: FLINK-28976 > URL: https://issues.apache.org/jira/browse/FLINK-28976 > Project: Flink > Issue Type: Bug > Components: Runtime / State Backends >Affects Versions: 1.15.1, 1.16.0 >Reporter: Roman Khachatryan >Assignee: Roman Khachatryan >Priority: Major > Labels: pull-request-available > Fix For: 1.16.0, 1.15.3 > > > In PeriodicMaterializationManager.start(), the 1st materialization is > scheduled with a delay of materialization_interval + random_offset. > Here, random_offset is added to avoid the thundering herd problem. > The next materialization will be scheduled with a delay of only > materialization_interval. > That means the 1st materialization will have to compact up to 2 times > more state changes than the subsequent ones, which in turn can cause > FLINK-26590 or other problems. -- This message was sent by Atlassian Jira (v8.20.10#820010)
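The scheduling described in the issue above (first materialization at `materialization_interval + random_offset`, later ones at the plain interval) can be sketched as follows; the class and method names are hypothetical, not Flink's actual `PeriodicMaterializationManager` code:

```java
import java.util.Random;

public class MaterializationDelaySketch {
    // First delay = interval + a random offset in [0, interval),
    // spreading tasks out to avoid a thundering herd on shared storage.
    static long initialDelayMillis(long intervalMillis, Random rnd) {
        return intervalMillis + (long) (rnd.nextDouble() * intervalMillis);
    }

    public static void main(String[] args) {
        long interval = 10_000L;
        long first = initialDelayMillis(interval, new Random());
        // The first run waits between 1x and 2x the interval, so it may have
        // up to twice as many state changes to compact -- the issue fixed above.
        System.out.println(first >= interval && first < 2 * interval); // true
    }
}
```

The fix keeps the randomized offset only for spreading the load, so the first materialization no longer accumulates a double-length window of changes.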
[jira] [Closed] (FLINK-29035) ExpressionReducer does not work with jar resources
[ https://issues.apache.org/jira/browse/FLINK-29035?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jark Wu closed FLINK-29035. --- Assignee: dalongliu Resolution: Fixed Fixed in master: 0ade193d39326dd5b84334348a4b6ce76c4a915a > ExpressionReducer does not work with jar resources > -- > > Key: FLINK-29035 > URL: https://issues.apache.org/jira/browse/FLINK-29035 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / Planner >Reporter: Timo Walther >Assignee: dalongliu >Priority: Major > Labels: pull-request-available > Fix For: 1.16.0 > > > It seems the code generation for expression reduction uses an invalid class > loader that does not contain the jar resource. > Reproducible example: > {code} > CREATE TEMPORARY SYSTEM FUNCTION myLower AS '%s' USING JAR '%s' > SELECT myLower('HELLO') > {code} > > fails with > {code} > java.lang.RuntimeException: Could not instantiate generated class > 'ExpressionReducer$4' > at > org.apache.flink.table.runtime.generated.GeneratedClass.newInstance(GeneratedClass.java:74) > at > org.apache.flink.table.planner.codegen.ExpressionReducer.reduce(ExpressionReducer.scala:97) > at > org.apache.calcite.rel.rules.ReduceExpressionsRule.reduceExpressionsInternal(ReduceExpressionsRule.java:759) > at > org.apache.calcite.rel.rules.ReduceExpressionsRule.reduceExpressions(ReduceExpressionsRule.java:699) > at > org.apache.calcite.rel.rules.ReduceExpressionsRule$ProjectReduceExpressionsRule.onMatch(ReduceExpressionsRule.java:306) > Caused by: org.apache.flink.api.common.InvalidProgramException: Table program > cannot be compiled. This is a bug. Please file an issue. > Caused by: org.codehaus.commons.compiler.CompileException: Line 13, Column > 37: Cannot determine simple type name "LowerUDF46" > at org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:12211) > at > org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6833) > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] wuchong merged pull request #20635: [FLINK-29035][table-planner] Fix bug of ExpressionReducer does not work with jar resources
wuchong merged PR #20635: URL: https://github.com/apache/flink/pull/20635 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] rkhachatryan merged pull request #20687: [BP-1.15][FLINK-28976][state] Don't add extra delay to the 1st materialization
rkhachatryan merged PR #20687: URL: https://github.com/apache/flink/pull/20687 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-29121) SqlGatewayRestAPIStabilityTest.testSqlGatewayRestAPIStability is failed
[ https://issues.apache.org/jira/browse/FLINK-29121?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17585320#comment-17585320 ] Jark Wu commented on FLINK-29121: - cc [~fsk119] > SqlGatewayRestAPIStabilityTest.testSqlGatewayRestAPIStability is failed > --- > > Key: FLINK-29121 > URL: https://issues.apache.org/jira/browse/FLINK-29121 > Project: Flink > Issue Type: Bug > Components: Table SQL / Gateway >Reporter: Jark Wu >Priority: Critical > > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=40416&view=logs&j=0c940707-2659-5648-cbe6-a1ad63045f0a&t=075c2716-8010-5565-fe08-3c4bb45824a4 > {code} > 2022-08-26T11:03:07.6108823Z Aug 26 11:03:07 [ERROR] Tests run: 1, Failures: > 1, Errors: 0, Skipped: 0, Time elapsed: 0.379 s <<< FAILURE! - in > org.apache.flink.table.gateway.rest.compatibility.SqlGatewayRestAPIStabilityTest > 2022-08-26T11:03:07.6110033Z Aug 26 11:03:07 [ERROR] > org.apache.flink.table.gateway.rest.compatibility.SqlGatewayRestAPIStabilityTest.testSqlGatewayRestAPIStability(SqlGatewayRestAPIVersion)[1] > Time elapsed: 0.347 s <<< FAILURE! > 2022-08-26T11:03:07.6110730Z Aug 26 11:03:07 > org.opentest4j.AssertionFailedError: > 2022-08-26T11:03:07.6112493Z Aug 26 11:03:07 No compatible call could be > found for > {"url":"/sessions/:session_handle/:operation_handle/cancel","method":"PUT","status-code":"200 > > OK","file-upload":false,"path-parameters":{"pathParameters":[{"key":"session_handle"},{"key":"operation_handle"}]},"query-parameters":{"queryParameters":[]},"request":{"type":"any"},"response":{"type":"object","id":"urn:jsonschema:org:apache:flink:table:gateway:rest:message:operation:OperationStatusResponseBody","properties":{"status":{"type":"string". 
> 2022-08-26T11:03:07.6115158Z Aug 26 11:03:07 Rejected by candidate: > {"url":"/sessions/:session_handle/operations/:operation_handle/cancel","method":"PUT","status-code":"200 > > OK","file-upload":false,"path-parameters":{"pathParameters":[{"key":"session_handle"},{"key":"operation_handle"}]},"query-parameters":{"queryParameters":[]},"request":{"type":"any"},"response":{"type":"object","id":"urn:jsonschema:org:apache:flink:table:gateway:rest:message:operation:OperationStatusResponseBody","properties":{"status":{"type":"string". > 2022-08-26T11:03:07.6116664Z Aug 26 11:03:07 Compatibility grade: 7/8 > 2022-08-26T11:03:07.6122943Z Aug 26 11:03:07 Incompatibilities: > 2022-08-26T11:03:07.6123711Z Aug 26 11:03:07 url: > 2022-08-26T11:03:07.6124489Z Aug 26 11:03:07 expected: > "/sessions/:session_handle/:operation_handle/cancel" > 2022-08-26T11:03:07.6125445Z Aug 26 11:03:07 but was: > "/sessions/:session_handle/operations/:operation_handle/cancel" > 2022-08-26T11:03:07.6128775Z Aug 26 11:03:07 Rejected by candidate: > {"url":"/sessions/:session_handle/operations/:operation_handle/close","method":"DELETE","status-code":"200 > > OK","file-upload":false,"path-parameters":{"pathParameters":[{"key":"session_handle"},{"key":"operation_handle"}]},"query-parameters":{"queryParameters":[]},"request":{"type":"any"},"response":{"type":"object","id":"urn:jsonschema:org:apache:flink:table:gateway:rest:message:operation:OperationStatusResponseBody","properties":{"status":{"type":"string". 
> 2022-08-26T11:03:07.6130953Z Aug 26 11:03:07 Compatibility grade: 6/8 > 2022-08-26T11:03:07.6131525Z Aug 26 11:03:07 Incompatibilities: > 2022-08-26T11:03:07.6132055Z Aug 26 11:03:07 url: > 2022-08-26T11:03:07.6132735Z Aug 26 11:03:07 expected: > "/sessions/:session_handle/:operation_handle/cancel" > 2022-08-26T11:03:07.6133551Z Aug 26 11:03:07 but was: > "/sessions/:session_handle/operations/:operation_handle/close" > 2022-08-26T11:03:07.6134338Z Aug 26 11:03:07 method: > 2022-08-26T11:03:07.6134777Z Aug 26 11:03:07 expected: "PUT" > 2022-08-26T11:03:07.6135124Z Aug 26 11:03:07 but was: "DELETE" > 2022-08-26T11:03:07.6137281Z Aug 26 11:03:07 Rejected by candidate: > {"url":"/sessions/:session_handle/operations/:operation_handle/status","method":"GET","status-code":"200 > > OK","file-upload":false,"path-parameters":{"pathParameters":[{"key":"session_handle"},{"key":"operation_handle"}]},"query-parameters":{"queryParameters":[]},"request":{"type":"any"},"response":{"type":"object","id":"urn:jsonschema:org:apache:flink:table:gateway:rest:message:operation:OperationStatusResponseBody","properties":{"status":{"type":"string". > 2022-08-26T11:03:07.6138587Z Aug 26 11:03:07 Compatibility grade: 6/8 > 2022-08-26T11:03:07.6138940Z Aug 26 11:03:07 Incompatibilities: > 2022-08-26T11:03:07.6139281Z Aug 26 11:03:07 url: > 2022-08-26T11:03:07.6139687Z Aug 26 11:03:07 expected: > "/sessions/:session_handle/:operation_handle/cancel" > 2022-08
[jira] [Updated] (FLINK-29121) SqlGatewayRestAPIStabilityTest.testSqlGatewayRestAPIStability is failed
[ https://issues.apache.org/jira/browse/FLINK-29121?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jark Wu updated FLINK-29121: Fix Version/s: 1.16.0 > SqlGatewayRestAPIStabilityTest.testSqlGatewayRestAPIStability is failed > --- > > Key: FLINK-29121 > URL: https://issues.apache.org/jira/browse/FLINK-29121 > Project: Flink > Issue Type: Bug > Components: Table SQL / Gateway >Reporter: Jark Wu >Priority: Critical > Fix For: 1.16.0 > > > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=40416&view=logs&j=0c940707-2659-5648-cbe6-a1ad63045f0a&t=075c2716-8010-5565-fe08-3c4bb45824a4 > {code} > 2022-08-26T11:03:07.6108823Z Aug 26 11:03:07 [ERROR] Tests run: 1, Failures: > 1, Errors: 0, Skipped: 0, Time elapsed: 0.379 s <<< FAILURE! - in > org.apache.flink.table.gateway.rest.compatibility.SqlGatewayRestAPIStabilityTest > 2022-08-26T11:03:07.6110033Z Aug 26 11:03:07 [ERROR] > org.apache.flink.table.gateway.rest.compatibility.SqlGatewayRestAPIStabilityTest.testSqlGatewayRestAPIStability(SqlGatewayRestAPIVersion)[1] > Time elapsed: 0.347 s <<< FAILURE! > 2022-08-26T11:03:07.6110730Z Aug 26 11:03:07 > org.opentest4j.AssertionFailedError: > 2022-08-26T11:03:07.6112493Z Aug 26 11:03:07 No compatible call could be > found for > {"url":"/sessions/:session_handle/:operation_handle/cancel","method":"PUT","status-code":"200 > > OK","file-upload":false,"path-parameters":{"pathParameters":[{"key":"session_handle"},{"key":"operation_handle"}]},"query-parameters":{"queryParameters":[]},"request":{"type":"any"},"response":{"type":"object","id":"urn:jsonschema:org:apache:flink:table:gateway:rest:message:operation:OperationStatusResponseBody","properties":{"status":{"type":"string". 
> 2022-08-26T11:03:07.6115158Z Aug 26 11:03:07 Rejected by candidate: > {"url":"/sessions/:session_handle/operations/:operation_handle/cancel","method":"PUT","status-code":"200 > > OK","file-upload":false,"path-parameters":{"pathParameters":[{"key":"session_handle"},{"key":"operation_handle"}]},"query-parameters":{"queryParameters":[]},"request":{"type":"any"},"response":{"type":"object","id":"urn:jsonschema:org:apache:flink:table:gateway:rest:message:operation:OperationStatusResponseBody","properties":{"status":{"type":"string". > 2022-08-26T11:03:07.6116664Z Aug 26 11:03:07 Compatibility grade: 7/8 > 2022-08-26T11:03:07.6122943Z Aug 26 11:03:07 Incompatibilities: > 2022-08-26T11:03:07.6123711Z Aug 26 11:03:07 url: > 2022-08-26T11:03:07.6124489Z Aug 26 11:03:07 expected: > "/sessions/:session_handle/:operation_handle/cancel" > 2022-08-26T11:03:07.6125445Z Aug 26 11:03:07 but was: > "/sessions/:session_handle/operations/:operation_handle/cancel" > 2022-08-26T11:03:07.6128775Z Aug 26 11:03:07 Rejected by candidate: > {"url":"/sessions/:session_handle/operations/:operation_handle/close","method":"DELETE","status-code":"200 > > OK","file-upload":false,"path-parameters":{"pathParameters":[{"key":"session_handle"},{"key":"operation_handle"}]},"query-parameters":{"queryParameters":[]},"request":{"type":"any"},"response":{"type":"object","id":"urn:jsonschema:org:apache:flink:table:gateway:rest:message:operation:OperationStatusResponseBody","properties":{"status":{"type":"string". 
> 2022-08-26T11:03:07.6130953Z Aug 26 11:03:07 Compatibility grade: 6/8 > 2022-08-26T11:03:07.6131525Z Aug 26 11:03:07 Incompatibilities: > 2022-08-26T11:03:07.6132055Z Aug 26 11:03:07 url: > 2022-08-26T11:03:07.6132735Z Aug 26 11:03:07 expected: > "/sessions/:session_handle/:operation_handle/cancel" > 2022-08-26T11:03:07.6133551Z Aug 26 11:03:07 but was: > "/sessions/:session_handle/operations/:operation_handle/close" > 2022-08-26T11:03:07.6134338Z Aug 26 11:03:07 method: > 2022-08-26T11:03:07.6134777Z Aug 26 11:03:07 expected: "PUT" > 2022-08-26T11:03:07.6135124Z Aug 26 11:03:07 but was: "DELETE" > 2022-08-26T11:03:07.6137281Z Aug 26 11:03:07 Rejected by candidate: > {"url":"/sessions/:session_handle/operations/:operation_handle/status","method":"GET","status-code":"200 > > OK","file-upload":false,"path-parameters":{"pathParameters":[{"key":"session_handle"},{"key":"operation_handle"}]},"query-parameters":{"queryParameters":[]},"request":{"type":"any"},"response":{"type":"object","id":"urn:jsonschema:org:apache:flink:table:gateway:rest:message:operation:OperationStatusResponseBody","properties":{"status":{"type":"string". > 2022-08-26T11:03:07.6138587Z Aug 26 11:03:07 Compatibility grade: 6/8 > 2022-08-26T11:03:07.6138940Z Aug 26 11:03:07 Incompatibilities: > 2022-08-26T11:03:07.6139281Z Aug 26 11:03:07 url: > 2022-08-26T11:03:07.6139687Z Aug 26 11:03:07 expected: > "/sessions/:session_handle/:operation_handle/cancel" > 2022-08-26T11:03:07.
[jira] [Created] (FLINK-29121) SqlGatewayRestAPIStabilityTest.testSqlGatewayRestAPIStability is failed
Jark Wu created FLINK-29121: --- Summary: SqlGatewayRestAPIStabilityTest.testSqlGatewayRestAPIStability is failed Key: FLINK-29121 URL: https://issues.apache.org/jira/browse/FLINK-29121 Project: Flink Issue Type: Bug Components: Table SQL / Gateway Reporter: Jark Wu https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=40416&view=logs&j=0c940707-2659-5648-cbe6-a1ad63045f0a&t=075c2716-8010-5565-fe08-3c4bb45824a4 {code} 2022-08-26T11:03:07.6108823Z Aug 26 11:03:07 [ERROR] Tests run: 1, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 0.379 s <<< FAILURE! - in org.apache.flink.table.gateway.rest.compatibility.SqlGatewayRestAPIStabilityTest 2022-08-26T11:03:07.6110033Z Aug 26 11:03:07 [ERROR] org.apache.flink.table.gateway.rest.compatibility.SqlGatewayRestAPIStabilityTest.testSqlGatewayRestAPIStability(SqlGatewayRestAPIVersion)[1] Time elapsed: 0.347 s <<< FAILURE! 2022-08-26T11:03:07.6110730Z Aug 26 11:03:07 org.opentest4j.AssertionFailedError: 2022-08-26T11:03:07.6112493Z Aug 26 11:03:07 No compatible call could be found for {"url":"/sessions/:session_handle/:operation_handle/cancel","method":"PUT","status-code":"200 OK","file-upload":false,"path-parameters":{"pathParameters":[{"key":"session_handle"},{"key":"operation_handle"}]},"query-parameters":{"queryParameters":[]},"request":{"type":"any"},"response":{"type":"object","id":"urn:jsonschema:org:apache:flink:table:gateway:rest:message:operation:OperationStatusResponseBody","properties":{"status":{"type":"string". 
2022-08-26T11:03:07.6115158Z Aug 26 11:03:07 Rejected by candidate: {"url":"/sessions/:session_handle/operations/:operation_handle/cancel","method":"PUT","status-code":"200 OK","file-upload":false,"path-parameters":{"pathParameters":[{"key":"session_handle"},{"key":"operation_handle"}]},"query-parameters":{"queryParameters":[]},"request":{"type":"any"},"response":{"type":"object","id":"urn:jsonschema:org:apache:flink:table:gateway:rest:message:operation:OperationStatusResponseBody","properties":{"status":{"type":"string". 2022-08-26T11:03:07.6116664Z Aug 26 11:03:07 Compatibility grade: 7/8 2022-08-26T11:03:07.6122943Z Aug 26 11:03:07 Incompatibilities: 2022-08-26T11:03:07.6123711Z Aug 26 11:03:07 url: 2022-08-26T11:03:07.6124489Z Aug 26 11:03:07 expected: "/sessions/:session_handle/:operation_handle/cancel" 2022-08-26T11:03:07.6125445Z Aug 26 11:03:07 but was: "/sessions/:session_handle/operations/:operation_handle/cancel" 2022-08-26T11:03:07.6128775Z Aug 26 11:03:07 Rejected by candidate: {"url":"/sessions/:session_handle/operations/:operation_handle/close","method":"DELETE","status-code":"200 OK","file-upload":false,"path-parameters":{"pathParameters":[{"key":"session_handle"},{"key":"operation_handle"}]},"query-parameters":{"queryParameters":[]},"request":{"type":"any"},"response":{"type":"object","id":"urn:jsonschema:org:apache:flink:table:gateway:rest:message:operation:OperationStatusResponseBody","properties":{"status":{"type":"string". 
2022-08-26T11:03:07.6130953Z Aug 26 11:03:07 Compatibility grade: 6/8 2022-08-26T11:03:07.6131525Z Aug 26 11:03:07 Incompatibilities: 2022-08-26T11:03:07.6132055Z Aug 26 11:03:07 url: 2022-08-26T11:03:07.6132735Z Aug 26 11:03:07 expected: "/sessions/:session_handle/:operation_handle/cancel" 2022-08-26T11:03:07.6133551Z Aug 26 11:03:07 but was: "/sessions/:session_handle/operations/:operation_handle/close" 2022-08-26T11:03:07.6134338Z Aug 26 11:03:07 method: 2022-08-26T11:03:07.6134777Z Aug 26 11:03:07 expected: "PUT" 2022-08-26T11:03:07.6135124Z Aug 26 11:03:07 but was: "DELETE" 2022-08-26T11:03:07.6137281Z Aug 26 11:03:07 Rejected by candidate: {"url":"/sessions/:session_handle/operations/:operation_handle/status","method":"GET","status-code":"200 OK","file-upload":false,"path-parameters":{"pathParameters":[{"key":"session_handle"},{"key":"operation_handle"}]},"query-parameters":{"queryParameters":[]},"request":{"type":"any"},"response":{"type":"object","id":"urn:jsonschema:org:apache:flink:table:gateway:rest:message:operation:OperationStatusResponseBody","properties":{"status":{"type":"string". 2022-08-26T11:03:07.6138587Z Aug 26 11:03:07 Compatibility grade: 6/8 2022-08-26T11:03:07.6138940Z Aug 26 11:03:07 Incompatibilities: 2022-08-26T11:03:07.6139281Z Aug 26 11:03:07 url: 2022-08-26T11:03:07.6139687Z Aug 26 11:03:07 expected: "/sessions/:session_handle/:operation_handle/cancel" 2022-08-26T11:03:07.6140200Z Aug 26 11:03:07 but was: "/sessions/:session_handle/operations/:operation_handle/status" 2022-08-26T11:03:07.6140643Z Aug 26 11:03:07 method: 2022-08-26T11:03:07.6141136Z Aug 26 11:03:07 expected: "PUT" 2022-08-26T11:03:07.6141622Z Aug 26 11:03:07 but was: "GET" 2022-08-26T11:03:07.6144287Z Aug 26 11:03:07 Rejected by can
[GitHub] [flink-kubernetes-operator] gaborgsomogyi commented on pull request #355: [FLINK-28734][helm] Configurable role(binding) names
gaborgsomogyi commented on PR #355: URL: https://github.com/apache/flink-kubernetes-operator/pull/355#issuecomment-1228391322 cc @gyfora @mbalassi
[GitHub] [flink] wuchong commented on pull request #18975: [FLINK-26474][hive] Fold exprNode to fix the issue of failing to call some hive udf required constant parameters with implicit constant passe
wuchong commented on PR #18975: URL: https://github.com/apache/flink/pull/18975#issuecomment-1228391354 The `HiveDialectQueryITCase.testCastTimeStampToDecimal` still fails.
[jira] [Updated] (FLINK-28734) Configurable role(binding) names in the helm chart
[ https://issues.apache.org/jira/browse/FLINK-28734?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-28734: --- Labels: pull-request-available (was: ) > Configurable role(binding) names in the helm chart > -- > > Key: FLINK-28734 > URL: https://issues.apache.org/jira/browse/FLINK-28734 > Project: Flink > Issue Type: Improvement > Components: Kubernetes Operator >Affects Versions: kubernetes-operator-1.2.0 >Reporter: Márton Balassi >Assignee: Gabor Somogyi >Priority: Major > Labels: pull-request-available > Fix For: kubernetes-operator-1.2.0 > > > The names of the roles and rolebindings in the helm chart are not yet > configurable, we should improve this. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink-kubernetes-operator] gaborgsomogyi opened a new pull request, #355: [FLINK-28734][helm] Configurable role(binding) names
gaborgsomogyi opened a new pull request, #355: URL: https://github.com/apache/flink-kubernetes-operator/pull/355 ## What is the purpose of the change The names of the roles and rolebindings in the helm chart are not yet configurable. In this PR it's made configurable. ## Brief change log * Configurable role(binding) names * Added option to bind rolebinding to clusterrole ## Verifying this change Manually on cluster. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changes to the `CustomResourceDescriptors`: no - Core observer or reconciler logic that is regularly executed: no ## Documentation - Does this pull request introduce a new feature? yes - If yes, how is the feature documented? not applicable
[GitHub] [flink] wangyang0918 commented on a diff in pull request #20673: [BP-1.15][FLINK-28265][k8s] Make KubernetesStateHandleStore#addEntry idempotent
wangyang0918 commented on code in PR #20673: URL: https://github.com/apache/flink/pull/20673#discussion_r955943999 ## flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStoreTest.java: ## @@ -1119,4 +1166,34 @@ private TestingLongStateHandleHelper.LongStateHandle addDeletingEntry( configMap.getData().put(key, deleting); return state; } + +private static CompletableFuture retryWithFirstFailedK8sOperation( +Function> function, +KubernetesConfigMap leaderConfigMap) { +final AtomicInteger callbackInvocationCount = new AtomicInteger(0); +final CompletableFuture result = +FutureUtils.retry( +() -> +CompletableFuture.supplyAsync( +() -> { + callbackInvocationCount.incrementAndGet(); +function.apply(leaderConfigMap); +if (callbackInvocationCount.get() == 1) { +throw new KubernetesClientException( +"Expected exception to simulate unstable " ++ "kubernetes client operation"); +} +return true; +}, +Executors.newDirectExecutorService()), + KubernetesConfigOptions.KUBERNETES_TRANSACTIONAL_OPERATION_MAX_RETRIES +.defaultValue(), +t -> +ExceptionUtils.findThrowable(t, KubernetesClientException.class) +.isPresent(), +Executors.newDirectExecutorService()); +assertThat(callbackInvocationCount.get(), is(2)); +assertThat(result.isDone(), is(true)); Review Comment: Makes sense. I will update this PR.
[GitHub] [flink] XComp commented on a diff in pull request #20673: [BP-1.15][FLINK-28265][k8s] Make KubernetesStateHandleStore#addEntry idempotent
XComp commented on code in PR #20673: URL: https://github.com/apache/flink/pull/20673#discussion_r955938498 ## flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStoreTest.java: Review Comment: I see, fair point. But then, you might need to do `assertThat(result.isDone() && !result.isCompletedExceptionally() && !result.isCancelled(), is(true));` to be more precise. You might want to put it into separate asserts instead of having a long boolean condition like that for readability purposes...
[GitHub] [flink] snuyanzin commented on pull request #20663: [FLINK-29067][Table SQL/API] Replace deprecated SqlParser#configBuilder with SqlParser#config
snuyanzin commented on PR #20663: URL: https://github.com/apache/flink/pull/20663#issuecomment-1228362843 @flinkbot run azure
[GitHub] [flink] wangyang0918 commented on a diff in pull request #20673: [BP-1.15][FLINK-28265][k8s] Make KubernetesStateHandleStore#addEntry idempotent
wangyang0918 commented on code in PR #20673: URL: https://github.com/apache/flink/pull/20673#discussion_r955930855 ## flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStoreTest.java: Review Comment: Since we expect the `result` to already be completed without calling `.get()`, I prefer to use `assertThat(result.isDone() && !result.isCompletedExceptionally(), is(true));`
[GitHub] [flink] flinkbot commented on pull request #20690: FLINK-27107. Typo in Task.
flinkbot commented on PR #20690: URL: https://github.com/apache/flink/pull/20690#issuecomment-1228355773 ## CI report: * 459dc89ac49b2d10bf59471007c0f6bad2c38e31 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
[GitHub] [flink] zentol commented on pull request #20685: [FLINK-28429][python] Optimize PyFlink tests
zentol commented on PR #20685: URL: https://github.com/apache/flink/pull/20685#issuecomment-1228354535 This doesn't seem to have the desired effect; the runtime of flink-python is still 51m, like on master.
[GitHub] [flink] HarshaR99 opened a new pull request, #20690: FLINK-27107. Typo in Task.
HarshaR99 opened a new pull request, #20690: URL: https://github.com/apache/flink/pull/20690 ## What is the purpose of the change Link to the Apache JIRA: https://issues.apache.org/jira/browse/FLINK-27107 Two small typos in Task: TaskCancelerWatchDog/TaskInterrupter field: executerThread -> executorThread; TaskCanceler field: executer -> executor ## Verifying this change This is a typo change, so it should work fine without testing. ## Does this pull request potentially affect one of the following parts: No ## Documentation No
[GitHub] [flink] HarshaR99 closed pull request #20604: FLINK-27107. Typo in Task.
HarshaR99 closed pull request #20604: FLINK-27107. Typo in Task. URL: https://github.com/apache/flink/pull/20604
[GitHub] [flink] flinkbot commented on pull request #20689: [FLINK-28984][runtime] Fix the problem that FsCheckpointStateOutputStream is not being released normally
flinkbot commented on PR #20689: URL: https://github.com/apache/flink/pull/20689#issuecomment-1228342426 ## CI report: * 9b267d3f979a2d29653ef0ed00e03a39a5f2ab93 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
[GitHub] [flink] ChangjiGuo commented on pull request #20689: [FLINK-28984][runtime] Fix the problem that FsCheckpointStateOutputStream is not being released normally
ChangjiGuo commented on PR #20689: URL: https://github.com/apache/flink/pull/20689#issuecomment-1228341795 Hi, @Myasuka. Can you help me review this pr? Thx
[jira] [Updated] (FLINK-28984) FsCheckpointStateOutputStream is not being released normally
[ https://issues.apache.org/jira/browse/FLINK-28984?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-28984: --- Labels: pull-request-available (was: ) > FsCheckpointStateOutputStream is not being released normally > > > Key: FLINK-28984 > URL: https://issues.apache.org/jira/browse/FLINK-28984 > Project: Flink > Issue Type: Bug > Components: Runtime / Checkpointing >Affects Versions: 1.11.6, 1.15.1 >Reporter: ChangjiGuo >Priority: Major > Labels: pull-request-available > Attachments: log.png > > > If the checkpoint is aborted, AsyncSnapshotCallable will close the > snapshotCloseableRegistry when it is canceled. There may be two situations > here: > # The FSDataOutputStream has been created and closed while closing > FsCheckpointStateOutputStream. > # The FSDataOutputStream has not been created yet, but the closed flag has been > set to true. You can see this in the log: > {code:java} > 2022-08-16 12:55:44,161 WARN > org.apache.flink.core.fs.SafetyNetCloseableRegistry - Closing > unclosed resource via safety-net: > ClosingFSDataOutputStream(org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream@4ebe8e64) > : > x/flink/checkpoint/state/9214a2e302904b14baf2dc1aacbc7933/ae157c5a05a8922a46a179cdb4c86b10/shared/9d8a1e92-2f69-4ab0-8ce9-c1beb149229a > {code} > The output stream will be automatically closed by the > SafetyNetCloseableRegistry but the file will not be deleted. > The second case usually occurs when the storage system has high latency in > creating files. > How to reproduce? > This is not easy to reproduce, but you can try to set a smaller checkpoint > timeout and increase the parallelism of the flink job.
[GitHub] [flink] ChangjiGuo opened a new pull request, #20689: [FLINK-28984][runtime] Fix the problem that FsCheckpointStateOutputStream is not being released normally
ChangjiGuo opened a new pull request, #20689: URL: https://github.com/apache/flink/pull/20689 ## What is the purpose of the change This PR fixes the problem that the FsCheckpointStateOutputStream is not released normally in some scenarios. See https://issues.apache.org/jira/browse/FLINK-28984 for details. ## Brief change log - Check whether the FsCheckpointStateOutputStream has been closed once creation of the output stream completes; if it has been closed, clean it up.
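The race described in FLINK-28984 — `close()` flips a closed flag while slow creation of the underlying `FSDataOutputStream` is still in flight, so the file created afterwards is never deleted — can be illustrated with a small sketch. This is a simplified, hypothetical model (class and method names below are not Flink's): it shows why creation must re-check the closed flag after the file exists and clean up itself, which is the fix the PR describes.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical, simplified model of the leak: if close() wins the race
// before the backing file exists, nobody deletes the file created
// afterwards -- unless creation re-checks the closed flag.
class CheckpointStreamSketch {
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private volatile boolean fileExists; // stands in for the real backing file

    void createBackingFile() {
        fileExists = true; // slow creation; close() may already have run
        if (closed.get()) {
            // The fix: creation noticed a concurrent close and cleans up.
            deleteBackingFile();
            throw new IllegalStateException("stream closed during creation");
        }
    }

    void close() {
        if (closed.compareAndSet(false, true) && fileExists) {
            deleteBackingFile(); // normal path: close() deletes an existing file
        }
    }

    private void deleteBackingFile() {
        fileExists = false; // would delete the actual file in a real stream
    }

    boolean leaked() {
        return closed.get() && fileExists;
    }
}
```

Without the re-check in `createBackingFile`, a `close()` that finishes before creation would leave `leaked()` returning `true` — the situation behind the unclosed-resource warning logged by `SafetyNetCloseableRegistry` in the issue.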
[GitHub] [flink] wangyang0918 commented on a diff in pull request #20673: [BP-1.15][FLINK-28265][k8s] Make KubernetesStateHandleStore#addEntry idempotent
wangyang0918 commented on code in PR #20673: URL: https://github.com/apache/flink/pull/20673#discussion_r955901418 ## flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStoreTest.java: ## @@ -1119,4 +1166,34 @@ private TestingLongStateHandleHelper.LongStateHandle addDeletingEntry( configMap.getData().put(key, deleting); return state; } + +private static CompletableFuture retryWithFirstFailedK8sOperation( +Function> function, +KubernetesConfigMap leaderConfigMap) { +final AtomicInteger callbackInvocationCount = new AtomicInteger(0); +final CompletableFuture result = +FutureUtils.retry( +() -> +CompletableFuture.supplyAsync( +() -> { + callbackInvocationCount.incrementAndGet(); +function.apply(leaderConfigMap); +if (callbackInvocationCount.get() == 1) { +throw new KubernetesClientException( +"Expected exception to simulate unstable " ++ "kubernetes client operation"); +} +return true; +}, +Executors.newDirectExecutorService()), + KubernetesConfigOptions.KUBERNETES_TRANSACTIONAL_OPERATION_MAX_RETRIES +.defaultValue(), +t -> +ExceptionUtils.findThrowable(t, KubernetesClientException.class) +.isPresent(), +Executors.newDirectExecutorService()); +assertThat(callbackInvocationCount.get(), is(2)); +assertThat(result.isDone(), is(true)); Review Comment: It seems that I find something. ``` /** * Returns {@code true} if this task completed. * * Completion may be due to normal termination, an exception, or * cancellation -- in all of these cases, this method will return * {@code true}. 
* * @return {@code true} if this task completed */ boolean isDone(); ``` ## flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStoreTest.java: ## @@ -1119,4 +1166,34 @@ private TestingLongStateHandleHelper.LongStateHandle addDeletingEntry( configMap.getData().put(key, deleting); return state; } + +private static CompletableFuture retryWithFirstFailedK8sOperation( +Function> function, +KubernetesConfigMap leaderConfigMap) { +final AtomicInteger callbackInvocationCount = new AtomicInteger(0); +final CompletableFuture result = +FutureUtils.retry( +() -> +CompletableFuture.supplyAsync( +() -> { + callbackInvocationCount.incrementAndGet(); +function.apply(leaderConfigMap); +if (callbackInvocationCount.get() == 1) { +throw new KubernetesClientException( +"Expected exception to simulate unstable " ++ "kubernetes client operation"); +} +return true; +}, +Executors.newDirectExecutorService()), + KubernetesConfigOptions.KUBERNETES_TRANSACTIONAL_OPERATION_MAX_RETRIES +.defaultValue(), +t -> +ExceptionUtils.findThrowable(t, KubernetesClientException.class) +.isPresent(), +Executors.newDirectExecutorService()); +assertThat(callbackInvocationCount.get(), is(2)); +assertThat(result.isDone(), is(true)); Review Comment: Given that the exception of the `result` will be eventually thrown in the `addAndLock`, I do not think it is really necessary to check exception here. But it should be harmless. ``` try { assertThat(result.get(), is(true)); } catch (Exception ex) { fail("Exception should not be thrown.");
} ```
[GitHub] [flink] wangyang0918 commented on a diff in pull request #20673: [BP-1.15][FLINK-28265][k8s] Make KubernetesStateHandleStore#addEntry idempotent
wangyang0918 commented on code in PR #20673: URL: https://github.com/apache/flink/pull/20673#discussion_r955896987 ## flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStoreTest.java: ## @@ -1119,4 +1166,34 @@ private TestingLongStateHandleHelper.LongStateHandle addDeletingEntry( configMap.getData().put(key, deleting); return state; } + +private static CompletableFuture retryWithFirstFailedK8sOperation( +Function> function, +KubernetesConfigMap leaderConfigMap) { +final AtomicInteger callbackInvocationCount = new AtomicInteger(0); +final CompletableFuture result = +FutureUtils.retry( +() -> +CompletableFuture.supplyAsync( +() -> { + callbackInvocationCount.incrementAndGet(); +function.apply(leaderConfigMap); +if (callbackInvocationCount.get() == 1) { +throw new KubernetesClientException( +"Expected exception to simulate unstable " ++ "kubernetes client operation"); +} +return true; +}, +Executors.newDirectExecutorService()), + KubernetesConfigOptions.KUBERNETES_TRANSACTIONAL_OPERATION_MAX_RETRIES +.defaultValue(), +t -> +ExceptionUtils.findThrowable(t, KubernetesClientException.class) +.isPresent(), +Executors.newDirectExecutorService()); +assertThat(callbackInvocationCount.get(), is(2)); +assertThat(result.isDone(), is(true)); Review Comment: The above codes may hide the exception of future. Maybe I need to throw the exception directly in `retryWithFirstFailedK8sOperation`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
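As the Javadoc quoted above notes, `isDone()` returns `true` whether the future completed normally, exceptionally, or via cancellation, so asserting only `isDone()` can mask a failed retry. A minimal standalone sketch of the difference, using a plain `CompletableFuture` (no Flink classes involved):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

public class IsDoneDemo {
    public static void main(String[] args) {
        CompletableFuture<Boolean> failed = new CompletableFuture<>();
        failed.completeExceptionally(new RuntimeException("simulated k8s failure"));

        // isDone() is true even though the future failed, so an assertion
        // like assertThat(result.isDone(), is(true)) would still pass here.
        System.out.println(failed.isDone()); // true

        // join()/get() re-throws the failure, so asserting on the value
        // (e.g. assertThat(result.get(), is(true))) surfaces the exception.
        try {
            failed.join();
        } catch (CompletionException e) {
            System.out.println(e.getCause().getMessage()); // simulated k8s failure
        }
    }
}
```

This is why asserting on `result.get()` rather than `result.isDone()`, as suggested in the review, is the safer check: it rethrows the hidden failure instead of silently passing.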
[jira] [Updated] (FLINK-28906) Add AlgoOperator for AgglomerativeClustering
[ https://issues.apache.org/jira/browse/FLINK-28906?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-28906: --- Labels: pull-request-available (was: ) > Add AlgoOperator for AgglomerativeClustering > > > Key: FLINK-28906 > URL: https://issues.apache.org/jira/browse/FLINK-28906 > Project: Flink > Issue Type: New Feature > Components: Library / Machine Learning >Reporter: Zhipeng Zhang >Priority: Major > Labels: pull-request-available > Fix For: ml-2.2.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink-ml] zhipeng93 opened a new pull request, #148: [FLINK-28906] Add AlgoOperator for AgglomerativeClustering
zhipeng93 opened a new pull request, #148: URL: https://github.com/apache/flink-ml/pull/148 ## What is the purpose of the change - Add AlgoOperator for AgglomerativeClustering[1] in Flink ML. ## Brief change log - Added AlgoOperator for AgglomerativeClustering. - Added Java/Python tests/examples for AgglomerativeClustering. - Compared with sklearn [1], we made the following changes: - The distance between two data points in sklearn could be `precomputed`, but we do not support this option in this PR. - sklearn uses `memory` to cache the output of the tree computation, but we do not support this in this PR. - sklearn uses `connectivity` to define the neighboring samples for each sample, but we do not support this in this PR. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with @Public(Evolving): (no) - Does this pull request introduce a new feature? (yes) - If yes, how is the feature documented? (Java doc) [1] https://scikit-learn.org/stable/modules/generated/sklearn.cluster.AgglomerativeClustering.html
[GitHub] [flink] wangyang0918 commented on a diff in pull request #20673: [BP-1.15][FLINK-28265][k8s] Make KubernetesStateHandleStore#addEntry idempotent
wangyang0918 commented on code in PR #20673: URL: https://github.com/apache/flink/pull/20673#discussion_r955886917 ## flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStoreTest.java: ## @@ -1119,4 +1166,34 @@ private TestingLongStateHandleHelper.LongStateHandle addDeletingEntry( configMap.getData().put(key, deleting); return state; } + +private static CompletableFuture retryWithFirstFailedK8sOperation( +Function> function, +KubernetesConfigMap leaderConfigMap) { +final AtomicInteger callbackInvocationCount = new AtomicInteger(0); +final CompletableFuture result = +FutureUtils.retry( +() -> +CompletableFuture.supplyAsync( +() -> { + callbackInvocationCount.incrementAndGet(); +function.apply(leaderConfigMap); +if (callbackInvocationCount.get() == 1) { +throw new KubernetesClientException( +"Expected exception to simulate unstable " ++ "kubernetes client operation"); +} +return true; +}, +Executors.newDirectExecutorService()), + KubernetesConfigOptions.KUBERNETES_TRANSACTIONAL_OPERATION_MAX_RETRIES +.defaultValue(), +t -> +ExceptionUtils.findThrowable(t, KubernetesClientException.class) +.isPresent(), +Executors.newDirectExecutorService()); +assertThat(callbackInvocationCount.get(), is(2)); +assertThat(result.isDone(), is(true)); Review Comment: Given that the exception of the `result` will be eventually thrown in the `addAndLock`, I do not think it is really necessary to check exception here. But it should be harmless. ``` try { assertThat(result.get(), is(true)); } catch (Exception ex) { fail("Exception should not be thrown."); } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
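For context, the retry-with-transient-failure pattern that `retryWithFirstFailedK8sOperation` exercises can be sketched with a plain `CompletableFuture` (Java 9+ for `failedFuture`; `FutureUtils` and the Kubernetes client are left out, and all names here are illustrative, not Flink's API):

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

public class RetryDemo {
    // Retry an async operation up to maxRetries extra times when it fails;
    // the last failure propagates to the caller instead of being swallowed.
    static <T> CompletableFuture<T> retry(Supplier<CompletableFuture<T>> op, int maxRetries) {
        return op.get().handle((value, error) -> {
            if (error == null) {
                return CompletableFuture.completedFuture(value);
            } else if (maxRetries > 0) {
                return retry(op, maxRetries - 1);
            } else {
                CompletableFuture<T> failed = new CompletableFuture<>();
                failed.completeExceptionally(error);
                return failed;
            }
        }).thenCompose(f -> f); // flatten CompletableFuture<CompletableFuture<T>>
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Fails on the first attempt, succeeds on the second -- mirroring the
        // "first K8s operation fails" scenario simulated in the test above.
        Boolean result = retry(() -> {
            calls[0]++;
            return calls[0] == 1
                    ? CompletableFuture.<Boolean>failedFuture(new RuntimeException("unstable op"))
                    : CompletableFuture.completedFuture(true);
        }, 2).join();
        System.out.println(result + " after " + calls[0] + " attempts"); // true after 2 attempts
    }
}
```

Note that `join()` at the end rethrows any final failure, which is the "throw the exception directly" behavior the reviewer suggests above.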
[GitHub] [flink] flinkbot commented on pull request #20688: [FLINK-28764][table-planner] Support more than 64 distinct aggregate function calls in one aggregate SQL query
flinkbot commented on PR #20688: URL: https://github.com/apache/flink/pull/20688#issuecomment-1228307836 ## CI report: * af5a62774c27a8b56fa6573053ace0701b2c677e UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
[jira] [Updated] (FLINK-28764) Support more than 64 distinct aggregate function calls in one aggregate SQL query
[ https://issues.apache.org/jira/browse/FLINK-28764?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-28764: --- Labels: pull-request-available (was: ) > Support more than 64 distinct aggregate function calls in one aggregate SQL > query > - > > Key: FLINK-28764 > URL: https://issues.apache.org/jira/browse/FLINK-28764 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.13.6, 1.14.5, 1.15.1 >Reporter: Wei Zhong >Assignee: Wei Zhong >Priority: Major > Labels: pull-request-available > Fix For: 1.16.0 > > > Currently Flink SQL does not support more than 64 distinct aggregate function > calls in one aggregate SQL query. We encountered this problem while migrating > batch jobs from spark to flink. The spark job has 79 distinct aggregate > function calls in one aggregate SQL query. > Reproduce code: > {code:java} > public class Test64Distinct { > public static void main(String[] args) { > TableEnvironment tableEnv = > TableEnvironment.create(EnvironmentSettings.inBatchMode()); > tableEnv.executeSql("create table datagen_source(id BIGINT, val > BIGINT) with " + > "('connector'='datagen', 'number-of-rows'='1000')"); > tableEnv.executeSql("select " + > "count(distinct val * 1), " + > "count(distinct val * 2), " + > "count(distinct val * 3), " + > "count(distinct val * 4), " + > "count(distinct val * 5), " + > "count(distinct val * 6), " + > "count(distinct val * 7), " + > "count(distinct val * 8), " + > "count(distinct val * 9), " + > "count(distinct val * 10), " + > "count(distinct val * 11), " + > "count(distinct val * 12), " + > "count(distinct val * 13), " + > "count(distinct val * 14), " + > "count(distinct val * 15), " + > "count(distinct val * 16), " + > "count(distinct val * 17), " + > "count(distinct val * 18), " + > "count(distinct val * 19), " + > "count(distinct val * 20), " + > "count(distinct val * 21), " + > "count(distinct val * 22), " + > "count(distinct val * 23), " + > 
"count(distinct val * 24), " + > "count(distinct val * 25), " + > "count(distinct val * 26), " + > "count(distinct val * 27), " + > "count(distinct val * 28), " + > "count(distinct val * 29), " + > "count(distinct val * 30), " + > "count(distinct val * 31), " + > "count(distinct val * 32), " + > "count(distinct val * 33), " + > "count(distinct val * 34), " + > "count(distinct val * 35), " + > "count(distinct val * 36), " + > "count(distinct val * 37), " + > "count(distinct val * 38), " + > "count(distinct val * 39), " + > "count(distinct val * 40), " + > "count(distinct val * 41), " + > "count(distinct val * 42), " + > "count(distinct val * 43), " + > "count(distinct val * 44), " + > "count(distinct val * 45), " + > "count(distinct val * 46), " + > "count(distinct val * 47), " + > "count(distinct val * 48), " + > "count(distinct val * 49), " + > "count(distinct val * 50), " + > "count(distinct val * 51), " + > "count(distinct val * 52), " + > "count(distinct val * 53), " + > "count(distinct val * 54), " + > "count(distinct val * 55), " + > "count(distinct val * 56), " + > "count(distinct val * 57), " + > "count(distinct val * 58), " + > "count(distinct val * 59), " + > "count(distinct val * 60), " + > "count(distinct val * 61), " + > "count(distinct val * 62), " + > "count(distinct val * 63), " + > "count(distinct val * 64), " + > "count(distinct val * 65) from datagen_source").print(); > } > } {code} > Exception: > {code:java} > Exception in thread "main" org.apache.flink.table.api.TableException: Sql > optimization: Cannot generate a valid execution plan for the given query: > LogicalSink(table=[*anonymous_collect$1*], fie
[GitHub] [flink] WeiZhong94 opened a new pull request, #20688: [FLINK-28764][table-planner] Support more than 64 distinct aggregate function calls in one aggregate SQL query
WeiZhong94 opened a new pull request, #20688: URL: https://github.com/apache/flink/pull/20688 ## What is the purpose of the change *This pull request rewrites the FlinkAggregateExpandDistinctAggregatesRule to support more than 64 distinct aggregate function calls in one aggregate SQL query.* ## Brief change log - *rewrite the FlinkAggregateExpandDistinctAggregatesRule to build the expand node by itself* - *introduce `ExpandUtil.genExpandIdByIndex` to use an auto-increment id instead of `grouping_id()` to support more than 64 distinct calls in one aggregate node.* ## Verifying this change Please make sure both new and modified tests in this PR follow the conventions defined in our code quality guide: https://flink.apache.org/contributing/code-style-and-quality-common.html#testing This change is already covered by existing tests, such as *testTooManyDistinctAggOnDifferentColumn*. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable)
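The 64-call ceiling is consistent with `grouping_id()` encoding one flag per bit of a 64-bit `long` (an assumption inferred from the change log above, which replaces it with an auto-increment id; not verified against the planner source). A tiny standalone illustration of why a `long` bitmask cannot distinguish more than 64 flags:

```java
public class LongBitmaskLimit {
    public static void main(String[] args) {
        // A long has exactly 64 bits, so at most 64 distinct flags fit.
        long flag0  = 1L << 0;
        long flag63 = 1L << 63;
        System.out.println(flag0 != flag63); // true: both flags are representable

        // Java masks a long shift distance to 6 bits, so 1L << 64 silently
        // wraps around to 1L << 0 -- a 65th flag would collide with the first.
        System.out.println((1L << 64) == flag0); // true
    }
}
```

An auto-increment integer id, as the PR introduces, has no such width limit, which is why it lifts the restriction.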
[jira] [Commented] (FLINK-28461) PyFlink Table should add get_resolved_schema method
[ https://issues.apache.org/jira/browse/FLINK-28461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17585280#comment-17585280 ] bo zhao commented on FLINK-28461: - May I file a PR for this? Looks like low-hanging fruit for a PyFlink newbie. Like me. ;) > PyFlink Table should add get_resolved_schema method > --- > > Key: FLINK-28461 > URL: https://issues.apache.org/jira/browse/FLINK-28461 > Project: Flink > Issue Type: Improvement > Components: API / Python >Affects Versions: 1.15.1 >Reporter: Xuannan Su >Priority: Major > > The Table#getSchema method is deprecated and replaced with the > Table#getResolvedSchema. > We should add the get_resolved_schema method to the PyFlink Table.
[jira] [Closed] (FLINK-27175) Fail to call Hive UDAF when the UDAF is with only one parameter with array type
[ https://issues.apache.org/jira/browse/FLINK-27175?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jark Wu closed FLINK-27175. --- Resolution: Fixed Fixed in master: c086a91d977a5cb51d1b7c962bcb51f7d2a867fc > Fail to call Hive UDAF when the UDAF is with only one parameter with array > type > --- > > Key: FLINK-27175 > URL: https://issues.apache.org/jira/browse/FLINK-27175 > Project: Flink > Issue Type: Sub-task >Reporter: luoyuxia >Assignee: luoyuxia >Priority: Critical > Labels: pull-request-available > Fix For: 1.16.0 > > > When trying to call Hive's collect_list function, it'll throw the following > exception: > > {code:java} > Caused by: java.lang.ClassCastException: java.lang.Integer cannot be cast to > [Ljava.lang.Object; > at > org.apache.flink.table.functions.hive.conversion.HiveInspectors.lambda$getConversion$7f882244$1(HiveInspectors.java:201) > at > org.apache.flink.table.functions.hive.HiveGenericUDAF.accumulate(HiveGenericUDAF.java:185) > at LocalNoGroupingAggregateWithoutKeys$39.processElement(Unknown Source) > {code} > The reason is when the parameter is a single array, Flink calls > udf.accumulate(AggregationBuffer, Array[Double]), at this point Java's > var-args will cast Array[Double] to Array[Object] and treat it as Object... args. > {code:java} > public void accumulate(GenericUDAFEvaluator.AggregationBuffer acc, Object... > inputs) {code} > Then it will consider the elements in the array as parameters. > > The exception will also happen for other similar Hive UDAFs.
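The var-args mechanics behind this bug are easy to reproduce outside Flink: a reference-typed array passed to an `Object...` parameter is used as the varargs array itself, so its elements become individual arguments, while an explicit `(Object)` cast (or a primitive array, which is not an `Object[]`) is wrapped as a single argument. A minimal standalone sketch:

```java
public class VarargsSpread {
    static int countArgs(Object... inputs) {
        return inputs.length;
    }

    public static void main(String[] args) {
        Double[] boxed = {1.0, 2.0, 3.0};

        // A Double[] IS-A Object[], so the compiler passes it as the
        // varargs array itself: each element becomes one argument.
        System.out.println(countArgs(boxed)); // 3

        // Casting to Object forces wrapping: one argument, the whole array.
        System.out.println(countArgs((Object) boxed)); // 1

        // A primitive array is not an Object[], so it is wrapped too.
        double[] primitive = {1.0, 2.0, 3.0};
        System.out.println(countArgs(primitive)); // 1
    }
}
```

This is exactly the spreading described in the issue: a single array argument to `accumulate(AggregationBuffer, Object...)` gets exploded into per-element parameters.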
[GitHub] [flink] wuchong merged pull request #19423: [FLINK-27175][hive] Fix fail to call Hive UDAF when the UDAF accepts one parameter with array type
wuchong merged PR #19423: URL: https://github.com/apache/flink/pull/19423
[jira] [Updated] (FLINK-29120) Unexpected join hint propagation into view
[ https://issues.apache.org/jira/browse/FLINK-29120?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] lincoln lee updated FLINK-29120: Description: As expected, Join Hint should only affect the current query block, and does not affect the Join strategy in subquery and view. But the current implementation behaves inconsistently: using source tables of flink-tpch-test, the following join hint takes effect unexpectedly {code:java} Flink SQL> create temporary view v1 as SELECT > p_name, > p_mfgr, > p_brand, > p_type, > s_name, > s_address > FROM > part, > supplier > WHERE p_partkey = s_suppkey; [INFO] Execute statement succeed. Flink SQL> explain SELECT /*+ SHUFFLE_MERGE(part) */ * from v1; == Abstract Syntax Tree == LogicalProject(p_name=[$1], p_mfgr=[$2], p_brand=[$3], p_type=[$4], s_name=[$10], s_address=[$11]) +- LogicalFilter(condition=[=($0, $9)]) +- LogicalJoin(condition=[true], joinType=[inner], joinHints=[[[SHUFFLE_MERGE inheritPath:[0, 0] options:[part) :- LogicalTableScan(table=[[default_catalog, default_database, part]]) +- LogicalTableScan(table=[[default_catalog, default_database, supplier]]) == Optimized Physical Plan == Calc(select=[p_name, p_mfgr, p_brand, p_type, s_name, s_address]) +- SortMergeJoin(joinType=[InnerJoin], where=[=(p_partkey, s_suppkey)], select=[p_partkey, p_name, p_mfgr, p_brand, p_type, s_suppkey, s_name, s_address]) :- Exchange(distribution=[hash[p_partkey]]) : +- TableSourceScan(table=[[default_catalog, default_database, part, project=[p_partkey, p_name, p_mfgr, p_brand, p_type], metadata=[]]], fields=[p_partkey, p_name, p_mfgr, p_brand, p_type]) +- Exchange(distribution=[hash[s_suppkey]]) +- TableSourceScan(table=[[default_catalog, default_database, supplier, project=[s_suppkey, s_name, s_address], metadata=[]]], fields=[s_suppkey, s_name, s_address]) == Optimized Execution Plan == Calc(select=[p_name, p_mfgr, p_brand, p_type, s_name, s_address]) +- SortMergeJoin(joinType=[InnerJoin], where=[(p_partkey = 
s_suppkey)], select=[p_partkey, p_name, p_mfgr, p_brand, p_type, s_suppkey, s_name, s_address]) :- Exchange(distribution=[hash[p_partkey]]) : +- TableSourceScan(table=[[default_catalog, default_database, part, project=[p_partkey, p_name, p_mfgr, p_brand, p_type], metadata=[]]], fields=[p_partkey, p_name, p_mfgr, p_brand, p_type]) +- Exchange(distribution=[hash[s_suppkey]]) +- TableSourceScan(table=[[default_catalog, default_database, supplier, project=[s_suppkey, s_name, s_address], metadata=[]]], fields=[s_suppkey, s_name, s_address]) {code} without hint {code} Flink SQL> explain SELECT * from v1; == Abstract Syntax Tree == LogicalProject(p_name=[$1], p_mfgr=[$2], p_brand=[$3], p_type=[$4], s_name=[$10], s_address=[$11]) +- LogicalFilter(condition=[=($0, $9)]) +- LogicalJoin(condition=[true], joinType=[inner]) :- LogicalTableScan(table=[[default_catalog, default_database, part]]) +- LogicalTableScan(table=[[default_catalog, default_database, supplier]]) == Optimized Physical Plan == Calc(select=[p_name, p_mfgr, p_brand, p_type, s_name, s_address]) +- HashJoin(joinType=[InnerJoin], where=[=(p_partkey, s_suppkey)], select=[p_partkey, p_name, p_mfgr, p_brand, p_type, s_suppkey, s_name, s_address], isBroadcast=[true], build=[right]) :- TableSourceScan(table=[[default_catalog, default_database, part, project=[p_partkey, p_name, p_mfgr, p_brand, p_type], metadata=[]]], fields=[p_partkey, p_name, p_mfgr, p_brand, p_type]) +- Exchange(distribution=[broadcast]) +- TableSourceScan(table=[[default_catalog, default_database, supplier, project=[s_suppkey, s_name, s_address], metadata=[]]], fields=[s_suppkey, s_name, s_address]) == Optimized Execution Plan == Calc(select=[p_name, p_mfgr, p_brand, p_type, s_name, s_address]) +- MultipleInput(readOrder=[1,0], members=[\nHashJoin(joinType=[InnerJoin], where=[(p_partkey = s_suppkey)], select=[p_partkey, p_name, p_mfgr, p_brand, p_type, s_suppkey, s_name, s_address], isBroadcast=[true], build=[right])\n:- [#1] 
TableSourceScan(table=[[default_catalog, default_database, part, project=[p_partkey, p_name, p_mfgr, p_brand, p_type], metadata=[]]], fields=[p_partkey, p_name, p_mfgr, p_brand, p_type])\n+- [#2] Exchange(distribution=[broadcast])\n]) :- TableSourceScan(table=[[default_catalog, default_database, part, project=[p_partkey, p_name, p_mfgr, p_brand, p_type], metadata=[]]], fields=[p_partkey, p_name, p_mfgr, p_brand, p_type]) +- Exchange(distribution=[broadcast]) +- TableSourceScan(table=[[default_catalog, default_database, supplier, project=[s_suppkey, s_name, s_address], metadata=[]]], fields=[s_suppkey, s_name, s_address]) {code} was: As expected, Join Hint should only affects the current query block, and does not affect the Join strategy in subquery and view. But current implementation behaviors i
[jira] [Created] (FLINK-29120) Unexpected join hint propagation into view
lincoln lee created FLINK-29120: --- Summary: Unexpected join hint propagation into view Key: FLINK-29120 URL: https://issues.apache.org/jira/browse/FLINK-29120 Project: Flink Issue Type: Bug Reporter: lincoln lee As expected, Join Hint should only affect the current query block, and does not affect the Join strategy in subquery and view. But the current implementation behaves inconsistently: using source tables of flink-tpch-test, the following join hint takes effect unexpectedly {code} Flink SQL> create temporary view v1 as SELECT > p_name, > p_mfgr, > p_brand, > p_type, > s_name, > s_address > FROM > part, > supplier > WHERE p_partkey = s_suppkey; [INFO] Execute statement succeed. Flink SQL> explain SELECT /*+ SHUFFLE_MERGE(part) */ * from v1; == Abstract Syntax Tree == LogicalProject(p_name=[$1], p_mfgr=[$2], p_brand=[$3], p_type=[$4], s_name=[$10], s_address=[$11]) +- LogicalFilter(condition=[=($0, $9)]) +- LogicalJoin(condition=[true], joinType=[inner], joinHints=[[[SHUFFLE_MERGE inheritPath:[0, 0] options:[part) :- LogicalTableScan(table=[[default_catalog, default_database, part]]) +- LogicalTableScan(table=[[default_catalog, default_database, supplier]]) == Optimized Physical Plan == Calc(select=[p_name, p_mfgr, p_brand, p_type, s_name, s_address]) +- SortMergeJoin(joinType=[InnerJoin], where=[=(p_partkey, s_suppkey)], select=[p_partkey, p_name, p_mfgr, p_brand, p_type, s_suppkey, s_name, s_address]) :- Exchange(distribution=[hash[p_partkey]]) : +- TableSourceScan(table=[[default_catalog, default_database, part, project=[p_partkey, p_name, p_mfgr, p_brand, p_type], metadata=[]]], fields=[p_partkey, p_name, p_mfgr, p_brand, p_type]) +- Exchange(distribution=[hash[s_suppkey]]) +- TableSourceScan(table=[[default_catalog, default_database, supplier, project=[s_suppkey, s_name, s_address], metadata=[]]], fields=[s_suppkey, s_name, s_address]) == Optimized Execution Plan == Calc(select=[p_name, p_mfgr, p_brand, p_type, s_name, s_address]) +- 
SortMergeJoin(joinType=[InnerJoin], where=[(p_partkey = s_suppkey)], select=[p_partkey, p_name, p_mfgr, p_brand, p_type, s_suppkey, s_name, s_address]) :- Exchange(distribution=[hash[p_partkey]]) : +- TableSourceScan(table=[[default_catalog, default_database, part, project=[p_partkey, p_name, p_mfgr, p_brand, p_type], metadata=[]]], fields=[p_partkey, p_name, p_mfgr, p_brand, p_type]) +- Exchange(distribution=[hash[s_suppkey]]) +- TableSourceScan(table=[[default_catalog, default_database, supplier, project=[s_suppkey, s_name, s_address], metadata=[]]], fields=[s_suppkey, s_name, s_address]) {code}
[jira] [Assigned] (FLINK-28764) Support more than 64 distinct aggregate function calls in one aggregate SQL query
[ https://issues.apache.org/jira/browse/FLINK-28764?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jark Wu reassigned FLINK-28764: --- Assignee: Wei Zhong > Support more than 64 distinct aggregate function calls in one aggregate SQL > query > - > > Key: FLINK-28764 > URL: https://issues.apache.org/jira/browse/FLINK-28764 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.13.6, 1.14.5, 1.15.1 >Reporter: Wei Zhong >Assignee: Wei Zhong >Priority: Major > Fix For: 1.16.0 > > > Currently Flink SQL does not support more than 64 distinct aggregate function > calls in one aggregate SQL query. We encountered this problem while migrating > batch jobs from spark to flink. The spark job has 79 distinct aggregate > function calls in one aggregate SQL query. > Reproduce code: > {code:java} > public class Test64Distinct { > public static void main(String[] args) { > TableEnvironment tableEnv = > TableEnvironment.create(EnvironmentSettings.inBatchMode()); > tableEnv.executeSql("create table datagen_source(id BIGINT, val > BIGINT) with " + > "('connector'='datagen', 'number-of-rows'='1000')"); > tableEnv.executeSql("select " + > "count(distinct val * 1), " + > "count(distinct val * 2), " + > "count(distinct val * 3), " + > "count(distinct val * 4), " + > "count(distinct val * 5), " + > "count(distinct val * 6), " + > "count(distinct val * 7), " + > "count(distinct val * 8), " + > "count(distinct val * 9), " + > "count(distinct val * 10), " + > "count(distinct val * 11), " + > "count(distinct val * 12), " + > "count(distinct val * 13), " + > "count(distinct val * 14), " + > "count(distinct val * 15), " + > "count(distinct val * 16), " + > "count(distinct val * 17), " + > "count(distinct val * 18), " + > "count(distinct val * 19), " + > "count(distinct val * 20), " + > "count(distinct val * 21), " + > "count(distinct val * 22), " + > "count(distinct val * 23), " + > "count(distinct val * 24), " + > "count(distinct val * 25), 
" + > "count(distinct val * 26), " + > "count(distinct val * 27), " + > "count(distinct val * 28), " + > "count(distinct val * 29), " + > "count(distinct val * 30), " + > "count(distinct val * 31), " + > "count(distinct val * 32), " + > "count(distinct val * 33), " + > "count(distinct val * 34), " + > "count(distinct val * 35), " + > "count(distinct val * 36), " + > "count(distinct val * 37), " + > "count(distinct val * 38), " + > "count(distinct val * 39), " + > "count(distinct val * 40), " + > "count(distinct val * 41), " + > "count(distinct val * 42), " + > "count(distinct val * 43), " + > "count(distinct val * 44), " + > "count(distinct val * 45), " + > "count(distinct val * 46), " + > "count(distinct val * 47), " + > "count(distinct val * 48), " + > "count(distinct val * 49), " + > "count(distinct val * 50), " + > "count(distinct val * 51), " + > "count(distinct val * 52), " + > "count(distinct val * 53), " + > "count(distinct val * 54), " + > "count(distinct val * 55), " + > "count(distinct val * 56), " + > "count(distinct val * 57), " + > "count(distinct val * 58), " + > "count(distinct val * 59), " + > "count(distinct val * 60), " + > "count(distinct val * 61), " + > "count(distinct val * 62), " + > "count(distinct val * 63), " + > "count(distinct val * 64), " + > "count(distinct val * 65) from datagen_source").print(); > } > } {code} > Exception: > {code:java} > Exception in thread "main" org.apache.flink.table.api.TableException: Sql > optimization: Cannot generate a valid execution plan for the given query: > LogicalSink(table=[*anonymous_collect$1*], fields=[EXPR$0, EXPR$1, EXPR$2, > EXPR$3, EXPR$4, EXPR$5, EXPR$6, EXPR$7, E
[jira] [Updated] (FLINK-29118) Remove default_catalog in the HiveServer2 Endpoint
[ https://issues.apache.org/jira/browse/FLINK-29118?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shengkai Fang updated FLINK-29118: -- Description: Hive only has one Catalog. We don't require the default_catalog. Hive JDBC Driver also doesn't support multiple catalogs. !image-2022-08-26-17-40-49-989.png! was:Hive only has one Catalog. We don't require the default_catalog > Remove default_catalog in the HiveServer2 Endpoint > -- > > Key: FLINK-29118 > URL: https://issues.apache.org/jira/browse/FLINK-29118 > Project: Flink > Issue Type: Bug > Components: Connectors / Hive, Table SQL / Gateway >Affects Versions: 1.16.0 >Reporter: Shengkai Fang >Priority: Major > Fix For: 1.16.0 > > Attachments: image-2022-08-26-17-40-49-989.png > > > Hive only has one Catalog. We don't require the default_catalog. Hive JDBC > Driver also doesn't support multiple catalogs. > > > !image-2022-08-26-17-40-49-989.png!
[jira] [Updated] (FLINK-29118) Remove default_catalog in the HiveServer2 Endpoint
[ https://issues.apache.org/jira/browse/FLINK-29118?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shengkai Fang updated FLINK-29118: -- Attachment: image-2022-08-26-17-40-49-989.png > Remove default_catalog in the HiveServer2 Endpoint > -- > > Key: FLINK-29118 > URL: https://issues.apache.org/jira/browse/FLINK-29118 > Project: Flink > Issue Type: Bug > Components: Connectors / Hive, Table SQL / Gateway >Affects Versions: 1.16.0 >Reporter: Shengkai Fang >Priority: Major > Fix For: 1.16.0 > > Attachments: image-2022-08-26-17-40-49-989.png > > > Hive only has one Catalog. We don't require the default_catalog
[jira] [Created] (FLINK-29119) Should clarify how join hints work with CTE
lincoln lee created FLINK-29119: --- Summary: Should clarify how join hints work with CTE Key: FLINK-29119 URL: https://issues.apache.org/jira/browse/FLINK-29119 Project: Flink Issue Type: Improvement Reporter: lincoln lee use source tables of flink-tpch-test join hint on a single expression name of CTE works fine: {code} Flink SQL> explain with q1 as (SELECT > p_name, > p_mfgr, > p_brand, > p_type, > s_name, > s_address > FROM > part, > supplier > WHERE p_partkey = s_suppkey) > > SELECT /*+ SHUFFLE_MERGE(part,supplier) */ * from q1; == Abstract Syntax Tree == LogicalProject(p_name=[$1], p_mfgr=[$2], p_brand=[$3], p_type=[$4], s_name=[$10], s_address=[$11]) +- LogicalFilter(condition=[=($0, $9)]) +- LogicalJoin(condition=[true], joinType=[inner], joinHints=[[[SHUFFLE_MERGE inheritPath:[0, 0] options:[part, supplier) :- LogicalTableScan(table=[[default_catalog, default_database, part]], hints=[[[ALIAS inheritPath:[] options:[part) +- LogicalTableScan(table=[[default_catalog, default_database, supplier]], hints=[[[ALIAS inheritPath:[] options:[supplier) == Optimized Physical Plan == Calc(select=[p_name, p_mfgr, p_brand, p_type, s_name, s_address]) +- SortMergeJoin(joinType=[InnerJoin], where=[=(p_partkey, s_suppkey)], select=[p_partkey, p_name, p_mfgr, p_brand, p_type, s_suppkey, s_name, s_address]) :- Exchange(distribution=[hash[p_partkey]]) : +- TableSourceScan(table=[[default_catalog, default_database, part, project=[p_partkey, p_name, p_mfgr, p_brand, p_type], metadata=[]]], fields=[p_partkey, p_name, p_mfgr, p_brand, p_type]) +- Exchange(distribution=[hash[s_suppkey]]) +- TableSourceScan(table=[[default_catalog, default_database, supplier, project=[s_suppkey, s_name, s_address], metadata=[]]], fields=[s_suppkey, s_name, s_address]) == Optimized Execution Plan == Calc(select=[p_name, p_mfgr, p_brand, p_type, s_name, s_address]) +- SortMergeJoin(joinType=[InnerJoin], where=[(p_partkey = s_suppkey)], select=[p_partkey, p_name, p_mfgr, p_brand, p_type, s_suppkey, 
s_name, s_address]) :- Exchange(distribution=[hash[p_partkey]]) : +- TableSourceScan(table=[[default_catalog, default_database, part, project=[p_partkey, p_name, p_mfgr, p_brand, p_type], metadata=[]]], fields=[p_partkey, p_name, p_mfgr, p_brand, p_type]) +- Exchange(distribution=[hash[s_suppkey]]) +- TableSourceScan(table=[[default_catalog, default_database, supplier, project=[s_suppkey, s_name, s_address], metadata=[]]], fields=[s_suppkey, s_name, s_address]) {code} but it raises an error when an alias of the CTE name co-exists: {code} Flink SQL> explain with q1 as (SELECT > p_name, > p_mfgr, > p_brand, > p_type, > s_name, > s_address > FROM > part, > supplier > WHERE p_partkey = s_suppkey) > > SELECT /*+ SHUFFLE_MERGE(part,supplier) */ * from q1, q1 q2 where q1.p_name > = q2.p_name; [ERROR] Could not execute SQL statement. Reason: org.apache.flink.table.api.ValidationException: The options of following hints cannot match the name of input tables or views: `SHUFFLE_MERGE(part, supplier)` {code} The expected behavior of join hints with CTEs should be clarified in the documentation. -- This message was sent by Atlassian Jira (v8.20.10#820010)
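The failure mode reported above can be pictured with a simplified sketch. This is not Flink's actual planner code; `validate_hint` is a hypothetical helper that only mimics the check behind the `ValidationException`: hint options must match the names of visible input tables or views, and once the CTE is referenced through an alias, the underlying base-table names are no longer visible at the top level.

```python
# Simplified illustration of the reported error; NOT Flink's planner code.
# validate_hint is a hypothetical helper mimicking the hint-option check.

def validate_hint(hint_options, visible_input_names):
    """Raise if any hint option does not name a visible input table/view."""
    unmatched = [o for o in hint_options if o not in visible_input_names]
    if unmatched:
        raise ValueError(
            "The options of following hints cannot match the name of "
            "input tables or views: " + ", ".join(unmatched))

# Single reference to q1: the hint is resolved against the CTE body,
# where the base tables are visible, so validation passes.
validate_hint(["part", "supplier"], {"part", "supplier"})

# Self-join through an alias: at the top level only q1 and q2 are visible,
# so the options part/supplier cannot be matched and an error is raised.
try:
    validate_hint(["part", "supplier"], {"q1", "q2"})
except ValueError as err:
    print(err)
```

This only models the observable behavior (hint names resolved against visible inputs), not the inheritPath propagation the optimizer actually performs.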
[jira] [Created] (FLINK-29118) Remove default_catalog in the HiveServer2 Endpoint
Shengkai Fang created FLINK-29118: - Summary: Remove default_catalog in the HiveServer2 Endpoint Key: FLINK-29118 URL: https://issues.apache.org/jira/browse/FLINK-29118 Project: Flink Issue Type: Bug Components: Connectors / Hive, Table SQL / Gateway Affects Versions: 1.16.0 Reporter: Shengkai Fang Fix For: 1.16.0 Hive only has one catalog, so the default_catalog is not required.
[GitHub] [flink] WencongLiu commented on pull request #20678: [FLINK-29097][sql-gateway]Moving json se/deserializers from sql-gateway-api to sql-gateway
WencongLiu commented on PR #20678: URL: https://github.com/apache/flink/pull/20678#issuecomment-1228272556 @flinkbot run azure -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] HuangXingBo commented on pull request #20685: [FLINK-28429][python] Optimize PyFlink tests
HuangXingBo commented on PR #20685: URL: https://github.com/apache/flink/pull/20685#issuecomment-1228252796 @flinkbot run azure
[jira] [Comment Edited] (FLINK-29088) Project push down cause the source reuse can not work
[ https://issues.apache.org/jira/browse/FLINK-29088?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17585251#comment-17585251 ] Aitozi edited comment on FLINK-29088 at 8/26/22 9:04 AM: - [~twalthr] Thanks for your inputs. > I guess the implementation can be a nicely separable logical rule I'm not sure whether we can detect a same source node in the Logical rule, My current solution is working around the {{SubplanReuser}} to find the similar source except the project list. And then recreate a new source with the union fields of all the projects. Do you think it's a reasonable solution? was (Author: aitozi): [~twalthr] Thanks for your inputs. > I guess the implementation can be a nicely separable logical rule I'm not sure whether we can detect a same source node in the Logical rule, My current solution is working around the {{SubplanReuser}} to find the similar source except the project list. And the recreate a new source with the union fields of all the projects. Do you think it's a reasonable solution? > Project push down cause the source reuse can not work > - > > Key: FLINK-29088 > URL: https://issues.apache.org/jira/browse/FLINK-29088 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Planner >Reporter: Aitozi >Assignee: Aitozi >Priority: Major > > It can be reproduce by > {code:java} > util.addTable( > s""" > |create table newX( > | a int, > | b bigint, > | c varchar > |) with ( > | 'connector' = 'values' > | ,'enable-projection-push-down' = 'true' > |) >""".stripMargin) > val sqlQuery = > """ > | SELECT b from newX WHERE a > 10 > | UNION ALL > | SELECT b from newX WHERE b > 10 > """.stripMargin > util.verifyExecPlan(sqlQuery) > {code} > if 'enable-projection-push-down' set to true, the source will not be reused. > If set to false, the source will be reused. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Closed] (FLINK-28646) Handle scaling operation separately in reconciler/service
[ https://issues.apache.org/jira/browse/FLINK-28646?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gyula Fora closed FLINK-28646. -- Resolution: Fixed merged to main 0a346ec7c6e8b770c1c10aa01686ee68c9dd4f27 > Handle scaling operation separately in reconciler/service > -- > > Key: FLINK-28646 > URL: https://issues.apache.org/jira/browse/FLINK-28646 > Project: Flink > Issue Type: New Feature > Components: Kubernetes Operator >Reporter: Gyula Fora >Assignee: Matyas Orhidi >Priority: Major > Labels: pull-request-available > Fix For: kubernetes-operator-1.2.0 > > > The standalone integration for session clusters and application clusters > (with reactive mode) allows for more efficient scaling operations when only > the parallelism changes. > We should distinguish this operation in the reconciler/service and implement > it for standalone mode.
[jira] [Commented] (FLINK-29088) Project push down cause the source reuse can not work
[ https://issues.apache.org/jira/browse/FLINK-29088?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17585251#comment-17585251 ] Aitozi commented on FLINK-29088: [~twalthr] Thanks for your inputs. > I guess the implementation can be a nicely separable logical rule I'm not sure whether we can detect the same source node in the logical rule. My current solution works around the {{SubplanReuser}} to find sources that are identical except for their project lists, and then recreates a new source with the union of the fields of all the projects. Do you think it's a reasonable solution? > Project push down cause the source reuse can not work > - > > Key: FLINK-29088 > URL: https://issues.apache.org/jira/browse/FLINK-29088 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Planner >Reporter: Aitozi >Assignee: Aitozi >Priority: Major > > It can be reproduced by > {code:java} > util.addTable( > s""" > |create table newX( > | a int, > | b bigint, > | c varchar > |) with ( > | 'connector' = 'values' > | ,'enable-projection-push-down' = 'true' > |) >""".stripMargin) > val sqlQuery = > """ > | SELECT b from newX WHERE a > 10 > | UNION ALL > | SELECT b from newX WHERE b > 10 > """.stripMargin > util.verifyExecPlan(sqlQuery) > {code} > If 'enable-projection-push-down' is set to true, the source will not be reused. > If set to false, the source will be reused.
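The "union of projected fields" idea discussed in the comment above can be sketched as follows. This is a hypothetical illustration, not Flink's {{SubplanReuser}}; `merge_projections` is an invented name, and it shows only the field-merging step under the assumption that scans differ solely in their projections.

```python
# Hypothetical sketch of the proposed workaround (not Flink code): when
# several scans of the same table differ only in their projected fields,
# replace them with one shared scan projecting the union of the fields.

def merge_projections(scans):
    """scans: iterable of (table_name, projected_fields) pairs.
    Returns {table_name: union_of_fields}, preserving first-seen order."""
    merged = {}
    for table, fields in scans:
        union = merged.setdefault(table, [])
        for field in fields:
            if field not in union:
                union.append(field)
    return merged

# The two scans of newX from the example query (projecting [a, b] for the
# filter on a, and [b] alone) collapse into one reusable scan of [a, b].
scans = [("newX", ["a", "b"]), ("newX", ["b"])]
print(merge_projections(scans))  # {'newX': ['a', 'b']}
```

The real implementation would additionally have to rewrite downstream field indices to point into the widened row, which this sketch omits.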
[GitHub] [flink-kubernetes-operator] gyfora merged pull request #351: [FLINK-28646] Handle scaling operation separately in reconciler/service
gyfora merged PR #351: URL: https://github.com/apache/flink-kubernetes-operator/pull/351
[GitHub] [flink] flinkbot commented on pull request #20687: [BP-1.15][FLINK-28976][state] Don't add extra delay to the 1st materialization
flinkbot commented on PR #20687: URL: https://github.com/apache/flink/pull/20687#issuecomment-1228242435 ## CI report: * 2c8c9692044b3d2ed718c2a31e68ed9bd79ec8f9 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
[jira] [Updated] (FLINK-29117) Tried to associate with unreachable remote resourcemanager address
[ https://issues.apache.org/jira/browse/FLINK-29117?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] geonyeong kim updated FLINK-29117: -- Description: Hello. I am planning to deploy and use FlinkDeployment through the flink kubernetes operator. CRD, operator, webhook, etc. are all set up, and we actually deployed a FlinkDeployment to confirm normal operation. *However, strangely, connecting to the resource manager fails if you make more than one task manager pod replica.* I thought it might be a problem with akka, timeouts, etc., so I increased the values as below, but the connection continues to fail. - akka.retry-gate-closed-for: 1 - akka.server-socket-worker-pool.pool-size-min: 6 - akka.server-socket-worker-pool.pool-size-max: 10 - akka.client-socket-worker-pool.pool-size-max: 10 - akka.client-socket-worker-pool.pool-size-min: 6 - blob.client.connect.timeout: 3 The log of the taskmanager is as follows. {code:java} Association with remote system [akka.tcp://flink@10.238.80.92:6123] has failed, address is now gated for [1] ms. Reason: [Disassociated] Could not resolve ResourceManager address akka.tcp://flink@10.238.80.92:6123/user/rpc/resourcemanager_1, retrying in 1 ms: Could not connect to rpc endpoint under address akka.tcp://flink@10.238.80.92:6123/user/rpc/resourcemanager_1. Tried to associate with unreachable remote address [akka.tcp://flink@10.238.80.92:6123]. Address is now gated for 1 ms, all messages to this address will be delivered to dead letters. Reason: [The remote system has quarantined this system. No further associations to the remote system are possible until this system is restarted.] {code} *If you exec into the task manager pod and run a tcp check, the connection is open.* *Below are the flink versions I used.* * flink image: 1.15.1 - flink kubernetes operator: 1.1.0 *I would appreciate it if you could check the problem quickly.* *If it's a bug, please tell me how to work around it in the current situation.* was: Hello. 
I am planning to distribute and use FlinkDeployment through the flink kubernetes operator. CRD, operator, webbook, etc. are all set up, and we actually distributed FlinkDeployment to confirm normal operation. *However, strangely, connecting to resource manager fails if you make more than one task manager pod replica.* I thought it might be a problem with akka, timeout, etc. so I increased the values as below The connection continues to fail. - akka.retry-gate-closed-for: 1 - akka.server-socket-worker-pool.pool-size-min: 6 - akka.server-socket-worker-pool.pool-size-max: 10 - akka.client-socket-worker-pool.pool-size-max: 10 - akka.client-socket-worker-pool.pool-size-min: 6 - blob.client.connect. The log of the taskmanager is as follows. {code:java} Association with remote system [akka.tcp://flink@10.238.80.92:6123] has failed, address is now gated for [1] ms. Reason: [Disassociated] Could not resolve ResourceManager address akka.tcp://flink@10.238.80.92:6123/user/rpc/resourcemanager_1, retrying in 1 ms: Could not connect to rpc endpoint under address akka.tcp://flink@10.238.80.92:6123/user/rpc/resourcemanager_1. Tried to associate with unreachable remote address [akka.tcp://flink@10.238.80.92:6123]. Address is now gated for 1 ms, all messages to this address will be delivered to dead letters. Reason: [The remote system has quarantined this system. No further associations to the remote system are possible until this system is restarted.] 
{code} *If you go into the task manager pod and tcp check, the connection is open.* *Below are the flink versions I used.* * flink image: 1.15.1 - flink kubernetes operator: 1.1.0 *I would appreciate it if you could check the problem quickly.* *If it's a bug, please tell me how to detour in the current situation.* > Tried to associate with unreachable remote resourcemanager address > -- > > Key: FLINK-29117 > URL: https://issues.apache.org/jira/browse/FLINK-29117 > Project: Flink > Issue Type: Bug > Components: Deployment / Kubernetes, flink-contrib, flink-docker, > Kubernetes Operator >Affects Versions: 1.15.1, kubernetes-operator-1.1.0 >Reporter: geonyeong kim >Priority: Critical > Attachments: taskmanager_log.png > > > Hello. > I am planning to distribute and use FlinkDeployment through the flink > kubernetes operator. > CRD, operator, webbook, etc. are all set up, and we actually distributed > FlinkDeployment to confirm normal operation. > *However, strangely, connecting to resource manager fails if you make more > than one task manager pod replica.* > I thought it might be a problem with akka, timeout, etc. so I increased the > values as below > The connection continues to fail. > - akka.retry-gate-closed-for:
[GitHub] [flink] rkhachatryan opened a new pull request, #20687: [BP-1.15][FLINK-28976][state] Don't add extra delay to the 1st materialization
rkhachatryan opened a new pull request, #20687: URL: https://github.com/apache/flink/pull/20687 Backport of #20585 to 1.15.
[jira] [Closed] (FLINK-29116) Tried to associate with unreachable remote address
[ https://issues.apache.org/jira/browse/FLINK-29116?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] geonyeong kim closed FLINK-29116. - Resolution: Duplicate > Tried to associate with unreachable remote address > -- > > Key: FLINK-29116 > URL: https://issues.apache.org/jira/browse/FLINK-29116 > Project: Flink > Issue Type: Bug >Affects Versions: 1.15.1, kubernetes-operator-1.1.0 >Reporter: geonyeong kim >Priority: Major > Labels: features > Attachments: Screen Shot 2022-08-26 at 5.04.37 PM.png > > > Hello. > I am planning to distribute and use FlinkDeployment through the flink > kubernetes operator. > CRD, operator, webbook, etc. are all set up, and we actually distributed > FlinkDeployment to confirm normal operation. > *However, strangely, connecting to resource manager fails if you make more > than one task manager pod replica.* > I thought it might be a problem with akka, timeout, etc. so I increased the > values as below > The connection continues to fail. > - akka.retry-gate-closed-for: 1 > - akka.server-socket-worker-pool.pool-size-min: 6 > - akka.server-socket-worker-pool.pool-size-max: 10 > - akka.client-socket-worker-pool.pool-size-max: 10 > - akka.client-socket-worker-pool.pool-size-min: 6 > - blob.client.connect. > > The log of the taskmanager is as follows. > {code:java} > Association with remote system [akka.tcp://flink@10.238.80.92:6123] has > failed, address is now gated for [1] ms. Reason: [Disassociated] > Could not resolve ResourceManager address > akka.tcp://flink@10.238.80.92:6123/user/rpc/resourcemanager_1, retrying in > 1 ms: Could not connect to rpc endpoint under address > akka.tcp://flink@10.238.80.92:6123/user/rpc/resourcemanager_1. > Tried to associate with unreachable remote address > [akka.tcp://flink@10.238.80.92:6123]. Address is now gated for 1 ms, all > messages to this address will be delivered to dead letters. Reason: [The > remote system has quarantined this system. 
No further associations to the > remote system are possible until this system is restarted.] > {code} > > *If you go into the task manager pod and tcp check, the connection is open.* > *Below are the flink versions I used.* > *- flink image: 1.15.1* > *- flink kubernetes operator: 1.1.0* > *I would appreciate it if you could check the problem quickly.* > *If it's a bug, please tell me how to detour in the current situation.* -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-29116) Tried to associate with unreachable remote address
[ https://issues.apache.org/jira/browse/FLINK-29116?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] geonyeong kim updated FLINK-29116: -- Labels: bug features (was: features) > Tried to associate with unreachable remote address > -- > > Key: FLINK-29116 > URL: https://issues.apache.org/jira/browse/FLINK-29116 > Project: Flink > Issue Type: Bug >Affects Versions: 1.15.1, kubernetes-operator-1.1.0 >Reporter: geonyeong kim >Priority: Major > Labels: bug, features > Attachments: Screen Shot 2022-08-26 at 5.04.37 PM.png > > > Hello. > I am planning to distribute and use FlinkDeployment through the flink > kubernetes operator. > CRD, operator, webbook, etc. are all set up, and we actually distributed > FlinkDeployment to confirm normal operation. > *However, strangely, connecting to resource manager fails if you make more > than one task manager pod replica.* > I thought it might be a problem with akka, timeout, etc. so I increased the > values as below > The connection continues to fail. > - akka.retry-gate-closed-for: 1 > - akka.server-socket-worker-pool.pool-size-min: 6 > - akka.server-socket-worker-pool.pool-size-max: 10 > - akka.client-socket-worker-pool.pool-size-max: 10 > - akka.client-socket-worker-pool.pool-size-min: 6 > - blob.client.connect. > > The log of the taskmanager is as follows. > {code:java} > Association with remote system [akka.tcp://flink@10.238.80.92:6123] has > failed, address is now gated for [1] ms. Reason: [Disassociated] > Could not resolve ResourceManager address > akka.tcp://flink@10.238.80.92:6123/user/rpc/resourcemanager_1, retrying in > 1 ms: Could not connect to rpc endpoint under address > akka.tcp://flink@10.238.80.92:6123/user/rpc/resourcemanager_1. > Tried to associate with unreachable remote address > [akka.tcp://flink@10.238.80.92:6123]. Address is now gated for 1 ms, all > messages to this address will be delivered to dead letters. Reason: [The > remote system has quarantined this system. 
No further associations to the > remote system are possible until this system is restarted.] > {code} > > *If you go into the task manager pod and tcp check, the connection is open.* > *Below are the flink versions I used.* > *- flink image: 1.15.1* > *- flink kubernetes operator: 1.1.0* > *I would appreciate it if you could check the problem quickly.* > *If it's a bug, please tell me how to detour in the current situation.* -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-29117) Tried to associate with unreachable remote resourcemanager address
[ https://issues.apache.org/jira/browse/FLINK-29117?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] geonyeong kim updated FLINK-29117: -- Summary: Tried to associate with unreachable remote resourcemanager address (was: Tried to associate with unreachable remote address) > Tried to associate with unreachable remote resourcemanager address > -- > > Key: FLINK-29117 > URL: https://issues.apache.org/jira/browse/FLINK-29117 > Project: Flink > Issue Type: Bug > Components: Deployment / Kubernetes, flink-contrib, flink-docker, > Kubernetes Operator >Affects Versions: 1.15.1, kubernetes-operator-1.1.0 >Reporter: geonyeong kim >Priority: Critical > Attachments: taskmanager_log.png > > > Hello. > I am planning to distribute and use FlinkDeployment through the flink > kubernetes operator. > CRD, operator, webbook, etc. are all set up, and we actually distributed > FlinkDeployment to confirm normal operation. > *However, strangely, connecting to resource manager fails if you make more > than one task manager pod replica.* > I thought it might be a problem with akka, timeout, etc. so I increased the > values as below > The connection continues to fail. > - akka.retry-gate-closed-for: 1 > - akka.server-socket-worker-pool.pool-size-min: 6 > - akka.server-socket-worker-pool.pool-size-max: 10 > - akka.client-socket-worker-pool.pool-size-max: 10 > - akka.client-socket-worker-pool.pool-size-min: 6 > - blob.client.connect. > The log of the taskmanager is as follows. > > {code:java} > Association with remote system [akka.tcp://flink@10.238.80.92:6123] has > failed, address is now gated for [1] ms. Reason: [Disassociated] > Could not resolve ResourceManager address > akka.tcp://flink@10.238.80.92:6123/user/rpc/resourcemanager_1, retrying in > 1 ms: Could not connect to rpc endpoint under address > akka.tcp://flink@10.238.80.92:6123/user/rpc/resourcemanager_1. > Tried to associate with unreachable remote address > [akka.tcp://flink@10.238.80.92:6123]. 
Address is now gated for 1 ms, all > messages to this address will be delivered to dead letters. Reason: [The > remote system has quarantined this system. No further associations to the > remote system are possible until this system is restarted.] {code} > *If you go into the task manager pod and tcp check, the connection is open.* > *Below are the flink versions I used.* > * flink image: 1.15.1 > - flink kubernetes operator: 1.1.0 > > *I would appreciate it if you could check the problem quickly.* > *If it's a bug, please tell me how to detour in the current situation.* -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-29117) Tried to associate with unreachable remote address
[ https://issues.apache.org/jira/browse/FLINK-29117?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] geonyeong kim updated FLINK-29117: -- Attachment: (was: Screen Shot 2022-08-26 at 5.04.37 PM.png) > Tried to associate with unreachable remote address > -- > > Key: FLINK-29117 > URL: https://issues.apache.org/jira/browse/FLINK-29117 > Project: Flink > Issue Type: Bug > Components: Deployment / Kubernetes, flink-contrib, flink-docker, > Kubernetes Operator >Affects Versions: 1.15.1, kubernetes-operator-1.1.0 >Reporter: geonyeong kim >Priority: Critical > Attachments: taskmanager_log.png > > > Hello. > I am planning to distribute and use FlinkDeployment through the flink > kubernetes operator. > CRD, operator, webbook, etc. are all set up, and we actually distributed > FlinkDeployment to confirm normal operation. > *However, strangely, connecting to resource manager fails if you make more > than one task manager pod replica.* > I thought it might be a problem with akka, timeout, etc. so I increased the > values as below > The connection continues to fail. > - akka.retry-gate-closed-for: 1 > - akka.server-socket-worker-pool.pool-size-min: 6 > - akka.server-socket-worker-pool.pool-size-max: 10 > - akka.client-socket-worker-pool.pool-size-max: 10 > - akka.client-socket-worker-pool.pool-size-min: 6 > - blob.client.connect. > The log of the taskmanager is as follows. > > {code:java} > Association with remote system [akka.tcp://flink@10.238.80.92:6123] has > failed, address is now gated for [1] ms. Reason: [Disassociated] > Could not resolve ResourceManager address > akka.tcp://flink@10.238.80.92:6123/user/rpc/resourcemanager_1, retrying in > 1 ms: Could not connect to rpc endpoint under address > akka.tcp://flink@10.238.80.92:6123/user/rpc/resourcemanager_1. > Tried to associate with unreachable remote address > [akka.tcp://flink@10.238.80.92:6123]. Address is now gated for 1 ms, all > messages to this address will be delivered to dead letters. 
Reason: [The > remote system has quarantined this system. No further associations to the > remote system are possible until this system is restarted.] {code} > *If you go into the task manager pod and tcp check, the connection is open.* > *Below are the flink versions I used.* > * flink image: 1.15.1 > - flink kubernetes operator: 1.1.0 > > *I would appreciate it if you could check the problem quickly.* > *If it's a bug, please tell me how to detour in the current situation.* -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-29117) Tried to associate with unreachable remote address
[ https://issues.apache.org/jira/browse/FLINK-29117?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] geonyeong kim updated FLINK-29117: -- Attachment: taskmanager_log.png > Tried to associate with unreachable remote address > -- > > Key: FLINK-29117 > URL: https://issues.apache.org/jira/browse/FLINK-29117 > Project: Flink > Issue Type: Bug > Components: Deployment / Kubernetes, flink-contrib, flink-docker, > Kubernetes Operator >Affects Versions: 1.15.1, kubernetes-operator-1.1.0 >Reporter: geonyeong kim >Priority: Critical > Attachments: taskmanager_log.png > > > Hello. > I am planning to distribute and use FlinkDeployment through the flink > kubernetes operator. > CRD, operator, webbook, etc. are all set up, and we actually distributed > FlinkDeployment to confirm normal operation. > *However, strangely, connecting to resource manager fails if you make more > than one task manager pod replica.* > I thought it might be a problem with akka, timeout, etc. so I increased the > values as below > The connection continues to fail. > - akka.retry-gate-closed-for: 1 > - akka.server-socket-worker-pool.pool-size-min: 6 > - akka.server-socket-worker-pool.pool-size-max: 10 > - akka.client-socket-worker-pool.pool-size-max: 10 > - akka.client-socket-worker-pool.pool-size-min: 6 > - blob.client.connect. > The log of the taskmanager is as follows. > > {code:java} > Association with remote system [akka.tcp://flink@10.238.80.92:6123] has > failed, address is now gated for [1] ms. Reason: [Disassociated] > Could not resolve ResourceManager address > akka.tcp://flink@10.238.80.92:6123/user/rpc/resourcemanager_1, retrying in > 1 ms: Could not connect to rpc endpoint under address > akka.tcp://flink@10.238.80.92:6123/user/rpc/resourcemanager_1. > Tried to associate with unreachable remote address > [akka.tcp://flink@10.238.80.92:6123]. Address is now gated for 1 ms, all > messages to this address will be delivered to dead letters. 
Reason: [The > remote system has quarantined this system. No further associations to the > remote system are possible until this system is restarted.] {code} > *If you go into the task manager pod and tcp check, the connection is open.* > *Below are the flink versions I used.* > * flink image: 1.15.1 > - flink kubernetes operator: 1.1.0 > > *I would appreciate it if you could check the problem quickly.* > *If it's a bug, please tell me how to detour in the current situation.* -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-29117) Tried to associate with unreachable remote address
[ https://issues.apache.org/jira/browse/FLINK-29117?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] geonyeong kim updated FLINK-29117: -- Description: Hello. I am planning to distribute and use FlinkDeployment through the flink kubernetes operator. CRD, operator, webbook, etc. are all set up, and we actually distributed FlinkDeployment to confirm normal operation. *However, strangely, connecting to resource manager fails if you make more than one task manager pod replica.* I thought it might be a problem with akka, timeout, etc. so I increased the values as below The connection continues to fail. - akka.retry-gate-closed-for: 1 - akka.server-socket-worker-pool.pool-size-min: 6 - akka.server-socket-worker-pool.pool-size-max: 10 - akka.client-socket-worker-pool.pool-size-max: 10 - akka.client-socket-worker-pool.pool-size-min: 6 - blob.client.connect. The log of the taskmanager is as follows. {code:java} Association with remote system [akka.tcp://flink@10.238.80.92:6123] has failed, address is now gated for [1] ms. Reason: [Disassociated] Could not resolve ResourceManager address akka.tcp://flink@10.238.80.92:6123/user/rpc/resourcemanager_1, retrying in 1 ms: Could not connect to rpc endpoint under address akka.tcp://flink@10.238.80.92:6123/user/rpc/resourcemanager_1. Tried to associate with unreachable remote address [akka.tcp://flink@10.238.80.92:6123]. Address is now gated for 1 ms, all messages to this address will be delivered to dead letters. Reason: [The remote system has quarantined this system. No further associations to the remote system are possible until this system is restarted.] {code} *If you go into the task manager pod and tcp check, the connection is open.* *Below are the flink versions I used.* * flink image: 1.15.1 - flink kubernetes operator: 1.1.0 *I would appreciate it if you could check the problem quickly.* *If it's a bug, please tell me how to detour in the current situation.* was: Hello. 
I am planning to distribute and use FlinkDeployment through the flink kubernetes operator. CRD, operator, webbook, etc. are all set up, and we actually distributed FlinkDeployment to confirm normal operation. *However, strangely, connecting to resource manager fails if you make more than one task manager pod replica.* I thought it might be a problem with akka, timeout, etc. so I increased the values as below The connection continues to fail. - akka.retry-gate-closed-for: 1 - akka.server-socket-worker-pool.pool-size-min: 6 - akka.server-socket-worker-pool.pool-size-max: 10 - akka.client-socket-worker-pool.pool-size-max: 10 - akka.client-socket-worker-pool.pool-size-min: 6 - blob.client.connect. The log of the taskmanager is as follows. {code:java} Association with remote system [akka.tcp://flink@10.238.80.92:6123] has failed, address is now gated for [1] ms. Reason: [Disassociated] Could not resolve ResourceManager address akka.tcp://flink@10.238.80.92:6123/user/rpc/resourcemanager_1, retrying in 1 ms: Could not connect to rpc endpoint under address akka.tcp://flink@10.238.80.92:6123/user/rpc/resourcemanager_1. Tried to associate with unreachable remote address [akka.tcp://flink@10.238.80.92:6123]. Address is now gated for 1 ms, all messages to this address will be delivered to dead letters. Reason: [The remote system has quarantined this system. No further associations to the remote system are possible until this system is restarted.] 
{code} *If you go into the task manager pod and tcp check, the connection is open.* *Below are the flink versions I used.* * -- flink image: 1.15.1* -- flink kubernetes operator: 1.1.0* *I would appreciate it if you could check the problem quickly.* *If it's a bug, please tell me how to detour in the current situation.* > Tried to associate with unreachable remote address > -- > > Key: FLINK-29117 > URL: https://issues.apache.org/jira/browse/FLINK-29117 > Project: Flink > Issue Type: Bug > Components: Deployment / Kubernetes, flink-contrib, flink-docker, > Kubernetes Operator >Affects Versions: 1.15.1, kubernetes-operator-1.1.0 >Reporter: geonyeong kim >Priority: Critical > Attachments: Screen Shot 2022-08-26 at 5.04.37 PM.png > > > Hello. > I am planning to distribute and use FlinkDeployment through the flink > kubernetes operator. > CRD, operator, webbook, etc. are all set up, and we actually distributed > FlinkDeployment to confirm normal operation. > *However, strangely, connecting to resource manager fails if you make more > than one task manager pod replica.* > I thought it might be a problem with akka, timeout, etc. so I increased the > values as below > The connection continues to fail. > - akka.retry-gate-closed-for: 1 > - akka.serve
[jira] [Updated] (FLINK-29117) Tried to associate with unreachable remote address
[ https://issues.apache.org/jira/browse/FLINK-29117?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] geonyeong kim updated FLINK-29117: -- Description:
Hello.
I plan to deploy and use FlinkDeployment through the Flink Kubernetes operator. The CRD, operator, webhook, etc. are all set up, and we actually deployed a FlinkDeployment to confirm normal operation.
*However, connecting to the resource manager fails whenever the task manager pod has more than one replica.*
I thought it might be a problem with Akka timeouts, so I increased the values below, but the connection continues to fail.
- akka.retry-gate-closed-for: 1
- akka.server-socket-worker-pool.pool-size-min: 6
- akka.server-socket-worker-pool.pool-size-max: 10
- akka.client-socket-worker-pool.pool-size-max: 10
- akka.client-socket-worker-pool.pool-size-min: 6
- blob.client.connect.
The log of the taskmanager is as follows.
{code:java}
Association with remote system [akka.tcp://flink@10.238.80.92:6123] has failed, address is now gated for [1] ms. Reason: [Disassociated]
Could not resolve ResourceManager address akka.tcp://flink@10.238.80.92:6123/user/rpc/resourcemanager_1, retrying in 1 ms: Could not connect to rpc endpoint under address akka.tcp://flink@10.238.80.92:6123/user/rpc/resourcemanager_1.
Tried to associate with unreachable remote address [akka.tcp://flink@10.238.80.92:6123]. Address is now gated for 1 ms, all messages to this address will be delivered to dead letters. Reason: [The remote system has quarantined this system. No further associations to the remote system are possible until this system is restarted.]
{code}
*If you exec into the task manager pod and check the TCP connection, the port is open.*
*Below are the Flink versions I used.*
- flink image: 1.15.1
- flink kubernetes operator: 1.1.0
*I would appreciate it if you could look into this problem quickly. If it is a bug, please tell me how to work around it in the current situation.*

> Tried to associate with unreachable remote address
> --
>
> Key: FLINK-29117
> URL: https://issues.apache.org/jira/browse/FLINK-29117
> Project: Flink
> Issue Type: Bug
> Components: Deployment / Kubernetes, flink-contrib, flink-docker, Kubernetes Operator
> Affects Versions: 1.15.1, kubernetes-operator-1.1.0
> Reporter: geonyeong kim
> Priority: Critical
> Attachments: Screen Shot 2022-08-26 at 5.04.37 PM.png
[jira] [Created] (FLINK-29117) Tried to associate with unreachable remote address
geonyeong kim created FLINK-29117: - Summary: Tried to associate with unreachable remote address
Key: FLINK-29117
URL: https://issues.apache.org/jira/browse/FLINK-29117
Project: Flink
Issue Type: Bug
Components: Deployment / Kubernetes, flink-contrib, flink-docker, Kubernetes Operator
Affects Versions: kubernetes-operator-1.1.0, 1.15.1
Reporter: geonyeong kim
Attachments: Screen Shot 2022-08-26 at 5.04.37 PM.png

Hello.
I plan to deploy and use FlinkDeployment through the Flink Kubernetes operator. The CRD, operator, webhook, etc. are all set up, and we actually deployed a FlinkDeployment to confirm normal operation.
*However, connecting to the resource manager fails whenever the task manager pod has more than one replica.*
I thought it might be a problem with Akka timeouts, so I increased the values below, but the connection continues to fail.
- akka.retry-gate-closed-for: 1
- akka.server-socket-worker-pool.pool-size-min: 6
- akka.server-socket-worker-pool.pool-size-max: 10
- akka.client-socket-worker-pool.pool-size-max: 10
- akka.client-socket-worker-pool.pool-size-min: 6
- blob.client.connect.
The log of the taskmanager is as follows.
{code:java}
Association with remote system [akka.tcp://flink@10.238.80.92:6123] has failed, address is now gated for [1] ms. Reason: [Disassociated]
Could not resolve ResourceManager address akka.tcp://flink@10.238.80.92:6123/user/rpc/resourcemanager_1, retrying in 1 ms: Could not connect to rpc endpoint under address akka.tcp://flink@10.238.80.92:6123/user/rpc/resourcemanager_1.
Tried to associate with unreachable remote address [akka.tcp://flink@10.238.80.92:6123]. Address is now gated for 1 ms, all messages to this address will be delivered to dead letters. Reason: [The remote system has quarantined this system. No further associations to the remote system are possible until this system is restarted.]
{code}
*If you exec into the task manager pod and check the TCP connection, the port is open.*
*Below are the Flink versions I used.*
- flink image: 1.15.1
- flink kubernetes operator: 1.1.0
*I would appreciate it if you could look into this problem quickly. If it is a bug, please tell me how to work around it in the current situation.*
-- This message was sent by Atlassian Jira (v8.20.10#820010)
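For reference, the Akka-related options the reporter raised can be collected into `flink-conf.yaml` form as below. The values are copied from the report above (the truncated `blob.client.connect.` key is left out); per the report, the connection still failed with these in place, which is consistent with the log blaming an Akka quarantine rather than socket-pool sizing.

```yaml
# Settings the reporter increased while troubleshooting (values from the report).
# These did not resolve the failure, which the log attributes to a quarantine.
akka.retry-gate-closed-for: 1
akka.server-socket-worker-pool.pool-size-min: 6
akka.server-socket-worker-pool.pool-size-max: 10
akka.client-socket-worker-pool.pool-size-min: 6
akka.client-socket-worker-pool.pool-size-max: 10
```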
[GitHub] [flink] wuchong commented on pull request #18975: [FLINK-26474][hive] Fold exprNode to fix the issue of failing to call some hive udf required constant parameters with implicit constant passe
wuchong commented on PR #18975: URL: https://github.com/apache/flink/pull/18975#issuecomment-1228225348 `HiveDialectQueryITCase.testCastTimeStampToDecimal:804` failed; please take a look. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot commented on pull request #20686: [FLINK-29112][table-planner] Print the lookup join hint on the node in the origin RelNode tree for easier debugging
flinkbot commented on PR #20686: URL: https://github.com/apache/flink/pull/20686#issuecomment-1228223447
## CI report:
* 9f8ceabd93c4692bad3e349b0efa08d6931827dc UNKNOWN

Bot commands: the @flinkbot bot supports the following commands:
- `@flinkbot run azure` re-run the last Azure build
[jira] [Created] (FLINK-29116) Tried to associate with unreachable remote address
geonyeong kim created FLINK-29116: - Summary: Tried to associate with unreachable remote address
Key: FLINK-29116
URL: https://issues.apache.org/jira/browse/FLINK-29116
Project: Flink
Issue Type: Bug
Affects Versions: kubernetes-operator-1.1.0, 1.15.1
Reporter: geonyeong kim
Attachments: Screen Shot 2022-08-26 at 5.04.37 PM.png

Hello.
I plan to deploy and use FlinkDeployment through the Flink Kubernetes operator. The CRD, operator, webhook, etc. are all set up, and we actually deployed a FlinkDeployment to confirm normal operation.
*However, connecting to the resource manager fails whenever the task manager pod has more than one replica.*
I thought it might be a problem with Akka timeouts, so I increased the values below, but the connection continues to fail.
- akka.retry-gate-closed-for: 1
- akka.server-socket-worker-pool.pool-size-min: 6
- akka.server-socket-worker-pool.pool-size-max: 10
- akka.client-socket-worker-pool.pool-size-max: 10
- akka.client-socket-worker-pool.pool-size-min: 6
- blob.client.connect.
The log of the taskmanager is as follows.
{code:java}
Association with remote system [akka.tcp://flink@10.238.80.92:6123] has failed, address is now gated for [1] ms. Reason: [Disassociated]
Could not resolve ResourceManager address akka.tcp://flink@10.238.80.92:6123/user/rpc/resourcemanager_1, retrying in 1 ms: Could not connect to rpc endpoint under address akka.tcp://flink@10.238.80.92:6123/user/rpc/resourcemanager_1.
Tried to associate with unreachable remote address [akka.tcp://flink@10.238.80.92:6123]. Address is now gated for 1 ms, all messages to this address will be delivered to dead letters. Reason: [The remote system has quarantined this system. No further associations to the remote system are possible until this system is restarted.]
{code}
*If you exec into the task manager pod and check the TCP connection, the port is open.*
*Below are the Flink versions I used.*
- flink image: 1.15.1
- flink kubernetes operator: 1.1.0
*I would appreciate it if you could look into this problem quickly. If it is a bug, please tell me how to work around it in the current situation.*
[GitHub] [flink] XComp commented on a diff in pull request #20673: [BP-1.15][FLINK-28265][k8s] Make KubernetesStateHandleStore#addEntry idempotent
XComp commented on code in PR #20673: URL: https://github.com/apache/flink/pull/20673#discussion_r955802991

## flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStoreTest.java:

@@ -1119,4 +1166,34 @@ private TestingLongStateHandleHelper.LongStateHandle addDeletingEntry(
         configMap.getData().put(key, deleting);
         return state;
     }
+
+    private static CompletableFuture<Boolean> retryWithFirstFailedK8sOperation(
+            Function<KubernetesConfigMap, ?> function, KubernetesConfigMap leaderConfigMap) {
+        final AtomicInteger callbackInvocationCount = new AtomicInteger(0);
+        final CompletableFuture<Boolean> result =
+                FutureUtils.retry(
+                        () ->
+                                CompletableFuture.supplyAsync(
+                                        () -> {
+                                            callbackInvocationCount.incrementAndGet();
+                                            function.apply(leaderConfigMap);
+                                            if (callbackInvocationCount.get() == 1) {
+                                                throw new KubernetesClientException(
+                                                        "Expected exception to simulate unstable "
+                                                                + "kubernetes client operation");
+                                            }
+                                            return true;
+                                        },
+                                        Executors.newDirectExecutorService()),
+                        KubernetesConfigOptions.KUBERNETES_TRANSACTIONAL_OPERATION_MAX_RETRIES
+                                .defaultValue(),
+                        t ->
+                                ExceptionUtils.findThrowable(t, KubernetesClientException.class)
+                                        .isPresent(),
+                        Executors.newDirectExecutorService());
+        assertThat(callbackInvocationCount.get(), is(2));
+        assertThat(result.isDone(), is(true));

Review Comment: interesting. I really would have assumed that there is something like `CompletableFuture::isCompleted()`. :thinking: I guess `assertThat(result.get(), is(true));` would be the proper assertion then. WDYT?
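The review above concerns asserting on the future produced by a retry helper. As a minimal, self-contained sketch of the same retry-on-matching-exception pattern (plain JDK only, not Flink's `FutureUtils.retry`; the class and method names here are invented for illustration), the following shows a first attempt failing, a second succeeding, and why asserting on `result.get()` is stronger than `result.isDone()`: `get()` surfaces both the value and any wrapped failure, while `isDone()` is also true for an exceptionally completed future.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import java.util.function.Supplier;

public class RetryDemo {

    // Retry an async operation: if it fails with a throwable matching the
    // predicate and retries remain, run it again; otherwise propagate the outcome.
    static <T> CompletableFuture<T> retry(
            Supplier<CompletableFuture<T>> operation,
            int retries,
            Predicate<Throwable> retryPredicate) {
        return operation
                .get()
                .handle(
                        (value, error) -> {
                            if (error == null) {
                                return CompletableFuture.completedFuture(value);
                            }
                            // supplyAsync wraps thrown exceptions in a CompletionException.
                            Throwable cause = error.getCause() != null ? error.getCause() : error;
                            if (retries > 0 && retryPredicate.test(cause)) {
                                return retry(operation, retries - 1, retryPredicate);
                            }
                            CompletableFuture<T> failed = new CompletableFuture<>();
                            failed.completeExceptionally(cause);
                            return failed;
                        })
                .thenCompose(f -> f);
    }

    public static void main(String[] args) throws Exception {
        AtomicInteger calls = new AtomicInteger(0);
        CompletableFuture<Boolean> result =
                retry(
                        () ->
                                CompletableFuture.supplyAsync(
                                        () -> {
                                            // First invocation simulates an unstable client
                                            // operation; the second succeeds.
                                            if (calls.incrementAndGet() == 1) {
                                                throw new IllegalStateException(
                                                        "simulated unstable operation");
                                            }
                                            return true;
                                        }),
                        2,
                        t -> t instanceof IllegalStateException);
        // get() blocks for completion and would rethrow any failure;
        // after it returns, the second attempt is guaranteed to have run.
        boolean value = result.get();
        System.out.println("calls=" + calls.get() + " value=" + value);
    }
}
```

Running this prints `calls=2 value=true`: the predicate matched the first simulated failure, so the operation was re-run exactly once.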
[jira] [Updated] (FLINK-29112) Print the lookup join hint on the node `Correlate` in the origin RelNode tree for easier debugging
[ https://issues.apache.org/jira/browse/FLINK-29112?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-29112: --- Labels: pull-request-available (was: ) > Print the lookup join hint on the node `Correlate` in the origin RelNode tree > for easier debugging > - > > Key: FLINK-29112 > URL: https://issues.apache.org/jira/browse/FLINK-29112 > Project: Flink > Issue Type: Improvement >Reporter: lincoln lee >Priority: Major > Labels: pull-request-available >