[ https://issues.apache.org/jira/browse/BEAM-3787?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Luke Cwik updated BEAM-3787: ---------------------------- Labels: portability (was: ) > Migrate Fn API to be bidirectional instruction/request stream > ------------------------------------------------------------- > > Key: BEAM-3787 > URL: https://issues.apache.org/jira/browse/BEAM-3787 > Project: Beam > Issue Type: Improvement > Components: beam-model > Reporter: Luke Cwik > Assignee: Robert Bradshaw > Priority: Major > Labels: portability > > Allow the SDK to request the Runner to do something on its behalf. This > mechanism can be used for: > * Reporting final counters > * Work shedding (SDK can choose to reduce the amount of work it wants to do > (checkpointing)) > * Requesting process bundle descriptors (instead of requiring the Runner to > send them and have the SDK cache them). > * Decoupling the message type in control allows for new types of messages to > be added which are not one to one. > Example API change below (note that SdkMessage/RunnerMessage should use a > different name): > {code:java} > // An API that describes control messages between the SDK and Runner to > process > // bundles, split bundles, report progress, ... > service BeamFnControl { > // > rpc Control( > // A stream of SDK requests/responses. > stream SdkMessage > ) returns ( > // A stream of Runner requests/responses. > stream RunnerMessage > ) {} > } > // Messages a Runner can send over the control plane. > message RunnerMessage { > // (Required) An unique identifier provided by the runner which represents > // this requests execution. The RunnerInstructionResponse MUST have the > matching id. > string id = 1; > oneof message { > ErrorResponse error = 999; > RegisterRequest register = 1000; > ProcessBundleRequest process_bundle = 1001; > ProcessBundleProgressRequest process_bundle_progress = 1002; > ProcessBundleSplitRequest process_bundle_split = 1003; > ShedBundleResponse shed_bundle = 1000; > } > } > // Messages an SDK can send over the control plane. > message SdkMessage { > oneof message { > RunnerInstructionResponse runner_instruction_response = 1000; > SdkInstructionRequest sdk_instruction_request = 1001; > } > } > // A request sent by a runner which the SDK is asked to fulfill. > // For any unsupported request type, an error should be returned with a > // matching instruction id. > // Stable > message RunnerInstructionRequest { > // (Required) An unique identifier provided by the runner which represents > // this requests execution. The RunnerInstructionResponse MUST have the > matching id. > string instruction_id = 1; > // (Required) A request that the SDK Harness needs to interpret. > oneof request { > RegisterRequest register = 1000; > ProcessBundleRequest process_bundle = 1001; > ProcessBundleProgressRequest process_bundle_progress = 1002; > ProcessBundleSplitRequest process_bundle_split = 1003; > } > } > // The response for an associated request the SDK had been asked to fulfill. > // Stable > message RunnerInstructionResponse { > // (Required) A reference provided by the runner which represents a requests > // execution. The RunnerInstructionResponse MUST have the matching id when > // responding to the runner. > string instruction_id = 1; > // If this is specified, then this instruction has failed. > // A human readable string representing the reason as to why processing has > // failed. > string error = 2; > // If the instruction did not fail, it is required to return an equivalent > // response type depending on the request this matches. > oneof response { > RegisterResponse register = 1000; > ProcessBundleResponse process_bundle = 1001; > ProcessBundleProgressResponse process_bundle_progress = 1002; > ProcessBundleSplitResponse process_bundle_split = 1003; > } > } > message SdkInstructionRequest { > // (Required) An unique identifier provided by the SDK which represents > // this requests execution. The SdkInstructionResponse MUST have the > matching id. > string instruction_id = 1; > // (Required) A request that the Runner needs to interpret. > oneof request { > ShedBundleRequest shed_bundle = 1000; > } > } > // The response for an associated request the Runner had been asked to > fulfill. > // Stable > message RunnerInstructionResponse { > // (Required) A reference provided by the SDK which represents a requests > // execution. The RunnerInstructionResponse MUST have the matching id when > // responding to the SDK. > string instruction_id = 1; > // If this is specified, then this instruction has failed. > // A human readable string representing the reason as to why processing has > // failed. > string error = 2; > // If the instruction did not fail, it is required to return an equivalent > // response type depending on the request this matches. > oneof response { > ShedBundleResponse shed_bundle = 1000; > } > } > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)