This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git


The following commit(s) were added to refs/heads/main by this push:
     new 897001d5 [FLINK-35007] Add support for Flink 1.19 (#90)
897001d5 is described below

commit 897001d5682a0708042d59be81a10485ffd0dde7
Author: Martijn Visser <2989614+martijnvis...@users.noreply.github.com>
AuthorDate: Thu Apr 11 10:15:31 2024 +0200

    [FLINK-35007] Add support for Flink 1.19 (#90)
    
    * [FLINK-35007] Add support for Flink 1.19
    
    This also includes dropping the weekly tests for the `v3.0` branch, since `v3.1` has been released and that's our main version going forward.
    
    * [FLINK-35007] Remove unused test class that relied on removed Internal class
    
    * [FLINK-35007][ci] Copy old `flink-conf.yaml` to make sure that all Python tests work for Flink 1.x releases
---
 .github/workflows/push_pr.yml                      |   4 +-
 .github/workflows/weekly.yml                       |  12 +-
 .../connectors/kafka/testutils/DataGenerators.java |  29 --
 flink-python/dev/integration_test.sh               |  18 ++
 flink-python/pyflink/flink-conf.yaml               | 311 +++++++++++++++++++++
 5 files changed, 338 insertions(+), 36 deletions(-)

diff --git a/.github/workflows/push_pr.yml b/.github/workflows/push_pr.yml
index 20a66660..7f30c691 100644
--- a/.github/workflows/push_pr.yml
+++ b/.github/workflows/push_pr.yml
@@ -30,7 +30,7 @@ jobs:
         include:
           - flink: 1.18.1
             jdk: '8, 11, 17'
-          - flink: 1.19-SNAPSHOT
+          - flink: 1.19.0
             jdk: '8, 11, 17, 21'
     uses: apache/flink-connector-shared-utils/.github/workflows/ci.yml@ci_utils
     with:
@@ -39,7 +39,7 @@ jobs:
   python_test:
     strategy:
       matrix:
-        flink: [ 1.17.2, 1.18.1, 1.19-SNAPSHOT ]
+        flink: [ 1.17.2, 1.18.1, 1.19.0 ]
     uses: apache/flink-connector-shared-utils/.github/workflows/python_ci.yml@ci_utils
     with:
       flink_version: ${{ matrix.flink }}
\ No newline at end of file
diff --git a/.github/workflows/weekly.yml b/.github/workflows/weekly.yml
index aaa729fd..f24769ae 100644
--- a/.github/workflows/weekly.yml
+++ b/.github/workflows/weekly.yml
@@ -37,6 +37,10 @@ jobs:
           flink: 1.19-SNAPSHOT,
           jdk: '8, 11, 17, 21',
           branch: main
+        }, {
+          flink: 1.20-SNAPSHOT,
+          jdk: '8, 11, 17, 21',
+          branch: main
         }, {
           flink: 1.17.2,
           branch: v3.1
@@ -45,11 +49,9 @@ jobs:
           jdk: '8, 11, 17',
           branch: v3.1
         }, {
-          flink: 1.17.2,
-          branch: v3.0
-        }, {
-          flink: 1.18.1,
-          branch: v3.0
+          flink: 1.19.0,
+          branch: v3.1,
+          jdk: '8, 11, 17',
         }]
     uses: apache/flink-connector-shared-utils/.github/workflows/ci.yml@ci_utils
     with:
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
index 83ee3fb1..d660bd2f 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
@@ -18,16 +18,13 @@
 
 package org.apache.flink.streaming.connectors.kafka.testutils;
 
-import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.common.serialization.SimpleStringSchema;
 import org.apache.flink.api.common.serialization.TypeInformationSerializationSchema;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
-import org.apache.flink.streaming.api.graph.StreamGraph;
 import org.apache.flink.streaming.api.operators.StreamSink;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase;
 import org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironment;
@@ -36,8 +33,6 @@ import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartiti
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 
-import java.util.Collections;
-import java.util.List;
 import java.util.Properties;
 import java.util.Random;
 
@@ -210,29 +205,5 @@ public class DataGenerators {
         public Throwable getError() {
             return this.error;
         }
-
-        private static class MockTransformation extends Transformation<String> {
-            public MockTransformation() {
-                super("MockTransform", BasicTypeInfo.STRING_TYPE_INFO, 1);
-            }
-
-            @Override
-            public List<Transformation<?>> getTransitivePredecessors() {
-                return null;
-            }
-
-            @Override
-            public List<Transformation<?>> getInputs() {
-                return Collections.emptyList();
-            }
-        }
-
-        private static class DummyStreamExecutionEnvironment extends StreamExecutionEnvironment {
-
-            @Override
-            public JobExecutionResult execute(StreamGraph streamGraph) throws Exception {
-                return null;
-            }
-        }
     }
 }
diff --git a/flink-python/dev/integration_test.sh b/flink-python/dev/integration_test.sh
index 19816725..45f66482 100755
--- a/flink-python/dev/integration_test.sh
+++ b/flink-python/dev/integration_test.sh
@@ -50,5 +50,23 @@ echo "Checking ${FLINK_SOURCE_DIR} for 'pyflink_gateway_server.py'"
 find "${FLINK_SOURCE_DIR}/flink-python" -name pyflink_gateway_server.py
 find "${FLINK_SOURCE_DIR}/flink-python/.tox" -name pyflink_gateway_server.py -exec cp "${FLINK_SOURCE_DIR}/flink-python/pyflink/pyflink_gateway_server.py" {} \;
 
+# Copy an empty flink-conf.yaml to conf/ folder, so that all Python tests on Flink 1.x can succeed.
+# This needs to be changed when adding support for Flink 2.0
+echo "Checking ${FLINK_SOURCE_DIR} for 'config.yaml'"
+find "${FLINK_SOURCE_DIR}/flink-python" -name config.yaml
+
+# For every occurrence of config.yaml (new YAML file since Flink 1.19), copy in the old flink-conf.yaml so that
+# is used over the new config.yaml file.
+#
+# Because our intention is to copy `flink-conf.yaml` into the same directory as `config.yaml` and not replace it,
+# we need to extract the directory from `{}` and then specify the target filename (`flink-conf.yaml`) explicitly.
+# Unfortunately, `find`'s `-exec` doesn't directly support manipulating `{}`. So we use a slightly modified shell command
+#
+# `"${1}"` and `"${2}"` correspond to the first and second arguments after the shell command.
+# In this case, `"${1}"` is the path to `flink-conf.yaml` and `"${2}"` is the path to each `config.yaml` found by `find`.
+# `$(dirname "${2}")` extracts the directory part of the path to `config.yaml`, and then `/flink-conf.yaml`
+# specifies the target filename within that directory.
+find "${FLINK_SOURCE_DIR}/flink-python/.tox" -name config.yaml -exec sh -c 'cp "${1}" "$(dirname "${2}")/flink-conf.yaml"' _ "${FLINK_SOURCE_DIR}/flink-python/pyflink/flink-conf.yaml" {} \;
+
 # python test
 test_all_modules
diff --git a/flink-python/pyflink/flink-conf.yaml b/flink-python/pyflink/flink-conf.yaml
new file mode 100644
index 00000000..b5aa2794
--- /dev/null
+++ b/flink-python/pyflink/flink-conf.yaml
@@ -0,0 +1,311 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+################################################################################
+
+# These parameters are required for Java 17 support.
+# They can be safely removed when using Java 8/11.
+env.java.opts.all: --add-exports=java.base/sun.net.util=ALL-UNNAMED --add-exports=java.rmi/sun.rmi.registry=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.api=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.file=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.parser=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.tree=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.util=ALL-UNNAMED --add-exports=java.security.jgss/sun.security.krb5= [...]
+
+#==============================================================================
+# Common
+#==============================================================================
+
+# The external address of the host on which the JobManager runs and can be
+# reached by the TaskManagers and any clients which want to connect. This setting
+# is only used in Standalone mode and may be overwritten on the JobManager side
+# by specifying the --host <hostname> parameter of the bin/jobmanager.sh executable.
+# In high availability mode, if you use the bin/start-cluster.sh script and setup
+# the conf/masters file, this will be taken care of automatically. Yarn
+# automatically configure the host name based on the hostname of the node where the
+# JobManager runs.
+
+jobmanager.rpc.address: localhost
+
+# The RPC port where the JobManager is reachable.
+
+jobmanager.rpc.port: 6123
+
+# The host interface the JobManager will bind to. By default, this is localhost, and will prevent
+# the JobManager from communicating outside the machine/container it is running on.
+# On YARN this setting will be ignored if it is set to 'localhost', defaulting to 0.0.0.0.
+# On Kubernetes this setting will be ignored, defaulting to 0.0.0.0.
+#
+# To enable this, set the bind-host address to one that has access to an outside facing network
+# interface, such as 0.0.0.0.
+
+jobmanager.bind-host: localhost
+
+
+# The total process memory size for the JobManager.
+#
+# Note this accounts for all memory usage within the JobManager process, including JVM metaspace and other overhead.
+
+jobmanager.memory.process.size: 1600m
+
+# The host interface the TaskManager will bind to. By default, this is localhost, and will prevent
+# the TaskManager from communicating outside the machine/container it is running on.
+# On YARN this setting will be ignored if it is set to 'localhost', defaulting to 0.0.0.0.
+# On Kubernetes this setting will be ignored, defaulting to 0.0.0.0.
+#
+# To enable this, set the bind-host address to one that has access to an outside facing network
+# interface, such as 0.0.0.0.
+
+taskmanager.bind-host: localhost
+
+# The address of the host on which the TaskManager runs and can be reached by the JobManager and
+# other TaskManagers. If not specified, the TaskManager will try different strategies to identify
+# the address.
+#
+# Note this address needs to be reachable by the JobManager and forward traffic to one of
+# the interfaces the TaskManager is bound to (see 'taskmanager.bind-host').
+#
+# Note also that unless all TaskManagers are running on the same machine, this address needs to be
+# configured separately for each TaskManager.
+
+taskmanager.host: localhost
+
+# The total process memory size for the TaskManager.
+#
+# Note this accounts for all memory usage within the TaskManager process, including JVM metaspace and other overhead.
+
+taskmanager.memory.process.size: 1728m
+
+# To exclude JVM metaspace and overhead, please, use total Flink memory size instead of 'taskmanager.memory.process.size'.
+# It is not recommended to set both 'taskmanager.memory.process.size' and Flink memory.
+#
+# taskmanager.memory.flink.size: 1280m
+
+# The number of task slots that each TaskManager offers. Each slot runs one parallel pipeline.
+
+taskmanager.numberOfTaskSlots: 1
+
+# The parallelism used for programs that did not specify and other parallelism.
+
+parallelism.default: 1
+
+# The default file system scheme and authority.
+# 
+# By default file paths without scheme are interpreted relative to the local
+# root file system 'file:///'. Use this to override the default and interpret
+# relative paths relative to a different file system,
+# for example 'hdfs://mynamenode:12345'
+#
+# fs.default-scheme
+
+#==============================================================================
+# High Availability
+#==============================================================================
+
+# The high-availability mode. Possible options are 'NONE' or 'zookeeper'.
+#
+# high-availability.type: zookeeper
+
+# The path where metadata for master recovery is persisted. While ZooKeeper stores
+# the small ground truth for checkpoint and leader election, this location stores
+# the larger objects, like persisted dataflow graphs.
+# 
+# Must be a durable file system that is accessible from all nodes
+# (like HDFS, S3, Ceph, nfs, ...) 
+#
+# high-availability.storageDir: hdfs:///flink/ha/
+
+# The list of ZooKeeper quorum peers that coordinate the high-availability
+# setup. This must be a list of the form:
+# "host1:clientPort,host2:clientPort,..." (default clientPort: 2181)
+#
+# high-availability.zookeeper.quorum: localhost:2181
+
+
+# ACL options are based on https://zookeeper.apache.org/doc/r3.1.2/zookeeperProgrammers.html#sc_BuiltinACLSchemes
+# It can be either "creator" (ZOO_CREATE_ALL_ACL) or "open" (ZOO_OPEN_ACL_UNSAFE)
+# The default value is "open" and it can be changed to "creator" if ZK security is enabled
+#
+# high-availability.zookeeper.client.acl: open
+
+#==============================================================================
+# Fault tolerance and checkpointing
+#==============================================================================
+
+# The backend that will be used to store operator state checkpoints if
+# checkpointing is enabled. Checkpointing is enabled when execution.checkpointing.interval > 0.
+#
+# Execution checkpointing related parameters. Please refer to CheckpointConfig and ExecutionCheckpointingOptions for more details.
+#
+# execution.checkpointing.interval: 3min
+# execution.checkpointing.externalized-checkpoint-retention: [DELETE_ON_CANCELLATION, RETAIN_ON_CANCELLATION]
+# execution.checkpointing.max-concurrent-checkpoints: 1
+# execution.checkpointing.min-pause: 0
+# execution.checkpointing.mode: [EXACTLY_ONCE, AT_LEAST_ONCE]
+# execution.checkpointing.timeout: 10min
+# execution.checkpointing.tolerable-failed-checkpoints: 0
+# execution.checkpointing.unaligned: false
+#
+# Supported backends are 'hashmap', 'rocksdb', or the
+# <class-name-of-factory>.
+#
+# state.backend.type: hashmap
+
+# Directory for checkpoints filesystem, when using any of the default bundled
+# state backends.
+#
+# state.checkpoints.dir: hdfs://namenode-host:port/flink-checkpoints
+
+# Default target directory for savepoints, optional.
+#
+# state.savepoints.dir: hdfs://namenode-host:port/flink-savepoints
+
+# Flag to enable/disable incremental checkpoints for backends that
+# support incremental checkpoints (like the RocksDB state backend). 
+#
+# state.backend.incremental: false
+
+# The failover strategy, i.e., how the job computation recovers from task failures.
+# Only restart tasks that may have been affected by the task failure, which typically includes
+# downstream tasks and potentially upstream tasks if their produced data is no longer available for consumption.
+
+jobmanager.execution.failover-strategy: region
+
+#==============================================================================
+# Rest & web frontend
+#==============================================================================
+
+# The port to which the REST client connects to. If rest.bind-port has
+# not been specified, then the server will bind to this port as well.
+#
+#rest.port: 8081
+
+# The address to which the REST client will connect to
+#
+rest.address: localhost
+
+# Port range for the REST and web server to bind to.
+#
+#rest.bind-port: 8080-8090
+
+# The address that the REST & web server binds to
+# By default, this is localhost, which prevents the REST & web server from
+# being able to communicate outside of the machine/container it is running on.
+#
+# To enable this, set the bind address to one that has access to outside-facing
+# network interface, such as 0.0.0.0.
+#
+rest.bind-address: localhost
+
+# Flag to specify whether job submission is enabled from the web-based
+# runtime monitor. Uncomment to disable.
+
+#web.submit.enable: false
+
+# Flag to specify whether job cancellation is enabled from the web-based
+# runtime monitor. Uncomment to disable.
+
+#web.cancel.enable: false
+
+#==============================================================================
+# Advanced
+#==============================================================================
+
+# Override the directories for temporary files. If not specified, the
+# system-specific Java temporary directory (java.io.tmpdir property) is taken.
+#
+# For framework setups on Yarn, Flink will automatically pick up the
+# containers' temp directories without any need for configuration.
+#
+# Add a delimited list for multiple directories, using the system directory
+# delimiter (colon ':' on unix) or a comma, e.g.:
+#     /data1/tmp:/data2/tmp:/data3/tmp
+#
+# Note: Each directory entry is read from and written to by a different I/O
+# thread. You can include the same directory multiple times in order to create
+# multiple I/O threads against that directory. This is for example relevant for
+# high-throughput RAIDs.
+#
+# io.tmp.dirs: /tmp
+
+# The classloading resolve order. Possible values are 'child-first' (Flink's default)
+# and 'parent-first' (Java's default).
+#
+# Child first classloading allows users to use different dependency/library
+# versions in their application than those in the classpath. Switching back
+# to 'parent-first' may help with debugging dependency issues.
+#
+# classloader.resolve-order: child-first
+
+# The amount of memory going to the network stack. These numbers usually need 
+# no tuning. Adjusting them may be necessary in case of an "Insufficient number
+# of network buffers" error. The default min is 64MB, the default max is 1GB.
+# 
+# taskmanager.memory.network.fraction: 0.1
+# taskmanager.memory.network.min: 64mb
+# taskmanager.memory.network.max: 1gb
+
+#==============================================================================
+# Flink Cluster Security Configuration
+#==============================================================================
+
+# Kerberos authentication for various components - Hadoop, ZooKeeper, and connectors -
+# may be enabled in four steps:
+# 1. configure the local krb5.conf file
+# 2. provide Kerberos credentials (either a keytab or a ticket cache w/ kinit)
+# 3. make the credentials available to various JAAS login contexts
+# 4. configure the connector to use JAAS/SASL
+
+# The below configure how Kerberos credentials are provided. A keytab will be used instead of
+# a ticket cache if the keytab path and principal are set.
+
+# security.kerberos.login.use-ticket-cache: true
+# security.kerberos.login.keytab: /path/to/kerberos/keytab
+# security.kerberos.login.principal: flink-user
+
+# The configuration below defines which JAAS login contexts
+
+# security.kerberos.login.contexts: Client,KafkaClient
+
+#==============================================================================
+# ZK Security Configuration
+#==============================================================================
+
+# Below configurations are applicable if ZK ensemble is configured for security
+
+# Override below configuration to provide custom ZK service name if configured
+# zookeeper.sasl.service-name: zookeeper
+
+# The configuration below must match one of the values set in "security.kerberos.login.contexts"
+# zookeeper.sasl.login-context-name: Client
+
+#==============================================================================
+# HistoryServer
+#==============================================================================
+
+# The HistoryServer is started and stopped via bin/historyserver.sh (start|stop)
+
+# Directory to upload completed jobs to. Add this directory to the list of
+# monitored directories of the HistoryServer as well (see below).
+#jobmanager.archive.fs.dir: hdfs:///completed-jobs/
+
+# The address under which the web-based HistoryServer listens.
+#historyserver.web.address: 0.0.0.0
+
+# The port under which the web-based HistoryServer listens.
+#historyserver.web.port: 8082
+
+# Comma separated list of directories to monitor for completed jobs.
+#historyserver.archive.fs.dir: hdfs:///completed-jobs/
+
+# Interval in milliseconds for refreshing the monitored directories.
+#historyserver.archive.fs.refresh-interval: 10000
+
