Hi Kevin,

Sorry for the late reply.

Actually, you are able to pass arguments to the constructor of the Java
object when instancing in Python. Basic data types
(char/boolean/int/long/float/double/string, etc) can be directory passed
while complex types (array/list/map/POJO, etc) must be converted to java
objects before passing. Please refer to
https://www.py4j.org/py4j_java_collections.html for more information.

Best,
Shuiqiang

Kevin Lam <kevin....@shopify.com> 于2021年3月11日周四 上午4:28写道:

> A follow-up question--In the example you provided Shuiqiang, there were no
> arguments passed to the constructor of the custom sink/source.
>
> What's the best way to pass arguments to the constructor?
>
> On Fri, Mar 5, 2021 at 4:29 PM Kevin Lam <kevin....@shopify.com> wrote:
>
>> Thanks Shuiqiang! That's really helpful, we'll give the connectors a try.
>>
>> On Wed, Mar 3, 2021 at 4:02 AM Shuiqiang Chen <acqua....@gmail.com>
>> wrote:
>>
>>> Hi Kevin,
>>>
>>> Thank you for your questions. Currently, users are not able to defined
>>> custom source/sinks in Python. This is a greate feature that can unify the
>>> end to end PyFlink application development in Python and is a large topic
>>> that we have no plan to support at present.
>>>
>>> As you have noticed that `the Python DataStream API has several
>>> connectors [2] that use Py4J+Java gateways to interoperate with Java
>>> source/sinks`. These connectors are the extensions of the Python abstract
>>> class named `SourceFunction` and `SinkFunction`. Thess two classes can
>>> accept a Java source/sink instance and maintain it to enable the
>>> interoperation between Python and Java.  They can also accept a string of
>>> the full name of a Java/Scala defined Source/SinkFunction class and create
>>> the corresponding java instance. Bellow are the definition of these classes:
>>>
>>> class JavaFunctionWrapper(object):
>>>     """
>>>     A wrapper class that maintains a Function implemented in Java.
>>>     """
>>>
>>>     def __init__(self, j_function: Union[str, JavaObject]):
>>>         # TODO we should move this part to the get_java_function() to 
>>> perform a lazy load.
>>>         if isinstance(j_function, str):
>>>             j_func_class = get_gateway().jvm.__getattr__(j_function)
>>>             j_function = j_func_class()
>>>         self._j_function = j_function
>>>
>>>     def get_java_function(self):
>>>         return self._j_function
>>>
>>>
>>>
>>> class SourceFunction(JavaFunctionWrapper):
>>> """
>>> Base class for all stream data source in Flink.
>>> """
>>>
>>> def __init__(self, source_func: Union[str, JavaObject]):
>>> """
>>> Constructor of SinkFunction.
>>>
>>> :param source_func: The java SourceFunction object.
>>> """
>>> super(SourceFunction, self).__init__(source_func)
>>>
>>>
>>> class SinkFunction(JavaFunctionWrapper):
>>> """
>>> The base class for SinkFunctions.
>>> """
>>>
>>> def __init__(self, sink_func: Union[str, JavaObject]):
>>> """
>>> Constructor of SinkFunction.
>>>
>>> :param sink_func: The java SinkFunction object or the full name of the
>>> SinkFunction class.
>>> """
>>> super(SinkFunction, self).__init__(sink_func)
>>>
>>> Therefore, you are able to defined custom sources/sinks in Scala and
>>> apply them in Python. Here is the recommended approach for implementation:
>>>
>>> class MyBigTableSink(SinkFunction):
>>>     def __init__(self, class_name: str):
>>>         super(MyBigTableSink, self).__init__(class_name)
>>>
>>>
>>> def example():
>>>     env = StreamExecutionEnvironment.get_execution_environment()
>>>     env.add_jars('/the/path/of/your/MyBigTableSink.jar')
>>>     # ...
>>>     ds.add_sink(MyBigTableSink("com.mycompany.MyBigTableSink"))
>>>     env.execute("Application with Custom Sink")
>>>
>>>
>>> if __name__ == '__main__':
>>>     example()
>>>
>>> Remember that you must add the jar of the Scala defined SinkFunction by
>>> calling `env.add_jars()` before adding the SinkFunction. And your custom
>>> sources/sinks function must be the extension of `SourceFunction` and
>>> `SinkFunction`.
>>>
>>> Any further questions are welcomed!
>>>
>>> Best,
>>> Shuiqiang
>>>
>>>
>>> Kevin Lam <kevin....@shopify.com> 于2021年3月3日周三 上午2:50写道:
>>>
>>>> Hello everyone,
>>>>
>>>> I have some questions about the Python API that hopefully folks in the
>>>> Apache Flink community can help with.
>>>>
>>>> A little background, I’m interested in using the Python Datastream API
>>>> because of stakeholders who don’t have a background in Scala/Java, and
>>>> would prefer Python if possible. Our team is open to maintaining Scala
>>>> constructs on our end, however we are looking to expose Flink for stateful
>>>> streaming via a Python API to end-users.
>>>>
>>>> Questions:
>>>>
>>>> 1/ The docs mention that custom Sources and Sinks cannot be defined in
>>>> Python, but must be written in Java/Scala [1]. What is the recommended
>>>> approach for interoperating between custom sinks/sources written in Scala,
>>>> with the Python API? If nothing is currently supported, is it on the road
>>>> map?
>>>>
>>>> 2/ Also, I’ve noted that the Python DataStream API has several
>>>> connectors [2] that use Py4J+Java gateways to interoperate with Java
>>>> source/sinks. Is there a way for users to build their own connectors? What
>>>> would this process entail?
>>>>
>>>> Ideally, we’d like to be able to define custom sources/sinks in Scala
>>>> and use them in our Python API Flink Applications. For example, defining a
>>>> BigTable sink in Scala for use in the Python API:
>>>>
>>>>
>>>> [3]
>>>>
>>>> Where MyBigTableSink is just somehow importing a Scala defined sink.
>>>>
>>>> More generally, we’re interested in learning more about Scala/Python
>>>> interoperability in Flink, and how we can expose the power of Flink’s Scala
>>>> APIs to Python. Open to any suggestions, strategies, etc.
>>>>
>>>> Looking forward to any thoughts!
>>>>
>>>>
>>>> [1]
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table-api-users-guide/python_table_api_connectors.html#user-defined-sources--sinks
>>>>
>>>> [2]
>>>> https://github.com/apache/flink/blob/b23c31075aeb8cf3dbedd4f1f3571d5ebff99c3d/flink-python/pyflink/datastream/connectors.py
>>>>
>>>> [3] Plaintext paste of code in screenshot, in case of attachment issues:
>>>> ```
>>>> from pyflink.common.typeinfo import Types
>>>> from pyflink.datastream import StreamExecutionEnvironment
>>>> from pyflink.datastream.connectors import MyBigTableSink
>>>>
>>>> def example():
>>>>     env = StreamExecutionEnvironment.get_execution_environment()
>>>>     ...
>>>>     ds.add_sink(MyBigTableSink, ...)
>>>>     env.execute("Application with Custom Sink")
>>>>
>>>> if __name__ == '__main__':
>>>>     example()
>>>> ```
>>>>
>>>

Reply via email to