[jira] [Commented] (FLINK-10469) FileChannel may not write the whole buffer in a single call to FileChannel.write(Buffer buffer)

2018-09-30 Thread Guowei Ma (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10469?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16633622#comment-16633622
 ] 

Guowei Ma commented on FLINK-10469:
---

+1

> FileChannel may not write the whole buffer in a single call to 
> FileChannel.write(Buffer buffer)
> ---
>
> Key: FLINK-10469
> URL: https://issues.apache.org/jira/browse/FLINK-10469
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Affects Versions: 1.4.1, 1.4.2, 1.5.3, 1.6.0, 1.6.1, 1.7.0, 1.5.4, 1.6.2
>Reporter: Yun Gao
>Priority: Major
>
> Currently all the calls to _FileChannel.write(ByteBuffer src)_ assume that 
> this method will not return before the whole buffer is written, like the one 
> in _AsynchronousFileIOChannel.write()._
>  
> However, this assumption does not hold in all environments. We have 
> encountered a case in which only part of a buffer was written on a cluster 
> under high IO load, and the target file got corrupted. 
>  
> To fix this issue, I think we should add a utility method to 
> org.apache.flink.util.IOUtils that writes the whole buffer with a loop, and 
> replace all the calls to _FileChannel.write(ByteBuffer)_ with this new 
> method. 
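For reference, the proposed utility can be sketched as a short loop (the method name writeCompletely is an assumption here, not a committed API):

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

public final class IOUtilsSketch {

    /**
     * Writes the whole remaining content of {@code src} to {@code channel}.
     * A single FileChannel.write call is allowed to write fewer bytes than
     * requested, so we loop until nothing remains.
     */
    public static void writeCompletely(FileChannel channel, ByteBuffer src) throws IOException {
        while (src.hasRemaining()) {
            channel.write(src);
        }
    }
}
```

The loop relies only on the documented FileChannel.write contract (it may write fewer bytes than remaining), so it behaves the same on any filesystem or IO load.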



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-13993) Using FlinkUserCodeClassLoaders to load the user class in the perjob mode

2019-09-06 Thread Guowei Ma (Jira)
Guowei Ma created FLINK-13993:
-

 Summary: Using FlinkUserCodeClassLoaders to load the user class in 
the perjob mode
 Key: FLINK-13993
 URL: https://issues.apache.org/jira/browse/FLINK-13993
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Coordination
Affects Versions: 1.9.0, 1.10.0
Reporter: Guowei Ma


Currently, Flink has the FlinkUserCodeClassLoader, which is used to load the 
user's classes. However, in per-job mode the user classes and the system 
classes are all loaded by the system class loader, which introduces conflicts.

This document [1] proposes making the FlinkUserCodeClassLoader load the user 
classes in per-job mode (discussed with Till [2]).

 

[1][https://docs.google.com/document/d/1fH2Cwrrmps5RxxvVuUdeprruvDNabEaIHPyYps28WM8/edit#heading=h.815t5dodlxh7]

[2] 
[https://docs.google.com/document/d/1SUhFt1BmsGMLUYVa72SWLbNrrWzunvcjAlEm8iusvq0/edit]
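The core of the proposal is child-first lookup: try the user jars before delegating to the parent. A minimal sketch of the idea follows (a simplification: the real FlinkUserCodeClassLoaders also keeps a list of always-parent-first patterns such as java. and org.apache.flink.):

```java
import java.net.URL;
import java.net.URLClassLoader;

/** Illustrative child-first class loader; not Flink's actual implementation. */
public class ChildFirstClassLoader extends URLClassLoader {

    public ChildFirstClassLoader(URL[] urls, ClassLoader parent) {
        super(urls, parent);
    }

    @Override
    protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
        synchronized (getClassLoadingLock(name)) {
            Class<?> c = findLoadedClass(name);
            if (c == null) {
                try {
                    // look in the user jars first
                    c = findClass(name);
                } catch (ClassNotFoundException e) {
                    // fall back to the parent (system) class loader
                    c = super.loadClass(name, false);
                }
            }
            if (resolve) {
                resolveClass(c);
            }
            return c;
        }
    }
}
```

Reversing the usual parent-first delegation is what lets a user jar shade a dependency that also ships in Flink's lib directory.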





[jira] [Updated] (FLINK-13993) Using FlinkUserCodeClassLoaders to load the user class in the perjob mode

2019-09-06 Thread Guowei Ma (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-13993?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guowei Ma updated FLINK-13993:
--
Fix Version/s: 1.10.0

> Using FlinkUserCodeClassLoaders to load the user class in the perjob mode
> -
>
> Key: FLINK-13993
> URL: https://issues.apache.org/jira/browse/FLINK-13993
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.9.0, 1.10.0
>Reporter: Guowei Ma
>Priority: Major
> Fix For: 1.10.0
>
>
> Currently, Flink has the FlinkUserCodeClassLoader, which is used to load the 
> user's classes. However, in per-job mode the user classes and the system 
> classes are all loaded by the system class loader, which introduces conflicts.
> This document [1] proposes making the FlinkUserCodeClassLoader load the 
> user classes in per-job mode (discussed with Till [2]).
>  
> [1][https://docs.google.com/document/d/1fH2Cwrrmps5RxxvVuUdeprruvDNabEaIHPyYps28WM8/edit#heading=h.815t5dodlxh7]
> [2] 
> [https://docs.google.com/document/d/1SUhFt1BmsGMLUYVa72SWLbNrrWzunvcjAlEm8iusvq0/edit]





[jira] [Closed] (FLINK-13840) Let StandaloneJobClusterEntrypoint use user code class loader

2019-09-11 Thread Guowei Ma (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-13840?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guowei Ma closed FLINK-13840.
-
Resolution: Duplicate

> Let StandaloneJobClusterEntrypoint use user code class loader
> -
>
> Key: FLINK-13840
> URL: https://issues.apache.org/jira/browse/FLINK-13840
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.9.0, 1.10.0
>Reporter: Till Rohrmann
>Priority: Major
> Fix For: 1.10.0
>
>
> In order to resolve class loading issues when using the 
> {{StandaloneJobClusterEntryPoint}}, it would be better to run the user code 
> in the user code class loader, which supports child-first class loading. At 
> the moment, the user code jar is part of the system class path and, hence, 
> loaded by the system class loader.
> An easy way to solve this problem would be to place the user code in a 
> different directory than {{lib}} and then specify this path as an additional 
> classpath when creating the {{PackagedProgram}}.





[jira] [Commented] (FLINK-13993) Using FlinkUserCodeClassLoaders to load the user class in the perjob mode

2019-09-11 Thread Guowei Ma (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-13993?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16927484#comment-16927484
 ] 

Guowei Ma commented on FLINK-13993:
---

Thanks for reminding me. I'll close it. [~till.rohrmann]

> Using FlinkUserCodeClassLoaders to load the user class in the perjob mode
> -
>
> Key: FLINK-13993
> URL: https://issues.apache.org/jira/browse/FLINK-13993
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.9.0, 1.10.0
>Reporter: Guowei Ma
>Priority: Major
> Fix For: 1.10.0
>
>
> Currently, Flink has the FlinkUserCodeClassLoader, which is used to load the 
> user's classes. However, in per-job mode the user classes and the system 
> classes are all loaded by the system class loader, which introduces conflicts.
> This document [1] proposes making the FlinkUserCodeClassLoader load the 
> user classes in per-job mode (discussed with Till [2]).
>  
> [1][https://docs.google.com/document/d/1fH2Cwrrmps5RxxvVuUdeprruvDNabEaIHPyYps28WM8/edit#heading=h.815t5dodlxh7]
> [2] 
> [https://docs.google.com/document/d/1SUhFt1BmsGMLUYVa72SWLbNrrWzunvcjAlEm8iusvq0/edit]





[jira] [Commented] (FLINK-13993) Using FlinkUserCodeClassLoaders to load the user class in the perjob mode

2019-09-16 Thread Guowei Ma (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-13993?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16930613#comment-16930613
 ] 

Guowei Ma commented on FLINK-13993:
---

I will do it. Thanks. [~till.rohrmann]

> Using FlinkUserCodeClassLoaders to load the user class in the perjob mode
> -
>
> Key: FLINK-13993
> URL: https://issues.apache.org/jira/browse/FLINK-13993
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.9.0, 1.10.0
>Reporter: Guowei Ma
>Assignee: Guowei Ma
>Priority: Major
> Fix For: 1.10.0
>
>
> Currently, Flink has the FlinkUserCodeClassLoader, which is used to load the 
> user's classes. However, in per-job mode the user classes and the system 
> classes are all loaded by the system class loader, which introduces conflicts.
> This document [1] proposes making the FlinkUserCodeClassLoader load the 
> user classes in per-job mode (discussed with Till [2]).
>  
> [1][https://docs.google.com/document/d/1fH2Cwrrmps5RxxvVuUdeprruvDNabEaIHPyYps28WM8/edit#heading=h.815t5dodlxh7]
> [2] 
> [https://docs.google.com/document/d/1SUhFt1BmsGMLUYVa72SWLbNrrWzunvcjAlEm8iusvq0/edit]





[jira] [Updated] (FLINK-13993) Using FlinkUserCodeClassLoaders to load the user class in the perjob mode

2019-09-18 Thread Guowei Ma (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-13993?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guowei Ma updated FLINK-13993:
--
Remaining Estimate: 168h
 Original Estimate: 168h

> Using FlinkUserCodeClassLoaders to load the user class in the perjob mode
> -
>
> Key: FLINK-13993
> URL: https://issues.apache.org/jira/browse/FLINK-13993
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.9.0, 1.10.0
>Reporter: Guowei Ma
>Assignee: Guowei Ma
>Priority: Major
> Fix For: 1.10.0
>
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> Currently, Flink has the FlinkUserCodeClassLoader, which is used to load the 
> user's classes. However, in per-job mode the user classes and the system 
> classes are all loaded by the system class loader, which introduces conflicts.
> This document [1] proposes making the FlinkUserCodeClassLoader load the 
> user classes in per-job mode (discussed with Till [2]).
>  
> [1][https://docs.google.com/document/d/1fH2Cwrrmps5RxxvVuUdeprruvDNabEaIHPyYps28WM8/edit#heading=h.815t5dodlxh7]
> [2] 
> [https://docs.google.com/document/d/1SUhFt1BmsGMLUYVa72SWLbNrrWzunvcjAlEm8iusvq0/edit]





[jira] [Updated] (FLINK-13993) Using FlinkUserCodeClassLoaders to load the user class in the perjob mode

2019-09-19 Thread Guowei Ma (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-13993?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guowei Ma updated FLINK-13993:
--
Remaining Estimate: 144h  (was: 168h)
 Original Estimate: 144h  (was: 168h)

> Using FlinkUserCodeClassLoaders to load the user class in the perjob mode
> -
>
> Key: FLINK-13993
> URL: https://issues.apache.org/jira/browse/FLINK-13993
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.9.0, 1.10.0
>Reporter: Guowei Ma
>Assignee: Guowei Ma
>Priority: Major
> Fix For: 1.10.0
>
>   Original Estimate: 144h
>  Remaining Estimate: 144h
>
> Currently, Flink has the FlinkUserCodeClassLoader, which is used to load the 
> user's classes. However, in per-job mode the user classes and the system 
> classes are all loaded by the system class loader, which introduces conflicts.
> This document [1] proposes making the FlinkUserCodeClassLoader load the 
> user classes in per-job mode (discussed with Till [2]).
>  
> [1][https://docs.google.com/document/d/1fH2Cwrrmps5RxxvVuUdeprruvDNabEaIHPyYps28WM8/edit#heading=h.815t5dodlxh7]
> [2] 
> [https://docs.google.com/document/d/1SUhFt1BmsGMLUYVa72SWLbNrrWzunvcjAlEm8iusvq0/edit]





[jira] [Updated] (FLINK-13993) Using FlinkUserCodeClassLoaders to load the user class in the perjob mode

2019-09-19 Thread Guowei Ma (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-13993?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guowei Ma updated FLINK-13993:
--
Remaining Estimate: 48h  (was: 144h)
 Original Estimate: 48h  (was: 144h)

> Using FlinkUserCodeClassLoaders to load the user class in the perjob mode
> -
>
> Key: FLINK-13993
> URL: https://issues.apache.org/jira/browse/FLINK-13993
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.9.0, 1.10.0
>Reporter: Guowei Ma
>Assignee: Guowei Ma
>Priority: Major
> Fix For: 1.10.0
>
>   Original Estimate: 48h
>  Remaining Estimate: 48h
>
> Currently, Flink has the FlinkUserCodeClassLoader, which is used to load the 
> user's classes. However, in per-job mode the user classes and the system 
> classes are all loaded by the system class loader, which introduces conflicts.
> This document [1] proposes making the FlinkUserCodeClassLoader load the 
> user classes in per-job mode (discussed with Till [2]).
>  
> [1][https://docs.google.com/document/d/1fH2Cwrrmps5RxxvVuUdeprruvDNabEaIHPyYps28WM8/edit#heading=h.815t5dodlxh7]
> [2] 
> [https://docs.google.com/document/d/1SUhFt1BmsGMLUYVa72SWLbNrrWzunvcjAlEm8iusvq0/edit]





[jira] [Updated] (FLINK-13993) Using FlinkUserCodeClassLoaders to load the user class in the perjob mode

2019-09-19 Thread Guowei Ma (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-13993?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guowei Ma updated FLINK-13993:
--
Labels: pull-request-available  (was: )

> Using FlinkUserCodeClassLoaders to load the user class in the perjob mode
> -
>
> Key: FLINK-13993
> URL: https://issues.apache.org/jira/browse/FLINK-13993
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.9.0, 1.10.0
>Reporter: Guowei Ma
>Assignee: Guowei Ma
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.10.0
>
>   Original Estimate: 48h
>  Remaining Estimate: 48h
>
> Currently, Flink has the FlinkUserCodeClassLoader, which is used to load the 
> user's classes. However, in per-job mode the user classes and the system 
> classes are all loaded by the system class loader, which introduces conflicts.
> This document [1] proposes making the FlinkUserCodeClassLoader load the 
> user classes in per-job mode (discussed with Till [2]).
>  
> [1][https://docs.google.com/document/d/1fH2Cwrrmps5RxxvVuUdeprruvDNabEaIHPyYps28WM8/edit#heading=h.815t5dodlxh7]
> [2] 
> [https://docs.google.com/document/d/1SUhFt1BmsGMLUYVa72SWLbNrrWzunvcjAlEm8iusvq0/edit]





[jira] [Updated] (FLINK-13993) Using FlinkUserCodeClassLoaders to load the user class in the perjob mode

2019-09-19 Thread Guowei Ma (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-13993?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guowei Ma updated FLINK-13993:
--
Remaining Estimate: 30h  (was: 48h)
 Original Estimate: 30h  (was: 48h)

> Using FlinkUserCodeClassLoaders to load the user class in the perjob mode
> -
>
> Key: FLINK-13993
> URL: https://issues.apache.org/jira/browse/FLINK-13993
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.9.0, 1.10.0
>Reporter: Guowei Ma
>Assignee: Guowei Ma
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.10.0
>
>   Original Estimate: 30h
>  Remaining Estimate: 30h
>
> Currently, Flink has the FlinkUserCodeClassLoader, which is used to load the 
> user's classes. However, in per-job mode the user classes and the system 
> classes are all loaded by the system class loader, which introduces conflicts.
> This document [1] proposes making the FlinkUserCodeClassLoader load the 
> user classes in per-job mode (discussed with Till [2]).
>  
> [1][https://docs.google.com/document/d/1fH2Cwrrmps5RxxvVuUdeprruvDNabEaIHPyYps28WM8/edit#heading=h.815t5dodlxh7]
> [2] 
> [https://docs.google.com/document/d/1SUhFt1BmsGMLUYVa72SWLbNrrWzunvcjAlEm8iusvq0/edit]





[jira] [Commented] (FLINK-13993) Using FlinkUserCodeClassLoaders to load the user class in the perjob mode

2019-09-19 Thread Guowei Ma (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-13993?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16933201#comment-16933201
 ] 

Guowei Ma commented on FLINK-13993:
---

Sorry, I changed the estimate.

Actually, what I meant was one man-week.

> Using FlinkUserCodeClassLoaders to load the user class in the perjob mode
> -
>
> Key: FLINK-13993
> URL: https://issues.apache.org/jira/browse/FLINK-13993
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.9.0, 1.10.0
>Reporter: Guowei Ma
>Assignee: Guowei Ma
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.10.0
>
>   Original Estimate: 30h
>  Remaining Estimate: 30h
>
> Currently, Flink has the FlinkUserCodeClassLoader, which is used to load the 
> user's classes. However, in per-job mode the user classes and the system 
> classes are all loaded by the system class loader, which introduces conflicts.
> This document [1] proposes making the FlinkUserCodeClassLoader load the 
> user classes in per-job mode (discussed with Till [2]).
>  
> [1][https://docs.google.com/document/d/1fH2Cwrrmps5RxxvVuUdeprruvDNabEaIHPyYps28WM8/edit#heading=h.815t5dodlxh7]
> [2] 
> [https://docs.google.com/document/d/1SUhFt1BmsGMLUYVa72SWLbNrrWzunvcjAlEm8iusvq0/edit]





[jira] [Comment Edited] (FLINK-13993) Using FlinkUserCodeClassLoaders to load the user class in the perjob mode

2019-09-19 Thread Guowei Ma (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-13993?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16933201#comment-16933201
 ] 

Guowei Ma edited comment on FLINK-13993 at 9/19/19 9:25 AM:


Sorry [~trohrmann], I changed the estimate.

Actually, what I meant was one man-week.


was (Author: maguowei):
Sorry, I change the time.

Actually, what I mean is one man week.

> Using FlinkUserCodeClassLoaders to load the user class in the perjob mode
> -
>
> Key: FLINK-13993
> URL: https://issues.apache.org/jira/browse/FLINK-13993
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.9.0, 1.10.0
>Reporter: Guowei Ma
>Assignee: Guowei Ma
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.10.0
>
>   Original Estimate: 30h
>  Remaining Estimate: 30h
>
> Currently, Flink has the FlinkUserCodeClassLoader, which is used to load the 
> user's classes. However, in per-job mode the user classes and the system 
> classes are all loaded by the system class loader, which introduces conflicts.
> This document [1] proposes making the FlinkUserCodeClassLoader load the 
> user classes in per-job mode (discussed with Till [2]).
>  
> [1][https://docs.google.com/document/d/1fH2Cwrrmps5RxxvVuUdeprruvDNabEaIHPyYps28WM8/edit#heading=h.815t5dodlxh7]
> [2] 
> [https://docs.google.com/document/d/1SUhFt1BmsGMLUYVa72SWLbNrrWzunvcjAlEm8iusvq0/edit]





[jira] [Commented] (FLINK-12852) Deadlock occurs when requiring exclusive buffer for RemoteInputChannel

2019-07-02 Thread Guowei Ma (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12852?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16877524#comment-16877524
 ] 

Guowei Ma commented on FLINK-12852:
---

In the long run there may be another way to avoid the deadlock: the task could 
spill some buffers to disk and return them to the local buffer pool. I think 
this could increase memory utilization most of the time. What do you all think?

> Deadlock occurs when requiring exclusive buffer for RemoteInputChannel
> --
>
> Key: FLINK-12852
> URL: https://issues.apache.org/jira/browse/FLINK-12852
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.7.2, 1.8.1, 1.9.0
>Reporter: Yun Gao
>Assignee: Yun Gao
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> When running tests with an upstream vertex and downstream vertex, deadlock 
> occurs when submitting the job:
> {code:java}
> "Sink: Unnamed (3/500)" #136 prio=5 os_prio=0 tid=0x7f2cca81b000 
> nid=0x38845 waiting on condition [0x7f2cbe9fe000]
> java.lang.Thread.State: TIMED_WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for <0x00073ed6b6f0> (a 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:233)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
> at java.util.concurrent.ArrayBlockingQueue.poll(ArrayBlockingQueue.java:418)
> at 
> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.requestMemorySegments(NetworkBufferPool.java:180)
> at 
> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.requestMemorySegments(NetworkBufferPool.java:54)
> at 
> org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.assignExclusiveSegments(RemoteInputChannel.java:139)
> at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.assignExclusiveSegments(SingleInputGate.java:312)
> - locked <0x00073fbc81f0> (a java.lang.Object)
> at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.setup(SingleInputGate.java:220)
> at 
> org.apache.flink.runtime.taskmanager.Task.setupPartionsAndGates(Task.java:836)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:598)
> at java.lang.Thread.run(Thread.java:834)
> {code}
> This happens because the required and max sizes of the local buffer pool are 
> not the same, so there may be over-allocation, and when 
> assignExclusiveSegments runs there is no memory available.
>  
> The detailed scenario is as follows: the parallelism of the upstream and 
> downstream vertices is 1000 and 500 respectively. There are 200 TMs, and 
> each TM has 10696 buffers in total and 10 slots. For a TM that runs 9 
> upstream tasks and 1 downstream task, the 9 upstream tasks start first with 
> a local buffer pool of {required = 500, max = 2 * 500 + 8 = 1008}; they 
> produce data quickly and each occupies about 990 buffers. Then the 
> downstream task starts and tries to assign exclusive buffers for 
> 1500 - 9 = 1491 InputChannels. It requires 2981 buffers but only 1786 are 
> left. Since not all downstream tasks can start, the job eventually blocks, 
> no buffer can be released, and the deadlock occurs.
>  
> Although increasing the network memory works around the problem, I think the 
> deadlock is not acceptable. Fine-grained resource management (FLINK-12761) 
> can solve this problem, but AFAIK in 1.9 it will not include the network 
> memory in the ResourceProfile.
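The arithmetic in the scenario above can be checked back-of-the-envelope; all numbers are taken from the issue description, and this is not Flink code:

```java
public class BufferAccounting {

    /** Buffers still free on the TM after the upstream tasks fill up. */
    public static int remainingBuffers(int buffersPerTm, int upstreamTasks, int perUpstreamTask) {
        return buffersPerTm - upstreamTasks * perUpstreamTask;
    }

    /** Exclusive buffers the downstream task must grab for its remote channels. */
    public static int exclusiveDemand(int remoteChannels, int exclusivePerChannel) {
        return remoteChannels * exclusivePerChannel;
    }

    public static void main(String[] args) {
        int left = remainingBuffers(10696, 9, 990);   // 10696 - 8910 = 1786
        int need = exclusiveDemand(1500 - 9, 2);      // 1491 * 2 = 2982, ~ the 2981 quoted above
        System.out.println("left=" + left + ", need=" + need + ", stuck=" + (need > left));
    }
}
```

With the demand exceeding the remaining buffers and no task able to release any, the request can never be satisfied, which matches the reported deadlock.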





[jira] [Commented] (FLINK-13132) Allow ClusterEntrypoints use user main method to generate job graph

2019-07-07 Thread Guowei Ma (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-13132?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16880053#comment-16880053
 ] 

Guowei Ma commented on FLINK-13132:
---

This is a very interesting optimization, but I have two questions.


1) Why do you only change YARN? According to your description, there should be 
more than YARN in your business system. Have you optimized the other systems? 
Could you say which systems you are switching between, e.g. YARN, Mesos, or 
Kubernetes?

2) In addition, even if you switch to ClassPathJobGraphRetriever, you still 
need to upload the application and lib jars to the server and regenerate the 
JobGraph. Compared with the "client mode", there is no time advantage because 
nothing is optimized, and the user may not feel any difference. Or does your 
business scenario have some speciality? Could you explain it further?

> Allow ClusterEntrypoints use user main method to generate job graph
> ---
>
> Key: FLINK-13132
> URL: https://issues.apache.org/jira/browse/FLINK-13132
> Project: Flink
>  Issue Type: Improvement
>  Components: Deployment / YARN
>Affects Versions: 1.8.0, 1.8.1
>Reporter: Zhenqiu Huang
>Assignee: Zhenqiu Huang
>Priority: Minor
>
> We are building a service that can transparently deploy a job to different 
> cluster management systems, such as YARN and another internal system. It is 
> very costly to download the jar and generate the JobGraph on the client 
> side. Thus, I want to propose an improvement that makes the YARN entrypoints 
> configurable to use either FileJobGraphRetriever or 
> ClassPathJobGraphRetriever. It is actually a long-standing TODO in 
> AbstractYarnClusterDescriptor at line 834.
> https://github.com/apache/flink/blob/21468e0050dc5f97de5cfe39885e0d3fd648e399/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java#L834





[jira] [Commented] (FLINK-13132) Allow ClusterEntrypoints use user main method to generate job graph

2019-07-11 Thread Guowei Ma (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-13132?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16883481#comment-16883481
 ] 

Guowei Ma commented on FLINK-13132:
---

[~suez1224] and [~ZhenqiuHuang], thanks for your detailed explanations. Your 
scenario makes sense to me.
1. Apart from the jars, how do you deal with the dependencies of the user's 
jobs when you move from one cluster to another? For example, the checkpoint 
data of the jobs and the sources and sinks of the user's jobs. Will your 
deployment service also move those dependencies for the users?

2. In the HA scenario, we could use the SubmittedGraphStore to resolve the 
non-determinism problem: before generating a new JobGraph, the Dispatcher 
could first check the SubmittedGraphStore and then decide whether to generate 
the JobGraph or not. I think [~till.rohrmann] could give more suggestions.
Thanks.

 

> Allow ClusterEntrypoints use user main method to generate job graph
> ---
>
> Key: FLINK-13132
> URL: https://issues.apache.org/jira/browse/FLINK-13132
> Project: Flink
>  Issue Type: Improvement
>  Components: Deployment / YARN
>Affects Versions: 1.8.0, 1.8.1
>Reporter: Zhenqiu Huang
>Assignee: Zhenqiu Huang
>Priority: Minor
>
> We are building a service that can transparently deploy a job to different 
> cluster management systems, such as YARN and another internal system. It is 
> very costly to download the jar and generate the JobGraph on the client 
> side. Thus, I want to propose an improvement that makes the YARN entrypoints 
> configurable to use either FileJobGraphRetriever or 
> ClassPathJobGraphRetriever. It is actually a long-standing TODO in 
> AbstractYarnClusterDescriptor at line 834.
> https://github.com/apache/flink/blob/21468e0050dc5f97de5cfe39885e0d3fd648e399/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java#L834





[jira] [Created] (FLINK-14464) Introduce the 'AbstractUserClassPathJobGraphRetriever' which could construct the 'user code class path' from the "job" dir.

2019-10-20 Thread Guowei Ma (Jira)
Guowei Ma created FLINK-14464:
-

 Summary: Introduce the 'AbstractUserClassPathJobGraphRetriever' 
which could construct the 'user code class path' from the "job" dir.
 Key: FLINK-14464
 URL: https://issues.apache.org/jira/browse/FLINK-14464
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Affects Versions: 1.9.0, 1.10.0
Reporter: Guowei Ma








[jira] [Created] (FLINK-14465) Let StandaloneJobClusterEntrypoint use user code class loader

2019-10-20 Thread Guowei Ma (Jira)
Guowei Ma created FLINK-14465:
-

 Summary: Let StandaloneJobClusterEntrypoint use user code class 
loader
 Key: FLINK-14465
 URL: https://issues.apache.org/jira/browse/FLINK-14465
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Affects Versions: 1.9.0, 1.10.0
Reporter: Guowei Ma








[jira] [Created] (FLINK-14466) Let YarnJobClusterEntrypoint use user code class loader

2019-10-20 Thread Guowei Ma (Jira)
Guowei Ma created FLINK-14466:
-

 Summary: Let YarnJobClusterEntrypoint use user code class loader
 Key: FLINK-14466
 URL: https://issues.apache.org/jira/browse/FLINK-14466
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Affects Versions: 1.9.0, 1.10.0
Reporter: Guowei Ma








[jira] [Created] (FLINK-14467) Let MesosJobClusterEntrypoint use user code class loader

2019-10-20 Thread Guowei Ma (Jira)
Guowei Ma created FLINK-14467:
-

 Summary: Let MesosJobClusterEntrypoint use user code class loader
 Key: FLINK-14467
 URL: https://issues.apache.org/jira/browse/FLINK-14467
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Affects Versions: 1.9.0, 1.10.0
Reporter: Guowei Ma








[jira] [Commented] (FLINK-13840) Let StandaloneJobClusterEntrypoint use user code class loader

2019-08-31 Thread Guowei Ma (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-13840?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16920265#comment-16920265
 ] 

Guowei Ma commented on FLINK-13840:
---

As Till said, an easy way is to put the user code in a directory other than 
lib and add it to the PackagedProgram as an additional classpath.

I wrote a draft doc about how to deal with this. 

Any comments are welcome!

[https://docs.google.com/document/d/1SUhFt1BmsGMLUYVa72SWLbNrrWzunvcjAlEm8iusvq0/edit#]
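As a sketch of the idea: collect the jars from a separate user-code directory (the directory name "usrlib" is an assumption here) so the resulting URLs can be handed to the PackagedProgram as additional classpath entries instead of sitting on the system class path:

```java
import java.io.File;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;

/** Illustrative helper; the actual integration point would be the entrypoint
 * that constructs the PackagedProgram. */
public class UserJarScanner {

    /** Returns the URLs of all jar files directly inside {@code userDir}. */
    public static List<URL> scanUserJars(File userDir) throws MalformedURLException {
        List<URL> urls = new ArrayList<>();
        File[] files = userDir.listFiles();
        if (files != null) {
            for (File f : files) {
                if (f.isFile() && f.getName().endsWith(".jar")) {
                    urls.add(f.toURI().toURL());
                }
            }
        }
        return urls;
    }
}
```

Because these jars never enter the system class path, they end up visible only through the user code class loader.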

> Let StandaloneJobClusterEntrypoint use user code class loader
> -
>
> Key: FLINK-13840
> URL: https://issues.apache.org/jira/browse/FLINK-13840
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.9.0, 1.10.0
>Reporter: Till Rohrmann
>Priority: Major
> Fix For: 1.10.0
>
>
> In order to resolve class loading issues when using the 
> {{StandaloneJobClusterEntryPoint}}, it would be better to run the user code 
> in the user code class loader, which supports child-first class loading. At 
> the moment, the user code jar is part of the system class path and, hence, 
> loaded by the system class loader.
> An easy way to solve this problem would be to place the user code in a 
> different directory than {{lib}} and then specify this path as an additional 
> classpath when creating the {{PackagedProgram}}.





[jira] [Created] (FLINK-14991) Export `FLINK_HOME` environment variable to all the entrypoint

2019-11-29 Thread Guowei Ma (Jira)
Guowei Ma created FLINK-14991:
-

 Summary: Export `FLINK_HOME` environment variable to all the 
entrypoint
 Key: FLINK-14991
 URL: https://issues.apache.org/jira/browse/FLINK-14991
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Coordination
Reporter: Guowei Ma


Currently, Flink depends on 6 types of files: configuration files, system jar 
files, script files, library jar files, plugin jar files, and user jar files. 
These files live in different directories.

Flink exports 5 environment variables to locate these different types of files: 
`FLINK_CONF_DIR`, `FLINK_LIB_DIR`, `FLINK_OPT_DIR`, `FLINK_PLUGIN_DIR`, and 
`FLINK_BIN_DIR`.

It is not good style to export an environment variable for every type of file.

So this Jira proposes to export the `FLINK_HOME` environment variable to all 
the entrypoints, derive the directory of each file type from `FLINK_HOME`, and 
give every file type a fixed directory name.

Another benefit of this approach is that it implies the directory structure is 
the same in all situations.
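A minimal sketch of the derivation, assuming the current directory names (conf, lib, opt, plugins, bin) stay fixed relative to `FLINK_HOME`:

```java
import java.nio.file.Path;
import java.nio.file.Paths;

/** Illustrative only: derives all standard directories from FLINK_HOME
 * instead of reading one environment variable per directory. */
public class FlinkHomeLayout {

    private final Path home;

    public FlinkHomeLayout(String flinkHome) {
        this.home = Paths.get(flinkHome);
    }

    public Path confDir()   { return home.resolve("conf"); }
    public Path libDir()    { return home.resolve("lib"); }
    public Path optDir()    { return home.resolve("opt"); }
    public Path pluginDir() { return home.resolve("plugins"); }
    public Path binDir()    { return home.resolve("bin"); }
}
```

With this, an entrypoint only needs `System.getenv("FLINK_HOME")` to locate every file type.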





[jira] [Created] (FLINK-15032) Remove the eagerly serialization from `RemoteRpcInvocation`

2019-12-02 Thread Guowei Ma (Jira)
Guowei Ma created FLINK-15032:
-

 Summary: Remove the eagerly serialization from 
`RemoteRpcInvocation` 
 Key: FLINK-15032
 URL: https://issues.apache.org/jira/browse/FLINK-15032
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Coordination
Reporter: Guowei Ma


    Currently, the constructor of `RemoteRpcInvocation` serializes the 
`parameterTypes` and `arg` of an RPC call. This could lead to two problems:
 # Consider a job that has 1k parallelism and a 1 MB union list state. When 
deploying the 1k tasks, the eager serialization would use 1 GB of memory 
instantly (sometimes serialization even amplifies the memory usage). However, the 
serialized object is only used when Akka sends the message, so we could 
reduce the memory pressure by serializing the object only when the message 
is actually sent by Akka.

 # Furthermore, Akka serializes the message in the end anyway, and all the 
XXXGateway-related classes are visible at the RPC level. Because of that, I think 
the serialization in the constructor of `RemoteRpcInvocation` could be avoided. I 
also did a simple test and found this could reduce the time cost of an RPC call. 
For 1k RPC calls with a 1 MB `String` message, the current version costs 
around 2700 ms, while the non-serializing version costs about 37 ms.

 

In summary, this Jira proposes to remove the serialization from the constructor 
of `RemoteRpcInvocation`.
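The idea of postponing serialization to send time can be sketched with plain Java serialization. This is a hypothetical class for illustration, not Flink's actual `RemoteRpcInvocation`: the payload is kept in transient fields and only written out in `writeObject`, which the transport invokes when the message is actually sent.

```java
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical sketch: hold the RPC payload in transient fields so the
// constructor does no serialization work; the payload is serialized only
// when writeObject runs, i.e. when the message is sent over the wire.
public class LazyRpcInvocation implements Serializable {

    private transient String methodName;
    private transient Object[] args;

    public LazyRpcInvocation(String methodName, Object[] args) {
        this.methodName = methodName;
        this.args = args;   // no eager serialization here
    }

    private void writeObject(ObjectOutputStream out) throws IOException {
        out.defaultWriteObject();
        out.writeUTF(methodName);
        out.writeObject(args);   // serialized only at send time
    }

    private void readObject(ObjectInputStream in)
            throws IOException, ClassNotFoundException {
        in.defaultReadObject();
        methodName = in.readUTF();
        args = (Object[]) in.readObject();
    }

    public String getMethodName() { return methodName; }

    public Object[] getArgs() { return args; }
}
```

With this shape, constructing the invocation for 1k tasks allocates no serialized copies; at most one copy exists per message while it is in flight.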



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-15032) Remove the eagerly serialization from `RemoteRpcInvocation`

2019-12-03 Thread Guowei Ma (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15032?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guowei Ma updated FLINK-15032:
--
Description: 
    Currently, the constructor of `RemoteRpcInvocation` serializes the 
`parameterTypes` and `arg` of an RPC call. This could lead to a problem:

Consider a job that has 1k parallelism and a 1 MB union list state. When 
deploying the 1k tasks, the eager serialization would use 1 GB of memory 
instantly (sometimes serialization even amplifies the memory usage). However, the 
serialized object is only used when Akka sends the message, so we could 
reduce the memory pressure by serializing the object only when the message 
is actually sent by Akka.

Akka serializes the message in the end anyway, and all the XXXGateway-related 
classes are visible at the RPC level. Because of that, I think the eager 
serialization in the constructor of `RemoteRpcInvocation` could be avoided. I 
also did a simple test and found this could reduce the time cost of an RPC call. 
For 1k RPC calls with a 1 MB `String` message, the current version costs 
around 2700 ms, while the non-serializing version costs about 37 ms.

 

In summary, this Jira proposes to remove the serialization from the constructor 
of `RemoteRpcInvocation`.

  was:
    Currently, the constructor of `RemoteRpcInvocation` serializes the 
`parameterTypes` and `arg` of an RPC call. This could lead to two problems:
 # Consider a job that has 1k parallelism and has a 1m union list state. When 
deploying the 1k tasks, the eager serialization would use 1G memory 
instantly(Some time the serialization amplifies the memory usage). However, the 
serialized object is only used when the Akka sends the message.  So we could 
reduce the memory pressure if we only serialize the object when the message 
would be sent by the Akka.

 # Furthermore, Akka would serialize the message at last and all the XXXGateway 
related class could be visible by the RPC level. Because of that, I think the 
serialization in the constructor of `RemoteRpcInvocation` could be avoided. I 
also do a simple test and find this could reduce the time cost of the RPC call. 
The 1k number of  RPC calls with 1m `String` message:  The current version 
costs around 2700ms; the Nonserialization version cost about 37ms.

 

In summary, this Jira proposes to remove the serialization at the constructor 
of `RemoteRpcInvocation`.


> Remove the eagerly serialization from `RemoteRpcInvocation` 
> 
>
> Key: FLINK-15032
> URL: https://issues.apache.org/jira/browse/FLINK-15032
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Reporter: Guowei Ma
>Priority: Major
>
>     Currently, the constructor of `RemoteRpcInvocation` serializes the 
> `parameterTypes` and `arg` of an RPC call. This could lead to a problem:
> Consider a job that has 1k parallelism and has a 1m union list state. When 
> deploying the 1k tasks, the eager serialization would use 1G memory 
> instantly(Some time the serialization amplifies the memory usage). However, 
> the serialized object is only used when the Akka sends the message.  So we 
> could reduce the memory pressure if we only serialize the object when the 
> message would be sent by the Akka.
> Akka would serialize the message at last and all the XXXGateway related class 
> could be visible by the RPC level. Because of that, I think the eager 
> serialization in the constructor of `RemoteRpcInvocation` could be avoided. I 
> also do a simple test and find this could reduce the time cost of the RPC 
> call. The 1k number of  RPC calls with 1m `String` message:  The current 
> version costs around 2700ms; the Nonserialization version cost about 37ms.
>  
> In summary, this Jira proposes to remove the serialization at the constructor 
> of `RemoteRpcInvocation`.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-15032) Remove the eager serialization from `RemoteRpcInvocation`

2019-12-03 Thread Guowei Ma (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15032?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guowei Ma updated FLINK-15032:
--
Description: 
    Currently, the constructor of `RemoteRpcInvocation` serializes the 
`parameterTypes` and `arg` of an RPC call. This could lead to a problem:

Consider a job that has 1k parallelism and a 1 MB union list state. When 
deploying the 1k tasks, the eager serialization would use 1 GB of memory 
instantly (sometimes serialization even amplifies the memory usage). However, the 
serialized object is only used when Akka sends the message, so we could 
reduce the memory pressure by serializing the object only when the message 
is actually sent by Akka.

Akka serializes the message in the end anyway, and all the XXXGateway-related 
classes are visible at the RPC level. Because of that, I think the eager 
serialization in the constructor of `RemoteRpcInvocation` could be avoided. I 
also did a simple test and found this could reduce the time cost of an RPC call. 
For 1k RPC calls with a 1 MB `String` message, the current version costs 
around 2700 ms, while the non-serializing version costs about 37 ms.

 

In summary, this Jira proposes to remove the eager serialization from the 
constructor of `RemoteRpcInvocation`.

  was:
    Currently, the constructor of `RemoteRpcInvocation` serializes the 
`parameterTypes` and `arg` of an RPC call. This could lead to a problem:

Consider a job that has 1k parallelism and has a 1m union list state. When 
deploying the 1k tasks, the eager serialization would use 1G memory 
instantly(Some time the serialization amplifies the memory usage). However, the 
serialized object is only used when the Akka sends the message.  So we could 
reduce the memory pressure if we only serialize the object when the message 
would be sent by the Akka.

Akka would serialize the message at last and all the XXXGateway related class 
could be visible by the RPC level. Because of that, I think the eager 
serialization in the constructor of `RemoteRpcInvocation` could be avoided. I 
also do a simple test and find this could reduce the time cost of the RPC call. 
The 1k number of  RPC calls with 1m `String` message:  The current version 
costs around 2700ms; the Nonserialization version cost about 37ms.

 

In summary, this Jira proposes to remove the serialization at the constructor 
of `RemoteRpcInvocation`.


> Remove the eager serialization from `RemoteRpcInvocation` 
> --
>
> Key: FLINK-15032
> URL: https://issues.apache.org/jira/browse/FLINK-15032
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Reporter: Guowei Ma
>Priority: Major
>
>     Currently, the constructor of `RemoteRpcInvocation` serializes the 
> `parameterTypes` and `arg` of an RPC call. This could lead to a problem:
> Consider a job that has 1k parallelism and has a 1m union list state. When 
> deploying the 1k tasks, the eager serialization would use 1G memory 
> instantly(Some time the serialization amplifies the memory usage). However, 
> the serialized object is only used when the Akka sends the message.  So we 
> could reduce the memory pressure if we only serialize the object when the 
> message would be sent by the Akka.
> Akka would serialize the message at last and all the XXXGateway related class 
> could be visible by the RPC level. Because of that, I think the eager 
> serialization in the constructor of `RemoteRpcInvocation` could be avoided. I 
> also do a simple test and find this could reduce the time cost of the RPC 
> call. The 1k number of  RPC calls with 1m `String` message:  The current 
> version costs around 2700ms; the Nonserialization version cost about 37ms.
>  
> In summary, this Jira proposes to remove the eager serialization at the 
> constructor of `RemoteRpcInvocation`.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-15032) Remove the eager serialization from `RemoteRpcInvocation`

2019-12-03 Thread Guowei Ma (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15032?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guowei Ma updated FLINK-15032:
--
Summary: Remove the eager serialization from `RemoteRpcInvocation`   (was: 
Remove the eagerly serialization from `RemoteRpcInvocation` )

> Remove the eager serialization from `RemoteRpcInvocation` 
> --
>
> Key: FLINK-15032
> URL: https://issues.apache.org/jira/browse/FLINK-15032
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Reporter: Guowei Ma
>Priority: Major
>
>     Currently, the constructor of `RemoteRpcInvocation` serializes the 
> `parameterTypes` and `arg` of an RPC call. This could lead to a problem:
> Consider a job that has 1k parallelism and has a 1m union list state. When 
> deploying the 1k tasks, the eager serialization would use 1G memory 
> instantly(Some time the serialization amplifies the memory usage). However, 
> the serialized object is only used when the Akka sends the message.  So we 
> could reduce the memory pressure if we only serialize the object when the 
> message would be sent by the Akka.
> Akka would serialize the message at last and all the XXXGateway related class 
> could be visible by the RPC level. Because of that, I think the eager 
> serialization in the constructor of `RemoteRpcInvocation` could be avoided. I 
> also do a simple test and find this could reduce the time cost of the RPC 
> call. The 1k number of  RPC calls with 1m `String` message:  The current 
> version costs around 2700ms; the Nonserialization version cost about 37ms.
>  
> In summary, this Jira proposes to remove the serialization at the constructor 
> of `RemoteRpcInvocation`.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-15032) Remove the eager serialization from `RemoteRpcInvocation`

2019-12-03 Thread Guowei Ma (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15032?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16986935#comment-16986935
 ] 

Guowei Ma commented on FLINK-15032:
---

Hi [~SleePy] 

>>>Do you mean we could postpone the serialization till 
>>>{{RemoteRpcInvocation#writeObject}}?

yes.

>>>Could you explain it a bit more? Do you mean there is no visible issue of 
>>>postponing the serialization? Or we don't need serialization anymore?

What I mean is that there is no visible issue of postponing the serialization.

 
>>>Do we still need serialization in {{wirteObject}} after removing 
>>>serialization in constructor?

The serialization is still needed.
 

> Remove the eager serialization from `RemoteRpcInvocation` 
> --
>
> Key: FLINK-15032
> URL: https://issues.apache.org/jira/browse/FLINK-15032
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Reporter: Guowei Ma
>Priority: Major
>
>     Currently, the constructor of `RemoteRpcInvocation` serializes the 
> `parameterTypes` and `arg` of an RPC call. This could lead to a problem:
> Consider a job that has 1k parallelism and has a 1m union list state. When 
> deploying the 1k tasks, the eager serialization would use 1G memory 
> instantly(Some time the serialization amplifies the memory usage). However, 
> the serialized object is only used when the Akka sends the message.  So we 
> could reduce the memory pressure if we only serialize the object when the 
> message would be sent by the Akka.
> Akka would serialize the message at last and all the XXXGateway related class 
> could be visible by the RPC level. Because of that, I think the eager 
> serialization in the constructor of `RemoteRpcInvocation` could be avoided. I 
> also do a simple test and find this could reduce the time cost of the RPC 
> call. The 1k number of  RPC calls with 1m `String` message:  The current 
> version costs around 2700ms; the Nonserialization version cost about 37ms.
>  
> In summary, this Jira proposes to remove the eager serialization at the 
> constructor of `RemoteRpcInvocation`.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-10819) JobManagerHAProcessFailureRecoveryITCase.testDispatcherProcessFailure is unstable

2019-07-19 Thread Guowei Ma (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10819?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guowei Ma updated FLINK-10819:
--
Attachment: image-2019-07-19-16-59-10-178.png

> JobManagerHAProcessFailureRecoveryITCase.testDispatcherProcessFailure is 
> unstable
> -
>
> Key: FLINK-10819
> URL: https://issues.apache.org/jira/browse/FLINK-10819
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination, Tests
>Affects Versions: 1.7.0
>Reporter: sunjincheng
>Assignee: Guowei Ma
>Priority: Critical
>  Labels: test-stability
> Fix For: 1.9.0
>
> Attachments: image-2019-07-19-16-59-10-178.png
>
>
> Found the following error in the process of CI:
> Results :
> Tests in error: 
>  JobManagerHAProcessFailureRecoveryITCase.testDispatcherProcessFailure:331 » 
> IllegalArgument
> Tests run: 1463, Failures: 0, Errors: 1, Skipped: 29
> 18:40:55.828 [INFO] 
> 
> 18:40:55.829 [INFO] BUILD FAILURE
> 18:40:55.829 [INFO] 
> 
> 18:40:55.830 [INFO] Total time: 30:19 min
> 18:40:55.830 [INFO] Finished at: 2018-11-07T18:40:55+00:00
> 18:40:56.294 [INFO] Final Memory: 92M/678M
> 18:40:56.294 [INFO] 
> 
> 18:40:56.294 [WARNING] The requested profile "include-kinesis" could not be 
> activated because it does not exist.
> 18:40:56.295 [ERROR] Failed to execute goal 
> org.apache.maven.plugins:maven-surefire-plugin:2.18.1:test 
> (integration-tests) on project flink-tests_2.11: There are test failures.
> 18:40:56.295 [ERROR] 
> 18:40:56.295 [ERROR] Please refer to 
> /home/travis/build/sunjincheng121/flink/flink-tests/target/surefire-reports 
> for the individual test results.
> 18:40:56.295 [ERROR] -> [Help 1]
> 18:40:56.295 [ERROR] 
> 18:40:56.295 [ERROR] To see the full stack trace of the errors, re-run Maven 
> with the -e switch.
> 18:40:56.295 [ERROR] Re-run Maven using the -X switch to enable full debug 
> logging.
> 18:40:56.295 [ERROR] 
> 18:40:56.295 [ERROR] For more information about the errors and possible 
> solutions, please read the following articles:
> 18:40:56.295 [ERROR] [Help 1] 
> http://cwiki.apache.org/confluence/display/MAVEN/MojoFailureException
> MVN exited with EXIT CODE: 1.
> Trying to KILL watchdog (11329).
> ./tools/travis_mvn_watchdog.sh: line 269: 11329 Terminated watchdog
> PRODUCED build artifacts.
> But after the rerun, the error disappeared. 
> Currently,no specific reasons are found, and will continue to pay attention.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Updated] (FLINK-10819) JobManagerHAProcessFailureRecoveryITCase.testDispatcherProcessFailure is unstable

2019-07-19 Thread Guowei Ma (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10819?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guowei Ma updated FLINK-10819:
--
Attachment: image-2019-07-19-17-00-17-194.png

> JobManagerHAProcessFailureRecoveryITCase.testDispatcherProcessFailure is 
> unstable
> -
>
> Key: FLINK-10819
> URL: https://issues.apache.org/jira/browse/FLINK-10819
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination, Tests
>Affects Versions: 1.7.0
>Reporter: sunjincheng
>Assignee: Guowei Ma
>Priority: Critical
>  Labels: test-stability
> Fix For: 1.9.0
>
> Attachments: image-2019-07-19-16-59-10-178.png, 
> image-2019-07-19-17-00-17-194.png
>
>
> Found the following error in the process of CI:
> Results :
> Tests in error: 
>  JobManagerHAProcessFailureRecoveryITCase.testDispatcherProcessFailure:331 » 
> IllegalArgument
> Tests run: 1463, Failures: 0, Errors: 1, Skipped: 29
> 18:40:55.828 [INFO] 
> 
> 18:40:55.829 [INFO] BUILD FAILURE
> 18:40:55.829 [INFO] 
> 
> 18:40:55.830 [INFO] Total time: 30:19 min
> 18:40:55.830 [INFO] Finished at: 2018-11-07T18:40:55+00:00
> 18:40:56.294 [INFO] Final Memory: 92M/678M
> 18:40:56.294 [INFO] 
> 
> 18:40:56.294 [WARNING] The requested profile "include-kinesis" could not be 
> activated because it does not exist.
> 18:40:56.295 [ERROR] Failed to execute goal 
> org.apache.maven.plugins:maven-surefire-plugin:2.18.1:test 
> (integration-tests) on project flink-tests_2.11: There are test failures.
> 18:40:56.295 [ERROR] 
> 18:40:56.295 [ERROR] Please refer to 
> /home/travis/build/sunjincheng121/flink/flink-tests/target/surefire-reports 
> for the individual test results.
> 18:40:56.295 [ERROR] -> [Help 1]
> 18:40:56.295 [ERROR] 
> 18:40:56.295 [ERROR] To see the full stack trace of the errors, re-run Maven 
> with the -e switch.
> 18:40:56.295 [ERROR] Re-run Maven using the -X switch to enable full debug 
> logging.
> 18:40:56.295 [ERROR] 
> 18:40:56.295 [ERROR] For more information about the errors and possible 
> solutions, please read the following articles:
> 18:40:56.295 [ERROR] [Help 1] 
> http://cwiki.apache.org/confluence/display/MAVEN/MojoFailureException
> MVN exited with EXIT CODE: 1.
> Trying to KILL watchdog (11329).
> ./tools/travis_mvn_watchdog.sh: line 269: 11329 Terminated watchdog
> PRODUCED build artifacts.
> But after the rerun, the error disappeared. 
> Currently,no specific reasons are found, and will continue to pay attention.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Updated] (FLINK-10819) JobManagerHAProcessFailureRecoveryITCase.testDispatcherProcessFailure is unstable

2019-07-19 Thread Guowei Ma (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10819?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guowei Ma updated FLINK-10819:
--
Attachment: image-2019-07-19-17-01-19-758.png

> JobManagerHAProcessFailureRecoveryITCase.testDispatcherProcessFailure is 
> unstable
> -
>
> Key: FLINK-10819
> URL: https://issues.apache.org/jira/browse/FLINK-10819
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination, Tests
>Affects Versions: 1.7.0
>Reporter: sunjincheng
>Assignee: Guowei Ma
>Priority: Critical
>  Labels: test-stability
> Fix For: 1.9.0
>
> Attachments: image-2019-07-19-16-59-10-178.png, 
> image-2019-07-19-17-00-17-194.png, image-2019-07-19-17-01-19-758.png
>
>
> Found the following error in the process of CI:
> Results :
> Tests in error: 
>  JobManagerHAProcessFailureRecoveryITCase.testDispatcherProcessFailure:331 » 
> IllegalArgument
> Tests run: 1463, Failures: 0, Errors: 1, Skipped: 29
> 18:40:55.828 [INFO] 
> 
> 18:40:55.829 [INFO] BUILD FAILURE
> 18:40:55.829 [INFO] 
> 
> 18:40:55.830 [INFO] Total time: 30:19 min
> 18:40:55.830 [INFO] Finished at: 2018-11-07T18:40:55+00:00
> 18:40:56.294 [INFO] Final Memory: 92M/678M
> 18:40:56.294 [INFO] 
> 
> 18:40:56.294 [WARNING] The requested profile "include-kinesis" could not be 
> activated because it does not exist.
> 18:40:56.295 [ERROR] Failed to execute goal 
> org.apache.maven.plugins:maven-surefire-plugin:2.18.1:test 
> (integration-tests) on project flink-tests_2.11: There are test failures.
> 18:40:56.295 [ERROR] 
> 18:40:56.295 [ERROR] Please refer to 
> /home/travis/build/sunjincheng121/flink/flink-tests/target/surefire-reports 
> for the individual test results.
> 18:40:56.295 [ERROR] -> [Help 1]
> 18:40:56.295 [ERROR] 
> 18:40:56.295 [ERROR] To see the full stack trace of the errors, re-run Maven 
> with the -e switch.
> 18:40:56.295 [ERROR] Re-run Maven using the -X switch to enable full debug 
> logging.
> 18:40:56.295 [ERROR] 
> 18:40:56.295 [ERROR] For more information about the errors and possible 
> solutions, please read the following articles:
> 18:40:56.295 [ERROR] [Help 1] 
> http://cwiki.apache.org/confluence/display/MAVEN/MojoFailureException
> MVN exited with EXIT CODE: 1.
> Trying to KILL watchdog (11329).
> ./tools/travis_mvn_watchdog.sh: line 269: 11329 Terminated watchdog
> PRODUCED build artifacts.
> But after the rerun, the error disappeared. 
> Currently,no specific reasons are found, and will continue to pay attention.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Commented] (FLINK-10819) JobManagerHAProcessFailureRecoveryITCase.testDispatcherProcessFailure is unstable

2019-07-20 Thread Guowei Ma (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10819?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16889486#comment-16889486
 ] 

Guowei Ma commented on FLINK-10819:
---

It might be a bug. 

1. The test timed out because two "READY_MARKER_FILE_PREFIX" files are 
missing.

2. The two tasks responsible for creating those files can't be deployed 
because no resources are available.

!image-2019-07-19-17-01-19-758.png!

3. The slots from one TM (34dbf0f8264469af49be8e1dbc2ad811) are not recognized 
by the SlotManager. Because of this, the two tasks can't be deployed.

!image-2019-07-19-17-00-17-194.png!

4. The TM (34dbf0f8264469af49be8e1dbc2ad811) registers to the RM twice.

 

!image-2019-07-19-16-59-10-178.png!

The RM sends two RegistrationResponses to the TM, but the TM handles the 
RegistrationResponses on different threads, so the registrationId of the old 
RegistrationResponse can override the registrationId of the new one.

 

The simple idea is to handle the responses on the main thread on the TM side. I 
am still considering whether there is another method.
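The main-thread idea can be sketched as follows. These are hypothetical names, not Flink's actual `TaskExecutor` code: all RegistrationResponse handling is funneled through a single-thread executor, and responses belonging to stale registration attempts are discarded, so an old response can no longer override a newer registrationId.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of the proposed fix: serialize all registration
// response handling onto one "main thread" and tag each attempt with an
// id so late responses from superseded attempts are ignored.
public class RegistrationHandler {

    private final ExecutorService mainThread = Executors.newSingleThreadExecutor();
    private final AtomicLong attemptCounter = new AtomicLong();

    private volatile long currentAttempt;
    private volatile String registrationId;

    // Begin a new registration attempt; older attempts become stale.
    public long startNewAttempt() {
        currentAttempt = attemptCounter.incrementAndGet();
        return currentAttempt;
    }

    // Called from arbitrary RPC threads; the state update itself runs on
    // the single main thread, where stale attempts are filtered out.
    public void onRegistrationResponse(long attempt, String newId) {
        mainThread.execute(() -> {
            if (attempt == currentAttempt) {
                registrationId = newId;   // only the latest attempt wins
            }
        });
    }

    public String getRegistrationId() {
        return registrationId;
    }

    // Wait until all queued handler tasks have run (useful in tests).
    public void flush() throws Exception {
        mainThread.submit(() -> { }).get();
    }
}
```

Because only one thread mutates `registrationId` and each response carries its attempt id, the interleaving described above (old response arriving after the new one) becomes harmless.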


> JobManagerHAProcessFailureRecoveryITCase.testDispatcherProcessFailure is 
> unstable
> -
>
> Key: FLINK-10819
> URL: https://issues.apache.org/jira/browse/FLINK-10819
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination, Tests
>Affects Versions: 1.7.0
>Reporter: sunjincheng
>Assignee: Guowei Ma
>Priority: Critical
>  Labels: test-stability
> Fix For: 1.9.0
>
> Attachments: image-2019-07-19-16-59-10-178.png, 
> image-2019-07-19-17-00-17-194.png, image-2019-07-19-17-01-19-758.png
>
>
> Found the following error in the process of CI:
> Results :
> Tests in error: 
>  JobManagerHAProcessFailureRecoveryITCase.testDispatcherProcessFailure:331 » 
> IllegalArgument
> Tests run: 1463, Failures: 0, Errors: 1, Skipped: 29
> 18:40:55.828 [INFO] 
> 
> 18:40:55.829 [INFO] BUILD FAILURE
> 18:40:55.829 [INFO] 
> 
> 18:40:55.830 [INFO] Total time: 30:19 min
> 18:40:55.830 [INFO] Finished at: 2018-11-07T18:40:55+00:00
> 18:40:56.294 [INFO] Final Memory: 92M/678M
> 18:40:56.294 [INFO] 
> 
> 18:40:56.294 [WARNING] The requested profile "include-kinesis" could not be 
> activated because it does not exist.
> 18:40:56.295 [ERROR] Failed to execute goal 
> org.apache.maven.plugins:maven-surefire-plugin:2.18.1:test 
> (integration-tests) on project flink-tests_2.11: There are test failures.
> 18:40:56.295 [ERROR] 
> 18:40:56.295 [ERROR] Please refer to 
> /home/travis/build/sunjincheng121/flink/flink-tests/target/surefire-reports 
> for the individual test results.
> 18:40:56.295 [ERROR] -> [Help 1]
> 18:40:56.295 [ERROR] 
> 18:40:56.295 [ERROR] To see the full stack trace of the errors, re-run Maven 
> with the -e switch.
> 18:40:56.295 [ERROR] Re-run Maven using the -X switch to enable full debug 
> logging.
> 18:40:56.295 [ERROR] 
> 18:40:56.295 [ERROR] For more information about the errors and possible 
> solutions, please read the following articles:
> 18:40:56.295 [ERROR] [Help 1] 
> http://cwiki.apache.org/confluence/display/MAVEN/MojoFailureException
> MVN exited with EXIT CODE: 1.
> Trying to KILL watchdog (11329).
> ./tools/travis_mvn_watchdog.sh: line 269: 11329 Terminated watchdog
> PRODUCED build artifacts.
> But after the rerun, the error disappeared. 
> Currently,no specific reasons are found, and will continue to pay attention.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Comment Edited] (FLINK-10819) JobManagerHAProcessFailureRecoveryITCase.testDispatcherProcessFailure is unstable

2019-07-20 Thread Guowei Ma (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10819?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16889486#comment-16889486
 ] 

Guowei Ma edited comment on FLINK-10819 at 7/20/19 11:56 AM:
-

It might be a bug, according to 
https://api.travis-ci.org/v3/job/508500560/log.txt

1. The test timed out because two "READY_MARKER_FILE_PREFIX" files are 
missing.

2. The two tasks responsible for creating those files can't be deployed 
because no resources are available.

!image-2019-07-19-17-01-19-758.png!

3. The slots from one TM (34dbf0f8264469af49be8e1dbc2ad811) are not recognized 
by the SlotManager. Because of this, the two tasks can't be deployed.

!image-2019-07-19-17-00-17-194.png!

4. The TM (34dbf0f8264469af49be8e1dbc2ad811) registers to the RM twice.

 

!image-2019-07-19-16-59-10-178.png!

The RM sends two RegistrationResponses to the TM, but the TM handles the 
RegistrationResponses on different threads, so the registrationId of the old 
RegistrationResponse can override the registrationId of the new one.

 

The simple idea is to handle the responses on the main thread on the TM side. I 
am still considering whether there is another method.


was (Author: maguowei):
It might be a bug. 

1. The test is time out because two "READY_MARKER_FILE_PREFIX" files are 
missing.

2. Two tasks, which response for creating the two files can't be deployed 
because the resource is not available.

!image-2019-07-19-17-01-19-758.png!

3. The slots from one TM(34dbf0f8264469af49be8e1dbc2ad811) are not recognized 
by SlotManger. Since this, the two tasks can't be deployed.

!image-2019-07-19-17-00-17-194.png!

4. The TM(34dbf0f8264469af49be8e1dbc2ad811) registers to RM twice.

 

!image-2019-07-19-16-59-10-178.png!

The RM responses two RegistrationResponses to TM. But TM uses different threads 
to deal  with RegistrationResponse. The registrationId of old 
RegistrationResponse override the registrationId of new RegistrationResponse.

 

The simple idea is to use the main thread to process on the TM side. I am still 
thinking about it if there is another method.

 

 

 

 

 

 

> JobManagerHAProcessFailureRecoveryITCase.testDispatcherProcessFailure is 
> unstable
> -
>
> Key: FLINK-10819
> URL: https://issues.apache.org/jira/browse/FLINK-10819
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination, Tests
>Affects Versions: 1.7.0
>Reporter: sunjincheng
>Assignee: Guowei Ma
>Priority: Critical
>  Labels: test-stability
> Fix For: 1.9.0
>
> Attachments: image-2019-07-19-16-59-10-178.png, 
> image-2019-07-19-17-00-17-194.png, image-2019-07-19-17-01-19-758.png
>
>
> Found the following error in the process of CI:
> Results :
> Tests in error: 
>  JobManagerHAProcessFailureRecoveryITCase.testDispatcherProcessFailure:331 » 
> IllegalArgument
> Tests run: 1463, Failures: 0, Errors: 1, Skipped: 29
> 18:40:55.828 [INFO] 
> 
> 18:40:55.829 [INFO] BUILD FAILURE
> 18:40:55.829 [INFO] 
> 
> 18:40:55.830 [INFO] Total time: 30:19 min
> 18:40:55.830 [INFO] Finished at: 2018-11-07T18:40:55+00:00
> 18:40:56.294 [INFO] Final Memory: 92M/678M
> 18:40:56.294 [INFO] 
> 
> 18:40:56.294 [WARNING] The requested profile "include-kinesis" could not be 
> activated because it does not exist.
> 18:40:56.295 [ERROR] Failed to execute goal 
> org.apache.maven.plugins:maven-surefire-plugin:2.18.1:test 
> (integration-tests) on project flink-tests_2.11: There are test failures.
> 18:40:56.295 [ERROR] 
> 18:40:56.295 [ERROR] Please refer to 
> /home/travis/build/sunjincheng121/flink/flink-tests/target/surefire-reports 
> for the individual test results.
> 18:40:56.295 [ERROR] -> [Help 1]
> 18:40:56.295 [ERROR] 
> 18:40:56.295 [ERROR] To see the full stack trace of the errors, re-run Maven 
> with the -e switch.
> 18:40:56.295 [ERROR] Re-run Maven using the -X switch to enable full debug 
> logging.
> 18:40:56.295 [ERROR] 
> 18:40:56.295 [ERROR] For more information about the errors and possible 
> solutions, please read the following articles:
> 18:40:56.295 [ERROR] [Help 1] 
> http://cwiki.apache.org/confluence/display/MAVEN/MojoFailureException
> MVN exited with EXIT CODE: 1.
> Trying to KILL watchdog (11329).
> ./tools/travis_mvn_watchdog.sh: line 269: 11329 Terminated watchdog
> PRODUCED build artifacts.
> But after the rerun, the error disappeared. 
> Currently,no specific reasons are found, and will continue to pay attention.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Created] (FLINK-13426) TaskExecutor uses the wrong Registrationid in the heartbeat with RM.

2019-07-25 Thread Guowei Ma (JIRA)
Guowei Ma created FLINK-13426:
-

 Summary: TaskExecutor uses the wrong Registrationid in the 
heartbeat with RM.
 Key: FLINK-13426
 URL: https://issues.apache.org/jira/browse/FLINK-13426
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Coordination
Affects Versions: 1.8.1, 1.9.0
Reporter: Guowei Ma


1. The first time, the TaskExecutor registers to the RM successfully. If it then 
fails to send the SlotReport to the SlotManager, the TaskExecutor will reconnect 
to the RM. However, the TaskExecutor still keeps the old registration id in the 
EstablishedResourceManagerConnection.

2. The second time, the TaskExecutor registers to the RM successfully and gets a 
new registration id.

3. The first and second rounds have a race condition, so the TaskExecutor may 
use the old registration id in its heartbeat with the RM.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Updated] (FLINK-13426) TaskExecutor uses the wrong Registrationid in the heartbeat with RM.

2019-07-25 Thread Guowei Ma (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-13426?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guowei Ma updated FLINK-13426:
--
Attachment: image-2019-07-25-17-57-03-537.png

> TaskExecutor uses the wrong Registrationid in the heartbeat with RM.
> 
>
> Key: FLINK-13426
> URL: https://issues.apache.org/jira/browse/FLINK-13426
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.8.1, 1.9.0
>Reporter: Guowei Ma
>Priority: Minor
> Attachments: image-2019-07-25-17-57-03-537.png
>
>
> 1. First-time TaskExecutor register to rm successfully. If it fails to send 
> SlotReport to SlotMaanger, TaskExecutor will reconnect to RM again. However, 
> TaskExecutor still uses the old registration id in the 
> EstablishedResourceManagerConnection.
> 2. Second-time TaskExecutor registers to rm successfully and gets a new 
> registration id.
> 3. First-round and second-round has a race condition. Since that the task 
> executor maybe use the old registration id in heartbeat with rm.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Updated] (FLINK-13426) TaskExecutor uses the wrong Registrationid in the heartbeat with RM.

2019-07-25 Thread Guowei Ma (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-13426?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guowei Ma updated FLINK-13426:
--
Description: 
1. The first time, the TaskExecutor registers to the RM successfully. If it then 
fails to send the SlotReport to the SlotManager, the TaskExecutor will reconnect 
to the RM. However, the TaskExecutor still keeps the old registration id in the 
EstablishedResourceManagerConnection.

2. The second time, the TaskExecutor registers to the RM successfully and gets a 
new registration id.

3. The first and second rounds have a race condition, so the TaskExecutor may 
use the old registration id in its heartbeat with the RM.

 

!image-2019-07-25-17-57-03-537.png!

  was:
1. The first time, the TaskExecutor registers with the RM successfully. If it 
fails to send the SlotReport to the SlotManager, the TaskExecutor will 
reconnect to the RM. However, it still uses the old registration id in the 
EstablishedResourceManagerConnection.

2. The second time, the TaskExecutor registers with the RM successfully and 
gets a new registration id.

3. The first and second rounds have a race condition, so the TaskExecutor may 
use the old registration id in the heartbeat with the RM.


> TaskExecutor uses the wrong RegistrationId in the heartbeat with RM.
> 
>
> Key: FLINK-13426
> URL: https://issues.apache.org/jira/browse/FLINK-13426
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.8.1, 1.9.0
>Reporter: Guowei Ma
>Priority: Minor
> Attachments: image-2019-07-25-17-57-03-537.png
>
>
> 1. The first time, the TaskExecutor registers with the RM successfully. If it 
> fails to send the SlotReport to the SlotManager, the TaskExecutor will 
> reconnect to the RM. However, it still uses the old registration id in the 
> EstablishedResourceManagerConnection.
> 2. The second time, the TaskExecutor registers with the RM successfully and 
> gets a new registration id.
> 3. The first and second rounds have a race condition, so the TaskExecutor may 
> use the old registration id in the heartbeat with the RM.
>  
> !image-2019-07-25-17-57-03-537.png!



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Comment Edited] (FLINK-10819) JobManagerHAProcessFailureRecoveryITCase.testDispatcherProcessFailure is unstable

2019-07-25 Thread Guowei Ma (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10819?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16889486#comment-16889486
 ] 

Guowei Ma edited comment on FLINK-10819 at 7/25/19 10:05 AM:
-

It might be a bug according to 
[https://api.travis-ci.org/v3/job/508500560/log.txt]

1. The test timed out because two "READY_MARKER_FILE_PREFIX" files are 
missing.

2. The two tasks responsible for creating those files cannot be deployed 
because no resource is available.

!image-2019-07-19-17-01-19-758.png!

3. However, it is still unclear why the RM has not allocated resources to the 
two tasks: two task executors registered at the beginning, so there should be 
4 free slots.

!image-2019-07-19-17-00-17-194.png!

I have read a few logs, and all of them share three characteristics:

1. There is always a TM that registers twice.
2. The TaskExecutor that registers twice uses the old RegistrationID in the 
heartbeat. I have already opened [FLINK-13426] for this.
3. When the job starts, it applies for 4 slots, but only one TM is available. 
During the execution of the Source, the SlotPool cancels the two slot requests 
that have not been fulfilled. In the next stage, the Scheduler issues two new 
SlotRequests, but they do not return before the timeout.

was (Author: maguowei):
It might be a bug according to 
https://api.travis-ci.org/v3/job/508500560/log.txt

1. The test timed out because two "READY_MARKER_FILE_PREFIX" files are 
missing.

2. The two tasks responsible for creating those files cannot be deployed 
because no resource is available.

!image-2019-07-19-17-01-19-758.png!

3. The slots from one TM (34dbf0f8264469af49be8e1dbc2ad811) are not recognized 
by the SlotManager. Because of this, the two tasks cannot be deployed.

!image-2019-07-19-17-00-17-194.png!

4. The TM (34dbf0f8264469af49be8e1dbc2ad811) registers with the RM twice.

!image-2019-07-19-16-59-10-178.png!

The RM sends two RegistrationResponses to the TM, but the TM uses different 
threads to handle them, so the registrationId from the old 
RegistrationResponse overrides the registrationId from the new one.

A simple fix is to process the responses on the TM's main thread. I am still 
considering whether there is another approach.

> JobManagerHAProcessFailureRecoveryITCase.testDispatcherProcessFailure is 
> unstable
> -
>
> Key: FLINK-10819
> URL: https://issues.apache.org/jira/browse/FLINK-10819
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination, Tests
>Affects Versions: 1.7.0
>Reporter: sunjincheng
>Assignee: Guowei Ma
>Priority: Critical
>  Labels: test-stability
> Fix For: 1.9.0
>
> Attachments: image-2019-07-19-16-59-10-178.png, 
> image-2019-07-19-17-00-17-194.png, image-2019-07-19-17-01-19-758.png
>
>
> Found the following error in the process of CI:
> Results :
> Tests in error: 
>  JobManagerHAProcessFailureRecoveryITCase.testDispatcherProcessFailure:331 » 
> IllegalArgument
> Tests run: 1463, Failures: 0, Errors: 1, Skipped: 29
> 18:40:55.828 [INFO] 
> 
> 18:40:55.829 [INFO] BUILD FAILURE
> 18:40:55.829 [INFO] 
> 
> 18:40:55.830 [INFO] Total time: 30:19 min
> 18:40:55.830 [INFO] Finished at: 2018-11-07T18:40:55+00:00
> 18:40:56.294 [INFO] Final Memory: 92M/678M
> 18:40:56.294 [INFO] 
> 
> 18:40:56.294 [WARNING] The requested profile "include-kinesis" could not be 
> activated because it does not exist.
> 18:40:56.295 [ERROR] Failed to execute goal 
> org.apache.maven.plugins:maven-surefire-plugin:2.18.1:test 
> (integration-tests) on project flink-tests_2.11: There are test failures.
> 18:40:56.295 [ERROR] 
> 18:40:56.295 [ERROR] Please refer to 
> /home/travis/build/sunjincheng121/flink/flink-tests/target/surefire-reports 
> for the individual test results.
> 18:40:56.295 [ERROR] -> [Help 1]
> 18:40:56.295 [ERROR] 
> 18:40:56.295 [ERROR] To see the full stack trace of the errors, re-run Maven 
> with the -e switch.
> 18:40:56.295 [ERROR] Re-run Maven using the -X switch to enable full debug 
> logging.
> 18:40:56.295 [ERROR] 
> 18:40:56.295 [ERROR] For more information about the errors and possible 
> solutions, please read the following articles:
> 18:40:56.295 [ERROR] [Help 1] 
> http://cwiki.apache.org/confluence/display/MAVEN/MojoFailureException
> MVN exited with EXIT CODE: 1.
> Trying to KILL watchdog (11329).
> ./tools/travis_mvn_watchdog.sh: line 269: 11329 Terminated watchdog
> PRODUCED bui

[jira] [Commented] (FLINK-10819) JobManagerHAProcessFailureRecoveryITCase.testDispatcherProcessFailure is unstable

2019-07-25 Thread Guowei Ma (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10819?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16892620#comment-16892620
 ] 

Guowei Ma commented on FLINK-10819:
---

I think we could add status logging to the SlotManager, similar to the 
SlotPool. It would help to find out why the RM cannot fulfill slot requests, 
and to inspect the slot status.

> JobManagerHAProcessFailureRecoveryITCase.testDispatcherProcessFailure is 
> unstable
> -
>
> Key: FLINK-10819
> URL: https://issues.apache.org/jira/browse/FLINK-10819
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination, Tests
>Affects Versions: 1.7.0
>Reporter: sunjincheng
>Assignee: Guowei Ma
>Priority: Critical
>  Labels: test-stability
> Fix For: 1.9.0
>
> Attachments: image-2019-07-19-16-59-10-178.png, 
> image-2019-07-19-17-00-17-194.png, image-2019-07-19-17-01-19-758.png
>
>
> Found the following error in the process of CI:
> Results :
> Tests in error: 
>  JobManagerHAProcessFailureRecoveryITCase.testDispatcherProcessFailure:331 » 
> IllegalArgument
> Tests run: 1463, Failures: 0, Errors: 1, Skipped: 29
> 18:40:55.828 [INFO] 
> 
> 18:40:55.829 [INFO] BUILD FAILURE
> 18:40:55.829 [INFO] 
> 
> 18:40:55.830 [INFO] Total time: 30:19 min
> 18:40:55.830 [INFO] Finished at: 2018-11-07T18:40:55+00:00
> 18:40:56.294 [INFO] Final Memory: 92M/678M
> 18:40:56.294 [INFO] 
> 
> 18:40:56.294 [WARNING] The requested profile "include-kinesis" could not be 
> activated because it does not exist.
> 18:40:56.295 [ERROR] Failed to execute goal 
> org.apache.maven.plugins:maven-surefire-plugin:2.18.1:test 
> (integration-tests) on project flink-tests_2.11: There are test failures.
> 18:40:56.295 [ERROR] 
> 18:40:56.295 [ERROR] Please refer to 
> /home/travis/build/sunjincheng121/flink/flink-tests/target/surefire-reports 
> for the individual test results.
> 18:40:56.295 [ERROR] -> [Help 1]
> 18:40:56.295 [ERROR] 
> 18:40:56.295 [ERROR] To see the full stack trace of the errors, re-run Maven 
> with the -e switch.
> 18:40:56.295 [ERROR] Re-run Maven using the -X switch to enable full debug 
> logging.
> 18:40:56.295 [ERROR] 
> 18:40:56.295 [ERROR] For more information about the errors and possible 
> solutions, please read the following articles:
> 18:40:56.295 [ERROR] [Help 1] 
> http://cwiki.apache.org/confluence/display/MAVEN/MojoFailureException
> MVN exited with EXIT CODE: 1.
> Trying to KILL watchdog (11329).
> ./tools/travis_mvn_watchdog.sh: line 269: 11329 Terminated watchdog
> PRODUCED build artifacts.
> But after the rerun, the error disappeared. 
> Currently,no specific reasons are found, and will continue to pay attention.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Commented] (FLINK-13426) TaskExecutor uses the wrong RegistrationId in the heartbeat with RM.

2019-07-29 Thread Guowei Ma (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-13426?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16895684#comment-16895684
 ] 

Guowei Ma commented on FLINK-13426:
---

The TaskExecutor should only use the taskExecutorRegistrationId after it has 
successfully sent the SlotReport for the first time; otherwise it might use 
the old one in the heartbeat with the ResourceManager.

Therefore, the TaskExecutor should start monitoring the ResourceManager in the 
slotReportResponseFuture completion callback.

 

> TaskExecutor uses the wrong RegistrationId in the heartbeat with RM.
> 
>
> Key: FLINK-13426
> URL: https://issues.apache.org/jira/browse/FLINK-13426
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.8.1, 1.9.0
>Reporter: Guowei Ma
>Priority: Minor
> Attachments: image-2019-07-25-17-57-03-537.png
>
>
> 1. The first time, the TaskExecutor registers with the RM successfully. If it 
> fails to send the SlotReport to the SlotManager, the TaskExecutor will 
> reconnect to the RM. However, it still uses the old registration id in the 
> EstablishedResourceManagerConnection.
> 2. The second time, the TaskExecutor registers with the RM successfully and 
> gets a new registration id.
> 3. The first and second rounds have a race condition, so the TaskExecutor may 
> use the old registration id in the heartbeat with the RM.
>  
> !image-2019-07-25-17-57-03-537.png!
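The race described above can be sketched outside of Flink. The class below is a hypothetical illustration, not Flink's actual TaskExecutor API: each (re)connection round gets a monotonically increasing attempt number, and a registration response is accepted only if it belongs to a newer attempt than the currently established one, so a late response from the first round can no longer overwrite the registration id obtained in the second round.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

public class RegistrationRaceDemo {

    /** Registration id together with the attempt that produced it. */
    static final class Registration {
        final long attempt;
        final String registrationId;

        Registration(long attempt, String registrationId) {
            this.attempt = attempt;
            this.registrationId = registrationId;
        }
    }

    private final AtomicLong attemptCounter = new AtomicLong();
    private final AtomicReference<Registration> established = new AtomicReference<>();

    /** Called when a (re)connection to the RM starts; returns the attempt number. */
    public long startAttempt() {
        return attemptCounter.incrementAndGet();
    }

    /** Accept a registration response only if it belongs to a newer attempt. */
    public boolean onRegistrationSuccess(long attempt, String registrationId) {
        while (true) {
            Registration current = established.get();
            if (current != null && current.attempt >= attempt) {
                return false; // stale response from an earlier round: drop it
            }
            if (established.compareAndSet(current, new Registration(attempt, registrationId))) {
                return true;
            }
        }
    }

    /** The id the heartbeat should carry; never one from a superseded round. */
    public String registrationIdForHeartbeat() {
        Registration current = established.get();
        return current == null ? null : current.registrationId;
    }

    public static void main(String[] args) {
        RegistrationRaceDemo te = new RegistrationRaceDemo();
        long first = te.startAttempt();
        long second = te.startAttempt();
        // The second round's response is processed first ...
        te.onRegistrationSuccess(second, "new-id");
        // ... and the late response from the first round must not win.
        te.onRegistrationSuccess(first, "old-id");
        System.out.println(te.registrationIdForHeartbeat()); // prints "new-id"
    }
}
```

Flink could achieve the same effect by handling all responses on the RPC main thread; the attempt counter here just makes the "newer wins" rule explicit.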



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Created] (FLINK-13480) Export SlotManager status at debug mode.

2019-07-29 Thread Guowei Ma (JIRA)
Guowei Ma created FLINK-13480:
-

 Summary: Export SlotManager status at debug mode.
 Key: FLINK-13480
 URL: https://issues.apache.org/jira/browse/FLINK-13480
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Coordination
Affects Versions: 1.9.0, 1.10
Reporter: Guowei Ma


Some resource allocation issues, for example FLINK-10819, are very difficult 
to resolve.

One reason is that the status of the SlotManager is hard to observe. We could 
save a lot of time when troubleshooting if the SlotManager status were 
exported to the log.

So I propose exporting the status of the SlotManager when debug logging is 
enabled.
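A minimal sketch of the proposal, using java.util.logging as a stand-in for Flink's logging setup (the class and method names below are hypothetical, not the real SlotManager API): the status string is built only when debug-level logging is enabled, so the export costs nothing in normal operation.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.logging.Level;
import java.util.logging.Logger;

public class SlotManagerStatusLogger {

    private static final Logger LOG =
            Logger.getLogger(SlotManagerStatusLogger.class.getName());

    // slot id -> state (e.g. FREE, PENDING, ALLOCATED); a stand-in for the real bookkeeping
    private final Map<String, String> slotStates = new ConcurrentHashMap<>();

    public void recordSlotState(String slotId, String state) {
        slotStates.put(slotId, state);
    }

    /**
     * Log the full status only when debug (FINE) logging is enabled, so the
     * potentially large status string is never built in normal operation.
     * Returns the logged string, or null if debug logging is off.
     */
    public String logStatusIfDebugEnabled() {
        if (!LOG.isLoggable(Level.FINE)) {
            return null;
        }
        StringBuilder sb = new StringBuilder("SlotManager status:");
        slotStates.forEach((id, state) ->
                sb.append(' ').append(id).append('=').append(state));
        String status = sb.toString();
        LOG.fine(status);
        return status;
    }

    public static void main(String[] args) {
        SlotManagerStatusLogger slotManager = new SlotManagerStatusLogger();
        slotManager.recordSlotState("slot-1", "FREE");
        LOG.setLevel(Level.FINE); // simulate running with debug logging enabled
        System.out.println(slotManager.logStatusIfDebugEnabled());
    }
}
```

The guard (`isLoggable`) matters because a SlotManager can track thousands of slots; the status dump should be free when debug is off.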



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Updated] (FLINK-13480) Export SlotManager status at debug mode.

2019-07-29 Thread Guowei Ma (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-13480?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guowei Ma updated FLINK-13480:
--
Issue Type: Improvement  (was: Bug)

> Export SlotManager status at debug mode.
> 
>
> Key: FLINK-13480
> URL: https://issues.apache.org/jira/browse/FLINK-13480
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.9.0, 1.10
>Reporter: Guowei Ma
>Priority: Minor
>
> Some resource allocation issues, for example FLINK-10819, are very difficult 
> to resolve.
> One reason is that the status of the SlotManager is hard to observe. We could 
> save a lot of time when troubleshooting if the SlotManager status were 
> exported to the log.
> So I propose exporting the status of the SlotManager when debug logging is 
> enabled.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Commented] (FLINK-13491) AsyncWaitOperator doesn't handle endInput call properly

2019-07-30 Thread Guowei Ma (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-13491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16895937#comment-16895937
 ] 

Guowei Ma commented on FLINK-13491:
---

Currently, the OperatorChain calls every operator's "endInput" at the same 
time.

An alternative is to interleave the calls per operator:
{code:java}
operator1.endInput();
operator1.close();
operator2.endInput();
operator2.close();
{code}

This would flush the buffered data in each operator before the next one is 
finished.
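The interleaved shutdown can be illustrated with a toy chain (the classes below are illustrative stand-ins, not Flink's operator API): the first operator's endInput() flushes its buffer into the second operator, and only then does the second operator's endInput() run, so no buffered record is lost.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class ChainShutdownDemo {

    /** Toy operator that buffers records and only emits them on endInput(). */
    static final class BufferingOperator {
        private final List<String> buffer = new ArrayList<>();
        private final Consumer<String> downstream;

        BufferingOperator(Consumer<String> downstream) {
            this.downstream = downstream;
        }

        void processElement(String record) {
            buffer.add(record); // records are held back until end of input
        }

        void endInput() {
            buffer.forEach(downstream); // flush everything to the next operator
            buffer.clear();
        }

        void close() {
            // release resources; nothing to do in this toy version
        }
    }

    /** Finish the chain head-to-tail: endInput() then close() per operator. */
    static void finishChain(List<BufferingOperator> chain) {
        for (BufferingOperator op : chain) {
            op.endInput();
            op.close();
        }
    }

    public static void main(String[] args) {
        List<String> sink = new ArrayList<>();
        BufferingOperator op2 = new BufferingOperator(sink::add);
        BufferingOperator op1 = new BufferingOperator(op2::processElement);

        op1.processElement("a");
        op1.processElement("b");

        // op1.endInput() pushes "a" and "b" into op2's buffer before
        // op2.endInput() runs, so the sink receives both records.
        finishChain(List.of(op1, op2));
        System.out.println(sink); // prints [a, b]
    }
}
```

If endInput were broadcast to all operators at once, op2 would see end-of-input while "a" and "b" are still sitting in op1's buffer.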

 

> AsyncWaitOperator doesn't handle endInput call properly
> ---
>
> Key: FLINK-13491
> URL: https://issues.apache.org/jira/browse/FLINK-13491
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Affects Versions: 1.9.0
>Reporter: Piotr Nowojski
>Priority: Blocker
> Fix For: 1.9.0
>
>
> This is the same issue as for {{ContinousFileReaderOperator}} in 
> https://issues.apache.org/jira/browse/FLINK-13376. {{AsyncWaitOperator}} will 
> propagate {{endInput}} notification immediately, even if it has some records 
> buffered.
> Side note, this also shows that the current {{BoundedOneInput#endInput}} API 
> is easy to mishandle if an operator buffers some records internally. Maybe we 
> could redesign this API somehow [~aljoscha] [~sunhaibotb]?



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Assigned] (FLINK-11985) Remove ignored command line parameter from yarn_setup.md

2019-03-20 Thread Guowei Ma (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11985?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guowei Ma reassigned FLINK-11985:
-

Assignee: Guowei Ma

> Remove ignored command line parameter from yarn_setup.md
> 
>
> Key: FLINK-11985
> URL: https://issues.apache.org/jira/browse/FLINK-11985
> Project: Flink
>  Issue Type: Task
>  Components: Documentation
>Reporter: Guowei Ma
>Assignee: Guowei Ma
>Priority: Trivial
>
> In "Flink Yarn Session" mode the command line parameter "-n, --container" has 
> already been ignored. It would be more friendly for user that the 
> documentation is consistent with the implementation.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-11985) Remove ignored command line parameter from yarn_setup.md

2019-03-20 Thread Guowei Ma (JIRA)
Guowei Ma created FLINK-11985:
-

 Summary: Remove ignored command line parameter from yarn_setup.md
 Key: FLINK-11985
 URL: https://issues.apache.org/jira/browse/FLINK-11985
 Project: Flink
  Issue Type: Task
  Components: Documentation
Reporter: Guowei Ma


In "Flink Yarn Session" mode the command line parameter "-n, --container" has 
already been ignored. It would be more friendly for user that the documentation 
is consistent with the implementation.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-12037) High parallelism E2E test failed on Travis

2019-04-01 Thread Guowei Ma (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12037?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guowei Ma updated FLINK-12037:
--
Attachment: image-2019-04-01-16-33-17-540.png

> High parallelism E2E test failed on Travis
> --
>
> Key: FLINK-12037
> URL: https://issues.apache.org/jira/browse/FLINK-12037
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination, Tests
>Affects Versions: 1.8.1
>Reporter: Chesnay Schepler
>Priority: Major
>  Labels: test-stability
> Attachments: image-2019-04-01-16-33-17-540.png
>
>
> https://travis-ci.org/apache/flink/jobs/511335563
> {code}
> org.apache.flink.client.program.ProgramInvocationException: Job failed. 
> (JobID: f149b50ac8c6291371ce54b30fb0a072)
>   at 
> org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:268)
>   at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:483)
>   at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:471)
>   at 
> org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:62)
>   at 
> org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:817)
>   at 
> org.apache.flink.batch.HighParallelismIterationsTestProgram.main(HighParallelismIterationsTestProgram.java:73)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
>   at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
>   at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:423)
>   at 
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:813)
>   at 
> org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:287)
>   at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
>   at 
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050)
>   at 
> org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at javax.security.auth.Subject.doAs(Subject.java:422)
>   at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
>   at 
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>   at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job 
> execution failed.
>   at 
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
>   at 
> org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:265)
>   ... 22 more
> Caused by: org.apache.flink.util.FlinkException: TaskExecutor 
> akka.tcp://flink@127.0.1.1:37009/user/taskmanager_0 has no more allocated 
> slots for job f149b50ac8c6291371ce54b30fb0a072.
>   at 
> org.apache.flink.runtime.taskexecutor.TaskExecutor.freeSlotInternal(TaskExecutor.java:1401)
>   at 
> org.apache.flink.runtime.taskexecutor.TaskExecutor.timeoutSlot(TaskExecutor.java:1418)
>   at 
> org.apache.flink.runtime.taskexecutor.TaskExecutor.access$2600(TaskExecutor.java:136)
>   at 
> org.apache.flink.runtime.taskexecutor.TaskExecutor$SlotActionsImpl.lambda$timeoutSlot$1(TaskExecutor.java:1609)
>   at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:392)
>   at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:185)
>   at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:147)
>   at 
> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
>   at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>   at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
>   at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>   at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>   at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>   at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>   at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at 

[jira] [Commented] (FLINK-12037) High parallelism E2E test failed on Travis

2019-04-01 Thread Guowei Ma (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12037?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16806517#comment-16806517
 ] 

Guowei Ma commented on FLINK-12037:
---

# A slot is allocated, but the JM does not deploy any task on it.
 # Because of that, the TaskManager frees the slot (after 10s).
 # The JM then deploys the task to a TM which no longer has any resource for 
it.

!image-2019-04-01-16-33-17-540.png!

I think this is just the direct cause, not the root cause, because I do not 
understand why the JM requests the slot but does not deploy a task on it.
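The three steps above describe a race between the slot idle timeout on the TM and a late deployment from the JM. A hypothetical sketch of that window (none of these names are Flink's actual TaskExecutor API): the timeout frees the slot only if no task has arrived, and a deployment that comes in after the slot was freed is rejected rather than run without a resource, matching the "has no more allocated slots for job ..." failure in the stack trace.

```java
public class SlotTimeoutDemo {

    private String allocatedJob;   // null means the slot is free
    private boolean taskDeployed;

    /** Step 1: the JM allocates the slot but has not yet deployed a task. */
    public void allocate(String jobId) {
        allocatedJob = jobId;
        taskDeployed = false;
    }

    /** Step 2: the idle timeout (e.g. 10s) fires; free the slot only if unused. */
    public boolean onSlotTimeout() {
        if (allocatedJob != null && !taskDeployed) {
            allocatedJob = null;
            return true;  // slot freed while the JM still believes it owns it
        }
        return false;     // a task arrived in time, keep the slot
    }

    /** Step 3: a late deployment must be rejected once the slot was freed. */
    public boolean deployTask(String jobId) {
        if (allocatedJob == null || !allocatedJob.equals(jobId)) {
            return false; // corresponds to the FlinkException in the trace
        }
        taskDeployed = true;
        return true;
    }

    public static void main(String[] args) {
        SlotTimeoutDemo slot = new SlotTimeoutDemo();
        slot.allocate("job-1");
        slot.onSlotTimeout();                         // frees the idle slot
        System.out.println(slot.deployTask("job-1")); // prints false
    }
}
```

Rejecting the late deployment is the safe half of the race; the open question in the comment, why the JM waited past the timeout, is the root cause this sketch cannot answer.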

> High parallelism E2E test failed on Travis
> --
>
> Key: FLINK-12037
> URL: https://issues.apache.org/jira/browse/FLINK-12037
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination, Tests
>Affects Versions: 1.8.1
>Reporter: Chesnay Schepler
>Priority: Major
>  Labels: test-stability
> Attachments: image-2019-04-01-16-33-17-540.png
>
>
> https://travis-ci.org/apache/flink/jobs/511335563
> {code}
> org.apache.flink.client.program.ProgramInvocationException: Job failed. 
> (JobID: f149b50ac8c6291371ce54b30fb0a072)
>   at 
> org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:268)
>   at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:483)
>   at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:471)
>   at 
> org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:62)
>   at 
> org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:817)
>   at 
> org.apache.flink.batch.HighParallelismIterationsTestProgram.main(HighParallelismIterationsTestProgram.java:73)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
>   at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
>   at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:423)
>   at 
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:813)
>   at 
> org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:287)
>   at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
>   at 
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050)
>   at 
> org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at javax.security.auth.Subject.doAs(Subject.java:422)
>   at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
>   at 
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>   at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job 
> execution failed.
>   at 
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
>   at 
> org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:265)
>   ... 22 more
> Caused by: org.apache.flink.util.FlinkException: TaskExecutor 
> akka.tcp://flink@127.0.1.1:37009/user/taskmanager_0 has no more allocated 
> slots for job f149b50ac8c6291371ce54b30fb0a072.
>   at 
> org.apache.flink.runtime.taskexecutor.TaskExecutor.freeSlotInternal(TaskExecutor.java:1401)
>   at 
> org.apache.flink.runtime.taskexecutor.TaskExecutor.timeoutSlot(TaskExecutor.java:1418)
>   at 
> org.apache.flink.runtime.taskexecutor.TaskExecutor.access$2600(TaskExecutor.java:136)
>   at 
> org.apache.flink.runtime.taskexecutor.TaskExecutor$SlotActionsImpl.lambda$timeoutSlot$1(TaskExecutor.java:1609)
>   at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:392)
>   at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:185)
>   at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:147)
>   at 
> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
>   at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>   at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
>   at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>   at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>  

[jira] [Comment Edited] (FLINK-12037) High parallelism E2E test failed on Travis

2019-04-01 Thread Guowei Ma (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12037?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16806517#comment-16806517
 ] 

Guowei Ma edited comment on FLINK-12037 at 4/1/19 8:45 AM:
---

# A slot is allocated, but the JM does not deploy any task on it.
 # Because of that, the TaskManager frees the slot (after 10s).
 # The JM then deploys the task to a TM which no longer has any resource for 
it.

!image-2019-04-01-16-33-17-540.png!

was (Author: maguowei):
# A slot is allocated, but the JM does not deploy any task on it.
 # Because of that, the TaskManager frees the slot (after 10s).
 # The JM then deploys the task to a TM which no longer has any resource for 
it.

!image-2019-04-01-16-33-17-540.png!

I think this is just the direct cause, not the root cause, because I do not 
understand why the JM requests the slot but does not deploy a task on it.

> High parallelism E2E test failed on Travis
> --
>
> Key: FLINK-12037
> URL: https://issues.apache.org/jira/browse/FLINK-12037
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination, Tests
>Affects Versions: 1.8.1
>Reporter: Chesnay Schepler
>Priority: Major
>  Labels: test-stability
> Attachments: image-2019-04-01-16-33-17-540.png
>
>
> https://travis-ci.org/apache/flink/jobs/511335563
> {code}
> org.apache.flink.client.program.ProgramInvocationException: Job failed. 
> (JobID: f149b50ac8c6291371ce54b30fb0a072)
>   at 
> org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:268)
>   at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:483)
>   at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:471)
>   at 
> org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:62)
>   at 
> org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:817)
>   at 
> org.apache.flink.batch.HighParallelismIterationsTestProgram.main(HighParallelismIterationsTestProgram.java:73)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
>   at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
>   at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:423)
>   at 
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:813)
>   at 
> org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:287)
>   at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
>   at 
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050)
>   at 
> org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at javax.security.auth.Subject.doAs(Subject.java:422)
>   at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
>   at 
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>   at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job 
> execution failed.
>   at 
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
>   at 
> org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:265)
>   ... 22 more
> Caused by: org.apache.flink.util.FlinkException: TaskExecutor 
> akka.tcp://flink@127.0.1.1:37009/user/taskmanager_0 has no more allocated 
> slots for job f149b50ac8c6291371ce54b30fb0a072.
>   at 
> org.apache.flink.runtime.taskexecutor.TaskExecutor.freeSlotInternal(TaskExecutor.java:1401)
>   at 
> org.apache.flink.runtime.taskexecutor.TaskExecutor.timeoutSlot(TaskExecutor.java:1418)
>   at 
> org.apache.flink.runtime.taskexecutor.TaskExecutor.access$2600(TaskExecutor.java:136)
>   at 
> org.apache.flink.runtime.taskexecutor.TaskExecutor$SlotActionsImpl.lambda$timeoutSlot$1(TaskExecutor.java:1609)
>   at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:392)
>   at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:185)
>   at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:147)
>   at 
> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(Un

[jira] [Issue Comment Deleted] (FLINK-12037) High parallelism E2E test failed on Travis

2019-04-01 Thread Guowei Ma (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12037?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guowei Ma updated FLINK-12037:
--
Comment: was deleted

(was: # A slot is allocated, but the JM does not deploy any task on it.
 # Because of that, the TaskManager frees the slot (after 10s).
 # The JM then deploys the task to a TM which no longer has any resource for 
it.

!image-2019-04-01-16-33-17-540.png!
)

> High parallelism E2E test failed on Travis
> --
>
> Key: FLINK-12037
> URL: https://issues.apache.org/jira/browse/FLINK-12037
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination, Tests
>Affects Versions: 1.8.1
>Reporter: Chesnay Schepler
>Priority: Major
>  Labels: test-stability
>
> https://travis-ci.org/apache/flink/jobs/511335563
> {code}
> org.apache.flink.client.program.ProgramInvocationException: Job failed. 
> (JobID: f149b50ac8c6291371ce54b30fb0a072)
>   at 
> org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:268)
>   at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:483)
>   at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:471)
>   at 
> org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:62)
>   at 
> org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:817)
>   at 
> org.apache.flink.batch.HighParallelismIterationsTestProgram.main(HighParallelismIterationsTestProgram.java:73)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
>   at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
>   at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:423)
>   at 
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:813)
>   at 
> org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:287)
>   at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
>   at 
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050)
>   at 
> org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at javax.security.auth.Subject.doAs(Subject.java:422)
>   at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
>   at 
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>   at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job 
> execution failed.
>   at 
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
>   at 
> org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:265)
>   ... 22 more
> Caused by: org.apache.flink.util.FlinkException: TaskExecutor 
> akka.tcp://flink@127.0.1.1:37009/user/taskmanager_0 has no more allocated 
> slots for job f149b50ac8c6291371ce54b30fb0a072.
>   at 
> org.apache.flink.runtime.taskexecutor.TaskExecutor.freeSlotInternal(TaskExecutor.java:1401)
>   at 
> org.apache.flink.runtime.taskexecutor.TaskExecutor.timeoutSlot(TaskExecutor.java:1418)
>   at 
> org.apache.flink.runtime.taskexecutor.TaskExecutor.access$2600(TaskExecutor.java:136)
>   at 
> org.apache.flink.runtime.taskexecutor.TaskExecutor$SlotActionsImpl.lambda$timeoutSlot$1(TaskExecutor.java:1609)
>   at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:392)
>   at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:185)
>   at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:147)
>   at 
> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
>   at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>   at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
>   at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>   at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>   at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>   at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>   at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>   at scala.concurrent.forkjoin.Fork

[jira] [Updated] (FLINK-12037) High parallelism E2E test failed on Travis

2019-04-01 Thread Guowei Ma (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12037?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guowei Ma updated FLINK-12037:
--
Attachment: (was: image-2019-04-01-16-33-17-540.png)


[jira] [Commented] (FLINK-12037) High parallelism E2E test failed on Travis

2019-04-01 Thread Guowei Ma (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12037?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16806596#comment-16806596
 ] 

Guowei Ma commented on FLINK-12037:
---

From the log:

The TaskManager (b15f5702d9e1cc57e353824b3ec4b792) does not get a response from 
the JM after it offers its slot. Because of that, the TaskManager releases the 
slot and closes the connection with the JM.

 


[jira] [Comment Edited] (FLINK-12037) High parallelism E2E test failed on Travis

2019-04-01 Thread Guowei Ma (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12037?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16806596#comment-16806596
 ] 

Guowei Ma edited comment on FLINK-12037 at 4/1/19 10:10 AM:


From the log:

The TaskManager (b15f5702d9e1cc57e353824b3ec4b792) does not get a response from 
the JM after it offers its slot. Because of that, the TaskManager releases the 
slot and closes the connection with the JM after 10s.

 




[jira] [Comment Edited] (FLINK-12037) High parallelism E2E test failed on Travis

2019-04-02 Thread Guowei Ma (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12037?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16806596#comment-16806596
 ] 

Guowei Ma edited comment on FLINK-12037 at 4/2/19 10:34 AM:


From the log:

The TaskManager (b15f5702d9e1cc57e353824b3ec4b792) does not get a response from 
the JM after it offers its slot. Because of that, the TaskManager releases the 
slot and closes the connection with the JM after 10s.

 

For the TM:

11:38:11,436 the TaskManager b15f5702d9e1cc57e353824b3ec4b792 is started

11:40:36,761 Receive slot request

11:40:43,972 Establish JobManager connection for job 
f149b50ac8c6291371ce54b30fb0a072

11:40:43,989 Offer reserved slots to the leader of job

11:40:46,766 Free slot TaskSlot

 

During 11:40:43~11:40:46 the JM is mainly deploying tasks.

!image-2019-04-02-18-30-20-634.png!

Some deployments are about 100ms apart.



> High parallelism E2E test failed on Travis
> --
>
> Key: FLINK-12037
> URL: https://issues.apache.org/jira/browse/FLINK-12037
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination, Tests
>Affects Versions: 1.8.1
>Reporter: Chesnay Schepler
>Priority: Major
>  Labels: test-stability
> Attachments: image-2019-04-02-18-30-20-634.png
>
>

[jira] [Comment Edited] (FLINK-12037) High parallelism E2E test failed on Travis

2019-04-02 Thread Guowei Ma (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12037?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16806596#comment-16806596
 ] 

Guowei Ma edited comment on FLINK-12037 at 4/2/19 1:29 PM:
---

From the log:

The TaskManager (b15f5702d9e1cc57e353824b3ec4b792) does not get a response from 
the JM after it offers its slot. Because of that, the TaskManager releases the 
slot and closes the connection with the JM after 10s.

 

For the TM:

11:38:11,436 the TaskManager b15f5702d9e1cc57e353824b3ec4b792 is started

11:40:36,761 Receive slot request

11:40:43,972 Establish JobManager connection for job 
f149b50ac8c6291371ce54b30fb0a072

11:40:43,989 Offer reserved slots to the leader of job

11:40:46,766 Free slot TaskSlot

 

During 11:40:43~11:40:46 the JM is mainly deploying tasks.

The log gives no clear indication of what else the JM is doing, and no clear 
indication that "Deploying" itself is time-consuming.

!image-2019-04-02-18-30-20-634.png!

Some deployments are about 100ms apart.




[jira] [Commented] (FLINK-12086) AsyncFunction - Add access to a user defined Object for cleanup on timeout

2019-04-02 Thread Guowei Ma (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12086?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16808282#comment-16808282
 ] 

Guowei Ma commented on FLINK-12086:
---

I think there might be another way to access the user object without changing 
the API:
{code:java}
package dev.codeflush;

import org.apache.flink.streaming.api.functions.async.AsyncFunction;
import org.apache.flink.streaming.api.functions.async.ResultFuture;

import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class SomeAsyncFunction implements AsyncFunction<Integer, Integer> {

    private static final long serialVersionUID = 1L;

    // Tracks the library Future per in-flight request so that timeout() can
    // cancel it. A concurrent map, because timeout() may run on a different
    // thread than asyncInvoke().
    private final Map<ResultFuture<Integer>, Future<Integer>> libFutures =
            new ConcurrentHashMap<>();

    @Override
    public void asyncInvoke(Integer input, ResultFuture<Integer> resultFuture)
            throws Exception {
        // Submit the request to a library thread-pool here; the Future must
        // be non-null before it is put into the map.
        Future<Integer> future = null;
        libFutures.put(resultFuture, future);

        CompletableFuture.runAsync(() -> {
            try {
                resultFuture.complete(Collections.singleton(future.get()));
                libFutures.remove(resultFuture);
            } catch (ExecutionException e) {
                // handle this
            } catch (InterruptedException e) {
                // handle that
            }
        });
    }

    @Override
    public void timeout(Integer input, ResultFuture<Integer> resultFuture)
            throws Exception {
        Future<Integer> future = libFutures.remove(resultFuture);
        if (future != null) {
            future.cancel(true);
        }
        resultFuture.complete(Collections.emptySet());
    }
}
{code}
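For illustration, here is a self-contained sketch of the same map-based cleanup idea. `MiniResultFuture` and `CleanupAsyncFunction` are hypothetical stand-ins so the sketch runs without Flink on the classpath; the real interface is org.apache.flink.streaming.api.functions.async.ResultFuture.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Minimal stand-in for Flink's ResultFuture so the sketch is runnable
// without Flink on the classpath.
interface MiniResultFuture<OUT> {
    void complete(Collection<OUT> result);
}

/** Map-based cleanup: remember the library Future per request so that
 *  timeout() can cancel it without any change to the function signature. */
class CleanupAsyncFunction {
    private final ExecutorService libPool = Executors.newCachedThreadPool();
    private final Map<MiniResultFuture<Integer>, Future<?>> pending =
            new ConcurrentHashMap<>();

    Future<?> asyncInvoke(Integer input, MiniResultFuture<Integer> resultFuture) {
        // Simulate a slow library call whose Future we must keep around.
        Future<?> future = libPool.submit(() -> {
            TimeUnit.SECONDS.sleep(60); // pretend the backend is slow
            resultFuture.complete(Collections.singleton(input * 2));
            return null;
        });
        pending.put(resultFuture, future);
        return future;
    }

    void timeout(Integer input, MiniResultFuture<Integer> resultFuture) {
        Future<?> future = pending.remove(resultFuture);
        if (future != null) {
            future.cancel(true); // stop wasting backend resources
        }
        resultFuture.complete(Collections.emptySet());
    }

    void close() {
        libPool.shutdownNow();
    }
}
```

Cancelling the stored Future on timeout is what frees the library's threads and IO, which is the goal of the proposal in this ticket.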
 

 

> AsyncFunction - Add access to a user defined Object for cleanup on timeout
> --
>
> Key: FLINK-12086
> URL: https://issues.apache.org/jira/browse/FLINK-12086
> Project: Flink
>  Issue Type: Improvement
>  Components: API / DataStream
>Reporter: Felix Wollschläger
>Priority: Major
>
> When executing async-requests it would be nice to have access to a user 
> defined object to perform cleanup when the process times out.
> For example, when executing Cassandra queries I'm using the driver's 
> thread pool to submit Statements, which returns a 
> com.datastax.driver.core.ResultSetFuture ( 
> [https://docs.datastax.com/en/drivers/java/2.1/com/datastax/driver/core/ResultSetFuture.html]
>  ). When I run into a timeout I could cancel the Future, because waiting for 
> it to complete is unnecessary in that case.
>  
> The API could be extended to something like this:
>  
> Adding a type parameter to the AsyncFunction interface:
> {code:java}
> AsyncFunction<IN, OUT, T>{code}
> Updating the asyncInvoke method to return the user-defined object:
> {code:java}
> T asyncInvoke(IN input, ResultFuture<OUT> future) throws Exception;{code}
> Updating the timeout method to accept the user-defined object:
> {code:java}
> void timeout(IN input, T obj, ResultFuture<OUT> resultFuture) throws 
> Exception{code}
>  
> An example implementation could look like this:
> {code:java}
> package dev.codeflush;
> import org.apache.flink.streaming.api.functions.async.AsyncFunction;
> import org.apache.flink.streaming.api.functions.async.ResultFuture;
> import java.util.Collections;
> import java.util.concurrent.CompletableFuture;
> import java.util.concurrent.ExecutionException;
> import java.util.concurrent.Future;
> public class SomeAsyncFunction implements 
> AsyncFunction<Integer, Integer, Future<Integer>> {
>     private static final long serialVersionUID = 1L;
> 
>     @Override
>     public Future<Integer> asyncInvoke(Integer input, 
>             ResultFuture<Integer> resultFuture) throws Exception {
>         Future<Integer> future = null; // submit something in a library thread-pool
>         CompletableFuture.runAsync(() -> {
>             try {
>                 resultFuture.complete(Collections.singleton(future.get()));
>             } catch (ExecutionException e) {
>                 // handle this
>             } catch (InterruptedException e) {
>                 // handle that
>             }
>         });
> 
>         return future;
>     }
> 
>     @Override
>     public void timeout(Integer input, Future<Integer> future, 
>             ResultFuture<Integer> resultFuture) throws Exception {
>         future.cancel(true);
>         resultFuture.complete(Collections.emptySet());
>     }
> }
> {code}
> As it currently is, tasks submitted in the asyncInvoke method will use 
> resources (threads, IO) even if the application is no longer in a state where 
> it could do something meaningful with the result.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-12086) AsyncFunction - Add access to a user defined Object for cleanup on timeout

2019-04-02 Thread Guowei Ma (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12086?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16808282#comment-16808282
 ] 

Guowei Ma edited comment on FLINK-12086 at 4/3/19 1:30 AM:
---

I think there might be another way to access user object other than change the 
API.
{code:java}
 package dev.codeflush;

import org.apache.flink.streaming.api.functions.async.AsyncFunction;
import org.apache.flink.streaming.api.functions.async.ResultFuture;

import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class SomeAsyncFunction implements AsyncFunction {

private static final long serialVersionUID = 1L;

private HashMap, Future> libFutures = new 
HashMap();

@Override
public void asyncInvoke(Integer input, ResultFuture<Integer> resultFuture) 
throws Exception {
Future<Integer> future = null; // submit something in a library thread-pool
libFutures.put(resultFuture, future);

CompletableFuture.runAsync(() -> {
try {

resultFuture.complete(Collections.singleton(future.get()));
libFutures.remove(resultFuture);
} catch (ExecutionException e) {
// handle this
} catch (InterruptedException e) {
// handle that
}
});

}

@Override
public void timeout(Integer input, ResultFuture<Integer> resultFuture) 
throws Exception {
Future<Integer> future = libFutures.remove(resultFuture);
if (future != null) {
 future.cancel(true);
}
resultFuture.complete(Collections.emptySet());
}
}
{code}
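One caveat with the sketch above (my observation, not raised in the thread): asyncInvoke and the library's completion callback can run on different threads, so a plain HashMap is not safe for this bookkeeping. A minimal self-contained sketch of the same idea using a ConcurrentHashMap — all names here are hypothetical, not Flink API; the map key stands in for the ResultFuture instance:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Future;

// Hypothetical helper: tracks in-flight futures so that a timeout can cancel
// them, while completion removes them so a later timeout finds nothing to cancel.
class FutureTracker {
    private final ConcurrentMap<Object, Future<Integer>> inFlight = new ConcurrentHashMap<>();

    // Called from asyncInvoke after submitting work to the library thread pool.
    void register(Object key, Future<Integer> future) {
        inFlight.put(key, future);
    }

    // Called from the completion callback: forget the future.
    void complete(Object key) {
        inFlight.remove(key);
    }

    // Called on timeout: cancel the future if it is still in flight.
    boolean timeout(Object key) {
        Future<Integer> future = inFlight.remove(key);
        if (future != null) {
            return future.cancel(true);
        }
        return false;
    }

    int size() {
        return inFlight.size();
    }
}
```

Because both the callback and the timeout call `remove`, only one of them ever sees the future, which avoids a double cancel/complete race.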
 

 


 

 

> AsyncFunction - Add access to a user defined Object for cleanup on timeout
> --
>
> Key: FLINK-12086
> URL: https://issues.apache.org/jira/browse/FLINK-12086
> Project: Flink
>  Issue Type: Improvement
>  Components: API / DataStream
>Reporter: Felix Wollschläger
>Priority: Major
>
> When executing async-requests it would be nice to have access to a user 
> defined object to perform cleanup when the process times out.
> For example, when executing Cassandra queries I'm using the driver's 
> thread pool to submit statements, which returns a 
> com.datastax.driver.core.ResultSetFuture ( 
> [https://docs.datastax.com/en/drivers/java/2.1/com/datastax/driver/core/ResultSetFuture.html]
>  ). When I run into a timeout I could cancel the Future because waiting for 
> it to complete is unnecessary in that case.
>  
> The API could be extendend to something like this:
>  
> Adding a type parameter to the AsyncFunction interface:
> {code:java}
> AsyncFunction<IN, OUT, T>{code}
> Updating the asyncInvoke method to return the user-defined object:
> {code:java}
> T asyncInvoke(IN input, ResultFuture<OUT> future) throws Exception;{code}
> Updating the timeout method to accept the user-defined object:
> {code:java}
> void timeout(IN input, T obj, ResultFuture<OUT> resultFuture) throws 
> Exception{code}
>  
> An example Implementation could look like this:
> {code:java}
> package dev.codeflush;
> import org.apache.flink.streaming.api.functions.async.A

[jira] [Commented] (FLINK-12106) Jobmanager is killing FINISHED taskmanger containers, causing exception in still running Taskmanagers an

2019-04-03 Thread Guowei Ma (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12106?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16809421#comment-16809421
 ] 

Guowei Ma commented on FLINK-12106:
---

AFAIK, the community is working on it.  
[FLINK-10941|https://issues.apache.org/jira/browse/FLINK-10941] has the same 
problem. 

This issue is related to the lifecycle control of shuffle resources. There have 
been some related discussions and designs[1][2].

[1] 
[https://docs.google.com/document/d/13vAJJxfRXAwI4MtO8dux8hHnNMw2Biu5XRrb_hvGehA/edit#heading=h.v7vhb7w01d61]

[2] 
[https://cwiki.apache.org/confluence/display/FLINK/FLIP-31%3A+Pluggable+Shuffle+Manager]

 

> Jobmanager is killing FINISHED taskmanger containers, causing exception in 
> still running Taskmanagers an
> 
>
> Key: FLINK-12106
> URL: https://issues.apache.org/jira/browse/FLINK-12106
> Project: Flink
>  Issue Type: Bug
>  Components: Deployment / YARN
>Affects Versions: 1.7.2
> Environment: Hadoop:  hdp/2.5.6.0-40
> Flink: 2.7.2
>Reporter: John
>Priority: Major
>
> When running a single Flink job on YARN, some of the taskmanager containers 
> reach the FINISHED state before others.  It appears that, after receiving the 
> final execution state FINISHED from a taskmanager, the jobmanager waits ~68 
> seconds and then frees the associated slot in the taskmanager.  After an 
> additional 60 seconds, the jobmanager stops the same taskmanager because the 
> TaskExecutor exceeded the idle timeout.
> Meanwhile, other taskmanagers are still working to complete the job.  Within 
> 10 seconds after the taskmanager container above is stopped, the remaining 
> task managers receive an exception due to loss of connection to the stopped 
> taskmanager.  These exceptions result in job failure.
>  
> Relevant logs:
> 2019-04-03 13:49:00,013 INFO  org.apache.flink.yarn.YarnResourceManager       
>               - Registering TaskManager with ResourceID 
> container_1553017480503_0158_01_38 
> (akka.tcp://flink@hadoop4:42745/user/taskmanager_0) at ResourceManager
> 2019-04-03 13:49:05,900 INFO  org.apache.flink.yarn.YarnResourceManager       
>               - Registering TaskManager with ResourceID 
> container_1553017480503_0158_01_59 
> (akka.tcp://flink@hadoop9:55042/user/taskmanager_0) at ResourceManager
>  
>  
> 2019-04-03 13:48:51,132 INFO  org.apache.flink.yarn.YarnResourceManager       
>               - Received new container: 
> container_1553017480503_0158_01_77 - Remaining pending container 
> requests: 6
> 2019-04-03 13:48:52,862 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner    
>               -     
> -Dlog.file=/hadoop/yarn/log/application_1553017480503_0158/container_1553017480503_0158_01_77/taskmanager.log
> 2019-04-03 13:48:57,490 INFO  
> org.apache.flink.runtime.io.network.netty.NettyServer         - Successful 
> initialization (took 202 ms). Listening on SocketAddress 
> /192.168.230.69:40140.
> 2019-04-03 13:49:12,575 INFO  org.apache.flink.yarn.YarnResourceManager       
>               - Registering TaskManager with ResourceID 
> container_1553017480503_0158_01_77 
> (akka.tcp://flink@hadoop9:51525/user/taskmanager_0) at ResourceManager
> 2019-04-03 13:49:12,631 INFO  
> org.apache.flink.runtime.taskexecutor.TaskExecutor            - Allocated 
> slot for AllocationID\{42fed3e5a136240c23cc7b394e3249e9}.
> 2019-04-03 14:58:15,188 INFO  
> org.apache.flink.runtime.taskexecutor.TaskExecutor            - 
> Un-registering task and sending final execution state FINISHED to JobManager 
> for task DataSink 
> (com.anovadata.alexflinklib.sinks.bucketing.BucketingOutputFormat@26874f2c) 
> a4b5fb32830d4561147b2714828109e2.
> 2019-04-03 14:59:23,049 INFO  
> org.apache.flink.runtime.jobmaster.slotpool.SlotPool          - Releasing 
> idle slot [AllocationID\{42fed3e5a136240c23cc7b394e3249e9}].
> 2019-04-03 14:59:23,058 INFO  
> org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable      - Free slot 
> TaskSlot(index:0, state:ACTIVE, resource profile: 
> ResourceProfile\{cpuCores=1.7976931348623157E308, heapMemoryInMB=2147483647, 
> directMemoryInMB=2147483647, nativeMemoryInMB=2147483647, 
> networkMemoryInMB=2147483647}, allocationId: 
> AllocationID\{42fed3e5a136240c23cc7b394e3249e9}, jobId: 
> a6c4e367698c15cdf168d19a89faff1d).
> 2019-04-03 15:00:02,641 INFO  org.apache.flink.yarn.YarnResourceManager       
>               - Stopping container container_1553017480503_0158_01_77.
> 2019-04-03 15:00:02,646 INFO  org.apache.flink.yarn.YarnResourceManager       
>               - Closing TaskExecutor connection 
> container_1553017480503_0158_01_77 because: TaskExecutor exceeded the 
> idle timeout.
>  
>  
> 2019-04-03 13:48:48,902 INFO  org.apache.flink.yarn.YarnTaskExecut

[jira] [Updated] (FLINK-12113) User code passing to fromCollection(Iterator, Class) not cleaned

2019-04-07 Thread Guowei Ma (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12113?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guowei Ma updated FLINK-12113:
--
Attachment: image-2019-04-07-21-52-37-264.png

> User code passing to fromCollection(Iterator, Class) not cleaned
> 
>
> Key: FLINK-12113
> URL: https://issues.apache.org/jira/browse/FLINK-12113
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Affects Versions: 1.7.2
>Reporter: yankai zhang
>Priority: Major
> Attachments: image-2019-04-07-21-52-37-264.png
>
>
>  
> {code:java}
> interface IS extends Iterator<Object>, Serializable { }
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.fromCollection(new IS() {
> @Override
> public boolean hasNext() {
> return false;
> }
> @Override
> public Object next() {
> return null;
> }
> }, Object.class);
> {code}
> The code piece above throws an exception:
> {code:java}
> org.apache.flink.api.common.InvalidProgramException: The implementation of 
> the SourceFunction is not serializable. The object probably contains or 
> references non serializable fields.
>   at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:99)
> {code}
> And my workaround is wrapping clean around iterator instance, like this:
>  
> {code:java}
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.fromCollection(env.clean(new IS() {
> @Override
> public boolean hasNext() {
> return false;
> }
> @Override
> public Object next() {
> return null;
> }
> }), Object.class);
> {code}
>  
>  
>  





[jira] [Commented] (FLINK-12113) User code passing to fromCollection(Iterator, Class) not cleaned

2019-04-07 Thread Guowei Ma (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16811892#comment-16811892
 ] 

Guowei Ma commented on FLINK-12113:
---

I can't reproduce your problem. 

 

!image-2019-04-07-21-52-37-264.png!

> User code passing to fromCollection(Iterator, Class) not cleaned
> 
>
> Key: FLINK-12113
> URL: https://issues.apache.org/jira/browse/FLINK-12113
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Affects Versions: 1.7.2
>Reporter: yankai zhang
>Priority: Major
> Attachments: image-2019-04-07-21-52-37-264.png
>
>
>  
> {code:java}
> interface IS extends Iterator<Object>, Serializable { }
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.fromCollection(new IS() {
> @Override
> public boolean hasNext() {
> return false;
> }
> @Override
> public Object next() {
> return null;
> }
> }, Object.class);
> {code}
> The code piece above throws an exception:
> {code:java}
> org.apache.flink.api.common.InvalidProgramException: The implementation of 
> the SourceFunction is not serializable. The object probably contains or 
> references non serializable fields.
>   at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:99)
> {code}
> And my workaround is wrapping clean around iterator instance, like this:
>  
> {code:java}
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.fromCollection(env.clean(new IS() {
> @Override
> public boolean hasNext() {
> return false;
> }
> @Override
> public Object next() {
> return null;
> }
> }), Object.class);
> {code}
>  
>  
>  





[jira] [Updated] (FLINK-12113) User code passing to fromCollection(Iterator, Class) not cleaned

2019-04-08 Thread Guowei Ma (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12113?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guowei Ma updated FLINK-12113:
--
Attachment: image-2019-04-08-23-19-27-359.png

> User code passing to fromCollection(Iterator, Class) not cleaned
> 
>
> Key: FLINK-12113
> URL: https://issues.apache.org/jira/browse/FLINK-12113
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Affects Versions: 1.7.2
>Reporter: yankai zhang
>Priority: Major
> Attachments: image-2019-04-07-21-52-37-264.png, 
> image-2019-04-08-23-19-27-359.png
>
>
>  
> {code:java}
> interface IS extends Iterator<Object>, Serializable { }
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.fromCollection(new IS() {
> @Override
> public boolean hasNext() {
> return false;
> }
> @Override
> public Object next() {
> return null;
> }
> }, Object.class);
> {code}
> The code piece above throws an exception:
> {code:java}
> org.apache.flink.api.common.InvalidProgramException: The implementation of 
> the SourceFunction is not serializable. The object probably contains or 
> references non serializable fields.
>   at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:99)
> {code}
> And my workaround is wrapping clean around iterator instance, like this:
>  
> {code:java}
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.fromCollection(env.clean(new IS() {
> @Override
> public boolean hasNext() {
> return false;
> }
> @Override
> public Object next() {
> return null;
> }
> }), Object.class);
> {code}
>  
>  
>  





[jira] [Commented] (FLINK-12113) User code passing to fromCollection(Iterator, Class) not cleaned

2019-04-08 Thread Guowei Ma (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16812502#comment-16812502
 ] 

Guowei Ma commented on FLINK-12113:
---

Hi [~vision57], I changed my code so that it doesn't use an anonymous class, but 
it still doesn't reproduce the problem.

I am using JDK8.

!image-2019-04-08-23-19-27-359.png!

> User code passing to fromCollection(Iterator, Class) not cleaned
> 
>
> Key: FLINK-12113
> URL: https://issues.apache.org/jira/browse/FLINK-12113
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Affects Versions: 1.7.2
>Reporter: yankai zhang
>Priority: Major
> Attachments: image-2019-04-07-21-52-37-264.png, 
> image-2019-04-08-23-19-27-359.png
>
>
>  
> {code:java}
> interface IS extends Iterator<Object>, Serializable { }
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.fromCollection(new IS() {
> @Override
> public boolean hasNext() {
> return false;
> }
> @Override
> public Object next() {
> return null;
> }
> }, Object.class);
> {code}
> The code piece above throws an exception:
> {code:java}
> org.apache.flink.api.common.InvalidProgramException: The implementation of 
> the SourceFunction is not serializable. The object probably contains or 
> references non serializable fields.
>   at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:99)
> {code}
> And my workaround is wrapping clean around iterator instance, like this:
>  
> {code:java}
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.fromCollection(env.clean(new IS() {
> @Override
> public boolean hasNext() {
> return false;
> }
> @Override
> public Object next() {
> return null;
> }
> }), Object.class);
> {code}
>  
>  
>  





[jira] [Commented] (FLINK-12113) User code passing to fromCollection(Iterator, Class) not cleaned

2019-04-08 Thread Guowei Ma (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16813050#comment-16813050
 ] 

Guowei Ma commented on FLINK-12113:
---

Thanks for your explanation, [~vision57].

DataStream.flatMap/DataStream.process clean the closure, but 
StreamExecutionEnvironment does not.

I think we could learn from the DataStream API. [~aljoscha] Could you give some 
advice?
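For illustration, here is a minimal self-contained sketch of the serializability check that closure cleaning ultimately guards: a user function that is not serializable must be rejected (or cleaned) before shipping. This is a hypothetical helper, not Flink's actual ClosureCleaner implementation:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;

// Hypothetical helper: a fromCollection-style entry point could run this check
// (after cleaning) just like the DataStream operators do, instead of failing
// later with InvalidProgramException.
class SerializabilityCheck {
    static boolean isSerializable(Object userFunction) {
        // Try to Java-serialize the function; any failure (e.g. a captured
        // non-serializable enclosing instance) surfaces as an IOException.
        try (ObjectOutputStream oos = new ObjectOutputStream(new ByteArrayOutputStream())) {
            oos.writeObject(userFunction);
            return true;
        } catch (IOException e) {
            return false;
        }
    }
}
```

An anonymous iterator that captures a non-serializable enclosing instance fails this check, which is exactly the failure mode the workaround with `env.clean(...)` avoids.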

> User code passing to fromCollection(Iterator, Class) not cleaned
> 
>
> Key: FLINK-12113
> URL: https://issues.apache.org/jira/browse/FLINK-12113
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Affects Versions: 1.7.2
>Reporter: yankai zhang
>Priority: Major
> Attachments: image-2019-04-07-21-52-37-264.png, 
> image-2019-04-08-23-19-27-359.png
>
>
>  
> {code:java}
> interface IS extends Iterator<Object>, Serializable { }
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.fromCollection(new IS() {
> @Override
> public boolean hasNext() {
> return false;
> }
> @Override
> public Object next() {
> return null;
> }
> }, Object.class);
> {code}
> The code piece above throws an exception:
> {code:java}
> org.apache.flink.api.common.InvalidProgramException: The implementation of 
> the SourceFunction is not serializable. The object probably contains or 
> references non serializable fields.
>   at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:99)
> {code}
> And my workaround is wrapping clean around iterator instance, like this:
>  
> {code:java}
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.fromCollection(env.clean(new IS() {
> @Override
> public boolean hasNext() {
> return false;
> }
> @Override
> public Object next() {
> return null;
> }
> }), Object.class);
> {code}
>  
>  
>  





[jira] [Comment Edited] (FLINK-12113) User code passing to fromCollection(Iterator, Class) not cleaned

2019-04-08 Thread Guowei Ma (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16813050#comment-16813050
 ] 

Guowei Ma edited comment on FLINK-12113 at 4/9/19 6:58 AM:
---

Thanks for your explanation, [~vision57]. I think StreamExecutionEnvironment's 
other functions might have the same problem.

DataStream.flatMap/DataStream.process clean the closure, but 
StreamExecutionEnvironment does not.

I think we could learn from the DataStream API. [~aljoscha] Could you give some 
advice?


was (Author: maguowei):
Thanks for your explanation. [~vision57].

DataStream.flatmap/DataStream.process do the clean but 
StreamExecutionEnvironment does not do the clean.

I think we could learn from DataStream api. [~aljoscha] Could you give some 
advice?

> User code passing to fromCollection(Iterator, Class) not cleaned
> 
>
> Key: FLINK-12113
> URL: https://issues.apache.org/jira/browse/FLINK-12113
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Affects Versions: 1.7.2
>Reporter: yankai zhang
>Priority: Major
> Attachments: image-2019-04-07-21-52-37-264.png, 
> image-2019-04-08-23-19-27-359.png
>
>
>  
> {code:java}
> interface IS extends Iterator<Object>, Serializable { }
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.fromCollection(new IS() {
> @Override
> public boolean hasNext() {
> return false;
> }
> @Override
> public Object next() {
> return null;
> }
> }, Object.class);
> {code}
> The code piece above throws an exception:
> {code:java}
> org.apache.flink.api.common.InvalidProgramException: The implementation of 
> the SourceFunction is not serializable. The object probably contains or 
> references non serializable fields.
>   at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:99)
> {code}
> And my workaround is wrapping clean around iterator instance, like this:
>  
> {code:java}
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.fromCollection(env.clean(new IS() {
> @Override
> public boolean hasNext() {
> return false;
> }
> @Override
> public Object next() {
> return null;
> }
> }), Object.class);
> {code}
>  
>  
>  





[jira] [Commented] (FLINK-12113) User code passing to fromCollection(Iterator, Class) not cleaned

2019-04-11 Thread Guowei Ma (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16815286#comment-16815286
 ] 

Guowei Ma commented on FLINK-12113:
---

[~vision57] do you want to fix this? If you don't have time, I think I could fix 
it. What do you think?

> User code passing to fromCollection(Iterator, Class) not cleaned
> 
>
> Key: FLINK-12113
> URL: https://issues.apache.org/jira/browse/FLINK-12113
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Affects Versions: 1.7.2
>Reporter: yankai zhang
>Priority: Major
> Attachments: image-2019-04-07-21-52-37-264.png, 
> image-2019-04-08-23-19-27-359.png
>
>
>  
> {code:java}
> interface IS extends Iterator<Object>, Serializable { }
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.fromCollection(new IS() {
> @Override
> public boolean hasNext() {
> return false;
> }
> @Override
> public Object next() {
> return null;
> }
> }, Object.class);
> {code}
> The code piece above throws an exception:
> {code:java}
> org.apache.flink.api.common.InvalidProgramException: The implementation of 
> the SourceFunction is not serializable. The object probably contains or 
> references non serializable fields.
>   at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:99)
> {code}
> And my workaround is wrapping clean around iterator instance, like this:
>  
> {code:java}
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.fromCollection(env.clean(new IS() {
> @Override
> public boolean hasNext() {
> return false;
> }
> @Override
> public Object next() {
> return null;
> }
> }), Object.class);
> {code}
>  
>  
>  





[jira] [Assigned] (FLINK-12113) User code passing to fromCollection(Iterator, Class) not cleaned

2019-04-15 Thread Guowei Ma (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12113?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guowei Ma reassigned FLINK-12113:
-

Assignee: Guowei Ma

> User code passing to fromCollection(Iterator, Class) not cleaned
> 
>
> Key: FLINK-12113
> URL: https://issues.apache.org/jira/browse/FLINK-12113
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Affects Versions: 1.7.2
>Reporter: yankai zhang
>Assignee: Guowei Ma
>Priority: Major
> Attachments: image-2019-04-07-21-52-37-264.png, 
> image-2019-04-08-23-19-27-359.png
>
>
>  
> {code:java}
> interface IS extends Iterator<Object>, Serializable { }
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.fromCollection(new IS() {
> @Override
> public boolean hasNext() {
> return false;
> }
> @Override
> public Object next() {
> return null;
> }
> }, Object.class);
> {code}
> The code piece above throws an exception:
> {code:java}
> org.apache.flink.api.common.InvalidProgramException: The implementation of 
> the SourceFunction is not serializable. The object probably contains or 
> references non serializable fields.
>   at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:99)
> {code}
> And my workaround is wrapping clean around iterator instance, like this:
>  
> {code:java}
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.fromCollection(env.clean(new IS() {
> @Override
> public boolean hasNext() {
> return false;
> }
> @Override
> public Object next() {
> return null;
> }
> }), Object.class);
> {code}
>  
>  
>  





[jira] [Created] (FLINK-17038) Decouple resolving Type and creating TypeInformation process

2020-04-07 Thread Guowei Ma (Jira)
Guowei Ma created FLINK-17038:
-

 Summary: Decouple resolving Type and creating TypeInformation 
process
 Key: FLINK-17038
 URL: https://issues.apache.org/jira/browse/FLINK-17038
 Project: Flink
  Issue Type: Improvement
  Components: API / DataStream
Affects Versions: 1.10.0
Reporter: Guowei Ma






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-17038) Decouple resolving Type and creating TypeInformation process

2020-04-07 Thread Guowei Ma (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-17038?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guowei Ma updated FLINK-17038:
--
Parent: FLINK-15674
Issue Type: Sub-task  (was: Improvement)

> Decouple resolving Type and creating TypeInformation process
> 
>
> Key: FLINK-17038
> URL: https://issues.apache.org/jira/browse/FLINK-17038
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / DataStream
>Affects Versions: 1.10.0
>Reporter: Guowei Ma
>Priority: Critical
>






[jira] [Created] (FLINK-17039) Introduce TypeInformationExtractor interface

2020-04-07 Thread Guowei Ma (Jira)
Guowei Ma created FLINK-17039:
-

 Summary: Introduce TypeInformationExtractor interface
 Key: FLINK-17039
 URL: https://issues.apache.org/jira/browse/FLINK-17039
 Project: Flink
  Issue Type: Sub-task
  Components: API / DataStream
Reporter: Guowei Ma








[jira] [Updated] (FLINK-17038) Decouple resolving Type from creating TypeInformation process

2020-04-07 Thread Guowei Ma (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-17038?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guowei Ma updated FLINK-17038:
--
Summary: Decouple resolving Type from creating TypeInformation process  
(was: Decouple resolving Type and creating TypeInformation process)

> Decouple resolving Type from creating TypeInformation process
> -
>
> Key: FLINK-17038
> URL: https://issues.apache.org/jira/browse/FLINK-17038
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / DataStream
>Affects Versions: 1.10.0
>Reporter: Guowei Ma
>Priority: Critical
>






[jira] [Updated] (FLINK-17039) Introduce TypeInformationExtractor

2020-04-07 Thread Guowei Ma (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-17039?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guowei Ma updated FLINK-17039:
--
Summary: Introduce TypeInformationExtractor  (was: Introduce 
TypeInformationExtractor interface)

> Introduce TypeInformationExtractor
> --
>
> Key: FLINK-17039
> URL: https://issues.apache.org/jira/browse/FLINK-17039
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / DataStream
>Reporter: Guowei Ma
>Priority: Critical
>






[jira] [Created] (FLINK-17041) Migrate current TypeInformation creation to the TypeInformationExtractor framework

2020-04-07 Thread Guowei Ma (Jira)
Guowei Ma created FLINK-17041:
-

 Summary: Migrate current TypeInformation creation to the 
TypeInformationExtractor framework
 Key: FLINK-17041
 URL: https://issues.apache.org/jira/browse/FLINK-17041
 Project: Flink
  Issue Type: Sub-task
Reporter: Guowei Ma








[jira] [Updated] (FLINK-17039) Introduce TypeInformationExtractor framework

2020-04-07 Thread Guowei Ma (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-17039?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guowei Ma updated FLINK-17039:
--
Summary: Introduce TypeInformationExtractor framework  (was: Introduce 
TypeInformationExtractor)

> Introduce TypeInformationExtractor framework
> ---
>
> Key: FLINK-17039
> URL: https://issues.apache.org/jira/browse/FLINK-17039
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / DataStream
>Reporter: Guowei Ma
>Priority: Critical
>






[jira] [Commented] (FLINK-15032) Remove the eager serialization from `RemoteRpcInvocation`

2020-01-06 Thread Guowei Ma (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15032?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17008617#comment-17008617
 ] 

Guowei Ma commented on FLINK-15032:
---

Thanks [~trohrmann] and [~SleePy] for your concerns. As [~SleePy] said, there is 
no good way to check the size of the payload without serializing it. To reduce 
memory pressure we could compute the payload size without keeping the 
serialization result, but that is only useful in specific scenarios (for 
example, large parallelism with a limited network). So I will close this issue 
now.
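The thread closed this as Won't Fix, but for illustration here is a minimal self-contained sketch of the lazy-serialization idea the issue proposed: keep the live arguments and serialize only when the transport actually sends the message. All names are hypothetical, not Flink code:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical message payload: instead of serializing eagerly in the
// constructor (holding both the live objects and the bytes in memory), the
// bytes are produced on demand and the live references are released.
class LazyRpcPayload implements Serializable {
    private transient Object[] args;   // live arguments, dropped once serialized
    private byte[] serializedArgs;     // produced on demand

    LazyRpcPayload(Object... args) {
        this.args = args;
    }

    // Called by the transport just before the message goes on the wire.
    byte[] serializeOnSend() {
        if (serializedArgs == null) {
            try {
                ByteArrayOutputStream bos = new ByteArrayOutputStream();
                try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
                    oos.writeObject(args);
                }
                serializedArgs = bos.toByteArray();
                args = null; // release the live references early
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
        return serializedArgs;
    }

    boolean isSerialized() {
        return serializedArgs != null;
    }
}
```

The trade-off discussed in the comment above is that without eager serialization there is no cheap way to check the payload size up front.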

> Remove the eager serialization from `RemoteRpcInvocation` 
> --
>
> Key: FLINK-15032
> URL: https://issues.apache.org/jira/browse/FLINK-15032
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Reporter: Guowei Ma
>Priority: Major
>
>     Currently, the constructor of `RemoteRpcInvocation` serializes the 
> `parameterTypes` and `arg` of an RPC call. This can lead to a problem:
> Consider a job with 1k parallelism and a 1 MB union list state. When 
> deploying the 1k tasks, the eager serialization would use 1 GB of memory 
> instantly (and serialization can sometimes amplify the memory usage). However, 
> the serialized object is only used when Akka sends the message.  So we 
> could reduce the memory pressure by serializing the object only when the 
> message is actually sent by Akka.
> Akka serializes the message last anyway, and all the XXXGateway-related 
> classes are visible at the RPC level. Because of that, I think the eager 
> serialization in the constructor of `RemoteRpcInvocation` can be avoided. I 
> also ran a simple test and found that this could reduce the time cost of the 
> RPC call: for 1k RPC calls with a 1 MB `String` message, the current version 
> takes around 2700 ms, while the non-serializing version takes about 37 ms.
>  
> In summary, this Jira proposes to remove the eager serialization from the 
> constructor of `RemoteRpcInvocation`.





[jira] [Closed] (FLINK-15032) Remove the eager serialization from `RemoteRpcInvocation`

2020-01-06 Thread Guowei Ma (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15032?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guowei Ma closed FLINK-15032.
-
Resolution: Won't Fix

> Remove the eager serialization from `RemoteRpcInvocation` 
> --
>
> Key: FLINK-15032
> URL: https://issues.apache.org/jira/browse/FLINK-15032
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Reporter: Guowei Ma
>Priority: Major
>
>     Currently, the constructor of `RemoteRpcInvocation` serializes the 
> `parameterTypes` and `arg` of an RPC call. This could lead to a problem:
> Consider a job that has 1k parallelism and a 1m union list state. When 
> deploying the 1k tasks, the eager serialization would use 1G of memory 
> instantly (sometimes the serialization amplifies the memory usage). However, 
> the serialized object is only used when Akka sends the message. So we could 
> reduce the memory pressure if we only serialized the object when the message 
> is about to be sent by Akka.
> Akka serializes the message last anyway, and all the XXXGateway-related 
> classes are visible at the RPC level. Because of that, I think the eager 
> serialization in the constructor of `RemoteRpcInvocation` could be avoided. I 
> also did a simple test and found this could reduce the time cost of the RPC 
> call. For 1k RPC calls with a 1m `String` message: the current version costs 
> around 2700ms; the non-serialization version costs about 37ms.
>  
> In summary, this Jira proposes to remove the eager serialization in the 
> constructor of `RemoteRpcInvocation`.
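
The proposed change can be sketched as follows. All names here are hypothetical, not Flink's actual classes: the idea is simply to keep plain references in the invocation object and let the transport's serialization (simulated below with plain Java serialization standing in for Akka) do the work only at send time.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical sketch: no serialization happens in the constructor; the args
// are only serialized when the whole message is written to the wire.
class LazyInvocation implements Serializable {
    final String methodName;
    final Object[] args; // plain references until send time

    LazyInvocation(String methodName, Object... args) {
        this.methodName = methodName; // no ObjectOutputStream work here
        this.args = args;
    }

    // Serialize the invocation the way a transport would at send time.
    static byte[] serializeAtSendTime(LazyInvocation inv) {
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
                oos.writeObject(inv);
            }
            return bos.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}

// An argument that counts how often it is serialized, to show the laziness.
class TrackedArg implements Serializable {
    static int serializations = 0;

    private void writeObject(ObjectOutputStream out) throws IOException {
        serializations++;
        out.defaultWriteObject();
    }
}
```

Constructing the invocation costs nothing serialization-wise; only the send step pays the cost, which is the memory-pressure argument made in the description.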





[jira] [Commented] (FLINK-2646) User functions should be able to differentiate between successful close and erroneous close

2020-01-20 Thread Guowei Ma (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-2646?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17019866#comment-17019866
 ] 

Guowei Ma commented on FLINK-2646:
--

I want to understand what specific scenarios this Jira wants to resolve. After 
reading some mails/docs[1][2] and recalling some scenarios I encountered, I 
summarize it as below (correct me if I miss something):

Currently, the scenarios that the _close_ interface could not satisfy:
 # The user wants to accelerate the failover process. Currently, some users 
implement the _close_ interface to flush in-memory data to the external system, 
because the job sometimes deals with a bounded stream. However, this slows down 
the failover process, because when canceling a task Flink also calls the close 
method, which might do some heavy I/O processing.
 # The user wants exactly-once semantics for a bounded stream. If the user 
implements the _close_ interface to commit the results, some results would be 
committed multiple times, because some messages are replayed when a failover 
occurs. If the user implements the _close_ interface without committing the 
results, some results would be lost.

Because many users implement the _close_ interface to release resources, we 
could not break the semantics that whenever a task is terminated the _close_ 
method is always called.

If Flink provides an interface such as `_closeAtEndofStream_`, I think we could 
resolve the second problem in most situations. However, this also needs some 
other efforts, such as deduplicating the commit in _close_ or using the 
_finalizeOnMaster_ callback.

 

[1] 
https://lists.apache.org/thread.html/4cf28a9fa3732dfdd9e673da6233c5288ca80b20d58cee130bf1c141%40%3Cuser.flink.apache.org%3E

[2] 
https://docs.google.com/document/d/1SXfhmeiJfWqi2ITYgCgAoSDUv5PNq1T8Zu01nR5Ebog/edit#
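
The "dedupe the commit" idea mentioned above can be sketched with a deterministic commit id, so that a commit replayed after failover takes effect at most once. This is an illustrative stand-in only, not Flink code; the `IdempotentCommitter` name and its in-memory set are assumptions.

```java
import java.util.HashSet;
import java.util.Set;

// Illustrative sketch: committing under a deterministic transaction id makes
// a replayed commit after failover a no-op, which is one way to deduplicate
// the commit performed in close().
class IdempotentCommitter {
    // Stands in for state the external system keeps (e.g. a committed-txn table).
    private final Set<String> committed = new HashSet<>();

    /** Returns true if this call actually committed, false if it was a duplicate. */
    boolean commit(String txnId) {
        return committed.add(txnId);
    }
}
```

With ids derived from the task and checkpoint (rather than random values), a restarted task that re-runs its commit path cannot double-commit.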

> User functions should be able to differentiate between successful close and 
> erroneous close
> ---
>
> Key: FLINK-2646
> URL: https://issues.apache.org/jira/browse/FLINK-2646
> Project: Flink
>  Issue Type: Improvement
>  Components: API / DataStream
>Affects Versions: 0.10.0
>Reporter: Stephan Ewen
>Assignee: Kostas Kloudas
>Priority: Major
>  Labels: usability
>
> Right now, the {{close()}} method of rich functions is invoked in case of 
> proper completion, and in case of canceling in case of error (to allow for 
> cleanup).
> In certain cases, the user function needs to know why it is closed, whether 
> the task completed in a regular fashion, or was canceled/failed.
> I suggest adding a method {{closeAfterFailure()}} to the {{RichFunction}}. By 
> default, this method calls {{close()}}. The runtime is then changed to call 
> {{close()}} as part of the regular execution and {{closeAfterFailure()}} in 
> case of an irregular exit.
> Because by default all cases call {{close()}} the change would not be API 
> breaking.
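
The proposal in the description can be sketched as follows. This is a minimal stand-in (the `SketchRichFunction` and sink names are hypothetical, not Flink's actual API): the default `closeAfterFailure()` delegates to `close()`, so existing functions are unaffected, while a function that must not commit on failure can override it.

```java
// Names are hypothetical; this mirrors the suggestion in the description above.
abstract class SketchRichFunction {
    public void close() {}

    // Default delegates to close(), so existing functions that do not care why
    // they are shutting down keep today's behavior -- no API break.
    public void closeAfterFailure() {
        close();
    }
}

class FlushingSink extends SketchRichFunction {
    boolean flushed = false;

    // Regular end of stream: flush buffered results.
    @Override
    public void close() {
        flushed = true;
    }
    // closeAfterFailure() not overridden: cleanup on failure still flushes.
}

class CommittingSink extends SketchRichFunction {
    String outcome = "running";

    @Override
    public void close() {
        outcome = "committed"; // commit only on regular completion
    }

    @Override
    public void closeAfterFailure() {
        outcome = "aborted"; // on failure/cancel, skip the commit
    }
}
```

The key property is backwards compatibility: only functions that override the new method observe any difference.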





[jira] [Commented] (FLINK-15670) Provide a Kafka Source/Sink pair that aligns Kafka's Partitions and Flink's KeyGroups

2020-01-20 Thread Guowei Ma (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15670?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17019887#comment-17019887
 ] 

Guowei Ma commented on FLINK-15670:
---

>>>I am wondering how do we expose this feature to the users?

Maybe we could do something automatically when generating the StreamGraph or 
JobGraph.

For example, if `_blockingConnectionsBetweenChains_` is true, we could 
automatically insert a Kafka source and Kafka sink between the two shuffle 
operators. The topic could use the operator's uid.

> Provide a Kafka Source/Sink pair that aligns Kafka's Partitions and Flink's 
> KeyGroups
> -
>
> Key: FLINK-15670
> URL: https://issues.apache.org/jira/browse/FLINK-15670
> Project: Flink
>  Issue Type: New Feature
>  Components: API / DataStream, Connectors / Kafka
>Reporter: Stephan Ewen
>Priority: Major
>  Labels: usability
> Fix For: 1.11.0
>
>
> This Source/Sink pair would serve two purposes:
> 1. You can read topics that are already partitioned by key and process them 
> without partitioning them again (avoid shuffles)
> 2. You can use this to shuffle through Kafka, thereby decomposing the job 
> into smaller jobs and independent pipelined regions that fail over 
> independently.
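
Purpose 1 relies on the producer and Flink agreeing on key placement. A rough sketch of the required alignment, under the assumption that the topic's partition count equals the job's maxParallelism; the hash below is a simplified stand-in (Flink's real `KeyGroupRangeAssignment` additionally applies a murmur hash to `key.hashCode()` before taking the modulo):

```java
// Sketch of the alignment idea: if the Kafka producer picks the partition with
// the same function Flink uses to pick a key group, each subtask reads only
// keys it would own anyway, so the shuffle can be skipped.
class KeyGroupAlignment {
    // Stand-in for Flink's key-group assignment (real code murmur-hashes first).
    static int keyGroup(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    // Flink's mapping of a key group to an operator subtask index.
    static int operatorIndex(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    // Kafka side: choose the partition exactly like the key group.
    // Requires numPartitions == maxParallelism for the alignment to hold.
    static int kafkaPartition(Object key, int numPartitions) {
        return keyGroup(key, numPartitions);
    }
}
```

Any mismatch between the two functions (or between partition count and maxParallelism) breaks the alignment, which is why a dedicated source/sink pair is needed rather than an ordinary Kafka connector.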





[jira] [Commented] (FLINK-2646) User functions should be able to differentiate between successful close and erroneous close

2020-01-24 Thread Guowei Ma (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-2646?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17022873#comment-17022873
 ] 

Guowei Ma commented on FLINK-2646:
--

Thanks for your detailed comments. I think it is very cool to unify the sink 
for bounded and unbounded jobs.

What I understand about the sinks and committing is as follows (correct me if I 
am wrong):
 # EndOfInput (task finish) is very similar to triggering a checkpoint. One 
difference is that this checkpoint is not triggered by the 
_CheckpointCoordinator_. So maybe Flink could notify the UDF to snapshot its 
state when it receives the _EndOfInput_.
 # JobFinish is very similar to checkpoint completion. So maybe Flink could 
also call the UDF's _notifyCheckpointComplete_ when a job finishes.

So the sink does not need to assume that the input is bounded or unbounded. It 
only depends on the checkpoint mechanism to achieve exactly-once on its side.

 

I have some small questions and thoughts. I want to be on the same page with 
you through thinking about these problems.
 # When does Flink notify a task of CheckpointComplete if the job has both 
bounded and unbounded sources? Because the job could not finish, the finished 
tasks of the job could not be notified of _JobFinished_. One option is that 
Flink supports triggering a checkpoint for a job that has finished tasks, and 
notifying those tasks of the completion of the checkpoint.
 # When could a slot be released for another task to use? If I understand 
correctly, all resources (including managed memory) should be released in the 
_dispose_ stage in the new design. So a task could not release any resource, 
even after it reports FINISHED to the JM, if it needs to be notified of 
_JobFinish_. As far as I know, the JM could release a slot when all the tasks 
in it are finished. This might lead to inconsistency. I am not sure whether 
there are specific cases of this, but I think there might be some potential 
risks in theory.
 # Flink needs to guarantee that the JobFinish event is received by all tasks, 
but it might not receive the acknowledgment of the JobFinish event from a task. 
There are two situations. (The drain might face the same issue in some 
situations.)
 ## The JobFinished request/response is lost. Retrying the JobFinished 
notification might resolve this problem.
 ## The task fails while handling the JobFinished event, so Flink could not 
receive the acknowledgment. Flink could use the normal failover strategy and 
restart the task with the state snapshotted at the moment of _endOfInput_. This 
would trigger another round of _endOfInput_ and _JobFinished_. But I think this 
only works for sources that support checkpoints. (If the JM fails over while 
notifying tasks of the JobFinish event, the new JM should notify all the tasks 
of the JobFinished event.)






[jira] [Commented] (FLINK-10819) JobManagerHAProcessFailureRecoveryITCase.testDispatcherProcessFailure is unstable

2020-01-24 Thread Guowei Ma (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-10819?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17023396#comment-17023396
 ] 

Guowei Ma commented on FLINK-10819:
---

Hi, [~trohrmann]

I could replace the Scala version of Deadline with the Flink version of 
Deadline. But I think this is not the root cause, after analyzing the log 
https://api.travis-ci.org/v3/job/508500560/log.txt.

However, I could not reproduce this bug on the 1.9 version, and no user reports 
this situation at the moment. Maybe some change has fixed this bug or decreased 
its probability.

I want to know the usual approach for a bug that cannot be reproduced: close it 
and reopen it when it happens again, or keep it open?

 

thanks

> JobManagerHAProcessFailureRecoveryITCase.testDispatcherProcessFailure is 
> unstable
> -
>
> Key: FLINK-10819
> URL: https://issues.apache.org/jira/browse/FLINK-10819
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination, Tests
>Affects Versions: 1.7.0
>Reporter: sunjincheng
>Assignee: Guowei Ma
>Priority: Critical
>  Labels: test-stability
> Attachments: image-2019-07-19-16-59-10-178.png, 
> image-2019-07-19-17-00-17-194.png, image-2019-07-19-17-01-19-758.png
>
>
> Found the following error in the process of CI:
> Results :
> Tests in error: 
>  JobManagerHAProcessFailureRecoveryITCase.testDispatcherProcessFailure:331 » 
> IllegalArgument
> Tests run: 1463, Failures: 0, Errors: 1, Skipped: 29
> 18:40:55.828 [INFO] 
> 
> 18:40:55.829 [INFO] BUILD FAILURE
> 18:40:55.829 [INFO] 
> 
> 18:40:55.830 [INFO] Total time: 30:19 min
> 18:40:55.830 [INFO] Finished at: 2018-11-07T18:40:55+00:00
> 18:40:56.294 [INFO] Final Memory: 92M/678M
> 18:40:56.294 [INFO] 
> 
> 18:40:56.294 [WARNING] The requested profile "include-kinesis" could not be 
> activated because it does not exist.
> 18:40:56.295 [ERROR] Failed to execute goal 
> org.apache.maven.plugins:maven-surefire-plugin:2.18.1:test 
> (integration-tests) on project flink-tests_2.11: There are test failures.
> 18:40:56.295 [ERROR] 
> 18:40:56.295 [ERROR] Please refer to 
> /home/travis/build/sunjincheng121/flink/flink-tests/target/surefire-reports 
> for the individual test results.
> 18:40:56.295 [ERROR] -> [Help 1]
> 18:40:56.295 [ERROR] 
> 18:40:56.295 [ERROR] To see the full stack trace of the errors, re-run Maven 
> with the -e switch.
> 18:40:56.295 [ERROR] Re-run Maven using the -X switch to enable full debug 
> logging.
> 18:40:56.295 [ERROR] 
> 18:40:56.295 [ERROR] For more information about the errors and possible 
> solutions, please read the following articles:
> 18:40:56.295 [ERROR] [Help 1] 
> http://cwiki.apache.org/confluence/display/MAVEN/MojoFailureException
> MVN exited with EXIT CODE: 1.
> Trying to KILL watchdog (11329).
> ./tools/travis_mvn_watchdog.sh: line 269: 11329 Terminated watchdog
> PRODUCED build artifacts.
> But after the rerun, the error disappeared. 
> Currently,no specific reasons are found, and will continue to pay attention.





[jira] [Commented] (FLINK-2646) User functions should be able to differentiate between successful close and erroneous close

2020-01-26 Thread Guowei Ma (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-2646?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17023777#comment-17023777
 ] 

Guowei Ma commented on FLINK-2646:
--

Thanks [~kkl0u] for your detailed comment on the difference between the CP and 
EOI. That is also what I was thinking. If there is no difference between them 
at all, we might not need to expose the EOI interface to the DataStream user 
for this scenario. For example, the user could finalize the in-progress file in 
the dispose stage. (Sometimes the user cancels the job and still uses the sink 
result.) Of course, this could introduce some cost for the job-canceling 
scenario only, and I think the cost might be the same as in the failover 
scenarios. Do you have any concerns about this? Please enlighten me if I am 
missing something.






[jira] [Closed] (FLINK-10819) JobManagerHAProcessFailureRecoveryITCase.testDispatcherProcessFailure is unstable

2020-01-26 Thread Guowei Ma (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-10819?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guowei Ma closed FLINK-10819.
-

> JobManagerHAProcessFailureRecoveryITCase.testDispatcherProcessFailure is 
> unstable
> -
>
> Key: FLINK-10819
> URL: https://issues.apache.org/jira/browse/FLINK-10819
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination, Tests
>Affects Versions: 1.7.0
>Reporter: sunjincheng
>Assignee: Guowei Ma
>Priority: Critical
>  Labels: pull-request-available, test-stability
> Fix For: 1.10.0
>
> Attachments: image-2019-07-19-16-59-10-178.png, 
> image-2019-07-19-17-00-17-194.png, image-2019-07-19-17-01-19-758.png
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> Found the following error in the process of CI:
> Results :
> Tests in error: 
>  JobManagerHAProcessFailureRecoveryITCase.testDispatcherProcessFailure:331 » 
> IllegalArgument
> Tests run: 1463, Failures: 0, Errors: 1, Skipped: 29
> 18:40:55.828 [INFO] 
> 
> 18:40:55.829 [INFO] BUILD FAILURE
> 18:40:55.829 [INFO] 
> 
> 18:40:55.830 [INFO] Total time: 30:19 min
> 18:40:55.830 [INFO] Finished at: 2018-11-07T18:40:55+00:00
> 18:40:56.294 [INFO] Final Memory: 92M/678M
> 18:40:56.294 [INFO] 
> 
> 18:40:56.294 [WARNING] The requested profile "include-kinesis" could not be 
> activated because it does not exist.
> 18:40:56.295 [ERROR] Failed to execute goal 
> org.apache.maven.plugins:maven-surefire-plugin:2.18.1:test 
> (integration-tests) on project flink-tests_2.11: There are test failures.
> 18:40:56.295 [ERROR] 
> 18:40:56.295 [ERROR] Please refer to 
> /home/travis/build/sunjincheng121/flink/flink-tests/target/surefire-reports 
> for the individual test results.
> 18:40:56.295 [ERROR] -> [Help 1]
> 18:40:56.295 [ERROR] 
> 18:40:56.295 [ERROR] To see the full stack trace of the errors, re-run Maven 
> with the -e switch.
> 18:40:56.295 [ERROR] Re-run Maven using the -X switch to enable full debug 
> logging.
> 18:40:56.295 [ERROR] 
> 18:40:56.295 [ERROR] For more information about the errors and possible 
> solutions, please read the following articles:
> 18:40:56.295 [ERROR] [Help 1] 
> http://cwiki.apache.org/confluence/display/MAVEN/MojoFailureException
> MVN exited with EXIT CODE: 1.
> Trying to KILL watchdog (11329).
> ./tools/travis_mvn_watchdog.sh: line 269: 11329 Terminated watchdog
> PRODUCED build artifacts.
> But after the rerun, the error disappeared. 
> Currently,no specific reasons are found, and will continue to pay attention.





[jira] [Commented] (FLINK-2646) User functions should be able to differentiate between successful close and erroneous close

2020-01-27 Thread Guowei Ma (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-2646?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17024305#comment-17024305
 ] 

Guowei Ma commented on FLINK-2646:
--

Hi, [~kkl0u]

From Stephan's and your replies, I have an assumption: Flink could always 
trigger a checkpoint at the end of input and notify all the UDFs of the 
checkpoint's completion if they need it, in whatever situation.

So:

1. For the stop-with-checkpoint scenario, we could choose another 
implementation that commits the last in-progress file in the dispose stage.

2. Non-2PC and normal UDFs could implement the CheckpointedFunction interface.

Generally, I think the EOI interface is a very clear definition. However, I 
just want to know whether Flink introduces the EOI interface to the DataStream 
API to clarify the concept, or whether there are scenarios that could not be 
satisfied with the current interfaces if the assumption is true.

 






[jira] [Commented] (FLINK-2646) User functions should be able to differentiate between successful close and erroneous close

2020-01-28 Thread Guowei Ma (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-2646?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17024938#comment-17024938
 ] 

Guowei Ma commented on FLINK-2646:
--

Share some thoughts about who should trigger the last checkpoint in the 
bounded stream scenario:

The _CheckpointCoordinator_ could trigger the last checkpoint. However, I 
think letting the task trigger the last checkpoint has some advantages in this 
scenario. Of course, in both options it is the _CheckpointCoordinator_'s 
responsibility to notify the completion of the checkpoint.
 # There would be fewer RPC calls.
 ## The first option (_CheckpointCoordinator_ triggers the last checkpoint):
 ### Task -> JM: task is finished.
 ### JM -> _CheckpointCoordinator_ -> Task: trigger the last checkpoint.
 ### Task -> _CheckpointCoordinator_: acknowledge the last checkpoint.
 ### _CheckpointCoordinator_ -> Task: confirm the completion of the last 
checkpoint.
 ## The second option (the task triggers the last checkpoint):
 ### Task -> _CheckpointCoordinator_: the task acknowledges the last 
checkpoint when it reaches the end of input.
 ### _CheckpointCoordinator_ -> Task: confirm the completion of the last 
checkpoint.
 ### Task -> JM: task is finished.
 ## The second option has at least one fewer RPC call than the first option. 
Actually, I think step a.i is not intuitive: the _CheckpointCoordinator_ sends 
an RPC call to finished tasks. If Flink introduces another state such as 
End_Of_Input to the ExecutionState, there would be yet another RPC call.
 # Releasing the resource is simpler. In the first option, the 
_CheckpointCoordinator_ should notify the _Scheduler_ of the completion of the 
last checkpoint, and then the _Scheduler_ could release the slot properly. The 
second option does not need to do anything special to release the resource 
properly.
 # It is easier to deal with the AM failover scenario. Currently, if a job 
reaches the FINISHED status, its JobGraph is removed from the JobGraphStore, 
so when a new AM is granted leadership it would not re-submit the job. In the 
first option, we should only remove the JobGraph after the 
CheckpointCoordinator confirms that the last-checkpoint notification is done. 
In the second option, we need to do nothing.

In the drain scenario, Flink should trigger the checkpoint from the 
CheckpointCoordinator.
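
The two message sequences above, written out side by side (a reading aid only, not Flink code): option 2 saves one RPC round trip because the task acknowledges the last checkpoint directly instead of being asked for it after reporting FINISHED.

```java
import java.util.List;

// Plain enumeration of the two sequences described in the comment above.
class LastCheckpointOptions {
    // Option 1: the CheckpointCoordinator triggers the last checkpoint.
    static final List<String> COORDINATOR_TRIGGERS = List.of(
            "Task -> JM: task is finished",
            "JM -> CheckpointCoordinator -> Task: trigger last checkpoint",
            "Task -> CheckpointCoordinator: acknowledge last checkpoint",
            "CheckpointCoordinator -> Task: confirm completion");

    // Option 2: the task triggers the last checkpoint at end of input.
    static final List<String> TASK_TRIGGERS = List.of(
            "Task -> CheckpointCoordinator: acknowledge at end of input",
            "CheckpointCoordinator -> Task: confirm completion",
            "Task -> JM: task is finished");
}
```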






[jira] [Comment Edited] (FLINK-2646) User functions should be able to differentiate between successful close and erroneous close

2020-01-28 Thread Guowei Ma (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-2646?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17024938#comment-17024938
 ] 

Guowei Ma edited comment on FLINK-2646 at 1/28/20 9:41 AM:
---

Share some thoughts about who should trigger the last checkpoint in the 
bounded stream scenario:

The _CheckpointCoordinator_ could trigger the last checkpoint. However, I 
think letting the task trigger the last checkpoint has some advantages in this 
scenario. Of course, in both options it is the _CheckpointCoordinator_'s 
responsibility to notify the completion of the checkpoint.
 # There would be fewer RPC calls.
 ## The first option (_CheckpointCoordinator_ triggers the last checkpoint):
 ### Task -> JM: task is finished.
 ### JM -> _CheckpointCoordinator_ -> Task: trigger the last checkpoint.
 ### Task -> _CheckpointCoordinator_: acknowledge the last checkpoint.
 ### _CheckpointCoordinator_ -> Task: confirm the completion of the last 
checkpoint.
 ## The second option (the task triggers the last checkpoint):
 ### Task -> _CheckpointCoordinator_: the task acknowledges the last 
checkpoint when it reaches the end of input.
 ### _CheckpointCoordinator_ -> Task: confirm the completion of the last 
checkpoint.
 ### Task -> JM: task is finished.
 ## The second option has at least one fewer RPC call than the first option. 
Actually, I think step 1.1.1 is not intuitive: the _CheckpointCoordinator_ 
sends an RPC call to finished tasks. If Flink introduces another state such as 
End_Of_Input to the ExecutionState, there would be yet another RPC call.
 # Releasing the resource is simpler. In the first option, the 
_CheckpointCoordinator_ should notify the _Scheduler_ of the completion of the 
last checkpoint, and then the _Scheduler_ could release the slot properly. The 
second option does not need to do anything special to release the resource 
properly.
 # It is easier to deal with the AM failover scenario. Currently, if a job 
reaches the FINISHED status, its JobGraph is removed from the JobGraphStore, 
so when a new AM is granted leadership it would not re-submit the job. In the 
first option, we should only remove the JobGraph after the 
CheckpointCoordinator confirms that the last-checkpoint notification is done. 
In the second option, we need to do nothing.

In the drain scenario, Flink should trigger the checkpoint from the 
CheckpointCoordinator.


was (Author: maguowei):
Share some thoughts about who should trigger the last checkpoint in the bounded 
stream scenario:

The _CheckpointCoordiantor_ could trigger the last checkpoint. However, I think 
that it has some advantages that the task triggers the last checkpoint in this 
scenario. Of course, it is the responsibility that the _CheckpointCoordinator_ 
should notify the completion of the checkpoint in both two options.
 # There would be fewer RPC calls
 ## The first option (_CheckpointCoodinator_ triggers the last checkpoint)
 ### Task->JM: Task is finished
 ### JM -> _CheckpointCoordinator_ -> Task: Trigger the last checkpoint.
 ### Task -> _CheckpointCoordinator_: Acknowledge the last checkpoint.
 ### _CheckpointCoodinator_ -> Task: Confirm the completion of the last 
checkpoint.
 ##  Second Option (Task triggers the last checkpoint)
 ### Task -> _CheckpointCoordinator_: Task acknowledges the last checkpoint 
when it reaches the end of input.
 ### _CheckpointCoordinator_ -> Task: Confirm the completion of the last 
checkpoint
 ### Task -> JM: Task is finished.
 ##  The second option is at least once less RPC call than the first option. 
Actually I think the step of a.i is not intuitive that _CheckpointCoordinator_ 
sends RPC call to the Finished tasks. If Flink introduces another state such as 
End_Of_Input to the ExecutionState, there would another more RPC call.
 #  Release the resource is simpler. The _CheckpointCoodinator_ should notify 
_Scheduler_ of the completion of the last checkpoint then the _Scheduler_ could 
release the slot properly in the first option. The second option does not need 
to do anything special for releasing the resource properly.
 # Easier to deal with the AM failover scenario. Currently, if a job reaches 
the FINISHED status its’ JobGraph would be removed from the JobGraphStore. So 
when a new AM grants the leadership it would not re-summitted the Job. We 
should only remove the JobGraph when the CheckpointCoordinator confirms the 
notification of last-checkpoint is done in the first option. In the second 
option, we could do nothing.

 In the drain scenario, Flink should trigger the checkpoint from the 
CheckpointCoordiantor.


[jira] [Created] (FLINK-15784) Show the Watermark metrics of source node in the web UI.

2020-01-28 Thread Guowei Ma (Jira)
Guowei Ma created FLINK-15784:
-

 Summary: Show the Watermark metrics of source node in the web UI.
 Key: FLINK-15784
 URL: https://issues.apache.org/jira/browse/FLINK-15784
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Web Frontend
Reporter: Guowei Ma


Currently, the web UI only shows the watermark of non-source nodes. This might 
mislead some users into thinking that the source could not “receive” the 
watermark.[1]

So it would be more consistent to show the watermark metrics of all nodes, 
including the source node, in the web UI.

 

[1][https://lists.apache.org/thread.html/r0f565a425253afea67eb2cca8dc1b233b57f87bb71650681aa9b6731%40%3Cuser.flink.apache.org%3E]





[jira] [Updated] (FLINK-15784) Show the Watermark metrics of source node in the web UI.

2020-01-28 Thread Guowei Ma (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15784?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guowei Ma updated FLINK-15784:
--
Labels: usability  (was: )

> Show the Watermark metrics of source node in the web UI.
> 
>
> Key: FLINK-15784
> URL: https://issues.apache.org/jira/browse/FLINK-15784
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Web Frontend
>Reporter: Guowei Ma
>Priority: Minor
>  Labels: usability
>
> Currently, the web UI only shows the watermark of non-source nodes. This 
> might mislead some users into thinking that the source could not “receive” 
> the watermark.[1]
> So it would be more consistent to show the watermark metrics of all nodes, 
> including the source node, in the web UI.
>  
> [1][https://lists.apache.org/thread.html/r0f565a425253afea67eb2cca8dc1b233b57f87bb71650681aa9b6731%40%3Cuser.flink.apache.org%3E]





[jira] [Created] (FLINK-15786) Load connector code with separate classloader

2020-01-28 Thread Guowei Ma (Jira)
Guowei Ma created FLINK-15786:
-

 Summary: Load connector code with separate classloader
 Key: FLINK-15786
 URL: https://issues.apache.org/jira/browse/FLINK-15786
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Task
Reporter: Guowei Ma


Currently, connector code can be seen as part of the user code. Usually, users 
only need to add the corresponding connector as a dependency and package it in 
the user jar. This is convenient enough.

However, connectors usually need to interact with external systems and often 
introduce heavy dependencies, so there is a high possibility of class conflicts 
between different connectors, or with the user code of the same job. For 
example, every one or two weeks we receive issue reports related to connector 
class conflicts from our users. The problem gets worse when users want to 
analyze data from different sources and write output to different sinks.

Using a separate classloader to load each connector's code could resolve the 
problem.
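
A minimal sketch of the mechanism, assuming a child-first delegation model similar to what Flink already uses for user code; the class name here is illustrative, not Flink's actual implementation. A production version would also force `java.*` (and Flink's own API packages) to always come from the parent:

```java
import java.net.URL;
import java.net.URLClassLoader;

// Illustrative child-first classloader: each connector jar gets its own
// instance, so two connectors can ship conflicting versions of a dependency.
class ChildFirstClassLoader extends URLClassLoader {
    ChildFirstClassLoader(URL[] connectorJars, ClassLoader parent) {
        super(connectorJars, parent);
    }

    @Override
    protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
        synchronized (getClassLoadingLock(name)) {
            Class<?> c = findLoadedClass(name);
            if (c == null) {
                try {
                    c = findClass(name); // child first: look in the connector jars
                } catch (ClassNotFoundException e) {
                    c = super.loadClass(name, resolve); // fall back to the parent
                }
            }
            if (resolve) {
                resolveClass(c);
            }
            return c;
        }
    }

    // Convenience factory for a loader with no connector jars attached.
    static ChildFirstClassLoader emptyLoader() {
        return new ChildFirstClassLoader(new URL[0],
                ChildFirstClassLoader.class.getClassLoader());
    }

    // Helper that converts the checked exception for demonstration purposes.
    static Class<?> loadOrFail(ChildFirstClassLoader cl, String name) {
        try {
            return cl.loadClass(name);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

Classes not found in the connector jars (JDK and framework classes) still resolve through the parent, so the connector sees exactly one version of each class: its own where it bundles one, the shared one otherwise.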





[jira] [Updated] (FLINK-15786) Load connector code with separate classloader

2020-01-28 Thread Guowei Ma (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15786?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guowei Ma updated FLINK-15786:
--
Labels: usability  (was: )

> Load connector code with separate classloader
> -
>
> Key: FLINK-15786
> URL: https://issues.apache.org/jira/browse/FLINK-15786
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Task
>Reporter: Guowei Ma
>Priority: Major
>  Labels: usability
>
> Currently, connector code can be seen as part of the user code. Usually, 
> users only need to add the corresponding connector as a dependency and 
> package it in the user jar. This is convenient enough.
> However, connectors usually need to interact with external systems and often 
> introduce heavy dependencies, so there is a high possibility of class 
> conflicts between different connectors, or with the user code of the same 
> job. For example, every one or two weeks we receive issue reports related to 
> connector class conflicts from our users. The problem gets worse when users 
> want to analyze data from different sources and write output to different 
> sinks.
> Using a separate classloader to load each connector's code could resolve the 
> problem.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-17584) disableAutoTypeRegistration option does not work with Streaming API, only with Batch

2020-05-14 Thread Guowei Ma (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17584?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17107882#comment-17107882
 ] 

Guowei Ma commented on FLINK-17584:
---

Thanks for opening this issue.

1. I think autoTypeRegistrationEnabled is only used by the DataSet API to 
register the output types of all operators with Kryo. DataStream, however, uses 
the TypeExtractor to create the serializers for its outputs and state, with 
Kryo only as a fallback. So I think it is reasonable that the DataStream API 
does not handle autoTypeRegistrationEnabled.

2. 
>[https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java#L90]

This would not register all types. The DataStream type extraction stack does 
not call `registerType` at all.

3. Maybe we could improve the comments on autoTypeRegistrationEnabled. But 
once Flink replaces DataSet with DataStream, these issues will be gone.

 

> disableAutoTypeRegistration option does not work with Streaming API, only 
> with Batch
> 
>
> Key: FLINK-17584
> URL: https://issues.apache.org/jira/browse/FLINK-17584
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Affects Versions: 1.10.0
>Reporter: Yaron Shani
>Priority: Minor
>
> Hey,
> There is a feature called disableAutoTypeRegistration which, from my 
> understanding, should disable the automatic registration of classes with 
> Kryo. It seems to work with the Batch API, but I don't see any reference to 
> it in the DataStream code, and it does not work there. Is this by design? If 
> so, I think it would be better to state that clearly. If not, can I suggest 
> a fix? Something like this:
>  
> {code:java}
> @Override
> @PublicEvolving
> public TypeSerializer<T> createSerializer(ExecutionConfig config) {
>    if (config.hasGenericTypesDisabled()) {
>       throw new UnsupportedOperationException(
>          "Generic types have been disabled in the ExecutionConfig and type " +
>          this.typeClass.getName() + " is treated as a generic type.");
>    }
>    if (config.isAutoTypeRegistrationDisabled()) {
>       if (!config.getRegisteredKryoTypes().contains(this.typeClass)) {
>          throw new UnsupportedOperationException(
>             "Auto type registration has been disabled in the ExecutionConfig " +
>             "(disableAutoTypeRegistration) and type " + this.typeClass.getName() +
>             " has not been registered.");
>       }
>    }
>    return new KryoSerializer<>(this.typeClass, config);
> }
> {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-17900) Listener for when Files are committed in StreamingFileSink

2020-05-24 Thread Guowei Ma (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17900?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17115629#comment-17115629
 ] 

Guowei Ma commented on FLINK-17900:
---

Hi, [~maccamlc]
 # Currently, there is no public API that lets you register your own listener.
 # Do you need to be notified every time a file is committed, or only when a 
bucket is updated?

thanks.
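For illustration only, a listener hook could look roughly like the sketch 
below if it were ever added to StreamingFileSink. None of these names 
(FileCommitListener, CommitNotifier, onFileCommitted) exist in Flink; this is 
just one possible shape for the API being discussed.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical callback fired when a pending file becomes a finished file.
interface FileCommitListener {
    void onFileCommitted(String bucketId, String filePath);
}

// Hypothetical helper the sink could use to fan out commit notifications.
class CommitNotifier {
    private final List<FileCommitListener> listeners = new ArrayList<>();

    void register(FileCommitListener listener) {
        listeners.add(listener);
    }

    // The sink would call this once a checkpoint completes and a pending
    // file has been moved to the finished state.
    void fireCommitted(String bucketId, String filePath) {
        for (FileCommitListener listener : listeners) {
            listener.onFileCommitted(bucketId, filePath);
        }
    }
}
```

A user could register a listener that produces a Kafka message per finished 
file, which matches the downstream-notification use case described above.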

> Listener for when Files are committed in StreamingFileSink
> --
>
> Key: FLINK-17900
> URL: https://issues.apache.org/jira/browse/FLINK-17900
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.10.0
>Reporter: Matthew McMahon
>Priority: Major
>
> I have a scenario where, once a file has been committed to S3 using the 
> StreamingFileSink (I am using Java), I should notify some downstream 
> services.
> The idea is to produce a message on a Kafka topic for the files as they are 
> finalized.
> I am currently looking into how this might be possible, and considering using 
> reflection and/or checking the S3 bucket before/after the checkpoint is 
> committed.
> Still trying to find a suitable way.
> However I was thinking it would be great if possible to register a listener 
> that can be fired when StreamingFileSink commits a file. 
> Does something like this exist, that I'm not aware of (new to flink) or could 
> it be added?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-13993) Using FlinkUserCodeClassLoaders to load the user class in the perjob mode

2019-12-08 Thread Guowei Ma (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-13993?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16991190#comment-16991190
 ] 

Guowei Ma commented on FLINK-13993:
---

[~tison] Sorry for the late reply. I think your method could work, but the 
benefit of my method is that the directory structure of the TM and JM stays 
almost the same in every mode.

 

[~liyu] yes I would do it.

> Using FlinkUserCodeClassLoaders to load the user class in the perjob mode
> -
>
> Key: FLINK-13993
> URL: https://issues.apache.org/jira/browse/FLINK-13993
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.9.0, 1.10.0
>Reporter: Guowei Ma
>Assignee: Guowei Ma
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.10.0
>
>   Original Estimate: 30h
>  Time Spent: 20m
>  Remaining Estimate: 29h 40m
>
> Currently, Flink has the FlinkUserCodeClassLoader, which is used to load the 
> user's classes. However, in per-job mode the user classes and the system 
> classes are all loaded by the system classloader, which introduces conflicts.
> This document[1] proposes making the FlinkUserCodeClassLoader load the user 
> classes in per-job mode (discussed with Till[2]).
>  
> [1][https://docs.google.com/document/d/1fH2Cwrrmps5RxxvVuUdeprruvDNabEaIHPyYps28WM8/edit#heading=h.815t5dodlxh7]
> [2] 
> [https://docs.google.com/document/d/1SUhFt1BmsGMLUYVa72SWLbNrrWzunvcjAlEm8iusvq0/edit]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-13993) Using FlinkUserCodeClassLoaders to load the user class in the perjob mode

2019-12-08 Thread Guowei Ma (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-13993?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guowei Ma closed FLINK-13993.
-
Resolution: Fixed

> Using FlinkUserCodeClassLoaders to load the user class in the perjob mode
> -
>
> Key: FLINK-13993
> URL: https://issues.apache.org/jira/browse/FLINK-13993
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.9.0, 1.10.0
>Reporter: Guowei Ma
>Assignee: Guowei Ma
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.10.0
>
>   Original Estimate: 30h
>  Time Spent: 20m
>  Remaining Estimate: 29h 40m
>
> Currently, Flink has the FlinkUserCodeClassLoader, which is used to load the 
> user's classes. However, in per-job mode the user classes and the system 
> classes are all loaded by the system classloader, which introduces conflicts.
> This document[1] proposes making the FlinkUserCodeClassLoader load the user 
> classes in per-job mode (discussed with Till[2]).
>  
> [1][https://docs.google.com/document/d/1fH2Cwrrmps5RxxvVuUdeprruvDNabEaIHPyYps28WM8/edit#heading=h.815t5dodlxh7]
> [2] 
> [https://docs.google.com/document/d/1SUhFt1BmsGMLUYVa72SWLbNrrWzunvcjAlEm8iusvq0/edit]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-13993) Using FlinkUserCodeClassLoaders to load the user class in the perjob mode

2019-12-08 Thread Guowei Ma (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-13993?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16991190#comment-16991190
 ] 

Guowei Ma edited comment on FLINK-13993 at 12/9/19 7:01 AM:


[~tison] Sorry for the late reply. I think your method could work, but the 
benefit of my method is that the directory structure of the TM and JM stays 
almost the same in every mode.

 

[~liyu] yes I would do it.


was (Author: maguowei):
[~tison] Sorry for the late reply.  I think your method could work.  But the 
method leads that the directory structure of TM&JM is almost the same in every 
mode.

 

[~liyu] yes I would do it.

> Using FlinkUserCodeClassLoaders to load the user class in the perjob mode
> -
>
> Key: FLINK-13993
> URL: https://issues.apache.org/jira/browse/FLINK-13993
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.9.0, 1.10.0
>Reporter: Guowei Ma
>Assignee: Guowei Ma
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.10.0
>
>   Original Estimate: 30h
>  Time Spent: 20m
>  Remaining Estimate: 29h 40m
>
> Currently, Flink has the FlinkUserCodeClassLoader, which is used to load the 
> user's classes. However, in per-job mode the user classes and the system 
> classes are all loaded by the system classloader, which introduces conflicts.
> This document[1] proposes making the FlinkUserCodeClassLoader load the user 
> classes in per-job mode (discussed with Till[2]).
>  
> [1][https://docs.google.com/document/d/1fH2Cwrrmps5RxxvVuUdeprruvDNabEaIHPyYps28WM8/edit#heading=h.815t5dodlxh7]
> [2] 
> [https://docs.google.com/document/d/1SUhFt1BmsGMLUYVa72SWLbNrrWzunvcjAlEm8iusvq0/edit]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-16262) Class loader problem with FlinkKafkaProducer.Semantic.EXACTLY_ONCE and usrlib directory

2020-02-26 Thread Guowei Ma (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16262?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17045377#comment-17045377
 ] 

Guowei Ma commented on FLINK-16262:
---

Hi [~jkreileder], could you provide the version of the Kafka connector?

> Class loader problem with FlinkKafkaProducer.Semantic.EXACTLY_ONCE and usrlib 
> directory
> ---
>
> Key: FLINK-16262
> URL: https://issues.apache.org/jira/browse/FLINK-16262
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka, Deployment / Docker
>Affects Versions: 1.10.0
> Environment: openjdk:11-jre with a slightly modified Flink 1.10.0 
> build (nothing changed regarding Kafka and/or class loading).
>Reporter: Jürgen Kreileder
>Priority: Blocker
> Fix For: 1.10.1, 1.11.0
>
>
> We're using Docker images modeled after 
> [https://github.com/apache/flink/blob/master/flink-container/docker/Dockerfile]
>  (using Java 11)
> When I try to switch a Kafka producer from AT_LEAST_ONCE to EXACTLY_ONCE, the 
> taskmanager startup fails with:
> {code:java}
> 2020-02-24 18:25:16.389 INFO  o.a.f.r.t.Task                           Create 
> Case Fixer -> Sink: Findings local-krei04-kba-digitalweb-uc1 (1/1) 
> (72f7764c6f6c614e5355562ed3d27209) switched from RUNNING to FAILED.
> org.apache.kafka.common.config.ConfigException: Invalid value 
> org.apache.kafka.common.serialization.ByteArraySerializer for configuration 
> key.serializer: Class 
> org.apache.kafka.common.serialization.ByteArraySerializer could not be found.
>  at org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:718)
>  at org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:471)
>  at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:464)
>  at 
> org.apache.kafka.common.config.AbstractConfig.(AbstractConfig.java:62)
>  at 
> org.apache.kafka.common.config.AbstractConfig.(AbstractConfig.java:75)
>  at 
> org.apache.kafka.clients.producer.ProducerConfig.(ProducerConfig.java:396)
>  at 
> org.apache.kafka.clients.producer.KafkaProducer.(KafkaProducer.java:326)
>  at 
> org.apache.kafka.clients.producer.KafkaProducer.(KafkaProducer.java:298)
>  at 
> org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaInternalProducer.(FlinkKafkaInternalProducer.java:76)
>  at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.lambda$abortTransactions$2(FlinkKafkaProducer.java:1107)
>  at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(Unknown 
> Source)
>  at java.base/java.util.HashMap$KeySpliterator.forEachRemaining(Unknown 
> Source)
>  at java.base/java.util.stream.AbstractPipeline.copyInto(Unknown Source)
>  at java.base/java.util.stream.ForEachOps$ForEachTask.compute(Unknown Source)
>  at java.base/java.util.concurrent.CountedCompleter.exec(Unknown Source)
>  at java.base/java.util.concurrent.ForkJoinTask.doExec(Unknown Source)
>  at 
> java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(Unknown 
> Source)
>  at java.base/java.util.concurrent.ForkJoinPool.scan(Unknown Source)
>  at java.base/java.util.concurrent.ForkJoinPool.runWorker(Unknown Source)
>  at java.base/java.util.concurrent.ForkJoinWorkerThread.run(Unknown 
> Source){code}
> This looks like a class loading issue: If I copy our JAR to FLINK_LIB_DIR 
> instead of FLINK_USR_LIB_DIR, everything works fine.
> (AT_LEAST_ONCE producers works fine with the JAR in FLINK_USR_LIB_DIR)
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-16262) Class loader problem with FlinkKafkaProducer.Semantic.EXACTLY_ONCE and usrlib directory

2020-02-26 Thread Guowei Ma (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16262?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17045377#comment-17045377
 ] 

Guowei Ma edited comment on FLINK-16262 at 2/26/20 11:30 AM:
-

Hi [~jkreileder], could you provide the version of the Kafka connector and the 
files in the lib and usrlib directories?


was (Author: maguowei):
Hi, [~jkreileder] could you provide the version of Kafka connector?

> Class loader problem with FlinkKafkaProducer.Semantic.EXACTLY_ONCE and usrlib 
> directory
> ---
>
> Key: FLINK-16262
> URL: https://issues.apache.org/jira/browse/FLINK-16262
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka, Deployment / Docker
>Affects Versions: 1.10.0
> Environment: openjdk:11-jre with a slightly modified Flink 1.10.0 
> build (nothing changed regarding Kafka and/or class loading).
>Reporter: Jürgen Kreileder
>Priority: Blocker
> Fix For: 1.10.1, 1.11.0
>
>
> We're using Docker images modeled after 
> [https://github.com/apache/flink/blob/master/flink-container/docker/Dockerfile]
>  (using Java 11)
> When I try to switch a Kafka producer from AT_LEAST_ONCE to EXACTLY_ONCE, the 
> taskmanager startup fails with:
> {code:java}
> 2020-02-24 18:25:16.389 INFO  o.a.f.r.t.Task                           Create 
> Case Fixer -> Sink: Findings local-krei04-kba-digitalweb-uc1 (1/1) 
> (72f7764c6f6c614e5355562ed3d27209) switched from RUNNING to FAILED.
> org.apache.kafka.common.config.ConfigException: Invalid value 
> org.apache.kafka.common.serialization.ByteArraySerializer for configuration 
> key.serializer: Class 
> org.apache.kafka.common.serialization.ByteArraySerializer could not be found.
>  at org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:718)
>  at org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:471)
>  at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:464)
>  at 
> org.apache.kafka.common.config.AbstractConfig.(AbstractConfig.java:62)
>  at 
> org.apache.kafka.common.config.AbstractConfig.(AbstractConfig.java:75)
>  at 
> org.apache.kafka.clients.producer.ProducerConfig.(ProducerConfig.java:396)
>  at 
> org.apache.kafka.clients.producer.KafkaProducer.(KafkaProducer.java:326)
>  at 
> org.apache.kafka.clients.producer.KafkaProducer.(KafkaProducer.java:298)
>  at 
> org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaInternalProducer.(FlinkKafkaInternalProducer.java:76)
>  at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.lambda$abortTransactions$2(FlinkKafkaProducer.java:1107)
>  at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(Unknown 
> Source)
>  at java.base/java.util.HashMap$KeySpliterator.forEachRemaining(Unknown 
> Source)
>  at java.base/java.util.stream.AbstractPipeline.copyInto(Unknown Source)
>  at java.base/java.util.stream.ForEachOps$ForEachTask.compute(Unknown Source)
>  at java.base/java.util.concurrent.CountedCompleter.exec(Unknown Source)
>  at java.base/java.util.concurrent.ForkJoinTask.doExec(Unknown Source)
>  at 
> java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(Unknown 
> Source)
>  at java.base/java.util.concurrent.ForkJoinPool.scan(Unknown Source)
>  at java.base/java.util.concurrent.ForkJoinPool.runWorker(Unknown Source)
>  at java.base/java.util.concurrent.ForkJoinWorkerThread.run(Unknown 
> Source){code}
> This looks like a class loading issue: If I copy our JAR to FLINK_LIB_DIR 
> instead of FLINK_USR_LIB_DIR, everything works fine.
> (AT_LEAST_ONCE producers works fine with the JAR in FLINK_USR_LIB_DIR)
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-16262) Class loader problem with FlinkKafkaProducer.Semantic.EXACTLY_ONCE and usrlib directory

2020-02-26 Thread Guowei Ma (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16262?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17045377#comment-17045377
 ] 

Guowei Ma edited comment on FLINK-16262 at 2/26/20 11:45 AM:
-

Hi [~jkreileder], could you provide the version of the Kafka connector and 
list the files in the lib and usrlib directories in the container?


was (Author: maguowei):
Hi, [~jkreileder] could you provide the version of Kafka connector and the 
files in the lib and usrlib directories?

> Class loader problem with FlinkKafkaProducer.Semantic.EXACTLY_ONCE and usrlib 
> directory
> ---
>
> Key: FLINK-16262
> URL: https://issues.apache.org/jira/browse/FLINK-16262
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka, Deployment / Docker
>Affects Versions: 1.10.0
> Environment: openjdk:11-jre with a slightly modified Flink 1.10.0 
> build (nothing changed regarding Kafka and/or class loading).
>Reporter: Jürgen Kreileder
>Priority: Blocker
> Fix For: 1.10.1, 1.11.0
>
>
> We're using Docker images modeled after 
> [https://github.com/apache/flink/blob/master/flink-container/docker/Dockerfile]
>  (using Java 11)
> When I try to switch a Kafka producer from AT_LEAST_ONCE to EXACTLY_ONCE, the 
> taskmanager startup fails with:
> {code:java}
> 2020-02-24 18:25:16.389 INFO  o.a.f.r.t.Task                           Create 
> Case Fixer -> Sink: Findings local-krei04-kba-digitalweb-uc1 (1/1) 
> (72f7764c6f6c614e5355562ed3d27209) switched from RUNNING to FAILED.
> org.apache.kafka.common.config.ConfigException: Invalid value 
> org.apache.kafka.common.serialization.ByteArraySerializer for configuration 
> key.serializer: Class 
> org.apache.kafka.common.serialization.ByteArraySerializer could not be found.
>  at org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:718)
>  at org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:471)
>  at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:464)
>  at 
> org.apache.kafka.common.config.AbstractConfig.(AbstractConfig.java:62)
>  at 
> org.apache.kafka.common.config.AbstractConfig.(AbstractConfig.java:75)
>  at 
> org.apache.kafka.clients.producer.ProducerConfig.(ProducerConfig.java:396)
>  at 
> org.apache.kafka.clients.producer.KafkaProducer.(KafkaProducer.java:326)
>  at 
> org.apache.kafka.clients.producer.KafkaProducer.(KafkaProducer.java:298)
>  at 
> org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaInternalProducer.(FlinkKafkaInternalProducer.java:76)
>  at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.lambda$abortTransactions$2(FlinkKafkaProducer.java:1107)
>  at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(Unknown 
> Source)
>  at java.base/java.util.HashMap$KeySpliterator.forEachRemaining(Unknown 
> Source)
>  at java.base/java.util.stream.AbstractPipeline.copyInto(Unknown Source)
>  at java.base/java.util.stream.ForEachOps$ForEachTask.compute(Unknown Source)
>  at java.base/java.util.concurrent.CountedCompleter.exec(Unknown Source)
>  at java.base/java.util.concurrent.ForkJoinTask.doExec(Unknown Source)
>  at 
> java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(Unknown 
> Source)
>  at java.base/java.util.concurrent.ForkJoinPool.scan(Unknown Source)
>  at java.base/java.util.concurrent.ForkJoinPool.runWorker(Unknown Source)
>  at java.base/java.util.concurrent.ForkJoinWorkerThread.run(Unknown 
> Source){code}
> This looks like a class loading issue: If I copy our JAR to FLINK_LIB_DIR 
> instead of FLINK_USR_LIB_DIR, everything works fine.
> (AT_LEAST_ONCE producers works fine with the JAR in FLINK_USR_LIB_DIR)
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-16262) Class loader problem with FlinkKafkaProducer.Semantic.EXACTLY_ONCE and usrlib directory

2020-02-26 Thread Guowei Ma (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16262?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17045377#comment-17045377
 ] 

Guowei Ma edited comment on FLINK-16262 at 2/26/20 11:59 AM:
-

Hi [~jkreileder], could you provide more information, such as:
 # The version of the Kafka connector.
 # A listing of the files in the lib and usrlib directories in the container.
 # Whether you changed docker-entrypoint.sh.


was (Author: maguowei):
Hi, [~jkreileder] could you provide the version of Kafka connector and list the 
files in the lib and usrlib directories in the container?

> Class loader problem with FlinkKafkaProducer.Semantic.EXACTLY_ONCE and usrlib 
> directory
> ---
>
> Key: FLINK-16262
> URL: https://issues.apache.org/jira/browse/FLINK-16262
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka, Deployment / Docker
>Affects Versions: 1.10.0
> Environment: openjdk:11-jre with a slightly modified Flink 1.10.0 
> build (nothing changed regarding Kafka and/or class loading).
>Reporter: Jürgen Kreileder
>Priority: Blocker
> Fix For: 1.10.1, 1.11.0
>
>
> We're using Docker images modeled after 
> [https://github.com/apache/flink/blob/master/flink-container/docker/Dockerfile]
>  (using Java 11)
> When I try to switch a Kafka producer from AT_LEAST_ONCE to EXACTLY_ONCE, the 
> taskmanager startup fails with:
> {code:java}
> 2020-02-24 18:25:16.389 INFO  o.a.f.r.t.Task                           Create 
> Case Fixer -> Sink: Findings local-krei04-kba-digitalweb-uc1 (1/1) 
> (72f7764c6f6c614e5355562ed3d27209) switched from RUNNING to FAILED.
> org.apache.kafka.common.config.ConfigException: Invalid value 
> org.apache.kafka.common.serialization.ByteArraySerializer for configuration 
> key.serializer: Class 
> org.apache.kafka.common.serialization.ByteArraySerializer could not be found.
>  at org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:718)
>  at org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:471)
>  at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:464)
>  at 
> org.apache.kafka.common.config.AbstractConfig.(AbstractConfig.java:62)
>  at 
> org.apache.kafka.common.config.AbstractConfig.(AbstractConfig.java:75)
>  at 
> org.apache.kafka.clients.producer.ProducerConfig.(ProducerConfig.java:396)
>  at 
> org.apache.kafka.clients.producer.KafkaProducer.(KafkaProducer.java:326)
>  at 
> org.apache.kafka.clients.producer.KafkaProducer.(KafkaProducer.java:298)
>  at 
> org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaInternalProducer.(FlinkKafkaInternalProducer.java:76)
>  at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.lambda$abortTransactions$2(FlinkKafkaProducer.java:1107)
>  at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(Unknown 
> Source)
>  at java.base/java.util.HashMap$KeySpliterator.forEachRemaining(Unknown 
> Source)
>  at java.base/java.util.stream.AbstractPipeline.copyInto(Unknown Source)
>  at java.base/java.util.stream.ForEachOps$ForEachTask.compute(Unknown Source)
>  at java.base/java.util.concurrent.CountedCompleter.exec(Unknown Source)
>  at java.base/java.util.concurrent.ForkJoinTask.doExec(Unknown Source)
>  at 
> java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(Unknown 
> Source)
>  at java.base/java.util.concurrent.ForkJoinPool.scan(Unknown Source)
>  at java.base/java.util.concurrent.ForkJoinPool.runWorker(Unknown Source)
>  at java.base/java.util.concurrent.ForkJoinWorkerThread.run(Unknown 
> Source){code}
> This looks like a class loading issue: If I copy our JAR to FLINK_LIB_DIR 
> instead of FLINK_USR_LIB_DIR, everything works fine.
> (AT_LEAST_ONCE producers works fine with the JAR in FLINK_USR_LIB_DIR)
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-2336) ArrayIndexOufOBoundsException in TypeExtractor when mapping

2020-02-26 Thread Guowei Ma (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-2336?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17045473#comment-17045473
 ] 

Guowei Ma commented on FLINK-2336:
--

OK, I can follow up on this.

> ArrayIndexOufOBoundsException in TypeExtractor when mapping
> ---
>
> Key: FLINK-2336
> URL: https://issues.apache.org/jira/browse/FLINK-2336
> Project: Flink
>  Issue Type: Bug
>  Components: API / Type Serialization System
>Affects Versions: 0.10.0
>Reporter: William Saar
>Priority: Critical
>  Labels: usability
> Fix For: 1.10.1, 1.11.0
>
>
> The line that causes this is
> DataStream outputStream = insideIterationStream.filter(outputFilter).map(m 
> -> m.outputMessage);
> Problem occurs both when compiled using Javac and Eclipse's JDT compiler (in 
> an environment where simple lambda type tests work)
> Exception in thread "main" java.lang.ArrayIndexOutOfBoundsException: -1
>   at java.util.ArrayList.elementData(Unknown Source)
>   at java.util.ArrayList.get(Unknown Source)
>   at 
> org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoFromInputs(TypeExtractor.java:553)
>   at 
> org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:468)
>   at 
> org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:370)
>   at 
> org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:254)
>   at 
> org.apache.flink.api.java.typeutils.TypeExtractor.getMapReturnTypes(TypeExtractor.java:91)
>   at 
> org.apache.flink.streaming.api.datastream.DataStream.map(DataStream.java:605)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-2336) ArrayIndexOufOBoundsException in TypeExtractor when mapping

2020-02-26 Thread Guowei Ma (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-2336?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17045473#comment-17045473
 ] 

Guowei Ma edited comment on FLINK-2336 at 2/26/20 12:03 PM:


OK, I can follow up on this. [~sewen]


was (Author: maguowei):
OK I could follow this.

> ArrayIndexOufOBoundsException in TypeExtractor when mapping
> ---
>
> Key: FLINK-2336
> URL: https://issues.apache.org/jira/browse/FLINK-2336
> Project: Flink
>  Issue Type: Bug
>  Components: API / Type Serialization System
>Affects Versions: 0.10.0
>Reporter: William Saar
>Priority: Critical
>  Labels: usability
> Fix For: 1.10.1, 1.11.0
>
>
> The line that causes this is
> DataStream outputStream = insideIterationStream.filter(outputFilter).map(m 
> -> m.outputMessage);
> Problem occurs both when compiled using Javac and Eclipse's JDT compiler (in 
> an environment where simple lambda type tests work)
> Exception in thread "main" java.lang.ArrayIndexOutOfBoundsException: -1
>   at java.util.ArrayList.elementData(Unknown Source)
>   at java.util.ArrayList.get(Unknown Source)
>   at 
> org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoFromInputs(TypeExtractor.java:553)
>   at 
> org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:468)
>   at 
> org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:370)
>   at 
> org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:254)
>   at 
> org.apache.flink.api.java.typeutils.TypeExtractor.getMapReturnTypes(TypeExtractor.java:91)
>   at 
> org.apache.flink.streaming.api.datastream.DataStream.map(DataStream.java:605)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-2336) ArrayIndexOufOBoundsException in TypeExtractor when mapping

2020-02-26 Thread Guowei Ma (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-2336?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17045593#comment-17045593
 ] 

Guowei Ma commented on FLINK-2336:
--

The first test throws IndexOutOfBoundsException because, when using the input 
to infer the output type, the type hierarchy list has length 0. In this case, 
null should be returned early (TypeExtractor.java:956).

The next two tests do not throw an exception because the types of those two 
lambdas have been erased, so they take a different branch than the first test.

A simple fix is to add a check at TypeExtractor.java:956 that returns null 
early when inputTypeHierarchy is empty.

Theoretically, the first lambda's type is not erased, so we could infer the 
output from the input. In other words, we could build a custom 
ParameterizedType with rawType = MapFunction.class and actualTypeArguments set 
to the lambda's input and output. However, that requires introducing a custom 
ParameterizedType, and I feel we can wait until the TypeExtractor is 
refactored.

What do you think, [~sewen]?
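The proposed early return can be shown with a minimal stand-in for the 
indexing code. The class and method names below are illustrative, not the 
actual TypeExtractor internals.

```java
import java.util.List;

// Stand-in for the lookup around TypeExtractor.java:956: calling
// get(size() - 1) on an empty hierarchy throws IndexOutOfBoundsException
// (index -1). The proposed fix returns null early so the caller can fall
// back to its other type-inference strategies.
class TypeHierarchyGuard {

    static Object lastTypeOrNull(List<Object> inputTypeHierarchy) {
        if (inputTypeHierarchy.isEmpty()) {
            // proposed early return instead of indexing position -1
            return null;
        }
        return inputTypeHierarchy.get(inputTypeHierarchy.size() - 1);
    }
}
```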

> ArrayIndexOufOBoundsException in TypeExtractor when mapping
> ---
>
> Key: FLINK-2336
> URL: https://issues.apache.org/jira/browse/FLINK-2336
> Project: Flink
>  Issue Type: Bug
>  Components: API / Type Serialization System
>Affects Versions: 0.10.0
>Reporter: William Saar
>Priority: Critical
>  Labels: usability
> Fix For: 1.10.1, 1.11.0
>
>
> The line that causes this is
> DataStream outputStream = insideIterationStream.filter(outputFilter).map(m 
> -> m.outputMessage);
> Problem occurs both when compiled using Javac and Eclipse's JDT compiler (in 
> an environment where simple lambda type tests work)
> Exception in thread "main" java.lang.ArrayIndexOutOfBoundsException: -1
>   at java.util.ArrayList.elementData(Unknown Source)
>   at java.util.ArrayList.get(Unknown Source)
>   at 
> org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoFromInputs(TypeExtractor.java:553)
>   at 
> org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:468)
>   at 
> org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:370)
>   at 
> org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:254)
>   at 
> org.apache.flink.api.java.typeutils.TypeExtractor.getMapReturnTypes(TypeExtractor.java:91)
>   at 
> org.apache.flink.streaming.api.datastream.DataStream.map(DataStream.java:605)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-16262) Class loader problem with FlinkKafkaProducer.Semantic.EXACTLY_ONCE and usrlib directory

2020-02-26 Thread Guowei Ma (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16262?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17045695#comment-17045695
 ] 

Guowei Ma commented on FLINK-16262:
---

1. The direct cause of this problem is that FlinkKafkaProducer uses the 
system classloader instead of the user classloader in _abortTransactions_, 
regardless of the situation.
2. FlinkKafkaProducer, line 1098: _transactionalIds.parallelStream().forEach(.)_. 
A parallel stream runs its tasks on the shared ForkJoinPool, whose worker 
threads have the system classloader as their context classloader.

I think this is a FlinkKafkaProducer bug that should be fixed. I will contact 
the committer tomorrow to double-check.

Thanks for reporting this.
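As a standalone sketch of the kind of fix this needs, the snippet below shows an explicit context-classloader scope that can be wrapped around the offending call, so that worker threads spawned inside it (or the calling thread itself) resolve classes against the intended loader. The class name is made up for illustration; Flink itself ships a similar utility (TemporaryClassLoaderContext), which the real fix would likely use instead:

```java
// Hypothetical helper: temporarily installs a target classloader as the
// current thread's context classloader, restoring the previous one on close.
public final class ClassLoaderScope implements AutoCloseable {
    private final ClassLoader previous;

    public ClassLoaderScope(ClassLoader target) {
        this.previous = Thread.currentThread().getContextClassLoader();
        Thread.currentThread().setContextClassLoader(target);
    }

    @Override
    public void close() {
        // Always restore, even if the guarded code throws.
        Thread.currentThread().setContextClassLoader(previous);
    }
}
```

Usage would be along the lines of `try (ClassLoaderScope ignored = new ClassLoaderScope(userCodeClassLoader)) { abortTransactions(...); }`; note that this alone does not cover tasks that `parallelStream()` schedules onto other ForkJoinPool workers, which is why avoiding the common pool (or iterating sequentially) is also part of the fix.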

> Class loader problem with FlinkKafkaProducer.Semantic.EXACTLY_ONCE and usrlib 
> directory
> ---
>
> Key: FLINK-16262
> URL: https://issues.apache.org/jira/browse/FLINK-16262
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka, Deployment / Docker
>Affects Versions: 1.10.0
> Environment: openjdk:11-jre with a slightly modified Flink 1.10.0 
> build (nothing changed regarding Kafka and/or class loading).
>Reporter: Jürgen Kreileder
>Priority: Blocker
> Fix For: 1.10.1, 1.11.0
>
>
> We're using Docker images modeled after 
> [https://github.com/apache/flink/blob/master/flink-container/docker/Dockerfile]
>  (using Java 11)
> When I try to switch a Kafka producer from AT_LEAST_ONCE to EXACTLY_ONCE, the 
> taskmanager startup fails with:
> {code:java}
> 2020-02-24 18:25:16.389 INFO  o.a.f.r.t.Task                           Create 
> Case Fixer -> Sink: Findings local-krei04-kba-digitalweb-uc1 (1/1) 
> (72f7764c6f6c614e5355562ed3d27209) switched from RUNNING to FAILED.
> org.apache.kafka.common.config.ConfigException: Invalid value 
> org.apache.kafka.common.serialization.ByteArraySerializer for configuration 
> key.serializer: Class 
> org.apache.kafka.common.serialization.ByteArraySerializer could not be found.
>  at org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:718)
>  at org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:471)
>  at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:464)
>  at 
> org.apache.kafka.common.config.AbstractConfig.(AbstractConfig.java:62)
>  at 
> org.apache.kafka.common.config.AbstractConfig.(AbstractConfig.java:75)
>  at 
> org.apache.kafka.clients.producer.ProducerConfig.(ProducerConfig.java:396)
>  at 
> org.apache.kafka.clients.producer.KafkaProducer.(KafkaProducer.java:326)
>  at 
> org.apache.kafka.clients.producer.KafkaProducer.(KafkaProducer.java:298)
>  at 
> org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaInternalProducer.(FlinkKafkaInternalProducer.java:76)
>  at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.lambda$abortTransactions$2(FlinkKafkaProducer.java:1107)
>  at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(Unknown 
> Source)
>  at java.base/java.util.HashMap$KeySpliterator.forEachRemaining(Unknown 
> Source)
>  at java.base/java.util.stream.AbstractPipeline.copyInto(Unknown Source)
>  at java.base/java.util.stream.ForEachOps$ForEachTask.compute(Unknown Source)
>  at java.base/java.util.concurrent.CountedCompleter.exec(Unknown Source)
>  at java.base/java.util.concurrent.ForkJoinTask.doExec(Unknown Source)
>  at 
> java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(Unknown 
> Source)
>  at java.base/java.util.concurrent.ForkJoinPool.scan(Unknown Source)
>  at java.base/java.util.concurrent.ForkJoinPool.runWorker(Unknown Source)
>  at java.base/java.util.concurrent.ForkJoinWorkerThread.run(Unknown 
> Source){code}
> This looks like a class loading issue: If I copy our JAR to FLINK_LIB_DIR 
> instead of FLINK_USR_LIB_DIR, everything works fine.
> (AT_LEAST_ONCE producers works fine with the JAR in FLINK_USR_LIB_DIR)
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-15786) Load connector code with separate classloader

2020-02-27 Thread Guowei Ma (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15786?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17046423#comment-17046423
 ] 

Guowei Ma commented on FLINK-15786:
---

[~fly_in_gis] Yes. However, there are some differences between FileSystem and 
connectors, such as:
 # A DataStream user can instantiate a connector object directly.
 # The source/sink operators are deserialized on the TM side.
 # A source operator might construct a user-provided class at runtime.

In general, these differences would require changes to the 
FlinkUserCodeClassLoader. For example, FlinkUserCodeClassLoader might need some 
extra connector classloaders that it can use to find the connector classes and 
the classes they depend on.

[~pnowojski] I might be missing some scenarios; could you share your 
concerns? Thank you!

> Load connector code with separate classloader
> -
>
> Key: FLINK-15786
> URL: https://issues.apache.org/jira/browse/FLINK-15786
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Task
>Reporter: Guowei Ma
>Priority: Major
>  Labels: usability
>
> Currently, connector code can be seen as part of user code. Usually, users 
> only need to add the corresponding connector as a dependency and package it 
> in the user jar. This is convenient enough.
> However, connectors usually need to interact with external systems and often 
> introduce heavy dependencies, so there is a high possibility of class 
> conflicts between different connectors, or with the user code of the same 
> job. For example, every week or two we receive issue reports from our users 
> about connector class conflicts. The problem gets worse when users want to 
> analyze data from different sources and write output to different sinks.
> Using a separate classloader to load each connector's code could resolve 
> the problem.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-15786) Load connector code with separate classloader

2020-02-27 Thread Guowei Ma (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15786?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17047229#comment-17047229
 ] 

Guowei Ma commented on FLINK-15786:
---

Thanks [~pnowojski] for your detailed explanation.

I agree with you that we could change the DataStream API to postpone the 
construction of the source/sink operator. This would avoid the user having to 
use the Plugin mechanism directly to create a source/sink object at the 
client side in order to separate the connector classloader from the user code 
classloader.

What I am concerned about is the relationship between the FlinkUserClassLoader 
and the connector classloader. In theory, there might be interdependent 
scenarios such as FlinkKafkaPartitioner/KafkaConnector: FlinkKafkaPartitioner 
is an interface provided by the Kafka connector that the user can implement, 
and the implementing class is passed to the Kafka connector and constructed at 
runtime. One option for this scenario is to provide a classloader that knows 
both the user-implemented FlinkKafkaPartitioner class and the Kafka connector 
classes. We could call it XClassloader for now.

Do you think we need to resolve this scenario? A related open question is 
that we might not need to change (or extend) the DataStream API for separating 
the connector classloader if we choose to use the XClassloader.
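To make the "XClassloader" idea above concrete, here is a minimal sketch of a classloader that first delegates to its parent (standing in for the user code classloader) and then falls back to a list of connector classloaders. The class and parameter names are made up for illustration; this is not Flink API, and a real design would also have to handle resource lookup and the parent-first/child-first ordering questions:

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of a classloader that can see both the user code
// classes (via the parent) and one or more connector classloaders.
public class ComposedConnectorClassLoader extends ClassLoader {
    private final List<ClassLoader> connectorLoaders;

    public ComposedConnectorClassLoader(ClassLoader userCodeLoader,
                                        ClassLoader... connectorLoaders) {
        super(userCodeLoader);
        this.connectorLoaders = Arrays.asList(connectorLoaders);
    }

    @Override
    protected Class<?> findClass(String name) throws ClassNotFoundException {
        // loadClass has already tried the parent (user code classloader)
        // before calling findClass, so only the connector loaders remain.
        for (ClassLoader loader : connectorLoaders) {
            try {
                return loader.loadClass(name);
            } catch (ClassNotFoundException ignored) {
                // Not in this connector's classloader; try the next one.
            }
        }
        throw new ClassNotFoundException(name);
    }
}
```

With something like this, a user-implemented FlinkKafkaPartitioner could be resolved via the parent while the Kafka connector classes come from their own loader, at the cost of reintroducing a shared namespace between the two.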

> Load connector code with separate classloader
> -
>
> Key: FLINK-15786
> URL: https://issues.apache.org/jira/browse/FLINK-15786
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Task
>Reporter: Guowei Ma
>Priority: Major
>  Labels: usability
>
> Currently, connector code can be seen as part of user code. Usually, users 
> only need to add the corresponding connector as a dependency and package it 
> in the user jar. This is convenient enough.
> However, connectors usually need to interact with external systems and often 
> introduce heavy dependencies, so there is a high possibility of class 
> conflicts between different connectors, or with the user code of the same 
> job. For example, every week or two we receive issue reports from our users 
> about connector class conflicts. The problem gets worse when users want to 
> analyze data from different sources and write output to different sinks.
> Using a separate classloader to load each connector's code could resolve 
> the problem.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


  1   2   3   4   5   6   7   8   9   10   >