Hi Niklas,

Python DataStream API will also be supported in coming release of 1.12.0 [1]. 
However, the functionalities are still limited for the time being compared to 
the Java DataStream API, e.g. it will only support the stateless operations, 
such as map, flat_map, etc.

[1] 
https://ci.apache.org/projects/flink/flink-docs-master/dev/python/datastream_tutorial.html

> 在 2020年11月12日,下午7:46,Niklas Wilcke <niklas.wil...@uniberg.com> 写道:
> 
> Hi Dian,
> 
> thank you very much for this valuable response. I already read about the 
> UDAF, but I wasn't aware of the fact that it is possible to return and UNNEST 
> an array. I will definitely have a try and hopefully this will solve my issue.
> 
> Another question that came up to my mind is whether PyFlink supports any 
> other API except Table and SQL, like the Streaming and Batch API. The 
> documentation is only covering the Table API, but I'm not sure about that. 
> Can you maybe tell me whether the Table and SQL API is the only one supported 
> by PyFlink?
> 
> Kind Regards,
> Niklas
> 
>  
> 
>> On 11. Nov 2020, at 15:32, Dian Fu <dian0511...@gmail.com 
>> <mailto:dian0511...@gmail.com>> wrote:
>> 
>> Hi Niklas,
>> 
>> You are correct that the input/output length of Pandas UDF must be of the 
>> same size and that Flink will split the input data into multiple bundles for 
>> Pandas UDF and the bundle size is non-determinstic. Both of the above two 
>> limitations are by design and so I guess Pandas UDF could not meet your 
>> requirements.
>> 
>> However, you could take a look at if the Pandas UDAF[1] which was supported 
>> in 1.12 could meet your requirements:
>> - As group_by only generate one record per group key just as you said, you 
>> could declare the output type of Pandas UDAF as an array type
>> - You need then flatten the aggregation results, e.g. using UNNEST
>> 
>> NOTE: Flink 1.12 is still not released. You could try the PyFlink package of 
>> RC1[2] for 1.12.0 or build it yourself according to [3].
>> 
>> [1] 
>> https://ci.apache.org/projects/flink/flink-docs-master/dev/python/table-api-users-guide/udfs/vectorized_python_udfs.html#vectorized-aggregate-functions
>>  
>> <https://ci.apache.org/projects/flink/flink-docs-master/dev/python/table-api-users-guide/udfs/vectorized_python_udfs.html#vectorized-aggregate-functions>
>> [2] https://dist.apache.org/repos/dist/dev/flink/flink-1.12.0-rc1/python/ 
>> <https://dist.apache.org/repos/dist/dev/flink/flink-1.12.0-rc1/python/>
>> [3] 
>> https://ci.apache.org/projects/flink/flink-docs-master/flinkDev/building.html#build-pyflink
>>  
>> <https://ci.apache.org/projects/flink/flink-docs-master/flinkDev/building.html#build-pyflink>
>> 
>> Regards,
>> Dian
>> 
>>> 在 2020年11月11日,下午9:03,Niklas Wilcke <niklas.wil...@uniberg.com 
>>> <mailto:niklas.wil...@uniberg.com>> 写道:
>>> 
>>> Hi Flink Community,
>>> 
>>> I'm currently trying to implement a parallel machine learning job with 
>>> Flink. The goal is to train models in parallel for independent time series 
>>> in the same data stream. For that purpose I'm using a Python library, which 
>>> lead me to PyFlink. Let me explain the use case a bit more.
>>> I want to implement a batch job, which partitions/groups the data by a 
>>> device identifier. After that I need to process the data for each device 
>>> all at once. There is no way to iteratively train the model unfortunately. 
>>> The challenge I'm facing is to guarantee that all data belonging to a 
>>> certain device is processed in one single step. I'm aware of the fact that 
>>> this does not scale well, but for a reasonable amount of input data per 
>>> device it should be fine from my perspective.
>>> I investigated a lot and I ended up using the Table API and Pandas UDF, 
>>> which roughly fulfil my requirements, but there are the following 
>>> limitations left, which I wanted to talk about.
>>> 
>>> 1. Pandas UDF takes multiple Series as input parameters, which is fine for 
>>> my purpose, but as far as I can see there is no way to guarantee that the 
>>> chunk of data in the Series is "complete". Flink will slice the Series and 
>>> maybe call the UDF multiple times for each device. As far as I can see 
>>> there are some config options like "python.fn-execution.arrow.batch.size" 
>>> and "python.fn-execution.bundle.time", which might help, but I'm not sure, 
>>> whether this is the right path to take.
>>> 2. The length of the input Series needs to be of the same size as the 
>>> output Series, which isn't nice for my use case. What I would like to do is 
>>> to process n rows and emit m rows. There shouldn't be any dependency 
>>> between the number of input rows and the number of output rows.
>>> 
>>> 3. How do I partition the data stream. The Table API offers a groupby, but 
>>> this doesn't serve my purpose, because I don't want to aggregate all the 
>>> grouped lines. Instead as stated above I want to emit m result lines per 
>>> group. Are there other options using the Table API or any other API to do 
>>> this kind of grouping. I would need something like a "keyBy()" from the 
>>> streaming API. Maybe this can be combined? Can I create a separate table 
>>> for each key?
>>> 
>>> I'm also open to ideas for a completely different approach not using the 
>>> Table API or Pandas UDF. Any idea is welcome.
>>> 
>>> You can find a condensed version of the source code attached.
>>> 
>>> Kind Regards,
>>> Niklas
>>> 
>>> 
>>> 
>>> #############################################################
>>> 
>>> from pyflink.datastream import StreamExecutionEnvironment
>>> from pyflink.table import StreamTableEnvironment, DataTypes
>>> from pyflink.table.udf import udf
>>> 
>>> env = StreamExecutionEnvironment.get_execution_environment()
>>> env.set_parallelism(1)
>>> t_env = StreamTableEnvironment.create(env)
>>> t_env.get_config().get_configuration().set_boolean("python.fn-execution.memory.managed",
>>>  True)
>>> 
>>> @udf(input_types=[DataTypes.FLOAT(), DataTypes.FLOAT()],
>>>     result_type=DataTypes.FLOAT(), udf_type='pandas')
>>> def forcast(ds_float_series, y):
>>> 
>>>    # Train the model and create the forcast
>>> 
>>>    yhat_ts = forcast['yhat'].tail(input_size)
>>>    return yhat_ts
>>> 
>>> t_env.register_function("forcast", forcast)
>>> 
>>> # Define sink and source here
>>> 
>>> t_env.execute_sql(my_source_ddl)
>>> t_env.execute_sql(my_sink_ddl)
>>> 
>>> # TODO: key_by instead of filter
>>> t_env.from_path('mySource') \
>>>    .where("riid === 'r1i1'") \
>>>    .select("ds, riid, y, forcast(ds, y) as yhat_90d") \
>>>    .insert_into('mySink')
>>> 
>>> t_env.execute("pandas_udf_demo")
>>> 
>>> #############################################################
>>> 
>>> 
>> 
> 

Reply via email to