[1/2] flink git commit: [hotfix] [rpc] Add RpcConnectionTest to validate that connection buildup fails fast when endpoint is unreachable.

2016-09-26 Thread sewen
Repository: flink
Updated Branches:
  refs/heads/flip-6 fdeda082f -> 3cda59339


[hotfix] [rpc] Add RpcConnectionTest to validate that connection buildup fails 
fast when endpoint is unreachable.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/629078ee
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/629078ee
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/629078ee

Branch: refs/heads/flip-6
Commit: 629078ee3f5fcecd5498a81abaf8c99f9e614b02
Parents: fdeda08
Author: Stephan Ewen 
Authored: Wed Sep 21 13:03:17 2016 +0200
Committer: Stephan Ewen 
Committed: Fri Sep 23 19:44:13 2016 +0200

--
 .../flink/runtime/rpc/AsyncCallsTest.java   |  4 +-
 .../flink/runtime/rpc/RpcCompletenessTest.java  | 14 ++--
 .../flink/runtime/rpc/RpcConnectionTest.java| 86 
 3 files changed, 96 insertions(+), 8 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/629078ee/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java
--
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java
index e8255d4..7affdb9 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java
@@ -43,9 +43,9 @@ public class AsyncCallsTest extends TestLogger {
//  shared test members
// 

 
-   private static ActorSystem actorSystem = 
AkkaUtils.createDefaultActorSystem();
+   private static final ActorSystem actorSystem = 
AkkaUtils.createDefaultActorSystem();
 
-   private static AkkaRpcService akkaRpcService =
+   private static final AkkaRpcService akkaRpcService =
new AkkaRpcService(actorSystem, 
Time.milliseconds(1L));
 
@AfterClass

http://git-wip-us.apache.org/repos/asf/flink/blob/629078ee/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java
--
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java
index ee3f784..53355e8 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java
@@ -30,6 +30,7 @@ import org.reflections.Reflections;
 import java.lang.annotation.Annotation;
 import java.lang.reflect.Method;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -69,7 +70,8 @@ public class RpcCompletenessTest extends TestLogger {
 
@SuppressWarnings("rawtypes")
private void checkCompleteness(Class 
rpcEndpoint, Class rpcGateway) {
-   Method[] gatewayMethods = 
getRpcMethodsFromGateway(rpcGateway).toArray(new Method[0]);
+   List rpcMethodsFromGateway = 
getRpcMethodsFromGateway(rpcGateway);
+   Method[] gatewayMethods = rpcMethodsFromGateway.toArray(new 
Method[rpcMethodsFromGateway.size()]);
Method[] serverMethods = rpcEndpoint.getMethods();
 
Map> rpcMethods = new HashMap<>();
@@ -360,13 +362,13 @@ public class RpcCompletenessTest extends TestLogger {
}
 
// Get all methods declared in current interface
-   for(Method method : interfaceClass.getDeclaredMethods()) {
-   allMethods.add(method);
-   }
+   Collections.addAll(allMethods, 
interfaceClass.getDeclaredMethods());
 
// Get all method inherited from super interface
-   for(Class superClass : interfaceClass.getInterfaces()) {
-   allMethods.addAll(getRpcMethodsFromGateway(superClass));
+   for (Class superClass : interfaceClass.getInterfaces()) {
+   @SuppressWarnings("unchecked")
+   Class gatewayClass = (Class) superClass;
+   
allMethods.addAll(getRpcMethodsFromGateway(gatewayClass));
}
return allMethods;
}

http://git-wip-us.apache.org/repos/asf/flink/blob/629078ee/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcConnectionTest.java
--
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcConnectionTest.java
 
b/flink-runtime/src/te

[2/2] flink git commit: [FLINK-4408] [JobManager] Introduce JobMasterRunner and implement job submission & setting up the ExecutionGraph

2016-09-26 Thread sewen
[FLINK-4408] [JobManager] Introduce JobMasterRunner and implement job 
submission & setting up the ExecutionGraph

This closes #2480


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3cda5933
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3cda5933
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3cda5933

Branch: refs/heads/flip-6
Commit: 3cda59339ed593b04be3a897bf04a01e3673db5b
Parents: 629078e
Author: Kurt Young 
Authored: Thu Sep 8 12:00:13 2016 +0800
Committer: Stephan Ewen 
Committed: Fri Sep 23 20:30:05 2016 +0200

--
 .../flink/api/common/JobExecutionResult.java|   2 +-
 .../flink/api/common/JobSubmissionResult.java   |   2 +-
 .../HighAvailabilityServices.java   |  12 +
 .../runtime/highavailability/NonHaServices.java |  16 +-
 .../runtime/jobmanager/OnCompletionActions.java |  31 ++
 .../runtime/jobmanager/scheduler/Scheduler.java |   9 +
 .../runtime/jobmaster/JobManagerRunner.java | 288 +++
 .../runtime/jobmaster/JobManagerServices.java   |  73 +++
 .../flink/runtime/jobmaster/JobMaster.java  | 485 ++-
 .../runtime/jobmaster/JobMasterGateway.java |  13 +
 .../jobmaster/MiniClusterJobDispatcher.java | 385 +++
 .../flink/runtime/rpc/FatalErrorHandler.java|  24 +
 .../runtime/taskexecutor/TaskExecutor.java  |  12 +
 .../TestingHighAvailabilityServices.java|  39 +-
 .../jobmaster/JobManagerRunnerMockTest.java | 254 ++
 .../flink/runtime/rpc/RpcConnectionTest.java|  17 +-
 16 files changed, 1533 insertions(+), 129 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/3cda5933/flink-core/src/main/java/org/apache/flink/api/common/JobExecutionResult.java
--
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/JobExecutionResult.java 
b/flink-core/src/main/java/org/apache/flink/api/common/JobExecutionResult.java
index cb4ecc5..7286cc5 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/JobExecutionResult.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/JobExecutionResult.java
@@ -32,7 +32,7 @@ import java.util.concurrent.TimeUnit;
 @Public
 public class JobExecutionResult extends JobSubmissionResult {
 
-   private long netRuntime;
+   private final long netRuntime;
 
private final Map accumulatorResults;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3cda5933/flink-core/src/main/java/org/apache/flink/api/common/JobSubmissionResult.java
--
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/JobSubmissionResult.java 
b/flink-core/src/main/java/org/apache/flink/api/common/JobSubmissionResult.java
index c5dc869..b0e7e24 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/JobSubmissionResult.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/JobSubmissionResult.java
@@ -26,7 +26,7 @@ import org.apache.flink.annotation.Public;
 @Public
 public class JobSubmissionResult {
 
-   private JobID jobID;
+   private final JobID jobID;
 
public JobSubmissionResult(JobID jobID) {
this.jobID = jobID;

http://git-wip-us.apache.org/repos/asf/flink/blob/3cda5933/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
--
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
index 7634176..d67e927 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
@@ -19,6 +19,8 @@
 package org.apache.flink.runtime.highavailability;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
 import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 
@@ -61,4 +63,14 @@ public interface HighAvailabilityServices {
 * @param jobID The identifier of the job running the election.
 */
LeaderElectionService getJobMasterLeaderElectionService(JobID jobID) 
throws Exception;
+
+   /**
+* Gets the checkpoint recovery factory for the job manager
+*/
+   CheckpointRecoveryFactory getCheckpointRecoveryFactory() throws 
Exception;
+
+   /**
+* Gets 

flink git commit: [FLINK-4672] [taskmanager] Do not decorate Actor Kill messages

2016-09-26 Thread sewen
Repository: flink
Updated Branches:
  refs/heads/release-1.1 62c666f57 -> caa0fbb21


[FLINK-4672] [taskmanager] Do not decorate Actor Kill messages


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/caa0fbb2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/caa0fbb2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/caa0fbb2

Branch: refs/heads/release-1.1
Commit: caa0fbb2157de56c9bdc4bbf8aedb73df90edede
Parents: 62c666f
Author: Stephan Ewen 
Authored: Fri Sep 23 18:42:47 2016 +0200
Committer: Stephan Ewen 
Committed: Mon Sep 26 11:25:01 2016 +0200

--
 .../flink/runtime/taskmanager/TaskManager.scala |  2 +-
 .../runtime/taskmanager/TaskManagerTest.java| 33 ++--
 2 files changed, 31 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/caa0fbb2/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
--
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index a7dd789..8e787bb 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -1333,7 +1333,7 @@ class TaskManager(
   "\n" +
   "A fatal error occurred, forcing the TaskManager to shut down: " + 
message, cause)
 
-self ! decorateMessage(Kill)
+self ! Kill
   }
 
   override def notifyLeaderAddress(leaderAddress: String, leaderSessionID: 
UUID): Unit = {

http://git-wip-us.apache.org/repos/asf/flink/blob/caa0fbb2/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
--
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
index ce88c09..1c50265 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.taskmanager;
 
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
+import akka.actor.Kill;
 import akka.actor.Props;
 import akka.japi.Creator;
 import akka.testkit.JavaTestKit;
@@ -55,6 +56,7 @@ import 
org.apache.flink.runtime.messages.StackTraceSampleMessages.ResponseStackT
 import 
org.apache.flink.runtime.messages.StackTraceSampleMessages.ResponseStackTraceSampleSuccess;
 import 
org.apache.flink.runtime.messages.StackTraceSampleMessages.TriggerStackTraceSample;
 import org.apache.flink.runtime.messages.TaskManagerMessages;
+import org.apache.flink.runtime.messages.TaskManagerMessages.FatalError;
 import org.apache.flink.runtime.messages.TaskMessages;
 import org.apache.flink.runtime.messages.TaskMessages.CancelTask;
 import org.apache.flink.runtime.messages.TaskMessages.PartitionState;
@@ -1369,6 +1371,28 @@ public class TaskManagerTest extends TestLogger {
}};
}
 
+   @Test
+   public void testTerminationOnFatalError() {
+   new JavaTestKit(system){{
+
+   final ActorGateway taskManager = 
TestingUtils.createTaskManager(
+   system,
+   system.deadLetters(), // no jobmanager
+   new Configuration(),
+   true,
+   false);
+
+   try {
+   watch(taskManager.actor());
+   taskManager.tell(new FatalError("test fatal 
error", new Exception("something super bad")));
+   expectTerminated(d, taskManager.actor());
+   }
+   finally {
+   taskManager.tell(Kill.getInstance());
+   }
+   }};
+   }
+   
// 

 
public static class SimpleJobManager extends FlinkUntypedActor {
@@ -1549,11 +1573,14 @@ public class TaskManagerTest extends TestLogger {
 
@Override
public void invoke() throws Exception {
-   Object o = new Object();
+   final Object o = new Object();
+   //noinspection 
SynchronizationOnLocalVariableOrMethodParameter
synchroni

flink git commit: [hotfix] Fix restart strategy class loading by using the non-lower-cased class name

2016-09-26 Thread trohrmann
Repository: flink
Updated Branches:
  refs/heads/master 4a8e94403 -> 28ff5a3c9


[hotfix] Fix restart strategy class loading by using the non-lower-cased class name


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/28ff5a3c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/28ff5a3c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/28ff5a3c

Branch: refs/heads/master
Commit: 28ff5a3ce02b84f4e4aa568347ba5e13c3d2
Parents: 4a8e944
Author: Till Rohrmann 
Authored: Mon Sep 26 12:00:19 2016 +0200
Committer: Till Rohrmann 
Committed: Mon Sep 26 12:00:19 2016 +0200

--
 .../runtime/executiongraph/restart/RestartStrategyFactory.java   | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/28ff5a3c/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyFactory.java
--
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyFactory.java
index ae92b3a..870bf63 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyFactory.java
@@ -79,9 +79,9 @@ public abstract class RestartStrategyFactory implements 
Serializable {
 * @throws Exception which indicates that the RestartStrategy could not 
be instantiated.
 */
public static RestartStrategyFactory 
createRestartStrategyFactory(Configuration configuration) throws Exception {
-   String restartStrategyName = 
configuration.getString(ConfigConstants.RESTART_STRATEGY, "none").toLowerCase();
+   String restartStrategyName = 
configuration.getString(ConfigConstants.RESTART_STRATEGY, "none");
 
-   switch (restartStrategyName) {
+   switch (restartStrategyName.toLowerCase()) {
case "none":
// support deprecated ConfigConstants values
final int numberExecutionRetries = 
configuration.getInteger(ConfigConstants.EXECUTION_RETRIES_KEY,



[2/2] flink git commit: [FLINK-4672] [taskmanager] Do not decorate Actor Kill messages

2016-09-26 Thread sewen
[FLINK-4672] [taskmanager] Do not decorate Actor Kill messages


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6f237cfe
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6f237cfe
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6f237cfe

Branch: refs/heads/master
Commit: 6f237cfe6f70b5b72fedd3dea6fbeb6c929631e8
Parents: 28ff5a3
Author: Stephan Ewen 
Authored: Fri Sep 23 18:42:47 2016 +0200
Committer: Stephan Ewen 
Committed: Mon Sep 26 14:11:05 2016 +0200

--
 .../flink/runtime/taskmanager/TaskManager.scala |  2 +-
 .../runtime/taskmanager/TaskManagerTest.java| 33 ++--
 2 files changed, 31 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/6f237cfe/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
--
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 9e2feb5..04f3168 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -1379,7 +1379,7 @@ class TaskManager(
   "\n" +
   "A fatal error occurred, forcing the TaskManager to shut down: " + 
message, cause)
 
-self ! decorateMessage(Kill)
+self ! Kill
   }
 
   override def notifyLeaderAddress(leaderAddress: String, leaderSessionID: 
UUID): Unit = {

http://git-wip-us.apache.org/repos/asf/flink/blob/6f237cfe/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
--
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
index 0e53673..0774fd5 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.taskmanager;
 
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
+import akka.actor.Kill;
 import akka.actor.Props;
 import akka.japi.Creator;
 import akka.testkit.JavaTestKit;
@@ -55,6 +56,7 @@ import 
org.apache.flink.runtime.messages.StackTraceSampleMessages.ResponseStackT
 import 
org.apache.flink.runtime.messages.StackTraceSampleMessages.ResponseStackTraceSampleSuccess;
 import 
org.apache.flink.runtime.messages.StackTraceSampleMessages.TriggerStackTraceSample;
 import org.apache.flink.runtime.messages.TaskManagerMessages;
+import org.apache.flink.runtime.messages.TaskManagerMessages.FatalError;
 import org.apache.flink.runtime.messages.TaskMessages;
 import org.apache.flink.runtime.messages.TaskMessages.CancelTask;
 import org.apache.flink.runtime.messages.TaskMessages.PartitionState;
@@ -1367,6 +1369,28 @@ public class TaskManagerTest extends TestLogger {
}};
}
 
+   @Test
+   public void testTerminationOnFatalError() {
+   new JavaTestKit(system){{
+
+   final ActorGateway taskManager = 
TestingUtils.createTaskManager(
+   system,
+   system.deadLetters(), // no jobmanager
+   new Configuration(),
+   true,
+   false);
+
+   try {
+   watch(taskManager.actor());
+   taskManager.tell(new FatalError("test fatal 
error", new Exception("something super bad")));
+   expectTerminated(d, taskManager.actor());
+   }
+   finally {
+   taskManager.tell(Kill.getInstance());
+   }
+   }};
+   }
+   
// 

 
public static class SimpleJobManager extends FlinkUntypedActor {
@@ -1547,11 +1571,14 @@ public class TaskManagerTest extends TestLogger {
 
@Override
public void invoke() throws Exception {
-   Object o = new Object();
+   final Object o = new Object();
+   //noinspection 
SynchronizationOnLocalVariableOrMethodParameter
synchronized (o) {
-   o.wait();
+   //noins

[1/2] flink git commit: [FLINK-4218] [checkpoints] Do not rely on FileSystem to determine state sizes

2016-09-26 Thread sewen
Repository: flink
Updated Branches:
  refs/heads/master 28ff5a3c9 -> 95e9004e3


[FLINK-4218] [checkpoints] Do not rely on FileSystem to determine state sizes

This prevents failures on eventually consistent S3, where the operations for
keys (=entries in the parent directory/bucket) are not guaranteed to be
immediately consistent (visible) after a blob was written.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/95e9004e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/95e9004e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/95e9004e

Branch: refs/heads/master
Commit: 95e9004e36fffae755eab7aa3d5d0d5e8bfb7113
Parents: 6f237cf
Author: Stephan Ewen 
Authored: Fri Sep 23 15:16:27 2016 +0200
Committer: Stephan Ewen 
Committed: Mon Sep 26 14:11:05 2016 +0200

--
 .../runtime/checkpoint/CompletedCheckpoint.java |  2 +-
 .../flink/runtime/checkpoint/TaskState.java |  2 +-
 .../savepoint/SavepointV1Serializer.java|  6 ++--
 .../flink/runtime/state/ChainedStateHandle.java |  2 +-
 .../runtime/state/KeyGroupsStateHandle.java |  2 +-
 .../state/RetrievableStreamStateHandle.java |  9 +++---
 .../apache/flink/runtime/state/StateObject.java |  6 ++--
 .../state/filesystem/FileStateHandle.java   | 32 
 .../filesystem/FsCheckpointStreamFactory.java   |  9 +-
 .../FileSystemStateStorageHelper.java   | 15 +++--
 ...ZooKeeperCompletedCheckpointStoreITCase.java |  2 +-
 .../stats/SimpleCheckpointStatsTrackerTest.java | 18 +++
 .../state/AbstractCloseableHandleTest.java  |  6 ++--
 .../tasks/InterruptSensitiveRestoreTest.java|  3 +-
 14 files changed, 58 insertions(+), 56 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/95e9004e/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
--
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
index e412006..7cb3916 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
@@ -108,7 +108,7 @@ public class CompletedCheckpoint implements StateObject {
}
 
@Override
-   public long getStateSize() throws Exception {
+   public long getStateSize() throws IOException {
long result = 0L;
 
for (TaskState taskState : taskStates.values()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/95e9004e/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java
--
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java
index 9025090..657dd60 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java
@@ -152,7 +152,7 @@ public class TaskState implements StateObject {
 
 
@Override
-   public long getStateSize() throws Exception {
+   public long getStateSize() throws IOException {
long result = 0L;
 
for (int i = 0; i < parallelism; i++) {

http://git-wip-us.apache.org/repos/asf/flink/blob/95e9004e/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
--
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
index 8e05b81..f07f44f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
@@ -197,6 +197,7 @@ class SavepointV1Serializer implements 
SavepointSerializer {
} else if (stateHandle instanceof FileStateHandle) {
dos.writeByte(FILE_STREAM_STATE_HANDLE);
FileStateHandle fileStateHandle = (FileStateHandle) 
stateHandle;
+   dos.writeLong(stateHandle.getStateSize());
dos.writeUTF(fileStateHandle.getFilePath().toString());
 
} else if (stateHandle instanceof ByteStreamStateHandle) {
@@ -218,12 +219,13 @@ class Savepoint

flink git commit: [FLINK-4580] [rpc] Report rpc invocation exceptions to the caller

2016-09-26 Thread trohrmann
Repository: flink
Updated Branches:
  refs/heads/flip-6 3cda59339 -> 2a61e74b9


[FLINK-4580] [rpc] Report rpc invocation exceptions to the caller

This closes #2526.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2a61e74b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2a61e74b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2a61e74b

Branch: refs/heads/flip-6
Commit: 2a61e74b9108835ffb5f8ab89d67cb7105801594
Parents: 3cda593
Author: Till Rohrmann 
Authored: Wed Sep 21 15:18:27 2016 +0200
Committer: Till Rohrmann 
Committed: Mon Sep 26 14:26:54 2016 +0200

--
 .../flink/runtime/rpc/akka/AkkaRpcActor.java| 53 
 .../runtime/rpc/akka/AkkaRpcActorTest.java  | 34 +
 2 files changed, 66 insertions(+), 21 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/2a61e74b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
--
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
index 59daa46..1b456a7 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
@@ -35,6 +35,7 @@ import org.apache.flink.runtime.rpc.akka.messages.Processing;
 import org.apache.flink.runtime.rpc.akka.messages.RpcInvocation;
 import org.apache.flink.runtime.rpc.akka.messages.RunAsync;
 
+import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -86,11 +87,11 @@ class AkkaRpcActor> extends Untyp
unstashAll();
getContext().become(new Procedure() {
@Override
-   public void apply(Object message) throws 
Exception {
-   if (message.equals(Processing.STOP)) {
+   public void apply(Object msg) throws Exception {
+   if (msg.equals(Processing.STOP)) {
getContext().unbecome();
} else {
-   handleMessage(message);
+   handleMessage(msg);
}
}
});
@@ -130,21 +131,36 @@ class AkkaRpcActor> extends Untyp
 * @param rpcInvocation Rpc invocation message
 */
private void handleRpcInvocation(RpcInvocation rpcInvocation) {
+   Method rpcMethod = null;
+
try {
String methodName = rpcInvocation.getMethodName();
Class[] parameterTypes = 
rpcInvocation.getParameterTypes();
 
-   Method rpcMethod = lookupRpcMethod(methodName, 
parameterTypes);
+   rpcMethod = lookupRpcMethod(methodName, parameterTypes);
+   } catch(ClassNotFoundException e) {
+   LOG.error("Could not load method arguments.", e);
+
+   RpcConnectionException rpcException = new 
RpcConnectionException("Could not load method arguments.", e);
+   getSender().tell(new Status.Failure(rpcException), 
getSelf());
+   } catch (IOException e) {
+   LOG.error("Could not deserialize rpc invocation 
message.", e);
+
+   RpcConnectionException rpcException = new 
RpcConnectionException("Could not deserialize rpc invocation message.", e);
+   getSender().tell(new Status.Failure(rpcException), 
getSelf());
+   } catch (final NoSuchMethodException e) {
+   LOG.error("Could not find rpc method for rpc 
invocation.", e);
+
+   RpcConnectionException rpcException = new 
RpcConnectionException("Could not find rpc method for rpc invocation.", e);
+   getSender().tell(new Status.Failure(rpcException), 
getSelf());
+   }
 
-   if (rpcMethod.getReturnType().equals(Void.TYPE)) {
-   // No return value to send back
-   try {
+   if (rpcMethod != null) {
+   try {
+   if 
(rpcMethod.getReturnType().equals(Void.TYPE)) {
+   // No return value to send back
rpcMethod.invoke(rpcEndpoint, 
rpcInvocation.g

flink git commit: [FLINK-4662] Bump Calcite version up to 1.9

2016-09-26 Thread twalthr
Repository: flink
Updated Branches:
  refs/heads/master 95e9004e3 -> 8fa313c39


[FLINK-4662] Bump Calcite version up to 1.9

This closes #2535.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8fa313c3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8fa313c3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8fa313c3

Branch: refs/heads/master
Commit: 8fa313c39fbad7bb96327477544d1ec15e8dc0f6
Parents: 95e9004
Author: Jark Wu 
Authored: Thu Sep 22 21:47:37 2016 +0800
Committer: twalthr 
Committed: Mon Sep 26 15:58:39 2016 +0200

--
 flink-libraries/flink-table/pom.xml  |  2 +-
 .../apache/flink/api/table/FlinkPlannerImpl.scala| 15 ---
 .../apache/flink/api/table/FlinkTypeFactory.scala|  8 
 .../flink/api/table/codegen/CodeGenerator.scala  |  4 ++--
 .../flink/api/table/expressions/arithmetic.scala |  5 +
 .../table/plan/nodes/dataset/DataSetConvention.scala |  7 ++-
 .../api/table/plan/nodes/dataset/DataSetRel.scala|  4 ++--
 .../plan/nodes/datastream/DataStreamConvention.scala |  7 ++-
 8 files changed, 34 insertions(+), 18 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/8fa313c3/flink-libraries/flink-table/pom.xml
--
diff --git a/flink-libraries/flink-table/pom.xml 
b/flink-libraries/flink-table/pom.xml
index e83a778..4c91f1c 100644
--- a/flink-libraries/flink-table/pom.xml
+++ b/flink-libraries/flink-table/pom.xml
@@ -51,7 +51,7 @@ under the License.

org.apache.calcite
calcite-core
-   1.7.0
+   1.9.0



org.apache.calcite.avatica

http://git-wip-us.apache.org/repos/asf/flink/blob/8fa313c3/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkPlannerImpl.scala
--
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkPlannerImpl.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkPlannerImpl.scala
index bb08654..97e5cf2 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkPlannerImpl.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkPlannerImpl.scala
@@ -98,10 +98,10 @@ class FlinkPlannerImpl(
   assert(validatedSqlNode != null)
   val rexBuilder: RexBuilder = createRexBuilder
   val cluster: RelOptCluster = RelOptCluster.create(planner, rexBuilder)
+  val config = SqlToRelConverter.configBuilder()
+.withTrimUnusedFields(false).withConvertTableAccess(false).build()
   val sqlToRelConverter: SqlToRelConverter = new SqlToRelConverter(
-new ViewExpanderImpl, validator, createCatalogReader, cluster, 
convertletTable)
-  sqlToRelConverter.setTrimUnusedFields(false)
-  sqlToRelConverter.enableTableAccessConversion(false)
+new ViewExpanderImpl, validator, createCatalogReader, cluster, 
convertletTable, config)
   root = sqlToRelConverter.convertQuery(validatedSqlNode, false, true)
   root = root.withRel(sqlToRelConverter.flattenTypes(root.rel, true))
   root = root.withRel(RelDecorrelator.decorrelateQuery(root.rel))
@@ -118,7 +118,8 @@ class FlinkPlannerImpl(
 override def expandView(
 rowType: RelDataType,
 queryString: String,
-schemaPath: util.List[String]): RelRoot = {
+schemaPath: util.List[String],
+viewPath: util.List[String]): RelRoot = {
 
   val parser: SqlParser = SqlParser.create(queryString, parserConfig)
   var sqlNode: SqlNode = null
@@ -136,10 +137,10 @@ class FlinkPlannerImpl(
   val validatedSqlNode: SqlNode = validator.validate(sqlNode)
   val rexBuilder: RexBuilder = createRexBuilder
   val cluster: RelOptCluster = RelOptCluster.create(planner, rexBuilder)
+  val config: SqlToRelConverter.Config = SqlToRelConverter.configBuilder
+.withTrimUnusedFields(false).withConvertTableAccess(false).build
   val sqlToRelConverter: SqlToRelConverter = new SqlToRelConverter(
-new ViewExpanderImpl, validator, catalogReader, cluster, 
convertletTable)
-  sqlToRelConverter.setTrimUnusedFields(false)
-  sqlToRelConverter.enableTableAccessConversion(false)
+new ViewExpanderImpl, validator, catalogReader, cluster, 
convertletTable, config)
   root = sqlToRelConverter.convertQuery(validatedSqlNode, true, false)
   root = root.withRel(sqlToRelConverter.flattenTypes(root.rel, true))
   root = root.withRel(RelDecorrelator.decorrelateQuery(root.rel))

flink git commit: [FLINK-4684] [checkpoints] Remove redundant class loader from CheckpointCoordinator

2016-09-26 Thread sewen
Repository: flink
Updated Branches:
  refs/heads/master 8fa313c39 -> 70e71c161


[FLINK-4684] [checkpoints] Remove redundant class loader from 
CheckpointCoordinator


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/70e71c16
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/70e71c16
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/70e71c16

Branch: refs/heads/master
Commit: 70e71c16177b40c2418e6a8ca0838bf117f6a926
Parents: 8fa313c
Author: Stephan Ewen 
Authored: Mon Sep 26 12:32:10 2016 +0200
Committer: Stephan Ewen 
Committed: Mon Sep 26 18:05:01 2016 +0200

--
 .../checkpoint/CheckpointCoordinator.java   |   5 -
 .../flink/runtime/checkpoint/TaskState.java |   2 +-
 .../runtime/executiongraph/ExecutionGraph.java  |   1 -
 .../apache/flink/runtime/state/StateUtil.java   |   2 +-
 .../checkpoint/CheckpointCoordinatorTest.java   | 101 +++
 .../checkpoint/CheckpointStateRestoreTest.java  |   7 +-
 6 files changed, 38 insertions(+), 80 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/70e71c16/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
--
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index fc40911..6a43ddf 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -112,9 +112,6 @@ public class CheckpointCoordinator {
 * need to be ascending across job managers. */
private final CheckpointIDCounter checkpointIdCounter;
 
-   /** Class loader used to deserialize the state handles (as they may be 
user-defined) */
-   private final ClassLoader userClassLoader;
-
/** The base checkpoint interval. Actual trigger time may be affected 
by the
 * max concurrent checkpoints and minimum-pause values */
private final long baseInterval;
@@ -167,7 +164,6 @@ public class CheckpointCoordinator {
ExecutionVertex[] tasksToTrigger,
ExecutionVertex[] tasksToWaitFor,
ExecutionVertex[] tasksToCommitTo,
-   ClassLoader userClassLoader,
CheckpointIDCounter checkpointIDCounter,
CompletedCheckpointStore completedCheckpointStore,
SavepointStore savepointStore,
@@ -198,7 +194,6 @@ public class CheckpointCoordinator {
this.completedCheckpointStore = 
checkNotNull(completedCheckpointStore);
this.savepointStore = checkNotNull(savepointStore);
this.recentPendingCheckpoints = new 
ArrayDeque<>(NUM_GHOST_CHECKPOINT_IDS);
-   this.userClassLoader = checkNotNull(userClassLoader);
this.statsTracker = checkNotNull(statsTracker);
 
this.timer = new Timer("Checkpoint Timer", true);

http://git-wip-us.apache.org/repos/asf/flink/blob/70e71c16/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java
--
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java
index 657dd60..f5e3618 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java
@@ -62,7 +62,7 @@ public class TaskState implements StateObject {
"Parallelism " + parallelism + " is not smaller 
or equal to max parallelism " + maxParallelism + ".");
 
this.jobVertexID = jobVertexID;
-   //preallocate lists of the required size, so that we can 
randomly set values to indexes
+
this.subtaskStates = new HashMap<>(parallelism);
this.keyGroupsStateHandles = new HashMap<>(parallelism);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/70e71c16/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
--
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index c3cf297..7c3fa0b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java

flink git commit: [FLINK-4590] [table] Some Table API tests are failing when debug lvl is set to DEBUG

2016-09-26 Thread twalthr
Repository: flink
Updated Branches:
  refs/heads/master 70e71c161 -> 7eb45c133


[FLINK-4590] [table] Some Table API tests are failing when debug lvl is set to 
DEBUG

This closes #2504.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7eb45c13
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7eb45c13
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7eb45c13

Branch: refs/heads/master
Commit: 7eb45c133c49933b14719f06bf68ccf162a3e0b2
Parents: 70e71c1
Author: twalthr 
Authored: Fri Sep 16 10:52:28 2016 +0200
Committer: twalthr 
Committed: Mon Sep 26 18:48:47 2016 +0200

--
 .../table/plan/nodes/dataset/BatchScan.scala| 10 ++---
 .../nodes/dataset/BatchTableSourceScan.scala|  6 +--
 .../plan/nodes/dataset/DataSetAggregate.scala   | 28 +++---
 .../table/plan/nodes/dataset/DataSetCalc.scala  | 16 
 .../plan/nodes/dataset/DataSetIntersect.scala   | 16 
 .../table/plan/nodes/dataset/DataSetJoin.scala  | 20 +-
 .../table/plan/nodes/dataset/DataSetMinus.scala | 39 +---
 .../table/plan/nodes/dataset/DataSetScan.scala  |  6 +--
 .../table/plan/nodes/dataset/DataSetSort.scala  | 22 ++-
 .../table/plan/nodes/dataset/DataSetUnion.scala | 19 ++
 .../plan/nodes/dataset/DataSetValues.scala  | 16 
 .../plan/nodes/datastream/DataStreamCalc.scala  | 16 
 .../plan/nodes/datastream/DataStreamScan.scala  |  6 +--
 .../plan/nodes/datastream/DataStreamUnion.scala | 16 
 .../nodes/datastream/DataStreamValues.scala | 12 +++---
 .../plan/nodes/datastream/StreamScan.scala  |  8 ++--
 .../src/test/resources/log4j-test.properties| 27 ++
 .../src/test/resources/logback-test.xml | 29 +++
 18 files changed, 193 insertions(+), 119 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/7eb45c13/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/BatchScan.scala
--
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/BatchScan.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/BatchScan.scala
index 85ed6ef..15b2081 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/BatchScan.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/BatchScan.scala
@@ -36,14 +36,14 @@ abstract class BatchScan(
 cluster: RelOptCluster,
 traitSet: RelTraitSet,
 table: RelOptTable,
-rowType: RelDataType)
+rowRelDataType: RelDataType)
   extends TableScan(cluster, traitSet, table)
   with DataSetRel {
 
-  override def deriveRowType() = rowType
+  override def deriveRowType() = rowRelDataType
 
   override def toString: String = {
-s"Source(from: (${rowType.getFieldNames.asScala.toList.mkString(", ")}))"
+s"Source(from: (${getRowType.getFieldNames.asScala.toList.mkString(", 
")}))"
   }
 
   override def computeSelfCost (planner: RelOptPlanner, metadata: 
RelMetadataQuery): RelOptCost = {
@@ -81,14 +81,14 @@ abstract class BatchScan(
 
   val mapFunc = getConversionMapper(
 config,
-false,
+nullableInput = false,
 inputType,
 determinedType,
 "DataSetSourceConversion",
 getRowType.getFieldNames,
 Some(flinkTable.fieldIndexes))
 
-  val opName = s"from: 
(${rowType.getFieldNames.asScala.toList.mkString(", ")})"
+  val opName = s"from: 
(${getRowType.getFieldNames.asScala.toList.mkString(", ")})"
 
   input.map(mapFunc).name(opName)
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7eb45c13/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/BatchTableSourceScan.scala
--
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/BatchTableSourceScan.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/BatchTableSourceScan.scala
index 027a5be..10d9534 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/BatchTableSourceScan.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/BatchTableSourceScan.scala
@@ -35,15 +35,15 @@ class BatchTableSourceScan(
 rowType: RelDataType)
   extends BatchScan(cluster, traitSet, table, rowType) {
 
-  val tableSourceTable = table.unwrap(classOf[TableSourceTable])
-  val tableSourceTable = table.unwrap(classOf[TableSourceTable])
+  val tableSourceTable = getTable.unwrap(classOf[TableSourceTable])

flink git commit: [FLINK-4252] [table] Validate input and output classes of Table API

2016-09-26 Thread twalthr
Repository: flink
Updated Branches:
  refs/heads/master 7eb45c133 -> f150f9877


[FLINK-4252] [table] Validate input and output classes of Table API

This closes #2507.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f150f987
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f150f987
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f150f987

Branch: refs/heads/master
Commit: f150f987772c8d96f41a5acd1d20cba6622cb5c9
Parents: 7eb45c1
Author: twalthr 
Authored: Fri Sep 16 14:27:47 2016 +0200
Committer: twalthr 
Committed: Mon Sep 26 18:58:44 2016 +0200

--
 .../api/java/table/BatchTableEnvironment.scala  |  5 ++---
 .../flink/api/table/BatchTableEnvironment.scala |  4 +++-
 .../flink/api/table/FlinkRelBuilder.scala   |  4 ++--
 .../api/table/StreamTableEnvironment.scala  |  2 ++
 .../flink/api/table/TableEnvironment.scala  | 15 +
 .../api/java/batch/table/FromDataSetITCase.java | 23 
 6 files changed, 47 insertions(+), 6 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/f150f987/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/BatchTableEnvironment.scala
--
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/BatchTableEnvironment.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/BatchTableEnvironment.scala
index 9ba5b20..a4f40d5 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/BatchTableEnvironment.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/BatchTableEnvironment.scala
@@ -17,12 +17,11 @@
  */
 package org.apache.flink.api.java.table
 
-import org.apache.flink.api.common.io.InputFormat
 import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.{ExecutionEnvironment, DataSet}
 import org.apache.flink.api.java.typeutils.TypeExtractor
+import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
 import org.apache.flink.api.table.expressions.ExpressionParser
-import org.apache.flink.api.table.{Row, TableConfig, Table}
+import org.apache.flink.api.table.{Table, TableConfig}
 
 /**
   * The [[org.apache.flink.api.table.TableEnvironment]] for a Java batch 
[[DataSet]]

http://git-wip-us.apache.org/repos/asf/flink/blob/f150f987/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala
--
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala
index eb4c819..ad3ff7a 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala
@@ -190,7 +190,7 @@ abstract class BatchTableEnvironment(
 *
 * @param table The table for which the AST and execution plan will be 
returned.
 */
-  def explain(table: Table): String = explain(table: Table, false)
+  def explain(table: Table): String = explain(table: Table, extended = false)
 
   /**
 * Registers a [[DataSet]] as a table under a given name in the 
[[TableEnvironment]]'s catalog.
@@ -240,6 +240,8 @@ abstract class BatchTableEnvironment(
 */
   protected def translate[A](table: Table)(implicit tpe: TypeInformation[A]): 
DataSet[A] = {
 
+validateType(tpe)
+
 val relNode = table.getRelNode
 
 // decorrelate

http://git-wip-us.apache.org/repos/asf/flink/blob/f150f987/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkRelBuilder.scala
--
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkRelBuilder.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkRelBuilder.scala
index e3bb97e..3827f05 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkRelBuilder.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkRelBuilder.scala
@@ -19,7 +19,7 @@
 package org.apache.flink.api.table
 
 import org.apache.calcite.jdbc.CalciteSchema
-import org.apache.calcite.plan.{Context, RelOptCluster, RelOptSchema}
+import org.apache.calcite.plan.{Context, RelOptCluster, RelOptPlanner, 
RelOptSchema}
 import org.apache.calcite.prepare.CalciteCatalogReader
 import org.apache.calcite.rex.RexBuilder
 import org.apache.calcite.schema.SchemaPlus
@@ -38,7 +38,7 @@ class FlinkR

flink git commit: [FLINK-4241] [table] Cryptic expression parser exceptions

2016-09-26 Thread twalthr
Repository: flink
Updated Branches:
  refs/heads/master f150f9877 -> ef1598498


[FLINK-4241] [table] Cryptic expression parser exceptions

This closes #2529.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ef159849
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ef159849
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ef159849

Branch: refs/heads/master
Commit: ef15984988e883ced5311b332e5e5d8521c9573f
Parents: f150f98
Author: twalthr 
Authored: Wed Sep 21 17:12:31 2016 +0200
Committer: twalthr 
Committed: Mon Sep 26 19:06:43 2016 +0200

--
 docs/dev/table_api.md   |  2 +-
 .../table/expressions/ExpressionParser.scala| 56 
 2 files changed, 34 insertions(+), 24 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/ef159849/docs/dev/table_api.md
--
diff --git a/docs/dev/table_api.md b/docs/dev/table_api.md
index bb083ff..1d03b38 100644
--- a/docs/dev/table_api.md
+++ b/docs/dev/table_api.md
@@ -953,7 +953,7 @@ alias = logic | ( logic , "AS" , fieldReference ) ;
 
 logic = comparison , [ ( "&&" | "||" ) , comparison ] ;
 
-comparison = term , [ ( "=" | "===" | "!=" | "!==" | ">" | ">=" | "<" | "<=" ) 
, term ] ;
+comparison = term , [ ( "=" | "==" | "===" | "!=" | "!==" | ">" | ">=" | "<" | 
"<=" ) , term ] ;
 
 term = product , [ ( "+" | "-" ) , product ] ;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ef159849/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionParser.scala
--
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionParser.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionParser.scala
index ae027e9..4707adf 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionParser.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionParser.scala
@@ -154,9 +154,7 @@ object ExpressionParser extends JavaTokenParsers with 
PackratParsers {
   }
 
   lazy val literalExpr: PackratParser[Expression] =
-numberLiteral |
-  stringLiteralFlink | singleQuoteStringLiteral |
-  boolLiteral | nullLiteral
+numberLiteral | stringLiteralFlink | singleQuoteStringLiteral | 
boolLiteral | nullLiteral
 
   lazy val fieldReference: PackratParser[NamedExpression] = (STAR | ident) ^^ {
 sym => UnresolvedFieldReference(sym)
@@ -334,7 +332,8 @@ object ExpressionParser extends JavaTokenParsers with 
PackratParsers {
 
   // suffix/prefix composite
 
-  lazy val composite: PackratParser[Expression] = suffixed | prefixed | atom
+  lazy val composite: PackratParser[Expression] = suffixed | prefixed | atom |
+failure("Composite expression expected.")
 
   // unary ops
 
@@ -342,22 +341,25 @@ object ExpressionParser extends JavaTokenParsers with 
PackratParsers {
 
   lazy val unaryMinus: PackratParser[Expression] = "-" ~> composite ^^ { e => 
UnaryMinus(e) }
 
-  lazy val unary = composite | unaryNot | unaryMinus
+  lazy val unary = composite | unaryNot | unaryMinus |
+failure("Unary expression expected.")
 
   // arithmetic
 
   lazy val product = unary * (
 "*" ^^^ { (a:Expression, b:Expression) => Mul(a,b) } |
-  "/" ^^^ { (a:Expression, b:Expression) => Div(a,b) } |
-  "%" ^^^ { (a:Expression, b:Expression) => Mod(a,b) } )
+"/" ^^^ { (a:Expression, b:Expression) => Div(a,b) } |
+"%" ^^^ { (a:Expression, b:Expression) => Mod(a,b) } ) |
+failure("Product expected.")
 
   lazy val term = product * (
 "+" ^^^ { (a:Expression, b:Expression) => Plus(a,b) } |
- "-" ^^^ { (a:Expression, b:Expression) => Minus(a,b) } )
+"-" ^^^ { (a:Expression, b:Expression) => Minus(a,b) } ) |
+failure("Term expected.")
 
   // Comparison
 
-  lazy val equalTo: PackratParser[Expression] = term ~ ("===" | "=") ~ term ^^ 
{
+  lazy val equalTo: PackratParser[Expression] = term ~ ("===" | "==" | "=") ~ 
term ^^ {
 case l ~ _ ~ r => EqualTo(l, r)
   }
 
@@ -382,23 +384,26 @@ object ExpressionParser extends JavaTokenParsers with 
PackratParsers {
   }
 
   lazy val comparison: PackratParser[Expression] =
-  equalTo | notEqualTo |
-  greaterThan | greaterThanOrEqual |
-  lessThan | lessThanOrEqual | term
+equalTo | notEqualTo |
+greaterThan | greaterThanOrEqual |
+lessThan | lessThanOrEqual | term |
+failure("Comparison expected.")
 
   // logic
 
   lazy val logic = comparison * (
 "&&" ^^^ { (a:Expression, b:Expression) => And(a,b) } |
-  "||" ^^^ { (a:Expression, b:Expression) => Or(a,b) } 

buildbot failure in on flink-docs-release-0.10

2016-09-26 Thread buildbot
The Buildbot has detected a new failure on builder flink-docs-release-0.10 
while building . Full details are available at:
https://ci.apache.org/builders/flink-docs-release-0.10/builds/343

Buildbot URL: https://ci.apache.org/

Buildslave for this Build: bb_slave2_ubuntu

Build Reason: The Nightly scheduler named 'flink-nightly-docs-release-0.10' 
triggered this build
Build Source Stamp: [branch release-0.10] HEAD
Blamelist: 

BUILD FAILED: failed compile

Sincerely,
 -The Buildbot