flink git commit: [runtime] Add docs to BufferPool classes
Repository: flink Updated Branches: refs/heads/master 4f4c6b90d - 0ca1f0c7b [runtime] Add docs to BufferPool classes Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0ca1f0c7 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0ca1f0c7 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0ca1f0c7 Branch: refs/heads/master Commit: 0ca1f0c7b76cc4b4199fb687be4592dbd1548757 Parents: 4f4c6b9 Author: Ufuk Celebi u...@apache.org Authored: Tue Apr 7 07:44:27 2015 +0200 Committer: Ufuk Celebi u...@apache.org Committed: Tue Apr 7 07:44:27 2015 +0200 -- .../runtime/io/network/buffer/BufferPool.java | 31 .../io/network/buffer/BufferPoolFactory.java| 14 - .../io/network/buffer/BufferProvider.java | 13 .../io/network/buffer/LocalBufferPool.java | 4 +++ 4 files changed, 61 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/0ca1f0c7/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPool.java -- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPool.java index c2a3c05..fdfa201 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPool.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPool.java @@ -20,21 +20,52 @@ package org.apache.flink.runtime.io.network.buffer; import java.io.IOException; +/** + * A dynamically sized buffer pool. + */ public interface BufferPool extends BufferProvider, BufferRecycler { + /** +* The owner of this buffer pool to be called when memory needs to be released to avoid back +* pressure. +*/ void setBufferPoolOwner(BufferPoolOwner owner); + /** +* Destroys this buffer pool. +* +* p If not all buffers are available, they are recycled lazily as soon as they are recycled. 
+*/ void lazyDestroy(); + /** +* Checks whether this buffer pool has been destroyed. +*/ @Override boolean isDestroyed(); + /** +* Returns the number of guaranteed (minimum number of) memory segments of this buffer pool. +*/ int getNumberOfRequiredMemorySegments(); + /** +* Returns the current size of this buffer pool. +* +* p The size of the buffer pool can change dynamically at runtime. +*/ int getNumBuffers(); + /** +* Sets the current size of this buffer pool. +* +* p The size needs to be greater or equals to the guaranteed number of memory segments. +*/ void setNumBuffers(int numBuffers) throws IOException; + /** +* Returns the number memory segments, which are currently held by this buffer pool. +*/ int getNumberOfAvailableMemorySegments(); } http://git-wip-us.apache.org/repos/asf/flink/blob/0ca1f0c7/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactory.java -- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactory.java index fcd4d96..23321f4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactory.java @@ -20,10 +20,22 @@ package org.apache.flink.runtime.io.network.buffer; import java.io.IOException; +/** + * A factory for buffer pools. + */ public interface BufferPoolFactory { + /** +* Tries to create a buffer pool, which is guaranteed to provide at least the number of required +* buffers. +* +* p The buffer pool is either of dynamic size or fixed. +*/ BufferPool createBufferPool(int numRequiredBuffers, boolean isFixedSize) throws IOException; + /** +* Destroy callback for updating factory book keeping. 
+*/ void destroyBufferPool(BufferPool bufferPool) throws IOException; -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/flink/blob/0ca1f0c7/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferProvider.java -- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferProvider.java
[2/2] flink git commit: [TaskManager] Add test for failure behavior on TaskManager startup
[TaskManager] Add test for failure behavior on TaskManager startup Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4ed009ec Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4ed009ec Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4ed009ec Branch: refs/heads/master Commit: 4ed009ec9921e998161f82bc78137429986e2ba3 Parents: 0ca1f0c Author: Stephan Ewen se...@apache.org Authored: Wed Feb 18 14:29:11 2015 +0100 Committer: Stephan Ewen se...@apache.org Committed: Tue Apr 7 10:14:13 2015 +0200 -- .../flink/runtime/taskmanager/TaskManager.scala | 28 +- .../runtime/taskmanager/RegistrationTest.java | 379 .../TaskManagerRegistrationTest.java| 443 +++ .../taskmanager/TestManagerStartupTest.java | 166 +++ 4 files changed, 626 insertions(+), 390 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/4ed009ec/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala -- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala index af13b74..7d60c00 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala @@ -1368,7 +1368,7 @@ object TaskManager { val cause = t.getCause() if (cause != null t.getCause().isInstanceOf[java.net.BindException]) { val address = taskManagerHostname + : + actorSystemPort -throw new Exception(Unable to bind TaskManager actor system to address + +throw new IOException(Unable to bind TaskManager actor system to address + address + - + cause.getMessage(), t) } } @@ -1532,22 +1532,28 @@ object TaskManager { } // now start the memory manager -val memoryManager = new DefaultMemoryManager(memorySize, - taskManagerConfig.numberOfSlots, - netConfig.networkBufferSize) +val memoryManager = try { 
+ new DefaultMemoryManager(memorySize, + taskManagerConfig.numberOfSlots, + netConfig.networkBufferSize) +} catch { + case e: OutOfMemoryError = throw new Exception( +OutOfMemory error ( + e.getMessage + ) while allocating the TaskManager memory ( + + memorySize + bytes)., e) +} // start the I/O manager last, it will create some temp directories. val ioManager: IOManager = new IOManagerAsync(taskManagerConfig.tmpDirPaths) // create the actor properties (which define the actor constructor parameters) val tmProps = Props(taskManagerClass, -taskManagerConfig, -connectionInfo, -jobManagerAkkaUrl, -memoryManager, -ioManager, -network, -taskManagerConfig.numberOfSlots) + taskManagerConfig, + connectionInfo, + jobManagerAkkaUrl, + memoryManager, + ioManager, + network, + taskManagerConfig.numberOfSlots) taskManagerActorName match { case Some(actorName) = actorSystem.actorOf(tmProps, actorName) http://git-wip-us.apache.org/repos/asf/flink/blob/4ed009ec/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/RegistrationTest.java -- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/RegistrationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/RegistrationTest.java deleted file mode 100644 index 1b4f5f3..000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/RegistrationTest.java +++ /dev/null @@ -1,379 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * License); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an AS IS BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
[1/2] flink git commit: [runtime] Improve robustness of test TaskManagerRegistrationTest.
Repository: flink Updated Branches: refs/heads/master 0ca1f0c7b - 5cd9e9d94 [runtime] Improve robustness of test TaskManagerRegistrationTest. Also rename jobmanager.TaskManagerRegistrationTest to avoid name conflicts. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5cd9e9d9 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5cd9e9d9 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5cd9e9d9 Branch: refs/heads/master Commit: 5cd9e9d94bb3af4a5a868789f663af262672cac8 Parents: 4ed009e Author: Stephan Ewen se...@apache.org Authored: Mon Apr 6 20:00:21 2015 +0200 Committer: Stephan Ewen se...@apache.org Committed: Tue Apr 7 10:14:13 2015 +0200 -- .../TaskManagerRegistrationTest.java| 20 ++- .../jobmanager/JobManagerRegistrationTest.scala | 139 +++ .../TaskManagerRegistrationTest.scala | 139 --- 3 files changed, 156 insertions(+), 142 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/5cd9e9d9/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java -- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java index 7421837..69964ea 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java @@ -20,6 +20,7 @@ package org.apache.flink.runtime.taskmanager; import akka.actor.ActorRef; import akka.actor.ActorSystem; +import akka.actor.InvalidActorNameException; import akka.actor.Kill; import akka.actor.Props; import akka.actor.UntypedActor; @@ -308,9 +309,22 @@ public class TaskManagerRegistrationTest { // now start the second fake JobManager and expect that // the TaskManager registers again // the second fake JM needs to have the same 
actor URL - final ActorRef fakeJobManager2 = actorSystem.actorOf(fakeJmProps, jobManagerName); + ActorRef fakeJobManager2 = null; + + // since we cannot reliably wait until the actor is unregistered (name is + // available again) we loop with multiple tries for 20 seconds + long deadline = 20000000000L + System.nanoTime(); + do { + try { + fakeJobManager2 = actorSystem.actorOf(fakeJmProps, jobManagerName); + } catch (InvalidActorNameException e) { + // wait and retry + Thread.sleep(100); + } + } while (fakeJobManager2 == null && System.nanoTime() < deadline); // expect the next registration + final ActorRef jm2Closure = fakeJobManager2; new Within(new FiniteDuration(10, TimeUnit.SECONDS)) { @Override @@ -318,8 +332,8 @@ public class TaskManagerRegistrationTest { expectMsgClass(RegisterTaskManager.class); // we accept the registration - taskManager.tell(new AcknowledgeRegistration(fakeJobManager2, new InstanceID(), 45234), - fakeJobManager2); + taskManager.tell(new AcknowledgeRegistration(jm2Closure, new InstanceID(), 45234), + jm2Closure); } }; http://git-wip-us.apache.org/repos/asf/flink/blob/5cd9e9d9/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala -- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala new file mode 100644 index 0000000..5fde5ea --- /dev/null +++
[2/3] flink git commit: Additional tests for Path.isAbsolute()
Additional tests for Path.isAbsolute() Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/23fe0062 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/23fe0062 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/23fe0062 Branch: refs/heads/master Commit: 23fe00629e4b36e5c4d3849c2711743148369631 Parents: 2f683af Author: Fabian Hueske fhue...@apache.org Authored: Sat Apr 4 21:35:38 2015 +0200 Committer: Fabian Hueske fhue...@apache.org Committed: Tue Apr 7 14:46:15 2015 +0200 -- .../java/org/apache/flink/core/fs/Path.java | 5 +-- .../java/org/apache/flink/core/fs/PathTest.java | 35 +--- 2 files changed, 32 insertions(+), 8 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/23fe0062/flink-core/src/main/java/org/apache/flink/core/fs/Path.java -- diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/Path.java b/flink-core/src/main/java/org/apache/flink/core/fs/Path.java index 05bccd1..75155eb 100644 --- a/flink-core/src/main/java/org/apache/flink/core/fs/Path.java +++ b/flink-core/src/main/java/org/apache/flink/core/fs/Path.java @@ -312,10 +312,7 @@ public class Path implements IOReadableWritable, Serializable { */ public boolean isAbsolute() { final int start = hasWindowsDrive(uri.getPath(), true) ? 
3 : 0; - if (uri.getPath().length() start) { - return uri.getPath().startsWith(SEPARATOR, start); - } - return true; + return uri.getPath().startsWith(SEPARATOR, start); } /** http://git-wip-us.apache.org/repos/asf/flink/blob/23fe0062/flink-core/src/test/java/org/apache/flink/core/fs/PathTest.java -- diff --git a/flink-core/src/test/java/org/apache/flink/core/fs/PathTest.java b/flink-core/src/test/java/org/apache/flink/core/fs/PathTest.java index e4c4e88..8fa2cea 100644 --- a/flink-core/src/test/java/org/apache/flink/core/fs/PathTest.java +++ b/flink-core/src/test/java/org/apache/flink/core/fs/PathTest.java @@ -69,9 +69,6 @@ public class PathTest { p = new Path(file:/C:/my/windows/path); assertEquals(/C:/my/windows/path, p.toUri().getPath()); - p = new Path(C:); - assertEquals(/C:, p.toUri().getPath()); - try { new Path((String)null); fail(); @@ -98,21 +95,51 @@ public class PathTest { @Test public void testIsAbsolute() { + // UNIX + Path p = new Path(/my/abs/path); assertTrue(p.isAbsolute()); + p = new Path(/); + assertTrue(p.isAbsolute()); + p = new Path(./my/rel/path); assertFalse(p.isAbsolute()); + p = new Path(my/rel/path); + assertFalse(p.isAbsolute()); + + // WINDOWS + p = new Path(C:/my/abs/windows/path); assertTrue(p.isAbsolute()); - p = new Path(file:/C:); + p = new Path(y:/my/abs/windows/path); + assertTrue(p.isAbsolute()); + + p = new Path(b:\\my\\abs\\windows\\path); assertTrue(p.isAbsolute()); p = new Path(C:); + assertFalse(p.isAbsolute()); + + p = new Path(C:my\\relative\\path); + assertFalse(p.isAbsolute()); + + p = new Path(\\my\\dir); + assertTrue(p.isAbsolute()); + + p = new Path(\\); assertTrue(p.isAbsolute()); + p = new Path(.\\my\\relative\\path); + assertFalse(p.isAbsolute()); + + p = new Path(my\\relative\\path); + assertFalse(p.isAbsolute()); + + p = new Path(myServer\\myDir); + assertTrue(p.isAbsolute()); } @Test
[3/3] flink git commit: [FLINK-1817] Fix ClassLoaderObjectInputStream to support primitive type classes
[FLINK-1817] Fix ClassLoaderObjectInputStream to support primitive type classes This closes #565 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d33b4454 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d33b4454 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d33b4454 Branch: refs/heads/master Commit: d33b44549d416c30f01ead9aacf5cf7ed30674ca Parents: 23fe006 Author: Fabian Hueske fhue...@apache.org Authored: Thu Apr 2 01:28:08 2015 +0200 Committer: Fabian Hueske fhue...@apache.org Committed: Tue Apr 7 14:46:24 2015 +0200 -- .../apache/flink/util/InstantiationUtil.java| 30 +++- 1 file changed, 29 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/d33b4454/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java -- diff --git a/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java b/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java index c7088f5..241e56a 100644 --- a/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java +++ b/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java @@ -34,6 +34,7 @@ import java.io.ObjectOutputStream; import java.io.ObjectStreamClass; import java.lang.reflect.Constructor; import java.lang.reflect.Modifier; +import java.util.HashMap; /** * Utility class to create instances from class objects and checking failure reasons. @@ -48,10 +49,37 @@ public class InstantiationUtil { private static class ClassLoaderObjectInputStream extends ObjectInputStream { private ClassLoader classLoader; + private static final HashMapString, Class? 
primitiveClasses + = new HashMap<String, Class<?>>(8, 1.0F); + static { + primitiveClasses.put("boolean", boolean.class); + primitiveClasses.put("byte", byte.class); + primitiveClasses.put("char", char.class); + primitiveClasses.put("short", short.class); + primitiveClasses.put("int", int.class); + primitiveClasses.put("long", long.class); + primitiveClasses.put("float", float.class); + primitiveClasses.put("double", double.class); + primitiveClasses.put("void", void.class); + } + @Override public Class<?> resolveClass(ObjectStreamClass desc) throws IOException, ClassNotFoundException { if (classLoader != null) { - return Class.forName(desc.getName(), false, classLoader); + String name = desc.getName(); + try { + return Class.forName(name, false, classLoader); + } catch (ClassNotFoundException ex) { + // check if class is a primitive class + Class<?> cl = primitiveClasses.get(name); + if (cl != null) { + // return primitive class + return cl; + } else { + // throw ClassNotFoundException + throw ex; + } + } } return super.resolveClass(desc);
[2/3] flink git commit: [FLINK-1560] [streaming] Added ITCases to streaming examples
[FLINK-1560] [streaming] Added ITCases to streaming examples Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/464e7828 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/464e7828 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/464e7828 Branch: refs/heads/master Commit: 464e782868e1f697809d681ce7f8528bef4f2bdb Parents: ed7d165 Author: szape nemderogator...@gmail.com Authored: Wed Mar 25 09:35:39 2015 +0100 Committer: mbalassi mbala...@apache.org Committed: Tue Apr 7 16:09:37 2015 +0200 -- .../examples/iteration/IterateExample.java | 70 .../iteration/util/IterateExampleData.java | 32 .../examples/join/util/WindowJoinData.java | 66 .../util/IncrementalLearningSkeletonData.java | 34 .../twitter/util/TwitterStreamData.java | 27 +++ .../examples/windowing/SessionWindowing.java| 55 ++- .../windowing/util/SessionWindowingData.java| 27 +++ .../util/TopSpeedWindowingExampleData.java | 165 +++ .../test/iteration/IterateExampleITCase.java| 45 + .../examples/test/join/WindowJoinITCase.java| 48 ++ .../ml/IncrementalLearningSkeletonITCase.java | 42 + .../socket/SocketTextStreamWordCountITCase.java | 94 +++ .../test/twitter/TwitterStreamITCase.java | 42 + .../test/windowing/SessionWindowingITCase.java | 43 + .../TopSpeedWindowingExampleITCase.java | 45 + 15 files changed, 789 insertions(+), 46 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/464e7828/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java -- diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java index d9a8167..f5f2cd7 100644 --- 
a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java +++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java @@ -26,32 +26,24 @@ import org.apache.flink.streaming.api.datastream.IterativeDataStream; import org.apache.flink.streaming.api.datastream.SplitDataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.function.source.SourceFunction; -import org.apache.flink.streaming.api.windowing.helper.Time; import org.apache.flink.util.Collector; import java.util.ArrayList; import java.util.List; import java.util.Random; -import java.util.concurrent.TimeUnit; /** - * Example illustrating iterations in Flink streaming. + * Example illustrating iterations in Flink streaming. p/ p The program sums up random numbers and counts additions + * it performs to reach a specific threshold in an iterative streaming fashion. /p * p/ - * p - * The program sums up random numbers and counts additions it performs to reach - * a specific threshold in an iterative streaming fashion. - * /p * p/ - * p/ - * This example shows how to use: - * ul - * listreaming iterations, - * libuffer timeout to enhance latency, - * lidirected outputs. - * /ul + * This example shows how to use: ul listreaming iterations, libuffer timeout to enhance latency, lidirected + * outputs. 
</ul> */ public class IterateExample { + private static final int BOUND = 100; + // * // PROGRAM // * @@ -71,7 +63,7 @@ public class IterateExample { // create input stream of integer pairs DataStream<Tuple2<Integer, Integer>> inputStream; - if(fileInput) { + if (fileInput) { inputStream = env.readTextFile(inputPath).map(new FibonacciInputMap()); } else { inputStream = env.addSource(new RandomFibonacciSource()); @@ -94,10 +86,7 @@ public class IterateExample { // 'output' channel then get the input pairs that have the greatest iteration counter // on a 1 second sliding window DataStream<Tuple2<Tuple2<Integer, Integer>, Integer>> numbers = step.select("output") - .map(new OutputMap()) - .window(Time.of(1L, TimeUnit.SECONDS)) -
[3/3] flink git commit: [FLINK-1560] [streaming] Streaming examples rework
[FLINK-1560] [streaming] Streaming examples rework Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ed7d1653 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ed7d1653 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ed7d1653 Branch: refs/heads/master Commit: ed7d16534f55ca3ee43b2c5110c71bfa224e7144 Parents: d33b445 Author: szape nemderogator...@gmail.com Authored: Wed Mar 25 09:26:01 2015 +0100 Committer: mbalassi mbala...@apache.org Committed: Tue Apr 7 16:09:37 2015 +0200 -- .../examples/iteration/IterateExample.java | 159 ++- .../streaming/examples/join/WindowJoin.java | 87 -- .../ml/IncrementalLearningSkeleton.java | 113 ++--- .../socket/SocketTextStreamWordCount.java | 28 ++-- .../examples/twitter/TwitterStream.java | 35 ++-- .../windowing/TopSpeedWindowingExample.java | 60 +-- .../examples/wordcount/PojoExample.java | 3 +- .../test/wordcount/WordCountITCase.java | 4 +- 8 files changed, 371 insertions(+), 118 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/ed7d1653/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java -- diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java index bbd5433..d9a8167 100644 --- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java +++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java @@ -17,29 +17,32 @@ package org.apache.flink.streaming.examples.iteration; -import java.util.ArrayList; -import java.util.List; -import java.util.Random; - -import 
org.apache.flink.api.common.functions.RichMapFunction; -import org.apache.flink.api.java.tuple.Tuple1; +import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.configuration.Configuration; +import org.apache.flink.api.java.tuple.Tuple5; import org.apache.flink.streaming.api.collector.selector.OutputSelector; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.IterativeDataStream; import org.apache.flink.streaming.api.datastream.SplitDataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.function.source.SourceFunction; +import org.apache.flink.streaming.api.windowing.helper.Time; +import org.apache.flink.util.Collector; + +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import java.util.concurrent.TimeUnit; /** * Example illustrating iterations in Flink streaming. - * + * p/ * p * The program sums up random numbers and counts additions it performs to reach * a specific threshold in an iterative streaming fashion. 
* /p - * - * p + * p/ + * p/ * This example shows how to use: * ul * listreaming iterations, @@ -59,35 +62,44 @@ public class IterateExample { return; } - // set up input for the stream of (0,0) pairs - ListTuple2Double, Integer input = new ArrayListTuple2Double, Integer(); - for (int i = 0; i 1000; i++) { - input.add(new Tuple2Double, Integer(0., 0)); - } + // set up input for the stream of integer pairs - // obtain execution environment and set setBufferTimeout(0) to enable + // obtain execution environment and set setBufferTimeout to 1 to enable // continuous flushing of the output buffers (lowest latency) StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment() .setBufferTimeout(1); + // create input stream of integer pairs + DataStreamTuple2Integer, Integer inputStream; + if(fileInput) { + inputStream = env.readTextFile(inputPath).map(new FibonacciInputMap()); + } else { + inputStream = env.addSource(new RandomFibonacciSource()); + } + // create an iterative data stream from the input with 5 second timeout - IterativeDataStreamTuple2Double, Integer it =
[1/3] flink git commit: [FLINK-1560] [streaming] Streaming example ITCases cleanup
Repository: flink Updated Branches: refs/heads/master d33b44549 - 954beca7e [FLINK-1560] [streaming] Streaming example ITCases cleanup This closes #519 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/954beca7 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/954beca7 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/954beca7 Branch: refs/heads/master Commit: 954beca7ec9c8ebadfaf11c77c37e493f190554b Parents: 464e782 Author: mbalassi mbala...@apache.org Authored: Mon Apr 6 23:46:06 2015 +0200 Committer: mbalassi mbala...@apache.org Committed: Tue Apr 7 16:09:37 2015 +0200 -- .../apache/flink/streaming/util/TestStreamEnvironment.java | 1 + .../test/socket/SocketTextStreamWordCountITCase.java | 8 +--- .../examples/test/windowing/SessionWindowingITCase.java | 1 - .../test/windowing/TopSpeedWindowingExampleITCase.java | 2 +- .../main/java/org/apache/flink/test/util/TestBaseUtils.java | 2 ++ 5 files changed, 9 insertions(+), 5 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/954beca7/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestStreamEnvironment.java -- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestStreamEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestStreamEnvironment.java index 8ddf9e6..f7843cf 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestStreamEnvironment.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestStreamEnvironment.java @@ -46,6 +46,7 @@ public class TestStreamEnvironment extends StreamExecutionEnvironment { public TestStreamEnvironment(ForkableFlinkMiniCluster executor, int parallelism){ this.executor = executor; + 
setDefaultLocalParallelism(parallelism); setParallelism(parallelism); } http://git-wip-us.apache.org/repos/asf/flink/blob/954beca7/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/socket/SocketTextStreamWordCountITCase.java -- diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/socket/SocketTextStreamWordCountITCase.java b/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/socket/SocketTextStreamWordCountITCase.java index b16a85f..0af8fe2 100644 --- a/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/socket/SocketTextStreamWordCountITCase.java +++ b/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/socket/SocketTextStreamWordCountITCase.java @@ -17,6 +17,7 @@ package org.apache.flink.streaming.examples.test.socket; +import org.apache.flink.runtime.net.NetUtils; import org.apache.flink.streaming.examples.socket.SocketTextStreamWordCount; import org.apache.flink.streaming.util.StreamingProgramTestBase; import org.apache.flink.test.testdata.WordCountData; @@ -29,14 +30,15 @@ import java.net.Socket; public class SocketTextStreamWordCountITCase extends StreamingProgramTestBase { private static final String HOST = localhost; - private static final String PORT = ; + private static Integer port; protected String resultPath; private ServerSocket temporarySocket; @Override protected void preSubmit() throws Exception { - temporarySocket = createSocket(HOST, Integer.valueOf(PORT), WordCountData.TEXT); + port = NetUtils.getAvailablePort(); + temporarySocket = createSocket(HOST, port, WordCountData.TEXT); resultPath = getTempDirPath(result); } @@ -48,7 +50,7 @@ public class SocketTextStreamWordCountITCase extends StreamingProgramTestBase { @Override protected void 
testProgram() throws Exception { - SocketTextStreamWordCount.main(new String[]{HOST, PORT, resultPath}); + SocketTextStreamWordCount.main(new String[]{HOST, port.toString(), resultPath}); } public ServerSocket createSocket(String host, int port, String contents) throws Exception {
[4/8] flink git commit: [FLINK-1837] [streaming] Throw Exception for checkpointed iterative programs
[FLINK-1837] [streaming] Throw Exception for checkpointed iterative programs Checkpointing currently does not support this special case Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/52ebb295 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/52ebb295 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/52ebb295 Branch: refs/heads/release-0.9.0-milestone-1 Commit: 52ebb295c6782e5cc9c7747656c278849ec9030a Parents: 954beca Author: mbalassi mbala...@apache.org Authored: Tue Apr 7 17:04:39 2015 +0200 Committer: mbalassi mbala...@apache.org Committed: Tue Apr 7 17:04:39 2015 +0200 -- .../apache/flink/streaming/api/StreamGraph.java | 7 .../apache/flink/streaming/api/IterateTest.java | 37 2 files changed, 38 insertions(+), 6 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/52ebb295/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java -- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java index 351dec9..aa71804 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java @@ -481,6 +481,11 @@ public class StreamGraph extends StreamingPlan { */ public JobGraph getJobGraph(String jobGraphName) { + // temporarily forbid checkpointing for iterative jobs + if (isIterative() isCheckpointingEnabled()){ + throw new UnsupportedOperationException(Checkpointing is currently not supported for iterative jobs!); + } + this.jobName = jobGraphName; WindowingOptimizer.optimizeGraph(this); @@ -558,6 +563,8 @@ public class StreamGraph extends StreamingPlan { return 
iterationTimeouts.get(vertexID); } + public boolean isIterative() { return !iterationIds.isEmpty(); } + public String getOperatorName(Integer vertexID) { return operatorNames.get(vertexID); } http://git-wip-us.apache.org/repos/asf/flink/blob/52ebb295/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java -- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java index a64c4b1..31bd147 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java @@ -29,11 +29,13 @@ import org.junit.Test; import java.util.Collections; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; public class IterateTest { private static final long MEMORYSIZE = 32; private static boolean iterated[]; + private static int PARALLELISM = 2; public static final class IterationHead extends RichFlatMapFunctionBoolean,Boolean { @@ -73,14 +75,10 @@ public class IterateTest { } } - @Test - public void test() throws Exception { - int parallelism = 2; - StreamExecutionEnvironment env = new TestStreamEnvironment(parallelism, MEMORYSIZE); - iterated = new boolean[parallelism]; + public StreamExecutionEnvironment constructIterativeJob(StreamExecutionEnvironment env){ env.setBufferTimeout(10); - DataStreamBoolean source = env.fromCollection(Collections.nCopies(parallelism, false)); + DataStreamBoolean source = env.fromCollection(Collections.nCopies(PARALLELISM, false)); IterativeDataStreamBoolean iteration = source.iterate(3000); @@ -88,6 +86,15 @@ public class IterateTest { new IterationTail()); iteration.closeWith(increment).addSink(new MySink()); + return env; 
+ } + + @Test + public void test() throws Exception { + StreamExecutionEnvironment env = new TestStreamEnvironment(PARALLELISM, MEMORYSIZE); + iterated = new boolean[PARALLELISM]; + +
[5/8] flink git commit: [maven] [quickfix] Don't exclude Zookeeper in flink-streaming-connectors
[maven] [quickfix] Don't exclude Zookeeper in flink-streaming-connectors Running package locally resulted in the following error: [ERROR] Failed to execute goal ... on project flink-streaming-connectors: Compilation failure [ERROR] .../KafkaTopicUtils.java:[57,25] cannot access org.apache.zookeeper.Watcher [ERROR] class file for org.apache.zookeeper.Watcher not found Mysteriously, this does not happen on Travis. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4363f988 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4363f988 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4363f988 Branch: refs/heads/release-0.9.0-milestone-1 Commit: 4363f9884e83f610c33283140adf2a4efb836372 Parents: 52ebb29 Author: Ufuk Celebi u...@apache.org Authored: Tue Apr 7 17:33:23 2015 +0200 Committer: Ufuk Celebi u...@apache.org Committed: Tue Apr 7 17:56:44 2015 +0200 -- flink-staging/flink-streaming/flink-streaming-connectors/pom.xml | 4 1 file changed, 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/4363f988/flink-staging/flink-streaming/flink-streaming-connectors/pom.xml -- diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/pom.xml b/flink-staging/flink-streaming/flink-streaming-connectors/pom.xml index 8155100..fa03728 100644 --- a/flink-staging/flink-streaming/flink-streaming-connectors/pom.xml +++ b/flink-staging/flink-streaming/flink-streaming-connectors/pom.xml @@ -56,10 +56,6 @@ under the License. <version>${kafka.version}</version> <exclusions> <exclusion> - <groupId>org.apache.zookeeper</groupId> - <artifactId>zookeeper</artifactId> - </exclusion> - <exclusion> <groupId>com.sun.jmx</groupId> <artifactId>jmxri</artifactId> </exclusion>
[1/8] flink git commit: [FLINK-1560] [streaming] Streaming example ITCases cleanup
Repository: flink Updated Branches: refs/heads/release-0.9.0-milestone-1 90dc58b55 - 287b4e9a3 (forced update) [FLINK-1560] [streaming] Streaming example ITCases cleanup This closes #519 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/954beca7 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/954beca7 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/954beca7 Branch: refs/heads/release-0.9.0-milestone-1 Commit: 954beca7ec9c8ebadfaf11c77c37e493f190554b Parents: 464e782 Author: mbalassi mbala...@apache.org Authored: Mon Apr 6 23:46:06 2015 +0200 Committer: mbalassi mbala...@apache.org Committed: Tue Apr 7 16:09:37 2015 +0200 -- .../apache/flink/streaming/util/TestStreamEnvironment.java | 1 + .../test/socket/SocketTextStreamWordCountITCase.java | 8 +--- .../examples/test/windowing/SessionWindowingITCase.java | 1 - .../test/windowing/TopSpeedWindowingExampleITCase.java | 2 +- .../main/java/org/apache/flink/test/util/TestBaseUtils.java | 2 ++ 5 files changed, 9 insertions(+), 5 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/954beca7/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestStreamEnvironment.java -- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestStreamEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestStreamEnvironment.java index 8ddf9e6..f7843cf 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestStreamEnvironment.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestStreamEnvironment.java @@ -46,6 +46,7 @@ public class TestStreamEnvironment extends StreamExecutionEnvironment { public TestStreamEnvironment(ForkableFlinkMiniCluster executor, int parallelism){ 
this.executor = executor; + setDefaultLocalParallelism(parallelism); setParallelism(parallelism); } http://git-wip-us.apache.org/repos/asf/flink/blob/954beca7/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/socket/SocketTextStreamWordCountITCase.java -- diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/socket/SocketTextStreamWordCountITCase.java b/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/socket/SocketTextStreamWordCountITCase.java index b16a85f..0af8fe2 100644 --- a/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/socket/SocketTextStreamWordCountITCase.java +++ b/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/socket/SocketTextStreamWordCountITCase.java @@ -17,6 +17,7 @@ package org.apache.flink.streaming.examples.test.socket; +import org.apache.flink.runtime.net.NetUtils; import org.apache.flink.streaming.examples.socket.SocketTextStreamWordCount; import org.apache.flink.streaming.util.StreamingProgramTestBase; import org.apache.flink.test.testdata.WordCountData; @@ -29,14 +30,15 @@ import java.net.Socket; public class SocketTextStreamWordCountITCase extends StreamingProgramTestBase { private static final String HOST = localhost; - private static final String PORT = ; + private static Integer port; protected String resultPath; private ServerSocket temporarySocket; @Override protected void preSubmit() throws Exception { - temporarySocket = createSocket(HOST, Integer.valueOf(PORT), WordCountData.TEXT); + port = NetUtils.getAvailablePort(); + temporarySocket = createSocket(HOST, port, WordCountData.TEXT); resultPath = getTempDirPath(result); } @@ -48,7 +50,7 @@ public class SocketTextStreamWordCountITCase extends StreamingProgramTestBase { 
@Override protected void testProgram() throws Exception { - SocketTextStreamWordCount.main(new String[]{HOST, PORT, resultPath}); + SocketTextStreamWordCount.main(new String[]{HOST, port.toString(), resultPath}); } public ServerSocket createSocket(String host, int port, String contents) throws Exception {
[7/8] flink git commit: [runtime] Fix TaskManager's BLOB service host lookup when connecting to the JobManager
[runtime] Fix TaskManager's BLOB service host lookup when connecting to the JobManager Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4b89855a Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4b89855a Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4b89855a Branch: refs/heads/release-0.9.0-milestone-1 Commit: 4b89855aaab50ec785a0c5e0e19124f8f9ea9440 Parents: 09bd1f8 Author: Stephan Ewen se...@apache.org Authored: Tue Apr 7 17:40:01 2015 +0200 Committer: Stephan Ewen se...@apache.org Committed: Tue Apr 7 18:13:01 2015 +0200 -- .../flink/runtime/taskmanager/TaskManager.scala | 15 --- .../flink/runtime/taskmanager/TaskManagerTest.java | 2 +- 2 files changed, 13 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/4b89855a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala -- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala index 7d60c00..d6b91ec 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala @@ -608,6 +608,16 @@ extends Actor with ActorLogMessages with ActorLogging { id: InstanceID, blobPort: Int): Unit = { +if (jobManager == null) { + throw new NullPointerException("jobManager may not be null") +} +if (id == null) { + throw new NullPointerException("instance ID may not be null") +} +if (blobPort <= 0 || blobPort > 65535) { + throw new IllegalArgumentException("blob port is out of range: " + blobPort) +} + // sanity check that we are not currently registered with a different JobManager if (isConnected) { if (currentJobManager.get == jobManager) { @@ -644,9 +654,8 @@ extends Actor with ActorLogMessages with ActorLogging { // start a blob service, if a blob server 
is specified if (blobPort > 0) { - val address = new InetSocketAddress( -currentJobManager.flatMap(_.path.address.host).getOrElse("localhost"), -blobPort) + val jmHost = jobManager.path.address.host.getOrElse("localhost") + val address = new InetSocketAddress(jmHost, blobPort) LOG.info("Determined BLOB server address to be {}. Starting BLOB cache.", address) http://git-wip-us.apache.org/repos/asf/flink/blob/4b89855a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java -- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java index e736a55..760b14e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java @@ -566,7 +566,7 @@ public class TaskManagerTest { if (message instanceof RegistrationMessages.RegisterTaskManager) { final InstanceID iid = new InstanceID(); final ActorRef self = getSelf(); - getSender().tell(new RegistrationMessages.AcknowledgeRegistration(self, iid, -1), self); + getSender().tell(new RegistrationMessages.AcknowledgeRegistration(self, iid, 12345), self); } else if(message instanceof TaskMessages.UpdateTaskExecutionState){ getSender().tell(true, getSelf());
[2/8] flink git commit: [FLINK-1560] [streaming] Added ITCases to streaming examples
[FLINK-1560] [streaming] Added ITCases to streaming examples Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/464e7828 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/464e7828 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/464e7828 Branch: refs/heads/release-0.9.0-milestone-1 Commit: 464e782868e1f697809d681ce7f8528bef4f2bdb Parents: ed7d165 Author: szape nemderogator...@gmail.com Authored: Wed Mar 25 09:35:39 2015 +0100 Committer: mbalassi mbala...@apache.org Committed: Tue Apr 7 16:09:37 2015 +0200 -- .../examples/iteration/IterateExample.java | 70 .../iteration/util/IterateExampleData.java | 32 .../examples/join/util/WindowJoinData.java | 66 .../util/IncrementalLearningSkeletonData.java | 34 .../twitter/util/TwitterStreamData.java | 27 +++ .../examples/windowing/SessionWindowing.java| 55 ++- .../windowing/util/SessionWindowingData.java| 27 +++ .../util/TopSpeedWindowingExampleData.java | 165 +++ .../test/iteration/IterateExampleITCase.java| 45 + .../examples/test/join/WindowJoinITCase.java| 48 ++ .../ml/IncrementalLearningSkeletonITCase.java | 42 + .../socket/SocketTextStreamWordCountITCase.java | 94 +++ .../test/twitter/TwitterStreamITCase.java | 42 + .../test/windowing/SessionWindowingITCase.java | 43 + .../TopSpeedWindowingExampleITCase.java | 45 + 15 files changed, 789 insertions(+), 46 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/464e7828/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java -- diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java index d9a8167..f5f2cd7 100644 --- 
a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java +++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java @@ -26,32 +26,24 @@ import org.apache.flink.streaming.api.datastream.IterativeDataStream; import org.apache.flink.streaming.api.datastream.SplitDataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.function.source.SourceFunction; -import org.apache.flink.streaming.api.windowing.helper.Time; import org.apache.flink.util.Collector; import java.util.ArrayList; import java.util.List; import java.util.Random; -import java.util.concurrent.TimeUnit; /** - * Example illustrating iterations in Flink streaming. + * Example illustrating iterations in Flink streaming. p/ p The program sums up random numbers and counts additions + * it performs to reach a specific threshold in an iterative streaming fashion. /p * p/ - * p - * The program sums up random numbers and counts additions it performs to reach - * a specific threshold in an iterative streaming fashion. - * /p * p/ - * p/ - * This example shows how to use: - * ul - * listreaming iterations, - * libuffer timeout to enhance latency, - * lidirected outputs. - * /ul + * This example shows how to use: ul listreaming iterations, libuffer timeout to enhance latency, lidirected + * outputs. 
</ul> */ public class IterateExample { + private static final int BOUND = 100; + // * // PROGRAM // * @@ -71,7 +63,7 @@ public class IterateExample { // create input stream of integer pairs DataStream<Tuple2<Integer, Integer>> inputStream; - if(fileInput) { + if (fileInput) { inputStream = env.readTextFile(inputPath).map(new FibonacciInputMap()); } else { inputStream = env.addSource(new RandomFibonacciSource()); @@ -94,10 +86,7 @@ public class IterateExample { // 'output' channel then get the input pairs that have the greatest iteration counter // on a 1 second sliding window DataStream<Tuple2<Tuple2<Integer, Integer>, Integer>> numbers = step.select("output") - .map(new OutputMap()) - .window(Time.of(1L, TimeUnit.SECONDS)) -
[3/8] flink git commit: [FLINK-1560] [streaming] Streaming examples rework
[FLINK-1560] [streaming] Streaming examples rework Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ed7d1653 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ed7d1653 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ed7d1653 Branch: refs/heads/release-0.9.0-milestone-1 Commit: ed7d16534f55ca3ee43b2c5110c71bfa224e7144 Parents: d33b445 Author: szape nemderogator...@gmail.com Authored: Wed Mar 25 09:26:01 2015 +0100 Committer: mbalassi mbala...@apache.org Committed: Tue Apr 7 16:09:37 2015 +0200 -- .../examples/iteration/IterateExample.java | 159 ++- .../streaming/examples/join/WindowJoin.java | 87 -- .../ml/IncrementalLearningSkeleton.java | 113 ++--- .../socket/SocketTextStreamWordCount.java | 28 ++-- .../examples/twitter/TwitterStream.java | 35 ++-- .../windowing/TopSpeedWindowingExample.java | 60 +-- .../examples/wordcount/PojoExample.java | 3 +- .../test/wordcount/WordCountITCase.java | 4 +- 8 files changed, 371 insertions(+), 118 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/ed7d1653/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java -- diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java index bbd5433..d9a8167 100644 --- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java +++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java @@ -17,29 +17,32 @@ package org.apache.flink.streaming.examples.iteration; -import java.util.ArrayList; -import java.util.List; -import java.util.Random; - 
-import org.apache.flink.api.common.functions.RichMapFunction; -import org.apache.flink.api.java.tuple.Tuple1; +import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.configuration.Configuration; +import org.apache.flink.api.java.tuple.Tuple5; import org.apache.flink.streaming.api.collector.selector.OutputSelector; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.IterativeDataStream; import org.apache.flink.streaming.api.datastream.SplitDataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.function.source.SourceFunction; +import org.apache.flink.streaming.api.windowing.helper.Time; +import org.apache.flink.util.Collector; + +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import java.util.concurrent.TimeUnit; /** * Example illustrating iterations in Flink streaming. - * + * p/ * p * The program sums up random numbers and counts additions it performs to reach * a specific threshold in an iterative streaming fashion. 
* /p - * - * p + * p/ + * p/ * This example shows how to use: * ul * listreaming iterations, @@ -59,35 +62,44 @@ public class IterateExample { return; } - // set up input for the stream of (0,0) pairs - ListTuple2Double, Integer input = new ArrayListTuple2Double, Integer(); - for (int i = 0; i 1000; i++) { - input.add(new Tuple2Double, Integer(0., 0)); - } + // set up input for the stream of integer pairs - // obtain execution environment and set setBufferTimeout(0) to enable + // obtain execution environment and set setBufferTimeout to 1 to enable // continuous flushing of the output buffers (lowest latency) StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment() .setBufferTimeout(1); + // create input stream of integer pairs + DataStreamTuple2Integer, Integer inputStream; + if(fileInput) { + inputStream = env.readTextFile(inputPath).map(new FibonacciInputMap()); + } else { + inputStream = env.addSource(new RandomFibonacciSource()); + } + // create an iterative data stream from the input with 5 second timeout - IterativeDataStreamTuple2Double,
[1/3] flink git commit: Remove 'executable' permission from various non-executable source files
Repository: flink Updated Branches: refs/heads/master 4b89855aa - 07aff921b Remove 'executable' permission from various non-executable source files Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/46925645 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/46925645 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/46925645 Branch: refs/heads/master Commit: 469256458c51ccf9035c4d0b6083c072d9858452 Parents: 4b89855 Author: Stephan Ewen se...@apache.org Authored: Tue Apr 7 19:09:02 2015 +0200 Committer: Stephan Ewen se...@apache.org Committed: Tue Apr 7 19:12:25 2015 +0200 -- docs/css/main/main.css | 0 docs/favicon.ico | Bin docs/favicon.png | Bin docs/img/ClientJmTm.svg| 0 docs/img/job_and_execution_graph.svg | 0 docs/img/logo.png | Bin docs/img/plan_visualizer.png | Bin docs/img/slots.svg | 0 docs/img/state_machine.svg | 0 .../main/resources/web-docs/css/nephelefrontend.css| 0 .../src/main/resources/web-docs/img/GradientBoxes.png | Bin .../src/main/resources/web-docs/img/gradient.jpg | Bin flink-clients/src/main/resources/web-docs/launch.html | 0 .../org/apache/flink/types/NullFieldException.java | 0 .../resources/web-docs-infoserver/css/bootstrap.css| 0 .../web-docs-infoserver/css/bootstrap.min.css | 0 .../resources/web-docs-infoserver/css/sb-admin.css | 0 .../font-awesome/css/font-awesome.css | 0 .../font-awesome/css/font-awesome.min.css | 0 .../font-awesome/fonts/FontAwesome.otf | Bin .../font-awesome/fonts/fontawesome-webfont.eot | Bin .../font-awesome/fonts/fontawesome-webfont.svg | 0 .../font-awesome/fonts/fontawesome-webfont.ttf | Bin .../font-awesome/fonts/fontawesome-webfont.woff| Bin .../font-awesome/less/bordered-pulled.less | 0 .../web-docs-infoserver/font-awesome/less/core.less| 0 .../font-awesome/less/fixed-width.less | 0 .../font-awesome/less/font-awesome.less| 0 .../web-docs-infoserver/font-awesome/less/icons.less | 0 .../web-docs-infoserver/font-awesome/less/larger.less 
| 0 .../web-docs-infoserver/font-awesome/less/list.less| 0 .../web-docs-infoserver/font-awesome/less/mixins.less | 0 .../web-docs-infoserver/font-awesome/less/path.less| 0 .../font-awesome/less/rotated-flipped.less | 0 .../font-awesome/less/spinning.less| 0 .../web-docs-infoserver/font-awesome/less/stacked.less | 0 .../font-awesome/less/variables.less | 0 .../font-awesome/scss/_bordered-pulled.scss| 0 .../web-docs-infoserver/font-awesome/scss/_core.scss | 0 .../font-awesome/scss/_fixed-width.scss| 0 .../web-docs-infoserver/font-awesome/scss/_icons.scss | 0 .../web-docs-infoserver/font-awesome/scss/_larger.scss | 0 .../web-docs-infoserver/font-awesome/scss/_list.scss | 0 .../web-docs-infoserver/font-awesome/scss/_mixins.scss | 0 .../web-docs-infoserver/font-awesome/scss/_path.scss | 0 .../font-awesome/scss/_rotated-flipped.scss| 0 .../font-awesome/scss/_spinning.scss | 0 .../font-awesome/scss/_stacked.scss| 0 .../font-awesome/scss/_variables.scss | 0 .../font-awesome/scss/font-awesome.scss| 0 .../main/resources/web-docs-infoserver/js/bootstrap.js | 0 flink-staging/flink-gelly/pom.xml | 0 .../test/example/LabelPropagationExampleITCase.java| 0 .../apache/flink/addons/hbase/TableInputFormat.java| 0 .../flink/addons/hbase/example/HBaseReadExample.java | 0 .../flink-hbase/src/test/resources/log4j.properties| 0 .../flink/streaming/connectors/rabbitmq/RMQSource.java | 0 .../streaming/connectors/rabbitmq/RMQTopology.java | 0 .../streaming/api/datastream/ConnectedDataStream.java | 0 .../flink/streaming/api/datastream/DataStreamSink.java | 0 .../streaming/api/datastream/DataStreamSource.java | 0 .../streaming/api/datastream/GroupedDataStream.java| 0 .../api/datastream/SingleOutputStreamOperator.java | 0 .../streaming/api/datastream/SplitDataStream.java | 0 .../api/environment/LocalStreamEnvironment.java| 0 .../streaming/api/function/co/RichCoMapFunction.java | 0 .../streaming/api/function/sink/PrintSinkFunction.java | 0 .../streaming/api/function/sink/RichSinkFunction.java | 0
[3/3] flink git commit: [licences] Updated LICENSE and NOTICE file of the binary distribution to contain dependencies of 'flink-dist'.
[licences] Updated LICENSE and NOTICE file of the binary distribution to contain dependencies of 'flink-dist'. This means that the LICENSE and NOTICE files reflect all dependencies that we include in the binary distribution, but not those of artifacts pulled only through maven. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/07aff921 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/07aff921 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/07aff921 Branch: refs/heads/master Commit: 07aff921b3254c17997497ed19fd1771b231b4bf Parents: 71ca958 Author: Stephan Ewen se...@apache.org Authored: Tue Apr 7 21:08:35 2015 +0200 Committer: Stephan Ewen se...@apache.org Committed: Tue Apr 7 23:00:41 2015 +0200 -- flink-dist/src/main/flink-bin/LICENSE | 71 +--- flink-dist/src/main/flink-bin/NOTICE | 267 - 2 files changed, 38 insertions(+), 300 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/07aff921/flink-dist/src/main/flink-bin/LICENSE -- diff --git a/flink-dist/src/main/flink-bin/LICENSE b/flink-dist/src/main/flink-bin/LICENSE index 8c733e4..d348bbc 100644 --- a/flink-dist/src/main/flink-bin/LICENSE +++ b/flink-dist/src/main/flink-bin/LICENSE @@ -224,33 +224,27 @@ under the Apache License (v 2.0): - Apache Commons CLI (http://commons.apache.org/cli/) - Apache Commons FileUpload (http://commons.apache.org/fileupload/) - Apache Commons IO (http://commons.apache.org/io/) - - Apache Commons Lang v.3.3.2 (http://commons.apache.org/proper/commons-lang/) + - Apache Commons Lang 3 v.3.3.2 (http://commons.apache.org/proper/commons-lang/) - Apache Commons Math (http://commons.apache.org/proper/commons-math/) - - Apache Avro (http://avro.apache.org) + - Apache log4j 1.2 (http://logging.apache.org/log4j/1.2/) + - Apache Avro v1.7.6 (http://avro.apache.org) - Apache Hadoop (http://hadoop.apache.org) - - Apache HBase Client (http://hbase.apache.org) - - Apache Derby 
(http://db.apache.org/derby/) - - Apache Kafka (http://kafka.apache.org) - - Apache Flume (http://flume.apache.org) - Apache Sling (http://sling.apache.org) - Apache Thrift (http://thrift.apache.org) - Google Guava (https://code.google.com/p/guava-libraries/) - Google Protocol Buffers (https://github.com/google/protobuf/) - Netty v4.0.21 (http://netty.io) - - Powermock (http://www.powermock.org) - Javassist (http://www.javassist.org) - Kryo-Serializers (https://github.com/magro/kryo-serializers/) - Joda-Time (http://www.joda.org/joda-time/) - Chill_2.10 v0.5.1 (https://github.com/twitter/chill) - - Jetty Web Container (http://www.eclipse.org/jetty/) + - Jetty Server, Jetty Security, Jetty Servlet (http://www.eclipse.org/jetty/) - Amazon Web Services SDK for Java (http://aws.amazon.com/sdkforjava/) - - ScalaTest (http://www.scalatest.org) - StartBootstrap (http://startbootstrap.com) - CHAP Links Library Timeline (http://almende.github.io/chap-links-library/) - - Twitter Hosebird Client (hbc) (https://github.com/twitter/hbc) - Jettison (http://jettison.codehaus.org) - Akka (http://akka.io) - - Breeze (https://github.com/scalanlp/breeze) + - Metrics (https://github.com/dropwizard/metrics) --- @@ -272,6 +266,7 @@ The Apache Flink project bundles the following components under the MIT License: - Font Awesome - Code (http://fortawesome.github.io/Font-Awesome/) - Copyright (c) 2014 Dave Gandy - D3 dagre renderer (https://github.com/cpettitt/dagre-d3) - Copyright (c) 2012-2013 Chris Pettitt - spymemcached (http://code.google.com/p/spymemcached/) - Copyright (c) 2006-2009 Dustin Sallings, Copyright (c) 2009-2011 Couchbase, Inc. + - scopt (https://github.com/scopt/scopt) - Copyright (c) scopt contributors All rights reserved. 
@@ -303,12 +298,8 @@ The Apache Flink project bundles the following components under BSD-style licenses: [3-clause BSD license] - - core (com.github.fommil.netlib:core:1.1.2 - https://github.com/fommil/netlib-java/core) - Kryo (https://github.com/EsotericSoftware/kryo) - Copyright (c) 2008, Nathan Sweet - D3 (http://d3js.org/) - Copyright (c) 2010-2014, Michael Bostock - - LevelDB JNI (https://github.com/fusesource/leveldbjni/) - Copyright (c) 2011, FuseSource Corp. - - Memcached (https://github.com/memcached/memcached) - Copyright (c) 2003, Danga Interactive, Inc. - - Redis (http://redis.io/) - Copyright (c) 2009, Salvatore Sanfilippo and Pieter Noordhuis [BSD-like License] - Scala Library (http://www.scala-lang.org/) - Copyright (c) 2002-2014 EPFL, Copyright (c) 2011-2014 Typesafe, Inc. @@ -349,21 +340,6 @@ POSSIBILITY OF SUCH DAMAGE.
[2/3] flink git commit: [contrib] Remove redundant LICENSE file in 'flink-contrib/docker-flink'
[contrib] Remove redundant LICENSE file in 'flink-contrib/docker-flink' Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/71ca958a Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/71ca958a Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/71ca958a Branch: refs/heads/master Commit: 71ca958a2dbb94f1f309603130ac403751e55e63 Parents: 4692564 Author: Stephan Ewen se...@apache.org Authored: Tue Apr 7 19:44:51 2015 +0200 Committer: Stephan Ewen se...@apache.org Committed: Tue Apr 7 19:44:51 2015 +0200 -- flink-contrib/docker-flink/LICENSE | 202 1 file changed, 202 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/71ca958a/flink-contrib/docker-flink/LICENSE -- diff --git a/flink-contrib/docker-flink/LICENSE b/flink-contrib/docker-flink/LICENSE deleted file mode 100644 index e06d208..000 --- a/flink-contrib/docker-flink/LICENSE +++ /dev/null @@ -1,202 +0,0 @@ -Apache License - Version 2.0, January 2004 -http://www.apache.org/licenses/ - - TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION - - 1. Definitions. - - License shall mean the terms and conditions for use, reproduction, - and distribution as defined by Sections 1 through 9 of this document. - - Licensor shall mean the copyright owner or entity authorized by - the copyright owner that is granting the License. - - Legal Entity shall mean the union of the acting entity and all - other entities that control, are controlled by, or are under common - control with that entity. For the purposes of this definition, - control means (i) the power, direct or indirect, to cause the - direction or management of such entity, whether by contract or - otherwise, or (ii) ownership of fifty percent (50%) or more of the - outstanding shares, or (iii) beneficial ownership of such entity. - - You (or Your) shall mean an individual or Legal Entity - exercising permissions granted by this License. 
- - Source form shall mean the preferred form for making modifications, - including but not limited to software source code, documentation - source, and configuration files. - - Object form shall mean any form resulting from mechanical - transformation or translation of a Source form, including but - not limited to compiled object code, generated documentation, - and conversions to other media types. - - Work shall mean the work of authorship, whether in Source or - Object form, made available under the License, as indicated by a - copyright notice that is included in or attached to the work - (an example is provided in the Appendix below). - - Derivative Works shall mean any work, whether in Source or Object - form, that is based on (or derived from) the Work and for which the - editorial revisions, annotations, elaborations, or other modifications - represent, as a whole, an original work of authorship. For the purposes - of this License, Derivative Works shall not include works that remain - separable from, or merely link (or bind by name) to the interfaces of, - the Work and Derivative Works thereof. - - Contribution shall mean any work of authorship, including - the original version of the Work and any modifications or additions - to that Work or Derivative Works thereof, that is intentionally - submitted to Licensor for inclusion in the Work by the copyright owner - or by an individual or Legal Entity authorized to submit on behalf of - the copyright owner. 
For the purposes of this definition, submitted - means any form of electronic, verbal, or written communication sent - to the Licensor or its representatives, including but not limited to - communication on electronic mailing lists, source code control systems, - and issue tracking systems that are managed by, or on behalf of, the - Licensor for the purpose of discussing and improving the Work, but - excluding communication that is conspicuously marked or otherwise - designated in writing by the copyright owner as Not a Contribution. - - Contributor shall mean Licensor and any individual or Legal Entity - on behalf of whom a Contribution has been received by Licensor and - subsequently incorporated within the Work. - - 2. Grant of Copyright License. Subject to the terms and conditions of - this License, each Contributor hereby grants to You a perpetual, - worldwide, non-exclusive, no-charge, royalty-free, irrevocable - copyright license to reproduce, prepare Derivative Works
buildbot success in ASF Buildbot on flink-docs-master
The Buildbot has detected a restored build on builder flink-docs-master while building ASF Buildbot.
Full details are available at:
 http://ci.apache.org/builders/flink-docs-master/builds/63

Buildbot URL: http://ci.apache.org/

Buildslave for this Build: orcus_ubuntu

Build Reason: The Nightly scheduler named 'flink-nightly-docs-master' triggered this build
Build Source Stamp: [branch master] HEAD
Blamelist:

Build succeeded!

Sincerely,
 -The Buildbot