Thank you for your help!

On Thu, May 23, 2024 at 1:40 PM Aleksandr Pilipenko <z3d...@gmail.com>
wrote:

> Hi Nick,
>
> You need to use another method to add a sink to your job: sinkTo.
> KinesisStreamsSink implements the newer Sink interface, while addSink
> expects the old SinkFunction. You can see this by looking at the method
> signatures [1] and at the usage examples in the documentation [2].
>
> [1]
> https://github.com/apache/flink/blob/master/flink-python/pyflink/datastream/data_stream.py#L811-L837
> [2]
> https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/datastream/kinesis/#kinesis-streams-sink
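As a rough plain-Python sketch of the mismatch described above (the classes here are stand-ins, not the real PyFlink types): add_sink forwards to the Java addSink(SinkFunction) overload, so a connector that implements the newer Sink interface never matches any signature, while sink_to maps to sinkTo(Sink), which does accept it:

```python
# Stand-in classes only -- no Flink required, names chosen to mirror the thread.

class SinkFunction:          # stand-in for the legacy sink interface
    pass

class Sink:                  # stand-in for the newer unified Sink interface
    pass

class KinesisStreamsSink(Sink):   # the connector extends the new interface
    pass

class DataStream:
    def add_sink(self, sink_func):
        # mirrors Java addSink(SinkFunction): only legacy sinks match
        if not isinstance(sink_func, SinkFunction):
            raise TypeError(
                f"Method addSink([{type(sink_func).__name__}]) does not exist")
        return self

    def sink_to(self, sink):
        # mirrors Java sinkTo(Sink): accepts new-style sinks
        if not isinstance(sink, Sink):
            raise TypeError("sinkTo expects a Sink")
        return self

ds = DataStream()
try:
    ds.add_sink(KinesisStreamsSink())   # raises, like the Py4JError below
except TypeError as err:
    print(err)
ds.sink_to(KinesisStreamsSink())        # succeeds
```

Against the real API, the fix is then a one-line change: `ds = ds.sink_to(sink)` in place of `ds = ds.add_sink(sink)`.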
>
> Best,
> Aleksandr
>
>
> On Thu, 23 May 2024 at 17:19, Nick Hecht <nick.he...@zainartech.com>
> wrote:
>
>> Hello,
>>
>> I am currently having issues trying to use the PyFlink 1.18
>> DataStream API with the Amazon Kinesis Data Streams Connector.
>>
>> From the documentation
>> https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/connectors/datastream/kinesis/
>>  I have downloaded the "flink-connector-kinesis" jar
>> https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kinesis/4.2.0-1.18/flink-sql-connector-kinesis-4.2.0-1.18.jar
>>
>> and I have added it in my code:
>>
>> env = StreamExecutionEnvironment.get_execution_environment()
>> env.set_runtime_mode(RuntimeExecutionMode.STREAMING)
>> env.enable_checkpointing(5000)
>>
>> env.add_jars(
>>     "file:///workspaces/flink/flink-sql-connector-kinesis-4.2.0-1.18.jar"
>> ,
>> )
>>
>> and it has worked perfectly so far when setting up my Kinesis source. I
>> recently added a Kinesis sink to complete my pipeline (I was testing with
>> print before):
>>
>> # ds = ds.print()
>> sink = KinesisStreamsSink.builder() \
>>     .set_kinesis_client_properties(config) \
>>     .set_serialization_schema(SimpleStringSchema()) \
>>     .set_partition_key_generator(PartitionKeyGenerator.fixed()) \
>>     .set_stream_name(stream_name) \
>>     .build()
>>
>> ds = ds.add_sink(sink)
>>
>> s_env.execute('pipeline')
>>
>> Now when I run my Python Flink application it throws an error at my
>> add_sink call with the following exception:
>>
>> > python locations_flink_app.py
>>
>> 2024-05-23 14:53:10,219 - INFO -
>> apache_beam.typehints.native_type_compatibility - 315 - Using Any for
>> unsupported type: typing.Sequence[~T]
>> 2024-05-23 14:53:10,287 - INFO - apache_beam.io.gcp.bigquery - 436 - No
>> module named google.cloud.bigquery_storage_v1. As a result, the
>> ReadFromBigQuery transform *CANNOT* be used with `method=DIRECT_READ`.
>> Traceback (most recent call last):
>>   File "locations_flink_app.py", line 90, in <module>
>>     setup_flink_app(s_env, props)
>>   File "locations_flink_app.py", line 71, in setup_flink_app
>>     ds = ds.add_sink(create_sink(
>>   File
>> "/usr/local/lib/python3.8/site-packages/pyflink/datastream/data_stream.py",
>> line 819, in add_sink
>>     return
>> DataStreamSink(self._j_data_stream.addSink(sink_func.get_java_function()))
>>   File "/usr/local/lib/python3.8/site-packages/py4j/java_gateway.py",
>> line 1322, in __call__
>>     return_value = get_return_value(
>>   File
>> "/usr/local/lib/python3.8/site-packages/pyflink/util/exceptions.py", line
>> 146, in deco
>>     return f(*a, **kw)
>>   File "/usr/local/lib/python3.8/site-packages/py4j/protocol.py", line
>> 330, in get_return_value
>>     raise Py4JError(
>> py4j.protocol.Py4JError: An error occurred while calling o245.addSink.
>> Trace:
>> org.apache.flink.api.python.shaded.py4j.Py4JException: Method
>> addSink([class org.apache.flink.connector.kinesis.sink.KinesisStreamsSink])
>> does not exist
>>         at
>> org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:321)
>>         at
>> org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:329)
>>         at
>> org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:274)
>>         at
>> org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>>         at
>> org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
>>         at
>> org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
>>         at java.base/java.lang.Thread.run(Thread.java:829)
>>
>> When I open the jar I downloaded
>> (flink-sql-connector-kinesis-4.2.0-1.18.jar) I can see it actually has
>> the classes I need:
>> Downloads\flink-sql-connector-kinesis-4.2.0-1.18.zip\org\apache\flink\connector\kinesis\sink
>> contains KinesisStreamsSink.class [class
>> org.apache.flink.connector.kinesis.sink.KinesisStreamsSink]
>>
>> If I remove the sink the source still works perfectly fine
>> (FlinkKinesisConsumer),  but I don't understand what I'm missing. The jar
>> I'm using should have everything.
>>
>> Has anyone else had similar issues, or does anyone know what I might
>> need to do?
>>
>>
>> Thank you,
>>
>> Nick Hecht
>>
>
