This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-statefun.git
The following commit(s) were added to refs/heads/master by this push: new 09a5cba [FLINK-25708] Bump Flink dependency to 1.14.3 09a5cba is described below commit 09a5cba521e9f994896c746ec9f8cc6479403612 Author: Till Rohrmann <trohrm...@apache.org> AuthorDate: Wed Jan 19 14:46:48 2022 +0100 [FLINK-25708] Bump Flink dependency to 1.14.3 This closes #283. --- pom.xml | 5 ++++- statefun-e2e-tests/pom.xml | 22 ++++++++++++++++++++ .../statefun-smoke-e2e-common/pom.xml | 2 +- statefun-flink/pom.xml | 24 ++++++++++++++++++++++ statefun-flink/statefun-flink-common/pom.xml | 1 - statefun-flink/statefun-flink-core/pom.xml | 2 +- .../flink/core/common/MailboxExecutorFacade.java | 2 +- .../flink/core/feedback/FeedbackSinkOperator.java | 6 ------ .../flink/core/feedback/FeedbackUnionOperator.java | 8 +------- .../feedback/FeedbackUnionOperatorFactory.java | 1 + .../functions/FunctionGroupDispatchFactory.java | 1 + .../core/functions/FunctionGroupOperator.java | 11 +--------- .../flink/core/functions/ReductionsTest.java | 9 ++++++-- statefun-flink/statefun-flink-distribution/pom.xml | 4 ++-- .../src/main/resources/META-INF/NOTICE | 2 +- statefun-flink/statefun-flink-extensions/pom.xml | 1 - statefun-flink/statefun-flink-launcher/pom.xml | 2 +- .../operator/FunctionsStateBootstrapOperator.java | 2 +- tools/docker/Dockerfile | 2 +- 19 files changed, 70 insertions(+), 37 deletions(-) diff --git a/pom.xml b/pom.xml index c221c1c..8434e95 100644 --- a/pom.xml +++ b/pom.xml @@ -79,8 +79,11 @@ under the License. <protobuf.version>3.7.1</protobuf.version> <unixsocket.version>2.3.2</unixsocket.version> <protoc-jar-maven-plugin.version>3.11.1</protoc-jar-maven-plugin.version> - <flink.version>1.13.2</flink.version> + <flink.version>1.14.3</flink.version> <scala.binary.version>2.12</scala.binary.version> + <scala.version>2.12.7</scala.version> + <lz4-java.version>1.8.0</lz4-java.version> + <flink-shaded-jackson.version>2.12.4-14.0</flink-shaded-jackson.version> <test.unit.pattern>**/*Test.*</test.unit.pattern> </properties> diff --git a/statefun-e2e-tests/pom.xml b/statefun-e2e-tests/pom.xml index 353e8c9..f2e850c 100644 --- a/statefun-e2e-tests/pom.xml +++ b/statefun-e2e-tests/pom.xml @@ -44,6 +44,28 @@ under the License. <module>statefun-smoke-e2e-js</module> </modules> + <dependencyManagement> + <dependencies> + <!-- + Pin the scala library version in order to resolve the dependency conversion problem between two + transitive scala-library versions in flink-scala + --> + <dependency> + <groupId>org.scala-lang</groupId> + <artifactId>scala-library</artifactId> + <version>${scala.version}</version> + </dependency> + <!-- + Pin version to avoid conflicts between flink-runtime and kafka-clients + --> + <dependency> + <groupId>org.lz4</groupId> + <artifactId>lz4-java</artifactId> + <version>${lz4-java.version}</version> + </dependency> + </dependencies> + </dependencyManagement> + <build> <plugins> <plugin> diff --git a/statefun-e2e-tests/statefun-smoke-e2e-common/pom.xml b/statefun-e2e-tests/statefun-smoke-e2e-common/pom.xml index 6e7c54d..c09e5b3 100644 --- a/statefun-e2e-tests/statefun-smoke-e2e-common/pom.xml +++ b/statefun-e2e-tests/statefun-smoke-e2e-common/pom.xml @@ -63,7 +63,7 @@ under the License. <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-shaded-jackson</artifactId> - <version>2.12.1-13.0</version> + <version>${flink-shaded-jackson.version}</version> </dependency> <!-- logging --> diff --git a/statefun-flink/pom.xml b/statefun-flink/pom.xml index 0b1e334..4a9337f 100644 --- a/statefun-flink/pom.xml +++ b/statefun-flink/pom.xml @@ -175,6 +175,30 @@ under the License. <artifactId>snappy-java</artifactId> <version>1.1.4</version> </dependency> + <!-- + Pin the scala library version in order to resolve the dependency conversion problem between two + transitive scala-library versions in flink-scala + --> + <dependency> + <groupId>org.scala-lang</groupId> + <artifactId>scala-library</artifactId> + <version>${scala.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-shaded-jackson</artifactId> + <version>${flink-shaded-jackson.version}</version> + </dependency> + + <!-- + Pin version to avoid conflicts between flink-runtime and kafka-clients + --> + <dependency> + <groupId>org.lz4</groupId> + <artifactId>lz4-java</artifactId> + <version>${lz4-java.version}</version> + </dependency> </dependencies> </dependencyManagement> diff --git a/statefun-flink/statefun-flink-common/pom.xml b/statefun-flink/statefun-flink-common/pom.xml index 28655d6..6728f41 100644 --- a/statefun-flink/statefun-flink-common/pom.xml +++ b/statefun-flink/statefun-flink-common/pom.xml @@ -55,7 +55,6 @@ under the License. <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-shaded-jackson</artifactId> - <version>2.12.1-13.0</version> </dependency> <!-- tests --> diff --git a/statefun-flink/statefun-flink-core/pom.xml b/statefun-flink/statefun-flink-core/pom.xml index 5c79728..9f88206 100644 --- a/statefun-flink/statefun-flink-core/pom.xml +++ b/statefun-flink/statefun-flink-core/pom.xml @@ -32,7 +32,7 @@ under the License. <properties> <okhttp.version>3.14.6</okhttp.version> <additional-sources.dir>target/additional-sources</additional-sources.dir> - <flink-shaded-netty.version>4.1.49.Final-13.0</flink-shaded-netty.version> + <flink-shaded-netty.version>4.1.65.Final-14.0</flink-shaded-netty.version> </properties> <dependencies> diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/common/MailboxExecutorFacade.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/common/MailboxExecutorFacade.java index 26c871d..a93d1dd 100644 --- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/common/MailboxExecutorFacade.java +++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/common/MailboxExecutorFacade.java @@ -19,7 +19,7 @@ package org.apache.flink.statefun.flink.core.common; import java.util.Objects; import java.util.concurrent.Executor; -import org.apache.flink.streaming.api.operators.MailboxExecutor; +import org.apache.flink.api.common.operators.MailboxExecutor; public final class MailboxExecutorFacade implements Executor { private final MailboxExecutor executor; diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/feedback/FeedbackSinkOperator.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/feedback/FeedbackSinkOperator.java index 4970a51..374343c 100644 --- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/feedback/FeedbackSinkOperator.java +++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/feedback/FeedbackSinkOperator.java @@ -92,10 +92,4 @@ public final class FeedbackSinkOperator<V> extends AbstractStreamOperator<Void> IOUtils.closeQuietly(channel); super.close(); } - - @Override - public void dispose() throws Exception { - IOUtils.closeQuietly(channel); - super.dispose(); - } } diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/feedback/FeedbackUnionOperator.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/feedback/FeedbackUnionOperator.java index fb68cc4..f18d617 100644 --- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/feedback/FeedbackUnionOperator.java +++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/feedback/FeedbackUnionOperator.java @@ -20,6 +20,7 @@ package org.apache.flink.statefun.flink.core.feedback; import java.util.Objects; import java.util.OptionalLong; import java.util.concurrent.Executor; +import org.apache.flink.api.common.operators.MailboxExecutor; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider; @@ -32,7 +33,6 @@ import org.apache.flink.statefun.flink.core.logger.UnboundedFeedbackLogger; import org.apache.flink.statefun.flink.core.logger.UnboundedFeedbackLoggerFactory; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.ChainingStrategy; -import org.apache.flink.streaming.api.operators.MailboxExecutor; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; @@ -156,12 +156,6 @@ public final class FeedbackUnionOperator<T> extends AbstractStreamOperator<T> super.close(); } - @Override - public void dispose() throws Exception { - closeInternally(); - super.dispose(); - } - // ------------------------------------------------------------------------------------------------------------------ // Helpers // ------------------------------------------------------------------------------------------------------------------ diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/feedback/FeedbackUnionOperatorFactory.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/feedback/FeedbackUnionOperatorFactory.java index a2c563e..f747f3d 100644 --- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/feedback/FeedbackUnionOperatorFactory.java +++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/feedback/FeedbackUnionOperatorFactory.java @@ -19,6 +19,7 @@ package org.apache.flink.statefun.flink.core.feedback; import java.util.Objects; import java.util.OptionalLong; +import org.apache.flink.api.common.operators.MailboxExecutor; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.statefun.flink.core.StatefulFunctionsConfig; import org.apache.flink.statefun.flink.core.common.SerializableFunction; diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/FunctionGroupDispatchFactory.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/FunctionGroupDispatchFactory.java index 4a994a4..7cdb41b 100644 --- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/FunctionGroupDispatchFactory.java +++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/FunctionGroupDispatchFactory.java @@ -19,6 +19,7 @@ package org.apache.flink.statefun.flink.core.functions; import java.util.Map; import java.util.Objects; +import org.apache.flink.api.common.operators.MailboxExecutor; import org.apache.flink.statefun.flink.core.StatefulFunctionsConfig; import org.apache.flink.statefun.flink.core.message.Message; import org.apache.flink.statefun.sdk.io.EgressIdentifier; diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/FunctionGroupOperator.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/FunctionGroupOperator.java index 8dcd01b..7e9f75e 100644 --- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/FunctionGroupOperator.java +++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/FunctionGroupOperator.java @@ -21,6 +21,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; import java.util.stream.Collectors; +import org.apache.flink.api.common.operators.MailboxExecutor; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.state.MapState; import org.apache.flink.api.common.state.MapStateDescriptor; @@ -43,7 +44,6 @@ import org.apache.flink.statefun.sdk.StatefulFunctionProvider; import org.apache.flink.statefun.sdk.io.EgressIdentifier; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.ChainingStrategy; -import org.apache.flink.streaming.api.operators.MailboxExecutor; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; @@ -165,15 +165,6 @@ public class FunctionGroupOperator extends AbstractStreamOperator<Message> } } - @Override - public void dispose() throws Exception { - try { - closeOrDispose(); - } finally { - super.dispose(); - } - } - private void closeOrDispose() { final List<ManagingResources> managingResources = this.managingResources; if (managingResources == null) { diff --git a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/functions/ReductionsTest.java b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/functions/ReductionsTest.java index cf3b19a..ad65827 100644 --- a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/functions/ReductionsTest.java +++ b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/functions/ReductionsTest.java @@ -61,6 +61,7 @@ import org.apache.flink.metrics.Gauge; import org.apache.flink.metrics.Meter; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.metrics.SimpleCounter; +import org.apache.flink.metrics.groups.OperatorMetricGroup; import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue; import org.apache.flink.runtime.state.Keyed; import org.apache.flink.runtime.state.KeyedStateBackend; @@ -70,7 +71,7 @@ import org.apache.flink.runtime.state.StateSnapshotTransformer.StateSnapshotTran import org.apache.flink.runtime.state.VoidNamespace; import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement; import org.apache.flink.runtime.state.internal.InternalListState; -import org.apache.flink.shaded.guava18.com.google.common.util.concurrent.MoreExecutors; +import org.apache.flink.shaded.guava30.com.google.common.util.concurrent.MoreExecutors; import org.apache.flink.statefun.flink.core.StatefulFunctionsUniverse; import org.apache.flink.statefun.flink.core.TestUtils; import org.apache.flink.statefun.flink.core.backpressure.ThresholdBackPressureValve; @@ -83,6 +84,7 @@ import org.apache.flink.streaming.api.operators.Triggerable; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus; import org.apache.flink.util.OutputTag; import org.apache.flink.util.function.BiConsumerWithException; import org.junit.Test; @@ -195,7 +197,7 @@ public class ReductionsTest { } @Override - public MetricGroup getMetricGroup() { + public OperatorMetricGroup getMetricGroup() { throw new UnsupportedOperationException(); } @@ -574,6 +576,9 @@ public class ReductionsTest { public void emitWatermark(Watermark mark) {} @Override + public void emitWatermarkStatus(WatermarkStatus watermarkStatus) {} + + @Override public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) {} @Override diff --git a/statefun-flink/statefun-flink-distribution/pom.xml b/statefun-flink/statefun-flink-distribution/pom.xml index b62436f..7608897 100644 --- a/statefun-flink/statefun-flink-distribution/pom.xml +++ b/statefun-flink/statefun-flink-distribution/pom.xml @@ -103,13 +103,13 @@ under the License. </dependency> <dependency> <groupId>org.apache.flink</groupId> - <artifactId>flink-runtime_${scala.binary.version}</artifactId> + <artifactId>flink-runtime</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> - <artifactId>flink-optimizer_${scala.binary.version}</artifactId> + <artifactId>flink-optimizer</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> diff --git a/statefun-flink/statefun-flink-distribution/src/main/resources/META-INF/NOTICE b/statefun-flink/statefun-flink-distribution/src/main/resources/META-INF/NOTICE index 750b1f1..430cac9 100644 --- a/statefun-flink/statefun-flink-distribution/src/main/resources/META-INF/NOTICE +++ b/statefun-flink/statefun-flink-distribution/src/main/resources/META-INF/NOTICE @@ -22,7 +22,7 @@ This project bundles the following dependencies under the Apache Software Licens - org.checkerframework:checker-qual:2.11.1 - org.apache.commons:commons-lang3:3.3.2 - org.apache.kafka:kafka-clients:2.4.1 -- org.lz4:lz4-java:1.6.0 +- org.lz4:lz4-java:1.8.0 - com.squareup.okhttp3:okhttp:3.14.6 - com.squareup.okio:okio:1.17.2 - com.fasterxml.jackson.core:jackson-databind:2.12.1 diff --git a/statefun-flink/statefun-flink-extensions/pom.xml b/statefun-flink/statefun-flink-extensions/pom.xml index 96b77c7..9500b1d 100644 --- a/statefun-flink/statefun-flink-extensions/pom.xml +++ b/statefun-flink/statefun-flink-extensions/pom.xml @@ -39,7 +39,6 @@ under the License. <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-shaded-jackson</artifactId> - <version>2.12.1-13.0</version> </dependency> <dependency> diff --git a/statefun-flink/statefun-flink-launcher/pom.xml b/statefun-flink/statefun-flink-launcher/pom.xml index 4f07bef..85ff83e 100644 --- a/statefun-flink/statefun-flink-launcher/pom.xml +++ b/statefun-flink/statefun-flink-launcher/pom.xml @@ -33,7 +33,7 @@ under the License. <!-- Flink --> <dependency> <groupId>org.apache.flink</groupId> - <artifactId>flink-runtime_${scala.binary.version}</artifactId> + <artifactId>flink-runtime</artifactId> <version>${flink.version}</version> </dependency> <dependency> diff --git a/statefun-flink/statefun-flink-state-processor/src/main/java/org/apache/flink/statefun/flink/state/processor/operator/FunctionsStateBootstrapOperator.java b/statefun-flink/statefun-flink-state-processor/src/main/java/org/apache/flink/statefun/flink/state/processor/operator/FunctionsStateBootstrapOperator.java index 7df4194..fbc9d5e 100644 --- a/statefun-flink/statefun-flink-state-processor/src/main/java/org/apache/flink/statefun/flink/state/processor/operator/FunctionsStateBootstrapOperator.java +++ b/statefun-flink/statefun-flink-state-processor/src/main/java/org/apache/flink/statefun/flink/state/processor/operator/FunctionsStateBootstrapOperator.java @@ -84,7 +84,7 @@ public final class FunctionsStateBootstrapOperator snapshotTimestamp, true, false, - getContainingTask().getCheckpointStorage(), + getContainingTask().getEnvironment().getTaskManagerInfo().getConfiguration(), snapshotPath); output.collect(new StreamRecord<>(state)); diff --git a/tools/docker/Dockerfile b/tools/docker/Dockerfile index 70912d6..7f1a021 100644 --- a/tools/docker/Dockerfile +++ b/tools/docker/Dockerfile @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -FROM apache/flink:1.13.2-scala_2.12-java8 +FROM apache/flink:1.14.3-scala_2.12-java8 ENV ROLE worker ENV MASTER_HOST localhost