[PR] [FLINK-25537] [JUnit5 Migration] Module: flink-core package api-common [flink]
GOODBOY008 opened a new pull request, #23960: URL: https://github.com/apache/flink/pull/23960 [FLINK-25537] [JUnit5 Migration] Module: flink-core package api-common -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (FLINK-31615) Fix some parts forgot to translate in "Table API" page of "Table API & SQL"
[ https://issues.apache.org/jira/browse/FLINK-31615?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Leonard Xu resolved FLINK-31615.
---
Resolution: Fixed

Fixed in master(1.19): 7ded6089a5fc827b9d09ef490c8d1e4965a0c4bf

> Fix some parts forgot to translate in "Table API" page of "Table API & SQL"
> Key: FLINK-31615
> URL: https://issues.apache.org/jira/browse/FLINK-31615
> Project: Flink
> Issue Type: Improvement
> Components: Documentation
> Affects Versions: 1.18.0
> Reporter: Hang Ruan
> Assignee: Hang Ruan
> Priority: Minor
> Labels: auto-deprioritized-minor, chinese-translation
> Fix For: 1.19.0
>
> The query_state_warning in "Table API" page of "Table API & SQL" is still in
> English. And some comments in codes are in English.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-31615) Fix some parts forgot to translate in "Table API" page of "Table API & SQL"
[ https://issues.apache.org/jira/browse/FLINK-31615?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Leonard Xu updated FLINK-31615:
---
Priority: Minor (was: Not a Priority)

> Fix some parts forgot to translate in "Table API" page of "Table API & SQL"
> Key: FLINK-31615
> URL: https://issues.apache.org/jira/browse/FLINK-31615
> Project: Flink
> Issue Type: Improvement
> Components: Documentation
> Affects Versions: 1.18.0
> Reporter: Hang Ruan
> Assignee: Hang Ruan
> Priority: Minor
> Labels: auto-deprioritized-minor, chinese-translation
> Fix For: 1.19.0
>
> The query_state_warning in "Table API" page of "Table API & SQL" is still in
> English. And some comments in codes are in English.
[jira] [Updated] (FLINK-31615) Fix some parts forgot to translate in "Table API" page of "Table API & SQL"
[ https://issues.apache.org/jira/browse/FLINK-31615?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Leonard Xu updated FLINK-31615:
---
Fix Version/s: 1.19.0

> Fix some parts forgot to translate in "Table API" page of "Table API & SQL"
> Key: FLINK-31615
> URL: https://issues.apache.org/jira/browse/FLINK-31615
> Project: Flink
> Issue Type: Improvement
> Components: Documentation
> Affects Versions: 1.18.0
> Reporter: Hang Ruan
> Assignee: Hang Ruan
> Priority: Not a Priority
> Labels: auto-deprioritized-minor, chinese-translation
> Fix For: 1.19.0
>
> The query_state_warning in "Table API" page of "Table API & SQL" is still in
> English. And some comments in codes are in English.
[jira] [Updated] (FLINK-31615) Fix some parts forgot to translate in "Table API" page of "Table API & SQL"
[ https://issues.apache.org/jira/browse/FLINK-31615?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Leonard Xu updated FLINK-31615:
---
Affects Version/s: 1.18.0

> Fix some parts forgot to translate in "Table API" page of "Table API & SQL"
> Key: FLINK-31615
> URL: https://issues.apache.org/jira/browse/FLINK-31615
> Project: Flink
> Issue Type: Improvement
> Components: Documentation
> Affects Versions: 1.18.0
> Reporter: Hang Ruan
> Assignee: Hang Ruan
> Priority: Not a Priority
> Labels: auto-deprioritized-minor, chinese-translation
>
> The query_state_warning in "Table API" page of "Table API & SQL" is still in
> English. And some comments in codes are in English.
[jira] [Assigned] (FLINK-31615) Fix some parts forgot to translate in "Table API" page of "Table API & SQL"
[ https://issues.apache.org/jira/browse/FLINK-31615?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Leonard Xu reassigned FLINK-31615:
---
Assignee: Hang Ruan

> Fix some parts forgot to translate in "Table API" page of "Table API & SQL"
> Key: FLINK-31615
> URL: https://issues.apache.org/jira/browse/FLINK-31615
> Project: Flink
> Issue Type: Improvement
> Components: Documentation
> Reporter: Hang Ruan
> Assignee: Hang Ruan
> Priority: Not a Priority
> Labels: auto-deprioritized-minor, chinese-translation
>
> The query_state_warning in "Table API" page of "Table API & SQL" is still in
> English. And some comments in codes are in English.
Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]
RocMarshal commented on code in PR #23635: URL: https://github.com/apache/flink/pull/23635#discussion_r1432336723

## flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java:

```java
@@ -118,26 +129,47 @@ public DefaultDeclarativeSlotPool(
         this.totalResourceRequirements = ResourceCounter.empty();
         this.fulfilledResourceRequirements = ResourceCounter.empty();
         this.slotToRequirementProfileMappings = new HashMap<>();
+        this.componentMainThreadExecutor = Preconditions.checkNotNull(componentMainThreadExecutor);
+        this.slotRequestMaxInterval = slotRequestMaxInterval;
     }

     @Override
     public void increaseResourceRequirementsBy(ResourceCounter increment) {
-        if (increment.isEmpty()) {
+        updateResourceRequirementsBy(
+                increment,
+                () -> totalResourceRequirements = totalResourceRequirements.add(increment));
```

Review Comment: @KarmaGYZ Thanks a lot for the comments. Let me try to explain it. My initial intention was to reduce code redundancy: there are currently two calling functions, and the only difference between them is whether they increase or decrease the resource requirements. If the extracted common part is too broad, I am happy to improve it.
for example:

```java
@Override
public void increaseResourceRequirementsBy(ResourceCounter increment) {
    if (increment.isEmpty()) {
        return;
    }
    totalResourceRequirements = totalResourceRequirements.add(increment);
    doDeclareResourceRequirements();
}

@Override
public void decreaseResourceRequirementsBy(ResourceCounter decrement) {
    if (decrement.isEmpty()) {
        return;
    }
    totalResourceRequirements = totalResourceRequirements.subtract(decrement);
    doDeclareResourceRequirements();
}

private void doDeclareResourceRequirements() {
    if (slotRequestMaxInterval == null) {
        declareResourceRequirements();
        return;
    }
    if (slotRequestMaxIntervalTimeoutFuture != null
            && !slotRequestMaxIntervalTimeoutFuture.isDone()
            && !slotRequestMaxIntervalTimeoutFuture.isCancelled()) {
        slotRequestMaxIntervalTimeoutFuture.cancel(true);
    }
    slotRequestMaxIntervalTimeoutFuture =
            componentMainThreadExecutor.schedule(
                    this::declareResourceRequirements,
                    slotRequestMaxInterval.toMilliseconds(),
                    TimeUnit.MILLISECONDS);
}
```

Please let me know your opinion~ :)
Re: [PR] [Flink-31615][doc-zh] Translates the query_state_warning in "Table API" page of "Table API & SQL" and fixes some content [flink]
leonardBang closed pull request #22272: [Flink-31615][doc-zh] Translates the query_state_warning in "Table API" page of "Table API & SQL" and fixes some content URL: https://github.com/apache/flink/pull/22272
[PR] [Flink-31615][doc-zh] Translates the query_state_warning in "Table API" page of "Table API & SQL" and fixes some content [flink]
ruanhang1993 opened a new pull request, #22272: URL: https://github.com/apache/flink/pull/22272

## What is the purpose of the change

This pull request translates the query_state_warning in "Table API" page of "Table API & SQL" and fixes some content.

## Brief change log

- Add query_state_warning_zh.html
- Modify the "Table API" page of "Table API & SQL"

## Verifying this change

This change is a document change without any test coverage.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): no
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
- The serializers: no
- The runtime per-record code paths (performance sensitive): no
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
- The S3 file system connector: no

## Documentation

- Does this pull request introduce a new feature? no
- If yes, how is the feature documented? not applicable
Re: [PR] [FLINK-33863][checkpoint] Fix restoring compressed operator state [flink]
ruibinx commented on code in PR #23938: URL: https://github.com/apache/flink/pull/23938#discussion_r1432355528

## flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateRestoreOperation.java:

```java
@@ -168,9 +170,25 @@ public Void restore() throws Exception {
         }
     }

+    // sort state by offsets
+    List<Map.Entry<String, OperatorStateHandle.StateMetaInfo>> entries =
+            stateHandle.getStateNameToPartitionOffsets().entrySet().stream()
+                    .sorted(
+                            Comparator.comparingLong(
+                                    entry -> {
+                                        OperatorStateHandle.StateMetaInfo stateMetaInfo =
+                                                entry.getValue();
+                                        long[] offsets = stateMetaInfo.getOffsets();
+                                        if (offsets == null || offsets.length == 0) {
+                                            return Long.MIN_VALUE;
+                                        } else {
+                                            return offsets[0];
+                                        }
```

Review Comment: Yeah, I think so. The case we encountered should be a zero-length array.
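The sort key being discussed (first offset, with a guard for a missing or zero-length offset array) can be sketched in isolation. This is a minimal, self-contained model: `OffsetSortSketch`, `sortByFirstOffset`, and the plain `Map<String, long[]>` are illustrative stand-ins, not the actual Flink classes.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class OffsetSortSketch {
    /**
     * Sorts state names by the first offset of their offset array. Entries with a
     * null or empty offset array get the key Long.MIN_VALUE, so they sort first.
     */
    static List<String> sortByFirstOffset(Map<String, long[]> stateOffsets) {
        return stateOffsets.entrySet().stream()
                .sorted(
                        Comparator.comparingLong(
                                e -> {
                                    long[] offsets = e.getValue();
                                    return (offsets == null || offsets.length == 0)
                                            ? Long.MIN_VALUE
                                            : offsets[0];
                                }))
                .map(Map.Entry::getKey)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // "c" has no offsets, so it sorts before "b" (offset 5) and "a" (offset 100).
        Map<String, long[]> offsets =
                Map.of("a", new long[] {100}, "b", new long[] {5}, "c", new long[0]);
        System.out.println(sortByFirstOffset(offsets));
    }
}
```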
Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]
RocMarshal commented on code in PR #23635: URL: https://github.com/apache/flink/pull/23635#discussion_r1432351722

## flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java:

```java
@@ -334,7 +334,9 @@ public void onUnknownDeploymentsOf(
                 .createSlotPoolService(
                         jid,
                         createDeclarativeSlotPoolFactory(
-                                jobMasterConfiguration.getConfiguration()));
+                                jobMasterConfiguration.getConfiguration()),
+                        null,
```

Review Comment: hi, @KarmaGYZ Thank you very much for your comment. Do you mean that we chose not to pass the values of `slotRequestMaxInterval` and `slotBatchAllocatable` here based on the configuration, but instead passed hard-coded values directly? If so (IIUC), the reasons for doing it this way are:

- We want to change the parameter-passing logic uniformly once the default scheduler fully supports balanced scheduling.
- The current hard-coded default values will not break the original logical semantics.

Please correct me if I'm wrong. Any suggestion is appreciated.
Re: [PR] [FLINK-33863][checkpoint] Fix restoring compressed operator state [flink]
fredia commented on PR #23938: URL: https://github.com/apache/flink/pull/23938#issuecomment-1863955270

@ruibinx Thanks for the clarification and update, LGTM, let's wait for CI green.
Re: [PR] [FLINK-33863][checkpoint] Fix restoring compressed operator state [flink]
fredia commented on code in PR #23938: URL: https://github.com/apache/flink/pull/23938#discussion_r1432321260

## flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateRestoreOperation.java:

```java
@@ -168,9 +170,25 @@ public Void restore() throws Exception {
         }
     }

+    // sort state by offsets
+    List<Map.Entry<String, OperatorStateHandle.StateMetaInfo>> entries =
+            stateHandle.getStateNameToPartitionOffsets().entrySet().stream()
+                    .sorted(
+                            Comparator.comparingLong(
+                                    entry -> {
+                                        OperatorStateHandle.StateMetaInfo stateMetaInfo =
+                                                entry.getValue();
+                                        long[] offsets = stateMetaInfo.getOffsets();
+                                        if (offsets == null || offsets.length == 0) {
+                                            return Long.MIN_VALUE;
+                                        } else {
+                                            return offsets[0];
+                                        }
```

Review Comment: Thanks for the clarification. According to the [constructor](https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateHandle.java#L63C17-L63C17) of `StateMetaInfo` and the [serde](https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java#L626) of `OperatorStateHandle`, I think the `offsets` cannot be null, so let's leave it as it is. :)
Re: [PR] [FLINK-33863][checkpoint] Fix restoring compressed operator state [flink]
ruibinx commented on code in PR #23938: URL: https://github.com/apache/flink/pull/23938#discussion_r1432300092

## flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateRestoreOperation.java:

```java
@@ -168,9 +170,25 @@ public Void restore() throws Exception {
         }
     }

+    // sort state by offsets
```

Review Comment: Done.
Re: [PR] [FLINK-33863][checkpoint] Fix restoring compressed operator state [flink]
ruibinx commented on code in PR #23938: URL: https://github.com/apache/flink/pull/23938#discussion_r1432299988

## flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateRestoreOperationTest.java:

```java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.BroadcastState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory;
+
+import org.jetbrains.annotations.Nullable;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+
```

Review Comment: Added.
Re: [PR] [FLINK-33863][checkpoint] Fix restoring compressed operator state [flink]
ruibinx commented on code in PR #23938: URL: https://github.com/apache/flink/pull/23938#discussion_r1432299647

## flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateRestoreOperationTest.java:

```java
+import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory;
+
+import org.jetbrains.annotations.Nullable;
```

Review Comment: Fixed.

## flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateRestoreOperationTest.java:

```java
+import org.jetbrains.annotations.Nullable;
+import org.junit.Test;
```

Review Comment: Fixed, thanks!
[PR] Adding affinity rule. [flink-kubernetes-operator]
Anshul1128 opened a new pull request, #737: URL: https://github.com/apache/flink-kubernetes-operator/pull/737

## What is the purpose of the change

Optimizing pod distribution and placement within a Kubernetes cluster is essential for improving resource utilization, and affinity rules are a powerful feature for this.

## Brief change log

- *Describes pod anti-affinity scheduling rules (e.g. avoid putting this pod in the same node, zone, etc. as some other pod(s)).*
- *Affinity rule is introduced to the flink-operator.yaml*
- *Updated values.yaml with the affinity value - set the anti-affinity type with default values*

## Verifying this change

This change added tests and can be verified as follows:

- *Manually verified the change by running a 4-node cluster with multiple replicas scaling from 2-10*

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., any changes to the `CustomResourceDescriptors`: (no)
- Core observer or reconciler logic that is regularly executed: (yes)

## Documentation

- Does this pull request introduce a new feature? (yes)
- If yes, how is the feature documented? (docs) https://kubernetes.io/docs/concepts/scheduling-eviction/assign-pod-node/
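For readers unfamiliar with the feature, a soft pod anti-affinity rule of the kind this PR describes could look roughly like the sketch below. This is a hypothetical `values.yaml` fragment for illustration only: the actual key names and labels used by the flink-kubernetes-operator chart may differ.

```yaml
# Hypothetical values.yaml fragment: prefer scheduling operator replicas on
# different nodes (soft anti-affinity). Label and key names are assumptions.
affinity:
  podAntiAffinity:
    preferredDuringSchedulingIgnoredDuringExecution:
      - weight: 100
        podAffinityTerm:
          topologyKey: kubernetes.io/hostname
          labelSelector:
            matchLabels:
              app.kubernetes.io/name: flink-kubernetes-operator
```

Because the rule is `preferred` rather than `required`, the scheduler still places replicas on the same node when no other node fits, so it spreads pods without blocking scheduling.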
[jira] [Commented] (FLINK-25029) Hadoop Caller Context Setting In Flink
[ https://issues.apache.org/jira/browse/FLINK-25029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17798831#comment-17798831 ]

liang yu commented on FLINK-25029:
---
Hi, [~liufangqi] [~dmvk], are you still working on this issue? I faced the same problem recently and I want to solve this issue.

> Hadoop Caller Context Setting In Flink
> Key: FLINK-25029
> URL: https://issues.apache.org/jira/browse/FLINK-25029
> Project: Flink
> Issue Type: Improvement
> Components: FileSystems
> Reporter: chenfengLiu
> Assignee: chenfengLiu
> Priority: Major
> Labels: pull-request-available, stale-assigned
>
> For a given HDFS operation (e.g. delete file), it's very helpful to track
> which upper level job issues it. The upper level callers may be specific
> Oozie tasks, MR jobs, and hive queries. One scenario is that the namenode
> (NN) is abused/spammed, the operator may want to know immediately which MR
> job should be blamed so that she can kill it. To this end, the caller context
> contains at least the application-dependent "tracking id".
> The above is the main effect of the Caller Context. HDFS Client set Caller
> Context, then name node get it in audit log to do some work.
> Now the Spark and hive have the Caller Context to meet the HDFS Job Audit
> requirement.
> In my company, flink jobs often cause some problems for HDFS, so we did it
> for preventing some cases.
> If the feature is general enough. Should we support it, then I can submit a
> PR for this.
Re: [PR] [FLINK-32721][table-planner] Fix SQL MAX aggregation function does no… [flink]
lsyldliu closed pull request #23671: [FLINK-32721][table-planner] Fix SQL MAX aggregation function does no… URL: https://github.com/apache/flink/pull/23671
Re: [PR] [FLINK-31664][table] Add ARRAY_INTERSECT function [flink]
snuyanzin closed pull request #23959: [FLINK-31664][table] Add ARRAY_INTERSECT function URL: https://github.com/apache/flink/pull/23959
Re: [PR] [FLINK-31664][table] Add ARRAY_INTERSECT function [flink]
snuyanzin commented on PR #23959: URL: https://github.com/apache/flink/pull/23959#issuecomment-1863913063

Please do not create duplicate PRs for the same issue; this is already the third one open:
https://github.com/apache/flink/pull/22320
https://github.com/apache/flink/pull/23171
[jira] [Commented] (FLINK-33881) [TtlListState]Avoid copy and update value in TtlListState#getUnexpiredOrNull
[ https://issues.apache.org/jira/browse/FLINK-33881?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17798825#comment-17798825 ] Zakelly Lan commented on FLINK-33881: - Thanks for the clarification! It is definitely useful, but I'm not sure whether it is safe to do a shallow copy instead of a deep copy. > [TtlListState]Avoid copy and update value in TtlListState#getUnexpiredOrNull > > > Key: FLINK-33881 > URL: https://issues.apache.org/jira/browse/FLINK-33881 > Project: Flink > Issue Type: Improvement > Components: Runtime / State Backends >Reporter: Jinzhong Li >Priority: Minor > Attachments: image-2023-12-19-21-25-21-446.png, > image-2023-12-19-21-26-43-518.png > > > In some scenarios, 'TtlListState#getUnexpiredOrNull -> > elementSerializer.copy(ttlValue)' consumes a lot of CPU resources. > !image-2023-12-19-21-25-21-446.png|width=529,height=119! > I found that for TtlListState#getUnexpiredOrNull, if none of the elements > have expired, it still needs to copy all the elements and update the whole > list/map in TtlIncrementalCleanup#runCleanup(); > !image-2023-12-19-21-26-43-518.png|width=505,height=266! > I think we could optimize TtlListState#getUnexpiredOrNull by: > 1) finding the index of the first expired element in the list; > 2) if none is found, returning the original list; > 3) if one is found, constructing the unexpired list (putting the preceding elements into > the list), then going through the subsequent elements and adding the unexpired elements > into the list. > {code:java} > public List<TtlValue<T>> getUnexpiredOrNull(@Nonnull List<TtlValue<T>> ttlValues) { > //... 
> int firstExpireIndex = -1; > for (int i = 0; i < ttlValues.size(); i++) { > if (TtlUtils.expired(ttlValues.get(i), ttl, currentTimestamp)) { > firstExpireIndex = i; > break; > } > } > if (firstExpireIndex == -1) { > return ttlValues; //return the original ttlValues > } > List<TtlValue<T>> unexpired = new ArrayList<>(ttlValues.size()); > for (int i = 0; i < ttlValues.size(); i++) { > if (i < firstExpireIndex) { > unexpired.add(ttlValues.get(i)); > } > if (i > firstExpireIndex) { > if (!TtlUtils.expired(ttlValues.get(i), ttl, currentTimestamp)) { > unexpired.add(ttlValues.get(i)); > } > } > } > // ... > } {code} > *In this way, the extra iteration overhead is actually very small, but > the benefit when there are no expired elements is significant.* -- This message was sent by Atlassian Jira (v8.20.10#820010)
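The scan proposed in the ticket can be sketched as a small standalone example. Note this is an illustration only: `TtlValue` is simplified here to a (value, timestamp) pair, and the class and method names are made up for the sketch, not Flink's actual API.

```java
import java.util.ArrayList;
import java.util.List;

/** Standalone sketch of the first-expired-index scan described above. */
public class TtlScanSketch {
    static final long TTL = 100L;

    static class TtlValue {
        final String value;
        final long lastAccessTimestamp;

        TtlValue(String value, long lastAccessTimestamp) {
            this.value = value;
            this.lastAccessTimestamp = lastAccessTimestamp;
        }
    }

    static boolean expired(TtlValue v, long ttl, long now) {
        return v.lastAccessTimestamp + ttl <= now;
    }

    /**
     * Returns the original list instance (no copy) when nothing has expired;
     * otherwise builds a new list holding only the unexpired elements.
     */
    static List<TtlValue> getUnexpired(List<TtlValue> ttlValues, long now) {
        int firstExpireIndex = -1;
        for (int i = 0; i < ttlValues.size(); i++) {
            if (expired(ttlValues.get(i), TTL, now)) {
                firstExpireIndex = i;
                break;
            }
        }
        if (firstExpireIndex == -1) {
            return ttlValues; // fast path: no copy, no state update needed
        }
        List<TtlValue> unexpired = new ArrayList<>(ttlValues.size());
        for (int i = 0; i < ttlValues.size(); i++) {
            // elements before the first expired index are known to be unexpired
            if (i < firstExpireIndex
                    || (i > firstExpireIndex && !expired(ttlValues.get(i), TTL, now))) {
                unexpired.add(ttlValues.get(i));
            }
        }
        return unexpired;
    }
}
```

The point of the fast path is that returning the same list instance lets the caller skip the state update entirely when no element has expired.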
[jira] [Assigned] (FLINK-33892) FLIP-383: Support Job Recovery for Batch Jobs
[ https://issues.apache.org/jira/browse/FLINK-33892?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lijie Wang reassigned FLINK-33892: -- Assignee: (was: Lijie Wang) > FLIP-383: Support Job Recovery for Batch Jobs > - > > Key: FLINK-33892 > URL: https://issues.apache.org/jira/browse/FLINK-33892 > Project: Flink > Issue Type: New Feature > Components: Runtime / Coordination >Reporter: Lijie Wang >Priority: Major > > This is the umbrella ticket for > [FLIP-383|https://cwiki.apache.org/confluence/display/FLINK/FLIP-383%3A+Support+Job+Recovery+for+Batch+Jobs] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (FLINK-33892) FLIP-383: Support Job Recovery for Batch Jobs
[ https://issues.apache.org/jira/browse/FLINK-33892?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lijie Wang reassigned FLINK-33892: -- Assignee: Lijie Wang > FLIP-383: Support Job Recovery for Batch Jobs > - > > Key: FLINK-33892 > URL: https://issues.apache.org/jira/browse/FLINK-33892 > Project: Flink > Issue Type: New Feature > Components: Runtime / Coordination >Reporter: Lijie Wang >Assignee: Lijie Wang >Priority: Major > > This is the umbrella ticket for > [FLIP-383|https://cwiki.apache.org/confluence/display/FLINK/FLIP-383%3A+Support+Job+Recovery+for+Batch+Jobs] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-33892) FLIP-383: Support Job Recovery for Batch Jobs
Lijie Wang created FLINK-33892: -- Summary: FLIP-383: Support Job Recovery for Batch Jobs Key: FLINK-33892 URL: https://issues.apache.org/jira/browse/FLINK-33892 Project: Flink Issue Type: New Feature Components: Runtime / Coordination Reporter: Lijie Wang This is the umbrella ticket for [FLIP-383|https://cwiki.apache.org/confluence/display/FLINK/FLIP-383%3A+Support+Job+Recovery+for+Batch+Jobs] -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-31664][table] Add ARRAY_INTERSECT function [flink]
flinkbot commented on PR #23959: URL: https://github.com/apache/flink/pull/23959#issuecomment-1863879876 ## CI report: * 59ffefac3cbac5d32690c38d52d1d2e967f7652b UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-31664][table] Add ARRAY_INTERSECT function [flink]
liuyongvs closed pull request #22629: [FLINK-31664][table] Add ARRAY_INTERSECT function URL: https://github.com/apache/flink/pull/22629 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33865][runtime] Support setting `exponential-delay.attempts-before-reset-backoff` when it's set in Job Configuration [flink]
zhuzhurk commented on PR #23942: URL: https://github.com/apache/flink/pull/23942#issuecomment-1863877534 > Hi @zhuzhurk , thanks a lot for the comment in advance. The current PR is still missing some tests. I originally planned to develop all the tests and then ping you and @JunRuiLee . > > > How about adding an internal method `RestartStrategies#exponentialDelayRestart(...)` which accepts the attempts param and modifying `RestartStrategies#fromConfiguration(...)` accordingly? The change can be much simpler, which I think is better for the deprecated code path. @JunRuiLee is currently working on a common solution to pass all job configuration to the JM and use it to create the restartStrategy/stateBackend/etc. So I prefer not to solve it case by case, which would lead to unnecessary conflicts. > > I also considered the solution you mentioned, which is really simple. The current PR's solution uses the `Configuration` to save the restart-strategy options inside ExecutionConfig instead of a `RestartStrategyConfiguration` object. > > For the convenience of comparison, I use SolutionA and SolutionB for them: > > * SolutionA: Using the `RestartStrategyConfiguration` object to save restart-strategy options > * SolutionB: Using the `Configuration` to save restart-strategy options > > I prefer SolutionB for a few reasons: > > * `RestartStrategyConfiguration` and all its subclasses are inner classes of `RestartStrategies`. And `RestartStrategies` has been deprecated, so we can consider `RestartStrategyConfiguration` and all its subclasses deprecated as well. > > * SolutionA still uses deprecated classes, so they cannot be removed in 2.0. > * SolutionB doesn't use them, so we can remove these classes in 2.0 directly. > * If we add other options in the future, SolutionA still needs to update these classes. > * It is more intuitive to use ConfigOptions to store all options in a Configuration. > > * We recommend that users use `ConfigOptions` or key names to configure the restart strategy. 
> * If Flink developers use them directly in Flink code, it's easy to maintain. > * The change of SolutionA is simpler than SolutionB, because the current master branch code follows SolutionA. > > * This PR (SolutionB) has some code to stay compatible with `RestartStrategyConfiguration`, which is why this change is big. > * Actually, most of the changes are unit tests; I need to test whether SolutionB is compatible with `RestartStrategyConfiguration`. > * I believe our code will be clearer if we remove `RestartStrategies` and all classes related to `RestartStrategyConfiguration` in 2.0. > > After this PR, I believe SolutionB makes it very easy to add new options. > > WDYT? Looking forward to your suggestion, thanks! @JunRuiLee is working on a common solution which uses configuration to save restart-strategy options, as well as other config options, but the implementation is different, e.g. it does not create a restart strategy configuration first and later convert it to config options. This means that the benefits of solution B can be covered by the common solution, while solution B will add extra complexity and possible conflicts. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
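The "SolutionB" shape being debated can be illustrated with a toy example: strategy settings are held as plain configuration key/value pairs rather than in a dedicated (deprecated) `RestartStrategyConfiguration` object. The map below stands in for Flink's `Configuration`; only the `attempts-before-reset-backoff` key comes from the PR title, the other keys and the method name are assumptions for illustration.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy illustration of storing restart-strategy settings as config keys. */
public class RestartStrategyOptionsSketch {
    static Map<String, String> exponentialDelayOptions(
            long initialBackoffMs, int attemptsBeforeReset) {
        Map<String, String> conf = new HashMap<>();
        // The strategy type and its parameters live under plain string keys,
        // so no strategy-specific class is needed on the deprecated code path.
        conf.put("restart-strategy.type", "exponential-delay");
        conf.put("restart-strategy.exponential-delay.initial-backoff",
                initialBackoffMs + " ms");
        conf.put("restart-strategy.exponential-delay.attempts-before-reset-backoff",
                String.valueOf(attemptsBeforeReset));
        return conf;
    }
}
```

The design trade-off in the thread is exactly this: adding a new option means adding one key here, instead of touching a typed configuration class slated for removal.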
Re: [PR] [FLINK-31664][table] Add ARRAY_INTERSECT function [flink]
liuyongvs commented on PR #23959: URL: https://github.com/apache/flink/pull/23959#issuecomment-1863875160 hi @xuyangzhong @lsyldliu do you have time to help review it? The implementation refers to the Spark implementation https://github.com/apache/spark/blob/50b652e241f7e31b99303359ec53e26a8989a4f0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala#L4030 The array_union I supported was merged here https://github.com/apache/flink/pull/22483 and the logic is the same. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] [FLINK-31664][table] Add ARRAY_INTERSECT function [flink]
liuyongvs opened a new pull request, #23959: URL: https://github.com/apache/flink/pull/23959 - What is the purpose of the change This is an implementation of ARRAY_INTERSECT - Brief change log ARRAY_INTERSECT for Table API and SQL ``` Returns an array of the elements in the intersection of array1 and array2, without duplicates. Syntax: array_intersect(array1, array2) Arguments: array: An ARRAY to be handled. Returns: An ARRAY. If any of the arrays is null, the function will return null. Examples: > SELECT array_intersect(array(1, 2, 3), array(1, 3, 5)); [1,3] ``` See also Spark https://spark.apache.org/docs/latest/api/sql/index.html#array_intersect and Presto https://prestodb.io/docs/current/functions/array.html - Verifying this change This change added tests in CollectionFunctionsITCase. - Does this pull request potentially affect one of the following parts: Dependencies (does it add or upgrade a dependency): (no) The public API, i.e., is any changed class annotated with @Public(Evolving): (yes) The serializers: (no) The runtime per-record code paths (performance sensitive): (no) Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no) The S3 file system connector: (no) - Documentation Does this pull request introduce a new feature? (yes) If yes, how is the feature documented? (docs) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
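The documented semantics (de-duplicated intersection, null if either argument is null) can be sketched independently of the planner code. `arrayIntersect` below is a hypothetical helper written for illustration, not the PR's actual implementation:

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Sketch of ARRAY_INTERSECT semantics: elements of array1 that also occur
 *  in array2, de-duplicated, preserving array1 order; null in, null out. */
public class ArrayIntersectSketch {
    static <T> List<T> arrayIntersect(List<T> a, List<T> b) {
        if (a == null || b == null) {
            return null; // null propagation, as in the description above
        }
        Set<T> inB = new HashSet<>(b);
        Set<T> seen = new LinkedHashSet<>(); // de-duplicates, keeps first-seen order
        for (T e : a) {
            if (inB.contains(e)) {
                seen.add(e);
            }
        }
        return new ArrayList<>(seen);
    }
}
```

With the inputs from the example, `arrayIntersect(List.of(1, 2, 3), List.of(1, 3, 5))` yields `[1, 3]`.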
Re: [PR] [FLINK-32721] [planner] Support CharType for the MaxAggFunction [flink]
liuyongvs closed pull request #23943: [FLINK-32721] [planner] Support CharType for the MaxAggFunction URL: https://github.com/apache/flink/pull/23943 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-32721] [planner] Support CharType for the MaxAggFunction [flink]
liuyongvs commented on PR #23943: URL: https://github.com/apache/flink/pull/23943#issuecomment-1863863632 I'll close it now. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-32721] [planner] Support CharType for the MaxAggFunction [flink]
liuyongvs commented on PR #23943: URL: https://github.com/apache/flink/pull/23943#issuecomment-1863863494 > @liuyongvs Thanks for your contribution, this will be fixed by #18375, this pr will be closed later. @lsyldliu thanks for the clarification, I also added comments on the PR you referred to. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-25476][table-planner] support CHAR type in function MAX and MIN [flink]
liuyongvs commented on code in PR #18375: URL: https://github.com/apache/flink/pull/18375#discussion_r1432254893 ## flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala: ## @@ -1331,6 +1331,81 @@ class AggregateITCase(aggMode: AggMode, miniBatch: MiniBatchMode, backend: State assertThat(sink.getRetractResults.sorted).isEqualTo(expected.sorted) } + @TestTemplate + def testMinMaxWithChar(): Unit = { +val data = + List( +rowOf(1, "a"), +rowOf(1, "b"), +rowOf(2, "d"), +rowOf(2, "c") + ) +val dataId = TestValuesTableFactory.registerData(data) +tEnv.executeSql(s""" + |CREATE TABLE src( + | `id` INT, + | `char` CHAR(1) + |) WITH ( + | 'connector' = 'values', + | 'data-id' = '$dataId' + |) + |""".stripMargin) + +val sql = + """ +|select `id`, count(*), min(`char`), max(`char`) from src group by `id` + """.stripMargin + +val sink = new TestingRetractSink() +tEnv.sqlQuery(sql).toRetractStream[Row].addSink(sink) +env.execute() + +val expected = List("1,2,a,b", "2,2,c,d") +assertThat(sink.getRetractResults.sorted).isEqualTo(expected.sorted) + } + + @TestTemplate + def testRetractMinMaxWithChar(): Unit = { +val data = + List( +changelogRow("+I", Int.box(1), "a"), +changelogRow("+I", Int.box(1), "b"), +changelogRow("+I", Int.box(1), "c"), +changelogRow("-D", Int.box(1), "c"), +changelogRow("-D", Int.box(1), "a"), +changelogRow("+I", Int.box(2), "a"), +changelogRow("+I", Int.box(2), "b"), +changelogRow("+I", Int.box(2), "c"), +changelogRow("-U", Int.box(2), "b"), +changelogRow("+U", Int.box(2), "d"), +changelogRow("-U", Int.box(2), "a"), +changelogRow("+U", Int.box(2), "b") + ) +val dataId = TestValuesTableFactory.registerData(data) +tEnv.executeSql(s""" + |CREATE TABLE src( + | `id` INT, + | `char` CHAR(1) + |) WITH ( + | 'connector' = 'values', Review Comment: could we also change test for CHAR(2), because the sql standard char is fixed length, can supports max length for n when char(n) -- This is an automated 
message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-33891) Remove the obsolete SingleJobGraphStore
[ https://issues.apache.org/jira/browse/FLINK-33891?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17798819#comment-17798819 ] Zhanghao Chen commented on FLINK-33891: --- [~huweihua] Could you help take a look? > Remove the obsolete SingleJobGraphStore > --- > > Key: FLINK-33891 > URL: https://issues.apache.org/jira/browse/FLINK-33891 > Project: Flink > Issue Type: Technical Debt > Components: Runtime / Coordination >Reporter: Zhanghao Chen >Priority: Minor > Labels: pull-request-available > > SingleJobGraphStore was introduced a long time ago in FLIP-6. It is only used > in a test case in DefaultDispatcherRunnerITCase# > leaderChange_withBlockingJobManagerTermination_doesNotAffectNewLeader. We can > replace it with TestingJobGraphStore there and then safely remove the class. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-33871][HIVE]Reduce getTable call for hive client and optimize graph generation time [flink]
hehuiyuan commented on code in PR #23945: URL: https://github.com/apache/flink/pull/23945#discussion_r1432229793 ## flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java: ## @@ -310,6 +313,10 @@ public void open() throws CatalogException { "Configured default database %s doesn't exist in catalog %s.", getDefaultDatabase(), getName())); } + +if (cacheTable == null) { Review Comment: Hi, there is no special design here; we can use the cache directly without going through the open method. It is just like `client`: initialized in the open method and cleaned up in the close method. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33743][runtime] Support consuming multiple subpartitions on a single channel [flink]
TanYuxin-tyx commented on code in PR #23927: URL: https://github.com/apache/flink/pull/23927#discussion_r1431327842 ## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionIndexSet.java: ## @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition; + +import java.io.Serializable; + +/** A collection of subpartition indexes. */ +public interface ResultSubpartitionIndexSet extends Iterable<Integer>, Serializable { Review Comment: Do we need the interface to extend Iterable<Integer>? Would it be more flexible to define a method in the interface instead? Note this is not a strong comment because this can also work, but I think it's worth discussing it carefully for a better subsequent implementation. ## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionIndexRange.java: ## @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition; + +import org.apache.flink.runtime.executiongraph.IndexRange; + +import java.util.Iterator; + +/** + * A {@link ResultSubpartitionIndexSet} represented as a range of indexes. The range is inclusive. + */ +public class ResultSubpartitionIndexRange extends IndexRange implements ResultSubpartitionIndexSet { Review Comment: Is this extended because of compatibility or other considerations? ## flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/UnionResultSubpartitionViewTest.java: ## @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.io.network.partition; + +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.util.TestBufferFactory; + +import org.jetbrains.annotations.Nullable; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link UnionResultSubpartitionView}. */ +public class UnionResultSubpartitionViewTest { Review Comment: Remove the `public` here. ## flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/UnionResultSubpartitionViewTest.java: ## @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding
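For context on the `Iterable` question in the review above, an inclusive index range that implements `Iterable<Integer>` can be sketched as follows. This is a toy stand-in for the `ResultSubpartitionIndexRange` shape under discussion; the class and field names are illustrative only:

```java
import java.util.Iterator;
import java.util.NoSuchElementException;

/** Toy inclusive index range that is iterable over its indexes. */
public class IndexRangeSketch implements Iterable<Integer> {
    private final int start; // inclusive
    private final int end;   // inclusive

    public IndexRangeSketch(int start, int end) {
        if (start > end) {
            throw new IllegalArgumentException("start > end");
        }
        this.start = start;
        this.end = end;
    }

    public boolean contains(int index) {
        return index >= start && index <= end;
    }

    @Override
    public Iterator<Integer> iterator() {
        return new Iterator<>() {
            private int next = start;

            @Override
            public boolean hasNext() {
                return next <= end;
            }

            @Override
            public Integer next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                return next++;
            }
        };
    }
}
```

The reviewer's alternative would expose a dedicated accessor (e.g. a `contains`-style method) instead of extending `Iterable`, which keeps future non-contiguous implementations free to avoid materializing an iteration order.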
[jira] [Comment Edited] (FLINK-33534) PipelineOptions.PARALLELISM_OVERRIDES is not picked up from jar submission request
[ https://issues.apache.org/jira/browse/FLINK-33534?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17798400#comment-17798400 ] Weijie Guo edited comment on FLINK-33534 at 12/20/23 3:55 AM: -- master(1.19) via 4cc24c1dd17b0abe3c4372652c7ab88fedc7e478. release-1.18 via 6aa64ebb88045abef8b900b1ff3e15b171da5709. was (Author: weijie guo): master(1.19) via 4cc24c1dd17b0abe3c4372652c7ab88fedc7e478. > PipelineOptions.PARALLELISM_OVERRIDES is not picked up from jar submission > request > -- > > Key: FLINK-33534 > URL: https://issues.apache.org/jira/browse/FLINK-33534 > Project: Flink > Issue Type: Bug > Components: Runtime / Configuration, Runtime / REST >Affects Versions: 1.18.0, 1.17.1 >Reporter: Gyula Fora >Assignee: Yunfeng Zhou >Priority: Major > Labels: pull-request-available > Fix For: 1.19.0 > > > PARALLELISM_OVERRIDES are currently only applied when they are part of the > JobManager / Cluster configuration. > When this config is provided as part of the JarRunRequestBody it is > completely ignored and does not take effect. > The main reason is that the dispatcher reads this value from it's own > configuration object and does not include the extra configs passed through > the rest request. > This is a blocker for supporting the autoscaler properly for FlinkSessionJobs > in the autoscaler -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [BP-1.18][FLINK-33534][runtime] Support configuring PARALLELISM_OVERRIDES during job submission [flink]
reswqa merged PR #23953: URL: https://github.com/apache/flink/pull/23953 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33863][checkpoint] Fix restoring compressed operator state [flink]
fredia commented on code in PR #23938: URL: https://github.com/apache/flink/pull/23938#discussion_r1432205407 ## flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateRestoreOperationTest.java: ## @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.state.BroadcastState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.core.fs.CloseableRegistry; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory; + +import org.jetbrains.annotations.Nullable; Review Comment: ```suggestion import javax.annotation.Nullable; ``` ## flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateRestoreOperationTest.java: ## @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.state.BroadcastState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.core.fs.CloseableRegistry; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory; + +import org.jetbrains.annotations.Nullable; +import org.junit.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; + Review Comment: Need some comments here, otherwise `checkstyle` would fail. 
## flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateRestoreOperation.java: ## @@ -168,9 +170,25 @@ public Void restore() throws Exception { } } +// sort state by offsets +List> entries = + stateHandle.getStateNameToPartitionOffsets().entrySet().stream() +.sorted( +Comparator.comparingLong( +entry -> { + OperatorStateHandle.StateMetaInfo +stateMetaInfo = entry.getValue(); +long[] offsets = stateMetaInfo.getOffsets(); +if (offsets == null || offsets.length == 0) { +return Long.MIN_VALUE; +} else { +return offsets[0]; +} Review Comment: ```suggestion Preconditions.checkNotNull(offsets); Preconditions.checkState(offsets.length > 0); return offsets[0]; ``` I think we should throw exceptions as early as possible. ## flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateRestoreOperation.java: ##
[jira] [Commented] (FLINK-33863) Compressed Operator state restore failed
[ https://issues.apache.org/jira/browse/FLINK-33863?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17798808#comment-17798808 ] Ruibin Xing commented on FLINK-33863: - [~Yanfei Lei] Hi, I will try to illustrate this problem with an example:
{code:java}
| Snappy Header 1 | State 1 | Snappy Header 2 | State 2 | Snappy Header 3 | State 3 |
^                 ^         ^                 ^         ^
a                 b         c                 d         e        (offsets)
{code}
This is the layout of a snapshot of compressed operator states. If we try to restore it in the sequence State 1, 3, 2 instead of State 1, 2, 3: # We will start at offset a. # Snappy will verify header 1 and everything will be OK. # We will seek to offset b (from the OperatorStateHandle) and restore the states until we reach offset c. # Now, to restore State 3, we will verify Snappy header 2 instead of 3. # Then we will seek to offset d and eventually reach offset e. # Then we will try to restore State 2, and when verifying its header, an EOF error is thrown. So there are two problems if we don't sort states by offsets before restoring them: # In step 4, we try to restore State 3, but the header of State 2 is verified instead. # There is currently no simple way to seek to the correct header position. > Compressed Operator state restore failed > > > Key: FLINK-33863 > URL: https://issues.apache.org/jira/browse/FLINK-33863 > Project: Flink > Issue Type: Bug > Components: Runtime / State Backends >Affects Versions: 1.18.0 >Reporter: Ruibin Xing >Assignee: Ruibin Xing >Priority: Major > Labels: pull-request-available > > We encountered an issue when using Flink 1.18.0. Our job enabled Snapshot > Compression and used multiple operator states and broadcast states in an > operator. When recovering Operator State from a Savepoint, the following > error occurred: "org.xerial.snappy.SnappyFramedInputStream: encountered EOF > while reading stream header." 
> After researching, I believe the error is due to Flink 1.18.0's support for > Snapshot Compression on Operator State (see > https://issues.apache.org/jira/browse/FLINK-30113 ). When writing a > Savepoint, SnappyFramedInputStream adds a header to the beginning of the > data. When recovering Operator State from a Savepoint, > SnappyFramedInputStream verifies the header from the beginning of the data. > Currently, when recovering Operator State with Snapshot Compression enabled, > the logic is as follows: > For each OperatorStateHandle: > 1. Verify if the current Savepoint stream's offset is the Snappy header. > 2. Seek to the state's start offset. > 3. Read the state's data and finally seek to the state's end offset. > (See: > [https://github.com/apache/flink/blob/ef2b626d67147797e992ec3b338bafdb4e5ab1c7/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateRestoreOperation.java#L172] > ) > Furthermore, when there are multiple Operator States, they are not sorted > according to the Operator State's offset. The broadcast states will always be > written at the end of the savepoint. However, when reading from the savepoint, > there is no guarantee that the broadcast states will be read last. > Therefore, if the Operator States are out of order and the final offset is > recovered first, the Savepoint stream will seek to the end, resulting in > an EOF error. > I propose a solution: sort the OperatorStateHandles by offset and then recover > the Operator State in order. After testing, this approach resolves the issue. > I will submit a PR. This is my first time contributing code, so any help is > really appreciated. -- This message was sent by Atlassian Jira (v8.20.10#820010)
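The proposed fix, restoring states in ascending order of their first byte offset so the compressed stream is only ever read forward, can be sketched as follows. The map here is a simplification of Flink's state-name-to-StateMetaInfo mapping, and the names are illustrative:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;

/** Sketch of sorting operator states by their first byte offset. */
public class OffsetSortSketch {
    /** Returns the state names in the order they should be restored. */
    static List<String> restoreOrder(Map<String, long[]> stateNameToOffsets) {
        List<Map.Entry<String, long[]>> entries =
                new ArrayList<>(stateNameToOffsets.entrySet());
        // Sort by the first offset so the savepoint stream only moves forward
        // and each Snappy header is encountered exactly where it is verified.
        entries.sort(Comparator.comparingLong(
                (Map.Entry<String, long[]> e) ->
                        e.getValue().length == 0 ? Long.MIN_VALUE : e.getValue()[0]));
        List<String> order = new ArrayList<>();
        for (Map.Entry<String, long[]> e : entries) {
            order.add(e.getKey());
        }
        return order;
    }
}
```

In the example layout above, this ordering guarantees that a broadcast state written at the end of the savepoint is also restored last, so no earlier header position is ever skipped.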
[jira] [Comment Edited] (FLINK-33863) Compressed Operator state restore failed
[ https://issues.apache.org/jira/browse/FLINK-33863?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17798802#comment-17798802 ] Yanfei Lei edited comment on FLINK-33863 at 12/20/23 3:18 AM: -- Is it because once the file stream has reached EOF, we can‘t use this stream to build a SnappyFramedInputStream? was (Author: yanfei lei): Is it because once the file stream has reached EOF, we can‘t use seek() to go back to an earlier position? > Compressed Operator state restore failed > > > Key: FLINK-33863 > URL: https://issues.apache.org/jira/browse/FLINK-33863 > Project: Flink > Issue Type: Bug > Components: Runtime / State Backends >Affects Versions: 1.18.0 >Reporter: Ruibin Xing >Assignee: Ruibin Xing >Priority: Major > Labels: pull-request-available > > We encountered an issue when using Flink 1.18.0. Our job enabled Snapshot > Compression and used multiple operator states and broadcast states in an > operator. When recovering Operator State from a Savepoint, the following > error occurred: "org.xerial.snappy.SnappyFramedInputStream: encountered EOF > while reading stream header." > After researching, I believe the error is due to Flink 1.18.0's support for > Snapshot Compression on Operator State (see > https://issues.apache.org/jira/browse/FLINK-30113 ). When writing a > Savepoint, SnappyFramedInputStream adds a header to the beginning of the > data. When recovering Operator State from a Savepoint, > SnappyFramedInputStream verifies the header from the beginning of the data. > Currently, when recovering Operator State with Snapshot Compression enabled, > the logic is as follows: > For each OperatorStateHandle: > 1. Verify if the current Savepoint stream's offset is the Snappy header. > 2. Seek to the state's start offset. > 3. Read the state's data and finally seek to the state's end offset. 
> (See: > [https://github.com/apache/flink/blob/ef2b626d67147797e992ec3b338bafdb4e5ab1c7/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateRestoreOperation.java#L172] > ) > Furthermore, when there are multiple Operator States, they are not sorted > according to the Operator State's offset. The broadcast states will always be > written to the end of the savepoint. However when reading from savepoint, > there are no guarantee that broadcast states will be read at last. > Therefore, if the Operator States are out of order and the final offset is > recovered first, the Savepoint stream will be seeked to the end, resulting in > an EOF error. > I propose a solution: sort the OperatorStateHandle by offset and then recover > the Operator State in order. After testing, this approach resolves the issue. > I will submit a PR. This is my first time contributing code, so any help is > really appreciated. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]
KarmaGYZ commented on code in PR #23635: URL: https://github.com/apache/flink/pull/23635#discussion_r1432179061 ## flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java: ## @@ -334,7 +334,9 @@ public void onUnknownDeploymentsOf( .createSlotPoolService( jid, createDeclarativeSlotPoolFactory( - jobMasterConfiguration.getConfiguration())); + jobMasterConfiguration.getConfiguration()), +null, Review Comment: Why not set it according to the configuration? ## flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java: ## @@ -118,26 +129,47 @@ public DefaultDeclarativeSlotPool( this.totalResourceRequirements = ResourceCounter.empty(); this.fulfilledResourceRequirements = ResourceCounter.empty(); this.slotToRequirementProfileMappings = new HashMap<>(); +this.componentMainThreadExecutor = Preconditions.checkNotNull(componentMainThreadExecutor); +this.slotRequestMaxInterval = slotRequestMaxInterval; } @Override public void increaseResourceRequirementsBy(ResourceCounter increment) { -if (increment.isEmpty()) { +updateResourceRequirementsBy( +increment, +() -> totalResourceRequirements = totalResourceRequirements.add(increment)); +} + +private void updateResourceRequirementsBy( +@Nonnull ResourceCounter deltaResourceCount, @Nonnull Runnable runnable) { +if (deltaResourceCount.isEmpty()) { return; } -totalResourceRequirements = totalResourceRequirements.add(increment); -declareResourceRequirements(); -} +runnable.run(); -@Override -public void decreaseResourceRequirementsBy(ResourceCounter decrement) { -if (decrement.isEmpty()) { +if (slotRequestMaxInterval == null) { +declareResourceRequirements(); return; } -totalResourceRequirements = totalResourceRequirements.subtract(decrement); -declareResourceRequirements(); +if (slotRequestMaxIntervalTimeoutFuture != null +&& !slotRequestMaxIntervalTimeoutFuture.isDone() +&& !slotRequestMaxIntervalTimeoutFuture.isCancelled()) { 
+slotRequestMaxIntervalTimeoutFuture.cancel(true); +} +slotRequestMaxIntervalTimeoutFuture = +componentMainThreadExecutor.schedule( +this::declareResourceRequirements, +slotRequestMaxInterval.toMilliseconds(), +TimeUnit.MILLISECONDS); +} + +@Override +public void decreaseResourceRequirementsBy(ResourceCounter decrement) { +updateResourceRequirementsBy( +decrement, +() -> totalResourceRequirements = totalResourceRequirements.subtract(decrement)); Review Comment: ditto ## flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java: ## @@ -336,7 +336,8 @@ public void onUnknownDeploymentsOf( createDeclarativeSlotPoolFactory( jobMasterConfiguration.getConfiguration()), null, -getMainThreadExecutor()); +getMainThreadExecutor(), +false); Review Comment: Why not set slotBatchAllocatable according to the configuration? ## flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java: ## @@ -118,26 +129,47 @@ public DefaultDeclarativeSlotPool( this.totalResourceRequirements = ResourceCounter.empty(); this.fulfilledResourceRequirements = ResourceCounter.empty(); this.slotToRequirementProfileMappings = new HashMap<>(); +this.componentMainThreadExecutor = Preconditions.checkNotNull(componentMainThreadExecutor); +this.slotRequestMaxInterval = slotRequestMaxInterval; } @Override public void increaseResourceRequirementsBy(ResourceCounter increment) { -if (increment.isEmpty()) { +updateResourceRequirementsBy( +increment, +() -> totalResourceRequirements = totalResourceRequirements.add(increment)); Review Comment: Why not check whether the ResourceCounter is empty and modify the totalResourceRequirements right here? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service,
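The scheduling logic in the diff above is a debounce: each requirement update cancels any pending declaration and reschedules it, so a burst of updates collapses into a single `declareResourceRequirements()` call once the max interval elapses. A minimal, self-contained sketch of that pattern (illustrative class, not Flink code; a plain `ScheduledExecutorService` stands in for the component main thread executor):

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Illustrative sketch of the batching pattern used in the diff above.
public class DebouncedDeclarer {
    private final ScheduledExecutorService executor;
    private final long maxIntervalMillis;
    private final Runnable declareResourceRequirements;
    private ScheduledFuture<?> pending;

    public DebouncedDeclarer(
            ScheduledExecutorService executor,
            long maxIntervalMillis,
            Runnable declareResourceRequirements) {
        this.executor = executor;
        this.maxIntervalMillis = maxIntervalMillis;
        this.declareResourceRequirements = declareResourceRequirements;
    }

    // Mirrors the diff: cancel a still-pending declaration, then schedule a
    // fresh one, pushing the actual call out until updates stop arriving.
    public synchronized void onRequirementsChanged() {
        if (pending != null && !pending.isDone()) {
            pending.cancel(true);
        }
        pending = executor.schedule(
                declareResourceRequirements, maxIntervalMillis, TimeUnit.MILLISECONDS);
    }
}
```

Three rapid calls to `onRequirementsChanged()` thus produce one downstream declaration rather than three, which is the point of `slotRequestMaxInterval` in the PR.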
[jira] [Assigned] (FLINK-33863) Compressed Operator state restore failed
[ https://issues.apache.org/jira/browse/FLINK-33863?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yanfei Lei reassigned FLINK-33863: -- Assignee: Ruibin Xing > Compressed Operator state restore failed > > > Key: FLINK-33863 > URL: https://issues.apache.org/jira/browse/FLINK-33863 > Project: Flink > Issue Type: Bug > Components: Runtime / State Backends >Affects Versions: 1.18.0 >Reporter: Ruibin Xing >Assignee: Ruibin Xing >Priority: Major > Labels: pull-request-available > > We encountered an issue when using Flink 1.18.0. Our job enabled Snapshot > Compression and used multiple operator states and broadcast states in an > operator. When recovering Operator State from a Savepoint, the following > error occurred: "org.xerial.snappy.SnappyFramedInputStream: encountered EOF > while reading stream header." > After researching, I believe the error is due to Flink 1.18.0's support for > Snapshot Compression on Operator State (see > https://issues.apache.org/jira/browse/FLINK-30113 ). When writing a > Savepoint, SnappyFramedInputStream adds a header to the beginning of the > data. When recovering Operator State from a Savepoint, > SnappyFramedInputStream verifies the header from the beginning of the data. > Currently, when recovering Operator State with Snapshot Compression enabled, > the logic is as follows: > For each OperatorStateHandle: > 1. Verify if the current Savepoint stream's offset is the Snappy header. > 2. Seek to the state's start offset. > 3. Read the state's data and finally seek to the state's end offset. > (See: > [https://github.com/apache/flink/blob/ef2b626d67147797e992ec3b338bafdb4e5ab1c7/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateRestoreOperation.java#L172] > ) > Furthermore, when there are multiple Operator States, they are not sorted > according to the Operator State's offset. The broadcast states will always be > written to the end of the savepoint. 
However when reading from savepoint, > there are no guarantee that broadcast states will be read at last. > Therefore, if the Operator States are out of order and the final offset is > recovered first, the Savepoint stream will be seeked to the end, resulting in > an EOF error. > I propose a solution: sort the OperatorStateHandle by offset and then recover > the Operator State in order. After testing, this approach resolves the issue. > I will submit a PR. This is my first time contributing code, so any help is > really appreciated. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-33863) Compressed Operator state restore failed
[ https://issues.apache.org/jira/browse/FLINK-33863?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17798802#comment-17798802 ] Yanfei Lei commented on FLINK-33863: Is it because once the file stream has reached EOF, we can‘t use seek() to go back to an earlier position? > Compressed Operator state restore failed > > > Key: FLINK-33863 > URL: https://issues.apache.org/jira/browse/FLINK-33863 > Project: Flink > Issue Type: Bug > Components: Runtime / State Backends >Affects Versions: 1.18.0 >Reporter: Ruibin Xing >Priority: Major > Labels: pull-request-available > > We encountered an issue when using Flink 1.18.0. Our job enabled Snapshot > Compression and used multiple operator states and broadcast states in an > operator. When recovering Operator State from a Savepoint, the following > error occurred: "org.xerial.snappy.SnappyFramedInputStream: encountered EOF > while reading stream header." > After researching, I believe the error is due to Flink 1.18.0's support for > Snapshot Compression on Operator State (see > https://issues.apache.org/jira/browse/FLINK-30113 ). When writing a > Savepoint, SnappyFramedInputStream adds a header to the beginning of the > data. When recovering Operator State from a Savepoint, > SnappyFramedInputStream verifies the header from the beginning of the data. > Currently, when recovering Operator State with Snapshot Compression enabled, > the logic is as follows: > For each OperatorStateHandle: > 1. Verify if the current Savepoint stream's offset is the Snappy header. > 2. Seek to the state's start offset. > 3. Read the state's data and finally seek to the state's end offset. > (See: > [https://github.com/apache/flink/blob/ef2b626d67147797e992ec3b338bafdb4e5ab1c7/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateRestoreOperation.java#L172] > ) > Furthermore, when there are multiple Operator States, they are not sorted > according to the Operator State's offset. 
The broadcast states will always be > written to the end of the savepoint. However when reading from savepoint, > there are no guarantee that broadcast states will be read at last. > Therefore, if the Operator States are out of order and the final offset is > recovered first, the Savepoint stream will be seeked to the end, resulting in > an EOF error. > I propose a solution: sort the OperatorStateHandle by offset and then recover > the Operator State in order. After testing, this approach resolves the issue. > I will submit a PR. This is my first time contributing code, so any help is > really appreciated. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-33865][runtime] Support setting `exponential-delay.attempts-before-reset-backoff` when it's set in Job Configuration [flink]
zhuzhurk commented on PR #23942: URL: https://github.com/apache/flink/pull/23942#issuecomment-1863767529 How about to add an internal method `RestartStrategies#exponentialDelayRestart(...)` which accepts the attempts param and modify `RestartStrategies#fromConfiguration(...)` accordingly? The change can be much simpler, which I think is better for the deprecated code path. @JunRuiLee is currently working on a common solution to pass all job configuration to JM and use it to create restartStrategy/stateBackend/etc. So I prefer to not solve it case by case which will lead to unnecessary conflicts. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33891] Remove the obsolete SingleJobGraphStore [flink]
flinkbot commented on PR #23958: URL: https://github.com/apache/flink/pull/23958#issuecomment-1863766624 ## CI report: * 4e85a45962578fc6b79ff2f1e37b41cba852e602 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-33882) UT/IT for checkpointing statistics
[ https://issues.apache.org/jira/browse/FLINK-33882?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17798801#comment-17798801 ] Tongtong Zhu commented on FLINK-33882: -- [~jingge] Are you asking me to write the tests in this test directory of the Flink project, as shown in the screenshot below? !image-2023-12-20-10-42-00-237.png! > UT/IT for checkpointing statistics > -- > > Key: FLINK-33882 > URL: https://issues.apache.org/jira/browse/FLINK-33882 > Project: Flink > Issue Type: Improvement > Components: Runtime / Checkpointing >Reporter: Jing Ge >Priority: Minor > Attachments: image-2023-12-20-10-42-00-237.png > > > https://issues.apache.org/jira/browse/FLINK-33588 > has been manually tested by [~zhutong66] as follows: > 1. I will package the modified code, and the code modification will be done > in the jar package of flink-dist-xxx.jar. Replace the jar package with the > production Flink client. > 2. Submit the Flink SQL task in the production environment to Yarn in > application mode and check the Yarn logs > 3. Check for any further errors in the Yarn log. > 4. On the web interface of Flink web, check if the data displayed on the > checkpoint information statistics page is normal. > It would be great to write UT or IT for this change -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-33882) UT/IT for checkpointing statistics
[ https://issues.apache.org/jira/browse/FLINK-33882?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tongtong Zhu updated FLINK-33882: - Attachment: image-2023-12-20-10-42-00-237.png > UT/IT for checkpointing statistics > -- > > Key: FLINK-33882 > URL: https://issues.apache.org/jira/browse/FLINK-33882 > Project: Flink > Issue Type: Improvement > Components: Runtime / Checkpointing >Reporter: Jing Ge >Priority: Minor > Attachments: image-2023-12-20-10-42-00-237.png > > > https://issues.apache.org/jira/browse/FLINK-33588 > has been manually tested by [~zhutong66] as follows: > 1. I will package the modified code, and the code modification will be done > in the jar package of flink-dist-xxx.jar. Replace the jar package with the > production Flink client. > 2. Submit the Flink SQL task in the production environment to Yarn in > application mode and check the Yarn logs > 3. Check for any further errors in the Yarn log. > 4. On the web interface of Flink web, check if the data displayed on the > checkpoint information statistics page is normal. > It would be great to write UT or IT for this change -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-33891) Remove the obsolete SingleJobGraphStore
[ https://issues.apache.org/jira/browse/FLINK-33891?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-33891: --- Labels: pull-request-available (was: ) > Remove the obsolete SingleJobGraphStore > --- > > Key: FLINK-33891 > URL: https://issues.apache.org/jira/browse/FLINK-33891 > Project: Flink > Issue Type: Technical Debt > Components: Runtime / Coordination >Reporter: Zhanghao Chen >Priority: Minor > Labels: pull-request-available > > SingleJobGraphStore was introduced a long time ago in FLIP-6. It is only used > in a test case in DefaultDispatcherRunnerITCase# > leaderChange_withBlockingJobManagerTermination_doesNotAffectNewLeader. We can > replace it with TestingJobGraphStore there and then safely remove the class. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] [FLINK-33891] Remove the obsolete SingleJobGraphStore [flink]
X-czh opened a new pull request, #23958: URL: https://github.com/apache/flink/pull/23958 ## What is the purpose of the change SingleJobGraphStore was introduced a long time ago in FLIP-6. It is only used in a test case in DefaultDispatcherRunnerITCase# leaderChange_withBlockingJobManagerTermination_doesNotAffectNewLeader. We can replace it with TestingJobGraphStore there and then safely remove the class. ## Brief change log Remove the obsolete SingleJobGraphStore. ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (FLINK-33891) Remove the obsolete SingleJobGraphStore
Zhanghao Chen created FLINK-33891: - Summary: Remove the obsolete SingleJobGraphStore Key: FLINK-33891 URL: https://issues.apache.org/jira/browse/FLINK-33891 Project: Flink Issue Type: Technical Debt Components: Runtime / Coordination Reporter: Zhanghao Chen SingleJobGraphStore was introduced a long time ago in FLIP-6. It is only used in a test case in DefaultDispatcherRunnerITCase# leaderChange_withBlockingJobManagerTermination_doesNotAffectNewLeader. We can replace it with TestingJobGraphStore there and then safely remove the class. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-33881) [TtlListState]Avoid copy and update value in TtlListState#getUnexpiredOrNull
[ https://issues.apache.org/jira/browse/FLINK-33881?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17798799#comment-17798799 ] Jinzhong Li commented on FLINK-33881: - [~Zakelly] Thanks for your reply. I think this ticket is targeting a different optimization point than FLINK-30088. The approach mentioned in this ticket avoids copying the TtlListState's elements when no data has expired, whereas FLINK-30088 still needs to copy the list elements, consuming a lot of unnecessary CPU. > [TtlListState]Avoid copy and update value in TtlListState#getUnexpiredOrNull > > > Key: FLINK-33881 > URL: https://issues.apache.org/jira/browse/FLINK-33881 > Project: Flink > Issue Type: Improvement > Components: Runtime / State Backends >Reporter: Jinzhong Li >Priority: Minor > Attachments: image-2023-12-19-21-25-21-446.png, > image-2023-12-19-21-26-43-518.png > > > In some scenarios, 'TtlListState#getUnexpiredOrNull -> > elementSerializer.copy(ttlValue)' consumes a lot of cpu resources. > !image-2023-12-19-21-25-21-446.png|width=529,height=119! > I found that for TtlListState#getUnexpiredOrNull, if none of the elements > have expired, it still needs to copy all the elements and update the whole > list/map in TtlIncrementalCleanup#runCleanup(); > !image-2023-12-19-21-26-43-518.png|width=505,height=266! > I think we could optimize TtlListState#getUnexpiredOrNull by: > 1) find the index of the first expired element in the list; > 2) if none is found, return the original list; > 3) if one is found, construct the unexpired list (put the preceding elements > into it), then go through the subsequent elements, adding the non-expired ones. > {code:java} > public List<TtlValue<T>> getUnexpiredOrNull(@Nonnull List<TtlValue<T>> > ttlValues) { > //...
> int firstExpireIndex = -1; > for (int i = 0; i < ttlValues.size(); i++) { > if (TtlUtils.expired(ttlValues.get(i), ttl, currentTimestamp)) { > firstExpireIndex = i; > break; > } > } > if (firstExpireIndex == -1) { > return ttlValues; // return the original ttlValues > } > List<TtlValue<T>> unexpired = new ArrayList<>(ttlValues.size()); > for (int i = 0; i < ttlValues.size(); i++) { > if (i < firstExpireIndex) { > unexpired.add(ttlValues.get(i)); > } > if (i > firstExpireIndex) { > if (!TtlUtils.expired(ttlValues.get(i), ttl, currentTimestamp)) { > unexpired.add(ttlValues.get(i)); > } > } > } > // ... > } {code} > *In this way, the extra iteration overhead is actually very small, but > the benefit when there are no expired elements is significant.* -- This message was sent by Atlassian Jira (v8.20.10#820010)
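The proposal quoted above can be turned into a small self-contained sketch. `TtlValue` and the expiry check here are simplified stand-ins for Flink's internals, but the control flow matches the pseudocode: return the original list untouched when nothing has expired, and build a filtered copy only when an expired element is found.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class TtlSketch {
    // Simplified stand-in for Flink's TtlValue: a payload plus its last-access timestamp.
    static final class TtlValue<T> {
        final T value;
        final long lastAccessTs;
        TtlValue(T value, long lastAccessTs) {
            this.value = value;
            this.lastAccessTs = lastAccessTs;
        }
    }

    static boolean expired(TtlValue<?> v, long ttl, long now) {
        return v.lastAccessTs + ttl <= now;
    }

    // Returns the original list unchanged when nothing has expired (no copy at
    // all); otherwise builds a new list containing only the unexpired elements.
    static <T> List<TtlValue<T>> getUnexpiredOrOriginal(
            List<TtlValue<T>> ttlValues, long ttl, long now) {
        int firstExpired = -1;
        for (int i = 0; i < ttlValues.size(); i++) {
            if (expired(ttlValues.get(i), ttl, now)) {
                firstExpired = i;
                break;
            }
        }
        if (firstExpired == -1) {
            return ttlValues; // fast path: same list reference, zero copies
        }
        List<TtlValue<T>> unexpired = new ArrayList<>(ttlValues.size());
        unexpired.addAll(ttlValues.subList(0, firstExpired));
        for (int i = firstExpired + 1; i < ttlValues.size(); i++) {
            if (!expired(ttlValues.get(i), ttl, now)) {
                unexpired.add(ttlValues.get(i));
            }
        }
        return unexpired;
    }

    public static void main(String[] args) {
        List<TtlValue<String>> values = Arrays.asList(
                new TtlValue<>("a", 100), new TtlValue<>("b", 10), new TtlValue<>("c", 100));
        // ttl = 50, now = 100: "b" (ts 10) has expired, "a" and "c" survive.
        System.out.println(getUnexpiredOrOriginal(values, 50, 100).size()); // prints 2
    }
}
```

The fast path is the whole point of the optimization: when no element has expired, the method returns the same list reference, so `TtlIncrementalCleanup` has nothing to copy or rewrite.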
Re: [PR] [FLINK-33490][table-planner] Validate the column name conflicts in view when creating view [flink]
xuyangzhong commented on PR #23760: URL: https://github.com/apache/flink/pull/23760#issuecomment-1863752804 @flinkbot run azure -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (FLINK-33877) CollectSinkFunctionTest.testConfiguredPortIsUsed fails due to BindException
[ https://issues.apache.org/jira/browse/FLINK-33877?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jiabao Sun resolved FLINK-33877. Fix Version/s: 1.19.0 Resolution: Fixed > CollectSinkFunctionTest.testConfiguredPortIsUsed fails due to BindException > --- > > Key: FLINK-33877 > URL: https://issues.apache.org/jira/browse/FLINK-33877 > Project: Flink > Issue Type: Bug > Components: API / DataStream >Affects Versions: 1.19.0 >Reporter: Jiabao Sun >Priority: Critical > Labels: pull-request-available, test-stability > Fix For: 1.19.0 > > > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=55646=logs=0da23115-68bb-5dcd-192c-bd4c8adebde1=24c3384f-1bcb-57b3-224f-51bf973bbee8=9482 > {noformat} > Dec 18 17:49:57 17:49:57.241 [ERROR] > org.apache.flink.streaming.api.operators.collect.CollectSinkFunctionTest.testConfiguredPortIsUsed > -- Time elapsed: 0.021 s <<< ERROR! > Dec 18 17:49:57 java.net.BindException: Address already in use (Bind failed) > Dec 18 17:49:57 at java.net.PlainSocketImpl.socketBind(Native Method) > Dec 18 17:49:57 at > java.net.AbstractPlainSocketImpl.bind(AbstractPlainSocketImpl.java:387) > Dec 18 17:49:57 at java.net.ServerSocket.bind(ServerSocket.java:390) > Dec 18 17:49:57 at java.net.ServerSocket.<init>(ServerSocket.java:252) > Dec 18 17:49:57 at > org.apache.flink.streaming.api.operators.collect.CollectSinkFunction$ServerThread.<init>(CollectSinkFunction.java:375) > Dec 18 17:49:57 at > org.apache.flink.streaming.api.operators.collect.CollectSinkFunction$ServerThread.<init>(CollectSinkFunction.java:362) > Dec 18 17:49:57 at > org.apache.flink.streaming.api.operators.collect.CollectSinkFunction.open(CollectSinkFunction.java:252) > Dec 18 17:49:57 at > org.apache.flink.streaming.api.operators.collect.utils.CollectSinkFunctionTestWrapper.openFunction(CollectSinkFunctionTestWrapper.java:103) > Dec 18 17:49:57 at >
org.apache.flink.streaming.api.operators.collect.CollectSinkFunctionTest.testConfiguredPortIsUsed(CollectSinkFunctionTest.java:138) > Dec 18 17:49:57 at java.lang.reflect.Method.invoke(Method.java:498) > Dec 18 17:49:57 at > java.util.concurrent.RecursiveAction.exec(RecursiveAction.java:189) > Dec 18 17:49:57 at > java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) > Dec 18 17:49:57 at > java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) > Dec 18 17:49:57 at > java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) > Dec 18 17:49:57 at > java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175) > {noformat} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-33728) do not rewatch when KubernetesResourceManagerDriver watch fail
[ https://issues.apache.org/jira/browse/FLINK-33728?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17798797#comment-17798797 ] xiaogang zhou commented on FLINK-33728: --- [~mapohl] Hi Matthias, would you please let me know what additional tests are needed for my proposal to move forward? > do not rewatch when KubernetesResourceManagerDriver watch fail > -- > > Key: FLINK-33728 > URL: https://issues.apache.org/jira/browse/FLINK-33728 > Project: Flink > Issue Type: New Feature > Components: Deployment / Kubernetes >Reporter: xiaogang zhou >Priority: Major > Labels: pull-request-available > > I met a massive production problem when Kubernetes ETCD responded slowly. > After Kube recovered after 1 hour, thousands of Flink jobs using > kubernetesResourceManagerDriver rewatched when receiving > ResourceVersionTooOld, which caused great pressure on the API Server and made > the API server fail again... > > I am not sure whether it is necessary to call > getResourceEventHandler().onError(throwable) > in the PodCallbackHandlerImpl#handleError method. > > We can just neglect the disconnection of the watching process and try to rewatch > once a new requestResource is called. And we can leverage the akka heartbeat > timeout to discover TM failures, just like YARN mode does. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-33877) CollectSinkFunctionTest.testConfiguredPortIsUsed fails due to BindException
[ https://issues.apache.org/jira/browse/FLINK-33877?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17798796#comment-17798796 ] Jiabao Sun commented on FLINK-33877: Merged into master via 409f44660eb434a57237af2690c9e4b7f9679442 7cae20f22a24403210ca5d6addf83571fcf46843 > CollectSinkFunctionTest.testConfiguredPortIsUsed fails due to BindException > --- > > Key: FLINK-33877 > URL: https://issues.apache.org/jira/browse/FLINK-33877 > Project: Flink > Issue Type: Bug > Components: API / DataStream >Affects Versions: 1.19.0 >Reporter: Jiabao Sun >Priority: Critical > Labels: pull-request-available, test-stability > > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=55646=logs=0da23115-68bb-5dcd-192c-bd4c8adebde1=24c3384f-1bcb-57b3-224f-51bf973bbee8=9482 > {noformat} > Dec 18 17:49:57 17:49:57.241 [ERROR] > org.apache.flink.streaming.api.operators.collect.CollectSinkFunctionTest.testConfiguredPortIsUsed > -- Time elapsed: 0.021 s <<< ERROR! > Dec 18 17:49:57 java.net.BindException: Address already in use (Bind failed) > Dec 18 17:49:57 at java.net.PlainSocketImpl.socketBind(Native Method) > Dec 18 17:49:57 at > java.net.AbstractPlainSocketImpl.bind(AbstractPlainSocketImpl.java:387) > Dec 18 17:49:57 at java.net.ServerSocket.bind(ServerSocket.java:390) > Dec 18 17:49:57 at java.net.ServerSocket.<init>(ServerSocket.java:252) > Dec 18 17:49:57 at > org.apache.flink.streaming.api.operators.collect.CollectSinkFunction$ServerThread.<init>(CollectSinkFunction.java:375) > Dec 18 17:49:57 at > org.apache.flink.streaming.api.operators.collect.CollectSinkFunction$ServerThread.<init>(CollectSinkFunction.java:362) > Dec 18 17:49:57 at > org.apache.flink.streaming.api.operators.collect.CollectSinkFunction.open(CollectSinkFunction.java:252) > Dec 18 17:49:57 at > org.apache.flink.streaming.api.operators.collect.utils.CollectSinkFunctionTestWrapper.openFunction(CollectSinkFunctionTestWrapper.java:103) > Dec 18 17:49:57 at >
org.apache.flink.streaming.api.operators.collect.CollectSinkFunctionTest.testConfiguredPortIsUsed(CollectSinkFunctionTest.java:138) > Dec 18 17:49:57 at java.lang.reflect.Method.invoke(Method.java:498) > Dec 18 17:49:57 at > java.util.concurrent.RecursiveAction.exec(RecursiveAction.java:189) > Dec 18 17:49:57 at > java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) > Dec 18 17:49:57 at > java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) > Dec 18 17:49:57 at > java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) > Dec 18 17:49:57 at > java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175) > {noformat} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-27756) Fix intermittently failing test in AsyncSinkWriterTest.checkLoggedSendTimesAreWithinBounds
[ https://issues.apache.org/jira/browse/FLINK-27756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17798795#comment-17798795 ] Junrui Li commented on FLINK-27756: --- https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=55673=logs=2e8cb2f7-b2d3-5c62-9c05-cd756d33a819=2dd510a3-5041-5201-6dc3-54d310f68906 > Fix intermittently failing test in > AsyncSinkWriterTest.checkLoggedSendTimesAreWithinBounds > -- > > Key: FLINK-27756 > URL: https://issues.apache.org/jira/browse/FLINK-27756 > Project: Flink > Issue Type: Bug > Components: Connectors / Common >Affects Versions: 1.15.0, 1.17.0, 1.19.0 >Reporter: Ahmed Hamdy >Assignee: Ahmed Hamdy >Priority: Critical > Labels: test-stability > Fix For: 1.16.0 > > > h2. Motivation > - One of the integration tests ({{checkLoggedSendTimesAreWithinBounds}}) of > {{AsyncSinkWriterTest}} has been reported to fail intermittently on build > pipeline causing blocking of new changes. > - Reporting build is [linked > |https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=36009=logs=aa18c3f6-13b8-5f58-86bb-c1cffb239496=502fb6c0-30a2-5e49-c5c2-a00fa3acb203] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-33856) Add metrics to monitor the interaction performance between task and external storage system in the process of checkpoint making
[ https://issues.apache.org/jira/browse/FLINK-33856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17798787#comment-17798787 ] Jiang Xin commented on FLINK-33856: --- Sounds reasonable. Have you already started working on this? > Add metrics to monitor the interaction performance between task and external > storage system in the process of checkpoint making > --- > > Key: FLINK-33856 > URL: https://issues.apache.org/jira/browse/FLINK-33856 > Project: Flink > Issue Type: Improvement > Components: Runtime / Checkpointing >Affects Versions: 1.18.0 >Reporter: Jufang He >Priority: Major > > When Flink makes a checkpoint, the interaction performance with the external > file system has a great impact on the overall checkpoint duration. Therefore, it > is easy to spot the bottleneck by adding performance indicators where the task > interacts with the external file storage system. These include: the file write > rate, the latency of writing the file, and the latency of closing the file. > Adding the above metrics on the Flink side has the following advantages: it is > convenient for comparing E2E checkpoint durations across tasks, and there is no > need to distinguish between types of external storage systems, since the metrics > can be collected uniformly in the FsCheckpointStreamFactory. -- This message was sent by Atlassian Jira (v8.20.10#820010)
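As a rough illustration of what the proposed metrics could look like — this is a hypothetical sketch, not Flink's actual FsCheckpointStreamFactory code, and the class and field names are made up — one can wrap the checkpoint output stream and accumulate bytes written plus the time spent in write() and close():

```java
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical metering wrapper: counts bytes written and nanoseconds spent
// in write()/close() on the underlying (e.g. DFS) stream.
public class MeteredOutputStream extends FilterOutputStream {
    public final AtomicLong bytesWritten = new AtomicLong();
    public final AtomicLong writeNanos = new AtomicLong();
    public final AtomicLong closeNanos = new AtomicLong();

    public MeteredOutputStream(OutputStream out) {
        super(out);
    }

    @Override
    public void write(int b) throws IOException {
        long start = System.nanoTime();
        out.write(b);
        writeNanos.addAndGet(System.nanoTime() - start);
        bytesWritten.incrementAndGet();
    }

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
        long start = System.nanoTime();
        out.write(b, off, len); // delegate directly; avoid byte-by-byte default
        writeNanos.addAndGet(System.nanoTime() - start);
        bytesWritten.addAndGet(len);
    }

    @Override
    public void close() throws IOException {
        long start = System.nanoTime();
        super.close(); // flushes, then closes the wrapped stream
        closeNanos.addAndGet(System.nanoTime() - start);
    }
}
```

From these counters, the write rate (bytesWritten / writeNanos) and the close latency fall out directly, independently of which storage system backs the stream.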
[jira] [Created] (FLINK-33890) Determine the initial status before receiving the first RecordAttributes
Xuannan Su created FLINK-33890: -- Summary: Determine the initial status before receiving the first RecordAttributes Key: FLINK-33890 URL: https://issues.apache.org/jira/browse/FLINK-33890 Project: Flink Issue Type: Sub-task Components: Runtime / Task Reporter: Xuannan Su Currently, all the operators are initialized in non-backlog mode. Ideally, we should determine the initial status before receiving the first {{RecordAttributes}} so that we don't have to initialize the operator in non-backlog mode and immediately switch to backlog mode before processing any records.
Re: [PR] [FLINK-33810][Runtime] Propagate RecordAttributes that contains isProcessingBacklog status [flink]
Sxnan commented on code in PR #23919: URL: https://github.com/apache/flink/pull/23919#discussion_r1432093951

## flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordAttributesCombiner.java:

## @@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput.DataOutput;
+import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
+import org.apache.flink.streaming.runtime.streamrecord.RecordAttributesBuilder;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+
+/** RecordAttributesCombiner combines RecordAttributes from different input channels. */
+public class RecordAttributesCombiner {
+
+    private static final Logger LOG = LoggerFactory.getLogger(RecordAttributesCombiner.class);
+
+    private final int numInputChannels;
+    private final RecordAttributes[] allChannelRecordAttributes;
+    private int nonBacklogChannelsCnt = 0;
+    private RecordAttributes lastOutputAttributes = null;
+
+    public RecordAttributesCombiner(int numInputChannels) {
+        this.numInputChannels = numInputChannels;
+        this.allChannelRecordAttributes = new RecordAttributes[numInputChannels];
+    }
+
+    public void inputRecordAttributes(
+            RecordAttributes recordAttributes, int channelIdx, DataOutput output)
+            throws Exception {
+        LOG.debug("RecordAttributes: {} from channel idx: {}", recordAttributes, channelIdx);
+        RecordAttributes lastChannelRecordAttributes = allChannelRecordAttributes[channelIdx];
+        allChannelRecordAttributes[channelIdx] = recordAttributes;
+
+        // skip if the input RecordAttributes of the input channel is the same as the last.
+        if (recordAttributes.equals(lastChannelRecordAttributes)) {
+            return;
+        }
+
+        final RecordAttributesBuilder builder =
+                new RecordAttributesBuilder(Collections.emptyList());
+
+        builder.setBacklog(combineIsBacklog(lastChannelRecordAttributes, recordAttributes));
+
+        final RecordAttributes outputAttribute = builder.build();
+        if (!outputAttribute.equals(lastOutputAttributes)) {
+            output.emitRecordAttributes(outputAttribute);
+            lastOutputAttributes = outputAttribute;
+        }
+    }
+
+    /** If any of the input channels is backlog, the combined RecordAttributes is backlog. */
+    private boolean combineIsBacklog(
+            RecordAttributes lastRecordAttributes, RecordAttributes recordAttributes) {
+        if (lastRecordAttributes == null
+                || lastRecordAttributes.isBacklog() != recordAttributes.isBacklog()) {
+            if (lastRecordAttributes != null && recordAttributes.isBacklog()) {
+                nonBacklogChannelsCnt -= 1;
+            }
+            if (!recordAttributes.isBacklog()) {
+                nonBacklogChannelsCnt += 1;
+            }
+        }
+
+        return nonBacklogChannelsCnt < numInputChannels;
+    }

Review Comment: The [JIRA ticket](https://issues.apache.org/jira/browse/FLINK-33890) is created.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
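The counting rule in `combineIsBacklog` above can be illustrated in isolation. The class below is a simplified stand-in written for this digest, not Flink's actual `RecordAttributesCombiner`: it keeps a per-channel status and a count of channels known to be non-backlog, and the combined status only becomes non-backlog once every channel has reported non-backlog.

```java
/**
 * Simplified stand-in (not Flink code) for the combining rule: the combined
 * status stays "backlog" until every input channel has reported non-backlog.
 */
class BacklogStatusCombiner {
    private final Boolean[] channelBacklog; // null until the channel reports for the first time
    private int nonBacklogChannels;

    BacklogStatusCombiner(int numChannels) {
        this.channelBacklog = new Boolean[numChannels];
    }

    /** Feeds one channel's status and returns the combined isBacklog status. */
    boolean update(int channelIdx, boolean isBacklog) {
        Boolean last = channelBacklog[channelIdx];
        channelBacklog[channelIdx] = isBacklog;
        // Adjust the counter only when the channel's status actually changed
        // (or when the channel reports for the first time).
        if (last == null || last != isBacklog) {
            if (!isBacklog) {
                nonBacklogChannels++;
            } else if (last != null) {
                nonBacklogChannels--;
            }
        }
        // Backlog unless every channel has reported non-backlog.
        return nonBacklogChannels < channelBacklog.length;
    }
}
```

Note that a channel that has never sent a `RecordAttributes` counts as backlog, which matches the `lastRecordAttributes == null` branch in the PR's code.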
Re: [PR] [FLINK-33883][jdbc] Bump CI flink version on flink-connector-jdbc to support flink 1.19 [flink-connector-jdbc]
Jiabao-Sun commented on code in PR #85: URL: https://github.com/apache/flink-connector-jdbc/pull/85#discussion_r1432091019

## flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcTablePlanTest.java:

## @@ -64,4 +69,14 @@
 public void testFilterPushdown() {
     util.verifyExecPlan(
             "SELECT id, time_col, real_col FROM jdbc WHERE id = 91 AND time_col <> TIME '11:11:11' OR double_col >= -1000.23");
 }
+
+// A workaround to get the test method name for flink versions not completely migrated to JUnit5
+public TestName name() {

Review Comment: Yes, I modified the comments to make them more clear. @snuyanzin, could you help take a look again?
Re: [PR] [FLINK-33611] [flink-protobuf] Support Large Protobuf Schemas [flink]
sharath1709 commented on PR #23937: URL: https://github.com/apache/flink/pull/23937#issuecomment-1863633658 @flinkbot run azure
[PR] [FLINK-33888] Add release flink-1.18.1 [flink-web]
JingGe opened a new pull request, #706: URL: https://github.com/apache/flink-web/pull/706 Add the content of release Flink-1.18.1
[jira] [Updated] (FLINK-33888) Propose a pull request for website updates
[ https://issues.apache.org/jira/browse/FLINK-33888?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jing Ge updated FLINK-33888: --- Description: The final step of building the candidate is to propose a website pull request containing the following changes: * update docs/data/flink.yml ** Add a new major version or update minor version as required * update docs/data/release_archive.yml * add a blog post announcing the release in {{docs/content/posts}} (!) Don’t merge the PRs before finalizing the release. h3. Expectations * Website pull request proposed to list the [release|http://flink.apache.org/downloads.html] was: The final step of building the candidate is to propose a website pull request containing the following changes: * update docs/data/flink.yml ** Add a new major version or update minor version as required * update docs/data/release_archive.yml * update version references in quickstarts ({{{}q/{}}} directory) as required (outdated?) * add a blog post announcing the release in {{docs/content/posts}} (!) Don’t merge the PRs before finalizing the release. h3. Expectations * Website pull request proposed to list the [release|http://flink.apache.org/downloads.html] > Propose a pull request for website updates > -- > > Key: FLINK-33888 > URL: https://issues.apache.org/jira/browse/FLINK-33888 > Project: Flink > Issue Type: Sub-task >Affects Versions: 1.18.0 >Reporter: Jing Ge >Assignee: Jing Ge >Priority: Major > Labels: pull-request-available > > The final step of building the candidate is to propose a website pull request > containing the following changes: > * update docs/data/flink.yml > ** Add a new major version or update minor version as required > * update docs/data/release_archive.yml > * add a blog post announcing the release in {{docs/content/posts}} > (!) Don’t merge the PRs before finalizing the release. > > > h3. Expectations > * Website pull request proposed to list the > [release|http://flink.apache.org/downloads.html]
[jira] [Closed] (FLINK-33837) Vote on the release candidate
[ https://issues.apache.org/jira/browse/FLINK-33837?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jing Ge closed FLINK-33837. --- Release Note: canceled Resolution: Won't Fix > Vote on the release candidate > - > > Key: FLINK-33837 > URL: https://issues.apache.org/jira/browse/FLINK-33837 > Project: Flink > Issue Type: Sub-task >Affects Versions: 1.18.0 >Reporter: Jing Ge >Assignee: Jing Ge >Priority: Major > > Once you have built and individually reviewed the release candidate, please > share it for the community-wide review. Please review foundation-wide [voting > guidelines|http://www.apache.org/foundation/voting.html] for more information. > Start the review-and-vote thread on the dev@ mailing list. Here’s an email > template; please adjust as you see fit. > {quote}From: Release Manager > To: d...@flink.apache.org > Subject: [VOTE] Release 1.2.3, release candidate #3 > Hi everyone, > Please review and vote on the release candidate #3 for the version 1.2.3, as > follows: > [ ] +1, Approve the release > [ ] -1, Do not approve the release (please provide specific comments) > The complete staging area is available for your review, which includes: > * JIRA release notes [1], > * the official Apache source release and binary convenience releases to be > deployed to dist.apache.org [2], which are signed with the key with > fingerprint [3], > * all artifacts to be deployed to the Maven Central Repository [4], > * source code tag "release-1.2.3-rc3" [5], > * website pull request listing the new release and adding announcement blog > post [6]. > The vote will be open for at least 72 hours. It is adopted by majority > approval, with at least 3 PMC affirmative votes. 
> Thanks, > Release Manager > [1] link > [2] link > [3] [https://dist.apache.org/repos/dist/release/flink/KEYS] > [4] link > [5] link > [6] link > {quote} > *If there are any issues found in the release candidate, reply on the vote > thread to cancel the vote.* There’s no need to wait 72 hours. Proceed to the > Fix Issues step below and address the problem. However, some issues don’t > require cancellation. For example, if an issue is found in the website pull > request, just correct it on the spot and the vote can continue as-is. > For cancelling a release, the release manager needs to send an email to the > release candidate thread, stating that the release candidate is officially > cancelled. Next, all artifacts created specifically for the RC in the > previous steps need to be removed: > * Delete the staging repository in Nexus > * Remove the source / binary RC files from dist.apache.org > * Delete the source code tag in git > *If there are no issues, reply on the vote thread to close the voting.* Then, > tally the votes in a separate email. Here’s an email template; please adjust > as you see fit. > {quote}From: Release Manager > To: d...@flink.apache.org > Subject: [RESULT] [VOTE] Release 1.2.3, release candidate #3 > I'm happy to announce that we have unanimously approved this release. > There are XXX approving votes, XXX of which are binding: > * approver 1 > * approver 2 > * approver 3 > * approver 4 > There are no disapproving votes. > Thanks everyone! > {quote} > > > h3. Expectations > * Community votes to release the proposed candidate, with at least three > approving PMC votes > Any issues that are raised till the vote is over should be either resolved or > moved into the next release (if applicable).
[jira] [Closed] (FLINK-33833) Build Release Candidate: 1.18.1-rc1
[ https://issues.apache.org/jira/browse/FLINK-33833?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jing Ge closed FLINK-33833. --- Release Note: canceled Resolution: Fixed > Build Release Candidate: 1.18.1-rc1 > --- > > Key: FLINK-33833 > URL: https://issues.apache.org/jira/browse/FLINK-33833 > Project: Flink > Issue Type: New Feature >Affects Versions: 1.18.0 >Reporter: Jing Ge >Assignee: Jing Ge >Priority: Major > > The core of the release process is the build-vote-fix cycle. Each cycle > produces one release candidate. The Release Manager repeats this cycle until > the community approves one release candidate, which is then finalized. > h4. Prerequisites > Set up a few environment variables to simplify Maven commands that follow. > This identifies the release candidate being built. Start with {{RC_NUM}} > equal to 1 and increment it for each candidate: > {code} > RC_NUM="1" > TAG="release-${RELEASE_VERSION}-rc${RC_NUM}" > {code}
[jira] [Closed] (FLINK-33836) Propose a pull request for website updates
[ https://issues.apache.org/jira/browse/FLINK-33836?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jing Ge closed FLINK-33836. --- Release Note: canceled Resolution: Won't Fix > Propose a pull request for website updates > -- > > Key: FLINK-33836 > URL: https://issues.apache.org/jira/browse/FLINK-33836 > Project: Flink > Issue Type: Sub-task >Affects Versions: 1.18.0 >Reporter: Jing Ge >Assignee: Jing Ge >Priority: Major > Labels: pull-request-available > > The final step of building the candidate is to propose a website pull request > containing the following changes: > * update docs/data/flink.yml > ** Add a new major version or update minor version as required > * update docs/data/release_archive.yml > * update version references in quickstarts ({{{}q/{}}} directory) as > required (outdated?) > * add a blog post announcing the release in {{docs/content/posts}} > (!) Don’t merge the PRs before finalizing the release. > > > h3. Expectations > * Website pull request proposed to list the > [release|http://flink.apache.org/downloads.html]
[jira] [Assigned] (FLINK-33842) Publish the Dockerfiles for the new release
[ https://issues.apache.org/jira/browse/FLINK-33842?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jing Ge reassigned FLINK-33842: --- Assignee: Jing Ge (was: Matthias Pohl) > Publish the Dockerfiles for the new release > --- > > Key: FLINK-33842 > URL: https://issues.apache.org/jira/browse/FLINK-33842 > Project: Flink > Issue Type: Sub-task >Reporter: Jing Ge >Assignee: Jing Ge >Priority: Major > Labels: pull-request-available > > Note: the official Dockerfiles fetch the binary distribution of the target > Flink version from an Apache mirror. After publishing the binary release > artifacts, mirrors can take some hours to start serving the new artifacts, so > you may want to wait to do this step until you are ready to continue with the > "Promote the release" steps in the follow-up Jira. > Follow the [release instructions in the flink-docker > repo|https://github.com/apache/flink-docker#release-workflow] to build the > new Dockerfiles and send an updated manifest to Docker Hub so the new images > are built and published. > > > h3. Expectations > * Dockerfiles in [flink-docker|https://github.com/apache/flink-docker] > updated for the new Flink release and pull request opened on the Docker > official-images with an updated manifest
[jira] [Assigned] (FLINK-33841) Create Git tag and mark version as released in Jira
[ https://issues.apache.org/jira/browse/FLINK-33841?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jing Ge reassigned FLINK-33841: --- Assignee: Jing Ge (was: Qingsheng Ren) > Create Git tag and mark version as released in Jira > --- > > Key: FLINK-33841 > URL: https://issues.apache.org/jira/browse/FLINK-33841 > Project: Flink > Issue Type: Sub-task >Reporter: Jing Ge >Assignee: Jing Ge >Priority: Major > > Create and push a new Git tag for the released version by copying the tag for > the final release candidate, as follows: > {code:java} > $ git tag -s "release-${RELEASE_VERSION}" refs/tags/${TAG}^{} -m "Release > Flink ${RELEASE_VERSION}" > $ git push refs/tags/release-${RELEASE_VERSION} > {code} > In JIRA, inside [version > management|https://issues.apache.org/jira/plugins/servlet/project-config/FLINK/versions], > hover over the current release and a settings menu will appear. Click > Release, and select today’s date. > (Note: Only PMC members have access to the project administration. If you do > not have access, ask on the mailing list for assistance.) > If PRs have been merged to the release branch after the last release > candidate was tagged, make sure that the corresponding Jira tickets have the > correct Fix Version set. > > > h3. Expectations > * Release tagged in the source code repository > * Release version finalized in JIRA. (Note: Not all committers have > administrator access to JIRA. If you end up getting permissions errors ask on > the mailing list for assistance)
[jira] [Assigned] (FLINK-33840) Deploy artifacts to Maven Central Repository
[ https://issues.apache.org/jira/browse/FLINK-33840?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jing Ge reassigned FLINK-33840: --- Assignee: Jing Ge (was: Qingsheng Ren) > Deploy artifacts to Maven Central Repository > > > Key: FLINK-33840 > URL: https://issues.apache.org/jira/browse/FLINK-33840 > Project: Flink > Issue Type: Sub-task >Reporter: Jing Ge >Assignee: Jing Ge >Priority: Major > > Use the [Apache Nexus repository|https://repository.apache.org/] to release > the staged binary artifacts to the Maven Central repository. In the Staging > Repositories section, find the relevant release candidate orgapacheflink-XXX > entry and click Release. Drop all other release candidates that are not being > released. > h3. Deploy source and binary releases to dist.apache.org > Copy the source and binary releases from the dev repository to the release > repository at [dist.apache.org|http://dist.apache.org/] using Subversion. > {code:java} > $ svn move -m "Release Flink ${RELEASE_VERSION}" > https://dist.apache.org/repos/dist/dev/flink/flink-${RELEASE_VERSION}-rc${RC_NUM} > https://dist.apache.org/repos/dist/release/flink/flink-${RELEASE_VERSION} > {code} > (Note: Only PMC members have access to the release repository. If you do not > have access, ask on the mailing list for assistance.) > h3. Remove old release candidates from > [dist.apache.org|http://dist.apache.org/] > Remove the old release candidates from > [https://dist.apache.org/repos/dist/dev/flink] using Subversion. > {code:java} > $ svn checkout https://dist.apache.org/repos/dist/dev/flink --depth=immediates > $ cd flink > $ svn remove flink-${RELEASE_VERSION}-rc* > $ svn commit -m "Remove old release candidates for Apache Flink > ${RELEASE_VERSION}" > {code} > > > h3. Expectations > * Maven artifacts released and indexed in the [Maven Central > Repository|https://search.maven.org/#search%7Cga%7C1%7Cg%3A%22org.apache.flink%22] > (usually takes about a day to show up) > * Source & binary distributions available in the release repository of > [https://dist.apache.org/repos/dist/release/flink/] > * Dev repository [https://dist.apache.org/repos/dist/dev/flink/] is empty > * Website contains links to new release binaries and sources in download page > * (for minor version updates) the front page references the correct new > major release version and directs to the correct link
[jira] [Updated] (FLINK-33889) Vote on the release candidate
[ https://issues.apache.org/jira/browse/FLINK-33889?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jing Ge updated FLINK-33889: Summary: Vote on the release candidate (was: CLONE - Vote on the release candidate) > Vote on the release candidate > - > > Key: FLINK-33889 > URL: https://issues.apache.org/jira/browse/FLINK-33889 > Project: Flink > Issue Type: Sub-task >Affects Versions: 1.18.0 >Reporter: Jing Ge >Assignee: Jing Ge >Priority: Major > > Once you have built and individually reviewed the release candidate, please > share it for the community-wide review. Please review foundation-wide [voting > guidelines|http://www.apache.org/foundation/voting.html] for more information. > Start the review-and-vote thread on the dev@ mailing list. Here’s an email > template; please adjust as you see fit. > {quote}From: Release Manager > To: d...@flink.apache.org > Subject: [VOTE] Release 1.2.3, release candidate #3 > Hi everyone, > Please review and vote on the release candidate #3 for the version 1.2.3, as > follows: > [ ] +1, Approve the release > [ ] -1, Do not approve the release (please provide specific comments) > The complete staging area is available for your review, which includes: > * JIRA release notes [1], > * the official Apache source release and binary convenience releases to be > deployed to dist.apache.org [2], which are signed with the key with > fingerprint [3], > * all artifacts to be deployed to the Maven Central Repository [4], > * source code tag "release-1.2.3-rc3" [5], > * website pull request listing the new release and adding announcement blog > post [6]. > The vote will be open for at least 72 hours. It is adopted by majority > approval, with at least 3 PMC affirmative votes. 
> Thanks, > Release Manager > [1] link > [2] link > [3] [https://dist.apache.org/repos/dist/release/flink/KEYS] > [4] link > [5] link > [6] link > {quote} > *If there are any issues found in the release candidate, reply on the vote > thread to cancel the vote.* There’s no need to wait 72 hours. Proceed to the > Fix Issues step below and address the problem. However, some issues don’t > require cancellation. For example, if an issue is found in the website pull > request, just correct it on the spot and the vote can continue as-is. > For cancelling a release, the release manager needs to send an email to the > release candidate thread, stating that the release candidate is officially > cancelled. Next, all artifacts created specifically for the RC in the > previous steps need to be removed: > * Delete the staging repository in Nexus > * Remove the source / binary RC files from dist.apache.org > * Delete the source code tag in git > *If there are no issues, reply on the vote thread to close the voting.* Then, > tally the votes in a separate email. Here’s an email template; please adjust > as you see fit. > {quote}From: Release Manager > To: d...@flink.apache.org > Subject: [RESULT] [VOTE] Release 1.2.3, release candidate #3 > I'm happy to announce that we have unanimously approved this release. > There are XXX approving votes, XXX of which are binding: > * approver 1 > * approver 2 > * approver 3 > * approver 4 > There are no disapproving votes. > Thanks everyone! > {quote} > > > h3. Expectations > * Community votes to release the proposed candidate, with at least three > approving PMC votes > Any issues that are raised till the vote is over should be either resolved or > moved into the next release (if applicable).
[jira] [Updated] (FLINK-33888) Propose a pull request for website updates
[ https://issues.apache.org/jira/browse/FLINK-33888?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jing Ge updated FLINK-33888: Summary: Propose a pull request for website updates (was: CLONE - Propose a pull request for website updates) > Propose a pull request for website updates > -- > > Key: FLINK-33888 > URL: https://issues.apache.org/jira/browse/FLINK-33888 > Project: Flink > Issue Type: Sub-task >Affects Versions: 1.18.0 >Reporter: Jing Ge >Assignee: Jing Ge >Priority: Major > Labels: pull-request-available > > The final step of building the candidate is to propose a website pull request > containing the following changes: > * update docs/data/flink.yml > ** Add a new major version or update minor version as required > * update docs/data/release_archive.yml > * update version references in quickstarts ({{{}q/{}}} directory) as > required (outdated?) > * add a blog post announcing the release in {{docs/content/posts}} > (!) Don’t merge the PRs before finalizing the release. > > > h3. Expectations > * Website pull request proposed to list the > [release|http://flink.apache.org/downloads.html]
[jira] [Updated] (FLINK-33886) Build and stage Java and Python artifacts
[ https://issues.apache.org/jira/browse/FLINK-33886?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jing Ge updated FLINK-33886: Summary: Build and stage Java and Python artifacts (was: CLONE - Build and stage Java and Python artifacts) > Build and stage Java and Python artifacts > - > > Key: FLINK-33886 > URL: https://issues.apache.org/jira/browse/FLINK-33886 > Project: Flink > Issue Type: Sub-task >Reporter: Jing Ge >Assignee: Jing Ge >Priority: Major > > # Create a local release branch ((!) this step can not be skipped for minor > releases): > {code:bash} > $ cd ./tools > tools/ $ OLD_VERSION=$CURRENT_SNAPSHOT_VERSION NEW_VERSION=$RELEASE_VERSION > RELEASE_CANDIDATE=$RC_NUM releasing/create_release_branch.sh > {code} > # Tag the release commit: > {code:bash} > $ git tag -s ${TAG} -m "${TAG}" > {code} > # We now need to do several things: > ## Create the source release archive > ## Deploy jar artefacts to the [Apache Nexus > Repository|https://repository.apache.org/], which is the staging area for > deploying the jars to Maven Central > ## Build PyFlink wheel packages > You might want to create a directory on your local machine for collecting the > various source and binary releases before uploading them. Creating the binary > releases is a lengthy process but you can do this on another machine (for > example, in the "cloud"). When doing this, you can skip signing the release > files on the remote machine, download them to your local machine and sign > them there. > # Build the source release: > {code:bash} > tools $ RELEASE_VERSION=$RELEASE_VERSION releasing/create_source_release.sh > {code} > # Stage the maven artifacts: > {code:bash} > tools $ releasing/deploy_staging_jars.sh > {code} > Review all staged artifacts ([https://repository.apache.org/]). They should > contain all relevant parts for each module, including pom.xml, jar, test jar, > source, test source, javadoc, etc. Carefully review any new artifacts. 
> # Close the staging repository on Apache Nexus. When prompted for a > description, enter “Apache Flink, version X, release candidate Y”. > Then, you need to build the PyFlink wheel packages (since 1.11): > # Set up an azure pipeline in your own Azure account. You can refer to > [Azure > Pipelines|https://cwiki.apache.org/confluence/display/FLINK/Azure+Pipelines#AzurePipelines-Tutorial:SettingupAzurePipelinesforaforkoftheFlinkrepository] > for more details on how to set up azure pipeline for a fork of the Flink > repository. Note that a google cloud mirror in Europe is used for downloading > maven artifacts, therefore it is recommended to set your [Azure organization > region|https://docs.microsoft.com/en-us/azure/devops/organizations/accounts/change-organization-location] > to Europe to speed up the downloads. > # Push the release candidate branch to your forked personal Flink > repository, e.g. > {code:bash} > tools $ git push > refs/heads/release-${RELEASE_VERSION}-rc${RC_NUM}:release-${RELEASE_VERSION}-rc${RC_NUM} > {code} > # Trigger the Azure Pipelines manually to build the PyFlink wheel packages > ## Go to your Azure Pipelines Flink project → Pipelines > ## Click the "New pipeline" button on the top right > ## Select "GitHub" → your GitHub Flink repository → "Existing Azure > Pipelines YAML file" > ## Select your branch → Set path to "/azure-pipelines.yaml" → click on > "Continue" → click on "Variables" > ## Then click "New Variable" button, fill the name with "MODE", and the > value with "release". Click "OK" to set the variable and the "Save" button to > save the variables, then back on the "Review your pipeline" screen click > "Run" to trigger the build. > ## You should now see a build where only the "CI build (release)" is running > # Download the PyFlink wheel packages from the build result page after the > jobs of "build_wheels mac" and "build_wheels linux" have finished. 
> ## Download the PyFlink wheel packages > ### Open the build result page of the pipeline > ### Go to the {{Artifacts}} page (build_wheels linux -> 1 artifact) > ### Click {{wheel_Darwin_build_wheels mac}} and {{wheel_Linux_build_wheels > linux}} separately to download the zip files > ## Unzip these two zip files > {code:bash} > $ cd /path/to/downloaded_wheel_packages > $ unzip wheel_Linux_build_wheels\ linux.zip > $ unzip wheel_Darwin_build_wheels\ mac.zip{code} > ## Create directory {{./dist}} under the directory of {{{}flink-python{}}}: > {code:bash} > $ cd > $ mkdir flink-python/dist{code} > ## Move the unzipped wheel packages to the directory of > {{{}flink-python/dist{}}}: > {code:java} > $ mv /path/to/wheel_Darwin_build_wheels\ mac/* flink-python/dist/ > $ mv /path/to/wheel_Linux_build_wheels\ linux/* flink-python/dist/ > $ cd tools{code} > Finally, we create
[jira] [Resolved] (FLINK-33887) Stage source and binary releases on dist.apache.org
[ https://issues.apache.org/jira/browse/FLINK-33887?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jing Ge resolved FLINK-33887. - Resolution: Fixed > Stage source and binary releases on dist.apache.org > --- > > Key: FLINK-33887 > URL: https://issues.apache.org/jira/browse/FLINK-33887 > Project: Flink > Issue Type: Sub-task >Reporter: Jing Ge >Assignee: Jing Ge >Priority: Major > > Copy the source release to the dev repository of dist.apache.org: > # If you have not already, check out the Flink section of the dev repository > on dist.apache.org via Subversion. In a fresh directory: > {code:bash} > $ svn checkout https://dist.apache.org/repos/dist/dev/flink --depth=immediates > {code} > # Make a directory for the new release and copy all the artifacts (Flink > source/binary distributions, hashes, GPG signatures and the python > subdirectory) into that newly created directory: > {code:bash} > $ mkdir flink/flink-${RELEASE_VERSION}-rc${RC_NUM} > $ mv /tools/releasing/release/* > flink/flink-${RELEASE_VERSION}-rc${RC_NUM} > {code} > # Add and commit all the files. > {code:bash} > $ cd flink > flink $ svn add flink-${RELEASE_VERSION}-rc${RC_NUM} > flink $ svn commit -m "Add flink-${RELEASE_VERSION}-rc${RC_NUM}" > {code} > # Verify that files are present under > [https://dist.apache.org/repos/dist/dev/flink|https://dist.apache.org/repos/dist/dev/flink]. > # Push the release tag if not done already (the following command assumes to > be called from within the apache/flink checkout): > {code:bash} > $ git push refs/tags/release-${RELEASE_VERSION}-rc${RC_NUM} > {code} > > > h3. Expectations > * Maven artifacts deployed to the staging repository of > [repository.apache.org|https://repository.apache.org/content/repositories/] > * Source distribution deployed to the dev repository of > [dist.apache.org|https://dist.apache.org/repos/dist/dev/flink/] > * Check hashes (e.g. shasum -c *.sha512) > * Check signatures (e.g. 
{{{}gpg --verify > flink-1.2.3-source-release.tar.gz.asc flink-1.2.3-source-release.tar.gz{}}}) > * {{grep}} for legal headers in each file. > * If time allows, check in advance the NOTICE files of the modules whose dependencies > have changed in this release, since license issues pop up from time to time > during voting. See the "Checking License" section of [Verifying a Flink > Release|https://cwiki.apache.org/confluence/display/FLINK/Verifying+a+Flink+Release]. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (FLINK-33886) Build and stage Java and Python artifacts
[ https://issues.apache.org/jira/browse/FLINK-33886?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jing Ge resolved FLINK-33886. - Resolution: Fixed > Build and stage Java and Python artifacts > - > > Key: FLINK-33886 > URL: https://issues.apache.org/jira/browse/FLINK-33886 > Project: Flink > Issue Type: Sub-task >Reporter: Jing Ge >Assignee: Jing Ge >Priority: Major > > # Create a local release branch ((!) this step cannot be skipped for minor > releases): > {code:bash} > $ cd ./tools > tools/ $ OLD_VERSION=$CURRENT_SNAPSHOT_VERSION NEW_VERSION=$RELEASE_VERSION > RELEASE_CANDIDATE=$RC_NUM releasing/create_release_branch.sh > {code} > # Tag the release commit: > {code:bash} > $ git tag -s ${TAG} -m "${TAG}" > {code} > # We now need to do several things: > ## Create the source release archive > ## Deploy jar artifacts to the [Apache Nexus > Repository|https://repository.apache.org/], which is the staging area for > deploying the jars to Maven Central > ## Build PyFlink wheel packages > You might want to create a directory on your local machine for collecting the > various source and binary releases before uploading them. Creating the binary > releases is a lengthy process but you can do this on another machine (for > example, in the "cloud"). When doing this, you can skip signing the release > files on the remote machine, download them to your local machine and sign > them there. > # Build the source release: > {code:bash} > tools $ RELEASE_VERSION=$RELEASE_VERSION releasing/create_source_release.sh > {code} > # Stage the Maven artifacts: > {code:bash} > tools $ releasing/deploy_staging_jars.sh > {code} > Review all staged artifacts ([https://repository.apache.org/]). They should > contain all relevant parts for each module, including pom.xml, jar, test jar, > source, test source, javadoc, etc. Carefully review any new artifacts. > # Close the staging repository on Apache Nexus. 
When prompted for a > description, enter “Apache Flink, version X, release candidate Y”. > Then, you need to build the PyFlink wheel packages (since 1.11): > # Set up an azure pipeline in your own Azure account. You can refer to > [Azure > Pipelines|https://cwiki.apache.org/confluence/display/FLINK/Azure+Pipelines#AzurePipelines-Tutorial:SettingupAzurePipelinesforaforkoftheFlinkrepository] > for more details on how to set up azure pipeline for a fork of the Flink > repository. Note that a google cloud mirror in Europe is used for downloading > maven artifacts, therefore it is recommended to set your [Azure organization > region|https://docs.microsoft.com/en-us/azure/devops/organizations/accounts/change-organization-location] > to Europe to speed up the downloads. > # Push the release candidate branch to your forked personal Flink > repository, e.g. > {code:bash} > tools $ git push > refs/heads/release-${RELEASE_VERSION}-rc${RC_NUM}:release-${RELEASE_VERSION}-rc${RC_NUM} > {code} > # Trigger the Azure Pipelines manually to build the PyFlink wheel packages > ## Go to your Azure Pipelines Flink project → Pipelines > ## Click the "New pipeline" button on the top right > ## Select "GitHub" → your GitHub Flink repository → "Existing Azure > Pipelines YAML file" > ## Select your branch → Set path to "/azure-pipelines.yaml" → click on > "Continue" → click on "Variables" > ## Then click "New Variable" button, fill the name with "MODE", and the > value with "release". Click "OK" to set the variable and the "Save" button to > save the variables, then back on the "Review your pipeline" screen click > "Run" to trigger the build. > ## You should now see a build where only the "CI build (release)" is running > # Download the PyFlink wheel packages from the build result page after the > jobs of "build_wheels mac" and "build_wheels linux" have finished. 
> ## Download the PyFlink wheel packages > ### Open the build result page of the pipeline > ### Go to the {{Artifacts}} page (build_wheels linux -> 1 artifact) > ### Click {{wheel_Darwin_build_wheels mac}} and {{wheel_Linux_build_wheels > linux}} separately to download the zip files > ## Unzip these two zip files > {code:bash} > $ cd /path/to/downloaded_wheel_packages > $ unzip wheel_Linux_build_wheels\ linux.zip > $ unzip wheel_Darwin_build_wheels\ mac.zip{code} > ## Create a {{./dist}} directory under {{{}flink-python{}}}: > {code:bash} > $ cd > $ mkdir flink-python/dist{code} > ## Move the unzipped wheel packages into > {{{}flink-python/dist{}}}: > {code:bash} > $ mv /path/to/wheel_Darwin_build_wheels\ mac/* flink-python/dist/ > $ mv /path/to/wheel_Linux_build_wheels\ linux/* flink-python/dist/ > $ cd tools{code} > Finally, we create the binary convenience release files: > {code:bash} > tools $
[jira] [Updated] (FLINK-33887) Stage source and binary releases on dist.apache.org
[ https://issues.apache.org/jira/browse/FLINK-33887?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jing Ge updated FLINK-33887: Summary: Stage source and binary releases on dist.apache.org (was: CLONE - Stage source and binary releases on dist.apache.org) > Stage source and binary releases on dist.apache.org > --- > > Key: FLINK-33887 > URL: https://issues.apache.org/jira/browse/FLINK-33887 > Project: Flink > Issue Type: Sub-task >Reporter: Jing Ge >Assignee: Jing Ge >Priority: Major > > Copy the source release to the dev repository of dist.apache.org: > # If you have not already, check out the Flink section of the dev repository > on dist.apache.org via Subversion. In a fresh directory: > {code:bash} > $ svn checkout https://dist.apache.org/repos/dist/dev/flink --depth=immediates > {code} > # Make a directory for the new release and copy all the artifacts (Flink > source/binary distributions, hashes, GPG signatures and the python > subdirectory) into that newly created directory: > {code:bash} > $ mkdir flink/flink-${RELEASE_VERSION}-rc${RC_NUM} > $ mv /tools/releasing/release/* > flink/flink-${RELEASE_VERSION}-rc${RC_NUM} > {code} > # Add and commit all the files. > {code:bash} > $ cd flink > flink $ svn add flink-${RELEASE_VERSION}-rc${RC_NUM} > flink $ svn commit -m "Add flink-${RELEASE_VERSION}-rc${RC_NUM}" > {code} > # Verify that files are present under > [https://dist.apache.org/repos/dist/dev/flink|https://dist.apache.org/repos/dist/dev/flink]. > # Push the release tag if not done already (the following command assumes to > be called from within the apache/flink checkout): > {code:bash} > $ git push refs/tags/release-${RELEASE_VERSION}-rc${RC_NUM} > {code} > > > h3. 
Expectations > * Maven artifacts deployed to the staging repository of > [repository.apache.org|https://repository.apache.org/content/repositories/] > * Source distribution deployed to the dev repository of > [dist.apache.org|https://dist.apache.org/repos/dist/dev/flink/] > * Check hashes (e.g. shasum -c *.sha512) > * Check signatures (e.g. {{{}gpg --verify > flink-1.2.3-source-release.tar.gz.asc flink-1.2.3-source-release.tar.gz{}}}) > * {{grep}} for legal headers in each file. > * If time allows, check in advance the NOTICE files of the modules whose dependencies > have changed in this release, since license issues pop up from time to time > during voting. See the "Checking License" section of [Verifying a Flink > Release|https://cwiki.apache.org/confluence/display/FLINK/Verifying+a+Flink+Release].
[jira] [Created] (FLINK-33886) CLONE - Build and stage Java and Python artifacts
Jing Ge created FLINK-33886: --- Summary: CLONE - Build and stage Java and Python artifacts Key: FLINK-33886 URL: https://issues.apache.org/jira/browse/FLINK-33886 Project: Flink Issue Type: Sub-task Reporter: Jing Ge Assignee: Jing Ge # Create a local release branch ((!) this step cannot be skipped for minor releases): {code:bash} $ cd ./tools tools/ $ OLD_VERSION=$CURRENT_SNAPSHOT_VERSION NEW_VERSION=$RELEASE_VERSION RELEASE_CANDIDATE=$RC_NUM releasing/create_release_branch.sh {code} # Tag the release commit: {code:bash} $ git tag -s ${TAG} -m "${TAG}" {code} # We now need to do several things: ## Create the source release archive ## Deploy jar artifacts to the [Apache Nexus Repository|https://repository.apache.org/], which is the staging area for deploying the jars to Maven Central ## Build PyFlink wheel packages You might want to create a directory on your local machine for collecting the various source and binary releases before uploading them. Creating the binary releases is a lengthy process but you can do this on another machine (for example, in the "cloud"). When doing this, you can skip signing the release files on the remote machine, download them to your local machine and sign them there. # Build the source release: {code:bash} tools $ RELEASE_VERSION=$RELEASE_VERSION releasing/create_source_release.sh {code} # Stage the Maven artifacts: {code:bash} tools $ releasing/deploy_staging_jars.sh {code} Review all staged artifacts ([https://repository.apache.org/]). They should contain all relevant parts for each module, including pom.xml, jar, test jar, source, test source, javadoc, etc. Carefully review any new artifacts. # Close the staging repository on Apache Nexus. When prompted for a description, enter “Apache Flink, version X, release candidate Y”. Then, you need to build the PyFlink wheel packages (since 1.11): # Set up an Azure pipeline in your own Azure account. 
You can refer to [Azure Pipelines|https://cwiki.apache.org/confluence/display/FLINK/Azure+Pipelines#AzurePipelines-Tutorial:SettingupAzurePipelinesforaforkoftheFlinkrepository] for more details on how to set up azure pipeline for a fork of the Flink repository. Note that a google cloud mirror in Europe is used for downloading maven artifacts, therefore it is recommended to set your [Azure organization region|https://docs.microsoft.com/en-us/azure/devops/organizations/accounts/change-organization-location] to Europe to speed up the downloads. # Push the release candidate branch to your forked personal Flink repository, e.g. {code:bash} tools $ git push refs/heads/release-${RELEASE_VERSION}-rc${RC_NUM}:release-${RELEASE_VERSION}-rc${RC_NUM} {code} # Trigger the Azure Pipelines manually to build the PyFlink wheel packages ## Go to your Azure Pipelines Flink project → Pipelines ## Click the "New pipeline" button on the top right ## Select "GitHub" → your GitHub Flink repository → "Existing Azure Pipelines YAML file" ## Select your branch → Set path to "/azure-pipelines.yaml" → click on "Continue" → click on "Variables" ## Then click "New Variable" button, fill the name with "MODE", and the value with "release". Click "OK" to set the variable and the "Save" button to save the variables, then back on the "Review your pipeline" screen click "Run" to trigger the build. ## You should now see a build where only the "CI build (release)" is running # Download the PyFlink wheel packages from the build result page after the jobs of "build_wheels mac" and "build_wheels linux" have finished. 
## Download the PyFlink wheel packages ### Open the build result page of the pipeline ### Go to the {{Artifacts}} page (build_wheels linux -> 1 artifact) ### Click {{wheel_Darwin_build_wheels mac}} and {{wheel_Linux_build_wheels linux}} separately to download the zip files ## Unzip these two zip files {code:bash} $ cd /path/to/downloaded_wheel_packages $ unzip wheel_Linux_build_wheels\ linux.zip $ unzip wheel_Darwin_build_wheels\ mac.zip{code} ## Create a {{./dist}} directory under {{{}flink-python{}}}: {code:bash} $ cd $ mkdir flink-python/dist{code} ## Move the unzipped wheel packages into {{{}flink-python/dist{}}}: {code:bash} $ mv /path/to/wheel_Darwin_build_wheels\ mac/* flink-python/dist/ $ mv /path/to/wheel_Linux_build_wheels\ linux/* flink-python/dist/ $ cd tools{code} Finally, we create the binary convenience release files: {code:bash} tools $ RELEASE_VERSION=$RELEASE_VERSION releasing/create_binary_release.sh {code} If you want to run this step in parallel on a remote machine you have to make the release commit available there (for example by pushing to a repository). *This is important: the commit inside the binary builds has to match the commit of the source builds and the tagged release commit.* When building remotely, you can skip gpg
[jira] [Created] (FLINK-33888) CLONE - Propose a pull request for website updates
Jing Ge created FLINK-33888: --- Summary: CLONE - Propose a pull request for website updates Key: FLINK-33888 URL: https://issues.apache.org/jira/browse/FLINK-33888 Project: Flink Issue Type: Sub-task Affects Versions: 1.18.0 Reporter: Jing Ge Assignee: Jing Ge The final step of building the candidate is to propose a website pull request containing the following changes: * update docs/data/flink.yml ** Add a new major version or update minor version as required * update docs/data/release_archive.yml * update version references in quickstarts ({{{}q/{}}} directory) as required (outdated?) * add a blog post announcing the release in {{docs/content/posts}} (!) Don’t merge the PRs before finalizing the release. h3. Expectations * Website pull request proposed to list the [release|http://flink.apache.org/downloads.html]
[jira] [Created] (FLINK-33885) Build Release Candidate: 1.18.1-rc2
Jing Ge created FLINK-33885: --- Summary: Build Release Candidate: 1.18.1-rc2 Key: FLINK-33885 URL: https://issues.apache.org/jira/browse/FLINK-33885 Project: Flink Issue Type: New Feature Affects Versions: 1.18.0 Reporter: Jing Ge Assignee: Jing Ge The core of the release process is the build-vote-fix cycle. Each cycle produces one release candidate. The Release Manager repeats this cycle until the community approves one release candidate, which is then finalized. h4. Prerequisites Set up a few environment variables to simplify Maven commands that follow. This identifies the release candidate being built. Start with {{RC_NUM}} equal to 1 and increment it for each candidate: {code} RC_NUM="1" TAG="release-${RELEASE_VERSION}-rc${RC_NUM}" {code}
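The prerequisite setup above can be sketched end to end; the concrete version number below is an illustrative assumption, not a value taken from this release:

```shell
# Illustrative prerequisite variables for a release candidate build.
# RELEASE_VERSION is normally chosen by the release manager; "1.18.1"
# here is only an example value.
RELEASE_VERSION="1.18.1"
RC_NUM="1"
TAG="release-${RELEASE_VERSION}-rc${RC_NUM}"

# Every later step derives its names from these variables, e.g. the tag
# and the staging directory on dist.apache.org:
echo "tag:     ${TAG}"
echo "svn dir: flink-${RELEASE_VERSION}-rc${RC_NUM}"
```

On a second candidate, only `RC_NUM` changes and `TAG` follows automatically.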
[jira] [Created] (FLINK-33889) CLONE - Vote on the release candidate
Jing Ge created FLINK-33889: --- Summary: CLONE - Vote on the release candidate Key: FLINK-33889 URL: https://issues.apache.org/jira/browse/FLINK-33889 Project: Flink Issue Type: Sub-task Affects Versions: 1.18.0 Reporter: Jing Ge Assignee: Jing Ge Once you have built and individually reviewed the release candidate, please share it for the community-wide review. Please review foundation-wide [voting guidelines|http://www.apache.org/foundation/voting.html] for more information. Start the review-and-vote thread on the dev@ mailing list. Here’s an email template; please adjust as you see fit. {quote}From: Release Manager To: d...@flink.apache.org Subject: [VOTE] Release 1.2.3, release candidate #3 Hi everyone, Please review and vote on the release candidate #3 for the version 1.2.3, as follows: [ ] +1, Approve the release [ ] -1, Do not approve the release (please provide specific comments) The complete staging area is available for your review, which includes: * JIRA release notes [1], * the official Apache source release and binary convenience releases to be deployed to dist.apache.org [2], which are signed with the key with fingerprint [3], * all artifacts to be deployed to the Maven Central Repository [4], * source code tag "release-1.2.3-rc3" [5], * website pull request listing the new release and adding announcement blog post [6]. The vote will be open for at least 72 hours. It is adopted by majority approval, with at least 3 PMC affirmative votes. Thanks, Release Manager [1] link [2] link [3] [https://dist.apache.org/repos/dist/release/flink/KEYS] [4] link [5] link [6] link {quote} *If there are any issues found in the release candidate, reply on the vote thread to cancel the vote.* There’s no need to wait 72 hours. Proceed to the Fix Issues step below and address the problem. However, some issues don’t require cancellation. For example, if an issue is found in the website pull request, just correct it on the spot and the vote can continue as-is. 
To cancel a release, the release manager needs to send an email to the release candidate thread, stating that the release candidate is officially cancelled. Next, all artifacts created specifically for the RC in the previous steps need to be removed: * Delete the staging repository in Nexus * Remove the source / binary RC files from dist.apache.org * Delete the source code tag in git *If there are no issues, reply on the vote thread to close the voting.* Then, tally the votes in a separate email. Here’s an email template; please adjust as you see fit. {quote}From: Release Manager To: d...@flink.apache.org Subject: [RESULT] [VOTE] Release 1.2.3, release candidate #3 I'm happy to announce that we have unanimously approved this release. There are XXX approving votes, XXX of which are binding: * approver 1 * approver 2 * approver 3 * approver 4 There are no disapproving votes. Thanks everyone! {quote} h3. Expectations * Community votes to release the proposed candidate, with at least three approving PMC votes Any issues raised until the vote closes should be either resolved or moved into the next release (if applicable).
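The RC cleanup bullets above can be sketched as commands; the version, RC number, and remote name are assumptions, and the destructive commands are only echoed so the sketch is safe to run. Deleting the Nexus staging repository happens in the web UI, so it has no command here.

```shell
# Hedged sketch of the RC cleanup steps; "1.18.1" and rc2 are made-up
# example values.
RELEASE_VERSION="1.18.1"
RC_NUM="2"
RC_DIR="flink-${RELEASE_VERSION}-rc${RC_NUM}"

# Remove the source/binary RC files from dist.apache.org
# (echoed, not executed):
echo svn delete -m "Remove ${RC_DIR}" \
    "https://dist.apache.org/repos/dist/dev/flink/${RC_DIR}"

# Delete the source code tag in git (assumes the apache remote is
# named "origin"; echoed, not executed):
echo git push origin --delete "refs/tags/release-${RELEASE_VERSION}-rc${RC_NUM}"
```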
[jira] [Created] (FLINK-33887) CLONE - Stage source and binary releases on dist.apache.org
Jing Ge created FLINK-33887: --- Summary: CLONE - Stage source and binary releases on dist.apache.org Key: FLINK-33887 URL: https://issues.apache.org/jira/browse/FLINK-33887 Project: Flink Issue Type: Sub-task Reporter: Jing Ge Assignee: Jing Ge Copy the source release to the dev repository of dist.apache.org: # If you have not already, check out the Flink section of the dev repository on dist.apache.org via Subversion. In a fresh directory: {code:bash} $ svn checkout https://dist.apache.org/repos/dist/dev/flink --depth=immediates {code} # Make a directory for the new release and copy all the artifacts (Flink source/binary distributions, hashes, GPG signatures and the python subdirectory) into that newly created directory: {code:bash} $ mkdir flink/flink-${RELEASE_VERSION}-rc${RC_NUM} $ mv /tools/releasing/release/* flink/flink-${RELEASE_VERSION}-rc${RC_NUM} {code} # Add and commit all the files. {code:bash} $ cd flink flink $ svn add flink-${RELEASE_VERSION}-rc${RC_NUM} flink $ svn commit -m "Add flink-${RELEASE_VERSION}-rc${RC_NUM}" {code} # Verify that files are present under [https://dist.apache.org/repos/dist/dev/flink|https://dist.apache.org/repos/dist/dev/flink]. # Push the release tag if not done already (the following command assumes to be called from within the apache/flink checkout): {code:bash} $ git push refs/tags/release-${RELEASE_VERSION}-rc${RC_NUM} {code} h3. Expectations * Maven artifacts deployed to the staging repository of [repository.apache.org|https://repository.apache.org/content/repositories/] * Source distribution deployed to the dev repository of [dist.apache.org|https://dist.apache.org/repos/dist/dev/flink/] * Check hashes (e.g. shasum -c *.sha512) * Check signatures (e.g. {{{}gpg --verify flink-1.2.3-source-release.tar.gz.asc flink-1.2.3-source-release.tar.gz{}}}) * {{grep}} for legal headers in each file. 
* If time allows, check in advance the NOTICE files of the modules whose dependencies have changed in this release, since license issues pop up from time to time during voting. See the "Checking License" section of [Verifying a Flink Release|https://cwiki.apache.org/confluence/display/FLINK/Verifying+a+Flink+Release].
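A minimal sketch of the "{{grep}} for legal headers" check above, run here against a made-up toy tree rather than a real source release:

```shell
# Toy version of the legal-header check: list files that lack the ASF
# license header. Paths and file contents are invented; the real check
# runs over the unpacked source release.
set -eu
cd "$(mktemp -d)"
mkdir -p src
printf 'Licensed to the Apache Software Foundation (ASF)\n// code\n' > src/WithHeader.java
printf '// code only\n' > src/MissingHeader.java

# grep -L prints the names of files that do NOT contain the pattern,
# i.e. exactly the files a reviewer would flag. "|| true" keeps set -e
# happy regardless of grep's exit status.
grep -rL "Licensed to the Apache Software Foundation" src || true
```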
[jira] [Updated] (FLINK-33362) Document Externalized Declarative Resource Management With Chinese
[ https://issues.apache.org/jira/browse/FLINK-33362?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jing Ge updated FLINK-33362: Fix Version/s: 1.18.2 (was: 1.18.1) > Document Externalized Declarative Resource Management With Chinese > -- > > Key: FLINK-33362 > URL: https://issues.apache.org/jira/browse/FLINK-33362 > Project: Flink > Issue Type: Sub-task > Components: Documentation >Affects Versions: 1.18.0, 1.18.1 >Reporter: ConradJam >Priority: Major > Labels: pull-request-available > Fix For: 1.18.2 > > > Document Externalized Declarative Resource Management With Chinese
Re: [PR] [FLINK-33877][streaming] Fix unstable test of CollectSinkFunctionTest.testConfiguredPortIsUsed [flink]
JingGe merged PR #23955: URL: https://github.com/apache/flink/pull/23955 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33611] [flink-protobuf] Support Large Protobuf Schemas [flink]
sharath1709 commented on PR #23937: URL: https://github.com/apache/flink/pull/23937#issuecomment-1863449372 @flinkbot run azure
[jira] [Comment Edited] (FLINK-33365) Missing filter condition in execution plan containing lookup join with mysql jdbc connector
[ https://issues.apache.org/jira/browse/FLINK-33365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17798719#comment-17798719 ] david radley edited comment on FLINK-33365 at 12/19/23 7:36 PM: [~Sergey Nuyanzin] I have pushed up my latest changes including the fix for the OR case. The tests are simpler now, but they are not data driven - one method looks up the test case data. I did not find a way with the existing data to drive the null case. I have tested that case and the where clause locally, but there is no automated test for it. Please review and let me know what more needs to be changed. I appreciate your time on this. I added debug logging, as I think this will speed up future diagnostics in this area. The scan logic has similar debug logging. was (Author: JIRAUSER300523): [~Sergey Nuyanzin] I have pushed up my latest changes including the fix for the OR case. The tests are more simplified but they are not data driven - with one method looking of test case data. I did not find a way with the existing data to drive the null case. I have tested that and the where locally but there is no automated test for it. Please review and see what more needs to be changed? I appreciate your time on this. 
> Missing filter condition in execution plan containing lookup join with mysql > jdbc connector > --- > > Key: FLINK-33365 > URL: https://issues.apache.org/jira/browse/FLINK-33365 > Project: Flink > Issue Type: Bug > Components: Connectors / JDBC >Affects Versions: 1.18.0, 1.17.1 > Environment: Flink 1.17.1 & Flink 1.18.0 with > flink-connector-jdbc-3.1.1-1.17.jar >Reporter: macdoor615 >Assignee: david radley >Priority: Critical > Labels: pull-request-available > Attachments: flink-connector-jdbc-3.0.0-1.16.png, > flink-connector-jdbc-3.1.1-1.17.png > > > create table in flink with sql-client.sh > {code:java} > CREATE TABLE default_catalog.default_database.a ( > ip string, > proctime as proctime() > ) > WITH ( > 'connector' = 'datagen' > );{code} > create table in mysql > {code:java} > create table b ( > ip varchar(20), > type int > ); {code} > > Flink 1.17.1/ 1.18.0 and *flink-connector-jdbc-3.1.1-1.17.jar* > execute in sql-client.sh > {code:java} > explain SELECT * FROM default_catalog.default_database.a left join > bnpmp_mysql_test.gem_tmp.b FOR SYSTEM_TIME AS OF a.proctime on b.type = 0 and > a.ip = b.ip; {code} > get the execution plan > {code:java} > ... > == Optimized Execution Plan == > Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type]) > +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], > lookup=[ip=ip], select=[ip, proctime, ip, CAST(0 AS INTEGER) AS type, CAST(ip > AS VARCHAR(2147483647)) AS ip0]) > +- Calc(select=[ip, PROCTIME() AS proctime]) > +- TableSourceScan(table=[[default_catalog, default_database, a]], > fields=[ip]){code} > > execute the same SQL in sql-client with Flink 1.17.1/ 1.18.0 and > *flink-connector-jdbc-3.0.0-1.16.jar* > get the execution plan > {code:java} > ... 
> == Optimized Execution Plan == > Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type]) > +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], > lookup=[type=0, ip=ip], where=[(type = 0)], select=[ip, proctime, ip, CAST(0 > AS INTEGER) AS type, CAST(ip AS VARCHAR(2147483647)) AS ip0]) > +- Calc(select=[ip, PROCTIME() AS proctime]) > +- TableSourceScan(table=[[default_catalog, default_database, a]], > fields=[ip]) {code} > with flink-connector-jdbc-3.1.1-1.17.jar, the condition is > *lookup=[ip=ip]* > with flink-connector-jdbc-3.0.0-1.16.jar, the condition is > *lookup=[type=0, ip=ip], where=[(type = 0)]* > > In our real-world production environment, this leads to incorrect data output.
[jira] [Commented] (FLINK-33365) Missing filter condition in execution plan containing lookup join with mysql jdbc connector
[ https://issues.apache.org/jira/browse/FLINK-33365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17798719#comment-17798719 ] david radley commented on FLINK-33365: -- [~Sergey Nuyanzin] I have pushed up my latest changes including the fix for the OR case. The tests are simpler now, but they are not data driven - one method looks up the test case data. I did not find a way with the existing data to drive the null case. I have tested that case and the where clause locally, but there is no automated test for it. Please review and let me know what more needs to be changed. I appreciate your time on this. > Missing filter condition in execution plan containing lookup join with mysql > jdbc connector > --- > > Key: FLINK-33365 > URL: https://issues.apache.org/jira/browse/FLINK-33365 > Project: Flink > Issue Type: Bug > Components: Connectors / JDBC >Affects Versions: 1.18.0, 1.17.1 > Environment: Flink 1.17.1 & Flink 1.18.0 with > flink-connector-jdbc-3.1.1-1.17.jar >Reporter: macdoor615 >Assignee: david radley >Priority: Critical > Labels: pull-request-available > Attachments: flink-connector-jdbc-3.0.0-1.16.png, > flink-connector-jdbc-3.1.1-1.17.png > > > create table in flink with sql-client.sh > {code:java} > CREATE TABLE default_catalog.default_database.a ( > ip string, > proctime as proctime() > ) > WITH ( > 'connector' = 'datagen' > );{code} > create table in mysql > {code:java} > create table b ( > ip varchar(20), > type int > ); {code} > > Flink 1.17.1/ 1.18.0 and *flink-connector-jdbc-3.1.1-1.17.jar* > execute in sql-client.sh > {code:java} > explain SELECT * FROM default_catalog.default_database.a left join > bnpmp_mysql_test.gem_tmp.b FOR SYSTEM_TIME AS OF a.proctime on b.type = 0 and > a.ip = b.ip; {code} > get the execution plan > {code:java} > ... 
> == Optimized Execution Plan == > Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type]) > +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], > lookup=[ip=ip], select=[ip, proctime, ip, CAST(0 AS INTEGER) AS type, CAST(ip > AS VARCHAR(2147483647)) AS ip0]) > +- Calc(select=[ip, PROCTIME() AS proctime]) > +- TableSourceScan(table=[[default_catalog, default_database, a]], > fields=[ip]){code} > > execute the same SQL in sql-client with Flink 1.17.1/ 1.18.0 and > *flink-connector-jdbc-3.0.0-1.16.jar* > get the execution plan > {code:java} > ... > == Optimized Execution Plan == > Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type]) > +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], > lookup=[type=0, ip=ip], where=[(type = 0)], select=[ip, proctime, ip, CAST(0 > AS INTEGER) AS type, CAST(ip AS VARCHAR(2147483647)) AS ip0]) > +- Calc(select=[ip, PROCTIME() AS proctime]) > +- TableSourceScan(table=[[default_catalog, default_database, a]], > fields=[ip]) {code} > with flink-connector-jdbc-3.1.1-1.17.jar, the condition is > *lookup=[ip=ip]* > with flink-connector-jdbc-3.0.0-1.16.jar, the condition is > *lookup=[type=0, ip=ip], where=[(type = 0)]* > > In our real-world production environment, this leads to incorrect data output.
Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]
davidradl commented on code in PR #79: URL: https://github.com/apache/flink-connector-jdbc/pull/79#discussion_r1431857437 ## flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSourceITCase.java: ## @@ -356,6 +356,152 @@ void testLookupJoin(Caching caching) { } } +@ParameterizedTest +@EnumSource(Caching.class) +void testLookupJoinWithFilter(Caching caching) { Review Comment: I have added more tests, and made some methods to tidy the code. I have not managed to make it totally data driven. Is this sufficient?
Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]
davidradl commented on code in PR #79: URL: https://github.com/apache/flink-connector-jdbc/pull/79#discussion_r1431856416

## flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSource.java:

```diff
@@ -96,28 +98,70 @@ public JdbcDynamicTableSource(
     public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {
         // JDBC only support non-nested look up keys
         String[] keyNames = new String[context.getKeys().length];
-        for (int i = 0; i < keyNames.length; i++) {
+
+        for (int i = 0; i < context.getKeys().length; i++) {
             int[] innerKeyArr = context.getKeys()[i];
             Preconditions.checkArgument(
                     innerKeyArr.length == 1, "JDBC only support non-nested look up keys");
             keyNames[i] = DataType.getFieldNames(physicalRowDataType).get(innerKeyArr[0]);
         }
+
         final RowType rowType = (RowType) physicalRowDataType.getLogicalType();
+
+        String[] conditions = null;
+
+        if (this.resolvedPredicates != null) {
+            conditions = new String[this.resolvedPredicates.size()];
+            for (int i = 0; i < this.resolvedPredicates.size(); i++) {
+                String resolvedPredicate = this.resolvedPredicates.get(i);
+                String param = this.pushdownParams[i].toString();
+                /*
+                 * This replace seems like it should be using a Flink class to resolve the parameter. It does not
+                 * affect the dialects, as the placeholder comes from JdbcFilterPushdownPreparedStatementVisitor.
+                 *
+                 * Here is what has been considered as alternatives:
+                 *
+                 * We cannot do this the way getScanRuntimeProvider does, because the index we have here is an index
+                 * into the filters, while that code needs an index into the fields. For example, one lookup key and
+                 * one filter would both have an index of 0, which the subsequent code would incorrectly resolve to
+                 * the first field.
+                 * We cannot use the PreparedStatement, as we do not have access to the statement here.
+                 * We cannot use ParameterizedPredicate, as it takes the filter expression as input (e.g. EQUALS(...)),
+                 * not the form we have here; an example would be ('field1' = ?).
+                 */
+                conditions[i] = resolvePredicateParam(resolvedPredicate, param);
```

Review Comment: fixed this

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
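The helper named in the diff is not shown in the hunk; here is a minimal, self-contained sketch of what a `resolvePredicateParam`-style method could look like, assuming (as the code comment implies, and only as an assumption) that it splices the pushed-down parameter into the `?` placeholder produced by `JdbcFilterPushdownPreparedStatementVisitor`:

```java
// Hypothetical sketch only: the real method lives in JdbcDynamicTableSource
// and may differ. It replaces the first '?' placeholder in a resolved
// predicate such as ('field1' = ?) with the literal parameter value.
public class PredicateResolver {
    public static String resolvePredicateParam(String resolvedPredicate, String param) {
        int placeholder = resolvedPredicate.indexOf('?');
        if (placeholder < 0) {
            // No placeholder: the predicate is already fully resolved.
            return resolvedPredicate;
        }
        return resolvedPredicate.substring(0, placeholder)
                + param
                + resolvedPredicate.substring(placeholder + 1);
    }

    public static void main(String[] args) {
        System.out.println(resolvePredicateParam("('field1' = ?)", "91")); // ('field1' = 91)
    }
}
```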
[jira] [Updated] (FLINK-33884) Update Pulsar dependency to 3.0.2 in Pulsar Connector
[ https://issues.apache.org/jira/browse/FLINK-33884?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-33884: --- Labels: pull-request-available (was: ) > Update Pulsar dependency to 3.0.2 in Pulsar Connector > - > > Key: FLINK-33884 > URL: https://issues.apache.org/jira/browse/FLINK-33884 > Project: Flink > Issue Type: Improvement > Components: Connectors / Pulsar >Affects Versions: pulsar-4.0.1 >Reporter: David Christle >Priority: Major > Labels: pull-request-available > > The [3.0.2 > patch|https://pulsar.apache.org/release-notes/versioned/pulsar-3.0.2/] > includes various bug fixes, including a few for the Pulsar client (e.g. > https://github.com/apache/pulsar/pull/21144). Upgrading the > dependency in the connector will pick up these fixes. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] [FLINK-33884] Update Pulsar dependency to 3.0.2 in Pulsar Connector [flink-connector-pulsar]
dchristle opened a new pull request, #72: URL: https://github.com/apache/flink-connector-pulsar/pull/72

## Purpose of the change

Update the Pulsar dependency version to pick up patch fixes.

## Brief change log

- Update `pulsar.version` to `3.0.2` in pom.xml.

## Verifying this change

This change added tests and can be verified as follows:

- Manually verified by running the Pulsar connector on a local Flink cluster. Running this on a production workload did not uncover any issues.

## Significant changes

*(Please check any boxes [x] if the answer is "yes". You can first publish the PR and check them afterwards, for convenience.)*

- [x] Dependencies have been added or upgraded
- [ ] Public API has been changed (Public API is any class annotated with `@Public(Evolving)`)
- [ ] Serializers have been changed
- [ ] New feature has been introduced
  - If yes, how is this documented? (not applicable / docs / JavaDocs / not documented)
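The bump described in the change log is a one-line edit to the connector's Maven properties; a sketch of the relevant pom.xml fragment (the property name `pulsar.version` is taken from the PR description above, the surrounding structure is assumed):

```xml
<!-- flink-connector-pulsar/pom.xml: bump the Pulsar client dependency -->
<properties>
  <pulsar.version>3.0.2</pulsar.version>
</properties>
```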
Re: [PR] [FLINK-33884] Update Pulsar dependency to 3.0.2 in Pulsar Connector [flink-connector-pulsar]
boring-cyborg[bot] commented on PR #72: URL: https://github.com/apache/flink-connector-pulsar/pull/72#issuecomment-1863247716 Thanks for opening this pull request! Please check out our contributing guidelines. (https://flink.apache.org/contributing/how-to-contribute.html)
[jira] [Created] (FLINK-33884) Update Pulsar dependency to 3.0.2 in Pulsar Connector
David Christle created FLINK-33884: -- Summary: Update Pulsar dependency to 3.0.2 in Pulsar Connector Key: FLINK-33884 URL: https://issues.apache.org/jira/browse/FLINK-33884 Project: Flink Issue Type: Improvement Components: Connectors / Pulsar Affects Versions: pulsar-4.0.1 Reporter: David Christle The [3.0.2 patch|https://pulsar.apache.org/release-notes/versioned/pulsar-3.0.2/] includes various bug fixes, including a few for the Pulsar client (e.g. https://github.com/apache/pulsar/pull/21144). Upgrading the dependency in the connector will pick up these fixes.
[jira] [Commented] (FLINK-33486) Pulsar Client Send Timeout Terminates TaskManager
[ https://issues.apache.org/jira/browse/FLINK-33486?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17798684#comment-17798684 ] David Christle commented on FLINK-33486: We also observe this issue, typically at higher QPS. When it triggers, the task will restart, but future checkpoints fail if the tolerable checkpoint failure count is greater than zero. Setting the tolerable checkpoint failure count to zero triggers a more complete restart, which fixes the issue, but it means publishing is paused for a few minutes, which is not ideal. At least for the best-effort/at-least-once delivery modes, is there some way to implement a retry when the send timeout triggers? This way, we'd potentially publish a single message/batch twice, rather than triggering a full failure and republishing all messages since the last checkpoint. > Pulsar Client Send Timeout Terminates TaskManager > - > > Key: FLINK-33486 > URL: https://issues.apache.org/jira/browse/FLINK-33486 > Project: Flink > Issue Type: Bug > Components: Connectors / Pulsar >Affects Versions: 1.17.1 >Reporter: Jason Kania >Priority: Major > > Currently, when the Pulsar Producer encounters a timeout when attempting to > send data, it generates an unhandled TimeoutException. This is not a > reasonable way to handle the timeout. The situation should be handled > gracefully, either through additional parameters that put control of the > action at the discretion of the user or through some callback mechanism > that the user can work with to write code. Unfortunately, right now, this > causes a termination of the task manager, which then leads to other issues. > Increasing the timeout period to avoid the issue is not really an option to > ensure proper handling in the event that the situation does occur.
> The exception is as follows: > org.apache.flink.util.FlinkRuntimeException: Failed to send data to Pulsar: > persistent://public/default/myproducer-partition-0 > at > org.apache.flink.connector.pulsar.sink.writer.PulsarWriter.throwSendingException(PulsarWriter.java:182) > ~[flink-connector-pulsar-4.0.0-1.17.jar:4.0.0-1.17] > at > org.apache.flink.connector.pulsar.sink.writer.PulsarWriter.lambda$write$0(PulsarWriter.java:172) > ~[flink-connector-pulsar-4.0.0-1.17.jar:4.0.0-1.17] > at > org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) > ~[flink-dist-1.17.1.jar:1.17.1] > at > org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) > ~[flink-dist-1.17.1.jar:1.17.1] > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMail(MailboxProcessor.java:398) > ~[flink-dist-1.17.1.jar:1.17.1] > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:367) > ~[flink-dist-1.17.1.jar:1.17.1] > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:352) > ~[flink-dist-1.17.1.jar:1.17.1] > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:229) > ~[flink-dist-1.17.1.jar:1.17.1] > at > org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:839) > ~[flink-dist-1.17.1.jar:1.17.1] > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:788) > ~[flink-dist-1.17.1.jar:1.17.1] > at > org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952) > ~[flink-dist-1.17.1.jar:1.17.1] > at > org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931) > [flink-dist-1.17.1.jar:1.17.1] > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745) > [flink-dist-1.17.1.jar:1.17.1] > at 
org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) > [flink-dist-1.17.1.jar:1.17.1] > at java.lang.Thread.run(Thread.java:829) [?:?] > Caused by: > org.apache.pulsar.client.api.PulsarClientException$TimeoutException: The > producer myproducer- f4b1580b-1ea8-4c21-9d0b-da4d12ca6f93 can not send > message to the topic persistent://public/default/myproducer-partition-0 > within given timeout > at > org.apache.pulsar.client.impl.ProducerImpl.run(ProducerImpl.java:1993) > ~[pulsar-client-all-2.11.2.jar:2.11.2] > at > org.apache.pulsar.shade.io.netty.util.HashedWheelTimer$HashedWheelTimeout.run(HashedWheelTimer.java:715) > ~[pulsar-client-all-2.11.2.jar:2.11.2] > at > org.apache.pulsar.shade.io.netty.util.concurrent.ImmediateExecutor.execute(ImmediateExecutor.java:34) > ~[pulsar-client-all-2.11.2.jar:2.11.2] > at >
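The retry approach floated in the comment above could look roughly like the following. This is an illustrative sketch, not Pulsar connector code: the `Supplier` stands in for something like `producer::sendAsync`, and the bounded retry re-submits only the timed-out message/batch instead of failing the whole task:

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

public class RetryingSend {
    // Retries an async send up to retriesLeft additional times; only the
    // failed message is re-submitted, not everything since the last checkpoint.
    public static <T> CompletableFuture<T> sendWithRetry(
            Supplier<CompletableFuture<T>> send, int retriesLeft) {
        return send.get()
                .handle((result, error) -> {
                    if (error == null) {
                        return CompletableFuture.completedFuture(result);
                    }
                    if (retriesLeft > 0) {
                        // Re-submit just this send; under at-least-once
                        // semantics a duplicate delivery is acceptable.
                        return sendWithRetry(send, retriesLeft - 1);
                    }
                    // Retries exhausted: propagate the original failure.
                    CompletableFuture<T> failed = new CompletableFuture<T>();
                    failed.completeExceptionally(error);
                    return failed;
                })
                .thenCompose(next -> next);
    }
}
```

Note the trade-off the comment describes: a retried batch may be delivered twice, which is fine for at-least-once delivery but not for exactly-once.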
Re: [PR] [FLINK-33883][jdbc] Bump CI flink version on flink-connector-jdbc to support flink 1.19 [flink-connector-jdbc]
snuyanzin commented on code in PR #85: URL: https://github.com/apache/flink-connector-jdbc/pull/85#discussion_r1431724158

## flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcTablePlanTest.java:

```diff
@@ -64,4 +69,14 @@ public void testFilterPushdown() {
         util.verifyExecPlan(
                 "SELECT id, time_col, real_col FROM jdbc WHERE id = 91 AND time_col <> TIME '11:11:11' OR double_col >= -1000.23");
     }
+
+    // A workaround to get the test method name for flink versions not completely migrated to JUnit5
+    public TestName name() {
```

Review Comment: Does this mean that once support for Flink 1.18 is dropped, this method could be removed?
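To make the workaround under discussion concrete, here is a self-contained sketch. `TestName` is stubbed locally so the snippet compiles without JUnit 4 on the classpath (in the real test it is `org.junit.rules.TestName`, registered as a `@Rule`), and the stored method name is a placeholder:

```java
public class TestNameShim {
    // Local stand-in for org.junit.rules.TestName, which in JUnit 4 is a
    // @Rule that captures the currently running test method's name.
    static class TestName {
        private final String methodName;
        TestName(String methodName) {
            this.methodName = methodName;
        }
        public String getMethodName() {
            return methodName;
        }
    }

    private final TestName name = new TestName("testFilterPushdown");

    // The workaround accessor from the diff: base classes that predate the
    // JUnit 5 migration call name() to learn the current test method. Once
    // Flink 1.18 support (and its JUnit 4 leftovers) is dropped, both the
    // rule and this accessor can be deleted.
    public TestName name() {
        return name;
    }
}
```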