Re: [PR] [FLINK-32650][protobuf]Added the ability to split flink-protobuf code… [flink]

2023-11-12 Thread via GitHub


ljw-hit commented on code in PR #23162:
URL: https://github.com/apache/flink/pull/23162#discussion_r1390343343


##
flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/BigPbRowToProtoTest.java:
##
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.protobuf;
+
+import org.apache.flink.formats.protobuf.serialize.PbRowDataSerializationSchema;
+import org.apache.flink.formats.protobuf.testproto.BigPbClass;
+import org.apache.flink.formats.protobuf.util.PbToRowTypeUtil;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.types.logical.RowType;
+
+import com.google.protobuf.ByteString;
+import org.junit.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Test class for below case
+ *
+ * 
+ * syntax = "proto3";
+ * package org.apache.flink.formats.protobuf.testproto;
+ * option java_package = "org.apache.flink.formats.protobuf.testproto";
+ * option java_outer_classname = "BigPbClass";
+ * import "google/protobuf/descriptor.proto";
+ * message BigPbMessage {
+ * 
+ *
+ * It is valid proto definition.
+ */
+public class BigPbRowToProtoTest {
+
+@Test
+public void testSimple() throws Exception {
+GenericRowData rowData = new GenericRowData(34);
+rowData.setField(7, 20);
+rowData.setField(8, StringData.fromString("test1"));
+rowData.setField(9, false);
+rowData.setField(10, 1F);
+rowData.setField(11, 2D);
+rowData.setField(12, new byte[] {1, 2, 3});

Review Comment:
   Hmm, thank you for your suggestion, I will fix it.



##
flink-formats/flink-protobuf/src/test/proto/test_big_pb.proto:
##
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+syntax = "proto3";
+
+package org.apache.flink.formats.protobuf.testproto;
+
+option java_package = "org.apache.flink.formats.protobuf.testproto";
+option java_outer_classname = "BigPbClass";
+
+message BigPbMessage {
+  repeated string field = 1;
+  int32 int_field = 2;
+  float float_field = 3;
+  double double_field = 4;
+  bool bool_field = 5;
+  string string_field = 6;
+  bytes bytes_field = 7;
+  int32 a_field_1 = 8;
+  string a_field_2 = 9 ;
+  bool a_field_3 = 10;
+  float b_field_1 = 11;
+  double b_field_2 = 12;
+  bytes b_field_3 = 13;
+  int64 c_field_1 = 14;
+  uint32 c_field_2 = 15;
+  uint64 c_field_3 = 16;
+  int32 e_field_1 = 17;
+  float e_field_2 = 18;
+  string e_field_3 = 19;
+  bool e_field_4 = 20;
+  bytes e_field_5 = 21;
+  double f_field_1 = 22;
+  uint32 f_field_2 = 23;
+  uint64 f_field_3 = 24;
+  fixed32 f_field_4 = 25;
+  fixed64 f_field_5 = 26;
+  sfixed32 f_field_6 = 27;
+  sfixed64 f_field_7 = 28;
+  float f_field_8 = 29;
+  bool f_field_9 = 30;
+  string f_field_10 = 31;
+  bytes f_field_11 = 32;

Review Comment:
   resolved



##
flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/PbCodegenDeserializer.java:
##
@@ -28,12 +29,19 @@
  * 
  */
 public interface PbCodegenDeserializer {
+
 /**
  * @param resultVar the final var name that is calculated by codegen. This 
var name will be used
- * by outsider codegen environment. {@code resultVariable} s

[jira] [Created] (FLINK-33526) Improve default autoscaler configs

2023-11-12 Thread Gyula Fora (Jira)
Gyula Fora created FLINK-33526:
--

 Summary: Improve default autoscaler configs
 Key: FLINK-33526
 URL: https://issues.apache.org/jira/browse/FLINK-33526
 Project: Flink
  Issue Type: Improvement
  Components: Autoscaler, Kubernetes Operator
Reporter: Gyula Fora
Assignee: Gyula Fora


There are a few config defaults that should be improved based on prod usage:
 * Metric window : 10 -> 15m
 * Catch up duration: 15 -> 30m
 * Restart time: 3 -> 5m
 * Utilisation boundary: 0.4 -> 0.3

These configs help make the default autoscaler behaviour smoother and less 
aggressive.
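
To make the proposed change easier to compare, here is a minimal Java sketch that lists the old and new defaults side by side. The class and field names are hypothetical and not taken from the operator code base, and the old values are assumed to be in minutes, matching the "m" suffix on the new ones.

```java
import java.time.Duration;

public class AutoscalerDefaultsSketch {

    /** Hypothetical value holder; not a class from the operator code base. */
    static final class Defaults {
        final Duration metricWindow;
        final Duration catchUpDuration;
        final Duration restartTime;
        final double utilizationBoundary;

        Defaults(Duration metricWindow, Duration catchUpDuration,
                 Duration restartTime, double utilizationBoundary) {
            this.metricWindow = metricWindow;
            this.catchUpDuration = catchUpDuration;
            this.restartTime = restartTime;
            this.utilizationBoundary = utilizationBoundary;
        }
    }

    // Defaults before the change (assumed to be minutes).
    static final Defaults OLD = new Defaults(
            Duration.ofMinutes(10), Duration.ofMinutes(15), Duration.ofMinutes(3), 0.4);

    // Defaults proposed in the ticket.
    static final Defaults NEW = new Defaults(
            Duration.ofMinutes(15), Duration.ofMinutes(30), Duration.ofMinutes(5), 0.3);

    public static void main(String[] args) {
        System.out.println("Metric window:        " + OLD.metricWindow + " -> " + NEW.metricWindow);
        System.out.println("Catch up duration:    " + OLD.catchUpDuration + " -> " + NEW.catchUpDuration);
        System.out.println("Restart time:         " + OLD.restartTime + " -> " + NEW.restartTime);
        System.out.println("Utilisation boundary: " + OLD.utilizationBoundary + " -> " + NEW.utilizationBoundary);
    }
}
```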



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33527) Clear all physical states after autoscaler is disabled

2023-11-12 Thread Rui Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33527?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rui Fan updated FLINK-33527:

Summary: Clear all physical states after autoscaler is disabled  (was: 
Clear all pysical states after autoscaler is disabled)

> Clear all physical states after autoscaler is disabled
> --
>
> Key: FLINK-33527
> URL: https://issues.apache.org/jira/browse/FLINK-33527
> Project: Flink
>  Issue Type: Improvement
>  Components: Autoscaler
>Reporter: Rui Fan
>Assignee: Rui Fan
>Priority: Major
> Fix For: kubernetes-operator-1.7.0
>
>
> Currently, we only clear ParallelismOverrides after the autoscaler is disabled.
> We should clear CollectedMetrics and ScalingHistory as well:
>  * CollectedMetrics can be cleared directly.
>  * ScalingHistory can be cleared based on the trim logic (VERTEX_SCALING_HISTORY_AGE)





[jira] [Created] (FLINK-33527) Clear all pysical states after autoscaler is disabled

2023-11-12 Thread Rui Fan (Jira)
Rui Fan created FLINK-33527:
---

 Summary: Clear all pysical states after autoscaler is disabled
 Key: FLINK-33527
 URL: https://issues.apache.org/jira/browse/FLINK-33527
 Project: Flink
  Issue Type: Improvement
  Components: Autoscaler
Reporter: Rui Fan
Assignee: Rui Fan
 Fix For: kubernetes-operator-1.7.0


Currently, we only clear ParallelismOverrides after the autoscaler is disabled.

We should clear CollectedMetrics and ScalingHistory as well:
 * CollectedMetrics can be cleared directly.
 * ScalingHistory can be cleared based on the trim logic (VERTEX_SCALING_HISTORY_AGE)
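
The age-based trim mentioned for ScalingHistory can be sketched roughly as below. This is only an illustration under assumptions: `HISTORY_AGE` is a hypothetical stand-in for VERTEX_SCALING_HISTORY_AGE with an assumed value, the history is modelled as a plain sorted map keyed by timestamp, and the operator's actual data structures and trim code may differ.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.SortedMap;
import java.util.TreeMap;

public class ScalingHistoryTrimSketch {

    // Hypothetical stand-in for VERTEX_SCALING_HISTORY_AGE; the value is an assumption.
    static final Duration HISTORY_AGE = Duration.ofHours(24);

    /** Returns a copy of the history that keeps only entries no older than maxAge. */
    static <V> SortedMap<Instant, V> trim(SortedMap<Instant, V> history, Instant now, Duration maxAge) {
        Instant cutoff = now.minus(maxAge);
        // tailMap includes the cutoff key itself; copy so the result is independent of the input
        return new TreeMap<>(history.tailMap(cutoff));
    }

    public static void main(String[] args) {
        Instant now = Instant.parse("2023-11-12T12:00:00Z");
        SortedMap<Instant, Integer> history = new TreeMap<>();
        history.put(now.minus(Duration.ofHours(30)), 2); // older than HISTORY_AGE, dropped
        history.put(now.minus(Duration.ofHours(1)), 4);  // recent, kept
        System.out.println(trim(history, now, HISTORY_AGE).size()); // prints 1
    }
}
```

CollectedMetrics needs no such logic in this model, since the description says it can be cleared directly.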





[jira] [Updated] (FLINK-33527) Clear all physical states after autoscaler is disabled

2023-11-12 Thread Rui Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33527?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rui Fan updated FLINK-33527:

Description: 
Currently, we only clear ParallelismOverrides after the autoscaler is disabled.

We should clear CollectedMetrics and ScalingHistory as well to prevent a state leak.
 * CollectedMetrics can be cleared directly.
 * ScalingHistory can be cleared based on the trim logic (VERTEX_SCALING_HISTORY_AGE)

  was:
Currently, we only clear ParallelismOverrides after the autoscaler is disabled.

We should clear CollectedMetrics and ScalingHistory as well:
 * CollectedMetrics can be cleared directly.
 * ScalingHistory can be cleared based on the trim logic (VERTEX_SCALING_HISTORY_AGE)


> Clear all physical states after autoscaler is disabled
> --
>
> Key: FLINK-33527
> URL: https://issues.apache.org/jira/browse/FLINK-33527
> Project: Flink
>  Issue Type: Improvement
>  Components: Autoscaler
>Reporter: Rui Fan
>Assignee: Rui Fan
>Priority: Major
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.7.0
>
>
> Currently, we only clear ParallelismOverrides after the autoscaler is disabled.
> We should clear CollectedMetrics and ScalingHistory as well to prevent a state leak.
>  * CollectedMetrics can be cleared directly.
>  * ScalingHistory can be cleared based on the trim logic (VERTEX_SCALING_HISTORY_AGE)





[PR] [FLINK-33527][autoscaler] Clear all physical states after autoscaler is disabled [flink-kubernetes-operator]

2023-11-12 Thread via GitHub


1996fanrui opened a new pull request, #707:
URL: https://github.com/apache/flink-kubernetes-operator/pull/707

   ## What is the purpose of the change
   
   Currently, we only clear ParallelismOverrides after the autoscaler is disabled.
   
   We should clear CollectedMetrics and ScalingHistory as well to prevent a state leak.
   
   - CollectedMetrics can be cleared directly.
   - ScalingHistory can be cleared based on the trim logic (VERTEX_SCALING_HISTORY_AGE)
   
   
   ## Brief change log
   
   [FLINK-33527][autoscaler] Clear all physical states after autoscaler is 
disabled
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
 - *JobAutoScalerImplTest#testAutoscalerDisabled*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changes to the `CustomResourceDescriptors`: 
 no
 - Core observer or reconciler logic that is regularly executed: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-33527) Clear all physical states after autoscaler is disabled

2023-11-12 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33527?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-33527:
---
Labels: pull-request-available  (was: )

> Clear all physical states after autoscaler is disabled
> --
>
> Key: FLINK-33527
> URL: https://issues.apache.org/jira/browse/FLINK-33527
> Project: Flink
>  Issue Type: Improvement
>  Components: Autoscaler
>Reporter: Rui Fan
>Assignee: Rui Fan
>Priority: Major
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.7.0
>
>
> Currently, we only clear ParallelismOverrides after the autoscaler is disabled.
> We should clear CollectedMetrics and ScalingHistory as well:
>  * CollectedMetrics can be cleared directly.
>  * ScalingHistory can be cleared based on the trim logic (VERTEX_SCALING_HISTORY_AGE)





Re: [PR] [FLINK-33331] Update netty to 4.1.100 and arrow to 13.0.0 to make flink-python passing on java 21 [flink]

2023-11-12 Thread via GitHub


snuyanzin commented on PR #23664:
URL: https://github.com/apache/flink/pull/23664#issuecomment-1807143508

   Thanks for taking a look





Re: [PR] [FLINK-33331] Update netty to 4.1.100 and arrow to 13.0.0 to make flink-python passing on java 21 [flink]

2023-11-12 Thread via GitHub


snuyanzin merged PR #23664:
URL: https://github.com/apache/flink/pull/23664





[jira] [Updated] (FLINK-33526) Improve default autoscaler configs

2023-11-12 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33526?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-33526:
---
Labels: pull-request-available  (was: )

> Improve default autoscaler configs
> --
>
> Key: FLINK-33526
> URL: https://issues.apache.org/jira/browse/FLINK-33526
> Project: Flink
>  Issue Type: Improvement
>  Components: Autoscaler, Kubernetes Operator
>Reporter: Gyula Fora
>Assignee: Gyula Fora
>Priority: Major
>  Labels: pull-request-available
>
> There are a few config defaults that should be improved based on prod usage:
>  * Metric window : 10 -> 15m
>  * Catch up duration: 15 -> 30m
>  * Restart time: 3 -> 5m
>  * Utilisation boundary: 0.4 -> 0.3
> These configs help make the default autoscaler behaviour smoother and less 
> aggressive.





[PR] [FLINK-33526] Autoscaler config improvement + cleanup [flink-kubernetes-operator]

2023-11-12 Thread via GitHub


gyfora opened a new pull request, #708:
URL: https://github.com/apache/flink-kubernetes-operator/pull/708

   ## What is the purpose of the change
   
   There are a few config defaults that should be improved based on prod usage:
- Metric window : 10 -> 15m
- Catch up duration: 15 -> 30m
- Restart time: 3 -> 5m
- Utilisation boundary: 0.4 -> 0.3
   
   These configs help make the default autoscaler behaviour smoother and less 
aggressive.
   
   ## Brief change log
   
 - *Config improvements*
 - *Javadoc method naming cleanup*
 - *Support autoscaler for session jobs*
   
   ## Verifying this change
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changes to the `CustomResourceDescriptors`: 
no
 - Core observer or reconciler logic that is regularly executed: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
   





Re: [PR] [FLINK-33526] Autoscaler config improvement + cleanup [flink-kubernetes-operator]

2023-11-12 Thread via GitHub


1996fanrui commented on code in PR #708:
URL: https://github.com/apache/flink-kubernetes-operator/pull/708#discussion_r1390425243


##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java:
##
@@ -190,7 +189,8 @@ void registerSessionJobController() {
 var metricManager =
 MetricManager.createFlinkSessionJobMetricManager(baseConfig, metricGroup);
 var statusRecorder = StatusRecorder.create(client, metricManager, listeners);
-var reconciler = new SessionJobReconciler(eventRecorder, statusRecorder);
+var autoscaler = AutoscalerFactory.create(client, eventRecorder);
+var reconciler = new SessionJobReconciler(eventRecorder, statusRecorder, autoscaler);

Review Comment:
   Session jobs didn't use the autoscaler before, and they will use it after this PR, right?






[jira] [Created] (FLINK-33528) Externalize Python connector code

2023-11-12 Thread Jira
Márton Balassi created FLINK-33528:
--

 Summary: Externalize Python connector code
 Key: FLINK-33528
 URL: https://issues.apache.org/jira/browse/FLINK-33528
 Project: Flink
  Issue Type: Technical Debt
  Components: API / Python, Connectors / Common
Affects Versions: 1.18.0
Reporter: Márton Balassi
 Fix For: 1.19.0


During the connector externalization effort, the end-to-end tests for the Python 
connectors were left in the main repository under:

[https://github.com/apache/flink/tree/master/flink-python/pyflink/datastream/connectors]

These include both the Python connector implementations and their tests. 
Currently they depend on a previously released version of the underlying 
connectors; otherwise they would introduce a circular dependency, given that 
they live in the flink repo at the moment.

This setup prevents us from propagating any breaking change to the 
PublicEvolving and Internal APIs used by the connectors, because such a change 
breaks the Python e2e tests. We ran into this while implementing FLINK-25857.





Re: [PR] [FLINK-25857] Add committer metrics to track the status of committables [flink]

2023-11-12 Thread via GitHub


mbalassi merged PR #23555:
URL: https://github.com/apache/flink/pull/23555





[jira] [Closed] (FLINK-25857) Add committer metrics to track the status of committables

2023-11-12 Thread Jira


 [ 
https://issues.apache.org/jira/browse/FLINK-25857?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Márton Balassi closed FLINK-25857.
--
Fix Version/s: 1.19.0
   Resolution: Implemented

[{{c3a07f9}}|https://github.com/apache/flink/commit/c3a07f98e5d1d7624adc967932f57d31355d9ddd]
 in master

> Add committer metrics to track the status of committables
> -
>
> Key: FLINK-25857
> URL: https://issues.apache.org/jira/browse/FLINK-25857
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Common
>Reporter: Fabian Paul
>Assignee: Peter Vary
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.19.0
>
> Attachments: image-2023-10-20-17-23-09-595.png, screenshot-1.png
>
>
> With Sink V2 we can now track the progress of a committable during committing 
> and show metrics about the committing status (i.e. failed, retried, 
> succeeded).





Re: [PR] [FLINK-33213][table] Flink SQL calculate SqlMonotonicity for Calc. [flink]

2023-11-12 Thread via GitHub


snuyanzin merged PR #23516:
URL: https://github.com/apache/flink/pull/23516





[jira] [Closed] (FLINK-33213) Flink SQL MinMax aggregations without retract messages when `where` condition exist

2023-11-12 Thread Sergey Nuyanzin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33213?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergey Nuyanzin closed FLINK-33213.
---
Resolution: Fixed

> Flink SQL MinMax aggregations without retract messages when `where` condition 
> exist
> ---
>
> Key: FLINK-33213
> URL: https://issues.apache.org/jira/browse/FLINK-33213
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.12.7, 1.13.6, 1.14.6, 1.15.4, 1.16.2, 1.17.1
>Reporter: Sergey Paryshev
>Assignee: Sergey Paryshev
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.19.0
>
>
> Currently Flink does not take the `where` condition into account when 
> calculating SqlMonotonicity for Calc/FlinkLogicalCalc/StreamPhysicalCalc.
>
> To reproduce the bug (put into 
> org.apache.flink.table.planner.runtime.stream.sql.AggregateITCase):
> {code:java}
> @Test
> def testMaxRetract(): Unit = {
>   env.setParallelism(1)
>   val data = new mutable.MutableList[(Int, Int)]
>   data.+=((1, 10))
>   data.+=((1, 10))
>   data.+=((2, 5))
>   data.+=((1, 10))
>   val t = failingDataSource(data).toTable(tEnv, 'id, 'price)
>   tEnv.createTemporaryView("T", t)
>   val sql =
> """
>   |SELECT MAX(price) FROM(
>   |   SELECT id, count(*) as c, price FROM T GROUP BY id, price)
>   |WHERE c > 0 and c < 3""".stripMargin
>   val sink = new TestingRetractSink
>   tEnv.sqlQuery(sql).toRetractStream[Row].addSink(sink)
>   env.execute()
>   val expected = List("5")
>   assertEquals(expected.sorted, sink.getRetractResults.sorted)
> } {code}
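
Why the reproducer expects "5" can be traced with a small standalone simulation. The sketch below is a simplified model in plain Java, not the planner's logic: the class and method names are hypothetical, and it only contrasts a MAX that ignores retractions with the result of evaluating the query over the final group counts.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.OptionalInt;

public class FilteredMaxRetractSketch {

    /** MAX(price) over groups (id, price) whose final count c satisfies 0 < c < 3. */
    static OptionalInt correctMax(int[][] rows) {
        Map<List<Integer>, Integer> counts = new HashMap<>();
        for (int[] r : rows) {
            counts.merge(List.of(r[0], r[1]), 1, Integer::sum);
        }
        return counts.entrySet().stream()
                .filter(e -> e.getValue() > 0 && e.getValue() < 3)
                .mapToInt(e -> e.getKey().get(1))
                .max();
    }

    /** Streaming MAX that never handles retractions: a value that was seen once stays. */
    static OptionalInt maxWithoutRetract(int[][] rows) {
        Map<List<Integer>, Integer> counts = new HashMap<>();
        int max = Integer.MIN_VALUE;
        boolean seen = false;
        for (int[] r : rows) {
            int c = counts.merge(List.of(r[0], r[1]), 1, Integer::sum);
            if (c > 0 && c < 3) {
                max = Math.max(max, r[1]);
                seen = true;
            }
            // When c reaches 3 the group no longer passes the filter,
            // but its earlier contribution to max is never withdrawn here.
        }
        return seen ? OptionalInt.of(max) : OptionalInt.empty();
    }

    public static void main(String[] args) {
        int[][] rows = {{1, 10}, {1, 10}, {2, 5}, {1, 10}}; // same data as the reproducer
        System.out.println("with retraction:    " + correctMax(rows).getAsInt());        // 5
        System.out.println("without retraction: " + maxWithoutRetract(rows).getAsInt()); // 10
    }
}
```

The group (1, 10) passes the filter while its count is 1 or 2 and drops out at 3, so its price has to be retracted from the MAX; only the group (2, 5) remains.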





[jira] [Comment Edited] (FLINK-33213) Flink SQL MinMax aggregations without retract messages when `where` condition exist

2023-11-12 Thread Sergey Nuyanzin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33213?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17785297#comment-17785297
 ] 

Sergey Nuyanzin edited comment on FLINK-33213 at 11/12/23 5:08 PM:
---

Merged as 
[29e49482a82e8c1cd404e42c3aae0944188d956e|https://github.com/apache/flink/commit/29e49482a82e8c1cd404e42c3aae0944188d956e]


was (Author: sergey nuyanzin):
Merged as 
[29e49482a82e8c1cd404e42c3aae0944188d956e|29e49482a82e8c1cd404e42c3aae0944188d956e]






[jira] [Commented] (FLINK-33213) Flink SQL MinMax aggregations without retract messages when `where` condition exist

2023-11-12 Thread Sergey Nuyanzin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33213?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17785297#comment-17785297
 ] 

Sergey Nuyanzin commented on FLINK-33213:
-

Merged as 
[29e49482a82e8c1cd404e42c3aae0944188d956e|29e49482a82e8c1cd404e42c3aae0944188d956e]






[jira] [Commented] (FLINK-33331) Upgrade netty to 4.1.93+

2023-11-12 Thread Sergey Nuyanzin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33331?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17785298#comment-17785298
 ] 

Sergey Nuyanzin commented on FLINK-33331:
-

Merged to flink main repo as 
[c488adfbcdca30320b8721c028f9289f1563ae05|https://github.com/apache/flink/commit/c488adfbcdca30320b8721c028f9289f1563ae05]

> Upgrade netty to 4.1.93+
> 
>
> Key: FLINK-33331
> URL: https://issues.apache.org/jira/browse/FLINK-33331
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / Python
>Reporter: Sergey Nuyanzin
>Assignee: Martijn Visser
>Priority: Major
>  Labels: pull-request-available
> Fix For: shaded-18.0
>
>
> A number of tests fail, e.g. for 
> {{org.apache.flink.table.runtime.operators.python.scalar.arrow.ArrowPythonScalarFunctionOpe}}
> {noformat}
> [ERROR] ratorTest.testFinishBundleTriggeredByTime  Time elapsed: 0.031 s  <<< 
> ERROR!
> java.lang.NoClassDefFoundError: Could not initialize class 
> org.apache.flink.table.runtime.arrow.serializers.ArrowSerializer
>   at 
> org.apache.flink.table.runtime.operators.python.scalar.arrow.ArrowPythonScalarFunctionOperator.open(ArrowPythonScalarFunctionOperator.java:72)
>   at 
> org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.open(AbstractStreamOperatorTestHarness.java:681)
>   at 
> org.apache.flink.table.runtime.operators.python.scalar.PythonScalarFunctionOperatorTestBase.testFinishBundleTriggeredByTime(PythonScalarFunctionOperatorTestBase.java:156)
>   at 
> java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
> ...
> Caused by: java.lang.ExceptionInInitializerError: Exception 
> java.lang.RuntimeException: Arrow depends on DirectByteBuffer.<init>(long, 
> int) which is not available. Please set the system property 
> 'io.netty.tryReflectionSetAccessible' to 'true'. [in thread 
> "ForkJoinPool-3-worker-1"]
>     at 
> org.apache.flink.table.runtime.arrow.ArrowUtils.checkArrowUsable(ArrowUtils.java:184)
>     at 
> org.apache.flink.table.runtime.arrow.serializers.ArrowSerializer.<clinit>(ArrowSerializer.java:44)
>     at 
> org.apache.flink.table.runtime.utils.PassThroughPythonAggregateFunctionRunner.<init>(PassThroughPythonAggregateFunctionRunner.java:96)
>     at 
> org.apache.flink.table.runtime.operators.python.aggregate.arrow.batch.BatchArrowPythonGroupWindowAggregateFunctionOperatorTest$PassThroughBatchArrowPythonGroupWindowAggregateFunctionOperator.createPythonFunctionRunner(BatchArrowPythonGroupWindowAggregateFunctionOperatorTest.java:414)
>     at 
> org.apache.flink.streaming.api.operators.python.process.AbstractExternalPythonFunctionOperator.open(AbstractExternalPythonFunctionOperator.java:56)
>     at 
> org.apache.flink.table.runtime.operators.python.AbstractStatelessFunctionOperator.open(AbstractStatelessFunctionOperator.java:92)
>     at 
> org.apache.flink.table.runtime.operators.python.aggregate.arrow.AbstractArrowPythonAggregateFunctionOperator.open(AbstractArrowPythonAggregateFunctionOperator.java:89)
>     at 
> org.apache.flink.table.runtime.operators.python.aggregate.arrow.batch.AbstractBatchArrowPythonAggregateFunctionOperator.open(AbstractBatchArrowPythonAggregateFunctionOperator.java:82)
>     at 
> org.apache.flink.table.runtime.operators.python.aggregate.arrow.batch.BatchArrowPythonGroupWindowAggregateFunctionOperator.open(BatchArrowPythonGroupWindowAggregateFunctionOperator.java:119)
>     at 
> org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.open(AbstractStreamOperatorTestHarness.java:681)
>     at 
> org.apache.flink.table.runtime.operators.python.aggregate.arrow.batch.BatchArrowPythonGroupWindowAggregateFunctionOperatorTest.testFinishBundleTriggeredByCount(BatchArrowPythonGroupWindowAggregateFunctionOperatorTest.java:140)
>     ... 57 more
> {noformat}
> UPDATE
> The reason is that this constructor was removed in JDK 21 as part of 
> https://bugs.openjdk.org/browse/JDK-8303083
> and the corresponding changes in Netty were made in 
> https://github.com/netty/netty/pull/13366 which is part of 4.1.93.Final.
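
Whether a given JDK still declares the constructor named in the error message can be probed with plain reflection. The sketch below is a diagnostic only, with a hypothetical class name; it does not reproduce what Netty or Arrow do internally, and it only looks the constructor up without trying to make it accessible.

```java
public class DirectByteBufferCtorCheck {

    /** Returns true if java.nio.DirectByteBuffer declares a constructor with these parameter types. */
    static boolean hasConstructor(Class<?>... paramTypes) {
        try {
            Class<?> cls = Class.forName("java.nio.DirectByteBuffer");
            // Lookup only; no setAccessible call, so no --add-opens flag is needed.
            cls.getDeclaredConstructor(paramTypes);
            return true;
        } catch (ClassNotFoundException | NoSuchMethodException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("java.version = " + System.getProperty("java.version"));
        System.out.println("DirectByteBuffer(long, int) declared: "
                + hasConstructor(long.class, int.class));
    }
}
```

Running it on the JDK used for the failing tests shows whether the `(long, int)` constructor is declared there; the result depends on the JDK version.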





[jira] [Updated] (FLINK-33331) Upgrade netty to 4.1.93+

2023-11-12 Thread Sergey Nuyanzin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33331?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergey Nuyanzin updated FLINK-33331:

Fix Version/s: 1.19.0

> Upgrade netty to 4.1.93+
> 
>
> Key: FLINK-33331
> URL: https://issues.apache.org/jira/browse/FLINK-33331
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / Python
>Reporter: Sergey Nuyanzin
>Assignee: Martijn Visser
>Priority: Major
>  Labels: pull-request-available
> Fix For: shaded-18.0, 1.19.0
>
>
> A number of tests fails like e.g.for 
> {{org.apache.flink.table.runtime.operators.python.scalar.arrow.ArrowPythonScalarFunctionOpe}}
> {noformat}
> [ERROR] ratorTest.testFinishBundleTriggeredByTime  Time elapsed: 0.031 s  <<< 
> ERROR!
> java.lang.NoClassDefFoundError: Could not initialize class org.apache.flink.table.runtime.arrow.serializers.ArrowSerializer
>   at org.apache.flink.table.runtime.operators.python.scalar.arrow.ArrowPythonScalarFunctionOperator.open(ArrowPythonScalarFunctionOperator.java:72)
>   at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.open(AbstractStreamOperatorTestHarness.java:681)
>   at org.apache.flink.table.runtime.operators.python.scalar.PythonScalarFunctionOperatorTestBase.testFinishBundleTriggeredByTime(PythonScalarFunctionOperatorTestBase.java:156)
>   at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
> ...
> Caused by: java.lang.ExceptionInInitializerError: Exception java.lang.RuntimeException: Arrow depends on DirectByteBuffer.<init>(long, int) which is not available. Please set the system property 'io.netty.tryReflectionSetAccessible' to 'true'. [in thread "ForkJoinPool-3-worker-1"]
>     at org.apache.flink.table.runtime.arrow.ArrowUtils.checkArrowUsable(ArrowUtils.java:184)
>     at org.apache.flink.table.runtime.arrow.serializers.ArrowSerializer.(ArrowSerializer.java:44)
>     at org.apache.flink.table.runtime.utils.PassThroughPythonAggregateFunctionRunner.(PassThroughPythonAggregateFunctionRunner.java:96)
>     at org.apache.flink.table.runtime.operators.python.aggregate.arrow.batch.BatchArrowPythonGroupWindowAggregateFunctionOperatorTest$PassThroughBatchArrowPythonGroupWindowAggregateFunctionOperator.createPythonFunctionRunner(BatchArrowPythonGroupWindowAggregateFunctionOperatorTest.java:414)
>     at org.apache.flink.streaming.api.operators.python.process.AbstractExternalPythonFunctionOperator.open(AbstractExternalPythonFunctionOperator.java:56)
>     at org.apache.flink.table.runtime.operators.python.AbstractStatelessFunctionOperator.open(AbstractStatelessFunctionOperator.java:92)
>     at org.apache.flink.table.runtime.operators.python.aggregate.arrow.AbstractArrowPythonAggregateFunctionOperator.open(AbstractArrowPythonAggregateFunctionOperator.java:89)
>     at org.apache.flink.table.runtime.operators.python.aggregate.arrow.batch.AbstractBatchArrowPythonAggregateFunctionOperator.open(AbstractBatchArrowPythonAggregateFunctionOperator.java:82)
>     at org.apache.flink.table.runtime.operators.python.aggregate.arrow.batch.BatchArrowPythonGroupWindowAggregateFunctionOperator.open(BatchArrowPythonGroupWindowAggregateFunctionOperator.java:119)
>     at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.open(AbstractStreamOperatorTestHarness.java:681)
>     at org.apache.flink.table.runtime.operators.python.aggregate.arrow.batch.BatchArrowPythonGroupWindowAggregateFunctionOperatorTest.testFinishBundleTriggeredByCount(BatchArrowPythonGroupWindowAggregateFunctionOperatorTest.java:140)
>     ... 57 more
> {noformat}
> UPDATE
> The reason is that this constructor was removed in JDK 21 (see 
> https://bugs.openjdk.org/browse/JDK-8303083). The corresponding change in 
> Netty was made in https://github.com/netty/netty/pull/13366, which is part of 
> 4.1.93.Final.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-33331) Upgrade netty to 4.1.93+

2023-11-12 Thread Sergey Nuyanzin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33331?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergey Nuyanzin resolved FLINK-33331.
-
Resolution: Fixed

> Upgrade netty to 4.1.93+
> 
>
> Key: FLINK-33331
> URL: https://issues.apache.org/jira/browse/FLINK-33331
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / Python
>Reporter: Sergey Nuyanzin
>Assignee: Martijn Visser
>Priority: Major
>  Labels: pull-request-available
> Fix For: shaded-18.0, 1.19.0





Re: [PR] [FLINK-33526] Autoscaler config improvement + cleanup [flink-kubernetes-operator]

2023-11-12 Thread via GitHub


gyfora commented on code in PR #708:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/708#discussion_r1390449190


##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java:
##
@@ -190,7 +189,8 @@ void registerSessionJobController() {
         var metricManager =
                 MetricManager.createFlinkSessionJobMetricManager(baseConfig, metricGroup);
         var statusRecorder = StatusRecorder.create(client, metricManager, listeners);
-        var reconciler = new SessionJobReconciler(eventRecorder, statusRecorder);
+        var autoscaler = AutoscalerFactory.create(client, eventRecorder);
+        var reconciler = new SessionJobReconciler(eventRecorder, statusRecorder, autoscaler);

Review Comment:
   Yes, it seemed like an unnecessary limitation. We can also include this in 
our testing.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-33529) PyFlink fails with "No module named 'cloudpickle"

2023-11-12 Thread Prabhu Joseph (Jira)
Prabhu Joseph created FLINK-33529:
-

 Summary: PyFlink fails with "No module named 'cloudpickle"
 Key: FLINK-33529
 URL: https://issues.apache.org/jira/browse/FLINK-33529
 Project: Flink
  Issue Type: Bug
  Components: API / Python
Affects Versions: 1.18.0
 Environment: Python 3.7.16 or Python 3.9
YARN
Reporter: Prabhu Joseph


PyFlink fails with "No module named 'cloudpickle'" on Flink 1.18. The same 
program works fine on Flink 1.17. The failure appears after the change in 
FLINK-32034 (https://issues.apache.org/jira/browse/FLINK-32034).

*Repro:*

{code}
[hadoop@ip-1-2-3-4 ~]$ python --version
Python 3.7.16

[hadoop@ip-1-2-3-4 ~]$ rpm -qa | grep flink
flink-1.18.0-1.amzn2.x86_64

[hadoop@ip-1-2-3-4 ~]$ flink-yarn-session -d

[hadoop@ip-1-2-3-4 ~]$ flink run -py /tmp/batch_wc.py --output 
s3://prabhuflinks3/OUT2/
{code}


*Error*

{code}
ModuleNotFoundError: No module named 'cloudpickle'

at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.createStageBundleFactory(BeamPythonFunctionRunner.java:656)
at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.open(BeamPythonFunctionRunner.java:281)
at org.apache.flink.streaming.api.operators.python.process.AbstractExternalPythonFunctionOperator.open(AbstractExternalPythonFunctionOperator.java:57)
at org.apache.flink.table.runtime.operators.python.AbstractStatelessFunctionOperator.open(AbstractStatelessFunctionOperator.java:92)
at org.apache.flink.table.runtime.operators.python.table.PythonTableFunctionOperator.open(PythonTableFunctionOperator.java:114)
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:107)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:753)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.call(StreamTaskActionExecutor.java:100)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:728)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:693)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:922)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
{code}

*Analysis*

1. On Flink 1.17 with Python 3.7.16, 
PythonEnvironmentManagerUtils#getSitePackagesPath returned the following two 
paths:

{code}
[root@ip-172-31-45-97 tmp]# python flink1.17-get_site_packages.py /tmp
/tmp/lib/python3.7/site-packages
/tmp/lib64/python3.7/site-packages
{code}

whereas Flink 1.18 (FLINK-32034) changed 
PythonEnvironmentManagerUtils#getSitePackagesPath so that only one path is 
returned:

{code}
[root@ip-172-31-45-97 tmp]# python flink1.18-get_site_packages.py /tmp
/tmp/lib64/python3.7/site-packages
[root@ip-172-31-45-97 tmp]#
{code}

The PyFlink dependencies are installed in "/tmp/lib/python3.7/site-packages", 
which getSitePackagesPath no longer returns in Flink 1.18, so the PyFlink job 
fails.

Attached batch_wc.py, flink1.17-get_site_packages.py and 
flink1.18-get_site_packages.py.
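
The difference between the two outputs can be explored outside Flink. The following is a minimal sketch that assumes only the Python standard library's `sysconfig` module. It is not the attached script or Flink's actual implementation, and the function name `site_packages_candidates` is hypothetical. It lists the pure-Python (`purelib`) and platform-specific (`platlib`) site-packages directories under a prefix; on distributions that place platform libraries under `lib64` the two can differ, so returning only one of them may miss installed packages.

```python
import sysconfig


def site_packages_candidates(prefix):
    """Return the distinct site-packages directories under the given prefix.

    Hypothetical helper for illustration only; not part of Flink.
    """
    # Override both install roots so the paths are computed relative to prefix.
    overrides = {"base": prefix, "platbase": prefix}
    purelib = sysconfig.get_path("purelib", vars=overrides)
    platlib = sysconfig.get_path("platlib", vars=overrides)

    paths = [purelib]
    # Many builds use the same directory for both; report it only once.
    if platlib != purelib:
        paths.append(platlib)
    return paths


if __name__ == "__main__":
    for path in site_packages_candidates("/tmp"):
        print(path)
```

Whether one or two paths are printed depends on how the interpreter was built, so the result should be checked on the affected host.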






[jira] [Updated] (FLINK-33529) PyFlink fails with "No module named 'cloudpickle"

2023-11-12 Thread Prabhu Joseph (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33529?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Prabhu Joseph updated FLINK-33529:
--
Description: 
PyFlink fails with "No module named 'cloudpickle'" on Flink 1.18. The same 
program works fine on Flink 1.17. The failure appears after the change in 
FLINK-32034 (https://issues.apache.org/jira/browse/FLINK-32034).

*Repro:*

{code}
[hadoop@ip-1-2-3-4 ~]$ python --version
Python 3.7.16

[hadoop@ip-1-2-3-4 ~]$ rpm -qa | grep flink
flink-1.18.0-1.amzn2.x86_64

[hadoop@ip-1-2-3-4 ~]$ flink-yarn-session -d

[hadoop@ip-1-2-3-4 ~]$ flink run -py /tmp/batch_wc.py --output 
s3://prabhuflinks3/OUT2/
{code}


*Error*

{code}
ModuleNotFoundError: No module named 'cloudpickle'

at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.createStageBundleFactory(BeamPythonFunctionRunner.java:656)
at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.open(BeamPythonFunctionRunner.java:281)
at org.apache.flink.streaming.api.operators.python.process.AbstractExternalPythonFunctionOperator.open(AbstractExternalPythonFunctionOperator.java:57)
at org.apache.flink.table.runtime.operators.python.AbstractStatelessFunctionOperator.open(AbstractStatelessFunctionOperator.java:92)
at org.apache.flink.table.runtime.operators.python.table.PythonTableFunctionOperator.open(PythonTableFunctionOperator.java:114)
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:107)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:753)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.call(StreamTaskActionExecutor.java:100)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:728)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:693)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:922)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
{code}

*Analysis*

1. On Flink 1.17 with Python 3.7.16, 
PythonEnvironmentManagerUtils#getSitePackagesPath returned the following two 
paths:

{code}
[root@ip-172-31-45-97 tmp]# python flink1.17-get_site_packages.py /tmp
/tmp/lib/python3.7/site-packages
/tmp/lib64/python3.7/site-packages
{code}

whereas Flink 1.18 (FLINK-32034) changed 
PythonEnvironmentManagerUtils#getSitePackagesPath so that only one path is 
returned:

{code}
[root@ip-172-31-45-97 tmp]# python flink1.18-get_site_packages.py /tmp
/tmp/lib64/python3.7/site-packages
[root@ip-172-31-45-97 tmp]#
{code}

The PyFlink dependencies are installed in "/tmp/lib/python3.7/site-packages", 
which getSitePackagesPath no longer returns in Flink 1.18, so the PyFlink job 
fails.

*Attached batch_wc.py, flink1.17-get_site_packages.py and 
flink1.18-get_site_packages.py.*



[jira] [Updated] (FLINK-33529) PyFlink fails with "No module named 'cloudpickle"

2023-11-12 Thread Prabhu Joseph (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33529?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Prabhu Joseph updated FLINK-33529:
--
Attachment: flink1.17-get_site_packages.py
flink1.18-get_site_packages.py

> PyFlink fails with "No module named 'cloudpickle"
> -
>
> Key: FLINK-33529
> URL: https://issues.apache.org/jira/browse/FLINK-33529
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python
>Affects Versions: 1.18.0
> Environment: Python 3.7.16 or Python 3.9
> YARN
>Reporter: Prabhu Joseph
>Priority: Major
> Attachments: batch_wc.py, flink1.17-get_site_packages.py, 
> flink1.18-get_site_packages.py





[jira] [Updated] (FLINK-33529) PyFlink fails with "No module named 'cloudpickle"

2023-11-12 Thread Prabhu Joseph (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33529?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Prabhu Joseph updated FLINK-33529:
--
Attachment: batch_wc.py

> PyFlink fails with "No module named 'cloudpickle"
> -
>
> Key: FLINK-33529
> URL: https://issues.apache.org/jira/browse/FLINK-33529
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python
>Affects Versions: 1.18.0
> Environment: Python 3.7.16 or Python 3.9
> YARN
>Reporter: Prabhu Joseph
>Priority: Major
> Attachments: batch_wc.py, flink1.17-get_site_packages.py, 
> flink1.18-get_site_packages.py





Re: [PR] [FLINK-33527][autoscaler] Clear all physical states after autoscaler is disabled [flink-kubernetes-operator]

2023-11-12 Thread via GitHub


gyfora commented on code in PR #707:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/707#discussion_r1390470139


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java:
##
@@ -115,10 +123,39 @@ public void cleanup(KEY jobKey) {
         stateStore.removeInfoFromCache(jobKey);
     }
 
-    private void clearParallelismOverrides(Context ctx) throws Exception {
-        var parallelismOverrides = stateStore.getParallelismOverrides(ctx);
-        if (parallelismOverrides.isPresent()) {
+    private void clearStatesAfterAutoscalerDisabled(Context ctx) throws Exception {
+        var needFlush = false;
+        var parallelismOverridesOpt = stateStore.getParallelismOverrides(ctx);
+        if (parallelismOverridesOpt.isPresent()) {
+            needFlush = true;
             stateStore.removeParallelismOverrides(ctx);
+        }
+
+        Optional> collectedMetricsOpt =
+                stateStore.getCollectedMetrics(ctx);
+        if (collectedMetricsOpt.isPresent()) {
+            needFlush = true;
+            stateStore.removeCollectedMetrics(ctx);
+        }
+
+        Optional>> scalingHistoryOpt =
+                stateStore.getScalingHistory(ctx);
+        if (scalingHistoryOpt.isPresent()) {
+            Map> scalingHistory =
+                    trimScalingHistory(
+                            clock.instant(), ctx.getConfiguration(), scalingHistoryOpt.get());
+            if (scalingHistory.isEmpty()) {
+                // All scaling histories are trimmed.
+                needFlush = true;
+                stateStore.removeScalingHistory(ctx);
+            } else if (!scalingHistoryOpt.get().equals(scalingHistory)) {
+                // Some scaling histories are trimmed.
+                needFlush = true;
+                stateStore.storeScalingHistory(ctx, scalingHistory);
+            }
+        }

Review Comment:
   This looks strange. Why not simply call `stateStore.removeScalingHistory(ctx);` 
all the time? Then we wouldn't need to touch the trimming logic.



##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java:
##
@@ -115,10 +123,39 @@ public void cleanup(KEY jobKey) {
         stateStore.removeInfoFromCache(jobKey);
     }
 
-    private void clearParallelismOverrides(Context ctx) throws Exception {
-        var parallelismOverrides = stateStore.getParallelismOverrides(ctx);
-        if (parallelismOverrides.isPresent()) {
+    private void clearStatesAfterAutoscalerDisabled(Context ctx) throws Exception {
+        var needFlush = false;
+        var parallelismOverridesOpt = stateStore.getParallelismOverrides(ctx);
+        if (parallelismOverridesOpt.isPresent()) {
+            needFlush = true;
             stateStore.removeParallelismOverrides(ctx);
+        }
+
+        Optional> collectedMetricsOpt =
+                stateStore.getCollectedMetrics(ctx);
+        if (collectedMetricsOpt.isPresent()) {
+            needFlush = true;
+            stateStore.removeCollectedMetrics(ctx);
+        }
+
+        Optional>> scalingHistoryOpt =

Review Comment:
   Let's be consistent and use:
   ```
   var scalingHistoryOpt = ...
   ...
   var scalingHistory = ...
   ```






[jira] [Commented] (FLINK-33365) Missing filter condition in execution plan containing lookup join with mysql jdbc connector

2023-11-12 Thread Sergey Nuyanzin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17785321#comment-17785321
 ] 

Sergey Nuyanzin commented on FLINK-33365:
-

It looks like the issue is an incomplete implementation of filter push-down 
(FLINK-16024).

Test case to reproduce: add this to 
{{org.apache.flink.connector.jdbc.table.JdbcTablePlanTest}}
{code:java}
@Test
public void MyTest() {
    String dataId =
            TestValuesTableFactory.registerData(
                    Arrays.asList(
                            Row.of(1L, "Alice"),
                            Row.of(1L, "Alice"),
                            Row.of(2L, "Bob"),
                            Row.of(3L, "Charlie")));
    util.tableEnv().executeSql(
            String.format(
                    "CREATE TABLE value_source (\n"
                            + "`id` BIGINT,\n"
                            + "`name` STRING,\n"
                            + "`proctime` AS PROCTIME()\n"
                            + ") WITH (\n"
                            + "'connector' = 'values', \n"
                            + "'data-id' = '%s')",
                    dataId));
    util.verifyExecPlan(
            "SELECT S.id, S.name, D.id, D.timestamp6_col, D.double_col FROM value_source"
                    + " AS S JOIN jdbc for system_time as of S.proctime AS D ON S.id = D.id and D.decimal_col = 0.0");
}
{code}
and check the plan before and after the commit from 
https://github.com/apache/flink/pull/20140; it shows different results.


At the same time, there are several workarounds, such as adding a meaningless 
arithmetic operation, a cast, or some other function call. 
A slightly changed query starts to behave as expected because math operations, 
casts and other functions are not pushed down (yet). 
However, in Flink 1.18 math operations may be simplified by the newer Calcite, 
so this needs to be checked case by case. 
For example:
{code:sql}
SELECT S.id, S.name, D.id, D.timestamp6_col, D.double_col FROM value_source
 AS S JOIN jdbc for system_time as of S.proctime AS D 
ON S.id = D.id and D.decimal_col = 0.0 + 0
{code}

or with a cast:
{code:sql}
SELECT S.id, S.name, D.id, D.timestamp6_col, D.double_col FROM value_source
 AS S JOIN jdbc for system_time as of S.proctime AS D 
ON S.id = D.id and D.decimal_col = cast(0.0 as decimal)
{code}
[~macdoor615] that might help you with your queries.

[~qingwei91] could you please have a look here, since you are familiar with the 
current implementation?
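
To illustrate why the missing predicate matters, here is a toy simulation in plain Python. It is only a sketch based on the two plans quoted in the issue description: the function `lookup_join`, its parameters and the sample rows are hypothetical and do not reflect the connector's code. It models a left outer lookup join on `ip` whose output projects `type` as the constant 0, once with the `type = 0` predicate applied to the lookup and once without it.

```python
def lookup_join(probe_ips, dim_rows, push_filter):
    """Toy LEFT JOIN of probe ips against dim rows on ip (illustration only)."""
    result = []
    for ip in probe_ips:
        matches = [
            row
            for row in dim_rows
            # With push_filter=False the predicate type = 0 is silently dropped.
            if row["ip"] == ip and (not push_filter or row["type"] == 0)
        ]
        if matches:
            # The plan projects CAST(0 AS INTEGER) AS type, i.e. a constant.
            result.extend((ip, 0) for _ in matches)
        else:
            # Left outer join: keep the probe row with a NULL dimension side.
            result.append((ip, None))
    return result


# Hypothetical sample data: the second dimension row does not satisfy type = 0.
dim = [{"ip": "10.0.0.1", "type": 0}, {"ip": "10.0.0.2", "type": 5}]
probe = ["10.0.0.1", "10.0.0.2"]

with_filter = lookup_join(probe, dim, push_filter=True)
without_filter = lookup_join(probe, dim, push_filter=False)
```

In this sketch the row for `10.0.0.2` is joined and reported with `type` 0 when the predicate is dropped, although the stored value is 5; with the predicate applied it stays unmatched.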


> Missing filter condition in execution plan containing lookup join with mysql 
> jdbc connector
> ---
>
> Key: FLINK-33365
> URL: https://issues.apache.org/jira/browse/FLINK-33365
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / JDBC
>Affects Versions: 1.18.0, 1.17.1
> Environment: Flink 1.17.1 & Flink 1.18.0 with 
> flink-connector-jdbc-3.1.1-1.17.jar
>Reporter: macdoor615
>Assignee: david radley
>Priority: Critical
> Attachments: flink-connector-jdbc-3.0.0-1.16.png, 
> flink-connector-jdbc-3.1.1-1.17.png
>
>
> create table in flink with sql-client.sh
> {code:java}
> CREATE TABLE default_catalog.default_database.a (
>   ip string, 
>   proctime as proctime()
> ) 
> WITH (
>   'connector' = 'datagen'
> );{code}
> create table in mysql
> {code:java}
> create table b (
>   ip varchar(20), 
>   type int
> );  {code}
>  
> Flink 1.17.1/ 1.18.0 and *flink-connector-jdbc-3.1.1-1.17.jar*
> execute in sql-client.sh 
> {code:java}
> explain SELECT * FROM default_catalog.default_database.a left join 
> bnpmp_mysql_test.gem_tmp.b FOR SYSTEM_TIME AS OF a.proctime on b.type = 0 and 
> a.ip = b.ip; {code}
> get the execution plan
> {code:java}
> ...
> == Optimized Execution Plan ==
> Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
> +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], 
> lookup=[ip=ip], select=[ip, proctime, ip, CAST(0 AS INTEGER) AS type, CAST(ip 
> AS VARCHAR(2147483647)) AS ip0])
>    +- Calc(select=[ip, PROCTIME() AS proctime])
>       +- TableSourceScan(table=[[default_catalog, default_database, a]], 
> fields=[ip]){code}
>  
> execute the same SQL in sql-client with Flink 1.17.1/ 1.18.0 and 
> *flink-connector-jdbc-3.0.0-1.16.jar*
> get the execution plan
> {code:java}
> ...
> == Optimized Execution Plan ==
> Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
> +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], 
> lookup=[type=0, ip=ip], where=[(type = 0)], select=[ip, proctime, ip, CAST(0 
> AS INTEGER) AS type, CAST(ip AS VARCHAR(2147483647)) AS ip0])
>    +- Calc(select=[ip, PROCTIME() AS procti

[jira] [Comment Edited] (FLINK-33365) Missing filter condition in execution plan containing lookup join with mysql jdbc connector

2023-11-12 Thread Sergey Nuyanzin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17785321#comment-17785321
 ] 

Sergey Nuyanzin edited comment on FLINK-33365 at 11/12/23 9:36 PM:
---

It looks like the issue is an incomplete implementation of filter push-down 
(FLINK-16024).

Test case to reproduce: add this to 
{{org.apache.flink.connector.jdbc.table.JdbcTablePlanTest}}
{code:java}
@Test
public void issue33365() {
    String dataId =
            TestValuesTableFactory.registerData(
                    Arrays.asList(
                            Row.of(1L, "Alice"),
                            Row.of(1L, "Alice"),
                            Row.of(2L, "Bob"),
                            Row.of(3L, "Charlie")));
    util.tableEnv().executeSql(
            String.format(
                    "CREATE TABLE value_source (\n"
                            + "`id` BIGINT,\n"
                            + "`name` STRING,\n"
                            + "`proctime` AS PROCTIME()\n"
                            + ") WITH (\n"
                            + "'connector' = 'values', \n"
                            + "'data-id' = '%s')",
                    dataId));
    util.verifyExecPlan(
            "SELECT S.id, S.name, D.id, D.timestamp6_col, D.double_col FROM value_source"
                    + " AS S JOIN jdbc for system_time as of S.proctime AS D ON S.id = D.id and D.decimal_col = 0.0");
}
{code}
and check before and after commit https://github.com/apache/flink/pull/20140 it 
shows different results.


At the same time there are a number of workarounds, such as adding a meaningless 
math operation, a cast, or some other function call. 
A slightly changed query then behaves as expected, since math operations, 
casts and other functions are not pushed down (yet). 
However, in Flink 1.18 math operations could be simplified by the newer Calcite, so 
each case needs to be checked individually. 
For example:
{code:sql}
SELECT S.id, S.name, D.id, D.timestamp6_col, D.double_col FROM value_source
 AS S JOIN jdbc for system_time as of S.proctime AS D 
ON S.id = D.id and D.decimal_col = 0.0 + 0
{code}

or with casting like
{code:sql}
SELECT S.id, S.name, D.id, D.timestamp6_col, D.double_col FROM value_source
 AS S JOIN jdbc for system_time as of S.proctime AS D 
ON S.id = D.id and D.decimal_col = cast(0.0 as decimal)
{code}
[~macdoor615] that might help you with your queries

[~qingwei91] could you please have a look here, since you are aware of the current 
implementation?



was (Author: sergey nuyanzin):
looks like the issue is incomplete implementation of filter push down 
FLINK-16024

TC to reproduce
add this to {{org.apache.flink.connector.jdbc.table.JdbcTablePlanTest}}
{code:java}
@Test
public void MyTest() {
String dataId =
TestValuesTableFactory.registerData(
Arrays.asList(
Row.of(1L, "Alice"),
Row.of(1L, "Alice"),
Row.of(2L, "Bob"),
Row.of(3L, "Charlie")));
util.tableEnv().executeSql(
String.format(
"CREATE TABLE value_source (\n"
+ "`id` BIGINT,\n"
+ "`name` STRING,\n"
+ "`proctime` AS PROCTIME()\n"
+ ") WITH (\n"
+ "'connector' = 'values', \n"
+ "'data-id' = '%s')",
dataId));
util.verifyExecPlan(
"SELECT S.id, S.name, D.id, D.timestamp6_col, D.double_col FROM 
value_source"
+ " AS S JOIN jdbc for system_time as of S.proctime AS 
D ON S.id = D.id and D.decimal_col = 0.0");
}
{code}
and check before and after commit https://github.com/apache/flink/pull/20140 it 
shows different results.


At the same time there is a bunch of WA like add something meaningless or cast 
or some other function use, 
e.g. slightly changed query  started behave as expected since math operations, 
cast and other functions are not going to push down (yet)
however in Flink 1.18 math operations could be simplified by newer Calcite, so 
need to check case by case
like 
{code:sql}
SELECT S.id, S.name, D.id, D.timestamp6_col, D.double_col FROM value_source
 AS S JOIN jdbc for system_time as of S.proctime AS D 
ON S.id = D.id and D.decimal_col = 0.0 + 0
{code}

or with casting like
{code:sql}
SELECT S.id, S.name, D.id, D.timestamp6_col, D.double_col FROM value_source
 AS S JOIN jdbc for system_time as of S.proctime AS D 
ON S.id = D.id and D.decimal_col = cast(0.0 as decimal)
{code}
[~macdoor615] that might hel

[jira] [Comment Edited] (FLINK-33365) Missing filter condition in execution plan containing lookup join with mysql jdbc connector

2023-11-12 Thread Sergey Nuyanzin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17785321#comment-17785321
 ] 

Sergey Nuyanzin edited comment on FLINK-33365 at 11/12/23 9:49 PM:
---

It looks like the issue is an incomplete implementation of filter push down 
(FLINK-16024), at least for the case with {noformat}for system_time as of {noformat}.

Test case to reproduce: add this to 
{{org.apache.flink.connector.jdbc.table.JdbcTablePlanTest}}
{code:java}
// Needs java.util.Arrays, org.apache.flink.types.Row and
// org.apache.flink.table.planner.factories.TestValuesTableFactory imported.
@Test
public void issue33365() {
    // Register a small in-memory data set for the 'values' test connector.
    String dataId =
            TestValuesTableFactory.registerData(
                    Arrays.asList(
                            Row.of(1L, "Alice"),
                            Row.of(1L, "Alice"),
                            Row.of(2L, "Bob"),
                            Row.of(3L, "Charlie")));
    util.tableEnv()
            .executeSql(
                    String.format(
                            "CREATE TABLE value_source (\n"
                                    + "`id` BIGINT,\n"
                                    + "`name` STRING,\n"
                                    + "`proctime` AS PROCTIME()\n"
                                    + ") WITH (\n"
                                    + "'connector' = 'values', \n"
                                    + "'data-id' = '%s')",
                            dataId));
    // Lookup join against the jdbc table with a constant filter on D.decimal_col.
    util.verifyExecPlan(
            "SELECT S.id, S.name, D.id, D.timestamp6_col, D.double_col FROM value_source"
                    + " AS S JOIN jdbc for system_time as of S.proctime AS D"
                    + " ON S.id = D.id and D.decimal_col = 0.0");
}
{code}
Then run it before and after the commit from [https://github.com/apache/flink/pull/20140]; 
it shows different results.

At the same time there are a number of workarounds, such as adding a meaningless 
math operation, a cast, or some other function call. 
A slightly changed query then behaves as expected, since math operations, 
casts and other functions are not pushed down (yet). 
However, in Flink 1.18 math operations could be simplified by the newer Calcite, so 
each case needs to be checked individually. 
For example:
{code:sql}
SELECT S.id, S.name, D.id, D.timestamp6_col, D.double_col FROM value_source
 AS S JOIN jdbc for system_time as of S.proctime AS D 
ON S.id = D.id and D.decimal_col = 0.0 + 0
{code}
or with casting like
{code:sql}
SELECT S.id, S.name, D.id, D.timestamp6_col, D.double_col FROM value_source
 AS S JOIN jdbc for system_time as of S.proctime AS D 
ON S.id = D.id and D.decimal_col = cast(0.0 as decimal)
{code}
[~macdoor615] that might help you with your queries

[~qingwei91] could you please have a look here, since you are aware of the current 
implementation?


was (Author: sergey nuyanzin):
looks like the issue is incomplete implementation of filter push down 
FLINK-16024

TC to reproduce
add this to {{org.apache.flink.connector.jdbc.table.JdbcTablePlanTest}}
{code:java}
@Test
public void issue33365() {
String dataId =
TestValuesTableFactory.registerData(
Arrays.asList(
Row.of(1L, "Alice"),
Row.of(1L, "Alice"),
Row.of(2L, "Bob"),
Row.of(3L, "Charlie")));
util.tableEnv().executeSql(
String.format(
"CREATE TABLE value_source (\n"
+ "`id` BIGINT,\n"
+ "`name` STRING,\n"
+ "`proctime` AS PROCTIME()\n"
+ ") WITH (\n"
+ "'connector' = 'values', \n"
+ "'data-id' = '%s')",
dataId));
util.verifyExecPlan(
"SELECT S.id, S.name, D.id, D.timestamp6_col, D.double_col FROM 
value_source"
+ " AS S JOIN jdbc for system_time as of S.proctime AS 
D ON S.id = D.id and D.decimal_col = 0.0");
}
{code}
and check before and after commit https://github.com/apache/flink/pull/20140 it 
shows different results.


At the same time there is a bunch of WA like add something meaningless or cast 
or some other function use, 
e.g. slightly changed query  started behave as expected since math operations, 
cast and other functions are not going to push down (yet)
however in Flink 1.18 math operations could be simplified by newer Calcite, so 
need to check case by case
like 
{code:sql}
SELECT S.id, S.name, D.id, D.timestamp6_col, D.double_col FROM value_source
 AS S JOIN jdbc for system_time as of S.proctime AS D 
ON S.id = D.id and D.decimal_col = 0.0 + 0
{code}

or with casting like
{code:sql}
SELECT S.id, S.name, D.id, D.timestamp6_col, D.double_col FROM value_source
 AS S JOIN jdbc for system_time as of S.proctime AS D 
ON S.id = D.id and 

[jira] [Comment Edited] (FLINK-33365) Missing filter condition in execution plan containing lookup join with mysql jdbc connector

2023-11-12 Thread Sergey Nuyanzin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17785321#comment-17785321
 ] 

Sergey Nuyanzin edited comment on FLINK-33365 at 11/12/23 9:57 PM:
---

It looks like the issue is an incomplete implementation of filter push down 
(FLINK-16024), at least for the case with
{noformat}
for system_time as of {noformat}

Test case to reproduce: add this to 
{{org.apache.flink.connector.jdbc.table.JdbcTablePlanTest}}
{code:java}
// Needs java.util.Arrays, org.apache.flink.types.Row and
// org.apache.flink.table.planner.factories.TestValuesTableFactory imported.
@Test
public void issue33365() {
    // Register a small in-memory data set for the 'values' test connector.
    String dataId =
            TestValuesTableFactory.registerData(
                    Arrays.asList(
                            Row.of(1L, "Alice"),
                            Row.of(1L, "Alice"),
                            Row.of(2L, "Bob"),
                            Row.of(3L, "Charlie")));
    util.tableEnv()
            .executeSql(
                    String.format(
                            "CREATE TABLE value_source (\n"
                                    + "`id` BIGINT,\n"
                                    + "`name` STRING,\n"
                                    + "`proctime` AS PROCTIME()\n"
                                    + ") WITH (\n"
                                    + "'connector' = 'values', \n"
                                    + "'data-id' = '%s')",
                            dataId));
    // Lookup join against the jdbc table with a constant filter on D.decimal_col.
    util.verifyExecPlan(
            "SELECT S.id, S.name, D.id, D.timestamp6_col, D.double_col FROM value_source"
                    + " AS S JOIN jdbc for system_time as of S.proctime AS D"
                    + " ON S.id = D.id and D.decimal_col = 0.0");
}
{code}
Then run it before and after the commit from [https://github.com/apache/flink/pull/20140]; 
it shows different results.

At the same time there are a number of workarounds, such as adding a meaningless 
math operation, a cast, or some other function call. 
A slightly changed query then behaves as expected, since math operations, 
casts and other functions are not pushed down (yet). 
However, in Flink 1.18 math operations could be simplified by the newer Calcite 
(CALCITE-4420), so each case needs to be checked individually. 
For example:
{code:sql}
SELECT S.id, S.name, D.id, D.timestamp6_col, D.double_col FROM value_source
 AS S JOIN jdbc for system_time as of S.proctime AS D 
ON S.id = D.id and D.decimal_col = 0.0 + 0
{code}
or with casting like
{code:sql}
SELECT S.id, S.name, D.id, D.timestamp6_col, D.double_col FROM value_source
 AS S JOIN jdbc for system_time as of S.proctime AS D 
ON S.id = D.id and D.decimal_col = cast(0.0 as decimal)
{code}
[~macdoor615] that might help you with your queries

[~qingwei91] could you please have a look here, since you are aware of the current 
implementation?


was (Author: sergey nuyanzin):
looks like the issue is incomplete (at least for the case with {noformat}for 
system_time as of {noformat}) implementation of filter push down FLINK-16024

TC to reproduce
add this to {{org.apache.flink.connector.jdbc.table.JdbcTablePlanTest}}
{code:java}
@Test
public void issue33365() {
String dataId =
TestValuesTableFactory.registerData(
Arrays.asList(
Row.of(1L, "Alice"),
Row.of(1L, "Alice"),
Row.of(2L, "Bob"),
Row.of(3L, "Charlie")));
util.tableEnv().executeSql(
String.format(
"CREATE TABLE value_source (\n"
+ "`id` BIGINT,\n"
+ "`name` STRING,\n"
+ "`proctime` AS PROCTIME()\n"
+ ") WITH (\n"
+ "'connector' = 'values', \n"
+ "'data-id' = '%s')",
dataId));
util.verifyExecPlan(
"SELECT S.id, S.name, D.id, D.timestamp6_col, D.double_col FROM 
value_source"
+ " AS S JOIN jdbc for system_time as of S.proctime AS 
D ON S.id = D.id and D.decimal_col = 0.0");
}
{code}
and check before and after commit [https://github.com/apache/flink/pull/20140] 
it shows different results.

At the same time there is a bunch of WA like add something meaningless or cast 
or some other function use, 
e.g. slightly changed query started behave as expected since math operations, 
cast and other functions are not going to push down (yet)
however in Flink 1.18 math operations could be simplified by newer Calcite, so 
need to check case by case
like
{code:sql}
SELECT S.id, S.name, D.id, D.timestamp6_col, D.double_col FROM value_source
 AS S JOIN jdbc for system_time as of S.proctime AS D 
ON S.id = D.id and D.decimal_col = 0.0 + 0
{code}
or with casting like
{code:sql}
SELECT S.id, S.name, D.id, D.timestamp6_col, D.double_col FROM value_source

[jira] [Comment Edited] (FLINK-33365) Missing filter condition in execution plan containing lookup join with mysql jdbc connector

2023-11-12 Thread Sergey Nuyanzin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17785321#comment-17785321
 ] 

Sergey Nuyanzin edited comment on FLINK-33365 at 11/12/23 10:18 PM:


It looks like the issue is an incomplete implementation of filter push down 
(FLINK-16024), at least for the case with
{noformat}
for system_time as of {noformat}

Test case to reproduce: add this to 
{{org.apache.flink.connector.jdbc.table.JdbcTablePlanTest}}
{code:java}
// Needs java.util.Arrays, org.apache.flink.types.Row and
// org.apache.flink.table.planner.factories.TestValuesTableFactory imported.
@Test
public void issue33365() {
    // Register a small in-memory data set for the 'values' test connector.
    String dataId =
            TestValuesTableFactory.registerData(
                    Arrays.asList(
                            Row.of(1L, "Alice"),
                            Row.of(1L, "Alice"),
                            Row.of(2L, "Bob"),
                            Row.of(3L, "Charlie")));
    util.tableEnv()
            .executeSql(
                    String.format(
                            "CREATE TABLE value_source (\n"
                                    + "`id` BIGINT,\n"
                                    + "`name` STRING,\n"
                                    + "`proctime` AS PROCTIME()\n"
                                    + ") WITH (\n"
                                    + "'connector' = 'values', \n"
                                    + "'data-id' = '%s')",
                            dataId));
    // Lookup join against the jdbc table with a constant filter on D.decimal_col.
    util.verifyExecPlan(
            "SELECT S.id, S.name, D.id, D.timestamp6_col, D.double_col FROM value_source"
                    + " AS S JOIN jdbc for system_time as of S.proctime AS D"
                    + " ON S.id = D.id and D.decimal_col = 0.0");
}
{code}
Then run it before and after the commit from [https://github.com/apache/flink/pull/20140] 
(or the same commit in the jdbc connector, 
https://github.com/apache/flink-connector-jdbc/commit/3e3b40e8cfadfc16a8ab74d4ef6a3ab3ceafa57b); 
it shows different results.

At the same time there are a number of workarounds, such as adding a meaningless 
math operation, a cast, or some other function call. 
A slightly changed query then behaves as expected, since math operations, 
casts and other functions are not pushed down (yet). 
However, in Flink 1.18 math operations could be simplified by the newer Calcite 
(CALCITE-4420), so each case needs to be checked individually. 
For example:
{code:sql}
SELECT S.id, S.name, D.id, D.timestamp6_col, D.double_col FROM value_source
 AS S JOIN jdbc for system_time as of S.proctime AS D 
ON S.id = D.id and D.decimal_col = 0.0 + 0
{code}
or with casting like
{code:sql}
SELECT S.id, S.name, D.id, D.timestamp6_col, D.double_col FROM value_source
 AS S JOIN jdbc for system_time as of S.proctime AS D 
ON S.id = D.id and D.decimal_col = cast(0.0 as decimal)
{code}
[~macdoor615] that might help you with your queries

[~qingwei91] could you please have a look here, since you are aware of the current 
implementation?


was (Author: sergey nuyanzin):
looks like the issue is incomplete (at least for the case with
{noformat}
for system_time as of {noformat}
) implementation of filter push down FLINK-16024

TC to reproduce
add this to {{org.apache.flink.connector.jdbc.table.JdbcTablePlanTest}}
{code:java}
@Test
public void issue33365() {
String dataId =
TestValuesTableFactory.registerData(
Arrays.asList(
Row.of(1L, "Alice"),
Row.of(1L, "Alice"),
Row.of(2L, "Bob"),
Row.of(3L, "Charlie")));
util.tableEnv().executeSql(
String.format(
"CREATE TABLE value_source (\n"
+ "`id` BIGINT,\n"
+ "`name` STRING,\n"
+ "`proctime` AS PROCTIME()\n"
+ ") WITH (\n"
+ "'connector' = 'values', \n"
+ "'data-id' = '%s')",
dataId));
util.verifyExecPlan(
"SELECT S.id, S.name, D.id, D.timestamp6_col, D.double_col FROM 
value_source"
+ " AS S JOIN jdbc for system_time as of S.proctime AS 
D ON S.id = D.id and D.decimal_col = 0.0");
}
{code}
and check before and after commit [https://github.com/apache/flink/pull/20140] 
it shows different results.

At the same time there is a bunch of WA like add something meaningless or cast 
or some other function use, 
e.g. slightly changed query started behave as expected since math operations, 
cast and other functions are not going to push down (yet)
however in Flink 1.18 math operations could be simplified by newer Calcite 
(CALCITE-4420), so need to check case by case
like
{code:sql}
SELECT S.id, S.name, D.id, D.timestamp6_col, D.double_col FROM value_source
 AS S JOIN jdbc for system_time as of S.proctime AS D 
ON S.id = D.id and D

[jira] [Comment Edited] (FLINK-33365) Missing filter condition in execution plan containing lookup join with mysql jdbc connector

2023-11-12 Thread Sergey Nuyanzin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17785321#comment-17785321
 ] 

Sergey Nuyanzin edited comment on FLINK-33365 at 11/12/23 10:20 PM:


It looks like the issue is an incomplete implementation of filter push down 
(FLINK-16024), at least for the case with
{noformat}
for system_time as of {noformat}

Test case to reproduce: add this to 
{{org.apache.flink.connector.jdbc.table.JdbcTablePlanTest}}
{code:java}
// Needs java.util.Arrays, org.apache.flink.types.Row and
// org.apache.flink.table.planner.factories.TestValuesTableFactory imported.
@Test
public void issue33365() {
    // Register a small in-memory data set for the 'values' test connector.
    String dataId =
            TestValuesTableFactory.registerData(
                    Arrays.asList(
                            Row.of(1L, "Alice"),
                            Row.of(1L, "Alice"),
                            Row.of(2L, "Bob"),
                            Row.of(3L, "Charlie")));
    util.tableEnv()
            .executeSql(
                    String.format(
                            "CREATE TABLE value_source (\n"
                                    + "`id` BIGINT,\n"
                                    + "`name` STRING,\n"
                                    + "`proctime` AS PROCTIME()\n"
                                    + ") WITH (\n"
                                    + "'connector' = 'values', \n"
                                    + "'data-id' = '%s')",
                            dataId));
    // Lookup join against the jdbc table with a constant filter on D.decimal_col.
    util.verifyExecPlan(
            "SELECT S.id, S.name, D.id, D.timestamp6_col, D.double_col FROM value_source"
                    + " AS S JOIN jdbc for system_time as of S.proctime AS D"
                    + " ON S.id = D.id and D.decimal_col = 0.0");
}
{code}
Then run it before and after the commit from [https://github.com/apache/flink/pull/20140] 
(or the same commit in the jdbc connector, 
https://github.com/apache/flink-connector-jdbc/commit/3e3b40e8cfadfc16a8ab74d4ef6a3ab3ceafa57b); 
it shows different results.

At the same time there are a number of workarounds, such as adding a meaningless 
math operation, a cast, or some other function call. 
A slightly changed query then behaves as expected, since math operations, 
casts and other functions are not pushed down (yet). 
However, in Flink 1.18 math operations could be simplified by the newer Calcite 
(CALCITE-4420), so each case needs to be checked individually. 
For example:
{code:sql}
SELECT S.id, S.name, D.id, D.timestamp6_col, D.double_col 
  FROM value_source AS S 
  JOIN jdbc for system_time as of S.proctime AS D 
ON S.id = D.id and D.decimal_col = 0.0 + 0
{code}
or with casting like
{code:sql}
SELECT S.id, S.name, D.id, D.timestamp6_col, D.double_col 
  FROM value_source AS S 
  JOIN jdbc for system_time as of S.proctime AS D 
ON S.id = D.id and D.decimal_col = cast(0.0 as decimal)
{code}
[~macdoor615] that might help you with your queries

[~qingwei91] could you please have a look here, since you are aware of the current 
implementation?


was (Author: sergey nuyanzin):
looks like the issue is incomplete (at least for the case with
{noformat}
for system_time as of {noformat}
) implementation of filter push down FLINK-16024

TC to reproduce
add this to {{org.apache.flink.connector.jdbc.table.JdbcTablePlanTest}}
{code:java}
@Test
public void issue33365() {
String dataId =
TestValuesTableFactory.registerData(
Arrays.asList(
Row.of(1L, "Alice"),
Row.of(1L, "Alice"),
Row.of(2L, "Bob"),
Row.of(3L, "Charlie")));
util.tableEnv().executeSql(
String.format(
"CREATE TABLE value_source (\n"
+ "`id` BIGINT,\n"
+ "`name` STRING,\n"
+ "`proctime` AS PROCTIME()\n"
+ ") WITH (\n"
+ "'connector' = 'values', \n"
+ "'data-id' = '%s')",
dataId));
util.verifyExecPlan(
"SELECT S.id, S.name, D.id, D.timestamp6_col, D.double_col FROM 
value_source"
+ " AS S JOIN jdbc for system_time as of S.proctime AS 
D ON S.id = D.id and D.decimal_col = 0.0");
}
{code}
and check before and after commit [https://github.com/apache/flink/pull/20140] 
or same commit in jdbc connector 
(https://github.com/apache/flink-connector-jdbc/commit/3e3b40e8cfadfc16a8ab74d4ef6a3ab3ceafa57b)
 it shows different results.

At the same time there is a bunch of WA like add something meaningless or cast 
or some other function use, 
e.g. slightly changed query started behave as expected since math operations, 
cast and other functions are not going to push down (yet)
however in Flink 1.18 math operations could be simplified by newer Calcite 
(CALCITE-4420), so need to check case by case
like
{code:sql}
SELECT S.id, S.name, D.id, D.timestamp6_col, D.double_col FROM value_sour

Re: [PR] [FLINK-33361][connectors/kafka] Add Java 17 compatibility to Flink Kafka connector [flink-connector-kafka]

2023-11-12 Thread via GitHub


snuyanzin commented on PR #68:
URL: 
https://github.com/apache/flink-connector-kafka/pull/68#issuecomment-1807265659

   @MartijnVisser do you happen to know what should be done to make the title 
validation pass?
   It seems to satisfy the regexp now, however it didn't with the first 
commit of the PR, and then I force pushed it... Might that be the reason?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-33530) Add entropy to Google Cloud Storage path for better scalability

2023-11-12 Thread Talat Uyarer (Jira)
Talat Uyarer created FLINK-33530:


 Summary: Add entropy to Google Cloud Storage path for better 
scalability
 Key: FLINK-33530
 URL: https://issues.apache.org/jira/browse/FLINK-33530
 Project: Flink
  Issue Type: Improvement
Affects Versions: 1.17.1, 1.15.4, 1.19.0
Reporter: Talat Uyarer


We are using GCS. For better scalability we need entropy support, as in the S3 FS 
plugin.
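
As a rough illustration of what such entropy support could look like (a sketch only, not the S3 plugin's code): the S3 file system is configured with {{s3.entropy.key}} and {{s3.entropy.length}}, and a marker in the checkpoint path is replaced by random characters. The class name, method name, alphabet and the {{gs://}} path below are hypothetical; no existing GCS option is implied.

```java
import java.util.Random;

/** Hypothetical sketch: replace an entropy marker in a path with random characters. */
final class EntropyPathSketch {

    // Assumed alphabet for the random segment; any path-safe characters would do.
    private static final char[] ALPHABET =
            "abcdefghijklmnopqrstuvwxyz0123456789".toCharArray();

    /** Returns the path with the first occurrence of entropyKey replaced by random characters. */
    static String injectEntropy(String path, String entropyKey, int length, Random random) {
        int idx = path.indexOf(entropyKey);
        if (idx < 0) {
            // No marker present: the path is used unchanged.
            return path;
        }
        StringBuilder sb = new StringBuilder(path.length());
        sb.append(path, 0, idx);
        for (int i = 0; i < length; i++) {
            sb.append(ALPHABET[random.nextInt(ALPHABET.length)]);
        }
        sb.append(path, idx + entropyKey.length(), path.length());
        return sb.toString();
    }

    public static void main(String[] args) {
        // Prints the path with 4 random characters in place of the marker.
        System.out.println(
                injectEntropy("gs://bucket/_entropy_/checkpoints", "_entropy_", 4, new Random()));
    }
}
```

Spreading objects over random prefixes like this is meant to avoid concentrating all writes under one key prefix.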



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-33069]Mysql and Postgres catalog support url extra parameters [flink-connector-jdbc]

2023-11-12 Thread via GitHub


waywtdcc commented on PR #74:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/74#issuecomment-1807313296

   > > Because sometimes you need to use ?characterEncoding=utf8 similar url 
parameters to connect to jdbc
   > 
   > But why not just pass these in to the base URL?
   
   The real url is a combination of base-url+default-database, so I cannot form 
the correct url when I add it to base-url.





Re: [PR] [FLINK-33069]Mysql and Postgres catalog support url extra parameters [flink-connector-jdbc]

2023-11-12 Thread via GitHub


waywtdcc commented on PR #74:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/74#issuecomment-1807313452

   > > Because sometimes you need to use ?characterEncoding=utf8 similar url 
parameters to connect to jdbc
   > 
   > But why not just pass these in to the base URL?
   
   The real url is a combination of base-url+default-database, so I cannot form 
the correct url when I add it to base-url. For example, if the base-url is set 
to jdbc:mysql://xxx:53309?characterEncoding=utf8, then the final real url is 
jdbc:mysql://xxx:53309?characterEncoding=utf8/test; this is the wrong url.
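
To make the problem concrete, here is a minimal sketch of the concatenation described above. It assumes the catalog joins base-url and the database name with a single `/`; the class and method names are hypothetical and this is not the connector's actual code.

```java
/** Hypothetical sketch of how a catalog might assemble the JDBC URL of a table. */
final class JdbcUrlSketch {

    /** Joins base URL and database with one '/', then appends the optional extra parameters. */
    static String buildUrl(String baseUrl, String database, String extraUrlParam) {
        String base = baseUrl.endsWith("/") ? baseUrl : baseUrl + "/";
        String params = extraUrlParam == null ? "" : extraUrlParam;
        return base + database + params;
    }

    public static void main(String[] args) {
        // Parameters placed in the base URL end up in front of the database name.
        System.out.println(
                buildUrl("jdbc:mysql://xxx:53309?characterEncoding=utf8", "test", null));
        // Parameters passed separately are appended after the database name.
        System.out.println(
                buildUrl("jdbc:mysql://xxx:53309", "test", "?characterEncoding=utf8"));
    }
}
```

With the parameters kept out of the base URL, they land after the database name, which is where a separate option would be expected to place them.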





Re: [PR] [FLINK-33069]Mysql and Postgres catalog support url extra parameters [flink-connector-jdbc]

2023-11-12 Thread via GitHub


waywtdcc closed pull request #74: [FLINK-33069]Mysql and Postgres catalog 
support url extra parameters
URL: https://github.com/apache/flink-connector-jdbc/pull/74





[PR] [FLINK-33069]Mysql and Postgres catalog support url extra parameters [flink-connector-jdbc]

2023-11-12 Thread via GitHub


waywtdcc opened a new pull request, #74:
URL: https://github.com/apache/flink-connector-jdbc/pull/74

   Mysql and Postgres catalog support url extra parameters
   
   CREATE CATALOG mymysql WITH(
   'type' = 'jdbc',
   'default-database' = 'test',
   'username' = 'root',
   'password' = 'xxx',
   'base-url' = 'jdbc:mysql://xxx:53309',
   'extra-url-param' = '?characterEncoding=utf8'
   );
   
   If used in this way, the URLs of all tables obtained from this catalog are: 
jdbc:mysql://xxx:53309?characterEncoding=utf8





[jira] [Created] (FLINK-33531) Nightly Python fails with NPE at metadataHandlerProvider on AZP

2023-11-12 Thread Sergey Nuyanzin (Jira)
Sergey Nuyanzin created FLINK-33531:
---

 Summary: Nightly Python fails with NPE at metadataHandlerProvider 
on AZP
 Key: FLINK-33531
 URL: https://issues.apache.org/jira/browse/FLINK-33531
 Project: Flink
  Issue Type: Bug
  Components: API / Python
Affects Versions: 1.19.0
Reporter: Sergey Nuyanzin


It seems that since 02.11.2023 every nightly build on master has failed with this 
(which is why it is a blocker).

for instance
[https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=54512&view=logs&j=9cada3cb-c1d3-5621-16da-0f718fb86602&t=c67e71ed-6451-5d26-8920-5a8cf9651901]

{noformat}
2023-11-12T02:10:24.5082784Z Nov 12 02:10:24 if is_error(answer)[0]:
2023-11-12T02:10:24.5083620Z Nov 12 02:10:24 if len(answer) > 1:
2023-11-12T02:10:24.5084326Z Nov 12 02:10:24 type = answer[1]
2023-11-12T02:10:24.5085164Z Nov 12 02:10:24 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
2023-11-12T02:10:24.5086061Z Nov 12 02:10:24 if answer[1] == REFERENCE_TYPE:
2023-11-12T02:10:24.5086850Z Nov 12 02:10:24 > raise Py4JJavaError(
2023-11-12T02:10:24.5087677Z Nov 12 02:10:24 "An error occurred while calling {0}{1}{2}.\n".
2023-11-12T02:10:24.5088538Z Nov 12 02:10:24 format(target_id, ".", name), value)
2023-11-12T02:10:24.5089551Z Nov 12 02:10:24 E py4j.protocol.Py4JJavaError: An error occurred while calling o3371.executeInsert.
2023-11-12T02:10:24.5090832Z Nov 12 02:10:24 E : java.lang.NullPointerException: metadataHandlerProvider
2023-11-12T02:10:24.5091832Z Nov 12 02:10:24 E at java.util.Objects.requireNonNull(Objects.java:228)
2023-11-12T02:10:24.5093399Z Nov 12 02:10:24 E at org.apache.calcite.rel.metadata.RelMetadataQueryBase.getMetadataHandlerProvider(RelMetadataQueryBase.java:122)
2023-11-12T02:10:24.5094480Z Nov 12 02:10:24 E at org.apache.calcite.rel.metadata.RelMetadataQueryBase.revise(RelMetadataQueryBase.java:118)
2023-11-12T02:10:24.5095365Z Nov 12 02:10:24 E at org.apache.calcite.rel.metadata.RelMetadataQuery.getPulledUpPredicates(RelMetadataQuery.java:844)
2023-11-12T02:10:24.5096306Z Nov 12 02:10:24 E at org.apache.calcite.rel.rules.ReduceExpressionsRule$ProjectReduceExpressionsRule.onMatch(ReduceExpressionsRule.java:307)
2023-11-12T02:10:24.5097238Z Nov 12 02:10:24 E at org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:337)
2023-11-12T02:10:24.5098014Z Nov 12 02:10:24 E at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:556)
2023-11-12T02:10:24.5098753Z Nov 12 02:10:24 E at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:420)
2023-11-12T02:10:24.5099517Z Nov 12 02:10:24 E at org.apache.calcite.plan.hep.HepPlanner.executeRuleInstance(HepPlanner.java:243)
2023-11-12T02:10:24.5100373Z Nov 12 02:10:24 E at org.apache.calcite.plan.hep.HepInstruction$RuleInstance$State.execute(HepInstruction.java:178)
2023-11-12T02:10:24.5101313Z Nov 12 02:10:24 E at org.apache.calcite.plan.hep.HepPlanner.lambda$executeProgram$0(HepPlanner.java:211)
2023-11-12T02:10:24.5102410Z Nov 12 02:10:24 E at org.apache.flink.calcite.shaded.com.google.common.collect.ImmutableList.forEach(ImmutableList.java:422)
2023-11-12T02:10:24.5103343Z Nov 12 02:10:24 E at org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:210)
2023-11-12T02:10:24.5104105Z Nov 12 02:10:24 E at org.apache.calcite.plan.hep.HepProgram$State.execute(HepProgram.java:118)
2023-11-12T02:10:24.5104868Z Nov 12 02:10:24 E at org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:205)
2023-11-12T02:10:24.5105616Z Nov 12 02:10:24 E at org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:191)
2023-11-12T02:10:24.5106421Z Nov 12 02:10:24 E at org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:64)
2023-11-12T02:10:24.5107359Z Nov 12 02:10:24 E at org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:78)
2023-11-12T02:10:24.5108346Z Nov 12 02:10:24 E at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:59)
2023-11-12T02:10:24.5109407Z Nov 12 02:10:24 E at scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156)
2023-11-12T02:10:24.5110241Z Nov 12 02:10:24 E at scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156)
2023-11-12T02:10:24.5111078Z Nov 12 02:10:24 E at scala.collection.Iterator.foreach(Iterator.scala:937)
2023-11-12T02:10:24.5111734Z Nov 12 02:10:24 E at scala.collection.Iterator.foreach$(Iterator.scala:937)
2023-11-12T02:10:24.5112410Z Nov 12 02:10:24 E at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
2023-11-12T02:10:24.5113145Z Nov 12 02:10:24 E at scala.collection.IterableLike.foreach(IterableLike.scala:70)
2023-11-

[jira] [Updated] (FLINK-33531) Nightly Python fails with NPE at metadataHandlerProvider on AZP

2023-11-12 Thread Sergey Nuyanzin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33531?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergey Nuyanzin updated FLINK-33531:

Description: 
It seems that since 02.11.2023 every nightly build on master has failed with this 
(which is why it is a blocker).

for instance
[https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=54512&view=logs&j=9cada3cb-c1d3-5621-16da-0f718fb86602&t=c67e71ed-6451-5d26-8920-5a8cf9651901]

{noformat}
2023-11-12T02:10:24.5082784Z Nov 12 02:10:24 if is_error(answer)[0]:
2023-11-12T02:10:24.5083620Z Nov 12 02:10:24 if len(answer) > 1:
2023-11-12T02:10:24.5084326Z Nov 12 02:10:24 type = answer[1]
2023-11-12T02:10:24.5085164Z Nov 12 02:10:24 value = 
OUTPUT_CONVERTER[type](answer[2:], gateway_client)
2023-11-12T02:10:24.5086061Z Nov 12 02:10:24 if answer[1] == 
REFERENCE_TYPE:
2023-11-12T02:10:24.5086850Z Nov 12 02:10:24 >   raise 
Py4JJavaError(
2023-11-12T02:10:24.5087677Z Nov 12 02:10:24 "An error occurred while calling {0}{1}{2}.\n".
2023-11-12T02:10:24.5088538Z Nov 12 02:10:24 format(target_id, ".", name), value)
2023-11-12T02:10:24.5089551Z Nov 12 02:10:24 E   py4j.protocol.Py4JJavaError: An error occurred while calling o3371.executeInsert.
2023-11-12T02:10:24.5090832Z Nov 12 02:10:24 E   : java.lang.NullPointerException: metadataHandlerProvider
2023-11-12T02:10:24.5091832Z Nov 12 02:10:24 E  at java.util.Objects.requireNonNull(Objects.java:228)
2023-11-12T02:10:24.5093399Z Nov 12 02:10:24 E  at org.apache.calcite.rel.metadata.RelMetadataQueryBase.getMetadataHandlerProvider(RelMetadataQueryBase.java:122)
2023-11-12T02:10:24.5094480Z Nov 12 02:10:24 E  at org.apache.calcite.rel.metadata.RelMetadataQueryBase.revise(RelMetadataQueryBase.java:118)
2023-11-12T02:10:24.5095365Z Nov 12 02:10:24 E  at org.apache.calcite.rel.metadata.RelMetadataQuery.getPulledUpPredicates(RelMetadataQuery.java:844)
2023-11-12T02:10:24.5096306Z Nov 12 02:10:24 E  at org.apache.calcite.rel.rules.ReduceExpressionsRule$ProjectReduceExpressionsRule.onMatch(ReduceExpressionsRule.java:307)
2023-11-12T02:10:24.5097238Z Nov 12 02:10:24 E  at org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:337)
2023-11-12T02:10:24.5098014Z Nov 12 02:10:24 E  at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:556)
2023-11-12T02:10:24.5098753Z Nov 12 02:10:24 E  at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:420)
2023-11-12T02:10:24.5099517Z Nov 12 02:10:24 E  at org.apache.calcite.plan.hep.HepPlanner.executeRuleInstance(HepPlanner.java:243)
2023-11-12T02:10:24.5100373Z Nov 12 02:10:24 E  at org.apache.calcite.plan.hep.HepInstruction$RuleInstance$State.execute(HepInstruction.java:178)
2023-11-12T02:10:24.5101313Z Nov 12 02:10:24 E  at org.apache.calcite.plan.hep.HepPlanner.lambda$executeProgram$0(HepPlanner.java:211)
2023-11-12T02:10:24.5102410Z Nov 12 02:10:24 E  at org.apache.flink.calcite.shaded.com.google.common.collect.ImmutableList.forEach(ImmutableList.java:422)
2023-11-12T02:10:24.5103343Z Nov 12 02:10:24 E  at org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:210)
2023-11-12T02:10:24.5104105Z Nov 12 02:10:24 E  at org.apache.calcite.plan.hep.HepProgram$State.execute(HepProgram.java:118)
2023-11-12T02:10:24.5104868Z Nov 12 02:10:24 E  at org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:205)
2023-11-12T02:10:24.5105616Z Nov 12 02:10:24 E  at org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:191)
2023-11-12T02:10:24.5106421Z Nov 12 02:10:24 E  at org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:64)
2023-11-12T02:10:24.5107359Z Nov 12 02:10:24 E  at org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:78)
2023-11-12T02:10:24.5108346Z Nov 12 02:10:24 E  at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:59)
2023-11-12T02:10:24.5109407Z Nov 12 02:10:24 E  at scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156)
2023-11-12T02:10:24.5110241Z Nov 12 02:10:24 E  at scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.

Re: [PR] [FLINK-33340] Upgrade jackson to 2.15.3 [flink-shaded]

2023-11-12 Thread via GitHub


snuyanzin commented on PR #126:
URL: https://github.com/apache/flink-shaded/pull/126#issuecomment-1807326279

   resolved conflicts


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33340] Upgrade jackson to 2.15.3 [flink-shaded]

2023-11-12 Thread via GitHub


snuyanzin merged PR #126:
URL: https://github.com/apache/flink-shaded/pull/126





[jira] [Updated] (FLINK-33340) Bump Jackson to 2.15.3

2023-11-12 Thread Sergey Nuyanzin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33340?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergey Nuyanzin updated FLINK-33340:

Fix Version/s: shaded-18.0

> Bump Jackson to 2.15.3
> --
>
> Key: FLINK-33340
> URL: https://issues.apache.org/jira/browse/FLINK-33340
> Project: Flink
>  Issue Type: Technical Debt
>Reporter: Sergey Nuyanzin
>Assignee: Sergey Nuyanzin
>Priority: Major
>  Labels: pull-request-available
> Fix For: shaded-18.0
>
>
> Among others there is a number of improvements regarding parsing of numbers 
> (jackson-core)
> https://github.com/FasterXML/jackson-core/blob/2.16/release-notes/VERSION-2.x



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33340) Bump Jackson to 2.15.3

2023-11-12 Thread Sergey Nuyanzin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33340?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17785346#comment-17785346
 ] 

Sergey Nuyanzin commented on FLINK-33340:
-

Merged to flink-shaded master as 
[/9fef53a5532d394bc853ba92088d23c98005da98|https://github.com/apache/flink-shaded/commit/9fef53a5532d394bc853ba92088d23c98005da98]

> Bump Jackson to 2.15.3
> --
>
> Key: FLINK-33340
> URL: https://issues.apache.org/jira/browse/FLINK-33340
> Project: Flink
>  Issue Type: Technical Debt
>Reporter: Sergey Nuyanzin
>Assignee: Sergey Nuyanzin
>Priority: Major
>  Labels: pull-request-available
> Fix For: shaded-18.0
>
>
> Among others there is a number of improvements regarding parsing of numbers 
> (jackson-core)
> https://github.com/FasterXML/jackson-core/blob/2.16/release-notes/VERSION-2.x





[jira] [Commented] (FLINK-33397) FLIP-373: Support Configuring Different State TTLs using SQL Hint

2023-11-12 Thread CaoZhen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33397?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17785347#comment-17785347
 ] 

CaoZhen commented on FLINK-33397:
-

Hi [~qingyue], [~xuyangzhong], may I participate in the development?
 

> FLIP-373: Support Configuring Different State TTLs using SQL Hint
> -
>
> Key: FLINK-33397
> URL: https://issues.apache.org/jira/browse/FLINK-33397
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / API
>Affects Versions: 1.19.0
>Reporter: Jane Chan
>Assignee: xuyang
>Priority: Major
> Fix For: 1.19.0
>
>
> Please refer to 
> [https://cwiki.apache.org/confluence/display/FLINK/FLIP-373%3A+Support+Configuring+Different+State+TTLs+using+SQL+Hint
>  
> |https://cwiki.apache.org/confluence/display/FLINK/FLIP-373%3A+Support+Configuring+Different+State+TTLs+using+SQL+Hint]
>  for more details.





Re: [PR] [FLINK-28758] Fix stop-with-savepoint for FlinkKafkaConsumer [flink-connector-kafka]

2023-11-12 Thread via GitHub


ngannt1710 commented on PR #49:
URL: https://github.com/apache/flink-connector-kafka/pull/49#issuecomment-1807353419

   I use FlinkKafkaConsumer with two consumers (2 nodes) and two partitions, 
but both consumers receive the same message regardless of which partition it 
is pushed to.





[jira] [Updated] (FLINK-32380) Support Java records

2023-11-12 Thread xuyang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32380?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

xuyang updated FLINK-32380:
---
Attachment: image-2023-11-13-10-19-06-035.png

> Support Java records
> 
>
> Key: FLINK-32380
> URL: https://issues.apache.org/jira/browse/FLINK-32380
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Type Serialization System
>Reporter: Chesnay Schepler
>Assignee: Gyula Fora
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.19.0
>
> Attachments: image-2023-11-13-10-19-06-035.png
>
>
> Reportedly Java records are not supported, because they are neither detected 
> by our Pojo serializer nor supported by Kryo 2.x





[jira] [Commented] (FLINK-32380) Support Java records

2023-11-12 Thread xuyang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32380?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17785353#comment-17785353
 ] 

xuyang commented on FLINK-32380:


Hi [~gyfora], sorry for the noise. When I rebased onto master, which contains 
this PR, and re-ran the tests, IDEA failed to build because of the missing class 
`PojoToRecordVerifier.PojoAfterUpgrade`. I still use JDK 8, and I think that is 
the root cause. Is there any way to work around it other than upgrading the JDK?  
!image-2023-11-13-10-19-06-035.png|width=1031,height=613!

> Support Java records
> 
>
> Key: FLINK-32380
> URL: https://issues.apache.org/jira/browse/FLINK-32380
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Type Serialization System
>Reporter: Chesnay Schepler
>Assignee: Gyula Fora
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.19.0
>
>
> Reportedly Java records are not supported, because they are neither detected 
> by our Pojo serializer nor supported by Kryo 2.x





[jira] [Comment Edited] (FLINK-32380) Support Java records

2023-11-12 Thread xuyang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32380?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17785353#comment-17785353
 ] 

xuyang edited comment on FLINK-32380 at 11/13/23 2:21 AM:
--

Hi [~gyfora], sorry for the noise. When I rebased onto master, which contains 
this PR, built the whole project with mvn clean/install, and re-ran the tests, 
IDEA failed to build because of the missing class 
`PojoToRecordVerifier.PojoAfterUpgrade`. I still use JDK 8, and I think that is 
the root cause. Is there any way to work around it other than upgrading the JDK?  
!image-2023-11-13-10-19-06-035.png|width=1031,height=613!


was (Author: xuyangzhong):
Hi [~gyfora], sorry for the noise. When I rebased onto master, which contains 
this PR, and re-ran the tests, IDEA failed to build because of the missing class 
`PojoToRecordVerifier.PojoAfterUpgrade`. I still use JDK 8, and I think that is 
the root cause. Is there any way to work around it other than upgrading the JDK?  
!image-2023-11-13-10-19-06-035.png|width=1031,height=613!

> Support Java records
> 
>
> Key: FLINK-32380
> URL: https://issues.apache.org/jira/browse/FLINK-32380
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Type Serialization System
>Reporter: Chesnay Schepler
>Assignee: Gyula Fora
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.19.0
>
> Attachments: image-2023-11-13-10-19-06-035.png
>
>
> Reportedly Java records are not supported, because they are neither detected 
> by our Pojo serializer nor supported by Kryo 2.x





[jira] [Created] (FLINK-33532) Move the serialization of ShuffleDescriptorGroup out of the RPC main thread

2023-11-12 Thread Yangze Guo (Jira)
Yangze Guo created FLINK-33532:
--

 Summary: Move the serialization of ShuffleDescriptorGroup out of 
the RPC main thread
 Key: FLINK-33532
 URL: https://issues.apache.org/jira/browse/FLINK-33532
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Network
Reporter: Yangze Guo
Assignee: Yangze Guo
 Fix For: 1.19.0


Currently, the serialization of ShuffleDescriptorGroup happens in the RPC main 
thread even for light jobs. We propose to move it out of the RPC main thread. 
For heavy jobs, it is still worthwhile to cache the serialized value for memory 
efficiency.
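
A minimal sketch of the idea described above, assuming plain Java serialization and a caller-supplied thread pool. The class and method names (`OffMainThreadSerialization`, `CachedSerializedValue`, `getSerialized`) are hypothetical and are not Flink's actual classes; the sketch only illustrates handing the serialization work to a separate executor and caching the result so it is computed at most once.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical illustration only; these are not Flink classes. */
final class OffMainThreadSerialization {

    /** Serializes a value with plain Java serialization. */
    static byte[] serialize(Serializable value) {
        try (ByteArrayOutputStream bytes = new ByteArrayOutputStream();
                ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Serializes lazily on the given pool and caches the resulting future. */
    static final class CachedSerializedValue {
        private final Serializable value;
        private final ExecutorService serializationPool;
        private CompletableFuture<byte[]> cached; // guarded by "this"

        CachedSerializedValue(Serializable value, ExecutorService serializationPool) {
            this.value = value;
            this.serializationPool = serializationPool;
        }

        /** Returns immediately; the actual work runs on the serialization pool. */
        synchronized CompletableFuture<byte[]> getSerialized() {
            if (cached == null) {
                cached = CompletableFuture.supplyAsync(() -> serialize(value), serializationPool);
            }
            return cached;
        }
    }

    public static void main(String[] args) {
        // The pool size stands in for a configurable option (assumed, not a real Flink key).
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            CachedSerializedValue descriptors =
                    new CachedSerializedValue("shuffle-descriptor-group", pool);
            CompletableFuture<byte[]> first = descriptors.getSerialized();
            CompletableFuture<byte[]> second = descriptors.getSerialized();
            System.out.println("same future reused: " + (first == second));
            System.out.println("serialized bytes: " + first.join().length);
        } finally {
            pool.shutdown();
        }
    }
}
```

The calling thread only creates or looks up the future, so it never blocks on serialization itself; whoever needs the bytes composes on the future or waits for it elsewhere.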





[jira] [Updated] (FLINK-33532) Move the serialization of ShuffleDescriptorGroup out of the RPC main thread

2023-11-12 Thread Yangze Guo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33532?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yangze Guo updated FLINK-33532:
---
Description: Currently, the serialization of ShuffleDescriptorGroup happens in 
the RPC main thread even for light jobs. We propose to move it out of the RPC 
main thread. For heavy jobs, it is still worthwhile to cache the serialized 
value for memory efficiency. At the same time, we need to make the 
serialization thread pool configurable.  (was: Currently, the serialization of 
ShuffleDescriptorGroup happens in the RPC main thread even for light jobs. We 
propose to move it out of the RPC main thread. For heavy jobs, it is still 
worthwhile to cache the serialized value for memory efficiency.)

> Move the serialization of ShuffleDescriptorGroup out of the RPC main thread
> ---
>
> Key: FLINK-33532
> URL: https://issues.apache.org/jira/browse/FLINK-33532
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Network
>Reporter: Yangze Guo
>Assignee: Yangze Guo
>Priority: Major
> Fix For: 1.19.0
>
>
> Currently, the serialization of ShuffleDescriptorGroup happens in the RPC 
> main thread even for light jobs. We propose to move it out of the RPC main 
> thread. For heavy jobs, it is still worthwhile to cache the serialized value 
> for memory efficiency. At the same time, we need to make the serialization 
> thread pool configurable.





[jira] [Assigned] (FLINK-33532) Move the serialization of ShuffleDescriptorGroup out of the RPC main thread

2023-11-12 Thread Yangze Guo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33532?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yangze Guo reassigned FLINK-33532:
--

Assignee: dizhou cao  (was: Yangze Guo)

> Move the serialization of ShuffleDescriptorGroup out of the RPC main thread
> ---
>
> Key: FLINK-33532
> URL: https://issues.apache.org/jira/browse/FLINK-33532
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Network
>Reporter: Yangze Guo
>Assignee: dizhou cao
>Priority: Major
> Fix For: 1.19.0
>
>
> Currently, the serialization of ShuffleDescriptorGroup happens in the RPC 
> main thread even for light jobs. We propose to move it out of the RPC main 
> thread. For heavy jobs, it is still worthwhile to cache the serialized value 
> for memory efficiency. At the same time, we need to make the serialization 
> thread pool configurable.





[jira] [Updated] (FLINK-33532) Move the serialization of ShuffleDescriptorGroup out of the RPC main thread

2023-11-12 Thread Yangze Guo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33532?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yangze Guo updated FLINK-33532:
---
Description: Currently, the serialization of ShuffleDescriptorGroup happens in 
the RPC main thread even for light jobs. That harms the QPS of an OLAP 
session; for example, deserialization takes 3 ms on the TM side for each task 
of a 128*128 wordcount job. We propose to move it out of the RPC main thread. 
For heavy jobs, it is still worthwhile to cache the serialized value for memory 
efficiency. At the same time, we need to make the serialization thread pool 
configurable.  (was: Currently, the serialization of ShuffleDescriptorGroup 
happens in the RPC main thread even for light jobs. We propose to move it out 
of the RPC main thread. For heavy jobs, it is still worthwhile to cache the 
serialized value for memory efficiency. At the same time, we need to make the 
serialization thread pool configurable.)

> Move the serialization of ShuffleDescriptorGroup out of the RPC main thread
> ---
>
> Key: FLINK-33532
> URL: https://issues.apache.org/jira/browse/FLINK-33532
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Network
>Reporter: Yangze Guo
>Assignee: dizhou cao
>Priority: Major
> Fix For: 1.19.0
>
>
> Currently, the serialization of ShuffleDescriptorGroup happens in the RPC 
> main thread even for light jobs. That harms the QPS of an OLAP session; for 
> example, deserialization takes 3 ms on the TM side for each task of a 128*128 
> wordcount job. We propose to move it out of the RPC main thread. For heavy 
> jobs, it is still worthwhile to cache the serialized value for memory 
> efficiency. At the same time, we need to make the serialization thread pool 
> configurable.





Re: [PR] [FLINK-33395][table-planner] fix the join hint doesn't work when appears in subquery [flink]

2023-11-12 Thread via GitHub


xuyangzhong commented on code in PR #23620:
URL: https://github.com/apache/flink/pull/23620#discussion_r1390569239


##
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/alias/ClearJoinHintsWithInvalidPropagationShuttle.java:
##
@@ -48,19 +49,10 @@
  * TODO some node will be attached join hints when parse SqlNode to RelNode 
such as Project and
  * etc. The join hints on these node can also be cleared.
  */
-public class ClearJoinHintWithInvalidPropagationShuttle extends RelShuttleImpl 
{
+public class ClearJoinHintsWithInvalidPropagationShuttle extends 
JoinHintsRelShuttle {

Review Comment:
   That sounds better.






Re: [PR] [FLINK-32650][protobuf]Added the ability to split flink-protobuf code… [flink]

2023-11-12 Thread via GitHub


ljw-hit commented on code in PR #23162:
URL: https://github.com/apache/flink/pull/23162#discussion_r1390343616


##
flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/PbConstant.java:
##
@@ -27,4 +27,10 @@ public class PbConstant {
 public static final String PB_MAP_KEY_NAME = "key";
 public static final String PB_MAP_VALUE_NAME = "value";
 public static final String PB_OUTER_CLASS_SUFFIX = "OuterClass";
+/**
+ * JIT optimizer threshold is 8K, unicode encode one char use 2byte, so 
use 3K as

Review Comment:
   Thank you for your suggestion, I will modify my comment. By the way, if 1 
character corresponds to 1 byte, does this threshold need to be modified?
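
   As a rough illustration of the splitting idea under discussion, here is a minimal sketch, assuming the generated code is available as a list of statement strings and that source length in characters is used only as a proxy for compiled method size. The names `CodeSplitSketch`, `splitByLength`, and `MAX_CHARS_PER_METHOD` are hypothetical and are not taken from the PR. For context, HotSpot by default does not JIT-compile methods whose bytecode exceeds 8000 bytes (`-XX:-DontCompileHugeMethods` lifts that limit), and that limit counts bytecode, not source characters.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical illustration only; not the code from the PR. */
final class CodeSplitSketch {

    // Assumed threshold in source characters per generated method.
    static final int MAX_CHARS_PER_METHOD = 3000;

    /**
     * Groups statements greedily so that each group's total character count
     * stays at or below maxChars. A single statement longer than maxChars
     * still gets a group of its own.
     */
    static List<List<String>> splitByLength(List<String> statements, int maxChars) {
        List<List<String>> groups = new ArrayList<>();
        List<String> current = new ArrayList<>();
        int currentLength = 0;
        for (String statement : statements) {
            if (!current.isEmpty() && currentLength + statement.length() > maxChars) {
                groups.add(current);
                current = new ArrayList<>();
                currentLength = 0;
            }
            current.add(statement);
            currentLength += statement.length();
        }
        if (!current.isEmpty()) {
            groups.add(current);
        }
        return groups;
    }

    public static void main(String[] args) {
        // For ASCII-only source text, the char count equals the UTF-8 byte count.
        String statement = "row.setField(0, value);";
        System.out.println("chars: " + statement.length());
        System.out.println("UTF-8 bytes: " + statement.getBytes(StandardCharsets.UTF_8).length);

        List<String> statements = Arrays.asList("aaaa", "bbbb", "cc");
        System.out.println(splitByLength(statements, 8));
    }
}
```

   Each group would then be emitted as its own generated method. Whether a given character threshold keeps the compiled method under the JIT limit depends on how many bytecodes each statement produces, so the value would need to be validated empirically.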






[jira] [Commented] (FLINK-33365) Missing filter condition in execution plan containing lookup join with mysql jdbc connector

2023-11-12 Thread Benchao Li (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17785355#comment-17785355
 ] 

Benchao Li commented on FLINK-33365:


[~Sergey Nuyanzin] Thanks for digging into this. It is indeed a problem in the 
current JDBC filter pushdown implementation. One way to mitigate it might be 
to also consider the predicates in {{JdbcRowDataLookupFunction}}.

> Missing filter condition in execution plan containing lookup join with mysql 
> jdbc connector
> ---
>
> Key: FLINK-33365
> URL: https://issues.apache.org/jira/browse/FLINK-33365
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / JDBC
>Affects Versions: 1.18.0, 1.17.1
> Environment: Flink 1.17.1 & Flink 1.18.0 with 
> flink-connector-jdbc-3.1.1-1.17.jar
>Reporter: macdoor615
>Assignee: david radley
>Priority: Critical
> Attachments: flink-connector-jdbc-3.0.0-1.16.png, 
> flink-connector-jdbc-3.1.1-1.17.png
>
>
> create table in flink with sql-client.sh
> {code:java}
> CREATE TABLE default_catalog.default_database.a (
>   ip string, 
>   proctime as proctime()
> ) 
> WITH (
>   'connector' = 'datagen'
> );{code}
> create table in mysql
> {code:java}
> create table b (
>   ip varchar(20), 
>   type int
> );  {code}
>  
> Flink 1.17.1/ 1.18.0 and *flink-connector-jdbc-3.1.1-1.17.jar*
> execute in sql-client.sh 
> {code:java}
> explain SELECT * FROM default_catalog.default_database.a left join 
> bnpmp_mysql_test.gem_tmp.b FOR SYSTEM_TIME AS OF a.proctime on b.type = 0 and 
> a.ip = b.ip; {code}
> get the execution plan
> {code:java}
> ...
> == Optimized Execution Plan ==
> Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
> +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], 
> lookup=[ip=ip], select=[ip, proctime, ip, CAST(0 AS INTEGER) AS type, CAST(ip 
> AS VARCHAR(2147483647)) AS ip0])
>    +- Calc(select=[ip, PROCTIME() AS proctime])
>       +- TableSourceScan(table=[[default_catalog, default_database, a]], 
> fields=[ip]){code}
>  
> execute the same SQL in sql-client with Flink 1.17.1/ 1.18.0 and 
> *flink-connector-jdbc-3.0.0-1.16.jar*
> get the execution plan
> {code:java}
> ...
> == Optimized Execution Plan ==
> Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
> +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], 
> lookup=[type=0, ip=ip], where=[(type = 0)], select=[ip, proctime, ip, CAST(0 
> AS INTEGER) AS type, CAST(ip AS VARCHAR(2147483647)) AS ip0])
>    +- Calc(select=[ip, PROCTIME() AS proctime])
>       +- TableSourceScan(table=[[default_catalog, default_database, a]], 
> fields=[ip]) {code}
> with flink-connector-jdbc-3.1.1-1.17.jar,  the condition is 
> *lookup=[ip=ip]*
> with flink-connector-jdbc-3.0.0-1.16.jar ,  the condition is 
> *lookup=[type=0, ip=ip], where=[(type = 0)]*
>  
> In our real-world production environment, this leads to incorrect data output
>  
>  





Re: [PR] [FLINK-33527][autoscaler] Clear all physical states after autoscaler is disabled [flink-kubernetes-operator]

2023-11-12 Thread via GitHub


1996fanrui commented on code in PR #707:
URL: https://github.com/apache/flink-kubernetes-operator/pull/707#discussion_r1390585356


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java:
##
@@ -115,10 +123,39 @@ public void cleanup(KEY jobKey) {
 stateStore.removeInfoFromCache(jobKey);
 }
 
-private void clearParallelismOverrides(Context ctx) throws Exception {
-var parallelismOverrides = stateStore.getParallelismOverrides(ctx);
-if (parallelismOverrides.isPresent()) {
+private void clearStatesAfterAutoscalerDisabled(Context ctx) throws 
Exception {
+var needFlush = false;
+var parallelismOverridesOpt = stateStore.getParallelismOverrides(ctx);
+if (parallelismOverridesOpt.isPresent()) {
+needFlush = true;
 stateStore.removeParallelismOverrides(ctx);
+}
+
+Optional> collectedMetricsOpt =
+stateStore.getCollectedMetrics(ctx);
+if (collectedMetricsOpt.isPresent()) {
+needFlush = true;
+stateStore.removeCollectedMetrics(ctx);
+}
+
+Optional>> 
scalingHistoryOpt =

Review Comment:
   Updated






Re: [PR] [FLINK-33527][autoscaler] Clear all physical states after autoscaler is disabled [flink-kubernetes-operator]

2023-11-12 Thread via GitHub


1996fanrui commented on code in PR #707:
URL: https://github.com/apache/flink-kubernetes-operator/pull/707#discussion_r1390586714


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java:
##
@@ -115,10 +123,39 @@ public void cleanup(KEY jobKey) {
 stateStore.removeInfoFromCache(jobKey);
 }
 
-private void clearParallelismOverrides(Context ctx) throws Exception {
-var parallelismOverrides = stateStore.getParallelismOverrides(ctx);
-if (parallelismOverrides.isPresent()) {
+private void clearStatesAfterAutoscalerDisabled(Context ctx) throws 
Exception {
+var needFlush = false;
+var parallelismOverridesOpt = stateStore.getParallelismOverrides(ctx);
+if (parallelismOverridesOpt.isPresent()) {
+needFlush = true;
 stateStore.removeParallelismOverrides(ctx);
+}
+
+Optional> collectedMetricsOpt =
+stateStore.getCollectedMetrics(ctx);
+if (collectedMetricsOpt.isPresent()) {
+needFlush = true;
+stateStore.removeCollectedMetrics(ctx);
+}
+
+Optional>> 
scalingHistoryOpt =
+stateStore.getScalingHistory(ctx);
+if (scalingHistoryOpt.isPresent()) {
+Map> 
scalingHistory =
+trimScalingHistory(
+clock.instant(), ctx.getConfiguration(), 
scalingHistoryOpt.get());
+if (scalingHistory.isEmpty()) {
+// All scaling histories are trimmed.
+needFlush = true;
+stateStore.removeScalingHistory(ctx);
+} else if (!scalingHistoryOpt.get().equals(scalingHistory)) {
+// Some scaling histories are trimmed.
+needFlush = true;
+stateStore.storeScalingHistory(ctx, scalingHistory);
+}
+}

Review Comment:
   The trimming logic honors the `VERTEX_SCALING_HISTORY_AGE` option, whose 
default value is `24 hours`.
   
   My thought is: if users re-enable the autoscaler within 24 hours, the 
`ScalingHistory` can still be used for subsequent scaling. WDYT?
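
   To make the age-based trimming concrete, here is a minimal sketch, assuming the history is a map keyed by timestamp. `ScalingHistoryTrimSketch` and `trimOlderThan` are hypothetical names and this is not the operator's actual `trimScalingHistory` implementation; it only shows how entries older than a maximum age could be dropped while recent ones are kept.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.SortedMap;
import java.util.TreeMap;

/** Hypothetical illustration only; not the operator's trimming code. */
final class ScalingHistoryTrimSketch {

    /**
     * Returns a copy of the history that contains only entries recorded at or
     * after (now - maxAge). The input map is left untouched.
     */
    static <V> SortedMap<Instant, V> trimOlderThan(
            Instant now, Duration maxAge, SortedMap<Instant, V> history) {
        Instant cutoff = now.minus(maxAge);
        // tailMap(cutoff) is a view of all keys >= cutoff; copy it to detach from the input.
        return new TreeMap<>(history.tailMap(cutoff));
    }

    public static void main(String[] args) {
        Instant now = Instant.parse("2023-11-13T00:00:00Z");
        SortedMap<Instant, String> history = new TreeMap<>();
        history.put(now.minus(Duration.ofHours(30)), "scale 2 -> 4");
        history.put(now.minus(Duration.ofHours(2)), "scale 4 -> 8");

        // Assumed max age of 24 hours, mirroring the default mentioned above.
        SortedMap<Instant, String> trimmed = trimOlderThan(now, Duration.ofHours(24), history);

        if (trimmed.isEmpty()) {
            System.out.println("everything expired: remove the stored history");
        } else if (!trimmed.equals(history)) {
            System.out.println("partially expired: store the trimmed history " + trimmed);
        } else {
            System.out.println("nothing expired: no flush needed");
        }
    }
}
```

   The three branches in `main` correspond to the three cases in the diff above: remove the state when everything is trimmed, store it when only part is trimmed, and leave it alone otherwise.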






Re: [PR] [FLINK-33526] Autoscaler config improvement + cleanup [flink-kubernetes-operator]

2023-11-12 Thread via GitHub


1996fanrui merged PR #708:
URL: https://github.com/apache/flink-kubernetes-operator/pull/708





[jira] [Commented] (FLINK-33526) Improve default autoscaler configs

2023-11-12 Thread Rui Fan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33526?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17785357#comment-17785357
 ] 

Rui Fan commented on FLINK-33526:
-

Merged to master (1.7.0) via cbcc6b67c98ddfad8bd6141edfd1a6e8c2ff00f5

> Improve default autoscaler configs
> --
>
> Key: FLINK-33526
> URL: https://issues.apache.org/jira/browse/FLINK-33526
> Project: Flink
>  Issue Type: Improvement
>  Components: Autoscaler, Kubernetes Operator
>Reporter: Gyula Fora
>Assignee: Gyula Fora
>Priority: Major
>  Labels: pull-request-available
>
> There are a few config defaults that should be improved based on prod usage:
>  * Metric window : 10 -> 15m
>  * Catch up duration: 15 -> 30m
>  * Restart time: 3 -> 5m
>  * Utilisation boundary: 0.4 -> 0.3
> These configs help make the default autoscaler behaviour smoother and less 
> aggressive.





[jira] [Resolved] (FLINK-33526) Improve default autoscaler configs

2023-11-12 Thread Rui Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33526?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rui Fan resolved FLINK-33526.
-
Resolution: Fixed

> Improve default autoscaler configs
> --
>
> Key: FLINK-33526
> URL: https://issues.apache.org/jira/browse/FLINK-33526
> Project: Flink
>  Issue Type: Improvement
>  Components: Autoscaler, Kubernetes Operator
>Reporter: Gyula Fora
>Assignee: Gyula Fora
>Priority: Major
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.7.0
>
>
> There are a few config defaults that should be improved based on prod usage:
>  * Metric window : 10 -> 15m
>  * Catch up duration: 15 -> 30m
>  * Restart time: 3 -> 5m
>  * Utilisation boundary: 0.4 -> 0.3
> These configs help make the default autoscaler behaviour smoother and less 
> aggressive.





[jira] [Updated] (FLINK-33526) Improve default autoscaler configs

2023-11-12 Thread Rui Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33526?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rui Fan updated FLINK-33526:

Fix Version/s: kubernetes-operator-1.7.0

> Improve default autoscaler configs
> --
>
> Key: FLINK-33526
> URL: https://issues.apache.org/jira/browse/FLINK-33526
> Project: Flink
>  Issue Type: Improvement
>  Components: Autoscaler, Kubernetes Operator
>Reporter: Gyula Fora
>Assignee: Gyula Fora
>Priority: Major
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.7.0
>
>
> There are a few config defaults that should be improved based on prod usage:
>  * Metric window : 10 -> 15m
>  * Catch up duration: 15 -> 30m
>  * Restart time: 3 -> 5m
>  * Utilisation boundary: 0.4 -> 0.3
> These configs help make the default autoscaler behaviour smoother and less 
> aggressive.





Re: [PR] [FLINK-33445][docs-zh] Translate DataSet migration guideline to Chinese [flink]

2023-11-12 Thread via GitHub


WencongLiu commented on code in PR #23666:
URL: https://github.com/apache/flink/pull/23666#discussion_r1390574350


##
docs/content.zh/docs/dev/datastream/dataset_migration.md:
##
@@ -134,10 +131,9 @@ DataStreamSource<> source = 
StreamExecutionEnvironment.createInput(inputFormat)
 
 
 
-### Sinks
+### 写

Review Comment:
   Keep it as "Sinks".



##
docs/content.zh/docs/dev/datastream/dataset_migration.md:
##
@@ -62,44 +58,45 @@ The first step of migrating an application from DataSet API 
to DataStream API is
 
 
 {{< highlight "java" >}}
-// Create the execution environment
+// 创建执行环境
 ExecutionEnvironment.getExecutionEnvironment();
-// Create the local execution environment
+// 创建本地执行环境
 ExecutionEnvironment.createLocalEnvironment();
-// Create the collection environment
+// 创建 collection 环境
 new CollectionEnvironment();
-// Create the remote environment
+// 创建远程执行环境
 ExecutionEnvironment.createRemoteEnvironment(String host, int port, String... 
jarFiles);
 {{< /highlight >}}
 
 
 {{< highlight "java" >}}
-// Create the execution environment
+// 创建执行环境
 StreamExecutionEnvironment.getExecutionEnvironment();
-// Create the local execution environment
+// 创建本地执行环境
 StreamExecutionEnvironment.createLocalEnvironment();
-// The collection environment is not supported.
-// Create the remote environment
+// 不支持 collection 环境
+// 创建远程执行环境
 StreamExecutionEnvironment.createRemoteEnvironment(String host, int port, 
String... jarFiles);
 {{< /highlight >}}
 
 
 
 
 
-Unlike DataSet, DataStream supports processing on both bounded and unbounded 
data streams. Thus, user needs to explicitly set the execution mode
-to `RuntimeExecutionMode.BATCH` if that is expected.
+与 DataSet 不同,DataStream 支持对有界和无界数据流进行处理。
+
+如果需要的话,用户可以显式地将执行模式设置为 `RuntimeExecutionMode.BATCH`。
 
 ```java
 StreamExecutionEnvironment executionEnvironment = // [...];
 executionEnvironment.setRuntimeMode(RuntimeExecutionMode.BATCH);
 ```
 
-## Using the streaming sources and sinks
+## 流读和流写

Review Comment:
   I think a better expression is “设置streaming类型的Source和Sink”.



##
docs/content.zh/docs/dev/datastream/dataset_migration.md:
##
@@ -62,44 +58,45 @@ The first step of migrating an application from DataSet API 
to DataStream API is
 
 
 {{< highlight "java" >}}
-// Create the execution environment
+// 创建执行环境
 ExecutionEnvironment.getExecutionEnvironment();
-// Create the local execution environment
+// 创建本地执行环境
 ExecutionEnvironment.createLocalEnvironment();
-// Create the collection environment
+// 创建 collection 环境
 new CollectionEnvironment();
-// Create the remote environment
+// 创建远程执行环境
 ExecutionEnvironment.createRemoteEnvironment(String host, int port, String... 
jarFiles);
 {{< /highlight >}}
 
 
 {{< highlight "java" >}}
-// Create the execution environment
+// 创建执行环境
 StreamExecutionEnvironment.getExecutionEnvironment();
-// Create the local execution environment
+// 创建本地执行环境
 StreamExecutionEnvironment.createLocalEnvironment();
-// The collection environment is not supported.
-// Create the remote environment
+// 不支持 collection 环境
+// 创建远程执行环境
 StreamExecutionEnvironment.createRemoteEnvironment(String host, int port, 
String... jarFiles);
 {{< /highlight >}}
 
 
 
 
 
-Unlike DataSet, DataStream supports processing on both bounded and unbounded 
data streams. Thus, user needs to explicitly set the execution mode
-to `RuntimeExecutionMode.BATCH` if that is expected.
+与 DataSet 不同,DataStream 支持对有界和无界数据流进行处理。
+
+如果需要的话,用户可以显式地将执行模式设置为 `RuntimeExecutionMode.BATCH`。
 
 ```java
 StreamExecutionEnvironment executionEnvironment = // [...];
 executionEnvironment.setRuntimeMode(RuntimeExecutionMode.BATCH);
 ```
 
-## Using the streaming sources and sinks
+## 流读和流写
 
-### Sources
+### 读

Review Comment:
   Keep it as "Sources".



##
docs/content.zh/docs/dev/datastream/dataset_migration.md:
##
@@ -62,44 +58,45 @@ The first step of migrating an application from DataSet API 
to DataStream API is
 
 
 {{< highlight "java" >}}
-// Create the execution environment
+// 创建执行环境
 ExecutionEnvironment.getExecutionEnvironment();
-// Create the local execution environment
+// 创建本地执行环境
 ExecutionEnvironment.createLocalEnvironment();
-// Create the collection environment
+// 创建 collection 环境
 new CollectionEnvironment();
-// Create the remote environment
+// 创建远程执行环境
 ExecutionEnvironment.createRemoteEnvironment(String host, int port, String... 
jarFiles);
 {{< /highlight >}}
 
 
 {{< highlight "java" >}}
-// Create the execution environment
+// 创建执行环境
 StreamExecutionEnvironment.getExecutionEnvironment();
-// Create the local execution environmen

Re: [PR] [FLINK-32650][protobuf]Added the ability to split flink-protobuf code… [flink]

2023-11-12 Thread via GitHub


ljw-hit commented on PR #23162:
URL: https://github.com/apache/flink/pull/23162#issuecomment-1807421613

   @libenchao Thank you very much for your code review. I learned a lot from 
this review and I have addressed all the comments. Please review again when 
you have time.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33449][table]Support array_contains_seq function [flink]

2023-11-12 Thread via GitHub


leoyy0316 commented on PR #23656:
URL: https://github.com/apache/flink/pull/23656#issuecomment-1807423595

   > > I have a question: why do we always follow Hive and Spark? If we think a 
feature is worth supporting, we can implement it first. I think having a rich 
set of functions is better.
   > 
   > Because we want to make sure that we have a consistent set of functions 
available, with function names that are consistent. If we pick random function 
names from different tools, that becomes more problematic. It might be that 
Calcite or any of the other tech solves the same problem but with a different 
function and/or different function signature. That's why we always try to look 
at what others are also doing. Then again, this discussion should have happened 
before a PR was opened. Either in the Jira, or on the Dev mailing list. See 
https://flink.apache.org/how-to-contribute/contribute-code/#1-create-jira-ticket-and-reach-consensus
   
   Yes, that makes sense. Trino, ClickHouse, and StarRocks have the same 
function. If the Flink team thinks this function does not need to be supported 
now, please close this Jira ticket and PR. Thanks.





Re: [PR] [FLINK-33527][autoscaler] Clear all physical states after autoscaler is disabled [flink-kubernetes-operator]

2023-11-12 Thread via GitHub


gyfora commented on code in PR #707:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/707#discussion_r1390598097


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java:
##
@@ -115,10 +123,39 @@ public void cleanup(KEY jobKey) {
 stateStore.removeInfoFromCache(jobKey);
 }
 
-private void clearParallelismOverrides(Context ctx) throws Exception {
-var parallelismOverrides = stateStore.getParallelismOverrides(ctx);
-if (parallelismOverrides.isPresent()) {
+private void clearStatesAfterAutoscalerDisabled(Context ctx) throws 
Exception {
+var needFlush = false;
+var parallelismOverridesOpt = stateStore.getParallelismOverrides(ctx);
+if (parallelismOverridesOpt.isPresent()) {
+needFlush = true;
 stateStore.removeParallelismOverrides(ctx);
+}
+
+Optional> collectedMetricsOpt =
+stateStore.getCollectedMetrics(ctx);
+if (collectedMetricsOpt.isPresent()) {
+needFlush = true;
+stateStore.removeCollectedMetrics(ctx);
+}
+
+Optional>> 
scalingHistoryOpt =
+stateStore.getScalingHistory(ctx);
+if (scalingHistoryOpt.isPresent()) {
+Map> 
scalingHistory =
+trimScalingHistory(
+clock.instant(), ctx.getConfiguration(), 
scalingHistoryOpt.get());
+if (scalingHistory.isEmpty()) {
+// All scaling histories are trimmed.
+needFlush = true;
+stateStore.removeScalingHistory(ctx);
+} else if (!scalingHistoryOpt.get().equals(scalingHistory)) {
+// Some scaling histories are trimmed.
+needFlush = true;
+stateStore.storeScalingHistory(ctx, scalingHistory);
+}
+}

Review Comment:
   I see, makes sense






Re: [PR] [FLINK-33527][autoscaler] Clear all physical states after autoscaler is disabled [flink-kubernetes-operator]

2023-11-12 Thread via GitHub


gyfora merged PR #707:
URL: https://github.com/apache/flink-kubernetes-operator/pull/707





[jira] [Closed] (FLINK-33527) Clear all physical states after autoscaler is disabled

2023-11-12 Thread Gyula Fora (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33527?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gyula Fora closed FLINK-33527.
--
Resolution: Fixed

merged to main bf5bb94bc87295e5ecb8ace8f579a22dca440e2d

> Clear all physical states after autoscaler is disabled
> --
>
> Key: FLINK-33527
> URL: https://issues.apache.org/jira/browse/FLINK-33527
> Project: Flink
>  Issue Type: Improvement
>  Components: Autoscaler
>Reporter: Rui Fan
>Assignee: Rui Fan
>Priority: Major
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.7.0
>
>
> Currently, we only clear ParallelismOverrides after the autoscaler is disabled.
> We should clear CollectedMetrics and ScalingHistory as well to prevent a state 
> leak.
>  * CollectedMetrics can be cleared directly.
>  * ScalingHistory can be cleared based on the trim logic 
> (VERTEX_SCALING_HISTORY_AGE).
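
The trim-then-decide behavior described above can be sketched roughly as follows. This is a minimal illustration under stated assumptions, not the operator's actual code: the class `ScalingHistoryTrimSketch`, the methods `trim` and `decide`, the `Action` enum, and the use of plain `String` vertex IDs are all hypothetical names chosen for the example.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

/** Hypothetical sketch; names and types are assumptions, not the autoscaler API. */
class ScalingHistoryTrimSketch {

    /** What to do with the stored history after trimming. */
    enum Action { REMOVE, STORE, KEEP }

    /** Drops entries older than maxAge; vertices left without entries are omitted. */
    public static <V> Map<String, SortedMap<Instant, V>> trim(
            Instant now, Duration maxAge, Map<String, SortedMap<Instant, V>> history) {
        Instant cutoff = now.minus(maxAge);
        Map<String, SortedMap<Instant, V>> result = new HashMap<>();
        history.forEach((vertex, entries) -> {
            // tailMap keeps the entries whose timestamp is >= cutoff
            SortedMap<Instant, V> kept = new TreeMap<>(entries.tailMap(cutoff));
            if (!kept.isEmpty()) {
                result.put(vertex, kept);
            }
        });
        return result;
    }

    /** Everything trimmed: remove the state. Partly trimmed: store it. Unchanged: do nothing. */
    public static <V> Action decide(
            Map<String, SortedMap<Instant, V>> original,
            Map<String, SortedMap<Instant, V>> trimmed) {
        if (trimmed.isEmpty()) {
            return Action.REMOVE;
        }
        return original.equals(trimmed) ? Action.KEEP : Action.STORE;
    }
}
```

Only the `REMOVE` and `STORE` outcomes would require a flush of the state store, which is why the unchanged case is kept separate.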



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-32650][protobuf]Added the ability to split flink-protobuf code… [flink]

2023-11-12 Thread via GitHub


libenchao commented on code in PR #23162:
URL: https://github.com/apache/flink/pull/23162#discussion_r1390600656


##
flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/serialize/RowToProtoConverter.java:
##
@@ -84,11 +85,18 @@ public RowToProtoConverter(RowType rowType, PbFormatConfig 
formatConfig)
 PbCodegenSerializer codegenSer =
 PbCodegenSerializeFactory.getPbCodegenTopRowSer(
 descriptor, rowType, formatContext);
+LOG.info("Fast-pb generate split serialize code");

Review Comment:
   remove unnecessary log.



##
flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/serialize/RowToProtoConverter.java:
##
@@ -109,4 +117,8 @@ public byte[] convertRowToProtoBinary(RowData rowData) 
throws Exception {
 AbstractMessage message = (AbstractMessage) encodeMethod.invoke(null, 
rowData);
 return message.toByteArray();
 }
+
+public boolean isCodeSplit() {

Review Comment:
   for testing.



##
flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/ProtoToRowConverter.java:
##
@@ -104,10 +106,19 @@ public ProtoToRowConverter(RowType rowType, 
PbFormatConfig formatConfig)
 PbCodegenDeserializer codegenDes =
 PbCodegenDeserializeFactory.getPbCodegenTopRowDes(
 descriptor, rowType, pbFormatContext);
-String genCode = codegenDes.codegen("rowData", "message", 0);
+// if codgen generate code size over threshod then split the code
+PbCodeSplitter pbCodeSplitter = new PbCodeSplitter();
+LOG.info("Fast-pb generate split deserialize code");

Review Comment:
   ping, it seems you are missing this one.



##
flink-formats/flink-protobuf/src/test/proto/test_big_pb.proto:
##
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+syntax = "proto3";
+
+package org.apache.flink.formats.protobuf.testproto;
+
+option java_package = "org.apache.flink.formats.protobuf.testproto";
+option java_outer_classname = "BigPbClass";
+
+message BigPbMessage {
+  int32 int_field_1 = 1;
+  bool bool_field_2 = 2;
+  string string_field_3 = 3;
+  bytes bytes_field_4 = 4;
+  double double_field_5 = 5;
+  float float_field_6 = 6;
+  uint32 uint32_field_7 = 7;
+  int64 int64_field_8 = 8;
+  uint64 uint64_field_9 = 9;
+  bytes bytes_field_10 = 10;
+  double double_field_11 = 11;
+  bytes bytes_field_12 = 12;
+  bool bool_field_13 = 13;
+  string string_field_14 = 14;
+  float float_field_15 = 15;
+  int32 int32_field_16 = 16;
+  bytes bytes_field_17 = 17;
+  bool bool_field_18 = 18;
+  string string_field_19 = 19;
+  float float_field_20 = 20;
+  fixed32 fixed32_field_21 = 21;
+  fixed64 fixed64_field_22 = 22;
+  sfixed32 sfixed32_field_23 = 23;
+  sfixed64 sfixed64_field_24 = 24;
+  double double_field_25 = 25;
+  uint32 uint32_field_26 = 26;
+  uint64 uint64_field_27 = 27;
+  bool bool_field_28 = 28;
+  repeated string field_29 = 29;
+  float float_field_30 = 30;
+  string string_field_31 = 31;
+
+  map map_field_32 = 32;
+  map map_field_33 = 33;
+}

Review Comment:
   Always put a `\n` at the end of the final line, then it will not complain 
about "No newline at end of file".






[jira] [Created] (FLINK-33533) Add documentation about performance for huge ProtoBuf definations

2023-11-12 Thread Benchao Li (Jira)
Benchao Li created FLINK-33533:
--

 Summary: Add documentation about performance for huge ProtoBuf 
definations
 Key: FLINK-33533
 URL: https://issues.apache.org/jira/browse/FLINK-33533
 Project: Flink
  Issue Type: Bug
  Components: Documentation, Formats (JSON, Avro, Parquet, ORC, 
SequenceFile)
Reporter: Benchao Li








[jira] [Commented] (FLINK-32380) Support Java records

2023-11-12 Thread Gyula Fora (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32380?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17785361#comment-17785361
 ] 

Gyula Fora commented on FLINK-32380:


Technically speaking we exclude these using the compiler plugin, but 
unfortunately IntelliJ doesn't respect this:
[https://youtrack.jetbrains.com/issue/IDEA-87868]

you could check 
[https://stackoverflow.com/questions/14792798/how-to-make-intellijidea-ignore-work-in-progress-class-files]
 on how to exclude individual classes. 

Or simply bump the java version for development

> Support Java records
> 
>
> Key: FLINK-32380
> URL: https://issues.apache.org/jira/browse/FLINK-32380
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Type Serialization System
>Reporter: Chesnay Schepler
>Assignee: Gyula Fora
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.19.0
>
> Attachments: image-2023-11-13-10-19-06-035.png
>
>
> Reportedly Java records are not supported, because they are neither detected 
> by our Pojo serializer nor supported by Kryo 2.x





Re: [PR] [FLINK-32650][protobuf]Added the ability to split flink-protobuf code… [flink]

2023-11-12 Thread via GitHub


ljw-hit commented on PR #23162:
URL: https://github.com/apache/flink/pull/23162#issuecomment-1807478539

   @libenchao Thank you for your detailed review. I have addressed these 
comments.





[jira] [Updated] (FLINK-33533) Add documentation about performance for huge ProtoBuf definations

2023-11-12 Thread Benchao Li (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33533?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Benchao Li updated FLINK-33533:
---
Description: 
This was brought up in the discussion of FLINK-32650. For huge ProtoBuf 
definitions, the generated Java code would hit the JIT C2 optimization 
threshold. To mitigate this problem temporarily, users can add 
{{-XX:-DontCompileHugeMethods}} to JVM args manually.

See related issues in protobuf-java project[1][2]

[1] https://github.com/protocolbuffers/protobuf/pull/10367
[2] https://github.com/protocolbuffers/protobuf/issues/10247
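
To confirm that the mitigation above actually reached a running JVM, a small check like the following could be used. It is a sketch only: the class name `HugeMethodFlagCheck` and the helper `hasFlag` are hypothetical, and how the flag is passed to Flink processes (for example through an `env.java.opts.*` configuration option) is an assumption that should be checked against the configuration reference for the Flink version in use.

```java
import java.lang.management.ManagementFactory;
import java.util.List;

/** Hypothetical helper; not part of Flink. */
class HugeMethodFlagCheck {

    /** HotSpot flag that lets the JIT compile methods above the huge-method limit. */
    static final String FLAG = "-XX:-DontCompileHugeMethods";

    /** Returns true if the given JVM argument list contains the flag. */
    public static boolean hasFlag(List<String> jvmArgs) {
        return jvmArgs.contains(FLAG);
    }

    public static void main(String[] args) {
        // Arguments the current JVM was started with (excluding main-method arguments)
        List<String> jvmArgs = ManagementFactory.getRuntimeMXBean().getInputArguments();
        System.out.println(FLAG + " set: " + hasFlag(jvmArgs));
    }
}
```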

> Add documentation about performance for huge ProtoBuf definations
> -
>
> Key: FLINK-33533
> URL: https://issues.apache.org/jira/browse/FLINK-33533
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation, Formats (JSON, Avro, Parquet, ORC, 
> SequenceFile)
>Reporter: Benchao Li
>Priority: Minor
>
> This was brought up in the discussion of FLINK-32650. For huge ProtoBuf 
> definitions, the generated Java code would hit the JIT C2 optimization 
> threshold. To mitigate this problem temporarily, users can add 
> {{-XX:-DontCompileHugeMethods}} to JVM args manually.
> See related issues in protobuf-java project[1][2]
> [1] https://github.com/protocolbuffers/protobuf/pull/10367
> [2] https://github.com/protocolbuffers/protobuf/issues/10247





[jira] [Commented] (FLINK-32650) Added the ability to split flink-protobuf codegen code

2023-11-12 Thread Benchao Li (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17785365#comment-17785365
 ] 

Benchao Li commented on FLINK-32650:


I've created another ticket to document the current restriction, see 
FLINK-33533. Feel free to pick it up if you are interested.

> Added the ability to split flink-protobuf codegen code
> --
>
> Key: FLINK-32650
> URL: https://issues.apache.org/jira/browse/FLINK-32650
> Project: Flink
>  Issue Type: Improvement
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Affects Versions: 1.17.1
>Reporter: JingWei Li
>Assignee: JingWei Li
>Priority: Major
>  Labels: pull-request-available
>
> h3. Background
> Flink serializes and deserializes protobuf format data by calling the decode 
> or encode method in GeneratedProtoToRow_XXX.java generated by codegen to 
> parse byte[] data into protobuf Java objects. The size of the decode/encode 
> codegen method body is strongly related to the number of fields defined in 
> the protobuf message. When the number of fields exceeds a certain threshold 
> and the compiled method body exceeds 8k, the decode/encode method will not be 
> optimized by JIT, seriously affecting serialization or deserialization 
> performance. If the compiled method body exceeds 64k, the task fails to 
> start at all.
> h3. Solution
> So I propose a codegen splitter for protobuf parsing that splits the 
> encode/decode method to solve this problem.
> The specific idea is as follows. In the current decode/encode method, the 
> code for every field defined in the protobuf message is placed in one method 
> body. In fact, there are no shared parameters between the fields, so the 
> code for several fields can be grouped and moved into a separate split 
> method. If the number of strings in the current method body exceeds the 
> threshold, a split method is generated, these fields are parsed in the split 
> method, and the split method is called from the decode/encode method. 
> Repeating this, the final decode/encode method consists of calls to the split 
> methods.
> Code example after splitting:
>  
> {code:java}
> // code placeholder
> public static RowData decode(
>         org.apache.flink.formats.protobuf.testproto.AdProfile.AdProfilePb message) {
>     RowData rowData = null;
>     org.apache.flink.formats.protobuf.testproto.AdProfile.AdProfilePb message1242 = message;
>     GenericRowData rowData1242 = new GenericRowData(5);
>     // All field handling has been moved into the generated split method
>     split2585(rowData1242, message1242);
>     rowData = rowData1242;
>     return rowData;
> }
> {code}
>  
>  
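
The splitting idea in the description above can be sketched in a few lines. This is an illustrative sketch under assumptions, not the `PbCodeSplitter` from the PR: the class `SplitSketch`, the methods `chunk` and `render`, the character-count threshold, and the generated method names (`split0`, `split1`, ...) are all hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of splitting generated per-field code into smaller methods. */
class SplitSketch {

    /**
     * Groups per-field code snippets into chunks whose total length stays within
     * maxChars. A single snippet longer than maxChars gets a chunk of its own.
     */
    public static List<List<String>> chunk(List<String> fieldSnippets, int maxChars) {
        List<List<String>> chunks = new ArrayList<>();
        List<String> current = new ArrayList<>();
        int size = 0;
        for (String snippet : fieldSnippets) {
            if (!current.isEmpty() && size + snippet.length() > maxChars) {
                chunks.add(current);
                current = new ArrayList<>();
                size = 0;
            }
            current.add(snippet);
            size += snippet.length();
        }
        if (!current.isEmpty()) {
            chunks.add(current);
        }
        return chunks;
    }

    /** Renders a decode method that only calls split methods, followed by those methods. */
    public static String render(List<String> fieldSnippets, int maxChars) {
        StringBuilder calls = new StringBuilder();
        StringBuilder splitMethods = new StringBuilder();
        int id = 0;
        for (List<String> group : chunk(fieldSnippets, maxChars)) {
            String name = "split" + id++;
            calls.append("    ").append(name).append("(rowData, message);\n");
            splitMethods.append("private static void ").append(name)
                    .append("(GenericRowData rowData, Message message) {\n");
            for (String snippet : group) {
                splitMethods.append("    ").append(snippet).append('\n');
            }
            splitMethods.append("}\n");
        }
        return "public static RowData decode(Message message) {\n"
                + "    GenericRowData rowData = new GenericRowData(" + fieldSnippets.size() + ");\n"
                + calls
                + "    return rowData;\n"
                + "}\n"
                + splitMethods;
    }
}
```

Because the fields share no state other than the row and the message, each split method only needs those two parameters, which keeps every generated method body small enough for the JIT to compile.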





[jira] [Updated] (FLINK-33533) Add documentation about performance for huge ProtoBuf definations

2023-11-12 Thread Benchao Li (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33533?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Benchao Li updated FLINK-33533:
---
Issue Type: Improvement  (was: Bug)

> Add documentation about performance for huge ProtoBuf definations
> -
>
> Key: FLINK-33533
> URL: https://issues.apache.org/jira/browse/FLINK-33533
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation, Formats (JSON, Avro, Parquet, ORC, 
> SequenceFile)
>Reporter: Benchao Li
>Priority: Minor
>
> This was brought up in the discussion of FLINK-32650. For huge ProtoBuf 
> definitions, the generated Java code would hit the JIT C2 optimization 
> threshold. To mitigate this problem temporarily, users can add 
> {{-XX:-DontCompileHugeMethods}} to JVM args manually.
> See related issues in protobuf-java project[1][2]
> [1] https://github.com/protocolbuffers/protobuf/pull/10367
> [2] https://github.com/protocolbuffers/protobuf/issues/10247





[jira] [Created] (FLINK-33534) PipelineOptions.PARALLELISM_OVERRIDES is not picked up from jar submission request

2023-11-12 Thread Gyula Fora (Jira)
Gyula Fora created FLINK-33534:
--

 Summary: PipelineOptions.PARALLELISM_OVERRIDES is not picked up 
from jar submission request
 Key: FLINK-33534
 URL: https://issues.apache.org/jira/browse/FLINK-33534
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Configuration, Runtime / REST
Affects Versions: 1.17.1, 1.18.0
Reporter: Gyula Fora


PARALLELISM_OVERRIDES are currently only applied when they are part of the 
JobManager / Cluster configuration.

When this config is provided as part of the JarRunRequestBody it is completely 
ignored and does not take effect. 

The main reason is that the dispatcher reads this value from its own 
configuration object and does not include the extra configs passed through the 
REST request.

This is a blocker for properly supporting FlinkSessionJobs in the autoscaler.
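
The expected lookup order can be illustrated with plain maps. This is a sketch of the intended behavior only, not the dispatcher's code: the class `OverrideLookupSketch` and the method `effectiveOverrides` are hypothetical, and the key string is assumed to be the one behind `PipelineOptions.PARALLELISM_OVERRIDES`.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch; real Flink code works on Configuration objects, not maps. */
class OverrideLookupSketch {

    /** Assumed config key for PipelineOptions.PARALLELISM_OVERRIDES. */
    static final String KEY = "pipeline.jobvertex-parallelism-overrides";

    /** Request-level settings take precedence over the cluster configuration. */
    public static String effectiveOverrides(
            Map<String, String> clusterConf, Map<String, String> requestConf) {
        Map<String, String> merged = new HashMap<>(clusterConf);
        merged.putAll(requestConf);
        return merged.get(KEY);
    }
}
```

With a lookup like this, a value sent in the JarRunRequestBody would win over the cluster default, and the cluster value would still apply when the request does not set the key.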





[jira] [Updated] (FLINK-33534) PipelineOptions.PARALLELISM_OVERRIDES is not picked up from jar submission request

2023-11-12 Thread Gyula Fora (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33534?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gyula Fora updated FLINK-33534:
---
Issue Type: Bug  (was: Improvement)

> PipelineOptions.PARALLELISM_OVERRIDES is not picked up from jar submission 
> request
> --
>
> Key: FLINK-33534
> URL: https://issues.apache.org/jira/browse/FLINK-33534
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Configuration, Runtime / REST
>Affects Versions: 1.18.0, 1.17.1
>Reporter: Gyula Fora
>Priority: Major
>
> PARALLELISM_OVERRIDES are currently only applied when they are part of the 
> JobManager / Cluster configuration.
> When this config is provided as part of the JarRunRequestBody it is 
> completely ignored and does not take effect. 
> The main reason is that the dispatcher reads this value from its own 
> configuration object and does not include the extra configs passed through 
> the REST request.
> This is a blocker for properly supporting FlinkSessionJobs in the autoscaler.





[jira] [Commented] (FLINK-33534) PipelineOptions.PARALLELISM_OVERRIDES is not picked up from jar submission request

2023-11-12 Thread Gyula Fora (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33534?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17785367#comment-17785367
 ] 

Gyula Fora commented on FLINK-33534:


cc [~mxm] [~fanrui] 

> PipelineOptions.PARALLELISM_OVERRIDES is not picked up from jar submission 
> request
> --
>
> Key: FLINK-33534
> URL: https://issues.apache.org/jira/browse/FLINK-33534
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Configuration, Runtime / REST
>Affects Versions: 1.18.0, 1.17.1
>Reporter: Gyula Fora
>Priority: Major
>
> PARALLELISM_OVERRIDES are currently only applied when they are part of the 
> JobManager / Cluster configuration.
> When this config is provided as part of the JarRunRequestBody it is 
> completely ignored and does not take effect. 
> The main reason is that the dispatcher reads this value from its own 
> configuration object and does not include the extra configs passed through 
> the REST request.
> This is a blocker for properly supporting FlinkSessionJobs in the autoscaler.





Re: [PR] [FLINK-33148] Update Web UI to adopt the new "endpoint" field in REST API [flink]

2023-11-12 Thread via GitHub


X-czh commented on PR #23698:
URL: https://github.com/apache/flink/pull/23698#issuecomment-1807522914

   @huwh Could you help review this when you are free? Many thanks~





Re: [PR] [FLINK-33445][docs-zh] Translate DataSet migration guideline to Chinese [flink]

2023-11-12 Thread via GitHub


liyubin117 commented on code in PR #23666:
URL: https://github.com/apache/flink/pull/23666#discussion_r1390667818


##
docs/content.zh/docs/dev/datastream/dataset_migration.md:
##
@@ -62,44 +58,45 @@ The first step of migrating an application from DataSet API 
to DataStream API is
 
 
 {{< highlight "java" >}}
-// Create the execution environment
+// 创建执行环境
 ExecutionEnvironment.getExecutionEnvironment();
-// Create the local execution environment
+// 创建本地执行环境
 ExecutionEnvironment.createLocalEnvironment();
-// Create the collection environment
+// 创建 collection 环境
 new CollectionEnvironment();
-// Create the remote environment
+// 创建远程执行环境
 ExecutionEnvironment.createRemoteEnvironment(String host, int port, String... 
jarFiles);
 {{< /highlight >}}
 
 
 {{< highlight "java" >}}
-// Create the execution environment
+// 创建执行环境
 StreamExecutionEnvironment.getExecutionEnvironment();
-// Create the local execution environment
+// 创建本地执行环境
 StreamExecutionEnvironment.createLocalEnvironment();
-// The collection environment is not supported.
-// Create the remote environment
+// 不支持 collection 环境
+// 创建远程执行环境
 StreamExecutionEnvironment.createRemoteEnvironment(String host, int port, 
String... jarFiles);
 {{< /highlight >}}
 
 
 
 
 
-Unlike DataSet, DataStream supports processing on both bounded and unbounded 
data streams. Thus, user needs to explicitly set the execution mode
-to `RuntimeExecutionMode.BATCH` if that is expected.
+与 DataSet 不同,DataStream 支持对有界和无界数据流进行处理。
+
+如果需要的话,用户可以显式地将执行模式设置为 `RuntimeExecutionMode.BATCH`。
 
 ```java
 StreamExecutionEnvironment executionEnvironment = // [...];
 executionEnvironment.setRuntimeMode(RuntimeExecutionMode.BATCH);
 ```
 
-## Using the streaming sources and sinks
+## 流读和流写

Review Comment:
   more clear indeed :)






Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]

2023-11-12 Thread via GitHub


KarmaGYZ commented on code in PR #23635:
URL: https://github.com/apache/flink/pull/23635#discussion_r1390593913


##
flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotSharingGroup.java:
##
@@ -76,6 +76,6 @@ public ResourceProfile getResourceProfile() {
 
 @Override
 public String toString() {
-return "SlotSharingGroup " + this.ids.toString();
+return "SlotSharingGroup{" + "ids=" + ids + ", resourceProfile=" + 
resourceProfile + '}';

Review Comment:
   This should belong to a separate hotfix commit.



##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionSlotSharingGroup.java:
##
@@ -31,22 +35,26 @@ class ExecutionSlotSharingGroup {
 
 private final Set executionVertexIds;
 
-private ResourceProfile resourceProfile = ResourceProfile.UNKNOWN;
+@Nonnull private final SlotSharingGroup slotSharingGroup;

Review Comment:
   IIUC, we change it to `SlotSharingGroup` only for test? I don't think that's 
a good practice.



##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/TaskBalancedPreferredSlotSharingStrategy.java:
##
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
+import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This strategy tries to get a balanced tasks scheduling. Execution vertices, 
which are belong to
+ * the same SlotSharingGroup, tend to be put evenly in each 
ExecutionSlotSharingGroup. Co-location
+ * constraints will be respected.
+ */
+class TaskBalancedPreferredSlotSharingStrategy extends 
AbstractSlotSharingStrategy {
+
+public static final Logger LOG =
+
LoggerFactory.getLogger(TaskBalancedPreferredSlotSharingStrategy.class);
+
+TaskBalancedPreferredSlotSharingStrategy(
+final SchedulingTopology topology,
+final Set slotSharingGroups,
+final Set coLocationGroups) {
+super(topology, slotSharingGroups, coLocationGroups);
+}
+
+@Override
+protected Map 
computeExecutionToESsgMap(
+SchedulingTopology schedulingTopology) {
+return new TaskBalancedExecutionSlotSharingGroupBuilder(
+schedulingTopology, this.logicalSlotSharingGroups, 
this.coLocationGroups)
+.build();
+}
+
+static class Factory implements SlotSharingStrategy.Factory {
+
+public TaskBalancedPreferredSlotSharingStrategy create(
+final SchedulingTopology topology,
+final Set logicalSlotSharingGroups,
+final Set coLocationGroups) {
+
+return new TaskBalancedPreferredSlotSharingStrategy(
+topology, logicalSlotSharingGroups, coLocationGroups);
+}
+}
+
+/** SlotSharingGroupBuilder class for balanced scheduling strategy. */
+private static class TaskBalancedExecutionSlotSharingGroupBuilder {
+
+private final SchedulingTopology topology;
+
+private final Map slotSharingGroupMap;
+
+/**
+ * Record the number of {@link ExecutionSlotSharingGroup}s for {@link 
SlotSharingGroup}s.
+ */
+private final Map> 
ssgToExecutionSSGs;
+
+/**
+ * Record the

[PR] Add release 1.17.2 [flink-web]

2023-11-12 Thread via GitHub


Myasuka opened a new pull request, #696:
URL: https://github.com/apache/flink-web/pull/696

   Add the content of release 1.17.2 .





Re: [PR] [FLINK-33445][docs-zh] Translate DataSet migration guideline to Chinese [flink]

2023-11-12 Thread via GitHub


liyubin117 commented on code in PR #23666:
URL: https://github.com/apache/flink/pull/23666#discussion_r1390673821


##
docs/content.zh/docs/dev/datastream/dataset_migration.md:
##
@@ -134,10 +131,9 @@ DataStreamSource<> source = 
StreamExecutionEnvironment.createInput(inputFormat)
 
 
 
-### Sinks
+### 写

Review Comment:
   done



##
docs/content.zh/docs/dev/datastream/dataset_migration.md:
##
@@ -62,44 +58,45 @@ The first step of migrating an application from DataSet API 
to DataStream API is
 
 
 {{< highlight "java" >}}
-// Create the execution environment
+// 创建执行环境
 ExecutionEnvironment.getExecutionEnvironment();
-// Create the local execution environment
+// 创建本地执行环境
 ExecutionEnvironment.createLocalEnvironment();
-// Create the collection environment
+// 创建 collection 环境
 new CollectionEnvironment();
-// Create the remote environment
+// 创建远程执行环境
 ExecutionEnvironment.createRemoteEnvironment(String host, int port, String... 
jarFiles);
 {{< /highlight >}}
 
 
 {{< highlight "java" >}}
-// Create the execution environment
+// 创建执行环境
 StreamExecutionEnvironment.getExecutionEnvironment();
-// Create the local execution environment
+// 创建本地执行环境
 StreamExecutionEnvironment.createLocalEnvironment();
-// The collection environment is not supported.
-// Create the remote environment
+// 不支持 collection 环境
+// 创建远程执行环境
 StreamExecutionEnvironment.createRemoteEnvironment(String host, int port, String... jarFiles);
 {{< /highlight >}}
 
 
 
 
 
-Unlike DataSet, DataStream supports processing on both bounded and unbounded data streams. Thus, user needs to explicitly set the execution mode
-to `RuntimeExecutionMode.BATCH` if that is expected.
+与 DataSet 不同,DataStream 支持对有界和无界数据流进行处理。
+
+如果需要的话,用户可以显式地将执行模式设置为 `RuntimeExecutionMode.BATCH`。
 
 ```java
 StreamExecutionEnvironment executionEnvironment = // [...];
 executionEnvironment.setRuntimeMode(RuntimeExecutionMode.BATCH);
 ```
 
-## Using the streaming sources and sinks
+## 流读和流写
 
-### Sources
+### 读

Review Comment:
   done






Re: [PR] [FLINK-33445][docs-zh] Translate DataSet migration guideline to Chinese [flink]

2023-11-12 Thread via GitHub


liyubin117 commented on code in PR #23666:
URL: https://github.com/apache/flink/pull/23666#discussion_r1390674299


##
docs/content.zh/docs/dev/datastream/dataset_migration.md:
##
@@ -679,19 +673,19 @@ dataStream3.join(dataStream4)
 
 
 
-### Category 4
+### 第四类
 
-The behaviors of the following DataSet APIs are not supported by DataStream.
+以下 DataSet API 的行为不被 DataStream 支持。
 
 * RangePartition
 * GroupCombine
 
 
-## Appendix
+## 附录
 
  EndOfStreamWindows
 
-The following code shows the example of `EndOfStreamWindows`.
+以下代码显示了 `EndOfStreamWindows` 示例实现。

Review Comment:
   done



##
docs/content.zh/docs/dev/datastream/dataset_migration.md:
##
@@ -761,7 +755,8 @@ public class EndOfStreamWindows extends WindowAssigner {
 
  AddSubtaskIDMapFunction
 
-The following code shows the example of `AddSubtaskIDMapFunction`.
+以下代码显示了 `AddSubtaskIDMapFunction` 示例实现。

Review Comment:
   done






[PR] Add flink-shaded 16.2 release [flink-web]

2023-11-12 Thread via GitHub


reswqa opened a new pull request, #697:
URL: https://github.com/apache/flink-web/pull/697

   (no comment)





Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]

2023-11-12 Thread via GitHub


1996fanrui commented on code in PR #23635:
URL: https://github.com/apache/flink/pull/23635#discussion_r1390238967


##
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfiguration.java:
##
@@ -295,6 +294,19 @@ public static SlotManagerConfiguration fromConfiguration(
 redundantTaskManagerNum);
 }
 
+@VisibleForTesting
+public static SlotMatchingStrategy getSlotMatchingStrategy(TaskManagerLoadBalanceMode mode) {

Review Comment:
   ```suggestion
  static SlotMatchingStrategy getSlotMatchingStrategy(TaskManagerLoadBalanceMode mode) {
   ```
   
   Is default (package-private) visibility enough here? As far as I can see, all callers are in the same package.



##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/TaskBalancedPreferredSlotSharingStrategy.java:
##
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
+import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This strategy tries to get a balanced tasks scheduling. Execution vertices, which are belong to
+ * the same SlotSharingGroup, tend to be put evenly in each ExecutionSlotSharingGroup. Co-location
+ * constraints will be respected.
+ */
+class TaskBalancedPreferredSlotSharingStrategy extends AbstractSlotSharingStrategy {
+
+public static final Logger LOG =
+LoggerFactory.getLogger(TaskBalancedPreferredSlotSharingStrategy.class);
+
+TaskBalancedPreferredSlotSharingStrategy(
+final SchedulingTopology topology,
+final Set slotSharingGroups,
+final Set coLocationGroups) {
+super(topology, slotSharingGroups, coLocationGroups);
+}
+
+@Override
+protected Map computeExecutionToESsgMap(
+SchedulingTopology schedulingTopology) {
+return new TaskBalancedExecutionSlotSharingGroupBuilder(
+schedulingTopology, this.logicalSlotSharingGroups, this.coLocationGroups)
+.build();
+}
+
+static class Factory implements SlotSharingStrategy.Factory {
+
+public TaskBalancedPreferredSlotSharingStrategy create(
+final SchedulingTopology topology,
+final Set logicalSlotSharingGroups,
+final Set coLocationGroups) {
+
+return new TaskBalancedPreferredSlotSharingStrategy(
+topology, logicalSlotSharingGroups, coLocationGroups);
+}
+}
+
+/** SlotSharingGroupBuilder class for balanced scheduling strategy. */
+private static class TaskBalancedExecutionSlotSharingGroupBuilder {
+
+private final SchedulingTopology topology;
+
+private final Map slotSharingGroupMap;
+
+/**
+ * Record the number of {@link ExecutionSlotSharingGroup}s for {@link SlotSharingGroup}s.
+ */
+private final Map> ssgToExecutionSSGs;
+
+/**
+ * Record the next round-robin {@link ExecutionSlotSharingGroup} index for {@link
+ * SlotSharingGroup}s.
+ */
+private final Map slotRoundRobinIndex;
+
+private final Map
+executionSlotSharingGroupMap;
+
+private final Map coLocationGroupMap;
+
+private final Map 
clcToES
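
The diff is cut off at this point in the archive. As a rough illustration of the round-robin idea described in the Javadoc above (spreading execution vertices evenly across the ExecutionSlotSharingGroups of one SlotSharingGroup), here is a minimal sketch. It is not the PR's implementation: the class `RoundRobinSlotBalanceSketch`, both method names, and the index-based baseline are hypothetical, co-location constraints are ignored, and the sketch assumes the slot count equals the largest parallelism in the group so that no slot receives two subtasks of the same vertex.

```java
// Simplified illustration only; names and structure are assumptions, not the PR's code.
class RoundRobinSlotBalanceSketch {

    /**
     * Places the subtasks of each vertex into slots, carrying the round-robin
     * index over from one vertex to the next.
     *
     * @param parallelisms parallelism of each vertex in the slot sharing group
     * @param slotCount number of slots (assumed to be the largest parallelism)
     * @return number of subtasks placed in each slot
     */
    static int[] assignRoundRobin(int[] parallelisms, int slotCount) {
        int[] tasksPerSlot = new int[slotCount];
        int nextSlot = 0; // not reset between vertices
        for (int parallelism : parallelisms) {
            for (int subtask = 0; subtask < parallelism; subtask++) {
                tasksPerSlot[nextSlot]++;
                nextSlot = (nextSlot + 1) % slotCount;
            }
        }
        return tasksPerSlot;
    }

    /** Baseline for comparison: subtask i of every vertex goes to slot i. */
    static int[] assignByIndex(int[] parallelisms, int slotCount) {
        int[] tasksPerSlot = new int[slotCount];
        for (int parallelism : parallelisms) {
            for (int subtask = 0; subtask < parallelism; subtask++) {
                tasksPerSlot[subtask % slotCount]++;
            }
        }
        return tasksPerSlot;
    }

    public static void main(String[] args) {
        int[] parallelisms = {4, 3, 2};
        // prints [3, 3, 2, 1]
        System.out.println(java.util.Arrays.toString(assignByIndex(parallelisms, 4)));
        // prints [3, 2, 2, 2]
        System.out.println(java.util.Arrays.toString(assignRoundRobin(parallelisms, 4)));
    }
}
```

With three vertices of parallelism 4, 3 and 2 sharing 4 slots, the index-based placement leaves one slot with 3 tasks and another with 1, while the carried-over round-robin index keeps every slot within one task of the others.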

Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]

2023-11-12 Thread via GitHub


1996fanrui commented on PR #23635:
URL: https://github.com/apache/flink/pull/23635#issuecomment-1807589290

   Hi @KarmaGYZ, thanks for your thorough review!
   
   > I think this PR contains two components. First would be a supplement of 
[FLINK-33448](https://issues.apache.org/jira/browse/FLINK-33448). Second is 
part of the TASKS strategy. I think we may split it into two separate commits. 
   
   Splitting it makes sense; it's clearer.
   
   > It would be better to include 
[FLINK-33388](https://issues.apache.org/jira/browse/FLINK-33388) and introduce 
TASKS strategy.
   
   Would you mind if we keep them in separate PRs? I'm afraid a single PR with 
many commits and changes would be hard to review. Of course, a single PR is 
also acceptable to me.





[jira] [Commented] (FLINK-26490) Adjust the MaxParallelism or remove the MaxParallelism check when unnecessary.

2023-11-12 Thread Yuan Mei (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26490?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17785394#comment-17785394
 ] 

Yuan Mei commented on FLINK-26490:
--

Sorry for the late response; I only saw this ticket recently.

I think we should be cautious about loosening the checks for MaxParallelism.
 # In general, MaxParallelism is not expected to change (at least not often) 
after a job has started, so a stricter check on "MaxParallelism" is not too 
big a problem.
 # The other question is whether operator state should be completely unaffected 
by "MaxParallelism". I know it is not affected today, but if we change the 
contract between "MaxParallelism" and "State", we will need to think through 
all of these corner cases whenever we make changes later.

I am concerned about making such changes.

> Adjust the MaxParallelism or remove the MaxParallelism check when unnecessary.
> --
>
> Key: FLINK-26490
> URL: https://issues.apache.org/jira/browse/FLINK-26490
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Reporter: chenfengLiu
>Priority: Not a Priority
>  Labels: auto-deprioritized-major, auto-deprioritized-minor, 
> pull-request-available
>
> Since Flink introduced key groups and MaxParallelism, jobs can be rescaled at 
> lower cost.
> However, raising the job parallelism above MaxParallelism is impossible, 
> because many MaxParallelism checks require that the new parallelism not 
> exceed MaxParallelism. 
> In fact, for an operator that does not contain keyed state, raising the 
> parallelism above MaxParallelism should cause no problem, because only keyed 
> state needs MaxParallelism and key groups.
> So should we remove this check, or automatically adjust MaxParallelism, when 
> we restore operator state that does not contain keyed state?
> That would make restoring a job from a checkpoint easier.
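
To make the key-group argument in the description concrete, here is a minimal sketch of how key groups map to subtasks. It assumes the mapping `keyGroup * parallelism / maxParallelism`, which is my understanding of how Flink assigns key groups to operator subtasks; the class `KeyGroupSketch` and its method names are hypothetical. It shows why the check matters for keyed state: once parallelism exceeds MaxParallelism there are fewer key groups than subtasks, so some subtasks can never receive keyed data. Operators without keyed state do not depend on this mapping.

```java
// Simplified sketch; names are hypothetical and the formula is an assumption
// about Flink's key-group assignment, not a copy of its code.
class KeyGroupSketch {

    /** Subtask index that owns the given key group. */
    static int operatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroup) {
        return keyGroup * parallelism / maxParallelism;
    }

    /** Counts the subtasks that own no key group at all. */
    static int countSubtasksWithoutKeyGroups(int maxParallelism, int parallelism) {
        boolean[] ownsKeyGroup = new boolean[parallelism];
        for (int keyGroup = 0; keyGroup < maxParallelism; keyGroup++) {
            ownsKeyGroup[operatorIndexForKeyGroup(maxParallelism, parallelism, keyGroup)] = true;
        }
        int idle = 0;
        for (boolean owns : ownsKeyGroup) {
            if (!owns) {
                idle++;
            }
        }
        return idle;
    }

    public static void main(String[] args) {
        // parallelism <= maxParallelism: every subtask owns at least one key group
        System.out.println(countSubtasksWithoutKeyGroups(4, 2)); // prints 0
        // parallelism > maxParallelism: two of the six subtasks own nothing
        System.out.println(countSubtasksWithoutKeyGroups(4, 6)); // prints 2
    }
}
```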



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32380) Support Java records

2023-11-12 Thread xuyang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32380?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17785396#comment-17785396
 ] 

xuyang commented on FLINK-32380:


[~gyfora] Thanks for your quick reply! I'll try it.

> Support Java records
> 
>
> Key: FLINK-32380
> URL: https://issues.apache.org/jira/browse/FLINK-32380
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Type Serialization System
>Reporter: Chesnay Schepler
>Assignee: Gyula Fora
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.19.0
>
> Attachments: image-2023-11-13-10-19-06-035.png
>
>
> Reportedly Java records are not supported, because they are neither detected 
> by our Pojo serializer nor supported by Kryo 2.x
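
One reason a record is not picked up as a POJO can be sketched as follows. The check below is a simplified stand-in for a single POJO rule (a public no-argument constructor must exist); it is not Flink's type extraction logic, and the class and method names are hypothetical. It requires Java 16 or later for the `record` keyword and `Class.isRecord()`.

```java
// Illustration only: a simplified stand-in for one POJO rule, not Flink's actual check.
class RecordVsPojoSketch {

    /** Classic POJO shape: public no-argument constructor and mutable fields. */
    public static class ClassicPoint {
        public int x;
        public int y;

        public ClassicPoint() {}
    }

    /** A record only has the canonical (x, y) constructor and final fields. */
    public record RecordPoint(int x, int y) {}

    /** Returns true if the type declares a public constructor without parameters. */
    static boolean hasPublicNoArgConstructor(Class<?> type) {
        try {
            type.getConstructor();
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(hasPublicNoArgConstructor(ClassicPoint.class)); // prints true
        System.out.println(hasPublicNoArgConstructor(RecordPoint.class)); // prints false
        System.out.println(RecordPoint.class.isRecord()); // prints true
    }
}
```

A serializer that instantiates objects through a no-argument constructor and then sets fields cannot handle `RecordPoint`; it would have to call the canonical constructor with all component values instead.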


