[jira] [Resolved] (FLINK-6563) Expose time indicator attributes in the KafkaTableSource

2017-11-01 Thread Fabian Hueske (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6563?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske resolved FLINK-6563.
--
Resolution: Implemented
  Assignee: Fabian Hueske  (was: Haohui Mai)

Implemented for 1.4.0 with 0e92b6632f35b69c62d7747f1cbaa3ee207fb235

> Expose time indicator attributes in the KafkaTableSource
> 
>
> Key: FLINK-6563
> URL: https://issues.apache.org/jira/browse/FLINK-6563
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: Haohui Mai
>Assignee: Fabian Hueske
>Priority: Critical
> Fix For: 1.4.0
>
>
> This is a follow up for FLINK-5884.
> FLINK-5884 requires the {{TableSource}} interfaces to expose the 
> processing-time and event-time attributes of the data stream. This JIRA proposes to 
> expose these two attributes in the Kafka table source.
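
The mechanism can be sketched as follows. This is a minimal illustration only: the interface and method names below are made up for the sketch and are not the exact Flink 1.4 table source API.

```java
// Minimal sketch of a table source declaring its time indicator attributes.
// Interface and method names are illustrative, not the exact Flink API.
interface WithProctimeAttribute {
    String getProctimeAttribute(); // name of the appended processing-time column
}

interface WithRowtimeAttribute {
    String getRowtimeAttribute();  // name of the event-time column
}

// A Kafka-like source would implement both to expose its time attributes.
class ExampleKafkaSource implements WithProctimeAttribute, WithRowtimeAttribute {
    public String getProctimeAttribute() { return "procTime"; }
    public String getRowtimeAttribute()  { return "rowTime"; }
}

public class TimeIndicators {
    public static void main(String[] args) {
        ExampleKafkaSource source = new ExampleKafkaSource();
        System.out.println(source.getProctimeAttribute() + "," + source.getRowtimeAttribute());
        // prints: procTime,rowTime
    }
}
```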



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4931: [FLINK-7420] Move all Avro code to flink-avro

2017-11-01 Thread aljoscha
GitHub user aljoscha opened a pull request:

https://github.com/apache/flink/pull/4931

[FLINK-7420] Move all Avro code to flink-avro

This hides interaction with Avro behind the interface `AvroUtils`, which has 
two implementations: 1) default Avro utils, which are used when Avro is not 
present and which throw exceptions whenever Avro is required; 2) proper Avro 
utils, which are dynamically loaded when the `flink-avro` module is on the 
classpath and which perform the actual Avro operations.

R: @StephanEwen 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aljoscha/flink FLINK-7420

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4931.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4931


commit b74f84ee3d5795e30c92a2e7393fc25309c450e3
Author: twalthr 
Date:   2017-08-16T10:17:00Z

[FLINK-7420] [core] Move all Avro code to flink-avro

commit 3215a4b939f536b4a1130cad9e5106867c071789
Author: Aljoscha Krettek 
Date:   2017-10-25T15:38:24Z

[FLINK-7420] Replace GenericData.Array by dummy when reading TypeSerializers

This also adds a new test that verifies that we correctly register
Avro Serializers when they are present and modifies an existing test to
verify that we correctly register dummy classes.

commit 94f19afcb65b7ef0d200bb3c4d0d82b8422ba905
Author: Aljoscha Krettek 
Date:   2017-10-26T12:56:09Z

[FLINK-7420] Add Avro test-jar dependency in Kafka modules

commit 1d73f296667d909ce0506ceeb722112fed978af3
Author: Aljoscha Krettek 
Date:   2017-10-30T09:19:56Z

[FLINK-7420] Fix TwitterExample.scala

It seems this has a transitive dependency on Jackson, which slightly
changed with the Avro reworking.

commit a7289e06641b3303611bf5a1a8a2bf4ef56ac994
Author: Aljoscha Krettek 
Date:   2017-10-30T14:02:18Z

[FLINK-7420] Abstract all Avro interaction behind AvroUtils

Before, we would try to dynamically load Avro-related classes in several
places. Now, we only reflectively instantiate the right AvroUtils, and
all other operations are methods on it.

The default AvroUtils throw exceptions with a helpful message for most
operations.
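
The loading pattern described above can be sketched as follows; the class and interface names are hypothetical stand-ins, not Flink's actual ones:

```java
// Sketch (assumption): mimics the pattern of reflectively loading an optional
// implementation and falling back to a default that throws helpful errors.
interface AvroUtilsLike {
    String describeSupport();
}

class DefaultAvroUtilsLike implements AvroUtilsLike {
    public String describeSupport() {
        throw new UnsupportedOperationException(
            "Avro support missing. Add the flink-avro module to your classpath.");
    }
}

public class ReflectiveLoading {
    // Hypothetical class name; the real implementation would live in flink-avro.
    static final String REAL_IMPL = "org.example.RealAvroUtils";

    static AvroUtilsLike load() {
        try {
            Class<?> clazz = Class.forName(REAL_IMPL);
            return (AvroUtilsLike) clazz.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            return new DefaultAvroUtilsLike(); // module not on the classpath
        }
    }

    public static void main(String[] args) {
        AvroUtilsLike utils = load();
        try {
            utils.describeSupport();
        } catch (UnsupportedOperationException e) {
            System.out.println("fallback: " + e.getMessage());
        }
    }
}
```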




---


[jira] [Commented] (FLINK-7420) Move all Avro code to flink-avro

2017-11-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7420?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16233758#comment-16233758
 ] 

ASF GitHub Bot commented on FLINK-7420:
---

GitHub user aljoscha opened a pull request:

https://github.com/apache/flink/pull/4931

[FLINK-7420] Move all Avro code to flink-avro

This hides interaction with Avro behind the interface `AvroUtils`, which has 
two implementations: 1) default Avro utils, which are used when Avro is not 
present and which throw exceptions whenever Avro is required; 2) proper Avro 
utils, which are dynamically loaded when the `flink-avro` module is on the 
classpath and which perform the actual Avro operations.

R: @StephanEwen 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aljoscha/flink FLINK-7420

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4931.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4931


commit b74f84ee3d5795e30c92a2e7393fc25309c450e3
Author: twalthr 
Date:   2017-08-16T10:17:00Z

[FLINK-7420] [core] Move all Avro code to flink-avro

commit 3215a4b939f536b4a1130cad9e5106867c071789
Author: Aljoscha Krettek 
Date:   2017-10-25T15:38:24Z

[FLINK-7420] Replace GenericData.Array by dummy when reading TypeSerializers

This also adds a new test that verifies that we correctly register
Avro Serializers when they are present and modifies an existing test to
verify that we correctly register dummy classes.

commit 94f19afcb65b7ef0d200bb3c4d0d82b8422ba905
Author: Aljoscha Krettek 
Date:   2017-10-26T12:56:09Z

[FLINK-7420] Add Avro test-jar dependency in Kafka modules

commit 1d73f296667d909ce0506ceeb722112fed978af3
Author: Aljoscha Krettek 
Date:   2017-10-30T09:19:56Z

[FLINK-7420] Fix TwitterExample.scala

It seems this has a transitive dependency on Jackson, which slightly
changed with the Avro reworking.

commit a7289e06641b3303611bf5a1a8a2bf4ef56ac994
Author: Aljoscha Krettek 
Date:   2017-10-30T14:02:18Z

[FLINK-7420] Abstract all Avro interaction behind AvroUtils

Before, we would try to dynamically load Avro-related classes in several
places. Now, we only reflectively instantiate the right AvroUtils, and
all other operations are methods on it.

The default AvroUtils throw exceptions with a helpful message for most
operations.




> Move all Avro code to flink-avro
> 
>
> Key: FLINK-7420
> URL: https://issues.apache.org/jira/browse/FLINK-7420
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Reporter: Stephan Ewen
>Assignee: Aljoscha Krettek
>Priority: Blocker
> Fix For: 1.4.0
>
>
> *Problem*
> Currently, the {{flink-avro}} project is a shell with some tests and mostly 
> duplicate and dead code. The classes that use Avro are distributed quite 
> wildly through the code base, and introduce multiple direct dependencies on 
> Avro in a messy way.
> That way, we cannot create a proper fat Avro dependency in which we shade 
> Jackson away.
> Also, we expose Avro as a direct and hard dependency on many Flink modules, 
> while it should be a dependency that users that use Avro types selectively 
> add.
> *Suggested Changes*
> We should move all Avro related classes to {{flink-avro}}, and give 
> {{flink-avro}} a dependency on {{flink-core}} and {{flink-streaming-java}}.
>   - {{AvroTypeInfo}}
>   - {{AvroSerializer}}
>   - {{AvroRowSerializationSchema}}
>   - {{AvroRowDeserializationSchema}}
> To be able to move the Avro serialization code from {{flink-core}} to 
> {{flink-avro}}, we need to load the {{AvroTypeInformation}} reflectively, 
> similar to how we load the {{WritableTypeInfo}} for Hadoop.



--


[GitHub] flink issue #4734: [FLINK-7652] [flip6] Port CurrentJobIdsHandler to new RES...

2017-11-01 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/4734
  
Hi @tillrohrmann, just to double check: does the conclusion mean that 
#4805 subsumes this PR and that this one can be closed? 


---


[jira] [Commented] (FLINK-7652) Port CurrentJobIdsHandler to new REST endpoint

2017-11-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7652?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16233767#comment-16233767
 ] 

ASF GitHub Bot commented on FLINK-7652:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/4734
  
Hi @tillrohrmann, just to double check: does the conclusion mean that 
#4805 subsumes this PR and that this one can be closed? 


> Port CurrentJobIdsHandler to new REST endpoint
> --
>
> Key: FLINK-7652
> URL: https://issues.apache.org/jira/browse/FLINK-7652
> Project: Flink
>  Issue Type: Sub-task
>  Components: REST, Webfrontend
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Major
>  Labels: flip-6
> Fix For: 1.4.0
>
>
> Port existing {{CurrentJobIdsHandler}} to new REST endpoint



--


[GitHub] flink pull request #4592: [FLINK-7515][network] allow actual 0-length conten...

2017-11-01 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4592#discussion_r148203203
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java ---
@@ -64,12 +65,53 @@

	// 

+	/**
+	 * Allocates a new (header and contents) buffer and adds some header information for the frame
+	 * decoder.
+	 *
+	 * Before sending the buffer, you must write the actual length after adding the contents as
+	 * an integer to position 0!
+	 *
+	 * @param allocator
+	 * 		byte buffer allocator to use
+	 * @param id
+	 * 		{@link NettyMessage} subclass ID
+	 *
+	 * @return a newly allocated direct buffer with header data written for {@link NettyMessageDecoder}
+	 */
	private static ByteBuf allocateBuffer(ByteBufAllocator allocator, byte id) {
-		return allocateBuffer(allocator, id, 0);
+		return allocateBuffer(allocator, id, -1);
	}

+	/**
+	 * Allocates a new (header and contents) buffer and adds some header information for the frame
+	 * decoder.
+	 *
+	 * If the length is unknown, you must write the actual length after adding the
+	 * contents as an integer to position 0!
+	 *
+	 * @param allocator
+	 * 		byte buffer allocator to use
+	 * @param id
+	 * 		{@link NettyMessage} subclass ID
+	 * @param length
+	 * 		content length (or -1 if unknown)
+	 *
+	 * @return a newly allocated direct buffer with header data written for {@link NettyMessageDecoder}
+	 */
	private static ByteBuf allocateBuffer(ByteBufAllocator allocator, byte id, int length) {
-		final ByteBuf buffer = length != 0 ? allocator.directBuffer(HEADER_LENGTH + length) : allocator.directBuffer();
+		Preconditions.checkArgument(length <= Integer.MAX_VALUE - HEADER_LENGTH);
--- End diff --

import static?
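
The framing scheme under discussion can be sketched with plain NIO buffers. This is an assumption-laden illustration: the 9-byte header layout (frame length, magic number, message id), the magic value, and the default capacity are placeholders, not Netty's or Flink's actual values.

```java
import java.nio.ByteBuffer;

public class FrameAlloc {
    static final int HEADER_LENGTH = 9; // assumption: 4B frame length + 4B magic + 1B id

    // Mirrors the discussed allocateBuffer: -1 means "content length unknown",
    // in which case the real frame length must be patched into position 0 later.
    static ByteBuffer allocate(byte id, int length) {
        if (length > Integer.MAX_VALUE - HEADER_LENGTH) {
            throw new IllegalArgumentException("length too large");
        }
        ByteBuffer buf = ByteBuffer.allocate(
            length >= 0 ? HEADER_LENGTH + length : 256); // default capacity if unknown
        buf.putInt(length >= 0 ? HEADER_LENGTH + length : -1); // patched later if -1
        buf.putInt(0xDEADBEEF); // magic number (placeholder value)
        buf.put(id);
        return buf;
    }

    public static void main(String[] args) {
        ByteBuffer known = allocate((byte) 1, 0);   // an actual 0-length content is now valid
        System.out.println(known.getInt(0));        // 9: header only
        ByteBuffer unknown = allocate((byte) 2, -1);
        System.out.println(unknown.getInt(0));      // -1: caller must patch position 0
    }
}
```

This shows why `-1` is a cleaner sentinel than `0`: a genuine zero-length payload gets an exactly sized buffer instead of being treated as "unknown".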


---


[jira] [Commented] (FLINK-7515) allow actual 0-length content in NettyMessage#allocateBuffer()

2017-11-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7515?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16233771#comment-16233771
 ] 

ASF GitHub Bot commented on FLINK-7515:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4592#discussion_r148203203
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java ---
@@ -64,12 +65,53 @@

	// 

+	/**
+	 * Allocates a new (header and contents) buffer and adds some header information for the frame
+	 * decoder.
+	 *
+	 * Before sending the buffer, you must write the actual length after adding the contents as
+	 * an integer to position 0!
+	 *
+	 * @param allocator
+	 * 		byte buffer allocator to use
+	 * @param id
+	 * 		{@link NettyMessage} subclass ID
+	 *
+	 * @return a newly allocated direct buffer with header data written for {@link NettyMessageDecoder}
+	 */
	private static ByteBuf allocateBuffer(ByteBufAllocator allocator, byte id) {
-		return allocateBuffer(allocator, id, 0);
+		return allocateBuffer(allocator, id, -1);
	}

+	/**
+	 * Allocates a new (header and contents) buffer and adds some header information for the frame
+	 * decoder.
+	 *
+	 * If the length is unknown, you must write the actual length after adding the
+	 * contents as an integer to position 0!
+	 *
+	 * @param allocator
+	 * 		byte buffer allocator to use
+	 * @param id
+	 * 		{@link NettyMessage} subclass ID
+	 * @param length
+	 * 		content length (or -1 if unknown)
+	 *
+	 * @return a newly allocated direct buffer with header data written for {@link NettyMessageDecoder}
+	 */
	private static ByteBuf allocateBuffer(ByteBufAllocator allocator, byte id, int length) {
-		final ByteBuf buffer = length != 0 ? allocator.directBuffer(HEADER_LENGTH + length) : allocator.directBuffer();
+		Preconditions.checkArgument(length <= Integer.MAX_VALUE - HEADER_LENGTH);
--- End diff --

import static?


> allow actual 0-length content in NettyMessage#allocateBuffer()
> --
>
> Key: FLINK-7515
> URL: https://issues.apache.org/jira/browse/FLINK-7515
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Affects Versions: 1.4.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Minor
>
> Previously, length {{0}} meant "unknown content length" but there are cases 
> where the actual length is 0 and we do not need a larger buffer. Let's use 
> {{-1}} for tagging the special case instead.



--


[GitHub] flink pull request #4593: [FLINK-7516][memory] do not allow copies into a re...

2017-11-01 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4593#discussion_r148204666
  
--- Diff: flink-core/src/main/java/org/apache/flink/core/memory/HybridMemorySegment.java ---
@@ -306,6 +307,9 @@ public final void get(int offset, ByteBuffer target, int numBytes) {
		if ((offset | numBytes | (offset + numBytes)) < 0) {
			throw new IndexOutOfBoundsException();
		}
+		if (target.isReadOnly()) {
--- End diff --

Isn't this check redundant? Shouldn't the `ByteBuffer`s validate it on 
their own, as `DirectByteBufferR` does? Putting this check here complicates 
the code, and we have to pay for it on each call, even on the happy path 
(though the cost should be very tiny). 

Maybe it should be put only in the `if (target.isDirect())` branch, to check 
read-only only before the unsafe copy?
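
A simplified illustration of the guard being discussed. Note the reviewer's point: `ByteBuffer`s already throw `ReadOnlyBufferException` from `put()` on their own; an explicit check mainly matters before raw copies into direct buffer memory that bypass the `ByteBuffer` API. The sketch below shows only the guard itself, not the unsafe copy path.

```java
import java.nio.ByteBuffer;
import java.nio.ReadOnlyBufferException;

public class ReadOnlyCheck {
    // Sketch of the guard: reject copies into a read-only target up front,
    // before any writes (in the real code: before an unsafe memory copy).
    static void copyInto(byte[] source, ByteBuffer target) {
        if (target.isReadOnly()) {
            throw new ReadOnlyBufferException();
        }
        target.put(source);
    }

    public static void main(String[] args) {
        ByteBuffer writable = ByteBuffer.allocate(8);
        copyInto(new byte[]{1, 2, 3}, writable);
        System.out.println("copied " + writable.position() + " bytes"); // copied 3 bytes

        ByteBuffer readOnly = ByteBuffer.allocate(8).asReadOnlyBuffer();
        try {
            copyInto(new byte[]{1}, readOnly);
        } catch (ReadOnlyBufferException e) {
            System.out.println("rejected read-only target");
        }
    }
}
```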


---


[jira] [Commented] (FLINK-7732) Invalid offset to commit in Kafka

2017-11-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7732?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16233777#comment-16233777
 ] 

ASF GitHub Bot commented on FLINK-7732:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/4928#discussion_r148204398
  
--- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionState.java ---
@@ -95,6 +95,10 @@ public final boolean isOffsetDefined() {
		return offset != KafkaTopicPartitionStateSentinel.OFFSET_NOT_SET;
	}

+	public final boolean isSentinel() {
--- End diff --

nit: would `hasSentinelOffset` be a better name here?


> Invalid offset to commit in Kafka
> -
>
> Key: FLINK-7732
> URL: https://issues.apache.org/jira/browse/FLINK-7732
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector, Tests
>Affects Versions: 1.4.0
>Reporter: Nico Kruber
>Assignee: Piotr Nowojski
>Priority: Blocker
>  Labels: test-stability
> Fix For: 1.4.0
>
>
> In a test run with unrelated changes in the network stack, the Kafka 
> end-to-end test was failing with an invalid offset:
> {code}
> 2017-09-28 06:34:10,736 INFO  org.apache.kafka.common.utils.AppInfoParser 
>   - Kafka version : 0.10.2.1
> 2017-09-28 06:34:10,744 INFO  org.apache.kafka.common.utils.AppInfoParser 
>   - Kafka commitId : e89bffd6b2eff799
> 2017-09-28 06:34:14,549 INFO  
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Discovered 
> coordinator testing-gce-f67551b8-8867-41c5-b36e-4196d0f11e30:9092 (id: 
> 2147483647 rack: null) for group myconsumer.
> 2017-09-28 06:34:14,573 INFO  
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Marking 
> the coordinator testing-gce-f67551b8-8867-41c5-b36e-4196d0f11e30:9092 (id: 
> 2147483647 rack: null) dead for group myconsumer
> 2017-09-28 06:34:14,686 INFO  
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Discovered 
> coordinator testing-gce-f67551b8-8867-41c5-b36e-4196d0f11e30:9092 (id: 
> 2147483647 rack: null) for group myconsumer.
> 2017-09-28 06:34:14,687 INFO  
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Marking 
> the coordinator testing-gce-f67551b8-8867-41c5-b36e-4196d0f11e30:9092 (id: 
> 2147483647 rack: null) dead for group myconsumer
> 2017-09-28 06:34:14,792 INFO  
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Discovered 
> coordinator testing-gce-f67551b8-8867-41c5-b36e-4196d0f11e30:9092 (id: 
> 2147483647 rack: null) for group myconsumer.
> 2017-09-28 06:34:15,068 INFO  
> org.apache.flink.runtime.state.DefaultOperatorStateBackend- 
> DefaultOperatorStateBackend snapshot (In-Memory Stream Factory, synchronous 
> part) in thread Thread[Async calls on Source: Custom Source -> Map -> Sink: 
> Unnamed (1/1),5,Flink Task Threads] took 948 ms.
> 2017-09-28 06:34:15,164 WARN  
> org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher  - 
> Committing offsets to Kafka failed. This does not compromise Flink's 
> checkpoints.
> java.lang.IllegalArgumentException: Invalid offset: -915623761772
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:687)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.doCommitOffsetsAsync(ConsumerCoordinator.java:531)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsAsync(ConsumerCoordinator.java:499)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.commitAsync(KafkaConsumer.java:1181)
>   at 
> org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:217)
> 2017-09-28 06:34:15,171 ERROR 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  - Async 
> Kafka commit failed.
> java.lang.IllegalArgumentException: Invalid offset: -915623761772
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:687)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.doCommitOffsetsAsync(ConsumerCoordinator.java:531)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsAsync(ConsumerCoordinator.java:499)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.commitAsync(KafkaConsumer.java:1181)
>   at 
> org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:217)
> {code}
> https://travis-ci.org/apache/flink/jobs/280722829
> [~pnowojski] did a first analysis that r

[GitHub] flink pull request #4928: [FLINK-7732][kafka-consumer] Do not commit to kafk...

2017-11-01 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/4928#discussion_r148204574
  
--- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateSentinel.java ---
@@ -52,4 +52,7 @@
	 */
	public static final long GROUP_OFFSET = -915623761773L;

+	public static boolean isSentinel(long offset) {
+		return offset < 0;
--- End diff --

nit: this implementation may be too broad. It could be more specific by 
matching the static sentinel values in `KafkaTopicPartitionStateSentinel`.
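
The difference between the broad check and the reviewer's suggestion can be sketched as follows. Only `GROUP_OFFSET` is taken from the diff above; the other two constants are hypothetical placeholders standing in for the remaining sentinels.

```java
public class SentinelOffsets {
    // GROUP_OFFSET is taken from the diff above; the other constants are
    // hypothetical placeholders, not Flink's actual sentinel values.
    static final long GROUP_OFFSET = -915623761773L;
    static final long EARLIEST_OFFSET = GROUP_OFFSET - 1; // placeholder
    static final long LATEST_OFFSET = GROUP_OFFSET - 2;   // placeholder

    // The broad check from the PR: any negative offset counts as a sentinel.
    static boolean isSentinelBroad(long offset) {
        return offset < 0;
    }

    // The reviewer's suggestion: match the known sentinel constants exactly,
    // so an accidental negative value is not silently treated as a sentinel.
    static boolean isSentinelExact(long offset) {
        return offset == GROUP_OFFSET
            || offset == EARLIEST_OFFSET
            || offset == LATEST_OFFSET;
    }

    public static void main(String[] args) {
        long corrupted = -42L; // negative, but not a defined sentinel
        System.out.println(isSentinelBroad(corrupted)); // true: masked as sentinel
        System.out.println(isSentinelExact(corrupted)); // false: flagged as invalid
    }
}
```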


---


[jira] [Commented] (FLINK-7732) Invalid offset to commit in Kafka

2017-11-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7732?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16233778#comment-16233778
 ] 

ASF GitHub Bot commented on FLINK-7732:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/4928#discussion_r148204574
  
--- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateSentinel.java ---
@@ -52,4 +52,7 @@
	 */
	public static final long GROUP_OFFSET = -915623761773L;

+	public static boolean isSentinel(long offset) {
+		return offset < 0;
--- End diff --

nit: this implementation may be too broad. It could be more specific by 
matching the static sentinel values in `KafkaTopicPartitionStateSentinel`.


> Invalid offset to commit in Kafka
> -
>
> Key: FLINK-7732
> URL: https://issues.apache.org/jira/browse/FLINK-7732
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector, Tests
>Affects Versions: 1.4.0
>Reporter: Nico Kruber
>Assignee: Piotr Nowojski
>Priority: Blocker
>  Labels: test-stability
> Fix For: 1.4.0
>
>
> In a test run with unrelated changes in the network stack, the Kafka 
> end-to-end test was failing with an invalid offset:
> {code}
> 2017-09-28 06:34:10,736 INFO  org.apache.kafka.common.utils.AppInfoParser 
>   - Kafka version : 0.10.2.1
> 2017-09-28 06:34:10,744 INFO  org.apache.kafka.common.utils.AppInfoParser 
>   - Kafka commitId : e89bffd6b2eff799
> 2017-09-28 06:34:14,549 INFO  
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Discovered 
> coordinator testing-gce-f67551b8-8867-41c5-b36e-4196d0f11e30:9092 (id: 
> 2147483647 rack: null) for group myconsumer.
> 2017-09-28 06:34:14,573 INFO  
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Marking 
> the coordinator testing-gce-f67551b8-8867-41c5-b36e-4196d0f11e30:9092 (id: 
> 2147483647 rack: null) dead for group myconsumer
> 2017-09-28 06:34:14,686 INFO  
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Discovered 
> coordinator testing-gce-f67551b8-8867-41c5-b36e-4196d0f11e30:9092 (id: 
> 2147483647 rack: null) for group myconsumer.
> 2017-09-28 06:34:14,687 INFO  
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Marking 
> the coordinator testing-gce-f67551b8-8867-41c5-b36e-4196d0f11e30:9092 (id: 
> 2147483647 rack: null) dead for group myconsumer
> 2017-09-28 06:34:14,792 INFO  
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Discovered 
> coordinator testing-gce-f67551b8-8867-41c5-b36e-4196d0f11e30:9092 (id: 
> 2147483647 rack: null) for group myconsumer.
> 2017-09-28 06:34:15,068 INFO  
> org.apache.flink.runtime.state.DefaultOperatorStateBackend- 
> DefaultOperatorStateBackend snapshot (In-Memory Stream Factory, synchronous 
> part) in thread Thread[Async calls on Source: Custom Source -> Map -> Sink: 
> Unnamed (1/1),5,Flink Task Threads] took 948 ms.
> 2017-09-28 06:34:15,164 WARN  
> org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher  - 
> Committing offsets to Kafka failed. This does not compromise Flink's 
> checkpoints.
> java.lang.IllegalArgumentException: Invalid offset: -915623761772
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:687)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.doCommitOffsetsAsync(ConsumerCoordinator.java:531)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsAsync(ConsumerCoordinator.java:499)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.commitAsync(KafkaConsumer.java:1181)
>   at 
> org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:217)
> 2017-09-28 06:34:15,171 ERROR 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  - Async 
> Kafka commit failed.
> java.lang.IllegalArgumentException: Invalid offset: -915623761772
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:687)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.doCommitOffsetsAsync(ConsumerCoordinator.java:531)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsAsync(ConsumerCoordinator.java:499)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.commitAsync(KafkaConsumer.java:1181)
>   at 
> org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:217)
> {code}
> 

[GitHub] flink pull request #4928: [FLINK-7732][kafka-consumer] Do not commit to kafk...

2017-11-01 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/4928#discussion_r148204398
  
--- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionState.java ---
@@ -95,6 +95,10 @@ public final boolean isOffsetDefined() {
		return offset != KafkaTopicPartitionStateSentinel.OFFSET_NOT_SET;
	}

+	public final boolean isSentinel() {
--- End diff --

nit: would `hasSentinelOffset` be a better name here?


---


[jira] [Commented] (FLINK-7516) HybridMemorySegment: do not allow copies into a read-only ByteBuffer

2017-11-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16233776#comment-16233776
 ] 

ASF GitHub Bot commented on FLINK-7516:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4593#discussion_r148204666
  
--- Diff: flink-core/src/main/java/org/apache/flink/core/memory/HybridMemorySegment.java ---
@@ -306,6 +307,9 @@ public final void get(int offset, ByteBuffer target, int numBytes) {
		if ((offset | numBytes | (offset + numBytes)) < 0) {
			throw new IndexOutOfBoundsException();
		}
+		if (target.isReadOnly()) {
--- End diff --

Isn't this check redundant? Shouldn't the `ByteBuffer`s validate it on 
their own, as `DirectByteBufferR` does? Putting this check here complicates 
the code, and we have to pay for it on each call, even on the happy path 
(though the cost should be very tiny). 

Maybe it should be put only in the `if (target.isDirect())` branch, to check 
read-only only before the unsafe copy?


> HybridMemorySegment: do not allow copies into a read-only ByteBuffer
> 
>
> Key: FLINK-7516
> URL: https://issues.apache.org/jira/browse/FLINK-7516
> Project: Flink
>  Issue Type: Sub-task
>  Components: Core, Network
>Affects Versions: 1.4.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Major
>
> {{HybridMemorySegment#get(int, ByteBuffer, int)}} allows writing into a 
> read-only {{ByteBuffer}} but this operation should be forbidden.



--


[GitHub] flink issue #4928: [FLINK-7732][kafka-consumer] Do not commit to kafka Flink...

2017-11-01 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/4928
  
By the way, what exactly was the error that caused the application crash in 
the described case?


---


[jira] [Commented] (FLINK-7732) Invalid offset to commit in Kafka

2017-11-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7732?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16233780#comment-16233780
 ] 

ASF GitHub Bot commented on FLINK-7732:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/4928
  
By the way, what exactly was the error that caused the application crash in 
the described case?


> Invalid offset to commit in Kafka
> -
>
> Key: FLINK-7732
> URL: https://issues.apache.org/jira/browse/FLINK-7732
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector, Tests
>Affects Versions: 1.4.0
>Reporter: Nico Kruber
>Assignee: Piotr Nowojski
>Priority: Blocker
>  Labels: test-stability
> Fix For: 1.4.0
>
>
> In a test run with unrelated changes in the network stack, the Kafka 
> end-to-end test was failing with an invalid offset:
> {code}
> 2017-09-28 06:34:10,736 INFO  org.apache.kafka.common.utils.AppInfoParser 
>   - Kafka version : 0.10.2.1
> 2017-09-28 06:34:10,744 INFO  org.apache.kafka.common.utils.AppInfoParser 
>   - Kafka commitId : e89bffd6b2eff799
> 2017-09-28 06:34:14,549 INFO  
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Discovered 
> coordinator testing-gce-f67551b8-8867-41c5-b36e-4196d0f11e30:9092 (id: 
> 2147483647 rack: null) for group myconsumer.
> 2017-09-28 06:34:14,573 INFO  
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Marking 
> the coordinator testing-gce-f67551b8-8867-41c5-b36e-4196d0f11e30:9092 (id: 
> 2147483647 rack: null) dead for group myconsumer
> 2017-09-28 06:34:14,686 INFO  
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Discovered 
> coordinator testing-gce-f67551b8-8867-41c5-b36e-4196d0f11e30:9092 (id: 
> 2147483647 rack: null) for group myconsumer.
> 2017-09-28 06:34:14,687 INFO  
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Marking 
> the coordinator testing-gce-f67551b8-8867-41c5-b36e-4196d0f11e30:9092 (id: 
> 2147483647 rack: null) dead for group myconsumer
> 2017-09-28 06:34:14,792 INFO  
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Discovered 
> coordinator testing-gce-f67551b8-8867-41c5-b36e-4196d0f11e30:9092 (id: 
> 2147483647 rack: null) for group myconsumer.
> 2017-09-28 06:34:15,068 INFO  
> org.apache.flink.runtime.state.DefaultOperatorStateBackend- 
> DefaultOperatorStateBackend snapshot (In-Memory Stream Factory, synchronous 
> part) in thread Thread[Async calls on Source: Custom Source -> Map -> Sink: 
> Unnamed (1/1),5,Flink Task Threads] took 948 ms.
> 2017-09-28 06:34:15,164 WARN  
> org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher  - 
> Committing offsets to Kafka failed. This does not compromise Flink's 
> checkpoints.
> java.lang.IllegalArgumentException: Invalid offset: -915623761772
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:687)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.doCommitOffsetsAsync(ConsumerCoordinator.java:531)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsAsync(ConsumerCoordinator.java:499)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.commitAsync(KafkaConsumer.java:1181)
>   at 
> org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:217)
> 2017-09-28 06:34:15,171 ERROR 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  - Async 
> Kafka commit failed.
> java.lang.IllegalArgumentException: Invalid offset: -915623761772
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:687)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.doCommitOffsetsAsync(ConsumerCoordinator.java:531)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsAsync(ConsumerCoordinator.java:499)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.commitAsync(KafkaConsumer.java:1181)
>   at 
> org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:217)
> {code}
> https://travis-ci.org/apache/flink/jobs/280722829
> [~pnowojski] did a first analysis that revealed this:
> In 
> org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java:229 
> this is being sent:
> {{long offsetToCommit = lastProcessedOffset + 1;}}
> {{lastProcessedOffset}} comes from:
> {{org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase#snapshotState}}
>  either lines 741 or 749
> The value that we see is strangely similar to 
> {{org.apach
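
The connection between the sentinel and the logged offset is simple arithmetic: committing `lastProcessedOffset + 1` when `lastProcessedOffset` still holds the `GROUP_OFFSET` sentinel (-915623761773) yields exactly the invalid offset -915623761772 seen in the log.

```java
public class OffsetArithmetic {
    public static void main(String[] args) {
        // GROUP_OFFSET sentinel from KafkaTopicPartitionStateSentinel (quoted above).
        long groupOffsetSentinel = -915623761773L;
        // Kafka09Fetcher commits lastProcessedOffset + 1; applied to the sentinel,
        // this produces exactly the invalid offset from the exception.
        long offsetToCommit = groupOffsetSentinel + 1;
        System.out.println(offsetToCommit); // -915623761772
    }
}
```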

[GitHub] flink issue #4926: [FLINK-7951] Load YarnConfiguration with default Hadoop c...

2017-11-01 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/4926
  
A user reported that we may have to explicitly load the `yarn-site.xml` 
in the `HadoopUtils.getHadoopConfiguration` method, as we do for the 
`core-site.xml` and `hdfs-site.xml`. Will have to verify this.


---


[jira] [Commented] (FLINK-7951) YarnApplicationMaster does not load HDFSConfiguration

2017-11-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7951?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16233785#comment-16233785
 ] 

ASF GitHub Bot commented on FLINK-7951:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/4926
  
A user reported that we may have to explicitly load the `yarn-site.xml` 
in the `HadoopUtils.getHadoopConfiguration` method, as we do for the 
`core-site.xml` and `hdfs-site.xml`. Will have to verify this.


> YarnApplicationMaster does not load HDFSConfiguration
> -
>
> Key: FLINK-7951
> URL: https://issues.apache.org/jira/browse/FLINK-7951
> Project: Flink
>  Issue Type: Bug
>  Components: YARN
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Critical
> Fix For: 1.4.0
>
>
> When instantiating the {{YarnConfiguration}}, we do not load the corresponding 
> {{HDFSConfiguration}}. As a result, we do not read the {{hdfs-site.xml}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7652) Port CurrentJobIdsHandler to new REST endpoint

2017-11-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7652?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16233794#comment-16233794
 ] 

ASF GitHub Bot commented on FLINK-7652:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/4734
  
@tzulitai, I actually want to re-check whether it's really not possible to 
register the `CurrentJobsOverviewHandler` under `jobs/overview`. If it can 
indeed be registered there, then it will most likely subsume this handler. If 
not, then we'll register this handler under `/jobs`.


> Port CurrentJobIdsHandler to new REST endpoint
> --
>
> Key: FLINK-7652
> URL: https://issues.apache.org/jira/browse/FLINK-7652
> Project: Flink
>  Issue Type: Sub-task
>  Components: REST, Webfrontend
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Major
>  Labels: flip-6
> Fix For: 1.4.0
>
>
> Port existing {{CurrentJobIdsHandler}} to new REST endpoint



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4734: [FLINK-7652] [flip6] Port CurrentJobIdsHandler to new RES...

2017-11-01 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/4734
  
@tzulitai, I actually want to re-check whether it's really not possible to 
register the `CurrentJobsOverviewHandler` under `jobs/overview`. If it can 
indeed be registered there, then it will most likely subsume this handler. If 
not, then we'll register this handler under `/jobs`.


---


[GitHub] flink issue #4506: [FLINK-7400][cluster] fix off-heap limits set to conserva...

2017-11-01 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/4506
  
Thanks for running the test @NicoK. Could you please rebase this PR onto 
the latest master? Once Travis passes all tests, I'll merge it.


---


[jira] [Commented] (FLINK-7400) off-heap limits set too conservatively in cluster environments

2017-11-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7400?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16233796#comment-16233796
 ] 

ASF GitHub Bot commented on FLINK-7400:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/4506
  
Thanks for running the test @NicoK. Could you please rebase this PR onto 
the latest master? Once Travis passes all tests, I'll merge it.


> off-heap limits set too conservatively in cluster environments
> -
>
> Key: FLINK-7400
> URL: https://issues.apache.org/jira/browse/FLINK-7400
> Project: Flink
>  Issue Type: Bug
>  Components: Cluster Management, Mesos, YARN
>Affects Versions: 1.3.0, 1.3.1, 1.4.0, 1.3.2
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Blocker
>
> Inside {{ContaineredTaskManagerParameters}}, since FLINK-6217, the 
> {{offHeapSize}} is set to the amount of memory Flink itself will use off-heap, 
> which is then used as the value for {{-XX:MaxDirectMemorySize}} in various 
> cases. This does not account for any off-heap use by components other than 
> Flink, e.g. RocksDB, other libraries, or the JVM itself.
> We should add the {{cutoff}} from the {{CONTAINERIZED_HEAP_CUTOFF_RATIO}} 
> configuration parameter to {{offHeapSize}}, as implied by the description of 
> what this parameter is there for.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
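The memory split discussed in FLINK-7400 can be illustrated with a small, self-contained sketch. The method name, parameters, and values below are illustrative, not Flink's actual code: a cutoff fraction of the container memory is reserved for non-Flink off-heap users and added to the `-XX:MaxDirectMemorySize` limit.

```java
public class OffHeapSizeSketch {

    // Hypothetical mirror of the containerized memory split discussed in
    // FLINK-7400: a cutoff fraction of the container memory is reserved for
    // non-Flink off-heap users (RocksDB, other libraries, the JVM itself)
    // and should be added to the limit passed via -XX:MaxDirectMemorySize.
    static long offHeapSize(long containerMemoryMB, double cutoffRatio, long flinkOffHeapMB) {
        long cutoff = (long) (containerMemoryMB * cutoffRatio);
        // before the fix: only Flink's own off-heap memory was counted;
        // after the fix: the cutoff is added so other native users fit too
        return flinkOffHeapMB + cutoff;
    }

    public static void main(String[] args) {
        // 4096 MB container, 25% cutoff, 1024 MB of Flink off-heap memory
        System.out.println(offHeapSize(4096, 0.25, 1024)); // prints 2048
    }
}
```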


[GitHub] flink pull request #4916: [FLINK-7153] Re-introduce preferred locations for ...

2017-11-01 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4916#discussion_r148209063
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 ---
@@ -878,113 +880,70 @@ private void scheduleEager(SlotProvider 
slotProvider, final Time timeout) {
// that way we do not have any operation that can fail between 
allocating the slots
// and adding them to the list. If we had a failure in between 
there, that would
// cause the slots to get lost
-   final ArrayList resources = new 
ArrayList<>(getNumberOfExecutionJobVertices());
final boolean queued = allowQueuedScheduling;
 
-   // we use this flag to handle failures in a 'finally' clause
-   // that allows us to not go through clumsy cast-and-rethrow 
logic
-   boolean successful = false;
+   // collecting all the slots may resize and fail in that 
operation without slots getting lost
+   final ArrayList> 
allAllocationFutures = new ArrayList<>(getNumberOfExecutionJobVertices());
 
-   try {
-   // collecting all the slots may resize and fail in that 
operation without slots getting lost
-   final ArrayList> 
slotFutures = new ArrayList<>(getNumberOfExecutionJobVertices());
+   // allocate the slots (obtain all their futures
+   for (ExecutionJobVertex ejv : getVerticesTopologically()) {
+   // these calls are not blocking, they only return 
futures
--- End diff --

There is no specific reason why we iterate over the vertices in topological 
order. We could also choose a completely random order for eager scheduling, 
because the scheduling order will be determined by the preferred location 
futures (which at the moment are based on inputs only). If we switch to state 
locations, then it basically means that we schedule the individual tasks 
independently, because the vertices won't depend on the input locations.


---


[jira] [Commented] (FLINK-7153) Eager Scheduling can't allocate source for ExecutionGraph correctly

2017-11-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7153?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16233800#comment-16233800
 ] 

ASF GitHub Bot commented on FLINK-7153:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4916#discussion_r148209063
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 ---
@@ -878,113 +880,70 @@ private void scheduleEager(SlotProvider 
slotProvider, final Time timeout) {
// that way we do not have any operation that can fail between 
allocating the slots
// and adding them to the list. If we had a failure in between 
there, that would
// cause the slots to get lost
-   final ArrayList resources = new 
ArrayList<>(getNumberOfExecutionJobVertices());
final boolean queued = allowQueuedScheduling;
 
-   // we use this flag to handle failures in a 'finally' clause
-   // that allows us to not go through clumsy cast-and-rethrow 
logic
-   boolean successful = false;
+   // collecting all the slots may resize and fail in that 
operation without slots getting lost
+   final ArrayList> 
allAllocationFutures = new ArrayList<>(getNumberOfExecutionJobVertices());
 
-   try {
-   // collecting all the slots may resize and fail in that 
operation without slots getting lost
-   final ArrayList> 
slotFutures = new ArrayList<>(getNumberOfExecutionJobVertices());
+   // allocate the slots (obtain all their futures
+   for (ExecutionJobVertex ejv : getVerticesTopologically()) {
+   // these calls are not blocking, they only return 
futures
--- End diff --

There is no specific reason why we iterate over the vertices in topological 
order. We could also choose a completely random order for eager scheduling, 
because the scheduling order will be determined by the preferred location 
futures (which at the moment are based on inputs only). If we switch to state 
locations, then it basically means that we schedule the individual tasks 
independently, because the vertices won't depend on the input locations.


> Eager Scheduling can't allocate source for ExecutionGraph correctly
> ---
>
> Key: FLINK-7153
> URL: https://issues.apache.org/jira/browse/FLINK-7153
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager
>Affects Versions: 1.3.1
>Reporter: Sihua Zhou
>Assignee: Till Rohrmann
>Priority: Critical
> Fix For: 1.4.0, 1.3.3
>
>
> The ExecutionGraph.scheduleEager() function allocates resources for the 
> ExecutionJobVertices one by one by calling ExecutionJobVertex.allocateResourcesForAll(). 
> There are two problems with this:
> 1. ExecutionVertex.getPreferredLocationsBasedOnInputs will always return 
> empty, because `sourceSlot` is always null until the `ExecutionVertex` has 
> been deployed via 'Execution.deployToSlot()'. So allocating resources based on 
> preferred locations can't work correctly; we need to set the slot info for the 
> `Execution` as soon as Execution.allocateSlotForExecution() has been called 
> successfully.
> 2. The current allocation strategy can't allocate the slots optimally. Here is 
> the test case:
> {code}
> JobVertex v1 = new JobVertex("v1", jid1);
> JobVertex v2 = new JobVertex("v2", jid2);
> SlotSharingGroup group = new SlotSharingGroup();
> v1.setSlotSharingGroup(group);
> v2.setSlotSharingGroup(group);
> v1.setParallelism(2);
> v2.setParallelism(4);
> v1.setInvokableClass(BatchTask.class);
> v2.setInvokableClass(BatchTask.class);
> v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE, 
> ResultPartitionType.PIPELINED_BOUNDED);
> {code}
> Currently, after allocating for v1 and v2, we get one local partition and 
> three remote partitions. But actually, it should be 2 local partitions and 2 
> remote partitions.
> The cause of the above problems is that the current strategy allocates the 
> resource for each execution one by one (if the execution can allocate from the 
> SlotSharingGroup then take it, otherwise ask for a new slot for it). Changing 
> the allocation strategy to two steps solves this problem; below is the pseudo 
> code:
> {code}
> for (ExecutionJobVertex ejv: getVerticesTopologically) {
> //step 1: try to allocate from the SlotSharingGroup based on inputs, one by one
> //(which only allocates resources based on location).
> //step 2: allocate for the remaining executions.
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
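The two-step allocation strategy proposed in the FLINK-7153 issue description can be sketched with a hypothetical model. The `Vertex` record, slot names, and the least-loaded tie-breaking below are illustrative assumptions, not Flink's actual scheduler API:

```java
import java.util.*;

public class TwoStepAllocationSketch {

    // Hypothetical vertex model: a name plus an optional input-based
    // preferred location (null for sources / vertices without preference).
    record Vertex(String name, String preferredLocation) {}

    // Sketch of the two-step strategy from the issue description:
    // step 1 honors input-based location preferences,
    // step 2 places the remaining vertices on the least-loaded slot.
    static Map<String, String> allocate(List<Vertex> vertices, List<String> slots) {
        Map<String, String> assignment = new LinkedHashMap<>();
        Map<String, Integer> load = new LinkedHashMap<>();
        slots.forEach(s -> load.put(s, 0));

        // step 1: allocate vertices with a known location preference first
        for (Vertex v : vertices) {
            if (v.preferredLocation() != null && load.containsKey(v.preferredLocation())) {
                assignment.put(v.name(), v.preferredLocation());
                load.merge(v.preferredLocation(), 1, Integer::sum);
            }
        }
        // step 2: allocate the remaining vertices on the least-loaded slot
        for (Vertex v : vertices) {
            if (!assignment.containsKey(v.name())) {
                String best = Collections.min(load.keySet(), Comparator.comparing(load::get));
                assignment.put(v.name(), best);
                load.merge(best, 1, Integer::sum);
            }
        }
        return assignment;
    }

    public static void main(String[] args) {
        List<Vertex> vertices = List.of(
                new Vertex("v2-1", "slotA"), new Vertex("v2-2", "slotB"),
                new Vertex("v1-1", null), new Vertex("v1-2", null));
        System.out.println(allocate(vertices, List.of("slotA", "slotB")));
    }
}
```

Allocating the location-constrained vertices first keeps the unconstrained ones from occupying the slots that the constrained ones need.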


[GitHub] flink pull request #4916: [FLINK-7153] Re-introduce preferred locations for ...

2017-11-01 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4916#discussion_r148209318
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 ---
@@ -1065,6 +1177,46 @@ private void sendUpdatePartitionInfoRpcCall(
//  Miscellaneous
// 

 
+   /**
+* Calculates the preferred locations based on the location preference 
constraint.
+*
+* @param locationPreferenceConstraint constraint for the location 
preference
+* @return Future containing the collection of preferred locations. 
This might not be completed if not all inputs
+*  have been a resource assigned.
+*/
+   @VisibleForTesting
+   public CompletableFuture> 
calculatePreferredLocations(LocationPreferenceConstraint 
locationPreferenceConstraint) {
+   final Collection> 
preferredLocationFutures = getVertex().getPreferredLocationsBasedOnInputs();
+   final CompletableFuture> 
preferredLocationsFuture;
--- End diff --

Yes, that is exactly the idea. 


---


[jira] [Commented] (FLINK-7153) Eager Scheduling can't allocate source for ExecutionGraph correctly

2017-11-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7153?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16233802#comment-16233802
 ] 

ASF GitHub Bot commented on FLINK-7153:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4916#discussion_r148209318
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 ---
@@ -1065,6 +1177,46 @@ private void sendUpdatePartitionInfoRpcCall(
//  Miscellaneous
// 

 
+   /**
+* Calculates the preferred locations based on the location preference 
constraint.
+*
+* @param locationPreferenceConstraint constraint for the location 
preference
+* @return Future containing the collection of preferred locations. 
This might not be completed if not all inputs
+*  have been a resource assigned.
+*/
+   @VisibleForTesting
+   public CompletableFuture> 
calculatePreferredLocations(LocationPreferenceConstraint 
locationPreferenceConstraint) {
+   final Collection> 
preferredLocationFutures = getVertex().getPreferredLocationsBasedOnInputs();
+   final CompletableFuture> 
preferredLocationsFuture;
--- End diff --

Yes, that is exactly the idea. 


> Eager Scheduling can't allocate source for ExecutionGraph correctly
> ---
>
> Key: FLINK-7153
> URL: https://issues.apache.org/jira/browse/FLINK-7153
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager
>Affects Versions: 1.3.1
>Reporter: Sihua Zhou
>Assignee: Till Rohrmann
>Priority: Critical
> Fix For: 1.4.0, 1.3.3
>
>
> The ExecutionGraph.scheduleEager() function allocates resources for the 
> ExecutionJobVertices one by one by calling ExecutionJobVertex.allocateResourcesForAll(). 
> There are two problems with this:
> 1. ExecutionVertex.getPreferredLocationsBasedOnInputs will always return 
> empty, because `sourceSlot` is always null until the `ExecutionVertex` has 
> been deployed via 'Execution.deployToSlot()'. So allocating resources based on 
> preferred locations can't work correctly; we need to set the slot info for the 
> `Execution` as soon as Execution.allocateSlotForExecution() has been called 
> successfully.
> 2. The current allocation strategy can't allocate the slots optimally. Here is 
> the test case:
> {code}
> JobVertex v1 = new JobVertex("v1", jid1);
> JobVertex v2 = new JobVertex("v2", jid2);
> SlotSharingGroup group = new SlotSharingGroup();
> v1.setSlotSharingGroup(group);
> v2.setSlotSharingGroup(group);
> v1.setParallelism(2);
> v2.setParallelism(4);
> v1.setInvokableClass(BatchTask.class);
> v2.setInvokableClass(BatchTask.class);
> v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE, 
> ResultPartitionType.PIPELINED_BOUNDED);
> {code}
> Currently, after allocating for v1 and v2, we get one local partition and 
> three remote partitions. But actually, it should be 2 local partitions and 2 
> remote partitions.
> The cause of the above problems is that the current strategy allocates the 
> resource for each execution one by one (if the execution can allocate from the 
> SlotSharingGroup then take it, otherwise ask for a new slot for it). Changing 
> the allocation strategy to two steps solves this problem; below is the pseudo 
> code:
> {code}
> for (ExecutionJobVertex ejv: getVerticesTopologically) {
> //step 1: try to allocate from the SlotSharingGroup based on inputs, one by one
> //(which only allocates resources based on location).
> //step 2: allocate for the remaining executions.
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4593: [FLINK-7516][memory] do not allow copies into a re...

2017-11-01 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4593#discussion_r148209947
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/core/memory/HybridMemorySegment.java 
---
@@ -306,6 +307,9 @@ public final void get(int offset, ByteBuffer target, 
int numBytes) {
if ((offset | numBytes | (offset + numBytes)) < 0) {
throw new IndexOutOfBoundsException();
}
+   if (target.isReadOnly()) {
--- End diff --

you are right - the non-direct buffers path is based on 
`ByteBuffer#array()`, which will throw a `ReadOnlyBufferException` for 
read-only buffers, so the explicit check is really only needed in the direct 
buffers code path, where `UNSAFE.copyMemory` does not check the target pointer 
(how could it?)


---


[jira] [Commented] (FLINK-7516) HybridMemorySegment: do not allow copies into a read-only ByteBuffer

2017-11-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16233804#comment-16233804
 ] 

ASF GitHub Bot commented on FLINK-7516:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4593#discussion_r148209947
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/core/memory/HybridMemorySegment.java 
---
@@ -306,6 +307,9 @@ public final void get(int offset, ByteBuffer target, 
int numBytes) {
if ((offset | numBytes | (offset + numBytes)) < 0) {
throw new IndexOutOfBoundsException();
}
+   if (target.isReadOnly()) {
--- End diff --

you are right - the non-direct buffers path is based on 
`ByteBuffer#array()`, which will throw a `ReadOnlyBufferException` for 
read-only buffers, so the explicit check is really only needed in the direct 
buffers code path, where `UNSAFE.copyMemory` does not check the target pointer 
(how could it?)


> HybridMemorySegment: do not allow copies into a read-only ByteBuffer
> 
>
> Key: FLINK-7516
> URL: https://issues.apache.org/jira/browse/FLINK-7516
> Project: Flink
>  Issue Type: Sub-task
>  Components: Core, Network
>Affects Versions: 1.4.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Major
>
> {{HybridMemorySegment#get(int, ByteBuffer, int)}} allows writing into a 
> read-only {{ByteBuffer}} but this operation should be forbidden.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
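The guard discussed in FLINK-7516 can be sketched with plain `java.nio` (this is a minimal illustration, not Flink's actual `HybridMemorySegment` code): a copy into a read-only target must fail fast, because for direct buffers the copy would otherwise go through raw memory addresses with no check.

```java
import java.nio.ByteBuffer;
import java.nio.ReadOnlyBufferException;

public class ReadOnlyCopySketch {

    // Sketch of the guard discussed above: a copy into a read-only target
    // must fail fast. For heap buffers, target.array() already throws
    // ReadOnlyBufferException; for direct buffers the copy would go through
    // raw memory addresses, so the check has to be explicit.
    static void copyInto(byte[] source, ByteBuffer target) {
        if (target.isReadOnly()) {
            throw new ReadOnlyBufferException();
        }
        target.put(source);
    }

    public static void main(String[] args) {
        copyInto(new byte[]{1, 2, 3, 4}, ByteBuffer.allocateDirect(4)); // fine

        try {
            copyInto(new byte[]{1}, ByteBuffer.allocateDirect(4).asReadOnlyBuffer());
        } catch (ReadOnlyBufferException e) {
            System.out.println("rejected read-only target");
        }
    }
}
```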


[GitHub] flink issue #4506: [FLINK-7400][cluster] fix off-heap limits set to conserva...

2017-11-01 Thread NicoK
Github user NicoK commented on the issue:

https://github.com/apache/flink/pull/4506
  
rebased successfully, waiting for Travis now...


---


[jira] [Commented] (FLINK-7400) off-heap limits set too conservatively in cluster environments

2017-11-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7400?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16233813#comment-16233813
 ] 

ASF GitHub Bot commented on FLINK-7400:
---

Github user NicoK commented on the issue:

https://github.com/apache/flink/pull/4506
  
rebased successfully, waiting for Travis now...


> off-heap limits set too conservatively in cluster environments
> -
>
> Key: FLINK-7400
> URL: https://issues.apache.org/jira/browse/FLINK-7400
> Project: Flink
>  Issue Type: Bug
>  Components: Cluster Management, Mesos, YARN
>Affects Versions: 1.3.0, 1.3.1, 1.4.0, 1.3.2
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Blocker
>
> Inside {{ContaineredTaskManagerParameters}}, since FLINK-6217, the 
> {{offHeapSize}} is set to the amount of memory Flink itself will use off-heap, 
> which is then used as the value for {{-XX:MaxDirectMemorySize}} in various 
> cases. This does not account for any off-heap use by components other than 
> Flink, e.g. RocksDB, other libraries, or the JVM itself.
> We should add the {{cutoff}} from the {{CONTAINERIZED_HEAP_CUTOFF_RATIO}} 
> configuration parameter to {{offHeapSize}}, as implied by the description of 
> what this parameter is there for.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4916: [FLINK-7153] Re-introduce preferred locations for ...

2017-11-01 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/4916#discussion_r148211942
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 ---
@@ -878,113 +880,70 @@ private void scheduleEager(SlotProvider 
slotProvider, final Time timeout) {
// that way we do not have any operation that can fail between 
allocating the slots
// and adding them to the list. If we had a failure in between 
there, that would
// cause the slots to get lost
-   final ArrayList resources = new 
ArrayList<>(getNumberOfExecutionJobVertices());
final boolean queued = allowQueuedScheduling;
 
-   // we use this flag to handle failures in a 'finally' clause
-   // that allows us to not go through clumsy cast-and-rethrow 
logic
-   boolean successful = false;
+   // collecting all the slots may resize and fail in that 
operation without slots getting lost
+   final ArrayList> 
allAllocationFutures = new ArrayList<>(getNumberOfExecutionJobVertices());
 
-   try {
-   // collecting all the slots may resize and fail in that 
operation without slots getting lost
-   final ArrayList> 
slotFutures = new ArrayList<>(getNumberOfExecutionJobVertices());
+   // allocate the slots (obtain all their futures
+   for (ExecutionJobVertex ejv : getVerticesTopologically()) {
+   // these calls are not blocking, they only return 
futures
--- End diff --

If we switch to state locations, then we can't allocate resources in 
topological order, because stateless vertices may share the same 
SlotSharingGroup with stateful vertices; if the stateless vertices are 
allocated before the stateful ones, the result can be bad. An intuitive way to 
handle this is to allocate resources to the stateful vertices first.


---


[GitHub] flink pull request #4592: [FLINK-7515][network] allow actual 0-length conten...

2017-11-01 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4592#discussion_r148212023
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
 ---
@@ -64,12 +65,53 @@
 
// 

 
+   /**
+* Allocates a new (header and contents) buffer and adds some header 
information for the frame
+* decoder.
+*
+* Before sending the buffer, you must write the actual length after 
adding the contents as
+* an integer to position 0!
+*
+* @param allocator
+*  byte buffer allocator to use
+* @param id
+*  {@link NettyMessage} subclass ID
+*
+* @return a newly allocated direct buffer with header data written for 
{@link
+* NettyMessageDecoder}
+*/
private static ByteBuf allocateBuffer(ByteBufAllocator allocator, byte 
id) {
-   return allocateBuffer(allocator, id, 0);
+   return allocateBuffer(allocator, id, -1);
}
 
+   /**
+* Allocates a new (header and contents) buffer and adds some header 
information for the frame
+* decoder.
+*
+* If the length is unknown, you must write the actual 
length after adding the
+* contents as an integer to position 0!
+*
+* @param allocator
+*  byte buffer allocator to use
+* @param id
+*  {@link NettyMessage} subclass ID
+* @param length
+*  content length (or -1 if unknown)
+*
+* @return a newly allocated direct buffer with header data written for 
{@link
+* NettyMessageDecoder}
+*/
private static ByteBuf allocateBuffer(ByteBufAllocator allocator, byte 
id, int length) {
-   final ByteBuf buffer = length != 0 ? 
allocator.directBuffer(HEADER_LENGTH + length) : allocator.directBuffer();
+   Preconditions.checkArgument(length <= Integer.MAX_VALUE - 
HEADER_LENGTH);
--- End diff --

why not...


---


[jira] [Commented] (FLINK-7153) Eager Scheduling can't allocate source for ExecutionGraph correctly

2017-11-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7153?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16233815#comment-16233815
 ] 

ASF GitHub Bot commented on FLINK-7153:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/4916#discussion_r148211942
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 ---
@@ -878,113 +880,70 @@ private void scheduleEager(SlotProvider 
slotProvider, final Time timeout) {
// that way we do not have any operation that can fail between 
allocating the slots
// and adding them to the list. If we had a failure in between 
there, that would
// cause the slots to get lost
-   final ArrayList resources = new 
ArrayList<>(getNumberOfExecutionJobVertices());
final boolean queued = allowQueuedScheduling;
 
-   // we use this flag to handle failures in a 'finally' clause
-   // that allows us to not go through clumsy cast-and-rethrow 
logic
-   boolean successful = false;
+   // collecting all the slots may resize and fail in that 
operation without slots getting lost
+   final ArrayList> 
allAllocationFutures = new ArrayList<>(getNumberOfExecutionJobVertices());
 
-   try {
-   // collecting all the slots may resize and fail in that 
operation without slots getting lost
-   final ArrayList> 
slotFutures = new ArrayList<>(getNumberOfExecutionJobVertices());
+   // allocate the slots (obtain all their futures
+   for (ExecutionJobVertex ejv : getVerticesTopologically()) {
+   // these calls are not blocking, they only return 
futures
--- End diff --

If we switch to state locations, then we can't allocate resources in 
topological order, because stateless vertices may share the same 
SlotSharingGroup with stateful vertices; if the stateless vertices are 
allocated before the stateful ones, the result can be bad. An intuitive way to 
handle this is to allocate resources to the stateful vertices first.


> Eager Scheduling can't allocate source for ExecutionGraph correctly
> ---
>
> Key: FLINK-7153
> URL: https://issues.apache.org/jira/browse/FLINK-7153
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager
>Affects Versions: 1.3.1
>Reporter: Sihua Zhou
>Assignee: Till Rohrmann
>Priority: Critical
> Fix For: 1.4.0, 1.3.3
>
>
> The ExecutionGraph.scheduleEager() function allocate for ExecutionJobVertex 
> one by one via calling ExecutionJobVertex.allocateResourcesForAll(), here is 
> two problem about it:
> 1. The ExecutionVertex.getPreferredLocationsBasedOnInputs will always return 
> empty, cause `sourceSlot` always be null until `ExectionVertex` has been 
> deployed via 'Execution.deployToSlot()'. So allocate resource base on 
> prefered location can't work correctly, we need to set the slot info for 
> `Execution` as soon as Execution.allocateSlotForExecution() called 
> successfully?
> 2. Current allocate strategy can't allocate the slot optimize.  Here is the 
> test case:
> {code}
> JobVertex v1 = new JobVertex("v1", jid1);
> JobVertex v2 = new JobVertex("v2", jid2);
> SlotSharingGroup group = new SlotSharingGroup();
> v1.setSlotSharingGroup(group);
> v2.setSlotSharingGroup(group);
> v1.setParallelism(2);
> v2.setParallelism(4);
> v1.setInvokableClass(BatchTask.class);
> v2.setInvokableClass(BatchTask.class);
> v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE, 
> ResultPartitionType.PIPELINED_BOUNDED);
> {code}
> Currently, after allocate for v1,v2, we got a local partition and three 
> remote partition. But actually, it should be 2 local partition and 2 remote 
> partition. 
> The causes of the above problems is becuase that the current allocate 
> strategy is allocate the resource for execution one by one(if the execution 
> can allocate from SlotGroup than get it, Otherwise ask for a new one for it). 
> If we change the allocate strategy to two step will solve this problem, below 
> is the Pseudo code:
> {code}
> for (ExecutionJobVertex ejv: getVerticesTopologically) {
> //step 1: try to allocate from SlothGroup base on inputs one by one (which 
> only allocate resource base on location).
> //step 2: allocate for the remain execution.
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7515) allow actual 0-length content in NettyMessage#allocateBuffer()

2017-11-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7515?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16233816#comment-16233816
 ] 

ASF GitHub Bot commented on FLINK-7515:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4592#discussion_r148212023
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
 ---
@@ -64,12 +65,53 @@
 
// 

 
+   /**
+* Allocates a new (header and contents) buffer and adds some header 
information for the frame
+* decoder.
+*
+* Before sending the buffer, you must write the actual length after 
adding the contents as
+* an integer to position 0!
+*
+* @param allocator
+*  byte buffer allocator to use
+* @param id
+*  {@link NettyMessage} subclass ID
+*
+* @return a newly allocated direct buffer with header data written for 
{@link
+* NettyMessageDecoder}
+*/
private static ByteBuf allocateBuffer(ByteBufAllocator allocator, byte 
id) {
-   return allocateBuffer(allocator, id, 0);
+   return allocateBuffer(allocator, id, -1);
}
 
+   /**
+* Allocates a new (header and contents) buffer and adds some header 
information for the frame
+* decoder.
+*
+* If the length is unknown, you must write the actual 
length after adding the
+* contents as an integer to position 0!
+*
+* @param allocator
+*  byte buffer allocator to use
+* @param id
+*  {@link NettyMessage} subclass ID
+* @param length
+*  content length (or -1 if unknown)
+*
+* @return a newly allocated direct buffer with header data written for 
{@link
+* NettyMessageDecoder}
+*/
private static ByteBuf allocateBuffer(ByteBufAllocator allocator, byte 
id, int length) {
-   final ByteBuf buffer = length != 0 ? 
allocator.directBuffer(HEADER_LENGTH + length) : allocator.directBuffer();
+   Preconditions.checkArgument(length <= Integer.MAX_VALUE - 
HEADER_LENGTH);
--- End diff --

why not...


> allow actual 0-length content in NettyMessage#allocateBuffer()
> --
>
> Key: FLINK-7515
> URL: https://issues.apache.org/jira/browse/FLINK-7515
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Affects Versions: 1.4.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Minor
>
> Previously, length {{0}} meant "unknown content length", but there are cases 
> where the actual length is 0 and we do not need a larger buffer. Let's use 
> {{-1}} to tag the special case instead.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
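The -1 sentinel discussed in FLINK-7515 can be sketched with plain `java.nio`. The header length and fallback capacity below are illustrative assumptions, not Flink's actual constants:

```java
import java.nio.ByteBuffer;

public class AllocateBufferSketch {

    // Illustrative values, not Flink's actual constants.
    static final int HEADER_LENGTH = 9;
    static final int DEFAULT_CAPACITY = 1024;

    // Sketch of the change discussed above: -1 (instead of 0) marks an
    // unknown content length, so an actual 0-length content can get an
    // exactly-sized buffer.
    static ByteBuffer allocateBuffer(int contentLength) {
        if (contentLength < -1 || contentLength > Integer.MAX_VALUE - HEADER_LENGTH) {
            throw new IllegalArgumentException("invalid content length: " + contentLength);
        }
        return contentLength != -1
                ? ByteBuffer.allocate(HEADER_LENGTH + contentLength) // known length, exact size
                : ByteBuffer.allocate(DEFAULT_CAPACITY);             // unknown, use a default
    }

    public static void main(String[] args) {
        System.out.println(allocateBuffer(0).capacity());  // prints 9 (header only)
        System.out.println(allocateBuffer(-1).capacity()); // prints 1024
    }
}
```

With 0 as the sentinel, a genuinely empty payload would have been forced onto the default-sized buffer; the -1 sentinel keeps the two cases distinct.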


[GitHub] flink pull request #4919: [FLINK-7902] Use TypeSerializer in TwoPhaseCommitS...

2017-11-01 Thread aljoscha
Github user aljoscha closed the pull request at:

https://github.com/apache/flink/pull/4919


---


[jira] [Commented] (FLINK-7902) TwoPhaseCommitSinkFunctions should use custom TypeSerializer

2017-11-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7902?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16233824#comment-16233824
 ] 

ASF GitHub Bot commented on FLINK-7902:
---

Github user aljoscha closed the pull request at:

https://github.com/apache/flink/pull/4919


> TwoPhaseCommitSinkFunctions should use custom TypeSerializer
> 
>
> Key: FLINK-7902
> URL: https://issues.apache.org/jira/browse/FLINK-7902
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.4.0
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>Priority: Blocker
> Fix For: 1.4.0
>
>
> Currently, the {{FlinkKafkaProducer011}} uses {{TypeInformation.of(new 
> TypeHint<State<TXN, CONTEXT>>() {})}} to 
> create a {{TypeInformation}} which in turn is used to create a 
> {{StateDescriptor}} for the state that the Kafka sink stores.
> Behind the scenes, this would be roughly analysed as a 
> {{PojoType(GenericType<TXN>, 
> GenericType<CONTEXT>)}} which means we don't have explicit 
> control over the serialisation format and we also use Kryo (which is the 
> default for {{GenericTypeInfo}}). This can be problematic if we want to 
> evolve the state schema in the future or if we want to change Kryo versions.
> We should change {{TwoPhaseCommitSinkFunction}} to only have this constructor:
> {code}
> public TwoPhaseCommitSinkFunction(TypeSerializer<State<TXN, CONTEXT>> 
> stateSerializer) {
> {code}
> and we should then change the {{FlinkKafkaProducer011}} to hand in a 
> custom-made {{TypeSerializer}} for the state.
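The motivation here — explicit control over the serialisation format instead of a reflectively derived, Kryo-backed one — can be illustrated with a minimal hand-written binary format. `TxnState` and its fields are hypothetical stand-ins, not the actual Kafka sink state type:

```java
import java.io.*;

/** Minimal hand-written state format with an explicit version number,
 *  illustrating why an explicit serializer is easier to evolve than Kryo. */
public class ExplicitStateFormat {
    static final class TxnState {
        final String transactionId;
        final long pendingCount;
        TxnState(String transactionId, long pendingCount) {
            this.transactionId = transactionId;
            this.pendingCount = pendingCount;
        }
    }

    static byte[] serialize(TxnState s) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        out.writeInt(1);              // explicit format version: the hook for schema evolution
        out.writeUTF(s.transactionId);
        out.writeLong(s.pendingCount);
        return bytes.toByteArray();
    }

    static TxnState deserialize(byte[] data) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
        int version = in.readInt();
        if (version != 1) {
            throw new IOException("unknown state format version " + version);
        }
        return new TxnState(in.readUTF(), in.readLong());
    }
}
```

Because the wire format is spelled out by hand, upgrading the Kryo dependency (or adding a field under a new version number) cannot silently break restored state.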





[jira] [Closed] (FLINK-7902) TwoPhaseCommitSinkFunctions should use custom TypeSerializer

2017-11-01 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7902?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek closed FLINK-7902.
---
Resolution: Fixed

Fixed in 0ba528c71e35858a043bd513ead37800262f7e0c

> TwoPhaseCommitSinkFunctions should use custom TypeSerializer
> 
>
> Key: FLINK-7902
> URL: https://issues.apache.org/jira/browse/FLINK-7902
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.4.0
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>Priority: Blocker
> Fix For: 1.4.0
>
>
> Currently, the {{FlinkKafkaProducer011}} uses {{TypeInformation.of(new 
> TypeHint<State<TXN, CONTEXT>>() {})}} to 
> create a {{TypeInformation}} which in turn is used to create a 
> {{StateDescriptor}} for the state that the Kafka sink stores.
> Behind the scenes, this would be roughly analysed as a 
> {{PojoType(GenericType<TXN>, 
> GenericType<CONTEXT>)}} which means we don't have explicit 
> control over the serialisation format and we also use Kryo (which is the 
> default for {{GenericTypeInfo}}). This can be problematic if we want to 
> evolve the state schema in the future or if we want to change Kryo versions.
> We should change {{TwoPhaseCommitSinkFunction}} to only have this constructor:
> {code}
> public TwoPhaseCommitSinkFunction(TypeSerializer<State<TXN, CONTEXT>> 
> stateSerializer) {
> {code}
> and we should then change the {{FlinkKafkaProducer011}} to hand in a 
> custom-made {{TypeSerializer}} for the state.





[GitHub] flink issue #4919: [FLINK-7902] Use TypeSerializer in TwoPhaseCommitSinkFunc...

2017-11-01 Thread aljoscha
Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/4919
  
Thanks for reviewing, @kl0u! 😃 



---


[jira] [Commented] (FLINK-7902) TwoPhaseCommitSinkFunctions should use custom TypeSerializer

2017-11-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7902?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16233826#comment-16233826
 ] 

ASF GitHub Bot commented on FLINK-7902:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/4919
  
Thanks for reviewing, @kl0u! 😃 



> TwoPhaseCommitSinkFunctions should use custom TypeSerializer
> 
>
> Key: FLINK-7902
> URL: https://issues.apache.org/jira/browse/FLINK-7902
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.4.0
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>Priority: Blocker
> Fix For: 1.4.0
>
>
> Currently, the {{FlinkKafkaProducer011}} uses {{TypeInformation.of(new 
> TypeHint<State<TXN, CONTEXT>>() {})}} to 
> create a {{TypeInformation}} which in turn is used to create a 
> {{StateDescriptor}} for the state that the Kafka sink stores.
> Behind the scenes, this would be roughly analysed as a 
> {{PojoType(GenericType<TXN>, 
> GenericType<CONTEXT>)}} which means we don't have explicit 
> control over the serialisation format and we also use Kryo (which is the 
> default for {{GenericTypeInfo}}). This can be problematic if we want to 
> evolve the state schema in the future or if we want to change Kryo versions.
> We should change {{TwoPhaseCommitSinkFunction}} to only have this constructor:
> {code}
> public TwoPhaseCommitSinkFunction(TypeSerializer<State<TXN, CONTEXT>> 
> stateSerializer) {
> {code}
> and we should then change the {{FlinkKafkaProducer011}} to hand in a 
> custom-made {{TypeSerializer}} for the state.





[jira] [Created] (FLINK-7955) Rowtime attribute cannot be aggregated in group window of stream Table API

2017-11-01 Thread Fabian Hueske (JIRA)
Fabian Hueske created FLINK-7955:


 Summary: Rowtime attribute cannot be aggregated in group window of 
stream Table API
 Key: FLINK-7955
 URL: https://issues.apache.org/jira/browse/FLINK-7955
 Project: Flink
  Issue Type: Bug
  Components: Table API & SQL
Affects Versions: 1.3.2, 1.4.0
Reporter: Fabian Hueske


The following query fails with a {{NullPointerException}}:

{code}
val windowedTable = table
  .window(Tumble over 5.milli on 'rowtime as 'w)
  .groupBy('w, 'word)
  .select('word, 'rowtime.count, 'w.start, 'w.end)
{code}

Equivalent SQL queries or Table API queries that use a processing time 
attribute are working.





[GitHub] flink pull request #4916: [FLINK-7153] Re-introduce preferred locations for ...

2017-11-01 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4916#discussion_r148216607
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 ---
@@ -878,113 +880,70 @@ private void scheduleEager(SlotProvider 
slotProvider, final Time timeout) {
// that way we do not have any operation that can fail between 
allocating the slots
// and adding them to the list. If we had a failure in between 
there, that would
// cause the slots to get lost
-   final ArrayList resources = new 
ArrayList<>(getNumberOfExecutionJobVertices());
final boolean queued = allowQueuedScheduling;
 
-   // we use this flag to handle failures in a 'finally' clause
-   // that allows us to not go through clumsy cast-and-rethrow 
logic
-   boolean successful = false;
+   // collecting all the slots may resize and fail in that 
operation without slots getting lost
+   final ArrayList> 
allAllocationFutures = new ArrayList<>(getNumberOfExecutionJobVertices());
 
-   try {
-   // collecting all the slots may resize and fail in that 
operation without slots getting lost
-   final ArrayList> 
slotFutures = new ArrayList<>(getNumberOfExecutionJobVertices());
+   // allocate the slots (obtain all their futures
+   for (ExecutionJobVertex ejv : getVerticesTopologically()) {
+   // these calls are not blocking, they only return 
futures
--- End diff --

In that case, it depends a bit on how strongly the scheduler weighs the state 
location preference. If it is implemented such that tasks are strictly scheduled to 
their previous state locations, then these tasks may not end up 
in the same slot as other tasks with which they previously shared a slot.


---


[jira] [Commented] (FLINK-7153) Eager Scheduling can't allocate source for ExecutionGraph correctly

2017-11-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7153?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16233845#comment-16233845
 ] 

ASF GitHub Bot commented on FLINK-7153:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4916#discussion_r148216607
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 ---
@@ -878,113 +880,70 @@ private void scheduleEager(SlotProvider 
slotProvider, final Time timeout) {
// that way we do not have any operation that can fail between 
allocating the slots
// and adding them to the list. If we had a failure in between 
there, that would
// cause the slots to get lost
-   final ArrayList resources = new 
ArrayList<>(getNumberOfExecutionJobVertices());
final boolean queued = allowQueuedScheduling;
 
-   // we use this flag to handle failures in a 'finally' clause
-   // that allows us to not go through clumsy cast-and-rethrow 
logic
-   boolean successful = false;
+   // collecting all the slots may resize and fail in that 
operation without slots getting lost
+   final ArrayList> 
allAllocationFutures = new ArrayList<>(getNumberOfExecutionJobVertices());
 
-   try {
-   // collecting all the slots may resize and fail in that 
operation without slots getting lost
-   final ArrayList> 
slotFutures = new ArrayList<>(getNumberOfExecutionJobVertices());
+   // allocate the slots (obtain all their futures
+   for (ExecutionJobVertex ejv : getVerticesTopologically()) {
+   // these calls are not blocking, they only return 
futures
--- End diff --

In that case, it depends a bit on how strongly the scheduler weighs the state 
location preference. If it is implemented such that tasks are strictly scheduled to 
their previous state locations, then these tasks may not end up 
in the same slot as other tasks with which they previously shared a slot.


> Eager Scheduling can't allocate source for ExecutionGraph correctly
> ---
>
> Key: FLINK-7153
> URL: https://issues.apache.org/jira/browse/FLINK-7153
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager
>Affects Versions: 1.3.1
>Reporter: Sihua Zhou
>Assignee: Till Rohrmann
>Priority: Critical
> Fix For: 1.4.0, 1.3.3
>
>
> The ExecutionGraph.scheduleEager() function allocates resources for the 
> ExecutionJobVertices one by one via ExecutionJobVertex.allocateResourcesForAll(). There 
> are two problems with this:
> 1. ExecutionVertex.getPreferredLocationsBasedOnInputs will always return an 
> empty list, because `sourceSlot` is always null until the `ExecutionVertex` has been 
> deployed via 'Execution.deployToSlot()'. So allocating resources based on 
> preferred locations can't work correctly; we need to set the slot info for the 
> `Execution` as soon as Execution.allocateSlotForExecution() has been called 
> successfully.
> 2. The current allocation strategy can't allocate slots optimally. Here is the 
> test case:
> {code}
> JobVertex v1 = new JobVertex("v1", jid1);
> JobVertex v2 = new JobVertex("v2", jid2);
> SlotSharingGroup group = new SlotSharingGroup();
> v1.setSlotSharingGroup(group);
> v2.setSlotSharingGroup(group);
> v1.setParallelism(2);
> v2.setParallelism(4);
> v1.setInvokableClass(BatchTask.class);
> v2.setInvokableClass(BatchTask.class);
> v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE, 
> ResultPartitionType.PIPELINED_BOUNDED);
> {code}
> Currently, after allocating for v1 and v2, we get one local partition and three 
> remote partitions. But actually, it should be 2 local partitions and 2 remote 
> partitions. 
> The cause of the above problems is that the current allocation 
> strategy allocates resources for the executions one by one (if the execution 
> can allocate from the SlotSharingGroup then it takes that slot, otherwise it asks for a new one). 
> If we change the allocation strategy to two steps, this problem is solved; below 
> is the pseudo code:
> {code}
> for (ExecutionJobVertex ejv: getVerticesTopologically) {
> //step 1: try to allocate from the SlotSharingGroup based on inputs, one by one (which 
> only allocates resources based on location).
> //step 2: allocate for the remaining executions.
> }
> {code}
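The two-step strategy in the pseudo code above can be sketched over a toy model. The slot ids, preference arrays, and method names here are illustrative assumptions, not Flink's actual scheduling API:

```java
import java.util.Arrays;

/** Toy model of the proposed two-step allocation: first honor location
 *  preferences, then hand out whatever slots remain. */
public class TwoStepAllocation {
    /**
     * @param preferred task index -> preferred slot id, or null if no preference
     * @param numSlots  total number of slots available
     * @return task index -> assigned slot id
     */
    static int[] allocate(Integer[] preferred, int numSlots) {
        int[] assignment = new int[preferred.length];
        boolean[] taken = new boolean[numSlots];
        Arrays.fill(assignment, -1);
        // step 1: allocate based on location preference only
        for (int task = 0; task < preferred.length; task++) {
            Integer want = preferred[task];
            if (want != null && !taken[want]) {
                assignment[task] = want;
                taken[want] = true;
            }
        }
        // step 2: allocate the remaining tasks to any free slot
        for (int task = 0; task < preferred.length; task++) {
            if (assignment[task] == -1) {
                for (int slot = 0; slot < numSlots; slot++) {
                    if (!taken[slot]) {
                        assignment[task] = slot;
                        taken[slot] = true;
                        break;
                    }
                }
            }
        }
        return assignment;
    }
}
```

Because tasks with a preference are placed before tasks without one, a task with no preference can no longer steal a slot that another task explicitly wanted.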





[GitHub] flink pull request #4916: [FLINK-7153] Re-introduce preferred locations for ...

2017-11-01 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4916#discussion_r148216894
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 ---
@@ -878,113 +880,70 @@ private void scheduleEager(SlotProvider 
slotProvider, final Time timeout) {
// that way we do not have any operation that can fail between 
allocating the slots
// and adding them to the list. If we had a failure in between 
there, that would
// cause the slots to get lost
-   final ArrayList resources = new 
ArrayList<>(getNumberOfExecutionJobVertices());
final boolean queued = allowQueuedScheduling;
 
-   // we use this flag to handle failures in a 'finally' clause
-   // that allows us to not go through clumsy cast-and-rethrow 
logic
-   boolean successful = false;
+   // collecting all the slots may resize and fail in that 
operation without slots getting lost
+   final ArrayList> 
allAllocationFutures = new ArrayList<>(getNumberOfExecutionJobVertices());
 
-   try {
-   // collecting all the slots may resize and fail in that 
operation without slots getting lost
-   final ArrayList> 
slotFutures = new ArrayList<>(getNumberOfExecutionJobVertices());
+   // allocate the slots (obtain all their futures
+   for (ExecutionJobVertex ejv : getVerticesTopologically()) {
+   // these calls are not blocking, they only return 
futures
--- End diff --

But be aware that the `allocateAndAssign` call is non-blocking and the 
actual order depends on the preferred locations futures.


---


[jira] [Commented] (FLINK-7153) Eager Scheduling can't allocate source for ExecutionGraph correctly

2017-11-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7153?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16233848#comment-16233848
 ] 

ASF GitHub Bot commented on FLINK-7153:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4916#discussion_r148216894
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 ---
@@ -878,113 +880,70 @@ private void scheduleEager(SlotProvider 
slotProvider, final Time timeout) {
// that way we do not have any operation that can fail between 
allocating the slots
// and adding them to the list. If we had a failure in between 
there, that would
// cause the slots to get lost
-   final ArrayList resources = new 
ArrayList<>(getNumberOfExecutionJobVertices());
final boolean queued = allowQueuedScheduling;
 
-   // we use this flag to handle failures in a 'finally' clause
-   // that allows us to not go through clumsy cast-and-rethrow 
logic
-   boolean successful = false;
+   // collecting all the slots may resize and fail in that 
operation without slots getting lost
+   final ArrayList> 
allAllocationFutures = new ArrayList<>(getNumberOfExecutionJobVertices());
 
-   try {
-   // collecting all the slots may resize and fail in that 
operation without slots getting lost
-   final ArrayList> 
slotFutures = new ArrayList<>(getNumberOfExecutionJobVertices());
+   // allocate the slots (obtain all their futures
+   for (ExecutionJobVertex ejv : getVerticesTopologically()) {
+   // these calls are not blocking, they only return 
futures
--- End diff --

But be aware that the `allocateAndAssign` call is non-blocking and the 
actual order depends on the preferred locations futures.


> Eager Scheduling can't allocate source for ExecutionGraph correctly
> ---
>
> Key: FLINK-7153
> URL: https://issues.apache.org/jira/browse/FLINK-7153
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager
>Affects Versions: 1.3.1
>Reporter: Sihua Zhou
>Assignee: Till Rohrmann
>Priority: Critical
> Fix For: 1.4.0, 1.3.3
>
>
> The ExecutionGraph.scheduleEager() function allocates resources for the 
> ExecutionJobVertices one by one via ExecutionJobVertex.allocateResourcesForAll(). There 
> are two problems with this:
> 1. ExecutionVertex.getPreferredLocationsBasedOnInputs will always return an 
> empty list, because `sourceSlot` is always null until the `ExecutionVertex` has been 
> deployed via 'Execution.deployToSlot()'. So allocating resources based on 
> preferred locations can't work correctly; we need to set the slot info for the 
> `Execution` as soon as Execution.allocateSlotForExecution() has been called 
> successfully.
> 2. The current allocation strategy can't allocate slots optimally. Here is the 
> test case:
> {code}
> JobVertex v1 = new JobVertex("v1", jid1);
> JobVertex v2 = new JobVertex("v2", jid2);
> SlotSharingGroup group = new SlotSharingGroup();
> v1.setSlotSharingGroup(group);
> v2.setSlotSharingGroup(group);
> v1.setParallelism(2);
> v2.setParallelism(4);
> v1.setInvokableClass(BatchTask.class);
> v2.setInvokableClass(BatchTask.class);
> v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE, 
> ResultPartitionType.PIPELINED_BOUNDED);
> {code}
> Currently, after allocating for v1 and v2, we get one local partition and three 
> remote partitions. But actually, it should be 2 local partitions and 2 remote 
> partitions. 
> The cause of the above problems is that the current allocation 
> strategy allocates resources for the executions one by one (if the execution 
> can allocate from the SlotSharingGroup then it takes that slot, otherwise it asks for a new one). 
> If we change the allocation strategy to two steps, this problem is solved; below 
> is the pseudo code:
> {code}
> for (ExecutionJobVertex ejv: getVerticesTopologically) {
> //step 1: try to allocate from the SlotSharingGroup based on inputs, one by one (which 
> only allocates resources based on location).
> //step 2: allocate for the remaining executions.
> }
> {code}





[GitHub] flink issue #4926: [FLINK-7951] Load YarnConfiguration with default Hadoop c...

2017-11-01 Thread aljoscha
Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/4926
  
The changes look good but let's see what you find regarding 
`yarn-site.xml`. 👍 


---


[jira] [Commented] (FLINK-7951) YarnApplicationMaster does not load HDFSConfiguration

2017-11-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7951?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16233853#comment-16233853
 ] 

ASF GitHub Bot commented on FLINK-7951:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/4926
  
The changes look good but let's see what you find regarding 
`yarn-site.xml`. 👍 


> YarnApplicationMaster does not load HDFSConfiguration
> -
>
> Key: FLINK-7951
> URL: https://issues.apache.org/jira/browse/FLINK-7951
> Project: Flink
>  Issue Type: Bug
>  Components: YARN
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Critical
> Fix For: 1.4.0
>
>
> When instantiating the {{YarnConfiguration}} we do not load the corresponding 
> {{HDFSConfiguration}}. This causes that we do not read the {{hdfs-site.xml}}.





[jira] [Closed] (FLINK-7953) Kafka consumer printing error - java.lang.IllegalArgumentException: Invalid offset: -915623761772

2017-11-01 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7953?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek closed FLINK-7953.
---
Resolution: Duplicate

Duplicate of FLINK-7732

> Kafka consumer printing error - java.lang.IllegalArgumentException: Invalid 
> offset: -915623761772
> -
>
> Key: FLINK-7953
> URL: https://issues.apache.org/jira/browse/FLINK-7953
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.3.2
> Environment: kafka 10.0, yarn
>Reporter: Shashank Agarwal
>Priority: Minor
>
> As it's printed as a warning and does not impact the running program, I have 
> marked it as minor. 
> {code}
> 2017-10-31 19:26:09,218 WARN  
> org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher  - 
> Committing offsets to Kafka failed. This does not compromise Flink's 
> checkpoints.
> java.lang.IllegalArgumentException: Invalid offset: -915623761772
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:687)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.doCommitOffsetsAsync(ConsumerCoordinator.java:531)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsAsync(ConsumerCoordinator.java:499)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.commitAsync(KafkaConsumer.java:1181)
>   at 
> org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:223)
> 2017-10-31 19:26:09,223 ERROR 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  - Async 
> Kafka commit failed.
> java.lang.IllegalArgumentException: Invalid offset: -915623761772
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:687)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.doCommitOffsetsAsync(ConsumerCoordinator.java:531)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsAsync(ConsumerCoordinator.java:499)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.commitAsync(KafkaConsumer.java:1181)
>   at 
> org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:223)
> {code}





[jira] [Commented] (FLINK-4228) YARN artifact upload does not work with S3AFileSystem

2017-11-01 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16233867#comment-16233867
 ] 

Aljoscha Krettek commented on FLINK-4228:
-

Thanks, [~NicoK] for finding this! I think we should downgrade this to 
"Critical" since it's a very specific problem and we can provide a fix for this 
in a bug fix release? Unless you know a very quick fix for this?

> YARN artifact upload does not work with S3AFileSystem
> -
>
> Key: FLINK-4228
> URL: https://issues.apache.org/jira/browse/FLINK-4228
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Reporter: Ufuk Celebi
>Priority: Blocker
> Fix For: 1.4.0
>
>
> The issue is now exclusive to running on YARN with s3a:// as the configured 
> FileSystem. In that case, the Flink session fails while staging itself because it 
> tries to copy the flink/lib directory to S3 and the S3AFileSystem does not 
> support recursive copy.
> h2. Old Issue
> Using the {{RocksDBStateBackend}} with semi-async snapshots (current default) 
> leads to an Exception when uploading the snapshot to S3 when using the 
> {{S3AFileSystem}}.
> {code}
> AsynchronousException{com.amazonaws.AmazonClientException: Unable to 
> calculate MD5 hash: 
> /var/folders/_c/5tc5q5q55qjcjtqwlwvwd1m0gn/T/flink-io-5640e9f1-3ea4-4a0f-b4d9-3ce9fbd98d8a/7c6e745df2dddc6eb70def1240779e44/StreamFlatMap_3_0/dummy_state/47daaf2a-150c-4208-aa4b-409927e9e5b7/local-chk-2886
>  (Is a directory)}
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointThread.run(StreamTask.java:870)
> Caused by: com.amazonaws.AmazonClientException: Unable to calculate MD5 hash: 
> /var/folders/_c/5tc5q5q55qjcjtqwlwvwd1m0gn/T/flink-io-5640e9f1-3ea4-4a0f-b4d9-3ce9fbd98d8a/7c6e745df2dddc6eb70def1240779e44/StreamFlatMap_3_0/dummy_state/47daaf2a-150c-4208-aa4b-409927e9e5b7/local-chk-2886
>  (Is a directory)
>   at 
> com.amazonaws.services.s3.AmazonS3Client.putObject(AmazonS3Client.java:1298)
>   at 
> com.amazonaws.services.s3.transfer.internal.UploadCallable.uploadInOneChunk(UploadCallable.java:108)
>   at 
> com.amazonaws.services.s3.transfer.internal.UploadCallable.call(UploadCallable.java:100)
>   at 
> com.amazonaws.services.s3.transfer.internal.UploadMonitor.upload(UploadMonitor.java:192)
>   at 
> com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:150)
>   at 
> com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:50)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.FileNotFoundException: 
> /var/folders/_c/5tc5q5q55qjcjtqwlwvwd1m0gn/T/flink-io-5640e9f1-3ea4-4a0f-b4d9-3ce9fbd98d8a/7c6e745df2dddc6eb70def1240779e44/StreamFlatMap_3_0/dummy_state/47daaf2a-150c-4208-aa4b-409927e9e5b7/local-chk-2886
>  (Is a directory)
>   at java.io.FileInputStream.open0(Native Method)
>   at java.io.FileInputStream.open(FileInputStream.java:195)
>   at java.io.FileInputStream.(FileInputStream.java:138)
>   at 
> com.amazonaws.services.s3.AmazonS3Client.putObject(AmazonS3Client.java:1294)
>   ... 9 more
> {code}
> Running with S3NFileSystem, the error does not occur. The problem might be 
> due to {{HDFSCopyToLocal}} assuming that sub-folders are going to be created 
> automatically. We might need to manually create folders and copy only actual 
> files for {{S3AFileSystem}}. More investigation is required.
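The suggested workaround — manually creating folders and copying only actual files — can be sketched with `java.nio.file`, using local paths as a stand-in for the HDFS/S3 APIs involved:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.stream.Stream;

/** Sketch of a manual recursive upload: walk the tree and copy plain files
 *  one at a time instead of relying on an unsupported recursive copy. */
public class RecursiveUpload {
    static void copyTree(Path source, Path target) throws IOException {
        try (Stream<Path> files = Files.walk(source)) {
            for (Path p : (Iterable<Path>) files::iterator) {
                Path dest = target.resolve(source.relativize(p).toString());
                if (Files.isDirectory(p)) {
                    // create the "folder" explicitly; S3 has no real directories,
                    // but this keeps the sketch correct for real filesystems
                    Files.createDirectories(dest);
                } else {
                    Files.createDirectories(dest.getParent());
                    Files.copy(p, dest, StandardCopyOption.REPLACE_EXISTING);
                }
            }
        }
    }
}
```

Against a real S3A target, each `Files.copy` would become a single-object upload, which sidesteps the missing recursive-copy support entirely.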





[GitHub] flink pull request #4928: [FLINK-7732][kafka-consumer] Do not commit to kafk...

2017-11-01 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4928#discussion_r148220943
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionState.java
 ---
@@ -95,6 +95,10 @@ public final boolean isOffsetDefined() {
return offset != 
KafkaTopicPartitionStateSentinel.OFFSET_NOT_SET;
}
 
+   public final boolean isSentinel() {
--- End diff --

In the new approach this method was dropped


---


[jira] [Commented] (FLINK-7732) Invalid offset to commit in Kafka

2017-11-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7732?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16233870#comment-16233870
 ] 

ASF GitHub Bot commented on FLINK-7732:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4928#discussion_r148220943
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionState.java
 ---
@@ -95,6 +95,10 @@ public final boolean isOffsetDefined() {
return offset != 
KafkaTopicPartitionStateSentinel.OFFSET_NOT_SET;
}
 
+   public final boolean isSentinel() {
--- End diff --

In the new approach this method was dropped


> Invalid offset to commit in Kafka
> -
>
> Key: FLINK-7732
> URL: https://issues.apache.org/jira/browse/FLINK-7732
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector, Tests
>Affects Versions: 1.4.0
>Reporter: Nico Kruber
>Assignee: Piotr Nowojski
>Priority: Blocker
>  Labels: test-stability
> Fix For: 1.4.0
>
>
> In a test run with unrelated changes in the network stack, the Kafa 
> end-to-end test was failing with an invalid offset:
> {code}
> 2017-09-28 06:34:10,736 INFO  org.apache.kafka.common.utils.AppInfoParser 
>   - Kafka version : 0.10.2.1
> 2017-09-28 06:34:10,744 INFO  org.apache.kafka.common.utils.AppInfoParser 
>   - Kafka commitId : e89bffd6b2eff799
> 2017-09-28 06:34:14,549 INFO  
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Discovered 
> coordinator testing-gce-f67551b8-8867-41c5-b36e-4196d0f11e30:9092 (id: 
> 2147483647 rack: null) for group myconsumer.
> 2017-09-28 06:34:14,573 INFO  
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Marking 
> the coordinator testing-gce-f67551b8-8867-41c5-b36e-4196d0f11e30:9092 (id: 
> 2147483647 rack: null) dead for group myconsumer
> 2017-09-28 06:34:14,686 INFO  
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Discovered 
> coordinator testing-gce-f67551b8-8867-41c5-b36e-4196d0f11e30:9092 (id: 
> 2147483647 rack: null) for group myconsumer.
> 2017-09-28 06:34:14,687 INFO  
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Marking 
> the coordinator testing-gce-f67551b8-8867-41c5-b36e-4196d0f11e30:9092 (id: 
> 2147483647 rack: null) dead for group myconsumer
> 2017-09-28 06:34:14,792 INFO  
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Discovered 
> coordinator testing-gce-f67551b8-8867-41c5-b36e-4196d0f11e30:9092 (id: 
> 2147483647 rack: null) for group myconsumer.
> 2017-09-28 06:34:15,068 INFO  
> org.apache.flink.runtime.state.DefaultOperatorStateBackend- 
> DefaultOperatorStateBackend snapshot (In-Memory Stream Factory, synchronous 
> part) in thread Thread[Async calls on Source: Custom Source -> Map -> Sink: 
> Unnamed (1/1),5,Flink Task Threads] took 948 ms.
> 2017-09-28 06:34:15,164 WARN  
> org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher  - 
> Committing offsets to Kafka failed. This does not compromise Flink's 
> checkpoints.
> java.lang.IllegalArgumentException: Invalid offset: -915623761772
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:687)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.doCommitOffsetsAsync(ConsumerCoordinator.java:531)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsAsync(ConsumerCoordinator.java:499)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.commitAsync(KafkaConsumer.java:1181)
>   at 
> org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:217)
> 2017-09-28 06:34:15,171 ERROR 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  - Async 
> Kafka commit failed.
> java.lang.IllegalArgumentException: Invalid offset: -915623761772
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:687)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.doCommitOffsetsAsync(ConsumerCoordinator.java:531)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsAsync(ConsumerCoordinator.java:499)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.commitAsync(KafkaConsumer.java:1181)
>   at 
> org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:217)
> {code}
> https://travis-ci.org/apache/flink/jobs/280722829
> [~pnowojski] did a first analysis that revealed this:
> In

[jira] [Commented] (FLINK-7732) Invalid offset to commit in Kafka

2017-11-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7732?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16233871#comment-16233871
 ] 

ASF GitHub Bot commented on FLINK-7732:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4928#discussion_r148221257
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateSentinel.java
 ---
@@ -52,4 +52,7 @@
 */
public static final long GROUP_OFFSET = -915623761773L;
 
+   public static boolean isSentinel(long offset) {
+   return offset < 0;
--- End diff --

Kafka doesn't allow committing any negative values.
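A minimal sketch of that invariant: sentinel offsets are negative (the `GROUP_OFFSET` value is taken from the quoted `KafkaTopicPartitionStateSentinel` diff), Kafka only accepts non-negative commits, so sentinel entries are dropped before committing. The map-based helper is an illustrative assumption, not the connector's actual API:

```java
import java.util.HashMap;
import java.util.Map;

/** Filters out sentinel offsets before an offset commit. */
public class SentinelFilter {
    /** Sentinel meaning "start from the consumer group's committed offset". */
    static final long GROUP_OFFSET = -915623761773L;

    static boolean isSentinel(long offset) {
        // Kafka does not allow committing any negative offset,
        // and all Flink sentinels are negative by construction
        return offset < 0;
    }

    /** Returns only the partitions whose offset is safe to commit. */
    static Map<String, Long> committableOffsets(Map<String, Long> current) {
        Map<String, Long> result = new HashMap<>();
        for (Map.Entry<String, Long> e : current.entrySet()) {
            if (!isSentinel(e.getValue())) {
                result.put(e.getKey(), e.getValue());
            }
        }
        return result;
    }
}
```

This is exactly why the `IllegalArgumentException: Invalid offset: -915623761772` above disappears: the sentinel never reaches `commitAsync`.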




[GitHub] flink issue #4835: [FLINK-7847][avro] Fix typo in jackson shading pattern

2017-11-01 Thread aljoscha
Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/4835
  
👍 😂 


---


[GitHub] flink issue #4928: [FLINK-7732][kafka-consumer] Do not commit to kafka Flink...

2017-11-01 Thread pnowojski
Github user pnowojski commented on the issue:

https://github.com/apache/flink/pull/4928
  
@tzulitai please check the details in the ticket: 
https://issues.apache.org/jira/browse/FLINK-7732

I have changed the approach as we discussed and now the filtering out 
happens just before committing offsets.


---
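The fix pnowojski describes above — dropping sentinel placeholders before the async commit — can be sketched as follows. This is a minimal standalone sketch, not Flink's actual implementation: the class name, the plain `String` partition key (instead of `KafkaTopicPartition`), and the method names are simplifications; only the `GROUP_OFFSET` constant is copied from the quoted diff.

```java
import java.util.HashMap;
import java.util.Map;

public class SentinelFilter {
    // Sentinel value from KafkaTopicPartitionStateSentinel, as quoted in the diff above.
    public static final long GROUP_OFFSET = -915623761773L;

    // Kafka rejects negative commit offsets, so any negative value must be a sentinel.
    public static boolean isSentinel(long offset) {
        return offset < 0;
    }

    // Drop entries whose offset is still a sentinel before handing them to commitAsync.
    public static Map<String, Long> filterOutSentinels(Map<String, Long> offsets) {
        Map<String, Long> filtered = new HashMap<>();
        for (Map.Entry<String, Long> e : offsets.entrySet()) {
            if (!isSentinel(e.getValue())) {
                filtered.put(e.getKey(), e.getValue());
            }
        }
        return filtered;
    }

    public static void main(String[] args) {
        Map<String, Long> offsets = new HashMap<>();
        offsets.put("topic-0", 42L);
        offsets.put("topic-1", GROUP_OFFSET); // snapshot taken before a real offset arrived
        Map<String, Long> safe = filterOutSentinels(offsets);
        System.out.println(safe.size()); // 1: only topic-0 survives
    }
}
```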


[jira] [Commented] (FLINK-7847) Fix typo in flink-avro shading pattern

2017-11-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7847?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16233873#comment-16233873
 ] 

ASF GitHub Bot commented on FLINK-7847:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/4835
  
👍 😂 


> Fix typo in flink-avro shading pattern
> --
>
> Key: FLINK-7847
> URL: https://issues.apache.org/jira/browse/FLINK-7847
> Project: Flink
>  Issue Type: Bug
>  Components: Build System
>Affects Versions: 1.4.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
> Fix For: 1.4.0
>
>
> {code}
> <relocation>
>   <pattern>org.codehaus.jackson</pattern>
>   <shadedPattern>org.apache.flink.avro.shaded.org.codehouse.jackson</shadedPattern>
> </relocation>
> {code}
> The shaded pattern should be 
> "org.apache.flink.avro.shaded.org.codehaus.jackson".



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7732) Invalid offset to commit in Kafka

2017-11-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7732?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16233872#comment-16233872
 ] 

ASF GitHub Bot commented on FLINK-7732:
---

Github user pnowojski commented on the issue:

https://github.com/apache/flink/pull/4928
  
@tzulitai please check the details in the ticket: 
https://issues.apache.org/jira/browse/FLINK-7732

I have changed the approach as we discussed and now the filtering out 
happens just before committing offsets.


> Invalid offset to commit in Kafka
> -
>
> Key: FLINK-7732
> URL: https://issues.apache.org/jira/browse/FLINK-7732
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector, Tests
>Affects Versions: 1.4.0
>Reporter: Nico Kruber
>Assignee: Piotr Nowojski
>Priority: Blocker
>  Labels: test-stability
> Fix For: 1.4.0
>
>
> In a test run with unrelated changes in the network stack, the Kafka 
> end-to-end test was failing with an invalid offset:
> {code}
> 2017-09-28 06:34:10,736 INFO  org.apache.kafka.common.utils.AppInfoParser 
>   - Kafka version : 0.10.2.1
> 2017-09-28 06:34:10,744 INFO  org.apache.kafka.common.utils.AppInfoParser 
>   - Kafka commitId : e89bffd6b2eff799
> 2017-09-28 06:34:14,549 INFO  
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Discovered 
> coordinator testing-gce-f67551b8-8867-41c5-b36e-4196d0f11e30:9092 (id: 
> 2147483647 rack: null) for group myconsumer.
> 2017-09-28 06:34:14,573 INFO  
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Marking 
> the coordinator testing-gce-f67551b8-8867-41c5-b36e-4196d0f11e30:9092 (id: 
> 2147483647 rack: null) dead for group myconsumer
> 2017-09-28 06:34:14,686 INFO  
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Discovered 
> coordinator testing-gce-f67551b8-8867-41c5-b36e-4196d0f11e30:9092 (id: 
> 2147483647 rack: null) for group myconsumer.
> 2017-09-28 06:34:14,687 INFO  
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Marking 
> the coordinator testing-gce-f67551b8-8867-41c5-b36e-4196d0f11e30:9092 (id: 
> 2147483647 rack: null) dead for group myconsumer
> 2017-09-28 06:34:14,792 INFO  
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Discovered 
> coordinator testing-gce-f67551b8-8867-41c5-b36e-4196d0f11e30:9092 (id: 
> 2147483647 rack: null) for group myconsumer.
> 2017-09-28 06:34:15,068 INFO  
> org.apache.flink.runtime.state.DefaultOperatorStateBackend- 
> DefaultOperatorStateBackend snapshot (In-Memory Stream Factory, synchronous 
> part) in thread Thread[Async calls on Source: Custom Source -> Map -> Sink: 
> Unnamed (1/1),5,Flink Task Threads] took 948 ms.
> 2017-09-28 06:34:15,164 WARN  
> org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher  - 
> Committing offsets to Kafka failed. This does not compromise Flink's 
> checkpoints.
> java.lang.IllegalArgumentException: Invalid offset: -915623761772
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:687)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.doCommitOffsetsAsync(ConsumerCoordinator.java:531)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsAsync(ConsumerCoordinator.java:499)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.commitAsync(KafkaConsumer.java:1181)
>   at 
> org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:217)
> 2017-09-28 06:34:15,171 ERROR 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  - Async 
> Kafka commit failed.
> java.lang.IllegalArgumentException: Invalid offset: -915623761772
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:687)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.doCommitOffsetsAsync(ConsumerCoordinator.java:531)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsAsync(ConsumerCoordinator.java:499)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.commitAsync(KafkaConsumer.java:1181)
>   at 
> org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:217)
> {code}
> https://travis-ci.org/apache/flink/jobs/280722829
> [~pnowojski] did a first analysis that revealed this:
> In 
> org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java:229 
> this is being sent:
> {{long offsetToCommit = lastProcessedOffset + 1;}}
> {{lastProcessedOffset}} comes from:
> {{org.apache.flink.streaming.connectors.kafka.FlinkK

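The analysis quoted above pinpoints where the magic number in the logs comes from: `Kafka09Fetcher` commits `lastProcessedOffset + 1`, so a partition whose state still holds the `GROUP_OFFSET` sentinel (-915623761773) produces exactly the "Invalid offset: -915623761772" rejected by Kafka. A one-line check (standalone sketch; the class name is ours, the constant is from the quoted code):

```java
public class SentinelArithmetic {
    // Sentinel constant from KafkaTopicPartitionStateSentinel.
    public static final long GROUP_OFFSET = -915623761773L;

    public static void main(String[] args) {
        // Kafka09Fetcher commits lastProcessedOffset + 1; applied to the
        // sentinel this reproduces the offset rejected by Kafka.
        long offsetToCommit = GROUP_OFFSET + 1;
        System.out.println(offsetToCommit); // -915623761772
    }
}
```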
[jira] [Created] (FLINK-7956) Add support for scheduling with slot sharing

2017-11-01 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-7956:


 Summary: Add support for scheduling with slot sharing
 Key: FLINK-7956
 URL: https://issues.apache.org/jira/browse/FLINK-7956
 Project: Flink
  Issue Type: Sub-task
  Components: Scheduler
Affects Versions: 1.4.0
Reporter: Till Rohrmann
Assignee: Till Rohrmann
Priority: Major


In order to reach feature equivalence with the old code base, we should add 
support for scheduling with slot sharing to the {{SlotPool}}. This will also 
allow us to run all the IT cases based on the {{AbstractTestBase}} on the 
Flip-6 {{MiniCluster}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4592: [FLINK-7515][network] allow actual 0-length content in Ne...

2017-11-01 Thread pnowojski
Github user pnowojski commented on the issue:

https://github.com/apache/flink/pull/4592
  
LGTM (besides "Conflicting files")


---


[jira] [Commented] (FLINK-7515) allow actual 0-length content in NettyMessage#allocateBuffer()

2017-11-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7515?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16233880#comment-16233880
 ] 

ASF GitHub Bot commented on FLINK-7515:
---

Github user pnowojski commented on the issue:

https://github.com/apache/flink/pull/4592
  
LGTM (besides "Conflicting files")


> allow actual 0-length content in NettyMessage#allocateBuffer()
> --
>
> Key: FLINK-7515
> URL: https://issues.apache.org/jira/browse/FLINK-7515
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Affects Versions: 1.4.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Minor
>
> Previously, length {{0}} meant "unknown content length" but there are cases 
> where the actual length is 0 and we do not need a larger buffer. Let's use 
> {{-1}} for tagging the special case instead.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
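The FLINK-7515 description above — use {{-1}} to tag "unknown content length" so that {{0}} can mean a genuinely empty payload — can be illustrated with a small sketch. This is a hypothetical helper, not Flink's actual `NettyMessage#allocateBuffer` signature:

```java
public class BufferAllocation {
    // New tag for "content length not yet known"; 0 now means a real empty payload.
    public static final int UNKNOWN_CONTENT_LENGTH = -1;

    // Capacity to allocate for a message header plus its content.
    public static int capacityFor(int headerLength, int contentLength) {
        if (contentLength == UNKNOWN_CONTENT_LENGTH) {
            // Length unknown: allocate only the header; content is appended later.
            return headerLength;
        }
        // Length known -- including an actual 0-length content.
        return headerLength + contentLength;
    }

    public static void main(String[] args) {
        System.out.println(capacityFor(9, 0));                      // 9: genuinely empty content
        System.out.println(capacityFor(9, UNKNOWN_CONTENT_LENGTH)); // 9: header only, length unknown
        System.out.println(capacityFor(9, 128));                    // 137
    }
}
```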


[GitHub] flink issue #4927: [FLINK-7778] [build] Shade Curator/ZooKeeper dependency

2017-11-01 Thread aljoscha
Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/4927
  
Changes look good! @StephanEwen could you please have a look at the 
follow-up changes?


---


[jira] [Commented] (FLINK-7778) Relocate ZooKeeper

2017-11-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7778?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16233883#comment-16233883
 ] 

ASF GitHub Bot commented on FLINK-7778:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/4927
  
Changes look good! @StephanEwen could you please have a look at the 
follow-up changes?


> Relocate ZooKeeper
> --
>
> Key: FLINK-7778
> URL: https://issues.apache.org/jira/browse/FLINK-7778
> Project: Flink
>  Issue Type: Sub-task
>  Components: Build System
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Priority: Critical
> Fix For: 1.4.0
>
>
> If possible, then we should also try to relocate {{ZooKeeper}} in order to 
> avoid dependency clashes between Flink's {{ZooKeeper}} and Hadoop's 
> {{ZooKeeper}} dependency.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-7073) Create RESTful JobManager endpoint

2017-11-01 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7073?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann updated FLINK-7073:
-
Description: 
In order to communicate from the {{client}} with a running {{JobManager}} we 
have to provide a RESTful endpoint for job specific operations. These 
operations are:

* Cancel job (PUT): Cancel the given job
* Stop job (PUT): Stops the given job
* Take savepoint (POST): Take savepoint for given job (How to return the 
path under which the savepoint was stored? Maybe always having to specify 
a path)
* Poll/Subscribe to notifications

The REST JobManager endpoint should also serve the information required for the 
web ui.


  was:
In order to communicate from the {{client}} with a running {{JobManager}} we 
have to provide a RESTful endpoint for job specific operations. These 
operations are:

* Cancel job (PUT): Cancel the given job
* Stop job (PUT): Stops the given job
* Take savepoint (POST): Take savepoint for given job (How to return the 
path under which the savepoint was stored? Maybe always having to specify 
a path)
* Poll/Subscribe to notifications



> Create RESTful JobManager endpoint
> --
>
> Key: FLINK-7073
> URL: https://issues.apache.org/jira/browse/FLINK-7073
> Project: Flink
>  Issue Type: Sub-task
>  Components: JobManager
>Reporter: Till Rohrmann
>Priority: Major
>  Labels: flip-6
>
> In order to communicate from the {{client}} with a running {{JobManager}} we 
> have to provide a RESTful endpoint for job specific operations. These 
> operations are:
> * Cancel job (PUT): Cancel the given job
> * Stop job (PUT): Stops the given job
> * Take savepoint (POST): Take savepoint for given job (How to return the 
> path under which the savepoint was stored? Maybe always having to 
> specify a path)
> * Poll/Subscribe to notifications
> The REST JobManager endpoint should also serve the information required for 
> the web ui.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
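The job operations listed in the FLINK-7073 description above map naturally onto HTTP verbs. A sketch of such a route table follows; the concrete paths are illustrative placeholders, not the actual endpoints of Flink's REST API:

```java
import java.util.Arrays;

public class JobEndpointRoutes {
    // Hypothetical verb/path mapping for the operations listed in the issue.
    public static String routeFor(String operation, String jobId) {
        switch (operation) {
            case "cancel":    return "PUT /jobs/" + jobId + "/cancel";
            case "stop":      return "PUT /jobs/" + jobId + "/stop";
            case "savepoint": return "POST /jobs/" + jobId + "/savepoints";
            default: throw new IllegalArgumentException("unknown operation: " + operation);
        }
    }

    public static void main(String[] args) {
        for (String op : Arrays.asList("cancel", "stop", "savepoint")) {
            System.out.println(routeFor(op, "abc123"));
        }
    }
}
```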


[GitHub] flink issue #4592: [FLINK-7515][network] allow actual 0-length content in Ne...

2017-11-01 Thread NicoK
Github user NicoK commented on the issue:

https://github.com/apache/flink/pull/4592
  
of course, the last change added a conflict...rebased now


---


[jira] [Commented] (FLINK-7515) allow actual 0-length content in NettyMessage#allocateBuffer()

2017-11-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7515?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16233887#comment-16233887
 ] 

ASF GitHub Bot commented on FLINK-7515:
---

Github user NicoK commented on the issue:

https://github.com/apache/flink/pull/4592
  
of course, the last change added a conflict...rebased now





--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4872: [FLINK-7876] Register TaskManagerMetricGroup under...

2017-11-01 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4872#discussion_r148226096
  
--- Diff: 
flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosJobManager.scala
 ---
@@ -31,7 +31,8 @@ import org.apache.flink.runtime.instance.InstanceManager
 import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore
 import org.apache.flink.runtime.jobmanager.scheduler.{Scheduler => 
FlinkScheduler}
 import org.apache.flink.runtime.leaderelection.LeaderElectionService
-import org.apache.flink.runtime.metrics.{MetricRegistry => 
FlinkMetricRegistry}
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup
+import org.apache.flink.runtime.metrics.{MetricRegistryImpl => 
FlinkMetricRegistry}
--- End diff --

Will remove it.


---


[GitHub] flink pull request #4872: [FLINK-7876] Register TaskManagerMetricGroup under...

2017-11-01 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4872#discussion_r148226047
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
 ---
@@ -166,4 +169,12 @@ void notifySlotAvailable(
 * @return Future containing the resource overview
 */
CompletableFuture<ResourceOverview> requestResourceOverview(@RpcTimeout 
Time timeout);
+
+   /**
+* Requests the paths for the TaskManager's {@link MetricQueryService} 
to query.
+*
+* @param timeout for the asynchronous operation
+* @return Future containing the collection of instance ids and the 
corresponding metric query service path
--- End diff --

Good catch.


---


[jira] [Commented] (FLINK-7876) Register TaskManagerMetricGroup under ResourceID instead of InstanceID

2017-11-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16233894#comment-16233894
 ] 

ASF GitHub Bot commented on FLINK-7876:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4872#discussion_r148226047
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
 ---
@@ -166,4 +169,12 @@ void notifySlotAvailable(
 * @return Future containing the resource overview
 */
CompletableFuture<ResourceOverview> requestResourceOverview(@RpcTimeout 
Time timeout);
+
+   /**
+* Requests the paths for the TaskManager's {@link MetricQueryService} 
to query.
+*
+* @param timeout for the asynchronous operation
+* @return Future containing the collection of instance ids and the 
corresponding metric query service path
--- End diff --

Good catch.


> Register TaskManagerMetricGroup under ResourceID instead of InstanceID
> --
>
> Key: FLINK-7876
> URL: https://issues.apache.org/jira/browse/FLINK-7876
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
>  Labels: flip-6
>
> Currently, the {{TaskManager}} registers the {{TaskManagerMetricGroup}} under 
> its {{InstanceID}}, thereby effectively binding its metrics to the 
> lifetime of its registration with the {{JobManager}}. This also has 
> implications for how the REST handlers retrieve the TaskManager metrics, 
> namely by its {{InstanceID}}.
> I would actually propose to register the {{TaskManagerMetricGroup}} under the 
> {{TaskManager}}/{{TaskExecutor}} {{ResourceID}} which is valid over the whole 
> lifetime of the {{TaskManager}}/{{TaskExecutor}}. That way we would also be 
> able to query metrics independent of the connection status to the 
> {{JobManager}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7876) Register TaskManagerMetricGroup under ResourceID instead of InstanceID

2017-11-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16233895#comment-16233895
 ] 

ASF GitHub Bot commented on FLINK-7876:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4872#discussion_r148226096
  
--- Diff: 
flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosJobManager.scala
 ---
@@ -31,7 +31,8 @@ import org.apache.flink.runtime.instance.InstanceManager
 import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore
 import org.apache.flink.runtime.jobmanager.scheduler.{Scheduler => 
FlinkScheduler}
 import org.apache.flink.runtime.leaderelection.LeaderElectionService
-import org.apache.flink.runtime.metrics.{MetricRegistry => 
FlinkMetricRegistry}
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup
+import org.apache.flink.runtime.metrics.{MetricRegistryImpl => 
FlinkMetricRegistry}
--- End diff --

Will remove it.





--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4928: [FLINK-7732][kafka-consumer] Do not commit to kafk...

2017-11-01 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/4928#discussion_r148226195
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
 ---
@@ -242,10 +243,25 @@ public void 
addDiscoveredPartitions(List newPartitions) thr
 * @param commitCallback The callback that the user should trigger when 
a commit request completes or fails.
 * @throws Exception This method forwards exceptions.
 */
-   public abstract void commitInternalOffsetsToKafka(
+   public final void commitInternalOffsetsToKafka(
+   Map<KafkaTopicPartition, Long> offsets,
+   @Nonnull KafkaCommitCallback commitCallback) throws 
Exception {
+   // Ignore sentinels. They might appear here if snapshot has 
started before actual offsets values
+   // replaced sentinels
+   doCommitInternalOffsetsToKafka(filerOutSentinels(offsets), 
commitCallback);
+   }
+
+   protected abstract void doCommitInternalOffsetsToKafka(
Map<KafkaTopicPartition, Long> offsets,
@Nonnull KafkaCommitCallback commitCallback) throws 
Exception;
 
+   private Map<KafkaTopicPartition, Long> filerOutSentinels(Map<KafkaTopicPartition, Long> offsets) {
--- End diff --

typo: `filterOutSentinels`, missing `t`.


---


[GitHub] flink pull request #4872: [FLINK-7876] Register TaskManagerMetricGroup under...

2017-11-01 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4872#discussion_r148226738
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
 ---
@@ -163,12 +156,14 @@ public JobLeaderService getJobLeaderService() {
 *
 * @param resourceID resource ID of the task manager
 * @param taskManagerServicesConfiguration task manager configuration
+* @param metricRegistry to register the TaskManagerMetricGroup
 * @return task manager components
 * @throws Exception
 */
public static TaskManagerServices fromConfiguration(
--- End diff --

True, I'll pull the `TaskManagerMetricGroup` instantiation out of the 
`TaskManagerServices#fromConfiguration`.


---


[jira] [Commented] (FLINK-7732) Invalid offset to commit in Kafka

2017-11-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7732?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16233896#comment-16233896
 ] 

ASF GitHub Bot commented on FLINK-7732:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/4928#discussion_r148226195
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
 ---
@@ -242,10 +243,25 @@ public void 
addDiscoveredPartitions(List newPartitions) thr
 * @param commitCallback The callback that the user should trigger when 
a commit request completes or fails.
 * @throws Exception This method forwards exceptions.
 */
-   public abstract void commitInternalOffsetsToKafka(
+   public final void commitInternalOffsetsToKafka(
+   Map<KafkaTopicPartition, Long> offsets,
+   @Nonnull KafkaCommitCallback commitCallback) throws 
Exception {
+   // Ignore sentinels. They might appear here if snapshot has 
started before actual offsets values
+   // replaced sentinels
+   doCommitInternalOffsetsToKafka(filerOutSentinels(offsets), 
commitCallback);
+   }
+
+   protected abstract void doCommitInternalOffsetsToKafka(
Map<KafkaTopicPartition, Long> offsets,
@Nonnull KafkaCommitCallback commitCallback) throws 
Exception;
 
+   private Map<KafkaTopicPartition, Long> filerOutSentinels(Map<KafkaTopicPartition, Long> offsets) {
--- End diff --

typo: `filterOutSentinels`, missing `t`.



[jira] [Commented] (FLINK-7876) Register TaskManagerMetricGroup under ResourceID instead of InstanceID

2017-11-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16233897#comment-16233897
 ] 

ASF GitHub Bot commented on FLINK-7876:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4872#discussion_r148226738
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
 ---
@@ -163,12 +156,14 @@ public JobLeaderService getJobLeaderService() {
 *
 * @param resourceID resource ID of the task manager
 * @param taskManagerServicesConfiguration task manager configuration
+* @param metricRegistry to register the TaskManagerMetricGroup
 * @return task manager components
 * @throws Exception
 */
public static TaskManagerServices fromConfiguration(
--- End diff --

True, I'll pull the `TaskManagerMetricGroup` instantiation out of the 
`TaskManagerServices#fromConfiguration`.





--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4915: [FLINK-7838] Bunch of hotfixes and fix missing syn...

2017-11-01 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/4915#discussion_r148226920
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011Test.java
 ---
@@ -61,7 +61,7 @@
  * IT cases for the {@link FlinkKafkaProducer011}.
  */
 @SuppressWarnings("serial")
-public class FlinkKafkaProducer011Tests extends KafkaTestBase {
+public class FlinkKafkaProducer011Test extends KafkaTestBase {
--- End diff --

I haven't looked into too many `ITCase`s but coding guidelines require unit 
tests to have subsecond execution speed:
>Please use unit tests to test isolated functionality, such as methods. 
Unit tests should execute in subseconds and should be preferred whenever 
possible. The names of unit test classes have to end on *Test. Use integration tests 
to implement long-running tests.

http://flink.apache.org/contribute-code.html#coding-guidelines

Then again, I don't know how consistent the tests are.


---


[GitHub] flink pull request #4872: [FLINK-7876] Register TaskManagerMetricGroup under...

2017-11-01 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4872#discussion_r148226893
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java
 ---
@@ -76,25 +74,22 @@ public void testUpdate() throws Exception {
JobID jobID = new JobID();
InstanceID tmID = new InstanceID();
--- End diff --

Good catch. Will change it.


---


[GitHub] flink pull request #4872: [FLINK-7876] Register TaskManagerMetricGroup under...

2017-11-01 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4872#discussion_r148226952
  
--- Diff: 
flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
 ---
@@ -42,7 +42,8 @@ import org.apache.flink.runtime.jobmanager.{JobManager, 
MemoryArchivist, Submitt
 import org.apache.flink.runtime.leaderelection.LeaderElectionService
 import org.apache.flink.runtime.messages.JobManagerMessages
 import org.apache.flink.runtime.messages.JobManagerMessages._
-import org.apache.flink.runtime.metrics.MetricRegistry
+import org.apache.flink.runtime.metrics.MetricRegistryImpl
--- End diff --

Will remove it.


---


[jira] [Commented] (FLINK-7838) Kafka011ProducerExactlyOnceITCase do not finish

2017-11-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16233899#comment-16233899
 ] 

ASF GitHub Bot commented on FLINK-7838:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/4915#discussion_r148226920
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011Test.java
 ---
@@ -61,7 +61,7 @@
  * IT cases for the {@link FlinkKafkaProducer011}.
  */
 @SuppressWarnings("serial")
-public class FlinkKafkaProducer011Tests extends KafkaTestBase {
+public class FlinkKafkaProducer011Test extends KafkaTestBase {
--- End diff --

I haven't looked into too many `ITCase`s but coding guidelines require unit 
tests to have subsecond execution speed:
>Please use unit tests to test isolated functionality, such as methods. 
Unit tests should execute in subseconds and should be preferred whenever 
possible. The names of unit test classes have to end on *Test. Use integration tests 
to implement long-running tests.

http://flink.apache.org/contribute-code.html#coding-guidelines

Then again, I don't know how consistent the tests are.


> Kafka011ProducerExactlyOnceITCase do not finish
> ---
>
> Key: FLINK-7838
> URL: https://issues.apache.org/jira/browse/FLINK-7838
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.4.0
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Blocker
> Fix For: 1.4.0
>
> Attachments: initTransactions_deadlock.txt, log.txt
>
>
> See attached log



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7876) Register TaskManagerMetricGroup under ResourceID instead of InstanceID

2017-11-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16233898#comment-16233898
 ] 

ASF GitHub Bot commented on FLINK-7876:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4872#discussion_r148226893
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java
 ---
@@ -76,25 +74,22 @@ public void testUpdate() throws Exception {
JobID jobID = new JobID();
InstanceID tmID = new InstanceID();
--- End diff --

Good catch. Will change it.


> Register TaskManagerMetricGroup under ResourceID instead of InstanceID
> --
>
> Key: FLINK-7876
> URL: https://issues.apache.org/jira/browse/FLINK-7876
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
>  Labels: flip-6
>
> Currently, the {{TaskManager}} registers the {{TaskManagerMetricGroup}} under 
> its {{InstanceID}} and thereby binding its metrics effectively to the 
> lifetime of its registration with the {{JobManager}}. This has also 
> implications how the REST handler retrieve the TaskManager metrics, namely by 
> its {{InstanceID}}.
> I would actually propose to register the {{TaskManagerMetricGroup}} under the 
> {{TaskManager}}/{{TaskExecutor}} {{ResourceID}} which is valid over the whole 
> lifetime of the {{TaskManager}}/{{TaskExecutor}}. That way we would also be 
> able to query metrics independent of the connection status to the 
> {{JobManager}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7876) Register TaskManagerMetricGroup under ResourceID instead of InstanceID

2017-11-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16233900#comment-16233900
 ] 

ASF GitHub Bot commented on FLINK-7876:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4872#discussion_r148226952
  
--- Diff: 
flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
 ---
@@ -42,7 +42,8 @@ import org.apache.flink.runtime.jobmanager.{JobManager, 
MemoryArchivist, Submitt
 import org.apache.flink.runtime.leaderelection.LeaderElectionService
 import org.apache.flink.runtime.messages.JobManagerMessages
 import org.apache.flink.runtime.messages.JobManagerMessages._
-import org.apache.flink.runtime.metrics.MetricRegistry
+import org.apache.flink.runtime.metrics.MetricRegistryImpl
--- End diff --

Will remove it.


> Register TaskManagerMetricGroup under ResourceID instead of InstanceID
> --
>
> Key: FLINK-7876
> URL: https://issues.apache.org/jira/browse/FLINK-7876
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
>  Labels: flip-6
>
> Currently, the {{TaskManager}} registers the {{TaskManagerMetricGroup}} under 
> its {{InstanceID}} and thereby binding its metrics effectively to the 
> lifetime of its registration with the {{JobManager}}. This has also 
> implications how the REST handler retrieve the TaskManager metrics, namely by 
> its {{InstanceID}}.
> I would actually propose to register the {{TaskManagerMetricGroup}} under the 
> {{TaskManager}}/{{TaskExecutor}} {{ResourceID}} which is valid over the whole 
> lifetime of the {{TaskManager}}/{{TaskExecutor}}. That way we would also be 
> able to query metrics independent of the connection status to the 
> {{JobManager}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4872: [FLINK-7876] Register TaskManagerMetricGroup under...

2017-11-01 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4872#discussion_r148227159
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
 ---
@@ -80,7 +80,6 @@
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import akka.testkit.JavaTestKit;
-
--- End diff --

will do.


---


[jira] [Commented] (FLINK-7876) Register TaskManagerMetricGroup under ResourceID instead of InstanceID

2017-11-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16233904#comment-16233904
 ] 

ASF GitHub Bot commented on FLINK-7876:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4872#discussion_r148227334
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnregisteredTaskMetricsGroup.java
 ---
@@ -28,15 +28,15 @@
 import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup;
 import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
 import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
-import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.MetricRegistryImpl;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 
 import java.util.UUID;
 
 public class UnregisteredTaskMetricsGroup extends TaskMetricGroup {

-   private static final MetricRegistry EMPTY_REGISTRY = new 
MetricRegistry(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
+   private static final MetricRegistryImpl EMPTY_REGISTRY = new 
MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
--- End diff --

good catch. Will change it.


> Register TaskManagerMetricGroup under ResourceID instead of InstanceID
> --
>
> Key: FLINK-7876
> URL: https://issues.apache.org/jira/browse/FLINK-7876
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
>  Labels: flip-6
>
> Currently, the {{TaskManager}} registers the {{TaskManagerMetricGroup}} under 
> its {{InstanceID}} and thereby binding its metrics effectively to the 
> lifetime of its registration with the {{JobManager}}. This has also 
> implications how the REST handler retrieve the TaskManager metrics, namely by 
> its {{InstanceID}}.
> I would actually propose to register the {{TaskManagerMetricGroup}} under the 
> {{TaskManager}}/{{TaskExecutor}} {{ResourceID}} which is valid over the whole 
> lifetime of the {{TaskManager}}/{{TaskExecutor}}. That way we would also be 
> able to query metrics independent of the connection status to the 
> {{JobManager}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4872: [FLINK-7876] Register TaskManagerMetricGroup under...

2017-11-01 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4872#discussion_r148227334
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnregisteredTaskMetricsGroup.java
 ---
@@ -28,15 +28,15 @@
 import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup;
 import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
 import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
-import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.MetricRegistryImpl;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 
 import java.util.UUID;
 
 public class UnregisteredTaskMetricsGroup extends TaskMetricGroup {

-   private static final MetricRegistry EMPTY_REGISTRY = new 
MetricRegistry(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
+   private static final MetricRegistryImpl EMPTY_REGISTRY = new 
MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
--- End diff --

good catch. Will change it.


---


[GitHub] flink pull request #4915: [FLINK-7838] Bunch of hotfixes and fix missing syn...

2017-11-01 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/4915#discussion_r148227364
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaProducer.java
 ---
@@ -226,13 +228,19 @@ public int getTransactionCoordinatorId() {
 
private void flushNewPartitions() {
LOG.info("Flushing new partitions");
+   enqueueNewPartitions().await();
+   }
+
+   private TransactionalRequestResult enqueueNewPartitions() {
Object transactionManager = getValue(kafkaProducer, 
"transactionManager");
-   Object txnRequestHandler = invoke(transactionManager, 
"addPartitionsToTransactionHandler");
-   invoke(transactionManager, "enqueueRequest", new 
Class[]{txnRequestHandler.getClass().getSuperclass()}, new 
Object[]{txnRequestHandler});
-   TransactionalRequestResult result = 
(TransactionalRequestResult) getValue(txnRequestHandler, 
txnRequestHandler.getClass().getSuperclass(), "result");
-   Object sender = getValue(kafkaProducer, "sender");
-   invoke(sender, "wakeup");
-   result.await();
+   synchronized (transactionManager) {
+   Object txnRequestHandler = invoke(transactionManager, 
"addPartitionsToTransactionHandler");
--- End diff --

Did you add the `!newPartitionsInTransaction.isEmpty()` check in the end? I 
couldn't find it at first glance. 

Regarding tests, if you add the check, would your current test fail? If 
not, I think the behaviour isn't properly tested.


---


[jira] [Commented] (FLINK-7876) Register TaskManagerMetricGroup under ResourceID instead of InstanceID

2017-11-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16233901#comment-16233901
 ] 

ASF GitHub Bot commented on FLINK-7876:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4872#discussion_r148227159
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
 ---
@@ -80,7 +80,6 @@
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import akka.testkit.JavaTestKit;
-
--- End diff --

will do.


> Register TaskManagerMetricGroup under ResourceID instead of InstanceID
> --
>
> Key: FLINK-7876
> URL: https://issues.apache.org/jira/browse/FLINK-7876
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
>  Labels: flip-6
>
> Currently, the {{TaskManager}} registers the {{TaskManagerMetricGroup}} under 
> its {{InstanceID}} and thereby binding its metrics effectively to the 
> lifetime of its registration with the {{JobManager}}. This has also 
> implications how the REST handler retrieve the TaskManager metrics, namely by 
> its {{InstanceID}}.
> I would actually propose to register the {{TaskManagerMetricGroup}} under the 
> {{TaskManager}}/{{TaskExecutor}} {{ResourceID}} which is valid over the whole 
> lifetime of the {{TaskManager}}/{{TaskExecutor}}. That way we would also be 
> able to query metrics independent of the connection status to the 
> {{JobManager}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7838) Kafka011ProducerExactlyOnceITCase do not finish

2017-11-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16233905#comment-16233905
 ] 

ASF GitHub Bot commented on FLINK-7838:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/4915#discussion_r148227364
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaProducer.java
 ---
@@ -226,13 +228,19 @@ public int getTransactionCoordinatorId() {
 
private void flushNewPartitions() {
LOG.info("Flushing new partitions");
+   enqueueNewPartitions().await();
+   }
+
+   private TransactionalRequestResult enqueueNewPartitions() {
Object transactionManager = getValue(kafkaProducer, 
"transactionManager");
-   Object txnRequestHandler = invoke(transactionManager, 
"addPartitionsToTransactionHandler");
-   invoke(transactionManager, "enqueueRequest", new 
Class[]{txnRequestHandler.getClass().getSuperclass()}, new 
Object[]{txnRequestHandler});
-   TransactionalRequestResult result = 
(TransactionalRequestResult) getValue(txnRequestHandler, 
txnRequestHandler.getClass().getSuperclass(), "result");
-   Object sender = getValue(kafkaProducer, "sender");
-   invoke(sender, "wakeup");
-   result.await();
+   synchronized (transactionManager) {
+   Object txnRequestHandler = invoke(transactionManager, 
"addPartitionsToTransactionHandler");
--- End diff --

Did you add the `!newPartitionsInTransaction.isEmpty()` check in the end? I 
couldn't find it at first glance. 

Regarding tests, if you add the check, would your current test fail? If 
not, I think the behaviour isn't properly tested.


> Kafka011ProducerExactlyOnceITCase do not finish
> ---
>
> Key: FLINK-7838
> URL: https://issues.apache.org/jira/browse/FLINK-7838
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.4.0
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Blocker
> Fix For: 1.4.0
>
> Attachments: initTransactions_deadlock.txt, log.txt
>
>
> See attached log



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4852: [FLINK-7863] Generalize MetricFetcher to work with a Rest...

2017-11-01 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/4852
  
Merging this PR once Travis gives green light.


---


[jira] [Commented] (FLINK-7863) Make MetricFetcher work with RestfulGateway

2017-11-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7863?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16233920#comment-16233920
 ] 

ASF GitHub Bot commented on FLINK-7863:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/4852
  
Merging this PR once Travis gives green light.


> Make MetricFetcher work with RestfulGateway
> ---
>
> Key: FLINK-7863
> URL: https://issues.apache.org/jira/browse/FLINK-7863
> Project: Flink
>  Issue Type: Sub-task
>  Components: REST
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: flip-6
>
> In order to make the {{MetricFetcher}} work together with the new 
> architecture, we have to remove it's dependence on the {{JobManagerGateway}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4853: [FLINK-7867] Start MetricQueryService in TaskManagerRunne...

2017-11-01 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/4853
  
Merging this PR once Travis gives green light.


---


[jira] [Commented] (FLINK-7867) Start MetricQueryService on TaskManagerRunner

2017-11-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7867?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16233921#comment-16233921
 ] 

ASF GitHub Bot commented on FLINK-7867:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/4853
  
Merging this PR once Travis gives green light.


> Start MetricQueryService on TaskManagerRunner
> -
>
> Key: FLINK-7867
> URL: https://issues.apache.org/jira/browse/FLINK-7867
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: flip-6
>
> The {{TaskManagerRunner}} should start a {{MetricQueryService}} such that the 
> web ui can query the {{TaskExecutor}} metrics.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #3748: [FLINK-6225] [Cassandra Connector] add CassandraTableSink

2017-11-01 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/3748
  
Thanks for the update @PangZhi. 
Btw. it is recommended to post a short comment when you update a PR. 
Pushing an update does not trigger a notification.

The PR looks good. Will merge this.


---


[jira] [Commented] (FLINK-6225) Support Row Stream for CassandraSink

2017-11-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6225?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16233926#comment-16233926
 ] 

ASF GitHub Bot commented on FLINK-6225:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/3748
  
Thanks for the update @PangZhi. 
Btw. it is recommended to post a short comment when you update a PR. 
Pushing an update does not trigger a notification.

The PR looks good. Will merge this.


> Support Row Stream for CassandraSink
> 
>
> Key: FLINK-6225
> URL: https://issues.apache.org/jira/browse/FLINK-6225
> Project: Flink
>  Issue Type: New Feature
>  Components: Cassandra Connector
>Affects Versions: 1.3.0
>Reporter: Jing Fan
>Assignee: Haohui Mai
>Priority: Major
> Fix For: 1.4.0
>
>
> Currently in CassandraSink, specifying query is not supported for row-stream. 
> The solution should be similar to CassandraTupleSink.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Closed] (FLINK-7939) DataStream of atomic type cannot be converted to Table with time attributes

2017-11-01 Thread Fabian Hueske (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7939?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske closed FLINK-7939.

Resolution: Fixed

Fixed for 1.3.3 with 168378d98ddf591f780a939ee74310ec8d04d517
Fixed for 1.4.0 with 505d478d55c93e07a7227e375939eca19ec4d082

> DataStream of atomic type cannot be converted to Table with time attributes
> ---
>
> Key: FLINK-7939
> URL: https://issues.apache.org/jira/browse/FLINK-7939
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.4.0, 1.3.3
>Reporter: Fabian Hueske
>Assignee: Fabian Hueske
>Priority: Major
> Fix For: 1.4.0, 1.3.3
>
>
> A DataStream of an atomic type, such as {{DataStream<String>}} or 
> {{DataStream<Long>}} cannot be converted into a {{Table}} with a time 
> attribute.
> {code}
> DataStream<String> stream = ...
> Table table = tEnv.fromDataStream(stream, "string, rowtime.rowtime")
> {code}
> yields
> {code}
> Exception in thread "main" org.apache.flink.table.api.TableException: Field 
> reference expression requested.
> at 
> org.apache.flink.table.api.TableEnvironment$$anonfun$1.apply(TableEnvironment.scala:630)
> at 
> org.apache.flink.table.api.TableEnvironment$$anonfun$1.apply(TableEnvironment.scala:624)
> at 
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
> at 
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
> at 
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
> at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
> at 
> scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
> at scala.collection.mutable.ArrayOps$ofRef.flatMap(ArrayOps.scala:186)
> at 
> org.apache.flink.table.api.TableEnvironment.getFieldInfo(TableEnvironment.scala:624)
> at 
> org.apache.flink.table.api.StreamTableEnvironment.registerDataStreamInternal(StreamTableEnvironment.scala:398)
> at 
> org.apache.flink.table.api.scala.StreamTableEnvironment.fromDataStream(StreamTableEnvironment.scala:85)
> {code}
> As a workaround the atomic type can be wrapped in {{Tuple1}}, i.e., convert a 
> {{DataStream<String>}} into a {{DataStream<Tuple1<String>>}}.
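The shape of that workaround can be sketched in plain Java. Flink is not assumed on the classpath here: the `Tuple1` class below is a minimal stand-in for Flink's `org.apache.flink.api.java.tuple.Tuple1`, and the Flink calls mentioned in the comments are only the intended real usage, not executed:

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Sketch: wrap each element of an atomic-type stream in a one-field tuple so
// downstream code sees a composite type with an addressable field (f0).
public class Tuple1Wrap {

    // Minimal stand-in for Flink's Tuple1.
    static class Tuple1<T> {
        public T f0;
        Tuple1(T f0) { this.f0 = f0; }
    }

    static List<Tuple1<String>> wrap(Stream<String> strings) {
        // In Flink this would be stream.map(Tuple1::of) on a
        // DataStream<String>, after which the tuple field can be referenced
        // in the field expression, e.g. tEnv.fromDataStream(wrapped,
        // "f0 as string, rowtime.rowtime").
        return strings.map(Tuple1::new).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Tuple1<String>> wrapped = wrap(Stream.of("a", "b"));
        System.out.println(wrapped.get(0).f0); // prints a
    }
}
```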



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4794: [build][minor] Add missing licenses

2017-11-01 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/4794
  
The `masters` and `slaves` file probably does not need a license header 
(although the rules of when you need one and when not are not very clear to me).

I think config files frequently have a license header, so I would take the 
`zoo.cfg` change.

@yew1eb Are you okay if we merge this without the changes to `masters` and 
`slaves`?


---


[GitHub] flink issue #4872: [FLINK-7876] Register TaskManagerMetricGroup under Resour...

2017-11-01 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/4872
  
Thanks for the review @zentol. I've addressed your comments and rebased 
onto the latest master. If Travis gives green light, then I'll merge this PR.


---


[GitHub] flink issue #4835: [FLINK-7847][avro] Fix typo in jackson shading pattern

2017-11-01 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/4835
  
@aljoscha do you want to include this in your avro PR?


---


[jira] [Commented] (FLINK-7876) Register TaskManagerMetricGroup under ResourceID instead of InstanceID

2017-11-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16233949#comment-16233949
 ] 

ASF GitHub Bot commented on FLINK-7876:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/4872
  
Thanks for the review @zentol. I've addressed your comments and rebased 
onto the latest master. If Travis gives green light, then I'll merge this PR.


> Register TaskManagerMetricGroup under ResourceID instead of InstanceID
> --
>
> Key: FLINK-7876
> URL: https://issues.apache.org/jira/browse/FLINK-7876
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
>  Labels: flip-6
>
> Currently, the {{TaskManager}} registers the {{TaskManagerMetricGroup}} under 
> its {{InstanceID}} and thereby binding its metrics effectively to the 
> lifetime of its registration with the {{JobManager}}. This has also 
> implications how the REST handler retrieve the TaskManager metrics, namely by 
> its {{InstanceID}}.
> I would actually propose to register the {{TaskManagerMetricGroup}} under the 
> {{TaskManager}}/{{TaskExecutor}} {{ResourceID}} which is valid over the whole 
> lifetime of the {{TaskManager}}/{{TaskExecutor}}. That way we would also be 
> able to query metrics independent of the connection status to the 
> {{JobManager}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7847) Fix typo in flink-avro shading pattern

2017-11-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7847?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16233950#comment-16233950
 ] 

ASF GitHub Bot commented on FLINK-7847:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/4835
  
@aljoscha do you want to include this in your avro PR?


> Fix typo in flink-avro shading pattern
> --
>
> Key: FLINK-7847
> URL: https://issues.apache.org/jira/browse/FLINK-7847
> Project: Flink
>  Issue Type: Bug
>  Components: Build System
>Affects Versions: 1.4.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
> Fix For: 1.4.0
>
>
> {code}
> <relocation>
>   <pattern>org.codehaus.jackson</pattern>
>   <shadedPattern>org.apache.flink.avro.shaded.org.codehouse.jackson</shadedPattern>
> </relocation>
> {code}
> The shaded pattern should be 
> "org.apache.flink.avro.shaded.org.codehaus.jackson".



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4920: [FLINK-7944] Allow configuring Hadoop classpath

2017-11-01 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/4920
  
Can you describe what this does and how it works?
Only getting a rough idea from the shell scripts...


---


[jira] [Commented] (FLINK-7944) Allow configuring Hadoop Classpath

2017-11-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16233956#comment-16233956
 ] 

ASF GitHub Bot commented on FLINK-7944:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/4920
  
Can you describe what this does and how it works?
Only getting a rough idea from the shell scripts...


> Allow configuring Hadoop Classpath
> --
>
> Key: FLINK-7944
> URL: https://issues.apache.org/jira/browse/FLINK-7944
> Project: Flink
>  Issue Type: Improvement
>  Components: Startup Shell Scripts
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>Priority: Major
> Fix For: 1.4.0
>
>
> Currently, we have some paths hardcoded in {{config.sh}} in addition to some 
> magic that tries to derive a classpath from the {{hadoop}}/{{hbase}} 
> commands. We should make the classpath configurable using a separate script 
> and put the classpath in a {{classpath}} file that will be picked up by the 
> scripts.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7881) flink can't deployed on yarn with ha

2017-11-01 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7881?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16233955#comment-16233955
 ] 

Aljoscha Krettek commented on FLINK-7881:
-

[~trohrm...@apache.org] FLINK-7951 is a duplicate of this one, right? And you 
already have the fix so we can probably close this one.

> flink can't deployed on yarn with ha
> 
>
> Key: FLINK-7881
> URL: https://issues.apache.org/jira/browse/FLINK-7881
> Project: Flink
>  Issue Type: Bug
>  Components: YARN
>Affects Versions: 1.3.2
>Reporter: deng
>Priority: Blocker
> Attachments: screenshot-1.png, screenshot-2.png
>
>
> I start yarn-session.sh on YARN, but it can't read the HDFS logical URL. It 
> always connects to hdfs://master:8020, but it should be 9000; my HDFS defaultFS is 
> hdfs://master.
> I have configured YARN_CONF_DIR and HADOOP_CONF_DIR, but it didn't work.
> Is it a bug? I use flink-1.3.0-bin-hadoop27-scala_2.10
> 2017-10-20 11:00:05,395 DEBUG org.apache.hadoop.ipc.Client
>   - IPC Client (1035144464) connection to 
> startdt/173.16.5.215:8020 from admin: closed
> 2017-10-20 11:00:05,398 ERROR 
> org.apache.flink.yarn.YarnApplicationMasterRunner - YARN 
> Application Master initialization failed
> java.net.ConnectException: Call From spark3/173.16.5.216 to master:8020 
> failed on connection exception: java.net.ConnectException: Connection 
> refused; For more details see:  
> http://wiki.apache.org/hadoop/ConnectionRefused
>   at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>   at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
>   at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>   at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
>   at org.apache.hadoop.net.NetUtils.wrapWithMessage(NetUtils.java:792)
>   at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:732)
>   at org.apache.hadoop.ipc.Client.call(Client.java:1479)
>   at org.apache.hadoop.ipc.Client.call(Client.java:1412)
>   at 
> org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
>   at com.sun.proxy.$Proxy9.getFileInfo(Unknown Source)
>   at 
> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getFileInfo(ClientNamenodeProtocolTranslatorPB.java:771)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:606)
>   at 
> org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
>   at 
> org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
>   at com.sun.proxy.$Proxy10.getFileInfo(Unknown Source)
>   at org.apache.hadoop.hdfs.DFSClient.getFileInfo(DFSClient.java:2108)
>   at 
> org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1305)
>   at 
> org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1301)
>   at 
> org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)





[jira] [Commented] (FLINK-7881) flink can't deployed on yarn with ha

2017-11-01 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7881?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16233958#comment-16233958
 ] 

Aljoscha Krettek commented on FLINK-7881:
-

And thanks to [~djh4230] for also finding the fix in parallel. 👍






[GitHub] flink issue #4921: [FLINK-7943] Make ParameterTool thread safe

2017-11-01 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/4921
  
Travis failure:
```
Tests run: 17, Failures: 0, Errors: 6, Skipped: 0, Time elapsed: 0.029 sec 
<<< FAILURE! - in org.apache.flink.api.java.utils.RequiredParametersTest

testApplyToWithOptionWithLongAndShortNameAndDefaultValue(org.apache.flink.api.java.utils.RequiredParametersTest)
  Time elapsed: 0.012 sec  <<< ERROR!
java.lang.UnsupportedOperationException: null
at java.util.Collections$UnmodifiableMap.put(Collections.java:1457)
at 
org.apache.flink.api.java.utils.RequiredParameters.hasNoDefaultValueAndNoValuePassedOnAlternativeName(RequiredParameters.java:147)
at 
org.apache.flink.api.java.utils.RequiredParameters.checkAndApplyDefaultValue(RequiredParameters.java:116)
at 
org.apache.flink.api.java.utils.RequiredParameters.applyTo(RequiredParameters.java:94)
at 
org.apache.flink.api.java.utils.RequiredParametersTest.testApplyToWithOptionWithLongAndShortNameAndDefaultValue(RequiredParametersTest.java:195)


testApplyToWithMultipleTypes(org.apache.flink.api.java.utils.RequiredParametersTest)
  Time elapsed: 0.002 sec  <<< ERROR!
java.lang.UnsupportedOperationException: null
at java.util.Collections$UnmodifiableMap.put(Collections.java:1457)
at 
org.apache.flink.api.java.utils.RequiredParameters.hasNoDefaultValueAndNoValuePassedOnAlternativeName(RequiredParameters.java:147)
at 
org.apache.flink.api.java.utils.RequiredParameters.applyTo(RequiredParameters.java:103)
at 
org.apache.flink.api.java.utils.RequiredParametersTest.testApplyToWithMultipleTypes(RequiredParametersTest.java:228)


testApplyToWithOptionMultipleOptionsAndOneDefaultValue(org.apache.flink.api.java.utils.RequiredParametersTest)
  Time elapsed: 0.001 sec  <<< ERROR!
java.lang.UnsupportedOperationException: null
at java.util.Collections$UnmodifiableMap.put(Collections.java:1457)
at 
org.apache.flink.api.java.utils.RequiredParameters.hasNoDefaultValueAndNoValuePassedOnAlternativeName(RequiredParameters.java:147)
at 
org.apache.flink.api.java.utils.RequiredParameters.applyTo(RequiredParameters.java:103)
at 
org.apache.flink.api.java.utils.RequiredParametersTest.testApplyToWithOptionMultipleOptionsAndOneDefaultValue(RequiredParametersTest.java:210)


testApplyToWithOptionAndDefaultValue(org.apache.flink.api.java.utils.RequiredParametersTest)
  Time elapsed: 0 sec  <<< ERROR!
java.lang.UnsupportedOperationException: null
at java.util.Collections$UnmodifiableMap.put(Collections.java:1457)
at 
org.apache.flink.api.java.utils.RequiredParameters.hasNoDefaultValueAndNoValuePassedOnAlternativeName(RequiredParameters.java:147)
at 
org.apache.flink.api.java.utils.RequiredParameters.checkAndApplyDefaultValue(RequiredParameters.java:116)
at 
org.apache.flink.api.java.utils.RequiredParameters.applyTo(RequiredParameters.java:94)
at 
org.apache.flink.api.java.utils.RequiredParametersTest.testApplyToWithOptionAndDefaultValue(RequiredParametersTest.java:182)


testApplyToMovesValuePassedOnShortNameToLongNameIfLongNameIsUndefined(org.apache.flink.api.java.utils.RequiredParametersTest)
  Time elapsed: 0.001 sec  <<< ERROR!
java.lang.UnsupportedOperationException: null
at java.util.Collections$UnmodifiableMap.put(Collections.java:1457)
at 
org.apache.flink.api.java.utils.RequiredParameters.hasNoDefaultValueAndNoValuePassedOnAlternativeName(RequiredParameters.java:144)
at 
org.apache.flink.api.java.utils.RequiredParameters.applyTo(RequiredParameters.java:103)
at 
org.apache.flink.api.java.utils.RequiredParametersTest.testApplyToMovesValuePassedOnShortNameToLongNameIfLongNameIsUndefined(RequiredParametersTest.java:127)


testDefaultValueDoesNotOverrideValuePassedOnShortKeyIfLongKeyIsNotPassedButPresent(org.apache.flink.api.java.utils.RequiredParametersTest)
  Time elapsed: 0.001 sec  <<< ERROR!
java.lang.UnsupportedOperationException: null
at java.util.Collections$UnmodifiableMap.put(Collections.java:1457)
at 
org.apache.flink.api.java.utils.RequiredParameters.hasNoDefaultValueAndNoValuePassedOnAlternativeName(RequiredParameters.java:144)
at 
org.apache.flink.api.java.utils.RequiredParameters.checkAndApplyDefaultValue(RequiredParameters.java:116)
at 
org.apache.flink.api.java.utils.RequiredParameters.applyTo(RequiredParameters.java:94)
at 
org.apache.flink.api.java.utils.RequiredParametersTest.testDefaultValueDoesNotOverrideValuePassedOnShortKeyIfLongKeyIsNotPassedButPresent(RequiredParametersTest.java:142)
```
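Every failure in the output above is the same `UnsupportedOperationException` raised by `Collections$UnmodifiableMap.put`, i.e. `RequiredParameters` now writes into a map that the thread-safety change wrapped as read-only. A minimal stand-alone illustration of that JDK behaviour (plain collections code, not Flink's `RequiredParameters`; the keys are hypothetical):

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class UnmodifiableMapDemo {
    public static void main(String[] args) {
        Map<String, String> data = new HashMap<>();
        data.put("port", "8020");

        // Collections.unmodifiableMap returns a read-only *view*: every
        // mutating call, including put, throws UnsupportedOperationException.
        Map<String, String> view = Collections.unmodifiableMap(data);
        try {
            view.put("port", "9000");
        } catch (UnsupportedOperationException e) {
            System.out.println("put rejected by unmodifiable view");
        }

        // Writes must go through the backing map instead; the view sees them.
        data.put("port", "9000");
        System.out.println(view.get("port")); // prints 9000
    }
}
```

So the fix is to keep a mutable backing map internally and only hand the unmodifiable view to callers, rather than calling `put` on the view itself.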


---

