[GitHub] zhangxinyu1 commented on issue #6770: [FLINK-10002] [Webfrontend] WebUI shows jm/tm logs more friendly.

2018-10-24 Thread GitBox
zhangxinyu1 commented on issue #6770:  [FLINK-10002] [Webfrontend] WebUI shows 
jm/tm logs more friendly.
URL: https://github.com/apache/flink/pull/6770#issuecomment-432905856
 
 
   @ifndef-SleePy Thanks for your suggestions. I have modified the code: I 
removed the legacy part and refactored the implementation.
   Regarding tests, I could not find any existing tests for the log REST API. 
Should I add tests for `JobManagerLogFileHandler`, `JobManagerLogListHandler`, 
`TaskManagerLogListHandler` and `TaskManagerLogFileHandler`?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10002) WebUI shows logs unfriendly, especially when the amount of logs is large

2018-10-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10002?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16663171#comment-16663171
 ] 

ASF GitHub Bot commented on FLINK-10002:


zhangxinyu1 commented on issue #6770:  [FLINK-10002] [Webfrontend] WebUI shows 
jm/tm logs more friendly.
URL: https://github.com/apache/flink/pull/6770#issuecomment-432905856
 
 
   @ifndef-SleePy Thanks for your suggestions. I have modified the code: I 
removed the legacy part and refactored the implementation.
   Regarding tests, I could not find any existing tests for the log REST API. 
Should I add tests for `JobManagerLogFileHandler`, `JobManagerLogListHandler`, 
`TaskManagerLogListHandler` and `TaskManagerLogFileHandler`?




> WebUI shows logs unfriendly, especially when the amount of logs is large
> 
>
> Key: FLINK-10002
> URL: https://issues.apache.org/jira/browse/FLINK-10002
> Project: Flink
>  Issue Type: Improvement
>  Components: Webfrontend
>Reporter: zhangxinyu
>Assignee: zhangxinyu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
> Attachments: image-2018-09-10-11-38-07-973.png
>
>
> When a streaming job runs for a long time, the amount of logs may be very 
> large. The current WebUI shows the entire content of the logs, so it takes 
> much time to download the logs from the task managers, and the browser cannot 
> render them.
> Therefore, I suggest that Flink use DailyRollingAppender to split logs by 
> default, and that the task manager provide an API that returns logs for a 
> given time interval. The WebUI can then display logs by time interval.
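For illustration, a minimal log4j 1.x sketch of such a daily-rolling setup could look as follows (property names are from log4j's DailyRollingFileAppender; the `${log.file}` placeholder for the log path is an assumption and would need to match the actual deployment):

```properties
# Roll the log file daily instead of growing one unbounded file.
log4j.rootLogger=INFO, file
log4j.appender.file=org.apache.log4j.DailyRollingFileAppender
log4j.appender.file.File=${log.file}
# One file per day; completed days keep a ".yyyy-MM-dd" suffix.
log4j.appender.file.DatePattern='.'yyyy-MM-dd
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
```

With logs split per day, a time-interval API only has to serve the files whose date suffixes overlap the requested interval, rather than one huge file.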



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-10621) BootstrapToolsTest.testConcurrentActorSystemCreation hangs forever when running locally

2018-10-24 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10621?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-10621:
---
Labels: pull-request-available  (was: )

> BootstrapToolsTest.testConcurrentActorSystemCreation hangs forever when 
> running locally
> ---
>
> Key: FLINK-10621
> URL: https://issues.apache.org/jira/browse/FLINK-10621
> Project: Flink
>  Issue Type: Bug
> Environment: OS X, 4 cores, Java 1.8.0_181
>Reporter: Alexey Trenikhin
>Assignee: Biao Liu
>Priority: Major
>  Labels: pull-request-available
> Attachments: threaddump-1540049768201.tdump
>
>
> When running locally 
> BootstrapToolsTest.testConcurrentActorSystemCreation(BootstrapToolsTest.java:358)
>  hangs forever (waited for 20 minutes, multiple attempts)
> {noformat}
> "main" #1 prio=5 os_prio=31 tid=0x7facaa801000 nid=0x1703 waiting on 
> condition [0x70218000]
>java.lang.Thread.State: WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for  <0x00074fee2370> (a 
> java.util.concurrent.CompletableFuture$Signaller)
> at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> at 
> java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1693)
> at 
> java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
> at 
> java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1729)
> at 
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
> at 
> org.apache.flink.runtime.clusterframework.BootstrapToolsTest.testConcurrentActorSystemCreation(BootstrapToolsTest.java:358)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
> at org.junit.rules.RunRules.evaluate(RunRules.java:20)
> at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
> at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
> at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
> at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
> at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
> at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
> at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
> at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
> at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
> at 
> org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:283)
> at 
> org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:173)
> at 
> org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:153)
> at 
> org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:128)
> at 
> org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:203)
> at 
> org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:155)
> at 
> org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:103)
>Locked ownable synchronizers:
> - None
> {noformat}





[GitHub] ifndef-SleePy opened a new pull request #6925: [FLINK-10621][runtime] Fix unstable unit test case of BootstrapToolsTest

2018-10-24 Thread GitBox
ifndef-SleePy opened a new pull request #6925: [FLINK-10621][runtime] Fix 
unstable unit test case of BootstrapToolsTest
URL: https://github.com/apache/flink/pull/6925
 
 
   ## What is the purpose of the change
   
   `BootstrapToolsTest.testConcurrentActorSystemCreation` is unstable. We 
forgot to pass the `ExecutorService` to `CompletableFuture.supplyAsync`, so the 
thread count depends on the machine the test runs on. If fewer than 10 threads 
are available, the test hangs forever.
   
   ## Brief change log
   
   * Pass the `ExecutorService` to `CompletableFuture.supplyAsync`
   
   ## Verifying this change
   
   * Verifying by unit test
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable)
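   For context, the difference between the two `supplyAsync` overloads can be 
sketched as follows (a standalone example, not the Flink test itself; the class 
and method names are made up for illustration):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SupplyAsyncDemo {

    // With an explicit executor, the number of worker threads is under our
    // control. The single-argument overload of supplyAsync would instead run
    // on the shared ForkJoinPool.commonPool(), whose size depends on the
    // number of cores of the machine running the test.
    static String threadNameOn(ExecutorService executor) {
        return CompletableFuture
                .supplyAsync(() -> Thread.currentThread().getName(), executor)
                .join();
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(10);
        System.out.println(threadNameOn(pool)); // e.g. "pool-1-thread-1"
        pool.shutdown();
    }
}
```

   If the suppliers block waiting on each other while sharing one pool, a pool 
with fewer threads than mutually waiting tasks deadlocks, which matches the 
hang described above.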
   




[jira] [Commented] (FLINK-10621) BootstrapToolsTest.testConcurrentActorSystemCreation hangs forever when running locally

2018-10-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10621?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16663160#comment-16663160
 ] 

ASF GitHub Bot commented on FLINK-10621:


ifndef-SleePy opened a new pull request #6925: [FLINK-10621][runtime] Fix 
unstable unit test case of BootstrapToolsTest
URL: https://github.com/apache/flink/pull/6925
 
 
   ## What is the purpose of the change
   
   `BootstrapToolsTest.testConcurrentActorSystemCreation` is unstable. We 
forgot to pass the `ExecutorService` to `CompletableFuture.supplyAsync`, so the 
thread count depends on the machine the test runs on. If fewer than 10 threads 
are available, the test hangs forever.
   
   ## Brief change log
   
   * Pass the `ExecutorService` to `CompletableFuture.supplyAsync`
   
   ## Verifying this change
   
   * Verifying by unit test
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable)
   




> BootstrapToolsTest.testConcurrentActorSystemCreation hangs forever when 
> running locally
> ---
>
> Key: FLINK-10621
> URL: https://issues.apache.org/jira/browse/FLINK-10621
> Project: Flink
>  Issue Type: Bug
> Environment: OS X, 4 cores, Java 1.8.0_181
>Reporter: Alexey Trenikhin
>Assignee: Biao Liu
>Priority: Major
>  Labels: pull-request-available
> Attachments: threaddump-1540049768201.tdump
>
>
> When running locally 
> BootstrapToolsTest.testConcurrentActorSystemCreation(BootstrapToolsTest.java:358)
>  hangs forever (waited for 20 minutes, multiple attempts)

[jira] [Assigned] (FLINK-10504) Decide actual parallelism based on available resources

2018-10-24 Thread blues zheng (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10504?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

blues zheng reassigned FLINK-10504:
---

Assignee: blues zheng

> Decide actual parallelism based on available resources
> --
>
> Key: FLINK-10504
> URL: https://issues.apache.org/jira/browse/FLINK-10504
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Affects Versions: 1.7.0
>Reporter: Till Rohrmann
>Assignee: blues zheng
>Priority: Major
> Fix For: 1.7.0
>
>
> Check if a {{JobGraph}} can be scheduled with the available set of resources 
> (slots). If the minimum parallelism is fulfilled, then distribute the 
> available set of slots across all available slot sharing groups in order to 
> decide on the actual runtime parallelism. In the absence of minimum, target 
> and maximum parallelism, assume minimum = target = maximum = parallelism 
> defined in the {{JobGraph}}.
> Ideally, we make the slot assignment strategy pluggable.
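As a rough sketch (hypothetical names, not Flink code), distributing the available slots across the slot sharing groups could look like this:

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class SlotDistributionSketch {

    // Evenly spread the available slots over the slot sharing groups; any
    // leftover slots go to the first groups. Throws if even a parallelism
    // of 1 per group (the minimum) cannot be satisfied.
    static Map<String, Integer> distribute(List<String> groups, int availableSlots) {
        if (availableSlots < groups.size()) {
            throw new IllegalStateException("Not enough slots for minimum parallelism");
        }
        Map<String, Integer> parallelism = new LinkedHashMap<>();
        int base = availableSlots / groups.size();
        int remainder = availableSlots % groups.size();
        for (String group : groups) {
            parallelism.put(group, base + (remainder-- > 0 ? 1 : 0));
        }
        return parallelism;
    }

    public static void main(String[] args) {
        System.out.println(distribute(Arrays.asList("source", "map", "sink"), 8));
        // {source=3, map=3, sink=2}
    }
}
```

A pluggable strategy, as suggested above, would simply swap out the `distribute` logic.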





[jira] [Assigned] (FLINK-9957) Rescale job with respect to available slots

2018-10-24 Thread blues zheng (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9957?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

blues zheng reassigned FLINK-9957:
--

Assignee: blues zheng

> Rescale job with respect to available slots
> ---
>
> Key: FLINK-9957
> URL: https://issues.apache.org/jira/browse/FLINK-9957
> Project: Flink
>  Issue Type: Sub-task
>  Components: JobManager
>Reporter: Till Rohrmann
>Assignee: blues zheng
>Priority: Major
> Fix For: 1.7.0
>
>
> The {{JobMaster}}, which runs in the reactive container mode, needs to react 
> to additionally offered slots in order to make use of newly started 
> {{TaskExecutors}}. This could mean that the {{JobMaster}} tries to rescale the 
> job with respect to the available number of slots after some grace period.





[jira] [Commented] (FLINK-10621) BootstrapToolsTest.testConcurrentActorSystemCreation hangs forever when running locally

2018-10-24 Thread Biao Liu (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10621?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16663146#comment-16663146
 ] 

Biao Liu commented on FLINK-10621:
--

Thank you for reporting, [~alexeyt820].
It's an unstable test case. I will fix it ASAP.

> BootstrapToolsTest.testConcurrentActorSystemCreation hangs forever when 
> running locally
> ---
>
> Key: FLINK-10621
> URL: https://issues.apache.org/jira/browse/FLINK-10621
> Project: Flink
>  Issue Type: Bug
> Environment: OS X, 4 cores, Java 1.8.0_181
>Reporter: Alexey Trenikhin
>Assignee: Biao Liu
>Priority: Major
> Attachments: threaddump-1540049768201.tdump
>
>
> When running locally 
> BootstrapToolsTest.testConcurrentActorSystemCreation(BootstrapToolsTest.java:358)
>  hangs forever (waited for 20 minutes, multiple attempts)





[jira] [Assigned] (FLINK-10507) Set target parallelism to maximum when using the standalone job cluster mode

2018-10-24 Thread vinoyang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10507?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

vinoyang reassigned FLINK-10507:


Assignee: vinoyang

> Set target parallelism to maximum when using the standalone job cluster mode
> 
>
> Key: FLINK-10507
> URL: https://issues.apache.org/jira/browse/FLINK-10507
> Project: Flink
>  Issue Type: Sub-task
>  Components: JobManager
>Affects Versions: 1.7.0
>Reporter: Till Rohrmann
>Assignee: vinoyang
>Priority: Major
> Fix For: 1.7.0
>
>
> In order to enable the reactive container mode, we should set the target 
> value to the maximum parallelism if we run in standalone job cluster mode. 
> That way, we will always use all available resources and scale up if new 
> resources are being added.





[jira] [Assigned] (FLINK-10505) Treat fail signal as scheduling event

2018-10-24 Thread vinoyang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10505?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

vinoyang reassigned FLINK-10505:


Assignee: vinoyang

> Treat fail signal as scheduling event
> -
>
> Key: FLINK-10505
> URL: https://issues.apache.org/jira/browse/FLINK-10505
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Affects Versions: 1.7.0
>Reporter: Till Rohrmann
>Assignee: vinoyang
>Priority: Major
> Fix For: 1.7.0
>
>
> Instead of simply calling into the {{RestartStrategy}}, which restarts the 
> existing {{ExecutionGraph}} with the same parallelism, the 
> {{ExecutionGraphDriver}} should treat a recovery like the initial 
> scheduling operation. First, one needs to decide on the new parallelism of 
> the {{ExecutionGraph}} (scale up/scale down) with respect to the available 
> set of resources. Only if the minimum configuration is fulfilled will the 
> potentially rescaled {{ExecutionGraph}} be restarted.





[jira] [Assigned] (FLINK-10621) BootstrapToolsTest.testConcurrentActorSystemCreation hangs forever when running locally

2018-10-24 Thread Biao Liu (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10621?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Biao Liu reassigned FLINK-10621:


Assignee: Biao Liu

> BootstrapToolsTest.testConcurrentActorSystemCreation hangs forever when 
> running locally
> ---
>
> Key: FLINK-10621
> URL: https://issues.apache.org/jira/browse/FLINK-10621
> Project: Flink
>  Issue Type: Bug
> Environment: OS X, 4 cores, Java 1.8.0_181
>Reporter: Alexey Trenikhin
>Assignee: Biao Liu
>Priority: Major
> Attachments: threaddump-1540049768201.tdump
>
>
> When running locally 
> BootstrapToolsTest.testConcurrentActorSystemCreation(BootstrapToolsTest.java:358)
>  hangs forever (waited for 20 minutes, multiple attempts)





[jira] [Assigned] (FLINK-10503) Periodically check for new resources

2018-10-24 Thread Shimin Yang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10503?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shimin Yang reassigned FLINK-10503:
---

Assignee: Shimin Yang

> Periodically check for new resources
> 
>
> Key: FLINK-10503
> URL: https://issues.apache.org/jira/browse/FLINK-10503
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Affects Versions: 1.7.0
>Reporter: Till Rohrmann
>Assignee: Shimin Yang
>Priority: Major
> Fix For: 1.7.0
>
>
> In order to decide when to start scheduling or to rescale, we need to 
> periodically check for new resources (slots).





[jira] [Assigned] (FLINK-10501) Obtain resource overview of cluster

2018-10-24 Thread Shimin Yang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10501?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shimin Yang reassigned FLINK-10501:
---

Assignee: Shimin Yang

> Obtain resource overview of cluster
> ---
>
> Key: FLINK-10501
> URL: https://issues.apache.org/jira/browse/FLINK-10501
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Affects Versions: 1.7.0
>Reporter: Till Rohrmann
>Assignee: Shimin Yang
>Priority: Major
> Fix For: 1.7.0
>
>
> In order to decide with which parallelism to run, the 
> {{ExecutionGraphDriver}} needs to obtain an overview of all available 
> resources. This includes the resources managed by the {{SlotPool}} as well as 
> the not yet allocated resources on the {{ResourceManager}}. This is a 
> temporary workaround until we have adapted the slot allocation protocol to 
> support resource declaration. Once this is done, we will only take the 
> {{SlotPool’s}} slots into account.





[jira] [Commented] (FLINK-10656) Refactor org.apache.flink.runtime.io.network.api.reader.ReaderBase

2018-10-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10656?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16663133#comment-16663133
 ] 

ASF GitHub Bot commented on FLINK-10656:


zhijiangW commented on a change in pull request #6911: [FLINK-10656] Refactor 
org.apache.flink.runtime.io.network.api.reader…
URL: https://github.com/apache/flink/pull/6911#discussion_r228020917
 
 

 ##
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/IterationReader.java
 ##
 @@ -21,34 +21,21 @@
 import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.util.event.EventListener;
 
-import java.io.IOException;
-
 /**
- * The basic API for every reader.
+ * Reader for iteration.
  */
-public interface ReaderBase {
-
-   /**
-    * Returns whether the reader has consumed the input.
-    */
-   boolean isFinished();
-
-   // ------------------------------------------------------------------------
-   // Task events
-   // ------------------------------------------------------------------------
-
-   void sendTaskEvent(TaskEvent event) throws IOException;
-
-   void registerTaskEventListener(EventListener<TaskEvent> listener, Class<? extends TaskEvent> eventType);
-
-   // ------------------------------------------------------------------------
-   // Iterations
-   // ------------------------------------------------------------------------
+public interface IterationReader {
 
    void setIterativeReader();
 
    void startNextSuperstep();
 
    boolean hasReachedEndOfSuperstep();
 
+   /**
+    * Returns whether the reader has consumed the input.
+    */
+   boolean isFinished();
 
 Review comment:
   Yes, I just confirmed that it is actually only used in iteration, so all the 
current methods in `ReaderBase` are for iteration. If we rename it to 
`IterationReader`, the corresponding `AbstractReader` should also be renamed to 
`AbstractIterationReader`, because the current `AbstractReader` is likewise 
only used for iteration.
   However, it still seems a little strange to put `IterationReader` in the 
upper layer, because the current `Reader` interface, which is used for common 
reads, extends `IterationReader`; that relationship does not make sense.
   If we instead had two independent reader base interfaces, one for iteration 
and one for the common case, then a specific record reader could implement both 
interfaces in the iteration scenario, or only the common reader interface in 
the non-iteration scenario.
   So we may need to reorganize the current relationships if we want to change 
this part. Simply renaming to `IterationReader` could be misleading.
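   The split described here could be sketched like this (simplified, 
self-contained types with hypothetical names, not the actual Flink interfaces):

```java
// Two independent base interfaces: one for the common read path and one
// only for iterations, instead of one ReaderBase mixing both concerns.
interface CommonReader {
    boolean isFinished();
}

interface IterationReader {
    void startNextSuperstep();
    boolean hasReachedEndOfSuperstep();
}

// An iterative record reader implements both interfaces; a plain record
// reader would implement only CommonReader.
class IterativeRecordReader implements CommonReader, IterationReader {
    private boolean endOfSuperstep;
    private boolean finished;

    public boolean isFinished() { return finished; }
    public void startNextSuperstep() { endOfSuperstep = false; }
    public boolean hasReachedEndOfSuperstep() { return endOfSuperstep; }

    void endSuperstep() { endOfSuperstep = true; }
}

public class ReaderSplitSketch {
    public static void main(String[] args) {
        IterativeRecordReader reader = new IterativeRecordReader();
        reader.endSuperstep();
        System.out.println(reader.hasReachedEndOfSuperstep()); // true
        reader.startNextSuperstep();
        System.out.println(reader.hasReachedEndOfSuperstep()); // false
    }
}
```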




> Refactor org.apache.flink.runtime.io.network.api.reader.ReaderBase
> --
>
> Key: FLINK-10656
> URL: https://issues.apache.org/jira/browse/FLINK-10656
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Reporter: JIN SUN
>Assignee: JIN SUN
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>
> The interface org.apache.flink.runtime.io.network.api.reader.ReaderBase is 
> not very clean: its methods are called only for iteration and event handling, 
> which does not match the name ReaderBase. Since these two pieces of 
> functionality are independent, I propose to rename the interface and split it 
> into two isolated interfaces. 
> For more details, please look at the PR.





[GitHub] zhijiangW commented on a change in pull request #6911: [FLINK-10656] Refactor org.apache.flink.runtime.io.network.api.reader…

2018-10-24 Thread GitBox
zhijiangW commented on a change in pull request #6911: [FLINK-10656] Refactor 
org.apache.flink.runtime.io.network.api.reader…
URL: https://github.com/apache/flink/pull/6911#discussion_r228020917
 
 

 ##
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/IterationReader.java
 ##
 @@ -21,34 +21,21 @@
 import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.util.event.EventListener;
 
-import java.io.IOException;
-
 /**
- * The basic API for every reader.
+ * Reader for iteration.
  */
-public interface ReaderBase {
-
-   /**
-    * Returns whether the reader has consumed the input.
-    */
-   boolean isFinished();
-
-   // ------------------------------------------------------------------------
-   // Task events
-   // ------------------------------------------------------------------------
-
-   void sendTaskEvent(TaskEvent event) throws IOException;
-
-   void registerTaskEventListener(EventListener<TaskEvent> listener, Class<? extends TaskEvent> eventType);
-
-   // ------------------------------------------------------------------------
-   // Iterations
-   // ------------------------------------------------------------------------
+public interface IterationReader {
 
    void setIterativeReader();
 
    void startNextSuperstep();
 
    boolean hasReachedEndOfSuperstep();
 
+   /**
+    * Returns whether the reader has consumed the input.
+    */
+   boolean isFinished();
 
 Review comment:
   Yes, I just confirmed that it is actually only used in iterations, so all the current methods in `ReaderBase` are for iteration. If we rename it to `IterationReader`, the corresponding `AbstractReader` should also be renamed to `AbstractIterationReader`, because the current `AbstractReader` is likewise only used for iteration.
   
   But it still seems a little strange to put `IterationReader` in the upper layer, because the current `Reader` interface, which is used for common reads, would extend `IterationReader`. That relationship does not seem to make sense.
   
   If we instead had two independent reader base interfaces, one for iteration and one for the common case, then a specific record reader could implement both interfaces in the iteration scenario, or only the common reader interface in the non-iteration scenario.
   
   So we may need to reorganize the current relationships if we want to change this part. Simply renaming to `IterationReader` could cause naming confusion.
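The split the reviewer proposes can be sketched in plain Java. The names below (`TaskEventReader`, `DemoRecordReader`) are hypothetical stand-ins, as is the simplified `TaskEvent` type and the use of `Consumer` in place of Flink's `EventListener`; the real Flink interfaces carry more methods.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Simplified stand-in for Flink's task event type (for illustration only). */
class TaskEvent {}

/** Reader API concerned only with task events (hypothetical name). */
interface TaskEventReader {
    void sendTaskEvent(TaskEvent event);
    void registerTaskEventListener(Consumer<TaskEvent> listener);
}

/** Reader API concerned only with iterations. */
interface IterationReader {
    void setIterativeReader();
    void startNextSuperstep();
    boolean hasReachedEndOfSuperstep();
    boolean isFinished();
}

/**
 * A record reader used in an iteration implements both interfaces;
 * a non-iterative reader would implement only TaskEventReader.
 */
class DemoRecordReader implements TaskEventReader, IterationReader {
    private final List<Consumer<TaskEvent>> listeners = new ArrayList<>();
    private boolean iterative;
    private int superstep;

    public void sendTaskEvent(TaskEvent event) { listeners.forEach(l -> l.accept(event)); }
    public void registerTaskEventListener(Consumer<TaskEvent> listener) { listeners.add(listener); }
    public void setIterativeReader() { iterative = true; }
    public void startNextSuperstep() { superstep++; }
    public boolean hasReachedEndOfSuperstep() { return iterative && superstep > 0; }
    public boolean isFinished() { return false; }
}
```

With this split, the common `Reader` interface no longer needs to extend any iteration-specific API, which addresses the layering concern above.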




[jira] [Commented] (FLINK-10656) Refactor org.apache.flink.runtime.io.network.api.reader.ReaderBase

2018-10-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10656?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16663127#comment-16663127
 ] 

ASF GitHub Bot commented on FLINK-10656:


zhijiangW commented on a change in pull request #6911: [FLINK-10656] Refactor 
org.apache.flink.runtime.io.network.api.reader…
URL: https://github.com/apache/flink/pull/6911#discussion_r228020917
 
 

 ##
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/IterationReader.java
 ##
 @@ -21,34 +21,21 @@
 import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.util.event.EventListener;
 
-import java.io.IOException;
-
 /**
- * The basic API for every reader.
+ * Reader for iteration.
  */
-public interface ReaderBase {
-
-	/**
-	 * Returns whether the reader has consumed the input.
-	 */
-	boolean isFinished();
-
-	// ------------------------------------------------------------------------
-	// Task events
-	// ------------------------------------------------------------------------
-
-	void sendTaskEvent(TaskEvent event) throws IOException;
-
-	void registerTaskEventListener(EventListener<TaskEvent> listener, Class<? extends TaskEvent> eventType);
-
-	// ------------------------------------------------------------------------
-	// Iterations
-	// ------------------------------------------------------------------------
+public interface IterationReader {
 
 	void setIterativeReader();
 
 	void startNextSuperstep();
 
 	boolean hasReachedEndOfSuperstep();
 
+	/**
+	 * Returns whether the reader has consumed the input.
+	 */
+	boolean isFinished();
 
 Review comment:
   Yes, I just confirmed that it is actually only used in iterations, so all the current methods in `ReaderBase` are for iteration. If we rename it to `IterationReaderBase`, the corresponding `AbstractReader` should also be renamed to `AbstractIterationReader`, because the current `AbstractReader` is likewise only used for iteration.
   
   But it still seems a little strange to put `IterationReaderBase` in the upper layer, because the current `Reader` interface, which is used for common reads, would extend `IterationReaderBase`. That relationship does not seem to make sense.
   
   If we instead had two independent reader base interfaces, one for iteration and one for the common case, then a specific record reader could implement both interfaces in the iteration scenario, or only the common reader interface in the non-iteration scenario.
   
   So we may need to reorganize the current relationships if we want to change this part. Simply renaming to `IterationReaderBase` could cause naming confusion.




> Refactor org.apache.flink.runtime.io.network.api.reader.ReaderBase
> --
>
> Key: FLINK-10656
> URL: https://issues.apache.org/jira/browse/FLINK-10656
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Reporter: JIN SUN
>Assignee: JIN SUN
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>
> The interface org.apache.flink.runtime.io.network.api.reader.ReaderBase is
> not very clean: its APIs are called only for iterations and event handling,
> which is unrelated to the name ReaderBase. Since the two responsibilities are
> independent, I propose to rename the interface and split it into two
> isolated interfaces.
> For more details, please see the PR.







[jira] [Assigned] (FLINK-10673) Table API / SQL UIDs not the only one

2018-10-24 Thread winifredtang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10673?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

winifredtang reassigned FLINK-10673:


Assignee: winifredtang

> Table API / SQL UIDs are not unique
> -
>
> Key: FLINK-10673
> URL: https://issues.apache.org/jira/browse/FLINK-10673
> Project: Flink
>  Issue Type: Bug
>  Components: Table API  SQL
>Affects Versions: 1.5.4, 1.6.1
> Environment: flink 1.5.0
>Reporter: Fan weiwen
>Assignee: winifredtang
>Priority: Major
>
> A job has two SQL statements.
> The source is Kafka.
> The sink is Redis (or another sink).
> Asql 
> {code:java}
> -- code placeholder
> select 
>   reqIp as factorContenta, 
>   count(*) as eCount, 
>   60 * 60 as expire 
> from 
>   kafka_source 
> where 
>   uri is not null 
> group by 
>   hop( 
>     rowtime, 
>     interval '2' second, 
>     interval '60' minute 
>   ), 
>   reqIp 
> {code}
> Bsql 
> {code:java}
> -- code placeholder
> select 
>         uid as factorContentb, 
>   count(*) as eCount, 
>   60 * 60 as expire 
> from 
>   kafka_source 
> where 
>   uri is not null 
> group by 
>   hop( 
>     rowtime, 
>     interval '2' second, 
>     interval '60' minute 
>   ), 
>   uid 
> {code}
> Now start only Asql (Bsql stopped); the sink has key 656.19.173.34.
> Then stop Asql with a savepoint to HDFS; now delete key 656.19.173.34 (if
> the sink were Kafka, it would not be deleted).
> Start Bsql from the savepoint:
> you will find that the sink has both keys 656.19.173.34 and 6630519.
> Bsql picked up Asql's savepoint result.
> I think the SQL UIDs are not unique.
> Who can help me look into this problem?
> my test data is 
> {code:java}
> -- code placeholder
> { 
>    "reqIp" : "656.19.173.34", 
>    "rowtime" : 1537950912546, 
>    "uid" : 6630519, 
>    "uri" : "/web" 
> } 
> {code}
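Both queries group by `hop(rowtime, interval '2' second, interval '60' minute)`. The window assignment that HOP implies can be sketched in plain Java; this is a simplified model of sliding-window assignment, not Flink's actual implementation.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Sketch of HOP (sliding) window assignment: which [start, end) windows a
 * row's timestamp falls into. All values are in milliseconds.
 */
class HopWindows {
    static List<long[]> assign(long ts, long slideMs, long sizeMs) {
        List<long[]> windows = new ArrayList<>();
        // Start of the last window that begins at or before ts
        // (the double modulo also handles negative timestamps).
        long lastStart = ts - ((ts % slideMs + slideMs) % slideMs);
        for (long start = lastStart; start > ts - sizeMs; start -= slideMs) {
            windows.add(new long[]{start, start + sizeMs});
        }
        return windows;
    }
}
```

With the parameters above (slide 2 s, size 60 min), each record falls into size/slide = 1800 overlapping windows, so both queries keep substantial windowed state per key.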





[jira] [Assigned] (FLINK-10265) Configure checkpointing behavior for SQL Client

2018-10-24 Thread winifredtang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10265?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

winifredtang reassigned FLINK-10265:


Assignee: winifredtang  (was: vinoyang)

> Configure checkpointing behavior for SQL Client
> ---
>
> Key: FLINK-10265
> URL: https://issues.apache.org/jira/browse/FLINK-10265
> Project: Flink
>  Issue Type: New Feature
>  Components: SQL Client
>Reporter: Timo Walther
>Assignee: winifredtang
>Priority: Major
>
> The SQL Client environment file should expose checkpointing related 
> properties:
> - enable checkpointing
> - checkpointing interval
> - mode
> - timeout
> - etc. see {{org.apache.flink.streaming.api.environment.CheckpointConfig}}
> Per-job selection of state backends and their configuration.
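The properties listed above could be exposed in the environment file roughly as follows. This is a hypothetical sketch only: the key names and nesting are illustrative, not an existing Flink SQL Client format.

```yaml
# Hypothetical SQL Client environment-file section for checkpointing
# (illustrative key names; see CheckpointConfig for the underlying options)
execution:
  checkpointing:
    enabled: true
    interval: 60000        # ms between checkpoint triggers
    mode: EXACTLY_ONCE     # or AT_LEAST_ONCE
    timeout: 600000        # ms before an in-flight checkpoint is aborted
  state-backend: rocksdb   # per-job state backend selection
```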





[jira] [Assigned] (FLINK-10448) VALUES clause is translated into a separate operator per value

2018-10-24 Thread winifredtang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10448?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

winifredtang reassigned FLINK-10448:


Assignee: winifredtang  (was: vinoyang)

> VALUES clause is translated into a separate operator per value
> --
>
> Key: FLINK-10448
> URL: https://issues.apache.org/jira/browse/FLINK-10448
> Project: Flink
>  Issue Type: Bug
>  Components: Table API  SQL
>Affects Versions: 1.6.1
>Reporter: Timo Walther
>Assignee: winifredtang
>Priority: Major
>
> It seems that a SQL VALUES clause uses one operator per value under certain 
> conditions which leads to a complicated job graph. Given that we need to 
> compile code for every operator in the open method and have other overhead as 
> well, this looks inefficient to me.
> For example, the following query creates and unions 6 operators together:
> {code}
> SELECT *
>   FROM (
> VALUES
>   (1, 'Bob', CAST(0 AS BIGINT)),
>   (22, 'Alice', CAST(0 AS BIGINT)),
>   (42, 'Greg', CAST(0 AS BIGINT)),
>   (42, 'Greg', CAST(0 AS BIGINT)),
>   (42, 'Greg', CAST(0 AS BIGINT)),
>   (1, 'Bob', CAST(0 AS BIGINT)))
> AS UserCountTable(user_id, user_name, user_count)
> {code}
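The inefficiency can be illustrated with plain Java collections standing in for operators. This is a conceptual sketch, not planner code: each singleton stream models one generated source operator, and the flattening models the union.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Union-of-singleton-sources versus a single source over all VALUES rows. */
class ValuesExample {
    static List<String> unionOfSingletons() {
        // One "source" per VALUES row, then a union: six operators for six rows.
        return Stream.of(
                Stream.of("1,Bob,0"), Stream.of("22,Alice,0"), Stream.of("42,Greg,0"),
                Stream.of("42,Greg,0"), Stream.of("42,Greg,0"), Stream.of("1,Bob,0"))
            .flatMap(s -> s)
            .collect(Collectors.toList());
    }

    static List<String> singleSource() {
        // One source over the whole row list: same result, one operator.
        return Arrays.asList("1,Bob,0", "22,Alice,0", "42,Greg,0",
                             "42,Greg,0", "42,Greg,0", "1,Bob,0");
    }
}
```

Both produce the same rows in the same order; the second shape avoids per-operator code generation and open() overhead.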





[jira] [Commented] (FLINK-10600) Provide End-to-end test cases for modern Kafka connectors

2018-10-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10600?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16663102#comment-16663102
 ] 

ASF GitHub Bot commented on FLINK-10600:


yanghua opened a new pull request #6924: [FLINK-10600] Provide End-to-end test 
cases for modern Kafka connectors
URL: https://github.com/apache/flink/pull/6924
 
 
   ## What is the purpose of the change
   
   *This pull request provides End-to-end test cases for modern Kafka 
connectors*
   
   
   ## Brief change log
   
 - *Provide End-to-end test cases for modern Kafka connectors*
   
   ## Verifying this change
   
   
   This change added tests and can be verified as follows:
   
 - *Added end-to-end integration tests for the modern Kafka connector*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / **no**)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
 - The serializers: (yes / **no** / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / **no** 
/ don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
 - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / **no**)
 - If yes, how is the feature documented? (not applicable / docs / 
**JavaDocs** / not documented)
   




> Provide End-to-end test cases for modern Kafka connectors
> -
>
> Key: FLINK-10600
> URL: https://issues.apache.org/jira/browse/FLINK-10600
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kafka Connector
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>






[jira] [Updated] (FLINK-10600) Provide End-to-end test cases for modern Kafka connectors

2018-10-24 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10600?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-10600:
---
Labels: pull-request-available  (was: )

> Provide End-to-end test cases for modern Kafka connectors
> -
>
> Key: FLINK-10600
> URL: https://issues.apache.org/jira/browse/FLINK-10600
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kafka Connector
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>








[jira] [Assigned] (FLINK-5832) Support for simple hive UDF

2018-10-24 Thread winifredtang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-5832?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

winifredtang reassigned FLINK-5832:
---

Assignee: winifredtang  (was: vinoyang)

> Support for simple hive UDF
> ---
>
> Key: FLINK-5832
> URL: https://issues.apache.org/jira/browse/FLINK-5832
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Reporter: Zhuoluo Yang
>Assignee: winifredtang
>Priority: Major
>  Labels: pull-request-available
>
> The first step of FLINK-5802 is to support simple Hive UDF.





[jira] [Assigned] (FLINK-5802) Flink SQL calling Hive User-Defined Functions

2018-10-24 Thread winifredtang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-5802?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

winifredtang reassigned FLINK-5802:
---

Assignee: winifredtang

> Flink SQL calling Hive User-Defined Functions
> -
>
> Key: FLINK-5802
> URL: https://issues.apache.org/jira/browse/FLINK-5802
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Reporter: Zhuoluo Yang
>Assignee: winifredtang
>Priority: Major
>  Labels: features
>
> It's important to be able to call Hive UDFs in Flink SQL. A great many UDFs
> have been written in Hive over the last ten years, and reusing them matters:
> this feature will reduce the cost of migration and bring more users to Flink.
> Spark SQL already supports this:
> https://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.4.0/bk_spark-guide/content/calling-udfs.html
> The Hive UDFs here include both built-in UDFs and customized UDFs. Since much
> business logic has been written in customized UDFs, they are more important
> than the built-in ones.
> Generally, there are three kinds of UDFs in Hive: UDF, UDTF, and UDAF.
> Here is the document of the Spark SQL: 
> http://spark.apache.org/docs/latest/sql-programming-guide.html#compatibility-with-apache-hive
>  
> Spark code:
> https://github.com/apache/spark/blob/master/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala
> https://github.com/apache/spark/blob/master/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
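By convention, a simple Hive UDF exposes one or more `evaluate` methods, which a bridging layer can locate reflectively and invoke as a scalar function. The sketch below illustrates only that convention; `UpperUdf` and `HiveUdfWrapper` are hypothetical names, not Flink's or Hive's actual bridging code.

```java
import java.lang.reflect.Method;

/** A Hive-style simple UDF: logic lives in a public evaluate method. */
class UpperUdf {
    public String evaluate(String s) { return s == null ? null : s.toUpperCase(); }
}

/** Minimal reflective wrapper around a Hive-style UDF instance. */
class HiveUdfWrapper {
    private final Object udf;
    private final Method evaluate;

    HiveUdfWrapper(Object udf, Class<?>... argTypes) throws NoSuchMethodException {
        this.udf = udf;
        // Locate the evaluate method matching the call-site argument types.
        this.evaluate = udf.getClass().getMethod("evaluate", argTypes);
        this.evaluate.setAccessible(true);
    }

    Object eval(Object... args) throws Exception {
        return evaluate.invoke(udf, args);
    }
}
```

A real integration must additionally map Hive ObjectInspectors to table types and handle UDTF/UDAF shapes, which is where most of the work lies.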





[jira] [Created] (FLINK-10673) Table API / SQL UIDs not the only one

2018-10-24 Thread Fan weiwen (JIRA)
Fan weiwen created FLINK-10673:
--

 Summary: Table API / SQL UIDs are not unique
 Key: FLINK-10673
 URL: https://issues.apache.org/jira/browse/FLINK-10673
 Project: Flink
  Issue Type: Bug
  Components: Table API  SQL
Affects Versions: 1.6.1, 1.5.4
 Environment: flink 1.5.0
Reporter: Fan weiwen


A job has two SQL statements.
The source is Kafka.
The sink is Redis (or another sink).

Asql 


{code:java}
-- code placeholder
select 
  reqIp as factorContenta, 
  count(*) as eCount, 
  60 * 60 as expire 
from 
  kafka_source 
where 
  uri is not null 
group by 
  hop( 
    rowtime, 
    interval '2' second, 
    interval '60' minute 
  ), 
  reqIp 
{code}



Bsql 


{code:java}
-- code placeholder
select 
        uid as factorContentb, 
  count(*) as eCount, 
  60 * 60 as expire 
from 
  kafka_source 
where 
  uri is not null 
group by 
  hop( 
    rowtime, 
    interval '2' second, 
    interval '60' minute 
  ), 
  uid 
{code}



Now start only Asql (Bsql stopped); the sink has key 656.19.173.34.

Then stop Asql with a savepoint to HDFS; now delete key 656.19.173.34 (if the sink were Kafka, it would not be deleted).

Start Bsql from the savepoint:
you will find that the sink has both keys 656.19.173.34 and 6630519.

Bsql picked up Asql's savepoint result.

I think the SQL UIDs are not unique.

Who can help me look into this problem?

my test data is 


{code:java}
-- code placeholder
{ 
   "reqIp" : "656.19.173.34", 
   "rowtime" : 1537950912546, 
   "uid" : 6630519, 
   "uri" : "/web" 
} 
{code}





[jira] [Commented] (FLINK-5833) Support for Hive GenericUDF

2018-10-24 Thread vinoyang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-5833?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16663093#comment-16663093
 ] 

vinoyang commented on FLINK-5833:
-

Hi [~clarkyzl], this issue has not been active for a long time. Are you still
working on it? If not, I am happy to take it over.

> Support for Hive GenericUDF
> ---
>
> Key: FLINK-5833
> URL: https://issues.apache.org/jira/browse/FLINK-5833
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Reporter: Zhuoluo Yang
>Assignee: Zhuoluo Yang
>Priority: Major
>
> The second step of FLINK-5802 is to support Hive's GenericUDF.





[jira] [Assigned] (FLINK-5832) Support for simple hive UDF

2018-10-24 Thread vinoyang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-5832?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

vinoyang reassigned FLINK-5832:
---

Assignee: vinoyang

> Support for simple hive UDF
> ---
>
> Key: FLINK-5832
> URL: https://issues.apache.org/jira/browse/FLINK-5832
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Reporter: Zhuoluo Yang
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>
> The first step of FLINK-5802 is to support simple Hive UDF.





[jira] [Updated] (FLINK-10672) Task stuck while writing output to flink

2018-10-24 Thread Ankur Goenka (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10672?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ankur Goenka updated FLINK-10672:
-
Description: 
I am running a fairly complex pipeline with 200+ tasks.

The pipeline works fine with small data (on the order of 10 KB of input) but gets stuck with slightly larger data (300 KB of input).

The task gets stuck while writing its output to Flink; more specifically, it gets stuck while requesting a memory segment from the local buffer pool. The Task Manager UI shows that it has enough memory and memory segments to work with.

The relevant stack trace is 
{quote}"grpc-default-executor-0" #138 daemon prio=5 os_prio=0 
tid=0x7fedb0163800 nid=0x30b7f in Object.wait() [0x7fedb4f9]
 java.lang.Thread.State: TIMED_WAITING (on object monitor)
 at (C/C++) 0x7fef201c7dae (Unknown Source)
 at (C/C++) 0x7fef1f2aea07 (Unknown Source)
 at (C/C++) 0x7fef1f241cd3 (Unknown Source)
 at java.lang.Object.wait(Native Method)
 - waiting on <0xf6d56450> (a java.util.ArrayDeque)
 at 
org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegment(LocalBufferPool.java:247)
 - locked <0xf6d56450> (a java.util.ArrayDeque)
 at 
org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:204)
 at 
org.apache.flink.runtime.io.network.api.writer.RecordWriter.requestNewBufferBuilder(RecordWriter.java:213)
 at 
org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:144)
 at 
org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:107)
 at 
org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
 at 
org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
 at 
org.apache.beam.runners.flink.translation.functions.FlinkExecutableStagePruningFunction.flatMap(FlinkExecutableStagePruningFunction.java:42)
 at 
org.apache.beam.runners.flink.translation.functions.FlinkExecutableStagePruningFunction.flatMap(FlinkExecutableStagePruningFunction.java:26)
 at 
org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:80)
 at 
org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
 at 
org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageFunction$MyDataReceiver.accept(FlinkExecutableStageFunction.java:230)
 - locked <0xf6a60bd0> (a java.lang.Object)
 at 
org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver.accept(BeamFnDataInboundObserver.java:81)
 at 
org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver.accept(BeamFnDataInboundObserver.java:32)
 at 
org.apache.beam.sdk.fn.data.BeamFnDataGrpcMultiplexer$InboundObserver.onNext(BeamFnDataGrpcMultiplexer.java:139)
 at 
org.apache.beam.sdk.fn.data.BeamFnDataGrpcMultiplexer$InboundObserver.onNext(BeamFnDataGrpcMultiplexer.java:125)
 at 
org.apache.beam.vendor.grpc.v1.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onMessage(ServerCalls.java:248)
 at 
org.apache.beam.vendor.grpc.v1.io.grpc.ForwardingServerCallListener.onMessage(ForwardingServerCallListener.java:33)
 at 
org.apache.beam.vendor.grpc.v1.io.grpc.Contexts$ContextualizedServerCallListener.onMessage(Contexts.java:76)
 at 
org.apache.beam.vendor.grpc.v1.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailable(ServerCallImpl.java:263)
 at 
org.apache.beam.vendor.grpc.v1.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1MessagesAvailable.runInContext(ServerImpl.java:683)
 at 
org.apache.beam.vendor.grpc.v1.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
 at 
org.apache.beam.vendor.grpc.v1.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
 at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
 at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
 at java.lang.Thread.run(Thread.java:748){quote}
 

The full stack trace and logs are attached.
 Please take a look and let me know if more information is needed.
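The blocking behavior in that trace (the writer parked in `Object.wait()` inside `LocalBufferPool.requestMemorySegment`) can be modeled with a toy pool: requesters block until another party recycles a segment. This is a simplified sketch, not Flink's actual `LocalBufferPool`, which also grows/shrinks its size and supports availability listeners.

```java
import java.util.ArrayDeque;

/** Minimal model of a bounded buffer pool with blocking segment requests. */
class ToyBufferPool {
    private final ArrayDeque<byte[]> segments = new ArrayDeque<>();

    ToyBufferPool(int numSegments, int segmentSize) {
        for (int i = 0; i < numSegments; i++) segments.add(new byte[segmentSize]);
    }

    synchronized byte[] requestSegmentBlocking() {
        while (segments.isEmpty()) {
            try {
                // The requester parks here (WAITING on the pool's monitor),
                // which is where the stack trace above is stuck.
                wait();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return null; // simplified; real code propagates the interruption
            }
        }
        return segments.poll();
    }

    synchronized void recycle(byte[] segment) {
        segments.add(segment);
        notifyAll(); // wake any blocked requesters
    }
}
```

A permanent wait like the one reported typically means segments are never recycled, e.g. because the downstream consumer stopped draining, so the symptom points at backpressure or a deadlock rather than at total memory.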

  was:
I am running a fairly complex pipeline with 200+ tasks.

The pipeline works fine with small data (on the order of 10 KB of input) but gets stuck with slightly larger data (300 KB of input).

The task gets stuck while writing its output to Flink; more specifically, it gets stuck while requesting a memory segment from the local buffer pool. The Task Manager UI shows that it has enough memory and memory segments to work with.

The relevant stack trace is 
bq.  "grpc-default-executor-0" #138 daemon prio=5 os_prio=0 
tid=0x7fedb0163800 nid=0x30b7f in Object.wait() [0x7fedb4f9]
bq.java.lang.Thread.State: TIMED_WAITING (on object monitor)
bq. at (C/C++) 0x7fef201c7dae (Unknown Source)
bq. at (C/C++) 

[jira] [Updated] (FLINK-10672) Task stuck while writing output to flink

2018-10-24 Thread Ankur Goenka (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10672?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ankur Goenka updated FLINK-10672:
-
Description: 
I am running a fairly complex pipeline with 200+ tasks.

The pipeline works fine with small data (on the order of 10 KB of input) but gets stuck with slightly larger data (300 KB of input).

The task gets stuck while writing its output to Flink; more specifically, it gets stuck while requesting a memory segment from the local buffer pool. The Task Manager UI shows that it has enough memory and memory segments to work with.

The relevant stack trace is 
bq.  "grpc-default-executor-0" #138 daemon prio=5 os_prio=0 
tid=0x7fedb0163800 nid=0x30b7f in Object.wait() [0x7fedb4f9]
bq.java.lang.Thread.State: TIMED_WAITING (on object monitor)
bq. at (C/C++) 0x7fef201c7dae (Unknown Source)
bq. at (C/C++) 0x7fef1f2aea07 (Unknown Source)
bq. at (C/C++) 0x7fef1f241cd3 (Unknown Source)
bq. at java.lang.Object.wait(Native Method)
bq. - waiting on <0xf6d56450> (a java.util.ArrayDeque)
bq. at 
org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegment(LocalBufferPool.java:247)
bq. - locked <0xf6d56450> (a java.util.ArrayDeque)
bq. at 
org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:204)
bq. at 
org.apache.flink.runtime.io.network.api.writer.RecordWriter.requestNewBufferBuilder(RecordWriter.java:213)
bq. at 
org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:144)
bq. at 
org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:107)
bq. at 
org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
bq. at 
org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
bq. at 
org.apache.beam.runners.flink.translation.functions.FlinkExecutableStagePruningFunction.flatMap(FlinkExecutableStagePruningFunction.java:42)
bq. at 
org.apache.beam.runners.flink.translation.functions.FlinkExecutableStagePruningFunction.flatMap(FlinkExecutableStagePruningFunction.java:26)
bq. at 
org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:80)
bq. at 
org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
bq. at 
org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageFunction$MyDataReceiver.accept(FlinkExecutableStageFunction.java:230)
bq. - locked <0xf6a60bd0> (a java.lang.Object)
bq. at 
org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver.accept(BeamFnDataInboundObserver.java:81)
bq. at 
org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver.accept(BeamFnDataInboundObserver.java:32)
bq. at 
org.apache.beam.sdk.fn.data.BeamFnDataGrpcMultiplexer$InboundObserver.onNext(BeamFnDataGrpcMultiplexer.java:139)
bq. at 
org.apache.beam.sdk.fn.data.BeamFnDataGrpcMultiplexer$InboundObserver.onNext(BeamFnDataGrpcMultiplexer.java:125)
bq. at 
org.apache.beam.vendor.grpc.v1.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onMessage(ServerCalls.java:248)
bq. at 
org.apache.beam.vendor.grpc.v1.io.grpc.ForwardingServerCallListener.onMessage(ForwardingServerCallListener.java:33)
bq. at 
org.apache.beam.vendor.grpc.v1.io.grpc.Contexts$ContextualizedServerCallListener.onMessage(Contexts.java:76)
bq. at 
org.apache.beam.vendor.grpc.v1.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailable(ServerCallImpl.java:263)
bq. at 
org.apache.beam.vendor.grpc.v1.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1MessagesAvailable.runInContext(ServerImpl.java:683)
bq. at 
org.apache.beam.vendor.grpc.v1.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
bq. at 
org.apache.beam.vendor.grpc.v1.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
bq. at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
bq. at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
bq. at java.lang.Thread.run(Thread.java:748)
bq. 
bq. 
 

The full stack trace and logs are attached.
 Please take a look and let me know if more information is needed.

  was:
I am running a fairly complex pipeline with 200+ tasks.

The pipeline works fine with small data (on the order of 10 KB of input) but gets stuck with slightly larger data (300 KB of input).

The task gets stuck while writing its output to Flink; more specifically, it gets stuck while requesting a memory segment from the local buffer pool. The Task Manager UI shows that it has

[jira] [Updated] (FLINK-10672) Task stuck while writing output to flink

2018-10-24 Thread Ankur Goenka (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10672?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ankur Goenka updated FLINK-10672:
-
Description: 
I am running a fairly complex pipeline with 200+ tasks.

The pipeline works fine with small data (on the order of 10 KB of input) but gets stuck 
with slightly larger data (300 KB of input).

 

The task gets stuck while writing its output to Flink; more specifically, it gets 
stuck while requesting a memory segment in the local buffer pool. The Task Manager UI 
shows that it has enough memory and memory segments to work with.

The relevant stack trace is 

"grpc-default-executor-0" #138 daemon prio=5 os_prio=0 tid=0x7fedb0163800 nid=0x30b7f in Object.wait() [0x7fedb4f9]
   java.lang.Thread.State: TIMED_WAITING (on object monitor)
at (C/C++) 0x7fef201c7dae (Unknown Source)
at (C/C++) 0x7fef1f2aea07 (Unknown Source)
at (C/C++) 0x7fef1f241cd3 (Unknown Source)
at java.lang.Object.wait(Native Method)
- waiting on <0xf6d56450> (a java.util.ArrayDeque)
at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegment(LocalBufferPool.java:247)
- locked <0xf6d56450> (a java.util.ArrayDeque)
at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:204)
at org.apache.flink.runtime.io.network.api.writer.RecordWriter.requestNewBufferBuilder(RecordWriter.java:213)
at org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:144)
at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:107)
at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
at org.apache.beam.runners.flink.translation.functions.FlinkExecutableStagePruningFunction.flatMap(FlinkExecutableStagePruningFunction.java:42)
at org.apache.beam.runners.flink.translation.functions.FlinkExecutableStagePruningFunction.flatMap(FlinkExecutableStagePruningFunction.java:26)
at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:80)
at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
at org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageFunction$MyDataReceiver.accept(FlinkExecutableStageFunction.java:230)
- locked <0xf6a60bd0> (a java.lang.Object)
at org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver.accept(BeamFnDataInboundObserver.java:81)
at org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver.accept(BeamFnDataInboundObserver.java:32)
at org.apache.beam.sdk.fn.data.BeamFnDataGrpcMultiplexer$InboundObserver.onNext(BeamFnDataGrpcMultiplexer.java:139)
at org.apache.beam.sdk.fn.data.BeamFnDataGrpcMultiplexer$InboundObserver.onNext(BeamFnDataGrpcMultiplexer.java:125)
at org.apache.beam.vendor.grpc.v1.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onMessage(ServerCalls.java:248)
at org.apache.beam.vendor.grpc.v1.io.grpc.ForwardingServerCallListener.onMessage(ForwardingServerCallListener.java:33)
at org.apache.beam.vendor.grpc.v1.io.grpc.Contexts$ContextualizedServerCallListener.onMessage(Contexts.java:76)
at org.apache.beam.vendor.grpc.v1.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailable(ServerCallImpl.java:263)
at org.apache.beam.vendor.grpc.v1.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1MessagesAvailable.runInContext(ServerImpl.java:683)
at org.apache.beam.vendor.grpc.v1.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
at org.apache.beam.vendor.grpc.v1.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)


 

The full stack trace and logs are attached.
 Please take a look and let me know if more information is needed.
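The hang site in the trace above is a thread parked in TIMED_WAITING on an ArrayDeque monitor inside the buffer pool's request path. The blocking pattern can be reproduced with a minimal, self-contained sketch; this is a toy model, not Flink's actual LocalBufferPool, and all class and method names here are illustrative:

```java
import java.util.ArrayDeque;

/**
 * Toy bounded segment pool whose request path blocks on an ArrayDeque
 * monitor, mirroring the wait site in the stack trace above. Not Flink's
 * LocalBufferPool; names are illustrative.
 */
class ToyBufferPool {
    private final ArrayDeque<byte[]> segments = new ArrayDeque<>();

    ToyBufferPool(int count, int segmentSize) {
        for (int i = 0; i < count; i++) {
            segments.add(new byte[segmentSize]);
        }
    }

    /** Block up to timeoutMs for a free segment; return null on timeout. */
    byte[] request(long timeoutMs) throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMs;
        synchronized (segments) {
            while (segments.isEmpty()) {
                long remaining = deadline - System.currentTimeMillis();
                if (remaining <= 0) {
                    return null; // pool exhausted and nothing was recycled in time
                }
                segments.wait(remaining); // TIMED_WAITING on the deque monitor
            }
            return segments.poll();
        }
    }

    /** Recycling a segment wakes one blocked requester. */
    void recycle(byte[] segment) {
        synchronized (segments) {
            segments.add(segment);
            segments.notify();
        }
    }
}
```

If downstream consumers stop recycling segments, `request` parks indefinitely even though the pool's configured capacity looks sufficient from the outside, which matches the symptom reported here.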


[jira] [Updated] (FLINK-10672) Task stuck while writing output to flink

2018-10-24 Thread Ankur Goenka (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10672?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ankur Goenka updated FLINK-10672:
-

[jira] [Updated] (FLINK-10672) Task stuck while writing output to flink

2018-10-24 Thread Ankur Goenka (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10672?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ankur Goenka updated FLINK-10672:
-

[jira] [Commented] (FLINK-10672) Task stuck while writing output to flink

2018-10-24 Thread Ankur Goenka (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10672?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16662937#comment-16662937
 ] 

Ankur Goenka commented on FLINK-10672:
--

cc: [~mxm] [~robertwb]

> Task stuck while writing output to flink
> 
>
> Key: FLINK-10672
> URL: https://issues.apache.org/jira/browse/FLINK-10672
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.5.4

[jira] [Created] (FLINK-10672) Task stuck while writing output to flink

2018-10-24 Thread Ankur Goenka (JIRA)
Ankur Goenka created FLINK-10672:


 Summary: Task stuck while writing output to flink
 Key: FLINK-10672
 URL: https://issues.apache.org/jira/browse/FLINK-10672
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.5.4
 Environment: OS: Debian rodente 4.17

Flink version: 1.5.4
||Key||Value||
|jobmanager.heap.mb|1024|
|jobmanager.rpc.address|localhost|
|jobmanager.rpc.port|6123|
|metrics.reporter.jmx.class|org.apache.flink.metrics.jmx.JMXReporter|
|metrics.reporter.jmx.port|9250-9260|
|metrics.reporters|jmx|
|parallelism.default|1|
|rest.port|8081|
|taskmanager.heap.mb|1024|
|taskmanager.numberOfTaskSlots|1|
|web.tmpdir|/tmp/flink-web-bdb73d6c-5b9e-47b5-9ebf-eed0a7c82c26|

 
h1. Overview
||Data Port||All Slots||Free Slots||CPU Cores||Physical Memory||JVM Heap 
Size||Flink Managed Memory||
|43501|1|0|12|62.9 GB|922 MB|642 MB|
h1. Memory
h2. JVM (Heap/Non-Heap)
||Type||Committed||Used||Maximum||
|Heap|922 MB|575 MB|922 MB|
|Non-Heap|68.8 MB|64.3 MB|-1 B|
|Total|991 MB|639 MB|922 MB|
h2. Outside JVM
||Type||Count||Used||Capacity||
|Direct|3,292|105 MB|105 MB|
|Mapped|0|0 B|0 B|
h1. Network
h2. Memory Segments
||Type||Count||
|Available|3,194|
|Total|3,278|
h1. Garbage Collection
||Collector||Count||Time||
|G1_Young_Generation|13|336|
|G1_Old_Generation|1|21|
Reporter: Ankur Goenka
 Attachments: 1uruvakHxBu.png, 3aDKQ24WvKk.png, Po89UGDn58V.png, 
jmx_dump.json, jmx_dump_detailed.json, jstack_129827.log, jstack_163822.log, 
jstack_66985.log

I am running a fairly complex pipeline with 200+ tasks.

The pipeline works fine with small data (on the order of 10 KB of input) but gets stuck 
with slightly larger data (300 KB of input).

 

The task gets stuck while writing its output to Flink; more specifically, it gets 
stuck while requesting a memory segment in the local buffer pool. The Task Manager UI 
shows that it has enough memory and memory segments to work with.


[jira] [Commented] (FLINK-8354) Flink Kafka connector ignores Kafka message headers

2018-10-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8354?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16662923#comment-16662923
 ] 

ASF GitHub Bot commented on FLINK-8354:
---

alexeyt820 commented on issue #6615: [FLINK-8354] [flink-connectors] Add 
ability to access and provide Kafka headers
URL: https://github.com/apache/flink/pull/6615#issuecomment-432846236
 
 
   @yanghua, @aljoscha is there anything else that needs to be done for this PR?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Flink Kafka connector ignores Kafka message headers
> -
>
> Key: FLINK-8354
> URL: https://issues.apache.org/jira/browse/FLINK-8354
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kafka Connector
> Environment: Kafka 0.11.0.0
> Flink 1.4.0
> flink-connector-kafka-0.11_2.11 
>Reporter: Mohammad Abareghi
>Assignee: Aegeaner
>Priority: Major
>  Labels: pull-request-available
>
> Kafka has introduced notion of Header for messages in version 0.11.0.0  
> https://issues.apache.org/jira/browse/KAFKA-4208.
> But flink-connector-kafka-0.11_2.11 which supports kafka 0.11.0.0 ignores 
> headers when consuming kafka messages. 
> It would be useful in some scenarios, such as distributed log tracing, to 
> expose message headers in FlinkKafkaConsumer011 and FlinkKafkaProducer011. 
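The distributed-tracing use case the issue mentions amounts to giving the deserialization step access to the record's headers alongside its key and value. The shape such an extension could take can be sketched as follows; this is a hypothetical illustration, not Flink's actual deserialization-schema API:

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;

/**
 * Hypothetical header-aware deserialization interface, sketched to show
 * what the issue asks for; not Flink's actual API.
 */
interface HeaderAwareDeserializer<T> {
    T deserialize(byte[] key, byte[] value, Map<String, byte[]> headers);
}

/** Example: prefix each record with the trace id carried in its headers. */
class TracingDeserializer implements HeaderAwareDeserializer<String> {
    @Override
    public String deserialize(byte[] key, byte[] value, Map<String, byte[]> headers) {
        byte[] traceId = headers.get("trace-id");
        String prefix = traceId == null
                ? "no-trace"
                : new String(traceId, StandardCharsets.UTF_8);
        return prefix + ":" + new String(value, StandardCharsets.UTF_8);
    }
}
```

A consumer wired with such a schema could then correlate records with upstream services without embedding the trace id in the payload itself.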



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)




[jira] [Updated] (FLINK-10542) Register Hive metastore as an external catalog in TableEnvironment

2018-10-24 Thread Xuefu Zhang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10542?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xuefu Zhang updated FLINK-10542:

Description: 
Similar to FLINK-2167, but instead registers the Hive metastore as an external catalog 
in the {{TableEnvironment}}. After registration, Table API and SQL queries 
should be able to access all Hive tables.

This might supersede the need for FLINK-2167, because the Hive metastore stores a 
superset of the tables available via HCatalog, without an indirection.



> Register Hive metastore as an external catalog in TableEnvironment
> --
>
> Key: FLINK-10542
> URL: https://issues.apache.org/jira/browse/FLINK-10542
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.6.0
>Reporter: Xuefu Zhang
>Assignee: Xuefu Zhang
>Priority: Major
>
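The description above boils down to a name-resolution step: once a catalog is registered under a name, the planner can resolve qualified table names through it. The model can be sketched with a toy registry; the class and method names here are illustrative and deliberately independent of Flink's actual external-catalog interface:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/**
 * Toy model of the lookup path that registering an external catalog implies:
 * catalog name -> table name -> table metadata. Illustrative only; not
 * Flink's actual catalog API.
 */
class CatalogRegistry {
    // catalog name -> (table name -> schema description)
    private final Map<String, Map<String, String>> catalogs = new HashMap<>();

    void registerCatalog(String name, Map<String, String> tables) {
        catalogs.put(name, new HashMap<>(tables));
    }

    /** Resolve a "catalog.table" reference the way a query planner might. */
    Optional<String> resolve(String qualifiedName) {
        int dot = qualifiedName.indexOf('.');
        if (dot < 0) {
            return Optional.empty(); // unqualified names are out of scope here
        }
        Map<String, String> tables = catalogs.get(qualifiedName.substring(0, dot));
        if (tables == null) {
            return Optional.empty();
        }
        return Optional.ofNullable(tables.get(qualifiedName.substring(dot + 1)));
    }
}
```

With a Hive-backed registry plugged in at this point, every metastore table would become addressable from Table API and SQL queries without being re-declared.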



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-10556) Integration with Apache Hive

2018-10-24 Thread Xuefu Zhang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10556?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xuefu Zhang updated FLINK-10556:

Attachment: (was: Proposal_ Integrate Flink with Hive Ecosystem.pdf)

> Integration with Apache Hive
> 
>
> Key: FLINK-10556
> URL: https://issues.apache.org/jira/browse/FLINK-10556
> Project: Flink
>  Issue Type: New Feature
>  Components: Batch Connectors and Input/Output Formats, SQL Client, 
> Table API & SQL
>Affects Versions: 1.6.0
>Reporter: Xuefu Zhang
>Assignee: Xuefu Zhang
>Priority: Major
> Attachments: Proposal_ Integrate Flink with Hive Ecosystem.pdf
>
>
> This is an umbrella JIRA tracking all enhancements and issues related to 
> integrating Flink with the Hive ecosystem. This is an outcome of a discussion in 
> the community, and thanks go to everyone who provided feedback and interest.
> Specifically, we'd like to see the following features and capabilities 
> immediately in Flink:
> # Metadata interoperability
> # Data interoperability
> # Data type compatibility
> # Hive UDF support
> # DDL/DML/Query language compatibility
> For a longer term, we'd also like to add or improve:
> # Compatible SQL service, client tools, JDBC/ODBC drivers
> # Better task failure tolerance and task scheduling
> # Support other user customizations in Hive (storage handlers, serdes, etc).
> I will provide more details regarding the proposal in a doc shortly. Design 
> doc, if deemed necessary, will be provided in each related sub tasks under 
> this JIRA.
> Feedback and contributions are greatly welcome!



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-10556) Integration with Apache Hive

2018-10-24 Thread Xuefu Zhang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10556?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xuefu Zhang updated FLINK-10556:

Attachment: Proposal_ Integrate Flink with Hive Ecosystem.pdf

> Integration with Apache Hive
> 
>
> Key: FLINK-10556
> URL: https://issues.apache.org/jira/browse/FLINK-10556
> Project: Flink
>  Issue Type: New Feature
>  Components: Batch Connectors and Input/Output Formats, SQL Client, 
> Table API & SQL
>Affects Versions: 1.6.0
>Reporter: Xuefu Zhang
>Assignee: Xuefu Zhang
>Priority: Major
> Attachments: Proposal_ Integrate Flink with Hive Ecosystem.pdf
>
>



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10671) rest monitoring api Savepoint status call fails if akka.ask.timeout < checkpoint duration

2018-10-24 Thread Cliff Resnick (JIRA)
Cliff Resnick created FLINK-10671:
-

 Summary: rest monitoring api Savepoint status call fails if 
akka.ask.timeout < checkpoint duration
 Key: FLINK-10671
 URL: https://issues.apache.org/jira/browse/FLINK-10671
 Project: Flink
  Issue Type: Bug
  Components: REST
Affects Versions: 1.6.1
Reporter: Cliff Resnick


Hi,
 
There seems to be a problem with the REST monitoring API:
|/jobs/:jobid/savepoints/:triggerid|
 
The problem is that when the savepoint represented by :triggerid was triggered with 
`cancel=true`, the status call above seems to fail if the savepoint duration 
exceeds the `akka.ask.timeout` value.
 
Below is a log in which I invoke "cancel with savepoint" and then poll the above 
endpoint for status at 2-second intervals. akka.ask.timeout is set to twenty 
seconds. The error is reproducible at various values of akka.ask.timeout.
 
2018/10/24 19:42:25 savepoint id 925964b35b2d501f4a45b714eca0a2ca is IN_PROGRESS
2018/10/24 19:42:27 savepoint id 925964b35b2d501f4a45b714eca0a2ca is IN_PROGRESS
2018/10/24 19:42:29 savepoint id 925964b35b2d501f4a45b714eca0a2ca is IN_PROGRESS
2018/10/24 19:42:31 savepoint id 925964b35b2d501f4a45b714eca0a2ca is IN_PROGRESS
2018/10/24 19:42:33 savepoint id 925964b35b2d501f4a45b714eca0a2ca is IN_PROGRESS
2018/10/24 19:42:35 savepoint id 925964b35b2d501f4a45b714eca0a2ca is IN_PROGRESS
2018/10/24 19:42:37 savepoint id 925964b35b2d501f4a45b714eca0a2ca is IN_PROGRESS
2018/10/24 19:42:39 savepoint id 925964b35b2d501f4a45b714eca0a2ca is IN_PROGRESS
2018/10/24 19:42:41 savepoint id 925964b35b2d501f4a45b714eca0a2ca is IN_PROGRESS
2018/10/24 19:42:43 savepoint id 925964b35b2d501f4a45b714eca0a2ca is IN_PROGRESS
2018/10/24 19:42:45 Cancel with Savepoint may have failed: 
java.util.concurrent.CompletionException: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://flink/user/jobmanager_0#-234856817]] after [2 ms]. Sender[null] sent message of type "org.apache.flink.runtime.rpc.messages.LocalFencedMessage".
	at java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:326)
	at java.util.concurrent.CompletableFuture.completeRelay(CompletableFuture.java:338)
	at java.util.concurrent.CompletableFuture.uniRelay(CompletableFuture.java:911)
	at java.util.concurrent.CompletableFuture$UniRelay.tryFire(CompletableFuture.java:899)
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
	at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
	at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:770)
	at akka.dispatch.OnComplete.internal(Future.scala:258)
	at akka.dispatch.OnComplete.internal(Future.scala:256)
	at akka.dispatch.japi$CallbackBridge.apply(Future.scala:186)
	at akka.dispatch.japi$CallbackBridge.apply(Future.scala:183)
	at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
	at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:83)
	at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
	at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
	at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:603)
	at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126)
	at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)
	at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
	at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)
	at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329)
	at akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280)
	at akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:284)
	at akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236)
	at java.lang.Thread.run(Thread.java:748)
Caused by: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://flink/user/jobmanager_0#-234856817]] after [2 ms]. Sender[null] sent message of type "org.apache.flink.runtime.rpc.messages.LocalFencedMessage".
	at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:604)
	... 9 more
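For reference, the polling loop that produced the log above can be sketched like this (a hypothetical Java sketch: the HTTP fetch of the JSON "status" field from /jobs/:jobid/savepoints/:triggerid is injected as a Supplier, so only the loop logic is shown; `SavepointPoller` and its parameters are illustrative names, not a Flink API):

```java
import java.util.function.Supplier;

/**
 * Hypothetical sketch of the status-polling loop from the report above.
 * The HTTP GET against /jobs/:jobid/savepoints/:triggerid is abstracted
 * behind a Supplier<String> yielding the parsed "status" field.
 */
public class SavepointPoller {

	/**
	 * Polls until a terminal status ("COMPLETED" or "FAILED") is observed.
	 * Returns the number of polls it took; throws if maxPolls is exhausted.
	 */
	public static int pollUntilTerminal(Supplier<String> statusFetcher, int maxPolls, long sleepMillis) {
		for (int i = 1; i <= maxPolls; i++) {
			String status = statusFetcher.get();
			if ("COMPLETED".equals(status) || "FAILED".equals(status)) {
				return i;
			}
			try {
				Thread.sleep(sleepMillis); // the report above polls every 2 seconds
			} catch (InterruptedException e) {
				Thread.currentThread().interrupt();
				throw new IllegalStateException("interrupted while polling", e);
			}
		}
		throw new IllegalStateException("savepoint did not reach a terminal state");
	}
}
```

The reported bug is that, instead of the status staying IN_PROGRESS until completion, the call itself starts failing once the savepoint outlives akka.ask.timeout.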





[jira] [Commented] (FLINK-6036) Let catalog support partition

2018-10-24 Thread Xuefu Zhang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-6036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16662721#comment-16662721
 ] 

Xuefu Zhang commented on FLINK-6036:


Thank you, [~jinyu.zj]. PR looks good overall, and I left some minor comments 
for consideration.

> Let catalog support partition
> -
>
> Key: FLINK-6036
> URL: https://issues.apache.org/jira/browse/FLINK-6036
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: jingzhang
>Assignee: jingzhang
>Priority: Major
>
> Now the catalog only supports CRUD at the database and table level. But in some 
> kinds of catalog, for example Hive, we also need to do CRUD operations at the 
> partition level. 
> This issue aims to let the catalog support partitions.





[GitHub] xuefuz commented on a change in pull request #6906: [Flink-6036][table]Let catalog support partition.

2018-10-24 Thread GitBox
xuefuz commented on a change in pull request #6906: [Flink-6036][table]Let 
catalog support partition.
URL: https://github.com/apache/flink/pull/6906#discussion_r227924642
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/InMemoryExternalCatalog.scala
 ##
 @@ -120,4 +122,122 @@ class InMemoryExternalCatalog(name: String) extends CrudExternalCatalog {
   override def listSubCatalogs(): JList[String] = synchronized {
     databases.keys.toList.asJava
   }
+
+  /**
+    * Adds partition into an external Catalog table
+    *
+    * @param tableName      table name
+    * @param partition      partition description of partition which to create
+    * @param ignoreIfExists if partition already exists in the catalog, not throw exception and
+    *                       leave the existed partition if ignoreIfExists is true;
+    *                       else throw PartitionAlreadyExistException
+    * @throws TableNotExistException         if table does not exist in the catalog yet
+    * @throws PartitionAlreadyExistException if partition exists in the catalog and
+    *                                        ignoreIfExists is false
+    */
+  override def createPartition(
+      tableName: String,
+      partition: ExternalCatalogPartition,
+      ignoreIfExists: Boolean): Unit = synchronized {
+    val newPartSpec = partition.partitionSpec
+    val table = getTable(tableName)
+    val partitions = getPartitions(tableName, table)
+    if (partitions.contains(newPartSpec)) {
+      if (!ignoreIfExists) {
+        throw new PartitionAlreadyExistException(name, tableName, newPartSpec)
+      }
+    } else {
+      partitions.put(newPartSpec, partition)
 
 Review comment:
   We might want to validate that the partitionSpec matches what the table is 
expecting. Otherwise, we might insert a partition whose partition columns 
differ from how the table is partitioned.
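The validation suggested here could look roughly like the following (a minimal Java sketch rather than the PR's Scala; class and method names are illustrative). The idea is simply to compare the spec's keys against the table's declared partition columns:

```java
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

/**
 * Hypothetical sketch of the partition-spec validation suggested above
 * (Java rather than the PR's Scala; names are illustrative).
 */
public class PartitionSpecValidator {

	/**
	 * A spec is valid for a table iff its keys are exactly the table's
	 * partition columns, in the same order. Both sides are expected to be
	 * insertion-ordered (LinkedHashSet / LinkedHashMap, mirroring the
	 * JLinkedHashMap used in the PR).
	 */
	public static boolean matches(Set<String> partitionColumns, Map<String, String> partSpec) {
		if (partSpec.size() != partitionColumns.size()) {
			return false;
		}
		Iterator<String> expected = partitionColumns.iterator();
		for (String actual : partSpec.keySet()) {
			if (!expected.next().equals(actual)) {
				return false;
			}
		}
		return true;
	}
}
```

createPartition could call such a check before touching the partition map and raise an appropriate exception on mismatch.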


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] xuefuz commented on a change in pull request #6906: [Flink-6036][table]Let catalog support partition.

2018-10-24 Thread GitBox
xuefuz commented on a change in pull request #6906: [Flink-6036][table]Let 
catalog support partition.
URL: https://github.com/apache/flink/pull/6906#discussion_r227920287
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/InMemoryExternalCatalog.scala
 ##
 @@ -120,4 +122,122 @@ class InMemoryExternalCatalog(name: String) extends CrudExternalCatalog {
   override def listSubCatalogs(): JList[String] = synchronized {
     databases.keys.toList.asJava
   }
+
+  /**
+    * Adds partition into an external Catalog table
+    *
+    * @param tableName      table name
+    * @param partition      partition description of partition which to create
+    * @param ignoreIfExists if partition already exists in the catalog, not throw exception and
+    *                       leave the existed partition if ignoreIfExists is true;
+    *                       else throw PartitionAlreadyExistException
+    * @throws TableNotExistException         if table does not exist in the catalog yet
+    * @throws PartitionAlreadyExistException if partition exists in the catalog and
+    *                                        ignoreIfExists is false
+    */
+  override def createPartition(
+      tableName: String,
+      partition: ExternalCatalogPartition,
+      ignoreIfExists: Boolean): Unit = synchronized {
+    val newPartSpec = partition.partitionSpec
+    val table = getTable(tableName)
+    val partitions = getPartitions(tableName, table)
+    if (partitions.contains(newPartSpec)) {
+      if (!ignoreIfExists) {
+        throw new PartitionAlreadyExistException(name, tableName, newPartSpec)
+      }
+    } else {
+      partitions.put(newPartSpec, partition)
+    }
+  }
+
+  private def getPartitions(tableName: String, table: ExternalCatalogTable)
+    : mutable.HashMap[JLinkedHashMap[String, String], ExternalCatalogPartition] = table match {
+    case t: ExternalCatalogPartitionedTable =>
+      partitions.getOrElseUpdate(
+        tableName, new mutable.HashMap[JLinkedHashMap[String, String], ExternalCatalogPartition])
+    case _ => throw new UnsupportedOperationException(
 
 Review comment:
   It might be better to define an exception class called TableNotPartitioned 
similar to TableNotExistException, so this exception can be handled explicitly.
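A dedicated exception along those lines could be as simple as the following (hypothetical sketch; the real class would live next to TableNotExistException and mirror how it carries the catalog and table names):

```java
/** Hypothetical sketch of the TableNotPartitionedException suggested above. */
public class TableNotPartitionedException extends RuntimeException {

	private final String catalogName;
	private final String tableName;

	public TableNotPartitionedException(String catalogName, String tableName) {
		super(String.format("Table %s in catalog %s is not partitioned.", tableName, catalogName));
		this.catalogName = catalogName;
		this.tableName = tableName;
	}

	public String getCatalogName() {
		return catalogName;
	}

	public String getTableName() {
		return tableName;
	}
}
```

Callers could then catch this case explicitly instead of a generic UnsupportedOperationException.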




[GitHub] xuefuz commented on a change in pull request #6906: [Flink-6036][table]Let catalog support partition.

2018-10-24 Thread GitBox
xuefuz commented on a change in pull request #6906: [Flink-6036][table]Let 
catalog support partition.
URL: https://github.com/apache/flink/pull/6906#discussion_r227915200
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala
 ##
 @@ -282,34 +296,63 @@ class ExternalCatalogTableBuilder(private val connectorDescriptor: ConnectorDesc
     this
   }
 
+  /**
+    * Specifies the partition columns for this external table.
+    */
+  def withPartitionColumnNames(
+      partitionColumnNames: java.util.LinkedHashSet[String]): ExternalCatalogTableBuilder = {
+    require(partitionColumnNames != null && !partitionColumnNames.isEmpty)
+    this.partitionColumnNames = Some(partitionColumnNames)
+    this
+  }
+
   /**
     * Declares this external table as a table source and returns the
     * configured [[ExternalCatalogTable]].
     *
     * @return External catalog table
     */
-  def asTableSource(): ExternalCatalogTable = {
-    new ExternalCatalogTable(
-      isBatch,
-      isStreaming,
-      isSource = true,
-      isSink = false,
-      DescriptorProperties.toJavaMap(this))
-  }
+  def asTableSource(): ExternalCatalogTable = this.partitionColumnNames match {
+    case Some(pc) =>
+      new ExternalCatalogPartitionedTable(
+        isBatch,
+        isStreaming,
+        isSource = true,
+        isSink = false,
+        pc,
+        DescriptorProperties.toJavaMap(this)
+      )
+    case None =>
+      new ExternalCatalogTable(
+        isBatch,
+        isStreaming,
+        isSource = true,
+        isSink = false,
+        DescriptorProperties.toJavaMap(this))
+  }
 
   /**
     * Declares this external table as a table sink and returns the
     * configured [[ExternalCatalogTable]].
     *
     * @return External catalog table
     */
-  def asTableSink(): ExternalCatalogTable = {
-    new ExternalCatalogTable(
-      isBatch,
-      isStreaming,
-      isSource = false,
-      isSink = true,
-      DescriptorProperties.toJavaMap(this))
+  def asTableSink(): ExternalCatalogTable = this.partitionColumnNames match {
 
 Review comment:
   I see a repeated pattern in the three asXXX methods. While it's not 
introduced in this PR, it might be good to introduce a helper method that 
those asXXX methods call to minimize the repetition.
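The consolidation could look roughly like this (a Java sketch of the pattern, not the PR's Scala; the asXXX methods delegate to one private factory that takes only the flags that differ, so the partitioned-vs-plain decision lives in one place):

```java
/** Hypothetical sketch of factoring the repeated asXXX construction into one helper. */
public class CatalogTableBuilder {

	/** Minimal stand-in for the built table; only the differing flags are kept. */
	public static class CatalogTable {
		public final boolean isSource;
		public final boolean isSink;

		CatalogTable(boolean isSource, boolean isSink) {
			this.isSource = isSource;
			this.isSink = isSink;
		}
	}

	public CatalogTable asTableSource() {
		return build(true, false);
	}

	public CatalogTable asTableSink() {
		return build(false, true);
	}

	public CatalogTable asTableSourceAndSink() {
		return build(true, true);
	}

	// Single place that would decide partitioned vs. plain table, copy
	// descriptor properties, etc. in the real builder.
	private CatalogTable build(boolean isSource, boolean isSink) {
		return new CatalogTable(isSource, isSink);
	}
}
```

Each public method then shrinks to a one-line call, and any future change to the construction logic happens once.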




[GitHub] xuefuz commented on a change in pull request #6906: [Flink-6036][table]Let catalog support partition.

2018-10-24 Thread GitBox
xuefuz commented on a change in pull request #6906: [Flink-6036][table]Let 
catalog support partition.
URL: https://github.com/apache/flink/pull/6906#discussion_r227908967
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalog.scala
 ##
 @@ -31,6 +31,32 @@ import org.apache.flink.table.api._
   */
 trait ExternalCatalog {
 
+  /**
+    * Gets the partition from external Catalog
+    *
+    * @param tableName table name
+    * @param partSpec  partition specification
+    * @throws TableNotExistException     if table does not exist in the catalog yet
+    * @throws PartitionNotExistException if partition does not exist in the catalog yet
+    * @return found partition
+    */
+  @throws[TableNotExistException]
+  @throws[PartitionNotExistException]
+  def getPartition(
+      tableName: String,
+      partSpec: JLinkedHashMap[String, String]): ExternalCatalogPartition
+
+  /**
+    * Gets the partition specification list of a table from external catalog
+    *
+    * @param tableName table name
+    * @throws CatalogNotExistException if database does not exist in the catalog yet
 
 Review comment:
   This (CatalogNotExistException) seems not needed.






[jira] [Commented] (FLINK-10656) Refactor org.apache.flink.runtime.io.network.api.reader.ReaderBase

2018-10-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10656?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16662592#comment-16662592
 ] 

ASF GitHub Bot commented on FLINK-10656:


isunjin commented on a change in pull request #6911: [FLINK-10656] Refactor 
org.apache.flink.runtime.io.network.api.reader…
URL: https://github.com/apache/flink/pull/6911#discussion_r227888205
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/TaskEventSender.java
 ##
 @@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.api.reader;
+
+import org.apache.flink.runtime.event.TaskEvent;
+
+import java.io.IOException;
+
+/**
+ * The basic API for every reader.
+ */
+public interface TaskEventSender {
+
+   void sendTaskEvent(TaskEvent event) throws IOException;
 
 Review comment:
   Yeah, maybe we just simply change the name of ```ReaderBase``` to 
```IterationReader```




> Refactor org.apache.flink.runtime.io.network.api.reader.ReaderBase
> --
>
> Key: FLINK-10656
> URL: https://issues.apache.org/jira/browse/FLINK-10656
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Reporter: JIN SUN
>Assignee: JIN SUN
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>
> The interface org.apache.flink.runtime.io.network.api.reader.ReaderBase is 
> not very clean: its APIs are called only for iteration and event handling, 
> which is unrelated to the name ReaderBase. The functionality is also 
> independent, so we propose to rename it and split the interface into two 
> isolated interfaces. 
> For more details, please see the PR.







[jira] [Commented] (FLINK-10656) Refactor org.apache.flink.runtime.io.network.api.reader.ReaderBase

2018-10-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10656?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16662587#comment-16662587
 ] 

ASF GitHub Bot commented on FLINK-10656:


isunjin commented on a change in pull request #6911: [FLINK-10656] Refactor 
org.apache.flink.runtime.io.network.api.reader…
URL: https://github.com/apache/flink/pull/6911#discussion_r227887183
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/IterationReader.java
 ##
 @@ -21,34 +21,21 @@
 import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.util.event.EventListener;
 
-import java.io.IOException;
-
 /**
- * The basic API for every reader.
+ * Reader for iteration.
  */
-public interface ReaderBase {
-
-	/**
-	 * Returns whether the reader has consumed the input.
-	 */
-	boolean isFinished();
-
-	// ------------------------------------------------------------------------
-	//  Task events
-	// ------------------------------------------------------------------------
-
-	void sendTaskEvent(TaskEvent event) throws IOException;
-
-	void registerTaskEventListener(EventListener<TaskEvent> listener, Class<? extends TaskEvent> eventType);
-
-	// ------------------------------------------------------------------------
-	//  Iterations
-	// ------------------------------------------------------------------------
+public interface IterationReader {
 
 	void setIterativeReader();
 
 	void startNextSuperstep();
 
 	boolean hasReachedEndOfSuperstep();
 
+	/**
+	 * Returns whether the reader has consumed the input.
+	 */
+	boolean isFinished();
 
 Review comment:
   Well, that's literally true; however, isFinished() is only used in the 
iteration scenario. Maybe we should rename it as well.




> Refactor org.apache.flink.runtime.io.network.api.reader.ReaderBase
> --
>
> Key: FLINK-10656
> URL: https://issues.apache.org/jira/browse/FLINK-10656
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Reporter: JIN SUN
>Assignee: JIN SUN
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>
> The interface org.apache.flink.runtime.io.network.api.reader.ReaderBase is 
> not very clean: its APIs are called only for iteration and event handling, 
> which is unrelated to the name ReaderBase. The functionality is also 
> independent, so we propose to rename it and split the interface into two 
> isolated interfaces. 
> For more details, please see the PR.







[jira] [Commented] (FLINK-10556) Integration with Apache Hive

2018-10-24 Thread Xuefu Zhang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16662551#comment-16662551
 ] 

Xuefu Zhang commented on FLINK-10556:
-

The proposal is attached.

> Integration with Apache Hive
> 
>
> Key: FLINK-10556
> URL: https://issues.apache.org/jira/browse/FLINK-10556
> Project: Flink
>  Issue Type: New Feature
>  Components: Batch Connectors and Input/Output Formats, SQL Client, 
> Table API & SQL
>Affects Versions: 1.6.0
>Reporter: Xuefu Zhang
>Assignee: Xuefu Zhang
>Priority: Major
> Attachments: Proposal_ Integrate Flink with Hive Ecosystem.pdf
>
>
> This is an umbrella JIRA tracking all enhancements and issues related to 
> integrating Flink with the Hive ecosystem. This is an outcome of a discussion in 
> the community, and thanks go to everyone who provided feedback and interest.
> Specifically, we'd like to see the following features and capabilities 
> immediately in Flink:
> # Metadata interoperability
> # Data interoperability
> # Data type compatibility
> # Hive UDF support
> # DDL/DML/Query language compatibility
> For the longer term, we'd also like to add or improve:
> # Compatible SQL service, client tools, JDBC/ODBC drivers
> # Better task failure tolerance and task scheduling
> # Support for other user customizations in Hive (storage handlers, serdes, etc.)
> I will provide more details regarding the proposal in a doc shortly. A design 
> doc, if deemed necessary, will be provided in each related sub-task under 
> this JIRA.
> Feedback and contributions are greatly welcome!





[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool

2018-10-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16662539#comment-16662539
 ] 

ASF GitHub Bot commented on FLINK-10431:


tillrohrmann commented on a change in pull request #6898: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r227849935
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
 ##
 @@ -324,78 +297,98 @@ public void disconnectResourceManager() {
		boolean allowQueuedScheduling,
		Time allocationTimeout) {

-		log.debug("Received slot request [{}] for task: {}", slotRequestId, task.getTaskToExecute());
-
-		final SlotSharingGroupId slotSharingGroupId = task.getSlotSharingGroupId();
-
-		if (slotSharingGroupId != null) {
-			// allocate slot with slot sharing
-			final SlotSharingManager multiTaskSlotManager = slotSharingManagers.computeIfAbsent(
-				slotSharingGroupId,
-				id -> new SlotSharingManager(
-					id,
-					this,
-					providerAndOwner));
-
-			final SlotSharingManager.MultiTaskSlotLocality multiTaskSlotLocality;
-
-			try {
-				if (task.getCoLocationConstraint() != null) {
-					multiTaskSlotLocality = allocateCoLocatedMultiTaskSlot(
-						task.getCoLocationConstraint(),
-						multiTaskSlotManager,
-						slotProfile,
-						allowQueuedScheduling,
-						allocationTimeout);
+		return CompletableFuture.completedFuture(null).thenComposeAsync((i) -> {
 
 Review comment:
   Shouldn't this method be only called by the new `Scheduler` which already 
wraps all external calls into the main thread? If yes, then we don't need to do 
it here again.
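The invariant under discussion, that all component state mutations run on a single "main thread", can be illustrated with a single-threaded executor and `thenComposeAsync` (a hypothetical stand-alone sketch, not Flink's actual RpcEndpoint/MainThreadExecutor machinery):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical sketch of the main-thread-confinement pattern discussed above. */
public class MainThreadConfined {

	private final ExecutorService mainThreadExecutor = Executors.newSingleThreadExecutor();

	// Written only from the main-thread executor; volatile for the final read.
	private volatile Thread mainThread;

	/**
	 * Hops the continuation onto the main-thread executor, so state touched
	 * inside the lambda needs no extra locking; returns whether the
	 * continuation ran on the same single thread every time.
	 */
	public CompletableFuture<Boolean> runsOnMainThread() {
		return CompletableFuture.completedFuture(null).thenComposeAsync(ignored -> {
			if (mainThread == null) {
				mainThread = Thread.currentThread();
			}
			return CompletableFuture.completedFuture(mainThread == Thread.currentThread());
		}, mainThreadExecutor);
	}

	public void shutdown() {
		mainThreadExecutor.shutdown();
	}
}
```

The review point is that if every external entry point already performs this hop, interior methods like the one in the diff need not wrap themselves again.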




> Extract scheduling-related code from SlotPool
> -
>
> Key: FLINK-10431
> URL: https://issues.apache.org/jira/browse/FLINK-10431
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Affects Versions: 1.7.0
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
>  Labels: pull-request-available
>
> The other half of the current scheduling logic is the management of slot 
> sharing and is located in the SlotPool. We need to extract this logic into 
> our new Scheduler component from the previous step. This leaves us with a 
> simpler SlotPool that mainly cares about obtaining, holding, and releasing 
> slots in interaction with a ResourceManager. The new Scheduler can now 
> identify slot sharing groups and interacts with the SlotPool.









[GitHub] tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler

2018-10-24 Thread GitBox
tillrohrmann commented on a change in pull request #6898: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r227877459
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
 ##
 @@ -312,7 +313,7 @@ public void validateRunsInMainThread() {
/**
 * Executor which executes runnables in the main thread context.
 */
-   protected static class MainThreadExecutor implements Executor {
+   protected static class MainThreadExecutor implements MainThreadExecutable {
 
 Review comment:
   `MainThreadExecutable` was actually intended to be a marker interface for 
the `RpcServer`. I would recommend to let the `MainThreadExecutor` implement 
the `ScheduledExecutor`. That way, there is no need to change the 
`MainThreadExecutable`.
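The suggestion — leave `MainThreadExecutable` untouched and give the executor the richer capability by implementing an `Executor`-derived interface itself — can be sketched as below. This is an illustrative stand-in, not Flink's code: the interface bodies are simplified, and the real `MainThreadExecutor` would implement Flink's `ScheduledExecutor` (which extends `Executor` with scheduling methods) rather than plain `java.util.concurrent.Executor`.

```java
import java.util.concurrent.Executor;

public class MainThreadExecutorSketch {

    // Stand-in for Flink's MainThreadExecutable, kept as a narrow contract
    // for the RpcServer, exactly as the review suggests leaving it.
    interface MainThreadExecutable {
        void runAsync(Runnable runnable);
    }

    // The executor gains capabilities by implementing an Executor-derived
    // interface itself (in Flink this would be ScheduledExecutor), instead
    // of widening MainThreadExecutable.
    static class MainThreadExecutor implements Executor {
        private final MainThreadExecutable gateway;

        MainThreadExecutor(MainThreadExecutable gateway) {
            this.gateway = gateway;
        }

        @Override
        public void execute(Runnable command) {
            gateway.runAsync(command); // delegate to the main-thread context
        }
    }

    public static int runOnce() {
        int[] counter = new int[1];
        // A synchronous stand-in for the RPC main thread.
        MainThreadExecutor executor = new MainThreadExecutor(Runnable::run);
        executor.execute(() -> counter[0]++);
        return counter[0]; // 1 after the runnable has executed
    }

    public static void main(String[] args) {
        System.out.println(runOnce()); // prints 1
    }
}
```

The point of the design is that callers keep depending on the narrow `MainThreadExecutable` contract while the executor type alone advertises the extra capability.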




[GitHub] tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler

2018-10-24 Thread GitBox
tillrohrmann commented on a change in pull request #6898: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r227872341
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolGateway.java
 ##
 @@ -145,17 +149,61 @@
 * If the returned future must not be completed right away (a.k.a. 
the slot request
 * can be queued), allowQueuedScheduling must be set to true.
 *
+* @deprecated this method will be removed once the handling of slot 
sharing is completely extracted from the slot
+* pool into a dedicated {@link Scheduler} component. The call is then 
replaced by calls to
+* {@link #getAvailableSlotsInformation()}, {@link 
#allocateAvailableSlot(SlotRequestId, AllocationID)}, and
+* {@link #requestNewAllocatedSlot(SlotRequestId, ResourceProfile, 
Time)}.
+*
 * @param slotRequestId identifying the requested slot
 * @param scheduledUnit for which to allocate slot
 * @param slotProfile profile that specifies the requirements for the 
requested slot
 * @param allowQueuedScheduling true if the slot request can be queued 
(e.g. the returned future must not be completed)
 * @param timeout for the operation
 * @return Future which is completed with the allocated {@link 
LogicalSlot}
 */
+   @Deprecated
CompletableFuture<LogicalSlot> allocateSlot(
SlotRequestId slotRequestId,
ScheduledUnit scheduledUnit,
SlotProfile slotProfile,
boolean allowQueuedScheduling,
@RpcTimeout Time timeout);
+
+   /**
+* Returns a list of {@link SlotInfo} objects about all slots that are 
currently available in the slot
+* pool.
+*
+* @return a list of {@link SlotInfo} objects about all slots that are 
currently available in the slot pool.
+*/
+   @Nonnull
+   List<SlotInfo> getAvailableSlotsInformation();
 
 Review comment:
   Let's make the return type a `Collection`.
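The reasoning behind this is the usual "declare the weakest type callers need": a `Collection` return type promises only iteration and size, so the implementation can later change its backing container without an API break. A minimal sketch, with `String` standing in for the real `SlotInfo` type:

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

public class CollectionReturnSketch {

    // Declaring Collection instead of List keeps the contract to what
    // callers actually need and leaves the backing container (here an
    // unmodifiable view over an ArrayList) an implementation detail.
    public static Collection<String> getAvailableSlotsInformation() {
        List<String> slots = new ArrayList<>();
        slots.add("slot-a");
        slots.add("slot-b");
        return Collections.unmodifiableCollection(slots);
    }

    public static void main(String[] args) {
        System.out.println(getAvailableSlotsInformation().size()); // prints 2
    }
}
```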




[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool

2018-10-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16662544#comment-16662544
 ] 

ASF GitHub Bot commented on FLINK-10431:


tillrohrmann commented on a change in pull request #6898: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r227872252
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolGateway.java
 ##
 @@ -145,17 +149,61 @@
 * If the returned future must not be completed right away (a.k.a. 
the slot request
 * can be queued), allowQueuedScheduling must be set to true.
 *
+* @deprecated this method will be removed once the handling of slot 
sharing is completely extracted from the slot
+* pool into a dedicated {@link Scheduler} component. The call is then 
replaced by calls to
+* {@link #getAvailableSlotsInformation()}, {@link 
#allocateAvailableSlot(SlotRequestId, AllocationID)}, and
+* {@link #requestNewAllocatedSlot(SlotRequestId, ResourceProfile, 
Time)}.
 
 Review comment:
   What's preventing us from removing this method?




> Extract scheduling-related code from SlotPool
> -
>
> Key: FLINK-10431
> URL: https://issues.apache.org/jira/browse/FLINK-10431
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Affects Versions: 1.7.0
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
>  Labels: pull-request-available
>
> The other half of the current scheduling logic is the management of slot 
> sharing and is located in the SlotPool. We need to extract this logic into 
> our new Scheduler component from the previous step. This leaves us with a 
> simpler SlotPool that mainly cares about obtaining, holding, and releasing 
> slots in interaction with a ResourceManager. The new Scheduler can now 
> identify slot sharing groups and interacts with the SlotPool.





[GitHub] tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler

2018-10-24 Thread GitBox
tillrohrmann commented on a change in pull request #6898: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r227871624
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
 ##
 @@ -136,12 +143,13 @@
 
private String jobManagerAddress;
 
+   private MainThreadExecutable jmMainThreadScheduledExecutor;
+
// 

 
@VisibleForTesting
-   protected SlotPool(RpcService rpcService, JobID jobId, SchedulingStrategy schedulingStrategy) {
+   protected SlotPool(JobID jobId, SchedulingStrategy schedulingStrategy) {
 
 Review comment:
   This class still contains a lot of code which actually moved to the new 
`Scheduler`. Can we remove this code? Having this code still in this class 
makes it extremely hard to review. Of course, this would mean that we also need 
to adapt the `SlotPoolTests` which makes sense anyway.




[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool

2018-10-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16662540#comment-16662540
 ] 

ASF GitHub Bot commented on FLINK-10431:


tillrohrmann commented on a change in pull request #6898: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r227848647
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
 ##
 @@ -324,78 +297,98 @@ public void disconnectResourceManager() {
boolean allowQueuedScheduling,
Time allocationTimeout) {
 
-   log.debug("Received slot request [{}] for task: {}", slotRequestId, task.getTaskToExecute());
-
-   final SlotSharingGroupId slotSharingGroupId = task.getSlotSharingGroupId();
-
-   if (slotSharingGroupId != null) {
-       // allocate slot with slot sharing
-       final SlotSharingManager multiTaskSlotManager = slotSharingManagers.computeIfAbsent(
-           slotSharingGroupId,
-           id -> new SlotSharingManager(
-               id,
-               this,
-               providerAndOwner));
-
-       final SlotSharingManager.MultiTaskSlotLocality multiTaskSlotLocality;
-
-       try {
-           if (task.getCoLocationConstraint() != null) {
-               multiTaskSlotLocality = allocateCoLocatedMultiTaskSlot(
-                   task.getCoLocationConstraint(),
-                   multiTaskSlotManager,
-                   slotProfile,
-                   allowQueuedScheduling,
-                   allocationTimeout);
+   return CompletableFuture.completedFuture(null).thenComposeAsync((i) -> {
 
 Review comment:
   `CompletableFuture.supplyAsync()`
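For context, the two forms differ only in how the work is handed to the executor: `completedFuture(null).thenComposeAsync(fn, executor)` builds a dummy completed future just to get onto the target executor, while `supplyAsync(supplier, executor)` states that intent directly. A self-contained comparison sketch (a plain single-thread pool stands in for the job master's main-thread executor; the value and method names are illustrative):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SupplyAsyncSketch {

    // Both futures complete with the same value; only the hand-off style differs.
    public static String allocateOn(Executor mainThreadExecutor) {
        // Form used in the PR: a dummy completed future, then a composition
        // that is forced onto the target executor.
        CompletableFuture<String> verbose = CompletableFuture
            .completedFuture(null)
            .thenComposeAsync(
                ignored -> CompletableFuture.completedFuture("slot-1"),
                mainThreadExecutor);

        // Form suggested in the review: state the intent directly.
        CompletableFuture<String> direct =
            CompletableFuture.supplyAsync(() -> "slot-1", mainThreadExecutor);

        return verbose.join() + "/" + direct.join();
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            System.out.println(allocateOn(pool)); // prints slot-1/slot-1
        } finally {
            pool.shutdown();
        }
    }
}
```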




> Extract scheduling-related code from SlotPool
> -
>
> Key: FLINK-10431
> URL: https://issues.apache.org/jira/browse/FLINK-10431
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Affects Versions: 1.7.0
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
>  Labels: pull-request-available
>
> The other half of the current scheduling logic is the management of slot 
> sharing and is located in the SlotPool. We need to extract this logic into 
> our new Scheduler component from the previous step. This leaves us with a 
> simpler SlotPool that mainly cares about obtaining, holding, and releasing 
> slots in interaction with a ResourceManager. The new Scheduler can now 
> identify slot sharing groups and interacts with the SlotPool.





[GitHub] tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler

2018-10-24 Thread GitBox
tillrohrmann commented on a change in pull request #6898: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r227872749
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolGateway.java
 ##
 @@ -145,17 +149,61 @@
 * If the returned future must not be completed right away (a.k.a. 
the slot request
 * can be queued), allowQueuedScheduling must be set to true.
 *
+* @deprecated this method will be removed once the handling of slot 
sharing is completely extracted from the slot
+* pool into a dedicated {@link Scheduler} component. The call is then 
replaced by calls to
+* {@link #getAvailableSlotsInformation()}, {@link 
#allocateAvailableSlot(SlotRequestId, AllocationID)}, and
+* {@link #requestNewAllocatedSlot(SlotRequestId, ResourceProfile, 
Time)}.
+*
 * @param slotRequestId identifying the requested slot
 * @param scheduledUnit for which to allocate slot
 * @param slotProfile profile that specifies the requirements for the 
requested slot
 * @param allowQueuedScheduling true if the slot request can be queued 
(e.g. the returned future must not be completed)
 * @param timeout for the operation
 * @return Future which is completed with the allocated {@link 
LogicalSlot}
 */
+   @Deprecated
CompletableFuture<LogicalSlot> allocateSlot(
SlotRequestId slotRequestId,
ScheduledUnit scheduledUnit,
SlotProfile slotProfile,
boolean allowQueuedScheduling,
@RpcTimeout Time timeout);
+
+   /**
+* Returns a list of {@link SlotInfo} objects about all slots that are 
currently available in the slot
+* pool.
+*
+* @return a list of {@link SlotInfo} objects about all slots that are 
currently available in the slot pool.
+*/
+   @Nonnull
+   List<SlotInfo> getAvailableSlotsInformation();
+
+   /**
+* Allocates the available slot with the given allocation id under the 
given request id. This method returns
+* {@code null} if no slot with the given allocation id is available.
+*
+* @param slotRequestId identifying the requested slot
+* @param allocationID the allocation id of the requested available slot
+* @return the previously available slot with the given allocation id 
or {@code null} if no such slot existed.
+*/
+   @Nullable
+   AllocatedSlot allocateAvailableSlot(
 
 Review comment:
   It's not so nice that we leak the `AllocatedSlot` which is supposed to be an 
internal class of the `SlotPool`. I think it would be better to have an 
interface which allows to do what's needed but, for example, does not expose 
the `releasePayload` call.
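The idea of hiding the internal `AllocatedSlot` behind a narrow interface can be sketched as follows. The interface name `SlotView` and all member names are illustrative, not taken from the PR; the point is only that the gateway's return type exposes read access while lifecycle methods such as `releasePayload` stay internal to the pool:

```java
public class SlotInterfaceSketch {

    // Narrow view handed out across the gateway; exposes only what external
    // callers need.
    public interface SlotView {
        String allocationId();
    }

    // Internal class of the pool. Lifecycle methods like releasePayload()
    // stay invisible behind SlotView.
    static final class AllocatedSlot implements SlotView {
        @Override
        public String allocationId() {
            return "alloc-1";
        }

        void releasePayload() {
            // internal cleanup; deliberately not part of SlotView
        }
    }

    public static SlotView allocateAvailableSlot() {
        return new AllocatedSlot();
    }

    public static void main(String[] args) {
        SlotView slot = allocateAvailableSlot();
        System.out.println(slot.allocationId()); // prints alloc-1
        // slot.releasePayload() would not compile here: the method is hidden
        // behind the SlotView interface.
    }
}
```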




[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool

2018-10-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16662546#comment-16662546
 ] 

ASF GitHub Bot commented on FLINK-10431:


tillrohrmann commented on a change in pull request #6898: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r227877459
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
 ##
 @@ -312,7 +313,7 @@ public void validateRunsInMainThread() {
/**
 * Executor which executes runnables in the main thread context.
 */
-   protected static class MainThreadExecutor implements Executor {
+   protected static class MainThreadExecutor implements MainThreadExecutable {
 
 Review comment:
   `MainThreadExecutable` was actually intended to be a marker interface for 
the `RpcServer`. I would recommend to let the `MainThreadExecutor` implement 
the `ScheduledExecutor`. That way, there is no need to change the 
`MainThreadExecutable`.




> Extract scheduling-related code from SlotPool
> -
>
> Key: FLINK-10431
> URL: https://issues.apache.org/jira/browse/FLINK-10431
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Affects Versions: 1.7.0
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
>  Labels: pull-request-available
>
> The other half of the current scheduling logic is the management of slot 
> sharing and is located in the SlotPool. We need to extract this logic into 
> our new Scheduler component from the previous step. This leaves us with a 
> simpler SlotPool that mainly cares about obtaining, holding, and releasing 
> slots in interaction with a ResourceManager. The new Scheduler can now 
> identify slot sharing groups and interacts with the SlotPool.





[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool

2018-10-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16662547#comment-16662547
 ] 

ASF GitHub Bot commented on FLINK-10431:


tillrohrmann commented on a change in pull request #6898: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r227870899
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
 ##
 @@ -822,6 +835,29 @@ private SlotAndLocality pollAndAllocateSlot(SlotRequestId 
slotRequestId, SlotPro
return slotFromPool;
}
 
+   @Nullable
+   private AllocatedSlot allocateSlotWithID(@Nonnull SlotRequestId slotRequestId, @Nonnull AllocationID allocationID) {
+   AllocatedSlot allocatedSlot = availableSlots.tryRemove(allocationID);
+   if (allocatedSlot != null) {
+   allocatedSlots.add(slotRequestId, allocatedSlot);
+   }
+   return allocatedSlot;
+   }
+
+   @Override
+   @Nullable
+   public AllocatedSlot allocateAvailableSlot(
 
 Review comment:
   Could think of whether to make it an `Optional` to make it more explicit 
that this might not succeed.
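An `Optional`-returning variant of the nullable lookup can be sketched as below; `String` stands in for the real `AllocationID` and `AllocatedSlot` types, and the method names follow the review's vocabulary rather than the actual code. `Optional` makes the "no such slot" outcome part of the signature, where `@Nullable` relies on the caller noticing the annotation:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public class OptionalAllocationSketch {

    private static final Map<String, String> availableSlots = new HashMap<>();

    public static void offerSlot(String allocationId, String slot) {
        availableSlots.put(allocationId, slot);
    }

    // Removing the entry mirrors moving a slot from "available" to
    // "allocated"; an empty Optional signals that allocation failed.
    public static Optional<String> allocateAvailableSlot(String allocationId) {
        return Optional.ofNullable(availableSlots.remove(allocationId));
    }

    public static void main(String[] args) {
        offerSlot("alloc-1", "slot-1");
        System.out.println(allocateAvailableSlot("alloc-1").isPresent()); // true
        System.out.println(allocateAvailableSlot("alloc-1").isPresent()); // false
    }
}
```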




> Extract scheduling-related code from SlotPool
> -
>
> Key: FLINK-10431
> URL: https://issues.apache.org/jira/browse/FLINK-10431
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Affects Versions: 1.7.0
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
>  Labels: pull-request-available
>
> The other half of the current scheduling logic is the management of slot 
> sharing and is located in the SlotPool. We need to extract this logic into 
> our new Scheduler component from the previous step. This leaves us with a 
> simpler SlotPool that mainly cares about obtaining, holding, and releasing 
> slots in interaction with a ResourceManager. The new Scheduler can now 
> identify slot sharing groups and interacts with the SlotPool.





[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool

2018-10-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16662542#comment-16662542
 ] 

ASF GitHub Bot commented on FLINK-10431:


tillrohrmann commented on a change in pull request #6898: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r227872881
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolGateway.java
 ##
 @@ -145,17 +149,61 @@
 * If the returned future must not be completed right away (a.k.a. 
the slot request
 * can be queued), allowQueuedScheduling must be set to true.
 *
+* @deprecated this method will be removed once the handling of slot 
sharing is completely extracted from the slot
+* pool into a dedicated {@link Scheduler} component. The call is then 
replaced by calls to
+* {@link #getAvailableSlotsInformation()}, {@link 
#allocateAvailableSlot(SlotRequestId, AllocationID)}, and
+* {@link #requestNewAllocatedSlot(SlotRequestId, ResourceProfile, 
Time)}.
+*
 * @param slotRequestId identifying the requested slot
 * @param scheduledUnit for which to allocate slot
 * @param slotProfile profile that specifies the requirements for the 
requested slot
 * @param allowQueuedScheduling true if the slot request can be queued 
(e.g. the returned future must not be completed)
 * @param timeout for the operation
 * @return Future which is completed with the allocated {@link 
LogicalSlot}
 */
+   @Deprecated
CompletableFuture<LogicalSlot> allocateSlot(
SlotRequestId slotRequestId,
ScheduledUnit scheduledUnit,
SlotProfile slotProfile,
boolean allowQueuedScheduling,
@RpcTimeout Time timeout);
+
+   /**
+* Returns a list of {@link SlotInfo} objects about all slots that are 
currently available in the slot
+* pool.
+*
+* @return a list of {@link SlotInfo} objects about all slots that are 
currently available in the slot pool.
+*/
+   @Nonnull
+   List<SlotInfo> getAvailableSlotsInformation();
+
+   /**
+* Allocates the available slot with the given allocation id under the 
given request id. This method returns
+* {@code null} if no slot with the given allocation id is available.
+*
+* @param slotRequestId identifying the requested slot
+* @param allocationID the allocation id of the requested available slot
+* @return the previously available slot with the given allocation id 
or {@code null} if no such slot existed.
+*/
+   @Nullable
+   AllocatedSlot allocateAvailableSlot(
+   @Nonnull SlotRequestId slotRequestId,
+   @Nonnull AllocationID allocationID);
+
+   /**
+* Request the allocation of a new slot from the resource manager. This 
method will not return a slot from the
+* already available slots from the pool, but instead will add a new 
slot to that pool that is immediately allocated
+* and returned.
+*
+* @param slotRequestId identifying the requested slot
+* @param resourceProfile resource profile that specifies the resource 
requirements for the requested slot
+* @param timeout timeout for the allocation procedure
+* @return a newly allocated slot that was previously not available.
+*/
+   @Nonnull
+   CompletableFuture<AllocatedSlot> requestNewAllocatedSlot(
 
 Review comment:
   Same here with the `AllocatedSlot`




> Extract scheduling-related code from SlotPool
> -
>
> Key: FLINK-10431
> URL: https://issues.apache.org/jira/browse/FLINK-10431
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Affects Versions: 1.7.0
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
>  Labels: pull-request-available
>
> The other half of the current scheduling logic is the management of slot 
> sharing and is located in the SlotPool. We need to extract this logic into 
> our new Scheduler component from the previous step. This leaves us with a 
> simpler SlotPool that mainly cares about obtaining, holding, and releasing 
> slots in interaction with a ResourceManager. The new Scheduler can now 
> identify slot sharing groups and interacts with the SlotPool.

[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool

2018-10-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16662548#comment-16662548
 ] 

ASF GitHub Bot commented on FLINK-10431:


tillrohrmann commented on a change in pull request #6898: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r227872948
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotProvider.java
 ##
 @@ -90,7 +90,7 @@
 * @param cause of the cancellation
 * @return Future which is completed once the slot request has been 
cancelled
 */
-   CompletableFuture<Acknowledge> cancelSlotRequest(
+   Acknowledge cancelSlotRequest(
 
 Review comment:
   Can this become `void`?




> Extract scheduling-related code from SlotPool
> -
>
> Key: FLINK-10431
> URL: https://issues.apache.org/jira/browse/FLINK-10431
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Affects Versions: 1.7.0
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
>  Labels: pull-request-available
>
> The other half of the current scheduling logic is the management of slot 
> sharing and is located in the SlotPool. We need to extract this logic into 
> our new Scheduler component from the previous step. This leaves us with a 
> simpler SlotPool that mainly cares about obtaining, holding, and releasing 
> slots in interaction with a ResourceManager. The new Scheduler can now 
> identify slot sharing groups and interacts with the SlotPool.





[GitHub] tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler

2018-10-24 Thread GitBox
tillrohrmann commented on a change in pull request #6898: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r227870899
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
 ##
 @@ -822,6 +835,29 @@ private SlotAndLocality pollAndAllocateSlot(SlotRequestId 
slotRequestId, SlotPro
return slotFromPool;
}
 
+   @Nullable
+   private AllocatedSlot allocateSlotWithID(@Nonnull SlotRequestId slotRequestId, @Nonnull AllocationID allocationID) {
+   AllocatedSlot allocatedSlot = availableSlots.tryRemove(allocationID);
+   if (allocatedSlot != null) {
+   allocatedSlots.add(slotRequestId, allocatedSlot);
+   }
+   return allocatedSlot;
+   }
+
+   @Override
+   @Nullable
+   public AllocatedSlot allocateAvailableSlot(
 
 Review comment:
   Could think of whether to make it an `Optional` to make it more explicit 
that this might not succeed.




[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool

2018-10-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16662543#comment-16662543
 ] 

ASF GitHub Bot commented on FLINK-10431:


tillrohrmann commented on a change in pull request #6898: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r227871624
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
 ##
 @@ -136,12 +143,13 @@
 
private String jobManagerAddress;
 
+   private MainThreadExecutable jmMainThreadScheduledExecutor;
+
// 

 
@VisibleForTesting
-   protected SlotPool(RpcService rpcService, JobID jobId, SchedulingStrategy schedulingStrategy) {
+   protected SlotPool(JobID jobId, SchedulingStrategy schedulingStrategy) {
 
 Review comment:
   This class still contains a lot of code which actually moved to the new 
`Scheduler`. Can we remove this code? Having this code still in this class 
makes it extremely hard to review. Of course, this would mean that we also need 
to adapt the `SlotPoolTests` which makes sense anyway.




> Extract scheduling-related code from SlotPool
> -
>
> Key: FLINK-10431
> URL: https://issues.apache.org/jira/browse/FLINK-10431
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Affects Versions: 1.7.0
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
>  Labels: pull-request-available
>
> The other half of the current scheduling logic is the management of slot 
> sharing and is located in the SlotPool. We need to extract this logic into 
> our new Scheduler component from the previous step. This leaves us with a 
> simpler SlotPool that mainly cares about obtaining, holding, and releasing 
> slots in interaction with a ResourceManager. The new Scheduler can now 
> identify slot sharing groups and interacts with the SlotPool.





[GitHub] tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler

2018-10-24 Thread GitBox
tillrohrmann commented on a change in pull request #6898: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r227848647
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
 ##
 @@ -324,78 +297,98 @@ public void disconnectResourceManager() {
boolean allowQueuedScheduling,
Time allocationTimeout) {
 
-   log.debug("Received slot request [{}] for task: {}", slotRequestId, task.getTaskToExecute());
-
-   final SlotSharingGroupId slotSharingGroupId = task.getSlotSharingGroupId();
-
-   if (slotSharingGroupId != null) {
-       // allocate slot with slot sharing
-       final SlotSharingManager multiTaskSlotManager = slotSharingManagers.computeIfAbsent(
-           slotSharingGroupId,
-           id -> new SlotSharingManager(
-               id,
-               this,
-               providerAndOwner));
-
-       final SlotSharingManager.MultiTaskSlotLocality multiTaskSlotLocality;
-
-       try {
-           if (task.getCoLocationConstraint() != null) {
-               multiTaskSlotLocality = allocateCoLocatedMultiTaskSlot(
-                   task.getCoLocationConstraint(),
-                   multiTaskSlotManager,
-                   slotProfile,
-                   allowQueuedScheduling,
-                   allocationTimeout);
+   return CompletableFuture.completedFuture(null).thenComposeAsync((i) -> {
 
 Review comment:
   `CompletableFuture.supplyAsync()`




[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool

2018-10-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16662541#comment-16662541
 ] 

ASF GitHub Bot commented on FLINK-10431:


tillrohrmann commented on a change in pull request #6898: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r227872341
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolGateway.java
 ##
 @@ -145,17 +149,61 @@
 	 * If the returned future must not be completed right away (a.k.a. the slot request
 	 * can be queued), allowQueuedScheduling must be set to true.
 	 *
+	 * @deprecated this method will be removed once the handling of slot sharing is completely extracted from the slot
+	 * pool into a dedicated {@link Scheduler} component. The call is then replaced by calls to
+	 * {@link #getAvailableSlotsInformation()}, {@link #allocateAvailableSlot(SlotRequestId, AllocationID)}, and
+	 * {@link #requestNewAllocatedSlot(SlotRequestId, ResourceProfile, Time)}.
+	 *
 	 * @param slotRequestId identifying the requested slot
 	 * @param scheduledUnit for which to allocate slot
 	 * @param slotProfile profile that specifies the requirements for the requested slot
 	 * @param allowQueuedScheduling true if the slot request can be queued (e.g. the returned future must not be completed)
 	 * @param timeout for the operation
 	 * @return Future which is completed with the allocated {@link LogicalSlot}
 	 */
+	@Deprecated
 	CompletableFuture<LogicalSlot> allocateSlot(
 		SlotRequestId slotRequestId,
 		ScheduledUnit scheduledUnit,
 		SlotProfile slotProfile,
 		boolean allowQueuedScheduling,
 		@RpcTimeout Time timeout);
+
+	/**
+	 * Returns a list of {@link SlotInfo} objects about all slots that are currently available in the slot pool.
+	 *
+	 * @return a list of {@link SlotInfo} objects about all slots that are currently available in the slot pool.
+	 */
+	@Nonnull
+	List<SlotInfo> getAvailableSlotsInformation();
 
 Review comment:
   Let's make the return type a `Collection`.




> Extract scheduling-related code from SlotPool
> -
>
> Key: FLINK-10431
> URL: https://issues.apache.org/jira/browse/FLINK-10431
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Affects Versions: 1.7.0
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
>  Labels: pull-request-available
>
> The other half of the current scheduling logic is the management of slot 
> sharing and is located in the SlotPool. We need to extract this logic into 
> our new Scheduler component from the previous step. This leaves us with a 
> simpler SlotPool that mainly cares about obtaining, holding, and releasing 
> slots in interaction with a ResourceManager. The new Scheduler can now 
> identify slot sharing groups and interacts with the SlotPool.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler

2018-10-24 Thread GitBox
tillrohrmann commented on a change in pull request #6898: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r227872948
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotProvider.java
 ##
 @@ -90,7 +90,7 @@
 	 * @param cause of the cancellation
 	 * @return Future which is completed once the slot request has been cancelled
 	 */
-	CompletableFuture<Acknowledge> cancelSlotRequest(
+	Acknowledge cancelSlotRequest(
 
 Review comment:
   Can this become `void`?




[GitHub] tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler

2018-10-24 Thread GitBox
tillrohrmann commented on a change in pull request #6898: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r227872252
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolGateway.java
 ##
 @@ -145,17 +149,61 @@
 	 * If the returned future must not be completed right away (a.k.a. the slot request
 	 * can be queued), allowQueuedScheduling must be set to true.
 	 *
+	 * @deprecated this method will be removed once the handling of slot sharing is completely extracted from the slot
+	 * pool into a dedicated {@link Scheduler} component. The call is then replaced by calls to
+	 * {@link #getAvailableSlotsInformation()}, {@link #allocateAvailableSlot(SlotRequestId, AllocationID)}, and
+	 * {@link #requestNewAllocatedSlot(SlotRequestId, ResourceProfile, Time)}.
 
 Review comment:
   What's preventing us from removing this method?




[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool

2018-10-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16662545#comment-16662545
 ] 

ASF GitHub Bot commented on FLINK-10431:


tillrohrmann commented on a change in pull request #6898: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r227872749
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolGateway.java
 ##
 @@ -145,17 +149,61 @@
 	 * If the returned future must not be completed right away (a.k.a. the slot request
 	 * can be queued), allowQueuedScheduling must be set to true.
 	 *
+	 * @deprecated this method will be removed once the handling of slot sharing is completely extracted from the slot
+	 * pool into a dedicated {@link Scheduler} component. The call is then replaced by calls to
+	 * {@link #getAvailableSlotsInformation()}, {@link #allocateAvailableSlot(SlotRequestId, AllocationID)}, and
+	 * {@link #requestNewAllocatedSlot(SlotRequestId, ResourceProfile, Time)}.
+	 *
 	 * @param slotRequestId identifying the requested slot
 	 * @param scheduledUnit for which to allocate slot
 	 * @param slotProfile profile that specifies the requirements for the requested slot
 	 * @param allowQueuedScheduling true if the slot request can be queued (e.g. the returned future must not be completed)
 	 * @param timeout for the operation
 	 * @return Future which is completed with the allocated {@link LogicalSlot}
 	 */
+	@Deprecated
 	CompletableFuture<LogicalSlot> allocateSlot(
 		SlotRequestId slotRequestId,
 		ScheduledUnit scheduledUnit,
 		SlotProfile slotProfile,
 		boolean allowQueuedScheduling,
 		@RpcTimeout Time timeout);
+
+	/**
+	 * Returns a list of {@link SlotInfo} objects about all slots that are currently available in the slot pool.
+	 *
+	 * @return a list of {@link SlotInfo} objects about all slots that are currently available in the slot pool.
+	 */
+	@Nonnull
+	List<SlotInfo> getAvailableSlotsInformation();
+
+	/**
+	 * Allocates the available slot with the given allocation id under the given request id. This method returns
+	 * {@code null} if no slot with the given allocation id is available.
+	 *
+	 * @param slotRequestId identifying the requested slot
+	 * @param allocationID the allocation id of the requested available slot
+	 * @return the previously available slot with the given allocation id or {@code null} if no such slot existed.
+	 */
+	@Nullable
+	AllocatedSlot allocateAvailableSlot(
 
 Review comment:
   It's not so nice that we leak the `AllocatedSlot` which is supposed to be an 
internal class of the `SlotPool`. I think it would be better to have an 
interface which allows to do what's needed but, for example, does not expose 
the `releasePayload` call.
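The suggested encapsulation can be sketched in isolation: return a narrow, read-only interface while the pool keeps the full internal class to itself. All names below (`SlotView`, `SlotPoolSketch`, the string-typed ids) are illustrative stand-ins, not actual Flink API:

```java
// Narrow view exposed to callers such as the Scheduler.
interface SlotView {
	String getAllocationId();
}

// Internal class; implements the view but keeps extra operations package-private.
final class AllocatedSlot implements SlotView {
	private final String allocationId;
	private Object payload = new Object();

	AllocatedSlot(String allocationId) {
		this.allocationId = allocationId;
	}

	@Override
	public String getAllocationId() {
		return allocationId;
	}

	// Internal lifecycle operation the interface deliberately does not expose.
	void releasePayload() {
		payload = null;
	}
}

final class SlotPoolSketch {
	private final AllocatedSlot slot = new AllocatedSlot("alloc-1");

	// Declaring the return type as the interface hides releasePayload()
	// from callers; the slotRequestId parameter is only illustrative here.
	SlotView allocateAvailableSlot(String slotRequestId) {
		return slot;
	}
}
```

With this shape, the gateway method's signature no longer leaks the pool-internal class, which is the point of the review comment.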









[GitHub] tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler

2018-10-24 Thread GitBox
tillrohrmann commented on a change in pull request #6898: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r227872881
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolGateway.java
 ##
 @@ -145,17 +149,61 @@
 	 * If the returned future must not be completed right away (a.k.a. the slot request
 	 * can be queued), allowQueuedScheduling must be set to true.
 	 *
+	 * @deprecated this method will be removed once the handling of slot sharing is completely extracted from the slot
+	 * pool into a dedicated {@link Scheduler} component. The call is then replaced by calls to
+	 * {@link #getAvailableSlotsInformation()}, {@link #allocateAvailableSlot(SlotRequestId, AllocationID)}, and
+	 * {@link #requestNewAllocatedSlot(SlotRequestId, ResourceProfile, Time)}.
+	 *
 	 * @param slotRequestId identifying the requested slot
 	 * @param scheduledUnit for which to allocate slot
 	 * @param slotProfile profile that specifies the requirements for the requested slot
 	 * @param allowQueuedScheduling true if the slot request can be queued (e.g. the returned future must not be completed)
 	 * @param timeout for the operation
 	 * @return Future which is completed with the allocated {@link LogicalSlot}
 	 */
+	@Deprecated
 	CompletableFuture<LogicalSlot> allocateSlot(
 		SlotRequestId slotRequestId,
 		ScheduledUnit scheduledUnit,
 		SlotProfile slotProfile,
 		boolean allowQueuedScheduling,
 		@RpcTimeout Time timeout);
+
+	/**
+	 * Returns a list of {@link SlotInfo} objects about all slots that are currently available in the slot pool.
+	 *
+	 * @return a list of {@link SlotInfo} objects about all slots that are currently available in the slot pool.
+	 */
+	@Nonnull
+	List<SlotInfo> getAvailableSlotsInformation();
+
+	/**
+	 * Allocates the available slot with the given allocation id under the given request id. This method returns
+	 * {@code null} if no slot with the given allocation id is available.
+	 *
+	 * @param slotRequestId identifying the requested slot
+	 * @param allocationID the allocation id of the requested available slot
+	 * @return the previously available slot with the given allocation id or {@code null} if no such slot existed.
+	 */
+	@Nullable
+	AllocatedSlot allocateAvailableSlot(
+		@Nonnull SlotRequestId slotRequestId,
+		@Nonnull AllocationID allocationID);
+
+	/**
+	 * Request the allocation of a new slot from the resource manager. This method will not return a slot from the
+	 * already available slots from the pool, but instead will add a new slot to that pool that is immediately allocated
+	 * and returned.
+	 *
+	 * @param slotRequestId identifying the requested slot
+	 * @param resourceProfile resource profile that specifies the resource requirements for the requested slot
+	 * @param timeout timeout for the allocation procedure
+	 * @return a newly allocated slot that was previously not available.
+	 */
+	@Nonnull
+	CompletableFuture<AllocatedSlot> requestNewAllocatedSlot(
 
 Review comment:
   Same here with the `AllocatedSlot`




[jira] [Commented] (FLINK-10657) TPCHQuery3 fail with IllegalAccessException

2018-10-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16662521#comment-16662521
 ] 

ASF GitHub Bot commented on FLINK-10657:


isunjin commented on issue #6912: [FLINK-10657] TPCHQuery3 fail with 
IllegalAccessException
URL: https://github.com/apache/flink/pull/6912#issuecomment-432735636
 
 
   > ```
   > 00:58:26.666 [ERROR] 
src/main/java/org/apache/flink/examples/java/relational/TPCHQuery3.java:[241] 
(javadoc) JavadocType: Missing a Javadoc comment.
   > ```
   
   Thanks for the review.




> TPCHQuery3 fail with IllegalAccessException
> ---
>
> Key: FLINK-10657
> URL: https://issues.apache.org/jira/browse/FLINK-10657
> Project: Flink
>  Issue Type: Bug
>  Components: E2E Tests
>Reporter: JIN SUN
>Assignee: JIN SUN
>Priority: Trivial
>  Labels: easy-fix, pull-request-available
> Fix For: 1.7.0
>
>
> Similar to [FLINK-7998|https://issues.apache.org/jira/browse/FLINK-7998], 
> ShoppingPriorityItem in the TPCHQuery3.java example is declared private. This 
> causes an IllegalAccessException because of the reflection access check during 
> dynamic class instantiation. Making it public resolves the problem.
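The failure mode is easy to reproduce with plain reflection, independent of Flink. A minimal sketch (the `Holder`, `PrivateItem`, and `PackageItem` names are made up for illustration):

```java
import java.lang.reflect.Constructor;

// Analogue of the example job's POJO: one private nested type, one accessible one.
class Holder {
	private static class PrivateItem {
		PrivateItem() { }
	}

	static class PackageItem {
		public PackageItem() { }
	}
}

class ReflectionAccessDemo {

	// Attempts the kind of reflective instantiation a framework performs.
	// Without setAccessible(true), instantiating a private nested class from
	// another top-level class fails with IllegalAccessException.
	static boolean canInstantiate(String className) {
		try {
			Constructor<?> ctor = Class.forName(className).getDeclaredConstructor();
			return ctor.newInstance() != null;
		} catch (ReflectiveOperationException e) {
			// IllegalAccessException for inaccessible classes/constructors
			return false;
		}
	}

	public static void main(String[] args) {
		System.out.println(canInstantiate("Holder$PrivateItem"));
		System.out.println(canInstantiate("Holder$PackageItem"));
	}
}
```

Widening the class's visibility, as the fix does, makes the reflective path succeed without any `setAccessible` workaround.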







[jira] [Commented] (FLINK-8995) Add a test operator with keyed state that uses custom, stateful serializer

2018-10-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8995?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16662491#comment-16662491
 ] 

ASF GitHub Bot commented on FLINK-8995:
---

tzulitai commented on a change in pull request #6909: [FLINK-8995][tests] Add 
keyed state that uses custom stateful seriali…
URL: https://github.com/apache/flink/pull/6909#discussion_r227863415
 
 

 ##
 File path: 
flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/StatefulComplexPayloadSerializer.java
 ##
 @@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests.artificialstate;
+
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.ParameterlessTypeSerializerConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.InstantiationUtil;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * A custom stateful serializer to test that serializers are not used concurrently.
+ */
+public class StatefulComplexPayloadSerializer extends TypeSerializer<ComplexPayload> {
+
+   private static final long serialVersionUID = 8766687317209282373L;
+
+	/** This holds the thread that currently has exclusive ownership over the serializer. */
+	private final AtomicReference<Thread> currentOwnerThread;
+
+   public StatefulComplexPayloadSerializer() {
+   this.currentOwnerThread = new AtomicReference<>(null);
+   }
+
+   @Override
+   public boolean isImmutableType() {
+   return false;
+   }
+
+   @Override
+	public TypeSerializer<ComplexPayload> duplicate() {
+   return new StatefulComplexPayloadSerializer();
+   }
+
+   @Override
+   public ComplexPayload createInstance() {
+   throw new UnsupportedOperationException();
+   }
+
+   @Override
+   public ComplexPayload copy(ComplexPayload from) {
+   try {
+			if (currentOwnerThread.compareAndSet(null, Thread.currentThread())) {
+				return InstantiationUtil.deserializeObject(
+					InstantiationUtil.serializeObject(from),
+					Thread.currentThread().getContextClassLoader());
 
 Review comment:
   Can avoid multiple invocations on `Thread.currentThread()`:
   
   ```
   Thread currentThread = Thread.currentThread();
   if (currentOwnerThread.compareAndSet(null, currentThread)) {
       return deserializeObject(serializeObject(from), currentThread.getContextClassLoader());
   }
   ```
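The ownership check the serializer relies on can be distilled into a small guard. A sketch assuming made-up names (`SingleOwnerGuard` is not a Flink class), with `Thread.currentThread()` hoisted into a local as the review suggests:

```java
import java.util.concurrent.atomic.AtomicReference;

// The first thread to enter claims the guard via compareAndSet; re-entry by
// the same thread succeeds, while any other thread is rejected, which is how
// accidental concurrent use of a single serializer instance gets detected.
class SingleOwnerGuard {
	private final AtomicReference<Thread> owner = new AtomicReference<>(null);

	boolean tryEnter() {
		// Hoisted once instead of calling Thread.currentThread() repeatedly.
		Thread current = Thread.currentThread();
		return owner.compareAndSet(null, current) || owner.get() == current;
	}

	void exit() {
		// Only the owning thread can release the guard.
		owner.compareAndSet(Thread.currentThread(), null);
	}
}
```

In the actual test serializer, a failed claim signals that two threads shared one serializer instead of calling `duplicate()`.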




> Add a test operator with keyed state that uses custom, stateful serializer
> --
>
> Key: FLINK-8995
> URL: https://issues.apache.org/jira/browse/FLINK-8995
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.5.0
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> This test should figure out problems in places where multiple threads would 
> share the same serializer instead of properly duplicating it.







[jira] [Commented] (FLINK-10570) State grows unbounded when "within" constraint not applied

2018-10-24 Thread Dawid Wysakowicz (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10570?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16662481#comment-16662481
 ] 

Dawid Wysakowicz commented on FLINK-10570:
--

Fixed in master: fa085699ee5a8a90492952ae05bfc78ded3d1ec3
Fixed in 1.6: 8b8422c86ae2ef8b657c395cf3a10fc5f2180b84

> State grows unbounded when "within" constraint not applied
> --
>
> Key: FLINK-10570
> URL: https://issues.apache.org/jira/browse/FLINK-10570
> Project: Flink
>  Issue Type: Bug
>  Components: CEP
>Affects Versions: 1.6.1
>Reporter: Thomas Wozniakowski
>Assignee: Dawid Wysakowicz
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.3, 1.7.0
>
>
> We have been running some failure monitoring using the CEP library. Simple 
> stuff that should probably have been implemented with a window, rather than 
> CEP, but we had already set the project up to use CEP elsewhere and it was 
> trivial to add this.
> We ran the following pattern (on 1.4.2):
> {code:java}
> begin(PURCHASE_SEQUENCE, AfterMatchSkipStrategy.skipPastLastEvent())
> .subtype(PurchaseEvent.class)
> .times(100)
> {code}
> and then flat selected the responses if the failure ratio was over a certain 
> threshold.
> With 1.6.1, the state size of the CEP operator for this pattern grows 
> unbounded, and eventually destroys the job with an OOM exception. We have 
> many CEP operators in this job but all the rest use a "within" call.
> In 1.4.2, it seems events would be discarded once they were no longer in the 
> 100 most recent, now it seems they are held onto indefinitely. 
> We have a workaround (we're just going to add a "within" call to force the 
> CEP operator to discard old events), but it would be useful if we could have 
> the old behaviour back.
> Please let me know if I can provide any more information.





[jira] [Closed] (FLINK-10570) State grows unbounded when "within" constraint not applied

2018-10-24 Thread Dawid Wysakowicz (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10570?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dawid Wysakowicz closed FLINK-10570.

   Resolution: Fixed
Fix Version/s: 1.7.0, 1.6.3






[jira] [Updated] (FLINK-10670) Fix Correlate codegen error

2018-10-24 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10670?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-10670:
---
Labels: pull-request-available  (was: )

> Fix Correlate codegen error
> ---
>
> Key: FLINK-10670
> URL: https://issues.apache.org/jira/browse/FLINK-10670
> Project: Flink
>  Issue Type: Bug
>  Components: Table API  SQL
>Reporter: Ruidong Li
>Assignee: Ruidong Li
>Priority: Major
>  Labels: pull-request-available
>
> TableFunctionCollector should handle reuseInitCode and reuseMemberCode





[jira] [Commented] (FLINK-10670) Fix Correlate codegen error

2018-10-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10670?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16662476#comment-16662476
 ] 

ASF GitHub Bot commented on FLINK-10670:


Xpray opened a new pull request #6923: [FLINK-10670] [table] Fix Correlate 
codegen error
URL: https://github.com/apache/flink/pull/6923
 
 
   
   
   ## What is the purpose of the change
   TableFunctionCollector should handle reuseInitStatements and 
reuseMemberStatements
   
   
   ## Brief change log
   * merge the statements of `CollectorCodeGenerator` and 
`FunctionCodeGenerator`
   
   
   ## Verifying this change
   
   
   This change added tests and can be verified as follows:
   Added test: 
org.apache.flink.table.runtime.stream.table.CorrelateITCase#testTableFunctionCollectorInit
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`:  no
 - The serializers:  no
 - The runtime per-record code paths (performance sensitive):  no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper:  no 
 - The S3 file system connector:  no
   
   ## Documentation
   
 - Does this pull request introduce a new feature?  no
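The fix's core idea, as described in the PR, is that a generator which emits a class must splice in the accumulated member and init statements, otherwise references produced during expression codegen dangle. A toy illustration with made-up names (this is not Flink's actual code generator):

```java
import java.util.LinkedHashSet;
import java.util.Set;

// Sketch of a string-based code generator that maintains reuse sets, in the
// spirit of reuseMemberCode/reuseInitCode; every generated class must include
// both sets for the generated body to compile.
class CodeGenSketch {
	final Set<String> reusableMemberStatements = new LinkedHashSet<>();
	final Set<String> reusableInitStatements = new LinkedHashSet<>();

	String generateCollectorClass(String name, String bodyCode) {
		StringBuilder sb = new StringBuilder();
		sb.append("public class ").append(name).append(" {\n");
		for (String member : reusableMemberStatements) {
			sb.append("  ").append(member).append('\n');   // fields, UDF instances
		}
		sb.append("  public void open() {\n");
		for (String init : reusableInitStatements) {
			sb.append("    ").append(init).append('\n');   // one-time initialization
		}
		sb.append("  }\n");
		sb.append("  public void collect(Object record) { ").append(bodyCode).append(" }\n");
		sb.append("}\n");
		return sb.toString();
	}
}
```

If the collector generator dropped either loop, a body that references a reused field would fail to compile, which mirrors the reported bug.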
   











[jira] [Closed] (FLINK-10669) Exceptions & errors are not properly checked in logs in e2e tests

2018-10-24 Thread Dawid Wysakowicz (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10669?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dawid Wysakowicz closed FLINK-10669.

Resolution: Fixed

> Exceptions & errors are not properly checked in logs in e2e tests
> -
>
> Key: FLINK-10669
> URL: https://issues.apache.org/jira/browse/FLINK-10669
> Project: Flink
>  Issue Type: Bug
>  Components: E2E Tests
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
> Fix For: 1.7.0
>
>






[jira] [Resolved] (FLINK-10220) StreamSQL E2E test fails on travis

2018-10-24 Thread Dawid Wysakowicz (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10220?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dawid Wysakowicz resolved FLINK-10220.
--
Resolution: Fixed

> StreamSQL E2E test fails on travis
> --
>
> Key: FLINK-10220
> URL: https://issues.apache.org/jira/browse/FLINK-10220
> Project: Flink
>  Issue Type: Bug
>  Components: Table API  SQL, Tests
>Affects Versions: 1.7.0
>Reporter: Chesnay Schepler
>Assignee: Hequn Cheng
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> https://travis-ci.org/zentol/flink-ci/jobs/420972344
> {code}
> [FAIL] 'Streaming SQL end-to-end test' failed after 1 minutes and 49 seconds! 
> Test exited with exit code 0 but the logs contained errors, exceptions or 
> non-empty .out files
> 2018-08-27 07:34:36,311 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph- window: 
> (TumblingGroupWindow('w$, 'rowtime, 2.millis)), select: ($SUM0(correct) 
> AS correct, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS 
> w$rowtime, proctime('w$) AS w$proctime) -> select: (correct, w$start AS 
> rowtime) -> to: Row -> Map -> Sink: Unnamed (1/1) 
> (97d055e4661ff3361a504626257d406d) switched from RUNNING to FAILED.
> java.lang.RuntimeException: Exception occurred while processing valve output 
> watermark: 
>   at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:265)
>   at 
> org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
>   at 
> org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
>   at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:184)
>   at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: 
> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: 
> Could not forward element to next operator
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:596)
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:689)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:667)
>   at 
> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>   at 
> org.apache.flink.table.runtime.aggregate.TimeWindowPropertyCollector.collect(TimeWindowPropertyCollector.scala:65)
>   at 
> org.apache.flink.table.runtime.aggregate.IncrementalAggregateAllWindowFunction.apply(IncrementalAggregateAllWindowFunction.scala:62)
>   at 
> org.apache.flink.table.runtime.aggregate.IncrementalAggregateAllTimeWindowFunction.apply(IncrementalAggregateAllTimeWindowFunction.scala:65)
>   at 
> org.apache.flink.table.runtime.aggregate.IncrementalAggregateAllTimeWindowFunction.apply(IncrementalAggregateAllTimeWindowFunction.scala:37)
>   at 
> org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueAllWindowFunction.process(InternalSingleValueAllWindowFunction.java:46)
>   at 
> org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueAllWindowFunction.process(InternalSingleValueAllWindowFunction.java:34)
>   at 
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:546)
>   at 
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onEventTime(WindowOperator.java:454)
>   at 
> org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:251)
>   at 
> org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:128)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:746)
>   at 
> 

[GitHub] yanghua commented on issue #6890: [FLINK-10603] Reduce kafka test duration

2018-10-24 Thread GitBox
yanghua commented on issue #6890: [FLINK-10603] Reduce kafka test duration
URL: https://github.com/apache/flink/pull/6890#issuecomment-432718022
 
 
   excluded all tests but `FlinkKafkaProducer011ITCase`; test results:
   
   ```
   15:41:23.815 [INFO] flink-connector-kafka-0.8 .. 
SUCCESS [02:38 min]
   15:41:23.815 [INFO] flink-connector-kafka-0.9 .. 
SUCCESS [02:52 min]
   15:41:23.815 [INFO] flink-connector-kafka-0.10 . 
SUCCESS [03:13 min]
   15:41:23.815 [INFO] flink-connector-kafka-0.11 . 
SUCCESS [04:30 min]
   ```
   
   What do you think about the durations? Do they look normal? @pnowojski
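For reference, limiting a connector module's test run to a single IT case like this can be sketched with a Surefire `includes` filter; the fragment below is an assumption for illustration, since the thread does not show the exact mechanism that was actually used:

```xml
<!-- Hypothetical maven-surefire-plugin fragment: run only the 0.11 producer
     IT case and skip every other test in the module. -->
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-surefire-plugin</artifactId>
  <configuration>
    <includes>
      <include>**/FlinkKafkaProducer011ITCase.java</include>
    </includes>
  </configuration>
</plugin>
```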
   




[jira] [Commented] (FLINK-10603) Reduce kafka test duration

2018-10-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10603?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16662466#comment-16662466
 ] 

ASF GitHub Bot commented on FLINK-10603:


yanghua commented on issue #6890: [FLINK-10603] Reduce kafka test duration
URL: https://github.com/apache/flink/pull/6890#issuecomment-432718022
 
 
   excluded all tests but `FlinkKafkaProducer011ITCase`; test results:
   
   ```
   15:41:23.815 [INFO] flink-connector-kafka-0.8 .. 
SUCCESS [02:38 min]
   15:41:23.815 [INFO] flink-connector-kafka-0.9 .. 
SUCCESS [02:52 min]
   15:41:23.815 [INFO] flink-connector-kafka-0.10 . 
SUCCESS [03:13 min]
   15:41:23.815 [INFO] flink-connector-kafka-0.11 . 
SUCCESS [04:30 min]
   ```
   
   What do you think about the durations? Do they look normal? @pnowojski
   




> Reduce kafka test duration
> --
>
> Key: FLINK-10603
> URL: https://issues.apache.org/jira/browse/FLINK-10603
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kafka Connector, Tests
>Affects Versions: 1.7.0
>Reporter: Chesnay Schepler
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>
> The tests for the modern kafka connector take more than 10 minutes which is 
> simply unacceptable.





[jira] [Commented] (FLINK-10665) Port YARNSessionFIFOITCase#testJavaAPI to new codebase

2018-10-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10665?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16662439#comment-16662439
 ] 

ASF GitHub Bot commented on FLINK-10665:


TisonKun commented on issue #6917: [FLINK-10665] [tests] Port 
YARNSessionFIFOITCase#testJavaAPI to new c…
URL: https://github.com/apache/flink/pull/6917#issuecomment-432711697
 
 
   @zentol thanks for the review, added a hotfix :-)




> Port YARNSessionFIFOITCase#testJavaAPI to new codebase
> --
>
> Key: FLINK-10665
> URL: https://issues.apache.org/jira/browse/FLINK-10665
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.7.0
>Reporter: TisonKun
>Assignee: TisonKun
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Port {{YARNSessionFIFOITCase#testJavaAPI}} to new codebase





[GitHub] TisonKun commented on issue #6917: [FLINK-10665] [tests] Port YARNSessionFIFOITCase#testJavaAPI to new c…

2018-10-24 Thread GitBox
TisonKun commented on issue #6917: [FLINK-10665] [tests] Port 
YARNSessionFIFOITCase#testJavaAPI to new c…
URL: https://github.com/apache/flink/pull/6917#issuecomment-432711697
 
 
   @zentol thanks for the review, added a hotfix :-)




[jira] [Assigned] (FLINK-8921) Split code generated call expression

2018-10-24 Thread Ruidong Li (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-8921?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ruidong Li reassigned FLINK-8921:
-

Assignee: xueyu  (was: Ruidong Li)

> Split code generated call expression 
> -
>
> Key: FLINK-8921
> URL: https://issues.apache.org/jira/browse/FLINK-8921
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API  SQL
>Reporter: Timo Walther
>Assignee: xueyu
>Priority: Major
>  Labels: pull-request-available
>
> In FLINK-8274 we introduced the possibility of splitting the generated code 
> into multiple methods in order to avoid exceeding the JVM's maximum method 
> size (see also https://docs.oracle.com/javase/specs/jvms/se7/html/jvms-4.html#jvms-4.9).
> At the moment we only split methods by fields; however, this is not enough in 
> all cases. We should also split expressions. I suggest splitting the operands 
> of a {{RexCall}} in 
> {{org.apache.flink.table.codegen.CodeGenerator#visitCall}} if we reach a 
> certain threshold. However, this should happen as lazily as possible to keep 
> the runtime overhead low.
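The splitting idea can be illustrated with a plain-Java sketch (hypothetical names, not Flink's actual code generator): a small driver method delegates fixed-size chunks of a large expression to helper methods, so no single generated method approaches the JVM's 64K bytecode limit.

```java
// Illustration of expression splitting (hypothetical demo class, not the
// Flink code generator): a huge flat expression would normally be emitted
// into one method; splitting emits a driver plus small chunk methods.
public class SplitExprDemo {

    // Monolithic form: one method evaluates every operand. Fine at this size,
    // but a generated variant with thousands of terms could exceed the JVM's
    // 64KB-per-method bytecode limit.
    static int evalMonolithic(int[] operands) {
        int acc = 0;
        for (int v : operands) {
            acc += v;
        }
        return acc;
    }

    // Split threshold; a real generator would use a code-size estimate instead.
    static final int CHUNK = 2;

    // Split form: the driver stays tiny and delegates each chunk of operands
    // to a separate helper method call.
    static int evalSplit(int[] operands) {
        int acc = 0;
        for (int start = 0; start < operands.length; start += CHUNK) {
            acc += evalChunk(operands, start, Math.min(start + CHUNK, operands.length));
        }
        return acc;
    }

    // One generated helper per chunk; each stays far below the size limit.
    static int evalChunk(int[] operands, int from, int to) {
        int acc = 0;
        for (int i = from; i < to; i++) {
            acc += operands[i];
        }
        return acc;
    }
}
```

The sum here is associative, so chunking is trivial; splitting real `RexCall` operands additionally has to thread the reused init and member statements through each helper, which appears to be what the linked pull requests deal with.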





[jira] [Commented] (FLINK-8921) Split code generated call expression

2018-10-24 Thread Ruidong Li (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8921?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16662436#comment-16662436
 ] 

Ruidong Li commented on FLINK-8921:
---

[~xueyu] thanks for your contribution. I have assigned this issue to you if you 
want to fix it.

> Split code generated call expression 
> -
>
> Key: FLINK-8921
> URL: https://issues.apache.org/jira/browse/FLINK-8921
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API  SQL
>Reporter: Timo Walther
>Assignee: Ruidong Li
>Priority: Major
>  Labels: pull-request-available
>
> In FLINK-8274 we introduced the possibility of splitting the generated code 
> into multiple methods in order to avoid exceeding the JVM's maximum method 
> size (see also https://docs.oracle.com/javase/specs/jvms/se7/html/jvms-4.html#jvms-4.9).
> At the moment we only split methods by fields; however, this is not enough in 
> all cases. We should also split expressions. I suggest splitting the operands 
> of a {{RexCall}} in 
> {{org.apache.flink.table.codegen.CodeGenerator#visitCall}} if we reach a 
> certain threshold. However, this should happen as lazily as possible to keep 
> the runtime overhead low.





[jira] [Commented] (FLINK-10665) Port YARNSessionFIFOITCase#testJavaAPI to new codebase

2018-10-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10665?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16662430#comment-16662430
 ] 

ASF GitHub Bot commented on FLINK-10665:


zentol commented on issue #6917: [FLINK-10665] [tests] Port 
YARNSessionFIFOITCase#testJavaAPI to new c…
URL: https://github.com/apache/flink/pull/6917#issuecomment-432709017
 
 
   ```
   14:45:49.851 [INFO] There are 3 errors reported by Checkstyle 8.9 with 
/tools/maven/checkstyle.xml ruleset.
   14:45:49.852 [ERROR] 
src/test/java/org/apache/flink/yarn/YARNITCase.java:[127] (javadoc) 
JavadocType: Missing a Javadoc comment.
   14:45:49.852 [ERROR] 
src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java:[38] (imports) 
ImportOrder: Import org.apache.flink.yarn.util.YarnTestUtils appears after 
other imports that it should precede
   14:45:49.852 [ERROR] 
src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java:[39] (imports) 
ImportOrder: 'org.apache.hadoop.fs.Path' should be separated from previous 
imports.
   ```




> Port YARNSessionFIFOITCase#testJavaAPI to new codebase
> --
>
> Key: FLINK-10665
> URL: https://issues.apache.org/jira/browse/FLINK-10665
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.7.0
>Reporter: TisonKun
>Assignee: TisonKun
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Port {{YARNSessionFIFOITCase#testJavaAPI}} to new codebase





[GitHub] zentol commented on issue #6917: [FLINK-10665] [tests] Port YARNSessionFIFOITCase#testJavaAPI to new c…

2018-10-24 Thread GitBox
zentol commented on issue #6917: [FLINK-10665] [tests] Port 
YARNSessionFIFOITCase#testJavaAPI to new c…
URL: https://github.com/apache/flink/pull/6917#issuecomment-432709017
 
 
   ```
   14:45:49.851 [INFO] There are 3 errors reported by Checkstyle 8.9 with 
/tools/maven/checkstyle.xml ruleset.
   14:45:49.852 [ERROR] 
src/test/java/org/apache/flink/yarn/YARNITCase.java:[127] (javadoc) 
JavadocType: Missing a Javadoc comment.
   14:45:49.852 [ERROR] 
src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java:[38] (imports) 
ImportOrder: Import org.apache.flink.yarn.util.YarnTestUtils appears after 
other imports that it should precede
   14:45:49.852 [ERROR] 
src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java:[39] (imports) 
ImportOrder: 'org.apache.hadoop.fs.Path' should be separated from previous 
imports.
   ```




[jira] [Commented] (FLINK-10490) OperatorSnapshotUtil should probably use SavepointV2Serializer

2018-10-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10490?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16662427#comment-16662427
 ] 

ASF GitHub Bot commented on FLINK-10490:


zentol commented on issue #6910: [FLINK-10490][tests] OperatorSnapshotUtil 
should use SavepointV2Seria…
URL: https://github.com/apache/flink/pull/6910#issuecomment-432708555
 
 
   The code loading the savepoint is aware of previous versions and starts a 
migration routine if a v1 savepoint is detected; the savepoint is loaded using 
the v1 serializer and migrated before it is passed to the task.
   I suppose our migration tests aren't affected, since they don't make 
structural changes to jobs, which IIRC is the major limitation of the old 
format.
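A minimal sketch of the version-dispatch pattern described in this comment, with hypothetical class and method names (not Flink's actual savepoint loader): a version header is read first, the payload is read with the serializer matching that version, and v1 data is migrated to the current shape before being handed on.

```java
// Hypothetical demo of version-aware loading with migration: old-format data
// is read with its original reader and converted, newer data is read directly.
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

public class VersionedLoaderSketch {

    // Demo helper: encode a payload under the given format version.
    static byte[] encode(int version, String payload) {
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bos);
            out.writeInt(version);   // version header is written first
            out.writeUTF(payload);
            return bos.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Read the version header, dispatch to the matching reader, migrate if needed.
    static String load(byte[] bytes) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes))) {
            int version = in.readInt();
            switch (version) {
                case 1:
                    return migrateV1ToV2(readV1(in)); // legacy format: read, then migrate
                case 2:
                    return readV2(in);                // current format: read directly
                default:
                    throw new IllegalStateException("Unknown version " + version);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static String readV1(DataInputStream in) throws IOException {
        return "v1:" + in.readUTF();
    }

    static String readV2(DataInputStream in) throws IOException {
        return "v2:" + in.readUTF();
    }

    // Stand-in for the structural migration step applied to old savepoints.
    static String migrateV1ToV2(String v1Payload) {
        return v1Payload.replace("v1:", "v2:");
    }
}
```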
   




> OperatorSnapshotUtil should probably use SavepointV2Serializer
> --
>
> Key: FLINK-10490
> URL: https://issues.apache.org/jira/browse/FLINK-10490
> Project: Flink
>  Issue Type: Test
>  Components: Tests
>Affects Versions: 1.7.0
>Reporter: Stefan Richter
>Priority: Major
>  Labels: pull-request-available
>
> {{OperatorSnapshotUtil}} is used for testing savepoint migration. This 
> utility internally still uses {{SavepointV1Serializer}} and I would assume 
> that it should use {{SavepointV2Serializer}}. I wonder if that means that 
> some newer cases are actually not covered in the migration tests.





[GitHub] zentol commented on issue #6910: [FLINK-10490][tests] OperatorSnapshotUtil should use SavepointV2Seria…

2018-10-24 Thread GitBox
zentol commented on issue #6910: [FLINK-10490][tests] OperatorSnapshotUtil 
should use SavepointV2Seria…
URL: https://github.com/apache/flink/pull/6910#issuecomment-432708555
 
 
   The code loading the savepoint is aware of previous versions and starts a 
migration routine if a v1 savepoint is detected; the savepoint is loaded using 
the v1 serializer and migrated before it is passed to the task.
   I suppose our migration tests aren't affected, since they don't make 
structural changes to jobs, which IIRC is the major limitation of the old 
format.
   




[jira] [Commented] (FLINK-8274) Fix Java 64K method compiling limitation for CommonCalc

2018-10-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8274?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16662422#comment-16662422
 ] 

ASF GitHub Bot commented on FLINK-8274:
---

xueyumusic commented on issue #5613: [FLINK-8274] [table] Split generated 
methods for preventing compiler exceptions
URL: https://github.com/apache/flink/pull/5613#issuecomment-432705919
 
 
   > Hi @twalthr
   > Current implementation will reach the limits if a field becomes too large. 
This can be reproduced by creating a field produced by concat_ws(); see 
[hequn8128@871fb2c](https://github.com/hequn8128/flink/commit/871fb2c7723c11dd0b75176a3be3be70e2740b2b)
   > Best, Hequn
   
   @hequn8128 I use your test case in this PR 
[FLINK-8921](https://github.com/apache/flink/pull/6921), thanks




> Fix Java 64K method compiling limitation for CommonCalc
> ---
>
> Key: FLINK-8274
> URL: https://issues.apache.org/jira/browse/FLINK-8274
> Project: Flink
>  Issue Type: Bug
>  Components: Table API  SQL
>Affects Versions: 1.5.0
>Reporter: Ruidong Li
>Assignee: Ruidong Li
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.5.0
>
>
> For complex SQL queries, the generated code for {code}DataStreamCalc{code} and 
> {code}DataSetCalc{code} may exceed Java's method length limit of 64KB.
>  
> This issue splits long methods into several sub-method calls.





[jira] [Updated] (FLINK-8274) Fix Java 64K method compiling limitation for CommonCalc

2018-10-24 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-8274?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-8274:
--
Labels: pull-request-available  (was: )

> Fix Java 64K method compiling limitation for CommonCalc
> ---
>
> Key: FLINK-8274
> URL: https://issues.apache.org/jira/browse/FLINK-8274
> Project: Flink
>  Issue Type: Bug
>  Components: Table API  SQL
>Affects Versions: 1.5.0
>Reporter: Ruidong Li
>Assignee: Ruidong Li
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.5.0
>
>
> For complex SQL queries, the generated code for {code}DataStreamCalc{code} and 
> {code}DataSetCalc{code} may exceed Java's method length limit of 64KB.
>  
> This issue splits long methods into several sub-method calls.





[GitHub] xueyumusic commented on issue #5613: [FLINK-8274] [table] Split generated methods for preventing compiler exceptions

2018-10-24 Thread GitBox
xueyumusic commented on issue #5613: [FLINK-8274] [table] Split generated 
methods for preventing compiler exceptions
URL: https://github.com/apache/flink/pull/5613#issuecomment-432705919
 
 
   > Hi @twalthr
   > Current implementation will reach the limits if a field becomes too large. 
This can be reproduced by creating a field produced by concat_ws(); see 
[hequn8128@871fb2c](https://github.com/hequn8128/flink/commit/871fb2c7723c11dd0b75176a3be3be70e2740b2b)
   > Best, Hequn
   
   @hequn8128 I use your test case in this PR 
[FLINK-8921](https://github.com/apache/flink/pull/6921), thanks




[jira] [Commented] (FLINK-10613) Remove logger casts in HBaseConnectorITCase

2018-10-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10613?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16662421#comment-16662421
 ] 

ASF GitHub Bot commented on FLINK-10613:


zentol opened a new pull request #6922: [FLINK-10613][hbase][tests] Remove 
logger casts
URL: https://github.com/apache/flink/pull/6922
 
 
   This PR removes some logger casts in the HBase IT case. The casting is only 
used to increase the log-level, but this can be done more safely via the 
`log4j-test.properties` file.
   
   I do not modify the log4j properties in this PR since it seems unnecessary 
in the first place. Chances are that someone trying to debug the code will just 
get lost in a sea of trace messages from hbase.
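The `log4j-test.properties` route mentioned in the PR description might look roughly like this; the logger name and appender wiring below are assumptions for illustration, not the actual file contents:

```properties
# Hypothetical log4j-test.properties fragment: raise the HBase connector's
# log level declaratively instead of casting the logger at runtime.
log4j.rootLogger=OFF, testlogger
log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
log4j.appender.testlogger.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %c - %m%n
# Enable detailed output only for the package under test, so debugging runs
# are not flooded with trace messages from unrelated HBase internals.
log4j.logger.org.apache.flink.addons.hbase=DEBUG, testlogger
log4j.additivity.org.apache.flink.addons.hbase=false
```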




> Remove logger casts in HBaseConnectorITCase
> ---
>
> Key: FLINK-10613
> URL: https://issues.apache.org/jira/browse/FLINK-10613
> Project: Flink
>  Issue Type: Improvement
>  Components: Tests
>Affects Versions: 1.5.4
>Reporter: Chesnay Schepler
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.5.6, 1.6.3, 1.7.0
>
>
> During the testing of {{1.5.5-rc1}} an issue was discovered in the 
> {{HBaseConnectorITCase}} where a cast could fail if {{flink-table}} tests 
> were previously executed, since in this case the {{jcl-over-slf4j}} bridge is 
> being loaded.
> {code:java}
> Java HotSpot(TM) 64-Bit Server VM warning: ignoring option MaxPermSize=128m; 
> support was removed in 8.0
> Running org.apache.flink.addons.hbase.HBaseConnectorITCase
> Tests run: 2, Failures: 0, Errors: 2, Skipped: 0, Time elapsed: 0.557 sec <<< 
> FAILURE! - in org.apache.flink.addons.hbase.HBaseConnectorITCase
> org.apache.flink.addons.hbase.HBaseConnectorITCase  Time elapsed: 0.556 sec  
> <<< ERROR!
> java.lang.ClassCastException: 
> org.apache.commons.logging.impl.SLF4JLocationAwareLog cannot be cast to 
> org.apache.commons.logging.impl.Log4JLogger
> org.apache.flink.addons.hbase.HBaseConnectorITCase  Time elapsed: 0.557 sec  
> <<< ERROR!
> java.lang.NullPointerException{code}
> While the logger-loading issue itself is rather subtle I believe the 
> underlying issue to be the casts to set the log-level.
> It should be possible to enable said logging through the 
> {{log4j-test.properties}} configuration instead.





[jira] [Assigned] (FLINK-10613) Remove logger casts in HBaseConnectorITCase

2018-10-24 Thread Chesnay Schepler (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10613?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler reassigned FLINK-10613:


Assignee: Chesnay Schepler

> Remove logger casts in HBaseConnectorITCase
> ---
>
> Key: FLINK-10613
> URL: https://issues.apache.org/jira/browse/FLINK-10613
> Project: Flink
>  Issue Type: Improvement
>  Components: Tests
>Affects Versions: 1.5.4
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.5.6, 1.6.3, 1.7.0
>
>
> During the testing of {{1.5.5-rc1}} an issue was discovered in the 
> {{HBaseConnectorITCase}} where a cast could fail if {{flink-table}} tests 
> were previously executed, since in this case the {{jcl-over-slf4j}} bridge is 
> being loaded.
> {code:java}
> Java HotSpot(TM) 64-Bit Server VM warning: ignoring option MaxPermSize=128m; 
> support was removed in 8.0
> Running org.apache.flink.addons.hbase.HBaseConnectorITCase
> Tests run: 2, Failures: 0, Errors: 2, Skipped: 0, Time elapsed: 0.557 sec <<< 
> FAILURE! - in org.apache.flink.addons.hbase.HBaseConnectorITCase
> org.apache.flink.addons.hbase.HBaseConnectorITCase  Time elapsed: 0.556 sec  
> <<< ERROR!
> java.lang.ClassCastException: 
> org.apache.commons.logging.impl.SLF4JLocationAwareLog cannot be cast to 
> org.apache.commons.logging.impl.Log4JLogger
> org.apache.flink.addons.hbase.HBaseConnectorITCase  Time elapsed: 0.557 sec  
> <<< ERROR!
> java.lang.NullPointerException{code}
> While the logger-loading issue itself is rather subtle I believe the 
> underlying issue to be the casts to set the log-level.
> It should be possible to enable said logging through the 
> {{log4j-test.properties}} configuration instead.





[jira] [Updated] (FLINK-10613) Remove logger casts in HBaseConnectorITCase

2018-10-24 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10613?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-10613:
---
Labels: pull-request-available  (was: )

> Remove logger casts in HBaseConnectorITCase
> ---
>
> Key: FLINK-10613
> URL: https://issues.apache.org/jira/browse/FLINK-10613
> Project: Flink
>  Issue Type: Improvement
>  Components: Tests
>Affects Versions: 1.5.4
>Reporter: Chesnay Schepler
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.5.6, 1.6.3, 1.7.0
>
>
> During the testing of {{1.5.5-rc1}} an issue was discovered in the 
> {{HBaseConnectorITCase}} where a cast could fail if {{flink-table}} tests 
> were previously executed, since in this case the {{jcl-over-slf4j}} bridge is 
> being loaded.
> {code:java}
> Java HotSpot(TM) 64-Bit Server VM warning: ignoring option MaxPermSize=128m; 
> support was removed in 8.0
> Running org.apache.flink.addons.hbase.HBaseConnectorITCase
> Tests run: 2, Failures: 0, Errors: 2, Skipped: 0, Time elapsed: 0.557 sec <<< 
> FAILURE! - in org.apache.flink.addons.hbase.HBaseConnectorITCase
> org.apache.flink.addons.hbase.HBaseConnectorITCase  Time elapsed: 0.556 sec  
> <<< ERROR!
> java.lang.ClassCastException: 
> org.apache.commons.logging.impl.SLF4JLocationAwareLog cannot be cast to 
> org.apache.commons.logging.impl.Log4JLogger
> org.apache.flink.addons.hbase.HBaseConnectorITCase  Time elapsed: 0.557 sec  
> <<< ERROR!
> java.lang.NullPointerException{code}
> While the logger-loading issue itself is rather subtle I believe the 
> underlying issue to be the casts to set the log-level.
> It should be possible to enable said logging through the 
> {{log4j-test.properties}} configuration instead.




