Hi Niklas,

You are correct that the input and output of a Pandas UDF must have the same 
length, and that Flink splits the input data into multiple bundles for a Pandas 
UDF, with a non-deterministic bundle size. Both limitations are by design, so I 
guess Pandas UDF cannot meet your requirements.

However, you could check whether the Pandas UDAF[1], which is supported as of 
1.12, could meet your requirements:
- As group_by only generates one record per group key, just as you said, you 
could declare the output type of the Pandas UDAF as an array type.
- You then need to flatten the aggregation results, e.g. using UNNEST.
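
As a rough sketch of the two steps above (not tested against RC1): the names 
naive_forecast, forecast_agg and UNNEST_QUERY, the placeholder "model", and the 
table mySource with columns riid and y are assumptions for illustration only. 
The PyFlink import is guarded so that the snippet also runs where PyFlink is 
not installed.

```python
def naive_forecast(values, horizon=3):
    """Hypothetical stand-in for the real model: it sees ALL values of one
    group at once and emits `horizon` rows, independent of the input length."""
    values = list(values)
    mean = float(sum(values)) / len(values)
    return [mean] * horizon


try:
    from pyflink.table import DataTypes
    from pyflink.table.udf import udaf
except ImportError:  # PyFlink not installed; only the plain function is defined
    udaf = None

if udaf is not None:
    # Pandas UDAF: `y` is a pandas.Series holding the whole group.
    # Declaring an ARRAY result type lets one group emit m values.
    @udaf(result_type=DataTypes.ARRAY(DataTypes.FLOAT()), func_type="pandas")
    def forecast_agg(y):
        return naive_forecast(y)


# Assumed query: aggregate per device, then flatten the array with UNNEST.
# It would be run with t_env.execute_sql(...) after registering forecast_agg,
# e.g. via t_env.register_function("forecast_agg", forecast_agg).
UNNEST_QUERY = """
SELECT agg.riid, t.yhat
FROM (
    SELECT riid, forecast_agg(y) AS yhat_arr
    FROM mySource
    GROUP BY riid
) AS agg
CROSS JOIN UNNEST(agg.yhat_arr) AS t (yhat)
"""
```

Since the aggregate function receives the complete group, all the data of one 
device has to fit into memory at once, which matches what you described.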

NOTE: Flink 1.12 has not been released yet. You could try the PyFlink package of 
RC1[2] for 1.12.0 or build it yourself according to [3].

[2] https://dist.apache.org/repos/dist/dev/flink/flink-1.12.0-rc1/python/ 


> On Nov 11, 2020, at 9:03 PM, Niklas Wilcke <niklas.wil...@uniberg.com> wrote:
> Hi Flink Community,
> I'm currently trying to implement a parallel machine learning job with Flink. 
> The goal is to train models in parallel for independent time series in the 
> same data stream. For that purpose I'm using a Python library, which led me 
> to PyFlink. Let me explain the use case a bit more.
> I want to implement a batch job, which partitions/groups the data by a device 
> identifier. After that I need to process the data for each device all at 
> once. There is no way to iteratively train the model unfortunately. The 
> challenge I'm facing is to guarantee that all data belonging to a certain 
> device is processed in one single step. I'm aware of the fact that this does 
> not scale well, but for a reasonable amount of input data per device it 
> should be fine from my perspective.
> I investigated a lot and I ended up using the Table API and Pandas UDF, which 
> roughly fulfil my requirements, but there are the following limitations left, 
> which I wanted to talk about.
> 1. Pandas UDF takes multiple Series as input parameters, which is fine for my 
> purpose, but as far as I can see there is no way to guarantee that the chunk 
> of data in the Series is "complete". Flink will slice the Series and maybe 
> call the UDF multiple times for each device. As far as I can see there are 
> some config options like "python.fn-execution.arrow.batch.size" and 
> "python.fn-execution.bundle.time", which might help, but I'm not sure, 
> whether this is the right path to take.
> 2. The length of the input Series needs to be of the same size as the output 
> Series, which isn't nice for my use case. What I would like to do is to 
> process n rows and emit m rows. There shouldn't be any dependency between the 
> number of input rows and the number of output rows.
> 3. How do I partition the data stream? The Table API offers a groupby, but 
> this doesn't serve my purpose, because I don't want to aggregate all the 
> grouped lines. Instead, as stated above, I want to emit m result lines per 
> group. Are there other options using the Table API or any other API to do 
> this kind of grouping? I would need something like a "keyBy()" from the 
> streaming API. Maybe this can be combined? Can I create a separate table for 
> each key?
> I'm also open to ideas for a completely different approach not using the 
> Table API or Pandas UDF. Any idea is welcome.
> You can find a condensed version of the source code attached.
> Kind Regards,
> Niklas
> #############################################################
> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.table import StreamTableEnvironment, DataTypes
> from pyflink.table.udf import udf
> env = StreamExecutionEnvironment.get_execution_environment()
> env.set_parallelism(1)
> t_env = StreamTableEnvironment.create(env)
> t_env.get_config().get_configuration().set_boolean("python.fn-execution.memory.managed",
>  True)
> @udf(input_types=[DataTypes.FLOAT(), DataTypes.FLOAT()],
>     result_type=DataTypes.FLOAT(), udf_type='pandas')
> def forcast(ds_float_series, y):
>    # Train the model and create the forcast
>    yhat_ts = forcast['yhat'].tail(input_size)
>    return yhat_ts
> t_env.register_function("forcast", forcast)
> # Define sink and source here
> t_env.execute_sql(my_source_ddl)
> t_env.execute_sql(my_sink_ddl)
> # TODO: key_by instead of filter
> t_env.from_path('mySource') \
>    .where("riid === 'r1i1'") \
>    .select("ds, riid, y, forcast(ds, y) as yhat_90d") \
>    .insert_into('mySink')
> t_env.execute("pandas_udf_demo")
> #############################################################
