Hi Niklas, Thanks a lot for supporting PyFlink. In fact, your requirement for multiple input and multiple output is essentially Table Aggregation Functions[1]. Although PyFlink does not support it yet, we have listed it in the release 1.13 plan. In addition, row-based operations[2] that are very user-friendly to machine learning users are also included in the 1.13 plan.
[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/functions/udfs.html#table-aggregation-functions [2] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/tableApi.html#row-based-operations Best, Xingbo Niklas Wilcke <[email protected]> 于2020年11月26日周四 下午5:11写道: > Hi Xingbo, > > thanks for taking care and letting me know. I was about to share an > example, how to reproduce this. > Now I will wait for the next release candidate and give it a try. > > Regards, > Niklas > > > -- > [email protected] > Mobile: +49 160 9793 2593 > Office: +49 40 2380 6523 > > Simon-von-Utrecht-Straße 85a > 20359 Hamburg > > UNIBERG GmbH > Registergericht: Amtsgericht Kiel HRB SE-1507 > Geschäftsführer: Andreas Möller, Martin Ulbricht > > Information Art. 13 DSGVO B2B: > Für die Kommunikation mit Ihnen verarbeiten wir ggf. > Ihre personenbezogenen Daten. > Alle Informationen zum Umgang mit Ihren Daten finden Sie unter > https://www.uniberg.com/impressum.html. > > On 26. Nov 2020, at 02:59, Xingbo Huang <[email protected]> wrote: > > Hi Niklas, > > Regarding `Exception in thread "grpc-nio-worker-ELG-3-2" > java.lang.NoClassDefFoundError: > org/apache/beam/vendor/grpc/v1p26p0/io/netty/buffer/PoolArena$1`, > it does not affect the correctness of the result. The reason is that some > resources are released asynchronously when Grpc Server is shut down[1] . > After the UserClassLoader unloads the class, the asynchronous thread tries > to release the resources and throw NotClassFoundException, but the content > of the data result has been sent downstream, so the correctness of the > result will not be affected. > > Regarding the details of the specific causes, I have explained in the > flink community[2] and the beam community[3], and fixed them in the flink > community. There will be no such problem in the next version of release > 1.11.3 and 1.12.0. > > [1] > https://github.com/grpc/grpc-java/blob/master/core/src/main/java/io/grpc/internal/SharedResourceHolder.java#L150 > [2] https://issues.apache.org/jira/browse/FLINK-20284 > [3] https://issues.apache.org/jira/browse/BEAM-5397 > > Best, > Xingbo > > > Dian Fu <[email protected]> 于2020年11月16日周一 下午9:10写道: > >> Hi Niklas, >> >> How can I ingest data in a batch table from Kafka or even better >> Elasticsearch. Kafka is only offering a Streaming source and Elasticsearch >> isn't offering a source at all. >> >> The only workaround which comes to my mind is to use the Kafka streaming >> source and to apply a single very large window to create a bounded table. >> Do you think that would work? >> Are there other options available? Maybe converting a Stream to a >> bounded table is somehow possible? Thank you! >> >> >> I think you are right that Kafka still doesn't support batch and there is >> no ES source for now. Another option is you could load the data into a >> connector which supports batch. Not sure if anybody else has a better idea >> about this. >> >> I found one cause of this problem and it was mixing a Scala 2.12 Flink >> installation with PyFlink, which has some 2.11 jars in its opt folder. I >> think the JVM just skipped the class definitions, because they weren't >> compatible. I actually wasn't aware of the fact that PyFlink comes with >> prebuilt jar dependencies. If PyFlink is only compatible with Scala 2.11 it >> would make sense to point that out in the documentation. I think I never >> read that and it might be missing. Unfortunately there is still one >> Exception showing up at the very end of the job in the taskmanager log. I >> did the verification you asked for and the class is present in both jar >> files. The one which comes with Flink in the opt folder and the one of >> PyFlink. You can find the log attached. >> I think the main question is which jar file has be loaded in in the three >> envronments (executor, jobmanager, taskmanager). Is it fine to not put the >> flink-python_2.11-1.12.0.jar into the lib folder in the jobmanager and >> taskmanager? Will it somehow be transferred by PyFlink to the jobmanager >> and taskmanager? >> >> >> PyFlink comes with the built-in jars such as >> flink-python_2.11-1.12.0.jar, flink-dist_2.11-1.12.0.jar, etc and so you >> don't need to manually add them(also shouldn't do that). Could you remove >> the duplicate jars and try it again? >> >> No I don't think that there are additional exceptions besides >> "org.apache.beam.vendor.grpc.v1p26p0.io.grpc.StatusRuntimeException", but >> maybe take a look in the attached log files. This problem could be related >> to 2., maybe the root cause is a class loading issue as well. What do you >> think? You can find attached three log files. One for the executor, the >> jobmanager and the taskmanager. Maybe you can find something useful. >> >> >> I found one similar issue at Beam side: >> https://issues.apache.org/jira/browse/BEAM-6258 which has been resolved >> long time ago. I'm still trying to reproduce this issue and will let you >> know if there is any progress. (Would be great if you could help to provide >> an example which could easily reproduce this issue) >> >> This was very helpful. I was able to implement it. There is only one >> detail missing. Is it possible to UNNEST an array of Rows or tuples? It >> would be really great if I would be able to return a list with multiple >> fields. Currently I'm just putting multiple value into a single VARCHAR, >> but that means the information needs to be extracted later on. Maybe you >> have an idea how to avoid that. >> >> >> Currently, Pandas UDAF still doesn't support complex type and so I'm >> afraid that you have to put multiple values into a single VARCHAR for now. >> >> Regards, >> Dian >> >> >> 在 2020年11月16日,上午2:46,Niklas Wilcke <[email protected]> 写道: >> >> Hi Dian, >> >> this was very helpful again. To the old questions I will answer inline as >> well. Unfortunately also one new question popped up. >> >> How can I ingest data in a batch table from Kafka or even better >> Elasticsearch. Kafka is only offering a Streaming source and Elasticsearch >> isn't offering a source at all. >> The only workaround which comes to my mind is to use the Kafka streaming >> source and to apply a single very large window to create a bounded table. >> Do you think that would work? >> Are there other options available? Maybe converting a Stream to a >> bounded table is somehow possible? Thank you! >> >> Kind Regards, >> Niklas >> >> >> >> On 13. Nov 2020, at 16:07, Dian Fu <[email protected]> wrote: >> >> Hi Niklas, >> >> Good to know that this solution may work for you. Regarding to the >> questions you raised, please find my reply inline. >> >> Regards, >> Dian >> >> 在 2020年11月13日,下午8:48,Niklas Wilcke <[email protected]> 写道: >> >> Hi Dian, >> >> thanks again for your response. In the meantime I tried out your proposal >> using the UDAF feature of PyFlink 1.12.0-rc1 and it is roughly working, but >> I am facing some issues, which I would like to address. If this goes too >> far, please let me know and I will open a new thread for each of the >> questions. Let me share some more information about my current environment, >> which will maybe help to answer the questions. I'm currently using my dev >> machine with Docker and one jobmanager container and one taskmanager >> container. If needed I can share the whole docker environment, but this >> would involve some more effort on my side. Here are my five questions. >> >> 1. Where can I find connector libraries for 1.12.0-rc1 or some kind of >> instruction how to build them? I can't find them in the 1.12.0-rc1 release >> and when I build flink from source, I can't find the connector libraries in >> the build target. I need flink-sql-connector-elasticsearch7 >> and flink-sql-connector-kafka. >> >> >> You could download the connector jars of 1.12.0-rc1 from here: >> https://repository.apache.org/content/repositories/orgapacheflink-1402/org/apache/flink/ >> >> >> Thanks that worked like a charm! >> >> >> 2. Which steps are needed to properly Setup PyFlink? I followed the >> instructions, but I always get some ClassNotFoundExceptions for some Beam >> related classes in the taskmanager. The job still works fine, but this >> doesn't look good to me. It happens in 1.11.2 and in 1.12.0-rc1. I tried to >> resolve this by adding certain jars, but I wasn't able to fix it. Maybe you >> have an idea. You can find the Dockerfile attached, which lines out the >> steps I'm currently using. The Exceptions signature looks like this. >> Exception in thread "grpc-nio-worker-ELG-3-2" >> java.lang.NoClassDefFoundError: >> org/apache/beam/vendor/grpc/v1p26p0/io/netty/buffer/PoolArena$1 >> >> >> Usually there is nothing specially need to do to set up PyFlink. I have >> manually checked that this class should be there(inside >> flink-python_2.11-1.12.0.jar) and so guess if it's because you environment >> isn't clean enough? >> >> I guess you could check the following things: >> 1) Is it because you have installed 1.11.2 before and so the environment >> is not clean enough? Could you uninstall PyFlink 1.11.2 manually and >> reinstall PyFlink 1.12.0-rc1 again? You could also manually check that >> there should be only one flink-python*.jar under directory >> xxx/site-packages/pyflink/opt/ >> 2) Verify that the class is actually there by the following command: >> (flink-python_2.11-1.12.0.jar is under directory >> xxx/site-packages/pyflink/opt/) >> jar tf flink-python_2.11-1.12.0.jar | grep >> "org/apache/beam/vendor/grpc/v1p26p0/io/netty/buffer/PoolArena" >> 3) If this exception still happens, could you share the exception stack? >> >> >> I found one cause of this problem and it was mixing a Scala 2.12 Flink >> installation with PyFlink, which has some 2.11 jars in its opt folder. I >> think the JVM just skipped the class definitions, because they weren't >> compatible. I actually wasn't aware of the fact that PyFlink comes with >> prebuilt jar dependencies. If PyFlink is only compatible with Scala 2.11 it >> would make sense to point that out in the documentation. I think I never >> read that and it might be missing. Unfortunately there is still one >> Exception showing up at the very end of the job in the taskmanager log. I >> did the verification you asked for and the class is present in both jar >> files. The one which comes with Flink in the opt folder and the one of >> PyFlink. You can find the log attached. >> I think the main question is which jar file has be loaded in in the three >> envronments (executor, jobmanager, taskmanager). Is it fine to not put the >> flink-python_2.11-1.12.0.jar into the lib folder in the jobmanager and >> taskmanager? Will it somehow be transferred by PyFlink to the jobmanager >> and taskmanager? >> >> >> 3. When increasing the size of the input data set I get the following >> Exception and the job is canceled. I tried to increase the resources >> assigned to flink, but it didn't help. Do you have an idea why this is >> happening? You can find a more detailed stack trace in apendix. >> >> >> Could you check if there are any other exceptions in the log when this >> exception happens? >> >> >> No I don't think that there are additional exceptions besides >> "org.apache.beam.vendor.grpc.v1p26p0.io.grpc.StatusRuntimeException", but >> maybe take a look in the attached log files. This problem could be related >> to 2., maybe the root cause is a class loading issue as well. What do you >> think? You can find attached three log files. One for the executor, the >> jobmanager and the taskmanager. Maybe you can find something useful. >> >> >> 4. I can't manage to get the SQL UNNEST operation to work. It is quite >> hard for me to debug it and I can't really find any valuable examples or >> documentation on the internet. Currently instead of creating an ARRAY I'm >> just returning a VARCHAR containing a string representation of the array. >> The relevant code you can find in the apendix. >> >> >> There are some examples here: >> https://github.com/apache/flink/blob/c601cfd662c2839f8ebc81b80879ecce55a8cbaf/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/UnnestITCase.scala >> >> >> This was very helpful. I was able to implement it. There is only one >> detail missing. Is it possible to UNNEST an array of Rows or tuples? It >> would be really great if I would be able to return a list with multiple >> fields. Currently I'm just putting multiple value into a single VARCHAR, >> but that means the information needs to be extracted later on. Maybe you >> have an idea how to avoid that. >> >> >> 5. How can I obtain the output of the Python interpreter executing the >> UDF. If I put a print statement in the UDF I can't see the output in the >> log of the taskmanager. Is there a way to access it? >> >> >> You can use the standard logging in Python UDF instead of print. The log >> output could then be found in the log of the task manager. >> >> >> Thank you! That worked well. I should have checked that without asking. >> >> >> I hope these aren't too many questions for this thread. If this is the >> case I can still split some of them out. Please let me know, if this is the >> case. >> Thank you very much. I really appreciate your help. >> >> >> It's fine to reuse this thread. :) >> >> Kind Regards, >> Niklas >> >> >> >> End of the Taskmanager Log for 2. >> ################################################################### >> taskmanager_1 | 2020-11-15 17:46:53,438 INFO >> org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Free >> slot TaskSlot(index:5, state:ACTIVE, resource profile: >> ResourceProfile{cpuCores=1.0000000000000000, taskHeapMemory=219.333mb >> (229987662 bytes), taskOffHeapMemory=166.667mb (174762666 bytes), >> managedMemory=342.933mb (359591667 bytes), networkMemory=85.733mb (89897916 >> bytes)}, allocationId: e5137050c0f1ef5e660311ddf1f3429f, jobId: >> ba4e3974860af7dc00a28fdfbb44fe06). >> taskmanager_1 | 2020-11-15 17:46:53,440 INFO >> org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Free >> slot TaskSlot(index:1, state:ACTIVE, resource profile: >> ResourceProfile{cpuCores=1.0000000000000000, taskHeapMemory=219.333mb >> (229987662 bytes), taskOffHeapMemory=166.667mb (174762666 bytes), >> managedMemory=342.933mb (359591667 bytes), networkMemory=85.733mb (89897916 >> bytes)}, allocationId: 541ad3e383fb9c024141f2bab5e8b7fd, jobId: >> ba4e3974860af7dc00a28fdfbb44fe06). >> taskmanager_1 | 2020-11-15 17:46:53,442 INFO >> org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Free >> slot TaskSlot(index:2, state:ACTIVE, resource profile: >> ResourceProfile{cpuCores=1.0000000000000000, taskHeapMemory=219.333mb >> (229987662 bytes), taskOffHeapMemory=166.667mb (174762666 bytes), >> managedMemory=342.933mb (359591667 bytes), networkMemory=85.733mb (89897916 >> bytes)}, allocationId: ef8adb7d879f4072123fe4bc12054c0c, jobId: >> ba4e3974860af7dc00a28fdfbb44fe06). >> taskmanager_1 | 2020-11-15 17:46:53,444 INFO >> org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Free >> slot TaskSlot(index:4, state:ACTIVE, resource profile: >> ResourceProfile{cpuCores=1.0000000000000000, taskHeapMemory=219.333mb >> (229987662 bytes), taskOffHeapMemory=166.667mb (174762666 bytes), >> managedMemory=342.933mb (359591667 bytes), networkMemory=85.733mb (89897916 >> bytes)}, allocationId: db5d62b8c9fe8172fc1883c148b150e8, jobId: >> ba4e3974860af7dc00a28fdfbb44fe06). >> taskmanager_1 | 2020-11-15 17:46:53,846 INFO >> org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Free >> slot TaskSlot(index:0, state:ACTIVE, resource profile: >> ResourceProfile{cpuCores=1.0000000000000000, taskHeapMemory=219.333mb >> (229987662 bytes), taskOffHeapMemory=166.667mb (174762666 bytes), >> managedMemory=342.933mb (359591667 bytes), networkMemory=85.733mb (89897916 >> bytes)}, allocationId: 637d053a0726548c2bc9261fc0e55414, jobId: >> ba4e3974860af7dc00a28fdfbb44fe06). >> taskmanager_1 | 2020-11-15 17:46:53,849 INFO >> org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Free >> slot TaskSlot(index:3, state:ACTIVE, resource profile: >> ResourceProfile{cpuCores=1.0000000000000000, taskHeapMemory=219.333mb >> (229987662 bytes), taskOffHeapMemory=166.667mb (174762666 bytes), >> managedMemory=342.933mb (359591667 bytes), networkMemory=85.733mb (89897916 >> bytes)}, allocationId: cfaa8633b9102e3a509cfc94dd97d38f, jobId: >> ba4e3974860af7dc00a28fdfbb44fe06). >> taskmanager_1 | 2020-11-15 17:46:53,851 INFO >> org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Remove >> job ba4e3974860af7dc00a28fdfbb44fe06 from job leader monitoring. >> taskmanager_1 | 2020-11-15 17:46:53,851 INFO >> org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Close >> JobManager connection for job ba4e3974860af7dc00a28fdfbb44fe06. >> taskmanager_1 | 2020-11-15 17:46:54,371 ERROR >> org.apache.beam.vendor.grpc.v1p26p0.io.netty.util.concurrent.DefaultPromise.rejectedExecution >> [] - Failed to submit a listener notification task. Event loop shut down? >> taskmanager_1 | java.lang.NoClassDefFoundError: >> org/apache/beam/vendor/grpc/v1p26p0/io/netty/util/concurrent/GlobalEventExecutor$2 >> taskmanager_1 | at >> org.apache.beam.vendor.grpc.v1p26p0.io.netty.util.concurrent.GlobalEventExecutor.startThread(GlobalEventExecutor.java:227) >> ~[blob_p-2cc5d5ac59c7842f512002d81251a3cbfed058cc-e14fe009bc07ddff407ea4c5d74bd4be:1.12.0] >> taskmanager_1 | at >> org.apache.beam.vendor.grpc.v1p26p0.io.netty.util.concurrent.GlobalEventExecutor.execute(GlobalEventExecutor.java:215) >> ~[blob_p-2cc5d5ac59c7842f512002d81251a3cbfed058cc-e14fe009bc07ddff407ea4c5d74bd4be:1.12.0] >> taskmanager_1 | at >> org.apache.beam.vendor.grpc.v1p26p0.io.netty.util.concurrent.DefaultPromise.safeExecute(DefaultPromise.java:841) >> [blob_p-2cc5d5ac59c7842f512002d81251a3cbfed058cc-e14fe009bc07ddff407ea4c5d74bd4be:1.12.0] >> taskmanager_1 | at >> org.apache.beam.vendor.grpc.v1p26p0.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:498) >> [blob_p-2cc5d5ac59c7842f512002d81251a3cbfed058cc-e14fe009bc07ddff407ea4c5d74bd4be:1.12.0] >> taskmanager_1 | at >> org.apache.beam.vendor.grpc.v1p26p0.io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:615) >> [blob_p-2cc5d5ac59c7842f512002d81251a3cbfed058cc-e14fe009bc07ddff407ea4c5d74bd4be:1.12.0] >> taskmanager_1 | at >> org.apache.beam.vendor.grpc.v1p26p0.io.netty.util.concurrent.DefaultPromise.setSuccess0(DefaultPromise.java:604) >> [blob_p-2cc5d5ac59c7842f512002d81251a3cbfed058cc-e14fe009bc07ddff407ea4c5d74bd4be:1.12.0] >> taskmanager_1 | at >> org.apache.beam.vendor.grpc.v1p26p0.io.netty.util.concurrent.DefaultPromise.setSuccess(DefaultPromise.java:96) >> [blob_p-2cc5d5ac59c7842f512002d81251a3cbfed058cc-e14fe009bc07ddff407ea4c5d74bd4be:1.12.0] >> taskmanager_1 | at >> org.apache.beam.vendor.grpc.v1p26p0.io.netty.util.concurrent.SingleThreadEventExecutor$6.run(SingleThreadEventExecutor.java:1089) >> [blob_p-2cc5d5ac59c7842f512002d81251a3cbfed058cc-e14fe009bc07ddff407ea4c5d74bd4be:1.12.0] >> taskmanager_1 | at >> org.apache.beam.vendor.grpc.v1p26p0.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) >> [blob_p-2cc5d5ac59c7842f512002d81251a3cbfed058cc-e14fe009bc07ddff407ea4c5d74bd4be:1.12.0] >> taskmanager_1 | at >> org.apache.beam.vendor.grpc.v1p26p0.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) >> [blob_p-2cc5d5ac59c7842f512002d81251a3cbfed058cc-e14fe009bc07ddff407ea4c5d74bd4be:1.12.0] >> taskmanager_1 | at java.lang.Thread.run(Thread.java:748) >> [?:1.8.0_275] >> taskmanager_1 | Caused by: java.lang.ClassNotFoundException: >> org.apache.beam.vendor.grpc.v1p26p0.io.netty.util.concurrent.GlobalEventExecutor$2 >> taskmanager_1 | at >> java.net.URLClassLoader.findClass(URLClassLoader.java:382) ~[?:1.8.0_275] >> taskmanager_1 | at >> java.lang.ClassLoader.loadClass(ClassLoader.java:418) ~[?:1.8.0_275] >> taskmanager_1 | at >> org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:63) >> ~[flink-dist_2.11-1.12.0.jar:1.12.0] >> taskmanager_1 | at >> org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:72) >> ~[flink-dist_2.11-1.12.0.jar:1.12.0] >> taskmanager_1 | at >> org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:49) >> ~[flink-dist_2.11-1.12.0.jar:1.12.0] >> taskmanager_1 | at >> java.lang.ClassLoader.loadClass(ClassLoader.java:351) ~[?:1.8.0_275] >> taskmanager_1 | ... 11 more >> ################################################################### >> Logfiles for 3. >> >> <large-data-set-taskmanager.log> >> <large-data-set-jobmanager.log> >> <large-data-set-executor.log> >> >> Dockerfile for question 2. >> #################################################################### >> # This image has been build based on the Dockerfile used for the flink >> image on docker hub. >> # The only change I applied is that I switched to flink 1.12.0-rc1. >> FROM flink:1.12.0-rc1-scala_2.12 >> >> # Install python >> # TODO: Minimize dependencies >> RUN apt-get update && apt-get install -y \ >> python3 \ >> python3-pip \ >> python3-dev \ >> zip \ >> && rm -rf /var/lib/apt/lists/* \ >> && ln -s /usr/bin/python3 /usr/bin/python \ >> && ln -s /usr/bin/pip3 /usr/bin/pip >> >> # Install pyflink >> RUN wget --no-verbose >> https://dist.apache.org/repos/dist/dev/flink/flink-1.12.0-rc1/python/apache_flink-1.12.0-cp37-cp37m-manylinux1_x86_64.whl >> \ >> && pip install apache_flink-1.12.0-cp37-cp37m-manylinux1_x86_64.whl \ >> && rm apache_flink-1.12.0-cp37-cp37m-manylinux1_x86_64.whl >> #################################################################### >> Stack Trace for question 3. >> #################################################################### >> Caused by: java.lang.RuntimeException: Failed to close remote bundle >> at >> org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.finishBundle(BeamPythonFunctionRunner.java:368) >> at >> org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.flush(BeamPythonFunctionRunner.java:322) >> at >> org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.invokeFinishBundle(AbstractPythonFunctionOperator.java:283) >> at >> org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.checkInvokeFinishBundleByCount(AbstractPythonFunctionOperator.java:267) >> at >> org.apache.flink.table.runtime.operators.python.aggregate.arrow.batch.BatchArrowPythonGroupAggregateFunctionOperator.invokeCurrentBatch(BatchArrowPythonGroupAggregateFunctionOperator.java:64) >> at >> org.apache.flink.table.runtime.operators.python.aggregate.arrow.batch.AbstractBatchArrowPythonAggregateFunctionOperator.endInput(AbstractBatchArrowPythonAggregateFunctionOperator.java:94) >> at >> org.apache.flink.table.runtime.operators.python.aggregate.arrow.batch.BatchArrowPythonGroupAggregateFunctionOperator.endInput(BatchArrowPythonGroupAggregateFunctionOperator.java:33) >> at >> org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.endOperatorInput(StreamOperatorWrapper.java:91) >> at >> org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.lambda$close$0(StreamOperatorWrapper.java:127) >> at >> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47) >> at >> org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:127) >> at >> org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:134) >> at >> org.apache.flink.streaming.runtime.tasks.OperatorChain.closeOperators(OperatorChain.java:412) >> at >> org.apache.flink.streaming.runtime.tasks.StreamTask.afterInvoke(StreamTask.java:587) >> at >> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:549) >> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722) >> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547) >> at java.lang.Thread.run(Thread.java:748) >> Caused by: java.util.concurrent.ExecutionException: >> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.StatusRuntimeException: >> CANCELLED: cancelled before receiving half close >> at >> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) >> at >> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908) >> at org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:57) >> at >> org.apache.beam.runners.fnexecution.control.SdkHarnessClient$BundleProcessor$ActiveBundle.close(SdkHarnessClient.java:458) >> at >> org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory$1.close(DefaultJobBundleFactory.java:547) >> at >> org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.finishBundle(BeamPythonFunctionRunner.java:366) >> ... 17 more >> Caused by: >> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.StatusRuntimeException: >> CANCELLED: cancelled before receiving half close >> at >> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Status.asRuntimeException(Status.java:524) >> at >> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onCancel(ServerCalls.java:275) >> at >> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.PartialForwardingServerCallListener.onCancel(PartialForwardingServerCallListener.java:40) >> at >> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener.onCancel(ForwardingServerCallListener.java:23) >> at >> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onCancel(ForwardingServerCallListener.java:40) >> at >> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Contexts$ContextualizedServerCallListener.onCancel(Contexts.java:96) >> at >> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.closedInternal(ServerCallImpl.java:353) >> at >> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.closed(ServerCallImpl.java:341) >> at >> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1Closed.runInContext(ServerImpl.java:867) >> at >> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37) >> at >> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123) >> at >> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) >> at >> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) >> ... 1 more >> ################################################################ >> Code for question 4. >> ################################################################ >> # UDAF signature >> @udaf(input_types=[DataTypes.FLOAT(), DataTypes.FLOAT()], >> result_type=DataTypes.VARCHAR(10000), func_type='pandas') >> def forcast(ds_float_series, y): >> >> # SQL DDL >> "create table mySource (ds FLOAT, riid VARCHAR(100), y FLOAT ) with ( ... >> )" >> "create table mySink (riid VARCHAR(100), yhatd VARCHAR(10000)) with ( ... >> )" >> >> # SQL INSERT >> "INSERT INTO mySink SELECT riid, forcast(ds, y) AS yhat FROM mySource >> GROUP BY riid" >> ################################################################ >> >> On 12. Nov 2020, at 12:53, Dian Fu <[email protected]> wrote: >> >> Hi Niklas, >> >> Python DataStream API will also be supported in coming release of 1.12.0 >> [1]. However, the functionalities are still limited for the time being >> compared to the Java DataStream API, e.g. it will only support the >> stateless operations, such as map, flat_map, etc. >> >> [1] >> https://ci.apache.org/projects/flink/flink-docs-master/dev/python/datastream_tutorial.html >> >> 在 2020年11月12日,下午7:46,Niklas Wilcke <[email protected]> 写道: >> >> Hi Dian, >> >> thank you very much for this valuable response. I already read about the >> UDAF, but I wasn't aware of the fact that it is possible to return and >> UNNEST an array. I will definitely have a try and hopefully this will solve >> my issue. >> >> Another question that came up to my mind is whether PyFlink supports any >> other API except Table and SQL, like the Streaming and Batch API. The >> documentation is only covering the Table API, but I'm not sure about that. >> Can you maybe tell me whether the Table and SQL API is the only one >> supported by PyFlink? >> >> Kind Regards, >> Niklas >> >> >> >> On 11. Nov 2020, at 15:32, Dian Fu <[email protected]> wrote: >> >> Hi Niklas, >> >> You are correct that the input/output length of Pandas UDF must be of the >> same size and that Flink will split the input data into multiple bundles >> for Pandas UDF and the bundle size is non-determinstic. Both of the above >> two limitations are by design and so I guess Pandas UDF could not meet your >> requirements. >> >> However, you could take a look at if the Pandas UDAF[1] which was >> supported in 1.12 could meet your requirements: >> - As group_by only generate one record per group key just as you said, >> you could declare the output type of Pandas UDAF as an array type >> - You need then flatten the aggregation results, e.g. using UNNEST >> >> NOTE: Flink 1.12 is still not released. You could try the PyFlink package >> of RC1[2] for 1.12.0 or build it yourself according to [3]. >> >> [1] >> https://ci.apache.org/projects/flink/flink-docs-master/dev/python/table-api-users-guide/udfs/vectorized_python_udfs.html#vectorized-aggregate-functions >> [2] https://dist.apache.org/repos/dist/dev/flink/flink-1.12.0-rc1/python/ >> [3] >> https://ci.apache.org/projects/flink/flink-docs-master/flinkDev/building.html#build-pyflink >> >> Regards, >> Dian >> >> 在 2020年11月11日,下午9:03,Niklas Wilcke <[email protected]> 写道: >> >> Hi Flink Community, >> >> I'm currently trying to implement a parallel machine learning job with >> Flink. The goal is to train models in parallel for independent time series >> in the same data stream. For that purpose I'm using a Python library, which >> lead me to PyFlink. Let me explain the use case a bit more. >> I want to implement a batch job, which partitions/groups the data by a >> device identifier. After that I need to process the data for each device >> all at once. There is no way to iteratively train the model unfortunately. >> The challenge I'm facing is to guarantee that all data belonging to a >> certain device is processed in one single step. I'm aware of the fact that >> this does not scale well, but for a reasonable amount of input data per >> device it should be fine from my perspective. >> I investigated a lot and I ended up using the Table API and Pandas UDF, >> which roughly fulfil my requirements, but there are the following >> limitations left, which I wanted to talk about. >> >> 1. Pandas UDF takes multiple Series as input parameters, which is fine >> for my purpose, but as far as I can see there is no way to guarantee that >> the chunk of data in the Series is "complete". Flink will slice the Series >> and maybe call the UDF multiple times for each device. As far as I can see >> there are some config options like "python.fn-execution.arrow.batch.size" >> and "python.fn-execution.bundle.time", which might help, but I'm not sure, >> whether this is the right path to take. >> >> 2. The length of the input Series needs to be of the same size as the >> output Series, which isn't nice for my use case. What I would like to do is >> to process n rows and emit m rows. There shouldn't be any dependency >> between the number of input rows and the number of output rows. >> >> 3. How do I partition the data stream. The Table API offers a groupby, >> but this doesn't serve my purpose, because I don't want to aggregate all >> the grouped lines. Instead as stated above I want to emit m result lines >> per group. Are there other options using the Table API or any other API to >> do this kind of grouping. I would need something like a "keyBy()" from the >> streaming API. Maybe this can be combined? Can I create a separate table >> for each key? >> >> I'm also open to ideas for a completely different approach not using the >> Table API or Pandas UDF. Any idea is welcome. >> >> You can find a condensed version of the source code attached. >> >> Kind Regards, >> Niklas >> >> >> >> ############################################################# >> >> from pyflink.datastream import StreamExecutionEnvironment >> from pyflink.table import StreamTableEnvironment, DataTypes >> from pyflink.table.udf import udf >> >> env = StreamExecutionEnvironment.get_execution_environment() >> env.set_parallelism(1) >> t_env = StreamTableEnvironment.create(env) >> t_env.get_config().get_configuration().set_boolean("python.fn-execution.memory.managed", >> True) >> >> @udf(input_types=[DataTypes.FLOAT(), DataTypes.FLOAT()], >> result_type=DataTypes.FLOAT(), udf_type='pandas') >> def forcast(ds_float_series, y): >> >> # Train the model and create the forcast >> >> yhat_ts = forcast['yhat'].tail(input_size) >> return yhat_ts >> >> t_env.register_function("forcast", forcast) >> >> # Define sink and source here >> >> t_env.execute_sql(my_source_ddl) >> t_env.execute_sql(my_sink_ddl) >> >> # TODO: key_by instead of filter >> t_env.from_path('mySource') \ >> .where("riid === 'r1i1'") \ >> .select("ds, riid, y, forcast(ds, y) as yhat_90d") \ >> .insert_into('mySink') >> >> t_env.execute("pandas_udf_demo") >> >> ############################################################# >> >> >> >
