Re: PyFlink Table API and UDF Limitations

2020-11-27 Thread Niklas Wilcke
Hi Xingbo,

thanks for sharing. This is very interesting.

Regards,
Niklas

> On 27. Nov 2020, at 03:05, Xingbo Huang  wrote:
> 
> Hi Niklas,
> 
> Thanks a lot for supporting PyFlink. In fact, your requirement for multiple 
> inputs and multiple outputs is essentially a Table Aggregation Function[1]. 
> Although PyFlink does not support it yet, we have listed it in the release 
> 1.13 plan. In addition, row-based operations[2], which are very user-friendly 
> for machine learning users, are also included in the 1.13 plan.
> 
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/functions/udfs.html#table-aggregation-functions
> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/tableApi.html#row-based-operations
> 
> Best,
> Xingbo
> 
> Niklas Wilcke <niklas.wil...@uniberg.com> wrote on Thu, Nov 26, 2020 at 5:11 PM:
> Hi Xingbo,
> 
> thanks for taking care and letting me know. I was about to share an example 
> of how to reproduce this.
> Now I will wait for the next release candidate and give it a try.
> 
> Regards,
> Niklas
> 
> 
> --
> niklas.wil...@uniberg.com 
> Mobile: +49 160 9793 2593
> Office: +49 40 2380 6523
> 
> Simon-von-Utrecht-Straße 85a
> 20359 Hamburg
> 
> UNIBERG GmbH
> Register court: Amtsgericht Kiel HRB SE-1507
> Managing directors: Andreas Möller, Martin Ulbricht
> 
> Information per Art. 13 GDPR (B2B):
> For our communication with you we may process your personal data.
> All information on how we handle your data can be found at 
> https://www.uniberg.com/impressum.html.
> 
>> On 26. Nov 2020, at 02:59, Xingbo Huang wrote:
>> 
>> Hi Niklas,
>> 
>> Regarding `Exception in thread "grpc-nio-worker-ELG-3-2" 
>> java.lang.NoClassDefFoundError: 
>> org/apache/beam/vendor/grpc/v1p26p0/io/netty/buffer/PoolArena$1`, 
>> it does not affect the correctness of the result. The reason is that some 
>> resources are released asynchronously when the gRPC server is shut down[1]. 
>> After the UserClassLoader unloads the class, the asynchronous thread tries 
>> to release the resources and throws a NoClassDefFoundError, but the content 
>> of the data result has already been sent downstream, so the correctness of 
>> the result is not affected.
>> 
>> Regarding the details of the specific causes, I have explained them in the 
>> Flink community[2] and the Beam community[3], and they have been fixed in 
>> the Flink community. This problem will no longer occur in the upcoming 
>> 1.11.3 and 1.12.0 releases.
>> 
>> [1] https://github.com/grpc/grpc-java/blob/master/core/src/main/java/io/grpc/internal/SharedResourceHolder.java#L150
>> [2] https://issues.apache.org/jira/browse/FLINK-20284
>> [3] https://issues.apache.org/jira/browse/BEAM-5397
>> 
>> Best,
>> Xingbo
>> 
>> 
>> Dian Fu <dian0511...@gmail.com> wrote on Mon, Nov 16, 2020 at 9:10 PM:
>> Hi Niklas,
>> 
>>> How can I ingest data into a batch table from Kafka or, even better, 
>>> Elasticsearch? Kafka only offers a streaming source and Elasticsearch 
>>> doesn't offer a source at all.
>>> The only workaround that comes to my mind is to use the Kafka streaming 
>>> source and to apply a single very large window to create a bounded table. 
>>> Do you think that would work?
>>> Are there other options available? Maybe converting a stream into a bounded 
>>> table is somehow possible? Thank you!
>> 
>> 
>> I think you are right that Kafka still doesn't support batch and there is no 
>> ES source for now. Another option is that you could load the data into a 
>> connector which supports batch. Not sure if anybody else has a better idea 
>> about this.
>> 
>>> I found one cause of this problem and it was mixing a Scala 2.12 Flink 
>>> installation with PyFlink, which has some 2.11 jars in its opt folder. I 
>>> think the JVM just skipped the class definitions, because they weren't 
>>> compatible. I actually wasn't aware of the fact that PyFlink comes with 
>>> prebuilt jar dependencies. If PyFlink is only compatible with Scala 2.11 it 
>>> would make sense to point that out in the documentation. I think I never 
>>> read that and it might be missing. Unfortunately there is still one 
>>> Exception showing up at the very end of the job in the taskmanager log. I 
>>> did the verification you asked for and the class is present in both jar 
>>> files. The one which comes with Flink in the opt folder and the one of 
>>> PyFlink. You can find the log attached.

Re: PyFlink Table API and UDF Limitations

2020-11-26 Thread Xingbo Huang
Hi Niklas,

Thanks a lot for supporting PyFlink. In fact, your requirement for multiple
inputs and multiple outputs is essentially a Table Aggregation Function[1].
Although PyFlink does not support it yet, we have listed it in the release
1.13 plan. In addition, row-based operations[2], which are very user-friendly
for machine learning users, are also included in the 1.13 plan.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/functions/udfs.html#table-aggregation-functions
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/tableApi.html#row-based-operations

Best,
Xingbo

Niklas Wilcke wrote on Thu, Nov 26, 2020 at 5:11 PM:

> Hi Xingbo,
>
> thanks for taking care and letting me know. I was about to share an
> example of how to reproduce this.
> Now I will wait for the next release candidate and give it a try.
>
> Regards,
> Niklas
>
>
> --
> niklas.wil...@uniberg.com
> Mobile: +49 160 9793 2593
> Office: +49 40 2380 6523
>
> Simon-von-Utrecht-Straße 85a
> 20359 Hamburg
>
> UNIBERG GmbH
> Register court: Amtsgericht Kiel HRB SE-1507
> Managing directors: Andreas Möller, Martin Ulbricht
>
> Information per Art. 13 GDPR (B2B):
> For our communication with you we may process your personal data.
> All information on how we handle your data can be found at
> https://www.uniberg.com/impressum.html.
>
> On 26. Nov 2020, at 02:59, Xingbo Huang  wrote:
>
> Hi Niklas,
>
> Regarding `Exception in thread "grpc-nio-worker-ELG-3-2"
> java.lang.NoClassDefFoundError:
> org/apache/beam/vendor/grpc/v1p26p0/io/netty/buffer/PoolArena$1`,
> it does not affect the correctness of the result. The reason is that some
> resources are released asynchronously when the gRPC server is shut down[1].
> After the UserClassLoader unloads the class, the asynchronous thread tries
> to release the resources and throws a NoClassDefFoundError, but the content
> of the data result has already been sent downstream, so the correctness of
> the result is not affected.
>
> Regarding the details of the specific causes, I have explained them in the
> Flink community[2] and the Beam community[3], and they have been fixed in
> the Flink community. This problem will no longer occur in the upcoming
> 1.11.3 and 1.12.0 releases.
>
> [1]
> https://github.com/grpc/grpc-java/blob/master/core/src/main/java/io/grpc/internal/SharedResourceHolder.java#L150
> [2] https://issues.apache.org/jira/browse/FLINK-20284
> [3] https://issues.apache.org/jira/browse/BEAM-5397
>
> Best,
> Xingbo
>
>
Dian Fu wrote on Mon, Nov 16, 2020 at 9:10 PM:
>
>> Hi Niklas,
>>
>> How can I ingest data into a batch table from Kafka or, even better,
>> Elasticsearch? Kafka only offers a streaming source and Elasticsearch
>> doesn't offer a source at all.
>>
>> The only workaround that comes to my mind is to use the Kafka streaming
>> source and to apply a single very large window to create a bounded table.
>> Do you think that would work?
>> Are there other options available? Maybe converting a stream into a bounded
>> table is somehow possible? Thank you!
>>
>>
>> I think you are right that Kafka still doesn't support batch and there is
>> no ES source for now. Another option is that you could load the data into a
>> connector which supports batch. Not sure if anybody else has a better idea
>> about this.
>>
>> I found one cause of this problem and it was mixing a Scala 2.12 Flink
>> installation with PyFlink, which has some 2.11 jars in its opt folder. I
>> think the JVM just skipped the class definitions, because they weren't
>> compatible. I actually wasn't aware of the fact that PyFlink comes with
>> prebuilt jar dependencies. If PyFlink is only compatible with Scala 2.11 it
>> would make sense to point that out in the documentation. I think I never
>> read that and it might be missing. Unfortunately there is still one
>> Exception showing up at the very end of the job in the taskmanager log. I
>> did the verification you asked for and the class is present in both jar
>> files. The one which comes with Flink in the opt folder and the one of
>> PyFlink. You can find the log attached.
>> I think the main question is which jar file has to be loaded in the three
>> environments (executor, jobmanager, taskmanager). Is it fine to not put the
>> flink-python_2.11-1.12.0.jar into the lib folder in the jobmanager and
>> taskmanager? Will it somehow be transferred by PyFlink to the jobmanager
>> and taskmanager?
>>
>>
>> PyFlink comes with built-in jars such as flink-python_2.11-1.12.0.jar,
>> flink-dist_2.11-1.12.0.jar, etc., so you don't need to manually add them
>> (and you shouldn't). Could you remove the duplicate jars and try it again?
>>
>> No, I don't think that there are additional exceptions besides 
>> "org.apache.beam.vendor.grpc.v1p26p0.io.grpc.StatusRuntimeException", but 
>> maybe take a look at the attached log files. This problem could be related 
>> to 2.; maybe the root cause is a class loading issue as well. What do you 
>> think? You can find attached three log files: one for the executor, one for 
>> the jobmanager and one for the taskmanager. Maybe you can find something useful.

Re: PyFlink Table API and UDF Limitations

2020-11-26 Thread Niklas Wilcke
Hi Xingbo,

thanks for taking care and letting me know. I was about to share an example 
of how to reproduce this.
Now I will wait for the next release candidate and give it a try.

Regards,
Niklas


--
niklas.wil...@uniberg.com
Mobile: +49 160 9793 2593
Office: +49 40 2380 6523

Simon-von-Utrecht-Straße 85a
20359 Hamburg

UNIBERG GmbH
Register court: Amtsgericht Kiel HRB SE-1507
Managing directors: Andreas Möller, Martin Ulbricht

Information per Art. 13 GDPR (B2B):
For our communication with you we may process your personal data.
All information on how we handle your data can be found at 
https://www.uniberg.com/impressum.html.

> On 26. Nov 2020, at 02:59, Xingbo Huang  wrote:
> 
> Hi Niklas,
> 
> Regarding `Exception in thread "grpc-nio-worker-ELG-3-2" 
> java.lang.NoClassDefFoundError: 
> org/apache/beam/vendor/grpc/v1p26p0/io/netty/buffer/PoolArena$1`, 
> it does not affect the correctness of the result. The reason is that some 
> resources are released asynchronously when the gRPC server is shut down[1]. 
> After the UserClassLoader unloads the class, the asynchronous thread tries 
> to release the resources and throws a NoClassDefFoundError, but the content 
> of the data result has already been sent downstream, so the correctness of 
> the result is not affected.
> 
> Regarding the details of the specific causes, I have explained them in the 
> Flink community[2] and the Beam community[3], and they have been fixed in 
> the Flink community. This problem will no longer occur in the upcoming 
> 1.11.3 and 1.12.0 releases.
> 
> [1] https://github.com/grpc/grpc-java/blob/master/core/src/main/java/io/grpc/internal/SharedResourceHolder.java#L150
> [2] https://issues.apache.org/jira/browse/FLINK-20284
> [3] https://issues.apache.org/jira/browse/BEAM-5397
> 
> Best,
> Xingbo
> 
> 
> Dian Fu <dian0511...@gmail.com> wrote on Mon, Nov 16, 2020 at 9:10 PM:
> Hi Niklas,
> 
>> How can I ingest data into a batch table from Kafka or, even better, 
>> Elasticsearch? Kafka only offers a streaming source and Elasticsearch 
>> doesn't offer a source at all.
>> The only workaround that comes to my mind is to use the Kafka streaming 
>> source and to apply a single very large window to create a bounded table. 
>> Do you think that would work?
>> Are there other options available? Maybe converting a stream into a bounded 
>> table is somehow possible? Thank you!
> 
> 
> I think you are right that Kafka still doesn't support batch and there is no 
> ES source for now. Another option is that you could load the data into a 
> connector which supports batch. Not sure if anybody else has a better idea about this.
> 
>> I found one cause of this problem and it was mixing a Scala 2.12 Flink 
>> installation with PyFlink, which has some 2.11 jars in its opt folder. I 
>> think the JVM just skipped the class definitions, because they weren't 
>> compatible. I actually wasn't aware of the fact that PyFlink comes with 
>> prebuilt jar dependencies. If PyFlink is only compatible with Scala 2.11 it 
>> would make sense to point that out in the documentation. I think I never 
>> read that and it might be missing. Unfortunately there is still one 
>> Exception showing up at the very end of the job in the taskmanager log. I 
>> did the verification you asked for and the class is present in both jar 
>> files. The one which comes with Flink in the opt folder and the one of 
>> PyFlink. You can find the log attached.
>> I think the main question is which jar file has to be loaded in the three 
>> environments (executor, jobmanager, taskmanager). Is it fine to not put the 
>> flink-python_2.11-1.12.0.jar into the lib folder in the jobmanager and 
>> taskmanager? Will it somehow be transferred by PyFlink to the jobmanager and 
>> taskmanager?
> 
> PyFlink comes with built-in jars such as flink-python_2.11-1.12.0.jar, 
> flink-dist_2.11-1.12.0.jar, etc., so you don't need to manually add them 
> (and you shouldn't). Could you remove the duplicate jars and try it again?
> 
>> No, I don't think that there are additional exceptions besides 
>> "org.apache.beam.vendor.grpc.v1p26p0.io.grpc.StatusRuntimeException", but 
>> maybe take a look at the attached log files. This problem could be related 
>> to 2.; maybe the root cause is a class loading issue as well. What do you 
>> think? You can find attached three log files. One for the executor, the 
>> jobmanager and the taskmanager. Maybe you can find something useful.
> 
> 
> I found one similar issue on the Beam side: 
> https://issues.apache.org/jira/browse/BEAM-6258, which was resolved a long 
> time ago. I'm still trying to reproduce this issue and will let you know 
> if there is any progress. (Would be great if you could help to provide an 
> example which could easily reproduce this issue.)

Re: PyFlink Table API and UDF Limitations

2020-11-25 Thread Xingbo Huang
Hi Niklas,

Regarding `Exception in thread "grpc-nio-worker-ELG-3-2"
java.lang.NoClassDefFoundError:
org/apache/beam/vendor/grpc/v1p26p0/io/netty/buffer/PoolArena$1`,
it does not affect the correctness of the result. The reason is that some
resources are released asynchronously when the gRPC server is shut down[1].
After the UserClassLoader unloads the class, the asynchronous thread tries
to release the resources and throws a NoClassDefFoundError, but the content
of the data result has already been sent downstream, so the correctness of
the result is not affected.

Regarding the details of the specific causes, I have explained them in the
Flink community[2] and the Beam community[3], and they have been fixed in
the Flink community. This problem will no longer occur in the upcoming
1.11.3 and 1.12.0 releases.

[1]
https://github.com/grpc/grpc-java/blob/master/core/src/main/java/io/grpc/internal/SharedResourceHolder.java#L150
[2] https://issues.apache.org/jira/browse/FLINK-20284
[3] https://issues.apache.org/jira/browse/BEAM-5397

Best,
Xingbo


Dian Fu wrote on Mon, Nov 16, 2020 at 9:10 PM:

> Hi Niklas,
>
> How can I ingest data into a batch table from Kafka or, even better,
> Elasticsearch? Kafka only offers a streaming source and Elasticsearch
> doesn't offer a source at all.
>
> The only workaround that comes to my mind is to use the Kafka streaming
> source and to apply a single very large window to create a bounded table.
> Do you think that would work?
> Are there other options available? Maybe converting a stream into a bounded
> table is somehow possible? Thank you!
>
>
> I think you are right that Kafka still doesn't support batch and there is
> no ES source for now. Another option is that you could load the data into a
> connector which supports batch. Not sure if anybody else has a better idea
> about this.
>
> I found one cause of this problem and it was mixing a Scala 2.12 Flink
> installation with PyFlink, which has some 2.11 jars in its opt folder. I
> think the JVM just skipped the class definitions, because they weren't
> compatible. I actually wasn't aware of the fact that PyFlink comes with
> prebuilt jar dependencies. If PyFlink is only compatible with Scala 2.11 it
> would make sense to point that out in the documentation. I think I never
> read that and it might be missing. Unfortunately there is still one
> Exception showing up at the very end of the job in the taskmanager log. I
> did the verification you asked for and the class is present in both jar
> files. The one which comes with Flink in the opt folder and the one of
> PyFlink. You can find the log attached.
> I think the main question is which jar file has to be loaded in the three
> environments (executor, jobmanager, taskmanager). Is it fine to not put the
> flink-python_2.11-1.12.0.jar into the lib folder in the jobmanager and
> taskmanager? Will it somehow be transferred by PyFlink to the jobmanager
> and taskmanager?
>
>
> PyFlink comes with built-in jars such as flink-python_2.11-1.12.0.jar,
> flink-dist_2.11-1.12.0.jar, etc., so you don't need to manually add them
> (and you shouldn't). Could you remove the duplicate jars and try it again?
>
> No, I don't think that there are additional exceptions besides
> "org.apache.beam.vendor.grpc.v1p26p0.io.grpc.StatusRuntimeException", but
> maybe take a look at the attached log files. This problem could be related
> to 2.; maybe the root cause is a class loading issue as well. What do you
> think? You can find attached three log files. One for the executor, the
> jobmanager and the taskmanager. Maybe you can find something useful.
>
>
> I found one similar issue on the Beam side:
> https://issues.apache.org/jira/browse/BEAM-6258, which was resolved a long
> time ago. I'm still trying to reproduce this issue and will let you
> know if there is any progress. (Would be great if you could help to provide
> an example which could easily reproduce this issue)
>
> This was very helpful. I was able to implement it. There is only one
> detail missing. Is it possible to UNNEST an array of Rows or tuples? It
> would be really great if I were able to return a list with multiple
> fields. Currently I'm just putting multiple values into a single VARCHAR,
> but that means the information needs to be extracted later on. Maybe you
> have an idea how to avoid that.
>
>
> Currently, Pandas UDAF still doesn't support complex types, so I'm
> afraid that you have to put multiple values into a single VARCHAR for now.
>
> Regards,
> Dian
>
>
> On Nov 16, 2020, at 2:46 AM, Niklas Wilcke wrote:
>
> Hi Dian,
>
> this was very helpful again. I will answer the old questions inline as
> well. Unfortunately one new question also popped up.
>
> How can I ingest data into a batch table from Kafka or, even better,
> Elasticsearch? Kafka only offers a streaming source and Elasticsearch
> doesn't offer a source at all.
> The only workaround that comes to my mind is to use the Kafka streaming
> source and to apply a single very large window to create a bounded table.

Re: PyFlink Table API and UDF Limitations

2020-11-16 Thread Dian Fu
Hi Niklas,

> How can I ingest data into a batch table from Kafka or, even better, 
> Elasticsearch? Kafka only offers a streaming source and Elasticsearch 
> doesn't offer a source at all.
> The only workaround that comes to my mind is to use the Kafka streaming 
> source and to apply a single very large window to create a bounded table. 
> Do you think that would work?
> Are there other options available? Maybe converting a stream into a bounded 
> table is somehow possible? Thank you!


I think you are right that Kafka still doesn't support batch and there is no ES 
source for now. Another option is that you could load the data into a connector 
which supports batch. Not sure if anybody else has a better idea about this.
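
For illustration, one way to do that could be to export the data to files first and then read it back with the filesystem connector, which supports batch execution. A rough, untested sketch (table name, schema and path are made up):

# Hypothetical bounded source: assumes the Kafka/Elasticsearch data has already
# been exported to files; the filesystem connector then serves as a batch source.
t_env.execute_sql("""
    CREATE TABLE device_readings (
        riid STRING,
        ds   TIMESTAMP(3),
        y    FLOAT
    ) WITH (
        'connector' = 'filesystem',
        'path'      = '/data/device_readings',
        'format'    = 'csv'
    )
""")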

> I found one cause of this problem and it was mixing a Scala 2.12 Flink 
> installation with PyFlink, which has some 2.11 jars in its opt folder. I 
> think the JVM just skipped the class definitions, because they weren't 
> compatible. I actually wasn't aware of the fact that PyFlink comes with 
> prebuilt jar dependencies. If PyFlink is only compatible with Scala 2.11 it 
> would make sense to point that out in the documentation. I think I never read 
> that and it might be missing. Unfortunately there is still one Exception 
> showing up at the very end of the job in the taskmanager log. I did the 
> verification you asked for and the class is present in both jar files. The 
> one which comes with Flink in the opt folder and the one of PyFlink. You can 
> find the log attached.
> I think the main question is which jar file has to be loaded in the three 
> environments (executor, jobmanager, taskmanager). Is it fine to not put the 
> flink-python_2.11-1.12.0.jar into the lib folder in the jobmanager and 
> taskmanager? Will it somehow be transferred by PyFlink to the jobmanager and 
> taskmanager?

PyFlink comes with built-in jars such as flink-python_2.11-1.12.0.jar and 
flink-dist_2.11-1.12.0.jar, so you don't need to manually add them (and you 
shouldn't). Could you remove the duplicate jars and try it again?
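
To see which jars the installed PyFlink wheel already ships (and which would therefore be duplicates if also copied into the Flink lib folder), a small helper along these lines can be used; it assumes the wheel layout mentioned elsewhere in this thread, with jars under directories such as pyflink/opt:

import glob
import os

import pyflink

# Print the jars bundled with the installed PyFlink wheel so they can be
# compared against the contents of the Flink lib folder on the cluster.
pyflink_home = os.path.dirname(os.path.abspath(pyflink.__file__))
for jar in sorted(glob.glob(os.path.join(pyflink_home, "*", "*.jar"))):
    print(jar)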

> No, I don't think that there are additional exceptions besides 
> "org.apache.beam.vendor.grpc.v1p26p0.io.grpc.StatusRuntimeException", but 
> maybe take a look at the attached log files. This problem could be related to 
> 2.; maybe the root cause is a class loading issue as well. What do you think? 
> You can find attached three log files. One for the executor, the jobmanager 
> and the taskmanager. Maybe you can find something useful.


I found one similar issue on the Beam side: 
https://issues.apache.org/jira/browse/BEAM-6258, which was resolved a long 
time ago. I'm still trying to reproduce this issue and will let you know if 
there is any progress. (Would be great if you could help to provide an example 
which could easily reproduce this issue)

> This was very helpful. I was able to implement it. There is only one detail 
> missing. Is it possible to UNNEST an array of Rows or tuples? It would be 
> really great if I were able to return a list with multiple fields. 
> Currently I'm just putting multiple values into a single VARCHAR, but that 
> means the information needs to be extracted later on. Maybe you have an idea 
> how to avoid that.

Currently, Pandas UDAF still doesn't support complex types, so I'm afraid 
that you have to put multiple values into a single VARCHAR for now.
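
A rough sketch of that workaround, following the 1.12 vectorized aggregate function API (the rolling mean is only a placeholder for the real model):

from pyflink.table import DataTypes
from pyflink.table.udf import udaf

@udaf(result_type=DataTypes.STRING(), func_type="pandas")
def forecast_packed(ds, y):
    # ds and y are pandas.Series holding all rows of one group key.
    yhat = y.rolling(3, min_periods=1).mean()  # placeholder for the real model
    # Pack the per-device result into one delimiter-joined VARCHAR; the
    # consumer has to split it again downstream.
    return ";".join(format(v, ".6f") for v in yhat)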

Regards,
Dian


> On Nov 16, 2020, at 2:46 AM, Niklas Wilcke wrote:
> 
> Hi Dian,
> 
> this was very helpful again. I will answer the old questions inline as 
> well. Unfortunately one new question also popped up.
> 
> How can I ingest data into a batch table from Kafka or, even better, 
> Elasticsearch? Kafka only offers a streaming source and Elasticsearch 
> doesn't offer a source at all.
> The only workaround that comes to my mind is to use the Kafka streaming 
> source and to apply a single very large window to create a bounded table. 
> Do you think that would work?
> Are there other options available? Maybe converting a stream into a bounded 
> table is somehow possible? Thank you!
> 
> Kind Regards,
> Niklas
> 
> 
> 
>> On 13. Nov 2020, at 16:07, Dian Fu wrote:
>> 
>> Hi Niklas,
>> 
>> Good to know that this solution may work for you. Regarding the questions 
>> you raised, please find my reply inline.
>> 
>> Regards,
>> Dian
>> 
>>> On Nov 13, 2020, at 8:48 PM, Niklas Wilcke wrote:
>>> 
>>> Hi Dian,
>>> 
>>> thanks again for your response. In the meantime I tried out your proposal 
>>> using the UDAF feature of PyFlink 1.12.0-rc1 and it is roughly working, but 
>>> I am facing some issues, which I would like to address. If this goes too 
>>> far, please let me know and I will open a new thread for each of the 
>>> questions. Let me share some more information about my current environment, 
>>> which will maybe help to answer the questions. I'm currently using my dev 
>>> machine with Docker and one jobmanager container and one taskmanager container.

Re: PyFlink Table API and UDF Limitations

2020-11-13 Thread Dian Fu
Hi Niklas,

Good to know that this solution may work for you. Regarding the questions 
you raised, please find my reply inline.

Regards,
Dian

> On Nov 13, 2020, at 8:48 PM, Niklas Wilcke wrote:
> 
> Hi Dian,
> 
> thanks again for your response. In the meantime I tried out your proposal 
> using the UDAF feature of PyFlink 1.12.0-rc1 and it is roughly working, but I 
> am facing some issues, which I would like to address. If this goes too far, 
> please let me know and I will open a new thread for each of the questions. 
> Let me share some more information about my current environment, which will 
> maybe help to answer the questions. I'm currently using my dev machine with 
> Docker and one jobmanager container and one taskmanager container. If needed 
> I can share the whole docker environment, but this would involve some more 
> effort on my side. Here are my five questions.
> 
> 1. Where can I find connector libraries for 1.12.0-rc1 or some kind of 
> instruction how to build them? I can't find them in the 1.12.0-rc1 release 
> and when I build flink from source, I can't find the connector libraries in 
> the build target. I need flink-sql-connector-elasticsearch7 and 
> flink-sql-connector-kafka.

You could download the connector jars of 1.12.0-rc1 from here: 
https://repository.apache.org/content/repositories/orgapacheflink-1402/org/apache/flink/

> 2. Which steps are needed to properly set up PyFlink? I followed the 
> instructions, but I always get some ClassNotFoundExceptions for some Beam 
> related classes in the taskmanager. The job still works fine, but this 
> doesn't look good to me. It happens in 1.11.2 and in 1.12.0-rc1. I tried to 
> resolve this by adding certain jars, but I wasn't able to fix it. Maybe you 
> have an idea. You can find the Dockerfile attached, which outlines the steps 
> I'm currently using. The exception's signature looks like this.
>Exception in thread "grpc-nio-worker-ELG-3-2" 
> java.lang.NoClassDefFoundError: 
> org/apache/beam/vendor/grpc/v1p26p0/io/netty/buffer/PoolArena$1

Usually there is nothing special you need to do to set up PyFlink. I have 
manually checked that this class should be there (inside 
flink-python_2.11-1.12.0.jar), so I guess it may be because your environment 
isn't clean enough.

I guess you could check the following things:
1) Is it because you have installed 1.11.2 before and so the environment is not 
clean enough? Could you uninstall PyFlink 1.11.2 manually and reinstall PyFlink 
1.12.0-rc1 again? You could also manually check that there should be only one 
flink-python*.jar under directory xxx/site-packages/pyflink/opt/
2) Verify that the class is actually there by the following command: 
(flink-python_2.11-1.12.0.jar is under directory xxx/site-packages/pyflink/opt/)
jar tf flink-python_2.11-1.12.0.jar | grep 
"org/apache/beam/vendor/grpc/v1p26p0/io/netty/buffer/PoolArena"
3) If this exception still happens, could you share the exception stack?

> 3. When increasing the size of the input data set I get the following 
> Exception and the job is canceled. I tried to increase the resources assigned 
> to flink, but it didn't help. Do you have an idea why this is happening? You 
> can find a more detailed stack trace in the appendix.

Could you check if there are any other exceptions in the log when this 
exception happens?

> 4. I can't manage to get the SQL UNNEST operation to work. It is quite hard 
> for me to debug it and I can't really find any valuable examples or 
> documentation on the internet. Currently, instead of creating an ARRAY, I'm 
> just returning a VARCHAR containing a string representation of the array. You 
> can find the relevant code in the appendix.

There are some examples here: 
https://github.com/apache/flink/blob/c601cfd662c2839f8ebc81b80879ecce55a8cbaf/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/UnnestITCase.scala
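
For reference, the basic shape used in those tests looks roughly like this (table and column names are made up; it assumes a table T with an ARRAY column named tags is registered in t_env):

# One output row per array element: UNNEST flattens the ARRAY column.
flattened = t_env.sql_query("""
    SELECT id, tag
    FROM T, UNNEST(T.tags) AS A (tag)
""")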

> 5. How can I obtain the output of the Python interpreter executing the UDF? 
> If I put a print statement in the UDF I can't see the output in the log of 
> the taskmanager. Is there a way to access it?

You can use the standard Python logging module in the UDF instead of print. The 
log output can then be found in the taskmanager log.
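
A minimal sketch of that, reusing the decorator style from the snippet at the bottom of this thread:

import logging

from pyflink.table import DataTypes
from pyflink.table.udf import udf

@udf(input_types=[DataTypes.FLOAT()], result_type=DataTypes.FLOAT(),
     udf_type='pandas')
def passthrough(y):
    # Messages written via the logging module end up in the Python worker
    # output and are collected into the taskmanager logs; print() output is not.
    logging.info("received a batch of %d rows", len(y))
    return y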

> I hope these aren't too many questions for this thread. If this is the case I 
> can still split some of them out. Please let me know.
> Thank you very much. I really appreciate your help.

It's fine to reuse this thread. :)

> Kind Regards,
> Niklas
> 
> 
> Dockerfile for question 2.
> 
> # This image has been built based on the Dockerfile used for the flink image 
> # on Docker Hub.
> # The only change I applied is that I switched to Flink 1.12.0-rc1.
> FROM flink:1.12.0-rc1-scala_2.12
> 
> # Install python
> # TODO: Minimize dependencies
> RUN apt-get update && apt-get install -y \
> python3 \
> python3-pip \
> 

Re: PyFlink Table API and UDF Limitations

2020-11-13 Thread Niklas Wilcke
Hi Dian,

thanks again for your response. In the meantime I tried out your proposal using 
the UDAF feature of PyFlink 1.12.0-rc1 and it is roughly working, but I am 
facing some issues, which I would like to address. If this goes too far, please 
let me know and I will open a new thread for each of the questions. Let me 
share some more information about my current environment, which will maybe help 
to answer the questions. I'm currently using my dev machine with Docker and one 
jobmanager container and one taskmanager container. If needed I can share the 
whole docker environment, but this would involve some more effort on my side. 
Here are my five questions.

1. Where can I find connector libraries for 1.12.0-rc1 or some kind of 
instruction how to build them? I can't find them in the 1.12.0-rc1 release and 
when I build flink from source, I can't find the connector libraries in the 
build target. I need flink-sql-connector-elasticsearch7 and 
flink-sql-connector-kafka.

2. Which steps are needed to properly set up PyFlink? I followed the 
instructions, but I always get some ClassNotFoundExceptions for some Beam 
related classes in the taskmanager. The job still works fine, but this doesn't 
look good to me. It happens in 1.11.2 and in 1.12.0-rc1. I tried to resolve 
this by adding certain jars, but I wasn't able to fix it. Maybe you have an 
idea. You can find the Dockerfile attached, which outlines the steps I'm 
currently using. The exception's signature looks like this.
   Exception in thread "grpc-nio-worker-ELG-3-2" 
java.lang.NoClassDefFoundError: 
org/apache/beam/vendor/grpc/v1p26p0/io/netty/buffer/PoolArena$1

3. When increasing the size of the input data set I get the following Exception 
and the job is canceled. I tried to increase the resources assigned to flink, 
but it didn't help. Do you have an idea why this is happening? You can find a 
more detailed stack trace in the appendix.

4. I can't manage to get the SQL UNNEST operation to work. It is quite hard for 
me to debug it and I can't really find any valuable examples or documentation 
on the internet. Currently, instead of creating an ARRAY, I'm just returning a 
VARCHAR containing a string representation of the array. You can find the 
relevant code in the appendix.

5. How can I obtain the output of the Python interpreter executing the UDF? If 
I put a print statement in the UDF I can't see the output in the log of the 
taskmanager. Is there a way to access it?

I hope these aren't too many questions for this thread. If this is the case I 
can still split some of them out. Please let me know.
Thank you very much. I really appreciate your help.

Kind Regards,
Niklas


Dockerfile for question 2.

# This image has been built based on the Dockerfile used for the flink image
# on Docker Hub.
# The only change I applied is that I switched to Flink 1.12.0-rc1.
FROM flink:1.12.0-rc1-scala_2.12

# Install python
# TODO: Minimize dependencies
RUN apt-get update && apt-get install -y \
python3 \
python3-pip \
python3-dev \
zip \
  && rm -rf /var/lib/apt/lists/* \
  && ln -s /usr/bin/python3 /usr/bin/python \
  && ln -s /usr/bin/pip3 /usr/bin/pip

# Install pyflink
RUN wget --no-verbose https://dist.apache.org/repos/dist/dev/flink/flink-1.12.0-rc1/python/apache_flink-1.12.0-cp37-cp37m-manylinux1_x86_64.whl \
  && pip install apache_flink-1.12.0-cp37-cp37m-manylinux1_x86_64.whl \
  && rm apache_flink-1.12.0-cp37-cp37m-manylinux1_x86_64.whl

Stack Trace for question 3.

Caused by: java.lang.RuntimeException: Failed to close remote bundle
at 
org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.finishBundle(BeamPythonFunctionRunner.java:368)
at 
org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.flush(BeamPythonFunctionRunner.java:322)
at 
org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.invokeFinishBundle(AbstractPythonFunctionOperator.java:283)
at 
org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.checkInvokeFinishBundleByCount(AbstractPythonFunctionOperator.java:267)
at 
org.apache.flink.table.runtime.operators.python.aggregate.arrow.batch.BatchArrowPythonGroupAggregateFunctionOperator.invokeCurrentBatch(BatchArrowPythonGroupAggregateFunctionOperator.java:64)
at 
org.apache.flink.table.runtime.operators.python.aggregate.arrow.batch.AbstractBatchArrowPythonAggregateFunctionOperator.endInput(AbstractBatchArrowPythonAggregateFunctionOperator.java:94)
at 
org.apache.flink.table.runtime.operators.python.aggregate.arrow.batch.BatchArrowPythonGroupAggregateFunctionOperator.endInput(BatchArrowPythonGroupAggregateFunctionOperator.java:33)
at 

Re: PyFlink Table API and UDF Limitations

2020-11-12 Thread Dian Fu
Hi Niklas,

The Python DataStream API will also be supported in the coming 1.12.0 release [1]. 
However, its functionality is still limited for the time being compared to 
the Java DataStream API, e.g. it will only support stateless operations, 
such as map, flat_map, etc.

[1] 
https://ci.apache.org/projects/flink/flink-docs-master/dev/python/datastream_tutorial.html
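
A minimal sketch of the stateless part (it follows the tutorial linked above; a sink and env.execute() would still have to be added to actually run a job):

from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()

ds = env.from_collection(['a', 'b', 'c'], type_info=Types.STRING())

# Stateless transformation; keyed/stateful operations are not available yet.
upper = ds.map(lambda s: s.upper(), output_type=Types.STRING())

# Attach a sink of your choice to `upper` and call env.execute() to run the job.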

> On Nov 12, 2020, at 7:46 PM, Niklas Wilcke wrote:
> 
> Hi Dian,
> 
> thank you very much for this valuable response. I already read about the 
> UDAF, but I wasn't aware of the fact that it is possible to return and UNNEST 
> an array. I will definitely have a try and hopefully this will solve my issue.
> 
> Another question that came up to my mind is whether PyFlink supports any 
> other API except Table and SQL, like the Streaming and Batch API. The 
> documentation is only covering the Table API, but I'm not sure about that. 
> Can you maybe tell me whether the Table and SQL API is the only one supported 
> by PyFlink?
> 
> Kind Regards,
> Niklas
> 
>  
> 
>> On 11. Nov 2020, at 15:32, Dian Fu wrote:
>> 
>> Hi Niklas,
>> 
>> You are correct that the input/output length of Pandas UDF must be of the 
>> same size and that Flink will split the input data into multiple bundles for 
>> Pandas UDF, and the bundle size is non-deterministic. Both of the above 
>> limitations are by design, so I guess Pandas UDF could not meet your 
>> requirements.
>> 
>> However, you could take a look at whether the Pandas UDAF[1] supported in 
>> 1.12 could meet your requirements:
>> - As group_by only generates one record per group key, just as you said, you 
>> could declare the output type of the Pandas UDAF as an array type
>> - You would then need to flatten the aggregation results, e.g. using UNNEST
>> 
>> NOTE: Flink 1.12 is still not released. You could try the PyFlink package of 
>> RC1[2] for 1.12.0 or build it yourself according to [3].
>> 
>> [1] https://ci.apache.org/projects/flink/flink-docs-master/dev/python/table-api-users-guide/udfs/vectorized_python_udfs.html#vectorized-aggregate-functions
>> [2] https://dist.apache.org/repos/dist/dev/flink/flink-1.12.0-rc1/python/
>> [3] https://ci.apache.org/projects/flink/flink-docs-master/flinkDev/building.html#build-pyflink
>> 
>> Regards,
>> Dian
>> 
>>> On Nov 11, 2020, at 9:03 PM, Niklas Wilcke wrote:
>>> 
>>> Hi Flink Community,
>>> 
>>> I'm currently trying to implement a parallel machine learning job with 
>>> Flink. The goal is to train models in parallel for independent time series 
>>> in the same data stream. For that purpose I'm using a Python library, which 
>>> led me to PyFlink. Let me explain the use case a bit more.
>>> I want to implement a batch job, which partitions/groups the data by a 
>>> device identifier. After that I need to process the data for each device 
>>> all at once. There is no way to iteratively train the model unfortunately. 
>>> The challenge I'm facing is to guarantee that all data belonging to a 
>>> certain device is processed in one single step. I'm aware of the fact that 
>>> this does not scale well, but for a reasonable amount of input data per 
>>> device it should be fine from my perspective.
>>> I investigated a lot and I ended up using the Table API and Pandas UDF, 
>>> which roughly fulfil my requirements, but there are the following 
>>> limitations left, which I wanted to talk about.
>>> 
>>> 1. Pandas UDF takes multiple Series as input parameters, which is fine for 
>>> my purpose, but as far as I can see there is no way to guarantee that the 
>>> chunk of data in the Series is "complete". Flink will slice the Series and 
>>> maybe call the UDF multiple times for each device. As far as I can see 
>>> there are some config options like "python.fn-execution.arrow.batch.size" 
>>> and "python.fn-execution.bundle.time", which might help, but I'm not sure, 
>>> whether this is the right path to take.
>>> 2. The length of the input Series needs to be of the same size as the 
>>> output Series, which isn't nice for my use case. What I would like to do is 
>>> to process n rows and emit m rows. There shouldn't be any dependency 
>>> between the number of input rows and the number of output rows.
>>> 
>>> 3. How do I partition the data stream. The Table API offers a groupby, but 
>>> this doesn't serve my purpose, because I don't want to aggregate all the 
>>> grouped lines. Instead as stated above I want to emit m result lines per 
>>> group. Are there other options using the Table API or any other API to do 
>>> this kind of grouping. I would need something like a 

Re: PyFlink Table API and UDF Limitations

2020-11-12 Thread Niklas Wilcke
Hi Dian,

thank you very much for this valuable response. I already read about the UDAF, 
but I wasn't aware of the fact that it is possible to return and UNNEST an 
array. I will definitely have a try and hopefully this will solve my issue.

Another question that came up to my mind is whether PyFlink supports any other 
API except Table and SQL, like the Streaming and Batch API. The documentation 
is only covering the Table API, but I'm not sure about that. Can you maybe tell 
me whether the Table and SQL API is the only one supported by PyFlink?

Kind Regards,
Niklas

 

> On 11. Nov 2020, at 15:32, Dian Fu  wrote:
> 
> Hi Niklas,
> 
> You are correct that the input/output length of Pandas UDF must be of the 
> same size and that Flink will split the input data into multiple bundles for 
> Pandas UDF, and the bundle size is non-deterministic. Both of the above 
> limitations are by design, so I guess Pandas UDF could not meet your 
> requirements.
> 
> However, you could take a look at whether the Pandas UDAF[1] supported in 
> 1.12 could meet your requirements:
> - As group_by only generates one record per group key, just as you said, you 
> could declare the output type of the Pandas UDAF as an array type
> - You would then need to flatten the aggregation results, e.g. using UNNEST
> 
> NOTE: Flink 1.12 is still not released. You could try the PyFlink package of 
> RC1[2] for 1.12.0 or build it yourself according to [3].
> 
> [1] https://ci.apache.org/projects/flink/flink-docs-master/dev/python/table-api-users-guide/udfs/vectorized_python_udfs.html#vectorized-aggregate-functions
> [2] https://dist.apache.org/repos/dist/dev/flink/flink-1.12.0-rc1/python/
> [3] https://ci.apache.org/projects/flink/flink-docs-master/flinkDev/building.html#build-pyflink
> 
> Regards,
> Dian
> 
>> On Nov 11, 2020, at 9:03 PM, Niklas Wilcke wrote:
>> 
>> Hi Flink Community,
>> 
>> I'm currently trying to implement a parallel machine learning job with 
>> Flink. The goal is to train models in parallel for independent time series 
>> in the same data stream. For that purpose I'm using a Python library, which 
>> led me to PyFlink. Let me explain the use case a bit more.
>> I want to implement a batch job, which partitions/groups the data by a 
>> device identifier. After that I need to process the data for each device all 
>> at once. There is no way to iteratively train the model unfortunately. The 
>> challenge I'm facing is to guarantee that all data belonging to a certain 
>> device is processed in one single step. I'm aware of the fact that this does 
>> not scale well, but for a reasonable amount of input data per device it 
>> should be fine from my perspective.
>> I investigated a lot and I ended up using the Table API and Pandas UDF, 
>> which roughly fulfil my requirements, but there are the following 
>> limitations left, which I wanted to talk about.
>> 
>> 1. Pandas UDF takes multiple Series as input parameters, which is fine for 
>> my purpose, but as far as I can see there is no way to guarantee that the 
>> chunk of data in the Series is "complete". Flink will slice the Series and 
>> maybe call the UDF multiple times for each device. As far as I can see there 
>> are some config options like "python.fn-execution.arrow.batch.size" and 
>> "python.fn-execution.bundle.time", which might help, but I'm not sure, 
>> whether this is the right path to take.
>> 2. The length of the input Series needs to be of the same size as the output 
>> Series, which isn't nice for my use case. What I would like to do is to 
>> process n rows and emit m rows. There shouldn't be any dependency between 
>> the number of input rows and the number of output rows.
>> 
>> 3. How do I partition the data stream. The Table API offers a groupby, but 
>> this doesn't serve my purpose, because I don't want to aggregate all the 
>> grouped lines. Instead as stated above I want to emit m result lines per 
>> group. Are there other options using the Table API or any other API to do 
>> this kind of grouping. I would need something like a "keyBy()" from the 
>> streaming API. Maybe this can be combined? Can I create a separate table for 
>> each key?
>> 
>> I'm also open to ideas for a completely different approach not using the 
>> Table API or Pandas UDF. Any idea is welcome.
>> 
>> You can find a condensed version of the source code attached.
>> 
>> Kind Regards,
>> Niklas
>> 
>> 
>> 
>> #
>> 
>> from pyflink.datastream import StreamExecutionEnvironment
>> from pyflink.table import StreamTableEnvironment, DataTypes
>> from 

Re: PyFlink Table API and UDF Limitations

2020-11-11 Thread Dian Fu
Hi Niklas,

You are correct that the input/output length of Pandas UDF must be of the same 
size and that Flink will split the input data into multiple bundles for Pandas 
UDF, and the bundle size is non-deterministic. Both of the above limitations 
are by design, so I guess Pandas UDF could not meet your requirements.

However, you could take a look at whether the Pandas UDAF[1] supported in 
1.12 could meet your requirements:
- As group_by only generates one record per group key, just as you said, you 
could declare the output type of the Pandas UDAF as an array type
- You would then need to flatten the aggregation results, e.g. using UNNEST 
(a rough sketch of the group_by + Pandas UDAF shape follows below)
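
A rough sketch of that group_by + Pandas UDAF shape, assuming the t_env and the 'mySource' table from the snippet at the bottom of this thread; mean() is only a stand-in for the real per-device logic, and the array/UNNEST part is left out here because, as noted elsewhere in this thread, complex result types were not supported yet at the time:

from pyflink.table import DataTypes
from pyflink.table.expressions import col
from pyflink.table.udf import udaf

# Vectorized (Pandas) aggregate: it receives every row of one group key at
# once as pandas.Series, which is what the per-device training needs.
@udaf(result_type=DataTypes.FLOAT(), func_type="pandas")
def mean_y(y):
    return y.mean()

result = t_env.from_path('mySource') \
    .group_by(col('riid')) \
    .select(col('riid'), mean_y(col('y')).alias('y_mean'))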

NOTE: Flink 1.12 is still not released. You could try the PyFlink package of 
RC1[2] for 1.12.0 or build it yourself according to [3].

[1] https://ci.apache.org/projects/flink/flink-docs-master/dev/python/table-api-users-guide/udfs/vectorized_python_udfs.html#vectorized-aggregate-functions
[2] https://dist.apache.org/repos/dist/dev/flink/flink-1.12.0-rc1/python/
[3] https://ci.apache.org/projects/flink/flink-docs-master/flinkDev/building.html#build-pyflink

Regards,
Dian

> On Nov 11, 2020, at 9:03 PM, Niklas Wilcke wrote:
> 
> Hi Flink Community,
> 
> I'm currently trying to implement a parallel machine learning job with Flink. 
> The goal is to train models in parallel for independent time series in the 
> same data stream. For that purpose I'm using a Python library, which led me 
> to PyFlink. Let me explain the use case a bit more.
> I want to implement a batch job, which partitions/groups the data by a device 
> identifier. After that I need to process the data for each device all at 
> once. There is no way to iteratively train the model unfortunately. The 
> challenge I'm facing is to guarantee that all data belonging to a certain 
> device is processed in one single step. I'm aware of the fact that this does 
> not scale well, but for a reasonable amount of input data per device it 
> should be fine from my perspective.
> I investigated a lot and I ended up using the Table API and Pandas UDF, which 
> roughly fulfil my requirements, but there are the following limitations left, 
> which I wanted to talk about.
> 
> 1. Pandas UDF takes multiple Series as input parameters, which is fine for my 
> purpose, but as far as I can see there is no way to guarantee that the chunk 
> of data in the Series is "complete". Flink will slice the Series and maybe 
> call the UDF multiple times for each device. As far as I can see there are 
> some config options like "python.fn-execution.arrow.batch.size" and 
> "python.fn-execution.bundle.time", which might help, but I'm not sure, 
> whether this is the right path to take.
> 2. The length of the input Series needs to be of the same size as the output 
> Series, which isn't nice for my use case. What I would like to do is to 
> process n rows and emit m rows. There shouldn't be any dependency between the 
> number of input rows and the number of output rows.
> 
> 3. How do I partition the data stream. The Table API offers a groupby, but 
> this doesn't serve my purpose, because I don't want to aggregate all the 
> grouped lines. Instead as stated above I want to emit m result lines per 
> group. Are there other options using the Table API or any other API to do 
> this kind of grouping. I would need something like a "keyBy()" from the 
> streaming API. Maybe this can be combined? Can I create a separate table for 
> each key?
> 
> I'm also open to ideas for a completely different approach not using the 
> Table API or Pandas UDF. Any idea is welcome.
> 
> You can find a condensed version of the source code attached.
> 
> Kind Regards,
> Niklas
> 
> 
> 
> #
> 
> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.table import StreamTableEnvironment, DataTypes
> from pyflink.table.udf import udf
> 
> env = StreamExecutionEnvironment.get_execution_environment()
> env.set_parallelism(1)
> t_env = StreamTableEnvironment.create(env)
> t_env.get_config().get_configuration().set_boolean("python.fn-execution.memory.managed",
>  True)
> 
> @udf(input_types=[DataTypes.FLOAT(), DataTypes.FLOAT()],
> result_type=DataTypes.FLOAT(), udf_type='pandas')
> def forcast(ds_float_series, y):
> 
># Train the model and create the forcast
> 
>yhat_ts = forcast['yhat'].tail(input_size)
>return yhat_ts
> 
> t_env.register_function("forcast", forcast)
> 
> # Define sink and source here
> 
> t_env.execute_sql(my_source_ddl)
> t_env.execute_sql(my_sink_ddl)
> 
> # TODO: key_by instead of filter
> t_env.from_path('mySource') \
>.where("riid === 

PyFlink Table API and UDF Limitations

2020-11-11 Thread Niklas Wilcke
Hi Flink Community,

I'm currently trying to implement a parallel machine learning job with Flink. 
The goal is to train models in parallel for independent time series in the same 
data stream. For that purpose I'm using a Python library, which led me to 
PyFlink. Let me explain the use case a bit more.
I want to implement a batch job, which partitions/groups the data by a device 
identifier. After that I need to process the data for each device all at once. 
There is no way to iteratively train the model unfortunately. The challenge I'm 
facing is to guarantee that all data belonging to a certain device is processed 
in one single step. I'm aware of the fact that this does not scale well, but 
for a reasonable amount of input data per device it should be fine from my 
perspective.
I investigated a lot and I ended up using the Table API and Pandas UDF, which 
roughly fulfil my requirements, but there are the following limitations left, 
which I wanted to talk about.

1. Pandas UDF takes multiple Series as input parameters, which is fine for my 
purpose, but as far as I can see there is no way to guarantee that the chunk of 
data in the Series is "complete". Flink will slice the Series and maybe call 
the UDF multiple times for each device. As far as I can see there are some 
config options like "python.fn-execution.arrow.batch.size" and 
"python.fn-execution.bundle.time", which might help, but I'm not sure, whether 
this is the right path to take.

2. The length of the input Series needs to be of the same size as the output 
Series, which isn't nice for my use case. What I would like to do is to process 
n rows and emit m rows. There shouldn't be any dependency between the number of 
input rows and the number of output rows.

3. How do I partition the data stream? The Table API offers a groupby, but this 
doesn't serve my purpose, because I don't want to aggregate all the grouped 
lines. Instead as stated above I want to emit m result lines per group. Are 
there other options using the Table API or any other API to do this kind of 
grouping? I would need something like a "keyBy()" from the streaming API. Maybe 
this can be combined? Can I create a separate table for each key?
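
Regarding the configuration options mentioned in question 1, they can be set on the table config roughly like this, using the t_env from the snippet below (values are arbitrary; as the replies elsewhere in this thread point out, these knobs only influence bundle sizing and do not guarantee that a whole group arrives in a single call):

# Sketch: tune the Arrow batch size and bundle time (values are arbitrary).
config = t_env.get_config().get_configuration()
config.set_string("python.fn-execution.arrow.batch.size", "100000")
config.set_string("python.fn-execution.bundle.time", "10000")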

I'm also open to ideas for a completely different approach not using the Table 
API or Pandas UDF. Any idea is welcome.

You can find a condensed version of the source code attached.

Kind Regards,
Niklas



#

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, DataTypes
from pyflink.table.udf import udf

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
t_env = StreamTableEnvironment.create(env)
t_env.get_config().get_configuration().set_boolean("python.fn-execution.memory.managed",
 True)

@udf(input_types=[DataTypes.FLOAT(), DataTypes.FLOAT()],
 result_type=DataTypes.FLOAT(), udf_type='pandas')
def forcast(ds_float_series, y):

# Train the model and create the forcast

yhat_ts = forcast['yhat'].tail(input_size)
return yhat_ts

t_env.register_function("forcast", forcast)

# Define sink and source here

t_env.execute_sql(my_source_ddl)
t_env.execute_sql(my_sink_ddl)

# TODO: key_by instead of filter
t_env.from_path('mySource') \
.where("riid === 'r1i1'") \
.select("ds, riid, y, forcast(ds, y) as yhat_90d") \
.insert_into('mySink')

t_env.execute("pandas_udf_demo")

#



