flink git commit: Fix typo in exception message from assiged to assigned in ExecutionJobVertex.

2015-02-24 Thread hsaputra
Repository: flink
Updated Branches:
  refs/heads/master bfbbbf906 -> 64c302f8f


Fix typo in exception message from assiged to assigned in ExecutionJobVertex.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/64c302f8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/64c302f8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/64c302f8

Branch: refs/heads/master
Commit: 64c302f8f4d211a2cd10764e3090b4f9bdde436c
Parents: bfbbbf9
Author: Henry Saputra 
Authored: Tue Feb 24 15:59:35 2015 -0800
Committer: Henry Saputra 
Committed: Tue Feb 24 15:59:35 2015 -0800

--
 .../apache/flink/runtime/executiongraph/ExecutionJobVertex.java| 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/64c302f8/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
--
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
index 9df9bd5..0444e5d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
@@ -132,7 +132,7 @@ public class ExecutionJobVertex implements Serializable {
 		// sanity check for the double referencing between intermediate result partitions and execution vertices
 		for (IntermediateResult ir : this.producedDataSets) {
 			if (ir.getNumberOfAssignedPartitions() != parallelism) {
-				throw new RuntimeException("The intermediate result's partitions were not correctly assiged.");
+				throw new RuntimeException("The intermediate result's partitions were not correctly assigned.");
 			}
 		}




flink git commit: fixed package statement

2015-02-24 Thread hsaputra
Repository: flink
Updated Branches:
  refs/heads/master ed8b26bf2 -> bfbbbf906


fixed package statement

The package statement was not updated after the class was moved.
Also added a missing import.

Author: mjsax 

Closes #437 from mjsax/bug_fix_package_statement and squashes the following commits:

1470b0f [mjsax] fixed package statement


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bfbbbf90
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bfbbbf90
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bfbbbf90

Branch: refs/heads/master
Commit: bfbbbf90627945e249f05c610b7ba8790c069bee
Parents: ed8b26b
Author: mjsax 
Authored: Tue Feb 24 14:27:22 2015 -0800
Committer: Henry Saputra 
Committed: Tue Feb 24 14:27:22 2015 -0800

--
 .../typeutils/runtime/kryo/KryoWithCustomSerializersTest.java | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/bfbbbf90/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoWithCustomSerializersTest.java
--
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoWithCustomSerializersTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoWithCustomSerializersTest.java
index 155010e..d68afd6 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoWithCustomSerializersTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoWithCustomSerializersTest.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.api.java.typeutils.runtime;
+package org.apache.flink.api.java.typeutils.runtime.kryo;
 
 import java.util.Collection;
 import java.util.HashSet;
@@ -25,6 +25,7 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.api.java.typeutils.runtime.AbstractGenericTypeSerializerTest;
 import org.joda.time.LocalDate;
 import org.junit.Test;
 



[1/2] flink git commit: [FLINK-1580] [FLINK-1590] [runtime] Various cleanups and improvements in the TaskManager initialization

2015-02-24 Thread sewen
Repository: flink
Updated Branches:
  refs/heads/master 4883af675 -> ed8b26bf2


http://git-wip-us.apache.org/repos/asf/flink/blob/ed8b26bf/flink-yarn/src/main/java/org/apache/flink/yarn/appMaster/YarnTaskManagerRunner.java
--
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/appMaster/YarnTaskManagerRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/appMaster/YarnTaskManagerRunner.java
index aea9727..bed8f19 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/appMaster/YarnTaskManagerRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/appMaster/YarnTaskManagerRunner.java
@@ -20,12 +20,13 @@ package org.apache.flink.yarn.appMaster;
 
 import java.io.IOException;
 import java.security.PrivilegedAction;
-import java.util.Arrays;
 import java.util.Map;
 
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import org.apache.flink.yarn.YarnUtils;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.taskmanager.TaskManager;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.yarn.YarnTaskManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.flink.yarn.FlinkYarnClient;
@@ -33,36 +34,64 @@ import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
-import scala.Tuple2;
 
+/**
+ * The entry point for running a TaskManager in a YARN container. The YARN container will invoke
+ * this class' main method.
+ */
 public class YarnTaskManagerRunner {
 
	private static final Logger LOG = LoggerFactory.getLogger(YarnTaskManagerRunner.class);
 
+
	public static void main(final String[] args) throws IOException {
-		Map<String, String> envs = System.getenv();
+
+		EnvironmentInformation.logEnvironmentInfo(LOG, "YARN TaskManager");
+		EnvironmentInformation.checkJavaVersion();
+
+		// try to parse the command line arguments
+		final Configuration configuration;
+		try {
+			configuration = TaskManager.parseArgsAndLoadConfig(args);
+		}
+		catch (Throwable t) {
+			LOG.error(t.getMessage(), t);
+			System.exit(TaskManager.STARTUP_FAILURE_RETURN_CODE());
+			return;
+		}
+
+		// read the environment variables for YARN
+		final Map<String, String> envs = System.getenv();
 		final String yarnClientUsername = envs.get(FlinkYarnClient.ENV_CLIENT_USERNAME);
 		final String localDirs = envs.get(Environment.LOCAL_DIRS.key());
 
 		// configure local directory
-		final String[] newArgs = Arrays.copyOf(args, args.length + 2);
-		newArgs[newArgs.length-2] = "--tempDir";
-		newArgs[newArgs.length-1] = localDirs;
-		LOG.info("Setting log path "+localDirs);
-		LOG.info("YARN daemon runs as '"+UserGroupInformation.getCurrentUser().getShortUserName()+"' setting"
-			+ " user to execute Flink TaskManager to '"+yarnClientUsername+"'");
+		String flinkTempDirs = configuration.getString(ConfigConstants.TASK_MANAGER_TMP_DIR_KEY, null);
+		if (flinkTempDirs == null) {
+			LOG.info("Setting directories for temporary file " + localDirs);
+			configuration.setString(ConfigConstants.TASK_MANAGER_TMP_DIR_KEY, localDirs);
+		}
+		else {
+			LOG.info("Overriding YARN's temporary file directories with those " +
+				"specified in the Flink config: " + flinkTempDirs);
+		}
+
+		LOG.info("YARN daemon runs as '" + UserGroupInformation.getCurrentUser().getShortUserName()
+			+"' setting user to execute Flink TaskManager to '"+yarnClientUsername+"'");
+
 		UserGroupInformation ugi = UserGroupInformation.createRemoteUser(yarnClientUsername);
-		for(Token<? extends TokenIdentifier> toks : UserGroupInformation.getCurrentUser().getTokens()) {
+		for (Token<? extends TokenIdentifier> toks : UserGroupInformation.getCurrentUser().getTokens()) {
 			ugi.addToken(toks);
 		}
 		ugi.doAs(new PrivilegedAction<Object>() {
 			@Override
 			public Object run() {
 				try {
-					Tuple2<ActorSystem, ActorRef> tuple = YarnUtils.startActorSystemAndTaskManager(newArgs);
-					tuple._1().awaitTermination();
-				} catch (Exception

[2/2] flink git commit: [FLINK-1580] [FLINK-1590] [runtime] Various cleanups and improvements in the TaskManager initialization

2015-02-24 Thread sewen
[FLINK-1580] [FLINK-1590] [runtime] Various cleanups and improvements in the 
TaskManager initialization

 - Better checks during TaskManager startup
 - More robust initialization of TaskManager actor system and actor
 - Fix memory accounting during TaskManager startup
 - Better logging for TaskManagers started through YARN
 - Remove command line parameter hacking for YARN TaskManagers


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ed8b26bf
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ed8b26bf
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ed8b26bf

Branch: refs/heads/master
Commit: ed8b26bf2e8dd7c187c24ad0d8ff3e67f6a7478c
Parents: 4883af6
Author: Stephan Ewen 
Authored: Tue Feb 24 20:23:47 2015 +0100
Committer: Stephan Ewen 
Committed: Tue Feb 24 20:30:29 2015 +0100

--
 .../org/apache/flink/runtime/net/NetUtils.java  |   2 +-
 .../runtime/util/EnvironmentInformation.java|  98 ++-
 .../apache/flink/runtime/util/MathUtils.java|  10 +
 .../flink/runtime/jobmanager/JobManager.scala   |  50 +-
 .../runtime/minicluster/FlinkMiniCluster.scala  |  57 +-
 .../minicluster/LocalFlinkMiniCluster.scala |  56 +-
 .../flink/runtime/taskmanager/TaskManager.scala | 644 +--
 .../TaskManagerCLIConfiguration.scala   |   3 +-
 .../taskmanager/TaskManagerConfiguration.scala  |   7 +-
 .../taskmanager/TaskManagerProfiler.scala   |   4 +-
 .../TaskManagerProcessReapingTest.java  |   6 +-
 .../runtime/taskmanager/TaskManagerTest.java|   5 +-
 .../apache/flink/runtime/util/MathUtilTest.java |  34 +-
 .../TaskManagerRegistrationITCase.scala |  10 +-
 .../runtime/testingUtils/TestingCluster.scala   |  18 +-
 .../testingUtils/TestingTaskManager.scala   |  25 +-
 .../runtime/testingUtils/TestingUtils.scala |  38 +-
 .../test/util/ForkableFlinkMiniCluster.scala|  32 +-
 .../yarn/appMaster/YarnTaskManagerRunner.java   |  63 +-
 .../org/apache/flink/yarn/YarnTaskManager.scala |  18 +-
 .../scala/org/apache/flink/yarn/YarnUtils.scala |  45 --
 21 files changed, 723 insertions(+), 502 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/ed8b26bf/flink-runtime/src/main/java/org/apache/flink/runtime/net/NetUtils.java
--
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/net/NetUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/net/NetUtils.java
index 73504e9..8e0a41a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/net/NetUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/net/NetUtils.java
@@ -107,7 +107,7 @@ public class NetUtils {
break;

default:
-					throw new RuntimeException("Unkown address detection strategy: " + strategy);
+					throw new RuntimeException("Unknown address detection strategy: " + strategy);
}
}
}

http://git-wip-us.apache.org/repos/asf/flink/blob/ed8b26bf/flink-runtime/src/main/java/org/apache/flink/runtime/util/EnvironmentInformation.java
--
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/EnvironmentInformation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/EnvironmentInformation.java
index 535c756..d2147e4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/EnvironmentInformation.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/EnvironmentInformation.java
@@ -36,12 +36,6 @@ public class EnvironmentInformation {
 	private static final Logger LOG = LoggerFactory.getLogger(EnvironmentInformation.class);
 
 	private static final String UNKNOWN = "<unknown>";
-
-	private static final String[] IGNORED_STARTUP_OPTIONS = {
-			"-Dlog.file",
-			"-Dlogback.configurationFile",
-			"-Dlog4j.configuration"
-		};
 
/**
 	 * Returns the version of the code as String. If version == null, then the JobManager does not run from a
@@ -55,7 +49,7 @@ public class Envir

flink git commit: [FLINK-1596] [runtime] Remove space in temp filename

2015-02-24 Thread sewen
Repository: flink
Updated Branches:
  refs/heads/release-0.8 37cddde20 -> 941712941


[FLINK-1596] [runtime] Remove space in temp filename


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/94171294
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/94171294
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/94171294

Branch: refs/heads/release-0.8
Commit: 9417129413b804c6d758bf74f230f0109d3ecdcf
Parents: 37cddde
Author: Johannes 
Authored: Sat Feb 21 21:32:34 2015 +0100
Committer: Stephan Ewen 
Committed: Tue Feb 24 20:29:43 2015 +0100

--
 .../org/apache/flink/runtime/io/disk/iomanager/FileIOChannel.java  | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/94171294/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/FileIOChannel.java
--
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/FileIOChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/FileIOChannel.java
index d6c1ce0..c29e7d8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/FileIOChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/FileIOChannel.java
@@ -157,7 +157,7 @@ public interface FileIOChannel {
 
public ID next() {
int threadNum = counter % paths.length;
-			String filename = String.format(" %s.%06d.channel", namePrefix, (counter++));
+			String filename = String.format("%s.%06d.channel", namePrefix, (counter++));
 			return new ID(new File(paths[threadNum], filename), threadNum);
}
}
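
(For illustration: with the leading space removed, a channel file created with a hypothetical namePrefix "ab12cd" and counter 7 is now named "ab12cd.000007.channel" instead of " ab12cd.000007.channel". The stray space at the start of the file name was the bug; %06d zero-pads the counter to six digits.)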



flink git commit: Small cleanup to truncate some lines that are too long for easy reading of the code.

2015-02-24 Thread hsaputra
Repository: flink
Updated Branches:
  refs/heads/master 1230bcaa0 -> 4883af675


Small cleanup to truncate some lines that are too long for easy reading of the code.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4883af67
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4883af67
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4883af67

Branch: refs/heads/master
Commit: 4883af675e19d8a9c750a83b3f2c019583e6bf7f
Parents: 1230bca
Author: Henry Saputra 
Authored: Tue Feb 24 10:57:33 2015 -0800
Committer: Henry Saputra 
Committed: Tue Feb 24 10:58:09 2015 -0800

--
 .../network/partition/IntermediateResultPartition.java  | 12 
 .../partition/queue/PipelinedPartitionQueue.java|  3 ++-
 2 files changed, 10 insertions(+), 5 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/4883af67/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/IntermediateResultPartition.java
--
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/IntermediateResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/IntermediateResultPartition.java
index 71af7a6..80bd38d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/IntermediateResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/IntermediateResultPartition.java
@@ -213,7 +213,8 @@ public class IntermediateResultPartition implements BufferPoolOwner {
 	// Consume
 	// ------------------------------------------------------------------------
 
-	public IntermediateResultPartitionQueueIterator getQueueIterator(int queueIndex, Optional<BufferProvider> bufferProvider) throws IOException {
+	public IntermediateResultPartitionQueueIterator getQueueIterator(int queueIndex, Optional<BufferProvider> bufferProvider)
+			throws IOException {
 		synchronized (queues) {
 			if (isReleased) {
 				throw new IllegalQueueIteratorRequestException("Intermediate result partition has already been released.");
@@ -231,7 +232,8 @@ public class IntermediateResultPartition implements BufferPoolOwner {
 
 	@Override
 	public String toString() {
-		return "Intermediate result partition " + partitionId + " [num queues: " + queues.length + ", " + (isFinished ? "finished" : "not finished") + "]";
+		return "Intermediate result partition " + partitionId + " [num queues: " + queues.length + ", "
+				+ (isFinished ? "finished" : "not finished") + "]";
 	}
 
 	private void checkInProducePhase() {
@@ -296,7 +298,8 @@ public class IntermediateResultPartition implements BufferPoolOwner {
 
 	// ------------------------------------------------------------------------
 
-	public static IntermediateResultPartition create(RuntimeEnvironment environment, int partitionIndex, JobID jobId, ExecutionAttemptID executionId, NetworkEnvironment networkEnvironment, PartitionDeploymentDescriptor desc) {
+	public static IntermediateResultPartition create(RuntimeEnvironment environment, int partitionIndex, JobID jobId,
+			ExecutionAttemptID executionId, NetworkEnvironment networkEnvironment, PartitionDeploymentDescriptor desc) {
 		final IntermediateResultPartitionID partitionId = checkNotNull(desc.getPartitionId());
 		final IntermediateResultPartitionType partitionType = checkNotNull(desc.getPartitionType());
 
@@ -307,6 +310,7 @@ public class IntermediateResultPartition implements BufferPoolOwner {
 			partitionQueues[i] = new PipelinedPartitionQueue();
 		}
 
-		return new IntermediateResultPartition(environment, partitionIndex, jobId, executionId, partitionId, partitionType, partitionQueues, networkEnvironment);
+		return new IntermediateResultPartition(environment, partitionIndex, jobId, executionId, partitionId, partitionType,
+				partitionQueues, networkEnvironment);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/4883af67/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/queue/PipelinedPartitionQueue.java
--
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/queue/PipelinedPartitionQueue.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/queue/PipelinedPartitionQueue.java
index a24cdeb..5d562e4 100644
--- a/flink-runtime/src/main/java/

[2/6] flink git commit: [FLINK-1596] [runtime] Remove space in temp filename

2015-02-24 Thread sewen
[FLINK-1596] [runtime] Remove space in temp filename

This closes #431


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/98bc7b95
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/98bc7b95
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/98bc7b95

Branch: refs/heads/master
Commit: 98bc7b951b30961871958a4483e0b186bfb785b8
Parents: c111444
Author: Johannes 
Authored: Sat Feb 21 21:32:34 2015 +0100
Committer: Stephan Ewen 
Committed: Tue Feb 24 00:08:27 2015 +0100

--
 .../org/apache/flink/runtime/io/disk/iomanager/FileIOChannel.java  | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/98bc7b95/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/FileIOChannel.java
--
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/FileIOChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/FileIOChannel.java
index e00568e..c5a3daa 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/FileIOChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/FileIOChannel.java
@@ -164,7 +164,7 @@ public interface FileIOChannel {
 
public ID next() {
int threadNum = counter % paths.length;
-			String filename = String.format(" %s.%06d.channel", namePrefix, (counter++));
+			String filename = String.format("%s.%06d.channel", namePrefix, (counter++));
 			return new ID(new File(paths[threadNum], filename), threadNum);
}
}



[6/6] flink git commit: [tests] Add process reaping test for TaskManager, improve process reaping test for JobManager.

2015-02-24 Thread sewen
[tests] Add process reaping test for TaskManager, improve process reaping test for JobManager.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1230bcaa
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1230bcaa
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1230bcaa

Branch: refs/heads/master
Commit: 1230bcaa0f531f6260f291dd066f64fe52cc6708
Parents: 70df028
Author: Stephan Ewen 
Authored: Tue Feb 24 12:23:59 2015 +0100
Committer: Stephan Ewen 
Committed: Tue Feb 24 12:23:59 2015 +0100

--
 .../flink/runtime/jobmanager/JobManager.scala   |  94 
 .../flink/runtime/taskmanager/TaskManager.scala |  68 +-
 .../JobManagerProcessReapingTest.java   |  83 ++-
 .../TaskManagerProcessReapingTest.java  | 236 +++
 .../runtime/testutils/CommonTestUtils.java  |  24 ++
 5 files changed, 447 insertions(+), 58 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/1230bcaa/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
--
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index a1642b4..2671f2d 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -94,17 +94,18 @@ class JobManager(val configuration: Configuration,
  val profiler: Option[ActorRef],
  val defaultExecutionRetries: Int,
  val delayBetweenRetries: Long,
- implicit val timeout: FiniteDuration)
+ val timeout: FiniteDuration)
   extends Actor with ActorLogMessages with ActorLogging {
 
-  import context._
-
-  val LOG = JobManager.LOG
-
-  // List of current jobs running
-  val currentJobs = scala.collection.mutable.HashMap[JobID, (ExecutionGraph, JobInfo)]()
+  /** Reference to the log, for debugging */
+  protected val LOG = JobManager.LOG
 
+  /** List of currently running jobs */
+  protected val currentJobs = scala.collection.mutable.HashMap[JobID, (ExecutionGraph, JobInfo)]()
 
+  /**
+   * Run when the job manager is started. Simply logs an informational message.
+   */
   override def preStart(): Unit = {
 LOG.info(s"Starting JobManager at ${self.path}.")
   }
@@ -138,6 +139,11 @@ class JobManager(val configuration: Configuration,
 }
   }
 
+  /**
+   * Central work method of the JobManager actor. Receives messages and reacts to them.
+   *
+   * @return
+   */
   override def receiveWithLogMessages: Receive = {
 
    case RegisterTaskManager(connectionInfo, hardwareInformation, numberOfSlots) =>
@@ -182,7 +188,7 @@ class JobManager(val configuration: Configuration,
   // execute the cancellation asynchronously
   Future {
 executionGraph.cancel()
-  }
+  }(context.dispatcher)
 
   sender ! CancellationSuccess(jobID)
 case None =>
@@ -198,10 +204,12 @@ class JobManager(val configuration: Configuration,
 currentJobs.get(taskExecutionState.getJobID) match {
   case Some((executionGraph, _)) =>
 val originalSender = sender
+
 Future {
   val result = executionGraph.updateState(taskExecutionState)
   originalSender ! result
-}
+}(context.dispatcher)
+
 sender ! true
      case None => log.error("Cannot find execution graph for ID {} to change state to {}.",
        taskExecutionState.getJobID, taskExecutionState.getExecutionState)
@@ -603,6 +611,7 @@ object JobManager {
 EnvironmentInformation.logEnvironmentInfo(LOG, "JobManager")
 checkJavaVersion()
 
+// parsing the command line arguments
 val (configuration: Configuration,
  executionMode: ExecutionMode,
  listeningHost: String, listeningPort: Int) =
@@ -617,16 +626,17 @@ object JobManager {
   }
 }
 
-// we may want to check that the JobManager hostname is in the config
+// we want to check that the JobManager hostname is in the config
    // if it is not in there, the actor system will bind to the loopback interface's
 // address and will not be reachable from anyone remote
 if (listeningHost == null) {
      val message = "Config parameter '" + ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY +
-        "' is missing (hostname or address to bind JobManager to)."
+        "' is missing (hostname/address to bind JobManager to)."
   LOG.error(message)
   System.exit(STARTUP_FAILURE_RETURN_CODE)
 }
 
+// run the job manager
 tr

[1/6] flink git commit: [runtime] Improve error handling when submitting a job to the JobManager

2015-02-24 Thread sewen
Repository: flink
Updated Branches:
  refs/heads/master c11144470 -> 1230bcaa0


[runtime] Improve error handling when submitting a job to the JobManager


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9ddb5653
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9ddb5653
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9ddb5653

Branch: refs/heads/master
Commit: 9ddb565314a4efb744167eccbe6de0d3299ac4d0
Parents: 9528a52
Author: Stephan Ewen 
Authored: Mon Feb 23 18:36:07 2015 +0100
Committer: Stephan Ewen 
Committed: Tue Feb 24 00:08:27 2015 +0100

--
 .../org/apache/flink/client/program/Client.java |  15 +-
 .../org/apache/flink/util/ExceptionUtils.java   |  26 ++-
 .../apache/flink/runtime/blob/BlobCache.java|   6 +-
 .../runtime/client/JobExecutionException.java   |   5 +-
 .../librarycache/BlobLibraryCacheManager.java   |  62 --
 .../flink/runtime/executiongraph/Execution.java |   2 +-
 .../apache/flink/runtime/jobgraph/JobGraph.java |  16 ++
 .../apache/flink/runtime/client/JobClient.scala |   4 +-
 .../flink/runtime/jobmanager/JobInfo.scala  |   1 -
 .../flink/runtime/jobmanager/JobManager.scala   | 207 ++-
 .../runtime/messages/JobManagerMessages.scala   |  27 +--
 .../jobmanager/JobManagerStartupTest.java   |   1 -
 .../flink/runtime/jobmanager/JobSubmitTest.java | 157 ++
 .../runtime/jobmanager/JobManagerITCase.scala   |  18 +-
 14 files changed, 392 insertions(+), 155 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/9ddb5653/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
--
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
index bd364ac..5a032a0 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
@@ -329,8 +329,9 @@ public class Client {
 
 		try {
 			JobClient.uploadJarFiles(jobGraph, hostname, client, timeout);
-		} catch (IOException e) {
-			throw new ProgramInvocationException("Could not upload the programs JAR files to the JobManager.", e);
+		}
+		catch (IOException e) {
+			throw new ProgramInvocationException("Could not upload the program's JAR files to the JobManager.", e);
 		}
 
 		try{
@@ -340,10 +341,12 @@ public class Client {
 			else {
 				JobClient.submitJobDetached(jobGraph, client, timeout);
 			}
-		} catch (JobExecutionException e) {
-			throw new ProgramInvocationException("The program execution failed.", e);
-		} catch (Exception e) {
-			throw new ProgramInvocationException("Unexpected exception while program execution.", e);
+		}
+		catch (JobExecutionException e) {
+			throw new ProgramInvocationException("The program execution failed: " + e.getMessage(), e);
+		}
+		catch (Exception e) {
+			throw new ProgramInvocationException("Exception during program execution.", e);
 		}
 		finally {
 			actorSystem.shutdown();

http://git-wip-us.apache.org/repos/asf/flink/blob/9ddb5653/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
--
diff --git a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
index 99a098e..9784844 100644
--- a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
@@ -24,6 +24,7 @@
 
 package org.apache.flink.util;
 
+import java.io.IOException;
 import java.io.PrintWriter;
 import java.io.StringWriter;
 
@@ -57,7 +58,7 @@ public class ExceptionUtils {

/**
 	 * Throws the given {@code Throwable} in scenarios where the signatures do not allow you to
-	 * throw arbitrary Throwables. Errors and RuntimeExceptions are thrown directly, other exceptions
+	 * throw an arbitrary Throwable. Errors and RuntimeExceptions are thrown directly, other exceptions
 	 * are packed into runtime exceptions
 	 * 
 	 * @param t The throwable to be thrown.
@@ -76,8 +77,8 @@ public class ExceptionUtils {

/**
 * Throws the given {@code Throwable} in scen
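
For context, the rethrow idiom this javadoc describes looks roughly as follows; a minimal sketch of the pattern, not necessarily the exact Flink method body:

	public static void rethrow(Throwable t) {
		if (t instanceof Error) {
			throw (Error) t;
		}
		else if (t instanceof RuntimeException) {
			throw (RuntimeException) t;
		}
		else {
			// checked exceptions get packed into an unchecked wrapper
			throw new RuntimeException(t.getMessage(), t);
		}
	}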

[4/6] flink git commit: [jobmanager] Add a process reaper to kill the JobManager process when the main actor dies.

2015-02-24 Thread sewen
[jobmanager] Add a process reaper to kill the JobManager process when the main actor dies.

Also adds various tests for failure behavior during job submission.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5725c721
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5725c721
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5725c721

Branch: refs/heads/master
Commit: 5725c72129fe9a75487204b470a30e850ad4091c
Parents: 9ddb565
Author: Stephan Ewen 
Authored: Mon Feb 23 23:35:10 2015 +0100
Committer: Stephan Ewen 
Committed: Tue Feb 24 10:14:30 2015 +0100

--
 .../flink/runtime/process/ProcessReaper.java|  60 ++
 .../flink/runtime/jobmanager/JobManager.scala   |  65 --
 .../JobManagerProcessReapingTest.java   | 150 +
 .../jobmanager/JobManagerStartupTest.java   |  23 +-
 .../runtime/jobmanager/JobManagerTest.java  |  48 +
 .../flink/runtime/jobmanager/JobSubmitTest.java |  12 +-
 .../runtime/testutils/CommonTestUtils.java  | 127 ++-
 .../testutils/InterruptibleByteChannel.java | 210 ---
 .../runtime/testutils/ServerTestUtils.java  | 181 
 .../JobSubmissionFailsITCase.java   |  11 +-
 .../apache/flink/yarn/ApplicationMaster.scala   |   2 +-
 11 files changed, 408 insertions(+), 481 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/5725c721/flink-runtime/src/main/java/org/apache/flink/runtime/process/ProcessReaper.java
--
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/process/ProcessReaper.java b/flink-runtime/src/main/java/org/apache/flink/runtime/process/ProcessReaper.java
new file mode 100644
index 000..b12b82d
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/process/ProcessReaper.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.process;
+
+import akka.actor.ActorRef;
+import akka.actor.Terminated;
+import akka.actor.UntypedActor;
+import org.slf4j.Logger;
+
+/**
+ * Utility actor that monitors other actors and kills the JVM upon
+ * actor termination.
+ */
+public class ProcessReaper extends UntypedActor {
+
+	private final Logger log;
+	private final int exitCode;
+
+	public ProcessReaper(ActorRef watchTarget, Logger log, int exitCode) {
+		if (watchTarget == null || watchTarget.equals(ActorRef.noSender())) {
+			throw new IllegalArgumentException("Target may not be null or 'noSender'");
+		}
+		this.log = log;
+		this.exitCode = exitCode;
+
+		getContext().watch(watchTarget);
+	}
+
+	@Override
+	public void onReceive(Object message) {
+		if (message instanceof Terminated) {
+			try {
+				Terminated term = (Terminated) message;
+				String name = term.actor().path().name();
+				if (log != null) {
+					log.error("Actor " + name + " terminated, stopping process...");
+				}
+			}
+			finally {
+				System.exit(exitCode);
+			}
+		}
+	}
+}
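
For context, wiring such a reaper into a process takes a single actorOf call once the critical actor exists. A minimal sketch, assuming a made-up actor system name, a stand-in MyMainActor class, and exit code 1 (the real JobManager/TaskManager startup code may differ):

	import akka.actor.ActorRef;
	import akka.actor.ActorSystem;
	import akka.actor.Props;
	import akka.actor.UntypedActor;
	import org.apache.flink.runtime.process.ProcessReaper;
	import org.slf4j.Logger;
	import org.slf4j.LoggerFactory;

	public class ReaperWiringSketch {

		private static final Logger LOG = LoggerFactory.getLogger(ReaperWiringSketch.class);

		// Trivial stand-in for the process's central actor (e.g. the JobManager actor).
		public static class MyMainActor extends UntypedActor {
			@Override
			public void onReceive(Object message) {
				unhandled(message);
			}
		}

		public static void main(String[] args) {
			ActorSystem actorSystem = ActorSystem.create("example-system");

			ActorRef mainActor = actorSystem.actorOf(Props.create(MyMainActor.class), "main");

			// From here on, if 'mainActor' terminates for any reason, the reaper
			// calls System.exit(1), so the JVM dies together with the actor
			// instead of lingering as a half-alive process.
			actorSystem.actorOf(Props.create(ProcessReaper.class, mainActor, LOG, 1), "reaper");
		}
	}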

http://git-wip-us.apache.org/repos/asf/flink/blob/5725c721/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
--
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index ce3bc74..a1642b4 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -

[5/6] flink git commit: [tests] Speed up DataSinkTaskTest

2015-02-24 Thread sewen
[tests] Speed up DataSinkTaskTest


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/70df0282
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/70df0282
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/70df0282

Branch: refs/heads/master
Commit: 70df02824d8738f009c50ba15b7365f943e4ecca
Parents: 5725c72
Author: Stephan Ewen 
Authored: Mon Feb 23 23:36:32 2015 +0100
Committer: Stephan Ewen 
Committed: Tue Feb 24 10:14:30 2015 +0100

--
 .../runtime/operators/DataSinkTaskTest.java | 67 +---
 1 file changed, 30 insertions(+), 37 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/70df0282/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java
--
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java
index 1a4b7f1..84fc851 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java
@@ -72,61 +72,55 @@ public class DataSinkTaskTest extends TaskTestBase
 
@Test
public void testDataSinkTask() {
+		FileReader fr = null;
+		BufferedReader br = null;
+		try {
+			int keyCnt = 100;
+			int valCnt = 20;
 
-		int keyCnt = 100;
-		int valCnt = 20;
+			super.initEnvironment(MEMORY_MANAGER_SIZE, NETWORK_BUFFER_SIZE);
+			super.addInput(new UniformRecordGenerator(keyCnt, valCnt, false), 0);
 
-		super.initEnvironment(MEMORY_MANAGER_SIZE, NETWORK_BUFFER_SIZE);
-		super.addInput(new UniformRecordGenerator(keyCnt, valCnt, false), 0);
+			DataSinkTask<Record> testTask = new DataSinkTask<Record>();
 
-		DataSinkTask<Record> testTask = new DataSinkTask<Record>();
+			super.registerFileOutputTask(testTask, MockOutputFormat.class, new File(tempTestPath).toURI().toString());
 
-		super.registerFileOutputTask(testTask, MockOutputFormat.class, new File(tempTestPath).toURI().toString());
-
-		try {
 			testTask.invoke();
-		} catch (Exception e) {
-			LOG.debug("Exception while invoking the test task.", e);
-			Assert.fail("Invoke method caused exception.");
-		}
 
-		File tempTestFile = new File(this.tempTestPath);
+			File tempTestFile = new File(this.tempTestPath);
 
-		Assert.assertTrue("Temp output file does not exist",tempTestFile.exists());
+			Assert.assertTrue("Temp output file does not exist", tempTestFile.exists());
 
-		FileReader fr = null;
-		BufferedReader br = null;
-		try {
 			fr = new FileReader(tempTestFile);
 			br = new BufferedReader(fr);
 
-			HashMap<Integer, HashSet<Integer>> keyValueCountMap = new HashMap<Integer, HashSet<Integer>>(keyCnt);
+			HashMap<Integer, HashSet<Integer>> keyValueCountMap = new HashMap<Integer, HashSet<Integer>>(keyCnt);
 
-			while(br.ready()) {
+			while (br.ready()) {
 				String line = br.readLine();
 
-				Integer key = Integer.parseInt(line.substring(0,line.indexOf("_")));
-				Integer val = Integer.parseInt(line.substring(line.indexOf("_")+1,line.length()));
+				Integer key = Integer.parseInt(line.substring(0, line.indexOf("_")));
+				Integer val = Integer.parseInt(line.substring(line.indexOf("_") + 1, line.length()));
 
-				if(!keyValueCountMap.containsKey(key)) {
-					keyValueCountMap.put(key,new HashSet<Integer>());
+				if (!keyValueCountMap.containsKey(key)) {
+					keyValueCountMap.put(key, new HashSet<Integer>());
 				}
 				keyValueCountMap.get(key).add(val);
 			}
 
-			Assert.assertTrue("Invalid key count in out file. Expected: "+keyCnt+" Actual: "+keyValueCountMap.keySet().size(),
-				keyValueCountMap.keySet().size() == keyCnt);
+			Assert.assertTrue("Invalid key count in out file. Expected: " + keyCnt + " Actual: " + keyValueCountMap.keySet().size(),
+				keyValueCountMap.keySet().size() == keyCnt)

[3/6] flink git commit: [FLINK-1598] [runtime] Better error message when network serialization of records exceeds java heap space.

2015-02-24 Thread sewen
[FLINK-1598] [runtime] Better error message when network serialization of records exceeds java heap space.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9528a521
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9528a521
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9528a521

Branch: refs/heads/master
Commit: 9528a521f56e0c6b0c70d43e62ad84b19c048c36
Parents: 98bc7b9
Author: Stephan Ewen 
Authored: Mon Feb 23 13:45:51 2015 +0100
Committer: Stephan Ewen 
Committed: Tue Feb 24 00:08:27 2015 +0100

--
 .../runtime/util/DataOutputSerializer.java  | 31 
 1 file changed, 25 insertions(+), 6 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/9528a521/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataOutputSerializer.java
--
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataOutputSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataOutputSerializer.java
index 2d06e29..7f8105d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataOutputSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataOutputSerializer.java
@@ -268,16 +268,35 @@ public class DataOutputSerializer implements DataOutputView {


 	private void resize(int minCapacityAdd) throws IOException {
+		int newLen = Math.max(this.buffer.length * 2, this.buffer.length + minCapacityAdd);
+		byte[] nb;
 		try {
-			final int newLen = Math.max(this.buffer.length * 2, this.buffer.length + minCapacityAdd);
-			final byte[] nb = new byte[newLen];
-			System.arraycopy(this.buffer, 0, nb, 0, this.position);
-			this.buffer = nb;
-			this.wrapper = ByteBuffer.wrap(this.buffer);
+			nb = new byte[newLen];
 		}
-		catch (NegativeArraySizeException nasex) {
+		catch (NegativeArraySizeException e) {
 			throw new IOException("Serialization failed because the record length would exceed 2GB (max addressable array size in Java).");
 		}
+		catch (OutOfMemoryError e) {
+			// this was too large to allocate, try the smaller size (if possible)
+			if (newLen > this.buffer.length + minCapacityAdd) {
+				newLen = this.buffer.length + minCapacityAdd;
+				try {
+					nb = new byte[newLen];
+				}
+				catch (OutOfMemoryError ee) {
+					// still not possible. give an informative exception message that reports the size
+					throw new IOException("Failed to serialize element. Serialized size (> "
+							+ newLen + " bytes) exceeds JVM heap space", ee);
+				}
+			} else {
+				throw new IOException("Failed to serialize element. Serialized size (> "
+						+ newLen + " bytes) exceeds JVM heap space", e);
+			}
+		}
+
+		System.arraycopy(this.buffer, 0, nb, 0, this.position);
+		this.buffer = nb;
+		this.wrapper = ByteBuffer.wrap(this.buffer);
 	}

@SuppressWarnings("restriction")
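
The essence of the resize change above: first try to double the buffer (cheap amortized growth); if that allocation itself exhausts the heap, retry once with the smallest length that still fits the record, and only then fail with an IOException that reports the size. The same pattern as a standalone sketch (a hypothetical helper for illustration, not Flink API):

	static byte[] grow(byte[] buffer, int position, int minCapacityAdd) throws IOException {
		// preferred size: double the buffer, but never less than what is needed
		int newLen = Math.max(buffer.length * 2, buffer.length + minCapacityAdd);
		byte[] nb;
		try {
			nb = new byte[newLen];
		}
		catch (OutOfMemoryError e) {
			// doubling was too greedy; fall back to the minimal size, if that is smaller
			if (newLen > buffer.length + minCapacityAdd) {
				newLen = buffer.length + minCapacityAdd;
				try {
					nb = new byte[newLen];
				}
				catch (OutOfMemoryError ee) {
					throw new IOException("Failed to serialize element. Serialized size (> "
							+ newLen + " bytes) exceeds JVM heap space", ee);
				}
			} else {
				throw new IOException("Failed to serialize element. Serialized size (> "
						+ newLen + " bytes) exceeds JVM heap space", e);
			}
		}
		// copy the old contents only after a successful allocation
		System.arraycopy(buffer, 0, nb, 0, position);
		return nb;
	}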



svn commit: r1661872 [1/2] - in /flink: _posts/2015-02-06-streaming-example.md site/blog/feed.xml site/blog/index.html site/blog/page2/index.html site/blog/page3/index.html

2015-02-24 Thread rmetzger
Author: rmetzger
Date: Tue Feb 24 09:51:10 2015
New Revision: 1661872

URL: http://svn.apache.org/r1661872
Log:
Remove duplicate streaming blogpost markdown file

Removed:
flink/_posts/2015-02-06-streaming-example.md
Modified:
flink/site/blog/feed.xml
flink/site/blog/index.html
flink/site/blog/page2/index.html
flink/site/blog/page3/index.html

Modified: flink/site/blog/feed.xml
URL: http://svn.apache.org/viewvc/flink/site/blog/feed.xml?rev=1661872&r1=1661871&r2=1661872&view=diff
==
Binary files - no diff available.

Modified: flink/site/blog/index.html
URL: http://svn.apache.org/viewvc/flink/site/blog/index.html?rev=1661872&r1=1661871&r2=1661872&view=diff
==
--- flink/site/blog/index.html (original)
+++ flink/site/blog/index.html Tue Feb 24 09:51:10 2015
@@ -803,696 +803,6 @@ internally, fault tolerance, and perform



-   Introducing Flink 
Streaming
-   06 Feb 2015
-
-   This post is the first of a series of 
blog posts on Flink Streaming,
-the recent addition to Apache Flink that makes it possible to analyze
-continuous data sources in addition to static files. Flink Streaming
-uses the pipelined Flink engine to process data streams in real time,
-and offers a new API including definition of flexible windows.
-
-In this post, we go through an example that uses the Flink Streaming
-API to compute statistics on stock market data that arrive
-continuously, and combine the stock market data with Twitter streams.
-See the http://flink.apache.org/docs/latest/streaming_guide.html";>Streaming 
Programming
-Guide for a
-detailed presentation of the Streaming API.
-
-First, we read a bunch of stock price streams and combine them into
-one stream of market data. We apply several transformations on this
-market data stream, like rolling aggregations per stock. Then we emit
-price warning alerts when the prices are rapidly changing. Moving 
-towards more advanced features, we compute rolling correlations
-between the market data streams and a Twitter stream with stock mentions.
-
-
-
-Back to top
-
-Reading from multiple inputs
-
-First, let us create the stream of stock prices:
-
-
-Read a socket stream of stock prices
-Parse the text in the stream to create a stream of StockPrice 
objects
-Add four other sources tagged with the stock symbol.
-Finally, merge the streams to create a unified stream. 
-
-
-
-
-
-
-
-def main(args: Array[String]) {
-
-  val env = StreamExecutionEnvironment.getExecutionEnvironment
-
-  //Read from a socket stream and map it to StockPrice objects
-  val socketStockStream = env.socketTextStream("localhost", ).map(x => {
-val split = x.split(",")
-StockPrice(split(0), split(1).toDouble)
-  })
-
-  //Generate other stock streams
-  val SPX_Stream = env.addSource(generateStock("SPX")(10) _)
-  val FTSE_Stream = env.addSource(generateStock("FTSE")(20) _)
-  val DJI_Stream = env.addSource(generateStock("DJI")(30) _)
-  val BUX_Stream = env.addSource(generateStock("BUX")(40) _)
-
-  //Merge all stock streams together
-  val stockStream = socketStockStream.merge(SPX_Stream, FTSE_Stream, 
-DJI_Stream, BUX_Stream)
-
-  stockStream.print()
-
-  env.execute("Stock 
stream")
-}
-
-
-
-
-
-public static void main(String[] args) throws 
Exception {
-
-final StreamExecutionEnvironment env =
-StreamExecutionEnvironment.getExecutionEnvironment();
-
-//Read from a socket stream and map it to StockPrice objects
-DataStream socketStockStream = env
-.socketTextStream("localhost", )
-.map(new MapFunction() {
-private String[] tokens;
-
-@Override
-public StockPrice map(String value) throws 
Exception {
-tokens = 
value.split(",");
-return new 
StockPrice(tokens[0],
-Double.parseDouble(tokens[1]));
-}
-});
-
-//Generate other stock streams
-DataStream SPX_stream = env.addSource(new 
StockSource("SPX", 10));
-DataStream FTSE_stream = env.addSource(new 
StockSource("FTSE", 20));
-DataStream DJI_stream = env.addSource(new 
StockSource("DJI", 30));
-DataStream BUX_stream = env.addSource(new 
StockSource("BUX", 40));
-
-//Merge all stock streams together
-DataStream stockStream = socketStockStream
-.merge(SPX_stream, 
FTSE_stream, DJI_stream, BUX_stream);
-
-stockStream.print();
-
-env.execute("Stock 
stream");
- 
-
-
-
-
-See
-http://flink.apache.org/docs/latest/streaming_guide.html#sources";>here
-on how you can create streaming sources for Flink Streaming

svn commit: r1661872 [2/2] - in /flink: _posts/2015-02-06-streaming-example.md site/blog/feed.xml site/blog/index.html site/blog/page2/index.html site/blog/page3/index.html

2015-02-24 Thread rmetzger
Modified: flink/site/blog/page2/index.html
URL: http://svn.apache.org/viewvc/flink/site/blog/page2/index.html?rev=1661872&r1=1661871&r2=1661872&view=diff
==
--- flink/site/blog/page2/index.html (original)
+++ flink/site/blog/page2/index.html Tue Feb 24 09:51:10 2015
@@ -140,96 +140,6 @@



-   Stratosphere version 0.5 
available
-   31 May 2014
-
-   We are happy to announce a new major 
Stratosphere release, version 0.5. This release adds many new features and 
improves the interoperability, stability, and performance of the system. The 
major theme of the release is the completely new Java API that makes it easy to 
write powerful distributed programs.
-
-The release can be downloaded from the http://stratosphere.eu/downloads/";>Stratosphere website and from https://github.com/stratosphere/stratosphere/releases/tag/release-0.5";>GitHub.
 All components are available as Apache Maven dependencies, making it simple to 
include Stratosphere in other projects. The website provides http://stratosphere.eu/docs/0.5/";>extensive documentation of the 
system and the new features.
-
-Shortlist of new Features
-
-Below is a short list of the most important additions to the Stratosphere 
system.
-
-New Java API
-
-This release introduces a completely new data set-centric Java 
API. This programming model significantly eases the development of 
Stratosphere programs, supports flexible use of regular Java classes as data 
types, and adds many new built-in operators to simplify the writing of powerful 
programs. The result is programs that need less code, are more readable, 
interoperate better with existing code, and execute faster.
-
-Take a look at the http://stratosphere.eu/docs/0.5/programming_guides/examples_java.html";>examples
  to get a feel for the API.
-
-General API Improvements
-
-Broadcast Variables: Publish a data set to all instances of another operator. This is handy if your operator depends on the result of a computation, e.g., filter all values smaller than the average.
-
-Distributed Cache: Make (local and HDFS) files locally 
available on each machine processing a task.
-
-Iteration Termination Improvements: Iterative algorithms can now terminate based on intermediate data sets, not only through aggregated statistics.
-
-Collection data sources and sinks: Speed up the development and testing of Stratosphere programs by reading data from regular Java collections and inserting back into them.
-
-JDBC data sources and sinks: Read data from and write data 
to relational databases using a JDBC driver.
-
-Hadoop input format and output format support: Read and 
write data with any Hadoop input or output format.
-
-Support for Avro encoded data: Read data that has been 
materialized using Avro.
-
-Deflate Files: Stratosphere now transparently reads 
.deflate compressed files.
-
-Runtime and Optimizer 
Improvements
-
-DAG Runtime Streaming: Detection and resolution of 
streaming data flow deadlocks in the data flow optimizer.
-
-Intermediate results across iteration boundaries: 
Intermediate results computed outside iterative parts can be used inside 
iterative parts of the program.
-
-Stability fixes: Various stability fixes in both optimizer 
and runtime.
-
-Setup & Tooling
-
-Improved YARN support: Many improvements based on 
user-feedback: Packaging, Permissions, Error handling.
-
-Java 8 compatibility
-
-Contributors
-
-In total, 26 people have contributed to Stratosphere since the last 
release. Thank you for making this project possible!
-
-
-Alexander Alexandrov
-Jesus Camacho
-Ufuk Celebi
-Mikhail Erofeev
-Stephan Ewen
-Alexandr Ferodov
-Filip Haase
-Jonathan Hasenberg
-Markus Holzemer
-Fabian Hueske
-Vasia Kalavri
-Aljoscha Krettek
-Rajika Kumarasiri
-Sebastian Kunert
-Aaron Lam
-Robert Metzger
-Faisal Moeen
-Martin Neumann
-Mingliang Qi
-Till Rohrmann
-Chesnay Schepler
-Vyachislav Soludev
-Tuan Trieu
-Artem Tsikiridis
-Timo Walther
-Robert Waury
-
-
-Stratosphere is going Apache
-
-The Stratosphere project has been accepted to the Apache Incubator and will 
continue its work under the umbrella of the Apache Software Foundation. Due to 
a name conflict, we are switching the name of the project. We will make future 
releases of Stratosphere through the Apache foundation under a new name.
-
-   Stratosphere version 0.5 
available
-   
-   
-   
Stratosphere 
accepted as Apache Incubator Project
16 Apr 2014
 
@@ -775,6 +685,54 @@ For a complete overview of the renamings
Stratosphere
 wins award at Humboldt Innovation Competition "Big Data: Research meets 
Startups"

svn commit: r1661871 [3/3] - in /flink: q/ site/blog/ site/blog/page2/ site/blog/page3/ site/q/

2015-02-24 Thread rmetzger
Modified: flink/site/blog/page2/index.html
URL: http://svn.apache.org/viewvc/flink/site/blog/page2/index.html?rev=1661871&r1=1661870&r2=1661871&view=diff
==
--- flink/site/blog/page2/index.html (original)
+++ flink/site/blog/page2/index.html Tue Feb 24 09:47:45 2015
@@ -140,6 +140,96 @@



+   Stratosphere version 0.5 
available
+   31 May 2014
+
+   We are happy to announce a new major 
Stratosphere release, version 0.5. This release adds many new features and 
improves the interoperability, stability, and performance of the system. The 
major theme of the release is the completely new Java API that makes it easy to 
write powerful distributed programs.
+
+The release can be downloaded from the http://stratosphere.eu/downloads/";>Stratosphere website and from https://github.com/stratosphere/stratosphere/releases/tag/release-0.5";>GitHub.
 All components are available as Apache Maven dependencies, making it simple to 
include Stratosphere in other projects. The website provides http://stratosphere.eu/docs/0.5/";>extensive documentation of the 
system and the new features.
+
+Shortlist of new Features
+
+Below is a short list of the most important additions to the Stratosphere 
system.
+
+New Java API
+
+This release introduces a completely new data set-centric Java 
API. This programming model significantly eases the development of 
Stratosphere programs, supports flexible use of regular Java classes as data 
types, and adds many new built-in operators to simplify the writing of powerful 
programs. The result is programs that need less code, are more readable, 
interoperate better with existing code, and execute faster.
+
+Take a look at the http://stratosphere.eu/docs/0.5/programming_guides/examples_java.html";>examples
  to get a feel for the API.
+
+General API Improvements
+
+Broadcast Variables: Publish a data set to all instances of another operator. This is handy if your operator depends on the result of a computation, e.g., filter all values smaller than the average.
+
+Distributed Cache: Make (local and HDFS) files locally 
available on each machine processing a task.
+
+Iteration Termination Improvements: Iterative algorithms can now terminate based on intermediate data sets, not only through aggregated statistics.
+
+Collection data sources and sinks: Speed up the development and testing of Stratosphere programs by reading data from regular Java collections and inserting back into them.
+
+JDBC data sources and sinks: Read data from and write data 
to relational databases using a JDBC driver.
+
+Hadoop input format and output format support: Read and 
write data with any Hadoop input or output format.
+
+Support for Avro encoded data: Read data that has been 
materialized using Avro.
+
+Deflate Files: Stratosphere now transparently reads 
.deflate compressed files.
+
+Runtime and Optimizer 
Improvements
+
+DAG Runtime Streaming: Detection and resolution of 
streaming data flow deadlocks in the data flow optimizer.
+
+Intermediate results across iteration boundaries: 
Intermediate results computed outside iterative parts can be used inside 
iterative parts of the program.
+
+Stability fixes: Various stability fixes in both optimizer 
and runtime.
+
+Setup & Tooling
+
+Improved YARN support: Many improvements based on 
user-feedback: Packaging, Permissions, Error handling.
+
+Java 8 compatibility
+
+Contributors
+
+In total, 26 people have contributed to Stratosphere since the last 
release. Thank you for making this project possible!
+
+
+Alexander Alexandrov
+Jesus Camacho
+Ufuk Celebi
+Mikhail Erofeev
+Stephan Ewen
+Alexandr Ferodov
+Filip Haase
+Jonathan Hasenberg
+Markus Holzemer
+Fabian Hueske
+Vasia Kalavri
+Aljoscha Krettek
+Rajika Kumarasiri
+Sebastian Kunert
+Aaron Lam
+Robert Metzger
+Faisal Moeen
+Martin Neumann
+Mingliang Qi
+Till Rohrmann
+Chesnay Schepler
+Vyachislav Soludev
+Tuan Trieu
+Artem Tsikiridis
+Timo Walther
+Robert Waury
+
+
+Stratosphere is going Apache
+
+The Stratosphere project has been accepted to the Apache Incubator and will 
continue its work under the umbrella of the Apache Software Foundation. Due to 
a name conflict, we are switching the name of the project. We will make future 
releases of Stratosphere through the Apache foundation under a new name.
+
+   Stratosphere version 0.5 
available
+   
+   
+   
Stratosphere 
accepted as Apache Incubator Project
16 Apr 2014
 
@@ -685,54 +775,6 @@ For a complete overview of the renamings
Stratosphere
 wins award at Humboldt Innovation Competition "Big Data: Research meets 
Startups"

svn commit: r1661871 [2/3] - in /flink: q/ site/blog/ site/blog/page2/ site/blog/page3/ site/q/

2015-02-24 Thread rmetzger
Modified: flink/site/blog/index.html
URL: http://svn.apache.org/viewvc/flink/site/blog/index.html?rev=1661871&r1=1661870&r2=1661871&view=diff
==
--- flink/site/blog/index.html (original)
+++ flink/site/blog/index.html Tue Feb 24 09:47:45 2015
@@ -803,6 +803,696 @@ internally, fault tolerance, and perform



+   Introducing Flink 
Streaming
+   06 Feb 2015
+
+   This post is the first of a series of 
blog posts on Flink Streaming,
+the recent addition to Apache Flink that makes it possible to analyze
+continuous data sources in addition to static files. Flink Streaming
+uses the pipelined Flink engine to process data streams in real time,
+and offers a new API including definition of flexible windows.
+
+In this post, we go through an example that uses the Flink Streaming
+API to compute statistics on stock market data that arrive
+continuously, and combine the stock market data with Twitter streams.
+See the http://flink.apache.org/docs/latest/streaming_guide.html";>Streaming 
Programming
+Guide for a
+detailed presentation of the Streaming API.
+
+First, we read a bunch of stock price streams and combine them into
+one stream of market data. We apply several transformations on this
+market data stream, like rolling aggregations per stock. Then we emit
+price warning alerts when the prices are rapidly changing. Moving 
+towards more advanced features, we compute rolling correlations
+between the market data streams and a Twitter stream with stock mentions.
+
+
+
+Back to top
+
+Reading from multiple inputs
+
+First, let us create the stream of stock prices:
+
+
+Read a socket stream of stock prices
+Parse the text in the stream to create a stream of StockPrice 
objects
+Add four other sources tagged with the stock symbol.
+Finally, merge the streams to create a unified stream. 
+
+
+
+
+
+
+
+def main(args: Array[String]) {
+
+  val env = StreamExecutionEnvironment.getExecutionEnvironment
+
+  //Read from a socket stream and map it to StockPrice objects
+  val socketStockStream = env.socketTextStream("localhost", ).map(x => {
+val split = x.split(",")
+StockPrice(split(0), split(1).toDouble)
+  })
+
+  //Generate other stock streams
+  val SPX_Stream = env.addSource(generateStock("SPX")(10) _)
+  val FTSE_Stream = env.addSource(generateStock("FTSE")(20) _)
+  val DJI_Stream = env.addSource(generateStock("DJI")(30) _)
+  val BUX_Stream = env.addSource(generateStock("BUX")(40) _)
+
+  //Merge all stock streams together
+  val stockStream = socketStockStream.merge(SPX_Stream, FTSE_Stream, 
+DJI_Stream, BUX_Stream)
+
+  stockStream.print()
+
+  env.execute("Stock 
stream")
+}
+
+
+
+
+
+public static void main(String[] args) throws 
Exception {
+
+final StreamExecutionEnvironment env =
+StreamExecutionEnvironment.getExecutionEnvironment();
+
+//Read from a socket stream and map it to StockPrice objects
+DataStream socketStockStream = env
+.socketTextStream("localhost", )
+.map(new MapFunction() {
+private String[] tokens;
+
+@Override
+public StockPrice map(String value) throws 
Exception {
+tokens = 
value.split(",");
+return new 
StockPrice(tokens[0],
+Double.parseDouble(tokens[1]));
+}
+});
+
+//Generate other stock streams
+DataStream SPX_stream = env.addSource(new 
StockSource("SPX", 10));
+DataStream FTSE_stream = env.addSource(new 
StockSource("FTSE", 20));
+DataStream DJI_stream = env.addSource(new 
StockSource("DJI", 30));
+DataStream BUX_stream = env.addSource(new 
StockSource("BUX", 40));
+
+//Merge all stock streams together
+DataStream stockStream = socketStockStream
+.merge(SPX_stream, 
FTSE_stream, DJI_stream, BUX_stream);
+
+stockStream.print();
+
+env.execute("Stock 
stream");
+ 
+
+
+
+
+See
+http://flink.apache.org/docs/latest/streaming_guide.html#sources";>here
+on how you can create streaming sources for Flink Streaming
+programs. Flink, of course, has support for reading in streams from
+http://flink.apache.org/docs/latest/streaming_guide.html#stream-connectors";>external
+sources
+such as Apache Kafka, Apache Flume, RabbitMQ, and others. For the sake
+of this example, the data streams are simply generated using the
+generateSource method:
+
+
+
+
+val symbols 
= List("SPX", "FTSE", "DJI", "DJT", "BUX", "DAX", "GOOG")
+
+case class StockPrice(symbol: String, price: Double)
+
+def generateStock(symbol: String)(sigma: Int)(out: Collector[StockPrice]) = 
{
+  var price = 1000.
+  while (true) {
+price = price + Random.nextGau

svn commit: r1661871 [1/3] - in /flink: q/ site/blog/ site/blog/page2/ site/blog/page3/ site/q/

2015-02-24 Thread rmetzger
Author: rmetzger
Date: Tue Feb 24 09:47:45 2015
New Revision: 1661871

URL: http://svn.apache.org/r1661871
Log:
[FLINK-1414] Add quickstarts to website

Added:
flink/q/
flink/q/quickstart-SNAPSHOT.sh   (with props)
flink/q/quickstart-scala-SNAPSHOT.sh   (with props)
flink/q/quickstart-scala.sh   (with props)
flink/q/quickstart.sh   (with props)
flink/site/q/
flink/site/q/quickstart-SNAPSHOT.sh   (with props)
flink/site/q/quickstart-scala-SNAPSHOT.sh   (with props)
flink/site/q/quickstart-scala.sh   (with props)
flink/site/q/quickstart.sh   (with props)
Modified:
flink/site/blog/feed.xml
flink/site/blog/index.html
flink/site/blog/page2/index.html
flink/site/blog/page3/index.html

Added: flink/q/quickstart-SNAPSHOT.sh
URL: http://svn.apache.org/viewvc/flink/q/quickstart-SNAPSHOT.sh?rev=1661871&view=auto
==
--- flink/q/quickstart-SNAPSHOT.sh (added)
+++ flink/q/quickstart-SNAPSHOT.sh Tue Feb 24 09:47:45 2015
@@ -0,0 +1,48 @@
+#!/usr/bin/env bash
+
+
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+
+PACKAGE=quickstart
+
+mvn archetype:generate \
+  -DarchetypeGroupId=org.apache.flink \
+  -DarchetypeArtifactId=flink-quickstart-java \
+  -DarchetypeVersion=0.9-SNAPSHOT \
+  -DgroupId=org.apache.flink \
+  -DartifactId=$PACKAGE \
+  -Dversion=0.1 \
+  -Dpackage=org.myorg.quickstart \
+  -DinteractiveMode=false \
+  -DarchetypeCatalog=https://repository.apache.org/content/repositories/snapshots/
+
+#
+# Give some guidance
+#
+echo -e "\\n\\n"
+echo -e "\\tA sample quickstart Flink Job has been created."
+echo -e "\\tSwitch into the directory using"
+echo -e "\\t\\t cd $PACKAGE"
+echo -e "\\tImport the project there using your favorite IDE (Import it as a maven project)"
+echo -e "\\tBuild a jar inside the directory using"
+echo -e "\\t\\t mvn clean package"
+echo -e "\\tYou will find the runnable jar in $PACKAGE/target"
+echo -e "\\tConsult our website if you have any troubles: http://flink.apache.org/community.html#mailing-lists"
+echo -e "\\n\\n"
+

Propchange: flink/q/quickstart-SNAPSHOT.sh
--
svn:executable = *

Added: flink/q/quickstart-scala-SNAPSHOT.sh
URL: http://svn.apache.org/viewvc/flink/q/quickstart-scala-SNAPSHOT.sh?rev=1661871&view=auto
==
--- flink/q/quickstart-scala-SNAPSHOT.sh (added)
+++ flink/q/quickstart-scala-SNAPSHOT.sh Tue Feb 24 09:47:45 2015
@@ -0,0 +1,48 @@
+#!/usr/bin/env bash
+
+
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+
+PACKAGE=quickstart
+
+mvn archetype:generate