This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git


The following commit(s) were added to refs/heads/master by this push:
     new 09a5cba  [FLINK-25708] Bump Flink dependency to 1.14.3
09a5cba is described below

commit 09a5cba521e9f994896c746ec9f8cc6479403612
Author: Till Rohrmann <trohrm...@apache.org>
AuthorDate: Wed Jan 19 14:46:48 2022 +0100

    [FLINK-25708] Bump Flink dependency to 1.14.3
    
    This closes #283.
---
 pom.xml                                            |  5 ++++-
 statefun-e2e-tests/pom.xml                         | 22 ++++++++++++++++++++
 .../statefun-smoke-e2e-common/pom.xml              |  2 +-
 statefun-flink/pom.xml                             | 24 ++++++++++++++++++++++
 statefun-flink/statefun-flink-common/pom.xml       |  1 -
 statefun-flink/statefun-flink-core/pom.xml         |  2 +-
 .../flink/core/common/MailboxExecutorFacade.java   |  2 +-
 .../flink/core/feedback/FeedbackSinkOperator.java  |  6 ------
 .../flink/core/feedback/FeedbackUnionOperator.java |  8 +-------
 .../feedback/FeedbackUnionOperatorFactory.java     |  1 +
 .../functions/FunctionGroupDispatchFactory.java    |  1 +
 .../core/functions/FunctionGroupOperator.java      | 11 +---------
 .../flink/core/functions/ReductionsTest.java       |  9 ++++++--
 statefun-flink/statefun-flink-distribution/pom.xml |  4 ++--
 .../src/main/resources/META-INF/NOTICE             |  2 +-
 statefun-flink/statefun-flink-extensions/pom.xml   |  1 -
 statefun-flink/statefun-flink-launcher/pom.xml     |  2 +-
 .../operator/FunctionsStateBootstrapOperator.java  |  2 +-
 tools/docker/Dockerfile                            |  2 +-
 19 files changed, 70 insertions(+), 37 deletions(-)

diff --git a/pom.xml b/pom.xml
index c221c1c..8434e95 100644
--- a/pom.xml
+++ b/pom.xml
@@ -79,8 +79,11 @@ under the License.
         <protobuf.version>3.7.1</protobuf.version>
         <unixsocket.version>2.3.2</unixsocket.version>
         
<protoc-jar-maven-plugin.version>3.11.1</protoc-jar-maven-plugin.version>
-        <flink.version>1.13.2</flink.version>
+        <flink.version>1.14.3</flink.version>
         <scala.binary.version>2.12</scala.binary.version>
+        <scala.version>2.12.7</scala.version>
+        <lz4-java.version>1.8.0</lz4-java.version>
+        
<flink-shaded-jackson.version>2.12.4-14.0</flink-shaded-jackson.version>
         <test.unit.pattern>**/*Test.*</test.unit.pattern>
     </properties>
 
diff --git a/statefun-e2e-tests/pom.xml b/statefun-e2e-tests/pom.xml
index 353e8c9..f2e850c 100644
--- a/statefun-e2e-tests/pom.xml
+++ b/statefun-e2e-tests/pom.xml
@@ -44,6 +44,28 @@ under the License.
         <module>statefun-smoke-e2e-js</module>
     </modules>
 
+    <dependencyManagement>
+        <dependencies>
+            <!--
+            Pin the scala library version in order to resolve the dependency 
conversion problem between two
+            transitive scala-library versions in flink-scala
+            -->
+            <dependency>
+                <groupId>org.scala-lang</groupId>
+                <artifactId>scala-library</artifactId>
+                <version>${scala.version}</version>
+            </dependency>
+            <!--
+            Pin version to avoid conflicts between flink-runtime and 
kafka-clients
+            -->
+            <dependency>
+                <groupId>org.lz4</groupId>
+                <artifactId>lz4-java</artifactId>
+                <version>${lz4-java.version}</version>
+            </dependency>
+        </dependencies>
+    </dependencyManagement>
+
     <build>
         <plugins>
             <plugin>
diff --git a/statefun-e2e-tests/statefun-smoke-e2e-common/pom.xml 
b/statefun-e2e-tests/statefun-smoke-e2e-common/pom.xml
index 6e7c54d..c09e5b3 100644
--- a/statefun-e2e-tests/statefun-smoke-e2e-common/pom.xml
+++ b/statefun-e2e-tests/statefun-smoke-e2e-common/pom.xml
@@ -63,7 +63,7 @@ under the License.
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-shaded-jackson</artifactId>
-            <version>2.12.1-13.0</version>
+            <version>${flink-shaded-jackson.version}</version>
         </dependency>
 
         <!-- logging -->
diff --git a/statefun-flink/pom.xml b/statefun-flink/pom.xml
index 0b1e334..4a9337f 100644
--- a/statefun-flink/pom.xml
+++ b/statefun-flink/pom.xml
@@ -175,6 +175,30 @@ under the License.
                 <artifactId>snappy-java</artifactId>
                 <version>1.1.4</version>
             </dependency>
+            <!--
+            Pin the scala library version in order to resolve the dependency 
conversion problem between two
+            transitive scala-library versions in flink-scala
+            -->
+            <dependency>
+                <groupId>org.scala-lang</groupId>
+                <artifactId>scala-library</artifactId>
+                <version>${scala.version}</version>
+            </dependency>
+
+            <dependency>
+                <groupId>org.apache.flink</groupId>
+                <artifactId>flink-shaded-jackson</artifactId>
+                <version>${flink-shaded-jackson.version}</version>
+            </dependency>
+
+            <!--
+            Pin version to avoid conflicts between flink-runtime and 
kafka-clients
+            -->
+            <dependency>
+                <groupId>org.lz4</groupId>
+                <artifactId>lz4-java</artifactId>
+                <version>${lz4-java.version}</version>
+            </dependency>
         </dependencies>
     </dependencyManagement>
 
diff --git a/statefun-flink/statefun-flink-common/pom.xml 
b/statefun-flink/statefun-flink-common/pom.xml
index 28655d6..6728f41 100644
--- a/statefun-flink/statefun-flink-common/pom.xml
+++ b/statefun-flink/statefun-flink-common/pom.xml
@@ -55,7 +55,6 @@ under the License.
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-shaded-jackson</artifactId>
-            <version>2.12.1-13.0</version>
         </dependency>
 
         <!-- tests -->
diff --git a/statefun-flink/statefun-flink-core/pom.xml 
b/statefun-flink/statefun-flink-core/pom.xml
index 5c79728..9f88206 100644
--- a/statefun-flink/statefun-flink-core/pom.xml
+++ b/statefun-flink/statefun-flink-core/pom.xml
@@ -32,7 +32,7 @@ under the License.
     <properties>
         <okhttp.version>3.14.6</okhttp.version>
         
<additional-sources.dir>target/additional-sources</additional-sources.dir>
-        
<flink-shaded-netty.version>4.1.49.Final-13.0</flink-shaded-netty.version>
+        
<flink-shaded-netty.version>4.1.65.Final-14.0</flink-shaded-netty.version>
     </properties>
 
     <dependencies>
diff --git 
a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/common/MailboxExecutorFacade.java
 
b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/common/MailboxExecutorFacade.java
index 26c871d..a93d1dd 100644
--- 
a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/common/MailboxExecutorFacade.java
+++ 
b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/common/MailboxExecutorFacade.java
@@ -19,7 +19,7 @@ package org.apache.flink.statefun.flink.core.common;
 
 import java.util.Objects;
 import java.util.concurrent.Executor;
-import org.apache.flink.streaming.api.operators.MailboxExecutor;
+import org.apache.flink.api.common.operators.MailboxExecutor;
 
 public final class MailboxExecutorFacade implements Executor {
   private final MailboxExecutor executor;
diff --git 
a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/feedback/FeedbackSinkOperator.java
 
b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/feedback/FeedbackSinkOperator.java
index 4970a51..374343c 100644
--- 
a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/feedback/FeedbackSinkOperator.java
+++ 
b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/feedback/FeedbackSinkOperator.java
@@ -92,10 +92,4 @@ public final class FeedbackSinkOperator<V> extends 
AbstractStreamOperator<Void>
     IOUtils.closeQuietly(channel);
     super.close();
   }
-
-  @Override
-  public void dispose() throws Exception {
-    IOUtils.closeQuietly(channel);
-    super.dispose();
-  }
 }
diff --git 
a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/feedback/FeedbackUnionOperator.java
 
b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/feedback/FeedbackUnionOperator.java
index fb68cc4..f18d617 100644
--- 
a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/feedback/FeedbackUnionOperator.java
+++ 
b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/feedback/FeedbackUnionOperator.java
@@ -20,6 +20,7 @@ package org.apache.flink.statefun.flink.core.feedback;
 import java.util.Objects;
 import java.util.OptionalLong;
 import java.util.concurrent.Executor;
+import org.apache.flink.api.common.operators.MailboxExecutor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider;
@@ -32,7 +33,6 @@ import 
org.apache.flink.statefun.flink.core.logger.UnboundedFeedbackLogger;
 import 
org.apache.flink.statefun.flink.core.logger.UnboundedFeedbackLoggerFactory;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.ChainingStrategy;
-import org.apache.flink.streaming.api.operators.MailboxExecutor;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
@@ -156,12 +156,6 @@ public final class FeedbackUnionOperator<T> extends 
AbstractStreamOperator<T>
     super.close();
   }
 
-  @Override
-  public void dispose() throws Exception {
-    closeInternally();
-    super.dispose();
-  }
-
   // 
------------------------------------------------------------------------------------------------------------------
   // Helpers
   // 
------------------------------------------------------------------------------------------------------------------
diff --git 
a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/feedback/FeedbackUnionOperatorFactory.java
 
b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/feedback/FeedbackUnionOperatorFactory.java
index a2c563e..f747f3d 100644
--- 
a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/feedback/FeedbackUnionOperatorFactory.java
+++ 
b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/feedback/FeedbackUnionOperatorFactory.java
@@ -19,6 +19,7 @@ package org.apache.flink.statefun.flink.core.feedback;
 
 import java.util.Objects;
 import java.util.OptionalLong;
+import org.apache.flink.api.common.operators.MailboxExecutor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.statefun.flink.core.StatefulFunctionsConfig;
 import org.apache.flink.statefun.flink.core.common.SerializableFunction;
diff --git 
a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/FunctionGroupDispatchFactory.java
 
b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/FunctionGroupDispatchFactory.java
index 4a994a4..7cdb41b 100644
--- 
a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/FunctionGroupDispatchFactory.java
+++ 
b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/FunctionGroupDispatchFactory.java
@@ -19,6 +19,7 @@ package org.apache.flink.statefun.flink.core.functions;
 
 import java.util.Map;
 import java.util.Objects;
+import org.apache.flink.api.common.operators.MailboxExecutor;
 import org.apache.flink.statefun.flink.core.StatefulFunctionsConfig;
 import org.apache.flink.statefun.flink.core.message.Message;
 import org.apache.flink.statefun.sdk.io.EgressIdentifier;
diff --git 
a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/FunctionGroupOperator.java
 
b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/FunctionGroupOperator.java
index 8dcd01b..7e9f75e 100644
--- 
a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/FunctionGroupOperator.java
+++ 
b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/FunctionGroupOperator.java
@@ -21,6 +21,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.stream.Collectors;
+import org.apache.flink.api.common.operators.MailboxExecutor;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.MapState;
 import org.apache.flink.api.common.state.MapStateDescriptor;
@@ -43,7 +44,6 @@ import org.apache.flink.statefun.sdk.StatefulFunctionProvider;
 import org.apache.flink.statefun.sdk.io.EgressIdentifier;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.ChainingStrategy;
-import org.apache.flink.streaming.api.operators.MailboxExecutor;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
@@ -165,15 +165,6 @@ public class FunctionGroupOperator extends 
AbstractStreamOperator<Message>
     }
   }
 
-  @Override
-  public void dispose() throws Exception {
-    try {
-      closeOrDispose();
-    } finally {
-      super.dispose();
-    }
-  }
-
   private void closeOrDispose() {
     final List<ManagingResources> managingResources = this.managingResources;
     if (managingResources == null) {
diff --git 
a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/functions/ReductionsTest.java
 
b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/functions/ReductionsTest.java
index cf3b19a..ad65827 100644
--- 
a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/functions/ReductionsTest.java
+++ 
b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/functions/ReductionsTest.java
@@ -61,6 +61,7 @@ import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.Meter;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.metrics.SimpleCounter;
+import org.apache.flink.metrics.groups.OperatorMetricGroup;
 import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
 import org.apache.flink.runtime.state.Keyed;
 import org.apache.flink.runtime.state.KeyedStateBackend;
@@ -70,7 +71,7 @@ import 
org.apache.flink.runtime.state.StateSnapshotTransformer.StateSnapshotTran
 import org.apache.flink.runtime.state.VoidNamespace;
 import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
 import org.apache.flink.runtime.state.internal.InternalListState;
-import 
org.apache.flink.shaded.guava18.com.google.common.util.concurrent.MoreExecutors;
+import 
org.apache.flink.shaded.guava30.com.google.common.util.concurrent.MoreExecutors;
 import org.apache.flink.statefun.flink.core.StatefulFunctionsUniverse;
 import org.apache.flink.statefun.flink.core.TestUtils;
 import 
org.apache.flink.statefun.flink.core.backpressure.ThresholdBackPressureValve;
@@ -83,6 +84,7 @@ import org.apache.flink.streaming.api.operators.Triggerable;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
 import org.apache.flink.util.OutputTag;
 import org.apache.flink.util.function.BiConsumerWithException;
 import org.junit.Test;
@@ -195,7 +197,7 @@ public class ReductionsTest {
     }
 
     @Override
-    public MetricGroup getMetricGroup() {
+    public OperatorMetricGroup getMetricGroup() {
       throw new UnsupportedOperationException();
     }
 
@@ -574,6 +576,9 @@ public class ReductionsTest {
     public void emitWatermark(Watermark mark) {}
 
     @Override
+    public void emitWatermarkStatus(WatermarkStatus watermarkStatus) {}
+
+    @Override
     public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) {}
 
     @Override
diff --git a/statefun-flink/statefun-flink-distribution/pom.xml 
b/statefun-flink/statefun-flink-distribution/pom.xml
index b62436f..7608897 100644
--- a/statefun-flink/statefun-flink-distribution/pom.xml
+++ b/statefun-flink/statefun-flink-distribution/pom.xml
@@ -103,13 +103,13 @@ under the License.
         </dependency>
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-runtime_${scala.binary.version}</artifactId>
+            <artifactId>flink-runtime</artifactId>
             <version>${flink.version}</version>
             <scope>provided</scope>
         </dependency>
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-optimizer_${scala.binary.version}</artifactId>
+            <artifactId>flink-optimizer</artifactId>
             <version>${flink.version}</version>
             <scope>provided</scope>
         </dependency>
diff --git 
a/statefun-flink/statefun-flink-distribution/src/main/resources/META-INF/NOTICE 
b/statefun-flink/statefun-flink-distribution/src/main/resources/META-INF/NOTICE
index 750b1f1..430cac9 100644
--- 
a/statefun-flink/statefun-flink-distribution/src/main/resources/META-INF/NOTICE
+++ 
b/statefun-flink/statefun-flink-distribution/src/main/resources/META-INF/NOTICE
@@ -22,7 +22,7 @@ This project bundles the following dependencies under the 
Apache Software Licens
 - org.checkerframework:checker-qual:2.11.1
 - org.apache.commons:commons-lang3:3.3.2
 - org.apache.kafka:kafka-clients:2.4.1
-- org.lz4:lz4-java:1.6.0
+- org.lz4:lz4-java:1.8.0
 - com.squareup.okhttp3:okhttp:3.14.6
 - com.squareup.okio:okio:1.17.2
 - com.fasterxml.jackson.core:jackson-databind:2.12.1
diff --git a/statefun-flink/statefun-flink-extensions/pom.xml 
b/statefun-flink/statefun-flink-extensions/pom.xml
index 96b77c7..9500b1d 100644
--- a/statefun-flink/statefun-flink-extensions/pom.xml
+++ b/statefun-flink/statefun-flink-extensions/pom.xml
@@ -39,7 +39,6 @@ under the License.
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-shaded-jackson</artifactId>
-            <version>2.12.1-13.0</version>
         </dependency>
 
         <dependency>
diff --git a/statefun-flink/statefun-flink-launcher/pom.xml 
b/statefun-flink/statefun-flink-launcher/pom.xml
index 4f07bef..85ff83e 100644
--- a/statefun-flink/statefun-flink-launcher/pom.xml
+++ b/statefun-flink/statefun-flink-launcher/pom.xml
@@ -33,7 +33,7 @@ under the License.
         <!-- Flink -->
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-runtime_${scala.binary.version}</artifactId>
+            <artifactId>flink-runtime</artifactId>
             <version>${flink.version}</version>
         </dependency>
         <dependency>
diff --git 
a/statefun-flink/statefun-flink-state-processor/src/main/java/org/apache/flink/statefun/flink/state/processor/operator/FunctionsStateBootstrapOperator.java
 
b/statefun-flink/statefun-flink-state-processor/src/main/java/org/apache/flink/statefun/flink/state/processor/operator/FunctionsStateBootstrapOperator.java
index 7df4194..fbc9d5e 100644
--- 
a/statefun-flink/statefun-flink-state-processor/src/main/java/org/apache/flink/statefun/flink/state/processor/operator/FunctionsStateBootstrapOperator.java
+++ 
b/statefun-flink/statefun-flink-state-processor/src/main/java/org/apache/flink/statefun/flink/state/processor/operator/FunctionsStateBootstrapOperator.java
@@ -84,7 +84,7 @@ public final class FunctionsStateBootstrapOperator
             snapshotTimestamp,
             true,
             false,
-            getContainingTask().getCheckpointStorage(),
+            
getContainingTask().getEnvironment().getTaskManagerInfo().getConfiguration(),
             snapshotPath);
 
     output.collect(new StreamRecord<>(state));
diff --git a/tools/docker/Dockerfile b/tools/docker/Dockerfile
index 70912d6..7f1a021 100644
--- a/tools/docker/Dockerfile
+++ b/tools/docker/Dockerfile
@@ -13,7 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-FROM apache/flink:1.13.2-scala_2.12-java8
+FROM apache/flink:1.14.3-scala_2.12-java8
 
 ENV ROLE worker
 ENV MASTER_HOST localhost

Reply via email to