[jira] [Commented] (FLINK-4391) Provide support for asynchronous operations over streams

2016-11-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15643392#comment-15643392
 ] 

ASF GitHub Bot commented on FLINK-4391:
---

Github user bjlovegithub commented on the issue:

https://github.com/apache/flink/pull/2629
  
Maybe this is a solution. We keep `checkpointLock` as a plain `Object`, which is quite 
efficient, and we do not change the order of `broadcastCheckpointBarrier()` and 
`operator.snapshotState()`. Instead, we **pause** the `EmitterThread` in 
`StreamTask.performCheckpoint()`, like this: 
 

    private boolean performCheckpoint(CheckpointMetaData checkpointMetaData) throws Exception {

        LOG.debug("Starting checkpoint {} on task {}",
            checkpointMetaData.getCheckpointId(), getName());

        synchronized (lock) {
            if (isRunning) {
                // pause the emitter threads of async operators first, so that
                // no result is emitted while the checkpoint is being taken
                for (StreamOperator<?> operator : operatorChain.getAllOperators()) {
                    if (operator instanceof AsyncWaitOperator) {
                        ((AsyncWaitOperator<?, ?>) operator).pauseEmitterThread();
                    }
                }

                // with the emitters paused, barriers can be broadcast immediately
                operatorChain.broadcastCheckpointBarrier(
                    checkpointMetaData.getCheckpointId(), checkpointMetaData.getTimestamp());

                checkpointState(checkpointMetaData);

                return true;
            } else {
                return false;
            }
        }
    }
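
For what it's worth, here is a minimal sketch of how `pauseEmitterThread()` (and a 
resuming counterpart) might be implemented inside `AsyncWaitOperator`, assuming the 
emitter thread checks a paused flag under a `ReentrantLock`. Everything except the 
`pauseEmitterThread()` name is illustrative, not part of this PR:

    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.ReentrantLock;

    // Illustrative only: a paused flag guarded by the buffer's ReentrantLock.
    class EmitterPauseSupport {
        private final ReentrantLock lock = new ReentrantLock(true);
        private final Condition resumed = lock.newCondition();
        private boolean paused; // guarded by lock

        /** Called from performCheckpoint() before the barrier is broadcast. */
        void pauseEmitterThread() {
            lock.lock();
            try {
                paused = true;
            } finally {
                lock.unlock();
            }
        }

        /** Called once the checkpoint (or its completion notification) arrives. */
        void resumeEmitterThread() {
            lock.lock();
            try {
                paused = false;
                resumed.signalAll();
            } finally {
                lock.unlock();
            }
        }

        /** The emitter thread calls this before emitting a completed result. */
        void awaitNotPaused() throws InterruptedException {
            lock.lock();
            try {
                while (paused) {
                    resumed.await();
                }
            } finally {
                lock.unlock();
            }
        }
    }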


> Provide support for asynchronous operations over streams
> 
>
> Key: FLINK-4391
> URL: https://issues.apache.org/jira/browse/FLINK-4391
> Project: Flink
>  Issue Type: New Feature
>  Components: DataStream API
>Reporter: Jamie Grier
>Assignee: david.wang
>
> Many Flink users need to do asynchronous processing driven by data from a 
> DataStream.  The classic example would be joining against an external 
> database in order to enrich a stream with extra information.
> It would be nice to add general support for this type of operation in the 
> Flink API.  Ideally this could simply take the form of a new operator that 
> manages async operations, keeps a bounded number of them in flight, and then 
> emits results to downstream operators as the async operations complete.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)



[jira] [Commented] (FLINK-4391) Provide support for asynchronous operations over streams

2016-11-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15642997#comment-15642997
 ] 

ASF GitHub Bot commented on FLINK-4391:
---

Github user bjlovegithub commented on a diff in the pull request:

https://github.com/apache/flink/pull/2629#discussion_r86709933
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java ---
@@ -540,15 +540,12 @@ private boolean performCheckpoint(CheckpointMetaData checkpointMetaData) throws

 		synchronized (lock) {
 			if (isRunning) {
+				checkpointState(checkpointMetaData);

-				// Since both state checkpointing and downstream barrier emission occurs in this
-				// lock scope, they are an atomic operation regardless of the order in which they occur.
-				// Given this, we immediately emit the checkpoint barriers, so the downstream operators
-				// can start their checkpoint work as soon as possible
+				// broadcast barriers after snapshotting the operators' states
 				operatorChain.broadcastCheckpointBarrier(
-					checkpointMetaData.getCheckpointId(), checkpointMetaData.getTimestamp());
-
-				checkpointState(checkpointMetaData);
+					checkpointMetaData.getCheckpointId(), checkpointMetaData.getTimestamp()
+				);
--- End diff --

Can you share the test program and the test results? I wrote a sample program to 
measure the cost of acquiring and releasing a lock with `Object` / `ReentrantLock` / 
`ReentrantReadWriteLock`. For a single thread using the lock, all three perform 
similarly; in the multi-threaded case, however, `ReentrantLock` and 
`ReentrantReadWriteLock` outperform the intrinsic `Object` lock.
Here is my benchmark program: 
[link](https://github.com/bjlovegithub/JavaLockTest/blob/master/src/LockTest.java)
And these are sampled results from a run on my laptop: 
[stat](https://github.com/bjlovegithub/JavaLockTest/blob/master/stat/LockTest_result.data)
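
For reference, a minimal sketch of this kind of lock micro-benchmark (this is not 
the linked `LockTest.java`, just an illustration of the comparison being described; 
the thread and iteration counts are arbitrary):

    import java.util.concurrent.locks.ReentrantLock;

    // Times N threads repeatedly entering a critical section guarded by an
    // intrinsic monitor vs. a ReentrantLock.
    public class LockComparison {
        private static final int THREADS = 4;
        private static final int ITERATIONS = 5_000_000;

        private static long counter;
        private static final Object monitor = new Object();
        private static final ReentrantLock reentrantLock = new ReentrantLock();

        public static void main(String[] args) throws InterruptedException {
            System.out.printf("synchronized:  %d ms%n", run(() -> {
                synchronized (monitor) {
                    counter++;
                }
            }));
            System.out.printf("ReentrantLock: %d ms%n", run(() -> {
                reentrantLock.lock();
                try {
                    counter++;
                } finally {
                    reentrantLock.unlock();
                }
            }));
        }

        private static long run(Runnable criticalSection) throws InterruptedException {
            counter = 0;
            Thread[] threads = new Thread[THREADS];
            long start = System.nanoTime();
            for (int i = 0; i < THREADS; i++) {
                threads[i] = new Thread(() -> {
                    for (int j = 0; j < ITERATIONS; j++) {
                        criticalSection.run();
                    }
                });
                threads[i].start();
            }
            for (Thread t : threads) {
                t.join();
            }
            return (System.nanoTime() - start) / 1_000_000;
        }
    }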


> Provide support for asynchronous operations over streams
> 
>
> Key: FLINK-4391
> URL: https://issues.apache.org/jira/browse/FLINK-4391
> Project: Flink
>  Issue Type: New Feature
>  Components: DataStream API
>Reporter: Jamie Grier
>Assignee: david.wang
>
> Many Flink users need to do asynchronous processing driven by data from a 
> DataStream.  The classic example would be joining against an external 
> database in order to enrich a stream with extra information.
> It would be nice to add general support for this type of operation in the 
> Flink API.  Ideally this could simply take the form of a new operator that 
> manages async operations, keeps a bounded number of them in flight, and then 
> emits results to downstream operators as the async operations complete.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)



[jira] [Commented] (FLINK-4391) Provide support for asynchronous operations over streams

2016-11-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15642901#comment-15642901
 ] 

ASF GitHub Bot commented on FLINK-4391:
---

Github user bjlovegithub commented on the issue:

https://github.com/apache/flink/pull/2629
  
Good point ;D

Emm, I think we have to override `StreamOperator.notifyCheckpointComplete()` in 
`AsyncWaitOperator` so that, once the `TaskManager` notifies the `Task` that the 
checkpoint has completed, the `EmitterThread` can resume working as soon as 
possible.
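
A hedged sketch of what that override could look like, assuming the operator can 
reach a resume hook that is the counterpart of `pauseEmitterThread()` (the `buffer` 
field and `resumeEmitterThread()` are illustrative names, not the PR's API):

    // Illustrative only: resume the paused emitter once the checkpoint is
    // acknowledged, instead of waiting for the next element to arrive.
    @Override
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        super.notifyCheckpointComplete(checkpointId);
        buffer.resumeEmitterThread(); // hypothetical counterpart of pauseEmitterThread()
    }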


> Provide support for asynchronous operations over streams
> 
>
> Key: FLINK-4391
> URL: https://issues.apache.org/jira/browse/FLINK-4391
> Project: Flink
>  Issue Type: New Feature
>  Components: DataStream API
>Reporter: Jamie Grier
>Assignee: david.wang
>
> Many Flink users need to do asynchronous processing driven by data from a 
> DataStream.  The classic example would be joining against an external 
> database in order to enrich a stream with extra information.
> It would be nice to add general support for this type of operation in the 
> Flink API.  Ideally this could simply take the form of a new operator that 
> manages async operations, keeps a bounded number of them in flight, and then 
> emits results to downstream operators as the async operations complete.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)



[jira] [Commented] (FLINK-4391) Provide support for asynchronous operations over streams

2016-11-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15642872#comment-15642872
 ] 

ASF GitHub Bot commented on FLINK-4391:
---

Github user bjlovegithub commented on a diff in the pull request:

https://github.com/apache/flink/pull/2629#discussion_r86706353
  
--- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncCollectorBufferTest.java ---
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.async;
+
+import org.apache.flink.streaming.api.datastream.AsyncDataStream;
+import org.apache.flink.streaming.api.functions.async.AsyncFunction;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.internal.util.reflection.Whitebox;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * Tests for {@link AsyncCollectorBuffer}. These test that:
+ *
+ * <ul>
+ *     <li>Add a new item into the buffer</li>
+ *     <li>Ordered mode processing</li>
+ *     <li>Unordered mode processing</li>
+ *     <li>Error handling</li>
+ * </ul>
+ */
+public class AsyncCollectorBufferTest {
+	private AsyncFunction function;
+
+	private AsyncWaitOperator operator;
+
+	private AsyncCollectorBuffer buffer;
+
+	private Output output;
+
+	@Before
+	public void setUp() throws Exception {
+		function = new AsyncFunction() {
+			@Override
+			public void asyncInvoke(Integer input, AsyncCollector collector) throws Exception {
+
+			}
+		};
+
+		operator = new AsyncWaitOperator<>(function);
+		Class[] classes = AbstractStreamOperator.class.getDeclaredClasses();
+		Class latencyClass = null;
+		for (Class c : classes) {
+			if (c.getName().indexOf("LatencyGauge") != -1) {
+				latencyClass = c;
+			}
+		}
+
+		Constructor explicitConstructor = latencyClass.getDeclaredConstructors()[0];
+		explicitConstructor.setAccessible(true);
+		Whitebox.setInternalState(operator, "latencyGauge", explicitConstructor.newInstance(10));
+
+		output = new FakedOutput(new ArrayList());
+		TimestampedCollector collector = new TimestampedCollector(output);
+		buffer = new AsyncCollectorBuffer<>(3, AsyncDataStream.OutputMode.ORDERED, operator);
+		buffer.setOutput(collector, output);
+
+		Whitebox.setInternalState(operator, "output", output);
+	}
+
+	@Test
+	public void testAdd() throws Exception {
+		Thread.sleep(1000);
+		buffer.add(new Watermark(0l));
+		buffer.add(new LatencyMarker(111L, 1, 1));
+		Assert.assertEquals(((SimpleLinkedList) Whitebox.getInternalState(buffer, "queue")).size(), 2);
+		Assert.assertEquals(((Map) Whitebox.getInternalState(buffer, "collectorToStreamElement")).size(), 2);
+		Assert.assertEquals(((Map) Whitebox.getInternalState(buffer, "collectorToQueue")).size(), 2);
+
+		AsyncCollector collector = (AsyncCollector) ((SimpleLinkedList)
[jira] [Comment Edited] (FLINK-4856) Add MapState for keyed streams

2016-11-06 Thread Xiaogang Shi (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15642868#comment-15642868
 ] 

Xiaogang Shi edited comment on FLINK-4856 at 11/7/16 2:29 AM:
--

I have started the implementation of MapStates. But prior to that, I think we need 
some modifications to the current implementation to clarify the concepts. I have 
opened two JIRA issues to describe these problems; see FLINK-5023 and FLINK-5024 
for the details.



> Add MapState for keyed streams
> --
>
> Key: FLINK-4856
> URL: https://issues.apache.org/jira/browse/FLINK-4856
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Reporter: Xiaogang Shi
>Assignee: Xiaogang Shi
>
> Many states in keyed streams are organized as key-value pairs. Currently, 
> these states are implemented by storing the entire map into a ValueState or a 
> ListState. The implementation, however, is very costly because all entries have 
> to be serialized/deserialized when updating a single entry. To improve the 
> efficiency of these states, MapStates are urgently needed. 
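
For orientation, a hedged sketch of the kind of per-entry interface this issue 
calls for (the method names are assumptions for illustration; the final API was 
still under discussion at this point):

    // Illustrative MapState: entries can be read and updated individually, so a
    // single put() no longer serializes/deserializes the whole map.
    public interface MapState<UK, UV> extends State {
        UV get(UK key) throws Exception;
        void put(UK key, UV value) throws Exception;
        void remove(UK key) throws Exception;
        Iterable<java.util.Map.Entry<UK, UV>> entries() throws Exception;
    }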



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)



[jira] [Commented] (FLINK-4391) Provide support for asynchronous operations over streams

2016-11-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15642862#comment-15642862
 ] 

ASF GitHub Bot commented on FLINK-4391:
---

Github user bjlovegithub commented on a diff in the pull request:

https://github.com/apache/flink/pull/2629#discussion_r86706002
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncCollectorBuffer.java ---
@@ -0,0 +1,494 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.async;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.streaming.api.datastream.AsyncDataStream;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * AsyncCollectorBuffer will hold all {@link AsyncCollector} in its internal buffer,
+ * and emit results from {@link AsyncCollector} to the next operators following it by
+ * calling {@link Output#collect(Object)}
+ */
+@Internal
+public class AsyncCollectorBuffer {
+	private static final Logger LOG = LoggerFactory.getLogger(AsyncCollectorBuffer.class);
+
+	/**
+	 * Max number of {@link AsyncCollector} in the buffer.
+	 */
+	private final int bufferSize;
+
+	private final AsyncDataStream.OutputMode mode;
+
+	private final AsyncWaitOperator operator;
+
+	/**
+	 * {@link AsyncCollector} queue.
+	 */
+	private final SimpleLinkedList<AsyncCollector> queue = new SimpleLinkedList<>();
+	/**
+	 * A hash map keeping {@link AsyncCollector} and their corresponding {@link StreamElement}
+	 */
+	private final Map<AsyncCollector, StreamElement> collectorToStreamElement = new HashMap<>();
+	/**
+	 * A hash map keeping {@link AsyncCollector} and their node references in the #queue.
+	 */
+	private final Map<AsyncCollector, SimpleLinkedList.Node> collectorToQueue = new HashMap<>();
+
+	private final LinkedList<AsyncCollector> finishedCollectors = new LinkedList<>();
+
+	/**
+	 * {@link TimestampedCollector} and {@link Output} to collect results and watermarks.
+	 */
+	private TimestampedCollector timestampedCollector;
+	private Output output;
+
+	/**
+	 * Locks and conditions to synchronize with main thread and emitter thread.
+	 */
+	private final Lock lock;
+	private final Condition notFull;
+	private final Condition taskDone;
+	private final Condition isEmpty;
+
+	/**
+	 * Error from user codes.
+	 */
+	private volatile Exception error;
+
+	private final Emitter emitter;
+	private final Thread emitThread;
+
+	private boolean isCheckpointing;
+
+	public AsyncCollectorBuffer(int maxSize, AsyncDataStream.OutputMode mode, AsyncWaitOperator operator) {
+		Preconditions.checkArgument(maxSize > 0, "Future buffer size should be greater than 0.");
+
+		this.bufferSize = maxSize;
+		this.mode = mode;
+		this.operator = operator;
+
+		this.lock = new ReentrantLock(true);
+		this.notFull = this.lock.newCondition();
+		this.taskDone = this.lock.newCondition();
+		this.isEmpty = this.lock.newCondition();
+
+		this.emitter = new Emitter();
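
To make the role of the three conditions concrete, here is a hedged sketch of the 
signalling pattern an emitter loop over this buffer might follow (this is not the 
PR's actual `Emitter`; the `running` flag and `emit()` helper are illustrative):

    // Illustrative signalling pattern for the three conditions above:
    //   notFull  - add() waits on it while the buffer is at capacity
    //   taskDone - the emitter waits on it until some async call completes
    //   isEmpty  - checkpointing code waits on it until no finished work is pending
    private void runEmitter() throws InterruptedException {
        while (running) {                          // "running" is an illustrative flag
            lock.lock();
            try {
                while (running && finishedCollectors.isEmpty()) {
                    taskDone.await();
                }
                if (!running) {
                    return;
                }
                AsyncCollector collector = finishedCollectors.poll();
                emit(collector);                   // illustrative: push results to the Output
                notFull.signalAll();               // a slot has been freed for add()
                if (finishedCollectors.isEmpty()) {
                    isEmpty.signalAll();           // nothing pending; a snapshot may proceed
                }
            } finally {
                lock.unlock();
            }
        }
    }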
  

[jira] [Created] (FLINK-5024) Add SimpleStateDescriptor to clarify the concepts

2016-11-06 Thread Xiaogang Shi (JIRA)
Xiaogang Shi created FLINK-5024:
---

 Summary: Add SimpleStateDescriptor to clarify the concepts
 Key: FLINK-5024
 URL: https://issues.apache.org/jira/browse/FLINK-5024
 Project: Flink
  Issue Type: Improvement
Reporter: Xiaogang Shi


Currently, StateDescriptors accept two type arguments: the first is the type of the 
created state and the second is the type of the values in the state.

The concepts, however, are a little confusing here, because for ListStates the 
argument passed to the StateDescriptor is the type of the list elements instead of 
the list itself. It also makes the implementation of MapStates difficult.

I suggest not putting the type serializer in StateDescriptors, making 
StateDescriptors independent of the data structures of the values. 

A new type of StateDescriptor named SimpleStateDescriptor can be provided to 
abstract those states (namely ValueState, ReducingState, and FoldingState) whose 
values are not composite. 

Composite states (e.g. ListStates and MapStates) can implement their own 
descriptors according to their data structures. 
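
A hedged sketch of the hierarchy this proposal seems to imply, assuming Flink's 
`TypeSerializer` and a `MapState` along the lines of FLINK-4856 (class shapes and 
constructors are illustrative, not the eventual API):

    // Illustrative only: the base descriptor no longer fixes how values are serialized.
    abstract class StateDescriptor<S extends State> {
        protected final String name;
        protected StateDescriptor(String name) { this.name = name; }
    }

    // "Simple" states hold a single value, so one serializer suffices
    // (ValueState, ReducingState, FoldingState).
    abstract class SimpleStateDescriptor<T, S extends State> extends StateDescriptor<S> {
        protected final TypeSerializer<T> serializer;
        protected SimpleStateDescriptor(String name, TypeSerializer<T> serializer) {
            super(name);
            this.serializer = serializer;
        }
    }

    // Composite states describe their own structure, e.g. a map state needs a
    // serializer per key and per value.
    class MapStateDescriptor<K, V> extends StateDescriptor<MapState<K, V>> {
        final TypeSerializer<K> keySerializer;
        final TypeSerializer<V> valueSerializer;
        MapStateDescriptor(String name, TypeSerializer<K> keySerializer, TypeSerializer<V> valueSerializer) {
            super(name);
            this.keySerializer = keySerializer;
            this.valueSerializer = valueSerializer;
        }
    }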



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-5023) Add get() method in State interface

2016-11-06 Thread Xiaogang Shi (JIRA)
Xiaogang Shi created FLINK-5023:
---

 Summary: Add get() method in State interface
 Key: FLINK-5023
 URL: https://issues.apache.org/jira/browse/FLINK-5023
 Project: Flink
  Issue Type: Improvement
  Components: State Backends, Checkpointing
Reporter: Xiaogang Shi


Currently, the only method provided by the State interface is `clear()`. I think we 
should provide another method, called `get()`, to return the structured value 
(e.g., a value, list, or map) under the current key. 

In fact, the functionality of `get()` has already been implemented in all types of 
states, e.g., `value()` in ValueState and `get()` in ListState. This modification 
to the interface would better abstract these states.
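
A hedged sketch of the proposed shape (illustrative only, not a committed API):

    // Current interface (Flink 1.1.x) only supports clearing:
    //     public interface State { void clear(); }
    //
    // Proposed: expose the structured value under the current key generically.
    public interface State<T> {
        /**
         * Returns the value under the current key: the single value for a
         * ValueState, the list for a ListState, or the map for a MapState.
         */
        T get();

        void clear();
    }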



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)



[jira] [Commented] (FLINK-3930) Implement Service-Level Authorization

2016-11-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3930?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15642713#comment-15642713
 ] 

ASF GitHub Bot commented on FLINK-3930:
---

Github user vijikarthi commented on the issue:

https://github.com/apache/flink/pull/2425
  
> The cookie is added to every single message/buffer that is transferred.
> That is too much - securing the integrity of the stream is the responsibility
> of the encryption layer. The cookie should be added to the request messages
> that establish connections only.

I will change the code to handle the cookie right after the SSL handshake using a 
new handler, and drop the cookie passing for every message. The handler will be 
added to the pipeline of both `NettyClient` and `NettyServer`. The client will send 
the cookie when the channel becomes active, and the server will validate it and 
keep track of the clients that are authorized. 

Here is the pseudo-code for the client and server handlers. Please take a look 
and let me know if you are okay with this approach, and I will modify the code.

---
	public static class ClientCookieHandler extends ChannelInboundHandlerAdapter {

		private final String secureCookie;

		final Charset DEFAULT_CHARSET = Charset.forName("utf-8");

		public ClientCookieHandler(String secureCookie) {
			this.secureCookie = secureCookie;
		}

		@Override
		public void channelActive(ChannelHandlerContext ctx) throws Exception {
			super.channelActive(ctx);

			if (this.secureCookie != null && this.secureCookie.length() != 0) {
				final ByteBuf buffer = Unpooled.buffer(4 + this.secureCookie.getBytes(DEFAULT_CHARSET).length);
				buffer.writeInt(secureCookie.getBytes(DEFAULT_CHARSET).length);
				buffer.writeBytes(secureCookie.getBytes(DEFAULT_CHARSET));
				ctx.writeAndFlush(buffer);
			}
		}
	}

	public static class ServerCookieDecoder extends MessageToMessageDecoder<ByteBuf> {

		private final String secureCookie;

		private final List<Channel> channelList = new ArrayList<>();

		private final Charset DEFAULT_CHARSET = Charset.forName("utf-8");

		public ServerCookieDecoder(String secureCookie) {
			this.secureCookie = secureCookie;
		}

		@Override
		protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {

			if (secureCookie == null || secureCookie.length() == 0) {
				return;
			}

			if (channelList.contains(ctx.channel())) {
				return;
			}

			// read cookie based on the cookie length passed
			int cookieLength = msg.readInt();
			if (cookieLength != secureCookie.getBytes(DEFAULT_CHARSET).length) {
				String message = "Cookie length does not match with source cookie. Invalid secure cookie passed.";
				throw new IllegalStateException(message);
			}

			// read only if cookie length is greater than zero
			if (cookieLength > 0) {
				final byte[] buffer = new byte[secureCookie.getBytes(DEFAULT_CHARSET).length];
				msg.readBytes(buffer, 0, cookieLength);

				if (!Arrays.equals(secureCookie.getBytes(DEFAULT_CHARSET), buffer)) {
					LOG.error("Secure cookie from the client is not matching with the server's identity");
					throw new IllegalStateException("Invalid secure cookie passed.");
				}

				LOG.info("Secure cookie validation passed");

				channelList.add(ctx.channel());
			}
		}
	}
--- 
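
If this approach is adopted, the wiring would presumably look something like the 
following hedged sketch (`channel` and `secureCookie` come from the surrounding 
bootstrap code; the actual `NettyClient`/`NettyServer` pipeline setup in Flink 
differs in detail):

    // Hypothetical client-side wiring: the cookie is sent once, as soon as the
    // channel becomes active and before any protocol traffic.
    channel.pipeline().addFirst("client-cookie-handler", new ClientCookieHandler(secureCookie));

    // Hypothetical server-side wiring: the first bytes of every new connection
    // are validated exactly once, then the channel is remembered as authorized.
    channel.pipeline().addFirst("server-cookie-decoder", new ServerCookieDecoder(secureCookie));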


> Implement Service-Level Authorization
> -
>
> Key: FLINK-3930
> URL: https://issues.apache.org/jira/browse/FLINK-3930
> Project: Flink
>  Issue Type: New Feature
>  Components: Security
>Reporter: Eron Wright 
>Assignee: Vijay Srinivasaraghavan
>  Labels: security
>   Original Estimate: 672h
>  Remaining Estimate: 672h
>
> _This issue is part of a series of improvements 

[jira] [Commented] (FLINK-3030) Enhance Dashboard to show Execution Attempts

2016-11-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3030?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15642459#comment-15642459
 ] 

ASF GitHub Bot commented on FLINK-3030:
---

Github user mushketyk commented on a diff in the pull request:

https://github.com/apache/flink/pull/2448#discussion_r86697108
  
--- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JsonGenerators.java ---
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.webmonitor.handlers;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.api.common.accumulators.LongCounter;
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.instance.InstanceConnectionInfo;
+
+import java.io.IOException;
+import java.util.Map;
+
+public final class JsonGenerators {
+   private JsonGenerators() {}
+
+	public static void serializeExecutionAttempt(Execution execAttempt, JsonGenerator gen) throws IOException {
+
+		final ExecutionState status = execAttempt.getState();
+		final long now = System.currentTimeMillis();
+
+		InstanceConnectionInfo location = execAttempt.getAssignedResourceLocation();
+		String locationString = location == null ? "(unassigned)" : location.getHostname();
+
+		long startTime = execAttempt.getStateTimestamp(ExecutionState.DEPLOYING);
+		if (startTime == 0) {
+			startTime = -1;
+		}
+		long endTime = status.isTerminal() ? execAttempt.getStateTimestamp(status) : -1;
+		long duration = startTime > 0 ? ((endTime > 0 ? endTime : now) - startTime) : -1;
+
+		Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> metrics = execAttempt.getFlinkAccumulators();
+		LongCounter readBytes;
+		LongCounter writeBytes;
+		LongCounter readRecords;
+		LongCounter writeRecords;
+
+		if (metrics != null) {
+			readBytes = (LongCounter) metrics.get(AccumulatorRegistry.Metric.NUM_BYTES_IN);
+			writeBytes = (LongCounter) metrics.get(AccumulatorRegistry.Metric.NUM_BYTES_OUT);
+			readRecords = (LongCounter) metrics.get(AccumulatorRegistry.Metric.NUM_RECORDS_IN);
+			writeRecords = (LongCounter) metrics.get(AccumulatorRegistry.Metric.NUM_RECORDS_OUT);
--- End diff --

Ok. I'll rebase the change and fix merge conflicts.


> Enhance Dashboard to show Execution Attempts
> 
>
> Key: FLINK-3030
> URL: https://issues.apache.org/jira/browse/FLINK-3030
> Project: Flink
>  Issue Type: Improvement
>  Components: Webfrontend
>Affects Versions: 0.10.0
>Reporter: Stephan Ewen
>Assignee: Ivan Mushketyk
> Fix For: 1.0.0
>
>
> Currently, the web dashboard shows only the latest execution attempt. We 
> should make all execution attempts and their accumulators available for 
> inspection.
> The REST monitoring API supports this, so it should be a change only to the 
> frontend part.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)



[jira] [Commented] (FLINK-2254) Add Bipartite Graph Support for Gelly

2016-11-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2254?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15642456#comment-15642456
 ] 

ASF GitHub Bot commented on FLINK-2254:
---

Github user mushketyk commented on the issue:

https://github.com/apache/flink/pull/2564
  
@vasia @greghogan I've updated the PR. Could you please give it another 
look?


> Add Bipartite Graph Support for Gelly
> -
>
> Key: FLINK-2254
> URL: https://issues.apache.org/jira/browse/FLINK-2254
> Project: Flink
>  Issue Type: New Feature
>  Components: Gelly
>Affects Versions: 0.10.0
>Reporter: Andra Lungu
>Assignee: Ivan Mushketyk
>  Labels: requires-design-doc
>
> A bipartite graph is a graph for which the set of vertices can be divided 
> into two disjoint sets such that each edge having a source vertex in the 
> first set will have a target vertex in the second set. We would like to 
> support efficient operations for this type of graph along with a set of 
> metrics (http://jponnela.com/web_documents/twomode.pdf). 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)



[GitHub] flink pull request #2564: [FLINK-2254] Add BipartiateGraph class

2016-11-06 Thread mushketyk
Github user mushketyk commented on a diff in the pull request:

https://github.com/apache/flink/pull/2564#discussion_r86694144
  
--- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/BipartiteGraph.java ---
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+
+/**
+ * Bipartite graph is a graph whose vertices can be divided into two disjoint sets: top vertices and bottom vertices.
+ * Edges can only exist between a pair of vertices from different vertex sets. E.g. there can be no edges between
+ * a pair of top vertices.
+ *
+ * Bipartite graphs are useful to represent graphs with two sets of objects, like researchers and their publications,
+ * where an edge represents that a particular publication was authored by a particular author.
+ *
+ * The bipartite interface is different from the {@link Graph} interface, so to apply algorithms that work on a
+ * regular graph a bipartite graph should first be converted into a {@link Graph} instance. This can be achieved
+ * by using the {@link BipartiteGraph#simpleTopProjection()} or
+ * {@link BipartiteGraph#fullBottomProjection()} methods.
+ *
+ * @param <KT> the key type of the top vertices
+ * @param <KB> the key type of the bottom vertices
+ * @param <VVT> the top vertices value type
+ * @param <VVB> the bottom vertices value type
+ * @param <EV> the edge value type
+ */
+public class BipartiteGraph<KT, KB, VVT, VVB, EV> {
+	private final ExecutionEnvironment context;
+	private final DataSet<Vertex<KT, VVT>> topVertices;
+	private final DataSet<Vertex<KB, VVB>> bottomVertices;
+	private final DataSet<BipartiteEdge<KT, KB, EV>> edges;
+
+	private BipartiteGraph(
+			DataSet<Vertex<KT, VVT>> topVertices,
+			DataSet<Vertex<KB, VVB>> bottomVertices,
+			DataSet<BipartiteEdge<KT, KB, EV>> edges,
+			ExecutionEnvironment context) {
+		this.topVertices = topVertices;
+		this.bottomVertices = bottomVertices;
+		this.edges = edges;
+		this.context = context;
+	}
+
+	/**
+	 * Create bipartite graph from datasets.
+	 *
+	 * @param topVertices dataset of top vertices in the graph
+	 * @param bottomVertices dataset of bottom vertices in the graph
+	 * @param edges dataset of edges between vertices
+	 * @param context Flink execution context
+	 * @param <KT> the key type of the top vertices
+	 * @param <KB> the key type of the bottom vertices
+	 * @param <VVT> the top vertices value type
+	 * @param <VVB> the bottom vertices value type
+	 * @param <EV> the edge value type
+	 * @return new bipartite graph created from provided datasets
+	 */
+	public static <KT, KB, VVT, VVB, EV> BipartiteGraph<KT, KB, VVT, VVB, EV> fromDataSet(
+			DataSet<Vertex<KT, VVT>> topVertices,
+			DataSet<Vertex<KB, VVB>> bottomVertices,
+			DataSet<BipartiteEdge<KT, KB, EV>> edges,
+			ExecutionEnvironment context) {
+		return new BipartiteGraph<>(topVertices, bottomVertices, edges, context);
+	}
+
+	/**
+	 * Get dataset with top vertices.
+	 *
+	 * @return dataset with top vertices
+	 */
+	public DataSet<Vertex<KT, VVT>> getTopVertices() {
+		return topVertices;
+	}
+
+	/**
+	 * Get dataset with bottom vertices.
+	 *
+	 * @return dataset with bottom vertices
+	 */
+	public DataSet<Vertex<KB, VVB>> getBottomVertices() {
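
For orientation, a hedged usage sketch of the API under review (the `BipartiteEdge` 
type, its constructor, and the projection's return type are assumptions; the PR was 
still evolving at this point):

    // Hedged usage sketch: researchers (top vertices) and papers (bottom vertices).
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    DataSet<Vertex<Long, String>> researchers = env.fromElements(
    	new Vertex<>(1L, "alice"),
    	new Vertex<>(2L, "bob"));
    DataSet<Vertex<String, String>> papers = env.fromElements(
    	new Vertex<>("p1", "Streaming Joins"));
    DataSet<BipartiteEdge<Long, String, String>> authorship = env.fromElements(
    	new BipartiteEdge<>(1L, "p1", "first author"),
    	new BipartiteEdge<>(2L, "p1", "co-author"));

    BipartiteGraph<Long, String, String, String, String> graph =
    	BipartiteGraph.fromDataSet(researchers, papers, authorship, env);

    // Convert to a regular Gelly Graph of top vertices (researchers linked when
    // they share a paper), then run any standard graph algorithm on it.
    Graph<Long, String, ?> coAuthors = graph.simpleTopProjection();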

[jira] [Commented] (FLINK-2254) Add Bipartite Graph Support for Gelly

2016-11-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2254?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15642279#comment-15642279
 ] 

ASF GitHub Bot commented on FLINK-2254:
---

Github user mushketyk commented on a diff in the pull request:

https://github.com/apache/flink/pull/2564#discussion_r86694144
  
--- Diff: 
flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/BipartiteGraph.java
 ---
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+
+/**
+ * Bipartite graph is a graph whose vertices can be divided into two disjoint sets: top vertices and bottom vertices.
+ * Edges can only exist between a pair of vertices from different vertex sets. E.g. there can be no edges between
+ * a pair of top vertices.
+ *
+ * Bipartite graphs are useful to represent graphs with two sets of objects, like researchers and their publications,
+ * where an edge represents that a particular publication was authored by a particular author.
+ *
+ * The bipartite interface is different from the {@link Graph} interface, so to apply algorithms that work on a
+ * regular graph a bipartite graph should first be converted into a {@link Graph} instance. This can be achieved
+ * by using the {@link BipartiteGraph#simpleTopProjection()} or
+ * {@link BipartiteGraph#fullBottomProjection()} methods.
+ *
+ * @param <KT> the key type of the top vertices
+ * @param <KB> the key type of the bottom vertices
+ * @param <VVT> the top vertices value type
+ * @param <VVB> the bottom vertices value type
+ * @param <EV> the edge value type
+ */
+public class BipartiteGraph<KT, KB, VVT, VVB, EV> {
+	private final ExecutionEnvironment context;
+	private final DataSet<Vertex<KT, VVT>> topVertices;
+	private final DataSet<Vertex<KB, VVB>> bottomVertices;
+	private final DataSet<BipartiteEdge<KT, KB, EV>> edges;
+
+	private BipartiteGraph(
+			DataSet<Vertex<KT, VVT>> topVertices,
+			DataSet<Vertex<KB, VVB>> bottomVertices,
+			DataSet<BipartiteEdge<KT, KB, EV>> edges,
+			ExecutionEnvironment context) {
+		this.topVertices = topVertices;
+		this.bottomVertices = bottomVertices;
+		this.edges = edges;
+		this.context = context;
+	}
+
+	/**
+	 * Create bipartite graph from datasets.
+	 *
+	 * @param topVertices dataset of top vertices in the graph
+	 * @param bottomVertices dataset of bottom vertices in the graph
+	 * @param edges dataset of edges between vertices
+	 * @param context Flink execution context
+	 * @param <KT> the key type of the top vertices
+	 * @param <KB> the key type of the bottom vertices
+	 * @param <VVT> the top vertices value type
+	 * @param <VVB> the bottom vertices value type
+	 * @param <EV> the edge value type
+	 * @return new bipartite graph created from provided datasets
+	 */
+	public static <KT, KB, VVT, VVB, EV> BipartiteGraph<KT, KB, VVT, VVB, EV> fromDataSet(
+			DataSet<Vertex<KT, VVT>> topVertices,
+			DataSet<Vertex<KB, VVB>> bottomVertices,
+			DataSet<BipartiteEdge<KT, KB, EV>> edges,
+			ExecutionEnvironment context) {
+		return new BipartiteGraph<>(topVertices, bottomVertices, edges, context);
+	}
+
+	/**
+	 * Get dataset with top vertices.
+	 *
+	 * @return dataset with top vertices
+	 */
+	public DataSet<Vertex<KT, VVT>> getTopVertices() {
+		return topVertices;
+	}
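
For orientation, a minimal usage sketch of the proposed API as it appears in this hunk. Gelly's existing Vertex type is used as shown above; the BipartiteEdge(topId, bottomId, value) constructor is an assumption modeled on Gelly's Edge, since this hunk does not show it.

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.graph.BipartiteEdge;
import org.apache.flink.graph.BipartiteGraph;
import org.apache.flink.graph.Vertex;

public class BipartiteGraphExample {

	public static void main(String[] args) throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		// Top vertices: authors keyed by Long; bottom vertices: papers keyed by String.
		DataSet<Vertex<Long, String>> authors = env.fromElements(
			new Vertex<>(1L, "Alice"),
			new Vertex<>(2L, "Bob"));
		DataSet<Vertex<String, String>> papers = env.fromElements(
			new Vertex<>("p1", "A shared publication"));

		// Edges may only connect a top vertex to a bottom vertex.
		DataSet<BipartiteEdge<Long, String, Double>> authorship = env.fromElements(
			new BipartiteEdge<>(1L, "p1", 1.0),
			new BipartiteEdge<>(2L, "p1", 1.0));

		BipartiteGraph<Long, String, String, String, Double> graph =
			BipartiteGraph.fromDataSet(authors, papers, authorship, env);

		graph.getTopVertices().print();
	}
}

Per the class javadoc, the bipartite graph would then be converted with simpleTopProjection() or fullBottomProjection() before running regular Gelly algorithms on the resulting Graph.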

[jira] [Commented] (FLINK-2254) Add Bipartite Graph Support for Gelly

2016-11-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2254?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15642277#comment-15642277
 ] 

ASF GitHub Bot commented on FLINK-2254:
---

Github user mushketyk commented on a diff in the pull request:

https://github.com/apache/flink/pull/2564#discussion_r86694124
  
--- Diff: 
flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/BipartiteGraph.java
 ---


[jira] [Commented] (FLINK-4521) Fix "Submit new Job" panel in development mode

2016-11-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4521?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15642067#comment-15642067
 ] 

ASF GitHub Bot commented on FLINK-4521:
---

Github user mushketyk commented on the issue:

https://github.com/apache/flink/pull/2431
  
Hi @iampeter 
Thank you for your review and sorry for the long delay.
I'll update the code in the next few days.



> Fix "Submit new Job" panel in development mode
> --
>
> Key: FLINK-4521
> URL: https://issues.apache.org/jira/browse/FLINK-4521
> Project: Flink
>  Issue Type: Bug
>  Components: Webfrontend
>Reporter: Ivan Mushketyk
>Assignee: Ivan Mushketyk
>
> If the web frontend is started in development mode, the "Submit new Job" panel
> is empty.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4934) Triadic Census

2016-11-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4934?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15641962#comment-15641962
 ] 

ASF GitHub Bot commented on FLINK-4934:
---

Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/2731
  
The RMat graph is used across many tests and the intent was to verify the 
larger tests against secondary frameworks. I have no problem leaving out the 
files as the command is documented in the base test class.


> Triadic Census
> --
>
> Key: FLINK-4934
> URL: https://issues.apache.org/jira/browse/FLINK-4934
> Project: Flink
>  Issue Type: New Feature
>  Components: Gelly
>Affects Versions: 1.2.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
> Fix For: 1.2.0
>
>
> A triad is any three vertices in a graph. An undirected graph has 4 types of 
> triads (with 0, 1, 2, or 3 edges among the three vertices) and a directed 
> graph has 16 types 
> (http://vlado.fmf.uni-lj.si/pub/networks/doc/triads/triads.pdf).
> This can be implemented as an analytic. The undirected implementation will 
> use {{VertexMetrics}} and {{TriangleCount}}. The directed implementation will 
> use {{VertexDegrees}} and {{TriangleListing}} with postprocessing.
> This could be added to the {{TriangleListing}} driver in Gelly examples.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
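
A note on the undirected case: the four triad counts follow from the aggregate statistics that {{VertexMetrics}} and {{TriangleCount}} already produce (vertex count n, edge count m, wedge count, triangle count), with no per-triad enumeration. A standalone sketch of that arithmetic, not the PR's implementation:

public final class UndirectedTriadCensus {

	/** Counts of triads with exactly 0, 1, 2, and 3 edges, from global statistics only. */
	public static long[] census(long n, long m, long wedges, long triangles) {
		long triads3 = triangles;               // all three edges present
		long triads2 = wedges - 3 * triangles;  // every triangle contains three wedges
		// Each (edge, third vertex) pair hits a triad once per edge the triad contains.
		long triads1 = m * (n - 2) - 2 * triads2 - 3 * triads3;
		// C(n, 3) minus the rest; note n * (n - 1) * (n - 2) overflows long for very large n.
		long triads0 = n * (n - 1) * (n - 2) / 6 - triads1 - triads2 - triads3;
		return new long[] {triads0, triads1, triads2, triads3};
	}

	public static void main(String[] args) {
		// Triangle {a,b,c} plus a pendant edge c-d: n=4, m=4, wedges=5, triangles=1.
		System.out.println(java.util.Arrays.toString(census(4, 4, 5, 1))); // [0, 1, 2, 1]
	}
}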


[jira] [Commented] (FLINK-4934) Triadic Census

2016-11-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4934?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15641896#comment-15641896
 ] 

ASF GitHub Bot commented on FLINK-4934:
---

Github user vasia commented on the issue:

https://github.com/apache/flink/pull/2731
  
Hi @greghogan,
do we really need a 12k-line csv and a 32k-line csv to test this?


> Triadic Census
> --
>
> Key: FLINK-4934
> URL: https://issues.apache.org/jira/browse/FLINK-4934
> Project: Flink
>  Issue Type: New Feature
>  Components: Gelly
>Affects Versions: 1.2.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
> Fix For: 1.2.0
>
>
> A triad is any three vertices in a graph. An undirected graph has 4 types of 
> triads (with 0, 1, 2, or 3 edges among the three vertices) and a directed 
> graph has 16 types 
> (http://vlado.fmf.uni-lj.si/pub/networks/doc/triads/triads.pdf).
> This can be implemented as an analytic. The undirected implementation will 
> use {{VertexMetrics}} and {{TriangleCount}}. The directed implementation will 
> use {{VertexDegrees}} and {{TriangleListing}} with postprocessing.
> This could be added to the {{TriangleListing}} driver in Gelly examples.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs

2016-11-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15641702#comment-15641702
 ] 

ASF GitHub Bot commented on FLINK-3702:
---

Github user mbalassi commented on the issue:

https://github.com/apache/flink/pull/2094
  
Thanks @fhueske. I am happy to squash the commits and merge the PR given 
that you are satisfied with it.

I would like to merge it by mid next week, so I could proceed with adding 
the necessary final touches to @Xazax-hun's PR on the serializer code 
generation during the end of the week. I would love to have that issue also 
wrapped up. 😃


> DataStream API PojoFieldAccessor doesn't support nested POJOs
> -
>
> Key: FLINK-3702
> URL: https://issues.apache.org/jira/browse/FLINK-3702
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Affects Versions: 1.0.0
>Reporter: Robert Metzger
>Assignee: Gabor Gevay
>
> The {{PojoFieldAccessor}} (which is used by {{.sum(String)}} and similar 
> methods) doesn't support nested POJOs right now.
> As part of FLINK-3697 I'll add a check for a nested POJO and fail with an 
> exception.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
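
For context, the change enables field expressions that reach into nested POJOs. A minimal sketch; the POJO classes and field names below are invented for illustration:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class NestedFieldSumExample {

	// Public fields plus a no-argument constructor make these valid Flink POJOs.
	public static class Inner {
		public double value;
		public Inner() {}
		public Inner(double value) { this.value = value; }
	}

	public static class Outer {
		public String key;
		public Inner inner;
		public Outer() {}
		public Outer(String key, double value) {
			this.key = key;
			this.inner = new Inner(value);
		}
	}

	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		env.fromElements(new Outer("a", 1.0), new Outer("a", 2.0), new Outer("b", 5.0))
			.keyBy("key")
			.sum("inner.value") // nested field expression; previously only top-level fields were supported
			.print();

		env.execute("nested field sum");
	}
}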

