[GitHub] flink pull request #5357: [hotfix] Eliminate the conditions of parallelism i...

2018-01-24 Thread maqingxiang
GitHub user maqingxiang opened a pull request:

https://github.com/apache/flink/pull/5357

[hotfix] Eliminate the conditions of parallelism in isChainable

When building the StreamGraph, a restriction is already enforced: the upstream 
and downstream nodes of a forward edge must have the same parallelism.

When adding an edge, the following check is applied:
```java
if (partitioner instanceof ForwardPartitioner) {
	if (upstreamNode.getParallelism() != downstreamNode.getParallelism()) {
		throw new UnsupportedOperationException("Forward partitioning does not allow " +
			"change of parallelism. Upstream operation: " + upstreamNode +
			" parallelism: " + upstreamNode.getParallelism() +
			", downstream operation: " + downstreamNode +
			" parallelism: " + downstreamNode.getParallelism() +
			" You must use another partitioning strategy, such as broadcast, rebalance, shuffle or global.");
	}
}
```
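
Since `StreamGraph#addEdge` already rejects forward edges with differing 
parallelism, the matching condition inside 
`StreamingJobGraphGenerator#isChainable` is redundant. A simplified sketch of 
the predicate (several operator/slot-sharing conditions are elided) showing 
the condition this hotfix removes:

```java
public static boolean isChainable(StreamEdge edge, StreamGraph streamGraph) {
	StreamNode upstream = streamGraph.getSourceVertex(edge);
	StreamNode downstream = streamGraph.getTargetVertex(edge);

	return downstream.getInEdges().size() == 1
		&& (edge.getPartitioner() instanceof ForwardPartitioner)
		// Redundant: addEdge() already guarantees equal parallelism
		// whenever a ForwardPartitioner is used.
		&& upstream.getParallelism() == downstream.getParallelism()
		&& streamGraph.isChainingEnabled();
}
```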

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/maqingxiang/flink fix-isChainable

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5357.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5357


commit fbdd387215f9f8e26742897ec0ba86c411a7c362
Author: maqingxiang-it 
Date:   2018-01-25T07:45:06Z

Eliminate the conditions of parallelism in isChainable




---


[jira] [Commented] (FLINK-8364) Add iterator() to ListState which returns empty iterator when it has no value

2018-01-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16338862#comment-16338862
 ] 

ASF GitHub Bot commented on FLINK-8364:
---

GitHub user bowenli86 opened a pull request:

https://github.com/apache/flink/pull/5356

[FLINK-8364][state backend] Add iterator() to ListState which returns empty 
iterator when it has no value

## What is the purpose of the change

Add iterator() to ListState which returns empty iterator when it has no 
value

## Brief change log

Add iterator() to ListState which returns empty iterator when it has no 
value

## Verifying this change

This change added tests and can be verified by extending 
`StateBackendTestBase#testListStateAPIs()`
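
For illustration, a minimal sketch of how such a method could sit on top of 
the existing `get()` (assuming the 1.4-era `ListState<T> extends 
MergingState<T, Iterable<T>>` signature; the PR's actual placement may differ):

```java
import java.util.Collections;
import java.util.Iterator;

public interface ListState<T> extends MergingState<T, Iterable<T>> {

	// Returns an iterator over the current values, or an empty iterator
	// when get() yields no value (i.e. returns null).
	default Iterator<T> iterator() throws Exception {
		Iterable<T> values = get();
		return values == null ? Collections.emptyIterator() : values.iterator();
	}
}
```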

## Does this pull request potentially affect one of the following parts:

  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes)

## Documentation

  - Does this pull request introduce a new feature? (yes)
  - If yes, how is the feature documented? (docs / JavaDocs)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/bowenli86/flink FLINK-8364

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5356.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5356


commit fe3ae4e4798fb3f1a440dee737298718143edaf3
Author: Bowen Li 
Date:   2018-01-25T07:01:46Z

[FLINK-8364] Add iterator() to ListState which returns empty iterator when 
it has no value

commit c6a3e6bdb276b910e679bfa12a1abadaf2c9fcf8
Author: Bowen Li 
Date:   2018-01-25T07:41:46Z

add doc and unit test




> Add iterator() to ListState which returns empty iterator when it has no value
> -
>
> Key: FLINK-8364
> URL: https://issues.apache.org/jira/browse/FLINK-8364
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.4.0
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Major
> Fix For: 1.5.0
>
>
> Add iterator() to ListState which returns empty iterator when it has no value



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5356: [FLINK-8364][state backend] Add iterator() to List...

2018-01-24 Thread bowenli86
GitHub user bowenli86 opened a pull request:

https://github.com/apache/flink/pull/5356

[FLINK-8364][state backend] Add iterator() to ListState which returns empty 
iterator when it has no value

## What is the purpose of the change

Add iterator() to ListState which returns empty iterator when it has no 
value

## Brief change log

Add iterator() to ListState which returns empty iterator when it has no 
value

## Verifying this change

This change added tests and can be verified by extending 
`StateBackendTestBase#testListStateAPIs()`

## Does this pull request potentially affect one of the following parts:

  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes)

## Documentation

  - Does this pull request introduce a new feature? (yes)
  - If yes, how is the feature documented? (docs / JavaDocs)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/bowenli86/flink FLINK-8364

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5356.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5356


commit fe3ae4e4798fb3f1a440dee737298718143edaf3
Author: Bowen Li 
Date:   2018-01-25T07:01:46Z

[FLINK-8364] Add iterator() to ListState which returns empty iterator when 
it has no value

commit c6a3e6bdb276b910e679bfa12a1abadaf2c9fcf8
Author: Bowen Li 
Date:   2018-01-25T07:41:46Z

add doc and unit test




---


[jira] [Created] (FLINK-8510) Provide access to Delivery envelope in RabbitMQ Source

2018-01-24 Thread Wojciech Luczkow (JIRA)
Wojciech Luczkow created FLINK-8510:
---

 Summary: Provide access to Delivery envelope in RabbitMQ Source
 Key: FLINK-8510
 URL: https://issues.apache.org/jira/browse/FLINK-8510
 Project: Flink
  Issue Type: Wish
Reporter: Wojciech Luczkow


Currently RMQSource only exposes the AMQP message body in the resulting 
stream; sometimes it would be good to also get access to the Envelope.

For example, if subscribing to a topic using the routing key amq.topic.#, it 
is impossible to distinguish whether a message came from amq.topic.1 or 
amq.topic.2.

Studying the current implementation, I think the easiest option from the 
consumer's point of view would be to change RMQSource to provide a stream of 
wrapped messages, which would break the Flink API...

Another option would be to create something like a "detailed source" with a 
different API: provide a generic wrapper with the message content and a Map 
with the message properties (and a protected method to override and fill that 
map).
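
A minimal sketch of that wrapper idea (every name below is hypothetical, not 
an existing Flink API):

{code}
import java.util.Map;

// Hypothetical wrapper pairing the deserialized body with envelope/property
// metadata such as the routing key or exchange name.
public class RMQMessage<T> {
	private final T body;
	private final Map<String, Object> properties;

	public RMQMessage(T body, Map<String, Object> properties) {
		this.body = body;
		this.properties = properties;
	}

	public T getBody() { return body; }
	public Map<String, Object> getProperties() { return properties; }
}
{code}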

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-8509) Remove SqlGroupedWindowFunction from Flink repo

2018-01-24 Thread Shuyi Chen (JIRA)
Shuyi Chen created FLINK-8509:
-

 Summary: Remove SqlGroupedWindowFunction from Flink repo
 Key: FLINK-8509
 URL: https://issues.apache.org/jira/browse/FLINK-8509
 Project: Flink
  Issue Type: Task
  Components: Table API & SQL
Reporter: Shuyi Chen
Assignee: Shuyi Chen


SqlGroupedWindowFunction was copied into the Flink repo due to 
[CALCITE-2133|https://issues.apache.org/jira/browse/CALCITE-2133]; we should 
remove it once Flink upgrades its Calcite dependency to 1.16.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-8508) Remove RexSimplify from Flink repo

2018-01-24 Thread Shuyi Chen (JIRA)
Shuyi Chen created FLINK-8508:
-

 Summary: Remove RexSimplify from Flink repo
 Key: FLINK-8508
 URL: https://issues.apache.org/jira/browse/FLINK-8508
 Project: Flink
  Issue Type: Task
  Components: Table API & SQL
Reporter: Shuyi Chen
Assignee: Shuyi Chen


RexSimplify was copied into the Flink repo due to 
[CALCITE-2110|https://issues.apache.org/jira/browse/CALCITE-2110]; we should 
remove it once Flink upgrades its Calcite dependency to 1.16.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-7934) Upgrade Calcite dependency to 1.15

2018-01-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7934?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16338748#comment-16338748
 ] 

ASF GitHub Bot commented on FLINK-7934:
---

Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/5355
  
Hi, is there anything new in Calcite 1.15 that is critical to flink-table?


> Upgrade Calcite dependency to 1.15
> --
>
> Key: FLINK-7934
> URL: https://issues.apache.org/jira/browse/FLINK-7934
> Project: Flink
>  Issue Type: Task
>  Components: Table API & SQL
>Reporter: Rong Rong
>Assignee: Shuyi Chen
>Priority: Major
>
> Umbrella issue for all related issues for Apache Calcite 1.15 release.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5355: [FLINK-7934][Table & SQL API] Upgrade Flink to use Calcit...

2018-01-24 Thread bowenli86
Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/5355
  
Hi, is there anything new in Calcite 1.15 that is critical to flink-table?


---


[jira] [Created] (FLINK-8507) Upgrade Calcite dependency to 1.16

2018-01-24 Thread Shuyi Chen (JIRA)
Shuyi Chen created FLINK-8507:
-

 Summary: Upgrade Calcite dependency to 1.16
 Key: FLINK-8507
 URL: https://issues.apache.org/jira/browse/FLINK-8507
 Project: Flink
  Issue Type: Task
  Components: Table API & SQL
Reporter: Shuyi Chen






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-7934) Upgrade Calcite dependency to 1.15

2018-01-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7934?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16338650#comment-16338650
 ] 

ASF GitHub Bot commented on FLINK-7934:
---

GitHub user suez1224 opened a pull request:

https://github.com/apache/flink/pull/5355

[FLINK-7934][Table & SQL API] Upgrade Flink to use Calcite 1.15

## Brief change log
  - Update flink-table dependency to use Calcite 1.15
  - Add new codegen for EXTRACT since Calcite 1.15 changed how EXTRACT is 
converted
  - Copy and fix SqlGroupedWindowFunction from Calcite (CALCITE-2133)
  - Copy and fix RexSimplify from Calcite (CALCITE-2110)
  - Fix unit tests.

## Verifying this change

This change is already covered by existing tests.
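
For reference, the core of the dependency bump in the flink-table pom looks 
roughly like this (a sketch; the PR may touch further Calcite artifacts):

```xml
<dependency>
    <groupId>org.apache.calcite</groupId>
    <artifactId>calcite-core</artifactId>
    <version>1.15.0</version>
</dependency>
```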

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes, as stated in 
the title)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/suez1224/flink flink-7934

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5355.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5355


commit a3a6451edfc6229bcbc95cbf1f87056bcdc33b7b
Author: Shuyi Chen 
Date:   2018-01-10T00:52:56Z

Upgrade to Calcite 1.15




> Upgrade Calcite dependency to 1.15
> --
>
> Key: FLINK-7934
> URL: https://issues.apache.org/jira/browse/FLINK-7934
> Project: Flink
>  Issue Type: Task
>  Components: Table API & SQL
>Reporter: Rong Rong
>Assignee: Shuyi Chen
>Priority: Major
>
> Umbrella issue for all related issues for Apache Calcite 1.15 release.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5355: [FLINK-7934][Table & SQL API] Upgrade Flink to use...

2018-01-24 Thread suez1224
GitHub user suez1224 opened a pull request:

https://github.com/apache/flink/pull/5355

[FLINK-7934][Table & SQL API] Upgrade Flink to use Calcite 1.15

## Brief change log
  - Update flink-table dependency to use Calcite 1.15
  - Add new codegen for EXTRACT since Calcite 1.15 changed how EXTRACT is 
converted
  - Copy and fix SqlGroupedWindowFunction from Calcite (CALCITE-2133)
  - Copy and fix RexSimplify from Calcite (CALCITE-2110)
  - Fix unit tests.

## Verifying this change

This change is already covered by existing tests.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes, as stated in 
the title)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/suez1224/flink flink-7934

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5355.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5355


commit a3a6451edfc6229bcbc95cbf1f87056bcdc33b7b
Author: Shuyi Chen 
Date:   2018-01-10T00:52:56Z

Upgrade to Calcite 1.15




---


[jira] [Created] (FLINK-8506) fullRestarts Gauge not incremented when jobmanager got killed

2018-01-24 Thread Steven Zhen Wu (JIRA)
Steven Zhen Wu created FLINK-8506:
-

 Summary: fullRestarts Gauge not incremented when jobmanager got 
killed
 Key: FLINK-8506
 URL: https://issues.apache.org/jira/browse/FLINK-8506
 Project: Flink
  Issue Type: Bug
Reporter: Steven Zhen Wu


[~till.rohrmann] When the jobmanager node gets killed, it causes a job restart. 
But in this case, we didn't see the _fullRestarts_ gauge get incremented. Is 
this expected, or a bug?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-7954) sideoutput in async function

2018-01-24 Thread Bowen Li (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7954?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16338361#comment-16338361
 ] 

Bowen Li commented on FLINK-7954:
-

I agree. I've tried a few different ways and it's not easily achievable. We can 
just wait until something fundamentally changes.

> sideoutput in async function
> 
>
> Key: FLINK-7954
> URL: https://issues.apache.org/jira/browse/FLINK-7954
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Affects Versions: 1.3.2
> Environment: similar to FLINK-7635, adding support of side output to 
> AsyncFunction 
>Reporter: Chen Qin
>Assignee: Bowen Li
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8364) Add iterator() to ListState which returns empty iterator when it has no value

2018-01-24 Thread Bowen Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8364?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bowen Li updated FLINK-8364:

Description: 
Add iterator() to ListState which returns empty iterator when it has no value



  was:
per discussion with [~stefanrichte...@gmail.com] in 
https://github.com/apache/flink/pull/4963, we decided to have {{ListState#get}} 
return an empty iterator if it has no value in it.




> Add iterator() to ListState which returns empty iterator when it has no value
> -
>
> Key: FLINK-8364
> URL: https://issues.apache.org/jira/browse/FLINK-8364
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.4.0
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Major
> Fix For: 1.5.0
>
>
> Add iterator() to ListState which returns empty iterator when it has no value



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8384) Session Window Assigner with Dynamic Gaps

2018-01-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16338297#comment-16338297
 ] 

ASF GitHub Bot commented on FLINK-8384:
---

Github user dyanarose commented on a diff in the pull request:

https://github.com/apache/flink/pull/5295#discussion_r163692223
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/DynamicEventTimeSessionWindows.java
 ---
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.assigners;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.triggers.TypedEventTimeTrigger;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * A {@link WindowAssigner} that windows elements into sessions based on the timestamp of the
+ * elements. Windows cannot overlap.
+ *
+ * For example, in order to window into windows with a dynamic time gap:
+ * <pre> {@code
+ * DataStream<Tuple2<String, String>> in = ...;
+ * KeyedStream<Tuple2<String, String>, String> keyed = in.keyBy(...);
+ * WindowedStream<Tuple2<String, String>, String, TimeWindows> windowed =
+ *   keyed.window(DynamicEventTimeSessionWindows.withDynamicGap({@link SessionWindowTimeGapExtractor }));
+ * } </pre>
+ *
+ * @param <T> The type of the input elements
+ */
+public class DynamicEventTimeSessionWindows<T> extends MergingWindowAssigner<T, TimeWindow> {
+	private static final long serialVersionUID = 1L;
--- End diff --

Changing EventTimeTrigger/ProcessingTimeTrigger would be my preference. 
However, I was hesitant to just go in and change the class definition. I also 
don't believe it *should* break existing tests.

I'll give it a go and see.


> Session Window Assigner with Dynamic Gaps
> -
>
> Key: FLINK-8384
> URL: https://issues.apache.org/jira/browse/FLINK-8384
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Reporter: Dyana Rose
>Priority: Minor
>
> *Reason for Improvement*
> Currently both Session Window assigners only allow a static inactivity gap. 
> Given the following scenario, this is too restrictive:
> * Given a stream of IoT events from many device types
> * Assume each device type could have a different inactivity gap
> * Assume each device type gap could change while sessions are in flight
> By allowing dynamic inactivity gaps, the correct gap can be determined in the 
> [assignWindows 
> function|https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/EventTimeSessionWindows.java#L59]
>  by passing the element currently under consideration, the timestamp, and the 
> context to a user-defined function. This eliminates the need to create 
> unwieldy workarounds if you only have static gaps.
> Dynamic Session Window gaps should be available for both Event Time and 
> Processing Time streams.
> (short preliminary discussion: 
> https://lists.apache.org/thread.html/08a011c0039e826343e9be0b1bb4ecfc2e12976bc65f8a43ee88@%3Cdev.flink.apache.org%3E)
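
A usage sketch of the proposed API (the event type, key field, and the 
gap-lookup helper below are purely illustrative):

{code}
// Hypothetical usage of the dynamic-gap assigner from PR #5295; Event,
// deviceType and gapForDeviceType() are illustrative only.
DataStream<Event> events = ...;

events
	.keyBy(e -> e.deviceType)
	.window(DynamicEventTimeSessionWindows.withDynamicGap(
		(SessionWindowTimeGapExtractor<Event>) element -> gapForDeviceType(element.deviceType)))
	.reduce(...);
{code}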



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5295: [FLINK-8384] [streaming] Session Window Assigner w...

2018-01-24 Thread dyanarose
Github user dyanarose commented on a diff in the pull request:

https://github.com/apache/flink/pull/5295#discussion_r163692223
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/DynamicEventTimeSessionWindows.java
 ---
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.assigners;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.triggers.TypedEventTimeTrigger;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * A {@link WindowAssigner} that windows elements into sessions based on the timestamp of the
+ * elements. Windows cannot overlap.
+ *
+ * For example, in order to window into windows with a dynamic time gap:
+ * <pre> {@code
+ * DataStream<Tuple2<String, String>> in = ...;
+ * KeyedStream<Tuple2<String, String>, String> keyed = in.keyBy(...);
+ * WindowedStream<Tuple2<String, String>, String, TimeWindows> windowed =
+ *   keyed.window(DynamicEventTimeSessionWindows.withDynamicGap({@link SessionWindowTimeGapExtractor }));
+ * } </pre>
+ *
+ * @param <T> The type of the input elements
+ */
+public class DynamicEventTimeSessionWindows<T> extends MergingWindowAssigner<T, TimeWindow> {
+	private static final long serialVersionUID = 1L;
--- End diff --

Changing EventTimeTrigger/ProcessingTimeTrigger would be my preference. 
However, I was hesitant to just go in and change the class definition. I also 
don't believe it *should* break existing tests.

I'll give it a go and see.


---


[jira] [Commented] (FLINK-8484) Kinesis consumer re-reads closed shards on job restart

2018-01-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8484?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16338292#comment-16338292
 ] 

ASF GitHub Bot commented on FLINK-8484:
---

Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/5337
  
good to have the background info from the email thread. I didn't have a 
full picture before


> Kinesis consumer re-reads closed shards on job restart
> --
>
> Key: FLINK-8484
> URL: https://issues.apache.org/jira/browse/FLINK-8484
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Affects Versions: 1.4.0, 1.3.2
>Reporter: Philip Luppens
>Assignee: Philip Luppens
>Priority: Blocker
>  Labels: bug, flink, kinesis
> Fix For: 1.3.3, 1.5.0, 1.4.1
>
>
> We’re using the connector to subscribe to streams varying from 1 to a 100 
> shards, and used the kinesis-scaling-utils to dynamically scale the Kinesis 
> stream up and down during peak times. What we’ve noticed is that, while we 
> were having closed shards, any Flink job restart with check- or save-point 
> would result in shards being re-read from the event horizon, duplicating our 
> events.
>  
> We started checking the checkpoint state, and found that the shards were 
> stored correctly with the proper sequence number (including for closed 
> shards), but that upon restarts, the older closed shards would be read from 
> the event horizon, as if their restored state would be ignored.
>  
> In the end, we believe that we found the problem: in the 
> FlinkKinesisConsumer’s run() method, we’re trying to find the shard returned 
> from the KinesisDataFetcher against the shards’ metadata from the restoration 
> point, but we do this via a containsKey() call, which means we’ll use the 
> StreamShardMetadata’s equals() method. However, this checks for all 
> properties, including the endingSequenceNumber, which might have changed 
> between the restored state’s checkpoint and our data fetch, thus failing the 
> equality check, failing the containsKey() check, and resulting in the shard 
> being re-read from the event horizon, even though it was present in the 
> restored state.
>  
> We’ve created a workaround where we only check for the shardId and stream 
> name to restore the state of the shards we’ve already seen, and this seems to 
> work correctly. 
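
A sketch of the described workaround: compare restored shard metadata only by 
stream name and shard id, so mutable fields such as endingSequenceNumber 
cannot break the match (the helper name is assumed):

{code}
// Hedged sketch; uses java.util.Objects. Only the immutable identity
// fields of StreamShardMetadata participate in the comparison.
private static boolean sameShard(StreamShardMetadata a, StreamShardMetadata b) {
	return Objects.equals(a.getStreamName(), b.getStreamName())
		&& Objects.equals(a.getShardId(), b.getShardId());
}
{code}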



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5337: [FLINK-8484][flink-kinesis-connector] Ensure a Kinesis co...

2018-01-24 Thread bowenli86
Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/5337
  
good to have the background info from the email thread. I didn't have a 
full picture before


---


[jira] [Closed] (FLINK-8406) BucketingSink does not detect hadoop file systems

2018-01-24 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8406?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen closed FLINK-8406.
---

> BucketingSink does not detect hadoop file systems
> -
>
> Key: FLINK-8406
> URL: https://issues.apache.org/jira/browse/FLINK-8406
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystem
>Affects Versions: 1.4.0, 1.5.0
>Reporter: Chesnay Schepler
>Assignee: Stephan Ewen
>Priority: Blocker
> Fix For: 1.5.0, 1.4.1
>
>
> In {{BucketingSink#createHadoopFileSystem}} one can find this piece of code:
> {code}
> final org.apache.flink.core.fs.FileSystem flinkFs =
>   org.apache.flink.core.fs.FileSystem.get(path.toUri());
> final FileSystem hadoopFs = (flinkFs instanceof HadoopFileSystem)
>   ? ((HadoopFileSystem) flinkFs).getHadoopFileSystem()
>   : null;
> {code}
> {{FileSystem#get()}} wraps the created {{FileSystem}} in a 
> {{SafetyNetWrapperFileSystem}}, causing the instanceof check to 
> categorically fail.
> We may want to replace the {{get()}} call with {{getUnguardedFileSystem()}}. 
> We should also look for other occurrences of similar instanceof checks.
> According to a thread on the mailing list this causes the BucketingSink to be 
> unusable. 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/BucketingSink-broken-in-flink-1-4-0-td17710.html
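
With the suggested replacement, the lookup would read roughly as follows (a 
sketch; only the lookup call changes):

{code}
// getUnguardedFileSystem() returns the raw FileSystem rather than a
// SafetyNetWrapperFileSystem, so the instanceof check can succeed.
final org.apache.flink.core.fs.FileSystem flinkFs =
	org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(path.toUri());
final FileSystem hadoopFs = (flinkFs instanceof HadoopFileSystem)
	? ((HadoopFileSystem) flinkFs).getHadoopFileSystem()
	: null;
{code}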



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (FLINK-8406) BucketingSink does not detect hadoop file systems

2018-01-24 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8406?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen resolved FLINK-8406.
-
Resolution: Fixed

Fixed in
  - 1.4.1 via eb6380901e0c62c57eb300b730548c8ad9ec15e0
  - 1.5.0 via c869eb9d1cee7e52f5dcf4a5803cdf7681d979fd

> BucketingSink does not detect hadoop file systems
> -
>
> Key: FLINK-8406
> URL: https://issues.apache.org/jira/browse/FLINK-8406
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystem
>Affects Versions: 1.4.0, 1.5.0
>Reporter: Chesnay Schepler
>Assignee: Stephan Ewen
>Priority: Blocker
> Fix For: 1.5.0, 1.4.1
>
>
> In {{BucketingSink#createHadoopFileSystem}} one can find this piece of code:
> {code}
> final org.apache.flink.core.fs.FileSystem flinkFs =
>   org.apache.flink.core.fs.FileSystem.get(path.toUri());
> final FileSystem hadoopFs = (flinkFs instanceof HadoopFileSystem)
>   ? ((HadoopFileSystem) flinkFs).getHadoopFileSystem()
>   : null;
> {code}
> {{FileSystem#get()}} wraps the created {{FileSystem}} in a 
> {{SafetyNetWrapperFileSystem}}, causing the instanceof check to 
> categorically fail.
> We may want to replace the {{get()}} call with {{getUnguardedFileSystem()}}. 
> We should also look for other occurrences of similar instanceof checks.
> According to a thread on the mailing list this causes the BucketingSink to be 
> unusable. 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/BucketingSink-broken-in-flink-1-4-0-td17710.html



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (FLINK-8466) ErrorInfo needs to hold Exception as SerializedThrowable

2018-01-24 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8466?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen resolved FLINK-8466.
-
Resolution: Fixed

Fixed in
  - 1.4.1 via 4d1ba45e78fa4ed6cd5af0405c7988ffaa6dee13
  - 1.5.0 via 524c5013a1e877cdaf2f4269863a49851eabda85

> ErrorInfo needs to hold Exception as SerializedThrowable
> 
>
> Key: FLINK-8466
> URL: https://issues.apache.org/jira/browse/FLINK-8466
> Project: Flink
>  Issue Type: Bug
>  Components: Local Runtime
>Affects Versions: 1.4.0
>Reporter: Jelmer Kuperus
>Assignee: Stephan Ewen
>Priority: Blocker
> Fix For: 1.5.0, 1.4.1
>
> Attachments: Screen Shot 2018-01-20 at 1.48.33 PM.png
>
>
> MemoryArchivist holding on to the last thrown exception prevents the user code 
> classloader from being garbage collected.
> MemoryArchivist holds on to any number of ArchivedExecutionGraphs. These 
> ArchivedExecutionGraph instances contain a failureCause field of type 
> ErrorInfo that wraps the exception in case one was thrown to terminate the job.
> This exception class will more often than not have been loaded by a user code 
> classloader, and as long as the MemoryArchivist holds on to this exception, 
> the JVM won't be able to reclaim the resources held by this classloader.
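
A minimal sketch of the fix implied by the title: eagerly convert the failure 
cause into a {{SerializedThrowable}} so no strong reference to a user-code 
class remains (field and constructor shape assumed):

{code}
public class ErrorInfo implements Serializable {
	// SerializedThrowable keeps the serialized bytes and the message instead
	// of the original exception object, so the user-code classloader that
	// defined the exception class can be garbage collected.
	private final SerializedThrowable exception;
	private final long timestamp;

	public ErrorInfo(Throwable exception, long timestamp) {
		this.exception = exception instanceof SerializedThrowable
			? (SerializedThrowable) exception
			: new SerializedThrowable(exception);
		this.timestamp = timestamp;
	}
}
{code}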



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-8466) ErrorInfo needs to hold Exception as SerializedThrowable

2018-01-24 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8466?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen closed FLINK-8466.
---

> ErrorInfo needs to hold Exception as SerializedThrowable
> 
>
> Key: FLINK-8466
> URL: https://issues.apache.org/jira/browse/FLINK-8466
> Project: Flink
>  Issue Type: Bug
>  Components: Local Runtime
>Affects Versions: 1.4.0
>Reporter: Jelmer Kuperus
>Assignee: Stephan Ewen
>Priority: Blocker
> Fix For: 1.5.0, 1.4.1
>
> Attachments: Screen Shot 2018-01-20 at 1.48.33 PM.png
>
>
> MemoryArchivist holding on to the last thrown exception prevents the user code 
> classloader from being garbage collected.
> MemoryArchivist holds on to any number of ArchivedExecutionGraphs. These 
> ArchivedExecutionGraph instances contain a failureCause field of type 
> ErrorInfo that wraps the exception in case one was thrown to terminate the job.
> This exception class will more often than not have been loaded by a user code 
> classloader, and as long as the MemoryArchivist holds on to this exception, 
> the JVM won't be able to reclaim the resources held by this classloader.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-8499) Kryo must not be child-first loaded

2018-01-24 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8499?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen closed FLINK-8499.
---

> Kryo must not be child-first loaded
> ---
>
> Key: FLINK-8499
> URL: https://issues.apache.org/jira/browse/FLINK-8499
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Affects Versions: 1.4.0
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Blocker
> Fix For: 1.5.0, 1.4.1
>
>
> Kryo classes are part of the Flink API and hence move between Flink's core 
> (serializers) and the user application (Avro-Kryo-utils).
> Duplicating the Kryo dependency through reversed (child-first) classloading 
> yields problems. If Kryo is in the user application jar, together with Avro, 
> the following error happens
> (this seems a semi-bug in the JVM, because this should clearly be a 
> {{ClassCastException}}, not such a cryptic byte code error):
> {code}
> java.lang.VerifyError: Bad type on operand stack
> Exception Details:
>   Location:
> 
> org/apache/flink/formats/avro/utils/AvroKryoSerializerUtils.addAvroGenericDataArrayRegistration(Ljava/util/LinkedHashMap;)V
>  @23: invokespecial
>   Reason:
> Type 
> 'org/apache/flink/api/java/typeutils/runtime/kryo/Serializers$SpecificInstanceCollectionSerializerForArrayList'
>  (current frame, stack[7]) is not assignable to 
> 'com/esotericsoftware/kryo/Serializer'
>   Current Frame:
> bci: @23
> flags: { }
> locals: { 'org/apache/flink/formats/avro/utils/AvroKryoSerializerUtils', 
> 'java/util/LinkedHashMap' }
> stack: { 'java/util/LinkedHashMap', 'java/lang/String', uninitialized 6, 
> uninitialized 6, 'java/lang/Class', uninitialized 12, uninitialized 12, 
> 'org/apache/flink/api/java/typeutils/runtime/kryo/Serializers$SpecificInstanceCollectionSerializerForArrayList'
>  }
>   Bytecode:
> 0x000: 2b12 05b6 000b bb00 0c59 1205 bb00 0d59
> 0x010: bb00 0659 b700 0eb7 000f b700 10b6 0011
> 0x020: 57b1   
> {code}
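
A sketch of what the fix amounts to: Kryo is added to the always-parent-first 
classloading patterns (option name per Flink 1.4; the surrounding default 
list is abbreviated and may differ):

{code}
// Hedged sketch of the CoreOptions change: Kryo classes are loaded
// parent-first so they are never duplicated into the user-code classloader.
public static final ConfigOption<String> ALWAYS_PARENT_FIRST_LOADER =
	ConfigOptions.key("classloader.parent-first-patterns")
		.defaultValue("java.;scala.;org.apache.flink.;org.slf4j;"
			+ "com.esotericsoftware.kryo");
{code}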



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (FLINK-8499) Kryo must not be child-first loaded

2018-01-24 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8499?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen resolved FLINK-8499.
-
Resolution: Fixed

Fixed in
  - 1.4.1 via 15cb057bffd32ba8a853b46b207a5b7ea6bba430
  - 1.5.0 via 1b466c055d9d4cd481096af770118c7a899a90af

> Kryo must not be child-first loaded
> ---
>
> Key: FLINK-8499
> URL: https://issues.apache.org/jira/browse/FLINK-8499
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Affects Versions: 1.4.0
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Blocker
> Fix For: 1.5.0, 1.4.1
>
>
> Kryo classes are part of the Flink API and hence move between Flink's core 
> (serializers) and the user application (Avro-Kryo-utils).
> Duplicating the Kryo dependency through reversed (child-first) classloading 
> yields problems. If Kryo is in the user application jar, together with Avro, 
> the following error happens
> (this seems a semi-bug in the JVM, because this should clearly be a 
> {{ClassCastException}}, not such a cryptic byte code error):
> {code}
> java.lang.VerifyError: Bad type on operand stack
> Exception Details:
>   Location:
> 
> org/apache/flink/formats/avro/utils/AvroKryoSerializerUtils.addAvroGenericDataArrayRegistration(Ljava/util/LinkedHashMap;)V
>  @23: invokespecial
>   Reason:
> Type 
> 'org/apache/flink/api/java/typeutils/runtime/kryo/Serializers$SpecificInstanceCollectionSerializerForArrayList'
>  (current frame, stack[7]) is not assignable to 
> 'com/esotericsoftware/kryo/Serializer'
>   Current Frame:
> bci: @23
> flags: { }
> locals: { 'org/apache/flink/formats/avro/utils/AvroKryoSerializerUtils', 
> 'java/util/LinkedHashMap' }
> stack: { 'java/util/LinkedHashMap', 'java/lang/String', uninitialized 6, 
> uninitialized 6, 'java/lang/Class', uninitialized 12, uninitialized 12, 
> 'org/apache/flink/api/java/typeutils/runtime/kryo/Serializers$SpecificInstanceCollectionSerializerForArrayList'
>  }
>   Bytecode:
> 0x000: 2b12 05b6 000b bb00 0c59 1205 bb00 0d59
> 0x010: bb00 0659 b700 0eb7 000f b700 10b6 0011
> 0x020: 57b1   
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5295: [FLINK-8384] [streaming] Session Window Assigner w...

2018-01-24 Thread walterddr
Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5295#discussion_r163684364
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/DynamicEventTimeSessionWindows.java
 ---
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.assigners;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.triggers.TypedEventTimeTrigger;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * A {@link WindowAssigner} that windows elements into sessions based on the timestamp of the
+ * elements. Windows cannot overlap.
+ *
+ * For example, in order to window into windows with a dynamic time gap:
+ * <pre> {@code
+ * DataStream<Tuple2<String, String>> in = ...;
+ * KeyedStream<Tuple2<String, String>, String> keyed = in.keyBy(...);
+ * WindowedStream<Tuple2<String, String>, String, TimeWindows> windowed =
+ *   keyed.window(DynamicEventTimeSessionWindows.withDynamicGap({@link SessionWindowTimeGapExtractor }));
+ * } </pre>
+ *
+ * @param <T> The type of the input elements
+ */
+public class DynamicEventTimeSessionWindows<T> extends MergingWindowAssigner<T, TimeWindow> {
+	private static final long serialVersionUID = 1L;
--- End diff --

Can't we change `EventTimeTrigger` to extend `Trigger` 
instead of `Trigger`? I don't see any reason why this would 
fail any existing code/test, and it's also `@PublicEvolving`.


---


[jira] [Commented] (FLINK-8384) Session Window Assigner with Dynamic Gaps

2018-01-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16338257#comment-16338257
 ] 

ASF GitHub Bot commented on FLINK-8384:
---

Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5295#discussion_r163684364
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/DynamicEventTimeSessionWindows.java
 ---
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.assigners;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.triggers.TypedEventTimeTrigger;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * A {@link WindowAssigner} that windows elements into sessions based on the timestamp of the
+ * elements. Windows cannot overlap.
+ *
+ * For example, in order to window into windows with a dynamic time gap:
+ * <pre> {@code
+ * DataStream<Tuple2<String, String>> in = ...;
+ * KeyedStream<Tuple2<String, String>, String> keyed = in.keyBy(...);
+ * WindowedStream<Tuple2<String, String>, String, TimeWindows> windowed =
+ *   keyed.window(DynamicEventTimeSessionWindows.withDynamicGap({@link SessionWindowTimeGapExtractor }));
+ * } </pre>
+ *
+ * @param <T> The type of the input elements
+ */
+public class DynamicEventTimeSessionWindows<T> extends MergingWindowAssigner<T, TimeWindow> {
+	private static final long serialVersionUID = 1L;
--- End diff --

Can't we change `EventTimeTrigger` to extend `Trigger` 
instead of `Trigger`? I don't see any reason why this would 
fail any existing code/test, and it's also `@PublicEvolving`.


> Session Window Assigner with Dynamic Gaps
> -
>
> Key: FLINK-8384
> URL: https://issues.apache.org/jira/browse/FLINK-8384
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Reporter: Dyana Rose
>Priority: Minor
>
> *Reason for Improvement*
> Currently both Session Window assigners only allow a static inactivity gap. 
> Given the following scenario, this is too restrictive:
> * Given a stream of IoT events from many device types
> * Assume each device type could have a different inactivity gap
> * Assume each device type gap could change while sessions are in flight
> By allowing dynamic inactivity gaps, the correct gap can be determined in the 
> [assignWindows 
> function|https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/EventTimeSessionWindows.java#L59]
>  by passing the element currently under consideration, the timestamp, and the 
> context to a user-defined function. This eliminates the need to create 
> unwieldy workarounds if you only have static gaps.
> Dynamic Session Window gaps should be available for both Event Time and 
> Processing Time streams.
> (short preliminary discussion: 
> https://lists.apache.org/thread.html/08a011c0039e826343e9be0b1bb4ecfc2e12976bc65f8a43ee88@%3Cdev.flink.apache.org%3E)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8505) SlotManager can reach inconsistent state

2018-01-24 Thread Till Rohrmann (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8505?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16338001#comment-16338001
 ] 

Till Rohrmann commented on FLINK-8505:
--

Another instance: https://travis-ci.org/apache/flink/jobs/332770324

> SlotManager can reach inconsistent state
> 
>
> Key: FLINK-8505
> URL: https://issues.apache.org/jira/browse/FLINK-8505
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> The {{SlotManager}} can reach an inconsistent state when a formerly free task 
> slot is reported as allocated by an incoming {{SlotReport}}. The problem is 
> that the slot won't be removed from the set of free slots and, thus, will be 
> considered for future slot requests.
> The solution is to remove the slot from the set of free slots once it is 
> reported allocated.
>  
> https://travis-ci.org/tillrohrmann/flink/jobs/332871241



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8505) SlotManager can reach inconsistent state

2018-01-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8505?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16337997#comment-16337997
 ] 

ASF GitHub Bot commented on FLINK-8505:
---

GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/5354

[FLINK-8505] [flip6] Prevent SlotManager from reaching an inconsistent state

## What is the purpose of the change

The SlotManager could reach an inconsistent state when a formerly free slot 
is reported
to be allocated by an incoming SlotReport. This state transition did not 
remove the slot
from the set of free slots. As a consequence, a now allocated slot will be 
considered for
future SlotRequests. This caused a failure with an IllegalStateException.

The problem is solved by removing an updated slot which is now allocated 
from the set of
free slots.

## Brief change log

- Remove slot from `SlotManager#freeSlots` when slot is reported to be 
`ALLOCATED` instead of `FREE`

## Verifying this change

- Added `SlotManagerTest#testReportAllocatedSlot`
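
A simplified sketch of the change (method and field names assumed; the real 
update logic handles more cases):

```java
// When a slot report marks a formerly FREE slot as allocated, the slot must
// also leave the free-slot set; otherwise it keeps being offered to pending
// slot requests and later trips an IllegalStateException.
private void updateSlot(TaskManagerSlot slot, AllocationID allocationId) {
	if (allocationId != null && slot.getState() == TaskManagerSlot.State.FREE) {
		freeSlots.remove(slot.getSlotId()); // the previously missing step
		slot.updateAllocation(allocationId);
	}
}
```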

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink fixSlotManagerInconsistency

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5354.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5354


commit 329bbb0e3bf7600054e865acf69c63187d04b4a0
Author: Till Rohrmann 
Date:   2018-01-24T17:32:47Z

[FLINK-8505] [flip6] Prevent SlotManager from reaching an inconsistent state

The SlotManager could reach an inconsistent state when a formerly free slot 
is reported
to be allocated by an incoming SlotReport. This state transition did not 
remove the slot
from the set of free slots. As a consequence, a now allocated slot will be 
considered for
future SlotRequests. This caused a failure with an IllegalStateException.

The problem is solved by removing an updated slot which is now allocated 
from the set of
free slots.




> SlotManager can reach inconsistent state
> 
>
> Key: FLINK-8505
> URL: https://issues.apache.org/jira/browse/FLINK-8505
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> The {{SlotManager}} can reach an inconsistent state when a formerly free task 
> slot is reported as allocated by an incoming {{SlotReport}}. The problem is 
> that the slot won't be removed from the set of free slots and, thus, will be 
> considered for future slot requests.
> The solution is to remove the slot from the set of free slots once it is 
> reported allocated.
>  
> https://travis-ci.org/tillrohrmann/flink/jobs/332871241



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5354: [FLINK-8505] [flip6] Prevent SlotManager from reac...

2018-01-24 Thread tillrohrmann
GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/5354

[FLINK-8505] [flip6] Prevent SlotManager from reaching an inconsistent state

## What is the purpose of the change

The SlotManager could reach an inconsistent state when a formerly free slot 
is reported
to be allocated by an incoming SlotReport. This state transition did not 
remove the slot
from the set of free slots. As a consequence, a now allocated slot will be 
considered for
future SlotRequests. This caused a failure with an IllegalStateException.

The problem is solved by removing an updated slot which is now allocated 
from the set of
free slots.

## Brief change log

- Remove slot from `SlotManager#freeSlots` when slot is reported to be 
`ALLOCATED` instead of `FREE`

## Verifying this change

- Added `SlotManagerTest#testReportAllocatedSlot`

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink fixSlotManagerInconsistency

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5354.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5354


commit 329bbb0e3bf7600054e865acf69c63187d04b4a0
Author: Till Rohrmann 
Date:   2018-01-24T17:32:47Z

[FLINK-8505] [flip6] Prevent SlotManager from reaching an inconsistent state

The SlotManager could reach an inconsistent state when a formerly free slot 
is reported
to be allocated by an incoming SlotReport. This state transition did not 
remove the slot
from the set of free slots. As a consequence, a now allocated slot will be 
considered for
future SlotRequests. This caused a failure with an IllegalStateException.

The problem is solved by removing an updated slot which is now allocated 
from the set of
free slots.




---


[jira] [Created] (FLINK-8505) SlotManager can reach inconsistent state

2018-01-24 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-8505:


 Summary: SlotManager can reach inconsistent state
 Key: FLINK-8505
 URL: https://issues.apache.org/jira/browse/FLINK-8505
 Project: Flink
  Issue Type: Bug
  Components: Distributed Coordination
Affects Versions: 1.5.0
Reporter: Till Rohrmann
Assignee: Till Rohrmann
 Fix For: 1.5.0


The {{SlotManager}} can reach an inconsistent state when a formerly free task 
slot is reported as allocated by an incoming {{SlotReport}}. The problem is 
that the slot won't be removed from the set of free slots and, thus, will be 
considered for future slot requests.

The solution is to remove the slot from the set of free slots once it is 
reported allocated.

 

https://travis-ci.org/tillrohrmann/flink/jobs/332871241



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-6623) BlobCacheSuccessTest fails on Windows

2018-01-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6623?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16337982#comment-16337982
 ] 

ASF GitHub Bot commented on FLINK-6623:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5351
  
@zentol I revived and rebased an old branch with an attempt to fix the 
concurrency issue of file deletes on Windows: 
https://github.com/StephanEwen/incubator-flink/commit/0504a66b82ca2afe9a40457413585ecab6d6cc33

Can you check this out, see if it works, and commit it if it is helpful?


> BlobCacheSuccessTest fails on Windows
> -
>
> Key: FLINK-6623
> URL: https://issues.apache.org/jira/browse/FLINK-6623
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.4.0, 1.5.0
> Environment: windows 10, java 1.8
>Reporter: constantine stanley
>Assignee: Chesnay Schepler
>Priority: Major
>
> All tests in {{BlobCacheSuccessTest}} fail on Windows.
> {code}
> java.nio.file.FileSystemException: 
> C:\Users\Zento\AppData\Local\Temp\junit1683251984771313204\junit8061309906182960047\blobStore-04a01d13-96c8-4c0d-b209-5ea0bf5e534d\incoming\temp-
>  -> 
> C:\Users\Zento\AppData\Local\Temp\junit1683251984771313204\junit8061309906182960047\blobStore-04a01d13-96c8-4c0d-b209-5ea0bf5e534d\job_a8fef824a8e43a546dfa05d0c8b73e57\blob_p-0ae4f711ef5d6e9d26c611fd2c8c8ac45ecbf9e7-cd525d0173571dc24f4c0723130892af:
>  The process cannot access the file because it is being used by another 
> process.
>   at 
> sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:86)
>   at 
> sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:97)
>   at sun.nio.fs.WindowsFileCopy.move(WindowsFileCopy.java:387)
>   at 
> sun.nio.fs.WindowsFileSystemProvider.move(WindowsFileSystemProvider.java:287)
>   at java.nio.file.Files.move(Files.java:1395)
>   at 
> org.apache.flink.runtime.blob.BlobUtils.moveTempFileToStore(BlobUtils.java:464)
>   at 
> org.apache.flink.runtime.blob.BlobServer.moveTempFileToStore(BlobServer.java:708)
>   at 
> org.apache.flink.runtime.blob.BlobServer.putBuffer(BlobServer.java:608)
>   at 
> org.apache.flink.runtime.blob.BlobServer.putPermanent(BlobServer.java:568)
>   at 
> org.apache.flink.runtime.blob.BlobServerPutTest.put(BlobServerPutTest.java:778)
>   at 
> org.apache.flink.runtime.blob.BlobCacheSuccessTest.uploadFileGetTest(BlobCacheSuccessTest.java:173)
>   at 
> org.apache.flink.runtime.blob.BlobCacheSuccessTest.testBlobForJobCacheHa(BlobCacheSuccessTest.java:90)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
>   at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
>   at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>   at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>   at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>   at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
>   at 
> com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
>   at 
> com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
>   at 
> com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
>   at 
> com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5351: [FLINK-6623][Blob] BlobServer#putBuffer moves file after ...

2018-01-24 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5351
  
@zentol I revived and rebased an old branch with an attempt to fix the 
concurrency issue of file deletes on Windows: 
https://github.com/StephanEwen/incubator-flink/commit/0504a66b82ca2afe9a40457413585ecab6d6cc33

Can you check this out, see if it works, and commit it if it is helpful?


---


[GitHub] flink pull request #5346: [FLINK-8490] [mesos] Allow custom docker parameter...

2018-01-24 Thread joerg84
Github user joerg84 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5346#discussion_r163624908
  
--- Diff: 
flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java
 ---
@@ -365,6 +386,31 @@ public String call(String s) {
}
}
 
+   public static List<Protos.Parameter> buildDockerParameters(Option<String> dockerParameters) {
+       if (dockerParameters.isEmpty()) {
+           return Collections.emptyList();
+       } else {
+           String[] dockerParameterSpecifications = dockerParameters.get().split(",");
+
+           List<Protos.Parameter> parameters = new ArrayList<>(dockerParameterSpecifications.length);
+
+           for (String dockerParameterSpecification : dockerParameterSpecifications) {
+               if (!dockerParameterSpecification.trim().isEmpty()) {
+                   String[] match = dockerParameterSpecification.split("=", 2);
--- End diff --

I see, thanks for the explanation!


---


[jira] [Commented] (FLINK-8490) Allow custom docker parameters for docker tasks on Mesos

2018-01-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8490?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16337961#comment-16337961
 ] 

ASF GitHub Bot commented on FLINK-8490:
---

Github user joerg84 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5346#discussion_r163624908
  
--- Diff: 
flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java
 ---
@@ -365,6 +386,31 @@ public String call(String s) {
}
}
 
+   public static List<Protos.Parameter> buildDockerParameters(Option<String> dockerParameters) {
+       if (dockerParameters.isEmpty()) {
+           return Collections.emptyList();
+       } else {
+           String[] dockerParameterSpecifications = dockerParameters.get().split(",");
+
+           List<Protos.Parameter> parameters = new ArrayList<>(dockerParameterSpecifications.length);
+
+           for (String dockerParameterSpecification : dockerParameterSpecifications) {
+               if (!dockerParameterSpecification.trim().isEmpty()) {
+                   String[] match = dockerParameterSpecification.split("=", 2);
--- End diff --

I see, thanks for the explanation!


> Allow custom docker parameters for docker tasks on Mesos
> 
>
> Key: FLINK-8490
> URL: https://issues.apache.org/jira/browse/FLINK-8490
> Project: Flink
>  Issue Type: Improvement
>  Components: Mesos
>Reporter: Jörg Schad
>Priority: Major
>
> It would be great to pass custom parameters to Mesos when using the Docker 
> Containerizer.
> This could be similar to this spark example: 
> `spark.mesos.executor.docker.parameters privileged=true`
>  
> Originally brought up here: 
> https://stackoverflow.com/questions/48393153/passing-custom-parameters-to-docker-when-running-flink-on-mesos-marathon?noredirect=1#comment83777480_48393153



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8406) BucketingSink does not detect hadoop file systems

2018-01-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16337906#comment-16337906
 ] 

ASF GitHub Bot commented on FLINK-8406:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5330


> BucketingSink does not detect hadoop file systems
> -
>
> Key: FLINK-8406
> URL: https://issues.apache.org/jira/browse/FLINK-8406
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystem
>Affects Versions: 1.4.0, 1.5.0
>Reporter: Chesnay Schepler
>Assignee: Stephan Ewen
>Priority: Blocker
> Fix For: 1.5.0, 1.4.1
>
>
> In {{BucketingSink#createHadoopFileSystem}} one can find this piece of code:
> {code}
> final org.apache.flink.core.fs.FileSystem flinkFs =
>   org.apache.flink.core.fs.FileSystem.get(path.toUri());
> final FileSystem hadoopFs = (flinkFs instanceof HadoopFileSystem)
>   ? ((HadoopFileSystem) flinkFs).getHadoopFileSystem()
>   : null;
> {code}
> {{FileSystem#get()}} wraps the created {{FileSystem}} in a 
> {{SafetyNetWrapperFileSystem}}, causing the instanceof check to 
> categorically fail.
> We may want to replace the {{get()}} call with {{getUnguardedFileSystem()}}. 
> We should also look for other occurrences of similar instanceof checks.
> According to a thread on the mailing list this causes the BucketingSink to be 
> unusable. 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/BucketingSink-broken-in-flink-1-4-0-td17710.html



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
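
As a minimal sketch of the fix the ticket suggests (assuming the 1.4-era 
locations of {{FileSystem}} and {{HadoopFileSystem}}; the wrapper class below 
is invented for illustration):

{code:java}
import java.io.IOException;

import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;

final class UnguardedFsLookup {

    /**
     * Returns the underlying Hadoop file system for {@code path}, or null if
     * the path is not backed by a Hadoop file system.
     */
    static org.apache.hadoop.fs.FileSystem hadoopFsFor(Path path) throws IOException {
        // getUnguardedFileSystem() skips the SafetyNetWrapperFileSystem wrapper,
        // so the instanceof check below can succeed again.
        final FileSystem flinkFs = FileSystem.getUnguardedFileSystem(path.toUri());

        return (flinkFs instanceof HadoopFileSystem)
                ? ((HadoopFileSystem) flinkFs).getHadoopFileSystem()
                : null;
    }
}
{code}
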


[jira] [Commented] (FLINK-8466) ErrorInfo needs to hold Exception as SerializedThrowable

2018-01-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8466?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16337905#comment-16337905
 ] 

ASF GitHub Bot commented on FLINK-8466:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5348


> ErrorInfo needs to hold Exception as SerializedThrowable
> 
>
> Key: FLINK-8466
> URL: https://issues.apache.org/jira/browse/FLINK-8466
> Project: Flink
>  Issue Type: Bug
>  Components: Local Runtime
>Affects Versions: 1.4.0
>Reporter: Jelmer Kuperus
>Assignee: Stephan Ewen
>Priority: Blocker
> Fix For: 1.5.0, 1.4.1
>
> Attachments: Screen Shot 2018-01-20 at 1.48.33 PM.png
>
>
> The MemoryArchivist holding on to the last thrown exception prevents the user code 
> classloader from being garbage collected.
> The MemoryArchivist holds on to any number of ArchivedExecutionGraphs. These 
> ArchivedExecutionGraph instances contain a failureCause field of type 
> ErrorInfo that wraps the exception in case one was thrown to terminate the job.
> This exception class will more often than not have been loaded by a user code 
> classloader, and as long as the MemoryArchivist holds on to this exception, 
> the JVM won't be able to reclaim the resources held by this classloader.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
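
The idea behind the fix can be sketched as follows (assuming the 1.4/1.5-era 
package of {{SerializedThrowable}}; the class below is a simplified stand-in 
for {{ErrorInfo}}, not the actual implementation):

{code:java}
import org.apache.flink.runtime.util.SerializedThrowable;

public class ErrorInfoSketch {

    // Holding a SerializedThrowable instead of the raw Throwable drops the
    // strong reference to the exception class, so the user-code classloader
    // that loaded it can be garbage collected.
    private final SerializedThrowable exception;
    private final long timestamp;

    public ErrorInfoSketch(Throwable cause, long timestamp) {
        this.exception = cause instanceof SerializedThrowable
                ? (SerializedThrowable) cause
                : new SerializedThrowable(cause);
        this.timestamp = timestamp;
    }
}
{code}
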


[GitHub] flink pull request #5330: [FLINK-8406] [bucketing sink] Fix proper access of...

2018-01-24 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5330


---


[GitHub] flink pull request #5348: [FLINK-8466] [runtime] Make sure ErrorInfo referen...

2018-01-24 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5348


---


[jira] [Commented] (FLINK-7124) Allow to rescale JobGraph on JobManager

2018-01-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7124?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16337902#comment-16337902
 ] 

ASF GitHub Bot commented on FLINK-7124:
---

Github user EronWright commented on the issue:

https://github.com/apache/flink/pull/4510
  
@tzulitai is anything left to do here?


> Allow to rescale JobGraph on JobManager
> ---
>
> Key: FLINK-7124
> URL: https://issues.apache.org/jira/browse/FLINK-7124
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Reporter: Till Rohrmann
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Major
>
> In order to support dynamic scaling, the {{JobManager}} has to be able to 
> change the parallelism of the submitted {{JobGraph}}. This basically entails 
> that we can change the parallelism settings on the cluster side.
> We already have the functionality that we can change the parallelism if it 
> was set to {{ExecutionConfig.PARALLELISM_AUTO_MAX}}. Therefore, I think the 
> task is mostly about making sure that things really work properly when 
> requesting a parallelism change.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #4510: [FLINK-7124] [flip-6] Add test to verify rescaling JobGra...

2018-01-24 Thread EronWright
Github user EronWright commented on the issue:

https://github.com/apache/flink/pull/4510
  
@tzulitai is anything left to do here?


---


[jira] [Commented] (FLINK-8439) Document using a custom AWS Credentials Provider with flink-s3-fs-hadoop

2018-01-24 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8439?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16337897#comment-16337897
 ] 

Stephan Ewen commented on FLINK-8439:
-

Big +1!

> Document using a custom AWS Credentials Provider with flink-s3-fs-hadoop
> 
>
> Key: FLINK-8439
> URL: https://issues.apache.org/jira/browse/FLINK-8439
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Reporter: Dyana Rose
>Priority: Blocker
> Fix For: 1.5.0, 1.4.1
>
>
> This came up when using S3 for the file system backend and running under 
> ECS.
> With no credentials in the container, hadoop-aws will default to EC2 instance 
> level credentials when accessing S3. However when running under ECS, you will 
> generally want to default to the task definition's IAM role.
> In this case you need to set the hadoop property
> {code:java}
> fs.s3a.aws.credentials.provider{code}
> to a fully qualified class name(s). see [hadoop-aws 
> docs|https://github.com/apache/hadoop/blob/1ba491ff907fc5d2618add980734a3534e2be098/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md]
> This works as expected when you add this setting to flink-conf.yaml but there 
> is a further 'gotcha.'  Because the AWS sdk is shaded, the actual full class 
> name for, in this case, the ContainerCredentialsProvider is
> {code:java}
> org.apache.flink.fs.s3hadoop.shaded.com.amazonaws.auth.ContainerCredentialsProvider{code}
>  
> meaning the full setting is:
> {code:java}
> fs.s3a.aws.credentials.provider: 
> org.apache.flink.fs.s3hadoop.shaded.com.amazonaws.auth.ContainerCredentialsProvider{code}
> If you instead set it to the unshaded class name you will see a very 
> confusing error stating that the ContainerCredentialsProvider doesn't 
> implement AWSCredentialsProvider (which it most certainly does.)
> Adding this information (how to specify alternate Credential Providers, and 
> the name space gotcha) to the [AWS deployment 
> docs|https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/deployment/aws.html]
>  would be useful to anyone else using S3.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8439) Document using a custom AWS Credentials Provider with flink-s3-fs-hadoop

2018-01-24 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8439?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen updated FLINK-8439:

Fix Version/s: 1.4.1

> Document using a custom AWS Credentials Provider with flink-s3-fs-hadoop
> 
>
> Key: FLINK-8439
> URL: https://issues.apache.org/jira/browse/FLINK-8439
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Reporter: Dyana Rose
>Priority: Blocker
> Fix For: 1.5.0, 1.4.1
>
>
> This came up when using S3 for the file system backend and running under 
> ECS.
> With no credentials in the container, hadoop-aws will default to EC2 instance 
> level credentials when accessing S3. However when running under ECS, you will 
> generally want to default to the task definition's IAM role.
> In this case you need to set the hadoop property
> {code:java}
> fs.s3a.aws.credentials.provider{code}
> to a fully qualified class name(s). see [hadoop-aws 
> docs|https://github.com/apache/hadoop/blob/1ba491ff907fc5d2618add980734a3534e2be098/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md]
> This works as expected when you add this setting to flink-conf.yaml but there 
> is a further 'gotcha.'  Because the AWS sdk is shaded, the actual full class 
> name for, in this case, the ContainerCredentialsProvider is
> {code:java}
> org.apache.flink.fs.s3hadoop.shaded.com.amazonaws.auth.ContainerCredentialsProvider{code}
>  
> meaning the full setting is:
> {code:java}
> fs.s3a.aws.credentials.provider: 
> org.apache.flink.fs.s3hadoop.shaded.com.amazonaws.auth.ContainerCredentialsProvider{code}
> If you instead set it to the unshaded class name you will see a very 
> confusing error stating that the ContainerCredentialsProvider doesn't 
> implement AWSCredentialsProvider (which it most certainly does.)
> Adding this information (how to specify alternate Credential Providers, and 
> the name space gotcha) to the [AWS deployment 
> docs|https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/deployment/aws.html]
>  would be useful to anyone else using S3.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8503) Port TaskManagerLogHandler to new REST endpoint

2018-01-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8503?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16337865#comment-16337865
 ] 

ASF GitHub Bot commented on FLINK-8503:
---

GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/5353

[FLINK-8503] [flip6] Display TaskExecutor logs and stdout files in web ui

## What is the purpose of the change

Introduce the AbstractHandler which takes a typed request and returns an
untyped response. The AbstractRestHandler extends the AbstractHandler to add
typed responses.

Introduce AbstractTaskManagerFileHandler which encapsulates the file loading
logic. Upon request of a TaskManager file, the handler will trigger the file
upload via the ResourceManager. The returned TransientBlobKey is then
downloaded via the TransientBlobService. Once downloaded, the file is served
to the client. Each transient blob key is cached for a maximum duration,
after which it is purged and has to be reuploaded by the TaskExecutor.

This PR is based on #5341 

## Brief change log

- Introduced untyped response handler `AbstractHandler`
- Added `AbstractTaskManagerFileHandler` which is responsible for serving 
files from the `TaskExecutor`
- The `AbstractTaskManagerFileHandler` triggers the file upload via the 
`ResourceManager` which knows the `TaskExecutors`, additionally it caches the 
`TransientBlobKeys` in order to not always trigger a file upload
- Added `TaskManagerLogFileHandler` to serve the log file
- Added `TaskManagerStdoutFileHandler` to serve the stdout file

## Verifying this change

- Added `AbstractTaskManagerFileHandlerTest`
- Tested functionality manually

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink 
enableTaskManagerLogRetrieval

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5353.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5353


commit b9db2ab22c346ec64363b446ed23692af1365239
Author: Till Rohrmann 
Date:   2018-01-23T18:12:27Z

[FLINK-8501] [flip6] Use single BlobCacheService per TaskExecutor

Instead of creating a dedicated BlobCacheService for each new JobManagerConnection,
the TaskExecutor uses a single BlobCacheService which it shares between the
different JobManagerConnections. The initial BlobServer address is passed by the
ResourceManager when the TaskExecutor registers at it. In order to avoid the
re-creation of BlobCacheServices, this commit changes the behaviour such that one
can update the BlobServer address.

commit 5bd4db619ff0e984477ead323345f5f7fa740626
Author: Till Rohrmann 
Date:   2018-01-24T12:41:53Z

[hotfix] [tests] Remove JobManagerRunnerMockTest

The JobManagerRunnerMockTest is completely ignored. Moreover, it tests 
things with
heavy usage of Mockito which is hard to maintain.

commit c38adb65dd49150a03a2f5f7ea8d421b2ef34616
Author: Till Rohrmann 
Date:   2018-01-24T12:42:41Z

[FLINK-8502] [flip6] Remove LibraryCacheManager from JobMaster

This commit removes the LibraryCacheManager from the JobMaster since it is
no longer needed. The JobMaster is started with the correct user code class
loader and, thus, does not need the LibraryCacheManager.

This commit also corrects that the BlobServer is not closed by the
JobManagerServices#shutdown method.

commit eb3fed4ce96b4ce27f01d72f2dc76b151dbdd1ae
Author: Till Rohrmann 
Date:   2018-01-23T14:17:16Z

[FLINK-8495] [flip6] Enable main cluster component's log and stdout file 
retrieval

This commit enables the log and stdout file retrieval of the cluster's main 
component
via the web ui. This happens via the StaticFileServerHandler which serves 
the log
and stdout file.

commit 23496241962c69ceb3ada9bd0d67c0344554cf99
Author: Till Rohrmann 
Date:   2018-01-23T17:27:28Z

[FLINK-8503] [flip6] Display 
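
The caching behaviour described in this PR can be sketched roughly as follows 
(a hypothetical illustration using Guava's cache, not the PR's actual code; 
the key and value types and method names are invented):

{code:java}
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;

final class BlobKeyCacheSketch {

    // Remember the transient blob key per TaskManager for a bounded time, so
    // the file is not re-uploaded on every request; after expiry the entry is
    // purged and the TaskExecutor has to upload the file again.
    private final Cache<String, CompletableFuture<String>> blobKeyCache =
            CacheBuilder.newBuilder()
                    .expireAfterWrite(5, TimeUnit.MINUTES)
                    .build();

    CompletableFuture<String> getBlobKey(String taskManagerId) throws ExecutionException {
        // load-on-miss: trigger the upload via the ResourceManager only once
        // per TaskManager while the cached entry is alive
        return blobKeyCache.get(taskManagerId, () -> triggerFileUpload(taskManagerId));
    }

    private CompletableFuture<String> triggerFileUpload(String taskManagerId) {
        // placeholder for the RPC that asks the ResourceManager/TaskExecutor
        // to upload the file and return its transient blob key
        return CompletableFuture.completedFuture("blob-key-for-" + taskManagerId);
    }
}
{code}
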

[GitHub] flink pull request #5353: [FLINK-8503] [flip6] Display TaskExecutor logs and...

2018-01-24 Thread tillrohrmann
GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/5353

[FLINK-8503] [flip6] Display TaskExecutor logs and stdout files in web ui

## What is the purpose of the change

Introduce the AbstractHandler which takes a typed request and returns an
untyped response. The AbstractRestHandler extends the AbstractHandler to add
typed responses.

Introduce AbstractTaskManagerFileHandler which encapsulates the file loading
logic. Upon request of a TaskManager file, the handler will trigger the file
upload via the ResourceManager. The returned TransientBlobKey is then
downloaded via the TransientBlobService. Once downloaded, the file is served
to the client. Each transient blob key is cached for a maximum duration,
after which it is purged and has to be reuploaded by the TaskExecutor.

This PR is based on #5341 

## Brief change log

- Introduced untyped response handler `AbstractHandler`
- Added `AbstractTaskManagerFileHandler` which is responsible for serving 
files from the `TaskExecutor`
- The `AbstractTaskManagerFileHandler` triggers the file upload via the 
`ResourceManager` which knows the `TaskExecutors`, additionally it caches the 
`TransientBlobKeys` in order to not always trigger a file upload
- Added `TaskManagerLogFileHandler` to serve the log file
- Added `TaskManagerStdoutFileHandler` to serve the stdout file

## Verifying this change

- Added `AbstractTaskManagerFileHandlerTest`
- Tested functionality manually

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink 
enableTaskManagerLogRetrieval

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5353.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5353


commit b9db2ab22c346ec64363b446ed23692af1365239
Author: Till Rohrmann 
Date:   2018-01-23T18:12:27Z

[FLINK-8501] [flip6] Use single BlobCacheService per TaskExecutor

Instead of creating a dedicated BlobCacheService for each new JobManagerConnection,
the TaskExecutor uses a single BlobCacheService which it shares between the
different JobManagerConnections. The initial BlobServer address is passed by the
ResourceManager when the TaskExecutor registers at it. In order to avoid the
re-creation of BlobCacheServices, this commit changes the behaviour such that one
can update the BlobServer address.

commit 5bd4db619ff0e984477ead323345f5f7fa740626
Author: Till Rohrmann 
Date:   2018-01-24T12:41:53Z

[hotfix] [tests] Remove JobManagerRunnerMockTest

The JobManagerRunnerMockTest is completely ignored. Moreover, it tests 
things with
heavy usage of Mockito which is hard to maintain.

commit c38adb65dd49150a03a2f5f7ea8d421b2ef34616
Author: Till Rohrmann 
Date:   2018-01-24T12:42:41Z

[FLINK-8502] [flip6] Remove LibraryCacheManager from JobMaster

This commit removes the LibraryCacheManager from the JobMaster since it is
no longer needed. The JobMaster is started with the correct user code class
loader and, thus, does not need the LibraryCacheManager.

This commit also corrects that the BlobServer is not closed by the
JobManagerServices#shutdown method.

commit eb3fed4ce96b4ce27f01d72f2dc76b151dbdd1ae
Author: Till Rohrmann 
Date:   2018-01-23T14:17:16Z

[FLINK-8495] [flip6] Enable main cluster component's log and stdout file 
retrieval

This commit enables the log and stdout file retrieval of the cluster's main 
component
via the web ui. This happens via the StaticFileServerHandler which serves 
the log
and stdout file.

commit 23496241962c69ceb3ada9bd0d67c0344554cf99
Author: Till Rohrmann 
Date:   2018-01-23T17:27:28Z

[FLINK-8503] [flip6] Display TaskExecutor logs and stdout files in web ui

Introduce the AbstractHandler which takes a typed request and returns an
untyped response. The AbstractRestHandler extends the AbstractHandler to add
typed responses.

Introduce 

[jira] [Commented] (FLINK-8485) Running Flink inside Intellij no longer works after upgrading from 1.3.2 to 1.4.0

2018-01-24 Thread Xuan Nguyen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8485?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16337854#comment-16337854
 ] 

Xuan Nguyen commented on FLINK-8485:


I saw the ClassDefNotFound error, so I included the following dependency:
{code:java}
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-queryable-state-runtime_2.11</artifactId>
    <version>1.4.0</version>
</dependency>
{code}
Here is the result, with the same exception:

 
{code:java}
/usr/lib/jvm/java-8-oracle/bin/java 
-javaagent:/home/xuan/java/idea/lib/idea_rt.jar=37173:/home/xuan/java/idea/bin 
-Dfile.encoding=UTF-8 -classpath 

[jira] [Commented] (FLINK-8485) Running Flink inside Intellij no longer works after upgrading from 1.3.2 to 1.4.0

2018-01-24 Thread Xuan Nguyen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8485?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16337852#comment-16337852
 ] 

Xuan Nguyen commented on FLINK-8485:


Hi [~till.rohrmann] and [~StephanEwen]
 # I generated the quickstart using: 

{code:java}
mvn archetype:generate -DarchetypeGroupId=org.apache.flink 
-DarchetypeArtifactId=flink-quickstart-java -DarchetypeVersion=1.4.0{code}

 # Opened the project in Intellij
 # Ran the WordCount inside Intellij

Here is my log with 
*log4j.logger.org.apache.flink.runtime.jobmanager.JobManager=trace* enabled:
{code}
/usr/lib/jvm/java-8-oracle/bin/java 
-javaagent:/home/xuan/java/idea/lib/idea_rt.jar=36285:/home/xuan/java/idea/bin 
-Dfile.encoding=UTF-8 -classpath 

[jira] [Commented] (FLINK-8439) Document using a custom AWS Credentials Provider with flink-s3-fs-hadoop

2018-01-24 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8439?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16337845#comment-16337845
 ] 

Aljoscha Krettek commented on FLINK-8439:
-

We could also add code that automatically remaps config values to the shaded 
package if it detects the s3 path in there.

> Document using a custom AWS Credentials Provider with flink-s3-fs-hadoop
> 
>
> Key: FLINK-8439
> URL: https://issues.apache.org/jira/browse/FLINK-8439
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Reporter: Dyana Rose
>Priority: Blocker
> Fix For: 1.5.0
>
>
> This came up when using S3 for the file system backend and running under 
> ECS.
> With no credentials in the container, hadoop-aws will default to EC2 instance 
> level credentials when accessing S3. However when running under ECS, you will 
> generally want to default to the task definition's IAM role.
> In this case you need to set the hadoop property
> {code:java}
> fs.s3a.aws.credentials.provider{code}
> to a fully qualified class name(s). see [hadoop-aws 
> docs|https://github.com/apache/hadoop/blob/1ba491ff907fc5d2618add980734a3534e2be098/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md]
> This works as expected when you add this setting to flink-conf.yaml but there 
> is a further 'gotcha.'  Because the AWS sdk is shaded, the actual full class 
> name for, in this case, the ContainerCredentialsProvider is
> {code:java}
> org.apache.flink.fs.s3hadoop.shaded.com.amazonaws.auth.ContainerCredentialsProvider{code}
>  
> meaning the full setting is:
> {code:java}
> fs.s3a.aws.credentials.provider: 
> org.apache.flink.fs.s3hadoop.shaded.com.amazonaws.auth.ContainerCredentialsProvider{code}
> If you instead set it to the unshaded class name you will see a very 
> confusing error stating that the ContainerCredentialsProvider doesn't 
> implement AWSCredentialsProvider (which it most certainly does.)
> Adding this information (how to specify alternate Credential Providers, and 
> the name space gotcha) to the [AWS deployment 
> docs|https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/deployment/aws.html]
>  would be useful to anyone else using S3.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
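
A minimal sketch of such a remapping (names and prefixes are illustrative, 
not Flink's actual implementation):

{code:java}
public final class ShadedProviderRemapper {

    private static final String UNSHADED_PREFIX = "com.amazonaws.";
    private static final String SHADED_PREFIX =
            "org.apache.flink.fs.s3hadoop.shaded.com.amazonaws.";

    /**
     * Rewrites a credentials provider class name from the plain AWS SDK
     * namespace to the shaded one, so users can configure the class name
     * they know from the hadoop-aws docs.
     */
    public static String remap(String providerClassName) {
        if (providerClassName.startsWith(UNSHADED_PREFIX)) {
            return SHADED_PREFIX + providerClassName.substring(UNSHADED_PREFIX.length());
        }
        return providerClassName;
    }
}
{code}
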


[jira] [Updated] (FLINK-8439) Document using a custom AWS Credentials Provider with flink-s3-fs-hadoop

2018-01-24 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8439?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-8439:

Priority: Blocker  (was: Trivial)

> Document using a custom AWS Credentials Provider with flink-s3-fs-hadoop
> 
>
> Key: FLINK-8439
> URL: https://issues.apache.org/jira/browse/FLINK-8439
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Reporter: Dyana Rose
>Priority: Blocker
> Fix For: 1.5.0
>
>
> This came up when using S3 for the file system backend and running under 
> ECS.
> With no credentials in the container, hadoop-aws will default to EC2 instance 
> level credentials when accessing S3. However when running under ECS, you will 
> generally want to default to the task definition's IAM role.
> In this case you need to set the hadoop property
> {code:java}
> fs.s3a.aws.credentials.provider{code}
> to a fully qualified class name(s). see [hadoop-aws 
> docs|https://github.com/apache/hadoop/blob/1ba491ff907fc5d2618add980734a3534e2be098/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md]
> This works as expected when you add this setting to flink-conf.yaml but there 
> is a further 'gotcha.'  Because the AWS sdk is shaded, the actual full class 
> name for, in this case, the ContainerCredentialsProvider is
> {code:java}
> org.apache.flink.fs.s3hadoop.shaded.com.amazonaws.auth.ContainerCredentialsProvider{code}
>  
> meaning the full setting is:
> {code:java}
> fs.s3a.aws.credentials.provider: 
> org.apache.flink.fs.s3hadoop.shaded.com.amazonaws.auth.ContainerCredentialsProvider{code}
> If you instead set it to the unshaded class name you will see a very 
> confusing error stating that the ContainerCredentialsProvider doesn't 
> implement AWSCredentialsProvider (which it most certainly does.)
> Adding this information (how to specify alternate Credential Providers, and 
> the name space gotcha) to the [AWS deployment 
> docs|https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/deployment/aws.html]
>  would be useful to anyone else using S3.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8439) Document using a custom AWS Credentials Provider with flink-s3-fs-hadoop

2018-01-24 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8439?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-8439:

Fix Version/s: 1.5.0

> Document using a custom AWS Credentials Provider with flink-s3-fs-hadoop
> 
>
> Key: FLINK-8439
> URL: https://issues.apache.org/jira/browse/FLINK-8439
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Reporter: Dyana Rose
>Priority: Trivial
> Fix For: 1.5.0
>
>
> This came up when using S3 for the file system backend and running under 
> ECS.
> With no credentials in the container, hadoop-aws will default to EC2 instance 
> level credentials when accessing S3. However when running under ECS, you will 
> generally want to default to the task definition's IAM role.
> In this case you need to set the hadoop property
> {code:java}
> fs.s3a.aws.credentials.provider{code}
> to a fully qualified class name(s). see [hadoop-aws 
> docs|https://github.com/apache/hadoop/blob/1ba491ff907fc5d2618add980734a3534e2be098/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md]
> This works as expected when you add this setting to flink-conf.yaml but there 
> is a further 'gotcha.'  Because the AWS sdk is shaded, the actual full class 
> name for, in this case, the ContainerCredentialsProvider is
> {code:java}
> org.apache.flink.fs.s3hadoop.shaded.com.amazonaws.auth.ContainerCredentialsProvider{code}
>  
> meaning the full setting is:
> {code:java}
> fs.s3a.aws.credentials.provider: 
> org.apache.flink.fs.s3hadoop.shaded.com.amazonaws.auth.ContainerCredentialsProvider{code}
> If you instead set it to the unshaded class name you will see a very 
> confusing error stating that the ContainerCredentialsProvider doesn't 
> implement AWSCredentialsProvider (which it most certainly does.)
> Adding this information (how to specify alternate Credential Providers, and 
> the name space gotcha) to the [AWS deployment 
> docs|https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/deployment/aws.html]
>  would be useful to anyone else using S3.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5346: [FLINK-8490] [mesos] Allow custom docker parameters for d...

2018-01-24 Thread lishim
Github user lishim commented on the issue:

https://github.com/apache/flink/pull/5346
  
Thanks @joerg84 for your review and comments.  I have updated the PR to 
address your concerns.


---


[jira] [Commented] (FLINK-8490) Allow custom docker parameters for docker tasks on Mesos

2018-01-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8490?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16337843#comment-16337843
 ] 

ASF GitHub Bot commented on FLINK-8490:
---

Github user lishim commented on the issue:

https://github.com/apache/flink/pull/5346
  
Thanks @joerg84 for your review and comments.  I have updated the PR to 
address your concerns.


> Allow custom docker parameters for docker tasks on Mesos
> 
>
> Key: FLINK-8490
> URL: https://issues.apache.org/jira/browse/FLINK-8490
> Project: Flink
>  Issue Type: Improvement
>  Components: Mesos
>Reporter: Jörg Schad
>Priority: Major
>
> It would be great to pass custom parameters to Mesos when using the Docker 
> Containerizer.
> This could be similar to this spark example: 
> `spark.mesos.executor.docker.parameters privileged=true`
>  
> Originally brought up here: 
> https://stackoverflow.com/questions/48393153/passing-custom-parameters-to-docker-when-running-flink-on-mesos-marathon?noredirect=1#comment83777480_48393153



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8364) Add iterator() to ListState which returns empty iterator when it has no value

2018-01-24 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8364?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-8364:

Summary: Add iterator() to ListState which returns empty iterator when it 
has no value  (was: add getIterator() to ListState, and returns empty iterator 
when it has no value)

> Add iterator() to ListState which returns empty iterator when it has no value
> -
>
> Key: FLINK-8364
> URL: https://issues.apache.org/jira/browse/FLINK-8364
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.4.0
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Major
> Fix For: 1.5.0
>
>
> per discussion with [~stefanrichte...@gmail.com] in 
> https://github.com/apache/flink/pull/4963, we decided to have 
> {{ListState#get}} return an empty iterator if it has no value in it.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
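
From the user's perspective, the improvement can be illustrated as follows 
(a sketch only; the helper method is invented):

{code:java}
import org.apache.flink.api.common.state.ListState;

public final class ListStateExample {

    // With ListState#get() returning an empty Iterable instead of null when
    // no elements are stored, user code can iterate without a null check.
    static long countElements(ListState<Long> state) throws Exception {
        long count = 0;
        for (Long ignored : state.get()) { // safe even when the state is empty
            count++;
        }
        return count;
    }
}
{code}
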


[GitHub] flink pull request #5346: [FLINK-8490] [mesos] Allow custom docker parameter...

2018-01-24 Thread lishim
Github user lishim commented on a diff in the pull request:

https://github.com/apache/flink/pull/5346#discussion_r163596583
  
--- Diff: 
flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParametersTest.java
 ---
@@ -84,6 +84,35 @@ public void testContainerVolumes() throws Exception {
assertEquals(Protos.Volume.Mode.RO, 
params.containerVolumes().get(0).getMode());
}
 
+   @Test
+   public void testContainerDockerParameter() throws Exception {
+   Configuration config = new Configuration();
+   
config.setString(MesosTaskManagerParameters.MESOS_RM_CONTAINER_DOCKER_PARAMETERS,
 "testKey=testValue");
+
+   MesosTaskManagerParameters params = 
MesosTaskManagerParameters.create(config);
+   assertEquals(params.dockerParameters().get(0).getKey(), 
"testKey");
--- End diff --

Added a size check.


---


[jira] [Commented] (FLINK-8490) Allow custom docker parameters for docker tasks on Mesos

2018-01-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8490?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16337837#comment-16337837
 ] 

ASF GitHub Bot commented on FLINK-8490:
---

Github user lishim commented on a diff in the pull request:

https://github.com/apache/flink/pull/5346#discussion_r163596501
  
--- Diff: 
flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java
 ---
@@ -365,6 +386,31 @@ public String call(String s) {
}
}
 
+   public static List<Protos.Parameter> buildDockerParameters(Option<String> dockerParameters) {
+       if (dockerParameters.isEmpty()) {
+           return Collections.emptyList();
+       } else {
+           String[] dockerParameterSpecifications = dockerParameters.get().split(",");
+
+           List<Protos.Parameter> parameters = new ArrayList<>(dockerParameterSpecifications.length);
+
+           for (String dockerParameterSpecification : dockerParameterSpecifications) {
+               if (!dockerParameterSpecification.trim().isEmpty()) {
+                   String[] match = dockerParameterSpecification.split("=", 2);
--- End diff --

Some Docker parameters are key=value pairs themselves, so key2=val1=val3 is 
actually a valid specification.  I added a comment to clarify this, and included 
a test case.


> Allow custom docker parameters for docker tasks on Mesos
> 
>
> Key: FLINK-8490
> URL: https://issues.apache.org/jira/browse/FLINK-8490
> Project: Flink
>  Issue Type: Improvement
>  Components: Mesos
>Reporter: Jörg Schad
>Priority: Major
>
> It would be great to pass custom parameters to Mesos when using the Docker 
> Containerizer.
> This could be similar to this spark example: 
> `spark.mesos.executor.docker.parameters privileged=true`
>  
> Originally brought up here: 
> https://stackoverflow.com/questions/48393153/passing-custom-parameters-to-docker-when-running-flink-on-mesos-marathon?noredirect=1#comment83777480_48393153



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8490) Allow custom docker parameters for docker tasks on Mesos

2018-01-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8490?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16337839#comment-16337839
 ] 

ASF GitHub Bot commented on FLINK-8490:
---

Github user lishim commented on a diff in the pull request:

https://github.com/apache/flink/pull/5346#discussion_r163596583
  
--- Diff: 
flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParametersTest.java
 ---
@@ -84,6 +84,35 @@ public void testContainerVolumes() throws Exception {
assertEquals(Protos.Volume.Mode.RO, 
params.containerVolumes().get(0).getMode());
}
 
+   @Test
+   public void testContainerDockerParameter() throws Exception {
+   Configuration config = new Configuration();
+   
config.setString(MesosTaskManagerParameters.MESOS_RM_CONTAINER_DOCKER_PARAMETERS,
 "testKey=testValue");
+
+   MesosTaskManagerParameters params = 
MesosTaskManagerParameters.create(config);
+   assertEquals(params.dockerParameters().get(0).getKey(), 
"testKey");
--- End diff --

Added a size check.


> Allow custom docker parameters for docker tasks on Mesos
> 
>
> Key: FLINK-8490
> URL: https://issues.apache.org/jira/browse/FLINK-8490
> Project: Flink
>  Issue Type: Improvement
>  Components: Mesos
>Reporter: Jörg Schad
>Priority: Major
>
> It would be great to pass custom parameters to Mesos when using the Docker 
> Containerizer.
> This could be similar to this spark example: 
> `spark.mesos.executor.docker.parameters privileged=true`
>  
> Originally brought up here: 
> https://stackoverflow.com/questions/48393153/passing-custom-parameters-to-docker-when-running-flink-on-mesos-marathon?noredirect=1#comment83777480_48393153



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5346: [FLINK-8490] [mesos] Allow custom docker parameter...

2018-01-24 Thread lishim
Github user lishim commented on a diff in the pull request:

https://github.com/apache/flink/pull/5346#discussion_r163596501
  
--- Diff: 
flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java
 ---
@@ -365,6 +386,31 @@ public String call(String s) {
}
}
 
+   public static List<Protos.Parameter> buildDockerParameters(Option<String> dockerParameters) {
+       if (dockerParameters.isEmpty()) {
+           return Collections.emptyList();
+       } else {
+           String[] dockerParameterSpecifications = dockerParameters.get().split(",");
+
+           List<Protos.Parameter> parameters = new ArrayList<>(dockerParameterSpecifications.length);
+
+           for (String dockerParameterSpecification : dockerParameterSpecifications) {
+               if (!dockerParameterSpecification.trim().isEmpty()) {
+                   String[] match = dockerParameterSpecification.split("=", 2);
--- End diff --

Some Docker parameters are key=value pairs themselves, so key2=val1=val3 is 
actually a valid specification.  I added a comment to clarify this, and included 
a test case.


---
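
A small standalone illustration of why {{split("=", 2)}} is used above: the 
limit of 2 makes only the first '=' separate key and value, so values may 
themselves contain '=':

{code:java}
public class SplitDemo {
    public static void main(String[] args) {
        String spec = "key2=val1=val3";
        String[] kv = spec.split("=", 2);
        System.out.println(kv[0]); // prints: key2
        System.out.println(kv[1]); // prints: val1=val3
    }
}
{code}
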


[jira] [Commented] (FLINK-8490) Allow custom docker parameters for docker tasks on Mesos

2018-01-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8490?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16337834#comment-16337834
 ] 

ASF GitHub Bot commented on FLINK-8490:
---

Github user lishim commented on a diff in the pull request:

https://github.com/apache/flink/pull/5346#discussion_r163595893
  
--- Diff: docs/ops/deployment/mesos.md ---
@@ -264,6 +264,8 @@ May be set to -1 to disable this feature.
 
 `mesos.resourcemanager.tasks.container.volumes`: A comma separated list of 
`[host_path:]`container_path`[:RO|RW]`. This allows for mounting additional 
volumes into your container. (**NO DEFAULT**)
 
+`mesos.resourcemanager.tasks.container.docker.parameters`: Custom 
parameters to be passed into docker run command when using the docker 
containerizer.  Comma separated list of key=value pairs. (**NO DEFAULT**)
--- End diff --

Fixed


> Allow custom docker parameters for docker tasks on Mesos
> 
>
> Key: FLINK-8490
> URL: https://issues.apache.org/jira/browse/FLINK-8490
> Project: Flink
>  Issue Type: Improvement
>  Components: Mesos
>Reporter: Jörg Schad
>Priority: Major
>
> It would be great to pass custom parameters to Mesos when using the Docker 
> Containerizer.
> This could be similar to this spark example: 
> `spark.mesos.executor.docker.parameters privileged=true`
>  
> Originally brought up here: 
> https://stackoverflow.com/questions/48393153/passing-custom-parameters-to-docker-when-running-flink-on-mesos-marathon?noredirect=1#comment83777480_48393153



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8490) Allow custom docker parameters for docker tasks on Mesos

2018-01-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8490?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16337832#comment-16337832
 ] 

ASF GitHub Bot commented on FLINK-8490:
---

Github user lishim commented on a diff in the pull request:

https://github.com/apache/flink/pull/5346#discussion_r163595840
  
--- Diff: docs/ops/config.md ---
@@ -508,6 +508,8 @@ May be set to -1 to disable this feature.
 
 - `mesos.resourcemanager.tasks.container.volumes`: A comma separated list 
of `[host_path:]`container_path`[:RO|RW]`. This allows for mounting additional 
volumes into your container. (**NO DEFAULT**)
 
+- `mesos.resourcemanager.tasks.container.docker.parameters`: Custom 
parameters to be passed into docker run command when using the docker 
containerizer.  Comma separated list of key=value pairs. (**NO DEFAULT**)
--- End diff --

Fixed


> Allow custom docker parameters for docker tasks on Mesos
> 
>
> Key: FLINK-8490
> URL: https://issues.apache.org/jira/browse/FLINK-8490
> Project: Flink
>  Issue Type: Improvement
>  Components: Mesos
>Reporter: Jörg Schad
>Priority: Major
>
> It would be great to pass custom parameters to Mesos when using the Docker 
> Containerizer.
> This could be similar to this spark example: 
> `spark.mesos.executor.docker.parameters privileged=true`
>  
> Originally brought up here: 
> https://stackoverflow.com/questions/48393153/passing-custom-parameters-to-docker-when-running-flink-on-mesos-marathon?noredirect=1#comment83777480_48393153



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5346: [FLINK-8490] [mesos] Allow custom docker parameter...

2018-01-24 Thread lishim
Github user lishim commented on a diff in the pull request:

https://github.com/apache/flink/pull/5346#discussion_r163595840
  
--- Diff: docs/ops/config.md ---
@@ -508,6 +508,8 @@ May be set to -1 to disable this feature.
 
 - `mesos.resourcemanager.tasks.container.volumes`: A comma separated list 
of `[host_path:]`container_path`[:RO|RW]`. This allows for mounting additional 
volumes into your container. (**NO DEFAULT**)
 
+- `mesos.resourcemanager.tasks.container.docker.parameters`: Custom 
parameters to be passed into docker run command when using the docker 
containerizer.  Comma separated list of key=value pairs. (**NO DEFAULT**)
--- End diff --

Fixed


---


[GitHub] flink pull request #5346: [FLINK-8490] [mesos] Allow custom docker parameter...

2018-01-24 Thread lishim
Github user lishim commented on a diff in the pull request:

https://github.com/apache/flink/pull/5346#discussion_r163595893
  
--- Diff: docs/ops/deployment/mesos.md ---
@@ -264,6 +264,8 @@ May be set to -1 to disable this feature.
 
 `mesos.resourcemanager.tasks.container.volumes`: A comma separated list of 
`[host_path:]`container_path`[:RO|RW]`. This allows for mounting additional 
volumes into your container. (**NO DEFAULT**)
 
+`mesos.resourcemanager.tasks.container.docker.parameters`: Custom 
parameters to be passed into docker run command when using the docker 
containerizer.  Comma separated list of key=value pairs. (**NO DEFAULT**)
--- End diff --

Fixed


---


[jira] [Commented] (FLINK-8093) flink job fail because of kafka producer create fail of "javax.management.InstanceAlreadyExistsException"

2018-01-24 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8093?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16337794#comment-16337794
 ] 

Aljoscha Krettek commented on FLINK-8093:
-

So this seems more like a Kafka bug than a Flink bug. [~tzulitai] Do you think 
we should mitigate this by setting a client.id ourselves when we detect that 
the user configuration doesn't have one? I'm not sure how we could ensure a 
unique ID, though.

> flink job fail because of kafka producer create fail of 
> "javax.management.InstanceAlreadyExistsException"
> -
>
> Key: FLINK-8093
> URL: https://issues.apache.org/jira/browse/FLINK-8093
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.3.2
> Environment: flink 1.3.2, kafka 0.9.1
>Reporter: dongtingting
>Priority: Critical
>
> One TaskManager has multiple task slots. One task failed because creating a 
> KafkaProducer failed; the reason for the failure is 
> "javax.management.InstanceAlreadyExistsException: 
> kafka.producer:type=producer-metrics,client-id=producer-3". The detailed 
> trace is:
> 2017-11-04 19:41:23,281 INFO  org.apache.flink.runtime.taskmanager.Task   
>   - Source: Custom Source -> Filter -> Map -> Filter -> Sink: 
> dp_client_**_log (7/80) (99551f3f892232d7df5eb9060fa9940c) switched from 
> RUNNING to FAILED.
> org.apache.kafka.common.KafkaException: Failed to construct kafka producer
> at 
> org.apache.kafka.clients.producer.KafkaProducer.(KafkaProducer.java:321)
> at 
> org.apache.kafka.clients.producer.KafkaProducer.(KafkaProducer.java:181)
> at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.getKafkaProducer(FlinkKafkaProducerBase.java:202)
> at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.open(FlinkKafkaProducerBase.java:212)
> at 
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
> at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:111)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:375)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:252)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.kafka.common.KafkaException: Error registering mbean 
> kafka.producer:type=producer-metrics,client-id=producer-3
> at 
> org.apache.kafka.common.metrics.JmxReporter.reregister(JmxReporter.java:159)
> at 
> org.apache.kafka.common.metrics.JmxReporter.metricChange(JmxReporter.java:77)
> at 
> org.apache.kafka.common.metrics.Metrics.registerMetric(Metrics.java:288)
> at org.apache.kafka.common.metrics.Metrics.addMetric(Metrics.java:255)
> at org.apache.kafka.common.metrics.Metrics.addMetric(Metrics.java:239)
> at 
> org.apache.kafka.clients.producer.internals.RecordAccumulator.registerMetrics(RecordAccumulator.java:137)
> at 
> org.apache.kafka.clients.producer.internals.RecordAccumulator.(RecordAccumulator.java:111)
> at 
> org.apache.kafka.clients.producer.KafkaProducer.(KafkaProducer.java:261)
> ... 9 more
> Caused by: javax.management.InstanceAlreadyExistsException: 
> kafka.producer:type=producer-metrics,client-id=producer-3
> at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437)
> at 
> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898)
> at 
> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966)
> at 
> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900)
> at 
> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324)
> at 
> com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)
> at 
> org.apache.kafka.common.metrics.JmxReporter.reregister(JmxReporter.java:157)
> ... 16 more
> I suspect that tasks in different task slots of one TaskManager use different 
> classloaders, and the task id may be the same within one process. This leads 
> to KafkaProducer creation failing in one TaskManager.
> Has anybody encountered the same problem?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
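
A hedged sketch of the mitigation discussed above (not an actual Flink patch; 
the property handling and id format are invented for illustration):

{code:java}
import java.util.Properties;
import java.util.UUID;

public final class UniqueClientId {

    /**
     * Returns a copy of the user properties with a unique client.id set if the
     * user did not configure one, so the JMX MBean names registered by the
     * Kafka producer cannot collide within one JVM.
     */
    static Properties withUniqueClientId(Properties userProps, int subtaskIndex) {
        Properties props = new Properties();
        props.putAll(userProps);
        if (!props.containsKey("client.id")) {
            // a UUID keeps ids unique even across tasks in different slots
            props.setProperty("client.id",
                    "flink-producer-" + subtaskIndex + "-" + UUID.randomUUID());
        }
        return props;
    }
}
{code}
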


[jira] [Updated] (FLINK-7129) Support dynamically changing CEP patterns

2018-01-24 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7129?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-7129:

Summary: Support dynamically changing CEP patterns  (was: Dynamically 
changing patterns)

> Support dynamically changing CEP patterns
> -
>
> Key: FLINK-7129
> URL: https://issues.apache.org/jira/browse/FLINK-7129
> Project: Flink
>  Issue Type: New Feature
>  Components: CEP
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>
> An umbrella task for introducing a mechanism for injecting patterns through 
> coStream.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8407) Setting the parallelism after a partitioning operation should be forbidden

2018-01-24 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8407?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16337780#comment-16337780
 ] 

Aljoscha Krettek commented on FLINK-8407:
-

The culprit is actually this piece of code: 
https://github.com/apache/flink/blob/e8edbcafae6561423d0f8f321d2b64805f09357b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java#L409

Setting a connection type, i.e. a partitioning or whatnot, should not return a 
{{SingleOutputStreamOperator}}. In this case the check in {{DataStream.scala}} 
will also work properly.

> Setting the parallelism after a partitioning operation should be forbidden
> --
>
> Key: FLINK-8407
> URL: https://issues.apache.org/jira/browse/FLINK-8407
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Reporter: Xingcan Cui
>Assignee: Xingcan Cui
>Priority: Major
>
> Partitioning operations ({{shuffle}}, {{rescale}}, etc.) for a {{DataStream}} 
> create new {{DataStreams}}, which allow the users to set parallelisms for 
> them. However, the {{PartitionTransformations}} in these returned 
> {{DataStreams}} will only add virtual nodes, whose parallelism cannot be 
> specified, to the execution graph. We should forbid users to set the 
> parallelism after a partitioning operation since it won't actually take 
> effect. Also the corresponding documents should be updated.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
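
A minimal sketch of the distinction the ticket draws (written against the 
1.4-era DataStream API; behaviour as described above):

{code:java}
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class PartitioningParallelismDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> rescaled = env.fromElements("a", "b", "c")
                .rescale(); // partitioning only adds a virtual node: it has no parallelism of its own

        rescaled
                .map(String::toUpperCase)
                .setParallelism(2) // legitimate: applies to the map operator, not the partitioning
                .print();

        env.execute("partitioning parallelism demo");
    }
}
{code}
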


[jira] [Commented] (FLINK-8492) [FLINK-8492][table] Fix calc cost bug

2018-01-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8492?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16337710#comment-16337710
 ] 

ASF GitHub Bot commented on FLINK-8492:
---

Github user hequn8128 commented on the issue:

https://github.com/apache/flink/pull/5347
  
Hi @fhueske @twalthr,

To solve the problem, we need to make the `estimateRowCount` in 
`CommonCalc` more accurate. I will update the PR tomorrow. Anyway, the cost 
model can't solve the problem deterministically. The cost is just an estimate, 
so multiple calcs will still exist under some circumstances.

As for correlate, to make sure the unsupported-operation exception won't be 
thrown, I will double check whether multiple calcs exist, and merge the calcs 
if needed.

What do you think? Thanks, Hequn.


> [FLINK-8492][table] Fix calc cost bug
> -
>
> Key: FLINK-8492
> URL: https://issues.apache.org/jira/browse/FLINK-8492
> Project: Flink
>  Issue Type: Bug
>  Components: Table API  SQL
>Reporter: Hequn Cheng
>Assignee: Hequn Cheng
>Priority: Major
>
> Considering the following test, an unsupported-operation exception will be 
> thrown due to multiple calcs existing between Correlate and TableFunctionScan.
> {code:java}
> // code placeholder
> @Test
> def testCrossJoinWithMultiFilter(): Unit = {
>   val t = testData(env).toTable(tEnv).as('a, 'b, 'c)
>   val func0 = new TableFunc0
>   val result = t
> .join(func0('c) as('d, 'e))
> .select('c, 'd, 'e)
> .where('e > 10)
> .where('e > 20)
> .select('c, 'd)
> .toAppendStream[Row]
>   result.addSink(new StreamITCase.StringSink[Row])
>   env.execute()
>   val expected = mutable.MutableList("Jack#22,Jack,22", "Anna#44,Anna,44")
>   assertEquals(expected.sorted, StreamITCase.testResults.sorted)
> }
> {code}
> I can see two options to fix this problem:
>  # Adapt the Calcite optimization rule to merge the consecutive calcs.
>  # Merge multiple calcs in the correlate conversion rule.
> I prefer the second one: not only is it easy to implement, but I also think 
> that the presence or absence of an optimization rule should not influence 
> Flink's functionality.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5347: [FLINK-8492][table] Fix calc cost bug

2018-01-24 Thread hequn8128
Github user hequn8128 commented on the issue:

https://github.com/apache/flink/pull/5347
  
Hi @fhueske @twalthr,

To solve the problem, we need to make the `estimateRowCount` in 
`CommonCalc` more accurate. I will update the PR tomorrow. Anyway, the cost 
model can't solve the problem deterministically. The cost is just an estimate, 
so multiple calcs will still exist under some circumstances.

As for correlate, to make sure the unsupported-operation exception won't be 
thrown, I will double check whether multiple calcs exist, and merge the calcs 
if needed.

What do you think? Thanks, Hequn.


---


[GitHub] flink pull request #5309: [FLINK-8450] [flip6] Make JobMaster/DispatcherGate...

2018-01-24 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5309#discussion_r163571196
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCacheTest.java
 ---
@@ -228,10 +246,10 @@ public void testConcurrentAccess() throws Exception {
Collection<AccessExecutionGraph> allExecutionGraphs = allExecutionGraphFutures.get();
--- End diff --

Doesn't need to be changed but `ExecutorCompletionService` does something 
similar.


---
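
For reference, a standalone illustration of the class mentioned above: an 
{{ExecutorCompletionService}} collects results of concurrently submitted tasks 
in completion order, similar to what the test assembles by hand:

{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CompletionServiceDemo {
    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        CompletionService<Integer> completionService = new ExecutorCompletionService<>(executor);

        for (int i = 0; i < 8; i++) {
            final int n = i;
            completionService.submit(() -> n * n); // submit concurrent work
        }

        List<Integer> results = new ArrayList<>();
        for (int i = 0; i < 8; i++) {
            results.add(completionService.take().get()); // next completed result
        }

        executor.shutdown();
        System.out.println(results);
    }
}
{code}
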


[jira] [Commented] (FLINK-8450) Make JobMaster/DispatcherGateway#requestJob type safe

2018-01-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8450?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16337706#comment-16337706
 ] 

ASF GitHub Bot commented on FLINK-8450:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5309#discussion_r163571196
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCacheTest.java
 ---
@@ -228,10 +246,10 @@ public void testConcurrentAccess() throws Exception {
Collection<AccessExecutionGraph> allExecutionGraphs = allExecutionGraphFutures.get();
--- End diff --

Doesn't need to be changed but `ExecutorCompletionService` does something 
similar.


> Make JobMaster/DispatcherGateway#requestJob type safe
> -
>
> Key: FLINK-8450
> URL: https://issues.apache.org/jira/browse/FLINK-8450
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> Currently, the {{RestfulGateway#requestJob}} returns a 
> {{CompletableFuture}}. Since {{AccessExecutionGraph}} 
> is non serializable it could fail if we execute this RPC from a remote 
> system. In order to make it typesafe we should change its signature to 
> {{SerializableExecutionGraph}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8450) Make JobMaster/DispatcherGateway#requestJob type safe

2018-01-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8450?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16337703#comment-16337703
 ] 

ASF GitHub Bot commented on FLINK-8450:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5309#discussion_r163546199
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCacheTest.java
 ---
@@ -30,62 +30,69 @@
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobmanager.JobManager;
-import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
+import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.TestingRestfulGateway;
 import org.apache.flink.util.ExecutorUtils;
 import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.TestLogger;
 
+import org.hamcrest.Matchers;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Function;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
 
 /**
  * Tests for the {@link ExecutionGraphCache}.
  */
 public class ExecutionGraphCacheTest extends TestLogger {
 
+   private static ArchivedExecutionGraph expectedExecutionGraph;
+   private static final JobID expectedJobId = new JobID();
+
+   @BeforeClass
+   public static void setup() {
+       expectedExecutionGraph = new ArchivedExecutionGraphBuilder().build();
+   }
+
    /**
     * Tests that we can cache AccessExecutionGraphs over multiple accesses.
     */
    @Test
    public void testExecutionGraphCaching() throws Exception {
        final Time timeout = Time.milliseconds(100L);
        final Time timeToLive = Time.hours(1L);
-       final JobID jobId = new JobID();
-       final AccessExecutionGraph accessExecutionGraph = mock(AccessExecutionGraph.class);
 
-       final JobManagerGateway jobManagerGateway = mock(JobManagerGateway.class);
-       when(jobManagerGateway.requestJob(eq(jobId), any(Time.class))).thenReturn(CompletableFuture.completedFuture(accessExecutionGraph));
+       final CountingRestfulGateway restfulGateway = createCountingRestfulGateway(expectedJobId, CompletableFuture.completedFuture(expectedExecutionGraph));
 
        try (ExecutionGraphCache executionGraphCache = new ExecutionGraphCache(timeout, timeToLive)) {
-           CompletableFuture<AccessExecutionGraph> accessExecutionGraphFuture = executionGraphCache.getExecutionGraph(jobId, jobManagerGateway);
+           CompletableFuture<AccessExecutionGraph> accessExecutionGraphFuture = executionGraphCache.getExecutionGraph(expectedJobId, restfulGateway);
 
-           assertEquals(accessExecutionGraph, accessExecutionGraphFuture.get());
+           assertEquals(expectedExecutionGraph, accessExecutionGraphFuture.get());
 
-           CompletableFuture<AccessExecutionGraph> accessExecutionGraphFuture2 = executionGraphCache.getExecutionGraph(jobId, jobManagerGateway);
+           CompletableFuture<AccessExecutionGraph> accessExecutionGraphFuture2 = executionGraphCache.getExecutionGraph(expectedJobId, restfulGateway);
 
-           assertEquals(accessExecutionGraph, accessExecutionGraphFuture2.get());
+           assertEquals(expectedExecutionGraph, accessExecutionGraphFuture2.get());
--- End diff --

You can reassign `accessExecutionGraphFuture` to avoid the `2` suffix. I don't 
think the `2` adds anything to readability.
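
A sketch of the suggested reassignment, reusing the names from the diff above 
(the future's generic parameter is assumed, since the archive stripped it):

```
CompletableFuture<AccessExecutionGraph> accessExecutionGraphFuture =
    executionGraphCache.getExecutionGraph(expectedJobId, restfulGateway);
assertEquals(expectedExecutionGraph, accessExecutionGraphFuture.get());

// reuse the same variable for the second (cached) access instead of a `2` suffix
accessExecutionGraphFuture =
    executionGraphCache.getExecutionGraph(expectedJobId, restfulGateway);
assertEquals(expectedExecutionGraph, accessExecutionGraphFuture.get());
```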


> Make JobMaster/DispatcherGateway#requestJob type safe
> -
>
>   

[jira] [Commented] (FLINK-8450) Make JobMaster/DispatcherGateway#requestJob type safe

2018-01-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8450?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16337704#comment-16337704
 ] 

ASF GitHub Bot commented on FLINK-8450:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5309#discussion_r163549462
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java 
---
@@ -331,13 +331,13 @@ public void start() throws Exception {
}
 
@Override
-   public CompletableFuture requestJob(JobID jobId, 
Time timeout) {
+   public CompletableFuture requestJob(JobID 
jobId, Time timeout) {
--- End diff --

nit: The git commit message is wrong: 

*Let JobMasterGateway#requestJob and DispatcherGateway#requestJob return a 
CompletableFuture*


> Make JobMaster/DispatcherGateway#requestJob type safe
> -
>
> Key: FLINK-8450
> URL: https://issues.apache.org/jira/browse/FLINK-8450
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> Currently, the {{RestfulGateway#requestJob}} returns a 
> {{CompletableFuture<AccessExecutionGraph>}}. Since {{AccessExecutionGraph}} 
> is not serializable, it could fail if we execute this RPC from a remote 
> system. In order to make it type safe, we should change its signature to 
> {{SerializableExecutionGraph}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5309: [FLINK-8450] [flip6] Make JobMaster/DispatcherGate...

2018-01-24 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5309#discussion_r163570275
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCacheTest.java
 ---
@@ -199,12 +212,17 @@ public void testCacheEntryCleanup() throws Exception {
public void testConcurrentAccess() throws Exception {
final Time timeout = Time.milliseconds(100L);
final Time timeToLive = Time.hours(1L);
-   final JobID jobId = new JobID();
-
-   final AccessExecutionGraph accessExecutionGraph = 
mock(AccessExecutionGraph.class);
 
-   final JobManagerGateway jobManagerGateway = 
mock(JobManagerGateway.class);
-   when(jobManagerGateway.requestJob(eq(jobId), 
any(Time.class))).thenReturn(CompletableFuture.completedFuture(accessExecutionGraph));
+   final AtomicInteger requestJobCalls = new AtomicInteger(0);
--- End diff --

Wouldn't it be easier to make `CountingRestfulGateway` thread safe, and 
reuse it here?
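
For illustration, a hedged sketch of such a thread-safe counting gateway; the 
class shape is illustrative, the point is the `AtomicInteger`:

```
import java.util.concurrent.atomic.AtomicInteger;

class CountingRestfulGateway /* extends TestingRestfulGateway */ {
    // incrementAndGet() is atomic, so concurrent requestJob calls are counted safely
    private final AtomicInteger numRequestJobCalls = new AtomicInteger(0);

    void recordRequestJobCall() {
        numRequestJobCalls.incrementAndGet();
    }

    int getNumRequestJobCalls() {
        return numRequestJobCalls.get();
    }
}
```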


---


[jira] [Commented] (FLINK-8450) Make JobMaster/DispatcherGateway#requestJob type safe

2018-01-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8450?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16337705#comment-16337705
 ] 

ASF GitHub Bot commented on FLINK-8450:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5309#discussion_r163570275
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCacheTest.java
 ---
@@ -199,12 +212,17 @@ public void testCacheEntryCleanup() throws Exception {
public void testConcurrentAccess() throws Exception {
final Time timeout = Time.milliseconds(100L);
final Time timeToLive = Time.hours(1L);
-   final JobID jobId = new JobID();
-
-   final AccessExecutionGraph accessExecutionGraph = 
mock(AccessExecutionGraph.class);
 
-   final JobManagerGateway jobManagerGateway = 
mock(JobManagerGateway.class);
-   when(jobManagerGateway.requestJob(eq(jobId), 
any(Time.class))).thenReturn(CompletableFuture.completedFuture(accessExecutionGraph));
+   final AtomicInteger requestJobCalls = new AtomicInteger(0);
--- End diff --

Wouldn't it be easier to make `CountingRestfulGateway` thread safe, and 
reuse it here?


> Make JobMaster/DispatcherGateway#requestJob type safe
> -
>
> Key: FLINK-8450
> URL: https://issues.apache.org/jira/browse/FLINK-8450
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> Currently, the {{RestfulGateway#requestJob}} returns a 
> {{CompletableFuture<AccessExecutionGraph>}}. Since {{AccessExecutionGraph}} 
> is not serializable, it could fail if we execute this RPC from a remote 
> system. In order to make it type safe, we should change its signature to 
> {{SerializableExecutionGraph}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5309: [FLINK-8450] [flip6] Make JobMaster/DispatcherGate...

2018-01-24 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5309#discussion_r163546199
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCacheTest.java
 ---
@@ -30,62 +30,69 @@
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobmanager.JobManager;
-import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
+import 
org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.TestingRestfulGateway;
 import org.apache.flink.util.ExecutorUtils;
 import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.TestLogger;
 
+import org.hamcrest.Matchers;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Function;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
 
 /**
  * Tests for the {@link ExecutionGraphCache}.
  */
 public class ExecutionGraphCacheTest extends TestLogger {
 
+   private static ArchivedExecutionGraph expectedExecutionGraph;
+   private static final JobID expectedJobId = new JobID();
+
+   @BeforeClass
+   public static void setup() {
+   expectedExecutionGraph = new 
ArchivedExecutionGraphBuilder().build();
+   }
+
/**
 * Tests that we can cache AccessExecutionGraphs over multiple accesses.
 */
@Test
public void testExecutionGraphCaching() throws Exception {
final Time timeout = Time.milliseconds(100L);
final Time timeToLive = Time.hours(1L);
-   final JobID jobId = new JobID();
-   final AccessExecutionGraph accessExecutionGraph = 
mock(AccessExecutionGraph.class);
 
-   final JobManagerGateway jobManagerGateway = 
mock(JobManagerGateway.class);
-   when(jobManagerGateway.requestJob(eq(jobId), 
any(Time.class))).thenReturn(CompletableFuture.completedFuture(accessExecutionGraph));
+   final CountingRestfulGateway restfulGateway = 
createCountingRestfulGateway(expectedJobId, 
CompletableFuture.completedFuture(expectedExecutionGraph));
 
try (ExecutionGraphCache executionGraphCache = new 
ExecutionGraphCache(timeout, timeToLive)) {
-   CompletableFuture 
accessExecutionGraphFuture = executionGraphCache.getExecutionGraph(jobId, 
jobManagerGateway);
+   CompletableFuture 
accessExecutionGraphFuture = 
executionGraphCache.getExecutionGraph(expectedJobId, restfulGateway);
 
-   assertEquals(accessExecutionGraph, 
accessExecutionGraphFuture.get());
+   assertEquals(expectedExecutionGraph, 
accessExecutionGraphFuture.get());
 
-   CompletableFuture 
accessExecutionGraphFuture2 = executionGraphCache.getExecutionGraph(jobId, 
jobManagerGateway);
+   CompletableFuture 
accessExecutionGraphFuture2 = 
executionGraphCache.getExecutionGraph(expectedJobId, restfulGateway);
 
-   assertEquals(accessExecutionGraph, 
accessExecutionGraphFuture2.get());
+   assertEquals(expectedExecutionGraph, 
accessExecutionGraphFuture2.get());
--- End diff --

You can reassign `accessExecutionGraphFuture` to avoid the `2` suffix. I don't 
think the `2` adds anything to readability.


---


[GitHub] flink pull request #5309: [FLINK-8450] [flip6] Make JobMaster/DispatcherGate...

2018-01-24 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5309#discussion_r163549462
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java 
---
@@ -331,13 +331,13 @@ public void start() throws Exception {
}
 
@Override
-   public CompletableFuture requestJob(JobID jobId, 
Time timeout) {
+   public CompletableFuture requestJob(JobID 
jobId, Time timeout) {
--- End diff --

nit: The git commit message is wrong: 

*Let JobMasterGateway#requestJob and DispatcherGateway#requestJob return a 
CompletableFuture*


---


[jira] [Created] (FLINK-8504) TaskExecutor does not properly deregister JobManager from JobLeaderService

2018-01-24 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-8504:


 Summary: TaskExecutor does not properly deregister JobManager 
from JobLeaderService
 Key: FLINK-8504
 URL: https://issues.apache.org/jira/browse/FLINK-8504
 Project: Flink
  Issue Type: Bug
  Components: Distributed Coordination
Affects Versions: 1.5.0
Reporter: Till Rohrmann
 Fix For: 1.5.0


The {{TaskExecutor}} should deregister jobs from the {{JobLeaderService}} once 
it no longer holds slots for the job. The problem is that before unregistering 
the job from the {{JobLeaderService}} in {{TaskExecutor#freeInternal}}, the 
actual slot is freed, which also removes the {{JobID}} from the slot. Therefore, 
we lose the information about which job the slot belonged to. An easy solution 
would be to return a {{SlotInformation}} object instead of the {{TaskSlot}} from 
{{TaskSlotTable#freeSlot}}, which contains the respective information.
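
A hypothetical sketch of such a {{SlotInformation}} value object; the class and 
its fields follow the issue text and are not an existing Flink API:

{code:java}
import org.apache.flink.api.common.JobID;

// Hypothetical: carries what is still needed after the slot itself is freed,
// in particular the job the slot belonged to.
final class SlotInformation {
	private final JobID jobId;

	SlotInformation(JobID jobId) {
		this.jobId = jobId;
	}

	JobID getJobId() {
		return jobId;
	}
}
{code}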



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8449) Extend OnCompletionActions to receive AccessExecutionGraph

2018-01-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8449?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16337672#comment-16337672
 ] 

ASF GitHub Bot commented on FLINK-8449:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5308#discussion_r163544619
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java
 ---
@@ -250,19 +255,15 @@ public void testRegainLeadership() throws Exception {
private volatile boolean finishedByOther;
 
@Override
-   public void jobFinished(JobResult result) {
+   public void 
jobReachedGloballyTerminalState(ArchivedExecutionGraph executionGraph) {
checkArgument(!isJobFinished(), "job finished already");
checkArgument(!isJobFailed(), "job failed already");
 
-   this.result = result;
-   }
-
-   @Override
-   public void jobFailed(JobResult result) {
-   checkArgument(!isJobFinished(), "job finished already");
-   checkArgument(!isJobFailed(), "job failed already");
+   this.result = JobResult.createFrom(executionGraph);
 
-   this.failedCause = 
result.getSerializedThrowable().get();
+   if (!result.isSuccess()) {
--- End diff --

Maybe 
```
result.getSerializedThrowable()
.ifPresent(serializedThrowable -> failedCause = 
serializedThrowable);
```
to avoid IntelliJ's inspection warning.


> Extend OnCompletionActions to receive AccessExecutionGraph
> --
>
> Key: FLINK-8449
> URL: https://issues.apache.org/jira/browse/FLINK-8449
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> The {{OnCompletionAction}} currently only receives the {{JobResult}} when the 
> job terminates. For further archiving of the {{ArchivedExecutionGraph}}, we 
> should change this to {{AccessExecutionGraph}}. The {{AccessExecutionGraph}} 
> contains all the information to derive the {{JobResult}} and additionally the 
> information needed for serving information about completed jobs.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8449) Extend OnCompletionActions to receive AccessExecutionGraph

2018-01-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8449?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16337670#comment-16337670
 ] 

ASF GitHub Bot commented on FLINK-8449:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5308#discussion_r163540049
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 ---
@@ -1701,40 +1705,4 @@ void notifyExecutionChange(
}
}
}
-
-   @Override
-   public ArchivedExecutionGraph archive() {
--- End diff --

Why is it better to move this logic to `ArchivedExecutionGraph`?


> Extend OnCompletionActions to receive AccessExecutionGraph
> --
>
> Key: FLINK-8449
> URL: https://issues.apache.org/jira/browse/FLINK-8449
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> The {{OnCompletionAction}} currently only receives the {{JobResult}} when the 
> job terminates. For further archiving of the {{ArchivedExecutionGraph}}, we 
> should change this to {{AccessExecutionGraph}}. The {{AccessExecutionGraph}} 
> contains all the information to derive the {{JobResult}} and additionally the 
> information needed for serving information about completed jobs.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8449) Extend OnCompletionActions to receive AccessExecutionGraph

2018-01-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8449?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16337669#comment-16337669
 ] 

ASF GitHub Bot commented on FLINK-8449:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5308#discussion_r163536073
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/accumulators/AccumulatorHelper.java
 ---
@@ -173,4 +173,23 @@ public static String getResultsFormatted(Map map) {
return accumulators;
}
 
+   /**
+* Serializes the given accumulators.
+*
+* @param accumulators to serialize
+* @return Map of serialized accumulators
+* @throws IOException if an accumulator could not be serialized
+*/
+   public static Map 
serializeAccumulators(Map accumulators) throws 
IOException {
--- End diff --

I think you are not using this method anywhere.


> Extend OnCompletionActions to receive AccessExecutionGraph
> --
>
> Key: FLINK-8449
> URL: https://issues.apache.org/jira/browse/FLINK-8449
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> The {{OnCompletionAction}} currently only receives the {{JobResult}} when the 
> job terminates. For further archiving of the {{ArchivedExecutionGraph}}, we 
> should change this to {{AccessExecutionGraph}}. The {{AccessExecutionGraph}} 
> contains all the information to derive the {{JobResult}} and additionally the 
> information needed for serving information about completed jobs.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8449) Extend OnCompletionActions to receive AccessExecutionGraph

2018-01-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8449?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16337671#comment-16337671
 ] 

ASF GitHub Bot commented on FLINK-8449:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5308#discussion_r163538231
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 ---
@@ -752,16 +751,21 @@ public Executor getFutureExecutor() {
/**
 * Gets a serialized accumulator map.
 * @return The accumulator map with serialized accumulator values.
-* @throws IOException
 */
@Override
-   public Map getAccumulatorsSerialized() 
throws IOException {
+   public Map getAccumulatorsSerialized() 
{
 
Map accumulatorMap = 
aggregateUserAccumulators();
 
Map result = new 
HashMap<>(accumulatorMap.size());
for (Map.Entry entry : 
accumulatorMap.entrySet()) {
-   result.put(entry.getKey(), new 
SerializedValue<>(entry.getValue().getLocalValue()));
+
+   try {
+   final SerializedValue serializedValue = 
new SerializedValue<>(entry.getValue().getLocalValue());
+   result.put(entry.getKey(), serializedValue);
+   } catch (IOException ioe) {
+   LOG.info("Could not serialize accumulator " + 
entry.getKey() + '.', ioe);
--- End diff --

Why is it acceptable to change the behavior, i.e., to ignore the exception? 
It is not even logged at `ERROR` level.
Also:
```
LOG.info("Could not serialize accumulator {}.", entry.getKey(), ioe);
```


> Extend OnCompletionActions to receive AccessExecutionGraph
> --
>
> Key: FLINK-8449
> URL: https://issues.apache.org/jira/browse/FLINK-8449
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> The {{OnCompletionAction}} currently only receives the {{JobResult}} when the 
> job terminates. For further archiving of the {{ArchivedExecutionGraph}}, we 
> should change this to {{AccessExecutionGraph}}. The {{AccessExecutionGraph}} 
> contains all the information to derive the {{JobResult}} and additionally the 
> information needed for serving information about completed jobs.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5308: [FLINK-8449] [flip6] Extend OnCompletionActions to...

2018-01-24 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5308#discussion_r163538231
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 ---
@@ -752,16 +751,21 @@ public Executor getFutureExecutor() {
/**
 * Gets a serialized accumulator map.
 * @return The accumulator map with serialized accumulator values.
-* @throws IOException
 */
@Override
-   public Map getAccumulatorsSerialized() 
throws IOException {
+   public Map getAccumulatorsSerialized() 
{
 
Map accumulatorMap = 
aggregateUserAccumulators();
 
Map result = new 
HashMap<>(accumulatorMap.size());
for (Map.Entry entry : 
accumulatorMap.entrySet()) {
-   result.put(entry.getKey(), new 
SerializedValue<>(entry.getValue().getLocalValue()));
+
+   try {
+   final SerializedValue serializedValue = 
new SerializedValue<>(entry.getValue().getLocalValue());
+   result.put(entry.getKey(), serializedValue);
+   } catch (IOException ioe) {
+   LOG.info("Could not serialize accumulator " + 
entry.getKey() + '.', ioe);
--- End diff --

Why is it acceptable to change the behavior, i.e., to ignore the exception? 
It is not even logged at `ERROR` level.
Also:
```
LOG.info("Could not serialize accumulator {}.", entry.getKey(), ioe);
```


---


[GitHub] flink pull request #5308: [FLINK-8449] [flip6] Extend OnCompletionActions to...

2018-01-24 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5308#discussion_r163536073
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/accumulators/AccumulatorHelper.java
 ---
@@ -173,4 +173,23 @@ public static String getResultsFormatted(Map map) {
return accumulators;
}
 
+   /**
+* Serializes the given accumulators.
+*
+* @param accumulators to serialize
+* @return Map of serialized accumulators
+* @throws IOException if an accumulator could not be serialized
+*/
+   public static Map 
serializeAccumulators(Map accumulators) throws 
IOException {
--- End diff --

I think you are not using this method anywhere.


---


[GitHub] flink pull request #5308: [FLINK-8449] [flip6] Extend OnCompletionActions to...

2018-01-24 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5308#discussion_r163540049
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 ---
@@ -1701,40 +1705,4 @@ void notifyExecutionChange(
}
}
}
-
-   @Override
-   public ArchivedExecutionGraph archive() {
--- End diff --

Why is it better to move this logic to `ArchivedExecutionGraph`?


---


[GitHub] flink pull request #5308: [FLINK-8449] [flip6] Extend OnCompletionActions to...

2018-01-24 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5308#discussion_r163544619
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java
 ---
@@ -250,19 +255,15 @@ public void testRegainLeadership() throws Exception {
private volatile boolean finishedByOther;
 
@Override
-   public void jobFinished(JobResult result) {
+   public void 
jobReachedGloballyTerminalState(ArchivedExecutionGraph executionGraph) {
checkArgument(!isJobFinished(), "job finished already");
checkArgument(!isJobFailed(), "job failed already");
 
-   this.result = result;
-   }
-
-   @Override
-   public void jobFailed(JobResult result) {
-   checkArgument(!isJobFinished(), "job finished already");
-   checkArgument(!isJobFailed(), "job failed already");
+   this.result = JobResult.createFrom(executionGraph);
 
-   this.failedCause = 
result.getSerializedThrowable().get();
+   if (!result.isSuccess()) {
--- End diff --

Maybe 
```
result.getSerializedThrowable()
.ifPresent(serializedThrowable -> failedCause = 
serializedThrowable);
```
to avoid IntelliJ's inspection warning.


---


[jira] [Created] (FLINK-8503) Port TaskManagerLogHandler to new REST endpoint

2018-01-24 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-8503:


 Summary: Port TaskManagerLogHandler to new REST endpoint
 Key: FLINK-8503
 URL: https://issues.apache.org/jira/browse/FLINK-8503
 Project: Flink
  Issue Type: Sub-task
  Components: REST
Affects Versions: 1.5.0
Reporter: Till Rohrmann
Assignee: Till Rohrmann
 Fix For: 1.5.0


In order to serve {{TaskExecutor}} log and stdout files, we have to port the 
{{TaskManagerLogHandler}} to the new REST endpoint.

In order to properly support serving of files, I propose to introduce an 
{{AbstractHandler}} which takes a typed request but has an untyped response. 
That way we can easily output the file contents.
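
A rough sketch of that handler shape (signatures are illustrative, not the 
eventual Flink API): the request stays typed while the response is written 
straight to the Netty channel, so file contents can be streamed.

{code:java}
import io.netty.channel.ChannelHandlerContext;

// Illustrative: typed request in, untyped (raw channel) response out.
abstract class AbstractHandler<REQ> {

	// Subclasses stream their payload (e.g. a log file) directly to the
	// channel instead of returning a typed response object.
	protected abstract void respondToRequest(ChannelHandlerContext ctx, REQ request) throws Exception;
}
{code}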



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8146) Potential resource leak in PythonPlanBinder#unzipPythonLibrary

2018-01-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8146?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16337621#comment-16337621
 ] 

ASF GitHub Bot commented on FLINK-8146:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5349
  
Looks good to me, +1


> Potential resource leak in PythonPlanBinder#unzipPythonLibrary
> --
>
> Key: FLINK-8146
> URL: https://issues.apache.org/jira/browse/FLINK-8146
> Project: Flink
>  Issue Type: Bug
>  Components: Python API
>Affects Versions: 1.4.0, 1.5.0
>Reporter: Ted Yu
>Assignee: Chesnay Schepler
>Priority: Major
> Fix For: 1.5.0, 1.4.1
>
>
> {code}
> while (entry != null) {
> ...
> }
> zis.closeEntry();
> {code}
> Looking at the catch block inside the loop, it seems the intention is to 
> close zis upon getting an exception.
> zis.close() should be called outside the loop.
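
A minimal sketch of that fix direction, letting try-with-resources close the 
stream exactly once ({{zipPath}} and the extraction step are placeholders):

{code:java}
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;

static void unzip(Path zipPath) throws IOException {
	try (ZipInputStream zis = new ZipInputStream(Files.newInputStream(zipPath))) {
		ZipEntry entry = zis.getNextEntry();
		while (entry != null) {
			// ... extract the entry ...
			zis.closeEntry();
			entry = zis.getNextEntry();
		}
	} // zis is closed here, even if extraction throws
}
{code}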



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8466) ErrorInfo needs to hold Exception as SerializedThrowable

2018-01-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8466?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16337618#comment-16337618
 ] 

ASF GitHub Bot commented on FLINK-8466:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5348
  
Thanks for the review.

Merging this to `master` and `release-1.4`.


> ErrorInfo needs to hold Exception as SerializedThrowable
> 
>
> Key: FLINK-8466
> URL: https://issues.apache.org/jira/browse/FLINK-8466
> Project: Flink
>  Issue Type: Bug
>  Components: Local Runtime
>Affects Versions: 1.4.0
>Reporter: Jelmer Kuperus
>Assignee: Stephan Ewen
>Priority: Blocker
> Fix For: 1.5.0, 1.4.1
>
> Attachments: Screen Shot 2018-01-20 at 1.48.33 PM.png
>
>
> MemoryArchivist holding on to the last thrown exception prevents the user code 
> classloader from being garbage collected.
> MemoryArchivist holds on to any number of ArchivedExecutionGraphs. These 
> ArchivedExecutionGraph instances contain a failureCause field of type 
> ErrorInfo that wraps the exception in case one was thrown to terminate the job.
> This exception class will more often than not have been loaded by a user code 
> classloader, and as long as the MemoryArchivist holds on to this exception, 
> the JVM won't be able to reclaim the resources held by this classloader.
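
A small sketch of the fix idea: {{SerializedThrowable}} is an existing Flink 
utility that keeps the message, stack trace and a serialized form without 
referencing the user-code class; the helper method itself is illustrative.

{code:java}
import org.apache.flink.util.SerializedThrowable;

// Store this instead of the raw exception: it only references JDK and Flink
// classes, so the user-code classloader can be garbage collected.
static SerializedThrowable detachFromUserClassLoader(Throwable t) {
	return t instanceof SerializedThrowable
		? (SerializedThrowable) t
		: new SerializedThrowable(t);
}
{code}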



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5349: [FLINK-8146][py] Properly close ZipInputStream

2018-01-24 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5349
  
Looks good to me, +1


---


[GitHub] flink issue #5348: [FLINK-8466] [runtime] Make sure ErrorInfo references no ...

2018-01-24 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5348
  
Thanks for the review.

Merging this to `master` and `release-1.4`.


---


[jira] [Commented] (FLINK-6464) Metric name is not stable

2018-01-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6464?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16337608#comment-16337608
 ] 

ASF GitHub Bot commented on FLINK-6464:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/5332
  
@aljoscha Done, we now include the function name instead. It is a bit more 
complicated than I had hoped though...


> Metric name is not stable
> -
>
> Key: FLINK-6464
> URL: https://issues.apache.org/jira/browse/FLINK-6464
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API, Metrics
>Affects Versions: 1.2.0
>Reporter: Andrey
>Assignee: Chesnay Schepler
>Priority: Critical
> Fix For: 1.5.0, 1.4.1
>
>
> Currently according to the documentation 
> (https://ci.apache.org/projects/flink/flink-docs-release-1.2/monitoring/metrics.html)
>  operator metrics are constructed using the following pattern:
> , 
> For some operators, "operator_name" could contain the default implementation 
> of the toString method. For example:
> {code}
> TriggerWindow(TumblingProcessingTimeWindows(3000), 
> ListStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer@c65792d4},
>  xxx.Trigger@665fe457, WindowedStream.apply(WindowedStream.java:521)) -> 
> Sink: Unnamed
> {code}
> The part "@c65792d4" will be changed every time the job is restarted/cancelled. 
> As a consequence it's not possible to store metrics for a long time.
> Expected:
> * ensure all operators return human readable, non-default names, OR
> * change the way TriggerWindow generates its name.
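
Independent of the fix here, a user-side workaround is to name the operator 
explicitly so the metric scope does not depend on {{toString()}}; a sketch 
({{MyWindowFunction}} and the surrounding stream are placeholders):

{code:java}
stream
	.keyBy(value -> value)
	.window(TumblingProcessingTimeWindows.of(Time.seconds(3)))
	.apply(new MyWindowFunction())
	.name("my-tumbling-window"); // stable across restarts
{code}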



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5332: [FLINK-6464][streaming] Stabilize default window operator...

2018-01-24 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/5332
  
@aljoscha Done, we now include the function name instead. It is a bit more 
complicated than I had hoped though...


---


[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)

2018-01-24 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16337598#comment-16337598
 ] 

Aljoscha Krettek commented on FLINK-8500:
-

The timestamp of Kafka messages is attached to elements. You can, for example, 
access that using {{Context.timestamp()}} when using a {{ProcessFunction}}.

Is that good enough for your purposes?
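
For illustration, a minimal {{ProcessFunction}} that reads the attached 
timestamp; the class name and output format are placeholders.

{code:java}
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

class TimestampEcho extends ProcessFunction<String, String> {
	@Override
	public void processElement(String value, Context ctx, Collector<String> out) {
		// may be null if no timestamp is attached to the element
		Long timestamp = ctx.timestamp();
		out.collect(value + " @ " + timestamp);
	}
}
{code}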

> Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
> ---
>
> Key: FLINK-8500
> URL: https://issues.apache.org/jira/browse/FLINK-8500
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Affects Versions: 1.4.0
>Reporter: yanxiaobin
>Priority: Major
>
> The method deserialize of KeyedDeserializationSchema needs a parameter 
> 'kafka message timestamp' (from ConsumerRecord). In some business scenarios, 
> this is useful!
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8498) GetExecutionPlan fails with IllegalArgumentException in Comparator

2018-01-24 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8498?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-8498:

Priority: Blocker  (was: Major)

> GetExecutionPlan fails with IllegalArgumentException in Comparator
> --
>
> Key: FLINK-8498
> URL: https://issues.apache.org/jira/browse/FLINK-8498
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Affects Versions: 1.3.2
>Reporter: Julian Bauß
>Priority: Blocker
> Fix For: 1.5.0, 1.4.1
>
>
> Hello everybody,
> we're currently encountering an exception while generating an ExecutionGraph 
> JSON in Flink v1.3.2.
> Actually executing the job does not cause an exception and everything works 
> as intended.
>  
> This happens since we started adding side-outputs to many of our operators.
>  
> Below is the stacktrace. The problem seems to be a contract violation in the 
> comparator implementation.
>  
> {code:java}
> org.apache.flink.client.program.ProgramInvocationException: The main method 
> caused an error.
>     at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:545)
>     at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:419)
>     at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:381)
>     at 
> org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:838)
>     at org.apache.flink.client.CliFrontend.run(CliFrontend.java:259)
>     at 
> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1086)
>     at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1133)
>     at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1130)
>     at 
> org.apache.flink.runtime.security.HadoopSecurityContext$1.run(HadoopSecurityContext.java:43)
>     at java.security.AccessController.doPrivileged(Native Method)
>     at javax.security.auth.Subject.doAs(Subject.java:422)
>     at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1656)
>     at 
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:40)
>     at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1130)
> Caused by: java.lang.RuntimeException: JSON plan creation failed
>     at 
> org.apache.flink.streaming.api.graph.StreamGraph.getStreamingPlanAsJSON(StreamGraph.java:668)
>     at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getExecutionPlan(StreamExecutionEnvironment.java:1538)
>     at com.example.Main.main(Main.java:262)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:528)
>     ... 13 more
> Caused by: java.lang.IllegalArgumentException: Comparison method violates its 
> general contract!
>     at java.util.TimSort.mergeLo(TimSort.java:777)
>     at java.util.TimSort.mergeAt(TimSort.java:514)
>     at java.util.TimSort.mergeCollapse(TimSort.java:441)
>     at java.util.TimSort.sort(TimSort.java:245)
>     at java.util.Arrays.sort(Arrays.java:1512)
>     at java.util.ArrayList.sort(ArrayList.java:1454)
>     at java.util.Collections.sort(Collections.java:175)
>     at 
> org.apache.flink.streaming.api.graph.JSONGenerator.getJSON(JSONGenerator.java:60)
>     at 
> org.apache.flink.streaming.api.graph.StreamGraph.getStreamingPlanAsJSON(StreamGraph.java:665)
>     ... 21 more
> {code}
>  
>  
> Best Regards,
>  
> Julian



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8498) GetExecutionPlan fails with IllegalArgumentException in Comparator

2018-01-24 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8498?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-8498:

Fix Version/s: 1.4.1
   1.5.0

> GetExecutionPlan fails with IllegalArgumentException in Comparator
> --
>
> Key: FLINK-8498
> URL: https://issues.apache.org/jira/browse/FLINK-8498
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Affects Versions: 1.3.2
>Reporter: Julian Bauß
>Priority: Major
> Fix For: 1.5.0, 1.4.1
>
>
> Hello everybody,
> we're currently encountering an exception while generating an ExecutionGraph 
> JSON in Flink v1.3.2.
> Actually executing the job does not cause an exception and everything works 
> as intended.
>  
> This happens since we started adding side-outputs to many of our operators.
>  
> Below is the stacktrace. The problem seems to be a contract violation in the 
> comparator implementation.
>  
> {code:java}
> org.apache.flink.client.program.ProgramInvocationException: The main method 
> caused an error.
>     at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:545)
>     at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:419)
>     at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:381)
>     at 
> org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:838)
>     at org.apache.flink.client.CliFrontend.run(CliFrontend.java:259)
>     at 
> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1086)
>     at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1133)
>     at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1130)
>     at 
> org.apache.flink.runtime.security.HadoopSecurityContext$1.run(HadoopSecurityContext.java:43)
>     at java.security.AccessController.doPrivileged(Native Method)
>     at javax.security.auth.Subject.doAs(Subject.java:422)
>     at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1656)
>     at 
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:40)
>     at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1130)
> Caused by: java.lang.RuntimeException: JSON plan creation failed
>     at 
> org.apache.flink.streaming.api.graph.StreamGraph.getStreamingPlanAsJSON(StreamGraph.java:668)
>     at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getExecutionPlan(StreamExecutionEnvironment.java:1538)
>     at com.example.Main.main(Main.java:262)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:528)
>     ... 13 more
> Caused by: java.lang.IllegalArgumentException: Comparison method violates its 
> general contract!
>     at java.util.TimSort.mergeLo(TimSort.java:777)
>     at java.util.TimSort.mergeAt(TimSort.java:514)
>     at java.util.TimSort.mergeCollapse(TimSort.java:441)
>     at java.util.TimSort.sort(TimSort.java:245)
>     at java.util.Arrays.sort(Arrays.java:1512)
>     at java.util.ArrayList.sort(ArrayList.java:1454)
>     at java.util.Collections.sort(Collections.java:175)
>     at 
> org.apache.flink.streaming.api.graph.JSONGenerator.getJSON(JSONGenerator.java:60)
>     at 
> org.apache.flink.streaming.api.graph.StreamGraph.getStreamingPlanAsJSON(StreamGraph.java:665)
>     ... 21 more
> {code}
>  
>  
> Best Regards,
>  
> Julian



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8477) Add api to support for user to skip the first incomplete window data

2018-01-24 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16337579#comment-16337579
 ] 

Aljoscha Krettek commented on FLINK-8477:
-

Could you please provide some details about what this is about?

> Add api to support for user to skip the first incomplete window data
> 
>
> Key: FLINK-8477
> URL: https://issues.apache.org/jira/browse/FLINK-8477
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Affects Versions: 1.4.0, 1.3.2
>Reporter: aitozi
>Assignee: aitozi
>Priority: Major
> Fix For: 1.4.1
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5309: [FLINK-8450] [flip6] Make JobMaster/DispatcherGateway#req...

2018-01-24 Thread GJL
Github user GJL commented on the issue:

https://github.com/apache/flink/pull/5309
  
This PR is based on #5308 


---


[jira] [Commented] (FLINK-8450) Make JobMaster/DispatcherGateway#requestJob type safe

2018-01-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8450?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16337580#comment-16337580
 ] 

ASF GitHub Bot commented on FLINK-8450:
---

Github user GJL commented on the issue:

https://github.com/apache/flink/pull/5309
  
This PR is based on #5308 


> Make JobMaster/DispatcherGateway#requestJob type safe
> -
>
> Key: FLINK-8450
> URL: https://issues.apache.org/jira/browse/FLINK-8450
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> Currently, the {{RestfulGateway#requestJob}} returns a 
> {{CompletableFuture<AccessExecutionGraph>}}. Since {{AccessExecutionGraph}} 
> is not serializable, it could fail if we execute this RPC from a remote 
> system. In order to make it type safe, we should change its signature to 
> {{SerializableExecutionGraph}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8472) Extend migration tests for Flink 1.4

2018-01-24 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8472?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-8472:

Priority: Blocker  (was: Critical)

> Extend migration tests for Flink 1.4
> 
>
> Key: FLINK-8472
> URL: https://issues.apache.org/jira/browse/FLINK-8472
> Project: Flink
>  Issue Type: Test
>  Components: Tests
>Reporter: Tzu-Li (Gordon) Tai
>Priority: Blocker
> Fix For: 1.5.0, 1.4.1
>
>
> The following migration tests should be extended to cover migrating Flink 
> jobs with a 1.4 savepoint.
>  * {{WindowOperatorMigrationTest}}
>  * {{CEPMigrationTest}}
>  * {{StatefulJobSavepointMigrationTestITCase}}
>  * {{FlinkKinesisConsumerMigrationTest}}
>  * {{FlinkKafkaConsumerBaseMigrationTest}}
>  * {{ContinuousFileProcessingMigrationTest}}
>  * {{BucketingSinkMigrationTest}}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8470) DelayTrigger and DelayAndCountTrigger in Flink Streaming Window API

2018-01-24 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8470?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16337578#comment-16337578
 ] 

Aljoscha Krettek commented on FLINK-8470:
-

Hi,
what is the motivation for these types of Triggers?

> DelayTrigger and DelayAndCountTrigger in Flink Streaming Window API
> ---
>
> Key: FLINK-8470
> URL: https://issues.apache.org/jira/browse/FLINK-8470
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming
>Affects Versions: 2.0.0
>Reporter: Vijay Kansal
>Priority: Major
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> In the Flink streaming API, we do not have any built-in window triggers 
> available for the use cases below:
>  1. DelayTrigger: the window function should fire in case the first element 
> belonging to this window arrived more than maxDelay (ms) before the current 
> processing time.
> 2. DelayAndCountTrigger: the window function should fire in case the first 
> element belonging to this window arrived more than maxDelay (ms) before the 
> current processing time, or there are more than maxCount elements in the 
> window.
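
For illustration, a hedged sketch of what a DelayTrigger could look like (not a 
Flink-provided class; the per-element timer registration is a simplification, 
real code would track the first arrival in state):

{code:java}
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

class DelayTrigger extends Trigger<Object, TimeWindow> {
	private final long maxDelay;

	DelayTrigger(long maxDelay) {
		this.maxDelay = maxDelay;
	}

	@Override
	public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) {
		// the earliest of these timers fires maxDelay after the first element
		ctx.registerProcessingTimeTimer(ctx.getCurrentProcessingTime() + maxDelay);
		return TriggerResult.CONTINUE;
	}

	@Override
	public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
		return TriggerResult.FIRE;
	}

	@Override
	public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
		return TriggerResult.CONTINUE;
	}

	@Override
	public void clear(TimeWindow window, TriggerContext ctx) {
		// illustrative: real code would also delete its registered timers here
	}
}
{code}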



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8468) Allow RMQSink to be extended

2018-01-24 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16337576#comment-16337576
 ] 

Aljoscha Krettek commented on FLINK-8468:
-

Please open a PR for this change if you want. 

> Allow RMQSink to be extended 
> -
>
> Key: FLINK-8468
> URL: https://issues.apache.org/jira/browse/FLINK-8468
> Project: Flink
>  Issue Type: Improvement
>  Components: RabbitMQ Connector
>Affects Versions: 1.4.0
>Reporter: Ph.Duveau
>Priority: Major
>
> We extend RMQSink to derive a variable routing key from fields of the "value" 
> and also to have a configurable exchange to publish to. But we ran into 
> problems with the class attribute "logFailuresOnly" of RMQSink, which is 
> private (it should have protected scope).
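
A hedged sketch of the kind of subclass the reporter describes; it assumes 
RMQSink's channel and schema fields are visible to subclasses (which is the 
point of the issue), and Event with getRoutingKey() is a placeholder type.

{code:java}
class RoutingRMQSink extends RMQSink<Event> {
	private final String exchange;

	RoutingRMQSink(RMQConnectionConfig config, String exchange, SerializationSchema<Event> schema) {
		super(config, "", schema); // the queue name is unused when publishing to an exchange
		this.exchange = exchange;
	}

	@Override
	public void invoke(Event value) {
		try {
			byte[] msg = schema.serialize(value);
			// route per record instead of RMQSink's fixed queue
			channel.basicPublish(exchange, value.getRoutingKey(), null, msg);
		} catch (java.io.IOException e) {
			throw new RuntimeException("Could not publish " + value, e);
		}
	}
}
{code}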



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8468) Allow RMQSink to be extended

2018-01-24 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8468?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-8468:

Summary: Allow RMQSink to be extended   (was: Allow the sink to be extended 
)

> Allow RMQSink to be extended 
> -
>
> Key: FLINK-8468
> URL: https://issues.apache.org/jira/browse/FLINK-8468
> Project: Flink
>  Issue Type: Improvement
>  Components: RabbitMQ Connector
>Affects Versions: 1.4.0
>Reporter: Ph.Duveau
>Priority: Major
>
> We extend RMQSink to derive a variable routing key from fields of the "value" 
> and also to have a configurable exchange to publish to. But we ran into 
> problems with the class attribute "logFailuresOnly" of RMQSink, which is 
> private (it should have protected scope).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-8445) hostname used in metric names for taskmanager and jobmanager are not consistent

2018-01-24 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8445?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-8445.
---

> hostname used in metric names for taskmanager and jobmanager are not 
> consistent
> ---
>
> Key: FLINK-8445
> URL: https://issues.apache.org/jira/browse/FLINK-8445
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics
>Affects Versions: 1.3.1, 1.4.0
> Environment: I think that this problem is present for metrics 
> reporting enabled configurations that include '<host>' as part of the scope 
> for the metrics.  For example, using the Graphite reporting configuration in 
> flink-conf.yaml below:
> {code:java}
> metrics.scope.jm: flink.<host>.jobmanager
> metrics.scope.jm.job: flink.<host>.jobmanager.<job_name>
> metrics.scope.tm: flink.<host>.taskmanager
> metrics.scope.tm.job: flink.<host>.taskmanager.<job_name>
> metrics.scope.task: 
> flink.<host>.taskmanager.<job_name>.<task_name>.<subtask_index>
> metrics.scope.operator: 
> flink.<host>.taskmanager.<job_name>.<operator_name>.<subtask_index>
> metrics.reporters: graphite
> metrics.reporter.graphite.class: 
> org.apache.flink.metrics.graphite.GraphiteReporter
> ...{code}
>Reporter: Chris Thomson
>Priority: Minor
>
> Enabled Flink metrics reporting to Graphite using system scopes that 
> contain '<host>' for both the job manager and task manager.  The resulting 
> metrics reported to Graphite use two different representations for '<host>'.
> For *Task Manager metrics* it uses the *short hostname* (without the DNS 
> domain).  This is a result of logic in 
> org.apache.flink.runtime.taskmanager.TaskManagerLocation constructor that 
> tries to extract the short hostname from the fully qualified domain name 
> looked up from InetAddress.getCanonicalHostName().
> For *Job Manager metrics* it uses the *fully qualified domain name* (with the 
> DNS domain). This is a result of there being no logic in 
> org.apache.flink.runtime.jobmanager.JobManagerRunner or 
> org.apache.flink.runtime.rpc.akka.AkkaRpcService to perform equivalent 
> normalization of the fully qualified domain name down to the short hostname.
> Ideally the '<host>' placeholders in the system scopes for the job manager 
> and task manager related metrics would be replaced with a consistent value 
> (either the short hostname or the fully qualified domain name).  Even better 
> if there was a configuration option to decide which one should be used for 
> metric name generation.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

