Hi Niklas,

The Python DataStream API will also be supported in the upcoming 1.12.0 release [1]. However, its functionality is still limited for the time being compared to the Java DataStream API, e.g. it only supports stateless operations such as map, flat_map, etc.
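For example, a minimal stateless pipeline looks like the following (a sketch adapted from the tutorial in [1]; the sample data and the sink path are just placeholders):

from pyflink.common.serialization import SimpleStringEncoder
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import StreamingFileSink

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)

ds = env.from_collection(
    collection=[(1, 'aaa'), (2, 'bbb')],
    type_info=Types.ROW([Types.INT(), Types.STRING()]))

# Stateless transformations such as map/flat_map are what is supported so far.
ds = ds.map(lambda value: value,
            output_type=Types.ROW([Types.INT(), Types.STRING()]))

ds.add_sink(StreamingFileSink
            .for_row_format('/tmp/output', SimpleStringEncoder())
            .build())

env.execute("datastream_sketch")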
[1] https://ci.apache.org/projects/flink/flink-docs-master/dev/python/datastream_tutorial.html

> On Nov 12, 2020, at 7:46 PM, Niklas Wilcke <niklas.wil...@uniberg.com> wrote:
> 
> Hi Dian,
> 
> thank you very much for this valuable response. I had already read about the
> UDAF, but I wasn't aware of the fact that it is possible to return and UNNEST
> an array. I will definitely give it a try and hopefully this will solve my
> issue.
> 
> Another question that came to my mind is whether PyFlink supports any other
> API besides Table and SQL, like the Streaming and Batch API. The
> documentation only covers the Table API, but I'm not sure about that. Can
> you maybe tell me whether the Table and SQL API is the only one supported
> by PyFlink?
> 
> Kind Regards,
> Niklas
> 
> 
>> On 11. Nov 2020, at 15:32, Dian Fu <dian0511...@gmail.com> wrote:
>> 
>> Hi Niklas,
>> 
>> You are correct that the input and output of a Pandas UDF must be of the
>> same size and that Flink will split the input data into multiple bundles
>> for the Pandas UDF, with a non-deterministic bundle size. Both of these
>> limitations are by design, so I guess a Pandas UDF alone cannot meet your
>> requirements.
>> 
>> However, you could take a look at whether the Pandas UDAF [1], which is
>> supported in 1.12, meets your requirements:
>> - As group_by generates only one record per group key, just as you said,
>>   you could declare the output type of the Pandas UDAF as an array type.
>> - You then need to flatten the aggregation results, e.g. using UNNEST.
>> 
>> NOTE: Flink 1.12 is not released yet. You could try the PyFlink package of
>> RC1 [2] for 1.12.0 or build it yourself according to [3].
>> 
>> [1] https://ci.apache.org/projects/flink/flink-docs-master/dev/python/table-api-users-guide/udfs/vectorized_python_udfs.html#vectorized-aggregate-functions
>> [2] https://dist.apache.org/repos/dist/dev/flink/flink-1.12.0-rc1/python/
>> [3] https://ci.apache.org/projects/flink/flink-docs-master/flinkDev/building.html#build-pyflink
>> 
>> Regards,
>> Dian
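[To make the array + UNNEST suggestion quoted above concrete: a rough, untested sketch against the 1.12 RC1. The function name forcast_agg, its body, the FLOAT element type and the (riid, yhat) sink schema are illustrative placeholders, and it reuses the t_env, mySource and mySink from the snippet at the bottom of this thread.]

from pyflink.table import DataTypes
from pyflink.table.expressions import col
from pyflink.table.udf import udaf

@udaf(result_type=DataTypes.ARRAY(DataTypes.FLOAT()), func_type="pandas")
def forcast_agg(ds, y):
    # ds and y arrive as the complete pandas.Series of one group key, so all
    # data of a device is processed in a single call.
    # ... train the model on the full series (elided) ...
    return [0.0]  # m forecast values; independent of the input length

# Aggregate per device; the whole forecast ends up in one ARRAY column.
agg = t_env.from_path('mySource') \
    .group_by(col('riid')) \
    .select(col('riid'), forcast_agg(col('ds'), col('y')).alias('yhats'))
t_env.create_temporary_view('agg_view', agg)

# Flatten the array back into one row per forecast value with UNNEST.
t_env.execute_sql("""
    INSERT INTO mySink
    SELECT riid, yhat
    FROM agg_view CROSS JOIN UNNEST(yhats) AS t (yhat)
""").wait()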
>>> On Nov 11, 2020, at 9:03 PM, Niklas Wilcke <niklas.wil...@uniberg.com> wrote:
>>> 
>>> Hi Flink Community,
>>> 
>>> I'm currently trying to implement a parallel machine learning job with
>>> Flink. The goal is to train models in parallel for independent time series
>>> in the same data stream. For that purpose I'm using a Python library,
>>> which led me to PyFlink. Let me explain the use case a bit more.
>>> 
>>> I want to implement a batch job which partitions/groups the data by a
>>> device identifier. After that I need to process the data for each device
>>> all at once. Unfortunately, there is no way to train the model
>>> iteratively. The challenge I'm facing is to guarantee that all data
>>> belonging to a certain device is processed in one single step. I'm aware
>>> of the fact that this does not scale well, but for a reasonable amount of
>>> input data per device it should be fine from my perspective.
>>> 
>>> I investigated a lot and ended up using the Table API and Pandas UDFs,
>>> which roughly fulfil my requirements, but the following limitations
>>> remain, which I wanted to talk about.
>>> 
>>> 1. A Pandas UDF takes multiple Series as input parameters, which is fine
>>> for my purpose, but as far as I can see there is no way to guarantee that
>>> the chunk of data in a Series is "complete". Flink will slice the Series
>>> and maybe call the UDF multiple times for each device. As far as I can
>>> see there are some config options like
>>> "python.fn-execution.arrow.batch.size" and "python.fn-execution.bundle.time"
>>> which might help, but I'm not sure whether this is the right path to take.
>>> 
>>> 2. The length of the input Series needs to be of the same size as the
>>> output Series, which isn't nice for my use case. What I would like to do
>>> is to process n rows and emit m rows. There shouldn't be any dependency
>>> between the number of input rows and the number of output rows.
>>> 
>>> 3. How do I partition the data stream? The Table API offers group_by, but
>>> this doesn't serve my purpose, because I don't want to aggregate all the
>>> grouped lines. Instead, as stated above, I want to emit m result lines
>>> per group. Are there other options using the Table API or any other API
>>> to do this kind of grouping? I would need something like a keyBy() from
>>> the streaming API. Maybe this can be combined? Can I create a separate
>>> table for each key?
>>> 
>>> I'm also open to ideas for a completely different approach not using the
>>> Table API or Pandas UDFs. Any idea is welcome.
>>> 
>>> You can find a condensed version of the source code attached.
>>> 
>>> Kind Regards,
>>> Niklas
>>> 
>>> 
>>> #############################################################
>>> 
>>> from pyflink.datastream import StreamExecutionEnvironment
>>> from pyflink.table import StreamTableEnvironment, DataTypes
>>> from pyflink.table.udf import udf
>>> 
>>> env = StreamExecutionEnvironment.get_execution_environment()
>>> env.set_parallelism(1)
>>> t_env = StreamTableEnvironment.create(env)
>>> t_env.get_config().get_configuration().set_boolean(
>>>     "python.fn-execution.memory.managed", True)
>>> 
>>> @udf(input_types=[DataTypes.FLOAT(), DataTypes.FLOAT()],
>>>      result_type=DataTypes.FLOAT(), udf_type='pandas')
>>> def forcast(ds_float_series, y):
>>>     # Train the model and create the forecast. The elided training code
>>>     # is assumed to produce `forecast` (a DataFrame with a 'yhat' column)
>>>     # and to define `input_size` (the length of the input batch).
>>>     yhat_ts = forecast['yhat'].tail(input_size)
>>>     return yhat_ts
>>> 
>>> t_env.register_function("forcast", forcast)
>>> 
>>> # my_source_ddl and my_sink_ddl (elided) define the source and sink tables.
>>> t_env.execute_sql(my_source_ddl)
>>> t_env.execute_sql(my_sink_ddl)
>>> 
>>> # TODO: key_by instead of filter
>>> t_env.from_path('mySource') \
>>>     .where("riid === 'r1i1'") \
>>>     .select("ds, riid, y, forcast(ds, y) as yhat_90d") \
>>>     .insert_into('mySink')
>>> 
>>> t_env.execute("pandas_udf_demo")
>>> 
>>> #############################################################
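PS: Regarding the config options mentioned in item 1 of the first mail: they can be set as shown below, but they only tune how the input is sliced into bundles; as explained above, the bundle size stays non-deterministic, so they cannot guarantee that one device's complete series arrives in a single UDF call. That is exactly why the UDAF route is the safer answer. (A sketch; the values are arbitrary examples.)

config = t_env.get_config().get_configuration()
# Maximum number of rows per Arrow batch handed to a Pandas UDF.
config.set_integer("python.fn-execution.arrow.batch.size", 10000)
# Flush a bundle at the latest after this many milliseconds.
config.set_integer("python.fn-execution.bundle.time", 1000)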