Condense FunctionSpec and remove SdkFunctionSpec, merging data and params
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/3120fd9a Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/3120fd9a Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/3120fd9a Branch: refs/heads/master Commit: 3120fd9a499b01b08659edc4e045f5abbcc3ae07 Parents: 672d12a Author: Kenneth Knowles <k...@google.com> Authored: Thu Feb 23 18:06:56 2017 -0800 Committer: Kenneth Knowles <k...@google.com> Committed: Fri Feb 24 14:38:28 2017 -0800 ---------------------------------------------------------------------- .../src/main/proto/beam_runner_api.proto | 114 ++++++++++--------- .../beam/sdk/util/WindowingStrategies.java | 110 ++++++++++++------ 2 files changed, 134 insertions(+), 90 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/3120fd9a/sdks/common/runner-api/src/main/proto/beam_runner_api.proto ---------------------------------------------------------------------- diff --git a/sdks/common/runner-api/src/main/proto/beam_runner_api.proto b/sdks/common/runner-api/src/main/proto/beam_runner_api.proto index 32c53fb..989e4bb 100644 --- a/sdks/common/runner-api/src/main/proto/beam_runner_api.proto +++ b/sdks/common/runner-api/src/main/proto/beam_runner_api.proto @@ -74,7 +74,6 @@ message MessageWithComponents { PTransform ptransform = 7; PCollection pcollection = 8; ReadPayload read_payload = 9; - SdkFunctionSpec sdk_function_spec = 10; SideInput side_input = 11; WindowIntoPayload window_into_payload = 12; WindowingStrategy windowing_strategy = 13; @@ -100,7 +99,8 @@ message Pipeline { // (Required) The id of the PTransform that is the root of the pipeline string root_transform_id = 2; - // (Required) Static display data for the pipeline. + // (Optional) Static display data for the pipeline. If there is none, + // it may be omitted. DisplayData display_data = 3; } @@ -116,7 +116,8 @@ message PTransform { // etc. 
// // If it is not stable, then the runner decides what will happen. But, most - // importantly, it must always be here, even if it is autogenerated. + // importantly, it must always be here and be unique, even if it is + // autogenerated. string unique_name = 5; // (Optional) A URN and payload that, together, fully defined the semantics @@ -125,13 +126,11 @@ message PTransform { // If absent, this must be an "anonymous" composite transform. // // For primitive transform in the Runner API, this is required, and the - // payloads are as follows: + // payloads are well-defined messages. When the URN indicates ParDo it + // is a ParDoPayload, and so on. // - // - when the URN is "urn:beam:transforms:pardo" it is a ParDoPayload - // - when the URN is "urn:beam:transforms:read" it is a ReadPayload - // - when the URN is "urn:beam:transforms:gbk" it is a GroupByKeyPayload - // - when the URN is "urn:beam:transforms:window" it is a WindowPayload - // - when the URN is "urn:beam:transforms:flatten" it is absent + // TODO: document the standardized URNs and payloads + // TODO: separate standardized payloads into a separate proto file // // For some special composite transforms, the payload is also officially // defined: @@ -144,7 +143,9 @@ message PTransform { // transforms that it contains. repeated string subtransforms = 2; - // (Required) A map from local names of inputs to PCollection ids. + // (Required) A map from local names of inputs (unique only within this map, and + // likely embedded in the transform payload and serialized user code) to + // PCollection ids. // // The payload for this transform may clarify the relationship of these // inputs. For example: @@ -157,7 +158,9 @@ message PTransform { // map<string, string> inputs = 3; - // (Required) A map from local names of outputs to PCollection ids. 
+ // (Required) A map from local names of outputs (unique only within this map, + // and likely embedded in the transform payload and serialized user code) + // to PCollection ids. // // The URN or payload for this transform node may clarify the type and // relationship of these outputs. For example: @@ -167,7 +170,9 @@ message PTransform { // map<string, string> outputs = 4; - // (Required) Static display data for this PTransform application. + // (Optional) Static display data for this PTransform application. If + // there is none, or it is not relevant (such as use by the Fn API) + // then it may be omitted. DisplayData display_data = 6; } @@ -193,7 +198,9 @@ message PCollection { // (Required) The id of the windowing strategy for this PCollection. string windowing_strategy_id = 4; - // (Required) Static display data for this PCollection. + // (Optional) Static display data for this PCollection. If + // there is none, or it is not relevant (such as use by the Fn API) + // then it may be omitted. DisplayData display_data = 5; } @@ -301,18 +308,17 @@ message CombinePayload { // a pipeline. message Coder { - // (Required) A cross-language, stable, unique identifier for the (possibly - // parametric) encoding. - string urn = 1; + // (Required) A specification for the coder, as a URN plus parameters. This + // may be a cross-language agreed-upon format, or it may be a "custom coder" + // that can only be used by a particular SDK. It does not include component + // coders, as it is beneficial for these to be comprehensible to a runner + // regardless of whether the binary format is agreed-upon. + FunctionSpec spec = 1; // (Optional) If this coder is parametric, such as ListCoder(VarIntCoder), // this is a list of the components. In order for encodings to be identical, - // the URN and all components must be identical, recursively. 
- repeated string component_coder_id = 2; - - // (Optional) The pipeline-scoped id for the FunctionSpec of an SDK-specific - // UDF implementing the encoding. - string custom_coder_fn_id = 3; + // the FunctionSpec and all components must be identical, recursively. + repeated string component_coder_ids = 2; } // A windowing strategy describes the window function, triggering, allowed @@ -615,37 +621,51 @@ message Environment { string url = 1; } -// Description of a function in a Beam pipeline. +// A specification of a user defined function. // -// Contains one of _or both of_ a UrnWithParameter specifying the function -// and the specification for how to execute it against a particular -// SDK's harness. message FunctionSpec { - // (Optional) An SDK-independent specification of this function. - // If present, this must _fully_ specify the function. - // - // For example the distinguished urn "urn:beam:windowfn:FixedWindows" with - // payload `{ duration: n }` fully specifies a windowing function which can - // be implemented by the SDK constructing the pipeline, by another SDK (for - // language-to-language fusion compatibility) or by the runner directly. + // (Required) A full specification of this function. UrnWithParameter spec = 1; - // (Optional) An SDK-specific specification for how to execute this function, - // including a specification of the environment in which the function - // can be interpreted and executed. - SdkFunctionSpec sdk_fn_spec = 2; + // (Required) Reference to an execution environment capable of + // invoking this function. + string environment_id = 2; } // A URN along with a parameter object whose schema is determined by the // URN. // -// The URN will often specify a parametric function or transform such as -// "Top" or "FixedWindows" while the payload would specify _n_ or -// _duration_, respectively. +// This structure is reused in two distinct, but compatible, ways: +// +// 1. 
This can be a specification of the function over PCollections +// that a PTransform computes. +// 2. This can be a specification of a user-defined function, possibly +// SDK-specific. (Context external to this message must be adequate +// to indicate the environment in which the UDF can be understood.) +// +// Though not explicit in this proto, there are two possibilities +// for the relationship of a runner to this specification that +// one should bear in mind: +// +// 1. The runner understands the URN. For example, it might be +// a well-known URN like "urn:beam:transform:Top" or +// "urn:beam:windowfn:FixedWindows" with +// an agreed-upon payload (e.g. a number or duration, +// respectively). +// 2. The runner does not understand the URN. It might be an +// SDK-specific URN such as "urn:beam:dofn:javasdk:1.0" +// that indicates to the SDK what the payload is, +// such as a serialized Java DoFn from a particular +// version of the Beam Java SDK. The payload will often +// then be an opaque message such as bytes in a +// language-specific serialization format. message UrnWithParameter { // (Required) A URN that describes the accompanying payload. + // For any URN that is not recognized (by whoever is inspecting + // it) the parameter payload should be treated as opaque and + // passed as-is. string urn = 1; // (Optional) The data specifying any parameters to the URN. If @@ -653,20 +673,6 @@ message UrnWithParameter { google.protobuf.Any parameter = 2; } -// An arbitrary payload tagged with the environment that knows how to -// interpret it as a user-defined function. -message SdkFunctionSpec { - - // (Required) Reference to the specification of the execution environment - // required to invoke this function. - string environment_id = 2; - - // (Required) The raw data of the function that the SDK knows how to - // deserialize, but need not be comprehensible to any other runner, SDK, or - // other entity. 
- bytes data = 4; -} - // TODO: transfer javadoc here message DisplayData { http://git-wip-us.apache.org/repos/asf/beam/blob/3120fd9a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingStrategies.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingStrategies.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingStrategies.java index 1af7719..3047da1 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingStrategies.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingStrategies.java @@ -17,14 +17,21 @@ */ package org.apache.beam.sdk.util; +import static com.google.common.base.Preconditions.checkArgument; + import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.protobuf.Any; import com.google.protobuf.ByteString; +import com.google.protobuf.BytesValue; +import com.google.protobuf.InvalidProtocolBufferException; import java.io.IOException; import java.io.Serializable; import java.util.UUID; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.common.runner.v1.RunnerApi; import org.apache.beam.sdk.common.runner.v1.RunnerApi.Components; +import org.apache.beam.sdk.common.runner.v1.RunnerApi.FunctionSpec; +import org.apache.beam.sdk.common.runner.v1.RunnerApi.UrnWithParameter; import org.apache.beam.sdk.transforms.windowing.OutputTimeFn; import org.apache.beam.sdk.transforms.windowing.OutputTimeFns; import org.apache.beam.sdk.transforms.windowing.Trigger; @@ -108,7 +115,6 @@ public class WindowingStrategies implements Serializable { RunnerApi.ClosingBehavior.class.getCanonicalName(), ClosingBehavior.class.getCanonicalName(), proto)); - } } @@ -122,46 +128,70 @@ public class WindowingStrategies implements Serializable { } // This URN says that the coder is just a UDF blob the indicated SDK understands - private static final String CUSTOM_CODER_URN = 
"urn:beam:coders:custom:1.0"; + // TODO: standardize such things + private static final String CUSTOM_CODER_URN = "urn:beam:coders:javasdk:0.1"; + + // This URN says that the WindowFn is just a UDF blob the indicated SDK understands + // TODO: standardize such things + private static final String CUSTOM_WINDOWFN_URN = "urn:beam:windowfn:javasdk:0.1"; private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + /** + * Converts a {@link WindowFn} into a {@link RunnerApi.MessageWithComponents} where + * {@link RunnerApi.MessageWithComponents#getFunctionSpec()} is a {@link RunnerApi.FunctionSpec} + * for the input {@link WindowFn}. + */ public static RunnerApi.MessageWithComponents toProto(WindowFn<?, ?> windowFn) throws IOException { Coder<?> windowCoder = windowFn.windowCoder(); // TODO: re-use components String windowCoderId = UUID.randomUUID().toString(); - String customCoderId = UUID.randomUUID().toString(); - return RunnerApi.MessageWithComponents.newBuilder() - .setFunctionSpec( - RunnerApi.FunctionSpec.newBuilder() - .setSdkFnSpec( - RunnerApi.SdkFunctionSpec.newBuilder() - .setData( - ByteString.copyFrom(SerializableUtils.serializeToByteArray(windowFn))))) - .setComponents( - Components.newBuilder() - .putCoders( - windowCoderId, - RunnerApi.Coder.newBuilder() - .setUrn(CUSTOM_CODER_URN) - .setCustomCoderFnId(customCoderId) - .build()) - .putFunctionSpecs( - customCoderId, - RunnerApi.FunctionSpec.newBuilder() - .setSdkFnSpec( - RunnerApi.SdkFunctionSpec.newBuilder() - .setData( + RunnerApi.FunctionSpec windowFnSpec = + RunnerApi.FunctionSpec.newBuilder() + .setSpec( + UrnWithParameter.newBuilder() + .setUrn(CUSTOM_WINDOWFN_URN) + .setParameter( + Any.pack( + BytesValue.newBuilder() + .setValue( ByteString.copyFrom( - OBJECT_MAPPER.writeValueAsBytes( - windowCoder.asCloudObject())))) - .build())) + SerializableUtils.serializeToByteArray(windowFn))) + .build()))) + .build(); + + RunnerApi.Coder windowCoderProto = + RunnerApi.Coder.newBuilder() + 
.setSpec( + FunctionSpec.newBuilder() + .setSpec( + UrnWithParameter.newBuilder() + .setUrn(CUSTOM_CODER_URN) + .setParameter( + Any.pack( + BytesValue.newBuilder() + .setValue( + ByteString.copyFrom( + OBJECT_MAPPER.writeValueAsBytes( + windowCoder.asCloudObject()))) + .build())))) + .build(); + + return RunnerApi.MessageWithComponents.newBuilder() + .setFunctionSpec(windowFnSpec) + .setComponents(Components.newBuilder().putCoders(windowCoderId, windowCoderProto)) .build(); } + /** + * Converts a {@link WindowingStrategy} into a {@link RunnerApi.MessageWithComponents} where + * {@link RunnerApi.MessageWithComponents#getWindowingStrategy()} ()} is a {@link + * RunnerApi.WindowingStrategy RunnerApi.WindowingStrategy (proto)} for the input {@link + * WindowingStrategy}. + */ public static RunnerApi.MessageWithComponents toProto(WindowingStrategy<?, ?> windowingStrategy) throws IOException { @@ -195,7 +225,8 @@ public class WindowingStrategies implements Serializable { * Converts from a {@link RunnerApi.WindowingStrategy} accompanied by {@link RunnerApi.Components} * to the SDK's {@link WindowingStrategy}. */ - public static WindowingStrategy<?, ?> fromProto(RunnerApi.MessageWithComponents proto) { + public static WindowingStrategy<?, ?> fromProto(RunnerApi.MessageWithComponents proto) + throws InvalidProtocolBufferException { switch (proto.getRootCase()) { case WINDOWING_STRATEGY: return fromProto(proto.getWindowingStrategy(), proto.getComponents()); @@ -212,15 +243,23 @@ public class WindowingStrategies implements Serializable { * the provided components to dereferences identifiers found in the proto. 
*/ public static WindowingStrategy<?, ?> fromProto( - RunnerApi.WindowingStrategy proto, RunnerApi.Components components) { + RunnerApi.WindowingStrategy proto, RunnerApi.Components components) + throws InvalidProtocolBufferException { + + FunctionSpec windowFnSpec = + components + .getFunctionSpecsMap() + .get(proto.getFnId()); + + checkArgument( + windowFnSpec.getSpec().getUrn().equals(CUSTOM_WINDOWFN_URN), + "Only Java-serialized %s instances are supported, with URN %s. But found URN %s", + WindowFn.class.getSimpleName(), + windowFnSpec.getSpec().getUrn()); + Object deserializedWindowFn = SerializableUtils.deserializeFromByteArray( - components - .getFunctionSpecsMap() - .get(proto.getFnId()) - .getSdkFnSpec() - .getData() - .toByteArray(), + windowFnSpec.getSpec().getParameter().unpack(BytesValue.class).getValue().toByteArray(), "WindowFn"); WindowFn<?, ?> windowFn = (WindowFn<?, ?>) deserializedWindowFn; @@ -237,5 +276,4 @@ public class WindowingStrategies implements Serializable { .withOutputTimeFn(outputTimeFn) .withClosingBehavior(closingBehavior); } - }