[GitHub] flink pull request #5168: [FLINK-8234][flip6] WIP

2017-12-20 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5168#discussion_r157961376
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/messages/JobExecutionResultNotFoundException.java
 ---
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.messages;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.util.FlinkException;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Exception indicating that we could not find a
+ * {@link org.apache.flink.api.common.JobExecutionResult} under the given {@link JobID}.
+ */
+public class JobExecutionResultNotFoundException extends FlinkException {
+
+   private final JobID jobId;
+
+   private static final long serialVersionUID = 1L;
--- End diff --

Fixed.


---


[GitHub] flink pull request #5168: [FLINK-8234][flip6] WIP

2017-12-20 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5168#discussion_r157961174
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/messages/JobExecutionResultNotFoundException.java
 ---
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.messages;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.util.FlinkException;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Exception indicating that we could not find a
+ * {@link org.apache.flink.api.common.JobExecutionResult} under the given {@link JobID}.
+ */
+public class JobExecutionResultNotFoundException extends FlinkException {
--- End diff --

I don't think this is an *is-a* relationship.


---


[GitHub] flink pull request #5168: [FLINK-8234][flip6] WIP

2017-12-20 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5168#discussion_r157960760
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/client/SerializedJobExecutionResult.java
 ---
@@ -55,6 +56,26 @@ public SerializedJobExecutionResult(JobID jobID, long netRuntime,
this.accumulatorResults = accumulators;
}
 
+   /**
+* Creates an instance from {@link JobExecutionResult}.
+*/
+   public static SerializedJobExecutionResult from(final JobExecutionResult jobExecutionResult) {
+   final Map<String, Object> accumulatorResults = jobExecutionResult.getAllAccumulatorResults();
+
+   final Map<String, SerializedValue<Object>> serializedAccumulatorResults = new HashMap<>(accumulatorResults.size());
+   for (final Map.Entry<String, Object> entry : accumulatorResults.entrySet()) {
+   try {
+   serializedAccumulatorResults.put(entry.getKey(), new SerializedValue<>(entry.getValue()));
+   } catch (final IOException e) {
+   throw new RuntimeException(e);
--- End diff --

Not needed anymore.


---


[GitHub] flink pull request #5168: [FLINK-8234][flip6] WIP

2017-12-20 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5168#discussion_r157960722
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java 
---
@@ -357,6 +362,28 @@ public void start() throws Exception {
return CompletableFuture.completedFuture(jobManagerServices.blobServer.getPort());
}
 
+   @Override
+   public CompletableFuture<Either<Throwable, SerializedJobExecutionResult>> getJobExecutionResult(
--- End diff --

Done.


---


[GitHub] flink pull request #5168: [FLINK-8234][flip6] WIP

2017-12-20 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5168#discussion_r157960666
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobExecutionResultCache.java
 ---
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.client.SerializedJobExecutionResult;
+import org.apache.flink.types.Either;
+
+import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
+
+import javax.annotation.Nullable;
+
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Caches {@link SerializedJobExecutionResult}s.
+ *
+ * @see org.apache.flink.runtime.rest.handler.job.JobExecutionResultHandler
+ */
+class JobExecutionResultCache {
+
+   private static final int MAX_RESULT_CACHE_DURATION_SECONDS = 300;
+
+   private final Cache<JobID, Either<Throwable, SerializedJobExecutionResult>>
+   jobExecutionResultCache =
+   CacheBuilder.newBuilder()
+   .expireAfterWrite(MAX_RESULT_CACHE_DURATION_SECONDS, TimeUnit.SECONDS)
+   .build();
+
+   public void put(final SerializedJobExecutionResult result) {
+   assertJobExecutionResultNotCached(result.getJobId());
--- End diff --

It's just being strict. I can remove it if it is wrong.


---


[GitHub] flink pull request #5168: [FLINK-8234][flip6] WIP

2017-12-18 Thread GJL
Github user GJL closed the pull request at:

https://github.com/apache/flink/pull/5168


---


[GitHub] flink pull request #5168: [FLINK-8234][flip6] WIP

2017-12-18 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5168#discussion_r157489280
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java 
---
@@ -357,6 +362,28 @@ public void start() throws Exception {
return CompletableFuture.completedFuture(jobManagerServices.blobServer.getPort());
}
 
+   @Override
+   public CompletableFuture<Either<Throwable, SerializedJobExecutionResult>> getJobExecutionResult(
+   final JobID jobId,
+   final Time timeout) {
+   final Either<Throwable, SerializedJobExecutionResult> jobExecutionResult =
+   jobExecutionResultCache.get(jobId);
+   if (jobExecutionResult == null) {
+   return FutureUtils.completedExceptionally(new JobExecutionResultNotFoundException(jobId));
+   } else {
+   return CompletableFuture.completedFuture(jobExecutionResult);
+   }
+   }
+
+   @Override
+   public CompletableFuture<Boolean> isJobExecutionResultPresent(final JobID jobId, final Time timeout) {
+   final boolean jobExecutionResultPresent = jobExecutionResultCache.contains(jobId);
+   if (!jobManagerRunners.containsKey(jobId) && !jobExecutionResultPresent) {
+   return FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobId));
+   }
+   return CompletableFuture.completedFuture(jobExecutionResultPresent);
--- End diff --

But this would never return a future containing `false`. 


---


[GitHub] flink pull request #5168: [FLINK-8234][flip6] WIP

2017-12-18 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5168#discussion_r157476788
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java 
---
@@ -357,6 +362,28 @@ public void start() throws Exception {
return CompletableFuture.completedFuture(jobManagerServices.blobServer.getPort());
}
 
+   @Override
+   public CompletableFuture<Either<Throwable, SerializedJobExecutionResult>> getJobExecutionResult(
+   final JobID jobId,
+   final Time timeout) {
+   final Either<Throwable, SerializedJobExecutionResult> jobExecutionResult =
+   jobExecutionResultCache.get(jobId);
+   if (jobExecutionResult == null) {
+   return FutureUtils.completedExceptionally(new JobExecutionResultNotFoundException(jobId));
+   } else {
+   return CompletableFuture.completedFuture(jobExecutionResult);
+   }
+   }
+
+   @Override
+   public CompletableFuture<Boolean> isJobExecutionResultPresent(final JobID jobId, final Time timeout) {
+   final boolean jobExecutionResultPresent = jobExecutionResultCache.contains(jobId);
+   if (!jobManagerRunners.containsKey(jobId) && !jobExecutionResultPresent) {
+   return FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobId));
+   }
+   return CompletableFuture.completedFuture(jobExecutionResultPresent);
--- End diff --

I think the logic should be something like 

```
if (jobExecutionResultPresent || jobManagerRunners.containsKey(jobId)) { 
  return CompletableFuture.completedFuture(true);
} else {
  return FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobId));
}
```
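
To compare the two variants side by side, here is a minimal, self-contained sketch. It is not code from the PR: `ResultPresenceSketch`, the `String` job IDs, the two `Set`s standing in for `jobExecutionResultCache` and `jobManagerRunners`, and the `IllegalStateException` standing in for `FlinkJobNotFoundException` are all assumptions made for illustration.

```java
import java.util.Set;
import java.util.concurrent.CompletableFuture;

/** Illustrative stand-in for the Dispatcher logic; every name here is hypothetical. */
class ResultPresenceSketch {

    private final Set<String> cachedResults; // stand-in for jobExecutionResultCache
    private final Set<String> runningJobs;   // stand-in for jobManagerRunners

    ResultPresenceSketch(Set<String> cachedResults, Set<String> runningJobs) {
        this.cachedResults = cachedResults;
        this.runningJobs = runningJobs;
    }

    /** Logic as in the diff: true if cached, false if still running, failure if unknown. */
    CompletableFuture<Boolean> asInDiff(String jobId) {
        final boolean present = cachedResults.contains(jobId);
        if (!runningJobs.contains(jobId) && !present) {
            return notFound(jobId);
        }
        return CompletableFuture.completedFuture(present);
    }

    /** Logic as suggested: true if cached or running, failure otherwise. */
    CompletableFuture<Boolean> asSuggested(String jobId) {
        if (cachedResults.contains(jobId) || runningJobs.contains(jobId)) {
            return CompletableFuture.completedFuture(true);
        }
        return notFound(jobId);
    }

    /** Builds an exceptionally completed future (stand-in for FlinkJobNotFoundException). */
    private static CompletableFuture<Boolean> notFound(String jobId) {
        final CompletableFuture<Boolean> future = new CompletableFuture<>();
        future.completeExceptionally(new IllegalStateException("Job not found: " + jobId));
        return future;
    }
}
```

In this sketch the first variant distinguishes three outcomes, while the second one can only complete with `true` or exceptionally.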


---


[GitHub] flink pull request #5168: [FLINK-8234][flip6] WIP

2017-12-18 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5168#discussion_r157480732
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/queue/QueueStatus.java
 ---
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.queue;
+
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Response type for temporary queue resources, i.e., resources that are asynchronously created.
+ *
+ * @see org.apache.flink.runtime.rest.handler.job.JobExecutionResultHandler
+ */
+public class QueueStatus implements ResponseBody {
+
+   private static final String FIELD_NAME_STATUS = "id";
+
+   @JsonProperty(value = FIELD_NAME_STATUS, required = true)
+   private final StatusId statusId;
+
+   @JsonCreator
+   public QueueStatus(
+   @JsonProperty(value = FIELD_NAME_STATUS, required = true) final StatusId statusId) {
+   this.statusId = requireNonNull(statusId, "statusId must not be null");
+   }
+
+   public static QueueStatus inProgress() {
+   return new QueueStatus(StatusId.IN_PROGRESS);
+   }
+
+   public static QueueStatus created() {
+   return new QueueStatus(StatusId.CREATED);
+   }
+
+   public StatusId getStatusId() {
+   return statusId;
+   }
+
+   /**
+* Defines queue statuses.
+*/
+   public enum StatusId {
+   IN_PROGRESS,
+   CREATED
--- End diff --

`CREATED` confused me somewhat. Maybe we could rename it to `DONE` or `COMPLETED`.


---


[GitHub] flink pull request #5168: [FLINK-8234][flip6] WIP

2017-12-18 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5168#discussion_r157480619
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobExecutionResultResponseBody.java
 ---
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job;
+
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.runtime.rest.messages.queue.QueueStatus;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import javax.annotation.Nullable;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * {@link ResponseBody} that carries a {@link QueueStatus} and a {@link JobExecutionResult}.
+ *
+ * @see org.apache.flink.runtime.rest.handler.job.JobExecutionResultHandler
+ */
+public class JobExecutionResultResponseBody implements ResponseBody {
+
+   @JsonProperty(value = "status", required = true)
+   private final QueueStatus status;
--- End diff --

Could this be an `enum`? Then we wouldn't create JSON like 
```
"status": {
 "id": "..."
}
```


---


[GitHub] flink pull request #5168: [FLINK-8234][flip6] WIP

2017-12-18 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5168#discussion_r157479720
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobExecutionResultResponseBody.java
 ---
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job;
+
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.runtime.rest.messages.queue.QueueStatus;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import javax.annotation.Nullable;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * {@link ResponseBody} that carries a {@link QueueStatus} and a {@link JobExecutionResult}.
+ *
+ * @see org.apache.flink.runtime.rest.handler.job.JobExecutionResultHandler
+ */
+public class JobExecutionResultResponseBody implements ResponseBody {
+
+   @JsonProperty(value = "status", required = true)
+   private final QueueStatus status;
+
+   @JsonProperty(value = "job-execution-result")
+   private final JobExecutionResult jobExecutionResult;
--- End diff --

`@Nullable` missing?


---


[GitHub] flink pull request #5168: [FLINK-8234][flip6] WIP

2017-12-18 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5168#discussion_r157480094
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/json/SerializedValueSerializerTest.java
 ---
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.json;
+
+import org.apache.flink.util.SerializedValue;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.type.TypeReference;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JavaType;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.module.SimpleModule;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.Serializable;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link SerializedValueSerializer} and {@link SerializedValueDeserializer}.
+ */
+public class SerializedValueSerializerTest {
--- End diff --

`extends TestLogger` is missing.


---


[GitHub] flink pull request #5168: [FLINK-8234][flip6] WIP

2017-12-18 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5168#discussion_r157482818
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java 
---
@@ -357,6 +362,28 @@ public void start() throws Exception {
return CompletableFuture.completedFuture(jobManagerServices.blobServer.getPort());
}
 
+   @Override
+   public CompletableFuture<Either<Throwable, SerializedJobExecutionResult>> getJobExecutionResult(
--- End diff --

I'm actually wondering why we differentiate so strongly between execution 
success and failure. In the failure case we should also have a net runtime 
and accumulator values. The only difference is that in the failure case we 
have an additional `SerializedThrowable`.

We could, for example, have a common base class with 
`JobExecutionResultSuccess` and `JobExecutionResultFailure` specializations 
(maybe also `JobExecutionResultCancelled` as a subclass of 
`JobExecutionResultFailure`). What do you think?
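
A rough sketch of what such a hierarchy could look like, purely for discussion. The class names carry a `Sketch` suffix to make clear they are hypothetical, and `String` / `byte[]` are stand-ins for `JobID`, the serialized accumulator values and `SerializedThrowable`; none of this is taken from the PR.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical base class holding what success and failure have in common. */
abstract class JobResultSketch {

    private final String jobId;                           // stand-in for JobID
    private final long netRuntime;
    private final Map<String, byte[]> accumulatorResults; // stand-in for serialized accumulators

    JobResultSketch(String jobId, long netRuntime, Map<String, byte[]> accumulatorResults) {
        this.jobId = jobId;
        this.netRuntime = netRuntime;
        // Defensive copy so the result stays immutable after construction.
        this.accumulatorResults = Collections.unmodifiableMap(new HashMap<>(accumulatorResults));
    }

    String getJobId() { return jobId; }

    long getNetRuntime() { return netRuntime; }

    Map<String, byte[]> getAccumulatorResults() { return accumulatorResults; }

    abstract boolean isSuccess();
}

/** Successful execution: nothing beyond the shared fields. */
final class JobResultSuccessSketch extends JobResultSketch {

    JobResultSuccessSketch(String jobId, long netRuntime, Map<String, byte[]> accumulatorResults) {
        super(jobId, netRuntime, accumulatorResults);
    }

    @Override
    boolean isSuccess() { return true; }
}

/** Failed execution: the shared fields plus a failure cause. */
class JobResultFailureSketch extends JobResultSketch {

    private final String failureCause; // stand-in for SerializedThrowable

    JobResultFailureSketch(
            String jobId, long netRuntime, Map<String, byte[]> accumulatorResults, String failureCause) {
        super(jobId, netRuntime, accumulatorResults);
        this.failureCause = failureCause;
    }

    String getFailureCause() { return failureCause; }

    @Override
    boolean isSuccess() { return false; }
}

/** Cancellation modeled as a special kind of failure. */
final class JobResultCancelledSketch extends JobResultFailureSketch {

    JobResultCancelledSketch(String jobId, long netRuntime, Map<String, byte[]> accumulatorResults) {
        super(jobId, netRuntime, accumulatorResults, "Job was cancelled.");
    }
}
```

With a shape like this, callers would only need to branch on the subtype when they care about the failure cause; net runtime and accumulators are available either way.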


---


[GitHub] flink pull request #5168: [FLINK-8234][flip6] WIP

2017-12-18 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5168#discussion_r157475492
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/client/SerializedJobExecutionResult.java
 ---
@@ -55,6 +56,26 @@ public SerializedJobExecutionResult(JobID jobID, long netRuntime,
this.accumulatorResults = accumulators;
}
 
+   /**
+* Creates an instance from {@link JobExecutionResult}.
+*/
+   public static SerializedJobExecutionResult from(final JobExecutionResult jobExecutionResult) {
+   final Map<String, Object> accumulatorResults = jobExecutionResult.getAllAccumulatorResults();
+
+   final Map<String, SerializedValue<Object>> serializedAccumulatorResults = new HashMap<>(accumulatorResults.size());
+   for (final Map.Entry<String, Object> entry : accumulatorResults.entrySet()) {
+   try {
+   serializedAccumulatorResults.put(entry.getKey(), new SerializedValue<>(entry.getValue()));
+   } catch (final IOException e) {
+   throw new RuntimeException(e);
--- End diff --

We shouldn't throw a `RuntimeException` here but instead let the 
`IOException` bubble up.
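
A minimal sketch of the idea, assuming plain Java serialization as a stand-in for `new SerializedValue<>(...)`; `AccumulatorSerializationSketch`, `serialize` and `serializeAll` are hypothetical names, not Flink API. The factory method declares `throws IOException` so the caller decides how to handle the failure.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.util.HashMap;
import java.util.Map;

/** Illustrative only: serializes accumulator values and lets IOException propagate. */
final class AccumulatorSerializationSketch {

    private AccumulatorSerializationSketch() {}

    /** Stand-in for {@code new SerializedValue<>(value)}: Java-serializes the value. */
    static byte[] serialize(Object value) throws IOException {
        final ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        return bytes.toByteArray();
    }

    /** Declares IOException instead of wrapping it in a RuntimeException. */
    static Map<String, byte[]> serializeAll(Map<String, Object> accumulatorResults) throws IOException {
        final Map<String, byte[]> serialized = new HashMap<>(accumulatorResults.size());
        for (final Map.Entry<String, Object> entry : accumulatorResults.entrySet()) {
            serialized.put(entry.getKey(), serialize(entry.getValue()));
        }
        return serialized;
    }
}
```

The checked exception then shows up in the signature, which keeps the failure visible to callers instead of hiding it behind an unchecked wrapper.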


---


[GitHub] flink pull request #5168: [FLINK-8234][flip6] WIP

2017-12-18 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5168#discussion_r157477273
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/messages/JobExecutionResultNotFoundException.java
 ---
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.messages;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.util.FlinkException;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Exception indicating that we could not find a
+ * {@link org.apache.flink.api.common.JobExecutionResult} under the given {@link JobID}.
+ */
+public class JobExecutionResultNotFoundException extends FlinkException {
--- End diff --

Could this class extend `FlinkJobNotFoundException`?


---


[GitHub] flink pull request #5168: [FLINK-8234][flip6] WIP

2017-12-18 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5168#discussion_r157476994
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobExecutionResultCache.java
 ---
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.client.SerializedJobExecutionResult;
+import org.apache.flink.types.Either;
+
+import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
+
+import javax.annotation.Nullable;
+
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Caches {@link SerializedJobExecutionResult}s.
+ *
+ * @see org.apache.flink.runtime.rest.handler.job.JobExecutionResultHandler
+ */
+class JobExecutionResultCache {
+
+   private static final int MAX_RESULT_CACHE_DURATION_SECONDS = 300;
+
+   private final Cache<JobID, Either<Throwable, SerializedJobExecutionResult>>
+   jobExecutionResultCache =
+   CacheBuilder.newBuilder()
+   .expireAfterWrite(MAX_RESULT_CACHE_DURATION_SECONDS, TimeUnit.SECONDS)
+   .build();
+
+   public void put(final SerializedJobExecutionResult result) {
+   assertJobExecutionResultNotCached(result.getJobId());
--- End diff --

Why is this a problem?


---


[GitHub] flink pull request #5168: [FLINK-8234][flip6] WIP

2017-12-18 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5168#discussion_r157462859
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobExecutionResult.java
 ---
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.client.SerializedJobExecutionResult;
+import org.apache.flink.runtime.rest.messages.json.JobIDDeserializer;
+import org.apache.flink.runtime.rest.messages.json.JobIDSerializer;
+import org.apache.flink.util.SerializedValue;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize;
+
+import javax.annotation.Nullable;
+
+import java.util.Map;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * This class is used to represent the information in {@link JobExecutionResult} as JSON. In case
+ * of a job failure, no {@link JobExecutionResult} will be available. In this case instances of this
+ * class will only store a {@link Throwable}.
+ */
+@JsonInclude(JsonInclude.Include.NON_NULL)
+public class JobExecutionResult {
+
+   private static final String FIELD_NAME_JOB_ID = "id";
+
+   private static final String FIELD_NAME_NET_RUNTIME = "net-runtime";
+
+   private static final String FIELD_NAME_ACCUMULATOR_RESULTS = "accumulator-results";
+
+   private static final String FIELD_NAME_FAILURE_CAUSE = "failure-cause";
+
+   @JsonSerialize(using = JobIDSerializer.class)
+   @JsonDeserialize(using = JobIDDeserializer.class)
+   @JsonProperty(value = FIELD_NAME_JOB_ID, required = true)
+   private final JobID jobId;
+
+   @JsonProperty(FIELD_NAME_NET_RUNTIME)
+   private final Long netRuntime;
+
+   @JsonProperty(FIELD_NAME_ACCUMULATOR_RESULTS)
+   private final Map accumulatorResults;
+
+   @JsonProperty(FIELD_NAME_FAILURE_CAUSE)
+   private final Throwable throwable;
--- End diff --

Maybe only include the error message to avoid serialization/deserialization issues.
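
One way this could look, as a sketch only: flatten the `Throwable` to plain strings before it goes into the JSON, so the client never has to deserialize an exception class it may not have on its classpath. `FailureCauseSketch` and its two helpers are hypothetical names for illustration.

```java
import java.io.PrintWriter;
import java.io.StringWriter;

/** Illustrative helpers that turn a Throwable into plain strings for a JSON field. */
final class FailureCauseSketch {

    private FailureCauseSketch() {}

    /** Returns the exception message, or the class name if no message was set. */
    static String errorMessageOf(Throwable throwable) {
        final String message = throwable.getMessage();
        return message != null ? message : throwable.getClass().getName();
    }

    /** Returns the full stack trace as text, in case more detail than the message is wanted. */
    static String stackTraceOf(Throwable throwable) {
        final StringWriter writer = new StringWriter();
        try (PrintWriter printer = new PrintWriter(writer)) {
            throwable.printStackTrace(printer);
        }
        return writer.toString();
    }
}
```

The trade-off is that the client can no longer rethrow the original exception type; it only gets text.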


---


[GitHub] flink pull request #5168: [FLINK-8234][flip6] WIP

2017-12-14 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5168#discussion_r157027881
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobExecutionResultCache.java
 ---
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.client.SerializedJobExecutionResult;
+import org.apache.flink.types.Either;
+
+import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
+
+import javax.annotation.Nullable;
+
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Caches {@link SerializedJobExecutionResult}s.
+ *
+ * @see org.apache.flink.runtime.rest.handler.job.JobExecutionResultHandler
+ */
+class JobExecutionResultCache {
+
+   private static final int MAX_RESULT_CACHE_DURATION_SECONDS = 300;
+
+   private final Cache<JobID, Either<Throwable, SerializedJobExecutionResult>>
+   jobExecutionResultCache =
+   CacheBuilder.newBuilder()
+   .expireAfterWrite(MAX_RESULT_CACHE_DURATION_SECONDS, TimeUnit.SECONDS)
+   .build();
+
+   public void put(final SerializedJobExecutionResult result) {
+   assertJobExecutionResultNotCached(result.getJobId());
+   jobExecutionResultCache.put(result.getJobId(), Either.Right(result));
+   }
+
+   public void put(final JobID jobId, Throwable throwable) {
+   assertJobExecutionResultNotCached(jobId);
+   jobExecutionResultCache.put(jobId, Either.Left(throwable));
+   }
+
+   public boolean contains(final JobID jobId) {
+   return jobExecutionResultCache.getIfPresent(jobId) != null;
+   }
+
+   @Nullable
+   public Either<Throwable, SerializedJobExecutionResult> get(final JobID jobId) {
--- End diff --

Not sure if I am abusing Flink's `Either` here.


---


[GitHub] flink pull request #5168: [FLINK-8234][flip6] WIP

2017-12-14 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5168#discussion_r157027633
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/messages/JobExecutionResultNotFoundException.java
 ---
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.messages;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.util.FlinkException;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Exception indicating that we could not find a
+ * {@link org.apache.flink.api.common.JobExecutionResult} under the given {@link JobID}.
+ */
+public class JobExecutionResultNotFoundException extends FlinkException {
+
+   private final JobID jobId;
+
+   private static final long serialVersionUID = 1L;
--- End diff --

Should be on top.


---


[GitHub] flink pull request #5168: [FLINK-8234][flip6] WIP

2017-12-14 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5168#discussion_r157027510
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobExecutionResultCache.java
 ---
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.client.SerializedJobExecutionResult;
+import org.apache.flink.types.Either;
+
+import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
+
+import javax.annotation.Nullable;
+
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Caches {@link SerializedJobExecutionResult}s.
+ *
+ * @see org.apache.flink.runtime.rest.handler.job.JobExecutionResultHandler
+ */
+class JobExecutionResultCache {
+
+   private static final int MAX_RESULT_CACHE_DURATION_SECONDS = 300;
+
+   private final Cache<JobID, Either<Throwable, SerializedJobExecutionResult>>
+   jobExecutionResultCache =
+   CacheBuilder.newBuilder()
+   .expireAfterWrite(MAX_RESULT_CACHE_DURATION_SECONDS, TimeUnit.SECONDS)
+   .build();
+
+   public void put(final SerializedJobExecutionResult result) {
--- End diff --

Javadocs are missing.


---


[GitHub] flink pull request #5168: [FLINK-8234][flip6] WIP

2017-12-14 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5168#discussion_r157027178
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/util/RestMapperUtils.java
 ---
@@ -33,10 +40,23 @@
objectMapper.enable(
DeserializationFeature.FAIL_ON_IGNORED_PROPERTIES,
DeserializationFeature.FAIL_ON_NULL_FOR_PRIMITIVES,
-   DeserializationFeature.FAIL_ON_READING_DUP_TREE_KEY,
-   DeserializationFeature.FAIL_ON_MISSING_CREATOR_PROPERTIES);
--- End diff --

I had to remove `FAIL_ON_MISSING_CREATOR_PROPERTIES` because `null` fields 
are not always represented in the JSON. The `RestClient` would otherwise run 
into problems.


---


[GitHub] flink pull request #5168: [FLINK-8234][flip6] WIP

2017-12-14 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5168#discussion_r157026590
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExecutionResultHandler.java
 ---
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
+import org.apache.flink.runtime.messages.JobExecutionResultNotFoundException;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.JobMessageParameters;
+import org.apache.flink.runtime.rest.messages.job.JobExecutionResult;
+import org.apache.flink.runtime.rest.messages.job.JobExecutionResultHeaders;
+import org.apache.flink.runtime.rest.messages.job.JobExecutionResultResponseBody;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.ExceptionUtils;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import javax.annotation.Nonnull;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+
+/**
+ * Returns the {@link org.apache.flink.api.common.JobExecutionResult} for a given {@link JobID}.
+ */
+public class JobExecutionResultHandler
--- End diff --

Sample response after running batch WordCount example:
```
{
  "status": {
    "id": "CREATED"
  },
  "job-execution-result": {
    "id": "533a165a6de7f70919a54b1d6f36d3b3",
    "net-runtime": 0,
    "accumulator-results": {
      "94a58184eb17398571f35da42b714517": 
"rO0ABXNyABNqYXZhLnV0aWwuQXJyYXlMaXN0eIHSHZnHYZ0DAAFJAARzaXpleHCqdwQAAACqdXIAAltCrPMX+AYIVOACAAB4cAYCYQV1cQB+AAILB2FjdGlvbgF1cQB+AAIKBmFmdGVyAXVxAH4AAgwIYWdhaW5zdAF1cQB+AAIIBGFsbAJ1cQB+AAIIBGFuZAx1cQB+AAIJBWFybXMBdXEAfgACCwdhcnJvd3MBdXEAfgACCQVhd3J5AXVxAH4AAgcDYXkBdXEAfgACCQViYXJlAXVxAH4AAgcDYmUEdXEAfgACCQViZWFyA3VxAH4AAgsHYm9ka2luAXVxAH4AAgoGYm91cm4BdXEAfgACCARidXQBdXEAfgACBwNieQJ1cQB+AAINCWNhbGFtaXR5AXVxAH4AAgkFY2FzdAF1cQB+AAIJBWNvaWwBdXEAfgACCQVjb21lAXVxAH4AAg8LY29uc2NpZW5jZQF1cQB+AAIRDWNvbnN1bW1hdGlvbgF1cQB+AAIOCmNvbnR1bWVseQF1cQB+AAIMCGNvdW50cnkBdXEAfgACDAhjb3dhcmRzAXVxAH4AAg0JY3VycmVudHMBdXEAfgACBgJkBHVxAH4AAgoGZGVhdGgCdXEAfgACCgZkZWxheQF1cQB+AAILB2Rlc3BpcwF1cQB+AAINCWRldm91dGx5AAA
```


---
 

[GitHub] flink pull request #5168: [FLINK-8234][flip6] WIP

2017-12-14 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5168#discussion_r157026313
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/util/RestMapperUtils.java
 ---
@@ -33,10 +40,23 @@
objectMapper.enable(
DeserializationFeature.FAIL_ON_IGNORED_PROPERTIES,
DeserializationFeature.FAIL_ON_NULL_FOR_PRIMITIVES,
-   DeserializationFeature.FAIL_ON_READING_DUP_TREE_KEY,
-   DeserializationFeature.FAIL_ON_MISSING_CREATOR_PROPERTIES);
+   DeserializationFeature.FAIL_ON_READING_DUP_TREE_KEY);
objectMapper.disable(
SerializationFeature.FAIL_ON_EMPTY_BEANS);
+
+   final SimpleModule jacksonFlinkModule = new SimpleModule();
+
+   final JavaType serializedValueWildcardType = objectMapper
+   .getTypeFactory()
+   .constructType(new TypeReference<SerializedValue<?>>() {
+   });
+
+   jacksonFlinkModule.addSerializer(new SerializedValueSerializer(serializedValueWildcardType));
--- End diff --

Could also be done using Jackson's `@JsonSerialize` annotation.


---


[GitHub] flink pull request #5168: [FLINK-8234][flip6] WIP

2017-12-14 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5168#discussion_r157025791
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobExecutionResultCache.java
 ---
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.client.SerializedJobExecutionResult;
+import org.apache.flink.types.Either;
+
+import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
+
+import javax.annotation.Nullable;
+
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Caches {@link SerializedJobExecutionResult}s.
+ *
+ * @see org.apache.flink.runtime.rest.handler.job.JobExecutionResultHandler
+ */
+class JobExecutionResultCache {
+
+   private static final int MAX_RESULT_CACHE_DURATION_SECONDS = 300;
+
+   private final Cache<JobID, Either<Throwable, SerializedJobExecutionResult>>
--- End diff --

Cache isn't size limited.
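
With Guava, a bound could be added via `CacheBuilder.maximumSize(long)` next to the existing `expireAfterWrite`. To illustrate the idea without the shaded Guava dependency, here is a minimal JDK-only sketch of a size-bounded cache that evicts the oldest write once a limit is exceeded. The class name `BoundedResultCache` and the `maxEntries` parameter are assumptions for illustration and do not appear in the pull request; the sketch also omits time-based expiry.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical size-bounded cache (not in the PR); evicts the oldest inserted entry. */
class BoundedResultCache<K, V> {

    private final Map<K, V> entries;

    BoundedResultCache(final int maxEntries) {
        // accessOrder = false keeps insertion order, so the eldest entry is the oldest write.
        this.entries = new LinkedHashMap<K, V>(16, 0.75f, false) {
            @Override
            protected boolean removeEldestEntry(final Map.Entry<K, V> eldest) {
                // Called after each put; returning true drops the eldest entry.
                return size() > maxEntries;
            }
        };
    }

    synchronized void put(final K key, final V value) {
        entries.put(key, value);
    }

    synchronized V get(final K key) {
        return entries.get(key);
    }

    synchronized int size() {
        return entries.size();
    }
}
```

For example, with `maxEntries = 2`, inserting three results leaves only the two most recent ones in the cache.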


---


[GitHub] flink pull request #5168: [FLINK-8234][flip6] WIP

2017-12-14 Thread GJL
GitHub user GJL opened a pull request:

https://github.com/apache/flink/pull/5168

[FLINK-8234][flip6] WIP

WIP

@tillrohrmann 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/GJL/flink FLINK-8234

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5168.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5168


commit cc969846791bf818fbc81feb241a188410431ae5
Author: gyao 
Date:   2017-12-14T16:27:16Z

[FLINK-8234][flip6] WIP




---