[GitHub] flink pull request #5168: [FLINK-8234][flip6] WIP
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5168#discussion_r157961376 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/messages/JobExecutionResultNotFoundException.java --- @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.messages; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.util.FlinkException; + +import static java.util.Objects.requireNonNull; + +/** + * Exception indicating that we could not find a + * {@link org.apache.flink.api.common.JobExecutionResult} under the given {@link JobID}. + */ +public class JobExecutionResultNotFoundException extends FlinkException { + + private final JobID jobId; + + private static final long serialVersionUID = 1L; --- End diff -- Fixed. ---
[GitHub] flink pull request #5168: [FLINK-8234][flip6] WIP
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5168#discussion_r157961174 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/messages/JobExecutionResultNotFoundException.java --- @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.messages; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.util.FlinkException; + +import static java.util.Objects.requireNonNull; + +/** + * Exception indicating that we could not find a + * {@link org.apache.flink.api.common.JobExecutionResult} under the given {@link JobID}. + */ +public class JobExecutionResultNotFoundException extends FlinkException { --- End diff -- I think it is not a *is-a* relationship. ---
[GitHub] flink pull request #5168: [FLINK-8234][flip6] WIP
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5168#discussion_r157960760 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/client/SerializedJobExecutionResult.java --- @@ -55,6 +56,26 @@ public SerializedJobExecutionResult(JobID jobID, long netRuntime, this.accumulatorResults = accumulators; } + /** +* Creates an instance from {@link JobExecutionResult}. +*/ + public static SerializedJobExecutionResult from(final JobExecutionResult jobExecutionResult) { + final MapaccumulatorResults = jobExecutionResult.getAllAccumulatorResults(); + + final Map serializedAccumulatorResults = new HashMap<>(accumulatorResults.size()); + for (final Map.Entry entry : accumulatorResults.entrySet()) { + try { + serializedAccumulatorResults.put(entry.getKey(), new SerializedValue<>(entry.getValue())); + } catch (final IOException e) { + throw new RuntimeException(e); --- End diff -- Not needed anymore. ---
[GitHub] flink pull request #5168: [FLINK-8234][flip6] WIP
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5168#discussion_r157960722 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java --- @@ -357,6 +362,28 @@ public void start() throws Exception { return CompletableFuture.completedFuture(jobManagerServices.blobServer.getPort()); } + @Override + public CompletableFuture> getJobExecutionResult( --- End diff -- Done. ---
[GitHub] flink pull request #5168: [FLINK-8234][flip6] WIP
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5168#discussion_r157960666 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobExecutionResultCache.java --- @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.dispatcher; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.client.SerializedJobExecutionResult; +import org.apache.flink.types.Either; + +import org.apache.flink.shaded.guava18.com.google.common.cache.Cache; +import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder; + +import javax.annotation.Nullable; + +import java.util.concurrent.TimeUnit; + +import static org.apache.flink.util.Preconditions.checkState; + +/** + * Caches {@link SerializedJobExecutionResult}s. + * + * @see org.apache.flink.runtime.rest.handler.job.JobExecutionResultHandler + */ +class JobExecutionResultCache { + + private static final int MAX_RESULT_CACHE_DURATION_SECONDS = 300; + + private final Cache> + jobExecutionResultCache = + CacheBuilder.newBuilder() + .expireAfterWrite(MAX_RESULT_CACHE_DURATION_SECONDS, TimeUnit.SECONDS) + .build(); + + public void put(final SerializedJobExecutionResult result) { + assertJobExecutionResultNotCached(result.getJobId()); --- End diff -- It's just being strict. Can remove if it is wrong. ---
[GitHub] flink pull request #5168: [FLINK-8234][flip6] WIP
Github user GJL closed the pull request at: https://github.com/apache/flink/pull/5168 ---
[GitHub] flink pull request #5168: [FLINK-8234][flip6] WIP
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5168#discussion_r157489280 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java --- @@ -357,6 +362,28 @@ public void start() throws Exception { return CompletableFuture.completedFuture(jobManagerServices.blobServer.getPort()); } + @Override + public CompletableFuture> getJobExecutionResult( + final JobID jobId, + final Time timeout) { + final Either jobExecutionResult = + jobExecutionResultCache.get(jobId); + if (jobExecutionResult == null) { + return FutureUtils.completedExceptionally(new JobExecutionResultNotFoundException(jobId)); + } else { + return CompletableFuture.completedFuture(jobExecutionResult); + } + } + + @Override + public CompletableFuture isJobExecutionResultPresent(final JobID jobId, final Time timeout) { + final boolean jobExecutionResultPresent = jobExecutionResultCache.contains(jobId); + if (!jobManagerRunners.containsKey(jobId) && !jobExecutionResultPresent) { + return FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobId)); + } + return CompletableFuture.completedFuture(jobExecutionResultPresent); --- End diff -- But this would never return a future containing `false`. ---
[GitHub] flink pull request #5168: [FLINK-8234][flip6] WIP
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5168#discussion_r157476788 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java --- @@ -357,6 +362,28 @@ public void start() throws Exception { return CompletableFuture.completedFuture(jobManagerServices.blobServer.getPort()); } + @Override + public CompletableFuture> getJobExecutionResult( + final JobID jobId, + final Time timeout) { + final Either jobExecutionResult = + jobExecutionResultCache.get(jobId); + if (jobExecutionResult == null) { + return FutureUtils.completedExceptionally(new JobExecutionResultNotFoundException(jobId)); + } else { + return CompletableFuture.completedFuture(jobExecutionResult); + } + } + + @Override + public CompletableFuture isJobExecutionResultPresent(final JobID jobId, final Time timeout) { + final boolean jobExecutionResultPresent = jobExecutionResultCache.contains(jobId); + if (!jobManagerRunners.containsKey(jobId) && !jobExecutionResultPresent) { + return FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobId)); + } + return CompletableFuture.completedFuture(jobExecutionResultPresent); --- End diff -- I think the logic should be something like ``` if (jobExecutionResultPresent || jobManagerRunners.containsKey(jobId)) { return CompletableFuture.completedFuture(true); } else { return FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobId)); } ``` ---
[GitHub] flink pull request #5168: [FLINK-8234][flip6] WIP
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5168#discussion_r157480732 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/queue/QueueStatus.java --- @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages.queue; + +import org.apache.flink.runtime.rest.messages.ResponseBody; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; + +import static java.util.Objects.requireNonNull; + +/** + * Response type for temporary queue resources, i.e., resources that are asynchronously created. + * + * @see org.apache.flink.runtime.rest.handler.job.JobExecutionResultHandler + */ +public class QueueStatus implements ResponseBody { + + private static final String FIELD_NAME_STATUS = "id"; + + @JsonProperty(value = FIELD_NAME_STATUS, required = true) + private final StatusId statusId; + + @JsonCreator + public QueueStatus( + @JsonProperty(value = FIELD_NAME_STATUS, required = true) final StatusId statusId) { + this.statusId = requireNonNull(statusId, "statusId must not be null"); + } + + public static QueueStatus inProgress() { + return new QueueStatus(StatusId.IN_PROGRESS); + } + + public static QueueStatus created() { + return new QueueStatus(StatusId.CREATED); + } + + public StatusId getStatusId() { + return statusId; + } + + /** +* Defines queue statuses. +*/ + public enum StatusId { + IN_PROGRESS, + CREATED --- End diff -- `CREATED` somewhat confused me. Maybe we could rename it into `DONE` or `COMPLETED`. ---
[GitHub] flink pull request #5168: [FLINK-8234][flip6] WIP
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5168#discussion_r157480619 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobExecutionResultResponseBody.java --- @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages.job; + +import org.apache.flink.runtime.rest.messages.ResponseBody; +import org.apache.flink.runtime.rest.messages.queue.QueueStatus; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; + +import javax.annotation.Nullable; + +import static java.util.Objects.requireNonNull; + +/** + * {@link ResponseBody} that carries a {@link QueueStatus} and a {@link JobExecutionResult}. + * + * @see org.apache.flink.runtime.rest.handler.job.JobExecutionResultHandler + */ +public class JobExecutionResultResponseBody implements ResponseBody { + + @JsonProperty(value = "status", required = true) + private final QueueStatus status; --- End diff -- Could this be an `enum`? Then we wouldn't create JSON like ``` "status": { "id": "..." } ``` ---
[GitHub] flink pull request #5168: [FLINK-8234][flip6] WIP
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5168#discussion_r157479720 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobExecutionResultResponseBody.java --- @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages.job; + +import org.apache.flink.runtime.rest.messages.ResponseBody; +import org.apache.flink.runtime.rest.messages.queue.QueueStatus; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; + +import javax.annotation.Nullable; + +import static java.util.Objects.requireNonNull; + +/** + * {@link ResponseBody} that carries a {@link QueueStatus} and a {@link JobExecutionResult}. + * + * @see org.apache.flink.runtime.rest.handler.job.JobExecutionResultHandler + */ +public class JobExecutionResultResponseBody implements ResponseBody { + + @JsonProperty(value = "status", required = true) + private final QueueStatus status; + + @JsonProperty(value = "job-execution-result") + private final JobExecutionResult jobExecutionResult; --- End diff -- `@Nullable` missing? ---
[GitHub] flink pull request #5168: [FLINK-8234][flip6] WIP
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5168#discussion_r157480094 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/json/SerializedValueSerializerTest.java --- @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages.json; + +import org.apache.flink.util.SerializedValue; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.type.TypeReference; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JavaType; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.module.SimpleModule; + +import org.junit.Before; +import org.junit.Test; + +import java.io.Serializable; + +import static org.junit.Assert.assertEquals; + +/** + * Tests for {@link SerializedValueSerializer} and {@link SerializedValueDeserializer}. + */ +public class SerializedValueSerializerTest { --- End diff -- `extends TestLogger` is missing. ---
[GitHub] flink pull request #5168: [FLINK-8234][flip6] WIP
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5168#discussion_r157482818 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java --- @@ -357,6 +362,28 @@ public void start() throws Exception { return CompletableFuture.completedFuture(jobManagerServices.blobServer.getPort()); } + @Override + public CompletableFuture> getJobExecutionResult( --- End diff -- I'm actually wondering why we are differentiating between execution success and failure that much. Also in the failure case we should have a net runtime and accumulator values. The only difference is that in the failure case we have an additional `SerializedThrowable`. We could for example have a common base class with a `JobExecutionResultSuccess` and `JobExecutionResultFailure` specialization (maybe also `JobExecutionResultCancelled` as a sub class of `JobExecutionResultFailure`). What do you think? ---
[GitHub] flink pull request #5168: [FLINK-8234][flip6] WIP
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5168#discussion_r157475492 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/client/SerializedJobExecutionResult.java --- @@ -55,6 +56,26 @@ public SerializedJobExecutionResult(JobID jobID, long netRuntime, this.accumulatorResults = accumulators; } + /** +* Creates an instance from {@link JobExecutionResult}. +*/ + public static SerializedJobExecutionResult from(final JobExecutionResult jobExecutionResult) { + final MapaccumulatorResults = jobExecutionResult.getAllAccumulatorResults(); + + final Map serializedAccumulatorResults = new HashMap<>(accumulatorResults.size()); + for (final Map.Entry entry : accumulatorResults.entrySet()) { + try { + serializedAccumulatorResults.put(entry.getKey(), new SerializedValue<>(entry.getValue())); + } catch (final IOException e) { + throw new RuntimeException(e); --- End diff -- We shouldn't throw `RuntimeExceptions` but instead let the `IOException` bubble up. ---
[GitHub] flink pull request #5168: [FLINK-8234][flip6] WIP
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5168#discussion_r157477273 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/messages/JobExecutionResultNotFoundException.java --- @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.messages; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.util.FlinkException; + +import static java.util.Objects.requireNonNull; + +/** + * Exception indicating that we could not find a + * {@link org.apache.flink.api.common.JobExecutionResult} under the given {@link JobID}. + */ +public class JobExecutionResultNotFoundException extends FlinkException { --- End diff -- Could this class extend `FlinkJobNotFoundException`? ---
[GitHub] flink pull request #5168: [FLINK-8234][flip6] WIP
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5168#discussion_r157476994 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobExecutionResultCache.java --- @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.dispatcher; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.client.SerializedJobExecutionResult; +import org.apache.flink.types.Either; + +import org.apache.flink.shaded.guava18.com.google.common.cache.Cache; +import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder; + +import javax.annotation.Nullable; + +import java.util.concurrent.TimeUnit; + +import static org.apache.flink.util.Preconditions.checkState; + +/** + * Caches {@link SerializedJobExecutionResult}s. + * + * @see org.apache.flink.runtime.rest.handler.job.JobExecutionResultHandler + */ +class JobExecutionResultCache { + + private static final int MAX_RESULT_CACHE_DURATION_SECONDS = 300; + + private final Cache> + jobExecutionResultCache = + CacheBuilder.newBuilder() + .expireAfterWrite(MAX_RESULT_CACHE_DURATION_SECONDS, TimeUnit.SECONDS) + .build(); + + public void put(final SerializedJobExecutionResult result) { + assertJobExecutionResultNotCached(result.getJobId()); --- End diff -- Why is this a problem? ---
[GitHub] flink pull request #5168: [FLINK-8234][flip6] WIP
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5168#discussion_r157462859 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobExecutionResult.java --- @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages.job; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.client.SerializedJobExecutionResult; +import org.apache.flink.runtime.rest.messages.json.JobIDDeserializer; +import org.apache.flink.runtime.rest.messages.json.JobIDSerializer; +import org.apache.flink.util.SerializedValue; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize; + +import javax.annotation.Nullable; + +import java.util.Map; + +import static java.util.Objects.requireNonNull; + +/** + * This class is used to represent the information in {@link JobExecutionResult} as JSON. In case + * of a job failure, no {@link JobExecutionResult} will be available. In this case instances of this + * class will only store a {@link Throwable}. + */ +@JsonInclude(JsonInclude.Include.NON_NULL) +public class JobExecutionResult { + + private static final String FIELD_NAME_JOB_ID = "id"; + + private static final String FIELD_NAME_NET_RUNTIME = "net-runtime"; + + private static final String FIELD_NAME_ACCUMULATOR_RESULTS = "accumulator-results"; + + private static final String FIELD_NAME_FAILURE_CAUSE = "failure-cause"; + + @JsonSerialize(using = JobIDSerializer.class) + @JsonDeserialize(using = JobIDDeserializer.class) + @JsonProperty(value = FIELD_NAME_JOB_ID, required = true) + private final JobID jobId; + + @JsonProperty(FIELD_NAME_NET_RUNTIME) + private final Long netRuntime; + + @JsonProperty(FIELD_NAME_ACCUMULATOR_RESULTS) + private final MapaccumulatorResults; + + @JsonProperty(FIELD_NAME_FAILURE_CAUSE) + private final Throwable throwable; --- End diff -- Maybe only include the errorMessage to avoid ser/des issues. ---
[GitHub] flink pull request #5168: [FLINK-8234][flip6] WIP
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5168#discussion_r157027881 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobExecutionResultCache.java --- @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.dispatcher; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.client.SerializedJobExecutionResult; +import org.apache.flink.types.Either; + +import org.apache.flink.shaded.guava18.com.google.common.cache.Cache; +import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder; + +import javax.annotation.Nullable; + +import java.util.concurrent.TimeUnit; + +import static org.apache.flink.util.Preconditions.checkState; + +/** + * Caches {@link SerializedJobExecutionResult}s. + * + * @see org.apache.flink.runtime.rest.handler.job.JobExecutionResultHandler + */ +class JobExecutionResultCache { + + private static final int MAX_RESULT_CACHE_DURATION_SECONDS = 300; + + private final Cache> + jobExecutionResultCache = + CacheBuilder.newBuilder() + .expireAfterWrite(MAX_RESULT_CACHE_DURATION_SECONDS, TimeUnit.SECONDS) + .build(); + + public void put(final SerializedJobExecutionResult result) { + assertJobExecutionResultNotCached(result.getJobId()); + jobExecutionResultCache.put(result.getJobId(), Either.Right(result)); + } + + public void put(final JobID jobId, Throwable throwable) { + assertJobExecutionResultNotCached(jobId); + jobExecutionResultCache.put(jobId, Either.Left(throwable)); + } + + public boolean contains(final JobID jobId) { + return jobExecutionResultCache.getIfPresent(jobId) != null; + } + + @Nullable + public Either get(final JobID jobId) { --- End diff -- Not sure if I am abusing Flink's `Either` here. ---
[GitHub] flink pull request #5168: [FLINK-8234][flip6] WIP
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5168#discussion_r157027633 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/messages/JobExecutionResultNotFoundException.java --- @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.messages; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.util.FlinkException; + +import static java.util.Objects.requireNonNull; + +/** + * Exception indicating that we could not find a + * {@link org.apache.flink.api.common.JobExecutionResult} under the given {@link JobID}. + */ +public class JobExecutionResultNotFoundException extends FlinkException { + + private final JobID jobId; + + private static final long serialVersionUID = 1L; --- End diff -- Should be on top. ---
[GitHub] flink pull request #5168: [FLINK-8234][flip6] WIP
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5168#discussion_r157027510 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobExecutionResultCache.java --- @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.dispatcher; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.client.SerializedJobExecutionResult; +import org.apache.flink.types.Either; + +import org.apache.flink.shaded.guava18.com.google.common.cache.Cache; +import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder; + +import javax.annotation.Nullable; + +import java.util.concurrent.TimeUnit; + +import static org.apache.flink.util.Preconditions.checkState; + +/** + * Caches {@link SerializedJobExecutionResult}s. + * + * @see org.apache.flink.runtime.rest.handler.job.JobExecutionResultHandler + */ +class JobExecutionResultCache { + + private static final int MAX_RESULT_CACHE_DURATION_SECONDS = 300; + + private final Cache> + jobExecutionResultCache = + CacheBuilder.newBuilder() + .expireAfterWrite(MAX_RESULT_CACHE_DURATION_SECONDS, TimeUnit.SECONDS) + .build(); + + public void put(final SerializedJobExecutionResult result) { --- End diff -- Javadocs are missing. ---
[GitHub] flink pull request #5168: [FLINK-8234][flip6] WIP
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5168#discussion_r157027178 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/util/RestMapperUtils.java --- @@ -33,10 +40,23 @@ objectMapper.enable( DeserializationFeature.FAIL_ON_IGNORED_PROPERTIES, DeserializationFeature.FAIL_ON_NULL_FOR_PRIMITIVES, - DeserializationFeature.FAIL_ON_READING_DUP_TREE_KEY, - DeserializationFeature.FAIL_ON_MISSING_CREATOR_PROPERTIES); --- End diff -- I had to remove `FAIL_ON_MISSING_CREATOR_PROPERTIES` because `null` fields are not always represented in the JSON. The `RestClient` would otherwise run into problems. ---
[GitHub] flink pull request #5168: [FLINK-8234][flip6] WIP
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5168#discussion_r157026590 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExecutionResultHandler.java --- @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.job; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.messages.FlinkJobNotFoundException; +import org.apache.flink.runtime.messages.JobExecutionResultNotFoundException; +import org.apache.flink.runtime.rest.handler.AbstractRestHandler; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.handler.RestHandlerException; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.JobIDPathParameter; +import org.apache.flink.runtime.rest.messages.JobMessageParameters; +import org.apache.flink.runtime.rest.messages.job.JobExecutionResult; +import org.apache.flink.runtime.rest.messages.job.JobExecutionResultHeaders; +import org.apache.flink.runtime.rest.messages.job.JobExecutionResultResponseBody; +import org.apache.flink.runtime.webmonitor.RestfulGateway; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; +import org.apache.flink.util.ExceptionUtils; + +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; + +import javax.annotation.Nonnull; + +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; + +/** + * Returns the {@link org.apache.flink.api.common.JobExecutionResult} for a given {@link JobID}. + */ +public class JobExecutionResultHandler --- End diff -- Sample response after running batch WordCount example: ``` { "status": { "id": "CREATED" }, "job-execution-result": { "id": "533a165a6de7f70919a54b1d6f36d3b3", "net-runtime": 0, "accumulator-results": { "94a58184eb17398571f35da42b714517": "rO0ABXNyABNqYXZhLnV0aWwuQXJyYXlMaXN0eIHSHZnHYZ0DAAFJAARzaXpleHCqdwQAAACqdXIAAltCrPMX+AYIVOACAAB4cAYCYQV1cQB+AAILB2FjdGlvbgF1cQB+AAIKBmFmdGVyAXVxAH4AAgwIYWdhaW5zdAF1cQB+AAIIBGFsbAJ1cQB+AAIIBGFuZAx1cQB+AAIJBWFybXMBdXEAfgACCwdhcnJvd3MBdXEAfgACCQVhd3J5AXVxAH4AAgcDYXkBdXEAfgACCQViYXJlAXVxAH4AAgcDYmUEdXEAfgACCQViZWFyA3VxAH4AAgsHYm9ka2luAXVxAH4AAgoGYm91cm4BdXEAfgACCARidXQBdXEAfgACBwNieQJ1cQB+AAINCWNhbGFtaXR5AXVxAH4AAgkFY2FzdAF1cQB+AAIJBWNvaWwBdXEAfgACCQVjb21lAXVxAH4AAg8LY29uc2NpZW5jZQF1cQB+AAIRDWNvbnN1bW1hdGlvbgF1cQB+AAIOCmNvbnR1bWVseQF1cQB+AAIMCGNvdW50cnkBdXEAfgACDAhjb3dhcmRzAXVxAH4AAg0JY3VycmVudHMBdXEAfgACBgJkBHVxAH4AAgoGZGVhdGgCdXEAfgACCgZkZWxheQF1cQB+AAILB2Rlc3BpcwF1cQB+AAINCWRldm91dGx5AAA
[GitHub] flink pull request #5168: [FLINK-8234][flip6] WIP
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5168#discussion_r157026313 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/util/RestMapperUtils.java --- @@ -33,10 +40,23 @@ objectMapper.enable( DeserializationFeature.FAIL_ON_IGNORED_PROPERTIES, DeserializationFeature.FAIL_ON_NULL_FOR_PRIMITIVES, - DeserializationFeature.FAIL_ON_READING_DUP_TREE_KEY, - DeserializationFeature.FAIL_ON_MISSING_CREATOR_PROPERTIES); + DeserializationFeature.FAIL_ON_READING_DUP_TREE_KEY); objectMapper.disable( SerializationFeature.FAIL_ON_EMPTY_BEANS); + + final SimpleModule jacksonFlinkModule = new SimpleModule(); + + final JavaType serializedValueWildcardType = objectMapper + .getTypeFactory() + .constructType(new TypeReference() { + }); + + jacksonFlinkModule.addSerializer(new SerializedValueSerializer(serializedValueWildcardType)); --- End diff -- Could also be done using `@JsonSerialization` annotation ---
[GitHub] flink pull request #5168: [FLINK-8234][flip6] WIP
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5168#discussion_r157025791 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobExecutionResultCache.java --- @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.dispatcher; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.client.SerializedJobExecutionResult; +import org.apache.flink.types.Either; + +import org.apache.flink.shaded.guava18.com.google.common.cache.Cache; +import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder; + +import javax.annotation.Nullable; + +import java.util.concurrent.TimeUnit; + +import static org.apache.flink.util.Preconditions.checkState; + +/** + * Caches {@link SerializedJobExecutionResult}s. + * + * @see org.apache.flink.runtime.rest.handler.job.JobExecutionResultHandler + */ +class JobExecutionResultCache { + + private static final int MAX_RESULT_CACHE_DURATION_SECONDS = 300; + + private final Cache> --- End diff -- Cache isn't size limited. ---
[GitHub] flink pull request #5168: [FLINK-8234][flip6] WIP
GitHub user GJL opened a pull request: https://github.com/apache/flink/pull/5168 [FLINK-8234][flip6] WIP WIP @tillrohrmann You can merge this pull request into a Git repository by running: $ git pull https://github.com/GJL/flink FLINK-8234 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5168.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5168 commit cc969846791bf818fbc81feb241a188410431ae5 Author: gyaoDate: 2017-12-14T16:27:16Z [FLINK-8234][flip6] WIP ---