Thank you for your help!

On Thu, May 23, 2024 at 1:40 PM Aleksandr Pilipenko <z3d...@gmail.com> wrote:
> Hi Nick,
>
> You need to use another method to add a sink to your job - sinkTo.
> KinesisStreamsSink implements the newer Sink interface, while addSink
> expects the old SinkFunction. You can see this by looking at the method
> signatures [1] and at the usage examples in the documentation [2].
>
> [1] https://github.com/apache/flink/blob/master/flink-python/pyflink/datastream/data_stream.py#L811-L837
> [2] https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/datastream/kinesis/#kinesis-streams-sink
>
> Best,
> Aleksandr
>
> On Thu, 23 May 2024 at 17:19, Nick Hecht <nick.he...@zainartech.com> wrote:
>
>> Hello,
>>
>> I am currently having issues trying to use the Python Flink 1.18
>> DataStream API with the Amazon Kinesis Data Streams Connector.
>>
>> From the documentation
>> https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/connectors/datastream/kinesis/
>> I have downloaded the "flink-connector-kinesis" jar
>> https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kinesis/4.2.0-1.18/flink-sql-connector-kinesis-4.2.0-1.18.jar
>>
>> and I have added it in my code:
>>
>> env = StreamExecutionEnvironment.get_execution_environment()
>> env.set_runtime_mode(RuntimeExecutionMode.STREAMING)
>> env.enable_checkpointing(5000)
>>
>> env.add_jars(
>>     "file:///workspaces/flink/flink-sql-connector-kinesis-4.2.0-1.18.jar",
>> )
>>
>> It has worked perfectly so far when setting up my Kinesis source. I
>> recently added a Kinesis sink to complete my pipeline (I was testing
>> with print before):
>>
>> # ds = ds.print()
>> sink = KinesisStreamsSink.builder() \
>>     .set_kinesis_client_properties(config) \
>>     .set_serialization_schema(SimpleStringSchema()) \
>>     .set_partition_key_generator(PartitionKeyGenerator.fixed()) \
>>     .set_stream_name(stream_name) \
>>     .build()
>>
>> ds = ds.add_sink(sink)
>>
>> s_env.execute('pipeline')
>>
>> Now when I run my Python Flink application, it throws an error at my
>> add_sink call with the following
>> exception:
>>
>> > python locations_flink_app.py
>>
>> 2024-05-23 14:53:10,219 - INFO - apache_beam.typehints.native_type_compatibility - 315 - Using Any for unsupported type: typing.Sequence[~T]
>> 2024-05-23 14:53:10,287 - INFO - apache_beam.io.gcp.bigquery - 436 - No module named google.cloud.bigquery_storage_v1. As a result, the ReadFromBigQuery transform *CANNOT* be used with `method=DIRECT_READ`.
>> Traceback (most recent call last):
>>   File "locations_flink_app.py", line 90, in <module>
>>     setup_flink_app(s_env, props)
>>   File "locations_flink_app.py", line 71, in setup_flink_app
>>     ds = ds.add_sink(create_sink(
>>   File "/usr/local/lib/python3.8/site-packages/pyflink/datastream/data_stream.py", line 819, in add_sink
>>     return DataStreamSink(self._j_data_stream.addSink(sink_func.get_java_function()))
>>   File "/usr/local/lib/python3.8/site-packages/py4j/java_gateway.py", line 1322, in __call__
>>     return_value = get_return_value(
>>   File "/usr/local/lib/python3.8/site-packages/pyflink/util/exceptions.py", line 146, in deco
>>     return f(*a, **kw)
>>   File "/usr/local/lib/python3.8/site-packages/py4j/protocol.py", line 330, in get_return_value
>>     raise Py4JError(
>> py4j.protocol.Py4JError: An error occurred while calling o245.addSink.
>> Trace:
>> org.apache.flink.api.python.shaded.py4j.Py4JException: Method addSink([class org.apache.flink.connector.kinesis.sink.KinesisStreamsSink]) does not exist
>>     at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:321)
>>     at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:329)
>>     at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:274)
>>     at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>>     at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
>>     at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
>>     at java.base/java.lang.Thread.run(Thread.java:829)
>>
>> When I open the jar I downloaded (flink-sql-connector-kinesis-4.2.0-1.18.jar),
>> I can see it actually has the classes I need:
>> Downloads\flink-sql-connector-kinesis-4.2.0-1.18.zip\org\apache\flink\connector\kinesis\sink
>> has KinesisStreamsSink.class [class org.apache.flink.connector.kinesis.sink.KinesisStreamsSink]
>>
>> If I remove the sink, the source (FlinkKinesisConsumer) still works
>> perfectly fine, but I don't understand what I'm missing. The jar I'm
>> using should have everything.
>>
>> Has anyone else had similar issues, or does anyone know what I might
>> need to do?
>>
>> Thank you,
>>
>> Nick Hecht
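For reference, the fix Aleksandr describes is a one-line change: attach the sink with `sink_to` instead of `add_sink`. The sketch below reuses the builder code from the original message and assumes the same surrounding setup (`env` with the connector jar added, plus `config`, `stream_name`, and `ds` as defined there); it is not runnable on its own.

```python
# Sketch of the suggested fix, assuming the env/config/stream_name/ds
# setup from the original message. KinesisStreamsSink implements the
# newer Sink interface, so it must be attached with sink_to();
# add_sink() only accepts the legacy SinkFunction, which is why the
# Py4J call to addSink failed with "Method addSink(...) does not exist".
from pyflink.common.serialization import SimpleStringSchema
from pyflink.datastream.connectors.kinesis import (
    KinesisStreamsSink,
    PartitionKeyGenerator,
)

sink = KinesisStreamsSink.builder() \
    .set_kinesis_client_properties(config) \
    .set_serialization_schema(SimpleStringSchema()) \
    .set_partition_key_generator(PartitionKeyGenerator.fixed()) \
    .set_stream_name(stream_name) \
    .build()

# sink_to(), not add_sink(): this is the only change needed.
ds.sink_to(sink)

env.execute('pipeline')
```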