flink git commit: Fix typo in exception message from assiged to assigned in ExecutionJobVertex.
Repository: flink Updated Branches: refs/heads/master bfbbbf906 -> 64c302f8f Fix typo in exception message from assiged to assigned in ExecutionJobVertex. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/64c302f8 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/64c302f8 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/64c302f8 Branch: refs/heads/master Commit: 64c302f8f4d211a2cd10764e3090b4f9bdde436c Parents: bfbbbf9 Author: Henry Saputra Authored: Tue Feb 24 15:59:35 2015 -0800 Committer: Henry Saputra Committed: Tue Feb 24 15:59:35 2015 -0800 -- .../apache/flink/runtime/executiongraph/ExecutionJobVertex.java| 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/64c302f8/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java -- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java index 9df9bd5..0444e5d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java @@ -132,7 +132,7 @@ public class ExecutionJobVertex implements Serializable { // sanity check for the double referencing between intermediate result partitions and execution vertices for (IntermediateResult ir : this.producedDataSets) { if (ir.getNumberOfAssignedPartitions() != parallelism) { - throw new RuntimeException("The intermediate result's partitions were not correctly assiged."); + throw new RuntimeException("The intermediate result's partitions were not correctly assigned."); } }
flink git commit: fixed package statement
Repository: flink Updated Branches: refs/heads/master ed8b26bf2 -> bfbbbf906 fixed package statement package statement was not updated after class movement also added a missing import Author: mjsax Closes #437 from mjsax/bug_fix_package_statement and squashes the following commits: 1470b0f [mjsax] fixed package statement Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bfbbbf90 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bfbbbf90 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bfbbbf90 Branch: refs/heads/master Commit: bfbbbf90627945e249f05c610b7ba8790c069bee Parents: ed8b26b Author: mjsax Authored: Tue Feb 24 14:27:22 2015 -0800 Committer: Henry Saputra Committed: Tue Feb 24 14:27:22 2015 -0800 -- .../typeutils/runtime/kryo/KryoWithCustomSerializersTest.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/bfbbbf90/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoWithCustomSerializersTest.java -- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoWithCustomSerializersTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoWithCustomSerializersTest.java index 155010e..d68afd6 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoWithCustomSerializersTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoWithCustomSerializersTest.java @@ -16,7 +16,7 @@ * limitations under the License. 
*/ -package org.apache.flink.api.java.typeutils.runtime; +package org.apache.flink.api.java.typeutils.runtime.kryo; import java.util.Collection; import java.util.HashSet; @@ -25,6 +25,7 @@ import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.typeutils.GenericTypeInfo; +import org.apache.flink.api.java.typeutils.runtime.AbstractGenericTypeSerializerTest; import org.joda.time.LocalDate; import org.junit.Test;
[1/2] flink git commit: [FLINK-1580] [FLINK-1590] [runtime] Various cleanups and improvements in the TaskManager initialization
Repository: flink Updated Branches: refs/heads/master 4883af675 -> ed8b26bf2 http://git-wip-us.apache.org/repos/asf/flink/blob/ed8b26bf/flink-yarn/src/main/java/org/apache/flink/yarn/appMaster/YarnTaskManagerRunner.java -- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/appMaster/YarnTaskManagerRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/appMaster/YarnTaskManagerRunner.java index aea9727..bed8f19 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/appMaster/YarnTaskManagerRunner.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/appMaster/YarnTaskManagerRunner.java @@ -20,12 +20,13 @@ package org.apache.flink.yarn.appMaster; import java.io.IOException; import java.security.PrivilegedAction; -import java.util.Arrays; import java.util.Map; -import akka.actor.ActorRef; -import akka.actor.ActorSystem; -import org.apache.flink.yarn.YarnUtils; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.taskmanager.TaskManager; +import org.apache.flink.runtime.util.EnvironmentInformation; +import org.apache.flink.yarn.YarnTaskManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.flink.yarn.FlinkYarnClient; @@ -33,36 +34,64 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.yarn.api.ApplicationConstants.Environment; -import scala.Tuple2; +/** + * The entry point for running a TaskManager in a YARN container. The YARN container will invoke + * this class' main method. 
+ */ public class YarnTaskManagerRunner { private static final Logger LOG = LoggerFactory.getLogger(YarnTaskManagerRunner.class); + public static void main(final String[] args) throws IOException { - Map envs = System.getenv(); + + EnvironmentInformation.logEnvironmentInfo(LOG, "YARN TaskManager"); + EnvironmentInformation.checkJavaVersion(); + + // try to parse the command line arguments + final Configuration configuration; + try { + configuration = TaskManager.parseArgsAndLoadConfig(args); + } + catch (Throwable t) { + LOG.error(t.getMessage(), t); + System.exit(TaskManager.STARTUP_FAILURE_RETURN_CODE()); + return; + } + + // read the environment variables for YARN + final Map envs = System.getenv(); final String yarnClientUsername = envs.get(FlinkYarnClient.ENV_CLIENT_USERNAME); final String localDirs = envs.get(Environment.LOCAL_DIRS.key()); // configure local directory - final String[] newArgs = Arrays.copyOf(args, args.length + 2); - newArgs[newArgs.length-2] = "--tempDir"; - newArgs[newArgs.length-1] = localDirs; - LOG.info("Setting log path "+localDirs); - LOG.info("YARN daemon runs as '"+UserGroupInformation.getCurrentUser().getShortUserName()+"' setting" - + " user to execute Flink TaskManager to '"+yarnClientUsername+"'"); + String flinkTempDirs = configuration.getString(ConfigConstants.TASK_MANAGER_TMP_DIR_KEY, null); + if (flinkTempDirs == null) { + LOG.info("Setting directories for temporary file " + localDirs); + configuration.setString(ConfigConstants.TASK_MANAGER_TMP_DIR_KEY, localDirs); + } + else { + LOG.info("Overriding YARN's temporary file directories with those " + + "specified in the Flink config: " + flinkTempDirs); + } + + LOG.info("YARN daemon runs as '" + UserGroupInformation.getCurrentUser().getShortUserName() + +"' setting user to execute Flink TaskManager to '"+yarnClientUsername+"'"); + UserGroupInformation ugi = UserGroupInformation.createRemoteUser(yarnClientUsername); - for(Token toks : 
UserGroupInformation.getCurrentUser().getTokens()) { + for (Token toks : UserGroupInformation.getCurrentUser().getTokens()) { ugi.addToken(toks); } ugi.doAs(new PrivilegedAction() { @Override public Object run() { try { - Tuple2 tuple = YarnUtils.startActorSystemAndTaskManager(newArgs); - tuple._1().awaitTermination(); - } catch (Exception
[2/2] flink git commit: [FLINK-1580] [FLINK-1590] [runtime] Various cleanups and improvements in the TaskManager initialization
[FLINK-1580] [FLINK-1590] [runtime] Various cleanups and improvements in the TaskManager initialization - Better checks during TaskManager startup - More robust initialization of TaskManager actor system and actor - Fix memory accounting during TaskManager startup - Better logging for TaskManagers started through YARN - Remove command line parameter hacking for YARN TaskManagers Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ed8b26bf Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ed8b26bf Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ed8b26bf Branch: refs/heads/master Commit: ed8b26bf2e8dd7c187c24ad0d8ff3e67f6a7478c Parents: 4883af6 Author: Stephan Ewen Authored: Tue Feb 24 20:23:47 2015 +0100 Committer: Stephan Ewen Committed: Tue Feb 24 20:30:29 2015 +0100 -- .../org/apache/flink/runtime/net/NetUtils.java | 2 +- .../runtime/util/EnvironmentInformation.java| 98 ++- .../apache/flink/runtime/util/MathUtils.java| 10 + .../flink/runtime/jobmanager/JobManager.scala | 50 +- .../runtime/minicluster/FlinkMiniCluster.scala | 57 +- .../minicluster/LocalFlinkMiniCluster.scala | 56 +- .../flink/runtime/taskmanager/TaskManager.scala | 644 +-- .../TaskManagerCLIConfiguration.scala | 3 +- .../taskmanager/TaskManagerConfiguration.scala | 7 +- .../taskmanager/TaskManagerProfiler.scala | 4 +- .../TaskManagerProcessReapingTest.java | 6 +- .../runtime/taskmanager/TaskManagerTest.java| 5 +- .../apache/flink/runtime/util/MathUtilTest.java | 34 +- .../TaskManagerRegistrationITCase.scala | 10 +- .../runtime/testingUtils/TestingCluster.scala | 18 +- .../testingUtils/TestingTaskManager.scala | 25 +- .../runtime/testingUtils/TestingUtils.scala | 38 +- .../test/util/ForkableFlinkMiniCluster.scala| 32 +- .../yarn/appMaster/YarnTaskManagerRunner.java | 63 +- .../org/apache/flink/yarn/YarnTaskManager.scala | 18 +- .../scala/org/apache/flink/yarn/YarnUtils.scala | 45 -- 21 files changed, 723 insertions(+),
502 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/ed8b26bf/flink-runtime/src/main/java/org/apache/flink/runtime/net/NetUtils.java -- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/net/NetUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/net/NetUtils.java index 73504e9..8e0a41a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/net/NetUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/net/NetUtils.java @@ -107,7 +107,7 @@ public class NetUtils { break; default: - throw new RuntimeException("Unkown address detection strategy: " + strategy); + throw new RuntimeException("Unknown address detection strategy: " + strategy); } } } http://git-wip-us.apache.org/repos/asf/flink/blob/ed8b26bf/flink-runtime/src/main/java/org/apache/flink/runtime/util/EnvironmentInformation.java -- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/EnvironmentInformation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/EnvironmentInformation.java index 535c756..d2147e4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/EnvironmentInformation.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/EnvironmentInformation.java @@ -36,12 +36,6 @@ public class EnvironmentInformation { private static final Logger LOG = LoggerFactory.getLogger(EnvironmentInformation.class); private static final String UNKNOWN = ""; - - private static final String[] IGNORED_STARTUP_OPTIONS = { - "-Dlog.file", - "-Dlogback.configurationFile", - "-Dlog4j.configuration" - }; /** * Returns the version of the code as String. If version == null, then the JobManager does not run from a @@ -55,7 +49,7 @@ public class Envir
flink git commit: [FLINK-1596] [runtime] Remove space in temp filename
Repository: flink Updated Branches: refs/heads/release-0.8 37cddde20 -> 941712941 [FLINK-1596] [runtime] Remove space in temp filename Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/94171294 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/94171294 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/94171294 Branch: refs/heads/release-0.8 Commit: 9417129413b804c6d758bf74f230f0109d3ecdcf Parents: 37cddde Author: Johannes Authored: Sat Feb 21 21:32:34 2015 +0100 Committer: Stephan Ewen Committed: Tue Feb 24 20:29:43 2015 +0100 -- .../org/apache/flink/runtime/io/disk/iomanager/FileIOChannel.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/94171294/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/FileIOChannel.java -- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/FileIOChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/FileIOChannel.java index d6c1ce0..c29e7d8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/FileIOChannel.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/FileIOChannel.java @@ -157,7 +157,7 @@ public interface FileIOChannel { public ID next() { int threadNum = counter % paths.length; - String filename = String.format(" %s.%06d.channel", namePrefix, (counter++)); + String filename = String.format("%s.%06d.channel", namePrefix, (counter++)); return new ID(new File(paths[threadNum], filename), threadNum); } }
flink git commit: Small cleanup to truncate some lines that are too long for easy read of the code.
Repository: flink Updated Branches: refs/heads/master 1230bcaa0 -> 4883af675 Small cleanup to truncate some lines that are too long for easy read of the code. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4883af67 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4883af67 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4883af67 Branch: refs/heads/master Commit: 4883af675e19d8a9c750a83b3f2c019583e6bf7f Parents: 1230bca Author: Henry Saputra Authored: Tue Feb 24 10:57:33 2015 -0800 Committer: Henry Saputra Committed: Tue Feb 24 10:58:09 2015 -0800 -- .../network/partition/IntermediateResultPartition.java | 12 .../partition/queue/PipelinedPartitionQueue.java| 3 ++- 2 files changed, 10 insertions(+), 5 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/4883af67/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/IntermediateResultPartition.java -- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/IntermediateResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/IntermediateResultPartition.java index 71af7a6..80bd38d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/IntermediateResultPartition.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/IntermediateResultPartition.java @@ -213,7 +213,8 @@ public class IntermediateResultPartition implements BufferPoolOwner { // Consume // - public IntermediateResultPartitionQueueIterator getQueueIterator(int queueIndex, Optional bufferProvider) throws IOException { + public IntermediateResultPartitionQueueIterator getQueueIterator(int queueIndex, Optional bufferProvider) + throws IOException { synchronized (queues) { if (isReleased) { throw new IllegalQueueIteratorRequestException("Intermediate result partition has already been released."); @@ -231,7 +232,8 @@ 
public class IntermediateResultPartition implements BufferPoolOwner { @Override public String toString() { - return "Intermediate result partition " + partitionId + " [num queues: " + queues.length + ", " + (isFinished ? "finished" : "not finished") + "]"; + return "Intermediate result partition " + partitionId + " [num queues: " + queues.length + ", " + + (isFinished ? "finished" : "not finished") + "]"; } private void checkInProducePhase() { @@ -296,7 +298,8 @@ public class IntermediateResultPartition implements BufferPoolOwner { // - public static IntermediateResultPartition create(RuntimeEnvironment environment, int partitionIndex, JobID jobId, ExecutionAttemptID executionId, NetworkEnvironment networkEnvironment, PartitionDeploymentDescriptor desc) { + public static IntermediateResultPartition create(RuntimeEnvironment environment, int partitionIndex, JobID jobId, + ExecutionAttemptID executionId, NetworkEnvironment networkEnvironment, PartitionDeploymentDescriptor desc) { final IntermediateResultPartitionID partitionId = checkNotNull(desc.getPartitionId()); final IntermediateResultPartitionType partitionType = checkNotNull(desc.getPartitionType()); @@ -307,6 +310,7 @@ public class IntermediateResultPartition implements BufferPoolOwner { partitionQueues[i] = new PipelinedPartitionQueue(); } - return new IntermediateResultPartition(environment, partitionIndex, jobId, executionId, partitionId, partitionType, partitionQueues, networkEnvironment); + return new IntermediateResultPartition(environment, partitionIndex, jobId, executionId, partitionId, partitionType, + partitionQueues, networkEnvironment); } } http://git-wip-us.apache.org/repos/asf/flink/blob/4883af67/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/queue/PipelinedPartitionQueue.java -- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/queue/PipelinedPartitionQueue.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/queue/PipelinedPartitionQueue.java index a24cdeb..5d562e4 100644 --- a/flink-runtime/src/main/java/
[2/6] flink git commit: [FLINK-1596] [runtime] Remove space in temp filename
[FLINK-1596] [runtime] Remove space in temp filename This closes #431 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/98bc7b95 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/98bc7b95 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/98bc7b95 Branch: refs/heads/master Commit: 98bc7b951b30961871958a4483e0b186bfb785b8 Parents: c111444 Author: Johannes Authored: Sat Feb 21 21:32:34 2015 +0100 Committer: Stephan Ewen Committed: Tue Feb 24 00:08:27 2015 +0100 -- .../org/apache/flink/runtime/io/disk/iomanager/FileIOChannel.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/98bc7b95/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/FileIOChannel.java -- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/FileIOChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/FileIOChannel.java index e00568e..c5a3daa 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/FileIOChannel.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/FileIOChannel.java @@ -164,7 +164,7 @@ public interface FileIOChannel { public ID next() { int threadNum = counter % paths.length; - String filename = String.format(" %s.%06d.channel", namePrefix, (counter++)); + String filename = String.format("%s.%06d.channel", namePrefix, (counter++)); return new ID(new File(paths[threadNum], filename), threadNum); } }
[6/6] flink git commit: [tests] Add process reaping test for TaskManager, improves process reaping test for JobManager.
[tests] Add process reaping test for TaskManager, improves process reaping test for JobManager. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1230bcaa Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1230bcaa Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1230bcaa Branch: refs/heads/master Commit: 1230bcaa0f531f6260f291dd066f64fe52cc6708 Parents: 70df028 Author: Stephan Ewen Authored: Tue Feb 24 12:23:59 2015 +0100 Committer: Stephan Ewen Committed: Tue Feb 24 12:23:59 2015 +0100 -- .../flink/runtime/jobmanager/JobManager.scala | 94 .../flink/runtime/taskmanager/TaskManager.scala | 68 +- .../JobManagerProcessReapingTest.java | 83 ++- .../TaskManagerProcessReapingTest.java | 236 +++ .../runtime/testutils/CommonTestUtils.java | 24 ++ 5 files changed, 447 insertions(+), 58 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/1230bcaa/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala -- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index a1642b4..2671f2d 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -94,17 +94,18 @@ class JobManager(val configuration: Configuration, val profiler: Option[ActorRef], val defaultExecutionRetries: Int, val delayBetweenRetries: Long, - implicit val timeout: FiniteDuration) + val timeout: FiniteDuration) extends Actor with ActorLogMessages with ActorLogging { - import context._ - - val LOG = JobManager.LOG - - // List of current jobs running - val currentJobs = scala.collection.mutable.HashMap[JobID, (ExecutionGraph, JobInfo)]() + /** Reference to the log, for debugging */ + protected val LOG = JobManager.LOG + /** List of current jobs running 
jobs */ + protected val currentJobs = scala.collection.mutable.HashMap[JobID, (ExecutionGraph, JobInfo)]() + /** + * Run when the job manager is started. Simply logs an informational message. + */ override def preStart(): Unit = { LOG.info(s"Starting JobManager at ${self.path}.") } @@ -138,6 +139,11 @@ class JobManager(val configuration: Configuration, } } + /** + * Central work method of the JobManager actor. Receives messages and reacts to them. + * + * @return + */ override def receiveWithLogMessages: Receive = { case RegisterTaskManager(connectionInfo, hardwareInformation, numberOfSlots) => @@ -182,7 +188,7 @@ class JobManager(val configuration: Configuration, // execute the cancellation asynchronously Future { executionGraph.cancel() - } + }(context.dispatcher) sender ! CancellationSuccess(jobID) case None => @@ -198,10 +204,12 @@ class JobManager(val configuration: Configuration, currentJobs.get(taskExecutionState.getJobID) match { case Some((executionGraph, _)) => val originalSender = sender + Future { val result = executionGraph.updateState(taskExecutionState) originalSender ! result -} +}(context.dispatcher) + sender ! 
true case None => log.error("Cannot find execution graph for ID {} to change state to {}.", taskExecutionState.getJobID, taskExecutionState.getExecutionState) @@ -603,6 +611,7 @@ object JobManager { EnvironmentInformation.logEnvironmentInfo(LOG, "JobManager") checkJavaVersion() +// parsing the command line arguments val (configuration: Configuration, executionMode: ExecutionMode, listeningHost: String, listeningPort: Int) = @@ -617,16 +626,17 @@ object JobManager { } } -// we may want to check that the JobManager hostname is in the config +// we want to check that the JobManager hostname is in the config // if it is not in there, the actor system will bind to the loopback interface's // address and will not be reachable from anyone remote if (listeningHost == null) { val message = "Config parameter '" + ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY + -"' is missing (hostname or address to bind JobManager to)." +"' is missing (hostname/address to bind JobManager to)." LOG.error(message) System.exit(STARTUP_FAILURE_RETURN_CODE) } +// run the job manager tr
[1/6] flink git commit: [runtime] Improve error handling when submitting a job to the JobManager
Repository: flink Updated Branches: refs/heads/master c11144470 -> 1230bcaa0 [runtime] Improve error handling when submitting a job to the JobManager Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9ddb5653 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9ddb5653 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9ddb5653 Branch: refs/heads/master Commit: 9ddb565314a4efb744167eccbe6de0d3299ac4d0 Parents: 9528a52 Author: Stephan Ewen Authored: Mon Feb 23 18:36:07 2015 +0100 Committer: Stephan Ewen Committed: Tue Feb 24 00:08:27 2015 +0100 -- .../org/apache/flink/client/program/Client.java | 15 +- .../org/apache/flink/util/ExceptionUtils.java | 26 ++- .../apache/flink/runtime/blob/BlobCache.java| 6 +- .../runtime/client/JobExecutionException.java | 5 +- .../librarycache/BlobLibraryCacheManager.java | 62 -- .../flink/runtime/executiongraph/Execution.java | 2 +- .../apache/flink/runtime/jobgraph/JobGraph.java | 16 ++ .../apache/flink/runtime/client/JobClient.scala | 4 +- .../flink/runtime/jobmanager/JobInfo.scala | 1 - .../flink/runtime/jobmanager/JobManager.scala | 207 ++- .../runtime/messages/JobManagerMessages.scala | 27 +-- .../jobmanager/JobManagerStartupTest.java | 1 - .../flink/runtime/jobmanager/JobSubmitTest.java | 157 ++ .../runtime/jobmanager/JobManagerITCase.scala | 18 +- 14 files changed, 392 insertions(+), 155 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/9ddb5653/flink-clients/src/main/java/org/apache/flink/client/program/Client.java -- diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java index bd364ac..5a032a0 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java @@ -329,8 +329,9 @@ public class Client { try { 
JobClient.uploadJarFiles(jobGraph, hostname, client, timeout); - } catch (IOException e) { - throw new ProgramInvocationException("Could not upload the programs JAR files to the JobManager.", e); + } + catch (IOException e) { + throw new ProgramInvocationException("Could not upload the program's JAR files to the JobManager.", e); } try{ @@ -340,10 +341,12 @@ public class Client { else { JobClient.submitJobDetached(jobGraph, client, timeout); } - } catch (JobExecutionException e) { - throw new ProgramInvocationException("The program execution failed.", e); - } catch (Exception e) { - throw new ProgramInvocationException("Unexpected exception while program execution.", e); + } + catch (JobExecutionException e) { + throw new ProgramInvocationException("The program execution failed: " + e.getMessage(), e); + } + catch (Exception e) { + throw new ProgramInvocationException("Exception during program execution.", e); } finally { actorSystem.shutdown(); http://git-wip-us.apache.org/repos/asf/flink/blob/9ddb5653/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java -- diff --git a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java index 99a098e..9784844 100644 --- a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java +++ b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java @@ -24,6 +24,7 @@ package org.apache.flink.util; +import java.io.IOException; import java.io.PrintWriter; import java.io.StringWriter; @@ -57,7 +58,7 @@ public class ExceptionUtils { /** * Throws the given {@code Throwable} in scenarios where the signatures do not allow you to -* throw arbitrary Throwables. Errors and RuntimeExceptions are thrown directly, other exceptions +* throw an arbitrary Throwable. Errors and RuntimeExceptions are thrown directly, other exceptions * are packed into runtime exceptions * * @param t The throwable to be thrown. 
@@ -76,8 +77,8 @@ public class ExceptionUtils { /** * Throws the given {@code Throwable} in scen
[4/6] flink git commit: [jobmanager] Add a process reaper to kill the JobManager process when the main actor dies.
[jobmanager] Add a process reaper to kill the JobManager process when the main actor dies. Also adds various tests for failure behavior during job submission. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5725c721 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5725c721 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5725c721 Branch: refs/heads/master Commit: 5725c72129fe9a75487204b470a30e850ad4091c Parents: 9ddb565 Author: Stephan Ewen Authored: Mon Feb 23 23:35:10 2015 +0100 Committer: Stephan Ewen Committed: Tue Feb 24 10:14:30 2015 +0100 -- .../flink/runtime/process/ProcessReaper.java| 60 ++ .../flink/runtime/jobmanager/JobManager.scala | 65 -- .../JobManagerProcessReapingTest.java | 150 + .../jobmanager/JobManagerStartupTest.java | 23 +- .../runtime/jobmanager/JobManagerTest.java | 48 + .../flink/runtime/jobmanager/JobSubmitTest.java | 12 +- .../runtime/testutils/CommonTestUtils.java | 127 ++- .../testutils/InterruptibleByteChannel.java | 210 --- .../runtime/testutils/ServerTestUtils.java | 181 .../JobSubmissionFailsITCase.java | 11 +- .../apache/flink/yarn/ApplicationMaster.scala | 2 +- 11 files changed, 408 insertions(+), 481 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/5725c721/flink-runtime/src/main/java/org/apache/flink/runtime/process/ProcessReaper.java -- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/process/ProcessReaper.java b/flink-runtime/src/main/java/org/apache/flink/runtime/process/ProcessReaper.java new file mode 100644 index 000..b12b82d --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/process/ProcessReaper.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.process; + +import akka.actor.ActorRef; +import akka.actor.Terminated; +import akka.actor.UntypedActor; +import org.slf4j.Logger; + +/** + * Utility actors that monitors other actors and kills the JVM upon + * actor termination. + */ +public class ProcessReaper extends UntypedActor { + + private final Logger log; + private final int exitCode; + + public ProcessReaper(ActorRef watchTarget, Logger log, int exitCode) { + if (watchTarget == null || watchTarget.equals(ActorRef.noSender())) { + throw new IllegalArgumentException("Target may not be null or 'noSender'"); + } + this.log = log; + this.exitCode = exitCode; + + getContext().watch(watchTarget); + } + + @Override + public void onReceive(Object message) { + if (message instanceof Terminated) { + try { + Terminated term = (Terminated) message; + String name = term.actor().path().name(); + if (log != null) { + log.error("Actor " + name + " terminated, stopping process..."); + } + } + finally { + System.exit(exitCode); + } + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/5725c721/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala -- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index ce3bc74..a1642b4 100644 --- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -
[5/6] flink git commit: [tests] Speed up DataSinkTaskTest
[tests] Speed up DataSinkTaskTest Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/70df0282 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/70df0282 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/70df0282 Branch: refs/heads/master Commit: 70df02824d8738f009c50ba15b7365f943e4ecca Parents: 5725c72 Author: Stephan Ewen Authored: Mon Feb 23 23:36:32 2015 +0100 Committer: Stephan Ewen Committed: Tue Feb 24 10:14:30 2015 +0100 -- .../runtime/operators/DataSinkTaskTest.java | 67 +--- 1 file changed, 30 insertions(+), 37 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/70df0282/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java -- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java index 1a4b7f1..84fc851 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java @@ -72,61 +72,55 @@ public class DataSinkTaskTest extends TaskTestBase @Test public void testDataSinkTask() { + FileReader fr = null; + BufferedReader br = null; + try { + int keyCnt = 100; + int valCnt = 20; - int keyCnt = 100; - int valCnt = 20; + super.initEnvironment(MEMORY_MANAGER_SIZE, NETWORK_BUFFER_SIZE); + super.addInput(new UniformRecordGenerator(keyCnt, valCnt, false), 0); - super.initEnvironment(MEMORY_MANAGER_SIZE, NETWORK_BUFFER_SIZE); - super.addInput(new UniformRecordGenerator(keyCnt, valCnt, false), 0); + DataSinkTask testTask = new DataSinkTask(); - DataSinkTask testTask = new DataSinkTask(); + super.registerFileOutputTask(testTask, MockOutputFormat.class, new File(tempTestPath).toURI().toString()); - super.registerFileOutputTask(testTask, MockOutputFormat.class, new 
File(tempTestPath).toURI().toString()); - - try { testTask.invoke(); - } catch (Exception e) { - LOG.debug("Exception while invoking the test task.", e); - Assert.fail("Invoke method caused exception."); - } - File tempTestFile = new File(this.tempTestPath); + File tempTestFile = new File(this.tempTestPath); - Assert.assertTrue("Temp output file does not exist",tempTestFile.exists()); + Assert.assertTrue("Temp output file does not exist", tempTestFile.exists()); - FileReader fr = null; - BufferedReader br = null; - try { fr = new FileReader(tempTestFile); br = new BufferedReader(fr); - HashMap> keyValueCountMap = new HashMap>(keyCnt); + HashMap> keyValueCountMap = new HashMap>(keyCnt); - while(br.ready()) { + while (br.ready()) { String line = br.readLine(); - Integer key = Integer.parseInt(line.substring(0,line.indexOf("_"))); - Integer val = Integer.parseInt(line.substring(line.indexOf("_")+1,line.length())); + Integer key = Integer.parseInt(line.substring(0, line.indexOf("_"))); + Integer val = Integer.parseInt(line.substring(line.indexOf("_") + 1, line.length())); - if(!keyValueCountMap.containsKey(key)) { - keyValueCountMap.put(key,new HashSet()); + if (!keyValueCountMap.containsKey(key)) { + keyValueCountMap.put(key, new HashSet()); } keyValueCountMap.get(key).add(val); } - Assert.assertTrue("Invalid key count in out file. Expected: "+keyCnt+" Actual: "+keyValueCountMap.keySet().size(), - keyValueCountMap.keySet().size() == keyCnt); + Assert.assertTrue("Invalid key count in out file. Expected: " + keyCnt + " Actual: " + keyValueCountMap.keySet().size(), + keyValueCountMap.keySet().size() == keyCnt)
[3/6] flink git commit: [FLINK-1598] [runtime] Better error message when network serialization of records exceeds java heap space.
[FLINK-1598] [runtime] Better error message when network serialization of records exceeds java heap space. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9528a521 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9528a521 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9528a521 Branch: refs/heads/master Commit: 9528a521f56e0c6b0c70d43e62ad84b19c048c36 Parents: 98bc7b9 Author: Stephan Ewen Authored: Mon Feb 23 13:45:51 2015 +0100 Committer: Stephan Ewen Committed: Tue Feb 24 00:08:27 2015 +0100 -- .../runtime/util/DataOutputSerializer.java | 31 1 file changed, 25 insertions(+), 6 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/9528a521/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataOutputSerializer.java -- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataOutputSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataOutputSerializer.java index 2d06e29..7f8105d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataOutputSerializer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataOutputSerializer.java @@ -268,16 +268,35 @@ public class DataOutputSerializer implements DataOutputView { private void resize(int minCapacityAdd) throws IOException { + int newLen = Math.max(this.buffer.length * 2, this.buffer.length + minCapacityAdd); + byte[] nb; try { - final int newLen = Math.max(this.buffer.length * 2, this.buffer.length + minCapacityAdd); - final byte[] nb = new byte[newLen]; - System.arraycopy(this.buffer, 0, nb, 0, this.position); - this.buffer = nb; - this.wrapper = ByteBuffer.wrap(this.buffer); + nb = new byte[newLen]; } - catch (NegativeArraySizeException nasex) { + catch (NegativeArraySizeException e) { throw new IOException("Serialization failed because the record length would exceed 2GB (max addressable array size in Java)."); } + catch 
(OutOfMemoryError e) { + // this was too large to allocate, try the smaller size (if possible) + if (newLen > this.buffer.length + minCapacityAdd) { + newLen = this.buffer.length + minCapacityAdd; + try { + nb = new byte[newLen]; + } + catch (OutOfMemoryError ee) { + // still not possible. give an informative exception message that reports the size + throw new IOException("Failed to serialize element. Serialized size (> " + + newLen + " bytes) exceeds JVM heap space", ee); + } + } else { + throw new IOException("Failed to serialize element. Serialized size (> " + + newLen + " bytes) exceeds JVM heap space", e); + } + } + + System.arraycopy(this.buffer, 0, nb, 0, this.position); + this.buffer = nb; + this.wrapper = ByteBuffer.wrap(this.buffer); } @SuppressWarnings("restriction")
svn commit: r1661872 [1/2] - in /flink: _posts/2015-02-06-streaming-example.md site/blog/feed.xml site/blog/index.html site/blog/page2/index.html site/blog/page3/index.html
Author: rmetzger Date: Tue Feb 24 09:51:10 2015 New Revision: 1661872 URL: http://svn.apache.org/r1661872 Log: Remove duplicate streaming blogpost markdown file Removed: flink/_posts/2015-02-06-streaming-example.md Modified: flink/site/blog/feed.xml flink/site/blog/index.html flink/site/blog/page2/index.html flink/site/blog/page3/index.html Modified: flink/site/blog/feed.xml URL: http://svn.apache.org/viewvc/flink/site/blog/feed.xml?rev=1661872&r1=1661871&r2=1661872&view=diff == Binary files - no diff available. Modified: flink/site/blog/index.html URL: http://svn.apache.org/viewvc/flink/site/blog/index.html?rev=1661872&r1=1661871&r2=1661872&view=diff == --- flink/site/blog/index.html (original) +++ flink/site/blog/index.html Tue Feb 24 09:51:10 2015 @@ -803,696 +803,6 @@ internally, fault tolerance, and perform - Introducing Flink Streaming - 06 Feb 2015 - - This post is the first of a series of blog posts on Flink Streaming, -the recent addition to Apache Flink that makes it possible to analyze -continuous data sources in addition to static files. Flink Streaming -uses the pipelined Flink engine to process data streams in real time, -and offers a new API including definition of flexible windows. - -In this post, we go through an example that uses the Flink Streaming -API to compute statistics on stock market data that arrive -continuously, and combine the stock market data with Twitter streams. -See the http://flink.apache.org/docs/latest/streaming_guide.html";>Streaming Programming -Guide for a -detailed presentation of the Streaming API. - -First, we read a bunch of stock price streams and combine them into -one stream of market data. We apply several transformations on this -market data stream, like rolling aggregations per stock. Then we emit -price warning alerts when the prices are rapidly changing. Moving -towards more advanced features, we compute rolling correlations -between the market data streams and a Twitter stream with stock mentions. 
- - - -Back to top - -Reading from multiple inputs - -First, let us create the stream of stock prices: - - -Read a socket stream of stock prices -Parse the text in the stream to create a stream of StockPrice objects -Add four other sources tagged with the stock symbol. -Finally, merge the streams to create a unified stream. - - - - - - - -def main(args: Array[String]) { - - val env = StreamExecutionEnvironment.getExecutionEnvironment - - //Read from a socket stream at map it to StockPrice objects - val socketStockStream = env.socketTextStream("localhost", ).map(x => { -val split = x.split(",") -StockPrice(split(0), split(1).toDouble) - }) - - //Generate other stock streams - val SPX_Stream = env.addSource(generateStock("SPX")(10) _) - val FTSE_Stream = env.addSource(generateStock("FTSE")(20) _) - val DJI_Stream = env.addSource(generateStock("DJI")(30) _) - val BUX_Stream = env.addSource(generateStock("BUX")(40) _) - - //Merge all stock streams together - val stockStream = socketStockStream.merge(SPX_Stream, FTSE_Stream, -DJI_Stream, BUX_Stream) - - stockStream.print() - - env.execute("Stock stream") -} - - - - - -public static void main(String[] args) throws Exception { - -final StreamExecutionEnvironment env = -StreamExecutionEnvironment.getExecutionEnvironment(); - -//Read from a socket stream at map it to StockPrice objects -DataStreamsocketStockStream = env -.socketTextStream("localhost", ) -.map(new MapFunction () { -private String[] tokens; - -@Override -public StockPrice map(String value) throws Exception { -tokens = value.split(","); -return new StockPrice(tokens[0], -Double.parseDouble(tokens[1])); -} -}); - -//Generate other stock streams -DataStream SPX_stream = env.addSource(new StockSource("SPX", 10)); -DataStream FTSE_stream = env.addSource(new StockSource("FTSE", 20)); -DataStream DJI_stream = env.addSource(new StockSource("DJI", 30)); -DataStream BUX_stream = env.addSource(new StockSource("BUX", 40)); - -//Merge all stock streams together 
-DataStream stockStream = socketStockStream -.merge(SPX_stream, FTSE_stream, DJI_stream, BUX_stream); - -stockStream.print(); - -env.execute("Stock stream"); - - - - - -See -http://flink.apache.org/docs/latest/streaming_guide.html#sources";>here -on how you can create streaming sources for Flink Streaming
svn commit: r1661872 [2/2] - in /flink: _posts/2015-02-06-streaming-example.md site/blog/feed.xml site/blog/index.html site/blog/page2/index.html site/blog/page3/index.html
Modified: flink/site/blog/page2/index.html URL: http://svn.apache.org/viewvc/flink/site/blog/page2/index.html?rev=1661872&r1=1661871&r2=1661872&view=diff == --- flink/site/blog/page2/index.html (original) +++ flink/site/blog/page2/index.html Tue Feb 24 09:51:10 2015 @@ -140,96 +140,6 @@ - Stratosphere version 0.5 available - 31 May 2014 - - We are happy to announce a new major Stratosphere release, version 0.5. This release adds many new features and improves the interoperability, stability, and performance of the system. The major theme of the release is the completely new Java API that makes it easy to write powerful distributed programs. - -The release can be downloaded from the http://stratosphere.eu/downloads/";>Stratosphere website and from https://github.com/stratosphere/stratosphere/releases/tag/release-0.5";>GitHub. All components are available as Apache Maven dependencies, making it simple to include Stratosphere in other projects. The website provides http://stratosphere.eu/docs/0.5/";>extensive documentation of the system and the new features. - -Shortlist of new Features - -Below is a short list of the most important additions to the Stratosphere system. - -New Java API - -This release introduces a completely new data set-centric Java API. This programming model significantly eases the development of Stratosphere programs, supports flexible use of regular Java classes as data types, and adds many new built-in operators to simplify the writing of powerful programs. The result are programs that need less code, are more readable, interoperate better with existing code, and execute faster. - -Take a look at the http://stratosphere.eu/docs/0.5/programming_guides/examples_java.html";>examples to get a feel for the API. - -General API Improvements - -Broadcast Variables: Publish a data set to all instances of another operator. This is handy if the your operator depends on the result of a computation, e.g., filter all values smaller than the average. 
- -Distributed Cache: Make (local and HDFS) files locally available on each machine processing a task. - -Iteration Termination Improvements Iterative algorithms can now terminate based on intermediate data sets, not only through aggregated statistics. - -Collection data sources and sinks: Speed-up the development and testing of Stratosphere programs by reading data from regular Java collections and inserting back into them. - -JDBC data sources and sinks: Read data from and write data to relational databases using a JDBC driver. - -Hadoop input format and output format support: Read and write data with any Hadoop input or output format. - -Support for Avro encoded data: Read data that has been materialized using Avro. - -Deflate Files: Stratosphere now transparently reads .deflate compressed files. - -Runtime and Optimizer Improvements - -DAG Runtime Streaming: Detection and resolution of streaming data flow deadlocks in the data flow optimizer. - -Intermediate results across iteration boundaries: Intermediate results computed outside iterative parts can be used inside iterative parts of the program. - -Stability fixes: Various stability fixes in both optimizer and runtime. - -Setup & Tooling - -Improved YARN support: Many improvements based on user-feedback: Packaging, Permissions, Error handling. - -Java 8 compatibility - -Contributors - -In total, 26 people have contributed to Stratosphere since the last release. Thank you for making this project possible! 
- - -Alexander Alexandrov -Jesus Camacho -Ufuk Celebi -Mikhail Erofeev -Stephan Ewen -Alexandr Ferodov -Filip Haase -Jonathan Hasenberg -Markus Holzemer -Fabian Hueske -Vasia Kalavri -Aljoscha Krettek -Rajika Kumarasiri -Sebastian Kunert -Aaron Lam -Robert Metzger -Faisal Moeen -Martin Neumann -Mingliang Qi -Till Rohrmann -Chesnay Schepler -Vyachislav Soludev -Tuan Trieu -Artem Tsikiridis -Timo Walther -Robert Waury - - -Stratosphere is going Apache - -The Stratosphere project has been accepted to the Apache Incubator and will continue its work under the umbrella of the Apache Software Foundation. Due to a name conflict, we are switching the name of the project. We will make future releases of Stratosphere through the Apache foundation under a new name. - - Stratosphere version 0.5 available - - - Stratosphere accepted as Apache Incubator Project 16 Apr 2014 @@ -775,6 +685,54 @@ For a complete overview of the renamings Stratosphere wins award at Humboldt Innovation Competition "Big Data: Research meets Startups"
svn commit: r1661871 [3/3] - in /flink: q/ site/blog/ site/blog/page2/ site/blog/page3/ site/q/
Modified: flink/site/blog/page2/index.html URL: http://svn.apache.org/viewvc/flink/site/blog/page2/index.html?rev=1661871&r1=1661870&r2=1661871&view=diff == --- flink/site/blog/page2/index.html (original) +++ flink/site/blog/page2/index.html Tue Feb 24 09:47:45 2015 @@ -140,6 +140,96 @@ + Stratosphere version 0.5 available + 31 May 2014 + + We are happy to announce a new major Stratosphere release, version 0.5. This release adds many new features and improves the interoperability, stability, and performance of the system. The major theme of the release is the completely new Java API that makes it easy to write powerful distributed programs. + +The release can be downloaded from the http://stratosphere.eu/downloads/";>Stratosphere website and from https://github.com/stratosphere/stratosphere/releases/tag/release-0.5";>GitHub. All components are available as Apache Maven dependencies, making it simple to include Stratosphere in other projects. The website provides http://stratosphere.eu/docs/0.5/";>extensive documentation of the system and the new features. + +Shortlist of new Features + +Below is a short list of the most important additions to the Stratosphere system. + +New Java API + +This release introduces a completely new data set-centric Java API. This programming model significantly eases the development of Stratosphere programs, supports flexible use of regular Java classes as data types, and adds many new built-in operators to simplify the writing of powerful programs. The result are programs that need less code, are more readable, interoperate better with existing code, and execute faster. + +Take a look at the http://stratosphere.eu/docs/0.5/programming_guides/examples_java.html";>examples to get a feel for the API. + +General API Improvements + +Broadcast Variables: Publish a data set to all instances of another operator. This is handy if the your operator depends on the result of a computation, e.g., filter all values smaller than the average. 
+ +Distributed Cache: Make (local and HDFS) files locally available on each machine processing a task. + +Iteration Termination Improvements Iterative algorithms can now terminate based on intermediate data sets, not only through aggregated statistics. + +Collection data sources and sinks: Speed-up the development and testing of Stratosphere programs by reading data from regular Java collections and inserting back into them. + +JDBC data sources and sinks: Read data from and write data to relational databases using a JDBC driver. + +Hadoop input format and output format support: Read and write data with any Hadoop input or output format. + +Support for Avro encoded data: Read data that has been materialized using Avro. + +Deflate Files: Stratosphere now transparently reads .deflate compressed files. + +Runtime and Optimizer Improvements + +DAG Runtime Streaming: Detection and resolution of streaming data flow deadlocks in the data flow optimizer. + +Intermediate results across iteration boundaries: Intermediate results computed outside iterative parts can be used inside iterative parts of the program. + +Stability fixes: Various stability fixes in both optimizer and runtime. + +Setup & Tooling + +Improved YARN support: Many improvements based on user-feedback: Packaging, Permissions, Error handling. + +Java 8 compatibility + +Contributors + +In total, 26 people have contributed to Stratosphere since the last release. Thank you for making this project possible! 
+ + +Alexander Alexandrov +Jesus Camacho +Ufuk Celebi +Mikhail Erofeev +Stephan Ewen +Alexandr Ferodov +Filip Haase +Jonathan Hasenberg +Markus Holzemer +Fabian Hueske +Vasia Kalavri +Aljoscha Krettek +Rajika Kumarasiri +Sebastian Kunert +Aaron Lam +Robert Metzger +Faisal Moeen +Martin Neumann +Mingliang Qi +Till Rohrmann +Chesnay Schepler +Vyachislav Soludev +Tuan Trieu +Artem Tsikiridis +Timo Walther +Robert Waury + + +Stratosphere is going Apache + +The Stratosphere project has been accepted to the Apache Incubator and will continue its work under the umbrella of the Apache Software Foundation. Due to a name conflict, we are switching the name of the project. We will make future releases of Stratosphere through the Apache foundation under a new name. + + Stratosphere version 0.5 available + + + Stratosphere accepted as Apache Incubator Project 16 Apr 2014 @@ -685,54 +775,6 @@ For a complete overview of the renamings Stratosphere wins award at Humboldt Innovation Competition "Big Data: Research meets Startups"
svn commit: r1661871 [2/3] - in /flink: q/ site/blog/ site/blog/page2/ site/blog/page3/ site/q/
Modified: flink/site/blog/index.html URL: http://svn.apache.org/viewvc/flink/site/blog/index.html?rev=1661871&r1=1661870&r2=1661871&view=diff == --- flink/site/blog/index.html (original) +++ flink/site/blog/index.html Tue Feb 24 09:47:45 2015 @@ -803,6 +803,696 @@ internally, fault tolerance, and perform + Introducing Flink Streaming + 06 Feb 2015 + + This post is the first of a series of blog posts on Flink Streaming, +the recent addition to Apache Flink that makes it possible to analyze +continuous data sources in addition to static files. Flink Streaming +uses the pipelined Flink engine to process data streams in real time, +and offers a new API including definition of flexible windows. + +In this post, we go through an example that uses the Flink Streaming +API to compute statistics on stock market data that arrive +continuously, and combine the stock market data with Twitter streams. +See the http://flink.apache.org/docs/latest/streaming_guide.html";>Streaming Programming +Guide for a +detailed presentation of the Streaming API. + +First, we read a bunch of stock price streams and combine them into +one stream of market data. We apply several transformations on this +market data stream, like rolling aggregations per stock. Then we emit +price warning alerts when the prices are rapidly changing. Moving +towards more advanced features, we compute rolling correlations +between the market data streams and a Twitter stream with stock mentions. + + + +Back to top + +Reading from multiple inputs + +First, let us create the stream of stock prices: + + +Read a socket stream of stock prices +Parse the text in the stream to create a stream of StockPrice objects +Add four other sources tagged with the stock symbol. +Finally, merge the streams to create a unified stream. 
+ + + + + + + +def main(args: Array[String]) { + + val env = StreamExecutionEnvironment.getExecutionEnvironment + + //Read from a socket stream at map it to StockPrice objects + val socketStockStream = env.socketTextStream("localhost", ).map(x => { +val split = x.split(",") +StockPrice(split(0), split(1).toDouble) + }) + + //Generate other stock streams + val SPX_Stream = env.addSource(generateStock("SPX")(10) _) + val FTSE_Stream = env.addSource(generateStock("FTSE")(20) _) + val DJI_Stream = env.addSource(generateStock("DJI")(30) _) + val BUX_Stream = env.addSource(generateStock("BUX")(40) _) + + //Merge all stock streams together + val stockStream = socketStockStream.merge(SPX_Stream, FTSE_Stream, +DJI_Stream, BUX_Stream) + + stockStream.print() + + env.execute("Stock stream") +} + + + + + +public static void main(String[] args) throws Exception { + +final StreamExecutionEnvironment env = +StreamExecutionEnvironment.getExecutionEnvironment(); + +//Read from a socket stream at map it to StockPrice objects +DataStreamsocketStockStream = env +.socketTextStream("localhost", ) +.map(new MapFunction () { +private String[] tokens; + +@Override +public StockPrice map(String value) throws Exception { +tokens = value.split(","); +return new StockPrice(tokens[0], +Double.parseDouble(tokens[1])); +} +}); + +//Generate other stock streams +DataStream SPX_stream = env.addSource(new StockSource("SPX", 10)); +DataStream FTSE_stream = env.addSource(new StockSource("FTSE", 20)); +DataStream DJI_stream = env.addSource(new StockSource("DJI", 30)); +DataStream BUX_stream = env.addSource(new StockSource("BUX", 40)); + +//Merge all stock streams together +DataStream stockStream = socketStockStream +.merge(SPX_stream, FTSE_stream, DJI_stream, BUX_stream); + +stockStream.print(); + +env.execute("Stock stream"); + + + + + +See +http://flink.apache.org/docs/latest/streaming_guide.html#sources";>here +on how you can create streaming sources for Flink Streaming +programs. 
Flink, of course, has support for reading in streams from +http://flink.apache.org/docs/latest/streaming_guide.html#stream-connectors";>external +sources +such as Apache Kafka, Apache Flume, RabbitMQ, and others. For the sake +of this example, the data streams are simply generated using the +generateSource method: + + + + +val symbols = List("SPX", "FTSE", "DJI", "DJT", "BUX", "DAX", "GOOG") + +case class StockPrice(symbol: String, price: Double) + +def generateStock(symbol: String)(sigma: Int)(out: Collector[StockPrice]) = { + var price = 1000. + while (true) { +price = price + Random.nextGau
svn commit: r1661871 [1/3] - in /flink: q/ site/blog/ site/blog/page2/ site/blog/page3/ site/q/
Author: rmetzger Date: Tue Feb 24 09:47:45 2015 New Revision: 1661871 URL: http://svn.apache.org/r1661871 Log: [FLINK-1414] Add quickstarts to website Added: flink/q/ flink/q/quickstart-SNAPSHOT.sh (with props) flink/q/quickstart-scala-SNAPSHOT.sh (with props) flink/q/quickstart-scala.sh (with props) flink/q/quickstart.sh (with props) flink/site/q/ flink/site/q/quickstart-SNAPSHOT.sh (with props) flink/site/q/quickstart-scala-SNAPSHOT.sh (with props) flink/site/q/quickstart-scala.sh (with props) flink/site/q/quickstart.sh (with props) Modified: flink/site/blog/feed.xml flink/site/blog/index.html flink/site/blog/page2/index.html flink/site/blog/page3/index.html Added: flink/q/quickstart-SNAPSHOT.sh URL: http://svn.apache.org/viewvc/flink/q/quickstart-SNAPSHOT.sh?rev=1661871&view=auto == --- flink/q/quickstart-SNAPSHOT.sh (added) +++ flink/q/quickstart-SNAPSHOT.sh Tue Feb 24 09:47:45 2015 @@ -0,0 +1,48 @@ +#!/usr/bin/env bash + + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ + + +PACKAGE=quickstart + +mvn archetype:generate \ + -DarchetypeGroupId=org.apache.flink \ + -DarchetypeArtifactId=flink-quickstart-java \ + -DarchetypeVersion=0.9-SNAPSHOT \ + -DgroupId=org.apache.flink \ + -DartifactId=$PACKAGE \ + -Dversion=0.1 \ + -Dpackage=org.myorg.quickstart \ + -DinteractiveMode=false \ + -DarchetypeCatalog=https://repository.apache.org/content/repositories/snapshots/ + +# +# Give some guidance +# +echo -e "\\n\\n" +echo -e "\\tA sample quickstart Flink Job has been created." +echo -e "\\tSwitch into the directory using" +echo -e "\\t\\t cd $PACKAGE" +echo -e "\\tImport the project there using your favorite IDE (Import it as a maven project)" +echo -e "\\tBuild a jar inside the directory using" +echo -e "\\t\\t mvn clean package" +echo -e "\\tYou will find the runnable jar in $PACKAGE/target" +echo -e "\\tConsult our website if you have any troubles: http://flink.apache.org/community.html#mailing-lists"; +echo -e "\\n\\n" + Propchange: flink/q/quickstart-SNAPSHOT.sh -- svn:executable = * Added: flink/q/quickstart-scala-SNAPSHOT.sh URL: http://svn.apache.org/viewvc/flink/q/quickstart-scala-SNAPSHOT.sh?rev=1661871&view=auto == --- flink/q/quickstart-scala-SNAPSHOT.sh (added) +++ flink/q/quickstart-scala-SNAPSHOT.sh Tue Feb 24 09:47:45 2015 @@ -0,0 +1,48 @@ +#!/usr/bin/env bash + + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + + +PACKAGE=quickstart + +mvn archetype:generate