[jira] [Commented] (FLINK-10469) FileChannel may not write the whole buffer in a single call to FileChannel.write(Buffer buffer)
[ https://issues.apache.org/jira/browse/FLINK-10469?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16633622#comment-16633622 ] Guowei Ma commented on FLINK-10469: --- +1 > FileChannel may not write the whole buffer in a single call to > FileChannel.write(Buffer buffer) > --- > > Key: FLINK-10469 > URL: https://issues.apache.org/jira/browse/FLINK-10469 > Project: Flink > Issue Type: Bug > Components: Core >Affects Versions: 1.4.1, 1.4.2, 1.5.3, 1.6.0, 1.6.1, 1.7.0, 1.5.4, 1.6.2 >Reporter: Yun Gao >Priority: Major > > Currently all the calls to _FileChannel.write(ByteBuffer src)_ assume that > this method will not return before the whole buffer is written, like the one > in _AsynchronousFileIOChannel.write()._ > > However, this assumption may not hold in all environments. We have > encountered a case where only part of a buffer was written on a cluster with > a high IO load, and the target file got corrupted. > > To fix this issue, I think we should add a utility method in > org.apache.flink.util.IOUtils that ensures the whole buffer is written with a > loop, and replace all the calls to _FileChannel.write(ByteBuffer)_ with this > new method. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
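The looping helper proposed above could look roughly like this. This is a minimal sketch; the method name `writeCompletely` and the class name are assumptions for illustration, not the actual FLINK-10469 patch:

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

public final class WriteUtil {

    private WriteUtil() {}

    /**
     * Writes the whole buffer to the channel. A single FileChannel.write call
     * may write only part of the buffer (e.g. under high IO load), so we loop
     * until no bytes remain in the buffer.
     */
    public static void writeCompletely(FileChannel channel, ByteBuffer src) throws IOException {
        while (src.hasRemaining()) {
            channel.write(src);
        }
    }
}
```

Callers would then replace every direct `channel.write(buffer)` with `WriteUtil.writeCompletely(channel, buffer)`.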
[jira] [Created] (FLINK-13993) Using FlinkUserCodeClassLoaders to load the user class in the perjob mode
Guowei Ma created FLINK-13993: - Summary: Using FlinkUserCodeClassLoaders to load the user class in the perjob mode Key: FLINK-13993 URL: https://issues.apache.org/jira/browse/FLINK-13993 Project: Flink Issue Type: Improvement Components: Runtime / Coordination Affects Versions: 1.9.0, 1.10.0 Reporter: Guowei Ma Currently, Flink has the FlinkUserCodeClassLoader, which is used to load the user's classes. However, in per-job mode both the user classes and the system classes are loaded by the system classloader, which introduces conflicts. This document[1] gives a proposal that makes the FlinkUserCodeClassLoader load the user classes in per-job mode (discussed with Till[2]). [1][https://docs.google.com/document/d/1fH2Cwrrmps5RxxvVuUdeprruvDNabEaIHPyYps28WM8/edit#heading=h.815t5dodlxh7] [2] [https://docs.google.com/document/d/1SUhFt1BmsGMLUYVa72SWLbNrrWzunvcjAlEm8iusvq0/edit] -- This message was sent by Atlassian Jira (v8.3.2#803003)
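For context, the child-first loading that FlinkUserCodeClassLoader provides can be sketched as below. This is an illustrative toy, not Flink's actual implementation; a real loader must additionally always delegate `java.*` and other framework-critical packages to the parent:

```java
import java.net.URL;
import java.net.URLClassLoader;

// Toy child-first class loader: look in the user-code jars first, then fall
// back to the parent. (Flink's real FlinkUserCodeClassLoaders also maintain
// an "always parent-first" pattern list; omitted here for brevity.)
public class ChildFirstClassLoader extends URLClassLoader {

    public ChildFirstClassLoader(URL[] urls, ClassLoader parent) {
        super(urls, parent);
    }

    @Override
    protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
        synchronized (getClassLoadingLock(name)) {
            Class<?> c = findLoadedClass(name);
            if (c == null) {
                try {
                    // child first: try to define the class from our own URLs
                    c = findClass(name);
                } catch (ClassNotFoundException e) {
                    // not in the user-code jars: delegate to the parent
                    c = super.loadClass(name, false);
                }
            }
            if (resolve) {
                resolveClass(c);
            }
            return c;
        }
    }
}
```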
[jira] [Updated] (FLINK-13993) Using FlinkUserCodeClassLoaders to load the user class in the perjob mode
[ https://issues.apache.org/jira/browse/FLINK-13993?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guowei Ma updated FLINK-13993: -- Fix Version/s: 1.10.0 -- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Closed] (FLINK-13840) Let StandaloneJobClusterEntrypoint use user code class loader
[ https://issues.apache.org/jira/browse/FLINK-13840?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guowei Ma closed FLINK-13840. - Resolution: Duplicate > Let StandaloneJobClusterEntrypoint use user code class loader > - > > Key: FLINK-13840 > URL: https://issues.apache.org/jira/browse/FLINK-13840 > Project: Flink > Issue Type: Improvement > Components: Runtime / Coordination >Affects Versions: 1.9.0, 1.10.0 >Reporter: Till Rohrmann >Priority: Major > Fix For: 1.10.0 > > > In order to resolve class loading issues when using the > {{StandaloneJobClusterEntryPoint}}, it would be better to run the user code > in the user code class loader, which supports child-first class loading. At > the moment, the user code jar is part of the system class path and, hence, > part of the system class loader. > An easy way to solve this problem would be to place the user code in a > different directory than {{lib}} and then specify this path as an additional > classpath when creating the {{PackagedProgram}}. -- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Commented] (FLINK-13993) Using FlinkUserCodeClassLoaders to load the user class in the perjob mode
[ https://issues.apache.org/jira/browse/FLINK-13993?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16927484#comment-16927484 ] Guowei Ma commented on FLINK-13993: --- Thanks for reminding me. I'll close it. [~till.rohrmann] -- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Commented] (FLINK-13993) Using FlinkUserCodeClassLoaders to load the user class in the perjob mode
[ https://issues.apache.org/jira/browse/FLINK-13993?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16930613#comment-16930613 ] Guowei Ma commented on FLINK-13993: --- I will do it. Thanks. [~till.rohrmann] -- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Updated] (FLINK-13993) Using FlinkUserCodeClassLoaders to load the user class in the perjob mode
[ https://issues.apache.org/jira/browse/FLINK-13993?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guowei Ma updated FLINK-13993: -- Remaining Estimate: 168h Original Estimate: 168h -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-13993) Using FlinkUserCodeClassLoaders to load the user class in the perjob mode
[ https://issues.apache.org/jira/browse/FLINK-13993?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guowei Ma updated FLINK-13993: -- Remaining Estimate: 144h (was: 168h) Original Estimate: 144h (was: 168h) -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-13993) Using FlinkUserCodeClassLoaders to load the user class in the perjob mode
[ https://issues.apache.org/jira/browse/FLINK-13993?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guowei Ma updated FLINK-13993: -- Remaining Estimate: 48h (was: 144h) Original Estimate: 48h (was: 144h) -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-13993) Using FlinkUserCodeClassLoaders to load the user class in the perjob mode
[ https://issues.apache.org/jira/browse/FLINK-13993?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guowei Ma updated FLINK-13993: -- Labels: pull-request-available (was: ) -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-13993) Using FlinkUserCodeClassLoaders to load the user class in the perjob mode
[ https://issues.apache.org/jira/browse/FLINK-13993?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guowei Ma updated FLINK-13993: -- Remaining Estimate: 30h (was: 48h) Original Estimate: 30h (was: 48h) -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-13993) Using FlinkUserCodeClassLoaders to load the user class in the perjob mode
[ https://issues.apache.org/jira/browse/FLINK-13993?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16933201#comment-16933201 ] Guowei Ma commented on FLINK-13993: --- Sorry, I changed the estimate. Actually, what I meant was one man-week. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Comment Edited] (FLINK-13993) Using FlinkUserCodeClassLoaders to load the user class in the perjob mode
[ https://issues.apache.org/jira/browse/FLINK-13993?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16933201#comment-16933201 ] Guowei Ma edited comment on FLINK-13993 at 9/19/19 9:25 AM: Sorry [~trohrmann], I changed the estimate. Actually, what I meant was one man-week. was (Author: maguowei): Sorry, I changed the estimate. Actually, what I meant was one man-week. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-12852) Deadlock occurs when requiring exclusive buffer for RemoteInputChannel
[ https://issues.apache.org/jira/browse/FLINK-12852?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16877524#comment-16877524 ] Guowei Ma commented on FLINK-12852: --- In the long run there may be another way to avoid the deadlock. A task could spill some buffers to disk and return the buffers to the local buffer pool. I think this could increase memory utilization most of the time. What do you guys think? > Deadlock occurs when requiring exclusive buffer for RemoteInputChannel > -- > > Key: FLINK-12852 > URL: https://issues.apache.org/jira/browse/FLINK-12852 > Project: Flink > Issue Type: Bug > Components: Runtime / Network >Affects Versions: 1.7.2, 1.8.1, 1.9.0 >Reporter: Yun Gao >Assignee: Yun Gao >Priority: Blocker > Labels: pull-request-available > Fix For: 1.9.0 > > Time Spent: 10m > Remaining Estimate: 0h > > When running tests with an upstream vertex and downstream vertex, deadlock > occurs when submitting the job: > {code:java} > "Sink: Unnamed (3/500)" #136 prio=5 os_prio=0 tid=0x7f2cca81b000 > nid=0x38845 waiting on condition [0x7f2cbe9fe000] > java.lang.Thread.State: TIMED_WAITING (parking) > at sun.misc.Unsafe.park(Native Method) > - parking to wait for <0x00073ed6b6f0> (a > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject) > at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:233) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078) > at java.util.concurrent.ArrayBlockingQueue.poll(ArrayBlockingQueue.java:418) > at > org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.requestMemorySegments(NetworkBufferPool.java:180) > at > org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.requestMemorySegments(NetworkBufferPool.java:54) > at > org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.assignExclusiveSegments(RemoteInputChannel.java:139) > at > 
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.assignExclusiveSegments(SingleInputGate.java:312) > - locked <0x00073fbc81f0> (a java.lang.Object) > at > org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.setup(SingleInputGate.java:220) > at > org.apache.flink.runtime.taskmanager.Task.setupPartionsAndGates(Task.java:836) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:598) > at java.lang.Thread.run(Thread.java:834) > {code} > This is because the required and max sizes of the local buffer pool are not > the same, so there may be over-allocation; when assignExclusiveSegments is > called, there is no available memory. > > The detailed scenario is as follows: the parallelisms of the upstream vertex > and the downstream vertex are 1000 and 500 respectively. There are 200 TMs > and each TM has 10696 buffers in total and has 10 slots. For a TM that runs > 9 upstream tasks and 1 downstream task, the 9 upstream tasks start first with > a local buffer pool \{required = 500, max = 2 * 500 + 8 = 1008}; they produce > data quickly and each occupies about 990 buffers. Then the downstream task > starts and tries to assign exclusive buffers for 1500 - 9 = 1491 > InputChannels. It requires 2981 buffers but only 1786 are left. Since not all > downstream tasks can start, the job is eventually blocked, no buffer can > be released, and the deadlock occurs. > > I think although increasing the network memory solves the problem, the > deadlock may not be acceptable. Fine-grained resource management > (FLINK-12761) can solve this problem, but AFAIK in 1.9 it will not include the > network memory in the ResourceProfile. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
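The buffer accounting in the scenario above can be checked with a few lines. All numbers come from the report; the 2-exclusive-buffers-per-channel constant is Flink's usual default and an assumption here, which is why the product (2982) is one off from the report's 2981:

```java
// Back-of-the-envelope check of the deadlock scenario described in FLINK-12852.
public class BufferAccounting {

    static int buffersLeft() {
        int buffersPerTm = 10696;          // per-TM network buffers (from the report)
        int upstreamTasks = 9;
        int buffersPerUpstreamTask = 990;  // observed occupancy, near the max of 2 * 500 + 8 = 1008
        return buffersPerTm - upstreamTasks * buffersPerUpstreamTask;
    }

    static int exclusiveBuffersNeeded() {
        int remoteChannels = 1500 - 9;     // 1491 input channels of the downstream task
        int exclusivePerChannel = 2;       // assumed default
        return remoteChannels * exclusivePerChannel;
    }

    public static void main(String[] args) {
        // left = 1786, needed = 2982: the request can never be satisfied,
        // and since the upstream buffers are never released, the job deadlocks.
        System.out.println("left = " + buffersLeft() + ", needed = " + exclusiveBuffersNeeded());
    }
}
```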
[jira] [Commented] (FLINK-13132) Allow ClusterEntrypoints use user main method to generate job graph
[ https://issues.apache.org/jira/browse/FLINK-13132?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16880053#comment-16880053 ] Guowei Ma commented on FLINK-13132: --- This is a very interesting optimization, but I have two questions. 1) Why do you only change Yarn? According to your description, there should be more than Yarn in your business system. Have you optimized the other systems? Could you say which systems you are switching between, such as Yarn, Mesos, or K8s? 2) In addition, even if you change to ClassPathJobGraphRetriever, you still need to upload the application and lib jars to the server and regenerate your JobGraph. Compared with the "client mode", there is no advantage in time because nothing is optimized, and the user may not feel any difference. Or does your business scenario have a certain speciality? Could you give some further explanation? > Allow ClusterEntrypoints use user main method to generate job graph > --- > > Key: FLINK-13132 > URL: https://issues.apache.org/jira/browse/FLINK-13132 > Project: Flink > Issue Type: Improvement > Components: Deployment / YARN >Affects Versions: 1.8.0, 1.8.1 >Reporter: Zhenqiu Huang >Assignee: Zhenqiu Huang >Priority: Minor > > We are building a service that can transparently deploy a job to different > cluster management systems, such as Yarn and another internal system. It is > very costly to download the jar and generate the JobGraph on the client side. > Thus, I want to propose an improvement to make the Yarn entrypoints > configurable to use either FileJobGraphRetriever or ClassPathJobGraphRetriever. > It is actually a long-standing TODO in AbstractYarnClusterDescriptor at line 834. > https://github.com/apache/flink/blob/21468e0050dc5f97de5cfe39885e0d3fd648e399/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java#L834 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-13132) Allow ClusterEntrypoints use user main method to generate job graph
[ https://issues.apache.org/jira/browse/FLINK-13132?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16883481#comment-16883481 ] Guowei Ma commented on FLINK-13132: --- [~suez1224] and [~ZhenqiuHuang]. Thanks for your detailed explanations. Your scenario makes sense to me. 1. Apart from the jars, how do you deal with the dependencies of the user's jobs when you move from one cluster to another? For example, the checkpoint data of the jobs and the sources and sinks of the user's jobs. Will your deployment service also move those dependencies for the users? 2. In the HA scenario, we could use the ability of the SubmittedGraphStore to resolve the non-deterministic problem. Before generating the new JobGraph, the Dispatcher could first check the SubmittedGraphStore and then decide whether to generate the JobGraph or not. I think [~till.rohrmann] could give more suggestions. Thanks. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Created] (FLINK-14464) Introduce the 'AbstractUserClassPathJobGraphRetriever' which could construct the 'user code class path' from the "job" dir.
Guowei Ma created FLINK-14464: - Summary: Introduce the 'AbstractUserClassPathJobGraphRetriever' which could construct the 'user code class path' from the "job" dir. Key: FLINK-14464 URL: https://issues.apache.org/jira/browse/FLINK-14464 Project: Flink Issue Type: Sub-task Components: Runtime / Coordination Affects Versions: 1.9.0, 1.10.0 Reporter: Guowei Ma -- This message was sent by Atlassian Jira (v8.3.4#803005)
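One way such a retriever could construct the user code class path is to scan the "job" directory for jar files. A minimal sketch under that assumption (class and method names are illustrative, not the actual FLINK-14464 implementation):

```java
import java.io.File;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: collect every jar directly under a "job" directory
// as URLs that can be handed to the user code class loader.
public class UserClassPathSketch {

    public static List<URL> userClassPath(File jobDir) throws MalformedURLException {
        List<URL> urls = new ArrayList<>();
        File[] files = jobDir.listFiles();
        if (files != null) {
            for (File f : files) {
                if (f.isFile() && f.getName().endsWith(".jar")) {
                    urls.add(f.toURI().toURL());
                }
            }
        }
        return urls;
    }
}
```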
[jira] [Created] (FLINK-14465) Let StandaloneJobClusterEntrypoint use user code class loader
Guowei Ma created FLINK-14465: - Summary: Let StandaloneJobClusterEntrypoint use user code class loader Key: FLINK-14465 URL: https://issues.apache.org/jira/browse/FLINK-14465 Project: Flink Issue Type: Sub-task Components: Runtime / Coordination Affects Versions: 1.9.0, 1.10.0 Reporter: Guowei Ma -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-14466) Let YarnJobClusterEntrypoint use user code class loader
Guowei Ma created FLINK-14466: - Summary: Let YarnJobClusterEntrypoint use user code class loader Key: FLINK-14466 URL: https://issues.apache.org/jira/browse/FLINK-14466 Project: Flink Issue Type: Sub-task Components: Runtime / Coordination Affects Versions: 1.9.0, 1.10.0 Reporter: Guowei Ma -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-14467) Let MesosJobClusterEntrypoint use user code class loader
Guowei Ma created FLINK-14467: - Summary: Let MesosJobClusterEntrypoint use user code class loader Key: FLINK-14467 URL: https://issues.apache.org/jira/browse/FLINK-14467 Project: Flink Issue Type: Sub-task Components: Runtime / Coordination Affects Versions: 1.9.0, 1.10.0 Reporter: Guowei Ma -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-13840) Let StandaloneJobClusterEntrypoint use user code class loader
[ https://issues.apache.org/jira/browse/FLINK-13840?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16920265#comment-16920265 ] Guowei Ma commented on FLINK-13840: --- As Till said, an easy way is to add a directory other than lib to the PackagedProgram. I wrote a draft doc about how to deal with this. Any comments are welcome! [https://docs.google.com/document/d/1SUhFt1BmsGMLUYVa72SWLbNrrWzunvcjAlEm8iusvq0/edit#] -- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Created] (FLINK-14991) Export `FLINK_HOME` environment variable to all the entrypoints
Guowei Ma created FLINK-14991: - Summary: Export `FLINK_HOME` environment variable to all the entrypoints Key: FLINK-14991 URL: https://issues.apache.org/jira/browse/FLINK-14991 Project: Flink Issue Type: Improvement Components: Runtime / Coordination Reporter: Guowei Ma Currently, Flink depends on 6 types of files: configuration files, system jar files, script files, library jar files, plugin jar files, and user jar files. These files are in different directories. Flink exports 5 environment variables to locate these different types of files: `FLINK_CONF_DIR`, `FLINK_LIB_DIR`, `FLINK_OPT_DIR`, `FLINK_PLUGIN_DIR`, and `FLINK_BIN_DIR`. Exporting an environment variable for every type of file is not good style. So this Jira proposes to export the `FLINK_HOME` environment variable to all the entrypoints, derive the directories of the different file types from the `FLINK_HOME` environment variable, and give every file type a fixed directory name. This also has the benefit that it implies the directory structure is the same in all situations. -- This message was sent by Atlassian Jira (v8.3.4#803005)
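A sketch of deriving the per-type directories from a single `FLINK_HOME`. The fixed directory names below mirror the standard Flink distribution layout and are assumptions here, not something the proposal has finalized:

```java
import java.nio.file.Path;
import java.nio.file.Paths;

// Sketch: derive every per-type directory from FLINK_HOME instead of
// exporting one environment variable per directory. The names conf/lib/opt/
// plugins/bin follow the usual distribution layout (an assumption here).
public class FlinkHomeLayout {
    public final Path conf;
    public final Path lib;
    public final Path opt;
    public final Path plugins;
    public final Path bin;

    public FlinkHomeLayout(String flinkHome) {
        Path home = Paths.get(flinkHome);
        conf = home.resolve("conf");
        lib = home.resolve("lib");
        opt = home.resolve("opt");
        plugins = home.resolve("plugins");
        bin = home.resolve("bin");
    }
}
```

An entrypoint would then read only `System.getenv("FLINK_HOME")` and build this layout, instead of reading five separate variables.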
[jira] [Created] (FLINK-15032) Remove the eager serialization from `RemoteRpcInvocation`
Guowei Ma created FLINK-15032: - Summary: Remove the eager serialization from `RemoteRpcInvocation` Key: FLINK-15032 URL: https://issues.apache.org/jira/browse/FLINK-15032 Project: Flink Issue Type: Improvement Components: Runtime / Coordination Reporter: Guowei Ma Currently, the constructor of `RemoteRpcInvocation` serializes the `parameterTypes` and `arg` of an RPC call. This could lead to two problems: # Consider a job that has 1k parallelism and a 1m union list state. When deploying the 1k tasks, the eager serialization would use 1G of memory instantly (sometimes the serialization amplifies the memory usage). However, the serialized object is only used when Akka sends the message. So we could reduce the memory pressure if we only serialized the object when the message is actually sent by Akka. # Furthermore, Akka serializes the message in the end anyway, and all the XXXGateway-related classes are visible at the RPC level. Because of that, I think the serialization in the constructor of `RemoteRpcInvocation` can be avoided. I also did a simple test and found this could reduce the time cost of the RPC call. For 1k RPC calls with a 1m `String` message: the current version costs around 2700ms; the non-serializing version costs about 37ms. In summary, this Jira proposes to remove the serialization in the constructor of `RemoteRpcInvocation`. -- This message was sent by Atlassian Jira (v8.3.4#803005)
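The deferral described above can be sketched as follows: keep the arguments as live objects and serialize them only when the invocation itself is serialized for the wire. This is illustrative only, using plain Java serialization; it is not the actual `RemoteRpcInvocation` code:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Sketch: defer payload serialization until the message crosses the wire,
// instead of serializing eagerly in the constructor.
public class LazyRpcInvocation implements Serializable {

    private transient Object[] args; // live objects; free for local sends
    private byte[] serializedArgs;   // filled only when the message is serialized

    public LazyRpcInvocation(Object[] args) {
        this.args = args; // no serialization cost here
    }

    // Called by Java serialization when (and only when) the message is sent remotely.
    private void writeObject(ObjectOutputStream out) throws IOException {
        if (serializedArgs == null) {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
                oos.writeObject(args);
            }
            serializedArgs = bos.toByteArray();
        }
        out.defaultWriteObject(); // writes serializedArgs (args is transient)
    }

    // On the receiving side, lazily deserialize the arguments on first access.
    public Object[] getArgs() throws IOException, ClassNotFoundException {
        if (args == null && serializedArgs != null) {
            try (ObjectInputStream ois =
                    new ObjectInputStream(new ByteArrayInputStream(serializedArgs))) {
                args = (Object[]) ois.readObject();
            }
        }
        return args;
    }
}
```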
[jira] [Updated] (FLINK-15032) Remove the eager serialization from `RemoteRpcInvocation`
[ https://issues.apache.org/jira/browse/FLINK-15032?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guowei Ma updated FLINK-15032: -- Description: Currently, the constructor of `RemoteRpcInvocation` serializes the `parameterTypes` and `arg` of an RPC call. This could lead to a problem: Consider a job that has 1k parallelism and has a 1m union list state. When deploying the 1k tasks, the eager serialization would use 1G memory instantly(Some time the serialization amplifies the memory usage). However, the serialized object is only used when the Akka sends the message. So we could reduce the memory pressure if we only serialize the object when the message would be sent by the Akka. Akka would serialize the message at last and all the XXXGateway related class could be visible by the RPC level. Because of that, I think the eager serialization in the constructor of `RemoteRpcInvocation` could be avoided. I also do a simple test and find this could reduce the time cost of the RPC call. The 1k number of RPC calls with 1m `String` message: The current version costs around 2700ms; the Nonserialization version cost about 37ms. In summary, this Jira proposes to remove the serialization at the constructor of `RemoteRpcInvocation`. was: Currently, the constructor of `RemoteRpcInvocation` serializes the `parameterTypes` and `arg` of an RPC call. This could lead to two problems: # Consider a job that has 1k parallelism and has a 1m union list state. When deploying the 1k tasks, the eager serialization would use 1G memory instantly(Some time the serialization amplifies the memory usage). However, the serialized object is only used when the Akka sends the message. So we could reduce the memory pressure if we only serialize the object when the message would be sent by the Akka. # Furthermore, Akka would serialize the message at last and all the XXXGateway related class could be visible by the RPC level. 
Because of that, I think the serialization in the constructor of `RemoteRpcInvocation` could be avoided. I also do a simple test and find this could reduce the time cost of the RPC call. The 1k number of RPC calls with 1m `String` message: The current version costs around 2700ms; the Nonserialization version cost about 37ms. In summary, this Jira proposes to remove the serialization at the constructor of `RemoteRpcInvocation`.
-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-15032) Remove the eager serialization from `RemoteRpcInvocation`
[ https://issues.apache.org/jira/browse/FLINK-15032?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guowei Ma updated FLINK-15032: -- Description: Currently, the constructor of `RemoteRpcInvocation` serializes the `parameterTypes` and `arg` of an RPC call. This could lead to a problem: Consider a job that has 1k parallelism and has a 1m union list state. When deploying the 1k tasks, the eager serialization would use 1G memory instantly(Some time the serialization amplifies the memory usage). However, the serialized object is only used when the Akka sends the message. So we could reduce the memory pressure if we only serialize the object when the message would be sent by the Akka. Akka would serialize the message at last and all the XXXGateway related class could be visible by the RPC level. Because of that, I think the eager serialization in the constructor of `RemoteRpcInvocation` could be avoided. I also do a simple test and find this could reduce the time cost of the RPC call. The 1k number of RPC calls with 1m `String` message: The current version costs around 2700ms; the Nonserialization version cost about 37ms. In summary, this Jira proposes to remove the eager serialization at the constructor of `RemoteRpcInvocation`. was: Currently, the constructor of `RemoteRpcInvocation` serializes the `parameterTypes` and `arg` of an RPC call. This could lead to a problem: Consider a job that has 1k parallelism and has a 1m union list state. When deploying the 1k tasks, the eager serialization would use 1G memory instantly(Some time the serialization amplifies the memory usage). However, the serialized object is only used when the Akka sends the message. So we could reduce the memory pressure if we only serialize the object when the message would be sent by the Akka. Akka would serialize the message at last and all the XXXGateway related class could be visible by the RPC level. 
Because of that, I think the eager serialization in the constructor of `RemoteRpcInvocation` could be avoided. I also do a simple test and find this could reduce the time cost of the RPC call. The 1k number of RPC calls with 1m `String` message: The current version costs around 2700ms; the Nonserialization version cost about 37ms. In summary, this Jira proposes to remove the serialization at the constructor of `RemoteRpcInvocation`.
[jira] [Updated] (FLINK-15032) Remove the eager serialization from `RemoteRpcInvocation`
[ https://issues.apache.org/jira/browse/FLINK-15032?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guowei Ma updated FLINK-15032: -- Summary: Remove the eager serialization from `RemoteRpcInvocation` (was: Remove the eagerly serialization from `RemoteRpcInvocation` )
[jira] [Commented] (FLINK-15032) Remove the eager serialization from `RemoteRpcInvocation`
[ https://issues.apache.org/jira/browse/FLINK-15032?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16986935#comment-16986935 ] Guowei Ma commented on FLINK-15032: --- Hi [~SleePy]
>>> Do you mean we could postpone the serialization till {{RemoteRpcInvocation#writeObject}}?
Yes.
>>> Could you explain it a bit more? Do you mean there is no visible issue of postponing the serialization? Or we don't need serialization anymore?
What I mean is that there is no visible issue of postponing the serialization.
>>> Do we still need serialization in {{writeObject}} after removing the serialization in the constructor?
The serialization is still needed.
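The postponed-serialization idea discussed above (serialize in {{writeObject}} rather than in the constructor) can be sketched as follows. This is a minimal illustration with hypothetical names, not Flink's actual `RemoteRpcInvocation`: the payload stays a live object locally and is only turned into bytes when Java serialization (and hence the transport) actually writes the message.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical sketch: the constructor does NOT serialize anything.
class LazyRpcInvocation implements Serializable {
    private static final long serialVersionUID = 1L;

    // transient: not written by defaultWriteObject; we handle it ourselves.
    private transient Object argument;

    LazyRpcInvocation(Object argument) {
        this.argument = argument; // kept as a live reference, no bytes yet
    }

    Object getArgument() {
        return argument;
    }

    // Serialization cost is paid only here, i.e. when the message is
    // actually written to the wire, not when the invocation is constructed.
    private void writeObject(ObjectOutputStream out) throws IOException {
        out.defaultWriteObject();
        out.writeObject(argument);
    }

    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        in.defaultReadObject();
        argument = in.readObject();
    }
}
```

With this shape, a local RPC call can hand the `argument` reference over directly, while a remote call pays the serialization cost exactly once, at send time.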
[jira] [Updated] (FLINK-10819) JobManagerHAProcessFailureRecoveryITCase.testDispatcherProcessFailure is unstable
[ https://issues.apache.org/jira/browse/FLINK-10819?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guowei Ma updated FLINK-10819: -- Attachment: image-2019-07-19-16-59-10-178.png > JobManagerHAProcessFailureRecoveryITCase.testDispatcherProcessFailure is > unstable > - > > Key: FLINK-10819 > URL: https://issues.apache.org/jira/browse/FLINK-10819 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination, Tests >Affects Versions: 1.7.0 >Reporter: sunjincheng >Assignee: Guowei Ma >Priority: Critical > Labels: test-stability > Fix For: 1.9.0 > > Attachments: image-2019-07-19-16-59-10-178.png > > > Found the following error in the process of CI: > Results : > Tests in error: > JobManagerHAProcessFailureRecoveryITCase.testDispatcherProcessFailure:331 » > IllegalArgument > Tests run: 1463, Failures: 0, Errors: 1, Skipped: 29 > 18:40:55.828 [INFO] > > 18:40:55.829 [INFO] BUILD FAILURE > 18:40:55.829 [INFO] > > 18:40:55.830 [INFO] Total time: 30:19 min > 18:40:55.830 [INFO] Finished at: 2018-11-07T18:40:55+00:00 > 18:40:56.294 [INFO] Final Memory: 92M/678M > 18:40:56.294 [INFO] > > 18:40:56.294 [WARNING] The requested profile "include-kinesis" could not be > activated because it does not exist. > 18:40:56.295 [ERROR] Failed to execute goal > org.apache.maven.plugins:maven-surefire-plugin:2.18.1:test > (integration-tests) on project flink-tests_2.11: There are test failures. > 18:40:56.295 [ERROR] > 18:40:56.295 [ERROR] Please refer to > /home/travis/build/sunjincheng121/flink/flink-tests/target/surefire-reports > for the individual test results. > 18:40:56.295 [ERROR] -> [Help 1] > 18:40:56.295 [ERROR] > 18:40:56.295 [ERROR] To see the full stack trace of the errors, re-run Maven > with the -e switch. > 18:40:56.295 [ERROR] Re-run Maven using the -X switch to enable full debug > logging. 
> 18:40:56.295 [ERROR] > 18:40:56.295 [ERROR] For more information about the errors and possible > solutions, please read the following articles: > 18:40:56.295 [ERROR] [Help 1] > http://cwiki.apache.org/confluence/display/MAVEN/MojoFailureException > MVN exited with EXIT CODE: 1. > Trying to KILL watchdog (11329). > ./tools/travis_mvn_watchdog.sh: line 269: 11329 Terminated watchdog > PRODUCED build artifacts. > But after the rerun, the error disappeared. > Currently,no specific reasons are found, and will continue to pay attention. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Updated] (FLINK-10819) JobManagerHAProcessFailureRecoveryITCase.testDispatcherProcessFailure is unstable
[ https://issues.apache.org/jira/browse/FLINK-10819?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guowei Ma updated FLINK-10819: -- Attachment: image-2019-07-19-17-00-17-194.png
[jira] [Updated] (FLINK-10819) JobManagerHAProcessFailureRecoveryITCase.testDispatcherProcessFailure is unstable
[ https://issues.apache.org/jira/browse/FLINK-10819?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guowei Ma updated FLINK-10819: -- Attachment: image-2019-07-19-17-01-19-758.png
[jira] [Commented] (FLINK-10819) JobManagerHAProcessFailureRecoveryITCase.testDispatcherProcessFailure is unstable
[ https://issues.apache.org/jira/browse/FLINK-10819?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16889486#comment-16889486 ] Guowei Ma commented on FLINK-10819: --- It might be a bug.
1. The test times out because two "READY_MARKER_FILE_PREFIX" files are missing.
2. The two tasks responsible for creating those files cannot be deployed because no resources are available. !image-2019-07-19-17-01-19-758.png!
3. The slots from one TM (34dbf0f8264469af49be8e1dbc2ad811) are not recognized by the SlotManager, so the two tasks cannot be deployed. !image-2019-07-19-17-00-17-194.png!
4. The TM (34dbf0f8264469af49be8e1dbc2ad811) registers with the RM twice. !image-2019-07-19-16-59-10-178.png! The RM sends two RegistrationResponses to the TM, but the TM handles them on different threads, so the registrationId from the old RegistrationResponse overrides the registrationId from the new one.
A simple fix is to process the responses on the main thread on the TM side. I am still considering whether there is another approach.
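The fix idea mentioned in the comment above (handle every RegistrationResponse on the TM's main thread) can be sketched like this. All names here are hypothetical, not Flink's actual classes: responses may arrive on arbitrary RPC threads, but each one is hopped onto a single-threaded executor, where a stale response from an earlier registration attempt is detected and dropped instead of overriding the newer id.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of single-threaded registration handling.
class RegistrationHandler {
    private final ExecutorService mainThread = Executors.newSingleThreadExecutor();
    private final AtomicLong attemptCounter = new AtomicLong();

    private long currentAttempt;   // only touched on mainThread
    private String registrationId; // only touched on mainThread

    /** Starts a new registration attempt and returns its id. */
    long newAttempt() {
        long attempt = attemptCounter.incrementAndGet();
        mainThread.submit(() -> currentAttempt = attempt);
        return attempt;
    }

    /** May be called from any RPC thread; hops to the main thread. */
    void onRegistrationResponse(long attempt, String newRegistrationId) {
        mainThread.submit(() -> {
            if (attempt != currentAttempt) {
                return; // stale response from an earlier attempt: ignore it
            }
            registrationId = newRegistrationId;
        });
    }

    /** Reads the current id via the main thread, so no locking is needed. */
    String getRegistrationId() throws Exception {
        return mainThread.submit(() -> registrationId).get();
    }
}
```

Because every mutation goes through one executor, the "old response arrives last" interleaving cannot corrupt the state: the attempt check runs on the same thread that set `currentAttempt`.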
[jira] [Comment Edited] (FLINK-10819) JobManagerHAProcessFailureRecoveryITCase.testDispatcherProcessFailure is unstable
[ https://issues.apache.org/jira/browse/FLINK-10819?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16889486#comment-16889486 ] Guowei Ma edited comment on FLINK-10819 at 7/20/19 11:56 AM: - It might be a bug, according to https://api.travis-ci.org/v3/job/508500560/log.txt
1. The test times out because two "READY_MARKER_FILE_PREFIX" files are missing.
2. The two tasks responsible for creating those files cannot be deployed because no resources are available. !image-2019-07-19-17-01-19-758.png!
3. The slots from one TM (34dbf0f8264469af49be8e1dbc2ad811) are not recognized by the SlotManager, so the two tasks cannot be deployed. !image-2019-07-19-17-00-17-194.png!
4. The TM (34dbf0f8264469af49be8e1dbc2ad811) registers with the RM twice. !image-2019-07-19-16-59-10-178.png! The RM sends two RegistrationResponses to the TM, but the TM handles them on different threads, so the registrationId from the old RegistrationResponse overrides the registrationId from the new one. A simple fix is to process the responses on the main thread on the TM side. I am still considering whether there is another approach.
was (Author: maguowei): It might be a bug. 1. The test is time out because two "READY_MARKER_FILE_PREFIX" files are missing. 2. Two tasks, which response for creating the two files can't be deployed because the resource is not available. !image-2019-07-19-17-01-19-758.png! 3. The slots from one TM(34dbf0f8264469af49be8e1dbc2ad811) are not recognized by SlotManger. Since this, the two tasks can't be deployed. !image-2019-07-19-17-00-17-194.png! 4. The TM(34dbf0f8264469af49be8e1dbc2ad811) registers to RM twice. !image-2019-07-19-16-59-10-178.png! The RM responses two RegistrationResponses to TM. But TM uses different threads to deal with RegistrationResponse. The registrationId of old RegistrationResponse override the registrationId of new RegistrationResponse.
The simple idea is to use the main thread to process on the TM side. I am still thinking about it if there is another method.
[jira] [Created] (FLINK-13426) TaskExecutor uses the wrong RegistrationId in the heartbeat with RM.
Guowei Ma created FLINK-13426: - Summary: TaskExecutor uses the wrong RegistrationId in the heartbeat with RM. Key: FLINK-13426 URL: https://issues.apache.org/jira/browse/FLINK-13426 Project: Flink Issue Type: Bug Components: Runtime / Coordination Affects Versions: 1.8.1, 1.9.0 Reporter: Guowei Ma
1. The first time, the TaskExecutor registers with the RM successfully. If it then fails to send the SlotReport to the SlotManager, the TaskExecutor reconnects to the RM. However, the TaskExecutor still keeps the old registration id in the EstablishedResourceManagerConnection.
2. The second time, the TaskExecutor registers with the RM successfully and gets a new registration id.
3. The first and second rounds have a race condition, so the TaskExecutor may use the old registration id in its heartbeat with the RM.
[jira] [Updated] (FLINK-13426) TaskExecutor uses the wrong RegistrationId in the heartbeat with RM.
[ https://issues.apache.org/jira/browse/FLINK-13426?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guowei Ma updated FLINK-13426: -- Attachment: image-2019-07-25-17-57-03-537.png
[jira] [Updated] (FLINK-13426) TaskExecutor uses the wrong RegistrationId in the heartbeat with RM.
[ https://issues.apache.org/jira/browse/FLINK-13426?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guowei Ma updated FLINK-13426: -- Description: 1. The first time, the TaskExecutor registers with the RM successfully. If it then fails to send the SlotReport to the SlotManager, the TaskExecutor reconnects to the RM. However, the TaskExecutor still keeps the old registration id in the EstablishedResourceManagerConnection.
2. The second time, the TaskExecutor registers with the RM successfully and gets a new registration id.
3. The first and second rounds have a race condition, so the TaskExecutor may use the old registration id in its heartbeat with the RM.
!image-2019-07-25-17-57-03-537.png!
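One way to avoid the stale-id heartbeat described in this issue is to keep the registration id inside the connection object itself, so that reconnecting (which replaces the connection) automatically replaces the id the heartbeat reads. Below is a minimal sketch with hypothetical names, not Flink's actual `EstablishedResourceManagerConnection`:

```java
// Hypothetical sketch: the registration id lives in the connection object,
// so a reconnect that swaps the connection also swaps the id.
class ResourceManagerConnection {
    final String registrationId;

    ResourceManagerConnection(String registrationId) {
        this.registrationId = registrationId;
    }
}

class TaskExecutorSketch {
    // volatile: a heartbeat thread always sees the latest connection.
    private volatile ResourceManagerConnection established;

    void onRegistrationSuccess(String newRegistrationId) {
        // Reconnecting replaces the whole connection, id included.
        established = new ResourceManagerConnection(newRegistrationId);
    }

    String heartbeatRegistrationId() {
        ResourceManagerConnection c = established;
        // Never cache the id elsewhere: read it from the live connection.
        return c == null ? null : c.registrationId;
    }
}
```

Since the id is never copied into a separate field, there is no second place for the old value to survive a reconnect.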
[jira] [Comment Edited] (FLINK-10819) JobManagerHAProcessFailureRecoveryITCase.testDispatcherProcessFailure is unstable
[ https://issues.apache.org/jira/browse/FLINK-10819?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16889486#comment-16889486 ] Guowei Ma edited comment on FLINK-10819 at 7/25/19 10:05 AM: - It might be a bug according to [https://api.travis-ci.org/v3/job/508500560/log.txt] 1. The test is time out because two "READY_MARKER_FILE_PREFIX" files are missing. 2. Two tasks, which response for creating the two files can't be deployed because the resource is not available. !image-2019-07-19-17-01-19-758.png! 3. However, the reason why RM has not allocated resources to the two tasks is still uncertain. Because two task executors have been registered at the first time; there must be 4 free slots. !image-2019-07-19-17-00-17-194.png! I have read a few logs, and all have the following three characteristics. 1. Always have a TM registered twice; 2. The taskexecutor, which registers twice uses the old RegistrationID in the heartbeat. I already open a Jira [FLINK-13426] 3. When the job starts, it applies for 4 slots at the beginning. However, only one TM is available. During the execution of the Source, the SlotPool cancels the two Slot requests that have not come. In the next execution, the Scheduler issues two SlotRequests, but the two SlotRequests don't return until timeout. was (Author: maguowei): It might be a bug according to https://api.travis-ci.org/v3/job/508500560/log.txt 1. The test is time out because two "READY_MARKER_FILE_PREFIX" files are missing. 2. Two tasks, which response for creating the two files can't be deployed because the resource is not available. !image-2019-07-19-17-01-19-758.png! 3. The slots from one TM(34dbf0f8264469af49be8e1dbc2ad811) are not recognized by SlotManger. Since this, the two tasks can't be deployed. !image-2019-07-19-17-00-17-194.png! 4. The TM(34dbf0f8264469af49be8e1dbc2ad811) registers to RM twice. !image-2019-07-19-16-59-10-178.png! The RM responses two RegistrationResponses to TM. 
But TM uses different threads to deal with RegistrationResponse. The registrationId of old RegistrationResponse override the registrationId of new RegistrationResponse. The simple idea is to use the main thread to process on the TM side. I am still thinking about it if there is another method. > JobManagerHAProcessFailureRecoveryITCase.testDispatcherProcessFailure is > unstable > - > > Key: FLINK-10819 > URL: https://issues.apache.org/jira/browse/FLINK-10819 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination, Tests >Affects Versions: 1.7.0 >Reporter: sunjincheng >Assignee: Guowei Ma >Priority: Critical > Labels: test-stability > Fix For: 1.9.0 > > Attachments: image-2019-07-19-16-59-10-178.png, > image-2019-07-19-17-00-17-194.png, image-2019-07-19-17-01-19-758.png > > > Found the following error in the process of CI: > Results : > Tests in error: > JobManagerHAProcessFailureRecoveryITCase.testDispatcherProcessFailure:331 » > IllegalArgument > Tests run: 1463, Failures: 0, Errors: 1, Skipped: 29 > 18:40:55.828 [INFO] > > 18:40:55.829 [INFO] BUILD FAILURE > 18:40:55.829 [INFO] > > 18:40:55.830 [INFO] Total time: 30:19 min > 18:40:55.830 [INFO] Finished at: 2018-11-07T18:40:55+00:00 > 18:40:56.294 [INFO] Final Memory: 92M/678M > 18:40:56.294 [INFO] > > 18:40:56.294 [WARNING] The requested profile "include-kinesis" could not be > activated because it does not exist. > 18:40:56.295 [ERROR] Failed to execute goal > org.apache.maven.plugins:maven-surefire-plugin:2.18.1:test > (integration-tests) on project flink-tests_2.11: There are test failures. > 18:40:56.295 [ERROR] > 18:40:56.295 [ERROR] Please refer to > /home/travis/build/sunjincheng121/flink/flink-tests/target/surefire-reports > for the individual test results. > 18:40:56.295 [ERROR] -> [Help 1] > 18:40:56.295 [ERROR] > 18:40:56.295 [ERROR] To see the full stack trace of the errors, re-run Maven > with the -e switch. 
> 18:40:56.295 [ERROR] Re-run Maven using the -X switch to enable full debug > logging. > 18:40:56.295 [ERROR] > 18:40:56.295 [ERROR] For more information about the errors and possible > solutions, please read the following articles: > 18:40:56.295 [ERROR] [Help 1] > http://cwiki.apache.org/confluence/display/MAVEN/MojoFailureException > MVN exited with EXIT CODE: 1. > Trying to KILL watchdog (11329). > ./tools/travis_mvn_watchdog.sh: line 269: 11329 Terminated watchdog > PRODUCED build artifacts. > But after the rerun, the error disappeared. > Currently, no specific reasons are found, and we will continue to pay attention. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
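The "use the main thread to process on the TM side" idea above can be sketched with a minimal, self-contained model: all RegistrationResponses are funneled through one single-threaded executor, so they are applied in arrival order and a stale response cannot override a newer registration id. The class and method names here are illustrative assumptions, not Flink's actual TaskExecutor API.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Sketch: serialize RegistrationResponse handling on one "main thread"
// so responses cannot race and overwrite each other out of order.
public class MainThreadRegistrationSketch {
    static final ExecutorService mainThread = Executors.newSingleThreadExecutor();
    static final AtomicReference<String> registrationId = new AtomicReference<>();

    static void onRegistrationResponse(String newId) {
        // Every response is applied by the same single thread, in submission order.
        mainThread.execute(() -> registrationId.set(newId));
    }

    public static void main(String[] args) throws InterruptedException {
        onRegistrationResponse("registration-1"); // response to the first (stale) registration
        onRegistrationResponse("registration-2"); // response to the second registration wins
        mainThread.shutdown();
        mainThread.awaitTermination(5, TimeUnit.SECONDS);
        System.out.println(registrationId.get()); // registration-2
    }
}
```

With concurrent callback threads instead of a single executor, either response could be applied last; the single-threaded model removes that race.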
[jira] [Commented] (FLINK-10819) JobManagerHAProcessFailureRecoveryITCase.testDispatcherProcessFailure is unstable
[ https://issues.apache.org/jira/browse/FLINK-10819?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16892620#comment-16892620 ] Guowei Ma commented on FLINK-10819: --- I think we could add some status logging to the SlotManager, like the SlotPool has. It would help to find out why the RM cannot fulfill the slot requests, and to inspect the slot status. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Commented] (FLINK-13426) TaskExecutor uses the wrong Registrationid in the heartbeat with RM.
[ https://issues.apache.org/jira/browse/FLINK-13426?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16895684#comment-16895684 ] Guowei Ma commented on FLINK-13426: --- The TaskExecutor should only use the taskExecutorRegistrationId after it has successfully sent the SlotReport for the first time; otherwise it might use the old one in the heartbeat with the ResourceManager. Therefore, the TaskExecutor should start monitoring the ResourceManager in the completion callback of slotReportResponseFuture. > TaskExecutor uses the wrong Registrationid in the heartbeat with RM. > > > Key: FLINK-13426 > URL: https://issues.apache.org/jira/browse/FLINK-13426 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Affects Versions: 1.8.1, 1.9.0 >Reporter: Guowei Ma >Priority: Minor > Attachments: image-2019-07-25-17-57-03-537.png > > > 1. The first time, the TaskExecutor registers to the RM successfully. If it then > fails to send the SlotReport to the SlotManager, the TaskExecutor will reconnect > to the RM. However, the TaskExecutor still uses the old registration id in the > EstablishedResourceManagerConnection. > 2. The second time, the TaskExecutor registers to the RM successfully and gets a new > registration id. > 3. The first and second rounds have a race condition, so the TaskExecutor > may use the old registration id in the heartbeat with the RM. > > !image-2019-07-25-17-57-03-537.png! -- This message was sent by Atlassian JIRA (v7.6.14#76016)
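The proposed ordering can be modeled with a CompletableFuture: the TaskExecutor adopts the new registration id for heartbeating only inside the completion callback of the slot report. This is a minimal sketch; `registerAndReport` and `heartbeatRegistrationId` are illustrative names, not Flink's actual API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

// Sketch: the new registration id only becomes visible to the heartbeat
// once the slot report has been acknowledged, so a failed report never
// leaves a half-established connection using a stale id.
public class RegistrationSketch {
    static final AtomicReference<String> heartbeatRegistrationId = new AtomicReference<>("old-id");

    static CompletableFuture<Void> registerAndReport(String newRegistrationId,
                                                     CompletableFuture<Void> slotReportResponseFuture) {
        // Adopt the new id only in the completion callback of the slot report.
        return slotReportResponseFuture.thenRun(
                () -> heartbeatRegistrationId.set(newRegistrationId));
    }

    public static void main(String[] args) {
        CompletableFuture<Void> report = new CompletableFuture<>();
        registerAndReport("new-id", report);
        System.out.println(heartbeatRegistrationId.get()); // still "old-id": report not yet acknowledged
        report.complete(null);
        System.out.println(heartbeatRegistrationId.get()); // "new-id"
    }
}
```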
[jira] [Created] (FLINK-13480) Export SlotManager status at debug mode.
Guowei Ma created FLINK-13480: - Summary: Export SlotManager status at debug mode. Key: FLINK-13480 URL: https://issues.apache.org/jira/browse/FLINK-13480 Project: Flink Issue Type: Bug Components: Runtime / Coordination Affects Versions: 1.9.0, 1.10 Reporter: Guowei Ma It is very difficult to resolve some resource allocation issues, for example FLINK-10819. One reason is that the status of the SlotManager is very hard to observe. I think we could save a lot of time when troubleshooting such problems if the SlotManager status could be exported to the log. So I propose to export the SlotManager status when debug logging is enabled. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
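The proposal above amounts to a debug-guarded status dump, so the extra string building costs nothing when debug logging is off. A minimal self-contained sketch follows; the slot fields and method names are illustrative, not the real SlotManager's internals.

```java
import java.util.Map;
import java.util.TreeMap;

// Sketch: dump a SlotManager-style status string only when debug is enabled.
public class SlotStatusLogSketch {
    private final Map<String, String> slots = new TreeMap<>(); // slotId -> state

    void register(String slotId, String state) { slots.put(slotId, state); }

    String statusString() {
        StringBuilder sb = new StringBuilder("SlotManager status:");
        slots.forEach((id, st) -> sb.append(' ').append(id).append('=').append(st));
        return sb.toString();
    }

    void maybeLogStatus(boolean debugEnabled) {
        if (debugEnabled) {               // mirrors the LOG.isDebugEnabled() guard
            System.out.println(statusString());
        }
    }

    public static void main(String[] args) {
        SlotStatusLogSketch sm = new SlotStatusLogSketch();
        sm.register("slot-1", "FREE");
        sm.register("slot-2", "ALLOCATED");
        sm.maybeLogStatus(true);  // prints the status line
        sm.maybeLogStatus(false); // prints nothing
    }
}
```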
[jira] [Updated] (FLINK-13480) Export SlotManager status at debug mode.
[ https://issues.apache.org/jira/browse/FLINK-13480?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guowei Ma updated FLINK-13480: -- Issue Type: Improvement (was: Bug) -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Commented] (FLINK-13491) AsyncWaitOperator doesn't handle endInput call properly
[ https://issues.apache.org/jira/browse/FLINK-13491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16895937#comment-16895937 ] Guowei Ma commented on FLINK-13491: --- Currently, OperatorChain calls every operator's "endInput" together. An alternative would be to interleave the calls: {code:java} operator1.endInput(); operator1.close(); operator2.endInput(); operator2.close(); {code} This would flush the buffered data in each operator. > AsyncWaitOperator doesn't handle endInput call properly > --- > > Key: FLINK-13491 > URL: https://issues.apache.org/jira/browse/FLINK-13491 > Project: Flink > Issue Type: Bug > Components: API / DataStream >Affects Versions: 1.9.0 >Reporter: Piotr Nowojski >Priority: Blocker > Fix For: 1.9.0 > > > This is the same issue as for {{ContinousFileReaderOperator}} in > https://issues.apache.org/jira/browse/FLINK-13376. {{AsyncWaitOperator}} will > propagate {{endInput}} notification immediately, even if it has some records > buffered. > Side note, this also shows that the current {{BoundedOneInput#endInput}} API > is easy to mishandle if an operator buffers some records internally. Maybe we > could redesign this API somehow [~aljoscha] [~sunhaibotb]? -- This message was sent by Atlassian JIRA (v7.6.14#76016)
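The interleaved shutdown suggested above can be sketched with a minimal self-contained model; the Operator interface and shutdown loop here are illustrative, not Flink's actual OperatorChain API.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch: interleave endInput/close so each operator flushes its buffered
// records before the next operator's endInput is delivered.
public class ChainShutdownSketch {
    interface Operator {
        void endInput();   // flush any buffered records downstream
        void close();      // release resources
    }

    static final List<String> calls = new ArrayList<>();

    static Operator op(String name) {
        return new Operator() {
            public void endInput() { calls.add(name + ".endInput"); }
            public void close()    { calls.add(name + ".close"); }
        };
    }

    // Interleaved shutdown: operator1 is fully drained and closed before
    // operator2 is told its input has ended.
    static void shutdown(List<Operator> chain) {
        for (Operator o : chain) {
            o.endInput();
            o.close();
        }
    }

    public static void main(String[] args) {
        shutdown(List.of(op("operator1"), op("operator2")));
        System.out.println(calls);
        // [operator1.endInput, operator1.close, operator2.endInput, operator2.close]
    }
}
```

Calling all endInput notifications "together" (before any close) is what lets buffered records in an upstream operator be lost; the loop above forces the flush-then-close order per operator.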
[jira] [Assigned] (FLINK-11985) Remove ignored command line parameter from yarn_setup.md
[ https://issues.apache.org/jira/browse/FLINK-11985?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guowei Ma reassigned FLINK-11985: - Assignee: Guowei Ma > Remove ignored command line parameter from yarn_setup.md > > > Key: FLINK-11985 > URL: https://issues.apache.org/jira/browse/FLINK-11985 > Project: Flink > Issue Type: Task > Components: Documentation >Reporter: Guowei Ma >Assignee: Guowei Ma >Priority: Trivial > > In "Flink Yarn Session" mode the command line parameter "-n, --container" is > already ignored. It would be friendlier for users if the documentation were > consistent with the implementation. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-11985) Remove ignored command line parameter from yarn_setup.md
Guowei Ma created FLINK-11985: - Summary: Remove ignored command line parameter from yarn_setup.md Key: FLINK-11985 URL: https://issues.apache.org/jira/browse/FLINK-11985 Project: Flink Issue Type: Task Components: Documentation Reporter: Guowei Ma In "Flink Yarn Session" mode the command line parameter "-n, --container" is already ignored. It would be friendlier for users if the documentation were consistent with the implementation. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-12037) High parallelism E2E test failed on Travis
[ https://issues.apache.org/jira/browse/FLINK-12037?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guowei Ma updated FLINK-12037: -- Attachment: image-2019-04-01-16-33-17-540.png > High parallelism E2E test failed on Travis > -- > > Key: FLINK-12037 > URL: https://issues.apache.org/jira/browse/FLINK-12037 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination, Tests >Affects Versions: 1.8.1 >Reporter: Chesnay Schepler >Priority: Major > Labels: test-stability > Attachments: image-2019-04-01-16-33-17-540.png > > > https://travis-ci.org/apache/flink/jobs/511335563 > {code} > org.apache.flink.client.program.ProgramInvocationException: Job failed. > (JobID: f149b50ac8c6291371ce54b30fb0a072) > at > org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:268) > at > org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:483) > at > org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:471) > at > org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:62) > at > org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:817) > at > org.apache.flink.batch.HighParallelismIterationsTestProgram.main(HighParallelismIterationsTestProgram.java:73) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529) > at > org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421) > at > org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:423) > at > org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:813) > at > 
org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:287) > at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213) > at > org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050) > at > org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126) > at java.security.AccessController.doPrivileged(Native Method) > at javax.security.auth.Subject.doAs(Subject.java:422) > at > org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836) > at > org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) > at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126) > Caused by: org.apache.flink.runtime.client.JobExecutionException: Job > execution failed. > at > org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146) > at > org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:265) > ... 22 more > Caused by: org.apache.flink.util.FlinkException: TaskExecutor > akka.tcp://flink@127.0.1.1:37009/user/taskmanager_0 has no more allocated > slots for job f149b50ac8c6291371ce54b30fb0a072. 
> at > org.apache.flink.runtime.taskexecutor.TaskExecutor.freeSlotInternal(TaskExecutor.java:1401) > at > org.apache.flink.runtime.taskexecutor.TaskExecutor.timeoutSlot(TaskExecutor.java:1418) > at > org.apache.flink.runtime.taskexecutor.TaskExecutor.access$2600(TaskExecutor.java:136) > at > org.apache.flink.runtime.taskexecutor.TaskExecutor$SlotActionsImpl.lambda$timeoutSlot$1(TaskExecutor.java:1609) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:392) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:185) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:147) > at > akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165) > at akka.actor.Actor$class.aroundReceive(Actor.scala:502) > at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95) > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526) > at akka.actor.ActorCell.invoke(ActorCell.scala:495) > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257) > at akka.dispatch.Mailbox.run(Mailbox.scala:224) > at akka.dispatch.Mailbox.exec(Mailbox.scala:234) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at
[jira] [Commented] (FLINK-12037) High parallelism E2E test failed on Travis
[ https://issues.apache.org/jira/browse/FLINK-12037?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16806517#comment-16806517 ] Guowei Ma commented on FLINK-12037: --- # A slot is allocated, but the JM does not deploy any task on it. # Because of that, the TaskManager frees the slot (after 10 s). # The JM then deploys the task to the TM, which no longer has any resource for it. !image-2019-04-01-16-33-17-540.png! I think this is only the direct cause, not the root cause, because I don't understand why the JM requests the slot but does not deploy a task on it. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Comment Edited] (FLINK-12037) High parallelism E2E test failed on Travis
[ https://issues.apache.org/jira/browse/FLINK-12037?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16806517#comment-16806517 ] Guowei Ma edited comment on FLINK-12037 at 4/1/19 8:45 AM: --- # A slot is allocated, but the JM does not deploy any task on it. # Because of that, the TaskManager frees the slot (after 10 s). # The JM then deploys the task to the TM, which no longer has any resource for it. !image-2019-04-01-16-33-17-540.png! was (Author: maguowei): # A slot is allocated but jm don't deploy any task on it. # Since that TaskManager free the slot.(after 10s) # JM deploy the task to the TM which don't have any resource for it. !image-2019-04-01-16-33-17-540.png! I think it just the direct reason not root cause. Because I don't understand why jm requests the slot but don't deploy task on it. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Issue Comment Deleted] (FLINK-12037) High parallelism E2E test failed on Travis
[ https://issues.apache.org/jira/browse/FLINK-12037?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guowei Ma updated FLINK-12037: -- Comment: was deleted (was: # A slot is allocated but jm don't deploy any task on it. # Since that TaskManager free the slot.(after 10s) # JM deploy the task to the TM which don't have any resource for it. !image-2019-04-01-16-33-17-540.png! ) -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Updated] (FLINK-12037) High parallelism E2E test failed on Travis
[ https://issues.apache.org/jira/browse/FLINK-12037?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guowei Ma updated FLINK-12037: -- Attachment: (was: image-2019-04-01-16-33-17-540.png) -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Commented] (FLINK-12037) High parallelism E2E test failed on Travis
[ https://issues.apache.org/jira/browse/FLINK-12037?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16806596#comment-16806596 ] Guowei Ma commented on FLINK-12037: --- From the log: the TaskManager (b15f5702d9e1cc57e353824b3ec4b792) does not get a response from the JobManager after it offers its slot. Because of that, the TaskManager releases the slot and closes its connection to the JobManager. > High parallelism E2E test failed on Travis > -- > > Key: FLINK-12037 > URL: https://issues.apache.org/jira/browse/FLINK-12037 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination, Tests >Affects Versions: 1.8.1 >Reporter: Chesnay Schepler >Priority: Major > Labels: test-stability > > https://travis-ci.org/apache/flink/jobs/511335563 > {code} > org.apache.flink.client.program.ProgramInvocationException: Job failed. > (JobID: f149b50ac8c6291371ce54b30fb0a072) > at > org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:268) > at > org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:483) > at > org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:471) > at > org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:62) > at > org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:817) > at > org.apache.flink.batch.HighParallelismIterationsTestProgram.main(HighParallelismIterationsTestProgram.java:73) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529) > at > org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421) > at > 
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:423) > at > org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:813) > at > org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:287) > at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213) > at > org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050) > at > org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126) > at java.security.AccessController.doPrivileged(Native Method) > at javax.security.auth.Subject.doAs(Subject.java:422) > at > org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836) > at > org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) > at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126) > Caused by: org.apache.flink.runtime.client.JobExecutionException: Job > execution failed. > at > org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146) > at > org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:265) > ... 22 more > Caused by: org.apache.flink.util.FlinkException: TaskExecutor > akka.tcp://flink@127.0.1.1:37009/user/taskmanager_0 has no more allocated > slots for job f149b50ac8c6291371ce54b30fb0a072. 
> at > org.apache.flink.runtime.taskexecutor.TaskExecutor.freeSlotInternal(TaskExecutor.java:1401) > at > org.apache.flink.runtime.taskexecutor.TaskExecutor.timeoutSlot(TaskExecutor.java:1418) > at > org.apache.flink.runtime.taskexecutor.TaskExecutor.access$2600(TaskExecutor.java:136) > at > org.apache.flink.runtime.taskexecutor.TaskExecutor$SlotActionsImpl.lambda$timeoutSlot$1(TaskExecutor.java:1609) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:392) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:185) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:147) > at > akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165) > at akka.actor.Actor$class.aroundReceive(Actor.scala:502) > at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95) > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526) > at akka.actor.ActorCell.invoke(ActorCell.scala:495) > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257) > at akka.dispatch.Mailbox.run(Mailbox.scala:224) > at akka.dispatch.Mailbox.exec(Mailbox.scala:234) > at scala.concurrent.fork
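The TaskManager behaviour described in the comment above (offer the slot to the JobManager, wait for an acknowledgment, free the slot when no answer arrives in time) can be sketched as a small timer-based model. This is an illustrative sketch only; the class and method names are invented and this is not Flink's actual TaskExecutor code:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Simplified model of the log above: after offering a slot to the
// JobManager, the TaskManager arms a timer and frees the slot if no
// acknowledgment arrives before the timeout (~10 s in the report;
// configurable here).
class SlotOfferTimeout {

    private final ScheduledExecutorService timer =
            Executors.newSingleThreadScheduledExecutor();
    private volatile boolean slotFree = false;
    private ScheduledFuture<?> pendingTimeout;

    // "Offer reserved slots to the leader of job": arm the timeout.
    public synchronized void offerSlot(long timeoutMillis) {
        pendingTimeout = timer.schedule(
                this::freeSlot, timeoutMillis, TimeUnit.MILLISECONDS);
    }

    // The JobManager accepted the offer in time: disarm the timeout.
    public synchronized void acknowledgeOffer() {
        if (pendingTimeout != null) {
            pendingTimeout.cancel(false);
        }
    }

    // "Free slot TaskSlot": the path taken when the JM never answered.
    private synchronized void freeSlot() {
        slotFree = true;
    }

    public boolean isSlotFree() {
        return slotFree;
    }

    public void shutdown() {
        timer.shutdownNow();
    }

    static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

In the failing run the acknowledgment never arrived within the 10-second window while the JobManager was busy deploying tasks, so the timeout path fired, the slot was freed, and the job later failed with "has no more allocated slots".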
[jira] [Comment Edited] (FLINK-12037) High parallelism E2E test failed on Travis
[ https://issues.apache.org/jira/browse/FLINK-12037?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16806596#comment-16806596 ] Guowei Ma edited comment on FLINK-12037 at 4/1/19 10:10 AM: From the log: the TaskManager (b15f5702d9e1cc57e353824b3ec4b792) does not get a response from the JobManager after it offers its slot. Because of that, the TaskManager releases the slot and closes its connection to the JobManager after 10 seconds. was (Author: maguowei): From the log: The TaskManager(b15f5702d9e1cc57e353824b3ec4b792) doesn't get response from jm after it offer its' slot. Since that the taskmanager release the slot and close the connection with jm. > High parallelism E2E test failed on Travis > -- > > Key: FLINK-12037 > URL: https://issues.apache.org/jira/browse/FLINK-12037 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination, Tests >Affects Versions: 1.8.1 >Reporter: Chesnay Schepler >Priority: Major > Labels: test-stability > > https://travis-ci.org/apache/flink/jobs/511335563 > {code} > org.apache.flink.client.program.ProgramInvocationException: Job failed. 
> (JobID: f149b50ac8c6291371ce54b30fb0a072) > at > org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:268) > at > org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:483) > at > org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:471) > at > org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:62) > at > org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:817) > at > org.apache.flink.batch.HighParallelismIterationsTestProgram.main(HighParallelismIterationsTestProgram.java:73) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529) > at > org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421) > at > org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:423) > at > org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:813) > at > org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:287) > at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213) > at > org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050) > at > org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126) > at java.security.AccessController.doPrivileged(Native Method) > at javax.security.auth.Subject.doAs(Subject.java:422) > at > org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836) > at > org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) > at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126) > 
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job > execution failed. > at > org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146) > at > org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:265) > ... 22 more > Caused by: org.apache.flink.util.FlinkException: TaskExecutor > akka.tcp://flink@127.0.1.1:37009/user/taskmanager_0 has no more allocated > slots for job f149b50ac8c6291371ce54b30fb0a072. > at > org.apache.flink.runtime.taskexecutor.TaskExecutor.freeSlotInternal(TaskExecutor.java:1401) > at > org.apache.flink.runtime.taskexecutor.TaskExecutor.timeoutSlot(TaskExecutor.java:1418) > at > org.apache.flink.runtime.taskexecutor.TaskExecutor.access$2600(TaskExecutor.java:136) > at > org.apache.flink.runtime.taskexecutor.TaskExecutor$SlotActionsImpl.lambda$timeoutSlot$1(TaskExecutor.java:1609) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:392) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:185) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:147) > at > akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165) > at akka.actor.Actor$class.aroundReceive(Actor.scala:502) > at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95) > at akka.actor.ActorCell.receiveMessage(Act
[jira] [Comment Edited] (FLINK-12037) High parallelism E2E test failed on Travis
[ https://issues.apache.org/jira/browse/FLINK-12037?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16806596#comment-16806596 ] Guowei Ma edited comment on FLINK-12037 at 4/2/19 10:34 AM: From the log: the TaskManager (b15f5702d9e1cc57e353824b3ec4b792) does not get a response from the JobManager after it offers its slot. Because of that, the TaskManager releases the slot and closes its connection to the JobManager after 10 seconds. Timeline for the TM: 11:38:11,436 the TaskManager b15f5702d9e1cc57e353824b3ec4b792 is started; 11:40:36,761 "Receive slot request"; 11:40:43,972 "Establish JobManager connection for job f149b50ac8c6291371ce54b30fb0a072"; 11:40:43,989 "Offer reserved slots to the leader of job"; 11:40:46,766 "Free slot TaskSlot". During 11:40:43~11:40:46 the JM is mainly deploying the tasks. !image-2019-04-02-18-30-20-634.png! Some deployments have a 100 ms interval. was (Author: maguowei): From the log: The TaskManager(b15f5702d9e1cc57e353824b3ec4b792) doesn't get response from jm after it offer its' slot. Since that the taskmanager release the slot and close the connection with jm after 10s. > High parallelism E2E test failed on Travis > -- > > Key: FLINK-12037 > URL: https://issues.apache.org/jira/browse/FLINK-12037 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination, Tests >Affects Versions: 1.8.1 >Reporter: Chesnay Schepler >Priority: Major > Labels: test-stability > Attachments: image-2019-04-02-18-30-20-634.png > > > https://travis-ci.org/apache/flink/jobs/511335563 > {code} > org.apache.flink.client.program.ProgramInvocationException: Job failed. 
> (JobID: f149b50ac8c6291371ce54b30fb0a072) > at > org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:268) > at > org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:483) > at > org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:471) > at > org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:62) > at > org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:817) > at > org.apache.flink.batch.HighParallelismIterationsTestProgram.main(HighParallelismIterationsTestProgram.java:73) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529) > at > org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421) > at > org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:423) > at > org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:813) > at > org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:287) > at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213) > at > org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050) > at > org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126) > at java.security.AccessController.doPrivileged(Native Method) > at javax.security.auth.Subject.doAs(Subject.java:422) > at > org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836) > at > org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) > at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126) > 
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job > execution failed. > at > org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146) > at > org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:265) > ... 22 more > Caused by: org.apache.flink.util.FlinkException: TaskExecutor > akka.tcp://flink@127.0.1.1:37009/user/taskmanager_0 has no more allocated > slots for job f149b50ac8c6291371ce54b30fb0a072. > at > org.apache.flink.runtime.taskexecutor.TaskExecutor.freeSlotInternal(TaskExecutor.java:1401) > at > org.apache.flink.runtime.taskexecutor.TaskExecutor.timeoutSlot(TaskExecutor.java:1418) > at > org.apache.flink.runtime.taskexecutor.TaskExecutor.access$2600(TaskExecutor.java:136) > at > org.apache.flink.runtime.taskexecutor.TaskExecutor$SlotActionsImpl.lambda$timeoutSlot$1(TaskExecutor.java:1609) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAs
[jira] [Comment Edited] (FLINK-12037) High parallelism E2E test failed on Travis
[ https://issues.apache.org/jira/browse/FLINK-12037?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16806596#comment-16806596 ] Guowei Ma edited comment on FLINK-12037 at 4/2/19 1:29 PM: --- From the log: the TaskManager (b15f5702d9e1cc57e353824b3ec4b792) does not get a response from the JobManager after it offers its slot. Because of that, the TaskManager releases the slot and closes its connection to the JobManager after 10 seconds. Timeline for the TM: 11:38:11,436 the TaskManager b15f5702d9e1cc57e353824b3ec4b792 is started; 11:40:36,761 "Receive slot request"; 11:40:43,972 "Establish JobManager connection for job f149b50ac8c6291371ce54b30fb0a072"; 11:40:43,989 "Offer reserved slots to the leader of job"; 11:40:46,766 "Free slot TaskSlot". During 11:40:43~11:40:46 the JM is mainly deploying the tasks. There is no obvious information about what the JM is doing, and no obvious information that "Deploying" is time-consuming. !image-2019-04-02-18-30-20-634.png! Some deployments have a 100 ms interval. was (Author: maguowei): From the log: The TaskManager(b15f5702d9e1cc57e353824b3ec4b792) doesn't get response from jm after it offer its' slot. Since that the taskmanager release the slot and close the connection with jm after 10s. For TM 11:38:11,436 this taskmanger b15f5702d9e1cc57e353824b3ec4b792 is started 11:40:36,761 Receive slot request 11:40:43,972 Establish JobManager connection for job f149b50ac8c6291371ce54b30fb0a072 11:40:43,989 Offer reserved slots to the leader of job 11:40:46,766 Free slot TaskSlot During 11:40:43~11:40:46 JM is mainly deploying the task. !image-2019-04-02-18-30-20-634.png! 
Some deployings have 100ms interval > High parallelism E2E test failed on Travis > -- > > Key: FLINK-12037 > URL: https://issues.apache.org/jira/browse/FLINK-12037 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination, Tests >Affects Versions: 1.8.1 >Reporter: Chesnay Schepler >Priority: Major > Labels: test-stability > Attachments: image-2019-04-02-18-30-20-634.png > > > https://travis-ci.org/apache/flink/jobs/511335563 > {code} > org.apache.flink.client.program.ProgramInvocationException: Job failed. > (JobID: f149b50ac8c6291371ce54b30fb0a072) > at > org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:268) > at > org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:483) > at > org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:471) > at > org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:62) > at > org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:817) > at > org.apache.flink.batch.HighParallelismIterationsTestProgram.main(HighParallelismIterationsTestProgram.java:73) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529) > at > org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421) > at > org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:423) > at > org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:813) > at > org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:287) > at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213) > at > 
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050) > at > org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126) > at java.security.AccessController.doPrivileged(Native Method) > at javax.security.auth.Subject.doAs(Subject.java:422) > at > org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836) > at > org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) > at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126) > Caused by: org.apache.flink.runtime.client.JobExecutionException: Job > execution failed. > at > org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146) > at > org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:265) > ... 22 more > Caused by: org.apache.flink.util.FlinkException: TaskExecutor > akka.tcp://flink@127.0.1.1:37009/user/taskmanager_0 has no more
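The gap analysis in the comment above (spotting the roughly 100 ms spacing between consecutive "Deploying" log lines) can be automated from the log timestamps. A small sketch, with invented names, that parses "HH:mm:ss,SSS" timestamps and reports the gaps in milliseconds:

```java
import java.time.Duration;
import java.time.LocalTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;

// Sketch: given the "HH:mm:ss,SSS" timestamps of consecutive log lines
// (the format used in the TaskManager/JobManager logs quoted above),
// compute the gap between each pair of neighbouring lines.
class LogGaps {

    private static final DateTimeFormatter FMT =
            DateTimeFormatter.ofPattern("HH:mm:ss,SSS");

    // Returns gaps[i] = millis between timestamps[i] and timestamps[i + 1].
    static List<Long> gapsMillis(List<String> timestamps) {
        List<Long> gaps = new ArrayList<>();
        for (int i = 1; i < timestamps.size(); i++) {
            LocalTime prev = LocalTime.parse(timestamps.get(i - 1), FMT);
            LocalTime curr = LocalTime.parse(timestamps.get(i), FMT);
            gaps.add(Duration.between(prev, curr).toMillis());
        }
        return gaps;
    }
}
```

Feeding it the timestamps of the "Deploying ..." lines would show directly whether each deployment really costs on the order of 100 ms, which over a high-parallelism job is enough to exhaust the 10-second slot timeout.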
[jira] [Commented] (FLINK-12086) AsyncFunction - Add access to a user defined Object for cleanup on timeout
[ https://issues.apache.org/jira/browse/FLINK-12086?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16808282#comment-16808282 ] Guowei Ma commented on FLINK-12086: --- I think there might be another way to access the user object without changing the API.
{code:java}
package dev.codeflush;

import org.apache.flink.streaming.api.functions.async.AsyncFunction;
import org.apache.flink.streaming.api.functions.async.ResultFuture;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class SomeAsyncFunction implements AsyncFunction<Integer, Integer> {

    private static final long serialVersionUID = 1L;

    // Track the in-flight library Future per ResultFuture so that timeout()
    // can look it up and cancel it. Use a thread-safe map if asyncInvoke()
    // and timeout() may run on different threads.
    private final Map<ResultFuture<Integer>, Future<Integer>> libFutures = new HashMap<>();

    @Override
    public void asyncInvoke(Integer input, ResultFuture<Integer> resultFuture) throws Exception {
        Future<Integer> future = null; // submit something in a library thread-pool
        libFutures.put(resultFuture, future);
        CompletableFuture.runAsync(() -> {
            try {
                resultFuture.complete(Collections.singleton(future.get()));
                libFutures.remove(resultFuture);
            } catch (ExecutionException e) {
                // handle this
            } catch (InterruptedException e) {
                // handle that
            }
        });
    }

    @Override
    public void timeout(Integer input, ResultFuture<Integer> resultFuture) throws Exception {
        Future<Integer> future = libFutures.remove(resultFuture);
        if (future != null) {
            future.cancel(true);
        }
        resultFuture.complete(Collections.emptySet());
    }
}
{code}
> AsyncFunction - Add access to a user defined Object for cleanup on timeout > -- > > Key: FLINK-12086 > URL: https://issues.apache.org/jira/browse/FLINK-12086 > Project: Flink > Issue Type: Improvement > Components: API / DataStream >Reporter: Felix Wollschläger >Priority: Major > > When executing async-requests it would be nice to have access to a user > defined object to perform cleanup when the process times out. 
> For example, when executing Cassandra-Queries I'm using the drivers > threadpool to submit Statements, which returns a > com.datastax.driver.core.ResultSetFutre ( > [https://docs.datastax.com/en/drivers/java/2.1/com/datastax/driver/core/ResultSetFuture.html] > ). When I run into a timeout I could cancel the Future because waiting for > it to complete is unnecessary in that case. > > The API could be extendend to something like this: > > Adding an Type-Parameter to the AsnyFunction Interface: > {code:java} > AsyncFunction{code} > Updating the asnyInvoke-Method to return the user-defined object: > {code:java} > T asyncInvoke(IN input, ResultFuture future) throws Exception;{code} > Updating the timeout-Method to accept the user-defined object: > {code:java} > void timeout(IN input, T obj, ResultFuture resultFuture) throws > Exception{code} > > An example Implementation could look like this: > {code:java} > package dev.codeflush; > import org.apache.flink.streaming.api.functions.async.AsyncFunction; > import org.apache.flink.streaming.api.functions.async.ResultFuture; > import java.util.Collections; > import java.util.concurrent.CompletableFuture; > import java.util.concurrent.ExecutionException; > import java.util.concurrent.Future; > public class SomeAsyncFunction implements AsyncFunction Future> { > private static final long serialVersionUID = 1L; > > @Override > public Future asyncInvoke(Integer input, ResultFuture > resultFuture) throws Exception { > Future future = null; // submit something in a library > thread-pool > CompletableFuture.runAsync(() -> { > try { > resultFuture.complete(Collections.singleton(future.get())); > } catch (ExecutionException e) { > // handle this > } catch (InterruptedException e) { > // handle that > } > }); > > return future; > } > @Override > public void timeout(Integer input, Future future, > ResultFuture resultFuture) throws Exception { > future.cancel(true); > resultFuture.complete(Collections.emptySet()); > } > } > {code} > As 
it currently is, submitted tasks in the asnyInvoke-Method will use > resources (Threads, IO) even if the application is no longer in a state where > it could do something meaningful with the result. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
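The map-based workaround in the comment above can be exercised without any Flink dependency. The sketch below keeps the same idea (track each in-flight Future in a map keyed by its completion callback so the timeout path can find and cancel it) but replaces Flink's AsyncFunction and ResultFuture with plain java.util.concurrent types; all class and method names here are invented for illustration:

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Stand-in for the AsyncFunction pattern: each in-flight Future is
// remembered in a concurrent map keyed by its completion callback, so
// the timeout path can cancel it without any API change.
class TrackedAsyncCalls {

    private final Map<CompletableFuture<Integer>, Future<Integer>> inFlight =
            new ConcurrentHashMap<>();
    private final ExecutorService pool = Executors.newCachedThreadPool();

    // Analogue of asyncInvoke: submit work and remember its Future.
    public void invoke(int input, CompletableFuture<Integer> result) {
        Future<Integer> future = pool.submit(() -> {
            Thread.sleep(60_000); // simulated slow external call
            return input * 2;
        });
        inFlight.put(result, future);
    }

    // Analogue of timeout: cancel the tracked Future, complete empty.
    public void timeout(CompletableFuture<Integer> result) {
        Future<Integer> future = inFlight.remove(result);
        if (future != null) {
            future.cancel(true); // interrupts the wasted work
        }
        result.complete(null);
    }

    public void shutdown() {
        pool.shutdownNow();
    }
}
```

The key point, matching the motivation of the issue, is that cancelling the tracked Future on timeout frees the threads and IO that would otherwise keep servicing a result nobody will consume.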
[jira] [Comment Edited] (FLINK-12086) AsyncFunction - Add access to a user defined Object for cleanup on timeout
[ https://issues.apache.org/jira/browse/FLINK-12086?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16808282#comment-16808282 ] Guowei Ma edited comment on FLINK-12086 at 4/3/19 1:30 AM: --- I think there might be another way to access the user object without changing the API.
{code:java}
package dev.codeflush;

import org.apache.flink.streaming.api.functions.async.AsyncFunction;
import org.apache.flink.streaming.api.functions.async.ResultFuture;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class SomeAsyncFunction implements AsyncFunction<Integer, Integer> {

    private static final long serialVersionUID = 1L;

    // Track the in-flight library Future per ResultFuture so that timeout()
    // can look it up and cancel it. Use a thread-safe map if asyncInvoke()
    // and timeout() may run on different threads.
    private final Map<ResultFuture<Integer>, Future<Integer>> libFutures = new HashMap<>();

    @Override
    public void asyncInvoke(Integer input, ResultFuture<Integer> resultFuture) throws Exception {
        Future<Integer> future = null; // submit something in a library thread-pool
        libFutures.put(resultFuture, future);
        CompletableFuture.runAsync(() -> {
            try {
                resultFuture.complete(Collections.singleton(future.get()));
                libFutures.remove(resultFuture);
            } catch (ExecutionException e) {
                // handle this
            } catch (InterruptedException e) {
                // handle that
            }
        });
    }

    @Override
    public void timeout(Integer input, ResultFuture<Integer> resultFuture) throws Exception {
        Future<Integer> future = libFutures.remove(resultFuture);
        if (future != null) {
            future.cancel(true);
        }
        resultFuture.complete(Collections.emptySet());
    }
}
{code}
was (Author: maguowei): I think there might be another way to access user object other than change the API. 
{code:java} package dev.codeflush; import org.apache.flink.streaming.api.functions.async.AsyncFunction; import org.apache.flink.streaming.api.functions.async.ResultFuture; import java.util.Collections; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; public class SomeAsyncFunction implements AsyncFunction { private static final long serialVersionUID = 1L; private HashMap, Future> libFutures = new HashMap(); @Override public void asyncInvoke(Integer input, ResultFuture resultFuture) throws Exception { Future future = null; // submit something in a library thread-pool libFutures.put(resultFuture, future); CompletableFuture.runAsync(() -> { try { resultFuture.complete(Collections.singleton(future.get())); libFutures.remove(resultFuture); } catch (ExecutionException e) { // handle this } catch (InterruptedException e) { // handle that } }); return future; } @Override public void timeout(Integer input, ResultFuture resultFuture) throws Exception { Future future = libFutures.remove(resultFuture); if (future != null) { future.cancel(true); } resultFuture.complete(Collections.emptySet()); } } {code} > AsyncFunction - Add access to a user defined Object for cleanup on timeout > -- > > Key: FLINK-12086 > URL: https://issues.apache.org/jira/browse/FLINK-12086 > Project: Flink > Issue Type: Improvement > Components: API / DataStream >Reporter: Felix Wollschläger >Priority: Major > > When executing async-requests it would be nice to have access to a user > defined object to perform cleanup when the process times out. > For example, when executing Cassandra-Queries I'm using the drivers > threadpool to submit Statements, which returns a > com.datastax.driver.core.ResultSetFutre ( > [https://docs.datastax.com/en/drivers/java/2.1/com/datastax/driver/core/ResultSetFuture.html] > ). When I run into a timeout I could cancel the Future because waiting for > it to complete is unnecessary in that case. 
> > The API could be extendend to something like this: > > Adding an Type-Parameter to the AsnyFunction Interface: > {code:java} > AsyncFunction{code} > Updating the asnyInvoke-Method to return the user-defined object: > {code:java} > T asyncInvoke(IN input, ResultFuture future) throws Exception;{code} > Updating the timeout-Method to accept the user-defined object: > {code:java} > void timeout(IN input, T obj, ResultFuture resultFuture) throws > Exception{code} > > An example Implementation could look like this: > {code:java} > package dev.codeflush; > import org.apache.flink.streaming.api.functions.async.A
[jira] [Commented] (FLINK-12106) Jobmanager is killing FINISHED taskmanger containers, causing exception in still running Taskmanagers an
[ https://issues.apache.org/jira/browse/FLINK-12106?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16809421#comment-16809421 ] Guowei Ma commented on FLINK-12106: --- AFAIK, the community is working on it. [FLINK-10941|https://issues.apache.org/jira/browse/FLINK-10941] has the same problem. This issue is related to the lifecycle control of the shuffle resource. There have been some related discussions and designs [1][2]. [1] [https://docs.google.com/document/d/13vAJJxfRXAwI4MtO8dux8hHnNMw2Biu5XRrb_hvGehA/edit#heading=h.v7vhb7w01d61] [2] [https://cwiki.apache.org/confluence/display/FLINK/FLIP-31%3A+Pluggable+Shuffle+Manager] > Jobmanager is killing FINISHED taskmanger containers, causing exception in > still running Taskmanagers an > > > Key: FLINK-12106 > URL: https://issues.apache.org/jira/browse/FLINK-12106 > Project: Flink > Issue Type: Bug > Components: Deployment / YARN >Affects Versions: 1.7.2 > Environment: Hadoop: hdp/2.5.6.0-40 > Flink: 2.7.2 >Reporter: John >Priority: Major > > When running a single flink job on YARN, some of the taskmanager containers > reach the FINISHED state before others. It appears that, after receiving the > final execution state FINISHED from a taskmanager, the jobmanager waits ~68 > seconds and then frees the associated slot in the taskmanager. After an > additional 60 seconds, the jobmanager stops the same taskmanager because the > TaskExecutor exceeded the idle timeout. > Meanwhile, other taskmanagers are still working to complete the job. Within > 10 seconds after the taskmanager container above is stopped, the remaining > task managers receive an exception due to loss of connection to the stopped > taskmanager. These exceptions result in job failure. 
> > Relevant logs: > 2019-04-03 13:49:00,013 INFO org.apache.flink.yarn.YarnResourceManager > - Registering TaskManager with ResourceID > container_1553017480503_0158_01_38 > (akka.tcp://flink@hadoop4:42745/user/taskmanager_0) at ResourceManager > 2019-04-03 13:49:05,900 INFO org.apache.flink.yarn.YarnResourceManager > - Registering TaskManager with ResourceID > container_1553017480503_0158_01_59 > (akka.tcp://flink@hadoop9:55042/user/taskmanager_0) at ResourceManager > > > 2019-04-03 13:48:51,132 INFO org.apache.flink.yarn.YarnResourceManager > - Received new container: > container_1553017480503_0158_01_77 - Remaining pending container > requests: 6 > 2019-04-03 13:48:52,862 INFO org.apache.flink.yarn.YarnTaskExecutorRunner > - > -Dlog.file=/hadoop/yarn/log/application_1553017480503_0158/container_1553017480503_0158_01_77/taskmanager.log > 2019-04-03 13:48:57,490 INFO > org.apache.flink.runtime.io.network.netty.NettyServer - Successful > initialization (took 202 ms). Listening on SocketAddress > /192.168.230.69:40140. > 2019-04-03 13:49:12,575 INFO org.apache.flink.yarn.YarnResourceManager > - Registering TaskManager with ResourceID > container_1553017480503_0158_01_77 > (akka.tcp://flink@hadoop9:51525/user/taskmanager_0) at ResourceManager > 2019-04-03 13:49:12,631 INFO > org.apache.flink.runtime.taskexecutor.TaskExecutor - Allocated > slot for AllocationID\{42fed3e5a136240c23cc7b394e3249e9}. > 2019-04-03 14:58:15,188 INFO > org.apache.flink.runtime.taskexecutor.TaskExecutor - > Un-registering task and sending final execution state FINISHED to JobManager > for task DataSink > (com.anovadata.alexflinklib.sinks.bucketing.BucketingOutputFormat@26874f2c) > a4b5fb32830d4561147b2714828109e2. > 2019-04-03 14:59:23,049 INFO > org.apache.flink.runtime.jobmaster.slotpool.SlotPool - Releasing > idle slot [AllocationID\{42fed3e5a136240c23cc7b394e3249e9}]. 
> 2019-04-03 14:59:23,058 INFO > org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable - Free slot > TaskSlot(index:0, state:ACTIVE, resource profile: > ResourceProfile\{cpuCores=1.7976931348623157E308, heapMemoryInMB=2147483647, > directMemoryInMB=2147483647, nativeMemoryInMB=2147483647, > networkMemoryInMB=2147483647}, allocationId: > AllocationID\{42fed3e5a136240c23cc7b394e3249e9}, jobId: > a6c4e367698c15cdf168d19a89faff1d). > 2019-04-03 15:00:02,641 INFO org.apache.flink.yarn.YarnResourceManager > - Stopping container container_1553017480503_0158_01_77. > 2019-04-03 15:00:02,646 INFO org.apache.flink.yarn.YarnResourceManager > - Closing TaskExecutor connection > container_1553017480503_0158_01_77 because: TaskExecutor exceeded the > idle timeout. > > > 2019-04-03 13:48:48,902 INFO org.apache.flink.yarn.YarnTaskExecut
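The timeline in the logs above (slot freed at 14:59:23, container stopped at 15:00:02) follows a simple idle-timeout rule. A minimal sketch of that decision, with hypothetical names and timeout values, not Flink's actual YarnResourceManager code:

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical sketch of the idle-timeout decision seen in the logs: once the
// last slot of a TaskExecutor is freed, it is considered idle, and after the
// idle time exceeds the configured timeout the container is stopped.
public class IdleTimeoutCheck {
    public static boolean shouldStop(Instant lastSlotFreed, Instant now, Duration idleTimeout) {
        // Stop only after the TaskExecutor has been idle longer than the timeout.
        return Duration.between(lastSlotFreed, now).compareTo(idleTimeout) > 0;
    }
}
```

With the timestamps from the log, a 30-second timeout (an assumed value) is exceeded by 15:00:02, which matches the "TaskExecutor exceeded the idle timeout" message.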
[jira] [Updated] (FLINK-12113) User code passing to fromCollection(Iterator, Class) not cleaned
[ https://issues.apache.org/jira/browse/FLINK-12113?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guowei Ma updated FLINK-12113: -- Attachment: image-2019-04-07-21-52-37-264.png > User code passing to fromCollection(Iterator, Class) not cleaned > > > Key: FLINK-12113 > URL: https://issues.apache.org/jira/browse/FLINK-12113 > Project: Flink > Issue Type: Bug > Components: API / DataStream >Affects Versions: 1.7.2 >Reporter: yankai zhang >Priority: Major > Attachments: image-2019-04-07-21-52-37-264.png > > > > {code:java} > interface IS extends Iterator, Serializable { } > StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > env.fromCollection(new IS() { > @Override > public boolean hasNext() { > return false; > } > @Override > public Object next() { > return null; > } > }, Object.class); > {code} > Code piece above throws exception: > {code:java} > org.apache.flink.api.common.InvalidProgramException: The implementation of > the SourceFunction is not serializable. The object probably contains or > references non serializable fields. > at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:99) > {code} > And my workaround is wrapping clean around iterator instance, like this: > > {code:java} > StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > env.fromCollection(env.clean(new IS() { > @Override > public boolean hasNext() { > return false; > } > @Override > public Object next() { > return null; > } > }), Object.class); > {code} > > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-12113) User code passing to fromCollection(Iterator, Class) not cleaned
[ https://issues.apache.org/jira/browse/FLINK-12113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16811892#comment-16811892 ] Guowei Ma commented on FLINK-12113: --- I can't reproduce your problem. !image-2019-04-07-21-52-37-264.png! -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-12113) User code passing to fromCollection(Iterator, Class) not cleaned
[ https://issues.apache.org/jira/browse/FLINK-12113?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guowei Ma updated FLINK-12113: -- Attachment: image-2019-04-08-23-19-27-359.png -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-12113) User code passing to fromCollection(Iterator, Class) not cleaned
[ https://issues.apache.org/jira/browse/FLINK-12113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16812502#comment-16812502 ] Guowei Ma commented on FLINK-12113: --- Hi [~vision57], I changed my code so that it does not use an anonymous class, but it still does not reproduce the problem. I am using JDK 8. !image-2019-04-08-23-19-27-359.png! -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-12113) User code passing to fromCollection(Iterator, Class) not cleaned
[ https://issues.apache.org/jira/browse/FLINK-12113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16813050#comment-16813050 ] Guowei Ma commented on FLINK-12113: --- Thanks for your explanation, [~vision57]. DataStream.flatMap/DataStream.process clean the user function, but StreamExecutionEnvironment does not. I think we could learn from the DataStream API. [~aljoscha] Could you give some advice? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (FLINK-12113) User code passing to fromCollection(Iterator, Class) not cleaned
[ https://issues.apache.org/jira/browse/FLINK-12113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16813050#comment-16813050 ] Guowei Ma edited comment on FLINK-12113 at 4/9/19 6:58 AM: --- Thanks for your explanation, [~vision57]. I think StreamExecutionEnvironment's other methods might have the same problem. DataStream.flatMap/DataStream.process clean the user function, but StreamExecutionEnvironment does not. I think we could learn from the DataStream API. [~aljoscha] Could you give some advice? was (Author: maguowei): Thanks for your explanation. [~vision57]. DataStream.flatmap/DataStream.process do the clean but StreamExecutionEnvironment does not do the clean. I think we could learn from DataStream api. [~aljoscha] Could you give some advice? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-12113) User code passing to fromCollection(Iterator, Class) not cleaned
[ https://issues.apache.org/jira/browse/FLINK-12113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16815286#comment-16815286 ] Guowei Ma commented on FLINK-12113: --- [~vision57], do you want to fix this? If you don't have time, I think I could fix it. What do you think? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-12113) User code passing to fromCollection(Iterator, Class) not cleaned
[ https://issues.apache.org/jira/browse/FLINK-12113?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guowei Ma reassigned FLINK-12113: - Assignee: Guowei Ma -- This message was sent by Atlassian JIRA (v7.6.3#76005)
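The InvalidProgramException quoted in this thread comes from a serializability check. A minimal stand-alone sketch of such a check (plain Java serialization; Flink's actual ClosureCleaner additionally rewrites captured fields before checking, which this sketch omits):

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Minimal sketch of a serializability check: attempt to serialize the object
// and report failure if any reachable field is not serializable.
public class SerializabilityCheck {

    public static boolean isSerializable(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (IOException e) {
            // NotSerializableException is an IOException: some reachable
            // field cannot be serialized.
            return false;
        }
    }

    // Serializable, and every reachable field is serializable too.
    public static class Clean implements Serializable {
        final String name = "ok";
    }

    // Declares Serializable but drags in a non-serializable field, like a
    // user function that captures a non-serializable object.
    public static class Dirty implements Serializable {
        final Object handle = new Object(); // java.lang.Object is not Serializable
    }
}
```

This mirrors the failure mode in the report: the object itself implements Serializable, but something it references does not, so the check throws at clean time rather than at runtime.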
[jira] [Created] (FLINK-17038) Decouple resolving Type and creating TypeInformation process
Guowei Ma created FLINK-17038: - Summary: Decouple resolving Type and creating TypeInformation process Key: FLINK-17038 URL: https://issues.apache.org/jira/browse/FLINK-17038 Project: Flink Issue Type: Improvement Components: API / DataStream Affects Versions: 1.10.0 Reporter: Guowei Ma -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-17038) Decouple resolving Type and creating TypeInformation process
[ https://issues.apache.org/jira/browse/FLINK-17038?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guowei Ma updated FLINK-17038: -- Parent: FLINK-15674 Issue Type: Sub-task (was: Improvement) > Decouple resolving Type and creating TypeInformation process > > > Key: FLINK-17038 > URL: https://issues.apache.org/jira/browse/FLINK-17038 > Project: Flink > Issue Type: Sub-task > Components: API / DataStream >Affects Versions: 1.10.0 >Reporter: Guowei Ma >Priority: Critical > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-17039) Introduce TypeInformationExtractor interface
Guowei Ma created FLINK-17039: - Summary: Introduce TypeInformationExtractor interface Key: FLINK-17039 URL: https://issues.apache.org/jira/browse/FLINK-17039 Project: Flink Issue Type: Sub-task Components: API / DataStream Reporter: Guowei Ma -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-17038) Decouple resolving Type from creating TypeInformation process
[ https://issues.apache.org/jira/browse/FLINK-17038?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guowei Ma updated FLINK-17038: -- Summary: Decouple resolving Type from creating TypeInformation process (was: Decouple resolving Type and creating TypeInformation process) > Decouple resolving Type from creating TypeInformation process > - > > Key: FLINK-17038 > URL: https://issues.apache.org/jira/browse/FLINK-17038 > Project: Flink > Issue Type: Sub-task > Components: API / DataStream >Affects Versions: 1.10.0 >Reporter: Guowei Ma >Priority: Critical > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-17039) Introduce TypeInformationExtractor
[ https://issues.apache.org/jira/browse/FLINK-17039?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guowei Ma updated FLINK-17039: -- Summary: Introduce TypeInformationExtractor (was: Introduce TypeInformationExtractor interface) > Introduce TypeInformationExtractor > -- > > Key: FLINK-17039 > URL: https://issues.apache.org/jira/browse/FLINK-17039 > Project: Flink > Issue Type: Sub-task > Components: API / DataStream >Reporter: Guowei Ma >Priority: Critical > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-17041) Migrate current TypeInformation creation to the TypeInformationExtractor framework
Guowei Ma created FLINK-17041: - Summary: Migrate current TypeInformation creation to the TypeInformationExtractor framework Key: FLINK-17041 URL: https://issues.apache.org/jira/browse/FLINK-17041 Project: Flink Issue Type: Sub-task Reporter: Guowei Ma -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-17039) Introduce TypeInformationExtractor framework
[ https://issues.apache.org/jira/browse/FLINK-17039?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guowei Ma updated FLINK-17039: -- Summary: Introduce TypeInformationExtractor framework (was: Introduce TypeInformationExtractor) > Introduce TypeInformationExtractor framework > --- > > Key: FLINK-17039 > URL: https://issues.apache.org/jira/browse/FLINK-17039 > Project: Flink > Issue Type: Sub-task > Components: API / DataStream >Reporter: Guowei Ma >Priority: Critical > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-15032) Remove the eager serialization from `RemoteRpcInvocation`
[ https://issues.apache.org/jira/browse/FLINK-15032?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17008617#comment-17008617 ] Guowei Ma commented on FLINK-15032: --- Thanks [~trohrmann] and [~SleePy] for your concerns. As [~SleePy] said, there is no good way to check the size of the payload without serializing it. To reduce the memory pressure, we could compute the payload size without keeping the serialization result, but that is only useful in specific scenarios (for example, large parallelism with a limited network). So I will close this issue now. > Remove the eager serialization from `RemoteRpcInvocation` > -- > > Key: FLINK-15032 > URL: https://issues.apache.org/jira/browse/FLINK-15032 > Project: Flink > Issue Type: Improvement > Components: Runtime / Coordination >Reporter: Guowei Ma >Priority: Major > > Currently, the constructor of `RemoteRpcInvocation` serializes the > `parameterTypes` and `arg` of an RPC call. This could lead to a problem: > Consider a job that has 1k parallelism and a 1m union list state. When > deploying the 1k tasks, the eager serialization would use 1G of memory > instantly (sometimes the serialization amplifies the memory usage). However, > the serialized object is only used when Akka sends the message. So we > could reduce the memory pressure if we serialize the object only when the > message is about to be sent by Akka. > Akka serializes the message last anyway, and all the XXXGateway-related > classes are visible at the RPC level. Because of that, I think the eager > serialization in the constructor of `RemoteRpcInvocation` could be avoided. I > also did a simple test and found this could reduce the time cost of the RPC > call. For 1k RPC calls with a 1m `String` message: the current > version costs around 2700 ms; the non-serialization version costs about 37 ms. > > In summary, this Jira proposes to remove the eager serialization in the > constructor of `RemoteRpcInvocation`. 
-- This message was sent by Atlassian Jira (v8.3.4#803005)
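The proposal above can be sketched as a wrapper that defers serialization until send time. All names here are illustrative, not Flink's actual RemoteRpcInvocation API:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.function.Supplier;

// Hypothetical sketch: instead of serializing the RPC arguments eagerly in
// the constructor, keep them on the heap and serialize only when the
// transport actually sends the message.
public class LazyInvocation {
    private final Supplier<Serializable> payload;
    private byte[] serialized; // filled lazily, only on send

    public LazyInvocation(Supplier<Serializable> payload) {
        this.payload = payload; // no serialization happens here
    }

    public boolean isSerialized() {
        return serialized != null;
    }

    // Called by the transport right before the message goes on the wire.
    public byte[] serializeForSend() {
        if (serialized == null) {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bos)) {
                out.writeObject(payload.get());
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
            serialized = bos.toByteArray();
        }
        return serialized;
    }
}
```

With 1k pending invocations, this keeps only the original objects in memory until each message is sent, instead of 1k serialized copies up front.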
[jira] [Closed] (FLINK-15032) Remove the eager serialization from `RemoteRpcInvocation`
[ https://issues.apache.org/jira/browse/FLINK-15032?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guowei Ma closed FLINK-15032. - Resolution: Won't Fix -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-2646) User functions should be able to differentiate between successful close and erroneous close
[ https://issues.apache.org/jira/browse/FLINK-2646?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17019866#comment-17019866 ] Guowei Ma commented on FLINK-2646: -- I want to understand which specific scenarios this JIRA aims to resolve. After reading some mails/docs [1][2] and recalling some scenarios I encountered, I summarize them below (correct me if I miss something). Currently, the scenarios that the _close_ interface cannot satisfy: # The user wants to accelerate the failover process. Currently, some users implement the _close_ interface to flush in-memory data to the external system, because the job sometimes deals with a bounded stream. However, this slows down the failover process, because when canceling the task Flink also calls the close method, which might do some heavy I/O processing. # The user wants exactly-once semantics for the bounded stream. If the user implements the _close_ interface so that it commits the results, some results would be committed multiple times, because when a failover occurs some messages are replayed. If the user implements the _close_ interface so that it does not commit the results, some results would be lost. Because many users implement the _close_ interface to release resources, we cannot break the semantics that whenever a task is terminated the _close_ method is always called. If Flink provided an interface such as `_closeAtEndofStream_`, I think we could resolve the second problem in most situations. However, I think this also needs some other efforts, such as deduplicating the commit in _close_ or using the _finalizeOnMaster_ callback. 
[1] https://lists.apache.org/thread.html/4cf28a9fa3732dfdd9e673da6233c5288ca80b20d58cee130bf1c141%40%3Cuser.flink.apache.org%3E [2] https://docs.google.com/document/d/1SXfhmeiJfWqi2ITYgCgAoSDUv5PNq1T8Zu01nR5Ebog/edit# > User functions should be able to differentiate between successful close and > erroneous close > --- > > Key: FLINK-2646 > URL: https://issues.apache.org/jira/browse/FLINK-2646 > Project: Flink > Issue Type: Improvement > Components: API / DataStream >Affects Versions: 0.10.0 >Reporter: Stephan Ewen >Assignee: Kostas Kloudas >Priority: Major > Labels: usability > > Right now, the {{close()}} method of rich functions is invoked in case of > proper completion, and in case of canceling in case of error (to allow for > cleanup). > In certain cases, the user function needs to know why it is closed, whether > the task completed in a regular fashion, or was canceled/failed. > I suggest to add a method {{closeAfterFailure()}} to the {{RichFunction}}. By > default, this method calls {{close()}}. The runtime is then changed to call > {{close()}} as part of the regular execution and {{closeAfterFailure()}} in > case of an irregular exit. > Because by default all cases call {{close()}} the change would not be API > breaking. -- This message was sent by Atlassian Jira (v8.3.4#803005)
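The change proposed in the quoted description, a closeAfterFailure() that defaults to close(), can be sketched as a default method, which is why it would not be API breaking. The interface name here is illustrative, not Flink's actual RichFunction:

```java
// Sketch of the proposed interface change: the runtime calls close() on a
// regular exit and closeAfterFailure() on an irregular one; existing
// implementations that only override close() keep their current behavior.
public interface RichFunctionSketch {
    void close();

    // Default: same cleanup on both paths, so adding this method breaks no one.
    default void closeAfterFailure() {
        close();
    }
}
```

A function that needs to distinguish the two cases (e.g. skip a final commit on failure) overrides closeAfterFailure(); all others are untouched.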
[jira] [Commented] (FLINK-15670) Provide a Kafka Source/Sink pair that aligns Kafka's Partitions and Flink's KeyGroups
[ https://issues.apache.org/jira/browse/FLINK-15670?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17019887#comment-17019887 ] Guowei Ma commented on FLINK-15670: --- >>> I am wondering how do we expose this feature to the users? Maybe we could do something automatically when generating the StreamGraph or JobGraph. For example, if _blockingConnectionsBetweenChains_ is true, we could automatically add the Kafka source and Kafka sink between the two shuffle operators. The topic could use the operator's uid. > Provide a Kafka Source/Sink pair that aligns Kafka's Partitions and Flink's > KeyGroups > - > > Key: FLINK-15670 > URL: https://issues.apache.org/jira/browse/FLINK-15670 > Project: Flink > Issue Type: New Feature > Components: API / DataStream, Connectors / Kafka >Reporter: Stephan Ewen >Priority: Major > Labels: usability > Fix For: 1.11.0 > > > This Source/Sink pair would serve two purposes: > 1. You can read topics that are already partitioned by key and process them > without partitioning them again (avoid shuffles) > 2. You can use this to shuffle through Kafka, thereby decomposing the job > into smaller jobs and independent pipelined regions that fail over > independently. -- This message was sent by Atlassian Jira (v8.3.4#803005)
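The alignment in purpose 1 hinges on deriving the Kafka partition from the same key-group computation Flink uses for keyed state. A simplified sketch of that computation (Flink's real KeyGroupRangeAssignment additionally murmur-hashes the key's hashCode; this sketch uses the raw hashCode for brevity):

```java
// Illustrative sketch: if the Kafka partition of a record is derived from the
// same key-group mapping that Flink uses for keyed state, each record already
// lands on the operator instance that owns its key group, so no re-shuffle is
// needed between the source and the keyed operator.
public class KeyGroupAlignment {
    // Key group of a key, in [0, maxParallelism).
    public static int keyGroup(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    // The operator instance (and, under this scheme, the Kafka partition)
    // that owns a given key group.
    public static int operatorIndex(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }
}
```

The sink side would use the same two functions when choosing the target partition, so the source side can consume without repartitioning.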
[jira] [Commented] (FLINK-2646) User functions should be able to differentiate between successful close and erroneous close
[ https://issues.apache.org/jira/browse/FLINK-2646?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17022873#comment-17022873 ] Guowei Ma commented on FLINK-2646: -- Thanks for your detailed comments. I think it is very cool to unify the sink for bounded and unbounded jobs. My understanding of the sinks and committing is as follows; correct me if I am wrong: # EndOfInput (TaskFinish) is very similar to triggering a checkpoint. One difference is that this checkpoint is not triggered by the _CheckpointCoordinator_. So maybe Flink could notify the UDF to snapshot its state when receiving the _EndOfInput_. # JobFinish is very similar to checkpoint completion. So maybe Flink could also send the UDF the _notifyCheckpointComplete_ when a job finishes. So the sink does not need to assume whether the input is bounded or unbounded; it only depends on the checkpoint mechanism to achieve exactly-once on its side. I have some small questions and thoughts; I want to be on the same page with you by thinking through these problems. # When does Flink notify the task of CheckpointComplete if a job has both bounded and unbounded sources? Because the job cannot finish, the finished tasks of the job cannot be notified of _JobFinished_. An option is that Flink supports triggering a checkpoint for a job that has finished tasks and notifying the completion of that checkpoint. # When can a slot be released for other tasks to use? If I understand correctly, all the resources (including managed memory) should be released in the _dispose_ stage in the new design. So a task cannot release any resource, even after reporting to the JM that it is finished, if it needs to be notified of _JobFinish_. As far as I know, the JM can release a slot when all the tasks in it are finished. This might lead to inconsistency. I am not quite sure whether there are specific cases for this, but I think there might be some potential risks in theory. 
# Flink needs to guarantee that the JobFinish event is received by all the tasks. Flink might not receive the acknowledgment of the JobFinish event from a task. There could be two situations (the drain might have the same claim in some situations): ## The JobFinished request/response is lost. Retrying the JobFinished notification might resolve this problem. ## The task failed when handling the JobFinished event, so Flink could not receive the acknowledgment. Flink could use the normal failover strategy and restart the task with the state snapshotted at the moment of _endOfInput_. This could trigger another round of _endOfInput_ and _JobFinished_, but I think this only works for sources that support checkpoints. (On JM failover while notifying the tasks of the JobFinish event, the new JM should notify all the tasks of the JobFinished event.) -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-10819) JobManagerHAProcessFailureRecoveryITCase.testDispatcherProcessFailure is unstable
[ https://issues.apache.org/jira/browse/FLINK-10819?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17023396#comment-17023396 ] Guowei Ma commented on FLINK-10819: --- Hi, [~trohrmann] I could replace the Scala version of Deadline with the Flink version of Deadline. But after analyzing the log https://api.travis-ci.org/v3/job/508500560/log.txt, I think this is not the root cause. However, I could not reproduce this bug on the 1.9 version, and no user reports this situation now. Maybe some change has fixed this bug or decreased its probability. I want to know the usual approach for a bug that cannot be reproduced: close it and reopen it if it happens again, or keep it open? Thanks. > JobManagerHAProcessFailureRecoveryITCase.testDispatcherProcessFailure is > unstable > - > > Key: FLINK-10819 > URL: https://issues.apache.org/jira/browse/FLINK-10819 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination, Tests >Affects Versions: 1.7.0 >Reporter: sunjincheng >Assignee: Guowei Ma >Priority: Critical > Labels: test-stability > Attachments: image-2019-07-19-16-59-10-178.png, > image-2019-07-19-17-00-17-194.png, image-2019-07-19-17-01-19-758.png > > > Found the following error in the process of CI: > Results : > Tests in error: > JobManagerHAProcessFailureRecoveryITCase.testDispatcherProcessFailure:331 » > IllegalArgument > Tests run: 1463, Failures: 0, Errors: 1, Skipped: 29 > 18:40:55.828 [INFO] > > 18:40:55.829 [INFO] BUILD FAILURE > 18:40:55.829 [INFO] > > 18:40:55.830 [INFO] Total time: 30:19 min > 18:40:55.830 [INFO] Finished at: 2018-11-07T18:40:55+00:00 > 18:40:56.294 [INFO] Final Memory: 92M/678M > 18:40:56.294 [INFO] > > 18:40:56.294 [WARNING] The requested profile "include-kinesis" could not be > activated because it does not exist. 
> 18:40:56.295 [ERROR] Failed to execute goal > org.apache.maven.plugins:maven-surefire-plugin:2.18.1:test > (integration-tests) on project flink-tests_2.11: There are test failures. > 18:40:56.295 [ERROR] > 18:40:56.295 [ERROR] Please refer to > /home/travis/build/sunjincheng121/flink/flink-tests/target/surefire-reports > for the individual test results. > 18:40:56.295 [ERROR] -> [Help 1] > 18:40:56.295 [ERROR] > 18:40:56.295 [ERROR] To see the full stack trace of the errors, re-run Maven > with the -e switch. > 18:40:56.295 [ERROR] Re-run Maven using the -X switch to enable full debug > logging. > 18:40:56.295 [ERROR] > 18:40:56.295 [ERROR] For more information about the errors and possible > solutions, please read the following articles: > 18:40:56.295 [ERROR] [Help 1] > http://cwiki.apache.org/confluence/display/MAVEN/MojoFailureException > MVN exited with EXIT CODE: 1. > Trying to KILL watchdog (11329). > ./tools/travis_mvn_watchdog.sh: line 269: 11329 Terminated watchdog > PRODUCED build artifacts. > But after the rerun, the error disappeared. > Currently,no specific reasons are found, and will continue to pay attention. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-2646) User functions should be able to differentiate between successful close and erroneous close
[ https://issues.apache.org/jira/browse/FLINK-2646?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17023777#comment-17023777 ] Guowei Ma commented on FLINK-2646: -- Thanks [~kkl0u] for your detailed comment on the difference between the CP and the EOI. That is also what I was thinking. If there is no difference between them at all, we might not need to expose the EOI interface to the DataStream user for this scenario. For example, the user could finalize the in-progress file in the dispose stage. (Sometimes the user could cancel the job and still use the sink result.) Of course, this could introduce some cost for the cancel-job scenario only, and I think the cost might be the same in the failover scenarios. Do you have any concerns about this? Please enlighten me if I am missing something. > User functions should be able to differentiate between successful close and > erroneous close > --- > > Key: FLINK-2646 > URL: https://issues.apache.org/jira/browse/FLINK-2646 > Project: Flink > Issue Type: Improvement > Components: API / DataStream >Affects Versions: 0.10.0 >Reporter: Stephan Ewen >Assignee: Kostas Kloudas >Priority: Major > Labels: usability > > Right now, the {{close()}} method of rich functions is invoked in case of > proper completion, and in case of canceling in case of error (to allow for > cleanup). > In certain cases, the user function needs to know why it is closed, whether > the task completed in a regular fashion, or was canceled/failed. > I suggest to add a method {{closeAfterFailure()}} to the {{RichFunction}}. By > default, this method calls {{close()}}. The runtime is the changed to call > {{close()}} as part of the regular execution and {{closeAfterFailure()}} in > case of an irregular exit. > Because by default all cases call {{close()}} the change would not be API > breaking. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Closed] (FLINK-10819) JobManagerHAProcessFailureRecoveryITCase.testDispatcherProcessFailure is unstable
[ https://issues.apache.org/jira/browse/FLINK-10819?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guowei Ma closed FLINK-10819. - > JobManagerHAProcessFailureRecoveryITCase.testDispatcherProcessFailure is > unstable > - > > Key: FLINK-10819 > URL: https://issues.apache.org/jira/browse/FLINK-10819 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination, Tests >Affects Versions: 1.7.0 >Reporter: sunjincheng >Assignee: Guowei Ma >Priority: Critical > Labels: pull-request-available, test-stability > Fix For: 1.10.0 > > Attachments: image-2019-07-19-16-59-10-178.png, > image-2019-07-19-17-00-17-194.png, image-2019-07-19-17-01-19-758.png > > Time Spent: 20m > Remaining Estimate: 0h > > Found the following error in the process of CI: > Results : > Tests in error: > JobManagerHAProcessFailureRecoveryITCase.testDispatcherProcessFailure:331 » > IllegalArgument > Tests run: 1463, Failures: 0, Errors: 1, Skipped: 29 > 18:40:55.828 [INFO] > > 18:40:55.829 [INFO] BUILD FAILURE > 18:40:55.829 [INFO] > > 18:40:55.830 [INFO] Total time: 30:19 min > 18:40:55.830 [INFO] Finished at: 2018-11-07T18:40:55+00:00 > 18:40:56.294 [INFO] Final Memory: 92M/678M > 18:40:56.294 [INFO] > > 18:40:56.294 [WARNING] The requested profile "include-kinesis" could not be > activated because it does not exist. > 18:40:56.295 [ERROR] Failed to execute goal > org.apache.maven.plugins:maven-surefire-plugin:2.18.1:test > (integration-tests) on project flink-tests_2.11: There are test failures. > 18:40:56.295 [ERROR] > 18:40:56.295 [ERROR] Please refer to > /home/travis/build/sunjincheng121/flink/flink-tests/target/surefire-reports > for the individual test results. > 18:40:56.295 [ERROR] -> [Help 1] > 18:40:56.295 [ERROR] > 18:40:56.295 [ERROR] To see the full stack trace of the errors, re-run Maven > with the -e switch. > 18:40:56.295 [ERROR] Re-run Maven using the -X switch to enable full debug > logging. 
> 18:40:56.295 [ERROR] > 18:40:56.295 [ERROR] For more information about the errors and possible > solutions, please read the following articles: > 18:40:56.295 [ERROR] [Help 1] > http://cwiki.apache.org/confluence/display/MAVEN/MojoFailureException > MVN exited with EXIT CODE: 1. > Trying to KILL watchdog (11329). > ./tools/travis_mvn_watchdog.sh: line 269: 11329 Terminated watchdog > PRODUCED build artifacts. > But after the rerun, the error disappeared. > Currently,no specific reasons are found, and will continue to pay attention. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-2646) User functions should be able to differentiate between successful close and erroneous close
[ https://issues.apache.org/jira/browse/FLINK-2646?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17024305#comment-17024305 ] Guowei Ma commented on FLINK-2646: -- Hi, [~kkl0u] From Stephan's and your replies, I have an assumption: Flink could always trigger a checkpoint at the end of input and notify all the UDFs of the checkpoint's completion if they need it, in whatever situation. So: 1. For the stop-with-checkpoint scenario, we could choose another implementation that commits the last in-progress file in the dispose stage. 2. Non-2PC and normal UDFs could implement the CheckpointedFunction interface. Generally, I think the EOI interface is a very clear definition. However, I just want to know whether Flink introduces the EOI interface to DataStream to clarify the concept, or whether there are scenarios that could not be satisfied with the current interfaces, if the assumption is true. > User functions should be able to differentiate between successful close and > erroneous close > --- > > Key: FLINK-2646 > URL: https://issues.apache.org/jira/browse/FLINK-2646 > Project: Flink > Issue Type: Improvement > Components: API / DataStream >Affects Versions: 0.10.0 >Reporter: Stephan Ewen >Assignee: Kostas Kloudas >Priority: Major > Labels: usability > > Right now, the {{close()}} method of rich functions is invoked in case of > proper completion, and in case of canceling in case of error (to allow for > cleanup). > In certain cases, the user function needs to know why it is closed, whether > the task completed in a regular fashion, or was canceled/failed. > I suggest to add a method {{closeAfterFailure()}} to the {{RichFunction}}. By > default, this method calls {{close()}}. The runtime is the changed to call > {{close()}} as part of the regular execution and {{closeAfterFailure()}} in > case of an irregular exit. > Because by default all cases call {{close()}} the change would not be API > breaking. 
-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-2646) User functions should be able to differentiate between successful close and erroneous close
[ https://issues.apache.org/jira/browse/FLINK-2646?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17024938#comment-17024938 ] Guowei Ma commented on FLINK-2646: -- Share some thoughts about who should trigger the last checkpoint in the bounded stream scenario: The _CheckpointCoordinator_ could trigger the last checkpoint. However, I think there are some advantages to having the task trigger the last checkpoint in this scenario. Of course, in both options it is the _CheckpointCoordinator_'s responsibility to notify the completion of the checkpoint.
# There would be fewer RPC calls.
## The first option (the _CheckpointCoordinator_ triggers the last checkpoint):
### Task -> JM: Task is finished.
### JM -> _CheckpointCoordinator_ -> Task: Trigger the last checkpoint.
### Task -> _CheckpointCoordinator_: Acknowledge the last checkpoint.
### _CheckpointCoordinator_ -> Task: Confirm the completion of the last checkpoint.
## The second option (the task triggers the last checkpoint):
### Task -> _CheckpointCoordinator_: The task acknowledges the last checkpoint when it reaches the end of input.
### _CheckpointCoordinator_ -> Task: Confirm the completion of the last checkpoint.
### Task -> JM: Task is finished.
## The second option needs at least one fewer RPC call than the first option. Actually, I think the step in which the _CheckpointCoordinator_ sends an RPC call to an already-finished task is not intuitive. If Flink introduces another state such as End_Of_Input to the ExecutionState, there would be yet another RPC call.
# Releasing the resource is simpler. In the first option, the _CheckpointCoordinator_ should notify the _Scheduler_ of the completion of the last checkpoint so that the _Scheduler_ can release the slot properly. The second option does not need to do anything special to release the resource properly.
# It is easier to deal with the AM failover scenario. Currently, if a job reaches the FINISHED status, its JobGraph is removed from the JobGraphStore, so when a new AM is granted leadership it does not re-submit the job. In the first option, we should only remove the JobGraph once the _CheckpointCoordinator_ confirms that the notification of the last checkpoint is done. In the second option, we need to do nothing.
In the drain scenario, Flink should trigger the checkpoint from the _CheckpointCoordinator_. > User functions should be able to differentiate between successful close and > erroneous close > --- > > Key: FLINK-2646 > URL: https://issues.apache.org/jira/browse/FLINK-2646 > Project: Flink > Issue Type: Improvement > Components: API / DataStream >Affects Versions: 0.10.0 >Reporter: Stephan Ewen >Assignee: Kostas Kloudas >Priority: Major > Labels: usability > > Right now, the {{close()}} method of rich functions is invoked in case of > proper completion, and in case of canceling in case of error (to allow for > cleanup). > In certain cases, the user function needs to know why it is closed, whether > the task completed in a regular fashion, or was canceled/failed. > I suggest to add a method {{closeAfterFailure()}} to the {{RichFunction}}. By > default, this method calls {{close()}}. The runtime is the changed to call > {{close()}} as part of the regular execution and {{closeAfterFailure()}} in > case of an irregular exit. > Because by default all cases call {{close()}} the change would not be API > breaking. -- This message was sent by Atlassian Jira (v8.3.4#803005)
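The "one fewer RPC" claim in the comment above can be made concrete by simply listing the two message sequences side by side. This is a toy enumeration for illustration, not Flink code:

```java
import java.util.List;

// The two message sequences for taking the last checkpoint of a bounded job,
// as described in the comment. Each entry is one RPC call.
class LastCheckpointRpcCount {
    // Option 1: the CheckpointCoordinator triggers the last checkpoint.
    static final List<String> COORDINATOR_TRIGGERED = List.of(
            "Task -> JM: task is finished",
            "JM/CheckpointCoordinator -> Task: trigger last checkpoint",
            "Task -> CheckpointCoordinator: acknowledge last checkpoint",
            "CheckpointCoordinator -> Task: confirm completion");

    // Option 2: the task triggers the last checkpoint at end of input.
    static final List<String> TASK_TRIGGERED = List.of(
            "Task -> CheckpointCoordinator: acknowledge last checkpoint at end of input",
            "CheckpointCoordinator -> Task: confirm completion",
            "Task -> JM: task is finished");
}
```

Counting the entries shows the task-triggered option saves exactly one round of RPC per task, and it never has to address a task that has already reported itself finished.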
[jira] [Comment Edited] (FLINK-2646) User functions should be able to differentiate between successful close and erroneous close
[ https://issues.apache.org/jira/browse/FLINK-2646?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17024938#comment-17024938 ] Guowei Ma edited comment on FLINK-2646 at 1/28/20 9:41 AM: --- Share some thoughts about who should trigger the last checkpoint in the bounded stream scenario: The _CheckpointCoordiantor_ could trigger the last checkpoint. However, I think that it has some advantages that the task triggers the last checkpoint in this scenario. Of course, it is the responsibility that the _CheckpointCoordinator_ should notify the completion of the checkpoint in both two options. # There would be fewer RPC calls ## The first option (_CheckpointCoodinator_ triggers the last checkpoint) ### Task->JM: Task is finished ### JM -> _CheckpointCoordinator_ -> Task: Trigger the last checkpoint. ### Task -> _CheckpointCoordinator_: Acknowledge the last checkpoint. ### _CheckpointCoodinator_ -> Task: Confirm the completion of the last checkpoint. ## Second Option (Task triggers the last checkpoint) ### Task -> _CheckpointCoordinator_: Task acknowledges the last checkpoint when it reaches the end of input. ### _CheckpointCoordinator_ -> Task: Confirm the completion of the last checkpoint ### Task -> JM: Task is finished. ## The second option is at least once less RPC call than the first option. Actually I think the step of 1.1.1 is not intuitive that _CheckpointCoordinator_ sends RPC call to the Finished tasks. If Flink introduces another state such as End_Of_Input to the ExecutionState, there would another more RPC call. # Release the resource is simpler. The _CheckpointCoodinator_ should notify _Scheduler_ of the completion of the last checkpoint then the _Scheduler_ could release the slot properly in the first option. The second option does not need to do anything special for releasing the resource properly. # Easier to deal with the AM failover scenario. 
Currently, if a job reaches the FINISHED status its’ JobGraph would be removed from the JobGraphStore. So when a new AM grants the leadership it would not re-summitted the Job. We should only remove the JobGraph when the CheckpointCoordinator confirms the notification of last-checkpoint is done in the first option. In the second option, we could do nothing. In the drain scenario, Flink should trigger the checkpoint from the CheckpointCoordiantor. was (Author: maguowei): Share some thoughts about who should trigger the last checkpoint in the bounded stream scenario: The _CheckpointCoordiantor_ could trigger the last checkpoint. However, I think that it has some advantages that the task triggers the last checkpoint in this scenario. Of course, it is the responsibility that the _CheckpointCoordinator_ should notify the completion of the checkpoint in both two options. # There would be fewer RPC calls ## The first option (_CheckpointCoodinator_ triggers the last checkpoint) ### Task->JM: Task is finished ### JM -> _CheckpointCoordinator_ -> Task: Trigger the last checkpoint. ### Task -> _CheckpointCoordinator_: Acknowledge the last checkpoint. ### _CheckpointCoodinator_ -> Task: Confirm the completion of the last checkpoint. ## Second Option (Task triggers the last checkpoint) ### Task -> _CheckpointCoordinator_: Task acknowledges the last checkpoint when it reaches the end of input. ### _CheckpointCoordinator_ -> Task: Confirm the completion of the last checkpoint ### Task -> JM: Task is finished. ## The second option is at least once less RPC call than the first option. Actually I think the step of a.i is not intuitive that _CheckpointCoordinator_ sends RPC call to the Finished tasks. If Flink introduces another state such as End_Of_Input to the ExecutionState, there would another more RPC call. # Release the resource is simpler. 
The _CheckpointCoodinator_ should notify _Scheduler_ of the completion of the last checkpoint then the _Scheduler_ could release the slot properly in the first option. The second option does not need to do anything special for releasing the resource properly. # Easier to deal with the AM failover scenario. Currently, if a job reaches the FINISHED status its’ JobGraph would be removed from the JobGraphStore. So when a new AM grants the leadership it would not re-summitted the Job. We should only remove the JobGraph when the CheckpointCoordinator confirms the notification of last-checkpoint is done in the first option. In the second option, we could do nothing. In the drain scenario, Flink should trigger the checkpoint from the CheckpointCoordiantor. > User functions should be able to differentiate between successful close and > erroneous close > --- > > Key: FLINK-2646 > URL: https://issues.apache.org/jira/browse/FLINK-2646 >
[jira] [Created] (FLINK-15784) Show the Watermark metrics of source node in the web UI.
Guowei Ma created FLINK-15784: - Summary: Show the Watermark metrics of source node in the web UI. Key: FLINK-15784 URL: https://issues.apache.org/jira/browse/FLINK-15784 Project: Flink Issue Type: Improvement Components: Runtime / Web Frontend Reporter: Guowei Ma Currently, the web UI only shows the watermarks of non-source nodes. This might mislead some users into thinking that the source could not “receive” the watermark.[1] So it would be more consistent to show the watermark metrics of all nodes, including the source node, in the web UI. [1][https://lists.apache.org/thread.html/r0f565a425253afea67eb2cca8dc1b233b57f87bb71650681aa9b6731%40%3Cuser.flink.apache.org%3E] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-15784) Show the Watermark metrics of source node in the web UI.
[ https://issues.apache.org/jira/browse/FLINK-15784?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guowei Ma updated FLINK-15784: -- Labels: usability (was: ) > Show the Watermark metrics of source node in the web UI. > > > Key: FLINK-15784 > URL: https://issues.apache.org/jira/browse/FLINK-15784 > Project: Flink > Issue Type: Improvement > Components: Runtime / Web Frontend >Reporter: Guowei Ma >Priority: Minor > Labels: usability > > Currently, the web UI only shows the watermark of the non-source node. It > might mislead some users that the source could not “receive” the watermark.[1] > So it would be more consistent to show all the watermarks metrics included > the source node in the web UI. > > [1][https://lists.apache.org/thread.html/r0f565a425253afea67eb2cca8dc1b233b57f87bb71650681aa9b6731%40%3Cuser.flink.apache.org%3E] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-15786) Load connector code with separate classloader
Guowei Ma created FLINK-15786: - Summary: Load connector code with separate classloader Key: FLINK-15786 URL: https://issues.apache.org/jira/browse/FLINK-15786 Project: Flink Issue Type: Improvement Components: Runtime / Task Reporter: Guowei Ma Currently, connector code can be seen as part of the user code. Usually, users only need to add the corresponding connector as a dependency and package it in the user jar, which is convenient enough. However, connectors usually need to interact with external systems and often introduce heavy dependencies, so there is a high possibility of class conflicts between different connectors, or with the user code of the same job. For example, every one or two weeks we receive issue reports from our users related to connector class conflicts. The problem can get worse when users want to analyze data from different sources and write output to different sinks. Using a separate classloader to load each connector's code could resolve the problem. -- This message was sent by Atlassian Jira (v8.3.4#803005)
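The proposal amounts to applying child-first classloading per connector, so that each connector resolves its own dependencies before falling back to the shared classpath. The following is a minimal child-first classloader sketch in plain Java; it is a simplification (Flink's real user-code classloader also handles resource lookup and configurable parent-first patterns), and the prefix list here is illustrative:

```java
import java.net.URL;
import java.net.URLClassLoader;

// Sketch of a child-first classloader: connector jars win over the parent,
// except for core and framework classes, which must stay parent-loaded so
// that types shared across classloaders remain identical.
class ChildFirstClassLoader extends URLClassLoader {
    private static final String[] ALWAYS_PARENT_FIRST =
            {"java.", "javax.", "org.apache.flink."};

    ChildFirstClassLoader(URL[] connectorJars, ClassLoader parent) {
        super(connectorJars, parent);
    }

    @Override
    protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
        synchronized (getClassLoadingLock(name)) {
            Class<?> c = findLoadedClass(name);
            if (c == null) {
                for (String prefix : ALWAYS_PARENT_FIRST) {
                    if (name.startsWith(prefix)) {
                        return super.loadClass(name, resolve); // delegate to parent
                    }
                }
                try {
                    c = findClass(name); // try the connector's own jars first
                } catch (ClassNotFoundException e) {
                    c = super.loadClass(name, resolve); // fall back to parent
                }
            }
            if (resolve) {
                resolveClass(c);
            }
            return c;
        }
    }

    // Convenience wrapper that converts the checked exception.
    Class<?> loadOrFail(String name) {
        try {
            return loadClass(name);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

Giving each connector its own instance of such a loader (with only that connector's jars on it) is what would isolate conflicting transitive dependencies between connectors in the same job.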
[jira] [Updated] (FLINK-15786) Load connector code with separate classloader
[ https://issues.apache.org/jira/browse/FLINK-15786?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guowei Ma updated FLINK-15786: -- Labels: usability (was: ) > Load connector code with separate classloader > - > > Key: FLINK-15786 > URL: https://issues.apache.org/jira/browse/FLINK-15786 > Project: Flink > Issue Type: Improvement > Components: Runtime / Task >Reporter: Guowei Ma >Priority: Major > Labels: usability > > Currently, connector code can be seen as part of user code. Usually, users > only need to add the corresponding connector as a dependency and package it > in the user jar. This is convenient enough. > However, connectors usually need to interact with external systems and often > introduce heavy dependencies, there is a high possibility of a class conflict > of different connectors or the user code of the same job. For example, every > one or two weeks, we will receive issue reports relevant with connector class > conflict from our users. The problem can get worse when users want to analyze > data from different sources and write output to different sinks. > Using separate classloader to load the different connector code could resolve > the problem. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-17584) disableAutoTypeRegistration option does not work with Streaming API, only with Batch
[ https://issues.apache.org/jira/browse/FLINK-17584?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17107882#comment-17107882 ] Guowei Ma commented on FLINK-17584: --- Thanks for opening this issue. 1. I think autoTypeRegistrationEnabled is only used by the DataSet API to register the output types of all operators with Kryo. However, DataStream uses the TypeExtractor to create its output/state serializers, and Kryo is just a fallback. So I think it is reasonable that the DataStream API does not handle autoTypeRegistrationEnabled. 2. >[https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java#L90] This would not register all types; the DataStream type extraction stack would not call `registerType` at all. 3. I think we could improve the comments on autoTypeRegistrationEnabled, but when Flink replaces DataSet with DataStream these issues will be gone. > disableAutoTypeRegistration option does not work with Streaming API, only > with Batch > > > Key: FLINK-17584 > URL: https://issues.apache.org/jira/browse/FLINK-17584 > Project: Flink > Issue Type: Bug > Components: API / DataStream >Affects Versions: 1.10.0 >Reporter: Yaron Shani >Priority: Minor > > Hey, > There is a feature called disableAutoTypeRegistration which, from my > understanding, should disable the auto-loading of classes into Kryo. It seems to > work on the Batch API, but I don't see any reference in the DataStream > code, and it does not work there. Is it by design? If so, I think it would be better > if it stated that clearly. If not, can I suggest a fix? 
Something like > this: > > {code:java} > @Override > @PublicEvolving > public TypeSerializer<T> createSerializer(ExecutionConfig config) { > if (config.hasGenericTypesDisabled()) { > throw new UnsupportedOperationException( > "Generic types have been disabled in the ExecutionConfig and type " > + this.typeClass.getName() + > " is treated as a generic type."); > } > if (config.isAutoTypeRegistrationDisabled()) { > if (!config.getRegisteredKryoTypes().contains(this.typeClass)) { > throw new UnsupportedOperationException( > "Auto type registration (disableAutoTypeRegistration) has been > enabled in the ExecutionConfig and type " + this.typeClass.getName() + > " is treated as an auto type."); > } > } > return new KryoSerializer<>(this.typeClass, config); > } > {code} > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-17900) Listener for when Files are committed in StreamingFileSink
[ https://issues.apache.org/jira/browse/FLINK-17900?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17115629#comment-17115629 ] Guowei Ma commented on FLINK-17900: --- Hi, [~maccamlc] # Currently, there is no public API that lets you register your own listener. # Do you need to be notified every time a file is committed, or only when a bucket is updated? Thanks. > Listener for when Files are committed in StreamingFileSink > -- > > Key: FLINK-17900 > URL: https://issues.apache.org/jira/browse/FLINK-17900 > Project: Flink > Issue Type: Improvement >Affects Versions: 1.10.0 >Reporter: Matthew McMahon >Priority: Major > > I have a scenario where once a file has been committed to S3 using the > StreamingFileSink (I am using Java), I should notify some downstream > services. > The idea is to produce a message on a Kafka topic for the files as they are > finalized. > I am currently looking into how this might be possible, and considering using > reflection and/or checking the S3 bucket before/after the checkpoint is > committed. > Still trying to find a suitable way. > However I was thinking it would be great if possible to register a listener > that can be fired when StreamingFileSink commits a file. > Does something like this exist, that I'm not aware of (new to flink) or could > it be added? -- This message was sent by Atlassian Jira (v8.3.4#803005)
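As the comment above notes, no such public hook exists today. A hypothetical listener of the kind the reporter asks for might look like the following sketch; every name in it is invented for illustration and is not Flink API:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical callback for committed part files (invented for illustration).
interface CommittedFileListener {
    void onFileCommitted(String bucketId, String path);
}

// A toy "sink" that invokes registered listeners when it finalizes a part
// file, e.g. so the application can publish the path to a Kafka topic.
class NotifyingSink {
    private final List<CommittedFileListener> listeners = new ArrayList<>();

    void register(CommittedFileListener listener) {
        listeners.add(listener);
    }

    void commitPartFile(String bucketId, String path) {
        // ... here a real sink would rename the in-progress file to its final name ...
        listeners.forEach(l -> l.onFileCommitted(bucketId, path));
    }
}
```

The key design question the comment raises is exactly where this callback fires: per committed file (potentially many calls per checkpoint) or per updated bucket (one coarser notification).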
[jira] [Commented] (FLINK-13993) Using FlinkUserCodeClassLoaders to load the user class in the perjob mode
[ https://issues.apache.org/jira/browse/FLINK-13993?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16991190#comment-16991190 ] Guowei Ma commented on FLINK-13993: --- [~tison] Sorry for the late reply. I think your method could work, but the benefit of my method is that the directory structure of the TM & JM is almost the same in every mode. [~liyu] Yes, I will do it. > Using FlinkUserCodeClassLoaders to load the user class in the perjob mode > - > > Key: FLINK-13993 > URL: https://issues.apache.org/jira/browse/FLINK-13993 > Project: Flink > Issue Type: Improvement > Components: Runtime / Coordination >Affects Versions: 1.9.0, 1.10.0 >Reporter: Guowei Ma >Assignee: Guowei Ma >Priority: Major > Labels: pull-request-available > Fix For: 1.10.0 > > Original Estimate: 30h > Time Spent: 20m > Remaining Estimate: 29h 40m > > Currently, Flink has the FlinkUserCodeClassLoader, which is using to load > user’s class. However, the user class and the system class are all loaded by > the system classloader in the perjob mode. This introduces some conflicts. > This document[1] gives a proposal that makes the FlinkUserClassLoader load > the user class in perjob mode. (disscuss with Till[2]) > > [1][https://docs.google.com/document/d/1fH2Cwrrmps5RxxvVuUdeprruvDNabEaIHPyYps28WM8/edit#heading=h.815t5dodlxh7] > [2] > [https://docs.google.com/document/d/1SUhFt1BmsGMLUYVa72SWLbNrrWzunvcjAlEm8iusvq0/edit] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Closed] (FLINK-13993) Using FlinkUserCodeClassLoaders to load the user class in the perjob mode
[ https://issues.apache.org/jira/browse/FLINK-13993?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guowei Ma closed FLINK-13993. - Resolution: Fixed > Using FlinkUserCodeClassLoaders to load the user class in the perjob mode > - > > Key: FLINK-13993 > URL: https://issues.apache.org/jira/browse/FLINK-13993 > Project: Flink > Issue Type: Improvement > Components: Runtime / Coordination >Affects Versions: 1.9.0, 1.10.0 >Reporter: Guowei Ma >Assignee: Guowei Ma >Priority: Major > Labels: pull-request-available > Fix For: 1.10.0 > > Original Estimate: 30h > Time Spent: 20m > Remaining Estimate: 29h 40m > > Currently, Flink has the FlinkUserCodeClassLoader, which is using to load > user’s class. However, the user class and the system class are all loaded by > the system classloader in the perjob mode. This introduces some conflicts. > This document[1] gives a proposal that makes the FlinkUserClassLoader load > the user class in perjob mode. (disscuss with Till[2]) > > [1][https://docs.google.com/document/d/1fH2Cwrrmps5RxxvVuUdeprruvDNabEaIHPyYps28WM8/edit#heading=h.815t5dodlxh7] > [2] > [https://docs.google.com/document/d/1SUhFt1BmsGMLUYVa72SWLbNrrWzunvcjAlEm8iusvq0/edit] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Comment Edited] (FLINK-13993) Using FlinkUserCodeClassLoaders to load the user class in the perjob mode
[ https://issues.apache.org/jira/browse/FLINK-13993?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16991190#comment-16991190 ] Guowei Ma edited comment on FLINK-13993 at 12/9/19 7:01 AM: [~tison] Sorry for the late reply. I think your method could work. But the benefit of my method could lead that the directory structure of TM&JM is almost the same in every mode. [~liyu] yes I would do it. was (Author: maguowei): [~tison] Sorry for the late reply. I think your method could work. But the method leads that the directory structure of TM&JM is almost the same in every mode. [~liyu] yes I would do it. > Using FlinkUserCodeClassLoaders to load the user class in the perjob mode > - > > Key: FLINK-13993 > URL: https://issues.apache.org/jira/browse/FLINK-13993 > Project: Flink > Issue Type: Improvement > Components: Runtime / Coordination >Affects Versions: 1.9.0, 1.10.0 >Reporter: Guowei Ma >Assignee: Guowei Ma >Priority: Major > Labels: pull-request-available > Fix For: 1.10.0 > > Original Estimate: 30h > Time Spent: 20m > Remaining Estimate: 29h 40m > > Currently, Flink has the FlinkUserCodeClassLoader, which is using to load > user’s class. However, the user class and the system class are all loaded by > the system classloader in the perjob mode. This introduces some conflicts. > This document[1] gives a proposal that makes the FlinkUserClassLoader load > the user class in perjob mode. (disscuss with Till[2]) > > [1][https://docs.google.com/document/d/1fH2Cwrrmps5RxxvVuUdeprruvDNabEaIHPyYps28WM8/edit#heading=h.815t5dodlxh7] > [2] > [https://docs.google.com/document/d/1SUhFt1BmsGMLUYVa72SWLbNrrWzunvcjAlEm8iusvq0/edit] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-16262) Class loader problem with FlinkKafkaProducer.Semantic.EXACTLY_ONCE and usrlib directory
[ https://issues.apache.org/jira/browse/FLINK-16262?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17045377#comment-17045377 ] Guowei Ma commented on FLINK-16262: --- Hi, [~jkreileder] could you provide the version of Kafka connector? > Class loader problem with FlinkKafkaProducer.Semantic.EXACTLY_ONCE and usrlib > directory > --- > > Key: FLINK-16262 > URL: https://issues.apache.org/jira/browse/FLINK-16262 > Project: Flink > Issue Type: Bug > Components: Connectors / Kafka, Deployment / Docker >Affects Versions: 1.10.0 > Environment: openjdk:11-jre with a slightly modified Flink 1.10.0 > build (nothing changed regarding Kafka and/or class loading). >Reporter: Jürgen Kreileder >Priority: Blocker > Fix For: 1.10.1, 1.11.0 > > > We're using Docker images modeled after > [https://github.com/apache/flink/blob/master/flink-container/docker/Dockerfile] > (using Java 11) > When I try to switch a Kafka producer from AT_LEAST_ONCE to EXACTLY_ONCE, the > taskmanager startup fails with: > {code:java} > 2020-02-24 18:25:16.389 INFO o.a.f.r.t.Task Create > Case Fixer -> Sink: Findings local-krei04-kba-digitalweb-uc1 (1/1) > (72f7764c6f6c614e5355562ed3d27209) switched from RUNNING to FAILED. > org.apache.kafka.common.config.ConfigException: Invalid value > org.apache.kafka.common.serialization.ByteArraySerializer for configuration > key.serializer: Class > org.apache.kafka.common.serialization.ByteArraySerializer could not be found. 
> at org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:718) > at org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:471) > at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:464) > at > org.apache.kafka.common.config.AbstractConfig.(AbstractConfig.java:62) > at > org.apache.kafka.common.config.AbstractConfig.(AbstractConfig.java:75) > at > org.apache.kafka.clients.producer.ProducerConfig.(ProducerConfig.java:396) > at > org.apache.kafka.clients.producer.KafkaProducer.(KafkaProducer.java:326) > at > org.apache.kafka.clients.producer.KafkaProducer.(KafkaProducer.java:298) > at > org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaInternalProducer.(FlinkKafkaInternalProducer.java:76) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.lambda$abortTransactions$2(FlinkKafkaProducer.java:1107) > at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(Unknown > Source) > at java.base/java.util.HashMap$KeySpliterator.forEachRemaining(Unknown > Source) > at java.base/java.util.stream.AbstractPipeline.copyInto(Unknown Source) > at java.base/java.util.stream.ForEachOps$ForEachTask.compute(Unknown Source) > at java.base/java.util.concurrent.CountedCompleter.exec(Unknown Source) > at java.base/java.util.concurrent.ForkJoinTask.doExec(Unknown Source) > at > java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(Unknown > Source) > at java.base/java.util.concurrent.ForkJoinPool.scan(Unknown Source) > at java.base/java.util.concurrent.ForkJoinPool.runWorker(Unknown Source) > at java.base/java.util.concurrent.ForkJoinWorkerThread.run(Unknown > Source){code} > This looks like a class loading issue: If I copy our JAR to FLINK_LIB_DIR > instead of FLINK_USR_LIB_DIR, everything works fine. > (AT_LEAST_ONCE producers works fine with the JAR in FLINK_USR_LIB_DIR) > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Comment Edited] (FLINK-16262) Class loader problem with FlinkKafkaProducer.Semantic.EXACTLY_ONCE and usrlib directory
[ https://issues.apache.org/jira/browse/FLINK-16262?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17045377#comment-17045377 ] Guowei Ma edited comment on FLINK-16262 at 2/26/20 11:30 AM: - Hi [~jkreileder], could you provide the version of the Kafka connector and the files in the lib and usrlib directories? was (Author: maguowei): Hi, [~jkreileder] could you provide the version of Kafka connector? > Class loader problem with FlinkKafkaProducer.Semantic.EXACTLY_ONCE and usrlib > directory -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Comment Edited] (FLINK-16262) Class loader problem with FlinkKafkaProducer.Semantic.EXACTLY_ONCE and usrlib directory
[ https://issues.apache.org/jira/browse/FLINK-16262?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17045377#comment-17045377 ] Guowei Ma edited comment on FLINK-16262 at 2/26/20 11:45 AM: - Hi [~jkreileder], could you provide the version of the Kafka connector and list the files in the lib and usrlib directories in the container? was (Author: maguowei): Hi, [~jkreileder] could you provide the version of Kafka connector and the files in the lib and usrlib directories? > Class loader problem with FlinkKafkaProducer.Semantic.EXACTLY_ONCE and usrlib > directory -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Comment Edited] (FLINK-16262) Class loader problem with FlinkKafkaProducer.Semantic.EXACTLY_ONCE and usrlib directory
[ https://issues.apache.org/jira/browse/FLINK-16262?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17045377#comment-17045377 ] Guowei Ma edited comment on FLINK-16262 at 2/26/20 11:59 AM: - Hi [~jkreileder], could you provide more information? For example: # the version of the Kafka connector # the list of files in the lib and usrlib directories in the container # whether you changed docker-entrypoint.sh was (Author: maguowei): Hi, [~jkreileder] could you provide the version of Kafka connector and list the files in the lib and usrlib directories in the container? > Class loader problem with FlinkKafkaProducer.Semantic.EXACTLY_ONCE and usrlib > directory -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-2336) ArrayIndexOufOBoundsException in TypeExtractor when mapping
[ https://issues.apache.org/jira/browse/FLINK-2336?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17045473#comment-17045473 ] Guowei Ma commented on FLINK-2336: -- OK, I can follow up on this. > ArrayIndexOufOBoundsException in TypeExtractor when mapping > --- > > Key: FLINK-2336 > URL: https://issues.apache.org/jira/browse/FLINK-2336 > Project: Flink > Issue Type: Bug > Components: API / Type Serialization System >Affects Versions: 0.10.0 >Reporter: William Saar >Priority: Critical > Labels: usability > Fix For: 1.10.1, 1.11.0 > > > The line that causes this is > DataStream outputStream = insideIterationStream.filter(outputFilter).map(m > -> m.outputMessage); > Problem occurs both when compiled using Javac and Eclipse's JDT compiler (in > an environment where simple lambda type tests work) > Exception in thread "main" java.lang.ArrayIndexOutOfBoundsException: -1 > at java.util.ArrayList.elementData(Unknown Source) > at java.util.ArrayList.get(Unknown Source) > at > org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoFromInputs(TypeExtractor.java:553) > at > org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:468) > at > org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:370) > at > org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:254) > at > org.apache.flink.api.java.typeutils.TypeExtractor.getMapReturnTypes(TypeExtractor.java:91) > at > org.apache.flink.streaming.api.datastream.DataStream.map(DataStream.java:605) -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Comment Edited] (FLINK-2336) ArrayIndexOufOBoundsException in TypeExtractor when mapping
[ https://issues.apache.org/jira/browse/FLINK-2336?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17045473#comment-17045473 ] Guowei Ma edited comment on FLINK-2336 at 2/26/20 12:03 PM: OK, I can follow up on this. [~sewen] was (Author: maguowei): OK I could follow this. > ArrayIndexOufOBoundsException in TypeExtractor when mapping -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-2336) ArrayIndexOufOBoundsException in TypeExtractor when mapping
[ https://issues.apache.org/jira/browse/FLINK-2336?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17045593#comment-17045593 ] Guowei Ma commented on FLINK-2336: -- The first test throws IndexOutOfBoundsException because, when using the input to infer the output type, the type hierarchy array has length 0. In that case null should be returned early (TypeExtractor.java:956). The next two tests do not throw because the types of those two lambdas have been erased, so they take a different branch than the first test. A simple fix is to add a check at TypeExtractor.java:956 that returns null early when the length of inputTypeHierarchy is 0. In theory, the first lambda's type is not erased, so we could infer the output type from the input. In other words, we could build a custom ParameterizedType with rawType = MapFunction.class and actualTypeArguments set to the lambda's input and output types. However, this requires introducing a custom ParameterizedType, and I think we can wait until the TypeExtractor is refactored. 
What do you think, [~sewen]? > ArrayIndexOufOBoundsException in TypeExtractor when mapping -- This message was sent by Atlassian Jira (v8.3.4#803005)
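The early-return guard proposed above can be sketched as follows. This is only an illustration under stated assumptions: the class `TypeHierarchyGuard` and method `lastHierarchyEntry` are hypothetical names, standing in for the list access done around TypeExtractor.java:956; the real fix would live inside TypeExtractor itself.

```java
import java.util.ArrayList;
import java.util.List;

public class TypeHierarchyGuard {

    // Hypothetical stand-in for reading the last entry of the input type
    // hierarchy. Without the isEmpty() check, get(size - 1) on an empty list
    // throws ArrayIndexOutOfBoundsException: -1, matching the reported trace.
    static Class<?> lastHierarchyEntry(List<Class<?>> typeHierarchy) {
        if (typeHierarchy.isEmpty()) {
            return null; // proposed early return: caller falls back to other inference
        }
        return typeHierarchy.get(typeHierarchy.size() - 1);
    }

    public static void main(String[] args) {
        // Empty hierarchy: the guard returns null instead of throwing.
        System.out.println(lastHierarchyEntry(new ArrayList<>()));

        List<Class<?>> hierarchy = new ArrayList<>();
        hierarchy.add(Object.class);
        System.out.println(lastHierarchyEntry(hierarchy));
    }
}
```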
[jira] [Commented] (FLINK-16262) Class loader problem with FlinkKafkaProducer.Semantic.EXACTLY_ONCE and usrlib directory
[ https://issues.apache.org/jira/browse/FLINK-16262?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17045695#comment-17045695 ] Guowei Ma commented on FLINK-16262: --- 1. The direct cause of this problem is that FlinkKafkaProducer always uses the system classloader instead of the user classloader in _abortTransactions_. 2. FlinkKafkaProducer, line 1098: _transactionalIds.parallelStream().forEach(.)_. This runs on a shared static thread pool, and the context classloader of that pool's worker threads is the system classloader. I think this is a FlinkKafkaProducer bug that should be fixed. I will contact the committer to double-check it tomorrow. Thanks for reporting this. > Class loader problem with FlinkKafkaProducer.Semantic.EXACTLY_ONCE and usrlib > directory -- This message was sent by Atlassian Jira (v8.3.4#803005)
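The pitfall in point 2 can be sketched as follows. This is an illustrative workaround, not necessarily the patch Flink merged: work fanned out via `parallelStream()` runs on the common ForkJoinPool, whose worker threads do not carry the submitting thread's context classloader, so the sketch installs the desired classloader around each task and restores it afterwards. `forEachWithContextClassLoader` is a hypothetical helper name.

```java
import java.util.List;
import java.util.function.Consumer;
import java.util.stream.StreamSupport;

public class ContextClassLoaderDemo {

    // Hypothetical helper: run the action for each element in parallel, but
    // ensure the given classloader is the context classloader while it runs,
    // even on common-pool workers that would otherwise use the system loader.
    static <T> void forEachWithContextClassLoader(
            Iterable<T> items, ClassLoader cl, Consumer<T> action) {
        StreamSupport.stream(items.spliterator(), true).forEach(item -> {
            Thread worker = Thread.currentThread();
            ClassLoader previous = worker.getContextClassLoader();
            worker.setContextClassLoader(cl);
            try {
                action.accept(item);
            } finally {
                worker.setContextClassLoader(previous); // never leak the loader
            }
        });
    }

    public static void main(String[] args) {
        // Stand-in for the user-code classloader from the report.
        ClassLoader userLoader = new java.net.URLClassLoader(
                new java.net.URL[0], ContextClassLoaderDemo.class.getClassLoader());

        forEachWithContextClassLoader(List.of("txn-0", "txn-1", "txn-2"), userLoader, id ->
                // Inside the action, lookups through the context classloader
                // now see the intended loader, as abortTransactions() needs.
                System.out.println(id + " loader ok: "
                        + (Thread.currentThread().getContextClassLoader() == userLoader)));
    }
}
```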
[jira] [Commented] (FLINK-15786) Load connector code with separate classloader
[ https://issues.apache.org/jira/browse/FLINK-15786?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17046423#comment-17046423 ] Guowei Ma commented on FLINK-15786: --- [~fly_in_gis] Yes. But there are some differences between FileSystem and Connector, such as: # A DataStream user can new a connector object directly. # The Source/Sink operator is deserialized on the TM side. # The Source operator might construct a class provided by the user at runtime. In general, these differences would require changes to the FlinkUserCodeClassLoader. For example, the FlinkUserCodeClassLoader might need some extra connector classloaders, which could be used to find the connector and the classes it depends on. [~pnowojski] I might be missing some scenarios; could you share your concerns? Thank you! > Load connector code with separate classloader > - > > Key: FLINK-15786 > URL: https://issues.apache.org/jira/browse/FLINK-15786 > Project: Flink > Issue Type: Improvement > Components: Runtime / Task >Reporter: Guowei Ma >Priority: Major > Labels: usability > > Currently, connector code can be seen as part of user code. Usually, users > only need to add the corresponding connector as a dependency and package it > in the user jar. This is convenient enough. > However, connectors usually need to interact with external systems and often > introduce heavy dependencies, so there is a high possibility of class conflicts > between different connectors, or with the user code of the same job. For example, every > one or two weeks we receive issue reports related to connector class > conflicts from our users. The problem gets worse when users want to analyze > data from different sources and write output to different sinks. > Using a separate classloader to load each connector's code could resolve > the problem. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-15786) Load connector code with separate classloader
[ https://issues.apache.org/jira/browse/FLINK-15786?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17047229#comment-17047229 ] Guowei Ma commented on FLINK-15786: --- Thanks [~pnowojski] for your detailed explanation. I agree that we could change the DataStream API to postpone the construction of the source/sink operator. This would spare users from having to use the Plugin mechanism directly to create a source/sink object at the client side in order to separate the connector classloader from the user-code classloader. What I am concerned about is the relationship between the FlinkUserClassLoader and the connector classloader. In theory there might be interdependent scenarios, such as FlinkKafkaPartitioner/KafkaConnector: FlinkKafkaPartitioner is an interface provided by the Kafka connector that the user can implement, and the implementing class is passed to the Kafka connector and constructed at runtime. One option for this scenario is to provide a classloader that knows both the user-implemented FlinkKafkaPartitioner class and the Kafka connector classes. Let's call it XClassloader for now. Do you think we need to address this scenario? A related open question: we might not need to change (or add to) the DataStream API for separating the connector classloader if we choose to use the XClassloader. > Load connector code with separate classloader -- This message was sent by Atlassian Jira (v8.3.4#803005)
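The "XClassloader" idea discussed above can be sketched as a classloader that resolves classes against an ordered list of delegates, e.g. the connector classloader followed by the user-code classloader. This is only an assumption about how such a loader might look (the name `ComposingClassLoader` is made up here); it ignores resource lookup, resolution-order policy, and caching that a real implementation would need.

```java
import java.util.List;

public class ComposingClassLoader extends ClassLoader {

    private final List<ClassLoader> delegates;

    public ComposingClassLoader(List<ClassLoader> delegates) {
        // Parent null: only the bootstrap loader is consulted implicitly;
        // all other delegation happens in the explicit loop below.
        super(null);
        this.delegates = delegates;
    }

    @Override
    protected Class<?> findClass(String name) throws ClassNotFoundException {
        // First delegate that knows the class wins, e.g. the connector loader
        // first, then the user-code loader holding FlinkKafkaPartitioner impls.
        for (ClassLoader delegate : delegates) {
            try {
                return delegate.loadClass(name);
            } catch (ClassNotFoundException ignored) {
                // try the next delegate
            }
        }
        throw new ClassNotFoundException(name);
    }

    public static void main(String[] args) throws Exception {
        ClassLoader xLoader = new ComposingClassLoader(
                List.of(ClassLoader.getSystemClassLoader()));
        // Classes visible through any delegate resolve through the composite.
        System.out.println(xLoader.loadClass("java.util.ArrayList") == java.util.ArrayList.class);
    }
}
```

Whether Flink would want connector-first or user-first ordering is exactly the open question in the comment above; the list order makes that policy explicit and swappable.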