Repository: beam Updated Branches: refs/heads/master 4f56acbba -> 2c71354d0
Use strings for ids in Fn API Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9933f271 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9933f271 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9933f271 Branch: refs/heads/master Commit: 9933f27140ddfe5b9ded4a0688a9c0506ef94113 Parents: 4f56acb Author: Kenneth Knowles <k...@google.com> Authored: Sun Feb 12 22:23:32 2017 -0800 Committer: Kenneth Knowles <k...@google.com> Committed: Thu Feb 23 17:35:22 2017 -0800 ---------------------------------------------------------------------- .../fn-api/src/main/proto/beam_fn_api.proto | 48 +++++----- .../fn/harness/control/BeamFnControlClient.java | 3 +- .../harness/control/ProcessBundleHandler.java | 8 +- .../fn/harness/control/RegisterHandler.java | 8 +- .../BeamFnDataBufferingOutboundObserver.java | 4 +- .../beam/fn/harness/data/BeamFnDataClient.java | 4 +- .../fn/harness/data/BeamFnDataGrpcClient.java | 4 +- .../harness/data/BeamFnDataGrpcMultiplexer.java | 11 +-- .../fn/harness/logging/BeamFnLoggingClient.java | 4 +- .../beam/runners/core/BeamFnDataReadRunner.java | 4 +- .../runners/core/BeamFnDataWriteRunner.java | 4 +- .../apache/beam/fn/harness/FnHarnessTest.java | 8 +- .../control/BeamFnControlClientTest.java | 12 +-- .../control/ProcessBundleHandlerTest.java | 95 ++++++++++---------- .../fn/harness/control/RegisterHandlerTest.java | 18 ++-- ...BeamFnDataBufferingOutboundObserverTest.java | 9 +- .../harness/data/BeamFnDataGrpcClientTest.java | 21 +++-- .../data/BeamFnDataGrpcMultiplexerTest.java | 12 +-- .../data/BeamFnDataInboundObserverTest.java | 4 +- .../runners/core/BeamFnDataReadRunnerTest.java | 8 +- .../runners/core/BeamFnDataWriteRunnerTest.java | 8 +- 21 files changed, 159 insertions(+), 138 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/9933f271/sdks/common/fn-api/src/main/proto/beam_fn_api.proto ---------------------------------------------------------------------- diff --git a/sdks/common/fn-api/src/main/proto/beam_fn_api.proto b/sdks/common/fn-api/src/main/proto/beam_fn_api.proto index 3ac0fbf..80bae2e 100644 --- a/sdks/common/fn-api/src/main/proto/beam_fn_api.proto +++ b/sdks/common/fn-api/src/main/proto/beam_fn_api.proto @@ -59,7 +59,7 @@ message Target { } // (Required) The id of the PrimitiveTransform which is the target. - int64 primitive_transform_reference = 1; + string primitive_transform_reference = 1; // (Required) The local name of an input or output defined on the primitive // transform. @@ -69,7 +69,7 @@ message Target { // Information defining a PCollection message PCollection { // (Required) A reference to a coder. - int64 coder_reference = 1; + string coder_reference = 1; // TODO: Windowing strategy, ... } @@ -78,7 +78,7 @@ message PCollection { message PrimitiveTransform { // (Required) A pipeline level unique id which can be used as a reference to // refer to this. - int64 id = 1; + string id = 1; // (Required) A function spec that is used by this primitive // transform to process data. @@ -117,7 +117,7 @@ message PrimitiveTransform { message FunctionSpec { // (Required) A pipeline level unique id which can be used as a reference to // refer to this. - int64 id = 1; + string id = 1; // (Required) A globally unique name representing this user definable // function. @@ -131,7 +131,7 @@ message FunctionSpec { // (Required) Reference to specification of execution environment required to // invoke this function. - int64 environment_reference = 3; + string environment_reference = 3; // Data used to parameterize this function. Depending on the urn, this may be // optional or required. @@ -179,7 +179,7 @@ message Coder { // // TODO: Perhaps this is redundant with the data of the FunctionSpec // for known coders? - repeated int64 component_coder_reference = 2; + repeated string component_coder_reference = 2; } // A descriptor for connecting to a remote port using the Beam Fn Data API. @@ -218,7 +218,7 @@ service BeamFnControl { message InstructionRequest { // (Required) An unique identifier provided by the runner which represents // this requests execution. The InstructionResponse MUST have the matching id. - int64 instruction_id = 1; + string instruction_id = 1; // (Required) A request that the SDK Harness needs to interpret. oneof request { @@ -235,7 +235,7 @@ message InstructionResponse { // (Required) A reference provided by the runner which represents a requests // execution. The InstructionResponse MUST have the matching id when // responding to the runner. - int64 instruction_id = 1; + string instruction_id = 1; // If this is specified, then this instruction has failed. // A human readable string representing the reason as to why processing has @@ -269,7 +269,7 @@ message RegisterResponse { message ProcessBundleDescriptor { // (Required) A pipeline level unique id which can be used as a reference to // refer to this. - int64 id = 1; + string id = 1; // (Required) A list of primitive transforms that should // be used to construct the bundle processing graph. @@ -282,7 +282,7 @@ message ProcessBundleDescriptor { // A request to process a given bundle. // Stable message ProcessBundleRequest { - int64 process_bundle_descriptor_reference = 1; + string process_bundle_descriptor_reference = 1; } // Stable @@ -292,7 +292,7 @@ message ProcessBundleResponse { message ProcessBundleProgressRequest { // (Required) A reference to an active process bundle request with the given // instruction id. - int64 instruction_reference = 1; + string instruction_reference = 1; } message ProcessBundleProgressResponse { @@ -309,7 +309,7 @@ message ProcessBundleProgressResponse { message ProcessBundleSplitRequest { // (Required) A reference to an active process bundle request with the given // instruction id. - int64 instruction_reference = 1; + string instruction_reference = 1; // (Required) The fraction of work (when compared to the known amount of work) // the process bundle request should try to split at. @@ -344,7 +344,7 @@ message PrimitiveTransformSplit { // (Required) A reference to a primitive transform with the given id that // is part of the active process bundle request with the given instruction // id. - int64 primitive_transform_reference = 1; + string primitive_transform_reference = 1; // (Required) A function specification describing the restriction // that has been completed by the primitive transform. @@ -425,7 +425,7 @@ message Elements { message Data { // (Required) A reference to an active instruction request with the given // instruction id. - int64 instruction_reference = 1; + string instruction_reference = 1; // (Required) A definition representing a consumer or producer of this data. // If received by a harness, this represents the consumer within that @@ -475,12 +475,12 @@ service BeamFnData { message StateRequest { // (Required) An unique identifier provided by the SDK which represents this // requests execution. The StateResponse must have the matching id. - int64 id = 1; + string id = 1; // (Required) The associated instruction id of the work that is currently // being processed. This allows for the runner to associate any modifications // to state to be committed with the appropriate work execution. - int64 instruction_reference = 2; + string instruction_reference = 2; // At least one of the following fields should be populated. // Also, no request should use a state key referred to in another state key. @@ -499,11 +499,11 @@ message StateResponse { // (Required) A reference provided by the SDK which represents a requests // execution. The StateResponse must have the matching id when responding // to the SDK. - int64 id = 1; + string id = 1; // (Required) The associated instruction id of the work that is currently // being processed. - int64 instruction_reference = 2; + string instruction_reference = 2; // (Required) A key to associate with the version of this state. Allows for // SDKs to share state across work items if they have the same cache key and @@ -563,7 +563,7 @@ message StateKey { // (Required) Represents the namespace for the state. If this state is for a // DoFn, then this reference is expected to point to the DoFn. If this state // is for a side input, then this is expected to reference the ViewFn. - int64 function_spec_reference = 1; + string function_spec_reference = 1; // (Required) The bytes of the window which this state request is for encoded // in the outer context. @@ -693,11 +693,11 @@ message LogEntry { // (Optional) A reference to the instruction this log statement is associated // with. - int64 instruction_reference = 5; + string instruction_reference = 5; // (Optional) A reference to the primitive transform this log statement is // associated with. - int64 primitive_transform_reference = 6; + string primitive_transform_reference = 6; // (Optional) Human-readable name of the function or method being invoked, // with optional context such as the class or package name. The format can @@ -734,7 +734,7 @@ service BeamFnLogging { message ApiServiceDescriptor { // (Required) A pipeline level unique id which can be used as a reference to // refer to this. - int64 id = 1; + string id = 1; // (Required) The URL to connect to. string url = 2; @@ -758,7 +758,7 @@ message OAuth2ClientCredentialsGrant { message DockerContainer { // (Required) A pipeline level unique id which can be used as a reference to // refer to this. - int64 id = 1; + string id = 1; // (Required) The Docker container URI // For example "dataflow.gcr.io/v1beta3/java-batch:1.5.1" @@ -767,5 +767,5 @@ message DockerContainer { // (Optional) Docker registry specification. // If unspecified, the uri is expected to be able to be fetched without // requiring additional configuration by a runner. - int64 registry_reference = 3; + string registry_reference = 3; } http://git-wip-us.apache.org/repos/asf/beam/blob/9933f271/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/BeamFnControlClient.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/BeamFnControlClient.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/BeamFnControlClient.java index 6d75315..e40bb2f 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/BeamFnControlClient.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/BeamFnControlClient.java @@ -52,9 +52,10 @@ import org.slf4j.LoggerFactory; * {@link org.apache.beam.fn.v1.BeamFnApi.InstructionRequest}s. */ public class BeamFnControlClient { + private static final String FAKE_INSTRUCTION_ID = "FAKE_INSTRUCTION_ID"; private static final Logger LOG = LoggerFactory.getLogger(BeamFnControlClient.class); private static final BeamFnApi.InstructionRequest POISON_PILL = - BeamFnApi.InstructionRequest.newBuilder().setInstructionId(Long.MIN_VALUE).build(); + BeamFnApi.InstructionRequest.newBuilder().setInstructionId(FAKE_INSTRUCTION_ID).build(); private final StreamObserver<BeamFnApi.InstructionResponse> outboundObserver; private final BlockingDeque<BeamFnApi.InstructionRequest> bufferedInstructions; http://git-wip-us.apache.org/repos/asf/beam/blob/9933f271/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java index d764a95..1f82085 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java @@ -85,12 +85,12 @@ public class ProcessBundleHandler { private static final Logger LOG = LoggerFactory.getLogger(ProcessBundleHandler.class); private final PipelineOptions options; - private final Function<Long, Message> fnApiRegistry; + private final Function<String, Message> fnApiRegistry; private final BeamFnDataClient beamFnDataClient; public ProcessBundleHandler( PipelineOptions options, - Function<Long, Message> fnApiRegistry, + Function<String, Message> fnApiRegistry, BeamFnDataClient beamFnDataClient) { this.options = options; this.fnApiRegistry = fnApiRegistry; @@ -99,7 +99,7 @@ public class ProcessBundleHandler { protected <InputT, OutputT> void createConsumersForPrimitiveTransform( BeamFnApi.PrimitiveTransform primitiveTransform, - Supplier<Long> processBundleInstructionId, + Supplier<String> processBundleInstructionId, Function<BeamFnApi.Target, Collection<ThrowingConsumer<WindowedValue<OutputT>>>> consumers, BiConsumer<BeamFnApi.Target, ThrowingConsumer<WindowedValue<InputT>>> addConsumer, Consumer<ThrowingRunnable> addStartFunction, @@ -209,7 +209,7 @@ public class ProcessBundleHandler { BeamFnApi.InstructionResponse.newBuilder() .setProcessBundle(BeamFnApi.ProcessBundleResponse.getDefaultInstance()); - long bundleId = request.getProcessBundle().getProcessBundleDescriptorReference(); + String bundleId = request.getProcessBundle().getProcessBundleDescriptorReference(); BeamFnApi.ProcessBundleDescriptor bundleDescriptor = (BeamFnApi.ProcessBundleDescriptor) fnApiRegistry.apply(bundleId); http://git-wip-us.apache.org/repos/asf/beam/blob/9933f271/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/RegisterHandler.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/RegisterHandler.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/RegisterHandler.java index be75b50..fb06231 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/RegisterHandler.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/RegisterHandler.java @@ -37,13 +37,13 @@ import org.slf4j.LoggerFactory; */ public class RegisterHandler { private static final Logger LOG = LoggerFactory.getLogger(RegisterHandler.class); - private final ConcurrentMap<Long, CompletableFuture<Message>> idToObject; + private final ConcurrentMap<String, CompletableFuture<Message>> idToObject; public RegisterHandler() { idToObject = new ConcurrentHashMap<>(); } - public <T extends Message> T getById(long id) { + public <T extends Message> T getById(String id) { try { @SuppressWarnings("unchecked") CompletableFuture<T> returnValue = (CompletableFuture<T>) computeIfAbsent(id); @@ -86,7 +86,7 @@ public class RegisterHandler { return response; } - private CompletableFuture<Message> computeIfAbsent(long id) { - return idToObject.computeIfAbsent(id, (Long ignored) -> new CompletableFuture<>()); + private CompletableFuture<Message> computeIfAbsent(String id) { + return idToObject.computeIfAbsent(id, (String ignored) -> new CompletableFuture<>()); } } http://git-wip-us.apache.org/repos/asf/beam/blob/9933f271/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataBufferingOutboundObserver.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataBufferingOutboundObserver.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataBufferingOutboundObserver.java index 25560ef..18e0d95 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataBufferingOutboundObserver.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataBufferingOutboundObserver.java @@ -61,13 +61,13 @@ public class BeamFnDataBufferingOutboundObserver<T> private long counter; private final int bufferLimit; private final Coder<WindowedValue<T>> coder; - private final KV<Long, BeamFnApi.Target> outputLocation; + private final KV<String, BeamFnApi.Target> outputLocation; private final StreamObserver<BeamFnApi.Elements> outboundObserver; private final ByteString.Output bufferedElements; public BeamFnDataBufferingOutboundObserver( PipelineOptions options, - KV<Long, BeamFnApi.Target> outputLocation, + KV<String, BeamFnApi.Target> outputLocation, Coder<WindowedValue<T>> coder, StreamObserver<BeamFnApi.Elements> outboundObserver) { this.bufferLimit = getBufferLimit(options); http://git-wip-us.apache.org/repos/asf/beam/blob/9933f271/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataClient.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataClient.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataClient.java index 27b1acb..7be96b6 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataClient.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataClient.java @@ -44,7 +44,7 @@ public interface BeamFnDataClient { */ <T> CompletableFuture<Void> forInboundConsumer( BeamFnApi.ApiServiceDescriptor apiServiceDescriptor, - KV<Long, BeamFnApi.Target> inputLocation, + KV<String, BeamFnApi.Target> inputLocation, Coder<WindowedValue<T>> coder, ThrowingConsumer<WindowedValue<T>> consumer); @@ -59,6 +59,6 @@ public interface BeamFnDataClient { */ <T> CloseableThrowingConsumer<WindowedValue<T>> forOutboundConsumer( BeamFnApi.ApiServiceDescriptor apiServiceDescriptor, - KV<Long, BeamFnApi.Target> outputLocation, + KV<String, BeamFnApi.Target> outputLocation, Coder<WindowedValue<T>> coder); } http://git-wip-us.apache.org/repos/asf/beam/blob/9933f271/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcClient.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcClient.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcClient.java index 8db1f48..4137cd7 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcClient.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcClient.java @@ -75,7 +75,7 @@ public class BeamFnDataGrpcClient implements BeamFnDataClient { @Override public <T> CompletableFuture<Void> forInboundConsumer( BeamFnApi.ApiServiceDescriptor apiServiceDescriptor, - KV<Long, BeamFnApi.Target> inputLocation, + KV<String, BeamFnApi.Target> inputLocation, Coder<WindowedValue<T>> coder, ThrowingConsumer<WindowedValue<T>> consumer) { LOG.debug("Registering consumer instruction {} for target {}", @@ -102,7 +102,7 @@ public class BeamFnDataGrpcClient implements BeamFnDataClient { @Override public <T> CloseableThrowingConsumer<WindowedValue<T>> forOutboundConsumer( BeamFnApi.ApiServiceDescriptor apiServiceDescriptor, - KV<Long, BeamFnApi.Target> outputLocation, + KV<String, BeamFnApi.Target> outputLocation, Coder<WindowedValue<T>> coder) { BeamFnDataGrpcMultiplexer client = getClientFor(apiServiceDescriptor); http://git-wip-us.apache.org/repos/asf/beam/blob/9933f271/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcMultiplexer.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcMultiplexer.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcMultiplexer.java index fe3a693..53dfe11 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcMultiplexer.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcMultiplexer.java @@ -50,8 +50,9 @@ public class BeamFnDataGrpcMultiplexer { private final StreamObserver<BeamFnApi.Elements> inboundObserver; private final StreamObserver<BeamFnApi.Elements> outboundObserver; @VisibleForTesting - final ConcurrentMap<KV<Long, BeamFnApi.Target>, - CompletableFuture<Consumer<BeamFnApi.Elements.Data>>> consumers; + final ConcurrentMap< + KV<String, BeamFnApi.Target>, CompletableFuture<Consumer<BeamFnApi.Elements.Data>>> + consumers; public BeamFnDataGrpcMultiplexer( BeamFnApi.ApiServiceDescriptor apiServiceDescriptor, @@ -80,10 +81,10 @@ public class BeamFnDataGrpcMultiplexer { } public CompletableFuture<Consumer<BeamFnApi.Elements.Data>> futureForKey( - KV<Long, BeamFnApi.Target> key) { + KV<String, BeamFnApi.Target> key) { return consumers.computeIfAbsent( key, - (KV<Long, BeamFnApi.Target> providedKey) -> new CompletableFuture<>()); + (KV<String, BeamFnApi.Target> providedKey) -> new CompletableFuture<>()); } /** @@ -99,7 +100,7 @@ public class BeamFnDataGrpcMultiplexer { public void onNext(BeamFnApi.Elements value) { for (BeamFnApi.Elements.Data data : value.getDataList()) { try { - KV<Long, BeamFnApi.Target> key = + KV<String, BeamFnApi.Target> key = KV.of(data.getInstructionReference(), data.getTarget()); futureForKey(key).get().accept(data); if (data.getData().isEmpty()) { http://git-wip-us.apache.org/repos/asf/beam/blob/9933f271/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java index d74d9fa..e1ec03d 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java @@ -79,9 +79,11 @@ public class BeamFnLoggingClient implements AutoCloseable { private static final Formatter FORMATTER = new SimpleFormatter(); + private static final String FAKE_INSTRUCTION_ID = "FAKE_INSTRUCTION_ID"; + /* Used to signal to a thread processing a queue to finish its work gracefully. */ private static final BeamFnApi.LogEntry POISON_PILL = - BeamFnApi.LogEntry.newBuilder().setInstructionReference(Long.MIN_VALUE).build(); + BeamFnApi.LogEntry.newBuilder().setInstructionReference(FAKE_INSTRUCTION_ID).build(); /** * The number of log messages that will be buffered. Assuming log messages are at most 1 KiB, http://git-wip-us.apache.org/repos/asf/beam/blob/9933f271/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataReadRunner.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataReadRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataReadRunner.java index a6b8b33..034ef84 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataReadRunner.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataReadRunner.java @@ -51,7 +51,7 @@ public class BeamFnDataReadRunner<OutputT> { private final BeamFnApi.ApiServiceDescriptor apiServiceDescriptor; private final Collection<ThrowingConsumer<WindowedValue<OutputT>>> consumers; - private final Supplier<Long> processBundleInstructionIdSupplier; + private final Supplier<String> processBundleInstructionIdSupplier; private final BeamFnDataClient beamFnDataClientFactory; private final Coder<WindowedValue<OutputT>> coder; private final BeamFnApi.Target inputTarget; @@ -60,7 +60,7 @@ public class BeamFnDataReadRunner<OutputT> { public BeamFnDataReadRunner( BeamFnApi.FunctionSpec functionSpec, - Supplier<Long> processBundleInstructionIdSupplier, + Supplier<String> processBundleInstructionIdSupplier, BeamFnApi.Target inputTarget, BeamFnApi.Coder coderSpec, BeamFnDataClient beamFnDataClientFactory, http://git-wip-us.apache.org/repos/asf/beam/blob/9933f271/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataWriteRunner.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataWriteRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataWriteRunner.java index 596afe5..54fd626 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataWriteRunner.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataWriteRunner.java @@ -44,13 +44,13 @@ public class BeamFnDataWriteRunner<InputT> { private final BeamFnApi.Target outputTarget; private final Coder<WindowedValue<InputT>> coder; private final BeamFnDataClient beamFnDataClientFactory; - private final Supplier<Long> processBundleInstructionIdSupplier; + private final Supplier<String> processBundleInstructionIdSupplier; private CloseableThrowingConsumer<WindowedValue<InputT>> consumer; public BeamFnDataWriteRunner( BeamFnApi.FunctionSpec functionSpec, - Supplier<Long> processBundleInstructionIdSupplier, + Supplier<String> processBundleInstructionIdSupplier, BeamFnApi.Target outputTarget, BeamFnApi.Coder coderSpec, BeamFnDataClient beamFnDataClientFactory) http://git-wip-us.apache.org/repos/asf/beam/blob/9933f271/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnHarnessTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnHarnessTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnHarnessTest.java index ff05225..6a45647 100644 --- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnHarnessTest.java +++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnHarnessTest.java @@ -48,12 +48,12 @@ import org.junit.runners.JUnit4; public class FnHarnessTest { private static final BeamFnApi.InstructionRequest INSTRUCTION_REQUEST = BeamFnApi.InstructionRequest.newBuilder() - .setInstructionId(999L) + .setInstructionId("999L") .setRegister(BeamFnApi.RegisterRequest.getDefaultInstance()) .build(); private static final BeamFnApi.InstructionResponse INSTRUCTION_RESPONSE = BeamFnApi.InstructionResponse.newBuilder() - .setInstructionId(999L) + .setInstructionId("999L") .setRegister(BeamFnApi.RegisterResponse.getDefaultInstance()) .build(); @@ -108,12 +108,12 @@ public class FnHarnessTest { try { BeamFnApi.ApiServiceDescriptor loggingDescriptor = BeamFnApi.ApiServiceDescriptor .newBuilder() - .setId(1L) + .setId("1L") .setUrl("localhost:" + loggingServer.getPort()) .build(); BeamFnApi.ApiServiceDescriptor controlDescriptor = BeamFnApi.ApiServiceDescriptor .newBuilder() - .setId(2L) + .setId("2L") .setUrl("localhost:" + controlServer.getPort()) .build(); http://git-wip-us.apache.org/repos/asf/beam/blob/9933f271/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/BeamFnControlClientTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/BeamFnControlClientTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/BeamFnControlClientTest.java index fc3af49..edb7903 100644 --- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/BeamFnControlClientTest.java +++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/BeamFnControlClientTest.java @@ -51,33 +51,33 @@ import org.junit.runners.JUnit4; public class BeamFnControlClientTest { private static final BeamFnApi.InstructionRequest SUCCESSFUL_REQUEST = BeamFnApi.InstructionRequest.newBuilder() - .setInstructionId(1L) + .setInstructionId("1L") .setProcessBundle(BeamFnApi.ProcessBundleRequest.getDefaultInstance()) .build(); private static final BeamFnApi.InstructionResponse SUCCESSFUL_RESPONSE = BeamFnApi.InstructionResponse.newBuilder() - .setInstructionId(1L) + .setInstructionId("1L") .setProcessBundle(BeamFnApi.ProcessBundleResponse.getDefaultInstance()) .build(); private static final BeamFnApi.InstructionRequest UNKNOWN_HANDLER_REQUEST = BeamFnApi.InstructionRequest.newBuilder() - .setInstructionId(2L) + .setInstructionId("2L") .build(); private static final BeamFnApi.InstructionResponse UNKNOWN_HANDLER_RESPONSE = BeamFnApi.InstructionResponse.newBuilder() - .setInstructionId(2L) + .setInstructionId("2L") .setError("Unknown InstructionRequest type " + BeamFnApi.InstructionRequest.RequestCase.REQUEST_NOT_SET) .build(); private static final RuntimeException FAILURE = new RuntimeException("TestFailure"); private static final BeamFnApi.InstructionRequest FAILURE_REQUEST = BeamFnApi.InstructionRequest.newBuilder() - .setInstructionId(3L) + .setInstructionId("3L") .setRegister(BeamFnApi.RegisterRequest.getDefaultInstance()) .build(); private static final BeamFnApi.InstructionResponse FAILURE_RESPONSE = BeamFnApi.InstructionResponse.newBuilder() - .setInstructionId(3L) + .setInstructionId("3L") .setError(getStackTraceAsString(FAILURE)) .build(); http://git-wip-us.apache.org/repos/asf/beam/blob/9933f271/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java index 1d451b5..de105d7 100644 --- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java +++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java @@ -21,6 +21,7 @@ package org.apache.beam.fn.harness.control; import static org.apache.beam.sdk.util.WindowedValue.valueInGlobalWindow; import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.equalTo; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertThat; @@ -93,11 +94,11 @@ public class ProcessBundleHandlerTest { private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); private static final Coder<WindowedValue<String>> STRING_CODER = WindowedValue.getFullCoder(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE); - private static final long LONG_CODER_SPEC_ID = 998L; - private static final long STRING_CODER_SPEC_ID = 999L; + private static final String LONG_CODER_SPEC_ID = "998L"; + private static final String STRING_CODER_SPEC_ID = "999L"; private static final BeamFnApi.RemoteGrpcPort REMOTE_PORT = BeamFnApi.RemoteGrpcPort.newBuilder() .setApiServiceDescriptor(BeamFnApi.ApiServiceDescriptor.newBuilder() - .setId(58L) + .setId("58L") .setUrl("TestUrl")) .build(); private static final BeamFnApi.Coder LONG_CODER_SPEC; @@ -141,10 +142,10 @@ public class ProcessBundleHandlerTest { public void testOrderOfStartAndFinishCalls() throws Exception { BeamFnApi.ProcessBundleDescriptor processBundleDescriptor = BeamFnApi.ProcessBundleDescriptor.newBuilder() - .addPrimitiveTransform(BeamFnApi.PrimitiveTransform.newBuilder().setId(2L)) - .addPrimitiveTransform(BeamFnApi.PrimitiveTransform.newBuilder().setId(3L)) + .addPrimitiveTransform(BeamFnApi.PrimitiveTransform.newBuilder().setId("2L")) + .addPrimitiveTransform(BeamFnApi.PrimitiveTransform.newBuilder().setId("3L")) .build(); - Map<Long, Message> fnApiRegistry = ImmutableMap.of(1L, processBundleDescriptor); + Map<String, Message> fnApiRegistry = ImmutableMap.of("1L", processBundleDescriptor); List<BeamFnApi.PrimitiveTransform> transformsProcessed = new ArrayList<>(); List<String> orderOfOperations = new ArrayList<>(); @@ -156,7 +157,7 @@ public class ProcessBundleHandlerTest { @Override protected <InputT, OutputT> void createConsumersForPrimitiveTransform( BeamFnApi.PrimitiveTransform primitiveTransform, - Supplier<Long> processBundleInstructionId, + Supplier<String> processBundleInstructionId, Function<BeamFnApi.Target, Collection<ThrowingConsumer<WindowedValue<OutputT>>>> consumers, BiConsumer<BeamFnApi.Target, ThrowingConsumer<WindowedValue<InputT>>> addConsumer, @@ -164,7 +165,7 @@ public class ProcessBundleHandlerTest { Consumer<ThrowingRunnable> addFinishFunction) throws IOException { - assertEquals((Long) 999L, processBundleInstructionId.get()); + assertThat(processBundleInstructionId.get(), equalTo("999L")); transformsProcessed.add(primitiveTransform); addStartFunction.accept( @@ -174,9 +175,9 @@ public class ProcessBundleHandlerTest { } }; handler.processBundle(BeamFnApi.InstructionRequest.newBuilder() - .setInstructionId(999L) + .setInstructionId("999L") .setProcessBundle( - BeamFnApi.ProcessBundleRequest.newBuilder().setProcessBundleDescriptorReference(1L)) + BeamFnApi.ProcessBundleRequest.newBuilder().setProcessBundleDescriptorReference("1L")) .build()); // Processing of primitive transforms is performed in reverse order. @@ -184,17 +185,17 @@ public class ProcessBundleHandlerTest { processBundleDescriptor.getPrimitiveTransform(1), processBundleDescriptor.getPrimitiveTransform(0))); // Start should occur in reverse order while finish calls should occur in forward order - assertThat(orderOfOperations, contains("Start3", "Start2", "Finish2", "Finish3")); + assertThat(orderOfOperations, contains("Start3L", "Start2L", "Finish2L", "Finish3L")); } @Test public void testCreatingPrimitiveTransformExceptionsArePropagated() throws Exception { BeamFnApi.ProcessBundleDescriptor processBundleDescriptor = BeamFnApi.ProcessBundleDescriptor.newBuilder() - .addPrimitiveTransform(BeamFnApi.PrimitiveTransform.newBuilder().setId(2L)) - .addPrimitiveTransform(BeamFnApi.PrimitiveTransform.newBuilder().setId(3L)) + .addPrimitiveTransform(BeamFnApi.PrimitiveTransform.newBuilder().setId("2L")) + .addPrimitiveTransform(BeamFnApi.PrimitiveTransform.newBuilder().setId("3L")) .build(); - Map<Long, Message> fnApiRegistry = ImmutableMap.of(1L, processBundleDescriptor); + Map<String, Message> fnApiRegistry = ImmutableMap.of("1L", processBundleDescriptor); ProcessBundleHandler handler = new ProcessBundleHandler( PipelineOptionsFactory.create(), @@ -203,7 +204,7 @@ public class ProcessBundleHandlerTest { @Override protected <InputT, OutputT> void createConsumersForPrimitiveTransform( BeamFnApi.PrimitiveTransform primitiveTransform, - Supplier<Long> processBundleInstructionId, + Supplier<String> processBundleInstructionId, Function<BeamFnApi.Target, Collection<ThrowingConsumer<WindowedValue<OutputT>>>> consumers, BiConsumer<BeamFnApi.Target, ThrowingConsumer<WindowedValue<InputT>>> addConsumer, @@ -217,7 +218,7 @@ public class ProcessBundleHandlerTest { }; handler.processBundle( BeamFnApi.InstructionRequest.newBuilder().setProcessBundle( - BeamFnApi.ProcessBundleRequest.newBuilder().setProcessBundleDescriptorReference(1L)) + BeamFnApi.ProcessBundleRequest.newBuilder().setProcessBundleDescriptorReference("1L")) .build()); } @@ -225,10 +226,10 @@ public class ProcessBundleHandlerTest { public void testPrimitiveTransformStartExceptionsArePropagated() throws Exception { BeamFnApi.ProcessBundleDescriptor processBundleDescriptor = BeamFnApi.ProcessBundleDescriptor.newBuilder() - .addPrimitiveTransform(BeamFnApi.PrimitiveTransform.newBuilder().setId(2L)) - .addPrimitiveTransform(BeamFnApi.PrimitiveTransform.newBuilder().setId(3L)) + .addPrimitiveTransform(BeamFnApi.PrimitiveTransform.newBuilder().setId("2L")) + .addPrimitiveTransform(BeamFnApi.PrimitiveTransform.newBuilder().setId("3L")) .build(); - Map<Long, Message> fnApiRegistry = ImmutableMap.of(1L, processBundleDescriptor); + Map<String, Message> fnApiRegistry = ImmutableMap.of("1L", processBundleDescriptor); ProcessBundleHandler handler = new ProcessBundleHandler( PipelineOptionsFactory.create(), @@ -237,7 +238,7 @@ public class ProcessBundleHandlerTest { @Override protected <InputT, OutputT> void createConsumersForPrimitiveTransform( BeamFnApi.PrimitiveTransform primitiveTransform, - Supplier<Long> processBundleInstructionId, + Supplier<String> processBundleInstructionId, Function<BeamFnApi.Target, Collection<ThrowingConsumer<WindowedValue<OutputT>>>> consumers, BiConsumer<BeamFnApi.Target, ThrowingConsumer<WindowedValue<InputT>>> addConsumer, @@ -255,7 +256,7 @@ public class ProcessBundleHandlerTest { }; handler.processBundle( BeamFnApi.InstructionRequest.newBuilder().setProcessBundle( - BeamFnApi.ProcessBundleRequest.newBuilder().setProcessBundleDescriptorReference(1L)) + BeamFnApi.ProcessBundleRequest.newBuilder().setProcessBundleDescriptorReference("1L")) .build()); } @@ -263,10 +264,10 @@ public class ProcessBundleHandlerTest { public void testPrimitiveTransformFinishExceptionsArePropagated() throws Exception { BeamFnApi.ProcessBundleDescriptor processBundleDescriptor = BeamFnApi.ProcessBundleDescriptor.newBuilder() - .addPrimitiveTransform(BeamFnApi.PrimitiveTransform.newBuilder().setId(2L)) - .addPrimitiveTransform(BeamFnApi.PrimitiveTransform.newBuilder().setId(3L)) + .addPrimitiveTransform(BeamFnApi.PrimitiveTransform.newBuilder().setId("2L")) + .addPrimitiveTransform(BeamFnApi.PrimitiveTransform.newBuilder().setId("3L")) .build(); - Map<Long, Message> fnApiRegistry = ImmutableMap.of(1L, processBundleDescriptor); + Map<String, Message> fnApiRegistry = ImmutableMap.of("1L", processBundleDescriptor); ProcessBundleHandler handler = new ProcessBundleHandler( PipelineOptionsFactory.create(), @@ -275,7 +276,7 @@ public class ProcessBundleHandlerTest { @Override protected <InputT, OutputT> void createConsumersForPrimitiveTransform( BeamFnApi.PrimitiveTransform primitiveTransform, - Supplier<Long> processBundleInstructionId, + Supplier<String> processBundleInstructionId, Function<BeamFnApi.Target, Collection<ThrowingConsumer<WindowedValue<OutputT>>>> consumers, BiConsumer<BeamFnApi.Target, ThrowingConsumer<WindowedValue<InputT>>> addConsumer, @@ -293,7 +294,7 @@ public class ProcessBundleHandlerTest { }; handler.processBundle( BeamFnApi.InstructionRequest.newBuilder().setProcessBundle( - BeamFnApi.ProcessBundleRequest.newBuilder().setProcessBundleDescriptorReference(1L)) + BeamFnApi.ProcessBundleRequest.newBuilder().setProcessBundleDescriptorReference("1L")) .build()); } @@ -325,8 +326,8 @@ public class ProcessBundleHandlerTest { */ @Test public void testCreatingAndProcessingDoFn() throws Exception { - Map<Long, Message> fnApiRegistry = ImmutableMap.of(STRING_CODER_SPEC_ID, STRING_CODER_SPEC); - long primitiveTransformId = 100L; + Map<String, Message> fnApiRegistry = ImmutableMap.of(STRING_CODER_SPEC_ID, STRING_CODER_SPEC); + String primitiveTransformId = "100L"; long mainOutputId = 101L; long sideOutputId = 102L; @@ -340,22 +341,22 @@ public class ProcessBundleHandlerTest { mainOutputId, TestDoFn.mainOutput, sideOutputId, TestDoFn.sideOutput)); BeamFnApi.FunctionSpec functionSpec = BeamFnApi.FunctionSpec.newBuilder() - .setId(1L) + .setId("1L") .setUrn(JAVA_DO_FN_URN) .setData(Any.pack(BytesValue.newBuilder() .setValue(ByteString.copyFrom(SerializableUtils.serializeToByteArray(doFnInfo))) .build())) .build(); BeamFnApi.Target inputATarget1 = BeamFnApi.Target.newBuilder() - .setPrimitiveTransformReference(1000L) + .setPrimitiveTransformReference("1000L") .setName("inputATarget1") .build(); BeamFnApi.Target inputATarget2 = BeamFnApi.Target.newBuilder() - .setPrimitiveTransformReference(1001L) + .setPrimitiveTransformReference("1001L") .setName("inputATarget1") .build(); BeamFnApi.Target inputBTarget = BeamFnApi.Target.newBuilder() - .setPrimitiveTransformReference(1002L) + .setPrimitiveTransformReference("1002L") .setName("inputBTarget") .build(); BeamFnApi.PrimitiveTransform primitiveTransform = BeamFnApi.PrimitiveTransform.newBuilder() @@ -401,7 +402,7 @@ public class ProcessBundleHandlerTest { beamFnDataClient); handler.createConsumersForPrimitiveTransform( primitiveTransform, - Suppliers.ofInstance(57L)::get, + Suppliers.ofInstance("57L")::get, existingConsumers::get, newConsumers::put, startFunctions::add, @@ -435,12 +436,12 @@ public class ProcessBundleHandlerTest { @Test public void testCreatingAndProcessingSource() throws Exception { - Map<Long, Message> fnApiRegistry = ImmutableMap.of(LONG_CODER_SPEC_ID, LONG_CODER_SPEC); - long primitiveTransformId = 100L; + Map<String, Message> fnApiRegistry = ImmutableMap.of(LONG_CODER_SPEC_ID, LONG_CODER_SPEC); + String primitiveTransformId = "100L"; long outputId = 101L; BeamFnApi.Target inputTarget = BeamFnApi.Target.newBuilder() - .setPrimitiveTransformReference(1000L) + .setPrimitiveTransformReference("1000L") .setName("inputTarget") .build(); @@ -459,7 +460,7 @@ public class ProcessBundleHandlerTest { List<ThrowingRunnable> finishFunctions = new ArrayList<>(); BeamFnApi.FunctionSpec functionSpec = BeamFnApi.FunctionSpec.newBuilder() - .setId(1L) + .setId("1L") .setUrn(JAVA_SOURCE_URN) .setData(Any.pack(BytesValue.newBuilder() .setValue(ByteString.copyFrom( @@ -483,7 +484,7 @@ public class ProcessBundleHandlerTest { handler.createConsumersForPrimitiveTransform( primitiveTransform, - Suppliers.ofInstance(57L)::get, + Suppliers.ofInstance("57L")::get, existingConsumers::get, newConsumers::put, startFunctions::add, @@ -511,9 +512,9 @@ public class ProcessBundleHandlerTest { @Test public void testCreatingAndProcessingBeamFnDataReadRunner() throws Exception { - Map<Long, Message> fnApiRegistry = ImmutableMap.of(STRING_CODER_SPEC_ID, STRING_CODER_SPEC); - long bundleId = 57L; - long primitiveTransformId = 100L; + Map<String, Message> fnApiRegistry = ImmutableMap.of(STRING_CODER_SPEC_ID, STRING_CODER_SPEC); + String bundleId = "57L"; + String primitiveTransformId = "100L"; long outputId = 101L; List<WindowedValue<String>> outputValues = new ArrayList<>(); @@ -530,7 +531,7 @@ public class ProcessBundleHandlerTest { List<ThrowingRunnable> finishFunctions = new ArrayList<>(); BeamFnApi.FunctionSpec functionSpec = BeamFnApi.FunctionSpec.newBuilder() - .setId(1L) + .setId("1L") .setUrn(DATA_INPUT_URN) .setData(Any.pack(REMOTE_PORT)) .build(); @@ -585,13 +586,13 @@ public class ProcessBundleHandlerTest { @Test public void testCreatingAndProcessingBeamFnDataWriteRunner() throws Exception { - Map<Long, Message> fnApiRegistry = ImmutableMap.of(STRING_CODER_SPEC_ID, STRING_CODER_SPEC); - long bundleId = 57L; - long primitiveTransformId = 100L; + Map<String, Message> fnApiRegistry = ImmutableMap.of(STRING_CODER_SPEC_ID, STRING_CODER_SPEC); + String bundleId = "57L"; + String primitiveTransformId = "100L"; long outputId = 101L; BeamFnApi.Target inputTarget = BeamFnApi.Target.newBuilder() - .setPrimitiveTransformReference(1000L) + .setPrimitiveTransformReference("1000L") .setName("inputTarget") .build(); @@ -603,7 +604,7 @@ public class ProcessBundleHandlerTest { List<ThrowingRunnable> finishFunctions = new ArrayList<>(); BeamFnApi.FunctionSpec functionSpec = BeamFnApi.FunctionSpec.newBuilder() - .setId(1L) + .setId("1L") .setUrn(DATA_OUTPUT_URN) .setData(Any.pack(REMOTE_PORT)) .build(); http://git-wip-us.apache.org/repos/asf/beam/blob/9933f271/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/RegisterHandlerTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/RegisterHandlerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/RegisterHandlerTest.java index 7b07a08..c32fcc4 100644 --- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/RegisterHandlerTest.java +++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/RegisterHandlerTest.java @@ -39,14 +39,14 @@ public class RegisterHandlerTest { private static final BeamFnApi.InstructionRequest REGISTER_REQUEST = BeamFnApi.InstructionRequest.newBuilder() - .setInstructionId(1L) + .setInstructionId("1L") .setRegister(BeamFnApi.RegisterRequest.newBuilder() - .addProcessBundleDescriptor(BeamFnApi.ProcessBundleDescriptor.newBuilder().setId(1L) + .addProcessBundleDescriptor(BeamFnApi.ProcessBundleDescriptor.newBuilder().setId("1L") .addCoders(BeamFnApi.Coder.newBuilder().setFunctionSpec( - BeamFnApi.FunctionSpec.newBuilder().setId(10L)).build())) - .addProcessBundleDescriptor(BeamFnApi.ProcessBundleDescriptor.newBuilder().setId(2L) + BeamFnApi.FunctionSpec.newBuilder().setId("10L")).build())) + .addProcessBundleDescriptor(BeamFnApi.ProcessBundleDescriptor.newBuilder().setId("2L") .addCoders(BeamFnApi.Coder.newBuilder().setFunctionSpec( - BeamFnApi.FunctionSpec.newBuilder().setId(20L)).build())) + BeamFnApi.FunctionSpec.newBuilder().setId("20L")).build())) .build()) .build(); private static final BeamFnApi.InstructionResponse REGISTER_RESPONSE = @@ -68,13 +68,13 @@ public class RegisterHandlerTest { } }); assertEquals(REGISTER_REQUEST.getRegister().getProcessBundleDescriptor(0), - handler.getById(1L)); + handler.getById("1L")); assertEquals(REGISTER_REQUEST.getRegister().getProcessBundleDescriptor(1), - handler.getById(2L)); + handler.getById("2L")); assertEquals(REGISTER_REQUEST.getRegister().getProcessBundleDescriptor(0).getCoders(0), - handler.getById(10L)); + handler.getById("10L")); assertEquals(REGISTER_REQUEST.getRegister().getProcessBundleDescriptor(1).getCoders(0), - handler.getById(20L)); + handler.getById("20L")); assertEquals(REGISTER_RESPONSE, responseFuture.get()); } } http://git-wip-us.apache.org/repos/asf/beam/blob/9933f271/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataBufferingOutboundObserverTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataBufferingOutboundObserverTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataBufferingOutboundObserverTest.java index 64a0e11..7cbf8eb 100644 --- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataBufferingOutboundObserverTest.java +++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataBufferingOutboundObserverTest.java @@ -46,8 +46,13 @@ import org.junit.runners.JUnit4; @RunWith(JUnit4.class) public class BeamFnDataBufferingOutboundObserverTest { private static final int DEFAULT_BUFFER_LIMIT = 1_000_000; - private static final KV<Long, BeamFnApi.Target> OUTPUT_LOCATION = KV.of(777L, - BeamFnApi.Target.newBuilder().setPrimitiveTransformReference(555L).setName("Test").build()); + private static final KV<String, BeamFnApi.Target> OUTPUT_LOCATION = + KV.of( + "777L", + BeamFnApi.Target.newBuilder() + .setPrimitiveTransformReference("555L") + .setName("Test") + .build()); private static final Coder<WindowedValue<byte[]>> CODER = LengthPrefixCoder.of(WindowedValue.getValueOnlyCoder(ByteArrayCoder.of())); http://git-wip-us.apache.org/repos/asf/beam/blob/9933f271/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcClientTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcClientTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcClientTest.java index 20566ea..31eb0db 100644 --- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcClientTest.java +++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcClientTest.java @@ -66,12 +66,21 @@ public class BeamFnDataGrpcClientTest { LengthPrefixCoder.of( WindowedValue.getFullCoder(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE)); - private static final KV<Long, BeamFnApi.Target> KEY_A = KV.of( - 12L, - BeamFnApi.Target.newBuilder().setPrimitiveTransformReference(34L).setName("targetA").build()); - private static final KV<Long, BeamFnApi.Target> KEY_B = KV.of( - 56L, - BeamFnApi.Target.newBuilder().setPrimitiveTransformReference(78L).setName("targetB").build()); + private static final KV<String, BeamFnApi.Target> KEY_A = + KV.of( + "12L", + BeamFnApi.Target.newBuilder() + .setPrimitiveTransformReference("34L") + .setName("targetA") + .build()); + + private static final KV<String, BeamFnApi.Target> KEY_B = + KV.of( + "56L", + BeamFnApi.Target.newBuilder() + .setPrimitiveTransformReference("78L") + .setName("targetB") + .build()); private static final BeamFnApi.Elements ELEMENTS_A_1; private static final BeamFnApi.Elements ELEMENTS_A_2; http://git-wip-us.apache.org/repos/asf/beam/blob/9933f271/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcMultiplexerTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcMultiplexerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcMultiplexerTest.java index 38d9e2c..a9095ae 100644 --- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcMultiplexerTest.java +++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcMultiplexerTest.java @@ -39,11 +39,13 @@ import org.junit.Test; public class BeamFnDataGrpcMultiplexerTest { private static final BeamFnApi.ApiServiceDescriptor DESCRIPTOR = BeamFnApi.ApiServiceDescriptor.newBuilder().setUrl("test").build(); - private static final KV<Long, BeamFnApi.Target> OUTPUT_LOCATION = KV.of(777L, - BeamFnApi.Target.newBuilder() - .setName("name") - .setPrimitiveTransformReference(888L) - .build()); + private static final KV<String, BeamFnApi.Target> OUTPUT_LOCATION = + KV.of( + "777L", + BeamFnApi.Target.newBuilder() + .setName("name") + .setPrimitiveTransformReference("888L") + .build()); private static final BeamFnApi.Elements ELEMENTS = BeamFnApi.Elements.newBuilder() .addData(BeamFnApi.Elements.Data.newBuilder() .setInstructionReference(OUTPUT_LOCATION.getKey()) http://git-wip-us.apache.org/repos/asf/beam/blob/9933f271/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataInboundObserverTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataInboundObserverTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataInboundObserverTest.java index ff0e083..c53f99d 100644 --- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataInboundObserverTest.java +++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataInboundObserverTest.java @@ -102,9 +102,9 @@ public class BeamFnDataInboundObserverTest { private BeamFnApi.Elements.Data dataWith(String ... values) throws Exception { BeamFnApi.Elements.Data.Builder builder = BeamFnApi.Elements.Data.newBuilder() - .setInstructionReference(777L) + .setInstructionReference("777L") .setTarget(BeamFnApi.Target.newBuilder() - .setPrimitiveTransformReference(999L) + .setPrimitiveTransformReference("999L") .setName("Test")); ByteString.Output output = ByteString.newOutput(); for (String value : values) { http://git-wip-us.apache.org/repos/asf/beam/blob/9933f271/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BeamFnDataReadRunnerTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BeamFnDataReadRunnerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BeamFnDataReadRunnerTest.java index 511cc3f..0cc5ef9 100644 --- a/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BeamFnDataReadRunnerTest.java +++ b/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BeamFnDataReadRunnerTest.java @@ -42,7 +42,7 @@ import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; import org.apache.beam.fn.harness.data.BeamFnDataClient; import org.apache.beam.fn.harness.fn.ThrowingConsumer; import org.apache.beam.fn.harness.test.TestExecutors; @@ -85,7 +85,7 @@ public class BeamFnDataReadRunnerTest { } } private static final BeamFnApi.Target INPUT_TARGET = BeamFnApi.Target.newBuilder() - .setPrimitiveTransformReference(1) + .setPrimitiveTransformReference("1") .setName("out") .build(); @@ -112,7 +112,7 @@ public class BeamFnDataReadRunnerTest { Map<String, Collection<ThrowingConsumer<WindowedValue<String>>>> outputMap = ImmutableMap.of( "outA", ImmutableList.of(valuesA::add), "outB", ImmutableList.of(valuesB::add)); - AtomicLong bundleId = new AtomicLong(0); + AtomicReference<String> bundleId = new AtomicReference<>("0"); BeamFnDataReadRunner<String> readRunner = new BeamFnDataReadRunner<>( FUNCTION_SPEC, bundleId::get, @@ -151,7 +151,7 @@ public class BeamFnDataReadRunnerTest { assertThat(valuesB, contains(valueInGlobalWindow("ABC"), valueInGlobalWindow("DEF"))); // Process for bundle id 1 - bundleId.incrementAndGet(); + bundleId.set("1"); valuesA.clear(); valuesB.clear(); readRunner.registerInputLocation(); http://git-wip-us.apache.org/repos/asf/beam/blob/9933f271/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BeamFnDataWriteRunnerTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BeamFnDataWriteRunnerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BeamFnDataWriteRunnerTest.java index ed67b14..378567a 100644 --- a/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BeamFnDataWriteRunnerTest.java +++ b/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BeamFnDataWriteRunnerTest.java @@ -34,7 +34,7 @@ import com.google.protobuf.ByteString; import com.google.protobuf.BytesValue; import java.io.IOException; import java.util.ArrayList; -import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; import org.apache.beam.fn.harness.data.BeamFnDataClient; import org.apache.beam.fn.harness.fn.CloseableThrowingConsumer; import org.apache.beam.fn.v1.BeamFnApi; @@ -73,7 +73,7 @@ public class BeamFnDataWriteRunnerTest { } } private static final BeamFnApi.Target OUTPUT_TARGET = BeamFnApi.Target.newBuilder() - .setPrimitiveTransformReference(1) + .setPrimitiveTransformReference("1") .setName("out") .build(); @@ -92,7 +92,7 @@ public class BeamFnDataWriteRunnerTest { any(), any(), Matchers.<Coder<WindowedValue<String>>>any())).thenReturn(valuesA).thenReturn(valuesB); - AtomicLong bundleId = new AtomicLong(0); + AtomicReference<String> bundleId = new AtomicReference<>("0"); BeamFnDataWriteRunner<String> writeRunner = new BeamFnDataWriteRunner<>( FUNCTION_SPEC, bundleId::get, @@ -116,7 +116,7 @@ public class BeamFnDataWriteRunnerTest { assertThat(valuesA, contains(valueInGlobalWindow("ABC"), valueInGlobalWindow("DEF"))); // Process for bundle id 1 - bundleId.incrementAndGet(); + bundleId.set("1"); valuesA.clear(); valuesB.clear(); writeRunner.registerForOutput();