[GitHub] flink issue #5409: [FLINK-8555][TableAPI & SQL] Fix TableFunction varargs le...
Github user twalthr commented on the issue: https://github.com/apache/flink/pull/5409 Thanks for the PR @Xpray. The changes look good. I will make sure that the `SqlOperandTypeChecker` is also used for the other function types. ---
[jira] [Commented] (FLINK-8555) Fix TableFunction varargs length exceeds 254 for SQL
[ https://issues.apache.org/jira/browse/FLINK-8555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16353528#comment-16353528 ] ASF GitHub Bot commented on FLINK-8555: --- Github user twalthr commented on the issue: https://github.com/apache/flink/pull/5409 Thanks for the PR @Xpray. The changes look good. I will make sure that the `SqlOperandTypeChecker` is also used for the other function types. > Fix TableFunction varargs length exceeds 254 for SQL > > > Key: FLINK-8555 > URL: https://issues.apache.org/jira/browse/FLINK-8555 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Ruidong Li >Assignee: Ruidong Li >Priority: Major > > With varargs, the Table API can correctly handle table function calls whose > parameter count exceeds 254. > This issue intends to add the same support for long parameter lists in SQL. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
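For background, the 254 limit comes from the JVM, which caps a method at 255 parameter slots (one slot is reserved for `this` on instance methods, which is where 254 comes from). A varargs `eval` method sidesteps the cap because all arguments arrive in a single array. A minimal sketch of such a table function (the class name and logic are illustrative, not taken from the PR):

{code:java}
import org.apache.flink.table.functions.TableFunction;

// Sketch: a table function whose eval method is declared with varargs, so a
// call from the Table API (and, with this fix, from SQL) is not limited by
// the JVM's 255-parameter-per-method cap.
public class ConcatAll extends TableFunction<String> {

    // However many arguments the caller passes, they arrive in one String[].
    public void eval(String... values) {
        collect(String.join("|", values));
    }
}
{code}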
[jira] [Updated] (FLINK-8563) Support consecutive DOT operators
[ https://issues.apache.org/jira/browse/FLINK-8563?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Timo Walther updated FLINK-8563: Issue Type: Sub-task (was: Improvement) Parent: FLINK-8507 > Support consecutive DOT operators > -- > > Key: FLINK-8563 > URL: https://issues.apache.org/jira/browse/FLINK-8563 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Timo Walther >Priority: Major > > We added support for accessing fields of arrays of composite types in > FLINK-7923. However, accessing another nested subfield is not supported by > Calcite. See CALCITE-2162. We should fix this once we upgrade to Calcite 1.16. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #5374: [FLINK-8101][flink-connectors] Elasticsearch 5.3+ (Transp...
Github user cjolif commented on the issue: https://github.com/apache/flink/pull/5374 @tzulitai did you have a chance to look at this? If you have any questions, please let me know. ---
[jira] [Commented] (FLINK-8101) Elasticsearch 6.x support
[ https://issues.apache.org/jira/browse/FLINK-8101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16353511#comment-16353511 ] ASF GitHub Bot commented on FLINK-8101: --- Github user cjolif commented on the issue: https://github.com/apache/flink/pull/5374 @tzulitai did you have a chance to look at this? If you have any questions, please let me know. > Elasticsearch 6.x support > - > > Key: FLINK-8101 > URL: https://issues.apache.org/jira/browse/FLINK-8101 > Project: Flink > Issue Type: New Feature > Components: ElasticSearch Connector >Affects Versions: 1.4.0 >Reporter: Hai Zhou UTC+8 >Assignee: Flavio Pompermaier >Priority: Major > Fix For: 1.5.0 > > > Recently, Elasticsearch 6.0.0 was released: > https://www.elastic.co/blog/elasticsearch-6-0-0-released > The minimum ES6-compatible Elasticsearch Java client version is 5.6.0 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-8563) Support consecutive DOT operators
Timo Walther created FLINK-8563: --- Summary: Support consecutive DOT operators Key: FLINK-8563 URL: https://issues.apache.org/jira/browse/FLINK-8563 Project: Flink Issue Type: Improvement Components: Table API & SQL Reporter: Timo Walther We added support for accessing fields of arrays of composite types in FLINK-7923. However, accessing another nested subfield is not supported by Calcite. See CALCITE-2162. We should fix this once we upgrade to Calcite 1.16. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
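To make the limitation concrete, here is a sketch of the two access patterns, assuming a table MyTable (registration omitted) whose column a is an array of composites that themselves contain a composite field; the field names are illustrative:

{code:java}
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.BatchTableEnvironment;

public class ConsecutiveDotExample {
    public static void main(String[] args) {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

        // Supported since FLINK-7923: a single DOT after an array element access.
        Table supported = tEnv.sqlQuery("SELECT a[1].f0 FROM MyTable");

        // Not yet supported by Calcite (see CALCITE-2162): a consecutive DOT
        // into the nested composite; this is what FLINK-8563 tracks.
        Table notYetSupported = tEnv.sqlQuery("SELECT a[1].f0.nested FROM MyTable");
    }
}
{code}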
[jira] [Commented] (FLINK-8538) Add a Kafka table source factory with JSON format support
[ https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16353508#comment-16353508 ] Xingcan Cui commented on FLINK-8538: Thanks for your reply, [~fhueske]. I'll think it over. > Add a Kafka table source factory with JSON format support > - > > Key: FLINK-8538 > URL: https://issues.apache.org/jira/browse/FLINK-8538 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Xingcan Cui >Priority: Major > > Similar to CSVTableSourceFactory a Kafka table source factory for JSON should > be added. This issue includes improving the existing JSON descriptor with > validation that can be used for other connectors as well. It is up for > discussion if we want to split the KafkaJsonTableSource into connector and > format such that we can reuse the format for other table sources as well. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (FLINK-7923) Support accessing subfields of a Composite element in an Object Array type column
[ https://issues.apache.org/jira/browse/FLINK-7923?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16353506#comment-16353506 ] Timo Walther edited comment on FLINK-7923 at 2/6/18 7:32 AM: - Fixed in 1.5.0: 91b7d011022fc1211aebe9114dfa6b497da49553. was (Author: twalthr): Fixed in 91b7d011022fc1211aebe9114dfa6b497da49553. > Support accessing subfields of a Composite element in an Object Array type > column > - > > Key: FLINK-7923 > URL: https://issues.apache.org/jira/browse/FLINK-7923 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Affects Versions: 1.4.0 >Reporter: Rong Rong >Assignee: Shuyi Chen >Priority: Major > Fix For: 1.5.0 > > > Access type such as: > {code:SQL} > SELECT > a[1].f0 > FROM > MyTable > {code} > will cause problem. > See following test sample for more details: > https://github.com/walterddr/flink/commit/03c93bcb0fb30bd2d327e35b5e244322d449b06a -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-7923) Support accessing subfields of a Composite element in an Object Array type column
[ https://issues.apache.org/jira/browse/FLINK-7923?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16353507#comment-16353507 ] ASF GitHub Bot commented on FLINK-7923: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5367 > Support accessing subfields of a Composite element in an Object Array type > column > - > > Key: FLINK-7923 > URL: https://issues.apache.org/jira/browse/FLINK-7923 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Affects Versions: 1.4.0 >Reporter: Rong Rong >Assignee: Shuyi Chen >Priority: Major > Fix For: 1.5.0 > > > Access type such as: > {code:SQL} > SELECT > a[1].f0 > FROM > MyTable > {code} > will cause problem. > See following test sample for more details: > https://github.com/walterddr/flink/commit/03c93bcb0fb30bd2d327e35b5e244322d449b06a -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (FLINK-7923) Support accessing subfields of a Composite element in an Object Array type column
[ https://issues.apache.org/jira/browse/FLINK-7923?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Timo Walther resolved FLINK-7923. - Resolution: Fixed Fixed in 91b7d011022fc1211aebe9114dfa6b497da49553. > Support accessing subfields of a Composite element in an Object Array type > column > - > > Key: FLINK-7923 > URL: https://issues.apache.org/jira/browse/FLINK-7923 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Affects Versions: 1.4.0 >Reporter: Rong Rong >Assignee: Shuyi Chen >Priority: Major > Fix For: 1.5.0 > > > Access type such as: > {code:SQL} > SELECT > a[1].f0 > FROM > MyTable > {code} > will cause problem. > See following test sample for more details: > https://github.com/walterddr/flink/commit/03c93bcb0fb30bd2d327e35b5e244322d449b06a -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5367: [FLINK-7923][Table API & SQL] Support field access...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5367 ---
[jira] [Commented] (FLINK-7923) Support accessing subfields of a Composite element in an Object Array type column
[ https://issues.apache.org/jira/browse/FLINK-7923?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16353486#comment-16353486 ] ASF GitHub Bot commented on FLINK-7923: --- Github user twalthr commented on the issue: https://github.com/apache/flink/pull/5367 Btw I will remove the null tests because fields of tuples and case classes are not allowed to be null. The serializers would throw an exception. > Support accessing subfields of a Composite element in an Object Array type > column > - > > Key: FLINK-7923 > URL: https://issues.apache.org/jira/browse/FLINK-7923 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Affects Versions: 1.4.0 >Reporter: Rong Rong >Assignee: Shuyi Chen >Priority: Major > Fix For: 1.5.0 > > > Access type such as: > {code:SQL} > SELECT > a[1].f0 > FROM > MyTable > {code} > will cause problem. > See following test sample for more details: > https://github.com/walterddr/flink/commit/03c93bcb0fb30bd2d327e35b5e244322d449b06a -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #5367: [FLINK-7923][Table API & SQL] Support field access of com...
Github user twalthr commented on the issue: https://github.com/apache/flink/pull/5367 Btw I will remove the null tests because fields of tuples and case classes are not allowed to be null. The serializers would throw an exception. ---
[jira] [Commented] (FLINK-7923) Support accessing subfields of a Composite element in an Object Array type column
[ https://issues.apache.org/jira/browse/FLINK-7923?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16353479#comment-16353479 ] ASF GitHub Bot commented on FLINK-7923: --- Github user twalthr commented on the issue: https://github.com/apache/flink/pull/5367 Thanks for the update @suez1224. The changes look good. I will merge this... > Support accessing subfields of a Composite element in an Object Array type > column > - > > Key: FLINK-7923 > URL: https://issues.apache.org/jira/browse/FLINK-7923 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Affects Versions: 1.4.0 >Reporter: Rong Rong >Assignee: Shuyi Chen >Priority: Major > Fix For: 1.5.0 > > > Access type such as: > {code:SQL} > SELECT > a[1].f0 > FROM > MyTable > {code} > will cause problem. > See following test sample for more details: > https://github.com/walterddr/flink/commit/03c93bcb0fb30bd2d327e35b5e244322d449b06a -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #5367: [FLINK-7923][Table API & SQL] Support field access of com...
Github user twalthr commented on the issue: https://github.com/apache/flink/pull/5367 Thanks for the update @suez1224. The changes look good. I will merge this... ---
[jira] [Updated] (FLINK-8562) Fix YARNSessionFIFOSecuredITCase
[ https://issues.apache.org/jira/browse/FLINK-8562?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shuyi Chen updated FLINK-8562: -- Description: Currently, YARNSessionFIFOSecuredITCase will not fail even if the current Flink YARN Kerberos integration is failing in production. Please see FLINK-8275. (was: Currently, YARNSessionFIFOSecuredITCase will not fail even if the current Flink YARN Kerberos integration test is failing. Please see FLINK-8275.) > Fix YARNSessionFIFOSecuredITCase > > > Key: FLINK-8562 > URL: https://issues.apache.org/jira/browse/FLINK-8562 > Project: Flink > Issue Type: Bug > Components: Security >Reporter: Shuyi Chen >Assignee: Shuyi Chen >Priority: Major > > Currently, YARNSessionFIFOSecuredITCase will not fail even if the current > Flink YARN Kerberos integration is failing in production. Please see > FLINK-8275. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-8562) Fix YARNSessionFIFOSecuredITCase
Shuyi Chen created FLINK-8562: - Summary: Fix YARNSessionFIFOSecuredITCase Key: FLINK-8562 URL: https://issues.apache.org/jira/browse/FLINK-8562 Project: Flink Issue Type: Bug Components: Security Reporter: Shuyi Chen Assignee: Shuyi Chen Currently, YARNSessionFIFOSecuredITCase will not fail even if the current Flink YARN Kerberos integration test is failing. Please see FLINK-8275. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-6469) Configure Memory Sizes with units
[ https://issues.apache.org/jira/browse/FLINK-6469?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] vinoyang reassigned FLINK-6469: --- Assignee: vinoyang > Configure Memory Sizes with units > - > > Key: FLINK-6469 > URL: https://issues.apache.org/jira/browse/FLINK-6469 > Project: Flink > Issue Type: Improvement > Components: Core >Reporter: Stephan Ewen >Assignee: vinoyang >Priority: Major > > Currently, memory sizes are configured as pure numbers; the interpretation > differs from configuration parameter to parameter. > For example, heap sizes are configured in megabytes, network buffer memory is > configured in bytes, and alignment thresholds are configured in bytes. > I propose to configure all memory parameters the same way, with units, similar > to time. The JVM itself configures heap sizes similarly: {{Xmx5g}} or > {{Xmx2000m}}. > {code} > 1 -> bytes > 10 kb > 64 mb > 1 gb > ... > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
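A minimal sketch of the proposed parsing (not Flink's implementation; the class and method names are made up): a bare number means bytes, and kb/mb/gb suffixes scale by powers of 1024.

{code:java}
import java.util.Locale;

public final class MemorySizeSketch {

    /** Parses "1", "10 kb", "64 mb", "1 gb" into a byte count. */
    public static long parseBytes(String text) {
        String s = text.trim().toLowerCase(Locale.US);
        long multiplier = 1L;
        if (s.endsWith("gb")) {
            multiplier = 1024L * 1024L * 1024L;
            s = s.substring(0, s.length() - 2);
        } else if (s.endsWith("mb")) {
            multiplier = 1024L * 1024L;
            s = s.substring(0, s.length() - 2);
        } else if (s.endsWith("kb")) {
            multiplier = 1024L;
            s = s.substring(0, s.length() - 2);
        }
        // A bare number means bytes, mirroring the proposal above.
        return Long.parseLong(s.trim()) * multiplier;
    }

    public static void main(String[] args) {
        System.out.println(parseBytes("64 mb")); // 67108864
        System.out.println(parseBytes("1"));     // 1
    }
}
{code}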
[jira] [Commented] (FLINK-8547) Implement CheckpointBarrierHandler not to spill data for exactly-once based on credit-based flow control
[ https://issues.apache.org/jira/browse/FLINK-8547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16353271#comment-16353271 ] ASF GitHub Bot commented on FLINK-8547: --- Github user zhijiangW commented on a diff in the pull request: https://github.com/apache/flink/pull/5400#discussion_r166175837 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java --- @@ -131,10 +131,14 @@ public StreamInputProcessor( long maxAlign = taskManagerConfig.getLong(TaskManagerOptions.TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT); if (!(maxAlign == -1 || maxAlign > 0)) { throw new IllegalConfigurationException( - TaskManagerOptions.TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT.key() - + " must be positive or -1 (infinite)"); + TaskManagerOptions.TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT.key() --- End diff -- I think we can change the current `CheckpointBarrierHandler` interface into an abstract class and then add a `createBarrierHandler` method for extracting the common parts in `StreamInputProcessor` and `StreamTwoInputProcessor`. Or we could define a new class for the common method. I prefer the first way. What do you think? > Implement CheckpointBarrierHandler not to spill data for exactly-once based > on credit-based flow control > > > Key: FLINK-8547 > URL: https://issues.apache.org/jira/browse/FLINK-8547 > Project: Flink > Issue Type: Sub-task > Components: Network >Affects Versions: 1.5.0 >Reporter: zhijiang >Assignee: zhijiang >Priority: Major > > Currently in exactly-once mode, the {{BarrierBuffer}} would block inputs with > barriers until all inputs have received the barrier for a given checkpoint. > To avoid back-pressuring the input streams, which may cause distributed > deadlocks, the {{BarrierBuffer}} has to spill the data to disk files to > recycle the buffers for blocked channels. > > With credit-based flow control, every channel has exclusive buffers, so > there is no need to spill data to avoid deadlocks. We therefore implement a new > {{CheckpointBarrierHandler}} that only buffers the data of blocked channels > in memory, for better performance. > > This new {{CheckpointBarrierHandler}} can also be enabled or disabled via > configuration in order to roll back to the original mode in case of unexpected risks. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5400: [FLINK-8547][network] Implement CheckpointBarrierH...
Github user zhijiangW commented on a diff in the pull request: https://github.com/apache/flink/pull/5400#discussion_r166175837 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java --- @@ -131,10 +131,14 @@ public StreamInputProcessor( long maxAlign = taskManagerConfig.getLong(TaskManagerOptions.TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT); if (!(maxAlign == -1 || maxAlign > 0)) { throw new IllegalConfigurationException( - TaskManagerOptions.TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT.key() - + " must be positive or -1 (infinite)"); + TaskManagerOptions.TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT.key() --- End diff -- I think we can change the current `CheckpointBarrierHandler` interface into an abstract class and then add a `createBarrierHandler` method for extracting the common parts in `StreamInputProcessor` and `StreamTwoInputProcessor`. Or we could define a new class for the common method. I prefer the first way. What do you think? ---
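A hypothetical sketch of that proposal; the class and method names are illustrative, not Flink's actual API. The validation that both processors currently duplicate moves into one static factory:

{code:java}
// Sketch: the handler becomes an abstract class, and a static factory holds
// the construction logic shared by StreamInputProcessor/StreamTwoInputProcessor.
public abstract class AbstractBarrierHandlerSketch {

    public abstract void processBarrier(long checkpointId, int channelIndex);

    /** Shared construction and validation logic, extracted once. */
    public static AbstractBarrierHandlerSketch createBarrierHandler(
            boolean creditBased, long maxAlignmentBytes) {
        if (!(maxAlignmentBytes == -1 || maxAlignmentBytes > 0)) {
            throw new IllegalArgumentException(
                    "alignment limit must be positive or -1 (infinite)");
        }
        return creditBased ? new CreditBasedHandler() : new SpillingHandler();
    }

    private static final class CreditBasedHandler extends AbstractBarrierHandlerSketch {
        @Override
        public void processBarrier(long checkpointId, int channelIndex) {
            // keep blocked data in memory (credit-based flow control)
        }
    }

    private static final class SpillingHandler extends AbstractBarrierHandlerSketch {
        @Override
        public void processBarrier(long checkpointId, int channelIndex) {
            // spill blocked data to disk (original behavior)
        }
    }
}
{code}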
[jira] [Commented] (FLINK-8547) Implement CheckpointBarrierHandler not to spill data for exactly-once based on credit-based flow control
[ https://issues.apache.org/jira/browse/FLINK-8547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16353265#comment-16353265 ] ASF GitHub Bot commented on FLINK-8547: --- Github user zhijiangW commented on a diff in the pull request: https://github.com/apache/flink/pull/5400#discussion_r166174743 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CreditBasedBarrierBuffer.java --- @@ -0,0 +1,529 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.io; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.runtime.checkpoint.CheckpointMetaData; +import org.apache.flink.runtime.checkpoint.CheckpointMetrics; +import org.apache.flink.runtime.checkpoint.decline.AlignmentLimitExceededException; +import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineException; +import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineOnCancellationBarrierException; +import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineSubsumedException; +import org.apache.flink.runtime.checkpoint.decline.InputEndOfStreamException; +import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker; +import org.apache.flink.runtime.io.network.api.CheckpointBarrier; +import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent; +import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent; +import org.apache.flink.runtime.io.network.partition.consumer.InputGate; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.streaming.runtime.io.CreditBasedBufferBlocker.BufferOrEventSequence; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayDeque; + +import static org.apache.flink.util.Preconditions.checkArgument; + +/** + * The barrier buffer is {@link CheckpointBarrierHandler} that blocks inputs with barriers until + * all inputs have received the barrier for a given checkpoint. + * + * The BarrierBuffer continues receiving buffers from the blocked channels and buffers them + * internally until the blocks are released. It will not cause deadlocks based on credit-based + * flow control. + */ +@Internal +public class CreditBasedBarrierBuffer implements CheckpointBarrierHandler { + + private static final Logger LOG = LoggerFactory.getLogger(CreditBasedBarrierBuffer.class); + + /** The gate that the buffer draws its input from. */ + private final InputGate inputGate; + + /** Flags that indicate whether a channel is currently blocked/buffered. */ + private final boolean[] blockedChannels; + + /** The total number of channels that this buffer handles data from. */ + private final int totalNumberOfInputChannels; + + /** The utility to buffer blocked data in the memory queue. */ + private final CreditBasedBufferBlocker bufferBlocker; + + /** +* The pending blocked buffer/event sequences. Must be consumed before requesting further data +* from the input gate. +*/ + private final ArrayDeque queuedBuffered; --- End diff -- I think we cannot directly mix all the blocked buffers for different checkpoint ids into one `ArrayDeque`. It also needs the `BufferOrEventSequence`, which indicates the blocked buffers for a specific checkpoint id; otherwise we cannot know when the blocked buffers are exhausted after a specific checkpoint id is released. If we want to use only one `ArrayDeque` for blocking all buffers, we may need to insert extra checkpoint-id hints into this queue to indicate when to stop reading blocked buffers from the queue. For example: channel1: [cp1,cp2,b1,cp3,b2,b3] channel2: [cp2] 1. When reading cp1 first from channel1, [cp2,b1,cp3,b2,b3] are blocked as separate sequence1. 2. When reading cp2 from channel2, the cp1 is released and begins to read sequence1. 3. When reading cp2 from seq1, the following buffers will be blocked in new seq2. 4. When reading cp3 from seq1, the cp2 is released.
[GitHub] flink pull request #5400: [FLINK-8547][network] Implement CheckpointBarrierH...
Github user zhijiangW commented on a diff in the pull request: https://github.com/apache/flink/pull/5400#discussion_r166174743 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CreditBasedBarrierBuffer.java --- @@ -0,0 +1,529 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.io; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.runtime.checkpoint.CheckpointMetaData; +import org.apache.flink.runtime.checkpoint.CheckpointMetrics; +import org.apache.flink.runtime.checkpoint.decline.AlignmentLimitExceededException; +import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineException; +import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineOnCancellationBarrierException; +import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineSubsumedException; +import org.apache.flink.runtime.checkpoint.decline.InputEndOfStreamException; +import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker; +import org.apache.flink.runtime.io.network.api.CheckpointBarrier; +import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent; +import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent; +import org.apache.flink.runtime.io.network.partition.consumer.InputGate; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.streaming.runtime.io.CreditBasedBufferBlocker.BufferOrEventSequence; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayDeque; + +import static org.apache.flink.util.Preconditions.checkArgument; + +/** + * The barrier buffer is {@link CheckpointBarrierHandler} that blocks inputs with barriers until + * all inputs have received the barrier for a given checkpoint. + * + * The BarrierBuffer continues receiving buffers from the blocked channels and buffers them + * internally until the blocks are released. It will not cause deadlocks based on credit-based + * flow control. + */ +@Internal +public class CreditBasedBarrierBuffer implements CheckpointBarrierHandler { + + private static final Logger LOG = LoggerFactory.getLogger(CreditBasedBarrierBuffer.class); + + /** The gate that the buffer draws its input from. */ + private final InputGate inputGate; + + /** Flags that indicate whether a channel is currently blocked/buffered. */ + private final boolean[] blockedChannels; + + /** The total number of channels that this buffer handles data from. */ + private final int totalNumberOfInputChannels; + + /** The utility to buffer blocked data in the memory queue. */ + private final CreditBasedBufferBlocker bufferBlocker; + + /** +* The pending blocked buffer/event sequences. Must be consumed before requesting further data +* from the input gate. +*/ + private final ArrayDeque queuedBuffered; --- End diff -- I think we cannot directly mix all the blocked buffers for different checkpoint ids into one `ArrayDeque`. It also needs the `BufferOrEventSequence`, which indicates the blocked buffers for a specific checkpoint id; otherwise we cannot know when the blocked buffers are exhausted after a specific checkpoint id is released. If we want to use only one `ArrayDeque` for blocking all buffers, we may need to insert extra checkpoint-id hints into this queue to indicate when to stop reading blocked buffers from the queue. For example: channel1: [cp1,cp2,b1,cp3,b2,b3] channel2: [cp2] 1. When reading cp1 first from channel1, [cp2,b1,cp3,b2,b3] are blocked as separate sequence1. 2. When reading cp2 from channel2, the cp1 is released and begins to read sequence1. 3. When reading cp2 from seq1, the following buffers will be blocked in new seq2. 4. When reading cp3 from seq1, the cp2 is released.
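An illustrative sketch (not Flink code) of the single-queue alternative being weighed above: one deque for all blocked data, with explicit checkpoint-id markers serving as the "extra hints" that tell the consumer where the blocked sequence for one checkpoint ends.

{code:java}
import java.util.ArrayDeque;

// Sketch: one queue for all blocked data; a Marker entry separates the buffers
// blocked while waiting for one checkpoint from those of the next one.
final class BlockedBufferQueue {

    static final class Marker {
        final long checkpointId;
        Marker(long checkpointId) { this.checkpointId = checkpointId; }
    }

    private final ArrayDeque<Object> queue = new ArrayDeque<>();
    private long currentCheckpointId = -1L;

    void block(long checkpointId, Object bufferOrEvent) {
        if (checkpointId != currentCheckpointId) {
            // The "extra hint": a marker that starts a new blocked sequence.
            queue.addLast(new Marker(checkpointId));
            currentCheckpointId = checkpointId;
        }
        queue.addLast(bufferOrEvent);
    }

    /** Called when a checkpoint is released: drops the leading marker. */
    void startDraining() {
        if (queue.peekFirst() instanceof Marker) {
            queue.pollFirst();
        }
    }

    /** Next blocked element of the released sequence, or null at the boundary. */
    Object pollNext() {
        Object head = queue.peekFirst();
        if (head == null || head instanceof Marker) {
            return null; // stop: the next checkpoint's sequence begins here
        }
        return queue.pollFirst();
    }
}
{code}

Whether the marker bookkeeping beats keeping explicit `BufferOrEventSequence` objects per checkpoint is exactly the trade-off the reviewer raises.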
[jira] [Commented] (FLINK-8552) CliFrontend doesn't exit normally after job has been submitted with 'bin/flink run some_jar_ball'
[ https://issues.apache.org/jira/browse/FLINK-8552?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16353191#comment-16353191 ] Lynch Lee commented on FLINK-8552: -- OK. Thanks! > CliFrontend doesn't exit normally after job has been submitted with 'bin/flink > run some_jar_ball' > - > > Key: FLINK-8552 > URL: https://issues.apache.org/jira/browse/FLINK-8552 > Project: Flink > Issue Type: Bug > Components: Client >Affects Versions: 1.4.0 >Reporter: Lynch Lee >Priority: Major > > I used the command 'bin/flink run some_jar_ball' to submit my job to a remote > cluster, but I found that the Java process did not exit normally even though my > submission was done and the job status changed to RUNNING. > > Is this a bug to be fixed? > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8384) Session Window Assigner with Dynamic Gaps
[ https://issues.apache.org/jira/browse/FLINK-8384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16352692#comment-16352692 ] ASF GitHub Bot commented on FLINK-8384: --- Github user dyanarose commented on the issue: https://github.com/apache/flink/pull/5295 I can see it's gone through Travis and is now in master, so closing as requested > Session Window Assigner with Dynamic Gaps > - > > Key: FLINK-8384 > URL: https://issues.apache.org/jira/browse/FLINK-8384 > Project: Flink > Issue Type: Improvement > Components: Streaming >Reporter: Dyana Rose >Assignee: Dyana Rose >Priority: Minor > Fix For: 1.5.0 > > > *Reason for Improvement* > Currently both Session Window assigners only allow a static inactivity gap. > Given the following scenario, this is too restrictive: > * Given a stream of IoT events from many device types > * Assume each device type could have a different inactivity gap > * Assume each device type gap could change while sessions are in flight > By allowing dynamic inactivity gaps, the correct gap can be determined in the > [assignWindows > function|https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/EventTimeSessionWindows.java#L59] > by passing the element currently under consideration, the timestamp, and the > context to a user defined function. This eliminates the need to create > unwieldy work arounds if you only have static gaps. > Dynamic Session Window gaps should be available for both Event Time and > Processing Time streams. > (short preliminary discussion: > https://lists.apache.org/thread.html/08a011c0039e826343e9be0b1bb4ecfc2e12976bc65f8a43ee88@%3Cdev.flink.apache.org%3E) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
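The change was merged for 1.5.0; roughly, the new assigner is used as below. The event type and gap values are illustrative, while the extractor interface name follows the merged PR:

{code:java}
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.assigners.SessionWindowTimeGapExtractor;

public class DynamicGapExample {

    // Hypothetical IoT event carrying the device type that determines its gap.
    public static class IotEvent {
        public String deviceId;
        public String deviceType;
    }

    public static void apply(DataStream<IotEvent> events) {
        events
            .keyBy(e -> e.deviceId)
            .window(EventTimeSessionWindows.withDynamicGap(
                new SessionWindowTimeGapExtractor<IotEvent>() {
                    @Override
                    public long extract(IotEvent element) {
                        // The inactivity gap is decided per element, so it can
                        // differ by device type and change while sessions run.
                        return "gps".equals(element.deviceType) ? 5_000L : 60_000L;
                    }
                }));
    }
}
{code}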
[GitHub] flink pull request #5295: [FLINK-8384] [streaming] Session Window Assigner w...
Github user dyanarose closed the pull request at: https://github.com/apache/flink/pull/5295 ---
[GitHub] flink issue #5295: [FLINK-8384] [streaming] Session Window Assigner with Dyn...
Github user dyanarose commented on the issue: https://github.com/apache/flink/pull/5295 I can see it's gone through Travis and is now in master, so closing as requested ---
[jira] [Commented] (FLINK-8384) Session Window Assigner with Dynamic Gaps
[ https://issues.apache.org/jira/browse/FLINK-8384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16352693#comment-16352693 ] ASF GitHub Bot commented on FLINK-8384: --- Github user dyanarose closed the pull request at: https://github.com/apache/flink/pull/5295 > Session Window Assigner with Dynamic Gaps > - > > Key: FLINK-8384 > URL: https://issues.apache.org/jira/browse/FLINK-8384 > Project: Flink > Issue Type: Improvement > Components: Streaming >Reporter: Dyana Rose >Assignee: Dyana Rose >Priority: Minor > Fix For: 1.5.0 > > > *Reason for Improvement* > Currently both Session Window assigners only allow a static inactivity gap. > Given the following scenario, this is too restrictive: > * Given a stream of IoT events from many device types > * Assume each device type could have a different inactivity gap > * Assume each device type gap could change while sessions are in flight > By allowing dynamic inactivity gaps, the correct gap can be determined in the > [assignWindows > function|https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/EventTimeSessionWindows.java#L59] > by passing the element currently under consideration, the timestamp, and the > context to a user defined function. This eliminates the need to create > unwieldy work arounds if you only have static gaps. > Dynamic Session Window gaps should be available for both Event Time and > Processing Time streams. > (short preliminary discussion: > https://lists.apache.org/thread.html/08a011c0039e826343e9be0b1bb4ecfc2e12976bc65f8a43ee88@%3Cdev.flink.apache.org%3E) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8538) Add a Kafka table source factory with JSON format support
[ https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16352657#comment-16352657 ] Fabian Hueske commented on FLINK-8538: -- Hi [~xccui], please see my comments: # Yes, the idea is to make this quite modular. Not only for users that implement other connectors, but also for us. For instance, we'd like to reuse code for JSON formats when reading from Kafka, Kinesis, Pravega, Files, etc. # This is not clear yet and will be driven by user demand. We'll focus on the most popular systems for which there are source functions that we can wrap (Kafka, Kinesis, file system, etc.) # I think so too. Most of the existing sources (which are not too many) would need to be refactored to become more modular. For instance, it does not really make sense to have Kafka table sources for each version of Kafka and (Avro, JSON). This will become hard to maintain once we add more formats like ProtoBuf, CSV, or ... # The current version does not feature format factories, but this is something we are thinking about to add soon as well. A format factory would need to provide a format parser that parses data in the format but also returns the {{TypeInformation}} of the returned type. It's great that you want to help with this effort! It would be great if we could have a first {{KafkaJsonTableSourceFactory}} soon, to have a Kafka source that can be used with the SQL CLI client. Starting from the +CsvTableSourceFactory+ sounds like a good idea to me. We should make sure though, that the things I discussed above can be later refactored without breaking too much API. Thanks, Fabian > Add a Kafka table source factory with JSON format support > - > > Key: FLINK-8538 > URL: https://issues.apache.org/jira/browse/FLINK-8538 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Xingcan Cui >Priority: Major > > Similar to CSVTableSourceFactory a Kafka table source factory for JSON should > be added. This issue includes improving the existing JSON descriptor with > validation that can be used for other connectors as well. It is up for > discussion if we want to split the KafkaJsonTableSource into connector and > format such that we can reuse the format for other table sources as well. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (FLINK-8561) SharedBuffer line 573 uses == to compare BufferEntries instead of .equals.
[ https://issues.apache.org/jira/browse/FLINK-8561?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kostas Kloudas closed FLINK-8561. - Resolution: Fixed > SharedBuffer line 573 uses == to compare BufferEntries instead of .equals. > -- > > Key: FLINK-8561 > URL: https://issues.apache.org/jira/browse/FLINK-8561 > Project: Flink > Issue Type: Bug > Components: CEP >Affects Versions: 1.4.0, 1.5.0 >Reporter: Kostas Kloudas >Assignee: Kostas Kloudas >Priority: Major > Fix For: 1.5.0, 1.4.1 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8561) SharedBuffer line 573 uses == to compare BufferEntries instead of .equals.
[ https://issues.apache.org/jira/browse/FLINK-8561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16352646#comment-16352646 ] Kostas Kloudas commented on FLINK-8561: --- Merged on master 3d323ba10171da6e6416efca832b76da46ea010b and on 1.4 33efaf74a12d7fe436a9d61d5377329496c6c6a0 > SharedBuffer line 573 uses == to compare BufferEntries instead of .equals. > -- > > Key: FLINK-8561 > URL: https://issues.apache.org/jira/browse/FLINK-8561 > Project: Flink > Issue Type: Bug > Components: CEP >Affects Versions: 1.4.0, 1.5.0 >Reporter: Kostas Kloudas >Assignee: Kostas Kloudas >Priority: Major > Fix For: 1.5.0, 1.4.1 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-7923) Support accessing subfields of a Composite element in an Object Array type column
[ https://issues.apache.org/jira/browse/FLINK-7923?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shuyi Chen updated FLINK-7923: -- Fix Version/s: 1.5.0 > Support accessing subfields of a Composite element in an Object Array type > column > - > > Key: FLINK-7923 > URL: https://issues.apache.org/jira/browse/FLINK-7923 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Affects Versions: 1.4.0 >Reporter: Rong Rong >Assignee: Shuyi Chen >Priority: Major > Fix For: 1.5.0 > > > Access type such as: > {code:SQL} > SELECT > a[1].f0 > FROM > MyTable > {code} > will cause problem. > See following test sample for more details: > https://github.com/walterddr/flink/commit/03c93bcb0fb30bd2d327e35b5e244322d449b06a -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8538) Add a Kafka table source factory with JSON format support
[ https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16352634#comment-16352634 ] Xingcan Cui commented on FLINK-8538: Thanks for the explanation [~twalthr]. I've glanced over the PR for FLINK-8240 and tried to comprehend the work. 1. We are going to provide a unified interface, which is described by general string properties, to define the table sources and sinks. Ideally, the users can combine the descriptors (in different dimensions such as type, format, and schema) to make their own table sources/sinks in the future, right? 2. Do you have any plans for how many table source/sink factories we are going to support? 3. I got a feeling that we must do some refactorings for the existing connectors since the source types and formats seem to be heavily coupled. 4. When mentioned {{format}} discovery, did you mean to implement different {{FormatDescriptors}} and to match them just like using the {{SPI}} for the {{TableSourceFactories}}? I'd like to assist to implement the whole feature :) For now, maybe I can take the {{CsvTableSourceFactory}} as a demo and imitate it to implement an elementary {{KafkaTableSourceFactory}}. What do you think? > Add a Kafka table source factory with JSON format support > - > > Key: FLINK-8538 > URL: https://issues.apache.org/jira/browse/FLINK-8538 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Xingcan Cui >Priority: Major > > Similar to CSVTableSourceFactory a Kafka table source factory for JSON should > be added. This issue includes improving the existing JSON descriptor with > validation that can be used for other connectors as well. It is up for > discussion if we want to split the KafkaJsonTableSource into connector and > format such that we can reuse the format for other table sources as well. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
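An illustrative sketch (all names hypothetical) of the SPI-style matching mentioned above: factories are discovered via java.util.ServiceLoader and matched against the string properties that describe the source:

{code:java}
import java.util.Map;
import java.util.ServiceLoader;

// Sketch of property-based factory discovery; not the actual Flink interfaces.
interface TableSourceFactorySketch {
    /** Properties this factory requires, e.g. connector.type=kafka, format.type=json. */
    Map<String, String> requiredContext();

    Object createTableSource(Map<String, String> properties);
}

final class FactoryDiscovery {
    static TableSourceFactorySketch find(Map<String, String> properties) {
        for (TableSourceFactorySketch factory
                : ServiceLoader.load(TableSourceFactorySketch.class)) {
            // A factory matches when all of its required properties are present.
            if (properties.entrySet().containsAll(factory.requiredContext().entrySet())) {
                return factory;
            }
        }
        throw new IllegalArgumentException("No factory matches the given properties");
    }
}
{code}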
[GitHub] flink pull request #5415: Multipath merged
GitHub user fhueske opened a pull request: https://github.com/apache/flink/pull/5415 Multipath merged ## What is the purpose of the change * Add support to `FileInputFormat` to read from multiple paths. At the moment only a single path is supported that can be recursively read if the path is a directory. * Maintain compatibility with existing input formats that extend `FileInputFormat` This PR is an extension / rework of PR #1990. ## Brief change log `FileInputFormat` is declared as `@Public` interface. * deprecate the `protected` variable `FileInputFormat.filePath` of type `Path`. We need to store more than one path to support multiple paths. * deprecate the method `FileInputFormat.getFilePath()`. We need to return a list of paths. * add a private member variable to `FileInputFormat` to store multiple paths. * add methods to set and get multiple paths. * add a `deprecated` method `public boolean FileInputFormat.supportsMultiPaths()` with default implementation `return false;` that input formats can override if they support multiple paths. If the method returns `true`, the deprecated `filePath` variable is not used anymore and will always be `null`. * changed all public sub classes of `FileInputFormat` to not directly access the deprecated `filePath` variable but use the more generic `getFilePaths()` method. * We cannot override `supportsMultiPaths` in `DelimitedInputFormat` and `BinaryInputFormat` because they are part of the public API and not final. ## Verifying this change * added a couple of tests to validate that splits and statistics are correctly generated. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): **no** - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: **YES** - The serializers: **no** - The runtime per-record code paths (performance sensitive): **no** - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: **no** - The S3 file system connector: **no** ## Documentation - Does this pull request introduce a new feature? **yes** - If yes, how is the feature documented? Only in JavaDocs so far. You can merge this pull request into a Git repository by running: $ git pull https://github.com/fhueske/flink multipath-merged Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5415.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5415 commit 13e84ff078d6112648dace7559139dea85f46c5d Author: Phetsarath, Sourigna Date: 2016-05-13T16:24:46Z [FLINK-3655] Multiple File Paths for FileInputFormat. commit d46b32726e3043debc94d097f2dc296841689d36 Author: Fabian Hueske Date: 2018-02-05T13:40:50Z [FLINK-3655] Multiple File Paths for FileInputFormat. - Reverted API-breaking changes. ---
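A sketch of how an input format would opt in under this scheme, with method names as described in the PR text above; the record-reading methods are stubbed out because they are irrelevant to the path handling:

{code:java}
import org.apache.flink.api.common.io.FileInputFormat;
import org.apache.flink.core.fs.Path;

// Hypothetical subclass following the compatibility scheme above: it opts in
// to multiple paths, so the deprecated single filePath variable stays unused
// and all path handling goes through the new multi-path accessors.
public class MultiPathFormat extends FileInputFormat<String> {

    public MultiPathFormat(Path... paths) {
        setFilePaths(paths); // multi-path setter added by this PR
    }

    @Override
    public boolean supportsMultiPaths() {
        return true; // opt in: the deprecated filePath variable is no longer used
    }

    @Override
    public boolean reachedEnd() {
        return true; // stub for the sketch
    }

    @Override
    public String nextRecord(String reuse) {
        return null; // stub for the sketch
    }
}
{code}

The opt-in flag is what preserves compatibility: existing subclasses that never override supportsMultiPaths() keep the old single-path behavior untouched.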
[jira] [Updated] (FLINK-7923) Support accessing subfields of a Composite element in an Object Array type column
[ https://issues.apache.org/jira/browse/FLINK-7923?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shuyi Chen updated FLINK-7923: -- Issue Type: New Feature (was: Bug) Summary: Support accessing subfields of a Composite element in an Object Array type column (was: SQL parser exception when accessing subfields of a Composite element in an Object Array type column) > Support accessing subfields of a Composite element in an Object Array type > column > - > > Key: FLINK-7923 > URL: https://issues.apache.org/jira/browse/FLINK-7923 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Affects Versions: 1.4.0 >Reporter: Rong Rong >Assignee: Shuyi Chen >Priority: Major > > Access type such as: > {code:SQL} > SELECT > a[1].f0 > FROM > MyTable > {code} > will cause problem. > See following test sample for more details: > https://github.com/walterddr/flink/commit/03c93bcb0fb30bd2d327e35b5e244322d449b06a -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (FLINK-8243) OrcTableSource should recursively read all files in nested directories of the input path.
[ https://issues.apache.org/jira/browse/FLINK-8243?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Fabian Hueske closed FLINK-8243. Resolution: Fixed Implemented for 1.4.1 with e5c1261919765876b3ad873abbd9f21bee2fe12b Implemented for 1.5.0 with de3d85ba19d11ad8a7ab38b30b74953421e6383d > OrcTableSource should recursively read all files in nested directories of the > input path. > - > > Key: FLINK-8243 > URL: https://issues.apache.org/jira/browse/FLINK-8243 > Project: Flink > Issue Type: Improvement > Components: Batch Connectors and Input/Output Formats >Affects Versions: 1.4.0 >Reporter: Fabian Hueske >Assignee: Fabian Hueske >Priority: Critical > Fix For: 1.5.0, 1.4.1 > > > The {{OrcTableSource}} only reads files on the first level of the provided > input path. > Hive's default behavior is to recursively read all nested files in the input > path. We should follow this behavior and add a switch to disable recursive > reading. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (FLINK-8242) ClassCastException in OrcTableSource.toOrcPredicate
[ https://issues.apache.org/jira/browse/FLINK-8242?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Fabian Hueske closed FLINK-8242. Resolution: Fixed Fixed for 1.4.1 with 19fcd5ebadabdebc4d56716920937f229f06f5d3 Fixed for 1.5.0 with 63a19e879bfb4c4981f151e7b50481df094dcb09 > ClassCastException in OrcTableSource.toOrcPredicate > --- > > Key: FLINK-8242 > URL: https://issues.apache.org/jira/browse/FLINK-8242 > Project: Flink > Issue Type: Bug > Components: Batch Connectors and Input/Output Formats >Affects Versions: 1.4.0 >Reporter: Fabian Hueske >Assignee: Fabian Hueske >Priority: Critical > Fix For: 1.5.0, 1.4.1 > > > The {{OrcTableSource}} tries to cast all predicate literals to > {{Serializable}} in its {{toOrcPredicate()}} method. This fails with a > {{ClassCastException}} if a literal is not serializable. > Instead of failing, we should ignore the predicate and log a WARN message. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
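A minimal sketch of the described fix (a hypothetical helper, not the actual OrcTableSource code): check the literal instead of casting blindly, and signal "do not push down" with a warning.

{code:java}
import java.io.Serializable;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

final class PredicateLiterals {

    private static final Logger LOG = LoggerFactory.getLogger(PredicateLiterals.class);

    static Serializable toOrcLiteral(Object literal) {
        if (literal instanceof Serializable) {
            return (Serializable) literal;
        }
        // Instead of a ClassCastException, skip the predicate and warn.
        LOG.warn("Encountered a non-serializable literal of type {}. "
                        + "The predicate will not be pushed into the ORC reader.",
                literal == null ? "null" : literal.getClass().getName());
        return null; // caller treats null as "do not push down this predicate"
    }
}
{code}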
[GitHub] flink pull request #5414: Cep inv
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5414 ---
[jira] [Commented] (FLINK-8243) OrcTableSource should recursively read all files in nested directories of the input path.
[ https://issues.apache.org/jira/browse/FLINK-8243?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16352600#comment-16352600 ] ASF GitHub Bot commented on FLINK-8243: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5344 > OrcTableSource should recursively read all files in nested directories of the > input path. > - > > Key: FLINK-8243 > URL: https://issues.apache.org/jira/browse/FLINK-8243 > Project: Flink > Issue Type: Improvement > Components: Batch Connectors and Input/Output Formats >Affects Versions: 1.4.0 >Reporter: Fabian Hueske >Assignee: Fabian Hueske >Priority: Critical > Fix For: 1.5.0, 1.4.1 > > > The {{OrcTableSource}} only reads files on the first level of the provided > input path. > Hive's default behavior is to recursively read all nested files in the input > path. We should follow this behavior and add a switch to disable recursive > reading. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8242) ClassCastException in OrcTableSource.toOrcPredicate
[ https://issues.apache.org/jira/browse/FLINK-8242?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16352601#comment-16352601 ] ASF GitHub Bot commented on FLINK-8242: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5345 > ClassCastException in OrcTableSource.toOrcPredicate > --- > > Key: FLINK-8242 > URL: https://issues.apache.org/jira/browse/FLINK-8242 > Project: Flink > Issue Type: Bug > Components: Batch Connectors and Input/Output Formats >Affects Versions: 1.4.0 >Reporter: Fabian Hueske >Assignee: Fabian Hueske >Priority: Critical > Fix For: 1.5.0, 1.4.1 > > > The {{OrcTableSource}} tries to cast all predicate literals to > {{Serializable}} in its {{toOrcPredicate()}} method. This fails with a > {{ClassCastException}} if a literal is not serializable. > Instead of failing, we should ignore the predicate and log a WARN message. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5345: [FLINK-8242] [orc] Fix predicate push-down of OrcT...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5345 ---
[GitHub] flink pull request #5344: [FLINK-8243] [orc] OrcTableSource reads input path...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5344 ---
[GitHub] flink pull request #5414: Cep inv
Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/5414#discussion_r166021070 --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java --- @@ -519,8 +498,9 @@ public int hashCode() { private final ValueTimeWrapper valueTime; private final Set> edges; private final SharedBufferPage page; + private int referenceCounter; - private transient int entryId; + private int entryId; --- End diff -- we were not serializing it anyway and the class is not `Serializable`. ---
[GitHub] flink pull request #5414: Cep inv
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/5414#discussion_r166018249 --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java --- @@ -609,22 +580,27 @@ public String toString() { @Override public boolean equals(Object obj) { - if (obj instanceof SharedBufferEntry) { - @SuppressWarnings("unchecked") - SharedBufferEntry other = (SharedBufferEntry) obj; + if (!(obj instanceof SharedBufferEntry)) { + return false; + } - return valueTime.equals(other.valueTime) && + @SuppressWarnings("unchecked") + SharedBufferEntry other = (SharedBufferEntry) obj; + + return valueTime.equals(other.valueTime) && getKey().equals(other.getKey()) && referenceCounter == other.referenceCounter && - edges.equals(other.edges); - } else { - return false; - } + Objects.equals(edges, other.edges); } @Override public int hashCode() { - return Objects.hash(valueTime, getKey(), referenceCounter, edges); + int result = 1; --- End diff -- Why not use `Objects.hash()` here anymore? ---
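One plausible motivation for the unrolled form (an assumption; the thread does not state it) is that Objects.hash(...) allocates a varargs Object[] and boxes the int referenceCounter on every call, while the manual version produces the same value without either:

{code:java}
import java.util.Objects;

// Field types simplified for the sketch; equals() omitted for brevity.
class HashSketch {
    private Object valueTime;
    private Object key;
    private int referenceCounter;
    private Object edges;

    @Override
    public int hashCode() {
        // Equivalent to Objects.hash(valueTime, key, referenceCounter, edges),
        // but without the varargs array allocation and int boxing.
        int result = 1;
        result = 31 * result + Objects.hashCode(valueTime);
        result = 31 * result + Objects.hashCode(key);
        result = 31 * result + referenceCounter;
        result = 31 * result + Objects.hashCode(edges);
        return result;
    }
}
{code}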
[GitHub] flink pull request #5414: Cep inv
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/5414#discussion_r166020117 --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java --- @@ -542,15 +542,12 @@ public void addEdge(SharedBufferEdge edge) { /** * Remove edges with the specified targets. */ - private void removeEdges(final List> prunedEntries) { - Iterator> itor = edges.iterator(); - while (itor.hasNext()) { - SharedBufferEdge edge = itor.next(); - for (SharedBufferEntry prunedEntry : prunedEntries) { - if (prunedEntry == edge.getTarget()) { - itor.remove(); - break; - } + private void removeEdges(final Set> prunedEntries) { + Iterator> it = edges.iterator(); + while (it.hasNext()) { + SharedBufferEdge edge = it.next(); + if (prunedEntries.contains(edge.getTarget())) { --- End diff -- Very subtle ... 👍 The `equals()` is used internally in `Set.contains()`. ---
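To spell out the subtlety with a tiny standalone example (hypothetical type): Set.contains goes through equals()/hashCode(), whereas the old List-based loop compared edge targets with == (reference identity).

{code:java}
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

final class Target {
    final String key;

    Target(String key) { this.key = key; }

    @Override
    public boolean equals(Object o) {
        return o instanceof Target && Objects.equals(key, ((Target) o).key);
    }

    @Override
    public int hashCode() { return Objects.hash(key); }

    public static void main(String[] args) {
        Set<Target> pruned = new HashSet<>();
        pruned.add(new Target("a"));
        Target sameValue = new Target("a"); // equal value, different instance

        System.out.println(pruned.contains(sameValue));                 // true: equals-based
        System.out.println(pruned.stream().anyMatch(t -> t == sameValue)); // false: identity
    }
}
{code}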
[GitHub] flink pull request #5414: Cep inv
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/5414#discussion_r166017493 --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java --- @@ -437,72 +425,63 @@ public void add(final ValueTimeWrapper valueTime, final SharedBufferEntry> prunedEntries) { - Iterator, SharedBufferEntry>> iterator = entries.entrySet().iterator(); - boolean continuePruning = true; - - while (iterator.hasNext() && continuePruning) { - SharedBufferEntry entry = iterator.next().getValue(); - + private void prune(final long pruningTimestamp, final List> prunedEntries) { + Iterator, SharedBufferEntry>> it = entries.entrySet().iterator(); + while (it.hasNext()) { + SharedBufferEntry entry = it.next().getValue(); if (entry.getValueTime().getTimestamp() <= pruningTimestamp) { prunedEntries.add(entry); - iterator.remove(); - } else { - continuePruning = false; + it.remove(); --- End diff -- was the early cancel not working before? Or: why was `continuePruning` removed. ---
[GitHub] flink pull request #5414: Cep inv
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/5414#discussion_r166017932 --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java --- @@ -519,8 +498,9 @@ public int hashCode() { private final ValueTimeWrapper valueTime; private final Set> edges; private final SharedBufferPage page; + private int referenceCounter; - private transient int entryId; + private int entryId; --- End diff -- Why is this not `transient` anymore? ---
[GitHub] flink pull request #5414: Cep inv
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/5414#discussion_r166018025 --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java --- @@ -578,14 +555,8 @@ private void removeEdges(final List> prunedEntries) { } } - public boolean remove() { - if (page != null) { - page.remove(valueTime); - - return true; - } else { - return false; - } + public void remove() { + page.remove(valueTime); --- End diff -- Did we never need the check before? ---
[jira] [Resolved] (FLINK-7797) Add support for windowed outer joins for streaming tables
[ https://issues.apache.org/jira/browse/FLINK-7797?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xingcan Cui resolved FLINK-7797. Resolution: Fixed Fix Version/s: 1.5.0 > Add support for windowed outer joins for streaming tables > - > > Key: FLINK-7797 > URL: https://issues.apache.org/jira/browse/FLINK-7797 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Affects Versions: 1.4.0 >Reporter: Fabian Hueske >Assignee: Xingcan Cui >Priority: Major > Fix For: 1.5.0 > > > Currently, only windowed inner joins for streaming tables are supported. > This issue is about adding support for windowed LEFT, RIGHT, and FULL OUTER > joins. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5414: Cep inv
GitHub user kl0u opened a pull request: https://github.com/apache/flink/pull/5414 Cep inv R @aljoscha You can merge this pull request into a Git repository by running: $ git pull https://github.com/kl0u/flink cep-inv Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5414.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5414 commit 41dba903691b9b3c4e8fcc8831a1c57930ee3071 Author: kkloudas Date: 2018-01-25T09:59:30Z [hotfix] [cep] SharedBuffer refactoring. commit 7a3d9327e0877fcd9496ea2b367ba69e6220568c Author: kkloudas Date: 2018-02-05T13:36:53Z [FLINK-8561] [cep] Fix SharedBuffer.removeEdges to use .equals(). ---
[jira] [Commented] (FLINK-8384) Session Window Assigner with Dynamic Gaps
[ https://issues.apache.org/jira/browse/FLINK-8384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16352575#comment-16352575 ] ASF GitHub Bot commented on FLINK-8384: --- Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/5295 Thanks a lot for working on this and iterating so quickly! 👍 I merged this but could you please close the PR if it doesn't close automatically? > Session Window Assigner with Dynamic Gaps > - > > Key: FLINK-8384 > URL: https://issues.apache.org/jira/browse/FLINK-8384 > Project: Flink > Issue Type: Improvement > Components: Streaming >Reporter: Dyana Rose >Assignee: Dyana Rose >Priority: Minor > Fix For: 1.5.0 > > > *Reason for Improvement* > Currently both Session Window assigners only allow a static inactivity gap. > Given the following scenario, this is too restrictive: > * Given a stream of IoT events from many device types > * Assume each device type could have a different inactivity gap > * Assume each device type gap could change while sessions are in flight > By allowing dynamic inactivity gaps, the correct gap can be determined in the > [assignWindows > function|https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/EventTimeSessionWindows.java#L59] > by passing the element currently under consideration, the timestamp, and the > context to a user defined function. This eliminates the need to create > unwieldy work arounds if you only have static gaps. > Dynamic Session Window gaps should be available for both Event Time and > Processing Time streams. > (short preliminary discussion: > https://lists.apache.org/thread.html/08a011c0039e826343e9be0b1bb4ecfc2e12976bc65f8a43ee88@%3Cdev.flink.apache.org%3E) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #5295: [FLINK-8384] [streaming] Session Window Assigner with Dyn...
Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/5295 Thanks a lot for working on this and iterating so quickly! 👍 I merged this but could you please close the PR if it doesn't close automatically? ---
[jira] [Assigned] (FLINK-8384) Session Window Assigner with Dynamic Gaps
[ https://issues.apache.org/jira/browse/FLINK-8384?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek reassigned FLINK-8384: --- Assignee: Dyana Rose > Session Window Assigner with Dynamic Gaps > - > > Key: FLINK-8384 > URL: https://issues.apache.org/jira/browse/FLINK-8384 > Project: Flink > Issue Type: Improvement > Components: Streaming >Reporter: Dyana Rose >Assignee: Dyana Rose >Priority: Minor > Fix For: 1.5.0 > > > *Reason for Improvement* > Currently both Session Window assigners only allow a static inactivity gap. > Given the following scenario, this is too restrictive: > * Given a stream of IoT events from many device types > * Assume each device type could have a different inactivity gap > * Assume each device type gap could change while sessions are in flight > By allowing dynamic inactivity gaps, the correct gap can be determined in the > [assignWindows > function|https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/EventTimeSessionWindows.java#L59] > by passing the element currently under consideration, the timestamp, and the > context to a user defined function. This eliminates the need to create > unwieldy work arounds if you only have static gaps. > Dynamic Session Window gaps should be available for both Event Time and > Processing Time streams. > (short preliminary discussion: > https://lists.apache.org/thread.html/08a011c0039e826343e9be0b1bb4ecfc2e12976bc65f8a43ee88@%3Cdev.flink.apache.org%3E) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (FLINK-8384) Session Window Assigner with Dynamic Gaps
[ https://issues.apache.org/jira/browse/FLINK-8384?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek closed FLINK-8384. --- Resolution: Fixed Implemented on master in ede4c0751b630503605248e8d22f29977f58624a > Session Window Assigner with Dynamic Gaps > - > > Key: FLINK-8384 > URL: https://issues.apache.org/jira/browse/FLINK-8384 > Project: Flink > Issue Type: Improvement > Components: Streaming >Reporter: Dyana Rose >Assignee: Dyana Rose >Priority: Minor > Fix For: 1.5.0 > > > *Reason for Improvement* > Currently both Session Window assigners only allow a static inactivity gap. > Given the following scenario, this is too restrictive: > * Given a stream of IoT events from many device types > * Assume each device type could have a different inactivity gap > * Assume each device type gap could change while sessions are in flight > By allowing dynamic inactivity gaps, the correct gap can be determined in the > [assignWindows > function|https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/EventTimeSessionWindows.java#L59] > by passing the element currently under consideration, the timestamp, and the > context to a user defined function. This eliminates the need to create > unwieldy work arounds if you only have static gaps. > Dynamic Session Window gaps should be available for both Event Time and > Processing Time streams. > (short preliminary discussion: > https://lists.apache.org/thread.html/08a011c0039e826343e9be0b1bb4ecfc2e12976bc65f8a43ee88@%3Cdev.flink.apache.org%3E) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-8384) Session Window Assigner with Dynamic Gaps
[ https://issues.apache.org/jira/browse/FLINK-8384?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek updated FLINK-8384: Fix Version/s: 1.5.0 > Session Window Assigner with Dynamic Gaps > - > > Key: FLINK-8384 > URL: https://issues.apache.org/jira/browse/FLINK-8384 > Project: Flink > Issue Type: Improvement > Components: Streaming >Reporter: Dyana Rose >Priority: Minor > Fix For: 1.5.0 > > > *Reason for Improvement* > Currently both Session Window assigners only allow a static inactivity gap. > Given the following scenario, this is too restrictive: > * Given a stream of IoT events from many device types > * Assume each device type could have a different inactivity gap > * Assume each device type gap could change while sessions are in flight > By allowing dynamic inactivity gaps, the correct gap can be determined in the > [assignWindows > function|https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/EventTimeSessionWindows.java#L59] > by passing the element currently under consideration, the timestamp, and the > context to a user defined function. This eliminates the need to create > unwieldy work arounds if you only have static gaps. > Dynamic Session Window gaps should be available for both Event Time and > Processing Time streams. > (short preliminary discussion: > https://lists.apache.org/thread.html/08a011c0039e826343e9be0b1bb4ecfc2e12976bc65f8a43ee88@%3Cdev.flink.apache.org%3E) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-7803) Update savepoint Documentation
[ https://issues.apache.org/jira/browse/FLINK-7803?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16352541#comment-16352541 ] ASF GitHub Bot commented on FLINK-7803: --- Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/4809#discussion_r166008336 --- Diff: docs/ops/state/savepoints.md --- @@ -120,6 +120,10 @@ This will atomically trigger a savepoint for the job with ID `:jobid` and cancel If you don't specify a target directory, you need to have [configured a default directory](#configuration). Otherwise, cancelling the job with a savepoint will fail. + --- End diff -- 👍 > Update savepoint Documentation > -- > > Key: FLINK-7803 > URL: https://issues.apache.org/jira/browse/FLINK-7803 > Project: Flink > Issue Type: Bug > Components: Documentation >Reporter: Razvan >Assignee: Razvan >Priority: Major > > Can you please update > https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/savepoints.html > to specify the savepoint location *MUST* always be a location accessible by > all hosts? > I spent quite some time believing it'S a bug and trying to find solutions, > see https://issues.apache.org/jira/browse/FLINK-7750. It's not obvious in the > current documentation and other might waste time also believing it's an > actual issue. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #4809: [FLINK-7803][Documentation] Add missing savepoint ...
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/4809#discussion_r166008336 --- Diff: docs/ops/state/savepoints.md --- @@ -120,6 +120,10 @@ This will atomically trigger a savepoint for the job with ID `:jobid` and cancel If you don't specify a target directory, you need to have [configured a default directory](#configuration). Otherwise, cancelling the job with a savepoint will fail. + --- End diff -- 👍 ---
[jira] [Commented] (FLINK-8516) FlinkKinesisConsumer does not balance shards over subtasks
[ https://issues.apache.org/jira/browse/FLINK-8516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16352526#comment-16352526 ] ASF GitHub Bot commented on FLINK-8516: --- Github user tweise commented on the issue: https://github.com/apache/flink/pull/5393 @tzulitai please see changes and a couple of questions. > FlinkKinesisConsumer does not balance shards over subtasks > -- > > Key: FLINK-8516 > URL: https://issues.apache.org/jira/browse/FLINK-8516 > Project: Flink > Issue Type: Bug > Components: Kinesis Connector >Affects Versions: 1.4.0, 1.3.2, 1.5.0 >Reporter: Thomas Weise >Assignee: Thomas Weise >Priority: Major > > The hash code of the shard is used to distribute discovered shards over > subtasks round robin. This works as long as shard identifiers are sequential. > After shards are rebalanced in Kinesis, that may no longer be the case and > the distribution becomes skewed. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #5393: [FLINK-8516] Allow for custom hash function for shard to ...
Github user tweise commented on the issue: https://github.com/apache/flink/pull/5393 @tzulitai please see changes and a couple of questions. ---
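The skew described in the issue follows directly from hash-modulo assignment; reduced to a hedged sketch (the method name and exact arithmetic are illustrative, not the connector's actual code):

```java
// Illustrative only, not the connector's actual code: a shard is owned by
// the subtask whose index equals the shard's hash modulo the parallelism.
// Sequential shard ids spread roughly evenly; after a Kinesis reshard the
// surviving shard ids (and hashes) can cluster, skewing ownership onto a
// few subtasks.
static boolean subscribesTo(int shardHash, int parallelism, int subtaskIndex) {
    // Math.abs guards against negative hashes (a real implementation must
    // also handle Integer.MIN_VALUE, where Math.abs stays negative).
    return Math.abs(shardHash) % parallelism == subtaskIndex;
}
```

The PR's remedy, per its title, is to let users plug in a custom hash function for this shard-to-subtask mapping.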
[jira] [Commented] (FLINK-8547) Implement CheckpointBarrierHandler not to spill data for exactly-once based on credit-based flow control
[ https://issues.apache.org/jira/browse/FLINK-8547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16352520#comment-16352520 ] ASF GitHub Bot commented on FLINK-8547: --- Github user zhijiangW commented on the issue: https://github.com/apache/flink/pull/5400 @pnowojski, thanks for the reviews! I understand your concerns and I should deduplicate some common utils in these tests. I will do that tomorrow together with other comments! > Implement CheckpointBarrierHandler not to spill data for exactly-once based > on credit-based flow control > > > Key: FLINK-8547 > URL: https://issues.apache.org/jira/browse/FLINK-8547 > Project: Flink > Issue Type: Sub-task > Components: Network >Affects Versions: 1.5.0 >Reporter: zhijiang >Assignee: zhijiang >Priority: Major > > Currently in exactly-once mode, the {{BarrierBuffer}} would block inputs with > barriers until all inputs have received the barrier for a given checkpoint. > To avoid back-pressuring the input streams which may cause distributed > deadlocks, the {{BarrierBuffer}} has to spill the data in disk files to > recycle the buffers for blocked channels. > > Based on credit-based flow control, every channel has exclusive buffers, so > it is no need to spill data for avoiding deadlock. Then we implement a new > {{CheckpointBarrierHandler}} for only buffering the data for blocked channels > for better performance. > > And this new {{CheckpointBarrierHandler}} can also be configured to use or > not in order to rollback the original mode for unexpected risks. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-7803) Update savepoint Documentation
[ https://issues.apache.org/jira/browse/FLINK-7803?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16352524#comment-16352524 ] ASF GitHub Bot commented on FLINK-7803: --- Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/4809#discussion_r166001086 --- Diff: docs/ops/state/savepoints.md --- @@ -120,6 +120,10 @@ This will atomically trigger a savepoint for the job with ID `:jobid` and cancel If you don't specify a target directory, you need to have [configured a default directory](#configuration). Otherwise, cancelling the job with a savepoint will fail. + --- End diff -- Should be something like: ``` Attention: targetDirectory has to be a location accessible by both the JobManager(s) and TaskManager(s), e.g., a location on a distributed file system. ``` because markdown is not rendered within HTML tags. Also, the *warning* you have added is currently in the *Cancel Job with Savepoint* section. I think it should be moved to the section above (*Trigger a Savepoint*), or somewhere else since it does not only apply to the cancelation case. @uce > Update savepoint Documentation > -- > > Key: FLINK-7803 > URL: https://issues.apache.org/jira/browse/FLINK-7803 > Project: Flink > Issue Type: Bug > Components: Documentation >Reporter: Razvan >Assignee: Razvan >Priority: Major > > Can you please update > https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/savepoints.html > to specify the savepoint location *MUST* always be a location accessible by > all hosts? > I spent quite some time believing it'S a bug and trying to find solutions, > see https://issues.apache.org/jira/browse/FLINK-7750. It's not obvious in the > current documentation and other might waste time also believing it's an > actual issue. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #5400: [FLINK-8547][network] Implement CheckpointBarrierHandler ...
Github user zhijiangW commented on the issue: https://github.com/apache/flink/pull/5400 @pnowojski, thanks for the reviews! I understand your concerns and I should deduplicate some common utils in these tests. I will do that tomorrow together with other comments! ---
[GitHub] flink pull request #4809: [FLINK-7803][Documentation] Add missing savepoint ...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/4809#discussion_r166001086 --- Diff: docs/ops/state/savepoints.md --- @@ -120,6 +120,10 @@ This will atomically trigger a savepoint for the job with ID `:jobid` and cancel If you don't specify a target directory, you need to have [configured a default directory](#configuration). Otherwise, cancelling the job with a savepoint will fail. + --- End diff -- Should be something like: ``` Attention: targetDirectory has to be a location accessible by both the JobManager(s) and TaskManager(s), e.g., a location on a distributed file system. ``` because markdown is not rendered within HTML tags. Also, the *warning* you have added is currently in the *Cancel Job with Savepoint* section. I think it should be moved to the section above (*Trigger a Savepoint*), or somewhere else since it does not only apply to the cancelation case. @uce ---
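For context, the commands this documentation warning applies to look like the following (job id and path are placeholders; the accessibility requirement is the point of the proposed note):

```
# Trigger a savepoint; the target directory must be reachable by both the
# JobManager(s) and all TaskManagers, e.g. a distributed file system:
bin/flink savepoint <jobId> hdfs:///flink/savepoints

# Cancel a job with a savepoint; the same requirement applies:
bin/flink cancel -s hdfs:///flink/savepoints <jobId>
```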
[jira] [Commented] (FLINK-8547) Implement CheckpointBarrierHandler not to spill data for exactly-once based on credit-based flow control
[ https://issues.apache.org/jira/browse/FLINK-8547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16352509#comment-16352509 ] ASF GitHub Bot commented on FLINK-8547: --- Github user zhijiangW commented on a diff in the pull request: https://github.com/apache/flink/pull/5400#discussion_r165998584 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java --- @@ -131,10 +131,14 @@ public StreamInputProcessor( long maxAlign = taskManagerConfig.getLong(TaskManagerOptions.TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT); if (!(maxAlign == -1 || maxAlign > 0)) { throw new IllegalConfigurationException( - TaskManagerOptions.TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT.key() - + " must be positive or -1 (infinite)"); + TaskManagerOptions.TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT.key() --- End diff -- yes, I will consider a proper way > Implement CheckpointBarrierHandler not to spill data for exactly-once based > on credit-based flow control > > > Key: FLINK-8547 > URL: https://issues.apache.org/jira/browse/FLINK-8547 > Project: Flink >
[GitHub] flink pull request #5400: [FLINK-8547][network] Implement CheckpointBarrierH...
Github user zhijiangW commented on a diff in the pull request: https://github.com/apache/flink/pull/5400#discussion_r165998584 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java --- @@ -131,10 +131,14 @@ public StreamInputProcessor( long maxAlign = taskManagerConfig.getLong(TaskManagerOptions.TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT); if (!(maxAlign == -1 || maxAlign > 0)) { throw new IllegalConfigurationException( - TaskManagerOptions.TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT.key() - + " must be positive or -1 (infinite)"); + TaskManagerOptions.TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT.key() --- End diff -- yes, I will consider a proper way ---
[jira] [Updated] (FLINK-8020) Deadlock found in Async I/O operator
[ https://issues.apache.org/jira/browse/FLINK-8020?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler updated FLINK-8020: Priority: Critical (was: Blocker) > Deadlock found in Async I/O operator > > > Key: FLINK-8020 > URL: https://issues.apache.org/jira/browse/FLINK-8020 > Project: Flink > Issue Type: Bug > Components: Kafka Connector, Streaming, Streaming Connectors >Affects Versions: 1.3.2 > Environment: Kafka 0.8.2 and Flink 1.3.2 on YARN mode >Reporter: Weihua Jiang >Priority: Critical > Fix For: 1.5.0, 1.4.1 > > Attachments: jstack53009(2).out, jstack67976-2.log > > > Our streaming job run into trouble in these days after a long time smooth > running. One issue we found is > [https://issues.apache.org/jira/browse/FLINK-8019] and another one is this > one. > After analyzing the jstack, we believe we found a DEAD LOCK in flink: > 1. The thread "cache-process0 -> async-operator0 -> Sink: hbase-sink0 (8/8)" > hold lock 0x0007b6aa1788 and is waiting for lock 0x0007b6aa1940. > 2. The thread "Time Trigger for cache-process0 -> async-operator0 -> Sink: > hbase-sink0 (8/8)" hold lock 0x0007b6aa1940 and is waiting for lock > 0x0007b6aa1788. > This DEADLOCK made the job fail to proceed. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
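Stripped of the Flink specifics, the jstack pattern in the report is a classic lock-ordering cycle. A minimal, self-contained illustration (the lock names merely stand in for the monitors 0x0007b6aa1788 and 0x0007b6aa1940; this is not Flink code):

```java
// Not Flink code: a minimal reproduction of the reported lock-ordering cycle.
public class DeadlockSketch {
    public static void main(String[] args) {
        final Object taskLock = new Object();  // stands in for monitor 0x0007b6aa1788
        final Object timerLock = new Object(); // stands in for monitor 0x0007b6aa1940

        new Thread(() -> {
            synchronized (taskLock) {          // step 1: hold taskLock
                sleepBriefly();
                synchronized (timerLock) { }   // step 2: wait for timerLock
            }
        }, "operator-thread").start();

        new Thread(() -> {
            synchronized (timerLock) {         // step 1: hold timerLock
                sleepBriefly();
                synchronized (taskLock) { }    // step 2: wait for taskLock
            }
        }, "time-trigger-thread").start();
        // Each thread finishes step 1 before the other releases, so both
        // block forever: the cycle visible in the attached jstack dumps.
    }

    private static void sleepBriefly() {
        try { Thread.sleep(100); } catch (InterruptedException ignored) { }
    }
}
```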
[jira] [Commented] (FLINK-8547) Implement CheckpointBarrierHandler not to spill data for exactly-once based on credit-based flow control
[ https://issues.apache.org/jira/browse/FLINK-8547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16352505#comment-16352505 ] ASF GitHub Bot commented on FLINK-8547: --- Github user zhijiangW commented on a diff in the pull request: https://github.com/apache/flink/pull/5400#discussion_r165997853 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CreditBasedBarrierBuffer.java --- @@ -0,0 +1,529 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.io; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.runtime.checkpoint.CheckpointMetaData; +import org.apache.flink.runtime.checkpoint.CheckpointMetrics; +import org.apache.flink.runtime.checkpoint.decline.AlignmentLimitExceededException; +import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineException; +import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineOnCancellationBarrierException; +import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineSubsumedException; +import org.apache.flink.runtime.checkpoint.decline.InputEndOfStreamException; +import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker; +import org.apache.flink.runtime.io.network.api.CheckpointBarrier; +import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent; +import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent; +import org.apache.flink.runtime.io.network.partition.consumer.InputGate; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.streaming.runtime.io.CreditBasedBufferBlocker.BufferOrEventSequence; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayDeque; + +import static org.apache.flink.util.Preconditions.checkArgument; + +/** + * The barrier buffer is {@link CheckpointBarrierHandler} that blocks inputs with barriers until + * all inputs have received the barrier for a given checkpoint. + * + * The BarrierBuffer continues receiving buffers from the blocked channels and buffered them + * internally until the blocks are released. It will not cause deadlocks based on credit-based + * flow control. + */ +@Internal +public class CreditBasedBarrierBuffer implements CheckpointBarrierHandler { + + private static final Logger LOG = LoggerFactory.getLogger(CreditBasedBarrierBuffer.class); + + /** The gate that the buffer draws its input from. */ + private final InputGate inputGate; + + /** Flags that indicate whether a channel is currently blocked/buffered. */ + private final boolean[] blockedChannels; + + /** The total number of channels that this buffer handles data from. */ + private final int totalNumberOfInputChannels; + + /** The utility to buffer blocked data in the memory queue. */ + private final CreditBasedBufferBlocker bufferBlocker; + + /** +* The pending blocked buffer/event sequences. Must be consumed before requesting further data +* from the input gate. +*/ + private final ArrayDeque queuedBuffered; --- End diff -- The current implementation keeps the same logic with `BarrierBuffer`. I am wondering whether it can make sense if only keeping one `ArrayDeque` for holding all blocking buffers for different checkpoint ids. Especially for the uncommon case mentioned on line 496 in `BarrierBuffer`. I will double check that logic and reply to you later. > Implement CheckpointBarrierHandler not to spill data for exactly-once based > on credit-based flow control > > > Key: FLINK-8547 > URL: https://issues.apache.org/jira/browse/FLINK-8547 > Project: Flink >
[GitHub] flink pull request #5400: [FLINK-8547][network] Implement CheckpointBarrierH...
Github user zhijiangW commented on a diff in the pull request: https://github.com/apache/flink/pull/5400#discussion_r165997853 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CreditBasedBarrierBuffer.java --- @@ -0,0 +1,529 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.io; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.runtime.checkpoint.CheckpointMetaData; +import org.apache.flink.runtime.checkpoint.CheckpointMetrics; +import org.apache.flink.runtime.checkpoint.decline.AlignmentLimitExceededException; +import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineException; +import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineOnCancellationBarrierException; +import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineSubsumedException; +import org.apache.flink.runtime.checkpoint.decline.InputEndOfStreamException; +import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker; +import org.apache.flink.runtime.io.network.api.CheckpointBarrier; +import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent; +import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent; +import org.apache.flink.runtime.io.network.partition.consumer.InputGate; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.streaming.runtime.io.CreditBasedBufferBlocker.BufferOrEventSequence; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayDeque; + +import static org.apache.flink.util.Preconditions.checkArgument; + +/** + * The barrier buffer is {@link CheckpointBarrierHandler} that blocks inputs with barriers until + * all inputs have received the barrier for a given checkpoint. + * + * The BarrierBuffer continues receiving buffers from the blocked channels and buffered them + * internally until the blocks are released. It will not cause deadlocks based on credit-based + * flow control. + */ +@Internal +public class CreditBasedBarrierBuffer implements CheckpointBarrierHandler { + + private static final Logger LOG = LoggerFactory.getLogger(CreditBasedBarrierBuffer.class); + + /** The gate that the buffer draws its input from. */ + private final InputGate inputGate; + + /** Flags that indicate whether a channel is currently blocked/buffered. */ + private final boolean[] blockedChannels; + + /** The total number of channels that this buffer handles data from. */ + private final int totalNumberOfInputChannels; + + /** The utility to buffer blocked data in the memory queue. */ + private final CreditBasedBufferBlocker bufferBlocker; + + /** +* The pending blocked buffer/event sequences. Must be consumed before requesting further data +* from the input gate. +*/ + private final ArrayDeque queuedBuffered; --- End diff -- The current implementation keeps the same logic with `BarrierBuffer`. I am wondering whether it can make sense if only keeping one `ArrayDeque` for holding all blocking buffers for different checkpoint ids. Especially for the uncommon case mentioned on line 496 in `BarrierBuffer`. I will double check that logic and reply to you later. ---
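The idea under review here, reduced to a hedged sketch (class, field, and method names are illustrative, not the PR's exact code): because credit-based flow control gives every channel exclusive buffers, data arriving on an already-aligned channel can be parked in an in-memory queue instead of spilled to disk.

```java
// Illustrative sketch of in-memory blocking during barrier alignment;
// forward(...) is a hypothetical hand-off to the operator.
import java.util.ArrayDeque;
import java.util.Arrays;

import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;

class InMemoryBlockingSketch {
    private final boolean[] blockedChannels;                 // one flag per input channel
    private final ArrayDeque<BufferOrEvent> currentBuffered = new ArrayDeque<>();

    InMemoryBlockingSketch(int numChannels) {
        this.blockedChannels = new boolean[numChannels];
    }

    void processElement(BufferOrEvent boe) {
        if (blockedChannels[boe.getChannelIndex()]) {
            // Channel already delivered its barrier for the pending checkpoint:
            // park the data in memory; exclusive per-channel buffers mean this
            // cannot deadlock, so no disk spilling is required.
            currentBuffered.add(boe);
        } else {
            forward(boe);
        }
    }

    void onAlignmentComplete() {
        Arrays.fill(blockedChannels, false);
        // Drain currentBuffered in order before polling the input gate again.
    }

    private void forward(BufferOrEvent boe) { /* hypothetical */ }
}
```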
[GitHub] flink pull request #5413: [hotfix][table][tests] Set @Ignore description for...
GitHub user zentol opened a pull request: https://github.com/apache/flink/pull/5413 [hotfix][table][tests] Set @Ignore description for RowCsvInputFormatT… Trivial change that moves the reasoning for `@Ignore` from a comment into the annotation itself. You can merge this pull request into a Git repository by running: $ git pull https://github.com/zentol/flink hotfix_ignore Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5413.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5413 commit 9a189ee22769ca34ee71d3145ee48d5cecd5271c Author: zentol Date: 2018-02-05T15:01:48Z [hotfix][table][tests] Set @Ignore description for RowCsvInputFormatTest#testParserCorrectness ---
[jira] [Commented] (FLINK-8471) Add Flip-6 configuration switch
[ https://issues.apache.org/jira/browse/FLINK-8471?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16352489#comment-16352489 ] ASF GitHub Bot commented on FLINK-8471: --- Github user GJL commented on the issue: https://github.com/apache/flink/pull/5334 I have a usability concern. It's possible to run ``` FLINK_MODE=flip6 bin/start-cluster.sh ``` This will start a cluster in flip6 mode. However, ``` FLINK_MODE=flip6 bin/flink list ``` won't work. If the first way is not intended, then it doesn't matter. > Add Flip-6 configuration switch > --- > > Key: FLINK-8471 > URL: https://issues.apache.org/jira/browse/FLINK-8471 > Project: Flink > Issue Type: Improvement > Components: Configuration >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Major > Labels: flip-6 > Fix For: 1.5.0 > > > We should add a configuration switch to activate and de-activate the Flip-6 > code. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #5334: [FLINK-8471] [flip6] Introduce configuration switch for F...
Github user GJL commented on the issue: https://github.com/apache/flink/pull/5334 I have a usability concern. It's possible to run ``` FLINK_MODE=flip6 bin/start-cluster.sh ``` This will start a cluster in flip6 mode. However, ``` FLINK_MODE=flip6 bin/flink list ``` won't work. If the first way is not intended, then it doesn't matter. ---
[jira] [Commented] (FLINK-8534) if insert too much BucketEntry into one bucket in join of iteration will cause a error (Caused : java.io.FileNotFoundException release file error)
[ https://issues.apache.org/jira/browse/FLINK-8534?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16352487#comment-16352487 ] zhu.qing commented on FLINK-8534: - Do I need to provide more information? -- -- Best Regards Qing zhu East China Normal University > if insert too much BucketEntry into one bucket in join of iteration will > cause a error (Caused : java.io.FileNotFoundException release file error) > -- > > Key: FLINK-8534 > URL: https://issues.apache.org/jira/browse/FLINK-8534 > Project: Flink > Issue Type: Bug > Components: Local Runtime > Environment: windows, intellij idea, 8g ram, 4core i5 cpu, Flink > 1.4.0, and parallelism = 2 will cause problem and others will not. >Reporter: zhu.qing >Priority: Major > Attachments: T2AdjSetBfs.java > > > When insert too much entry into bucket (MutableHashTable insertBucketEntry() > line 1054 more than 255) will cause spillPartition() (HashPartition line > 317). So > this.buildSideChannel = ioAccess.createBlockChannelWriter(targetChannel, > bufferReturnQueue); > And in > prepareNextPartition() of ReOpenableMutableHashTable (line 156) > furtherPartitioning = true; > so in > finalizeProbePhase() in HashPartition (line 367) > this.probeSideChannel.close(); > //the file will be delete > this.buildSideChannel.deleteChannel(); > this.probeSideChannel.deleteChannel(); > after deleteChannel the next iteration will fail. > > And I use web-google(SNAP) as dataset. > > Exception in thread "main" > org.apache.flink.runtime.client.JobExecutionException: Job execution failed. > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:897) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:840) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:840) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39) > at > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > Caused by: java.io.IOException: Channel to path > 'C:\Users\sanquan.qz\AppData\Local\Temp\flink-io-5af23edc-1ec0-4718-87a5-916ee022a8be\fc08af25b6f879b8e7bb24291c47ea1d.18.channel' > could not be opened. > at > org.apache.flink.runtime.io.disk.iomanager.AbstractFileIOChannel.<init>(AbstractFileIOChannel.java:61) > at > org.apache.flink.runtime.io.disk.iomanager.AsynchronousFileIOChannel.<init>(AsynchronousFileIOChannel.java:86) > at > org.apache.flink.runtime.io.disk.iomanager.AsynchronousBulkBlockReader.<init>(AsynchronousBulkBlockReader.java:46) > at > org.apache.flink.runtime.io.disk.iomanager.AsynchronousBulkBlockReader.<init>(AsynchronousBulkBlockReader.java:39) > at > org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync.createBulkBlockChannelReader(IOManagerAsync.java:294) > at > org.apache.flink.runtime.operators.hash.MutableHashTable.buildTableFromSpilledPartition(MutableHashTable.java:880) > at > org.apache.flink.runtime.operators.hash.MutableHashTable.prepareNextPartition(MutableHashTable.java:637) > at > org.apache.flink.runtime.operators.hash.ReOpenableMutableHashTable.prepareNextPartition(ReOpenableMutableHashTable.java:170) > at > org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:675) > at > org.apache.flink.runtime.operators.hash.NonReusingBuildFirstHashJoinIterator.callWithNextKey(NonReusingBuildFirstHashJoinIterator.java:117) > at > org.apache.flink.runtime.operators.AbstractCachedBuildSideJoinDriver.run(AbstractCachedBuildSideJoinDriver.java:176) > at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:490) > at > org.apache.flink.runtime.iterative.task.AbstractIterativeTask.run(AbstractIterativeTask.java:145) > at > org.apache.flink.runtime.iterative.task.IterationIntermediateTask.run(IterationIntermediateTask.java:93) > at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718) > at java.lang.Thread.run(Thread
[jira] [Commented] (FLINK-7758) Fix bug Kafka09Fetcher add offset metrics
[ https://issues.apache.org/jira/browse/FLINK-7758?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16352461#comment-16352461 ] ASF GitHub Bot commented on FLINK-7758: --- Github user yew1eb closed the pull request at: https://github.com/apache/flink/pull/4769 > Fix bug Kafka09Fetcher add offset metrics > --- > > Key: FLINK-7758 > URL: https://issues.apache.org/jira/browse/FLINK-7758 > Project: Flink > Issue Type: Bug > Components: Kafka Connector, Metrics >Affects Versions: 1.3.2 >Reporter: Hai Zhou UTC+8 >Assignee: Hai Zhou UTC+8 >Priority: Major > > In Kafka09Fetcher, the _KafkaConsumer_ kafkaMetricGroup is added without > checking that the useMetrics variable is true. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-7984) Bump snappy-java to 1.1.4
[ https://issues.apache.org/jira/browse/FLINK-7984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16352458#comment-16352458 ] ASF GitHub Bot commented on FLINK-7984: --- Github user yew1eb closed the pull request at: https://github.com/apache/flink/pull/5072 > Bump snappy-java to 1.1.4 > - > > Key: FLINK-7984 > URL: https://issues.apache.org/jira/browse/FLINK-7984 > Project: Flink > Issue Type: Improvement > Components: Build System >Affects Versions: 1.4.0 >Reporter: Hai Zhou UTC+8 >Assignee: Hai Zhou UTC+8 >Priority: Major > Fix For: 1.5.0 > > > Upgrade the snappy-java version to 1.1.4 (the latest, May 2017). The older > version has some issues like memory leak > (https://github.com/xerial/snappy-java/issues/91). > Snappy Java [Release > Notes|https://github.com/xerial/snappy-java/blob/master/Milestone.md] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #4769: [FLINK-7758][kafka][hotfix] Fix bug Kafka09Fetcher...
Github user yew1eb closed the pull request at: https://github.com/apache/flink/pull/4769 ---
[GitHub] flink pull request #5072: [FLINK-7984][build] Bump snappy-java to 1.1.4
Github user yew1eb closed the pull request at: https://github.com/apache/flink/pull/5072 ---
[jira] [Commented] (FLINK-8547) Implement CheckpointBarrierHandler not to spill data for exactly-once based on credit-based flow control
[ https://issues.apache.org/jira/browse/FLINK-8547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16352441#comment-16352441 ] ASF GitHub Bot commented on FLINK-8547: --- Github user zhijiangW commented on a diff in the pull request: https://github.com/apache/flink/pull/5400#discussion_r165983714 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CreditBasedBarrierBuffer.java --- @@ -0,0 +1,529 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.io; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.runtime.checkpoint.CheckpointMetaData; +import org.apache.flink.runtime.checkpoint.CheckpointMetrics; +import org.apache.flink.runtime.checkpoint.decline.AlignmentLimitExceededException; +import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineException; +import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineOnCancellationBarrierException; +import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineSubsumedException; +import org.apache.flink.runtime.checkpoint.decline.InputEndOfStreamException; +import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker; +import org.apache.flink.runtime.io.network.api.CheckpointBarrier; +import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent; +import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent; +import org.apache.flink.runtime.io.network.partition.consumer.InputGate; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.streaming.runtime.io.CreditBasedBufferBlocker.BufferOrEventSequence; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayDeque; + +import static org.apache.flink.util.Preconditions.checkArgument; + +/** + * The barrier buffer is {@link CheckpointBarrierHandler} that blocks inputs with barriers until + * all inputs have received the barrier for a given checkpoint. + * + * The BarrierBuffer continues receiving buffers from the blocked channels and buffered them + * internally until the blocks are released. It will not cause deadlocks based on credit-based --- End diff -- sure > Implement CheckpointBarrierHandler not to spill data for exactly-once based > on credit-based flow control > > > Key: FLINK-8547 > URL: https://issues.apache.org/jira/browse/FLINK-8547 > Project: Flink > Issue Type: Sub-task > Components: Network >Affects Versions: 1.5.0 >Reporter: zhijiang >Assignee: zhijiang >Priority: Major > > Currently in exactly-once mode, the {{BarrierBuffer}} would block inputs with > barriers until all inputs have received the barrier for a given checkpoint. > To avoid back-pressuring the input streams which may cause distributed > deadlocks, the {{BarrierBuffer}} has to spill the data in disk files to > recycle the buffers for blocked channels. > > Based on credit-based flow control, every channel has exclusive buffers, so > it is no need to spill data for avoiding deadlock. Then we implement a new > {{CheckpointBarrierHandler}} for only buffering the data for blocked channels > for better performance. > > And this new {{CheckpointBarrierHandler}} can also be configured to use or > not in order to rollback the original mode for unexpected risks. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8547) Implement CheckpointBarrierHandler not to spill data for exactly-once based on credit-based flow control
[ https://issues.apache.org/jira/browse/FLINK-8547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16352438#comment-16352438 ] ASF GitHub Bot commented on FLINK-8547: --- Github user zhijiangW commented on a diff in the pull request: https://github.com/apache/flink/pull/5400#discussion_r165983607 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CreditBasedBarrierBuffer.java --- @@ -0,0 +1,529 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.io; + +import org.apache.flink.annotation.Internal; --- End diff -- the checkstyle failures are fixed > Implement CheckpointBarrierHandler not to spill data for exactly-once based > on credit-based flow control > > > Key: FLINK-8547 > URL: https://issues.apache.org/jira/browse/FLINK-8547 > Project: Flink > Issue Type: Sub-task > Components: Network >Affects Versions: 1.5.0 >Reporter: zhijiang >Assignee: zhijiang >Priority: Major > > Currently in exactly-once mode, the {{BarrierBuffer}} would block inputs with > barriers until all inputs have received the barrier for a given checkpoint. > To avoid back-pressuring the input streams which may cause distributed > deadlocks, the {{BarrierBuffer}} has to spill the data in disk files to > recycle the buffers for blocked channels. > > Based on credit-based flow control, every channel has exclusive buffers, so > it is no need to spill data for avoiding deadlock. Then we implement a new > {{CheckpointBarrierHandler}} for only buffering the data for blocked channels > for better performance. > > And this new {{CheckpointBarrierHandler}} can also be configured to use or > not in order to rollback the original mode for unexpected risks. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5400: [FLINK-8547][network] Implement CheckpointBarrierH...
Github user zhijiangW commented on a diff in the pull request: https://github.com/apache/flink/pull/5400#discussion_r165983607 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CreditBasedBarrierBuffer.java --- @@ -0,0 +1,529 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.io; + +import org.apache.flink.annotation.Internal; --- End diff -- the checkstyle failures are fixed ---
[GitHub] flink pull request #5400: [FLINK-8547][network] Implement CheckpointBarrierH...
Github user zhijiangW commented on a diff in the pull request: https://github.com/apache/flink/pull/5400#discussion_r165983714 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CreditBasedBarrierBuffer.java --- @@ -0,0 +1,529 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.io; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.runtime.checkpoint.CheckpointMetaData; +import org.apache.flink.runtime.checkpoint.CheckpointMetrics; +import org.apache.flink.runtime.checkpoint.decline.AlignmentLimitExceededException; +import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineException; +import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineOnCancellationBarrierException; +import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineSubsumedException; +import org.apache.flink.runtime.checkpoint.decline.InputEndOfStreamException; +import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker; +import org.apache.flink.runtime.io.network.api.CheckpointBarrier; +import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent; +import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent; +import org.apache.flink.runtime.io.network.partition.consumer.InputGate; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.streaming.runtime.io.CreditBasedBufferBlocker.BufferOrEventSequence; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayDeque; + +import static org.apache.flink.util.Preconditions.checkArgument; + +/** + * The barrier buffer is {@link CheckpointBarrierHandler} that blocks inputs with barriers until + * all inputs have received the barrier for a given checkpoint. + * + * The BarrierBuffer continues receiving buffers from the blocked channels and buffered them + * internally until the blocks are released. It will not cause deadlocks based on credit-based --- End diff -- sure ---
[jira] [Commented] (FLINK-8547) Implement CheckpointBarrierHandler not to spill data for exactly-once based on credit-based flow control
[ https://issues.apache.org/jira/browse/FLINK-8547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16352436#comment-16352436 ] ASF GitHub Bot commented on FLINK-8547: --- Github user zhijiangW commented on a diff in the pull request: https://github.com/apache/flink/pull/5400#discussion_r165983496 --- Diff: flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java --- @@ -184,6 +184,18 @@ key("taskmanager.network.detailed-metrics") .defaultValue(false); + /** +* Config parameter defining whether to spill data for channels with barrier or not in exactly-once +* mode based on credit-based flow control. +* +* @deprecated Will be removed for Flink 1.6 when the old code will be dropped in favour of +* credit-based flow control. +*/ + @Deprecated + public static final ConfigOption EXACTLY_ONCE_BLOCKING_DATA_ENABLED = + key("taskmanager.exactly-once.blocking.data.enabled") + .defaultValue(false); --- End diff -- yes, the default value should be true, but I think it should be changed after the `FLINK-7456` is merged to make the credit-based work. > Implement CheckpointBarrierHandler not to spill data for exactly-once based > on credit-based flow control > > > Key: FLINK-8547 > URL: https://issues.apache.org/jira/browse/FLINK-8547 > Project: Flink > Issue Type: Sub-task > Components: Network >Affects Versions: 1.5.0 >Reporter: zhijiang >Assignee: zhijiang >Priority: Major > > Currently in exactly-once mode, the {{BarrierBuffer}} would block inputs with > barriers until all inputs have received the barrier for a given checkpoint. > To avoid back-pressuring the input streams which may cause distributed > deadlocks, the {{BarrierBuffer}} has to spill the data in disk files to > recycle the buffers for blocked channels. > > Based on credit-based flow control, every channel has exclusive buffers, so > it is no need to spill data for avoiding deadlock. Then we implement a new > {{CheckpointBarrierHandler}} for only buffering the data for blocked channels > for better performance. > > And this new {{CheckpointBarrierHandler}} can also be configured to use or > not in order to rollback the original mode for unexpected risks. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5400: [FLINK-8547][network] Implement CheckpointBarrierH...
Github user zhijiangW commented on a diff in the pull request: https://github.com/apache/flink/pull/5400#discussion_r165983496 --- Diff: flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java --- @@ -184,6 +184,18 @@ key("taskmanager.network.detailed-metrics") .defaultValue(false); + /** +* Config parameter defining whether to spill data for channels with barrier or not in exactly-once +* mode based on credit-based flow control. +* +* @deprecated Will be removed for Flink 1.6 when the old code will be dropped in favour of +* credit-based flow control. +*/ + @Deprecated + public static final ConfigOption EXACTLY_ONCE_BLOCKING_DATA_ENABLED = + key("taskmanager.exactly-once.blocking.data.enabled") + .defaultValue(false); --- End diff -- yes, the default value should be true, but I think it should be changed after the `FLINK-7456` is merged to make the credit-based work. ---
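For completeness, the switch defined in this diff would be set in flink-conf.yaml as below; per the exchange above, the default is expected to flip to true once FLINK-7456 (credit-based flow control) is merged:

```
# Deprecated switch from the diff above; slated for removal in Flink 1.6
# once credit-based flow control becomes the only mode.
taskmanager.exactly-once.blocking.data.enabled: true
```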
[jira] [Commented] (FLINK-7608) LatencyGauge change to histogram metric
[ https://issues.apache.org/jira/browse/FLINK-7608?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16352418#comment-16352418 ] ASF GitHub Bot commented on FLINK-7608: --- Github user zentol commented on the issue: https://github.com/apache/flink/pull/5161 merging. > LatencyGauge change to histogram metric > > > Key: FLINK-7608 > URL: https://issues.apache.org/jira/browse/FLINK-7608 > Project: Flink > Issue Type: Bug > Components: Metrics >Reporter: Hai Zhou UTC+8 >Assignee: Hai Zhou UTC+8 >Priority: Major > Fix For: 1.5.0 > > > I used slf4jReporter[https://issues.apache.org/jira/browse/FLINK-4831] to > export metrics to the log file. > I found: > {noformat} > -- Gauges > - > .. > zhouhai-mbp.taskmanager.f3fd3a269c8c3da4e8319c8f6a201a57.Flink Streaming > Job.Map.0.latency: > value={LatencySourceDescriptor{vertexID=1, subtaskIndex=-1}={p99=116.0, > p50=59.5, min=11.0, max=116.0, p95=116.0, mean=61.836}} > zhouhai-mbp.taskmanager.f3fd3a269c8c3da4e8319c8f6a201a57.Flink Streaming > Job.Sink- Unnamed.0.latency: > value={LatencySourceDescriptor{vertexID=1, subtaskIndex=0}={p99=195.0, > p50=163.5, min=115.0, max=195.0, p95=195.0, mean=161.0}} > .. > {noformat} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
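Replacing the composite gauge with a proper histogram means the reporter output above becomes standard min/max/mean/percentile fields. A hedged sketch of registering such a metric from user code (variable names are illustrative; the wrapper requires the flink-metrics-dropwizard dependency):

```java
import org.apache.flink.dropwizard.metrics.DropwizardHistogramWrapper;
import org.apache.flink.metrics.Histogram;

import com.codahale.metrics.SlidingWindowReservoir;

// Inside a RichFunction's open(): register a histogram that reporters,
// e.g. the Slf4jReporter mentioned above, can export.
Histogram latencyHistogram = getRuntimeContext()
    .getMetricGroup()
    .histogram("latency", new DropwizardHistogramWrapper(
        new com.codahale.metrics.Histogram(new SlidingWindowReservoir(500))));

// Per record:
latencyHistogram.update(observedLatencyMillis);
```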
[jira] [Commented] (FLINK-8559) Exceptions in RocksDBIncrementalSnapshotOperation#takeSnapshot cause job to get stuck
[ https://issues.apache.org/jira/browse/FLINK-8559?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16352419#comment-16352419 ] ASF GitHub Bot commented on FLINK-8559: --- Github user zentol commented on the issue: https://github.com/apache/flink/pull/5412 merging. > Exceptions in RocksDBIncrementalSnapshotOperation#takeSnapshot cause job to > get stuck > - > > Key: FLINK-8559 > URL: https://issues.apache.org/jira/browse/FLINK-8559 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing, Tests >Affects Versions: 1.4.0, 1.5.0 >Reporter: Chesnay Schepler >Priority: Blocker > > In the {{RocksDBKeyedStatebackend#snapshotIncrementally}} we can find this > code > > {code:java} > final RocksDBIncrementalSnapshotOperation snapshotOperation = > new RocksDBIncrementalSnapshotOperation<>( > this, > checkpointStreamFactory, > checkpointId, > checkpointTimestamp); > snapshotOperation.takeSnapshot(); > return new FutureTask( > new Callable() { > @Override > public KeyedStateHandle call() throws Exception { > return snapshotOperation.materializeSnapshot(); > } > } > ) { > @Override > public boolean cancel(boolean mayInterruptIfRunning) { > snapshotOperation.stop(); > return super.cancel(mayInterruptIfRunning); > } > @Override > protected void done() { > snapshotOperation.releaseResources(isCancelled()); > } > }; > {code} > In the constructor of RocksDBIncrementalSnapshotOperation we call > {{aquireResource()}} on the RocksDB {{ResourceGuard}}. If > {{snapshotOperation.takeSnapshot()}} fails with an exception these resources > are never released. When the task is shutdown due to the exception it will > get stuck on releasing RocksDB. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #5161: [FLINK-7608][metric] Refactor latency statistics metric
Github user zentol commented on the issue: https://github.com/apache/flink/pull/5161 merging. ---
[GitHub] flink issue #5412: [FLINK-8559][RocksDB] Release resources if snapshot opera...
Github user zentol commented on the issue: https://github.com/apache/flink/pull/5412 merging. ---
[jira] [Commented] (FLINK-7984) Bump snappy-java to 1.1.4
[ https://issues.apache.org/jira/browse/FLINK-7984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16352409#comment-16352409 ] ASF GitHub Bot commented on FLINK-7984: --- Github user zentol commented on the issue: https://github.com/apache/flink/pull/5072 @yew1eb Could you close the PR? The issue was addressed in f1e4d25c11a678688064492d50ffad38c39ea877. > Bump snappy-java to 1.1.4 > - > > Key: FLINK-7984 > URL: https://issues.apache.org/jira/browse/FLINK-7984 > Project: Flink > Issue Type: Improvement > Components: Build System >Affects Versions: 1.4.0 >Reporter: Hai Zhou UTC+8 >Assignee: Hai Zhou UTC+8 >Priority: Major > Fix For: 1.5.0 > > > Upgrade the snappy-java version to 1.1.4 (the latest, May 2017). The older > version has some issues like memory leak > (https://github.com/xerial/snappy-java/issues/91). > Snappy Java [Release > Notes|https://github.com/xerial/snappy-java/blob/master/Milestone.md] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #5072: [FLINK-7984][build] Bump snappy-java to 1.1.4
Github user zentol commented on the issue: https://github.com/apache/flink/pull/5072 @yew1eb Could you close the PR? The issue was addressed in f1e4d25c11a678688064492d50ffad38c39ea877. ---
[jira] [Closed] (FLINK-7984) Bump snappy-java to 1.1.4
[ https://issues.apache.org/jira/browse/FLINK-7984?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler closed FLINK-7984. --- Resolution: Fixed master: f1e4d25c11a678688064492d50ffad38c39ea877 > Bump snappy-java to 1.1.4 > - > > Key: FLINK-7984 > URL: https://issues.apache.org/jira/browse/FLINK-7984 > Project: Flink > Issue Type: Improvement > Components: Build System >Affects Versions: 1.4.0 >Reporter: Hai Zhou UTC+8 >Assignee: Hai Zhou UTC+8 >Priority: Major > Fix For: 1.5.0 > > > Upgrade the snappy-java version to 1.1.4 (the latest, May 2017). The older > version has some issues like memory leak > (https://github.com/xerial/snappy-java/issues/91). > Snappy Java [Release > Notes|https://github.com/xerial/snappy-java/blob/master/Milestone.md] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8503) Port TaskManagerLogHandler to new REST endpoint
[ https://issues.apache.org/jira/browse/FLINK-8503?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16352385#comment-16352385 ] ASF GitHub Bot commented on FLINK-8503: --- Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5353#discussion_r165972923 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerFileHandler.java --- @@ -0,0 +1,236 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.taskmanager; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.blob.TransientBlobKey; +import org.apache.flink.runtime.blob.TransientBlobService; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; +import org.apache.flink.runtime.rest.AbstractHandler; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.handler.RestHandlerException; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.UntypedResponseMessageHeaders; +import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerIdPathParameter; +import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerMessageParameters; +import org.apache.flink.runtime.taskexecutor.TaskExecutor; +import org.apache.flink.runtime.webmonitor.RestfulGateway; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.Preconditions; + +import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder; +import org.apache.flink.shaded.guava18.com.google.common.cache.CacheLoader; +import org.apache.flink.shaded.guava18.com.google.common.cache.LoadingCache; +import org.apache.flink.shaded.guava18.com.google.common.cache.RemovalNotification; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFutureListener; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext; +import org.apache.flink.shaded.netty4.io.netty.channel.DefaultFileRegion; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultHttpResponse; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpChunkedInput; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRequest; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponse; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.LastHttpContent; +import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler; +import org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedFile; +import org.apache.flink.shaded.netty4.io.netty.util.concurrent.Future; +import org.apache.flink.shaded.netty4.io.netty.util.concurrent.GenericFutureListener; + +import javax.annotation.Nonnull; + +import java.io.File; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.channels.FileChannel; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + +import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION; +import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE; +import static org.a
[GitHub] flink pull request #5353: [FLINK-8503] [flip6] Display TaskExecutor logs and...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5353#discussion_r165972923
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerFileHandler.java ---
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.taskmanager;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.blob.TransientBlobKey;
+import org.apache.flink.runtime.blob.TransientBlobService;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.rest.AbstractHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.UntypedResponseMessageHeaders;
+import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerIdPathParameter;
+import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerMessageParameters;
+import org.apache.flink.runtime.taskexecutor.TaskExecutor;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
+import org.apache.flink.shaded.guava18.com.google.common.cache.CacheLoader;
+import org.apache.flink.shaded.guava18.com.google.common.cache.LoadingCache;
+import org.apache.flink.shaded.guava18.com.google.common.cache.RemovalNotification;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFutureListener;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+import org.apache.flink.shaded.netty4.io.netty.channel.DefaultFileRegion;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultHttpResponse;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpChunkedInput;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRequest;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponse;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.LastHttpContent;
+import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler;
+import org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedFile;
+import org.apache.flink.shaded.netty4.io.netty.util.concurrent.Future;
+import org.apache.flink.shaded.netty4.io.netty.util.concurrent.GenericFutureListener;
+
+import javax.annotation.Nonnull;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.channels.FileChannel;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION;
+import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
+import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus.OK;
+import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
+
+/**
+ * Base class for serving files from the {@link TaskExecut
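The import list above already hints at the handler's design: a Guava LoadingCache caches the TransientBlobKey of the requested file per TaskExecutor, and Netty streams the file back in chunks. A minimal sketch of that caching pattern follows — all class names, the expiry time, and the loader are illustrative assumptions, not the actual Flink implementation:
{code:java}
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.cache.RemovalNotification;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class TaskManagerFileCacheSketch {

	// Caches one pending/completed blob key per TaskManager id, so repeated
	// requests for the same log file do not trigger repeated uploads.
	private final LoadingCache<String, CompletableFuture<String>> fileBlobKeys =
		CacheBuilder.newBuilder()
			.expireAfterWrite(5, TimeUnit.MINUTES) // assumed TTL, not Flink's actual value
			.removalListener(
				(RemovalNotification<String, CompletableFuture<String>> notification) ->
					// On eviction, the transient blob could be deleted here.
					System.out.println("Evicted cached blob key for " + notification.getKey()))
			.build(new CacheLoader<String, CompletableFuture<String>>() {
				@Override
				public CompletableFuture<String> load(String taskManagerId) {
					return requestFileUpload(taskManagerId);
				}
			});

	private CompletableFuture<String> requestFileUpload(String taskManagerId) {
		// Placeholder: the real handler would ask the ResourceManager to have the
		// TaskExecutor upload the file to the TransientBlobService.
		return CompletableFuture.completedFuture("blob-key-for-" + taskManagerId);
	}

	public CompletableFuture<String> getFileBlobKey(String taskManagerId) throws Exception {
		return fileBlobKeys.get(taskManagerId); // loads on first access
	}
}
{code}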
[jira] [Commented] (FLINK-8172) Remove unnecessary synchronisation in RecordSerializer
[ https://issues.apache.org/jira/browse/FLINK-8172?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16352373#comment-16352373 ] Chesnay Schepler commented on FLINK-8172: - [~pnowojski] Was this issue subsumed by FLINK-8178? > Remove unnecessary synchronisation in RecordSerializer > -- > > Key: FLINK-8172 > URL: https://issues.apache.org/jira/browse/FLINK-8172 > Project: Flink > Issue Type: Improvement > Components: Network >Affects Versions: 1.4.0, 1.3.2 >Reporter: Piotr Nowojski >Assignee: Piotr Nowojski >Priority: Major > Fix For: 1.5.0 > > > While writing records, the RecordSerializer is the only owner of the `Buffer` > into which data is written. Yet we synchronise twice per record > while accessing the MemorySegment. Removing this synchronisation speeds up > network throughput in a point-to-point benchmark by a factor of two (from > ~12500 records/ms up to 23000 records/ms). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
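To make the quoted description concrete, the change is of the following shape — a hypothetical before/after sketch, not the actual RecordSerializer code: because the serializer is the sole owner of the buffer while it writes, the per-record locking can be dropped.
{code:java}
import java.nio.ByteBuffer;

// Hypothetical stand-in for a MemorySegment-backed write buffer.
final class SingleOwnerBufferSketch {

	private final ByteBuffer segment = ByteBuffer.allocate(32 * 1024);
	private final Object lock = new Object();

	// Before: two synchronized blocks per record (e.g. one for the length
	// prefix, one for the payload) although no other thread writes here.
	void writeRecordLocked(byte[] payload) {
		synchronized (lock) {
			segment.putInt(payload.length);
		}
		synchronized (lock) {
			segment.put(payload);
		}
	}

	// After: the hot per-record path is lock-free; a memory barrier is only
	// needed when ownership of the full buffer is handed to another thread.
	void writeRecordUnlocked(byte[] payload) {
		segment.putInt(payload.length);
		segment.put(payload);
	}
}
{code}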
[jira] [Updated] (FLINK-8561) SharedBuffer line 573 uses == to compare BufferEntries instead of .equals.
[ https://issues.apache.org/jira/browse/FLINK-8561?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler updated FLINK-8561: Affects Version/s: 1.5.0 > SharedBuffer line 573 uses == to compare BufferEntries instead of .equals. > -- > > Key: FLINK-8561 > URL: https://issues.apache.org/jira/browse/FLINK-8561 > Project: Flink > Issue Type: Bug > Components: CEP >Affects Versions: 1.4.0, 1.5.0 >Reporter: Kostas Kloudas >Assignee: Kostas Kloudas >Priority: Major > Fix For: 1.5.0, 1.4.1 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-8561) SharedBuffer line 573 uses == to compare BufferEntries instead of .equals.
[ https://issues.apache.org/jira/browse/FLINK-8561?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler updated FLINK-8561: Fix Version/s: 1.5.0 > SharedBuffer line 573 uses == to compare BufferEntries instead of .equals. > -- > > Key: FLINK-8561 > URL: https://issues.apache.org/jira/browse/FLINK-8561 > Project: Flink > Issue Type: Bug > Components: CEP >Affects Versions: 1.4.0, 1.5.0 >Reporter: Kostas Kloudas >Assignee: Kostas Kloudas >Priority: Major > Fix For: 1.5.0, 1.4.1 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-8561) SharedBuffer line 573 uses == to compare BufferEntries instead of .equals.
Kostas Kloudas created FLINK-8561: - Summary: SharedBuffer line 573 uses == to compare BufferEntries instead of .equals. Key: FLINK-8561 URL: https://issues.apache.org/jira/browse/FLINK-8561 Project: Flink Issue Type: Bug Components: CEP Affects Versions: 1.4.0 Reporter: Kostas Kloudas Assignee: Kostas Kloudas Fix For: 1.4.1 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
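The bug class behind FLINK-8561 is worth spelling out: `==` on two objects compares references, so two logically identical entries (for instance, one freshly deserialized from a checkpoint) never match. A minimal, self-contained illustration — this is generic Java, not the actual SharedBuffer code:
{code:java}
import java.util.Objects;

// Minimal stand-in for an entry stored in the CEP SharedBuffer.
final class BufferEntry {

	private final String key;
	private final long timestamp;

	BufferEntry(String key, long timestamp) {
		this.key = key;
		this.timestamp = timestamp;
	}

	@Override
	public boolean equals(Object o) {
		if (this == o) {
			return true;
		}
		if (!(o instanceof BufferEntry)) {
			return false;
		}
		BufferEntry other = (BufferEntry) o;
		return timestamp == other.timestamp && Objects.equals(key, other.key);
	}

	@Override
	public int hashCode() {
		return Objects.hash(key, timestamp);
	}

	public static void main(String[] args) {
		BufferEntry a = new BufferEntry("k", 42L);
		BufferEntry b = new BufferEntry("k", 42L);

		System.out.println(a == b);      // false: distinct instances
		System.out.println(a.equals(b)); // true: same logical content
	}
}
{code}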
[jira] [Commented] (FLINK-5544) Implement Internal Timer Service in RocksDB
[ https://issues.apache.org/jira/browse/FLINK-5544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16352365#comment-16352365 ] ASF GitHub Bot commented on FLINK-5544: --- Github user orsher commented on the issue: https://github.com/apache/flink/pull/3359 Hi, Is there any progress here? This feature will make our life much easier! > Implement Internal Timer Service in RocksDB > --- > > Key: FLINK-5544 > URL: https://issues.apache.org/jira/browse/FLINK-5544 > Project: Flink > Issue Type: New Feature > Components: State Backends, Checkpointing >Reporter: Xiaogang Shi >Assignee: Xiaogang Shi >Priority: Major > > Currently, the only implementation of the internal timer service is > HeapInternalTimerService, which stores all timers in memory. In cases > where the number of keys is very large, the timer service consumes too much > memory. An implementation that stores timers in RocksDB seems like a good fit > for these cases. > It might be a little challenging to implement a RocksDB timer service because > the timers are accessed in different ways. When timers are triggered, we need > to access timers in order of timestamp. But when performing checkpoints, we > must have a method to obtain all timers of a given key group. > A good implementation, as suggested by [~StephanEwen], follows the idea of > merge sorting. We can store timers in RocksDB with the format > {{KEY_GROUP#TIMER#KEY}}. In this way, the timers under a key group are grouped > together and sorted. > Then we can deploy an in-memory heap which keeps the first timer of each key > group to get the next timer to trigger. When a key group's first timer is > updated, we can efficiently update the heap. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
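The merge-sort scheme in the description can be illustrated without RocksDB: each key group contributes an already-sorted stream of timers (which a prefix scan over KEY_GROUP#TIMER#KEY would provide), and a heap over the head of each stream produces the globally next timer. A sketch with simplified, hypothetical types:
{code:java}
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

final class TimerMergeSketch {

	static final class Timer {
		final int keyGroup;
		final long timestamp;

		Timer(int keyGroup, long timestamp) {
			this.keyGroup = keyGroup;
			this.timestamp = timestamp;
		}
	}

	static final class Head {
		final Timer timer;
		final Iterator<Timer> rest; // remaining timers of the same key group

		Head(Timer timer, Iterator<Timer> rest) {
			this.timer = timer;
			this.rest = rest;
		}
	}

	/** Emits all timers across key groups in global timestamp order. */
	static List<Timer> mergeInOrder(List<Iterator<Timer>> sortedPerKeyGroup) {
		// Heap over the first (i.e. earliest) timer of every key group.
		PriorityQueue<Head> heap = new PriorityQueue<>(
			(a, b) -> Long.compare(a.timer.timestamp, b.timer.timestamp));
		for (Iterator<Timer> it : sortedPerKeyGroup) {
			if (it.hasNext()) {
				heap.add(new Head(it.next(), it));
			}
		}
		List<Timer> result = new ArrayList<>();
		while (!heap.isEmpty()) {
			Head head = heap.poll();
			result.add(head.timer);
			if (head.rest.hasNext()) {
				// Refill the heap with the next timer of the same key group.
				heap.add(new Head(head.rest.next(), head.rest));
			}
		}
		return result;
	}
}
{code}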
[GitHub] flink issue #3359: [FLINK-5544][streaming] Add InternalTimerService implemen...
Github user orsher commented on the issue: https://github.com/apache/flink/pull/3359 Hi, Is there any progress here? This feature will make our life much easier! ---
[jira] [Commented] (FLINK-8559) Exceptions in RocksDBIncrementalSnapshotOperation#takeSnapshot cause job to get stuck
[ https://issues.apache.org/jira/browse/FLINK-8559?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16352354#comment-16352354 ] ASF GitHub Bot commented on FLINK-8559: --- Github user StefanRRichter commented on the issue: https://github.com/apache/flink/pull/5412 LGTM > Exceptions in RocksDBIncrementalSnapshotOperation#takeSnapshot cause job to > get stuck > - > > Key: FLINK-8559 > URL: https://issues.apache.org/jira/browse/FLINK-8559 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing, Tests >Affects Versions: 1.4.0, 1.5.0 >Reporter: Chesnay Schepler >Priority: Blocker > > In {{RocksDBKeyedStateBackend#snapshotIncrementally}} we can find this > code:
> {code:java}
> final RocksDBIncrementalSnapshotOperation<K> snapshotOperation =
>     new RocksDBIncrementalSnapshotOperation<>(
>         this,
>         checkpointStreamFactory,
>         checkpointId,
>         checkpointTimestamp);
>
> snapshotOperation.takeSnapshot();
>
> return new FutureTask<KeyedStateHandle>(
>     new Callable<KeyedStateHandle>() {
>         @Override
>         public KeyedStateHandle call() throws Exception {
>             return snapshotOperation.materializeSnapshot();
>         }
>     }
> ) {
>     @Override
>     public boolean cancel(boolean mayInterruptIfRunning) {
>         snapshotOperation.stop();
>         return super.cancel(mayInterruptIfRunning);
>     }
>
>     @Override
>     protected void done() {
>         snapshotOperation.releaseResources(isCancelled());
>     }
> };
> {code}
> In the constructor of RocksDBIncrementalSnapshotOperation we call {{acquireResource()}} on the RocksDB {{ResourceGuard}}. If {{snapshotOperation.takeSnapshot()}} fails with an exception these resources are never released. When the task is shut down due to the exception it will get stuck on releasing RocksDB. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #5412: [FLINK-8559][RocksDB] Release resources if snapshot opera...
Github user StefanRRichter commented on the issue: https://github.com/apache/flink/pull/5412 LGTM ---
[jira] [Commented] (FLINK-8559) Exceptions in RocksDBIncrementalSnapshotOperation#takeSnapshot cause job to get stuck
[ https://issues.apache.org/jira/browse/FLINK-8559?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16352344#comment-16352344 ] ASF GitHub Bot commented on FLINK-8559: --- GitHub user zentol opened a pull request: https://github.com/apache/flink/pull/5412 [FLINK-8559][RocksDB] Release resources if snapshot operation fails
## What is the purpose of the change
This PR ensures that RocksDB resources are released if `RocksDBIncrementalSnapshotOperation#takeSnapshot` throws an Exception. We now catch the exception, cancel the SnapshotOperation, and re-throw the original exception.
## Verifying this change
I've verified this manually by running `JobManagerHACheckpointRecoveryITCase` on Windows, where `takeSnapshot` fails due to FLINK-8557. I couldn't come up with a proper test. The method hardly does anything in the first place, and every solution I could think of would depend a lot on implementation details (like mocking `Checkpoint.create()` to throw an exception).
## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
- The S3 file system connector: (no)
## Documentation
- Does this pull request introduce a new feature? (no)
- If yes, how is the feature documented? (not applicable)
@StefanRRichter
You can merge this pull request into a Git repository by running:
    $ git pull https://github.com/zentol/flink 8559
Alternatively you can review and apply these changes as the patch at:
    https://github.com/apache/flink/pull/5412.patch
To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:
    This closes #5412
commit 05f0ff5e353117894af4ba7dc096c3256d80450b
Author: zentol
Date: 2018-02-05T12:15:29Z
    [FLINK-8559][RocksDB] Release resources if snapshot operation fails
> Exceptions in RocksDBIncrementalSnapshotOperation#takeSnapshot cause job to > get stuck > - > > Key: FLINK-8559 > URL: https://issues.apache.org/jira/browse/FLINK-8559 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing, Tests >Affects Versions: 1.4.0, 1.5.0 >Reporter: Chesnay Schepler >Priority: Blocker > > In {{RocksDBKeyedStateBackend#snapshotIncrementally}} we can find this > code:
> {code:java}
> final RocksDBIncrementalSnapshotOperation<K> snapshotOperation =
>     new RocksDBIncrementalSnapshotOperation<>(
>         this,
>         checkpointStreamFactory,
>         checkpointId,
>         checkpointTimestamp);
>
> snapshotOperation.takeSnapshot();
>
> return new FutureTask<KeyedStateHandle>(
>     new Callable<KeyedStateHandle>() {
>         @Override
>         public KeyedStateHandle call() throws Exception {
>             return snapshotOperation.materializeSnapshot();
>         }
>     }
> ) {
>     @Override
>     public boolean cancel(boolean mayInterruptIfRunning) {
>         snapshotOperation.stop();
>         return super.cancel(mayInterruptIfRunning);
>     }
>
>     @Override
>     protected void done() {
>         snapshotOperation.releaseResources(isCancelled());
>     }
> };
> {code}
> In the constructor of RocksDBIncrementalSnapshotOperation we call {{acquireResource()}} on the RocksDB {{ResourceGuard}}. If {{snapshotOperation.takeSnapshot()}} fails with an exception these resources are never released. When the task is shut down due to the exception it will get stuck on releasing RocksDB. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5412: [FLINK-8559][RocksDB] Release resources if snapsho...
GitHub user zentol opened a pull request: https://github.com/apache/flink/pull/5412 [FLINK-8559][RocksDB] Release resources if snapshot operation fails
## What is the purpose of the change
This PR ensures that RocksDB resources are released if `RocksDBIncrementalSnapshotOperation#takeSnapshot` throws an Exception. We now catch the exception, cancel the SnapshotOperation, and re-throw the original exception.
## Verifying this change
I've verified this manually by running `JobManagerHACheckpointRecoveryITCase` on Windows, where `takeSnapshot` fails due to FLINK-8557. I couldn't come up with a proper test. The method hardly does anything in the first place, and every solution I could think of would depend a lot on implementation details (like mocking `Checkpoint.create()` to throw an exception).
## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
- The S3 file system connector: (no)
## Documentation
- Does this pull request introduce a new feature? (no)
- If yes, how is the feature documented? (not applicable)
@StefanRRichter
You can merge this pull request into a Git repository by running:
    $ git pull https://github.com/zentol/flink 8559
Alternatively you can review and apply these changes as the patch at:
    https://github.com/apache/flink/pull/5412.patch
To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:
    This closes #5412
commit 05f0ff5e353117894af4ba7dc096c3256d80450b
Author: zentol
Date: 2018-02-05T12:15:29Z
    [FLINK-8559][RocksDB] Release resources if snapshot operation fails
---
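The shape of the fix in #5412 — guard the synchronous takeSnapshot() call, release the already-acquired resources on failure, and re-throw the original exception — generalizes beyond RocksDB. A minimal sketch with a hypothetical operation interface (not the actual Flink classes):
{code:java}
import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;

final class GuardedSnapshotSketch {

	/** Hypothetical snapshot operation that acquires resources in its constructor. */
	interface SnapshotOperation {
		void takeSnapshot() throws Exception;
		String materializeSnapshot() throws Exception;
		void stop();
		void releaseResources(boolean cancelled);
	}

	static FutureTask<String> createSnapshotTask(final SnapshotOperation op) throws Exception {
		try {
			// Synchronous part: if this throws, the FutureTask below is never
			// created, so nobody would ever release the acquired resources.
			op.takeSnapshot();
		} catch (Exception e) {
			op.stop();
			op.releaseResources(true);
			throw e; // re-throw the original failure after cleanup
		}

		Callable<String> materialize = new Callable<String>() {
			@Override
			public String call() throws Exception {
				return op.materializeSnapshot();
			}
		};
		return new FutureTask<String>(materialize) {
			@Override
			public boolean cancel(boolean mayInterruptIfRunning) {
				op.stop();
				return super.cancel(mayInterruptIfRunning);
			}

			@Override
			protected void done() {
				// Asynchronous part: cleanup when the task completes or is cancelled.
				op.releaseResources(isCancelled());
			}
		};
	}
}
{code}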