nicoweidner commented on a change in pull request #17578:
URL: https://github.com/apache/flink/pull/17578#discussion_r741924689
##########
File path: docs/layouts/shortcodes/generated/rest_v1_dispatcher.html
##########
@@ -3260,6 +3260,9 @@
},
"target-directory" : {
"type" : "string"
+ },
+ "triggerId" : {
+ "type" : "any"
Review comment:
Why is this generated as `any` instead of `string`?
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/savepoints/stop/StopWithSavepointRequestBody.java
##########
@@ -19,32 +19,44 @@
package org.apache.flink.runtime.rest.messages.job.savepoints.stop;
import org.apache.flink.runtime.rest.messages.RequestBody;
+import org.apache.flink.runtime.rest.messages.TriggerId;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
import javax.annotation.Nullable;
+import java.util.Optional;
+
/** {@link RequestBody} for stopping a job with a savepoint. */
public class StopWithSavepointRequestBody implements RequestBody {
public static final String FIELD_NAME_TARGET_DIRECTORY = "targetDirectory";
Review comment:
Here, camel case is used for everything. So maybe that should be the
standard :D
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/savepoints/SavepointTriggerRequestBodyTest.java
##########
@@ -44,8 +45,10 @@ public SavepointTriggerRequestBodyTest(
public static Collection<Object[]> data() {
return Arrays.asList(
new Object[][] {
- {new SavepointTriggerRequestBody("/tmp", true)},
- {new SavepointTriggerRequestBody("/tmp", false)}
+ {new SavepointTriggerRequestBody("/tmp", true, null)},
+ {new SavepointTriggerRequestBody("/tmp", false, null)},
+ {new SavepointTriggerRequestBody("/tmp", true, new
TriggerId())},
+ {new SavepointTriggerRequestBody("/tmp", false, new
TriggerId())},
Review comment:
Is there a reason we have these tests for triggerSavepoint, but not
stopWithSavepoint?
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/savepoints/StopWithSavepointHandlersTest.java
##########
@@ -213,20 +216,61 @@ public void testSavepointCompletedWithException() throws
Exception {
assertThat(savepointError, instanceOf(RuntimeException.class));
}
+ @Test
+ public void testProvidedTriggerId() throws Exception {
+ final OperationResult<String> successfulResult =
+ OperationResult.success(COMPLETED_SAVEPOINT_EXTERNAL_POINTER);
+ AtomicReference<AsynchronousJobOperationKey> keyReference = new
AtomicReference<>();
+ final TestingRestfulGateway testingRestfulGateway =
+ new TestingRestfulGateway.Builder()
+
.setStopWithSavepointFunction(setReferenceToOperationKey(keyReference))
+ .setGetSavepointStatusFunction(
+ getResultIfKeyMatches(successfulResult,
keyReference))
+ .build();
+
+ final TriggerId providedTriggerId = new TriggerId();
+
+ final TriggerId returnedTriggerId =
+ savepointTriggerHandler
+ .handleRequest(
+ triggerSavepointRequest(
+
DEFAULT_REQUESTED_SAVEPOINT_TARGET_DIRECTORY,
+ providedTriggerId),
+ testingRestfulGateway)
+ .get()
+ .getTriggerId();
+
+ assertEquals(providedTriggerId, returnedTriggerId);
+
+ AsynchronousOperationResult<SavepointInfo> savepointResponseBody;
+ savepointResponseBody =
+ savepointStatusHandler
+ .handleRequest(
+ savepointStatusRequest(providedTriggerId),
testingRestfulGateway)
+ .get();
+
+ assertThat(savepointResponseBody.queueStatus().getId(),
equalTo(QueueStatus.Id.COMPLETED));
+ assertThat(savepointResponseBody.resource(), notNullValue());
+ assertThat(
+ savepointResponseBody.resource().getLocation(),
+ equalTo(COMPLETED_SAVEPOINT_EXTERNAL_POINTER));
+ }
+
private static HandlerRequest<StopWithSavepointRequestBody>
triggerSavepointRequest()
throws HandlerRequestException {
- return
triggerSavepointRequest(DEFAULT_REQUESTED_SAVEPOINT_TARGET_DIRECTORY);
+ return
triggerSavepointRequest(DEFAULT_REQUESTED_SAVEPOINT_TARGET_DIRECTORY, null);
}
private static HandlerRequest<StopWithSavepointRequestBody>
triggerSavepointRequestWithDefaultDirectory() throws
HandlerRequestException {
- return triggerSavepointRequest(null);
+ return triggerSavepointRequest(null, null);
}
private static HandlerRequest<StopWithSavepointRequestBody>
triggerSavepointRequest(
- final String targetDirectory) throws HandlerRequestException {
+ final String targetDirectory, @Nullable TriggerId triggerId)
Review comment:
Same comment about `@Nullable`
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/savepoints/SavepointTriggerRequestBody.java
##########
@@ -19,39 +19,56 @@
package org.apache.flink.runtime.rest.messages.job.savepoints;
import org.apache.flink.runtime.rest.messages.RequestBody;
+import org.apache.flink.runtime.rest.messages.TriggerId;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
import javax.annotation.Nullable;
+import java.util.Optional;
+
/** {@link RequestBody} to trigger savepoints. */
public class SavepointTriggerRequestBody implements RequestBody {
public static final String FIELD_NAME_TARGET_DIRECTORY =
"target-directory";
private static final String FIELD_NAME_CANCEL_JOB = "cancel-job";
+ private static final String FIELD_NAME_TRIGGER_ID = "triggerId";
Review comment:
Are we using camel case or something else, e.g. `target-directory`
(kebab-case or whatever you call that)?
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/savepoints/SavepointHandlersTest.java
##########
@@ -208,20 +211,61 @@ public void testSavepointCompletedWithException() throws
Exception {
assertThat(savepointError, instanceOf(RuntimeException.class));
}
+ @Test
+ public void testProvidedTriggerId() throws Exception {
+ final OperationResult<String> successfulResult =
+ OperationResult.success(COMPLETED_SAVEPOINT_EXTERNAL_POINTER);
+ AtomicReference<AsynchronousJobOperationKey> keyReference = new
AtomicReference<>();
+ final TestingRestfulGateway testingRestfulGateway =
+ new TestingRestfulGateway.Builder()
+
.setTriggerSavepointFunction(setReferenceToOperationKey(keyReference))
+ .setGetSavepointStatusFunction(
+ getResultIfKeyMatches(successfulResult,
keyReference))
+ .build();
+
+ final TriggerId providedTriggerId = new TriggerId();
+
+ final TriggerId returnedTriggerId =
+ savepointTriggerHandler
+ .handleRequest(
+ triggerSavepointRequest(
+
DEFAULT_REQUESTED_SAVEPOINT_TARGET_DIRECTORY,
+ providedTriggerId),
+ testingRestfulGateway)
+ .get()
+ .getTriggerId();
+
+ assertEquals(providedTriggerId, returnedTriggerId);
+
+ AsynchronousOperationResult<SavepointInfo> savepointResponseBody;
+ savepointResponseBody =
+ savepointStatusHandler
+ .handleRequest(
+ savepointStatusRequest(providedTriggerId),
testingRestfulGateway)
+ .get();
+
+ assertThat(savepointResponseBody.queueStatus().getId(),
equalTo(QueueStatus.Id.COMPLETED));
+ assertThat(savepointResponseBody.resource(), notNullValue());
+ assertThat(
+ savepointResponseBody.resource().getLocation(),
+ equalTo(COMPLETED_SAVEPOINT_EXTERNAL_POINTER));
+ }
+
private static HandlerRequest<SavepointTriggerRequestBody>
triggerSavepointRequest()
throws HandlerRequestException {
- return
triggerSavepointRequest(DEFAULT_REQUESTED_SAVEPOINT_TARGET_DIRECTORY);
+ return
triggerSavepointRequest(DEFAULT_REQUESTED_SAVEPOINT_TARGET_DIRECTORY, null);
}
private static HandlerRequest<SavepointTriggerRequestBody>
triggerSavepointRequestWithDefaultDirectory() throws
HandlerRequestException {
- return triggerSavepointRequest(null);
+ return triggerSavepointRequest(null, null);
}
private static HandlerRequest<SavepointTriggerRequestBody>
triggerSavepointRequest(
- final String targetDirectory) throws HandlerRequestException {
+ final String targetDirectory, @Nullable TriggerId triggerId)
Review comment:
Looking at the call just above, `targetDirectory` should probably have
`@Nullable` as well
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/savepoints/SavepointTriggerRequestBody.java
##########
@@ -19,39 +19,56 @@
package org.apache.flink.runtime.rest.messages.job.savepoints;
import org.apache.flink.runtime.rest.messages.RequestBody;
+import org.apache.flink.runtime.rest.messages.TriggerId;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
import javax.annotation.Nullable;
+import java.util.Optional;
+
/** {@link RequestBody} to trigger savepoints. */
public class SavepointTriggerRequestBody implements RequestBody {
public static final String FIELD_NAME_TARGET_DIRECTORY =
"target-directory";
private static final String FIELD_NAME_CANCEL_JOB = "cancel-job";
+ private static final String FIELD_NAME_TRIGGER_ID = "triggerId";
+
@JsonProperty(FIELD_NAME_TARGET_DIRECTORY)
@Nullable
private final String targetDirectory;
@JsonProperty(FIELD_NAME_CANCEL_JOB)
private final boolean cancelJob;
+ @JsonProperty(FIELD_NAME_TRIGGER_ID)
+ @Nullable
+ private final TriggerId triggerId;
+
@JsonCreator
public SavepointTriggerRequestBody(
@Nullable @JsonProperty(FIELD_NAME_TARGET_DIRECTORY) final String
targetDirectory,
- @Nullable @JsonProperty(FIELD_NAME_CANCEL_JOB) final Boolean
cancelJob) {
+ @Nullable @JsonProperty(FIELD_NAME_CANCEL_JOB) final Boolean
cancelJob,
+ @Nullable @JsonProperty(FIELD_NAME_TRIGGER_ID) TriggerId
triggerId) {
this.targetDirectory = targetDirectory;
this.cancelJob = cancelJob != null ? cancelJob : false;
+ this.triggerId = triggerId;
}
@Nullable
public String getTargetDirectory() {
return targetDirectory;
}
+ @JsonIgnore
+ public Optional<TriggerId> getTriggerId() {
+ return Optional.ofNullable(triggerId);
+ }
Review comment:
I feel a bit uncomfortable that this is handled differently than
`targetDirectory` which is also nullable but does not return an Optional. Not
sure if it's worth it changing targetDirectory though as it's not related to
this PR
##########
File path: docs/layouts/shortcodes/generated/rest_v1_dispatcher.html
##########
@@ -3260,6 +3260,9 @@
},
"target-directory" : {
"type" : "string"
+ },
+ "triggerId" : {
+ "type" : "any"
Review comment:
Ah, to answer my own question: Most likely because it's a `TriggerId`,
not a `String`...
##########
File path: docs/layouts/shortcodes/generated/rest_v1_dispatcher.html
##########
@@ -3260,6 +3260,9 @@
},
"target-directory" : {
"type" : "string"
+ },
+ "triggerId" : {
+ "type" : "any"
Review comment:
Why is this generated as `any` instead of `string`?
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/savepoints/stop/StopWithSavepointRequestBody.java
##########
@@ -19,32 +19,44 @@
package org.apache.flink.runtime.rest.messages.job.savepoints.stop;
import org.apache.flink.runtime.rest.messages.RequestBody;
+import org.apache.flink.runtime.rest.messages.TriggerId;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
import javax.annotation.Nullable;
+import java.util.Optional;
+
/** {@link RequestBody} for stopping a job with a savepoint. */
public class StopWithSavepointRequestBody implements RequestBody {
public static final String FIELD_NAME_TARGET_DIRECTORY = "targetDirectory";
Review comment:
Here, camel case is used for everything. So maybe that should be the
standard :D
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/savepoints/SavepointTriggerRequestBodyTest.java
##########
@@ -44,8 +45,10 @@ public SavepointTriggerRequestBodyTest(
public static Collection<Object[]> data() {
return Arrays.asList(
new Object[][] {
- {new SavepointTriggerRequestBody("/tmp", true)},
- {new SavepointTriggerRequestBody("/tmp", false)}
+ {new SavepointTriggerRequestBody("/tmp", true, null)},
+ {new SavepointTriggerRequestBody("/tmp", false, null)},
+ {new SavepointTriggerRequestBody("/tmp", true, new
TriggerId())},
+ {new SavepointTriggerRequestBody("/tmp", false, new
TriggerId())},
Review comment:
Is there a reason we have these tests for triggerSavepoint, but not
stopWithSavepoint?
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/savepoints/StopWithSavepointHandlersTest.java
##########
@@ -213,20 +216,61 @@ public void testSavepointCompletedWithException() throws
Exception {
assertThat(savepointError, instanceOf(RuntimeException.class));
}
+ @Test
+ public void testProvidedTriggerId() throws Exception {
+ final OperationResult<String> successfulResult =
+ OperationResult.success(COMPLETED_SAVEPOINT_EXTERNAL_POINTER);
+ AtomicReference<AsynchronousJobOperationKey> keyReference = new
AtomicReference<>();
+ final TestingRestfulGateway testingRestfulGateway =
+ new TestingRestfulGateway.Builder()
+
.setStopWithSavepointFunction(setReferenceToOperationKey(keyReference))
+ .setGetSavepointStatusFunction(
+ getResultIfKeyMatches(successfulResult,
keyReference))
+ .build();
+
+ final TriggerId providedTriggerId = new TriggerId();
+
+ final TriggerId returnedTriggerId =
+ savepointTriggerHandler
+ .handleRequest(
+ triggerSavepointRequest(
+
DEFAULT_REQUESTED_SAVEPOINT_TARGET_DIRECTORY,
+ providedTriggerId),
+ testingRestfulGateway)
+ .get()
+ .getTriggerId();
+
+ assertEquals(providedTriggerId, returnedTriggerId);
+
+ AsynchronousOperationResult<SavepointInfo> savepointResponseBody;
+ savepointResponseBody =
+ savepointStatusHandler
+ .handleRequest(
+ savepointStatusRequest(providedTriggerId),
testingRestfulGateway)
+ .get();
+
+ assertThat(savepointResponseBody.queueStatus().getId(),
equalTo(QueueStatus.Id.COMPLETED));
+ assertThat(savepointResponseBody.resource(), notNullValue());
+ assertThat(
+ savepointResponseBody.resource().getLocation(),
+ equalTo(COMPLETED_SAVEPOINT_EXTERNAL_POINTER));
+ }
+
private static HandlerRequest<StopWithSavepointRequestBody>
triggerSavepointRequest()
throws HandlerRequestException {
- return
triggerSavepointRequest(DEFAULT_REQUESTED_SAVEPOINT_TARGET_DIRECTORY);
+ return
triggerSavepointRequest(DEFAULT_REQUESTED_SAVEPOINT_TARGET_DIRECTORY, null);
}
private static HandlerRequest<StopWithSavepointRequestBody>
triggerSavepointRequestWithDefaultDirectory() throws
HandlerRequestException {
- return triggerSavepointRequest(null);
+ return triggerSavepointRequest(null, null);
}
private static HandlerRequest<StopWithSavepointRequestBody>
triggerSavepointRequest(
- final String targetDirectory) throws HandlerRequestException {
+ final String targetDirectory, @Nullable TriggerId triggerId)
Review comment:
Same comment about `@Nullable`
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/savepoints/SavepointTriggerRequestBody.java
##########
@@ -19,39 +19,56 @@
package org.apache.flink.runtime.rest.messages.job.savepoints;
import org.apache.flink.runtime.rest.messages.RequestBody;
+import org.apache.flink.runtime.rest.messages.TriggerId;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
import javax.annotation.Nullable;
+import java.util.Optional;
+
/** {@link RequestBody} to trigger savepoints. */
public class SavepointTriggerRequestBody implements RequestBody {
public static final String FIELD_NAME_TARGET_DIRECTORY =
"target-directory";
private static final String FIELD_NAME_CANCEL_JOB = "cancel-job";
+ private static final String FIELD_NAME_TRIGGER_ID = "triggerId";
Review comment:
Are we using camel case or something else, e.g. `target-directory`
(kebab-case or whatever you call that)?
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/savepoints/SavepointHandlersTest.java
##########
@@ -208,20 +211,61 @@ public void testSavepointCompletedWithException() throws
Exception {
assertThat(savepointError, instanceOf(RuntimeException.class));
}
+ @Test
+ public void testProvidedTriggerId() throws Exception {
+ final OperationResult<String> successfulResult =
+ OperationResult.success(COMPLETED_SAVEPOINT_EXTERNAL_POINTER);
+ AtomicReference<AsynchronousJobOperationKey> keyReference = new
AtomicReference<>();
+ final TestingRestfulGateway testingRestfulGateway =
+ new TestingRestfulGateway.Builder()
+
.setTriggerSavepointFunction(setReferenceToOperationKey(keyReference))
+ .setGetSavepointStatusFunction(
+ getResultIfKeyMatches(successfulResult,
keyReference))
+ .build();
+
+ final TriggerId providedTriggerId = new TriggerId();
+
+ final TriggerId returnedTriggerId =
+ savepointTriggerHandler
+ .handleRequest(
+ triggerSavepointRequest(
+
DEFAULT_REQUESTED_SAVEPOINT_TARGET_DIRECTORY,
+ providedTriggerId),
+ testingRestfulGateway)
+ .get()
+ .getTriggerId();
+
+ assertEquals(providedTriggerId, returnedTriggerId);
+
+ AsynchronousOperationResult<SavepointInfo> savepointResponseBody;
+ savepointResponseBody =
+ savepointStatusHandler
+ .handleRequest(
+ savepointStatusRequest(providedTriggerId),
testingRestfulGateway)
+ .get();
+
+ assertThat(savepointResponseBody.queueStatus().getId(),
equalTo(QueueStatus.Id.COMPLETED));
+ assertThat(savepointResponseBody.resource(), notNullValue());
+ assertThat(
+ savepointResponseBody.resource().getLocation(),
+ equalTo(COMPLETED_SAVEPOINT_EXTERNAL_POINTER));
+ }
+
private static HandlerRequest<SavepointTriggerRequestBody>
triggerSavepointRequest()
throws HandlerRequestException {
- return
triggerSavepointRequest(DEFAULT_REQUESTED_SAVEPOINT_TARGET_DIRECTORY);
+ return
triggerSavepointRequest(DEFAULT_REQUESTED_SAVEPOINT_TARGET_DIRECTORY, null);
}
private static HandlerRequest<SavepointTriggerRequestBody>
triggerSavepointRequestWithDefaultDirectory() throws
HandlerRequestException {
- return triggerSavepointRequest(null);
+ return triggerSavepointRequest(null, null);
}
private static HandlerRequest<SavepointTriggerRequestBody>
triggerSavepointRequest(
- final String targetDirectory) throws HandlerRequestException {
+ final String targetDirectory, @Nullable TriggerId triggerId)
Review comment:
Looking at the call just above, `targetDirectory` should probably have
`@Nullable` as well
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/savepoints/SavepointTriggerRequestBody.java
##########
@@ -19,39 +19,56 @@
package org.apache.flink.runtime.rest.messages.job.savepoints;
import org.apache.flink.runtime.rest.messages.RequestBody;
+import org.apache.flink.runtime.rest.messages.TriggerId;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
import javax.annotation.Nullable;
+import java.util.Optional;
+
/** {@link RequestBody} to trigger savepoints. */
public class SavepointTriggerRequestBody implements RequestBody {
public static final String FIELD_NAME_TARGET_DIRECTORY =
"target-directory";
private static final String FIELD_NAME_CANCEL_JOB = "cancel-job";
+ private static final String FIELD_NAME_TRIGGER_ID = "triggerId";
+
@JsonProperty(FIELD_NAME_TARGET_DIRECTORY)
@Nullable
private final String targetDirectory;
@JsonProperty(FIELD_NAME_CANCEL_JOB)
private final boolean cancelJob;
+ @JsonProperty(FIELD_NAME_TRIGGER_ID)
+ @Nullable
+ private final TriggerId triggerId;
+
@JsonCreator
public SavepointTriggerRequestBody(
@Nullable @JsonProperty(FIELD_NAME_TARGET_DIRECTORY) final String
targetDirectory,
- @Nullable @JsonProperty(FIELD_NAME_CANCEL_JOB) final Boolean
cancelJob) {
+ @Nullable @JsonProperty(FIELD_NAME_CANCEL_JOB) final Boolean
cancelJob,
+ @Nullable @JsonProperty(FIELD_NAME_TRIGGER_ID) TriggerId
triggerId) {
this.targetDirectory = targetDirectory;
this.cancelJob = cancelJob != null ? cancelJob : false;
+ this.triggerId = triggerId;
}
@Nullable
public String getTargetDirectory() {
return targetDirectory;
}
+ @JsonIgnore
+ public Optional<TriggerId> getTriggerId() {
+ return Optional.ofNullable(triggerId);
+ }
Review comment:
I feel a bit uncomfortable that this is handled differently than
`targetDirectory` which is also nullable but does not return an Optional. Not
sure if it's worth it changing targetDirectory though as it's not related to
this PR
##########
File path: docs/layouts/shortcodes/generated/rest_v1_dispatcher.html
##########
@@ -3260,6 +3260,9 @@
},
"target-directory" : {
"type" : "string"
+ },
+ "triggerId" : {
+ "type" : "any"
Review comment:
Ah, to answer my own question: Most likely because it's a `TriggerId`,
not a `String`...
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]