Hello,

I am currently having issues trying to use the python flink 1.18 Datastream
api with the Amazon Kinesis Data Streams Connector.

From the documentation
https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/connectors/datastream/kinesis/
I have downloaded the bundled "flink-sql-connector-kinesis" jar
https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kinesis/4.2.0-1.18/flink-sql-connector-kinesis-4.2.0-1.18.jar

and I have added it to my code:

from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode

env = StreamExecutionEnvironment.get_execution_environment()
env.set_runtime_mode(RuntimeExecutionMode.STREAMING)
env.enable_checkpointing(5000)

env.add_jars(
    "file:///workspaces/flink/flink-sql-connector-kinesis-4.2.0-1.18.jar",
)

and it has worked perfectly so far for setting up my Kinesis source (see the rough sketch below). I recently added a Kinesis sink to complete my pipeline (I was testing with print before).
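For context, the source side looks roughly like this; the stream name, region, and consumer properties here are placeholders rather than my exact code:

from pyflink.common.serialization import SimpleStringSchema
from pyflink.datastream.connectors.kinesis import FlinkKinesisConsumer

# Placeholder consumer properties; my real config also sets credentials etc.
consumer_config = {
    'aws.region': 'us-east-1',
    'flink.stream.initpos': 'LATEST'
}

ds = env.add_source(
    FlinkKinesisConsumer("input-stream", SimpleStringSchema(), consumer_config))

The sink I added looks like this: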

# ds = ds.print()

from pyflink.common.serialization import SimpleStringSchema
from pyflink.datastream.connectors.kinesis import KinesisStreamsSink, PartitionKeyGenerator

sink = KinesisStreamsSink.builder() \
    .set_kinesis_client_properties(config) \
    .set_serialization_schema(SimpleStringSchema()) \
    .set_partition_key_generator(PartitionKeyGenerator.fixed()) \
    .set_stream_name(stream_name) \
    .build()

ds = ds.add_sink(sink)

s_env.execute('pipeline')

Now when I run my Python Flink application it throws an error at my add_sink call with the following exception:

> python locations_flink_app.py

2024-05-23 14:53:10,219 - INFO - apache_beam.typehints.native_type_compatibility - 315 - Using Any for unsupported type: typing.Sequence[~T]
2024-05-23 14:53:10,287 - INFO - apache_beam.io.gcp.bigquery - 436 - No module named google.cloud.bigquery_storage_v1. As a result, the ReadFromBigQuery transform *CANNOT* be used with `method=DIRECT_READ`.
Traceback (most recent call last):
  File "locations_flink_app.py", line 90, in <module>
    setup_flink_app(s_env, props)
  File "locations_flink_app.py", line 71, in setup_flink_app
    ds = ds.add_sink(create_sink(
  File "/usr/local/lib/python3.8/site-packages/pyflink/datastream/data_stream.py", line 819, in add_sink
    return DataStreamSink(self._j_data_stream.addSink(sink_func.get_java_function()))
  File "/usr/local/lib/python3.8/site-packages/py4j/java_gateway.py", line 1322, in __call__
    return_value = get_return_value(
  File "/usr/local/lib/python3.8/site-packages/pyflink/util/exceptions.py", line 146, in deco
    return f(*a, **kw)
  File "/usr/local/lib/python3.8/site-packages/py4j/protocol.py", line 330, in get_return_value
    raise Py4JError(
py4j.protocol.Py4JError: An error occurred while calling o245.addSink. Trace:
org.apache.flink.api.python.shaded.py4j.Py4JException: Method addSink([class org.apache.flink.connector.kinesis.sink.KinesisStreamsSink]) does not exist
        at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:321)
        at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:329)
        at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:274)
        at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
        at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.base/java.lang.Thread.run(Thread.java:829)

When I open the jar I downloaded (flink-sql-connector-kinesis-4.2.0-1.18.jar), I can see it actually has the classes I need:

Downloads\flink-sql-connector-kinesis-4.2.0-1.18.zip\org\apache\flink\connector\kinesis\sink
contains KinesisStreamsSink.class [class org.apache.flink.connector.kinesis.sink.KinesisStreamsSink]
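(For anyone who wants to reproduce that check: the jar is just a zip archive, so a quick Python one-off can list the sink classes; the file path below is only an example of where the jar might sit on disk.)

import zipfile

# List every entry in the connector jar that mentions KinesisStreamsSink
with zipfile.ZipFile("flink-sql-connector-kinesis-4.2.0-1.18.jar") as jar:
    for name in jar.namelist():
        if "KinesisStreamsSink" in name:
            print(name)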

If I remove the sink, the source (FlinkKinesisConsumer) still works perfectly fine, but I don't understand what I'm missing. The jar I'm using should have everything I need.

Has anyone else had similar issues, or does anyone know what I might need to do?


Thank you,

Nick Hecht
