[jira] [Commented] (FLINK-4613) Extend ALS to handle implicit feedback datasets
[ https://issues.apache.org/jira/browse/FLINK-4613?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15535242#comment-15535242 ] ASF GitHub Bot commented on FLINK-4613: --- Github user mbalassi commented on the issue: https://github.com/apache/flink/pull/2542 @gaborhermann @thvasilo I would definitely like to see a test on a larger dataset, that is actually what I was asking for when I mentioned "benchmark", maybe I was not clear then. > Extend ALS to handle implicit feedback datasets > --- > > Key: FLINK-4613 > URL: https://issues.apache.org/jira/browse/FLINK-4613 > Project: Flink > Issue Type: New Feature > Components: Machine Learning Library >Reporter: Gábor Hermann >Assignee: Gábor Hermann > > The Alternating Least Squares implementation should be extended to handle > _implicit feedback_ datasets. These datasets do not contain explicit ratings > by users, they are rather built by collecting user behavior (e.g. user > listened to artist X for Y minutes), and they require a slightly different > optimization objective. See details by [Hu et > al|http://dx.doi.org/10.1109/ICDM.2008.22]. > We do not need to modify much in the original ALS algorithm. See [Spark ALS > implementation|https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala], > which could be a basis for this extension. Only the updating factor part is > modified, and most of the changes are in the local parts of the algorithm > (i.e. UDFs). In fact, the only modification that is not local, is > precomputing a matrix product Y^T * Y and broadcasting it to all the nodes, > which we can do with broadcast DataSets. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
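For context on the broadcast step mentioned in the description, here is a minimal, hypothetical sketch of how a precomputed Y^T * Y could be shipped to every factor-update task with a broadcast DataSet. The names (itemFactors, userBlocks, RatingBlock, computeYtY, solveImplicit) are illustrative assumptions, not the actual Flink ML ALS code; only the broadcast mechanism (withBroadcastSet / getBroadcastVariable) is the real DataSet API.
{code}
// Names below (itemFactors, userBlocks, RatingBlock, computeYtY, solveImplicit) are
// illustrative placeholders, not the actual implementation.
DataSet<double[]> ytY = computeYtY(itemFactors);   // flattened rank x rank matrix Y^T * Y

DataSet<double[]> updatedUserFactors = userBlocks
	.map(new RichMapFunction<RatingBlock, double[]>() {
		private double[] ytYLocal;

		@Override
		public void open(Configuration parameters) {
			// the broadcast DataSet is materialized on every parallel task
			ytYLocal = getRuntimeContext().<double[]>getBroadcastVariable("YtY").get(0);
		}

		@Override
		public double[] map(RatingBlock block) {
			// factor update from Hu et al., using the precomputed Y^T * Y
			return solveImplicit(block, ytYLocal);
		}
	})
	.withBroadcastSet(ytY, "YtY");
{code}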
[GitHub] flink issue #2542: [FLINK-4613] [ml] Extend ALS to handle implicit feedback ...
Github user mbalassi commented on the issue: https://github.com/apache/flink/pull/2542 @gaborhermann @thvasilo I would definitely like to see a test on a larger dataset; that is actually what I was asking for when I mentioned "benchmark". Maybe I was not clear then. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---

[jira] [Commented] (FLINK-4704) Move Table API to org.apache.flink.table
[ https://issues.apache.org/jira/browse/FLINK-4704?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15535169#comment-15535169 ] Jark Wu commented on FLINK-4704: +1 Table API is still in beta, so it's not too late to do this now. > Move Table API to org.apache.flink.table > > > Key: FLINK-4704 > URL: https://issues.apache.org/jira/browse/FLINK-4704 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Timo Walther > > This would be a large change. But maybe now is still a good time to do it. > Otherwise we will never fix this. > Actually, the Table API is in the wrong package. At the moment it is in > {{org.apache.flink.api.table}} and the actual Scala/Java APIs are in > {{org.apache.flink.api.java/scala.table}}. All other APIs such as Python, > Gelly, Flink ML do not use the {{org.apache.flink.api}} namespace. > I suggest the following packages: > {code} > org.apache.flink.table > org.apache.flink.table.api.java > org.apache.flink.table.api.scala > {code} > What do you think? -- This message was sent by Atlassian JIRA (v6.3.4#6332)
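To make the impact of the proposal concrete, a hypothetical before/after of user imports is shown below. The exact class placement after the move is an assumption; only the package prefixes come from the proposal itself.
{code}
// today (Flink 1.1.x)
import org.apache.flink.api.table.Table;
import org.apache.flink.api.java.table.BatchTableEnvironment;

// after the proposed move (hypothetical)
import org.apache.flink.table.Table;
import org.apache.flink.table.api.java.BatchTableEnvironment;
{code}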
[jira] [Commented] (FLINK-4618) FlinkKafkaConsumer09 should start from the next record on startup from offsets in Kafka
[ https://issues.apache.org/jira/browse/FLINK-4618?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15535151#comment-15535151 ] Tzu-Li (Gordon) Tai commented on FLINK-4618: Hi [~melmoth], I'd like to make sure this bug is fixed in the upcoming 1.1.3 bugfix release. Let me know if you'd like to continue working on this :) Otherwise I can pick it up soon.
> FlinkKafkaConsumer09 should start from the next record on startup from offsets in Kafka
> ---
>
> Key: FLINK-4618
> URL: https://issues.apache.org/jira/browse/FLINK-4618
> Project: Flink
> Issue Type: Bug
> Components: Kafka Connector
> Affects Versions: 1.1.2
> Environment: Flink 1.1.2
> Kafka Broker 0.10.0
> Hadoop 2.7.0
> Reporter: Melmoth
> Fix For: 1.2.0, 1.1.3
>
> **Original reported ticket title: Last kafka message gets consumed twice when restarting job**
> There seems to be an issue with the offset management in Flink. When a job is stopped and started again, a message from the previous offset is read again.
> I enabled checkpoints (EXACTLY_ONCE) and FsStateBackend. I started with a new consumer group and emitted one record.
> You can clearly see that the consumer waits for a new record at offset 4848911, which is correct. After restarting, it consumes the record at 4848910, causing that record to be consumed more than once.
> I checked the offset with the Kafka CMD tools; the committed offset in ZooKeeper is 4848910.
> Here is my log output:
> {code}
> 10:29:24,225 DEBUG org.apache.kafka.clients.NetworkClient - Initiating connection to node 2147482646 at hdp1:6667.
> 10:29:24,225 DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Fetching committed offsets for partitions: [myTopic-0]
> 10:29:24,228 DEBUG org.apache.kafka.clients.NetworkClient - Completed connection to node 2147482646
> 10:29:24,234 DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - No committed offset for partition myTopic-0
> 10:29:24,238 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - Resetting offset for partition myTopic-0 to latest offset.
> 10:29:24,244 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - Fetched offset 4848910 for partition myTopic-0
> 10:29:24,245 TRACE org.apache.kafka.clients.consumer.internals.Fetcher - Added fetch request for partition myTopic-0 at offset 4848910
> 10:29:24,773 TRACE org.apache.kafka.clients.consumer.internals.Fetcher - Added fetch request for partition myTopic-0 at offset 4848910
> 10:29:25,276 TRACE org.apache.kafka.clients.consumer.internals.Fetcher - Added fetch request for partition myTopic-0 at offset 4848910
> -- Inserting a new event here
> 10:30:22,447 TRACE org.apache.kafka.clients.consumer.internals.Fetcher - Adding fetched record for partition myTopic-0 with offset 4848910 to buffered record list
> 10:30:22,448 TRACE org.apache.kafka.clients.consumer.internals.Fetcher - Returning fetched records at offset 4848910 for assigned partition myTopic-0 and update position to 4848911
> 10:30:22,451 TRACE org.apache.kafka.clients.consumer.internals.Fetcher - Added fetch request for partition myTopic-0 at offset 4848911
> 10:30:22,953 TRACE org.apache.kafka.clients.consumer.internals.Fetcher - Added fetch request for partition myTopic-0 at offset 4848911
> 10:30:23,456 TRACE org.apache.kafka.clients.consumer.internals.Fetcher - Added fetch request for partition myTopic-0 at offset 4848911
> 10:30:23,887 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 6 @ 1473841823887
> 10:30:23,957 TRACE org.apache.kafka.clients.consumer.internals.Fetcher - Added fetch request for partition myTopic-0 at offset 4848911
> 10:30:23,996 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 6 (in 96 ms)
> 10:30:24,196 TRACE org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Sending offset-commit request with {myTopic-0=OffsetAndMetadata{offset=4848910, metadata=''}} to Node(2147482646, hdp1, 6667)
> 10:30:24,204 DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Committed offset 4848910 for partition myTopic-0
> 10:30:24,460 TRACE org.apache.kafka.clients.consumer.internals.Fetcher - Added fetch request for partition myTopic-0 at offset 4848911
> 10:30:24,963 TRACE org.apache.kafka.clients.consumer.internals.Fetcher - Added fetch request for partition myTopic-0 at offset 4848911
> 10:30:48,057 INFO org.apache.flink.runtime.blob.B
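A minimal sketch of the fix idea for the report above, with assumed names (this is not the actual connector code): Kafka treats a committed offset as the offset of the next record to read, so when Flink commits the offsets stored in a checkpoint (the offsets of the last records it processed), it has to add 1; otherwise the last record is re-read on restart, exactly as the log shows (commit of 4848910 while the next record to read is 4848911).
{code}
// Illustrative only -- "checkpointedOffsets" and this helper class are assumptions, not Flink code.
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class OffsetCommitSketch {

	// checkpointedOffsets holds the offsets of the last records read, as stored in a Flink checkpoint
	static void commitCheckpointedOffsets(KafkaConsumer<?, ?> consumer, Map<TopicPartition, Long> checkpointedOffsets) {
		Map<TopicPartition, OffsetAndMetadata> toCommit = new HashMap<>();
		for (Map.Entry<TopicPartition, Long> entry : checkpointedOffsets.entrySet()) {
			// commit the *next* offset to read, not the offset of the last record read
			toCommit.put(entry.getKey(), new OffsetAndMetadata(entry.getValue() + 1));
		}
		consumer.commitSync(toCommit);
	}
}
{code}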
[jira] [Commented] (FLINK-4702) Kafka consumer must commit offsets asynchronously
[ https://issues.apache.org/jira/browse/FLINK-4702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15535141#comment-15535141 ] ASF GitHub Bot commented on FLINK-4702: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/2574#discussion_r81282102 --- Diff: flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java --- @@ -0,0 +1,300 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka; + +import org.apache.flink.core.testutils.MultiShotLatch; +import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext; +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; +import org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher; +import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; +import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; +import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper; +import org.apache.flink.streaming.util.serialization.SimpleStringSchema; + +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.clients.consumer.OffsetCommitCallback; +import org.apache.kafka.common.TopicPartition; + +import org.junit.Test; +import org.junit.runner.RunWith; + +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Properties; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicReference; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; + +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.anyLong; +import static org.powermock.api.mockito.PowerMockito.doAnswer; +import static org.powermock.api.mockito.PowerMockito.mock; +import static org.powermock.api.mockito.PowerMockito.when; +import static org.powermock.api.mockito.PowerMockito.whenNew; + +/** + * Unit tests for the {@link Kafka09Fetcher}. 
+ */ +@RunWith(PowerMockRunner.class) +@PrepareForTest(Kafka09Fetcher.class) +public class Kafka09FetcherTest { + + @Test + public void testCommitDoesNotBlock() throws Exception { + + // test data + final KafkaTopicPartition testPartition = new KafkaTopicPartition("test", 42); + final Map testCommitData = new HashMap<>(); + testCommitData.put(testPartition, 11L); + + // to synchronize when the consumer is in its blocking method + final OneShotLatch sync = new OneShotLatch(); + + // - the mock consumer with blocking poll calls + final MultiShotLatch blockerLatch = new MultiShotLatch(); + + KafkaConsumer mockConsumer = mock(KafkaConsumer.class); + when(mockConsumer.poll(anyLong())).thenAnswer(new Answer>() { + + @Override + public ConsumerRecords answer(InvocationOnMock invocation) throws InterruptedException { + sync.trigger(); + blockerLatch.await(); --- End diff --
[jira] [Commented] (FLINK-4702) Kafka consumer must commit offsets asynchronously
[ https://issues.apache.org/jira/browse/FLINK-4702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15535144#comment-15535144 ] ASF GitHub Bot commented on FLINK-4702: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/2574#discussion_r81275213 --- Diff: flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java --- @@ -283,10 +296,16 @@ public void commitSpecificOffsetsToKafka(Map offsets) } } - if (this.consumer != null) { - synchronized (consumerLock) { - this.consumer.commitSync(offsetsToCommit); - } + if (commitInProgress) { + LOG.warn("Committing offsets to Kafka takes longer than the checkpoint interval. " + + "Some checkpoints may be subsumed before committed. " + + "This does not compromise Flink's checkpoint integrity."); + } --- End diff -- Will it make sense to simply move this warning to the `if (toCommit != null && !commitInProgress)` block in the main thread? That's where `commitInProgress` will actually determine whether or not the current offsets to commit will be dropped. Also, the actual committing should happen right after anyways because of `consumer.wakeup()`, so I don't see the purpose of an eager warning here. > Kafka consumer must commit offsets asynchronously > - > > Key: FLINK-4702 > URL: https://issues.apache.org/jira/browse/FLINK-4702 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Affects Versions: 1.1.2 >Reporter: Stephan Ewen >Assignee: Stephan Ewen >Priority: Blocker > Fix For: 1.2.0, 1.1.3 > > > The offset commit calls to Kafka may occasionally take very long. > In that case, the {{notifyCheckpointComplete()}} method blocks for long and > the KafkaConsumer cannot make progress and cannot perform checkpoints. > Kafka 0.9+ have methods to commit asynchronously. > We should use those and make sure no more than one commit is concurrently in > progress, to that commit requests do not pile up. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
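A rough sketch of the non-blocking commit path under discussion, with assumed field and method names (this is not the actual Kafka09Fetcher code): the checkpoint thread only publishes the offsets and wakes the consumer thread, which performs an asynchronous commit and allows at most one commit in flight.
{code}
// assumed fields on the fetcher (illustrative):
// private final KafkaConsumer<byte[], byte[]> consumer;
// private final AtomicReference<Map<TopicPartition, OffsetAndMetadata>> nextOffsetsToCommit = new AtomicReference<>();
// private volatile boolean commitInProgress;

// checkpoint thread
void commitSpecificOffsetsToKafka(Map<TopicPartition, OffsetAndMetadata> offsets) {
	nextOffsetsToCommit.set(offsets);  // a newer checkpoint simply replaces an uncommitted older one
	consumer.wakeup();                 // break the consumer thread out of its blocking poll()
}

// consumer thread, inside the main fetch loop
Map<TopicPartition, OffsetAndMetadata> toCommit = nextOffsetsToCommit.getAndSet(null);
if (toCommit != null && !commitInProgress) {
	commitInProgress = true;
	consumer.commitAsync(toCommit, new OffsetCommitCallback() {
		@Override
		public void onComplete(Map<TopicPartition, OffsetAndMetadata> committed, Exception exception) {
			commitInProgress = false;  // allow the next commit; a failure would be logged here
		}
	});
} else if (toCommit != null) {
	LOG.warn("A commit is still in progress; skipping these offsets (subsumed by a later checkpoint).");
}
{code}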
[jira] [Commented] (FLINK-4702) Kafka consumer must commit offsets asynchronously
[ https://issues.apache.org/jira/browse/FLINK-4702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15535142#comment-15535142 ] ASF GitHub Bot commented on FLINK-4702: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/2574#discussion_r81277067 --- Diff: flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java --- @@ -0,0 +1,300 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka; + +import org.apache.flink.core.testutils.MultiShotLatch; +import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext; +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; +import org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher; +import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; +import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; +import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper; +import org.apache.flink.streaming.util.serialization.SimpleStringSchema; + +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.clients.consumer.OffsetCommitCallback; +import org.apache.kafka.common.TopicPartition; + +import org.junit.Test; +import org.junit.runner.RunWith; + +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Properties; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicReference; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; + +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.anyLong; +import static org.powermock.api.mockito.PowerMockito.doAnswer; +import static org.powermock.api.mockito.PowerMockito.mock; +import static org.powermock.api.mockito.PowerMockito.when; +import static org.powermock.api.mockito.PowerMockito.whenNew; + +/** + * Unit tests for the {@link Kafka09Fetcher}. 
+ */ +@RunWith(PowerMockRunner.class) +@PrepareForTest(Kafka09Fetcher.class) +public class Kafka09FetcherTest { + + @Test + public void testCommitDoesNotBlock() throws Exception { + + // test data + final KafkaTopicPartition testPartition = new KafkaTopicPartition("test", 42); + final Map testCommitData = new HashMap<>(); + testCommitData.put(testPartition, 11L); + + // to synchronize when the consumer is in its blocking method + final OneShotLatch sync = new OneShotLatch(); + + // - the mock consumer with blocking poll calls + final MultiShotLatch blockerLatch = new MultiShotLatch(); + + KafkaConsumer mockConsumer = mock(KafkaConsumer.class); + when(mockConsumer.poll(anyLong())).thenAnswer(new Answer>() { + + @Override + public ConsumerRecords answer(InvocationOnMock invocation) throws InterruptedException { + sync.trigger(); + blockerLatch.await(); +
[jira] [Commented] (FLINK-4702) Kafka consumer must commit offsets asynchronously
[ https://issues.apache.org/jira/browse/FLINK-4702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15535143#comment-15535143 ] ASF GitHub Bot commented on FLINK-4702: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/2574#discussion_r81277361 --- Diff: flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java --- @@ -0,0 +1,300 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka; + +import org.apache.flink.core.testutils.MultiShotLatch; +import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext; +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; +import org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher; +import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; +import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; +import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper; +import org.apache.flink.streaming.util.serialization.SimpleStringSchema; + +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.clients.consumer.OffsetCommitCallback; +import org.apache.kafka.common.TopicPartition; + +import org.junit.Test; +import org.junit.runner.RunWith; + +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Properties; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicReference; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; + +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.anyLong; +import static org.powermock.api.mockito.PowerMockito.doAnswer; +import static org.powermock.api.mockito.PowerMockito.mock; +import static org.powermock.api.mockito.PowerMockito.when; +import static org.powermock.api.mockito.PowerMockito.whenNew; + +/** + * Unit tests for the {@link Kafka09Fetcher}. 
+ */ +@RunWith(PowerMockRunner.class) +@PrepareForTest(Kafka09Fetcher.class) +public class Kafka09FetcherTest { --- End diff -- Should we also add a test to make sure that `KafkaConsumer` is immediately called `wakeup()` in `commitSpecificOffsetsToKafka`? Otherwise we are not ensuring the behaviour of "committing offsets back to Kafka on checkpoints" Perhaps this can be integrated into `testCommitDoesNotBlock()`. > Kafka consumer must commit offsets asynchronously > - > > Key: FLINK-4702 > URL: https://issues.apache.org/jira/browse/FLINK-4702 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Affects Versions: 1.1.2 >Reporter: Stephan Ewen >Assignee: Stephan Ewen >Priority: Blocker > Fix For: 1.2.0, 1.1.3 > > > The offset commit calls to Kafka may occasionally take very long. > In that case, the {{notifyCheckpointComplete()}} method blocks for long and > the KafkaConsumer cannot make progress and cannot perform checkpoints. > Kafka 0.9+ have methods to commit asynchronously. > We should u
[GitHub] flink pull request #2574: [FLINK-4702] [kafka connector] Commit offsets to K...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/2574#discussion_r81277067 --- Diff: flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java --- @@ -0,0 +1,300 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka; + +import org.apache.flink.core.testutils.MultiShotLatch; +import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext; +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; +import org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher; +import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; +import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; +import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper; +import org.apache.flink.streaming.util.serialization.SimpleStringSchema; + +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.clients.consumer.OffsetCommitCallback; +import org.apache.kafka.common.TopicPartition; + +import org.junit.Test; +import org.junit.runner.RunWith; + +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Properties; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicReference; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; + +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.anyLong; +import static org.powermock.api.mockito.PowerMockito.doAnswer; +import static org.powermock.api.mockito.PowerMockito.mock; +import static org.powermock.api.mockito.PowerMockito.when; +import static org.powermock.api.mockito.PowerMockito.whenNew; + +/** + * Unit tests for the {@link Kafka09Fetcher}. 
+ */ +@RunWith(PowerMockRunner.class) +@PrepareForTest(Kafka09Fetcher.class) +public class Kafka09FetcherTest { + + @Test + public void testCommitDoesNotBlock() throws Exception { + + // test data + final KafkaTopicPartition testPartition = new KafkaTopicPartition("test", 42); + final Map testCommitData = new HashMap<>(); + testCommitData.put(testPartition, 11L); + + // to synchronize when the consumer is in its blocking method + final OneShotLatch sync = new OneShotLatch(); + + // - the mock consumer with blocking poll calls + final MultiShotLatch blockerLatch = new MultiShotLatch(); + + KafkaConsumer mockConsumer = mock(KafkaConsumer.class); + when(mockConsumer.poll(anyLong())).thenAnswer(new Answer>() { + + @Override + public ConsumerRecords answer(InvocationOnMock invocation) throws InterruptedException { + sync.trigger(); + blockerLatch.await(); + return ConsumerRecords.empty(); + } + }); + + doAnswer(new Answer() { + @Override + public Void answer(InvocationOnMock invocation) { +
[GitHub] flink pull request #2574: [FLINK-4702] [kafka connector] Commit offsets to K...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/2574#discussion_r81282102 --- Diff: flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java --- @@ -0,0 +1,300 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka; + +import org.apache.flink.core.testutils.MultiShotLatch; +import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext; +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; +import org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher; +import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; +import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; +import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper; +import org.apache.flink.streaming.util.serialization.SimpleStringSchema; + +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.clients.consumer.OffsetCommitCallback; +import org.apache.kafka.common.TopicPartition; + +import org.junit.Test; +import org.junit.runner.RunWith; + +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Properties; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicReference; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; + +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.anyLong; +import static org.powermock.api.mockito.PowerMockito.doAnswer; +import static org.powermock.api.mockito.PowerMockito.mock; +import static org.powermock.api.mockito.PowerMockito.when; +import static org.powermock.api.mockito.PowerMockito.whenNew; + +/** + * Unit tests for the {@link Kafka09Fetcher}. 
+ */ +@RunWith(PowerMockRunner.class) +@PrepareForTest(Kafka09Fetcher.class) +public class Kafka09FetcherTest { + + @Test + public void testCommitDoesNotBlock() throws Exception { + + // test data + final KafkaTopicPartition testPartition = new KafkaTopicPartition("test", 42); + final Map testCommitData = new HashMap<>(); + testCommitData.put(testPartition, 11L); + + // to synchronize when the consumer is in its blocking method + final OneShotLatch sync = new OneShotLatch(); + + // - the mock consumer with blocking poll calls + final MultiShotLatch blockerLatch = new MultiShotLatch(); + + KafkaConsumer mockConsumer = mock(KafkaConsumer.class); + when(mockConsumer.poll(anyLong())).thenAnswer(new Answer>() { + + @Override + public ConsumerRecords answer(InvocationOnMock invocation) throws InterruptedException { + sync.trigger(); + blockerLatch.await(); --- End diff -- We are not ensuring that `blockLatch` is returned here, correct? Like my comment above, perhaps we should check that to to ensure that `wakeup` is called in `commitSpecificOffsetsToKafka`. --- If your project is set up for it, you can reply to
[GitHub] flink pull request #2574: [FLINK-4702] [kafka connector] Commit offsets to K...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/2574#discussion_r81275213

--- Diff: flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java ---
@@ -283,10 +296,16 @@ public void commitSpecificOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets)
 		}
 	}
-	if (this.consumer != null) {
-		synchronized (consumerLock) {
-			this.consumer.commitSync(offsetsToCommit);
-		}
+	if (commitInProgress) {
+		LOG.warn("Committing offsets to Kafka takes longer than the checkpoint interval. " +
+			"Some checkpoints may be subsumed before committed. " +
+			"This does not compromise Flink's checkpoint integrity.");
+	}
--- End diff --

Will it make sense to simply move this warning to the `if (toCommit != null && !commitInProgress)` block in the main thread? That's where `commitInProgress` will actually determine whether or not the current offsets to commit will be dropped. Also, the actual committing should happen right after anyways because of `consumer.wakeup()`, so I don't see the purpose of an eager warning here.

--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2574: [FLINK-4702] [kafka connector] Commit offsets to K...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/2574#discussion_r81277361 --- Diff: flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java --- @@ -0,0 +1,300 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka; + +import org.apache.flink.core.testutils.MultiShotLatch; +import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext; +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; +import org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher; +import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; +import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; +import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper; +import org.apache.flink.streaming.util.serialization.SimpleStringSchema; + +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.clients.consumer.OffsetCommitCallback; +import org.apache.kafka.common.TopicPartition; + +import org.junit.Test; +import org.junit.runner.RunWith; + +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Properties; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicReference; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; + +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.anyLong; +import static org.powermock.api.mockito.PowerMockito.doAnswer; +import static org.powermock.api.mockito.PowerMockito.mock; +import static org.powermock.api.mockito.PowerMockito.when; +import static org.powermock.api.mockito.PowerMockito.whenNew; + +/** + * Unit tests for the {@link Kafka09Fetcher}. + */ +@RunWith(PowerMockRunner.class) +@PrepareForTest(Kafka09Fetcher.class) +public class Kafka09FetcherTest { --- End diff -- Should we also add a test to make sure that `KafkaConsumer` is immediately called `wakeup()` in `commitSpecificOffsetsToKafka`? 
Otherwise we are not ensuring the behaviour of "committing offsets back to Kafka on checkpoints" Perhaps this can be integrated into `testCommitDoesNotBlock()`. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
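A possible way to assert that behaviour in the test, using the mocks already set up there (illustrative; the exact assertion to add is an assumption):
{code}
// after commitSpecificOffsetsToKafka(...) has been called from the checkpoint thread:
// the fetcher should have interrupted the blocking poll() so the commit can proceed promptly.
Mockito.verify(mockConsumer, Mockito.atLeastOnce()).wakeup();
{code}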
[jira] [Commented] (FLINK-4068) Move constant computations out of code-generated `flatMap` functions.
[ https://issues.apache.org/jira/browse/FLINK-4068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15534886#comment-15534886 ] ASF GitHub Bot commented on FLINK-4068: --- Github user wuchong commented on the issue: https://github.com/apache/flink/pull/2560 @twalthr I add some Table API tests but skip some tests doesn't supported in Table API, such as `NULLIF`, `IN`, `EXTRACT` > Move constant computations out of code-generated `flatMap` functions. > - > > Key: FLINK-4068 > URL: https://issues.apache.org/jira/browse/FLINK-4068 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Affects Versions: 1.1.0 >Reporter: Fabian Hueske >Assignee: Jark Wu > > The generated functions for expressions of the Table API or SQL include > constant computations. > For instance the code generated for a predicate like: > {code} > myInt < (10 + 20) > {code} > looks roughly like: > {code} > public void flatMap(Row in, Collector out) { > Integer in1 = in.productElement(1); > int temp = 10 + 20; > if (in1 < temp) { > out.collect(in) > } > } > {code} > In this example the computation of {{temp}} is constant and could be moved > out of the {{flatMap()}} method. > The same might apply for generated function other than {{FlatMap}} as well. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
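For illustration, after constant expression reduction the generated function compares against the pre-computed literal instead of re-evaluating the expression per record. This sketch is derived from the example in the issue description, not from the actual generated code:
{code}
public void flatMap(Row in, Collector out) {
	Integer in1 = in.productElement(1);
	// 10 + 20 has been folded to 30 at planning time
	if (in1 < 30) {
		out.collect(in);
	}
}
{code}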
[GitHub] flink issue #2560: [FLINK-4068] [table] Move constant computations out of co...
Github user wuchong commented on the issue: https://github.com/apache/flink/pull/2560 @twalthr I added some Table API tests but skipped the ones that are not supported in the Table API, such as `NULLIF`, `IN`, `EXTRACT`. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-4718) Confusing label in Parallel Streams Diagram
[ https://issues.apache.org/jira/browse/FLINK-4718?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15534817#comment-15534817 ] ASF GitHub Bot commented on FLINK-4718: --- GitHub user nderraugh opened a pull request: https://github.com/apache/flink/pull/2575 'Id' is the key of the stream and not the id of the event. FLINK-4718 Confusing label in Parallel Streams Diagram Minor text change to clarify a mildly confusing label in a diagram. You can merge this pull request into a Git repository by running: $ git pull https://github.com/nderraugh/flink master Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2575.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2575 commit 479c8401f9680b4e0bf88af4c7b68033aeb54cdc Author: Neil Derraugh Date: 2016-09-29T17:53:10Z 'Id' is the key of the stream and not the id of the event. > Confusing label in Parallel Streams Diagram > --- > > Key: FLINK-4718 > URL: https://issues.apache.org/jira/browse/FLINK-4718 > Project: Flink > Issue Type: Bug > Components: Project Website >Affects Versions: 1.1.0 >Reporter: Neil Derraugh >Priority: Trivial > Labels: newbie > Fix For: 1.1.2 > > > The Event [id|timestamp] label in the Parallel Streams Diagram is confusing. > The 'id' is in fact the key of the stream and not the id of the event record. > Hence we have B35 and B33. > The 'id' label should be changed to key or key_id to better represent its > nature. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #2575: 'Id' is the key of the stream and not the id of th...
GitHub user nderraugh opened a pull request: https://github.com/apache/flink/pull/2575 'Id' is the key of the stream and not the id of the event. FLINK-4718 Confusing label in Parallel Streams Diagram Minor text change to clarify a mildly confusing label in a diagram. You can merge this pull request into a Git repository by running: $ git pull https://github.com/nderraugh/flink master Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2575.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2575 commit 479c8401f9680b4e0bf88af4c7b68033aeb54cdc Author: Neil Derraugh Date: 2016-09-29T17:53:10Z 'Id' is the key of the stream and not the id of the event. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-4068) Move constant computations out of code-generated `flatMap` functions.
[ https://issues.apache.org/jira/browse/FLINK-4068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15534809#comment-15534809 ] ASF GitHub Bot commented on FLINK-4068: --- Github user wuchong commented on a diff in the pull request: https://github.com/apache/flink/pull/2560#discussion_r81270921 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkPlannerImpl.scala --- @@ -60,6 +60,7 @@ class FlinkPlannerImpl( var root: RelRoot = _ private def ready() { +planner.setExecutor(config.getExecutor) --- End diff -- Yes you are right ! This is the reason why Table API not work. > Move constant computations out of code-generated `flatMap` functions. > - > > Key: FLINK-4068 > URL: https://issues.apache.org/jira/browse/FLINK-4068 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Affects Versions: 1.1.0 >Reporter: Fabian Hueske >Assignee: Jark Wu > > The generated functions for expressions of the Table API or SQL include > constant computations. > For instance the code generated for a predicate like: > {code} > myInt < (10 + 20) > {code} > looks roughly like: > {code} > public void flatMap(Row in, Collector out) { > Integer in1 = in.productElement(1); > int temp = 10 + 20; > if (in1 < temp) { > out.collect(in) > } > } > {code} > In this example the computation of {{temp}} is constant and could be moved > out of the {{flatMap()}} method. > The same might apply for generated function other than {{FlatMap}} as well. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #2560: [FLINK-4068] [table] Move constant computations ou...
Github user wuchong commented on a diff in the pull request: https://github.com/apache/flink/pull/2560#discussion_r81270921

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkPlannerImpl.scala ---
@@ -60,6 +60,7 @@ class FlinkPlannerImpl(
   var root: RelRoot = _

   private def ready() {
+    planner.setExecutor(config.getExecutor)
--- End diff --

Yes, you are right! This is the reason why the Table API did not work.

--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Comment Edited] (FLINK-4715) TaskManager should commit suicide after cancellation failure
[ https://issues.apache.org/jira/browse/FLINK-4715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15534797#comment-15534797 ] Zhijiang Wang edited comment on FLINK-4715 at 9/30/16 2:31 AM: --- Yes, we have already experienced this problem in real production many times, because the user code cannot be controlled. If the thread is waiting on a synchronized lock, or in similar situations, it cannot be cancelled. Our approach is that if the JobMaster fails to cancel the task several times, it tells the TaskManager to exit. was (Author: zjwang): Yes, we already experienced this problem in real production many times, because the user code can not be controlled. If the thread is waiting for synchronized lock or other cases, it can not be cancelled, and the job master cancel the task failed many times, the job master will let the task manager exit itself. > TaskManager should commit suicide after cancellation failure > > > Key: FLINK-4715 > URL: https://issues.apache.org/jira/browse/FLINK-4715 > Project: Flink > Issue Type: Improvement > Components: TaskManager >Affects Versions: 1.2.0 >Reporter: Till Rohrmann > Fix For: 1.2.0 > > > In case of a failed cancellation, e.g. the task cannot be cancelled after a > given time, the {{TaskManager}} should kill itself. That way we guarantee > that there is no resource leak. > This behaviour acts as a safety-net against faulty user code. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
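A minimal sketch of such a safety net, with assumed names and timeout handling (not the actual TaskManager code): a watchdog waits for the task to terminate after cancellation and kills the process if it never does.
{code}
// assumes (illustrative): java.util.concurrent.CountDownLatch taskTerminationLatch,
// long taskCancellationTimeoutMillis, org.slf4j.Logger LOG
Thread watchdog = new Thread(new Runnable() {
	@Override
	public void run() {
		try {
			// taskTerminationLatch is assumed to be counted down when the task's executing thread exits
			if (!taskTerminationLatch.await(taskCancellationTimeoutMillis, TimeUnit.MILLISECONDS)) {
				LOG.error("Task did not terminate within {} ms after cancellation. Killing the TaskManager process.",
					taskCancellationTimeoutMillis);
				// halt() bypasses shutdown hooks, which might themselves be blocked by the stuck user code
				Runtime.getRuntime().halt(-1);
			}
		} catch (InterruptedException e) {
			// watchdog cancelled because the task terminated normally
		}
	}
}, "task-cancellation-watchdog");
watchdog.setDaemon(true);
watchdog.start();
{code}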
[jira] [Commented] (FLINK-4348) Implement slot allocation protocol with TaskExecutor
[ https://issues.apache.org/jira/browse/FLINK-4348?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15534800#comment-15534800 ] ASF GitHub Bot commented on FLINK-4348: --- Github user beyond1920 commented on the issue: https://github.com/apache/flink/pull/2571 @mxm, What would happen under following cases: taskExecutor releases a registered slot, then taskExecutor reports its latest slotReport to ResourceManager, ResourceManager should remove the old slot allocation from its own view and mark the slot free. So I think we should keep the following code in the old SlotManager `updateSlotStatus`: _else { // slot is reported empty // check whether we also thought this slot is empty if (allocationMap.isAllocated(slotId)) { LOG.info("Slot allocation info mismatch! SlotID:{}, current:{}, reported:null", slotId, allocationMap.getAllocationID(slotId)); // we thought the slot is in use, correct it allocationMap.removeAllocation(slotId); // we have a free slot! handleFreeSlot(slot); } }_ > Implement slot allocation protocol with TaskExecutor > > > Key: FLINK-4348 > URL: https://issues.apache.org/jira/browse/FLINK-4348 > Project: Flink > Issue Type: Sub-task > Components: Cluster Management >Reporter: Kurt Young >Assignee: Maximilian Michels > > When slotManager finds a proper slot in the free pool for a slot request, > slotManager marks the slot as occupied, then tells the taskExecutor to give > the slot to the specified JobMaster. > when a slot request is sent to taskExecutor, it should contain following > parameters: AllocationID, JobID, slotID, resourceManagerLeaderSessionID. > There exists 3 following possibilities of the response from taskExecutor, we > will discuss when each possibility happens and how to handle. > 1. Ack request which means the taskExecutor gives the slot to the specified > jobMaster as expected. > 2. Decline request if the slot is already occupied by other AllocationID. > 3. Timeout which could caused by lost of request message or response message > or slow network transfer. > On the first occasion, ResourceManager need to do nothing. However, under the > second and third occasion, ResourceManager need to notify slotManager, > slotManager will verify and clear all the previous allocate information for > this slot request firstly, then try to find a proper slot for the slot > request again. This may cause some duplicate allocation, e.g. the slot > request to TaskManager is successful but the response is lost somehow, so we > may request a slot in another TaskManager, this causes two slots assigned to > one request, but it can be taken care of by rejecting registration at > JobMaster. > There are still some question need to discuss in a step further. > 1. Who send slotRequest to taskExecutor, SlotManager or ResourceManager? I > think it's better that SlotManager delegates the rpc call to ResourceManager > when SlotManager need to communicate with outside world. ResourceManager > know which taskExecutor to send the request based on ResourceID. Besides this > RPC call which used to request slot to taskExecutor should not be a > RpcMethod, because we hope only SlotManager has permission to call the > method, but the other component, for example JobMaster and TaskExecutor, > cannot call this method directly. > 2. If JobMaster reject the slot offer from a TaskExecutor, the TaskExecutor > should notify the free slot to ResourceManager immediately, or wait for next > heartbeat sync. 
The advantage of first way is the resourceManager’s view > could be updated faster. The advantage of second way is save a RPC method in > ResourceManager. > 3. There are two communication type. First, the slot request could be sent as > an ask operation where the response is returned as a future. Second, > resourceManager send the slot request in fire and forget way, the response > could be returned by an RPC call. I prefer the first one because it is more > simple and could save a RPC method in ResourceManager (for callback in the > second way). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
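A hypothetical sketch of how the ResourceManager side could react to the three possible outcomes described above (ack, decline, timeout), written with plain CompletableFuture and invented names rather than Flink's actual RPC types:
{code}
// All types and methods below (SlotRequestReply, SlotRequestDeclined, taskExecutorGateway,
// slotManager, mainThreadExecutor) are invented for illustration.
CompletableFuture<SlotRequestReply> reply =
	taskExecutorGateway.requestSlot(slotId, jobId, allocationId, leaderSessionId);

reply.whenCompleteAsync((ack, failure) -> {
	if (failure != null || ack instanceof SlotRequestDeclined) {
		// timeout, lost message, or slot already occupied by another AllocationID:
		// clear the tentative allocation and look for another free slot
		slotManager.clearPendingAllocation(allocationId, slotId);
		slotManager.retryAllocation(allocationId);
	}
	// on a successful ack nothing else to do; a duplicate assignment caused by a lost
	// response is resolved by the JobMaster rejecting the second slot offer
}, mainThreadExecutor);
{code}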
[jira] [Created] (FLINK-4718) Confusing label in Parallel Streams Diagram
Neil Derraugh created FLINK-4718: Summary: Confusing label in Parallel Streams Diagram Key: FLINK-4718 URL: https://issues.apache.org/jira/browse/FLINK-4718 Project: Flink Issue Type: Bug Components: Project Website Affects Versions: 1.1.0 Reporter: Neil Derraugh Priority: Trivial Fix For: 1.1.2 The Event [id|timestamp] label in the Parallel Streams Diagram is confusing. The 'id' is in fact the key of the stream and not the id of the event record. Hence we have B35 and B33. The 'id' label should be changed to key or key_id to better represent its nature. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink issue #2571: [FLINK-4348] Simplify logic of SlotManager
Github user beyond1920 commented on the issue: https://github.com/apache/flink/pull/2571

@mxm, what would happen in the following case: the taskExecutor releases a registered slot and then reports its latest slotReport to the ResourceManager. The ResourceManager should remove the old slot allocation from its own view and mark the slot free. So I think we should keep the following code in the old SlotManager `updateSlotStatus`:

    else {
        // slot is reported empty
        // check whether we also thought this slot is empty
        if (allocationMap.isAllocated(slotId)) {
            LOG.info("Slot allocation info mismatch! SlotID:{}, current:{}, reported:null",
                slotId, allocationMap.getAllocationID(slotId));
            // we thought the slot is in use, correct it
            allocationMap.removeAllocation(slotId);
            // we have a free slot!
            handleFreeSlot(slot);
        }
    }

--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-4715) TaskManager should commit suicide after cancellation failure
[ https://issues.apache.org/jira/browse/FLINK-4715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15534797#comment-15534797 ] Zhijiang Wang commented on FLINK-4715: -- Yes, we already experienced this problem in real production many times, because the user code can not be controlled. If the thread is waiting for synchronized lock or other cases, it can not be cancelled, and the job master cancel the task failed many times, the job master will let the task manager exit itself. > TaskManager should commit suicide after cancellation failure > > > Key: FLINK-4715 > URL: https://issues.apache.org/jira/browse/FLINK-4715 > Project: Flink > Issue Type: Improvement > Components: TaskManager >Affects Versions: 1.2.0 >Reporter: Till Rohrmann > Fix For: 1.2.0 > > > In case of a failed cancellation, e.g. the task cannot be cancelled after a > given time, the {{TaskManager}} should kill itself. That way we guarantee > that there is no resource leak. > This behaviour acts as a safety-net against faulty user code. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-2254) Add Bipartite Graph Support for Gelly
[ https://issues.apache.org/jira/browse/FLINK-2254?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15534249#comment-15534249 ] ASF GitHub Bot commented on FLINK-2254: --- Github user mushketyk commented on a diff in the pull request: https://github.com/apache/flink/pull/2564#discussion_r81246840 --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/BipartiteGraph.java --- @@ -0,0 +1,231 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph; + +import org.apache.flink.api.common.functions.FilterFunction; +import org.apache.flink.api.common.functions.GroupReduceFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.tuple.Tuple2; + +/** + * + * Bipartite graph is a graph that contains two sets of vertices: top vertices and bottom vertices. Edges can only exist + * between a pair of vertices from different vertices sets. E.g. there can be no vertices between a pair + * of top vertices. + * + * Bipartite graph is useful to represent graphs with two sets of objects, like researchers and their publications, + * where an edge represents that a particular publication was authored by a particular author. + * + * Bipartite interface is different from {@link Graph} interface, so to apply algorithms that work on a regular graph + * a bipartite graph should be first converted into a {@link Graph} instance. This can be achieved by using + * {@link BipartiteGraph#topProjection(GroupReduceFunction)} or + * {@link BipartiteGraph#bottomProjection(GroupReduceFunction)} methods. + * + * @param the key type of the top vertices + * @param the key type of the bottom vertices + * @param the top vertices value type + * @param the bottom vertices value type + * @param the edge value type + */ +public class BipartiteGraph { + private final ExecutionEnvironment context; + private final DataSet> topVertices; + private final DataSet> bottomVertices; + private final DataSet> edges; + + private BipartiteGraph( + DataSet> topVertices, + DataSet> bottomVertices, + DataSet> edges, + ExecutionEnvironment context) { + this.topVertices = topVertices; + this.bottomVertices = bottomVertices; + this.edges = edges; + this.context = context; + } + + /** +* Create bipartite graph from datasets. 
+* +* @param topVertices dataset of top vertices in the graph +* @param bottomVertices dataset of bottom vertices in the graph +* @param edges dataset of edges between vertices +* @param context Flink execution context +* @param the key type of the top vertices +* @param the key type of the bottom vertices +* @param the top vertices value type +* @param the bottom vertices value type +* @param the edge value type +* @return new bipartite graph created from provided datasets +*/ + public static BipartiteGraph fromDataSet( + DataSet> topVertices, + DataSet> bottomVertices, + DataSet> edges, + ExecutionEnvironment context) { + return new BipartiteGraph<>(topVertices, bottomVertices, edges, context); + } + + /** +* Get dataset with top vertices. +* +* @return dataset with top vertices +*/ + public DataSet> getTopVertices() { + return topVertices; + } + + /** +* Get dataset with bottom vertices. +* +* @return dataset with bottom vertices +*/ + public DataSet> getBottomVertices() {
[GitHub] flink pull request #2564: [FLINK-2254] Add BipartiateGraph class
Github user mushketyk commented on a diff in the pull request: https://github.com/apache/flink/pull/2564#discussion_r81246840 --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/BipartiteGraph.java --- @@ -0,0 +1,231 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph; + +import org.apache.flink.api.common.functions.FilterFunction; +import org.apache.flink.api.common.functions.GroupReduceFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.tuple.Tuple2; + +/** + * + * Bipartite graph is a graph that contains two sets of vertices: top vertices and bottom vertices. Edges can only exist + * between a pair of vertices from different vertices sets. E.g. there can be no vertices between a pair + * of top vertices. + * + * Bipartite graph is useful to represent graphs with two sets of objects, like researchers and their publications, + * where an edge represents that a particular publication was authored by a particular author. + * + * Bipartite interface is different from {@link Graph} interface, so to apply algorithms that work on a regular graph + * a bipartite graph should be first converted into a {@link Graph} instance. This can be achieved by using + * {@link BipartiteGraph#topProjection(GroupReduceFunction)} or + * {@link BipartiteGraph#bottomProjection(GroupReduceFunction)} methods. + * + * @param the key type of the top vertices + * @param the key type of the bottom vertices + * @param the top vertices value type + * @param the bottom vertices value type + * @param the edge value type + */ +public class BipartiteGraph { + private final ExecutionEnvironment context; + private final DataSet> topVertices; + private final DataSet> bottomVertices; + private final DataSet> edges; + + private BipartiteGraph( + DataSet> topVertices, + DataSet> bottomVertices, + DataSet> edges, + ExecutionEnvironment context) { + this.topVertices = topVertices; + this.bottomVertices = bottomVertices; + this.edges = edges; + this.context = context; + } + + /** +* Create bipartite graph from datasets. 
+* +* @param topVertices dataset of top vertices in the graph +* @param bottomVertices dataset of bottom vertices in the graph +* @param edges dataset of edges between vertices +* @param context Flink execution context +* @param the key type of the top vertices +* @param the key type of the bottom vertices +* @param the top vertices value type +* @param the bottom vertices value type +* @param the edge value type +* @return new bipartite graph created from provided datasets +*/ + public static BipartiteGraph fromDataSet( + DataSet> topVertices, + DataSet> bottomVertices, + DataSet> edges, + ExecutionEnvironment context) { + return new BipartiteGraph<>(topVertices, bottomVertices, edges, context); + } + + /** +* Get dataset with top vertices. +* +* @return dataset with top vertices +*/ + public DataSet> getTopVertices() { + return topVertices; + } + + /** +* Get dataset with bottom vertices. +* +* @return dataset with bottom vertices +*/ + public DataSet> getBottomVertices() { + return bottomVertices; + } + + /** +* Get dataset with graph edges. +* +* @return dataset with graph edges +*/ + public DataSet> getEdges() { + return edges; + } +
[jira] [Commented] (FLINK-2254) Add Bipartite Graph Support for Gelly
[ https://issues.apache.org/jira/browse/FLINK-2254?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15534241#comment-15534241 ] ASF GitHub Bot commented on FLINK-2254: --- Github user mushketyk commented on a diff in the pull request: https://github.com/apache/flink/pull/2564#discussion_r81246465 --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Edge.java --- @@ -34,10 +34,10 @@ public Edge(){} - public Edge(K src, K trg, V val) { - this.f0 = src; - this.f1 = trg; - this.f2 = val; + public Edge(K source, K target, V value) { --- End diff -- To make them consistent with naming style in other classes. Do you suggest to revert this? > Add Bipartite Graph Support for Gelly > - > > Key: FLINK-2254 > URL: https://issues.apache.org/jira/browse/FLINK-2254 > Project: Flink > Issue Type: New Feature > Components: Gelly >Affects Versions: 0.10.0 >Reporter: Andra Lungu >Assignee: Ivan Mushketyk > Labels: requires-design-doc > > A bipartite graph is a graph for which the set of vertices can be divided > into two disjoint sets such that each edge having a source vertex in the > first set, will have a target vertex in the second set. We would like to > support efficient operations for this type of graphs along with a set of > metrics(http://jponnela.com/web_documents/twomode.pdf). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #2564: [FLINK-2254] Add BipartiateGraph class
Github user mushketyk commented on a diff in the pull request: https://github.com/apache/flink/pull/2564#discussion_r81246465 --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Edge.java --- @@ -34,10 +34,10 @@ public Edge(){} - public Edge(K src, K trg, V val) { - this.f0 = src; - this.f1 = trg; - this.f2 = val; + public Edge(K source, K target, V value) { --- End diff -- To make them consistent with naming style in other classes. Do you suggest to revert this? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-2254) Add Bipartite Graph Support for Gelly
[ https://issues.apache.org/jira/browse/FLINK-2254?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15534239#comment-15534239 ] ASF GitHub Bot commented on FLINK-2254: --- Github user mushketyk commented on the issue: https://github.com/apache/flink/pull/2564 Hi @greghogan, I like your ideas about providing different API for projections. This should be better than my approach. @vasia What do you think about this? > Add Bipartite Graph Support for Gelly > - > > Key: FLINK-2254 > URL: https://issues.apache.org/jira/browse/FLINK-2254 > Project: Flink > Issue Type: New Feature > Components: Gelly >Affects Versions: 0.10.0 >Reporter: Andra Lungu >Assignee: Ivan Mushketyk > Labels: requires-design-doc > > A bipartite graph is a graph for which the set of vertices can be divided > into two disjoint sets such that each edge having a source vertex in the > first set, will have a target vertex in the second set. We would like to > support efficient operations for this type of graphs along with a set of > metrics(http://jponnela.com/web_documents/twomode.pdf). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink issue #2564: [FLINK-2254] Add BipartiateGraph class
Github user mushketyk commented on the issue: https://github.com/apache/flink/pull/2564 Hi @greghogan, I like your ideas about providing different API for projections. This should be better than my approach. @vasia What do you think about this? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
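For readers following the projection discussion: in a bottom projection two bottom vertices become connected whenever they share at least one top vertex, which a self-join of the edge set on the top vertex id expresses directly. The sketch below uses plain Tuple3 edges with an assumed (topId, bottomId, value) layout and is not the code from this pull request; a top projection is symmetric, joining on the bottom vertex id instead.

{code}
// Sketch of a bottom projection over Tuple3 edges (topId, bottomId, value); layout is assumed.
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;

public class BottomProjectionSketch {

    public static DataSet<Tuple2<Long, Long>> bottomProjection(DataSet<Tuple3<Long, Long, String>> edges) {
        return edges
            .join(edges)
            .where(0)    // top vertex id of the first edge
            .equalTo(0)  // top vertex id of the second edge
            .with(new JoinFunction<Tuple3<Long, Long, String>, Tuple3<Long, Long, String>, Tuple2<Long, Long>>() {
                @Override
                public Tuple2<Long, Long> join(Tuple3<Long, Long, String> first, Tuple3<Long, Long, String> second) {
                    // connect the two bottom vertices that share this top vertex
                    return new Tuple2<Long, Long>(first.f1, second.f1);
                }
            })
            .filter(new FilterFunction<Tuple2<Long, Long>>() {
                @Override
                public boolean filter(Tuple2<Long, Long> edge) {
                    // drop self-loops; duplicate edges are kept for simplicity
                    return !edge.f0.equals(edge.f1);
                }
            });
    }
}
{code}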
[jira] [Commented] (FLINK-2254) Add Bipartite Graph Support for Gelly
[ https://issues.apache.org/jira/browse/FLINK-2254?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15534224#comment-15534224 ] ASF GitHub Bot commented on FLINK-2254: --- Github user mushketyk commented on a diff in the pull request: https://github.com/apache/flink/pull/2564#discussion_r81245767 --- Diff: flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java --- @@ -480,7 +480,8 @@ protected static File asFile(String path) { } } - assertEquals("Wrong number of elements result", expectedStrings.length, resultStrings.length); + assertEquals(String.format("Wrong number of elements result. Expected: %s. Result: %s.", Arrays.toString(expectedStrings), Arrays.toString(resultStrings)), --- End diff -- The issue here is that it compares lengths of objects and therefore JUnit only prints compared numbers (say 2 and 0) and not content of arrays. > Add Bipartite Graph Support for Gelly > - > > Key: FLINK-2254 > URL: https://issues.apache.org/jira/browse/FLINK-2254 > Project: Flink > Issue Type: New Feature > Components: Gelly >Affects Versions: 0.10.0 >Reporter: Andra Lungu >Assignee: Ivan Mushketyk > Labels: requires-design-doc > > A bipartite graph is a graph for which the set of vertices can be divided > into two disjoint sets such that each edge having a source vertex in the > first set, will have a target vertex in the second set. We would like to > support efficient operations for this type of graphs along with a set of > metrics(http://jponnela.com/web_documents/twomode.pdf). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #2564: [FLINK-2254] Add BipartiateGraph class
Github user mushketyk commented on a diff in the pull request: https://github.com/apache/flink/pull/2564#discussion_r81245767 --- Diff: flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java --- @@ -480,7 +480,8 @@ protected static File asFile(String path) { } } - assertEquals("Wrong number of elements result", expectedStrings.length, resultStrings.length); + assertEquals(String.format("Wrong number of elements result. Expected: %s. Result: %s.", Arrays.toString(expectedStrings), Arrays.toString(resultStrings)), --- End diff -- The issue here is that it compares lengths of objects and therefore JUnit only prints compared numbers (say 2 and 0) and not content of arrays. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
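The motivation here is that a bare length comparison fails with a message such as "expected:<2> but was:<0>" and gives no hint which elements are missing. A small self-contained JUnit illustration of the improved message (the test data is invented for the example):

{code}
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;

import java.util.Arrays;
import org.junit.Test;

public class ResultComparisonExample {

    @Test
    public void comparisonMessages() {
        String[] expected = {"1,foo", "2,bar"};
        String[] result = {"1,foo", "2,bar"};

        // On a mismatch this fails with only the two lengths, e.g. "expected:<2> but was:<0>":
        assertEquals("Wrong number of elements result", expected.length, result.length);

        // On a mismatch this prints both arrays, so the missing elements are visible:
        assertEquals(
            String.format("Wrong number of elements result. Expected: %s. Result: %s.",
                Arrays.toString(expected), Arrays.toString(result)),
            expected.length, result.length);

        // Alternative when the element order is deterministic:
        assertArrayEquals("Wrong result", expected, result);
    }
}
{code}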
[jira] [Commented] (FLINK-4329) Fix Streaming File Source Timestamps/Watermarks Handling
[ https://issues.apache.org/jira/browse/FLINK-4329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15534216#comment-15534216 ] ASF GitHub Bot commented on FLINK-4329: --- Github user kl0u commented on the issue: https://github.com/apache/flink/pull/2546 Hi @StephanEwen . If I understand correctly, your suggestion is to make the test something like the following: 1) put the split in the reader 2) read the split 3) when the split finishes update the time in the provider 4) observe the time in the output elements. If this is the case, then the problem is that the reader just puts the split in a queue, and this is picked up by another thread that reads it. In this context, there is no way of knowing when the reading thread has finished reading the split and goes to the next one. So step 3) cannot be synchronized correctly. This is the reason I am just having a thread in the test that tries (without guarantees - the race condition you mentioned) to update the time while the reader is still reading. Any suggestions are welcome. > Fix Streaming File Source Timestamps/Watermarks Handling > > > Key: FLINK-4329 > URL: https://issues.apache.org/jira/browse/FLINK-4329 > Project: Flink > Issue Type: Bug > Components: Streaming Connectors >Affects Versions: 1.1.0 >Reporter: Aljoscha Krettek >Assignee: Kostas Kloudas > Fix For: 1.2.0, 1.1.3 > > > The {{ContinuousFileReaderOperator}} does not correctly deal with watermarks, > i.e. they are just passed through. This means that when the > {{ContinuousFileMonitoringFunction}} closes and emits a {{Long.MAX_VALUE}} > that watermark can "overtake" the records that are to be emitted in the > {{ContinuousFileReaderOperator}}. Together with the new "allowed lateness" > setting in window operator this can lead to elements being dropped as late. > Also, {{ContinuousFileReaderOperator}} does not correctly assign ingestion > timestamps since it is not technically a source but looks like one to the > user. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink issue #2546: [FLINK-4329] Fix Streaming File Source Timestamps/Waterma...
Github user kl0u commented on the issue: https://github.com/apache/flink/pull/2546 Hi @StephanEwen . If I understand correctly, your suggestion is to make the test something like the following: 1) put the split in the reader 2) read the split 3) when the split finishes update the time in the provider 4) observe the time in the output elements. If this is the case, then the problem is that the reader just puts the split in a queue, and this is picked up by another thread that reads it. In this context, there is no way of knowing when the reading thread has finished reading the split and goes to the next one. So step 3) cannot be synchronized correctly. This is the reason I am just having a thread in the test that tries (without guarantees - the race condition you mentioned) to update the time while the reader is still reading. Any suggestions are welcome. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
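One way to make step 3 deterministic without adding hooks to the reader thread is to block the test until the expected number of records for the current split has shown up in the harness output, and only then advance the time provider. A small helper along those lines (purely illustrative; it assumes the test knows how many records each split produces):

{code}
// Illustrative helper, not part of the PR: poll the harness output until it reaches
// the expected size, then let the caller advance the processing time.
import java.util.Queue;

public final class TestSync {

    private TestSync() {}

    public static void awaitOutputSize(Queue<?> output, int expectedSize, long timeoutMillis)
            throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (output.size() < expectedSize) {
            if (System.currentTimeMillis() > deadline) {
                throw new AssertionError(
                    "Timed out waiting for " + expectedSize + " elements, saw " + output.size());
            }
            Thread.sleep(5); // the reader thread keeps emitting in the background
        }
    }
}
{code}

The test thread would then call awaitOutputSize(testHarness.getOutput(), n, timeout) before timeServiceProvider.setCurrentTime(...), so the time advance happens at a well-defined point relative to the emitted records.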
[jira] [Commented] (FLINK-2254) Add Bipartite Graph Support for Gelly
[ https://issues.apache.org/jira/browse/FLINK-2254?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15534014#comment-15534014 ] ASF GitHub Bot commented on FLINK-2254: --- Github user greghogan commented on a diff in the pull request: https://github.com/apache/flink/pull/2564#discussion_r81225166 --- Diff: flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java --- @@ -480,7 +480,8 @@ protected static File asFile(String path) { } } - assertEquals("Wrong number of elements result", expectedStrings.length, resultStrings.length); + assertEquals(String.format("Wrong number of elements result. Expected: %s. Result: %s.", Arrays.toString(expectedStrings), Arrays.toString(resultStrings)), --- End diff -- Doesn't IntelliJ offer to view the different results? > Add Bipartite Graph Support for Gelly > - > > Key: FLINK-2254 > URL: https://issues.apache.org/jira/browse/FLINK-2254 > Project: Flink > Issue Type: New Feature > Components: Gelly >Affects Versions: 0.10.0 >Reporter: Andra Lungu >Assignee: Ivan Mushketyk > Labels: requires-design-doc > > A bipartite graph is a graph for which the set of vertices can be divided > into two disjoint sets such that each edge having a source vertex in the > first set, will have a target vertex in the second set. We would like to > support efficient operations for this type of graphs along with a set of > metrics(http://jponnela.com/web_documents/twomode.pdf). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-2254) Add Bipartite Graph Support for Gelly
[ https://issues.apache.org/jira/browse/FLINK-2254?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15534015#comment-15534015 ] ASF GitHub Bot commented on FLINK-2254: --- Github user greghogan commented on a diff in the pull request: https://github.com/apache/flink/pull/2564#discussion_r81226805 --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/BipartiteGraph.java --- @@ -0,0 +1,231 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph; + +import org.apache.flink.api.common.functions.FilterFunction; +import org.apache.flink.api.common.functions.GroupReduceFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.tuple.Tuple2; + +/** + * + * Bipartite graph is a graph that contains two sets of vertices: top vertices and bottom vertices. Edges can only exist + * between a pair of vertices from different vertices sets. E.g. there can be no vertices between a pair + * of top vertices. + * + * Bipartite graph is useful to represent graphs with two sets of objects, like researchers and their publications, + * where an edge represents that a particular publication was authored by a particular author. + * + * Bipartite interface is different from {@link Graph} interface, so to apply algorithms that work on a regular graph + * a bipartite graph should be first converted into a {@link Graph} instance. This can be achieved by using + * {@link BipartiteGraph#topProjection(GroupReduceFunction)} or + * {@link BipartiteGraph#bottomProjection(GroupReduceFunction)} methods. + * + * @param the key type of the top vertices + * @param the key type of the bottom vertices + * @param the top vertices value type + * @param the bottom vertices value type + * @param the edge value type + */ +public class BipartiteGraph { + private final ExecutionEnvironment context; + private final DataSet> topVertices; + private final DataSet> bottomVertices; + private final DataSet> edges; + + private BipartiteGraph( + DataSet> topVertices, + DataSet> bottomVertices, + DataSet> edges, + ExecutionEnvironment context) { + this.topVertices = topVertices; + this.bottomVertices = bottomVertices; + this.edges = edges; + this.context = context; + } + + /** +* Create bipartite graph from datasets. 
+* +* @param topVertices dataset of top vertices in the graph +* @param bottomVertices dataset of bottom vertices in the graph +* @param edges dataset of edges between vertices +* @param context Flink execution context +* @param the key type of the top vertices +* @param the key type of the bottom vertices +* @param the top vertices value type +* @param the bottom vertices value type +* @param the edge value type +* @return new bipartite graph created from provided datasets +*/ + public static BipartiteGraph fromDataSet( + DataSet> topVertices, + DataSet> bottomVertices, + DataSet> edges, + ExecutionEnvironment context) { + return new BipartiteGraph<>(topVertices, bottomVertices, edges, context); + } + + /** +* Get dataset with top vertices. +* +* @return dataset with top vertices +*/ + public DataSet> getTopVertices() { + return topVertices; + } + + /** +* Get dataset with bottom vertices. +* +* @return dataset with bottom vertices +*/ + public DataSet> getBottomVertices() {
[GitHub] flink pull request #2564: [FLINK-2254] Add BipartiateGraph class
Github user greghogan commented on a diff in the pull request: https://github.com/apache/flink/pull/2564#discussion_r81225166 --- Diff: flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java --- @@ -480,7 +480,8 @@ protected static File asFile(String path) { } } - assertEquals("Wrong number of elements result", expectedStrings.length, resultStrings.length); + assertEquals(String.format("Wrong number of elements result. Expected: %s. Result: %s.", Arrays.toString(expectedStrings), Arrays.toString(resultStrings)), --- End diff -- Doesn't IntelliJ offer to view the different results? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2564: [FLINK-2254] Add BipartiateGraph class
Github user greghogan commented on a diff in the pull request: https://github.com/apache/flink/pull/2564#discussion_r81226805 --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/BipartiteGraph.java --- @@ -0,0 +1,231 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph; + +import org.apache.flink.api.common.functions.FilterFunction; +import org.apache.flink.api.common.functions.GroupReduceFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.tuple.Tuple2; + +/** + * + * Bipartite graph is a graph that contains two sets of vertices: top vertices and bottom vertices. Edges can only exist + * between a pair of vertices from different vertices sets. E.g. there can be no vertices between a pair + * of top vertices. + * + * Bipartite graph is useful to represent graphs with two sets of objects, like researchers and their publications, + * where an edge represents that a particular publication was authored by a particular author. + * + * Bipartite interface is different from {@link Graph} interface, so to apply algorithms that work on a regular graph + * a bipartite graph should be first converted into a {@link Graph} instance. This can be achieved by using + * {@link BipartiteGraph#topProjection(GroupReduceFunction)} or + * {@link BipartiteGraph#bottomProjection(GroupReduceFunction)} methods. + * + * @param the key type of the top vertices + * @param the key type of the bottom vertices + * @param the top vertices value type + * @param the bottom vertices value type + * @param the edge value type + */ +public class BipartiteGraph { + private final ExecutionEnvironment context; + private final DataSet> topVertices; + private final DataSet> bottomVertices; + private final DataSet> edges; + + private BipartiteGraph( + DataSet> topVertices, + DataSet> bottomVertices, + DataSet> edges, + ExecutionEnvironment context) { + this.topVertices = topVertices; + this.bottomVertices = bottomVertices; + this.edges = edges; + this.context = context; + } + + /** +* Create bipartite graph from datasets. 
+* +* @param topVertices dataset of top vertices in the graph +* @param bottomVertices dataset of bottom vertices in the graph +* @param edges dataset of edges between vertices +* @param context Flink execution context +* @param the key type of the top vertices +* @param the key type of the bottom vertices +* @param the top vertices value type +* @param the bottom vertices value type +* @param the edge value type +* @return new bipartite graph created from provided datasets +*/ + public static BipartiteGraph fromDataSet( + DataSet> topVertices, + DataSet> bottomVertices, + DataSet> edges, + ExecutionEnvironment context) { + return new BipartiteGraph<>(topVertices, bottomVertices, edges, context); + } + + /** +* Get dataset with top vertices. +* +* @return dataset with top vertices +*/ + public DataSet> getTopVertices() { + return topVertices; + } + + /** +* Get dataset with bottom vertices. +* +* @return dataset with bottom vertices +*/ + public DataSet> getBottomVertices() { + return bottomVertices; + } + + /** +* Get dataset with graph edges. +* +* @return dataset with graph edges +*/ + public DataSet> getEdges() { + return edges; + } +
[GitHub] flink pull request #2564: [FLINK-2254] Add BipartiateGraph class
Github user vasia commented on a diff in the pull request: https://github.com/apache/flink/pull/2564#discussion_r81217922 --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/BipartiteGraph.java --- @@ -0,0 +1,231 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph; + +import org.apache.flink.api.common.functions.FilterFunction; +import org.apache.flink.api.common.functions.GroupReduceFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.tuple.Tuple2; + +/** + * + * Bipartite graph is a graph that contains two sets of vertices: top vertices and bottom vertices. Edges can only exist --- End diff -- I would rephrase that to "... a graph whose vertices can be divided into two disjoint sets" --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2564: [FLINK-2254] Add BipartiateGraph class
Github user vasia commented on a diff in the pull request: https://github.com/apache/flink/pull/2564#discussion_r81218538 --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Edge.java --- @@ -34,10 +34,10 @@ public Edge(){} - public Edge(K src, K trg, V val) { - this.f0 = src; - this.f1 = trg; - this.f2 = val; + public Edge(K source, K target, V value) { --- End diff -- Why did you change these? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2564: [FLINK-2254] Add BipartiateGraph class
Github user vasia commented on a diff in the pull request: https://github.com/apache/flink/pull/2564#discussion_r81218403 --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/BipartiteGraph.java --- @@ -0,0 +1,231 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph; + +import org.apache.flink.api.common.functions.FilterFunction; +import org.apache.flink.api.common.functions.GroupReduceFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.tuple.Tuple2; + +/** + * + * Bipartite graph is a graph that contains two sets of vertices: top vertices and bottom vertices. Edges can only exist + * between a pair of vertices from different vertices sets. E.g. there can be no vertices between a pair + * of top vertices. + * + * Bipartite graph is useful to represent graphs with two sets of objects, like researchers and their publications, + * where an edge represents that a particular publication was authored by a particular author. + * + * Bipartite interface is different from {@link Graph} interface, so to apply algorithms that work on a regular graph + * a bipartite graph should be first converted into a {@link Graph} instance. This can be achieved by using + * {@link BipartiteGraph#topProjection(GroupReduceFunction)} or + * {@link BipartiteGraph#bottomProjection(GroupReduceFunction)} methods. + * + * @param the key type of the top vertices + * @param the key type of the bottom vertices + * @param the top vertices value type + * @param the bottom vertices value type + * @param the edge value type + */ +public class BipartiteGraph { + private final ExecutionEnvironment context; + private final DataSet> topVertices; + private final DataSet> bottomVertices; + private final DataSet> edges; + + private BipartiteGraph( + DataSet> topVertices, + DataSet> bottomVertices, + DataSet> edges, + ExecutionEnvironment context) { + this.topVertices = topVertices; + this.bottomVertices = bottomVertices; + this.edges = edges; + this.context = context; + } + + /** +* Create bipartite graph from datasets. 
+* +* @param topVertices dataset of top vertices in the graph +* @param bottomVertices dataset of bottom vertices in the graph +* @param edges dataset of edges between vertices +* @param context Flink execution context +* @param the key type of the top vertices +* @param the key type of the bottom vertices +* @param the top vertices value type +* @param the bottom vertices value type +* @param the edge value type +* @return new bipartite graph created from provided datasets +*/ + public static BipartiteGraph fromDataSet( + DataSet> topVertices, + DataSet> bottomVertices, + DataSet> edges, + ExecutionEnvironment context) { + return new BipartiteGraph<>(topVertices, bottomVertices, edges, context); + } + + /** +* Get dataset with top vertices. +* +* @return dataset with top vertices +*/ + public DataSet> getTopVertices() { + return topVertices; + } + + /** +* Get dataset with bottom vertices. +* +* @return dataset with bottom vertices +*/ + public DataSet> getBottomVertices() { + return bottomVertices; + } + + /** +* Get dataset with graph edges. +* +* @return dataset with graph edges +*/ + public DataSet> getEdges() { + return edges; + } + +
[GitHub] flink pull request #2564: [FLINK-2254] Add BipartiateGraph class
Github user vasia commented on a diff in the pull request: https://github.com/apache/flink/pull/2564#discussion_r81218056 --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/BipartiteGraph.java --- @@ -0,0 +1,231 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph; + +import org.apache.flink.api.common.functions.FilterFunction; +import org.apache.flink.api.common.functions.GroupReduceFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.tuple.Tuple2; + +/** + * + * Bipartite graph is a graph that contains two sets of vertices: top vertices and bottom vertices. Edges can only exist + * between a pair of vertices from different vertices sets. E.g. there can be no vertices between a pair + * of top vertices. + * + * Bipartite graph is useful to represent graphs with two sets of objects, like researchers and their publications, --- End diff -- Bipartite graphs are useful... --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2564: [FLINK-2254] Add BipartiateGraph class
Github user vasia commented on a diff in the pull request: https://github.com/apache/flink/pull/2564#discussion_r81218490 --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/BipartiteGraph.java --- @@ -0,0 +1,231 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph; + +import org.apache.flink.api.common.functions.FilterFunction; +import org.apache.flink.api.common.functions.GroupReduceFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.tuple.Tuple2; + +/** + * + * Bipartite graph is a graph that contains two sets of vertices: top vertices and bottom vertices. Edges can only exist + * between a pair of vertices from different vertices sets. E.g. there can be no vertices between a pair + * of top vertices. + * + * Bipartite graph is useful to represent graphs with two sets of objects, like researchers and their publications, + * where an edge represents that a particular publication was authored by a particular author. + * + * Bipartite interface is different from {@link Graph} interface, so to apply algorithms that work on a regular graph + * a bipartite graph should be first converted into a {@link Graph} instance. This can be achieved by using + * {@link BipartiteGraph#topProjection(GroupReduceFunction)} or + * {@link BipartiteGraph#bottomProjection(GroupReduceFunction)} methods. + * + * @param the key type of the top vertices + * @param the key type of the bottom vertices + * @param the top vertices value type + * @param the bottom vertices value type + * @param the edge value type + */ +public class BipartiteGraph { + private final ExecutionEnvironment context; + private final DataSet> topVertices; + private final DataSet> bottomVertices; + private final DataSet> edges; + + private BipartiteGraph( + DataSet> topVertices, + DataSet> bottomVertices, + DataSet> edges, + ExecutionEnvironment context) { + this.topVertices = topVertices; + this.bottomVertices = bottomVertices; + this.edges = edges; + this.context = context; + } + + /** +* Create bipartite graph from datasets. 
+* +* @param topVertices dataset of top vertices in the graph +* @param bottomVertices dataset of bottom vertices in the graph +* @param edges dataset of edges between vertices +* @param context Flink execution context +* @param the key type of the top vertices +* @param the key type of the bottom vertices +* @param the top vertices value type +* @param the bottom vertices value type +* @param the edge value type +* @return new bipartite graph created from provided datasets +*/ + public static BipartiteGraph fromDataSet( + DataSet> topVertices, + DataSet> bottomVertices, + DataSet> edges, + ExecutionEnvironment context) { + return new BipartiteGraph<>(topVertices, bottomVertices, edges, context); + } + + /** +* Get dataset with top vertices. +* +* @return dataset with top vertices +*/ + public DataSet> getTopVertices() { + return topVertices; + } + + /** +* Get dataset with bottom vertices. +* +* @return dataset with bottom vertices +*/ + public DataSet> getBottomVertices() { + return bottomVertices; + } + + /** +* Get dataset with graph edges. +* +* @return dataset with graph edges +*/ + public DataSet> getEdges() { + return edges; + } + +
[jira] [Commented] (FLINK-2254) Add Bipartite Graph Support for Gelly
[ https://issues.apache.org/jira/browse/FLINK-2254?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15533864#comment-15533864 ] ASF GitHub Bot commented on FLINK-2254: --- Github user vasia commented on a diff in the pull request: https://github.com/apache/flink/pull/2564#discussion_r81218403 --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/BipartiteGraph.java --- @@ -0,0 +1,231 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph; + +import org.apache.flink.api.common.functions.FilterFunction; +import org.apache.flink.api.common.functions.GroupReduceFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.tuple.Tuple2; + +/** + * + * Bipartite graph is a graph that contains two sets of vertices: top vertices and bottom vertices. Edges can only exist + * between a pair of vertices from different vertices sets. E.g. there can be no vertices between a pair + * of top vertices. + * + * Bipartite graph is useful to represent graphs with two sets of objects, like researchers and their publications, + * where an edge represents that a particular publication was authored by a particular author. + * + * Bipartite interface is different from {@link Graph} interface, so to apply algorithms that work on a regular graph + * a bipartite graph should be first converted into a {@link Graph} instance. This can be achieved by using + * {@link BipartiteGraph#topProjection(GroupReduceFunction)} or + * {@link BipartiteGraph#bottomProjection(GroupReduceFunction)} methods. + * + * @param the key type of the top vertices + * @param the key type of the bottom vertices + * @param the top vertices value type + * @param the bottom vertices value type + * @param the edge value type + */ +public class BipartiteGraph { + private final ExecutionEnvironment context; + private final DataSet> topVertices; + private final DataSet> bottomVertices; + private final DataSet> edges; + + private BipartiteGraph( + DataSet> topVertices, + DataSet> bottomVertices, + DataSet> edges, + ExecutionEnvironment context) { + this.topVertices = topVertices; + this.bottomVertices = bottomVertices; + this.edges = edges; + this.context = context; + } + + /** +* Create bipartite graph from datasets. 
+* +* @param topVertices dataset of top vertices in the graph +* @param bottomVertices dataset of bottom vertices in the graph +* @param edges dataset of edges between vertices +* @param context Flink execution context +* @param the key type of the top vertices +* @param the key type of the bottom vertices +* @param the top vertices value type +* @param the bottom vertices value type +* @param the edge value type +* @return new bipartite graph created from provided datasets +*/ + public static BipartiteGraph fromDataSet( + DataSet> topVertices, + DataSet> bottomVertices, + DataSet> edges, + ExecutionEnvironment context) { + return new BipartiteGraph<>(topVertices, bottomVertices, edges, context); + } + + /** +* Get dataset with top vertices. +* +* @return dataset with top vertices +*/ + public DataSet> getTopVertices() { + return topVertices; + } + + /** +* Get dataset with bottom vertices. +* +* @return dataset with bottom vertices +*/ + public DataSet> getBottomVertices() { +
[jira] [Commented] (FLINK-2254) Add Bipartite Graph Support for Gelly
[ https://issues.apache.org/jira/browse/FLINK-2254?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15533866#comment-15533866 ] ASF GitHub Bot commented on FLINK-2254: --- Github user vasia commented on a diff in the pull request: https://github.com/apache/flink/pull/2564#discussion_r81217922 --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/BipartiteGraph.java --- @@ -0,0 +1,231 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph; + +import org.apache.flink.api.common.functions.FilterFunction; +import org.apache.flink.api.common.functions.GroupReduceFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.tuple.Tuple2; + +/** + * + * Bipartite graph is a graph that contains two sets of vertices: top vertices and bottom vertices. Edges can only exist --- End diff -- I would rephrase that to "... a graph whose vertices can be divided into two disjoint sets" > Add Bipartite Graph Support for Gelly > - > > Key: FLINK-2254 > URL: https://issues.apache.org/jira/browse/FLINK-2254 > Project: Flink > Issue Type: New Feature > Components: Gelly >Affects Versions: 0.10.0 >Reporter: Andra Lungu >Assignee: Ivan Mushketyk > Labels: requires-design-doc > > A bipartite graph is a graph for which the set of vertices can be divided > into two disjoint sets such that each edge having a source vertex in the > first set, will have a target vertex in the second set. We would like to > support efficient operations for this type of graphs along with a set of > metrics(http://jponnela.com/web_documents/twomode.pdf). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-2254) Add Bipartite Graph Support for Gelly
[ https://issues.apache.org/jira/browse/FLINK-2254?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15533865#comment-15533865 ] ASF GitHub Bot commented on FLINK-2254: --- Github user vasia commented on a diff in the pull request: https://github.com/apache/flink/pull/2564#discussion_r81218538 --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Edge.java --- @@ -34,10 +34,10 @@ public Edge(){} - public Edge(K src, K trg, V val) { - this.f0 = src; - this.f1 = trg; - this.f2 = val; + public Edge(K source, K target, V value) { --- End diff -- Why did you change these? > Add Bipartite Graph Support for Gelly > - > > Key: FLINK-2254 > URL: https://issues.apache.org/jira/browse/FLINK-2254 > Project: Flink > Issue Type: New Feature > Components: Gelly >Affects Versions: 0.10.0 >Reporter: Andra Lungu >Assignee: Ivan Mushketyk > Labels: requires-design-doc > > A bipartite graph is a graph for which the set of vertices can be divided > into two disjoint sets such that each edge having a source vertex in the > first set, will have a target vertex in the second set. We would like to > support efficient operations for this type of graphs along with a set of > metrics(http://jponnela.com/web_documents/twomode.pdf). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-2254) Add Bipartite Graph Support for Gelly
[ https://issues.apache.org/jira/browse/FLINK-2254?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15533863#comment-15533863 ] ASF GitHub Bot commented on FLINK-2254: --- Github user vasia commented on a diff in the pull request: https://github.com/apache/flink/pull/2564#discussion_r81218056 --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/BipartiteGraph.java --- @@ -0,0 +1,231 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph; + +import org.apache.flink.api.common.functions.FilterFunction; +import org.apache.flink.api.common.functions.GroupReduceFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.tuple.Tuple2; + +/** + * + * Bipartite graph is a graph that contains two sets of vertices: top vertices and bottom vertices. Edges can only exist + * between a pair of vertices from different vertices sets. E.g. there can be no vertices between a pair + * of top vertices. + * + * Bipartite graph is useful to represent graphs with two sets of objects, like researchers and their publications, --- End diff -- Bipartite graphs are useful... > Add Bipartite Graph Support for Gelly > - > > Key: FLINK-2254 > URL: https://issues.apache.org/jira/browse/FLINK-2254 > Project: Flink > Issue Type: New Feature > Components: Gelly >Affects Versions: 0.10.0 >Reporter: Andra Lungu >Assignee: Ivan Mushketyk > Labels: requires-design-doc > > A bipartite graph is a graph for which the set of vertices can be divided > into two disjoint sets such that each edge having a source vertex in the > first set, will have a target vertex in the second set. We would like to > support efficient operations for this type of graphs along with a set of > metrics(http://jponnela.com/web_documents/twomode.pdf). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-2254) Add Bipartite Graph Support for Gelly
[ https://issues.apache.org/jira/browse/FLINK-2254?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15533862#comment-15533862 ] ASF GitHub Bot commented on FLINK-2254: --- Github user vasia commented on a diff in the pull request: https://github.com/apache/flink/pull/2564#discussion_r81218490 --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/BipartiteGraph.java --- @@ -0,0 +1,231 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph; + +import org.apache.flink.api.common.functions.FilterFunction; +import org.apache.flink.api.common.functions.GroupReduceFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.tuple.Tuple2; + +/** + * + * Bipartite graph is a graph that contains two sets of vertices: top vertices and bottom vertices. Edges can only exist + * between a pair of vertices from different vertices sets. E.g. there can be no vertices between a pair + * of top vertices. + * + * Bipartite graph is useful to represent graphs with two sets of objects, like researchers and their publications, + * where an edge represents that a particular publication was authored by a particular author. + * + * Bipartite interface is different from {@link Graph} interface, so to apply algorithms that work on a regular graph + * a bipartite graph should be first converted into a {@link Graph} instance. This can be achieved by using + * {@link BipartiteGraph#topProjection(GroupReduceFunction)} or + * {@link BipartiteGraph#bottomProjection(GroupReduceFunction)} methods. + * + * @param the key type of the top vertices + * @param the key type of the bottom vertices + * @param the top vertices value type + * @param the bottom vertices value type + * @param the edge value type + */ +public class BipartiteGraph { + private final ExecutionEnvironment context; + private final DataSet> topVertices; + private final DataSet> bottomVertices; + private final DataSet> edges; + + private BipartiteGraph( + DataSet> topVertices, + DataSet> bottomVertices, + DataSet> edges, + ExecutionEnvironment context) { + this.topVertices = topVertices; + this.bottomVertices = bottomVertices; + this.edges = edges; + this.context = context; + } + + /** +* Create bipartite graph from datasets. 
+* +* @param topVertices dataset of top vertices in the graph +* @param bottomVertices dataset of bottom vertices in the graph +* @param edges dataset of edges between vertices +* @param context Flink execution context +* @param the key type of the top vertices +* @param the key type of the bottom vertices +* @param the top vertices value type +* @param the bottom vertices value type +* @param the edge value type +* @return new bipartite graph created from provided datasets +*/ + public static BipartiteGraph fromDataSet( + DataSet> topVertices, + DataSet> bottomVertices, + DataSet> edges, + ExecutionEnvironment context) { + return new BipartiteGraph<>(topVertices, bottomVertices, edges, context); + } + + /** +* Get dataset with top vertices. +* +* @return dataset with top vertices +*/ + public DataSet> getTopVertices() { + return topVertices; + } + + /** +* Get dataset with bottom vertices. +* +* @return dataset with bottom vertices +*/ + public DataSet> getBottomVertices() { +
[jira] [Commented] (FLINK-4329) Fix Streaming File Source Timestamps/Watermarks Handling
[ https://issues.apache.org/jira/browse/FLINK-4329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15533664#comment-15533664 ] ASF GitHub Bot commented on FLINK-4329: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/2546#discussion_r81204726 --- Diff: flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java --- @@ -440,10 +491,50 @@ public void testFileSplitMonitoringProcessOnce() throws Exception { private int getLineNo(String line) { String[] tkns = line.split("\\s"); - Assert.assertEquals(tkns.length, 6); + Assert.assertEquals(6, tkns.length); return Integer.parseInt(tkns[tkns.length - 1]); } + private class TimeUpdatingThread extends Thread { + + private volatile boolean isRunning; + + private final TestTimeServiceProvider timeServiceProvider; + private final OneInputStreamOperatorTestHarness testHarness; + private final long wmInterval; + private final int elementUntilUpdating; + + TimeUpdatingThread(final TestTimeServiceProvider timeServiceProvider, + final OneInputStreamOperatorTestHarness testHarness, + final long wmInterval, + final int elementUntilUpdating) { + + this.timeServiceProvider = timeServiceProvider; + this.testHarness = testHarness; + this.wmInterval = wmInterval; + this.elementUntilUpdating = elementUntilUpdating; + this.isRunning = true; + } + + @Override + public void run() { + try { + while (isRunning) { + if (testHarness.getOutput().size() % elementUntilUpdating == 0) { --- End diff -- There is a "race" between the operator emitting elements and this thread. Both run in loops without delays. Only if this condition is evaluated by chance at the exact point in time when the list happens to have so many result elements, there will actually be a time advance. > Fix Streaming File Source Timestamps/Watermarks Handling > > > Key: FLINK-4329 > URL: https://issues.apache.org/jira/browse/FLINK-4329 > Project: Flink > Issue Type: Bug > Components: Streaming Connectors >Affects Versions: 1.1.0 >Reporter: Aljoscha Krettek >Assignee: Kostas Kloudas > Fix For: 1.2.0, 1.1.3 > > > The {{ContinuousFileReaderOperator}} does not correctly deal with watermarks, > i.e. they are just passed through. This means that when the > {{ContinuousFileMonitoringFunction}} closes and emits a {{Long.MAX_VALUE}} > that watermark can "overtake" the records that are to be emitted in the > {{ContinuousFileReaderOperator}}. Together with the new "allowed lateness" > setting in window operator this can lead to elements being dropped as late. > Also, {{ContinuousFileReaderOperator}} does not correctly assign ingestion > timestamps since it is not technically a source but looks like one to the > user. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4329) Fix Streaming File Source Timestamps/Watermarks Handling
[ https://issues.apache.org/jira/browse/FLINK-4329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15533663#comment-15533663 ] ASF GitHub Bot commented on FLINK-4329: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/2546#discussion_r81204828 --- Diff: flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java --- @@ -440,10 +491,50 @@ public void testFileSplitMonitoringProcessOnce() throws Exception { private int getLineNo(String line) { String[] tkns = line.split("\\s"); - Assert.assertEquals(tkns.length, 6); + Assert.assertEquals(6, tkns.length); return Integer.parseInt(tkns[tkns.length - 1]); } + private class TimeUpdatingThread extends Thread { + + private volatile boolean isRunning; + + private final TestTimeServiceProvider timeServiceProvider; + private final OneInputStreamOperatorTestHarness testHarness; + private final long wmInterval; + private final int elementUntilUpdating; + + TimeUpdatingThread(final TestTimeServiceProvider timeServiceProvider, + final OneInputStreamOperatorTestHarness testHarness, + final long wmInterval, + final int elementUntilUpdating) { + + this.timeServiceProvider = timeServiceProvider; + this.testHarness = testHarness; + this.wmInterval = wmInterval; + this.elementUntilUpdating = elementUntilUpdating; + this.isRunning = true; + } + + @Override + public void run() { + try { + while (isRunning) { + if (testHarness.getOutput().size() % elementUntilUpdating == 0) { + long now = timeServiceProvider.getCurrentProcessingTime(); + timeServiceProvider.setCurrentTime(now + wmInterval); + } + } + } catch (Exception e) { + e.printStackTrace(); --- End diff -- This will not result in any meaningful feedback to the test. > Fix Streaming File Source Timestamps/Watermarks Handling > > > Key: FLINK-4329 > URL: https://issues.apache.org/jira/browse/FLINK-4329 > Project: Flink > Issue Type: Bug > Components: Streaming Connectors >Affects Versions: 1.1.0 >Reporter: Aljoscha Krettek >Assignee: Kostas Kloudas > Fix For: 1.2.0, 1.1.3 > > > The {{ContinuousFileReaderOperator}} does not correctly deal with watermarks, > i.e. they are just passed through. This means that when the > {{ContinuousFileMonitoringFunction}} closes and emits a {{Long.MAX_VALUE}} > that watermark can "overtake" the records that are to be emitted in the > {{ContinuousFileReaderOperator}}. Together with the new "allowed lateness" > setting in window operator this can lead to elements being dropped as late. > Also, {{ContinuousFileReaderOperator}} does not correctly assign ingestion > timestamps since it is not technically a source but looks like one to the > user. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #2546: [FLINK-4329] Fix Streaming File Source Timestamps/...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/2546#discussion_r81204726 --- Diff: flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java --- @@ -440,10 +491,50 @@ public void testFileSplitMonitoringProcessOnce() throws Exception { private int getLineNo(String line) { String[] tkns = line.split("\\s"); - Assert.assertEquals(tkns.length, 6); + Assert.assertEquals(6, tkns.length); return Integer.parseInt(tkns[tkns.length - 1]); } + private class TimeUpdatingThread extends Thread { + + private volatile boolean isRunning; + + private final TestTimeServiceProvider timeServiceProvider; + private final OneInputStreamOperatorTestHarness testHarness; + private final long wmInterval; + private final int elementUntilUpdating; + + TimeUpdatingThread(final TestTimeServiceProvider timeServiceProvider, + final OneInputStreamOperatorTestHarness testHarness, + final long wmInterval, + final int elementUntilUpdating) { + + this.timeServiceProvider = timeServiceProvider; + this.testHarness = testHarness; + this.wmInterval = wmInterval; + this.elementUntilUpdating = elementUntilUpdating; + this.isRunning = true; + } + + @Override + public void run() { + try { + while (isRunning) { + if (testHarness.getOutput().size() % elementUntilUpdating == 0) { --- End diff -- There is a "race" between the operator emitting elements and this thread. Both run in loops without delays. Only if this condition is evaluated by chance at the exact point in time when the list happens to have so many result elements, there will actually be a time advance. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
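One way to make the time advance deterministic is to drive it from the test thread and block until the output actually reaches the next threshold before advancing time. This is only a sketch of that idea, not the fix adopted in the pull request; it reuses the harness and time-service methods from the quoted diff:

{code}
private static void advanceTimeAfterElements(
		TestTimeServiceProvider timeServiceProvider,
		OneInputStreamOperatorTestHarness<?, ?> testHarness,
		long wmInterval,
		int elementsPerAdvance,
		int expectedTotal) throws Exception {

	int nextThreshold = elementsPerAdvance;
	while (nextThreshold <= expectedTotal) {
		// wait until the operator has actually emitted the next batch,
		// instead of racing against it in a busy loop
		while (testHarness.getOutput().size() < nextThreshold) {
			Thread.sleep(1);
		}
		long now = timeServiceProvider.getCurrentProcessingTime();
		timeServiceProvider.setCurrentTime(now + wmInterval);
		nextThreshold += elementsPerAdvance;
	}
}
{code}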
[jira] [Commented] (FLINK-4329) Fix Streaming File Source Timestamps/Watermarks Handling
[ https://issues.apache.org/jira/browse/FLINK-4329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15533662#comment-15533662 ] ASF GitHub Bot commented on FLINK-4329: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/2546#discussion_r81204990 --- Diff: flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java --- @@ -190,12 +213,30 @@ public void testFileReadingOperatorWithIngestionTime() throws Exception { } content.add(element.getValue() + "\n"); } else if (line instanceof Watermark) { - Assert.assertEquals(((Watermark) line).getTimestamp(), Long.MAX_VALUE); + watermarkTimestamps.add(((Watermark) line).getTimestamp()); } else { Assert.fail("Unknown element in the list."); } } + // check if all the input was read + Assert.assertEquals(NO_OF_FILES * LINES_PER_FILE, noOfLines); + + // check if the last element is the LongMax watermark + Assert.assertTrue(lastElement instanceof Watermark); + Assert.assertEquals(Long.MAX_VALUE, ((Watermark) lastElement).getTimestamp()); + + System.out.println(watermarkTimestamps.size()); --- End diff -- Leftover sysout printing. > Fix Streaming File Source Timestamps/Watermarks Handling > > > Key: FLINK-4329 > URL: https://issues.apache.org/jira/browse/FLINK-4329 > Project: Flink > Issue Type: Bug > Components: Streaming Connectors >Affects Versions: 1.1.0 >Reporter: Aljoscha Krettek >Assignee: Kostas Kloudas > Fix For: 1.2.0, 1.1.3 > > > The {{ContinuousFileReaderOperator}} does not correctly deal with watermarks, > i.e. they are just passed through. This means that when the > {{ContinuousFileMonitoringFunction}} closes and emits a {{Long.MAX_VALUE}} > that watermark can "overtake" the records that are to be emitted in the > {{ContinuousFileReaderOperator}}. Together with the new "allowed lateness" > setting in window operator this can lead to elements being dropped as late. > Also, {{ContinuousFileReaderOperator}} does not correctly assign ingestion > timestamps since it is not technically a source but looks like one to the > user. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #2546: [FLINK-4329] Fix Streaming File Source Timestamps/...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/2546#discussion_r81204990 --- Diff: flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java --- @@ -190,12 +213,30 @@ public void testFileReadingOperatorWithIngestionTime() throws Exception { } content.add(element.getValue() + "\n"); } else if (line instanceof Watermark) { - Assert.assertEquals(((Watermark) line).getTimestamp(), Long.MAX_VALUE); + watermarkTimestamps.add(((Watermark) line).getTimestamp()); } else { Assert.fail("Unknown element in the list."); } } + // check if all the input was read + Assert.assertEquals(NO_OF_FILES * LINES_PER_FILE, noOfLines); + + // check if the last element is the LongMax watermark + Assert.assertTrue(lastElement instanceof Watermark); + Assert.assertEquals(Long.MAX_VALUE, ((Watermark) lastElement).getTimestamp()); + + System.out.println(watermarkTimestamps.size()); --- End diff -- Leftover sysout printing. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2546: [FLINK-4329] Fix Streaming File Source Timestamps/...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/2546#discussion_r81204828 --- Diff: flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java --- @@ -440,10 +491,50 @@ public void testFileSplitMonitoringProcessOnce() throws Exception { private int getLineNo(String line) { String[] tkns = line.split("\\s"); - Assert.assertEquals(tkns.length, 6); + Assert.assertEquals(6, tkns.length); return Integer.parseInt(tkns[tkns.length - 1]); } + private class TimeUpdatingThread extends Thread { + + private volatile boolean isRunning; + + private final TestTimeServiceProvider timeServiceProvider; + private final OneInputStreamOperatorTestHarness testHarness; + private final long wmInterval; + private final int elementUntilUpdating; + + TimeUpdatingThread(final TestTimeServiceProvider timeServiceProvider, + final OneInputStreamOperatorTestHarness testHarness, + final long wmInterval, + final int elementUntilUpdating) { + + this.timeServiceProvider = timeServiceProvider; + this.testHarness = testHarness; + this.wmInterval = wmInterval; + this.elementUntilUpdating = elementUntilUpdating; + this.isRunning = true; + } + + @Override + public void run() { + try { + while (isRunning) { + if (testHarness.getOutput().size() % elementUntilUpdating == 0) { + long now = timeServiceProvider.getCurrentProcessingTime(); + timeServiceProvider.setCurrentTime(now + wmInterval); + } + } + } catch (Exception e) { + e.printStackTrace(); --- End diff -- This will not result in any meaningful feedback to the test. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
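A common way to surface such failures in the test, instead of only printing them, is to record the exception in the background thread and rethrow it from the main test thread. A minimal illustrative sketch (class and method names are not from the PR):

{code}
import java.util.concurrent.atomic.AtomicReference;

// Capture failures from a background test thread and rethrow them from the
// main test thread, so the test actually fails instead of silently passing.
class FailureRecordingThread extends Thread {

	private final AtomicReference<Throwable> error = new AtomicReference<>();
	private final Runnable body;

	FailureRecordingThread(Runnable body) {
		this.body = body;
	}

	@Override
	public void run() {
		try {
			body.run();
		} catch (Throwable t) {
			error.set(t); // remember the failure for the test thread
		}
	}

	void rethrowIfFailed() throws Exception {
		Throwable t = error.get();
		if (t != null) {
			throw new Exception("Background test thread failed", t);
		}
	}
}

// in the test, after joining the thread:
//   thread.join();
//   thread.rethrowIfFailed();
{code}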
[jira] [Commented] (FLINK-4636) AbstractCEPPatternOperator fails to restore state
[ https://issues.apache.org/jira/browse/FLINK-4636?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15533563#comment-15533563 ] ASF GitHub Bot commented on FLINK-4636: --- Github user greghogan commented on a diff in the pull request: https://github.com/apache/flink/pull/2568#discussion_r81200110 --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java --- @@ -126,11 +126,12 @@ public void restoreState(FSDataInputStream state) throws Exception { int numberPriorityQueueEntries = ois.readInt(); - priorityQueue = new PriorityQueue>(numberPriorityQueueEntries, new StreamRecordComparator()); - - for (int i = 0; i asRecord()); + if(numberPriorityQueueEntries > 0) { + priorityQueue = new PriorityQueue>(numberPriorityQueueEntries, new StreamRecordComparator()); --- End diff -- The queue is created in `open()`. > AbstractCEPPatternOperator fails to restore state > - > > Key: FLINK-4636 > URL: https://issues.apache.org/jira/browse/FLINK-4636 > Project: Flink > Issue Type: Bug > Components: CEP >Affects Versions: 1.2.0, 1.1.2 >Reporter: Fabian Hueske >Assignee: Jagadish Bihani > Fix For: 1.2.0, 1.1.3 > > > The {{restoreState()}} of the {{AbstractCEPPatternOperator}} restores the a > Java {{PriorityQueue}}. For that it first reads the number of elements to > insert and then creates a {{PriorityQueue}} object. However, Java's > {{PriorityQueue}} cannot be instantiated with an initial capacity of {{0}}, > which is not checked. > In case of an empty queue, the {{PriorityQueue}} should be instantiated with > an initial size of {{1}}. > See > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Problem-with-CEPPatternOperator-when-taskmanager-is-killed-tp9024.html -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4643) Average Clustering Coefficient
[ https://issues.apache.org/jira/browse/FLINK-4643?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15533562#comment-15533562 ] ASF GitHub Bot commented on FLINK-4643: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/2528 Looks good to me. Test failure is unrelated. From my side, feel free to merge... > Average Clustering Coefficient > -- > > Key: FLINK-4643 > URL: https://issues.apache.org/jira/browse/FLINK-4643 > Project: Flink > Issue Type: New Feature > Components: Gelly >Affects Versions: 1.2.0 >Reporter: Greg Hogan >Assignee: Greg Hogan >Priority: Minor > > Gelly has Global Clustering Coefficient and Local Clustering Coefficient. > This adds Average Clustering Coefficient. The distinction is discussed in > [http://jponnela.com/web_documents/twomode.pdf] (pdf page 2, document page > 32). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
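For reference, the average clustering coefficient discussed here is the mean of the per-vertex (local) coefficients, as opposed to the global coefficient, which is a ratio of closed to open triplets over the whole graph. Written out with the standard definitions (not taken from the implementation):

{code}
C_i = \frac{2 t_i}{d_i (d_i - 1)}, \qquad \bar{C} = \frac{1}{n} \sum_{i=1}^{n} C_i
{code}

where t_i is the number of edges among the neighbours of vertex i, d_i is its degree (C_i is taken as 0 when d_i < 2), and n is the number of vertices.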
[GitHub] flink pull request #2568: [FLINK-4636] Add boundary check for priorityqueue ...
Github user greghogan commented on a diff in the pull request: https://github.com/apache/flink/pull/2568#discussion_r81200110 --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java --- @@ -126,11 +126,12 @@ public void restoreState(FSDataInputStream state) throws Exception { int numberPriorityQueueEntries = ois.readInt(); - priorityQueue = new PriorityQueue>(numberPriorityQueueEntries, new StreamRecordComparator()); - - for (int i = 0; i asRecord()); + if(numberPriorityQueueEntries > 0) { + priorityQueue = new PriorityQueue>(numberPriorityQueueEntries, new StreamRecordComparator()); --- End diff -- The queue is created in `open()`. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink issue #2528: [FLINK-4643] [gelly] Average Clustering Coefficient
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/2528 Looks good to me. Test failure is unrelated. From my side, feel free to merge... --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2568: [FLINK-4636] Add boundary check for priorityqueue ...
Github user jaxbihani commented on a diff in the pull request: https://github.com/apache/flink/pull/2568#discussion_r81199485 --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java --- @@ -126,11 +126,12 @@ public void restoreState(FSDataInputStream state) throws Exception { int numberPriorityQueueEntries = ois.readInt(); - priorityQueue = new PriorityQueue>(numberPriorityQueueEntries, new StreamRecordComparator()); - - for (int i = 0; i asRecord()); + if(numberPriorityQueueEntries > 0) { + priorityQueue = new PriorityQueue>(numberPriorityQueueEntries, new StreamRecordComparator()); --- End diff -- I think if number of objects read are 0 then why would we want to create object at all? Thats unnecessary and condition check will save some CPU cycles. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-4636) AbstractCEPPatternOperator fails to restore state
[ https://issues.apache.org/jira/browse/FLINK-4636?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15533555#comment-15533555 ] ASF GitHub Bot commented on FLINK-4636: --- Github user jaxbihani commented on a diff in the pull request: https://github.com/apache/flink/pull/2568#discussion_r81199485 --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java --- @@ -126,11 +126,12 @@ public void restoreState(FSDataInputStream state) throws Exception { int numberPriorityQueueEntries = ois.readInt(); - priorityQueue = new PriorityQueue>(numberPriorityQueueEntries, new StreamRecordComparator()); - - for (int i = 0; i asRecord()); + if(numberPriorityQueueEntries > 0) { + priorityQueue = new PriorityQueue>(numberPriorityQueueEntries, new StreamRecordComparator()); --- End diff -- I think if number of objects read are 0 then why would we want to create object at all? Thats unnecessary and condition check will save some CPU cycles. > AbstractCEPPatternOperator fails to restore state > - > > Key: FLINK-4636 > URL: https://issues.apache.org/jira/browse/FLINK-4636 > Project: Flink > Issue Type: Bug > Components: CEP >Affects Versions: 1.2.0, 1.1.2 >Reporter: Fabian Hueske >Assignee: Jagadish Bihani > Fix For: 1.2.0, 1.1.3 > > > The {{restoreState()}} of the {{AbstractCEPPatternOperator}} restores the a > Java {{PriorityQueue}}. For that it first reads the number of elements to > insert and then creates a {{PriorityQueue}} object. However, Java's > {{PriorityQueue}} cannot be instantiated with an initial capacity of {{0}}, > which is not checked. > In case of an empty queue, the {{PriorityQueue}} should be instantiated with > an initial size of {{1}}. > See > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Problem-with-CEPPatternOperator-when-taskmanager-is-killed-tp9024.html -- This message was sent by Atlassian JIRA (v6.3.4#6332)
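A minimal sketch of the clamping variant suggested in the issue description (as opposed to skipping queue creation entirely); readNextRecord stands in for the actual deserialization call and is purely illustrative:

{code}
int numberPriorityQueueEntries = ois.readInt();

// java.util.PriorityQueue rejects an initial capacity of 0, so clamp it to 1
priorityQueue = new PriorityQueue<StreamRecord<IN>>(
	Math.max(1, numberPriorityQueueEntries),
	new StreamRecordComparator());

for (int i = 0; i < numberPriorityQueueEntries; i++) {
	// readNextRecord: hypothetical helper that deserializes one stored element
	priorityQueue.offer(readNextRecord(ois));
}
{code}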
[jira] [Created] (FLINK-4717) Naive version of atomic stop signal with savepoint
Till Rohrmann created FLINK-4717: Summary: Naive version of atomic stop signal with savepoint Key: FLINK-4717 URL: https://issues.apache.org/jira/browse/FLINK-4717 Project: Flink Issue Type: New Feature Components: State Backends, Checkpointing Affects Versions: 1.2.0 Reporter: Till Rohrmann Priority: Minor Fix For: 1.2.0 As a first step towards atomic stopping with savepoints we should implement a cancel command which prior to cancelling takes a savepoint. Additionally, it should turn off the periodic checkpointing so that there won't be checkpoints executed between the savepoint and the cancel command. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4573) Potential resource leak due to unclosed RandomAccessFile in TaskManagerLogHandler
[ https://issues.apache.org/jira/browse/FLINK-4573?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15533528#comment-15533528 ] ASF GitHub Bot commented on FLINK-4573: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/2556 Merging this... > Potential resource leak due to unclosed RandomAccessFile in > TaskManagerLogHandler > - > > Key: FLINK-4573 > URL: https://issues.apache.org/jira/browse/FLINK-4573 > Project: Flink > Issue Type: Bug >Reporter: Ted Yu >Priority: Minor > > {code} > try { > raf = new > RandomAccessFile(file, "r"); > } catch > (FileNotFoundException e) { > display(ctx, request, > "Displaying TaskManager log failed."); > LOG.error("Displaying > TaskManager log failed.", e); > return; > } > long fileLength = > raf.length(); > final FileChannel fc = > raf.getChannel(); > {code} > If length() throws IOException, raf would be left unclosed. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
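One way to close the hole described above is to close the file when obtaining the length or channel fails. A sketch along the lines of the quoted handler code (display, ctx, request and LOG come from the surrounding handler), not necessarily the exact fix that was merged:

{code}
RandomAccessFile raf;
try {
	raf = new RandomAccessFile(file, "r");
} catch (FileNotFoundException e) {
	display(ctx, request, "Displaying TaskManager log failed.");
	LOG.error("Displaying TaskManager log failed.", e);
	return;
}

long fileLength;
final FileChannel fc;
try {
	fileLength = raf.length();
	fc = raf.getChannel();
} catch (IOException e) {
	// close the file if we cannot obtain its length or channel,
	// otherwise the descriptor would leak
	try {
		raf.close();
	} catch (IOException ignored) {
		// best effort
	}
	display(ctx, request, "Displaying TaskManager log failed.");
	LOG.error("Displaying TaskManager log failed.", e);
	return;
}
{code}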
[GitHub] flink issue #2556: [FLINK-4573] Fix potential resource leak due to unclosed ...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/2556 Merging this... --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Updated] (FLINK-4650) Frequent task manager disconnects from JobManager
[ https://issues.apache.org/jira/browse/FLINK-4650?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann updated FLINK-4650: - Fix Version/s: 1.2.0 > Frequent task manager disconnects from JobManager > - > > Key: FLINK-4650 > URL: https://issues.apache.org/jira/browse/FLINK-4650 > Project: Flink > Issue Type: Bug > Components: Distributed Coordination, Network >Affects Versions: 1.2.0 >Reporter: Nagarjun Guraja > Fix For: 1.2.0 > > > Not sure of the exact reason but we observe more frequent task manager > disconnects while using 1.2 snapshot build as compared to 1.1.2 release build -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-4650) Frequent task manager disconnects from JobManager
[ https://issues.apache.org/jira/browse/FLINK-4650?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann updated FLINK-4650: - Affects Version/s: 1.2.0 > Frequent task manager disconnects from JobManager > - > > Key: FLINK-4650 > URL: https://issues.apache.org/jira/browse/FLINK-4650 > Project: Flink > Issue Type: Bug > Components: Distributed Coordination, Network >Affects Versions: 1.2.0 >Reporter: Nagarjun Guraja > Fix For: 1.2.0 > > > Not sure of the exact reason but we observe more frequent task manager > disconnects while using 1.2 snapshot build as compared to 1.1.2 release build -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-4716) Add trigger full recovery button to web UI
Till Rohrmann created FLINK-4716: Summary: Add trigger full recovery button to web UI Key: FLINK-4716 URL: https://issues.apache.org/jira/browse/FLINK-4716 Project: Flink Issue Type: Improvement Components: Webfrontend Affects Versions: 1.2.0 Reporter: Till Rohrmann Priority: Minor Fix For: 1.2.0 Add a trigger full recovery button to the web UI. The full recovery button will take the latest completed checkpoint and resume from there. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-4715) TaskManager should commit suicide after cancellation failure
Till Rohrmann created FLINK-4715: Summary: TaskManager should commit suicide after cancellation failure Key: FLINK-4715 URL: https://issues.apache.org/jira/browse/FLINK-4715 Project: Flink Issue Type: Improvement Components: TaskManager Affects Versions: 1.2.0 Reporter: Till Rohrmann Fix For: 1.2.0 In case of a failed cancellation, e.g. the task cannot be cancelled after a given time, the {{TaskManager}} should kill itself. That way we guarantee that there is no resource leak. This behaviour acts as a safety-net against faulty user code. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
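In its simplest form, such a safety-net could look something like the following sketch (illustrative only; the class and method names are not from the Flink code base):

{code}
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// If a task has not terminated some time after being cancelled, assume user
// code is stuck holding resources and terminate the whole process.
class CancellationWatchdog {

	private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();

	void watch(final Thread taskThread, long timeout, TimeUnit unit) {
		timer.schedule(new Runnable() {
			@Override
			public void run() {
				if (taskThread.isAlive()) {
					// the task ignored cancellation; killing the process lets
					// the OS reclaim whatever the user code is holding on to
					Runtime.getRuntime().halt(-1);
				}
			}
		}, timeout, unit);
	}
}
{code}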
[jira] [Created] (FLINK-4714) Set task state to RUNNING after state has been restored
Till Rohrmann created FLINK-4714: Summary: Set task state to RUNNING after state has been restored Key: FLINK-4714 URL: https://issues.apache.org/jira/browse/FLINK-4714 Project: Flink Issue Type: Improvement Components: Distributed Coordination, State Backends, Checkpointing Affects Versions: 1.2.0 Reporter: Till Rohrmann Fix For: 1.2.0 The task state is set to {{RUNNING}} as soon as the {{Task}} is executed. That, however, happens before the state of the {{StreamTask}} invokable has been restored. As a result, the {{CheckpointCoordinator}} starts to trigger checkpoints even though the {{StreamTask}} is not ready. In order to avoid aborting checkpoints and properly start it, we should switch the task state to {{RUNNING}} after the state has been restored. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4700) Harden the TimeProvider test
[ https://issues.apache.org/jira/browse/FLINK-4700?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15533260#comment-15533260 ] Kostas Kloudas commented on FLINK-4700: --- [~StephanEwen] I am thinking that the best solution to this is to remove the testDefaultTimeProvider() test altogether. The only thing it actually tests is the java ScheduledExecutorService and there is no way to make it stable without adding test-specific methods (visible for testing) in the DefaultTimeServiceProvider. What do you think? > Harden the TimeProvider test > > > Key: FLINK-4700 > URL: https://issues.apache.org/jira/browse/FLINK-4700 > Project: Flink > Issue Type: Bug > Components: Tests >Reporter: Kostas Kloudas >Assignee: Kostas Kloudas > > Currently the TimeProvider test fails due to a race condition. This task aims > at fixing it. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4702) Kafka consumer must commit offsets asynchronously
[ https://issues.apache.org/jira/browse/FLINK-4702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15533216#comment-15533216 ] ASF GitHub Bot commented on FLINK-4702: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/2574 @tzulitai @rmetzger We need to make sure that the Kafka 0.10 code picks up this change and the test case. > Kafka consumer must commit offsets asynchronously > - > > Key: FLINK-4702 > URL: https://issues.apache.org/jira/browse/FLINK-4702 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Affects Versions: 1.1.2 >Reporter: Stephan Ewen >Assignee: Stephan Ewen >Priority: Blocker > Fix For: 1.2.0, 1.1.3 > > > The offset commit calls to Kafka may occasionally take very long. > In that case, the {{notifyCheckpointComplete()}} method blocks for long and > the KafkaConsumer cannot make progress and cannot perform checkpoints. > Kafka 0.9+ have methods to commit asynchronously. > We should use those and make sure no more than one commit is concurrently in > progress, to that commit requests do not pile up. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4711) TaskManager can crash due to failing onPartitionStateUpdate call
[ https://issues.apache.org/jira/browse/FLINK-4711?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15533215#comment-15533215 ] ASF GitHub Bot commented on FLINK-4711: --- Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/2569 Thanks for the review @uce. If Travis gives green light, I'll merge the PR. > TaskManager can crash due to failing onPartitionStateUpdate call > > > Key: FLINK-4711 > URL: https://issues.apache.org/jira/browse/FLINK-4711 > Project: Flink > Issue Type: Bug > Components: Distributed Coordination >Affects Versions: 1.2.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Fix For: 1.2.0 > > > The {{TaskManager}} can crash because it calls > {{Task.onPartitionStateUpdate}} when it receives a {{PartitionState}} > message. The {{onPartitionStateUpdate}} method can throw an {{IOException}} > or {{InterruptedException}} which are not handled on the {{TaskManager}} > level. > Another problem is that the initial partition state request is triggered > within the {{SingleInputGate}}. The request causes the {{JobManager}} to send > a {{PartitionState}} message to the {{TaskManager}} which forwards it to the > {{Task}}. If the at any of these points a message gets lost, then it is not > retried and the partition state remains unknown. > In order to handle the exceptions, to make the data flow clearer and to add > automatic retries, I propose to let the {{Task}} send the partition state > check requests. Furthermore, the {{JobManager}} should directly answer to the > {{Task}} by replying to an ask operation. That way the message does not have > to be routed through the {{TaskManager}}. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink issue #2574: [FLINK-4702] [kafka connector] Commit offsets to Kafka as...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/2574 @tzulitai @rmetzger We need to make sure that the Kafka 0.10 code picks up this change and the test case. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink issue #2569: [FLINK-4711] Let the Task trigger partition state request...
Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/2569 Thanks for the review @uce. If Travis gives green light, I'll merge the PR. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-4702) Kafka consumer must commit offsets asynchronously
[ https://issues.apache.org/jira/browse/FLINK-4702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15533211#comment-15533211 ] ASF GitHub Bot commented on FLINK-4702: --- Github user StephanEwen closed the pull request at: https://github.com/apache/flink/pull/2559 > Kafka consumer must commit offsets asynchronously > - > > Key: FLINK-4702 > URL: https://issues.apache.org/jira/browse/FLINK-4702 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Affects Versions: 1.1.2 >Reporter: Stephan Ewen >Assignee: Stephan Ewen >Priority: Blocker > Fix For: 1.2.0, 1.1.3 > > > The offset commit calls to Kafka may occasionally take very long. > In that case, the {{notifyCheckpointComplete()}} method blocks for long and > the KafkaConsumer cannot make progress and cannot perform checkpoints. > Kafka 0.9+ have methods to commit asynchronously. > We should use those and make sure no more than one commit is concurrently in > progress, to that commit requests do not pile up. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4702) Kafka consumer must commit offsets asynchronously
[ https://issues.apache.org/jira/browse/FLINK-4702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15533210#comment-15533210 ] ASF GitHub Bot commented on FLINK-4702: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/2559 Closing this for #2574 > Kafka consumer must commit offsets asynchronously > - > > Key: FLINK-4702 > URL: https://issues.apache.org/jira/browse/FLINK-4702 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Affects Versions: 1.1.2 >Reporter: Stephan Ewen >Assignee: Stephan Ewen >Priority: Blocker > Fix For: 1.2.0, 1.1.3 > > > The offset commit calls to Kafka may occasionally take very long. > In that case, the {{notifyCheckpointComplete()}} method blocks for long and > the KafkaConsumer cannot make progress and cannot perform checkpoints. > Kafka 0.9+ have methods to commit asynchronously. > We should use those and make sure no more than one commit is concurrently in > progress, to that commit requests do not pile up. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #2559: [FLINK-4702] [kafka connector] Commit offets to Ka...
Github user StephanEwen closed the pull request at: https://github.com/apache/flink/pull/2559 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink issue #2559: [FLINK-4702] [kafka connector] Commit offets to Kafka asy...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/2559 Closing this for #2574 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-4702) Kafka consumer must commit offsets asynchronously
[ https://issues.apache.org/jira/browse/FLINK-4702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15533206#comment-15533206 ] ASF GitHub Bot commented on FLINK-4702: --- GitHub user StephanEwen opened a pull request: https://github.com/apache/flink/pull/2574 [FLINK-4702] [kafka connector] Commit offsets to Kafka asynchronously and don't block on polls This fix is quite critical! While the KafkaConsumer is polling for new data (with a timeout), it holds the consumer lock. If no data comes in Kafka, the lock is not released before the poll timeout is over. During that time, no offset commit can make progress, because it needs the consumer lock. The `notifyCheckpointComplete()` method of the Kafka Consumer hence blocks until the poll timeout is over and the lock is released. For low-throughput Kafka Topics, this can cause wildly long checkpoint delays. This changes `notifyCheckpointComplete()` to only "schedule" offsets to be committed, while the main fetcher thread actually kick off the asynchronous offset commits. That way, there is no interference between the `notifyCheckpointComplete()` method (which is executed under checkpoint lock) and the consumer lock. In fact, the only KafkaConsumer method accessed concurrently to the main fetcher thread is `wakeup()` which is actually thread-safe (where the rest of the KafkaConsumer is not). The consumer lock was hence completely removed. You can merge this pull request into a Git repository by running: $ git pull https://github.com/StephanEwen/incubator-flink kafka_09_fix Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2574.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2574 commit 0846fd907db7d52d7e5fb7d704c5e1c13462e331 Author: Stephan Ewen Date: 2016-09-29T16:09:51Z [FLINK-4702] [kafka connector] Commit offsets to Kafka asynchronously and don't block on polls Letting the Kafka commit block on polls means that 'notifyCheckpointComplete()' may take very long. This is mostly relevant for low-throughput Kafka topics. > Kafka consumer must commit offsets asynchronously > - > > Key: FLINK-4702 > URL: https://issues.apache.org/jira/browse/FLINK-4702 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Affects Versions: 1.1.2 >Reporter: Stephan Ewen >Assignee: Stephan Ewen >Priority: Blocker > Fix For: 1.2.0, 1.1.3 > > > The offset commit calls to Kafka may occasionally take very long. > In that case, the {{notifyCheckpointComplete()}} method blocks for long and > the KafkaConsumer cannot make progress and cannot perform checkpoints. > Kafka 0.9+ have methods to commit asynchronously. > We should use those and make sure no more than one commit is concurrently in > progress, to that commit requests do not pile up. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4348) Implement slot allocation protocol with TaskExecutor
[ https://issues.apache.org/jira/browse/FLINK-4348?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15533204#comment-15533204 ] ASF GitHub Bot commented on FLINK-4348: --- Github user mxm commented on the issue: https://github.com/apache/flink/pull/2571 CC @beyond1920 @KurtYoung > Implement slot allocation protocol with TaskExecutor > > > Key: FLINK-4348 > URL: https://issues.apache.org/jira/browse/FLINK-4348 > Project: Flink > Issue Type: Sub-task > Components: Cluster Management >Reporter: Kurt Young >Assignee: Maximilian Michels > > When slotManager finds a proper slot in the free pool for a slot request, > slotManager marks the slot as occupied, then tells the taskExecutor to give > the slot to the specified JobMaster. > when a slot request is sent to taskExecutor, it should contain following > parameters: AllocationID, JobID, slotID, resourceManagerLeaderSessionID. > There exists 3 following possibilities of the response from taskExecutor, we > will discuss when each possibility happens and how to handle. > 1. Ack request which means the taskExecutor gives the slot to the specified > jobMaster as expected. > 2. Decline request if the slot is already occupied by other AllocationID. > 3. Timeout which could caused by lost of request message or response message > or slow network transfer. > On the first occasion, ResourceManager need to do nothing. However, under the > second and third occasion, ResourceManager need to notify slotManager, > slotManager will verify and clear all the previous allocate information for > this slot request firstly, then try to find a proper slot for the slot > request again. This may cause some duplicate allocation, e.g. the slot > request to TaskManager is successful but the response is lost somehow, so we > may request a slot in another TaskManager, this causes two slots assigned to > one request, but it can be taken care of by rejecting registration at > JobMaster. > There are still some question need to discuss in a step further. > 1. Who send slotRequest to taskExecutor, SlotManager or ResourceManager? I > think it's better that SlotManager delegates the rpc call to ResourceManager > when SlotManager need to communicate with outside world. ResourceManager > know which taskExecutor to send the request based on ResourceID. Besides this > RPC call which used to request slot to taskExecutor should not be a > RpcMethod, because we hope only SlotManager has permission to call the > method, but the other component, for example JobMaster and TaskExecutor, > cannot call this method directly. > 2. If JobMaster reject the slot offer from a TaskExecutor, the TaskExecutor > should notify the free slot to ResourceManager immediately, or wait for next > heartbeat sync. The advantage of first way is the resourceManager’s view > could be updated faster. The advantage of second way is save a RPC method in > ResourceManager. > 3. There are two communication type. First, the slot request could be sent as > an ask operation where the response is returned as a future. Second, > resourceManager send the slot request in fire and forget way, the response > could be returned by an RPC call. I prefer the first one because it is more > simple and could save a RPC method in ResourceManager (for callback in the > second way). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
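The three response cases and the retry behaviour described above can be sketched roughly as follows; all class and method names here are illustrative placeholders, not the actual RPC interfaces:

{code}
// Illustrative placeholders, not the actual Flink classes:
enum SlotRequestOutcome { ACK, DECLINED, TIMEOUT }

interface SlotBookkeeping {
	void clearAllocation(String allocationId, String slotId);
	void retrySlotRequest(String allocationId);
}

class SlotRequestResponseHandler {

	private final SlotBookkeeping slotManager;

	SlotRequestResponseHandler(SlotBookkeeping slotManager) {
		this.slotManager = slotManager;
	}

	void onResponse(SlotRequestOutcome outcome, String allocationId, String slotId) {
		switch (outcome) {
			case ACK:
				// TaskExecutor handed the slot to the JobMaster; nothing more to do
				break;
			case DECLINED:  // slot already occupied by another AllocationID
			case TIMEOUT:   // request or response lost, or slow network
				// forget the previous allocation and look for another free slot;
				// a duplicate allocation caused by a lost response is later
				// resolved by the JobMaster rejecting the surplus slot offer
				slotManager.clearAllocation(allocationId, slotId);
				slotManager.retrySlotRequest(allocationId);
				break;
		}
	}
}
{code}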
[GitHub] flink pull request #2574: [FLINK-4702] [kafka connector] Commit offsets to K...
GitHub user StephanEwen opened a pull request: https://github.com/apache/flink/pull/2574 [FLINK-4702] [kafka connector] Commit offsets to Kafka asynchronously and don't block on polls This fix is quite critical! While the KafkaConsumer is polling for new data (with a timeout), it holds the consumer lock. If no data comes in Kafka, the lock is not released before the poll timeout is over. During that time, no offset commit can make progress, because it needs the consumer lock. The `notifyCheckpointComplete()` method of the Kafka Consumer hence blocks until the poll timeout is over and the lock is released. For low-throughput Kafka Topics, this can cause wildly long checkpoint delays. This changes `notifyCheckpointComplete()` to only "schedule" offsets to be committed, while the main fetcher thread actually kick off the asynchronous offset commits. That way, there is no interference between the `notifyCheckpointComplete()` method (which is executed under checkpoint lock) and the consumer lock. In fact, the only KafkaConsumer method accessed concurrently to the main fetcher thread is `wakeup()` which is actually thread-safe (where the rest of the KafkaConsumer is not). The consumer lock was hence completely removed. You can merge this pull request into a Git repository by running: $ git pull https://github.com/StephanEwen/incubator-flink kafka_09_fix Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2574.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2574 commit 0846fd907db7d52d7e5fb7d704c5e1c13462e331 Author: Stephan Ewen Date: 2016-09-29T16:09:51Z [FLINK-4702] [kafka connector] Commit offsets to Kafka asynchronously and don't block on polls Letting the Kafka commit block on polls means that 'notifyCheckpointComplete()' may take very long. This is mostly relevant for low-throughput Kafka topics. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
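The scheme described in the pull request (the checkpoint callback only records which offsets to commit, the fetcher thread issues a non-blocking commit, and at most one commit is in flight) can be sketched against the plain Kafka 0.9 client API roughly as follows; the class and method names are illustrative, not the actual connector code:

{code}
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.TopicPartition;

class AsyncOffsetCommitter {

	private final KafkaConsumer<byte[], byte[]> consumer;

	// offsets handed over by the checkpoint callback; only the latest set matters
	private final AtomicReference<Map<TopicPartition, OffsetAndMetadata>> pendingOffsets =
			new AtomicReference<>();

	// at most one commit in flight, so commit requests cannot pile up
	private final AtomicBoolean commitInProgress = new AtomicBoolean(false);

	AsyncOffsetCommitter(KafkaConsumer<byte[], byte[]> consumer) {
		this.consumer = consumer;
	}

	/** Called from the checkpoint-complete notification: only schedules, never blocks. */
	void scheduleOffsetsToCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
		pendingOffsets.set(offsets);
	}

	/** Called from the fetcher thread between polls. */
	void maybeCommit() {
		if (commitInProgress.get()) {
			return; // previous commit still running; offsets stay pending
		}
		Map<TopicPartition, OffsetAndMetadata> offsets = pendingOffsets.getAndSet(null);
		if (offsets != null) {
			commitInProgress.set(true);
			consumer.commitAsync(offsets, new OffsetCommitCallback() {
				@Override
				public void onComplete(Map<TopicPartition, OffsetAndMetadata> committed, Exception exception) {
					commitInProgress.set(false);
				}
			});
		}
	}
}
{code}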
[GitHub] flink issue #2571: [FLINK-4348] Simplify logic of SlotManager
Github user mxm commented on the issue: https://github.com/apache/flink/pull/2571 CC @beyond1920 @KurtYoung --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-4711) TaskManager can crash due to failing onPartitionStateUpdate call
[ https://issues.apache.org/jira/browse/FLINK-4711?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15533193#comment-15533193 ] ASF GitHub Bot commented on FLINK-4711: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2569#discussion_r81173562 --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskControlMessages.scala --- @@ -92,16 +92,6 @@ object TaskMessages { // -- /** - * Answer to a [[RequestPartitionState]] with the state of the respective partition. - */ - case class PartitionState( --- End diff -- Yes that is also true. Will fix it. > TaskManager can crash due to failing onPartitionStateUpdate call > > > Key: FLINK-4711 > URL: https://issues.apache.org/jira/browse/FLINK-4711 > Project: Flink > Issue Type: Bug > Components: Distributed Coordination >Affects Versions: 1.2.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Fix For: 1.2.0 > > > The {{TaskManager}} can crash because it calls > {{Task.onPartitionStateUpdate}} when it receives a {{PartitionState}} > message. The {{onPartitionStateUpdate}} method can throw an {{IOException}} > or {{InterruptedException}} which are not handled on the {{TaskManager}} > level. > Another problem is that the initial partition state request is triggered > within the {{SingleInputGate}}. The request causes the {{JobManager}} to send > a {{PartitionState}} message to the {{TaskManager}} which forwards it to the > {{Task}}. If the at any of these points a message gets lost, then it is not > retried and the partition state remains unknown. > In order to handle the exceptions, to make the data flow clearer and to add > automatic retries, I propose to let the {{Task}} send the partition state > check requests. Furthermore, the {{JobManager}} should directly answer to the > {{Task}} by replying to an ask operation. That way the message does not have > to be routed through the {{TaskManager}}. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4711) TaskManager can crash due to failing onPartitionStateUpdate call
[ https://issues.apache.org/jira/browse/FLINK-4711?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15533194#comment-15533194 ] ASF GitHub Bot commented on FLINK-4711: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2569#discussion_r81173599 --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala --- @@ -873,8 +873,7 @@ class JobManager( } sender ! decorateMessage( -PartitionState( - taskExecutionId, +new org.apache.flink.runtime.io.network.PartitionState( --- End diff -- Yes that's true. Will fix it. > TaskManager can crash due to failing onPartitionStateUpdate call > > > Key: FLINK-4711 > URL: https://issues.apache.org/jira/browse/FLINK-4711 > Project: Flink > Issue Type: Bug > Components: Distributed Coordination >Affects Versions: 1.2.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Fix For: 1.2.0 > > > The {{TaskManager}} can crash because it calls > {{Task.onPartitionStateUpdate}} when it receives a {{PartitionState}} > message. The {{onPartitionStateUpdate}} method can throw an {{IOException}} > or {{InterruptedException}} which are not handled on the {{TaskManager}} > level. > Another problem is that the initial partition state request is triggered > within the {{SingleInputGate}}. The request causes the {{JobManager}} to send > a {{PartitionState}} message to the {{TaskManager}} which forwards it to the > {{Task}}. If the at any of these points a message gets lost, then it is not > retried and the partition state remains unknown. > In order to handle the exceptions, to make the data flow clearer and to add > automatic retries, I propose to let the {{Task}} send the partition state > check requests. Furthermore, the {{JobManager}} should directly answer to the > {{Task}} by replying to an ask operation. That way the message does not have > to be routed through the {{TaskManager}}. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #2569: [FLINK-4711] Let the Task trigger partition state ...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2569#discussion_r81173562 --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskControlMessages.scala --- @@ -92,16 +92,6 @@ object TaskMessages { // -- /** - * Answer to a [[RequestPartitionState]] with the state of the respective partition. - */ - case class PartitionState( --- End diff -- Yes that is also true. Will fix it. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2569: [FLINK-4711] Let the Task trigger partition state ...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2569#discussion_r81173599 --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala --- @@ -873,8 +873,7 @@ class JobManager( } sender ! decorateMessage( -PartitionState( - taskExecutionId, +new org.apache.flink.runtime.io.network.PartitionState( --- End diff -- Yes that's true. Will fix it. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-4711) TaskManager can crash due to failing onPartitionStateUpdate call
[ https://issues.apache.org/jira/browse/FLINK-4711?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15533192#comment-15533192 ] ASF GitHub Bot commented on FLINK-4711: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2569#discussion_r81173481 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/JobManagerCommunicationFactory.java --- @@ -27,15 +26,6 @@ public interface JobManagerCommunicationFactory { --- End diff -- Good catch. Will remove it. > TaskManager can crash due to failing onPartitionStateUpdate call > > > Key: FLINK-4711 > URL: https://issues.apache.org/jira/browse/FLINK-4711 > Project: Flink > Issue Type: Bug > Components: Distributed Coordination >Affects Versions: 1.2.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Fix For: 1.2.0 > > > The {{TaskManager}} can crash because it calls > {{Task.onPartitionStateUpdate}} when it receives a {{PartitionState}} > message. The {{onPartitionStateUpdate}} method can throw an {{IOException}} > or {{InterruptedException}} which are not handled on the {{TaskManager}} > level. > Another problem is that the initial partition state request is triggered > within the {{SingleInputGate}}. The request causes the {{JobManager}} to send > a {{PartitionState}} message to the {{TaskManager}} which forwards it to the > {{Task}}. If the at any of these points a message gets lost, then it is not > retried and the partition state remains unknown. > In order to handle the exceptions, to make the data flow clearer and to add > automatic retries, I propose to let the {{Task}} send the partition state > check requests. Furthermore, the {{JobManager}} should directly answer to the > {{Task}} by replying to an ask operation. That way the message does not have > to be routed through the {{TaskManager}}. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #2569: [FLINK-4711] Let the Task trigger partition state ...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2569#discussion_r81173481 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/JobManagerCommunicationFactory.java --- @@ -27,15 +26,6 @@ public interface JobManagerCommunicationFactory { --- End diff -- Good catch. Will remove it. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2573: Refactor StreamOperator Hierachy to make Keys Expl...
GitHub user aljoscha opened a pull request: https://github.com/apache/flink/pull/2573 Refactor StreamOperator Hierachy to make Keys Explicit This is more of a preview PR, I'm not yet completely done with testing and making sure that everything works. R: @StephanEwen You can merge this pull request into a Git repository by running: $ git pull https://github.com/aljoscha/flink refactor-stream-operators Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2573.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2573 commit c9049c57d612e18f6051574d9903bb063b46840a Author: Aljoscha Krettek Date: 2016-09-28T09:07:22Z Refactor StreamOperator Hierachy to make Keys Explicit --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-4552) Refactor WindowOperator/Trigger Tests
[ https://issues.apache.org/jira/browse/FLINK-4552?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15533144#comment-15533144 ] ASF GitHub Bot commented on FLINK-4552: --- GitHub user aljoscha opened a pull request: https://github.com/apache/flink/pull/2572 [FLINK-4552] Refactor WindowOperator/Trigger Tests This builds on #2570 Before, tests for WindowOperator, WindowAssigner, Trigger and WindowFunction were all conflated in WindowOperatorTest. All of these tested that a certain combination of a Trigger, WindowAssigner and WindowFunction produce the expected output. This change modularizes these tests and spreads them out across multiple files. For example, one per trigger/window assigner. The new WindowOperatorTest tests verify that the interaction between WindowOperator and the various other parts works as expected, that the correct methods on Trigger and WindowFunction are called at the expected time and that snapshotting, timers, cleanup etc. work correctly. These tests also verify that the different state types and WindowFunctions work correctly. For trigger tests this introduces TriggerTestHarness. This can be used to inject elements into Triggers they fire at the correct times. The actual output of the WindowFunction is not important for these tests. The new tests also make sure that triggers correctly clean up state and timers. WindowAssigner tests verify the behaviour of window assigners in isolation. They also test, for example, whether offset parameter of time-based windows work correctly. R: @StephanEwen @StefanRRichter @kl0u for review You can merge this pull request into a Git repository by running: $ git pull https://github.com/aljoscha/flink window-test-refactor Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2572.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2572 commit 1a09d9032bf5683a378a7fc8dc480f2d14c5924d Author: Aljoscha Krettek Date: 2016-09-25T18:58:16Z Rename TimeServiceProvider to ProcessingTimeService The name is clashing with the soon-to-be-added TimerService/InternalTimerService which is meant as an interface for dealing with both processing time and event time. TimeServiceProvided is renamed to ProcessingTimeService to reflect the fact that it is a low-level utility that only deals with "physical" processing-time trigger tasks. commit 758827c3c8508ef9ef2ec079ff3a8469d0096ca8 Author: Aljoscha Krettek Date: 2016-09-28T13:10:35Z Use OperatorTestHarness and TestProcessingTimeService in Kafka Tests commit f6dd9c74dc2c58c4263fb6d084651b514898d47a Author: Aljoscha Krettek Date: 2016-09-28T14:35:33Z Use Processing-Time Service of TestHarness in WindowOperatorTest Before, this was manually creating a TestProcessingTimeService, now, we're using the one that is there by default in OneInputStreamOperatorTestHarness. commit 65389d66c5586e6707b7a6bf48df512354fac085 Author: Aljoscha Krettek Date: 2016-09-28T14:43:40Z Refactor OperatorTestHarness to always use TestProcessingTimeService Before, this would allow handing in a custom ProcessingTimeService but this was in reality always TestProcessingTimeService. commit 1d013bcacc040552e5783c64d094ec309014457b Author: Aljoscha Krettek Date: 2016-09-28T13:12:26Z Use TestHarness Processing-time Facility in BucketingSinkTest Before, this was manually creating a TestProcessingTimeService. 
Now we use the one that is there by default in OneInputStreamOperatorTestHarness. commit eaf3dd00fefeb2487c7cafff6337123cbe42874b Author: Aljoscha Krettek Date: 2016-09-28T13:32:24Z Use OperatorTestHarness in AlignedWindowOperator Tests commit b597d2ef50c27554b83fddaff8873107265340d4 Author: Aljoscha Krettek Date: 2016-09-29T14:04:29Z Refactor Operator TestHarnesses to use Common Base Class This also introduces KeyedTwoInputStreamOperatorTestHarness which is similar to KeyedOneInputStreamOperatorTestHarness commit 58b16b26e07b6100f89e9deec63f0decb751f0e6 Author: Aljoscha Krettek Date: 2016-09-26T14:21:51Z [FLINK-3674] Add an interface for Time aware User Functions This moves the event-time/processing-time trigger code from WindowOperator behind a well defined interface that can be used by operators (and user functions). InternalTimerService is the new interface that has the same functionality that WindowOperator used to have. TimerService is the user facing interface that does not allow dealing with namespaces/payloads and also does not allow deleting timers. There is a default i
[GitHub] flink pull request #2572: [FLINK-4552] Refactor WindowOperator/Trigger Tests
GitHub user aljoscha opened a pull request: https://github.com/apache/flink/pull/2572 [FLINK-4552] Refactor WindowOperator/Trigger Tests This builds on #2570 Before, tests for WindowOperator, WindowAssigner, Trigger and WindowFunction were all conflated in WindowOperatorTest. All of these tested that a certain combination of a Trigger, WindowAssigner and WindowFunction produces the expected output. This change modularizes these tests and spreads them out across multiple files. For example, one per trigger/window assigner. The new WindowOperatorTest tests verify that the interaction between WindowOperator and the various other parts works as expected, that the correct methods on Trigger and WindowFunction are called at the expected time and that snapshotting, timers, cleanup etc. work correctly. These tests also verify that the different state types and WindowFunctions work correctly. For trigger tests this introduces TriggerTestHarness. This can be used to inject elements into Triggers and verify that they fire at the correct times. The actual output of the WindowFunction is not important for these tests. The new tests also make sure that triggers correctly clean up state and timers. WindowAssigner tests verify the behaviour of window assigners in isolation. They also test, for example, whether the offset parameter of time-based windows works correctly. R: @StephanEwen @StefanRRichter @kl0u for review You can merge this pull request into a Git repository by running: $ git pull https://github.com/aljoscha/flink window-test-refactor Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2572.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2572 commit 1a09d9032bf5683a378a7fc8dc480f2d14c5924d Author: Aljoscha Krettek Date: 2016-09-25T18:58:16Z Rename TimeServiceProvider to ProcessingTimeService The name is clashing with the soon-to-be-added TimerService/InternalTimerService which is meant as an interface for dealing with both processing time and event time. TimeServiceProvider is renamed to ProcessingTimeService to reflect the fact that it is a low-level utility that only deals with "physical" processing-time trigger tasks. commit 758827c3c8508ef9ef2ec079ff3a8469d0096ca8 Author: Aljoscha Krettek Date: 2016-09-28T13:10:35Z Use OperatorTestHarness and TestProcessingTimeService in Kafka Tests commit f6dd9c74dc2c58c4263fb6d084651b514898d47a Author: Aljoscha Krettek Date: 2016-09-28T14:35:33Z Use Processing-Time Service of TestHarness in WindowOperatorTest Before, this was manually creating a TestProcessingTimeService, now, we're using the one that is there by default in OneInputStreamOperatorTestHarness. commit 65389d66c5586e6707b7a6bf48df512354fac085 Author: Aljoscha Krettek Date: 2016-09-28T14:43:40Z Refactor OperatorTestHarness to always use TestProcessingTimeService Before, this would allow handing in a custom ProcessingTimeService but this was in reality always TestProcessingTimeService. commit 1d013bcacc040552e5783c64d094ec309014457b Author: Aljoscha Krettek Date: 2016-09-28T13:12:26Z Use TestHarness Processing-time Facility in BucketingSinkTest Before, this was manually creating a TestProcessingTimeService. Now we use the one that is there by default in OneInputStreamOperatorTestHarness. 
commit eaf3dd00fefeb2487c7cafff6337123cbe42874b Author: Aljoscha Krettek Date: 2016-09-28T13:32:24Z Use OperatorTestHarness in AlignedWindowOperator Tests commit b597d2ef50c27554b83fddaff8873107265340d4 Author: Aljoscha Krettek Date: 2016-09-29T14:04:29Z Refactor Operator TestHarnesses to use Common Base Class This also introduces KeyedTwoInputStreamOperatorTestHarness which is similar to KeyedOneInputStreamOperatorTestHarness commit 58b16b26e07b6100f89e9deec63f0decb751f0e6 Author: Aljoscha Krettek Date: 2016-09-26T14:21:51Z [FLINK-3674] Add an interface for Time aware User Functions This moves the event-time/processing-time trigger code from WindowOperator behind a well defined interface that can be used by operators (and user functions). InternalTimerService is the new interface that has the same functionality that WindowOperator used to have. TimerService is the user facing interface that does not allow dealing with namespaces/payloads and also does not allow deleting timers. There is a default implementation in HeapInternalTimerService that can checkpoint timers to a stream and also restore from a stream. Right now, this is managed in AbstractStreamOperator and operators can ask for an InternalTimerService. This also adds tes
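The offset behaviour mentioned in the pull request description can be illustrated with a small, self-contained sketch. This is not code from the PR; the alignment formula and the names used here are assumptions, chosen only to show how a time-based window start could be aligned to an offset grid.
{code}
// Illustrative sketch (not code from this PR): deriving a window start from a
// timestamp, window size and offset, all in milliseconds.
public final class WindowStartSketch {

    // Assumed formula: align the timestamp to a window grid shifted by 'offset'.
    static long windowStart(long timestamp, long offset, long windowSize) {
        return timestamp - (timestamp - offset + windowSize) % windowSize;
    }

    public static void main(String[] args) {
        long size = 60_000L;    // 1-minute tumbling windows
        long offset = 15_000L;  // windows start at second 15 of every minute
        long ts = 1_000_000L;
        long start = windowStart(ts, offset, size);
        System.out.println("window = [" + start + ", " + (start + size) + ")");
    }
}
{code}
With a window size of one minute and an offset of 15 seconds, the sketch assigns timestamp 1,000,000 ms to the window [975,000, 1,035,000), i.e. a window starting at second 15 of the minute.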
[jira] [Commented] (FLINK-4348) Implement slot allocation protocol with TaskExecutor
[ https://issues.apache.org/jira/browse/FLINK-4348?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15533113#comment-15533113 ] ASF GitHub Bot commented on FLINK-4348: --- GitHub user mxm opened a pull request: https://github.com/apache/flink/pull/2571 [FLINK-4348] Simplify logic of SlotManager This pull request is split up into two commits: 1. It removes some code from the `SlotManager` to make it simpler. - It makes use of `handleSlotRequestFailedAtTaskManager` and simplifies it because we can assume that a Slot is only registered once if we talk to the same instance of a TaskExecutor. Further, we can omit checking the free slots because we previously removed the slot from the free slots. - `updateSlotStatus` only deals with new Task slots. All other updates are performed by the `ResourceManager`. In case the ResourceManager crashes, it will re-register all slot statuses. 2. It fences `TaskExecutor` messages using an `InstanceID` which is required to make 1 work correctly. New messages have been introduced to achieve that. You can merge this pull request into a Git repository by running: $ git pull https://github.com/mxm/flink flip-6 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2571.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2571 commit 1d297403e5e085d944d80b498c4a269a794f8e2f Author: Maximilian Michels Date: 2016-09-29T13:08:32Z [FLINK-4347] change assumptions in SlotManager allocation - Makes use of `handleSlotRequestFailedAtTaskManager` and simplifies it because we can assume that a Slot is only registered once if we talk to the same instance of a TaskExecutor. Further, we can omit checking the free slots because we previously removed the slot from the free slots. - `updateSlotStatus` only deals with new Task slots. All other updates are performed by the `ResourceManager`. In case the ResourceManager crashes, it will re-register all slot statuses. commit c6768524c869ecdb84f0a8ae48837afc329c9714 Author: Maximilian Michels Date: 2016-09-29T14:03:39Z [FLINK-4348] discard message from old TaskExecutorGateways This fences old messages using the InstanceID of the TaskExecutor. > Implement slot allocation protocol with TaskExecutor > > > Key: FLINK-4348 > URL: https://issues.apache.org/jira/browse/FLINK-4348 > Project: Flink > Issue Type: Sub-task > Components: Cluster Management >Reporter: Kurt Young >Assignee: Maximilian Michels > > When slotManager finds a proper slot in the free pool for a slot request, > slotManager marks the slot as occupied, then tells the taskExecutor to give > the slot to the specified JobMaster. > When a slot request is sent to the taskExecutor, it should contain the following > parameters: AllocationID, JobID, slotID, resourceManagerLeaderSessionID. > There are 3 possibilities for the response from the taskExecutor; we > will discuss when each possibility happens and how to handle it. > 1. Ack request which means the taskExecutor gives the slot to the specified > jobMaster as expected. > 2. Decline request if the slot is already occupied by another AllocationID. > 3. Timeout, which could be caused by loss of the request message or response message, > or by slow network transfer. > On the first occasion, ResourceManager needs to do nothing. 
However, under the > second and third occasions, ResourceManager needs to notify slotManager; > slotManager will verify and clear all the previous allocation information for > this slot request first, then try to find a proper slot for the slot > request again. This may cause some duplicate allocation, e.g. the slot > request to the TaskManager is successful but the response is lost somehow, so we > may request a slot in another TaskManager; this causes two slots to be assigned to > one request, but it can be taken care of by rejecting registration at the > JobMaster. > There are still some questions that need to be discussed further. > 1. Who sends the slotRequest to the taskExecutor, SlotManager or ResourceManager? I > think it's better that SlotManager delegates the rpc call to ResourceManager > when SlotManager needs to communicate with the outside world. ResourceManager > knows which taskExecutor to send the request to based on the ResourceID. Besides, this > RPC call, which is used to request a slot from the taskExecutor, should not be an > RpcMethod, because we hope only SlotManager has permission to call the > method, but other components, for example JobMaster and TaskExecutor, > cannot call this method directly. > 2. If JobMaster rejects the sl
[GitHub] flink pull request #2571: [FLINK-4348] Simplify logic of SlotManager
GitHub user mxm opened a pull request: https://github.com/apache/flink/pull/2571 [FLINK-4348] Simplify logic of SlotManager This pull request is split up into two commits: 1. It removes some code from the `SlotManager` to make it simpler. - It makes use of `handleSlotRequestFailedAtTaskManager` and simplifies it because we can assume that a Slot is only registered once if we talk to the same instance of a TaskExecutor. Further, we can omit checking the free slots because we previously removed the slot from the free slots. - `updateSlotStatus` only deals with new Task slots. All other updates are performed by the `ResourceManager`. In case the ResourceManager crashes, it will re-register all slot statuses. 2. It fences `TaskExecutor` messages using an `InstanceID` which is required to make 1 work correctly. New messages have been introduced to achieve that. You can merge this pull request into a Git repository by running: $ git pull https://github.com/mxm/flink flip-6 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2571.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2571 commit 1d297403e5e085d944d80b498c4a269a794f8e2f Author: Maximilian Michels Date: 2016-09-29T13:08:32Z [FLINK-4347] change assumptions in SlotManager allocation - Makes use of `handleSlotRequestFailedAtTaskManager` and simplifies it because we can assume that a Slot is only registered once if we talk to the same instance of a TaskExecutor. Further, we can omit checking the free slots because we previously removed the slot from the free slots. - `updateSlotStatus` only deals with new Task slots. All other updates are performed by the `ResourceManager`. In case the ResourceManager crashes, it will re-register all slot statuses. commit c6768524c869ecdb84f0a8ae48837afc329c9714 Author: Maximilian Michels Date: 2016-09-29T14:03:39Z [FLINK-4348] discard message from old TaskExecutorGateways This fences old messages using the InstanceID of the TaskExecutor.
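The InstanceID fencing described in the second commit can be sketched independently of Flink's actual classes. The sketch below is hypothetical (class and method names are assumptions, not the PR's API): each (re-)registration of a TaskExecutor produces a fresh InstanceID, and any message tagged with a stale InstanceID is discarded.
{code}
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of InstanceID-based fencing; not the actual Flink classes.
final class InstanceFencingSketch {

    // ResourceID -> InstanceID of the currently registered TaskExecutor incarnation.
    private final Map<String, UUID> registeredInstances = new ConcurrentHashMap<>();

    // Called when a TaskExecutor (re-)registers: a new InstanceID invalidates the old one.
    UUID register(String resourceId) {
        UUID instanceId = UUID.randomUUID();
        registeredInstances.put(resourceId, instanceId);
        return instanceId;
    }

    // Called for every incoming slot status message: only accept messages
    // carrying the InstanceID of the current incarnation.
    boolean accept(String resourceId, UUID senderInstanceId) {
        return Objects.equals(registeredInstances.get(resourceId), senderInstanceId);
    }
}
{code}
A message from a TaskExecutor that re-registered after a restart carries a new InstanceID, so anything still in flight from the old incarnation fails the accept check and is dropped.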
[GitHub] flink pull request #2569: [FLINK-4711] Let the Task trigger partition state ...
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/2569#discussion_r81164326 --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskControlMessages.scala --- @@ -92,16 +92,6 @@ object TaskMessages { // -- /** - * Answer to a [[RequestPartitionState]] with the state of the respective partition. - */ - case class PartitionState( --- End diff -- Has some unused imports after removal that we could remove before merging
[jira] [Commented] (FLINK-4711) TaskManager can crash due to failing onPartitionStateUpdate call
[ https://issues.apache.org/jira/browse/FLINK-4711?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15533098#comment-15533098 ] ASF GitHub Bot commented on FLINK-4711: --- Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/2569#discussion_r81162283 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/JobManagerCommunicationFactory.java --- @@ -27,15 +26,6 @@ public interface JobManagerCommunicationFactory { --- End diff -- This has become obsolete with your change and is unused. Let's remove it completely? > TaskManager can crash due to failing onPartitionStateUpdate call > > > Key: FLINK-4711 > URL: https://issues.apache.org/jira/browse/FLINK-4711 > Project: Flink > Issue Type: Bug > Components: Distributed Coordination >Affects Versions: 1.2.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Fix For: 1.2.0 > > > The {{TaskManager}} can crash because it calls > {{Task.onPartitionStateUpdate}} when it receives a {{PartitionState}} > message. The {{onPartitionStateUpdate}} method can throw an {{IOException}} > or {{InterruptedException}} which are not handled on the {{TaskManager}} > level. > Another problem is that the initial partition state request is triggered > within the {{SingleInputGate}}. The request causes the {{JobManager}} to send > a {{PartitionState}} message to the {{TaskManager}} which forwards it to the > {{Task}}. If at any of these points a message gets lost, then it is not > retried and the partition state remains unknown. > In order to handle the exceptions, to make the data flow clearer and to add > automatic retries, I propose to let the {{Task}} send the partition state > check requests. Furthermore, the {{JobManager}} should directly answer to the > {{Task}} by replying to an ask operation. That way the message does not have > to be routed through the {{TaskManager}}. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #2569: [FLINK-4711] Let the Task trigger partition state ...
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/2569#discussion_r81162283 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/JobManagerCommunicationFactory.java --- @@ -27,15 +26,6 @@ public interface JobManagerCommunicationFactory { --- End diff -- This has become obsolete with your change and is unused. Let's remove it completely?
[jira] [Commented] (FLINK-4711) TaskManager can crash due to failing onPartitionStateUpdate call
[ https://issues.apache.org/jira/browse/FLINK-4711?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15533100#comment-15533100 ] ASF GitHub Bot commented on FLINK-4711: --- Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/2569#discussion_r81164597 --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala --- @@ -873,8 +873,7 @@ class JobManager( } sender ! decorateMessage( -PartitionState( - taskExecutionId, +new org.apache.flink.runtime.io.network.PartitionState( --- End diff -- I think it's not ambiguous anymore after removal of the TaskControlMessage and we can directly use `PartitionState`? > TaskManager can crash due to failing onPartitionStateUpdate call > > > Key: FLINK-4711 > URL: https://issues.apache.org/jira/browse/FLINK-4711 > Project: Flink > Issue Type: Bug > Components: Distributed Coordination >Affects Versions: 1.2.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Fix For: 1.2.0 > > > The {{TaskManager}} can crash because it calls > {{Task.onPartitionStateUpdate}} when it receives a {{PartitionState}} > message. The {{onPartitionStateUpdate}} method can throw an {{IOException}} > or {{InterruptedException}} which are not handled on the {{TaskManager}} > level. > Another problem is that the initial partition state request is triggered > within the {{SingleInputGate}}. The request causes the {{JobManager}} to send > a {{PartitionState}} message to the {{TaskManager}} which forwards it to the > {{Task}}. If at any of these points a message gets lost, then it is not > retried and the partition state remains unknown. > In order to handle the exceptions, to make the data flow clearer and to add > automatic retries, I propose to let the {{Task}} send the partition state > check requests. Furthermore, the {{JobManager}} should directly answer to the > {{Task}} by replying to an ask operation. That way the message does not have > to be routed through the {{TaskManager}}. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #2569: [FLINK-4711] Let the Task trigger partition state ...
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/2569#discussion_r81164597 --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala --- @@ -873,8 +873,7 @@ class JobManager( } sender ! decorateMessage( -PartitionState( - taskExecutionId, +new org.apache.flink.runtime.io.network.PartitionState( --- End diff -- I think it's not ambiguous anymore after removal of the TaskControlMessage and we can directly use `PartitionState`?
[jira] [Commented] (FLINK-4711) TaskManager can crash due to failing onPartitionStateUpdate call
[ https://issues.apache.org/jira/browse/FLINK-4711?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15533099#comment-15533099 ] ASF GitHub Bot commented on FLINK-4711: --- Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/2569#discussion_r81164326 --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskControlMessages.scala --- @@ -92,16 +92,6 @@ object TaskMessages { // -- /** - * Answer to a [[RequestPartitionState]] with the state of the respective partition. - */ - case class PartitionState( --- End diff -- Has some unused imports after removal that we could remove before merging > TaskManager can crash due to failing onPartitionStateUpdate call > > > Key: FLINK-4711 > URL: https://issues.apache.org/jira/browse/FLINK-4711 > Project: Flink > Issue Type: Bug > Components: Distributed Coordination >Affects Versions: 1.2.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Fix For: 1.2.0 > > > The {{TaskManager}} can crash because it calls > {{Task.onPartitionStateUpdate}} when it receives a {{PartitionState}} > message. The {{onPartitionStateUpdate}} method can throw an {{IOException}} > or {{InterruptedException}} which are not handled on the {{TaskManager}} > level. > Another problem is that the initial partition state request is triggered > within the {{SingleInputGate}}. The request causes the {{JobManager}} to send > a {{PartitionState}} message to the {{TaskManager}} which forwards it to the > {{Task}}. If at any of these points a message gets lost, then it is not > retried and the partition state remains unknown. > In order to handle the exceptions, to make the data flow clearer and to add > automatic retries, I propose to let the {{Task}} send the partition state > check requests. Furthermore, the {{JobManager}} should directly answer to the > {{Task}} by replying to an ask operation. That way the message does not have > to be routed through the {{TaskManager}}. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
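The proposal above (let the Task ask the JobManager directly and retry automatically) boils down to an ask-with-retry pattern on a future. The sketch below is a hypothetical illustration of that pattern only; the names and the CompletableFuture-based API are assumptions, not Flink's actual RPC interfaces.
{code}
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical sketch of "ask with retry"; not Flink's actual RPC API.
final class RetryingAskSketch {

    // Ask for the partition state; if the returned future fails (e.g. a lost message
    // surfacing as a timeout), retry after 'delayMillis', up to 'retries' more times.
    static <T> CompletableFuture<T> askWithRetry(
            Supplier<CompletableFuture<T>> ask, int retries, long delayMillis) {

        return ask.get().handle((value, failure) -> {
            if (failure == null) {
                return CompletableFuture.completedFuture(value);
            }
            if (retries == 0) {
                return CompletableFuture.<T>failedFuture(failure);
            }
            // Schedule the next attempt after the configured delay.
            Executor delayed = CompletableFuture.delayedExecutor(delayMillis, TimeUnit.MILLISECONDS);
            return CompletableFuture.supplyAsync(() -> (T) null, delayed)
                    .thenCompose(ignored -> askWithRetry(ask, retries - 1, delayMillis));
        }).thenCompose(f -> f);
    }
}
{code}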
[jira] [Updated] (FLINK-4712) Implementing ranking predictions for ALS
[ https://issues.apache.org/jira/browse/FLINK-4712?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Domokos Miklós Kelen updated FLINK-4712: Description: We started working on implementing ranking predictions for recommender systems. Ranking prediction means that besides predicting scores for user-item pairs, the recommender system is able to recommend a top K list for the users. Details: In practice, this would mean finding the K items for a particular user with the highest predicted rating. It should be possible also to specify whether to exclude the already seen items from a particular user's toplist. (See for example the 'exclude_known' setting of [Graphlab Create's ranking factorization recommender|https://turi.com/products/create/docs/generated/graphlab.recommender.ranking_factorization_recommender.RankingFactorizationRecommender.recommend.html#graphlab.recommender.ranking_factorization_recommender.RankingFactorizationRecommender.recommend] ). The output of the topK recommendation function could be in the form of {{DataSet[(Int,Int,Int)]}}, meaning (user, item, rank), similar to Graphlab Create's output. However, this is arguable: follow-up work includes implementing ranking recommendation evaluation metrics (such as precision@k, recall@k, ndcg@k), similar to [Spark's implementations|https://spark.apache.org/docs/1.5.0/mllib-evaluation-metrics.html#ranking-systems]. It would be beneficial if we were able to design the API such that it could be included in the proposed evaluation framework (see [2157|https://issues.apache.org/jira/browse/FLINK-2157]), which makes it necessary to consider the possible output type {{DataSet[(Int, Array[Int])]}} or {{DataSet[(Int, Array[(Int,Double)])]}} meaning (user, array of items), possibly including the predicted scores as well. See [4713|https://issues.apache.org/jira/browse/FLINK-4713] for details. Another question arising is whether to provide this function as a member of the ALS class, as a switch-kind of parameter to the ALS implementation (meaning the model is either a rating or a ranking recommender model) or in some other way. was: We started working on implementing ranking predictions for recommender systems. Ranking prediction means that besides predicting scores for user-item pairs, the recommender system is able to recommend a top K list for the users. Details: In practice, this would mean finding the K items for a particular user with the highest predicted rating. It should be possible also to specify whether to exclude the already seen items from a particular user's toplist. (See for example the 'exclude_known' setting of [Graphlab Create's ranking factorization recommender|https://turi.com/products/create/docs/generated/graphlab.recommender.ranking_factorization_recommender.RankingFactorizationRecommender.recommend.html#graphlab.recommender.ranking_factorization_recommender.RankingFactorizationRecommender.recommend] ). The output of the topK recommendation function could be in the form of {{DataSet[(Int,Int,Int)]}}, meaning (user, item, rank), similar to Graphlab Create's output. However, this is arguable: follow-up work includes implementing ranking recommendation evaluation metrics (such as precision@k, recall@k, ndcg@k), similar to [Spark's implementations|https://spark.apache.org/docs/1.5.0/mllib-evaluation-metrics.html#ranking-systems]. 
It would be beneficial if we were able to design the API such that it could be included in the proposed evaluation framework (see [2157|https://issues.apache.org/jira/browse/FLINK-2157]), which makes it necessary to consider the possible output type {{DataSet[(Int, Array[Int])]}} or {{DataSet[(Int, Array[(Int,Double)])]}} meaning (user, array of items), possibly including the predicted scores as well. See [issue todo] for details. Another question arising is whether to provide this function as a member of the ALS class, as a switch-kind of parameter to the ALS implementation (meaning the model is either a rating or a ranking recommender model) or in some other way. > Implementing ranking predictions for ALS > > > Key: FLINK-4712 > URL: https://issues.apache.org/jira/browse/FLINK-4712 > Project: Flink > Issue Type: New Feature > Components: Machine Learning Library >Reporter: Domokos Miklós Kelen > > We started working on implementing ranking predictions for recommender > systems. Ranking prediction means that besides predicting scores for user-item > pairs, the recommender system is able to recommend a top K list for the users. > Details: > In practice, this would mean finding the K items for a particular user with > the highest predicted rating. It should be possible also to specify whether > to exclude the already seen items from a particular user's toplist. (See for >
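For the {{DataSet[(Int,Int,Int)]}} output discussed above, a per-user top-K list can be derived from predicted (user, item, score) tuples with a grouped sort and a group reduce. The following is a minimal sketch using the Java DataSet API, assuming the predictions are already materialized as tuples; it is not part of the proposed ALS API, and the class and method names are assumptions.
{code}
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.util.Collector;

// Sketch: turn predicted (user, item, score) tuples into per-user (user, item, rank) tuples.
public final class TopKSketch {

    public static DataSet<Tuple3<Integer, Integer, Integer>> topK(
            DataSet<Tuple3<Integer, Integer, Double>> predictions, final int k) {

        return predictions
                .groupBy(0)                        // group by user
                .sortGroup(2, Order.DESCENDING)    // highest predicted score first
                .reduceGroup(new GroupReduceFunction<Tuple3<Integer, Integer, Double>,
                        Tuple3<Integer, Integer, Integer>>() {
                    @Override
                    public void reduce(Iterable<Tuple3<Integer, Integer, Double>> scored,
                                       Collector<Tuple3<Integer, Integer, Integer>> out) {
                        int rank = 1;
                        for (Tuple3<Integer, Integer, Double> p : scored) {
                            if (rank > k) {
                                break;          // emit only the first k items per user
                            }
                            out.collect(new Tuple3<>(p.f0, p.f1, rank++));
                        }
                    }
                });
    }
}
{code}
Excluding already seen items would amount to filtering the prediction DataSet against the training interactions before the grouping step.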
[GitHub] flink pull request #2570: [FLINK-3674] Add an interface for Time aware User ...
GitHub user aljoscha opened a pull request: https://github.com/apache/flink/pull/2570 [FLINK-3674] Add an interface for Time aware User Functions This moves the event-time/processing-time trigger code from `WindowOperator` behind a well defined interface that can be used by operators (and user functions). `InternalTimerService` is the new interface that has the same functionality that `WindowOperator` used to have. `TimerService` is the user facing interface that does not allow dealing with namespaces/payloads and also does not allow deleting timers. There is a default implementation in `HeapInternalTimerService` that can checkpoint timers to a stream and also restore from a stream. Right now, this is managed in `AbstractStreamOperator` and operators can ask for an `InternalTimerService`. This also adds tests for `HeapInternalTimerService`. This adds two new user functions: - `TimelyFlatMapFunction`: an extension of `FlatMapFunction` that also allows querying time and setting timers - `TimelyCoFlatMapFunction`: the same, but for `CoFlatMapFunction` There are two new `StreamOperator` implementations for these that use the `InternalTimerService` interface. This also adds tests for the two new operators. This also adds the new interface `KeyContext` that is used for setting/querying the current key context for state and timers. Timers are always scoped to a key, for now. Also, this moves the handling of watermarks for both one-input and two-input operators to `AbstractStreamOperators` so that we have a central ground-truth. There were also a bunch of small changes that I had to do to make the proper change cleaner. I would like to keep these as separate commits because they clearly document what was going on. ## Note for Reviewers You should probably start from the tests, i.e. `HeapInternalTimerServiceTest`, `TimelyFlatMapTest` and `TimelyCoFlatMapTest`. Then, the other interesting bits are `AbstractStreamOperator` that now deals with watermarks and checkpointing the timers and the `HeapInternalTimerService` as well. Keep in mind that this is just moving the code from `WindowOperator` to `HeapInternalTimerService` with some generalizations. I didn't try to optimize any of the data structures that are used. R: @StephanEwen @StefanRRichter @kl0u for review, please You can merge this pull request into a Git repository by running: $ git pull https://github.com/aljoscha/flink timely-function Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2570.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2570 commit 1a09d9032bf5683a378a7fc8dc480f2d14c5924d Author: Aljoscha Krettek Date: 2016-09-25T18:58:16Z Rename TimeServiceProvider to ProcessingTimeService The name is clashing with the soon-to-be-added TimerService/InternalTimerService which is meant as an interface for dealing with both processing time and event time. TimeServiceProvider is renamed to ProcessingTimeService to reflect the fact that it is a low-level utility that only deals with "physical" processing-time trigger tasks. 
commit 758827c3c8508ef9ef2ec079ff3a8469d0096ca8 Author: Aljoscha Krettek Date: 2016-09-28T13:10:35Z Use OperatorTestHarness and TestProcessingTimeService in Kafka Tests commit f6dd9c74dc2c58c4263fb6d084651b514898d47a Author: Aljoscha Krettek Date: 2016-09-28T14:35:33Z Use Processing-Time Service of TestHarness in WindowOperatorTest Before, this was manually creating a TestProcessingTimeService, now, we're using the one that is there by default in OneInputStreamOperatorTestHarness. commit 65389d66c5586e6707b7a6bf48df512354fac085 Author: Aljoscha Krettek Date: 2016-09-28T14:43:40Z Refactor OperatorTestHarness to always use TestProcessingTimeService Before, this would allow handing in a custom ProcessingTimeService but this was in reality always TestProcessingTimeService. commit 1d013bcacc040552e5783c64d094ec309014457b Author: Aljoscha Krettek Date: 2016-09-28T13:12:26Z Use TestHarness Processing-time Facility in BucketingSinkTest Before, this was manually creating a TestProcessingTimeService. Now we use the one that is there by default in OneInputStreamOperatorTestHarness. commit eaf3dd00fefeb2487c7cafff6337123cbe42874b Author: Aljoscha Krettek Date: 2016-09-28T13:32:24Z Use OperatorTestHarness in AlignedWindowOperator Tests commit b597d2ef50c27554b83fddaff8873107265340d4 Author: Aljoscha Krettek Date: 2016-09-29T14:04:29Z Refactor Operator TestHarnesses to use Common Base Class This also introduces KeyedTwoInputStreamOperato
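The heap-based timer service described in the pull request can be illustrated with a small sketch of the underlying idea: event-time timers sit in a priority queue ordered by timestamp and fire once the watermark advances past them. This is not the PR's HeapInternalTimerService; the names and structure below are assumptions used only to show the firing rule.
{code}
import java.util.PriorityQueue;

// Illustrative sketch (not the PR's HeapInternalTimerService): event-time timers
// kept in a heap and fired when the watermark advances past their timestamps.
final class EventTimeTimerQueueSketch<K> {

    static final class Timer<K> implements Comparable<Timer<K>> {
        final long timestamp;
        final K key;
        Timer(long timestamp, K key) { this.timestamp = timestamp; this.key = key; }
        @Override
        public int compareTo(Timer<K> other) { return Long.compare(timestamp, other.timestamp); }
    }

    interface TimerCallback<K> {
        void onEventTime(K key, long timestamp) throws Exception;
    }

    private final PriorityQueue<Timer<K>> eventTimeTimers = new PriorityQueue<>();

    // Timers are scoped to a key, mirroring the description above.
    void registerEventTimeTimer(K key, long timestamp) {
        eventTimeTimers.add(new Timer<>(timestamp, key));
    }

    // Called when a new watermark arrives: fire every timer whose timestamp is
    // not larger than the watermark, in timestamp order.
    void advanceWatermark(long watermark, TimerCallback<K> callback) throws Exception {
        Timer<K> timer;
        while ((timer = eventTimeTimers.peek()) != null && timer.timestamp <= watermark) {
            eventTimeTimers.poll();
            callback.onEventTime(timer.key, timer.timestamp);
        }
    }
}
{code}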
[jira] [Commented] (FLINK-4068) Move constant computations out of code-generated `flatMap` functions.
[ https://issues.apache.org/jira/browse/FLINK-4068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15533076#comment-15533076 ] ASF GitHub Bot commented on FLINK-4068: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/2560#discussion_r81163353 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkPlannerImpl.scala --- @@ -60,6 +60,7 @@ class FlinkPlannerImpl( var root: RelRoot = _ private def ready() { +planner.setExecutor(config.getExecutor) --- End diff -- I would not set the planner here as this class is only used by the SQL API. `FlinkRelBuilder.create()` is a better place. > Move constant computations out of code-generated `flatMap` functions. > - > > Key: FLINK-4068 > URL: https://issues.apache.org/jira/browse/FLINK-4068 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Affects Versions: 1.1.0 >Reporter: Fabian Hueske >Assignee: Jark Wu > > The generated functions for expressions of the Table API or SQL include > constant computations. > For instance the code generated for a predicate like: > {code} > myInt < (10 + 20) > {code} > looks roughly like: > {code} > public void flatMap(Row in, Collector out) { > Integer in1 = in.productElement(1); > int temp = 10 + 20; > if (in1 < temp) { > out.collect(in); > } > } > {code} > In this example the computation of {{temp}} is constant and could be moved > out of the {{flatMap()}} method. > The same might apply for generated functions other than {{FlatMap}} as well. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
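To make the proposed improvement concrete, the constant sub-expression can be evaluated once outside the per-record path, for example in {{open()}} of a rich function. The sketch below is an illustration of that idea, not the actual generated code; the class and field names are assumptions.
{code}
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

// Sketch of hoisting the constant: `10 + 20` is computed once, not per record.
public class ConstantFoldedFlatMapSketch extends RichFlatMapFunction<Row, Row> {

    private transient int threshold;

    @Override
    public void open(Configuration parameters) {
        // constant sub-expression evaluated once, outside the per-record path
        threshold = 10 + 20;
    }

    @Override
    public void flatMap(Row in, Collector<Row> out) {
        Integer in1 = (Integer) in.getField(1);
        if (in1 != null && in1 < threshold) {
            out.collect(in);
        }
    }
}
{code}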