Re: [PR] [FLINK-31481][table] Support enhanced show databases syntax [flink]

2024-01-11 Thread via GitHub


jeyhunkarimov commented on code in PR #23612:
URL: https://github.com/apache/flink/pull/23612#discussion_r1449994298


##
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlShowDatabasesConverter.java:
##
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.operations.converters;
+
+import org.apache.flink.sql.parser.dql.SqlShowDatabases;
+import org.apache.flink.table.catalog.CatalogManager;
+import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.operations.ShowDatabasesOperation;
+
+/** A converter for {@link SqlShowDatabases}. */
+public class SqlShowDatabasesConverter implements SqlNodeConverter<SqlShowDatabases> {
+
+    @Override
+    public Operation convertSqlNode(SqlShowDatabases sqlShowDatabases, ConvertContext context) {
+        if (sqlShowDatabases.getPreposition() == null) {
+            return new ShowDatabasesOperation(
+                    sqlShowDatabases.getLikeType(),
+                    sqlShowDatabases.getLikeSqlPattern(),
+                    sqlShowDatabases.isNotLike());
+        } else {
+            CatalogManager catalogManager = context.getCatalogManager();
+            String[] fullCatalogName = sqlShowDatabases.getCatalog();
+            String catalogName =
+                    fullCatalogName.length == 0
+                            ? catalogManager.getCurrentCatalog()
+                            : fullCatalogName[0];

Review Comment:
   ok, got it. Good point. No, if the preposition is not null, the catalog name should be present. I updated `SqlShowDatabasesConverter`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (FLINK-34047) Inject GitHub Actions/Azure Pipelines env variables in uploading_watchdog.sh

2024-01-11 Thread Matthias Pohl (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34047?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias Pohl resolved FLINK-34047.
---
Fix Version/s: 1.19.0
   1.18.1
   Resolution: Fixed

master: 
[f29ee07f5e568fa94da75f3879a9aa7016dfcd5a|https://github.com/apache/flink/commit/f29ee07f5e568fa94da75f3879a9aa7016dfcd5a]
1.18: 
[3090e772192f4f9444e030b199cd95fc6c5afdd5|https://github.com/apache/flink/commit/3090e772192f4f9444e030b199cd95fc6c5afdd5]

> Inject GitHub Actions/Azure Pipelines env variables in uploading_watchdog.sh
> 
>
> Key: FLINK-34047
> URL: https://issues.apache.org/jira/browse/FLINK-34047
> Project: Flink
>  Issue Type: Sub-task
>  Components: Build System / CI
>Affects Versions: 1.18.0, 1.19.0
>Reporter: Matthias Pohl
>Priority: Major
>  Labels: github-actions, pull-request-available
> Fix For: 1.19.0, 1.18.1
>
>
> The workflow that's triggered by 
> {{tools/azure-pipelines/uploading_watchdog.sh}} relies on CI specific 
> environment variables. We should make the two different CI backends explicit 
> in this script to improve the code readability.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [BP-1.18][FLINK-34047][ci] Makes uploading_watchdog.sh support Azure Pipelines and GitHub Actions [flink]

2024-01-11 Thread via GitHub


XComp merged PR #24063:
URL: https://github.com/apache/flink/pull/24063


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34047][ci] Make CI scripts agnostic to CI environment [flink]

2024-01-11 Thread via GitHub


XComp merged PR #24061:
URL: https://github.com/apache/flink/pull/24061


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33768] Support dynamic source parallelism inference for batch jobs [flink]

2024-01-11 Thread via GitHub


flinkbot commented on PR #24078:
URL: https://github.com/apache/flink/pull/24078#issuecomment-1888578089

   
   ## CI report:
   
   * 547c3c10e8bb800244d18ce1c4cc23642bc460bf UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33268][rest] Skip unknown fields in REST response deserialization [flink]

2024-01-11 Thread via GitHub


gaborgsomogyi commented on PR #23930:
URL: https://github.com/apache/flink/pull/23930#issuecomment-1888577653

   The change looks good, but a timeout happened.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33268][rest] Skip unknown fields in REST response deserialization [flink]

2024-01-11 Thread via GitHub


gaborgsomogyi commented on PR #23930:
URL: https://github.com/apache/flink/pull/23930#issuecomment-1888576964

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33915][ci] Adds nightly workflow for the master branch [flink]

2024-01-11 Thread via GitHub


XComp commented on code in PR #23971:
URL: https://github.com/apache/flink/pull/23971#discussion_r1449977361


##
.github/actions/select_workflow_configs/action.yml:
##
@@ -0,0 +1,59 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+---
+name: "Selects Flink workflow configurations"
+description: "Loads the relevant workflow configuration from a given folder."
+inputs:
+  config-folder:
+description: "The directory in which the workflow configurations are 
located."
+required: true
+  profile:
+description: >-
+  The profile that's used to access the desired workflow configuration 
+  file. Any invalid profile (i.e. no file exists) will lead to using 
+  the default workflow configurations. Configuration files are expected 
+  to have the name format 'flink-workflow..json' and should be 
+  located in the folder that's specified through the 'config-folder' 
parameter.
+required: true
+outputs:
+  workflow-configurations:
+description: "A JSON representation of the workflow configurations."
+value: "${{ steps.workflow-selection.outputs.configs }}"
+runs:
+  using: "composite"
+  steps:
+- name: "Loads Flink workflow configurations"
+  id: workflow-selection
+  shell: bash
+  run: |
+fallback_profile="default"
+fallback_path="${{ inputs.config-folder 
}}/flink-workflow.${fallback_profile}.json"

Review Comment:
   > Scheduled workflow runs will be triggered from master. The workflow 
configuration (the json file) is therefore also loaded from master. The 
workflow then triggers the checkout of individual branches.
   
   The following build failure (which happened in release-1.18) seems to prove me wrong: https://github.com/XComp/flink/actions/runs/7498584099 I had forgotten to backport the job_init action into my fork's `release-1.18` branch in that case.
   
   I assume the same is true for the configuration files, so your solution is viable. That means we could have one workflow configuration file per branch. We will still keep the configuration-file functionality in general to avoid having too much duplicate code for each of the nightly workflows.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33915][ci] Adds nightly workflow for the master branch [flink]

2024-01-11 Thread via GitHub


XComp commented on code in PR #23971:
URL: https://github.com/apache/flink/pull/23971#discussion_r1449977361


##
.github/actions/select_workflow_configs/action.yml:
##
@@ -0,0 +1,59 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+---
+name: "Selects Flink workflow configurations"
+description: "Loads the relevant workflow configuration from a given folder."
+inputs:
+  config-folder:
+description: "The directory in which the workflow configurations are 
located."
+required: true
+  profile:
+description: >-
+  The profile that's used to access the desired workflow configuration 
+  file. Any invalid profile (i.e. no file exists) will lead to using 
+  the default workflow configurations. Configuration files are expected 
+  to have the name format 'flink-workflow..json' and should be 
+  located in the folder that's specified through the 'config-folder' 
parameter.
+required: true
+outputs:
+  workflow-configurations:
+description: "A JSON representation of the workflow configurations."
+value: "${{ steps.workflow-selection.outputs.configs }}"
+runs:
+  using: "composite"
+  steps:
+- name: "Loads Flink workflow configurations"
+  id: workflow-selection
+  shell: bash
+  run: |
+fallback_profile="default"
+fallback_path="${{ inputs.config-folder 
}}/flink-workflow.${fallback_profile}.json"

Review Comment:
   > Scheduled workflow runs will be triggered from master. The workflow 
configuration (the json file) is therefore also loaded from master. The 
workflow then triggers the checkout of individual branches.
   
   The following build failure (which happened in release-1.18) seems to prove me wrong: https://github.com/XComp/flink/actions/runs/7498584099 I had forgotten to backport the job_init action into my fork's `release-1.18` branch.
   
   Indeed, dedicated config files behave like the custom actions and are, as a consequence, also loaded from the branch. So your solution is viable. My reasoning that all the config is picked up from master while scheduling the build appears to be wrong. That means we could have one workflow configuration file per branch. We will still keep the configuration-file functionality in general to avoid having too much duplicate code for each of the nightly workflows.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34034][docs] Update the query hint docs to clarify the resolution of conflicts in kv hint and list hint [flink]

2024-01-11 Thread via GitHub


flinkbot commented on PR #24077:
URL: https://github.com/apache/flink/pull/24077#issuecomment-1888570663

   
   ## CI report:
   
   * d78dc4459252f1f105d0bc4337fa64726b834665 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33915][ci] Adds nightly workflow for the master branch [flink]

2024-01-11 Thread via GitHub


XComp commented on code in PR #23971:
URL: https://github.com/apache/flink/pull/23971#discussion_r1449977361


##
.github/actions/select_workflow_configs/action.yml:
##
@@ -0,0 +1,59 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+---
+name: "Selects Flink workflow configurations"
+description: "Loads the relevant workflow configuration from a given folder."
+inputs:
+  config-folder:
+description: "The directory in which the workflow configurations are 
located."
+required: true
+  profile:
+description: >-
+  The profile that's used to access the desired workflow configuration 
+  file. Any invalid profile (i.e. no file exists) will lead to using 
+  the default workflow configurations. Configuration files are expected 
+  to have the name format 'flink-workflow..json' and should be 
+  located in the folder that's specified through the 'config-folder' 
parameter.
+required: true
+outputs:
+  workflow-configurations:
+description: "A JSON representation of the workflow configurations."
+value: "${{ steps.workflow-selection.outputs.configs }}"
+runs:
+  using: "composite"
+  steps:
+- name: "Loads Flink workflow configurations"
+  id: workflow-selection
+  shell: bash
+  run: |
+fallback_profile="default"
+fallback_path="${{ inputs.config-folder 
}}/flink-workflow.${fallback_profile}.json"

Review Comment:
   > Scheduled workflow runs will be triggered from master. The workflow 
configuration (the json file) is therefore also loaded from master. The 
workflow then triggers the checkout of individual branches.
   
   The following build failure (which happened in release-1.18) seems to prove 
me wrong: https://github.com/XComp/flink/actions/runs/7498584099
   
   Indeed, dedicated config files behave like the custom actions and are, as a consequence, also loaded from the branch. So your solution is viable. My reasoning that all the config is picked up from master while scheduling the build appears to be wrong. That means we could have one workflow configuration file per branch. We will still keep the configuration-file functionality in general to avoid having too much duplicate code for each of the nightly workflows.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (FLINK-34040) ScalaSerializersMigrationTest.testStableAnonymousClassnameGeneration fails in GHA with JDK 17 and 21

2024-01-11 Thread Matthias Pohl (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34040?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias Pohl resolved FLINK-34040.
---
Resolution: Fixed

The error was, indeed, caused by a wrong setup of the {{github-actions}} profile, which caused the build to ignore the @FailsOnJava17 flag of the test.

> ScalaSerializersMigrationTest.testStableAnonymousClassnameGeneration fails in 
> GHA with JDK 17 and 21
> 
>
> Key: FLINK-34040
> URL: https://issues.apache.org/jira/browse/FLINK-34040
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / Scala, Build System / CI
>Affects Versions: 1.19.0
>Reporter: Matthias Pohl
>Priority: Major
>  Labels: github-actions, test-stability
>
> {code}
> Error: 13:05:23 13:05:23.538 [ERROR] Tests run: 1, Failures: 1, Errors: 0, 
> Skipped: 0, Time elapsed: 0.375 s <<< FAILURE! -- in 
> org.apache.flink.api.scala.migration.ScalaSerializersMigrationTest
> Error: 13:05:23 13:05:23.538 [ERROR] 
> org.apache.flink.api.scala.migration.ScalaSerializersMigrationTest.testStableAnonymousClassnameGeneration
>  -- Time elapsed: 0.371 s <<< FAILURE!
> Jan 07 13:05:23 org.junit.ComparisonFailure: 
> expected:<...MigrationTest$$anon$[8]> but was:<...MigrationTest$$anon$[1]>
> Jan 07 13:05:23   at org.junit.Assert.assertEquals(Assert.java:117)
> Jan 07 13:05:23   at org.junit.Assert.assertEquals(Assert.java:146)
> Jan 07 13:05:23   at 
> org.apache.flink.api.scala.migration.ScalaSerializersMigrationTest.testStableAnonymousClassnameGeneration(ScalaSerializersMigrationTest.scala:60)
> Jan 07 13:05:23   at 
> java.base/java.lang.reflect.Method.invoke(Method.java:580)
> {code}
> The error only happens in the [master GHA 
> nightly|https://github.com/XComp/flink/actions/workflows/nightly-dev.yml] for 
> JDK 17 and 21.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [FLINK-33768] Support dynamic source parallelism inference for batch jobs [flink]

2024-01-11 Thread via GitHub


SinBex opened a new pull request, #24078:
URL: https://github.com/apache/flink/pull/24078

   ## What is the purpose of the change
   
   Currently, for JobVertices without parallelism configured, the 
AdaptiveBatchScheduler dynamically infers the vertex parallelism based on the 
volume of input data. Specifically, for Source vertices, it uses the value of 
`execution.batch.adaptive.auto-parallelism.default-source-parallelism` as the 
fixed parallelism. If this is not set by the user, the default value of 1 is used as the source parallelism, which is only a temporary implementation.
   
   We aim to support dynamic source parallelism inference for batch jobs.
   ## Brief change log
   
 - *Lazily initialize the parallelism of the OperatorCoordinator.*
 - *Add the `DynamicParallelismInference` and `DynamicFilteringInfo` interfaces, and enable the `SourceCoordinator` to invoke the corresponding methods for dynamic parallelism inference (see the sketch after this list).*
 - *The `AdaptiveBatchScheduler` applies dynamic source parallelism 
inference, and to avoid blocking the main thread by calling external systems, 
we have transformed the scheduling process to be asynchronous.*
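
   For orientation, below is a rough sketch of what the `DynamicParallelismInference` interface could look like. It is only an illustration derived from the description above and FLIP-379; the method and context names are assumptions, not necessarily the merged API.
   
   ```java
   /**
    * Illustrative sketch only (names and signatures are assumptions): a Source that
    * implements this interface can report a parallelism at runtime, based on the data
    * volume it discovers, instead of relying on a static default.
    */
   public interface DynamicParallelismInference {
   
       /** Information offered by the scheduler/coordinator while inferring parallelism. */
       interface Context {
           /** Upper bound that the inferred parallelism must not exceed. */
           int getParallelismInferenceUpperBound();
   
           /** Amount of data each source subtask should roughly process, in bytes. */
           long getDataVolumePerTask();
       }
   
       /** Invoked at runtime (e.g. by the SourceCoordinator) to infer the source parallelism. */
       int inferParallelism(Context context);
   }
   ```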
   
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
 - *Added integration tests to verify the end-to-end logic of dynamic 
parallelism inference, see 
`AdaptiveBatchSchedulerITCase#testSchedulingWithDynamicSourceParallelismInference`
 for details.*
 - *Added unit tests for the newly added methods in classes such as 
`SourceCoordinator` and `AdaptiveBatchScheduler`.*
 - *Manually verified the feature on a Flink session cluster (1 JobManager, 
77 TaskManagers), including dynamic inference of parallelism, asynchronous 
scheduling, and execution exceptions in `DynamicParallelismInference`, all 
performing as expected.*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no )
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes)
 - If yes, how is the feature documented? (docs / JavaDocs)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-34050) Rocksdb state has space amplification after rescaling with DeleteRange

2024-01-11 Thread Hangxiang Yu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17805915#comment-17805915
 ] 

Hangxiang Yu commented on FLINK-34050:
--

Thanks [~lijinzhong] and [~mayuehappy] for providing this information.

IMO, maybe deleteRange + deleteFilesInRanges could be a good default behaviour (a minimal sketch follows below):
 # deleteRange alone may cause some extra space usage, which can even last forever.
 # Adding deleteFilesInRanges should not cost much more time than before, because checking and then deleting files should be quick.
 # The remaining files should still contain the ranges that the current keys and compaction will eventually access.

Of course, we could provide some performance comparison of deleteRange + deleteFilesInRanges vs. deleteRange alone.
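
A minimal sketch of the combined call sequence with the RocksDB Java API (the class, method, and variable names below are illustrative, not the actual Flink utility):
{code:java}
import java.util.Arrays;
import java.util.List;

import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;

/** Illustrative sketch only. */
public final class ClipRangeSketch {

    static void clipKeyGroupRange(
            RocksDB db,
            List<ColumnFamilyHandle> columnFamilyHandles,
            byte[] beginKeyGroupBytes,
            byte[] endKeyGroupBytes) throws RocksDBException {

        List<byte[]> range = Arrays.asList(beginKeyGroupBytes, endKeyGroupBytes);
        for (ColumnFamilyHandle handle : columnFamilyHandles) {
            // Tombstone the obsolete key-group range (the existing FLINK-21321 behaviour).
            db.deleteRange(handle, beginKeyGroupBytes, endKeyGroupBytes);
            // Additionally drop SST files that lie entirely inside the range to reclaim space.
            db.deleteFilesInRanges(handle, range, false);
        }
    }
}
{code}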

> Rocksdb state has space amplification after rescaling with DeleteRange
> --
>
> Key: FLINK-34050
> URL: https://issues.apache.org/jira/browse/FLINK-34050
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Reporter: Jinzhong Li
>Priority: Major
> Attachments: image-2024-01-10-21-23-48-134.png, 
> image-2024-01-10-21-24-10-983.png, image-2024-01-10-21-28-24-312.png
>
>
> FLINK-21321 uses deleteRange to speed up RocksDB rescaling; however, it can cause space amplification in some cases.
> We can reproduce this problem with the WordCount job:
> 1) before rescaling, the state operator in the WordCount job has parallelism 2 and a 4G+ full checkpoint size;
> !image-2024-01-10-21-24-10-983.png|width=266,height=130!
> 2) then restart the job with parallelism 4 (for the state operator); the full checkpoint size of the new job will be 8G+;
> 3) after many successful checkpoints, the full checkpoint size is still 8G+;
> !image-2024-01-10-21-28-24-312.png|width=454,height=111!
>  
> The root cause of this issue is that the deleted keyGroupRange does not overlap with the current DB keyGroupRange, so new data written into RocksDB after rescaling almost never goes through LSM compaction together with the deleted data (which belongs to other keyGroupRanges).
>  
> The space amplification may affect RocksDB read performance and disk space usage after rescaling. It looks like a regression introduced by the deleteRange rescaling optimization.
>  
> To solve this problem, I think maybe we can invoke RocksDB.deleteFilesInRanges after deleteRange?
> {code:java}
> public static void clipDBWithKeyGroupRange() {
>   //...
>   List<byte[]> ranges = new ArrayList<>();
>   //...
>   deleteRange(db, columnFamilyHandles, beginKeyGroupBytes, endKeyGroupBytes);
>   ranges.add(beginKeyGroupBytes);
>   ranges.add(endKeyGroupBytes);
>   //
>   for (ColumnFamilyHandle columnFamilyHandle : columnFamilyHandles) {
>  db.deleteFilesInRanges(columnFamilyHandle, ranges, false);
>   }
> }
> {code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33768) Support dynamic source parallelism inference for batch jobs

2024-01-11 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33768?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-33768:
---
Labels: pull-request-available  (was: )

> Support dynamic source parallelism inference for batch jobs
> ---
>
> Key: FLINK-33768
> URL: https://issues.apache.org/jira/browse/FLINK-33768
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.19.0
>Reporter: xingbe
>Assignee: xingbe
>Priority: Major
>  Labels: pull-request-available
>
> Currently, for JobVertices without parallelism configured, the 
> AdaptiveBatchScheduler dynamically infers the vertex parallelism based on the 
> volume of input data. Specifically, for Source vertices, it uses the value of 
> `{*}execution.batch.adaptive.auto-parallelism.default-source-parallelism{*}` 
> as the fixed parallelism. If this is not set by the user, the default value 
> of {{1}}  is used as the source parallelism, which is actually a temporary 
> implementation solution.
> We aim to support dynamic source parallelism inference for batch jobs. More 
> details see 
> [FLIP-379|https://cwiki.apache.org/confluence/display/FLINK/FLIP-379%3A+Dynamic+source+parallelism+inference+for+batch+jobs].



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-34036) Various HiveDialectQueryITCase tests fail in GitHub Actions workflow with Hadoop 3.1.3 enabled

2024-01-11 Thread Matthias Pohl (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34036?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias Pohl resolved FLINK-34036.
---
Resolution: Fixed

Yup, it was caused by what I described in my comment above. Thanks for the 
pointer, [~chesnay]. I'm closing this issue.

> Various HiveDialectQueryITCase tests fail in GitHub Actions workflow with 
> Hadoop 3.1.3 enabled
> --
>
> Key: FLINK-34036
> URL: https://issues.apache.org/jira/browse/FLINK-34036
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Hadoop Compatibility, Connectors / Hive
>Affects Versions: 1.18.0, 1.19.0
>Reporter: Matthias Pohl
>Priority: Major
>  Labels: github-actions, test-stability
>
> The following {{HiveDialectQueryITCase}} tests fail consistently in the 
> FLINK-27075 GitHub Actions [master nightly 
> workflow|https://github.com/XComp/flink/actions/workflows/nightly-dev.yml] of 
> Flink (and also the [release-1.18 
> workflow|https://github.com/XComp/flink/actions/workflows/nightly-current.yml]):
> * {{testInsertDirectory}}
> * {{testCastTimeStampToDecimal}}
> * {{testNullLiteralAsArgument}}
> {code}
> Error: 03:38:45 03:38:45.661 [ERROR] Tests run: 22, Failures: 1, Errors: 2, 
> Skipped: 0, Time elapsed: 379.0 s <<< FAILURE! -- in 
> org.apache.flink.connectors.hive.HiveDialectQueryITCase
> Error: 03:38:45 03:38:45.662 [ERROR] 
> org.apache.flink.connectors.hive.HiveDialectQueryITCase.testNullLiteralAsArgument
>  -- Time elapsed: 0.069 s <<< ERROR!
> Jan 09 03:38:45 java.lang.NoSuchMethodError: 
> org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils.getTimestamp(Ljava/lang/Object;Lorg/apache/hadoop/hive/serde2/objectinspector/PrimitiveObjectInspector;)Ljava/sql/Timestamp;
> Jan 09 03:38:45   at 
> org.apache.flink.connectors.hive.HiveDialectQueryITCase.testNullLiteralAsArgument(HiveDialectQueryITCase.java:959)
> Jan 09 03:38:45   at java.lang.reflect.Method.invoke(Method.java:498)
> Jan 09 03:38:45 
> Error: 03:38:45 03:38:45.662 [ERROR] 
> org.apache.flink.connectors.hive.HiveDialectQueryITCase.testCastTimeStampToDecimal
>  -- Time elapsed: 0.007 s <<< ERROR!
> Jan 09 03:38:45 org.apache.flink.table.api.ValidationException: Table with 
> identifier 'test-catalog.default.t1' does not exist.
> Jan 09 03:38:45   at 
> org.apache.flink.table.catalog.CatalogManager.dropTableInternal(CatalogManager.java:1266)
> Jan 09 03:38:45   at 
> org.apache.flink.table.catalog.CatalogManager.dropTable(CatalogManager.java:1206)
> Jan 09 03:38:45   at 
> org.apache.flink.table.operations.ddl.DropTableOperation.execute(DropTableOperation.java:74)
> Jan 09 03:38:45   at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1107)
> Jan 09 03:38:45   at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:735)
> Jan 09 03:38:45   at 
> org.apache.flink.connectors.hive.HiveDialectQueryITCase.testCastTimeStampToDecimal(HiveDialectQueryITCase.java:835)
> Jan 09 03:38:45   at java.lang.reflect.Method.invoke(Method.java:498)
> Jan 09 03:38:45 
> Error: 03:38:45 03:38:45.663 [ERROR] 
> org.apache.flink.connectors.hive.HiveDialectQueryITCase.testInsertDirectory 
> -- Time elapsed: 7.326 s <<< FAILURE!
> Jan 09 03:38:45 org.opentest4j.AssertionFailedError: 
> Jan 09 03:38:45 
> Jan 09 03:38:45 expected: "A:english=90#math=100#history=85"
> Jan 09 03:38:45  but was: "A:english=90math=100history=85"
> Jan 09 03:38:45   at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> Jan 09 03:38:45   at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> Jan 09 03:38:45   at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> Jan 09 03:38:45   at 
> org.apache.flink.connectors.hive.HiveDialectQueryITCase.testInsertDirectory(HiveDialectQueryITCase.java:498)
> Jan 09 03:38:45   at java.lang.reflect.Method.invoke(Method.java:498)
> {code}
> Additionally, the {{HiveITCase}} in the e2e test suite is affected:
> {code}
> Error: 05:20:20 05:20:20.949 [ERROR] Tests run: 1, Failures: 0, Errors: 1, 
> Skipped: 0, Time elapsed: 0.106 s <<< FAILURE! -- in 
> org.apache.flink.tests.hive.HiveITCase
> Error: 05:20:20 05:20:20.949 [ERROR] org.apache.flink.tests.hive.HiveITCase 
> -- Time elapsed: 0.106 s <<< ERROR!
> Jan 07 05:20:20 java.lang.ExceptionInInitializerError
> Jan 07 05:20:20   at sun.misc.Unsafe.ensureClassInitialized(Native Method)
> Jan 07 05:20:20   at 
> java.lang.reflect.Field.acquireFieldAccessor(Field.java:1088)
> Jan 07 05:20:20   at 
> 

[PR] [FLINK-34034][docs] Update the query hint docs to clarify the resolution of conflicts in kv hint and list hint [flink]

2024-01-11 Thread via GitHub


swuferhong opened a new pull request, #24077:
URL: https://github.com/apache/flink/pull/24077

   
   
   ## What is the purpose of the change
   
   This PR aims to update the query hint docs to clarify the resolution of conflicts in kv hints and list hints.
   
   
   ## Brief change log
   
   - Update the query hint docs to clarify the resolution of conflicts in kv hints and list hints.
   
   
   ## Verifying this change
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
 - The S3 file system connector:no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
 - If yes, how is the feature documented? docs
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-34034) When kv hint and list hint handle duplicate query hints, the results are different.

2024-01-11 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34034?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-34034:
---
Labels: pull-request-available  (was: )

> When kv hint and list hint handle duplicate query hints, the results are 
> different.
> ---
>
> Key: FLINK-34034
> URL: https://issues.apache.org/jira/browse/FLINK-34034
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Reporter: xuyang
>Assignee: xuyang
>Priority: Minor
>  Labels: pull-request-available
>
> When there are duplicate keys in the kv hint, calcite will overwrite the 
> previous value with the later value.
> {code:java}
> @TestTemplate
> def test(): Unit = {
>   val sql =
> "SELECT /*+ LOOKUP('table'='D', 'retry-predicate'='lookup_miss', 
> 'retry-strategy'='fixed_delay', 'fixed-delay'='10s','max-attempts'='3', 
> 'max-attempts'='4') */ * FROM MyTable AS T JOIN LookupTable " +
>   "FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id"
>   util.verifyExecPlan(sql)
> } {code}
> {code:java}
> Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, 
> id, name, age]) 
>   +- LookupJoin(table=[default_catalog.default_database.LookupTable], 
> joinType=[InnerJoin], lookup=[id=a], select=[a, b, c, proctime, rowtime, id, 
> name, age], retry=[lookup_miss, FIXED_DELAY, 1ms, 4]) 
> +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], 
> fields=[a, b, c, proctime, rowtime])
> {code}
> But when a list hint is duplicated (such as a join hint), we will choose the 
> first one as the effective hint.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-33705] Upgrade to flink-shaded 18.0 [flink]

2024-01-11 Thread via GitHub


snuyanzin commented on PR #23838:
URL: https://github.com/apache/flink/pull/23838#issuecomment-1888545344

   Thanks for having a look. I will rebase onto the latest branch and split it into 2 commits: one for adopting the Jackson breaking change mentioned in the description and another for the actual update. It should simplify code navigation in the future.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (FLINK-33946) RocksDb sets setAvoidFlushDuringShutdown to true to speed up Task Cancel

2024-01-11 Thread Hangxiang Yu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33946?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hangxiang Yu resolved FLINK-33946.
--
Resolution: Fixed

merged aa0c1b5e into master

> RocksDb sets setAvoidFlushDuringShutdown to true to speed up Task Cancel
> 
>
> Key: FLINK-33946
> URL: https://issues.apache.org/jira/browse/FLINK-33946
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Affects Versions: 1.19.0
>Reporter: Yue Ma
>Assignee: Yue Ma
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.19.0
>
>
> When a Job fails, the task needs to be canceled and re-deployed. 
> RocksDBStatebackend will call RocksDB.close when disposing.
> {code:java}
> if (!shutting_down_.load(std::memory_order_acquire) &&
> has_unpersisted_data_.load(std::memory_order_relaxed) &&
> !mutable_db_options_.avoid_flush_during_shutdown) {
>   if (immutable_db_options_.atomic_flush) {
> autovector<ColumnFamilyData*> cfds;
> SelectColumnFamiliesForAtomicFlush(&cfds);
> mutex_.Unlock();
> Status s =
> AtomicFlushMemTables(cfds, FlushOptions(), FlushReason::kShutDown);
> s.PermitUncheckedError();  //**TODO: What to do on error?
> mutex_.Lock();
>   } else {
> for (auto cfd : *versions_->GetColumnFamilySet()) {
>   if (!cfd->IsDropped() && cfd->initialized() && !cfd->mem()->IsEmpty()) {
> cfd->Ref();
> mutex_.Unlock();
> Status s = FlushMemTable(cfd, FlushOptions(), FlushReason::kShutDown);
> s.PermitUncheckedError();  //**TODO: What to do on error?
> mutex_.Lock();
> cfd->UnrefAndTryDelete();
>   }
> }
>   } {code}
> By default (avoid_flush_during_shutdown=false) RocksDB flushes the memtable when it is closed. When disk pressure is high or the memtable is large, this process can be time-consuming, which causes the task to get stuck in the Canceling stage and slows down job failover.
> In fact, it is completely unnecessary to flush the memtable when a Flink task closes, because the data can be replayed from the checkpoint. So we can set avoid_flush_during_shutdown to true to speed up task failover.
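
Independently of the change merged here, the same RocksDB option can also be set from user code via a custom options factory. Below is only a hedged sketch under that assumption (class name and wiring are illustrative, not what this ticket implements):
{code:java}
import java.util.Collection;

import org.apache.flink.contrib.streaming.state.RocksDBOptionsFactory;

import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;

/** Illustrative sketch only. */
public class AvoidFlushOnShutdownOptionsFactory implements RocksDBOptionsFactory {

    @Override
    public DBOptions createDBOptions(
            DBOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
        // Skip the memtable flush on DB close; Flink recovers state from checkpoints anyway.
        return currentOptions.setAvoidFlushDuringShutdown(true);
    }

    @Override
    public ColumnFamilyOptions createColumnFamilyOptions(
            ColumnFamilyOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
        return currentOptions;
    }
}
{code}
Such a factory would be registered on the state backend via {{EmbeddedRocksDBStateBackend#setRocksDBOptions(...)}}.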



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-33946] [rocksdb] set AvoidFlushDuringShutdown to true for avoiding Flush when Cancel Task [flink]

2024-01-11 Thread via GitHub


masteryhx closed pull request #24069: [FLINK-33946] [rocksdb]  set 
AvoidFlushDuringShutdown to true for avoiding Flush when Cancel Task
URL: https://github.com/apache/flink/pull/24069


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (FLINK-34051) Fix equals/hashCode/toString for SavepointRestoreSettings

2024-01-11 Thread Hangxiang Yu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34051?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hangxiang Yu resolved FLINK-34051.
--
Fix Version/s: 1.19.0
   Resolution: Fixed

merged 7745ef83 and ecb68704 into master

> Fix equals/hashCode/toString for SavepointRestoreSettings
> -
>
> Key: FLINK-34051
> URL: https://issues.apache.org/jira/browse/FLINK-34051
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.19.0
>Reporter: Hangxiang Yu
>Assignee: Hangxiang Yu
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.19.0
>
>
> SavepointRestoreSettings#equals/hashCode/toString missed restoreMode property



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-34051][checkpoint] Fix equals/hashCode/toString for SavepointRestoreSettings [flink]

2024-01-11 Thread via GitHub


masteryhx closed pull request #24066: [FLINK-34051][checkpoint] Fix 
equals/hashCode/toString for SavepointRestoreSettings
URL: https://github.com/apache/flink/pull/24066


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34067] Fix javacc warnings in flink-sql-parser [flink]

2024-01-11 Thread via GitHub


flinkbot commented on PR #24076:
URL: https://github.com/apache/flink/pull/24076#issuecomment-1888409523

   
   ## CI report:
   
   * 319b84aca99ca02d59dbfb7139a92472d0b19fd3 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34066][table-planner] Fix LagFunction throw NPE when input argument are not null [flink]

2024-01-11 Thread via GitHub


flinkbot commented on PR #24075:
URL: https://github.com/apache/flink/pull/24075#issuecomment-1888405606

   
   ## CI report:
   
   * a1aa7a2284cee539e17b951a585962a416360104 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-34067) Fix javacc warnings in flink-sql-parser

2024-01-11 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34067?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-34067:
---
Labels: pull-request-available  (was: )

> Fix javacc warnings in flink-sql-parser
> ---
>
> Key: FLINK-34067
> URL: https://issues.apache.org/jira/browse/FLINK-34067
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.19.0
>Reporter: Jim Hughes
>Assignee: Jim Hughes
>Priority: Minor
>  Labels: pull-request-available
>
> While extending the Flink SQL parser, I noticed these two warnings:
> ```
> [INFO] --- javacc:2.4:javacc (javacc) @ flink-sql-parser ---
> Java Compiler Compiler Version 4.0 (Parser Generator)
> (type "javacc" with no arguments for help)
> Reading from file .../flink-table/flink-sql-parser/target/generated-sources/javacc/Parser.jj . . .
> Note: UNICODE_INPUT option is specified. Please make sure you create the parser/lexer using a Reader with the correct character encoding.
> Warning: Choice conflict involving two expansions at
>          line 2043, column 13 and line 2052, column 9 respectively.
>          A common prefix is: "IF"
>          Consider using a lookahead of 2 for earlier expansion.
> Warning: Choice conflict involving two expansions at
>          line 2097, column 13 and line 2105, column 8 respectively.
>          A common prefix is: "IF"
>          Consider using a lookahead of 2 for earlier expansion.
> ```
> As the warnings suggest, adding `LOOKAHEAD(2)` in a few places addresses them.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [FLINK-34067] Fix javacc warnings in flink-sql-parser [flink]

2024-01-11 Thread via GitHub


jnh5y opened a new pull request, #24076:
URL: https://github.com/apache/flink/pull/24076

   ## What is the purpose of the change
   
   Address the two warnings in the build for the Flink SQL Parser:
   
   ```
   [INFO] --- javacc:2.4:javacc (javacc) @ flink-sql-parser ---
   Java Compiler Compiler Version 4.0 (Parser Generator)
   (type "javacc" with no arguments for help)
   Reading from file .../flink-table/flink-sql-parser/target/generated-sources/javacc/Parser.jj . . .
   Note: UNICODE_INPUT option is specified. Please make sure you create the parser/lexer using a Reader with the correct character encoding.
   Warning: Choice conflict involving two expansions at
            line 2043, column 13 and line 2052, column 9 respectively.
            A common prefix is: "IF"
            Consider using a lookahead of 2 for earlier expansion.
   Warning: Choice conflict involving two expansions at
            line 2097, column 13 and line 2105, column 8 respectively.
            A common prefix is: "IF"
            Consider using a lookahead of 2 for earlier expansion.
   ```
   
   ## Brief change log
   Adds `LOOKAHEAD(2)` to the necessary places in `parserImpls.ftl`.
   
   ## Verifying this change
   
   This change is a trivial rework / code cleanup without any test coverage.
   One can verify the change by building with `mvn -Pfast -f 
flink-table/flink-sql-parser install`
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-34067) Fix javacc warnings in flink-sql-parser

2024-01-11 Thread Jim Hughes (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34067?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jim Hughes updated FLINK-34067:
---
Description: 
While extending the Flink SQL parser, I noticed these two warnings:
```
[INFO] --- javacc:2.4:javacc (javacc) @ flink-sql-parser ---
Java Compiler Compiler Version 4.0 (Parser Generator)
(type "javacc" with no arguments for help)
Reading from file .../flink-table/flink-sql-parser/target/generated-sources/javacc/Parser.jj . . .
Note: UNICODE_INPUT option is specified. Please make sure you create the parser/lexer using a Reader with the correct character encoding.
Warning: Choice conflict involving two expansions at
         line 2043, column 13 and line 2052, column 9 respectively.
         A common prefix is: "IF"
         Consider using a lookahead of 2 for earlier expansion.
Warning: Choice conflict involving two expansions at
         line 2097, column 13 and line 2105, column 8 respectively.
         A common prefix is: "IF"
         Consider using a lookahead of 2 for earlier expansion.
```

As the warnings suggest, adding `LOOKAHEAD(2)` in a few places addresses them.

  was:
While extending the Flink SQL parser, I noticed these two warnings:

```
[INFO] --- javacc:2.4:javacc (javacc) @ flink-sql-parser ---
Java Compiler Compiler Version 4.0 (Parser Generator)
(type "javacc" with no arguments for help)
Reading from file .../flink-table/flink-sql-parser/target/generated-sources/javacc/Parser.jj . . .
Note: UNICODE_INPUT option is specified. Please make sure you create the parser/lexer using a Reader with the correct character encoding.
Warning: Choice conflict involving two expansions at
         line 2043, column 13 and line 2052, column 9 respectively.
         A common prefix is: "IF"
         Consider using a lookahead of 2 for earlier expansion.
Warning: Choice conflict involving two expansions at
         line 2097, column 13 and line 2105, column 8 respectively.
         A common prefix is: "IF"
         Consider using a lookahead of 2 for earlier expansion.
```

As the warnings suggest, adding `LOOKAHEAD(2)` in a few places addresses them.


> Fix javacc warnings in flink-sql-parser
> ---
>
> Key: FLINK-34067
> URL: https://issues.apache.org/jira/browse/FLINK-34067
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.19.0
>Reporter: Jim Hughes
>Assignee: Jim Hughes
>Priority: Minor
>
> While extending the Flink SQL parser, I noticed these two warnings:
> ```
> [INFO] --- javacc:2.4:javacc (javacc) @ flink-sql-parser ---
> Java Compiler Compiler Version 4.0 (Parser Generator)
> (type "javacc" with no arguments for help)
> Reading from file .../flink-table/flink-sql-parser/target/generated-sources/javacc/Parser.jj . . .
> Note: UNICODE_INPUT option is specified. Please make sure you create the parser/lexer using a Reader with the correct character encoding.
> Warning: Choice conflict involving two expansions at
>          line 2043, column 13 and line 2052, column 9 respectively.
>          A common prefix is: "IF"

[jira] [Created] (FLINK-34067) Fix javacc warnings in flink-sql-parser

2024-01-11 Thread Jim Hughes (Jira)
Jim Hughes created FLINK-34067:
--

 Summary: Fix javacc warnings in flink-sql-parser
 Key: FLINK-34067
 URL: https://issues.apache.org/jira/browse/FLINK-34067
 Project: Flink
  Issue Type: Improvement
Reporter: Jim Hughes
Assignee: Jim Hughes


While extending the Flink SQL parser, I noticed these two warnings:

```
[INFO] --- javacc:2.4:javacc (javacc) @ flink-sql-parser ---
Java Compiler Compiler Version 4.0 (Parser Generator)
(type "javacc" with no arguments for help)
Reading from file .../flink-table/flink-sql-parser/target/generated-sources/javacc/Parser.jj . . .
Note: UNICODE_INPUT option is specified. Please make sure you create the parser/lexer using a Reader with the correct character encoding.
Warning: Choice conflict involving two expansions at
         line 2043, column 13 and line 2052, column 9 respectively.
         A common prefix is: "IF"
         Consider using a lookahead of 2 for earlier expansion.
Warning: Choice conflict involving two expansions at
         line 2097, column 13 and line 2105, column 8 respectively.
         A common prefix is: "IF"
         Consider using a lookahead of 2 for earlier expansion.
```

As the warnings suggest, adding `LOOKAHEAD(2)` in a few places addresses them.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34066) LagFunction throw NPE when input argument are not null

2024-01-11 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34066?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-34066:
---
Labels: pull-request-available  (was: )

> LagFunction throw NPE when input argument are not null
> --
>
> Key: FLINK-34066
> URL: https://issues.apache.org/jira/browse/FLINK-34066
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Affects Versions: 1.18.0
>Reporter: Yunhong Zheng
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.19.0
>
>
> LagFunction throws an NPE when the input arguments are not null.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34067) Fix javacc warnings in flink-sql-parser

2024-01-11 Thread Jim Hughes (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34067?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jim Hughes updated FLINK-34067:
---
Affects Version/s: 1.19.0

> Fix javacc warnings in flink-sql-parser
> ---
>
> Key: FLINK-34067
> URL: https://issues.apache.org/jira/browse/FLINK-34067
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.19.0
>Reporter: Jim Hughes
>Assignee: Jim Hughes
>Priority: Minor
>
> While extending the Flink SQL parser, I noticed these two warnings:
> ```
> [INFO] --- javacc:2.4:javacc (javacc) @ flink-sql-parser ---
> Java Compiler Compiler Version 4.0 (Parser Generator)
> (type "javacc" with no arguments for help)
> Reading from file .../flink-table/flink-sql-parser/target/generated-sources/javacc/Parser.jj . . .
> Note: UNICODE_INPUT option is specified. Please make sure you create the parser/lexer using a Reader with the correct character encoding.
> Warning: Choice conflict involving two expansions at
>          line 2043, column 13 and line 2052, column 9 respectively.
>          A common prefix is: "IF"
>          Consider using a lookahead of 2 for earlier expansion.
> Warning: Choice conflict involving two expansions at
>          line 2097, column 13 and line 2105, column 8 respectively.
>          A common prefix is: "IF"
>          Consider using a lookahead of 2 for earlier expansion.
> ```
> As the warnings suggest, adding `LOOKAHEAD(2)` in a few places addresses them.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [FLINK-34066][table-planner] Fix LagFunction throw NPE when input argument are not null [flink]

2024-01-11 Thread via GitHub


swuferhong opened a new pull request, #24075:
URL: https://github.com/apache/flink/pull/24075

   
   
   ## What is the purpose of the change
   
   Fix LagFunction throwing an NPE when its input arguments are not null.
   
   ## Brief change log
   
   - Fix LagFunction throwing an NPE when its input arguments are not null
   - Add a test to cover the NPE case.
   
   
   ## Verifying this change
   
   - Added a test to cover the NPE case.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
 - If yes, how is the feature documented? no docs
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-34007) Flink Job stuck in suspend state after losing leadership in HA Mode

2024-01-11 Thread Yang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34007?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17805871#comment-17805871
 ] 

Yang Wang commented on FLINK-34007:
---

[~mapohl] Could you please confirm whether "multi-component leader election" 
will clean up the leader annotation on the ConfigMap after losing leadership?

It seems that the fabric8 Kubernetes client leader elector will not work 
properly when {{run()}} is invoked more than once if we do not clean up the 
leader annotation.

> Flink Job stuck in suspend state after losing leadership in HA Mode
> ---
>
> Key: FLINK-34007
> URL: https://issues.apache.org/jira/browse/FLINK-34007
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.16.3, 1.17.2, 1.18.1, 1.18.2
>Reporter: Zhenqiu Huang
>Priority: Major
> Attachments: Debug.log, job-manager.log
>
>
> The observation is that Job manager goes to suspend state with a failed 
> container not able to register itself to resource manager after timeout.
> JM Log, see attached
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34066) LagFunction throw NPE when input argument are not null

2024-01-11 Thread Yunhong Zheng (Jira)
Yunhong Zheng created FLINK-34066:
-

 Summary: LagFunction throw NPE when input argument are not null
 Key: FLINK-34066
 URL: https://issues.apache.org/jira/browse/FLINK-34066
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Planner
Affects Versions: 1.18.0
Reporter: Yunhong Zheng
 Fix For: 1.19.0


LagFunction throw NPE when input argument are not null.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-33576][core] Introduce new Flink conf file 'config.yaml' supporting standard YAML syntax. [flink]

2024-01-11 Thread via GitHub


JunRuiLee commented on PR #23852:
URL: https://github.com/apache/flink/pull/23852#issuecomment-1888377286

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33929][jdbc-connector] Support JDBC String field read Fragment read [flink-connector-jdbc]

2024-01-11 Thread via GitHub


zhilinli123 commented on PR #87:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/87#issuecomment-1888373294

   > @zhilinli123 I still don't understand the Jira ticket, so I can't really 
review it. Perhaps @snuyanzin or @eskabetxe understand the goal of this PR.
   
   hi~ @MartijnVisser The types currently supported for JDBC sliced (partitioned) 
reads are limited: string-typed fields cannot be used as the partition column. 
Many tables use a UUID as the key and have no auto-increment primary key, so 
Flink has no way to split the read into slices for them. This PR adds support 
to cover that gap.
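   For context, this is roughly how a sliced read is configured today with the 
numeric partitioning options of the JDBC table connector; the database URL, 
table and column names below are made up for illustration:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class JdbcPartitionedScanSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(
                        EnvironmentSettings.newInstance().inStreamingMode().build());

        // A partitioned (sliced) JDBC scan currently needs a numeric, date or
        // timestamp partition column; a table keyed only by a UUID string has
        // no such column, which is the gap discussed above.
        tEnv.executeSql(
                "CREATE TABLE orders (\n"
                        + "  id BIGINT,\n"
                        + "  payload STRING\n"
                        + ") WITH (\n"
                        + "  'connector' = 'jdbc',\n"
                        + "  'url' = 'jdbc:mysql://localhost:3306/mydb',\n"
                        + "  'table-name' = 'orders',\n"
                        + "  'scan.partition.column' = 'id',\n"
                        + "  'scan.partition.num' = '4',\n"
                        + "  'scan.partition.lower-bound' = '0',\n"
                        + "  'scan.partition.upper-bound' = '1000000'\n"
                        + ")");
    }
}
```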
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33958] Implement restore tests for IntervalJoin node [flink]

2024-01-11 Thread via GitHub


bvarghese1 commented on PR #24009:
URL: https://github.com/apache/flink/pull/24009#issuecomment-1888372875

   > From reading the code for `StreamExecIntervalJoin`, I noticed 
pad/filter-left/right transformations. Seems like those can be produced when 
the `TABLE_EXEC_LEGACY_TRANSFORMATION_UIDS` is used.
   > 
   > Since it is a legacy thing, not sure if we want to test it or not.
   
   `TABLE_EXEC_LEGACY_TRANSFORMATION_UIDS` is a deprecated config. It is used 
in the `createNegativeWindowSizeJoin` function and I have added a test case 
which covers that path.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34051][checkpoint] Fix equals/hashCode/toString for SavepointRestoreSettings [flink]

2024-01-11 Thread via GitHub


ljz2051 commented on PR #24066:
URL: https://github.com/apache/flink/pull/24066#issuecomment-1888366794

   Thanks for the update, LGTM


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33792] Generate the same code for the same logic [flink]

2024-01-11 Thread via GitHub


zoudan commented on PR #23984:
URL: https://github.com/apache/flink/pull/23984#issuecomment-1888360571

   @lsyldliu I have updated my code, please have a look when you have time.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-31788][table] TableAggregateFunction supports emitUpdateWithRetract [flink]

2024-01-11 Thread via GitHub


flinkbot commented on PR #24074:
URL: https://github.com/apache/flink/pull/24074#issuecomment-1888345731

   
   ## CI report:
   
   * 970e31067f5a893ea90b3f8a33064051e597bfe3 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33450][autoscaler] Support the JDBCAutoScalerStateStore [flink-kubernetes-operator]

2024-01-11 Thread via GitHub


1996fanrui commented on code in PR #741:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/741#discussion_r1449726286


##
flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/JobStateView.java:
##
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.jdbc.state;
+
+import org.apache.flink.annotation.VisibleForTesting;
+
+import javax.annotation.Nonnull;
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+
+import static 
org.apache.flink.autoscaler.jdbc.state.JobStateView.State.NEEDS_CREATE;
+import static 
org.apache.flink.autoscaler.jdbc.state.JobStateView.State.NEEDS_DELETE;
+import static 
org.apache.flink.autoscaler.jdbc.state.JobStateView.State.NEEDS_UPDATE;
+import static 
org.apache.flink.autoscaler.jdbc.state.JobStateView.State.NOT_NEEDED;
+import static 
org.apache.flink.autoscaler.jdbc.state.JobStateView.State.UP_TO_DATE;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** The view of job state. */
+@NotThreadSafe
+public class JobStateView {
+
+/**
+ * The state of state type about the cache and database.
+ *
+ * Note: {@link #inLocally} and {@link #inDatabase} are only for 
understand, we don't use
+ * them.
+ */
+@SuppressWarnings("unused")
+enum State {
+
+/** State doesn't exist at database, and it's not used so far, so it's 
not needed. */
+NOT_NEEDED(false, false, false),
+/** State is only stored locally, not created in JDBC database yet. */
+NEEDS_CREATE(true, false, true),
+/** State exists in JDBC database but there are newer local changes. */
+NEEDS_UPDATE(true, true, true),
+/** State is stored locally and in database, and they are same. */
+UP_TO_DATE(true, true, false),
+/** State is stored in database, but it's deleted in local. */
+NEEDS_DELETE(false, true, true);

Review Comment:
   > It would be good to move more of the logic into the core, to avoid 
duplicating / writing similar logic. 
   
   Sounds reasonable. After analyzing it, some of the logic is the same, for example:
   
   - `ConfigMapStore` and `JDBCStore` can be abstracted into a `StringStateStore` 
interface
     - They both support `put`, `get` and `remove`
     - The parameters of `ConfigMapStore` are (JobContext, String key, String value).
     - The parameters of `JDBCStore` are (String jobKey, StateType stateType, String value).
     - We can define an interface `StringStateStore` whose parameters are 
`(JobContext, StateType stateType, String value)`.
   
   - `KubernetesAutoScalerStateStore` and `JDBCAutoScalerStateStore` can be 
abstracted into an `AbstractAutoscalerStateStore`
     - They both serialize and compress the original state to a String.
     - `AbstractAutoscalerStateStore` can reuse the serialization and compression logic
     - `KubernetesAutoScalerStateStore` additionally enforces a size limit on the state value
     - We can add a parameter to `AbstractAutoscalerStateStore`; the limit is 
disabled by default, and `KubernetesAutoScalerStateStore` can enable it.
   
   And I created FLINK-34065 to follow up on it.
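   To make the shape of that abstraction concrete, here is a rough sketch of the 
shared interface described above (the type names, generics and enum constants are 
illustrative only, not the final design):

```java
import java.util.Optional;

/**
 * Illustrative sketch of the common key/value store proposed above; the real
 * implementations would be backed by a Kubernetes ConfigMap or a JDBC table.
 */
public interface StringStateStore<Context> {

    /** The kinds of state the autoscaler persists (illustrative values). */
    enum StateType {
        SCALING_HISTORY,
        COLLECTED_METRICS,
        PARALLELISM_OVERRIDES
    }

    /** Stores the serialized state value for the given job and state type. */
    void put(Context jobContext, StateType stateType, String value);

    /** Returns the serialized state value, if present. */
    Optional<String> get(Context jobContext, StateType stateType);

    /** Removes the serialized state value for the given job and state type. */
    void remove(Context jobContext, StateType stateType);
}
```

   The concrete stores would then only differ in how they persist the string 
values (ConfigMap entries vs. JDBC rows).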



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-34065) Design AbstractAutoscalerStateStore to support serialize State to String

2024-01-11 Thread Rui Fan (Jira)
Rui Fan created FLINK-34065:
---

 Summary: Design AbstractAutoscalerStateStore to support serialize 
State to String
 Key: FLINK-34065
 URL: https://issues.apache.org/jira/browse/FLINK-34065
 Project: Flink
  Issue Type: Sub-task
  Components: Autoscaler
Reporter: Rui Fan
Assignee: Rui Fan


Some logic of {{KubernetesAutoScalerStateStore}} and 
{{JDBCAutoScalerStateStore}} is similar, so we can share some common code.
 * {{ConfigMapStore}} and {{JDBCStore}} can be abstracted into a 
{{StringStateStore}} interface

 ** They both support {{put}}, {{get}} and {{remove}}
 ** The parameters of {{ConfigMapStore}} are (JobContext, String key, 
String value).
 ** The parameters of {{JDBCStore}} are (String jobKey, StateType 
stateType, String value).
 ** We can define an interface {{StringStateStore}} whose parameters are 
{{(JobContext, StateType stateType, String value)}}.
 * {{KubernetesAutoScalerStateStore}} and {{JDBCAutoScalerStateStore}} can be 
abstracted into an {{AbstractAutoscalerStateStore}}

 ** They both serialize and compress the original state to a String.
 ** {{AbstractAutoscalerStateStore}} can reuse the serialization and compression logic
 ** {{KubernetesAutoScalerStateStore}} additionally enforces a size limit on the state value
 ** We can add a parameter to {{AbstractAutoscalerStateStore}}; the limit is 
disabled by default, and {{KubernetesAutoScalerStateStore}} can enable it.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-33930] Canceled stop job status exception [flink-kubernetes-operator]

2024-01-11 Thread via GitHub


hunter-cloud09 commented on code in PR #740:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/740#discussion_r1449726381


##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java:
##
@@ -369,16 +370,17 @@ protected void cancelJob(
 deleteClusterDeployment(
 deployment.getMetadata(), deploymentStatus, 
conf, true);
 }
+
deploymentStatus.getJobStatus().setState(JobStatus.FINISHED.name());
 break;
 case LAST_STATE:
 deleteClusterDeployment(
 deployment.getMetadata(), deploymentStatus, conf, 
false);
+
deploymentStatus.getJobStatus().setState(JobStatus.FINISHED.name());
 break;
 default:
 throw new RuntimeException("Unsupported upgrade mode " + 
upgradeMode);
 }
 }
-deploymentStatus.getJobStatus().setState(JobStatus.FINISHED.name());

Review Comment:
   We ran into this problem while doing job operations: during a cancel, the 
job's intermediate state always showed `FINISHED`, which is not right for our 
business, so I don't think it is right for the program either.
   
   On another note, I'd like to ask what the reason for doing this in the first 
place was. Is it normal to have an intermediate `FINISHED` state for cancel 
requests?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] [FLINK-31788][table] TableAggregateFunction supports emitUpdateWithRetract [flink]

2024-01-11 Thread via GitHub


LadyForest opened a new pull request, #24074:
URL: https://github.com/apache/flink/pull/24074

   ## What is the purpose of the change
   
   This PR is cherry-picked from https://github.com/apache/flink/pull/24051
   
   ## Brief change log
   
   - Planning phase: Check whether the `emitUpdateWithRetract` method is 
implemented. If so, the `TableAggsHandleFunction#emitValue` method should 
invoke `TableAggregateFunction#emitUpdateWithRetract` instead of 
`TableAggregateFunction#emitValue`. Meanwhile, change the collector type to 
`RetractableCollector` and implement `RetractableCollector#retract`.
   
   - Runtime phase: Do not invoke emitValue to retract previously sent 
unchanged data.
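   
   For readers unfamiliar with the contract, here is a minimal sketch of a table 
aggregate function implementing `emitUpdateWithRetract` (modelled on the Top-2 
example from the Flink documentation; an illustration only, not the code changed 
in this PR):

```java
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.functions.TableAggregateFunction;

/** Emits the current top-2 values and retracts only the ranks that changed. */
public class Top2WithRetract
        extends TableAggregateFunction<Tuple2<Integer, Integer>, Top2WithRetract.Top2Accumulator> {

    /** Holds the current and the previously emitted top-2 values. */
    public static class Top2Accumulator {
        public Integer first = Integer.MIN_VALUE;
        public Integer second = Integer.MIN_VALUE;
        public Integer oldFirst = Integer.MIN_VALUE;
        public Integer oldSecond = Integer.MIN_VALUE;
    }

    @Override
    public Top2Accumulator createAccumulator() {
        return new Top2Accumulator();
    }

    public void accumulate(Top2Accumulator acc, Integer value) {
        if (value > acc.first) {
            acc.second = acc.first;
            acc.first = value;
        } else if (value > acc.second) {
            acc.second = value;
        }
    }

    /** Incremental emission: retract and re-emit only the ranks that changed. */
    public void emitUpdateWithRetract(
            Top2Accumulator acc, RetractableCollector<Tuple2<Integer, Integer>> out) {
        if (!acc.first.equals(acc.oldFirst)) {
            if (acc.oldFirst != Integer.MIN_VALUE) {
                out.retract(Tuple2.of(acc.oldFirst, 1));
            }
            out.collect(Tuple2.of(acc.first, 1));
            acc.oldFirst = acc.first;
        }
        if (!acc.second.equals(acc.oldSecond)) {
            if (acc.oldSecond != Integer.MIN_VALUE) {
                out.retract(Tuple2.of(acc.oldSecond, 2));
            }
            out.collect(Tuple2.of(acc.second, 2));
            acc.oldSecond = acc.second;
        }
    }
}
```

   When `emitUpdateWithRetract` is implemented, only the changed ranks are 
retracted and re-emitted instead of retracting all previously sent records.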
   
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
   - `TableAggregateITCase#testFlagAggregateWithOrWithoutIncrementalUpdate`
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`:no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): yes
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
 - If yes, how is the feature documented? not applicable
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-31788) Add back Support emitValueWithRetract for TableAggregateFunction

2024-01-11 Thread Jane Chan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31788?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jane Chan updated FLINK-31788:
--
Affects Version/s: 1.18.1

> Add back Support emitValueWithRetract for TableAggregateFunction
> 
>
> Key: FLINK-31788
> URL: https://issues.apache.org/jira/browse/FLINK-31788
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.14.0, 1.14.2, 1.14.3, 1.14.4, 1.14.5, 1.15.0, 1.15.1, 
> 1.16.0, 1.17.0, 1.15.2, 1.14.6, 1.15.3, 1.16.1, 1.15.4, 1.16.2, 1.18.0, 
> 1.17.1, 1.16.3, 1.17.2, 1.18.1
>Reporter: Feng Jin
>Assignee: Jane Chan
>Priority: Major
>  Labels: pull-request-available
>
> This feature was originally implemented in the old planner: 
> [https://github.com/apache/flink/pull/8550/files]
> However, this feature was not implemented in the new planner , the Blink 
> planner. 
> With the removal of the old planner in version 1.14 
> [https://github.com/apache/flink/pull/16080] , this code was also removed.
>  
> We should add it back. 
>  
> origin discuss link: 
> https://lists.apache.org/thread/rnvw8k3636dqhdttpmf1c9colbpw9svp



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-33792] Generate the same code for the same logic [flink]

2024-01-11 Thread via GitHub


zoudan commented on PR #23984:
URL: https://github.com/apache/flink/pull/23984#issuecomment-1888301428

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-31788) Add back Support emitValueWithRetract for TableAggregateFunction

2024-01-11 Thread Jane Chan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31788?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jane Chan updated FLINK-31788:
--
Affects Version/s: 1.17.2
   1.16.3
   1.17.1
   1.18.0
   1.16.2
   1.15.4
   1.16.1
   1.15.3
   1.14.6
   1.15.2
   1.17.0
   1.16.0
   1.15.1
   1.15.0
   1.14.5
   1.14.4
   1.14.3
   1.14.2
   1.14.0

> Add back Support emitValueWithRetract for TableAggregateFunction
> 
>
> Key: FLINK-31788
> URL: https://issues.apache.org/jira/browse/FLINK-31788
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.14.0, 1.14.2, 1.14.3, 1.14.4, 1.14.5, 1.15.0, 1.15.1, 
> 1.16.0, 1.17.0, 1.15.2, 1.14.6, 1.15.3, 1.16.1, 1.15.4, 1.16.2, 1.18.0, 
> 1.17.1, 1.16.3, 1.17.2
>Reporter: Feng Jin
>Assignee: Jane Chan
>Priority: Major
>  Labels: pull-request-available
>
> This feature was originally implemented in the old planner: 
> [https://github.com/apache/flink/pull/8550/files]
> However, this feature was not implemented in the new planner , the Blink 
> planner. 
> With the removal of the old planner in version 1.14 
> [https://github.com/apache/flink/pull/16080] , this code was also removed.
>  
> We should add it back. 
>  
> origin discuss link: 
> https://lists.apache.org/thread/rnvw8k3636dqhdttpmf1c9colbpw9svp



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31788) Add back Support emitValueWithRetract for TableAggregateFunction

2024-01-11 Thread Jane Chan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17805853#comment-17805853
 ] 

Jane Chan commented on FLINK-31788:
---

Fixed in master 01569644aedb56f792c7f7e04f84612d405b0bdf

> Add back Support emitValueWithRetract for TableAggregateFunction
> 
>
> Key: FLINK-31788
> URL: https://issues.apache.org/jira/browse/FLINK-31788
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Reporter: Feng Jin
>Assignee: Jane Chan
>Priority: Major
>  Labels: pull-request-available
>
> This feature was originally implemented in the old planner: 
> [https://github.com/apache/flink/pull/8550/files]
> However, this feature was not implemented in the new planner , the Blink 
> planner. 
> With the removal of the old planner in version 1.14 
> [https://github.com/apache/flink/pull/16080] , this code was also removed.
>  
> We should add it back. 
>  
> origin discuss link: 
> https://lists.apache.org/thread/rnvw8k3636dqhdttpmf1c9colbpw9svp



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-31788][table] TableAggregateFunction supports emitUpdateWithRetract [flink]

2024-01-11 Thread via GitHub


LadyForest merged PR #24051:
URL: https://github.com/apache/flink/pull/24051


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (FLINK-20281) Window aggregation supports changelog stream input

2024-01-11 Thread Jane Chan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-20281?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jane Chan resolved FLINK-20281.
---
Fix Version/s: 1.19.0
   Resolution: Fixed

> Window aggregation supports changelog stream input
> --
>
> Key: FLINK-20281
> URL: https://issues.apache.org/jira/browse/FLINK-20281
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / Planner, Table SQL / Runtime
>Reporter: Jark Wu
>Assignee: xuyang
>Priority: Major
>  Labels: auto-deprioritized-major, auto-deprioritized-minor, 
> pull-request-available
> Fix For: 1.19.0
>
> Attachments: screenshot-1.png
>
>
> Currently, window aggregation doesn't support to consume a changelog stream. 
> This makes it impossible to do a window aggregation on changelog sources 
> (e.g. Kafka with Debezium format, or upsert-kafka, or mysql-cdc). 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-20281) Window aggregation supports changelog stream input

2024-01-11 Thread Jane Chan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-20281?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jane Chan closed FLINK-20281.
-

> Window aggregation supports changelog stream input
> --
>
> Key: FLINK-20281
> URL: https://issues.apache.org/jira/browse/FLINK-20281
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / Planner, Table SQL / Runtime
>Reporter: Jark Wu
>Assignee: xuyang
>Priority: Major
>  Labels: auto-deprioritized-major, auto-deprioritized-minor, 
> pull-request-available
> Fix For: 1.19.0
>
> Attachments: screenshot-1.png
>
>
> Currently, window aggregation doesn't support to consume a changelog stream. 
> This makes it impossible to do a window aggregation on changelog sources 
> (e.g. Kafka with Debezium format, or upsert-kafka, or mysql-cdc). 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-20281) Window aggregation supports changelog stream input

2024-01-11 Thread Jane Chan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20281?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17805852#comment-17805852
 ] 

Jane Chan commented on FLINK-20281:
---

Fixed in master df2ecdc77b23e0e40fe151ed8a7350a6db333f91

> Window aggregation supports changelog stream input
> --
>
> Key: FLINK-20281
> URL: https://issues.apache.org/jira/browse/FLINK-20281
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / Planner, Table SQL / Runtime
>Reporter: Jark Wu
>Assignee: xuyang
>Priority: Major
>  Labels: auto-deprioritized-major, auto-deprioritized-minor, 
> pull-request-available
> Attachments: screenshot-1.png
>
>
> Currently, window aggregation doesn't support to consume a changelog stream. 
> This makes it impossible to do a window aggregation on changelog sources 
> (e.g. Kafka with Debezium format, or upsert-kafka, or mysql-cdc). 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-33879] Avoids the potential hang of Hybrid Shuffle during redistribution [flink]

2024-01-11 Thread via GitHub


jiangxin369 commented on PR #23957:
URL: https://github.com/apache/flink/pull/23957#issuecomment-1888280011

   @reswqa It seems that the CI failed, but the cause is related to Python rather 
than this PR. Could you help merge it?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-20281][table] support consuming cdc stream about window tvf aggregate [flink]

2024-01-11 Thread via GitHub


LadyForest merged PR #24030:
URL: https://github.com/apache/flink/pull/24030


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] [hotfix] Correct release date for flink-shaded 18.0 [flink-web]

2024-01-11 Thread via GitHub


snuyanzin opened a new pull request, #710:
URL: https://github.com/apache/flink-web/pull/710

   (no comment)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Add flink-shaded 18.0 release [flink-web]

2024-01-11 Thread via GitHub


snuyanzin merged PR #701:
URL: https://github.com/apache/flink-web/pull/701


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [DRAFT] FLIP-376 DISTRIBUTED BY [flink]

2024-01-11 Thread via GitHub


flinkbot commented on PR #24073:
URL: https://github.com/apache/flink/pull/24073#issuecomment-1888126956

   
   ## CI report:
   
   * 03494cb182c4f0e3fa4e407dc1f3b06dc9db40b9 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] [DRAFT] FLIP-376 DISTRIBUTED BY [flink]

2024-01-11 Thread via GitHub


jnh5y opened a new pull request, #24073:
URL: https://github.com/apache/flink/pull/24073

   
   
   ## What is the purpose of the change
   
   *(For example: This pull request makes task deployment go through the blob 
server, rather than through RPC. That way we avoid re-transferring them on each 
deployment (during recovery).)*
   
   
   ## Brief change log
   
   *(for example:)*
 - *The TaskInfo is stored in the blob store on job creation time as a 
persistent artifact*
 - *Deployments RPC transmits only the blob storage reference*
 - *TaskManagers retrieve the TaskInfo from the blob cache*
   
   
   ## Verifying this change
   
   Please make sure both new and modified tests in this PR follows the 
conventions defined in our code quality guide: 
https://flink.apache.org/contributing/code-style-and-quality-common.html#testing
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe 
tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
 - *Added integration tests for end-to-end deployment with large payloads 
(100MB)*
 - *Extended integration test for recovery after master (JobManager) 
failure*
 - *Added test that validates that TaskInfo is transferred only once across 
recoveries*
 - *Manually verified the change by running a 4 node cluster with 2 
JobManagers and 4 TaskManagers, a stateful streaming program, and killing one 
JobManager and two TaskManagers during the execution, verifying that recovery 
happens correctly.*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no)
 - The serializers: (yes / no / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
 - The S3 file system connector: (yes / no / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-34063) When snapshot compression is enabled, rescaling of a source operator leads to some splits getting lost

2024-01-11 Thread Ivan Burmistrov (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34063?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ivan Burmistrov updated FLINK-34063:

Priority: Critical  (was: Major)

> When snapshot compression is enabled, rescaling of a source operator leads to 
> some splits getting lost
> --
>
> Key: FLINK-34063
> URL: https://issues.apache.org/jira/browse/FLINK-34063
> Project: Flink
>  Issue Type: Bug
> Environment: Can be reproduced in any environment. The most important 
> thing is to enable snapshot compression.
>Reporter: Ivan Burmistrov
>Priority: Critical
> Attachments: image-2024-01-11-16-27-09-066.png, 
> image-2024-01-11-16-30-47-466.png
>
>
> h2. Backstory
> We've been experimenting with Autoscaling on Flink 1.18 and faced a 
> pretty nasty bug. 
> The symptoms on our production system were as follows. A while after 
> deploying a job with the autoscaler, it started accumulating Kafka lag, and 
> this could only be observed via external lag measurement - from inside Flink 
> (measured by the
> {{_KafkaSourceReader_KafkaConsumer_records_lag_max_}} metric) the lag looked OK:
> !image-2024-01-11-16-27-09-066.png|width=887,height=263!
> After some digging, it turned out that the job had lost some Kafka partitions 
> - i.e. it stopped consuming from them, “forgot” about their existence. That’s 
> why from Flink’s perspective everything was fine - the lag was growing on 
> the partitions Flink no longer knew about.
> This was visible on a metric called “Assigned partitions” 
> (KafkaSourceReader_KafkaConsumer_assigned_partitions):
> !image-2024-01-11-16-30-47-466.png|width=1046,height=254!
> We see on the chart that the job used to know about 20 partitions, and then 
> this number got dropped to 16.
> This drop was quickly connected to the job’s scaling events. Or, more 
> precisely, to the scaling of the source operator - with almost 100% 
> probability, any scaling of the source operator led to partition loss.
> h2. Investigation
> We've conducted the investigation. We use the latest Kubernetes operator and 
> deploy jobs with Native Kubernetes.
> The reproducing scenario we used for investigation:
>  * Launch a job with source operator parallelism = 4, enable DEBUG logging
>  * Wait until it takes the first checkpoint
>  * Scale-up the source operator to say 5 (no need to wait for autoscaling, it 
> can be done via Flink UI)
>  * Wait until the new checkpoint is taken
>  * Scale-down the source operator to 3
> These simple actions led, with almost 100% probability, to some partitions 
> getting lost.
> After that we've downloaded all the logs and inspected them. Noticed these 
> strange records in logs:
> {code:java}
> {"timestamp":1704415753166,"is_logging_enabled":"false","logger_id":"org.apache.flink.streaming.api.operators.AbstractStreamOperator","log_level":"INFO","message":"Restoring
>  state for 4 split(s) to reader.","service_name":"data-beaver"} 
> {"timestamp":1704415753166,"is_logging_enabled":"false","logger_id":"org.apache.flink.connector.base.source.reader.SourceReaderBase","log_level":"INFO","message":"Adding
>  split(s) to reader: 
> [
> [Partition: eventmesh-video-play-v1-6, StartingOffset: 1964306414, 
> StoppingOffset: -9223372036854775808], 
> [Partition: eventmesh-video-play-v1-19, StartingOffset: 1963002538, 
> StoppingOffset: -9223372036854775808], 
> [Partition: eventmesh-video-play-v1-6, StartingOffset: 1964306414, 
> StoppingOffset: -9223372036854775808], 
> [Partition: eventmesh-video-play-v1-19, StartingOffset: 1963002538, 
> StoppingOffset: -9223372036854775808]]", "service_name":"data-beaver"}{code}
> We see that a task is being restored with 4 splits; however, the actual splits 
> contain duplicates - in reality only 2 unique partitions have been added 
> ({_}eventmesh-video-play-v1-6{_} and {_}eventmesh-video-play-v1-19{_}).
> Digging into the code and the logs a bit more, log lines like this started 
> looking suspicious:
>  
> {code:java}
> {"timestamp":1704415753165,"is_logging_enabled":"false","logger_id":"org.apache.flink.runtime.state.TaskStateManagerImpl","log_level":"DEBUG",
>  "message":"Operator 156a1ebbc1936f7d4558c8070b35ba93 has remote state 
> SubtaskState{operatorStateFromBackend=StateObjectCollection{ 
> [OperatorStateHandle{stateNameToPartitionOffsets={SourceReaderState=StateMetaInfo{offsets=[244,
>  244], distributionMode=SPLIT_DISTRIBUTE}}, 
> delegateStateHandle=ByteStreamStateHandle{handleName='gs://data-beaver/checkpoints/moj-tj-dummy-partition-loss-debug-v1/6e1ba15b1b5bedda64836ff48ed1c264/chk-3/fadb4f23-85dd-4048-b466-94c1c5329dd3',
>  dataBytes=328}}, 
> OperatorStateHandle{stateNameToPartitionOffsets={SourceReaderState=StateMetaInfo{offsets=[244,
>  244], 

[jira] [Updated] (FLINK-34064) Expose JobManagerOperatorMetrics via REST API

2024-01-11 Thread Mason Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34064?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mason Chen updated FLINK-34064:
---
Affects Version/s: 1.18.0

> Expose JobManagerOperatorMetrics via REST API
> -
>
> Key: FLINK-34064
> URL: https://issues.apache.org/jira/browse/FLINK-34064
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / REST
>Affects Versions: 1.18.0
>Reporter: Mason Chen
>Priority: Major
>
> Add a REST API to fetch coordinator metrics.
> [https://cwiki.apache.org/confluence/display/FLINK/FLIP-417%3A+Expose+JobManagerOperatorMetrics+via+REST+API]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34064) Expose JobManagerOperatorMetrics via REST API

2024-01-11 Thread Mason Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34064?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mason Chen updated FLINK-34064:
---
Component/s: Runtime / REST

> Expose JobManagerOperatorMetrics via REST API
> -
>
> Key: FLINK-34064
> URL: https://issues.apache.org/jira/browse/FLINK-34064
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / REST
>Reporter: Mason Chen
>Priority: Major
>
> Add a REST API to fetch coordinator metrics.
> [https://cwiki.apache.org/confluence/display/FLINK/FLIP-417%3A+Expose+JobManagerOperatorMetrics+via+REST+API]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34064) Expose JobManagerOperatorMetrics via REST API

2024-01-11 Thread Mason Chen (Jira)
Mason Chen created FLINK-34064:
--

 Summary: Expose JobManagerOperatorMetrics via REST API
 Key: FLINK-34064
 URL: https://issues.apache.org/jira/browse/FLINK-34064
 Project: Flink
  Issue Type: Improvement
Reporter: Mason Chen


Add a REST API to fetch coordinator metrics.

[https://cwiki.apache.org/confluence/display/FLINK/FLIP-417%3A+Expose+JobManagerOperatorMetrics+via+REST+API]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-32416] initial implementation of DynamicKafkaSource with bound… [flink-connector-kafka]

2024-01-11 Thread via GitHub


mas-chen commented on PR #44:
URL: 
https://github.com/apache/flink-connector-kafka/pull/44#issuecomment-1887855854

   one-pager docs are here: 
docs/content/docs/connectors/datastream/dynamic-kafka.md. Marked with 
experimental status.
   
   Also copied this into the chinese doc directory.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-33856) Add metrics to monitor the interaction performance between task and external storage system in the process of checkpoint making

2024-01-11 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17805739#comment-17805739
 ] 

Piotr Nowojski commented on FLINK-33856:


In that case [~hejufang001] it would be great if you started another FLIP for 
adding per sub-task spans. If you decide to do so, please ping me on Apache 
Flink Slack or via a Jira ticket so I don't miss it :)

> Add metrics to monitor the interaction performance between task and external 
> storage system in the process of checkpoint making
> ---
>
> Key: FLINK-33856
> URL: https://issues.apache.org/jira/browse/FLINK-33856
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing
>Affects Versions: 1.18.0
>Reporter: Jufang He
>Assignee: Jufang He
>Priority: Major
>  Labels: pull-request-available
>
> When Flink makes a checkpoint, the performance of the interaction with the 
> external file system has a great impact on the overall checkpoint duration. 
> Therefore, it is easy to spot the bottleneck by adding performance indicators 
> where the task interacts with the external file storage system. These include: 
> the rate of file writes, the latency to write the file, and the latency to 
> close the file.
> Adding these metrics on the Flink side has the following advantages: it is 
> convenient to compare the end-to-end checkpoint duration of different tasks, 
> and there is no need to distinguish between external storage systems, since 
> the metrics can be collected centrally in the FsCheckpointStreamFactory.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-32416] initial implementation of DynamicKafkaSource with bound… [flink-connector-kafka]

2024-01-11 Thread via GitHub


mxm commented on PR #44:
URL: 
https://github.com/apache/flink-connector-kafka/pull/44#issuecomment-1887622480

   A brief one-pager of the feature would be good. We can expand on it later. 
Just to have something in the web docs.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-34063) When snapshot compression is enabled, rescaling of a source operator leads to some splits getting lost

2024-01-11 Thread Ivan Burmistrov (Jira)
Ivan Burmistrov created FLINK-34063:
---

 Summary: When snapshot compression is enabled, rescaling of a 
source operator leads to some splits getting lost
 Key: FLINK-34063
 URL: https://issues.apache.org/jira/browse/FLINK-34063
 Project: Flink
  Issue Type: Bug
 Environment: Can be reproduced in any environment. The most important 
thing is to enable snapshot compression.
Reporter: Ivan Burmistrov
 Attachments: image-2024-01-11-16-27-09-066.png, 
image-2024-01-11-16-30-47-466.png

h2. Backstory

We've been experimenting with Autoscaling on Flink 1.18 and faced a pretty 
nasty bug. 

The symptoms on our production system were as follows. A while after 
deploying a job with the autoscaler, it started accumulating Kafka lag, and this 
could only be observed via external lag measurement - from inside Flink 
(measured by the
{{_KafkaSourceReader_KafkaConsumer_records_lag_max_}} metric) the lag looked OK:
!image-2024-01-11-16-27-09-066.png|width=887,height=263!

After some digging, it turned out that the job had lost some Kafka partitions - 
i.e. it stopped consuming from them, “forgot” about their existence. That’s why 
from Flink’s perspective everything was fine - the lag was growing on the 
partitions Flink no longer knew about.

This was visible on a metric called “Assigned partitions” 
(KafkaSourceReader_KafkaConsumer_assigned_partitions):
!image-2024-01-11-16-30-47-466.png|width=1046,height=254!


We see on the chart that the job used to know about 20 partitions, and then 
this number got dropped to 16.

This drop was quickly connected to the job’s scaling events. Or, more 
precisely, to the scaling of the source operator - with almost 100% probability, 
any scaling of the source operator led to partition loss.
h2. Investigation

We've conducted the investigation. We use the latest Kubernetes operator and 
deploy jobs with Native Kubernetes.

The reproducing scenario we used for investigation:
 * Launch a job with source operator parallelism = 4, enable DEBUG logging
 * Wait until it takes the first checkpoint
 * Scale-up the source operator to say 5 (no need to wait for autoscaling, it 
can be done via Flink UI)
 * Wait until the new checkpoint is taken
 * Scale-down the source operator to 3
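
For reference, snapshot compression (the precondition called out in the 
Environment section above) is enabled on the job itself; a minimal sketch 
assuming the DataStream API, with the rest of the pipeline omitted:

{code:java}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SnapshotCompressionRepro {
    public static void main(String[] args) {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // The precondition for this bug: compress state snapshots.
        env.getConfig().setUseSnapshotCompression(true);
        env.enableCheckpointing(60_000);

        // ... add the Kafka source with parallelism 4 and the rest of the job
        // here, then rescale the source between checkpoints as in the steps above.
    }
}
{code}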

These simple actions led, with almost 100% probability, to some partitions 
getting lost.

After that we've downloaded all the logs and inspected them. Noticed these 
strange records in logs:


{code:java}
{"timestamp":1704415753166,"is_logging_enabled":"false","logger_id":"org.apache.flink.streaming.api.operators.AbstractStreamOperator","log_level":"INFO","message":"Restoring
 state for 4 split(s) to reader.","service_name":"data-beaver"} 
{"timestamp":1704415753166,"is_logging_enabled":"false","logger_id":"org.apache.flink.connector.base.source.reader.SourceReaderBase","log_level":"INFO","message":"Adding
 split(s) to reader: 
[
[Partition: eventmesh-video-play-v1-6, StartingOffset: 1964306414, 
StoppingOffset: -9223372036854775808], 
[Partition: eventmesh-video-play-v1-19, StartingOffset: 1963002538, 
StoppingOffset: -9223372036854775808], 
[Partition: eventmesh-video-play-v1-6, StartingOffset: 1964306414, 
StoppingOffset: -9223372036854775808], 
[Partition: eventmesh-video-play-v1-19, StartingOffset: 1963002538, 
StoppingOffset: -9223372036854775808]]", "service_name":"data-beaver"}{code}
We see that a task is being restored with 4 splits; however, the actual splits 
contain duplicates - in reality only 2 unique partitions have been added 
({_}eventmesh-video-play-v1-6{_} and {_}eventmesh-video-play-v1-19{_}).

Digging into the code and the logs a bit more, log lines like this started 
looking suspicious:

 
{code:java}
{"timestamp":1704415753165,"is_logging_enabled":"false","logger_id":"org.apache.flink.runtime.state.TaskStateManagerImpl","log_level":"DEBUG",
 "message":"Operator 156a1ebbc1936f7d4558c8070b35ba93 has remote state 
SubtaskState{operatorStateFromBackend=StateObjectCollection{ 
[OperatorStateHandle{stateNameToPartitionOffsets={SourceReaderState=StateMetaInfo{offsets=[244,
 244], distributionMode=SPLIT_DISTRIBUTE}}, 
delegateStateHandle=ByteStreamStateHandle{handleName='gs://data-beaver/checkpoints/moj-tj-dummy-partition-loss-debug-v1/6e1ba15b1b5bedda64836ff48ed1c264/chk-3/fadb4f23-85dd-4048-b466-94c1c5329dd3',
 dataBytes=328}}, 
OperatorStateHandle{stateNameToPartitionOffsets={SourceReaderState=StateMetaInfo{offsets=[244,
 244], distributionMode=SPLIT_DISTRIBUTE}}, 
delegateStateHandle=ByteStreamStateHandle{handleName='gs://data-beaver/checkpoints/moj-tj-dummy-partition-loss-debug-v1/6e1ba15b1b5bedda64836ff48ed1c264/chk-3/102aa50b-78c2-457e-9a2f-0055f1dbeb98',
 dataBytes=328}}]}, operatorStateFromStream=StateObjectCollection{[]}, 
keyedStateFromBackend=StateObjectCollection{[]}, 
keyedStateFromStream=StateObjectCollection{[]}, 

Re: [PR] [FLINK-32416] initial implementation of DynamicKafkaSource with bound… [flink-connector-kafka]

2024-01-11 Thread via GitHub


mas-chen commented on PR #44:
URL: 
https://github.com/apache/flink-connector-kafka/pull/44#issuecomment-1887602638

   Are the Javadocs sufficient in the bare minimum case? Otherwise, I'll just 
write the documentation into this PR 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-32416] initial implementation of DynamicKafkaSource with bound… [flink-connector-kafka]

2024-01-11 Thread via GitHub


mxm commented on PR #44:
URL: 
https://github.com/apache/flink-connector-kafka/pull/44#issuecomment-1887599176

   Sounds good. Can we get a bare-minimum version of the documentation into 
this PR?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]

2024-01-11 Thread via GitHub


davidradl commented on PR #79:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/79#issuecomment-1887600132

   @snuyanzin I have left this with multiple commits for now. Should we 
squash to one commit and put you as a co-author in the commit message? If so, 
how would you like me to identify you in the `Co-authored-by: NAME 
`
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]

2024-01-11 Thread via GitHub


davidradl commented on code in PR #79:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/79#discussion_r1449155934


##
flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/ParameterizedPredicatePlanTest.java:
##
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.table;
+
+import org.apache.flink.table.planner.utils.TableTestBase;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests ParameterizedPredicate. */
+public class ParameterizedPredicatePlanTest extends TableTestBase {

Review Comment:
   @snuyanzin I assume that this comment is no longer relevant as is, since you 
have extended ParameterizedPredicatePlanTest to be a ParameterizedTest.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-32416] initial implementation of DynamicKafkaSource with bound… [flink-connector-kafka]

2024-01-11 Thread via GitHub


mas-chen commented on PR #44:
URL: 
https://github.com/apache/flink-connector-kafka/pull/44#issuecomment-1887586297

   Thanks @tzulitai @mxm for your review! I've addressed all the comments and 
squashed the commits. Can you please merge it at your convenience?
   
   I'll start on the docs immediately and hope to use this connector directly 
in the 3.1.0 release!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34027] Introduces AsyncScalarFunction as a new UDF type [flink]

2024-01-11 Thread via GitHub


twalthr commented on code in PR #23975:
URL: https://github.com/apache/flink/pull/23975#discussion_r1449149984


##
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/AsyncUtil.java:
##
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.utils;
+
+import org.apache.flink.streaming.api.datastream.AsyncDataStream;
+import org.apache.flink.streaming.api.functions.async.AsyncRetryStrategy;
+import org.apache.flink.streaming.util.retryable.AsyncRetryStrategies;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.functions.AsyncScalarFunction;
+import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.planner.functions.bridging.BridgingSqlFunction;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexNode;
+
+import java.io.Serializable;
+import java.time.Duration;
+import java.util.Collection;
+import java.util.Objects;
+import java.util.function.Predicate;
+
+/** Contains utilities for AsyncScalarFunctions. */
+public class AsyncUtil {
+
+private static final EmptyResponseResultStrategy EMPTY_RESPONSE =
+new EmptyResponseResultStrategy();
+private static final Predicate ANY_EXCEPTION = new 
AnyExceptionStrategy();
+
+/**
+ * Checks whether it contains the specified kind of async function call in 
the specified node.
+ *
+ * @param node the RexNode to check
+ * @return true if it contains an async function call in the specified 
node.
+ */
+public static boolean containsAsyncCall(RexNode node) {
+return node.accept(new FunctionFinder(true, true));
+}
+
+/**
+ * Checks whether it contains non-async function call in the specified 
node.
+ *
+ * @param node the RexNode to check
+ * @return true if it contains a non-async function call in the specified 
node.
+ */
+public static boolean containsNonAsyncCall(RexNode node) {
+return node.accept(new FunctionFinder(false, true));
+}
+
+/**
+ * Checks whether the specified node is the specified kind of async 
function call.
+ *
+ * @param node the RexNode to check
+ * @return true if the specified node is an async function call.
+ */
+public static boolean isAsyncCall(RexNode node) {
+return node.accept(new FunctionFinder(true, false));
+}
+
+/**
+ * Checks whether the specified node is a non-async function call.
+ *
+ * @param node the RexNode to check
+ * @return true if the specified node is a non-async function call.
+ */
+public static boolean isNonAsyncCall(RexNode node) {
+return node.accept(new FunctionFinder(false, false));
+}
+
+/**
+ * Gets the options required to run the operator.
+ *
+ * @param config The config from which to fetch the options
+ * @return Extracted options
+ */
+public static AsyncUtil.Options getAsyncOptions(ExecNodeConfig config) {
+return new AsyncUtil.Options(
+
config.get(ExecutionConfigOptions.TABLE_EXEC_ASYNC_SCALAR_BUFFER_CAPACITY),
+
config.get(ExecutionConfigOptions.TABLE_EXEC_ASYNC_SCALAR_TIMEOUT).toMillis(),
+AsyncDataStream.OutputMode.ORDERED,
+getResultRetryStrategy(
+
config.get(ExecutionConfigOptions.TABLE_EXEC_ASYNC_SCALAR_RETRY_STRATEGY),
+
config.get(ExecutionConfigOptions.TABLE_EXEC_ASYNC_SCALAR_RETRY_DELAY),
+
config.get(ExecutionConfigOptions.TABLE_EXEC_ASYNC_SCALAR_MAX_ATTEMPTS)));
+}
+
+/** Options for configuring async behavior. */
+public static class Options {
+
+public final int asyncBufferCapacity;
+public final long asyncTimeout;
+public final AsyncDataStream.OutputMode asyncOutputMode;
+public final AsyncRetryStrategy asyncRetryStrategy;
+
+  

[jira] [Comment Edited] (FLINK-34036) Various HiveDialectQueryITCase tests fail in GitHub Actions workflow with Hadoop 3.1.3 enabled

2024-01-11 Thread Matthias Pohl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17805708#comment-17805708
 ] 

Matthias Pohl edited comment on FLINK-34036 at 1/11/24 4:55 PM:


The problem seems to be that we're not building the Flink artifacts with the 
PROFILE, which causes issues when running the tests with the PROFILE. That is 
why it's possible to reproduce it locally: I didn't trigger the clean phase 
before running the tests with the profiles enabled.

This is also a problem in the GHA workflow configuration. The run_mvn 
configuration doesn't inject the PROFILE env variable properly.


was (Author: mapohl):
The problem seems to be that we're not building the Flink artifacts with the 
PROFILE, which causes issues when running the tests with the PROFILE. That is 
why it's possible to reproduce it locally: I didn't trigger the clean phase 
before running the tests with the profiles enabled.

> Various HiveDialectQueryITCase tests fail in GitHub Actions workflow with 
> Hadoop 3.1.3 enabled
> --
>
> Key: FLINK-34036
> URL: https://issues.apache.org/jira/browse/FLINK-34036
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Hadoop Compatibility, Connectors / Hive
>Affects Versions: 1.18.0, 1.19.0
>Reporter: Matthias Pohl
>Priority: Major
>  Labels: github-actions, test-stability
>
> The following {{HiveDialectQueryITCase}} tests fail consistently in the 
> FLINK-27075 GitHub Actions [master nightly 
> workflow|https://github.com/XComp/flink/actions/workflows/nightly-dev.yml] of 
> Flink (and also the [release-1.18 
> workflow|https://github.com/XComp/flink/actions/workflows/nightly-current.yml]):
> * {{testInsertDirectory}}
> * {{testCastTimeStampToDecimal}}
> * {{testNullLiteralAsArgument}}
> {code}
> Error: 03:38:45 03:38:45.661 [ERROR] Tests run: 22, Failures: 1, Errors: 2, 
> Skipped: 0, Time elapsed: 379.0 s <<< FAILURE! -- in 
> org.apache.flink.connectors.hive.HiveDialectQueryITCase
> Error: 03:38:45 03:38:45.662 [ERROR] 
> org.apache.flink.connectors.hive.HiveDialectQueryITCase.testNullLiteralAsArgument
>  -- Time elapsed: 0.069 s <<< ERROR!
> Jan 09 03:38:45 java.lang.NoSuchMethodError: 
> org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils.getTimestamp(Ljava/lang/Object;Lorg/apache/hadoop/hive/serde2/objectinspector/PrimitiveObjectInspector;)Ljava/sql/Timestamp;
> Jan 09 03:38:45   at 
> org.apache.flink.connectors.hive.HiveDialectQueryITCase.testNullLiteralAsArgument(HiveDialectQueryITCase.java:959)
> Jan 09 03:38:45   at java.lang.reflect.Method.invoke(Method.java:498)
> Jan 09 03:38:45 
> Error: 03:38:45 03:38:45.662 [ERROR] 
> org.apache.flink.connectors.hive.HiveDialectQueryITCase.testCastTimeStampToDecimal
>  -- Time elapsed: 0.007 s <<< ERROR!
> Jan 09 03:38:45 org.apache.flink.table.api.ValidationException: Table with 
> identifier 'test-catalog.default.t1' does not exist.
> Jan 09 03:38:45   at 
> org.apache.flink.table.catalog.CatalogManager.dropTableInternal(CatalogManager.java:1266)
> Jan 09 03:38:45   at 
> org.apache.flink.table.catalog.CatalogManager.dropTable(CatalogManager.java:1206)
> Jan 09 03:38:45   at 
> org.apache.flink.table.operations.ddl.DropTableOperation.execute(DropTableOperation.java:74)
> Jan 09 03:38:45   at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1107)
> Jan 09 03:38:45   at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:735)
> Jan 09 03:38:45   at 
> org.apache.flink.connectors.hive.HiveDialectQueryITCase.testCastTimeStampToDecimal(HiveDialectQueryITCase.java:835)
> Jan 09 03:38:45   at java.lang.reflect.Method.invoke(Method.java:498)
> Jan 09 03:38:45 
> Error: 03:38:45 03:38:45.663 [ERROR] 
> org.apache.flink.connectors.hive.HiveDialectQueryITCase.testInsertDirectory 
> -- Time elapsed: 7.326 s <<< FAILURE!
> Jan 09 03:38:45 org.opentest4j.AssertionFailedError: 
> Jan 09 03:38:45 
> Jan 09 03:38:45 expected: "A:english=90#math=100#history=85"
> Jan 09 03:38:45  but was: "A:english=90math=100history=85"
> Jan 09 03:38:45   at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> Jan 09 03:38:45   at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> Jan 09 03:38:45   at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> Jan 09 03:38:45   at 
> org.apache.flink.connectors.hive.HiveDialectQueryITCase.testInsertDirectory(HiveDialectQueryITCase.java:498)
> Jan 09 03:38:45   at 

[jira] [Commented] (FLINK-34036) Various HiveDialectQueryITCase tests fail in GitHub Actions workflow with Hadoop 3.1.3 enabled

2024-01-11 Thread Matthias Pohl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17805708#comment-17805708
 ] 

Matthias Pohl commented on FLINK-34036:
---

The problem seems to be that we're not building the Flink artifacts with the 
PROFILE, which causes issues when running the tests with the PROFILE. That is 
why it's possible to reproduce it locally: I didn't trigger the clean phase 
before running the tests with the profiles enabled.

> Various HiveDialectQueryITCase tests fail in GitHub Actions workflow with 
> Hadoop 3.1.3 enabled
> --
>
> Key: FLINK-34036
> URL: https://issues.apache.org/jira/browse/FLINK-34036
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Hadoop Compatibility, Connectors / Hive
>Affects Versions: 1.18.0, 1.19.0
>Reporter: Matthias Pohl
>Priority: Major
>  Labels: github-actions, test-stability
>
> The following {{HiveDialectQueryITCase}} tests fail consistently in the 
> FLINK-27075 GitHub Actions [master nightly 
> workflow|https://github.com/XComp/flink/actions/workflows/nightly-dev.yml] of 
> Flink (and also the [release-1.18 
> workflow|https://github.com/XComp/flink/actions/workflows/nightly-current.yml]):
> * {{testInsertDirectory}}
> * {{testCastTimeStampToDecimal}}
> * {{testNullLiteralAsArgument}}
> {code}
> Error: 03:38:45 03:38:45.661 [ERROR] Tests run: 22, Failures: 1, Errors: 2, 
> Skipped: 0, Time elapsed: 379.0 s <<< FAILURE! -- in 
> org.apache.flink.connectors.hive.HiveDialectQueryITCase
> Error: 03:38:45 03:38:45.662 [ERROR] 
> org.apache.flink.connectors.hive.HiveDialectQueryITCase.testNullLiteralAsArgument
>  -- Time elapsed: 0.069 s <<< ERROR!
> Jan 09 03:38:45 java.lang.NoSuchMethodError: 
> org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils.getTimestamp(Ljava/lang/Object;Lorg/apache/hadoop/hive/serde2/objectinspector/PrimitiveObjectInspector;)Ljava/sql/Timestamp;
> Jan 09 03:38:45   at 
> org.apache.flink.connectors.hive.HiveDialectQueryITCase.testNullLiteralAsArgument(HiveDialectQueryITCase.java:959)
> Jan 09 03:38:45   at java.lang.reflect.Method.invoke(Method.java:498)
> Jan 09 03:38:45 
> Error: 03:38:45 03:38:45.662 [ERROR] 
> org.apache.flink.connectors.hive.HiveDialectQueryITCase.testCastTimeStampToDecimal
>  -- Time elapsed: 0.007 s <<< ERROR!
> Jan 09 03:38:45 org.apache.flink.table.api.ValidationException: Table with 
> identifier 'test-catalog.default.t1' does not exist.
> Jan 09 03:38:45   at 
> org.apache.flink.table.catalog.CatalogManager.dropTableInternal(CatalogManager.java:1266)
> Jan 09 03:38:45   at 
> org.apache.flink.table.catalog.CatalogManager.dropTable(CatalogManager.java:1206)
> Jan 09 03:38:45   at 
> org.apache.flink.table.operations.ddl.DropTableOperation.execute(DropTableOperation.java:74)
> Jan 09 03:38:45   at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1107)
> Jan 09 03:38:45   at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:735)
> Jan 09 03:38:45   at 
> org.apache.flink.connectors.hive.HiveDialectQueryITCase.testCastTimeStampToDecimal(HiveDialectQueryITCase.java:835)
> Jan 09 03:38:45   at java.lang.reflect.Method.invoke(Method.java:498)
> Jan 09 03:38:45 
> Error: 03:38:45 03:38:45.663 [ERROR] 
> org.apache.flink.connectors.hive.HiveDialectQueryITCase.testInsertDirectory 
> -- Time elapsed: 7.326 s <<< FAILURE!
> Jan 09 03:38:45 org.opentest4j.AssertionFailedError: 
> Jan 09 03:38:45 
> Jan 09 03:38:45 expected: "A:english=90#math=100#history=85"
> Jan 09 03:38:45  but was: "A:english=90math=100history=85"
> Jan 09 03:38:45   at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> Jan 09 03:38:45   at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> Jan 09 03:38:45   at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> Jan 09 03:38:45   at 
> org.apache.flink.connectors.hive.HiveDialectQueryITCase.testInsertDirectory(HiveDialectQueryITCase.java:498)
> Jan 09 03:38:45   at java.lang.reflect.Method.invoke(Method.java:498)
> {code}
> Additionally, the {{HiveITCase}} in the e2e test suite is affected:
> {code}
> Error: 05:20:20 05:20:20.949 [ERROR] Tests run: 1, Failures: 0, Errors: 1, 
> Skipped: 0, Time elapsed: 0.106 s <<< FAILURE! -- in 
> org.apache.flink.tests.hive.HiveITCase
> Error: 05:20:20 05:20:20.949 [ERROR] org.apache.flink.tests.hive.HiveITCase 
> -- Time elapsed: 0.106 s <<< ERROR!
> Jan 07 05:20:20 java.lang.ExceptionInInitializerError
> Jan 07 05:20:20   at 

Re: [PR] [FLINK-34027] Introduces AsyncScalarFunction as a new UDF type [flink]

2024-01-11 Thread via GitHub


twalthr commented on code in PR #23975:
URL: https://github.com/apache/flink/pull/23975#discussion_r1449138779


##
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecAsyncCalc.java:
##
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.common;
+
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.streaming.api.functions.async.AsyncFunction;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.async.AsyncWaitOperatorFactory;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.planner.codegen.AsyncCodeGenerator;
+import org.apache.flink.table.planner.delegation.PlannerBase;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
+import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
+import 
org.apache.flink.table.planner.plan.nodes.exec.SingleTransformationTranslator;
+import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
+import org.apache.flink.table.planner.plan.utils.AsyncUtil;
+import org.apache.flink.table.runtime.generated.GeneratedFunction;
+import org.apache.flink.table.runtime.operators.calc.async.AsyncFunctionRunner;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Base class for exec Async Calc. */
+public abstract class CommonExecAsyncCalc extends ExecNodeBase
+implements SingleTransformationTranslator {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(CommonExecAsyncCalc.class);
+
+public static final String ASYNC_CALC_TRANSFORMATION = "async-calc";
+
+public static final String FIELD_NAME_PROJECTION = "projection";
+
+public static final String FIELD_NAME_SYNC_MODE = "syncMode";
+
+@JsonProperty(FIELD_NAME_PROJECTION)
+private final List projection;
+
+public CommonExecAsyncCalc(
+int id,
+ExecNodeContext context,
+ReadableConfig persistedConfig,
+List projection,

Review Comment:
   Thanks for the explanation. It's unfortunate that the Python people called 
it Calc. But let's keep it like this for now.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34027] Introduces AsyncScalarFunction as a new UDF type [flink]

2024-01-11 Thread via GitHub


twalthr commented on code in PR #23975:
URL: https://github.com/apache/flink/pull/23975#discussion_r1449134322


##
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/FunctionMappingExtractor.java:
##
@@ -208,6 +249,32 @@ static MethodVerification 
createParameterWithArgumentVerification(
 };
 }
 
+/** Verification that checks a method by parameters including an 
additional first parameter. */
+static MethodVerification 
createGenericParameterWithArgumentAndReturnTypeVerification(
+Class baseClass, Class argumentClass, int paramPos, int 
genericPos) {
+return (method, signature, result) -> {
+final Class[] parameters =
+Stream.concat(Stream.of(argumentClass), signature.stream())
+.toArray(Class[]::new);
+
+Type genericType = method.getGenericParameterTypes()[paramPos];
+// Trusty library allows to resolve generic types where the type 
resolves to a type with
+// generic parameters...
+genericType = 
TypeToken.of(baseClass).resolveType(genericType).getType();

Review Comment:
   Looks great now!
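
   For illustration, a minimal standalone sketch of the `TypeToken`-based resolution used above, written against plain Guava rather than Flink's shaded copy (the `Base`/`Concrete` classes are made up for the example):

```java
import com.google.common.reflect.TypeToken;

import java.lang.reflect.Method;
import java.lang.reflect.Type;
import java.util.concurrent.CompletableFuture;

public class TypeTokenResolutionDemo {

    // Base declares the type variable T that needs to be resolved.
    abstract static class Base<T> {
        public void eval(CompletableFuture<T> future) {}
    }

    // Concrete pins T to String.
    static class Concrete extends Base<String> {}

    public static void main(String[] args) throws Exception {
        Method eval = Base.class.getMethod("eval", CompletableFuture.class);
        // CompletableFuture<T>, where T is still a type variable of Base
        Type generic = eval.getGenericParameterTypes()[0];
        // Resolve T against the concrete subclass, as in the diff above
        Type resolved = TypeToken.of(Concrete.class).resolveType(generic).getType();
        // Prints java.util.concurrent.CompletableFuture<java.lang.String>
        System.out.println(resolved);
    }
}
```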



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-32006) AsyncWaitOperatorTest.testProcessingTimeWithTimeoutFunctionOrderedWithRetry times out on Azure

2024-01-11 Thread Junrui Li (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32006?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17805698#comment-17805698
 ] 

Junrui Li commented on FLINK-32006:
---

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=56262=logs=0da23115-68bb-5dcd-192c-bd4c8adebde1=24c3384f-1bcb-57b3-224f-51bf973bbee8

> AsyncWaitOperatorTest.testProcessingTimeWithTimeoutFunctionOrderedWithRetry 
> times out on Azure
> --
>
> Key: FLINK-32006
> URL: https://issues.apache.org/jira/browse/FLINK-32006
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Affects Versions: 1.18.0, 1.17.2, 1.19.0
>Reporter: David Morávek
>Assignee: David Morávek
>Priority: Critical
>  Labels: pull-request-available, stale-assigned, test-stability
>
> {code:java}
> May 04 13:52:18 [ERROR] 
> org.apache.flink.streaming.api.operators.async.AsyncWaitOperatorTest.testProcessingTimeWithTimeoutFunctionOrderedWithRetry
>   Time elapsed: 100.009 s  <<< ERROR!
> May 04 13:52:18 org.junit.runners.model.TestTimedOutException: test timed out 
> after 100 seconds
> May 04 13:52:18   at java.lang.Thread.sleep(Native Method)
> May 04 13:52:18   at 
> org.apache.flink.streaming.api.operators.async.AsyncWaitOperatorTest.testProcessingTimeAlwaysTimeoutFunctionWithRetry(AsyncWaitOperatorTest.java:1313)
> May 04 13:52:18   at 
> org.apache.flink.streaming.api.operators.async.AsyncWaitOperatorTest.testProcessingTimeWithTimeoutFunctionOrderedWithRetry(AsyncWaitOperatorTest.java:1277)
> May 04 13:52:18   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native 
> Method)
> May 04 13:52:18   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> May 04 13:52:18   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> May 04 13:52:18   at java.lang.reflect.Method.invoke(Method.java:498)
> May 04 13:52:18   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
> May 04 13:52:18   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> May 04 13:52:18   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
> May 04 13:52:18   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> May 04 13:52:18   at 
> org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
> May 04 13:52:18   at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:299)
> May 04 13:52:18   at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:293)
> May 04 13:52:18   at 
> java.util.concurrent.FutureTask.run(FutureTask.java:266)
> May 04 13:52:18   at java.lang.Thread.run(Thread.java:748)
>  {code}
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=48671=logs=0da23115-68bb-5dcd-192c-bd4c8adebde1=24c3384f-1bcb-57b3-224f-51bf973bbee8=9288



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-34027] Introduces AsyncScalarFunction as a new UDF type [flink]

2024-01-11 Thread via GitHub


twalthr commented on code in PR #23975:
URL: https://github.com/apache/flink/pull/23975#discussion_r1449130065


##
flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/UserDefinedFunctionHelper.java:
##
@@ -67,6 +67,8 @@ public final class UserDefinedFunctionHelper {
 
 public static final String SCALAR_EVAL = "eval";
 
+public static final String ASYNC_SCALAR_EVAL = "eval";
+
 public static final String TABLE_EVAL = "eval";

Review Comment:
   That is definitely the best location as it is called right after 
registration in TableEnvironment. An additional check should be done after 
specialization in code generation.
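
   For reference, a minimal sketch of the eval shape that this validation targets; the class name and logic are made up, and only the signature pattern (void return type, `CompletableFuture` as first parameter) follows the validation logic quoted further down in this diff:

```java
import org.apache.flink.table.functions.AsyncScalarFunction;

import java.util.concurrent.CompletableFuture;

// Made-up example function: only the method shape matters here.
public class AsyncUpperCase extends AsyncScalarFunction {

    public void eval(CompletableFuture<String> result, String input) {
        // Complete the future asynchronously; failures would go through
        // result.completeExceptionally(...).
        CompletableFuture.runAsync(
                () -> result.complete(input == null ? null : input.toUpperCase()));
    }
}
```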



##
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/ExtractionUtils.java:
##
@@ -348,6 +350,53 @@ public static Optional> extractSimpleGeneric(
 }
 }
 
+/** Resolves a variable type while accepting a context for resolution. */
+public static Type resolveVariableWithClassContext(@Nullable Type 
contextType, Type type) {
+final List typeHierarchy;
+if (contextType != null) {
+typeHierarchy = collectTypeHierarchy(contextType);
+} else {
+typeHierarchy = Collections.emptyList();
+}
+if (!containsTypeVariable(type)) {

Review Comment:
   not sure if this was intended but with this check you exclude the 
`WildcardType`:
   ```
   eval(CompletableFuture<?> s)
   ```
   which is correct because the if clause further down could not handle it; I 
would at least add a comment to the `false` return in `containsTypeVariable`
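
   For illustration, a small standalone JDK-reflection sketch of the distinction discussed here: the `?` in `CompletableFuture<?>` is a `WildcardType`, not a `TypeVariable`, so a `containsTypeVariable`-style check passes over it (the class and method names below are made up):

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.lang.reflect.TypeVariable;
import java.lang.reflect.WildcardType;
import java.util.concurrent.CompletableFuture;

public class WildcardVsTypeVariableDemo {

    static <T> void generic(CompletableFuture<T> future) {}

    static void wildcard(CompletableFuture<?> future) {}

    public static void main(String[] args) throws Exception {
        Type genericParam =
                WildcardVsTypeVariableDemo.class
                        .getDeclaredMethod("generic", CompletableFuture.class)
                        .getGenericParameterTypes()[0];
        Type wildcardParam =
                WildcardVsTypeVariableDemo.class
                        .getDeclaredMethod("wildcard", CompletableFuture.class)
                        .getGenericParameterTypes()[0];

        // The type argument of CompletableFuture<T> is a TypeVariable ...
        Type genericArg = ((ParameterizedType) genericParam).getActualTypeArguments()[0];
        // ... while the type argument of CompletableFuture<?> is a WildcardType.
        Type wildcardArg = ((ParameterizedType) wildcardParam).getActualTypeArguments()[0];

        System.out.println(genericArg instanceof TypeVariable); // true
        System.out.println(wildcardArg instanceof WildcardType); // true
    }
}
```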



##
flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/UserDefinedFunctionHelper.java:
##
@@ -521,6 +529,49 @@ private static void validateImplementationMethod(
 }
 }
 
+private static void validateAsyncImplementationMethod(
+Class clazz, String... 
methodNameOptions) {
+final Set nameSet = new 
HashSet<>(Arrays.asList(methodNameOptions));
+final List methods = getAllDeclaredMethods(clazz);
+for (Method method : methods) {
+if (!nameSet.contains(method.getName())) {
+continue;
+}
+if (!method.getReturnType().equals(Void.TYPE)) {
+throw new ValidationException(
+String.format(
+"Method '%s' of function class '%s' must be 
void.",
+method.getName(), clazz.getName()));
+}
+boolean foundParam = false;
+boolean genericParam = false;
+if (method.getParameterCount() >= 1) {
+Type firstParam = method.getGenericParameterTypes()[0];
+firstParam = 
ExtractionUtils.resolveVariableWithClassContext(clazz, firstParam);
+if (CompletableFuture.class.equals(firstParam)) {
+foundParam = true;
+} else if (firstParam instanceof ParameterizedType
+&& CompletableFuture.class.equals(
+((ParameterizedType) 
firstParam).getRawType())) {
+foundParam = true;
+genericParam = true;
+}
+}
+if (!foundParam) {
+throw new ValidationException(
+String.format(
+"Method '%s' of function class '%s' must have 
a first argument of type java.util.concurrent.CompletableFuture.",
+method.getName(), clazz.getName()));
+}
+if (!genericParam) {

Review Comment:
   this we shouldn't validate because if someone uses a type inference with 
multiple types (i.e. no FunctionMappingExtractor), they might use raw types.
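
   A hypothetical sketch of the raw-type case mentioned here; the function is made up, and it assumes (per the comment) that a hand-written type inference, rather than reflective extraction, supplies the argument and result types:

```java
import org.apache.flink.table.functions.AsyncScalarFunction;

import java.util.concurrent.CompletableFuture;

// Made-up function using a raw CompletableFuture; validation should accept this
// because the types come from a custom type inference, not from the signature.
public class RawFutureFunction extends AsyncScalarFunction {

    @SuppressWarnings({"rawtypes", "unchecked"})
    public void eval(CompletableFuture result, Integer input) {
        result.complete(input);
    }

    // In this scenario, getTypeInference(DataTypeFactory) would be overridden to
    // declare the argument and output types explicitly.
}
```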



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34020] Bump CI flink version on flink-connector-rabbitmq [flink-connector-rabbitmq]

2024-01-11 Thread via GitHub


snuyanzin commented on code in PR #22:
URL: 
https://github.com/apache/flink-connector-rabbitmq/pull/22#discussion_r1449084708


##
flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java:
##
@@ -22,6 +22,7 @@
 import org.apache.flink.api.common.serialization.DeserializationSchema;
 import 
org.apache.flink.api.common.serialization.RuntimeContextInitializationContextAdapters;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import 
org.apache.flink.api.java.operators.translation.RichCombineToGroupCombineWrapper;

Review Comment:
   ```suggestion
   ```



##
flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java:
##
@@ -236,7 +239,10 @@ protected void setupQueue() throws IOException {
 @Override
 public void open(Configuration config) throws Exception {
 super.open(config);
+sessionIds = new ArrayList<>(64);
+sessionIdsPerSnapshot = new ArrayDeque<>();
 try {
+RichCombineToGroupCombineWrapper s;

Review Comment:
   ```suggestion
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33915][ci] Adds nightly workflow for the master branch [flink]

2024-01-11 Thread via GitHub


XComp commented on code in PR #23971:
URL: https://github.com/apache/flink/pull/23971#discussion_r1449060542


##
.github/actions/select_workflow_configs/action.yml:
##
@@ -0,0 +1,59 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+---
+name: "Selects Flink workflow configurations"
+description: "Loads the relevant workflow configuration from a given folder."
+inputs:
+  config-folder:
+description: "The directory in which the workflow configurations are 
located."
+required: true
+  profile:
+description: >-
+  The profile that's used to access the desired workflow configuration 
+  file. Any invalid profile (i.e. no file exists) will lead to using 
+  the default workflow configurations. Configuration files are expected 
+  to have the name format 'flink-workflow..json' and should be 
+  located in the folder that's specified through the 'config-folder' 
parameter.
+required: true
+outputs:
+  workflow-configurations:
+description: "A JSON representation of the workflow configurations."
+value: "${{ steps.workflow-selection.outputs.configs }}"
+runs:
+  using: "composite"
+  steps:
+- name: "Loads Flink workflow configurations"
+  id: workflow-selection
+  shell: bash
+  run: |
+fallback_profile="default"
+fallback_path="${{ inputs.config-folder 
}}/flink-workflow.${fallback_profile}.json"

Review Comment:
   > Why can't we use the configuration from the respective branch? checkout 
branch, use nightly config.
   
   Scheduled workflow runs will be triggered from `master`. The workflow 
configuration (the json file) is therefore also loaded from `master`. The 
workflow then triggers the checkout of individual branches.
   
   > Do we plan to have different workflow configs for a single branch?
   
   We have that right now: 1.18 doesn't include Java 21. But you're right: It's 
not a common case, I'd say.
   
   There are some reasons why I came up with this solution (having dedicated 
per-branch configuration files and one workflow file per branch):
   
   Initially, I went for a single workflow yaml that would trigger a fixed list 
of release branches through the matrix strategy.  We could have disabled the 
Java 21 workflow. But the flaw of this approach is that the workflow will be 
nested in three levels:
   1. branch
   2. profiles
   3. the individual jobs of flink-ci.template.yml
   
   GitHub Actions doesn't work well with three levels. The dropdown per profile 
would go away. Instead, we would have a dropdown per branch with all profiles 
and their individual jobs flattened out in a long list. Unfortunately, I don't 
have a workflow run saved anymore which would show what I mean. But it's 
essentially similar to what we have with Azure Pipelines: a long list of 
individual jobs that are hard to parse by the human eye. 
   
   Having individual workflow yamls for each branch helped work around this 
issue at the cost of somewhat redundant code. I introduced the workflow 
configuration JSON files to reduce the redundancy of the workflow configuration 
to a minimum.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-30252) Publish flink-shaded pom

2024-01-11 Thread Martijn Visser (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30252?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Martijn Visser updated FLINK-30252:
---
Fix Version/s: shaded-19.0
   (was: shaded-18.0)

> Publish flink-shaded pom
> 
>
> Key: FLINK-30252
> URL: https://issues.apache.org/jira/browse/FLINK-30252
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Build System
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
> Fix For: shaded-19.0
>
>
> Publish a bom for flink-shaded, such that downtream projects just select the 
> flink-shaded version, with all other dependency versions being selected 
> automatically.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-33915][ci] Adds nightly workflow for the master branch [flink]

2024-01-11 Thread via GitHub


XComp commented on code in PR #23971:
URL: https://github.com/apache/flink/pull/23971#discussion_r1449024195


##
.github/actions/select_workflow_configs/action.yml:
##
@@ -0,0 +1,59 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+---
+name: "Selects Flink workflow configurations"
+description: "Loads the relevant workflow configuration from a given folder."
+inputs:
+  config-folder:
+description: "The directory in which the workflow configurations are 
located."
+required: true
+  profile:
+description: >-
+  The profile that's used to access the desired workflow configuration 
+  file. Any invalid profile (i.e. no file exists) will lead to using 
+  the default workflow configurations. Configuration files are expected 
+  to have the name format 'flink-workflow..json' and should be 
+  located in the folder that's specified through the 'config-folder' 
parameter.
+required: true
+outputs:
+  workflow-configurations:
+description: "A JSON representation of the workflow configurations."
+value: "${{ steps.workflow-selection.outputs.configs }}"
+runs:
+  using: "composite"
+  steps:
+- name: "Loads Flink workflow configurations"
+  id: workflow-selection
+  shell: bash
+  run: |
+fallback_profile="default"
+fallback_path="${{ inputs.config-folder 
}}/flink-workflow.${fallback_profile}.json"

Review Comment:
   > a) nightly builds should use workflow_call; workflow_dispatch is for 
manual runs via the UI
   
   I'm not sure I understand your statement here. The default json file is 
essentially the workflow for the nightly build of master. Additionally, I 
enabled `workflow_dispatch` as a valid event trigger for this workflow as well 
so that anyone could trigger an extended workflow run (with the same 
configuration as the nightly build on `master`) on his/her branch.
   
   > shouldn't master explicitly point to the default config?
   
   Fine with me. We could rename the file to `flink-workflow.master.json` and still 
use it as a fallback for branches that do not have a dedicated configuration, if 
that's what you suggest.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33915][ci] Adds nightly workflow for the master branch [flink]

2024-01-11 Thread via GitHub


zentol commented on code in PR #23971:
URL: https://github.com/apache/flink/pull/23971#discussion_r1449020160


##
.github/actions/select_workflow_configs/action.yml:
##
@@ -0,0 +1,59 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+---
+name: "Selects Flink workflow configurations"
+description: "Loads the relevant workflow configuration from a given folder."
+inputs:
+  config-folder:
+description: "The directory in which the workflow configurations are 
located."
+required: true
+  profile:
+description: >-
+  The profile that's used to access the desired workflow configuration 
+  file. Any invalid profile (i.e. no file exists) will lead to using 
+  the default workflow configurations. Configuration files are expected 
+  to have the name format 'flink-workflow..json' and should be 
+  located in the folder that's specified through the 'config-folder' 
parameter.
+required: true
+outputs:
+  workflow-configurations:
+description: "A JSON representation of the workflow configurations."
+value: "${{ steps.workflow-selection.outputs.configs }}"
+runs:
+  using: "composite"
+  steps:
+- name: "Loads Flink workflow configurations"
+  id: workflow-selection
+  shell: bash
+  run: |
+fallback_profile="default"
+fallback_path="${{ inputs.config-folder 
}}/flink-workflow.${fallback_profile}.json"

Review Comment:
   > Why can't we use the configuration from the respective branch? checkout 
branch, use nightly config.
   
   I don't remember how exactly GHA handles things w.r.t. which branches' 
workflow files are honored...



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33915][ci] Adds nightly workflow for the master branch [flink]

2024-01-11 Thread via GitHub


zentol commented on code in PR #23971:
URL: https://github.com/apache/flink/pull/23971#discussion_r1449015851


##
.github/actions/select_workflow_configs/action.yml:
##
@@ -0,0 +1,59 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+---
+name: "Selects Flink workflow configurations"
+description: "Loads the relevant workflow configuration from a given folder."
+inputs:
+  config-folder:
+description: "The directory in which the workflow configurations are 
located."
+required: true
+  profile:
+description: >-
+  The profile that's used to access the desired workflow configuration 
+  file. Any invalid profile (i.e. no file exists) will lead to using 
+  the default workflow configurations. Configuration files are expected 
+  to have the name format 'flink-workflow..json' and should be 
+  located in the folder that's specified through the 'config-folder' 
parameter.
+required: true
+outputs:
+  workflow-configurations:
+description: "A JSON representation of the workflow configurations."
+value: "${{ steps.workflow-selection.outputs.configs }}"
+runs:
+  using: "composite"
+  steps:
+- name: "Loads Flink workflow configurations"
+  id: workflow-selection
+  shell: bash
+  run: |
+fallback_profile="default"
+fallback_path="${{ inputs.config-folder 
}}/flink-workflow.${fallback_profile}.json"

Review Comment:
   Why can't we use the configuration from the respective branch? checkout 
branch, use nightly config.
   
   Do we plan to have different workflow configs for a single branch?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33915][ci] Adds nightly workflow for the master branch [flink]

2024-01-11 Thread via GitHub


zentol commented on code in PR #23971:
URL: https://github.com/apache/flink/pull/23971#discussion_r1449014250


##
.github/actions/select_workflow_configs/action.yml:
##
@@ -0,0 +1,59 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+---
+name: "Selects Flink workflow configurations"
+description: "Loads the relevant workflow configuration from a given folder."
+inputs:
+  config-folder:
+description: "The directory in which the workflow configurations are 
located."
+required: true
+  profile:
+description: >-
+  The profile that's used to access the desired workflow configuration 
+  file. Any invalid profile (i.e. no file exists) will lead to using 
+  the default workflow configurations. Configuration files are expected 
+  to have the name format 'flink-workflow..json' and should be 
+  located in the folder that's specified through the 'config-folder' 
parameter.
+required: true
+outputs:
+  workflow-configurations:
+description: "A JSON representation of the workflow configurations."
+value: "${{ steps.workflow-selection.outputs.configs }}"
+runs:
+  using: "composite"
+  steps:
+- name: "Loads Flink workflow configurations"
+  id: workflow-selection
+  shell: bash
+  run: |
+fallback_profile="default"
+fallback_path="${{ inputs.config-folder 
}}/flink-workflow.${fallback_profile}.json"

Review Comment:
   a) nightly builds should use workflow_call; workflow_dispatch is for manual 
runs via the UI IIRC
   b) shouldn't master explicitly point to the default config?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-31977) If scaling.effectiveness.detection.enabled is false, the call to the detectIneffectiveScaleUp() function is unnecessary

2024-01-11 Thread Maximilian Michels (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31977?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17805650#comment-17805650
 ] 

Maximilian Michels commented on FLINK-31977:


I think this is related to FLINK-33993. The name of the configuration option is 
a bit misleading, as effectiveness detection is always on but scalings are only 
blocked when the option is set to {{true}}.

> If scaling.effectiveness.detection.enabled is false, the call to the 
> detectIneffectiveScaleUp() function is unnecessary
> ---
>
> Key: FLINK-31977
> URL: https://issues.apache.org/jira/browse/FLINK-31977
> Project: Flink
>  Issue Type: Improvement
>  Components: Autoscaler
>Affects Versions: 1.17.0
>Reporter: Tan Kim
>Priority: Minor
>
> The code below is a function to detect ineffective scale-ups.
> It only checks the value of SCALING_EFFECTIVENESS_DETECTION_ENABLED 
> (scaling.effectiveness.detection.enabled) after all the computations needed for 
> detection have already been done, so that work is wasted when the option is false.
> {code:java}
> JobVertexScaler.java #175
> private boolean detectIneffectiveScaleUp(
> AbstractFlinkResource resource,
> JobVertexID vertex,
> Configuration conf,
> Map evaluatedMetrics,
> ScalingSummary lastSummary) {
> double lastProcRate = 
> lastSummary.getMetrics().get(TRUE_PROCESSING_RATE).getAverage(); // 
> 22569.315633422066
> double lastExpectedProcRate =
> 
> lastSummary.getMetrics().get(EXPECTED_PROCESSING_RATE).getCurrent(); // 
> 37340.0
> var currentProcRate = 
> evaluatedMetrics.get(TRUE_PROCESSING_RATE).getAverage();
> // To judge the effectiveness of the scale up operation we compute how 
> much of the expected
> // increase actually happened. For example if we expect a 100 increase in 
> proc rate and only
> // got an increase of 10 we only accomplished 10% of the desired 
> increase. If this number is
> // below the threshold, we mark the scaling ineffective.
> double expectedIncrease = lastExpectedProcRate - lastProcRate;
> double actualIncrease = currentProcRate - lastProcRate;
> boolean withinEffectiveThreshold =
> (actualIncrease / expectedIncrease)
> >= 
> conf.get(AutoScalerOptions.SCALING_EFFECTIVENESS_THRESHOLD);
> if (withinEffectiveThreshold) {
> return false;
> }
> var message = String.format(INNEFFECTIVE_MESSAGE_FORMAT, vertex);
> eventRecorder.triggerEvent(
> resource,
> EventRecorder.Type.Normal,
> EventRecorder.Reason.IneffectiveScaling,
> EventRecorder.Component.Operator,
> message);
> if (conf.get(AutoScalerOptions.SCALING_EFFECTIVENESS_DETECTION_ENABLED)) {
> LOG.info(message);
> return true;
> } else {
> return false;
> }
> } {code}
> In the call to the detectIneffectiveScaleUp function, I would suggest 
> checking SCALING_EFFECTIVENESS_DETECTION_ENABLED first, as follows.
> {code:java}
> JobVertexScaler.java #150
> if (currentParallelism == lastSummary.getNewParallelism() && 
> lastSummary.isScaledUp()) {
>     if (scaledUp) {
>         
> if(conf.get(AutoScalerOptions.SCALING_EFFECTIVENESS_DETECTION_ENABLED)) {
>             return detectIneffectiveScaleUp(resource, vertex, conf, 
> evaluatedMetrics, lastSummary);
>         } else {
>             return true;
>         }
>     } else {
>         return detectImmediateScaleDownAfterScaleUp(vertex, conf, 
> lastScalingTs);
>     }
> }{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-33694][gs-fs-hadoop] Support overriding GCS root URL [flink]

2024-01-11 Thread via GitHub


patricklucas commented on PR #23836:
URL: https://github.com/apache/flink/pull/23836#issuecomment-1887371839

   My suspicion is that there's an extremely small chance of that but I have no 
way to quantify it.
   
   In our case it was actually the other way around: we set this option and 
were reading and writing to GCS, however we were writing in a custom sink that 
used the GCS SDK directly, so everything worked as expected. It was only when 
we migrated to using the normal file sink that this issue arose.
   
   Does the Flink project have guidelines for approaching fixes to this sort of 
issue, where users may depend on definitely-broken behavior? If we cannot fix 
it directly (before Flink 2.0, I assume), do you have a suggestion for another 
workaround—perhaps just a new config key altogether?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-32416] initial implementation of DynamicKafkaSource with bound… [flink-connector-kafka]

2024-01-11 Thread via GitHub


mxm commented on code in PR #44:
URL: 
https://github.com/apache/flink-connector-kafka/pull/44#discussion_r1448982380


##
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/DynamicKafkaSource.java:
##
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kafka.source;

Review Comment:
   Good to go. I can also squash on merge, as you prefer.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-34036) Various HiveDialectQueryITCase tests fail in GitHub Actions workflow with Hadoop 3.1.3 enabled

2024-01-11 Thread Matthias Pohl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17805636#comment-17805636
 ] 

Matthias Pohl commented on FLINK-34036:
---

The test failures can be reproduced locally:
{code}
$ ./mvnw -Dflink.hadoop.version=3.2.3 -Phadoop3-tests,hive3 -pl 
flink-connectors/flink-connector-hive verify 
-Dtest=org.apache.flink.connectors.hive.HiveDialectQueryITCase
{code}

It could have something to do with the nightly builds only running on Alibaba 
machines for the Azure Pipeline builds. But we have already seen this test 
[succeeding on GHA 
workflows|https://github.com/XComp/flink/actions/runs/7278032529/job/19831862705#step:12:23467].

> Various HiveDialectQueryITCase tests fail in GitHub Actions workflow with 
> Hadoop 3.1.3 enabled
> --
>
> Key: FLINK-34036
> URL: https://issues.apache.org/jira/browse/FLINK-34036
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Hadoop Compatibility, Connectors / Hive
>Affects Versions: 1.18.0, 1.19.0
>Reporter: Matthias Pohl
>Priority: Major
>  Labels: github-actions, test-stability
>
> The following {{HiveDialectQueryITCase}} tests fail consistently in the 
> FLINK-27075 GitHub Actions [master nightly 
> workflow|https://github.com/XComp/flink/actions/workflows/nightly-dev.yml] of 
> Flink (and also the [release-1.18 
> workflow|https://github.com/XComp/flink/actions/workflows/nightly-current.yml]):
> * {{testInsertDirectory}}
> * {{testCastTimeStampToDecimal}}
> * {{testNullLiteralAsArgument}}
> {code}
> Error: 03:38:45 03:38:45.661 [ERROR] Tests run: 22, Failures: 1, Errors: 2, 
> Skipped: 0, Time elapsed: 379.0 s <<< FAILURE! -- in 
> org.apache.flink.connectors.hive.HiveDialectQueryITCase
> Error: 03:38:45 03:38:45.662 [ERROR] 
> org.apache.flink.connectors.hive.HiveDialectQueryITCase.testNullLiteralAsArgument
>  -- Time elapsed: 0.069 s <<< ERROR!
> Jan 09 03:38:45 java.lang.NoSuchMethodError: 
> org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils.getTimestamp(Ljava/lang/Object;Lorg/apache/hadoop/hive/serde2/objectinspector/PrimitiveObjectInspector;)Ljava/sql/Timestamp;
> Jan 09 03:38:45   at 
> org.apache.flink.connectors.hive.HiveDialectQueryITCase.testNullLiteralAsArgument(HiveDialectQueryITCase.java:959)
> Jan 09 03:38:45   at java.lang.reflect.Method.invoke(Method.java:498)
> Jan 09 03:38:45 
> Error: 03:38:45 03:38:45.662 [ERROR] 
> org.apache.flink.connectors.hive.HiveDialectQueryITCase.testCastTimeStampToDecimal
>  -- Time elapsed: 0.007 s <<< ERROR!
> Jan 09 03:38:45 org.apache.flink.table.api.ValidationException: Table with 
> identifier 'test-catalog.default.t1' does not exist.
> Jan 09 03:38:45   at 
> org.apache.flink.table.catalog.CatalogManager.dropTableInternal(CatalogManager.java:1266)
> Jan 09 03:38:45   at 
> org.apache.flink.table.catalog.CatalogManager.dropTable(CatalogManager.java:1206)
> Jan 09 03:38:45   at 
> org.apache.flink.table.operations.ddl.DropTableOperation.execute(DropTableOperation.java:74)
> Jan 09 03:38:45   at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1107)
> Jan 09 03:38:45   at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:735)
> Jan 09 03:38:45   at 
> org.apache.flink.connectors.hive.HiveDialectQueryITCase.testCastTimeStampToDecimal(HiveDialectQueryITCase.java:835)
> Jan 09 03:38:45   at java.lang.reflect.Method.invoke(Method.java:498)
> Jan 09 03:38:45 
> Error: 03:38:45 03:38:45.663 [ERROR] 
> org.apache.flink.connectors.hive.HiveDialectQueryITCase.testInsertDirectory 
> -- Time elapsed: 7.326 s <<< FAILURE!
> Jan 09 03:38:45 org.opentest4j.AssertionFailedError: 
> Jan 09 03:38:45 
> Jan 09 03:38:45 expected: "A:english=90#math=100#history=85"
> Jan 09 03:38:45  but was: "A:english=90math=100history=85"
> Jan 09 03:38:45   at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> Jan 09 03:38:45   at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> Jan 09 03:38:45   at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> Jan 09 03:38:45   at 
> org.apache.flink.connectors.hive.HiveDialectQueryITCase.testInsertDirectory(HiveDialectQueryITCase.java:498)
> Jan 09 03:38:45   at java.lang.reflect.Method.invoke(Method.java:498)
> {code}
> Additionally, the {{HiveITCase}} in the e2e test suite is affected:
> {code}
> Error: 05:20:20 05:20:20.949 [ERROR] Tests run: 1, Failures: 0, Errors: 1, 
> Skipped: 0, Time elapsed: 0.106 s <<< FAILURE! -- in 
> org.apache.flink.tests.hive.HiveITCase
> Error: 

Re: [PR] [FLINK-33450][autoscaler] Support the JDBCAutoScalerStateStore [flink-kubernetes-operator]

2024-01-11 Thread via GitHub


mxm commented on code in PR #741:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/741#discussion_r1448978956


##
flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/config/AutoscalerStandaloneOptions.java:
##
@@ -59,4 +64,47 @@ private static ConfigOptions.OptionBuilder 
autoscalerStandaloneConfig(String key
 .withDeprecatedKeys("flinkClusterPort")
 .withDescription(
 "The port of flink cluster when the flink-cluster 
fetcher is used.");
+
+public static final ConfigOption<StateStoreType> STATE_STORE_TYPE =
+autoscalerStandaloneConfig("state-store.type")
+.enumType(StateStoreType.class)
+.defaultValue(StateStoreType.MEMORY)
+.withDescription("The autoscaler state store type.");

Review Comment:
   Got it! Forgot about that.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33450][autoscaler] Support the JDBCAutoScalerStateStore [flink-kubernetes-operator]

2024-01-11 Thread via GitHub


mxm commented on code in PR #741:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/741#discussion_r1448978630


##
flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/JobStateView.java:
##
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.jdbc.state;
+
+import org.apache.flink.annotation.VisibleForTesting;
+
+import javax.annotation.Nonnull;
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+
+import static 
org.apache.flink.autoscaler.jdbc.state.JobStateView.State.NEEDS_CREATE;
+import static 
org.apache.flink.autoscaler.jdbc.state.JobStateView.State.NEEDS_DELETE;
+import static 
org.apache.flink.autoscaler.jdbc.state.JobStateView.State.NEEDS_UPDATE;
+import static 
org.apache.flink.autoscaler.jdbc.state.JobStateView.State.NOT_NEEDED;
+import static 
org.apache.flink.autoscaler.jdbc.state.JobStateView.State.UP_TO_DATE;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** The view of job state. */
+@NotThreadSafe
+public class JobStateView {
+
+/**
+ * The state of each state type with respect to the local cache and the database.
+ *
+ * Note: {@link #inLocally} and {@link #inDatabase} exist only for readability;
+ * we don't use them directly.
+ */
+@SuppressWarnings("unused")
+enum State {
+
+/** State doesn't exist at database, and it's not used so far, so it's 
not needed. */
+NOT_NEEDED(false, false, false),
+/** State is only stored locally, not created in JDBC database yet. */
+NEEDS_CREATE(true, false, true),
+/** State exists in JDBC database but there are newer local changes. */
+NEEDS_UPDATE(true, true, true),
+/** State is stored locally and in database, and they are same. */
+UP_TO_DATE(true, true, false),
+/** State is stored in database, but it's deleted in local. */
+NEEDS_DELETE(false, true, true);

Review Comment:
   Got it. The code complexity for adding a new state backend makes me wonder, 
though, whether we need to rethink some of the building blocks for state stores. It 
would be good to move more of the logic into the core, to avoid duplicating / 
writing similar logic. I was initially more in favor of moving this out of the 
core, but we need to see whether that might be too complicated in the long run. This 
is out of scope for this PR though.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33771] Add cluster capacity awareness to autoscaler [flink-kubernetes-operator]

2024-01-11 Thread via GitHub


1996fanrui commented on code in PR #751:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/751#discussion_r1448948315


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java:
##
@@ -247,6 +254,69 @@ private boolean isJobUnderMemoryPressure(
 return false;
 }
 
+private boolean scalingWouldExceedClusterResources(
+EvaluatedMetrics evaluatedMetrics,
+Map scalingSummaries,
+Context ctx) {
+
+final double taskManagerCpu = ctx.getTaskManagerCpu();
+final double taskManagerMemory = ctx.getTaskManagerMemory();
+
+if (taskManagerCpu <= 0 || taskManagerMemory <= 0) {
+// We can't extract the requirements, we can't make any assumptions
+return false;
+}
+
+var globalMetrics = evaluatedMetrics.getGlobalMetrics();
+if (!(globalMetrics.containsKey(ScalingMetric.NUM_TASK_MANAGERS)
+&& 
globalMetrics.containsKey(ScalingMetric.NUM_TOTAL_TASK_SLOTS)
+&& 
globalMetrics.containsKey(ScalingMetric.NUM_TASK_SLOTS_USED))) {
+LOG.info("JM metrics not ready yet");
+return true;
+}
+
+var vertexMetrics = evaluatedMetrics.getVertexMetrics();
+
+int oldParallelismSum =
+vertexMetrics.values().stream()
+.map(map -> (int) 
map.get(ScalingMetric.PARALLELISM).getCurrent())
+.reduce(0, Integer::sum);
+
+Map newParallelisms = new HashMap<>();
+for (Map.Entry> entry :
+vertexMetrics.entrySet()) {
+JobVertexID jobVertexID = entry.getKey();
+ScalingSummary scalingSummary = scalingSummaries.get(jobVertexID);
+if (scalingSummary != null) {
+newParallelisms.put(jobVertexID, 
scalingSummary.getNewParallelism());
+} else {
+newParallelisms.put(
+jobVertexID,
+(int) 
entry.getValue().get(ScalingMetric.PARALLELISM).getCurrent());
+}
+}
+
+double numTaskSlotsUsed = 
globalMetrics.get(ScalingMetric.NUM_TASK_SLOTS_USED).getCurrent();
+
+final int numTaskSlotsAfterRescale;
+if (oldParallelismSum == numTaskSlotsUsed) {
+// Slot sharing activated
+numTaskSlotsAfterRescale = 
newParallelisms.values().stream().reduce(0, Integer::sum);

Review Comment:
   It could happen when resources are insufficient and the adaptive scheduler is 
used, right?



##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java:
##
@@ -247,6 +254,69 @@ private boolean isJobUnderMemoryPressure(
 return false;
 }
 
+private boolean scalingWouldExceedClusterResources(
+EvaluatedMetrics evaluatedMetrics,
+Map scalingSummaries,
+Context ctx) {
+
+final double taskManagerCpu = ctx.getTaskManagerCpu();
+final double taskManagerMemory = ctx.getTaskManagerMemory();
+
+if (taskManagerCpu <= 0 || taskManagerMemory <= 0) {
+// We can't extract the requirements, we can't make any assumptions
+return false;
+}
+
+var globalMetrics = evaluatedMetrics.getGlobalMetrics();
+if (!(globalMetrics.containsKey(ScalingMetric.NUM_TASK_MANAGERS)
+&& 
globalMetrics.containsKey(ScalingMetric.NUM_TOTAL_TASK_SLOTS)
+&& 
globalMetrics.containsKey(ScalingMetric.NUM_TASK_SLOTS_USED))) {
+LOG.info("JM metrics not ready yet");
+return true;
+}
+
+var vertexMetrics = evaluatedMetrics.getVertexMetrics();
+
+int oldParallelismSum =
+vertexMetrics.values().stream()
+.map(map -> (int) 
map.get(ScalingMetric.PARALLELISM).getCurrent())
+.reduce(0, Integer::sum);
+
+Map newParallelisms = new HashMap<>();
+for (Map.Entry> entry :
+vertexMetrics.entrySet()) {
+JobVertexID jobVertexID = entry.getKey();
+ScalingSummary scalingSummary = scalingSummaries.get(jobVertexID);
+if (scalingSummary != null) {
+newParallelisms.put(jobVertexID, 
scalingSummary.getNewParallelism());
+} else {
+newParallelisms.put(
+jobVertexID,
+(int) 
entry.getValue().get(ScalingMetric.PARALLELISM).getCurrent());
+}
+}
+
+double numTaskSlotsUsed = 
globalMetrics.get(ScalingMetric.NUM_TASK_SLOTS_USED).getCurrent();
+
+final int numTaskSlotsAfterRescale;
+if (oldParallelismSum == numTaskSlotsUsed) {
+// Slot sharing activated
+numTaskSlotsAfterRescale = 
newParallelisms.values().stream().reduce(0, Integer::sum);
+} else {
+// Assuming slot sharing is not activated
+ 

Re: [PR] [FLINK-33450][autoscaler] Support the JDBCAutoScalerStateStore [flink-kubernetes-operator]

2024-01-11 Thread via GitHub


1996fanrui commented on code in PR #741:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/741#discussion_r1448941925


##
flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/JobStateView.java:
##
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.jdbc.state;
+
+import org.apache.flink.annotation.VisibleForTesting;
+
+import javax.annotation.Nonnull;
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+
+import static 
org.apache.flink.autoscaler.jdbc.state.JobStateView.State.NEEDS_CREATE;
+import static 
org.apache.flink.autoscaler.jdbc.state.JobStateView.State.NEEDS_DELETE;
+import static 
org.apache.flink.autoscaler.jdbc.state.JobStateView.State.NEEDS_UPDATE;
+import static 
org.apache.flink.autoscaler.jdbc.state.JobStateView.State.NOT_NEEDED;
+import static 
org.apache.flink.autoscaler.jdbc.state.JobStateView.State.UP_TO_DATE;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** The view of job state. */
+@NotThreadSafe
+public class JobStateView {
+
+/**
+ * The state of each state type with respect to the local cache and the database.
+ *
+ * Note: {@link #inLocally} and {@link #inDatabase} exist only for readability;
+ * we don't use them directly.
+ */
+@SuppressWarnings("unused")
+enum State {
+
+/** State doesn't exist at database, and it's not used so far, so it's 
not needed. */
+NOT_NEEDED(false, false, false),
+/** State is only stored locally, not created in JDBC database yet. */
+NEEDS_CREATE(true, false, true),
+/** State exists in JDBC database but there are newer local changes. */
+NEEDS_UPDATE(true, true, true),
+/** State is stored locally and in database, and they are same. */
+UP_TO_DATE(true, true, false),
+/** State is stored in database, but it's deleted in local. */
+NEEDS_DELETE(false, true, true);

Review Comment:
   Yeah, I referred to `ConfigMapView` while developing this. I didn't share the logic 
for a couple of reasons:
   
   1. The View and the State are different:
 - `ConfigMapView` stores a `ConfigMap` for each job, so we create it 
directly.
 - `JobStateView` stores state for each stateType, which means that some state 
types of the current job are stored in the database while the rest may not have 
been created in the database yet.
 - That's why `JobStateView` has `NOT_NEEDED` and `NEEDS_DELETE`, but 
`ConfigMapView` doesn't.
   2. Because of the first reason, `ConfigMapView` accesses the whole 
ConfigMap, while `JobStateView` accesses individual rows of the database. It's hard to 
share code. (A small standalone sketch of these states follows below.)
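   For readers skimming this thread, here is a minimal, standalone sketch of how the 
three constructor flags and the resulting transitions could fit together. It is an 
illustration only, not the PR's `JobStateView` implementation: the `needsFlush` name 
for the third flag and the transition helpers are assumptions derived from the 
constants quoted above.
   
   ```java
   public final class JobStateViewSketch {
   
       enum State {
           NOT_NEEDED(false, false, false),
           NEEDS_CREATE(true, false, true),
           NEEDS_UPDATE(true, true, true),
           UP_TO_DATE(true, true, false),
           NEEDS_DELETE(false, true, true);
   
           final boolean inLocally;
           final boolean inDatabase;
           final boolean needsFlush; // assumed meaning of the third flag
   
           State(boolean inLocally, boolean inDatabase, boolean needsFlush) {
               this.inLocally = inLocally;
               this.inDatabase = inDatabase;
               this.needsFlush = needsFlush;
           }
       }
   
       /** A local put: the value now exists locally and must eventually reach the DB. */
       static State onLocalPut(State current) {
           switch (current) {
               case NOT_NEEDED:
               case NEEDS_CREATE:
                   return State.NEEDS_CREATE; // no row in the DB yet -> create it
               default:
                   return State.NEEDS_UPDATE; // row exists in the DB -> update it
           }
       }
   
       /** A local delete: the value is gone locally; the DB row may still need removal. */
       static State onLocalDelete(State current) {
           switch (current) {
               case NOT_NEEDED:
               case NEEDS_CREATE:
                   return State.NOT_NEEDED; // never reached the DB, nothing to delete
               default:
                   return State.NEEDS_DELETE; // DB row exists -> delete it on flush
           }
       }
   
       /** A flush: local cache and DB are reconciled, nothing is pending anymore. */
       static State onFlush(State current) {
           return current.inLocally ? State.UP_TO_DATE : State.NOT_NEEDED;
       }
   
       public static void main(String[] args) {
           State s = onLocalPut(State.NOT_NEEDED); // NEEDS_CREATE
           s = onFlush(s);                         // UP_TO_DATE
           s = onLocalDelete(s);                   // NEEDS_DELETE
           System.out.println(onFlush(s));         // NOT_NEEDED
       }
   }
   ```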
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33450][autoscaler] Support the JDBCAutoScalerStateStore [flink-kubernetes-operator]

2024-01-11 Thread via GitHub


1996fanrui commented on code in PR #741:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/741#discussion_r1448928730


##
flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/config/AutoscalerStandaloneOptions.java:
##
@@ -59,4 +64,47 @@ private static ConfigOptions.OptionBuilder 
autoscalerStandaloneConfig(String key
 .withDeprecatedKeys("flinkClusterPort")
 .withDescription(
 "The port of flink cluster when the flink-cluster 
fetcher is used.");
+
+public static final ConfigOption<StateStoreType> STATE_STORE_TYPE =
+autoscalerStandaloneConfig("state-store.type")
+.enumType(StateStoreType.class)
+.defaultValue(StateStoreType.MEMORY)
+.withDescription("The autoscaler state store type.");

Review Comment:
   Emmm, I didn't list them because the flink-kubernetes-operator docs list them 
automatically.
   
   You can take a look at the generated HTML; it has all the enum values of 
`StateStoreType`. 
   
   And all the old options have `Possible values:` as well.
   
   (screenshot: https://github.com/apache/flink-kubernetes-operator/assets/38427477/ba2fa8b5-dc0e-4bb1-8f8f-d493c61a9bab)
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33771] Add cluster capacity awareness to autoscaler [flink-kubernetes-operator]

2024-01-11 Thread via GitHub


1996fanrui commented on code in PR #751:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/751#discussion_r1448858318


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/resources/ResourceCheck.java:
##
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.resources;
+
+/** An interface for checking the available capacity of the underlying 
resources. */
+public interface ResourceCheck {
+
+/** Simulates scheduling the provided number of resources. */

Review Comment:
   It's better to add some comments to explain the return value.



##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/resources/ResourceCheck.java:
##
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.resources;
+
+/** An interface for checking the available capacity of the underlying 
resources. */
+public interface ResourceCheck {
+
+/** Simulates scheduling the provided number of resources. */
+boolean trySchedule(
+int currentInstances,
+int newInstances,

Review Comment:
   ```suggestion
   int currentNumberOfTms,
   int newNumberOfTms,
   ```
   
   How about this? "Instance" is not specific.
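   To combine the two points in this message (documenting the return value and the 
renaming), the interface could end up looking roughly like the sketch below. This is 
only an illustration: the quoted diff is cut off, so the trailing CPU/memory 
parameters and their types are hypothetical.
   
   ```java
   /** A sketch for discussion, not the PR's actual interface. */
   public interface ResourceCheck {
   
       /**
        * Simulates scheduling the given number of TaskManagers.
        *
        * @param currentNumberOfTms number of TaskManagers currently allocated
        * @param newNumberOfTms number of TaskManagers required after the rescale
        * @param cpuPerTm CPU requested per TaskManager (hypothetical parameter)
        * @param memoryPerTmBytes memory requested per TaskManager in bytes (hypothetical parameter)
        * @return true if the cluster can accommodate the request, false if the rescale
        *     would exceed the available resources
        */
       boolean trySchedule(
               int currentNumberOfTms, int newNumberOfTms, double cpuPerTm, long memoryPerTmBytes);
   }
   ```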



##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/RestApiMetricsCollector.java:
##
@@ -103,6 +106,42 @@ protected Map 
queryAggregatedVertexMetrics(
 }
 }
 
+@Override
+@SneakyThrows
+protected Map queryJmMetrics(Context ctx) {
+Map metrics =
+Map.of(
+"numRegisteredTaskManagers", 
FlinkMetric.NUM_TASK_MANAGERS,
+"taskSlotsTotal", FlinkMetric.NUM_TASK_SLOTS_TOTAL,
+"taskSlotsAvailable", 
FlinkMetric.NUM_TASK_SLOTS_AVAILABLE);

Review Comment:
   All of the old metrics were queried and then filtered via the `predicate`. 
   
   These new metrics don't use that approach: they are queried directly and 
don't use the `predicate`. Could we abstract `FlinkMetric` for them?
   
   We could define two metric types: QueryableMetric and PredicatableMetric. WDYT?
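   A rough, hypothetical sketch of that split (none of these types exist today; the 
names and shapes are illustrative only, and the metric name strings are the ones 
from the quoted diff):
   
   ```java
   import java.util.function.Predicate;
   
   /** Metrics resolved by filtering the reported metric names with a predicate. */
   interface PredicatableMetric {
       Predicate<String> predicate();
   }
   
   /** Metrics queried directly by their exact name, e.g. the JM-level metrics. */
   interface QueryableMetric {
       String metricName();
   }
   
   /** Example of a directly queryable JM metric. */
   enum JmMetric implements QueryableMetric {
       NUM_TASK_MANAGERS("numRegisteredTaskManagers"),
       NUM_TASK_SLOTS_TOTAL("taskSlotsTotal"),
       NUM_TASK_SLOTS_AVAILABLE("taskSlotsAvailable");
   
       private final String metricName;
   
       JmMetric(String metricName) {
           this.metricName = metricName;
       }
   
       @Override
       public String metricName() {
           return metricName;
       }
   }
   ```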



##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/resources/ResourceRequirements.java:
##
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.resources;
+
+/** Interface to extract resource requirements. */
+public interface ResourceRequirements {
+
+/** The required number of CPU for TaskManager nodes, if available, 
otherwise 0. */
+default double getTaskManagerCpu() {

Re: [PR] [FLINK-33450][autoscaler] Support the JDBCAutoScalerStateStore [flink-kubernetes-operator]

2024-01-11 Thread via GitHub


mxm commented on code in PR #741:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/741#discussion_r1448905911


##
flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/config/AutoscalerStandaloneOptions.java:
##
@@ -59,4 +64,47 @@ private static ConfigOptions.OptionBuilder 
autoscalerStandaloneConfig(String key
 .withDeprecatedKeys("flinkClusterPort")
 .withDescription(
 "The port of flink cluster when the flink-cluster 
fetcher is used.");
+
+public static final ConfigOption<StateStoreType> STATE_STORE_TYPE =
+autoscalerStandaloneConfig("state-store.type")
+.enumType(StateStoreType.class)
+.defaultValue(StateStoreType.MEMORY)
+.withDescription("The autoscaler state store type.");

Review Comment:
   We should list the options here.
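   In case the generated docs ever aren't enough, here is a small self-contained 
sketch of how a "Possible values" suffix could be derived from the enum itself. The 
`StateStoreType` values MEMORY and JDBC are assumptions based on the PR context, not 
confirmed by the quoted diff.
   
   ```java
   import java.util.Arrays;
   import java.util.stream.Collectors;
   
   public final class EnumDescriptionSketch {
   
       /** Stand-in for the PR's StateStoreType; MEMORY and JDBC are assumed values. */
       enum StateStoreType { MEMORY, JDBC }
   
       /** Appends a "Possible values: ..." suffix listing every constant of the enum. */
       static String withPossibleValues(String baseDescription, Class<? extends Enum<?>> enumClass) {
           String values =
                   Arrays.stream(enumClass.getEnumConstants())
                           .map(Enum::name)
                           .collect(Collectors.joining(", "));
           return baseDescription + " Possible values: " + values + ".";
       }
   
       public static void main(String[] args) {
           // Prints: The autoscaler state store type. Possible values: MEMORY, JDBC.
           System.out.println(
                   withPossibleValues("The autoscaler state store type.", StateStoreType.class));
       }
   }
   ```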



##
flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/JobStateView.java:
##
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.jdbc.state;
+
+import org.apache.flink.annotation.VisibleForTesting;
+
+import javax.annotation.Nonnull;
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+
+import static 
org.apache.flink.autoscaler.jdbc.state.JobStateView.State.NEEDS_CREATE;
+import static 
org.apache.flink.autoscaler.jdbc.state.JobStateView.State.NEEDS_DELETE;
+import static 
org.apache.flink.autoscaler.jdbc.state.JobStateView.State.NEEDS_UPDATE;
+import static 
org.apache.flink.autoscaler.jdbc.state.JobStateView.State.NOT_NEEDED;
+import static 
org.apache.flink.autoscaler.jdbc.state.JobStateView.State.UP_TO_DATE;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** The view of job state. */
+@NotThreadSafe
+public class JobStateView {
+
+/**
+ * The state of each state type with respect to the local cache and the database.
+ *
+ * Note: {@link #inLocally} and {@link #inDatabase} exist only for readability;
+ * we don't use them directly.
+ */
+@SuppressWarnings("unused")
+enum State {
+
+/** State doesn't exist at database, and it's not used so far, so it's 
not needed. */
+NOT_NEEDED(false, false, false),
+/** State is only stored locally, not created in JDBC database yet. */
+NEEDS_CREATE(true, false, true),
+/** State exists in JDBC database but there are newer local changes. */
+NEEDS_UPDATE(true, true, true),
+/** State is stored locally and in database, and they are same. */
+UP_TO_DATE(true, true, false),
+/** State is stored in database, but it's deleted in local. */
+NEEDS_DELETE(false, true, true);

Review Comment:
   This looks conceptually very similar to `ConfigMapView`. I wonder if we 
could have shared some of that logic.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] DO_NOT_MERGE_YET[FLINK-33268][rest] Skip unknown fields in REST response deserialization [flink]

2024-01-11 Thread via GitHub


gaborgsomogyi commented on PR #23930:
URL: https://github.com/apache/flink/pull/23930#issuecomment-1887214969

   The approach looks fine, though the full pipeline hasn't been fully exercised, so 
an automated test was added to enforce the `ErrorResponseBody` structure.
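   For context, the behaviour under discussion (tolerating unknown fields when 
deserializing REST responses) boils down to the Jackson setting sketched below. The 
POJO is a stand-in, not Flink's actual `ErrorResponseBody`.
   
   ```java
   import com.fasterxml.jackson.databind.DeserializationFeature;
   import com.fasterxml.jackson.databind.ObjectMapper;
   
   import java.util.List;
   
   public final class SkipUnknownFieldsSketch {
   
       /** Stand-in for a REST error response body with a single "errors" list. */
       public static class StandInErrorBody {
           public List<String> errors;
       }
   
       public static void main(String[] args) throws Exception {
           ObjectMapper mapper =
                   new ObjectMapper()
                           // Tolerate fields added by newer / different server versions.
                           .disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
   
           String responseWithExtraField =
                   "{\"errors\":[\"Job not found\"],\"someFutureField\":42}";
   
           StandInErrorBody body =
                   mapper.readValue(responseWithExtraField, StandInErrorBody.class);
           System.out.println(body.errors); // [Job not found]
       }
   }
   ```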


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33930] Canceled stop job status exception [flink-kubernetes-operator]

2024-01-11 Thread via GitHub


gyfora commented on code in PR #740:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/740#discussion_r1448906275


##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java:
##
@@ -369,16 +370,17 @@ protected void cancelJob(
 deleteClusterDeployment(
 deployment.getMetadata(), deploymentStatus, 
conf, true);
 }
+
deploymentStatus.getJobStatus().setState(JobStatus.FINISHED.name());
 break;
 case LAST_STATE:
 deleteClusterDeployment(
 deployment.getMetadata(), deploymentStatus, conf, 
false);
+
deploymentStatus.getJobStatus().setState(JobStatus.FINISHED.name());
 break;
 default:
 throw new RuntimeException("Unsupported upgrade mode " + 
upgradeMode);
 }
 }
-deploymentStatus.getJobStatus().setState(JobStatus.FINISHED.name());

Review Comment:
   The problem with this, which also complicates the PR, is that in some 
cases the job is already finished / failed or in another terminal state, in which 
case no action was taken and we should leave the status as is.
   
   As you said, it's a lot simpler to just set the state to "FINISHED" 
after cancellation :) 
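   A minimal sketch of the guard described above (illustrative only, not the 
operator's actual code): overwrite the status with FINISHED only when the observed 
state is not already terminal.
   
   ```java
   import org.apache.flink.api.common.JobStatus;
   
   final class CancelStatusSketch {
   
       static String stateAfterCancel(String observedState) {
           JobStatus observed = JobStatus.valueOf(observedState);
           if (observed.isGloballyTerminalState()) {
               // The job already reached FINISHED / FAILED / CANCELED on its own:
               // no cancel action was taken, so keep the observed status.
               return observedState;
           }
           // Cancellation (or cluster shutdown) was actually performed.
           return JobStatus.FINISHED.name();
       }
   
       public static void main(String[] args) {
           System.out.println(stateAfterCancel("RUNNING")); // FINISHED
           System.out.println(stateAfterCancel("FAILED"));  // FAILED
       }
   }
   ```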



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


