[jira] [Commented] (FLINK-29634) Support periodic checkpoint triggering
[ https://issues.apache.org/jira/browse/FLINK-29634?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17653846#comment-17653846 ] Gyula Fora commented on FLINK-29634: Yea, the Mock classes can be tricky [~Jiale] I don't really have a good answer yet, have to think about this. > Support periodic checkpoint triggering > -- > > Key: FLINK-29634 > URL: https://issues.apache.org/jira/browse/FLINK-29634 > Project: Flink > Issue Type: New Feature > Components: Kubernetes Operator >Reporter: Thomas Weise >Assignee: Jiale Tan >Priority: Major > > Similar to the support for periodic savepoints, the operator should support > triggering periodic checkpoints to break the incremental checkpoint chain. > Support for external triggering will come with 1.17: > https://issues.apache.org/jira/browse/FLINK-27101 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink-connector-pulsar] tisonkun commented on a diff in pull request #10: [FLINK-30413][Connector/Pulsar] Drop subscription support, remove unordered consuming, change related tests.
tisonkun commented on code in PR #10: URL: https://github.com/apache/flink-connector-pulsar/pull/10#discussion_r1060341436 ## flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/config/SourceConfiguration.java: ## @@ -203,12 +175,7 @@ public boolean isEnableMetrics() { /** Convert the subscription into a readable str. */ public String getSubscriptionDesc() { -return getSubscriptionName() -+ "(" -+ getSubscriptionType() -+ "," -+ getSubscriptionMode() -+ ")"; +return getSubscriptionName() + "(" + getSubscriptionMode() + ")"; Review Comment: Maybe we hard code the type as `Exclusive`? I think it's still expressive and meaningful. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
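The reviewer's suggestion amounts to hard coding the type into the description string. A minimal sketch of what that could look like (the class and field names here are hypothetical stand-ins; only `getSubscriptionDesc` mirrors the diff):

```java
// Hypothetical sketch of the review suggestion: with Shared/Key_Shared
// dropped, the subscription type is always Exclusive, so it can be stated
// literally in the description while the mode remains dynamic.
public class SourceConfigurationSketch {
    private final String subscriptionName;
    private final String subscriptionMode; // e.g. "Durable" or "NonDurable"

    public SourceConfigurationSketch(String subscriptionName, String subscriptionMode) {
        this.subscriptionName = subscriptionName;
        this.subscriptionMode = subscriptionMode;
    }

    /** Convert the subscription into a readable string. */
    public String getSubscriptionDesc() {
        return subscriptionName + "(Exclusive," + subscriptionMode + ")";
    }

    public static void main(String[] args) {
        SourceConfigurationSketch conf =
                new SourceConfigurationSketch("flink-sub", "Durable");
        System.out.println(conf.getSubscriptionDesc()); // flink-sub(Exclusive,Durable)
    }
}
```

This keeps the description as expressive as before while no longer reading a type that can only ever have one value.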
[GitHub] [flink-connector-pulsar] tisonkun commented on a diff in pull request #10: [FLINK-30413][Connector/Pulsar] Drop subscription support, remove unordered consuming, change related tests.
tisonkun commented on code in PR #10: URL: https://github.com/apache/flink-connector-pulsar/pull/10#discussion_r1060340966 ## flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceOptions.java: ## @@ -124,6 +124,8 @@ private PulsarSourceOptions() { " We would automatically commit the cursor using the given period (in ms).") .build()); +/** @deprecated We no longer need transitions for consuming messages. */ Review Comment: ```suggestion /** @deprecated We no longer need transactions for consuming messages. */ ``` ? Also, if this option has no effect, we'd better directly remove it. ## flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceOptions.java: ## @@ -236,6 +230,8 @@ private PulsarSourceOptions() { " This argument is required when constructing the consumer.") .build()); +/** @deprecated This config option is no longer supported. */ +@Deprecated public static final ConfigOption PULSAR_SUBSCRIPTION_TYPE = Review Comment: ditto
[jira] [Updated] (FLINK-30413) Drop Shared and Key_Shared subscription support in Pulsar connector
[ https://issues.apache.org/jira/browse/FLINK-30413?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-30413: --- Labels: pull-request-available (was: ) > Drop Shared and Key_Shared subscription support in Pulsar connector > --- > > Key: FLINK-30413 > URL: https://issues.apache.org/jira/browse/FLINK-30413 > Project: Flink > Issue Type: Improvement > Components: Connectors / Pulsar >Affects Versions: 1.17.0 >Reporter: Yufan Sheng >Assignee: Yufan Sheng >Priority: Critical > Labels: pull-request-available > Fix For: pulsar-4.0.0 > > > Many Pulsar connector test-instability issues are related to the {{Shared}} and > {{Key_Shared}} subscriptions, because these two subscription types are designed to > consume records in an unordered way and allow multiple consumers on the same topic > partition. This feature leads to several drawbacks in the connector. > 1. Performance > Flink is a true stream processor with strong correctness guarantees. Supporting > multiple consumers on a partition requires extra correctness machinery that depends > on Pulsar transactions. But the internal implementation of Pulsar transactions on the > source side records messages one by one and stores all pending ack status on the > client side, which is slow and memory inefficient. > This means {{Shared}} and {{Key_Shared}} can only be used with Flink at low > throughput, which defeats the purpose of supporting these two subscriptions, since > adding multiple consumers to the same partition is meant to increase consuming speed. > 2. Stability > Pulsar transactions acknowledge messages one by one in an internal Pulsar topic, but > this is not yet stable enough. Many pending issues in the Flink JIRA are related to > Pulsar transactions, and we don't have any workaround. > 3. Complexity > Supporting the {{Shared}} and {{Key_Shared}} subscriptions makes the connector's code > more complex than expected: every part of the code has to handle both ordered and > unordered consumption, which is hard for maintainers to understand. > 4. Necessity > The current implementation of {{Shared}} and {{Key_Shared}} is unusable in a > production environment, and for users this functionality is not necessary: there is > no bottleneck in consuming data from Pulsar; the bottleneck is in processing the > data, which can be addressed by increasing the parallelism of the processing > operators.
[GitHub] [flink-connector-pulsar] tisonkun commented on a diff in pull request #10: [FLINK-30413][Connector/Pulsar] Drop subscription support, remove unordered consuming, change related tests.
tisonkun commented on code in PR #10: URL: https://github.com/apache/flink-connector-pulsar/pull/10#discussion_r1060340290 ## flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilder.java: ## @@ -177,9 +171,11 @@ public PulsarSourceBuilder setSubscriptionName(String subscriptionName) { * @return this PulsarSourceBuilder. * @see https://pulsar.apache.org/docs/en/concepts-messaging/#subscriptions;>Pulsar * Subscriptions + * @deprecated We don't support setting the subscription type now. */ +@Deprecated public PulsarSourceBuilder setSubscriptionType(SubscriptionType subscriptionType) { Review Comment: If it's a dummy method, shall we remove these methods? Otherwise, although users' code doesn't fail on compile, the result is actually unexpected.
[GitHub] [flink-ml] jiangxin369 commented on a diff in pull request #191: [FLINK-30401] Add Estimator and Transformer for MinHashLSH
jiangxin369 commented on code in PR #191: URL: https://github.com/apache/flink-ml/pull/191#discussion_r1060307196 ## flink-ml-python/pyflink/ml/lib/feature/lsh.py: ## @@ -0,0 +1,191 @@ + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import typing +from abc import ABC +from pyflink.java_gateway import get_gateway +from pyflink.table import Table +from pyflink.util.java_utils import to_jarray + +from pyflink.ml.core.linalg import Vector, DenseVector, SparseVector +from pyflink.ml.core.param import Param, IntParam, ParamValidators +from pyflink.ml.core.wrapper import JavaWithParams +from pyflink.ml.lib.feature.common import JavaFeatureEstimator, JavaFeatureModel +from pyflink.ml.lib.param import HasInputCol, HasOutputCol, HasSeed + + +class _LSHModelParams(JavaWithParams, + HasInputCol, + HasOutputCol): +""" +Params for :class:`LSHModel` +""" + +def __init__(self, java_params): +super(_LSHModelParams, self).__init__(java_params) + + +class _LSHParams(_LSHModelParams): +""" +Params for :class:`LSH` +""" + +NUM_HASH_TABLES: Param[int] = IntParam( +"num_hash_tables", "Number of hash tables.", 1, ParamValidators.gt_eq(1) +) + +NUM_HASH_FUNCTIONS_PER_TABLE: Param[int] = IntParam( +"num_hash_functions_per_table", +"Number of hash 
functions per table.", +1, +ParamValidators.gt_eq(1.)) + +def __init__(self, java_params): +super(_LSHParams, self).__init__(java_params) + +def set_num_hash_tables(self, value: int): +return typing.cast(_LSHParams, self.set(self.NUM_HASH_TABLES, value)) + +def get_num_hash_tables(self): +return self.get(self.NUM_HASH_TABLES) + +@property +def num_hash_tables(self): +return self.get_num_hash_tables() + +def set_num_hash_functions_per_table(self, value: int): +return typing.cast(_LSHParams, self.set(self.NUM_HASH_FUNCTIONS_PER_TABLE, value)) + +def get_num_hash_functions_per_table(self): +return self.get(self.NUM_HASH_FUNCTIONS_PER_TABLE) + +@property +def num_hash_functions_per_table(self): +return self.get_num_hash_functions_per_table() + + +class _LSH(JavaFeatureEstimator, ABC): +""" +Base class for estimators which implement LSH (Locality-sensitive hashing) algorithms. +""" + +def __init__(self): +super(_LSH, self).__init__() + +@classmethod +def _java_estimator_package_name(cls) -> str: +return "lsh" + + +class _LSHModel(JavaFeatureModel, ABC): +""" +Base class for LSH model. +""" + +def __init__(self, java_model): +super(_LSHModel, self).__init__(java_model) + +@classmethod +def _java_model_package_name(cls) -> str: +return "lsh" + +def approx_nearest_neighbors(self, dataset: Table, key: Vector, k: int, + dist_col: str = 'distCol'): +""" +Given a dataset and an item, approximately find at most k items which have the closest +distance to the item. If the `outputCol` is missing in the given dataset, this method +transforms the dataset with the model at first. + +:param dataset: The dataset in which to to search for nearest neighbors. +:param key: The item to search for. +:param k: The maximum number of nearest neighbors. +:param dist_col: The output column storing the distance between each neighbor and the +key. +:return: A dataset containing at most k items closest to the key with a column named +`distCol` appended. 
Review Comment: To keep the same coding style with PyFlink and flink-ml-python, it's recommended to align the beginning of each line. E.g., ``` :param dist_col: The output column storing the distance between each neighbor and the key. :return: A dataset containing at most k items closest to the key with a column named `distCol` appended. ``` So as the other
[GitHub] [flink-ml] jiangxin369 commented on a diff in pull request #191: [FLINK-30401] Add Estimator and Transformer for MinHashLSH
jiangxin369 commented on code in PR #191: URL: https://github.com/apache/flink-ml/pull/191#discussion_r1060261023 ## flink-ml-examples/src/main/java/org/apache/flink/ml/examples/feature/MinHashLSHExample.java: ## @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.ml.examples.feature; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.ml.feature.lsh.MinHashLSH; +import org.apache.flink.ml.feature.lsh.MinHashLSHModel; +import org.apache.flink.ml.linalg.DenseVector; +import org.apache.flink.ml.linalg.SparseVector; +import org.apache.flink.ml.linalg.Vector; +import org.apache.flink.ml.linalg.Vectors; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.types.Row; + +import org.apache.commons.collections.IteratorUtils; + +import java.util.Arrays; +import java.util.List; + +import static org.apache.flink.table.api.Expressions.$; + +/** + * Simple program that trains a MinHashLSH model and uses it for approximate nearest neighbors and + * similarity join. + */ +public class MinHashLSHExample { +public static void main(String[] args) throws Exception { + +// Creates a new StreamExecutionEnvironment +StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + +// Creates a StreamTableEnvironment +StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); + +// Generates two datasets +Table data = +tEnv.fromDataStream( +env.fromCollection( +Arrays.asList( +Row.of( +0, +Vectors.sparse( +6, +new int[] {0, 1, 2}, +new double[] {1., 1., 1.})), +Row.of( +1, +Vectors.sparse( +6, +new int[] {2, 3, 4}, +new double[] {1., 1., 1.})), +Row.of( +2, +Vectors.sparse( +6, +new int[] {0, 2, 4}, +new double[] {1., 1., 1.}))), +Types.ROW_NAMED( +new String[] {"id", "vec"}, +Types.INT, + TypeInformation.of(SparseVector.class; + +Table dataB = Review Comment: As the examples are expected to be the best practice for users, the variable naming should be more strict. 
Would it be better to rename `data` to `inputTable` and `dataB` to `similarityJoinTable`? `dataA` and `dataB` are also acceptable. The same applies to the Python example.
[GitHub] [flink-ml] jiangxin369 commented on a diff in pull request #191: [FLINK-30401] Add Estimator and Transformer for MinHashLSH
jiangxin369 commented on code in PR #191: URL: https://github.com/apache/flink-ml/pull/191#discussion_r1060256575 ## docs/content/docs/operators/feature/minhashlsh.md: ## @@ -0,0 +1,280 @@ +--- +title: "MinHash LSH" +weight: 1 +type: docs +aliases: +- /operators/feature/minhashlsh.html +--- + + + +## MinHash LSH + +MinHash LSH is a Locality Sensitive Hashing (LSH) scheme for Jaccard distance metric. +The input features are sets of natural numbers represented as non-zero indices of vectors, +either dense vectors or sparse vectors. Typically, sparse vectors are more efficient. + +In addition to transforming input feature vectors to multiple hash values, the MinHash LSH +model also supports approximate nearest neighbors search within a dataset regarding a key +vector and approximate similarity join between two datasets. + +### Input Columns + +| Param name | Type | Default | Description| +|:---|:---|:--|:---| +| inputCol | Vector | `"input"` | Features to be mapped. | + +### Output Columns + +| Param name | Type | Default| Description | +|:---|:--|:---|:-| +| outputCol | DenseVector[] | `"output"` | Hash values. | + +### Parameters Review Comment: Since `MinHashLSH` has some parameters that cannot be set to `MinHashLSHModel`, let's separate the parameters into two tables just like in `vectorindexer.md`. ## flink-ml-python/pyflink/ml/lib/feature/lsh.py: ## @@ -0,0 +1,191 @@ + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import typing +from abc import ABC +from pyflink.java_gateway import get_gateway +from pyflink.table import Table +from pyflink.util.java_utils import to_jarray + +from pyflink.ml.core.linalg import Vector, DenseVector, SparseVector +from pyflink.ml.core.param import Param, IntParam, ParamValidators +from pyflink.ml.core.wrapper import JavaWithParams +from pyflink.ml.lib.feature.common import JavaFeatureEstimator, JavaFeatureModel +from pyflink.ml.lib.param import HasInputCol, HasOutputCol, HasSeed + + +class _LSHModelParams(JavaWithParams, + HasInputCol, + HasOutputCol): +""" +Params for :class:`LSHModel` +""" + +def __init__(self, java_params): +super(_LSHModelParams, self).__init__(java_params) + + +class _LSHParams(_LSHModelParams): +""" +Params for :class:`LSH` +""" + +NUM_HASH_TABLES: Param[int] = IntParam( +"num_hash_tables", "Number of hash tables.", 1, ParamValidators.gt_eq(1) +) + +NUM_HASH_FUNCTIONS_PER_TABLE: Param[int] = IntParam( +"num_hash_functions_per_table", +"Number of hash functions per table.", +1, +ParamValidators.gt_eq(1.)) Review Comment: nit: let's unify the parameter of `gt_eq` in `NUM_HASH_TABLES` and `NUM_HASH_FUNCTIONS_PER_TABLE` to either 1. or 1. ## flink-ml-examples/src/main/java/org/apache/flink/ml/examples/feature/MinHashLSHExample.java: ## @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.examples.feature; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types;
[jira] [Commented] (FLINK-30544) Speed up finding minimum watermark across all channels by introducing heap-based algorithm
[ https://issues.apache.org/jira/browse/FLINK-30544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17653836#comment-17653836 ] XiangQianLiu commented on FLINK-30544: -- Nice proposal. > Speed up finding minimum watermark across all channels by introducing > heap-based algorithm > -- > > Key: FLINK-30544 > URL: https://issues.apache.org/jira/browse/FLINK-30544 > Project: Flink > Issue Type: Improvement > Components: Runtime / Task >Reporter: Lijie Wang >Assignee: Lijie Wang >Priority: Major > Fix For: 1.17.0 > > > Currently, every time a task receives a watermark, it tries to update the > minimum watermark. We use a traversal algorithm to find the minimum watermark > across all channels (see > [StatusWatermarkValve#findAndOutputNewMinWatermarkAcrossAlignedChannels|https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/watermarkstatus/StatusWatermarkValve.java#:~:text=private%20void-,findAndOutputNewMinWatermarkAcrossAlignedChannels,-(DataOutput%3C%3F%3E] > for details), whose time complexity is O(N), where N is the number of > channels. > We can optimize this by introducing a heap-based algorithm, reducing the time > complexity to O(log(N)).
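The heap-based idea can be sketched as below (an illustrative sketch only, not Flink's actual StatusWatermarkValve code; the class name is made up). Because each channel's watermark is monotonically increasing, a lazy-deletion min-heap works: every update pushes a new (watermark, channel) entry in O(log N), and stale entries are discarded only when they surface at the top.

```java
import java.util.Arrays;
import java.util.PriorityQueue;

// Illustrative sketch (hypothetical class, not Flink's StatusWatermarkValve):
// track the minimum watermark across N channels with a min-heap of
// (watermark, channel) entries plus lazy deletion of stale entries.
public class MinWatermarkTracker {
    private final long[] current;             // latest watermark per channel
    private final PriorityQueue<long[]> heap; // entries: {watermark, channel}

    public MinWatermarkTracker(int numChannels) {
        current = new long[numChannels];
        Arrays.fill(current, Long.MIN_VALUE);
        heap = new PriorityQueue<>((a, b) -> Long.compare(a[0], b[0]));
        for (int c = 0; c < numChannels; c++) {
            heap.add(new long[] {Long.MIN_VALUE, c});
        }
    }

    /** Records a new watermark for one channel: one O(log N) heap insert. */
    public void onWatermark(int channel, long watermark) {
        if (watermark > current[channel]) { // watermarks only move forward
            current[channel] = watermark;
            heap.add(new long[] {watermark, channel});
        }
    }

    /** Returns the minimum watermark, discarding stale heap entries lazily. */
    public long minWatermark() {
        while (true) {
            long[] top = heap.peek();
            if (top[0] == current[(int) top[1]]) {
                return top[0]; // heap minimum is still that channel's latest
            }
            heap.poll(); // stale: the channel has already advanced past it
        }
    }
}
```

Compared with the O(N) scan in `findAndOutputNewMinWatermarkAcrossAlignedChannels`, each update costs amortized O(log N); a real implementation would also need to handle idle channels and watermark statuses, which this sketch omits.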
[GitHub] [flink] xintongsong commented on a diff in pull request #21565: [FLINK-18229][ResourceManager] Support cancel pending workers if no longer needed.
xintongsong commented on code in PR #21565: URL: https://github.com/apache/flink/pull/21565#discussion_r1060267951 ## flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManager.java: ## @@ -526,6 +528,40 @@ public Collection getPendingTaskManagerSlots() { return pendingSlots.values(); } +/** + * remove unused pending task manager slots. + * + * @param unUsedResourceCounter the count of unused resources. + */ +public void removePendingTaskManagerSlots(ResourceCounter unUsedResourceCounter) { +Iterator> pendingSlotIterator = +pendingSlots.entrySet().iterator(); +while (pendingSlotIterator.hasNext()) { +Map.Entry pendingTaskManagerSlotEntry = +pendingSlotIterator.next(); +PendingTaskManagerSlot pendingTaskManagerSlot = pendingTaskManagerSlotEntry.getValue(); +ResourceProfile resourceProfile = pendingTaskManagerSlot.getResourceProfile(); +if (unUsedResourceCounter.getResourceCount(resourceProfile) > 0) { Review Comment: For `DeclarativeSlotManager`, fine-grained resource management is unsupported, and the resource profiles for pending slots should always be `defaultSlotResourceProfile`. (Constructor of `PendingTaskManagerSlot` is always called with `defaultSlotResourceProfile` as the argument.) Based on this assumption, we can add a `checkState` for protection and simplify this method.
## flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedTaskManagerTracker.java: ## @@ -82,12 +83,26 @@ public void replaceAllPendingAllocations( LOG.trace("Record the pending allocations {}.", pendingSlotAllocations); pendingSlotAllocationRecords.clear(); pendingSlotAllocationRecords.putAll(pendingSlotAllocations); +Set unusedPendingTaskManager = +pendingTaskManagers.keySet().stream() +.filter(id -> !pendingSlotAllocationRecords.containsKey(id)) +.collect(Collectors.toSet()); +for (PendingTaskManagerId pendingTaskManagerId : unusedPendingTaskManager) { +removePendingTaskManager(pendingTaskManagerId); +} } @Override public void clearPendingAllocationsOfJob(JobID jobId) { LOG.info("Clear all pending allocations for job {}.", jobId); pendingSlotAllocationRecords.values().forEach(allocation -> allocation.remove(jobId)); +Set unusedPendingTaskManager = +pendingTaskManagers.keySet().stream() +.filter(id -> !pendingSlotAllocationRecords.containsKey(id)) +.collect(Collectors.toSet()); +for (PendingTaskManagerId pendingTaskManagerId : unusedPendingTaskManager) { +removePendingTaskManager(pendingTaskManagerId); +} Review Comment: These can be deduplicated as `removeUnusedPendingTaskManagers()` and reused in `replaceAllPendingAllocations`. Even `clearAllPendingTaskManager` can be replaced. 
## flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManager.java: ## @@ -384,18 +422,59 @@ private int releaseUnWantedResources( return needReleaseWorkerNumber; } +private int releaseResources(Collection resourceIDS, int needReleaseWorkerNumber) { Review Comment: ```suggestion private int releaseResources(Collection resourceIds, int needReleaseWorkerNumber) { ``` ## flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManager.java: ## @@ -526,6 +528,40 @@ public Collection getPendingTaskManagerSlots() { return pendingSlots.values(); } +/** + * remove unused pending task manager slots. + * + * @param unUsedResourceCounter the count of unused resources. + */ +public void removePendingTaskManagerSlots(ResourceCounter unUsedResourceCounter) { +Iterator> pendingSlotIterator = +pendingSlots.entrySet().iterator(); +while (pendingSlotIterator.hasNext()) { +Map.Entry pendingTaskManagerSlotEntry = +pendingSlotIterator.next(); +PendingTaskManagerSlot pendingTaskManagerSlot = pendingTaskManagerSlotEntry.getValue(); +ResourceProfile resourceProfile = pendingTaskManagerSlot.getResourceProfile(); +if (unUsedResourceCounter.getResourceCount(resourceProfile) > 0) { +pendingSlotIterator.remove(); +unUsedResourceCounter = unUsedResourceCounter.subtract(resourceProfile, 1); +} +} + +if (resourceAllocator.isSupported()) { +declareNeededResourcesWithDelay(); +} +} + +/** clear all pending task manager slots. */ +public void
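The deduplication suggested in the review above could look roughly like the following (a hypothetical sketch with simplified types; the real tracker works with `PendingTaskManagerId` keys and per-job allocation records): both public methods finish by calling one shared helper that drops every pending task manager no pending allocation still refers to.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of the suggested refactoring (simplified types):
// replaceAllPendingAllocations and clearPendingAllocationsOfJob both end by
// removing pending task managers that back no pending allocation, so the
// cleanup can live in one shared helper instead of being duplicated.
public class PendingTrackerSketch {
    private final Map<String, String> pendingTaskManagers = new HashMap<>();
    private final Set<String> pendingSlotAllocationRecords = new HashSet<>();

    public void addPendingTaskManager(String id, String spec) {
        pendingTaskManagers.put(id, spec);
    }

    /** Replaces all pending allocations, then drops newly unused task managers. */
    public void replaceAllPendingAllocations(Set<String> allocations) {
        pendingSlotAllocationRecords.clear();
        pendingSlotAllocationRecords.addAll(allocations);
        removeUnusedPendingTaskManagers();
    }

    /** Clears all pending allocations, then drops newly unused task managers. */
    public void clearPendingAllocations() {
        pendingSlotAllocationRecords.clear();
        removeUnusedPendingTaskManagers();
    }

    /** Shared cleanup: removes every pending task manager with no allocation. */
    private void removeUnusedPendingTaskManagers() {
        pendingTaskManagers.keySet()
                .removeIf(id -> !pendingSlotAllocationRecords.contains(id));
    }

    public int pendingCount() {
        return pendingTaskManagers.size();
    }
}
```

The shared helper keeps the two call sites in sync and makes the "unused" criterion explicit in one place.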
[jira] [Created] (FLINK-30545) The SchemaManager doesn't check 'NOT NULL' specification when committing AddColumn change
yuzelin created FLINK-30545: --- Summary: The SchemaManager doesn't check 'NOT NULL' specification when committing AddColumn change Key: FLINK-30545 URL: https://issues.apache.org/jira/browse/FLINK-30545 Project: Flink Issue Type: Bug Components: Table Store Reporter: yuzelin Fix For: table-store-0.4.0 Currently, the table store doesn't support adding a column with a 'NOT NULL' specification, but it doesn't check this condition.
[GitHub] [flink-ml] yunfengzhou-hub commented on pull request #189: [FLINK-30348] Refine Transformer for RandomSplitter
yunfengzhou-hub commented on PR #189: URL: https://github.com/apache/flink-ml/pull/189#issuecomment-1369450594 Hi @weibozhao, thanks for the update. Could you please modify the description section of this PR and its corresponding Jira ticket, explaining the changes made in this PR and why we should add them to Flink ML?
[jira] [Comment Edited] (FLINK-30539) YARNSessionCapacitySchedulerITCase.testDetachedPerJobYarnCluster and testDetachedPerJobYarnClusterWithStreamingJob timing out
[ https://issues.apache.org/jira/browse/FLINK-30539?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17653606#comment-17653606 ] Matthias Pohl edited comment on FLINK-30539 at 1/3/23 7:07 AM: --- -Ok, I assume you meant waiting with investigating the issue which makes sense. I went ahead and created the PRs for removing the timeout, though.- reevaluating my comment: I missed going over [your comment in FLINK-24169|https://issues.apache.org/jira/browse/FLINK-24169?focusedCommentId=17653560=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17653560] initially. Considering that you want to tackle the relative path issue for YARN tests in [PR #21128|https://github.com/apache/flink/pull/21128] that covers the hadoop update, it makes sense to wait with investigating this issue. Thanks for that. Merging the PRs for removing the timeouts makes sense, anyway. was (Author: mapohl): Ok, I assume you meant waiting with investigating the issue which makes sense. I went ahead and created the PRs for removing the timeout, though. > YARNSessionCapacitySchedulerITCase.testDetachedPerJobYarnCluster and > testDetachedPerJobYarnClusterWithStreamingJob timing out > - > > Key: FLINK-30539 > URL: https://issues.apache.org/jira/browse/FLINK-30539 > Project: Flink > Issue Type: Bug > Components: Deployment / YARN >Affects Versions: 1.16.0 >Reporter: Matthias Pohl >Priority: Major > Labels: pull-request-available, test-stability > > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=44337=logs=298e20ef-7951-5965-0e79-ea664ddc435e=d4c90338-c843-57b0-3232-10ae74f00347=32023 > In order to help investigating the issue: Both tests failed because they are > running into the 60s timeout that was defined for each of them. We should get > rid of the timeout to access the thread dump. It might be related to > FLINK-24169
[jira] [Commented] (FLINK-30343) Migrate KubernetesLeaderElectionAndRetrievalITCase
[ https://issues.apache.org/jira/browse/FLINK-30343?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17653825#comment-17653825 ] Wencong Liu commented on FLINK-30343: - It makes sense. Let's follow the progress of FLINK-26522. > Migrate KubernetesLeaderElectionAndRetrievalITCase > -- > > Key: FLINK-30343 > URL: https://issues.apache.org/jira/browse/FLINK-30343 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Coordination >Affects Versions: 1.16.0, 1.17.0 >Reporter: Matthias Pohl >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] link3280 commented on pull request #21581: [FLINK-30538][SQL gateway/client] Improve error handling of stop job operation
link3280 commented on PR #21581: URL: https://github.com/apache/flink/pull/21581#issuecomment-1369443823 Please kindly take a look @fsk119 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-27518) Refactor migration tests to support version update automatically
[ https://issues.apache.org/jira/browse/FLINK-27518?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17653822#comment-17653822 ] Yun Gao commented on FLINK-27518: - Got it, I have a bit more bandwidth now and will raise the priority of this issue. I'll open the PR by early next week. > Refactor migration tests to support version update automatically > > > Key: FLINK-27518 > URL: https://issues.apache.org/jira/browse/FLINK-27518 > Project: Flink > Issue Type: Bug > Components: Test Infrastructure >Affects Versions: 1.16.0 >Reporter: Yun Gao >Assignee: Yun Gao >Priority: Major > > Currently, on releasing each version, we need to manually generate the > snapshots for every migration test and update the current versions. As more > and more migration tests are added, this has become increasingly > intractable. It would be better if we could make this happen automatically on cutting > new branches.
[jira] [Commented] (FLINK-26974) Python EmbeddedThreadDependencyTests.test_add_python_file failed on azure
[ https://issues.apache.org/jira/browse/FLINK-26974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17653821#comment-17653821 ] Matthias Pohl commented on FLINK-26974: --- https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=44394=logs=9cada3cb-c1d3-5621-16da-0f718fb86602=c67e71ed-6451-5d26-8920-5a8cf9651901=28170 > Python EmbeddedThreadDependencyTests.test_add_python_file failed on azure > - > > Key: FLINK-26974 > URL: https://issues.apache.org/jira/browse/FLINK-26974 > Project: Flink > Issue Type: Bug > Components: API / Python >Affects Versions: 1.15.0, 1.16.0, 1.17.0 >Reporter: Yun Gao >Assignee: Huang Xingbo >Priority: Critical > Labels: auto-deprioritized-major, test-stability > > {code:java} > Mar 31 10:49:17 === FAILURES > === > Mar 31 10:49:17 __ > EmbeddedThreadDependencyTests.test_add_python_file __ > Mar 31 10:49:17 > Mar 31 10:49:17 self = > testMethod=test_add_python_file> > Mar 31 10:49:17 > Mar 31 10:49:17 def test_add_python_file(self): > Mar 31 10:49:17 python_file_dir = os.path.join(self.tempdir, > "python_file_dir_" + str(uuid.uuid4())) > Mar 31 10:49:17 os.mkdir(python_file_dir) > Mar 31 10:49:17 python_file_path = os.path.join(python_file_dir, > "test_dependency_manage_lib.py") > Mar 31 10:49:17 with open(python_file_path, 'w') as f: > Mar 31 10:49:17 f.write("def add_two(a):\nraise > Exception('This function should not be called!')") > Mar 31 10:49:17 self.t_env.add_python_file(python_file_path) > Mar 31 10:49:17 > Mar 31 10:49:17 python_file_dir_with_higher_priority = os.path.join( > Mar 31 10:49:17 self.tempdir, "python_file_dir_" + > str(uuid.uuid4())) > Mar 31 10:49:17 os.mkdir(python_file_dir_with_higher_priority) > Mar 31 10:49:17 python_file_path_higher_priority = > os.path.join(python_file_dir_with_higher_priority, > Mar 31 10:49:17 > "test_dependency_manage_lib.py") > Mar 31 10:49:17 with open(python_file_path_higher_priority, 'w') as f: > Mar 31 10:49:17 f.write("def add_two(a):\nreturn a + 
2") > Mar 31 10:49:17 > self.t_env.add_python_file(python_file_path_higher_priority) > Mar 31 10:49:17 > Mar 31 10:49:17 def plus_two(i): > Mar 31 10:49:17 from test_dependency_manage_lib import add_two > Mar 31 10:49:17 return add_two(i) > Mar 31 10:49:17 > Mar 31 10:49:17 self.t_env.create_temporary_system_function( > Mar 31 10:49:17 "add_two", udf(plus_two, DataTypes.BIGINT(), > DataTypes.BIGINT())) > Mar 31 10:49:17 table_sink = source_sink_utils.TestAppendSink( > Mar 31 10:49:17 ['a', 'b'], [DataTypes.BIGINT(), > DataTypes.BIGINT()]) > Mar 31 10:49:17 self.t_env.register_table_sink("Results", table_sink) > Mar 31 10:49:17 t = self.t_env.from_elements([(1, 2), (2, 5), (3, > 1)], ['a', 'b']) > Mar 31 10:49:17 > t.select(expr.call("add_two", t.a), > t.a).execute_insert("Results").wait() > Mar 31 10:49:17 > Mar 31 10:49:17 pyflink/table/tests/test_dependency.py:63: > Mar 31 10:49:17 _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ > _ _ _ _ _ _ _ _ _ > Mar 31 10:49:17 pyflink/table/table_result.py:76: in wait > Mar 31 10:49:17 get_method(self._j_table_result, "await")() > Mar 31 10:49:17 > .tox/py38-cython/lib/python3.8/site-packages/py4j/java_gateway.py:1321: in > __call__ > {code} > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=34001=logs=821b528f-1eed-5598-a3b4-7f748b13f261=6bb545dd-772d-5d8c-f258-f5085fba3295=27239 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-29849) Event time temporal join on an upsert source may produce incorrect execution plan
[ https://issues.apache.org/jira/browse/FLINK-29849?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] lincoln lee updated FLINK-29849: Release Note: This resolves a correctness issue when doing an event time temporal join with a versioned table backed by an upsert source. When the right input of the join is an upsert source, it no longer generates a ChangelogNormalize node for it. Note this is an incompatible plan change compared to 1.16.0 > Event time temporal join on an upsert source may produce incorrect execution > plan > - > > Key: FLINK-29849 > URL: https://issues.apache.org/jira/browse/FLINK-29849 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.16.0, 1.15.3 >Reporter: lincoln lee >Assignee: lincoln lee >Priority: Major > Labels: pull-request-available > Fix For: 1.17.0 > > > In the current implementation, the execution plan is incorrect when doing an event > time temporal join on an upsert source. There are two problems: > 1. for an upsert source, we should not add a ChangelogNormalize node under a > temporal join input, or it will damage the versions of the version table. 
For > versioned tables, we use a single-temporal mechanism which relies on sequential > records of the same key to ensure the valid period of each version, so if a > ChangelogNormalize is added, a UB message will be produced based on the > previous UA or Insert message, with all columns exactly the same, including the > event time, e.g., > original upsert input > {code} > +I (key1, '2022-11-02 10:00:00', a1) > +U (key1, '2022-11-02 10:01:03', a2) > {code} > the versioned data should be: > {code} > v1 [~, '2022-11-02 10:00:00') > v2 ['2022-11-02 10:00:00', '2022-11-02 10:01:03') > {code} > after ChangelogNormalize's processing, it will output: > {code} > +I (key1, '2022-11-02 10:00:00', a1) > -U (key1, '2022-11-02 10:00:00', a1) > +U (key1, '2022-11-02 10:01:03', a2) > {code} > so the versions are incorrect: > {code} > v1 ['2022-11-02 10:00:00', '2022-11-02 10:00:00') // invalid period > v2 ['2022-11-02 10:00:00', '2022-11-02 10:01:03') > {code} > 2. semantically, a filter cannot be pushed into an event time temporal join; > otherwise, the filter may also corrupt the versioned table
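The invalid-period effect described in FLINK-29849 can be reproduced with a tiny model. The sketch below is hypothetical, not Flink's planner or operator code: `periods` and `Rec` are invented names, and the version-inference rule is a deliberately simplified stand-in in which each +I/+U opens a version at its event time and a -U closes the current one.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class VersionPeriodSketch {
    static class Rec {
        final String kind; final long ts;
        Rec(String kind, long ts) { this.kind = kind; this.ts = ts; }
    }

    // Simplified version inference for one key: +I/+U starts a new version
    // at its event time; -U closes the current version at its event time.
    static List<long[]> periods(List<Rec> changelog) {
        List<long[]> out = new ArrayList<>();
        Long start = null;
        for (Rec r : changelog) {
            if (r.kind.equals("+I") || r.kind.equals("+U")) {
                if (start != null) out.add(new long[] {start, r.ts});
                start = r.ts;
            } else if (r.kind.equals("-U")) {
                if (start != null) { out.add(new long[] {start, r.ts}); start = null; }
            }
        }
        if (start != null) out.add(new long[] {start, Long.MAX_VALUE});
        return out;
    }

    public static void main(String[] args) {
        // raw upsert input: versions [0, 63) and [63, +inf)
        List<Rec> upsert = Arrays.asList(new Rec("+I", 0), new Rec("+U", 63));
        // after ChangelogNormalize: a -U carrying the SAME event time as the +I
        List<Rec> normalized =
                Arrays.asList(new Rec("+I", 0), new Rec("-U", 0), new Rec("+U", 63));

        List<long[]> good = periods(upsert);
        List<long[]> bad = periods(normalized);

        if (good.get(0)[1] != 63) throw new AssertionError();
        // the first version collapses to the empty interval [0, 0): a join row
        // with event time in (0, 63) no longer finds a matching version
        if (bad.get(0)[0] != bad.get(0)[1]) throw new AssertionError();
        System.out.println("normalized first period: " + Arrays.toString(bad.get(0)));
    }
}
```

This mirrors the v1/v2 example in the issue: the retraction with an identical event time produces a zero-length validity interval, which is why the fix avoids inserting ChangelogNormalize under the temporal join's right input.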
[jira] [Commented] (FLINK-30343) Migrate KubernetesLeaderElectionAndRetrievalITCase
[ https://issues.apache.org/jira/browse/FLINK-30343?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17653817#comment-17653817 ] Matthias Pohl commented on FLINK-30343: --- I'm not sure whether it's worth starting on FLINK-30338 while FLINK-26522 is still in progress. If FLINK-26522 is resolved in the way it is described in its comment section right now, the {{MultipleComponent*}} classes would either go away or become what's {{DefaultLeaderElectionService}} right now with slight adaptations. Therefore, it would make more sense to migrate the test cases that are implemented for the {{MultipleComponent*}} classes as part of FLINK-26522 than to migrate the legacy test cases to what's now under the {{MultipleComponent*}} leader election as part of FLINK-30338 just to migrate them back as part of FLINK-26522 again. That seems to be extra effort without any additional value. WDYT? > Migrate KubernetesLeaderElectionAndRetrievalITCase > -- > > Key: FLINK-30343 > URL: https://issues.apache.org/jira/browse/FLINK-30343 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Coordination >Affects Versions: 1.16.0, 1.17.0 >Reporter: Matthias Pohl >Priority: Major > 
[jira] [Updated] (FLINK-28988) Incorrect result for filter after temporal join
[ https://issues.apache.org/jira/browse/FLINK-28988?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] lincoln lee updated FLINK-28988: Flags: Important Release Note: After FLINK-28988 applied, the filter will not be pushed down into both inputs of the join. Note this may cause incompatible plan changes compare to 1.16.0, e.g., when left input is an upsert source(use upsert-kafka connector), the query plan will remove the ChangelogNormalize node from which appeared in 1.16.0. > Incorrect result for filter after temporal join > --- > > Key: FLINK-28988 > URL: https://issues.apache.org/jira/browse/FLINK-28988 > Project: Flink > Issue Type: Bug > Components: Table SQL / API >Affects Versions: 1.15.1 >Reporter: Xuannan Su >Assignee: Shuiqiang Chen >Priority: Major > Labels: pull-request-available > Fix For: 1.17.0, 1.16.1 > > > The following code can reproduce the case > > {code:java} > public class TemporalJoinSQLExample1 { > public static void main(String[] args) throws Exception { > // set up the Java DataStream API > final StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > // set up the Java Table API > final StreamTableEnvironment tableEnv = > StreamTableEnvironment.create(env); > final DataStreamSource> ds = > env.fromElements( > new Tuple3<>(0, "online", Instant.ofEpochMilli(0)), > new Tuple3<>(0, "offline", Instant.ofEpochMilli(10)), > new Tuple3<>(0, "online", Instant.ofEpochMilli(20))); > final Table table = > tableEnv.fromDataStream( > ds, > Schema.newBuilder() > .column("f0", DataTypes.INT()) > .column("f1", DataTypes.STRING()) > .column("f2", > DataTypes.TIMESTAMP_LTZ(3)) > .watermark("f2", "f2 - INTERVAL '2' > SECONDS") > .build()) > .as("id", "state", "ts"); > tableEnv.createTemporaryView("source_table", table); > final Table dedupeTable = > tableEnv.sqlQuery( > "SELECT * FROM (" > + " SELECT *, ROW_NUMBER() OVER (PARTITION BY > id ORDER BY ts DESC) AS row_num FROM source_table" > + ") WHERE 
row_num = 1"); > tableEnv.createTemporaryView("versioned_table", dedupeTable); > DataStreamSource> event = > env.fromElements( > new Tuple2<>(0, Instant.ofEpochMilli(0)), > new Tuple2<>(0, Instant.ofEpochMilli(5)), > new Tuple2<>(0, Instant.ofEpochMilli(10)), > new Tuple2<>(0, Instant.ofEpochMilli(15)), > new Tuple2<>(0, Instant.ofEpochMilli(20)), > new Tuple2<>(0, Instant.ofEpochMilli(25))); > final Table eventTable = > tableEnv.fromDataStream( > event, > Schema.newBuilder() > .column("f0", DataTypes.INT()) > .column("f1", > DataTypes.TIMESTAMP_LTZ(3)) > .watermark("f1", "f1 - INTERVAL '2' > SECONDS") > .build()) > .as("id", "ts"); > tableEnv.createTemporaryView("event_table", eventTable); > final Table result = > tableEnv.sqlQuery( > "SELECT * FROM event_table" > + " LEFT JOIN versioned_table FOR SYSTEM_TIME > AS OF event_table.ts" > + " ON event_table.id = versioned_table.id"); > result.execute().print(); > result.filter($("state").isEqual("online")).execute().print(); > } > } {code} > > The result of temporal join is the following: > |op| id| ts| id0| > state| ts0| row_num| > |+I| 0|1970-01-01 08:00:00.000| 0| > online|1970-01-01 08:00:00.000| 1| > |+I| 0|1970-01-01 08:00:00.005| 0| > online|1970-01-01 08:00:00.000| 1| > |+I| 0|1970-01-01 08:00:00.010| 0| > offline|1970-01-01 08:00:00.010| 1| > |+I|
[jira] [Updated] (FLINK-28988) Incorrect result for filter after temporal join
[ https://issues.apache.org/jira/browse/FLINK-28988?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] lincoln lee updated FLINK-28988: Release Note: After FLINK-28988 applied, the filter will not be pushed down into both inputs of the event time temporal join. Note this may cause incompatible plan changes compare to 1.16.0, e.g., when left input is an upsert source(use upsert-kafka connector), the query plan will remove the ChangelogNormalize node from which appeared in 1.16.0. was: After FLINK-28988 applied, the filter will not be pushed down into both inputs of the join. Note this may cause incompatible plan changes compare to 1.16.0, e.g., when left input is an upsert source(use upsert-kafka connector), the query plan will remove the ChangelogNormalize node from which appeared in 1.16.0. > Incorrect result for filter after temporal join > --- > > Key: FLINK-28988 > URL: https://issues.apache.org/jira/browse/FLINK-28988 > Project: Flink > Issue Type: Bug > Components: Table SQL / API >Affects Versions: 1.15.1 >Reporter: Xuannan Su >Assignee: Shuiqiang Chen >Priority: Major > Labels: pull-request-available > Fix For: 1.17.0, 1.16.1 > > > The following code can reproduce the case > > {code:java} > public class TemporalJoinSQLExample1 { > public static void main(String[] args) throws Exception { > // set up the Java DataStream API > final StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > // set up the Java Table API > final StreamTableEnvironment tableEnv = > StreamTableEnvironment.create(env); > final DataStreamSource> ds = > env.fromElements( > new Tuple3<>(0, "online", Instant.ofEpochMilli(0)), > new Tuple3<>(0, "offline", Instant.ofEpochMilli(10)), > new Tuple3<>(0, "online", Instant.ofEpochMilli(20))); > final Table table = > tableEnv.fromDataStream( > ds, > Schema.newBuilder() > .column("f0", DataTypes.INT()) > .column("f1", DataTypes.STRING()) > .column("f2", > DataTypes.TIMESTAMP_LTZ(3)) > 
.watermark("f2", "f2 - INTERVAL '2' > SECONDS") > .build()) > .as("id", "state", "ts"); > tableEnv.createTemporaryView("source_table", table); > final Table dedupeTable = > tableEnv.sqlQuery( > "SELECT * FROM (" > + " SELECT *, ROW_NUMBER() OVER (PARTITION BY > id ORDER BY ts DESC) AS row_num FROM source_table" > + ") WHERE row_num = 1"); > tableEnv.createTemporaryView("versioned_table", dedupeTable); > DataStreamSource> event = > env.fromElements( > new Tuple2<>(0, Instant.ofEpochMilli(0)), > new Tuple2<>(0, Instant.ofEpochMilli(5)), > new Tuple2<>(0, Instant.ofEpochMilli(10)), > new Tuple2<>(0, Instant.ofEpochMilli(15)), > new Tuple2<>(0, Instant.ofEpochMilli(20)), > new Tuple2<>(0, Instant.ofEpochMilli(25))); > final Table eventTable = > tableEnv.fromDataStream( > event, > Schema.newBuilder() > .column("f0", DataTypes.INT()) > .column("f1", > DataTypes.TIMESTAMP_LTZ(3)) > .watermark("f1", "f1 - INTERVAL '2' > SECONDS") > .build()) > .as("id", "ts"); > tableEnv.createTemporaryView("event_table", eventTable); > final Table result = > tableEnv.sqlQuery( > "SELECT * FROM event_table" > + " LEFT JOIN versioned_table FOR SYSTEM_TIME > AS OF event_table.ts" > + " ON event_table.id = versioned_table.id"); > result.execute().print(); > result.filter($("state").isEqual("online")).execute().print(); > } > } {code} > > The result of temporal join is the following: > |op| id| ts| id0| > state| ts0| row_num| > |+I| 0|1970-01-01 08:00:00.000| 0| > online|1970-01-01
[jira] [Assigned] (FLINK-30389) Add retry to read hints
[ https://issues.apache.org/jira/browse/FLINK-30389?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jingsong Lee reassigned FLINK-30389: Assignee: Wencong Liu > Add retry to read hints > --- > > Key: FLINK-30389 > URL: https://issues.apache.org/jira/browse/FLINK-30389 > Project: Flink > Issue Type: Improvement > Components: Table Store >Reporter: Jingsong Lee >Assignee: Wencong Liu >Priority: Major > Fix For: table-store-0.3.0 > > > For the oss (object store) filesystem, the hint file is written by deleting it > first and then re-creating it, so reading the hint file may fail transiently. We > don't need to return directly in case of such a failure; we can add a retry. > {code:java} > Failed to read hint file LATEST. Falling back to listing files. > java.io.FileNotFoundException: oss://lake_v4/snapshot/LATEST: No such file or > directory! > at > {code}
[jira] [Commented] (FLINK-30389) Add retry to read hints
[ https://issues.apache.org/jira/browse/FLINK-30389?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17653815#comment-17653815 ] Jingsong Lee commented on FLINK-30389: -- [~Wencong Liu] Assigned > Add retry to read hints > --- > > Key: FLINK-30389 > URL: https://issues.apache.org/jira/browse/FLINK-30389 > Project: Flink > Issue Type: Improvement > Components: Table Store >Reporter: Jingsong Lee >Assignee: Wencong Liu >Priority: Major > Fix For: table-store-0.3.0 > > > For the oss (object store) filesystem, the hint file is written by deleting it > first and then re-creating it, so reading the hint file may fail transiently. We > don't need to return directly in case of such a failure; we can add a retry. > {code:java} > Failed to read hint file LATEST. Falling back to listing files. > java.io.FileNotFoundException: oss://lake_v4/snapshot/LATEST: No such file or > directory! > at > {code}
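The retry discussed in this ticket could look like the following plain-Java sketch. It is only an illustration of the proposed behavior, not the Table Store code: `readHintWithRetry`, `HintReader`, `MAX_RETRIES`, and `RETRY_INTERVAL_MS` are invented names, and the caller is assumed to fall back to listing snapshot files when `null` is returned.

```java
import java.io.FileNotFoundException;
import java.util.concurrent.atomic.AtomicInteger;

public class HintRetrySketch {
    static final int MAX_RETRIES = 3;
    static final long RETRY_INTERVAL_MS = 10;

    interface HintReader {
        long read() throws FileNotFoundException;
    }

    // Try reading the LATEST hint a few times before giving up; the writer
    // deletes the file before rewriting it, so a miss may be transient.
    static Long readHintWithRetry(HintReader reader) {
        for (int attempt = 0; attempt < MAX_RETRIES; attempt++) {
            try {
                return reader.read();
            } catch (FileNotFoundException e) {
                try {
                    Thread.sleep(RETRY_INTERVAL_MS);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    return null;
                }
            }
        }
        return null; // caller falls back to listing snapshot files
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        // Simulate a hint file that is momentarily absent twice, then readable.
        Long snapshot = readHintWithRetry(() -> {
            if (calls.incrementAndGet() < 3) {
                throw new FileNotFoundException("snapshot/LATEST");
            }
            return 42L;
        });
        if (snapshot == null || snapshot != 42L) throw new AssertionError();
        System.out.println("read snapshot id " + snapshot);
    }
}
```

A bounded retry count keeps the worst case close to today's behavior (fall back to listing) while absorbing the short delete-then-recreate window on object stores.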
[jira] [Commented] (FLINK-30234) SourceReaderBase should provide an option to disable numRecordsIn metric registration
[ https://issues.apache.org/jira/browse/FLINK-30234?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17653814#comment-17653814 ] Wencong Liu commented on FLINK-30234: - The discussion has been created. cc [~renqs] [[DISCUSS] Allow source readers extending SourceReaderBase to override numRecordsIn report logic-Apache Mail Archives|https://lists.apache.org/thread/pq7vrqc1vjbzj6of4wm5x8x7pkpdchox] > SourceReaderBase should provide an option to disable numRecordsIn metric > registration > - > > Key: FLINK-30234 > URL: https://issues.apache.org/jira/browse/FLINK-30234 > Project: Flink > Issue Type: Improvement > Components: Connectors / Common >Affects Versions: 1.17.0 >Reporter: Qingsheng Ren >Assignee: Wencong Liu >Priority: Major > > Currently the numRecordsIn metric is pre-registered for all sources in > SourceReaderBase. Considering the different implementations of source readers, the > definition of "record" might differ from the one we use in SourceReaderBase, > hence numRecordsIn might be inaccurate. > We could introduce an option in SourceReader to disable the registration of > numRecordsIn in SourceReaderBase and let the actual implementation report > the metric instead.
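The opt-out proposed in this ticket can be sketched without the Flink APIs. Everything below is hypothetical (`registerNumRecordsIn`, the toy `MetricGroup`, `BatchReader`); it only shows the shape of the idea: a reader whose notion of "record" differs from the base class's takes over the counting itself.

```java
public class NumRecordsInSketch {
    static class MetricGroup {
        long numRecordsIn;
        void incNumRecordsIn(long n) { numRecordsIn += n; }
    }

    static class SourceReaderBase {
        final MetricGroup metrics = new MetricGroup();
        // Subclasses return false to take over numRecordsIn reporting.
        boolean registerNumRecordsIn() { return true; }

        void emit(Object record) {
            if (registerNumRecordsIn()) {
                metrics.incNumRecordsIn(1);
            }
        }
    }

    // A reader whose base-class "record" is a batch: it disables the default
    // counting and reports per-element counts itself.
    static class BatchReader extends SourceReaderBase {
        @Override boolean registerNumRecordsIn() { return false; }

        void emitBatch(Object[] batch) {
            for (Object r : batch) emit(r);          // no double counting
            metrics.incNumRecordsIn(batch.length);   // reader-defined count
        }
    }

    public static void main(String[] args) {
        BatchReader reader = new BatchReader();
        reader.emitBatch(new Object[] {1, 2, 3});
        if (reader.metrics.numRecordsIn != 3) throw new AssertionError();
        System.out.println("numRecordsIn=" + reader.metrics.numRecordsIn);
    }
}
```

Without the opt-out, the base class would have counted one "record" per `emit` call regardless of what the connector considers a record, which is exactly the inaccuracy the ticket describes.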
[GitHub] [flink] reswqa commented on a diff in pull request #21560: [FLINK-29768] Hybrid shuffle supports consume partial finished subtask
reswqa commented on code in PR #21560: URL: https://github.com/apache/flink/pull/21560#discussion_r1060305155 ## flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/InternalExecutionGraphAccessor.java: ## @@ -122,4 +122,6 @@ List getClusterPartitionShuffleDescriptors( IntermediateDataSetID intermediateResultPartition); MarkPartitionFinishedStrategy getMarkPartitionFinishedStrategy(); + +boolean isHybridEnableConsumePartialFinishedProducer(); Review Comment: Yes, I also didn't find a better way to handle this. Let's keep it for now; if a better way comes up, we can refactor it later.
[GitHub] [flink] reswqa commented on a diff in pull request #21560: [FLINK-29768] Hybrid shuffle supports consume partial finished subtask
reswqa commented on code in PR #21560: URL: https://github.com/apache/flink/pull/21560#discussion_r1060304453 ## flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java: ## @@ -670,6 +670,24 @@ public enum SchedulerType { code(SPECULATIVE_ENABLED.key())) .build()); +@Documentation.Section({ +Documentation.Sections.EXPERT_SCHEDULING, +Documentation.Sections.ALL_JOB_MANAGER +}) +public static final ConfigOption CONSUME_PARTIAL_FINISHED_PRODUCER_ENABLED = Review Comment: Good idea, I will use an enum type to replace these two config options.
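The reviewer's suggestion of collapsing two boolean options into one enum might look like the sketch below. The enum and constant names here are illustrative only; the final option key and values are up to the PR.

```java
public class HybridShuffleModeSketch {
    // One enum instead of two booleans: each value names a complete,
    // mutually exclusive consume constraint for hybrid shuffle.
    enum HybridPartitionDataConsumeConstraint {
        ALL_PRODUCERS_FINISHED,   // downstream waits for the whole upstream stage
        ONLY_FINISHED_PRODUCERS,  // consume data of finished subtasks only
        UNFINISHED_PRODUCERS;     // consume even from still-running producers

        boolean allowsPartialFinished() {
            return this != ALL_PRODUCERS_FINISHED;
        }
    }

    public static void main(String[] args) {
        HybridPartitionDataConsumeConstraint c =
                HybridPartitionDataConsumeConstraint.ONLY_FINISHED_PRODUCERS;
        if (!c.allowsPartialFinished()) throw new AssertionError();
        System.out.println("constraint=" + c);
    }
}
```

An enum rules out the nonsensical combinations that two independent booleans would allow, which is presumably why the reviewer preferred it.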
[GitHub] [flink] LadyForest commented on a diff in pull request #21577: [FLINK-30496][table-api] Introduce TableChange to represent MODIFY change
LadyForest commented on code in PR #21577: URL: https://github.com/apache/flink/pull/21577#discussion_r1060300795 ## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/AlterSchemaConverter.java: ## @@ -685,18 +731,18 @@ String getComment(SqlTableColumn column) { // private void validateColumnName( -String originColumnName, +String oldColumnName, String newColumnName, -ResolvedSchema originSchemas, +ResolvedSchema oldSchemas, List partitionKeys) { validateColumnName( -originColumnName, -originSchemas, +oldColumnName, +oldSchemas, Review Comment: Nit: `oldSchemas` -> `oldSchema` ? ## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/AlterSchemaConverter.java: ## @@ -685,18 +731,18 @@ String getComment(SqlTableColumn column) { // private void validateColumnName( -String originColumnName, +String oldColumnName, String newColumnName, -ResolvedSchema originSchemas, +ResolvedSchema oldSchemas, Review Comment: Nit: `oldSchemas` -> `oldSchema` ? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-30389) Add retry to read hints
[ https://issues.apache.org/jira/browse/FLINK-30389?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17653812#comment-17653812 ] Wencong Liu commented on FLINK-30389: - Thanks [~lzljs3620320]. A fixed number of retries with a retry interval can be added to readHints. I'd like to take this ticket. Could you please assign it to me? > Add retry to read hints > --- > > Key: FLINK-30389 > URL: https://issues.apache.org/jira/browse/FLINK-30389 > Project: Flink > Issue Type: Improvement > Components: Table Store >Reporter: Jingsong Lee >Priority: Major > Fix For: table-store-0.3.0 > > > For the oss (object store) filesystem, the hint file is written by deleting it > first and then re-creating it, so reading the hint file may fail transiently. We > don't need to return directly in case of such a failure; we can add a retry. > {code:java} > Failed to read hint file LATEST. Falling back to listing files. > java.io.FileNotFoundException: oss://lake_v4/snapshot/LATEST: No such file or > directory! > at > {code}
[jira] [Updated] (FLINK-30376) Introduce a new flink bushy join reorder rule which based on greedy algorithm
[ https://issues.apache.org/jira/browse/FLINK-30376?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yunhong Zheng updated FLINK-30376: -- Summary: Introduce a new flink bushy join reorder rule which based on greedy algorithm (was: Introduce a new flink busy join reorder rule which based on greedy algorithm) > Introduce a new flink bushy join reorder rule which based on greedy algorithm > - > > Key: FLINK-30376 > URL: https://issues.apache.org/jira/browse/FLINK-30376 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / Planner >Affects Versions: 1.17.0 >Reporter: Yunhong Zheng >Priority: Major > Labels: pull-request-available > Fix For: 1.17.0 > > > Introducing a new Flink busy join reorder strategy which based on the greedy > algorithm. The old join reorder rule will also be the default join reorder > rule and the new busy join reorder strategy will be optional. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-30376) Introduce a new flink bushy join reorder rule which based on greedy algorithm
[ https://issues.apache.org/jira/browse/FLINK-30376?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yunhong Zheng updated FLINK-30376: -- Description: Introducing a new Flink bushy join reorder strategy which is based on the greedy algorithm. The old join reorder rule will still be the default join reorder rule and the new bushy join reorder strategy will be optional. (was: Introducing a new Flink busy join reorder strategy which based on the greedy algorithm. The old join reorder rule will also be the default join reorder rule and the new busy join reorder strategy will be optional.) > Introduce a new flink bushy join reorder rule which based on greedy algorithm > - > > Key: FLINK-30376 > URL: https://issues.apache.org/jira/browse/FLINK-30376 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / Planner >Affects Versions: 1.17.0 >Reporter: Yunhong Zheng >Priority: Major > Labels: pull-request-available > Fix For: 1.17.0 > > > Introducing a new Flink bushy join reorder strategy which is based on the greedy > algorithm. The old join reorder rule will still be the default join reorder > rule and the new bushy join reorder strategy will be optional.
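The greedy bushy join reorder idea behind this ticket can be illustrated with a toy cost model. This is a from-scratch sketch under invented statistics (the table cardinalities and pairwise selectivity map below are made up), not the planner rule itself, which operates on RelNodes with real statistics.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

public class GreedyJoinReorderSketch {
    static class SubPlan {
        final Set<String> tables;
        final double rows;
        final String plan; // textual shape, e.g. (A JOIN B)
        SubPlan(Set<String> t, double r, String p) { tables = t; rows = r; plan = p; }
    }

    // Combined selectivity between two subplans; 1.0 (cross join) if no
    // predicate links any of their base tables.
    static double selectivity(Map<String, Double> sel, Set<String> a, Set<String> b) {
        double s = 1.0;
        for (String x : a) for (String y : b) {
            Double v = sel.get(x + "," + y);
            if (v == null) v = sel.get(y + "," + x);
            if (v != null) s *= v;
        }
        return s;
    }

    // Greedy bushy reorder: start with single-table subplans and repeatedly
    // merge the pair with the smallest estimated join cardinality.
    static SubPlan reorder(Map<String, Double> rows, Map<String, Double> sel) {
        List<SubPlan> plans = new ArrayList<>();
        for (var e : rows.entrySet())
            plans.add(new SubPlan(Set.of(e.getKey()), e.getValue(), e.getKey()));
        while (plans.size() > 1) {
            int bi = -1, bj = -1; double best = Double.MAX_VALUE;
            for (int i = 0; i < plans.size(); i++)
                for (int j = i + 1; j < plans.size(); j++) {
                    double card = plans.get(i).rows * plans.get(j).rows
                            * selectivity(sel, plans.get(i).tables, plans.get(j).tables);
                    if (card < best) { best = card; bi = i; bj = j; }
                }
            SubPlan a = plans.get(bi), b = plans.get(bj);
            Set<String> t = new TreeSet<>(a.tables); t.addAll(b.tables);
            SubPlan merged = new SubPlan(t, best, "(" + a.plan + " JOIN " + b.plan + ")");
            plans.remove(bj); plans.remove(bi); // remove higher index first
            plans.add(merged);
        }
        return plans.get(0);
    }

    public static void main(String[] args) {
        Map<String, Double> rows = Map.of("A", 1_000_000.0, "B", 100.0, "C", 10.0, "D", 1_000.0);
        Map<String, Double> sel = Map.of("A,B", 1e-6, "B,C", 0.1, "C,D", 1e-3);
        SubPlan p = reorder(rows, sel);
        if (p.tables.size() != 4) throw new AssertionError();
        if (Math.abs(p.rows - 100.0) > 1e-6) throw new AssertionError();
        System.out.println(p.plan + " -> est. rows " + p.rows);
    }
}
```

Because merged subplans can themselves be joined together, this greedy process naturally yields bushy trees rather than only left-deep ones, which is the point of offering it as an alternative to the default rule.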
[jira] [Commented] (FLINK-26029) Generalize the checkpoint protocol of OperatorCoordinator.
[ https://issues.apache.org/jira/browse/FLINK-26029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17653799#comment-17653799 ] Yunfeng Zhou commented on FLINK-26029: -- According to offline discussions with Dong Lin, a solution to this problem could be to make OperatorCoordinators generate checkpoint barriers and send them to their subtasks. The subtasks would need to align these barriers with the ones they receive from upstream operators or sources, and actually trigger the checkpoint when checkpoint barrier alignment is reached. The solution mentioned above requires further discussion and the community's agreement about the behavior and performance of Flink runtime during checkpoints. Given that currently no subclass of OperatorCoordinator would be affected by this function, it is thus of lower priority for now. > Generalize the checkpoint protocol of OperatorCoordinator. > -- > > Key: FLINK-26029 > URL: https://issues.apache.org/jira/browse/FLINK-26029 > Project: Flink > Issue Type: Improvement > Components: Runtime / Checkpointing >Affects Versions: 1.14.3 >Reporter: Jiangjie Qin >Assignee: Yunfeng Zhou >Priority: Minor > Labels: extensibility, pull-request-available, stale-assigned > > **Problem statement** > Currently, the JM uses OperatorEventValve to control the communication from > OperatorCoordinator (OC) to the subtasks in order to achieve state > consistency across OC and subtasks after recovering from a checkpoint. The > valve is closed when a checkpoint starts and reopened after the checkpoint > barriers are sent to the source subtasks. > While this mechanism works for the source operators, it unnecessarily limits > the general usage of OC due to the following limitations: > - It does not handle (e.g. blocks) the control flow messages from subtasks to > OC. 
> - It does not handle the control flow messages from OC to subtasks which are > sent after checkpoint barriers have been sent to sources but before subtasks > have received those barriers. > If the limitations mentioned above are not addressed, consistency issues > might occur. For example, if a subtask sends messages to its coordinator > after the checkpoint barriers are sent to the sources and before the subtasks > receive the barriers, these messages would be recorded in the subtask's > snapshot but not the coordinator's. When the Flink job recovers from this > snapshot, the subtask would have a record of sending out the message while > the coordinator has no record of receiving it. > **Proposed solution** > We plan to address this problem by extending the blocking period. The > communication should be blocked before the OC starts the checkpoint and > reopened for each individual subtask after that subtask finishes the > checkpoint, to make sure that both OCs and subtasks would see the same > version of messages after recovering from a checkpoint. Communications in both > directions should be blocked during this period. And the messages blocked on > the subtasks side should be properly stored in the checkpoint snapshot for > recovery.
[jira] [Updated] (FLINK-29217) CoordinatorEventsToStreamOperatorRecipientExactlyOnceITCase.testConcurrentCheckpoint failed with AssertionFailedError
[ https://issues.apache.org/jira/browse/FLINK-29217?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dong Lin updated FLINK-29217: - Fix Version/s: 1.17.0 > CoordinatorEventsToStreamOperatorRecipientExactlyOnceITCase.testConcurrentCheckpoint > failed with AssertionFailedError > - > > Key: FLINK-29217 > URL: https://issues.apache.org/jira/browse/FLINK-29217 > Project: Flink > Issue Type: Bug > Components: Runtime / Checkpointing >Affects Versions: 1.16.0 >Reporter: Xingbo Huang >Assignee: Yunfeng Zhou >Priority: Critical > Labels: pull-request-available, test-stability > Fix For: 1.17.0, 1.16.1 > > > {code:java} > 2022-09-07T02:00:50.2507464Z Sep 07 02:00:50 [ERROR] > org.apache.flink.streaming.runtime.tasks.CoordinatorEventsToStreamOperatorRecipientExactlyOnceITCase.testConcurrentCheckpoint > Time elapsed: 2.137 s <<< FAILURE! > 2022-09-07T02:00:50.2508673Z Sep 07 02:00:50 > org.opentest4j.AssertionFailedError: > 2022-09-07T02:00:50.2509309Z Sep 07 02:00:50 > 2022-09-07T02:00:50.2509945Z Sep 07 02:00:50 Expecting value to be false but > was true > 2022-09-07T02:00:50.2511950Z Sep 07 02:00:50 at > sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) > 2022-09-07T02:00:50.2513254Z Sep 07 02:00:50 at > sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) > 2022-09-07T02:00:50.2514621Z Sep 07 02:00:50 at > sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) > 2022-09-07T02:00:50.2516342Z Sep 07 02:00:50 at > org.apache.flink.streaming.runtime.tasks.CoordinatorEventsToStreamOperatorRecipientExactlyOnceITCase.testConcurrentCheckpoint(CoordinatorEventsToStreamOperatorRecipientExactlyOnceITCase.java:173) > 2022-09-07T02:00:50.2517852Z Sep 07 02:00:50 at > sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > 2022-09-07T02:00:50.251Z Sep 07 02:00:50 at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > 
2022-09-07T02:00:50.2520065Z Sep 07 02:00:50 at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > 2022-09-07T02:00:50.2521153Z Sep 07 02:00:50 at > java.lang.reflect.Method.invoke(Method.java:498) > 2022-09-07T02:00:50.2522747Z Sep 07 02:00:50 at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) > 2022-09-07T02:00:50.2523973Z Sep 07 02:00:50 at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > 2022-09-07T02:00:50.2525158Z Sep 07 02:00:50 at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) > 2022-09-07T02:00:50.2526347Z Sep 07 02:00:50 at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > 2022-09-07T02:00:50.2527525Z Sep 07 02:00:50 at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > 2022-09-07T02:00:50.2528646Z Sep 07 02:00:50 at > org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45) > 2022-09-07T02:00:50.2529708Z Sep 07 02:00:50 at > org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61) > 2022-09-07T02:00:50.2530744Z Sep 07 02:00:50 at > org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) > 2022-09-07T02:00:50.2532008Z Sep 07 02:00:50 at > org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100) > 2022-09-07T02:00:50.2533137Z Sep 07 02:00:50 at > org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366) > 2022-09-07T02:00:50.2544265Z Sep 07 02:00:50 at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103) > 2022-09-07T02:00:50.2545595Z Sep 07 02:00:50 at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63) > 2022-09-07T02:00:50.2546782Z Sep 07 02:00:50 at > org.junit.runners.ParentRunner$4.run(ParentRunner.java:331) > 2022-09-07T02:00:50.2547810Z Sep 07 02:00:50 at > 
org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79) > 2022-09-07T02:00:50.2548890Z Sep 07 02:00:50 at > org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329) > 2022-09-07T02:00:50.2549932Z Sep 07 02:00:50 at > org.junit.runners.ParentRunner.access$100(ParentRunner.java:66) > 2022-09-07T02:00:50.2550933Z Sep 07 02:00:50 at > org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293) > 2022-09-07T02:00:50.2552325Z Sep 07 02:00:50 at > org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54) > 2022-09-07T02:00:50.2553660Z Sep 07 02:00:50 at > org.junit.rules.RunRules.evaluate(RunRules.java:20) > 2022-09-07T02:00:50.2554661Z Sep 07 02:00:50 at >
[jira] [Updated] (FLINK-26029) Generalize the checkpoint protocol of OperatorCoordinator.
[ https://issues.apache.org/jira/browse/FLINK-26029?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dong Lin updated FLINK-26029: - Priority: Minor (was: Major) > Generalize the checkpoint protocol of OperatorCoordinator. > -- > > Key: FLINK-26029 > URL: https://issues.apache.org/jira/browse/FLINK-26029 > Project: Flink > Issue Type: Improvement > Components: Runtime / Checkpointing >Affects Versions: 1.14.3 >Reporter: Jiangjie Qin >Assignee: Yunfeng Zhou >Priority: Minor > Labels: extensibility, pull-request-available, stale-assigned > > **Problem statement** > Currently, the JM uses OperatorEventValve to control the communication from > OperatorCoordinator (OC) to the subtasks in order to achieve state > consistency across OC and subtasks after recovering from a checkpoint. The > valve is closed when a checkpoint starts and reopened after the checkpoint > barriers are sent to the source subtasks. > While this mechanism works for the source operators, it unnecessarily limits > the general usage of OC due to the following limitations: > - It does not handle (e.g. blocks) the control flow messages from subtasks to > OC. > - It does not handle the control flow messages from OC to subtasks which are > sent after checkpoint barriers have been sent to sources but before subtasks > have received those barriers. > If the limitations mentioned above are not satisfied, consistency issues > might occur. For example, if a subtask sends messages to its coordinator > after the checkpoint barriers are sent to the sources and before the subtasks > receive the barriers, these messages would be recorded in the subtask's > snapshot but not the coordinator's. When the Flink job recovers from this > snapshot, the subtask would have a record of sending out the message while > the coordinator has no record of receiving it. > **Proposed solution** > We plan to address this problem by extending the blocking period. 
The > communication should be blocked before the OC starts the checkpoint and > reopened for each individual subtask after that subtask finishes the > checkpoint, to make sure that both OCs and subtasks see the same version of > messages after recovering from a checkpoint. Communications in both > directions should be blocked during this period, and the messages blocked on > the subtask side should be properly stored in the checkpoint snapshot for > recovery. -- This message was sent by Atlassian Jira (v8.20.10#820010)
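The valve behavior described above — buffering coordinator-to-subtask events while a checkpoint is in flight, then flushing them on reopen — can be sketched as follows. This is an illustrative model only; the class and method names are not Flink's actual OperatorEventValve API.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.Consumer;

// Illustrative model of the valve: while closed, events are buffered
// instead of delivered; reopening flushes them in arrival order.
final class EventValve<E> {
    private final Consumer<E> downstream;
    private final Queue<E> buffer = new ArrayDeque<>();
    private boolean closed;

    EventValve(Consumer<E> downstream) {
        this.downstream = downstream;
    }

    /** Close the valve when the checkpoint starts. */
    void close() {
        closed = true;
    }

    /** Deliver immediately, or hold back while a checkpoint is in flight. */
    void send(E event) {
        if (closed) {
            buffer.add(event);
        } else {
            downstream.accept(event);
        }
    }

    /** Reopen after the checkpoint and deliver everything buffered meanwhile. */
    void open() {
        closed = false;
        while (!buffer.isEmpty()) {
            downstream.accept(buffer.poll());
        }
    }
}
```

The consistency argument in the ticket is exactly about when `close()` and `open()` are called relative to the checkpoint, and about persisting the buffer into the snapshot, which this sketch omits.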
[jira] [Updated] (FLINK-28639) Preserve distributed consistency of OperatorEvents from subtasks to OperatorCoordinator
[ https://issues.apache.org/jira/browse/FLINK-28639?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dong Lin updated FLINK-28639: - Priority: Minor (was: Major) > Preserve distributed consistency of OperatorEvents from subtasks to > OperatorCoordinator > --- > > Key: FLINK-28639 > URL: https://issues.apache.org/jira/browse/FLINK-28639 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Checkpointing >Affects Versions: 1.14.3 >Reporter: Yunfeng Zhou >Assignee: Yunfeng Zhou >Priority: Minor > Labels: pull-request-available > > This is the second step in solving the consistency issue of OC > communications. In this step, we would also guarantee the consistency of > operator events sent from subtasks to OCs. Combined with the other sub-task, > which preserves the consistency of communications in the reverse direction, all > communications between OC and subtasks would be consistent across checkpoints > and global failovers. > To achieve the goal of this step, we need to add closing/reopening functions > to the subtasks' gateways and make the subtasks aware of a checkpoint before > they receive the checkpoint barriers. The general process would be as follows. > 1. When the OC starts a checkpoint, it notifies all subtasks about this > information. > 2. After being notified about the ongoing checkpoint in the OC, a subtask sends a > special operator event to its OC, which is the last operator event the OC > could receive from the subtask before the subtask completes the checkpoint. > Then the subtask closes its gateway. > 3. After receiving this special event from all subtasks, the OC finishes its > checkpoint and closes its gateway. Then the checkpoint coordinator sends > checkpoint barriers to the sources. > 4. If the subtask or the OC generates any event to send to the other side, it > buffers the event locally. > 5. When a subtask starts checkpointing, it also stores the buffered events in > the checkpoint. > 6. 
After the subtask completes the checkpoint, communications in both > directions are recovered and the buffered events are sent out. -- This message was sent by Atlassian Jira (v8.20.10#820010)
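Steps 2–3 of the process above hinge on the OC counting the special pre-checkpoint events from its subtasks before it completes its own checkpoint. A minimal sketch of that ack-counting piece (names are illustrative, not Flink's real classes):

```java
// The coordinator's checkpoint completes only after the special
// pre-checkpoint event has arrived from every subtask; duplicate
// events from the same subtask are counted once.
final class CheckpointAckTracker {
    private final boolean[] acked;
    private int ackCount;

    CheckpointAckTracker(int parallelism) {
        this.acked = new boolean[parallelism];
    }

    /** Record the special event from one subtask; true when all have acked. */
    boolean ack(int subtaskIndex) {
        if (!acked[subtaskIndex]) {
            acked[subtaskIndex] = true;
            ackCount++;
        }
        return ackCount == acked.length;
    }
}
```

When `ack` returns true, the OC would finish its checkpoint and close its gateway (step 3); gateway buffering and snapshotting of buffered events (steps 4–6) are outside this sketch.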
[jira] [Updated] (FLINK-26029) Generalize the checkpoint protocol of OperatorCoordinator.
[ https://issues.apache.org/jira/browse/FLINK-26029?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dong Lin updated FLINK-26029: - Fix Version/s: (was: 1.17.0) > Generalize the checkpoint protocol of OperatorCoordinator. > -- > > Key: FLINK-26029 > URL: https://issues.apache.org/jira/browse/FLINK-26029 > Project: Flink > Issue Type: Improvement > Components: Runtime / Checkpointing >Affects Versions: 1.14.3 >Reporter: Jiangjie Qin >Assignee: Yunfeng Zhou >Priority: Major > Labels: extensibility, pull-request-available, stale-assigned > > **Problem statement** > Currently, the JM uses OperatorEventValve to control the communication from > OperatorCoordinator (OC) to the subtasks in order to achieve state > consistency across OC and subtasks after recovering from a checkpoint. The > valve is closed when a checkpoint starts and reopened after the checkpoint > barriers are sent to the source subtasks. > While this mechanism works for the source operators, it unnecessarily limits > the general usage of OC due to the following limitations: > - It does not handle (e.g. blocks) the control flow messages from subtasks to > OC. > - It does not handle the control flow messages from OC to subtasks which are > sent after checkpoint barriers have been sent to sources but before subtasks > have received those barriers. > If the limitations mentioned above are not satisfied, consistency issues > might occur. For example, if a subtask sends messages to its coordinator > after the checkpoint barriers are sent to the sources and before the subtasks > receive the barriers, these messages would be recorded in the subtask's > snapshot but not the coordinator's. When the Flink job recovers from this > snapshot, the subtask would have a record of sending out the message while > the coordinator has no record of receiving it. > **Proposed solution** > We plan to address this problem by extending the blocking period. 
The > communication should be blocked before the OC starts the checkpoint and > reopened for each individual subtask after that subtask finishes the > checkpoint, to make sure that both OCs and subtasks see the same version of > messages after recovering from a checkpoint. Communications in both > directions should be blocked during this period, and the messages blocked on > the subtask side should be properly stored in the checkpoint snapshot for > recovery. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-28639) Preserve distributed consistency of OperatorEvents from subtasks to OperatorCoordinator
[ https://issues.apache.org/jira/browse/FLINK-28639?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dong Lin updated FLINK-28639: - Fix Version/s: (was: 1.17.0) > Preserve distributed consistency of OperatorEvents from subtasks to > OperatorCoordinator > --- > > Key: FLINK-28639 > URL: https://issues.apache.org/jira/browse/FLINK-28639 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Checkpointing >Affects Versions: 1.14.3 >Reporter: Yunfeng Zhou >Assignee: Yunfeng Zhou >Priority: Major > Labels: pull-request-available > > This is the second step in solving the consistency issue of OC > communications. In this step, we would also guarantee the consistency of > operator events sent from subtasks to OCs. Combined with the other sub-task, > which preserves the consistency of communications in the reverse direction, all > communications between OC and subtasks would be consistent across checkpoints > and global failovers. > To achieve the goal of this step, we need to add closing/reopening functions > to the subtasks' gateways and make the subtasks aware of a checkpoint before > they receive the checkpoint barriers. The general process would be as follows. > 1. When the OC starts a checkpoint, it notifies all subtasks about this > information. > 2. After being notified about the ongoing checkpoint in the OC, a subtask sends a > special operator event to its OC, which is the last operator event the OC > could receive from the subtask before the subtask completes the checkpoint. > Then the subtask closes its gateway. > 3. After receiving this special event from all subtasks, the OC finishes its > checkpoint and closes its gateway. Then the checkpoint coordinator sends > checkpoint barriers to the sources. > 4. If the subtask or the OC generates any event to send to the other side, it > buffers the event locally. > 5. When a subtask starts checkpointing, it also stores the buffered events in > the checkpoint. > 6. 
After the subtask completes the checkpoint, communications in both > directions are recovered and the buffered events are sent out. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (FLINK-29217) CoordinatorEventsToStreamOperatorRecipientExactlyOnceITCase.testConcurrentCheckpoint failed with AssertionFailedError
[ https://issues.apache.org/jira/browse/FLINK-29217?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17651958#comment-17651958 ] Yunfeng Zhou edited comment on FLINK-29217 at 1/3/23 5:36 AM: -- According to offline discussions with Dong Lin, a solution to this problem could be to make OperatorCoordinators generate checkpoint barriers and send them to their subtasks. The subtasks would need to align these barriers with the ones they receive from upstream operators or sources, and actually trigger the checkpoint when checkpoint barrier alignment is reached. The solution mentioned above requires further discussion about the behavior and performance of Flink runtime during checkpoints. Given that currently no subclass of OperatorCoordinator would be affected by this function, it is thus of lower priority and we would temporarily remove the guarantee that this function works correctly under concurrent checkpoints. was (Author: yunfengzhou): According to offline discussions with Dong Lin, it requires further discussion about the behavior and performance of Flink runtime during checkpoints to apply a design to preserve the consistency of operator events in case of concurrent checkpoints. Given that currently no subclass of OperatorCoordinator would be affected by this function, it is thus of lower priority and we would temporarily remove the guarantee that this function works correctly under concurrent checkpoints until the community has reached an agreement on the design for this function. 
> CoordinatorEventsToStreamOperatorRecipientExactlyOnceITCase.testConcurrentCheckpoint > failed with AssertionFailedError > - > > Key: FLINK-29217 > URL: https://issues.apache.org/jira/browse/FLINK-29217 > Project: Flink > Issue Type: Bug > Components: Runtime / Checkpointing >Affects Versions: 1.16.0 >Reporter: Xingbo Huang >Assignee: Yunfeng Zhou >Priority: Critical > Labels: pull-request-available, test-stability > Fix For: 1.16.1 > > > {code:java} > 2022-09-07T02:00:50.2507464Z Sep 07 02:00:50 [ERROR] > org.apache.flink.streaming.runtime.tasks.CoordinatorEventsToStreamOperatorRecipientExactlyOnceITCase.testConcurrentCheckpoint > Time elapsed: 2.137 s <<< FAILURE! > 2022-09-07T02:00:50.2508673Z Sep 07 02:00:50 > org.opentest4j.AssertionFailedError: > 2022-09-07T02:00:50.2509309Z Sep 07 02:00:50 > 2022-09-07T02:00:50.2509945Z Sep 07 02:00:50 Expecting value to be false but > was true > 2022-09-07T02:00:50.2511950Z Sep 07 02:00:50 at > sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) > 2022-09-07T02:00:50.2513254Z Sep 07 02:00:50 at > sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) > 2022-09-07T02:00:50.2514621Z Sep 07 02:00:50 at > sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) > 2022-09-07T02:00:50.2516342Z Sep 07 02:00:50 at > org.apache.flink.streaming.runtime.tasks.CoordinatorEventsToStreamOperatorRecipientExactlyOnceITCase.testConcurrentCheckpoint(CoordinatorEventsToStreamOperatorRecipientExactlyOnceITCase.java:173) > 2022-09-07T02:00:50.2517852Z Sep 07 02:00:50 at > sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > 2022-09-07T02:00:50.251Z Sep 07 02:00:50 at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > 2022-09-07T02:00:50.2520065Z Sep 07 02:00:50 at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > 2022-09-07T02:00:50.2521153Z Sep 07 02:00:50 at > 
java.lang.reflect.Method.invoke(Method.java:498) > 2022-09-07T02:00:50.2522747Z Sep 07 02:00:50 at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) > 2022-09-07T02:00:50.2523973Z Sep 07 02:00:50 at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > 2022-09-07T02:00:50.2525158Z Sep 07 02:00:50 at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) > 2022-09-07T02:00:50.2526347Z Sep 07 02:00:50 at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > 2022-09-07T02:00:50.2527525Z Sep 07 02:00:50 at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > 2022-09-07T02:00:50.2528646Z Sep 07 02:00:50 at > org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45) > 2022-09-07T02:00:50.2529708Z Sep 07 02:00:50 at > org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61) > 2022-09-07T02:00:50.2530744Z Sep 07 02:00:50 at > org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) >
[GitHub] [flink] reswqa commented on a diff in pull request #21560: [FLINK-29768] Hybrid shuffle supports consume partial finished subtask
reswqa commented on code in PR #21560: URL: https://github.com/apache/flink/pull/21560#discussion_r1060273190 ## flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java: ## @@ -762,7 +763,9 @@ private static PartitionInfo createPartitionInfo( ShuffleDescriptor shuffleDescriptor = getConsumedPartitionShuffleDescriptor( consumedPartition, - TaskDeploymentDescriptorFactory.PartitionLocationConstraint.MUST_BE_KNOWN); + TaskDeploymentDescriptorFactory.PartitionLocationConstraint.MUST_BE_KNOWN, +// because partition is already finished, false is fair enough. Review Comment: Because `createPartitionInfo` will only be invoked from `finishPartitionsAndUpdateConsumers`, which has already finished this partition. But in order not to be misused later, I renamed it to `createFinishedPartitionInfo`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (FLINK-30544) Speed up finding minimum watermark across all channels by introducing heap-based algorithm
Lijie Wang created FLINK-30544: -- Summary: Speed up finding minimum watermark across all channels by introducing heap-based algorithm Key: FLINK-30544 URL: https://issues.apache.org/jira/browse/FLINK-30544 Project: Flink Issue Type: Improvement Components: Runtime / Task Reporter: Lijie Wang Fix For: 1.17.0 Currently, every time a task receives a watermark, it tries to update the minimum watermark. We use a traversal algorithm to find the minimum watermark across all channels (see [StatusWatermarkValve#findAndOutputNewMinWatermarkAcrossAlignedChannels|https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/watermarkstatus/StatusWatermarkValve.java#:~:text=private%20void-,findAndOutputNewMinWatermarkAcrossAlignedChannels,-(DataOutput%3C%3F%3E] for details), and the time complexity is O(N), where N is the number of channels. We can optimize this by introducing a heap-based algorithm, reducing the time complexity to O(log(N)). -- This message was sent by Atlassian Jira (v8.20.10#820010)
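A hedged sketch of the idea: keep every channel's current watermark in a min-heap so the global minimum is available without an O(N) scan. Since watermarks only move forward, a lazily-pruned heap works — `update` pushes a new entry in O(log N), and `min` discards entries superseded by a later update. A real implementation inside `StatusWatermarkValve` would also need to handle idle/aligned channel status, which this ignores.

```java
import java.util.Arrays;
import java.util.PriorityQueue;

// Lazily-pruned min-heap over channel watermarks. Each heap entry is
// {watermark, channel}; an entry is stale once its channel has a newer
// watermark, and stale entries are skipped when reading the minimum.
final class MinWatermarkTracker {
    private final long[] current; // latest watermark per channel
    private final PriorityQueue<long[]> heap;

    MinWatermarkTracker(int numChannels) {
        current = new long[numChannels];
        Arrays.fill(current, Long.MIN_VALUE);
        heap = new PriorityQueue<>((a, b) -> Long.compare(a[0], b[0]));
        for (int c = 0; c < numChannels; c++) {
            heap.add(new long[] {Long.MIN_VALUE, c});
        }
    }

    /** Record a channel's new watermark; watermarks only move forward. */
    void update(int channel, long watermark) {
        if (watermark > current[channel]) {
            current[channel] = watermark;
            heap.add(new long[] {watermark, channel});
        }
    }

    /** Minimum watermark across all channels, skipping stale heap entries. */
    long min() {
        while (heap.peek()[0] != current[(int) heap.peek()[1]]) {
            heap.poll();
        }
        return heap.peek()[0];
    }
}
```

Each watermark update pushes at most one entry, so the amortized cost of pruning is also O(log N) per update.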
[jira] [Assigned] (FLINK-30544) Speed up finding minimum watermark across all channels by introducing heap-based algorithm
[ https://issues.apache.org/jira/browse/FLINK-30544?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lijie Wang reassigned FLINK-30544: -- Assignee: Lijie Wang > Speed up finding minimum watermark across all channels by introducing > heap-based algorithm > -- > > Key: FLINK-30544 > URL: https://issues.apache.org/jira/browse/FLINK-30544 > Project: Flink > Issue Type: Improvement > Components: Runtime / Task >Reporter: Lijie Wang >Assignee: Lijie Wang >Priority: Major > Fix For: 1.17.0 > > > Currently, every time a task receives a watermark, it tries to update the > minimum watermark. We use a traversal algorithm to find the > minimum watermark across all channels (see > [StatusWatermarkValve#findAndOutputNewMinWatermarkAcrossAlignedChannels|https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/watermarkstatus/StatusWatermarkValve.java#:~:text=private%20void-,findAndOutputNewMinWatermarkAcrossAlignedChannels,-(DataOutput%3C%3F%3E] > for details), and the time complexity is O(N), where N is the number of > channels. > We can optimize this by introducing a heap-based algorithm, reducing the time > complexity to O(log(N)). -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] flinkbot commented on pull request #21587: [bp-1.16][FLINK-29849][table-planner] Fix event time temporal join on an upsert source may produce incorrect execution plan
flinkbot commented on PR #21587: URL: https://github.com/apache/flink/pull/21587#issuecomment-1369384736 ## CI report: * 608cedb9a58a23aa63d7291c6f32ec1318107f6c UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-30543) Adding more examples for setting up jobs via operator.
[ https://issues.apache.org/jira/browse/FLINK-30543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17653769#comment-17653769 ] Sriram Ganesh commented on FLINK-30543: --- [~gyfora] - Please add your thoughts. > Adding more examples for setting up jobs via operator. > -- > > Key: FLINK-30543 > URL: https://issues.apache.org/jira/browse/FLINK-30543 > Project: Flink > Issue Type: Improvement >Reporter: Sriram Ganesh >Priority: Minor > > Currently, we have only basic examples that show how to run a job via the > operator. Adding examples for all upgrade modes would be more helpful. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] lincoln-lil opened a new pull request, #21587: [bp-1.16][FLINK-29849][table-planner] Fix event time temporal join on an upsert source may produce incorrect execution plan
lincoln-lil opened a new pull request, #21587: URL: https://github.com/apache/flink/pull/21587 this is a backport pr for [FLINK-29849](https://issues.apache.org/jira/browse/FLINK-29849) to release-1.16 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-30389) Add retry to read hints
[ https://issues.apache.org/jira/browse/FLINK-30389?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17653768#comment-17653768 ] Jingsong Lee commented on FLINK-30389: -- [~Wencong Liu] SnapshotManager.readHints. Feel free to open a PR. > Add retry to read hints > --- > > Key: FLINK-30389 > URL: https://issues.apache.org/jira/browse/FLINK-30389 > Project: Flink > Issue Type: Improvement > Components: Table Store >Reporter: Jingsong Lee >Priority: Major > Fix For: table-store-0.3.0 > > > For the oss (object store) filesystem, the hint file is deleted first and > then re-added when it is written, so reading the hint file may fail > frequently. We don't need to fall back directly in case of failure; we can > add a retry. > {code:java} > Failed to read hint file LATEST. Falling back to listing files. > java.io.FileNotFoundException: oss://lake_v4/snapshot/LATEST: No such file or > directory! > at > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
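The suggested retry could look roughly like the following generic helper; the class name, signature, and retry policy are illustrative, not `SnapshotManager`'s actual API.

```java
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.concurrent.Callable;

// Retry a read that may transiently fail while the hint file is being
// deleted and re-created; rethrow the last failure once attempts run out,
// at which point the caller can fall back to listing files.
final class HintReadRetry {
    static <T> T withRetry(Callable<T> read, int maxAttempts, long backoffMillis)
            throws Exception {
        IOException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return read.call();
            } catch (FileNotFoundException e) {
                last = e; // hint file is mid-rewrite; wait and try again
                Thread.sleep(backoffMillis);
            }
        }
        throw last;
    }
}
```

Only `FileNotFoundException` is retried here, since that is the failure mode shown in the log above; other exceptions propagate immediately.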
[jira] [Created] (FLINK-30543) Adding more examples for setting up jobs via operator.
Sriram Ganesh created FLINK-30543: - Summary: Adding more examples for setting up jobs via operator. Key: FLINK-30543 URL: https://issues.apache.org/jira/browse/FLINK-30543 Project: Flink Issue Type: Improvement Reporter: Sriram Ganesh Currently, we have only basic examples that show how to run a job via the operator. Adding examples for all upgrade modes would be more helpful. -- This message was sent by Atlassian Jira (v8.20.10#820010)
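For illustration, a sketch of the kind of example the ticket asks for: a FlinkDeployment manifest pinning one of the operator's upgrade modes (the operator supports `stateless`, `savepoint`, and `last-state`). The image, jar path, and resource values below are placeholders, not a recommended configuration.

```yaml
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: savepoint-upgrade-example
spec:
  image: flink:1.16          # placeholder image
  flinkVersion: v1_16
  serviceAccount: flink
  flinkConfiguration:
    state.savepoints.dir: file:///flink-data/savepoints   # placeholder path
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
  job:
    jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
    parallelism: 2
    upgradeMode: savepoint   # alternatives: stateless | last-state
```

A complete set of examples would show one manifest per upgrade mode, since `last-state` and `savepoint` have different prerequisites (HA metadata vs. a configured savepoint directory).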
[jira] [Comment Edited] (FLINK-28988) Incorrect result for filter after temporal join
[ https://issues.apache.org/jira/browse/FLINK-28988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17643318#comment-17643318 ] Godfrey He edited comment on FLINK-28988 at 1/3/23 4:08 AM: Fixed in 1.17.0: b2203eaef68364306dfcc27fb34ac82baefda3d3 2851fac9c4c052876c80440b6b0b637603de06ea 1.16.1: 17b42516ceb73fa342101aedf830df40a84d82bc c14355243995bff7b03a527ed073a2bbaab70ce8 was (Author: godfrey): Fixed in 1.17.0: b2203eaef68364306dfcc27fb34ac82baefda3d3 2851fac9c4c052876c80440b6b0b637603de06ea > Incorrect result for filter after temporal join > --- > > Key: FLINK-28988 > URL: https://issues.apache.org/jira/browse/FLINK-28988 > Project: Flink > Issue Type: Bug > Components: Table SQL / API >Affects Versions: 1.15.1 >Reporter: Xuannan Su >Assignee: Shuiqiang Chen >Priority: Major > Labels: pull-request-available > Fix For: 1.17.0 > > > The following code can reproduce the case > > {code:java} > public class TemporalJoinSQLExample1 { > public static void main(String[] args) throws Exception { > // set up the Java DataStream API > final StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > // set up the Java Table API > final StreamTableEnvironment tableEnv = > StreamTableEnvironment.create(env); > final DataStreamSource> ds = > env.fromElements( > new Tuple3<>(0, "online", Instant.ofEpochMilli(0)), > new Tuple3<>(0, "offline", Instant.ofEpochMilli(10)), > new Tuple3<>(0, "online", Instant.ofEpochMilli(20))); > final Table table = > tableEnv.fromDataStream( > ds, > Schema.newBuilder() > .column("f0", DataTypes.INT()) > .column("f1", DataTypes.STRING()) > .column("f2", > DataTypes.TIMESTAMP_LTZ(3)) > .watermark("f2", "f2 - INTERVAL '2' > SECONDS") > .build()) > .as("id", "state", "ts"); > tableEnv.createTemporaryView("source_table", table); > final Table dedupeTable = > tableEnv.sqlQuery( > "SELECT * FROM (" > + " SELECT *, ROW_NUMBER() OVER (PARTITION BY > id ORDER BY ts DESC) AS row_num FROM source_table" > + 
") WHERE row_num = 1"); > tableEnv.createTemporaryView("versioned_table", dedupeTable); > DataStreamSource> event = > env.fromElements( > new Tuple2<>(0, Instant.ofEpochMilli(0)), > new Tuple2<>(0, Instant.ofEpochMilli(5)), > new Tuple2<>(0, Instant.ofEpochMilli(10)), > new Tuple2<>(0, Instant.ofEpochMilli(15)), > new Tuple2<>(0, Instant.ofEpochMilli(20)), > new Tuple2<>(0, Instant.ofEpochMilli(25))); > final Table eventTable = > tableEnv.fromDataStream( > event, > Schema.newBuilder() > .column("f0", DataTypes.INT()) > .column("f1", > DataTypes.TIMESTAMP_LTZ(3)) > .watermark("f1", "f1 - INTERVAL '2' > SECONDS") > .build()) > .as("id", "ts"); > tableEnv.createTemporaryView("event_table", eventTable); > final Table result = > tableEnv.sqlQuery( > "SELECT * FROM event_table" > + " LEFT JOIN versioned_table FOR SYSTEM_TIME > AS OF event_table.ts" > + " ON event_table.id = versioned_table.id"); > result.execute().print(); > result.filter($("state").isEqual("online")).execute().print(); > } > } {code} > > The result of temporal join is the following: > |op| id| ts| id0| > state| ts0| row_num| > |+I| 0|1970-01-01 08:00:00.000| 0| > online|1970-01-01 08:00:00.000| 1| > |+I| 0|1970-01-01 08:00:00.005| 0| > online|1970-01-01 08:00:00.000| 1| > |+I| 0|1970-01-01 08:00:00.010| 0| > offline|1970-01-01 08:00:00.010|
[jira] [Updated] (FLINK-28988) Incorrect result for filter after temporal join
[ https://issues.apache.org/jira/browse/FLINK-28988?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Godfrey He updated FLINK-28988: --- Fix Version/s: 1.16.1 > Incorrect result for filter after temporal join > --- > > Key: FLINK-28988 > URL: https://issues.apache.org/jira/browse/FLINK-28988 > Project: Flink > Issue Type: Bug > Components: Table SQL / API >Affects Versions: 1.15.1 >Reporter: Xuannan Su >Assignee: Shuiqiang Chen >Priority: Major > Labels: pull-request-available > Fix For: 1.17.0, 1.16.1 > > > The following code can reproduce the case > > {code:java} > public class TemporalJoinSQLExample1 { > public static void main(String[] args) throws Exception { > // set up the Java DataStream API > final StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > // set up the Java Table API > final StreamTableEnvironment tableEnv = > StreamTableEnvironment.create(env); > final DataStreamSource> ds = > env.fromElements( > new Tuple3<>(0, "online", Instant.ofEpochMilli(0)), > new Tuple3<>(0, "offline", Instant.ofEpochMilli(10)), > new Tuple3<>(0, "online", Instant.ofEpochMilli(20))); > final Table table = > tableEnv.fromDataStream( > ds, > Schema.newBuilder() > .column("f0", DataTypes.INT()) > .column("f1", DataTypes.STRING()) > .column("f2", > DataTypes.TIMESTAMP_LTZ(3)) > .watermark("f2", "f2 - INTERVAL '2' > SECONDS") > .build()) > .as("id", "state", "ts"); > tableEnv.createTemporaryView("source_table", table); > final Table dedupeTable = > tableEnv.sqlQuery( > "SELECT * FROM (" > + " SELECT *, ROW_NUMBER() OVER (PARTITION BY > id ORDER BY ts DESC) AS row_num FROM source_table" > + ") WHERE row_num = 1"); > tableEnv.createTemporaryView("versioned_table", dedupeTable); > DataStreamSource> event = > env.fromElements( > new Tuple2<>(0, Instant.ofEpochMilli(0)), > new Tuple2<>(0, Instant.ofEpochMilli(5)), > new Tuple2<>(0, Instant.ofEpochMilli(10)), > new Tuple2<>(0, Instant.ofEpochMilli(15)), > new Tuple2<>(0, 
Instant.ofEpochMilli(20)), > new Tuple2<>(0, Instant.ofEpochMilli(25))); > final Table eventTable = > tableEnv.fromDataStream( > event, > Schema.newBuilder() > .column("f0", DataTypes.INT()) > .column("f1", > DataTypes.TIMESTAMP_LTZ(3)) > .watermark("f1", "f1 - INTERVAL '2' > SECONDS") > .build()) > .as("id", "ts"); > tableEnv.createTemporaryView("event_table", eventTable); > final Table result = > tableEnv.sqlQuery( > "SELECT * FROM event_table" > + " LEFT JOIN versioned_table FOR SYSTEM_TIME > AS OF event_table.ts" > + " ON event_table.id = versioned_table.id"); > result.execute().print(); > result.filter($("state").isEqual("online")).execute().print(); > } > } {code} > > The result of temporal join is the following: > |op| id| ts| id0| > state| ts0| row_num| > |+I| 0|1970-01-01 08:00:00.000| 0| > online|1970-01-01 08:00:00.000| 1| > |+I| 0|1970-01-01 08:00:00.005| 0| > online|1970-01-01 08:00:00.000| 1| > |+I| 0|1970-01-01 08:00:00.010| 0| > offline|1970-01-01 08:00:00.010| 1| > |+I| 0|1970-01-01 08:00:00.015| 0| > offline|1970-01-01 08:00:00.010| 1| > |+I| 0|1970-01-01 08:00:00.020| 0| > online|1970-01-01 08:00:00.020| 1| > |+I| 0|1970-01-01 08:00:00.025| 0| >
[GitHub] [flink] reswqa commented on a diff in pull request #21560: [FLINK-29768] Hybrid shuffle supports consume partial finished subtask
reswqa commented on code in PR #21560: URL: https://github.com/apache/flink/pull/21560#discussion_r1060267262 ## flink-runtime/src/main/java/org/apache/flink/runtime/deployment/CachedShuffleDescriptors.java: ## @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.deployment; + +import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor.MaybeOffloaded; +import org.apache.flink.runtime.executiongraph.IntermediateResultPartition; +import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; +import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup; +import org.apache.flink.runtime.shuffle.ShuffleDescriptor; +import org.apache.flink.util.function.FunctionWithException; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** {@link ShuffleDescriptor}s cache for a {@link ConsumedPartitionGroup} */ +public class CachedShuffleDescriptors { +/** + * Stores all serialized shuffle descriptors. 
For an unknown shuffle descriptor, it will be + * replaced by the real shuffle descriptor after the upstream task finishes. */ +private final List> serializedShuffleDescriptors; + +/** + * Stores all shuffle descriptors that are yet to be serialized. They will be serialized and will replace + * the corresponding values (unknown shuffle descriptors) in serializedShuffleDescriptors the + * next time a TDD is generated. + */ +private final Map toBeSerialized; Review Comment: > Why do we need a map? Would it be better to use a queue of tuples? Yes, a queue is better than a map here. > And why not eagerly serialize the descriptor and replace the previous unknown descriptor in `markPartitionFinished`? Because we cannot get the serializer while marking the partition finished. This is also the reason why `serializeShuffleDescriptors` takes a `shuffleDescriptorSerializer` parameter.
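The queue-of-tuples idea with deferred serialization can be sketched outside of Flink's types. Everything below is illustrative: `PendingDescriptor`, the `String` ids, and `serializePending` are stand-ins for `IntermediateResultPartitionID`, `ShuffleDescriptor`, and the TDD-time serialization, not the actual Flink API.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.Function;

public class DeferredSerializationQueue {

    /** A (partition id, descriptor) tuple queued until the serializer is available. */
    static final class PendingDescriptor {
        final String partitionId; // stands in for IntermediateResultPartitionID
        final String descriptor;  // stands in for ShuffleDescriptor

        PendingDescriptor(String partitionId, String descriptor) {
            this.partitionId = partitionId;
            this.descriptor = descriptor;
        }
    }

    // FIFO order preserves the order in which partitions finished.
    private final Queue<PendingDescriptor> toBeSerialized = new ArrayDeque<>();

    /** Called when an upstream partition finishes; serialization is deferred. */
    public void markPartitionFinished(String partitionId, String descriptor) {
        toBeSerialized.add(new PendingDescriptor(partitionId, descriptor));
    }

    /**
     * Drains the queue once a serializer is available (i.e. when the next TDD is
     * generated); returns how many descriptors were serialized.
     */
    public int serializePending(Function<String, byte[]> serializer) {
        int serialized = 0;
        PendingDescriptor pending;
        while ((pending = toBeSerialized.poll()) != null) {
            byte[] bytes = serializer.apply(pending.descriptor);
            // A real implementation would replace the cached "unknown" entry for
            // pending.partitionId with these bytes here.
            serialized++;
        }
        return serialized;
    }
}
```

This mirrors why the serializer arrives as a parameter of `serializePending`: it is simply not available at the time `markPartitionFinished` runs.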
[GitHub] [flink] godfreyhe merged pull request #21578: [bp-1.16][FLINK-28988] Don't push filters down into the right table for temporal join
godfreyhe merged PR #21578: URL: https://github.com/apache/flink/pull/21578
[GitHub] [flink] reswqa commented on a diff in pull request #21560: [FLINK-29768] Hybrid shuffle supports consume partial finished subtask
reswqa commented on code in PR #21560: URL: https://github.com/apache/flink/pull/21560#discussion_r1060265216 ## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/PartialFinishedInputConsumableDecider.java: ## @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler.strategy; + +import java.util.Map; +import java.util.Set; +import java.util.function.Function; + +/** + * {@link PartialFinishedInputConsumableDecider} is a special {@link InputConsumableDecider}. The + * input is considered to be consumable: + * + * + * for hybrid input: when partial producer partitions are finished. + * for blocking input: when all producer partitions are finished. 
+ * + * + */ +public class PartialFinishedInputConsumableDecider implements InputConsumableDecider { +public static final int NUM_FINISHED_PARTITIONS_AS_CONSUMABLE = 1; + +@Override +public boolean isInputConsumable( +SchedulingExecutionVertex executionVertex, +Set verticesToDeploy, +Map consumableStatusCache) { +for (ConsumedPartitionGroup consumedPartitionGroup : +executionVertex.getConsumedPartitionGroups()) { + +if (!consumableStatusCache.computeIfAbsent( +consumedPartitionGroup, this::isConsumedPartitionGroupConsumable)) { +return false; +} Review Comment: I think it is reasonable to require every group to have at least one finished partition. The main reason is that, with multiple inputs, if the low-priority input finishes first, the downstream task cannot consume any data even if it is scheduled in advance, which wastes resources to some extent.
[GitHub] [flink] xintongsong commented on pull request #21565: [FLINK-18229][ResourceManager] Support cancel pending workers if no longer needed.
xintongsong commented on PR #21565: URL: https://github.com/apache/flink/pull/21565#issuecomment-1369369021 @flinkbot run azure
[GitHub] [flink] flinkbot commented on pull request #21586: [FLINK-30352][table-runtime] Introduce adaptive hash aggregate to adaptively determine whether local hash aggregate is required at runtime
flinkbot commented on PR #21586: URL: https://github.com/apache/flink/pull/21586#issuecomment-1369368128 ## CI report: * 9fe565a241df6db1a22bd552d90233e9895ff9db UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
[jira] [Closed] (FLINK-30423) Introduce cast executor codegen for column type evolution
[ https://issues.apache.org/jira/browse/FLINK-30423?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jingsong Lee closed FLINK-30423. Fix Version/s: table-store-0.4.0 Assignee: Shammon Resolution: Fixed master: dcae4c28d69750713fa0d478d6ea348735790dbe b22ed19740f427852f404d93570e3b0cff1b09ef > Introduce cast executor codegen for column type evolution > - > > Key: FLINK-30423 > URL: https://issues.apache.org/jira/browse/FLINK-30423 > Project: Flink > Issue Type: Sub-task > Components: Table Store >Affects Versions: table-store-0.3.0 >Reporter: Shammon >Assignee: Shammon >Priority: Major > Labels: pull-request-available > Fix For: table-store-0.4.0 > > > Introduce cast executor codegen for column type evolution
[GitHub] [flink] yuchuanchen commented on a diff in pull request #21563: [FLINK-19889][connectors/hive/filesystem][format/parquet] Supports nested projection pushdown for filesystem connector of colum
yuchuanchen commented on code in PR #21563: URL: https://github.com/apache/flink/pull/21563#discussion_r1060263844 ## flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/DataTypeUtils.java: ## @@ -94,6 +99,217 @@ public static DataType projectRow(DataType dataType, int[] indexPaths) { return Projection.of(indexPaths).project(dataType); } +public static List buildRowFields(RowType allType, int[][] projectedFields) { +final List updatedFields = new ArrayList<>(); +for (int[] projectedField : projectedFields) { +updatedFields.add( +buildRowFieldInProjectFields( +allType.getFields().get(projectedField[0]), projectedField, 0)); +} +return updatedFields; +} + +public static RowType buildRow(RowType allType, int[][] projectedFields) { +return new RowType( +allType.isNullable(), mergeRowFields(buildRowFields(allType, projectedFields))); +} + +public static RowType buildRow(RowType allType, List updatedFields) { +return new RowType(allType.isNullable(), mergeRowFields(updatedFields)); +} + +public static RowType.RowField buildRowFieldInProjectFields( +RowType.RowField rowField, int[] fields, int index) { +if (fields.length == 1 || index == fields.length - 1) { +LogicalType rowType = rowField.getType(); +if (rowType.is(ROW)) { +rowType = new RowType(rowType.isNullable(), ((RowType) rowType).getFields()); +} +return new RowField(rowField.getName(), rowType); +} +Preconditions.checkArgument(rowField.getType().is(ROW), "Row data type expected."); +final List updatedFields = new ArrayList<>(); +RowType rowtype = ((RowType) rowField.getType()); +updatedFields.add( +buildRowFieldInProjectFields( +rowtype.getFields().get(fields[index + 1]), fields, index + 1)); +return new RowType.RowField( +rowField.getName(), new RowType(rowtype.isNullable(), updatedFields)); +} + +public static List mergeRowFields(List updatedFields) { +List updatedFieldsCopy = + updatedFields.stream().map(RowField::copy).collect(Collectors.toList()); +final List fieldNames = + 
updatedFieldsCopy.stream().map(RowField::getName).collect(Collectors.toList()); +if (fieldNames.stream().anyMatch(StringUtils::isNullOrWhitespaceOnly)) { +throw new ValidationException( +"Field names must contain at least one non-whitespace character."); +} +final Set duplicates = +fieldNames.stream() +.filter(n -> Collections.frequency(fieldNames, n) > 1) +.collect(Collectors.toSet()); +if (duplicates.isEmpty()) { +return updatedFieldsCopy; +} +List duplicatesFields = +updatedFieldsCopy.stream() +.filter(f -> duplicates.contains(f.getName())) +.collect(Collectors.toList()); +updatedFieldsCopy.removeAll(duplicatesFields); +Map> duplicatesMap = new HashMap<>(); +duplicatesFields.forEach( +f -> { +List tmp = duplicatesMap.getOrDefault(f.getName(), new ArrayList<>()); +tmp.add(f); +duplicatesMap.put(f.getName(), tmp); +}); +duplicatesMap.forEach( +(fieldName, duplicateList) -> { +List fieldsToMerge = new ArrayList<>(); +for (RowField rowField : duplicateList) { +Preconditions.checkArgument( +rowField.getType().is(ROW), "Row data type expected."); +RowType rowType = (RowType) rowField.getType(); +fieldsToMerge.addAll(rowType.getFields()); +} +RowField mergedField = +new RowField( +fieldName, +new RowType( + duplicateList.get(0).getType().isNullable(), +mergeRowFields(fieldsToMerge))); +updatedFieldsCopy.add(mergedField); +}); +return updatedFieldsCopy; +} + +public static int[][] computeProjectedFields(int[] selectFields) { +int[][] projectedFields = new int[selectFields.length][1]; +for (int i = 0; i < selectFields.length; i++) { +projectedFields[i][0] = selectFields[i]; +} +return projectedFields; +} + +public static int[][] computeProjectedFields( +String[] selectedFieldNames,
[GitHub] [flink] swuferhong opened a new pull request, #21586: [FLINK-30352][table-runtime] Introduce adaptive hash aggregate to adaptively determine whether local hash aggregate is required at runtim
swuferhong opened a new pull request, #21586: URL: https://github.com/apache/flink/pull/21586 ## What is the purpose of the change This PR aims to introduce adaptive hash aggregate in `batch` mode, which adaptively determines at runtime whether local hash aggregate is required, based on its aggregation degree. `adaptive hash agg` will first sample the data, and then decide whether `local hash agg` is required according to the degree of aggregation of the sampled data. If `local hash agg` is required, no action is taken; if not, the output row type is converted to match the aggregated result and the records are passed downstream. ## Brief change log - Add codegen code in `HashAggCodeGenerator`. - Add ITCase in `HashAggITCase`. ## Verifying this change - Added ITCase in `HashAggITCase`. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? yes - If yes, how is the feature documented? not documented yet.
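The sampling decision described in the PR can be illustrated with a small, self-contained sketch. The threshold, the key representation, and the method name are assumptions for illustration only; they are not the actual `HashAggCodeGenerator` logic or its real parameters.

```java
import java.util.HashSet;
import java.util.Set;

public class AdaptiveLocalAggSketch {

    /**
     * Decides whether to keep local hash aggregation: if the sampled rows collapse
     * into few distinct keys (low distinct ratio), local aggregation pays off;
     * otherwise it is skipped and rows are forwarded in the aggregated row type.
     */
    public static boolean keepLocalHashAgg(long[] sampledKeys, double distinctRatioThreshold) {
        Set<Long> distinctKeys = new HashSet<>();
        for (long key : sampledKeys) {
            distinctKeys.add(key);
        }
        double distinctRatio = (double) distinctKeys.size() / sampledKeys.length;
        return distinctRatio < distinctRatioThreshold;
    }
}
```

For example, a sample of five rows with only two distinct grouping keys has a distinct ratio of 0.4, so local aggregation would be kept under a 0.5 threshold, while a sample with all-distinct keys would skip it.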
[GitHub] [flink] yuchuanchen commented on a diff in pull request #21563: [FLINK-19889][connectors/hive/filesystem][format/parquet] Supports nested projection pushdown for filesystem connector of colum
yuchuanchen commented on code in PR #21563: URL: https://github.com/apache/flink/pull/21563#discussion_r1060263844 ## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRule.java: ## @@ -186,11 +193,30 @@ private boolean supportsMetadata(DynamicTableSource tableSource) { return tableSource instanceof SupportsReadingMetadata; } -private boolean supportsNestedProjection(DynamicTableSource tableSource) { +private boolean supportsNestedProjection( +DynamicTableSource tableSource, LogicalProject project) { +List projects = project.getProjects(); return supportsProjectionPushDown(tableSource) +&& isVectorizedSupportedTypes(projects) && ((SupportsProjectionPushDown) tableSource).supportsNestedProjection(); } +private boolean isVectorizedSupportedTypes(List projects) { Review Comment: Good idea, I will change it later.
[GitHub] [flink] yuchuanchen commented on a diff in pull request #21563: [FLINK-19889][connectors/hive/filesystem][format/parquet] Supports nested projection pushdown for filesystem connector of colum
yuchuanchen commented on code in PR #21563: URL: https://github.com/apache/flink/pull/21563#discussion_r1060263439 ## flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractor.scala: ## @@ -538,7 +538,30 @@ class RexNodeToExpressionConverter( } } - override def visitFieldAccess(fieldAccess: RexFieldAccess): Option[ResolvedExpression] = None + override def visitFieldAccess(fieldAccess: RexFieldAccess): Option[ResolvedExpression] = { Review Comment: Sorry, my mistake. This method is called when the TableSource implements SupportsFilterPushDown. I will remove it, since this patch only supports nested projection pushdown.
[GitHub] [flink] xintongsong commented on a diff in pull request #21576: [FLINK-30533][runtime] SourceOperator#emitNext() should push records to DataOutput in a while loop
xintongsong commented on code in PR #21576: URL: https://github.com/apache/flink/pull/21576#discussion_r1060259412 ## flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java: ## @@ -990,6 +991,10 @@ public MailboxExecutorFactory getMailboxExecutorFactory() { return this.mailboxProcessor::getMailboxExecutor; } +public Supplier getMailboxHasMail() { +return this.mailboxProcessor::hasMail; Review Comment: With this change, the `@VisibleForTesting` annotation can be removed for `MailboxProcessor#hasMail`.
[GitHub] [flink-table-store] JingsongLi merged pull request #441: [FLINK-30423] Introduce generated codes for CastExecutor
JingsongLi merged PR #441: URL: https://github.com/apache/flink-table-store/pull/441
[GitHub] [flink] fsk119 commented on a diff in pull request #21577: [FLINK-30496][table-api] Introduce TableChange to represent MODIFY change
fsk119 commented on code in PR #21577: URL: https://github.com/apache/flink/pull/21577#discussion_r1060260536 ## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/AlterSchemaConverter.java: ## @@ -698,7 +782,7 @@ private AlterSchemaStrategy computeAlterSchemaStrategy(SqlAlterTableSchema alter alterTableSchema.getClass().getCanonicalName())); } -private static T unwrap(Optional value) { +private T unwrap(Optional value) { Review Comment: ModifySchemaConverter and AddSchemaConverter used to be static nested classes, so they could only use static methods of the parent class. Now that I have changed them to non-static inner classes, I think we can remove the static keyword. BTW, I think we should narrow the visibility: with the `static` keyword, the method is reachable from a much wider scope. WDYT?
[GitHub] [flink] LadyForest commented on pull request #21577: [FLINK-30496][table-api] Introduce TableChange to represent MODIFY change
LadyForest commented on PR #21577: URL: https://github.com/apache/flink/pull/21577#issuecomment-1369359579 > Thanks for the review! > > > Since the MODIFY operation is split into a list of changes, I would like to know whether the order matters and how the external catalog copes with them. For example, does API expose a requirement that it should be an atomic operation, or ? does the external catalog decide it? > > Yes, the order matters. The catalog developer needs to make sure the execution in order and the execution is atomic. The logic may looks like(the idea is from the Spark JdbcTablecatalog#alterTable) > > ``` > > default void alterTable( > ObjectPath tablePath, > CatalogBaseTable newTable, > List tableChanges, > boolean ignoreIfNotExists) > throws TableNotExistException, CatalogException { > // make sure all table changes are supported. > validate(tableChanges); > // jdbc catalog can work like > conn.setAutoCommit(false); > Statement statement = conn.createStatement() > try { > statement.setQueryTimeout(options.queryTimeout); > for (String sql = dialect.alterTable(tableName, changes, metaData.getDatabaseMajorVersion)) { >statement.executeUpdate(sql) > } > conn.commit() > } catch (Exception e) { > if (conn != null) { > conn.rollback(); > } > throw e; > } finally { > statement.close(); > conn.setAutoCommit(true); > } > } > ``` > > > The "oldColumn" and "originColumn" are interchangeably used across the changelog, and can we align them to improve the readability? > > Thanks for pointing it out. I agree with you we should align the term in the codes. I am prone to use the term "oldColumn" instead because we already have used this in the FLIP-273. WDYT? > > > SchemaConverter#updatePositionAndCollectColumnChange and OperationConverterUtils#buildColumnChange are very similar. Do we have a plan to do some refactor work? > > Yes. I think we can refactor this after all parts finish. WDYT? Thanks for the detailed explanation. 
+1 for refactoring after all the subtasks are finished.
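The ordered, all-or-nothing `alterTable` contract quoted above can be demonstrated without JDBC. The in-memory `schema` list below stands in for the external catalog's state, and all names are illustrative; a real catalog would use a transaction (`setAutoCommit(false)` / `commit()` / `rollback()`) exactly as the quoted sketch does.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class AtomicAlterSketch {

    // Stands in for the table's column list held by the external catalog.
    private final List<String> schema = new ArrayList<>();

    public List<String> schema() {
        return schema;
    }

    /**
     * Applies each change in order. If any change fails, the pre-call snapshot
     * is restored, mirroring conn.rollback() in the JDBC version.
     */
    public void alterAtomically(List<Consumer<List<String>>> changes) {
        List<String> snapshot = new ArrayList<>(schema); // like setAutoCommit(false)
        try {
            for (Consumer<List<String>> change : changes) {
                change.accept(schema); // like statement.executeUpdate(sql)
            }
            // success: keep the mutated schema (like conn.commit())
        } catch (RuntimeException e) {
            schema.clear();          // failure: roll back to the snapshot
            schema.addAll(snapshot); // (like conn.rollback())
            throw e;
        }
    }
}
```

The key property is the one the discussion emphasizes: changes are applied in list order, and a failure part-way through leaves the catalog as if nothing had been applied.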
[jira] [Commented] (FLINK-30234) SourceReaderBase should provide an option to disable numRecordsIn metric registration
[ https://issues.apache.org/jira/browse/FLINK-30234?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17653749#comment-17653749 ] Qingsheng Ren commented on FLINK-30234: --- Thanks [~Wencong Liu] ! I've assigned the ticket to you. Actually we need to introduce a new option for SourceReaderBase, which touches public API, so it's better to have a discussion in the ML. > SourceReaderBase should provide an option to disable numRecordsIn metric > registration > - > > Key: FLINK-30234 > URL: https://issues.apache.org/jira/browse/FLINK-30234 > Project: Flink > Issue Type: Improvement > Components: Connectors / Common >Affects Versions: 1.17.0 >Reporter: Qingsheng Ren >Assignee: Wencong Liu >Priority: Major > > Currently the numRecordsIn metric is pre-registered for all sources in > SourceReaderBase. Considering different implementation of source reader, the > definition of "record" might differ from the one we use in SourceReaderBase, > hence numRecordsIn might be inaccurate. > We could introduce an option in SourceReader to disable the registration of > numRecordsIn in SourceReaderBase and let the actual implementation to report > the metric instead.
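One possible shape for such an opt-out is sketched below. All names here (`SourceReaderMetricsSketch`, `registerNumRecordsIn`, the metric strings) are hypothetical stand-ins; the actual public API would be settled in the ML discussion the comment refers to.

```java
import java.util.ArrayList;
import java.util.List;

public class SourceReaderMetricsSketch {

    private final boolean registerNumRecordsIn;
    private final List<String> registeredMetrics = new ArrayList<>();

    public SourceReaderMetricsSketch(boolean registerNumRecordsIn) {
        this.registerNumRecordsIn = registerNumRecordsIn;
    }

    /** Pre-registers base metrics, skipping numRecordsIn when the reader opted out. */
    public List<String> registerMetrics() {
        registeredMetrics.add("numBytesIn"); // always registered in this sketch
        if (registerNumRecordsIn) {
            registeredMetrics.add("numRecordsIn");
        }
        return registeredMetrics;
    }
}
```

A reader whose notion of "record" differs from the base class would construct with `false` and report its own counter instead.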
[jira] [Comment Edited] (FLINK-29217) CoordinatorEventsToStreamOperatorRecipientExactlyOnceITCase.testConcurrentCheckpoint failed with AssertionFailedError
[ https://issues.apache.org/jira/browse/FLINK-29217?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17651958#comment-17651958 ] Yunfeng Zhou edited comment on FLINK-29217 at 1/3/23 3:11 AM: -- According to offline discussions with Dong Lin, it requires further discussion about the behavior and performance of Flink runtime during checkpoints to apply a design to preserve the consistency of operator events in case of concurrent checkpoints. Given that currently no subclass of OperatorCoordinator would be affected by this function, it is thus of lower priority and we would temporarily remove the guarantee that this function works correctly under concurrent checkpoints until the community has reached an agreement on the design for this function. was (Author: yunfengzhou): According to offline discussion with Becket Qin and Dong Lin, given that Flink Operator Coordinator's support for saving buffered operator events in face of concurrent checkpoints is not good enough for now, we need to temporarily and partially disable concurrent manipulation of multiple checkpoints in OperatorCoordinator. A short-term solution is as follows. - If a new checkpoint is triggered on an OperatorCoordinatorHolder when a checkpoint is still in process, and the new checkpoint cannot be subsumed (i.e. a savepoint instead of an automatically triggered checkpoint), the checkpoint would be processed concurrently, and all blocked OperatorEvents would be regarded as generated after the new checkpoint is triggered (i.e. they would not be saved to the snapshot of the new checkpoint). - If a new checkpoint is triggered on an OperatorCoordinatorHolder when a checkpoint is still in process, and the new checkpoint can be subsumed, the checkpoint would be temporarily blocked until all ongoing checkpoints have finished. A long-term solution could be to make OperatorCoordinators generate checkpoint barriers and send them to their subtasks. 
The subtasks would need to align these barriers with the ones they receive from upstream operators or sources, and actually trigger the checkpoint when checkpoint barrier alignment is reached. > CoordinatorEventsToStreamOperatorRecipientExactlyOnceITCase.testConcurrentCheckpoint > failed with AssertionFailedError > - > > Key: FLINK-29217 > URL: https://issues.apache.org/jira/browse/FLINK-29217 > Project: Flink > Issue Type: Bug > Components: Runtime / Checkpointing >Affects Versions: 1.16.0 >Reporter: Xingbo Huang >Assignee: Yunfeng Zhou >Priority: Critical > Labels: pull-request-available, test-stability > Fix For: 1.16.1 > > > {code:java} > 2022-09-07T02:00:50.2507464Z Sep 07 02:00:50 [ERROR] > org.apache.flink.streaming.runtime.tasks.CoordinatorEventsToStreamOperatorRecipientExactlyOnceITCase.testConcurrentCheckpoint > Time elapsed: 2.137 s <<< FAILURE! > 2022-09-07T02:00:50.2508673Z Sep 07 02:00:50 > org.opentest4j.AssertionFailedError: > 2022-09-07T02:00:50.2509309Z Sep 07 02:00:50 > 2022-09-07T02:00:50.2509945Z Sep 07 02:00:50 Expecting value to be false but > was true > 2022-09-07T02:00:50.2511950Z Sep 07 02:00:50 at > sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) > 2022-09-07T02:00:50.2513254Z Sep 07 02:00:50 at > sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) > 2022-09-07T02:00:50.2514621Z Sep 07 02:00:50 at > sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) > 2022-09-07T02:00:50.2516342Z Sep 07 02:00:50 at > org.apache.flink.streaming.runtime.tasks.CoordinatorEventsToStreamOperatorRecipientExactlyOnceITCase.testConcurrentCheckpoint(CoordinatorEventsToStreamOperatorRecipientExactlyOnceITCase.java:173) > 2022-09-07T02:00:50.2517852Z Sep 07 02:00:50 at > sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > 2022-09-07T02:00:50.251Z Sep 07 02:00:50 at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > 
2022-09-07T02:00:50.2520065Z Sep 07 02:00:50 at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > 2022-09-07T02:00:50.2521153Z Sep 07 02:00:50 at > java.lang.reflect.Method.invoke(Method.java:498) > 2022-09-07T02:00:50.2522747Z Sep 07 02:00:50 at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) > 2022-09-07T02:00:50.2523973Z Sep 07 02:00:50 at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > 2022-09-07T02:00:50.2525158Z Sep 07 02:00:50 at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) >
[jira] [Resolved] (FLINK-30392) Increase the default value of cluster.thread-dump.stacktrace-max-depth
[ https://issues.apache.org/jira/browse/FLINK-30392?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yun Tang resolved FLINK-30392. -- Resolution: Fixed merged in master: 187e572768932b121b5f5e785727f0c48ca98aff > Increase the default value of cluster.thread-dump.stacktrace-max-depth > -- > > Key: FLINK-30392 > URL: https://issues.apache.org/jira/browse/FLINK-30392 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Web Frontend >Reporter: Yun Tang >Assignee: Yu Chen >Priority: Major > Labels: pull-request-available > Fix For: 1.17.0 > > > Currently, the thread dump function still have the default stack-trace depth > as 8, which is the same as before. However, the default value is really too > small for developers to know the actual thread info. > From our experiences, we can set this value as 24.
[GitHub] [flink] paul8263 commented on pull request #20343: [FLINK-25916][connector-kafka] Using upsert-kafka with a flush buffer…
paul8263 commented on PR #20343: URL: https://github.com/apache/flink/pull/20343#issuecomment-1369346830 Hi @MartijnVisser and @jaumebecks, I updated the unit test and I am currently waiting for the CI result. If there are other issues that need any changes, please let me know. Thank you very much.
[GitHub] [flink] fsk119 commented on a diff in pull request #21577: [FLINK-30496][table-api] Introduce TableChange to represent MODIFY change
fsk119 commented on code in PR #21577: URL: https://github.com/apache/flink/pull/21577#discussion_r1060254282 ## flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/TableChange.java: ## @@ -276,6 +409,359 @@ public String toString() { } } +// +// Modify Change +// + +/** + * A base schema change to modify a column. The modification includes: + * + * + * change column data type + * reorder column position + * modify column comment + * change the computed expression + * + * + * Some fine-grained column changes are defined in the {@link ModifyPhysicalColumnType}, + * {@link ModifyColumnComment}, {@link ModifyColumnPosition} and {@link ModifyColumnName}. + * + * It is equal to the following statement: + * + * + *ALTER TABLE table_name MODIFY column_definition COMMENT 'column_comment' column_position + * + */ +@PublicEvolving +class ModifyColumn implements TableChange { + +protected final Column oldColumn; +protected final Column newColumn; + +protected final @Nullable ColumnPosition newPosition; + +public ModifyColumn( +Column oldColumn, Column newColumn, @Nullable ColumnPosition newPosition) { +this.oldColumn = oldColumn; +this.newColumn = newColumn; +this.newPosition = newPosition; +} + +/** Returns the original {@link Column} instance. */ +public Column getOldColumn() { +return oldColumn; +} + +/** Returns the modified {@link Column} instance. */ +public Column getNewColumn() { +return newColumn; +} + +/** + * Returns the position of the modified {@link Column} instance. When the return value is + * null, it means modify the column at the original position. When the return value is + * FIRST, it means move the modified column to the first. When the return value is AFTER, it + * means move the column after the referred column. 
+ */ +public @Nullable ColumnPosition getNewPosition() { +return newPosition; +} + +@Override +public boolean equals(Object o) { +if (this == o) { +return true; +} +if (!(o instanceof ModifyColumn)) { +return false; +} +ModifyColumn that = (ModifyColumn) o; +return Objects.equals(oldColumn, that.oldColumn) +&& Objects.equals(newColumn, that.newColumn) +&& Objects.equals(newPosition, that.newPosition); +} + +@Override +public int hashCode() { +return Objects.hash(oldColumn, newColumn, newPosition); +} + +@Override +public String toString() { +return "ModifyColumn{" ++ "oldColumn=" ++ oldColumn ++ ", newColumn=" ++ newColumn ++ ", newPosition=" ++ newPosition ++ '}'; +} +} + +/** + * A table change to modify the column comment. + * + * It is equal to the following statement: + * + * + *ALTER TABLE table_name MODIFY column_name original_column_type COMMENT 'new_column_comment' + * + */ +@PublicEvolving +class ModifyColumnComment extends ModifyColumn { + +private final String newComment; + +private ModifyColumnComment(Column oldColumn, String newComment) { +super(oldColumn, oldColumn.withComment(newComment), null); +this.newComment = newComment; +} + +/** Get the new comment for the column. */ +public String getNewComment() { +return newComment; +} + +@Override +public boolean equals(Object o) { +return (o instanceof ModifyColumnComment) && super.equals(o); +} + +@Override +public String toString() { +return "ModifyColumnComment{" ++ "Column=" ++ oldColumn ++ ", newComment='" ++ newComment ++ '\'' ++ '}'; +} +} + +/** + * A table change to modify the column position. + * + * It is equal to the following statement: + * + * + *ALTER TABLE table_name MODIFY column_name original_column_type column_position + * + */ +@PublicEvolving +class ModifyColumnPosition extends ModifyColumn { + +public ModifyColumnPosition(Column oldColumn,
[GitHub] [flink] link3280 commented on pull request #21581: [FLINK-30538][SQL gateway/client] Improve error handling of stop job operation
link3280 commented on PR #21581: URL: https://github.com/apache/flink/pull/21581#issuecomment-1369343457 @flinkbot run azure -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (FLINK-30234) SourceReaderBase should provide an option to disable numRecordsIn metric registration
[ https://issues.apache.org/jira/browse/FLINK-30234?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Qingsheng Ren reassigned FLINK-30234: - Assignee: Wencong Liu > SourceReaderBase should provide an option to disable numRecordsIn metric > registration > - > > Key: FLINK-30234 > URL: https://issues.apache.org/jira/browse/FLINK-30234 > Project: Flink > Issue Type: Improvement > Components: Connectors / Common >Affects Versions: 1.17.0 >Reporter: Qingsheng Ren >Assignee: Wencong Liu >Priority: Major > > Currently the numRecordsIn metric is pre-registered for all sources in > SourceReaderBase. Considering the different implementations of source readers, the > definition of "record" might differ from the one we use in SourceReaderBase, > hence numRecordsIn might be inaccurate. > We could introduce an option in SourceReader to disable the registration of > numRecordsIn in SourceReaderBase and let the actual implementation report > the metric instead. -- This message was sent by Atlassian Jira (v8.20.10#820010)
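The proposed opt-out could look roughly like the following sketch; the registry stand-in, the {{registerNumRecordsIn}} flag, and all other names are illustrative, not Flink's actual SourceReaderBase API:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

public class Main {
    // Stand-in for a metric group: metric name -> counter.
    static class MetricRegistry {
        final Map<String, AtomicLong> counters = new HashMap<>();
        AtomicLong counter(String name) {
            return counters.computeIfAbsent(name, n -> new AtomicLong());
        }
    }

    // Sketch of a reader base class that only pre-registers numRecordsIn when
    // the implementation has not opted out of the default registration.
    static class ReaderBase {
        final MetricRegistry metrics;
        final AtomicLong numRecordsIn; // null when the subclass reports the metric itself

        ReaderBase(MetricRegistry metrics, boolean registerNumRecordsIn) {
            this.metrics = metrics;
            this.numRecordsIn = registerNumRecordsIn ? metrics.counter("numRecordsIn") : null;
        }

        void emit(Object record) {
            if (numRecordsIn != null) {
                numRecordsIn.incrementAndGet(); // base class's definition of a "record"
            }
            // ... hand the record to the downstream output ...
        }
    }

    public static void main(String[] args) {
        MetricRegistry m = new MetricRegistry();
        new ReaderBase(m, false).emit("row"); // opted out: nothing pre-registered
        System.out.println(m.counters.containsKey("numRecordsIn")); // false
    }
}
```

With the flag off, the subclass is free to register a counter with the same name that matches its own notion of a record.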
[GitHub] [flink] link3280 commented on pull request #21582: [FLINK-28655][SQL gateway] Support show jobs operation in SqlGateway
link3280 commented on PR #21582: URL: https://github.com/apache/flink/pull/21582#issuecomment-1369343392 @flinkbot run azure -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] Myasuka closed pull request #21498: [FLINK-30392] Replace default thread dump depth to 50
Myasuka closed pull request #21498: [FLINK-30392] Replace default thread dump depth to 50 URL: https://github.com/apache/flink/pull/21498 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] fsk119 commented on pull request #21577: [FLINK-30496][table-api] Introduce TableChange to represent MODIFY change
fsk119 commented on PR #21577: URL: https://github.com/apache/flink/pull/21577#issuecomment-1369338527 Thanks for the review! >Since the MODIFY operation is split into a list of changes, I would like to know whether the order matters and how the external catalog copes with them. For example, does the API expose a requirement that it should be an atomic operation, or does the external catalog decide it? Yes, the order matters. The catalog developer needs to make sure the changes are executed in order and that the execution is atomic. The logic may look like this (the idea is from Spark's JDBCTableCatalog#alterTable):

```
default void alterTable(
        ObjectPath tablePath,
        CatalogBaseTable newTable,
        List<TableChange> tableChanges,
        boolean ignoreIfNotExists)
        throws TableNotExistException, CatalogException {
    // make sure all table changes are supported.
    validate(tableChanges);
    // a JDBC catalog can work like:
    conn.setAutoCommit(false);
    Statement statement = conn.createStatement();
    try {
        statement.setQueryTimeout(options.queryTimeout);
        for (String sql : dialect.alterTable(tableName, tableChanges, metaData.getDatabaseMajorVersion())) {
            statement.executeUpdate(sql);
        }
        conn.commit();
    } catch (Exception e) {
        if (conn != null) {
            conn.rollback();
        }
        throw e;
    } finally {
        statement.close();
        conn.setAutoCommit(true);
    }
}
```

> The "oldColumn" and "originColumn" are used interchangeably across the changelog; can we align them to improve readability? Thanks for pointing it out. I agree that we should align the terminology in the code. I am inclined to use the term "oldColumn" because we already use it in FLIP-273. WDYT? > SchemaConverter#updatePositionAndCollectColumnChange and OperationConverterUtils#buildColumnChange are very similar. Do we have a plan to do some refactor work? Yes. I think we can refactor this after all parts are finished. WDYT? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] lindong28 commented on pull request #21576: [FLINK-30533][runtime] SourceOperator#emitNext() should push records to DataOutput in a while loop
lindong28 commented on PR #21576: URL: https://github.com/apache/flink/pull/21576#issuecomment-1369329109 @xintongsong @zhuzhurk Can you review this PR? Thanks! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (FLINK-30542) Support adaptive hash aggregate in runtime
Yunhong Zheng created FLINK-30542: - Summary: Support adaptive hash aggregate in runtime Key: FLINK-30542 URL: https://issues.apache.org/jira/browse/FLINK-30542 Project: Flink Issue Type: Sub-task Components: Table SQL / Runtime Affects Versions: 1.17.0 Reporter: Yunhong Zheng Fix For: 1.17.0 Introduce a new strategy to adaptively determine whether local hash aggregate is required according to the aggregation degree of local hash aggregate. -- This message was sent by Atlassian Jira (v8.20.10#820010)
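Such an adaptive decision can be sketched as a simple sampling heuristic: look at the keys of the first rows reaching the local hash aggregate and keep it only if grouping reduces the row count enough. The threshold and sampling approach below are illustrative, not Flink's actual implementation:

```java
import java.util.List;

public class Main {
    // Decide whether local hash aggregation is worthwhile, given the grouping
    // keys sampled from the head of the input. A low reduction ratio means
    // nearly every key is distinct, so local aggregation would mostly add
    // hash-table overhead without shrinking the stream.
    static boolean keepLocalHashAgg(List<String> sampledKeys, double minReduction) {
        long distinct = sampledKeys.stream().distinct().count();
        double reduction = 1.0 - (double) distinct / sampledKeys.size();
        return reduction >= minReduction;
    }

    public static void main(String[] args) {
        // Heavy duplication: local aggregation shrinks the stream, keep it.
        System.out.println(keepLocalHashAgg(List.of("a", "a", "a", "b"), 0.5)); // true
        // Nearly unique keys: local aggregation barely helps, skip it.
        System.out.println(keepLocalHashAgg(List.of("a", "b", "c", "d"), 0.5)); // false
    }
}
```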
[GitHub] [flink] paul8263 commented on pull request #20343: [FLINK-25916][connector-kafka] Using upsert-kafka with a flush buffer…
paul8263 commented on PR #20343: URL: https://github.com/apache/flink/pull/20343#issuecomment-1369309368 > @MartijnVisser it seems @paul8263 is not responding to the requested changes, we may create a similar PR in the following days for fixing this, does that sound good to you? Hi @jaumebecks , Sorry for the late response. I will do it these days. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Comment Edited] (FLINK-29634) Support periodic checkpoint triggering
[ https://issues.apache.org/jira/browse/FLINK-29634?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17653712#comment-17653712 ] Jiale Tan edited comment on FLINK-29634 at 1/3/23 12:50 AM: I see. [~gyfora] Thanks for the input. I checked pom files again and your solution makes sense. One more question - how can we deal with {{io.fabric8.kubernetes.client.server.mock.*}} classes? They are not a part of {{flink-kubernetes}}, but introduced [here|https://github.com/apache/flink-kubernetes-operator/blob/a1842d4c0170feb008293963ec51c0343f42771d/flink-kubernetes-standalone/pom.xml#L74-L78], so there are no shaded version for them. I am running into issues [here|https://github.com/apache/flink-kubernetes-operator/blob/7ced741f51a99f2093ce8a45c8c92879a247f836/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/Fabric8FlinkStandaloneKubeClientTest.java#L58]: the code was expecting an {{org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.client.NamespacedKubernetesClient}} object for {{kubernetesClient}} but the mockServer.createClient() will create an {{io.fabric8.kubernetes.client.NamespacedKubernetesClient}}. was (Author: JIRAUSER290356): I see. [~gyfora] Thanks for the input. I checked pom files again and your solution makes sense. One more question - how can we deal with {{io.fabric8.kubernetes.client.server.mock.*}} classes? They are not a part of {{flink-kubernetes}}, but introduced [here|https://github.com/apache/flink-kubernetes-operator/blob/a1842d4c0170feb008293963ec51c0343f42771d/flink-kubernetes-standalone/pom.xml#L74-L78], so there are no shaded version for them. 
I am running into issues [here|https://github.com/apache/flink-kubernetes-operator/blob/7ced741f51a99f2093ce8a45c8c92879a247f836/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/Fabric8FlinkStandaloneKubeClientTest.java#L58], we are expecting an {{org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.client.NamespacedKubernetesClient}} object for {{kubernetesClient}} but the mockServer.createClient() will create an {{io.fabric8.kubernetes.client.NamespacedKubernetesClient}}. > Support periodic checkpoint triggering > -- > > Key: FLINK-29634 > URL: https://issues.apache.org/jira/browse/FLINK-29634 > Project: Flink > Issue Type: New Feature > Components: Kubernetes Operator >Reporter: Thomas Weise >Assignee: Jiale Tan >Priority: Major > > Similar to the support for periodic savepoints, the operator should support > triggering periodic checkpoints to break the incremental checkpoint chain. > Support for external triggering will come with 1.17: > https://issues.apache.org/jira/browse/FLINK-27101 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (FLINK-29634) Support periodic checkpoint triggering
[ https://issues.apache.org/jira/browse/FLINK-29634?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17653712#comment-17653712 ] Jiale Tan edited comment on FLINK-29634 at 1/3/23 12:50 AM: I see. [~gyfora] Thanks for the input. I checked pom files again and your solution makes sense. One more question - how can we deal with {{io.fabric8.kubernetes.client.server.mock.*}} classes? They are not a part of {{flink-kubernetes}}, but introduced [here|https://github.com/apache/flink-kubernetes-operator/blob/a1842d4c0170feb008293963ec51c0343f42771d/flink-kubernetes-standalone/pom.xml#L74-L78], so there are no shaded version for them. I am running into issues [here|https://github.com/apache/flink-kubernetes-operator/blob/7ced741f51a99f2093ce8a45c8c92879a247f836/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/Fabric8FlinkStandaloneKubeClientTest.java#L58], we are expecting an {{org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.client.NamespacedKubernetesClient}} object for {{kubernetesClient}} but the mockServer.createClient() will create an {{io.fabric8.kubernetes.client.NamespacedKubernetesClient}}. was (Author: JIRAUSER290356): I see. [~gyfora] Thanks for the input. I checked pom files again and your solution makes sense. One more question - how can we deal with {{io.fabric8.kubernetes.client.server.mock.*}} classes? They are not a part of {{{}flink-kubernetes{}}}, but introduced [here|https://github.com/apache/flink-kubernetes-operator/blob/a1842d4c0170feb008293963ec51c0343f42771d/flink-kubernetes-standalone/pom.xml#L74-L78], so there are no shaded version for them. 
We will run into issues like [this|https://github.com/apache/flink-kubernetes-operator/blob/7ced741f51a99f2093ce8a45c8c92879a247f836/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/Fabric8FlinkStandaloneKubeClientTest.java#L58], we are expecting an {{org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.client.NamespacedKubernetesClient}} object for {{kubernetesClient}} but the mockServer.createClient() will create an {{io.fabric8.kubernetes.client.NamespacedKubernetesClient}}. > Support periodic checkpoint triggering > -- > > Key: FLINK-29634 > URL: https://issues.apache.org/jira/browse/FLINK-29634 > Project: Flink > Issue Type: New Feature > Components: Kubernetes Operator >Reporter: Thomas Weise >Assignee: Jiale Tan >Priority: Major > > Similar to the support for periodic savepoints, the operator should support > triggering periodic checkpoints to break the incremental checkpoint chain. > Support for external triggering will come with 1.17: > https://issues.apache.org/jira/browse/FLINK-27101 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-29634) Support periodic checkpoint triggering
[ https://issues.apache.org/jira/browse/FLINK-29634?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17653712#comment-17653712 ] Jiale Tan commented on FLINK-29634: --- I see. [~gyfora] Thanks for the input. I checked pom files again and your solution makes sense. One more question - how can we deal with {{io.fabric8.kubernetes.client.server.mock.*}} classes? They are not a part of {{{}flink-kubernetes{}}}, but introduced [here|https://github.com/apache/flink-kubernetes-operator/blob/a1842d4c0170feb008293963ec51c0343f42771d/flink-kubernetes-standalone/pom.xml#L74-L78], so there are no shaded version for them. We will run into issues like [this|https://github.com/apache/flink-kubernetes-operator/blob/7ced741f51a99f2093ce8a45c8c92879a247f836/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/Fabric8FlinkStandaloneKubeClientTest.java#L58], we are expecting an {{org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.client.NamespacedKubernetesClient}} object for {{kubernetesClient}} but the mockServer.createClient() will create an {{io.fabric8.kubernetes.client.NamespacedKubernetesClient}}. > Support periodic checkpoint triggering > -- > > Key: FLINK-29634 > URL: https://issues.apache.org/jira/browse/FLINK-29634 > Project: Flink > Issue Type: New Feature > Components: Kubernetes Operator >Reporter: Thomas Weise >Assignee: Jiale Tan >Priority: Major > > Similar to the support for periodic savepoints, the operator should support > triggering periodic checkpoints to break the incremental checkpoint chain. > Support for external triggering will come with 1.17: > https://issues.apache.org/jira/browse/FLINK-27101 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] lindong28 commented on pull request #21579: [FLINK-30536][runtime] Remove CountingOutput from per-record code path for most operators
lindong28 commented on PR #21579: URL: https://github.com/apache/flink/pull/21579#issuecomment-1369291177 @flinkbot run azure -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Comment Edited] (FLINK-30475) Improved speed of RocksDBMapState clear() using rocksDB.deleteRange
[ https://issues.apache.org/jira/browse/FLINK-30475?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17653585#comment-17653585 ] David Hrbacek edited comment on FLINK-30475 at 1/2/23 10:35 PM: Hello [~Yanfei Lei] [~masteryhx] , Sorry for the late response. I do not expect that the performance results with {{deleteRange()}} have changed since FLINK-28010. But let me describe our use-case scenario: * Write extensive keyed map-state, * where the state basically represents a queue of elements waiting for processing. * This state is periodically (e.g. after 3 days) cleared (with millions of items). When {{MapState#clear()}} with an iterator is called, clearing the state takes hours(!), with occasional checkpoint failures and a big degradation of pipeline performance. So the small performance drop mentioned in the [blog|https://rocksdb.org/blog/2018/11/21/delete-range.html] is not really a problem in comparison to {{clear()}} with scan and delete. On the other hand, I admit that for most use cases the performance drop can be problematic. As I understand the problem: * {{deleteRange()}} is better for clearing map states with a large number of keys, * whereas clearing via scan and delete is optimal for map states with a small number of keys. This leads me to a solution proposal where the {{RocksDBMapState#clear()}} method implementation can be chosen via configuration. I can imagine two ways to implement a configurable clear operation: * Switchable: e.g. a config {{state.backend.rocksdb.map-state.clear-op}} with default option {{scan-and-delete}} and optional {{delete-range}}. * With threshold: e.g. a config {{state.backend.rocksdb.map-state.clear-with-delete-range-threshold}} where the user can configure a map-state size threshold above which {{delete-range}} will be used. The default value would be {{-1}}, which means never use {{delete-range}}. What do you think? was (Author: JIRAUSER298597): Hello [~Yanfei Lei] [~masteryhx] , Sorry for the late response.
I do not expect, that performance results with {{deleteRange()}} changed since FLINK-28010. But if I describe our use-case scenario: * Write extensive keyed map-state, * where the state basically represents a queue of elements waiting for processing. * This state is periodically (e.g.: after 3 days) cleared (with millions of items). When {{MapState#clear()}} with iterator is called, then clearing of the state lasts for hours(!) with occasional checkpoint failing and big degradation of pipeline performance. So little performance drop mentioned in [blog|https://rocksdb.org/blog/2018/11/21/delete-range.html] is not really a problem in comparison to {{clear()}} with scan and delete. On the other hand, I admit that for most use-cases performance drop can be problematic. As I understood the problem, then: * {{deleteRange()}} is better for clearing map states with big amount of keys, * whereas clear via scan and delete is optimal for map states with a low amount of keys. This leads me to the solution proposal, where the {{RocksDBMapState#clear()}} method implementation can be chosen via configuration. I can imagine two ways how to implement configurable clear operation: * Switchable: e.g.: config {{state.backend.rocksdb.map-state.clear-op}} with default option {{scan-and-delete}} and optional {{delete-range}}. * With threshold: e.g.: config {{state.backend.rocksdb.map-state.clear-with-delete-range-threshold}} where user can configure threshold of map state size from which will be used {{delete-range} }. And default value will be {{-1}} which means never use {{delete-range}} . What do you think? 
> Improved speed of RocksDBMapState clear() using rocksDB.deleteRange > --- > > Key: FLINK-30475 > URL: https://issues.apache.org/jira/browse/FLINK-30475 > Project: Flink > Issue Type: Improvement > Components: Runtime / State Backends >Affects Versions: 1.16.0 >Reporter: David Hrbacek >Priority: Major > Labels: pull-request-available > > Currently {{RocksDBMapState#clear()}} is processed via keyRange traversing > and inserting particular keys into BatchWrite for deletion. > RocksDb offer much faster way how to delete key range - {{deleteRange}} > This issue is follow-up for > [FLINK-9070|https://issues.apache.org/jira/browse/FLINK-9070] where > {{deleteRange}} was also considered. But at that time it implied slower read, > it was buggy and not even available in the Java API of RocksDB. All of these > problems were solved since that time (see also RocksDB [blog article for > deleteRange|https://rocksdb.org/blog/2018/11/21/delete-range.html]) > Delete range enables to clear {{RocksDBMapState}} for one key in constant > computational complexity whereas the old
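The cost difference discussed above can be sketched with an in-memory stand-in, where a {{TreeMap}} plays the role of the RocksDB key range and a counter tracks the delete operations issued to the backend. This is only an illustration of the proposal (the prefix-based key layout is assumed), not the actual state-backend code:

```java
import java.util.Iterator;
import java.util.TreeMap;

public class Main {
    static int writeOps; // delete operations issued to the "backend"

    // Current RocksDBMapState#clear() behavior: traverse the key range and
    // delete each entry individually - O(n) delete operations.
    static void scanAndDelete(TreeMap<String, String> db, String from, String to) {
        Iterator<String> it = db.subMap(from, to).keySet().iterator();
        while (it.hasNext()) {
            it.next();
            it.remove();
            writeOps++;
        }
    }

    // Proposed alternative: a single range tombstone covers the whole key
    // range, like rocksDB.deleteRange(from, to) - O(1) delete operations.
    static void deleteRange(TreeMap<String, String> db, String from, String to) {
        db.subMap(from, to).clear();
        writeOps++;
    }

    public static void main(String[] args) {
        TreeMap<String, String> db = new TreeMap<>();
        for (int i = 0; i < 1000; i++) {
            db.put(String.format("key1/%04d", i), "v"); // one Flink key's map-state entries
        }
        scanAndDelete(db, "key1/", "key1/\uffff");
        System.out.println(writeOps); // 1000 individual deletes

        writeOps = 0;
        for (int i = 0; i < 1000; i++) {
            db.put(String.format("key1/%04d", i), "v");
        }
        deleteRange(db, "key1/", "key1/\uffff");
        System.out.println(writeOps); // 1 range delete
    }
}
```

The threshold proposal amounts to picking between these two paths based on the number of entries in the range.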
[jira] [Updated] (FLINK-30541) Add Transformer and Estimator for OnlineStandardScaler
[ https://issues.apache.org/jira/browse/FLINK-30541?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-30541: --- Labels: pull-request-available (was: ) > Add Transformer and Estimator for OnlineStandardScaler > -- > > Key: FLINK-30541 > URL: https://issues.apache.org/jira/browse/FLINK-30541 > Project: Flink > Issue Type: New Feature > Components: Library / Machine Learning >Affects Versions: ml-2.2.0 >Reporter: Zhipeng Zhang >Assignee: Zhipeng Zhang >Priority: Major > Labels: pull-request-available > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink-ml] zhipeng93 opened a new pull request, #196: [FLINK-30541] Add Transformer and Estimator for OnlineStandardScaler
zhipeng93 opened a new pull request, #196: URL: https://github.com/apache/flink-ml/pull/196 ## What is the purpose of the change - Add Transformer and Estimator for OnlineStandardScaler ## Brief change log - Added Transformer and Estimator for OnlineStandardScaler (java/python source/test/example and docs). - Added two metrics for exposing ML model attributes (i.e., `timestamp` and `version` of model data). ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes) ## Documentation - Does this pull request introduce a new feature? (yes) - If yes, how is the feature documented? (docs) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (FLINK-30541) Add Transformer and Estimator for OnlineStandardScaler
[ https://issues.apache.org/jira/browse/FLINK-30541?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhipeng Zhang reassigned FLINK-30541: - Assignee: Zhipeng Zhang > Add Transformer and Estimator for OnlineStandardScaler > -- > > Key: FLINK-30541 > URL: https://issues.apache.org/jira/browse/FLINK-30541 > Project: Flink > Issue Type: New Feature > Components: Library / Machine Learning >Affects Versions: ml-2.2.0 >Reporter: Zhipeng Zhang >Assignee: Zhipeng Zhang >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-30541) Add Transformer and Estimator for OnlineStandardScaler
Zhipeng Zhang created FLINK-30541: - Summary: Add Transformer and Estimator for OnlineStandardScaler Key: FLINK-30541 URL: https://issues.apache.org/jira/browse/FLINK-30541 Project: Flink Issue Type: New Feature Components: Library / Machine Learning Affects Versions: ml-2.2.0 Reporter: Zhipeng Zhang -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] fsk119 closed pull request #19419: [FLINK-22317][table] Support DROP column/constraint/watermark for ALTER TABLE statement
fsk119 closed pull request #19419: [FLINK-22317][table] Support DROP column/constraint/watermark for ALTER TABLE statement URL: https://github.com/apache/flink/pull/19419 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] fsk119 commented on pull request #19419: [FLINK-22317][table] Support DROP column/constraint/watermark for ALTER TABLE statement
fsk119 commented on PR #19419: URL: https://github.com/apache/flink/pull/19419#issuecomment-1369055614 Merged in the #21571 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-30442) Update table walkthrough playground for 1.16
[ https://issues.apache.org/jira/browse/FLINK-30442?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-30442: --- Labels: pull-request-available (was: ) > Update table walkthrough playground for 1.16 > > > Key: FLINK-30442 > URL: https://issues.apache.org/jira/browse/FLINK-30442 > Project: Flink > Issue Type: Sub-task >Reporter: David Anderson >Assignee: Gunnar Morling >Priority: Major > Labels: pull-request-available > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Closed] (FLINK-22317) Support DROP column/constraint/watermark for ALTER TABLE statement
[ https://issues.apache.org/jira/browse/FLINK-22317?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shengkai Fang closed FLINK-22317. - Resolution: Fixed Merged into master: fb9354d8e1fc7bc17bc59b874089a457badacbe8 cc9540cacc2d775cc190ee6534794ec33fd07828 > Support DROP column/constraint/watermark for ALTER TABLE statement > -- > > Key: FLINK-22317 > URL: https://issues.apache.org/jira/browse/FLINK-22317 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API >Reporter: Jark Wu >Assignee: Jane Chan >Priority: Major > Labels: pull-request-available, stale-assigned > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink-playgrounds] gunnarmorling opened a new pull request, #39: FLINK-30442: Upgrading table walk-through to Flink 1.16
gunnarmorling opened a new pull request, #39: URL: https://github.com/apache/flink-playgrounds/pull/39 Hey @alpinegizmo, here's the second PR for the playground updates. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] fsk119 closed pull request #21571: [FLINK-22317][table] Support DROP column/constraint/watermark for ALTER TABLE statement
fsk119 closed pull request #21571: [FLINK-22317][table] Support DROP column/constraint/watermark for ALTER TABLE statement URL: https://github.com/apache/flink/pull/21571 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-playgrounds] gunnarmorling commented on pull request #37: [FLINK-30446][flink-playgrounds]bump mysql version to 8.0.31 for table-walkthrough supporting more architectures
gunnarmorling commented on PR #37: URL: https://github.com/apache/flink-playgrounds/pull/37#issuecomment-1369042907 I can confirm that this change makes the table walkthrough work on macOS/Aarch64. // CC @alpinegizmo -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-30440) Update operations playground for 1.16
[ https://issues.apache.org/jira/browse/FLINK-30440?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17653636#comment-17653636 ] Gunnar Morling commented on FLINK-30440: Ok, got it. I'm planning to send the PRs for the two other playgrounds in the course of this week. > Update operations playground for 1.16 > - > > Key: FLINK-30440 > URL: https://issues.apache.org/jira/browse/FLINK-30440 > Project: Flink > Issue Type: Sub-task >Reporter: David Anderson >Assignee: Gunnar Morling >Priority: Major > Labels: pull-request-available > Fix For: 1.16.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-30343) Migrate KubernetesLeaderElectionAndRetrievalITCase
[ https://issues.apache.org/jira/browse/FLINK-30343?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17653630#comment-17653630 ] Wencong Liu commented on FLINK-30343: - [~mapohl] Thanks for your reply. I think that even though FLINK-26522 is currently in progress, the current changes to FLINK-30338 are still meaningful. After FLINK-30338 is completed, only a few minor changes may be needed. Old (legacy) test cases should not be needed, right? If you think it's OK, could you please assign the remaining subtasks to me? > Migrate KubernetesLeaderElectionAndRetrievalITCase > -- > > Key: FLINK-30343 > URL: https://issues.apache.org/jira/browse/FLINK-30343 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Coordination >Affects Versions: 1.16.0, 1.17.0 >Reporter: Matthias Pohl >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-30440) Update operations playground for 1.16
[ https://issues.apache.org/jira/browse/FLINK-30440?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17653629#comment-17653629 ] David Anderson commented on FLINK-30440: With these specific playground tasks, I usually merge each of them to master as soon as they are ready, but wait until all 3 sub-tasks are done before merging all of them to the release branch and then resolving all of the issues. > Update operations playground for 1.16 > - > > Key: FLINK-30440 > URL: https://issues.apache.org/jira/browse/FLINK-30440 > Project: Flink > Issue Type: Sub-task >Reporter: David Anderson >Assignee: Gunnar Morling >Priority: Major > Labels: pull-request-available > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-30440) Update operations playground for 1.16
[ https://issues.apache.org/jira/browse/FLINK-30440?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Anderson updated FLINK-30440: --- Fix Version/s: 1.16.0 > Update operations playground for 1.16 > - > > Key: FLINK-30440 > URL: https://issues.apache.org/jira/browse/FLINK-30440 > Project: Flink > Issue Type: Sub-task >Reporter: David Anderson >Assignee: Gunnar Morling >Priority: Major > Labels: pull-request-available > Fix For: 1.16.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-30440) Update operations playground for 1.16
[ https://issues.apache.org/jira/browse/FLINK-30440?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17653614#comment-17653614 ] Gunnar Morling commented on FLINK-30440: [~danderson], thanks for merging that PR! What is the procedure in regards to Jira? It seems you'd have to set the Fix Version and transition this issue to Resolved, as I'm lacking the rights to do so myself. Thanks again. > Update operations playground for 1.16 > - > > Key: FLINK-30440 > URL: https://issues.apache.org/jira/browse/FLINK-30440 > Project: Flink > Issue Type: Sub-task >Reporter: David Anderson >Assignee: Gunnar Morling >Priority: Major > Labels: pull-request-available > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-30537) Add support for OpenSearch 2.3
[ https://issues.apache.org/jira/browse/FLINK-30537?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17653613#comment-17653613 ] Martijn Visser commented on FLINK-30537: [~reta] I'll reply to the poster in Slack, thanks > Add support for OpenSearch 2.3 > -- > > Key: FLINK-30537 > URL: https://issues.apache.org/jira/browse/FLINK-30537 > Project: Flink > Issue Type: Improvement > Components: Connectors / Opensearch >Reporter: Martijn Visser >Priority: Major > > Create a version for Flink’s Opensearch connector that supports version 2.3. > From the ASF Flink Slack: > https://apache-flink.slack.com/archives/C03GV7L3G2C/p1672339157102319 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-29363) Allow web ui to fully redirect to other page
[ https://issues.apache.org/jira/browse/FLINK-29363?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17653612#comment-17653612 ]

Martijn Visser commented on FLINK-29363:

[~rmetzger] Given that this feature is currently not documented, should we at least add something to the release notes?

> Allow web ui to fully redirect to other page
> --------------------------------------------
>
>                 Key: FLINK-29363
>                 URL: https://issues.apache.org/jira/browse/FLINK-29363
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Web Frontend
>    Affects Versions: 1.15.3
>            Reporter: Zhenqiu Huang
>            Assignee: Zhenqiu Huang
>            Priority: Minor
>              Labels: pull-request-available
>             Fix For: 1.17.0
>
> In a streaming platform, the web UI usually integrates with an internal authentication and authorization system. When validation fails, the request needs to be redirected to a landing page, but this doesn't work for AJAX requests. It would be great to make the web UI configurable to allow an automatic full redirect.
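For context on the limitation FLINK-29363 describes: when an auth proxy answers an AJAX call with a 302 to a login page, the browser follows it transparently, so the XHR/fetch handler receives the login page's HTML instead of JSON and no visible navigation happens. A minimal client-side sketch of the usual workaround (hypothetical helper names, not actual Flink web UI code):

```javascript
// Hypothetical helpers illustrating the AJAX-redirect problem: the app must
// detect that an auth redirect occurred and trigger a full-page navigation
// itself, because the XHR/fetch call cannot render the login page.
function shouldFullRedirect(response) {
  // Either the backend signalled an auth failure explicitly, or the Fetch
  // API recorded that the request was silently redirected somewhere else
  // (response.redirected is true whenever any redirect was followed).
  return response.status === 401
      || response.status === 403
      || response.redirected;
}

async function fetchWithAuthRedirect(url, loginUrl) {
  const response = await fetch(url);
  if (shouldFullRedirect(response)) {
    // Force a real browser navigation to the landing page.
    window.location.href = loginUrl;
    return null;
  }
  return response.json();
}
```

FLINK-29363 solves this on the other side, by letting the web UI itself be configured to perform such a full redirect.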
[GitHub] [flink] flinkbot commented on pull request #21585: [BP-1.15][FLINK-30539][tests] Removes timeout from YARNSessionCapacitySchedulerITCase
flinkbot commented on PR #21585:
URL: https://github.com/apache/flink/pull/21585#issuecomment-1368970990

## CI report:

* cd690a9492a6185b7351b724a22962c9ee05f0e7 UNKNOWN

Bot commands

The @flinkbot bot supports the following commands:

- `@flinkbot run azure` re-run the last Azure build

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
[GitHub] [flink] flinkbot commented on pull request #21584: [BP-1.16][FLINK-30539][tests] Removes timeout from YARNSessionCapacitySchedulerITCase
flinkbot commented on PR #21584:
URL: https://github.com/apache/flink/pull/21584#issuecomment-1368970897

## CI report:

* 3e74c2e251ae727bfd0e61d0a98a650036922fba UNKNOWN

Bot commands

The @flinkbot bot supports the following commands:

- `@flinkbot run azure` re-run the last Azure build