[jira] [Commented] (FLINK-29634) Support periodic checkpoint triggering

2023-01-02 Thread Gyula Fora (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29634?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17653846#comment-17653846
 ] 

Gyula Fora commented on FLINK-29634:


Yeah, the Mock classes can be tricky, [~Jiale]. I don't really have a good answer 
yet; I have to think about this.

> Support periodic checkpoint triggering
> --
>
> Key: FLINK-29634
> URL: https://issues.apache.org/jira/browse/FLINK-29634
> Project: Flink
>  Issue Type: New Feature
>  Components: Kubernetes Operator
>Reporter: Thomas Weise
>Assignee: Jiale Tan
>Priority: Major
>
> Similar to the support for periodic savepoints, the operator should support 
> triggering periodic checkpoints to break the incremental checkpoint chain.
> Support for external triggering will come with 1.17: 
> https://issues.apache.org/jira/browse/FLINK-27101 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-connector-pulsar] tisonkun commented on a diff in pull request #10: [FLINK-30413][Connector/Pulsar] Drop subscription support, remove unordered consuming, change related tests.

2023-01-02 Thread GitBox


tisonkun commented on code in PR #10:
URL: 
https://github.com/apache/flink-connector-pulsar/pull/10#discussion_r1060341436


##
flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/config/SourceConfiguration.java:
##
@@ -203,12 +175,7 @@ public boolean isEnableMetrics() {
 
 /** Convert the subscription into a readable str. */
 public String getSubscriptionDesc() {
-return getSubscriptionName()
-+ "("
-+ getSubscriptionType()
-+ ","
-+ getSubscriptionMode()
-+ ")";
+return getSubscriptionName() + "(" + getSubscriptionMode() + ")";

Review Comment:
   Maybe we could hard-code the type as `Exclusive`? I think it's still expressive 
and meaningful.
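   A minimal sketch of that, assuming the connector now only ever uses the exclusive subscription:
   ```java
   /** Convert the subscription into a readable string. */
   public String getSubscriptionDesc() {
       // The subscription type is hard-coded: Exclusive is the only type the
       // connector still supports after this change.
       return getSubscriptionName() + "(Exclusive," + getSubscriptionMode() + ")";
   }
   ```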



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-pulsar] tisonkun commented on a diff in pull request #10: [FLINK-30413][Connector/Pulsar] Drop subscription support, remove unordered consuming, change related tests.

2023-01-02 Thread GitBox


tisonkun commented on code in PR #10:
URL: 
https://github.com/apache/flink-connector-pulsar/pull/10#discussion_r1060340966


##
flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceOptions.java:
##
@@ -124,6 +124,8 @@ private PulsarSourceOptions() {
 " We would automatically commit 
the cursor using the given period (in ms).")
 .build());
 
+/** @deprecated We no longer need transitions for consuming messages. */

Review Comment:
   ```suggestion
   /** @deprecated We no longer need transactions for consuming messages. */
   ```
   
   ?
   
   Also, if this option has no effect, we'd better remove it directly.



##
flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceOptions.java:
##
@@ -236,6 +230,8 @@ private PulsarSourceOptions() {
 " This argument is required when 
constructing the consumer.")
 .build());
 
+/** @deprecated This config option is no longer supported. */
+@Deprecated
 public static final ConfigOption<SubscriptionType> PULSAR_SUBSCRIPTION_TYPE =

Review Comment:
   ditto



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-30413) Drop Shared and Key_Shared subscription support in Pulsar connector

2023-01-02 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30413?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-30413:
---
Labels: pull-request-available  (was: )

> Drop Shared and Key_Shared subscription support in Pulsar connector
> ---
>
> Key: FLINK-30413
> URL: https://issues.apache.org/jira/browse/FLINK-30413
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Pulsar
>Affects Versions: 1.17.0
>Reporter: Yufan Sheng
>Assignee: Yufan Sheng
>Priority: Critical
>  Labels: pull-request-available
> Fix For: pulsar-4.0.0
>
>
> A lot of Pulsar connector test instability issues are related to the {{Shared}} and 
> {{Key_Shared}} subscriptions, because these two subscription types are designed to 
> consume records in an unordered way and allow multiple consumers on the same topic 
> partition. This feature leads to several drawbacks in the connector.
> 1. Performance
> Flink is a true stream processor with strong correctness guarantees, but supporting 
> multiple consumers on the same partition requires extra correctness machinery that 
> depends on Pulsar transactions. The internal implementation of Pulsar transactions 
> on the source side records messages one by one and stores all pending ack state on 
> the client side, which is slow and memory-inefficient.
> This means we can only use {{Shared}} and {{Key_Shared}} with Flink at low 
> throughput, which defeats the intention of supporting these two subscriptions in 
> the first place: adding multiple consumers to the same partition is supposed to 
> increase consumption speed.
> 2. Stability
> Pulsar transactions acknowledge messages one by one in an internal Pulsar topic, 
> but this is not stable enough to rely on. A lot of pending issues in the Flink JIRA 
> are related to Pulsar transactions, and we don't have any workaround.
> 3. Complexity
> Supporting the {{Shared}} and {{Key_Shared}} subscriptions makes the connector's 
> code more complex than we expect: every part of the code has to handle both the 
> ordered and the unordered path, which is hard for maintainers to understand.
> 4. Necessity
> The current implementation of {{Shared}} and {{Key_Shared}} is completely unusable 
> in production environments, and users don't need this function. There is no 
> bottleneck in consuming data from Pulsar; the bottleneck is in processing the 
> data, which can be addressed by increasing the parallelism of the processing 
> operators.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-connector-pulsar] tisonkun commented on a diff in pull request #10: [FLINK-30413][Connector/Pulsar] Drop subscription support, remove unordered consuming, change related tests.

2023-01-02 Thread GitBox


tisonkun commented on code in PR #10:
URL: 
https://github.com/apache/flink-connector-pulsar/pull/10#discussion_r1060340290


##
flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilder.java:
##
@@ -177,9 +171,11 @@ public PulsarSourceBuilder<OUT> setSubscriptionName(String subscriptionName) {
  * @return this PulsarSourceBuilder.
  * @see <a href="https://pulsar.apache.org/docs/en/concepts-messaging/#subscriptions">Pulsar
  * Subscriptions</a>
+ * @deprecated We don't support setting the subscription type now.
  */
+@Deprecated
 public PulsarSourceBuilder<OUT> setSubscriptionType(SubscriptionType subscriptionType) {

Review Comment:
   If it's a dummy method, shall we remove these methods? Otherwise, although 
users' code doesn't fail to compile, the result is actually unexpected.
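   If the method has to stay for source compatibility, one alternative to silently ignoring the call is to warn loudly. A sketch, assuming an SLF4J `LOG` field is available in the builder:
   ```java
   /**
    * @deprecated Setting the subscription type is no longer supported; the source
    *     always uses the exclusive subscription.
    */
   @Deprecated
   public PulsarSourceBuilder<OUT> setSubscriptionType(SubscriptionType subscriptionType) {
       // Warn instead of silently dropping the setting, so users notice that the
       // requested type no longer has any effect.
       LOG.warn("Subscription type {} is ignored; the Pulsar source always uses Exclusive.",
               subscriptionType);
       return this;
   }
   ```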



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-ml] jiangxin369 commented on a diff in pull request #191: [FLINK-30401] Add Estimator and Transformer for MinHashLSH

2023-01-02 Thread GitBox


jiangxin369 commented on code in PR #191:
URL: https://github.com/apache/flink-ml/pull/191#discussion_r1060307196


##
flink-ml-python/pyflink/ml/lib/feature/lsh.py:
##
@@ -0,0 +1,191 @@
+
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+
+import typing
+from abc import ABC
+from pyflink.java_gateway import get_gateway
+from pyflink.table import Table
+from pyflink.util.java_utils import to_jarray
+
+from pyflink.ml.core.linalg import Vector, DenseVector, SparseVector
+from pyflink.ml.core.param import Param, IntParam, ParamValidators
+from pyflink.ml.core.wrapper import JavaWithParams
+from pyflink.ml.lib.feature.common import JavaFeatureEstimator, 
JavaFeatureModel
+from pyflink.ml.lib.param import HasInputCol, HasOutputCol, HasSeed
+
+
+class _LSHModelParams(JavaWithParams,
+  HasInputCol,
+  HasOutputCol):
+"""
+Params for :class:`LSHModel`
+"""
+
+def __init__(self, java_params):
+super(_LSHModelParams, self).__init__(java_params)
+
+
+class _LSHParams(_LSHModelParams):
+"""
+Params for :class:`LSH`
+"""
+
+NUM_HASH_TABLES: Param[int] = IntParam(
+"num_hash_tables", "Number of hash tables.", 1, 
ParamValidators.gt_eq(1)
+)
+
+NUM_HASH_FUNCTIONS_PER_TABLE: Param[int] = IntParam(
+"num_hash_functions_per_table",
+"Number of hash functions per table.",
+1,
+ParamValidators.gt_eq(1.))
+
+def __init__(self, java_params):
+super(_LSHParams, self).__init__(java_params)
+
+def set_num_hash_tables(self, value: int):
+return typing.cast(_LSHParams, self.set(self.NUM_HASH_TABLES, value))
+
+def get_num_hash_tables(self):
+return self.get(self.NUM_HASH_TABLES)
+
+@property
+def num_hash_tables(self):
+return self.get_num_hash_tables()
+
+def set_num_hash_functions_per_table(self, value: int):
+return typing.cast(_LSHParams, 
self.set(self.NUM_HASH_FUNCTIONS_PER_TABLE, value))
+
+def get_num_hash_functions_per_table(self):
+return self.get(self.NUM_HASH_FUNCTIONS_PER_TABLE)
+
+@property
+def num_hash_functions_per_table(self):
+return self.get_num_hash_functions_per_table()
+
+
+class _LSH(JavaFeatureEstimator, ABC):
+"""
+Base class for estimators which implement LSH (Locality-sensitive hashing) 
algorithms.
+"""
+
+def __init__(self):
+super(_LSH, self).__init__()
+
+@classmethod
+def _java_estimator_package_name(cls) -> str:
+return "lsh"
+
+
+class _LSHModel(JavaFeatureModel, ABC):
+"""
+Base class for LSH model.
+"""
+
+def __init__(self, java_model):
+super(_LSHModel, self).__init__(java_model)
+
+@classmethod
+def _java_model_package_name(cls) -> str:
+return "lsh"
+
+def approx_nearest_neighbors(self, dataset: Table, key: Vector, k: int,
+ dist_col: str = 'distCol'):
+"""
+Given a dataset and an item, approximately find at most k items which 
have the closest
+distance to the item. If the `outputCol` is missing in the given 
dataset, this method
+transforms the dataset with the model at first.
+
+:param dataset: The dataset in which to to search for nearest 
neighbors.
+:param key: The item to search for.
+:param k: The maximum number of nearest neighbors.
+:param dist_col: The output column storing the distance between each 
neighbor and the
+key.
+:return: A dataset containing at most k items closest to the key with 
a column named
+`distCol` appended.

Review Comment:
   To keep the same coding style as PyFlink and flink-ml-python, it's 
recommended to align the beginning of each line, e.g., 
   ```
   :param dist_col: The output column storing the distance between each 
neighbor and the
key.
   :return: A dataset containing at most k items closest to the key with a 
column named
`distCol` appended.
   ```
   So as the other 

[GitHub] [flink-ml] jiangxin369 commented on a diff in pull request #191: [FLINK-30401] Add Estimator and Transformer for MinHashLSH

2023-01-02 Thread GitBox


jiangxin369 commented on code in PR #191:
URL: https://github.com/apache/flink-ml/pull/191#discussion_r1060261023


##
flink-ml-examples/src/main/java/org/apache/flink/ml/examples/feature/MinHashLSHExample.java:
##
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.examples.feature;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.ml.feature.lsh.MinHashLSH;
+import org.apache.flink.ml.feature.lsh.MinHashLSHModel;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.linalg.SparseVector;
+import org.apache.flink.ml.linalg.Vector;
+import org.apache.flink.ml.linalg.Vectors;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+
+import org.apache.commons.collections.IteratorUtils;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.flink.table.api.Expressions.$;
+
+/**
+ * Simple program that trains a MinHashLSH model and uses it for approximate 
nearest neighbors and
+ * similarity join.
+ */
+public class MinHashLSHExample {
+public static void main(String[] args) throws Exception {
+
+// Creates a new StreamExecutionEnvironment
+StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+// Creates a StreamTableEnvironment
+StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+// Generates two datasets
+Table data =
+tEnv.fromDataStream(
+env.fromCollection(
+Arrays.asList(
+Row.of(
+0,
+Vectors.sparse(
+6,
+new int[] {0, 1, 2},
+new double[] {1., 1., 
1.})),
+Row.of(
+1,
+Vectors.sparse(
+6,
+new int[] {2, 3, 4},
+new double[] {1., 1., 
1.})),
+Row.of(
+2,
+Vectors.sparse(
+6,
+new int[] {0, 2, 4},
+new double[] {1., 1., 
1.}))),
+Types.ROW_NAMED(
+new String[] {"id", "vec"},
+Types.INT,
+
TypeInformation.of(SparseVector.class;
+
+Table dataB =

Review Comment:
   As the examples are expected to be the best practice for users, the variable 
naming should be stricter. Would it be better to rename `data` to `inputTable` and 
`dataB` to `similarityJoinTable`? `dataA` and `dataB` are also acceptable. The same 
applies to the Python example.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-ml] jiangxin369 commented on a diff in pull request #191: [FLINK-30401] Add Estimator and Transformer for MinHashLSH

2023-01-02 Thread GitBox


jiangxin369 commented on code in PR #191:
URL: https://github.com/apache/flink-ml/pull/191#discussion_r1060256575


##
docs/content/docs/operators/feature/minhashlsh.md:
##
@@ -0,0 +1,280 @@
+---
+title: "MinHash LSH"
+weight: 1
+type: docs
+aliases:
+- /operators/feature/minhashlsh.html
+---
+
+
+
+## MinHash LSH
+
+MinHash LSH is a Locality Sensitive Hashing (LSH) scheme for Jaccard distance 
metric.
+The input features are sets of natural numbers represented as non-zero indices 
of vectors,
+either dense vectors or sparse vectors. Typically, sparse vectors are more 
efficient.
+
+In addition to transforming input feature vectors to multiple hash values, the 
MinHash LSH 
+model also supports approximate nearest neighbors search within a dataset 
regarding a key 
+vector and approximate similarity join between two datasets.
+
+### Input Columns
+
+| Param name | Type   | Default   | Description|
+|:---|:---|:--|:---|
+| inputCol   | Vector | `"input"` | Features to be mapped. |
+
+### Output Columns
+
+| Param name | Type  | Default| Description  |
+|:---|:--|:---|:-|
+| outputCol  | DenseVector[] | `"output"` | Hash values. |
+
+### Parameters

Review Comment:
   Since `MinHashLSH` has some parameters that cannot be set on 
`MinHashLSHModel`, let's separate the parameters into two tables, just like in 
`vectorindexer.md`.



##
flink-ml-python/pyflink/ml/lib/feature/lsh.py:
##
@@ -0,0 +1,191 @@
+
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+
+import typing
+from abc import ABC
+from pyflink.java_gateway import get_gateway
+from pyflink.table import Table
+from pyflink.util.java_utils import to_jarray
+
+from pyflink.ml.core.linalg import Vector, DenseVector, SparseVector
+from pyflink.ml.core.param import Param, IntParam, ParamValidators
+from pyflink.ml.core.wrapper import JavaWithParams
+from pyflink.ml.lib.feature.common import JavaFeatureEstimator, 
JavaFeatureModel
+from pyflink.ml.lib.param import HasInputCol, HasOutputCol, HasSeed
+
+
+class _LSHModelParams(JavaWithParams,
+  HasInputCol,
+  HasOutputCol):
+"""
+Params for :class:`LSHModel`
+"""
+
+def __init__(self, java_params):
+super(_LSHModelParams, self).__init__(java_params)
+
+
+class _LSHParams(_LSHModelParams):
+"""
+Params for :class:`LSH`
+"""
+
+NUM_HASH_TABLES: Param[int] = IntParam(
+"num_hash_tables", "Number of hash tables.", 1, 
ParamValidators.gt_eq(1)
+)
+
+NUM_HASH_FUNCTIONS_PER_TABLE: Param[int] = IntParam(
+"num_hash_functions_per_table",
+"Number of hash functions per table.",
+1,
+ParamValidators.gt_eq(1.))

Review Comment:
   nit: let's unify the `gt_eq` argument in `NUM_HASH_TABLES` and 
`NUM_HASH_FUNCTIONS_PER_TABLE` to either `1.` or `1`.



##
flink-ml-examples/src/main/java/org/apache/flink/ml/examples/feature/MinHashLSHExample.java:
##
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.examples.feature;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;

[jira] [Commented] (FLINK-30544) Speed up finding minimum watermark across all channels by introducing heap-based algorithm

2023-01-02 Thread XiangQianLiu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17653836#comment-17653836
 ] 

XiangQianLiu commented on FLINK-30544:
--

Nice proposal.

> Speed up finding minimum watermark across all channels by introducing 
> heap-based algorithm
> --
>
> Key: FLINK-30544
> URL: https://issues.apache.org/jira/browse/FLINK-30544
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Task
>Reporter: Lijie Wang
>Assignee: Lijie Wang
>Priority: Major
> Fix For: 1.17.0
>
>
> Currently, every time a task receives a watermark, it tries to update the 
> minimum watermark. We use a traversal algorithm to find the minimum watermark 
> across all channels (see 
> [StatusWatermarkValve#findAndOutputNewMinWatermarkAcrossAlignedChannels|https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/watermarkstatus/StatusWatermarkValve.java#:~:text=private%20void-,findAndOutputNewMinWatermarkAcrossAlignedChannels,-(DataOutput%3C%3F%3E]
>  for details), and the time complexity is O(N), where N is the number of 
> channels.
> We can optimize this by introducing a heap-based algorithm, reducing the time 
> complexity to O(log(N)).
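A minimal sketch of the heap-based idea (hypothetical classes, not the actual 
StatusWatermarkValve code): since per-channel watermarks only increase, a 
lazily-invalidated min-heap yields amortized O(log(N)) per update instead of an 
O(N) scan.
{code:java}
import java.util.Arrays;
import java.util.PriorityQueue;

/** Hypothetical sketch: tracks the minimum watermark across N channels. */
class MinWatermarkTracker {
    private final long[] current; // latest watermark per channel
    // Heap entries are {watermark, channel}; stale entries are removed lazily.
    private final PriorityQueue<long[]> heap =
            new PriorityQueue<>((a, b) -> Long.compare(a[0], b[0]));

    MinWatermarkTracker(int numChannels) {
        current = new long[numChannels];
        Arrays.fill(current, Long.MIN_VALUE);
        for (int ch = 0; ch < numChannels; ch++) {
            heap.add(new long[] {Long.MIN_VALUE, ch});
        }
    }

    /** Records a new (monotonically increasing) watermark; returns the minimum. */
    long onWatermark(int channel, long watermark) {
        current[channel] = watermark;
        heap.add(new long[] {watermark, channel});
        // Pop entries whose channel has already advanced past the stored value.
        while (heap.peek()[0] != current[(int) heap.peek()[1]]) {
            heap.poll();
        }
        return heap.peek()[0];
    }
}
{code}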



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] xintongsong commented on a diff in pull request #21565: [FLINK-18229][ResourceManager] Support cancel pending workers if no longer needed.

2023-01-02 Thread GitBox


xintongsong commented on code in PR #21565:
URL: https://github.com/apache/flink/pull/21565#discussion_r1060267951


##
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManager.java:
##
@@ -526,6 +528,40 @@ public Collection<PendingTaskManagerSlot> getPendingTaskManagerSlots() {
 return pendingSlots.values();
 }
 
+/**
+ * remove unused pending task manager slots.
+ *
+ * @param unUsedResourceCounter the count of unused resources.
+ */
+public void removePendingTaskManagerSlots(ResourceCounter 
unUsedResourceCounter) {
+Iterator<Map.Entry<TaskManagerSlotId, PendingTaskManagerSlot>> pendingSlotIterator =
+pendingSlots.entrySet().iterator();
+while (pendingSlotIterator.hasNext()) {
+Map.Entry<TaskManagerSlotId, PendingTaskManagerSlot> pendingTaskManagerSlotEntry =
+pendingSlotIterator.next();
+PendingTaskManagerSlot pendingTaskManagerSlot = 
pendingTaskManagerSlotEntry.getValue();
+ResourceProfile resourceProfile = 
pendingTaskManagerSlot.getResourceProfile();
+if (unUsedResourceCounter.getResourceCount(resourceProfile) > 0) {

Review Comment:
   For `DeclarativeSlotManager`, fine-grained resource management is 
unsupported, and the resource profiles for pending slots should always be 
`defaultSlotResourceProfile`. (The constructor of `PendingTaskManagerSlot` is 
always called with `defaultSlotResourceProfile` as the argument.)
   
   Based on this assumption, we can add a `checkState` for protection and 
simplify this method, as sketched below.
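   A rough sketch of the suggested simplification, assuming the counter only ever contains the default profile (field and helper names are taken from the surrounding code; `Preconditions` is Flink's utility class):
   ```java
   public void removePendingTaskManagerSlots(ResourceCounter unusedResourceCounter) {
       Preconditions.checkState(
               unusedResourceCounter.getResources().stream()
                       .allMatch(profile -> profile.equals(defaultSlotResourceProfile)),
               "DeclarativeSlotManager only supports the default slot resource profile.");
       int slotsToRemove = unusedResourceCounter.getResourceCount(defaultSlotResourceProfile);
       // All pending slots share the default profile, so just drop the first N.
       Iterator<PendingTaskManagerSlot> iterator = pendingSlots.values().iterator();
       while (slotsToRemove > 0 && iterator.hasNext()) {
           iterator.next();
           iterator.remove();
           slotsToRemove--;
       }
       if (resourceAllocator.isSupported()) {
           declareNeededResourcesWithDelay();
       }
   }
   ```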



##
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedTaskManagerTracker.java:
##
@@ -82,12 +83,26 @@ public void replaceAllPendingAllocations(
 LOG.trace("Record the pending allocations {}.", 
pendingSlotAllocations);
 pendingSlotAllocationRecords.clear();
 pendingSlotAllocationRecords.putAll(pendingSlotAllocations);
+Set<PendingTaskManagerId> unusedPendingTaskManager =
+pendingTaskManagers.keySet().stream()
+.filter(id -> 
!pendingSlotAllocationRecords.containsKey(id))
+.collect(Collectors.toSet());
+for (PendingTaskManagerId pendingTaskManagerId : 
unusedPendingTaskManager) {
+removePendingTaskManager(pendingTaskManagerId);
+}
 }
 
 @Override
 public void clearPendingAllocationsOfJob(JobID jobId) {
 LOG.info("Clear all pending allocations for job {}.", jobId);
 pendingSlotAllocationRecords.values().forEach(allocation -> 
allocation.remove(jobId));
+Set<PendingTaskManagerId> unusedPendingTaskManager =
+pendingTaskManagers.keySet().stream()
+.filter(id -> 
!pendingSlotAllocationRecords.containsKey(id))
+.collect(Collectors.toSet());
+for (PendingTaskManagerId pendingTaskManagerId : 
unusedPendingTaskManager) {
+removePendingTaskManager(pendingTaskManagerId);
+}

Review Comment:
   These can be deduplicated into a `removeUnusedPendingTaskManagers()` helper and 
reused in `replaceAllPendingAllocations`, as sketched below. Even 
`clearAllPendingTaskManager` can be replaced.
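   For example, a sketch based on the code above:
   ```java
   private void removeUnusedPendingTaskManagers() {
       Set<PendingTaskManagerId> unusedPendingTaskManagers =
               pendingTaskManagers.keySet().stream()
                       .filter(id -> !pendingSlotAllocationRecords.containsKey(id))
                       .collect(Collectors.toSet());
       unusedPendingTaskManagers.forEach(this::removePendingTaskManager);
   }
   ```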



##
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManager.java:
##
@@ -384,18 +422,59 @@ private int releaseUnWantedResources(
 return needReleaseWorkerNumber;
 }
 
private int releaseResources(Collection<ResourceID> resourceIDS, int needReleaseWorkerNumber) {

Review Comment:
   ```suggestion
   private int releaseResources(Collection<ResourceID> resourceIds, int needReleaseWorkerNumber) {
   ```



##
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManager.java:
##
@@ -526,6 +528,40 @@ public Collection<PendingTaskManagerSlot> getPendingTaskManagerSlots() {
 return pendingSlots.values();
 }
 
+/**
+ * remove unused pending task manager slots.
+ *
+ * @param unUsedResourceCounter the count of unused resources.
+ */
+public void removePendingTaskManagerSlots(ResourceCounter 
unUsedResourceCounter) {
+Iterator<Map.Entry<TaskManagerSlotId, PendingTaskManagerSlot>> pendingSlotIterator =
+pendingSlots.entrySet().iterator();
+while (pendingSlotIterator.hasNext()) {
+Map.Entry<TaskManagerSlotId, PendingTaskManagerSlot> pendingTaskManagerSlotEntry =
+pendingSlotIterator.next();
+PendingTaskManagerSlot pendingTaskManagerSlot = 
pendingTaskManagerSlotEntry.getValue();
+ResourceProfile resourceProfile = 
pendingTaskManagerSlot.getResourceProfile();
+if (unUsedResourceCounter.getResourceCount(resourceProfile) > 0) {
+pendingSlotIterator.remove();
+unUsedResourceCounter = 
unUsedResourceCounter.subtract(resourceProfile, 1);
+}
+}
+
+if (resourceAllocator.isSupported()) {
+declareNeededResourcesWithDelay();
+}
+}
+
+/** clear all pending task manager slots. */
+public void 

[jira] [Created] (FLINK-30545) The SchemaManager doesn't check 'NOT NULL' specification when committing AddColumn change

2023-01-02 Thread yuzelin (Jira)
yuzelin created FLINK-30545:
---

 Summary: The SchemaManager doesn't check 'NOT NULL' specification 
when committing AddColumn change
 Key: FLINK-30545
 URL: https://issues.apache.org/jira/browse/FLINK-30545
 Project: Flink
  Issue Type: Bug
  Components: Table Store
Reporter: yuzelin
 Fix For: table-store-0.4.0


Currently, the table store doesn't support adding a column with a 'NOT NULL' 
specification, but it doesn't check this condition.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-ml] yunfengzhou-hub commented on pull request #189: [FLINK-30348] Refine Transformer for RandomSplitter

2023-01-02 Thread GitBox


yunfengzhou-hub commented on PR #189:
URL: https://github.com/apache/flink-ml/pull/189#issuecomment-1369450594

   Hi @weibozhao, thanks for the update. Could you please modify the 
description section of this PR and its corresponding Jira ticket, explaining 
the changes made in this PR and why we should add them to Flink ML?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (FLINK-30539) YARNSessionCapacitySchedulerITCase.testDetachedPerJobYarnCluster and testDetachedPerJobYarnClusterWithStreamingJob timing out

2023-01-02 Thread Matthias Pohl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30539?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17653606#comment-17653606
 ] 

Matthias Pohl edited comment on FLINK-30539 at 1/3/23 7:07 AM:
---

-Ok, I assume you meant waiting with investigating the issue which makes sense. 
I went ahead and created the PRs for removing the timeout, though.-

reevaluating my comment: I initially missed [your comment in 
FLINK-24169|https://issues.apache.org/jira/browse/FLINK-24169?focusedCommentId=17653560&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17653560]. 
Considering that you want to tackle the relative path issue for YARN tests in 
[PR #21128|https://github.com/apache/flink/pull/21128], which covers the hadoop 
update, it makes sense to hold off on investigating this issue. Thanks for that. 
Merging the PRs for removing the timeouts makes sense, anyway.


was (Author: mapohl):
Ok, I assume you meant waiting with investigating the issue which makes sense. 
I went ahead and created the PRs for removing the timeout, though.

> YARNSessionCapacitySchedulerITCase.testDetachedPerJobYarnCluster and 
> testDetachedPerJobYarnClusterWithStreamingJob timing out
> -
>
> Key: FLINK-30539
> URL: https://issues.apache.org/jira/browse/FLINK-30539
> Project: Flink
>  Issue Type: Bug
>  Components: Deployment / YARN
>Affects Versions: 1.16.0
>Reporter: Matthias Pohl
>Priority: Major
>  Labels: pull-request-available, test-stability
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=44337&view=logs&j=298e20ef-7951-5965-0e79-ea664ddc435e&t=d4c90338-c843-57b0-3232-10ae74f00347&l=32023
> In order to help investigating the issue: Both tests failed because they are 
> running into the 60s timeout that was defined for each of them. We should get 
> rid of the timeout to access the thread dump. It might be related to 
> FLINK-24169



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-30343) Migrate KubernetesLeaderElectionAndRetrievalITCase

2023-01-02 Thread Wencong Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30343?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17653825#comment-17653825
 ] 

Wencong Liu commented on FLINK-30343:
-

It makes sense. Let's follow the progress of FLINK-26522.

> Migrate KubernetesLeaderElectionAndRetrievalITCase
> --
>
> Key: FLINK-30343
> URL: https://issues.apache.org/jira/browse/FLINK-30343
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Affects Versions: 1.16.0, 1.17.0
>Reporter: Matthias Pohl
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] link3280 commented on pull request #21581: [FLINK-30538][SQL gateway/client] Improve error handling of stop job operation

2023-01-02 Thread GitBox


link3280 commented on PR #21581:
URL: https://github.com/apache/flink/pull/21581#issuecomment-1369443823

   Please kindly take a look @fsk119 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-27518) Refactor migration tests to support version update automatically

2023-01-02 Thread Yun Gao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27518?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17653822#comment-17653822
 ] 

Yun Gao commented on FLINK-27518:
-

Got it. I have a bit more bandwidth now and will raise the priority of this issue. 
I'll open the PR by early next week. 

> Refactor migration tests to support version update automatically
> 
>
> Key: FLINK-27518
> URL: https://issues.apache.org/jira/browse/FLINK-27518
> Project: Flink
>  Issue Type: Bug
>  Components: Test Infrastructure
>Affects Versions: 1.16.0
>Reporter: Yun Gao
>Assignee: Yun Gao
>Priority: Major
>
> Currently, on releasing each version, we need to manually generate the 
> snapshots for every migration test and update the current versions. With 
> more and more migration tests being added, this has become more and more 
> intractable. It would be better if we could make it happen automatically when 
> cutting new branches. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-26974) Python EmbeddedThreadDependencyTests.test_add_python_file failed on azure

2023-01-02 Thread Matthias Pohl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17653821#comment-17653821
 ] 

Matthias Pohl commented on FLINK-26974:
---

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=44394&view=logs&j=9cada3cb-c1d3-5621-16da-0f718fb86602&t=c67e71ed-6451-5d26-8920-5a8cf9651901&l=28170

> Python EmbeddedThreadDependencyTests.test_add_python_file failed on azure
> -
>
> Key: FLINK-26974
> URL: https://issues.apache.org/jira/browse/FLINK-26974
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python
>Affects Versions: 1.15.0, 1.16.0, 1.17.0
>Reporter: Yun Gao
>Assignee: Huang Xingbo
>Priority: Critical
>  Labels: auto-deprioritized-major, test-stability
>
> {code:java}
> Mar 31 10:49:17 === FAILURES 
> ===
> Mar 31 10:49:17 __ 
> EmbeddedThreadDependencyTests.test_add_python_file __
> Mar 31 10:49:17 
> Mar 31 10:49:17 self = <EmbeddedThreadDependencyTests testMethod=test_add_python_file>
> Mar 31 10:49:17 
> Mar 31 10:49:17 def test_add_python_file(self):
> Mar 31 10:49:17 python_file_dir = os.path.join(self.tempdir, 
> "python_file_dir_" + str(uuid.uuid4()))
> Mar 31 10:49:17 os.mkdir(python_file_dir)
> Mar 31 10:49:17 python_file_path = os.path.join(python_file_dir, 
> "test_dependency_manage_lib.py")
> Mar 31 10:49:17 with open(python_file_path, 'w') as f:
> Mar 31 10:49:17 f.write("def add_two(a):\nraise 
> Exception('This function should not be called!')")
> Mar 31 10:49:17 self.t_env.add_python_file(python_file_path)
> Mar 31 10:49:17 
> Mar 31 10:49:17 python_file_dir_with_higher_priority = os.path.join(
> Mar 31 10:49:17 self.tempdir, "python_file_dir_" + 
> str(uuid.uuid4()))
> Mar 31 10:49:17 os.mkdir(python_file_dir_with_higher_priority)
> Mar 31 10:49:17 python_file_path_higher_priority = 
> os.path.join(python_file_dir_with_higher_priority,
> Mar 31 10:49:17 
> "test_dependency_manage_lib.py")
> Mar 31 10:49:17 with open(python_file_path_higher_priority, 'w') as f:
> Mar 31 10:49:17 f.write("def add_two(a):\nreturn a + 2")
> Mar 31 10:49:17 
> self.t_env.add_python_file(python_file_path_higher_priority)
> Mar 31 10:49:17 
> Mar 31 10:49:17 def plus_two(i):
> Mar 31 10:49:17 from test_dependency_manage_lib import add_two
> Mar 31 10:49:17 return add_two(i)
> Mar 31 10:49:17 
> Mar 31 10:49:17 self.t_env.create_temporary_system_function(
> Mar 31 10:49:17 "add_two", udf(plus_two, DataTypes.BIGINT(), 
> DataTypes.BIGINT()))
> Mar 31 10:49:17 table_sink = source_sink_utils.TestAppendSink(
> Mar 31 10:49:17 ['a', 'b'], [DataTypes.BIGINT(), 
> DataTypes.BIGINT()])
> Mar 31 10:49:17 self.t_env.register_table_sink("Results", table_sink)
> Mar 31 10:49:17 t = self.t_env.from_elements([(1, 2), (2, 5), (3, 
> 1)], ['a', 'b'])
> Mar 31 10:49:17 >   t.select(expr.call("add_two", t.a), 
> t.a).execute_insert("Results").wait()
> Mar 31 10:49:17 
> Mar 31 10:49:17 pyflink/table/tests/test_dependency.py:63: 
> Mar 31 10:49:17 _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
> _ _ _ _ _ _ _ _ _ 
> Mar 31 10:49:17 pyflink/table/table_result.py:76: in wait
> Mar 31 10:49:17 get_method(self._j_table_result, "await")()
> Mar 31 10:49:17 
> .tox/py38-cython/lib/python3.8/site-packages/py4j/java_gateway.py:1321: in 
> __call__
> {code}
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=34001&view=logs&j=821b528f-1eed-5598-a3b4-7f748b13f261&t=6bb545dd-772d-5d8c-f258-f5085fba3295&l=27239



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-29849) Event time temporal join on an upsert source may produce incorrect execution plan

2023-01-02 Thread lincoln lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29849?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lincoln lee updated FLINK-29849:

Release Note: 
This resolves the correctness issue when doing an event time temporal join with a 
versioned table backed by an upsert source. When the right input of the join is an 
upsert source, the planner no longer generates a ChangelogNormalize node for it.
Note this is an incompatible plan change compared to 1.16.0.

> Event time temporal join on an upsert source may produce incorrect execution 
> plan
> -
>
> Key: FLINK-29849
> URL: https://issues.apache.org/jira/browse/FLINK-29849
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.16.0, 1.15.3
>Reporter: lincoln lee
>Assignee: lincoln lee
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.17.0
>
>
> In the current implementation, the execution plan is incorrect when doing an 
> event time temporal join on an upsert source. There are two problems:
> 1. For an upsert source, we should not add a ChangelogNormalize node under a 
> temporal join input, or it will damage the versions of the versioned table. For 
> versioned tables, we use a single-temporal mechanism which relies on sequential 
> records of the same key to determine the valid period of each version. So if a 
> ChangelogNormalize node is added, a UB message will be produced based on the 
> previous UA or Insert message, with all columns identical, including the event 
> time, e.g., 
> original upsert input
> {code}
> +I (key1, '2022-11-02 10:00:00', a1)
> +U (key1, '2022-11-02 10:01:03', a2)
> {code}
> the versioned data should be:
> {code}
> v1  [~, '2022-11-02 10:00:00')
> v2  ['2022-11-02 10:00:00', '2022-11-02 10:01:03')
> {code}
> after ChangelogNormalize's processing, will output:
> {code}
> +I (key1, '2022-11-02 10:00:00', a1)
> -U (key1, '2022-11-02 10:00:00', a1)
> +U (key1, '2022-11-02 10:01:03', a2)
> {code}
> versions are incorrect:
> {code}
> v1  ['2022-11-02 10:00:00', '2022-11-02 10:00:00')  // invalid period
> v2  ['2022-11-02 10:00:00', '2022-11-02 10:01:03')
> {code}
> 2. Semantically, a filter cannot be pushed into an event time temporal join; 
> otherwise, the filter may also corrupt the versioned table



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-30343) Migrate KubernetesLeaderElectionAndRetrievalITCase

2023-01-02 Thread Matthias Pohl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30343?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17653817#comment-17653817
 ] 

Matthias Pohl commented on FLINK-30343:
---

I'm not sure whether it's worth starting on FLINK-30338 while FLINK-26522 is still 
in progress. If FLINK-26522 is resolved in the way it is described in its comment 
section right now, the {{MultipleComponent*}} classes would either go away or 
become what {{DefaultLeaderElectionService}} is right now, with slight adaptations. 
Therefore, it would make more sense to migrate the test cases that are implemented 
for the {{MultipleComponent*}} classes as part of FLINK-26522 than to migrate the 
legacy test cases to what's now under the {{MultipleComponent*}} leader election as 
part of FLINK-30338, just to migrate them back as part of FLINK-26522 again. That 
seems to be extra effort without any additional value. 
WDYT?

> Migrate KubernetesLeaderElectionAndRetrievalITCase
> --
>
> Key: FLINK-30343
> URL: https://issues.apache.org/jira/browse/FLINK-30343
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Affects Versions: 1.16.0, 1.17.0
>Reporter: Matthias Pohl
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-28988) Incorrect result for filter after temporal join

2023-01-02 Thread lincoln lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28988?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lincoln lee updated FLINK-28988:

   Flags: Important
Release Note: 
After FLINK-28988 is applied, the filter will not be pushed down into both inputs 
of the join.
Note this may cause incompatible plan changes compared to 1.16.0: e.g., when the 
left input is an upsert source (using the upsert-kafka connector), the query plan 
will no longer contain the ChangelogNormalize node that appeared in 1.16.0.

> Incorrect result for filter after temporal join
> ---
>
> Key: FLINK-28988
> URL: https://issues.apache.org/jira/browse/FLINK-28988
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.15.1
>Reporter: Xuannan Su
>Assignee: Shuiqiang Chen
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.17.0, 1.16.1
>
>
> The following code can reproduce the case
>  
> {code:java}
> public class TemporalJoinSQLExample1 {
> public static void main(String[] args) throws Exception {
> // set up the Java DataStream API
> final StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> // set up the Java Table API
> final StreamTableEnvironment tableEnv = 
> StreamTableEnvironment.create(env);
> final DataStreamSource<Tuple3<Integer, String, Instant>> ds =
> env.fromElements(
> new Tuple3<>(0, "online", Instant.ofEpochMilli(0)),
> new Tuple3<>(0, "offline", Instant.ofEpochMilli(10)),
> new Tuple3<>(0, "online", Instant.ofEpochMilli(20)));
> final Table table =
> tableEnv.fromDataStream(
> ds,
> Schema.newBuilder()
> .column("f0", DataTypes.INT())
> .column("f1", DataTypes.STRING())
> .column("f2", 
> DataTypes.TIMESTAMP_LTZ(3))
> .watermark("f2", "f2 - INTERVAL '2' 
> SECONDS")
> .build())
> .as("id", "state", "ts");
> tableEnv.createTemporaryView("source_table", table);
> final Table dedupeTable =
> tableEnv.sqlQuery(
> "SELECT * FROM ("
> + " SELECT *, ROW_NUMBER() OVER (PARTITION BY 
> id ORDER BY ts DESC) AS row_num FROM source_table"
> + ") WHERE row_num = 1");
> tableEnv.createTemporaryView("versioned_table", dedupeTable);
> DataStreamSource<Tuple2<Integer, Instant>> event =
> env.fromElements(
> new Tuple2<>(0, Instant.ofEpochMilli(0)),
> new Tuple2<>(0, Instant.ofEpochMilli(5)),
> new Tuple2<>(0, Instant.ofEpochMilli(10)),
> new Tuple2<>(0, Instant.ofEpochMilli(15)),
> new Tuple2<>(0, Instant.ofEpochMilli(20)),
> new Tuple2<>(0, Instant.ofEpochMilli(25)));
> final Table eventTable =
> tableEnv.fromDataStream(
> event,
> Schema.newBuilder()
> .column("f0", DataTypes.INT())
> .column("f1", 
> DataTypes.TIMESTAMP_LTZ(3))
> .watermark("f1", "f1 - INTERVAL '2' 
> SECONDS")
> .build())
> .as("id", "ts");
> tableEnv.createTemporaryView("event_table", eventTable);
> final Table result =
> tableEnv.sqlQuery(
> "SELECT * FROM event_table"
> + " LEFT JOIN versioned_table FOR SYSTEM_TIME 
> AS OF event_table.ts"
> + " ON event_table.id = versioned_table.id");
> result.execute().print();
> result.filter($("state").isEqual("online")).execute().print();
> }
> } {code}
>  
> The result of temporal join is the following:
> |op| id|                     ts| id0|   state|                    ts0| row_num|
> |+I|  0|1970-01-01 08:00:00.000|   0|  online|1970-01-01 08:00:00.000|       1|
> |+I|  0|1970-01-01 08:00:00.005|   0|  online|1970-01-01 08:00:00.000|       1|
> |+I|  0|1970-01-01 08:00:00.010|   0| offline|1970-01-01 08:00:00.010|       1|
> |+I|          

[jira] [Updated] (FLINK-28988) Incorrect result for filter after temporal join

2023-01-02 Thread lincoln lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28988?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lincoln lee updated FLINK-28988:

Release Note: 
After FLINK-28988 is applied, the filter will not be pushed down into both inputs 
of the event time temporal join.
Note this may cause incompatible plan changes compared to 1.16.0: e.g., when the 
left input is an upsert source (using the upsert-kafka connector), the query plan 
will no longer contain the ChangelogNormalize node that appeared in 1.16.0.

  was:
After FLINK-28988 is applied, the filter will not be pushed down into both inputs 
of the join.
Note this may cause incompatible plan changes compared to 1.16.0: e.g., when the 
left input is an upsert source (using the upsert-kafka connector), the query plan 
will no longer contain the ChangelogNormalize node that appeared in 1.16.0.


> Incorrect result for filter after temporal join
> ---
>
> Key: FLINK-28988
> URL: https://issues.apache.org/jira/browse/FLINK-28988
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.15.1
>Reporter: Xuannan Su
>Assignee: Shuiqiang Chen
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.17.0, 1.16.1
>
>
> The following code can reproduce the case
>  
> {code:java}
> public class TemporalJoinSQLExample1 {
> public static void main(String[] args) throws Exception {
> // set up the Java DataStream API
> final StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> // set up the Java Table API
> final StreamTableEnvironment tableEnv = 
> StreamTableEnvironment.create(env);
> final DataStreamSource<Tuple3<Integer, String, Instant>> ds =
> env.fromElements(
> new Tuple3<>(0, "online", Instant.ofEpochMilli(0)),
> new Tuple3<>(0, "offline", Instant.ofEpochMilli(10)),
> new Tuple3<>(0, "online", Instant.ofEpochMilli(20)));
> final Table table =
> tableEnv.fromDataStream(
> ds,
> Schema.newBuilder()
> .column("f0", DataTypes.INT())
> .column("f1", DataTypes.STRING())
> .column("f2", 
> DataTypes.TIMESTAMP_LTZ(3))
> .watermark("f2", "f2 - INTERVAL '2' 
> SECONDS")
> .build())
> .as("id", "state", "ts");
> tableEnv.createTemporaryView("source_table", table);
> final Table dedupeTable =
> tableEnv.sqlQuery(
> "SELECT * FROM ("
> + " SELECT *, ROW_NUMBER() OVER (PARTITION BY 
> id ORDER BY ts DESC) AS row_num FROM source_table"
> + ") WHERE row_num = 1");
> tableEnv.createTemporaryView("versioned_table", dedupeTable);
> DataStreamSource<Tuple2<Integer, Instant>> event =
> env.fromElements(
> new Tuple2<>(0, Instant.ofEpochMilli(0)),
> new Tuple2<>(0, Instant.ofEpochMilli(5)),
> new Tuple2<>(0, Instant.ofEpochMilli(10)),
> new Tuple2<>(0, Instant.ofEpochMilli(15)),
> new Tuple2<>(0, Instant.ofEpochMilli(20)),
> new Tuple2<>(0, Instant.ofEpochMilli(25)));
> final Table eventTable =
> tableEnv.fromDataStream(
> event,
> Schema.newBuilder()
> .column("f0", DataTypes.INT())
> .column("f1", 
> DataTypes.TIMESTAMP_LTZ(3))
> .watermark("f1", "f1 - INTERVAL '2' 
> SECONDS")
> .build())
> .as("id", "ts");
> tableEnv.createTemporaryView("event_table", eventTable);
> final Table result =
> tableEnv.sqlQuery(
> "SELECT * FROM event_table"
> + " LEFT JOIN versioned_table FOR SYSTEM_TIME 
> AS OF event_table.ts"
> + " ON event_table.id = versioned_table.id");
> result.execute().print();
> result.filter($("state").isEqual("online")).execute().print();
> }
> } {code}
>  
> The result of temporal join is the following:
> |op| id|                     ts| id0|   state|                    ts0| row_num|
> |+I|  0|1970-01-01 08:00:00.000|   0|  online|1970-01-01 
[jira] [Assigned] (FLINK-30389) Add retry to read hints

2023-01-02 Thread Jingsong Lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30389?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee reassigned FLINK-30389:


Assignee: Wencong Liu

> Add retry to read hints
> ---
>
> Key: FLINK-30389
> URL: https://issues.apache.org/jira/browse/FLINK-30389
> Project: Flink
>  Issue Type: Improvement
>  Components: Table Store
>Reporter: Jingsong Lee
>Assignee: Wencong Liu
>Priority: Major
> Fix For: table-store-0.3.0
>
>
> For the oss (object store) filesystem, the hint file is deleted first and then 
> re-added when it is written, so reading the hint file may fail frequently. We 
> don't need to return directly in case of failure; we can add a retry.
> {code:java}
> Failed to read hint file LATEST. Falling back to listing files.
> java.io.FileNotFoundException: oss://lake_v4/snapshot/LATEST: No such file or 
> directory!
>   at 
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-30389) Add retry to read hints

2023-01-02 Thread Jingsong Lee (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30389?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17653815#comment-17653815
 ] 

Jingsong Lee commented on FLINK-30389:
--

[~Wencong Liu] Assigned

> Add retry to read hints
> ---
>
> Key: FLINK-30389
> URL: https://issues.apache.org/jira/browse/FLINK-30389
> Project: Flink
>  Issue Type: Improvement
>  Components: Table Store
>Reporter: Jingsong Lee
>Assignee: Wencong Liu
>Priority: Major
> Fix For: table-store-0.3.0
>
>
> For the oss (object store) filesystem, the hint file is deleted first and then 
> re-added when it is written, so reading the hint file may fail frequently. We 
> don't need to return directly in case of failure; we can add a retry.
> {code:java}
> Failed to read hint file LATEST. Falling back to listing files.
> java.io.FileNotFoundException: oss://lake_v4/snapshot/LATEST: No such file or 
> directory!
>   at 
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-30234) SourceReaderBase should provide an option to disable numRecordsIn metric registration

2023-01-02 Thread Wencong Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30234?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17653814#comment-17653814
 ] 

Wencong Liu commented on FLINK-30234:
-

The discussion has been created. cc [~renqs] 

[[DISCUSS] Allow source readers extending SourceReaderBase to override 
numRecordsIn report logic-Apache Mail 
Archives|https://lists.apache.org/thread/pq7vrqc1vjbzj6of4wm5x8x7pkpdchox]

> SourceReaderBase should provide an option to disable numRecordsIn metric 
> registration
> -
>
> Key: FLINK-30234
> URL: https://issues.apache.org/jira/browse/FLINK-30234
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Common
>Affects Versions: 1.17.0
>Reporter: Qingsheng Ren
>Assignee: Wencong Liu
>Priority: Major
>
> Currently the numRecordsIn metric is pre-registered for all sources in 
> SourceReaderBase. Considering the different implementations of source readers, 
> the definition of "record" might differ from the one we use in 
> SourceReaderBase, hence numRecordsIn might be inaccurate.
> We could introduce an option in SourceReader to disable the registration of 
> numRecordsIn in SourceReaderBase and let the actual implementation report the 
> metric instead. 
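One hypothetical shape for such an option is a protected hook in 
{{SourceReaderBase}} that implementations override (names are illustrative, not an 
agreed API):
{code:java}
// Sketch only. In SourceReaderBase, the counting path would consult the hook:
protected boolean shouldRegisterNumRecordsIn() {
    return true; // default: SourceReaderBase keeps reporting numRecordsIn
}

// ... and in the record-emitting loop:
// if (shouldRegisterNumRecordsIn()) {
//     numRecordsInCounter.inc(); // hypothetical counter field
// }
{code}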



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] reswqa commented on a diff in pull request #21560: [FLINK-29768] Hybrid shuffle supports consume partial finished subtask

2023-01-02 Thread GitBox


reswqa commented on code in PR #21560:
URL: https://github.com/apache/flink/pull/21560#discussion_r1060305155


##
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/InternalExecutionGraphAccessor.java:
##
@@ -122,4 +122,6 @@ List<ShuffleDescriptor> getClusterPartitionShuffleDescriptors(
 IntermediateDataSetID intermediateResultPartition);
 
 MarkPartitionFinishedStrategy getMarkPartitionFinishedStrategy();
+
+boolean isHybridEnableConsumePartialFinishedProducer();

Review Comment:
   Yes, I also didn't find a better way to handle this. Let's keep it for now; if 
there is a better way, we can refactor it later.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] reswqa commented on a diff in pull request #21560: [FLINK-29768] Hybrid shuffle supports consume partial finished subtask

2023-01-02 Thread GitBox


reswqa commented on code in PR #21560:
URL: https://github.com/apache/flink/pull/21560#discussion_r1060304453


##
flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java:
##
@@ -670,6 +670,24 @@ public enum SchedulerType {
 code(SPECULATIVE_ENABLED.key()))
 .build());
 
+@Documentation.Section({
+Documentation.Sections.EXPERT_SCHEDULING,
+Documentation.Sections.ALL_JOB_MANAGER
+})
+public static final ConfigOption<Boolean> 
CONSUME_PARTIAL_FINISHED_PRODUCER_ENABLED =

Review Comment:
   Good idea, I will use an enum type to replace these two config options.
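   
   For illustration, a sketch of what the enum-typed option could look like (the 
option key, enum name, values, and default are assumptions, not the final API):
   
```java
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

// Hypothetical sketch; names, key, and default are illustrative, not the final API.
public class HybridConsumeOptions {
    public enum HybridPartitionDataConsumeConstraint {
        ALL_PRODUCERS_FINISHED,   // only consume fully finished producers
        ONLY_FINISHED_PRODUCERS,  // allow consuming partially finished producers
        UNFINISHED_PRODUCERS      // allow consuming even unfinished producers
    }

    public static final ConfigOption<HybridPartitionDataConsumeConstraint>
            HYBRID_PARTITION_DATA_CONSUME_CONSTRAINT =
                    ConfigOptions.key("jobmanager.partition.hybrid.data-consume-constraint")
                            .enumType(HybridPartitionDataConsumeConstraint.class)
                            .defaultValue(
                                    HybridPartitionDataConsumeConstraint.ALL_PRODUCERS_FINISHED)
                            .withDescription("Illustrative sketch only.");
}
```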



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] LadyForest commented on a diff in pull request #21577: [FLINK-30496][table-api] Introduce TableChange to represent MODIFY change

2023-01-02 Thread GitBox


LadyForest commented on code in PR #21577:
URL: https://github.com/apache/flink/pull/21577#discussion_r1060300795


##
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/AlterSchemaConverter.java:
##
@@ -685,18 +731,18 @@ String getComment(SqlTableColumn column) {
 // 

 
 private void validateColumnName(
-String originColumnName,
+String oldColumnName,
 String newColumnName,
-ResolvedSchema originSchemas,
+ResolvedSchema oldSchemas,
 List partitionKeys) {
 validateColumnName(
-originColumnName,
-originSchemas,
+oldColumnName,
+oldSchemas,

Review Comment:
   Nit: `oldSchemas` -> `oldSchema` ?



##
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/AlterSchemaConverter.java:
##
@@ -685,18 +731,18 @@ String getComment(SqlTableColumn column) {
 // 

 
 private void validateColumnName(
-String originColumnName,
+String oldColumnName,
 String newColumnName,
-ResolvedSchema originSchemas,
+ResolvedSchema oldSchemas,

Review Comment:
   Nit: `oldSchemas` -> `oldSchema` ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-30389) Add retry to read hints

2023-01-02 Thread Wencong Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30389?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17653812#comment-17653812
 ] 

Wencong Liu commented on FLINK-30389:
-

Thanks [~lzljs3620320]. A fixed number of retries with a detection interval can 
be added to readHints. I'd like to take this ticket. Could you please assign it 
to me?
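
A minimal sketch of such a retry loop (the readHint helper, retry count, and 
interval below are illustrative assumptions, not the actual table-store API):

{code:java}
import java.io.IOException;

// Hypothetical sketch; SnapshotManager's real readHint signature may differ.
class RetryingHintReader {
    private final int maxRetries = 3;             // fixed number of retries
    private final long retryIntervalMillis = 100; // detection interval

    Long readHintWithRetry(String fileName) throws InterruptedException {
        for (int attempt = 0; attempt < maxRetries; attempt++) {
            try {
                return readHint(fileName); // may fail while the file is rewritten
            } catch (IOException e) {
                if (attempt == maxRetries - 1) {
                    return null; // caller falls back to listing files
                }
                Thread.sleep(retryIntervalMillis);
            }
        }
        return null;
    }

    private Long readHint(String fileName) throws IOException {
        throw new IOException("stand-in for the real hint-file read: " + fileName);
    }
}
{code}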

> Add retry to read hints
> ---
>
> Key: FLINK-30389
> URL: https://issues.apache.org/jira/browse/FLINK-30389
> Project: Flink
>  Issue Type: Improvement
>  Components: Table Store
>Reporter: Jingsong Lee
>Priority: Major
> Fix For: table-store-0.3.0
>
>
> On the OSS (object store) filesystem, writing the hint file deletes it first 
> and then recreates it, so reading the hint file may fail frequently. We don't 
> need to give up immediately on failure; we can add a retry.
> {code:java}
> Failed to read hint file LATEST. Falling back to listing files.
> java.io.FileNotFoundException: oss://lake_v4/snapshot/LATEST: No such file or 
> directory!
>   at 
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-30376) Introduce a new flink bushy join reorder rule which based on greedy algorithm

2023-01-02 Thread Yunhong Zheng (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30376?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yunhong Zheng updated FLINK-30376:
--
Summary: Introduce a new flink bushy join reorder rule which based on 
greedy algorithm  (was: Introduce a new flink busy join reorder rule which 
based on greedy algorithm)

> Introduce a new flink bushy join reorder rule which based on greedy algorithm
> -
>
> Key: FLINK-30376
> URL: https://issues.apache.org/jira/browse/FLINK-30376
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Planner
>Affects Versions: 1.17.0
>Reporter: Yunhong Zheng
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.17.0
>
>
> Introducing a new Flink busy join reorder strategy which based on the greedy 
> algorithm. The old join reorder rule will also be the default join reorder 
> rule and the new busy join reorder strategy will be optional.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-30376) Introduce a new flink bushy join reorder rule which based on greedy algorithm

2023-01-02 Thread Yunhong Zheng (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30376?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yunhong Zheng updated FLINK-30376:
--
Description: Introducing a new Flink bushy join reorder strategy which is 
based on the greedy algorithm. The old join reorder rule will remain the 
default join reorder rule, and the new bushy join reorder strategy will be 
optional.  (was: Introducing a new Flink busy join reorder strategy which based 
on the greedy algorithm. The old join reorder rule will also be the default 
join reorder rule and the new busy join reorder strategy will be optional.)

> Introduce a new flink bushy join reorder rule which based on greedy algorithm
> -
>
> Key: FLINK-30376
> URL: https://issues.apache.org/jira/browse/FLINK-30376
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Planner
>Affects Versions: 1.17.0
>Reporter: Yunhong Zheng
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.17.0
>
>
> Introducing a new Flink bushy join reorder strategy which is based on the 
> greedy algorithm. The old join reorder rule will remain the default join 
> reorder rule, and the new bushy join reorder strategy will be optional.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-26029) Generalize the checkpoint protocol of OperatorCoordinator.

2023-01-02 Thread Yunfeng Zhou (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17653799#comment-17653799
 ] 

Yunfeng Zhou commented on FLINK-26029:
--

According to offline discussions with Dong Lin, a solution to this problem 
could be to make OperatorCoordinators generate checkpoint barriers and send 
them to their subtasks. The subtasks would need to align these barriers with 
the ones they receive from upstream operators or sources, and actually trigger 
the checkpoint when checkpoint barrier alignment is reached.

The solution mentioned above requires further discussion and the community's 
agreement on the behavior and performance of the Flink runtime during 
checkpoints. Given that no current subclass of OperatorCoordinator is affected 
by this issue, it is of lower priority for now.
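
As a rough illustration of the barrier-alignment bookkeeping this would need on 
the subtask side (plain Java, deliberately independent of the actual Flink 
classes):

{code:java}
import java.util.HashSet;
import java.util.Set;

// Hypothetical alignment tracker: the checkpoint is triggered only after the
// coordinator's barrier AND the barriers of all upstream channels have arrived.
class CheckpointBarrierAligner {
    private final int numUpstreamChannels;
    private final Set<Integer> alignedChannels = new HashSet<>();
    private boolean coordinatorBarrierReceived;

    CheckpointBarrierAligner(int numUpstreamChannels) {
        this.numUpstreamChannels = numUpstreamChannels;
    }

    /** Returns true when alignment is reached and the snapshot can be taken. */
    boolean onCoordinatorBarrier() {
        coordinatorBarrierReceived = true;
        return isAligned();
    }

    /** Returns true when alignment is reached and the snapshot can be taken. */
    boolean onUpstreamBarrier(int channel) {
        alignedChannels.add(channel);
        return isAligned();
    }

    private boolean isAligned() {
        return coordinatorBarrierReceived
                && alignedChannels.size() == numUpstreamChannels;
    }
}
{code}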

> Generalize the checkpoint protocol of OperatorCoordinator.
> --
>
> Key: FLINK-26029
> URL: https://issues.apache.org/jira/browse/FLINK-26029
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing
>Affects Versions: 1.14.3
>Reporter: Jiangjie Qin
>Assignee: Yunfeng Zhou
>Priority: Minor
>  Labels: extensibility, pull-request-available, stale-assigned
>
> **Problem statement**
> Currently, the JM uses OperatorEventValve to control the communication from 
> OperatorCoordinator (OC) to the subtasks in order to achieve state 
> consistency across OC and subtasks after recovering from a checkpoint. The 
> valve is closed when a checkpoint starts and reopened after the checkpoint 
> barriers are sent to the source subtasks.
> While this mechanism works for the source operators, it unnecessarily limits 
> the general usage of OC due to the following limitations:
> - It does not handle (e.g. block) the control flow messages from subtasks to 
> OC.
> - It does not handle the control flow messages from OC to subtasks which are 
> sent after checkpoint barriers have been sent to sources but before subtasks 
> have received those barriers.
> If the cases mentioned above are not handled, consistency issues 
> might occur. For example, if a subtask sends messages to its coordinator 
> after the checkpoint barriers are sent to the sources and before the subtasks 
> receive the barriers, these messages would be recorded in the subtask's 
> snapshot but not the coordinator's. When the Flink job recovers from this 
> snapshot, the subtask would have a record of sending out the message while 
> the coordinator has no record of receiving it.
> **Proposed solution**
> We plan to address this problem by extending the blocking period. The 
> communication should be blocked before the OC starts the checkpoint and 
> reopened for each individual subtask after that subtask finishes the 
> checkpoint, to make sure that both OCs and subtasks would see the same 
> version of message after recovering from a checkpoint. Communications in both 
> directions should be blocked during this period. And the messages blocked on 
> the subtasks side should be properly stored in the checkpoint snapshot for 
> recovery.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-29217) CoordinatorEventsToStreamOperatorRecipientExactlyOnceITCase.testConcurrentCheckpoint failed with AssertionFailedError

2023-01-02 Thread Dong Lin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29217?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin updated FLINK-29217:
-
Fix Version/s: 1.17.0

> CoordinatorEventsToStreamOperatorRecipientExactlyOnceITCase.testConcurrentCheckpoint
>  failed with AssertionFailedError
> -
>
> Key: FLINK-29217
> URL: https://issues.apache.org/jira/browse/FLINK-29217
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.16.0
>Reporter: Xingbo Huang
>Assignee: Yunfeng Zhou
>Priority: Critical
>  Labels: pull-request-available, test-stability
> Fix For: 1.17.0, 1.16.1
>
>
> {code:java}
> 2022-09-07T02:00:50.2507464Z Sep 07 02:00:50 [ERROR] 
> org.apache.flink.streaming.runtime.tasks.CoordinatorEventsToStreamOperatorRecipientExactlyOnceITCase.testConcurrentCheckpoint
>   Time elapsed: 2.137 s  <<< FAILURE!
> 2022-09-07T02:00:50.2508673Z Sep 07 02:00:50 
> org.opentest4j.AssertionFailedError: 
> 2022-09-07T02:00:50.2509309Z Sep 07 02:00:50 
> 2022-09-07T02:00:50.2509945Z Sep 07 02:00:50 Expecting value to be false but 
> was true
> 2022-09-07T02:00:50.2511950Z Sep 07 02:00:50  at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> 2022-09-07T02:00:50.2513254Z Sep 07 02:00:50  at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> 2022-09-07T02:00:50.2514621Z Sep 07 02:00:50  at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> 2022-09-07T02:00:50.2516342Z Sep 07 02:00:50  at 
> org.apache.flink.streaming.runtime.tasks.CoordinatorEventsToStreamOperatorRecipientExactlyOnceITCase.testConcurrentCheckpoint(CoordinatorEventsToStreamOperatorRecipientExactlyOnceITCase.java:173)
> 2022-09-07T02:00:50.2517852Z Sep 07 02:00:50  at 
> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 2022-09-07T02:00:50.251Z Sep 07 02:00:50  at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 2022-09-07T02:00:50.2520065Z Sep 07 02:00:50  at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 2022-09-07T02:00:50.2521153Z Sep 07 02:00:50  at 
> java.lang.reflect.Method.invoke(Method.java:498)
> 2022-09-07T02:00:50.2522747Z Sep 07 02:00:50  at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
> 2022-09-07T02:00:50.2523973Z Sep 07 02:00:50  at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> 2022-09-07T02:00:50.2525158Z Sep 07 02:00:50  at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
> 2022-09-07T02:00:50.2526347Z Sep 07 02:00:50  at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> 2022-09-07T02:00:50.2527525Z Sep 07 02:00:50  at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> 2022-09-07T02:00:50.2528646Z Sep 07 02:00:50  at 
> org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45)
> 2022-09-07T02:00:50.2529708Z Sep 07 02:00:50  at 
> org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
> 2022-09-07T02:00:50.2530744Z Sep 07 02:00:50  at 
> org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
> 2022-09-07T02:00:50.2532008Z Sep 07 02:00:50  at 
> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
> 2022-09-07T02:00:50.2533137Z Sep 07 02:00:50  at 
> org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
> 2022-09-07T02:00:50.2544265Z Sep 07 02:00:50  at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
> 2022-09-07T02:00:50.2545595Z Sep 07 02:00:50  at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
> 2022-09-07T02:00:50.2546782Z Sep 07 02:00:50  at 
> org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
> 2022-09-07T02:00:50.2547810Z Sep 07 02:00:50  at 
> org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
> 2022-09-07T02:00:50.2548890Z Sep 07 02:00:50  at 
> org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
> 2022-09-07T02:00:50.2549932Z Sep 07 02:00:50  at 
> org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
> 2022-09-07T02:00:50.2550933Z Sep 07 02:00:50  at 
> org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
> 2022-09-07T02:00:50.2552325Z Sep 07 02:00:50  at 
> org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
> 2022-09-07T02:00:50.2553660Z Sep 07 02:00:50  at 
> org.junit.rules.RunRules.evaluate(RunRules.java:20)
> 2022-09-07T02:00:50.2554661Z Sep 07 02:00:50  at 
> 

[jira] [Updated] (FLINK-26029) Generalize the checkpoint protocol of OperatorCoordinator.

2023-01-02 Thread Dong Lin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-26029?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin updated FLINK-26029:
-
Priority: Minor  (was: Major)

> Generalize the checkpoint protocol of OperatorCoordinator.
> --
>
> Key: FLINK-26029
> URL: https://issues.apache.org/jira/browse/FLINK-26029
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing
>Affects Versions: 1.14.3
>Reporter: Jiangjie Qin
>Assignee: Yunfeng Zhou
>Priority: Minor
>  Labels: extensibility, pull-request-available, stale-assigned
>
> **Problem statement**
> Currently, the JM uses OperatorEventValve to control the communication from 
> OperatorCoordinator (OC) to the subtasks in order to achieve state 
> consistency across OC and subtasks after recovering from a checkpoint. The 
> valve is closed when a checkpoint starts and reopened after the checkpoint 
> barriers are sent to the source subtasks.
> While this mechanism works for the source operators, it unnecessarily limits 
> the general usage of OC due to the following limitations:
> - It does not handle (e.g. block) the control flow messages from subtasks to 
> OC.
> - It does not handle the control flow messages from OC to subtasks which are 
> sent after checkpoint barriers have been sent to sources but before subtasks 
> have received those barriers.
> If the cases mentioned above are not handled, consistency issues 
> might occur. For example, if a subtask sends messages to its coordinator 
> after the checkpoint barriers are sent to the sources and before the subtasks 
> receive the barriers, these messages would be recorded in the subtask's 
> snapshot but not the coordinator's. When the Flink job recovers from this 
> snapshot, the subtask would have a record of sending out the message while 
> the coordinator has no record of receiving it.
> **Proposed solution**
> We plan to address this problem by extending the blocking period. The 
> communication should be blocked before the OC starts the checkpoint and 
> reopened for each individual subtask after that subtask finishes the 
> checkpoint, to make sure that both OCs and subtasks would see the same 
> version of message after recovering from a checkpoint. Communications in both 
> directions should be blocked during this period. And the messages blocked on 
> the subtasks side should be properly stored in the checkpoint snapshot for 
> recovery.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-28639) Preserve distributed consistency of OperatorEvents from subtasks to OperatorCoordinator

2023-01-02 Thread Dong Lin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28639?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin updated FLINK-28639:
-
Priority: Minor  (was: Major)

> Preserve distributed consistency of OperatorEvents from subtasks to 
> OperatorCoordinator
> ---
>
> Key: FLINK-28639
> URL: https://issues.apache.org/jira/browse/FLINK-28639
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Checkpointing
>Affects Versions: 1.14.3
>Reporter: Yunfeng Zhou
>Assignee: Yunfeng Zhou
>Priority: Minor
>  Labels: pull-request-available
>
> This is the second step to solving the consistency issue of OC 
> communications. In this step, we would also guarantee the consistency of 
> operator events sent from subtasks to OCs. Combined with the other sub-task, 
> which preserves the consistency of communications in the reverse direction, all 
> communications between OC and subtasks would be consistent across checkpoints 
> and global failovers.
> To achieve the goal of this step, we need to add closing/reopening functions 
> to the subtasks' gateways and make the subtasks aware of a checkpoint before 
> they receive the checkpoint barriers. The general process would be as follows.
> 1. When the OC starts checkpoint, it notifies all subtasks about this 
> information.
> 2. After being notified about the ongoing checkpoint in OC, a subtask sends a 
> special operator event to its OC, which is the last operator event the OC 
> could receive from the subtask before the subtask completes the checkpoint. 
> Then the subtask closes its gateway.
> 3. After receiving this special event from all subtasks, the OC finishes its 
> checkpoint and closes its gateway. Then the checkpoint coordinator sends 
> checkpoint barriers to the sources.
> 4. If the subtask or the OC generates any events to send to the other side, 
> it buffers the events locally.
> 5. When a subtask starts checkpointing, it also stores the buffered events in 
> the checkpoint.
> 6. After the subtask completes the checkpoint, communications in both 
> directions are recovered and the buffered events are sent out.
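
To make steps 2-6 above concrete, here is a minimal sketch of a gateway with 
close/reopen semantics and local buffering (plain Java; all names are 
illustrative, not the actual Flink classes):

{code:java}
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.Consumer;

// Hypothetical gateway: while closed, outgoing events are buffered locally
// (step 4); on reopen they are flushed in order (step 6).
class BufferingEventGateway<E> {
    private final Consumer<E> transport; // actually sends the event
    private final Queue<E> buffered = new ArrayDeque<>();
    private boolean closed;

    BufferingEventGateway(Consumer<E> transport) {
        this.transport = transport;
    }

    synchronized void send(E event) {
        if (closed) {
            buffered.add(event); // buffer while a checkpoint is ongoing
        } else {
            transport.accept(event);
        }
    }

    synchronized void close() { // steps 2/3: close when the checkpoint starts
        closed = true;
    }

    synchronized void reopen() { // step 6: flush buffered events afterwards
        closed = false;
        while (!buffered.isEmpty()) {
            transport.accept(buffered.poll());
        }
    }
}
{code}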



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-26029) Generalize the checkpoint protocol of OperatorCoordinator.

2023-01-02 Thread Dong Lin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-26029?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin updated FLINK-26029:
-
Fix Version/s: (was: 1.17.0)

> Generalize the checkpoint protocol of OperatorCoordinator.
> --
>
> Key: FLINK-26029
> URL: https://issues.apache.org/jira/browse/FLINK-26029
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing
>Affects Versions: 1.14.3
>Reporter: Jiangjie Qin
>Assignee: Yunfeng Zhou
>Priority: Major
>  Labels: extensibility, pull-request-available, stale-assigned
>
> **Problem statement**
> Currently, the JM uses OperatorEventValve to control the communication from 
> OperatorCoordinator (OC) to the subtasks in order to achieve state 
> consistency across OC and subtasks after recovering from a checkpoint. The 
> valve is closed when a checkpoint starts and reopened after the checkpoint 
> barriers are sent to the source subtasks.
> While this mechanism works for the source operators, it unnecessarily limits 
> the general usage of OC due to the following limitations:
> - It does not handle (e.g. block) the control flow messages from subtasks to 
> OC.
> - It does not handle the control flow messages from OC to subtasks which are 
> sent after checkpoint barriers have been sent to sources but before subtasks 
> have received those barriers.
> If the cases mentioned above are not handled, consistency issues 
> might occur. For example, if a subtask sends messages to its coordinator 
> after the checkpoint barriers are sent to the sources and before the subtasks 
> receive the barriers, these messages would be recorded in the subtask's 
> snapshot but not the coordinator's. When the Flink job recovers from this 
> snapshot, the subtask would have a record of sending out the message while 
> the coordinator has no record of receiving it.
> **Proposed solution**
> We plan to address this problem by extending the blocking period. The 
> communication should be blocked before the OC starts the checkpoint and 
> reopened for each individual subtask after that subtask finishes the 
> checkpoint, to make sure that both OCs and subtasks would see the same 
> version of message after recovering from a checkpoint. Communications in both 
> directions should be blocked during this period. And the messages blocked on 
> the subtasks side should be properly stored in the checkpoint snapshot for 
> recovery.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-28639) Preserve distributed consistency of OperatorEvents from subtasks to OperatorCoordinator

2023-01-02 Thread Dong Lin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28639?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin updated FLINK-28639:
-
Fix Version/s: (was: 1.17.0)

> Preserve distributed consistency of OperatorEvents from subtasks to 
> OperatorCoordinator
> ---
>
> Key: FLINK-28639
> URL: https://issues.apache.org/jira/browse/FLINK-28639
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Checkpointing
>Affects Versions: 1.14.3
>Reporter: Yunfeng Zhou
>Assignee: Yunfeng Zhou
>Priority: Major
>  Labels: pull-request-available
>
> This is the second step to solving the consistency issue of OC 
> communications. In this step, we would also guarantee the consistency of 
> operator events sent from subtasks to OCs. Combined with the other sub-task, 
> which preserves the consistency of communications in the reverse direction, all 
> communications between OC and subtasks would be consistent across checkpoints 
> and global failovers.
> To achieve the goal of this step, we need to add closing/reopening functions 
> to the subtasks' gateways and make the subtasks aware of a checkpoint before 
> they receive the checkpoint barriers. The general process would be as follows.
> 1. When the OC starts checkpoint, it notifies all subtasks about this 
> information.
> 2. After being notified about the ongoing checkpoint in OC, a subtask sends a 
> special operator event to its OC, which is the last operator event the OC 
> could receive from the subtask before the subtask completes the checkpoint. 
> Then the subtask closes its gateway.
> 3. After receiving this special event from all subtasks, the OC finishes its 
> checkpoint and closes its gateway. Then the checkpoint coordinator sends 
> checkpoint barriers to the sources.
> 4. If the subtask or the OC generates any events to send to the other side, 
> it buffers the events locally.
> 5. When a subtask starts checkpointing, it also stores the buffered events in 
> the checkpoint.
> 6. After the subtask completes the checkpoint, communications in both 
> directions are recovered and the buffered events are sent out.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-29217) CoordinatorEventsToStreamOperatorRecipientExactlyOnceITCase.testConcurrentCheckpoint failed with AssertionFailedError

2023-01-02 Thread Yunfeng Zhou (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29217?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17651958#comment-17651958
 ] 

Yunfeng Zhou edited comment on FLINK-29217 at 1/3/23 5:36 AM:
--

According to offline discussions with Dong Lin, a solution to this problem 
could be to make OperatorCoordinators generate checkpoint barriers and send 
them to their subtasks. The subtasks would need to align these barriers with 
the ones they receive from upstream operators or sources, and actually trigger 
the checkpoint when checkpoint barrier alignment is reached.

The solution mentioned above requires further discussion about the behavior and 
performance of the Flink runtime during checkpoints. Given that no current 
subclass of OperatorCoordinator is affected by this function, it is of lower 
priority, and we will temporarily remove the guarantee that this function works 
correctly under concurrent checkpoints.


was (Author: yunfengzhou):
According to offline discussions with Dong Lin, it requires further discussion 
about the behavior and performance of Flink runtime during checkpoints to apply 
a design to preserve the consistency of operator events in case of concurrent 
checkpoints. Given that currently no subclass of OperatorCoordinator would be 
affected by this function, it is thus of lower priority and we would 
temporarily remove the guarantee that this function works correctly under 
concurrent checkpoints until the community has reached an agreement on the 
design for this function.

> CoordinatorEventsToStreamOperatorRecipientExactlyOnceITCase.testConcurrentCheckpoint
>  failed with AssertionFailedError
> -
>
> Key: FLINK-29217
> URL: https://issues.apache.org/jira/browse/FLINK-29217
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.16.0
>Reporter: Xingbo Huang
>Assignee: Yunfeng Zhou
>Priority: Critical
>  Labels: pull-request-available, test-stability
> Fix For: 1.16.1
>
>
> {code:java}
> 2022-09-07T02:00:50.2507464Z Sep 07 02:00:50 [ERROR] 
> org.apache.flink.streaming.runtime.tasks.CoordinatorEventsToStreamOperatorRecipientExactlyOnceITCase.testConcurrentCheckpoint
>   Time elapsed: 2.137 s  <<< FAILURE!
> 2022-09-07T02:00:50.2508673Z Sep 07 02:00:50 
> org.opentest4j.AssertionFailedError: 
> 2022-09-07T02:00:50.2509309Z Sep 07 02:00:50 
> 2022-09-07T02:00:50.2509945Z Sep 07 02:00:50 Expecting value to be false but 
> was true
> 2022-09-07T02:00:50.2511950Z Sep 07 02:00:50  at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> 2022-09-07T02:00:50.2513254Z Sep 07 02:00:50  at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> 2022-09-07T02:00:50.2514621Z Sep 07 02:00:50  at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> 2022-09-07T02:00:50.2516342Z Sep 07 02:00:50  at 
> org.apache.flink.streaming.runtime.tasks.CoordinatorEventsToStreamOperatorRecipientExactlyOnceITCase.testConcurrentCheckpoint(CoordinatorEventsToStreamOperatorRecipientExactlyOnceITCase.java:173)
> 2022-09-07T02:00:50.2517852Z Sep 07 02:00:50  at 
> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 2022-09-07T02:00:50.251Z Sep 07 02:00:50  at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 2022-09-07T02:00:50.2520065Z Sep 07 02:00:50  at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 2022-09-07T02:00:50.2521153Z Sep 07 02:00:50  at 
> java.lang.reflect.Method.invoke(Method.java:498)
> 2022-09-07T02:00:50.2522747Z Sep 07 02:00:50  at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
> 2022-09-07T02:00:50.2523973Z Sep 07 02:00:50  at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> 2022-09-07T02:00:50.2525158Z Sep 07 02:00:50  at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
> 2022-09-07T02:00:50.2526347Z Sep 07 02:00:50  at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> 2022-09-07T02:00:50.2527525Z Sep 07 02:00:50  at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> 2022-09-07T02:00:50.2528646Z Sep 07 02:00:50  at 
> org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45)
> 2022-09-07T02:00:50.2529708Z Sep 07 02:00:50  at 
> org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
> 2022-09-07T02:00:50.2530744Z Sep 07 02:00:50  at 
> org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
> 

[GitHub] [flink] reswqa commented on a diff in pull request #21560: [FLINK-29768] Hybrid shuffle supports consume partial finished subtask

2023-01-02 Thread GitBox


reswqa commented on code in PR #21560:
URL: https://github.com/apache/flink/pull/21560#discussion_r1060273190


##
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java:
##
@@ -762,7 +763,9 @@ private static PartitionInfo createPartitionInfo(
 ShuffleDescriptor shuffleDescriptor =
 getConsumedPartitionShuffleDescriptor(
 consumedPartition,
-
TaskDeploymentDescriptorFactory.PartitionLocationConstraint.MUST_BE_KNOWN);
+
TaskDeploymentDescriptorFactory.PartitionLocationConstraint.MUST_BE_KNOWN,
+// because partition is already finished, false is 
fair enough.

Review Comment:
   Because `createPartitionInfo` will only be invoked from 
`finishPartitionsAndUpdateConsumers`, which has already finished this 
partition. But to avoid misuse later, I renamed it to 
`createFinishedPartitionInfo`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-30544) Speed up finding minimum watermark across all channels by introducing heap-based algorithm

2023-01-02 Thread Lijie Wang (Jira)
Lijie Wang created FLINK-30544:
--

 Summary: Speed up finding minimum watermark across all channels by 
introducing heap-based algorithm
 Key: FLINK-30544
 URL: https://issues.apache.org/jira/browse/FLINK-30544
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Task
Reporter: Lijie Wang
 Fix For: 1.17.0


Currently, every time a task receives a watermark, it tries to update the 
minimum watermark. We use a traversal algorithm to find the minimum watermark 
across all channels (see 
[StatusWatermarkValve#findAndOutputNewMinWatermarkAcrossAlignedChannels|https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/watermarkstatus/StatusWatermarkValve.java#:~:text=private%20void-,findAndOutputNewMinWatermarkAcrossAlignedChannels,-(DataOutput%3C%3F%3E]
 for details); the time complexity is O(N), where N is the number of 
channels.

We can optimize this by introducing a heap-based algorithm, reducing the time 
complexity to O(log(N)).
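
A minimal sketch of the heap-based approach (illustrative Java using lazy 
deletion; names are assumptions, not the actual StatusWatermarkValve code):

{code:java}
import java.util.Arrays;
import java.util.PriorityQueue;

// Hypothetical sketch: track the minimum watermark across N channels in
// O(log N) amortized time per update, using a min-heap with lazy deletion.
class MinWatermarkTracker {
    private final long[] current;             // latest watermark per channel
    private final PriorityQueue<long[]> heap; // entries: {watermark, channelIndex}

    MinWatermarkTracker(int numChannels) {
        current = new long[numChannels];
        Arrays.fill(current, Long.MIN_VALUE);
        heap = new PriorityQueue<>((a, b) -> Long.compare(a[0], b[0]));
        for (int ch = 0; ch < numChannels; ch++) {
            heap.add(new long[] {Long.MIN_VALUE, ch});
        }
    }

    /** Records a new watermark for a channel and returns the current minimum. */
    long onWatermark(int channel, long watermark) {
        if (watermark > current[channel]) {
            current[channel] = watermark;
            heap.add(new long[] {watermark, channel}); // old entry becomes stale
        }
        // Drop stale entries whose value no longer matches the channel's latest.
        while (heap.peek()[0] != current[(int) heap.peek()[1]]) {
            heap.poll();
        }
        return heap.peek()[0];
    }
}
{code}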



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-30544) Speed up finding minimum watermark across all channels by introducing heap-based algorithm

2023-01-02 Thread Lijie Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30544?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lijie Wang reassigned FLINK-30544:
--

Assignee: Lijie Wang

> Speed up finding minimum watermark across all channels by introducing 
> heap-based algorithm
> --
>
> Key: FLINK-30544
> URL: https://issues.apache.org/jira/browse/FLINK-30544
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Task
>Reporter: Lijie Wang
>Assignee: Lijie Wang
>Priority: Major
> Fix For: 1.17.0
>
>
> Currently, every time a task receives a watermark, it tries to update the 
> minimum watermark. We use a traversal algorithm to find the minimum watermark 
> across all channels (see 
> [StatusWatermarkValve#findAndOutputNewMinWatermarkAcrossAlignedChannels|https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/watermarkstatus/StatusWatermarkValve.java#:~:text=private%20void-,findAndOutputNewMinWatermarkAcrossAlignedChannels,-(DataOutput%3C%3F%3E]
>  for details); the time complexity is O(N), where N is the number of 
> channels.
> We can optimize this by introducing a heap-based algorithm, reducing the time 
> complexity to O(log(N)).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] flinkbot commented on pull request #21587: [bp-1.16][FLINK-29849][table-planner] Fix event time temporal join on an upsert source may produce incorrect execution plan

2023-01-02 Thread GitBox


flinkbot commented on PR #21587:
URL: https://github.com/apache/flink/pull/21587#issuecomment-1369384736

   
   ## CI report:
   
   * 608cedb9a58a23aa63d7291c6f32ec1318107f6c UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-30543) Adding more examples for setting up jobs via operator.

2023-01-02 Thread Sriram Ganesh (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17653769#comment-17653769
 ] 

Sriram Ganesh commented on FLINK-30543:
---

[~gyfora] - Please add your thoughts.

> Adding more examples for setting up jobs via operator.
> --
>
> Key: FLINK-30543
> URL: https://issues.apache.org/jira/browse/FLINK-30543
> Project: Flink
>  Issue Type: Improvement
>Reporter: Sriram Ganesh
>Priority: Minor
>
> Currently, we have only basic examples that show how to run a job via the 
> operator. Adding more examples covering all upgrade modes would be more 
> helpful.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] lincoln-lil opened a new pull request, #21587: [bp-1.16][FLINK-29849][table-planner] Fix event time temporal join on an upsert source may produce incorrect execution plan

2023-01-02 Thread GitBox


lincoln-lil opened a new pull request, #21587:
URL: https://github.com/apache/flink/pull/21587

   This is a backport PR for 
[FLINK-29849](https://issues.apache.org/jira/browse/FLINK-29849) to release-1.16.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-30389) Add retry to read hints

2023-01-02 Thread Jingsong Lee (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30389?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17653768#comment-17653768
 ] 

Jingsong Lee commented on FLINK-30389:
--

[~Wencong Liu] SnapshotManager.readHints. Feel free to open a PR.

> Add retry to read hints
> ---
>
> Key: FLINK-30389
> URL: https://issues.apache.org/jira/browse/FLINK-30389
> Project: Flink
>  Issue Type: Improvement
>  Components: Table Store
>Reporter: Jingsong Lee
>Priority: Major
> Fix For: table-store-0.3.0
>
>
> On the OSS (object store) filesystem, writing the hint file deletes it first 
> and then recreates it, so reading the hint file may fail frequently. We don't 
> need to give up immediately on failure; we can add a retry.
> {code:java}
> Failed to read hint file LATEST. Falling back to listing files.
> java.io.FileNotFoundException: oss://lake_v4/snapshot/LATEST: No such file or 
> directory!
>   at 
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-30543) Adding more examples for setting up jobs via operator.

2023-01-02 Thread Sriram Ganesh (Jira)
Sriram Ganesh created FLINK-30543:
-

 Summary: Adding more examples for setting up jobs via operator.
 Key: FLINK-30543
 URL: https://issues.apache.org/jira/browse/FLINK-30543
 Project: Flink
  Issue Type: Improvement
Reporter: Sriram Ganesh


Currently, we have only basic examples that show how to run a job via the 
operator. Adding more examples covering all upgrade modes would be more 
helpful.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-28988) Incorrect result for filter after temporal join

2023-01-02 Thread Godfrey He (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17643318#comment-17643318
 ] 

Godfrey He edited comment on FLINK-28988 at 1/3/23 4:08 AM:


Fixed in 1.17.0:

b2203eaef68364306dfcc27fb34ac82baefda3d3

2851fac9c4c052876c80440b6b0b637603de06ea

 

1.16.1:

17b42516ceb73fa342101aedf830df40a84d82bc

c14355243995bff7b03a527ed073a2bbaab70ce8


was (Author: godfrey):
Fixed in 1.17.0:

b2203eaef68364306dfcc27fb34ac82baefda3d3

2851fac9c4c052876c80440b6b0b637603de06ea

> Incorrect result for filter after temporal join
> ---
>
> Key: FLINK-28988
> URL: https://issues.apache.org/jira/browse/FLINK-28988
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.15.1
>Reporter: Xuannan Su
>Assignee: Shuiqiang Chen
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.17.0
>
>
> The following code can reproduce the case
>  
> {code:java}
> public class TemporalJoinSQLExample1 {
> public static void main(String[] args) throws Exception {
> // set up the Java DataStream API
> final StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> // set up the Java Table API
> final StreamTableEnvironment tableEnv = 
> StreamTableEnvironment.create(env);
> final DataStreamSource<Tuple3<Integer, String, Instant>> ds =
> env.fromElements(
> new Tuple3<>(0, "online", Instant.ofEpochMilli(0)),
> new Tuple3<>(0, "offline", Instant.ofEpochMilli(10)),
> new Tuple3<>(0, "online", Instant.ofEpochMilli(20)));
> final Table table =
> tableEnv.fromDataStream(
> ds,
> Schema.newBuilder()
> .column("f0", DataTypes.INT())
> .column("f1", DataTypes.STRING())
> .column("f2", 
> DataTypes.TIMESTAMP_LTZ(3))
> .watermark("f2", "f2 - INTERVAL '2' 
> SECONDS")
> .build())
> .as("id", "state", "ts");
> tableEnv.createTemporaryView("source_table", table);
> final Table dedupeTable =
> tableEnv.sqlQuery(
> "SELECT * FROM ("
> + " SELECT *, ROW_NUMBER() OVER (PARTITION BY 
> id ORDER BY ts DESC) AS row_num FROM source_table"
> + ") WHERE row_num = 1");
> tableEnv.createTemporaryView("versioned_table", dedupeTable);
> DataStreamSource<Tuple2<Integer, Instant>> event =
> env.fromElements(
> new Tuple2<>(0, Instant.ofEpochMilli(0)),
> new Tuple2<>(0, Instant.ofEpochMilli(5)),
> new Tuple2<>(0, Instant.ofEpochMilli(10)),
> new Tuple2<>(0, Instant.ofEpochMilli(15)),
> new Tuple2<>(0, Instant.ofEpochMilli(20)),
> new Tuple2<>(0, Instant.ofEpochMilli(25)));
> final Table eventTable =
> tableEnv.fromDataStream(
> event,
> Schema.newBuilder()
> .column("f0", DataTypes.INT())
> .column("f1", 
> DataTypes.TIMESTAMP_LTZ(3))
> .watermark("f1", "f1 - INTERVAL '2' 
> SECONDS")
> .build())
> .as("id", "ts");
> tableEnv.createTemporaryView("event_table", eventTable);
> final Table result =
> tableEnv.sqlQuery(
> "SELECT * FROM event_table"
> + " LEFT JOIN versioned_table FOR SYSTEM_TIME 
> AS OF event_table.ts"
> + " ON event_table.id = versioned_table.id");
> result.execute().print();
> result.filter($("state").isEqual("online")).execute().print();
> }
> } {code}
>  
> The result of temporal join is the following:
> |op|         id|                     ts|        id0|                         
> state|                    ts0|             row_num|
> |+I|          0|1970-01-01 08:00:00.000|          0|                        
> online|1970-01-01 08:00:00.000|                   1|
> |+I|          0|1970-01-01 08:00:00.005|          0|                        
> online|1970-01-01 08:00:00.000|                   1|
> |+I|          0|1970-01-01 08:00:00.010|          0|                       
> offline|1970-01-01 08:00:00.010|     

[jira] [Updated] (FLINK-28988) Incorrect result for filter after temporal join

2023-01-02 Thread Godfrey He (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28988?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Godfrey He updated FLINK-28988:
---
Fix Version/s: 1.16.1

> Incorrect result for filter after temporal join
> ---
>
> Key: FLINK-28988
> URL: https://issues.apache.org/jira/browse/FLINK-28988
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.15.1
>Reporter: Xuannan Su
>Assignee: Shuiqiang Chen
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.17.0, 1.16.1
>
>
> The following code can reproduce the case
>  
> {code:java}
> public class TemporalJoinSQLExample1 {
> public static void main(String[] args) throws Exception {
> // set up the Java DataStream API
> final StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> // set up the Java Table API
> final StreamTableEnvironment tableEnv = 
> StreamTableEnvironment.create(env);
> final DataStreamSource<Tuple3<Integer, String, Instant>> ds =
> env.fromElements(
> new Tuple3<>(0, "online", Instant.ofEpochMilli(0)),
> new Tuple3<>(0, "offline", Instant.ofEpochMilli(10)),
> new Tuple3<>(0, "online", Instant.ofEpochMilli(20)));
> final Table table =
> tableEnv.fromDataStream(
> ds,
> Schema.newBuilder()
> .column("f0", DataTypes.INT())
> .column("f1", DataTypes.STRING())
> .column("f2", 
> DataTypes.TIMESTAMP_LTZ(3))
> .watermark("f2", "f2 - INTERVAL '2' 
> SECONDS")
> .build())
> .as("id", "state", "ts");
> tableEnv.createTemporaryView("source_table", table);
> final Table dedupeTable =
> tableEnv.sqlQuery(
> "SELECT * FROM ("
> + " SELECT *, ROW_NUMBER() OVER (PARTITION BY 
> id ORDER BY ts DESC) AS row_num FROM source_table"
> + ") WHERE row_num = 1");
> tableEnv.createTemporaryView("versioned_table", dedupeTable);
> DataStreamSource<Tuple2<Integer, Instant>> event =
> env.fromElements(
> new Tuple2<>(0, Instant.ofEpochMilli(0)),
> new Tuple2<>(0, Instant.ofEpochMilli(5)),
> new Tuple2<>(0, Instant.ofEpochMilli(10)),
> new Tuple2<>(0, Instant.ofEpochMilli(15)),
> new Tuple2<>(0, Instant.ofEpochMilli(20)),
> new Tuple2<>(0, Instant.ofEpochMilli(25)));
> final Table eventTable =
> tableEnv.fromDataStream(
> event,
> Schema.newBuilder()
> .column("f0", DataTypes.INT())
> .column("f1", 
> DataTypes.TIMESTAMP_LTZ(3))
> .watermark("f1", "f1 - INTERVAL '2' 
> SECONDS")
> .build())
> .as("id", "ts");
> tableEnv.createTemporaryView("event_table", eventTable);
> final Table result =
> tableEnv.sqlQuery(
> "SELECT * FROM event_table"
> + " LEFT JOIN versioned_table FOR SYSTEM_TIME 
> AS OF event_table.ts"
> + " ON event_table.id = versioned_table.id");
> result.execute().print();
> result.filter($("state").isEqual("online")).execute().print();
> }
> } {code}
>  
> The result of temporal join is the following:
> |op|         id|                     ts|        id0|                         
> state|                    ts0|             row_num|
> |+I|          0|1970-01-01 08:00:00.000|          0|                        
> online|1970-01-01 08:00:00.000|                   1|
> |+I|          0|1970-01-01 08:00:00.005|          0|                        
> online|1970-01-01 08:00:00.000|                   1|
> |+I|          0|1970-01-01 08:00:00.010|          0|                       
> offline|1970-01-01 08:00:00.010|                   1|
> |+I|          0|1970-01-01 08:00:00.015|          0|                       
> offline|1970-01-01 08:00:00.010|                   1|
> |+I|          0|1970-01-01 08:00:00.020|          0|                        
> online|1970-01-01 08:00:00.020|                   1|
> |+I|          0|1970-01-01 08:00:00.025|          0|                        
> 

[GitHub] [flink] reswqa commented on a diff in pull request #21560: [FLINK-29768] Hybrid shuffle supports consume partial finished subtask

2023-01-02 Thread GitBox


reswqa commented on code in PR #21560:
URL: https://github.com/apache/flink/pull/21560#discussion_r1060267262


##
flink-runtime/src/main/java/org/apache/flink/runtime/deployment/CachedShuffleDescriptors.java:
##
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.deployment;
+
+import 
org.apache.flink.runtime.deployment.TaskDeploymentDescriptor.MaybeOffloaded;
+import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup;
+import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
+import org.apache.flink.util.function.FunctionWithException;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** {@link ShuffleDescriptor}s cache for a {@link ConsumedPartitionGroup} */
+public class CachedShuffleDescriptors {
+/**
+ * Stores all serialized shuffle descriptors. An unknown shuffle 
descriptor will be
+ * replaced by the real shuffle descriptor after the upstream task finishes.
+ */
+private final List<MaybeOffloaded<ShuffleDescriptor[]>> 
serializedShuffleDescriptors;
+
+/**
+ * Stores all shuffle descriptors that are yet to be serialized. They will be 
serialized, replacing the
+ * corresponding values (unknown shuffle descriptors) in 
serializedShuffleDescriptors, the
+ * next time a TDD is generated.
+ */
+private final Map<IntermediateResultPartitionID, ShuffleDescriptor> toBeSerialized;

Review Comment:
   > Why do we need a map? Would it be better to use a queue of tuples?
   
   Yes, a queue is better than a map.
   
   > And why not eagerly serialize the descriptor and replace the previous 
unknown descriptor in `markPartitionFinished`?
   
   Because we cannot get the serializer when marking the partition finished. This 
is also the reason why `serializeShuffleDescriptors` takes a 
`shuffleDescriptorSerializer` parameter.
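   
   A sketch of the queue-of-tuples variant discussed here (types assumed from 
the imports above; not the merged code):
   
```java
import java.util.ArrayDeque;
import java.util.Queue;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.shuffle.ShuffleDescriptor;

// Hypothetical sketch: a FIFO of (partitionId, descriptor) pairs that are
// serialized lazily, once a serializer is available at TDD-generation time.
class ToBeSerializedQueue {
    private final Queue<Tuple2<IntermediateResultPartitionID, ShuffleDescriptor>> pending =
            new ArrayDeque<>();

    void markPartitionFinished(
            IntermediateResultPartitionID partitionId, ShuffleDescriptor descriptor) {
        pending.add(Tuple2.of(partitionId, descriptor)); // no serializer needed yet
    }

    Tuple2<IntermediateResultPartitionID, ShuffleDescriptor> pollNextToSerialize() {
        return pending.poll(); // drained when the next TDD is generated
    }
}
```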
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] godfreyhe merged pull request #21578: [bp-1.16][FLINK-28988] Don't push filters down into the right table for temporal join

2023-01-02 Thread GitBox


godfreyhe merged PR #21578:
URL: https://github.com/apache/flink/pull/21578


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] reswqa commented on a diff in pull request #21560: [FLINK-29768] Hybrid shuffle supports consume partial finished subtask

2023-01-02 Thread GitBox


reswqa commented on code in PR #21560:
URL: https://github.com/apache/flink/pull/21560#discussion_r1060265216


##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/PartialFinishedInputConsumableDecider.java:
##
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+
+/**
+ * {@link PartialFinishedInputConsumableDecider} is a special {@link 
InputConsumableDecider}. The
+ * input is considered to be consumable:
+ *
+ * 
+ *   for hybrid input: when partial producer partitions are finished.
+ *   for blocking input: when all producer partitions are finished.
+ * 
+ */
+public class PartialFinishedInputConsumableDecider implements 
InputConsumableDecider {
+public static final int NUM_FINISHED_PARTITIONS_AS_CONSUMABLE = 1;
+
+@Override
+public boolean isInputConsumable(
+SchedulingExecutionVertex executionVertex,
+Set<ExecutionVertexID> verticesToDeploy,
+Map<ConsumedPartitionGroup, Boolean> consumableStatusCache) {
+for (ConsumedPartitionGroup consumedPartitionGroup :
+executionVertex.getConsumedPartitionGroups()) {
+
+if (!consumableStatusCache.computeIfAbsent(
+consumedPartitionGroup, 
this::isConsumedPartitionGroupConsumable)) {
+return false;
+}

Review Comment:
   I think it is reasonable to require all groups to have at least one finished 
partition. The main reason is that, in the case of multiple inputs, if only the 
low-priority input has finished first, the downstream task cannot consume data 
even if it is scheduled in advance, which wastes resources to some extent.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] xintongsong commented on pull request #21565: [FLINK-18229][ResourceManager] Support cancel pending workers if no longer needed.

2023-01-02 Thread GitBox


xintongsong commented on PR #21565:
URL: https://github.com/apache/flink/pull/21565#issuecomment-1369369021

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] flinkbot commented on pull request #21586: [FLINK-30352][table-runtime] Introduce adaptive hash aggregate to adaptively determine whether local hash aggregate is required at runtime

2023-01-02 Thread GitBox


flinkbot commented on PR #21586:
URL: https://github.com/apache/flink/pull/21586#issuecomment-1369368128

   
   ## CI report:
   
   * 9fe565a241df6db1a22bd552d90233e9895ff9db UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Closed] (FLINK-30423) Introduce cast executor codegen for column type evolution

2023-01-02 Thread Jingsong Lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30423?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee closed FLINK-30423.

Fix Version/s: table-store-0.4.0
 Assignee: Shammon
   Resolution: Fixed

master: 
dcae4c28d69750713fa0d478d6ea348735790dbe
b22ed19740f427852f404d93570e3b0cff1b09ef

> Introduce cast executor codegen for column type evolution
> -
>
> Key: FLINK-30423
> URL: https://issues.apache.org/jira/browse/FLINK-30423
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table Store
>Affects Versions: table-store-0.3.0
>Reporter: Shammon
>Assignee: Shammon
>Priority: Major
>  Labels: pull-request-available
> Fix For: table-store-0.4.0
>
>
> Introduce cast executor codegen for column type evolution



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] yuchuanchen commented on a diff in pull request #21563: [FLINK-19889][connectors/hive/filesystem][format/parquet] Supports nested projection pushdown for filesystem connector of colum

2023-01-02 Thread GitBox


yuchuanchen commented on code in PR #21563:
URL: https://github.com/apache/flink/pull/21563#discussion_r1060263844


##
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/DataTypeUtils.java:
##
@@ -94,6 +99,217 @@ public static DataType projectRow(DataType dataType, int[] 
indexPaths) {
 return Projection.of(indexPaths).project(dataType);
 }
 
+public static List<RowType.RowField> buildRowFields(RowType allType, 
int[][] projectedFields) {
+final List<RowType.RowField> updatedFields = new ArrayList<>();
+for (int[] projectedField : projectedFields) {
+updatedFields.add(
+buildRowFieldInProjectFields(
+allType.getFields().get(projectedField[0]), 
projectedField, 0));
+}
+return updatedFields;
+}
+
+public static RowType buildRow(RowType allType, int[][] projectedFields) {
+return new RowType(
+allType.isNullable(), mergeRowFields(buildRowFields(allType, 
projectedFields)));
+}
+
+public static RowType buildRow(RowType allType, List<RowType.RowField> 
updatedFields) {
+return new RowType(allType.isNullable(), 
mergeRowFields(updatedFields));
+}
+
+public static RowType.RowField buildRowFieldInProjectFields(
+RowType.RowField rowField, int[] fields, int index) {
+if (fields.length == 1 || index == fields.length - 1) {
+LogicalType rowType = rowField.getType();
+if (rowType.is(ROW)) {
+rowType = new RowType(rowType.isNullable(), ((RowType) 
rowType).getFields());
+}
+return new RowField(rowField.getName(), rowType);
+}
+Preconditions.checkArgument(rowField.getType().is(ROW), "Row data type 
expected.");
+final List<RowField> updatedFields = new ArrayList<>();
+RowType rowtype = ((RowType) rowField.getType());
+updatedFields.add(
+buildRowFieldInProjectFields(
+rowtype.getFields().get(fields[index + 1]), fields, 
index + 1));
+return new RowType.RowField(
+rowField.getName(), new RowType(rowtype.isNullable(), 
updatedFields));
+}
+
+public static List<RowField> mergeRowFields(List<RowField> 
updatedFields) {
+List<RowField> updatedFieldsCopy =
+
updatedFields.stream().map(RowField::copy).collect(Collectors.toList());
+final List<String> fieldNames =
+
updatedFieldsCopy.stream().map(RowField::getName).collect(Collectors.toList());
+if (fieldNames.stream().anyMatch(StringUtils::isNullOrWhitespaceOnly)) 
{
+throw new ValidationException(
+"Field names must contain at least one non-whitespace 
character.");
+}
+final Set<String> duplicates =
+fieldNames.stream()
+.filter(n -> Collections.frequency(fieldNames, n) > 1)
+.collect(Collectors.toSet());
+if (duplicates.isEmpty()) {
+return updatedFieldsCopy;
+}
+List duplicatesFields =
+updatedFieldsCopy.stream()
+.filter(f -> duplicates.contains(f.getName()))
+.collect(Collectors.toList());
+updatedFieldsCopy.removeAll(duplicatesFields);
+Map> duplicatesMap = new HashMap<>();
+duplicatesFields.forEach(
+f -> {
+List tmp = 
duplicatesMap.getOrDefault(f.getName(), new ArrayList<>());
+tmp.add(f);
+duplicatesMap.put(f.getName(), tmp);
+});
+duplicatesMap.forEach(
+(fieldName, duplicateList) -> {
+List fieldsToMerge = new ArrayList<>();
+for (RowField rowField : duplicateList) {
+Preconditions.checkArgument(
+rowField.getType().is(ROW), "Row data type 
expected.");
+RowType rowType = (RowType) rowField.getType();
+fieldsToMerge.addAll(rowType.getFields());
+}
+RowField mergedField =
+new RowField(
+fieldName,
+new RowType(
+
duplicateList.get(0).getType().isNullable(),
+mergeRowFields(fieldsToMerge)));
+updatedFieldsCopy.add(mergedField);
+});
+return updatedFieldsCopy;
+}
+
+public static int[][] computeProjectedFields(int[] selectFields) {
+int[][] projectedFields = new int[selectFields.length][1];
+for (int i = 0; i < selectFields.length; i++) {
+projectedFields[i][0] = selectFields[i];
+}
+return projectedFields;
+}
+
+public static int[][] computeProjectedFields(
+String[] selectedFieldNames, 

[GitHub] [flink] swuferhong opened a new pull request, #21586: [FLINK-30352][table-runtime] Introduce adaptive hash aggregate to adaptively determine whether local hash aggregate is required at runtim

2023-01-02 Thread GitBox


swuferhong opened a new pull request, #21586:
URL: https://github.com/apache/flink/pull/21586

   
   
   ## What is the purpose of the change
   
   This PR aims to introduce adaptive hash aggregate in `batch` mode, which adaptively determines whether local hash aggregate is required according to the aggregation degree of the local hash aggregate. `adaptive hash agg` will first sample the data and then decide, based on the degree of aggregation of the sampled data, whether `local hash agg` is required. If `local hash agg` turns out to be required, aggregation proceeds as usual. If it is not required, we convert the output rowType of the data to be the same as the aggregated results and pass the records directly downstream.
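   
   For illustration, the sampling decision could look roughly like the sketch below (the class name, `SAMPLE_SIZE`, and the reduction threshold are assumptions for illustration, not the actual generated code in `HashAggCodeGenerator`):
   
   ```java
   // Illustrative sketch of adaptive local hash aggregation: observe a sample
   // of input rows, then keep local aggregation only if it actually reduces
   // the row count enough.
   final class AdaptiveLocalAggSampler {
   
       private static final long SAMPLE_SIZE = 5_000_000L; // rows to observe (assumed)
       private static final double MIN_REDUCTION = 0.25;   // assumed threshold
   
       private long inputRows;
       private long distinctKeys; // size of the local hash map so far
       private boolean decided;
       private boolean keepLocalAgg = true;
   
       /** Returns whether local hash aggregation should keep running. */
       boolean onRecord(boolean keyIsNew) {
           if (!decided) {
               inputRows++;
               if (keyIsNew) {
                   distinctKeys++;
               }
               if (inputRows >= SAMPLE_SIZE) {
                   // Little reduction means local agg barely aggregates anything:
                   // skip it and forward rows (converted to the aggregate output
                   // row type) directly to the downstream global aggregate.
                   keepLocalAgg = (1.0 - (double) distinctKeys / inputRows) >= MIN_REDUCTION;
                   decided = true;
               }
           }
           return keepLocalAgg;
       }
   }
   ```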
   
   
   ## Brief change log
   
   - Add codeGen code in `HashAggCodeGenerator`,
   - Add ITCase in `HashAggITCase`.
   
   
   ## Verifying this change
   
   - Add ITCase in `HashAggITCase`.
   
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? yes
 - If yes, how is the feature documented? Not documented yet.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] yuchuanchen commented on a diff in pull request #21563: [FLINK-19889][connectors/hive/filesystem][format/parquet] Supports nested projection pushdown for filesystem connector of colum

2023-01-02 Thread GitBox


yuchuanchen commented on code in PR #21563:
URL: https://github.com/apache/flink/pull/21563#discussion_r1060263636


##
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRule.java:
##
@@ -186,11 +193,30 @@ private boolean supportsMetadata(DynamicTableSource tableSource) {
         return tableSource instanceof SupportsReadingMetadata;
     }
 
-    private boolean supportsNestedProjection(DynamicTableSource tableSource) {
+    private boolean supportsNestedProjection(
+            DynamicTableSource tableSource, LogicalProject project) {
+        List<RexNode> projects = project.getProjects();
         return supportsProjectionPushDown(tableSource)
+                && isVectorizedSupportedTypes(projects)
                 && ((SupportsProjectionPushDown) tableSource).supportsNestedProjection();
     }
 
+    private boolean isVectorizedSupportedTypes(List<RexNode> projects) {

Review Comment:
   Good idea, I will change it later.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] yuchuanchen commented on a diff in pull request #21563: [FLINK-19889][connectors/hive/filesystem][format/parquet] Supports nested projection pushdown for filesystem connector of colum

2023-01-02 Thread GitBox


yuchuanchen commented on code in PR #21563:
URL: https://github.com/apache/flink/pull/21563#discussion_r1060263439


##
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractor.scala:
##
@@ -538,7 +538,30 @@ class RexNodeToExpressionConverter(
 }
   }
 
-  override def visitFieldAccess(fieldAccess: RexFieldAccess): Option[ResolvedExpression] = None
+  override def visitFieldAccess(fieldAccess: RexFieldAccess): Option[ResolvedExpression] = {

Review Comment:
   Sorry, my mistake. This method is called when the TableSource implements 
SupportsFilterPushDown. I will remove this, since this patch only supports 
nested projection pushdown.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] xintongsong commented on a diff in pull request #21576: [FLINK-30533][runtime] SourceOperator#emitNext() should push records to DataOutput in a while loop

2023-01-02 Thread GitBox


xintongsong commented on code in PR #21576:
URL: https://github.com/apache/flink/pull/21576#discussion_r1060259412


##
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java:
##
@@ -990,6 +991,10 @@ public MailboxExecutorFactory getMailboxExecutorFactory() {
 return this.mailboxProcessor::getMailboxExecutor;
 }
 
+public Supplier<Boolean> getMailboxHasMail() {
+return this.mailboxProcessor::hasMail;

Review Comment:
   With this change, the `@VisibleForTesting` annotation can be removed for 
`MailboxProcessor#hasMail`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-table-store] JingsongLi merged pull request #441: [FLINK-30423] Introduce generated codes for CastExecutor

2023-01-02 Thread GitBox


JingsongLi merged PR #441:
URL: https://github.com/apache/flink-table-store/pull/441


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] fsk119 commented on a diff in pull request #21577: [FLINK-30496][table-api] Introduce TableChange to represent MODIFY change

2023-01-02 Thread GitBox


fsk119 commented on code in PR #21577:
URL: https://github.com/apache/flink/pull/21577#discussion_r1060260536


##
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/AlterSchemaConverter.java:
##
@@ -698,7 +782,7 @@ private AlterSchemaStrategy 
computeAlterSchemaStrategy(SqlAlterTableSchema alter
 alterTableSchema.getClass().getCanonicalName()));
 }
 
-private static  T unwrap(Optional value) {
+private  T unwrap(Optional value) {

Review Comment:
   ModifySchemaConverter and AddSchemaConverter were both static classes and could 
only use static methods of the parent class. But now I have changed them to 
(non-static) inner classes, so I think we can remove the static keyword now. 
   
   BTW, I think we should narrow the scope of the visibility. With the `static` 
keyword, the visibility is much broader. WDYT?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] LadyForest commented on pull request #21577: [FLINK-30496][table-api] Introduce TableChange to represent MODIFY change

2023-01-02 Thread GitBox


LadyForest commented on PR #21577:
URL: https://github.com/apache/flink/pull/21577#issuecomment-1369359579

   > Thanks for the review!
   > 
   > > Since the MODIFY operation is split into a list of changes, I would like 
to know whether the order matters and how the external catalog copes with them. 
For example, does the API expose a requirement that it should be an atomic 
operation, or does the external catalog decide it?
   > 
   > Yes, the order matters. The catalog developer needs to make sure the 
changes are executed in order and that the execution is atomic. The logic may 
look like this (the idea is from Spark's JDBCTableCatalog#alterTable):
   > 
   > ```
   > 
   > default void alterTable(
   >         ObjectPath tablePath,
   >         CatalogBaseTable newTable,
   >         List<TableChange> tableChanges,
   >         boolean ignoreIfNotExists)
   >         throws TableNotExistException, CatalogException {
   >     // make sure all table changes are supported.
   >     validate(tableChanges);
   >     // a jdbc catalog can work like this
   >     conn.setAutoCommit(false);
   >     Statement statement = conn.createStatement();
   >     try {
   >         statement.setQueryTimeout(options.queryTimeout);
   >         for (String sql : dialect.alterTable(tableName, changes, metaData.getDatabaseMajorVersion())) {
   >             statement.executeUpdate(sql);
   >         }
   >         conn.commit();
   >     } catch (Exception e) {
   >         if (conn != null) {
   >             conn.rollback();
   >         }
   >         throw e;
   >     } finally {
   >         statement.close();
   >         conn.setAutoCommit(true);
   >     }
   > }
   > ```
   > 
   > > The "oldColumn" and "originColumn" are used interchangeably across the 
changelog; can we align them to improve readability?
   > 
   > Thanks for pointing it out. I agree with you that we should align the term 
in the code. I am inclined to use the term "oldColumn" because we have already 
used it in FLIP-273. WDYT?
   > 
   > > SchemaConverter#updatePositionAndCollectColumnChange and 
OperationConverterUtils#buildColumnChange are very similar. Do we have a plan 
to do some refactor work?
   > 
   > Yes. I think we can refactor this after all parts are finished. WDYT?
   
   Thanks for the detailed explanation. +1 for refactoring after all the 
subtasks are finished.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-30234) SourceReaderBase should provide an option to disable numRecordsIn metric registration

2023-01-02 Thread Qingsheng Ren (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30234?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17653749#comment-17653749
 ] 

Qingsheng Ren commented on FLINK-30234:
---

Thanks [~Wencong Liu]! I've assigned the ticket to you. Actually, we need to 
introduce a new option for SourceReaderBase, which touches the public API, so 
it's better to have a discussion on the ML.
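
To make the discussion concrete, a minimal sketch of such an opt-out could look like the following (the helper and flag names are illustrative assumptions, not the final API):

{code:java}
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.metrics.groups.OperatorIOMetricGroup;

// Illustrative sketch only: either use the counter pre-registered on the
// operator's IO metric group, or hand the reader a detached counter so the
// concrete implementation can report the metric on its own terms.
final class NumRecordsInSelector {

    static Counter select(OperatorIOMetricGroup ioMetrics, boolean registerInReaderBase) {
        return registerInReaderBase
                ? ioMetrics.getNumRecordsInCounter() // counted by SourceReaderBase
                : new SimpleCounter();               // reported by the implementation
    }
}
{code}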

> SourceReaderBase should provide an option to disable numRecordsIn metric 
> registration
> -
>
> Key: FLINK-30234
> URL: https://issues.apache.org/jira/browse/FLINK-30234
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Common
>Affects Versions: 1.17.0
>Reporter: Qingsheng Ren
>Assignee: Wencong Liu
>Priority: Major
>
> Currently the numRecordsIn metric is pre-registered for all sources in 
> SourceReaderBase. Considering the different implementations of source readers, 
> the definition of "record" might differ from the one we use in 
> SourceReaderBase; hence numRecordsIn might be inaccurate.
> We could introduce an option in SourceReader to disable the registration of 
> numRecordsIn in SourceReaderBase and let the actual implementation report 
> the metric instead. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-29217) CoordinatorEventsToStreamOperatorRecipientExactlyOnceITCase.testConcurrentCheckpoint failed with AssertionFailedError

2023-01-02 Thread Yunfeng Zhou (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29217?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17651958#comment-17651958
 ] 

Yunfeng Zhou edited comment on FLINK-29217 at 1/3/23 3:11 AM:
--

According to offline discussions with Dong Lin, further discussion is needed 
about the behavior and performance of the Flink runtime during checkpoints 
before applying a design that preserves the consistency of operator events in 
case of concurrent checkpoints. Given that currently no subclass of 
OperatorCoordinator would be affected by this function, it is of lower 
priority, and we will temporarily remove the guarantee that this function 
works correctly under concurrent checkpoints until the community has reached 
an agreement on its design.


was (Author: yunfengzhou):
According to offline discussions with Becket Qin and Dong Lin, given that the 
Flink OperatorCoordinator's support for saving buffered operator events in the 
face of concurrent checkpoints is not good enough for now, we need to 
temporarily and partially disable concurrent manipulation of multiple 
checkpoints in OperatorCoordinator.

A short-term solution is as follows.

- If a new checkpoint is triggered on an OperatorCoordinatorHolder when a 
checkpoint is still in process, and the new checkpoint cannot be subsumed (i.e. 
a savepoint instead of an automatically triggered checkpoint), the checkpoint 
would be processed concurrently, and all blocked OperatorEvents would be 
regarded as generated after the new checkpoint is triggered (i.e. they would 
not be saved to the snapshot of the new checkpoint).
- If a new checkpoint is triggered on an OperatorCoordinatorHolder when a 
checkpoint is still in process, and the new checkpoint can be subsumed, the 
checkpoint would be temporarily blocked until all ongoing checkpoints have 
finished.

A long-term solution could be to make OperatorCoordinators generate checkpoint 
barriers and send them to their subtasks. The subtasks would need to align 
these barriers with the ones they receive from upstream operators or sources, 
and actually trigger the checkpoint when checkpoint barrier alignment is 
reached.

> CoordinatorEventsToStreamOperatorRecipientExactlyOnceITCase.testConcurrentCheckpoint
>  failed with AssertionFailedError
> -
>
> Key: FLINK-29217
> URL: https://issues.apache.org/jira/browse/FLINK-29217
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.16.0
>Reporter: Xingbo Huang
>Assignee: Yunfeng Zhou
>Priority: Critical
>  Labels: pull-request-available, test-stability
> Fix For: 1.16.1
>
>
> {code:java}
> 2022-09-07T02:00:50.2507464Z Sep 07 02:00:50 [ERROR] 
> org.apache.flink.streaming.runtime.tasks.CoordinatorEventsToStreamOperatorRecipientExactlyOnceITCase.testConcurrentCheckpoint
>   Time elapsed: 2.137 s  <<< FAILURE!
> 2022-09-07T02:00:50.2508673Z Sep 07 02:00:50 
> org.opentest4j.AssertionFailedError: 
> 2022-09-07T02:00:50.2509309Z Sep 07 02:00:50 
> 2022-09-07T02:00:50.2509945Z Sep 07 02:00:50 Expecting value to be false but 
> was true
> 2022-09-07T02:00:50.2511950Z Sep 07 02:00:50  at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> 2022-09-07T02:00:50.2513254Z Sep 07 02:00:50  at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> 2022-09-07T02:00:50.2514621Z Sep 07 02:00:50  at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> 2022-09-07T02:00:50.2516342Z Sep 07 02:00:50  at 
> org.apache.flink.streaming.runtime.tasks.CoordinatorEventsToStreamOperatorRecipientExactlyOnceITCase.testConcurrentCheckpoint(CoordinatorEventsToStreamOperatorRecipientExactlyOnceITCase.java:173)
> 2022-09-07T02:00:50.2517852Z Sep 07 02:00:50  at 
> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 2022-09-07T02:00:50.251Z Sep 07 02:00:50  at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 2022-09-07T02:00:50.2520065Z Sep 07 02:00:50  at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 2022-09-07T02:00:50.2521153Z Sep 07 02:00:50  at 
> java.lang.reflect.Method.invoke(Method.java:498)
> 2022-09-07T02:00:50.2522747Z Sep 07 02:00:50  at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
> 2022-09-07T02:00:50.2523973Z Sep 07 02:00:50  at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> 2022-09-07T02:00:50.2525158Z Sep 07 02:00:50  at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
> 

[jira] [Resolved] (FLINK-30392) Increase the default value of cluster.thread-dump.stacktrace-max-depth

2023-01-02 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30392?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang resolved FLINK-30392.
--
Resolution: Fixed

merged in master: 187e572768932b121b5f5e785727f0c48ca98aff

> Increase the default value of cluster.thread-dump.stacktrace-max-depth
> --
>
> Key: FLINK-30392
> URL: https://issues.apache.org/jira/browse/FLINK-30392
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Web Frontend
>Reporter: Yun Tang
>Assignee: Yu Chen
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.17.0
>
>
> Currently, the thread dump function still has a default stack-trace depth of 
> 8, the same as before. However, this default value is really too small for 
> developers to see the actual thread info.
> From our experience, we can set this value to 24.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] paul8263 commented on pull request #20343: [FLINK-25916][connector-kafka] Using upsert-kafka with a flush buffer…

2023-01-02 Thread GitBox


paul8263 commented on PR #20343:
URL: https://github.com/apache/flink/pull/20343#issuecomment-1369346830

   Hi @MartijnVisser and @jaumebecks ,
   
   I updated the unit test and I am currently waiting for the CI result.
   
   If there are other issues that need any changes, please let me know. Thank 
you very much.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] fsk119 commented on a diff in pull request #21577: [FLINK-30496][table-api] Introduce TableChange to represent MODIFY change

2023-01-02 Thread GitBox


fsk119 commented on code in PR #21577:
URL: https://github.com/apache/flink/pull/21577#discussion_r1060254282


##
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/TableChange.java:
##
@@ -276,6 +409,359 @@ public String toString() {
 }
 }
 
+// --------------------------------------------------------------------------------------------
+// Modify Change
+// --------------------------------------------------------------------------------------------
+
+/**
+ * A base schema change to modify a column. The modification includes:
+ *
+ * <ul>
+ *   <li>change column data type
+ *   <li>reorder column position
+ *   <li>modify column comment
+ *   <li>change the computed expression
+ * </ul>
+ *
+ * <p>Some fine-grained column changes are defined in the {@link ModifyPhysicalColumnType},
+ * {@link ModifyColumnComment}, {@link ModifyColumnPosition} and {@link ModifyColumnName}.
+ *
+ * <p>It is equal to the following statement:
+ *
+ * <pre>
+ *    ALTER TABLE <table_name> MODIFY <column_definition> COMMENT '<column_comment>' <column_position>
+ * </pre>
+ */
+@PublicEvolving
+class ModifyColumn implements TableChange {
+
+protected final Column oldColumn;
+protected final Column newColumn;
+
+protected final @Nullable ColumnPosition newPosition;
+
+public ModifyColumn(
+        Column oldColumn, Column newColumn, @Nullable ColumnPosition newPosition) {
+this.oldColumn = oldColumn;
+this.newColumn = newColumn;
+this.newPosition = newPosition;
+}
+
+/** Returns the original {@link Column} instance. */
+public Column getOldColumn() {
+return oldColumn;
+}
+
+/** Returns the modified {@link Column} instance. */
+public Column getNewColumn() {
+return newColumn;
+}
+
+/**
+ * Returns the position of the modified {@link Column} instance. When the return value is
+ * null, it means the column is modified at its original position. When the return value is
+ * FIRST, it means the modified column is moved to the first position. When the return value
+ * is AFTER, it means the column is moved after the referred column.
+ */
+public @Nullable ColumnPosition getNewPosition() {
+return newPosition;
+}
+
+@Override
+public boolean equals(Object o) {
+if (this == o) {
+return true;
+}
+if (!(o instanceof ModifyColumn)) {
+return false;
+}
+ModifyColumn that = (ModifyColumn) o;
+return Objects.equals(oldColumn, that.oldColumn)
+&& Objects.equals(newColumn, that.newColumn)
+&& Objects.equals(newPosition, that.newPosition);
+}
+
+@Override
+public int hashCode() {
+return Objects.hash(oldColumn, newColumn, newPosition);
+}
+
+@Override
+public String toString() {
+return "ModifyColumn{"
++ "oldColumn="
++ oldColumn
++ ", newColumn="
++ newColumn
++ ", newPosition="
++ newPosition
++ '}';
+}
+}
+
+/**
+ * A table change to modify the column comment.
+ *
+ * <p>It is equal to the following statement:
+ *
+ * <pre>
+ *    ALTER TABLE <table_name> MODIFY <column_name> <original_column_type> COMMENT '<new_column_comment>'
+ * </pre>
+ */
+@PublicEvolving
+class ModifyColumnComment extends ModifyColumn {
+
+private final String newComment;
+
+private ModifyColumnComment(Column oldColumn, String newComment) {
+super(oldColumn, oldColumn.withComment(newComment), null);
+this.newComment = newComment;
+}
+
+/** Get the new comment for the column. */
+public String getNewComment() {
+return newComment;
+}
+
+@Override
+public boolean equals(Object o) {
+return (o instanceof ModifyColumnComment) && super.equals(o);
+}
+
+@Override
+public String toString() {
+return "ModifyColumnComment{"
++ "Column="
++ oldColumn
++ ", newComment='"
++ newComment
++ '\''
++ '}';
+}
+}
+
+/**
+ * A table change to modify the column position.
+ *
+ * <p>It is equal to the following statement:
+ *
+ * <pre>
+ *    ALTER TABLE <table_name> MODIFY <column_name> <original_column_type> <column_position>
+ * </pre>
+ */
+@PublicEvolving
+class ModifyColumnPosition extends ModifyColumn {
+
+public ModifyColumnPosition(Column oldColumn, 

[GitHub] [flink] link3280 commented on pull request #21581: [FLINK-30538][SQL gateway/client] Improve error handling of stop job operation

2023-01-02 Thread GitBox


link3280 commented on PR #21581:
URL: https://github.com/apache/flink/pull/21581#issuecomment-1369343457

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (FLINK-30234) SourceReaderBase should provide an option to disable numRecordsIn metric registration

2023-01-02 Thread Qingsheng Ren (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30234?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Qingsheng Ren reassigned FLINK-30234:
-

Assignee: Wencong Liu

> SourceReaderBase should provide an option to disable numRecordsIn metric 
> registration
> -
>
> Key: FLINK-30234
> URL: https://issues.apache.org/jira/browse/FLINK-30234
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Common
>Affects Versions: 1.17.0
>Reporter: Qingsheng Ren
>Assignee: Wencong Liu
>Priority: Major
>
> Currently the numRecordsIn metric is pre-registered for all sources in 
> SourceReaderBase. Considering the different implementations of source readers, 
> the definition of "record" might differ from the one we use in 
> SourceReaderBase; hence numRecordsIn might be inaccurate.
> We could introduce an option in SourceReader to disable the registration of 
> numRecordsIn in SourceReaderBase and let the actual implementation report 
> the metric instead. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] link3280 commented on pull request #21582: [FLINK-28655][SQL gateway] Support show jobs operation in SqlGateway

2023-01-02 Thread GitBox


link3280 commented on PR #21582:
URL: https://github.com/apache/flink/pull/21582#issuecomment-1369343392

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] Myasuka closed pull request #21498: [FLINK-30392] Replace default thread dump depth to 50

2023-01-02 Thread GitBox


Myasuka closed pull request #21498: [FLINK-30392] Replace default thread dump 
depth to 50
URL: https://github.com/apache/flink/pull/21498


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] fsk119 commented on pull request #21577: [FLINK-30496][table-api] Introduce TableChange to represent MODIFY change

2023-01-02 Thread GitBox


fsk119 commented on PR #21577:
URL: https://github.com/apache/flink/pull/21577#issuecomment-1369338527

   Thanks for the review!
   
   >Since the MODIFY operation is split into a list of changes, I would like to 
know whether the order matters and how the external catalog copes with them. 
For example, does the API expose a requirement that it should be an atomic 
operation, or does the external catalog decide it?
   
   Yes, the order matters. The catalog developer needs to make sure the 
changes are executed in order and that the execution is atomic. The logic may 
look like this (the idea is from Spark's JDBCTableCatalog#alterTable):
   
   ```
   
   default void alterTable(
           ObjectPath tablePath,
           CatalogBaseTable newTable,
           List<TableChange> tableChanges,
           boolean ignoreIfNotExists)
           throws TableNotExistException, CatalogException {
       // make sure all table changes are supported.
       validate(tableChanges);
       // a jdbc catalog can work like this
       conn.setAutoCommit(false);
       Statement statement = conn.createStatement();
       try {
           statement.setQueryTimeout(options.queryTimeout);
           for (String sql : dialect.alterTable(tableName, changes, metaData.getDatabaseMajorVersion())) {
               statement.executeUpdate(sql);
           }
           conn.commit();
       } catch (Exception e) {
           if (conn != null) {
               conn.rollback();
           }
           throw e;
       } finally {
           statement.close();
           conn.setAutoCommit(true);
       }
   }
   
   ``` 
   
   > The "oldColumn" and "originColumn" are used interchangeably across the 
changelog; can we align them to improve readability?
   
   Thanks for pointing it out. I agree with you that we should align the term 
in the code. I am inclined to use the term "oldColumn" because we have already 
used it in FLIP-273. WDYT?
   
   > SchemaConverter#updatePositionAndCollectColumnChange and 
OperationConverterUtils#buildColumnChange are very similar. Do we have a plan 
to do some refactor work?
   
   Yes. I think we can refactor this after all parts are finished. WDYT?
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] lindong28 commented on pull request #21576: [FLINK-30533][runtime] SourceOperator#emitNext() should push records to DataOutput in a while loop

2023-01-02 Thread GitBox


lindong28 commented on PR #21576:
URL: https://github.com/apache/flink/pull/21576#issuecomment-1369329109

   @xintongsong @zhuzhurk Can you review this PR? Thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-30542) Support adaptive hash aggregate in runtime

2023-01-02 Thread Yunhong Zheng (Jira)
Yunhong Zheng created FLINK-30542:
-

 Summary: Support adaptive hash aggregate in runtime
 Key: FLINK-30542
 URL: https://issues.apache.org/jira/browse/FLINK-30542
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Runtime
Affects Versions: 1.17.0
Reporter: Yunhong Zheng
 Fix For: 1.17.0


Introduce a new strategy to adaptively determine whether local hash aggregate 
is required according to the aggregation degree of local hash aggregate.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] paul8263 commented on pull request #20343: [FLINK-25916][connector-kafka] Using upsert-kafka with a flush buffer…

2023-01-02 Thread GitBox


paul8263 commented on PR #20343:
URL: https://github.com/apache/flink/pull/20343#issuecomment-1369309368

   > @MartijnVisser it seems @paul8263 is not responding to the requested 
changes, we may create a similar PR in the following days for fixing this, does 
that sound good to you?
   
   Hi @jaumebecks ,
   Sorry for the late response. I will do it these days.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (FLINK-29634) Support periodic checkpoint triggering

2023-01-02 Thread Jiale Tan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29634?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17653712#comment-17653712
 ] 

Jiale Tan edited comment on FLINK-29634 at 1/3/23 12:50 AM:


I see. [~gyfora] Thanks for the input. I checked pom files again and your 
solution makes sense.

One more question - how can we deal with 
{{io.fabric8.kubernetes.client.server.mock.*}} classes? 
They are not a part of {{flink-kubernetes}}, but introduced 
[here|https://github.com/apache/flink-kubernetes-operator/blob/a1842d4c0170feb008293963ec51c0343f42771d/flink-kubernetes-standalone/pom.xml#L74-L78],
 so there is no shaded version of them.

I am running into issues 
[here|https://github.com/apache/flink-kubernetes-operator/blob/7ced741f51a99f2093ce8a45c8c92879a247f836/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/Fabric8FlinkStandaloneKubeClientTest.java#L58]:
 the code was expecting an 
{{org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.client.NamespacedKubernetesClient}}
 object for {{kubernetesClient}}, but {{mockServer.createClient()}} will create 
an {{io.fabric8.kubernetes.client.NamespacedKubernetesClient}}.


was (Author: JIRAUSER290356):
I see. [~gyfora] Thanks for the input. I checked pom files again and your 
solution makes sense.

One more question - how can we deal with 
{{io.fabric8.kubernetes.client.server.mock.*}} classes? 
They are not a part of {{flink-kubernetes}}, but introduced 
[here|https://github.com/apache/flink-kubernetes-operator/blob/a1842d4c0170feb008293963ec51c0343f42771d/flink-kubernetes-standalone/pom.xml#L74-L78],
 so there is no shaded version of them.

I am running into issues 
[here|https://github.com/apache/flink-kubernetes-operator/blob/7ced741f51a99f2093ce8a45c8c92879a247f836/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/Fabric8FlinkStandaloneKubeClientTest.java#L58],
 where we are expecting an 
{{org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.client.NamespacedKubernetesClient}}
 object for {{kubernetesClient}}, but {{mockServer.createClient()}} will create 
an {{io.fabric8.kubernetes.client.NamespacedKubernetesClient}}.

> Support periodic checkpoint triggering
> --
>
> Key: FLINK-29634
> URL: https://issues.apache.org/jira/browse/FLINK-29634
> Project: Flink
>  Issue Type: New Feature
>  Components: Kubernetes Operator
>Reporter: Thomas Weise
>Assignee: Jiale Tan
>Priority: Major
>
> Similar to the support for periodic savepoints, the operator should support 
> triggering periodic checkpoints to break the incremental checkpoint chain.
> Support for external triggering will come with 1.17: 
> https://issues.apache.org/jira/browse/FLINK-27101 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-29634) Support periodic checkpoint triggering

2023-01-02 Thread Jiale Tan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29634?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17653712#comment-17653712
 ] 

Jiale Tan edited comment on FLINK-29634 at 1/3/23 12:50 AM:


I see. [~gyfora] Thanks for the input. I checked pom files again and your 
solution makes sense.

One more question - how can we deal with 
{{io.fabric8.kubernetes.client.server.mock.*}} classes? 
They are not a part of {{flink-kubernetes}}, but introduced 
[here|https://github.com/apache/flink-kubernetes-operator/blob/a1842d4c0170feb008293963ec51c0343f42771d/flink-kubernetes-standalone/pom.xml#L74-L78],
 so there is no shaded version of them.

I am running into issues 
[here|https://github.com/apache/flink-kubernetes-operator/blob/7ced741f51a99f2093ce8a45c8c92879a247f836/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/Fabric8FlinkStandaloneKubeClientTest.java#L58],
 where we are expecting an 
{{org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.client.NamespacedKubernetesClient}}
 object for {{kubernetesClient}}, but {{mockServer.createClient()}} will create 
an {{io.fabric8.kubernetes.client.NamespacedKubernetesClient}}.


was (Author: JIRAUSER290356):
I see. [~gyfora] Thanks for the input. I checked pom files again and your 
solution makes sense.

One more question - how can we deal with 
{{io.fabric8.kubernetes.client.server.mock.*}} classes? 
They are not a part of {{{}flink-kubernetes{}}}, but introduced 
[here|https://github.com/apache/flink-kubernetes-operator/blob/a1842d4c0170feb008293963ec51c0343f42771d/flink-kubernetes-standalone/pom.xml#L74-L78],
 so there is no shaded version of them.

We will run into issues like 
[this|https://github.com/apache/flink-kubernetes-operator/blob/7ced741f51a99f2093ce8a45c8c92879a247f836/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/Fabric8FlinkStandaloneKubeClientTest.java#L58],
 where we are expecting an 
{{org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.client.NamespacedKubernetesClient}}
 object for {{kubernetesClient}}, but {{mockServer.createClient()}} will create 
an {{io.fabric8.kubernetes.client.NamespacedKubernetesClient}}.

> Support periodic checkpoint triggering
> --
>
> Key: FLINK-29634
> URL: https://issues.apache.org/jira/browse/FLINK-29634
> Project: Flink
>  Issue Type: New Feature
>  Components: Kubernetes Operator
>Reporter: Thomas Weise
>Assignee: Jiale Tan
>Priority: Major
>
> Similar to the support for periodic savepoints, the operator should support 
> triggering periodic checkpoints to break the incremental checkpoint chain.
> Support for external triggering will come with 1.17: 
> https://issues.apache.org/jira/browse/FLINK-27101 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-29634) Support periodic checkpoint triggering

2023-01-02 Thread Jiale Tan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29634?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17653712#comment-17653712
 ] 

Jiale Tan commented on FLINK-29634:
---

I see. [~gyfora] Thanks for the input. I checked pom files again and your 
solution makes sense.

One more question - how can we deal with 
{{io.fabric8.kubernetes.client.server.mock.*}} classes? 
They are not a part of {{{}flink-kubernetes{}}}, but introduced 
[here|https://github.com/apache/flink-kubernetes-operator/blob/a1842d4c0170feb008293963ec51c0343f42771d/flink-kubernetes-standalone/pom.xml#L74-L78],
 so there is no shaded version of them.

We will run into issues like 
[this|https://github.com/apache/flink-kubernetes-operator/blob/7ced741f51a99f2093ce8a45c8c92879a247f836/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/Fabric8FlinkStandaloneKubeClientTest.java#L58],
 where we are expecting an 
{{org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.client.NamespacedKubernetesClient}}
 object for {{kubernetesClient}}, but {{mockServer.createClient()}} will create 
an {{io.fabric8.kubernetes.client.NamespacedKubernetesClient}}.

> Support periodic checkpoint triggering
> --
>
> Key: FLINK-29634
> URL: https://issues.apache.org/jira/browse/FLINK-29634
> Project: Flink
>  Issue Type: New Feature
>  Components: Kubernetes Operator
>Reporter: Thomas Weise
>Assignee: Jiale Tan
>Priority: Major
>
> Similar to the support for periodic savepoints, the operator should support 
> triggering periodic checkpoints to break the incremental checkpoint chain.
> Support for external triggering will come with 1.17: 
> https://issues.apache.org/jira/browse/FLINK-27101 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] lindong28 commented on pull request #21579: [FLINK-30536][runtime] Remove CountingOutput from per-record code path for most operators

2023-01-02 Thread GitBox


lindong28 commented on PR #21579:
URL: https://github.com/apache/flink/pull/21579#issuecomment-1369291177

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (FLINK-30475) Improved speed of RocksDBMapState clear() using rocksDB.deleteRange

2023-01-02 Thread David Hrbacek (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30475?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17653585#comment-17653585
 ] 

David Hrbacek edited comment on FLINK-30475 at 1/2/23 10:35 PM:


Hello [~Yanfei Lei] [~masteryhx],

Sorry for the late response.

I do not expect that the performance results with {{deleteRange()}} have 
changed since FLINK-28010.

But let me describe our use-case scenario:
  * We write an extensive keyed map state,
  * where the state basically represents a queue of elements waiting for 
processing.
  * This state is periodically (e.g. after 3 days) cleared, with millions of 
items.

When {{MapState#clear()}} with an iterator is called, clearing the state lasts 
for hours(!), with occasional checkpoint failures and a big degradation of 
pipeline performance.
So the small performance drop mentioned in the 
[blog|https://rocksdb.org/blog/2018/11/21/delete-range.html] is not really a 
problem in comparison to {{clear()}} with scan and delete.
On the other hand, I admit that for most use cases the performance drop can be 
problematic.

As I understand the problem:
* {{deleteRange()}} is better for clearing map states with a large number of keys,
* whereas clearing via scan and delete is optimal for map states with a small 
number of keys.

This leads me to a solution proposal where the {{RocksDBMapState#clear()}} 
method implementation can be chosen via configuration.

I can imagine two ways to implement a configurable clear operation (see the 
sketch below):
* Switchable: e.g. a config {{state.backend.rocksdb.map-state.clear-op}} with 
default option {{scan-and-delete}} and optional {{delete-range}}.
* With threshold: e.g. a config 
{{state.backend.rocksdb.map-state.clear-with-delete-range-threshold}} where 
the user can configure the map-state size threshold from which 
{{delete-range}} will be used. The default value would be {{-1}}, which means 
never use {{delete-range}}.

What do you think?
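
For the threshold variant, a minimal sketch (illustrative only; the helpers and the way the key range is bounded are assumptions, not an actual patch):

{code:java}
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;

// Illustrative sketch: use one range tombstone for large map states and keep
// the scan-and-delete path for small ones. `begin`/`end` are assumed to bound
// all entries of the current key's map state.
final class RangeClearSketch {

    static void clear(RocksDB db, ColumnFamilyHandle cf,
                      byte[] begin, byte[] end, long threshold) throws RocksDBException {
        if (threshold >= 0 && hasAtLeast(db, cf, begin, end, threshold)) {
            db.deleteRange(cf, begin, end); // one range tombstone, resolved by compaction
        } else {
            try (RocksIterator it = db.newIterator(cf)) {
                for (it.seek(begin); it.isValid() && less(it.key(), end); it.next()) {
                    db.delete(cf, it.key()); // point deletes, cheap for few keys
                }
            }
        }
    }

    // Scan at most `threshold` entries to decide which path to take.
    private static boolean hasAtLeast(
            RocksDB db, ColumnFamilyHandle cf, byte[] begin, byte[] end, long threshold) {
        long seen = 0;
        try (RocksIterator it = db.newIterator(cf)) {
            for (it.seek(begin); it.isValid() && less(it.key(), end); it.next()) {
                if (++seen >= threshold) {
                    return true;
                }
            }
        }
        return false;
    }

    // Unsigned lexicographic comparison, matching RocksDB's default comparator.
    private static boolean less(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int d = (a[i] & 0xff) - (b[i] & 0xff);
            if (d != 0) {
                return d < 0;
            }
        }
        return a.length < b.length;
    }
}
{code}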
 


was (Author: JIRAUSER298597):
Hello [~Yanfei Lei] [~masteryhx],

Sorry for the late response.

I do not expect that the performance results with {{deleteRange()}} have 
changed since FLINK-28010.

But let me describe our use-case scenario:
  * We write an extensive keyed map state,
  * where the state basically represents a queue of elements waiting for 
processing.
  * This state is periodically (e.g. after 3 days) cleared, with millions of 
items.

When {{MapState#clear()}} with an iterator is called, clearing the state lasts 
for hours(!), with occasional checkpoint failures and a big degradation of 
pipeline performance.
So the small performance drop mentioned in the 
[blog|https://rocksdb.org/blog/2018/11/21/delete-range.html] is not really a 
problem in comparison to {{clear()}} with scan and delete.
On the other hand, I admit that for most use cases the performance drop can be 
problematic.

As I understand the problem:
* {{deleteRange()}} is better for clearing map states with a large number of keys,
* whereas clearing via scan and delete is optimal for map states with a small 
number of keys.

This leads me to a solution proposal where the {{RocksDBMapState#clear()}} 
method implementation can be chosen via configuration.

I can imagine two ways to implement a configurable clear operation:
* Switchable: e.g. a config {{state.backend.rocksdb.map-state.clear-op}} with 
default option {{scan-and-delete}} and optional {{delete-range}}.
* With threshold: e.g. a config 
{{state.backend.rocksdb.map-state.clear-with-delete-range-threshold}} where 
the user can configure the map-state size threshold from which 
{{delete-range} } will be used. The default value would be {{-1}}, which means 
never use {{delete-range}}.

What do you think?
 

> Improved speed of RocksDBMapState clear() using rocksDB.deleteRange
> ---
>
> Key: FLINK-30475
> URL: https://issues.apache.org/jira/browse/FLINK-30475
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Affects Versions: 1.16.0
>Reporter: David Hrbacek
>Priority: Major
>  Labels: pull-request-available
>
> Currently {{RocksDBMapState#clear()}} is processed via keyRange traversal, 
> inserting the particular keys into a BatchWrite for deletion.
> RocksDB offers a much faster way to delete a key range - {{deleteRange}}.
> This issue is a follow-up for 
> [FLINK-9070|https://issues.apache.org/jira/browse/FLINK-9070] where 
> {{deleteRange}} was also considered. But at that time it implied slower reads, 
> it was buggy, and it was not even available in the Java API of RocksDB. All of 
> these problems have since been solved (see also the RocksDB [blog article for 
> deleteRange|https://rocksdb.org/blog/2018/11/21/delete-range.html]).
> Delete range enables clearing {{RocksDBMapState}} for one key in constant 
> computational complexity, whereas the old 

[jira] [Updated] (FLINK-30541) Add Transformer and Estimator for OnlineStandardScaler

2023-01-02 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30541?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-30541:
---
Labels: pull-request-available  (was: )

> Add Transformer and Estimator for OnlineStandardScaler
> --
>
> Key: FLINK-30541
> URL: https://issues.apache.org/jira/browse/FLINK-30541
> Project: Flink
>  Issue Type: New Feature
>  Components: Library / Machine Learning
>Affects Versions: ml-2.2.0
>Reporter: Zhipeng Zhang
>Assignee: Zhipeng Zhang
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-ml] zhipeng93 opened a new pull request, #196: [FLINK-30541] Add Transformer and Estimator for OnlineStandardScaler

2023-01-02 Thread GitBox


zhipeng93 opened a new pull request, #196:
URL: https://github.com/apache/flink-ml/pull/196

   ## What is the purpose of the change
   - Add Transformer and Estimator for OnlineStandardScaler
   
   ## Brief change log
   - Added Transformer and Estimator for OnlineStandardScaler (java/python 
source/test/example and docs).
   - Added two metrics for exposing ML model attributes (i.e., `timestamp` and 
`version` of model data).
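   
   For illustration, exposing those attributes as gauges could look roughly like this (field and metric names are hypothetical, not necessarily the PR's actual code):
   
   ```java
   import org.apache.flink.metrics.Gauge;
   import org.apache.flink.metrics.MetricGroup;
   
   // Illustrative sketch: expose the served model's timestamp and version so
   // model freshness can be observed externally.
   class ModelMetrics {
       private volatile long currentModelTimestamp;
       private volatile long currentModelVersion;
   
       void register(MetricGroup metricGroup) {
           metricGroup.gauge("modelDataTimestamp", (Gauge<Long>) () -> currentModelTimestamp);
           metricGroup.gauge("modelDataVersion", (Gauge<Long>) () -> currentModelVersion);
       }
   }
   ```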
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes)
 - If yes, how is the feature documented? (docs)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (FLINK-30541) Add Transformer and Estimator for OnlineStandardScaler

2023-01-02 Thread Zhipeng Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30541?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhipeng Zhang reassigned FLINK-30541:
-

Assignee: Zhipeng Zhang

> Add Transformer and Estimator for OnlineStandardScaler
> --
>
> Key: FLINK-30541
> URL: https://issues.apache.org/jira/browse/FLINK-30541
> Project: Flink
>  Issue Type: New Feature
>  Components: Library / Machine Learning
>Affects Versions: ml-2.2.0
>Reporter: Zhipeng Zhang
>Assignee: Zhipeng Zhang
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-30541) Add Transformer and Estimator for OnlineStandardScaler

2023-01-02 Thread Zhipeng Zhang (Jira)
Zhipeng Zhang created FLINK-30541:
-

 Summary: Add Transformer and Estimator for OnlineStandardScaler
 Key: FLINK-30541
 URL: https://issues.apache.org/jira/browse/FLINK-30541
 Project: Flink
  Issue Type: New Feature
  Components: Library / Machine Learning
Affects Versions: ml-2.2.0
Reporter: Zhipeng Zhang






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] fsk119 closed pull request #19419: [FLINK-22317][table] Support DROP column/constraint/watermark for ALTER TABLE statement

2023-01-02 Thread GitBox


fsk119 closed pull request #19419: [FLINK-22317][table] Support DROP 
column/constraint/watermark for ALTER TABLE statement
URL: https://github.com/apache/flink/pull/19419


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] fsk119 commented on pull request #19419: [FLINK-22317][table] Support DROP column/constraint/watermark for ALTER TABLE statement

2023-01-02 Thread GitBox


fsk119 commented on PR #19419:
URL: https://github.com/apache/flink/pull/19419#issuecomment-1369055614

   Merged in #21571


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-30442) Update table walkthrough playground for 1.16

2023-01-02 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30442?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-30442:
---
Labels: pull-request-available  (was: )

> Update table walkthrough playground for 1.16
> 
>
> Key: FLINK-30442
> URL: https://issues.apache.org/jira/browse/FLINK-30442
> Project: Flink
>  Issue Type: Sub-task
>Reporter: David Anderson
>Assignee: Gunnar Morling
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-22317) Support DROP column/constraint/watermark for ALTER TABLE statement

2023-01-02 Thread Shengkai Fang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22317?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shengkai Fang closed FLINK-22317.
-
Resolution: Fixed

Merged into master: 
fb9354d8e1fc7bc17bc59b874089a457badacbe8
cc9540cacc2d775cc190ee6534794ec33fd07828

> Support DROP column/constraint/watermark for ALTER TABLE statement
> --
>
> Key: FLINK-22317
> URL: https://issues.apache.org/jira/browse/FLINK-22317
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: Jark Wu
>Assignee: Jane Chan
>Priority: Major
>  Labels: pull-request-available, stale-assigned
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-playgrounds] gunnarmorling opened a new pull request, #39: FLINK-30442: Upgrading table walk-through to Flink 1.16

2023-01-02 Thread GitBox


gunnarmorling opened a new pull request, #39:
URL: https://github.com/apache/flink-playgrounds/pull/39

   Hey @alpinegizmo, here's the second PR for the playground updates.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] fsk119 closed pull request #21571: [FLINK-22317][table] Support DROP column/constraint/watermark for ALTER TABLE statement

2023-01-02 Thread GitBox


fsk119 closed pull request #21571: [FLINK-22317][table] Support DROP 
column/constraint/watermark for ALTER TABLE statement
URL: https://github.com/apache/flink/pull/21571


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-playgrounds] gunnarmorling commented on pull request #37: [FLINK-30446][flink-playgrounds]bump mysql version to 8.0.31 for table-walkthrough supporting more architectures

2023-01-02 Thread GitBox


gunnarmorling commented on PR #37:
URL: https://github.com/apache/flink-playgrounds/pull/37#issuecomment-1369042907

   I can confirm that this change makes the table walkthrough work on 
macOS/Aarch64. // CC @alpinegizmo 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-30440) Update operations playground for 1.16

2023-01-02 Thread Gunnar Morling (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30440?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17653636#comment-17653636
 ] 

Gunnar Morling commented on FLINK-30440:


Ok, got it. I'm planning to send the PRs for the two other playgrounds in the 
course of this week.

> Update operations playground for 1.16
> -
>
> Key: FLINK-30440
> URL: https://issues.apache.org/jira/browse/FLINK-30440
> Project: Flink
>  Issue Type: Sub-task
>Reporter: David Anderson
>Assignee: Gunnar Morling
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.16.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-30343) Migrate KubernetesLeaderElectionAndRetrievalITCase

2023-01-02 Thread Wencong Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30343?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17653630#comment-17653630
 ] 

Wencong Liu commented on FLINK-30343:
-

[~mapohl] Thanks for your reply. I think that even though FLINK-26522 is 
currently in progress, the current changes to FLINK-30338 are still meaningful. 
After FLINK-30338 is completed, only a few minor changes may be needed. Old 
(legacy) test cases should not be needed, right? If you think it's OK, could 
you please assign the remaining subtasks to me?

> Migrate KubernetesLeaderElectionAndRetrievalITCase
> --
>
> Key: FLINK-30343
> URL: https://issues.apache.org/jira/browse/FLINK-30343
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Affects Versions: 1.16.0, 1.17.0
>Reporter: Matthias Pohl
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-30440) Update operations playground for 1.16

2023-01-02 Thread David Anderson (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30440?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17653629#comment-17653629
 ] 

David Anderson commented on FLINK-30440:


With these specific playground tasks, I usually merge each of them to master as 
soon as they are ready, but wait until all 3 sub-tasks are done before merging 
all of them to the release branch and then resolving all of the issues.

> Update operations playground for 1.16
> -
>
> Key: FLINK-30440
> URL: https://issues.apache.org/jira/browse/FLINK-30440
> Project: Flink
>  Issue Type: Sub-task
>Reporter: David Anderson
>Assignee: Gunnar Morling
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-30440) Update operations playground for 1.16

2023-01-02 Thread David Anderson (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30440?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Anderson updated FLINK-30440:
---
Fix Version/s: 1.16.0

> Update operations playground for 1.16
> -
>
> Key: FLINK-30440
> URL: https://issues.apache.org/jira/browse/FLINK-30440
> Project: Flink
>  Issue Type: Sub-task
>Reporter: David Anderson
>Assignee: Gunnar Morling
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.16.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-30440) Update operations playground for 1.16

2023-01-02 Thread Gunnar Morling (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30440?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17653614#comment-17653614
 ] 

Gunnar Morling commented on FLINK-30440:


[~danderson], thanks for merging that PR! What is the procedure in regards to 
Jira? It seems you'd have to set the Fix Version and transition this issue to 
Resolved, as I'm lacking the rights to do so myself. Thanks again.

> Update operations playground for 1.16
> -
>
> Key: FLINK-30440
> URL: https://issues.apache.org/jira/browse/FLINK-30440
> Project: Flink
>  Issue Type: Sub-task
>Reporter: David Anderson
>Assignee: Gunnar Morling
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-30537) Add support for OpenSearch 2.3

2023-01-02 Thread Martijn Visser (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30537?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17653613#comment-17653613
 ] 

Martijn Visser commented on FLINK-30537:


[~reta] I'll reply to the poster in Slack, thanks

> Add support for OpenSearch 2.3
> --
>
> Key: FLINK-30537
> URL: https://issues.apache.org/jira/browse/FLINK-30537
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Opensearch
>Reporter: Martijn Visser
>Priority: Major
>
> Create a version for Flink’s Opensearch connector that supports version 2.3.
> From the ASF Flink Slack: 
> https://apache-flink.slack.com/archives/C03GV7L3G2C/p1672339157102319



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-29363) Allow web ui to fully redirect to other page

2023-01-02 Thread Martijn Visser (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29363?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17653612#comment-17653612
 ] 

Martijn Visser commented on FLINK-29363:


[~rmetzger] Given that this feature is currently not documented, should we at 
least add something to the release notes? 

> Allow web ui to fully redirect to other page
> 
>
> Key: FLINK-29363
> URL: https://issues.apache.org/jira/browse/FLINK-29363
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Web Frontend
>Affects Versions: 1.15.3
>Reporter: Zhenqiu Huang
>Assignee: Zhenqiu Huang
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.17.0
>
>
> In a streaming platform, the web UI usually integrates with an internal 
> authentication and authorization system. When validation fails, the request 
> needs to be redirected to a landing page. This doesn't work for AJAX 
> requests, so it would be great to make the web UI configurable to allow a 
> full automatic redirect. 
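
A minimal sketch of the underlying problem, assuming a browser fetch() client; 
the 401 fallback and the landing-page handling are illustrative assumptions, 
not the actual Flink web UI code:

```typescript
// Browsers follow 302 responses inside fetch/XHR transparently: the caller
// receives the final response body (often the login page's HTML) instead of
// triggering a page navigation. A client-side workaround is to detect the
// silent redirect and force a full browser navigation to the landing page.
async function fetchWithAuthRedirect(url: string): Promise<Response> {
  const response = await fetch(url, { credentials: "include" });
  // `response.redirected` is true when the request was silently redirected,
  // e.g. by an auth proxy pointing the request at its landing page.
  if (response.redirected || response.status === 401) {
    // Hypothetical handling: navigate the whole page to wherever the
    // redirect ended up, instead of handing HTML to the AJAX caller.
    window.location.assign(response.url);
    throw new Error("Redirecting to authentication landing page");
  }
  return response;
}
```

Without handling like this (or a server-side config option), the AJAX caller 
cannot distinguish the landing page's HTML from a real API response, which is 
the behavior the ticket describes.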



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] flinkbot commented on pull request #21585: [BP-1.15][FLINK-30539][tests] Removes timeout from YARNSessionCapacitySchedulerITCase

2023-01-02 Thread GitBox


flinkbot commented on PR #21585:
URL: https://github.com/apache/flink/pull/21585#issuecomment-1368970990

   
   ## CI report:
   
   * cd690a9492a6185b7351b724a22962c9ee05f0e7 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] flinkbot commented on pull request #21584: [BP-1.16][FLINK-30539][tests] Removes timeout from YARNSessionCapacitySchedulerITCase

2023-01-02 Thread GitBox


flinkbot commented on PR #21584:
URL: https://github.com/apache/flink/pull/21584#issuecomment-1368970897

   
   ## CI report:
   
   * 3e74c2e251ae727bfd0e61d0a98a650036922fba UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


