Hi Xingbo,

thanks for sharing. This is very interesting.

Regards,
Niklas

> On 27. Nov 2020, at 03:05, Xingbo Huang <hxbks...@gmail.com> wrote:
> 
> Hi Niklas,
> 
> Thanks a lot for supporting PyFlink. In fact, your requirement for multiple 
> input and multiple output is essentially Table Aggregation Functions[1]. 
> Although PyFlink does not support it yet, we have listed it in the release 
> 1.13 plan. In addition, row-based operations[2] that are very user-friendly 
> to machine learning users are also included in the 1.13 plan.
> 
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/functions/udfs.html#table-aggregation-functions
>  
> <https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/functions/udfs.html#table-aggregation-functions>
> [2] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/tableApi.html#row-based-operations
>  
> <https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/tableApi.html#row-based-operations>
> 
> Best,
> Xingbo
> 
> Niklas Wilcke <niklas.wil...@uniberg.com <mailto:niklas.wil...@uniberg.com>> 
> 于2020年11月26日周四 下午5:11写道:
> Hi Xingbo,
> 
> thanks for taking care and letting me know. I was about to share an example, 
> how to reproduce this.
> Now I will wait for the next release candidate and give it a try.
> 
> Regards,
> Niklas
> 
> 
> --
> niklas.wil...@uniberg.com <mailto:niklas.wil...@uniberg.com>
> Mobile: +49 160 9793 2593
> Office: +49 40 2380 6523
> 
> Simon-von-Utrecht-Straße 85a
> 20359 Hamburg
> 
> UNIBERG GmbH
> Registergericht: Amtsgericht Kiel HRB SE-1507
> Geschäftsführer: Andreas Möller, Martin Ulbricht
> 
> Information Art. 13 DSGVO B2B:
> Für die Kommunikation mit Ihnen verarbeiten wir ggf. Ihre personenbezogenen 
> Daten.
> Alle Informationen zum Umgang mit Ihren Daten finden Sie unter 
> https://www.uniberg.com/impressum.html 
> <https://www.uniberg.com/impressum.html>. 
> 
>> On 26. Nov 2020, at 02:59, Xingbo Huang <hxbks...@gmail.com 
>> <mailto:hxbks...@gmail.com>> wrote:
>> 
>> Hi Niklas,
>> 
>> Regarding `Exception in thread "grpc-nio-worker-ELG-3-2" 
>> java.lang.NoClassDefFoundError: 
>> org/apache/beam/vendor/grpc/v1p26p0/io/netty/buffer/PoolArena$1`, 
>> it does not affect the correctness of the result. The reason is that some 
>> resources are released asynchronously when Grpc Server is shut down[1] . 
>> After the UserClassLoader unloads the class, the asynchronous thread tries 
>> to release the resources and throw NotClassFoundException, but the content 
>> of the data result has been sent downstream, so the correctness of the 
>> result will not be affected.
>> 
>> Regarding the details of the specific causes, I have explained in the flink 
>> community[2] and the beam community[3], and fixed them in the flink 
>> community. There will be no such problem in the next version of release 
>> 1.11.3 and 1.12.0.
>> 
>> [1] 
>> https://github.com/grpc/grpc-java/blob/master/core/src/main/java/io/grpc/internal/SharedResourceHolder.java#L150
>>  
>> <https://github.com/grpc/grpc-java/blob/master/core/src/main/java/io/grpc/internal/SharedResourceHolder.java#L150>
>> [2] https://issues.apache.org/jira/browse/FLINK-20284 
>> <https://issues.apache.org/jira/browse/FLINK-20284>
>> [3] https://issues.apache.org/jira/browse/BEAM-5397 
>> <https://issues.apache.org/jira/browse/BEAM-5397>
>> 
>> Best,
>> Xingbo
>> 
>> 
>> Dian Fu <dian0511...@gmail.com <mailto:dian0511...@gmail.com>> 
>> 于2020年11月16日周一 下午9:10写道:
>> Hi Niklas,
>> 
>>> How can I ingest data in a batch table from Kafka or even better 
>>> Elasticsearch. Kafka is only offering a Streaming source and Elasticsearch 
>>> isn't offering a source at all.
>>> The only workaround which comes to my mind is to use the Kafka streaming 
>>> source and to apply a single very large window to create a bounded table. 
>>> Do you think that would work?
>>> Are there other options available? Maybe converting a Stream to a bounded 
>>> table is somehow possible? Thank you!
>> 
>> 
>> I think you are right that Kafka still doesn't support batch and there is no 
>> ES source for now. Another option is you could load the data into a 
>> connector which supports batch. Not sure if anybody else has a better idea 
>> about this.
>> 
>>> I found one cause of this problem and it was mixing a Scala 2.12 Flink 
>>> installation with PyFlink, which has some 2.11 jars in its opt folder. I 
>>> think the JVM just skipped the class definitions, because they weren't 
>>> compatible. I actually wasn't aware of the fact that PyFlink comes with 
>>> prebuilt jar dependencies. If PyFlink is only compatible with Scala 2.11 it 
>>> would make sense to point that out in the documentation. I think I never 
>>> read that and it might be missing. Unfortunately there is still one 
>>> Exception showing up at the very end of the job in the taskmanager log. I 
>>> did the verification you asked for and the class is present in both jar 
>>> files. The one which comes with Flink in the opt folder and the one of 
>>> PyFlink. You can find the log attached.
>>> I think the main question is which jar file has be loaded in in the three 
>>> envronments (executor, jobmanager, taskmanager). Is it fine to not put the 
>>> flink-python_2.11-1.12.0.jar into the lib folder in the jobmanager and 
>>> taskmanager? Will it somehow be transferred by PyFlink to the jobmanager 
>>> and taskmanager?
>> 
>> PyFlink comes with the built-in jars such as flink-python_2.11-1.12.0.jar, 
>> flink-dist_2.11-1.12.0.jar, etc and so you don't need to manually add 
>> them(also shouldn't do that). Could you remove the duplicate jars and try it 
>> again?
>> 
>>> No I don't think that there are additional exceptions besides 
>>> "org.apache.beam.vendor.grpc.v1p26p0.io.grpc.StatusRuntimeException", but 
>>> maybe take a look in the attached log files. This problem could be related 
>>> to 2., maybe the root cause is a class loading issue as well. What do you 
>>> think? You can find attached three log files. One for the executor, the 
>>> jobmanager and the taskmanager. Maybe you can find something useful.
>> 
>> 
>> I found one similar issue at Beam side: 
>> https://issues.apache.org/jira/browse/BEAM-6258 
>> <https://issues.apache.org/jira/browse/BEAM-6258> which has been resolved 
>> long time ago. I'm still trying to reproduce this issue and will let you 
>> know if there is any progress. (Would be great if you could help to provide 
>> an example which could easily reproduce this issue)
>> 
>>> This was very helpful. I was able to implement it. There is only one detail 
>>> missing. Is it possible to UNNEST an array of Rows or tuples? It would be 
>>> really great if I would be able to return a list with multiple fields. 
>>> Currently I'm just putting multiple value into a single VARCHAR, but that 
>>> means the information needs to be extracted later on. Maybe you have an 
>>> idea how to avoid that.
>> 
>> Currently, Pandas UDAF still doesn't support complex type and so I'm afraid 
>> that you have to put multiple values into a single VARCHAR for now.
>> 
>> Regards,
>> Dian
>> 
>> 
>>> 在 2020年11月16日,上午2:46,Niklas Wilcke <niklas.wil...@uniberg.com 
>>> <mailto:niklas.wil...@uniberg.com>> 写道:
>>> 
>>> Hi Dian,
>>> 
>>> this was very helpful again. To the old questions I will answer inline as 
>>> well. Unfortunately also one new question popped up.
>>> 
>>> How can I ingest data in a batch table from Kafka or even better 
>>> Elasticsearch. Kafka is only offering a Streaming source and Elasticsearch 
>>> isn't offering a source at all.
>>> The only workaround which comes to my mind is to use the Kafka streaming 
>>> source and to apply a single very large window to create a bounded table. 
>>> Do you think that would work?
>>> Are there other options available? Maybe converting a Stream to a bounded 
>>> table is somehow possible? Thank you!
>>> 
>>> Kind Regards,
>>> Niklas
>>> 
>>> 
>>> 
>>>> On 13. Nov 2020, at 16:07, Dian Fu <dian0511...@gmail.com 
>>>> <mailto:dian0511...@gmail.com>> wrote:
>>>> 
>>>> Hi Niklas,
>>>> 
>>>> Good to know that this solution may work for you. Regarding to the 
>>>> questions you raised, please find my reply inline.
>>>> 
>>>> Regards,
>>>> Dian
>>>> 
>>>>> 在 2020年11月13日,下午8:48,Niklas Wilcke <niklas.wil...@uniberg.com 
>>>>> <mailto:niklas.wil...@uniberg.com>> 写道:
>>>>> 
>>>>> Hi Dian,
>>>>> 
>>>>> thanks again for your response. In the meantime I tried out your proposal 
>>>>> using the UDAF feature of PyFlink 1.12.0-rc1 and it is roughly working, 
>>>>> but I am facing some issues, which I would like to address. If this goes 
>>>>> too far, please let me know and I will open a new thread for each of the 
>>>>> questions. Let me share some more information about my current 
>>>>> environment, which will maybe help to answer the questions. I'm currently 
>>>>> using my dev machine with Docker and one jobmanager container and one 
>>>>> taskmanager container. If needed I can share the whole docker 
>>>>> environment, but this would involve some more effort on my side. Here are 
>>>>> my five questions.
>>>>> 
>>>>> 1. Where can I find connector libraries for 1.12.0-rc1 or some kind of 
>>>>> instruction how to build them? I can't find them in the 1.12.0-rc1 
>>>>> release and when I build flink from source, I can't find the connector 
>>>>> libraries in the build target. I need flink-sql-connector-elasticsearch7 
>>>>> and flink-sql-connector-kafka.
>>>> 
>>>> You could download the connector jars of 1.12.0-rc1 from here: 
>>>> https://repository.apache.org/content/repositories/orgapacheflink-1402/org/apache/flink/
>>>>  
>>>> <https://repository.apache.org/content/repositories/orgapacheflink-1402/org/apache/flink/>
>>> Thanks that worked like a charm!
>>> 
>>>> 
>>>>> 2. Which steps are needed to properly Setup PyFlink? I followed the 
>>>>> instructions, but I always get some ClassNotFoundExceptions for some Beam 
>>>>> related classes in the taskmanager. The job still works fine, but this 
>>>>> doesn't look good to me. It happens in 1.11.2 and in 1.12.0-rc1. I tried 
>>>>> to resolve this by adding certain jars, but I wasn't able to fix it. 
>>>>> Maybe you have an idea. You can find the Dockerfile attached, which lines 
>>>>> out the steps I'm currently using. The Exceptions signature looks like 
>>>>> this.
>>>>>    Exception in thread "grpc-nio-worker-ELG-3-2" 
>>>>> java.lang.NoClassDefFoundError: 
>>>>> org/apache/beam/vendor/grpc/v1p26p0/io/netty/buffer/PoolArena$1
>>>> 
>>>> Usually there is nothing specially need to do to set up PyFlink. I have 
>>>> manually checked that this class should be there(inside 
>>>> flink-python_2.11-1.12.0.jar) and so guess if it's because you environment 
>>>> isn't clean enough? 
>>>> 
>>>> I guess you could check the following things:
>>>> 1) Is it because you have installed 1.11.2 before and so the environment 
>>>> is not clean enough? Could you uninstall PyFlink 1.11.2 manually and 
>>>> reinstall PyFlink 1.12.0-rc1 again? You could also manually check that 
>>>> there should be only one flink-python*.jar under directory 
>>>> xxx/site-packages/pyflink/opt/
>>>> 2) Verify that the class is actually there by the following command: 
>>>> (flink-python_2.11-1.12.0.jar is under directory 
>>>> xxx/site-packages/pyflink/opt/)
>>>>    jar tf flink-python_2.11-1.12.0.jar | grep 
>>>> "org/apache/beam/vendor/grpc/v1p26p0/io/netty/buffer/PoolArena"
>>>> 3) If this exception still happens, could you share the exception stack?
>>> 
>>> I found one cause of this problem and it was mixing a Scala 2.12 Flink 
>>> installation with PyFlink, which has some 2.11 jars in its opt folder. I 
>>> think the JVM just skipped the class definitions, because they weren't 
>>> compatible. I actually wasn't aware of the fact that PyFlink comes with 
>>> prebuilt jar dependencies. If PyFlink is only compatible with Scala 2.11 it 
>>> would make sense to point that out in the documentation. I think I never 
>>> read that and it might be missing. Unfortunately there is still one 
>>> Exception showing up at the very end of the job in the taskmanager log. I 
>>> did the verification you asked for and the class is present in both jar 
>>> files. The one which comes with Flink in the opt folder and the one of 
>>> PyFlink. You can find the log attached.
>>> I think the main question is which jar file has be loaded in in the three 
>>> envronments (executor, jobmanager, taskmanager). Is it fine to not put the 
>>> flink-python_2.11-1.12.0.jar into the lib folder in the jobmanager and 
>>> taskmanager? Will it somehow be transferred by PyFlink to the jobmanager 
>>> and taskmanager?
>>> 
>>>> 
>>>>> 3. When increasing the size of the input data set I get the following 
>>>>> Exception and the job is canceled. I tried to increase the resources 
>>>>> assigned to flink, but it didn't help. Do you have an idea why this is 
>>>>> happening? You can find a more detailed stack trace in apendix.
>>>> 
>>>> Could you check if there are any other exceptions in the log when this 
>>>> exception happens?
>>> 
>>> No I don't think that there are additional exceptions besides 
>>> "org.apache.beam.vendor.grpc.v1p26p0.io.grpc.StatusRuntimeException", but 
>>> maybe take a look in the attached log files. This problem could be related 
>>> to 2., maybe the root cause is a class loading issue as well. What do you 
>>> think? You can find attached three log files. One for the executor, the 
>>> jobmanager and the taskmanager. Maybe you can find something useful.
>>> 
>>>> 
>>>>> 4. I can't manage to get the SQL UNNEST operation to work. It is quite 
>>>>> hard for me to debug it and I can't really find any valuable examples or 
>>>>> documentation on the internet. Currently instead of creating an ARRAY I'm 
>>>>> just returning a VARCHAR containing a string representation of the array. 
>>>>> The relevant code you can find in the apendix.
>>>> 
>>>> There are some examples here: 
>>>> https://github.com/apache/flink/blob/c601cfd662c2839f8ebc81b80879ecce55a8cbaf/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/UnnestITCase.scala
>>>>  
>>>> <https://github.com/apache/flink/blob/c601cfd662c2839f8ebc81b80879ecce55a8cbaf/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/UnnestITCase.scala>
>>> 
>>> This was very helpful. I was able to implement it. There is only one detail 
>>> missing. Is it possible to UNNEST an array of Rows or tuples? It would be 
>>> really great if I would be able to return a list with multiple fields. 
>>> Currently I'm just putting multiple value into a single VARCHAR, but that 
>>> means the information needs to be extracted later on. Maybe you have an 
>>> idea how to avoid that.
>>> 
>>>> 
>>>>> 5. How can I obtain the output of the Python interpreter executing the 
>>>>> UDF. If I put a print statement in the UDF I can't see the output in the 
>>>>> log of the taskmanager. Is there a way to access it?
>>>> 
>>>> You can use the standard logging in Python UDF instead of print. The log 
>>>> output could then be found in the log of the task manager.
>>> 
>>> Thank you! That worked well. I should have checked that without asking.
>>> 
>>>> 
>>>>> I hope these aren't too many questions for this thread. If this is the 
>>>>> case I can still split some of them out. Please let me know, if this is 
>>>>> the case.
>>>>> Thank you very much. I really appreciate your help.
>>>> 
>>>> It's fine to reuse this thread. :)
>>>> 
>>>>> Kind Regards,
>>>>> Niklas
>>>>> 
>>>>> 
>>> 
>>> End of the Taskmanager Log for 2.
>>> ###################################################################
>>> taskmanager_1    | 2020-11-15 17:46:53,438 INFO  
>>> org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Free slot 
>>> TaskSlot(index:5, state:ACTIVE, resource profile: 
>>> ResourceProfile{cpuCores=1.0000000000000000, taskHeapMemory=219.333mb 
>>> (229987662 bytes), taskOffHeapMemory=166.667mb (174762666 bytes), 
>>> managedMemory=342.933mb (359591667 bytes), networkMemory=85.733mb (89897916 
>>> bytes)}, allocationId: e5137050c0f1ef5e660311ddf1f3429f, jobId: 
>>> ba4e3974860af7dc00a28fdfbb44fe06).
>>> taskmanager_1    | 2020-11-15 17:46:53,440 INFO  
>>> org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Free slot 
>>> TaskSlot(index:1, state:ACTIVE, resource profile: 
>>> ResourceProfile{cpuCores=1.0000000000000000, taskHeapMemory=219.333mb 
>>> (229987662 bytes), taskOffHeapMemory=166.667mb (174762666 bytes), 
>>> managedMemory=342.933mb (359591667 bytes), networkMemory=85.733mb (89897916 
>>> bytes)}, allocationId: 541ad3e383fb9c024141f2bab5e8b7fd, jobId: 
>>> ba4e3974860af7dc00a28fdfbb44fe06).
>>> taskmanager_1    | 2020-11-15 17:46:53,442 INFO  
>>> org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Free slot 
>>> TaskSlot(index:2, state:ACTIVE, resource profile: 
>>> ResourceProfile{cpuCores=1.0000000000000000, taskHeapMemory=219.333mb 
>>> (229987662 bytes), taskOffHeapMemory=166.667mb (174762666 bytes), 
>>> managedMemory=342.933mb (359591667 bytes), networkMemory=85.733mb (89897916 
>>> bytes)}, allocationId: ef8adb7d879f4072123fe4bc12054c0c, jobId: 
>>> ba4e3974860af7dc00a28fdfbb44fe06).
>>> taskmanager_1    | 2020-11-15 17:46:53,444 INFO  
>>> org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Free slot 
>>> TaskSlot(index:4, state:ACTIVE, resource profile: 
>>> ResourceProfile{cpuCores=1.0000000000000000, taskHeapMemory=219.333mb 
>>> (229987662 bytes), taskOffHeapMemory=166.667mb (174762666 bytes), 
>>> managedMemory=342.933mb (359591667 bytes), networkMemory=85.733mb (89897916 
>>> bytes)}, allocationId: db5d62b8c9fe8172fc1883c148b150e8, jobId: 
>>> ba4e3974860af7dc00a28fdfbb44fe06).
>>> taskmanager_1    | 2020-11-15 17:46:53,846 INFO  
>>> org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Free slot 
>>> TaskSlot(index:0, state:ACTIVE, resource profile: 
>>> ResourceProfile{cpuCores=1.0000000000000000, taskHeapMemory=219.333mb 
>>> (229987662 bytes), taskOffHeapMemory=166.667mb (174762666 bytes), 
>>> managedMemory=342.933mb (359591667 bytes), networkMemory=85.733mb (89897916 
>>> bytes)}, allocationId: 637d053a0726548c2bc9261fc0e55414, jobId: 
>>> ba4e3974860af7dc00a28fdfbb44fe06).
>>> taskmanager_1    | 2020-11-15 17:46:53,849 INFO  
>>> org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Free slot 
>>> TaskSlot(index:3, state:ACTIVE, resource profile: 
>>> ResourceProfile{cpuCores=1.0000000000000000, taskHeapMemory=219.333mb 
>>> (229987662 bytes), taskOffHeapMemory=166.667mb (174762666 bytes), 
>>> managedMemory=342.933mb (359591667 bytes), networkMemory=85.733mb (89897916 
>>> bytes)}, allocationId: cfaa8633b9102e3a509cfc94dd97d38f, jobId: 
>>> ba4e3974860af7dc00a28fdfbb44fe06).
>>> taskmanager_1    | 2020-11-15 17:46:53,851 INFO  
>>> org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Remove 
>>> job ba4e3974860af7dc00a28fdfbb44fe06 from job leader monitoring.
>>> taskmanager_1    | 2020-11-15 17:46:53,851 INFO  
>>> org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Close 
>>> JobManager connection for job ba4e3974860af7dc00a28fdfbb44fe06.
>>> taskmanager_1    | 2020-11-15 17:46:54,371 ERROR 
>>> org.apache.beam.vendor.grpc.v1p26p0.io.netty.util.concurrent.DefaultPromise.rejectedExecution
>>>  [] - Failed to submit a listener notification task. Event loop shut down?
>>> taskmanager_1    | java.lang.NoClassDefFoundError: 
>>> org/apache/beam/vendor/grpc/v1p26p0/io/netty/util/concurrent/GlobalEventExecutor$2
>>> taskmanager_1    |      at 
>>> org.apache.beam.vendor.grpc.v1p26p0.io.netty.util.concurrent.GlobalEventExecutor.startThread(GlobalEventExecutor.java:227)
>>>  
>>> ~[blob_p-2cc5d5ac59c7842f512002d81251a3cbfed058cc-e14fe009bc07ddff407ea4c5d74bd4be:1.12.0]
>>> taskmanager_1    |      at 
>>> org.apache.beam.vendor.grpc.v1p26p0.io.netty.util.concurrent.GlobalEventExecutor.execute(GlobalEventExecutor.java:215)
>>>  
>>> ~[blob_p-2cc5d5ac59c7842f512002d81251a3cbfed058cc-e14fe009bc07ddff407ea4c5d74bd4be:1.12.0]
>>> taskmanager_1    |      at 
>>> org.apache.beam.vendor.grpc.v1p26p0.io.netty.util.concurrent.DefaultPromise.safeExecute(DefaultPromise.java:841)
>>>  
>>> [blob_p-2cc5d5ac59c7842f512002d81251a3cbfed058cc-e14fe009bc07ddff407ea4c5d74bd4be:1.12.0]
>>> taskmanager_1    |      at 
>>> org.apache.beam.vendor.grpc.v1p26p0.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:498)
>>>  
>>> [blob_p-2cc5d5ac59c7842f512002d81251a3cbfed058cc-e14fe009bc07ddff407ea4c5d74bd4be:1.12.0]
>>> taskmanager_1    |      at 
>>> org.apache.beam.vendor.grpc.v1p26p0.io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:615)
>>>  
>>> [blob_p-2cc5d5ac59c7842f512002d81251a3cbfed058cc-e14fe009bc07ddff407ea4c5d74bd4be:1.12.0]
>>> taskmanager_1    |      at 
>>> org.apache.beam.vendor.grpc.v1p26p0.io.netty.util.concurrent.DefaultPromise.setSuccess0(DefaultPromise.java:604)
>>>  
>>> [blob_p-2cc5d5ac59c7842f512002d81251a3cbfed058cc-e14fe009bc07ddff407ea4c5d74bd4be:1.12.0]
>>> taskmanager_1    |      at 
>>> org.apache.beam.vendor.grpc.v1p26p0.io.netty.util.concurrent.DefaultPromise.setSuccess(DefaultPromise.java:96)
>>>  
>>> [blob_p-2cc5d5ac59c7842f512002d81251a3cbfed058cc-e14fe009bc07ddff407ea4c5d74bd4be:1.12.0]
>>> taskmanager_1    |      at 
>>> org.apache.beam.vendor.grpc.v1p26p0.io.netty.util.concurrent.SingleThreadEventExecutor$6.run(SingleThreadEventExecutor.java:1089)
>>>  
>>> [blob_p-2cc5d5ac59c7842f512002d81251a3cbfed058cc-e14fe009bc07ddff407ea4c5d74bd4be:1.12.0]
>>> taskmanager_1    |      at 
>>> org.apache.beam.vendor.grpc.v1p26p0.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
>>>  
>>> [blob_p-2cc5d5ac59c7842f512002d81251a3cbfed058cc-e14fe009bc07ddff407ea4c5d74bd4be:1.12.0]
>>> taskmanager_1    |      at 
>>> org.apache.beam.vendor.grpc.v1p26p0.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
>>>  
>>> [blob_p-2cc5d5ac59c7842f512002d81251a3cbfed058cc-e14fe009bc07ddff407ea4c5d74bd4be:1.12.0]
>>> taskmanager_1    |      at java.lang.Thread.run(Thread.java:748) 
>>> [?:1.8.0_275]
>>> taskmanager_1    | Caused by: java.lang.ClassNotFoundException: 
>>> org.apache.beam.vendor.grpc.v1p26p0.io.netty.util.concurrent.GlobalEventExecutor$2
>>> taskmanager_1    |      at 
>>> java.net.URLClassLoader.findClass(URLClassLoader.java:382) ~[?:1.8.0_275]
>>> taskmanager_1    |      at 
>>> java.lang.ClassLoader.loadClass(ClassLoader.java:418) ~[?:1.8.0_275]
>>> taskmanager_1    |      at 
>>> org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:63)
>>>  ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>> taskmanager_1    |      at 
>>> org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:72)
>>>  ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>> taskmanager_1    |      at 
>>> org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:49)
>>>  ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>> taskmanager_1    |      at 
>>> java.lang.ClassLoader.loadClass(ClassLoader.java:351) ~[?:1.8.0_275]
>>> taskmanager_1    |      ... 11 more
>>> ###################################################################
>>> Logfiles for 3.
>>> 
>>> <large-data-set-taskmanager.log>
>>> <large-data-set-jobmanager.log>
>>> <large-data-set-executor.log>
>>> 
>>>>> Dockerfile for question 2.
>>>>> ####################################################################
>>>>> # This image has been build based on the Dockerfile used for the flink 
>>>>> image on docker hub.
>>>>> # The only change I applied is that I switched to flink 1.12.0-rc1.
>>>>> FROM flink:1.12.0-rc1-scala_2.12
>>>>> 
>>>>> # Install python
>>>>> # TODO: Minimize dependencies
>>>>> RUN apt-get update && apt-get install -y \
>>>>>     python3 \
>>>>>     python3-pip \
>>>>>     python3-dev \
>>>>>     zip \
>>>>>   && rm -rf /var/lib/apt/lists/* \
>>>>>   && ln -s /usr/bin/python3 /usr/bin/python \
>>>>>   && ln -s /usr/bin/pip3 /usr/bin/pip
>>>>> 
>>>>> # Install pyflink
>>>>> RUN wget --no-verbose 
>>>>> https://dist.apache.org/repos/dist/dev/flink/flink-1.12.0-rc1/python/apache_flink-1.12.0-cp37-cp37m-manylinux1_x86_64.whl
>>>>>  
>>>>> <https://dist.apache.org/repos/dist/dev/flink/flink-1.12.0-rc1/python/apache_flink-1.12.0-cp37-cp37m-manylinux1_x86_64.whl>
>>>>>  \
>>>>>   && pip install apache_flink-1.12.0-cp37-cp37m-manylinux1_x86_64.whl \
>>>>>   && rm apache_flink-1.12.0-cp37-cp37m-manylinux1_x86_64.whl
>>>>> ####################################################################
>>>>> Stack Trace for question 3.
>>>>> ####################################################################
>>>>> Caused by: java.lang.RuntimeException: Failed to close remote bundle
>>>>>         at 
>>>>> org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.finishBundle(BeamPythonFunctionRunner.java:368)
>>>>>         at 
>>>>> org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.flush(BeamPythonFunctionRunner.java:322)
>>>>>         at 
>>>>> org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.invokeFinishBundle(AbstractPythonFunctionOperator.java:283)
>>>>>         at 
>>>>> org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.checkInvokeFinishBundleByCount(AbstractPythonFunctionOperator.java:267)
>>>>>         at 
>>>>> org.apache.flink.table.runtime.operators.python.aggregate.arrow.batch.BatchArrowPythonGroupAggregateFunctionOperator.invokeCurrentBatch(BatchArrowPythonGroupAggregateFunctionOperator.java:64)
>>>>>         at 
>>>>> org.apache.flink.table.runtime.operators.python.aggregate.arrow.batch.AbstractBatchArrowPythonAggregateFunctionOperator.endInput(AbstractBatchArrowPythonAggregateFunctionOperator.java:94)
>>>>>         at 
>>>>> org.apache.flink.table.runtime.operators.python.aggregate.arrow.batch.BatchArrowPythonGroupAggregateFunctionOperator.endInput(BatchArrowPythonGroupAggregateFunctionOperator.java:33)
>>>>>         at 
>>>>> org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.endOperatorInput(StreamOperatorWrapper.java:91)
>>>>>         at 
>>>>> org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.lambda$close$0(StreamOperatorWrapper.java:127)
>>>>>         at 
>>>>> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
>>>>>         at 
>>>>> org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:127)
>>>>>         at 
>>>>> org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:134)
>>>>>         at 
>>>>> org.apache.flink.streaming.runtime.tasks.OperatorChain.closeOperators(OperatorChain.java:412)
>>>>>         at 
>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.afterInvoke(StreamTask.java:587)
>>>>>         at 
>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:549)
>>>>>         at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
>>>>>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
>>>>>         at java.lang.Thread.run(Thread.java:748)
>>>>> Caused by: java.util.concurrent.ExecutionException: 
>>>>> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.StatusRuntimeException: 
>>>>> CANCELLED: cancelled before receiving half close
>>>>>         at 
>>>>> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>>>>>         at 
>>>>> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
>>>>>         at org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:57)
>>>>>         at 
>>>>> org.apache.beam.runners.fnexecution.control.SdkHarnessClient$BundleProcessor$ActiveBundle.close(SdkHarnessClient.java:458)
>>>>>         at 
>>>>> org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory$1.close(DefaultJobBundleFactory.java:547)
>>>>>         at 
>>>>> org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.finishBundle(BeamPythonFunctionRunner.java:366)
>>>>>         ... 17 more
>>>>> Caused by: 
>>>>> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.StatusRuntimeException: 
>>>>> CANCELLED: cancelled before receiving half close
>>>>>         at 
>>>>> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Status.asRuntimeException(Status.java:524)
>>>>>         at 
>>>>> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onCancel(ServerCalls.java:275)
>>>>>         at 
>>>>> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.PartialForwardingServerCallListener.onCancel(PartialForwardingServerCallListener.java:40)
>>>>>         at 
>>>>> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener.onCancel(ForwardingServerCallListener.java:23)
>>>>>         at 
>>>>> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onCancel(ForwardingServerCallListener.java:40)
>>>>>         at 
>>>>> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Contexts$ContextualizedServerCallListener.onCancel(Contexts.java:96)
>>>>>         at 
>>>>> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.closedInternal(ServerCallImpl.java:353)
>>>>>         at 
>>>>> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.closed(ServerCallImpl.java:341)
>>>>>         at 
>>>>> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1Closed.runInContext(ServerImpl.java:867)
>>>>>         at 
>>>>> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
>>>>>         at 
>>>>> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
>>>>>         at 
>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>>>         at 
>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>>>         ... 1 more
>>>>> ################################################################
>>>>> Code for question 4.
>>>>> ################################################################
>>>>> # UDAF signature
>>>>> @udaf(input_types=[DataTypes.FLOAT(), DataTypes.FLOAT()],
>>>>>      result_type=DataTypes.VARCHAR(10000), func_type='pandas')
>>>>> def forcast(ds_float_series, y):
>>>>> 
>>>>> # SQL DDL
>>>>> "create table mySource (ds FLOAT, riid VARCHAR(100), y FLOAT ) with ( ... 
>>>>> )"
>>>>> "create table mySink (riid VARCHAR(100), yhatd VARCHAR(10000)) with ( ... 
>>>>> )"
>>>>> 
>>>>> # SQL INSERT
>>>>> "INSERT INTO mySink SELECT riid, forcast(ds, y) AS yhat FROM mySource 
>>>>> GROUP BY riid"
>>>>> ################################################################
>>>>> 
>>>>>> On 12. Nov 2020, at 12:53, Dian Fu <dian0511...@gmail.com 
>>>>>> <mailto:dian0511...@gmail.com>> wrote:
>>>>>> 
>>>>>> Hi Niklas,
>>>>>> 
>>>>>> Python DataStream API will also be supported in coming release of 1.12.0 
>>>>>> [1]. However, the functionalities are still limited for the time being 
>>>>>> compared to the Java DataStream API, e.g. it will only support the 
>>>>>> stateless operations, such as map, flat_map, etc.
>>>>>> 
>>>>>> [1] 
>>>>>> https://ci.apache.org/projects/flink/flink-docs-master/dev/python/datastream_tutorial.html
>>>>>>  
>>>>>> <https://ci.apache.org/projects/flink/flink-docs-master/dev/python/datastream_tutorial.html>
>>>>>>> 在 2020年11月12日,下午7:46,Niklas Wilcke <niklas.wil...@uniberg.com 
>>>>>>> <mailto:niklas.wil...@uniberg.com>> 写道:
>>>>>>> 
>>>>>>> Hi Dian,
>>>>>>> 
>>>>>>> thank you very much for this valuable response. I already read about 
>>>>>>> the UDAF, but I wasn't aware of the fact that it is possible to return 
>>>>>>> and UNNEST an array. I will definitely have a try and hopefully this 
>>>>>>> will solve my issue.
>>>>>>> 
>>>>>>> Another question that came up to my mind is whether PyFlink supports 
>>>>>>> any other API except Table and SQL, like the Streaming and Batch API. 
>>>>>>> The documentation is only covering the Table API, but I'm not sure 
>>>>>>> about that. Can you maybe tell me whether the Table and SQL API is the 
>>>>>>> only one supported by PyFlink?
>>>>>>> 
>>>>>>> Kind Regards,
>>>>>>> Niklas
>>>>>>> 
>>>>>>>  
>>>>>>> 
>>>>>>>> On 11. Nov 2020, at 15:32, Dian Fu <dian0511...@gmail.com 
>>>>>>>> <mailto:dian0511...@gmail.com>> wrote:
>>>>>>>> 
>>>>>>>> Hi Niklas,
>>>>>>>> 
>>>>>>>> You are correct that the input/output length of Pandas UDF must be of 
>>>>>>>> the same size and that Flink will split the input data into multiple 
>>>>>>>> bundles for Pandas UDF and the bundle size is non-determinstic. Both 
>>>>>>>> of the above two limitations are by design and so I guess Pandas UDF 
>>>>>>>> could not meet your requirements.
>>>>>>>> 
>>>>>>>> However, you could take a look at if the Pandas UDAF[1] which was 
>>>>>>>> supported in 1.12 could meet your requirements:
>>>>>>>> - As group_by only generate one record per group key just as you said, 
>>>>>>>> you could declare the output type of Pandas UDAF as an array type
>>>>>>>> - You need then flatten the aggregation results, e.g. using UNNEST
>>>>>>>> 
>>>>>>>> NOTE: Flink 1.12 is still not released. You could try the PyFlink 
>>>>>>>> package of RC1[2] for 1.12.0 or build it yourself according to [3].
>>>>>>>> 
>>>>>>>> [1] 
>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-master/dev/python/table-api-users-guide/udfs/vectorized_python_udfs.html#vectorized-aggregate-functions
>>>>>>>>  
>>>>>>>> <https://ci.apache.org/projects/flink/flink-docs-master/dev/python/table-api-users-guide/udfs/vectorized_python_udfs.html#vectorized-aggregate-functions>
>>>>>>>> [2] 
>>>>>>>> https://dist.apache.org/repos/dist/dev/flink/flink-1.12.0-rc1/python/ 
>>>>>>>> <https://dist.apache.org/repos/dist/dev/flink/flink-1.12.0-rc1/python/>
>>>>>>>> [3] 
>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-master/flinkDev/building.html#build-pyflink
>>>>>>>>  
>>>>>>>> <https://ci.apache.org/projects/flink/flink-docs-master/flinkDev/building.html#build-pyflink>
>>>>>>>> 
>>>>>>>> Regards,
>>>>>>>> Dian
>>>>>>>> 
>>>>>>>>> 在 2020年11月11日,下午9:03,Niklas Wilcke <niklas.wil...@uniberg.com 
>>>>>>>>> <mailto:niklas.wil...@uniberg.com>> 写道:
>>>>>>>>> 
>>>>>>>>> Hi Flink Community,
>>>>>>>>> 
>>>>>>>>> I'm currently trying to implement a parallel machine learning job 
>>>>>>>>> with Flink. The goal is to train models in parallel for independent 
>>>>>>>>> time series in the same data stream. For that purpose I'm using a 
>>>>>>>>> Python library, which lead me to PyFlink. Let me explain the use case 
>>>>>>>>> a bit more.
>>>>>>>>> I want to implement a batch job, which partitions/groups the data by 
>>>>>>>>> a device identifier. After that I need to process the data for each 
>>>>>>>>> device all at once. There is no way to iteratively train the model 
>>>>>>>>> unfortunately. The challenge I'm facing is to guarantee that all data 
>>>>>>>>> belonging to a certain device is processed in one single step. I'm 
>>>>>>>>> aware of the fact that this does not scale well, but for a reasonable 
>>>>>>>>> amount of input data per device it should be fine from my perspective.
>>>>>>>>> I investigated a lot and I ended up using the Table API and Pandas 
>>>>>>>>> UDF, which roughly fulfil my requirements, but there are the 
>>>>>>>>> following limitations left, which I wanted to talk about.
>>>>>>>>> 
>>>>>>>>> 1. Pandas UDF takes multiple Series as input parameters, which is 
>>>>>>>>> fine for my purpose, but as far as I can see there is no way to 
>>>>>>>>> guarantee that the chunk of data in the Series is "complete". Flink 
>>>>>>>>> will slice the Series and maybe call the UDF multiple times for each 
>>>>>>>>> device. As far as I can see there are some config options like 
>>>>>>>>> "python.fn-execution.arrow.batch.size" and 
>>>>>>>>> "python.fn-execution.bundle.time", which might help, but I'm not 
>>>>>>>>> sure, whether this is the right path to take.
>>>>>>>>> 2. The length of the input Series needs to be of the same size as the 
>>>>>>>>> output Series, which isn't nice for my use case. What I would like to 
>>>>>>>>> do is to process n rows and emit m rows. There shouldn't be any 
>>>>>>>>> dependency between the number of input rows and the number of output 
>>>>>>>>> rows.
>>>>>>>>> 
>>>>>>>>> 3. How do I partition the data stream. The Table API offers a 
>>>>>>>>> groupby, but this doesn't serve my purpose, because I don't want to 
>>>>>>>>> aggregate all the grouped lines. Instead as stated above I want to 
>>>>>>>>> emit m result lines per group. Are there other options using the 
>>>>>>>>> Table API or any other API to do this kind of grouping. I would need 
>>>>>>>>> something like a "keyBy()" from the streaming API. Maybe this can be 
>>>>>>>>> combined? Can I create a separate table for each key?
>>>>>>>>> 
>>>>>>>>> I'm also open to ideas for a completely different approach not using 
>>>>>>>>> the Table API or Pandas UDF. Any idea is welcome.
>>>>>>>>> 
>>>>>>>>> You can find a condensed version of the source code attached.
>>>>>>>>> 
>>>>>>>>> Kind Regards,
>>>>>>>>> Niklas
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> #############################################################
>>>>>>>>> 
>>>>>>>>> from pyflink.datastream import StreamExecutionEnvironment
>>>>>>>>> from pyflink.table import StreamTableEnvironment, DataTypes
>>>>>>>>> from pyflink.table.udf import udf
>>>>>>>>> 
>>>>>>>>> env = StreamExecutionEnvironment.get_execution_environment()
>>>>>>>>> env.set_parallelism(1)
>>>>>>>>> t_env = StreamTableEnvironment.create(env)
>>>>>>>>> t_env.get_config().get_configuration().set_boolean("python.fn-execution.memory.managed",
>>>>>>>>>  True)
>>>>>>>>> 
>>>>>>>>> @udf(input_types=[DataTypes.FLOAT(), DataTypes.FLOAT()],
>>>>>>>>>     result_type=DataTypes.FLOAT(), udf_type='pandas')
>>>>>>>>> def forcast(ds_float_series, y):
>>>>>>>>> 
>>>>>>>>>    # Train the model and create the forcast
>>>>>>>>> 
>>>>>>>>>    yhat_ts = forcast['yhat'].tail(input_size)
>>>>>>>>>    return yhat_ts
>>>>>>>>> 
>>>>>>>>> t_env.register_function("forcast", forcast)
>>>>>>>>> 
>>>>>>>>> # Define sink and source here
>>>>>>>>> 
>>>>>>>>> t_env.execute_sql(my_source_ddl)
>>>>>>>>> t_env.execute_sql(my_sink_ddl)
>>>>>>>>> 
>>>>>>>>> # TODO: key_by instead of filter
>>>>>>>>> t_env.from_path('mySource') \
>>>>>>>>>    .where("riid === 'r1i1'") \
>>>>>>>>>    .select("ds, riid, y, forcast(ds, y) as yhat_90d") \
>>>>>>>>>    .insert_into('mySink')
>>>>>>>>> 
>>>>>>>>> t_env.execute("pandas_udf_demo")
>>>>>>>>> 
>>>>>>>>> #############################################################
>> 
> 

Attachment: smime.p7s
Description: S/MIME cryptographic signature

Reply via email to