This is an automated email from the ASF dual-hosted git repository. damccorm pushed a commit to branch release-2.56.0 in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/release-2.56.0 by this push: new 50b73de3336 [runners-flink] remove 1.12 and 1.13 runners (#31020) 50b73de3336 is described below commit 50b73de333614888db174140f72371db4cdbdbe5 Author: Jan Lukavský <je...@seznam.cz> AuthorDate: Wed Apr 17 21:56:06 2024 +0200 [runners-flink] remove 1.12 and 1.13 runners (#31020) --- gradle.properties | 2 +- release/build.gradle.kts | 2 +- runners/flink/1.12/build.gradle | 25 ------ .../flink/1.12/job-server-container/build.gradle | 26 ------ runners/flink/1.12/job-server/build.gradle | 31 ------- .../streaming/AbstractStreamOperatorCompat.java | 94 ---------------------- .../io/source/compat/FlinkSourceCompat.java | 31 ------- .../io/source/compat/SplitEnumeratorCompat.java | 27 ------- .../beam/runners/flink/RemoteMiniClusterImpl.java | 68 ---------------- .../runners/flink/metrics/MetricGroupWrapper.java | 31 ------- .../runners/flink/streaming/StreamSources.java | 72 ----------------- .../streaming/io/source/SourceTestCompat.java | 75 ----------------- runners/flink/1.13/build.gradle | 25 ------ .../flink/1.13/job-server-container/build.gradle | 26 ------ runners/flink/1.13/job-server/build.gradle | 31 ------- .../translation/types/CoderTypeSerializer.java | 0 .../streaming/ProcessingTimeCallbackCompat.java | 0 .../io/source/compat/SplitEnumeratorCompat.java | 0 .../streaming/io/source/compat/package-info.java | 0 .../beam/runners/flink/MiniClusterCompat.java | 0 .../translation/types/CoderTypeSerializerTest.java | 0 runners/flink/flink_runner.gradle | 17 ++-- .../streaming/io/source/FlinkSourceReaderBase.java | 2 - sdks/go/examples/stringsplit/stringsplit.go | 2 +- sdks/java/testing/nexmark/build.gradle | 2 +- sdks/java/testing/tpcds/build.gradle | 2 +- settings.gradle.kts | 8 -- .../content/en/documentation/dsls/sql/shell.md | 6 +- .../shortcodes/flink_java_pipeline_options.html | 8 +- .../shortcodes/flink_python_pipeline_options.html | 8 +- 30 files changed, 22 insertions(+), 599 deletions(-) diff --git a/gradle.properties b/gradle.properties index 5b07dd6c2b4..8fd8e9b4742 100644 --- a/gradle.properties +++ b/gradle.properties @@ -39,6 +39,6 @@ docker_image_default_repo_root=apache docker_image_default_repo_prefix=beam_ # supported flink versions -flink_versions=1.12,1.13,1.14,1.15,1.16,1.17 +flink_versions=1.14,1.15,1.16,1.17 # supported python versions python_versions=3.8,3.9,3.10,3.11 diff --git a/release/build.gradle.kts b/release/build.gradle.kts index abb34d8605a..4e4586f666f 100644 --- a/release/build.gradle.kts +++ b/release/build.gradle.kts @@ -39,7 +39,7 @@ task("runJavaExamplesValidationTask") { dependsOn(":runners:direct-java:runQuickstartJavaDirect") dependsOn(":runners:google-cloud-dataflow-java:runQuickstartJavaDataflow") dependsOn(":runners:spark:3:runQuickstartJavaSpark") - dependsOn(":runners:flink:1.13:runQuickstartJavaFlinkLocal") + dependsOn(":runners:flink:1.17:runQuickstartJavaFlinkLocal") dependsOn(":runners:direct-java:runMobileGamingJavaDirect") dependsOn(":runners:google-cloud-dataflow-java:runMobileGamingJavaDataflow") dependsOn(":runners:twister2:runQuickstartJavaTwister2") diff --git a/runners/flink/1.12/build.gradle b/runners/flink/1.12/build.gradle deleted file mode 100644 index 2acee16c6e8..00000000000 --- a/runners/flink/1.12/build.gradle +++ /dev/null @@ -1,25 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * License); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an AS IS BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -project.ext { - flink_major = '1.12' - flink_version = '1.12.7' -} - -// Load the main build script which contains all build logic. -apply from: "../flink_runner.gradle" diff --git a/runners/flink/1.12/job-server-container/build.gradle b/runners/flink/1.12/job-server-container/build.gradle deleted file mode 100644 index afdb68a0fc9..00000000000 --- a/runners/flink/1.12/job-server-container/build.gradle +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * License); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an AS IS BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -def basePath = '../../job-server-container' - -project.ext { - resource_path = basePath -} - -// Load the main build script which contains all build logic. -apply from: "$basePath/flink_job_server_container.gradle" diff --git a/runners/flink/1.12/job-server/build.gradle b/runners/flink/1.12/job-server/build.gradle deleted file mode 100644 index 890082042c0..00000000000 --- a/runners/flink/1.12/job-server/build.gradle +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * License); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an AS IS BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -def basePath = '../../job-server' - -project.ext { - // Look for the source code in the parent module - main_source_dirs = ["$basePath/src/main/java"] - test_source_dirs = ["$basePath/src/test/java"] - main_resources_dirs = ["$basePath/src/main/resources"] - test_resources_dirs = ["$basePath/src/test/resources"] - archives_base_name = 'beam-runners-flink-1.12-job-server' -} - -// Load the main build script which contains all build logic. -apply from: "$basePath/flink_job_server.gradle" diff --git a/runners/flink/1.12/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/AbstractStreamOperatorCompat.java b/runners/flink/1.12/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/AbstractStreamOperatorCompat.java deleted file mode 100644 index 5072e6b2459..00000000000 --- a/runners/flink/1.12/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/AbstractStreamOperatorCompat.java +++ /dev/null @@ -1,94 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink.translation.wrappers.streaming; - -import org.apache.flink.streaming.api.operators.AbstractStreamOperator; -import org.apache.flink.streaming.api.operators.InternalTimeServiceManager; -import org.apache.flink.streaming.api.operators.InternalTimeServiceManagerImpl; -import org.apache.flink.streaming.api.operators.sorted.state.BatchExecutionInternalTimeServiceManager; - -/** Compatibility layer for {@link AbstractStreamOperator} breaking changes. */ -public abstract class AbstractStreamOperatorCompat<OutputT> - extends AbstractStreamOperator<OutputT> { - - /** - * Getter for timeServiceManager, which has been made private in Flink 1.11. - * - * @return Time service manager. - */ - protected InternalTimeServiceManager<?> getTimeServiceManagerCompat() { - return getTimeServiceManager() - .orElseThrow(() -> new IllegalStateException("Time service manager is not set.")); - } - - /** - * This call has been removed from {@link AbstractStreamOperator} in Flink 1.12. - * - * <p>{@link InternalTimeServiceManagerImpl#numProcessingTimeTimers()} - */ - protected int numProcessingTimeTimers() { - return getTimeServiceManager() - .map( - manager -> { - InternalTimeServiceManager<?> tsm = getTimeServiceManagerCompat(); - if (tsm instanceof InternalTimeServiceManagerImpl) { - final InternalTimeServiceManagerImpl<?> cast = - (InternalTimeServiceManagerImpl<?>) getTimeServiceManagerCompat(); - return cast.numProcessingTimeTimers(); - } else if (tsm instanceof BatchExecutionInternalTimeServiceManager) { - return 0; - } else { - throw new IllegalStateException( - String.format( - "Unknown implementation of InternalTimerServiceManager. %s", tsm)); - } - }) - .orElse(0); - } - - /** Release all of the operator's resources. */ - abstract void cleanUp() throws Exception; - - /** Flush all remaining buffered data. */ - abstract void flushData() throws Exception; - - // Prior to Flink 1.14, dispose() releases the operator's resources, while close() flushes - // remaining data and then releases the operator's resources. - // https://issues.apache.org/jira/browse/FLINK-22972 - - @Override - public void dispose() throws Exception { - try { - cleanUp(); - } finally { - // This releases all task's resources. We need to call this last - // to ensure that state, timers, or output buffers can still be - // accessed during finishing the bundle. - super.dispose(); - } - } - - @Override - public void close() throws Exception { - try { - flushData(); - } finally { - super.close(); - } - } -} diff --git a/runners/flink/1.12/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/compat/FlinkSourceCompat.java b/runners/flink/1.12/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/compat/FlinkSourceCompat.java deleted file mode 100644 index 2f9e69fe4f7..00000000000 --- a/runners/flink/1.12/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/compat/FlinkSourceCompat.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.compat; - -import org.apache.flink.api.connector.source.SourceReaderContext; -import org.apache.flink.metrics.Counter; -import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup; - -public class FlinkSourceCompat { - - public static Counter getNumRecordsInCounter(SourceReaderContext context) { - return ((OperatorMetricGroup) context.metricGroup()) - .getIOMetricGroup() - .getNumRecordsInCounter(); - } -} diff --git a/runners/flink/1.12/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/compat/SplitEnumeratorCompat.java b/runners/flink/1.12/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/compat/SplitEnumeratorCompat.java deleted file mode 100644 index d6bed940470..00000000000 --- a/runners/flink/1.12/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/compat/SplitEnumeratorCompat.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.compat; - -import org.apache.flink.api.connector.source.SourceSplit; -import org.apache.flink.api.connector.source.SplitEnumerator; - -public interface SplitEnumeratorCompat<SplitT extends SourceSplit, CheckpointT> - extends SplitEnumerator<SplitT, CheckpointT> { - - CheckpointT snapshotState(long checkpointId) throws Exception; -} diff --git a/runners/flink/1.12/src/test/java/org/apache/beam/runners/flink/RemoteMiniClusterImpl.java b/runners/flink/1.12/src/test/java/org/apache/beam/runners/flink/RemoteMiniClusterImpl.java deleted file mode 100644 index 5d8bebd6dff..00000000000 --- a/runners/flink/1.12/src/test/java/org/apache/beam/runners/flink/RemoteMiniClusterImpl.java +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink; - -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.akka.AkkaUtils; -import org.apache.flink.runtime.minicluster.MiniCluster; -import org.apache.flink.runtime.minicluster.MiniClusterConfiguration; -import org.apache.flink.runtime.rpc.RpcService; -import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils; - -/** A {@link MiniCluster} which allows remote connections for the end-to-end test. */ -public class RemoteMiniClusterImpl extends RemoteMiniCluster { - - private String jobManagerBindAddress; - private int port; - - public RemoteMiniClusterImpl(MiniClusterConfiguration miniClusterConfiguration) { - super(miniClusterConfiguration); - jobManagerBindAddress = miniClusterConfiguration.getJobManagerBindAddress(); - } - - @Override - protected RpcService createLocalRpcService(Configuration configuration) throws Exception { - // Enable remote connections to the mini cluster which are disabled by default - final RpcService rpcService = - AkkaRpcServiceUtils.remoteServiceBuilder( - configuration, jobManagerBindAddress, String.valueOf(0)) - .withBindAddress(jobManagerBindAddress) - .withBindPort(0) - .withCustomConfig(AkkaUtils.testDispatcherConfig()) - .createAndStart(); - this.port = rpcService.getPort(); - - return rpcService; - } - - @Override - public int getClusterPort() { - Preconditions.checkState(port > 0, "Port not yet initialized. Start the cluster first."); - return port; - } - - @Override - public int getRestPort() { - try { - return getRestAddress().get().getPort(); - } catch (Exception e) { - throw new RuntimeException(e); - } - } -} diff --git a/runners/flink/1.12/src/test/java/org/apache/beam/runners/flink/metrics/MetricGroupWrapper.java b/runners/flink/1.12/src/test/java/org/apache/beam/runners/flink/metrics/MetricGroupWrapper.java deleted file mode 100644 index 797c09b0929..00000000000 --- a/runners/flink/1.12/src/test/java/org/apache/beam/runners/flink/metrics/MetricGroupWrapper.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink.metrics; - -import org.apache.flink.metrics.MetricGroup; -import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; - -/** - * Compatibility layer for {@link MetricGroup} breaking changes made in Flink 1.14 - * (https://issues.apache.org/jira/browse/FLINK-23652). - */ -public interface MetricGroupWrapper extends MetricGroup { - static MetricGroup createUnregisteredMetricGroup() { - return new UnregisteredMetricsGroup(); - } -} diff --git a/runners/flink/1.12/src/test/java/org/apache/beam/runners/flink/streaming/StreamSources.java b/runners/flink/1.12/src/test/java/org/apache/beam/runners/flink/streaming/StreamSources.java deleted file mode 100644 index ba5940653cd..00000000000 --- a/runners/flink/1.12/src/test/java/org/apache/beam/runners/flink/streaming/StreamSources.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink.streaming; - -import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder; -import org.apache.flink.streaming.api.functions.source.SourceFunction; -import org.apache.flink.streaming.api.operators.AbstractStreamOperator; -import org.apache.flink.streaming.api.operators.Output; -import org.apache.flink.streaming.api.operators.StreamSource; -import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.streaming.runtime.streamstatus.StreamStatus; -import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer; -import org.apache.flink.streaming.runtime.tasks.OperatorChain; -import org.apache.flink.streaming.runtime.tasks.StreamTask; - -/** {@link StreamSource} utilities, that bridge incompatibilities between Flink releases. */ -public class StreamSources { - - public static <OutT, SrcT extends SourceFunction<OutT>> void run( - StreamSource<OutT, SrcT> streamSource, - Object lockingObject, - Output<StreamRecord<OutT>> collector) - throws Exception { - streamSource.run( - lockingObject, - new TestStreamStatusMaintainer(), - collector, - createOperatorChain(streamSource)); - } - - private static OperatorChain<?, ?> createOperatorChain(AbstractStreamOperator<?> operator) { - return new OperatorChain<>( - operator.getContainingTask(), - StreamTask.createRecordWriterDelegate( - operator.getOperatorConfig(), new MockEnvironmentBuilder().build())); - } - - /** StreamStatusMaintainer was removed in Flink 1.14. */ - private static final class TestStreamStatusMaintainer implements StreamStatusMaintainer { - StreamStatus currentStreamStatus = StreamStatus.ACTIVE; - - @Override - public void toggleStreamStatus(StreamStatus streamStatus) { - if (!currentStreamStatus.equals(streamStatus)) { - currentStreamStatus = streamStatus; - } - } - - @Override - public StreamStatus getStreamStatus() { - return currentStreamStatus; - } - } - - /** The emitWatermarkStatus method was added in Flink 1.14, so we need to wrap Output. */ - public interface OutputWrapper<T> extends Output<T> {} -} diff --git a/runners/flink/1.12/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/SourceTestCompat.java b/runners/flink/1.12/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/SourceTestCompat.java deleted file mode 100644 index 0b9ca07f99a..00000000000 --- a/runners/flink/1.12/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/SourceTestCompat.java +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink.translation.wrappers.streaming.io.source; - -import java.util.HashMap; -import java.util.Map; -import org.apache.flink.api.connector.source.ReaderOutput; -import org.apache.flink.api.connector.source.SourceOutput; -import org.apache.flink.metrics.Counter; -import org.apache.flink.metrics.Gauge; -import org.apache.flink.metrics.SimpleCounter; -import org.apache.flink.runtime.metrics.groups.OperatorIOMetricGroup; -import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; - -public class SourceTestCompat { - - /** A MetricGroup implementation which records the registered gauge. */ - public static class TestMetricGroup - extends UnregisteredMetricGroups.UnregisteredOperatorMetricGroup { - public final Map<String, Gauge<?>> registeredGauge = new HashMap<>(); - public final Map<String, Counter> registeredCounter = new HashMap<>(); - public final Counter numRecordsInCounter = new SimpleCounter(); - - @Override - public <T, GaugeT extends Gauge<T>> GaugeT gauge(String name, GaugeT gauge) { - registeredGauge.put(name, gauge); - return gauge; - } - - @Override - public Counter counter(String name) { - // The OperatorIOMetricsGroup will register some IO metrics in the constructor. - // At that time, the construction of this class has not finihsed yet, so we - // need to delegate the call to the parent class. - if (registeredCounter != null) { - return registeredCounter.computeIfAbsent(name, ignored -> super.counter(name)); - } else { - return super.counter(name); - } - } - - @Override - public OperatorIOMetricGroup getIOMetricGroup() { - return new OperatorIOMetricGroup(this) { - @Override - public Counter getNumRecordsInCounter() { - return numRecordsInCounter; - } - }; - } - } - - public interface ReaderOutputCompat<T> extends ReaderOutput<T> { - void markActive(); - } - - public interface SourceOutputCompat<T> extends SourceOutput<T> { - void markActive(); - } -} diff --git a/runners/flink/1.13/build.gradle b/runners/flink/1.13/build.gradle deleted file mode 100644 index 1aa19e6f071..00000000000 --- a/runners/flink/1.13/build.gradle +++ /dev/null @@ -1,25 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * License); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an AS IS BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -project.ext { - flink_major = '1.13' - flink_version = '1.13.5' -} - -// Load the main build script which contains all build logic. -apply from: "../flink_runner.gradle" diff --git a/runners/flink/1.13/job-server-container/build.gradle b/runners/flink/1.13/job-server-container/build.gradle deleted file mode 100644 index afdb68a0fc9..00000000000 --- a/runners/flink/1.13/job-server-container/build.gradle +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * License); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an AS IS BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -def basePath = '../../job-server-container' - -project.ext { - resource_path = basePath -} - -// Load the main build script which contains all build logic. -apply from: "$basePath/flink_job_server_container.gradle" diff --git a/runners/flink/1.13/job-server/build.gradle b/runners/flink/1.13/job-server/build.gradle deleted file mode 100644 index a7e6fd6eb59..00000000000 --- a/runners/flink/1.13/job-server/build.gradle +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * License); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an AS IS BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -def basePath = '../../job-server' - -project.ext { - // Look for the source code in the parent module - main_source_dirs = ["$basePath/src/main/java"] - test_source_dirs = ["$basePath/src/test/java"] - main_resources_dirs = ["$basePath/src/main/resources"] - test_resources_dirs = ["$basePath/src/test/resources"] - archives_base_name = 'beam-runners-flink-1.13-job-server' -} - -// Load the main build script which contains all build logic. -apply from: "$basePath/flink_job_server.gradle" diff --git a/runners/flink/1.12/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java b/runners/flink/1.14/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java similarity index 100% rename from runners/flink/1.12/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java rename to runners/flink/1.14/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java diff --git a/runners/flink/1.12/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ProcessingTimeCallbackCompat.java b/runners/flink/1.14/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ProcessingTimeCallbackCompat.java similarity index 100% rename from runners/flink/1.12/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ProcessingTimeCallbackCompat.java rename to runners/flink/1.14/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ProcessingTimeCallbackCompat.java diff --git a/runners/flink/1.13/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/compat/SplitEnumeratorCompat.java b/runners/flink/1.14/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/compat/SplitEnumeratorCompat.java similarity index 100% rename from runners/flink/1.13/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/compat/SplitEnumeratorCompat.java rename to runners/flink/1.14/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/compat/SplitEnumeratorCompat.java diff --git a/runners/flink/1.12/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/compat/package-info.java b/runners/flink/1.14/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/compat/package-info.java similarity index 100% rename from runners/flink/1.12/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/compat/package-info.java rename to runners/flink/1.14/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/compat/package-info.java diff --git a/runners/flink/1.12/src/test/java/org/apache/beam/runners/flink/MiniClusterCompat.java b/runners/flink/1.14/src/test/java/org/apache/beam/runners/flink/MiniClusterCompat.java similarity index 100% rename from runners/flink/1.12/src/test/java/org/apache/beam/runners/flink/MiniClusterCompat.java rename to runners/flink/1.14/src/test/java/org/apache/beam/runners/flink/MiniClusterCompat.java diff --git a/runners/flink/1.12/src/test/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializerTest.java b/runners/flink/1.14/src/test/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializerTest.java similarity index 100% rename from runners/flink/1.12/src/test/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializerTest.java rename to runners/flink/1.14/src/test/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializerTest.java diff --git a/runners/flink/flink_runner.gradle b/runners/flink/flink_runner.gradle index f786bcf8d88..6849334aea0 100644 --- a/runners/flink/flink_runner.gradle +++ b/runners/flink/flink_runner.gradle @@ -31,7 +31,7 @@ def overrides(versions, type, base_path) { versions.collect { "${base_path}/${it}/src/${type}/java" } + ["./src/${type}/java"] } -def all_versions = ['1.12', '1.13', '1.14', '1.15', '1.16', '1.17'] +def all_versions = flink_versions.split(",") def previous_versions = all_versions.findAll { it < flink_major } @@ -208,15 +208,10 @@ dependencies { implementation "org.apache.flink:flink-metrics-core:$flink_version" implementation "org.apache.flink:flink-java:$flink_version" - if (flink_version.compareTo("1.14") >= 0) { - implementation "org.apache.flink:flink-runtime:$flink_version" - implementation "org.apache.flink:flink-metrics-core:$flink_version" - testImplementation "org.apache.flink:flink-runtime:$flink_version:tests" - testImplementation "org.apache.flink:flink-rpc-akka:$flink_version" - } else { - implementation "org.apache.flink:flink-runtime_2.12:$flink_version" - testImplementation "org.apache.flink:flink-runtime_2.12:$flink_version:tests" - } + implementation "org.apache.flink:flink-runtime:$flink_version" + implementation "org.apache.flink:flink-metrics-core:$flink_version" + testImplementation "org.apache.flink:flink-runtime:$flink_version:tests" + testImplementation "org.apache.flink:flink-rpc-akka:$flink_version" testImplementation project(path: ":sdks:java:core", configuration: "shadowTest") // FlinkStateInternalsTest extends abstract StateInternalsTest testImplementation project(path: ":runners:core-java", configuration: "testRuntimeMigration") @@ -378,7 +373,7 @@ tasks.register("validatesRunnerSickbay", Test) { } } -// Generates :runners:flink:1.13:runQuickstartJavaFlinkLocal +// Generates :runners:flink:1.17:runQuickstartJavaFlinkLocal createJavaExamplesArchetypeValidationTask(type: 'Quickstart', runner: 'FlinkLocal') tasks.register("examplesIntegrationTest", Test) { diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceReaderBase.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceReaderBase.java index 288f1f9511b..1f4f31f90eb 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceReaderBase.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceReaderBase.java @@ -124,8 +124,6 @@ public abstract class FlinkSourceReaderBase<T, OutputT> pipelineOptions.as(FlinkPipelineOptions.class).getShutdownSourcesAfterIdleMs(); this.idleTimeoutFuture = new CompletableFuture<>(); this.idleTimeoutCountingDown = false; - // TODO: Remove the casting and use SourceReaderMetricGroup after minimum FLink version is - // upgraded to 1.14 and above. this.numRecordsInCounter = FlinkSourceCompat.getNumRecordsInCounter(context); FlinkMetricContainerWithoutAccumulator metricsContainer = new FlinkMetricContainerWithoutAccumulator(context.metricGroup()); diff --git a/sdks/go/examples/stringsplit/stringsplit.go b/sdks/go/examples/stringsplit/stringsplit.go index 3698bd3cc2b..28d79815607 100644 --- a/sdks/go/examples/stringsplit/stringsplit.go +++ b/sdks/go/examples/stringsplit/stringsplit.go @@ -21,7 +21,7 @@ // 1. From a command line, navigate to the top-level beam/ directory and run // the Flink job server: // -// ./gradlew :runners:flink:1.13:job-server:runShadow -Djob-host=localhost -Dflink-master=local +// ./gradlew :runners:flink:1.17:job-server:runShadow -Djob-host=localhost -Dflink-master=local // // 2. The job server is ready to receive jobs once it outputs a log like the // following: `JobService started on localhost:8099`. Take note of the endpoint diff --git a/sdks/java/testing/nexmark/build.gradle b/sdks/java/testing/nexmark/build.gradle index 942dfb4c1e9..b9e6a0d2266 100644 --- a/sdks/java/testing/nexmark/build.gradle +++ b/sdks/java/testing/nexmark/build.gradle @@ -144,7 +144,7 @@ def getNexmarkArgs = { // // Parameters: // -Pnexmark.runner -// Specify a runner subproject, such as ":runners:spark:3" or ":runners:flink:1.13" +// Specify a runner subproject, such as ":runners:spark:3" or ":runners:flink:1.17" // Defaults to ":runners:direct-java" // // -Pnexmark.args diff --git a/sdks/java/testing/tpcds/build.gradle b/sdks/java/testing/tpcds/build.gradle index 387dda89824..da4a317d320 100644 --- a/sdks/java/testing/tpcds/build.gradle +++ b/sdks/java/testing/tpcds/build.gradle @@ -94,7 +94,7 @@ if (isSpark) { // // Parameters: // -Ptpcds.runner -// Specify a runner subproject, such as ":runners:spark:3" or ":runners:flink:1.13" +// Specify a runner subproject, such as ":runners:spark:3" or ":runners:flink:1.17" // Defaults to ":runners:direct-java" // // -Ptpcds.args diff --git a/settings.gradle.kts b/settings.gradle.kts index c2dd9bac1ba..8c86a8a5e55 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -121,14 +121,6 @@ include(":runners:core-java") include(":runners:direct-java") include(":runners:extensions-java:metrics") /* Begin Flink Runner related settings */ -// Flink 1.12 -include(":runners:flink:1.12") -include(":runners:flink:1.12:job-server") -include(":runners:flink:1.12:job-server-container") -// Flink 1.13 -include(":runners:flink:1.13") -include(":runners:flink:1.13:job-server") -include(":runners:flink:1.13:job-server-container") // Flink 1.14 include(":runners:flink:1.14") include(":runners:flink:1.14:job-server") diff --git a/website/www/site/content/en/documentation/dsls/sql/shell.md b/website/www/site/content/en/documentation/dsls/sql/shell.md index 7c7b31bac49..3676bfa59f7 100644 --- a/website/www/site/content/en/documentation/dsls/sql/shell.md +++ b/website/www/site/content/en/documentation/dsls/sql/shell.md @@ -29,7 +29,7 @@ This page describes how to work with the shell, but does not focus on specific f To use Beam SQL shell, you must first clone the [Beam SDK repository](https://github.com/apache/beam). Then, from the root of the repository clone, execute the following commands to run the shell: ``` -./gradlew -p sdks/java/extensions/sql/shell -Pbeam.sql.shell.bundled=':runners:flink:1.13,:sdks:java:io:kafka' installDist +./gradlew -p sdks/java/extensions/sql/shell -Pbeam.sql.shell.bundled=':runners:flink:1.17,:sdks:java:io:kafka' installDist ./sdks/java/extensions/sql/shell/build/install/shell/bin/shell ``` @@ -117,7 +117,7 @@ By default, Beam uses the `DirectRunner` to run the pipeline on the machine wher 1. Make sure the SQL shell includes the desired runner. Add the corresponding project id to the `-Pbeam.sql.shell.bundled` parameter of the Gradle invocation ([source code](https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/shell/build.gradle), [project ids](https://github.com/apache/beam/blob/master/settings.gradle.kts)). For example, use the following command to include Flink runner and KafkaIO: ``` - ./gradlew -p sdks/java/extensions/sql/shell -Pbeam.sql.shell.bundled=':runners:flink:1.13,:sdks:java:io:kafka' installDist + ./gradlew -p sdks/java/extensions/sql/shell -Pbeam.sql.shell.bundled=':runners:flink:1.17,:sdks:java:io:kafka' installDist ``` _Note: You can bundle multiple runners (using a comma-separated list) or other additional components in the same manner. For example, you can add support for more I/Os._ @@ -143,7 +143,7 @@ To configure the runner, you must specify `PipelineOptions` by using the `SET` c You can also build your own standalone package for SQL shell using `distZip` or `distTar` tasks. For example: ``` -./gradlew -p sdks/java/extensions/sql/shell -Pbeam.sql.shell.bundled=':runners:flink:1.13,:sdks:java:io:kafka' distZip +./gradlew -p sdks/java/extensions/sql/shell -Pbeam.sql.shell.bundled=':runners:flink:1.17,:sdks:java:io:kafka' distZip ls ./sdks/java/extensions/sql/shell/build/distributions/ beam-sdks-java-extensions-sql-shell-2.6.0-SNAPSHOT.tar beam-sdks-java-extensions-sql-shell-2.6.0-SNAPSHOT.zip diff --git a/website/www/site/layouts/shortcodes/flink_java_pipeline_options.html b/website/www/site/layouts/shortcodes/flink_java_pipeline_options.html index a6b7ed28f40..939c64ed9c4 100644 --- a/website/www/site/layouts/shortcodes/flink_java_pipeline_options.html +++ b/website/www/site/layouts/shortcodes/flink_java_pipeline_options.html @@ -119,13 +119,13 @@ Should be called before running the tests. </tr> <tr> <td><code>maxBundleSize</code></td> - <td>The maximum number of elements in a bundle.</td> - <td>Default: <code>1000</code></td> + <td>The maximum number of elements in a bundle. Default values are 1000 for a streaming job and 1,000,000 for batch</td> + <td>Default: <code>MaxBundleSizeFactory</code></td> </tr> <tr> <td><code>maxBundleTimeMills</code></td> - <td>The maximum time to wait before finalising a bundle (in milliseconds).</td> - <td>Default: <code>1000</code></td> + <td>The maximum time to wait before finalising a bundle (in milliseconds). Default values are 1000 for streaming and 10,000 for batch.</td> + <td>Default: <code>MaxBundleTimeFactory</code></td> </tr> <tr> <td><code>maxParallelism</code></td> diff --git a/website/www/site/layouts/shortcodes/flink_python_pipeline_options.html b/website/www/site/layouts/shortcodes/flink_python_pipeline_options.html index 494b01bc1d0..eb5c525d78b 100644 --- a/website/www/site/layouts/shortcodes/flink_python_pipeline_options.html +++ b/website/www/site/layouts/shortcodes/flink_python_pipeline_options.html @@ -119,13 +119,13 @@ Should be called before running the tests. </tr> <tr> <td><code>max_bundle_size</code></td> - <td>The maximum number of elements in a bundle.</td> - <td>Default: <code>1000</code></td> + <td>The maximum number of elements in a bundle. Default values are 1000 for a streaming job and 1,000,000 for batch</td> + <td>Default: <code>MaxBundleSizeFactory</code></td> </tr> <tr> <td><code>max_bundle_time_mills</code></td> - <td>The maximum time to wait before finalising a bundle (in milliseconds).</td> - <td>Default: <code>1000</code></td> + <td>The maximum time to wait before finalising a bundle (in milliseconds). Default values are 1000 for streaming and 10,000 for batch.</td> + <td>Default: <code>MaxBundleTimeFactory</code></td> </tr> <tr> <td><code>max_parallelism</code></td>