[flink] branch master updated: [FLINK-31424][table-planner] Fix NPE produced by multiple sink with local-global window aggregate

2023-03-22 Thread lincoln
This is an automated email from the ASF dual-hosted git repository.

lincoln pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 824259ecf53 [FLINK-31424][table-planner] Fix NPE produced by multiple sink with local-global window aggregate
824259ecf53 is described below

commit 824259ecf53fd82061d177b508594e6510e69060
Author: Jane Chan <55568005+ladyfor...@users.noreply.github.com>
AuthorDate: Wed Mar 22 16:30:58 2023 +0800

[FLINK-31424][table-planner] Fix NPE produced by multiple sink with local-global window aggregate

This closes #2
---
 .../plan/metadata/FlinkRelMdWindowProperties.scala |  40 ++-
 .../table/planner/plan/stats/FlinkStatistic.scala  |   6 +-
 .../plan/stream/sql/join/WindowJoinTest.xml|  72 +
 .../plan/metadata/FlinkRelMdHandlerTestBase.scala  | 318 -
 .../metadata/FlinkRelMdWindowPropertiesTest.scala  | 184 
 .../planner/plan/metadata/MetadataTestUtil.scala   |  36 +++
 .../plan/stream/sql/join/WindowJoinTest.scala  |  60 
 7 files changed, 708 insertions(+), 8 deletions(-)

diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdWindowProperties.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdWindowProperties.scala
index 30087585974..a688302e914 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdWindowProperties.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdWindowProperties.scala
@@ -19,6 +19,7 @@ package org.apache.flink.table.planner.plan.metadata
 
 import org.apache.flink.table.planner.{JArrayList, JHashMap, JList}
 import org.apache.flink.table.planner.plan.`trait`.RelWindowProperties
+import org.apache.flink.table.planner.plan.logical.WindowSpec
 import org.apache.flink.table.planner.plan.nodes.calcite.{Expand, WatermarkAssigner}
 import org.apache.flink.table.planner.plan.nodes.logical.{FlinkLogicalAggregate, FlinkLogicalCorrelate, FlinkLogicalJoin, FlinkLogicalRank}
 import org.apache.flink.table.planner.plan.nodes.physical.common.CommonPhysicalLookupJoin
@@ -27,6 +28,7 @@ import org.apache.flink.table.planner.plan.schema.FlinkPreparingTableBase
 import org.apache.flink.table.planner.plan.utils.WindowJoinUtil.satisfyWindowJoin
 import org.apache.flink.table.planner.plan.utils.WindowUtil.{convertToWindowingStrategy, groupingContainsWindowStartEnd, isWindowTableFunctionCall}
 import org.apache.flink.table.runtime.groupwindow._
+import org.apache.flink.table.types.logical.LogicalType
 
 import org.apache.calcite.plan.hep.HepRelVertex
 import org.apache.calcite.plan.volcano.RelSubset
@@ -254,11 +256,34 @@ class FlinkRelMdWindowProperties private extends MetadataHandler[FlinkMetadata.W
   def getWindowProperties(
   rel: StreamPhysicalWindowAggregate,
   mq: RelMetadataQuery): RelWindowProperties = {
+getWindowAggregateWindowProperties(
+  rel.grouping.length + rel.aggCalls.size(),
+  rel.namedWindowProperties,
+  rel.windowing.getWindow,
+  rel.windowing.getTimeAttributeType
+)
+  }
+
+  def getWindowProperties(
+  rel: StreamPhysicalGlobalWindowAggregate,
+  mq: RelMetadataQuery): RelWindowProperties = {
+getWindowAggregateWindowProperties(
+  rel.grouping.length + rel.aggCalls.size(),
+  rel.namedWindowProperties,
+  rel.windowing.getWindow,
+  rel.windowing.getTimeAttributeType
+)
+  }
+
+  private def getWindowAggregateWindowProperties(
+  propertyOffset: Int,
+  windowProperties: Seq[NamedWindowProperty],
+  windowSpec: WindowSpec,
+  timeAttributeType: LogicalType): RelWindowProperties = {
 val starts = ArrayBuffer[Int]()
 val ends = ArrayBuffer[Int]()
 val times = ArrayBuffer[Int]()
-val propertyOffset = rel.grouping.length + rel.aggCalls.size()
-rel.namedWindowProperties.map(_.getProperty).zipWithIndex.foreach {
+windowProperties.map(_.getProperty).zipWithIndex.foreach {
   case (p, index) =>
 p match {
   case _: WindowStart =>
@@ -275,11 +300,18 @@ class FlinkRelMdWindowProperties private extends MetadataHandler[FlinkMetadata.W
   ImmutableBitSet.of(starts: _*),
   ImmutableBitSet.of(ends: _*),
   ImmutableBitSet.of(times: _*),
-  rel.windowing.getWindow,
-  rel.windowing.getTimeAttributeType
+  windowSpec,
+  timeAttributeType
 )
   }
 
+  def getWindowProperties(
+  rel: StreamPhysicalLocalWindowAggregate,
+  mq: RelMetadataQuery): RelWindowProperties = {
+val fmq = FlinkRelMetadataQuery.reuseOrCreate(mq)
+fmq.getRelWindowProperties(rel.getInput)
+  }
+
   def getWindowProperties(
   rel: StreamPhysicalWindowRank,
   mq: RelMetadataQuery): Re
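
The hunks above show the shape of the fix: FlinkRelMdWindowProperties gains dedicated handlers for StreamPhysicalLocalWindowAggregate and StreamPhysicalGlobalWindowAggregate (the local aggregate simply forwards the query to its input), and the shared extraction logic moves into getWindowAggregateWindowProperties. Below is a minimal sketch of the metadata lookup that used to fail, assuming a caller holding a local window aggregate node; the variable and method names are illustrative, not from the commit:

    import org.apache.calcite.rel.RelNode;
    import org.apache.calcite.rel.metadata.RelMetadataQuery;
    import org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery;

    class WindowPropsSketch {
        static void inspect(RelNode rel, RelMetadataQuery mq) {
            FlinkRelMetadataQuery fmq = FlinkRelMetadataQuery.reuseOrCreate(mq);
            // Before this fix there was no handler for
            // StreamPhysicalLocalWindowAggregate, so this returned null for
            // multi-sink plans with a local-global window aggregate, and an
            // unconditional dereference downstream raised the reported NPE.
            var props = fmq.getRelWindowProperties(rel);
            if (props == null) {
                // pre-fix behavior; post-fix the input's properties are forwarded
            }
        }
    }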

[flink-connector-cassandra] branch main updated: [FLINK-26822] Add Cassandra Source

2023-03-22 Thread chesnay
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-cassandra.git


The following commit(s) were added to refs/heads/main by this push:
 new 72e3bef  [FLINK-26822] Add Cassandra Source
72e3bef is described below

commit 72e3bef1fb9ee6042955b5e9871a9f70a8837cca
Author: Etienne Chauchot 
AuthorDate: Wed Mar 22 09:38:17 2023 +0100

[FLINK-26822] Add Cassandra Source
---
 flink-connector-cassandra/pom.xml  |   7 +
 .../cassandra/source/CassandraSource.java  | 203 +
 .../enumerator/CassandraEnumeratorState.java   | 167 ++
 .../CassandraEnumeratorStateSerializer.java| 106 +
 .../enumerator/CassandraSplitEnumerator.java   | 148 +
 .../source/reader/CassandraRecordEmitter.java  | 138 
 .../cassandra/source/reader/CassandraRow.java  |  46 
 .../source/reader/CassandraSourceReader.java   | 104 +
 .../reader/CassandraSourceReaderFactory.java   |  55 +
 .../source/reader/CassandraSplitReader.java| 205 +
 .../cassandra/source/split/CassandraSplit.java |  75 +++
 .../source/split/CassandraSplitSerializer.java |  63 ++
 .../cassandra/source/split/SplitsGenerator.java| 242 +
 .../source/utils/BigIntegerSerializationUtils.java |  40 
 .../cassandra/example/BatchPojoExample.java|   1 +
 .../cassandra/source/CassandraSourceITCase.java| 234 
 .../cassandra/source/CassandraTestContext.java | 161 ++
 .../cassandra/source/CassandraTestEnvironment.java | 196 +
 .../CassandraEnumeratorStateSerializerTest.java|  58 +
 .../source/reader/CassandraQueryTest.java  | 119 ++
 .../source/split/CassandraSplitSerializerTest.java |  43 
 .../cassandra/utils}/Pojo.java |  26 ++-
 tools/maven/suppressions.xml   |   2 +-
 23 files changed, 2437 insertions(+), 2 deletions(-)

diff --git a/flink-connector-cassandra/pom.xml b/flink-connector-cassandra/pom.xml
index fa78a65..58d70b4 100644
--- a/flink-connector-cassandra/pom.xml
+++ b/flink-connector-cassandra/pom.xml
@@ -78,6 +78,13 @@ under the License.
 			<scope>provided</scope>
 		</dependency>
 
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-base</artifactId>
+			<version>${flink.version}</version>
+			<scope>provided</scope>
+		</dependency>
+
 		<dependency>
 			<groupId>org.scala-lang</groupId>
 			<artifactId>scala-library</artifactId>
diff --git a/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/CassandraSource.java b/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/CassandraSource.java
new file mode 100644
index 0000000..dd45913
--- /dev/null
+++ b/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/CassandraSource.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.cassandra.source;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.connector.cassandra.source.enumerator.CassandraEnumeratorState;
+import org.apache.flink.connector.cassandra.source.enumerator.CassandraEnumeratorStateSerializer;
+imp
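
The excerpt cuts off inside the import block. For readers wondering how the new source is consumed, here is a hedged wiring sketch using the standard FLIP-27 DataStream Source API; the CassandraSource constructor arguments (cluster builder, POJO class, query, and so on) are not visible in this excerpt, so its construction is left as a hypothetical parameter:

    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.connector.cassandra.source.CassandraSource;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    class CassandraSourceSketch {
        static void run(CassandraSource<MyPojo> source) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // fromSource is the generic FLIP-27 entry point; only the source
            // object itself is Cassandra-specific.
            DataStream<MyPojo> rows =
                    env.fromSource(source, WatermarkStrategy.noWatermarks(), "cassandra-source");
            rows.print();
            env.execute("cassandra-read");
        }

        // Hypothetical POJO mapped by the connector's object mapper.
        static class MyPojo {}
    }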

[flink-web] branch asf-site updated: Add flink-connector-parent 1.0.0

2023-03-22 Thread chesnay
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/flink-web.git


The following commit(s) were added to refs/heads/asf-site by this push:
 new df678144b Add flink-connector-parent 1.0.0
df678144b is described below

commit df678144b162d42e2c2b38546584b8fa32451db5
Author: Chesnay Schepler 
AuthorDate: Wed Mar 15 16:38:51 2023 +0100

Add flink-connector-parent 1.0.0
---
 docs/data/additional_components.yml | 6 ++
 1 file changed, 6 insertions(+)

diff --git a/docs/data/additional_components.yml b/docs/data/additional_components.yml
index e406e67a4..e5ae97d20 100644
--- a/docs/data/additional_components.yml
+++ b/docs/data/additional_components.yml
@@ -15,6 +15,12 @@
 # specific language governing permissions and limitations
 # under the License
 
+flink-connector-parent:
+  name: "Apache Flink-connector-parent 1.0.0 Source release"
+  source_release_url: "https://www.apache.org/dyn/closer.lua/flink/flink-connector-parent-1.0.0/flink-connector-parent-1.0.0-src.tgz"
+  source_release_asc_url: "https://downloads.apache.org/flink/flink-connector-parent-1.0.0/flink-connector-parent-1.0.0-src.tgz.asc"
+  source_release_sha512_url: "https://downloads.apache.org/flink/flink-connector-parent-1.0.0/flink-connector-parent-1.0.0-src.tgz.sha512"
+
 flink-shaded:
   name: "Apache Flink-shaded 16.1 Source Release"
   source_release_url: "https://www.apache.org/dyn/closer.lua/flink/flink-shaded-16.1/flink-shaded-16.1-src.tgz"


[flink-web] branch asf-site updated: Rebuild website

2023-03-22 Thread chesnay
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/flink-web.git


The following commit(s) were added to refs/heads/asf-site by this push:
 new 756d107b4 Rebuild website
756d107b4 is described below

commit 756d107b4d37273f60cac9e2eeb05d73e5438cfa
Author: Chesnay Schepler 
AuthorDate: Wed Mar 22 10:15:51 2023 +0100

Rebuild website
---
 content/downloads/index.html| 7 +++
 content/zh/downloads/index.html | 7 +++
 2 files changed, 14 insertions(+)

diff --git a/content/downloads/index.html b/content/downloads/index.html
index ed28e9a5d..12116a96f 100644
--- a/content/downloads/index.html
+++ b/content/downloads/index.html
@@ -945,6 +945,7 @@ https://github.com/alex-shpak/hugo-book
 Pre-bundled Hadoop 2.6.5
 Pre-bundled Hadoop 2.4.1
 Apache Flink-shaded 16.1 Source Release
+Apache Flink-connector-parent 1.0.0 Source release
   
 
 Verifying Hashes and Signatures
@@ -1264,6 +1265,11 @@ under the License.
   #
 
 https://www.apache.org/dyn/closer.lua/flink/flink-shaded-16.1/flink-shaded-16.1-src.tgz">Apache Flink-shaded 16.1 Source Release Source Release https://downloads.apache.org/flink/flink-shaded-16.1/flink-shaded-16.1-src.tgz.asc">(asc, https://downloads.apache.org/flink/flink-shaded-16.1/flink-shaded-16.1-src.tgz.sha512">sha512)
+
+  Apache Flink-connector-parent 1.0.0 Source release
+  #
+
+https://www.apache.org/dyn/closer.lua/flink/flink-connector-parent-1.0.0/flink-connector-parent-1.0.0-src.tgz">Apache Flink-connector-parent 1.0.0 Source release Source Release https://downloads.apache.org/flink/flink-connector-parent-1.0.0/flink-connector-parent-1.0.0-src.tgz.asc">(asc, https://downloads.apache.org/flink/flink-connector-parent-1.0.0/flink-connector-parent-1.0.0-src.tgz.sha512">sha512)
 
   Verifying Hashes and Signatures
   #
@@ -1625,6 +1631,7 @@ The statefun-flink-harness dependency includes a local execution en
 Pre-bundled Hadoop 2.6.5
 Pre-bundled Hadoop 2.4.1
 Apache Flink-shaded 16.1 Source Release
+Apache Flink-connector-parent 1.0.0 Source release
   
 
 Verifying Hashes and Signatures
diff --git a/content/zh/downloads/index.html b/content/zh/downloads/index.html
index 4fbc133d2..1e4f553b1 100644
--- a/content/zh/downloads/index.html
+++ b/content/zh/downloads/index.html
@@ -945,6 +945,7 @@ https://github.com/alex-shpak/hugo-book
 Pre-bundled Hadoop 2.6.5
 Pre-bundled Hadoop 2.4.1
 Apache Flink-shaded 16.1 Source Release
+Apache Flink-connector-parent 1.0.0 Source release
   
 
 验证哈希和签名
@@ -1264,6 +1265,11 @@ under the License.
   #
 
 https://www.apache.org/dyn/closer.lua/flink/flink-shaded-16.1/flink-shaded-16.1-src.tgz">Apache Flink-shaded 16.1 Source Release Source Release https://downloads.apache.org/flink/flink-shaded-16.1/flink-shaded-16.1-src.tgz.asc">(asc, https://downloads.apache.org/flink/flink-shaded-16.1/flink-shaded-16.1-src.tgz.sha512">sha512)
+
+  Apache Flink-connector-parent 1.0.0 Source release
+  #
+
+https://www.apache.org/dyn/closer.lua/flink/flink-connector-parent-1.0.0/flink-connector-parent-1.0.0-src.tgz">Apache Flink-connector-parent 1.0.0 Source release Source Release https://downloads.apache.org/flink/flink-connector-parent-1.0.0/flink-connector-parent-1.0.0-src.tgz.asc">(asc, https://downloads.apache.org/flink/flink-connector-parent-1.0.0/flink-connector-parent-1.0.0-src.tgz.sha512">sha512)
 
   验证哈希和签名
   #
@@ -1626,6 +1632,7 @@ under the License.
 Pre-bundled Hadoop 2.6.5
 Pre-bundled Hadoop 2.4.1
 Apache Flink-shaded 16.1 Source Release
+Apache Flink-connector-parent 1.0.0 Source release
   
 
 验证哈希和签名



[flink-connector-cassandra] branch main updated: [hotfix] Update to Apache flink-connector-parent pom.

2023-03-22 Thread chesnay
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-cassandra.git


The following commit(s) were added to refs/heads/main by this push:
 new a250223  [hotfix] Update to Apache flink-connector-parent pom.
a250223 is described below

commit a2502236e87b3ec65b7b69c1e34a213927f4036f
Author: Chesnay Schepler 
AuthorDate: Wed Mar 22 10:22:35 2023 +0100

[hotfix] Update to Apache flink-connector-parent pom.
---
 pom.xml | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/pom.xml b/pom.xml
index 5b721b2..85ce634 100644
--- a/pom.xml
+++ b/pom.xml
@@ -20,14 +20,13 @@ under the License.
 	xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
 
 	<parent>
-		<groupId>io.github.zentol.flink</groupId>
+		<groupId>org.apache.flink</groupId>
 		<artifactId>flink-connector-parent</artifactId>
-		<version>1.0</version>
+		<version>1.0.0</version>
 	</parent>
 
 	<modelVersion>4.0.0</modelVersion>
 
-	<groupId>org.apache.flink</groupId>
 	<artifactId>flink-connector-cassandra-parent</artifactId>
 	<version>4.0-SNAPSHOT</version>
 	<name>Flink : Connectors : Cassandra : Parent</name>



[flink] branch master updated: [FLINK-31513][doc] Fix incorrect path in tpcds-tool README (#22212)

2023-03-22 Thread guoweijie
This is an automated email from the ASF dual-hosted git repository.

guoweijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 18869f6dfd0 [FLINK-31513][doc] Fix incorrect path in tpcds-tool README (#22212)
18869f6dfd0 is described below

commit 18869f6dfd04abd017cf110932c48a33e91c6bb5
Author: leixin <1403342...@qq.com>
AuthorDate: Wed Mar 22 18:22:26 2023 +0800

[FLINK-31513][doc] Fix incorrect path in tpcds-tool README (#22212)

This closes #22212
---
 flink-end-to-end-tests/flink-tpcds-test/tpcds-tool/README.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/flink-end-to-end-tests/flink-tpcds-test/tpcds-tool/README.md b/flink-end-to-end-tests/flink-tpcds-test/tpcds-tool/README.md
index 999ab8524da..b3039f3144d 100644
--- a/flink-end-to-end-tests/flink-tpcds-test/tpcds-tool/README.md
+++ b/flink-end-to-end-tests/flink-tpcds-test/tpcds-tool/README.md
@@ -21,7 +21,7 @@ $ export FLINK_DIR=/flink/flink-dist/target/flink-${flink-ve
 $ export END_TO_END_DIR=/flink/flink-end-to-end-tests  # set end to end tests directory
 $ mkdir -p /dir_to_save_genarator
 $ mkdir -p /dir_to_save_data
-$ cd /flink/flink-end-to-end-tests/tpcds-tool
+$ cd /flink/flink-end-to-end-tests/flink-tpcds-test/tpcds-tool
 $ sh data_generator.sh /dir_to_save_genarator 1 /dir_to_save_data /flink/flink-end-to-end-tests/test-scripts
 ```
 The downloaded generator will be saved to `/dir_to_save_genarator`, the generated data will be saved to `/dir_to_save_data`.



[flink] branch master updated: [FLINK-31527][tests] Stabilize ChangelogRescalingITCase

2023-03-22 Thread roman
This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 9ba3b1a5863 [FLINK-31527][tests] Stabilize ChangelogRescalingITCase
9ba3b1a5863 is described below

commit 9ba3b1a5863c1aeeca0be25b4bb375abfe02b940
Author: Roman Khachatryan 
AuthorDate: Tue Mar 21 21:08:27 2023 +

[FLINK-31527][tests] Stabilize ChangelogRescalingITCase
---
 .../org/apache/flink/test/state/ChangelogRescalingITCase.java  | 10 --
 1 file changed, 8 insertions(+), 2 deletions(-)

diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/ChangelogRescalingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/state/ChangelogRescalingITCase.java
index 6891eafadae..10ba869f6a8 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/state/ChangelogRescalingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/state/ChangelogRescalingITCase.java
@@ -329,7 +329,7 @@ public class ChangelogRescalingITCase extends TestLogger {
 private String checkpointAndCancel(JobID jobID) throws Exception {
 waitForCheckpoint(jobID, cluster.getMiniCluster(), 1);
 cluster.getClusterClient().cancel(jobID).get();
-checkStatus(jobID);
+waitForSuccessfulTermination(jobID);
 return CommonTestUtils.getLatestCompletedCheckpointPath(jobID, cluster.getMiniCluster())
 .orElseThrow(
 () -> {
@@ -337,7 +337,13 @@ public class ChangelogRescalingITCase extends TestLogger {
 });
 }
 
-private void checkStatus(JobID jobID) throws InterruptedException, ExecutionException {
+private void waitForSuccessfulTermination(JobID jobID) throws Exception {
+CommonTestUtils.waitUntilCondition(
+() ->
+cluster.getClusterClient()
+.getJobStatus(jobID)
+.get()
+.isGloballyTerminalState());
 if (cluster.getClusterClient().getJobStatus(jobID).get().isGloballyTerminalState()) {
 cluster.getClusterClient()
 .requestJobResult(jobID)
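
The stabilization replaces a one-shot status assertion with a wait: the test now polls until the job reaches a globally terminal state before asserting on it, closing the race between cancel() returning and the terminal status becoming observable. Below is a generic re-implementation of the polling idiom for illustration only; the actual test uses Flink's CommonTestUtils.waitUntilCondition, whose condition may also throw checked exceptions:

    import java.util.function.BooleanSupplier;

    class WaitSketch {
        static void waitUntil(BooleanSupplier condition, long timeoutMillis)
                throws InterruptedException {
            long deadline = System.currentTimeMillis() + timeoutMillis;
            // Poll instead of asserting a single racy snapshot of the status.
            while (!condition.getAsBoolean()) {
                if (System.currentTimeMillis() > deadline) {
                    throw new AssertionError("condition not met within " + timeoutMillis + " ms");
                }
                Thread.sleep(50);
            }
        }
    }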



[flink] branch release-1.16 updated (e891bfc156d -> 80ee512f00a)

2023-03-22 Thread roman
This is an automated email from the ASF dual-hosted git repository.

roman pushed a change to branch release-1.16
in repository https://gitbox.apache.org/repos/asf/flink.git


from e891bfc156d [python][examples] Add Python streaming word count examples
 add 80ee512f00a [FLINK-31527][tests] Stabilize ChangelogRescalingITCase

No new revisions were added by this update.

Summary of changes:
 .../org/apache/flink/test/state/ChangelogRescalingITCase.java  | 10 --
 1 file changed, 8 insertions(+), 2 deletions(-)



[flink-kubernetes-operator] branch main updated: [FLINK-31407] Bump fabric8 version to 6.5.0

2023-03-22 Thread mbalassi
This is an automated email from the ASF dual-hosted git repository.

mbalassi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
 new 91cf2338 [FLINK-31407] Bump fabric8 version to 6.5.0
91cf2338 is described below

commit 91cf2338c9bd54b1bad7680c231a59facc12c8b9
Author: Márton Balassi 
AuthorDate: Wed Mar 22 22:25:26 2023 +0100

[FLINK-31407] Bump fabric8 version to 6.5.0
---
 .../operator/autoscaler/AutoScalerInfo.java|   2 +-
 .../operator/service/AbstractFlinkService.java |   1 -
 .../operator/utils/KubernetesClientUtils.java  |   4 +-
 .../kubernetes/operator/utils/StatusRecorder.java  |   4 +-
 .../flink/kubernetes/operator/TestUtils.java   |   2 +-
 .../metrics/KubernetesClientMetricsTest.java   |   2 +-
 .../operator/utils/KubernetesClientUtilsTest.java  |  20 +--
 .../Fabric8FlinkStandaloneKubeClient.java  |   5 +-
 .../crds/flinkdeployments.flink.apache.org-v1.yml  | 153 +
 pom.xml|   2 +-
 10 files changed, 169 insertions(+), 26 deletions(-)

diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfo.java b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfo.java
index 8850b95a..a5462219 100644
--- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfo.java
+++ b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfo.java
@@ -181,7 +181,7 @@ public class AutoScalerInfo {
 
 public void replaceInKubernetes(KubernetesClient client) throws Exception {
 trimHistoryToMaxCmSize();
-client.resource(configMap).replace();
+client.resource(configMap).update();
 }
 
 @VisibleForTesting
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
index 4c5d9fcc..97262daf 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
@@ -808,7 +808,6 @@ public abstract class AbstractFlinkService implements FlinkService {
 .inNamespace(namespace)
 .withName(ExternalServiceDecorator.getExternalServiceName(clusterId))
-.fromServer()
 .get();
 if (service == null) {
 serviceRunning = false;
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/KubernetesClientUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/KubernetesClientUtils.java
index d52bfc67..d6724adf 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/KubernetesClientUtils.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/KubernetesClientUtils.java
@@ -73,12 +73,12 @@ public class KubernetesClientUtils {
 
 public static > void applyToStoredCr(KubernetesClient kubernetesClient, T cr, Consumer function) {
-var inKube = kubernetesClient.resource(cr).fromServer().get();
+var inKube = kubernetesClient.resource(cr).get();
 Long localGeneration = cr.getMetadata().getGeneration();
 Long serverGeneration = inKube.getMetadata().getGeneration();
 if (serverGeneration.equals(localGeneration)) {
 function.accept(inKube);
-kubernetesClient.resource(inKube).lockResourceVersion().replace();
+kubernetesClient.resource(inKube).lockResourceVersion().update();
 } else {
 LOG.info(
 "Spec already upgrading in kube (generation - local: {} 
server: {}), skipping scale operation.",
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/StatusRecorder.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/StatusRecorder.java
index b1bec2f7..e009a7ce 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/StatusRecorder.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/StatusRecorder.java
@@ -120,7 +120,7 @@ public class StatusRecorder<
 int retries = 0;
 while (true) {
 try {
-var updated = 
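
Most of this diff is a mechanical migration to the fabric8 6.x client API: get() now always reads from the API server, so the old fromServer() qualifier is dropped, and replace() gives way to update(). A small sketch of the pattern on a ConfigMap, assuming an illustrative key and a ConfigMap that already has a data section:

    import io.fabric8.kubernetes.api.model.ConfigMap;
    import io.fabric8.kubernetes.client.KubernetesClient;

    class Fabric8MigrationSketch {
        static void touch(KubernetesClient client, ConfigMap configMap) {
            // fabric8 6.x: get() reads from the API server directly,
            // so no .fromServer() is needed before it.
            ConfigMap inKube = client.resource(configMap).get();
            inKube.getData().put("refreshed", "true");
            // fabric8 6.x: update() supersedes the deprecated replace().
            client.resource(inKube).update();
        }
    }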

[flink] branch master updated (9ba3b1a5863 -> e91eb5ec2fe)

2023-03-22 Thread snuyanzin
This is an automated email from the ASF dual-hosted git repository.

snuyanzin pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


from 9ba3b1a5863 [FLINK-31527][tests] Stabilize ChangelogRescalingITCase
 add e91eb5ec2fe [FLINK-31466][table] Apply Calcite's optimization of pushing filter to semijoin for FilterJoinRule

No new revisions were added by this update.

Summary of changes:
 .../plan/rules/logical/FlinkFilterJoinRule.java| 27 +--
 .../sql/join/BroadcastHashSemiAntiJoinTest.xml | 38 +++
 .../batch/sql/join/NestedLoopSemiAntiJoinTest.xml  | 50 +--
 .../plan/batch/sql/join/SemiAntiJoinTest.xml   | 52 ++--
 .../sql/join/ShuffledHashSemiAntiJoinTest.xml  | 42 
 .../batch/sql/join/SortMergeSemiAntiJoinTest.xml   | 42 
 .../logical/CalcPruneAggregateCallRuleTest.xml | 16 +++
 .../plan/rules/logical/FlinkFilterJoinRuleTest.xml |  9 ++--
 .../logical/ProjectPruneAggregateCallRuleTest.xml  | 56 --
 .../plan/stream/sql/join/SemiAntiJoinTest.xml  | 54 ++---
 .../stream/sql/SemiAntiJoinStreamITCase.scala  | 27 ++-
 11 files changed, 219 insertions(+), 194 deletions(-)



[flink] branch master updated: [FLINK-31230][yarn] Improve YarnClusterDescriptor memory unit display.

2023-03-22 Thread fanrui
This is an automated email from the ASF dual-hosted git repository.

fanrui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 753da61bb36 [FLINK-31230][yarn] Improve YarnClusterDescriptor memory unit display.
753da61bb36 is described below

commit 753da61bb366df35df4bc2c2ebbbe7f75e8237d3
Author: slfan1989 
AuthorDate: Wed Mar 22 19:33:34 2023 +0800

[FLINK-31230][yarn] Improve YarnClusterDescriptor memory unit display.
---
 .../apache/flink/yarn/YARNSessionFIFOITCase.java   |  2 +-
 .../apache/flink/yarn/YarnClusterDescriptor.java   | 28 ++
 2 files changed, 19 insertions(+), 11 deletions(-)

diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
index a57b9b75bc3..8c19fd17e35 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
@@ -244,7 +244,7 @@ class YARNSessionFIFOITCase extends YarnTestBase {
 log.info("Starting testQueryCluster()");
 runWithArgs(
 new String[] {"-q"},
-"Summary: totalMemory 8192 totalCores 1332",
+"Summary: totalMemory 8.000gb (8589934592 bytes) 
totalCores 1332",
 null,
 RunTypes.YARN_SESSION,
 0); // we have 666*2 cores.
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
index 95fe09f9c06..8231fd6fa59 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
@@ -38,6 +38,7 @@ import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.configuration.PipelineOptions;
 import org.apache.flink.configuration.ResourceManagerOptions;
 import org.apache.flink.configuration.RestOptions;
@@ -97,6 +98,7 @@ import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.client.api.YarnClientApplication;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hadoop.yarn.util.Records;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -844,7 +846,7 @@ public class YarnClusterDescriptor implements ClusterDescriptor {
 
 final ApplicationId appId = appContext.getApplicationId();
 
-// -- Add Zookeeper namespace to local flinkConfiguration --
+// -- Add Zookeeper namespace to local flinkConfiguraton --
 setHAClusterIdIfNotSet(configuration, appId);
 
 if (HighAvailabilityMode.isHighAvailabilityModeActivated(configuration)) {
@@ -1229,7 +1231,6 @@ public class YarnClusterDescriptor implements ClusterDescriptor {
 
 LOG.info("Waiting for the cluster to be allocated");
 final long startTime = System.currentTimeMillis();
-long lastLogTime = System.currentTimeMillis();
 ApplicationReport report;
 YarnApplicationState lastAppState = YarnApplicationState.NEW;
 loop:
@@ -1265,11 +1266,9 @@ public class YarnClusterDescriptor implements ClusterDescriptor {
 if (appState != lastAppState) {
 LOG.info("Deploying cluster, current state " + appState);
 }
-if (System.currentTimeMillis() - lastLogTime > 60000) {
-lastLogTime = System.currentTimeMillis();
+if (System.currentTimeMillis() - startTime > 60000) {
 LOG.info(
-"Deployment took more than {} seconds. Please check if the requested resources are available in the YARN cluster",
-(lastLogTime - startTime) / 1000);
+"Deployment took more than 60 seconds. Please check if the requested resources are available in the YARN cluster");
 }
 }
 lastAppState = appState;
@@ -1443,13 +1442,17 @@ public class YarnClusterDescriptor implements ClusterDescriptor {
 totalMemory += res.getMemory();
 totalCores += res.getVirtualCores();
 ps.format(format, "NodeID", rep.getNodeId());
- 
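
The expected string in the updated test, "8.000gb (8589934592 bytes)", matches the output of Flink's MemorySize human-readable formatting. The truncated hunk above does not show the formatting call itself, so the following is an assumption about how the summary line is now produced:

    import org.apache.flink.configuration.MemorySize;

    class MemoryDisplaySketch {
        public static void main(String[] args) {
            long totalMemoryMiB = 8192; // YARN reports container memory in MiB
            String display = MemorySize.ofMebiBytes(totalMemoryMiB).toHumanReadableString();
            System.out.println(display); // prints: 8.000gb (8589934592 bytes)
        }
    }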

[flink] branch release-1.17 updated: [FLINK-31424][table-planner] Fix NPE produced by multiple sink with local-global window aggregate

2023-03-22 Thread lincoln
This is an automated email from the ASF dual-hosted git repository.

lincoln pushed a commit to branch release-1.17
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.17 by this push:
 new 71f23066562 [FLINK-31424][table-planner] Fix NPE produced by multiple sink with local-global window aggregate
71f23066562 is described below

commit 71f23066562018910e5599719d2a1bc0eb17f430
Author: Jane Chan <55568005+ladyfor...@users.noreply.github.com>
AuthorDate: Thu Mar 23 12:04:42 2023 +0800

[FLINK-31424][table-planner] Fix NPE produced by multiple sink with local-global window aggregate

This closes #22238
---
 .../plan/metadata/FlinkRelMdWindowProperties.scala |  40 ++-
 .../table/planner/plan/stats/FlinkStatistic.scala  |   6 +-
 .../plan/stream/sql/join/WindowJoinTest.xml|  72 +
 .../plan/metadata/FlinkRelMdHandlerTestBase.scala  | 318 -
 .../metadata/FlinkRelMdWindowPropertiesTest.scala  | 184 
 .../planner/plan/metadata/MetadataTestUtil.scala   |  36 +++
 .../plan/stream/sql/join/WindowJoinTest.scala  |  60 
 7 files changed, 708 insertions(+), 8 deletions(-)

diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdWindowProperties.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdWindowProperties.scala
index 30087585974..a688302e914 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdWindowProperties.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdWindowProperties.scala
@@ -19,6 +19,7 @@ package org.apache.flink.table.planner.plan.metadata
 
 import org.apache.flink.table.planner.{JArrayList, JHashMap, JList}
 import org.apache.flink.table.planner.plan.`trait`.RelWindowProperties
+import org.apache.flink.table.planner.plan.logical.WindowSpec
 import org.apache.flink.table.planner.plan.nodes.calcite.{Expand, WatermarkAssigner}
 import org.apache.flink.table.planner.plan.nodes.logical.{FlinkLogicalAggregate, FlinkLogicalCorrelate, FlinkLogicalJoin, FlinkLogicalRank}
 import org.apache.flink.table.planner.plan.nodes.physical.common.CommonPhysicalLookupJoin
@@ -27,6 +28,7 @@ import org.apache.flink.table.planner.plan.schema.FlinkPreparingTableBase
 import org.apache.flink.table.planner.plan.utils.WindowJoinUtil.satisfyWindowJoin
 import org.apache.flink.table.planner.plan.utils.WindowUtil.{convertToWindowingStrategy, groupingContainsWindowStartEnd, isWindowTableFunctionCall}
 import org.apache.flink.table.runtime.groupwindow._
+import org.apache.flink.table.types.logical.LogicalType
 
 import org.apache.calcite.plan.hep.HepRelVertex
 import org.apache.calcite.plan.volcano.RelSubset
@@ -254,11 +256,34 @@ class FlinkRelMdWindowProperties private extends MetadataHandler[FlinkMetadata.W
   def getWindowProperties(
   rel: StreamPhysicalWindowAggregate,
   mq: RelMetadataQuery): RelWindowProperties = {
+getWindowAggregateWindowProperties(
+  rel.grouping.length + rel.aggCalls.size(),
+  rel.namedWindowProperties,
+  rel.windowing.getWindow,
+  rel.windowing.getTimeAttributeType
+)
+  }
+
+  def getWindowProperties(
+  rel: StreamPhysicalGlobalWindowAggregate,
+  mq: RelMetadataQuery): RelWindowProperties = {
+getWindowAggregateWindowProperties(
+  rel.grouping.length + rel.aggCalls.size(),
+  rel.namedWindowProperties,
+  rel.windowing.getWindow,
+  rel.windowing.getTimeAttributeType
+)
+  }
+
+  private def getWindowAggregateWindowProperties(
+  propertyOffset: Int,
+  windowProperties: Seq[NamedWindowProperty],
+  windowSpec: WindowSpec,
+  timeAttributeType: LogicalType): RelWindowProperties = {
 val starts = ArrayBuffer[Int]()
 val ends = ArrayBuffer[Int]()
 val times = ArrayBuffer[Int]()
-val propertyOffset = rel.grouping.length + rel.aggCalls.size()
-rel.namedWindowProperties.map(_.getProperty).zipWithIndex.foreach {
+windowProperties.map(_.getProperty).zipWithIndex.foreach {
   case (p, index) =>
 p match {
   case _: WindowStart =>
@@ -275,11 +300,18 @@ class FlinkRelMdWindowProperties private extends MetadataHandler[FlinkMetadata.W
   ImmutableBitSet.of(starts: _*),
   ImmutableBitSet.of(ends: _*),
   ImmutableBitSet.of(times: _*),
-  rel.windowing.getWindow,
-  rel.windowing.getTimeAttributeType
+  windowSpec,
+  timeAttributeType
 )
   }
 
+  def getWindowProperties(
+  rel: StreamPhysicalLocalWindowAggregate,
+  mq: RelMetadataQuery): RelWindowProperties = {
+val fmq = FlinkRelMetadataQuery.reuseOrCreate(mq)
+fmq.getRelWindowProperties(rel.getInput)
+  }
+
   def getWindowProperties(
   rel: StreamPhysicalWindowRank,
   mq: RelMetada

[flink] branch master updated (753da61bb36 -> 103e5f9fd0f)

2023-03-22 Thread renqs
This is an automated email from the ASF dual-hosted git repository.

renqs pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


from 753da61bb36 [FLINK-31230][yarn] Improve YarnClusterDescriptor memory unit display.
 add 103e5f9fd0f [FLINK-31157][release] Add release note for version 1.17 (#22146)

No new revisions were added by this update.

Summary of changes:
 docs/content.zh/_index.md   |  12 +-
 docs/content.zh/release-notes/flink-1.17.md | 213 
 docs/content/_index.md  |  13 +-
 docs/content/release-notes/flink-1.17.md| 213 
 4 files changed, 449 insertions(+), 2 deletions(-)
 create mode 100644 docs/content.zh/release-notes/flink-1.17.md
 create mode 100644 docs/content/release-notes/flink-1.17.md



[flink] branch master updated: [FLINK-31149][docs] Update compatibility matrix for release 1.17 (#22244)

2023-03-22 Thread renqs
This is an automated email from the ASF dual-hosted git repository.

renqs pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 1009196d1ff [FLINK-31149][docs] Update compatibility matrix for release 1.17 (#22244)
1009196d1ff is described below

commit 1009196d1ffe2d83ccab46608c3581e25d12fcc3
Author: Qingsheng Ren 
AuthorDate: Thu Mar 23 14:58:06 2023 +0800

[FLINK-31149][docs] Update compatibility matrix for release 1.17 (#22244)
---
 docs/content.zh/docs/ops/upgrading.md | 38 +++
 docs/content/docs/ops/upgrading.md| 38 +++
 2 files changed, 76 insertions(+)

diff --git a/docs/content.zh/docs/ops/upgrading.md b/docs/content.zh/docs/ops/upgrading.md
index 3bb53660250..6e6540bb649 100644
--- a/docs/content.zh/docs/ops/upgrading.md
+++ b/docs/content.zh/docs/ops/upgrading.md
@@ -262,6 +262,7 @@ $ bin/flink run -s :savepointPath [:runArgs]
   1.14.x
   1.15.x
   1.16.x
+  1.17.x
   限制
 
   
@@ -284,6 +285,7 @@ $ bin/flink run -s :savepointPath [:runArgs]
   
   
   
+  
   从 Flink 1.1.x 迁移到 1.2.x+ 的作业的最大并行度目前固定为作业的并行度。
 这意味着迁移后无法增加并行度。
 在未来的错误修复版本中可能会删除此限制。
@@ -306,6 +308,7 @@ $ bin/flink run -s :savepointPath [:runArgs]
   O
   O
   O
+  
   
  从 Flink 1.2.x 迁移到 Flink 1.3.x+ 时,不支持同时更改并行度。
 迁移到 Flink 1.3.x+ 后,
@@ -334,6 +337,7 @@ $ bin/flink run -s :savepointPath [:runArgs]
   O
   O
   O
+  
  M如果 Savepoint 包含 Scala 案例类,则从 Flink 1.3.0 迁移到 Flink 1.4.[0,1] 将失败。用户必须直接迁移到 1.4.2+。
 
 
@@ -354,6 +358,7 @@ $ bin/flink run -s :savepointPath [:runArgs]
   O
   O
   O
+  
   
 
 
@@ -374,6 +379,7 @@ $ bin/flink run -s :savepointPath [:runArgs]
   O
   O
   O
+  
  在 1.6.x 到 1.6.2 和 1.7.0 版本中恢复使用 1.5.x 创建的广播状态存在一个已知问题:https://issues.apache.org/jira/browse/FLINK-11087">FLINK-11087. 升级到 1.6.x 或 1.7.x 系列的用户需要直接迁移到次要版本分别高于 1.6.2 和 1.7.0。
@@ -396,6 +402,7 @@ $ bin/flink run -s :savepointPath [:runArgs]
   O
   O
   O
+  
   
 
 
@@ -416,6 +423,7 @@ $ bin/flink run -s :savepointPath [:runArgs]
   O
   O
   O
+  
   
 
 
@@ -436,6 +444,7 @@ $ bin/flink run -s :savepointPath [:runArgs]
   O
   O
   O
+  O
   
 
 
@@ -456,6 +465,7 @@ $ bin/flink run -s :savepointPath [:runArgs]
   O
   O
   O
+  O
   
 
 
@@ -476,6 +486,7 @@ $ bin/flink run -s :savepointPath [:runArgs]
   O
   O
   O
+  O
   
 
 
@@ -496,6 +507,7 @@ $ bin/flink run -s :savepointPath [:runArgs]
   O
   O
   O
+  O
   
 
 
@@ -516,6 +528,7 @@ $ bin/flink run -s :savepointPath [:runArgs]
   O
   O
   O
+  O
   
 
 
@@ -536,6 +549,7 @@ $ bin/flink run -s :savepointPath [:runArgs]
   O
   O
   O
+  O
  Don't upgrade from 1.12.x to 1.13.x with an unaligned checkpoint. Please use a savepoint for migrating.
 
 
@@ -556,6 +570,7 @@ $ bin/flink run -s :savepointPath [:runArgs]
   O
   O
   O
+  O
   
 
 
@@ -576,6 +591,7 @@ $ bin/flink run -s :savepointPath [:runArgs]
   
   O
   O
+  O
   
 For Table API: 1.15.0 and 1.15.1 generated non-deterministic UIDs 
for operators that 
 make it difficult/impossible to restore state or upgrade to next 
patch version. A new 
@@ -604,6 +620,28 @@ $ bin/flink run -s :savepointPath [:runArgs]
   
   
   O
+  O
+  
+
+
+  1.17.x
+  
+  
+  
+  
+  
+  
+  
+  
+  
+  
+  
+  
+  
+  
+  
+  
+  O
   
 
   
diff --git a/docs/content/docs/ops/upgrading.md b/docs/content/docs/ops/upgrading.md
index 827bab87eac..c79bf808125 100644
--- a/docs/content/docs/ops/upgrading.md
+++ b/docs/content/docs/ops/upgrading.md
@@ -264,6 +264,7 @@ Savepoints are compatible across Flink versions as indicated by the table below:
   1.14.x
   1.15.x
   1.16.x
+  1.17.x
   Limitations
 
   
@@ -286,6 +287,7 @@ Savepoints are compatible across Flink versions as indicated by the table below:
   
   
   
+  
  The maximum parallelism of a job that was migrated from Flink 1.1.x to 1.2.x+ is currentl