[ https://issues.apache.org/jira/browse/BEAM-3787?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Luke Cwik updated BEAM-3787:
----------------------------
    Description:
Allow the SDK to request the Runner to do something on its behalf. This mechanism can be used for:
* Reporting final counters
* Work shedding (SDK can choose to reduce the amount of work it wants to do (checkpointing))
* Requesting process bundle descriptors (instead of requiring the Runner to send them and have the SDK cache them).
* Decoupling the message type in control allows for new types of messages to be added which are not one to one.

Example API change below (note that SdkMessage/RunnerMessage should use a different name):
{code:java}
// An API that describes control messages between the SDK and Runner to process
// bundles, split bundles, report progress, ...
service BeamFnControl {
  //
  rpc Control(
    // A stream of SDK requests/responses.
    stream SdkMessage
  ) returns (
    // A stream of Runner requests/responses.
    stream RunnerMessage
  ) {}
}
// Messages a Runner can send over the control plane.
message RunnerMessage {
  // (Required) An unique identifier provided by the runner which represents
  // this requests execution. The RunnerInstructionResponse MUST have the matching id.
  string id = 1;
  oneof message {
    ErrorResponse error = 999;
    RegisterRequest register = 1000;
    ProcessBundleRequest process_bundle = 1001;
    ProcessBundleProgressRequest process_bundle_progress = 1002;
    ProcessBundleSplitRequest process_bundle_split = 1003;
    ShedBundleResponse shed_bundle = 1000;
  }
}
// Messages an SDK can send over the control plane.
message SdkMessage {
  oneof message {
    RunnerInstructionResponse runner_instruction_response = 1000;
    SdkInstructionRequest sdk_instruction_request = 1001;
  }
}
// A request sent by a runner which the SDK is asked to fulfill.
// For any unsupported request type, an error should be returned with a
// matching instruction id.
// Stable
message RunnerInstructionRequest {
  // (Required) An unique identifier provided by the runner which represents
  // this requests execution. The RunnerInstructionResponse MUST have the matching id.
  string instruction_id = 1;
  // (Required) A request that the SDK Harness needs to interpret.
  oneof request {
    RegisterRequest register = 1000;
    ProcessBundleRequest process_bundle = 1001;
    ProcessBundleProgressRequest process_bundle_progress = 1002;
    ProcessBundleSplitRequest process_bundle_split = 1003;
  }
}
// The response for an associated request the SDK had been asked to fulfill.
// Stable
message RunnerInstructionResponse {
  // (Required) A reference provided by the runner which represents a requests
  // execution. The RunnerInstructionResponse MUST have the matching id when
  // responding to the runner.
  string instruction_id = 1;
  // If this is specified, then this instruction has failed.
  // A human readable string representing the reason as to why processing has
  // failed.
  string error = 2;
  // If the instruction did not fail, it is required to return an equivalent
  // response type depending on the request this matches.
  oneof response {
    RegisterResponse register = 1000;
    ProcessBundleResponse process_bundle = 1001;
    ProcessBundleProgressResponse process_bundle_progress = 1002;
    ProcessBundleSplitResponse process_bundle_split = 1003;
  }
}
message SdkInstructionRequest {
  // (Required) An unique identifier provided by the SDK which represents
  // this requests execution. The SdkInstructionResponse MUST have the matching id.
  string instruction_id = 1;
  // (Required) A request that the Runner needs to interpret.
  oneof request {
    ShedBundleRequest shed_bundle = 1000;
  }
}
// The response for an associated request the Runner had been asked to fulfill.
// Stable
message RunnerInstructionResponse {
  // (Required) A reference provided by the SDK which represents a requests
  // execution. The RunnerInstructionResponse MUST have the matching id when
  // responding to the SDK.
  string instruction_id = 1;
  // If this is specified, then this instruction has failed.
  // A human readable string representing the reason as to why processing has
  // failed.
  string error = 2;
  // If the instruction did not fail, it is required to return an equivalent
  // response type depending on the request this matches.
  oneof response {
    ShedBundleResponse shed_bundle = 1000;
  }
}
{code}

  was:
Allow the SDK to request the Runner to do something on its behalf. This mechanism can be used for:
* Reporting final counters
* Work shedding (SDK can choose to reduce the amount of work it wants to do (checkpointing))
* Requesting process bundle descriptors (instead of requiring the Runner to send them and have the SDK cache them).
* Decoupling the message type in control allows for new types of messages to be added which are not one to one.

Example API change below (note that SdkMessage/RunnerMessage should use a different name):

// An API that describes control messages between the SDK and Runner to process
// bundles, split bundles, report progress, ...
service BeamFnControl {
  //
  rpc Control(
    // A stream of SDK requests/responses.
    stream SdkMessage
  ) returns (
    // A stream of Runner requests/responses.
    stream RunnerMessage
  ) {}
}
// Messages a Runner can send over the control plane.
message RunnerMessage {
  // (Required) An unique identifier provided by the runner which represents
  // this requests execution. The RunnerInstructionResponse MUST have the matching id.
  string id = 1;
  oneof message {
    ErrorResponse error = 999;
    RegisterRequest register = 1000;
    ProcessBundleRequest process_bundle = 1001;
    ProcessBundleProgressRequest process_bundle_progress = 1002;
    ProcessBundleSplitRequest process_bundle_split = 1003;
    ShedBundleResponse shed_bundle = 1000;
  }
}
// Messages an SDK can send over the control plane.
message SdkMessage {
  oneof message {
    RunnerInstructionResponse runner_instruction_response = 1000;
    SdkInstructionRequest sdk_instruction_request = 1001;
  }
}
// A request sent by a runner which the SDK is asked to fulfill.
// For any unsupported request type, an error should be returned with a
// matching instruction id.
// Stable
message RunnerInstructionRequest {
  // (Required) An unique identifier provided by the runner which represents
  // this requests execution. The RunnerInstructionResponse MUST have the matching id.
  string instruction_id = 1;
  // (Required) A request that the SDK Harness needs to interpret.
  oneof request {
    RegisterRequest register = 1000;
    ProcessBundleRequest process_bundle = 1001;
    ProcessBundleProgressRequest process_bundle_progress = 1002;
    ProcessBundleSplitRequest process_bundle_split = 1003;
  }
}
// The response for an associated request the SDK had been asked to fulfill.
// Stable
message RunnerInstructionResponse {
  // (Required) A reference provided by the runner which represents a requests
  // execution. The RunnerInstructionResponse MUST have the matching id when
  // responding to the runner.
  string instruction_id = 1;
  // If this is specified, then this instruction has failed.
  // A human readable string representing the reason as to why processing has
  // failed.
  string error = 2;
  // If the instruction did not fail, it is required to return an equivalent
  // response type depending on the request this matches.
  oneof response {
    RegisterResponse register = 1000;
    ProcessBundleResponse process_bundle = 1001;
    ProcessBundleProgressResponse process_bundle_progress = 1002;
    ProcessBundleSplitResponse process_bundle_split = 1003;
  }
}
message SdkInstructionRequest {
  // (Required) An unique identifier provided by the SDK which represents
  // this requests execution. The SdkInstructionResponse MUST have the matching id.
  string instruction_id = 1;
  // (Required) A request that the Runner needs to interpret.
  oneof request {
    ShedBundleRequest shed_bundle = 1000;
  }
}
// The response for an associated request the Runner had been asked to fulfill.
// Stable
message RunnerInstructionResponse {
  // (Required) A reference provided by the SDK which represents a requests
  // execution. The RunnerInstructionResponse MUST have the matching id when
  // responding to the SDK.
  string instruction_id = 1;
  // If this is specified, then this instruction has failed.
  // A human readable string representing the reason as to why processing has
  // failed.
  string error = 2;
  // If the instruction did not fail, it is required to return an equivalent
  // response type depending on the request this matches.
  oneof response {
    ShedBundleResponse shed_bundle = 1000;
  }
}

> Migrate Fn API to be bidirectional instruction/request stream
> -------------------------------------------------------------
>
>                 Key: BEAM-3787
>                 URL: https://issues.apache.org/jira/browse/BEAM-3787
>             Project: Beam
>          Issue Type: Improvement
>          Components: beam-model
>            Reporter: Luke Cwik
>            Assignee: Robert Bradshaw
>            Priority: Major
>              Labels: portability
>
> Allow the SDK to request the Runner to do something on its behalf. This
> mechanism can be used for:
> * Reporting final counters
> * Work shedding (SDK can choose to reduce the amount of work it wants to do
> (checkpointing))
> * Requesting process bundle descriptors (instead of requiring the Runner to
> send them and have the SDK cache them).
> * Decoupling the message type in control allows for new types of messages to
> be added which are not one to one.
> Example API change below (note that SdkMessage/RunnerMessage should use a
> different name):
> {code:java}
> // An API that describes control messages between the SDK and Runner to process
> // bundles, split bundles, report progress, ...
> service BeamFnControl {
>   //
>   rpc Control(
>     // A stream of SDK requests/responses.
>     stream SdkMessage
>   ) returns (
>     // A stream of Runner requests/responses.
>     stream RunnerMessage
>   ) {}
> }
> // Messages a Runner can send over the control plane.
> message RunnerMessage {
>   // (Required) An unique identifier provided by the runner which represents
>   // this requests execution. The RunnerInstructionResponse MUST have the matching id.
>   string id = 1;
>   oneof message {
>     ErrorResponse error = 999;
>     RegisterRequest register = 1000;
>     ProcessBundleRequest process_bundle = 1001;
>     ProcessBundleProgressRequest process_bundle_progress = 1002;
>     ProcessBundleSplitRequest process_bundle_split = 1003;
>     ShedBundleResponse shed_bundle = 1000;
>   }
> }
> // Messages an SDK can send over the control plane.
> message SdkMessage {
>   oneof message {
>     RunnerInstructionResponse runner_instruction_response = 1000;
>     SdkInstructionRequest sdk_instruction_request = 1001;
>   }
> }
> // A request sent by a runner which the SDK is asked to fulfill.
> // For any unsupported request type, an error should be returned with a
> // matching instruction id.
> // Stable
> message RunnerInstructionRequest {
>   // (Required) An unique identifier provided by the runner which represents
>   // this requests execution. The RunnerInstructionResponse MUST have the matching id.
>   string instruction_id = 1;
>   // (Required) A request that the SDK Harness needs to interpret.
>   oneof request {
>     RegisterRequest register = 1000;
>     ProcessBundleRequest process_bundle = 1001;
>     ProcessBundleProgressRequest process_bundle_progress = 1002;
>     ProcessBundleSplitRequest process_bundle_split = 1003;
>   }
> }
> // The response for an associated request the SDK had been asked to fulfill.
> // Stable
> message RunnerInstructionResponse {
>   // (Required) A reference provided by the runner which represents a requests
>   // execution. The RunnerInstructionResponse MUST have the matching id when
>   // responding to the runner.
>   string instruction_id = 1;
>   // If this is specified, then this instruction has failed.
>   // A human readable string representing the reason as to why processing has
>   // failed.
>   string error = 2;
>   // If the instruction did not fail, it is required to return an equivalent
>   // response type depending on the request this matches.
>   oneof response {
>     RegisterResponse register = 1000;
>     ProcessBundleResponse process_bundle = 1001;
>     ProcessBundleProgressResponse process_bundle_progress = 1002;
>     ProcessBundleSplitResponse process_bundle_split = 1003;
>   }
> }
> message SdkInstructionRequest {
>   // (Required) An unique identifier provided by the SDK which represents
>   // this requests execution. The SdkInstructionResponse MUST have the matching id.
>   string instruction_id = 1;
>   // (Required) A request that the Runner needs to interpret.
>   oneof request {
>     ShedBundleRequest shed_bundle = 1000;
>   }
> }
> // The response for an associated request the Runner had been asked to fulfill.
> // Stable
> message RunnerInstructionResponse {
>   // (Required) A reference provided by the SDK which represents a requests
>   // execution. The RunnerInstructionResponse MUST have the matching id when
>   // responding to the SDK.
>   string instruction_id = 1;
>   // If this is specified, then this instruction has failed.
>   // A human readable string representing the reason as to why processing has
>   // failed.
>   string error = 2;
>   // If the instruction did not fail, it is required to return an equivalent
>   // response type depending on the request this matches.
>   oneof response {
>     ShedBundleResponse shed_bundle = 1000;
>   }
> }
> {code}

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
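
The id matching that the proposal describes can be sketched in a few lines of Python. This is only an illustration under stated assumptions, not Beam code: ControlEndpoint, its method names, the dict-based message layout and the payload fields ("bundle", "done", "accepted") are all hypothetical. The only things taken from the description are the instruction_id echo, the error string for unsupported request types, and the request names process_bundle and shed_bundle.

```python
import itertools


class ControlEndpoint:
    """One side of the control stream (hypothetical stand-in for an SDK or a Runner)."""

    def __init__(self, name, handlers):
        self.name = name
        self.handlers = handlers  # request kind -> callable(payload) -> result
        self.pending = {}         # instruction_id -> kind of request awaiting a response
        self._counter = itertools.count()

    def make_request(self, kind, payload):
        """Build a request with a fresh instruction_id and remember it as pending."""
        instruction_id = f"{self.name}-{next(self._counter)}"
        self.pending[instruction_id] = kind
        return {"instruction_id": instruction_id, "request": {kind: payload}}

    def handle_request(self, message):
        """Answer a peer's request, echoing its instruction_id back."""
        (kind, payload), = message["request"].items()
        response = {"instruction_id": message["instruction_id"]}
        handler = self.handlers.get(kind)
        if handler is None:
            # Unsupported request types get an error with the matching id.
            response["error"] = f"unsupported request type: {kind}"
        else:
            response["response"] = {kind: handler(payload)}
        return response

    def handle_response(self, message):
        """Match a response to its pending request; return (kind, result, error)."""
        kind = self.pending.pop(message["instruction_id"])
        return kind, message.get("response", {}).get(kind), message.get("error")


# Payload shapes below are invented for the example.
runner = ControlEndpoint("runner", {"shed_bundle": lambda p: {"accepted": True}})
sdk = ControlEndpoint("sdk", {"process_bundle": lambda p: {"done": p["bundle"]}})

# Runner -> SDK: the direction that already exists.
runner_request = runner.make_request("process_bundle", {"bundle": "b1"})
runner_result = runner.handle_response(sdk.handle_request(runner_request))

# SDK -> Runner: the new direction proposed in this issue.
sdk_request = sdk.make_request("shed_bundle", {"bundle": "b1"})
sdk_result = sdk.handle_response(runner.handle_request(sdk_request))

# A request type the peer does not support yields an error with the same id.
unsupported = runner.handle_request(sdk.make_request("process_bundle", {"bundle": "b2"}))
```

Both sides use the same class here because, once the stream is bidirectional, each side is both a requester (tracking pending ids) and a responder (echoing ids back). A real implementation would carry these messages over the gRPC stream rather than calling the peer directly.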