[jira] [Commented] (FLINK-7789) Add handler for Async IO operator timeouts

2018-05-30 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7789?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16496174#comment-16496174
 ] 

ASF GitHub Bot commented on FLINK-7789:
---

Github user kisimple commented on a diff in the pull request:

https://github.com/apache/flink/pull/6091#discussion_r192001515
  
--- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java ---
@@ -648,6 +662,52 @@ public void testAsyncTimeout() throws Exception {

		ExceptionUtils.findThrowable(mockEnvironment.getActualExternalFailureCause().get(), TimeoutException.class);
	}
 
+   @Test
+   public void testAsyncTimeoutAware() throws Exception {
--- End diff --

Updated as suggested, please have a look :)


> Add handler for Async IO operator timeouts 
> ---
>
> Key: FLINK-7789
> URL: https://issues.apache.org/jira/browse/FLINK-7789
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: Karthik Deivasigamani
>Assignee: blues zheng
>Priority: Major
>
> Currently the Async IO operator does not provide a mechanism to handle timeouts. 
> When a request times out, an exception is thrown and the job is restarted. It 
> would be good to pass an AsyncIOTimeoutHandler which can be implemented by the 
> user and passed in the constructor.
> Here is the discussion from apache flink users mailing list 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/async-io-operator-timeouts-tt16068.html
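
For context, a minimal sketch of the kind of user-overridable timeout hook this ticket asks for, assuming the {{timeout(...)}} method that PR #6091 proposes adding to {{AsyncFunction}}; the class name and lookup logic below are hypothetical:

{code:java}
import java.util.Collections;
import java.util.concurrent.CompletableFuture;

import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

// Sketch: emit a fallback record on timeout instead of failing the job.
public class TimeoutAwareLookup extends RichAsyncFunction<String, String> {

	@Override
	public void asyncInvoke(String key, ResultFuture<String> resultFuture) {
		// queryService(...) stands in for a real non-blocking client call
		CompletableFuture
			.supplyAsync(() -> queryService(key))
			.thenAccept(value -> resultFuture.complete(Collections.singleton(value)));
	}

	// The hook discussed in this ticket: called on timeout instead of failing the job.
	@Override
	public void timeout(String key, ResultFuture<String> resultFuture) {
		resultFuture.complete(Collections.singleton("fallback-for-" + key));
	}

	private String queryService(String key) {
		return "value-for-" + key; // placeholder for an external lookup
	}
}
{code}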



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6091: [FLINK-7789][DataStream API] Add handler for Async...

2018-05-30 Thread kisimple
Github user kisimple commented on a diff in the pull request:

https://github.com/apache/flink/pull/6091#discussion_r192001515
  
--- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java ---
@@ -648,6 +662,52 @@ public void testAsyncTimeout() throws Exception {

		ExceptionUtils.findThrowable(mockEnvironment.getActualExternalFailureCause().get(), TimeoutException.class);
	}
 
+   @Test
+   public void testAsyncTimeoutAware() throws Exception {
--- End diff --

Updated as suggested, please have a look :)


---


[jira] [Comment Edited] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)

2018-05-30 Thread yanxiaobin (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16496136#comment-16496136
 ] 

yanxiaobin edited comment on FLINK-8500 at 5/31/18 5:42 AM:


[~tzulitai]   thanks. That is great. 


was (Author: backlight):
[~tzulitai]   thanks, greate. 

> Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
> ---
>
> Key: FLINK-8500
> URL: https://issues.apache.org/jira/browse/FLINK-8500
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Affects Versions: 1.4.0
>Reporter: yanxiaobin
>Priority: Major
> Fix For: 1.6.0
>
> Attachments: image-2018-01-30-14-58-58-167.png, 
> image-2018-01-31-10-48-59-633.png
>
>
> The method deserialize of KeyedDeserializationSchema needs a parameter 
> 'kafka message timestamp' (from ConsumerRecord). In some business scenarios, 
> this is useful!
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)

2018-05-30 Thread yanxiaobin (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16496136#comment-16496136
 ] 

yanxiaobin commented on FLINK-8500:
---

[~tzulitai] thanks, great. 

> Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
> ---
>
> Key: FLINK-8500
> URL: https://issues.apache.org/jira/browse/FLINK-8500
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Affects Versions: 1.4.0
>Reporter: yanxiaobin
>Priority: Major
> Fix For: 1.6.0
>
> Attachments: image-2018-01-30-14-58-58-167.png, 
> image-2018-01-31-10-48-59-633.png
>
>
> The method deserialize of KeyedDeserializationSchema needs a parameter 
> 'kafka message timestamp' (from ConsumerRecord). In some business scenarios, 
> this is useful!
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9476) Lost sideOutPut Late Elements in CEP Operator

2018-05-30 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16496092#comment-16496092
 ] 

ASF GitHub Bot commented on FLINK-9476:
---

Github user Aitozi commented on the issue:

https://github.com/apache/flink/pull/6104
  
@bowenli86 thanks for the review, I have fixed the error according to the comments 
and added the unit test in CEPITCase, please help review it again. cc @kl0u 


> Lost sideOutPut Late Elements in CEP Operator
> -
>
> Key: FLINK-9476
> URL: https://issues.apache.org/jira/browse/FLINK-9476
> Project: Flink
>  Issue Type: Improvement
>  Components: CEP
>Affects Versions: 1.4.2
>Reporter: aitozi
>Assignee: aitozi
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #6104: [FLINK-9476]Emit late elements in CEP as sideOutPut

2018-05-30 Thread Aitozi
Github user Aitozi commented on the issue:

https://github.com/apache/flink/pull/6104
  
@bowenli86 thanks for the review, I have fixed the error according to the comments 
and added the unit test in CEPITCase, please help review it again. cc @kl0u 


---


[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)

2018-05-30 Thread Tzu-Li (Gordon) Tai (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16496068#comment-16496068
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-8500:


[~Backlight]

Yes, we're going for a minimal incremental fix for now, so that at 
least we have a way around the problem.

In the long run, the ideal approach is discussed in the comments of this PR: 
[https://github.com/apache/flink/pull/5958].

Given that [~FredTing]'s latest PR is a "short-term solution", I think it is 
definitely ok to target that for a merge to the 1.5.x series.

> Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
> ---
>
> Key: FLINK-8500
> URL: https://issues.apache.org/jira/browse/FLINK-8500
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Affects Versions: 1.4.0
>Reporter: yanxiaobin
>Priority: Major
> Fix For: 1.6.0
>
> Attachments: image-2018-01-30-14-58-58-167.png, 
> image-2018-01-31-10-48-59-633.png
>
>
> The method deserialize of KeyedDeserializationSchema needs a parameter 
> 'kafka message timestamp' (from ConsumerRecord). In some business scenarios, 
> this is useful!
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)

2018-05-30 Thread yanxiaobin (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16496006#comment-16496006
 ] 

yanxiaobin commented on FLINK-8500:
---

So far, it can solve the current problems, but in the long run, there will 
still be some limitations on the support of future Kafka versions. By the way, 
can we fix this problem in the 1.5.x series?

> Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
> ---
>
> Key: FLINK-8500
> URL: https://issues.apache.org/jira/browse/FLINK-8500
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Affects Versions: 1.4.0
>Reporter: yanxiaobin
>Priority: Major
> Fix For: 1.6.0
>
> Attachments: image-2018-01-30-14-58-58-167.png, 
> image-2018-01-31-10-48-59-633.png
>
>
> The method deserialize of KeyedDeserializationSchema needs a parameter 
> 'kafka message timestamp' (from ConsumerRecord). In some business scenarios, 
> this is useful!
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-7525) Add config option to disable Cancel functionality on UI

2018-05-30 Thread Ted Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16441630#comment-16441630
 ] 

Ted Yu edited comment on FLINK-7525 at 5/31/18 1:41 AM:


Hopefully FLIP-6 will be released soon.


was (Author: yuzhih...@gmail.com):
Hopefully FLIP-6 would be released soon.

> Add config option to disable Cancel functionality on UI
> ---
>
> Key: FLINK-7525
> URL: https://issues.apache.org/jira/browse/FLINK-7525
> Project: Flink
>  Issue Type: Improvement
>  Components: Web Client, Webfrontend
>Reporter: Ted Yu
>Priority: Major
>
> In this email thread 
> http://search-hadoop.com/m/Flink/VkLeQlf0QOnc7YA?subj=Security+Control+of+running+Flink+Jobs+on+Flink+UI
>  , Raja was asking for a way to control how users cancel Job(s).
> Robert proposed adding a config option which disables the Cancel 
> functionality.
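
A minimal sketch of what such a switch could look like with Flink's {{ConfigOptions}} API; the key name and holder class here are hypothetical, not a merged option:

{code:java}
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

public class WebUiOptions {
	// Hypothetical option: when false, the web UI hides/rejects job cancellation.
	public static final ConfigOption<Boolean> CANCEL_ENABLE =
		ConfigOptions.key("web.cancel.enable")
			.defaultValue(true)
			.withDescription("Whether the web frontend allows cancelling running jobs.");
}
{code}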



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9150) Prepare for Java 10

2018-05-30 Thread Ted Yu (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9150?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated FLINK-9150:
--
Component/s: Build System

> Prepare for Java 10
> ---
>
> Key: FLINK-9150
> URL: https://issues.apache.org/jira/browse/FLINK-9150
> Project: Flink
>  Issue Type: Task
>  Components: Build System
>Reporter: Ted Yu
>Priority: Major
>
> Java 9 is not an LTS release.
> When compiling with Java 10, I see the following compilation error:
> {code}
> [ERROR] Failed to execute goal on project flink-shaded-hadoop2: Could not 
> resolve dependencies for project 
> org.apache.flink:flink-shaded-hadoop2:jar:1.6-SNAPSHOT: Could not find 
> artifact jdk.tools:jdk.tools:jar:1.6 at specified path 
> /a/jdk-10/../lib/tools.jar -> [Help 1]
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-7689) Instrument the Flink JDBC sink

2018-05-30 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16495654#comment-16495654
 ] 

ASF GitHub Bot commented on FLINK-7689:
---

Github user pabloem commented on the issue:

https://github.com/apache/flink/pull/4725
  
@fhueske @asicoe is this PR still current / ongoing? I'm willing to drive 
it to the end if there's anything left to do.. : ) - Or perhaps it's almost 
ready to merge?


> Instrument the Flink JDBC sink
> --
>
> Key: FLINK-7689
> URL: https://issues.apache.org/jira/browse/FLINK-7689
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming Connectors
>Affects Versions: 1.4.0
>Reporter: Martin Eden
>Priority: Minor
>  Labels: jdbc, metrics
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> As confirmed by the Flink community in the following mailing list 
> [message|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/metrics-for-Flink-sinks-td15200.html], 
> using off-the-shelf Flink sinks like the JDBC, Redis or Cassandra 
> sinks does not expose any sink-specific metrics.
> The purpose of this ticket is to add some relevant metrics to the 
> JDBCOutputFormat:
> - Meters for when a flush is made.
> - Histograms for the JDBC batch count and batch execution latency.
> These would allow a deeper understanding of the runtime behaviour of 
> performance-critical jobs writing to external databases using this generic 
> interface.
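
To illustrate the proposal, a sketch of how such metrics could be registered via Flink's metric system; the sink class and metric names here are hypothetical, and the actual change would instrument {{JDBCOutputFormat}} itself:

{code:java}
import com.codahale.metrics.SlidingWindowReservoir;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.dropwizard.metrics.DropwizardHistogramWrapper;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.metrics.Meter;
import org.apache.flink.metrics.MeterView;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.types.Row;

// Hypothetical instrumented sink, for illustration only.
public class InstrumentedJdbcSink extends RichSinkFunction<Row> {

	private transient Meter flushMeter;
	private transient Histogram batchLatencyMs;

	@Override
	public void open(Configuration parameters) {
		MetricGroup group = getRuntimeContext().getMetricGroup();
		flushMeter = group.meter("flushRate", new MeterView(60));
		batchLatencyMs = group.histogram("batchExecutionLatencyMs",
			new DropwizardHistogramWrapper(
				new com.codahale.metrics.Histogram(new SlidingWindowReservoir(500))));
	}

	@Override
	public void invoke(Row value) throws Exception {
		long start = System.currentTimeMillis();
		// ... add to batch, execute the batch when full ...
		flushMeter.markEvent();
		batchLatencyMs.update(System.currentTimeMillis() - start);
	}
}
{code}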



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #4725: [FLINK-7689] [Streaming Connectors] Added metrics to JDBC...

2018-05-30 Thread pabloem
Github user pabloem commented on the issue:

https://github.com/apache/flink/pull/4725
  
@fhueske @asicoe is this PR still current / ongoing? I'm willing to drive 
it to the end if there's anything left to do.. : ) - Or perhaps it's almost 
ready to merge?


---


[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)

2018-05-30 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16495589#comment-16495589
 ] 

ASF GitHub Bot commented on FLINK-8500:
---

GitHub user FredTing opened a pull request:

https://github.com/apache/flink/pull/6105

[FLINK-8500] Get the timestamp of the Kafka message from kafka consumer

## What is the purpose of the change

This pull request makes the Kafka timestamp and timestampType available in 
the message deserialization so one can use them in the business logic processing.

## Brief change log
- added a new default method with timestamp/TimestampType parameters to the 
interface `KeyedDeserializationSchema` (a sketch follows below)
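
A trimmed sketch of what such a backwards-compatible default method could look like; the exact parameter order and naming are assumptions, not the merged API:

{code:java}
import java.io.IOException;
import java.io.Serializable;

import org.apache.kafka.common.record.TimestampType;

public interface KeyedDeserializationSchema<T> extends Serializable {

	// existing method, unchanged
	T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset)
		throws IOException;

	// sketched addition: a default method forwarding to the old signature,
	// so existing implementations keep compiling and behaving as before
	default T deserialize(byte[] messageKey, byte[] message, String topic, int partition,
			long offset, long timestamp, TimestampType timestampType) throws IOException {
		return deserialize(messageKey, message, topic, partition, offset);
	}

	boolean isEndOfStream(T nextElement);
}
{code}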

## Verifying this change

This change is already covered by existing tests, such as Kafka Consumer 
tests and JSONKeyValueDeserializationSchemaTest.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (**yes** / no)
  - The serializers: (yes / no / **don't know**)
  - The runtime per-record code paths (performance sensitive): (**yes** / 
no / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (**yes** / no)
  - If yes, how is the feature documented? (not applicable / **docs** / 
**JavaDocs** / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/FredTing/flink FLINK-8500

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6105.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6105


commit a214481f6722e75a298d87333e4981cb87b9c2b9
Author: Fred Teunissen 
Date:   2018-05-20T20:45:29Z

[FLINK-8500] Get the timestamp of the Kafka message from kafka consumer




> Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
> ---
>
> Key: FLINK-8500
> URL: https://issues.apache.org/jira/browse/FLINK-8500
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Affects Versions: 1.4.0
>Reporter: yanxiaobin
>Priority: Major
> Fix For: 1.6.0
>
> Attachments: image-2018-01-30-14-58-58-167.png, 
> image-2018-01-31-10-48-59-633.png
>
>
> The method deserialize of KeyedDeserializationSchema  needs a parameter 
> 'kafka message timestamp' (from ConsumerRecord) .In some business scenarios, 
> this is useful!
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6105: [FLINK-8500] Get the timestamp of the Kafka messag...

2018-05-30 Thread FredTing
GitHub user FredTing opened a pull request:

https://github.com/apache/flink/pull/6105

[FLINK-8500] Get the timestamp of the Kafka message from kafka consumer

## What is the purpose of the change

This pull request makes the Kafka timestamp and timestampType available in 
the message deserialization so one can use them in the business logic processing.

## Brief change log
- added a new default method with timestamp/TimestampType parameters to the 
interface `KeyedDeserializationSchema`

## Verifying this change

This change is already covered by existing tests, such as Kafka Consumer 
tests and JSONKeyValueDeserializationSchemaTest.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (**yes** / no)
  - The serializers: (yes / no / **don't know**)
  - The runtime per-record code paths (performance sensitive): (**yes** / 
no / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (**yes** / no)
  - If yes, how is the feature documented? (not applicable / **docs** / 
**JavaDocs** / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/FredTing/flink FLINK-8500

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6105.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6105


commit a214481f6722e75a298d87333e4981cb87b9c2b9
Author: Fred Teunissen 
Date:   2018-05-20T20:45:29Z

[FLINK-8500] Get the timestamp of the Kafka message from kafka consumer




---


[jira] [Commented] (FLINK-9430) Support Casting of Object to Primitive types for Flink SQL UDF

2018-05-30 Thread Rong Rong (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9430?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16495510#comment-16495510
 ] 

Rong Rong commented on FLINK-9430:
--

Hi [~suez1224], I think [~twalthr] and I had this discussion in a separate doc 
regarding generic type inference in UDFs: 
https://docs.google.com/document/d/1zKSY1z0lvtQdfOgwcLnCMSRHew3weeJ6QfQjSD0zWas/edit#heading=h.64s92ad5mb1

I am not exactly sure, but maybe we can add an ITCase to this JIRA? I think at 
some point the Object is serialized using Kryo, which will significantly reduce 
performance.

> Support Casting of Object to Primitive types for Flink SQL UDF
> --
>
> Key: FLINK-9430
> URL: https://issues.apache.org/jira/browse/FLINK-9430
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Shuyi Chen
>Assignee: Shuyi Chen
>Priority: Major
>
> We want to add a SQL UDF to access specific element in a JSON string using 
> JSON path. However, the JSON element can be of different types, e.g. Int, 
> Float, Double, String, Boolean and etc.. Since return type is not part of the 
> method signature, we can not use overload. So we will end up writing a UDF 
> for each type, e.g. GetFloatFromJSON, GetIntFromJSON and etc., which has a 
> lot of duplication. 
> One way to unify all these UDF functions is to implement one UDF and return 
> java.lang.Object, and in the SQL statement, use CAST AS to cast the returned 
> Object into the correct type. Below is an example:
>  
> {code:java}
> object JsonPathUDF extends ScalarFunction {
>   def eval(jsonStr: String, path: String): Object = {
>     JSONParser.parse(jsonStr).read(path)
>   }
> }
> {code}
> {code:java}
> SELECT CAST(jsonpath(json, "$.store.book.title") AS VARCHAR(32)) as bookTitle 
> FROM table1
> {code}
> The current Flink SQL cast implementation does not support casting from 
> GenericTypeInfo to another type. I already have a local 
> branch to fix this. Please comment if there are alternatives to the problem 
> above.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9476) Lost sideOutPut Late Elements in CEP Operator

2018-05-30 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16495490#comment-16495490
 ] 

ASF GitHub Bot commented on FLINK-9476:
---

Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/6104#discussion_r191857843
  
--- Diff: docs/dev/libs/cep.md ---
@@ -1524,7 +1524,52 @@ In `CEP` the order in which elements are processed matters. To guarantee that el
 
 To guarantee that elements across watermarks are processed in event-time order, Flink's CEP library assumes
 *correctness of the watermark*, and considers as *late* elements whose timestamp is smaller than that of the last
-seen watermark. Late elements are not further processed.
+seen watermark. Late elements are not further processed. Also, you can specify a sideOutput tag to collect the late elements that arrive after the last seen watermark, and use it like this:
+
+{% highlight java %}
+PatternStream<Event> patternStream = CEP.pattern(input, pattern);
+
+OutputTag<Event> lataDataOutputTag = new OutputTag<Event>("lata-data"){};
--- End diff --

typo: "lateDataOutputTag" and "late-data" 


> Lost sideOutPut Late Elements in CEP Operator
> -
>
> Key: FLINK-9476
> URL: https://issues.apache.org/jira/browse/FLINK-9476
> Project: Flink
>  Issue Type: Improvement
>  Components: CEP
>Affects Versions: 1.4.2
>Reporter: aitozi
>Assignee: aitozi
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9476) Lost sideOutPut Late Elements in CEP Operator

2018-05-30 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16495491#comment-16495491
 ] 

ASF GitHub Bot commented on FLINK-9476:
---

Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/6104#discussion_r191858415
  
--- Diff: docs/dev/libs/cep.md ---
@@ -1524,7 +1524,52 @@ In `CEP` the order in which elements are processed matters. To guarantee that el
 
 To guarantee that elements across watermarks are processed in event-time order, Flink's CEP library assumes
 *correctness of the watermark*, and considers as *late* elements whose timestamp is smaller than that of the last
-seen watermark. Late elements are not further processed.
+seen watermark. Late elements are not further processed. Also, you can specify a sideOutput tag to collect the late elements that arrive after the last seen watermark, and use it like this:
+
+{% highlight java %}
+PatternStream<Event> patternStream = CEP.pattern(input, pattern);
+
+OutputTag<Event> lataDataOutputTag = new OutputTag<Event>("lata-data"){};
+
+OutputTag<TimeoutEvent> outputTag = new OutputTag<TimeoutEvent>("side-output"){};
+
+SingleOutputStreamOperator<ComplexEvent> result = patternStream
+    .sideOutputLateData(lataDataOutputTag)
+    .select(
+        new PatternTimeoutFunction<Event, TimeoutEvent>() {...},
+        outputTag,
+        new PatternSelectFunction<Event, ComplexEvent>() {...}
+    );
+
+DataStream<Event> lataData = result.getSideOutput(lataDataOutputTag);
--- End diff --

typo: lateData


> Lost sideOutPut Late Elements in CEP Operator
> -
>
> Key: FLINK-9476
> URL: https://issues.apache.org/jira/browse/FLINK-9476
> Project: Flink
>  Issue Type: Improvement
>  Components: CEP
>Affects Versions: 1.4.2
>Reporter: aitozi
>Assignee: aitozi
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6104: [FLINK-9476]Emit late elements in CEP as sideOutPu...

2018-05-30 Thread bowenli86
Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/6104#discussion_r191858415
  
--- Diff: docs/dev/libs/cep.md ---
@@ -1524,7 +1524,52 @@ In `CEP` the order in which elements are processed matters. To guarantee that el
 
 To guarantee that elements across watermarks are processed in event-time order, Flink's CEP library assumes
 *correctness of the watermark*, and considers as *late* elements whose timestamp is smaller than that of the last
-seen watermark. Late elements are not further processed.
+seen watermark. Late elements are not further processed. Also, you can specify a sideOutput tag to collect the late elements that arrive after the last seen watermark, and use it like this:
+
+{% highlight java %}
+PatternStream<Event> patternStream = CEP.pattern(input, pattern);
+
+OutputTag<Event> lataDataOutputTag = new OutputTag<Event>("lata-data"){};
+
+OutputTag<TimeoutEvent> outputTag = new OutputTag<TimeoutEvent>("side-output"){};
+
+SingleOutputStreamOperator<ComplexEvent> result = patternStream
+    .sideOutputLateData(lataDataOutputTag)
+    .select(
+        new PatternTimeoutFunction<Event, TimeoutEvent>() {...},
+        outputTag,
+        new PatternSelectFunction<Event, ComplexEvent>() {...}
+    );
+
+DataStream<Event> lataData = result.getSideOutput(lataDataOutputTag);
--- End diff --

typo: lateData


---


[GitHub] flink pull request #6104: [FLINK-9476]Emit late elements in CEP as sideOutPu...

2018-05-30 Thread bowenli86
Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/6104#discussion_r191857843
  
--- Diff: docs/dev/libs/cep.md ---
@@ -1524,7 +1524,52 @@ In `CEP` the order in which elements are processed matters. To guarantee that el
 
 To guarantee that elements across watermarks are processed in event-time order, Flink's CEP library assumes
 *correctness of the watermark*, and considers as *late* elements whose timestamp is smaller than that of the last
-seen watermark. Late elements are not further processed.
+seen watermark. Late elements are not further processed. Also, you can specify a sideOutput tag to collect the late elements that arrive after the last seen watermark, and use it like this:
+
+{% highlight java %}
+PatternStream<Event> patternStream = CEP.pattern(input, pattern);
+
+OutputTag<Event> lataDataOutputTag = new OutputTag<Event>("lata-data"){};
--- End diff --

typo: "lateDataOutputTag" and "late-data" 


---


[jira] [Commented] (FLINK-6469) Configure Memory Sizes with units

2018-05-30 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-6469?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16495470#comment-16495470
 ] 

ASF GitHub Bot commented on FLINK-6469:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5448
  
Will try and take a look at this soon... Sorry for the delay.

What I would consider very important is that users who don't change their 
configuration do not get different behavior all of a sudden. Meaning that in the 
absence of a "unit" we do not always interpret the value as "bytes" but as 
whatever the config value was measured in before (such as MBs, ...).


> Configure Memory Sizes with units
> -
>
> Key: FLINK-6469
> URL: https://issues.apache.org/jira/browse/FLINK-6469
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Reporter: Stephan Ewen
>Assignee: vinoyang
>Priority: Major
>
> Currently, memory sizes are configured by pure numbers, and the interpretation 
> differs from configuration parameter to parameter.
> For example, heap sizes are configured in megabytes, network buffer memory is 
> configured in bytes, and alignment thresholds are configured in bytes.
> I propose to configure all memory parameters the same way, with units similar 
> to time. The JVM itself configures heap sizes similarly: {{-Xmx5g}} or 
> {{-Xmx2000m}}.
> {code}
> 1  -> bytes
> 10 kb
> 64 mb
> 1 gb
> ...
> {code}
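
A minimal sketch of how such values could be parsed, assuming case-insensitive suffixes and a per-option default unit so that configs without a unit keep their old meaning; the class and method names are hypothetical:

{code:java}
import java.util.Locale;

public final class MemorySizeParser {

	/**
	 * Parses e.g. "64 mb", "10kb", "1 gb". A bare number is interpreted in
	 * the given default unit, so existing configs keep their old meaning.
	 */
	public static long parseBytes(String text, long defaultUnitMultiplier) {
		String trimmed = text.trim().toLowerCase(Locale.ROOT);
		long multiplier = defaultUnitMultiplier;
		String number = trimmed;
		if (trimmed.endsWith("kb")) {
			multiplier = 1024L;
			number = trimmed.substring(0, trimmed.length() - 2);
		} else if (trimmed.endsWith("mb")) {
			multiplier = 1024L * 1024L;
			number = trimmed.substring(0, trimmed.length() - 2);
		} else if (trimmed.endsWith("gb")) {
			multiplier = 1024L * 1024L * 1024L;
			number = trimmed.substring(0, trimmed.length() - 2);
		} else if (trimmed.endsWith("b")) {
			multiplier = 1L;
			number = trimmed.substring(0, trimmed.length() - 1);
		}
		return Long.parseLong(number.trim()) * multiplier;
	}
}
{code}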



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5448: [FLINK-6469] Configure Memory Sizes with units

2018-05-30 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5448
  
Will try and take a look at this soon... Sorry for the delay.

What I would consider very important is that users who don't change their 
configuration do not get different behavior all of a sudden. Meaning that in the 
absence of a "unit" we do not always interpret the value as "bytes" but as 
whatever the config value was measured in before (such as MBs, ...).


---


[jira] [Commented] (FLINK-8873) move unit tests of KeyedStream from DataStreamTest to KeyedStreamTest

2018-05-30 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8873?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16495448#comment-16495448
 ] 

ASF GitHub Bot commented on FLINK-8873:
---

Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/5649
  
When I was developing KeyedProcessFunction, I initially wondered why 
there were no tests for KeyedStream, then researched and realized that they were 
actually mixed in with the DataStream tests. 

I think having that clarity by separating those tests would be great. That said, 
I also agree it doesn't hurt that much to keep them as-is. If you feel strongly 
against it, I can close this PR.




> move unit tests of KeyedStream from DataStreamTest to KeyedStreamTest
> -
>
> Key: FLINK-8873
> URL: https://issues.apache.org/jira/browse/FLINK-8873
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API, Tests
>Affects Versions: 1.5.0
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Minor
> Fix For: 1.6.0
>
>
> move unit tests of KeyedStream.scala from DataStreamTest.scala to 
> KeyedStreamTest.scala, in order to have clearer separation



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5649: [FLINK-8873] [DataStream API] [Tests] move unit tests of ...

2018-05-30 Thread bowenli86
Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/5649
  
When I was developing KeyedProcessFunction, I initially wondered why 
there were no tests for KeyedStream, then researched and realized that they were 
actually mixed in with the DataStream tests. 

I think having that clarity by separating those tests would be great. That said, 
I also agree it doesn't hurt that much to keep them as-is. If you feel strongly 
against it, I can close this PR.




---


[jira] [Commented] (FLINK-7789) Add handler for Async IO operator timeouts

2018-05-30 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7789?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16495400#comment-16495400
 ] 

ASF GitHub Bot commented on FLINK-7789:
---

Github user kisimple commented on a diff in the pull request:

https://github.com/apache/flink/pull/6091#discussion_r191835482
  
--- Diff: flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AsyncDataStream.scala ---
@@ -71,6 +71,9 @@ object AsyncDataStream {
      override def asyncInvoke(input: IN, resultFuture: JavaResultFuture[OUT]): Unit = {
        asyncFunction.asyncInvoke(input, new JavaResultFutureWrapper(resultFuture))
      }
+     override def timeout(input: IN, resultFuture: JavaResultFuture[OUT]): Unit = {
--- End diff --

I haven't found any tests for `AsyncDataStream.scala` or 
`AsyncFunction.scala`, so I am not sure whether they are missing or unnecessary. What 
do you think?


> Add handler for Async IO operator timeouts 
> ---
>
> Key: FLINK-7789
> URL: https://issues.apache.org/jira/browse/FLINK-7789
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: Karthik Deivasigamani
>Assignee: blues zheng
>Priority: Major
>
> Currently the Async IO operator does not provide a mechanism to handle timeouts. 
> When a request times out, an exception is thrown and the job is restarted. It 
> would be good to pass an AsyncIOTimeoutHandler which can be implemented by the 
> user and passed in the constructor.
> Here is the discussion from apache flink users mailing list 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/async-io-operator-timeouts-tt16068.html



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6091: [FLINK-7789][DataStream API] Add handler for Async...

2018-05-30 Thread kisimple
Github user kisimple commented on a diff in the pull request:

https://github.com/apache/flink/pull/6091#discussion_r191835482
  
--- Diff: flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AsyncDataStream.scala ---
@@ -71,6 +71,9 @@ object AsyncDataStream {
      override def asyncInvoke(input: IN, resultFuture: JavaResultFuture[OUT]): Unit = {
        asyncFunction.asyncInvoke(input, new JavaResultFutureWrapper(resultFuture))
      }
+     override def timeout(input: IN, resultFuture: JavaResultFuture[OUT]): Unit = {
--- End diff --

I haven't found any tests for `AsyncDataStream.scala` or 
`AsyncFunction.scala`, so I am not sure whether they are missing or unnecessary. What 
do you think?


---


[jira] [Commented] (FLINK-7789) Add handler for Async IO operator timeouts

2018-05-30 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7789?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16495395#comment-16495395
 ] 

ASF GitHub Bot commented on FLINK-7789:
---

Github user kisimple commented on a diff in the pull request:

https://github.com/apache/flink/pull/6091#discussion_r191834614
  
--- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java ---
@@ -648,6 +662,52 @@ public void testAsyncTimeout() throws Exception {

		ExceptionUtils.findThrowable(mockEnvironment.getActualExternalFailureCause().get(), TimeoutException.class);
	}
 
+   @Test
+   public void testAsyncTimeoutAware() throws Exception {
--- End diff --

Good point :)


> Add handler for Async IO operator timeouts 
> ---
>
> Key: FLINK-7789
> URL: https://issues.apache.org/jira/browse/FLINK-7789
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: Karthik Deivasigamani
>Assignee: blues zheng
>Priority: Major
>
> Currently the Async IO operator does not provide a mechanism to handle timeouts. 
> When a request times out, an exception is thrown and the job is restarted. It 
> would be good to pass an AsyncIOTimeoutHandler which can be implemented by the 
> user and passed in the constructor.
> Here is the discussion from apache flink users mailing list 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/async-io-operator-timeouts-tt16068.html



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6091: [FLINK-7789][DataStream API] Add handler for Async...

2018-05-30 Thread kisimple
Github user kisimple commented on a diff in the pull request:

https://github.com/apache/flink/pull/6091#discussion_r191834614
  
--- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java ---
@@ -648,6 +662,52 @@ public void testAsyncTimeout() throws Exception {

		ExceptionUtils.findThrowable(mockEnvironment.getActualExternalFailureCause().get(), TimeoutException.class);
	}
 
+   @Test
+   public void testAsyncTimeoutAware() throws Exception {
--- End diff --

Good point :)


---


[jira] [Commented] (FLINK-9476) Lost sideOutPut Late Elements in CEP Operator

2018-05-30 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16495355#comment-16495355
 ] 

ASF GitHub Bot commented on FLINK-9476:
---

GitHub user Aitozi opened a pull request:

https://github.com/apache/flink/pull/6104

[FLINK-9476]Emit late elements in CEP as sideOutPut

Now, when used with event time in the CEP library, elements that come later than 
the watermark are dropped; we can put them in a side output with an OutputTag.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/Aitozi/flink FLINK-9476

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6104.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6104


commit c0b04fadacd3e9f3f403b3adfdbadf8d4aac79e4
Author: minwenjun 
Date:   2018-05-30T15:32:15Z

Add loseDataOutputTag in cep deal with event time dropped data

commit 373e376fc182c32fe69765aa564e93057954ff44
Author: minwenjun 
Date:   2018-05-30T15:50:01Z

add scala api




> Lost sideOutPut Late Elements in CEP Operator
> -
>
> Key: FLINK-9476
> URL: https://issues.apache.org/jira/browse/FLINK-9476
> Project: Flink
>  Issue Type: Improvement
>  Components: CEP
>Affects Versions: 1.4.2
>Reporter: aitozi
>Assignee: aitozi
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6104: [FLINK-9476]Emit late elements in CEP as sideOutPu...

2018-05-30 Thread Aitozi
GitHub user Aitozi opened a pull request:

https://github.com/apache/flink/pull/6104

[FLINK-9476]Emit late elements in CEP as sideOutPut

Now, when used with event time in the CEP library, elements that come later than 
the watermark are dropped; we can put them in a side output with an OutputTag.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/Aitozi/flink FLINK-9476

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6104.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6104


commit c0b04fadacd3e9f3f403b3adfdbadf8d4aac79e4
Author: minwenjun 
Date:   2018-05-30T15:32:15Z

Add loseDataOutputTag in cep deal with event time dropped data

commit 373e376fc182c32fe69765aa564e93057954ff44
Author: minwenjun 
Date:   2018-05-30T15:50:01Z

add scala api




---


[jira] [Commented] (FLINK-7836) specifying node label for flink job to run on yarn

2018-05-30 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7836?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16495353#comment-16495353
 ] 

ASF GitHub Bot commented on FLINK-7836:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5593


> specifying node label for flink job to run on yarn
> --
>
> Key: FLINK-7836
> URL: https://issues.apache.org/jira/browse/FLINK-7836
> Project: Flink
>  Issue Type: New Feature
>  Components: Client
>Affects Versions: 1.3.2
>Reporter: zhaibaba
>Assignee: vinoyang
>Priority: Major
> Fix For: 1.6.0
>
>
> The flink client cannot specify a node label for a Flink job to run on YARN.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5593: [FLINK-7836][Client] specifying node label for fli...

2018-05-30 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5593


---


[jira] [Resolved] (FLINK-7836) specifying node label for flink job to run on yarn

2018-05-30 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-7836?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann resolved FLINK-7836.
--
   Resolution: Fixed
Fix Version/s: 1.6.0

Fixed via b6448015f485b5051e5e920b82b4b3ee71c55974

> specifying node label for flink job to run on yarn
> --
>
> Key: FLINK-7836
> URL: https://issues.apache.org/jira/browse/FLINK-7836
> Project: Flink
>  Issue Type: New Feature
>  Components: Client
>Affects Versions: 1.3.2
>Reporter: zhaibaba
>Assignee: vinoyang
>Priority: Major
> Fix For: 1.6.0
>
>
> The flink client cannot specify a node label for a Flink job to run on YARN.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-5789) Make Bucketing Sink independent of Hadoop's FileSystem

2018-05-30 Thread Kostas Kloudas (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-5789?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kostas Kloudas reassigned FLINK-5789:
-

Assignee: Kostas Kloudas

> Make Bucketing Sink independent of Hadoop's FileSystem
> --
>
> Key: FLINK-5789
> URL: https://issues.apache.org/jira/browse/FLINK-5789
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Affects Versions: 1.2.0, 1.1.4
>Reporter: Stephan Ewen
>Assignee: Kostas Kloudas
>Priority: Major
> Fix For: 1.6.0
>
>
> The {{BucketingSink}} is hard-wired to Hadoop's FileSystem, bypassing Flink's 
> file system abstraction.
> This causes several issues:
>   - The bucketing sink behaves differently than other file sinks with 
> respect to configuration
>   - Directly supported file systems (not going through Hadoop), like the MapR File 
> System, do not work in the same way with the BucketingSink as other file 
> systems
>   - The previous point is all the more problematic in the effort to make 
> Hadoop an optional dependency and to run in other stacks (Mesos, Kubernetes, 
> AWS, GCE, Azure) with ideally no Hadoop dependency.
> We should port the {{BucketingSink}} to use Flink's FileSystem classes.
> To support the *truncate* functionality that is needed for the exactly-once 
> semantics of the Bucketing Sink, we should extend Flink's FileSystem 
> abstraction to have the methods listed below (sketched after this description):
>   - {{boolean supportsTruncate()}}
>   - {{void truncate(Path, long)}}
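
A minimal sketch of how the two methods named in the ticket could be added to Flink's {{FileSystem}} abstraction; the subclass name and default bodies are assumptions:

{code:java}
import java.io.IOException;

import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;

// Hypothetical subclass illustrating the proposed extension points.
public abstract class TruncatableFileSystem extends FileSystem {

	/** Whether this file system can truncate files in place. */
	public boolean supportsTruncate() {
		return false;
	}

	/** Truncates the file at the given path to the given length in bytes. */
	public void truncate(Path path, long length) throws IOException {
		throw new UnsupportedOperationException(
			"truncate is not supported by " + getClass().getName());
	}
}
{code}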



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9413) Tasks can fail with PartitionNotFoundException if consumer deployment takes too long

2018-05-30 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16495303#comment-16495303
 ] 

ASF GitHub Bot commented on FLINK-9413:
---

Github user zhangminglei commented on a diff in the pull request:

https://github.com/apache/flink/pull/6103#discussion_r191814673
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java ---
@@ -103,9 +103,7 @@ public String toString() {
			// The producing task needs to be RUNNING or already FINISHED
			if (consumedPartition.isConsumable() && producerSlot != null &&
				(producerState == ExecutionState.RUNNING ||
-					producerState == ExecutionState.FINISHED ||
-					producerState == ExecutionState.SCHEDULED ||
-					producerState == ExecutionState.DEPLOYING)) {
--- End diff --

Thank you, Till. But where is the legacy code? I found a lot of code that belongs to 
legacy mode... omg... I do not think I should remove all the legacy-mode 
code... Could you tell me please? Lol.


> Tasks can fail with PartitionNotFoundException if consumer deployment takes 
> too long
> 
>
> Key: FLINK-9413
> URL: https://issues.apache.org/jira/browse/FLINK-9413
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.4.0, 1.5.0, 1.6.0
>Reporter: Till Rohrmann
>Assignee: mingleizhang
>Priority: Critical
>
> {{Tasks}} can fail with a {{PartitionNotFoundException}} if the deployment of 
> the producer takes too long. More specifically, if it takes longer than the 
> {{taskmanager.network.request-backoff.max}}, then the {{Task}} will give up 
> and fail.
> The problem is that we calculate the {{InputGateDeploymentDescriptor}} for a 
> consuming task once the producer has been assigned a slot but we do not wait 
> until it is actually running. The problem should be fixed if we wait until 
> the task is in state {{RUNNING}} before assigning the result partition to the 
> consumer.
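
Based on the diff quoted above, a small sketch of the resulting producer-readiness check once SCHEDULED and DEPLOYING are no longer accepted (assuming FINISHED remains accepted, as the comment in the snippet indicates; variable names taken from the quoted code):

{code:java}
// Sketch: a producer's partition location is only considered known once the
// producer is actually RUNNING (or already FINISHED); for SCHEDULED/DEPLOYING
// producers the consumer must wait and retry instead of failing.
boolean producerReady =
	consumedPartition.isConsumable() && producerSlot != null &&
		(producerState == ExecutionState.RUNNING ||
			producerState == ExecutionState.FINISHED);
{code}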



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6103: [FLINK-9413] [distributed coordination] Tasks can ...

2018-05-30 Thread zhangminglei
Github user zhangminglei commented on a diff in the pull request:

https://github.com/apache/flink/pull/6103#discussion_r191814673
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java ---
@@ -103,9 +103,7 @@ public String toString() {
			// The producing task needs to be RUNNING or already FINISHED
			if (consumedPartition.isConsumable() && producerSlot != null &&
				(producerState == ExecutionState.RUNNING ||
-					producerState == ExecutionState.FINISHED ||
-					producerState == ExecutionState.SCHEDULED ||
-					producerState == ExecutionState.DEPLOYING)) {
--- End diff --

Thank you, Till. But where is the legacy code? I found a lot of code that belongs to 
legacy mode... omg... I do not think I should remove all the legacy-mode 
code... Could you tell me please? Lol.


---


[jira] [Commented] (FLINK-9444) KafkaAvroTableSource failed to work for map and array fields

2018-05-30 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9444?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16495255#comment-16495255
 ] 

ASF GitHub Bot commented on FLINK-9444:
---

Github user tragicjun commented on the issue:

https://github.com/apache/flink/pull/6082
  
I encountered another exception working with the string type in Avro maps/arrays; 
any advice on whether I should open a separate issue or just reuse this one?


> KafkaAvroTableSource failed to work for map and array fields
> 
>
> Key: FLINK-9444
> URL: https://issues.apache.org/jira/browse/FLINK-9444
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector, Table API & SQL
>Affects Versions: 1.6.0
>Reporter: Jun Zhang
>Priority: Blocker
>  Labels: patch
> Fix For: 1.6.0
>
> Attachments: flink-9444.patch
>
>
> When some Avro schema has map/array fields and the corresponding TableSchema 
> declares *MapTypeInfo/ListTypeInfo* for these fields, an exception will be 
> thrown when registering the *KafkaAvroTableSource*, complaining like:
> Exception in thread "main" org.apache.flink.table.api.ValidationException: 
> Type Map of table field 'event' does not match with type 
> GenericType of the field 'event' of the TableSource return 
> type.
>  at org.apache.flink.table.api.ValidationException$.apply(exceptions.scala:74)
>  at 
> org.apache.flink.table.sources.TableSourceUtil$$anonfun$validateTableSource$1.apply(TableSourceUtil.scala:92)
>  at 
> org.apache.flink.table.sources.TableSourceUtil$$anonfun$validateTableSource$1.apply(TableSourceUtil.scala:71)
>  at 
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>  at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
>  at 
> org.apache.flink.table.sources.TableSourceUtil$.validateTableSource(TableSourceUtil.scala:71)
>  at 
> org.apache.flink.table.plan.schema.StreamTableSourceTable.(StreamTableSourceTable.scala:33)
>  at 
> org.apache.flink.table.api.StreamTableEnvironment.registerTableSourceInternal(StreamTableEnvironment.scala:124)
>  at 
> org.apache.flink.table.api.TableEnvironment.registerTableSource(TableEnvironment.scala:438)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #6082: [FLINK-9444][table] KafkaAvroTableSource failed to work f...

2018-05-30 Thread tragicjun
Github user tragicjun commented on the issue:

https://github.com/apache/flink/pull/6082
  
I encountered another exception working with the string type in Avro maps/arrays; 
any advice on whether I should open a separate issue or just reuse this one?


---


[jira] [Commented] (FLINK-9464) Clean up pom files

2018-05-30 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9464?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16495252#comment-16495252
 ] 

ASF GitHub Bot commented on FLINK-9464:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6093#discussion_r191794378
  
--- Diff: flink-connectors/flink-connector-filesystem/pom.xml ---
@@ -67,13 +67,6 @@ under the License.
 
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-test-utils-junit</artifactId>
--- End diff --

I think so. But let's wait what Travis says.


> Clean up pom files
> --
>
> Key: FLINK-9464
> URL: https://issues.apache.org/jira/browse/FLINK-9464
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
> Fix For: 1.6.0
>
>
> Some of Flink module's {{pom.xml}} files contain unnecessary or redundant 
> information. For example, the {{flink-clients}} {{pom.xml}} specifies twice 
> the {{maven-jar-plugin}} in the build section. Other modules explicitly 
> specify the version and scope of the {{flink-test-utils-junit}} module which 
> is managed by the parent's dependency management section. I propose to clean 
> these things up.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9461) Disentangle flink-connector-kafka from flink-table and flink-json

2018-05-30 Thread Till Rohrmann (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16495248#comment-16495248
 ] 

Till Rohrmann commented on FLINK-9461:
--

If {{flink-table}} only contained some interfaces defining the sources, 
sinks and operators, it would not be a big problem. But this is obviously 
not the case. The point I'm trying to make is that it feels a bit odd that a 
Flink connector depends on a Flink library when it would be so simple to get 
rid of it by introducing a dedicated module.

Assume that we eventually get rid of the Scala dependency in {{flink-runtime}}; 
we could still not remove the Scala suffix from the connectors because they 
depend on {{flink-table}}.

> Disentangle flink-connector-kafka from flink-table and flink-json
> -
>
> Key: FLINK-9461
> URL: https://issues.apache.org/jira/browse/FLINK-9461
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Priority: Major
> Fix For: 1.6.0
>
>
> Currently, the {{flink-connector-kafka}} module has a dependency on 
> {{flink-table}} and {{flink-json}}. The reason seems to be that the module 
> contains the {{KafkaJsonTableSource}} and {{KafkaJsonTableSink}}. Even though 
> the {{flink-table}} and {{flink-json}} dependency are marked as optional, the 
> {{flink-connector-kafka}} will still contain the table sources and sinks. I 
> think this is not a clean design.
> I would propose to move the table sources and sinks into a dedicated module 
> which depends on {{flink-connector-kafka}}. That way we would better separate 
> dependencies and could remove {{flink-table}} and {{flink-json}} from 
> {{flink-connector-kafka}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6093: [FLINK-9464] Various pom.xml file clean ups

2018-05-30 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6093#discussion_r191794378
  
--- Diff: flink-connectors/flink-connector-filesystem/pom.xml ---
@@ -67,13 +67,6 @@ under the License.
 
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-test-utils-junit</artifactId>
--- End diff --

I think so. But let's wait what Travis says.


---


[jira] [Commented] (FLINK-9464) Clean up pom files

2018-05-30 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9464?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16495241#comment-16495241
 ] 

ASF GitHub Bot commented on FLINK-9464:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6093#discussion_r191792861
  
--- Diff: flink-connectors/flink-connector-filesystem/pom.xml ---
@@ -67,13 +67,6 @@ under the License.
 
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-test-utils-junit</artifactId>
--- End diff --

I assume this is unused?


> Clean up pom files
> --
>
> Key: FLINK-9464
> URL: https://issues.apache.org/jira/browse/FLINK-9464
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
> Fix For: 1.6.0
>
>
> Some of Flink module's {{pom.xml}} files contain unnecessary or redundant 
> information. For example, the {{flink-clients}} {{pom.xml}} specifies twice 
> the {{maven-jar-plugin}} in the build section. Other modules explicitly 
> specify the version and scope of the {{flink-test-utils-junit}} module which 
> is managed by the parent's dependency management section. I propose to clean 
> these things up.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6093: [FLINK-9464] Various pom.xml file clean ups

2018-05-30 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6093#discussion_r191792861
  
--- Diff: flink-connectors/flink-connector-filesystem/pom.xml ---
@@ -67,13 +67,6 @@ under the License.
 
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-test-utils-junit</artifactId>
--- End diff --

I assume this is unused?


---


[jira] [Commented] (FLINK-9461) Disentangle flink-connector-kafka from flink-table and flink-json

2018-05-30 Thread Fabian Hueske (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16495233#comment-16495233
 ] 

Fabian Hueske commented on FLINK-9461:
--

Why should the API depend on the connector? All our connectors are tied to an 
API and the Table API / SQL is an API just like the DataStream or DataSet API. 
Should the DataStream API depend on Kafka (and Kinesis, Cassandra, ES, ...)?
The API provides an interface and the connectors implement the interface. We 
followed that pattern for all APIs and connectors.

Moving all connector-related API classes to {{flink-core}} as suggested by 
[~Zentol] would require a larger refactoring because some API classes reference 
core classes of the Table API (implemented in Scala). 

I'm not opposed to moving connectors into individual modules, but IMO the 
question is whether a few additional classes in a JAR file justify fragmenting 
the connectors into API-specific modules.




> Disentangle flink-connector-kafka from flink-table and flink-json
> -
>
> Key: FLINK-9461
> URL: https://issues.apache.org/jira/browse/FLINK-9461
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Priority: Major
> Fix For: 1.6.0
>
>
> Currently, the {{flink-connector-kafka}} module has a dependency on 
> {{flink-table}} and {{flink-json}}. The reason seems to be that the module 
> contains the {{KafkaJsonTableSource}} and {{KafkaJsonTableSink}}. Even though 
> the {{flink-table}} and {{flink-json}} dependency are marked as optional, the 
> {{flink-connector-kafka}} will still contain the table sources and sinks. I 
> think this is not a clean design.
> I would propose to move the table sources and sinks into a dedicated module 
> which depends on {{flink-connector-kafka}}. That way we would better separate 
> dependencies and could remove {{flink-table}} and {{flink-json}} from 
> {{flink-connector-kafka}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8707) Excessive amount of files opened by flink task manager

2018-05-30 Thread Alexander Gardner (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8707?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16495219#comment-16495219
 ] 

Alexander Gardner commented on FLINK-8707:
--

Hi Piotr

Apologies again, all my time has been on prioritized DEV work for PROD. Integrating my 
state into the Source checkpoints has worked a treat :)

Some new Flink jobs are nearly live, so I will reproduce data for you for this 
JIRA for standalone & cluster.

We never break our ulimit in PROD now anyway.

Just want to get to the bottom of why there are so many handles!

 

Alex

> Excessive amount of files opened by flink task manager
> --
>
> Key: FLINK-8707
> URL: https://issues.apache.org/jira/browse/FLINK-8707
> Project: Flink
>  Issue Type: Bug
>  Components: TaskManager
>Affects Versions: 1.3.2
> Environment: NAME="Red Hat Enterprise Linux Server"
> VERSION="7.3 (Maipo)"
> Two boxes, each with a Job Manager & Task Manager, using Zookeeper for HA.
> flink.yaml below with some settings (removed exact box names) etc:
> env.log.dir: ...some dir...residing on the same box
> env.pid.dir: some dir...residing on the same box
> metrics.reporter.jmx.class: org.apache.flink.metrics.jmx.JMXReporter
> metrics.reporters: jmx
> state.backend: filesystem
> state.backend.fs.checkpointdir: file:///some_nfs_mount
> state.checkpoints.dir: file:///some_nfs_mount
> state.checkpoints.num-retained: 3
> high-availability.cluster-id: /tst
> high-availability.storageDir: file:///some_nfs_mount/ha
> high-availability: zookeeper
> high-availability.zookeeper.path.root: /flink
> high-availability.zookeeper.quorum: ...list of zookeeper boxes
> env.java.opts.jobmanager: ...some extra jar args
> jobmanager.archive.fs.dir: some dir...residing on the same box
> jobmanager.web.submit.enable: true
> jobmanager.web.tmpdir:  some dir...residing on the same box
> env.java.opts.taskmanager: some extra jar args
> taskmanager.tmp.dirs:  some dir...residing on the same box/var/tmp
> taskmanager.network.memory.min: 1024MB
> taskmanager.network.memory.max: 2048MB
> blob.storage.directory:  some dir...residing on the same box
>Reporter: Alexander Gardner
>Priority: Critical
> Attachments: AfterRunning-3-jobs-Box2-TM-JCONSOLE.png, 
> AfterRunning-3-jobs-TM-FDs-BOX2.jpg, AfterRunning-3-jobs-lsof-p.box2-TM, 
> AfterRunning-3-jobs-lsof.box2-TM, AterRunning-3-jobs-Box1-TM-JCONSOLE.png, 
> box1-jobmgr-lsof, box1-taskmgr-lsof, box2-jobmgr-lsof, box2-taskmgr-lsof, 
> ll.txt, ll.txt, lsof.txt, lsof.txt, lsofp.txt, lsofp.txt
>
>
> The job manager has fewer FDs than the task manager.
>  
> Hi
> A support alert indicated that there were a lot of open files for the boxes 
> running Flink.
> There were 4 flink jobs that were dormant but had consumed a number of msgs 
> from Kafka using the FlinkKafkaConsumer010.
> A simple general lsof:
> $ lsof | wc -l       ->  returned 153114 open file descriptors.
> Focusing on the TaskManager process (process ID = 12154):
> $ lsof | grep 12154 | wc -l    -> returned 129322 open FDs
> $ lsof -p 12154 | wc -l   -> returned 531 FDs
> There were 228 threads running for the task manager.
>  
> Drilling down a bit further, looking at a_inode and FIFO entries: 
> $ lsof -p 12154 | grep a_inode | wc -l = 100 FDs
> $ lsof -p 12154 | grep FIFO | wc -l  = 200 FDs
> /proc/12154/maps contained 920 entries.
> Apart from lsof identifying lots of JARs and SOs being referenced there were 
> also 244 child processes for the task manager process.
> Noticed in each environment a creep of file descriptors... are the above 
> figures deemed excessive for the number of FDs in use? I know Flink uses Netty - 
> is it using a separate Selector for reads & writes? 
> Additionally, Flink uses memory-mapped files or direct ByteBuffers - are these 
> skewing the number of FDs shown?
> Example of one child process ID 6633:
> java 12154 6633 dfdev 387u a_inode 0,9 0 5869 [eventpoll]
>  java 12154 6633 dfdev 388r FIFO 0,8 0t0 459758080 pipe
>  java 12154 6633 dfdev 389w FIFO 0,8 0t0 459758080 pipe
> Lastly, cannot yet identify the reason for the creep in FDs even when Flink is 
> pretty dormant or has dormant jobs. Production nodes are not experiencing 
> excessive amounts of throughput yet either.
>  
>  
>  
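
To cross-check the lsof snapshots above from inside the JVM, a small probe like 
the following can log the process-wide FD usage (a sketch; assumes a 
HotSpot/OpenJDK JVM on Unix, where the platform MXBean implements 
com.sun.management.UnixOperatingSystemMXBean):
{code:java}
import java.lang.management.ManagementFactory;
import java.lang.management.OperatingSystemMXBean;

// Illustrative probe: prints the open and maximum FD counts of the current JVM
// process; run it in-process (e.g. inside the TaskManager) or adapt it into a
// periodic logger.
public final class FdProbe {
    public static void main(String[] args) {
        OperatingSystemMXBean os = ManagementFactory.getOperatingSystemMXBean();
        if (os instanceof com.sun.management.UnixOperatingSystemMXBean) {
            com.sun.management.UnixOperatingSystemMXBean unixOs =
                    (com.sun.management.UnixOperatingSystemMXBean) os;
            System.out.println("open FDs = " + unixOs.getOpenFileDescriptorCount()
                    + " / max = " + unixOs.getMaxFileDescriptorCount());
        }
    }
}
{code}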



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9464) Clean up pom files

2018-05-30 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9464?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16495208#comment-16495208
 ] 

ASF GitHub Bot commented on FLINK-9464:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/6093
  
I've updated the PR @zentol and removed the version and scope tags from all 
`flink-test-utils-junit` dependencies in all modules. The scope and version are 
now defined in the dependency management section of the parent `pom.xml`.
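
For readers following along, that pattern looks roughly like this (a sketch; 
versions and layout illustrative):

```
<!-- parent pom.xml: version and scope are declared once, under dependencyManagement -->
<dependencyManagement>
  <dependencies>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-test-utils-junit</artifactId>
      <version>${project.version}</version>
      <scope>test</scope>
    </dependency>
  </dependencies>
</dependencyManagement>

<!-- child module pom.xml: the version and scope tags can now be omitted -->
<dependencies>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-test-utils-junit</artifactId>
  </dependency>
</dependencies>
```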


> Clean up pom files
> --
>
> Key: FLINK-9464
> URL: https://issues.apache.org/jira/browse/FLINK-9464
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
> Fix For: 1.6.0
>
>
> Some of Flink's module {{pom.xml}} files contain unnecessary or redundant 
> information. For example, the {{flink-clients}} {{pom.xml}} specifies the 
> {{maven-jar-plugin}} twice in the build section. Other modules explicitly 
> specify the version and scope of the {{flink-test-utils-junit}} module which 
> is managed by the parent's dependency management section. I propose to clean 
> these things up.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #6093: [FLINK-9464] Various pom.xml file clean ups

2018-05-30 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/6093
  
I've updated the PR @zentol and removed the version and scope tags from all 
`flink-test-utils-junit` dependencies in all modules. The scope and version are 
now defined in the dependency management section of the parent `pom.xml`.


---


[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)

2018-05-30 Thread Tzu-Li (Gordon) Tai (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16495198#comment-16495198
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-8500:


+1 to proceed with at least an incremental fix to the issue, for now. The 
discussion of breaking up concerns of deserialization / meta info enrichment 
into 2 interfaces can go into a separate thread.

For a minimal incremental fix for now, I would prefer this approach:
{code}
default T deserialize(byte[] messageKey, byte[] message, String topic, int 
partition, long offset, long timestamp, TimestampType timestampType) throws 
IOException
{code}

I think in the long run, we'll still very likely break this up into two 
interfaces.
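
For context, a default method lets the interface grow without breaking existing 
user implementations. A minimal sketch of the idea (interface and method names 
simplified here, not the exact Flink classes):
{code:java}
import java.io.IOException;
import java.io.Serializable;

import org.apache.kafka.common.record.TimestampType;

// Sketch only: simplified stand-in for the deserialization schema interface.
public interface RichKafkaDeserializationSchema<T> extends Serializable {

    // existing method that current user implementations already provide
    T deserialize(byte[] messageKey, byte[] message,
            String topic, int partition, long offset) throws IOException;

    // new method carrying the Kafka record timestamp; the default delegates to
    // the old signature, so existing implementations keep working unchanged
    default T deserialize(byte[] messageKey, byte[] message, String topic,
            int partition, long offset, long timestamp,
            TimestampType timestampType) throws IOException {
        return deserialize(messageKey, message, topic, partition, offset);
    }
}
{code}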

> Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
> ---
>
> Key: FLINK-8500
> URL: https://issues.apache.org/jira/browse/FLINK-8500
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Affects Versions: 1.4.0
>Reporter: yanxiaobin
>Priority: Major
> Fix For: 1.6.0
>
> Attachments: image-2018-01-30-14-58-58-167.png, 
> image-2018-01-31-10-48-59-633.png
>
>
> The method deserialize of KeyedDeserializationSchema needs a parameter 
> 'kafka message timestamp' (from ConsumerRecord). In some business scenarios, 
> this is useful!
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (FLINK-9215) TaskManager Releasing - org.apache.flink.util.FlinkException

2018-05-30 Thread Dawid Wysakowicz (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9215?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dawid Wysakowicz resolved FLINK-9215.
-
   Resolution: Fixed
Fix Version/s: 1.6.0

> TaskManager Releasing  - org.apache.flink.util.FlinkException
> -
>
> Key: FLINK-9215
> URL: https://issues.apache.org/jira/browse/FLINK-9215
> Project: Flink
>  Issue Type: Bug
>  Components: Cluster Management, ResourceManager, Streaming
>Affects Versions: 1.5.0
>Reporter: Bob Lau
>Assignee: Sihua Zhou
>Priority: Major
> Fix For: 1.6.0, 1.5.1
>
>
> The exception stack is as follows:
> {code:java}
> // code placeholder
> {"root-exception":"
> org.apache.flink.util.FlinkException: Releasing TaskManager 
> 0d87aa6fa99a6c12e36775b1d6bceb19.
> at 
> org.apache.flink.runtime.jobmaster.slotpool.SlotPool.releaseTaskManagerInternal(SlotPool.java:1067)
> at 
> org.apache.flink.runtime.jobmaster.slotpool.SlotPool.releaseTaskManager(SlotPool.java:1050)
> at sun.reflect.GeneratedMethodAccessor110.invoke(Unknown Source)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:210)
> at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
> at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$onReceive$1(AkkaRpcActor.java:132)
> at akka.actor.ActorCell$$anonfun$become$1.applyOrElse(ActorCell.scala:544)
> at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
> at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
> at akka.actor.ActorCell.invoke(ActorCell.scala:495)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
> at akka.dispatch.Mailbox.run(Mailbox.scala:224)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> ","timestamp":1524106438997,
> "all-exceptions":[{"exception":"
> org.apache.flink.util.FlinkException: Releasing TaskManager 
> 0d87aa6fa99a6c12e36775b1d6bceb19.
> at 
> org.apache.flink.runtime.jobmaster.slotpool.SlotPool.releaseTaskManagerInternal(SlotPool.java:1067)
> at 
> org.apache.flink.runtime.jobmaster.slotpool.SlotPool.releaseTaskManager(SlotPool.java:1050)
> at sun.reflect.GeneratedMethodAccessor110.invoke(Unknown Source)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:210)
> at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
> at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$onReceive$1(AkkaRpcActor.java:132)
> at akka.actor.ActorCell$$anonfun$become$1.applyOrElse(ActorCell.scala:544)
> at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
> at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
> at akka.actor.ActorCell.invoke(ActorCell.scala:495)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
> at akka.dispatch.Mailbox.run(Mailbox.scala:224)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> ","task":"async wait operator 
> (14/20)","location":"slave1:60199","timestamp":1524106438996
> }],"truncated":false}
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9215) TaskManager Releasing - org.apache.flink.util.FlinkException

2018-05-30 Thread Dawid Wysakowicz (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9215?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16495193#comment-16495193
 ] 

Dawid Wysakowicz commented on FLINK-9215:
-

Closed in 1.5.1 via: e1cce3634e09726d7cadd81bab25da288dc0ba49

Closed in 1.6 via: 69135e933f6ac575ff12ef0390b9754a87c5bca2

> TaskManager Releasing  - org.apache.flink.util.FlinkException
> -
>
> Key: FLINK-9215
> URL: https://issues.apache.org/jira/browse/FLINK-9215
> Project: Flink
>  Issue Type: Bug
>  Components: Cluster Management, ResourceManager, Streaming
>Affects Versions: 1.5.0
>Reporter: Bob Lau
>Assignee: Sihua Zhou
>Priority: Major
> Fix For: 1.5.1
>
>
> The exception stack is as follows:
> {code:java}
> // code placeholder
> {"root-exception":"
> org.apache.flink.util.FlinkException: Releasing TaskManager 
> 0d87aa6fa99a6c12e36775b1d6bceb19.
> at 
> org.apache.flink.runtime.jobmaster.slotpool.SlotPool.releaseTaskManagerInternal(SlotPool.java:1067)
> at 
> org.apache.flink.runtime.jobmaster.slotpool.SlotPool.releaseTaskManager(SlotPool.java:1050)
> at sun.reflect.GeneratedMethodAccessor110.invoke(Unknown Source)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:210)
> at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
> at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$onReceive$1(AkkaRpcActor.java:132)
> at akka.actor.ActorCell$$anonfun$become$1.applyOrElse(ActorCell.scala:544)
> at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
> at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
> at akka.actor.ActorCell.invoke(ActorCell.scala:495)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
> at akka.dispatch.Mailbox.run(Mailbox.scala:224)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> ","timestamp":1524106438997,
> "all-exceptions":[{"exception":"
> org.apache.flink.util.FlinkException: Releasing TaskManager 
> 0d87aa6fa99a6c12e36775b1d6bceb19.
> at 
> org.apache.flink.runtime.jobmaster.slotpool.SlotPool.releaseTaskManagerInternal(SlotPool.java:1067)
> at 
> org.apache.flink.runtime.jobmaster.slotpool.SlotPool.releaseTaskManager(SlotPool.java:1050)
> at sun.reflect.GeneratedMethodAccessor110.invoke(Unknown Source)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:210)
> at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
> at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$onReceive$1(AkkaRpcActor.java:132)
> at akka.actor.ActorCell$$anonfun$become$1.applyOrElse(ActorCell.scala:544)
> at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
> at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
> at akka.actor.ActorCell.invoke(ActorCell.scala:495)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
> at akka.dispatch.Mailbox.run(Mailbox.scala:224)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> ","task":"async wait operator 
> (14/20)","location":"slave1:60199","timestamp":1524106438996
> }],"truncated":false}
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-7386) Flink Elasticsearch 5 connector is not compatible with Elasticsearch 5.2+ client

2018-05-30 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7386?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16495187#comment-16495187
 ] 

ASF GitHub Bot commented on FLINK-7386:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/6043
  
@cjolif do you think it would be possible that, with a clean cut using a 
REST implementation, we would no longer need separate modules for ES 
6.x, 7.x, 8.x and so on?
i.e., it would only be a matter of the user recompiling that REST-based 
implementation against a different ES client version.

If not, then I would still prefer that we continue with the current approach 
this PR is proposing, since we need this fix in to have Elasticsearch 5.3+ 
working anyway.


> Flink Elasticsearch 5 connector is not compatible with Elasticsearch 5.2+ 
> client
> 
>
> Key: FLINK-7386
> URL: https://issues.apache.org/jira/browse/FLINK-7386
> Project: Flink
>  Issue Type: Improvement
>  Components: ElasticSearch Connector
>Reporter: Dawid Wysakowicz
>Assignee: Fang Yong
>Priority: Critical
> Fix For: 1.6.0
>
>
> In Elasticsearch 5.2.0 client the class {{BulkProcessor}} was refactored and 
> has no longer the method {{add(ActionRequest)}}.
> For more info see: https://github.com/elastic/elasticsearch/pull/20109



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #6043: [FLINK-7386] evolve RequestIndexer API to make it working...

2018-05-30 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/6043
  
@cjolif do you think it would be possible that, with a clean cut using a 
REST implementation, we would no longer need separate modules for ES 
6.x, 7.x, 8.x and so on?
i.e., it would only be a matter of the user recompiling that REST-based 
implementation against a different ES client version.

If not, then I would still prefer that we continue with the current approach 
this PR is proposing, since we need this fix in to have Elasticsearch 5.3+ 
working anyway.


---


[jira] [Commented] (FLINK-9215) TaskManager Releasing - org.apache.flink.util.FlinkException

2018-05-30 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9215?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16495183#comment-16495183
 ] 

ASF GitHub Bot commented on FLINK-9215:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5879


> TaskManager Releasing  - org.apache.flink.util.FlinkException
> -
>
> Key: FLINK-9215
> URL: https://issues.apache.org/jira/browse/FLINK-9215
> Project: Flink
>  Issue Type: Bug
>  Components: Cluster Management, ResourceManager, Streaming
>Affects Versions: 1.5.0
>Reporter: Bob Lau
>Assignee: Sihua Zhou
>Priority: Major
> Fix For: 1.5.1
>
>
> The exception stack is as follows:
> {code:java}
> // code placeholder
> {"root-exception":"
> org.apache.flink.util.FlinkException: Releasing TaskManager 
> 0d87aa6fa99a6c12e36775b1d6bceb19.
> at 
> org.apache.flink.runtime.jobmaster.slotpool.SlotPool.releaseTaskManagerInternal(SlotPool.java:1067)
> at 
> org.apache.flink.runtime.jobmaster.slotpool.SlotPool.releaseTaskManager(SlotPool.java:1050)
> at sun.reflect.GeneratedMethodAccessor110.invoke(Unknown Source)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:210)
> at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
> at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$onReceive$1(AkkaRpcActor.java:132)
> at akka.actor.ActorCell$$anonfun$become$1.applyOrElse(ActorCell.scala:544)
> at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
> at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
> at akka.actor.ActorCell.invoke(ActorCell.scala:495)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
> at akka.dispatch.Mailbox.run(Mailbox.scala:224)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> ","timestamp":1524106438997,
> "all-exceptions":[{"exception":"
> org.apache.flink.util.FlinkException: Releasing TaskManager 
> 0d87aa6fa99a6c12e36775b1d6bceb19.
> at 
> org.apache.flink.runtime.jobmaster.slotpool.SlotPool.releaseTaskManagerInternal(SlotPool.java:1067)
> at 
> org.apache.flink.runtime.jobmaster.slotpool.SlotPool.releaseTaskManager(SlotPool.java:1050)
> at sun.reflect.GeneratedMethodAccessor110.invoke(Unknown Source)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:210)
> at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
> at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$onReceive$1(AkkaRpcActor.java:132)
> at akka.actor.ActorCell$$anonfun$become$1.applyOrElse(ActorCell.scala:544)
> at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
> at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
> at akka.actor.ActorCell.invoke(ActorCell.scala:495)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
> at akka.dispatch.Mailbox.run(Mailbox.scala:224)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> ","task":"async wait operator 
> (14/20)","location":"slave1:60199","timestamp":1524106438996
> }],"truncated":false}
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5879: [FLINK-9215][resoucemanager] Reduce noise in SlotP...

2018-05-30 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5879


---


[jira] [Commented] (FLINK-9423) Implement efficient deletes for heap based timer service

2018-05-30 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16495178#comment-16495178
 ] 

ASF GitHub Bot commented on FLINK-9423:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6062#discussion_r191772629
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerHeap.java
 ---
@@ -0,0 +1,511 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Queue;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * A heap-based priority queue for internal timers. This heap is supported 
by hash sets for fast contains
+ * (de-duplication) and deletes. The heap implementation is a simple 
binary tree stored inside an array. Element indexes
+ * in the heap array start at 1 instead of 0 to make array index 
computations a bit simpler in the hot methods.
+ *
+ * Possible future improvements:
+ * <ul>
+ *  <li>We could also implement shrinking for the heap and the deduplication maps.</li>
+ *  <li>We could replace the deduplication maps with more efficient custom implementations. In particular, a hash set
+ * would be enough if it could return existing elements on unsuccessful adding, etc..</li>
+ * </ul>
+ *
+ * @param <K> type of the key of the internal timers managed by this priority queue.
+ * @param <N> type of the namespace of the internal timers managed by this priority queue.
+ */
+public class InternalTimerHeap<K, N> implements Queue<TimerHeapInternalTimer<K, N>>, Set<TimerHeapInternalTimer<K, N>> {
+
+   /**
+* A safe maximum size for arrays in the JVM.
+*/
+   private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
+
+   /**
+* Comparator for {@link TimerHeapInternalTimer}, based on the 
timestamp in ascending order.
+*/
+   private static final Comparator> 
COMPARATOR =
+   (o1, o2) -> Long.compare(o1.getTimestamp(), o2.getTimestamp());
+
+   /**
+* This array contains one hash set per key-group. The sets are used 
for fast de-duplication and deletes of timers.
+*/
+   private final HashMap<TimerHeapInternalTimer<K, N>, TimerHeapInternalTimer<K, N>>[] deduplicationMapsByKeyGroup;
+
+   /**
+* The array that represents the heap-organized priority queue.
+*/
+   private TimerHeapInternalTimer<K, N>[] queue;
+
+   /**
+* The current size of the priority queue.
+*/
+   private int size;
+
+   /**
+* The key-group range of timers that are managed by this queue.
+*/
+   private final KeyGroupRange keyGroupRange;
+
+   /**
+* The total number of key-groups of the job.
+*/
+   private final int totalNumberOfKeyGroups;
+
+
+   /**
+* Creates an empty {@link InternalTimerHeap} with the requested 
initial capacity.
+*
+* @param minimumCapacity the minimum and initial capacity of this 
priority queue.
+*/
+   @SuppressWarnings("unchecked")
+   InternalTimerHeap(
+   @Nonnegative int minimumCapacity,
+   @Nonnull KeyGroupRange keyGroupRange,
+   @Nonnegative int totalNumberOfKeyGroups) {
+
+   this.totalNumberOfKeyGroups = totalNumberOfKeyGroups;
+   

[GitHub] flink pull request #6062: [FLINK-9423][state] Implement efficient deletes fo...

2018-05-30 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6062#discussion_r191772629
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerHeap.java
 ---
@@ -0,0 +1,511 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Queue;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * A heap-based priority queue for internal timers. This heap is supported 
by hash sets for fast contains
+ * (de-duplication) and deletes. The heap implementation is a simple 
binary tree stored inside an array. Element indexes
+ * in the heap array start at 1 instead of 0 to make array index 
computations a bit simpler in the hot methods.
+ *
+ * Possible future improvements:
+ * <ul>
+ *  <li>We could also implement shrinking for the heap and the deduplication maps.</li>
+ *  <li>We could replace the deduplication maps with more efficient custom implementations. In particular, a hash set
+ * would be enough if it could return existing elements on unsuccessful adding, etc..</li>
+ * </ul>
+ *
+ * @param <K> type of the key of the internal timers managed by this priority queue.
+ * @param <N> type of the namespace of the internal timers managed by this priority queue.
+ */
+public class InternalTimerHeap<K, N> implements Queue<TimerHeapInternalTimer<K, N>>, Set<TimerHeapInternalTimer<K, N>> {
+
+   /**
+* A safe maximum size for arrays in the JVM.
+*/
+   private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
+
+   /**
+* Comparator for {@link TimerHeapInternalTimer}, based on the 
timestamp in ascending order.
+*/
+   private static final Comparator<TimerHeapInternalTimer<?, ?>> COMPARATOR =
+   (o1, o2) -> Long.compare(o1.getTimestamp(), o2.getTimestamp());
+
+   /**
+* This array contains one hash set per key-group. The sets are used 
for fast de-duplication and deletes of timers.
+*/
+   private final HashMap<TimerHeapInternalTimer<K, N>, TimerHeapInternalTimer<K, N>>[] deduplicationMapsByKeyGroup;
+
+   /**
+* The array that represents the heap-organized priority queue.
+*/
+   private TimerHeapInternalTimer<K, N>[] queue;
+
+   /**
+* The current size of the priority queue.
+*/
+   private int size;
+
+   /**
+* The key-group range of timers that are managed by this queue.
+*/
+   private final KeyGroupRange keyGroupRange;
+
+   /**
+* The total number of key-groups of the job.
+*/
+   private final int totalNumberOfKeyGroups;
+
+
+   /**
+* Creates an empty {@link InternalTimerHeap} with the requested 
initial capacity.
+*
+* @param minimumCapacity the minimum and initial capacity of this 
priority queue.
+*/
+   @SuppressWarnings("unchecked")
+   InternalTimerHeap(
+   @Nonnegative int minimumCapacity,
+   @Nonnull KeyGroupRange keyGroupRange,
+   @Nonnegative int totalNumberOfKeyGroups) {
+
+   this.totalNumberOfKeyGroups = totalNumberOfKeyGroups;
+   this.keyGroupRange = keyGroupRange;
+
+   final int keyGroupsInLocalRange = 
keyGroupRange.getNumberOfKeyGroups();
+   final int deduplicationSetSize = 1 + minimumCapacity / 
keyGroupsInLocalRange;
+   this.deduplicat
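
The Javadoc above motivates the 1-based array layout: parent/child positions 
become pure shifts in the hot sift loops. A self-contained sketch of that index 
math (illustrative only, not the PR's code):

```
// 1-based binary min-heap over timestamps: root at queue[1], slot 0 unused.
// parent(i) = i >>> 1, leftChild(i) = i << 1, rightChild(i) = (i << 1) + 1,
// so the hot loops need no +1/-1 index adjustments.
final class LongTimestampHeap {
    private long[] queue = new long[1 + 16];
    private int size;

    void add(long timestamp) {
        if (++size == queue.length) {
            queue = java.util.Arrays.copyOf(queue, queue.length * 2);
        }
        int idx = size;
        // sift up: pull larger parents down while the new timestamp is smaller
        while (idx > 1 && timestamp < queue[idx >>> 1]) {
            queue[idx] = queue[idx >>> 1];
            idx >>>= 1;
        }
        queue[idx] = timestamp;
    }

    /** Smallest timestamp; assumes the heap is non-empty. */
    long peek() {
        return queue[1];
    }
}
```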

[jira] [Commented] (FLINK-7836) specifying node label for flink job to run on yarn

2018-05-30 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7836?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16495172#comment-16495172
 ] 

ASF GitHub Bot commented on FLINK-7836:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/5593
  
Thanks for the reminder @yanghua. I forgot about it and will merge it now. 
Thanks again for your contributions. Flink wouldn't be what it is without you!


> specifying node label for flink job to run on yarn
> --
>
> Key: FLINK-7836
> URL: https://issues.apache.org/jira/browse/FLINK-7836
> Project: Flink
>  Issue Type: New Feature
>  Components: Client
>Affects Versions: 1.3.2
>Reporter: zhaibaba
>Assignee: vinoyang
>Priority: Major
>
> flink client cannot specify node label for flink job to run on yarn



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5593: [FLINK-7836][Client] specifying node label for flink job ...

2018-05-30 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/5593
  
Thanks for the reminder @yanghua. I forgot about it and will merge it now. 
Thanks again for your contributions. Flink wouldn't be what it is without you!


---


[jira] [Commented] (FLINK-9461) Disentangle flink-connector-kafka from flink-table and flink-json

2018-05-30 Thread Till Rohrmann (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16495170#comment-16495170
 ] 

Till Rohrmann commented on FLINK-9461:
--

Logically, it does not make sense to couple a connector with its user (here 
{{flink-table}}). If at all, then the user should depend on it, or ideally we 
have a common abstraction/interface living for example in 
{{flink-table-connectors}} which can be implemented by a concrete connector and 
dropped in dynamically.

Not sure whether I buy the argument with too many maven modules. What would be 
the problem with that? It rather looks to me that making 
{{flink-connector-kafka}} depend on {{flink-table}} was a kind of shortcut. 
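
A rough sketch of what such an abstraction could look like (the module and every 
name below are illustrative only):
{code:java}
// Hypothetical flink-table-connectors module: the Table API owns the interface,
// concrete connector modules implement it. All names invented for illustration.
import java.util.Map;

import org.apache.flink.table.sources.TableSource;

public interface TableConnectorFactory {

    /** True if this factory can serve the given connector properties. */
    boolean supports(Map<String, String> properties);

    /** Creates the table source, e.g. a Kafka-backed one, from the properties. */
    TableSource<?> create(Map<String, String> properties);
}
{code}
A concrete connector module would ship an implementation that could be 
discovered at runtime, e.g. via {{java.util.ServiceLoader}}, so neither side 
needs a hard compile-time dependency on the other.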



> Disentangle flink-connector-kafka from flink-table and flink-json
> -
>
> Key: FLINK-9461
> URL: https://issues.apache.org/jira/browse/FLINK-9461
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Priority: Major
> Fix For: 1.6.0
>
>
> Currently, the {{flink-connector-kafka}} module has a dependency on 
> {{flink-table}} and {{flink-json}}. The reason seems to be that the module 
> contains the {{KafkaJsonTableSource}} and {{KafkaJsonTableSink}}. Even though 
> the {{flink-table}} and {{flink-json}} dependencies are marked as optional, the 
> {{flink-connector-kafka}} will still contain the table sources and sinks. I 
> think this is not a clean design.
> I would propose to move the table sources and sinks into a dedicated module 
> which depends on {{flink-connector-kafka}}. That way we would better separate 
> dependencies and could remove {{flink-table}} and {{flink-json}} from 
> {{flink-connector-kafka}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-9461) Disentangle flink-connector-kafka from flink-table and flink-json

2018-05-30 Thread Till Rohrmann (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16495170#comment-16495170
 ] 

Till Rohrmann edited comment on FLINK-9461 at 5/30/18 1:38 PM:
---

Logically, it does not make sense to couple a connector with its user (here 
{{flink-table}}). If at all, then the user should depend on the connector, or 
ideally we have a common abstraction/interface living for example in 
{{flink-table-connectors}} which can be implemented by a concrete connector and 
dropped in dynamically.

Not sure whether I buy the argument with too many maven modules. What would be 
the problem with that? It rather looks to me that making 
{{flink-connector-kafka}} depend on {{flink-table}} was a kind of shortcut. 




was (Author: till.rohrmann):
Logically, it does not make sense to couple a connector with its user (here 
{{flink-table}}). If at all, then the user should depend on it, or ideally we 
have a common abstraction/interface living for example in 
{{flink-table-connectors}} which can be implemented by a concrete connector and 
dropped in dynamically.

Not sure whether I buy the argument with too many maven modules. What would be 
the problem with that? It rather looks to me that making 
{{flink-connector-kafka}} depend on {{flink-table}} was a kind of shortcut. 



> Disentangle flink-connector-kafka from flink-table and flink-json
> -
>
> Key: FLINK-9461
> URL: https://issues.apache.org/jira/browse/FLINK-9461
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Priority: Major
> Fix For: 1.6.0
>
>
> Currently, the {{flink-connector-kafka}} module has a dependency on 
> {{flink-table}} and {{flink-json}}. The reason seems to be that the module 
> contains the {{KafkaJsonTableSource}} and {{KafkaJsonTableSink}}. Even though 
> the {{flink-table}} and {{flink-json}} dependencies are marked as optional, the 
> {{flink-connector-kafka}} will still contain the table sources and sinks. I 
> think this is not a clean design.
> I would propose to move the table sources and sinks into a dedicated module 
> which depends on {{flink-connector-kafka}}. That way we would better separate 
> dependencies and could remove {{flink-table}} and {{flink-json}} from 
> {{flink-connector-kafka}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-7789) Add handler for Async IO operator timeouts

2018-05-30 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7789?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16495164#comment-16495164
 ] 

ASF GitHub Bot commented on FLINK-7789:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/6091#discussion_r191768245
  
--- Diff: 
flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AsyncDataStream.scala
 ---
@@ -71,6 +71,9 @@ object AsyncDataStream {
   override def asyncInvoke(input: IN, resultFuture: 
JavaResultFuture[OUT]): Unit = {
 asyncFunction.asyncInvoke(input, new 
JavaResultFutureWrapper(resultFuture))
   }
+  override def timeout(input: IN, resultFuture: 
JavaResultFuture[OUT]): Unit = {
--- End diff --

Are those changes in `AsyncDataStream.scala` covered by the tests somewhere? 
If you comment out their method bodies, does at least one test fail?


> Add handler for Async IO operator timeouts 
> ---
>
> Key: FLINK-7789
> URL: https://issues.apache.org/jira/browse/FLINK-7789
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: Karthik Deivasigamani
>Assignee: blues zheng
>Priority: Major
>
> Currently the Async IO operator does not provide a mechanism to handle timeouts. 
> When a request times out, an exception is thrown and the job is restarted. It 
> would be good to pass an AsyncIOTimeoutHandler which can be implemented by the 
> user and passed in the constructor.
> Here is the discussion from apache flink users mailing list 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/async-io-operator-timeouts-tt16068.html



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-7789) Add handler for Async IO operator timeouts

2018-05-30 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7789?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16495167#comment-16495167
 ] 

ASF GitHub Bot commented on FLINK-7789:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/6091#discussion_r191760364
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
 ---
@@ -212,6 +212,20 @@ public static void countDown() {
}
}
 
+   private static class TimeoutAwareLazyAsyncFunction extends 
LazyAsyncFunction {
+   private static final long serialVersionUID = 
1428714561365346128L;
+
+   @Override
+   public void timeout(Integer input, ResultFuture 
resultFuture) throws Exception {
+   if (input != null && input % 2 == 0) {
+   
resultFuture.complete(Collections.singletonList(input * 3));
+   } else {
+   // ignore odd input number when it timeouts
--- End diff --

Move this comment to the top of this static class?


> Add handler for Async IO operator timeouts 
> ---
>
> Key: FLINK-7789
> URL: https://issues.apache.org/jira/browse/FLINK-7789
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: Karthik Deivasigamani
>Assignee: blues zheng
>Priority: Major
>
> Currently the Async IO operator does not provide a mechanism to handle timeouts. 
> When a request times out, an exception is thrown and the job is restarted. It 
> would be good to pass an AsyncIOTimeoutHandler which can be implemented by the 
> user and passed in the constructor.
> Here is the discussion from apache flink users mailing list 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/async-io-operator-timeouts-tt16068.html



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-7789) Add handler for Async IO operator timeouts

2018-05-30 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7789?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16495165#comment-16495165
 ] 

ASF GitHub Bot commented on FLINK-7789:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/6091#discussion_r191768672
  
--- Diff: 
flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/async/AsyncFunction.scala
 ---
@@ -46,4 +48,16 @@ trait AsyncFunction[IN, OUT] extends Function {
 * @param resultFuture to be completed with the result data
 */
   def asyncInvoke(input: IN, resultFuture: ResultFuture[OUT]): Unit
+
+  /**
+* [[AsyncFunction.asyncInvoke]] timeout occurred.
+* By default, the result future is exceptionally completed with 
timeout exception.
+*
+* @param input element coming from an upstream task
+* @param resultFuture to be completed with the result data
+*/
+  def timeout(input: IN, resultFuture: ResultFuture[OUT]): Unit = {
+resultFuture.completeExceptionally(new TimeoutException("Async 
function call has timed out."))
--- End diff --

same question about the tests.
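
For reference, the user-facing effect of the new hook (as exercised by the Java 
test diff above) is that a function can override `timeout` to emit a fallback 
instead of failing the job. A sketch with an invented class name:

```
import java.util.Collections;

import org.apache.flink.streaming.api.functions.async.AsyncFunction;
import org.apache.flink.streaming.api.functions.async.ResultFuture;

// Illustrative user function: completes timed-out elements with a fallback
// value instead of letting the default TimeoutException fail the job.
public class FallbackOnTimeoutFunction implements AsyncFunction<Integer, String> {

    @Override
    public void asyncInvoke(Integer input, ResultFuture<String> resultFuture) {
        // fire the asynchronous request here and complete resultFuture
        // from its callback
    }

    @Override
    public void timeout(Integer input, ResultFuture<String> resultFuture) {
        resultFuture.complete(Collections.singletonList("timed-out:" + input));
    }
}
```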


> Add handler for Async IO operator timeouts 
> ---
>
> Key: FLINK-7789
> URL: https://issues.apache.org/jira/browse/FLINK-7789
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: Karthik Deivasigamani
>Assignee: blues zheng
>Priority: Major
>
> Currently the Async IO operator does not provide a mechanism to handle timeouts. 
> When a request times out, an exception is thrown and the job is restarted. It 
> would be good to pass an AsyncIOTimeoutHandler which can be implemented by the 
> user and passed in the constructor.
> Here is the discussion from apache flink users mailing list 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/async-io-operator-timeouts-tt16068.html



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-7789) Add handler for Async IO operator timeouts

2018-05-30 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7789?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16495168#comment-16495168
 ] 

ASF GitHub Bot commented on FLINK-7789:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/6091#discussion_r191757079
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
 ---
@@ -212,6 +212,20 @@ public static void countDown() {
}
}
 
+   private static class TimeoutAwareLazyAsyncFunction extends 
LazyAsyncFunction {
--- End diff --

rename to `IgnoreTimeoutLazyAsyncFunction`?


> Add handler for Async IO operator timeouts 
> ---
>
> Key: FLINK-7789
> URL: https://issues.apache.org/jira/browse/FLINK-7789
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: Karthik Deivasigamani
>Assignee: blues zheng
>Priority: Major
>
> Currently the Async IO operator does not provide a mechanism to handle timeouts. 
> When a request times out, an exception is thrown and the job is restarted. It 
> would be good to pass an AsyncIOTimeoutHandler which can be implemented by the 
> user and passed in the constructor.
> Here is the discussion from apache flink users mailing list 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/async-io-operator-timeouts-tt16068.html



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-7789) Add handler for Async IO operator timeouts

2018-05-30 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7789?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16495166#comment-16495166
 ] 

ASF GitHub Bot commented on FLINK-7789:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/6091#discussion_r191767070
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
 ---
@@ -648,6 +662,52 @@ public void testAsyncTimeout() throws Exception {

ExceptionUtils.findThrowable(mockEnvironment.getActualExternalFailureCause().get(),
 TimeoutException.class);
}
 
+   @Test
+   public void testAsyncTimeoutAware() throws Exception {
--- End diff --

Please deduplicate the code of this method with `testAsyncTimeout()` to sth 
like that:

```
@Test
public void testAsyncTimeoutFailure() throws Exception {
    testAsyncTimeout(
        new LazyAsyncFunction(),
        Optional.of(TimeoutException.class),
        new StreamRecord<>(2, 5L));
}

@Test
public void testAsyncTimeoutIgnore() throws Exception {
    testAsyncTimeout(
        new IgnoreTimeoutLazyAsyncFunction(),
        Optional.of(TimeoutException.class),
        new StreamRecord<>(6, 0L),
        new StreamRecord<>(4, 5L));
}

private void testAsyncTimeout(
        LazyAsyncFunction lazyAsyncFunction,
        Optional<Class<? extends Throwable>> expectedException,
        StreamRecord<Integer>... expectedRecords) throws Exception {
    // your current testAsyncTimeoutAware method body adjusted to above parameters
}
```

or sth similar.


> Add handler for Async IO operator timeouts 
> ---
>
> Key: FLINK-7789
> URL: https://issues.apache.org/jira/browse/FLINK-7789
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: Karthik Deivasigamani
>Assignee: blues zheng
>Priority: Major
>
> Currently the Async IO operator does not provide a mechanism to handle timeouts. 
> When a request times out, an exception is thrown and the job is restarted. It 
> would be good to pass an AsyncIOTimeoutHandler which can be implemented by the 
> user and passed in the constructor.
> Here is the discussion from apache flink users mailing list 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/async-io-operator-timeouts-tt16068.html



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6091: [FLINK-7789][DataStream API] Add handler for Async...

2018-05-30 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/6091#discussion_r191760364
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
 ---
@@ -212,6 +212,20 @@ public static void countDown() {
}
}
 
+   private static class TimeoutAwareLazyAsyncFunction extends 
LazyAsyncFunction {
+   private static final long serialVersionUID = 
1428714561365346128L;
+
+   @Override
+   public void timeout(Integer input, ResultFuture 
resultFuture) throws Exception {
+   if (input != null && input % 2 == 0) {
+   
resultFuture.complete(Collections.singletonList(input * 3));
+   } else {
+   // ignore odd input number when it timeouts
--- End diff --

Move this comment to the top of this static class?


---


[GitHub] flink pull request #6091: [FLINK-7789][DataStream API] Add handler for Async...

2018-05-30 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/6091#discussion_r191768245
  
--- Diff: 
flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AsyncDataStream.scala
 ---
@@ -71,6 +71,9 @@ object AsyncDataStream {
   override def asyncInvoke(input: IN, resultFuture: 
JavaResultFuture[OUT]): Unit = {
 asyncFunction.asyncInvoke(input, new 
JavaResultFutureWrapper(resultFuture))
   }
+  override def timeout(input: IN, resultFuture: 
JavaResultFuture[OUT]): Unit = {
--- End diff --

Are those changes in `AsyncDataStream.scala` covered by the tests somewhere? 
If you comment out their method bodies, does at least one test fail?


---


[GitHub] flink pull request #6091: [FLINK-7789][DataStream API] Add handler for Async...

2018-05-30 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/6091#discussion_r191757079
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
 ---
@@ -212,6 +212,20 @@ public static void countDown() {
}
}
 
+   private static class TimeoutAwareLazyAsyncFunction extends 
LazyAsyncFunction {
--- End diff --

rename to `IgnoreTimeoutLazyAsyncFunction`?


---


[GitHub] flink pull request #6091: [FLINK-7789][DataStream API] Add handler for Async...

2018-05-30 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/6091#discussion_r191768672
  
--- Diff: 
flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/async/AsyncFunction.scala
 ---
@@ -46,4 +48,16 @@ trait AsyncFunction[IN, OUT] extends Function {
 * @param resultFuture to be completed with the result data
 */
   def asyncInvoke(input: IN, resultFuture: ResultFuture[OUT]): Unit
+
+  /**
+* [[AsyncFunction.asyncInvoke]] timeout occurred.
+* By default, the result future is exceptionally completed with 
timeout exception.
+*
+* @param input element coming from an upstream task
+* @param resultFuture to be completed with the result data
+*/
+  def timeout(input: IN, resultFuture: ResultFuture[OUT]): Unit = {
+resultFuture.completeExceptionally(new TimeoutException("Async 
function call has timed out."))
--- End diff --

same question about the tests.


---


[GitHub] flink pull request #6091: [FLINK-7789][DataStream API] Add handler for Async...

2018-05-30 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/6091#discussion_r191767070
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
 ---
@@ -648,6 +662,52 @@ public void testAsyncTimeout() throws Exception {

ExceptionUtils.findThrowable(mockEnvironment.getActualExternalFailureCause().get(),
 TimeoutException.class);
}
 
+   @Test
+   public void testAsyncTimeoutAware() throws Exception {
--- End diff --

Please deduplicate the code of this method with `testAsyncTimeout()` to sth 
like that:

```
@Test
public void testAsyncTimeoutFailure() throws Exception {
    testAsyncTimeout(
        new LazyAsyncFunction(),
        Optional.of(TimeoutException.class),
        new StreamRecord<>(2, 5L));
}

@Test
public void testAsyncTimeoutIgnore() throws Exception {
    testAsyncTimeout(
        new IgnoreTimeoutLazyAsyncFunction(),
        Optional.of(TimeoutException.class),
        new StreamRecord<>(6, 0L),
        new StreamRecord<>(4, 5L));
}

private void testAsyncTimeout(
        LazyAsyncFunction lazyAsyncFunction,
        Optional<Class<? extends Throwable>> expectedException,
        StreamRecord<Integer>... expectedRecords) throws Exception {
    // your current testAsyncTimeoutAware method body adjusted to above parameters
}
```

or sth similar.


---


[jira] [Assigned] (FLINK-9366) Distribute Cache only works for client-accessible files

2018-05-30 Thread Dawid Wysakowicz (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9366?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dawid Wysakowicz reassigned FLINK-9366:
---

Assignee: Dawid Wysakowicz

> Distribute Cache only works for client-accessible files
> ---
>
> Key: FLINK-9366
> URL: https://issues.apache.org/jira/browse/FLINK-9366
> Project: Flink
>  Issue Type: Bug
>  Components: Client, Local Runtime
>Affects Versions: 1.6.0
>Reporter: Chesnay Schepler
>Assignee: Dawid Wysakowicz
>Priority: Blocker
> Fix For: 1.6.0
>
>
> In FLINK-8620 the distributed cache was modified to distribute files via 
> the blob store, instead of downloading them from a distributed filesystem.
> Previously, taskmanagers would download requested files from the DFS. Now, 
> they retrieve them from the blob store. This requires the client to 
> preemptively upload all files used with the distributed cache.
> As a result it is no longer possible to use the distributed cache for files 
> that reside in a cluster-internal DFS, as the client cannot download them. This 
> is a regression from the previous behavior and may break existing setups.
> [~aljoscha] [~dawidwys]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9423) Implement efficient deletes for heap based timer service

2018-05-30 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16495159#comment-16495159
 ] 

ASF GitHub Bot commented on FLINK-9423:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6062#discussion_r191768328
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerHeap.java
 ---
@@ -0,0 +1,511 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Queue;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * A heap-based priority queue for internal timers. This heap is supported 
by hash sets for fast contains
+ * (de-duplication) and deletes. The heap implementation is a simple 
binary tree stored inside an array. Element indexes
+ * in the heap array start at 1 instead of 0 to make array index 
computations a bit simpler in the hot methods.
+ *
+ * Possible future improvements:
+ * <ul>
+ *  <li>We could also implement shrinking for the heap and the deduplication maps.</li>
+ *  <li>We could replace the deduplication maps with more efficient custom implementations. In particular, a hash set
+ * would be enough if it could return existing elements on unsuccessful adding, etc..</li>
+ * </ul>
+ *
+ * @param <K> type of the key of the internal timers managed by this priority queue.
+ * @param <N> type of the namespace of the internal timers managed by this priority queue.
+ */
+public class InternalTimerHeap<K, N> implements Queue<TimerHeapInternalTimer<K, N>>, Set<TimerHeapInternalTimer<K, N>> {
+
+   /**
+* A safe maximum size for arrays in the JVM.
+*/
+   private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
+
+   /**
+* Comparator for {@link TimerHeapInternalTimer}, based on the 
timestamp in ascending order.
+*/
+   private static final Comparator<TimerHeapInternalTimer<?, ?>> COMPARATOR =
+   (o1, o2) -> Long.compare(o1.getTimestamp(), o2.getTimestamp());
+
+   /**
+* This array contains one hash set per key-group. The sets are used 
for fast de-duplication and deletes of timers.
+*/
+   private final HashMap<TimerHeapInternalTimer<K, N>, TimerHeapInternalTimer<K, N>>[] deduplicationMapsByKeyGroup;
+
+   /**
+* The array that represents the heap-organized priority queue.
+*/
+   private TimerHeapInternalTimer<K, N>[] queue;
+
+   /**
+* The current size of the priority queue.
+*/
+   private int size;
+
+   /**
+* The key-group range of timers that are managed by this queue.
+*/
+   private final KeyGroupRange keyGroupRange;
+
+   /**
+* The total number of key-groups of the job.
+*/
+   private final int totalNumberOfKeyGroups;
+
+
+   /**
+* Creates an empty {@link InternalTimerHeap} with the requested 
initial capacity.
+*
+* @param minimumCapacity the minimum and initial capacity of this 
priority queue.
+*/
+   @SuppressWarnings("unchecked")
+   InternalTimerHeap(
+   @Nonnegative int minimumCapacity,
+   @Nonnull KeyGroupRange keyGroupRange,
+   @Nonnegative int totalNumberOfKeyGroups) {
+
+   this.totalNumberOfKeyGroups = totalNumberOfKeyGroups;
+ 

[GitHub] flink pull request #6062: [FLINK-9423][state] Implement efficient deletes fo...

2018-05-30 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6062#discussion_r191768328
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerHeap.java
 ---
@@ -0,0 +1,511 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Queue;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * A heap-based priority queue for internal timers. This heap is supported by hash sets for fast contains
+ * (de-duplication) and deletes. The heap implementation is a simple binary tree stored inside an array. Element
+ * indexes in the heap array start at 1 instead of 0 to make array index computations a bit simpler in the hot methods.
+ *
+ * <p>Possible future improvements:
+ * <ul>
+ * <li>We could also implement shrinking for the heap and the deduplication maps.</li>
+ * <li>We could replace the deduplication maps with more efficient custom implementations. In particular, a hash set
+ * would be enough if it could return existing elements on unsuccessful adding, etc..</li>
+ * </ul>
+ *
+ * @param <K> type of the key of the internal timers managed by this priority queue.
+ * @param <N> type of the namespace of the internal timers managed by this priority queue.
+ */
+public class InternalTimerHeap<K, N> implements Queue<TimerHeapInternalTimer<K, N>>, Set<TimerHeapInternalTimer<K, N>> {
+
+   /**
+* A safe maximum size for arrays in the JVM.
+*/
+   private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
+
+   /**
+    * Comparator for {@link TimerHeapInternalTimer}, based on the timestamp in ascending order.
+    */
+   private static final Comparator<TimerHeapInternalTimer<?, ?>> COMPARATOR =
+       (o1, o2) -> Long.compare(o1.getTimestamp(), o2.getTimestamp());
+
+   /**
+    * This array contains one hash set per key-group. The sets are used for fast de-duplication and deletes of timers.
+    */
+   private final HashMap<TimerHeapInternalTimer<K, N>, TimerHeapInternalTimer<K, N>>[] deduplicationMapsByKeyGroup;
+
+   /**
+    * The array that represents the heap-organized priority queue.
+    */
+   private TimerHeapInternalTimer<K, N>[] queue;
+
+   /**
+* The current size of the priority queue.
+*/
+   private int size;
+
+   /**
+* The key-group range of timers that are managed by this queue.
+*/
+   private final KeyGroupRange keyGroupRange;
+
+   /**
+* The total number of key-groups of the job.
+*/
+   private final int totalNumberOfKeyGroups;
+
+
+   /**
+* Creates an empty {@link InternalTimerHeap} with the requested 
initial capacity.
+*
+* @param minimumCapacity the minimum and initial capacity of this 
priority queue.
+*/
+   @SuppressWarnings("unchecked")
+   InternalTimerHeap(
+   @Nonnegative int minimumCapacity,
+   @Nonnull KeyGroupRange keyGroupRange,
+   @Nonnegative int totalNumberOfKeyGroups) {
+
+   this.totalNumberOfKeyGroups = totalNumberOfKeyGroups;
+   this.keyGroupRange = keyGroupRange;
+
+   final int keyGroupsInLocalRange = keyGroupRange.getNumberOfKeyGroups();
+   final int deduplicationSetSize = 1 + minimumCapacity / keyGroupsInLocalRange;
+   this.deduplicationMapsByKeyGroup = new HashMap[keyGroupsInLocalRange];
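
[Editor's note] Two ideas from the Javadoc above are worth seeing concretely:
1-based heap indexing (the parent of slot idx is idx >>> 1, its children are
idx << 1 and (idx << 1) + 1, with no "-1" corrections) and a hash map kept
next to the heap purely for O(1) de-duplication and deletes. A minimal,
self-contained Java sketch of both, assuming a min-heap over plain long
timestamps; all names are illustrative, this is not the PR's actual code:

    import java.util.Arrays;
    import java.util.HashMap;

    public class OneBasedHeapSketch {

        private long[] heap = new long[16];  // slot 0 is intentionally unused
        private int size;
        // De-duplication map: value -> current heap slot.
        private final HashMap<Long, Integer> slotByValue = new HashMap<>();

        public boolean add(long value) {
            if (slotByValue.containsKey(value)) {
                return false;  // duplicate, rejected in O(1) by the map
            }
            if (++size == heap.length) {
                heap = Arrays.copyOf(heap, heap.length * 2);
            }
            heap[size] = value;
            slotByValue.put(value, size);
            siftUp(size);
            return true;
        }

        private void siftUp(int idx) {
            // 1-based indexing keeps the parent computation a single shift.
            while (idx > 1 && heap[idx] < heap[idx >>> 1]) {
                swap(idx, idx >>> 1);
                idx >>>= 1;
            }
        }

        private void swap(int a, int b) {
            long tmp = heap[a];
            heap[a] = heap[b];
            heap[b] = tmp;
            slotByValue.put(heap[a], a);  // keep the map's slots in sync
            slotByValue.put(heap[b], b);
        }
    }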

[jira] [Commented] (FLINK-9410) Replace NMClient with NMClientAsync in YarnResourceManager

2018-05-30 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9410?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16495157#comment-16495157
 ] 

ASF GitHub Bot commented on FLINK-9410:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/6087
  
Thanks for opening the PR @zhangminglei. I agree with @sihuazhou that it's 
not as easy as replacing `x` with `xAsync`. As @sihuazhou pointed out, we have 
to react to asynchronous Yarn messages depending on our internal state. So for 
example, when calling `startContainerAsync` we should store somewhere that we 
tried to start a container. If this request later fails, we should not fail the 
complete ResourceManager but rather send a new container request (as it is done 
in the `YarnResourceManager#onContainersAllocated` method).

Additionally, while touching this code, we should check whether we can 
improve the unit tests for this component. Especially with the asynchronous 
node manager client, it should be rather simple to write good tests when 
creating a mock implementation and manually calling the callbacks.


> Replace NMClient with NMClientAsync in YarnResourceManager
> --
>
> Key: FLINK-9410
> URL: https://issues.apache.org/jira/browse/FLINK-9410
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Till Rohrmann
>Assignee: mingleizhang
>Priority: Critical
> Fix For: 1.6.0
>
>
> Currently, the {{YarnResourceManager}} uses the synchronous {{NMClient}} 
> which is called from within the main thread of the {{ResourceManager}}. Since 
> these operations are blocking, we should replace the client with the 
> {{NMClientAsync}} and make the calls non blocking.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #6087: [FLINK-9410] [yarn] Replace NMClient with NMClientAsync i...

2018-05-30 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/6087
  
Thanks for opening the PR @zhangminglei. I agree with @sihuazhou that it's 
not as easy as replacing `x` with `xAsync`. As @sihuazhou pointed out, we have 
to react to asynchronous Yarn messages depending on our internal state. So for 
example, when calling `startContainerAsync` we should store somewhere that we 
tried to start a container. If this request later fails, we should not fail the 
complete ResourceManager but rather send a new container request (as it is done 
in the `YarnResourceManager#onContainersAllocated` method).

Additionally, while touching this code, we should check whether we can 
improve the unit tests for this component. Especially with the asynchronous 
node manager client, it should be rather simple to write good tests when 
creating a mock implementation and manually calling the callbacks.


---
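
[Editor's note] A sketch of the bookkeeping described above, written against
Hadoop's NMClientAsync.CallbackHandler interface: remember every container
whose start we requested, and on a start failure request a replacement
instead of failing the whole ResourceManager. requestNewContainer() is a
hypothetical hook, not an existing Flink method:

    import java.nio.ByteBuffer;
    import java.util.Map;
    import java.util.Set;
    import java.util.concurrent.ConcurrentHashMap;

    import org.apache.hadoop.yarn.api.records.ContainerId;
    import org.apache.hadoop.yarn.api.records.ContainerStatus;
    import org.apache.hadoop.yarn.client.api.async.NMClientAsync;

    public class TrackingNMCallbackHandler implements NMClientAsync.CallbackHandler {

        // Containers whose start we have requested but not yet confirmed.
        private final Set<ContainerId> pendingStarts = ConcurrentHashMap.newKeySet();

        public void recordStartRequest(ContainerId id) {
            pendingStarts.add(id);  // call just before startContainerAsync(...)
        }

        @Override
        public void onContainerStarted(ContainerId id, Map<String, ByteBuffer> allServiceResponse) {
            pendingStarts.remove(id);  // start confirmed
        }

        @Override
        public void onStartContainerError(ContainerId id, Throwable t) {
            if (pendingStarts.remove(id)) {
                requestNewContainer();  // re-request rather than failing the RM
            }
        }

        @Override
        public void onContainerStatusReceived(ContainerId id, ContainerStatus status) {}

        @Override
        public void onContainerStopped(ContainerId id) {}

        @Override
        public void onGetContainerStatusError(ContainerId id, Throwable t) {}

        @Override
        public void onStopContainerError(ContainerId id, Throwable t) {}

        private void requestNewContainer() {
            // Hypothetical: delegate to the resource manager's container-request
            // logic, analogous to YarnResourceManager#onContainersAllocated.
        }
    }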


[jira] [Commented] (FLINK-9215) TaskManager Releasing - org.apache.flink.util.FlinkException

2018-05-30 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9215?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16495150#comment-16495150
 ] 

ASF GitHub Bot commented on FLINK-9215:
---

Github user sihuazhou commented on the issue:

https://github.com/apache/flink/pull/5879
  
@dawidwys It's OK with me; the unrelated change should just be a minor Javadoc 
hotfix.


> TaskManager Releasing  - org.apache.flink.util.FlinkException
> -
>
> Key: FLINK-9215
> URL: https://issues.apache.org/jira/browse/FLINK-9215
> Project: Flink
>  Issue Type: Bug
>  Components: Cluster Management, ResourceManager, Streaming
>Affects Versions: 1.5.0
>Reporter: Bob Lau
>Assignee: Sihua Zhou
>Priority: Major
> Fix For: 1.5.1
>
>
> The exception stack is as follows:
> {code:java}
> // code placeholder
> {"root-exception":"
> org.apache.flink.util.FlinkException: Releasing TaskManager 
> 0d87aa6fa99a6c12e36775b1d6bceb19.
> at 
> org.apache.flink.runtime.jobmaster.slotpool.SlotPool.releaseTaskManagerInternal(SlotPool.java:1067)
> at 
> org.apache.flink.runtime.jobmaster.slotpool.SlotPool.releaseTaskManager(SlotPool.java:1050)
> at sun.reflect.GeneratedMethodAccessor110.invoke(Unknown Source)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:210)
> at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
> at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$onReceive$1(AkkaRpcActor.java:132)
> at akka.actor.ActorCell$$anonfun$become$1.applyOrElse(ActorCell.scala:544)
> at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
> at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
> at akka.actor.ActorCell.invoke(ActorCell.scala:495)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
> at akka.dispatch.Mailbox.run(Mailbox.scala:224)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> ","timestamp":1524106438997,
> "all-exceptions":[{"exception":"
> org.apache.flink.util.FlinkException: Releasing TaskManager 
> 0d87aa6fa99a6c12e36775b1d6bceb19.
> at 
> org.apache.flink.runtime.jobmaster.slotpool.SlotPool.releaseTaskManagerInternal(SlotPool.java:1067)
> at 
> org.apache.flink.runtime.jobmaster.slotpool.SlotPool.releaseTaskManager(SlotPool.java:1050)
> at sun.reflect.GeneratedMethodAccessor110.invoke(Unknown Source)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:210)
> at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
> at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$onReceive$1(AkkaRpcActor.java:132)
> at akka.actor.ActorCell$$anonfun$become$1.applyOrElse(ActorCell.scala:544)
> at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
> at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
> at akka.actor.ActorCell.invoke(ActorCell.scala:495)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
> at akka.dispatch.Mailbox.run(Mailbox.scala:224)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> ","task":"async wait operator 
> (14/20)","location":"slave1:60199","timestamp":1524106438996
> }],"truncated":false}
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5879: [FLINK-9215][resoucemanager] Reduce noise in SlotPool's l...

2018-05-30 Thread sihuazhou
Github user sihuazhou commented on the issue:

https://github.com/apache/flink/pull/5879
  
@dawidwys It's OK with me; the unrelated change should just be a minor Javadoc 
hotfix.


---


[jira] [Commented] (FLINK-9215) TaskManager Releasing - org.apache.flink.util.FlinkException

2018-05-30 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9215?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16495146#comment-16495146
 ] 

ASF GitHub Bot commented on FLINK-9215:
---

Github user dawidwys commented on the issue:

https://github.com/apache/flink/pull/5879
  
Thanks @sihuazhou for the contribution. LGTM, though I would remove the 
unrelated change if that is OK with you.


> TaskManager Releasing  - org.apache.flink.util.FlinkException
> -
>
> Key: FLINK-9215
> URL: https://issues.apache.org/jira/browse/FLINK-9215
> Project: Flink
>  Issue Type: Bug
>  Components: Cluster Management, ResourceManager, Streaming
>Affects Versions: 1.5.0
>Reporter: Bob Lau
>Assignee: Sihua Zhou
>Priority: Major
> Fix For: 1.5.1
>
>
> The exception stack is as follows:
> {code:java}
> // code placeholder
> {"root-exception":"
> org.apache.flink.util.FlinkException: Releasing TaskManager 
> 0d87aa6fa99a6c12e36775b1d6bceb19.
> at 
> org.apache.flink.runtime.jobmaster.slotpool.SlotPool.releaseTaskManagerInternal(SlotPool.java:1067)
> at 
> org.apache.flink.runtime.jobmaster.slotpool.SlotPool.releaseTaskManager(SlotPool.java:1050)
> at sun.reflect.GeneratedMethodAccessor110.invoke(Unknown Source)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:210)
> at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
> at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$onReceive$1(AkkaRpcActor.java:132)
> at akka.actor.ActorCell$$anonfun$become$1.applyOrElse(ActorCell.scala:544)
> at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
> at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
> at akka.actor.ActorCell.invoke(ActorCell.scala:495)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
> at akka.dispatch.Mailbox.run(Mailbox.scala:224)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> ","timestamp":1524106438997,
> "all-exceptions":[{"exception":"
> org.apache.flink.util.FlinkException: Releasing TaskManager 
> 0d87aa6fa99a6c12e36775b1d6bceb19.
> at 
> org.apache.flink.runtime.jobmaster.slotpool.SlotPool.releaseTaskManagerInternal(SlotPool.java:1067)
> at 
> org.apache.flink.runtime.jobmaster.slotpool.SlotPool.releaseTaskManager(SlotPool.java:1050)
> at sun.reflect.GeneratedMethodAccessor110.invoke(Unknown Source)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:210)
> at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
> at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$onReceive$1(AkkaRpcActor.java:132)
> at akka.actor.ActorCell$$anonfun$become$1.applyOrElse(ActorCell.scala:544)
> at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
> at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
> at akka.actor.ActorCell.invoke(ActorCell.scala:495)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
> at akka.dispatch.Mailbox.run(Mailbox.scala:224)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> ","task":"async wait operator 
> (14/20)","location":"slave1:60199","timestamp":1524106438996
> }],"truncated":false}
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9215) TaskManager Releasing - org.apache.flink.util.FlinkException

2018-05-30 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9215?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16495143#comment-16495143
 ] 

ASF GitHub Bot commented on FLINK-9215:
---

Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/5879#discussion_r191764153
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java ---
@@ -304,7 +305,8 @@ public int getMaxParallelism() {
/**
 * Sets the maximum parallelism for the task.
 *
-* @param maxParallelism The maximum parallelism to be set. must be 
between 1 and Short.MAX_VALUE.
+* @param maxParallelism The maximum parallelism to be set.
+*   Must be between 1 and {@link 
KeyGroupRangeAssignment#UPPER_BOUND_MAX_PARALLELISM}.
--- End diff --

Unrelated change. Will remove while merging, if you are ok with it 
@sihuazhou .


> TaskManager Releasing  - org.apache.flink.util.FlinkException
> -
>
> Key: FLINK-9215
> URL: https://issues.apache.org/jira/browse/FLINK-9215
> Project: Flink
>  Issue Type: Bug
>  Components: Cluster Management, ResourceManager, Streaming
>Affects Versions: 1.5.0
>Reporter: Bob Lau
>Assignee: Sihua Zhou
>Priority: Major
> Fix For: 1.5.1
>
>
> The exception stack is as follows:
> {code:java}
> // code placeholder
> {"root-exception":"
> org.apache.flink.util.FlinkException: Releasing TaskManager 
> 0d87aa6fa99a6c12e36775b1d6bceb19.
> at 
> org.apache.flink.runtime.jobmaster.slotpool.SlotPool.releaseTaskManagerInternal(SlotPool.java:1067)
> at 
> org.apache.flink.runtime.jobmaster.slotpool.SlotPool.releaseTaskManager(SlotPool.java:1050)
> at sun.reflect.GeneratedMethodAccessor110.invoke(Unknown Source)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:210)
> at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
> at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$onReceive$1(AkkaRpcActor.java:132)
> at akka.actor.ActorCell$$anonfun$become$1.applyOrElse(ActorCell.scala:544)
> at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
> at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
> at akka.actor.ActorCell.invoke(ActorCell.scala:495)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
> at akka.dispatch.Mailbox.run(Mailbox.scala:224)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> ","timestamp":1524106438997,
> "all-exceptions":[{"exception":"
> org.apache.flink.util.FlinkException: Releasing TaskManager 
> 0d87aa6fa99a6c12e36775b1d6bceb19.
> at 
> org.apache.flink.runtime.jobmaster.slotpool.SlotPool.releaseTaskManagerInternal(SlotPool.java:1067)
> at 
> org.apache.flink.runtime.jobmaster.slotpool.SlotPool.releaseTaskManager(SlotPool.java:1050)
> at sun.reflect.GeneratedMethodAccessor110.invoke(Unknown Source)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:210)
> at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
> at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$onReceive$1(AkkaRpcActor.java:132)
> at akka.actor.ActorCell$$anonfun$become$1.applyOrElse(ActorCell.scala:544)
> at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
> at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
> at akka.actor.ActorCell.invoke(ActorCell.scala:495)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
> at akka.dispatch.Mailbox.run(Mailbox.scala:224)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> ","task":"async wait operator 
> (14/20)","location":"slave1:601

[GitHub] flink issue #5879: [FLINK-9215][resoucemanager] Reduce noise in SlotPool's l...

2018-05-30 Thread dawidwys
Github user dawidwys commented on the issue:

https://github.com/apache/flink/pull/5879
  
Thanks @sihuazhou for the contribution. LGTM, though I would remove the 
unrelated change if that is OK with you.


---


[GitHub] flink pull request #5879: [FLINK-9215][resoucemanager] Reduce noise in SlotP...

2018-05-30 Thread dawidwys
Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/5879#discussion_r191764153
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java ---
@@ -304,7 +305,8 @@ public int getMaxParallelism() {
/**
 * Sets the maximum parallelism for the task.
 *
-* @param maxParallelism The maximum parallelism to be set. must be 
between 1 and Short.MAX_VALUE.
+* @param maxParallelism The maximum parallelism to be set.
+*   Must be between 1 and {@link 
KeyGroupRangeAssignment#UPPER_BOUND_MAX_PARALLELISM}.
--- End diff --

Unrelated change. Will remove while merging, if you are ok with it 
@sihuazhou .


---
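
[Editor's note] For context, the bound referenced in the corrected Javadoc is
a real constant, KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM
(1 << 15 = 32768). A small hedged sketch of the documented range check; the
helper class itself is illustrative, not Flink code:

    import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

    final class MaxParallelismCheckSketch {

        // Enforces the documented range [1, UPPER_BOUND_MAX_PARALLELISM].
        static void checkMaxParallelism(int maxParallelism) {
            if (maxParallelism < 1
                    || maxParallelism > KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM) {
                throw new IllegalArgumentException(
                    "maxParallelism must be in [1, "
                        + KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM
                        + "], got " + maxParallelism);
            }
        }
    }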


[jira] [Commented] (FLINK-9480) Let local recovery support rescaling

2018-05-30 Thread Till Rohrmann (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9480?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16495140#comment-16495140
 ] 

Till Rohrmann commented on FLINK-9480:
--

I can totally see the benefits of speeding up the rescaling operation by first 
trying to read local state and then falling back to remote state. In the first 
iteration, it could be a best-effort approach as suggested by [~sihuazhou]. 
Next, we could try to make the scheduling a bit smarter; eventually this could 
mean that we first load the required state onto a TM before deploying tasks.

I also agree with you two about the priorities w.r.t. rescalable timers and 
state TTL.

> Let local recovery support rescaling
> 
>
> Key: FLINK-9480
> URL: https://issues.apache.org/jira/browse/FLINK-9480
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0
>Reporter: Sihua Zhou
>Priority: Major
>
> Currently, local recovery only supports restoring from a checkpoint without 
> rescaling. Maybe we should enable it to support rescaling.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
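
[Editor's note] The "best effort" approach discussed above reads naturally as:
try the locally kept state copy first, and fall back to remote state on any
miss. A deliberately generic sketch; all names here are hypothetical, and
Flink's actual local-recovery abstractions differ:

    import java.io.InputStream;
    import java.util.Optional;
    import java.util.function.Supplier;

    final class LocalFirstStateReader {

        // Returns the locally recovered stream if present, otherwise the remote one.
        static InputStream open(
                Supplier<Optional<InputStream>> localState,
                Supplier<InputStream> remoteState) {
            return localState.get().orElseGet(remoteState);
        }
    }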


[GitHub] flink pull request #5721: Update kubernetes.md

2018-05-30 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5721


---


[GitHub] flink issue #5721: Update kubernetes.md

2018-05-30 Thread dawidwys
Github user dawidwys commented on the issue:

https://github.com/apache/flink/pull/5721
  
merging


---


[jira] [Commented] (FLINK-9413) Tasks can fail with PartitionNotFoundException if consumer deployment takes too long

2018-05-30 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16495115#comment-16495115
 ] 

ASF GitHub Bot commented on FLINK-9413:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6103#discussion_r191753857
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
 ---
@@ -103,9 +103,7 @@ public String toString() {
		// The producing task needs to be RUNNING or already FINISHED
		if (consumedPartition.isConsumable() && producerSlot != null &&
				(producerState == ExecutionState.RUNNING ||
-					producerState == ExecutionState.FINISHED ||
-					producerState == ExecutionState.SCHEDULED ||
-					producerState == ExecutionState.DEPLOYING)) {
--- End diff --

This change will break deployments where `allowLazyDeployment` is set to 
`false`. In the Flip-6 code, this is set to `true` by default, but for the 
legacy mode the value is `false`. Thus, we would first have to remove the 
legacy mode and `allowLazyDeployment`.


> Tasks can fail with PartitionNotFoundException if consumer deployment takes 
> too long
> 
>
> Key: FLINK-9413
> URL: https://issues.apache.org/jira/browse/FLINK-9413
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.4.0, 1.5.0, 1.6.0
>Reporter: Till Rohrmann
>Assignee: mingleizhang
>Priority: Critical
>
> {{Tasks}} can fail with a {{PartitionNotFoundException}} if the deployment of 
> the producer takes too long. More specifically, if it takes longer than the 
> {{taskmanager.network.request-backoff.max}}, then the {{Task}} will give up 
> and fail.
> The problem is that we calculate the {{InputGateDeploymentDescriptor}} for a 
> consuming task once the producer has been assigned a slot but we do not wait 
> until it is actually running. The problem should be fixed if we wait until 
> the task is in state {{RUNNING}} before assigning the result partition to the 
> consumer.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6103: [FLINK-9413] [distributed coordination] Tasks can ...

2018-05-30 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6103#discussion_r191753857
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
 ---
@@ -103,9 +103,7 @@ public String toString() {
		// The producing task needs to be RUNNING or already FINISHED
		if (consumedPartition.isConsumable() && producerSlot != null &&
				(producerState == ExecutionState.RUNNING ||
-					producerState == ExecutionState.FINISHED ||
-					producerState == ExecutionState.SCHEDULED ||
-					producerState == ExecutionState.DEPLOYING)) {
--- End diff --

This change will break deployments where `allowLazyDeployment` is set to 
`false`. In the Flip-6 code, this is set to `true` by default, but for the 
legacy mode the value is `false`. Thus, we would first have to remove the 
legacy mode and `allowLazyDeployment`.


---
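
[Editor's note] A hedged sketch of the condition under discussion, not the
PR's actual code: with lazy deployment the consumer should only be handed the
partition once the producer is RUNNING (or already FINISHED), while the legacy
eager mode (allowLazyDeployment == false) still needs the looser check, which
is why it blocks this change:

    import org.apache.flink.runtime.execution.ExecutionState;

    final class ProducerReadinessSketch {

        static boolean partitionAssignable(ExecutionState producerState, boolean allowLazyDeployment) {
            if (producerState == ExecutionState.RUNNING
                    || producerState == ExecutionState.FINISHED) {
                return true;
            }
            // Legacy mode deploys producers and consumers eagerly, so producers
            // that are still SCHEDULED or DEPLOYING must be accepted there.
            return !allowLazyDeployment
                && (producerState == ExecutionState.SCHEDULED
                    || producerState == ExecutionState.DEPLOYING);
        }
    }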


[GitHub] flink issue #6103: [FLINK-9413] [distributed coordination] Tasks can fail wi...

2018-05-30 Thread zhangminglei
Github user zhangminglei commented on the issue:

https://github.com/apache/flink/pull/6103
  
@tillrohrmann Could you take a look at this PR? Thank you.


---


[jira] [Commented] (FLINK-9413) Tasks can fail with PartitionNotFoundException if consumer deployment takes too long

2018-05-30 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16495113#comment-16495113
 ] 

ASF GitHub Bot commented on FLINK-9413:
---

Github user zhangminglei commented on the issue:

https://github.com/apache/flink/pull/6103
  
@tillrohrmann Could you take a look at this PR? Thank you.


> Tasks can fail with PartitionNotFoundException if consumer deployment takes 
> too long
> 
>
> Key: FLINK-9413
> URL: https://issues.apache.org/jira/browse/FLINK-9413
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.4.0, 1.5.0, 1.6.0
>Reporter: Till Rohrmann
>Assignee: mingleizhang
>Priority: Critical
>
> {{Tasks}} can fail with a {{PartitionNotFoundException}} if the deployment of 
> the producer takes too long. More specifically, if it takes longer than the 
> {{taskmanager.network.request-backoff.max}}, then the {{Task}} will give up 
> and fail.
> The problem is that we calculate the {{InputGateDeploymentDescriptor}} for a 
> consuming task once the producer has been assigned a slot but we do not wait 
> until it is actually running. The problem should be fixed if we wait until 
> the task is in state {{RUNNING}} before assigning the result partition to the 
> consumer.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9451) End-to-end test: Scala Quickstarts

2018-05-30 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9451?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16495093#comment-16495093
 ] 

ASF GitHub Bot commented on FLINK-9451:
---

Github user medcv commented on the issue:

https://github.com/apache/flink/pull/6089
  
@zentol PR is updated!


> End-to-end test: Scala Quickstarts
> --
>
> Key: FLINK-9451
> URL: https://issues.apache.org/jira/browse/FLINK-9451
> Project: Flink
>  Issue Type: Sub-task
>  Components: Quickstarts
>Affects Versions: 1.5.0, 1.4.1, 1.4.2
>Reporter: Yazdan Shirvany
>Assignee: Yazdan Shirvany
>Priority: Blocker
>
> We could add an end-to-end test which verifies Flink's Scala quickstarts. It 
> should do the following:
>  # create a new Flink project using the quickstarts archetype
>  # add a new Flink dependency to the {{pom.xml}} (e.g. Flink connector or 
> library)
>  # run {{mvn clean package -Pbuild-jar}}
>  # verify that no core dependencies are contained in the jar file
>  # Run the program



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8873) move unit tests of KeyedStream from DataStreamTest to KeyedStreamTest

2018-05-30 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8873?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16495092#comment-16495092
 ] 

ASF GitHub Bot commented on FLINK-8873:
---

Github user dawidwys commented on the issue:

https://github.com/apache/flink/pull/5649
  
@bowenli86 What's your opinion? If you are ok with not merging it, could 
you close this PR?


> move unit tests of KeyedStream from DataStreamTest to KeyedStreamTest
> -
>
> Key: FLINK-8873
> URL: https://issues.apache.org/jira/browse/FLINK-8873
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API, Tests
>Affects Versions: 1.5.0
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Minor
> Fix For: 1.6.0
>
>
> move unit tests of KeyedStream.scala from DataStreamTest.scala to 
> KeyedStreamTest.scala, in order to have a clearer separation



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #6089: [FLINK-9451]End-to-end test: Scala Quickstarts

2018-05-30 Thread medcv
Github user medcv commented on the issue:

https://github.com/apache/flink/pull/6089
  
@zentol PR is updated!


---


[GitHub] flink issue #5649: [FLINK-8873] [DataStream API] [Tests] move unit tests of ...

2018-05-30 Thread dawidwys
Github user dawidwys commented on the issue:

https://github.com/apache/flink/pull/5649
  
@bowenli86 What's your opinion? If you are ok with not merging it, could 
you close this PR?


---


[jira] [Commented] (FLINK-9413) Tasks can fail with PartitionNotFoundException if consumer deployment takes too long

2018-05-30 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16495089#comment-16495089
 ] 

ASF GitHub Bot commented on FLINK-9413:
---

GitHub user zhangminglei opened a pull request:

https://github.com/apache/flink/pull/6103

[FLINK-9413] [distributed coordination] Tasks can fail with Partition…

…NotFoundException if consumer deployment takes too long

## What is the purpose of the change
Tasks can fail with PartitionNotFoundException if consumer deployment takes 
too long: the producer has been assigned a slot, but we do not wait until it 
is actually running.

## Brief change log
Change the condition so that the consumer waits until the producer is 
actually running.

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zhangminglei/flink flink-9413

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6103.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6103


commit a52434d14117fde4e911f9a8f81a2e10fdd9ba77
Author: zhangminglei 
Date:   2018-05-30T12:17:17Z

[FLINK-9413] [distributed coordination] Tasks can fail with 
PartitionNotFoundException if consumer deployment takes too long




> Tasks can fail with PartitionNotFoundException if consumer deployment takes 
> too long
> 
>
> Key: FLINK-9413
> URL: https://issues.apache.org/jira/browse/FLINK-9413
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.4.0, 1.5.0, 1.6.0
>Reporter: Till Rohrmann
>Assignee: mingleizhang
>Priority: Critical
>
> {{Tasks}} can fail with a {{PartitionNotFoundException}} if the deployment of 
> the producer takes too long. More specifically, if it takes longer than the 
> {{taskmanager.network.request-backoff.max}}, then the {{Task}} will give up 
> and fail.
> The problem is that we calculate the {{InputGateDeploymentDescriptor}} for a 
> consuming task once the producer has been assigned a slot but we do not wait 
> until it is actually running. The problem should be fixed if we wait until 
> the task is in state {{RUNNING}} before assigning the result partition to the 
> consumer.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6103: [FLINK-9413] [distributed coordination] Tasks can ...

2018-05-30 Thread zhangminglei
GitHub user zhangminglei opened a pull request:

https://github.com/apache/flink/pull/6103

[FLINK-9413] [distributed coordination] Tasks can fail with Partition…

…NotFoundException if consumer deployment takes too long

## What is the purpose of the change
Tasks can fail with PartitionNotFoundException if consumer deployment takes 
too long: the producer has been assigned a slot, but we do not wait until it 
is actually running.

## Brief change log
Change the condition so that the consumer waits until the producer is 
actually running.

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zhangminglei/flink flink-9413

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6103.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6103


commit a52434d14117fde4e911f9a8f81a2e10fdd9ba77
Author: zhangminglei 
Date:   2018-05-30T12:17:17Z

[FLINK-9413] [distributed coordination] Tasks can fail with 
PartitionNotFoundException if consumer deployment takes too long




---


[jira] [Commented] (FLINK-9451) End-to-end test: Scala Quickstarts

2018-05-30 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9451?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16495082#comment-16495082
 ] 

ASF GitHub Bot commented on FLINK-9451:
---

Github user medcv commented on a diff in the pull request:

https://github.com/apache/flink/pull/6089#discussion_r191740715
  
--- Diff: flink-end-to-end-tests/test-scripts/elasticsearch-common.sh ---
@@ -75,6 +76,8 @@ function verify_result {
 }
 
 function shutdown_elasticsearch_cluster {
+   local index=$1
--- End diff --

+1


> End-to-end test: Scala Quickstarts
> --
>
> Key: FLINK-9451
> URL: https://issues.apache.org/jira/browse/FLINK-9451
> Project: Flink
>  Issue Type: Sub-task
>  Components: Quickstarts
>Affects Versions: 1.5.0, 1.4.1, 1.4.2
>Reporter: Yazdan Shirvany
>Assignee: Yazdan Shirvany
>Priority: Blocker
>
> We could add an end-to-end test which verifies Flink's Scala quickstarts. It 
> should do the following:
>  # create a new Flink project using the quickstarts archetype
>  # add a new Flink dependency to the {{pom.xml}} (e.g. Flink connector or 
> library)
>  # run {{mvn clean package -Pbuild-jar}}
>  # verify that no core dependencies are contained in the jar file
>  # Run the program



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9451) End-to-end test: Scala Quickstarts

2018-05-30 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9451?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16495083#comment-16495083
 ] 

ASF GitHub Bot commented on FLINK-9451:
---

Github user medcv commented on a diff in the pull request:

https://github.com/apache/flink/pull/6089#discussion_r191740903
  
--- Diff: flink-end-to-end-tests/test-scripts/test_quickstarts.sh ---
@@ -18,29 +18,38 @@
 

 
 # End to end test for quick starts test.
+# Usage:
+# FLINK_DIR=<flink dir> flink-end-to-end-tests/test-scripts/test_quickstarts.sh <scala|java>
--- End diff --

will change the name


> End-to-end test: Scala Quickstarts
> --
>
> Key: FLINK-9451
> URL: https://issues.apache.org/jira/browse/FLINK-9451
> Project: Flink
>  Issue Type: Sub-task
>  Components: Quickstarts
>Affects Versions: 1.5.0, 1.4.1, 1.4.2
>Reporter: Yazdan Shirvany
>Assignee: Yazdan Shirvany
>Priority: Blocker
>
> We could add an end-to-end test which verifies Flink's Scala quickstarts. It 
> should do the following:
>  # create a new Flink project using the quickstarts archetype
>  # add a new Flink dependency to the {{pom.xml}} (e.g. Flink connector or 
> library)
>  # run {{mvn clean package -Pbuild-jar}}
>  # verify that no core dependencies are contained in the jar file
>  # Run the program



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9451) End-to-end test: Scala Quickstarts

2018-05-30 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9451?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16495084#comment-16495084
 ] 

ASF GitHub Bot commented on FLINK-9451:
---

Github user medcv commented on a diff in the pull request:

https://github.com/apache/flink/pull/6089#discussion_r191740667
  
--- Diff: flink-end-to-end-tests/test-scripts/elasticsearch-common.sh ---
@@ -56,13 +56,14 @@ function verify_elasticsearch_process_exist {
 
 function verify_result {
 local numRecords=$1
+local index=$2
--- End diff --

+1


> End-to-end test: Scala Quickstarts
> --
>
> Key: FLINK-9451
> URL: https://issues.apache.org/jira/browse/FLINK-9451
> Project: Flink
>  Issue Type: Sub-task
>  Components: Quickstarts
>Affects Versions: 1.5.0, 1.4.1, 1.4.2
>Reporter: Yazdan Shirvany
>Assignee: Yazdan Shirvany
>Priority: Blocker
>
> We could add an end-to-end test which verifies Flink's Scala quickstarts. It 
> should do the following:
>  # create a new Flink project using the quickstarts archetype
>  # add a new Flink dependency to the {{pom.xml}} (e.g. Flink connector or 
> library)
>  # run {{mvn clean package -Pbuild-jar}}
>  # verify that no core dependencies are contained in the jar file
>  # Run the program



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6089: [FLINK-9451]End-to-end test: Scala Quickstarts

2018-05-30 Thread medcv
Github user medcv commented on a diff in the pull request:

https://github.com/apache/flink/pull/6089#discussion_r191740715
  
--- Diff: flink-end-to-end-tests/test-scripts/elasticsearch-common.sh ---
@@ -75,6 +76,8 @@ function verify_result {
 }
 
 function shutdown_elasticsearch_cluster {
+   local index=$1
--- End diff --

+1


---


[GitHub] flink pull request #6089: [FLINK-9451]End-to-end test: Scala Quickstarts

2018-05-30 Thread medcv
Github user medcv commented on a diff in the pull request:

https://github.com/apache/flink/pull/6089#discussion_r191740903
  
--- Diff: flink-end-to-end-tests/test-scripts/test_quickstarts.sh ---
@@ -18,29 +18,38 @@
 

 
 # End to end test for quick starts test.
+# Usage:
+# FLINK_DIR=<flink dir> flink-end-to-end-tests/test-scripts/test_quickstarts.sh <scala|java>
--- End diff --

will change the name


---


[GitHub] flink pull request #6089: [FLINK-9451]End-to-end test: Scala Quickstarts

2018-05-30 Thread medcv
Github user medcv commented on a diff in the pull request:

https://github.com/apache/flink/pull/6089#discussion_r191740667
  
--- Diff: flink-end-to-end-tests/test-scripts/elasticsearch-common.sh ---
@@ -56,13 +56,14 @@ function verify_elasticsearch_process_exist {
 
 function verify_result {
 local numRecords=$1
+local index=$2
--- End diff --

+1


---


[jira] [Commented] (FLINK-9451) End-to-end test: Scala Quickstarts

2018-05-30 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9451?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16495079#comment-16495079
 ] 

ASF GitHub Bot commented on FLINK-9451:
---

Github user medcv commented on the issue:

https://github.com/apache/flink/pull/6089
  
@zentol Thanks! found them :)


> End-to-end test: Scala Quickstarts
> --
>
> Key: FLINK-9451
> URL: https://issues.apache.org/jira/browse/FLINK-9451
> Project: Flink
>  Issue Type: Sub-task
>  Components: Quickstarts
>Affects Versions: 1.5.0, 1.4.1, 1.4.2
>Reporter: Yazdan Shirvany
>Assignee: Yazdan Shirvany
>Priority: Blocker
>
> We could add an end-to-end test which verifies Flink's Scala quickstarts. It 
> should do the following:
>  # create a new Flink project using the quickstarts archetype
>  # add a new Flink dependency to the {{pom.xml}} (e.g. Flink connector or 
> library)
>  # run {{mvn clean package -Pbuild-jar}}
>  # verify that no core dependencies are contained in the jar file
>  # Run the program



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

