[jira] [Resolved] (FLINK-6563) Expose time indicator attributes in the KafkaTableSource
[ https://issues.apache.org/jira/browse/FLINK-6563?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Fabian Hueske resolved FLINK-6563. -- Resolution: Implemented Assignee: Fabian Hueske (was: Haohui Mai) Implemented for 1.4.0 with 0e92b6632f35b69c62d7747f1cbaa3ee207fb235 > Expose time indicator attributes in the KafkaTableSource > > > Key: FLINK-6563 > URL: https://issues.apache.org/jira/browse/FLINK-6563 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Reporter: Haohui Mai >Assignee: Fabian Hueske >Priority: Critical > Fix For: 1.4.0 > > > This is a follow-up for FLINK-5884. > FLINK-5884 requires the {{TableSource}} interfaces to expose the > processing time and the event time of the data stream. This JIRA proposes to > expose these two attributes in the Kafka table source. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4931: [FLINK-7420] Move all Avro code to flink-avro
GitHub user aljoscha opened a pull request: https://github.com/apache/flink/pull/4931 [FLINK-7420] Move all Avro code to flink-avro This hides interaction with Avro behind the interface `AvroUtils` that has two implementations: 1) default Avro utils, which are used when no Avro is present and which throw exceptions in case Avro is required. 2) proper Avro utils, which are dynamically loaded when the `flink-avro` module is in the classpath and which do proper Avro things. R: @StephanEwen You can merge this pull request into a Git repository by running: $ git pull https://github.com/aljoscha/flink FLINK-7420 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4931.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4931 commit b74f84ee3d5795e30c92a2e7393fc25309c450e3 Author: twalthr Date: 2017-08-16T10:17:00Z [FLINK-7420] [core] Move all Avro code to flink-avro commit 3215a4b939f536b4a1130cad9e5106867c071789 Author: Aljoscha Krettek Date: 2017-10-25T15:38:24Z [FLINK-7420] Replace GenericData.Array by dummy when reading TypeSerializers This also adds a new test that verifies that we correctly register Avro Serializers when they are present and modifies an existing test to verify that we correctly register dummy classes. commit 94f19afcb65b7ef0d200bb3c4d0d82b8422ba905 Author: Aljoscha Krettek Date: 2017-10-26T12:56:09Z [FLINK-7420] Add Avro test-jar dependency in Kafka modules commit 1d73f296667d909ce0506ceeb722112fed978af3 Author: Aljoscha Krettek Date: 2017-10-30T09:19:56Z [FLINK-7420] Fix TwitterExample.scala It seems this has a transitive dependency on Jackson, which slightly changed with the Avro reworking. 
commit a7289e06641b3303611bf5a1a8a2bf4ef56ac994 Author: Aljoscha Krettek Date: 2017-10-30T14:02:18Z [FLINK-7420] Abstract all Avro interaction behind AvroUtils Before, we would try to dynamically load Avro-related classes in several places. Now, we only reflectively instantiate the right AvroUtils and all other operations are methods on it. The default AvroUtils throw exceptions with a helpful message for most operations. ---
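The loading scheme the PR describes, a failing default plus a "real" implementation instantiated reflectively when the optional module is present, can be sketched as follows. All class and method names here are illustrative assumptions, not Flink's actual `AvroUtils` API:

```java
public class ReflectiveUtilsLoader {

    // Fully qualified name of the optional implementation (hypothetical).
    static final String AVRO_UTILS_CLASS = "org.example.avro.RealAvroUtils";

    interface Utils {
        String describeSchema(Object record);
    }

    // Default used when the optional module is absent: fail fast with a hint.
    static class DefaultUtils implements Utils {
        @Override
        public String describeSchema(Object record) {
            throw new UnsupportedOperationException(
                    "Avro support missing: add the flink-avro module to the classpath.");
        }
    }

    // Reflectively instantiate the real utils if present, else the default.
    static Utils load() {
        try {
            return (Utils) Class.forName(AVRO_UTILS_CLASS)
                    .getDeclaredConstructor()
                    .newInstance();
        } catch (ReflectiveOperationException e) {
            return new DefaultUtils();
        }
    }

    public static void main(String[] args) {
        System.out.println(load().getClass().getSimpleName());
    }
}
```

The point of the pattern is that `flink-core` never links against Avro directly; only the reflectively loaded class does.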
[jira] [Commented] (FLINK-7420) Move all Avro code to flink-avro
[ https://issues.apache.org/jira/browse/FLINK-7420?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16233758#comment-16233758 ] ASF GitHub Bot commented on FLINK-7420: --- GitHub user aljoscha opened a pull request: https://github.com/apache/flink/pull/4931 [FLINK-7420] Move all Avro code to flink-avro This hides interaction with Avro behind the interface `AvroUtils` that has two implementations: 1) default Avro utils, which are used when no Avro is present and which throw exceptions in case Avro is required. 2) proper Avro utils, which are dynamically loaded when the `flink-avro` module is in the classpath and which do proper Avro things. R: @StephanEwen You can merge this pull request into a Git repository by running: $ git pull https://github.com/aljoscha/flink FLINK-7420 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4931.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4931 commit b74f84ee3d5795e30c92a2e7393fc25309c450e3 Author: twalthr Date: 2017-08-16T10:17:00Z [FLINK-7420] [core] Move all Avro code to flink-avro commit 3215a4b939f536b4a1130cad9e5106867c071789 Author: Aljoscha Krettek Date: 2017-10-25T15:38:24Z [FLINK-7420] Replace GenericData.Array by dummy when reading TypeSerializers This also adds a new test that verifies that we correctly register Avro Serializers when they are present and modifies an existing test to verify that we correctly register dummy classes. commit 94f19afcb65b7ef0d200bb3c4d0d82b8422ba905 Author: Aljoscha Krettek Date: 2017-10-26T12:56:09Z [FLINK-7420] Add Avro test-jar dependency in Kafka modules commit 1d73f296667d909ce0506ceeb722112fed978af3 Author: Aljoscha Krettek Date: 2017-10-30T09:19:56Z [FLINK-7420] Fix TwitterExample.scala It seems this has a transitive dependency on Jackson, which slightly changed with the Avro reworking. 
commit a7289e06641b3303611bf5a1a8a2bf4ef56ac994 Author: Aljoscha Krettek Date: 2017-10-30T14:02:18Z [FLINK-7420] Abstract all Avro interaction behind AvroUtils Before, we would try to dynamically load Avro-related classes in several places. Now, we only reflectively instantiate the right AvroUtils and all other operations are methods on it. The default AvroUtils throw exceptions with a helpful message for most operations. > Move all Avro code to flink-avro > > > Key: FLINK-7420 > URL: https://issues.apache.org/jira/browse/FLINK-7420 > Project: Flink > Issue Type: Improvement > Components: Build System >Reporter: Stephan Ewen >Assignee: Aljoscha Krettek >Priority: Blocker > Fix For: 1.4.0 > > > *Problem* > Currently, the {{flink-avro}} project is a shell with some tests and mostly > duplicate and dead code. The classes that use Avro are distributed quite > wildly through the code base, and introduce multiple direct dependencies on > Avro in a messy way. > That way, we cannot create a proper fat Avro dependency in which we shade > Jackson away. > Also, we expose Avro as a direct and hard dependency on many Flink modules, > while it should be a dependency that users who use Avro types selectively > add. > *Suggested Changes* > We should move all Avro related classes to {{flink-avro}}, and give > {{flink-avro}} a dependency on {{flink-core}} and {{flink-streaming-java}}. > - {{AvroTypeInfo}} > - {{AvroSerializer}} > - {{AvroRowSerializationSchema}} > - {{AvroRowDeserializationSchema}} > To be able to move the Avro serialization code from {{flink-core}} to > {{flink-avro}}, we need to load the {{AvroTypeInformation}} reflectively, > similar to how we load the {{WritableTypeInfo}} for Hadoop. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #4734: [FLINK-7652] [flip6] Port CurrentJobIdsHandler to new RES...
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/4734 Hi @tillrohrmann, just to double check, so does the conclusion mean that #4805 subsumes this PR, and this one can be closed? ---
[jira] [Commented] (FLINK-7652) Port CurrentJobIdsHandler to new REST endpoint
[ https://issues.apache.org/jira/browse/FLINK-7652?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16233767#comment-16233767 ] ASF GitHub Bot commented on FLINK-7652: --- Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/4734 Hi @tillrohrmann, just to double check, so does the conclusion mean that #4805 subsumes this PR, and this one can be closed? > Port CurrentJobIdsHandler to new REST endpoint > -- > > Key: FLINK-7652 > URL: https://issues.apache.org/jira/browse/FLINK-7652 > Project: Flink > Issue Type: Sub-task > Components: REST, Webfrontend >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai >Priority: Major > Labels: flip-6 > Fix For: 1.4.0 > > > Port existing {{CurrentJobIdsHandler}} to new REST endpoint -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4592: [FLINK-7515][network] allow actual 0-length conten...
Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/4592#discussion_r148203203 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java --- @@ -64,12 +65,53 @@ // + /** +* Allocates a new (header and contents) buffer and adds some header information for the frame +* decoder. +* +* Before sending the buffer, you must write the actual length after adding the contents as +* an integer to position 0! +* +* @param allocator +* byte buffer allocator to use +* @param id +* {@link NettyMessage} subclass ID +* +* @return a newly allocated direct buffer with header data written for {@link +* NettyMessageDecoder} +*/ private static ByteBuf allocateBuffer(ByteBufAllocator allocator, byte id) { - return allocateBuffer(allocator, id, 0); + return allocateBuffer(allocator, id, -1); } + /** +* Allocates a new (header and contents) buffer and adds some header information for the frame +* decoder. +* +* If the length is unknown, you must write the actual length after adding the +* contents as an integer to position 0! +* +* @param allocator +* byte buffer allocator to use +* @param id +* {@link NettyMessage} subclass ID +* @param length +* content length (or -1 if unknown) +* +* @return a newly allocated direct buffer with header data written for {@link +* NettyMessageDecoder} +*/ private static ByteBuf allocateBuffer(ByteBufAllocator allocator, byte id, int length) { - final ByteBuf buffer = length != 0 ? allocator.directBuffer(HEADER_LENGTH + length) : allocator.directBuffer(); + Preconditions.checkArgument(length <= Integer.MAX_VALUE - HEADER_LENGTH); --- End diff -- import static? ---
[jira] [Commented] (FLINK-7515) allow actual 0-length content in NettyMessage#allocateBuffer()
[ https://issues.apache.org/jira/browse/FLINK-7515?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16233771#comment-16233771 ] ASF GitHub Bot commented on FLINK-7515: --- Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/4592#discussion_r148203203 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java --- @@ -64,12 +65,53 @@ // + /** +* Allocates a new (header and contents) buffer and adds some header information for the frame +* decoder. +* +* Before sending the buffer, you must write the actual length after adding the contents as +* an integer to position 0! +* +* @param allocator +* byte buffer allocator to use +* @param id +* {@link NettyMessage} subclass ID +* +* @return a newly allocated direct buffer with header data written for {@link +* NettyMessageDecoder} +*/ private static ByteBuf allocateBuffer(ByteBufAllocator allocator, byte id) { - return allocateBuffer(allocator, id, 0); + return allocateBuffer(allocator, id, -1); } + /** +* Allocates a new (header and contents) buffer and adds some header information for the frame +* decoder. +* +* If the length is unknown, you must write the actual length after adding the +* contents as an integer to position 0! +* +* @param allocator +* byte buffer allocator to use +* @param id +* {@link NettyMessage} subclass ID +* @param length +* content length (or -1 if unknown) +* +* @return a newly allocated direct buffer with header data written for {@link +* NettyMessageDecoder} +*/ private static ByteBuf allocateBuffer(ByteBufAllocator allocator, byte id, int length) { - final ByteBuf buffer = length != 0 ? allocator.directBuffer(HEADER_LENGTH + length) : allocator.directBuffer(); + Preconditions.checkArgument(length <= Integer.MAX_VALUE - HEADER_LENGTH); --- End diff -- import static? 
> allow actual 0-length content in NettyMessage#allocateBuffer() > -- > > Key: FLINK-7515 > URL: https://issues.apache.org/jira/browse/FLINK-7515 > Project: Flink > Issue Type: Sub-task > Components: Network >Affects Versions: 1.4.0 >Reporter: Nico Kruber >Assignee: Nico Kruber >Priority: Minor > > Previously, length {{0}} meant "unknown content length" but there are cases > where the actual length is 0 and we do not need a larger buffer. Let's use > {{-1}} for tagging the special case instead. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
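The 0-versus-(-1) sentinel change described in the issue above can be reduced to a minimal sketch; names are illustrative, not Flink's actual `NettyMessage` API:

```java
// Before the change, length 0 doubled as "content length unknown", so a
// genuinely empty payload could not be distinguished from the unknown case.
// Using -1 as the tag frees 0 to mean an actual zero-length content.
public class FrameLength {

    static final int UNKNOWN_LENGTH = -1;

    static boolean lengthKnown(int length) {
        return length != UNKNOWN_LENGTH; // 0 is now a valid, known length
    }

    public static void main(String[] args) {
        System.out.println(lengthKnown(0));  // empty but known
        System.out.println(lengthKnown(-1)); // unknown
    }
}
```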
[GitHub] flink pull request #4593: [FLINK-7516][memory] do not allow copies into a re...
Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/4593#discussion_r148204666 --- Diff: flink-core/src/main/java/org/apache/flink/core/memory/HybridMemorySegment.java --- @@ -306,6 +307,9 @@ public final void get(int offset, ByteBuffer target, int numBytes) { if ((offset | numBytes | (offset + numBytes)) < 0) { throw new IndexOutOfBoundsException(); } + if (target.isReadOnly()) { --- End diff -- Isn't this check redundant? Shouldn't the `ByteBuffer`s validate it on their own like `DirectByteBufferR` do? Putting this check here, it complicates the code and we have to pay for it on each call, even on happy path (cost should be very tiny). Maybe it should be put only in `if (target.isDirect())` branch to check `read-only` only before unsafe copy? ---
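The trade-off raised in this comment, where heap `ByteBuffer`s already reject writes on their own but an unsafe copy path would bypass that check, can be illustrated with plain `java.nio`; the class and helper below are a simplified illustration, not Flink's `HybridMemorySegment`:

```java
import java.nio.ByteBuffer;
import java.nio.ReadOnlyBufferException;

public class ReadOnlyGuard {

    // Explicit guard mirroring what a ByteBuffer's own put() would throw;
    // needed wherever a copy bypasses the buffer's API (e.g. via Unsafe).
    static void copyInto(byte[] source, ByteBuffer target) {
        if (target.isReadOnly()) {
            throw new ReadOnlyBufferException();
        }
        target.put(source);
    }

    // Demonstrates that a read-only view is rejected before any write.
    static boolean rejectsReadOnly() {
        try {
            copyInto(new byte[] {1, 2}, ByteBuffer.allocate(2).asReadOnlyBuffer());
            return false;
        } catch (ReadOnlyBufferException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("read-only target rejected: " + rejectsReadOnly());
    }
}
```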
[jira] [Commented] (FLINK-7732) Invalid offset to commit in Kafka
[ https://issues.apache.org/jira/browse/FLINK-7732?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16233777#comment-16233777 ] ASF GitHub Bot commented on FLINK-7732: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/4928#discussion_r148204398 --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionState.java --- @@ -95,6 +95,10 @@ public final boolean isOffsetDefined() { return offset != KafkaTopicPartitionStateSentinel.OFFSET_NOT_SET; } + public final boolean isSentinel() { --- End diff -- nit: would `hasSentinelOffset` be a better name here? > Invalid offset to commit in Kafka > - > > Key: FLINK-7732 > URL: https://issues.apache.org/jira/browse/FLINK-7732 > Project: Flink > Issue Type: Bug > Components: Kafka Connector, Tests >Affects Versions: 1.4.0 >Reporter: Nico Kruber >Assignee: Piotr Nowojski >Priority: Blocker > Labels: test-stability > Fix For: 1.4.0 > > > In a test run with unrelated changes in the network stack, the Kafka > end-to-end test was failing with an invalid offset: > {code} > 2017-09-28 06:34:10,736 INFO org.apache.kafka.common.utils.AppInfoParser > - Kafka version : 0.10.2.1 > 2017-09-28 06:34:10,744 INFO org.apache.kafka.common.utils.AppInfoParser > - Kafka commitId : e89bffd6b2eff799 > 2017-09-28 06:34:14,549 INFO > org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Discovered > coordinator testing-gce-f67551b8-8867-41c5-b36e-4196d0f11e30:9092 (id: > 2147483647 rack: null) for group myconsumer. 
> 2017-09-28 06:34:14,573 INFO > org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Marking > the coordinator testing-gce-f67551b8-8867-41c5-b36e-4196d0f11e30:9092 (id: > 2147483647 rack: null) dead for group myconsumer > 2017-09-28 06:34:14,686 INFO > org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Discovered > coordinator testing-gce-f67551b8-8867-41c5-b36e-4196d0f11e30:9092 (id: > 2147483647 rack: null) for group myconsumer. > 2017-09-28 06:34:14,687 INFO > org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Marking > the coordinator testing-gce-f67551b8-8867-41c5-b36e-4196d0f11e30:9092 (id: > 2147483647 rack: null) dead for group myconsumer > 2017-09-28 06:34:14,792 INFO > org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Discovered > coordinator testing-gce-f67551b8-8867-41c5-b36e-4196d0f11e30:9092 (id: > 2147483647 rack: null) for group myconsumer. > 2017-09-28 06:34:15,068 INFO > org.apache.flink.runtime.state.DefaultOperatorStateBackend- > DefaultOperatorStateBackend snapshot (In-Memory Stream Factory, synchronous > part) in thread Thread[Async calls on Source: Custom Source -> Map -> Sink: > Unnamed (1/1),5,Flink Task Threads] took 948 ms. > 2017-09-28 06:34:15,164 WARN > org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher - > Committing offsets to Kafka failed. This does not compromise Flink's > checkpoints. 
> java.lang.IllegalArgumentException: Invalid offset: -915623761772 > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:687) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.doCommitOffsetsAsync(ConsumerCoordinator.java:531) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsAsync(ConsumerCoordinator.java:499) > at > org.apache.kafka.clients.consumer.KafkaConsumer.commitAsync(KafkaConsumer.java:1181) > at > org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:217) > 2017-09-28 06:34:15,171 ERROR > org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase - Async > Kafka commit failed. > java.lang.IllegalArgumentException: Invalid offset: -915623761772 > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:687) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.doCommitOffsetsAsync(ConsumerCoordinator.java:531) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsAsync(ConsumerCoordinator.java:499) > at > org.apache.kafka.clients.consumer.KafkaConsumer.commitAsync(KafkaConsumer.java:1181) > at > org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:217) > {code} > https://travis-ci.org/apache/flink/jobs/280722829 > [~pnowojski] did a first analysis that r
[GitHub] flink pull request #4928: [FLINK-7732][kafka-consumer] Do not commit to kafk...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/4928#discussion_r148204574 --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateSentinel.java --- @@ -52,4 +52,7 @@ */ public static final long GROUP_OFFSET = -915623761773L; + public static boolean isSentinel(long offset) { + return offset < 0; --- End diff -- nit: this implementation could be a bit too broad. Could be a bit more specific by matching the static values in `KafkaTopicPartitionStateSentinel`. ---
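The reviewer's suggestion to match the known sentinel constants explicitly, rather than treating every negative offset as a sentinel, could look like the sketch below. Only the `GROUP_OFFSET` value appears in the diff above; the other three constants are assumptions modeled on it, and the class name is illustrative:

```java
public class SentinelCheck {

    // GROUP_OFFSET is taken from the diff above; the remaining values are
    // assumed members of the same sentinel family, not confirmed constants.
    public static final long OFFSET_NOT_SET = -915623761776L;
    public static final long EARLIEST_OFFSET = -915623761775L;
    public static final long LATEST_OFFSET = -915623761774L;
    public static final long GROUP_OFFSET = -915623761773L;

    // Narrow check: only the designated sentinel values qualify, so an
    // accidental "sentinel + 1" (as in the reported bug) is not swallowed.
    static boolean isSentinel(long offset) {
        return offset == OFFSET_NOT_SET
                || offset == EARLIEST_OFFSET
                || offset == LATEST_OFFSET
                || offset == GROUP_OFFSET;
    }
}
```

The design difference versus `offset < 0` is visible in the bug itself: the invalid committed value `-915623761772` (`GROUP_OFFSET + 1`) would pass the broad check but not the narrow one.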
[jira] [Commented] (FLINK-7732) Invalid offset to commit in Kafka
[ https://issues.apache.org/jira/browse/FLINK-7732?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16233778#comment-16233778 ] ASF GitHub Bot commented on FLINK-7732: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/4928#discussion_r148204574 --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateSentinel.java --- @@ -52,4 +52,7 @@ */ public static final long GROUP_OFFSET = -915623761773L; + public static boolean isSentinel(long offset) { + return offset < 0; --- End diff -- nit: this implementation could be a bit too broad. Could be a bit more specific by matching the static values in `KafkaTopicPartitionStateSentinel`. > Invalid offset to commit in Kafka > - > > Key: FLINK-7732 > URL: https://issues.apache.org/jira/browse/FLINK-7732 > Project: Flink > Issue Type: Bug > Components: Kafka Connector, Tests >Affects Versions: 1.4.0 >Reporter: Nico Kruber >Assignee: Piotr Nowojski >Priority: Blocker > Labels: test-stability > Fix For: 1.4.0 > > > In a test run with unrelated changes in the network stack, the Kafka > end-to-end test was failing with an invalid offset: > {code} > 2017-09-28 06:34:10,736 INFO org.apache.kafka.common.utils.AppInfoParser > - Kafka version : 0.10.2.1 > 2017-09-28 06:34:10,744 INFO org.apache.kafka.common.utils.AppInfoParser > - Kafka commitId : e89bffd6b2eff799 > 2017-09-28 06:34:14,549 INFO > org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Discovered > coordinator testing-gce-f67551b8-8867-41c5-b36e-4196d0f11e30:9092 (id: > 2147483647 rack: null) for group myconsumer. 
> 2017-09-28 06:34:14,573 INFO > org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Marking > the coordinator testing-gce-f67551b8-8867-41c5-b36e-4196d0f11e30:9092 (id: > 2147483647 rack: null) dead for group myconsumer > 2017-09-28 06:34:14,686 INFO > org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Discovered > coordinator testing-gce-f67551b8-8867-41c5-b36e-4196d0f11e30:9092 (id: > 2147483647 rack: null) for group myconsumer. > 2017-09-28 06:34:14,687 INFO > org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Marking > the coordinator testing-gce-f67551b8-8867-41c5-b36e-4196d0f11e30:9092 (id: > 2147483647 rack: null) dead for group myconsumer > 2017-09-28 06:34:14,792 INFO > org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Discovered > coordinator testing-gce-f67551b8-8867-41c5-b36e-4196d0f11e30:9092 (id: > 2147483647 rack: null) for group myconsumer. > 2017-09-28 06:34:15,068 INFO > org.apache.flink.runtime.state.DefaultOperatorStateBackend- > DefaultOperatorStateBackend snapshot (In-Memory Stream Factory, synchronous > part) in thread Thread[Async calls on Source: Custom Source -> Map -> Sink: > Unnamed (1/1),5,Flink Task Threads] took 948 ms. > 2017-09-28 06:34:15,164 WARN > org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher - > Committing offsets to Kafka failed. This does not compromise Flink's > checkpoints. 
> java.lang.IllegalArgumentException: Invalid offset: -915623761772 > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:687) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.doCommitOffsetsAsync(ConsumerCoordinator.java:531) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsAsync(ConsumerCoordinator.java:499) > at > org.apache.kafka.clients.consumer.KafkaConsumer.commitAsync(KafkaConsumer.java:1181) > at > org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:217) > 2017-09-28 06:34:15,171 ERROR > org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase - Async > Kafka commit failed. > java.lang.IllegalArgumentException: Invalid offset: -915623761772 > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:687) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.doCommitOffsetsAsync(ConsumerCoordinator.java:531) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsAsync(ConsumerCoordinator.java:499) > at > org.apache.kafka.clients.consumer.KafkaConsumer.commitAsync(KafkaConsumer.java:1181) > at > org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:217) > {code} >
[GitHub] flink pull request #4928: [FLINK-7732][kafka-consumer] Do not commit to kafk...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/4928#discussion_r148204398 --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionState.java --- @@ -95,6 +95,10 @@ public final boolean isOffsetDefined() { return offset != KafkaTopicPartitionStateSentinel.OFFSET_NOT_SET; } + public final boolean isSentinel() { --- End diff -- nit: would `hasSentinelOffset` be a better name here? ---
[jira] [Commented] (FLINK-7516) HybridMemorySegment: do not allow copies into a read-only ByteBuffer
[ https://issues.apache.org/jira/browse/FLINK-7516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16233776#comment-16233776 ] ASF GitHub Bot commented on FLINK-7516: --- Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/4593#discussion_r148204666 --- Diff: flink-core/src/main/java/org/apache/flink/core/memory/HybridMemorySegment.java --- @@ -306,6 +307,9 @@ public final void get(int offset, ByteBuffer target, int numBytes) { if ((offset | numBytes | (offset + numBytes)) < 0) { throw new IndexOutOfBoundsException(); } + if (target.isReadOnly()) { --- End diff -- Isn't this check redundant? Shouldn't the `ByteBuffer`s validate it on their own like `DirectByteBufferR` do? Putting this check here, it complicates the code and we have to pay for it on each call, even on happy path (cost should be very tiny). Maybe it should be put only in `if (target.isDirect())` branch to check `read-only` only before unsafe copy? > HybridMemorySegment: do not allow copies into a read-only ByteBuffer > > > Key: FLINK-7516 > URL: https://issues.apache.org/jira/browse/FLINK-7516 > Project: Flink > Issue Type: Sub-task > Components: Core, Network >Affects Versions: 1.4.0 >Reporter: Nico Kruber >Assignee: Nico Kruber >Priority: Major > > {{HybridMemorySegment#get(int, ByteBuffer, int)}} allows writing into a > read-only {{ByteBuffer}} but this operation should be forbidden. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #4928: [FLINK-7732][kafka-consumer] Do not commit to kafka Flink...
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/4928 By the way, what exactly was the error that caused the application crash in the described case? ---
[jira] [Commented] (FLINK-7732) Invalid offset to commit in Kafka
[ https://issues.apache.org/jira/browse/FLINK-7732?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16233780#comment-16233780 ] ASF GitHub Bot commented on FLINK-7732: --- Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/4928 By the way, what exactly was the error that caused the application crash in the described case? > Invalid offset to commit in Kafka > - > > Key: FLINK-7732 > URL: https://issues.apache.org/jira/browse/FLINK-7732 > Project: Flink > Issue Type: Bug > Components: Kafka Connector, Tests >Affects Versions: 1.4.0 >Reporter: Nico Kruber >Assignee: Piotr Nowojski >Priority: Blocker > Labels: test-stability > Fix For: 1.4.0 > > > In a test run with unrelated changes in the network stack, the Kafka > end-to-end test was failing with an invalid offset: > {code} > 2017-09-28 06:34:10,736 INFO org.apache.kafka.common.utils.AppInfoParser > - Kafka version : 0.10.2.1 > 2017-09-28 06:34:10,744 INFO org.apache.kafka.common.utils.AppInfoParser > - Kafka commitId : e89bffd6b2eff799 > 2017-09-28 06:34:14,549 INFO > org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Discovered > coordinator testing-gce-f67551b8-8867-41c5-b36e-4196d0f11e30:9092 (id: > 2147483647 rack: null) for group myconsumer. > 2017-09-28 06:34:14,573 INFO > org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Marking > the coordinator testing-gce-f67551b8-8867-41c5-b36e-4196d0f11e30:9092 (id: > 2147483647 rack: null) dead for group myconsumer > 2017-09-28 06:34:14,686 INFO > org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Discovered > coordinator testing-gce-f67551b8-8867-41c5-b36e-4196d0f11e30:9092 (id: > 2147483647 rack: null) for group myconsumer. 
> 2017-09-28 06:34:14,687 INFO > org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Marking > the coordinator testing-gce-f67551b8-8867-41c5-b36e-4196d0f11e30:9092 (id: > 2147483647 rack: null) dead for group myconsumer > 2017-09-28 06:34:14,792 INFO > org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Discovered > coordinator testing-gce-f67551b8-8867-41c5-b36e-4196d0f11e30:9092 (id: > 2147483647 rack: null) for group myconsumer. > 2017-09-28 06:34:15,068 INFO > org.apache.flink.runtime.state.DefaultOperatorStateBackend- > DefaultOperatorStateBackend snapshot (In-Memory Stream Factory, synchronous > part) in thread Thread[Async calls on Source: Custom Source -> Map -> Sink: > Unnamed (1/1),5,Flink Task Threads] took 948 ms. > 2017-09-28 06:34:15,164 WARN > org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher - > Committing offsets to Kafka failed. This does not compromise Flink's > checkpoints. > java.lang.IllegalArgumentException: Invalid offset: -915623761772 > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:687) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.doCommitOffsetsAsync(ConsumerCoordinator.java:531) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsAsync(ConsumerCoordinator.java:499) > at > org.apache.kafka.clients.consumer.KafkaConsumer.commitAsync(KafkaConsumer.java:1181) > at > org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:217) > 2017-09-28 06:34:15,171 ERROR > org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase - Async > Kafka commit failed. 
> java.lang.IllegalArgumentException: Invalid offset: -915623761772 > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:687) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.doCommitOffsetsAsync(ConsumerCoordinator.java:531) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsAsync(ConsumerCoordinator.java:499) > at > org.apache.kafka.clients.consumer.KafkaConsumer.commitAsync(KafkaConsumer.java:1181) > at > org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:217) > {code} > https://travis-ci.org/apache/flink/jobs/280722829 > [~pnowojski] did a first analysis that revealed this: > In > org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java:229 > this is being sent: > {{long offsetToCommit = lastProcessedOffset + 1;}} > {{lastProcessedOffset}} comes from: > {{org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase#snapshotState}} > either lines 741 or 749 > The value that we see is strangely similar to > {{org.apach
[GitHub] flink issue #4926: [FLINK-7951] Load YarnConfiguration with default Hadoop c...
Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/4926 A user reported that we may have to explicitly load the `yarn-site.xml` in the `HadoopUtils.getHadoopConfiguration` method as we do for the `core-site.xml` and `hdfs-site.xml`. Will have to verify this. ---
[jira] [Commented] (FLINK-7951) YarnApplicationMaster does not load HDFSConfiguration
[ https://issues.apache.org/jira/browse/FLINK-7951?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16233785#comment-16233785 ] ASF GitHub Bot commented on FLINK-7951: --- Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/4926 A user reported that we may have to explicitly load the `yarn-site.xml` in the `HadoopUtils.getHadoopConfiguration` method as we do for the `core-site.xml` and `hdfs-site.xml`. Will have to verify this. > YarnApplicationMaster does not load HDFSConfiguration > - > > Key: FLINK-7951 > URL: https://issues.apache.org/jira/browse/FLINK-7951 > Project: Flink > Issue Type: Bug > Components: YARN >Affects Versions: 1.4.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Critical > Fix For: 1.4.0 > > > When instantiating the {{YarnConfiguration}} we do not load the corresponding > {{HDFSConfiguration}}. As a result, we do not read {{hdfs-site.xml}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
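The fix being discussed, picking up `yarn-site.xml` alongside the site files already loaded, can be sketched as follows. This is a simplified stand-in (plain `java.io.File` paths, hypothetical class name) rather than Flink's `HadoopUtils` code, which adds each file as a resource on a Hadoop `Configuration`:

```java
import java.io.File;

public class HadoopConfFiles {

    // Site files to resolve under a Hadoop configuration directory. The
    // change under discussion adds yarn-site.xml next to the two already
    // loaded in HadoopUtils.getHadoopConfiguration.
    static String[] candidateFiles(String confDir) {
        return new String[] {
            new File(confDir, "core-site.xml").getPath(),
            new File(confDir, "hdfs-site.xml").getPath(),
            new File(confDir, "yarn-site.xml").getPath(), // previously missing
        };
    }

    public static void main(String[] args) {
        for (String f : candidateFiles("/etc/hadoop/conf")) {
            System.out.println(f);
        }
    }
}
```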
[jira] [Commented] (FLINK-7652) Port CurrentJobIdsHandler to new REST endpoint
[ https://issues.apache.org/jira/browse/FLINK-7652?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16233794#comment-16233794 ] ASF GitHub Bot commented on FLINK-7652: --- Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/4734 @tzulitai, I actually want to re-check whether it's indeed not possible to register the `CurrentJobsOverviewHandler` under `jobs/overview`. If this should indeed be the case, then it will most likely subsume this handler. If not, then we'll register this handler under `/jobs` > Port CurrentJobIdsHandler to new REST endpoint > -- > > Key: FLINK-7652 > URL: https://issues.apache.org/jira/browse/FLINK-7652 > Project: Flink > Issue Type: Sub-task > Components: REST, Webfrontend >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai >Priority: Major > Labels: flip-6 > Fix For: 1.4.0 > > > Port existing {{CurrentJobIdsHandler}} to new REST endpoint -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #4734: [FLINK-7652] [flip6] Port CurrentJobIdsHandler to new RES...
Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/4734 @tzulitai, I actually want to re-check whether it's indeed not possible to register the `CurrentJobsOverviewHandler` under `jobs/overview`. If this should indeed be the case, then it will most likely subsume this handler. If not, then we'll register this handler under `/jobs` ---
[GitHub] flink issue #4506: [FLINK-7400][cluster] fix off-heap limits set to conserva...
Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/4506 Thanks for running the test @NicoK. Could you please rebase this PR onto the latest master. Once Travis passes all tests, I'll merge this PR then. ---
[jira] [Commented] (FLINK-7400) off-heap limits set to conservatively in cluster environments
[ https://issues.apache.org/jira/browse/FLINK-7400?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16233796#comment-16233796 ] ASF GitHub Bot commented on FLINK-7400: --- Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/4506 Thanks for running the test @NicoK. Could you please rebase this PR onto the latest master. Once Travis passes all tests, I'll merge this PR then. > off-heap limits set to conservatively in cluster environments > - > > Key: FLINK-7400 > URL: https://issues.apache.org/jira/browse/FLINK-7400 > Project: Flink > Issue Type: Bug > Components: Cluster Management, Mesos, YARN >Affects Versions: 1.3.0, 1.3.1, 1.4.0, 1.3.2 >Reporter: Nico Kruber >Assignee: Nico Kruber >Priority: Blocker > > Inside {{ContaineredTaskManagerParameters}}, since FLINK-6217, the > {{offHeapSize}} is set to the amount of memory Flink will use off-heap which > will be set as the value for {{-XX:MaxDirectMemorySize}} in various cases. > This does not account for any off-heap use by other components than Flink, > e.g. RocksDB, other libraries, or the JVM itself. > We should add the {{cutoff}} from the {{CONTAINERIZED_HEAP_CUTOFF_RATIO}} > configuration parameter to {{offHeapSize}} as implied by the description on > what this parameter is there for. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
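The arithmetic behind the proposed FLINK-7400 fix can be sketched in a few lines (all constants and the method itself are illustrative, not the actual `ContaineredTaskManagerParameters` code): instead of setting `-XX:MaxDirectMemorySize` to only Flink's own off-heap share, the cutoff reserved by `CONTAINERIZED_HEAP_CUTOFF_RATIO` is added back so RocksDB, other libraries, and the JVM have headroom.

```java
public class OffHeapLimit {

    // totalMemoryMB: container size; cutoffRatio: fraction reserved off the top
    // (e.g. 0.25); heapMB: what goes to -Xmx. Sketch of the proposed fix:
    // include the cutoff in the value used for -XX:MaxDirectMemorySize.
    static long offHeapSizeMB(long totalMemoryMB, double cutoffRatio, long heapMB) {
        long cutoffMB = (long) (totalMemoryMB * cutoffRatio);
        long flinkOffHeapMB = totalMemoryMB - cutoffMB - heapMB; // Flink's own off-heap use
        return flinkOffHeapMB + cutoffMB; // headroom for RocksDB, libraries, the JVM itself
    }

    public static void main(String[] args) {
        // 4096 MB container, 25% cutoff, 2048 MB heap -> 1024 + 1024 = 2048 MB
        if (offHeapSizeMB(4096, 0.25, 2048) != 2048) throw new AssertionError();
        System.out.println(offHeapSizeMB(4096, 0.25, 2048));
    }
}
```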
[GitHub] flink pull request #4916: [FLINK-7153] Re-introduce preferred locations for ...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4916#discussion_r148209063 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java --- @@ -878,113 +880,70 @@ private void scheduleEager(SlotProvider slotProvider, final Time timeout) { // that way we do not have any operation that can fail between allocating the slots // and adding them to the list. If we had a failure in between there, that would // cause the slots to get lost - final ArrayList resources = new ArrayList<>(getNumberOfExecutionJobVertices()); final boolean queued = allowQueuedScheduling; - // we use this flag to handle failures in a 'finally' clause - // that allows us to not go through clumsy cast-and-rethrow logic - boolean successful = false; + // collecting all the slots may resize and fail in that operation without slots getting lost + final ArrayList> allAllocationFutures = new ArrayList<>(getNumberOfExecutionJobVertices()); - try { - // collecting all the slots may resize and fail in that operation without slots getting lost - final ArrayList> slotFutures = new ArrayList<>(getNumberOfExecutionJobVertices()); + // allocate the slots (obtain all their futures + for (ExecutionJobVertex ejv : getVerticesTopologically()) { + // these calls are not blocking, they only return futures --- End diff -- There is no specific reason why we iterate over the vertices in topological order. We could also choose a completely random order for eager scheduling because the scheduling order will be determined by the preferred location futures (which at the moment is based on inputs only). If we should switch to state location then it basically means that we schedule the individual tasks independently because the vertices don't depend on the input locations. ---
[jira] [Commented] (FLINK-7153) Eager Scheduling can't allocate source for ExecutionGraph correctly
[ https://issues.apache.org/jira/browse/FLINK-7153?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16233800#comment-16233800 ] ASF GitHub Bot commented on FLINK-7153: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4916#discussion_r148209063 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java --- @@ -878,113 +880,70 @@ private void scheduleEager(SlotProvider slotProvider, final Time timeout) { // that way we do not have any operation that can fail between allocating the slots // and adding them to the list. If we had a failure in between there, that would // cause the slots to get lost - final ArrayList resources = new ArrayList<>(getNumberOfExecutionJobVertices()); final boolean queued = allowQueuedScheduling; - // we use this flag to handle failures in a 'finally' clause - // that allows us to not go through clumsy cast-and-rethrow logic - boolean successful = false; + // collecting all the slots may resize and fail in that operation without slots getting lost + final ArrayList> allAllocationFutures = new ArrayList<>(getNumberOfExecutionJobVertices()); - try { - // collecting all the slots may resize and fail in that operation without slots getting lost - final ArrayList> slotFutures = new ArrayList<>(getNumberOfExecutionJobVertices()); + // allocate the slots (obtain all their futures + for (ExecutionJobVertex ejv : getVerticesTopologically()) { + // these calls are not blocking, they only return futures --- End diff -- There is no specific reason why we iterate over the vertices in topological order. We could also choose a completely random order for eager scheduling because the scheduling order will be determined by the preferred location futures (which at the moment is based on inputs only). 
If we should switch to state location then it basically means that we schedule the individual tasks independently because the vertices don't depend on the input locations. > Eager Scheduling can't allocate source for ExecutionGraph correctly > --- > > Key: FLINK-7153 > URL: https://issues.apache.org/jira/browse/FLINK-7153 > Project: Flink > Issue Type: Bug > Components: JobManager >Affects Versions: 1.3.1 >Reporter: Sihua Zhou >Assignee: Till Rohrmann >Priority: Critical > Fix For: 1.4.0, 1.3.3 > > > The ExecutionGraph.scheduleEager() function allocate for ExecutionJobVertex > one by one via calling ExecutionJobVertex.allocateResourcesForAll(), here is > two problem about it: > 1. The ExecutionVertex.getPreferredLocationsBasedOnInputs will always return > empty, cause `sourceSlot` always be null until `ExecutionVertex` has been > deployed via 'Execution.deployToSlot()'. So allocate resource base on > preferred location can't work correctly, we need to set the slot info for > `Execution` as soon as Execution.allocateSlotForExecution() called > successfully? > 2. Current allocate strategy can't allocate the slot optimize. Here is the > test case: > {code} > JobVertex v1 = new JobVertex("v1", jid1); > JobVertex v2 = new JobVertex("v2", jid2); > SlotSharingGroup group = new SlotSharingGroup(); > v1.setSlotSharingGroup(group); > v2.setSlotSharingGroup(group); > v1.setParallelism(2); > v2.setParallelism(4); > v1.setInvokableClass(BatchTask.class); > v2.setInvokableClass(BatchTask.class); > v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE, > ResultPartitionType.PIPELINED_BOUNDED); > {code} > Currently, after allocate for v1,v2, we got a local partition and three > remote partition. But actually, it should be 2 local partition and 2 remote > partition.
> The causes of the above problems is because the current allocate > strategy is allocate the resource for execution one by one(if the execution > can allocate from SlotGroup then get it, Otherwise ask for a new one for it). > If we change the allocate strategy to two step will solve this problem, below > is the Pseudo code: > {code} > for (ExecutionJobVertex ejv: getVerticesTopologically) { > //step 1: try to allocate from SlotGroup base on inputs one by one (which > only allocate resource base on location). > //step 2: allocate for the remain execution. > } > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4916: [FLINK-7153] Re-introduce preferred locations for ...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4916#discussion_r148209318 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java --- @@ -1065,6 +1177,46 @@ private void sendUpdatePartitionInfoRpcCall( // Miscellaneous // + /** +* Calculates the preferred locations based on the location preference constraint. +* +* @param locationPreferenceConstraint constraint for the location preference +* @return Future containing the collection of preferred locations. This might not be completed if not all inputs +* have been a resource assigned. +*/ + @VisibleForTesting + public CompletableFuture> calculatePreferredLocations(LocationPreferenceConstraint locationPreferenceConstraint) { + final Collection> preferredLocationFutures = getVertex().getPreferredLocationsBasedOnInputs(); + final CompletableFuture> preferredLocationsFuture; --- End diff -- Yes, that is exactly the idea. ---
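The pattern discussed in this diff, a combined future that completes only once every input's preferred location is known, can be sketched with plain `CompletableFuture`s (types simplified: plain `String`s stand in for `TaskManagerLocation`, and the helper name is hypothetical):

```java
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

public class PreferredLocations {

    // Combine per-input location futures into one future of all locations.
    static CompletableFuture<Collection<String>> combine(
            Collection<CompletableFuture<String>> futures) {
        return CompletableFuture
            .allOf(futures.toArray(new CompletableFuture<?>[0]))
            .thenApply(ignored -> futures.stream()
                .map(CompletableFuture::join) // safe: allOf guarantees completion
                .collect(Collectors.toList()));
    }

    public static void main(String[] args) {
        CompletableFuture<String> a = CompletableFuture.completedFuture("tm-1");
        CompletableFuture<String> b = new CompletableFuture<>();
        CompletableFuture<Collection<String>> all = combine(Arrays.asList(a, b));

        // Not complete until every input has a location assigned...
        if (all.isDone()) throw new AssertionError();
        b.complete("tm-2");
        // ...then it yields the full collection of preferred locations.
        if (!all.join().containsAll(Arrays.asList("tm-1", "tm-2"))) throw new AssertionError();
        System.out.println(all.join());
    }
}
```

This also shows why the scheduling order ends up being driven by the location futures rather than by topological iteration: each combined future fires as soon as its own inputs are resolved, independently of the loop order.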
[jira] [Commented] (FLINK-7153) Eager Scheduling can't allocate source for ExecutionGraph correctly
[ https://issues.apache.org/jira/browse/FLINK-7153?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16233802#comment-16233802 ] ASF GitHub Bot commented on FLINK-7153: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4916#discussion_r148209318 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java --- @@ -1065,6 +1177,46 @@ private void sendUpdatePartitionInfoRpcCall( // Miscellaneous // + /** +* Calculates the preferred locations based on the location preference constraint. +* +* @param locationPreferenceConstraint constraint for the location preference +* @return Future containing the collection of preferred locations. This might not be completed if not all inputs +* have been a resource assigned. +*/ + @VisibleForTesting + public CompletableFuture> calculatePreferredLocations(LocationPreferenceConstraint locationPreferenceConstraint) { + final Collection> preferredLocationFutures = getVertex().getPreferredLocationsBasedOnInputs(); + final CompletableFuture> preferredLocationsFuture; --- End diff -- Yes, that is exactly the idea. > Eager Scheduling can't allocate source for ExecutionGraph correctly > --- > > Key: FLINK-7153 > URL: https://issues.apache.org/jira/browse/FLINK-7153 > Project: Flink > Issue Type: Bug > Components: JobManager >Affects Versions: 1.3.1 >Reporter: Sihua Zhou >Assignee: Till Rohrmann >Priority: Critical > Fix For: 1.4.0, 1.3.3 > > > The ExecutionGraph.scheduleEager() function allocate for ExecutionJobVertex > one by one via calling ExecutionJobVertex.allocateResourcesForAll(), here is > two problem about it: > 1. The ExecutionVertex.getPreferredLocationsBasedOnInputs will always return > empty, cause `sourceSlot` always be null until `ExecutionVertex` has been > deployed via 'Execution.deployToSlot()'. 
So allocate resource base on > preferred location can't work correctly, we need to set the slot info for > `Execution` as soon as Execution.allocateSlotForExecution() called > successfully? > 2. Current allocate strategy can't allocate the slot optimize. Here is the > test case: > {code} > JobVertex v1 = new JobVertex("v1", jid1); > JobVertex v2 = new JobVertex("v2", jid2); > SlotSharingGroup group = new SlotSharingGroup(); > v1.setSlotSharingGroup(group); > v2.setSlotSharingGroup(group); > v1.setParallelism(2); > v2.setParallelism(4); > v1.setInvokableClass(BatchTask.class); > v2.setInvokableClass(BatchTask.class); > v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE, > ResultPartitionType.PIPELINED_BOUNDED); > {code} > Currently, after allocate for v1,v2, we got a local partition and three > remote partition. But actually, it should be 2 local partition and 2 remote > partition. > The causes of the above problems is because the current allocate > strategy is allocate the resource for execution one by one(if the execution > can allocate from SlotGroup then get it, Otherwise ask for a new one for it). > If we change the allocate strategy to two step will solve this problem, below > is the Pseudo code: > {code} > for (ExecutionJobVertex ejv: getVerticesTopologically) { > //step 1: try to allocate from SlotGroup base on inputs one by one (which > only allocate resource base on location). > //step 2: allocate for the remain execution. > } > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4593: [FLINK-7516][memory] do not allow copies into a re...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4593#discussion_r148209947 --- Diff: flink-core/src/main/java/org/apache/flink/core/memory/HybridMemorySegment.java --- @@ -306,6 +307,9 @@ public final void get(int offset, ByteBuffer target, int numBytes) { if ((offset | numBytes | (offset + numBytes)) < 0) { throw new IndexOutOfBoundsException(); } + if (target.isReadOnly()) { --- End diff -- you are right - the non-direct buffers path is based on `ByteBuffer#array()` which will throw a `ReadOnlyBufferException` for read-only buffers, so it really is enough in the direct buffers code path where the `UNSAFE.copyMemory` is not checking the source pointer (how should it?!) ---
[jira] [Commented] (FLINK-7516) HybridMemorySegment: do not allow copies into a read-only ByteBuffer
[ https://issues.apache.org/jira/browse/FLINK-7516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16233804#comment-16233804 ] ASF GitHub Bot commented on FLINK-7516: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4593#discussion_r148209947 --- Diff: flink-core/src/main/java/org/apache/flink/core/memory/HybridMemorySegment.java --- @@ -306,6 +307,9 @@ public final void get(int offset, ByteBuffer target, int numBytes) { if ((offset | numBytes | (offset + numBytes)) < 0) { throw new IndexOutOfBoundsException(); } + if (target.isReadOnly()) { --- End diff -- you are right - the non-direct buffers path is based on `ByteBuffer#array()` which will throw a `ReadOnlyBufferException` for read-only buffers, so it really is enough in the direct buffers code path where the `UNSAFE.copyMemory` is not checking the source pointer (how should it?!) > HybridMemorySegment: do not allow copies into a read-only ByteBuffer > > > Key: FLINK-7516 > URL: https://issues.apache.org/jira/browse/FLINK-7516 > Project: Flink > Issue Type: Sub-task > Components: Core, Network >Affects Versions: 1.4.0 >Reporter: Nico Kruber >Assignee: Nico Kruber >Priority: Major > > {{HybridMemorySegment#get(int, ByteBuffer, int)}} allows writing into a > read-only {{ByteBuffer}} but this operation should be forbidden. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
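The guard being added to `HybridMemorySegment#get(int, ByteBuffer, int)` can be demonstrated with a standalone sketch (not Flink's actual code): heap-backed targets fail naturally via `ByteBuffer#array()`, but a direct-memory copy path would not check, so read-only targets must be rejected up front.

```java
import java.nio.ByteBuffer;
import java.nio.ReadOnlyBufferException;

public class ReadOnlyTargetGuard {

    // Copy src into target, rejecting read-only targets up front. A direct-buffer
    // path built on Unsafe.copyMemory would otherwise bypass the read-only flag.
    static void copyTo(byte[] src, ByteBuffer target) {
        if (target.isReadOnly()) {
            throw new ReadOnlyBufferException();
        }
        target.put(src);
    }

    public static void main(String[] args) {
        ByteBuffer rw = ByteBuffer.allocate(8);
        copyTo(new byte[] {1, 2, 3}, rw);
        if (rw.position() != 3) throw new AssertionError();

        ByteBuffer ro = ByteBuffer.allocate(8).asReadOnlyBuffer();
        boolean thrown = false;
        try {
            copyTo(new byte[] {1}, ro);
        } catch (ReadOnlyBufferException e) {
            thrown = true; // writing into a read-only ByteBuffer is forbidden
        }
        if (!thrown) throw new AssertionError();
        System.out.println("ok");
    }
}
```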
[GitHub] flink issue #4506: [FLINK-7400][cluster] fix off-heap limits set to conserva...
Github user NicoK commented on the issue: https://github.com/apache/flink/pull/4506 rebased successfully, waiting for Travis now... ---
[jira] [Commented] (FLINK-7400) off-heap limits set to conservatively in cluster environments
[ https://issues.apache.org/jira/browse/FLINK-7400?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16233813#comment-16233813 ] ASF GitHub Bot commented on FLINK-7400: --- Github user NicoK commented on the issue: https://github.com/apache/flink/pull/4506 rebased successfully, waiting for Travis now... > off-heap limits set to conservatively in cluster environments > - > > Key: FLINK-7400 > URL: https://issues.apache.org/jira/browse/FLINK-7400 > Project: Flink > Issue Type: Bug > Components: Cluster Management, Mesos, YARN >Affects Versions: 1.3.0, 1.3.1, 1.4.0, 1.3.2 >Reporter: Nico Kruber >Assignee: Nico Kruber >Priority: Blocker > > Inside {{ContaineredTaskManagerParameters}}, since FLINK-6217, the > {{offHeapSize}} is set to the amount of memory Flink will use off-heap which > will be set as the value for {{-XX:MaxDirectMemorySize}} in various cases. > This does not account for any off-heap use by other components than Flink, > e.g. RocksDB, other libraries, or the JVM itself. > We should add the {{cutoff}} from the {{CONTAINERIZED_HEAP_CUTOFF_RATIO}} > configuration parameter to {{offHeapSize}} as implied by the description on > what this parameter is there for. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4916: [FLINK-7153] Re-introduce preferred locations for ...
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/4916#discussion_r148211942 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java --- @@ -878,113 +880,70 @@ private void scheduleEager(SlotProvider slotProvider, final Time timeout) { // that way we do not have any operation that can fail between allocating the slots // and adding them to the list. If we had a failure in between there, that would // cause the slots to get lost - final ArrayList resources = new ArrayList<>(getNumberOfExecutionJobVertices()); final boolean queued = allowQueuedScheduling; - // we use this flag to handle failures in a 'finally' clause - // that allows us to not go through clumsy cast-and-rethrow logic - boolean successful = false; + // collecting all the slots may resize and fail in that operation without slots getting lost + final ArrayList> allAllocationFutures = new ArrayList<>(getNumberOfExecutionJobVertices()); - try { - // collecting all the slots may resize and fail in that operation without slots getting lost - final ArrayList> slotFutures = new ArrayList<>(getNumberOfExecutionJobVertices()); + // allocate the slots (obtain all their futures + for (ExecutionJobVertex ejv : getVerticesTopologically()) { + // these calls are not blocking, they only return futures --- End diff -- If we switch to state location then we can't allocate resources according to the order of topologically, because stateless vertices may share the same SlotSharingGroup with stateful vertices, if stateless vertices allocated before the stateful vertices, the result can be bad. An intuitive way to do this is to allocate resources to stateful vertices firstly. ---
[GitHub] flink pull request #4592: [FLINK-7515][network] allow actual 0-length conten...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4592#discussion_r148212023 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java --- @@ -64,12 +65,53 @@ // + /** +* Allocates a new (header and contents) buffer and adds some header information for the frame +* decoder. +* +* Before sending the buffer, you must write the actual length after adding the contents as +* an integer to position 0! +* +* @param allocator +* byte buffer allocator to use +* @param id +* {@link NettyMessage} subclass ID +* +* @return a newly allocated direct buffer with header data written for {@link +* NettyMessageDecoder} +*/ private static ByteBuf allocateBuffer(ByteBufAllocator allocator, byte id) { - return allocateBuffer(allocator, id, 0); + return allocateBuffer(allocator, id, -1); } + /** +* Allocates a new (header and contents) buffer and adds some header information for the frame +* decoder. +* +* If the length is unknown, you must write the actual length after adding the +* contents as an integer to position 0! +* +* @param allocator +* byte buffer allocator to use +* @param id +* {@link NettyMessage} subclass ID +* @param length +* content length (or -1 if unknown) +* +* @return a newly allocated direct buffer with header data written for {@link +* NettyMessageDecoder} +*/ private static ByteBuf allocateBuffer(ByteBufAllocator allocator, byte id, int length) { - final ByteBuf buffer = length != 0 ? allocator.directBuffer(HEADER_LENGTH + length) : allocator.directBuffer(); + Preconditions.checkArgument(length <= Integer.MAX_VALUE - HEADER_LENGTH); --- End diff -- why not... ---
[jira] [Commented] (FLINK-7153) Eager Scheduling can't allocate source for ExecutionGraph correctly
[ https://issues.apache.org/jira/browse/FLINK-7153?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16233815#comment-16233815 ] ASF GitHub Bot commented on FLINK-7153: --- Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/4916#discussion_r148211942 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java --- @@ -878,113 +880,70 @@ private void scheduleEager(SlotProvider slotProvider, final Time timeout) { // that way we do not have any operation that can fail between allocating the slots // and adding them to the list. If we had a failure in between there, that would // cause the slots to get lost - final ArrayList resources = new ArrayList<>(getNumberOfExecutionJobVertices()); final boolean queued = allowQueuedScheduling; - // we use this flag to handle failures in a 'finally' clause - // that allows us to not go through clumsy cast-and-rethrow logic - boolean successful = false; + // collecting all the slots may resize and fail in that operation without slots getting lost + final ArrayList> allAllocationFutures = new ArrayList<>(getNumberOfExecutionJobVertices()); - try { - // collecting all the slots may resize and fail in that operation without slots getting lost - final ArrayList> slotFutures = new ArrayList<>(getNumberOfExecutionJobVertices()); + // allocate the slots (obtain all their futures + for (ExecutionJobVertex ejv : getVerticesTopologically()) { + // these calls are not blocking, they only return futures --- End diff -- If we switch to state location then we can't allocate resources according to the order of topologically, because stateless vertices may share the same SlotSharingGroup with stateful vertices, if stateless vertices allocated before the stateful vertices, the result can be bad. An intuitive way to do this is to allocate resources to stateful vertices firstly. 
> Eager Scheduling can't allocate source for ExecutionGraph correctly > --- > > Key: FLINK-7153 > URL: https://issues.apache.org/jira/browse/FLINK-7153 > Project: Flink > Issue Type: Bug > Components: JobManager >Affects Versions: 1.3.1 >Reporter: Sihua Zhou >Assignee: Till Rohrmann >Priority: Critical > Fix For: 1.4.0, 1.3.3 > > > The ExecutionGraph.scheduleEager() function allocate for ExecutionJobVertex > one by one via calling ExecutionJobVertex.allocateResourcesForAll(), here is > two problem about it: > 1. The ExecutionVertex.getPreferredLocationsBasedOnInputs will always return > empty, cause `sourceSlot` always be null until `ExecutionVertex` has been > deployed via 'Execution.deployToSlot()'. So allocate resource base on > preferred location can't work correctly, we need to set the slot info for > `Execution` as soon as Execution.allocateSlotForExecution() called > successfully? > 2. Current allocate strategy can't allocate the slot optimize. Here is the > test case: > {code} > JobVertex v1 = new JobVertex("v1", jid1); > JobVertex v2 = new JobVertex("v2", jid2); > SlotSharingGroup group = new SlotSharingGroup(); > v1.setSlotSharingGroup(group); > v2.setSlotSharingGroup(group); > v1.setParallelism(2); > v2.setParallelism(4); > v1.setInvokableClass(BatchTask.class); > v2.setInvokableClass(BatchTask.class); > v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE, > ResultPartitionType.PIPELINED_BOUNDED); > {code} > Currently, after allocate for v1,v2, we got a local partition and three > remote partition. But actually, it should be 2 local partition and 2 remote > partition. > The causes of the above problems is because the current allocate > strategy is allocate the resource for execution one by one(if the execution > can allocate from SlotGroup then get it, Otherwise ask for a new one for it). 
> If we change the allocate strategy to two step will solve this problem, below > is the Pseudo code: > {code} > for (ExecutionJobVertex ejv: getVerticesTopologically) { > //step 1: try to allocate from SlotGroup base on inputs one by one (which > only allocate resource base on location). > //step 2: allocate for the remain execution. > } > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7515) allow actual 0-length content in NettyMessage#allocateBuffer()
[ https://issues.apache.org/jira/browse/FLINK-7515?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16233816#comment-16233816 ] ASF GitHub Bot commented on FLINK-7515: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4592#discussion_r148212023 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java --- @@ -64,12 +65,53 @@ // + /** +* Allocates a new (header and contents) buffer and adds some header information for the frame +* decoder. +* +* Before sending the buffer, you must write the actual length after adding the contents as +* an integer to position 0! +* +* @param allocator +* byte buffer allocator to use +* @param id +* {@link NettyMessage} subclass ID +* +* @return a newly allocated direct buffer with header data written for {@link +* NettyMessageDecoder} +*/ private static ByteBuf allocateBuffer(ByteBufAllocator allocator, byte id) { - return allocateBuffer(allocator, id, 0); + return allocateBuffer(allocator, id, -1); } + /** +* Allocates a new (header and contents) buffer and adds some header information for the frame +* decoder. +* +* If the length is unknown, you must write the actual length after adding the +* contents as an integer to position 0! +* +* @param allocator +* byte buffer allocator to use +* @param id +* {@link NettyMessage} subclass ID +* @param length +* content length (or -1 if unknown) +* +* @return a newly allocated direct buffer with header data written for {@link +* NettyMessageDecoder} +*/ private static ByteBuf allocateBuffer(ByteBufAllocator allocator, byte id, int length) { - final ByteBuf buffer = length != 0 ? allocator.directBuffer(HEADER_LENGTH + length) : allocator.directBuffer(); + Preconditions.checkArgument(length <= Integer.MAX_VALUE - HEADER_LENGTH); --- End diff -- why not... 
> allow actual 0-length content in NettyMessage#allocateBuffer() > -- > > Key: FLINK-7515 > URL: https://issues.apache.org/jira/browse/FLINK-7515 > Project: Flink > Issue Type: Sub-task > Components: Network >Affects Versions: 1.4.0 >Reporter: Nico Kruber >Assignee: Nico Kruber >Priority: Minor > > Previously, length {{0}} meant "unknown content length" but there are cases > where the actual length is 0 and we do not need a larger buffer. Let's use > {{-1}} for tagging the special case instead. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
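The sentinel change described here, `-1` meaning "content length unknown" so that a genuine `0` gets an exact-size buffer, can be sketched standalone (the header layout, default capacity, and helper are illustrative, not the actual `NettyMessage` code):

```java
import java.nio.ByteBuffer;

public class FrameBufferAlloc {

    // Hypothetical header: 4-byte frame length + 4-byte magic + 1-byte message id.
    static final int HEADER_LENGTH = 9;
    static final int DEFAULT_CAPACITY = 256; // arbitrary size for unknown-length frames

    // length == -1 means "unknown"; 0 is a legitimate empty payload.
    static ByteBuffer allocate(byte id, int length) {
        if (length < -1 || length > Integer.MAX_VALUE - HEADER_LENGTH) {
            throw new IllegalArgumentException("bad content length: " + length);
        }
        int capacity = (length == -1) ? DEFAULT_CAPACITY : HEADER_LENGTH + length;
        ByteBuffer buf = ByteBuffer.allocate(capacity);
        // Frame length at position 0; callers patch this later if it was unknown.
        buf.putInt(length == -1 ? 0 : HEADER_LENGTH + length);
        buf.put(id);
        return buf;
    }

    public static void main(String[] args) {
        // 0-length content: with the old "0 == unknown" convention this would
        // have allocated an oversized default buffer; now it is exact-size.
        if (allocate((byte) 1, 0).capacity() != HEADER_LENGTH) throw new AssertionError();
        // Unknown length: fall back to a default-capacity buffer.
        if (allocate((byte) 1, -1).capacity() != DEFAULT_CAPACITY) throw new AssertionError();
        System.out.println("ok");
    }
}
```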
[GitHub] flink pull request #4919: [FLINK-7902] Use TypeSerializer in TwoPhaseCommitS...
Github user aljoscha closed the pull request at: https://github.com/apache/flink/pull/4919 ---
[jira] [Commented] (FLINK-7902) TwoPhaseCommitSinkFunctions should use custom TypeSerializer
[ https://issues.apache.org/jira/browse/FLINK-7902?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16233824#comment-16233824 ] ASF GitHub Bot commented on FLINK-7902: --- Github user aljoscha closed the pull request at: https://github.com/apache/flink/pull/4919 > TwoPhaseCommitSinkFunctions should use custom TypeSerializer > > > Key: FLINK-7902 > URL: https://issues.apache.org/jira/browse/FLINK-7902 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Affects Versions: 1.4.0 >Reporter: Aljoscha Krettek >Assignee: Aljoscha Krettek >Priority: Blocker > Fix For: 1.4.0 > > > Currently, the {{FlinkKafkaProducer011}} uses {{TypeInformation.of(new > TypeHint>() {})}} to > create a {{TypeInformation}} which in turn is used to create a > {{StateDescriptor}} for the state that the Kafka sink stores. > Behind the scenes, this would be roughly analysed as a > {{PojoType(GenericType, > GenericType)}} which means we don't have explicit > control over the serialisation format and we also use Kryo (which is the > default for {{GenericTypeInfo}}). This can be problematic if we want to > evolve the state schema in the future or if we want to change Kryo versions. > We should change {{TwoPhaseCommitSinkFunction}} to only have this constructor: > {code} > public TwoPhaseCommitSinkFunction(TypeSerializer> > stateSerializer) { > {code} > and we should then change the {{FlinkKafkaProducer011}} to hand in a > custom-made {{TypeSerializer}} for the state. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Closed] (FLINK-7902) TwoPhaseCommitSinkFunctions should use custom TypeSerializer
[ https://issues.apache.org/jira/browse/FLINK-7902?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek closed FLINK-7902. --- Resolution: Fixed Fixed in 0ba528c71e35858a043bd513ead37800262f7e0c > TwoPhaseCommitSinkFunctions should use custom TypeSerializer > > > Key: FLINK-7902 > URL: https://issues.apache.org/jira/browse/FLINK-7902 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Affects Versions: 1.4.0 >Reporter: Aljoscha Krettek >Assignee: Aljoscha Krettek >Priority: Blocker > Fix For: 1.4.0 > > > Currently, the {{FlinkKafkaProducer011}} uses {{TypeInformation.of(new > TypeHint>() {})}} to > create a {{TypeInformation}} which in turn is used to create a > {{StateDescriptor}} for the state that the Kafka sink stores. > Behind the scenes, this would be roughly analysed as a > {{PojoType(GenericType, > GenericType)}} which means we don't have explicit > control over the serialisation format and we also use Kryo (which is the > default for {{GenericTypeInfo}}). This can be problematic if we want to > evolve the state schema in the future or if we want to change Kryo versions. > We should change {{TwoPhaseCommitSinkFunction}} to only have this constructor: > {code} > public TwoPhaseCommitSinkFunction(TypeSerializer> > stateSerializer) { > {code} > and we should then change the {{FlinkKafkaProducer011}} to hand in a > custom-made {{TypeSerializer}} for the state. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
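The motivation in this issue, an explicit serializer pins the wire format instead of leaving it to reflection/Kryo, can be illustrated with a standalone sketch (the class, fields, and byte layout are hypothetical, not Flink's actual transaction-state format): a hand-written, version-tagged binary encoding is stable across library upgrades and leaves room for schema evolution.

```java
import java.io.*;

public class TxnStateSerializer {

    static final int VERSION = 1; // explicit version tag enables future schema evolution

    // Fixed, hand-written binary layout: version, transactional id, producer id.
    static byte[] serialize(String transactionalId, long producerId) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bos);
        out.writeInt(VERSION);
        out.writeUTF(transactionalId);
        out.writeLong(producerId);
        out.flush();
        return bos.toByteArray();
    }

    static Object[] deserialize(byte[] bytes) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes));
        int version = in.readInt();
        if (version != VERSION) {
            throw new IOException("unsupported state version: " + version);
        }
        return new Object[] { in.readUTF(), in.readLong() };
    }

    public static void main(String[] args) throws IOException {
        Object[] fields = deserialize(serialize("txn-1", 42L));
        if (!"txn-1".equals(fields[0]) || !Long.valueOf(42L).equals(fields[1])) {
            throw new AssertionError();
        }
        System.out.println("ok");
    }
}
```

A Kryo-based `GenericTypeInfo` serializer, by contrast, derives its byte layout from the class structure and registration order, which is exactly what makes checkpointed state fragile under class or Kryo-version changes.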
[GitHub] flink issue #4919: [FLINK-7902] Use TypeSerializer in TwoPhaseCommitSinkFunc...
Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/4919 Thanks for reviewing, @kl0u! 😃 ---
[jira] [Commented] (FLINK-7902) TwoPhaseCommitSinkFunctions should use custom TypeSerializer
[ https://issues.apache.org/jira/browse/FLINK-7902?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16233826#comment-16233826 ] ASF GitHub Bot commented on FLINK-7902: --- Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/4919 Thanks for reviewing, @kl0u! 😃 -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (FLINK-7955) Rowtime attribute cannot be aggregated in group window of stream Table API
Fabian Hueske created FLINK-7955: Summary: Rowtime attribute cannot be aggregated in group window of stream Table API Key: FLINK-7955 URL: https://issues.apache.org/jira/browse/FLINK-7955 Project: Flink Issue Type: Bug Components: Table API & SQL Affects Versions: 1.3.2, 1.4.0 Reporter: Fabian Hueske The following query fails with a {{NullPointerException}}: {code} val windowedTable = table .window(Tumble over 5.milli on 'rowtime as 'w) .groupBy('w, 'word) .select('word, 'rowtime.count, 'w.start, 'w.end) {code} Equivalent SQL queries or Table API queries that use a processing time attribute are working. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4916: [FLINK-7153] Re-introduce preferred locations for ...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4916#discussion_r148216607 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java --- @@ -878,113 +880,70 @@ private void scheduleEager(SlotProvider slotProvider, final Time timeout) { // that way we do not have any operation that can fail between allocating the slots // and adding them to the list. If we had a failure in between there, that would // cause the slots to get lost - final ArrayList resources = new ArrayList<>(getNumberOfExecutionJobVertices()); final boolean queued = allowQueuedScheduling; - // we use this flag to handle failures in a 'finally' clause - // that allows us to not go through clumsy cast-and-rethrow logic - boolean successful = false; + // collecting all the slots may resize and fail in that operation without slots getting lost + final ArrayList> allAllocationFutures = new ArrayList<>(getNumberOfExecutionJobVertices()); - try { - // collecting all the slots may resize and fail in that operation without slots getting lost - final ArrayList> slotFutures = new ArrayList<>(getNumberOfExecutionJobVertices()); + // allocate the slots (obtain all their futures + for (ExecutionJobVertex ejv : getVerticesTopologically()) { + // these calls are not blocking, they only return futures --- End diff -- In that case, it depends a bit on how the scheduler values the state location preference. If it is implemented that it strictly schedules tasks to its previous state location, then it could happen that these tasks don't end up in the same slot as other tasks with which they shared a slot before. ---
[jira] [Commented] (FLINK-7153) Eager Scheduling can't allocate source for ExecutionGraph correctly
[ https://issues.apache.org/jira/browse/FLINK-7153?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16233845#comment-16233845 ] ASF GitHub Bot commented on FLINK-7153: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4916#discussion_r148216607 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java --- @@ -878,113 +880,70 @@ private void scheduleEager(SlotProvider slotProvider, final Time timeout) { // that way we do not have any operation that can fail between allocating the slots // and adding them to the list. If we had a failure in between there, that would // cause the slots to get lost - final ArrayList resources = new ArrayList<>(getNumberOfExecutionJobVertices()); final boolean queued = allowQueuedScheduling; - // we use this flag to handle failures in a 'finally' clause - // that allows us to not go through clumsy cast-and-rethrow logic - boolean successful = false; + // collecting all the slots may resize and fail in that operation without slots getting lost + final ArrayList> allAllocationFutures = new ArrayList<>(getNumberOfExecutionJobVertices()); - try { - // collecting all the slots may resize and fail in that operation without slots getting lost - final ArrayList> slotFutures = new ArrayList<>(getNumberOfExecutionJobVertices()); + // allocate the slots (obtain all their futures + for (ExecutionJobVertex ejv : getVerticesTopologically()) { + // these calls are not blocking, they only return futures --- End diff -- In that case, it depends a bit on how the scheduler values the state location preference. If it is implemented that it strictly schedules tasks to its previous state location, then it could happen that these tasks don't end up in the same slot as other tasks with which they shared a slot before. 
> Eager Scheduling can't allocate source for ExecutionGraph correctly > --- > > Key: FLINK-7153 > URL: https://issues.apache.org/jira/browse/FLINK-7153 > Project: Flink > Issue Type: Bug > Components: JobManager >Affects Versions: 1.3.1 >Reporter: Sihua Zhou >Assignee: Till Rohrmann >Priority: Critical > Fix For: 1.4.0, 1.3.3 > > > The ExecutionGraph.scheduleEager() function allocates resources for the ExecutionJobVertices > one by one by calling ExecutionJobVertex.allocateResourcesForAll(); there are > two problems with it: > 1. ExecutionVertex.getPreferredLocationsBasedOnInputs will always return > empty, because `sourceSlot` is always null until the `ExecutionVertex` has been > deployed via 'Execution.deployToSlot()'. So allocating resources based on > preferred locations can't work correctly; we need to set the slot info for the > `Execution` as soon as Execution.allocateSlotForExecution() is called > successfully. > 2. The current allocation strategy can't allocate the slots optimally. Here is the > test case: > {code} > JobVertex v1 = new JobVertex("v1", jid1); > JobVertex v2 = new JobVertex("v2", jid2); > SlotSharingGroup group = new SlotSharingGroup(); > v1.setSlotSharingGroup(group); > v2.setSlotSharingGroup(group); > v1.setParallelism(2); > v2.setParallelism(4); > v1.setInvokableClass(BatchTask.class); > v2.setInvokableClass(BatchTask.class); > v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE, > ResultPartitionType.PIPELINED_BOUNDED); > {code} > Currently, after allocating for v1 and v2, we get one local partition and three > remote partitions. But actually, it should be 2 local partitions and 2 remote > partitions. > The cause of the above problems is that the current allocation > strategy allocates the resource for each execution one by one (if the execution > can allocate from the SlotGroup then get it; otherwise ask for a new one for it).
> If we change the allocation strategy to two steps, this problem will be solved. Below > is the pseudo code: > {code} > for (ExecutionJobVertex ejv: getVerticesTopologically) { > //step 1: try to allocate from the SlotGroup based on inputs one by one (which > only allocates resources based on location). > //step 2: allocate for the remaining executions. > } > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
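The two-step strategy in the pseudo code above can be illustrated with a self-contained plain-Java toy — no Flink classes; executions and slots are just strings, and the `TwoStepAllocator` names are hypothetical:

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of two-step slot allocation: first satisfy the executions that
// have a location preference, then hand any free slot to the rest. In the
// single-pass scheme a preference-less execution can grab a slot that a
// later execution preferred, which is the suboptimality described above.
class TwoStepAllocator {
    static Map<String, String> allocate(
            List<String> executions,
            Map<String, String> preferredSlot,   // execution -> preferred slot
            Deque<String> freeSlots) {
        Map<String, String> assignment = new LinkedHashMap<>();
        // step 1: honor location preferences while the preferred slot is free
        for (String e : executions) {
            String pref = preferredSlot.get(e);
            if (pref != null && freeSlots.remove(pref)) {
                assignment.put(e, pref);
            }
        }
        // step 2: give the remaining executions any free slot
        for (String e : executions) {
            if (!assignment.containsKey(e)) {
                assignment.put(e, freeSlots.pop());
            }
        }
        return assignment;
    }
}
```

In the example, v2 keeps its preferred slot s2 even though v1 was visited first, because no slot is handed out to preference-less executions until step 2.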
[GitHub] flink pull request #4916: [FLINK-7153] Re-introduce preferred locations for ...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4916#discussion_r148216894 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java --- @@ -878,113 +880,70 @@ private void scheduleEager(SlotProvider slotProvider, final Time timeout) { // that way we do not have any operation that can fail between allocating the slots // and adding them to the list. If we had a failure in between there, that would // cause the slots to get lost - final ArrayList resources = new ArrayList<>(getNumberOfExecutionJobVertices()); final boolean queued = allowQueuedScheduling; - // we use this flag to handle failures in a 'finally' clause - // that allows us to not go through clumsy cast-and-rethrow logic - boolean successful = false; + // collecting all the slots may resize and fail in that operation without slots getting lost + final ArrayList> allAllocationFutures = new ArrayList<>(getNumberOfExecutionJobVertices()); - try { - // collecting all the slots may resize and fail in that operation without slots getting lost - final ArrayList> slotFutures = new ArrayList<>(getNumberOfExecutionJobVertices()); + // allocate the slots (obtain all their futures + for (ExecutionJobVertex ejv : getVerticesTopologically()) { + // these calls are not blocking, they only return futures --- End diff -- But be aware that the `allocateAndAssign` call is non-blocking and the actual order depends on the preferred locations futures. ---
[jira] [Commented] (FLINK-7153) Eager Scheduling can't allocate source for ExecutionGraph correctly
[ https://issues.apache.org/jira/browse/FLINK-7153?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16233848#comment-16233848 ] ASF GitHub Bot commented on FLINK-7153: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4916#discussion_r148216894 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java --- @@ -878,113 +880,70 @@ private void scheduleEager(SlotProvider slotProvider, final Time timeout) { // that way we do not have any operation that can fail between allocating the slots // and adding them to the list. If we had a failure in between there, that would // cause the slots to get lost - final ArrayList resources = new ArrayList<>(getNumberOfExecutionJobVertices()); final boolean queued = allowQueuedScheduling; - // we use this flag to handle failures in a 'finally' clause - // that allows us to not go through clumsy cast-and-rethrow logic - boolean successful = false; + // collecting all the slots may resize and fail in that operation without slots getting lost + final ArrayList> allAllocationFutures = new ArrayList<>(getNumberOfExecutionJobVertices()); - try { - // collecting all the slots may resize and fail in that operation without slots getting lost - final ArrayList> slotFutures = new ArrayList<>(getNumberOfExecutionJobVertices()); + // allocate the slots (obtain all their futures + for (ExecutionJobVertex ejv : getVerticesTopologically()) { + // these calls are not blocking, they only return futures --- End diff -- But be aware that the `allocateAndAssign` call is non-blocking and the actual order depends on the preferred locations futures. 
[GitHub] flink issue #4926: [FLINK-7951] Load YarnConfiguration with default Hadoop c...
Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/4926 The changes look good but let's see what you find regarding `yarn-site.xml`. 👍 ---
[jira] [Commented] (FLINK-7951) YarnApplicationMaster does not load HDFSConfiguration
[ https://issues.apache.org/jira/browse/FLINK-7951?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16233853#comment-16233853 ] ASF GitHub Bot commented on FLINK-7951: --- Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/4926 The changes look good but let's see what you find regarding `yarn-site.xml`. 👍 > YarnApplicationMaster does not load HDFSConfiguration > - > > Key: FLINK-7951 > URL: https://issues.apache.org/jira/browse/FLINK-7951 > Project: Flink > Issue Type: Bug > Components: YARN >Affects Versions: 1.4.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Critical > Fix For: 1.4.0 > > > When instantiating the {{YarnConfiguration}} we do not load the corresponding > {{HDFSConfiguration}}. This causes that we do not read the {{hdfs-site.xml}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Closed] (FLINK-7953) Kafka consumer printing error - java.lang.IllegalArgumentException: Invalid offset: -915623761772
[ https://issues.apache.org/jira/browse/FLINK-7953?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek closed FLINK-7953. --- Resolution: Duplicate Duplicate of FLINK-7732 > Kafka consumer printing error - java.lang.IllegalArgumentException: Invalid > offset: -915623761772 > - > > Key: FLINK-7953 > URL: https://issues.apache.org/jira/browse/FLINK-7953 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Affects Versions: 1.3.2 > Environment: kafka 10.0, yarn >Reporter: Shashank Agarwal >Priority: Minor > > As it's printing as Warning and not impacting running program so marked it > minor. > {code} > 2017-10-31 19:26:09,218 WARN > org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher - > Committing offsets to Kafka failed. This does not compromise Flink's > checkpoints. > java.lang.IllegalArgumentException: Invalid offset: -915623761772 > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:687) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.doCommitOffsetsAsync(ConsumerCoordinator.java:531) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsAsync(ConsumerCoordinator.java:499) > at > org.apache.kafka.clients.consumer.KafkaConsumer.commitAsync(KafkaConsumer.java:1181) > at > org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:223) > 2017-10-31 19:26:09,223 ERROR > org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase - Async > Kafka commit failed. 
> java.lang.IllegalArgumentException: Invalid offset: -915623761772 > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:687) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.doCommitOffsetsAsync(ConsumerCoordinator.java:531) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsAsync(ConsumerCoordinator.java:499) > at > org.apache.kafka.clients.consumer.KafkaConsumer.commitAsync(KafkaConsumer.java:1181) > at > org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:223) > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-4228) YARN artifact upload does not work with S3AFileSystem
[ https://issues.apache.org/jira/browse/FLINK-4228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16233867#comment-16233867 ] Aljoscha Krettek commented on FLINK-4228: - Thanks, [~NicoK] for finding this! I think we should downgrade this to "Critical" since it's a very specific problem and we can provide a fix for this in a bug fix release? Unless you know a very quick fix for this? > YARN artifact upload does not work with S3AFileSystem > - > > Key: FLINK-4228 > URL: https://issues.apache.org/jira/browse/FLINK-4228 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Reporter: Ufuk Celebi >Priority: Blocker > Fix For: 1.4.0 > > > The issue now is exclusive to running on YARN with s3a:// as your configured > FileSystem. If so, the Flink session will fail on staging itself because it > tries to copy the flink/lib directory to S3 and the S3aFileSystem does not > support recursive copy. > h2. Old Issue > Using the {{RocksDBStateBackend}} with semi-async snapshots (current default) > leads to an Exception when uploading the snapshot to S3 when using the > {{S3AFileSystem}}. 
> {code} > AsynchronousException{com.amazonaws.AmazonClientException: Unable to > calculate MD5 hash: > /var/folders/_c/5tc5q5q55qjcjtqwlwvwd1m0gn/T/flink-io-5640e9f1-3ea4-4a0f-b4d9-3ce9fbd98d8a/7c6e745df2dddc6eb70def1240779e44/StreamFlatMap_3_0/dummy_state/47daaf2a-150c-4208-aa4b-409927e9e5b7/local-chk-2886 > (Is a directory)} > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointThread.run(StreamTask.java:870) > Caused by: com.amazonaws.AmazonClientException: Unable to calculate MD5 hash: > /var/folders/_c/5tc5q5q55qjcjtqwlwvwd1m0gn/T/flink-io-5640e9f1-3ea4-4a0f-b4d9-3ce9fbd98d8a/7c6e745df2dddc6eb70def1240779e44/StreamFlatMap_3_0/dummy_state/47daaf2a-150c-4208-aa4b-409927e9e5b7/local-chk-2886 > (Is a directory) > at > com.amazonaws.services.s3.AmazonS3Client.putObject(AmazonS3Client.java:1298) > at > com.amazonaws.services.s3.transfer.internal.UploadCallable.uploadInOneChunk(UploadCallable.java:108) > at > com.amazonaws.services.s3.transfer.internal.UploadCallable.call(UploadCallable.java:100) > at > com.amazonaws.services.s3.transfer.internal.UploadMonitor.upload(UploadMonitor.java:192) > at > com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:150) > at > com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:50) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.io.FileNotFoundException: > /var/folders/_c/5tc5q5q55qjcjtqwlwvwd1m0gn/T/flink-io-5640e9f1-3ea4-4a0f-b4d9-3ce9fbd98d8a/7c6e745df2dddc6eb70def1240779e44/StreamFlatMap_3_0/dummy_state/47daaf2a-150c-4208-aa4b-409927e9e5b7/local-chk-2886 > (Is a directory) > at java.io.FileInputStream.open0(Native Method) > at java.io.FileInputStream.open(FileInputStream.java:195) > at 
java.io.FileInputStream.<init>(FileInputStream.java:138) > at > com.amazonaws.services.s3.AmazonS3Client.putObject(AmazonS3Client.java:1294) > ... 9 more > {code} > Running with S3NFileSystem, the error does not occur. The problem might be > due to {{HDFSCopyToLocal}} assuming that sub-folders are going to be created > automatically. We might need to manually create folders and copy only actual > files for {{S3AFileSystem}}. More investigation is required. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
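The workaround suggested at the end — create folders manually and copy only actual files — can be sketched as follows. This is a plain-Java illustration under the assumption that the upload happens one object per file; `RecursiveUploader` and the `putObject` callback are hypothetical stand-ins, not the real S3A or Flink API:

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.stream.Stream;

class RecursiveUploader {
    // Derive the object key for one local file from its path relative to
    // the staging directory (S3 has no real directories, only keys).
    static String relativeKey(Path baseDir, Path file, String keyPrefix) {
        return keyPrefix + "/" + baseDir.relativize(file).toString().replace('\\', '/');
    }

    // Walk the local directory tree and upload each regular file
    // individually; a directory itself is never passed to putObject,
    // which avoids the "Is a directory" failure quoted above.
    static List<String> upload(Path baseDir, String keyPrefix,
                               BiConsumer<String, Path> putObject) {
        List<String> keys = new ArrayList<>();
        try (Stream<Path> paths = Files.walk(baseDir)) {
            paths.filter(Files::isRegularFile).forEach(file -> {
                String key = relativeKey(baseDir, file, keyPrefix);
                putObject.accept(key, file);
                keys.add(key);
            });
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return keys;
    }
}
```

The key point is that recursion happens on the client side over the local tree, so the object store only ever sees per-file puts.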
[GitHub] flink pull request #4928: [FLINK-7732][kafka-consumer] Do not commit to kafk...
Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/4928#discussion_r148220943 --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionState.java --- @@ -95,6 +95,10 @@ public final boolean isOffsetDefined() { return offset != KafkaTopicPartitionStateSentinel.OFFSET_NOT_SET; } + public final boolean isSentinel() { --- End diff -- In new approach method was dropped ---
[jira] [Commented] (FLINK-7732) Invalid offset to commit in Kafka
[ https://issues.apache.org/jira/browse/FLINK-7732?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16233870#comment-16233870 ] ASF GitHub Bot commented on FLINK-7732: --- Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/4928#discussion_r148220943 --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionState.java --- @@ -95,6 +95,10 @@ public final boolean isOffsetDefined() { return offset != KafkaTopicPartitionStateSentinel.OFFSET_NOT_SET; } + public final boolean isSentinel() { --- End diff -- In new approach method was dropped > Invalid offset to commit in Kafka > - > > Key: FLINK-7732 > URL: https://issues.apache.org/jira/browse/FLINK-7732 > Project: Flink > Issue Type: Bug > Components: Kafka Connector, Tests >Affects Versions: 1.4.0 >Reporter: Nico Kruber >Assignee: Piotr Nowojski >Priority: Blocker > Labels: test-stability > Fix For: 1.4.0 > > > In a test run with unrelated changes in the network stack, the Kafa > end-to-end test was failing with an invalid offset: > {code} > 2017-09-28 06:34:10,736 INFO org.apache.kafka.common.utils.AppInfoParser > - Kafka version : 0.10.2.1 > 2017-09-28 06:34:10,744 INFO org.apache.kafka.common.utils.AppInfoParser > - Kafka commitId : e89bffd6b2eff799 > 2017-09-28 06:34:14,549 INFO > org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Discovered > coordinator testing-gce-f67551b8-8867-41c5-b36e-4196d0f11e30:9092 (id: > 2147483647 rack: null) for group myconsumer. 
> 2017-09-28 06:34:14,573 INFO > org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Marking > the coordinator testing-gce-f67551b8-8867-41c5-b36e-4196d0f11e30:9092 (id: > 2147483647 rack: null) dead for group myconsumer > 2017-09-28 06:34:14,686 INFO > org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Discovered > coordinator testing-gce-f67551b8-8867-41c5-b36e-4196d0f11e30:9092 (id: > 2147483647 rack: null) for group myconsumer. > 2017-09-28 06:34:14,687 INFO > org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Marking > the coordinator testing-gce-f67551b8-8867-41c5-b36e-4196d0f11e30:9092 (id: > 2147483647 rack: null) dead for group myconsumer > 2017-09-28 06:34:14,792 INFO > org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Discovered > coordinator testing-gce-f67551b8-8867-41c5-b36e-4196d0f11e30:9092 (id: > 2147483647 rack: null) for group myconsumer. > 2017-09-28 06:34:15,068 INFO > org.apache.flink.runtime.state.DefaultOperatorStateBackend- > DefaultOperatorStateBackend snapshot (In-Memory Stream Factory, synchronous > part) in thread Thread[Async calls on Source: Custom Source -> Map -> Sink: > Unnamed (1/1),5,Flink Task Threads] took 948 ms. > 2017-09-28 06:34:15,164 WARN > org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher - > Committing offsets to Kafka failed. This does not compromise Flink's > checkpoints. 
> java.lang.IllegalArgumentException: Invalid offset: -915623761772 > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:687) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.doCommitOffsetsAsync(ConsumerCoordinator.java:531) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsAsync(ConsumerCoordinator.java:499) > at > org.apache.kafka.clients.consumer.KafkaConsumer.commitAsync(KafkaConsumer.java:1181) > at > org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:217) > 2017-09-28 06:34:15,171 ERROR > org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase - Async > Kafka commit failed. > java.lang.IllegalArgumentException: Invalid offset: -915623761772 > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:687) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.doCommitOffsetsAsync(ConsumerCoordinator.java:531) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsAsync(ConsumerCoordinator.java:499) > at > org.apache.kafka.clients.consumer.KafkaConsumer.commitAsync(KafkaConsumer.java:1181) > at > org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:217) > {code} > https://travis-ci.org/apache/flink/jobs/280722829 > [~pnowojski] did a first analysis that revealed this: > In
[jira] [Commented] (FLINK-7732) Invalid offset to commit in Kafka
[ https://issues.apache.org/jira/browse/FLINK-7732?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16233871#comment-16233871 ] ASF GitHub Bot commented on FLINK-7732: --- Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/4928#discussion_r148221257 --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateSentinel.java --- @@ -52,4 +52,7 @@ */ public static final long GROUP_OFFSET = -915623761773L; + public static boolean isSentinel(long offset) { + return offset < 0; --- End diff -- Kafka doesn't allow to commit any negative values -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4928: [FLINK-7732][kafka-consumer] Do not commit to kafk...
Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/4928#discussion_r148221257 --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateSentinel.java --- @@ -52,4 +52,7 @@ */ public static final long GROUP_OFFSET = -915623761773L; + public static boolean isSentinel(long offset) { + return offset < 0; --- End diff -- Kafka doesn't allow to commit any negative values ---
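The diff above adds an `isSentinel` check because Flink marks "no committable offset yet" with large negative sentinel constants (e.g. `GROUP_OFFSET = -915623761773L`), and Kafka rejects any negative commit value — the `-915623761772` in the logs is apparently such a sentinel plus one, since the value committed is the processed offset + 1. A self-contained sketch of filtering sentinels out right before the commit (the class and `dropSentinels` method name are hypothetical, not the real Flink API):

```java
import java.util.LinkedHashMap;
import java.util.Map;

class OffsetCommitFilter {
    // Sentinel constant mirroring KafkaTopicPartitionStateSentinel.GROUP_OFFSET.
    static final long GROUP_OFFSET = -915623761773L;

    static boolean isSentinel(long offset) {
        // Kafka does not allow committing any negative offset, and all of
        // Flink's sentinel markers are negative.
        return offset < 0;
    }

    // Keep only partitions with a real offset; sentinel-valued partitions
    // are dropped just before the commit request is built.
    static Map<String, Long> dropSentinels(Map<String, Long> offsetsToCommit) {
        Map<String, Long> valid = new LinkedHashMap<>();
        for (Map.Entry<String, Long> e : offsetsToCommit.entrySet()) {
            if (!isSentinel(e.getValue())) {
                valid.put(e.getKey(), e.getValue());
            }
        }
        return valid;
    }
}
```

Filtering at commit time (rather than at every state update) matches the reworked approach described in the PR discussion.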
[GitHub] flink issue #4835: [FLINK-7847][avro] Fix typo in jackson shading pattern
Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/4835 👍 😂 ---
[GitHub] flink issue #4928: [FLINK-7732][kafka-consumer] Do not commit to kafka Flink...
Github user pnowojski commented on the issue: https://github.com/apache/flink/pull/4928 @tzulitai please check the details in the ticket: https://issues.apache.org/jira/browse/FLINK-7732 I have changed the approach as we discussed and now the filtering out happens just before committing offsets. ---
[jira] [Commented] (FLINK-7847) Fix typo in flink-avro shading pattern
[ https://issues.apache.org/jira/browse/FLINK-7847?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16233873#comment-16233873 ] ASF GitHub Bot commented on FLINK-7847: --- Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/4835 👍 😂 > Fix typo in flink-avro shading pattern > -- > > Key: FLINK-7847 > URL: https://issues.apache.org/jira/browse/FLINK-7847 > Project: Flink > Issue Type: Bug > Components: Build System >Affects Versions: 1.4.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Major > Fix For: 1.4.0 > > > {code} > > org.codehaus.jackson > > org.apache.flink.avro.shaded.org.codehouse.jackson > > {code} > The shaded pattern should be > "org.apache.flink.avro.shaded.org.codehaus.jackson". -- This message was sent by Atlassian JIRA (v6.4.14#64029)
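In Maven shade-plugin terms, the fix is a one-letter change ("codehouse" → "codehaus") in the relocation's `shadedPattern`. Reconstructed from the snippet quoted above, the corrected relocation would read:

```xml
<relocation>
    <pattern>org.codehaus.jackson</pattern>
    <shadedPattern>org.apache.flink.avro.shaded.org.codehaus.jackson</shadedPattern>
</relocation>
```

With the typo, classes were relocated under a package that nothing else referenced, which is why the shading silently missed its target.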
[jira] [Commented] (FLINK-7732) Invalid offset to commit in Kafka
[ https://issues.apache.org/jira/browse/FLINK-7732?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16233872#comment-16233872 ] ASF GitHub Bot commented on FLINK-7732: --- Github user pnowojski commented on the issue: https://github.com/apache/flink/pull/4928 @tzulitai please check the details in the ticket: https://issues.apache.org/jira/browse/FLINK-7732 I have changed the approach as we discussed and now the filtering out happens just before committing offsets. > Invalid offset to commit in Kafka > - > > Key: FLINK-7732 > URL: https://issues.apache.org/jira/browse/FLINK-7732 > Project: Flink > Issue Type: Bug > Components: Kafka Connector, Tests >Affects Versions: 1.4.0 >Reporter: Nico Kruber >Assignee: Piotr Nowojski >Priority: Blocker > Labels: test-stability > Fix For: 1.4.0 > > > In a test run with unrelated changes in the network stack, the Kafka > end-to-end test was failing with an invalid offset: > {code} > 2017-09-28 06:34:10,736 INFO org.apache.kafka.common.utils.AppInfoParser > - Kafka version : 0.10.2.1 > 2017-09-28 06:34:10,744 INFO org.apache.kafka.common.utils.AppInfoParser > - Kafka commitId : e89bffd6b2eff799 > 2017-09-28 06:34:14,549 INFO > org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Discovered > coordinator testing-gce-f67551b8-8867-41c5-b36e-4196d0f11e30:9092 (id: > 2147483647 rack: null) for group myconsumer. > 2017-09-28 06:34:14,573 INFO > org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Marking > the coordinator testing-gce-f67551b8-8867-41c5-b36e-4196d0f11e30:9092 (id: > 2147483647 rack: null) dead for group myconsumer > 2017-09-28 06:34:14,686 INFO > org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Discovered > coordinator testing-gce-f67551b8-8867-41c5-b36e-4196d0f11e30:9092 (id: > 2147483647 rack: null) for group myconsumer. 
> 2017-09-28 06:34:14,687 INFO > org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Marking > the coordinator testing-gce-f67551b8-8867-41c5-b36e-4196d0f11e30:9092 (id: > 2147483647 rack: null) dead for group myconsumer > 2017-09-28 06:34:14,792 INFO > org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Discovered > coordinator testing-gce-f67551b8-8867-41c5-b36e-4196d0f11e30:9092 (id: > 2147483647 rack: null) for group myconsumer. > 2017-09-28 06:34:15,068 INFO > org.apache.flink.runtime.state.DefaultOperatorStateBackend- > DefaultOperatorStateBackend snapshot (In-Memory Stream Factory, synchronous > part) in thread Thread[Async calls on Source: Custom Source -> Map -> Sink: > Unnamed (1/1),5,Flink Task Threads] took 948 ms. > 2017-09-28 06:34:15,164 WARN > org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher - > Committing offsets to Kafka failed. This does not compromise Flink's > checkpoints. > java.lang.IllegalArgumentException: Invalid offset: -915623761772 > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:687) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.doCommitOffsetsAsync(ConsumerCoordinator.java:531) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsAsync(ConsumerCoordinator.java:499) > at > org.apache.kafka.clients.consumer.KafkaConsumer.commitAsync(KafkaConsumer.java:1181) > at > org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:217) > 2017-09-28 06:34:15,171 ERROR > org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase - Async > Kafka commit failed. 
> java.lang.IllegalArgumentException: Invalid offset: -915623761772 > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:687) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.doCommitOffsetsAsync(ConsumerCoordinator.java:531) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsAsync(ConsumerCoordinator.java:499) > at > org.apache.kafka.clients.consumer.KafkaConsumer.commitAsync(KafkaConsumer.java:1181) > at > org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:217) > {code} > https://travis-ci.org/apache/flink/jobs/280722829 > [~pnowojski] did a first analysis that revealed this: > In > org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java:229 > this is being sent: > {{long offsetToCommit = lastProcessedOffset + 1;}} > {{lastProcessedOffset}} comes from: > {{org.apache.flink.streaming.connectors.kafka.FlinkK
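The arithmetic in that analysis checks out: if `lastProcessedOffset` still holds the `GROUP_OFFSET` sentinel (`-915623761773L`, quoted elsewhere in this thread from `KafkaTopicPartitionStateSentinel`) when `offsetToCommit = lastProcessedOffset + 1` is computed, the result is exactly the `-915623761772` rejected in the stack trace above. A minimal demonstration:

```java
public class SentinelCommitBug {
    public static void main(String[] args) {
        // Sentinel value from KafkaTopicPartitionStateSentinel.
        final long GROUP_OFFSET = -915623761773L;

        // What Kafka09Fetcher computes before committing, per the analysis.
        long offsetToCommit = GROUP_OFFSET + 1;

        // Matches "Invalid offset: -915623761772" from the failing end-to-end test.
        System.out.println(offsetToCommit); // -915623761772
    }
}
```

This is what makes the fix in PR #4928 (filtering out sentinels just before committing) sufficient: the invalid value can only ever be a sentinel that was never replaced by a real offset.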
[jira] [Created] (FLINK-7956) Add support for scheduling with slot sharing
Till Rohrmann created FLINK-7956: Summary: Add support for scheduling with slot sharing Key: FLINK-7956 URL: https://issues.apache.org/jira/browse/FLINK-7956 Project: Flink Issue Type: Sub-task Components: Scheduler Affects Versions: 1.4.0 Reporter: Till Rohrmann Assignee: Till Rohrmann Priority: Major In order to reach feature equivalence with the old code base, we should add support for scheduling with slot sharing to the {{SlotPool}}. This will also allow us to run all the IT cases based on the {{AbstractTestBase}} on the Flip-6 {{MiniCluster}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #4592: [FLINK-7515][network] allow actual 0-length content in Ne...
Github user pnowojski commented on the issue: https://github.com/apache/flink/pull/4592 LGTM (besides "Conflicting files") ---
[jira] [Commented] (FLINK-7515) allow actual 0-length content in NettyMessage#allocateBuffer()
[ https://issues.apache.org/jira/browse/FLINK-7515?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16233880#comment-16233880 ] ASF GitHub Bot commented on FLINK-7515: --- Github user pnowojski commented on the issue: https://github.com/apache/flink/pull/4592 LGTM (besides "Conflicting files") > allow actual 0-length content in NettyMessage#allocateBuffer() > -- > > Key: FLINK-7515 > URL: https://issues.apache.org/jira/browse/FLINK-7515 > Project: Flink > Issue Type: Sub-task > Components: Network >Affects Versions: 1.4.0 >Reporter: Nico Kruber >Assignee: Nico Kruber >Priority: Minor > > Previously, length {{0}} meant "unknown content length" but there are cases > where the actual length is 0 and we do not need a larger buffer. Let's use > {{-1}} for tagging the special case instead. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
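The change the ticket describes amounts to swapping sentinels: 0 becomes a legitimate content length and -1 takes over as the "unknown" tag. A hypothetical helper (the names are illustrative, not Flink's actual `NettyMessage` API) shows why this matters for allocation:

```java
public class NettyBufferSizing {
    /** Tag for "content length not known in advance" -- previously 0 played this role. */
    public static final int UNKNOWN_CONTENT_LENGTH = -1;

    /**
     * Bytes to reserve for message content beyond the fixed header. With -1 as
     * the "unknown" tag, a genuine 0-length content no longer falls back to an
     * oversized default buffer.
     */
    public static int contentBytesToReserve(int contentLength, int defaultSize) {
        return contentLength == UNKNOWN_CONTENT_LENGTH ? defaultSize : contentLength;
    }
}
```

Under the old convention, `contentLength == 0` was indistinguishable from "unknown" and always triggered the fallback size; with -1 as the tag, zero-length messages allocate exactly what they need.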
[GitHub] flink issue #4927: [FLINK-7778] [build] Shade Curator/ZooKeeper dependency
Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/4927 Changes look good! @StephanEwen could you please have a look at the follow-up changes? ---
[jira] [Commented] (FLINK-7778) Relocate ZooKeeper
[ https://issues.apache.org/jira/browse/FLINK-7778?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16233883#comment-16233883 ] ASF GitHub Bot commented on FLINK-7778: --- Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/4927 Changes look good! @StephanEwen could you please have a look at the follow-up changes? > Relocate ZooKeeper > -- > > Key: FLINK-7778 > URL: https://issues.apache.org/jira/browse/FLINK-7778 > Project: Flink > Issue Type: Sub-task > Components: Build System >Affects Versions: 1.4.0 >Reporter: Till Rohrmann >Priority: Critical > Fix For: 1.4.0 > > > If possible, then we should also try to relocate {{ZooKeeper}} in order to > avoid dependency clashes between Flink's {{ZooKeeper}} and Hadoop's > {{ZooKeeper}} dependency. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (FLINK-7073) Create RESTful JobManager endpoint
[ https://issues.apache.org/jira/browse/FLINK-7073?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann updated FLINK-7073: - Description: In order to communicate from the {{client}} with a running {{JobManager}} we have to provide a RESTful endpoint for job specific operations. These operations are: * Cancel job (PUT): Cancel the given job * Stop job (PUT): Stops the given job * Take savepoint (POST): Take savepoint for given job (How to return the savepoint under which the savepoint was stored? Maybe always having to specify a path) * Poll/Subscribe to notifications The REST JobManager endpoint should also serve the information required for the web ui. was: In order to communicate from the {{client}} with a running {{JobManager}} we have to provide a RESTful endpoint for job specific operations. These operations are: * Cancel job (PUT): Cancel the given job * Stop job (PUT): Stops the given job * Take savepoint (POST): Take savepoint for given job (How to return the savepoint under which the savepoint was stored? Maybe always having to specify a path) * Poll/Subscribe to notifications > Create RESTful JobManager endpoint > -- > > Key: FLINK-7073 > URL: https://issues.apache.org/jira/browse/FLINK-7073 > Project: Flink > Issue Type: Sub-task > Components: JobManager >Reporter: Till Rohrmann >Priority: Major > Labels: flip-6 > > In order to communicate from the {{client}} with a running {{JobManager}} we > have to provide a RESTful endpoint for job specific operations. These > operations are: > * Cancel job (PUT): Cancel the given job > * Stop job (PUT): Stops the given job > * Take savepoint (POST): Take savepoint for given job (How to return the > savepoint under which the savepoint was stored? Maybe always having to > specify a path) > * Poll/Subscribe to notifications > The REST JobManager endpoint should also serve the information required for > the web ui. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
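The operations in the description map naturally onto REST verbs and paths. A hypothetical layout follows -- the verbs come from the description (PUT for cancel/stop, POST for savepoints), but the concrete paths are an assumption, not something the issue fixes:

```java
// Illustrative route table for the operations listed in the issue description.
// The paths are hypothetical; Flink's eventual REST API may differ.
public enum JobManagerRestOperation {
    CANCEL_JOB("PUT", "/jobs/:jobid/cancel"),
    STOP_JOB("PUT", "/jobs/:jobid/stop"),
    // POST because taking a savepoint creates a resource; the response would
    // carry the path under which the savepoint was stored, answering the
    // open question in the description.
    TRIGGER_SAVEPOINT("POST", "/jobs/:jobid/savepoints");

    public final String httpMethod;
    public final String path;

    JobManagerRestOperation(String httpMethod, String path) {
        this.httpMethod = httpMethod;
        this.path = path;
    }
}
```

Poll/subscribe notifications and the web-ui data feeds would sit alongside these as GET endpoints on the same server.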
[GitHub] flink issue #4592: [FLINK-7515][network] allow actual 0-length content in Ne...
Github user NicoK commented on the issue: https://github.com/apache/flink/pull/4592 of course, the last change added a conflict...rebased now ---
[jira] [Commented] (FLINK-7515) allow actual 0-length content in NettyMessage#allocateBuffer()
[ https://issues.apache.org/jira/browse/FLINK-7515?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16233887#comment-16233887 ] ASF GitHub Bot commented on FLINK-7515: --- Github user NicoK commented on the issue: https://github.com/apache/flink/pull/4592 of course, the last change added a conflict...rebased now > allow actual 0-length content in NettyMessage#allocateBuffer() > -- > > Key: FLINK-7515 > URL: https://issues.apache.org/jira/browse/FLINK-7515 > Project: Flink > Issue Type: Sub-task > Components: Network >Affects Versions: 1.4.0 >Reporter: Nico Kruber >Assignee: Nico Kruber >Priority: Minor > > Previously, length {{0}} meant "unknown content length" but there are cases > where the actual length is 0 and we do not need a larger buffer. Let's use > {{-1}} for tagging the special case instead. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4872: [FLINK-7876] Register TaskManagerMetricGroup under...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4872#discussion_r148226096 --- Diff: flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosJobManager.scala --- @@ -31,7 +31,8 @@ import org.apache.flink.runtime.instance.InstanceManager import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore import org.apache.flink.runtime.jobmanager.scheduler.{Scheduler => FlinkScheduler} import org.apache.flink.runtime.leaderelection.LeaderElectionService -import org.apache.flink.runtime.metrics.{MetricRegistry => FlinkMetricRegistry} +import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup +import org.apache.flink.runtime.metrics.{MetricRegistryImpl => FlinkMetricRegistry} --- End diff -- Will remove it. ---
[GitHub] flink pull request #4872: [FLINK-7876] Register TaskManagerMetricGroup under...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4872#discussion_r148226047 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java --- @@ -166,4 +169,12 @@ void notifySlotAvailable( * @return Future containing the resource overview */ CompletableFuture requestResourceOverview(@RpcTimeout Time timeout); + + /** +* Requests the paths for the TaskManager's {@link MetricQueryService} to query. +* +* @param timeout for the asynchronous operation +* @return Future containing the collection of instance ids and the corresponding metric query service path --- End diff -- Good catch. ---
[jira] [Commented] (FLINK-7876) Register TaskManagerMetricGroup under ResourceID instead of InstanceID
[ https://issues.apache.org/jira/browse/FLINK-7876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16233894#comment-16233894 ] ASF GitHub Bot commented on FLINK-7876: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4872#discussion_r148226047 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java --- @@ -166,4 +169,12 @@ void notifySlotAvailable( * @return Future containing the resource overview */ CompletableFuture requestResourceOverview(@RpcTimeout Time timeout); + + /** +* Requests the paths for the TaskManager's {@link MetricQueryService} to query. +* +* @param timeout for the asynchronous operation +* @return Future containing the collection of instance ids and the corresponding metric query service path --- End diff -- Good catch. > Register TaskManagerMetricGroup under ResourceID instead of InstanceID > -- > > Key: FLINK-7876 > URL: https://issues.apache.org/jira/browse/FLINK-7876 > Project: Flink > Issue Type: Improvement > Components: Metrics >Affects Versions: 1.4.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Minor > Labels: flip-6 > > Currently, the {{TaskManager}} registers the {{TaskManagerMetricGroup}} under > its {{InstanceID}} and thereby binding its metrics effectively to the > lifetime of its registration with the {{JobManager}}. This has also > implications how the REST handler retrieve the TaskManager metrics, namely by > its {{InstanceID}}. > I would actually propose to register the {{TaskManagerMetricGroup}} under the > {{TaskManager}}/{{TaskExecutor}} {{ResourceID}} which is valid over the whole > lifetime of the {{TaskManager}}/{{TaskExecutor}}. That way we would also be > able to query metrics independent of the connection status to the > {{JobManager}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7876) Register TaskManagerMetricGroup under ResourceID instead of InstanceID
[ https://issues.apache.org/jira/browse/FLINK-7876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16233895#comment-16233895 ] ASF GitHub Bot commented on FLINK-7876: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4872#discussion_r148226096 --- Diff: flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosJobManager.scala --- @@ -31,7 +31,8 @@ import org.apache.flink.runtime.instance.InstanceManager import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore import org.apache.flink.runtime.jobmanager.scheduler.{Scheduler => FlinkScheduler} import org.apache.flink.runtime.leaderelection.LeaderElectionService -import org.apache.flink.runtime.metrics.{MetricRegistry => FlinkMetricRegistry} +import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup +import org.apache.flink.runtime.metrics.{MetricRegistryImpl => FlinkMetricRegistry} --- End diff -- Will remove it. > Register TaskManagerMetricGroup under ResourceID instead of InstanceID > -- > > Key: FLINK-7876 > URL: https://issues.apache.org/jira/browse/FLINK-7876 > Project: Flink > Issue Type: Improvement > Components: Metrics >Affects Versions: 1.4.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Minor > Labels: flip-6 > > Currently, the {{TaskManager}} registers the {{TaskManagerMetricGroup}} under > its {{InstanceID}} and thereby binding its metrics effectively to the > lifetime of its registration with the {{JobManager}}. This has also > implications how the REST handler retrieve the TaskManager metrics, namely by > its {{InstanceID}}. > I would actually propose to register the {{TaskManagerMetricGroup}} under the > {{TaskManager}}/{{TaskExecutor}} {{ResourceID}} which is valid over the whole > lifetime of the {{TaskManager}}/{{TaskExecutor}}. That way we would also be > able to query metrics independent of the connection status to the > {{JobManager}}. 
-- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4928: [FLINK-7732][kafka-consumer] Do not commit to kafk...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/4928#discussion_r148226195 --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java --- @@ -242,10 +243,25 @@ public void addDiscoveredPartitions(List newPartitions) thr * @param commitCallback The callback that the user should trigger when a commit request completes or fails. * @throws Exception This method forwards exceptions. */ - public abstract void commitInternalOffsetsToKafka( + public final void commitInternalOffsetsToKafka( + Map offsets, + @Nonnull KafkaCommitCallback commitCallback) throws Exception { + // Ignore sentinels. They might appear here if snapshot has started before actual offsets values + // replaced sentinels + doCommitInternalOffsetsToKafka(filerOutSentinels(offsets), commitCallback); + } + + protected abstract void doCommitInternalOffsetsToKafka( Map offsets, @Nonnull KafkaCommitCallback commitCallback) throws Exception; + private Map filerOutSentinels(Map offsets) { --- End diff -- typo: `filterOutSentinels`, missing `t`. ---
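Typo aside, the pre-commit filtering in this diff is straightforward. A self-contained sketch (generic in the key type -- Flink's actual version is keyed by `KafkaTopicPartition` and lives inside `AbstractFetcher`):

```java
import java.util.HashMap;
import java.util.Map;

public class OffsetSanitizer {
    /**
     * Drops sentinel entries (all sentinels are negative) so that only real
     * offsets reach Kafka's commit API, which rejects negative values.
     */
    public static <K> Map<K, Long> filterOutSentinels(Map<K, Long> offsets) {
        Map<K, Long> filtered = new HashMap<>(offsets.size());
        for (Map.Entry<K, Long> entry : offsets.entrySet()) {
            if (entry.getValue() >= 0) {
                filtered.put(entry.getKey(), entry.getValue());
            }
        }
        return filtered;
    }
}
```

Sentinels can legitimately appear here when a snapshot starts before real offset values have replaced them, which is why the filtering is done centrally in the final `commitInternalOffsetsToKafka` rather than in each connector-version subclass.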
[GitHub] flink pull request #4872: [FLINK-7876] Register TaskManagerMetricGroup under...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4872#discussion_r148226738 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java --- @@ -163,12 +156,14 @@ public JobLeaderService getJobLeaderService() { * * @param resourceID resource ID of the task manager * @param taskManagerServicesConfiguration task manager configuration +* @param metricRegistry to register the TaskManagerMetricGroup * @return task manager components * @throws Exception */ public static TaskManagerServices fromConfiguration( --- End diff -- True, I'll pull the `TaskManagerMetricGroup` instantiation out of the `TaskManagerServices#fromConfiguration`. ---
[jira] [Commented] (FLINK-7732) Invalid offset to commit in Kafka
[ https://issues.apache.org/jira/browse/FLINK-7732?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16233896#comment-16233896 ] ASF GitHub Bot commented on FLINK-7732: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/4928#discussion_r148226195 --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java --- @@ -242,10 +243,25 @@ public void addDiscoveredPartitions(List newPartitions) thr * @param commitCallback The callback that the user should trigger when a commit request completes or fails. * @throws Exception This method forwards exceptions. */ - public abstract void commitInternalOffsetsToKafka( + public final void commitInternalOffsetsToKafka( + Map offsets, + @Nonnull KafkaCommitCallback commitCallback) throws Exception { + // Ignore sentinels. They might appear here if snapshot has started before actual offsets values + // replaced sentinels + doCommitInternalOffsetsToKafka(filerOutSentinels(offsets), commitCallback); + } + + protected abstract void doCommitInternalOffsetsToKafka( Map offsets, @Nonnull KafkaCommitCallback commitCallback) throws Exception; + private Map filerOutSentinels(Map offsets) { --- End diff -- typo: `filterOutSentinels`, missing `t`. 
> Invalid offset to commit in Kafka > - > > Key: FLINK-7732 > URL: https://issues.apache.org/jira/browse/FLINK-7732 > Project: Flink > Issue Type: Bug > Components: Kafka Connector, Tests >Affects Versions: 1.4.0 >Reporter: Nico Kruber >Assignee: Piotr Nowojski >Priority: Blocker > Labels: test-stability > Fix For: 1.4.0 > > > In a test run with unrelated changes in the network stack, the Kafka > end-to-end test was failing with an invalid offset: > {code} > 2017-09-28 06:34:10,736 INFO org.apache.kafka.common.utils.AppInfoParser > - Kafka version : 0.10.2.1 > 2017-09-28 06:34:10,744 INFO org.apache.kafka.common.utils.AppInfoParser > - Kafka commitId : e89bffd6b2eff799 > 2017-09-28 06:34:14,549 INFO > org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Discovered > coordinator testing-gce-f67551b8-8867-41c5-b36e-4196d0f11e30:9092 (id: > 2147483647 rack: null) for group myconsumer. > 2017-09-28 06:34:14,573 INFO > org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Marking > the coordinator testing-gce-f67551b8-8867-41c5-b36e-4196d0f11e30:9092 (id: > 2147483647 rack: null) dead for group myconsumer > 2017-09-28 06:34:14,686 INFO > org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Discovered > coordinator testing-gce-f67551b8-8867-41c5-b36e-4196d0f11e30:9092 (id: > 2147483647 rack: null) for group myconsumer. > 2017-09-28 06:34:14,687 INFO > org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Marking > the coordinator testing-gce-f67551b8-8867-41c5-b36e-4196d0f11e30:9092 (id: > 2147483647 rack: null) dead for group myconsumer > 2017-09-28 06:34:14,792 INFO > org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Discovered > coordinator testing-gce-f67551b8-8867-41c5-b36e-4196d0f11e30:9092 (id: > 2147483647 rack: null) for group myconsumer. 
> 2017-09-28 06:34:15,068 INFO > org.apache.flink.runtime.state.DefaultOperatorStateBackend- > DefaultOperatorStateBackend snapshot (In-Memory Stream Factory, synchronous > part) in thread Thread[Async calls on Source: Custom Source -> Map -> Sink: > Unnamed (1/1),5,Flink Task Threads] took 948 ms. > 2017-09-28 06:34:15,164 WARN > org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher - > Committing offsets to Kafka failed. This does not compromise Flink's > checkpoints. > java.lang.IllegalArgumentException: Invalid offset: -915623761772 > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:687) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.doCommitOffsetsAsync(ConsumerCoordinator.java:531) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsAsync(ConsumerCoordinator.java:499) > at > org.apache.kafka.clients.consumer.KafkaConsumer.commitAsync(KafkaConsumer.java:1181) > at > org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:217) > 2017-09-28 06:34:15,171 ERROR > org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase - Async >
[jira] [Commented] (FLINK-7876) Register TaskManagerMetricGroup under ResourceID instead of InstanceID
[ https://issues.apache.org/jira/browse/FLINK-7876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16233897#comment-16233897 ] ASF GitHub Bot commented on FLINK-7876: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4872#discussion_r148226738 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java --- @@ -163,12 +156,14 @@ public JobLeaderService getJobLeaderService() { * * @param resourceID resource ID of the task manager * @param taskManagerServicesConfiguration task manager configuration +* @param metricRegistry to register the TaskManagerMetricGroup * @return task manager components * @throws Exception */ public static TaskManagerServices fromConfiguration( --- End diff -- True, I'll pull the `TaskManagerMetricGroup` instantiation out of the `TaskManagerServices#fromConfiguration`. > Register TaskManagerMetricGroup under ResourceID instead of InstanceID > -- > > Key: FLINK-7876 > URL: https://issues.apache.org/jira/browse/FLINK-7876 > Project: Flink > Issue Type: Improvement > Components: Metrics >Affects Versions: 1.4.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Minor > Labels: flip-6 > > Currently, the {{TaskManager}} registers the {{TaskManagerMetricGroup}} under > its {{InstanceID}} and thereby binding its metrics effectively to the > lifetime of its registration with the {{JobManager}}. This has also > implications how the REST handler retrieve the TaskManager metrics, namely by > its {{InstanceID}}. > I would actually propose to register the {{TaskManagerMetricGroup}} under the > {{TaskManager}}/{{TaskExecutor}} {{ResourceID}} which is valid over the whole > lifetime of the {{TaskManager}}/{{TaskExecutor}}. That way we would also be > able to query metrics independent of the connection status to the > {{JobManager}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4915: [FLINK-7838] Bunch of hotfixes and fix missing syn...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/4915#discussion_r148226920 --- Diff: flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011Test.java --- @@ -61,7 +61,7 @@ * IT cases for the {@link FlinkKafkaProducer011}. */ @SuppressWarnings("serial") -public class FlinkKafkaProducer011Tests extends KafkaTestBase { +public class FlinkKafkaProducer011Test extends KafkaTestBase { --- End diff -- I haven't looked into too many `ITCase`s but coding guidelines require unit tests to be subsecond execution speed: >Please use unit tests to test isolated functionality, such as methods. Unit tests should execute in subseconds and should be preferred whenever possible. The name of unit test classes have to end in *Test. Use integration tests to implement long-running tests. http://flink.apache.org/contribute-code.html#coding-guidelines Then again, I don't know how consistent the tests are. ---
[GitHub] flink pull request #4872: [FLINK-7876] Register TaskManagerMetricGroup under...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4872#discussion_r148226893 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java --- @@ -76,25 +74,22 @@ public void testUpdate() throws Exception { JobID jobID = new JobID(); InstanceID tmID = new InstanceID(); --- End diff -- Good catch. Will change it. ---
[GitHub] flink pull request #4872: [FLINK-7876] Register TaskManagerMetricGroup under...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4872#discussion_r148226952 --- Diff: flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala --- @@ -42,7 +42,8 @@ import org.apache.flink.runtime.jobmanager.{JobManager, MemoryArchivist, Submitt import org.apache.flink.runtime.leaderelection.LeaderElectionService import org.apache.flink.runtime.messages.JobManagerMessages import org.apache.flink.runtime.messages.JobManagerMessages._ -import org.apache.flink.runtime.metrics.MetricRegistry +import org.apache.flink.runtime.metrics.MetricRegistryImpl --- End diff -- Will remove it. ---
[jira] [Commented] (FLINK-7838) Kafka011ProducerExactlyOnceITCase do not finish
[ https://issues.apache.org/jira/browse/FLINK-7838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16233899#comment-16233899 ] ASF GitHub Bot commented on FLINK-7838: --- Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/4915#discussion_r148226920 --- Diff: flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011Test.java --- @@ -61,7 +61,7 @@ * IT cases for the {@link FlinkKafkaProducer011}. */ @SuppressWarnings("serial") -public class FlinkKafkaProducer011Tests extends KafkaTestBase { +public class FlinkKafkaProducer011Test extends KafkaTestBase { --- End diff -- I haven't looked into too many `ITCase`s but coding guidelines require unit tests to be subsecond execution speed: >Please use unit tests to test isolated functionality, such as methods. Unit tests should execute in subseconds and should be preferred whenever possible. The name of unit test classes have to end in *Test. Use integration tests to implement long-running tests. http://flink.apache.org/contribute-code.html#coding-guidelines Then again, I don't know how consistent the tests are. > Kafka011ProducerExactlyOnceITCase do not finish > --- > > Key: FLINK-7838 > URL: https://issues.apache.org/jira/browse/FLINK-7838 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Affects Versions: 1.4.0 >Reporter: Piotr Nowojski >Assignee: Piotr Nowojski >Priority: Blocker > Fix For: 1.4.0 > > Attachments: initTransactions_deadlock.txt, log.txt > > > See attached log -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7876) Register TaskManagerMetricGroup under ResourceID instead of InstanceID
[ https://issues.apache.org/jira/browse/FLINK-7876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16233898#comment-16233898 ] ASF GitHub Bot commented on FLINK-7876: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4872#discussion_r148226893 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java --- @@ -76,25 +74,22 @@ public void testUpdate() throws Exception { JobID jobID = new JobID(); InstanceID tmID = new InstanceID(); --- End diff -- Good catch. Will change it. > Register TaskManagerMetricGroup under ResourceID instead of InstanceID > -- > > Key: FLINK-7876 > URL: https://issues.apache.org/jira/browse/FLINK-7876 > Project: Flink > Issue Type: Improvement > Components: Metrics >Affects Versions: 1.4.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Minor > Labels: flip-6 > > Currently, the {{TaskManager}} registers the {{TaskManagerMetricGroup}} under > its {{InstanceID}} and thereby binding its metrics effectively to the > lifetime of its registration with the {{JobManager}}. This has also > implications how the REST handler retrieve the TaskManager metrics, namely by > its {{InstanceID}}. > I would actually propose to register the {{TaskManagerMetricGroup}} under the > {{TaskManager}}/{{TaskExecutor}} {{ResourceID}} which is valid over the whole > lifetime of the {{TaskManager}}/{{TaskExecutor}}. That way we would also be > able to query metrics independent of the connection status to the > {{JobManager}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7876) Register TaskManagerMetricGroup under ResourceID instead of InstanceID
[ https://issues.apache.org/jira/browse/FLINK-7876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16233900#comment-16233900 ] ASF GitHub Bot commented on FLINK-7876: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4872#discussion_r148226952 --- Diff: flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala --- @@ -42,7 +42,8 @@ import org.apache.flink.runtime.jobmanager.{JobManager, MemoryArchivist, Submitt import org.apache.flink.runtime.leaderelection.LeaderElectionService import org.apache.flink.runtime.messages.JobManagerMessages import org.apache.flink.runtime.messages.JobManagerMessages._ -import org.apache.flink.runtime.metrics.MetricRegistry +import org.apache.flink.runtime.metrics.MetricRegistryImpl --- End diff -- Will remove it. > Register TaskManagerMetricGroup under ResourceID instead of InstanceID > -- > > Key: FLINK-7876 > URL: https://issues.apache.org/jira/browse/FLINK-7876 > Project: Flink > Issue Type: Improvement > Components: Metrics >Affects Versions: 1.4.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Minor > Labels: flip-6 -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4872: [FLINK-7876] Register TaskManagerMetricGroup under...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4872#discussion_r148227159 --- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java --- @@ -80,7 +80,6 @@ import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.testkit.JavaTestKit; - --- End diff -- will do. ---
[jira] [Commented] (FLINK-7876) Register TaskManagerMetricGroup under ResourceID instead of InstanceID
[ https://issues.apache.org/jira/browse/FLINK-7876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16233904#comment-16233904 ] ASF GitHub Bot commented on FLINK-7876: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4872#discussion_r148227334 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnregisteredTaskMetricsGroup.java --- @@ -28,15 +28,15 @@ import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup; import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup; import org.apache.flink.runtime.metrics.groups.TaskMetricGroup; -import org.apache.flink.runtime.metrics.MetricRegistry; +import org.apache.flink.runtime.metrics.MetricRegistryImpl; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.jobgraph.JobVertexID; import java.util.UUID; public class UnregisteredTaskMetricsGroup extends TaskMetricGroup { - private static final MetricRegistry EMPTY_REGISTRY = new MetricRegistry(MetricRegistryConfiguration.defaultMetricRegistryConfiguration()); + private static final MetricRegistryImpl EMPTY_REGISTRY = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration()); --- End diff -- good catch. Will change it. > Register TaskManagerMetricGroup under ResourceID instead of InstanceID > -- > > Key: FLINK-7876 > URL: https://issues.apache.org/jira/browse/FLINK-7876 > Project: Flink > Issue Type: Improvement > Components: Metrics >Affects Versions: 1.4.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Minor > Labels: flip-6 -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4915: [FLINK-7838] Bunch of hotfixes and fix missing syn...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/4915#discussion_r148227364 --- Diff: flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaProducer.java --- @@ -226,13 +228,19 @@ public int getTransactionCoordinatorId() { private void flushNewPartitions() { LOG.info("Flushing new partitions"); + enqueueNewPartitions().await(); + } + + private TransactionalRequestResult enqueueNewPartitions() { Object transactionManager = getValue(kafkaProducer, "transactionManager"); - Object txnRequestHandler = invoke(transactionManager, "addPartitionsToTransactionHandler"); - invoke(transactionManager, "enqueueRequest", new Class[]{txnRequestHandler.getClass().getSuperclass()}, new Object[]{txnRequestHandler}); - TransactionalRequestResult result = (TransactionalRequestResult) getValue(txnRequestHandler, txnRequestHandler.getClass().getSuperclass(), "result"); - Object sender = getValue(kafkaProducer, "sender"); - invoke(sender, "wakeup"); - result.await(); + synchronized (transactionManager) { + Object txnRequestHandler = invoke(transactionManager, "addPartitionsToTransactionHandler"); --- End diff -- Did you add the `!newPartitionsInTransaction.isEmpty()` check in the end? I couldn't find it on first glance. Regarding tests, if you add the check, would your current test fail? If not, I think the behaviour isn't properly tested. ---
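The diff under review relies on the producer's `getValue`/`invoke` reflection helpers and wraps them in `synchronized (transactionManager)` so the reflective enqueue cannot race with Kafka's own sender thread. A minimal self-contained sketch of that pattern (the `Producer`/`Counter` classes below are stand-ins, not Kafka's; only the helper names are borrowed from the diff):

```java
import java.lang.reflect.Field;
import java.lang.reflect.Method;

// Sketch of the reflection pattern discussed above: read a private field,
// then synchronize on the returned object before calling into it, so our
// reflective calls do not race with the owner's internal threads.
public class ReflectiveAccess {

    public static Object getValue(Object target, String fieldName) {
        try {
            Field field = target.getClass().getDeclaredField(fieldName);
            field.setAccessible(true); // bypass "private" for the sketch
            return field.get(target);
        } catch (ReflectiveOperationException e) {
            throw new RuntimeException(e);
        }
    }

    public static Object invoke(Object target, String methodName) {
        try {
            Method method = target.getClass().getDeclaredMethod(methodName);
            method.setAccessible(true);
            return method.invoke(target);
        } catch (ReflectiveOperationException e) {
            throw new RuntimeException(e);
        }
    }

    // Stand-in for KafkaProducer's private transaction manager field.
    public static class Producer {
        private final Counter transactionManager = new Counter();
    }

    public static class Counter {
        private int requests;
        private int enqueue() { return ++requests; }
    }

    public static void main(String[] args) {
        Producer producer = new Producer();
        Object manager = getValue(producer, "transactionManager");
        synchronized (manager) { // guard against concurrent mutation
            System.out.println(invoke(manager, "enqueue")); // prints 1
        }
    }
}
```

The synchronization only helps, of course, if the object's owner takes the same lock around its own mutations, which is what the PR discussion is checking for.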
[GitHub] flink issue #4852: [FLINK-7863] Generalize MetricFetcher to work with a Rest...
Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/4852 Merging this PR once Travis gives green light. ---
[jira] [Commented] (FLINK-7863) Make MetricFetcher work with RestfulGateway
[ https://issues.apache.org/jira/browse/FLINK-7863?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16233920#comment-16233920 ] ASF GitHub Bot commented on FLINK-7863: --- Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/4852 Merging this PR once Travis gives green light. > Make MetricFetcher work with RestfulGateway > --- > > Key: FLINK-7863 > URL: https://issues.apache.org/jira/browse/FLINK-7863 > Project: Flink > Issue Type: Sub-task > Components: REST >Affects Versions: 1.4.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Major > Labels: flip-6 > > In order to make the {{MetricFetcher}} work together with the new > architecture, we have to remove it's dependence on the {{JobManagerGateway}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #4853: [FLINK-7867] Start MetricQueryService in TaskManagerRunne...
Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/4853 Merging this PR once Travis gives green light. ---
[jira] [Commented] (FLINK-7867) Start MetricQueryService on TaskManagerRunner
[ https://issues.apache.org/jira/browse/FLINK-7867?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16233921#comment-16233921 ] ASF GitHub Bot commented on FLINK-7867: --- Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/4853 Merging this PR once Travis gives green light. > Start MetricQueryService on TaskManagerRunner > - > > Key: FLINK-7867 > URL: https://issues.apache.org/jira/browse/FLINK-7867 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination >Affects Versions: 1.4.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Major > Labels: flip-6 > > The {{TaskManagerRunner}} should start a {{MetricQueryService}} such that the > web ui can query the {{TaskExecutor}} metrics. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #3748: [FLINK-6225] [Cassandra Connector] add CassandraTableSink
Github user fhueske commented on the issue: https://github.com/apache/flink/pull/3748 Thanks for the update @PangZhi. Btw. it is recommended to post a short comment when you update a PR. Pushing an update does not trigger a notification. The PR looks good. Will merge this. ---
[jira] [Commented] (FLINK-6225) Support Row Stream for CassandraSink
[ https://issues.apache.org/jira/browse/FLINK-6225?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16233926#comment-16233926 ] ASF GitHub Bot commented on FLINK-6225: --- Github user fhueske commented on the issue: https://github.com/apache/flink/pull/3748 Thanks for the update @PangZhi. Btw. it is recommended to post a short comment when you update a PR. Pushing an update does not trigger a notification. The PR looks good. Will merge this. > Support Row Stream for CassandraSink > > > Key: FLINK-6225 > URL: https://issues.apache.org/jira/browse/FLINK-6225 > Project: Flink > Issue Type: New Feature > Components: Cassandra Connector >Affects Versions: 1.3.0 >Reporter: Jing Fan >Assignee: Haohui Mai >Priority: Major > Fix For: 1.4.0 > > > Currently in CassandraSink, specifying query is not supported for row-stream. > The solution should be similar to CassandraTupleSink. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Closed] (FLINK-7939) DataStream of atomic type cannot be converted to Table with time attributes
[ https://issues.apache.org/jira/browse/FLINK-7939?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Fabian Hueske closed FLINK-7939. Resolution: Fixed Fixed for 1.3.3 with 168378d98ddf591f780a939ee74310ec8d04d517 Fixed for 1.4.0 with 505d478d55c93e07a7227e375939eca19ec4d082 > DataStream of atomic type cannot be converted to Table with time attributes > --- > > Key: FLINK-7939 > URL: https://issues.apache.org/jira/browse/FLINK-7939 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Affects Versions: 1.4.0, 1.3.3 >Reporter: Fabian Hueske >Assignee: Fabian Hueske >Priority: Major > Fix For: 1.4.0, 1.3.3 > > > A DataStream of an atomic type, such as {{DataStream<String>}} or {{DataStream<Long>}}, cannot be converted into a {{Table}} with a time attribute. > {code} > DataStream<String> stream = ... > Table table = tEnv.fromDataStream(stream, "string, rowtime.rowtime") > {code} > yields > {code} > Exception in thread "main" org.apache.flink.table.api.TableException: Field reference expression requested. > at org.apache.flink.table.api.TableEnvironment$$anonfun$1.apply(TableEnvironment.scala:630) > at org.apache.flink.table.api.TableEnvironment$$anonfun$1.apply(TableEnvironment.scala:624) > at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241) > at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241) > at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) > at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186) > at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241) > at scala.collection.mutable.ArrayOps$ofRef.flatMap(ArrayOps.scala:186) > at org.apache.flink.table.api.TableEnvironment.getFieldInfo(TableEnvironment.scala:624) > at org.apache.flink.table.api.StreamTableEnvironment.registerDataStreamInternal(StreamTableEnvironment.scala:398) > at org.apache.flink.table.api.scala.StreamTableEnvironment.fromDataStream(StreamTableEnvironment.scala:85) > {code} > As a workaround the atomic type can be wrapped in {{Tuple1}}, i.e., convert a {{DataStream<String>}} into a {{DataStream<Tuple1<String>>}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
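The Tuple1 workaround from the issue description can be sketched without a Flink runtime. To keep the example self-contained, a minimal stand-in for Flink's `Tuple1` is defined locally; in a real job the same wrapping would be done with `org.apache.flink.api.java.tuple.Tuple1` inside a `MapFunction` before calling `fromDataStream` (the field names are illustrative):

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the workaround: wrap each atomic element in a Tuple1 so the
// result has a named field that time-attribute expressions can reference.
public class Tuple1Workaround {

    // Minimal stand-in for org.apache.flink.api.java.tuple.Tuple1.
    public static class Tuple1<T> {
        public T f0;
        public Tuple1(T f0) { this.f0 = f0; }
    }

    // Conceptually DataStream<T> -> DataStream<Tuple1<T>>, modelled on lists.
    public static <T> List<Tuple1<T>> wrap(List<T> atomicElements) {
        List<Tuple1<T>> wrapped = new ArrayList<>();
        for (T value : atomicElements) {
            wrapped.add(new Tuple1<>(value));
        }
        return wrapped;
    }

    public static void main(String[] args) {
        List<Tuple1<String>> wrapped = wrap(List.of("a", "b"));
        System.out.println(wrapped.get(0).f0); // prints "a"
    }
}
```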
[GitHub] flink issue #4794: [build][minor] Add missing licenses
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/4794 The `masters` and `slaves` file probably does not need a license header (although the rules of when you need one and when not are not very clear to me). I think config files frequently have a license header, so I would take the `zoo.cfg` change. @yew1eb Are you okay if we merge this without the changes to `masters` and `slaves`? ---
[GitHub] flink issue #4872: [FLINK-7876] Register TaskManagerMetricGroup under Resour...
Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/4872 Thanks for the review @zentol. I've addressed your comments and rebased onto the latest master. If Travis gives green light, then I'll merge this PR. ---
[GitHub] flink issue #4835: [FLINK-7847][avro] Fix typo in jackson shading pattern
Github user zentol commented on the issue: https://github.com/apache/flink/pull/4835 @aljoscha do you want to include this in your avro PR? ---
[jira] [Commented] (FLINK-7847) Fix typo in flink-avro shading pattern
[ https://issues.apache.org/jira/browse/FLINK-7847?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16233950#comment-16233950 ] ASF GitHub Bot commented on FLINK-7847: --- Github user zentol commented on the issue: https://github.com/apache/flink/pull/4835 @aljoscha do you want to include this in your avro PR? > Fix typo in flink-avro shading pattern > -- > > Key: FLINK-7847 > URL: https://issues.apache.org/jira/browse/FLINK-7847 > Project: Flink > Issue Type: Bug > Components: Build System >Affects Versions: 1.4.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Major > Fix For: 1.4.0 > > > {code} > <relocation> > <pattern>org.codehaus.jackson</pattern> > <shadedPattern>org.apache.flink.avro.shaded.org.codehouse.jackson</shadedPattern> > </relocation> > {code} > The shaded pattern should be > "org.apache.flink.avro.shaded.org.codehaus.jackson". -- This message was sent by Atlassian JIRA (v6.4.14#64029)
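For reference, a sketch of how the fixed maven-shade-plugin relocation would read (this is only the `<relocation>` fragment; the surrounding plugin configuration is omitted):

```xml
<!-- Corrected relocation: the shaded pattern must repeat "codehaus",
     not the typo "codehouse", or the rewritten class references will
     not match the relocated classes. -->
<relocation>
  <pattern>org.codehaus.jackson</pattern>
  <shadedPattern>org.apache.flink.avro.shaded.org.codehaus.jackson</shadedPattern>
</relocation>
```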
[GitHub] flink issue #4920: [FLINK-7944] Allow configuring Hadoop classpath
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/4920 Can you describe what this does and how it works? Only getting a rough idea from the shell scripts... ---
[jira] [Commented] (FLINK-7944) Allow configuring Hadoop Classpath
[ https://issues.apache.org/jira/browse/FLINK-7944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16233956#comment-16233956 ] ASF GitHub Bot commented on FLINK-7944: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/4920 Can you describe what this does and how it works? Only getting a rough idea from the shell scripts... > Allow configuring Hadoop Classpath > -- > > Key: FLINK-7944 > URL: https://issues.apache.org/jira/browse/FLINK-7944 > Project: Flink > Issue Type: Improvement > Components: Startup Shell Scripts >Reporter: Aljoscha Krettek >Assignee: Aljoscha Krettek >Priority: Major > Fix For: 1.4.0 > > > Currently, we have some paths hardcoded in {{config.sh}} in addition to some > magic that tries to derive a classpath from the {{hadoop}}/{{hbase}} > commands. We should make the classpath configurable using a separate script > and put the classpath in a {{classpath}} file that will be picked up by the > scripts. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
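The mechanism described in the issue — derive the Hadoop classpath once and cache it in a `classpath` file that the other startup scripts read — can be sketched in a few lines of shell. This is a hypothetical illustration of the idea, not the actual FLINK-7944 script; all paths and variable names are assumptions:

```shell
#!/usr/bin/env sh
# Sketch: resolve the Hadoop classpath once and cache it in a file.
# Precedence: explicit HADOOP_CLASSPATH override > `hadoop classpath` > empty.

FLINK_CONF_DIR="${FLINK_CONF_DIR:-/tmp/flink-conf-sketch}"
mkdir -p "$FLINK_CONF_DIR"

if [ -n "$HADOOP_CLASSPATH" ]; then
    classpath="$HADOOP_CLASSPATH"          # user override wins
elif command -v hadoop >/dev/null 2>&1; then
    classpath="$(hadoop classpath)"        # derive from the hadoop CLI
else
    classpath=""                           # no Hadoop on this machine
fi

# Other startup scripts would read this file instead of re-deriving the path.
printf '%s\n' "$classpath" > "$FLINK_CONF_DIR/classpath"
```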
[jira] [Commented] (FLINK-7881) flink can't deployed on yarn with ha
[ https://issues.apache.org/jira/browse/FLINK-7881?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16233955#comment-16233955 ] Aljoscha Krettek commented on FLINK-7881: - [~trohrm...@apache.org] FLINK-7951 is a duplicate of this one, right? And you already have the fix so we can probably close this one. > flink can't deployed on yarn with ha > > > Key: FLINK-7881 > URL: https://issues.apache.org/jira/browse/FLINK-7881 > Project: Flink > Issue Type: Bug > Components: YARN >Affects Versions: 1.3.2 >Reporter: deng >Priority: Blocker > Attachments: screenshot-1.png, screenshot-2.png > > > I start yarn-ssession.sh on yarn, but it can't read hdfs logical url. It > always connect to hdfs://master:8020, it should be 9000, my hdfs defaultfs is > hdfs://master. > I have config the YARN_CONF_DIR and HADOOP_CONF_DIR, it didn't work. > Is it a bug? i use flink-1.3.0-bin-hadoop27-scala_2.10 > 2017-10-20 11:00:05,395 DEBUG org.apache.hadoop.ipc.Client > - IPC Client (1035144464) connection to > startdt/173.16.5.215:8020 from admin: closed > 2017-10-20 11:00:05,398 ERROR > org.apache.flink.yarn.YarnApplicationMasterRunner - YARN > Application Master initialization failed > java.net.ConnectException: Call From spark3/173.16.5.216 to master:8020 > failed on connection exception: java.net.ConnectException: Connection > refused; For more details see: > http://wiki.apache.org/hadoop/ConnectionRefused > at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) > at > sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57) > at > sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) > at java.lang.reflect.Constructor.newInstance(Constructor.java:526) > at org.apache.hadoop.net.NetUtils.wrapWithMessage(NetUtils.java:792) > at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:732) > at org.apache.hadoop.ipc.Client.call(Client.java:1479) > at 
org.apache.hadoop.ipc.Client.call(Client.java:1412) > at > org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229) > at com.sun.proxy.$Proxy9.getFileInfo(Unknown Source) > at > org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getFileInfo(ClientNamenodeProtocolTranslatorPB.java:771) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:606) > at > org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191) > at > org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102) > at com.sun.proxy.$Proxy10.getFileInfo(Unknown Source) > at org.apache.hadoop.hdfs.DFSClient.getFileInfo(DFSClient.java:2108) > at > org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1305) > at > org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1301) > at > org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81) -- This message was sent by Atlassian JIRA (v6.4.14#64029)
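The symptom in the report — the client contacting a plain `master:8020` instead of resolving the HA logical URI — usually means the HDFS HA client settings were not picked up from the configured `HADOOP_CONF_DIR`. For orientation, a minimal sketch of the relevant properties (values illustrative; these are standard Hadoop HA keys split across `core-site.xml` and `hdfs-site.xml`):

```xml
<!-- core-site.xml: the default filesystem is the logical nameservice,
     with no port, so clients must resolve it via the HA settings below. -->
<property>
  <name>fs.defaultFS</name>
  <value>hdfs://mycluster</value>
</property>

<!-- hdfs-site.xml: the nameservice and its NameNodes. Without these on the
     client side, "hdfs://master" degrades to host "master" on the default
     RPC port 8020, which matches the ConnectException in the log above. -->
<property>
  <name>dfs.nameservices</name>
  <value>mycluster</value>
</property>
<property>
  <name>dfs.ha.namenodes.mycluster</name>
  <value>nn1,nn2</value>
</property>
```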
[jira] [Commented] (FLINK-7881) flink can't deployed on yarn with ha
[ https://issues.apache.org/jira/browse/FLINK-7881?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16233958#comment-16233958 ] Aljoscha Krettek commented on FLINK-7881: - And thanks to [~djh4230] for also finding the fix in parallel. 👍 > flink can't deployed on yarn with ha > > > Key: FLINK-7881 > URL: https://issues.apache.org/jira/browse/FLINK-7881 > Project: Flink > Issue Type: Bug > Components: YARN >Affects Versions: 1.3.2 >Reporter: deng >Priority: Blocker > Attachments: screenshot-1.png, screenshot-2.png -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #4921: [FLINK-7943] Make ParameterTool thread safe
Github user zentol commented on the issue: https://github.com/apache/flink/pull/4921 Travis failure: ``` Tests run: 17, Failures: 0, Errors: 6, Skipped: 0, Time elapsed: 0.029 sec <<< FAILURE! - in org.apache.flink.api.java.utils.RequiredParametersTest testApplyToWithOptionWithLongAndShortNameAndDefaultValue(org.apache.flink.api.java.utils.RequiredParametersTest) Time elapsed: 0.012 sec <<< ERROR! java.lang.UnsupportedOperationException: null at java.util.Collections$UnmodifiableMap.put(Collections.java:1457) at org.apache.flink.api.java.utils.RequiredParameters.hasNoDefaultValueAndNoValuePassedOnAlternativeName(RequiredParameters.java:147) at org.apache.flink.api.java.utils.RequiredParameters.checkAndApplyDefaultValue(RequiredParameters.java:116) at org.apache.flink.api.java.utils.RequiredParameters.applyTo(RequiredParameters.java:94) at org.apache.flink.api.java.utils.RequiredParametersTest.testApplyToWithOptionWithLongAndShortNameAndDefaultValue(RequiredParametersTest.java:195) testApplyToWithMultipleTypes(org.apache.flink.api.java.utils.RequiredParametersTest) Time elapsed: 0.002 sec <<< ERROR! java.lang.UnsupportedOperationException: null at java.util.Collections$UnmodifiableMap.put(Collections.java:1457) at org.apache.flink.api.java.utils.RequiredParameters.hasNoDefaultValueAndNoValuePassedOnAlternativeName(RequiredParameters.java:147) at org.apache.flink.api.java.utils.RequiredParameters.applyTo(RequiredParameters.java:103) at org.apache.flink.api.java.utils.RequiredParametersTest.testApplyToWithMultipleTypes(RequiredParametersTest.java:228) testApplyToWithOptionMultipleOptionsAndOneDefaultValue(org.apache.flink.api.java.utils.RequiredParametersTest) Time elapsed: 0.001 sec <<< ERROR! 
java.lang.UnsupportedOperationException: null at java.util.Collections$UnmodifiableMap.put(Collections.java:1457) at org.apache.flink.api.java.utils.RequiredParameters.hasNoDefaultValueAndNoValuePassedOnAlternativeName(RequiredParameters.java:147) at org.apache.flink.api.java.utils.RequiredParameters.applyTo(RequiredParameters.java:103) at org.apache.flink.api.java.utils.RequiredParametersTest.testApplyToWithOptionMultipleOptionsAndOneDefaultValue(RequiredParametersTest.java:210) testApplyToWithOptionAndDefaultValue(org.apache.flink.api.java.utils.RequiredParametersTest) Time elapsed: 0 sec <<< ERROR! java.lang.UnsupportedOperationException: null at java.util.Collections$UnmodifiableMap.put(Collections.java:1457) at org.apache.flink.api.java.utils.RequiredParameters.hasNoDefaultValueAndNoValuePassedOnAlternativeName(RequiredParameters.java:147) at org.apache.flink.api.java.utils.RequiredParameters.checkAndApplyDefaultValue(RequiredParameters.java:116) at org.apache.flink.api.java.utils.RequiredParameters.applyTo(RequiredParameters.java:94) at org.apache.flink.api.java.utils.RequiredParametersTest.testApplyToWithOptionAndDefaultValue(RequiredParametersTest.java:182) testApplyToMovesValuePassedOnShortNameToLongNameIfLongNameIsUndefined(org.apache.flink.api.java.utils.RequiredParametersTest) Time elapsed: 0.001 sec <<< ERROR! 
java.lang.UnsupportedOperationException: null at java.util.Collections$UnmodifiableMap.put(Collections.java:1457) at org.apache.flink.api.java.utils.RequiredParameters.hasNoDefaultValueAndNoValuePassedOnAlternativeName(RequiredParameters.java:144) at org.apache.flink.api.java.utils.RequiredParameters.applyTo(RequiredParameters.java:103) at org.apache.flink.api.java.utils.RequiredParametersTest.testApplyToMovesValuePassedOnShortNameToLongNameIfLongNameIsUndefined(RequiredParametersTest.java:127) testDefaultValueDoesNotOverrideValuePassedOnShortKeyIfLongKeyIsNotPassedButPresent(org.apache.flink.api.java.utils.RequiredParametersTest) Time elapsed: 0.001 sec <<< ERROR! java.lang.UnsupportedOperationException: null at java.util.Collections$UnmodifiableMap.put(Collections.java:1457) at org.apache.flink.api.java.utils.RequiredParameters.hasNoDefaultValueAndNoValuePassedOnAlternativeName(RequiredParameters.java:144) at org.apache.flink.api.java.utils.RequiredParameters.checkAndApplyDefaultValue(RequiredParameters.java:116) at org.apache.flink.api.java.utils.RequiredParameters.applyTo(RequiredParameters.java:94) at org.apache.flink.api.java.utils.RequiredParametersTest.testDefaultValueDoesNotOverrideValuePassedOnShortKeyIfLongKeyIsNotPassedButPresent(RequiredParametersTest.java:142) ``` ---
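The root cause in the Travis log is generic and easy to reproduce: `Collections.unmodifiableMap` returns a read-only view whose `put` throws `UnsupportedOperationException`, so any code path that must mutate the data needs a defensive copy first. A minimal sketch of the failure mode and the fix (this is an illustration, not the `RequiredParameters` code itself):

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Reproduces the failure mode from the log above: put() on an unmodifiable
// view throws, while mutating a defensive copy is safe.
public class UnmodifiableMapDemo {

    // Returns true if put() on the given map throws UnsupportedOperationException.
    public static boolean putThrows(Map<String, String> map) {
        try {
            map.put("key", "value");
            return false;
        } catch (UnsupportedOperationException e) {
            return true; // the unmodifiable view rejects mutation
        }
    }

    public static void main(String[] args) {
        Map<String, String> view = Collections.unmodifiableMap(new HashMap<>());
        System.out.println(putThrows(view));            // prints "true"

        Map<String, String> copy = new HashMap<>(view); // defensive copy
        System.out.println(putThrows(copy));            // prints "false"
    }
}
```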