Hi Kadiyala,

I think there is a typo in your email:

        Flink version 1.7.1

Maybe you meant 1.17.1?


About the error: the reason your code can't run is that the class org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer has been deprecated since Flink 1.14 and was removed in Flink 1.17, so it no longer exists in the 1.17.1 connector jar.


You need to use KafkaSource and KafkaSink instead to meet your requirement.
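
For example, here is a minimal, untested sketch of your consumer rewritten with KafkaSource. The jar path is an assumed absolute path (adjust it to wherever the jar lives in your container); the bootstrap server, topic, group id, and credentials are the placeholders from your snippet:

from pyflink.common import Types, WatermarkStrategy
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.kafka import KafkaSource, KafkaOffsetsInitializer
from pyflink.datastream.formats.json import JsonRowDeserializationSchema

env = StreamExecutionEnvironment.get_execution_environment()
# add_jars() needs an absolute file URL (note the three slashes);
# /opt/flink/lib/ is only an assumed location.
env.add_jars("file:///opt/flink/lib/flink-sql-connector-kafka-1.17.1.jar")

deserialization_schema = JsonRowDeserializationSchema.Builder() \
    .type_info(Types.ROW([Types.INT(), Types.STRING()])) \
    .build()

source = KafkaSource.builder() \
    .set_bootstrap_servers('bootstrap-server') \
    .set_topics('TOPIC_NAME') \
    .set_group_id('python-group-1') \
    .set_starting_offsets(KafkaOffsetsInitializer.earliest()) \
    .set_value_only_deserializer(deserialization_schema) \
    .set_property('security.protocol', 'SASL_SSL') \
    .set_property('sasl.mechanism', 'PLAIN') \
    .set_property('sasl.jaas.config', 'org.apache.kafka.common.security.plain.PlainLoginModule required username="API_KEY" password="API_SECRET";') \
    .build()

# from_source() replaces add_source(); the earliest-offset setting above
# replaces set_start_from_earliest() on the old consumer.
stream = env.from_source(source, WatermarkStrategy.no_watermarks(), "Kafka Source")
stream.print()
env.execute()

If you also need to write back to Kafka, KafkaSink.builder() follows the same pattern.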


Regards,

Leo


On 2023/6/7 23:27, Kadiyala, Ruthvik via user wrote:
Hi,

Please find below the code I have been using to consume a Kafka stream hosted on Confluent. It returns an error regarding the jar files. Please find the error below the code snippet. Let me know what I am doing wrong. I am running this on Docker with Flink version 1.7.1.

Code:

from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.kafka import FlinkKafkaConsumer
from pyflink.datastream.formats.json import JsonRowDeserializationSchema
import glob
import os
import sys
import logging

# Set up the execution environment
env = StreamExecutionEnvironment.get_execution_environment()

logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")

# the sql connector for kafka is used here as it's a fat jar and could avoid dependency issues
env.add_jars("file://flink-sql-connector-kafka-1.17.1.jar")
env.add_classpaths("file://flink-sql-connector-kafka-1.17.1.jar")
env.add_jars("file://flink-connector-kafka_2.11-1.9.2-javadoc.jar")
env.add_classpaths("file://flink-connector-kafka_2.11-1.9.2-javadoc.jar")

# Set up the Confluent Cloud Kafka configuration
kafka_config = {
    'bootstrap.servers': 'bootstrap-server',
    'security.protocol': 'SASL_SSL',
    'sasl.mechanism': 'PLAIN',
    'sasl.jaas.config': 'org.apache.kafka.common.security.plain.PlainLoginModule required username="API_KEY" password="API_SECRET";'
}

topic = 'TOPIC_NAME'

deserialization_schema = JsonRowDeserializationSchema.Builder() \
        .type_info(Types.ROW([Types.INT(), Types.STRING()])) \
        .build()

# Set up the Kafka consumer properties
consumer_props = {
    'bootstrap.servers': kafka_config['bootstrap.servers'],
    'security.protocol': kafka_config['security.protocol'],
    'sasl.mechanism': kafka_config['sasl.mechanism'],
    'sasl.jaas.config': kafka_config['sasl.jaas.config'],
    'group.id': 'python-group-1'
}

# Create a Kafka consumer
kafka_consumer = FlinkKafkaConsumer(
    topics=topic,  # Kafka topic
    deserialization_schema=deserialization_schema,
    properties=consumer_props,  # Consumer properties
)
kafka_consumer.set_start_from_earliest()
# Add the Kafka consumer as a source to the execution environment
stream = env.add_source(kafka_consumer)

# Define your data processing logic here
# For example, you can print the stream to the console
stream.print()

# Execute the job
env.execute()

Error:

Traceback (most recent call last):
File "/home/pyflink/test.py", line 45, in <module>
kafka_consumer = FlinkKafkaConsumer(
File "/usr/local/lib/python3.10/dist-packages/pyflink/datastream/connectors/kafka.py", line 203, in __init__ j_flink_kafka_consumer = _get_kafka_consumer(topics, properties, deserialization_schema, File "/usr/local/lib/python3.10/dist-packages/pyflink/datastream/connectors/kafka.py", line 161, in _get_kafka_consumer
j_flink_kafka_consumer = j_consumer_clz(topics,
File "/usr/local/lib/python3.10/dist-packages/pyflink/util/exceptions.py", line 185, in wrapped_call
raise TypeError(
*TypeError: Could not found the Java class 'org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer'. The Java dependencies could be specified via command line argument '--jarfile' or the config option 'pipeline.jars'*



Cheers & Regards,
Ruthvik Kadiyala

