[jira] [Commented] (FLINK-4391) Provide support for asynchronous operations over streams
[ https://issues.apache.org/jira/browse/FLINK-4391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15643392#comment-15643392 ]

ASF GitHub Bot commented on FLINK-4391:
---------------------------------------

Github user bjlovegithub commented on the issue:

    https://github.com/apache/flink/pull/2629

Maybe this is a solution. We would not change the type of `checkpointLock`, which is a plain `Object` and quite efficient, and we would not change the order of `broadcastBarriers` and `operator.snapshotState()`. Instead, we place the code that **pauses** the `EmitterThread` in `StreamTask.performCheckpoint()`, like this:

```java
private boolean performCheckpoint(CheckpointMetaData checkpointMetaData) throws Exception {
    LOG.debug("Starting checkpoint {} on task {}", checkpointMetaData.getCheckpointId(), getName());

    synchronized (lock) {
        if (isRunning) {
            // stop working threads first.
            for (StreamOperator<?> operator : operatorChain.getAllOperators()) {
                if (operator instanceof AsyncWaitOperator) {
                    // pauseEmitterThread() is the proposed new method on AsyncWaitOperator.
                    ((AsyncWaitOperator<?, ?>) operator).pauseEmitterThread();
                }
            }

            // keep the existing order: broadcast barriers first, then snapshot the operators' states.
            operatorChain.broadcastCheckpointBarrier(
                checkpointMetaData.getCheckpointId(), checkpointMetaData.getTimestamp()
            );

            checkpointState(checkpointMetaData);
            return true;
        } else {
            return false;
        }
    }
}
```

> Provide support for asynchronous operations over streams
>
>                 Key: FLINK-4391
>                 URL: https://issues.apache.org/jira/browse/FLINK-4391
>             Project: Flink
>          Issue Type: New Feature
>          Components: DataStream API
>            Reporter: Jamie Grier
>            Assignee: david.wang
>
> Many Flink users need to do asynchronous processing driven by data from a
> DataStream. The classic example would be joining against an external
> database in order to enrich a stream with extra information.
> It would be nice to add general support for this type of operation in the
> Flink API. Ideally this could simply take the form of a new operator that
> manages async operations, keeps so many of them in flight, and then emits
> results to downstream operators as the async operations complete.

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
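The pause step proposed in the comment above could be backed by a small gate that the emitter thread checks before each emit. What follows is only a minimal sketch under stated assumptions: `PauseGate` and its methods are hypothetical names that appear in neither Flink nor PR #2629, and the idea is that `pause()` would be called from `performCheckpoint()` while `resume()` would be called once the checkpoint work is done.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Hypothetical helper, not part of Flink or of PR #2629: a gate that an
 * emitter thread checks before each emit, so another thread can pause it.
 */
class PauseGate {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition resumed = lock.newCondition();
    private boolean paused; // guarded by lock

    /** Asks the emitter to stop at its next check; does not wait for it to get there. */
    void pause() {
        lock.lock();
        try {
            paused = true;
        } finally {
            lock.unlock();
        }
    }

    /** Lets a paused emitter continue. */
    void resume() {
        lock.lock();
        try {
            paused = false;
            resumed.signalAll();
        } finally {
            lock.unlock();
        }
    }

    boolean isPaused() {
        lock.lock();
        try {
            return paused;
        } finally {
            lock.unlock();
        }
    }

    /** Called by the emitter before each emit; blocks while the gate is paused. */
    void awaitIfPaused() throws InterruptedException {
        lock.lock();
        try {
            while (paused) { // loop guards against spurious wake-ups
                resumed.await();
            }
        } finally {
            lock.unlock();
        }
    }
}
```

Note that this gate only stops the emitter at its next check. Whether an emit that is already in progress can overlap with the snapshot depends on how the emitter interacts with the checkpoint lock, which this sketch does not model.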
[jira] [Commented] (FLINK-4391) Provide support for asynchronous operations over streams
[ https://issues.apache.org/jira/browse/FLINK-4391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15642997#comment-15642997 ]

ASF GitHub Bot commented on FLINK-4391:
---------------------------------------

Github user bjlovegithub commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2629#discussion_r86709933

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java ---
@@ -540,15 +540,12 @@ private boolean performCheckpoint(CheckpointMetaData checkpointMetaData) throws
         synchronized (lock) {
             if (isRunning) {
+                checkpointState(checkpointMetaData);
 
-                // Since both state checkpointing and downstream barrier emission occurs in this
-                // lock scope, they are an atomic operation regardless of the order in which they occur.
-                // Given this, we immediately emit the checkpoint barriers, so the downstream operators
-                // can start their checkpoint work as soon as possible
+                // broadcast barriers after snapshot operators' states.
                 operatorChain.broadcastCheckpointBarrier(
-                        checkpointMetaData.getCheckpointId(), checkpointMetaData.getTimestamp());
-
-                checkpointState(checkpointMetaData);
+                        checkpointMetaData.getCheckpointId(), checkpointMetaData.getTimestamp()
+                );
--- End diff --

Can you share the test program and the test results? I wrote a sample program to measure the cost of acquiring and releasing a lock with `Object` / `ReentrantLock` / `ReentrantReadWriteLock`, and found that when a single thread uses the lock, all three give similar results. In the multi-threaded cases, however, `ReentrantLock` and `ReentrantReadWriteLock` outperform the `Object` lock.

Here is my benchmark program: [link](https://github.com/bjlovegithub/JavaLockTest/blob/master/src/LockTest.java)

And these are the sampled results from a run on my laptop: [stat](https://github.com/bjlovegithub/JavaLockTest/blob/master/stat/LockTest_result.data)
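For readers who want to try this kind of comparison themselves, here is a rough skeleton. It is not the `LockTest` program linked above, only an illustrative sketch: the class name `LockCost` and its methods are made up for this example, and the timing helper does no JIT warm-up, so its numbers are indicative at best (a harness such as JMH would be more trustworthy).

```java
import java.util.concurrent.locks.Lock;

/** Illustrative skeleton only; this is not the LockTest program linked above. */
class LockCost {

    /** Each of `threads` threads increments a shared counter `iterations` times under a monitor. */
    static long runWithMonitor(int threads, int iterations) {
        final Object monitor = new Object();
        final long[] count = {0};
        runAll(threads, () -> {
            for (int i = 0; i < iterations; i++) {
                synchronized (monitor) {
                    count[0]++;
                }
            }
        });
        return count[0];
    }

    /** Same workload, guarded by an explicit java.util.concurrent lock. */
    static long runWithLock(Lock lock, int threads, int iterations) {
        final long[] count = {0};
        runAll(threads, () -> {
            for (int i = 0; i < iterations; i++) {
                lock.lock();
                try {
                    count[0]++;
                } finally {
                    lock.unlock();
                }
            }
        });
        return count[0];
    }

    /** Wall-clock time of one run in nanoseconds (no warm-up, so only a rough number). */
    static long timeNanos(Runnable run) {
        long start = System.nanoTime();
        run.run();
        return System.nanoTime() - start;
    }

    /** Starts the workers and waits for all of them; join() makes their writes visible. */
    private static void runAll(int threads, Runnable body) {
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(body);
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for workers", e);
            }
        }
    }
}
```

A run could then compare, for example, `LockCost.timeNanos(() -> LockCost.runWithMonitor(4, 1_000_000))` with `LockCost.timeNanos(() -> LockCost.runWithLock(new ReentrantLock(), 4, 1_000_000))`, and likewise with the write lock of a `ReentrantReadWriteLock`.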
[jira] [Commented] (FLINK-4391) Provide support for asynchronous operations over streams
[ https://issues.apache.org/jira/browse/FLINK-4391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15642901#comment-15642901 ]

ASF GitHub Bot commented on FLINK-4391:
---------------------------------------

Github user bjlovegithub commented on the issue:

    https://github.com/apache/flink/pull/2629

Good point ;D Hmm, I think we have to override `StreamOperator.notifyCheckpointComplete()` in `AsyncWaitOperator`, so that once the `TaskManager` notifies the `Task` that the checkpoint has completed, the `EmitterThread` can resume working as soon as possible.
[GitHub] flink pull request #2629: [FLINK-4391] Provide support for asynchronous oper...
Github user bjlovegithub commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2629#discussion_r86706353

--- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncCollectorBufferTest.java ---
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.async;
+
+import org.apache.flink.streaming.api.datastream.AsyncDataStream;
+import org.apache.flink.streaming.api.functions.async.AsyncFunction;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.internal.util.reflection.Whitebox;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * Tests for {@link AsyncCollectorBuffer}. These test that:
+ *
+ *
+ * Add a new item into the buffer
+ * Ordered mode processing
+ * Unordered mode processing
+ * Error handling
+ *
+ */
+public class AsyncCollectorBufferTest {
+    private AsyncFunction function;
+
+    private AsyncWaitOperator operator;
+
+    private AsyncCollectorBuffer buffer;
+
+    private Output output;
+
+    @Before
+    public void setUp() throws Exception {
+        function = new AsyncFunction () {
+            @Override
+            public void asyncInvoke(Integer input, AsyncCollector collector) throws Exception {
+
+            }
+        };
+
+        operator = new AsyncWaitOperator<>(function);
+        Class[] classes = AbstractStreamOperator.class.getDeclaredClasses();
+        Class latencyClass = null;
+        for (Class c : classes) {
+            if (c.getName().indexOf("LatencyGauge") != -1) {
+                latencyClass = c;
+            }
+        }
+
+        Constructor explicitConstructor = latencyClass.getDeclaredConstructors()[0];
+        explicitConstructor.setAccessible(true);
+        Whitebox.setInternalState(operator, "latencyGauge", explicitConstructor.newInstance(10));
+
+        output = new FakedOutput(new ArrayList());
+        TimestampedCollector collector =new TimestampedCollector(output);
+        buffer =
+            new AsyncCollectorBuffer<>(3, AsyncDataStream.OutputMode.ORDERED, operator);
+        buffer.setOutput(collector, output);
+
+        Whitebox.setInternalState(operator, "output", output);
+    }
+
+    @Test
+    public void testAdd() throws Exception {
+        Thread.sleep(1000);
+        buffer.add(new Watermark(0l));
+        buffer.add(new LatencyMarker(111L, 1, 1));
+        Assert.assertEquals(((SimpleLinkedList) Whitebox.getInternalState(buffer, "queue")).size(), 2);
+        Assert.assertEquals(((Map) Whitebox.getInternalState(buffer, "collectorToStreamElement")).size(), 2);
+        Assert.assertEquals(((Map) Whitebox.getInternalState(buffer, "collectorToQueue")).size(), 2);
+
+        AsyncCollector collector = (AsyncCollector)((SimpleLinkedList)
[jira] [Comment Edited] (FLINK-4856) Add MapState for keyed streams
[ https://issues.apache.org/jira/browse/FLINK-4856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15642868#comment-15642868 ]

Xiaogang Shi edited comment on FLINK-4856 at 11/7/16 2:29 AM:
--------------------------------------------------------------

I have started the implementation of MapStates. But prior to that, I think we need some modifications to the current implementation to clarify the concepts. I have opened two JIRA issues to describe these problems. See FLINK-5023 and FLINK-5024 for the details.

was (Author: xiaogang.shi):
I have started the implementation of MapStates. But prior to that, I think we need some modifications to the current framework to clarify the concepts. I have opened two JIRA issues to describe these problems. See FLINK-5023 and FLINK-5024 for the details.

> Add MapState for keyed streams
>
>                 Key: FLINK-4856
>                 URL: https://issues.apache.org/jira/browse/FLINK-4856
>             Project: Flink
>          Issue Type: New Feature
>          Components: State Backends, Checkpointing
>            Reporter: Xiaogang Shi
>            Assignee: Xiaogang Shi
>
> Many states in keyed streams are organized as key-value pairs. Currently,
> these states are implemented by storing the entire map in a ValueState or a
> ListState. This implementation, however, is very costly because all entries
> have to be serialized/deserialized when a single entry is updated. To improve
> the efficiency of these states, MapStates are urgently needed.
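To make the cost argument in the FLINK-4856 description concrete, here is a small sketch that counts the bytes written when a whole map is re-serialized versus when a single entry is written. It uses plain `java.io` streams rather than Flink's serializers, and `MapUpdateCost` and its methods are hypothetical names for this illustration only.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Map;

/** Illustration only; uses java.io streams instead of Flink's type serializers. */
class MapUpdateCost {

    /** Bytes written when the whole map is re-serialized, as when a map lives in one value state. */
    static int bytesForWholeMap(Map<String, Long> map) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(map.size());
            for (Map.Entry<String, Long> entry : map.entrySet()) {
                out.writeUTF(entry.getKey());
                out.writeLong(entry.getValue());
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.size();
    }

    /** Bytes written when only the updated entry is serialized, as a per-entry map state could do. */
    static int bytesForSingleEntry(String key, long value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(key);
            out.writeLong(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.size();
    }
}
```

The whole-map cost grows linearly with the number of entries, while the single-entry cost stays constant for a given key and value size, which is the gap the issue description points at.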
[jira] [Created] (FLINK-5024) Add SimpleStateDescriptor to clarify the concepts
Xiaogang Shi created FLINK-5024:
-----------------------------------

             Summary: Add SimpleStateDescriptor to clarify the concepts
                 Key: FLINK-5024
                 URL: https://issues.apache.org/jira/browse/FLINK-5024
             Project: Flink
          Issue Type: Improvement
            Reporter: Xiaogang Shi

Currently, StateDescriptors accept two type arguments: the first is the type of the created state and the second is the type of the values in the state.

The concept, however, is a little confusing, because for ListStates the argument passed to the StateDescriptor is the type of the list elements rather than the type of the list. This also makes the implementation of MapStates difficult.

I suggest not putting the type serializer in StateDescriptors, which makes StateDescriptors independent of the data structures of the values.

A new type of StateDescriptor named SimpleStateDescriptor can be provided to abstract those states (namely ValueState, ReducingState and FoldingState) whose values are not composite.

The other states (e.g. ListStates and MapStates) can implement their own descriptors according to their data structures.
[GitHub] flink pull request #2629: [FLINK-4391] Provide support for asynchronous oper...
Github user bjlovegithub commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2629#discussion_r86706002

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncCollectorBuffer.java ---
@@ -0,0 +1,494 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.async;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.streaming.api.datastream.AsyncDataStream;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * AsyncCollectorBuffer will hold all {@link AsyncCollector} in its internal buffer,
+ * and emit results from {@link AsyncCollector} to the next operators following it by
+ * calling {@link Output#collect(Object)}
+ */
+@Internal
+public class AsyncCollectorBuffer{
+    private static final Logger LOG = LoggerFactory.getLogger(AsyncCollectorBuffer.class);
+
+    /**
+     * Max number of {@link AsyncCollector} in the buffer.
+     */
+    private final int bufferSize;
+
+    private final AsyncDataStream.OutputMode mode;
+
+    private final AsyncWaitOperator operator;
+
+    /**
+     * {@link AsyncCollector} queue.
+     */
+    private final SimpleLinkedList > queue = new SimpleLinkedList<>();
+    /**
+     * A hash map keeping {@link AsyncCollector} and their corresponding {@link StreamElement}
+     */
+    private final Map , StreamElement> collectorToStreamElement = new HashMap<>();
+    /**
+     * A hash map keeping {@link AsyncCollector} and their node references in the #queue.
+     */
+    private final Map , SimpleLinkedList.Node> collectorToQueue = new HashMap<>();
+
+    private final LinkedList finishedCollectors = new LinkedList<>();
+
+    /**
+     * {@link TimestampedCollector} and {@link Output} to collect results and watermarks.
+     */
+    private TimestampedCollector timestampedCollector;
+    private Output output;
+
+    /**
+     * Locks and conditions to synchronize with main thread and emitter thread.
+     */
+    private final Lock lock;
+    private final Condition notFull;
+    private final Condition taskDone;
+    private final Condition isEmpty;
+
+    /**
+     * Error from user codes.
+     */
+    private volatile Exception error;
+
+    private final Emitter emitter;
+    private final Thread emitThread;
+
+    private boolean isCheckpointing;
+
+    public AsyncCollectorBuffer(int maxSize, AsyncDataStream.OutputMode mode, AsyncWaitOperator operator) {
+        Preconditions.checkArgument(maxSize > 0, "Future buffer size should be greater than 0.");
+
+        this.bufferSize = maxSize;
+        this.mode = mode;
+        this.operator = operator;
+
+        this.lock = new ReentrantLock(true);
+        this.notFull = this.lock.newCondition();
+        this.taskDone = this.lock.newCondition();
+        this.isEmpty = this.lock.newCondition();
+
+        this.emitter = new Emitter();
+
[jira] [Created] (FLINK-5023) Add get() method in State interface
Xiaogang Shi created FLINK-5023:
-----------------------------------

             Summary: Add get() method in State interface
                 Key: FLINK-5023
                 URL: https://issues.apache.org/jira/browse/FLINK-5023
             Project: Flink
          Issue Type: Improvement
          Components: State Backends, Checkpointing
            Reporter: Xiaogang Shi

Currently, the only method provided by the State interface is `clear()`. I think we should provide another method called `get()` to return the structured value (e.g., value, list, or map) under the current key.

In fact, the functionality of `get()` has already been implemented in all types of states: e.g., `value()` in ValueState and `get()` in ListState. The modification to the interface can better abstract these states.
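One way to picture the FLINK-5023 proposal is the sketch below. It uses simplified stand-in interfaces (`SketchState`, `SketchValueState`, `SketchListState`) and in-memory implementations invented for this example; they are not Flink's actual `State`, `ValueState` or `ListState` types, and the real signatures may differ. The point is only that once `get()` sits on the common interface, generic code can read any state without knowing its concrete kind.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified stand-in for a common state interface; not Flink's actual type. */
interface SketchState<V> {
    /** Returns the structured value under the current key (the proposed addition). */
    V get();

    void clear();
}

/** Stand-in for a single-value state. */
interface SketchValueState<T> extends SketchState<T> {
    void update(T value);
}

/** Stand-in for a list state: its structured value is the whole list. */
interface SketchListState<T> extends SketchState<Iterable<T>> {
    void add(T value);
}

/** In-memory value state for a single, implicit key. */
class InMemoryValueState<T> implements SketchValueState<T> {
    private T value;

    @Override public T get() { return value; }
    @Override public void update(T value) { this.value = value; }
    @Override public void clear() { value = null; }
}

/** In-memory list state for a single, implicit key. */
class InMemoryListState<T> implements SketchListState<T> {
    private final List<T> values = new ArrayList<>();

    @Override public Iterable<T> get() { return values; }
    @Override public void add(T value) { values.add(value); }
    @Override public void clear() { values.clear(); }
}

class StateUtil {
    /** Works for any state kind because get() lives on the common interface. */
    static String describe(SketchState<?> state) {
        return String.valueOf(state.get());
    }
}
```

With these stand-ins, `StateUtil.describe(...)` accepts a value state and a list state alike, which is the kind of abstraction the issue asks for.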
[GitHub] flink issue #2425: FLINK-3930 Added shared secret based authorization for Fl...
Github user vijikarthi commented on the issue:

    https://github.com/apache/flink/pull/2425

    > The cookie is added to every single message/buffer that is transferred. That is too much - securing the integrity of the stream is responsibility of the encryption layer. The cookie should be added to requests messages that establish connections only.

    I will change the code to handle the cookie right after the SSL handshake using a new handler, and stop passing the cookie with every message. The handler will be added to the pipeline of both `NettyClient` and `NettyServer`. The client will send the cookie when the channel becomes active, and the server will validate it and keep track of the clients that are authorized.

    Here is the pseudo-code for the client and server handlers. Please take a look and let me know if you are okay with this approach, and I will modify the code.

    ---
    public static class ClientCookieHandler extends ChannelInboundHandlerAdapter {

        private final String secureCookie;

        final Charset DEFAULT_CHARSET = Charset.forName("utf-8");

        public ClientCookieHandler(String secureCookie) {
            this.secureCookie = secureCookie;
        }

        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            super.channelActive(ctx);

            // Send the cookie once, length-prefixed, as soon as the channel is active.
            if (this.secureCookie != null && this.secureCookie.length() != 0) {
                final ByteBuf buffer = Unpooled.buffer(4 + this.secureCookie.getBytes(DEFAULT_CHARSET).length);
                buffer.writeInt(secureCookie.getBytes(DEFAULT_CHARSET).length);
                buffer.writeBytes(secureCookie.getBytes(DEFAULT_CHARSET));
                ctx.writeAndFlush(buffer);
            }
        }
    }

    public static class ServerCookieDecoder extends MessageToMessageDecoder<ByteBuf> {

        private final String secureCookie;

        private final List<Channel> channelList = new ArrayList<>();

        private final Charset DEFAULT_CHARSET = Charset.forName("utf-8");

        public ServerCookieDecoder(String secureCookie) {
            this.secureCookie = secureCookie;
        }

        @Override
        protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {

            // No cookie configured, or channel already authorized: pass the message on.
            // A MessageToMessageDecoder drops any message it does not add to `out`.
            if (secureCookie == null || secureCookie.length() == 0) {
                out.add(msg.retain());
                return;
            }

            if (channelList.contains(ctx.channel())) {
                out.add(msg.retain());
                return;
            }

            //read cookie based on the cookie length passed
            int cookieLength = msg.readInt();
            if (cookieLength != secureCookie.getBytes(DEFAULT_CHARSET).length) {
                String message = "Cookie length does not match with source cookie. Invalid secure cookie passed.";
                throw new IllegalStateException(message);
            }

            //read only if cookie length is greater than zero
            if (cookieLength > 0) {

                final byte[] buffer = new byte[secureCookie.getBytes(DEFAULT_CHARSET).length];
                msg.readBytes(buffer, 0, cookieLength);

                if (!Arrays.equals(secureCookie.getBytes(DEFAULT_CHARSET), buffer)) {
                    LOG.error("Secure cookie from the client is not matching with the server's identity");
                    throw new IllegalStateException("Invalid secure cookie passed.");
                }

                LOG.info("Secure cookie validation passed");

                channelList.add(ctx.channel());
            }
        }
    }
    ---

---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---
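The handshake described in this comment boils down to a length-prefixed frame: a 4-byte length followed by the UTF-8 bytes of the cookie, which the server compares with its own copy. The sketch below shows only that framing and check, under stated assumptions: it uses `java.nio.ByteBuffer` instead of Netty's `ByteBuf` so it runs without dependencies, the class `CookieFrames` and its methods are hypothetical, and it swaps `Arrays.equals` for `MessageDigest.isEqual`, a comparison commonly used for secrets.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;

/** Hypothetical helper illustrating the length-prefixed cookie frame; not the PR's handlers. */
class CookieFrames {

    /** Client side: builds [4-byte length][cookie bytes], ready for reading. */
    static ByteBuffer encode(String cookie) {
        byte[] bytes = cookie.getBytes(StandardCharsets.UTF_8);
        ByteBuffer frame = ByteBuffer.allocate(4 + bytes.length);
        frame.putInt(bytes.length).put(bytes);
        frame.flip();
        return frame;
    }

    /** Server side: returns true only if the frame carries exactly the expected cookie. */
    static boolean matches(ByteBuffer frame, String expectedCookie) {
        byte[] expected = expectedCookie.getBytes(StandardCharsets.UTF_8);
        if (frame.remaining() < 4) {
            return false; // too short to contain the length prefix
        }
        int length = frame.getInt();
        if (length != expected.length || frame.remaining() < length) {
            return false; // wrong length, or truncated frame
        }
        byte[] received = new byte[length];
        frame.get(received);
        return MessageDigest.isEqual(expected, received);
    }

    public static void main(String[] args) {
        ByteBuffer frame = encode("s3cret");
        System.out.println(matches(frame, "s3cret"));
    }
}
```

Rejecting a frame by returning `false` rather than throwing is a choice made for the sketch; the pseudo-code in the comment raises `IllegalStateException` instead.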
[jira] [Commented] (FLINK-3930) Implement Service-Level Authorization
    [ https://issues.apache.org/jira/browse/FLINK-3930?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15642713#comment-15642713 ]

ASF GitHub Bot commented on FLINK-3930:
---

Github user vijikarthi commented on the issue:

    https://github.com/apache/flink/pull/2425

> Implement Service-Level Authorization
> -
>
> Key: FLINK-3930
> URL: https://issues.apache.org/jira/browse/FLINK-3930
> Project: Flink
> Issue Type: New Feature
> Components: Security
> Reporter: Eron Wright
> Assignee: Vijay Srinivasaraghavan
> Labels: security
> Original Estimate: 672h
> Remaining Estimate: 672h
>
> _This issue is part of a series of improvements
[jira] [Commented] (FLINK-3030) Enhance Dashboard to show Execution Attempts
    [ https://issues.apache.org/jira/browse/FLINK-3030?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15642459#comment-15642459 ]

ASF GitHub Bot commented on FLINK-3030:
---

Github user mushketyk commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2448#discussion_r86697108

    --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JsonGenerators.java ---
    @@ -0,0 +1,86 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.runtime.webmonitor.handlers;
    +
    +import com.fasterxml.jackson.core.JsonGenerator;
    +import org.apache.flink.api.common.accumulators.Accumulator;
    +import org.apache.flink.api.common.accumulators.LongCounter;
    +import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
    +import org.apache.flink.runtime.execution.ExecutionState;
    +import org.apache.flink.runtime.executiongraph.Execution;
    +import org.apache.flink.runtime.instance.InstanceConnectionInfo;
    +
    +import java.io.IOException;
    +import java.util.Map;
    +
    +public final class JsonGenerators {
    +    private JsonGenerators() {}
    +
    +    public static void serializeExecutionAttempt(Execution execAttempt, JsonGenerator gen) throws IOException {
    +
    +        final ExecutionState status = execAttempt.getState();
    +        final long now = System.currentTimeMillis();
    +
    +        InstanceConnectionInfo location = execAttempt.getAssignedResourceLocation();
    +        String locationString = location == null ? "(unassigned)" : location.getHostname();
    +
    +        long startTime = execAttempt.getStateTimestamp(ExecutionState.DEPLOYING);
    +        if (startTime == 0) {
    +            startTime = -1;
    +        }
    +        long endTime = status.isTerminal() ? execAttempt.getStateTimestamp(status) : -1;
    +        long duration = startTime > 0 ? ((endTime > 0 ? endTime : now) - startTime) : -1;
    +
    +        Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> metrics = execAttempt.getFlinkAccumulators();
    +        LongCounter readBytes;
    +        LongCounter writeBytes;
    +        LongCounter readRecords;
    +        LongCounter writeRecords;
    +
    +        if (metrics != null) {
    +            readBytes = (LongCounter) metrics.get(AccumulatorRegistry.Metric.NUM_BYTES_IN);
    +            writeBytes = (LongCounter) metrics.get(AccumulatorRegistry.Metric.NUM_BYTES_OUT);
    +            readRecords = (LongCounter) metrics.get(AccumulatorRegistry.Metric.NUM_RECORDS_IN);
    +            writeRecords = (LongCounter) metrics.get(AccumulatorRegistry.Metric.NUM_RECORDS_OUT);
    --- End diff --

    Ok. I'll rebase the change and fix merge conflicts.

> Enhance Dashboard to show Execution Attempts
> 
>
> Key: FLINK-3030
> URL: https://issues.apache.org/jira/browse/FLINK-3030
> Project: Flink
> Issue Type: Improvement
> Components: Webfrontend
> Affects Versions: 0.10.0
> Reporter: Stephan Ewen
> Assignee: Ivan Mushketyk
> Fix For: 1.0.0
>
> Currently, the web dashboard shows only the latest execution attempt. We should make all execution attempts and their accumulators available for inspection.
> The REST monitoring API supports this, so it should be a change only to the frontend part.
[GitHub] flink pull request #2448: [FLINK-3030][web frontend] Enhance dashboard to sh...
Github user mushketyk commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2448#discussion_r86697108

    --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JsonGenerators.java ---

    Ok. I'll rebase the change and fix merge conflicts.
[jira] [Commented] (FLINK-2254) Add Bipartite Graph Support for Gelly
    [ https://issues.apache.org/jira/browse/FLINK-2254?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15642456#comment-15642456 ]

ASF GitHub Bot commented on FLINK-2254:
---

Github user mushketyk commented on the issue:

    https://github.com/apache/flink/pull/2564

    @vasia @greghogan I've updated the PR. Could you please give it another look?

> Add Bipartite Graph Support for Gelly
> -
>
> Key: FLINK-2254
> URL: https://issues.apache.org/jira/browse/FLINK-2254
> Project: Flink
> Issue Type: New Feature
> Components: Gelly
> Affects Versions: 0.10.0
> Reporter: Andra Lungu
> Assignee: Ivan Mushketyk
> Labels: requires-design-doc
>
> A bipartite graph is a graph whose set of vertices can be divided into two disjoint sets such that every edge with a source vertex in the first set has its target vertex in the second set. We would like to support efficient operations for this type of graph along with a set of metrics (http://jponnela.com/web_documents/twomode.pdf).
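To make the definition concrete, the sketch below builds a tiny bipartite graph of researchers (top vertices) and publications (bottom vertices) and computes a simple top projection: two researchers are connected if they share at least one publication. This is a plain-collections illustration under stated assumptions, not Gelly's API; the class `BipartiteSketch`, the `topProjection` method, and the sample data are all hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

/** Hypothetical illustration of a bipartite graph and its top projection. */
class BipartiteSketch {

    /**
     * Each edge is {topVertex, bottomVertex}. Returns the sorted set of pairs
     * "a-b" of distinct top vertices that share at least one bottom vertex.
     */
    static Set<String> topProjection(String[][] edges) {
        // Group top vertices by the bottom vertex they are attached to.
        Map<String, Set<String>> topsByBottom = new TreeMap<>();
        for (String[] edge : edges) {
            topsByBottom.computeIfAbsent(edge[1], key -> new TreeSet<>()).add(edge[0]);
        }

        // Every pair of top vertices under the same bottom vertex becomes one projected edge.
        Set<String> pairs = new TreeSet<>();
        for (Set<String> tops : topsByBottom.values()) {
            List<String> sorted = new ArrayList<>(tops);
            for (int i = 0; i < sorted.size(); i++) {
                for (int j = i + 1; j < sorted.size(); j++) {
                    pairs.add(sorted.get(i) + "-" + sorted.get(j));
                }
            }
        }
        return pairs;
    }

    public static void main(String[] args) {
        String[][] authorship = {
            {"alice", "paper1"},
            {"bob", "paper1"},
            {"bob", "paper2"},
            {"carol", "paper2"},
            {"dave", "paper3"},
        };
        System.out.println(topProjection(authorship));
    }
}
```

Because edges only ever join a researcher to a publication, the projection is what lets algorithms written for an ordinary graph run on one side of the bipartite structure.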
[GitHub] flink issue #2564: [FLINK-2254] Add BipartiateGraph class
Github user mushketyk commented on the issue:

    https://github.com/apache/flink/pull/2564

    @vasia @greghogan I've updated the PR. Could you please give it another look?
[GitHub] flink pull request #2564: [FLINK-2254] Add BipartiateGraph class
Github user mushketyk commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2564#discussion_r86694144

    --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/BipartiteGraph.java ---
    @@ -0,0 +1,272 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph;
    +
    +import org.apache.flink.api.common.functions.FilterFunction;
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.api.java.functions.KeySelector;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +
    +/**
    + *
    + * Bipartite graph is a graph whose vertices can be divided into two disjoint sets: top vertices and bottom vertices.
    + * Edges can only exist between a pair of vertices from different vertices sets. E.g. there can be no vertices between
    + * a pair of top vertices.
    + *
    + * Bipartite graphs are useful to represent graphs with two sets of objects, like researchers and their publications,
    + * where an edge represents that a particular publication was authored by a particular author.
    + *
    + * Bipartite interface is different from {@link Graph} interface, so to apply algorithms that work on a regular graph
    + * a bipartite graph should be first converted into a {@link Graph} instance. This can be achieved by using
    + * {@link BipartiteGraph#simpleTopProjection()} or
    + * {@link BipartiteGraph#fullBottomProjection()} methods.
    + *
    + * @param the key type of the top vertices
    + * @param the key type of the bottom vertices
    + * @param the top vertices value type
    + * @param the bottom vertices value type
    + * @param the edge value type
    + */
    +public class BipartiteGraph{
    +    private final ExecutionEnvironment context;
    +    private final DataSet > topVertices;
    +    private final DataSet > bottomVertices;
    +    private final DataSet > edges;
    +
    +    private BipartiteGraph(
    +            DataSet > topVertices,
    +            DataSet > bottomVertices,
    +            DataSet > edges,
    +            ExecutionEnvironment context) {
    +        this.topVertices = topVertices;
    +        this.bottomVertices = bottomVertices;
    +        this.edges = edges;
    +        this.context = context;
    +    }
    +
    +    /**
    +     * Create bipartite graph from datasets.
    +     *
    +     * @param topVertices dataset of top vertices in the graph
    +     * @param bottomVertices dataset of bottom vertices in the graph
    +     * @param edges dataset of edges between vertices
    +     * @param context Flink execution context
    +     * @param the key type of the top vertices
    +     * @param the key type of the bottom vertices
    +     * @param the top vertices value type
    +     * @param the bottom vertices value type
    +     * @param the edge value type
    +     * @return new bipartite graph created from provided datasets
    +     */
    +    public static BipartiteGraph fromDataSet(
    +            DataSet > topVertices,
    +            DataSet > bottomVertices,
    +            DataSet > edges,
    +            ExecutionEnvironment context) {
    +        return new BipartiteGraph<>(topVertices, bottomVertices, edges, context);
    +    }
    +
    +    /**
    +     * Get dataset with top vertices.
    +     *
    +     * @return dataset with top vertices
    +     */
    +    public DataSet > getTopVertices() {
    +        return topVertices;
    +    }
    +
    +    /**
    +     * Get dataset with bottom vertices.
    +     *
    +     * @return dataset with bottom vertices
    +     */
    +    public DataSet > getBottomVertices() {
    +
[jira] [Commented] (FLINK-2254) Add Bipartite Graph Support for Gelly
    [ https://issues.apache.org/jira/browse/FLINK-2254?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15642279#comment-15642279 ]

ASF GitHub Bot commented on FLINK-2254:
---

Github user mushketyk commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2564#discussion_r86694144

    --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/BipartiteGraph.java ---
[jira] [Commented] (FLINK-2254) Add Bipartite Graph Support for Gelly
    [ https://issues.apache.org/jira/browse/FLINK-2254?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15642277#comment-15642277 ]

ASF GitHub Bot commented on FLINK-2254:
---

Github user mushketyk commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2564#discussion_r86694124

    --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/BipartiteGraph.java ---
[GitHub] flink pull request #2564: [FLINK-2254] Add BipartiateGraph class
Github user mushketyk commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2564#discussion_r86694124

    --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/BipartiteGraph.java ---
[GitHub] flink issue #2431: [FLINK-4521][web frontend] Fix "Submit new Job" panel in ...
Github user mushketyk commented on the issue: https://github.com/apache/flink/pull/2431 Hi @iampeter Thank you for your review and sorry for the long delay. I'll update the code in the next few days. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-4521) Fix "Submit new Job" panel in development mode
[ https://issues.apache.org/jira/browse/FLINK-4521?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15642067#comment-15642067 ]

ASF GitHub Bot commented on FLINK-4521:
---

Github user mushketyk commented on the issue:

    https://github.com/apache/flink/pull/2431

    Hi @iampeter
    Thank you for your review and sorry for the long delay. I'll update the code in the next few days.

> Fix "Submit new Job" panel in development mode
> --
>
> Key: FLINK-4521
> URL: https://issues.apache.org/jira/browse/FLINK-4521
> Project: Flink
> Issue Type: Bug
> Components: Webfrontend
> Reporter: Ivan Mushketyk
> Assignee: Ivan Mushketyk
>
> If web frontend is started in the development mode, "Submit new Job" panel is
> empty.

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4934) Triadic Census
[ https://issues.apache.org/jira/browse/FLINK-4934?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15641962#comment-15641962 ]

ASF GitHub Bot commented on FLINK-4934:
---

Github user greghogan commented on the issue:

    https://github.com/apache/flink/pull/2731

    The RMat graph is used across many tests and the intent was to verify the larger tests against secondary frameworks. I have no problem leaving out the files as the command is documented in the base test class.

> Triadic Census
> --
>
> Key: FLINK-4934
> URL: https://issues.apache.org/jira/browse/FLINK-4934
> Project: Flink
> Issue Type: New Feature
> Components: Gelly
> Affects Versions: 1.2.0
> Reporter: Greg Hogan
> Assignee: Greg Hogan
> Fix For: 1.2.0
>
> A triad is any three vertices in a graph. An undirected graph has 4 types of
> triads (with 0, 1, 2, or 3 edges among the three vertices) and a directed
> graph has 16 types
> (http://vlado.fmf.uni-lj.si/pub/networks/doc/triads/triads.pdf).
> This can be implemented as an analytic. The undirected implementation will
> use {{VertexMetrics}} and {{TriangleCount}}. The directed implementation will
> use {{VertexDegrees}} and {{TriangleListing}} with postprocessing.
> This could be added to the {{TriangleListing}} driver in Gelly examples.
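To make the undirected case in the issue description concrete, here is a minimal brute-force sketch in plain Java. It is not the Gelly implementation proposed in FLINK-4934 (which, per the description, would build on {{VertexMetrics}} and {{TriangleCount}}); the class and method names are hypothetical, and the cubic loop over all vertex triples is only meant to illustrate what the four triad counts are.

```java
import java.util.Arrays;

// Illustrative sketch only; names are hypothetical and this is NOT the Gelly code from the PR.
class TriadCensusSketch {

    /**
     * Brute-force undirected triad census.
     * Returns counts[k] = number of vertex triples with exactly k edges among them, k = 0..3.
     * Vertices are assumed to be numbered 0..vertexCount-1.
     */
    static long[] undirectedCensus(int vertexCount, int[][] edges) {
        boolean[][] adjacent = new boolean[vertexCount][vertexCount];
        for (int[] e : edges) {
            if (e[0] == e[1]) {
                continue; // self-loops do not connect two distinct vertices of a triad
            }
            adjacent[e[0]][e[1]] = true;
            adjacent[e[1]][e[0]] = true;
        }

        long[] counts = new long[4];
        // Visit every unordered triple (a, b, c) exactly once.
        for (int a = 0; a < vertexCount; a++) {
            for (int b = a + 1; b < vertexCount; b++) {
                for (int c = b + 1; c < vertexCount; c++) {
                    int edgeCount = (adjacent[a][b] ? 1 : 0)
                            + (adjacent[a][c] ? 1 : 0)
                            + (adjacent[b][c] ? 1 : 0);
                    counts[edgeCount]++;
                }
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // A triangle 0-1-2 with a pendant vertex 3 attached to vertex 2.
        int[][] edges = {{0, 1}, {1, 2}, {0, 2}, {2, 3}};
        System.out.println(Arrays.toString(undirectedCensus(4, edges)));
        // prints [0, 1, 2, 1]
    }
}
```

The four counts always sum to the number of vertex triples, n choose 3, which is a quick sanity check on any census result.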
[GitHub] flink issue #2731: [FLINK-4934] [gelly] Triadic Census
Github user greghogan commented on the issue: https://github.com/apache/flink/pull/2731 The RMat graph is used across many tests and the intent was to verify the larger tests against secondary frameworks. I have no problem leaving out the files as the command is documented in the base test class.
[jira] [Commented] (FLINK-4934) Triadic Census
[ https://issues.apache.org/jira/browse/FLINK-4934?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15641896#comment-15641896 ]

ASF GitHub Bot commented on FLINK-4934:
---

Github user vasia commented on the issue:

    https://github.com/apache/flink/pull/2731

    Hi @greghogan, do we really need a 12k-line csv and a 32k-line csv to test this?
[GitHub] flink issue #2731: [FLINK-4934] [gelly] Triadic Census
Github user vasia commented on the issue: https://github.com/apache/flink/pull/2731 Hi @greghogan, do we really need a 12k-line csv and a 32k-line csv to test this?
[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs
[ https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15641702#comment-15641702 ]

ASF GitHub Bot commented on FLINK-3702:
---

Github user mbalassi commented on the issue:

    https://github.com/apache/flink/pull/2094

    Thanks @fhueske. I am happy to squash the commits and merge the PR given that you are satisfied with it. I would like to merge it by mid next week, so I could proceed with adding the necessary final touches to @Xazax-hun's PR on the serializer code generation during the end of the week. I would love to have that issue also wrapped up.

> DataStream API PojoFieldAccessor doesn't support nested POJOs
> -
>
> Key: FLINK-3702
> URL: https://issues.apache.org/jira/browse/FLINK-3702
> Project: Flink
> Issue Type: Improvement
> Components: DataStream API
> Affects Versions: 1.0.0
> Reporter: Robert Metzger
> Assignee: Gabor Gevay
>
> The {{PojoFieldAccessor}} (which is used by {{.sum(String)}} and similar
> methods) doesn't support nested POJOs right now.
> As part of FLINK-3697 I'll add a check for a nested POJO and fail with an
> exception.
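As a rough illustration of what a nested field expression such as "inner.count" means for an aggregation like {{.sum(String)}}, the sketch below resolves a dotted path with plain Java reflection. It does not use Flink's type system or the accessor classes from the PR; every class and method name here is hypothetical, and it assumes public fields only.

```java
import java.lang.reflect.Field;
import java.util.Arrays;
import java.util.List;

// Illustrative sketch only; NOT Flink's FieldAccessor. All names are hypothetical.
class NestedFieldSketch {

    public static class Inner {
        public int count;
        public Inner(int count) { this.count = count; }
    }

    public static class Outer {
        public String name;
        public Inner inner;
        public Outer(String name, int count) {
            this.name = name;
            this.inner = new Inner(count);
        }
    }

    /** Follows a dotted expression such as "inner.count" through public fields. */
    static Object get(Object pojo, String fieldExpression) {
        Object current = pojo;
        for (String part : fieldExpression.split("\\.")) {
            if (current == null) {
                throw new IllegalArgumentException("Null value before '" + part + "'");
            }
            try {
                Field field = current.getClass().getField(part);
                current = field.get(current);
            } catch (ReflectiveOperationException e) {
                throw new IllegalArgumentException("Cannot access field '" + part + "'", e);
            }
        }
        return current;
    }

    /** Sums a numeric (possibly nested) field over a list of POJOs. */
    static long sum(List<?> pojos, String fieldExpression) {
        long total = 0;
        for (Object pojo : pojos) {
            total += ((Number) get(pojo, fieldExpression)).longValue();
        }
        return total;
    }

    public static void main(String[] args) {
        List<Outer> records = Arrays.asList(new Outer("a", 3), new Outer("b", 4));
        System.out.println(sum(records, "inner.count"));
        // prints 7
    }
}
```

An accessor that only looks up the first path segment on the top-level class would stop at `inner` and could not reach `count`, which is the limitation the issue describes.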
[GitHub] flink issue #2094: [FLINK-3702] Make FieldAccessors support nested field exp...
Github user mbalassi commented on the issue: https://github.com/apache/flink/pull/2094 Thanks @fhueske. I am happy to squash the commits and merge the PR given that you are satisfied with it. I would like to merge it by mid next week, so I could proceed with adding the necessary final touches to @Xazax-hun's PR on the serializer code generation during the end of the week. I would love to have that issue also wrapped up.