Hi team,
I am trying to read records from a Kafka topic, and below is my very basic
code so far:
from pyflink.common import SimpleStringSchema
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode
from pyflink.datastream.connectors.kafka import FlinkKafkaConsumer


class SourceData(object):
    def __init__(self, env):
        self.env = env
        # Register the Kafka connector jar with the environment
        self.env.add_jars("file:///flink-sql-connector-kafka-1.17.1.jar")
        self.env.set_runtime_mode(RuntimeExecutionMode.STREAMING)
        self.env.set_parallelism(1)

    def get_data(self):
        source = FlinkKafkaConsumer(
            topics="test-topic1",
            deserialization_schema=SimpleStringSchema(),
            properties={
                "bootstrap.servers": "localhost:9092",
                "auto.offset.reset": "earliest",
            },
        )
        # Print every record read from the topic to stdout
        self.env \
            .add_source(source) \
            .print()
        self.env.execute("source")


SourceData(StreamExecutionEnvironment.get_execution_environment()).get_data()
Now, nothing works, whether I add the jar via add_jars() in the code or put
it inside the pyflink/lib directory.
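To be explicit about what I mean by "via code": it is the same add_jars()
call as in the snippet above, just pointed at wherever the jar sits on disk,
roughly like this (the path below is only a placeholder, not my real
location):

from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
# placeholder path -- the real jar lives somewhere else on my machine
env.add_jars("file:///path/to/flink-sql-connector-kafka-1.17.1.jar")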
Can someone help me resolve this? I have been stuck for hours.
KafkaSource.builder() also doesn't work.
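For reference, the KafkaSource variant I tried looked roughly like this (a
sketch from memory; the group id and source name are made up, everything else
matches the setup above):

from pyflink.common import SimpleStringSchema, WatermarkStrategy
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.kafka import KafkaSource, KafkaOffsetsInitializer

env = StreamExecutionEnvironment.get_execution_environment()
env.add_jars("file:///flink-sql-connector-kafka-1.17.1.jar")
env.set_parallelism(1)

# Same topic and brokers as the FlinkKafkaConsumer version above
source = KafkaSource.builder() \
    .set_bootstrap_servers("localhost:9092") \
    .set_topics("test-topic1") \
    .set_group_id("test-group") \
    .set_starting_offsets(KafkaOffsetsInitializer.earliest()) \
    .set_value_only_deserializer(SimpleStringSchema()) \
    .build()

env.from_source(source, WatermarkStrategy.no_watermarks(), "kafka-source").print()
env.execute("source")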
I have tried many of the solutions available on Stack Overflow, but no luck.
Python: 3.10
Java: 11
Flink: 1.17.1
J.