Hi John,

Kinesis and most of the other connectors will be supported in 1.16, see [1]
for more details about kinesis.

For versions prior to 1.16, you could try just as Andrew suggested or refer
to the implementations which are already available in the master as
examples.

Regards,
Dian

[1]
https://github.com/apache/flink/blob/master/flink-python/pyflink/datastream/connectors/kinesis.py

On Fri, Jun 24, 2022 at 9:20 PM Andrew Otto <o...@wikimedia.org> wrote:

> I've had success using the Java in pyflink via pyflink.java_gateway.
> Something like:
>
> from pyflink.java_gateway import get_gateway
> jvm = get_gateway()
>
> # then perhaps something like:
> FlinkKinesisConsumer = jvm.
> org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer
>
> There also seems to be a nice java_utils.py
> <https://github.com/apache/flink/blob/release-1.15.0/flink-python/pyflink/util/java_utils.py>
>  with helpers that may uh, help.
>
> Not sure if this will work, you might need to use the python env's a java
> StreamTableEnvironment to do it?  Here's an example
> <https://github.com/apache/flink/blob/release-1.15.0/flink-python/pyflink/datastream/stream_execution_environment.py#L922-L937>
> of how the python StreamTableEnvironment calls out to the Java one.
>
> BTW: I'm not an authority nor I have I really tried this, so take this
> advice with a grain of salt!  :)
>
> Good luck!
>
>
>
>
>
>
> On Fri, Jun 24, 2022 at 9:06 AM John Tipper <john_tip...@hotmail.com>
> wrote:
>
>> Hi all,
>>
>> There are a number of connectors which do not appear to be in the Python
>> API v1.15.0, e.g. Kinesis. I can see that it's possible to use these
>> connectors by using the Table API:
>>
>> CREATE TABLE my_table (...)
>> WITH ('connector' = 'kinesis' ...)
>>
>>
>> I guess if you wanted the stream as a DataStream you'd I guess you'd
>> create the Table and then convert into a DataStream?
>>
>> Is there a way of directly instantiating these connectors in PyFlink
>> without needed to use SQL like this (and without having to wait until
>> v1.16)? e.g. the Java API looks like this:
>>
>> StreamExecutionEnvironment env = 
>> StreamExecutionEnvironment.getExecutionEnvironment();
>> DataStream<String> kinesis = env.addSource(new FlinkKinesisConsumer<>(
>>     "kinesis_stream_name", new SimpleStringSchema(), consumerConfig));
>>
>>
>> Many thanks,
>>
>> John
>>
>

Reply via email to