[GitHub] flink pull request: [FLINK-2543] Fix user object deserialization f...

2015-08-30 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/1048


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2543] Fix user object deserialization f...

2015-08-28 Thread StephanEwen
Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/1048#issuecomment-135746375
  
Actually, we can somewhat ignore the first problem - it seems that we can 
rebase the client refactoring pull requests well onto this.




[GitHub] flink pull request: [FLINK-2543] Fix user object deserialization f...

2015-08-28 Thread StephanEwen
Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/1048#issuecomment-135746761
  
Okay, one change I would like to make during merging is to not deserialize
exceptions on the JobManager. I would extend the SerializedThrowable to include
the original message and original stack trace so that it can be printed /
logged just like the original exception. Exceptions in the execution graph
would then always be serialized exceptions as well.




[GitHub] flink pull request: [FLINK-2543] Fix user object deserialization f...

2015-08-28 Thread tillrohrmann
Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/1048#issuecomment-135770745
  
The `SerializedThrowable` is then a subclass of `Exception`?

I think it's a very good solution to not deserialize the exception on the 
`JobManager`.




[GitHub] flink pull request: [FLINK-2543] Fix user object deserialization f...

2015-08-28 Thread tillrohrmann
Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/1048#issuecomment-135767450
  
I agree. In both cases the compiler cannot help much to avoid forgetting
about it and, thus, it's the responsibility of the programmer to make sure that
the failure case is handled properly. The description of the `JobSubmitAndWait`
message should state what can be expected as the return value of the future.




[GitHub] flink pull request: [FLINK-2543] Fix user object deserialization f...

2015-08-28 Thread StephanEwen
Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/1048#issuecomment-135769797
  
The added advantage of that approach is that we keep no user exceptions
alive in the execution graph.

Not keeping those alive is quite desirable, actually, to allow user code 
unloading.




[GitHub] flink pull request: [FLINK-2543] Fix user object deserialization f...

2015-08-28 Thread StephanEwen
Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/1048#issuecomment-135769165
  
I actually have a pretty interesting solution now. I changed the
SerializedThrowable such that it looks like the original exception with
respect to message, stack trace, printing, etc. Logging the serialized
throwable, for example, looks like logging the original exception. If one
forgets to deserialize it, it is not super bad any more.

Furthermore, the SerializedThrowable is not unwrapped when sent by a
TaskManager to the JobManager, but kept as it is (which skips some work).

When sending an exception back to the client, the original message is sent
back. It may be a SerializedThrowable (if it originated at the workers) or
not (if it originated at the JobManager, such as missing resources).

The client would still need to check and deserialize like it does now.
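
A rough sketch of the idea, for illustration only: a wrapper that copies the message and stack trace of the original exception so that it prints like the original, and keeps the original as bytes so the receiver can deserialize it with a user-code class loader. The class name `SerializedThrowableSketch`, its fields, and the method `deserializeError` are assumptions made for this example and are not the class that was merged.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;

/** Illustrative wrapper (hypothetical name); prints like the exception it wraps. */
class SerializedThrowableSketch extends Exception {
    private static final long serialVersionUID = 1L;

    // Serialized form of the original; null if the original was not serializable.
    private final byte[] serializedOriginal;
    private final String originalClassName;

    SerializedThrowableSketch(Throwable original) {
        super(original.getMessage());
        this.originalClassName = original.getClass().getName();
        // Reuse the original stack trace so logs point at the real failure site.
        setStackTrace(original.getStackTrace());
        this.serializedOriginal = trySerialize(original);
    }

    private static byte[] trySerialize(Throwable original) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(original);
        } catch (IOException e) {
            return null;
        }
        return buffer.toByteArray();
    }

    /** Restores the original with the given loader; falls back to this wrapper. */
    Throwable deserializeError(final ClassLoader userCodeClassLoader) {
        if (serializedOriginal == null) {
            return this;
        }
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(serializedOriginal)) {
                    @Override
                    protected Class<?> resolveClass(ObjectStreamClass desc)
                            throws IOException, ClassNotFoundException {
                        try {
                            return Class.forName(desc.getName(), false, userCodeClassLoader);
                        } catch (ClassNotFoundException e) {
                            return super.resolveClass(desc);
                        }
                    }
                }) {
            return (Throwable) in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            return this;
        }
    }

    @Override
    public String toString() {
        // Same format as Throwable.toString(), but with the original class name.
        String message = getLocalizedMessage();
        return message == null ? originalClassName : originalClassName + ": " + message;
    }
}
```

Because `printStackTrace` starts with `toString()` and then prints the stored stack trace, logging the wrapper reads like logging the original, even on a node that cannot load the user class.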




[GitHub] flink pull request: [FLINK-2543] Fix user object deserialization f...

2015-08-28 Thread StephanEwen
Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/1048#issuecomment-135765919
  
Explicit would mean here to send something where the receiver knows that he 
has to deserialize. Whether through a dedicated message, or a 
SerializedThrowable that is NOT a Throwable would not make much difference.




[GitHub] flink pull request: [FLINK-2543] Fix user object deserialization f...

2015-08-28 Thread rmetzger
Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/1048#issuecomment-135770247
  
That sounds very good.




[GitHub] flink pull request: [FLINK-2543] Fix user object deserialization f...

2015-08-28 Thread tillrohrmann
Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/1048#issuecomment-135811858
  
I'm not sure whether this would solve the initial problem. I thought the 
problem was that a user code exception can be sent from the `JobManager` to the 
`JobClientActor` which will cause Akka to fail while serializing it.




[GitHub] flink pull request: [FLINK-2543] Fix user object deserialization f...

2015-08-28 Thread StephanEwen
Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/1048#issuecomment-135793882
  
Only disadvantage is that archived execution graphs prevent user class 
unloading...




[GitHub] flink pull request: [FLINK-2543] Fix user object deserialization f...

2015-08-28 Thread rmetzger
Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/1048#issuecomment-135812417
  
Sending a stringified exception is not an option between jm and client 
because the user might rely on the exception in the RemoteExecEnv.
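
To illustrate the point, here is a small sketch of what is lost when only the stringified form travels to the client: the class name survives as text, but a type-based check no longer matches. `QuotaExceededException` and the helper names are hypothetical and exist only for this example.

```java
import java.io.PrintWriter;
import java.io.StringWriter;

final class StringifiedVersusTyped {

    /** Hypothetical user-defined exception that client code wants to react to. */
    static final class QuotaExceededException extends RuntimeException {
        private static final long serialVersionUID = 1L;

        QuotaExceededException(String message) {
            super(message);
        }
    }

    /** Renders the full stack trace, including causes, as a string. */
    static String stringify(Throwable t) {
        StringWriter writer = new StringWriter();
        t.printStackTrace(new PrintWriter(writer, true));
        return writer.toString();
    }

    /** Walks the cause chain looking for the user exception type. */
    static boolean causedByQuota(Throwable failure) {
        for (Throwable t = failure; t != null; t = t.getCause()) {
            if (t instanceof QuotaExceededException) {
                return true;
            }
        }
        return false;
    }
}
```

With a typed cause chain `causedByQuota` succeeds; with `new Exception(stringify(failure))` it cannot, because the only remaining trace of the user type is a substring of the message.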




[GitHub] flink pull request: [FLINK-2543] Fix user object deserialization f...

2015-08-28 Thread StephanEwen
Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/1048#issuecomment-135793797
  
Hmm... the more I work on it, the trickier it appears.

It is efficient when done right, but one needs to watch carefully where
to wrap an exception and where not.

The solution to always serialize/deserialize manually during transport is 
far easier, actually...




[GitHub] flink pull request: [FLINK-2543] Fix user object deserialization f...

2015-08-28 Thread StephanEwen
Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/1048#issuecomment-135813751
  
Right, I am going for an explicit serialized form of the exception at this 
point.




[GitHub] flink pull request: [FLINK-2543] Fix user object deserialization f...

2015-08-27 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1048#discussion_r38078466
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
 ---
@@ -239,7 +239,7 @@ abstract class FlinkMiniCluster(
 
   @throws(classOf[JobExecutionException])
   def submitJobAndWait(jobGraph: JobGraph, printUpdates: Boolean)
-  : SerializedJobExecutionResult = {
+  : JobExecutionResult = {
--- End diff --

Indentation?




[GitHub] flink pull request: [FLINK-2543] Fix user object deserialization f...

2015-08-27 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1048#discussion_r38078400
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 ---
@@ -349,27 +351,29 @@ class JobManager(
 
   case JobStatus.CANCELED =>
 jobInfo.client ! decorateMessage(
-  Failure(
-new JobCancellationException(
-  jobID,
-"Job was cancelled.", error)
+  Failure(new SerializedThrowable(
+  new JobCancellationException(
+jobID,
+  "Job was cancelled.", deserializedError)
+)
   )
 )
 
   case JobStatus.FAILED =>
 jobInfo.client ! decorateMessage(
-  Failure(
-new JobExecutionException(
-  jobID,
-  "Job execution failed.",
-  error)
+  Failure(new SerializedThrowable(
+  new JobExecutionException(
+jobID,
+"Job execution failed.",
+deserializedError)
+)
   )
 )
 
   case x =>
 val exception = new JobExecutionException(jobID, s"$x is not a " +
   "terminal state.")
-jobInfo.client ! decorateMessage(Failure(exception))
+jobInfo.client ! decorateMessage(Failure(new SerializedThrowable(exception)))
--- End diff --

Do we really need to serialize the `JobExecutionException` in a 
`SerializedThrowable` here?




[GitHub] flink pull request: [FLINK-2543] Fix user object deserialization f...

2015-08-27 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1048#discussion_r38078531
  
--- Diff: flink-tests/src/test/resources/log4j-test.properties ---
@@ -18,7 +18,7 @@
 
 # Set root logger level to OFF to not flood build logs
 # set manually to INFO for debugging purposes
-log4j.rootLogger=OFF, testlogger
+log4j.rootLogger=INFO, testlogger
--- End diff --

Unintended commit?




[GitHub] flink pull request: [FLINK-2543] Fix user object deserialization f...

2015-08-27 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1048#discussion_r38078839
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 ---
@@ -349,27 +351,29 @@ class JobManager(
 
   case JobStatus.CANCELED =>
 jobInfo.client ! decorateMessage(
-  Failure(
-new JobCancellationException(
-  jobID,
-"Job was cancelled.", error)
+  Failure(new SerializedThrowable(
+  new JobCancellationException(
+jobID,
+  "Job was cancelled.", deserializedError)
+)
   )
 )
 
   case JobStatus.FAILED =>
 jobInfo.client ! decorateMessage(
-  Failure(
-new JobExecutionException(
-  jobID,
-  "Job execution failed.",
-  error)
+  Failure(new SerializedThrowable(
+  new JobExecutionException(
--- End diff --

Same here with the serialization of `JobExecutionException`.




[GitHub] flink pull request: [FLINK-2543] Fix user object deserialization f...

2015-08-27 Thread tillrohrmann
Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/1048#issuecomment-135367041
  
I had some more comments.




[GitHub] flink pull request: [FLINK-2543] Fix user object deserialization f...

2015-08-27 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1048#discussion_r38078804
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 ---
@@ -349,27 +351,29 @@ class JobManager(
 
   case JobStatus.CANCELED =>
 jobInfo.client ! decorateMessage(
-  Failure(
-new JobCancellationException(
-  jobID,
-"Job was cancelled.", error)
+  Failure(new SerializedThrowable(
+  new JobCancellationException(
--- End diff --

Why do we have to serialize the `JobCancellationException` here? Isn't it 
enough to only serialize the error transferred with the `JobStatusChanged` 
message?




[GitHub] flink pull request: [FLINK-2543] Fix user object deserialization f...

2015-08-27 Thread rmetzger
Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/1048#issuecomment-135363969
  
I addressed all PR comments.




[GitHub] flink pull request: [FLINK-2543] Fix user object deserialization f...

2015-08-27 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1048#discussion_r38078076
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java ---
@@ -177,9 +176,18 @@ public static SerializedJobExecutionResult submitJobAndWait(
             throw new JobTimeoutException(jobGraph.getJobID(), "Timeout while waiting for JobManager answer. " +
                     "Job time exceeded " + AkkaUtils.INF_TIMEOUT(), e);
         }
-        catch (Throwable t) {
+        catch(SerializedThrowable serializedThrowable) {
+            Throwable deserialized = SerializedThrowable.get(serializedThrowable, userCodeClassloader);
+            if(deserialized instanceof JobExecutionException) {
+                // no need to wrap the Throwable into a JobExecutionException again.
+                throw (JobExecutionException)deserialized;
+            }
+            throw new JobExecutionException(jobGraph.getJobID(),
--- End diff --

Can we put this statement in an `else` branch?
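
As a sketch of what the suggested control flow could look like, the two outcomes can sit in explicit `if`/`else` branches so that neither looks like a fall-through. `JobFailure` is a hypothetical stand-in for `JobExecutionException`, and `toJobFailure` is an assumed helper name, not code from the pull request.

```java
/** Hypothetical stand-ins; JobFailure plays the role of JobExecutionException. */
final class ClientErrorUnwrapping {

    static final class JobFailure extends Exception {
        private static final long serialVersionUID = 1L;

        JobFailure(String message, Throwable cause) {
            super(message, cause);
        }
    }

    /** Returns the failure to throw for an already deserialized error. */
    static JobFailure toJobFailure(Throwable deserialized) {
        if (deserialized instanceof JobFailure) {
            // Already the right type: no need to wrap it again.
            return (JobFailure) deserialized;
        } else {
            // Anything else becomes the cause of a new job failure.
            return new JobFailure("Job execution failed.", deserialized);
        }
    }
}
```

A caller would then write `throw ClientErrorUnwrapping.toJobFailure(deserialized);` in the catch block.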




[GitHub] flink pull request: [FLINK-2543] Fix user object deserialization f...

2015-08-27 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1048#discussion_r38078252
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 ---
@@ -349,27 +351,29 @@ class JobManager(
 
   case JobStatus.CANCELED =>
 jobInfo.client ! decorateMessage(
-  Failure(
-new JobCancellationException(
-  jobID,
-"Job was cancelled.", error)
+  Failure(new SerializedThrowable(
+  new JobCancellationException(
+jobID,
+  "Job was cancelled.", deserializedError)
--- End diff --

Indentation and line breaks




[GitHub] flink pull request: [FLINK-2543] Fix user object deserialization f...

2015-08-24 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1048#discussion_r37754281
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/util/SerializedThrowable.java
 ---
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.util;
+
+import com.google.common.base.Preconditions;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.InstantiationUtil;
+
+import java.io.Serializable;
+import java.util.Arrays;
+
+/**
+ * Utility class for dealing with serialized Throwables.
+ * Needed to send around user-specific exception classes with Akka.
+ */
+public class SerializedThrowable implements Serializable {
+    private final byte[] serializedError;
+
+    // The exception must not be (de)serialized with the class, as its
+    // class may not be part of the system class loader.
+    private transient Throwable cachedError;
+
+
+    /**
+     * Create a new SerializedThrowable.
+     * @param error The exception to serialize.
+     */
+    public SerializedThrowable(Throwable error) {
+        Preconditions.checkNotNull(error, "The exception to serialize has to be set");
+        this.cachedError = error;
+        byte[] serializedError;
+        try {
+            serializedError = InstantiationUtil.serializeObject(error);
+        }
+        catch (Throwable t) {
+            // could not serialize exception. send the stringified version instead
+            try {
+                this.cachedError = new Exception(ExceptionUtils.stringifyException(error));
+                serializedError = InstantiationUtil.serializeObject(this.cachedError);
+            }
+            catch (Throwable tt) {
+                // seems like we cannot do much to report the actual exception
+                // report a placeholder instead
+                try {
+                    this.cachedError = new Exception("Cause is a '" + error.getClass().getName()
+                            + "' (failed to serialize or stringify)");
+                    serializedError = InstantiationUtil.serializeObject(this.cachedError);
+                }
+                catch (Throwable ttt) {
+                    // this should never happen unless the JVM is fubar.
+                    // we just report the state without the error
+                    this.cachedError = null;
+                    serializedError = null;
+                }
+            }
+        }
+        this.serializedError = serializedError;
+    }
+
+    public Throwable getError(ClassLoader usercodeClassloader) {
--- End diff --

What kind of camel case is `usercodeClassloader`?




[GitHub] flink pull request: [FLINK-2543] Fix user object deserialization f...

2015-08-24 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1048#discussion_r37754479
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 ---
@@ -327,8 +327,11 @@ class JobManager(
   currentJobs.get(jobID) match {
 case Some((executionGraph, jobInfo)) => executionGraph.getJobName
 
+  val deserializedError = if(error != null) {
+error.getError(executionGraph.getUserClassLoader)
--- End diff --

Why not rename `error.getError` => `serializedError.deserializeError`?
It makes things a little bit clearer.




[GitHub] flink pull request: [FLINK-2543] Fix user object deserialization f...

2015-08-24 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/1048#discussion_r37749500
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java ---
@@ -176,8 +175,7 @@ public JobExecutionResult executePlan(Plan plan) throws Exception {
             JobGraph jobGraph = jgg.compileJobGraph(op);

             boolean sysoutPrint = isPrintingStatusDuringExecution();
-            SerializedJobExecutionResult result = flink.submitJobAndWait(jobGraph,sysoutPrint);
-            return result.toJobExecutionResult(ClassLoader.getSystemClassLoader());
+            return flink.submitJobAndWait(jobGraph,sysoutPrint);
--- End diff --

Missing space...




[GitHub] flink pull request: [FLINK-2543] Fix user object deserialization f...

2015-08-24 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1048#discussion_r37753243
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java ---
@@ -123,12 +122,12 @@ public static InetSocketAddress getJobManagerAddress(Configuration config) throw
      * @throws org.apache.flink.runtime.client.JobExecutionException Thrown if the job
      *                                                               execution fails.
      */
-    public static SerializedJobExecutionResult submitJobAndWait(
+    public static JobExecutionResult submitJobAndWait(
             ActorSystem actorSystem,
             ActorGateway jobManagerGateway,
             JobGraph jobGraph,
             FiniteDuration timeout,
-            boolean sysoutLogUpdates) throws JobExecutionException {
+            boolean sysoutLogUpdates, ClassLoader usercodeClassloader) throws JobExecutionException {
--- End diff --

Style: Why append the new parameter to the last line when all the other
parameters are each written on their own line?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2543] Fix user object deserialization f...

2015-08-24 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1048#discussion_r37753621
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 ---
@@ -1028,8 +1029,12 @@ public void registerExecutionListener(ActorGateway listener) {

     private void notifyJobStatusChange(JobStatus newState, Throwable error) {
         if (jobStatusListenerActors.size() > 0) {
+            SerializedThrowable st = null;
--- End diff --

`st` is not really descriptive. A slightly longer variable name could be 
helpful.




[GitHub] flink pull request: [FLINK-2543] Fix user object deserialization f...

2015-08-24 Thread aljoscha
Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/1048#issuecomment-134206474
  
Looks good, modulo some minor comments. But maybe @tillrohrmann has 
something to say, let's wait for him. 




[GitHub] flink pull request: [FLINK-2543] Fix user object deserialization f...

2015-08-24 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/1048#discussion_r37752056
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/util/SerializedThrowable.java
 ---
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.util;
+
+import com.google.common.base.Preconditions;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.InstantiationUtil;
+
+import java.io.Serializable;
+import java.util.Arrays;
+
+/**
+ * Utility class for dealing with serialized Throwables.
+ * Needed to send around user-specific exception classes with Akka.
+ */
+public class SerializedThrowable implements Serializable {
--- End diff --

no serialVersionUID
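
For context, a minimal sketch of what declaring the field looks like on a `Serializable` class. Without it, the JVM derives the ID from the class structure, so otherwise compatible edits to the class can make previously written bytes unreadable. `SerializedPayload` is a hypothetical class used only for this example.

```java
import java.io.Serializable;

/** Hypothetical serializable holder with an explicitly pinned stream version. */
final class SerializedPayload implements Serializable {
    // Explicit ID: stays stable when fields or methods are added later.
    private static final long serialVersionUID = 1L;

    private final byte[] bytes;

    SerializedPayload(byte[] bytes) {
        // Defensive copy so callers cannot mutate the stored array.
        this.bytes = bytes.clone();
    }

    byte[] bytes() {
        return bytes.clone();
    }
}
```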




[GitHub] flink pull request: [FLINK-2543] Fix user object deserialization f...

2015-08-24 Thread rmetzger
GitHub user rmetzger opened a pull request:

https://github.com/apache/flink/pull/1048

[FLINK-2543] Fix user object deserialization for user state

File-based state handles were using the system classloader to deserialize 
the state object.

Exceptions sent from the JobManager to the JobClient were relying on Akka's
JavaSerialization, which does not have access to the user code classloader.
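
The underlying technique can be sketched with plain Java serialization: an `ObjectInputStream` whose `resolveClass` looks classes up in a caller-supplied loader instead of the default one. This is a simplified illustration, not Flink's `InstantiationUtil`; the class and method names below are assumptions made for the example.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;

/** Hypothetical helper showing class-loader-aware deserialization. */
final class ClassLoaderAwareSerialization {

    /** Resolves classes against the given loader before the default lookup. */
    static final class ClassLoaderObjectInputStream extends ObjectInputStream {
        private final ClassLoader classLoader;

        ClassLoaderObjectInputStream(InputStream in, ClassLoader classLoader) throws IOException {
            super(in);
            this.classLoader = classLoader;
        }

        @Override
        protected Class<?> resolveClass(ObjectStreamClass desc)
                throws IOException, ClassNotFoundException {
            try {
                return Class.forName(desc.getName(), false, classLoader);
            } catch (ClassNotFoundException e) {
                // The default lookup also handles primitive types such as int.
                return super.resolveClass(desc);
            }
        }
    }

    static byte[] serialize(Object value) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        return bytes.toByteArray();
    }

    static Object deserialize(byte[] data, ClassLoader userCodeClassLoader)
            throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ClassLoaderObjectInputStream(
                new ByteArrayInputStream(data), userCodeClassLoader)) {
            return in.readObject();
        }
    }
}
```

Sending the bytes as an opaque payload and calling `deserialize` on the receiving side keeps the transport layer from ever having to load user classes itself.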

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/rmetzger/flink flink2543

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1048.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1048


commit f5a642267a5ce69018d900398e4eb5134b2ea747
Author: Robert Metzger rmetz...@apache.org
Date:   2015-08-18T16:15:40Z

[FLINK-2543] Fix user object deserialization for file-based state handles. 
Send exceptions from JM -- JC in serialized form

Exceptions send from the JobManager to the JobClient were relying on Akka's 
JavaSerialization, which does not have access to the user code classloader.






[GitHub] flink pull request: [FLINK-2543] Fix user object deserialization f...

2015-08-24 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1048#discussion_r37753999
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionState.java
 ---
@@ -160,19 +123,10 @@ public TaskExecutionState(JobID jobID, 
ExecutionAttemptID executionId,
 *job this update refers to.
 */
public Throwable getError(ClassLoader usercodeClassloader) {
-   if (this.serializedError == null) {
+   if (this.throwable == null) {
return null;
}
-
-   if (this.cachedError == null) {
-   try {
-   cachedError = (Throwable) InstantiationUtil.deserializeObject(this.serializedError, usercodeClassloader);
-   }
-   catch (Exception e) {
-   throw new RuntimeException("Error while deserializing the attached exception", e);
-   }
-   }
-   return this.cachedError;
+   return throwable.getError(usercodeClassloader);
--- End diff --

Could we place the last return in an else branch to make the control flow 
clearer?




[GitHub] flink pull request: [FLINK-2543] Fix user object deserialization f...

2015-08-24 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1048#discussion_r37756350
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActor.java 
---
@@ -128,6 +128,13 @@ else if (message instanceof Status.Success) {
// job was successfully submitted :-)
logger.info("Job was successfully submitted to the JobManager");
}
+   else if (message instanceof JobClientMessages.JobFailure) {
--- End diff --

Wrapping the `SerializedThrowable` in a `Failure` would allow us to get rid 
of this if branch.




[GitHub] flink pull request: [FLINK-2543] Fix user object deserialization f...

2015-08-24 Thread tillrohrmann
Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/1048#issuecomment-134226512
  
I had some comments. The main question is why we introduce a new 
`JobFailure` message instead of treating a `SerializedThrowable` as what it is, 
namely a `Failure`. If we let `SerializedThrowable` extend `Exception`, we 
could send the serialized throwable over the network in a `Failure`, like 
any other exception, and avoid duplicate code paths. Only at the 
receiving end would we have to check for a `SerializedThrowable` and deserialize 
the wrapped exception to obtain the original one.
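
A rough sketch of this suggestion, under stated assumptions: `SerializedThrowableSketch` and `deserializeError` are hypothetical names, and this is not the class that was merged. The wrapper is itself an `Exception`, so it can travel wherever a plain exception can, while the original throwable is carried as bytes and only turned back into an object where the user code classloader is available.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;

// Hypothetical sketch of an exception that carries another throwable in serialized form.
class SerializedThrowableSketch extends Exception {

    private static final long serialVersionUID = 1L;

    // Java-serialized original throwable, or null if it could not be serialized.
    private final byte[] serializedError;

    SerializedThrowableSketch(Throwable original) {
        // Keep the message so the wrapper is readable without the user classes.
        super(original.getMessage());
        byte[] bytes = null;
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
                out.writeObject(original);
            }
            bytes = buffer.toByteArray();
        } catch (IOException e) {
            // The original is not serializable; only the message survives.
        }
        this.serializedError = bytes;
    }

    // Restores the original throwable, or returns this wrapper if that is not possible.
    Throwable deserializeError(final ClassLoader userCodeClassLoader) {
        if (serializedError == null) {
            return this;
        }
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(serializedError)) {
                    @Override
                    protected Class<?> resolveClass(ObjectStreamClass desc)
                            throws IOException, ClassNotFoundException {
                        try {
                            return Class.forName(desc.getName(), false, userCodeClassLoader);
                        } catch (ClassNotFoundException e) {
                            return super.resolveClass(desc);
                        }
                    }
                }) {
            return (Throwable) in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            return this;
        }
    }
}
```

On the receiving side, the check described above would amount to an `instanceof SerializedThrowableSketch` test on the failure's cause, followed by a call to `deserializeError` with the job's classloader.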




[GitHub] flink pull request: [FLINK-2543] Fix user object deserialization f...

2015-08-24 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1048#discussion_r37754792
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ExecutionGraphMessages.scala
 ---
@@ -80,7 +81,7 @@ object ExecutionGraphMessages {
   jobID: JobID,
   newJobStatus: JobStatus,
   timestamp: Long,
-  error: Throwable)
+  error: SerializedThrowable)
--- End diff --

Why not give the serialized throwable a name that indicates it is 
actually a serialized throwable?




[GitHub] flink pull request: [FLINK-2543] Fix user object deserialization f...

2015-08-24 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1048#discussion_r37756262
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java ---
@@ -160,12 +159,12 @@ public static SerializedJobExecutionResult 
submitJobAndWait(
 
SerializedJobExecutionResult result = 
((JobManagerMessages.JobResultSuccess) answer).result();
if (result != null) {
-   return result;
+   return 
result.toJobExecutionResult(usercodeClassloader);
} else {
throw new Exception("Job was successfully executed but result contained a null JobExecutionResult.");
}
-   } else if (answer instanceof Status.Failure) {
-   throw ((Status.Failure) answer).cause();
+   } else if (answer instanceof 
JobClientMessages.JobFailure) {
--- End diff --

Why do we introduce a new `JobFailure` message here? Can't we simply let 
our `SerializedThrowable` class inherit from `Exception` and send it as a 
failure over the network? We would then only have to add a catch block to the 
outer try block that deserializes the wrapped exception.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---