Hello,
I am trying to deploy a Python job with Kafka connector:
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.dataset import ExecutionEnvironment
from pyflink.table import TableConfig, DataTypes, BatchTableEnvironment,
StreamTableEnvironment
from pyflink.table.descriptors import Schema, OldCsv, FileSystem, Kafka,
Json, Csv
exec_env = StreamExecutionEnvironment.get_execution_environment()
t_config = TableConfig()
t_env = StreamTableEnvironment.create(exec_env, t_config)
t_env.connect(Kafka()
.version("0.11")
.topic("my-topic")
.property("bootstrap.servers",
'my-cluster-kafka-bootstrap:9092')
) \
.in_append_mode() \
.with_format(Csv()
.line_delimiter("\r\n") \
.derive_schema()) \
.with_schema(Schema()
.field("tbd", DataTypes.INT())) \
.register_table_source('mySource')
t_env.connect(FileSystem().path('../production_data/kafkaoutput')) \
.with_format(OldCsv()
.field('tbd', DataTypes.INT())) \
.with_schema(Schema()
.field("tbd", DataTypes.INT())) \
.register_table_sink('mySink')
t_env.scan('mySource') \
.select('tbd') \
.where("tbd = 1") \
.insert_into('mySink')
t_env.execute("tutorial_job")
When I run a deploying command:
bin/flink run -py
/home/wojtek/PycharmProjects/k8s_kafka_demo/kafka2flink.py --jarfile
/home/wojtek/flink/flink-connectors/flink-connector-kafka-0.11/target/flink-connector-kafka-0.11_2.11-1.11-SNAPSHOT.jar
I get an error:
Traceback (most recent call last):
File "/home/wojtek/PycharmProjects/k8s_kafka_demo/kafka2flink.py", line
9, in <module>
t_env = StreamTableEnvironment.create(exec_env, t_config)
File
"/home/wojtek/flink/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT/opt/python/pyflink.zip/pyflink/table/table_environment.py",
line 1478, in create
File
"/home/wojtek/flink/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT/opt/python/py4j-0.10.8.1-src.zip/py4j/java_gateway.py",
line 1286, in __call__
File
"/home/wojtek/flink/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT/opt/python/pyflink.zip/pyflink/util/exceptions.py",
line 147, in deco
File
"/home/wojtek/flink/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT/opt/python/py4j-0.10.8.1-src.zip/py4j/protocol.py",
line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling
z:org.apache.flink.table.api.bridge.java.StreamTableEnvironment.create.
: java.lang.NoClassDefFoundError:
org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryBase
at java.base/java.lang.ClassLoader.defineClass1(Native Method)
at java.base/java.lang.ClassLoader.defineClass(ClassLoader.java:1017)
at
java.base/java.security.SecureClassLoader.defineClass(SecureClassLoader.java:174)
at java.base/java.net.URLClassLoader.defineClass(URLClassLoader.java:550)
at java.base/java.net.URLClassLoader$1.run(URLClassLoader.java:458)
at java.base/java.net.URLClassLoader$1.run(URLClassLoader.java:452)
at java.base/java.security.AccessController.doPrivileged(Native Method)
at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:451)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:589)
at
org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:61)
at
org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:65)
at
org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
at java.base/java.lang.Class.forName0(Native Method)
at java.base/java.lang.Class.forName(Class.java:398)
at
java.base/java.util.ServiceLoader$LazyClassPathLookupIterator.nextProviderClass(ServiceLoader.java:1209)
at
java.base/java.util.ServiceLoader$LazyClassPathLookupIterator.hasNextService(ServiceLoader.java:1220)
at
java.base/java.util.ServiceLoader$LazyClassPathLookupIterator.hasNext(ServiceLoader.java:1264)
at java.base/java.util.ServiceLoader$2.hasNext(ServiceLoader.java:1299)
at java.base/java.util.ServiceLoader$3.hasNext(ServiceLoader.java:1384)
at java.base/java.util.Iterator.forEachRemaining(Iterator.java:132)
at
org.apache.flink.table.factories.TableFactoryService.discoverFactories(TableFactoryService.java:214)
at
org.apache.flink.table.factories.TableFactoryService.findAllInternal(TableFactoryService.java:170)
at
org.apache.flink.table.factories.TableFactoryService.findAll(TableFactoryService.java:125)
at
org.apache.flink.table.factories.ComponentFactoryService.find(ComponentFactoryService.java:48)
at
org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.lookupExecutor(StreamTableEnvironmentImpl.java:158)
at
org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.create(StreamTableEnvironmentImpl.java:135)
at
org.apache.flink.table.api.bridge.java.StreamTableEnvironment.create(StreamTableEnvironment.java:143)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
Method)
at
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at
org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at
org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
at
org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at
org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
at
org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.ClassNotFoundException:
org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase
at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:471)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:589)
at
org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:61)
at
org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:65)
at
org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
... 39 more
org.apache.flink.client.program.ProgramAbortException
at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:95)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
Method)
at
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
at
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
at
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
at
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
at
org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
What is the correct way to deploy python job on Flink which uses Kafka? It
seems like it cannot get a correct dependency of Kafka.
I wonder if there is some more simply solution and if it matters that i
would like deploy a job on the K8s cluster.
Thanks,
Wojtek
--
UWAGA - Wiadomość oraz załączone do niej dokumenty zawierają informacje
poufne, które mogą być również objęte tajemnicą handlową lub służbową.
Jeśli nie jesteś zamierzonym odbiorcą wiadomości, proszę bezzwłocznie
skontaktuj się z nadawcą oraz usuń wiadomość ze swojego systemu.
Ujawnianie, kopiowanie, rozpowszechnianie czy publikacja tej wiadomości
oraz zawartych w niej informacji jest zabronione.
Alphamoon Sp. z o.o.,
ul. Pawła Włodkowica 21/3, 50-072 Wrocław,
wpisana pod numerem KRS
0000621513 do Krajowego Rejestru Sądowego, prowadzonego przez Sąd Rejonowy
dla Wrocławia-Fabrycznej VI Wydział Gospodarczy Krajowego Rejestru
Sądowego, NIP: 8943079568, REGON 364634116.; Kapitał zakładowy: 5.000 PLN w
pełni opłacony.
NOTE - Message and the documents attached thereto contain
confidential information, which may also be a trade secret or confidential.
If you are not the intended recipient of the message, please contact the
sender without delay and delete the message from your system. Disclosure,
copying, dissemination or publication of this message and information
contained therein is prohibited.
Alphamoon Sp. z o.o. (Ltd.), ul. Pawła
Włodkowica 21/3, 50-072 Wrocław, Poland;
Registered under the KRS number
0000621513 to the National Court Register, kept by the District Court for
Wrocław-Fabryczna VI Economic Department of the National Court Register,
VAT-ID: PL8943079568, REGON 364634116; Share capital: PLN 5.000 fully
paid-up.