Re: [PR] [FLINK-31481][table] Support enhanced show databases syntax [flink]
jeyhunkarimov commented on code in PR #23612: URL: https://github.com/apache/flink/pull/23612#discussion_r1449994298

## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlShowDatabasesConverter.java:

@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.operations.converters;
+
+import org.apache.flink.sql.parser.dql.SqlShowDatabases;
+import org.apache.flink.table.catalog.CatalogManager;
+import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.operations.ShowDatabasesOperation;
+
+/** A converter for {@link SqlShowDatabases}. */
+public class SqlShowDatabasesConverter implements SqlNodeConverter<SqlShowDatabases> {
+
+    @Override
+    public Operation convertSqlNode(SqlShowDatabases sqlShowDatabases, ConvertContext context) {
+        if (sqlShowDatabases.getPreposition() == null) {
+            return new ShowDatabasesOperation(
+                    sqlShowDatabases.getLikeType(),
+                    sqlShowDatabases.getLikeSqlPattern(),
+                    sqlShowDatabases.isNotLike());
+        } else {
+            CatalogManager catalogManager = context.getCatalogManager();
+            String[] fullCatalogName = sqlShowDatabases.getCatalog();
+            String catalogName =
+                    fullCatalogName.length == 0
+                            ? catalogManager.getCurrentCatalog()
+                            : fullCatalogName[0];

Review Comment: ok, got it. Good point. No, if the preposition is not null, the catalog name should be present. I updated `SqlShowDatabasesConverter` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
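The catalog-resolution rule agreed on in the review can be sketched as a small, self-contained method. This is purely illustrative: `ShowDatabasesCatalogResolution` and `resolveCatalog` are hypothetical names, not Flink classes, and the real converter works against `SqlShowDatabases` and `CatalogManager` rather than plain strings.

```java
// Hypothetical, self-contained sketch of the resolution agreed on in the
// review above: a plain SHOW DATABASES (null preposition) targets the current
// catalog, while SHOW DATABASES FROM/IN always carries an explicit catalog
// name, so no fallback to the current catalog is needed in that branch.
public class ShowDatabasesCatalogResolution {

    public static String resolveCatalog(
            String preposition, String[] fullCatalogName, String currentCatalog) {
        if (preposition == null) {
            // "SHOW DATABASES" without FROM/IN lists databases of the current catalog.
            return currentCatalog;
        }
        // Per the review: if the preposition is present, the catalog name must be too.
        if (fullCatalogName.length == 0) {
            throw new IllegalArgumentException(
                    "SHOW DATABASES FROM/IN requires an explicit catalog name");
        }
        return fullCatalogName[0];
    }
}
```

Under this reading, the original ternary fallback to the current catalog is dead code in the non-null-preposition branch, which matches the update the author describes.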
[jira] [Resolved] (FLINK-34047) Inject GitHub Actions/Azure Pipelines env variables in uploading_watchdog.sh
[ https://issues.apache.org/jira/browse/FLINK-34047?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias Pohl resolved FLINK-34047. --- Fix Version/s: 1.19.0 1.18.1 Resolution: Fixed master: [f29ee07f5e568fa94da75f3879a9aa7016dfcd5a|https://github.com/apache/flink/commit/f29ee07f5e568fa94da75f3879a9aa7016dfcd5a] 1.18: [3090e772192f4f9444e030b199cd95fc6c5afdd5|https://github.com/apache/flink/commit/3090e772192f4f9444e030b199cd95fc6c5afdd5] > Inject GitHub Actions/Azure Pipelines env variables in uploading_watchdog.sh > > > Key: FLINK-34047 > URL: https://issues.apache.org/jira/browse/FLINK-34047 > Project: Flink > Issue Type: Sub-task > Components: Build System / CI >Affects Versions: 1.18.0, 1.19.0 >Reporter: Matthias Pohl >Priority: Major > Labels: github-actions, pull-request-available > Fix For: 1.19.0, 1.18.1 > > > The workflow that's triggered by > {{tools/azure-pipelines/uploading_watchdog.sh}} relies on CI specific > environment variables. We should make the two different CI backends explicit > in this script to improve the code readability. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [BP-1.18][FLINK-34047][ci] Makes uploading_watchdog.sh support Azure Pipelines and GitHub Actions [flink]
XComp merged PR #24063: URL: https://github.com/apache/flink/pull/24063 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34047][ci] Make CI scripts agnostic to CI environment [flink]
XComp merged PR #24061: URL: https://github.com/apache/flink/pull/24061 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33768] Support dynamic source parallelism inference for batch jobs [flink]
flinkbot commented on PR #24078: URL: https://github.com/apache/flink/pull/24078#issuecomment-1888578089

## CI report:

* 547c3c10e8bb800244d18ce1c4cc23642bc460bf UNKNOWN

Bot commands: The @flinkbot bot supports the following commands:
- `@flinkbot run azure` re-run the last Azure build

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33268][rest] Skip unknown fields in REST response deserialization [flink]
gaborgsomogyi commented on PR #23930: URL: https://github.com/apache/flink/pull/23930#issuecomment-1888577653 The change looks good, but a timeout happened. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33268][rest] Skip unknown fields in REST response deserialization [flink]
gaborgsomogyi commented on PR #23930: URL: https://github.com/apache/flink/pull/23930#issuecomment-1888576964 @flinkbot run azure -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33915][ci] Adds nightly workflow for the master branch [flink]
XComp commented on code in PR #23971: URL: https://github.com/apache/flink/pull/23971#discussion_r1449977361

## .github/actions/select_workflow_configs/action.yml:

@@ -0,0 +1,59 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+---
+name: "Selects Flink workflow configurations"
+description: "Loads the relevant workflow configuration from a given folder."
+inputs:
+  config-folder:
+    description: "The directory in which the workflow configurations are located."
+    required: true
+  profile:
+    description: >-
+      The profile that's used to access the desired workflow configuration
+      file. Any invalid profile (i.e. no file exists) will lead to using
+      the default workflow configurations. Configuration files are expected
+      to have the name format 'flink-workflow.<profile>.json' and should be
+      located in the folder that's specified through the 'config-folder' parameter.
+    required: true
+outputs:
+  workflow-configurations:
+    description: "A JSON representation of the workflow configurations."
+    value: "${{ steps.workflow-selection.outputs.configs }}"
+runs:
+  using: "composite"
+  steps:
+    - name: "Loads Flink workflow configurations"
+      id: workflow-selection
+      shell: bash
+      run: |
+        fallback_profile="default"
+        fallback_path="${{ inputs.config-folder }}/flink-workflow.${fallback_profile}.json"

Review Comment: > Scheduled workflow runs will be triggered from master. The workflow configuration (the json file) is therefore also loaded from master. The workflow then triggers the checkout of individual branches.

The following build failure (which happened in release-1.18) seems to prove me wrong: https://github.com/XComp/flink/actions/runs/7498584099 I had forgotten to backport the job_init action into my fork's `release-1.18` branch in that case. I assume the same is true for the configuration files. So, your solution is viable. That means that we could have one workflow configuration file per branch. We will still keep the functionality around configuration files in general to avoid having too much duplicate code for each of the nightly workflows. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
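The profile-with-fallback selection performed by the action's bash step can be sketched in a few lines. This is a hypothetical Java rendering for illustration only; `WorkflowConfigSelector` is not part of the Flink repository, and the existence check is injected so the policy can be exercised without touching disk.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.function.Predicate;

// Sketch of the fallback logic quoted above: prefer
// 'flink-workflow.<profile>.json', fall back to the 'default' profile
// when no file exists for the requested profile.
public class WorkflowConfigSelector {

    /** Resolves the configuration file for the given profile. */
    public static Path select(Path configFolder, String profile, Predicate<Path> exists) {
        Path candidate = configFolder.resolve("flink-workflow." + profile + ".json");
        if (exists.test(candidate)) {
            return candidate;
        }
        // Mirrors fallback_path in the action's run step.
        return configFolder.resolve("flink-workflow.default.json");
    }

    public static void main(String[] args) {
        // In the real action the check is a plain file-existence test.
        Path chosen = select(Path.of(".github/workflows/configs"), "nightly", Files::exists);
        System.out.println(chosen);
    }
}
```

With one configuration file per branch (as concluded in the review), each branch's checkout simply carries its own `flink-workflow.*.json` files, and this selection runs against the checked-out folder.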
Re: [PR] [FLINK-34034][docs] Update the query hint docs to clarify the resolution of conflicts in kv hint and list hint [flink]
flinkbot commented on PR #24077: URL: https://github.com/apache/flink/pull/24077#issuecomment-1888570663

## CI report:

* d78dc4459252f1f105d0bc4337fa64726b834665 UNKNOWN

Bot commands: The @flinkbot bot supports the following commands:
- `@flinkbot run azure` re-run the last Azure build

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (FLINK-34040) ScalaSerializersMigrationTest.testStableAnonymousClassnameGeneration fails in GHA with JDK 17 and 21
[ https://issues.apache.org/jira/browse/FLINK-34040?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias Pohl resolved FLINK-34040. --- Resolution: Fixed The error was indeed caused by an incorrect setup of the {{github-actions}} profile, which caused the build to ignore the @FailsOnJava17 flag of the test. > ScalaSerializersMigrationTest.testStableAnonymousClassnameGeneration fails in > GHA with JDK 17 and 21 > > > Key: FLINK-34040 > URL: https://issues.apache.org/jira/browse/FLINK-34040 > Project: Flink > Issue Type: Sub-task > Components: API / Scala, Build System / CI >Affects Versions: 1.19.0 >Reporter: Matthias Pohl >Priority: Major > Labels: github-actions, test-stability > > {code} > Error: 13:05:23 13:05:23.538 [ERROR] Tests run: 1, Failures: 1, Errors: 0, > Skipped: 0, Time elapsed: 0.375 s <<< FAILURE! -- in > org.apache.flink.api.scala.migration.ScalaSerializersMigrationTest > Error: 13:05:23 13:05:23.538 [ERROR] > org.apache.flink.api.scala.migration.ScalaSerializersMigrationTest.testStableAnonymousClassnameGeneration > -- Time elapsed: 0.371 s <<< FAILURE! > Jan 07 13:05:23 org.junit.ComparisonFailure: > expected:<...MigrationTest$$anon$[8]> but was:<...MigrationTest$$anon$[1]> > Jan 07 13:05:23 at org.junit.Assert.assertEquals(Assert.java:117) > Jan 07 13:05:23 at org.junit.Assert.assertEquals(Assert.java:146) > Jan 07 13:05:23 at > org.apache.flink.api.scala.migration.ScalaSerializersMigrationTest.testStableAnonymousClassnameGeneration(ScalaSerializersMigrationTest.scala:60) > Jan 07 13:05:23 at > java.base/java.lang.reflect.Method.invoke(Method.java:580) > {code} > The error only happens in the [master GHA > nightly|https://github.com/XComp/flink/actions/workflows/nightly-dev.yml] for > JDK 17 and 21. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] [FLINK-33768] Support dynamic source parallelism inference for batch jobs [flink]
SinBex opened a new pull request, #24078: URL: https://github.com/apache/flink/pull/24078

## What is the purpose of the change

Currently, for JobVertices without parallelism configured, the AdaptiveBatchScheduler dynamically infers the vertex parallelism based on the volume of input data. Specifically, for Source vertices, it uses the value of `execution.batch.adaptive.auto-parallelism.default-source-parallelism` as the fixed parallelism. If this is not set by the user, the default value of 1 is used as the source parallelism, which is actually a temporary implementation solution. We aim to support dynamic source parallelism inference for batch jobs.

## Brief change log

- Lazily initialize the parallelism of the OperatorCoordinator.
- Add the `DynamicParallelismInference` and `DynamicFilteringInfo` interfaces, and enable the `SourceCoordinator` to invoke corresponding methods for dynamic parallelism inference.
- The `AdaptiveBatchScheduler` applies dynamic source parallelism inference; to avoid blocking the main thread by calling external systems, the scheduling process has been made asynchronous.

## Verifying this change

This change added tests and can be verified as follows:

- Added integration tests to verify the end-to-end logic of dynamic parallelism inference; see `AdaptiveBatchSchedulerITCase#testSchedulingWithDynamicSourceParallelismInference` for details.
- Added unit tests for the newly added methods in classes such as `SourceCoordinator` and `AdaptiveBatchScheduler`.
- Manually verified the feature on a Flink session cluster (1 JobManager, 77 TaskManagers), including dynamic inference of parallelism, asynchronous scheduling, and execution exceptions in `DynamicParallelismInference`, all performing as expected.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes)
- The S3 file system connector: (no)

## Documentation

- Does this pull request introduce a new feature? (yes)
- If yes, how is the feature documented? (docs / JavaDocs)

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
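The volume-based inference the PR describes can be sketched without any Flink dependencies. Note this is a simplified stand-in: the real `DynamicParallelismInference` interface from FLIP-379 takes a richer `Context` object, and the policy below is only one plausible way a source might map data volume to a parallelism.

```java
// Self-contained sketch of the idea behind dynamic source parallelism
// inference. Everything below is illustrative, not Flink API.
public class ParallelismInferenceSketch {

    /** Simplified stand-in for the source-side inference hook. */
    public interface DynamicParallelismInference {
        int inferParallelism(long dataVolumeBytes, long bytesPerTask, int maxParallelism);
    }

    /** One plausible policy: ceil(volume / bytesPerTask), clamped to [1, maxParallelism]. */
    public static final DynamicParallelismInference VOLUME_BASED =
            (dataVolumeBytes, bytesPerTask, maxParallelism) -> {
                long tasks = (dataVolumeBytes + bytesPerTask - 1) / bytesPerTask; // ceiling division
                return (int) Math.max(1, Math.min(tasks, maxParallelism));
            };

    public static void main(String[] args) {
        // 10 GiB of input with ~1 GiB per task, capped at a max parallelism of 8.
        System.out.println(VOLUME_BASED.inferParallelism(10L << 30, 1L << 30, 8));
    }
}
```

Because a real implementation may consult external systems (e.g. listing files or splits) to estimate the data volume, invoking it asynchronously, as the PR does, keeps the scheduler's main thread unblocked.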
[jira] [Commented] (FLINK-34050) Rocksdb state has space amplification after rescaling with DeleteRange
[ https://issues.apache.org/jira/browse/FLINK-34050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17805915#comment-17805915 ] Hangxiang Yu commented on FLINK-34050: --

Thanks [~lijinzhong] and [~mayuehappy] for providing this information. IMO, deleteRange+deleteFilesInRanges could be a good default behavior:
# Only deleteRange may cause some extra space usage, which can even last forever.
# Adding deleteFilesInRanges should not cost much more time than before, because checking and then deleting files should be quick.
# The remaining files should still contain the ranges that current keys and compaction will eventually access.

Of course, we could provide some performance comparison of deleteRange+deleteFilesInRanges vs. plain deleteRange.

> Rocksdb state has space amplification after rescaling with DeleteRange
> --
>
> Key: FLINK-34050
> URL: https://issues.apache.org/jira/browse/FLINK-34050
> Project: Flink
> Issue Type: Bug
> Components: Runtime / State Backends
> Reporter: Jinzhong Li
> Priority: Major
> Attachments: image-2024-01-10-21-23-48-134.png, image-2024-01-10-21-24-10-983.png, image-2024-01-10-21-28-24-312.png
>
> FLINK-21321 uses deleteRange to speed up rocksdb rescaling; however, it will
> cause space amplification in some cases.
> We can reproduce this problem using a wordCount job:
> 1) before rescaling, the state operator in the wordCount job has parallelism 2 and
> a 4G+ full checkpoint size;
> !image-2024-01-10-21-24-10-983.png|width=266,height=130!
> 2) then restart the job with parallelism 4 (for the state operator); the full
> checkpoint size of the new job will be 8G+;
> 3) after many successful checkpoints, the full checkpoint size is still 8G+;
> !image-2024-01-10-21-28-24-312.png|width=454,height=111!
>
> The root cause of this issue is that the deleted keyGroupRange does not
> overlap with the current DB keyGroupRange, so new data written into rocksdb after
> rescaling almost never does LSM compaction with the deleted data (belonging to
> other keyGroupRanges).
>
> And the space amplification may affect Rocksdb read performance and disk
> space usage after rescaling. It looks like a regression due to the
> introduction of deleteRange for rescaling optimization.
>
> To solve this problem, I think maybe we can invoke
> Rocksdb.deleteFilesInRanges after deleteRange?
> {code:java}
> public static void clipDBWithKeyGroupRange() {
>     //...
>     List<byte[]> ranges = new ArrayList<>();
>     //...
>     deleteRange(db, columnFamilyHandles, beginKeyGroupBytes, endKeyGroupBytes);
>     ranges.add(beginKeyGroupBytes);
>     ranges.add(endKeyGroupBytes);
>     //
>     for (ColumnFamilyHandle columnFamilyHandle : columnFamilyHandles) {
>         db.deleteFilesInRanges(columnFamilyHandle, ranges, false);
>     }
> }
> {code}
> -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-33768) Support dynamic source parallelism inference for batch jobs
[ https://issues.apache.org/jira/browse/FLINK-33768?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-33768: --- Labels: pull-request-available (was: ) > Support dynamic source parallelism inference for batch jobs > --- > > Key: FLINK-33768 > URL: https://issues.apache.org/jira/browse/FLINK-33768 > Project: Flink > Issue Type: Improvement > Components: Runtime / Coordination >Affects Versions: 1.19.0 >Reporter: xingbe >Assignee: xingbe >Priority: Major > Labels: pull-request-available > > Currently, for JobVertices without parallelism configured, the > AdaptiveBatchScheduler dynamically infers the vertex parallelism based on the > volume of input data. Specifically, for Source vertices, it uses the value of > `{*}execution.batch.adaptive.auto-parallelism.default-source-parallelism{*}` > as the fixed parallelism. If this is not set by the user, the default value > of {{1}} is used as the source parallelism, which is actually a temporary > implementation solution. > We aim to support dynamic source parallelism inference for batch jobs. More > details see > [FLIP-379|https://cwiki.apache.org/confluence/display/FLINK/FLIP-379%3A+Dynamic+source+parallelism+inference+for+batch+jobs]. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (FLINK-34036) Various HiveDialectQueryITCase tests fail in GitHub Actions workflow with Hadoop 3.1.3 enabled
[ https://issues.apache.org/jira/browse/FLINK-34036?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias Pohl resolved FLINK-34036. --- Resolution: Fixed Yup, it was caused by what I described in my comment above. Thanks for the pointer, [~chesnay]. I'm closing this issue. > Various HiveDialectQueryITCase tests fail in GitHub Actions workflow with > Hadoop 3.1.3 enabled > -- > > Key: FLINK-34036 > URL: https://issues.apache.org/jira/browse/FLINK-34036 > Project: Flink > Issue Type: Sub-task > Components: Connectors / Hadoop Compatibility, Connectors / Hive >Affects Versions: 1.18.0, 1.19.0 >Reporter: Matthias Pohl >Priority: Major > Labels: github-actions, test-stability > > The following {{HiveDialectQueryITCase}} tests fail consistently in the > FLINK-27075 GitHub Actions [master nightly > workflow|https://github.com/XComp/flink/actions/workflows/nightly-dev.yml] of > Flink (and also the [release-1.18 > workflow|https://github.com/XComp/flink/actions/workflows/nightly-current.yml]): > * {{testInsertDirectory}} > * {{testCastTimeStampToDecimal}} > * {{testNullLiteralAsArgument}} > {code} > Error: 03:38:45 03:38:45.661 [ERROR] Tests run: 22, Failures: 1, Errors: 2, > Skipped: 0, Time elapsed: 379.0 s <<< FAILURE! -- in > org.apache.flink.connectors.hive.HiveDialectQueryITCase > Error: 03:38:45 03:38:45.662 [ERROR] > org.apache.flink.connectors.hive.HiveDialectQueryITCase.testNullLiteralAsArgument > -- Time elapsed: 0.069 s <<< ERROR! 
> Jan 09 03:38:45 java.lang.NoSuchMethodError: > org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils.getTimestamp(Ljava/lang/Object;Lorg/apache/hadoop/hive/serde2/objectinspector/PrimitiveObjectInspector;)Ljava/sql/Timestamp; > Jan 09 03:38:45 at > org.apache.flink.connectors.hive.HiveDialectQueryITCase.testNullLiteralAsArgument(HiveDialectQueryITCase.java:959) > Jan 09 03:38:45 at java.lang.reflect.Method.invoke(Method.java:498) > Jan 09 03:38:45 > Error: 03:38:45 03:38:45.662 [ERROR] > org.apache.flink.connectors.hive.HiveDialectQueryITCase.testCastTimeStampToDecimal > -- Time elapsed: 0.007 s <<< ERROR! > Jan 09 03:38:45 org.apache.flink.table.api.ValidationException: Table with > identifier 'test-catalog.default.t1' does not exist. > Jan 09 03:38:45 at > org.apache.flink.table.catalog.CatalogManager.dropTableInternal(CatalogManager.java:1266) > Jan 09 03:38:45 at > org.apache.flink.table.catalog.CatalogManager.dropTable(CatalogManager.java:1206) > Jan 09 03:38:45 at > org.apache.flink.table.operations.ddl.DropTableOperation.execute(DropTableOperation.java:74) > Jan 09 03:38:45 at > org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1107) > Jan 09 03:38:45 at > org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:735) > Jan 09 03:38:45 at > org.apache.flink.connectors.hive.HiveDialectQueryITCase.testCastTimeStampToDecimal(HiveDialectQueryITCase.java:835) > Jan 09 03:38:45 at java.lang.reflect.Method.invoke(Method.java:498) > Jan 09 03:38:45 > Error: 03:38:45 03:38:45.663 [ERROR] > org.apache.flink.connectors.hive.HiveDialectQueryITCase.testInsertDirectory > -- Time elapsed: 7.326 s <<< FAILURE! 
> Jan 09 03:38:45 org.opentest4j.AssertionFailedError: > Jan 09 03:38:45 > Jan 09 03:38:45 expected: "A:english=90#math=100#history=85" > Jan 09 03:38:45 but was: "A:english=90math=100history=85" > Jan 09 03:38:45 at > sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) > Jan 09 03:38:45 at > sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) > Jan 09 03:38:45 at > sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) > Jan 09 03:38:45 at > org.apache.flink.connectors.hive.HiveDialectQueryITCase.testInsertDirectory(HiveDialectQueryITCase.java:498) > Jan 09 03:38:45 at java.lang.reflect.Method.invoke(Method.java:498) > {code} > Additionally, the {{HiveITCase}} in the e2e test suite is affected: > {code} > Error: 05:20:20 05:20:20.949 [ERROR] Tests run: 1, Failures: 0, Errors: 1, > Skipped: 0, Time elapsed: 0.106 s <<< FAILURE! -- in > org.apache.flink.tests.hive.HiveITCase > Error: 05:20:20 05:20:20.949 [ERROR] org.apache.flink.tests.hive.HiveITCase > -- Time elapsed: 0.106 s <<< ERROR! > Jan 07 05:20:20 java.lang.ExceptionInInitializerError > Jan 07 05:20:20 at sun.misc.Unsafe.ensureClassInitialized(Native Method) > Jan 07 05:20:20 at > java.lang.reflect.Field.acquireFieldAccessor(Field.java:1088) > Jan 07 05:20:20 at >
[PR] [FLINK-34034][docs] Update the query hint docs to clarify the resolution of conflicts in kv hint and list hint [flink]
swuferhong opened a new pull request, #24077: URL: https://github.com/apache/flink/pull/24077

## What is the purpose of the change

This PR aims to update the query hint docs to clarify the resolution of conflicts in kv hints and list hints.

## Brief change log

- Update the query hint docs to clarify the resolution of conflicts in kv hints and list hints.

## Verifying this change

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): no
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
- The serializers: no
- The runtime per-record code paths (performance sensitive): no
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
- The S3 file system connector: no

## Documentation

- Does this pull request introduce a new feature? no
- If yes, how is the feature documented? docs

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-34034) When kv hint and list hint handle duplicate query hints, the results are different.
[ https://issues.apache.org/jira/browse/FLINK-34034?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-34034: --- Labels: pull-request-available (was: ) > When kv hint and list hint handle duplicate query hints, the results are > different. > --- > > Key: FLINK-34034 > URL: https://issues.apache.org/jira/browse/FLINK-34034 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Reporter: xuyang >Assignee: xuyang >Priority: Minor > Labels: pull-request-available > > When there are duplicate keys in the kv hint, calcite will overwrite the > previous value with the later value. > {code:java} > @TestTemplate > def test(): Unit = { > val sql = > "SELECT /*+ LOOKUP('table'='D', 'retry-predicate'='lookup_miss', > 'retry-strategy'='fixed_delay', 'fixed-delay'='10s','max-attempts'='3', > 'max-attempts'='4') */ * FROM MyTable AS T JOIN LookupTable " + > "FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id" > util.verifyExecPlan(sql) > } {code} > {code:java} > Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, > id, name, age]) > +- LookupJoin(table=[default_catalog.default_database.LookupTable], > joinType=[InnerJoin], lookup=[id=a], select=[a, b, c, proctime, rowtime, id, > name, age], retry=[lookup_miss, FIXED_DELAY, 1ms, 4]) > +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], > fields=[a, b, c, proctime, rowtime]) > {code} > But when a list hint is duplicated (such as a join hint), we will choose the > first one as the effective hint. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
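The issue above describes two different conflict-resolution behaviors: for kv hints, Calcite lets a later duplicate key overwrite the earlier value, while for list hints (such as join hints) the first occurrence wins. A minimal illustrative sketch of the two strategies (plain Java, not Flink/Calcite code; all names here are made up for illustration):

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class HintConflictSketch {

    /** KV-hint style: collecting options into a map makes the later duplicate key win. */
    static Map<String, String> resolveKvHint(List<Map.Entry<String, String>> options) {
        Map<String, String> resolved = new LinkedHashMap<>();
        for (Map.Entry<String, String> option : options) {
            // Duplicate key: the later value silently overwrites the earlier one.
            resolved.put(option.getKey(), option.getValue());
        }
        return resolved;
    }

    /** List-hint style: among duplicated hints, the first occurrence is the effective one. */
    static String resolveListHint(List<String> conflictingHints) {
        return conflictingHints.get(0);
    }
}
```

This is exactly the asymmetry seen in the `LOOKUP` example above, where the duplicated `'max-attempts'` kv option resolves to the second value (`4`).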
Re: [PR] [FLINK-33705] Upgrade to flink-shaded 18.0 [flink]
snuyanzin commented on PR #23838: URL: https://github.com/apache/flink/pull/23838#issuecomment-1888545344 Thanks for having a look. I will rebase onto the latest branch and split it into two commits: one for adopting the Jackson breaking change mentioned in the description, and another for the actual update. It should simplify code navigation in the future.
[jira] [Resolved] (FLINK-33946) RocksDb sets setAvoidFlushDuringShutdown to true to speed up Task Cancel
[ https://issues.apache.org/jira/browse/FLINK-33946?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hangxiang Yu resolved FLINK-33946. -- Resolution: Fixed merged aa0c1b5e into master > RocksDb sets setAvoidFlushDuringShutdown to true to speed up Task Cancel > > > Key: FLINK-33946 > URL: https://issues.apache.org/jira/browse/FLINK-33946 > Project: Flink > Issue Type: Improvement > Components: Runtime / State Backends >Affects Versions: 1.19.0 >Reporter: Yue Ma >Assignee: Yue Ma >Priority: Major > Labels: pull-request-available > Fix For: 1.19.0 > > > When a Job fails, the task needs to be canceled and re-deployed. > RocksDBStatebackend will call RocksDB.close when disposing. > {code:java} > if (!shutting_down_.load(std::memory_order_acquire) && > has_unpersisted_data_.load(std::memory_order_relaxed) && > !mutable_db_options_.avoid_flush_during_shutdown) { > if (immutable_db_options_.atomic_flush) { > autovector cfds; > SelectColumnFamiliesForAtomicFlush(); > mutex_.Unlock(); > Status s = > AtomicFlushMemTables(cfds, FlushOptions(), FlushReason::kShutDown); > s.PermitUncheckedError(); //**TODO: What to do on error? > mutex_.Lock(); > } else { > for (auto cfd : *versions_->GetColumnFamilySet()) { > if (!cfd->IsDropped() && cfd->initialized() && !cfd->mem()->IsEmpty()) { > cfd->Ref(); > mutex_.Unlock(); > Status s = FlushMemTable(cfd, FlushOptions(), FlushReason::kShutDown); > s.PermitUncheckedError(); //**TODO: What to do on error? > mutex_.Lock(); > cfd->UnrefAndTryDelete(); > } > } > } {code} > By default (avoid_flush_during_shutdown=false) RocksDb requires FlushMemtable > when Close. When the disk pressure is high or the Memtable is large, this > process will be more time-consuming, which will cause the Task to get stuck > in the Canceling stage and affect the speed of job Failover. > In fact, it is completely unnecessary to Flush memtable when Flink Task is > Close, because the data can be replayed from Checkpoint. 
> So we can set avoid_flush_during_shutdown to true to speed up Task Failover
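The decision the issue quotes from RocksDB's `DBImpl` close path reduces to a single boolean condition. The following plain-Java model (not the RocksDB API) restates that condition so the effect of setting `avoid_flush_during_shutdown` is easy to see:

```java
/** Plain-Java model of the shutdown decision quoted above from RocksDB's DBImpl close path. */
class ShutdownFlushSketch {

    // Mirrors the C++ condition:
    //   !shutting_down_ && has_unpersisted_data_ && !mutable_db_options_.avoid_flush_during_shutdown
    static boolean flushesMemtablesOnClose(
            boolean shuttingDown, boolean hasUnpersistedData, boolean avoidFlushDuringShutdown) {
        return !shuttingDown && hasUnpersistedData && !avoidFlushDuringShutdown;
    }
}
```

With `avoidFlushDuringShutdown = true`, the condition is always false, so close never blocks on flushing memtables, which is safe for Flink because the state can be replayed from the checkpoint anyway.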
Re: [PR] [FLINK-33946] [rocksdb] set AvoidFlushDuringShutdown to true for avoiding Flush when Cancel Task [flink]
masteryhx closed pull request #24069: [FLINK-33946] [rocksdb] set AvoidFlushDuringShutdown to true for avoiding Flush when Cancel Task URL: https://github.com/apache/flink/pull/24069
[jira] [Resolved] (FLINK-34051) Fix equals/hashCode/toString for SavepointRestoreSettings
[ https://issues.apache.org/jira/browse/FLINK-34051?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hangxiang Yu resolved FLINK-34051. -- Fix Version/s: 1.19.0 Resolution: Fixed merged 7745ef83 and ecb68704 into master > Fix equals/hashCode/toString for SavepointRestoreSettings > - > > Key: FLINK-34051 > URL: https://issues.apache.org/jira/browse/FLINK-34051 > Project: Flink > Issue Type: Bug > Components: Runtime / Checkpointing >Affects Versions: 1.19.0 >Reporter: Hangxiang Yu >Assignee: Hangxiang Yu >Priority: Minor > Labels: pull-request-available > Fix For: 1.19.0 > > > SavepointRestoreSettings#equals/hashCode/toString missed restoreMode property
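The bug class behind FLINK-34051 (an `equals`/`hashCode` pair that omits a field) is easy to illustrate. The sketch below is a hypothetical settings class, not Flink's actual `SavepointRestoreSettings`; it shows how two objects that differ only in the forgotten field wrongly compare as equal:

```java
import java.util.Objects;

/** Hypothetical illustration of the bug class: equals/hashCode omit a field. */
class SettingsSketch {
    final String savepointPath;
    final String restoreMode; // forgotten in equals/hashCode below

    SettingsSketch(String savepointPath, String restoreMode) {
        this.savepointPath = savepointPath;
        this.restoreMode = restoreMode;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof SettingsSketch)) {
            return false;
        }
        SettingsSketch that = (SettingsSketch) o;
        return Objects.equals(savepointPath, that.savepointPath); // bug: restoreMode not compared
    }

    @Override
    public int hashCode() {
        return Objects.hash(savepointPath); // bug: restoreMode not hashed
    }
}
```

The fix is simply to include the missing property in all three of `equals`, `hashCode`, and `toString`.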
Re: [PR] [FLINK-34051][checkpoint] Fix equals/hashCode/toString for SavepointRestoreSettings [flink]
masteryhx closed pull request #24066: [FLINK-34051][checkpoint] Fix equals/hashCode/toString for SavepointRestoreSettings URL: https://github.com/apache/flink/pull/24066
Re: [PR] [FLINK-34067] Fix javacc warnings in flink-sql-parser [flink]
flinkbot commented on PR #24076: URL: https://github.com/apache/flink/pull/24076#issuecomment-1888409523 ## CI report: * 319b84aca99ca02d59dbfb7139a92472d0b19fd3 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
Re: [PR] [FLINK-34066][table-planner] Fix LagFunction throw NPE when input argument are not null [flink]
flinkbot commented on PR #24075: URL: https://github.com/apache/flink/pull/24075#issuecomment-1888405606 ## CI report: * a1aa7a2284cee539e17b951a585962a416360104 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
[jira] [Updated] (FLINK-34067) Fix javacc warnings in flink-sql-parser
[ https://issues.apache.org/jira/browse/FLINK-34067?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-34067: --- Labels: pull-request-available (was: ) > Fix javacc warnings in flink-sql-parser > --- > > Key: FLINK-34067 > URL: https://issues.apache.org/jira/browse/FLINK-34067 > Project: Flink > Issue Type: Improvement >Affects Versions: 1.19.0 >Reporter: Jim Hughes >Assignee: Jim Hughes >Priority: Minor > Labels: pull-request-available > > While extending the Flink SQL parser, I noticed these two warnings: > ``` > [INFO] — javacc:2.4:javacc (javacc) @ flink-sql-parser --- > > Java Compiler Compiler Version 4.0 (Parser Generator) > > (type "javacc" with no arguments for help) > > Reading from file > .../flink-table/flink-sql-parser/target/generated-sources/javacc/Parser.jj . > . . > Note: UNICODE_INPUT option is specified. Please make sure you create the > parser/lexer using a Reader with the correct character encoding. > Warning: Choice conflict involving two expansions at > > line 2043, column 13 and line 2052, column 9 respectively. > > A common prefix is: "IF" > Consider using > a lookahead of 2 for earlier expansion. > > Warning: Choice conflict involving two expansions at > > line 2097, column 13 and line 2105, column 8 respectively. > > A common prefix is: "IF" > > Consider using a lookahead of 2 for earlier expansion. > ``` > As the warning suggestions, adding `LOOKAHEAD(2)` in a few places addresses > the warning. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] [FLINK-34067] Fix javacc warnings in flink-sql-parser [flink]
jnh5y opened a new pull request, #24076: URL: https://github.com/apache/flink/pull/24076 ## What is the purpose of the change Address the two warnings in the build for the Flink SQL Parser: ``` [INFO] — javacc:2.4:javacc (javacc) @ flink-sql-parser --- Java Compiler Compiler Version 4.0 (Parser Generator) (type "javacc" with no arguments for help) Reading from file .../flink-table/flink-sql-parser/target/generated-sources/javacc/Parser.jj . . . Note: UNICODE_INPUT option is specified. Please make sure you create the parser/lexer using a Reader with the correct character encoding. Warning: Choice conflict involving two expansions at line 2043, column 13 and line 2052, column 9 respectively. A common prefix is: "IF" Consider using a lookahead of 2 for earlier expansion. Warning: Choice conflict involving two expansions at line 2097, column 13 and line 2105, column 8 respectively. A common prefix is: "IF" Consider using a lookahead of 2 for earlier expansion. ``` ## Brief change log Adds `LOOKAHEAD(2)` to the necessary places in `parserImpls.ftl`. ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. One can verify the change by building with `mvn -Pfast -f flink-table/flink-sql-parser install` ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable) -- This is an automated message from the Apache Git Service. 
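The javacc warnings above arise because two grammar alternatives share the common prefix "IF", so a single token of lookahead cannot choose a branch; `LOOKAHEAD(2)` resolves the choice by peeking at the token after the prefix. The same idea in a tiny hand-rolled parser (illustrative Java, not the generated Flink parser or `parserImpls.ftl`):

```java
import java.util.List;

/** Illustrates why two tokens of lookahead are needed when branches share the prefix "IF". */
class LookaheadSketch {

    static String parseIfClause(List<String> tokens) {
        // With one token of lookahead we only see "IF" and cannot pick a branch.
        if (!tokens.isEmpty() && tokens.get(0).equals("IF")) {
            // LOOKAHEAD(2): inspect the token *after* the common "IF" prefix.
            if (tokens.size() > 1 && tokens.get(1).equals("NOT")) {
                return "IF NOT EXISTS";
            }
            return "IF EXISTS";
        }
        return "NONE";
    }
}
```

In the generated JavaCC parser, adding `LOOKAHEAD(2)` before the earlier of the two conflicting expansions has the same effect: the parser commits to a branch only after seeing the second token.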
[jira] [Updated] (FLINK-34067) Fix javacc warnings in flink-sql-parser
[ https://issues.apache.org/jira/browse/FLINK-34067?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jim Hughes updated FLINK-34067: --- Description: While extending the Flink SQL parser, I noticed these two warnings: ``` [INFO] — javacc:2.4:javacc (javacc) @ flink-sql-parser --- Java Compiler Compiler Version 4.0 (Parser Generator) (type "javacc" with no arguments for help) Reading from file .../flink-table/flink-sql-parser/target/generated-sources/javacc/Parser.jj . . . Note: UNICODE_INPUT option is specified. Please make sure you create the parser/lexer using a Reader with the correct character encoding. Warning: Choice conflict involving two expansions at line 2043, column 13 and line 2052, column 9 respectively. A common prefix is: "IF" Consider using a lookahead of 2 for earlier expansion. Warning: Choice conflict involving two expansions at line 2097, column 13 and line 2105, column 8 respectively. A common prefix is: "IF" Consider using a lookahead of 2 for earlier expansion. ``` As the warning suggestions, adding `LOOKAHEAD(2)` in a few places addresses the warning. was: While extending the Flink SQL parser, I noticed these two warnings: ``` [INFO] --- javacc:2.4:javacc (javacc) @ flink-sql-parser --- Java Compiler Compiler Version 4.0 (Parser Generator) (type "javacc" with no arguments for help) Reading from file .../flink-table/flink-sql-parser/target/generated-sources/javacc/Parser.jj . . . Note: UNICODE_INPUT option is specified. Please make sure you create the parser/lexer using a Reader with the correct character encoding. Warning: Choice conflict involving two expansions at line 2043, column 13 and line 2052, column 9 respectively. A common prefix is: "IF" Consider using a lookahead of 2 for earlier expansion. Warning: Choice conflict involving two expansions at line 2097, column 13 and line 2105, column 8 respectively. A common prefix is: "IF" Consider using a lookahead of 2 for earlier expansion. 
``` As the warning suggestions, adding `LOOKAHEAD(2)` in a few places addresses the warning. > Fix javacc warnings in flink-sql-parser > --- > > Key: FLINK-34067 > URL: https://issues.apache.org/jira/browse/FLINK-34067 > Project: Flink > Issue Type: Improvement >Affects Versions: 1.19.0 >Reporter: Jim Hughes >Assignee: Jim Hughes >Priority: Minor > > While extending the Flink SQL parser, I noticed these two warnings: > ``` > [INFO] — javacc:2.4:javacc (javacc) @ flink-sql-parser --- > > Java Compiler Compiler Version 4.0 (Parser Generator) > > (type "javacc" with no arguments for help) > > Reading from file > .../flink-table/flink-sql-parser/target/generated-sources/javacc/Parser.jj . > . . > Note: UNICODE_INPUT option is specified. Please make sure you create the > parser/lexer using a Reader with the correct character encoding. > Warning: Choice conflict involving two expansions at > > line 2043, column 13 and line 2052, column 9 respectively. > > A common prefix is: "IF" >
[jira] [Created] (FLINK-34067) Fix javacc warnings in flink-sql-parser
Jim Hughes created FLINK-34067: -- Summary: Fix javacc warnings in flink-sql-parser Key: FLINK-34067 URL: https://issues.apache.org/jira/browse/FLINK-34067 Project: Flink Issue Type: Improvement Reporter: Jim Hughes Assignee: Jim Hughes While extending the Flink SQL parser, I noticed these two warnings: ``` [INFO] --- javacc:2.4:javacc (javacc) @ flink-sql-parser --- Java Compiler Compiler Version 4.0 (Parser Generator) (type "javacc" with no arguments for help) Reading from file .../flink-table/flink-sql-parser/target/generated-sources/javacc/Parser.jj . . . Note: UNICODE_INPUT option is specified. Please make sure you create the parser/lexer using a Reader with the correct character encoding. Warning: Choice conflict involving two expansions at line 2043, column 13 and line 2052, column 9 respectively. A common prefix is: "IF" Consider using a lookahead of 2 for earlier expansion. Warning: Choice conflict involving two expansions at line 2097, column 13 and line 2105, column 8 respectively. A common prefix is: "IF" Consider using a lookahead of 2 for earlier expansion. ``` As the warnings suggest, adding `LOOKAHEAD(2)` in a few places addresses them.
[jira] [Updated] (FLINK-34066) LagFunction throw NPE when input argument are not null
[ https://issues.apache.org/jira/browse/FLINK-34066?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-34066: --- Labels: pull-request-available (was: ) > LagFunction throw NPE when input argument are not null > -- > > Key: FLINK-34066 > URL: https://issues.apache.org/jira/browse/FLINK-34066 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Planner >Affects Versions: 1.18.0 >Reporter: Yunhong Zheng >Priority: Major > Labels: pull-request-available > Fix For: 1.19.0 > > > LagFunction throw NPE when input argument are not null. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-34067) Fix javacc warnings in flink-sql-parser
[ https://issues.apache.org/jira/browse/FLINK-34067?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jim Hughes updated FLINK-34067: --- Affects Version/s: 1.19.0 > Fix javacc warnings in flink-sql-parser > --- > > Key: FLINK-34067 > URL: https://issues.apache.org/jira/browse/FLINK-34067 > Project: Flink > Issue Type: Improvement >Affects Versions: 1.19.0 >Reporter: Jim Hughes >Assignee: Jim Hughes >Priority: Minor > > While extending the Flink SQL parser, I noticed these two warnings: > ``` > [INFO] --- javacc:2.4:javacc (javacc) @ flink-sql-parser --- > > Java Compiler Compiler Version 4.0 (Parser Generator) > > (type "javacc" with no arguments for help) > > Reading from file > .../flink-table/flink-sql-parser/target/generated-sources/javacc/Parser.jj . > . . > Note: UNICODE_INPUT option is specified. Please make sure you create the > parser/lexer using a Reader with the correct character encoding. > Warning: Choice conflict involving two expansions at > > line 2043, column 13 and line 2052, column 9 respectively. > > A common prefix is: "IF" > Consider using > a lookahead of 2 for earlier expansion. > > Warning: Choice conflict involving two expansions at > > line 2097, column 13 and line 2105, column 8 respectively. > > A common prefix is: "IF" > > Consider using a lookahead of 2 for earlier expansion. > ``` > As the warning suggestions, adding `LOOKAHEAD(2)` in a few places addresses > the warning. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] [FLINK-34066][table-planner] Fix LagFunction throw NPE when input argument are not null [flink]
swuferhong opened a new pull request, #24075: URL: https://github.com/apache/flink/pull/24075 ## What is the purpose of the change Fix LagFunction throwing an NPE when its input arguments are not null. ## Brief change log - Fix LagFunction throwing an NPE when its input arguments are not null. - Add a test to cover the NPE case. ## Verifying this change - Added a test to cover the NPE case. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? no - If yes, how is the feature documented? no docs
[jira] [Commented] (FLINK-34007) Flink Job stuck in suspend state after losing leadership in HA Mode
[ https://issues.apache.org/jira/browse/FLINK-34007?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17805871#comment-17805871 ] Yang Wang commented on FLINK-34007: --- [~mapohl] Could you please confirm whether "multi-component leader election" will clean up the leader annotation on the ConfigMap when leadership is lost? It seems that the fabric8 Kubernetes client leader elector will not work properly when {{run()}} is called more than once if we do not clean up the leader annotation. > Flink Job stuck in suspend state after losing leadership in HA Mode > --- > > Key: FLINK-34007 > URL: https://issues.apache.org/jira/browse/FLINK-34007 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Affects Versions: 1.16.3, 1.17.2, 1.18.1, 1.18.2 >Reporter: Zhenqiu Huang >Priority: Major > Attachments: Debug.log, job-manager.log > > > The observation is that the Job manager goes to a suspend state, with a failed > container not able to register itself to the resource manager after timeout. > JM Log, see attached >
[jira] [Created] (FLINK-34066) LagFunction throw NPE when input argument are not null
Yunhong Zheng created FLINK-34066: - Summary: LagFunction throw NPE when input argument are not null Key: FLINK-34066 URL: https://issues.apache.org/jira/browse/FLINK-34066 Project: Flink Issue Type: Improvement Components: Table SQL / Planner Affects Versions: 1.18.0 Reporter: Yunhong Zheng Fix For: 1.19.0 LagFunction throw NPE when input argument are not null.
Re: [PR] [FLINK-33576][core] Introduce new Flink conf file 'config.yaml' supporting standard YAML syntax. [flink]
JunRuiLee commented on PR #23852: URL: https://github.com/apache/flink/pull/23852#issuecomment-1888377286 @flinkbot run azure
Re: [PR] [FLINK-33929][jdbc-connector] Support JDBC String field read Fragment read [flink-connector-jdbc]
zhilinli123 commented on PR #87: URL: https://github.com/apache/flink-connector-jdbc/pull/87#issuecomment-1888373294 > @zhilinli123 I still don't understand the Jira ticket, so I can't really review it. Perhaps @snuyanzin or @eskabetxe understand the goal of this PR. Hi @MartijnVisser, the set of types the JDBC connector currently supports for partitioned (split) reads is limited; string-typed fields cannot be used as the partition column. Many tables are designed with a UUID as the key and have no auto-increment primary key, so Flink has no way to read them in parallel splits. This PR adds that missing support.
Re: [PR] [FLINK-33958] Implement restore tests for IntervalJoin node [flink]
bvarghese1 commented on PR #24009: URL: https://github.com/apache/flink/pull/24009#issuecomment-1888372875 > From reading the code for `StreamExecIntervalJoin`, I noticed pad/filter-left/right transformations. Seems like those can be produced when the `TABLE_EXEC_LEGACY_TRANSFORMATION_UIDS` is used. > > Since it is a legacy thing, not sure if we want to test it or not. `TABLE_EXEC_LEGACY_TRANSFORMATION_UIDS` is a deprecated config. It is used in the `createNegativeWindowSizeJoin` function and I have added a test case which covers that path.
Re: [PR] [FLINK-34051][checkpoint] Fix equals/hashCode/toString for SavepointRestoreSettings [flink]
ljz2051 commented on PR #24066: URL: https://github.com/apache/flink/pull/24066#issuecomment-1888366794 Thanks for the update, LGTM
Re: [PR] [FLINK-33792] Generate the same code for the same logic [flink]
zoudan commented on PR #23984: URL: https://github.com/apache/flink/pull/23984#issuecomment-1888360571 @lsyldliu I have updated my code, please have a look when you have time.
Re: [PR] [FLINK-31788][table] TableAggregateFunction supports emitUpdateWithRetract [flink]
flinkbot commented on PR #24074: URL: https://github.com/apache/flink/pull/24074#issuecomment-1888345731 ## CI report: * 970e31067f5a893ea90b3f8a33064051e597bfe3 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
Re: [PR] [FLINK-33450][autoscaler] Support the JDBCAutoScalerStateStore [flink-kubernetes-operator]
1996fanrui commented on code in PR #741: URL: https://github.com/apache/flink-kubernetes-operator/pull/741#discussion_r1449726286 ## flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/JobStateView.java: ## @@ -0,0 +1,262 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.autoscaler.jdbc.state; + +import org.apache.flink.annotation.VisibleForTesting; + +import javax.annotation.Nonnull; +import javax.annotation.concurrent.NotThreadSafe; + +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.function.Function; + +import static org.apache.flink.autoscaler.jdbc.state.JobStateView.State.NEEDS_CREATE; +import static org.apache.flink.autoscaler.jdbc.state.JobStateView.State.NEEDS_DELETE; +import static org.apache.flink.autoscaler.jdbc.state.JobStateView.State.NEEDS_UPDATE; +import static org.apache.flink.autoscaler.jdbc.state.JobStateView.State.NOT_NEEDED; +import static org.apache.flink.autoscaler.jdbc.state.JobStateView.State.UP_TO_DATE; +import static org.apache.flink.util.Preconditions.checkState; + +/** The view of job state. 
*/ +@NotThreadSafe +public class JobStateView { + +/** + * The state of state type about the cache and database. + * + * Note: {@link #inLocally} and {@link #inDatabase} are only for understand, we don't use + * them. + */ +@SuppressWarnings("unused") +enum State { + +/** State doesn't exist at database, and it's not used so far, so it's not needed. */ +NOT_NEEDED(false, false, false), +/** State is only stored locally, not created in JDBC database yet. */ +NEEDS_CREATE(true, false, true), +/** State exists in JDBC database but there are newer local changes. */ +NEEDS_UPDATE(true, true, true), +/** State is stored locally and in database, and they are same. */ +UP_TO_DATE(true, true, false), +/** State is stored in database, but it's deleted in local. */ +NEEDS_DELETE(false, true, true); Review Comment: > It would be good to move more of the logic into the core, to avoid duplicating / writing similar logic. That makes sense. After analyzing, some of the logic is the same, for example: - `ConfigMapStore` and `JDBCStore` can be abstracted into a `StringStateStore` interface - They support `put`, `get` and `remove` - The parameters of `ConfigMapStore` are (JobContext, String key, String value). - The parameters of `JDBCStore` are (String jobKey, StateType stateType, String value). - We can define an interface `StringStateStore` whose parameters are `(JobContext, StateType stateType, String value)`. - `KubernetesAutoScalerStateStore` and `JDBCAutoScalerStateStore` can be abstracted into an `AbstractAutoscalerStateStore` - They serialize and compress the original state to a String. - `AbstractAutoscalerStateStore` can reuse the serialize-and-compress logic - `KubernetesAutoScalerStateStore` supports a limit on the state value size - We can define a parameter for `AbstractAutoscalerStateStore`; the limit is disabled by default, and `KubernetesAutoScalerStateStore` can enable it. I created FLINK-34065 to follow up on this.
[jira] [Created] (FLINK-34065) Design AbstractAutoscalerStateStore to support serialize State to String
Rui Fan created FLINK-34065: --- Summary: Design AbstractAutoscalerStateStore to support serialize State to String Key: FLINK-34065 URL: https://issues.apache.org/jira/browse/FLINK-34065 Project: Flink Issue Type: Sub-task Components: Autoscaler Reporter: Rui Fan Assignee: Rui Fan Some logic of {{KubernetesAutoScalerStateStore}} and {{JDBCAutoScalerStateStore}} is similar; we can share common code. * {{ConfigMapStore}} and {{JDBCStore}} can be abstracted into a {{StringStateStore}} interface ** They support {{put}}, {{get}} and {{remove}} ** The parameters of {{ConfigMapStore}} are (JobContext, String key, String value). ** The parameters of {{JDBCStore}} are (String jobKey, StateType stateType, String value). ** We can define an interface {{StringStateStore}} whose parameters are {{(JobContext, StateType stateType, String value)}}. * {{KubernetesAutoScalerStateStore}} and {{JDBCAutoScalerStateStore}} can be abstracted into an {{AbstractAutoscalerStateStore}} ** They serialize and compress the original state to a String. ** {{AbstractAutoscalerStateStore}} can reuse the serialize-and-compress logic ** {{KubernetesAutoScalerStateStore}} supports a limit on the state value ** We can define a parameter for {{AbstractAutoscalerStateStore}}; the limit is disabled by default, and {{KubernetesAutoScalerStateStore}} can enable it.
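The abstraction proposed in FLINK-34065 can be sketched as follows. All names and signatures below are illustrative assumptions, not the actual flink-kubernetes-operator API: a raw string-storage interface implemented per backend, plus a shared base class that owns serialization and an optional size limit.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Raw string storage; ConfigMap- and JDBC-backed variants would implement this. (Illustrative names.) */
interface StringStateStore<CTX> {
    void put(CTX jobContext, String stateType, String value);
    Optional<String> get(CTX jobContext, String stateType);
    void remove(CTX jobContext, String stateType);
}

/** Shared logic (serialization/compression, optional size limit) lives here; backends stay dumb. */
abstract class AbstractStateStoreSketch<CTX> {
    private final StringStateStore<CTX> backend;
    private final int maxValueLength; // <= 0 disables the limit (e.g. JDBC); a ConfigMap backend would set one

    AbstractStateStoreSketch(StringStateStore<CTX> backend, int maxValueLength) {
        this.backend = backend;
        this.maxValueLength = maxValueLength;
    }

    final void storeState(CTX ctx, String stateType, String serializedState) {
        if (maxValueLength > 0 && serializedState.length() > maxValueLength) {
            throw new IllegalStateException("State value exceeds backend size limit");
        }
        backend.put(ctx, stateType, serializedState);
    }

    final Optional<String> loadState(CTX ctx, String stateType) {
        return backend.get(ctx, stateType);
    }
}

/** Minimal in-memory backend, used only to exercise the sketch. */
class InMemoryStateStore implements StringStateStore<String> {
    private final Map<String, String> data = new HashMap<>();

    public void put(String job, String type, String value) { data.put(job + "/" + type, value); }
    public Optional<String> get(String job, String type) { return Optional.ofNullable(data.get(job + "/" + type)); }
    public void remove(String job, String type) { data.remove(job + "/" + type); }
}
```

The size limit being a constructor parameter (disabled by default) matches the ticket's suggestion that only the Kubernetes ConfigMap backend needs to enable it.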
Re: [PR] [FLINK-33930] Canceled stop job status exception [flink-kubernetes-operator]
hunter-cloud09 commented on code in PR #740: URL: https://github.com/apache/flink-kubernetes-operator/pull/740#discussion_r1449726381 ## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java: ## @@ -369,16 +370,17 @@ protected void cancelJob( deleteClusterDeployment( deployment.getMetadata(), deploymentStatus, conf, true); } + deploymentStatus.getJobStatus().setState(JobStatus.FINISHED.name()); break; case LAST_STATE: deleteClusterDeployment( deployment.getMetadata(), deploymentStatus, conf, false); + deploymentStatus.getJobStatus().setState(JobStatus.FINISHED.name()); break; default: throw new RuntimeException("Unsupported upgrade mode " + upgradeMode); } } -deploymentStatus.getJobStatus().setState(JobStatus.FINISHED.name()); Review Comment: We ran into this problem while doing job operations: the intermediate state of a canceled job was always `FINISHED`, which is wrong for our use case, so I don't think the current behavior is correct. On another note, what was the reason for doing it this way in the first place? Is it normal for a cancel request to go through an intermediate FINISHED state?
[PR] [FLINK-31788][table] TableAggregateFunction supports emitUpdateWithRetract [flink]
LadyForest opened a new pull request, #24074: URL: https://github.com/apache/flink/pull/24074 ## What is the purpose of the change This PR is cherry-picked from https://github.com/apache/flink/pull/24051 ## Brief change log - Planning phase: Check whether the `emitUpdateWithRetract` method is implemented. If so, the `TableAggsHandleFunction#emitValue` method should invoke `TableAggregateFunction#emitUpdateWithRetract` instead of `TableAggregateFunction#emitValue`. Meanwhile, change the collector type to `RetractableCollector` and implement `RetractableCollector#retract`. - Runtime phase: Do not invoke emitValue to retract previously sent unchanged data. ## Verifying this change This change added tests and can be verified as follows: - `TableAggregateITCase#testFlagAggregateWithOrWithoutIncrementalUpdate` ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): yes - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? no - If yes, how is the feature documented? not applicable
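The incremental-emit contract described in the change log can be illustrated outside Flink with a toy top-1 aggregate. The `RetractableCollector` below is a stand-in modeled on the idea, not Flink's actual interface:

```java
import java.util.ArrayList;
import java.util.List;

/** Stand-in for a retractable collector: can emit rows and take them back. */
interface RetractableCollector<T> {
    void collect(T value);
    void retract(T value);
}

/** Toy top-1 accumulator emitting incremental updates with retractions. */
class TopOneAgg {
    private Integer best; // current max, null before the first input

    /**
     * Instead of re-emitting the full result on every input (the emitValue
     * style), the update-with-retract style only retracts the old max and
     * emits the new one when the result actually changed.
     */
    void accumulate(int value, RetractableCollector<Integer> out) {
        if (best == null) {
            best = value;
            out.collect(best);
        } else if (value > best) {
            out.retract(best);
            best = value;
            out.collect(best);
        } // result unchanged: emit nothing at all
    }
}
```

Feeding 3, 1, 5 produces the changelog +3, -3, +5; the input 1 emits nothing, which is exactly the "do not retract previously sent unchanged data" behavior the PR describes.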
[jira] [Updated] (FLINK-31788) Add back Support emitValueWithRetract for TableAggregateFunction
[ https://issues.apache.org/jira/browse/FLINK-31788?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jane Chan updated FLINK-31788: -- Affects Version/s: 1.18.1 > Add back Support emitValueWithRetract for TableAggregateFunction > > > Key: FLINK-31788 > URL: https://issues.apache.org/jira/browse/FLINK-31788 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.14.0, 1.14.2, 1.14.3, 1.14.4, 1.14.5, 1.15.0, 1.15.1, > 1.16.0, 1.17.0, 1.15.2, 1.14.6, 1.15.3, 1.16.1, 1.15.4, 1.16.2, 1.18.0, > 1.17.1, 1.16.3, 1.17.2, 1.18.1 >Reporter: Feng Jin >Assignee: Jane Chan >Priority: Major > Labels: pull-request-available > > This feature was originally implemented in the old planner: > [https://github.com/apache/flink/pull/8550/files] > However, this feature was not implemented in the new planner, the Blink planner. > With the removal of the old planner in version 1.14 > [https://github.com/apache/flink/pull/16080], this code was also removed. > > We should add it back. > > origin discuss link: > https://lists.apache.org/thread/rnvw8k3636dqhdttpmf1c9colbpw9svp
Re: [PR] [FLINK-33792] Generate the same code for the same logic [flink]
zoudan commented on PR #23984: URL: https://github.com/apache/flink/pull/23984#issuecomment-1888301428 @flinkbot run azure
[jira] [Updated] (FLINK-31788) Add back Support emitValueWithRetract for TableAggregateFunction
[ https://issues.apache.org/jira/browse/FLINK-31788?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jane Chan updated FLINK-31788: -- Affects Version/s: 1.17.2 1.16.3 1.17.1 1.18.0 1.16.2 1.15.4 1.16.1 1.15.3 1.14.6 1.15.2 1.17.0 1.16.0 1.15.1 1.15.0 1.14.5 1.14.4 1.14.3 1.14.2 1.14.0 > Add back Support emitValueWithRetract for TableAggregateFunction > > > Key: FLINK-31788 > URL: https://issues.apache.org/jira/browse/FLINK-31788 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.14.0, 1.14.2, 1.14.3, 1.14.4, 1.14.5, 1.15.0, 1.15.1, > 1.16.0, 1.17.0, 1.15.2, 1.14.6, 1.15.3, 1.16.1, 1.15.4, 1.16.2, 1.18.0, > 1.17.1, 1.16.3, 1.17.2 >Reporter: Feng Jin >Assignee: Jane Chan >Priority: Major > Labels: pull-request-available > > This feature was originally implemented in the old planner: > [https://github.com/apache/flink/pull/8550/files] > However, this feature was not implemented in the new planner , the Blink > planner. > With the removal of the old planner in version 1.14 > [https://github.com/apache/flink/pull/16080] , this code was also removed. > > We should add it back. > > origin discuss link: > https://lists.apache.org/thread/rnvw8k3636dqhdttpmf1c9colbpw9svp -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-31788) Add back Support emitValueWithRetract for TableAggregateFunction
[ https://issues.apache.org/jira/browse/FLINK-31788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17805853#comment-17805853 ] Jane Chan commented on FLINK-31788: --- Fixed in master 01569644aedb56f792c7f7e04f84612d405b0bdf > Add back Support emitValueWithRetract for TableAggregateFunction > > > Key: FLINK-31788 > URL: https://issues.apache.org/jira/browse/FLINK-31788 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Reporter: Feng Jin >Assignee: Jane Chan >Priority: Major > Labels: pull-request-available > > This feature was originally implemented in the old planner: > [https://github.com/apache/flink/pull/8550/files] > However, this feature was not implemented in the new planner , the Blink > planner. > With the removal of the old planner in version 1.14 > [https://github.com/apache/flink/pull/16080] , this code was also removed. > > We should add it back. > > origin discuss link: > https://lists.apache.org/thread/rnvw8k3636dqhdttpmf1c9colbpw9svp -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-31788][table] TableAggregateFunction supports emitUpdateWithRetract [flink]
LadyForest merged PR #24051: URL: https://github.com/apache/flink/pull/24051
[jira] [Resolved] (FLINK-20281) Window aggregation supports changelog stream input
[ https://issues.apache.org/jira/browse/FLINK-20281?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jane Chan resolved FLINK-20281. --- Fix Version/s: 1.19.0 Resolution: Fixed > Window aggregation supports changelog stream input > -- > > Key: FLINK-20281 > URL: https://issues.apache.org/jira/browse/FLINK-20281 > Project: Flink > Issue Type: New Feature > Components: Table SQL / Planner, Table SQL / Runtime >Reporter: Jark Wu >Assignee: xuyang >Priority: Major > Labels: auto-deprioritized-major, auto-deprioritized-minor, > pull-request-available > Fix For: 1.19.0 > > Attachments: screenshot-1.png > > > Currently, window aggregation doesn't support to consume a changelog stream. > This makes it impossible to do a window aggregation on changelog sources > (e.g. Kafka with Debezium format, or upsert-kafka, or mysql-cdc). -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Closed] (FLINK-20281) Window aggregation supports changelog stream input
[ https://issues.apache.org/jira/browse/FLINK-20281?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jane Chan closed FLINK-20281. - > Window aggregation supports changelog stream input > -- > > Key: FLINK-20281 > URL: https://issues.apache.org/jira/browse/FLINK-20281 > Project: Flink > Issue Type: New Feature > Components: Table SQL / Planner, Table SQL / Runtime >Reporter: Jark Wu >Assignee: xuyang >Priority: Major > Labels: auto-deprioritized-major, auto-deprioritized-minor, > pull-request-available > Fix For: 1.19.0 > > Attachments: screenshot-1.png > > > Currently, window aggregation doesn't support to consume a changelog stream. > This makes it impossible to do a window aggregation on changelog sources > (e.g. Kafka with Debezium format, or upsert-kafka, or mysql-cdc). -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-20281) Window aggregation supports changelog stream input
[ https://issues.apache.org/jira/browse/FLINK-20281?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17805852#comment-17805852 ] Jane Chan commented on FLINK-20281: --- Fixed in master df2ecdc77b23e0e40fe151ed8a7350a6db333f91 > Window aggregation supports changelog stream input > -- > > Key: FLINK-20281 > URL: https://issues.apache.org/jira/browse/FLINK-20281 > Project: Flink > Issue Type: New Feature > Components: Table SQL / Planner, Table SQL / Runtime >Reporter: Jark Wu >Assignee: xuyang >Priority: Major > Labels: auto-deprioritized-major, auto-deprioritized-minor, > pull-request-available > Attachments: screenshot-1.png > > > Currently, window aggregation doesn't support to consume a changelog stream. > This makes it impossible to do a window aggregation on changelog sources > (e.g. Kafka with Debezium format, or upsert-kafka, or mysql-cdc). -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-33879] Avoids the potential hang of Hybrid Shuffle during redistribution [flink]
jiangxin369 commented on PR #23957: URL: https://github.com/apache/flink/pull/23957#issuecomment-1888280011 @reswqa Seems that the CI failed but the cause is related to python rather than this PR. Could you help merge it?
Re: [PR] [FLINK-20281][table] support consuming cdc stream about window tvf aggregate [flink]
LadyForest merged PR #24030: URL: https://github.com/apache/flink/pull/24030
[PR] [hotfix] Correct release date for flink-shaded 18.0 [flink-web]
snuyanzin opened a new pull request, #710: URL: https://github.com/apache/flink-web/pull/710 (no comment)
Re: [PR] Add flink-shaded 18.0 release [flink-web]
snuyanzin merged PR #701: URL: https://github.com/apache/flink-web/pull/701
Re: [PR] [DRAFT] FLIP-376 DISTRIBUTED BY [flink]
flinkbot commented on PR #24073: URL: https://github.com/apache/flink/pull/24073#issuecomment-1888126956 ## CI report: * 03494cb182c4f0e3fa4e407dc1f3b06dc9db40b9 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
[PR] [DRAFT] FLIP-376 DISTRIBUTED BY [flink]
jnh5y opened a new pull request, #24073: URL: https://github.com/apache/flink/pull/24073 ## What is the purpose of the change *(For example: This pull request makes task deployment go through the blob server, rather than through RPC. That way we avoid re-transferring them on each deployment (during recovery).)* ## Brief change log *(for example:)* - *The TaskInfo is stored in the blob store on job creation time as a persistent artifact* - *Deployments RPC transmits only the blob storage reference* - *TaskManagers retrieve the TaskInfo from the blob cache* ## Verifying this change Please make sure both new and modified tests in this PR follows the conventions defined in our code quality guide: https://flink.apache.org/contributing/code-style-and-quality-common.html#testing *(Please pick either of the following options)* This change is a trivial rework / code cleanup without any test coverage. *(or)* This change is already covered by existing tests, such as *(please describe tests)*. *(or)* This change added tests and can be verified as follows: *(example:)* - *Added integration tests for end-to-end deployment with large payloads (100MB)* - *Extended integration test for recovery after master (JobManager) failure* - *Added test that validates that TaskInfo is transferred only once across recoveries* - *Manually verified the change by running a 4 node cluster with 2 JobManagers and 4 TaskManagers, a stateful streaming program, and killing one JobManager and two TaskManagers during the execution, verifying that recovery happens correctly.* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / no) - The serializers: (yes / no / don't know) - The runtime per-record code paths (performance sensitive): (yes / no / don't know) - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know) - The S3 file system connector: (yes / no / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / no) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
[jira] [Updated] (FLINK-34063) When snapshot compression is enabled, rescaling of a source operator leads to some splits getting lost
[ https://issues.apache.org/jira/browse/FLINK-34063?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ivan Burmistrov updated FLINK-34063: Priority: Critical (was: Major) > When snapshot compression is enabled, rescaling of a source operator leads to > some splits getting lost > -- > > Key: FLINK-34063 > URL: https://issues.apache.org/jira/browse/FLINK-34063 > Project: Flink > Issue Type: Bug > Environment: Can be reproduced in any environment. The most important > thing is to enable snapshot compression. >Reporter: Ivan Burmistrov >Priority: Critical > Attachments: image-2024-01-11-16-27-09-066.png, > image-2024-01-11-16-30-47-466.png > > > h2. Backstory > We've been experimenting with Autoscaling on the Flink 1.18 and faced a > pretty nasty bug. > The symptoms on our production system were as following. After a while after > deploying a job with autoscaler it started accumulating Kafka lag, and this > could only be observed via external lag measurement - from inside Flink > (measured by > {{_KafkaSourceReader_KafkaConsumer_records_lag_max_}} metric) the lag was OK: > !image-2024-01-11-16-27-09-066.png|width=887,height=263! > After some digging, it turned out that the job has lost some Kafka partitions > - i.e. it stopped consuming from them, “forgot” about their existence. That’s > why from the Flink’s perspective everything was fine - the lag was growing on > the partitions Flink no longer knew about. > This was visible on a metric called “Assigned partitions” > (KafkaSourceReader_KafkaConsumer_assigned_partitions): > !image-2024-01-11-16-30-47-466.png|width=1046,height=254! > We see on the chart that the job used to know about 20 partitions, and then > this number got dropped to 16. > This drop has been quickly connected to the job’s scaling events. Or, more > precisely, to the scaling of the source operator - with almost 100% > probability any scaling of the source operator led to partitions loss. > h2. 
Investigation > We've conducted the investigation. We use the latest Kubernetes operator and > deploy jobs with Native Kubernetes. > The reproducing scenario we used for investigation: > * Launch a job with source operator parallelism = 4, enable DEBUG logging > * Wait until it takes the first checkpoint > * Scale-up the source operator to say 5 (no need to wait for autoscaling, it > can be done via Flink UI) > * Wait until the new checkpoint is taken > * Scale-down the source operator to 3 > These simple actions with almost 100% probability led to some partitions get > lost. > After that we've downloaded all the logs and inspected them. Noticed these > strange records in logs: > {code:java} > {"timestamp":1704415753166,"is_logging_enabled":"false","logger_id":"org.apache.flink.streaming.api.operators.AbstractStreamOperator","log_level":"INFO","message":"Restoring > state for 4 split(s) to reader.","service_name":"data-beaver"} > {"timestamp":1704415753166,"is_logging_enabled":"false","logger_id":"org.apache.flink.connector.base.source.reader.SourceReaderBase","log_level":"INFO","message":"Adding > split(s) to reader: > [ > [Partition: eventmesh-video-play-v1-6, StartingOffset: 1964306414, > StoppingOffset: -9223372036854775808], > [Partition: eventmesh-video-play-v1-19, StartingOffset: 1963002538, > StoppingOffset: -9223372036854775808], > [Partition: eventmesh-video-play-v1-6, StartingOffset: 1964306414, > StoppingOffset: -9223372036854775808], > [Partition: eventmesh-video-play-v1-19, StartingOffset: 1963002538, > StoppingOffset: -9223372036854775808]]", "service_name":"data-beaver"}{code} > We see that some task being restored with 4 splits, however actual splits > have duplicates - we see that in reality 2 unique partitions have been added > ({_}eventmesh-video-play-v1-6{_} and {_}eventmesh-video-play-v1-19{_}). 
> Digging into the code and the logs a bit more, log lines like this started > looking suspicious: > > {code:java} > {"timestamp":1704415753165,"is_logging_enabled":"false","logger_id":"org.apache.flink.runtime.state.TaskStateManagerImpl","log_level":"DEBUG", > "message":"Operator 156a1ebbc1936f7d4558c8070b35ba93 has remote state > SubtaskState{operatorStateFromBackend=StateObjectCollection{ > [OperatorStateHandle{stateNameToPartitionOffsets={SourceReaderState=StateMetaInfo{offsets=[244, > 244], distributionMode=SPLIT_DISTRIBUTE}}, > delegateStateHandle=ByteStreamStateHandle{handleName='gs://data-beaver/checkpoints/moj-tj-dummy-partition-loss-debug-v1/6e1ba15b1b5bedda64836ff48ed1c264/chk-3/fadb4f23-85dd-4048-b466-94c1c5329dd3', > dataBytes=328}}, > OperatorStateHandle{stateNameToPartitionOffsets={SourceReaderState=StateMetaInfo{offsets=[244, > 244],
[jira] [Updated] (FLINK-34064) Expose JobManagerOperatorMetrics via REST API
[ https://issues.apache.org/jira/browse/FLINK-34064?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mason Chen updated FLINK-34064: --- Affects Version/s: 1.18.0 > Expose JobManagerOperatorMetrics via REST API > - > > Key: FLINK-34064 > URL: https://issues.apache.org/jira/browse/FLINK-34064 > Project: Flink > Issue Type: Improvement > Components: Runtime / REST >Affects Versions: 1.18.0 >Reporter: Mason Chen >Priority: Major > > Add a REST API to fetch coordinator metrics. > [https://cwiki.apache.org/confluence/display/FLINK/FLIP-417%3A+Expose+JobManagerOperatorMetrics+via+REST+API]
[jira] [Updated] (FLINK-34064) Expose JobManagerOperatorMetrics via REST API
[ https://issues.apache.org/jira/browse/FLINK-34064?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mason Chen updated FLINK-34064: --- Component/s: Runtime / REST > Expose JobManagerOperatorMetrics via REST API > - > > Key: FLINK-34064 > URL: https://issues.apache.org/jira/browse/FLINK-34064 > Project: Flink > Issue Type: Improvement > Components: Runtime / REST >Reporter: Mason Chen >Priority: Major > > Add a REST API to fetch coordinator metrics. > [https://cwiki.apache.org/confluence/display/FLINK/FLIP-417%3A+Expose+JobManagerOperatorMetrics+via+REST+API]
[jira] [Created] (FLINK-34064) Expose JobManagerOperatorMetrics via REST API
Mason Chen created FLINK-34064: -- Summary: Expose JobManagerOperatorMetrics via REST API Key: FLINK-34064 URL: https://issues.apache.org/jira/browse/FLINK-34064 Project: Flink Issue Type: Improvement Reporter: Mason Chen Add a REST API to fetch coordinator metrics. [https://cwiki.apache.org/confluence/display/FLINK/FLIP-417%3A+Expose+JobManagerOperatorMetrics+via+REST+API]
Re: [PR] [FLINK-32416] initial implementation of DynamicKafkaSource with bound… [flink-connector-kafka]
mas-chen commented on PR #44: URL: https://github.com/apache/flink-connector-kafka/pull/44#issuecomment-1887855854 one-pager docs are here: docs/content/docs/connectors/datastream/dynamic-kafka.md. Marked with experimental status. Also copied this into the Chinese doc directory.
[jira] [Commented] (FLINK-33856) Add metrics to monitor the interaction performance between task and external storage system in the process of checkpoint making
[ https://issues.apache.org/jira/browse/FLINK-33856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17805739#comment-17805739 ] Piotr Nowojski commented on FLINK-33856: In that case [~hejufang001] it would be great if you started another FLIP for adding per sub-task spans. If you decide to do so, please ping me on Apache Flink Slack or via a Jira ticket so I don't miss it :) > Add metrics to monitor the interaction performance between task and external > storage system in the process of checkpoint making > --- > > Key: FLINK-33856 > URL: https://issues.apache.org/jira/browse/FLINK-33856 > Project: Flink > Issue Type: Improvement > Components: Runtime / Checkpointing >Affects Versions: 1.18.0 >Reporter: Jufang He >Assignee: Jufang He >Priority: Major > Labels: pull-request-available > > When Flink makes a checkpoint, the interaction performance with the external > file system has a great impact on the overall time consumed. Therefore, it is > easy to observe the bottleneck point by adding performance indicators where > the task interacts with the external file storage system. These include: the > rate of file writes, the latency to write the file, and the latency to close > the file. > Adding the above metrics on the Flink side has the following advantages: it is > convenient to measure per-task E2E checkpoint duration, and there is no need to > distinguish the type of external storage system, since the measurement can be > unified in the FsCheckpointStreamFactory.
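The measurement idea in the ticket can be sketched with a plain OutputStream decorator that accumulates write/close latency and bytes written. This is a generic illustration of the wrapping pattern, not Flink's actual FsCheckpointStreamFactory code:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;

/** Wraps an OutputStream and records time spent in write() and close(). */
class TimedOutputStream extends OutputStream {
    private final OutputStream delegate;
    long writeNanos;   // total time spent in write calls
    long closeNanos;   // time spent closing the stream
    long bytesWritten; // total payload size

    TimedOutputStream(OutputStream delegate) { this.delegate = delegate; }

    @Override
    public void write(int b) throws IOException {
        long start = System.nanoTime();
        delegate.write(b);
        writeNanos += System.nanoTime() - start;
        bytesWritten++;
    }

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
        long start = System.nanoTime();
        delegate.write(b, off, len);
        writeNanos += System.nanoTime() - start;
        bytesWritten += len;
    }

    @Override
    public void close() throws IOException {
        long start = System.nanoTime();
        delegate.close();
        closeNanos += System.nanoTime() - start;
    }
    // Write rate can then be reported as bytesWritten / writeNanos,
    // independent of which storage system backs the delegate stream.
}
```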
Re: [PR] [FLINK-32416] initial implementation of DynamicKafkaSource with bound… [flink-connector-kafka]
mxm commented on PR #44: URL: https://github.com/apache/flink-connector-kafka/pull/44#issuecomment-1887622480 A brief one-pager of the feature would be good. We can expand on it later. Just to have something in the web docs.
[jira] [Created] (FLINK-34063) When snapshot compression is enabled, rescaling of a source operator leads to some splits getting lost
Ivan Burmistrov created FLINK-34063: --- Summary: When snapshot compression is enabled, rescaling of a source operator leads to some splits getting lost Key: FLINK-34063 URL: https://issues.apache.org/jira/browse/FLINK-34063 Project: Flink Issue Type: Bug Environment: Can be reproduced in any environment. The most important thing is to enable snapshot compression. Reporter: Ivan Burmistrov Attachments: image-2024-01-11-16-27-09-066.png, image-2024-01-11-16-30-47-466.png h2. Backstory We've been experimenting with Autoscaling on the Flink 1.18 and faced a pretty nasty bug. The symptoms on our production system were as following. After a while after deploying a job with autoscaler it started accumulating Kafka lag, and this could only be observed via external lag measurement - from inside Flink (measured by {{_KafkaSourceReader_KafkaConsumer_records_lag_max_}} metric) the lag was OK: !image-2024-01-11-16-27-09-066.png|width=887,height=263! After some digging, it turned out that the job has lost some Kafka partitions - i.e. it stopped consuming from them, “forgot” about their existence. That’s why from the Flink’s perspective everything was fine - the lag was growing on the partitions Flink no longer knew about. This was visible on a metric called “Assigned partitions” (KafkaSourceReader_KafkaConsumer_assigned_partitions): !image-2024-01-11-16-30-47-466.png|width=1046,height=254! We see on the chart that the job used to know about 20 partitions, and then this number got dropped to 16. This drop has been quickly connected to the job’s scaling events. Or, more precisely, to the scaling of the source operator - with almost 100% probability any scaling of the source operator led to partitions loss. h2. Investigation We've conducted the investigation. We use the latest Kubernetes operator and deploy jobs with Native Kubernetes. 
The reproducing scenario we used for investigation: * Launch a job with source operator parallelism = 4, enable DEBUG logging * Wait until it takes the first checkpoint * Scale-up the source operator to say 5 (no need to wait for autoscaling, it can be done via Flink UI) * Wait until the new checkpoint is taken * Scale-down the source operator to 3 These simple actions with almost 100% probability led to some partitions get lost. After that we've downloaded all the logs and inspected them. Noticed these strange records in logs: {code:java} {"timestamp":1704415753166,"is_logging_enabled":"false","logger_id":"org.apache.flink.streaming.api.operators.AbstractStreamOperator","log_level":"INFO","message":"Restoring state for 4 split(s) to reader.","service_name":"data-beaver"} {"timestamp":1704415753166,"is_logging_enabled":"false","logger_id":"org.apache.flink.connector.base.source.reader.SourceReaderBase","log_level":"INFO","message":"Adding split(s) to reader: [ [Partition: eventmesh-video-play-v1-6, StartingOffset: 1964306414, StoppingOffset: -9223372036854775808], [Partition: eventmesh-video-play-v1-19, StartingOffset: 1963002538, StoppingOffset: -9223372036854775808], [Partition: eventmesh-video-play-v1-6, StartingOffset: 1964306414, StoppingOffset: -9223372036854775808], [Partition: eventmesh-video-play-v1-19, StartingOffset: 1963002538, StoppingOffset: -9223372036854775808]]", "service_name":"data-beaver"}{code} We see that some task being restored with 4 splits, however actual splits have duplicates - we see that in reality 2 unique partitions have been added ({_}eventmesh-video-play-v1-6{_} and {_}eventmesh-video-play-v1-19{_}). 
Digging into the code and the logs a bit more, log lines like this started looking suspicious: {code:java} {"timestamp":1704415753165,"is_logging_enabled":"false","logger_id":"org.apache.flink.runtime.state.TaskStateManagerImpl","log_level":"DEBUG", "message":"Operator 156a1ebbc1936f7d4558c8070b35ba93 has remote state SubtaskState{operatorStateFromBackend=StateObjectCollection{ [OperatorStateHandle{stateNameToPartitionOffsets={SourceReaderState=StateMetaInfo{offsets=[244, 244], distributionMode=SPLIT_DISTRIBUTE}}, delegateStateHandle=ByteStreamStateHandle{handleName='gs://data-beaver/checkpoints/moj-tj-dummy-partition-loss-debug-v1/6e1ba15b1b5bedda64836ff48ed1c264/chk-3/fadb4f23-85dd-4048-b466-94c1c5329dd3', dataBytes=328}}, OperatorStateHandle{stateNameToPartitionOffsets={SourceReaderState=StateMetaInfo{offsets=[244, 244], distributionMode=SPLIT_DISTRIBUTE}}, delegateStateHandle=ByteStreamStateHandle{handleName='gs://data-beaver/checkpoints/moj-tj-dummy-partition-loss-debug-v1/6e1ba15b1b5bedda64836ff48ed1c264/chk-3/102aa50b-78c2-457e-9a2f-0055f1dbeb98', dataBytes=328}}]}, operatorStateFromStream=StateObjectCollection{[]}, keyedStateFromBackend=StateObjectCollection{[]}, keyedStateFromStream=StateObjectCollection{[]},
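The logs above show a snapshot whose split entries are duplicated (offsets=[244, 244], and the same two partitions restored twice). As a toy illustration of why that loses partitions (this models an assumed mechanism for exposition, not a confirmed root cause): with SPLIT_DISTRIBUTE, entries are dealt out round-robin to the new subtasks, so if some entries are duplicates of others, readers restore the same split twice while the splits the duplicates displaced are never assigned to anyone:

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Toy round-robin (SPLIT_DISTRIBUTE-style) redistribution of per-split entries. */
class RedistributionSketch {
    static List<List<String>> roundRobin(List<String> entries, int newParallelism) {
        List<List<String>> assignment = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            assignment.add(new ArrayList<>());
        }
        for (int i = 0; i < entries.size(); i++) {
            // Deal entries out one by one; duplicates land on different subtasks
            // but still reference the same underlying split.
            assignment.get(i % newParallelism).add(entries.get(i));
        }
        return assignment;
    }
}
```

If the snapshot should have held four distinct partitions but instead holds ["p6", "p19", "p6", "p19"], rescaling to any parallelism can only ever hand out two unique partitions; the other two are silently gone, matching the "assigned partitions" drop seen on the dashboard.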
Re: [PR] [FLINK-32416] initial implementation of DynamicKafkaSource with bound… [flink-connector-kafka]
mas-chen commented on PR #44: URL: https://github.com/apache/flink-connector-kafka/pull/44#issuecomment-1887602638 Are the Javadocs sufficient in the bare minimum case? Otherwise, I'll just write the documentation into this PR
Re: [PR] [FLINK-32416] initial implementation of DynamicKafkaSource with bound… [flink-connector-kafka]
mxm commented on PR #44: URL: https://github.com/apache/flink-connector-kafka/pull/44#issuecomment-1887599176 Sounds good. Can we get a bare-minimum version of the documentation into this PR?
Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]
davidradl commented on PR #79: URL: https://github.com/apache/flink-connector-jdbc/pull/79#issuecomment-1887600132 @snuyanzin I have left this with multiple commits for now. Should we squash to one commit and put you as a co-author in the commit message? If so, how would you like me to identify you in the `Co-authored-by: NAME <EMAIL>` trailer?
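For reference, GitHub attributes co-authors via a commit-message trailer on its own line at the end of the message, in this form (name and email below are placeholders, not the reviewer's actual details):

```text
[FLINK-33365] include filters with Lookup joins

Co-authored-by: NAME <name@example.com>
```

GitHub matches the email against the co-author's account (users with private emails can use their `username@users.noreply.github.com` address), so the trailer is enough to show both avatars on the squashed commit.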
Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]
davidradl commented on code in PR #79: URL: https://github.com/apache/flink-connector-jdbc/pull/79#discussion_r1449155934 ## flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/ParameterizedPredicatePlanTest.java: ## @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.table; + +import org.apache.flink.table.planner.utils.TableTestBase; + +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests ParameterizedPredicate. */ +public class ParameterizedPredicatePlanTest extends TableTestBase { Review Comment: @snuyanzin I assume that this comment is no longer relevant as is , as you have extended ParameterizedPredicatePlanTest to be a ParameterizedTest -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-32416] initial implementation of DynamicKafkaSource with bound… [flink-connector-kafka]
mas-chen commented on PR #44: URL: https://github.com/apache/flink-connector-kafka/pull/44#issuecomment-1887586297 Thanks @tzulitai @mxm for your review! I've addressed all the comments and squashed the commits. Can you please merge it at your convenience? I'll start on the docs immediately and hope to use this connector directly in the 3.1.0 release!
Re: [PR] [FLINK-34027] Introduces AsyncScalarFunction as a new UDF type [flink]
twalthr commented on code in PR #23975: URL: https://github.com/apache/flink/pull/23975#discussion_r1449149984 ## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/AsyncUtil.java: ## @@ -0,0 +1,216 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.planner.plan.utils; + +import org.apache.flink.streaming.api.datastream.AsyncDataStream; +import org.apache.flink.streaming.api.functions.async.AsyncRetryStrategy; +import org.apache.flink.streaming.util.retryable.AsyncRetryStrategies; +import org.apache.flink.table.api.config.ExecutionConfigOptions; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.functions.AsyncScalarFunction; +import org.apache.flink.table.functions.FunctionDefinition; +import org.apache.flink.table.planner.functions.bridging.BridgingSqlFunction; +import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; + +import org.apache.calcite.rex.RexCall; +import org.apache.calcite.rex.RexNode; + +import java.io.Serializable; +import java.time.Duration; +import java.util.Collection; +import java.util.Objects; +import java.util.function.Predicate; + +/** Contains utilities for AsyncScalarFunctions. */ +public class AsyncUtil { + +private static final EmptyResponseResultStrategy EMPTY_RESPONSE = +new EmptyResponseResultStrategy(); +private static final Predicate ANY_EXCEPTION = new AnyExceptionStrategy(); + +/** + * Checks whether it contains the specified kind of async function call in the specified node. + * + * @param node the RexNode to check + * @return true if it contains an async function call in the specified node. + */ +public static boolean containsAsyncCall(RexNode node) { +return node.accept(new FunctionFinder(true, true)); +} + +/** + * Checks whether it contains non-async function call in the specified node. + * + * @param node the RexNode to check + * @return true if it contains a non-async function call in the specified node. 
+ */ +public static boolean containsNonAsyncCall(RexNode node) { +return node.accept(new FunctionFinder(false, true)); +} + +/** + * Checks whether the specified node is the specified kind of async function call. + * + * @param node the RexNode to check + * @return true if the specified node is an async function call. + */ +public static boolean isAsyncCall(RexNode node) { +return node.accept(new FunctionFinder(true, false)); +} + +/** + * Checks whether the specified node is a non-async function call. + * + * @param node the RexNode to check + * @return true if the specified node is a non-async function call. + */ +public static boolean isNonAsyncCall(RexNode node) { +return node.accept(new FunctionFinder(false, false)); +} + +/** + * Gets the options required to run the operator. + * + * @param config The config from which to fetch the options + * @return Extracted options + */ +public static AsyncUtil.Options getAsyncOptions(ExecNodeConfig config) { +return new AsyncUtil.Options( + config.get(ExecutionConfigOptions.TABLE_EXEC_ASYNC_SCALAR_BUFFER_CAPACITY), + config.get(ExecutionConfigOptions.TABLE_EXEC_ASYNC_SCALAR_TIMEOUT).toMillis(), +AsyncDataStream.OutputMode.ORDERED, +getResultRetryStrategy( + config.get(ExecutionConfigOptions.TABLE_EXEC_ASYNC_SCALAR_RETRY_STRATEGY), + config.get(ExecutionConfigOptions.TABLE_EXEC_ASYNC_SCALAR_RETRY_DELAY), + config.get(ExecutionConfigOptions.TABLE_EXEC_ASYNC_SCALAR_MAX_ATTEMPTS))); +} + +/** Options for configuring async behavior. */ +public static class Options { + +public final int asyncBufferCapacity; +public final long asyncTimeout; +public final AsyncDataStream.OutputMode asyncOutputMode; +public final AsyncRetryStrategy asyncRetryStrategy; + +
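The `Options` built by `AsyncUtil.getAsyncOptions` above wire the `TABLE_EXEC_ASYNC_SCALAR_RETRY_STRATEGY`, `RETRY_DELAY`, and `MAX_ATTEMPTS` config values into a fixed-delay retry for async scalar calls. As a rough, self-contained sketch of that fixed-delay retry behaviour (all names here are hypothetical; this is not Flink's actual `AsyncRetryStrategies` implementation, which works asynchronously on `CompletableFuture`s rather than blocking):

```java
import java.util.function.Supplier;

public class FixedDelayRetry {

    // Hypothetical blocking sketch: retry a call up to maxAttempts times,
    // sleeping a fixed delay between attempts, rethrowing the last failure.
    static <T> T callWithRetry(Supplier<T> call, int maxAttempts, long delayMillis) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return call.get();
            } catch (RuntimeException e) {
                last = e;
                if (attempt < maxAttempts) {
                    try {
                        Thread.sleep(delayMillis);
                    } catch (InterruptedException ie) {
                        Thread.currentThread().interrupt();
                        throw last;
                    }
                }
            }
        }
        throw last;
    }

    public static void main(String[] args) {
        int[] remainingFailures = {2}; // fail twice, then succeed on attempt 3
        String result = callWithRetry(() -> {
            if (remainingFailures[0]-- > 0) {
                throw new RuntimeException("transient failure");
            }
            return "ok";
        }, 3, 10L);
        System.out.println(result); // prints "ok"
    }
}
```

The real operator additionally bounds in-flight requests via the buffer capacity and preserves ordering via `AsyncDataStream.OutputMode.ORDERED`, which this sketch does not model.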
[jira] [Comment Edited] (FLINK-34036) Various HiveDialectQueryITCase tests fail in GitHub Actions workflow with Hadoop 3.1.3 enabled
[ https://issues.apache.org/jira/browse/FLINK-34036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17805708#comment-17805708 ] Matthias Pohl edited comment on FLINK-34036 at 1/11/24 4:55 PM: The problem seems to be that we're not building the Flink artifacts with the PROFILE which causes issues when running the tests with the PROFILE. That is why it's possible to reproduce it locally. Because I didn't trigger the clean phase before running the tests with the profiles enabled. This is also a problem in the GHA workflow configuration. The run_mvn configuration doesn't inject the PROFILE env variable properly. was (Author: mapohl): The problem seems to be that we're not building the Flink artifacts with the PROFILE which causes issues when running the tests with the PROFILE. That is why it's possible to reproduce it locally. Because I didn't trigger the clean phase before running the tests with the profiles enabled. > Various HiveDialectQueryITCase tests fail in GitHub Actions workflow with > Hadoop 3.1.3 enabled > -- > > Key: FLINK-34036 > URL: https://issues.apache.org/jira/browse/FLINK-34036 > Project: Flink > Issue Type: Sub-task > Components: Connectors / Hadoop Compatibility, Connectors / Hive >Affects Versions: 1.18.0, 1.19.0 >Reporter: Matthias Pohl >Priority: Major > Labels: github-actions, test-stability > > The following {{HiveDialectQueryITCase}} tests fail consistently in the > FLINK-27075 GitHub Actions [master nightly > workflow|https://github.com/XComp/flink/actions/workflows/nightly-dev.yml] of > Flink (and also the [release-1.18 > workflow|https://github.com/XComp/flink/actions/workflows/nightly-current.yml]): > * {{testInsertDirectory}} > * {{testCastTimeStampToDecimal}} > * {{testNullLiteralAsArgument}} > {code} > Error: 03:38:45 03:38:45.661 [ERROR] Tests run: 22, Failures: 1, Errors: 2, > Skipped: 0, Time elapsed: 379.0 s <<< FAILURE! 
-- in > org.apache.flink.connectors.hive.HiveDialectQueryITCase > Error: 03:38:45 03:38:45.662 [ERROR] > org.apache.flink.connectors.hive.HiveDialectQueryITCase.testNullLiteralAsArgument > -- Time elapsed: 0.069 s <<< ERROR! > Jan 09 03:38:45 java.lang.NoSuchMethodError: > org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils.getTimestamp(Ljava/lang/Object;Lorg/apache/hadoop/hive/serde2/objectinspector/PrimitiveObjectInspector;)Ljava/sql/Timestamp; > Jan 09 03:38:45 at > org.apache.flink.connectors.hive.HiveDialectQueryITCase.testNullLiteralAsArgument(HiveDialectQueryITCase.java:959) > Jan 09 03:38:45 at java.lang.reflect.Method.invoke(Method.java:498) > Jan 09 03:38:45 > Error: 03:38:45 03:38:45.662 [ERROR] > org.apache.flink.connectors.hive.HiveDialectQueryITCase.testCastTimeStampToDecimal > -- Time elapsed: 0.007 s <<< ERROR! > Jan 09 03:38:45 org.apache.flink.table.api.ValidationException: Table with > identifier 'test-catalog.default.t1' does not exist. > Jan 09 03:38:45 at > org.apache.flink.table.catalog.CatalogManager.dropTableInternal(CatalogManager.java:1266) > Jan 09 03:38:45 at > org.apache.flink.table.catalog.CatalogManager.dropTable(CatalogManager.java:1206) > Jan 09 03:38:45 at > org.apache.flink.table.operations.ddl.DropTableOperation.execute(DropTableOperation.java:74) > Jan 09 03:38:45 at > org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1107) > Jan 09 03:38:45 at > org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:735) > Jan 09 03:38:45 at > org.apache.flink.connectors.hive.HiveDialectQueryITCase.testCastTimeStampToDecimal(HiveDialectQueryITCase.java:835) > Jan 09 03:38:45 at java.lang.reflect.Method.invoke(Method.java:498) > Jan 09 03:38:45 > Error: 03:38:45 03:38:45.663 [ERROR] > org.apache.flink.connectors.hive.HiveDialectQueryITCase.testInsertDirectory > -- Time elapsed: 7.326 s <<< FAILURE! 
> Jan 09 03:38:45 org.opentest4j.AssertionFailedError: > Jan 09 03:38:45 > Jan 09 03:38:45 expected: "A:english=90#math=100#history=85" > Jan 09 03:38:45 but was: "A:english=90math=100history=85" > Jan 09 03:38:45 at > sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) > Jan 09 03:38:45 at > sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) > Jan 09 03:38:45 at > sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) > Jan 09 03:38:45 at > org.apache.flink.connectors.hive.HiveDialectQueryITCase.testInsertDirectory(HiveDialectQueryITCase.java:498) > Jan 09 03:38:45 at
[jira] [Commented] (FLINK-34036) Various HiveDialectQueryITCase tests fail in GitHub Actions workflow with Hadoop 3.1.3 enabled
[ https://issues.apache.org/jira/browse/FLINK-34036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17805708#comment-17805708 ] Matthias Pohl commented on FLINK-34036: --- The problem seems to be that we're not building the Flink artifacts with the PROFILE which causes issues when running the tests with the PROFILE. That is why it's possible to reproduce it locally. Because I didn't trigger the clean phase before running the tests with the profiles enabled. > Various HiveDialectQueryITCase tests fail in GitHub Actions workflow with > Hadoop 3.1.3 enabled > -- > > Key: FLINK-34036 > URL: https://issues.apache.org/jira/browse/FLINK-34036 > Project: Flink > Issue Type: Sub-task > Components: Connectors / Hadoop Compatibility, Connectors / Hive >Affects Versions: 1.18.0, 1.19.0 >Reporter: Matthias Pohl >Priority: Major > Labels: github-actions, test-stability > > The following {{HiveDialectQueryITCase}} tests fail consistently in the > FLINK-27075 GitHub Actions [master nightly > workflow|https://github.com/XComp/flink/actions/workflows/nightly-dev.yml] of > Flink (and also the [release-1.18 > workflow|https://github.com/XComp/flink/actions/workflows/nightly-current.yml]): > * {{testInsertDirectory}} > * {{testCastTimeStampToDecimal}} > * {{testNullLiteralAsArgument}} > {code} > Error: 03:38:45 03:38:45.661 [ERROR] Tests run: 22, Failures: 1, Errors: 2, > Skipped: 0, Time elapsed: 379.0 s <<< FAILURE! -- in > org.apache.flink.connectors.hive.HiveDialectQueryITCase > Error: 03:38:45 03:38:45.662 [ERROR] > org.apache.flink.connectors.hive.HiveDialectQueryITCase.testNullLiteralAsArgument > -- Time elapsed: 0.069 s <<< ERROR! 
> Jan 09 03:38:45 java.lang.NoSuchMethodError: > org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils.getTimestamp(Ljava/lang/Object;Lorg/apache/hadoop/hive/serde2/objectinspector/PrimitiveObjectInspector;)Ljava/sql/Timestamp; > Jan 09 03:38:45 at > org.apache.flink.connectors.hive.HiveDialectQueryITCase.testNullLiteralAsArgument(HiveDialectQueryITCase.java:959) > Jan 09 03:38:45 at java.lang.reflect.Method.invoke(Method.java:498) > Jan 09 03:38:45 > Error: 03:38:45 03:38:45.662 [ERROR] > org.apache.flink.connectors.hive.HiveDialectQueryITCase.testCastTimeStampToDecimal > -- Time elapsed: 0.007 s <<< ERROR! > Jan 09 03:38:45 org.apache.flink.table.api.ValidationException: Table with > identifier 'test-catalog.default.t1' does not exist. > Jan 09 03:38:45 at > org.apache.flink.table.catalog.CatalogManager.dropTableInternal(CatalogManager.java:1266) > Jan 09 03:38:45 at > org.apache.flink.table.catalog.CatalogManager.dropTable(CatalogManager.java:1206) > Jan 09 03:38:45 at > org.apache.flink.table.operations.ddl.DropTableOperation.execute(DropTableOperation.java:74) > Jan 09 03:38:45 at > org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1107) > Jan 09 03:38:45 at > org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:735) > Jan 09 03:38:45 at > org.apache.flink.connectors.hive.HiveDialectQueryITCase.testCastTimeStampToDecimal(HiveDialectQueryITCase.java:835) > Jan 09 03:38:45 at java.lang.reflect.Method.invoke(Method.java:498) > Jan 09 03:38:45 > Error: 03:38:45 03:38:45.663 [ERROR] > org.apache.flink.connectors.hive.HiveDialectQueryITCase.testInsertDirectory > -- Time elapsed: 7.326 s <<< FAILURE! 
> Jan 09 03:38:45 org.opentest4j.AssertionFailedError: > Jan 09 03:38:45 > Jan 09 03:38:45 expected: "A:english=90#math=100#history=85" > Jan 09 03:38:45 but was: "A:english=90math=100history=85" > Jan 09 03:38:45 at > sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) > Jan 09 03:38:45 at > sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) > Jan 09 03:38:45 at > sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) > Jan 09 03:38:45 at > org.apache.flink.connectors.hive.HiveDialectQueryITCase.testInsertDirectory(HiveDialectQueryITCase.java:498) > Jan 09 03:38:45 at java.lang.reflect.Method.invoke(Method.java:498) > {code} > Additionally, the {{HiveITCase}} in the e2e test suite is affected: > {code} > Error: 05:20:20 05:20:20.949 [ERROR] Tests run: 1, Failures: 0, Errors: 1, > Skipped: 0, Time elapsed: 0.106 s <<< FAILURE! -- in > org.apache.flink.tests.hive.HiveITCase > Error: 05:20:20 05:20:20.949 [ERROR] org.apache.flink.tests.hive.HiveITCase > -- Time elapsed: 0.106 s <<< ERROR! > Jan 07 05:20:20 java.lang.ExceptionInInitializerError > Jan 07 05:20:20 at
Re: [PR] [FLINK-34027] Introduces AsyncScalarFunction as a new UDF type [flink]
twalthr commented on code in PR #23975: URL: https://github.com/apache/flink/pull/23975#discussion_r1449138779 ## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecAsyncCalc.java: ## @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.planner.plan.nodes.exec.common; + +import org.apache.flink.api.dag.Transformation; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.streaming.api.functions.async.AsyncFunction; +import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory; +import org.apache.flink.streaming.api.operators.async.AsyncWaitOperatorFactory; +import org.apache.flink.streaming.api.transformations.OneInputTransformation; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.planner.calcite.FlinkTypeFactory; +import org.apache.flink.table.planner.codegen.AsyncCodeGenerator; +import org.apache.flink.table.planner.delegation.PlannerBase; +import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge; +import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase; +import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig; +import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext; +import org.apache.flink.table.planner.plan.nodes.exec.InputProperty; +import org.apache.flink.table.planner.plan.nodes.exec.SingleTransformationTranslator; +import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil; +import org.apache.flink.table.planner.plan.utils.AsyncUtil; +import org.apache.flink.table.runtime.generated.GeneratedFunction; +import org.apache.flink.table.runtime.operators.calc.async.AsyncFunctionRunner; +import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.RowType; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; + +import org.apache.calcite.rex.RexCall; +import org.apache.calcite.rex.RexInputRef; +import org.apache.calcite.rex.RexNode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import 
java.util.List; +import java.util.stream.Collectors; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** Base class for exec Async Calc. */ +public abstract class CommonExecAsyncCalc extends ExecNodeBase +implements SingleTransformationTranslator { + +private static final Logger LOG = LoggerFactory.getLogger(CommonExecAsyncCalc.class); + +public static final String ASYNC_CALC_TRANSFORMATION = "async-calc"; + +public static final String FIELD_NAME_PROJECTION = "projection"; + +public static final String FIELD_NAME_SYNC_MODE = "syncMode"; + +@JsonProperty(FIELD_NAME_PROJECTION) +private final List projection; + +public CommonExecAsyncCalc( +int id, +ExecNodeContext context, +ReadableConfig persistedConfig, +List projection, Review Comment: Thanks for the explanation. It's unfortunate that the Python people called it Calc. But let's keep it like this for now. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34027] Introduces AsyncScalarFunction as a new UDF type [flink]
twalthr commented on code in PR #23975: URL: https://github.com/apache/flink/pull/23975#discussion_r1449134322 ## flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/FunctionMappingExtractor.java: ## @@ -208,6 +249,32 @@ static MethodVerification createParameterWithArgumentVerification( }; } +/** Verification that checks a method by parameters including an additional first parameter. */ +static MethodVerification createGenericParameterWithArgumentAndReturnTypeVerification( +Class baseClass, Class argumentClass, int paramPos, int genericPos) { +return (method, signature, result) -> { +final Class[] parameters = +Stream.concat(Stream.of(argumentClass), signature.stream()) +.toArray(Class[]::new); + +Type genericType = method.getGenericParameterTypes()[paramPos]; +// Trusty library allows to resolve generic types where the type resolves to a type with +// generic parameters... +genericType = TypeToken.of(baseClass).resolveType(genericType).getType(); Review Comment: Looks great now! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-32006) AsyncWaitOperatorTest.testProcessingTimeWithTimeoutFunctionOrderedWithRetry times out on Azure
[ https://issues.apache.org/jira/browse/FLINK-32006?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17805698#comment-17805698 ] Junrui Li commented on FLINK-32006: --- https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=56262=logs=0da23115-68bb-5dcd-192c-bd4c8adebde1=24c3384f-1bcb-57b3-224f-51bf973bbee8 > AsyncWaitOperatorTest.testProcessingTimeWithTimeoutFunctionOrderedWithRetry > times out on Azure > -- > > Key: FLINK-32006 > URL: https://issues.apache.org/jira/browse/FLINK-32006 > Project: Flink > Issue Type: Bug > Components: API / DataStream >Affects Versions: 1.18.0, 1.17.2, 1.19.0 >Reporter: David Morávek >Assignee: David Morávek >Priority: Critical > Labels: pull-request-available, stale-assigned, test-stability > > {code:java} > May 04 13:52:18 [ERROR] > org.apache.flink.streaming.api.operators.async.AsyncWaitOperatorTest.testProcessingTimeWithTimeoutFunctionOrderedWithRetry > Time elapsed: 100.009 s <<< ERROR! > May 04 13:52:18 org.junit.runners.model.TestTimedOutException: test timed out > after 100 seconds > May 04 13:52:18 at java.lang.Thread.sleep(Native Method) > May 04 13:52:18 at > org.apache.flink.streaming.api.operators.async.AsyncWaitOperatorTest.testProcessingTimeAlwaysTimeoutFunctionWithRetry(AsyncWaitOperatorTest.java:1313) > May 04 13:52:18 at > org.apache.flink.streaming.api.operators.async.AsyncWaitOperatorTest.testProcessingTimeWithTimeoutFunctionOrderedWithRetry(AsyncWaitOperatorTest.java:1277) > May 04 13:52:18 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native > Method) > May 04 13:52:18 at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > May 04 13:52:18 at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > May 04 13:52:18 at java.lang.reflect.Method.invoke(Method.java:498) > May 04 13:52:18 at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) > May 04 13:52:18 at > 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > May 04 13:52:18 at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) > May 04 13:52:18 at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > May 04 13:52:18 at > org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54) > May 04 13:52:18 at > org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:299) > May 04 13:52:18 at > org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:293) > May 04 13:52:18 at > java.util.concurrent.FutureTask.run(FutureTask.java:266) > May 04 13:52:18 at java.lang.Thread.run(Thread.java:748) > {code} > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=48671=logs=0da23115-68bb-5dcd-192c-bd4c8adebde1=24c3384f-1bcb-57b3-224f-51bf973bbee8=9288 -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-34027] Introduces AsyncScalarFunction as a new UDF type [flink]
twalthr commented on code in PR #23975: URL: https://github.com/apache/flink/pull/23975#discussion_r1449130065 ## flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/UserDefinedFunctionHelper.java: ## @@ -67,6 +67,8 @@ public final class UserDefinedFunctionHelper { public static final String SCALAR_EVAL = "eval"; +public static final String ASYNC_SCALAR_EVAL = "eval"; + public static final String TABLE_EVAL = "eval"; Review Comment: That is definitely the best location as it is called right after registration in TableEnvironment. An additional check should be done after specialization in code generation. ## flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/ExtractionUtils.java: ## @@ -348,6 +350,53 @@ public static Optional> extractSimpleGeneric( } } +/** Resolves a variable type while accepting a context for resolution. */ +public static Type resolveVariableWithClassContext(@Nullable Type contextType, Type type) { +final List typeHierarchy; +if (contextType != null) { +typeHierarchy = collectTypeHierarchy(contextType); +} else { +typeHierarchy = Collections.emptyList(); +} +if (!containsTypeVariable(type)) { Review Comment: not sure if this was intended but with this check you exclude the `WildcardType`: ``` eval(CompletableFuture s) ``` which is correct because the if clause further down could not handle it, I would at least add a comment to the false in `containsTypeVariable` ## flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/UserDefinedFunctionHelper.java: ## @@ -521,6 +529,49 @@ private static void validateImplementationMethod( } } +private static void validateAsyncImplementationMethod( +Class clazz, String... 
methodNameOptions) { +final Set nameSet = new HashSet<>(Arrays.asList(methodNameOptions)); +final List methods = getAllDeclaredMethods(clazz); +for (Method method : methods) { +if (!nameSet.contains(method.getName())) { +continue; +} +if (!method.getReturnType().equals(Void.TYPE)) { +throw new ValidationException( +String.format( +"Method '%s' of function class '%s' must be void.", +method.getName(), clazz.getName())); +} +boolean foundParam = false; +boolean genericParam = false; +if (method.getParameterCount() >= 1) { +Type firstParam = method.getGenericParameterTypes()[0]; +firstParam = ExtractionUtils.resolveVariableWithClassContext(clazz, firstParam); +if (CompletableFuture.class.equals(firstParam)) { +foundParam = true; +} else if (firstParam instanceof ParameterizedType +&& CompletableFuture.class.equals( +((ParameterizedType) firstParam).getRawType())) { +foundParam = true; +genericParam = true; +} +} +if (!foundParam) { +throw new ValidationException( +String.format( +"Method '%s' of function class '%s' must have a first argument of type java.util.concurrent.CompletableFuture.", +method.getName(), clazz.getName())); +} +if (!genericParam) { Review Comment: this we shouldn't validate because if someone uses a type inference with multiple types (i.e. no FunctionMappingExtractor), they might use raw types. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
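The validation quoted above accepts an `eval` method whose first parameter is a `CompletableFuture`, either raw or parameterized. A self-contained reflection sketch of that first-parameter check (class and helper names are illustrative, not Flink's actual implementation, and it omits the type-variable resolution that `ExtractionUtils.resolveVariableWithClassContext` performs):

```java
import java.lang.reflect.Method;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.concurrent.CompletableFuture;

public class AsyncEvalCheck {

    // Illustrative async-UDF-style class: the result is delivered through the
    // CompletableFuture passed as the first argument, and eval returns void.
    static class MyAsyncFn {
        public void eval(CompletableFuture<String> result, int input) {
            result.complete(Integer.toString(input));
        }
    }

    /** Returns true if the method's first parameter is a CompletableFuture (raw or parameterized). */
    static boolean firstParamIsFuture(Method m) {
        if (m.getParameterCount() < 1) {
            return false;
        }
        Type first = m.getGenericParameterTypes()[0];
        if (CompletableFuture.class.equals(first)) {
            return true; // raw CompletableFuture
        }
        // parameterized case, e.g. CompletableFuture<String>
        return first instanceof ParameterizedType
                && CompletableFuture.class.equals(((ParameterizedType) first).getRawType());
    }

    public static void main(String[] args) throws Exception {
        Method eval = MyAsyncFn.class.getMethod("eval", CompletableFuture.class, int.class);
        System.out.println(firstParamIsFuture(eval)); // prints "true"
    }
}
```

As twalthr notes, the raw case must stay valid, so only the "first argument must be a future" part is enforced, not the presence of a type argument.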
Re: [PR] [FLINK-34020] Bump CI flink version on flink-connector-rabbitmq [flink-connector-rabbitmq]
snuyanzin commented on code in PR #22: URL: https://github.com/apache/flink-connector-rabbitmq/pull/22#discussion_r1449084708 ## flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java: ## @@ -22,6 +22,7 @@ import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.api.common.serialization.RuntimeContextInitializationContextAdapters; import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.operators.translation.RichCombineToGroupCombineWrapper; Review Comment: ```suggestion ``` ## flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java: ## @@ -236,7 +239,10 @@ protected void setupQueue() throws IOException { @Override public void open(Configuration config) throws Exception { super.open(config); +sessionIds = new ArrayList<>(64); +sessionIdsPerSnapshot = new ArrayDeque<>(); try { +RichCombineToGroupCombineWrapper s; Review Comment: ```suggestion ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33915][ci] Adds nightly workflow for the master branch [flink]
XComp commented on code in PR #23971: URL: https://github.com/apache/flink/pull/23971#discussion_r1449060542 ## .github/actions/select_workflow_configs/action.yml: ## @@ -0,0 +1,59 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +--- +name: "Selects Flink workflow configurations" +description: "Loads the relevant workflow configuration from a given folder." +inputs: + config-folder: +description: "The directory in which the workflow configurations are located." +required: true + profile: +description: >- + The profile that's used to access the desired workflow configuration + file. Any invalid profile (i.e. no file exists) will lead to using + the default workflow configurations. Configuration files are expected + to have the name format 'flink-workflow..json' and should be + located in the folder that's specified through the 'config-folder' parameter. +required: true +outputs: + workflow-configurations: +description: "A JSON representation of the workflow configurations." 
+value: "${{ steps.workflow-selection.outputs.configs }}" +runs: + using: "composite" + steps: +- name: "Loads Flink workflow configurations" + id: workflow-selection + shell: bash + run: | +fallback_profile="default" +fallback_path="${{ inputs.config-folder }}/flink-workflow.${fallback_profile}.json" Review Comment: > Why can't we use the configuration from the respective branch? checkout branch, use nightly config. Scheduled workflow runs will be triggered from `master`. The workflow configuration (the json file) is therefore also loaded from `master`. The workflow then triggers the checkout of individual branches. > Do we plan to have different workflow configs for a single branch? We have that right now: 1.18 doesn't include Java 21. But you're right: it's not a common case, I'd say. There are some reasons why I came up with this solution (having dedicated per-branch configuration files and one workflow file per branch): Initially, I went for a single workflow yaml that would trigger a fixed list of release branches through the matrix strategy. We could have disabled the Java 21 workflow. But the flaw of this approach is that the workflow would be nested in three levels: 1. branch 2. profiles 3. the individual jobs of flink-ci.template.yml GitHub Actions doesn't work well with three levels. The dropdown per profile would go away. Instead, we would have a dropdown per branch with all profiles and their individual jobs flattened out in a long list. Unfortunately, I don't have a workflow run saved anymore which would show what I mean. But it's essentially similar to what we have with Azure Pipelines: a long list of individual jobs that are hard to parse by the human eye. Having individual workflow yamls for each branch helped work around this issue at the cost of having somewhat redundant code. I introduced the workflow configuration JSON files to reduce the redundancy of the workflow configuration to a minimum.
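The profile-to-file fallback described in the composite action above can be sketched as plain shell (folder and profile names here are hypothetical stand-ins; the real action takes them from `inputs.config-folder` and `inputs.profile`):

```shell
# Select the workflow configuration file for a profile, falling back to the
# default file when no profile-specific file exists.
config_folder="workflow-configs"   # hypothetical stand-in for inputs.config-folder
profile="nightly-jdk21"            # hypothetical stand-in for inputs.profile

# Set up an example folder that only contains the default configuration.
mkdir -p "${config_folder}"
echo '{}' > "${config_folder}/flink-workflow.default.json"

config_path="${config_folder}/flink-workflow.${profile}.json"
if [ ! -f "${config_path}" ]; then
  # any invalid profile (i.e. no file exists) leads to the default config
  config_path="${config_folder}/flink-workflow.default.json"
fi
echo "selected: ${config_path}"   # selected: workflow-configs/flink-workflow.default.json
```

This keeps the per-branch workflow yamls thin: each one only names a profile, and the JSON files carry the branch-specific differences (such as 1.18 not including Java 21).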
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-30252) Publish flink-shaded pom
[ https://issues.apache.org/jira/browse/FLINK-30252?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Martijn Visser updated FLINK-30252: --- Fix Version/s: shaded-19.0 (was: shaded-18.0) > Publish flink-shaded pom > > > Key: FLINK-30252 > URL: https://issues.apache.org/jira/browse/FLINK-30252 > Project: Flink > Issue Type: Technical Debt > Components: Build System >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Major > Fix For: shaded-19.0 > > > Publish a bom for flink-shaded, such that downstream projects just select the > flink-shaded version, with all other dependency versions being selected > automatically. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-33915][ci] Adds nightly workflow for the master branch [flink]
XComp commented on code in PR #23971: URL: https://github.com/apache/flink/pull/23971#discussion_r1449024195 ## .github/actions/select_workflow_configs/action.yml: ## @@ -0,0 +1,59 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +--- +name: "Selects Flink workflow configurations" +description: "Loads the relevant workflow configuration from a given folder." +inputs: + config-folder: +description: "The directory in which the workflow configurations are located." +required: true + profile: +description: >- + The profile that's used to access the desired workflow configuration + file. Any invalid profile (i.e. no file exists) will lead to using + the default workflow configurations. Configuration files are expected + to have the name format 'flink-workflow..json' and should be + located in the folder that's specified through the 'config-folder' parameter. +required: true +outputs: + workflow-configurations: +description: "A JSON representation of the workflow configurations." 
+value: "${{ steps.workflow-selection.outputs.configs }}" +runs: + using: "composite" + steps: +- name: "Loads Flink workflow configurations" + id: workflow-selection + shell: bash + run: | +fallback_profile="default" +fallback_path="${{ inputs.config-folder }}/flink-workflow.${fallback_profile}.json" Review Comment: > a) nightly builds should use workflow_call; workflow_dispatch is for manual runs via the UI I'm not sure I understand your statement here. The default json file is essentially the workflow for the nightly build of master. Additionally, I enabled `workflow_dispatch` as a valid event trigger for this workflow as well so that anyone could trigger an extended workflow run (with the same configuration as the nightly build on `master`) on his/her branch. > shouldn't master explicitly point to the default config? Fine with me. We could rename it to `flink-workflow.master.json` and still use it as fallback for branches that do not have a dedicated configuration. If that's what you suggest. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33915][ci] Adds nightly workflow for the master branch [flink]
zentol commented on code in PR #23971: URL: https://github.com/apache/flink/pull/23971#discussion_r1449020160 ## .github/actions/select_workflow_configs/action.yml: ## @@ -0,0 +1,59 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +--- +name: "Selects Flink workflow configurations" +description: "Loads the relevant workflow configuration from a given folder." +inputs: + config-folder: +description: "The directory in which the workflow configurations are located." +required: true + profile: +description: >- + The profile that's used to access the desired workflow configuration + file. Any invalid profile (i.e. no file exists) will lead to using + the default workflow configurations. Configuration files are expected + to have the name format 'flink-workflow..json' and should be + located in the folder that's specified through the 'config-folder' parameter. +required: true +outputs: + workflow-configurations: +description: "A JSON representation of the workflow configurations." 
+value: "${{ steps.workflow-selection.outputs.configs }}" +runs: + using: "composite" + steps: +- name: "Loads Flink workflow configurations" + id: workflow-selection + shell: bash + run: | +fallback_profile="default" +fallback_path="${{ inputs.config-folder }}/flink-workflow.${fallback_profile}.json" Review Comment: > Why can't we use the configuration from the respective branch? checkout branch, use nightly config. I don't remember how exactly GHA handles things w.r.t. which branches workflow files are being honored... -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33915][ci] Adds nightly workflow for the master branch [flink]
zentol commented on code in PR #23971: URL: https://github.com/apache/flink/pull/23971#discussion_r1449015851 ## .github/actions/select_workflow_configs/action.yml: ## @@ -0,0 +1,59 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +--- +name: "Selects Flink workflow configurations" +description: "Loads the relevant workflow configuration from a given folder." +inputs: + config-folder: +description: "The directory in which the workflow configurations are located." +required: true + profile: +description: >- + The profile that's used to access the desired workflow configuration + file. Any invalid profile (i.e. no file exists) will lead to using + the default workflow configurations. Configuration files are expected + to have the name format 'flink-workflow..json' and should be + located in the folder that's specified through the 'config-folder' parameter. +required: true +outputs: + workflow-configurations: +description: "A JSON representation of the workflow configurations." 
+value: "${{ steps.workflow-selection.outputs.configs }}" +runs: + using: "composite" + steps: +- name: "Loads Flink workflow configurations" + id: workflow-selection + shell: bash + run: | +fallback_profile="default" +fallback_path="${{ inputs.config-folder }}/flink-workflow.${fallback_profile}.json" Review Comment: Why can't we use the configuration from the respective branch? checkout branch, use nightly config. Do we plan to have different workflow configs for a single branch? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33915][ci] Adds nightly workflow for the master branch [flink]
zentol commented on code in PR #23971: URL: https://github.com/apache/flink/pull/23971#discussion_r1449014250 ## .github/actions/select_workflow_configs/action.yml: ## @@ -0,0 +1,59 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +--- +name: "Selects Flink workflow configurations" +description: "Loads the relevant workflow configuration from a given folder." +inputs: + config-folder: +description: "The directory in which the workflow configurations are located." +required: true + profile: +description: >- + The profile that's used to access the desired workflow configuration + file. Any invalid profile (i.e. no file exists) will lead to using + the default workflow configurations. Configuration files are expected + to have the name format 'flink-workflow..json' and should be + located in the folder that's specified through the 'config-folder' parameter. +required: true +outputs: + workflow-configurations: +description: "A JSON representation of the workflow configurations." 
+value: "${{ steps.workflow-selection.outputs.configs }}" +runs: + using: "composite" + steps: +- name: "Loads Flink workflow configurations" + id: workflow-selection + shell: bash + run: | +fallback_profile="default" +fallback_path="${{ inputs.config-folder }}/flink-workflow.${fallback_profile}.json" Review Comment: a) nightly builds should use workflow_call; workflow_dispatch is for manual runs via the UI IIRC b) shouldn't master explicitly point to the default config? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-31977) If scaling.effectiveness.detection.enabled is false, the call to the detectIneffectiveScaleUp() function is unnecessary
[ https://issues.apache.org/jira/browse/FLINK-31977?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17805650#comment-17805650 ] Maximilian Michels commented on FLINK-31977: I think this is related to FLINK-33993. The name of the configuration option is a bit misleading, as effectiveness detection is always on but scalings are only blocked when the option is set to {{true}}. > If scaling.effectiveness.detection.enabled is false, the call to the > detectIneffectiveScaleUp() function is unnecessary > --- > > Key: FLINK-31977 > URL: https://issues.apache.org/jira/browse/FLINK-31977 > Project: Flink > Issue Type: Improvement > Components: Autoscaler >Affects Versions: 1.17.0 >Reporter: Tan Kim >Priority: Minor > > The code below is a function to detect inefficient scaleups. > It returns a result if the value of SCALING_EFFECTIVENESS_DETECTION_ENABLED > (scaling.effectiveness.detection.enabled) is true after all the necessary > computations for detection, but this is an unnecessary computation. > {code:java} > JobVertexScaler.java #175 > private boolean detectIneffectiveScaleUp( > AbstractFlinkResource resource, > JobVertexID vertex, > Configuration conf, > Map evaluatedMetrics, > ScalingSummary lastSummary) { > double lastProcRate = > lastSummary.getMetrics().get(TRUE_PROCESSING_RATE).getAverage(); // > 22569.315633422066 > double lastExpectedProcRate = > > lastSummary.getMetrics().get(EXPECTED_PROCESSING_RATE).getCurrent(); // > 37340.0 > var currentProcRate = > evaluatedMetrics.get(TRUE_PROCESSING_RATE).getAverage(); > // To judge the effectiveness of the scale up operation we compute how > much of the expected > // increase actually happened. For example if we expect a 100 increase in > proc rate and only > // got an increase of 10 we only accomplished 10% of the desired > increase. If this number is > // below the threshold, we mark the scaling ineffective. 
> double expectedIncrease = lastExpectedProcRate - lastProcRate; > double actualIncrease = currentProcRate - lastProcRate; > boolean withinEffectiveThreshold = > (actualIncrease / expectedIncrease) > >= > conf.get(AutoScalerOptions.SCALING_EFFECTIVENESS_THRESHOLD); > if (withinEffectiveThreshold) { > return false; > } > var message = String.format(INNEFFECTIVE_MESSAGE_FORMAT, vertex); > eventRecorder.triggerEvent( > resource, > EventRecorder.Type.Normal, > EventRecorder.Reason.IneffectiveScaling, > EventRecorder.Component.Operator, > message); > if (conf.get(AutoScalerOptions.SCALING_EFFECTIVENESS_DETECTION_ENABLED)) { > LOG.info(message); > return true; > } else { > return false; > } > } {code} > In the call to the detectIneffectiveScaleUp function, I would suggest > checking SCALING_EFFECTIVENESS_DETECTION_ENABLED first, as follows. > {code:java} > JobVertexScaler.java #150 > if (currentParallelism == lastSummary.getNewParallelism() && > lastSummary.isScaledUp()) { > if (scaledUp) { > > if(conf.get(AutoScalerOptions.SCALING_EFFECTIVENESS_DETECTION_ENABLED)) { > return detectIneffectiveScaleUp(resource, vertex, conf, > evaluatedMetrics, lastSummary); > } else { > return true; > } > } else { > return detectImmediateScaleDownAfterScaleUp(vertex, conf, > lastScalingTs); > } > }{code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
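The reordering proposed in the ticket, checking the configuration flag before doing any of the effectiveness arithmetic, can be sketched as a standalone function. This is a simplified illustration, not Flink's implementation: parameter names follow the quoted snippet, and the event-recording side effect is omitted.

```python
def detect_ineffective_scale_up(
    detection_enabled: bool,
    last_proc_rate: float,
    last_expected_proc_rate: float,
    current_proc_rate: float,
    effectiveness_threshold: float,
) -> bool:
    """Return True if the last scale-up should be flagged as ineffective."""
    # Short-circuit first, as the ticket suggests: when blocking on
    # ineffective scale-ups is disabled, skip the computation entirely.
    if not detection_enabled:
        return False
    # How much of the expected processing-rate increase actually happened.
    expected_increase = last_expected_proc_rate - last_proc_rate
    actual_increase = current_proc_rate - last_proc_rate
    # Below the threshold means only a small fraction of the desired
    # increase was accomplished, so the scale-up was ineffective.
    return (actual_increase / expected_increase) < effectiveness_threshold
```

With the example numbers from the quoted code (last rate ~22569, expected ~37340), a current rate of 24000 realizes under 10% of the expected increase and is flagged when the threshold is 0.1; a current rate near the expected one is not.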
Re: [PR] [FLINK-33694][gs-fs-hadoop] Support overriding GCS root URL [flink]
patricklucas commented on PR #23836: URL: https://github.com/apache/flink/pull/23836#issuecomment-1887371839 My suspicion is that there's an extremely small chance of that but I have no way to quantify it. In our case it was actually the other way around: we set this option and were reading and writing to GCS, however we were writing in a custom sink that used the GCS SDK directly, so everything worked as expected. It was only when we migrated to using the normal file sink that this issue arose. Does the Flink project have guidelines to approaching fixing this sort of issue, where users may depend on a definitely-broken behavior? If we cannot fix it directly (before Flink 2.0, I assume), do you have a suggestion for another workaround—perhaps just a new config key altogether? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-32416] initial implementation of DynamicKafkaSource with bound… [flink-connector-kafka]
mxm commented on code in PR #44: URL: https://github.com/apache/flink-connector-kafka/pull/44#discussion_r1448982380 ## flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/DynamicKafkaSource.java: ## @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.kafka.source; Review Comment: Good to go. I can squash also on merge, as you prefer. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-34036) Various HiveDialectQueryITCase tests fail in GitHub Actions workflow with Hadoop 3.1.3 enabled
[ https://issues.apache.org/jira/browse/FLINK-34036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17805636#comment-17805636 ] Matthias Pohl commented on FLINK-34036: --- The test failures can be reproduced locally: {code} $ ./mvnw -Dflink.hadoop.version=3.2.3 -Phadoop3-tests,hive3 -pl flink-connectors/flink-connector-hive verify -Dtest=org.apache.flink.connectors.hive.HiveDialectQueryITCase {code} It could have something to do with the nightly builds only running on Alibaba machines for the Azure Pipeline builds. But we have seen this test already [succeeding on GHA workflows|https://github.com/XComp/flink/actions/runs/7278032529/job/19831862705#step:12:23467]. > Various HiveDialectQueryITCase tests fail in GitHub Actions workflow with > Hadoop 3.1.3 enabled > -- > > Key: FLINK-34036 > URL: https://issues.apache.org/jira/browse/FLINK-34036 > Project: Flink > Issue Type: Sub-task > Components: Connectors / Hadoop Compatibility, Connectors / Hive >Affects Versions: 1.18.0, 1.19.0 >Reporter: Matthias Pohl >Priority: Major > Labels: github-actions, test-stability > > The following {{HiveDialectQueryITCase}} tests fail consistently in the > FLINK-27075 GitHub Actions [master nightly > workflow|https://github.com/XComp/flink/actions/workflows/nightly-dev.yml] of > Flink (and also the [release-1.18 > workflow|https://github.com/XComp/flink/actions/workflows/nightly-current.yml]): > * {{testInsertDirectory}} > * {{testCastTimeStampToDecimal}} > * {{testNullLiteralAsArgument}} > {code} > Error: 03:38:45 03:38:45.661 [ERROR] Tests run: 22, Failures: 1, Errors: 2, > Skipped: 0, Time elapsed: 379.0 s <<< FAILURE! -- in > org.apache.flink.connectors.hive.HiveDialectQueryITCase > Error: 03:38:45 03:38:45.662 [ERROR] > org.apache.flink.connectors.hive.HiveDialectQueryITCase.testNullLiteralAsArgument > -- Time elapsed: 0.069 s <<< ERROR! 
> Jan 09 03:38:45 java.lang.NoSuchMethodError: > org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils.getTimestamp(Ljava/lang/Object;Lorg/apache/hadoop/hive/serde2/objectinspector/PrimitiveObjectInspector;)Ljava/sql/Timestamp; > Jan 09 03:38:45 at > org.apache.flink.connectors.hive.HiveDialectQueryITCase.testNullLiteralAsArgument(HiveDialectQueryITCase.java:959) > Jan 09 03:38:45 at java.lang.reflect.Method.invoke(Method.java:498) > Jan 09 03:38:45 > Error: 03:38:45 03:38:45.662 [ERROR] > org.apache.flink.connectors.hive.HiveDialectQueryITCase.testCastTimeStampToDecimal > -- Time elapsed: 0.007 s <<< ERROR! > Jan 09 03:38:45 org.apache.flink.table.api.ValidationException: Table with > identifier 'test-catalog.default.t1' does not exist. > Jan 09 03:38:45 at > org.apache.flink.table.catalog.CatalogManager.dropTableInternal(CatalogManager.java:1266) > Jan 09 03:38:45 at > org.apache.flink.table.catalog.CatalogManager.dropTable(CatalogManager.java:1206) > Jan 09 03:38:45 at > org.apache.flink.table.operations.ddl.DropTableOperation.execute(DropTableOperation.java:74) > Jan 09 03:38:45 at > org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1107) > Jan 09 03:38:45 at > org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:735) > Jan 09 03:38:45 at > org.apache.flink.connectors.hive.HiveDialectQueryITCase.testCastTimeStampToDecimal(HiveDialectQueryITCase.java:835) > Jan 09 03:38:45 at java.lang.reflect.Method.invoke(Method.java:498) > Jan 09 03:38:45 > Error: 03:38:45 03:38:45.663 [ERROR] > org.apache.flink.connectors.hive.HiveDialectQueryITCase.testInsertDirectory > -- Time elapsed: 7.326 s <<< FAILURE! 
> Jan 09 03:38:45 org.opentest4j.AssertionFailedError: > Jan 09 03:38:45 > Jan 09 03:38:45 expected: "A:english=90#math=100#history=85" > Jan 09 03:38:45 but was: "A:english=90math=100history=85" > Jan 09 03:38:45 at > sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) > Jan 09 03:38:45 at > sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) > Jan 09 03:38:45 at > sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) > Jan 09 03:38:45 at > org.apache.flink.connectors.hive.HiveDialectQueryITCase.testInsertDirectory(HiveDialectQueryITCase.java:498) > Jan 09 03:38:45 at java.lang.reflect.Method.invoke(Method.java:498) > {code} > Additionally, the {{HiveITCase}} in the e2e test suite is affected: > {code} > Error: 05:20:20 05:20:20.949 [ERROR] Tests run: 1, Failures: 0, Errors: 1, > Skipped: 0, Time elapsed: 0.106 s <<< FAILURE! -- in > org.apache.flink.tests.hive.HiveITCase > Error:
Re: [PR] [FLINK-33450][autoscaler] Support the JDBCAutoScalerStateStore [flink-kubernetes-operator]
mxm commented on code in PR #741: URL: https://github.com/apache/flink-kubernetes-operator/pull/741#discussion_r1448978956 ## flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/config/AutoscalerStandaloneOptions.java: ## @@ -59,4 +64,47 @@ private static ConfigOptions.OptionBuilder autoscalerStandaloneConfig(String key .withDeprecatedKeys("flinkClusterPort") .withDescription( "The port of flink cluster when the flink-cluster fetcher is used."); + +public static final ConfigOption STATE_STORE_TYPE = +autoscalerStandaloneConfig("state-store.type") +.enumType(StateStoreType.class) +.defaultValue(StateStoreType.MEMORY) +.withDescription("The autoscaler state store type."); Review Comment: Got it! Forgot about that. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33450][autoscaler] Support the JDBCAutoScalerStateStore [flink-kubernetes-operator]
mxm commented on code in PR #741: URL: https://github.com/apache/flink-kubernetes-operator/pull/741#discussion_r1448978630 ## flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/JobStateView.java: ## @@ -0,0 +1,262 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.autoscaler.jdbc.state; + +import org.apache.flink.annotation.VisibleForTesting; + +import javax.annotation.Nonnull; +import javax.annotation.concurrent.NotThreadSafe; + +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.function.Function; + +import static org.apache.flink.autoscaler.jdbc.state.JobStateView.State.NEEDS_CREATE; +import static org.apache.flink.autoscaler.jdbc.state.JobStateView.State.NEEDS_DELETE; +import static org.apache.flink.autoscaler.jdbc.state.JobStateView.State.NEEDS_UPDATE; +import static org.apache.flink.autoscaler.jdbc.state.JobStateView.State.NOT_NEEDED; +import static org.apache.flink.autoscaler.jdbc.state.JobStateView.State.UP_TO_DATE; +import static org.apache.flink.util.Preconditions.checkState; + +/** The view of job state. 
*/ +@NotThreadSafe +public class JobStateView { + +/** + * The state of state type about the cache and database. + * + * Note: {@link #inLocally} and {@link #inDatabase} are only for understand, we don't use + * them. + */ +@SuppressWarnings("unused") +enum State { + +/** State doesn't exist at database, and it's not used so far, so it's not needed. */ +NOT_NEEDED(false, false, false), +/** State is only stored locally, not created in JDBC database yet. */ +NEEDS_CREATE(true, false, true), +/** State exists in JDBC database but there are newer local changes. */ +NEEDS_UPDATE(true, true, true), +/** State is stored locally and in database, and they are same. */ +UP_TO_DATE(true, true, false), +/** State is stored in database, but it's deleted in local. */ +NEEDS_DELETE(false, true, true); Review Comment: Got it. The code complexity for adding a new state backend makes me wonder, though, whether we need to rethink some of the building blocks for state stores. It would be good to move more of the logic into the core, to avoid duplicating / writing similar logic. I was initially more in favor of moving this out of the core but we need to see if that might be too complicated in the long run. This is out of scope for this PR though. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
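Reading the enum's boolean columns as (inLocally, inDatabase, needsFlush), the pending sync action follows directly from where the state currently lives. A hedged sketch of that mapping (the meaning of the third flag is an assumption based on the Javadoc; it is not Flink's code):

```python
def classify(in_locally: bool, in_database: bool, dirty: bool) -> str:
    """Derive the sync action from cache/database presence, mirroring
    the five enum constants quoted above."""
    if not in_locally and not in_database:
        return "NOT_NEEDED"      # never stored anywhere
    if in_locally and not in_database:
        return "NEEDS_CREATE"    # only in the local cache -> INSERT
    if not in_locally:
        return "NEEDS_DELETE"    # only in the database -> DELETE
    # Present on both sides: flush only if there are newer local changes.
    return "NEEDS_UPDATE" if dirty else "UP_TO_DATE"
```

The flag combinations of the quoted enum constants all round-trip through this mapping.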
Re: [PR] [FLINK-33771] Add cluster capacity awareness to autoscaler [flink-kubernetes-operator]
1996fanrui commented on code in PR #751: URL: https://github.com/apache/flink-kubernetes-operator/pull/751#discussion_r1448948315 ## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java: ## @@ -247,6 +254,69 @@ private boolean isJobUnderMemoryPressure( return false; } +private boolean scalingWouldExceedClusterResources( +EvaluatedMetrics evaluatedMetrics, +Map scalingSummaries, +Context ctx) { + +final double taskManagerCpu = ctx.getTaskManagerCpu(); +final double taskManagerMemory = ctx.getTaskManagerMemory(); + +if (taskManagerCpu <= 0 || taskManagerMemory <= 0) { +// We can't extract the requirements, we can't make any assumptions +return false; +} + +var globalMetrics = evaluatedMetrics.getGlobalMetrics(); +if (!(globalMetrics.containsKey(ScalingMetric.NUM_TASK_MANAGERS) +&& globalMetrics.containsKey(ScalingMetric.NUM_TOTAL_TASK_SLOTS) +&& globalMetrics.containsKey(ScalingMetric.NUM_TASK_SLOTS_USED))) { +LOG.info("JM metrics not ready yet"); +return true; +} + +var vertexMetrics = evaluatedMetrics.getVertexMetrics(); + +int oldParallelismSum = +vertexMetrics.values().stream() +.map(map -> (int) map.get(ScalingMetric.PARALLELISM).getCurrent()) +.reduce(0, Integer::sum); + +Map newParallelisms = new HashMap<>(); +for (Map.Entry> entry : +vertexMetrics.entrySet()) { +JobVertexID jobVertexID = entry.getKey(); +ScalingSummary scalingSummary = scalingSummaries.get(jobVertexID); +if (scalingSummary != null) { +newParallelisms.put(jobVertexID, scalingSummary.getNewParallelism()); +} else { +newParallelisms.put( +jobVertexID, +(int) entry.getValue().get(ScalingMetric.PARALLELISM).getCurrent()); +} +} + +double numTaskSlotsUsed = globalMetrics.get(ScalingMetric.NUM_TASK_SLOTS_USED).getCurrent(); + +final int numTaskSlotsAfterRescale; +if (oldParallelismSum == numTaskSlotsUsed) { +// Slot sharing activated +numTaskSlotsAfterRescale = newParallelisms.values().stream().reduce(0, Integer::sum); Review Comment: It could happen when resource is not 
enough and using adaptive scheduler, right? ## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java: ## @@ -247,6 +254,69 @@ private boolean isJobUnderMemoryPressure( return false; } +private boolean scalingWouldExceedClusterResources( +EvaluatedMetrics evaluatedMetrics, +Map scalingSummaries, +Context ctx) { + +final double taskManagerCpu = ctx.getTaskManagerCpu(); +final double taskManagerMemory = ctx.getTaskManagerMemory(); + +if (taskManagerCpu <= 0 || taskManagerMemory <= 0) { +// We can't extract the requirements, we can't make any assumptions +return false; +} + +var globalMetrics = evaluatedMetrics.getGlobalMetrics(); +if (!(globalMetrics.containsKey(ScalingMetric.NUM_TASK_MANAGERS) +&& globalMetrics.containsKey(ScalingMetric.NUM_TOTAL_TASK_SLOTS) +&& globalMetrics.containsKey(ScalingMetric.NUM_TASK_SLOTS_USED))) { +LOG.info("JM metrics not ready yet"); +return true; +} + +var vertexMetrics = evaluatedMetrics.getVertexMetrics(); + +int oldParallelismSum = +vertexMetrics.values().stream() +.map(map -> (int) map.get(ScalingMetric.PARALLELISM).getCurrent()) +.reduce(0, Integer::sum); + +Map newParallelisms = new HashMap<>(); +for (Map.Entry> entry : +vertexMetrics.entrySet()) { +JobVertexID jobVertexID = entry.getKey(); +ScalingSummary scalingSummary = scalingSummaries.get(jobVertexID); +if (scalingSummary != null) { +newParallelisms.put(jobVertexID, scalingSummary.getNewParallelism()); +} else { +newParallelisms.put( +jobVertexID, +(int) entry.getValue().get(ScalingMetric.PARALLELISM).getCurrent()); +} +} + +double numTaskSlotsUsed = globalMetrics.get(ScalingMetric.NUM_TASK_SLOTS_USED).getCurrent(); + +final int numTaskSlotsAfterRescale; +if (oldParallelismSum == numTaskSlotsUsed) { +// Slot sharing activated +numTaskSlotsAfterRescale = newParallelisms.values().stream().reduce(0, Integer::sum); +} else { +// Assuming slot sharing is not activated +
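The slot accounting being discussed above — summing per-vertex parallelisms versus the tighter bound that slot sharing allows — can be sketched in isolation. The class and method names below are made up for illustration and are not part of the PR; the sketch only shows the two bounds the heuristic chooses between (falling back to the current parallelism for vertices without a scaling summary, as the quoted hunk does):

```java
import java.util.Map;

public class SlotEstimate {

    // Without slot sharing, every subtask needs its own slot: for each vertex,
    // prefer the proposed new parallelism and fall back to the current one,
    // then sum the results.
    static int slotsWithoutSharing(
            Map<String, Integer> currentParallelism, Map<String, Integer> proposedParallelism) {
        int sum = 0;
        for (Map.Entry<String, Integer> e : currentParallelism.entrySet()) {
            sum += proposedParallelism.getOrDefault(e.getKey(), e.getValue());
        }
        return sum;
    }

    // With full slot sharing (all vertices in one sharing group), subtasks of
    // different vertices can share a slot, so the demand is the maximum
    // parallelism rather than the sum.
    static int slotsWithSharing(
            Map<String, Integer> currentParallelism, Map<String, Integer> proposedParallelism) {
        int max = 0;
        for (Map.Entry<String, Integer> e : currentParallelism.entrySet()) {
            max = Math.max(max, proposedParallelism.getOrDefault(e.getKey(), e.getValue()));
        }
        return max;
    }

    public static void main(String[] args) {
        Map<String, Integer> current = Map.of("source", 2, "sink", 3);
        Map<String, Integer> proposed = Map.of("source", 4); // sink keeps its current 3
        System.out.println(slotsWithoutSharing(current, proposed)); // 7
        System.out.println(slotsWithSharing(current, proposed)); // 4
    }
}
```

Comparing the observed slot usage against these two bounds is one way to decide which regime the running job is in before projecting post-rescale demand.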
Re: [PR] [FLINK-33450][autoscaler] Support the JDBCAutoScalerStateStore [flink-kubernetes-operator]
1996fanrui commented on code in PR #741: URL: https://github.com/apache/flink-kubernetes-operator/pull/741#discussion_r1448941925 ## flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/JobStateView.java: ## @@ -0,0 +1,262 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.autoscaler.jdbc.state; + +import org.apache.flink.annotation.VisibleForTesting; + +import javax.annotation.Nonnull; +import javax.annotation.concurrent.NotThreadSafe; + +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.function.Function; + +import static org.apache.flink.autoscaler.jdbc.state.JobStateView.State.NEEDS_CREATE; +import static org.apache.flink.autoscaler.jdbc.state.JobStateView.State.NEEDS_DELETE; +import static org.apache.flink.autoscaler.jdbc.state.JobStateView.State.NEEDS_UPDATE; +import static org.apache.flink.autoscaler.jdbc.state.JobStateView.State.NOT_NEEDED; +import static org.apache.flink.autoscaler.jdbc.state.JobStateView.State.UP_TO_DATE; +import static org.apache.flink.util.Preconditions.checkState; + +/** The view of job state. 
*/ +@NotThreadSafe +public class JobStateView { + +/** + * The state of state type about the cache and database. + * + * Note: {@link #inLocally} and {@link #inDatabase} are only for understand, we don't use + * them. + */ +@SuppressWarnings("unused") +enum State { + +/** State doesn't exist at database, and it's not used so far, so it's not needed. */ +NOT_NEEDED(false, false, false), +/** State is only stored locally, not created in JDBC database yet. */ +NEEDS_CREATE(true, false, true), +/** State exists in JDBC database but there are newer local changes. */ +NEEDS_UPDATE(true, true, true), +/** State is stored locally and in database, and they are same. */ +UP_TO_DATE(true, true, false), +/** State is stored in database, but it's deleted in local. */ +NEEDS_DELETE(false, true, true); Review Comment: Yeah, I referred to `ConfigMapView` while developing this. I didn't share the logic for a couple of reasons: 1. The View and State are different - `ConfigMapView` stores a `ConfigMap` for each job, so we create it directly. - `JobStateView` stores state for each stateType, which means some state types of the current job are stored in the database while the rest may not have been created there yet. - That's why `JobStateView` has `NOT_NEEDED` and `NEEDS_DELETE`, but `ConfigMapView` doesn't. 2. Because of the first reason, `ConfigMapView` accesses the whole ConfigMap, while `JobStateView` accesses individual rows of the database. That makes it hard to share code. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
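For readers following the boolean triples in the quoted enum, here is a small self-contained sketch. The `of(...)` factory below is hypothetical — the PR's own comment notes that `inLocally`/`inDatabase` are documentation-only and not used programmatically — and only shows how the five states relate to whether a value exists locally, exists in the database, and whether the two copies differ:

```java
public class StateSketch {

    // Mirrors the quoted enum: (existsLocally, existsInDatabase, needsFlush).
    enum State {
        NOT_NEEDED(false, false, false),
        NEEDS_CREATE(true, false, true),
        NEEDS_UPDATE(true, true, true),
        UP_TO_DATE(true, true, false),
        NEEDS_DELETE(false, true, true);

        final boolean inLocally;
        final boolean inDatabase;
        final boolean needsFlush;

        State(boolean inLocally, boolean inDatabase, boolean needsFlush) {
            this.inLocally = inLocally;
            this.inDatabase = inDatabase;
            this.needsFlush = needsFlush;
        }

        // Hypothetical factory (not in the PR): derive the state from the two
        // existence flags plus whether the local copy differs from the DB copy.
        static State of(boolean inLocally, boolean inDatabase, boolean dirty) {
            if (!inLocally && !inDatabase) {
                return NOT_NEEDED;
            }
            if (inLocally && !inDatabase) {
                return NEEDS_CREATE;
            }
            if (!inLocally) {
                return NEEDS_DELETE;
            }
            return dirty ? NEEDS_UPDATE : UP_TO_DATE;
        }
    }

    public static void main(String[] args) {
        System.out.println(State.of(true, false, true)); // NEEDS_CREATE
        System.out.println(State.of(true, true, false)); // UP_TO_DATE
    }
}
```

The third flag lines up with "does this row need a database write": exactly the three states whose local and database views disagree carry `needsFlush = true`.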
Re: [PR] [FLINK-33450][autoscaler] Support the JDBCAutoScalerStateStore [flink-kubernetes-operator]
1996fanrui commented on code in PR #741: URL: https://github.com/apache/flink-kubernetes-operator/pull/741#discussion_r1448928730 ## flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/config/AutoscalerStandaloneOptions.java: ## @@ -59,4 +64,47 @@ private static ConfigOptions.OptionBuilder autoscalerStandaloneConfig(String key .withDeprecatedKeys("flinkClusterPort") .withDescription( "The port of flink cluster when the flink-cluster fetcher is used."); + +public static final ConfigOption STATE_STORE_TYPE = +autoscalerStandaloneConfig("state-store.type") +.enumType(StateStoreType.class) +.defaultValue(StateStoreType.MEMORY) +.withDescription("The autoscaler state store type."); Review Comment: Emmm, I didn't list them because the flink-kubernetes-operator docs list them automatically. You can take a look at the generated HTML; it has all the enum values of `StateStoreType`. And all the old options have `Possible values:` as well. https://github.com/apache/flink-kubernetes-operator/assets/38427477/ba2fa8b5-dc0e-4bb1-8f8f-d493c61a9bab -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
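The automatic listing 1996fanrui refers to works because a doc generator can enumerate the constants of an option's enum type via reflection, with no manual listing in the description. A minimal sketch of that idea — the helper name and the stand-in enum are illustrative, not the operator's actual doc generator:

```java
import java.util.Arrays;
import java.util.stream.Collectors;

public class EnumDocSketch {

    // Stand-in for the StateStoreType enum from the PR.
    enum StateStoreType {
        MEMORY,
        JDBC
    }

    // Renders a "Possible values" list purely from the enum constants, so the
    // option description never has to repeat (or drift from) the enum.
    static String possibleValues(Class<? extends Enum<?>> enumClass) {
        return Arrays.stream(enumClass.getEnumConstants())
                .map(c -> "\"" + c + "\"")
                .collect(Collectors.joining(", "));
    }

    public static void main(String[] args) {
        System.out.println("Possible values: " + possibleValues(StateStoreType.class));
        // Possible values: "MEMORY", "JDBC"
    }
}
```

Adding a new enum constant then updates the generated docs automatically, which is the advantage of not hard-coding the list in `withDescription(...)`.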
Re: [PR] [FLINK-33771] Add cluster capacity awareness to autoscaler [flink-kubernetes-operator]
1996fanrui commented on code in PR #751: URL: https://github.com/apache/flink-kubernetes-operator/pull/751#discussion_r1448858318 ## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/resources/ResourceCheck.java: ## @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.autoscaler.resources; + +/** An interface for checking the available capacity of the underlying resources. */ +public interface ResourceCheck { + +/** Simulates scheduling the provided number of resources. */ Review Comment: It's better to add some comments to explain the return value. ## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/resources/ResourceCheck.java: ## @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.autoscaler.resources; + +/** An interface for checking the available capacity of the underlying resources. */ +public interface ResourceCheck { + +/** Simulates scheduling the provided number of resources. */ +boolean trySchedule( +int currentInstances, +int newInstances, Review Comment: ```suggestion int currentNumberOfTms, int newNumberOfTms, ``` How about this? "Instance" is not specific. ## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/RestApiMetricsCollector.java: ## @@ -103,6 +106,42 @@ protected Map queryAggregatedVertexMetrics( } } +@Override +@SneakyThrows +protected Map queryJmMetrics(Context ctx) { +Map metrics = +Map.of( +"numRegisteredTaskManagers", FlinkMetric.NUM_TASK_MANAGERS, +"taskSlotsTotal", FlinkMetric.NUM_TASK_SLOTS_TOTAL, +"taskSlotsAvailable", FlinkMetric.NUM_TASK_SLOTS_AVAILABLE); Review Comment: All the old metrics were queried and then filtered via the `predicate`. These new metrics don't use that approach: they are queried directly and don't use the `predicate`. Could we abstract `FlinkMetric` to cover them? We could define two types of metrics: QueryableMetric and PredicatableMetric. WDYT? ## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/resources/ResourceRequirements.java: ## @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.autoscaler.resources; + +/** Interface to extract resource requirements. */ +public interface ResourceRequirements { + +/** The required number of CPU for TaskManager nodes, if available, otherwise 0. */ +default double getTaskManagerCpu() {
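To make the `ResourceCheck` contract (and the suggested TM-count naming) concrete, here is a hedged sketch of one possible implementation. The fixed free-capacity fields and all names below are illustrative assumptions, not the operator's real Kubernetes-backed check:

```java
public class ResourceCheckSketch {

    /** Simplified form of the interface from the PR, using the suggested TM naming. */
    interface ResourceCheck {
        /**
         * Returns true if the cluster could additionally schedule
         * (newNumberOfTms - currentNumberOfTms) task managers with the given
         * per-TM CPU and memory, false otherwise.
         */
        boolean trySchedule(
                int currentNumberOfTms, int newNumberOfTms, double cpuPerTm, long memoryPerTmBytes);
    }

    /** Hypothetical fixed-capacity implementation, purely for illustration. */
    static class FixedCapacityCheck implements ResourceCheck {
        final double freeCpu;
        final long freeMemoryBytes;

        FixedCapacityCheck(double freeCpu, long freeMemoryBytes) {
            this.freeCpu = freeCpu;
            this.freeMemoryBytes = freeMemoryBytes;
        }

        @Override
        public boolean trySchedule(
                int currentNumberOfTms,
                int newNumberOfTms,
                double cpuPerTm,
                long memoryPerTmBytes) {
            // Only the additional TMs consume free capacity; scaling down always fits.
            int extraTms = Math.max(0, newNumberOfTms - currentNumberOfTms);
            return extraTms * cpuPerTm <= freeCpu
                    && extraTms * memoryPerTmBytes <= freeMemoryBytes;
        }
    }

    public static void main(String[] args) {
        ResourceCheck check = new FixedCapacityCheck(4.0, 8L << 30); // 4 CPUs, 8 GiB free
        System.out.println(check.trySchedule(2, 4, 1.0, 2L << 30)); // true: needs 2 CPUs, 4 GiB
        System.out.println(check.trySchedule(2, 10, 1.0, 2L << 30)); // false: needs 8 CPUs
    }
}
```

A real implementation would derive the free capacity from cluster metrics (e.g. node allocatable resources) rather than fixed constructor arguments.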
Re: [PR] [FLINK-33450][autoscaler] Support the JDBCAutoScalerStateStore [flink-kubernetes-operator]
mxm commented on code in PR #741: URL: https://github.com/apache/flink-kubernetes-operator/pull/741#discussion_r1448905911 ## flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/config/AutoscalerStandaloneOptions.java: ## @@ -59,4 +64,47 @@ private static ConfigOptions.OptionBuilder autoscalerStandaloneConfig(String key .withDeprecatedKeys("flinkClusterPort") .withDescription( "The port of flink cluster when the flink-cluster fetcher is used."); + +public static final ConfigOption STATE_STORE_TYPE = +autoscalerStandaloneConfig("state-store.type") +.enumType(StateStoreType.class) +.defaultValue(StateStoreType.MEMORY) +.withDescription("The autoscaler state store type."); Review Comment: We should list the options here. ## flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/JobStateView.java: ## @@ -0,0 +1,262 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.autoscaler.jdbc.state; + +import org.apache.flink.annotation.VisibleForTesting; + +import javax.annotation.Nonnull; +import javax.annotation.concurrent.NotThreadSafe; + +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.function.Function; + +import static org.apache.flink.autoscaler.jdbc.state.JobStateView.State.NEEDS_CREATE; +import static org.apache.flink.autoscaler.jdbc.state.JobStateView.State.NEEDS_DELETE; +import static org.apache.flink.autoscaler.jdbc.state.JobStateView.State.NEEDS_UPDATE; +import static org.apache.flink.autoscaler.jdbc.state.JobStateView.State.NOT_NEEDED; +import static org.apache.flink.autoscaler.jdbc.state.JobStateView.State.UP_TO_DATE; +import static org.apache.flink.util.Preconditions.checkState; + +/** The view of job state. */ +@NotThreadSafe +public class JobStateView { + +/** + * The state of state type about the cache and database. + * + * Note: {@link #inLocally} and {@link #inDatabase} are only for understand, we don't use + * them. + */ +@SuppressWarnings("unused") +enum State { + +/** State doesn't exist at database, and it's not used so far, so it's not needed. */ +NOT_NEEDED(false, false, false), +/** State is only stored locally, not created in JDBC database yet. */ +NEEDS_CREATE(true, false, true), +/** State exists in JDBC database but there are newer local changes. */ +NEEDS_UPDATE(true, true, true), +/** State is stored locally and in database, and they are same. */ +UP_TO_DATE(true, true, false), +/** State is stored in database, but it's deleted in local. */ +NEEDS_DELETE(false, true, true); Review Comment: This looks conceptually very similar to `ConfigMapView`. I wonder if we could have shared some of that logic. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] DO_NOT_MERGE_YET[FLINK-33268][rest] Skip unknown fields in REST response deserialization [flink]
gaborgsomogyi commented on PR #23930: URL: https://github.com/apache/flink/pull/23930#issuecomment-1887214969 The approach looks fine, though I haven't executed the full pipeline yet, so I added an automated test to enforce the `ErrorResponseBody` structure. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33930] Canceled stop job status exception [flink-kubernetes-operator]
gyfora commented on code in PR #740: URL: https://github.com/apache/flink-kubernetes-operator/pull/740#discussion_r1448906275 ## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java: ## @@ -369,16 +370,17 @@ protected void cancelJob( deleteClusterDeployment( deployment.getMetadata(), deploymentStatus, conf, true); } + deploymentStatus.getJobStatus().setState(JobStatus.FINISHED.name()); break; case LAST_STATE: deleteClusterDeployment( deployment.getMetadata(), deploymentStatus, conf, false); + deploymentStatus.getJobStatus().setState(JobStatus.FINISHED.name()); break; default: throw new RuntimeException("Unsupported upgrade mode " + upgradeMode); } } -deploymentStatus.getJobStatus().setState(JobStatus.FINISHED.name()); Review Comment: The problem with this, which also complicates the PR, is that in some cases the job is already in a finished / failed or other terminal state, in which case no action was taken and we should leave the status as is. As you said, it's a lot simpler to simply set the state to "FINISHED" after cancellation :) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
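The behavior gyfora describes — only overwriting the status when cancellation actually acted, and leaving an already-terminal state untouched — can be illustrated with a minimal guard. All names here are assumed for the sketch, not the operator's real code:

```java
public class StatusUpdateSketch {

    enum JobStatus {
        RUNNING,
        FINISHED,
        FAILED,
        CANCELED
    }

    static boolean isTerminal(JobStatus s) {
        return s == JobStatus.FINISHED || s == JobStatus.FAILED || s == JobStatus.CANCELED;
    }

    // Only mark the job FINISHED when it was not already in a terminal state;
    // otherwise keep the existing terminal status untouched, since cancel took
    // no action on an already-terminated job.
    static JobStatus afterCancel(JobStatus current) {
        return isTerminal(current) ? current : JobStatus.FINISHED;
    }

    public static void main(String[] args) {
        System.out.println(afterCancel(JobStatus.RUNNING)); // FINISHED
        System.out.println(afterCancel(JobStatus.FAILED)); // FAILED
    }
}
```

The unconditional `setState(FINISHED)` removed in the diff is the simpler variant; the per-branch placement preserves an existing FAILED/CANCELED status exactly as the guard above does.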