[ 
https://issues.apache.org/jira/browse/BEAM-3787?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Cwik updated BEAM-3787:
----------------------------
    Description: 
Allow the SDK to request the Runner to do something on its behalf. This 
mechanism can be used for:
* Reporting final counters
* Work shedding (SDK can choose to reduce the amount of work it wants to do 
(checkpointing))
* Requesting process bundle descriptors (instead of requiring the Runner to 
send them and have the SDK cache them).
* Decoupling the message type in control allows for new types of messages to be 
added which are not one to one.

Example API change below (note that SdkMessage/RunnerMessage should use a 
different name):

{code:java}
// An API that describes control messages between the SDK and Runner to process
// bundles, split bundles, report progress, ...
service BeamFnControl {
  // 
  rpc Control(
    // A stream of SDK requests/responses.
    stream SdkMessage
  ) returns (
    // A stream of Runner requests/responses.
    stream RunnerMessage
  ) {}
}

// Messages a Runner can send over the control plane.
message RunnerMessage {
  // (Required) An unique identifier provided by the runner which represents
  // this requests execution. The RunnerInstructionResponse MUST have the 
matching id.
  string id = 1;

  oneof message {
    ErrorResponse error = 999;
    RegisterRequest register = 1000;
    ProcessBundleRequest process_bundle = 1001;
    ProcessBundleProgressRequest process_bundle_progress = 1002;
    ProcessBundleSplitRequest process_bundle_split = 1003;
    ShedBundleResponse shed_bundle = 1000;
  }
}

// Messages an SDK can send over the control plane.
message SdkMessage {
  oneof message {
    RunnerInstructionResponse runner_instruction_response = 1000;
    SdkInstructionRequest sdk_instruction_request = 1001;
  }
}

// A request sent by a runner which the SDK is asked to fulfill.
// For any unsupported request type, an error should be returned with a
// matching instruction id.
// Stable
message RunnerInstructionRequest {
  // (Required) An unique identifier provided by the runner which represents
  // this requests execution. The RunnerInstructionResponse MUST have the 
matching id.
  string instruction_id = 1;

  // (Required) A request that the SDK Harness needs to interpret.
  oneof request {
    RegisterRequest register = 1000;
    ProcessBundleRequest process_bundle = 1001;
    ProcessBundleProgressRequest process_bundle_progress = 1002;
    ProcessBundleSplitRequest process_bundle_split = 1003;
  }
}

// The response for an associated request the SDK had been asked to fulfill.
// Stable
message RunnerInstructionResponse {
  // (Required) A reference provided by the runner which represents a requests
  // execution. The RunnerInstructionResponse MUST have the matching id when
  // responding to the runner.
  string instruction_id = 1;

  // If this is specified, then this instruction has failed.
  // A human readable string representing the reason as to why processing has
  // failed.
  string error = 2;

  // If the instruction did not fail, it is required to return an equivalent
  // response type depending on the request this matches.
  oneof response {
    RegisterResponse register = 1000;
    ProcessBundleResponse process_bundle = 1001;
    ProcessBundleProgressResponse process_bundle_progress = 1002;
    ProcessBundleSplitResponse process_bundle_split = 1003;
  }
}

message SdkInstructionRequest {
  // (Required) An unique identifier provided by the SDK which represents
  // this requests execution. The SdkInstructionResponse MUST have the matching 
id.
  string instruction_id = 1;

  // (Required) A request that the Runner needs to interpret.
  oneof request {
    ShedBundleRequest shed_bundle = 1000;
  }
}

// The response for an associated request the Runner had been asked to fulfill.
// Stable
message RunnerInstructionResponse {
  // (Required) A reference provided by the SDK which represents a requests
  // execution. The RunnerInstructionResponse MUST have the matching id when
  // responding to the SDK.
  string instruction_id = 1;

  // If this is specified, then this instruction has failed.
  // A human readable string representing the reason as to why processing has
  // failed.
  string error = 2;

  // If the instruction did not fail, it is required to return an equivalent
  // response type depending on the request this matches.
  oneof response {
    ShedBundleResponse shed_bundle = 1000;
  }
}

{code}


  was:
Allow the SDK to request the Runner to do something on its behalf. This 
mechanism can be used for:
* Reporting final counters
* Work shedding (SDK can choose to reduce the amount of work it wants to do 
(checkpointing))
* Requesting process bundle descriptors (instead of requiring the Runner to 
send them and have the SDK cache them).
* Decoupling the message type in control allows for new types of messages to be 
added which are not one to one.

Example API change below (note that SdkMessage/RunnerMessage should use a 
different name):

// An API that describes control messages between the SDK and Runner to process
// bundles, split bundles, report progress, ...
service BeamFnControl {
  // 
  rpc Control(
    // A stream of SDK requests/responses.
    stream SdkMessage
  ) returns (
    // A stream of Runner requests/responses.
    stream RunnerMessage
  ) {}
}

// Messages a Runner can send over the control plane.
message RunnerMessage {
  // (Required) An unique identifier provided by the runner which represents
  // this requests execution. The RunnerInstructionResponse MUST have the 
matching id.
  string id = 1;

  oneof message {
    ErrorResponse error = 999;
    RegisterRequest register = 1000;
    ProcessBundleRequest process_bundle = 1001;
    ProcessBundleProgressRequest process_bundle_progress = 1002;
    ProcessBundleSplitRequest process_bundle_split = 1003;
    ShedBundleResponse shed_bundle = 1000;
  }
}

// Messages an SDK can send over the control plane.
message SdkMessage {
  oneof message {
    RunnerInstructionResponse runner_instruction_response = 1000;
    SdkInstructionRequest sdk_instruction_request = 1001;
  }
}

// A request sent by a runner which the SDK is asked to fulfill.
// For any unsupported request type, an error should be returned with a
// matching instruction id.
// Stable
message RunnerInstructionRequest {
  // (Required) An unique identifier provided by the runner which represents
  // this requests execution. The RunnerInstructionResponse MUST have the 
matching id.
  string instruction_id = 1;

  // (Required) A request that the SDK Harness needs to interpret.
  oneof request {
    RegisterRequest register = 1000;
    ProcessBundleRequest process_bundle = 1001;
    ProcessBundleProgressRequest process_bundle_progress = 1002;
    ProcessBundleSplitRequest process_bundle_split = 1003;
  }
}

// The response for an associated request the SDK had been asked to fulfill.
// Stable
message RunnerInstructionResponse {
  // (Required) A reference provided by the runner which represents a requests
  // execution. The RunnerInstructionResponse MUST have the matching id when
  // responding to the runner.
  string instruction_id = 1;

  // If this is specified, then this instruction has failed.
  // A human readable string representing the reason as to why processing has
  // failed.
  string error = 2;

  // If the instruction did not fail, it is required to return an equivalent
  // response type depending on the request this matches.
  oneof response {
    RegisterResponse register = 1000;
    ProcessBundleResponse process_bundle = 1001;
    ProcessBundleProgressResponse process_bundle_progress = 1002;
    ProcessBundleSplitResponse process_bundle_split = 1003;
  }
}

message SdkInstructionRequest {
  // (Required) An unique identifier provided by the SDK which represents
  // this requests execution. The SdkInstructionResponse MUST have the matching 
id.
  string instruction_id = 1;

  // (Required) A request that the Runner needs to interpret.
  oneof request {
    ShedBundleRequest shed_bundle = 1000;
  }
}

// The response for an associated request the Runner had been asked to fulfill.
// Stable
message RunnerInstructionResponse {
  // (Required) A reference provided by the SDK which represents a requests
  // execution. The RunnerInstructionResponse MUST have the matching id when
  // responding to the SDK.
  string instruction_id = 1;

  // If this is specified, then this instruction has failed.
  // A human readable string representing the reason as to why processing has
  // failed.
  string error = 2;

  // If the instruction did not fail, it is required to return an equivalent
  // response type depending on the request this matches.
  oneof response {
    ShedBundleResponse shed_bundle = 1000;
  }
}



> Migrate Fn API to be bidirectional instruction/request stream
> -------------------------------------------------------------
>
>                 Key: BEAM-3787
>                 URL: https://issues.apache.org/jira/browse/BEAM-3787
>             Project: Beam
>          Issue Type: Improvement
>          Components: beam-model
>            Reporter: Luke Cwik
>            Assignee: Robert Bradshaw
>            Priority: Major
>              Labels: portability
>
> Allow the SDK to request the Runner to do something on its behalf. This 
> mechanism can be used for:
> * Reporting final counters
> * Work shedding (SDK can choose to reduce the amount of work it wants to do 
> (checkpointing))
> * Requesting process bundle descriptors (instead of requiring the Runner to 
> send them and have the SDK cache them).
> * Decoupling the message type in control allows for new types of messages to 
> be added which are not one to one.
> Example API change below (note that SdkMessage/RunnerMessage should use a 
> different name):
> {code:java}
> // An API that describes control messages between the SDK and Runner to 
> process
> // bundles, split bundles, report progress, ...
> service BeamFnControl {
>   // 
>   rpc Control(
>     // A stream of SDK requests/responses.
>     stream SdkMessage
>   ) returns (
>     // A stream of Runner requests/responses.
>     stream RunnerMessage
>   ) {}
> }
> // Messages a Runner can send over the control plane.
> message RunnerMessage {
>   // (Required) An unique identifier provided by the runner which represents
>   // this requests execution. The RunnerInstructionResponse MUST have the 
> matching id.
>   string id = 1;
>   oneof message {
>     ErrorResponse error = 999;
>     RegisterRequest register = 1000;
>     ProcessBundleRequest process_bundle = 1001;
>     ProcessBundleProgressRequest process_bundle_progress = 1002;
>     ProcessBundleSplitRequest process_bundle_split = 1003;
>     ShedBundleResponse shed_bundle = 1000;
>   }
> }
> // Messages an SDK can send over the control plane.
> message SdkMessage {
>   oneof message {
>     RunnerInstructionResponse runner_instruction_response = 1000;
>     SdkInstructionRequest sdk_instruction_request = 1001;
>   }
> }
> // A request sent by a runner which the SDK is asked to fulfill.
> // For any unsupported request type, an error should be returned with a
> // matching instruction id.
> // Stable
> message RunnerInstructionRequest {
>   // (Required) An unique identifier provided by the runner which represents
>   // this requests execution. The RunnerInstructionResponse MUST have the 
> matching id.
>   string instruction_id = 1;
>   // (Required) A request that the SDK Harness needs to interpret.
>   oneof request {
>     RegisterRequest register = 1000;
>     ProcessBundleRequest process_bundle = 1001;
>     ProcessBundleProgressRequest process_bundle_progress = 1002;
>     ProcessBundleSplitRequest process_bundle_split = 1003;
>   }
> }
> // The response for an associated request the SDK had been asked to fulfill.
> // Stable
> message RunnerInstructionResponse {
>   // (Required) A reference provided by the runner which represents a requests
>   // execution. The RunnerInstructionResponse MUST have the matching id when
>   // responding to the runner.
>   string instruction_id = 1;
>   // If this is specified, then this instruction has failed.
>   // A human readable string representing the reason as to why processing has
>   // failed.
>   string error = 2;
>   // If the instruction did not fail, it is required to return an equivalent
>   // response type depending on the request this matches.
>   oneof response {
>     RegisterResponse register = 1000;
>     ProcessBundleResponse process_bundle = 1001;
>     ProcessBundleProgressResponse process_bundle_progress = 1002;
>     ProcessBundleSplitResponse process_bundle_split = 1003;
>   }
> }
> message SdkInstructionRequest {
>   // (Required) An unique identifier provided by the SDK which represents
>   // this requests execution. The SdkInstructionResponse MUST have the 
> matching id.
>   string instruction_id = 1;
>   // (Required) A request that the Runner needs to interpret.
>   oneof request {
>     ShedBundleRequest shed_bundle = 1000;
>   }
> }
> // The response for an associated request the Runner had been asked to 
> fulfill.
> // Stable
> message RunnerInstructionResponse {
>   // (Required) A reference provided by the SDK which represents a requests
>   // execution. The RunnerInstructionResponse MUST have the matching id when
>   // responding to the SDK.
>   string instruction_id = 1;
>   // If this is specified, then this instruction has failed.
>   // A human readable string representing the reason as to why processing has
>   // failed.
>   string error = 2;
>   // If the instruction did not fail, it is required to return an equivalent
>   // response type depending on the request this matches.
>   oneof response {
>     ShedBundleResponse shed_bundle = 1000;
>   }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to