[jira] [Commented] (FLINK-4613) Extend ALS to handle implicit feedback datasets

2016-09-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4613?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15535242#comment-15535242
 ] 

ASF GitHub Bot commented on FLINK-4613:
---

Github user mbalassi commented on the issue:

https://github.com/apache/flink/pull/2542
  
@gaborhermann @thvasilo I would definitely like to see a test on a larger 
dataset; that is actually what I was asking for when I mentioned "benchmark". 
Maybe I was not clear then.


> Extend ALS to handle implicit feedback datasets
> ---
>
> Key: FLINK-4613
> URL: https://issues.apache.org/jira/browse/FLINK-4613
> Project: Flink
>  Issue Type: New Feature
>  Components: Machine Learning Library
>Reporter: Gábor Hermann
>Assignee: Gábor Hermann
>
> The Alternating Least Squares implementation should be extended to handle 
> _implicit feedback_ datasets. These datasets do not contain explicit ratings 
> by users; they are rather built by collecting user behavior (e.g. user 
> listened to artist X for Y minutes), and they require a slightly different 
> optimization objective. See details by [Hu et 
> al|http://dx.doi.org/10.1109/ICDM.2008.22].
> We do not need to modify much in the original ALS algorithm. See the [Spark ALS 
> implementation|https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala],
>  which could be a basis for this extension. Only the factor-updating part is 
> modified, and most of the changes are in the local parts of the algorithm 
> (i.e. UDFs). In fact, the only modification that is not local is 
> precomputing the matrix product Y^T * Y (a small k x k matrix, where Y is the 
> factor matrix and k the number of latent factors) and broadcasting it to all 
> the nodes, which we can do with broadcast DataSets. 
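
To make that one non-local step concrete, here is a minimal sketch of 
precomputing the Gram matrix as a broadcast-ready DataSet. It is written in 
Java with illustrative names; the actual PR code lives in the Scala ML 
library, so this is only an assumption-laden outline of the idea:
{code}
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.DataSet;

public final class GramMatrixSketch {

	/**
	 * Sums the outer products y * y^T over all item-factor vectors y,
	 * which yields the k x k matrix Y^T * Y (k = number of latent factors).
	 */
	public static DataSet<double[][]> gramMatrix(DataSet<double[]> itemFactors) {
		return itemFactors
			.map(new MapFunction<double[], double[][]>() {
				@Override
				public double[][] map(double[] y) {
					final int k = y.length;
					double[][] outer = new double[k][k];
					for (int i = 0; i < k; i++) {
						for (int j = 0; j < k; j++) {
							outer[i][j] = y[i] * y[j];
						}
					}
					return outer; // y * y^T for a single factor vector
				}
			})
			.reduce(new ReduceFunction<double[][]>() {
				@Override
				public double[][] reduce(double[][] a, double[][] b) {
					// element-wise sum of the outer products = Y^T * Y
					for (int i = 0; i < a.length; i++) {
						for (int j = 0; j < a.length; j++) {
							a[i][j] += b[i][j];
						}
					}
					return a;
				}
			});
	}
}
{code}
The resulting single-element DataSet can then be attached to the factor-update 
operator with {{withBroadcastSet(gram, "gramMatrix")}} and read inside the UDF 
via {{getRuntimeContext().getBroadcastVariable("gramMatrix")}}.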





[GitHub] flink issue #2542: [FLINK-4613] [ml] Extend ALS to handle implicit feedback ...

2016-09-29 Thread mbalassi
Github user mbalassi commented on the issue:

https://github.com/apache/flink/pull/2542
  
@gaborhermann @thvasilo I would definitely like to see a test on a larger 
dataset; that is actually what I was asking for when I mentioned "benchmark". 
Maybe I was not clear then.




[jira] [Commented] (FLINK-4704) Move Table API to org.apache.flink.table

2016-09-29 Thread Jark Wu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4704?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15535169#comment-15535169
 ] 

Jark Wu commented on FLINK-4704:


+1 

Table API is still in beta, so it's not too late to do this now.

> Move Table API to org.apache.flink.table
> 
>
> Key: FLINK-4704
> URL: https://issues.apache.org/jira/browse/FLINK-4704
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Timo Walther
>
> This would be a large change. But maybe now is still a good time to do it. 
> Otherwise we will never fix this.
> Actually, the Table API is in the wrong package. At the moment it is in 
> {{org.apache.flink.api.table}} and the actual Scala/Java APIs are in 
> {{org.apache.flink.api.java/scala.table}}. All other APIs such as Python, 
> Gelly, Flink ML do not use the {{org.apache.flink.api}} namespace.
> I suggest the following packages:
> {code}
> org.apache.flink.table
> org.apache.flink.table.api.java
> org.apache.flink.table.api.scala
> {code}
> What do you think?





[jira] [Commented] (FLINK-4618) FlinkKafkaConsumer09 should start from the next record on startup from offsets in Kafka

2016-09-29 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4618?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15535151#comment-15535151
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-4618:


Hi [~melmoth],

I'd like to make sure this bug is fixed in the upcoming 1.1.3 bugfix release.
Let me know if you'd like to continue working on this :) Otherwise I can pick 
it up soon.

> FlinkKafkaConsumer09 should start from the next record on startup from 
> offsets in Kafka
> ---
>
> Key: FLINK-4618
> URL: https://issues.apache.org/jira/browse/FLINK-4618
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.1.2
> Environment: Flink 1.1.2
> Kafka Broker 0.10.0
> Hadoop 2.7.0
>Reporter: Melmoth
> Fix For: 1.2.0, 1.1.3
>
>
> **Original reported ticket title: Last kafka message gets consumed twice when 
> restarting job**
> There seems to be an issue with the offset management in Flink. When a job is 
> stopped and started again, a message from the previous offset is read again.
> I enabled checkpoints (EXACTLY_ONCE) and the FsStateBackend. I started with a new 
> consumer group and emitted one record.
> You can clearly see that the consumer waits for a new record at offset 
> 4848911, which is correct. After restarting, it consumes the record at 4848910, 
> causing the record to be consumed more than once.
> I checked the offset with the Kafka command-line tools; the committed offset in 
> ZooKeeper is 4848910.
> Here is my log output:
> {code}
> 10:29:24,225 DEBUG org.apache.kafka.clients.NetworkClient - Initiating connection to node 2147482646 at hdp1:6667.
> 10:29:24,225 DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Fetching committed offsets for partitions: [myTopic-0]
> 10:29:24,228 DEBUG org.apache.kafka.clients.NetworkClient - Completed connection to node 2147482646
> 10:29:24,234 DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - No committed offset for partition myTopic-0
> 10:29:24,238 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - Resetting offset for partition myTopic-0 to latest offset.
> 10:29:24,244 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - Fetched offset 4848910 for partition myTopic-0
> 10:29:24,245 TRACE org.apache.kafka.clients.consumer.internals.Fetcher - Added fetch request for partition myTopic-0 at offset 4848910
> 10:29:24,773 TRACE org.apache.kafka.clients.consumer.internals.Fetcher - Added fetch request for partition myTopic-0 at offset 4848910
> 10:29:25,276 TRACE org.apache.kafka.clients.consumer.internals.Fetcher - Added fetch request for partition myTopic-0 at offset 4848910
> -- Inserting a new event here
> 10:30:22,447 TRACE org.apache.kafka.clients.consumer.internals.Fetcher - Adding fetched record for partition myTopic-0 with offset 4848910 to buffered record list
> 10:30:22,448 TRACE org.apache.kafka.clients.consumer.internals.Fetcher - Returning fetched records at offset 4848910 for assigned partition myTopic-0 and update position to 4848911
> 10:30:22,451 TRACE org.apache.kafka.clients.consumer.internals.Fetcher - Added fetch request for partition myTopic-0 at offset 4848911
> 10:30:22,953 TRACE org.apache.kafka.clients.consumer.internals.Fetcher - Added fetch request for partition myTopic-0 at offset 4848911
> 10:30:23,456 TRACE org.apache.kafka.clients.consumer.internals.Fetcher - Added fetch request for partition myTopic-0 at offset 4848911
> 10:30:23,887 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 6 @ 1473841823887
> 10:30:23,957 TRACE org.apache.kafka.clients.consumer.internals.Fetcher - Added fetch request for partition myTopic-0 at offset 4848911
> 10:30:23,996 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 6 (in 96 ms)
> 10:30:24,196 TRACE org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Sending offset-commit request with {myTopic-0=OffsetAndMetadata{offset=4848910, metadata=''}} to Node(2147482646, hdp1, 6667)
> 10:30:24,204 DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Committed offset 4848910 for partition myTopic-0
> 10:30:24,460 TRACE org.apache.kafka.clients.consumer.internals.Fetcher - Added fetch request for partition myTopic-0 at offset 4848911
> 10:30:24,963 TRACE org.apache.kafka.clients.consumer.internals.Fetcher - Added fetch request for partition myTopic-0 at offset 4848911
> 10:30:48,057 INFO  org.apache.flink.runtime.blob.B
> {code}
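
The log shows the mismatch: the fetcher's position advances to 4848911, but 
the offset committed back to Kafka stays at 4848910. Kafka defines a committed 
offset as the offset of the *next* record to read, so committing the offset of 
the last processed record re-delivers that record after a restart. The ticket 
title points at the convention to adopt: commit lastProcessedOffset + 1. A 
minimal sketch of building a commit map under that convention (illustrative 
names, not the connector's actual code):
{code}
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

class CommitMapBuilder {

	// Kafka's committed offset means "next record to read", so add 1 to the
	// offset of the last record Flink processed; committing the raw last
	// offset makes that record re-appear after a restart, as in the log above.
	static Map<TopicPartition, OffsetAndMetadata> toCommitMap(Map<TopicPartition, Long> lastProcessed) {
		Map<TopicPartition, OffsetAndMetadata> commits = new HashMap<>();
		for (Map.Entry<TopicPartition, Long> entry : lastProcessed.entrySet()) {
			commits.put(entry.getKey(), new OffsetAndMetadata(entry.getValue() + 1));
		}
		return commits;
	}
}
{code}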

[jira] [Commented] (FLINK-4702) Kafka consumer must commit offsets asynchronously

2016-09-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15535141#comment-15535141
 ] 

ASF GitHub Bot commented on FLINK-4702:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2574#discussion_r81282102
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
 ---
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.core.testutils.MultiShotLatch;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
+import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
+import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
+
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.OffsetCommitCallback;
+import org.apache.kafka.common.TopicPartition;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyLong;
+import static org.powermock.api.mockito.PowerMockito.doAnswer;
+import static org.powermock.api.mockito.PowerMockito.mock;
+import static org.powermock.api.mockito.PowerMockito.when;
+import static org.powermock.api.mockito.PowerMockito.whenNew;
+
+/**
+ * Unit tests for the {@link Kafka09Fetcher}.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(Kafka09Fetcher.class)
+public class Kafka09FetcherTest {
+
+	@Test
+	public void testCommitDoesNotBlock() throws Exception {
+
+		// test data
+		final KafkaTopicPartition testPartition = new KafkaTopicPartition("test", 42);
+		final Map<KafkaTopicPartition, Long> testCommitData = new HashMap<>();
+		testCommitData.put(testPartition, 11L);
+
+		// to synchronize when the consumer is in its blocking method
+		final OneShotLatch sync = new OneShotLatch();
+
+		// - the mock consumer with blocking poll calls 
+		final MultiShotLatch blockerLatch = new MultiShotLatch();
+
+		KafkaConsumer<byte[], byte[]> mockConsumer = mock(KafkaConsumer.class);
+		when(mockConsumer.poll(anyLong())).thenAnswer(new Answer<ConsumerRecords<byte[], byte[]>>() {
+
+			@Override
+			public ConsumerRecords<byte[], byte[]> answer(InvocationOnMock invocation) throws InterruptedException {
+				sync.trigger();
+				blockerLatch.await();
--- End diff --
 

[jira] [Commented] (FLINK-4702) Kafka consumer must commit offsets asynchronously

2016-09-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15535144#comment-15535144
 ] 

ASF GitHub Bot commented on FLINK-4702:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2574#discussion_r81275213
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
 ---
@@ -283,10 +296,16 @@ public void commitSpecificOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets)
 			}
 		}
 
-		if (this.consumer != null) {
-			synchronized (consumerLock) {
-				this.consumer.commitSync(offsetsToCommit);
-			}
+		if (commitInProgress) {
+			LOG.warn("Committing offsets to Kafka takes longer than the checkpoint interval. " +
+				"Some checkpoints may be subsumed before committed. " +
+				"This does not compromise Flink's checkpoint integrity.");
+		}
--- End diff --

Would it make sense to simply move this warning to the `if (toCommit != null 
&& !commitInProgress)` block in the main thread? That is where 
`commitInProgress` actually determines whether or not the current offsets 
to commit will be dropped. Also, the actual commit should happen right 
afterwards anyway because of `consumer.wakeup()`, so I don't see the purpose 
of an eager warning here.
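
For reference, the main-thread block mentioned here looks roughly like the 
following sketch (simplified, with names modeled on the PR rather than quoted 
verbatim):
{code}
// in the polling thread, after poll() returns or wakeup() interrupts it:
final Map<TopicPartition, OffsetAndMetadata> toCommit = nextOffsetsToCommit.getAndSet(null);
if (toCommit != null && !commitInProgress) {
	// only here is it decided whether the handed-over offsets are acted on;
	// if a previous commit is still running, they are silently dropped,
	// which is where the warning would be accurate
	commitInProgress = true;
	consumer.commitAsync(toCommit, offsetCommitCallback);
}
{code}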


> Kafka consumer must commit offsets asynchronously
> -
>
> Key: FLINK-4702
> URL: https://issues.apache.org/jira/browse/FLINK-4702
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.1.2
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Blocker
> Fix For: 1.2.0, 1.1.3
>
>
> The offset commit calls to Kafka may occasionally take very long.
> In that case, the {{notifyCheckpointComplete()}} method blocks for a long time 
> and the KafkaConsumer cannot make progress and cannot perform checkpoints.
> Kafka 0.9+ has methods to commit asynchronously.
> We should use those and make sure no more than one commit is concurrently in 
> progress, so that commit requests do not pile up.
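
A minimal sketch of that policy, assuming hypothetical class and field names 
(the real connector additionally routes the call through the consumer's 
polling thread via {{wakeup()}}, since KafkaConsumer is not thread-safe):
{code}
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.TopicPartition;

// Sketch: at most one asynchronous commit in flight; a commit that arrives
// while another is running is skipped rather than queued, so requests
// cannot pile up when the broker is slow.
final class SingleInFlightCommitter {

	private final AtomicBoolean commitInProgress = new AtomicBoolean(false);

	void commit(KafkaConsumer<?, ?> consumer, Map<TopicPartition, OffsetAndMetadata> offsets) {
		if (commitInProgress.compareAndSet(false, true)) {
			consumer.commitAsync(offsets, new OffsetCommitCallback() {
				@Override
				public void onComplete(Map<TopicPartition, OffsetAndMetadata> committed, Exception error) {
					// completed (or failed): allow the next checkpoint's commit
					commitInProgress.set(false);
				}
			});
		}
		// else: skipping is safe; a newer checkpoint will commit newer offsets,
		// and Flink's checkpoints (not Kafka offsets) guarantee exactly-once
	}
}
{code}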





[jira] [Commented] (FLINK-4702) Kafka consumer must commit offsets asynchronously

2016-09-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15535142#comment-15535142
 ] 

ASF GitHub Bot commented on FLINK-4702:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2574#discussion_r81277067
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
 ---
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.core.testutils.MultiShotLatch;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
+import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
+import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
+
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.OffsetCommitCallback;
+import org.apache.kafka.common.TopicPartition;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyLong;
+import static org.powermock.api.mockito.PowerMockito.doAnswer;
+import static org.powermock.api.mockito.PowerMockito.mock;
+import static org.powermock.api.mockito.PowerMockito.when;
+import static org.powermock.api.mockito.PowerMockito.whenNew;
+
+/**
+ * Unit tests for the {@link Kafka09Fetcher}.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(Kafka09Fetcher.class)
+public class Kafka09FetcherTest {
+
+	@Test
+	public void testCommitDoesNotBlock() throws Exception {
+
+		// test data
+		final KafkaTopicPartition testPartition = new KafkaTopicPartition("test", 42);
+		final Map<KafkaTopicPartition, Long> testCommitData = new HashMap<>();
+		testCommitData.put(testPartition, 11L);
+
+		// to synchronize when the consumer is in its blocking method
+		final OneShotLatch sync = new OneShotLatch();
+
+		// - the mock consumer with blocking poll calls 
+		final MultiShotLatch blockerLatch = new MultiShotLatch();
+
+		KafkaConsumer<byte[], byte[]> mockConsumer = mock(KafkaConsumer.class);
+		when(mockConsumer.poll(anyLong())).thenAnswer(new Answer<ConsumerRecords<byte[], byte[]>>() {
+
+			@Override
+			public ConsumerRecords<byte[], byte[]> answer(InvocationOnMock invocation) throws InterruptedException {
+				sync.trigger();
+				blockerLatch.await();
+

[jira] [Commented] (FLINK-4702) Kafka consumer must commit offsets asynchronously

2016-09-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15535143#comment-15535143
 ] 

ASF GitHub Bot commented on FLINK-4702:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2574#discussion_r81277361
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
 ---
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.core.testutils.MultiShotLatch;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
+import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
+import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
+
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.OffsetCommitCallback;
+import org.apache.kafka.common.TopicPartition;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyLong;
+import static org.powermock.api.mockito.PowerMockito.doAnswer;
+import static org.powermock.api.mockito.PowerMockito.mock;
+import static org.powermock.api.mockito.PowerMockito.when;
+import static org.powermock.api.mockito.PowerMockito.whenNew;
+
+/**
+ * Unit tests for the {@link Kafka09Fetcher}.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(Kafka09Fetcher.class)
+public class Kafka09FetcherTest {
--- End diff --

Should we also add a test to make sure that `wakeup()` is immediately 
called on the `KafkaConsumer` in `commitSpecificOffsetsToKafka`? Otherwise we 
are not ensuring the behaviour of "committing offsets back to Kafka on 
checkpoints".

Perhaps this can be integrated into `testCommitDoesNotBlock()`.
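
A minimal sketch of the suggested assertion, reusing the mocks from the quoted 
test (`fetcher` is assumed to stand for the `Kafka09Fetcher` instance the test 
constructs):
{code}
// after handing offsets to the fetcher from the checkpoint thread...
fetcher.commitSpecificOffsetsToKafka(testCommitData);

// ...the polling thread must be interrupted promptly so it can run the
// actual async commit: verify the consumer was woken up exactly once
Mockito.verify(mockConsumer, Mockito.times(1)).wakeup();
{code}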


> Kafka consumer must commit offsets asynchronously
> -
>
> Key: FLINK-4702
> URL: https://issues.apache.org/jira/browse/FLINK-4702
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.1.2
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Blocker
> Fix For: 1.2.0, 1.1.3
>
>
> The offset commit calls to Kafka may occasionally take very long.
> In that case, the {{notifyCheckpointComplete()}} method blocks for a long time 
> and the KafkaConsumer cannot make progress and cannot perform checkpoints.
> Kafka 0.9+ has methods to commit asynchronously.
> We should use those and make sure no more than one commit is concurrently in 
> progress, so that commit requests do not pile up.

[GitHub] flink pull request #2574: [FLINK-4702] [kafka connector] Commit offsets to K...

2016-09-29 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2574#discussion_r81277067
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
 ---
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.core.testutils.MultiShotLatch;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
+import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
+import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
+
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.OffsetCommitCallback;
+import org.apache.kafka.common.TopicPartition;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyLong;
+import static org.powermock.api.mockito.PowerMockito.doAnswer;
+import static org.powermock.api.mockito.PowerMockito.mock;
+import static org.powermock.api.mockito.PowerMockito.when;
+import static org.powermock.api.mockito.PowerMockito.whenNew;
+
+/**
+ * Unit tests for the {@link Kafka09Fetcher}.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(Kafka09Fetcher.class)
+public class Kafka09FetcherTest {
+
+	@Test
+	public void testCommitDoesNotBlock() throws Exception {
+
+		// test data
+		final KafkaTopicPartition testPartition = new KafkaTopicPartition("test", 42);
+		final Map<KafkaTopicPartition, Long> testCommitData = new HashMap<>();
+		testCommitData.put(testPartition, 11L);
+
+		// to synchronize when the consumer is in its blocking method
+		final OneShotLatch sync = new OneShotLatch();
+
+		// - the mock consumer with blocking poll calls 
+		final MultiShotLatch blockerLatch = new MultiShotLatch();
+
+		KafkaConsumer<byte[], byte[]> mockConsumer = mock(KafkaConsumer.class);
+		when(mockConsumer.poll(anyLong())).thenAnswer(new Answer<ConsumerRecords<byte[], byte[]>>() {
+
+			@Override
+			public ConsumerRecords<byte[], byte[]> answer(InvocationOnMock invocation) throws InterruptedException {
+				sync.trigger();
+				blockerLatch.await();
+				return ConsumerRecords.empty();
+			}
+		});
+
+		doAnswer(new Answer<Void>() {
+			@Override
+			public Void answer(InvocationOnMock invocation) {
+

[GitHub] flink pull request #2574: [FLINK-4702] [kafka connector] Commit offsets to K...

2016-09-29 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2574#discussion_r81282102
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
 ---
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.core.testutils.MultiShotLatch;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
+import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
+import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
+
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.OffsetCommitCallback;
+import org.apache.kafka.common.TopicPartition;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyLong;
+import static org.powermock.api.mockito.PowerMockito.doAnswer;
+import static org.powermock.api.mockito.PowerMockito.mock;
+import static org.powermock.api.mockito.PowerMockito.when;
+import static org.powermock.api.mockito.PowerMockito.whenNew;
+
+/**
+ * Unit tests for the {@link Kafka09Fetcher}.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(Kafka09Fetcher.class)
+public class Kafka09FetcherTest {
+
+	@Test
+	public void testCommitDoesNotBlock() throws Exception {
+
+		// test data
+		final KafkaTopicPartition testPartition = new KafkaTopicPartition("test", 42);
+		final Map<KafkaTopicPartition, Long> testCommitData = new HashMap<>();
+		testCommitData.put(testPartition, 11L);
+
+		// to synchronize when the consumer is in its blocking method
+		final OneShotLatch sync = new OneShotLatch();
+
+		// - the mock consumer with blocking poll calls 
+		final MultiShotLatch blockerLatch = new MultiShotLatch();
+
+		KafkaConsumer<byte[], byte[]> mockConsumer = mock(KafkaConsumer.class);
+		when(mockConsumer.poll(anyLong())).thenAnswer(new Answer<ConsumerRecords<byte[], byte[]>>() {
+
+			@Override
+			public ConsumerRecords<byte[], byte[]> answer(InvocationOnMock invocation) throws InterruptedException {
+				sync.trigger();
+				blockerLatch.await();
--- End diff --

We are not ensuring that `blockerLatch.await()` returns here, correct? Like my 
comment above, perhaps we should check that to ensure that `wakeup` is 
called in `commitSpecificOffsetsToKafka`.



[GitHub] flink pull request #2574: [FLINK-4702] [kafka connector] Commit offsets to K...

2016-09-29 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2574#discussion_r81275213
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
 ---
@@ -283,10 +296,16 @@ public void commitSpecificOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets)
 			}
 		}
 
-		if (this.consumer != null) {
-			synchronized (consumerLock) {
-				this.consumer.commitSync(offsetsToCommit);
-			}
+		if (commitInProgress) {
+			LOG.warn("Committing offsets to Kafka takes longer than the checkpoint interval. " +
+				"Some checkpoints may be subsumed before committed. " +
+				"This does not compromise Flink's checkpoint integrity.");
+		}
--- End diff --

Would it make sense to simply move this warning to the `if (toCommit != null 
&& !commitInProgress)` block in the main thread? That is where 
`commitInProgress` actually determines whether or not the current offsets 
to commit will be dropped. Also, the actual commit should happen right 
afterwards anyway because of `consumer.wakeup()`, so I don't see the purpose 
of an eager warning here.




[GitHub] flink pull request #2574: [FLINK-4702] [kafka connector] Commit offsets to K...

2016-09-29 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2574#discussion_r81277361
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
 ---
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.core.testutils.MultiShotLatch;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
+import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
+import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
+
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.OffsetCommitCallback;
+import org.apache.kafka.common.TopicPartition;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyLong;
+import static org.powermock.api.mockito.PowerMockito.doAnswer;
+import static org.powermock.api.mockito.PowerMockito.mock;
+import static org.powermock.api.mockito.PowerMockito.when;
+import static org.powermock.api.mockito.PowerMockito.whenNew;
+
+/**
+ * Unit tests for the {@link Kafka09Fetcher}.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(Kafka09Fetcher.class)
+public class Kafka09FetcherTest {
--- End diff --

Should we also add a test to make sure that `wakeup()` is immediately 
called on the `KafkaConsumer` in `commitSpecificOffsetsToKafka`? Otherwise we 
are not ensuring the behaviour of "committing offsets back to Kafka on 
checkpoints".

Perhaps this can be integrated into `testCommitDoesNotBlock()`.




[jira] [Commented] (FLINK-4068) Move constant computations out of code-generated `flatMap` functions.

2016-09-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15534886#comment-15534886
 ] 

ASF GitHub Bot commented on FLINK-4068:
---

Github user wuchong commented on the issue:

https://github.com/apache/flink/pull/2560
  
@twalthr  I added some Table API tests but skipped some that are not 
supported in the Table API, such as `NULLIF`, `IN`, `EXTRACT`.


> Move constant computations out of code-generated `flatMap` functions.
> -
>
> Key: FLINK-4068
> URL: https://issues.apache.org/jira/browse/FLINK-4068
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.1.0
>Reporter: Fabian Hueske
>Assignee: Jark Wu
>
> The generated functions for expressions of the Table API or SQL include 
> constant computations.
> For instance the code generated for a predicate like:
> {code}
> myInt < (10 + 20)
> {code}
> looks roughly like:
> {code}
> public void flatMap(Row in, Collector<Row> out) {
>   Integer in1 = in.productElement(1);
>   int temp = 10 + 20;  
>   if (in1 < temp) {
> out.collect(in)
>   }
> }
> {code}
> In this example the computation of {{temp}} is constant and could be moved 
> out of the {{flatMap()}} method.
> The same might apply to generated functions other than {{FlatMap}} as well.
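
For illustration, after hoisting (or reducing) the constant, the generated 
method would look roughly like this sketch (not actual generated code):
{code}
public void flatMap(Row in, Collector<Row> out) {
  Integer in1 = in.productElement(1);
  // "10 + 20" evaluated once at planning time and embedded as the literal 30,
  // so no arithmetic is re-executed per record
  if (in1 < 30) {
    out.collect(in);
  }
}
{code}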





[GitHub] flink issue #2560: [FLINK-4068] [table] Move constant computations out of co...

2016-09-29 Thread wuchong
Github user wuchong commented on the issue:

https://github.com/apache/flink/pull/2560
  
@twalthr  I added some Table API tests but skipped some that are not 
supported in the Table API, such as `NULLIF`, `IN`, `EXTRACT`.




[jira] [Commented] (FLINK-4718) Confusing label in Parallel Streams Diagram

2016-09-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4718?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15534817#comment-15534817
 ] 

ASF GitHub Bot commented on FLINK-4718:
---

GitHub user nderraugh opened a pull request:

https://github.com/apache/flink/pull/2575

'Id' is the key of the stream and not the id of the event.

FLINK-4718 Confusing label in Parallel Streams Diagram

Minor text change to clarify a mildly confusing label in a diagram.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/nderraugh/flink master

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2575.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2575


commit 479c8401f9680b4e0bf88af4c7b68033aeb54cdc
Author: Neil Derraugh 
Date:   2016-09-29T17:53:10Z

'Id' is the key of the stream and not the id of the event.




> Confusing label in Parallel Streams Diagram
> ---
>
> Key: FLINK-4718
> URL: https://issues.apache.org/jira/browse/FLINK-4718
> Project: Flink
>  Issue Type: Bug
>  Components: Project Website
>Affects Versions: 1.1.0
>Reporter: Neil Derraugh
>Priority: Trivial
>  Labels: newbie
> Fix For: 1.1.2
>
>
> The Event [id|timestamp] label in the Parallel Streams Diagram is confusing.  
> The 'id' is in fact the key of the stream and not the id of the event record. 
>  Hence we have B35 and B33.
> The 'id' label should be changed to key or key_id to better represent its 
> nature.





[GitHub] flink pull request #2575: 'Id' is the key of the stream and not the id of th...

2016-09-29 Thread nderraugh
GitHub user nderraugh opened a pull request:

https://github.com/apache/flink/pull/2575

'Id' is the key of the stream and not the id of the event.

FLINK-4718 Confusing label in Parallel Streams Diagram

Minor text change to clarify a mildly confusing label in a diagram.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/nderraugh/flink master

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2575.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2575


commit 479c8401f9680b4e0bf88af4c7b68033aeb54cdc
Author: Neil Derraugh 
Date:   2016-09-29T17:53:10Z

'Id' is the key of the stream and not the id of the event.






[jira] [Commented] (FLINK-4068) Move constant computations out of code-generated `flatMap` functions.

2016-09-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15534809#comment-15534809
 ] 

ASF GitHub Bot commented on FLINK-4068:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/2560#discussion_r81270921
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkPlannerImpl.scala
 ---
@@ -60,6 +60,7 @@ class FlinkPlannerImpl(
   var root: RelRoot = _
 
   private def ready() {
+    planner.setExecutor(config.getExecutor)
--- End diff --

Yes, you are right! This is the reason why the Table API does not work.


> Move constant computations out of code-generated `flatMap` functions.
> -
>
> Key: FLINK-4068
> URL: https://issues.apache.org/jira/browse/FLINK-4068
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.1.0
>Reporter: Fabian Hueske
>Assignee: Jark Wu
>
> The generated functions for expressions of the Table API or SQL include 
> constant computations.
> For instance the code generated for a predicate like:
> {code}
> myInt < (10 + 20)
> {code}
> looks roughly like:
> {code}
> public void flatMap(Row in, Collector<Row> out) {
>   Integer in1 = in.productElement(1);
>   int temp = 10 + 20;  
>   if (in1 < temp) {
> out.collect(in)
>   }
> }
> {code}
> In this example the computation of {{temp}} is constant and could be moved 
> out of the {{flatMap()}} method.
> The same might apply to generated functions other than {{FlatMap}} as well.





[GitHub] flink pull request #2560: [FLINK-4068] [table] Move constant computations ou...

2016-09-29 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/2560#discussion_r81270921
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkPlannerImpl.scala
 ---
@@ -60,6 +60,7 @@ class FlinkPlannerImpl(
   var root: RelRoot = _
 
   private def ready() {
+    planner.setExecutor(config.getExecutor)
--- End diff --

Yes, you are right! This is the reason why the Table API does not work.




[jira] [Comment Edited] (FLINK-4715) TaskManager should commit suicide after cancellation failure

2016-09-29 Thread Zhijiang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15534797#comment-15534797
 ] 

Zhijiang Wang edited comment on FLINK-4715 at 9/30/16 2:31 AM:
---

Yes, we have already experienced this problem in real production many times, 
because the user code cannot be controlled. If the thread is waiting for a 
synchronized lock, or in other such cases, it cannot be cancelled. Our 
approach is that if the job master fails to cancel the task several times, it 
lets the task manager exit itself.


was (Author: zjwang):
Yes, we already experienced this problem in real production many times,  
because the user code can not be controlled. If the thread is waiting for 
synchronized lock or other cases, it can not be cancelled, and the job master 
cancel the task failed many times, the job master will let the task manager 
exit itself.

> TaskManager should commit suicide after cancellation failure
> 
>
> Key: FLINK-4715
> URL: https://issues.apache.org/jira/browse/FLINK-4715
> Project: Flink
>  Issue Type: Improvement
>  Components: TaskManager
>Affects Versions: 1.2.0
>Reporter: Till Rohrmann
> Fix For: 1.2.0
>
>
> In case of a failed cancellation, e.g. if the task cannot be cancelled within 
> a given time, the {{TaskManager}} should kill itself. That way we guarantee 
> that there is no resource leak. 
> This behaviour acts as a safety-net against faulty user code.
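
A minimal sketch of such a safety net, with hypothetical names (not the actual 
TaskManager code):
{code}
// Give cancellation a grace period; if the task thread is still alive
// afterwards, halt the JVM so the cluster framework can restart a clean
// TaskManager instead of leaking threads and memory.
void armCancellationWatchdog(final Thread taskThread, final long graceMillis) {
	Thread watchdog = new Thread(new Runnable() {
		@Override
		public void run() {
			try {
				taskThread.join(graceMillis);
			} catch (InterruptedException e) {
				// watchdog disarmed
				return;
			}
			if (taskThread.isAlive()) {
				// user code ignored the cancellation: kill the process,
				// bypassing shutdown hooks that might block just as badly
				Runtime.getRuntime().halt(-1);
			}
		}
	});
	watchdog.setDaemon(true);
	watchdog.start();
}
{code}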





[jira] [Commented] (FLINK-4348) Implement slot allocation protocol with TaskExecutor

2016-09-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4348?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15534800#comment-15534800
 ] 

ASF GitHub Bot commented on FLINK-4348:
---

Github user beyond1920 commented on the issue:

https://github.com/apache/flink/pull/2571
  
@mxm, what would happen in the following case: a taskExecutor releases a 
registered slot, then reports its latest slotReport to the ResourceManager; 
the ResourceManager should remove the old slot allocation from its own view 
and mark the slot free. So I think we should keep the following code in the 
old SlotManager `updateSlotStatus`:
else {
	// slot is reported empty

	// check whether we also thought this slot is empty
	if (allocationMap.isAllocated(slotId)) {
		LOG.info("Slot allocation info mismatch! SlotID:{}, current:{}, reported:null",
			slotId, allocationMap.getAllocationID(slotId));

		// we thought the slot is in use, correct it
		allocationMap.removeAllocation(slotId);

		// we have a free slot!
		handleFreeSlot(slot);
	}
}


> Implement slot allocation protocol with TaskExecutor
> 
>
> Key: FLINK-4348
> URL: https://issues.apache.org/jira/browse/FLINK-4348
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management
>Reporter: Kurt Young
>Assignee: Maximilian Michels
>
> When the slotManager finds a proper slot in the free pool for a slot request, 
> it marks the slot as occupied, then tells the taskExecutor to give the slot 
> to the specified JobMaster. 
> When a slot request is sent to the taskExecutor, it should contain the 
> following parameters: AllocationID, JobID, slotID, resourceManagerLeaderSessionID. 
> There are 3 possible responses from the taskExecutor; we discuss below when 
> each one happens and how to handle it (a sketch of this handling follows the 
> description).
> 1. Ack, which means the taskExecutor gives the slot to the specified 
> jobMaster as expected. 
> 2. Decline, if the slot is already occupied by another AllocationID. 
> 3. Timeout, which could be caused by loss of the request or response message, 
> or by slow network transfer. 
> In the first case, the ResourceManager needs to do nothing. However, in the 
> second and third cases, the ResourceManager needs to notify the slotManager; 
> the slotManager will first verify and clear all the previous allocation 
> information for this slot request, then try to find a proper slot for the 
> slot request again. This may cause duplicate allocations, e.g. the slot 
> request to a TaskManager is successful but the response is lost somehow, so 
> we may request a slot on another TaskManager; this assigns two slots to one 
> request, but it can be taken care of by rejecting the registration at the 
> JobMaster.
> There are still some questions to discuss in a further step.
> 1. Who sends the slotRequest to the taskExecutor, the SlotManager or the 
> ResourceManager? I think it is better that the SlotManager delegates the RPC 
> call to the ResourceManager when the SlotManager needs to communicate with 
> the outside world. The ResourceManager knows which taskExecutor to send the 
> request to based on the ResourceID. Besides, the RPC call used to request a 
> slot from the taskExecutor should not be a RpcMethod, because we hope only 
> the SlotManager has permission to call the method, while the other 
> components, for example the JobMaster and TaskExecutor, cannot call this 
> method directly.
> 2. If the JobMaster rejects the slot offer from a TaskExecutor, should the 
> TaskExecutor notify the ResourceManager of the free slot immediately, or 
> wait for the next heartbeat sync? The advantage of the first way is that the 
> resourceManager's view is updated faster. The advantage of the second way is 
> that it saves an RPC method in the ResourceManager.
> 3. There are two communication types. First, the slot request could be sent 
> as an ask operation where the response is returned as a future. Second, the 
> resourceManager sends the slot request in a fire-and-forget way, and the 
> response is returned by an RPC call. I prefer the first one because it is 
> simpler and saves an RPC method in the ResourceManager (for the callback in 
> the second way).
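
A sketch of the resource-manager-side handling of the three outcomes, in the 
"ask with future" style preferred in point 3. All type and method names below 
are hypothetical, chosen only to illustrate the protocol:
{code}
import java.util.concurrent.CompletableFuture;

// the slot request is an ask operation whose response arrives as a future
CompletableFuture<SlotRequestReply> reply = taskExecutorGateway.requestSlot(
		slotId, allocationId, jobId, resourceManagerLeaderSessionId, timeout);

reply.handleAsync((SlotRequestReply ack, Throwable failure) -> {
	if (failure != null) {
		// 3. timeout or lost message: clear the previous allocation info and
		//    retry; a duplicate slot resulting from a lost response is later
		//    rejected when the TaskExecutor registers it at the JobMaster
		slotManager.handleFailedSlotRequest(slotId, allocationId);
	} else if (ack.isAccepted()) {
		// 1. ack: nothing to do, the TaskExecutor hands the slot to the JobMaster
	} else {
		// 2. declined: the slot is already occupied by another AllocationID
		slotManager.handleFailedSlotRequest(slotId, allocationId);
	}
	return null;
});
{code}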





[jira] [Created] (FLINK-4718) Confusing label in Parallel Streams Diagram

2016-09-29 Thread Neil Derraugh (JIRA)
Neil Derraugh created FLINK-4718:


 Summary: Confusing label in Parallel Streams Diagram
 Key: FLINK-4718
 URL: https://issues.apache.org/jira/browse/FLINK-4718
 Project: Flink
  Issue Type: Bug
  Components: Project Website
Affects Versions: 1.1.0
Reporter: Neil Derraugh
Priority: Trivial
 Fix For: 1.1.2


The Event [id|timestamp] label in the Parallel Streams Diagram is confusing.  
The 'id' is in fact the key of the stream and not the id of the event record.  
Hence we have B35 and B33.

The 'id' label should be changed to key or key_id to better represent its 
nature.





[GitHub] flink issue #2571: [FLINK-4348] Simplify logic of SlotManager

2016-09-29 Thread beyond1920
Github user beyond1920 commented on the issue:

https://github.com/apache/flink/pull/2571
  
@mxm, what would happen in the following case: a taskExecutor releases a 
registered slot, then reports its latest slotReport to the ResourceManager; 
the ResourceManager should remove the old slot allocation from its own view 
and mark the slot free. So I think we should keep the following code in the 
old SlotManager `updateSlotStatus`:
else {
	// slot is reported empty

	// check whether we also thought this slot is empty
	if (allocationMap.isAllocated(slotId)) {
		LOG.info("Slot allocation info mismatch! SlotID:{}, current:{}, reported:null",
			slotId, allocationMap.getAllocationID(slotId));

		// we thought the slot is in use, correct it
		allocationMap.removeAllocation(slotId);

		// we have a free slot!
		handleFreeSlot(slot);
	}
}




[jira] [Commented] (FLINK-4715) TaskManager should commit suicide after cancellation failure

2016-09-29 Thread Zhijiang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15534797#comment-15534797
 ] 

Zhijiang Wang commented on FLINK-4715:
--

Yes, we have already experienced this problem in real production many times, 
because the user code cannot be controlled. If the thread is waiting for a 
synchronized lock, or in other such cases, it cannot be cancelled; if the job 
master fails to cancel the task several times, it lets the task manager exit 
itself.

> TaskManager should commit suicide after cancellation failure
> 
>
> Key: FLINK-4715
> URL: https://issues.apache.org/jira/browse/FLINK-4715
> Project: Flink
>  Issue Type: Improvement
>  Components: TaskManager
>Affects Versions: 1.2.0
>Reporter: Till Rohrmann
> Fix For: 1.2.0
>
>
> In case of a failed cancellation, e.g. if the task cannot be cancelled within 
> a given time, the {{TaskManager}} should kill itself. That way we guarantee 
> that there is no resource leak. 
> This behaviour acts as a safety-net against faulty user code.





[jira] [Commented] (FLINK-2254) Add Bipartite Graph Support for Gelly

2016-09-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2254?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15534249#comment-15534249
 ] 

ASF GitHub Bot commented on FLINK-2254:
---

Github user mushketyk commented on a diff in the pull request:

https://github.com/apache/flink/pull/2564#discussion_r81246840
  
--- Diff: 
flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/BipartiteGraph.java
 ---
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+
+/**
+ *
+ * Bipartite graph is a graph that contains two sets of vertices: top 
vertices and bottom vertices. Edges can only exist
+ * between a pair of vertices from different vertices sets. E.g. there can 
be no vertices between a pair
+ * of top vertices.
+ *
+ * Bipartite graph is useful to represent graphs with two sets of 
objects, like researchers and their publications,
+ * where an edge represents that a particular publication was authored by 
a particular author.
+ *
+ * Bipartite interface is different from {@link Graph} interface, so to 
apply algorithms that work on a regular graph
+ * a bipartite graph should be first converted into a {@link Graph} 
instance. This can be achieved by using
+ * {@link BipartiteGraph#topProjection(GroupReduceFunction)} or
+ * {@link BipartiteGraph#bottomProjection(GroupReduceFunction)} methods.
+ *
+ * @param <KT> the key type of the top vertices
+ * @param <KB> the key type of the bottom vertices
+ * @param <VVT> the top vertices value type
+ * @param <VVB> the bottom vertices value type
+ * @param <EV> the edge value type
+ */
+public class BipartiteGraph<KT, KB, VVT, VVB, EV> {
+   private final ExecutionEnvironment context;
+   private final DataSet<Vertex<KT, VVT>> topVertices;
+   private final DataSet<Vertex<KB, VVB>> bottomVertices;
+   private final DataSet<BipartiteEdge<KT, KB, EV>> edges;
+
+   private BipartiteGraph(
+   DataSet<Vertex<KT, VVT>> topVertices,
+   DataSet<Vertex<KB, VVB>> bottomVertices,
+   DataSet<BipartiteEdge<KT, KB, EV>> edges,
+   ExecutionEnvironment context) {
+   this.topVertices = topVertices;
+   this.bottomVertices = bottomVertices;
+   this.edges = edges;
+   this.context = context;
+   }
+
+   /**
+* Create bipartite graph from datasets.
+*
+* @param topVertices  dataset of top vertices in the graph
+* @param bottomVertices dataset of bottom vertices in the graph
+* @param edges dataset of edges between vertices
+* @param context Flink execution context
+* @param <KT> the key type of the top vertices
+* @param <KB> the key type of the bottom vertices
+* @param <VVT> the top vertices value type
+* @param <VVB> the bottom vertices value type
+* @param <EV> the edge value type
+* @return new bipartite graph created from provided datasets
+*/
+   public static <KT, KB, VVT, VVB, EV> BipartiteGraph<KT, KB, VVT, VVB, EV> fromDataSet(
+   DataSet<Vertex<KT, VVT>> topVertices,
+   DataSet<Vertex<KB, VVB>> bottomVertices,
+   DataSet<BipartiteEdge<KT, KB, EV>> edges,
+   ExecutionEnvironment context) {
+   return new BipartiteGraph<>(topVertices, bottomVertices, edges, context);
+   }
+
+   /**
+* Get dataset with top vertices.
+*
+* @return dataset with top vertices
+*/
+   public DataSet<Vertex<KT, VVT>> getTopVertices() {
+   return topVertices;
+   }
+
+   /**
+* Get dataset with bottom vertices.
+*
+* @return dataset with bottom vertices
+*/
+   public DataSet<Vertex<KB, VVB>> getBottomVertices() {

[GitHub] flink pull request #2564: [FLINK-2254] Add BipartiateGraph class

2016-09-29 Thread mushketyk
Github user mushketyk commented on a diff in the pull request:

https://github.com/apache/flink/pull/2564#discussion_r81246840
  
--- Diff: 
flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/BipartiteGraph.java
 ---
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+
+/**
+ *
+ * Bipartite graph is a graph that contains two sets of vertices: top 
vertices and bottom vertices. Edges can only exist
+ * between a pair of vertices from different vertices sets. E.g. there can 
be no vertices between a pair
+ * of top vertices.
+ *
+ * Bipartite graph is useful to represent graphs with two sets of 
objects, like researchers and their publications,
+ * where an edge represents that a particular publication was authored by 
a particular author.
+ *
+ * Bipartite interface is different from {@link Graph} interface, so to 
apply algorithms that work on a regular graph
+ * a bipartite graph should be first converted into a {@link Graph} 
instance. This can be achieved by using
+ * {@link BipartiteGraph#topProjection(GroupReduceFunction)} or
+ * {@link BipartiteGraph#bottomProjection(GroupReduceFunction)} methods.
+ *
+ * @param <KT> the key type of the top vertices
+ * @param <KB> the key type of the bottom vertices
+ * @param <VVT> the top vertices value type
+ * @param <VVB> the bottom vertices value type
+ * @param <EV> the edge value type
+ */
+public class BipartiteGraph<KT, KB, VVT, VVB, EV> {
+   private final ExecutionEnvironment context;
+   private final DataSet<Vertex<KT, VVT>> topVertices;
+   private final DataSet<Vertex<KB, VVB>> bottomVertices;
+   private final DataSet<BipartiteEdge<KT, KB, EV>> edges;
+
+   private BipartiteGraph(
+   DataSet<Vertex<KT, VVT>> topVertices,
+   DataSet<Vertex<KB, VVB>> bottomVertices,
+   DataSet<BipartiteEdge<KT, KB, EV>> edges,
+   ExecutionEnvironment context) {
+   this.topVertices = topVertices;
+   this.bottomVertices = bottomVertices;
+   this.edges = edges;
+   this.context = context;
+   }
+
+   /**
+* Create bipartite graph from datasets.
+*
+* @param topVertices  dataset of top vertices in the graph
+* @param bottomVertices dataset of bottom vertices in the graph
+* @param edges dataset of edges between vertices
+* @param context Flink execution context
+* @param <KT> the key type of the top vertices
+* @param <KB> the key type of the bottom vertices
+* @param <VVT> the top vertices value type
+* @param <VVB> the bottom vertices value type
+* @param <EV> the edge value type
+* @return new bipartite graph created from provided datasets
+*/
+   public static <KT, KB, VVT, VVB, EV> BipartiteGraph<KT, KB, VVT, VVB, EV> fromDataSet(
+   DataSet<Vertex<KT, VVT>> topVertices,
+   DataSet<Vertex<KB, VVB>> bottomVertices,
+   DataSet<BipartiteEdge<KT, KB, EV>> edges,
+   ExecutionEnvironment context) {
+   return new BipartiteGraph<>(topVertices, bottomVertices, edges, context);
+   }
+
+   /**
+* Get dataset with top vertices.
+*
+* @return dataset with top vertices
+*/
+   public DataSet<Vertex<KT, VVT>> getTopVertices() {
+   return topVertices;
+   }
+
+   /**
+* Get dataset with bottom vertices.
+*
+* @return dataset with bottom vertices
+*/
+   public DataSet<Vertex<KB, VVB>> getBottomVertices() {
+   return bottomVertices;
+   }
+
+   /**
+* Get dataset with graph edges.
+*
+* @return dataset with graph edges
+*/
+   public DataSet<BipartiteEdge<KT, KB, EV>> getEdges() {
+   return edges;
+   }
+
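
As an aside, a hedged usage sketch based only on the signatures visible in
this diff; the BipartiteEdge type and its constructor are assumptions
reconstructed from the garbled generics, not confirmed API:

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.graph.BipartiteEdge;
    import org.apache.flink.graph.BipartiteGraph;
    import org.apache.flink.graph.Vertex;

    // Illustrative only: build a small researcher/publication bipartite graph
    // with the fromDataSet factory shown above.
    public class BipartiteGraphUsageSketch {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            DataSet<Vertex<Long, String>> researchers = env.fromElements(
                new Vertex<>(1L, "researcher-1"),
                new Vertex<>(2L, "researcher-2"));

            DataSet<Vertex<String, String>> papers = env.fromElements(
                new Vertex<>("p1", "paper-1"));

            // BipartiteEdge(topId, bottomId, value) is assumed, not confirmed
            DataSet<BipartiteEdge<Long, String, String>> authorship = env.fromElements(
                new BipartiteEdge<>(1L, "p1", "author"),
                new BipartiteEdge<>(2L, "p1", "co-author"));

            BipartiteGraph<Long, String, String, String, String> graph =
                BipartiteGraph.fromDataSet(researchers, papers, authorship, env);

            // topProjection/bottomProjection (per the javadoc) would then yield
            // a regular Gelly Graph on which standard algorithms can run.
            graph.getTopVertices().print();
        }
    }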
  

[jira] [Commented] (FLINK-2254) Add Bipartite Graph Support for Gelly

2016-09-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2254?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15534241#comment-15534241
 ] 

ASF GitHub Bot commented on FLINK-2254:
---

Github user mushketyk commented on a diff in the pull request:

https://github.com/apache/flink/pull/2564#discussion_r81246465
  
--- Diff: 
flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Edge.java ---
@@ -34,10 +34,10 @@
 
public Edge(){}
 
-   public Edge(K src, K trg, V val) {
-   this.f0 = src;
-   this.f1 = trg;
-   this.f2 = val;
+   public Edge(K source, K target, V value) {
--- End diff --

To make them consistent with the naming style in other classes.
Do you suggest reverting this?


> Add Bipartite Graph Support for Gelly
> -
>
> Key: FLINK-2254
> URL: https://issues.apache.org/jira/browse/FLINK-2254
> Project: Flink
>  Issue Type: New Feature
>  Components: Gelly
>Affects Versions: 0.10.0
>Reporter: Andra Lungu
>Assignee: Ivan Mushketyk
>  Labels: requires-design-doc
>
> A bipartite graph is a graph for which the set of vertices can be divided 
> into two disjoint sets such that each edge having a source vertex in the 
> first set, will have a target vertex in the second set. We would like to 
> support efficient operations for this type of graphs along with a set of 
> metrics(http://jponnela.com/web_documents/twomode.pdf). 





[GitHub] flink pull request #2564: [FLINK-2254] Add BipartiateGraph class

2016-09-29 Thread mushketyk
Github user mushketyk commented on a diff in the pull request:

https://github.com/apache/flink/pull/2564#discussion_r81246465
  
--- Diff: 
flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Edge.java ---
@@ -34,10 +34,10 @@
 
public Edge(){}
 
-   public Edge(K src, K trg, V val) {
-   this.f0 = src;
-   this.f1 = trg;
-   this.f2 = val;
+   public Edge(K source, K target, V value) {
--- End diff --

To make them consistent with the naming style in other classes.
Do you suggest reverting this?




[jira] [Commented] (FLINK-2254) Add Bipartite Graph Support for Gelly

2016-09-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2254?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15534239#comment-15534239
 ] 

ASF GitHub Bot commented on FLINK-2254:
---

Github user mushketyk commented on the issue:

https://github.com/apache/flink/pull/2564
  
Hi @greghogan,

I like your ideas about providing a different API for projections. This
should be better than my approach.
@vasia What do you think about this?


> Add Bipartite Graph Support for Gelly
> -
>
> Key: FLINK-2254
> URL: https://issues.apache.org/jira/browse/FLINK-2254
> Project: Flink
>  Issue Type: New Feature
>  Components: Gelly
>Affects Versions: 0.10.0
>Reporter: Andra Lungu
>Assignee: Ivan Mushketyk
>  Labels: requires-design-doc
>
> A bipartite graph is a graph for which the set of vertices can be divided 
> into two disjoint sets such that each edge having a source vertex in the 
> first set, will have a target vertex in the second set. We would like to 
> support efficient operations for this type of graphs along with a set of 
> metrics(http://jponnela.com/web_documents/twomode.pdf). 





[GitHub] flink issue #2564: [FLINK-2254] Add BipartiateGraph class

2016-09-29 Thread mushketyk
Github user mushketyk commented on the issue:

https://github.com/apache/flink/pull/2564
  
Hi @greghogan,

I like your ideas about providing a different API for projections. This
should be better than my approach.
@vasia What do you think about this?




[jira] [Commented] (FLINK-2254) Add Bipartite Graph Support for Gelly

2016-09-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2254?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15534224#comment-15534224
 ] 

ASF GitHub Bot commented on FLINK-2254:
---

Github user mushketyk commented on a diff in the pull request:

https://github.com/apache/flink/pull/2564#discussion_r81245767
  
--- Diff: 
flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
 ---
@@ -480,7 +480,8 @@ protected static File asFile(String path) {
}
}

-   assertEquals("Wrong number of elements result", 
expectedStrings.length, resultStrings.length);
+   assertEquals(String.format("Wrong number of elements result. 
Expected: %s. Result: %s.", Arrays.toString(expectedStrings), 
Arrays.toString(resultStrings)),
--- End diff --

The issue here is that it compares the lengths of the arrays, so JUnit only
prints the compared numbers (say, 2 and 0) and not the contents of the arrays.
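
As a generic JUnit 4 illustration of the difference (this is not the actual
TestBaseUtils code, just a self-contained sketch):

    import static org.junit.Assert.assertArrayEquals;
    import static org.junit.Assert.assertEquals;

    import java.util.Arrays;

    import org.junit.Test;

    // Comparing lengths hides the array contents on failure, so either put
    // the contents into the assertion message or compare the arrays directly.
    public class AssertMessageExample {

        @Test
        public void messageWithContents() {
            String[] expected = {"a", "b"};
            String[] result = {"a", "b"};

            // On failure this prints both arrays, not just "expected:<2> but was:<0>".
            assertEquals(
                String.format("Wrong number of elements result. Expected: %s. Result: %s.",
                    Arrays.toString(expected), Arrays.toString(result)),
                expected.length, result.length);

            // Alternative: let JUnit diff the arrays element by element.
            assertArrayEquals(expected, result);
        }
    }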


> Add Bipartite Graph Support for Gelly
> -
>
> Key: FLINK-2254
> URL: https://issues.apache.org/jira/browse/FLINK-2254
> Project: Flink
>  Issue Type: New Feature
>  Components: Gelly
>Affects Versions: 0.10.0
>Reporter: Andra Lungu
>Assignee: Ivan Mushketyk
>  Labels: requires-design-doc
>
> A bipartite graph is a graph for which the set of vertices can be divided 
> into two disjoint sets such that each edge having a source vertex in the 
> first set, will have a target vertex in the second set. We would like to 
> support efficient operations for this type of graphs along with a set of 
> metrics(http://jponnela.com/web_documents/twomode.pdf). 





[GitHub] flink pull request #2564: [FLINK-2254] Add BipartiateGraph class

2016-09-29 Thread mushketyk
Github user mushketyk commented on a diff in the pull request:

https://github.com/apache/flink/pull/2564#discussion_r81245767
  
--- Diff: 
flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
 ---
@@ -480,7 +480,8 @@ protected static File asFile(String path) {
}
}

-   assertEquals("Wrong number of elements result", 
expectedStrings.length, resultStrings.length);
+   assertEquals(String.format("Wrong number of elements result. 
Expected: %s. Result: %s.", Arrays.toString(expectedStrings), 
Arrays.toString(resultStrings)),
--- End diff --

The issue here is that it compares the lengths of the arrays, so JUnit only
prints the compared numbers (say, 2 and 0) and not the contents of the arrays.




[jira] [Commented] (FLINK-4329) Fix Streaming File Source Timestamps/Watermarks Handling

2016-09-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15534216#comment-15534216
 ] 

ASF GitHub Bot commented on FLINK-4329:
---

Github user kl0u commented on the issue:

https://github.com/apache/flink/pull/2546
  
Hi @StephanEwen. If I understand correctly, your suggestion is to make the
test something like the following: 1) put the split in the reader, 2) read
the split, 3) when the split finishes, update the time in the provider, and
4) observe the time in the output elements. If this is the case, then the
problem is that the reader just puts the split in a queue, which is picked up
by another thread that reads it. In this context, there is no way of knowing
when the reading thread has finished reading the split and moves on to the
next one, so step 3) cannot be synchronized correctly. This is the reason I
just have a thread in the test that tries (without guarantees, hence the race
condition you mentioned) to update the time while the reader is still
reading. Any suggestions are welcome.
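
One hedged idea for making step 3) deterministic, assuming the reader could
be instrumented with a completion callback (which may require changing
ContinuousFileReaderOperator, so this is a sketch, not a concrete proposal):

    import java.util.concurrent.CountDownLatch;

    // Hypothetical test hook: the reading thread signals when it has finished
    // a split, so the test can update the time provider at exactly that point.
    public class SplitCompletionSync {
        private final CountDownLatch splitFinished = new CountDownLatch(1);

        // would be called by the (instrumented) reading thread after a split is done
        public void onSplitDone() {
            splitFinished.countDown();
        }

        // called by the test before updating the time in the provider (step 3)
        public void awaitSplitDone() throws InterruptedException {
            splitFinished.await();
        }
    }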


> Fix Streaming File Source Timestamps/Watermarks Handling
> 
>
> Key: FLINK-4329
> URL: https://issues.apache.org/jira/browse/FLINK-4329
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Affects Versions: 1.1.0
>Reporter: Aljoscha Krettek
>Assignee: Kostas Kloudas
> Fix For: 1.2.0, 1.1.3
>
>
> The {{ContinuousFileReaderOperator}} does not correctly deal with watermarks, 
> i.e. they are just passed through. This means that when the 
> {{ContinuousFileMonitoringFunction}} closes and emits a {{Long.MAX_VALUE}} 
> that watermark can "overtake" the records that are to be emitted in the 
> {{ContinuousFileReaderOperator}}. Together with the new "allowed lateness" 
> setting in window operator this can lead to elements being dropped as late.
> Also, {{ContinuousFileReaderOperator}} does not correctly assign ingestion 
> timestamps since it is not technically a source but looks like one to the 
> user.





[GitHub] flink issue #2546: [FLINK-4329] Fix Streaming File Source Timestamps/Waterma...

2016-09-29 Thread kl0u
Github user kl0u commented on the issue:

https://github.com/apache/flink/pull/2546
  
Hi @StephanEwen. If I understand correctly, your suggestion is to make the
test something like the following: 1) put the split in the reader, 2) read
the split, 3) when the split finishes, update the time in the provider, and
4) observe the time in the output elements. If this is the case, then the
problem is that the reader just puts the split in a queue, which is picked up
by another thread that reads it. In this context, there is no way of knowing
when the reading thread has finished reading the split and moves on to the
next one, so step 3) cannot be synchronized correctly. This is the reason I
just have a thread in the test that tries (without guarantees, hence the race
condition you mentioned) to update the time while the reader is still
reading. Any suggestions are welcome.




[jira] [Commented] (FLINK-2254) Add Bipartite Graph Support for Gelly

2016-09-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2254?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15534014#comment-15534014
 ] 

ASF GitHub Bot commented on FLINK-2254:
---

Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/2564#discussion_r81225166
  
--- Diff: 
flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
 ---
@@ -480,7 +480,8 @@ protected static File asFile(String path) {
}
}

-   assertEquals("Wrong number of elements result", 
expectedStrings.length, resultStrings.length);
+   assertEquals(String.format("Wrong number of elements result. 
Expected: %s. Result: %s.", Arrays.toString(expectedStrings), 
Arrays.toString(resultStrings)),
--- End diff --

Doesn't IntelliJ offer to view the different results?


> Add Bipartite Graph Support for Gelly
> -
>
> Key: FLINK-2254
> URL: https://issues.apache.org/jira/browse/FLINK-2254
> Project: Flink
>  Issue Type: New Feature
>  Components: Gelly
>Affects Versions: 0.10.0
>Reporter: Andra Lungu
>Assignee: Ivan Mushketyk
>  Labels: requires-design-doc
>
> A bipartite graph is a graph for which the set of vertices can be divided 
> into two disjoint sets such that each edge having a source vertex in the 
> first set, will have a target vertex in the second set. We would like to 
> support efficient operations for this type of graphs along with a set of 
> metrics(http://jponnela.com/web_documents/twomode.pdf). 





[jira] [Commented] (FLINK-2254) Add Bipartite Graph Support for Gelly

2016-09-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2254?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15534015#comment-15534015
 ] 

ASF GitHub Bot commented on FLINK-2254:
---

Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/2564#discussion_r81226805
  
--- Diff: 
flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/BipartiteGraph.java
 ---
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+
+/**
+ *
+ * Bipartite graph is a graph that contains two sets of vertices: top 
vertices and bottom vertices. Edges can only exist
+ * between a pair of vertices from different vertices sets. E.g. there can 
be no vertices between a pair
+ * of top vertices.
+ *
+ * Bipartite graph is useful to represent graphs with two sets of 
objects, like researchers and their publications,
+ * where an edge represents that a particular publication was authored by 
a particular author.
+ *
+ * Bipartite interface is different from {@link Graph} interface, so to 
apply algorithms that work on a regular graph
+ * a bipartite graph should be first converted into a {@link Graph} 
instance. This can be achieved by using
+ * {@link BipartiteGraph#topProjection(GroupReduceFunction)} or
+ * {@link BipartiteGraph#bottomProjection(GroupReduceFunction)} methods.
+ *
+ * @param <KT> the key type of the top vertices
+ * @param <KB> the key type of the bottom vertices
+ * @param <VVT> the top vertices value type
+ * @param <VVB> the bottom vertices value type
+ * @param <EV> the edge value type
+ */
+public class BipartiteGraph<KT, KB, VVT, VVB, EV> {
+   private final ExecutionEnvironment context;
+   private final DataSet<Vertex<KT, VVT>> topVertices;
+   private final DataSet<Vertex<KB, VVB>> bottomVertices;
+   private final DataSet<BipartiteEdge<KT, KB, EV>> edges;
+
+   private BipartiteGraph(
+   DataSet<Vertex<KT, VVT>> topVertices,
+   DataSet<Vertex<KB, VVB>> bottomVertices,
+   DataSet<BipartiteEdge<KT, KB, EV>> edges,
+   ExecutionEnvironment context) {
+   this.topVertices = topVertices;
+   this.bottomVertices = bottomVertices;
+   this.edges = edges;
+   this.context = context;
+   }
+
+   /**
+* Create bipartite graph from datasets.
+*
+* @param topVertices  dataset of top vertices in the graph
+* @param bottomVertices dataset of bottom vertices in the graph
+* @param edges dataset of edges between vertices
+* @param context Flink execution context
+* @param <KT> the key type of the top vertices
+* @param <KB> the key type of the bottom vertices
+* @param <VVT> the top vertices value type
+* @param <VVB> the bottom vertices value type
+* @param <EV> the edge value type
+* @return new bipartite graph created from provided datasets
+*/
+   public static <KT, KB, VVT, VVB, EV> BipartiteGraph<KT, KB, VVT, VVB, EV> fromDataSet(
+   DataSet<Vertex<KT, VVT>> topVertices,
+   DataSet<Vertex<KB, VVB>> bottomVertices,
+   DataSet<BipartiteEdge<KT, KB, EV>> edges,
+   ExecutionEnvironment context) {
+   return new BipartiteGraph<>(topVertices, bottomVertices, edges, context);
+   }
+
+   /**
+* Get dataset with top vertices.
+*
+* @return dataset with top vertices
+*/
+   public DataSet<Vertex<KT, VVT>> getTopVertices() {
+   return topVertices;
+   }
+
+   /**
+* Get dataset with bottom vertices.
+*
+* @return dataset with bottom vertices
+*/
+   public DataSet<Vertex<KB, VVB>> getBottomVertices() {

[GitHub] flink pull request #2564: [FLINK-2254] Add BipartiateGraph class

2016-09-29 Thread greghogan
Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/2564#discussion_r81225166
  
--- Diff: 
flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
 ---
@@ -480,7 +480,8 @@ protected static File asFile(String path) {
}
}

-   assertEquals("Wrong number of elements result", 
expectedStrings.length, resultStrings.length);
+   assertEquals(String.format("Wrong number of elements result. 
Expected: %s. Result: %s.", Arrays.toString(expectedStrings), 
Arrays.toString(resultStrings)),
--- End diff --

Doesn't IntelliJ offer to view the different results?




[GitHub] flink pull request #2564: [FLINK-2254] Add BipartiateGraph class

2016-09-29 Thread greghogan
Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/2564#discussion_r81226805
  
--- Diff: 
flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/BipartiteGraph.java
 ---
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+
+/**
+ *
+ * Bipartite graph is a graph that contains two sets of vertices: top 
vertices and bottom vertices. Edges can only exist
+ * between a pair of vertices from different vertices sets. E.g. there can 
be no vertices between a pair
+ * of top vertices.
+ *
+ * Bipartite graph is useful to represent graphs with two sets of 
objects, like researchers and their publications,
+ * where an edge represents that a particular publication was authored by 
a particular author.
+ *
+ * Bipartite interface is different from {@link Graph} interface, so to 
apply algorithms that work on a regular graph
+ * a bipartite graph should be first converted into a {@link Graph} 
instance. This can be achieved by using
+ * {@link BipartiteGraph#topProjection(GroupReduceFunction)} or
+ * {@link BipartiteGraph#bottomProjection(GroupReduceFunction)} methods.
+ *
+ * @param <KT> the key type of the top vertices
+ * @param <KB> the key type of the bottom vertices
+ * @param <VVT> the top vertices value type
+ * @param <VVB> the bottom vertices value type
+ * @param <EV> the edge value type
+ */
+public class BipartiteGraph<KT, KB, VVT, VVB, EV> {
+   private final ExecutionEnvironment context;
+   private final DataSet<Vertex<KT, VVT>> topVertices;
+   private final DataSet<Vertex<KB, VVB>> bottomVertices;
+   private final DataSet<BipartiteEdge<KT, KB, EV>> edges;
+
+   private BipartiteGraph(
+   DataSet<Vertex<KT, VVT>> topVertices,
+   DataSet<Vertex<KB, VVB>> bottomVertices,
+   DataSet<BipartiteEdge<KT, KB, EV>> edges,
+   ExecutionEnvironment context) {
+   this.topVertices = topVertices;
+   this.bottomVertices = bottomVertices;
+   this.edges = edges;
+   this.context = context;
+   }
+
+   /**
+* Create bipartite graph from datasets.
+*
+* @param topVertices  dataset of top vertices in the graph
+* @param bottomVertices dataset of bottom vertices in the graph
+* @param edges dataset of edges between vertices
+* @param context Flink execution context
+* @param <KT> the key type of the top vertices
+* @param <KB> the key type of the bottom vertices
+* @param <VVT> the top vertices value type
+* @param <VVB> the bottom vertices value type
+* @param <EV> the edge value type
+* @return new bipartite graph created from provided datasets
+*/
+   public static <KT, KB, VVT, VVB, EV> BipartiteGraph<KT, KB, VVT, VVB, EV> fromDataSet(
+   DataSet<Vertex<KT, VVT>> topVertices,
+   DataSet<Vertex<KB, VVB>> bottomVertices,
+   DataSet<BipartiteEdge<KT, KB, EV>> edges,
+   ExecutionEnvironment context) {
+   return new BipartiteGraph<>(topVertices, bottomVertices, edges, context);
+   }
+
+   /**
+* Get dataset with top vertices.
+*
+* @return dataset with top vertices
+*/
+   public DataSet<Vertex<KT, VVT>> getTopVertices() {
+   return topVertices;
+   }
+
+   /**
+* Get dataset with bottom vertices.
+*
+* @return dataset with bottom vertices
+*/
+   public DataSet<Vertex<KB, VVB>> getBottomVertices() {
+   return bottomVertices;
+   }
+
+   /**
+* Get dataset with graph edges.
+*
+* @return dataset with graph edges
+*/
+   public DataSet<BipartiteEdge<KT, KB, EV>> getEdges() {
+   return edges;
+   }
+
  

[GitHub] flink pull request #2564: [FLINK-2254] Add BipartiateGraph class

2016-09-29 Thread vasia
Github user vasia commented on a diff in the pull request:

https://github.com/apache/flink/pull/2564#discussion_r81217922
  
--- Diff: 
flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/BipartiteGraph.java
 ---
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+
+/**
+ *
+ * Bipartite graph is a graph that contains two sets of vertices: top 
vertices and bottom vertices. Edges can only exist
--- End diff --

I would rephrase that to "... a graph whose vertices can be divided into 
two disjoint sets"




[GitHub] flink pull request #2564: [FLINK-2254] Add BipartiateGraph class

2016-09-29 Thread vasia
Github user vasia commented on a diff in the pull request:

https://github.com/apache/flink/pull/2564#discussion_r81218538
  
--- Diff: 
flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Edge.java ---
@@ -34,10 +34,10 @@
 
public Edge(){}
 
-   public Edge(K src, K trg, V val) {
-   this.f0 = src;
-   this.f1 = trg;
-   this.f2 = val;
+   public Edge(K source, K target, V value) {
--- End diff --

Why did you change these?




[GitHub] flink pull request #2564: [FLINK-2254] Add BipartiateGraph class

2016-09-29 Thread vasia
Github user vasia commented on a diff in the pull request:

https://github.com/apache/flink/pull/2564#discussion_r81218403
  
--- Diff: 
flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/BipartiteGraph.java
 ---
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+
+/**
+ *
+ * Bipartite graph is a graph that contains two sets of vertices: top 
vertices and bottom vertices. Edges can only exist
+ * between a pair of vertices from different vertices sets. E.g. there can 
be no vertices between a pair
+ * of top vertices.
+ *
+ * Bipartite graph is useful to represent graphs with two sets of 
objects, like researchers and their publications,
+ * where an edge represents that a particular publication was authored by 
a particular author.
+ *
+ * Bipartite interface is different from {@link Graph} interface, so to 
apply algorithms that work on a regular graph
+ * a bipartite graph should be first converted into a {@link Graph} 
instance. This can be achieved by using
+ * {@link BipartiteGraph#topProjection(GroupReduceFunction)} or
+ * {@link BipartiteGraph#bottomProjection(GroupReduceFunction)} methods.
+ *
+ * @param <KT> the key type of the top vertices
+ * @param <KB> the key type of the bottom vertices
+ * @param <VVT> the top vertices value type
+ * @param <VVB> the bottom vertices value type
+ * @param <EV> the edge value type
+ */
+public class BipartiteGraph<KT, KB, VVT, VVB, EV> {
+   private final ExecutionEnvironment context;
+   private final DataSet<Vertex<KT, VVT>> topVertices;
+   private final DataSet<Vertex<KB, VVB>> bottomVertices;
+   private final DataSet<BipartiteEdge<KT, KB, EV>> edges;
+
+   private BipartiteGraph(
+   DataSet<Vertex<KT, VVT>> topVertices,
+   DataSet<Vertex<KB, VVB>> bottomVertices,
+   DataSet<BipartiteEdge<KT, KB, EV>> edges,
+   ExecutionEnvironment context) {
+   this.topVertices = topVertices;
+   this.bottomVertices = bottomVertices;
+   this.edges = edges;
+   this.context = context;
+   }
+
+   /**
+* Create bipartite graph from datasets.
+*
+* @param topVertices  dataset of top vertices in the graph
+* @param bottomVertices dataset of bottom vertices in the graph
+* @param edges dataset of edges between vertices
+* @param context Flink execution context
+* @param <KT> the key type of the top vertices
+* @param <KB> the key type of the bottom vertices
+* @param <VVT> the top vertices value type
+* @param <VVB> the bottom vertices value type
+* @param <EV> the edge value type
+* @return new bipartite graph created from provided datasets
+*/
+   public static <KT, KB, VVT, VVB, EV> BipartiteGraph<KT, KB, VVT, VVB, EV> fromDataSet(
+   DataSet<Vertex<KT, VVT>> topVertices,
+   DataSet<Vertex<KB, VVB>> bottomVertices,
+   DataSet<BipartiteEdge<KT, KB, EV>> edges,
+   ExecutionEnvironment context) {
+   return new BipartiteGraph<>(topVertices, bottomVertices, edges, context);
+   }
+
+   /**
+* Get dataset with top vertices.
+*
+* @return dataset with top vertices
+*/
+   public DataSet<Vertex<KT, VVT>> getTopVertices() {
+   return topVertices;
+   }
+
+   /**
+* Get dataset with bottom vertices.
+*
+* @return dataset with bottom vertices
+*/
+   public DataSet<Vertex<KB, VVB>> getBottomVertices() {
+   return bottomVertices;
+   }
+
+   /**
+* Get dataset with graph edges.
+*
+* @return dataset with graph edges
+*/
+   public DataSet<BipartiteEdge<KT, KB, EV>> getEdges() {
+   return edges;
+   }
+
+ 

[GitHub] flink pull request #2564: [FLINK-2254] Add BipartiateGraph class

2016-09-29 Thread vasia
Github user vasia commented on a diff in the pull request:

https://github.com/apache/flink/pull/2564#discussion_r81218056
  
--- Diff: 
flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/BipartiteGraph.java
 ---
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+
+/**
+ *
+ * Bipartite graph is a graph that contains two sets of vertices: top 
vertices and bottom vertices. Edges can only exist
+ * between a pair of vertices from different vertices sets. E.g. there can 
be no vertices between a pair
+ * of top vertices.
+ *
+ * Bipartite graph is useful to represent graphs with two sets of 
objects, like researchers and their publications,
--- End diff --

Bipartite graphs are useful...




[GitHub] flink pull request #2564: [FLINK-2254] Add BipartiateGraph class

2016-09-29 Thread vasia
Github user vasia commented on a diff in the pull request:

https://github.com/apache/flink/pull/2564#discussion_r81218490
  
--- Diff: 
flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/BipartiteGraph.java
 ---
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+
+/**
+ *
+ * Bipartite graph is a graph that contains two sets of vertices: top 
vertices and bottom vertices. Edges can only exist
+ * between a pair of vertices from different vertices sets. E.g. there can 
be no vertices between a pair
+ * of top vertices.
+ *
+ * Bipartite graph is useful to represent graphs with two sets of 
objects, like researchers and their publications,
+ * where an edge represents that a particular publication was authored by 
a particular author.
+ *
+ * Bipartite interface is different from {@link Graph} interface, so to 
apply algorithms that work on a regular graph
+ * a bipartite graph should be first converted into a {@link Graph} 
instance. This can be achieved by using
+ * {@link BipartiteGraph#topProjection(GroupReduceFunction)} or
+ * {@link BipartiteGraph#bottomProjection(GroupReduceFunction)} methods.
+ *
+ * @param <KT> the key type of the top vertices
+ * @param <KB> the key type of the bottom vertices
+ * @param <VVT> the top vertices value type
+ * @param <VVB> the bottom vertices value type
+ * @param <EV> the edge value type
+ */
+public class BipartiteGraph<KT, KB, VVT, VVB, EV> {
+   private final ExecutionEnvironment context;
+   private final DataSet<Vertex<KT, VVT>> topVertices;
+   private final DataSet<Vertex<KB, VVB>> bottomVertices;
+   private final DataSet<BipartiteEdge<KT, KB, EV>> edges;
+
+   private BipartiteGraph(
+   DataSet<Vertex<KT, VVT>> topVertices,
+   DataSet<Vertex<KB, VVB>> bottomVertices,
+   DataSet<BipartiteEdge<KT, KB, EV>> edges,
+   ExecutionEnvironment context) {
+   this.topVertices = topVertices;
+   this.bottomVertices = bottomVertices;
+   this.edges = edges;
+   this.context = context;
+   }
+
+   /**
+* Create bipartite graph from datasets.
+*
+* @param topVertices  dataset of top vertices in the graph
+* @param bottomVertices dataset of bottom vertices in the graph
+* @param edges dataset of edges between vertices
+* @param context Flink execution context
+* @param <KT> the key type of the top vertices
+* @param <KB> the key type of the bottom vertices
+* @param <VVT> the top vertices value type
+* @param <VVB> the bottom vertices value type
+* @param <EV> the edge value type
+* @return new bipartite graph created from provided datasets
+*/
+   public static <KT, KB, VVT, VVB, EV> BipartiteGraph<KT, KB, VVT, VVB, EV> fromDataSet(
+   DataSet<Vertex<KT, VVT>> topVertices,
+   DataSet<Vertex<KB, VVB>> bottomVertices,
+   DataSet<BipartiteEdge<KT, KB, EV>> edges,
+   ExecutionEnvironment context) {
+   return new BipartiteGraph<>(topVertices, bottomVertices, edges, context);
+   }
+
+   /**
+* Get dataset with top vertices.
+*
+* @return dataset with top vertices
+*/
+   public DataSet<Vertex<KT, VVT>> getTopVertices() {
+   return topVertices;
+   }
+
+   /**
+* Get dataset with bottom vertices.
+*
+* @return dataset with bottom vertices
+*/
+   public DataSet<Vertex<KB, VVB>> getBottomVertices() {
+   return bottomVertices;
+   }
+
+   /**
+* Get dataset with graph edges.
+*
+* @return dataset with graph edges
+*/
+   public DataSet<BipartiteEdge<KT, KB, EV>> getEdges() {
+   return edges;
+   }
+
+ 

[jira] [Commented] (FLINK-2254) Add Bipartite Graph Support for Gelly

2016-09-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2254?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15533864#comment-15533864
 ] 

ASF GitHub Bot commented on FLINK-2254:
---

Github user vasia commented on a diff in the pull request:

https://github.com/apache/flink/pull/2564#discussion_r81218403
  
--- Diff: 
flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/BipartiteGraph.java
 ---
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+
+/**
+ *
+ * Bipartite graph is a graph that contains two sets of vertices: top 
vertices and bottom vertices. Edges can only exist
+ * between a pair of vertices from different vertices sets. E.g. there can 
be no vertices between a pair
+ * of top vertices.
+ *
+ * Bipartite graph is useful to represent graphs with two sets of 
objects, like researchers and their publications,
+ * where an edge represents that a particular publication was authored by 
a particular author.
+ *
+ * Bipartite interface is different from {@link Graph} interface, so to 
apply algorithms that work on a regular graph
+ * a bipartite graph should be first converted into a {@link Graph} 
instance. This can be achieved by using
+ * {@link BipartiteGraph#topProjection(GroupReduceFunction)} or
+ * {@link BipartiteGraph#bottomProjection(GroupReduceFunction)} methods.
+ *
+ * @param <KT> the key type of the top vertices
+ * @param <KB> the key type of the bottom vertices
+ * @param <VVT> the top vertices value type
+ * @param <VVB> the bottom vertices value type
+ * @param <EV> the edge value type
+ */
+public class BipartiteGraph<KT, KB, VVT, VVB, EV> {
+   private final ExecutionEnvironment context;
+   private final DataSet<Vertex<KT, VVT>> topVertices;
+   private final DataSet<Vertex<KB, VVB>> bottomVertices;
+   private final DataSet<BipartiteEdge<KT, KB, EV>> edges;
+
+   private BipartiteGraph(
+   DataSet<Vertex<KT, VVT>> topVertices,
+   DataSet<Vertex<KB, VVB>> bottomVertices,
+   DataSet<BipartiteEdge<KT, KB, EV>> edges,
+   ExecutionEnvironment context) {
+   this.topVertices = topVertices;
+   this.bottomVertices = bottomVertices;
+   this.edges = edges;
+   this.context = context;
+   }
+
+   /**
+* Create bipartite graph from datasets.
+*
+* @param topVertices  dataset of top vertices in the graph
+* @param bottomVertices dataset of bottom vertices in the graph
+* @param edges dataset of edges between vertices
+* @param context Flink execution context
+* @param <KT> the key type of the top vertices
+* @param <KB> the key type of the bottom vertices
+* @param <VVT> the top vertices value type
+* @param <VVB> the bottom vertices value type
+* @param <EV> the edge value type
+* @return new bipartite graph created from provided datasets
+*/
+   public static <KT, KB, VVT, VVB, EV> BipartiteGraph<KT, KB, VVT, VVB, EV> fromDataSet(
+   DataSet<Vertex<KT, VVT>> topVertices,
+   DataSet<Vertex<KB, VVB>> bottomVertices,
+   DataSet<BipartiteEdge<KT, KB, EV>> edges,
+   ExecutionEnvironment context) {
+   return new BipartiteGraph<>(topVertices, bottomVertices, edges, context);
+   }
+
+   /**
+* Get dataset with top vertices.
+*
+* @return dataset with top vertices
+*/
+   public DataSet<Vertex<KT, VVT>> getTopVertices() {
+   return topVertices;
+   }
+
+   /**
+* Get dataset with bottom vertices.
+*
+* @return dataset with bottom vertices
+*/
+   public DataSet<Vertex<KB, VVB>> getBottomVertices() {
+   

[jira] [Commented] (FLINK-2254) Add Bipartite Graph Support for Gelly

2016-09-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2254?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15533866#comment-15533866
 ] 

ASF GitHub Bot commented on FLINK-2254:
---

Github user vasia commented on a diff in the pull request:

https://github.com/apache/flink/pull/2564#discussion_r81217922
  
--- Diff: 
flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/BipartiteGraph.java
 ---
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+
+/**
+ *
+ * Bipartite graph is a graph that contains two sets of vertices: top 
vertices and bottom vertices. Edges can only exist
--- End diff --

I would rephrase that to "... a graph whose vertices can be divided into 
two disjoint sets"


> Add Bipartite Graph Support for Gelly
> -
>
> Key: FLINK-2254
> URL: https://issues.apache.org/jira/browse/FLINK-2254
> Project: Flink
>  Issue Type: New Feature
>  Components: Gelly
>Affects Versions: 0.10.0
>Reporter: Andra Lungu
>Assignee: Ivan Mushketyk
>  Labels: requires-design-doc
>
> A bipartite graph is a graph for which the set of vertices can be divided 
> into two disjoint sets such that each edge having a source vertex in the 
> first set, will have a target vertex in the second set. We would like to 
> support efficient operations for this type of graphs along with a set of 
> metrics(http://jponnela.com/web_documents/twomode.pdf). 





[jira] [Commented] (FLINK-2254) Add Bipartite Graph Support for Gelly

2016-09-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2254?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15533865#comment-15533865
 ] 

ASF GitHub Bot commented on FLINK-2254:
---

Github user vasia commented on a diff in the pull request:

https://github.com/apache/flink/pull/2564#discussion_r81218538
  
--- Diff: 
flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Edge.java ---
@@ -34,10 +34,10 @@
 
public Edge(){}
 
-   public Edge(K src, K trg, V val) {
-   this.f0 = src;
-   this.f1 = trg;
-   this.f2 = val;
+   public Edge(K source, K target, V value) {
--- End diff --

Why did you change these?


> Add Bipartite Graph Support for Gelly
> -
>
> Key: FLINK-2254
> URL: https://issues.apache.org/jira/browse/FLINK-2254
> Project: Flink
>  Issue Type: New Feature
>  Components: Gelly
>Affects Versions: 0.10.0
>Reporter: Andra Lungu
>Assignee: Ivan Mushketyk
>  Labels: requires-design-doc
>
> A bipartite graph is a graph for which the set of vertices can be divided 
> into two disjoint sets such that each edge having a source vertex in the 
> first set, will have a target vertex in the second set. We would like to 
> support efficient operations for this type of graphs along with a set of 
> metrics(http://jponnela.com/web_documents/twomode.pdf). 





[jira] [Commented] (FLINK-2254) Add Bipartite Graph Support for Gelly

2016-09-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2254?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15533863#comment-15533863
 ] 

ASF GitHub Bot commented on FLINK-2254:
---

Github user vasia commented on a diff in the pull request:

https://github.com/apache/flink/pull/2564#discussion_r81218056
  
--- Diff: 
flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/BipartiteGraph.java
 ---
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+
+/**
+ *
+ * Bipartite graph is a graph that contains two sets of vertices: top 
vertices and bottom vertices. Edges can only exist
+ * between a pair of vertices from different vertices sets. E.g. there can 
be no vertices between a pair
+ * of top vertices.
+ *
+ * Bipartite graph is useful to represent graphs with two sets of 
objects, like researchers and their publications,
--- End diff --

Bipartite graphs are useful...


> Add Bipartite Graph Support for Gelly
> -
>
> Key: FLINK-2254
> URL: https://issues.apache.org/jira/browse/FLINK-2254
> Project: Flink
>  Issue Type: New Feature
>  Components: Gelly
>Affects Versions: 0.10.0
>Reporter: Andra Lungu
>Assignee: Ivan Mushketyk
>  Labels: requires-design-doc
>
> A bipartite graph is a graph for which the set of vertices can be divided 
> into two disjoint sets such that each edge having a source vertex in the 
> first set, will have a target vertex in the second set. We would like to 
> support efficient operations for this type of graphs along with a set of 
> metrics(http://jponnela.com/web_documents/twomode.pdf). 





[jira] [Commented] (FLINK-2254) Add Bipartite Graph Support for Gelly

2016-09-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2254?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15533862#comment-15533862
 ] 

ASF GitHub Bot commented on FLINK-2254:
---

Github user vasia commented on a diff in the pull request:

https://github.com/apache/flink/pull/2564#discussion_r81218490
  
--- Diff: 
flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/BipartiteGraph.java
 ---
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+
+/**
+ *
+ * A bipartite graph is a graph that contains two sets of vertices: top 
vertices and bottom vertices. Edges can only exist
+ * between a pair of vertices from different vertices sets. E.g. there can 
be no edges between a pair
+ * of top vertices.
+ *
+ * Bipartite graph is useful to represent graphs with two sets of 
objects, like researchers and their publications,
+ * where an edge represents that a particular publication was authored by 
a particular author.
+ *
+ * The bipartite graph interface is different from the {@link Graph} interface, so to 
apply algorithms that work on a regular graph,
+ * a bipartite graph should first be converted into a {@link Graph} 
instance. This can be achieved by using
+ * {@link BipartiteGraph#topProjection(GroupReduceFunction)} or
+ * {@link BipartiteGraph#bottomProjection(GroupReduceFunction)} methods.
+ *
+ * @param <KT> the key type of the top vertices
+ * @param <KB> the key type of the bottom vertices
+ * @param <VVT> the top vertices value type
+ * @param <VVB> the bottom vertices value type
+ * @param <EV> the edge value type
+ */
+public class BipartiteGraph<KT, KB, VVT, VVB, EV> {
+   private final ExecutionEnvironment context;
+   private final DataSet<Vertex<KT, VVT>> topVertices;
+   private final DataSet<Vertex<KB, VVB>> bottomVertices;
+   private final DataSet<BipartiteEdge<KT, KB, EV>> edges;
+
+   private BipartiteGraph(
+   DataSet<Vertex<KT, VVT>> topVertices,
+   DataSet<Vertex<KB, VVB>> bottomVertices,
+   DataSet<BipartiteEdge<KT, KB, EV>> edges,
+   ExecutionEnvironment context) {
+   this.topVertices = topVertices;
+   this.bottomVertices = bottomVertices;
+   this.edges = edges;
+   this.context = context;
+   }
+
+   /**
+* Create bipartite graph from datasets.
+*
+* @param topVertices  dataset of top vertices in the graph
+* @param bottomVertices dataset of bottom vertices in the graph
+* @param edges dataset of edges between vertices
+* @param context Flink execution context
+* @param <KT> the key type of the top vertices
+* @param <KB> the key type of the bottom vertices
+* @param <VVT> the top vertices value type
+* @param <VVB> the bottom vertices value type
+* @param <EV> the edge value type
+* @return new bipartite graph created from provided datasets
+*/
+   public static <KT, KB, VVT, VVB, EV> BipartiteGraph<KT, KB, VVT, VVB, EV> 
fromDataSet(
+   DataSet<Vertex<KT, VVT>> topVertices,
+   DataSet<Vertex<KB, VVB>> bottomVertices,
+   DataSet<BipartiteEdge<KT, KB, EV>> edges,
+   ExecutionEnvironment context) {
+   return new BipartiteGraph<>(topVertices, bottomVertices, edges, 
context);
+   }
+
+   /**
+* Get dataset with top vertices.
+*
+* @return dataset with top vertices
+*/
+   public DataSet<Vertex<KT, VVT>> getTopVertices() {
+   return topVertices;
+   }
+
+   /**
+* Get dataset with bottom vertices.
+*
+* @return dataset with bottom vertices
+*/
+   public DataSet<Vertex<KB, VVB>> getBottomVertices() {
+   return bottomVertices;
+   }
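
To make the quoted API concrete, a rough usage sketch (hedged: the pull request
is still under review, so these signatures may change; the projection call is
left commented out because its reduce function is a hypothetical placeholder):

    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    // top vertices: researchers; bottom vertices: publications
    DataSet<Vertex<Long, String>> authors = env.fromElements(
            new Vertex<>(1L, "author-1"), new Vertex<>(2L, "author-2"));
    DataSet<Vertex<Long, String>> papers = env.fromElements(
            new Vertex<>(10L, "paper-10"));
    DataSet<BipartiteEdge<Long, Long, String>> authorship = env.fromElements(
            new BipartiteEdge<>(1L, 10L, "wrote"), new BipartiteEdge<>(2L, 10L, "wrote"));

    BipartiteGraph<Long, Long, String, String, String> graph =
            BipartiteGraph.fromDataSet(authors, papers, authorship, env);

    // As the Javadoc describes, convert to a regular Graph before running
    // Graph algorithms, e.g. a co-author graph:
    // Graph<Long, String, ?> coAuthors = graph.topProjection(myReduceFunction);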

[jira] [Commented] (FLINK-4329) Fix Streaming File Source Timestamps/Watermarks Handling

2016-09-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15533664#comment-15533664
 ] 

ASF GitHub Bot commented on FLINK-4329:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/2546#discussion_r81204726
  
--- Diff: 
flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java
 ---
@@ -440,10 +491,50 @@ public void testFileSplitMonitoringProcessOnce() 
throws Exception {
 
private int getLineNo(String line) {
String[] tkns = line.split("\\s");
-   Assert.assertEquals(tkns.length, 6);
+   Assert.assertEquals(6, tkns.length);
return Integer.parseInt(tkns[tkns.length - 1]);
}
 
+   private class TimeUpdatingThread extends Thread {
+
+   private volatile boolean isRunning;
+
+   private final TestTimeServiceProvider timeServiceProvider;
+   private final OneInputStreamOperatorTestHarness testHarness;
+   private final long wmInterval;
+   private final int elementUntilUpdating;
+
+   TimeUpdatingThread(final TestTimeServiceProvider 
timeServiceProvider,
+  final 
OneInputStreamOperatorTestHarness testHarness,
+  final long wmInterval,
+  final int 
elementUntilUpdating) {
+
+   this.timeServiceProvider = timeServiceProvider;
+   this.testHarness = testHarness;
+   this.wmInterval = wmInterval;
+   this.elementUntilUpdating = elementUntilUpdating;
+   this.isRunning = true;
+   }
+
+   @Override
+   public void run() {
+   try {
+   while (isRunning) {
+   if (testHarness.getOutput().size() % 
elementUntilUpdating == 0) {
--- End diff --

There is a "race" between the operator emitting elements and this thread. 
Both run in loops without delays. Only if this condition is evaluated by chance 
at the exact point in time when the list happens to have exactly that many result 
elements will there actually be a time advance.


> Fix Streaming File Source Timestamps/Watermarks Handling
> 
>
> Key: FLINK-4329
> URL: https://issues.apache.org/jira/browse/FLINK-4329
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Affects Versions: 1.1.0
>Reporter: Aljoscha Krettek
>Assignee: Kostas Kloudas
> Fix For: 1.2.0, 1.1.3
>
>
> The {{ContinuousFileReaderOperator}} does not correctly deal with watermarks, 
> i.e. they are just passed through. This means that when the 
> {{ContinuousFileMonitoringFunction}} closes and emits a {{Long.MAX_VALUE}} 
> that watermark can "overtake" the records that are to be emitted in the 
> {{ContinuousFileReaderOperator}}. Together with the new "allowed lateness" 
> setting in window operator this can lead to elements being dropped as late.
> Also, {{ContinuousFileReaderOperator}} does not correctly assign ingestion 
> timestamps since it is not technically a source but looks like one to the 
> user.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
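
The core of the fix described above is an ordering constraint: a watermark must
not be forwarded past records that are still buffered in the reader. A sketch
of that hold-back logic in isolation (the field and method names are
illustrative, not the actual ContinuousFileReaderOperator code):

    import java.util.ArrayDeque;
    import java.util.Queue;
    import org.apache.flink.streaming.api.watermark.Watermark;

    class WatermarkHoldingReader {
        private final Queue<String> splitQueue = new ArrayDeque<>(); // pending splits
        private Watermark pendingWatermark;

        void processWatermark(Watermark mark) {
            if (splitQueue.isEmpty()) {
                emit(mark);              // nothing buffered: safe to forward
            } else {
                pendingWatermark = mark; // hold back, so Long.MAX_VALUE cannot
            }                            // overtake the buffered records
        }

        void onQueueDrained() {
            if (pendingWatermark != null) {
                emit(pendingWatermark);
                pendingWatermark = null;
            }
        }

        private void emit(Watermark mark) { /* forward downstream */ }
    }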


[jira] [Commented] (FLINK-4329) Fix Streaming File Source Timestamps/Watermarks Handling

2016-09-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15533663#comment-15533663
 ] 

ASF GitHub Bot commented on FLINK-4329:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/2546#discussion_r81204828
  
--- Diff: 
flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java
 ---
@@ -440,10 +491,50 @@ public void testFileSplitMonitoringProcessOnce() 
throws Exception {
 
private int getLineNo(String line) {
String[] tkns = line.split("\\s");
-   Assert.assertEquals(tkns.length, 6);
+   Assert.assertEquals(6, tkns.length);
return Integer.parseInt(tkns[tkns.length - 1]);
}
 
+   private class TimeUpdatingThread extends Thread {
+
+   private volatile boolean isRunning;
+
+   private final TestTimeServiceProvider timeServiceProvider;
+   private final OneInputStreamOperatorTestHarness testHarness;
+   private final long wmInterval;
+   private final int elementUntilUpdating;
+
+   TimeUpdatingThread(final TestTimeServiceProvider 
timeServiceProvider,
+  final 
OneInputStreamOperatorTestHarness testHarness,
+  final long wmInterval,
+  final int 
elementUntilUpdating) {
+
+   this.timeServiceProvider = timeServiceProvider;
+   this.testHarness = testHarness;
+   this.wmInterval = wmInterval;
+   this.elementUntilUpdating = elementUntilUpdating;
+   this.isRunning = true;
+   }
+
+   @Override
+   public void run() {
+   try {
+   while (isRunning) {
+   if (testHarness.getOutput().size() % 
elementUntilUpdating == 0) {
+   long now = 
timeServiceProvider.getCurrentProcessingTime();
+   
timeServiceProvider.setCurrentTime(now + wmInterval);
+   }
+   }
+   } catch (Exception e) {
+   e.printStackTrace();
--- End diff --

This will not result in any meaningful feedback to the test.


> Fix Streaming File Source Timestamps/Watermarks Handling
> 
>
> Key: FLINK-4329
> URL: https://issues.apache.org/jira/browse/FLINK-4329
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Affects Versions: 1.1.0
>Reporter: Aljoscha Krettek
>Assignee: Kostas Kloudas
> Fix For: 1.2.0, 1.1.3
>
>
> The {{ContinuousFileReaderOperator}} does not correctly deal with watermarks, 
> i.e. they are just passed through. This means that when the 
> {{ContinuousFileMonitoringFunction}} closes and emits a {{Long.MAX_VALUE}} 
> that watermark can "overtake" the records that are to be emitted in the 
> {{ContinuousFileReaderOperator}}. Together with the new "allowed lateness" 
> setting in window operator this can lead to elements being dropped as late.
> Also, {{ContinuousFileReaderOperator}} does not correctly assign ingestion 
> timestamps since it is not technically a source but looks like one to the 
> user.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2546: [FLINK-4329] Fix Streaming File Source Timestamps/...

2016-09-29 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/2546#discussion_r81204726
  
--- Diff: 
flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java
 ---
@@ -440,10 +491,50 @@ public void testFileSplitMonitoringProcessOnce() 
throws Exception {
 
private int getLineNo(String line) {
String[] tkns = line.split("\\s");
-   Assert.assertEquals(tkns.length, 6);
+   Assert.assertEquals(6, tkns.length);
return Integer.parseInt(tkns[tkns.length - 1]);
}
 
+   private class TimeUpdatingThread extends Thread {
+
+   private volatile boolean isRunning;
+
+   private final TestTimeServiceProvider timeServiceProvider;
+   private final OneInputStreamOperatorTestHarness testHarness;
+   private final long wmInterval;
+   private final int elementUntilUpdating;
+
+   TimeUpdatingThread(final TestTimeServiceProvider 
timeServiceProvider,
+  final 
OneInputStreamOperatorTestHarness testHarness,
+  final long wmInterval,
+  final int 
elementUntilUpdating) {
+
+   this.timeServiceProvider = timeServiceProvider;
+   this.testHarness = testHarness;
+   this.wmInterval = wmInterval;
+   this.elementUntilUpdating = elementUntilUpdating;
+   this.isRunning = true;
+   }
+
+   @Override
+   public void run() {
+   try {
+   while (isRunning) {
+   if (testHarness.getOutput().size() % 
elementUntilUpdating == 0) {
--- End diff --

There is a "race" between the operator emitting elements and this thread. 
Both run in loops without delays. Only if this condition is evaluated by chance 
at the exact point in time when the list happens to have exactly that many result 
elements will there actually be a time advance.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---
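
One way to remove the busy-wait race flagged above is to let the producing side
signal explicitly when the expected elements have been emitted, and only then
advance time. A minimal sketch, assuming the TestTimeServiceProvider from the
quoted test (its getCurrentProcessingTime/setCurrentTime methods appear in the
diff above); the latch wiring is illustrative test scaffolding:

    import java.util.concurrent.CountDownLatch;

    class DeterministicTimeAdvance {
        // the producer calls elementsEmitted.countDown() after pushing the
        // expected number of elements through the harness
        static void advanceOnce(CountDownLatch elementsEmitted,
                                TestTimeServiceProvider provider,
                                long wmInterval) throws InterruptedException {
            elementsEmitted.await();   // blocks until the elements exist: no spinning
            long now = provider.getCurrentProcessingTime();
            provider.setCurrentTime(now + wmInterval);
        }
    }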


[jira] [Commented] (FLINK-4329) Fix Streaming File Source Timestamps/Watermarks Handling

2016-09-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15533662#comment-15533662
 ] 

ASF GitHub Bot commented on FLINK-4329:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/2546#discussion_r81204990
  
--- Diff: 
flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java
 ---
@@ -190,12 +213,30 @@ public void 
testFileReadingOperatorWithIngestionTime() throws Exception {
}
content.add(element.getValue() + "\n");
} else if (line instanceof Watermark) {
-   Assert.assertEquals(((Watermark) 
line).getTimestamp(), Long.MAX_VALUE);
+   watermarkTimestamps.add(((Watermark) 
line).getTimestamp());
} else {
Assert.fail("Unknown element in the list.");
}
}
 
+   // check if all the input was read
+   Assert.assertEquals(NO_OF_FILES * LINES_PER_FILE, noOfLines);
+
+   // check if the last element is the LongMax watermark
+   Assert.assertTrue(lastElement instanceof Watermark);
+   Assert.assertEquals(Long.MAX_VALUE, ((Watermark) 
lastElement).getTimestamp());
+
+   System.out.println(watermarkTimestamps.size());
--- End diff --

Leftover sysout printing.


> Fix Streaming File Source Timestamps/Watermarks Handling
> 
>
> Key: FLINK-4329
> URL: https://issues.apache.org/jira/browse/FLINK-4329
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Affects Versions: 1.1.0
>Reporter: Aljoscha Krettek
>Assignee: Kostas Kloudas
> Fix For: 1.2.0, 1.1.3
>
>
> The {{ContinuousFileReaderOperator}} does not correctly deal with watermarks, 
> i.e. they are just passed through. This means that when the 
> {{ContinuousFileMonitoringFunction}} closes and emits a {{Long.MAX_VALUE}} 
> that watermark can "overtake" the records that are to be emitted in the 
> {{ContinuousFileReaderOperator}}. Together with the new "allowed lateness" 
> setting in window operator this can lead to elements being dropped as late.
> Also, {{ContinuousFileReaderOperator}} does not correctly assign ingestion 
> timestamps since it is not technically a source but looks like one to the 
> user.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2546: [FLINK-4329] Fix Streaming File Source Timestamps/...

2016-09-29 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/2546#discussion_r81204990
  
--- Diff: 
flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java
 ---
@@ -190,12 +213,30 @@ public void 
testFileReadingOperatorWithIngestionTime() throws Exception {
}
content.add(element.getValue() + "\n");
} else if (line instanceof Watermark) {
-   Assert.assertEquals(((Watermark) 
line).getTimestamp(), Long.MAX_VALUE);
+   watermarkTimestamps.add(((Watermark) 
line).getTimestamp());
} else {
Assert.fail("Unknown element in the list.");
}
}
 
+   // check if all the input was read
+   Assert.assertEquals(NO_OF_FILES * LINES_PER_FILE, noOfLines);
+
+   // check if the last element is the LongMax watermark
+   Assert.assertTrue(lastElement instanceof Watermark);
+   Assert.assertEquals(Long.MAX_VALUE, ((Watermark) 
lastElement).getTimestamp());
+
+   System.out.println(watermarkTimestamps.size());
--- End diff --

Leftover sysout printing.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2546: [FLINK-4329] Fix Streaming File Source Timestamps/...

2016-09-29 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/2546#discussion_r81204828
  
--- Diff: 
flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java
 ---
@@ -440,10 +491,50 @@ public void testFileSplitMonitoringProcessOnce() 
throws Exception {
 
private int getLineNo(String line) {
String[] tkns = line.split("\\s");
-   Assert.assertEquals(tkns.length, 6);
+   Assert.assertEquals(6, tkns.length);
return Integer.parseInt(tkns[tkns.length - 1]);
}
 
+   private class TimeUpdatingThread extends Thread {
+
+   private volatile boolean isRunning;
+
+   private final TestTimeServiceProvider timeServiceProvider;
+   private final OneInputStreamOperatorTestHarness testHarness;
+   private final long wmInterval;
+   private final int elementUntilUpdating;
+
+   TimeUpdatingThread(final TestTimeServiceProvider 
timeServiceProvider,
+  final 
OneInputStreamOperatorTestHarness testHarness,
+  final long wmInterval,
+  final int 
elementUntilUpdating) {
+
+   this.timeServiceProvider = timeServiceProvider;
+   this.testHarness = testHarness;
+   this.wmInterval = wmInterval;
+   this.elementUntilUpdating = elementUntilUpdating;
+   this.isRunning = true;
+   }
+
+   @Override
+   public void run() {
+   try {
+   while (isRunning) {
+   if (testHarness.getOutput().size() % 
elementUntilUpdating == 0) {
+   long now = 
timeServiceProvider.getCurrentProcessingTime();
+   
timeServiceProvider.setCurrentTime(now + wmInterval);
+   }
+   }
+   } catch (Exception e) {
+   e.printStackTrace();
--- End diff --

This will not result in any meaningful feedback to the test.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---
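
A common fix for the printStackTrace problem pointed out above is to remember
the failure and rethrow it when the test joins the thread, so the test actually
fails. A minimal sketch (illustrative scaffolding, not the test under review):

    import java.util.concurrent.atomic.AtomicReference;

    class FailureRememberingThread extends Thread {
        private final AtomicReference<Throwable> error = new AtomicReference<>();

        @Override
        public void run() {
            try {
                // ... the actual work of the thread ...
            } catch (Throwable t) {
                error.compareAndSet(null, t);   // keep the first failure
            }
        }

        void checkError() throws Exception {
            Throwable t = error.get();
            if (t != null) {
                throw new Exception("Test thread failed", t);
            }
        }
    }

    // in the test: thread.join(); thread.checkError();  // surfaces the failure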


[jira] [Commented] (FLINK-4636) AbstractCEPPatternOperator fails to restore state

2016-09-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4636?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15533563#comment-15533563
 ] 

ASF GitHub Bot commented on FLINK-4636:
---

Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/2568#discussion_r81200110
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java
 ---
@@ -126,11 +126,12 @@ public void restoreState(FSDataInputStream state) 
throws Exception {
 
int numberPriorityQueueEntries = ois.readInt();
 
-   priorityQueue = new 
PriorityQueue<StreamRecord<IN>>(numberPriorityQueueEntries, new 
StreamRecordComparator());
-
-   for (int i = 0; i < numberPriorityQueueEntries; i++) { ... asRecord()); }
+   if(numberPriorityQueueEntries > 0) {
+   priorityQueue = new 
PriorityQueue<StreamRecord<IN>>(numberPriorityQueueEntries, new 
StreamRecordComparator());
--- End diff --

The queue is created in `open()`.


> AbstractCEPPatternOperator fails to restore state
> -
>
> Key: FLINK-4636
> URL: https://issues.apache.org/jira/browse/FLINK-4636
> Project: Flink
>  Issue Type: Bug
>  Components: CEP
>Affects Versions: 1.2.0, 1.1.2
>Reporter: Fabian Hueske
>Assignee: Jagadish Bihani
> Fix For: 1.2.0, 1.1.3
>
>
> The {{restoreState()}} method of the {{AbstractCEPPatternOperator}} restores a 
> Java {{PriorityQueue}}. For that it first reads the number of elements to 
> insert and then creates a {{PriorityQueue}} object. However, Java's 
> {{PriorityQueue}} cannot be instantiated with an initial capacity of {{0}}, 
> which is not checked.
> In case of an empty queue, the {{PriorityQueue}} should be instantiated with 
> an initial size of {{1}}.
> See 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Problem-with-CEPPatternOperator-when-taskmanager-is-killed-tp9024.html



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4643) Average Clustering Coefficient

2016-09-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4643?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15533562#comment-15533562
 ] 

ASF GitHub Bot commented on FLINK-4643:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2528
  
Looks good to me. Test failure is unrelated.

From my side, feel free to merge...



> Average Clustering Coefficient
> --
>
> Key: FLINK-4643
> URL: https://issues.apache.org/jira/browse/FLINK-4643
> Project: Flink
>  Issue Type: New Feature
>  Components: Gelly
>Affects Versions: 1.2.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
>Priority: Minor
>
> Gelly has Global Clustering Coefficient and Local Clustering Coefficient. 
> This adds Average Clustering Coefficient. The distinction is discussed in 
> [http://jponnela.com/web_documents/twomode.pdf] (pdf page 2, document page 
> 32).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2568: [FLINK-4636] Add boundary check for priorityqueue ...

2016-09-29 Thread greghogan
Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/2568#discussion_r81200110
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java
 ---
@@ -126,11 +126,12 @@ public void restoreState(FSDataInputStream state) 
throws Exception {
 
int numberPriorityQueueEntries = ois.readInt();
 
-   priorityQueue = new 
PriorityQueue<StreamRecord<IN>>(numberPriorityQueueEntries, new 
StreamRecordComparator());
-
-   for (int i = 0; i < numberPriorityQueueEntries; i++) { ... asRecord()); }
+   if(numberPriorityQueueEntries > 0) {
+   priorityQueue = new 
PriorityQueue<StreamRecord<IN>>(numberPriorityQueueEntries, new 
StreamRecordComparator());
--- End diff --

The queue is created in `open()`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #2528: [FLINK-4643] [gelly] Average Clustering Coefficient

2016-09-29 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2528
  
Looks good to me. Test failure is unrelated.

From my side, feel free to merge...



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2568: [FLINK-4636] Add boundary check for priorityqueue ...

2016-09-29 Thread jaxbihani
Github user jaxbihani commented on a diff in the pull request:

https://github.com/apache/flink/pull/2568#discussion_r81199485
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java
 ---
@@ -126,11 +126,12 @@ public void restoreState(FSDataInputStream state) 
throws Exception {
 
int numberPriorityQueueEntries = ois.readInt();
 
-   priorityQueue = new 
PriorityQueue<StreamRecord<IN>>(numberPriorityQueueEntries, new 
StreamRecordComparator());
-
-   for (int i = 0; i < numberPriorityQueueEntries; i++) { ... asRecord()); }
+   if(numberPriorityQueueEntries > 0) {
+   priorityQueue = new 
PriorityQueue<StreamRecord<IN>>(numberPriorityQueueEntries, new 
StreamRecordComparator());
--- End diff --

I think if the number of objects read is 0, why would we want to create the 
object at all? That's unnecessary, and the condition check will save some CPU cycles.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4636) AbstractCEPPatternOperator fails to restore state

2016-09-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4636?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15533555#comment-15533555
 ] 

ASF GitHub Bot commented on FLINK-4636:
---

Github user jaxbihani commented on a diff in the pull request:

https://github.com/apache/flink/pull/2568#discussion_r81199485
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java
 ---
@@ -126,11 +126,12 @@ public void restoreState(FSDataInputStream state) 
throws Exception {
 
int numberPriorityQueueEntries = ois.readInt();
 
-   priorityQueue = new 
PriorityQueue<StreamRecord<IN>>(numberPriorityQueueEntries, new 
StreamRecordComparator());
-
-   for (int i = 0; i < numberPriorityQueueEntries; i++) { ... asRecord()); }
+   if(numberPriorityQueueEntries > 0) {
+   priorityQueue = new 
PriorityQueue<StreamRecord<IN>>(numberPriorityQueueEntries, new 
StreamRecordComparator());
--- End diff --

I think if the number of objects read is 0, why would we want to create the 
object at all? That's unnecessary, and the condition check will save some CPU cycles.


> AbstractCEPPatternOperator fails to restore state
> -
>
> Key: FLINK-4636
> URL: https://issues.apache.org/jira/browse/FLINK-4636
> Project: Flink
>  Issue Type: Bug
>  Components: CEP
>Affects Versions: 1.2.0, 1.1.2
>Reporter: Fabian Hueske
>Assignee: Jagadish Bihani
> Fix For: 1.2.0, 1.1.3
>
>
> The {{restoreState()}} method of the {{AbstractCEPPatternOperator}} restores a 
> Java {{PriorityQueue}}. For that it first reads the number of elements to 
> insert and then creates a {{PriorityQueue}} object. However, Java's 
> {{PriorityQueue}} cannot be instantiated with an initial capacity of {{0}}, 
> which is not checked.
> In case of an empty queue, the {{PriorityQueue}} should be instantiated with 
> an initial size of {{1}}.
> See 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Problem-with-CEPPatternOperator-when-taskmanager-is-killed-tp9024.html



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
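
The failure mode and the suggested guard from the issue description can be
shown in a few lines of plain Java (a self-contained demo, not the CEP operator
code):

    import java.util.PriorityQueue;

    public class MinCapacityDemo {
        public static void main(String[] args) {
            int restored = 0; // e.g. the entry count read back from the snapshot
            // new PriorityQueue<Integer>(restored) throws IllegalArgumentException
            PriorityQueue<Integer> queue = new PriorityQueue<>(Math.max(1, restored));
            System.out.println("queue created, empty = " + queue.isEmpty());
        }
    }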


[jira] [Created] (FLINK-4717) Naive version of atomic stop signal with savepoint

2016-09-29 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-4717:


 Summary: Naive version of atomic stop signal with savepoint
 Key: FLINK-4717
 URL: https://issues.apache.org/jira/browse/FLINK-4717
 Project: Flink
  Issue Type: New Feature
  Components: State Backends, Checkpointing
Affects Versions: 1.2.0
Reporter: Till Rohrmann
Priority: Minor
 Fix For: 1.2.0


As a first step towards atomic stopping with savepoints we should implement a 
cancel command which prior to cancelling takes a savepoint. Additionally, it 
should turn off the periodic checkpointing so that there won't be checkpoints 
executed between the savepoint and the cancel command.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
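
A rough sketch of the naive sequence described above. Every type and method
name here is a hypothetical placeholder, not the actual JobManager or
CheckpointCoordinator API:

    interface Coordinator {
        void stopCheckpointScheduler();
        String triggerSavepointAndWait(long timestamp) throws Exception;
    }

    interface JobControl {
        void cancelJob(String jobId);
    }

    class CancelWithSavepoint {
        static void run(Coordinator coordinator, JobControl jobManager, String jobId)
                throws Exception {
            coordinator.stopCheckpointScheduler();   // 1. no more periodic checkpoints
            String savepointPath =                   // 2. take and wait for the savepoint
                    coordinator.triggerSavepointAndWait(System.currentTimeMillis());
            jobManager.cancelJob(jobId);             // 3. cancel only after the savepoint
        }
    }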


[jira] [Commented] (FLINK-4573) Potential resource leak due to unclosed RandomAccessFile in TaskManagerLogHandler

2016-09-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4573?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15533528#comment-15533528
 ] 

ASF GitHub Bot commented on FLINK-4573:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2556
  
Merging this...


> Potential resource leak due to unclosed RandomAccessFile in 
> TaskManagerLogHandler
> -
>
> Key: FLINK-4573
> URL: https://issues.apache.org/jira/browse/FLINK-4573
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Priority: Minor
>
> {code}
> try {
> raf = new 
> RandomAccessFile(file, "r");
> } catch 
> (FileNotFoundException e) {
> display(ctx, request, 
> "Displaying TaskManager log failed.");
> LOG.error("Displaying 
> TaskManager log failed.", e);
> return;
> }
> long fileLength = 
> raf.length();
> final FileChannel fc = 
> raf.getChannel();
> {code}
> If length() throws IOException, raf would be left unclosed.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
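
One possible shape of the fix, written as a method-body sketch that reuses the
identifiers from the quoted snippet (file, ctx, request, display, LOG); the
point is simply to close the file if length() fails:

    final RandomAccessFile raf;
    try {
        raf = new RandomAccessFile(file, "r");
    } catch (FileNotFoundException e) {
        display(ctx, request, "Displaying TaskManager log failed.");
        LOG.error("Displaying TaskManager log failed.", e);
        return;
    }

    long fileLength;
    try {
        fileLength = raf.length();
    } catch (IOException ioe) {
        raf.close();   // release the descriptor before propagating the error
        throw ioe;
    }
    final FileChannel fc = raf.getChannel();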


[GitHub] flink issue #2556: [FLINK-4573] Fix potential resource leak due to unclosed ...

2016-09-29 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2556
  
Merging this...


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---



[jira] [Updated] (FLINK-4650) Frequent task manager disconnects from JobManager

2016-09-29 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4650?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann updated FLINK-4650:
-
Fix Version/s: 1.2.0

> Frequent task manager disconnects from JobManager
> -
>
> Key: FLINK-4650
> URL: https://issues.apache.org/jira/browse/FLINK-4650
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination, Network
>Affects Versions: 1.2.0
>Reporter: Nagarjun Guraja
> Fix For: 1.2.0
>
>
> Not sure of the exact reason, but we observe more frequent task manager 
> disconnects while using the 1.2 snapshot build as compared to the 1.1.2 release build.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-4650) Frequent task manager disconnects from JobManager

2016-09-29 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4650?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann updated FLINK-4650:
-
Affects Version/s: 1.2.0

> Frequent task manager disconnects from JobManager
> -
>
> Key: FLINK-4650
> URL: https://issues.apache.org/jira/browse/FLINK-4650
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination, Network
>Affects Versions: 1.2.0
>Reporter: Nagarjun Guraja
> Fix For: 1.2.0
>
>
> Not sure of the exact reason, but we observe more frequent task manager 
> disconnects while using the 1.2 snapshot build as compared to the 1.1.2 release build.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-4716) Add trigger full recovery button to web UI

2016-09-29 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-4716:


 Summary: Add trigger full recovery button to web UI
 Key: FLINK-4716
 URL: https://issues.apache.org/jira/browse/FLINK-4716
 Project: Flink
  Issue Type: Improvement
  Components: Webfrontend
Affects Versions: 1.2.0
Reporter: Till Rohrmann
Priority: Minor
 Fix For: 1.2.0


Add a trigger full recovery button to the web UI. The full recovery button will 
take the latest completed checkpoint and resume from there.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-4715) TaskManager should commit suicide after cancellation failure

2016-09-29 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-4715:


 Summary: TaskManager should commit suicide after cancellation 
failure
 Key: FLINK-4715
 URL: https://issues.apache.org/jira/browse/FLINK-4715
 Project: Flink
  Issue Type: Improvement
  Components: TaskManager
Affects Versions: 1.2.0
Reporter: Till Rohrmann
 Fix For: 1.2.0


In case of a failed cancellation, e.g. the task cannot be cancelled after a 
given time, the {{TaskManager}} should kill itself. That way we guarantee that 
there is no resource leak. 

This behaviour acts as a safety-net against faulty user code.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
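
A sketch of the safety net described above: arm a timer when cancellation
starts, and halt the process if the task has not reached a terminal state in
time. All names are illustrative, not the actual TaskManager code:

    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;
    import java.util.function.BooleanSupplier;

    class CancellationWatchdog {
        private final ScheduledExecutorService timer =
                Executors.newSingleThreadScheduledExecutor();

        void arm(BooleanSupplier taskIsTerminated, long timeoutMillis) {
            timer.schedule(() -> {
                if (!taskIsTerminated.getAsBoolean()) {
                    // cancellation failed, e.g. user code is stuck in a loop:
                    // kill the process so the resources are freed for sure
                    Runtime.getRuntime().halt(-1);
                }
            }, timeoutMillis, TimeUnit.MILLISECONDS);
        }
    }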


[jira] [Created] (FLINK-4714) Set task state to RUNNING after state has been restored

2016-09-29 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-4714:


 Summary: Set task state to RUNNING after state has been restored
 Key: FLINK-4714
 URL: https://issues.apache.org/jira/browse/FLINK-4714
 Project: Flink
  Issue Type: Improvement
  Components: Distributed Coordination, State Backends, Checkpointing
Affects Versions: 1.2.0
Reporter: Till Rohrmann
 Fix For: 1.2.0


The task state is set to {{RUNNING}} as soon as the {{Task}} is executed. That, 
however, happens before the state of the {{StreamTask}} invokable has been 
restored. As a result, the {{CheckpointCoordinator}} starts to trigger 
checkpoints even though the {{StreamTask}} is not ready.

In order to avoid aborting checkpoints and to start them properly, we should switch 
the task state to {{RUNNING}} after the state has been restored.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
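
The proposed change is purely an ordering constraint; as a two-line sketch
(names illustrative, not the actual Task code):

    invokable.restoreState();                          // 1. restore state first
    executionState.compareAndSet(DEPLOYING, RUNNING);  // 2. only then report RUNNING,
    // so the CheckpointCoordinator never triggers checkpoints on an unrestored task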


[jira] [Commented] (FLINK-4700) Harden the TimeProvider test

2016-09-29 Thread Kostas Kloudas (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4700?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15533260#comment-15533260
 ] 

Kostas Kloudas commented on FLINK-4700:
---

[~StephanEwen] 

I am thinking that the best solution to this is to remove the 
testDefaultTimeProvider() test altogether.

The only thing it actually tests is the Java ScheduledExecutorService and there 
is no way to make it stable 
without adding test-specific methods (visible for testing) in the 
DefaultTimeServiceProvider. 

What do you think?

> Harden the TimeProvider test
> 
>
> Key: FLINK-4700
> URL: https://issues.apache.org/jira/browse/FLINK-4700
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
>
> Currently the TimeProvider test fails due to a race condition. This task aims 
> at fixing it.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
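
For comparison, the kind of test-specific hook the comment above wants to
avoid: a manually advanced time service makes the test fully deterministic,
but only by exposing an advanceTo() method. All names are illustrative:

    import java.util.Map;
    import java.util.TreeMap;

    class ManualTimeService {
        private final TreeMap<Long, Runnable> timers = new TreeMap<>();

        void registerTimer(long timestamp, Runnable target) {
            timers.put(timestamp, target);
        }

        void advanceTo(long timestamp) {
            // fire every timer that is due, in timestamp order, on this thread
            for (Map.Entry<Long, Runnable> due :
                    timers.headMap(timestamp, true).entrySet()) {
                due.getValue().run();
            }
            timers.headMap(timestamp, true).clear();
        }
    }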


[jira] [Commented] (FLINK-4702) Kafka consumer must commit offsets asynchronously

2016-09-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15533216#comment-15533216
 ] 

ASF GitHub Bot commented on FLINK-4702:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2574
  
@tzulitai @rmetzger We need to make sure that the Kafka 0.10 code picks up 
this change and the test case.


> Kafka consumer must commit offsets asynchronously
> -
>
> Key: FLINK-4702
> URL: https://issues.apache.org/jira/browse/FLINK-4702
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.1.2
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Blocker
> Fix For: 1.2.0, 1.1.3
>
>
> The offset commit calls to Kafka may occasionally take very long.
> In that case, the {{notifyCheckpointComplete()}} method blocks for a long time and 
> the KafkaConsumer cannot make progress and cannot perform checkpoints.
> Kafka 0.9+ has methods to commit asynchronously.
> We should use those and make sure no more than one commit is concurrently in 
> progress, so that commit requests do not pile up.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4711) TaskManager can crash due to failing onPartitionStateUpdate call

2016-09-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4711?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15533215#comment-15533215
 ] 

ASF GitHub Bot commented on FLINK-4711:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/2569
  
Thanks for the review @uce. If Travis gives green light, I'll merge the PR.


> TaskManager can crash due to failing onPartitionStateUpdate call
> 
>
> Key: FLINK-4711
> URL: https://issues.apache.org/jira/browse/FLINK-4711
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.2.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
> Fix For: 1.2.0
>
>
> The {{TaskManager}} can crash because it calls 
> {{Task.onPartitionStateUpdate}} when it receives a {{PartitionState}} 
> message. The {{onPartitionStateUpdate}} method can throw an {{IOException}} 
> or {{InterruptedException}} which are not handled on the {{TaskManager}} 
> level.
> Another problem is that the initial partition state request is triggered 
> within the {{SingleInputGate}}. The request causes the {{JobManager}} to send 
> a {{PartitionState}} message to the {{TaskManager}} which forwards it to the 
> {{Task}}. If at any of these points a message gets lost, then it is not 
> retried and the partition state remains unknown.
> In order to handle the exceptions, to make the data flow clearer and to add 
> automatic retries, I propose to let the {{Task}} send the partition state 
> check requests. Furthermore, the {{JobManager}} should directly answer to the 
> {{Task}} by replying to an ask operation. That way the message does not have 
> to be routed through the {{TaskManager}}.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
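
A sketch of the ask-based flow proposed above: the Task asks the JobManager
directly and retries on failure, so a lost message can no longer leave the
partition state unknown. The gateway and message types are illustrative, and
the future is assumed to complete exceptionally on a timeout:

    import java.util.concurrent.CompletableFuture;

    class PartitionStateRequester {
        interface JobManagerGateway {
            CompletableFuture<Boolean> askPartitionState(String partitionId);
        }

        void request(JobManagerGateway jobManager, String partitionId) {
            jobManager.askPartitionState(partitionId).whenComplete((ready, failure) -> {
                if (failure != null) {
                    request(jobManager, partitionId); // retry: lost message or timeout
                } else if (ready) {
                    // resume consuming the partition
                } else {
                    // partition not ready yet: schedule another check later
                }
            });
        }
    }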


[GitHub] flink issue #2574: [FLINK-4702] [kafka connector] Commit offsets to Kafka as...

2016-09-29 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2574
  
@tzulitai @rmetzger We need to make sure that the Kafka 0.10 code picks up 
this change and the test case.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #2569: [FLINK-4711] Let the Task trigger partition state request...

2016-09-29 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/2569
  
Thanks for the review @uce. If Travis gives green light, I'll merge the PR.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4702) Kafka consumer must commit offsets asynchronously

2016-09-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15533211#comment-15533211
 ] 

ASF GitHub Bot commented on FLINK-4702:
---

Github user StephanEwen closed the pull request at:

https://github.com/apache/flink/pull/2559


> Kafka consumer must commit offsets asynchronously
> -
>
> Key: FLINK-4702
> URL: https://issues.apache.org/jira/browse/FLINK-4702
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.1.2
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Blocker
> Fix For: 1.2.0, 1.1.3
>
>
> The offset commit calls to Kafka may occasionally take very long.
> In that case, the {{notifyCheckpointComplete()}} method blocks for a long time and 
> the KafkaConsumer cannot make progress and cannot perform checkpoints.
> Kafka 0.9+ has methods to commit asynchronously.
> We should use those and make sure no more than one commit is concurrently in 
> progress, so that commit requests do not pile up.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4702) Kafka consumer must commit offsets asynchronously

2016-09-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15533210#comment-15533210
 ] 

ASF GitHub Bot commented on FLINK-4702:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2559
  
Closing this for #2574 


> Kafka consumer must commit offsets asynchronously
> -
>
> Key: FLINK-4702
> URL: https://issues.apache.org/jira/browse/FLINK-4702
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.1.2
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Blocker
> Fix For: 1.2.0, 1.1.3
>
>
> The offset commit calls to Kafka may occasionally take very long.
> In that case, the {{notifyCheckpointComplete()}} method blocks for a long time and 
> the KafkaConsumer cannot make progress and cannot perform checkpoints.
> Kafka 0.9+ has methods to commit asynchronously.
> We should use those and make sure no more than one commit is concurrently in 
> progress, so that commit requests do not pile up.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2559: [FLINK-4702] [kafka connector] Commit offets to Ka...

2016-09-29 Thread StephanEwen
Github user StephanEwen closed the pull request at:

https://github.com/apache/flink/pull/2559


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #2559: [FLINK-4702] [kafka connector] Commit offets to Kafka asy...

2016-09-29 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2559
  
Closing this for #2574 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4702) Kafka consumer must commit offsets asynchronously

2016-09-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15533206#comment-15533206
 ] 

ASF GitHub Bot commented on FLINK-4702:
---

GitHub user StephanEwen opened a pull request:

https://github.com/apache/flink/pull/2574

[FLINK-4702] [kafka connector] Commit offsets to Kafka asynchronously and 
don't block on polls

This fix is quite critical!

While the KafkaConsumer is polling for new data (with a timeout), it holds 
the consumer lock. If no data arrives from Kafka, the lock is not released before 
the poll timeout is over.

During that time, no offset commit can make progress, because it needs the 
consumer lock. The `notifyCheckpointComplete()` method of the Kafka Consumer 
hence blocks until the poll timeout is over and the lock is released. For 
low-throughput Kafka Topics, this can cause wildly long checkpoint delays.

This changes `notifyCheckpointComplete()` to only "schedule" offsets to be 
committed, while the main fetcher thread actually kicks off the asynchronous 
offset commits. That way, there is no interference between the 
`notifyCheckpointComplete()` method (which is executed under checkpoint lock) 
and the consumer lock.

In fact, the only KafkaConsumer method accessed concurrently with the main 
fetcher thread is `wakeup()` which is actually thread-safe (where the rest of 
the KafkaConsumer is not). The consumer lock was hence completely removed.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/StephanEwen/incubator-flink kafka_09_fix

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2574.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2574


commit 0846fd907db7d52d7e5fb7d704c5e1c13462e331
Author: Stephan Ewen 
Date:   2016-09-29T16:09:51Z

[FLINK-4702] [kafka connector] Commit offsets to Kafka asynchronously and 
don't block on polls

Letting the Kafka commit block on polls means that 
'notifyCheckpointComplete()' may take
very long. This is mostly relevant for low-throughput Kafka topics.




> Kafka consumer must commit offsets asynchronously
> -
>
> Key: FLINK-4702
> URL: https://issues.apache.org/jira/browse/FLINK-4702
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.1.2
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Blocker
> Fix For: 1.2.0, 1.1.3
>
>
> The offset commit calls to Kafka may occasionally take very long.
> In that case, the {{notifyCheckpointComplete()}} method blocks for a long time and 
> the KafkaConsumer cannot make progress and cannot perform checkpoints.
> Kafka 0.9+ has methods to commit asynchronously.
> We should use those and make sure no more than one commit is concurrently in 
> progress, so that commit requests do not pile up.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
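
The scheme in the description can be boiled down to two rules: the checkpoint
thread only records what to commit, and the single fetcher thread (which owns
the KafkaConsumer) issues commitAsync with at most one commit in flight. A
sketch with illustrative names, not the actual connector code:

    import java.util.Map;
    import java.util.concurrent.atomic.AtomicBoolean;
    import java.util.concurrent.atomic.AtomicReference;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;

    class AsyncOffsetCommitter {
        private final AtomicReference<Map<TopicPartition, OffsetAndMetadata>> pending =
                new AtomicReference<>();
        private final AtomicBoolean commitInProgress = new AtomicBoolean(false);

        // called from notifyCheckpointComplete(): never touches the consumer
        void schedule(Map<TopicPartition, OffsetAndMetadata> offsets) {
            pending.set(offsets); // a newer checkpoint overwrites an older pending one
        }

        // called only from the fetcher loop, between polls
        void maybeCommit(KafkaConsumer<?, ?> consumer) {
            if (commitInProgress.get()) {
                return;           // one commit in flight already; offsets stay pending
            }
            Map<TopicPartition, OffsetAndMetadata> toCommit = pending.getAndSet(null);
            if (toCommit != null) {
                commitInProgress.set(true);
                consumer.commitAsync(toCommit,
                        (offsets, exception) -> commitInProgress.set(false));
            }
        }
    }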


[jira] [Commented] (FLINK-4348) Implement slot allocation protocol with TaskExecutor

2016-09-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4348?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15533204#comment-15533204
 ] 

ASF GitHub Bot commented on FLINK-4348:
---

Github user mxm commented on the issue:

https://github.com/apache/flink/pull/2571
  
CC @beyond1920 @KurtYoung 


> Implement slot allocation protocol with TaskExecutor
> 
>
> Key: FLINK-4348
> URL: https://issues.apache.org/jira/browse/FLINK-4348
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management
>Reporter: Kurt Young
>Assignee: Maximilian Michels
>
> When slotManager finds a proper slot in the free pool for a slot request,  
> slotManager marks the slot as occupied, then tells the taskExecutor to give 
> the slot to the specified JobMaster. 
> When a slot request is sent to the taskExecutor, it should contain the following 
> parameters: AllocationID, JobID,  slotID, resourceManagerLeaderSessionID. 
> There are 3 possibilities for the response from the taskExecutor; we 
> will discuss when each possibility happens and how to handle it.
> 1. Ack request which means the taskExecutor gives the slot to the specified 
> jobMaster as expected.   
> 2. Decline the request if the slot is already occupied by another AllocationID.  
> 3. Timeout, which could be caused by loss of the request message or response message 
> or by slow network transfer. 
> In the first case, ResourceManager needs to do nothing. However, in the 
> second and third cases, ResourceManager needs to notify slotManager; 
> slotManager will first verify and clear all the previous allocation information for 
> this slot request, then try to find a proper slot for the slot 
> request again. This may cause some duplicate allocation, e.g. the slot 
> request to a TaskManager is successful but the response is lost somehow, so we 
> may request a slot in another TaskManager. This causes two slots to be assigned to 
> one request, but it can be taken care of by rejecting registration at 
> JobMaster.
> There are still some questions that need further discussion.
> 1. Who sends the slotRequest to the taskExecutor, SlotManager or ResourceManager? I 
> think it's better that SlotManager delegates the rpc call to ResourceManager 
> when SlotManager needs to communicate with the outside world.  ResourceManager 
> knows which taskExecutor to send the request to based on the ResourceID. Besides, this 
> RPC call, which is used to request a slot from the taskExecutor, should not be a 
> RpcMethod, because we want only SlotManager to have permission to call the 
> method; other components, for example JobMaster and TaskExecutor, should not 
> be able to call this method directly.
> 2. If the JobMaster rejects the slot offer from a TaskExecutor, the TaskExecutor 
> should notify the free slot to ResourceManager immediately, or wait for the next 
> heartbeat sync. The advantage of the first way is that the resourceManager’s view 
> could be updated faster. The advantage of the second way is that it saves an RPC method in 
> ResourceManager.
> 3. There are two communication types. First, the slot request could be sent as 
> an ask operation where the response is returned as a future. Second, the 
> resourceManager sends the slot request in a fire-and-forget way, and the response 
> could be returned by an RPC call. I prefer the first one because it is 
> simpler and could save an RPC method in ResourceManager (for the callback in the 
> second way).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
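
The three response possibilities from the description map naturally onto a
future-based ask. A sketch of the handling (every type and method name is an
illustrative placeholder, and the future is assumed to fail exceptionally on a
timeout or lost message):

    import java.util.concurrent.CompletableFuture;

    class SlotRequestHandler {
        enum Reply { ACK, DECLINED }

        void handle(CompletableFuture<Reply> reply, String allocationId) {
            reply.whenComplete((r, failure) -> {
                if (failure != null || r == Reply.DECLINED) {
                    // timeout, lost message, or slot occupied by another
                    // allocation: clear the bookkeeping and look elsewhere
                    clearAllocation(allocationId);
                    retryRequest(allocationId);
                }
                // on ACK the slot simply stays marked as occupied
            });
        }

        void clearAllocation(String allocationId) { /* illustrative */ }
        void retryRequest(String allocationId) { /* illustrative */ }
    }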


[GitHub] flink pull request #2574: [FLINK-4702] [kafka connector] Commit offsets to K...

2016-09-29 Thread StephanEwen
GitHub user StephanEwen opened a pull request:

https://github.com/apache/flink/pull/2574

[FLINK-4702] [kafka connector] Commit offsets to Kafka asynchronously and 
don't block on polls

This fix is quite critical!

While the KafkaConsumer is polling for new data (with a timeout), it holds 
the consumer lock. If no data arrives from Kafka, the lock is not released before 
the poll timeout is over.

During that time, no offset commit can make progress, because it needs the 
consumer lock. The `notifyCheckpointComplete()` method of the Kafka Consumer 
hence blocks until the poll timeout is over and the lock is released. For 
low-throughput Kafka Topics, this can cause wildly long checkpoint delays.

This changes `notifyCheckpointComplete()` to only "schedule" offsets to be 
committed, while the main fetcher thread actually kicks off the asynchronous 
offset commits. That way, there is no interference between the 
`notifyCheckpointComplete()` method (which is executed under checkpoint lock) 
and the consumer lock.

In fact, the only KafkaConsumer method accessed concurrently with the main 
fetcher thread is `wakeup()` which is actually thread-safe (where the rest of 
the KafkaConsumer is not). The consumer lock was hence completely removed.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/StephanEwen/incubator-flink kafka_09_fix

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2574.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2574


commit 0846fd907db7d52d7e5fb7d704c5e1c13462e331
Author: Stephan Ewen 
Date:   2016-09-29T16:09:51Z

[FLINK-4702] [kafka connector] Commit offsets to Kafka asynchronously and 
don't block on polls

Letting the Kafka commit block on polls means that 
'notifyCheckpointComplete()' may take
very long. This is mostly relevant for low-throughput Kafka topics.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #2571: [FLINK-4348] Simplify logic of SlotManager

2016-09-29 Thread mxm
Github user mxm commented on the issue:

https://github.com/apache/flink/pull/2571
  
CC @beyond1920 @KurtYoung 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4711) TaskManager can crash due to failing onPartitionStateUpdate call

2016-09-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4711?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15533193#comment-15533193
 ] 

ASF GitHub Bot commented on FLINK-4711:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2569#discussion_r81173562
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskControlMessages.scala
 ---
@@ -92,16 +92,6 @@ object TaskMessages {
   // 
--
 
   /**
-   * Answer to a [[RequestPartitionState]] with the state of the 
respective partition.
-   */
-  case class PartitionState(
--- End diff --

Yes that is also true. Will fix it.


> TaskManager can crash due to failing onPartitionStateUpdate call
> 
>
> Key: FLINK-4711
> URL: https://issues.apache.org/jira/browse/FLINK-4711
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.2.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
> Fix For: 1.2.0
>
>
> The {{TaskManager}} can crash because it calls 
> {{Task.onPartitionStateUpdate}} when it receives a {{PartitionState}} 
> message. The {{onPartitionStateUpdate}} method can throw an {{IOException}} 
> or {{InterruptedException}} which are not handled on the {{TaskManager}} 
> level.
> Another problem is that the initial partition state request is triggered 
> within the {{SingleInputGate}}. The request causes the {{JobManager}} to send 
> a {{PartitionState}} message to the {{TaskManager}} which forwards it to the 
> {{Task}}. If at any of these points a message gets lost, then it is not 
> retried and the partition state remains unknown.
> In order to handle the exceptions, to make the data flow clearer and to add 
> automatic retries, I propose to let the {{Task}} send the partition state 
> check requests. Furthermore, the {{JobManager}} should directly answer to the 
> {{Task}} by replying to an ask operation. That way the message does not have 
> to be routed through the {{TaskManager}}.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4711) TaskManager can crash due to failing onPartitionStateUpdate call

2016-09-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4711?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15533194#comment-15533194
 ] 

ASF GitHub Bot commented on FLINK-4711:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2569#discussion_r81173599
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 ---
@@ -873,8 +873,7 @@ class JobManager(
   }
 
   sender ! decorateMessage(
-PartitionState(
-  taskExecutionId,
+new org.apache.flink.runtime.io.network.PartitionState(
--- End diff --

Yes that's true. Will fix it.


> TaskManager can crash due to failing onPartitionStateUpdate call
> 
>
> Key: FLINK-4711
> URL: https://issues.apache.org/jira/browse/FLINK-4711
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.2.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
> Fix For: 1.2.0
>
>
> The {{TaskManager}} can crash because it calls 
> {{Task.onPartitionStateUpdate}} when it receives a {{PartitionState}} 
> message. The {{onPartitionStateUpdate}} method can throw an {{IOException}} 
> or {{InterruptedException}} which are not handled on the {{TaskManager}} 
> level.
> Another problem is that the initial partition state request is triggered 
> within the {{SingleInputGate}}. The request causes the {{JobManager}} to send 
> a {{PartitionState}} message to the {{TaskManager}} which forwards it to the 
> {{Task}}. If at any of these points a message gets lost, then it is not 
> retried and the partition state remains unknown.
> In order to handle the exceptions, to make the data flow clearer and to add 
> automatic retries, I propose to let the {{Task}} send the partition state 
> check requests. Furthermore, the {{JobManager}} should directly answer to the 
> {{Task}} by replying to an ask operation. That way the message does not have 
> to be routed through the {{TaskManager}}.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2569: [FLINK-4711] Let the Task trigger partition state ...

2016-09-29 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2569#discussion_r81173562
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskControlMessages.scala
 ---
@@ -92,16 +92,6 @@ object TaskMessages {
   // 
--
 
   /**
-   * Answer to a [[RequestPartitionState]] with the state of the 
respective partition.
-   */
-  case class PartitionState(
--- End diff --

Yes that is also true. Will fix it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2569: [FLINK-4711] Let the Task trigger partition state ...

2016-09-29 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2569#discussion_r81173599
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 ---
@@ -873,8 +873,7 @@ class JobManager(
   }
 
   sender ! decorateMessage(
-PartitionState(
-  taskExecutionId,
+new org.apache.flink.runtime.io.network.PartitionState(
--- End diff --

Yes that's true. Will fix it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4711) TaskManager can crash due to failing onPartitionStateUpdate call

2016-09-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4711?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15533192#comment-15533192
 ] 

ASF GitHub Bot commented on FLINK-4711:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2569#discussion_r81173481
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/JobManagerCommunicationFactory.java
 ---
@@ -27,15 +26,6 @@
 public interface JobManagerCommunicationFactory {
--- End diff --

Good catch. Will remove it.


> TaskManager can crash due to failing onPartitionStateUpdate call
> 
>
> Key: FLINK-4711
> URL: https://issues.apache.org/jira/browse/FLINK-4711
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.2.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
> Fix For: 1.2.0
>
>
> The {{TaskManager}} can crash because it calls 
> {{Task.onPartitionStateUpdate}} when it receives a {{PartitionState}} 
> message. The {{onPartitionStateUpdate}} method can throw an {{IOException}} 
> or {{InterruptedException}} which are not handled on the {{TaskManager}} 
> level.
> Another problem is that the initial partition state request is triggered 
> within the {{SingleInputGate}}. The request causes the {{JobManager}} to send 
> a {{PartitionState}} message to the {{TaskManager}} which forwards it to the 
> {{Task}}. If at any of these points a message gets lost, then it is not 
> retried and the partition state remains unknown.
> In order to handle the exceptions, to make the data flow clearer and to add 
> automatic retries, I propose to let the {{Task}} send the partition state 
> check requests. Furthermore, the {{JobManager}} should directly answer to the 
> {{Task}} by replying to an ask operation. That way the message does not have 
> to be routed through the {{TaskManager}}.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2569: [FLINK-4711] Let the Task trigger partition state ...

2016-09-29 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2569#discussion_r81173481
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/JobManagerCommunicationFactory.java
 ---
@@ -27,15 +26,6 @@
 public interface JobManagerCommunicationFactory {
--- End diff --

Good catch. Will remove it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2573: Refactor StreamOperator Hierarchy to make Keys Expl...

2016-09-29 Thread aljoscha
GitHub user aljoscha opened a pull request:

https://github.com/apache/flink/pull/2573

Refactor StreamOperator Hierarchy to make Keys Explicit

This is more of a preview PR; I'm not yet completely done with testing and 
making sure that everything works.

R: @StephanEwen 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aljoscha/flink refactor-stream-operators

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2573.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2573


commit c9049c57d612e18f6051574d9903bb063b46840a
Author: Aljoscha Krettek 
Date:   2016-09-28T09:07:22Z

Refactor StreamOperator Hierarchy to make Keys Explicit




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4552) Refactor WindowOperator/Trigger Tests

2016-09-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4552?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15533144#comment-15533144
 ] 

ASF GitHub Bot commented on FLINK-4552:
---

GitHub user aljoscha opened a pull request:

https://github.com/apache/flink/pull/2572

[FLINK-4552] Refactor WindowOperator/Trigger Tests

This builds on #2570 

Before, tests for WindowOperator, WindowAssigner, Trigger and WindowFunction
were all conflated in WindowOperatorTest. All of these tested that a
certain combination of a Trigger, WindowAssigner and WindowFunction produces
the expected output.

This change modularizes these tests and spreads them out across multiple
files. For example, one per trigger/window assigner.

The new WindowOperatorTest tests verify that the interaction between
WindowOperator and the various other parts works as expected, that the
correct methods on Trigger and WindowFunction are called at the expected
time and that snapshotting, timers, cleanup etc. work correctly. These tests
also verify that the different state types and WindowFunctions work 
correctly.

For trigger tests this introduces TriggerTestHarness. This can be used
to inject elements into Triggers and verify that they fire at the correct times. The
actual output of the WindowFunction is not important for these tests.
The new tests also make sure that triggers correctly clean up state and 
timers.
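
For illustration, the pattern such a harness enables might look roughly like
this (a self-contained sketch with simplified types, not the PR's actual
Trigger or TriggerTestHarness API):

{code}
// Simplified trigger model: a trigger reacts to elements and watermarks.
sealed trait TriggerResult
case object Continue extends TriggerResult
case object Fire extends TriggerResult

trait Trigger[T] {
  def onElement(element: T, timestamp: Long): TriggerResult
  def onEventTime(time: Long): TriggerResult
}

// Minimal harness: inject elements and watermarks, observe fire decisions.
class SimpleTriggerHarness[T](trigger: Trigger[T]) {
  def processElement(element: T, timestamp: Long): TriggerResult =
    trigger.onElement(element, timestamp)
  def advanceWatermark(time: Long): TriggerResult =
    trigger.onEventTime(time)
}

// Usage: an event-time trigger for a window ending at t=100 must not fire
// before the watermark passes the window end.
object TriggerHarnessExample extends App {
  val trigger = new Trigger[String] {
    private val windowEnd = 100L
    def onElement(e: String, ts: Long): TriggerResult = Continue
    def onEventTime(time: Long): TriggerResult =
      if (time >= windowEnd) Fire else Continue
  }
  val harness = new SimpleTriggerHarness(trigger)
  assert(harness.processElement("a", 42L) == Continue)
  assert(harness.advanceWatermark(99L) == Continue)
  assert(harness.advanceWatermark(100L) == Fire)
}
{code}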

WindowAssigner tests verify the behaviour of window assigners in isolation.
They also test, for example, whether the offset parameter of time-based windows
works correctly.

R: @StephanEwen @StefanRRichter @kl0u for review

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aljoscha/flink window-test-refactor

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2572.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2572


commit 1a09d9032bf5683a378a7fc8dc480f2d14c5924d
Author: Aljoscha Krettek 
Date:   2016-09-25T18:58:16Z

Rename TimeServiceProvider to ProcessingTimeService

The name is clashing with the soon-to-be-added
TimerService/InternalTimerService which is meant as an interface for
dealing with both processing time and event time.

TimeServiceProvider is renamed to ProcessingTimeService to reflect the
fact that it is a low-level utility that only deals with "physical"
processing-time trigger tasks.

commit 758827c3c8508ef9ef2ec079ff3a8469d0096ca8
Author: Aljoscha Krettek 
Date:   2016-09-28T13:10:35Z

Use OperatorTestHarness and TestProcessingTimeService in Kafka Tests

commit f6dd9c74dc2c58c4263fb6d084651b514898d47a
Author: Aljoscha Krettek 
Date:   2016-09-28T14:35:33Z

Use Processing-Time Service of TestHarness in WindowOperatorTest

Before, this was manually creating a TestProcessingTimeService; now
we're using the one that is there by default in
OneInputStreamOperatorTestHarness.

commit 65389d66c5586e6707b7a6bf48df512354fac085
Author: Aljoscha Krettek 
Date:   2016-09-28T14:43:40Z

Refactor OperatorTestHarness to always use TestProcessingTimeService

Before, this would allow handing in a custom ProcessingTimeService but
this was in reality always TestProcessingTimeService.

commit 1d013bcacc040552e5783c64d094ec309014457b
Author: Aljoscha Krettek 
Date:   2016-09-28T13:12:26Z

Use TestHarness Processing-time Facility in BucketingSinkTest

Before, this was manually creating a TestProcessingTimeService. Now we
use the one that is there by default in
OneInputStreamOperatorTestHarness.

commit eaf3dd00fefeb2487c7cafff6337123cbe42874b
Author: Aljoscha Krettek 
Date:   2016-09-28T13:32:24Z

Use OperatorTestHarness in AlignedWindowOperator Tests

commit b597d2ef50c27554b83fddaff8873107265340d4
Author: Aljoscha Krettek 
Date:   2016-09-29T14:04:29Z

Refactor Operator TestHarnesses to use Common Base Class

This also introduces KeyedTwoInputStreamOperatorTestHarness which
is similar to KeyedOneInputStreamOperatorTestHarness

commit 58b16b26e07b6100f89e9deec63f0decb751f0e6
Author: Aljoscha Krettek 
Date:   2016-09-26T14:21:51Z

[FLINK-3674] Add an interface for Time aware User Functions

This moves the event-time/processing-time trigger code from
WindowOperator behind a well defined interface that can be used by
operators (and user functions).

InternalTimerService is the new interface that has the same
functionality that WindowOperator used to have. TimerService is the user
facing interface that does not allow dealing with namespaces/payloads
and also does not allow deleting timers. There is a default
implementation in HeapInternalTimerService that can checkpoint timers to
a stream and also restore from a stream. Right now, this is managed in
AbstractStreamOperator and operators can ask for an
InternalTimerService.

[GitHub] flink pull request #2572: [FLINK-4552] Refactor WindowOperator/Trigger Tests

2016-09-29 Thread aljoscha
GitHub user aljoscha opened a pull request:

https://github.com/apache/flink/pull/2572

[FLINK-4552] Refactor WindowOperator/Trigger Tests

This builds on #2570 

Before, tests for WindowOperator, WindowAssigner, Trigger and WindowFunction
were all conflated in WindowOperatorTest. All of these tested that a
certain combination of a Trigger, WindowAssigner and WindowFunction produces
the expected output.

This change modularizes these tests and spreads them out across multiple
files. For example, one per trigger/window assigner.

The new WindowOperatorTest tests verify that the interaction between
WindowOperator and the various other parts works as expected, that the
correct methods on Trigger and WindowFunction are called at the expected
time and that snapshotting, timers, cleanup etc. work correctly. These tests
also verify that the different state types and WindowFunctions work 
correctly.

For trigger tests this introduces TriggerTestHarness. This can be used
to inject elements into Triggers and verify that they fire at the correct times. The
actual output of the WindowFunction is not important for these tests.
The new tests also make sure that triggers correctly clean up state and 
timers.

WindowAssigner tests verify the behaviour of window assigners in isolation.
They also test, for example, whether the offset parameter of time-based windows
works correctly.

R: @StephanEwen @StefanRRichter @kl0u for review

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aljoscha/flink window-test-refactor

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2572.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2572


commit 1a09d9032bf5683a378a7fc8dc480f2d14c5924d
Author: Aljoscha Krettek 
Date:   2016-09-25T18:58:16Z

Rename TimeServiceProvider to ProcessingTimeService

The name is clashing with the soon-to-be-added
TimerService/InternalTimerService which is meant as an interface for
dealing with both processing time and event time.

TimeServiceProvider is renamed to ProcessingTimeService to reflect the
fact that it is a low-level utility that only deals with "physical"
processing-time trigger tasks.

commit 758827c3c8508ef9ef2ec079ff3a8469d0096ca8
Author: Aljoscha Krettek 
Date:   2016-09-28T13:10:35Z

Use OperatorTestHarness and TestProcessingTimeService in Kafka Tests

commit f6dd9c74dc2c58c4263fb6d084651b514898d47a
Author: Aljoscha Krettek 
Date:   2016-09-28T14:35:33Z

Use Processing-Time Service of TestHarness in WindowOperatorTest

Before, this was manually creating a TestProcessingTimeService; now
we're using the one that is there by default in
OneInputStreamOperatorTestHarness.

commit 65389d66c5586e6707b7a6bf48df512354fac085
Author: Aljoscha Krettek 
Date:   2016-09-28T14:43:40Z

Refactor OperatorTestHarness to always use TestProcessingTimeService

Before, this would allow handing in a custom ProcessingTimeService but
this was in reality always TestProcessingTimeService.

commit 1d013bcacc040552e5783c64d094ec309014457b
Author: Aljoscha Krettek 
Date:   2016-09-28T13:12:26Z

Use TestHarness Processing-time Facility in BucketingSinkTest

Before, this was manually creating a TestProcessingTimeService. Now we
use the one that is there by default in
OneInputStreamOperatorTestHarness.

commit eaf3dd00fefeb2487c7cafff6337123cbe42874b
Author: Aljoscha Krettek 
Date:   2016-09-28T13:32:24Z

Use OperatorTestHarness in AlignedWindowOperator Tests

commit b597d2ef50c27554b83fddaff8873107265340d4
Author: Aljoscha Krettek 
Date:   2016-09-29T14:04:29Z

Refactor Operator TestHarnesses to use Common Base Class

This also introduces KeyedTwoInputStreamOperatorTestHarness which
is similar to KeyedOneInputStreamOperatorTestHarness

commit 58b16b26e07b6100f89e9deec63f0decb751f0e6
Author: Aljoscha Krettek 
Date:   2016-09-26T14:21:51Z

[FLINK-3674] Add an interface for Time aware User Functions

This moves the event-time/processing-time trigger code from
WindowOperator behind a well defined interface that can be used by
operators (and user functions).

InternalTimerService is the new interface that has the same
functionality that WindowOperator used to have. TimerService is the user
facing interface that does not allow dealing with namespaces/payloads
and also does not allow deleting timers. There is a default
implementation in HeapInternalTimerService that can checkpoint timers to
a stream and also restore from a stream. Right now, this is managed in
AbstractStreamOperator and operators can ask for an
InternalTimerService.

This also adds tests for HeapInternalTimerService.

[jira] [Commented] (FLINK-4348) Implement slot allocation protocol with TaskExecutor

2016-09-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4348?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15533113#comment-15533113
 ] 

ASF GitHub Bot commented on FLINK-4348:
---

GitHub user mxm opened a pull request:

https://github.com/apache/flink/pull/2571

[FLINK-4348] Simplify logic of SlotManager

This pull request is split up into two commits:

1. It removes some code from the `SlotManager` to make it simpler. 
  - It makes use of `handleSlotRequestFailedAtTaskManager` and simplifies 
it because we can assume that a Slot is only registered once if we talk to the 
same instance of a TaskExecutor. Further, we can omit checking the free slots 
because we previously removed the slot from the free slots.
  - `updateSlotStatus` only deals with new Task slots. All other updates 
are performed by the `ResourceManager`. In case the ResourceManager crashes, 
it will re-register all slot statuses.

2. It fences `TaskExecutor` messages using an `InstanceID`, which is 
required to make (1) work correctly. New messages have been introduced to achieve 
that.
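
A minimal sketch of the fencing idea in (2), under the assumption that every
message carries the InstanceID of the registration it belongs to (all types
below are made up for illustration, not the actual flip-6 classes):

{code}
// Messages from a TaskExecutor carry the InstanceID under which it
// registered; messages whose ID no longer matches the current registration
// are discarded as stale.
final case class InstanceID(value: Long)
final case class FencedMessage(instanceId: InstanceID, payload: String)

class FencedEndpoint(private var current: InstanceID) {
  def reRegister(newId: InstanceID): Unit = { current = newId }

  def receive(msg: FencedMessage): Unit =
    if (msg.instanceId == current) println(s"handling ${msg.payload}")
    else println(s"discarding stale message from ${msg.instanceId}")
}

object FencingExample extends App {
  val endpoint = new FencedEndpoint(InstanceID(1L))
  endpoint.receive(FencedMessage(InstanceID(1L), "slot status")) // handled
  endpoint.reRegister(InstanceID(2L))                            // re-registration
  endpoint.receive(FencedMessage(InstanceID(1L), "slot status")) // discarded
}
{code}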

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mxm/flink flip-6

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2571.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2571


commit 1d297403e5e085d944d80b498c4a269a794f8e2f
Author: Maximilian Michels 
Date:   2016-09-29T13:08:32Z

[FLINK-4347] change assumptions in SlotManager allocation

- Makes use of `handleSlotRequestFailedAtTaskManager` and simplifies it
because we can assume that a Slot is only registered once if we talk to
the same instance of a TaskExecutor. Further, we can omit checking the
free slots because we previously removed the slot from the free slots.

- `updateSlotStatus` only deals with new Task slots. All other updates
are performed by the `ResourceManager`. In case the ResourceManager
crashes, it will re-register all slot statuses.

commit c6768524c869ecdb84f0a8ae48837afc329c9714
Author: Maximilian Michels 
Date:   2016-09-29T14:03:39Z

[FLINK-4348] discard messages from old TaskExecutorGateways

This fences old messages using the InstanceID of the TaskExecutor.




> Implement slot allocation protocol with TaskExecutor
> 
>
> Key: FLINK-4348
> URL: https://issues.apache.org/jira/browse/FLINK-4348
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management
>Reporter: Kurt Young
>Assignee: Maximilian Michels
>
> When slotManager finds a proper slot in the free pool for a slot request,  
> slotManager marks the slot as occupied, then tells the taskExecutor to give 
> the slot to the specified JobMaster. 
> When a slot request is sent to the taskExecutor, it should contain the following 
> parameters: AllocationID, JobID, slotID, resourceManagerLeaderSessionID. 
> There are 3 possible responses from the taskExecutor; we 
> will discuss when each occurs and how to handle it.
> 1. Ack request, which means the taskExecutor gives the slot to the specified 
> jobMaster as expected.
> 2. Decline request if the slot is already occupied by another AllocationID.
> 3. Timeout, which could be caused by loss of the request or response message 
> or by slow network transfer.
> In the first case, the ResourceManager needs to do nothing. However, in the 
> second and third cases, the ResourceManager needs to notify the slotManager; 
> the slotManager will verify and clear all previous allocation information for 
> this slot request first, then try to find a proper slot for the slot 
> request again. This may cause some duplicate allocation, e.g. the slot 
> request to the TaskManager is successful but the response is lost somehow, so we 
> may request a slot in another TaskManager; this assigns two slots to 
> one request, but it can be taken care of by rejecting registration at the 
> JobMaster.
> There are still some questions to discuss further.
> 1. Who sends the slotRequest to the taskExecutor, SlotManager or ResourceManager? I 
> think it's better that SlotManager delegates the rpc call to the ResourceManager 
> when SlotManager needs to communicate with the outside world. The ResourceManager 
> knows which taskExecutor to send the request to, based on the ResourceID. Besides, this 
> RPC call, which is used to request a slot from the taskExecutor, should not be an 
> RpcMethod, because we want only SlotManager to have permission to call the 
> method; other components, for example the JobMaster and TaskExecutor, 
> cannot call this method directly.
> 2. If JobMaster rejects the sl
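
A compact sketch of how the three responses above could be handled (the types
and names are illustrative assumptions, not the actual flip-6 classes):

{code}
// Hedged sketch: ack needs no action; decline and timeout both clear the
// previous allocation and retry, relying on the JobMaster to reject a
// superfluous registration if a duplicate allocation occurs.
sealed trait SlotRequestResponse
case object Ack extends SlotRequestResponse
final case class Declined(occupiedBy: String) extends SlotRequestResponse
case object RequestTimeout extends SlotRequestResponse

object SlotResponseHandling {
  def handleResponse(response: SlotRequestResponse): Unit = response match {
    case Ack =>
      () // slot granted as expected, nothing to do
    case Declined(_) | RequestTimeout =>
      // Clear the previous allocation info and retry with another slot;
      // duplicate allocations are resolved by the JobMaster rejecting
      // the superfluous registration.
      println("clearing previous allocation info and retrying the slot request")
  }
}
{code}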

[GitHub] flink pull request #2571: [FLINK-4348] Simplify logic of SlotManager

2016-09-29 Thread mxm
GitHub user mxm opened a pull request:

https://github.com/apache/flink/pull/2571

[FLINK-4348] Simplify logic of SlotManager

This pull request is split up into two commits:

1. It removes some code from the `SlotManager` to make it simpler. 
  - It makes use of `handleSlotRequestFailedAtTaskManager` and simplifies 
it because we can assume that a Slot is only registered once if we talk to the 
same instance of a TaskExecutor. Further, we can omit checking the free slots 
because we previously removed the slot from the free slots.
  - `updateSlotStatus` only deals with new Task slots. All other updates 
are performed by the `ResourceManager`. In case the ResourceManager crashes, 
it will re-register all slot statuses.

2. It fences `TaskExecutor` messages using an `InstanceID`, which is 
required to make (1) work correctly. New messages have been introduced to achieve 
that.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mxm/flink flip-6

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2571.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2571


commit 1d297403e5e085d944d80b498c4a269a794f8e2f
Author: Maximilian Michels 
Date:   2016-09-29T13:08:32Z

[FLINK-4347] change assumptions in SlotManager allocation

- Makes use of `handleSlotRequestFailedAtTaskManager` and simplifies it
because we can assume that a Slot is only registered once if we talk to
the same instance of a TaskExecutor. Further, we can omit checking the
free slots because we previously removed the slot from the free slots.

- `updateSlotStatus` only deals with new Task slots. All other updates
are performed by the `ResourceManager`. In case the ResourceManager
crashes, it will re-register all slot statuses.

commit c6768524c869ecdb84f0a8ae48837afc329c9714
Author: Maximilian Michels 
Date:   2016-09-29T14:03:39Z

[FLINK-4348] discard messages from old TaskExecutorGateways

This fences old messages using the InstanceID of the TaskExecutor.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2569: [FLINK-4711] Let the Task trigger partition state ...

2016-09-29 Thread uce
Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/2569#discussion_r81164326
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskControlMessages.scala
 ---
@@ -92,16 +92,6 @@ object TaskMessages {
   // 
--
 
   /**
-   * Answer to a [[RequestPartitionState]] with the state of the 
respective partition.
-   */
-  case class PartitionState(
--- End diff --

This leaves some unused imports after the removal that we could remove before merging.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4711) TaskManager can crash due to failing onPartitionStateUpdate call

2016-09-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4711?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15533098#comment-15533098
 ] 

ASF GitHub Bot commented on FLINK-4711:
---

Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/2569#discussion_r81162283
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/JobManagerCommunicationFactory.java
 ---
@@ -27,15 +26,6 @@
 public interface JobManagerCommunicationFactory {
--- End diff --

This has become obsolete with your change and is unused. Let's remove 
it completely?


> TaskManager can crash due to failing onPartitionStateUpdate call
> 
>
> Key: FLINK-4711
> URL: https://issues.apache.org/jira/browse/FLINK-4711
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.2.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
> Fix For: 1.2.0
>
>
> The {{TaskManager}} can crash because it calls 
> {{Task.onPartitionStateUpdate}} when it receives a {{PartitionState}} 
> message. The {{onPartitionStateUpdate}} method can throw an {{IOException}} 
> or {{InterruptedException}} which are not handled on the {{TaskManager}} 
> level.
> Another problem is that the initial partition state request is triggered 
> within the {{SingleInputGate}}. The request causes the {{JobManager}} to send 
> a {{PartitionState}} message to the {{TaskManager}} which forwards it to the 
> {{Task}}. If at any of these points a message gets lost, then it is not 
> retried and the partition state remains unknown.
> In order to handle the exceptions, to make the data flow clearer and to add 
> automatic retries, I propose to let the {{Task}} send the partition state 
> check requests. Furthermore, the {{JobManager}} should directly answer to the 
> {{Task}} by replying to an ask operation. That way the message does not have 
> to be routed through the {{TaskManager}}.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2569: [FLINK-4711] Let the Task trigger partition state ...

2016-09-29 Thread uce
Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/2569#discussion_r81162283
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/JobManagerCommunicationFactory.java
 ---
@@ -27,15 +26,6 @@
 public interface JobManagerCommunicationFactory {
--- End diff --

This has become obsolete with your change and is unused. Let's remove 
it completely?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4711) TaskManager can crash due to failing onPartitionStateUpdate call

2016-09-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4711?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15533100#comment-15533100
 ] 

ASF GitHub Bot commented on FLINK-4711:
---

Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/2569#discussion_r81164597
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 ---
@@ -873,8 +873,7 @@ class JobManager(
   }
 
   sender ! decorateMessage(
-PartitionState(
-  taskExecutionId,
+new org.apache.flink.runtime.io.network.PartitionState(
--- End diff --

I think it's not ambiguous anymore after the removal of the TaskControlMessages 
variant, so we can directly use `PartitionState`?


> TaskManager can crash due to failing onPartitionStateUpdate call
> 
>
> Key: FLINK-4711
> URL: https://issues.apache.org/jira/browse/FLINK-4711
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.2.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
> Fix For: 1.2.0
>
>
> The {{TaskManager}} can crash because it calls 
> {{Task.onPartitionStateUpdate}} when it receives a {{PartitionState}} 
> message. The {{onPartitionStateUpdate}} method can throw an {{IOException}} 
> or {{InterruptedException}} which are not handled on the {{TaskManager}} 
> level.
> Another problem is that the initial partition state request is triggered 
> within the {{SingleInputGate}}. The request causes the {{JobManager}} to send 
> a {{PartitionState}} message to the {{TaskManager}} which forwards it to the 
> {{Task}}. If at any of these points a message gets lost, then it is not 
> retried and the partition state remains unknown.
> In order to handle the exceptions, to make the data flow clearer and to add 
> automatic retries, I propose to let the {{Task}} send the partition state 
> check requests. Furthermore, the {{JobManager}} should directly answer to the 
> {{Task}} by replying to an ask operation. That way the message does not have 
> to be routed through the {{TaskManager}}.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2569: [FLINK-4711] Let the Task trigger partition state ...

2016-09-29 Thread uce
Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/2569#discussion_r81164597
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 ---
@@ -873,8 +873,7 @@ class JobManager(
   }
 
   sender ! decorateMessage(
-PartitionState(
-  taskExecutionId,
+new org.apache.flink.runtime.io.network.PartitionState(
--- End diff --

I think it's not ambiguous anymore after the removal of the TaskControlMessages 
variant, so we can directly use `PartitionState`?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4711) TaskManager can crash due to failing onPartitionStateUpdate call

2016-09-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4711?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15533099#comment-15533099
 ] 

ASF GitHub Bot commented on FLINK-4711:
---

Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/2569#discussion_r81164326
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskControlMessages.scala
 ---
@@ -92,16 +92,6 @@ object TaskMessages {
   // 
--
 
   /**
-   * Answer to a [[RequestPartitionState]] with the state of the 
respective partition.
-   */
-  case class PartitionState(
--- End diff --

This leaves some unused imports after the removal that we could remove before merging.


> TaskManager can crash due to failing onPartitionStateUpdate call
> 
>
> Key: FLINK-4711
> URL: https://issues.apache.org/jira/browse/FLINK-4711
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.2.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
> Fix For: 1.2.0
>
>
> The {{TaskManager}} can crash because it calls 
> {{Task.onPartitionStateUpdate}} when it receives a {{PartitionState}} 
> message. The {{onPartitionStateUpdate}} method can throw an {{IOException}} 
> or {{InterruptedException}} which are not handled on the {{TaskManager}} 
> level.
> Another problem is that the initial partition state request is triggered 
> within the {{SingleInputGate}}. The request causes the {{JobManager}} to send 
> a {{PartitionState}} message to the {{TaskManager}} which forwards it to the 
> {{Task}}. If at any of these points a message gets lost, then it is not 
> retried and the partition state remains unknown.
> In order to handle the exceptions, to make the data flow clearer and to add 
> automatic retries, I propose to let the {{Task}} send the partition state 
> check requests. Furthermore, the {{JobManager}} should directly answer to the 
> {{Task}} by replying to an ask operation. That way the message does not have 
> to be routed through the {{TaskManager}}.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-4712) Implementing ranking predictions for ALS

2016-09-29 Thread JIRA

 [ 
https://issues.apache.org/jira/browse/FLINK-4712?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Domokos Miklós Kelen updated FLINK-4712:

Description: 
We started working on implementing ranking predictions for recommender systems. 
Ranking prediction means that besides predicting scores for user-item pairs, the 
recommender system is able to recommend a top K list for the users.

Details:

In practice, this would mean finding the K items for a particular user with the 
highest predicted rating. It should be possible also to specify whether to 
exclude the already seen items from a particular user's toplist. (See for 
example the 'exclude_known' setting of [Graphlab Create's ranking factorization 
recommender|https://turi.com/products/create/docs/generated/graphlab.recommender.ranking_factorization_recommender.RankingFactorizationRecommender.recommend.html#graphlab.recommender.ranking_factorization_recommender.RankingFactorizationRecommender.recommend]
 ).

The output of the topK recommendation function could be in the form of 
{{DataSet[(Int,Int,Int)]}}, meaning (user, item, rank), similar to Graphlab 
Create's output. However, this is arguable: follow up work includes 
implementing ranking recommendation evaluation metrics (such as precision@k, 
recall@k, ndcg@k), similar to [Spark's 
implementations|https://spark.apache.org/docs/1.5.0/mllib-evaluation-metrics.html#ranking-systems].
 It would be beneficial if we were able to design the API such that it could be 
included in the proposed evaluation framework (see 
[2157|https://issues.apache.org/jira/browse/FLINK-2157]), which makes it 
necessary to consider the possible output type {{DataSet[(Int, Array[Int])]}} 
or {{DataSet[(Int, Array[(Int,Double)])]}} meaning (user, array of items), 
possibly including the predicted scores as well. See 
[4713|https://issues.apache.org/jira/browse/FLINK-4713] for details.

Another question arising is whether to provide this function as a member of the 
ALS class, as a switch-kind of parameter to the ALS implementation (meaning the 
model is either a rating or a ranking recommender model) or in some other way.

  was:
We started working on implementing ranking predictions for recommender systems. 
Ranking prediction means that besides predicting scores for user-item pairs, the 
recommender system is able to recommend a top K list for the users.

Details:

In practice, this would mean finding the K items for a particular user with the 
highest predicted rating. It should be possible also to specify whether to 
exclude the already seen items from a particular user's toplist. (See for 
example the 'exclude_known' setting of [Graphlab Create's ranking factorization 
recommender|https://turi.com/products/create/docs/generated/graphlab.recommender.ranking_factorization_recommender.RankingFactorizationRecommender.recommend.html#graphlab.recommender.ranking_factorization_recommender.RankingFactorizationRecommender.recommend]
 ).

The output of the topK recommendation function could be in the form of 
{{DataSet[(Int,Int,Int)]}}, meaning (user, item, rank), similar to Graphlab 
Create's output. However, this is arguable: follow up work includes 
implementing ranking recommendation evaluation metrics (such as precision@k, 
recall@k, ndcg@k), similar to [Spark's 
implementations|https://spark.apache.org/docs/1.5.0/mllib-evaluation-metrics.html#ranking-systems].
 It would be beneficial if we were able to design the API such that it could be 
included in the proposed evaluation framework (see 
[2157|https://issues.apache.org/jira/browse/FLINK-2157]), which makes it 
necessary to consider the possible output type {{DataSet[(Int, Array[Int])]}} 
or {{DataSet[(Int, Array[(Int,Double)])]}} meaning (user, array of items), 
possibly including the predicted scores as well. See [issue todo] for details.

Another question arising is whether to provide this function as a member of the 
ALS class, as a switch-kind of parameter to the ALS implementation (meaning the 
model is either a rating or a ranking recommender model) or in some other way.
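
For concreteness, one possible shape of the topK output discussed above, as a
sketch over (user, item, score) predictions (this is an assumption about the
API, not a committed interface):

{code}
import org.apache.flink.api.scala._

object TopKSketch {
  // Given predicted scores as (user, item, score), produce a per-user
  // top-K list as (user, array of (item, score)), sorted by score.
  def topK(scores: DataSet[(Int, Int, Double)], k: Int): DataSet[(Int, Array[(Int, Double)])] =
    scores
      .groupBy(0) // group by user
      .reduceGroup { (it: Iterator[(Int, Int, Double)]) =>
        val all = it.toArray
        val user = all.head._1
        (user, all.map(t => (t._2, t._3)).sortBy(-_._2).take(k))
      }
}
{code}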


> Implementing ranking predictions for ALS
> 
>
> Key: FLINK-4712
> URL: https://issues.apache.org/jira/browse/FLINK-4712
> Project: Flink
>  Issue Type: New Feature
>  Components: Machine Learning Library
>Reporter: Domokos Miklós Kelen
>
> We started working on implementing ranking predictions for recommender 
> systems. Ranking prediction means that besides predicting scores for user-item 
> pairs, the recommender system is able to recommend a top K list for the users.
> Details:
> In practice, this would mean finding the K items for a particular user with 
> the highest predicted rating. It should be possible also to specify whether 
> to exclude the already seen items from a particular user's toplist. (See for 
> 

[GitHub] flink pull request #2570: [FLINK-3674] Add an interface for Time aware User ...

2016-09-29 Thread aljoscha
GitHub user aljoscha opened a pull request:

https://github.com/apache/flink/pull/2570

[FLINK-3674] Add an interface for Time aware User Functions

This moves the event-time/processing-time trigger code from
`WindowOperator` behind a well defined interface that can be used by
operators (and user functions).

`InternalTimerService` is the new interface that has the same
functionality that `WindowOperator` used to have. `TimerService` is the user
facing interface that does not allow dealing with namespaces/payloads
and also does not allow deleting timers. There is a default
implementation in `HeapInternalTimerService` that can checkpoint timers to
a stream and also restore from a stream. Right now, this is managed in
`AbstractStreamOperator` and operators can ask for an
`InternalTimerService`.

This also adds tests for `HeapInternalTimerService`.

This adds two new user functions:
 - `TimelyFlatMapFunction`: an extension of `FlatMapFunction` that also
   allows querying time and setting timers
 - `TimelyCoFlatMapFunction`: the same, but for `CoFlatMapFunction`

There are two new `StreamOperator` implementations for these that use the
`InternalTimerService` interface.

This also adds tests for the two new operators.

This also adds the new interface `KeyContext` that is used for
setting/querying the current key context for state and timers. Timers
are always scoped to a key, for now.

Also, this moves the handling of watermarks for both one-input and
two-input operators to `AbstractStreamOperators` so that we have a central
ground-truth.

There were also a bunch of small changes that I had to make to keep the main 
change clean. I would like to keep these as separate commits because they 
clearly document what was going on.
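
For a rough picture of what the new user-facing pieces could look like (the
exact signatures in this PR may differ; the types below are simplified
stand-ins, not the real interfaces):

{code}
// Simplified stand-ins for the interfaces described above.
trait Collector[O] { def collect(record: O): Unit }

trait TimerService {
  def currentProcessingTime(): Long
  def registerProcessingTimeTimer(time: Long): Unit
}

// A TimelyFlatMapFunction can emit records and also register timers,
// receiving a callback when a timer fires.
trait TimelyFlatMapFunction[I, O] {
  def flatMap(value: I, timerService: TimerService, out: Collector[O]): Unit
  def onTimer(timestamp: Long, timerService: TimerService, out: Collector[O]): Unit
}

// Example: forward each element and schedule a reminder 60 seconds later.
class ReminderFunction extends TimelyFlatMapFunction[String, String] {
  override def flatMap(value: String, ts: TimerService, out: Collector[String]): Unit = {
    out.collect(value)
    ts.registerProcessingTimeTimer(ts.currentProcessingTime() + 60000L)
  }
  override def onTimer(timestamp: Long, ts: TimerService, out: Collector[String]): Unit =
    out.collect(s"timer fired at $timestamp")
}
{code}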

## Note for Reviewers
You should probably start from the tests, i.e. 
`HeapInternalTimerServiceTest`, `TimelyFlatMapTest` and `TimelyCoFlatMapTest`. 
Then, the other interesting bits are `AbstractStreamOperator` that now deals 
with watermarks and checkpointing the timers and the `HeapInternalTimerService` 
as well. Keep in mind that this is just moving the code from `WindowOperator` 
to `HeapInternalTimerService` with some generalizations. I didn't try to 
optimize any of the data structures that are used.

R: @StephanEwen @StefanRRichter @kl0u for review, please 😃 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aljoscha/flink timely-function

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2570.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2570


commit 1a09d9032bf5683a378a7fc8dc480f2d14c5924d
Author: Aljoscha Krettek 
Date:   2016-09-25T18:58:16Z

Rename TimeServiceProvider to ProcessingTimeService

The name is clashing with the soon-to-be-added
TimerService/InternalTimerService which is meant as an interface for
dealing with both processing time and event time.

TimeServiceProvider is renamed to ProcessingTimeService to reflect the
fact that it is a low-level utility that only deals with "physical"
processing-time trigger tasks.

commit 758827c3c8508ef9ef2ec079ff3a8469d0096ca8
Author: Aljoscha Krettek 
Date:   2016-09-28T13:10:35Z

Use OperatorTestHarness and TestProcessingTimeService in Kafka Tests

commit f6dd9c74dc2c58c4263fb6d084651b514898d47a
Author: Aljoscha Krettek 
Date:   2016-09-28T14:35:33Z

Use Processing-Time Service of TestHarness in WindowOperatorTest

Before, this was manually creating a TestProcessingTimeService; now
we're using the one that is there by default in
OneInputStreamOperatorTestHarness.

commit 65389d66c5586e6707b7a6bf48df512354fac085
Author: Aljoscha Krettek 
Date:   2016-09-28T14:43:40Z

Refactor OperatorTestHarness to always use TestProcessingTimeService

Before, this would allow handing in a custom ProcessingTimeService but
this was in reality always TestProcessingTimeService.

commit 1d013bcacc040552e5783c64d094ec309014457b
Author: Aljoscha Krettek 
Date:   2016-09-28T13:12:26Z

Use TestHarness Processing-time Facility in BucketingSinkTest

Before, this was manually creating a TestProcessingTimeService. Now we
use the one that is there by default in
OneInputStreamOperatorTestHarness.

commit eaf3dd00fefeb2487c7cafff6337123cbe42874b
Author: Aljoscha Krettek 
Date:   2016-09-28T13:32:24Z

Use OperatorTestHarness in AlignedWindowOperator Tests

commit b597d2ef50c27554b83fddaff8873107265340d4
Author: Aljoscha Krettek 
Date:   2016-09-29T14:04:29Z

Refactor Operator TestHarnesses to use Common Base Class

This also introduces KeyedTwoInputStreamOperatorTestHarness which
is similar to KeyedOneInputStreamOperatorTestHarness

[jira] [Commented] (FLINK-4068) Move constant computations out of code-generated `flatMap` functions.

2016-09-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15533076#comment-15533076
 ] 

ASF GitHub Bot commented on FLINK-4068:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2560#discussion_r81163353
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkPlannerImpl.scala
 ---
@@ -60,6 +60,7 @@ class FlinkPlannerImpl(
   var root: RelRoot = _
 
   private def ready() {
+planner.setExecutor(config.getExecutor)
--- End diff --

I would not set the executor here as this class is only used by the SQL API. 
`FlinkRelBuilder.create()` is a better place.


> Move constant computations out of code-generated `flatMap` functions.
> -
>
> Key: FLINK-4068
> URL: https://issues.apache.org/jira/browse/FLINK-4068
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.1.0
>Reporter: Fabian Hueske
>Assignee: Jark Wu
>
> The generated functions for expressions of the Table API or SQL include 
> constant computations.
> For instance the code generated for a predicate like:
> {code}
> myInt < (10 + 20)
> {code}
> looks roughly like:
> {code}
> public void flatMap(Row in, Collector out) {
>   Integer in1 = in.productElement(1);
>   int temp = 10 + 20;  
>   if (in1 < temp) {
> out.collect(in)
>   }
> }
> {code}
> In this example the computation of {{temp}} is constant and could be moved 
> out of the {{flatMap()}} method.
> The same might apply to generated functions other than {{FlatMap}} as well.
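
For illustration, the hoisted form could look like the following sketch (using
a tuple instead of {{Row}}, and hand-written rather than generated code):

{code}
import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.util.Collector

// The constant subexpression is evaluated once in open() instead of on
// every record inside flatMap().
class ConstantFoldedFlatMap extends RichFlatMapFunction[(Int, Int), (Int, Int)] {
  private var temp: Int = _

  override def open(parameters: Configuration): Unit = {
    temp = 10 + 20 // computed once per task, not per record
  }

  override def flatMap(in: (Int, Int), out: Collector[(Int, Int)]): Unit = {
    if (in._2 < temp) out.collect(in)
  }
}
{code}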



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

