[BEAM-1871] Move GCP specific serialization CloudObject and supporting translation code to Dataflow runner module
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a5627b1a Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a5627b1a Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a5627b1a Branch: refs/heads/DSL_SQL Commit: a5627b1a64696d7526bc5aeec5a0b51571fb5ef1 Parents: 320f9af Author: Luke Cwik <lc...@google.com> Authored: Wed May 3 10:03:45 2017 -0700 Committer: Luke Cwik <lc...@google.com> Committed: Wed May 3 12:46:17 2017 -0700 ---------------------------------------------------------------------- runners/google-cloud-dataflow-java/pom.xml | 2 +- .../dataflow/DataflowPipelineTranslator.java | 18 +- .../beam/runners/dataflow/DataflowRunner.java | 2 +- .../beam/runners/dataflow/ReadTranslator.java | 8 +- .../dataflow/internal/CustomSources.java | 6 +- .../util/AvroCoderCloudObjectTranslator.java | 2 - .../beam/runners/dataflow/util/CloudObject.java | 1 - .../runners/dataflow/util/CloudObjectKinds.java | 2 - .../dataflow/util/CloudObjectTranslator.java | 2 - .../dataflow/util/CloudObjectTranslators.java | 23 +- .../runners/dataflow/util/CloudObjects.java | 1 - .../CoderCloudObjectTranslatorRegistrar.java | 1 - .../runners/dataflow/util/PropertyNames.java | 112 ++++++ .../SerializableCoderCloudObjectTranslator.java | 2 - .../beam/runners/dataflow/util/Serializer.java | 262 ------------- .../beam/runners/dataflow/util/Structs.java | 372 +++++++++++++++++++ .../DataflowPipelineTranslatorTest.java | 10 +- .../runners/dataflow/util/CloudObjectsTest.java | 1 - .../beam/runners/dataflow/util/StructsTest.java | 206 ++++++++++ .../apache/beam/sdk/coders/CollectionCoder.java | 13 - .../apache/beam/sdk/coders/IterableCoder.java | 13 - .../org/apache/beam/sdk/coders/KvCoder.java | 13 - .../beam/sdk/coders/LengthPrefixCoder.java | 13 - .../org/apache/beam/sdk/coders/ListCoder.java | 13 - .../org/apache/beam/sdk/coders/SetCoder.java | 16 - .../apache/beam/sdk/util/CloudKnownType.java | 143 ------- .../org/apache/beam/sdk/util/CloudObject.java | 187 ---------- .../org/apache/beam/sdk/util/CoderUtils.java | 117 ------ .../org/apache/beam/sdk/util/PropertyNames.java | 112 ------ .../org/apache/beam/sdk/util/Serializer.java | 147 -------- .../java/org/apache/beam/sdk/util/Structs.java | 371 ------------------ .../java/org/apache/beam/sdk/util/Values.java | 88 ----- .../org/apache/beam/sdk/util/WindowedValue.java | 22 -- .../org/apache/beam/sdk/values/TupleTag.java | 22 -- .../apache/beam/sdk/util/SerializerTest.java | 162 -------- .../org/apache/beam/sdk/util/StructsTest.java | 206 ---------- 36 files changed, 729 insertions(+), 1962 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/a5627b1a/runners/google-cloud-dataflow-java/pom.xml ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/pom.xml b/runners/google-cloud-dataflow-java/pom.xml index bbad156..30ef84d 100644 --- a/runners/google-cloud-dataflow-java/pom.xml +++ b/runners/google-cloud-dataflow-java/pom.xml @@ -33,7 +33,7 @@ <packaging>jar</packaging> <properties> - <dataflow.container_version>beam-master-20170502</dataflow.container_version> + <dataflow.container_version>beam-master-20170503</dataflow.container_version> <dataflow.fnapi_environment_major_version>1</dataflow.fnapi_environment_major_version> <dataflow.legacy_environment_major_version>6</dataflow.legacy_environment_major_version> </properties> http://git-wip-us.apache.org/repos/asf/beam/blob/a5627b1a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java index 28a9c1c..05edd28 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java @@ -21,15 +21,15 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; import static com.google.common.base.Strings.isNullOrEmpty; +import static org.apache.beam.runners.dataflow.util.Structs.addBoolean; +import static org.apache.beam.runners.dataflow.util.Structs.addDictionary; +import static org.apache.beam.runners.dataflow.util.Structs.addList; +import static org.apache.beam.runners.dataflow.util.Structs.addLong; +import static org.apache.beam.runners.dataflow.util.Structs.addObject; +import static org.apache.beam.runners.dataflow.util.Structs.addString; +import static org.apache.beam.runners.dataflow.util.Structs.getString; import static org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray; import static org.apache.beam.sdk.util.StringUtils.byteArrayToJsonString; -import static org.apache.beam.sdk.util.Structs.addBoolean; -import static org.apache.beam.sdk.util.Structs.addDictionary; -import static org.apache.beam.sdk.util.Structs.addList; -import static org.apache.beam.sdk.util.Structs.addLong; -import static org.apache.beam.sdk.util.Structs.addObject; -import static org.apache.beam.sdk.util.Structs.addString; -import static org.apache.beam.sdk.util.Structs.getString; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; @@ -62,9 +62,11 @@ import org.apache.beam.runners.dataflow.PrimitiveParDoSingleFactory.ParDoSingle; import org.apache.beam.runners.dataflow.TransformTranslator.StepTranslationContext; import org.apache.beam.runners.dataflow.TransformTranslator.TranslationContext; import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; +import org.apache.beam.runners.dataflow.util.CloudObject; import org.apache.beam.runners.dataflow.util.CloudObjects; import org.apache.beam.runners.dataflow.util.DoFnInfo; import org.apache.beam.runners.dataflow.util.OutputReference; +import org.apache.beam.runners.dataflow.util.PropertyNames; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.Pipeline.PipelineVisitor; import org.apache.beam.sdk.coders.Coder; @@ -86,8 +88,6 @@ import org.apache.beam.sdk.transforms.reflect.DoFnSignatures; import org.apache.beam.sdk.transforms.windowing.DefaultTrigger; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.util.AppliedCombineFn; -import org.apache.beam.sdk.util.CloudObject; -import org.apache.beam.sdk.util.PropertyNames; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.values.KV; http://git-wip-us.apache.org/repos/asf/beam/blob/a5627b1a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index 6aaa11b..7da1755 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -73,6 +73,7 @@ import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; import org.apache.beam.runners.dataflow.util.DataflowTemplateJob; import org.apache.beam.runners.dataflow.util.DataflowTransport; import org.apache.beam.runners.dataflow.util.MonitoringUtil; +import org.apache.beam.runners.dataflow.util.PropertyNames; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.Pipeline.PipelineVisitor; import org.apache.beam.sdk.PipelineResult.State; @@ -115,7 +116,6 @@ import org.apache.beam.sdk.util.InstanceBuilder; import org.apache.beam.sdk.util.MimeTypes; import org.apache.beam.sdk.util.NameUtils; import org.apache.beam.sdk.util.PathValidator; -import org.apache.beam.sdk.util.PropertyNames; import org.apache.beam.sdk.util.ReleaseInfo; import org.apache.beam.sdk.util.Reshuffle; import org.apache.beam.sdk.util.ValueWithRecordId; http://git-wip-us.apache.org/repos/asf/beam/blob/a5627b1a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/ReadTranslator.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/ReadTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/ReadTranslator.java index bc68511..c304c32 100755 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/ReadTranslator.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/ReadTranslator.java @@ -17,20 +17,20 @@ */ package org.apache.beam.runners.dataflow; -import static org.apache.beam.sdk.util.Structs.addBoolean; -import static org.apache.beam.sdk.util.Structs.addDictionary; -import static org.apache.beam.sdk.util.Structs.addLong; +import static org.apache.beam.runners.dataflow.util.Structs.addBoolean; +import static org.apache.beam.runners.dataflow.util.Structs.addDictionary; +import static org.apache.beam.runners.dataflow.util.Structs.addLong; import com.google.api.services.dataflow.model.SourceMetadata; import java.util.HashMap; import java.util.Map; import org.apache.beam.runners.dataflow.internal.CustomSources; +import org.apache.beam.runners.dataflow.util.PropertyNames; import org.apache.beam.sdk.io.FileBasedSource; import org.apache.beam.sdk.io.Read; import org.apache.beam.sdk.io.Source; import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.util.PropertyNames; import org.apache.beam.sdk.values.PValue; /** http://git-wip-us.apache.org/repos/asf/beam/blob/a5627b1a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/CustomSources.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/CustomSources.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/CustomSources.java index 778ccf3..0d93566 100755 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/CustomSources.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/CustomSources.java @@ -19,9 +19,9 @@ package org.apache.beam.runners.dataflow.internal; import static com.google.api.client.util.Base64.encodeBase64String; import static com.google.common.base.Preconditions.checkArgument; +import static org.apache.beam.runners.dataflow.util.Structs.addString; +import static org.apache.beam.runners.dataflow.util.Structs.addStringList; import static org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray; -import static org.apache.beam.sdk.util.Structs.addString; -import static org.apache.beam.sdk.util.Structs.addStringList; import com.google.api.services.dataflow.model.SourceMetadata; import com.google.common.annotations.VisibleForTesting; @@ -29,11 +29,11 @@ import com.google.protobuf.ByteString; import java.util.ArrayList; import java.util.List; import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; +import org.apache.beam.runners.dataflow.util.CloudObject; import org.apache.beam.sdk.io.BoundedSource; import org.apache.beam.sdk.io.Source; import org.apache.beam.sdk.io.UnboundedSource; import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.util.CloudObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; http://git-wip-us.apache.org/repos/asf/beam/blob/a5627b1a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/AvroCoderCloudObjectTranslator.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/AvroCoderCloudObjectTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/AvroCoderCloudObjectTranslator.java index 444849d..c4d807e 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/AvroCoderCloudObjectTranslator.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/AvroCoderCloudObjectTranslator.java @@ -20,8 +20,6 @@ package org.apache.beam.runners.dataflow.util; import org.apache.avro.Schema; import org.apache.beam.sdk.coders.AvroCoder; -import org.apache.beam.sdk.util.CloudObject; -import org.apache.beam.sdk.util.Structs; /** A {@link CloudObjectTranslator} for {@link AvroCoder}. */ class AvroCoderCloudObjectTranslator implements CloudObjectTranslator<AvroCoder> { http://git-wip-us.apache.org/repos/asf/beam/blob/a5627b1a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObject.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObject.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObject.java index e4dd9be..b3680e9 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObject.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObject.java @@ -24,7 +24,6 @@ import com.google.api.client.json.GenericJson; import com.google.api.client.util.Key; import java.util.Map; import javax.annotation.Nullable; -import org.apache.beam.sdk.util.PropertyNames; /** * A representation of an arbitrary Java object to be instantiated by Dataflow http://git-wip-us.apache.org/repos/asf/beam/blob/a5627b1a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjectKinds.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjectKinds.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjectKinds.java index 1499f17..403ade2 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjectKinds.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjectKinds.java @@ -18,8 +18,6 @@ package org.apache.beam.runners.dataflow.util; -import org.apache.beam.sdk.util.CloudObject; - /** * Known kinds of {@link CloudObject}. */ http://git-wip-us.apache.org/repos/asf/beam/blob/a5627b1a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjectTranslator.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjectTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjectTranslator.java index 534370f..775495b 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjectTranslator.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjectTranslator.java @@ -18,8 +18,6 @@ package org.apache.beam.runners.dataflow.util; -import org.apache.beam.sdk.util.CloudObject; - /** * A translator that takes an object and creates a {@link CloudObject} which can be converted back * to the original object. http://git-wip-us.apache.org/repos/asf/beam/blob/a5627b1a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjectTranslators.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjectTranslators.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjectTranslators.java index f3e3312..012a669 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjectTranslators.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjectTranslators.java @@ -38,14 +38,12 @@ import org.apache.beam.sdk.coders.VarLongCoder; import org.apache.beam.sdk.transforms.join.CoGbkResult.CoGbkResultCoder; import org.apache.beam.sdk.transforms.join.CoGbkResultSchema; import org.apache.beam.sdk.transforms.join.UnionCoder; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.IntervalWindow.IntervalWindowCoder; -import org.apache.beam.sdk.util.CloudObject; import org.apache.beam.sdk.util.InstanceBuilder; -import org.apache.beam.sdk.util.PropertyNames; import org.apache.beam.sdk.util.SerializableUtils; import org.apache.beam.sdk.util.StringUtils; -import org.apache.beam.sdk.util.Structs; import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder; import org.apache.beam.sdk.values.TupleTag; @@ -94,7 +92,9 @@ class CloudObjectTranslators { @Override public KvCoder fromCloudObject(CloudObject object) { - return KvCoder.of(getComponents(object)); + List<Coder<?>> components = getComponents(object); + checkArgument(components.size() == 2, "Expecting 2 components, got %s", components.size()); + return KvCoder.of(components.get(0), components.get(1)); } @Override @@ -125,7 +125,9 @@ class CloudObjectTranslators { @Override public IterableCoder fromCloudObject(CloudObject object) { - return IterableCoder.of(getComponents(object)); + List<Coder<?>> components = getComponents(object); + checkArgument(components.size() == 1, "Expecting 1 component, got %s", components.size()); + return IterableCoder.of(components.get(0)); } @Override @@ -155,7 +157,9 @@ class CloudObjectTranslators { @Override public LengthPrefixCoder fromCloudObject(CloudObject object) { - return LengthPrefixCoder.of(getComponents(object)); + List<Coder<?>> components = getComponents(object); + checkArgument(components.size() == 1, "Expecting 1 component, got %s", components.size()); + return LengthPrefixCoder.of(components.get(0)); } @Override @@ -246,7 +250,12 @@ class CloudObjectTranslators { @Override public FullWindowedValueCoder fromCloudObject(CloudObject object) { - return FullWindowedValueCoder.of(getComponents(object)); + List<Coder<?>> components = getComponents(object); + checkArgument(components.size() == 2, + "Expecting 2 components, got " + components.size()); + @SuppressWarnings("unchecked") + Coder<? extends BoundedWindow> window = (Coder<? extends BoundedWindow>) components.get(1); + return FullWindowedValueCoder.of(components.get(0), window); } @Override http://git-wip-us.apache.org/repos/asf/beam/blob/a5627b1a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjects.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjects.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjects.java index 9383c48..42c7012 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjects.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjects.java @@ -26,7 +26,6 @@ import java.util.Map; import java.util.ServiceLoader; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CustomCoder; -import org.apache.beam.sdk.util.CloudObject; /** Utilities for converting an object to a {@link CloudObject}. */ public class CloudObjects { http://git-wip-us.apache.org/repos/asf/beam/blob/a5627b1a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CoderCloudObjectTranslatorRegistrar.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CoderCloudObjectTranslatorRegistrar.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CoderCloudObjectTranslatorRegistrar.java index 446eb3b..928e629 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CoderCloudObjectTranslatorRegistrar.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CoderCloudObjectTranslatorRegistrar.java @@ -22,7 +22,6 @@ import com.google.auto.service.AutoService; import java.util.Map; import java.util.ServiceLoader; import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.util.CloudObject; /** * {@link Coder} authors have the ability to automatically have their {@link Coder} registered with http://git-wip-us.apache.org/repos/asf/beam/blob/a5627b1a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PropertyNames.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PropertyNames.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PropertyNames.java new file mode 100644 index 0000000..c8c9903 --- /dev/null +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PropertyNames.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow.util; + +/** + * Constant property names used by the SDK in CloudWorkflow specifications. + */ +public class PropertyNames { + public static final String ALLOWED_ENCODINGS = "allowed_encodings"; + public static final String APPEND_TRAILING_NEWLINES = "append_trailing_newlines"; + public static final String BIGQUERY_CREATE_DISPOSITION = "create_disposition"; + public static final String BIGQUERY_DATASET = "dataset"; + public static final String BIGQUERY_PROJECT = "project"; + public static final String BIGQUERY_SCHEMA = "schema"; + public static final String BIGQUERY_TABLE = "table"; + public static final String BIGQUERY_QUERY = "bigquery_query"; + public static final String BIGQUERY_FLATTEN_RESULTS = "bigquery_flatten_results"; + public static final String BIGQUERY_USE_LEGACY_SQL = "bigquery_use_legacy_sql"; + public static final String BIGQUERY_WRITE_DISPOSITION = "write_disposition"; + public static final String BIGQUERY_EXPORT_FORMAT = "bigquery_export_format"; + public static final String BIGQUERY_EXPORT_SCHEMA = "bigquery_export_schema"; + public static final String CO_GBK_RESULT_SCHEMA = "co_gbk_result_schema"; + public static final String COMBINE_FN = "combine_fn"; + public static final String COMPONENT_ENCODINGS = "component_encodings"; + public static final String COMPRESSION_TYPE = "compression_type"; + public static final String CUSTOM_SOURCE_FORMAT = "custom_source"; + public static final String CONCAT_SOURCE_SOURCES = "sources"; + public static final String CONCAT_SOURCE_BASE_SPECS = "base_specs"; + public static final String SOURCE_STEP_INPUT = "custom_source_step_input"; + public static final String SOURCE_SPEC = "spec"; + public static final String SOURCE_METADATA = "metadata"; + public static final String SOURCE_DOES_NOT_NEED_SPLITTING = "does_not_need_splitting"; + public static final String SOURCE_PRODUCES_SORTED_KEYS = "produces_sorted_keys"; + public static final String SOURCE_IS_INFINITE = "is_infinite"; + public static final String SOURCE_ESTIMATED_SIZE_BYTES = "estimated_size_bytes"; + public static final String ELEMENT = "element"; + public static final String ELEMENTS = "elements"; + public static final String ENCODING = "encoding"; + public static final String ENCODING_ID = "encoding_id"; + public static final String END_INDEX = "end_index"; + public static final String END_OFFSET = "end_offset"; + public static final String END_SHUFFLE_POSITION = "end_shuffle_position"; + public static final String ENVIRONMENT_VERSION_JOB_TYPE_KEY = "job_type"; + public static final String ENVIRONMENT_VERSION_MAJOR_KEY = "major"; + public static final String FILENAME = "filename"; + public static final String FILENAME_PREFIX = "filename_prefix"; + public static final String FILENAME_SUFFIX = "filename_suffix"; + public static final String FILEPATTERN = "filepattern"; + public static final String FOOTER = "footer"; + public static final String FORMAT = "format"; + public static final String HEADER = "header"; + public static final String INPUTS = "inputs"; + public static final String INPUT_CODER = "input_coder"; + public static final String IS_GENERATED = "is_generated"; + public static final String IS_MERGING_WINDOW_FN = "is_merging_window_fn"; + public static final String IS_PAIR_LIKE = "is_pair_like"; + public static final String IS_STREAM_LIKE = "is_stream_like"; + public static final String IS_WRAPPER = "is_wrapper"; + public static final String DISALLOW_COMBINER_LIFTING = "disallow_combiner_lifting"; + public static final String NON_PARALLEL_INPUTS = "non_parallel_inputs"; + public static final String NUM_SHARD_CODERS = "num_shard_coders"; + public static final String NUM_METADATA_SHARD_CODERS = "num_metadata_shard_coders"; + public static final String NUM_SHARDS = "num_shards"; + public static final String OBJECT_TYPE_NAME = "@type"; + public static final String OUTPUT = "output"; + public static final String OUTPUT_INFO = "output_info"; + public static final String OUTPUT_NAME = "output_name"; + public static final String PARALLEL_INPUT = "parallel_input"; + public static final String PHASE = "phase"; + public static final String PUBSUB_ID_ATTRIBUTE = "pubsub_id_label"; + public static final String PUBSUB_SERIALIZED_ATTRIBUTES_FN = "pubsub_serialized_attributes_fn"; + public static final String PUBSUB_SUBSCRIPTION = "pubsub_subscription"; + public static final String PUBSUB_SUBSCRIPTION_OVERRIDE = "pubsub_subscription_runtime_override"; + public static final String PUBSUB_TIMESTAMP_ATTRIBUTE = "pubsub_timestamp_label"; + public static final String PUBSUB_TOPIC = "pubsub_topic"; + public static final String PUBSUB_TOPIC_OVERRIDE = "pubsub_topic_runtime_override"; + public static final String SCALAR_FIELD_NAME = "value"; + public static final String SERIALIZED_FN = "serialized_fn"; + public static final String SHARD_NAME_TEMPLATE = "shard_template"; + public static final String SHUFFLE_KIND = "shuffle_kind"; + public static final String SHUFFLE_READER_CONFIG = "shuffle_reader_config"; + public static final String SHUFFLE_WRITER_CONFIG = "shuffle_writer_config"; + public static final String SORT_VALUES = "sort_values"; + public static final String START_INDEX = "start_index"; + public static final String START_OFFSET = "start_offset"; + public static final String START_SHUFFLE_POSITION = "start_shuffle_position"; + public static final String STRIP_TRAILING_NEWLINES = "strip_trailing_newlines"; + public static final String TUPLE_TAGS = "tuple_tags"; + public static final String USE_INDEXED_FORMAT = "use_indexed_format"; + public static final String USER_FN = "user_fn"; + public static final String USER_NAME = "user_name"; + public static final String USES_KEYED_STATE = "uses_keyed_state"; + public static final String VALIDATE_SINK = "validate_sink"; + public static final String VALIDATE_SOURCE = "validate_source"; + public static final String VALUE = "value"; + public static final String DISPLAY_DATA = "display_data"; +} http://git-wip-us.apache.org/repos/asf/beam/blob/a5627b1a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/SerializableCoderCloudObjectTranslator.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/SerializableCoderCloudObjectTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/SerializableCoderCloudObjectTranslator.java index 67c021c..dcc311e 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/SerializableCoderCloudObjectTranslator.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/SerializableCoderCloudObjectTranslator.java @@ -22,8 +22,6 @@ import static com.google.common.base.Preconditions.checkArgument; import java.io.Serializable; import org.apache.beam.sdk.coders.SerializableCoder; -import org.apache.beam.sdk.util.CloudObject; -import org.apache.beam.sdk.util.Structs; /** A {@link CloudObjectTranslator} for {@link SerializableCoder}. */ class SerializableCoderCloudObjectTranslator implements CloudObjectTranslator<SerializableCoder> { http://git-wip-us.apache.org/repos/asf/beam/blob/a5627b1a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/Serializer.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/Serializer.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/Serializer.java deleted file mode 100644 index e2bcafe..0000000 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/Serializer.java +++ /dev/null @@ -1,262 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.dataflow.util; - -import com.fasterxml.jackson.annotation.JsonTypeInfo; -import com.fasterxml.jackson.annotation.JsonTypeInfo.As; -import com.fasterxml.jackson.annotation.JsonTypeInfo.Id; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.DatabindContext; -import com.fasterxml.jackson.databind.DeserializationFeature; -import com.fasterxml.jackson.databind.JavaType; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.annotation.JsonTypeIdResolver; -import com.fasterxml.jackson.databind.jsontype.impl.TypeIdResolverBase; -import com.fasterxml.jackson.databind.module.SimpleModule; -import com.fasterxml.jackson.databind.type.TypeFactory; -import com.google.common.collect.ImmutableMap; -import java.lang.reflect.TypeVariable; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import javax.annotation.Nullable; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.IterableCoder; -import org.apache.beam.sdk.coders.KvCoder; -import org.apache.beam.sdk.coders.LengthPrefixCoder; -import org.apache.beam.sdk.transforms.windowing.GlobalWindow; -import org.apache.beam.sdk.transforms.windowing.IntervalWindow; -import org.apache.beam.sdk.util.PropertyNames; -import org.apache.beam.sdk.util.WindowedValue; - -/** - * Utility for converting objects between Java and Cloud representations. - * - * @deprecated Will no longer be used once all coders are converted via {@link CloudObjects}. - */ -@Deprecated -public final class Serializer { - /** A mapping from well known coder types to their implementing classes. */ - private static final Map<String, Class<?>> WELL_KNOWN_CODER_TYPES = - ImmutableMap.<String, Class<?>>builder() - .put("kind:pair", KvCoder.class) - .put("kind:stream", IterableCoder.class) - .put("kind:global_window", GlobalWindow.Coder.class) - .put("kind:interval_window", IntervalWindow.IntervalWindowCoder.class) - .put("kind:length_prefix", LengthPrefixCoder.class) - .put("kind:windowed_value", WindowedValue.FullWindowedValueCoder.class) - .build(); - - // Delay initialization of statics until the first call to Serializer. - private static class SingletonHelper { - static final ObjectMapper OBJECT_MAPPER = createObjectMapper(); - static final ObjectMapper TREE_MAPPER = createTreeMapper(); - - /** - * Creates the object mapper that will be used for serializing Google API - * client maps into Jackson trees. - */ - private static ObjectMapper createTreeMapper() { - return new ObjectMapper(); - } - - /** - * Creates the object mapper that will be used for deserializing Jackson - * trees into objects. - */ - private static ObjectMapper createObjectMapper() { - ObjectMapper m = new ObjectMapper(); - // Ignore properties that are not used by the object. - m.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES); - - // For parameters of type Object, use the @type property to determine the - // class to instantiate. - // - // TODO: It would be ideal to do this for all non-final classes. The - // problem with using DefaultTyping.NON_FINAL is that it insists on having - // type information in the JSON for classes with useful default - // implementations, such as List. Ideally, we'd combine these defaults - // with available type information if that information's present. - m.enableDefaultTypingAsProperty( - ObjectMapper.DefaultTyping.JAVA_LANG_OBJECT, - PropertyNames.OBJECT_TYPE_NAME); - - m.registerModule(new Jackson2Module()); - - return m; - } - } - - /** - * Deserializes an object from a Dataflow structured encoding (represented in - * Java as a map). - * - * <p>The standard Dataflow SDK object serialization protocol is based on JSON. - * Data is typically encoded as a JSON object whose fields represent the - * object's data. - * - * <p>The actual deserialization is performed by Jackson, which can deserialize - * public fields, use JavaBean setters, or use injection annotations to - * indicate how to construct the object. The {@link ObjectMapper} used is - * configured to use the "@type" field as the name of the class to instantiate - * (supporting polymorphic types), and may be further configured by - * annotations or via {@link ObjectMapper#registerModule}. - * - * @see <a href="http://wiki.fasterxml.com/JacksonFAQ#Data_Binding.2C_general"> - * Jackson Data-Binding</a> - * @see <a href="https://github.com/FasterXML/jackson-annotations/wiki/Jackson-Annotations"> - * Jackson-Annotations</a> - * @param serialized the object in untyped decoded form (i.e. a nested {@link Map}) - * @param clazz the expected object class - */ - public static <T> T deserialize(Map<String, Object> serialized, Class<T> clazz) { - try { - return SingletonHelper.OBJECT_MAPPER.treeToValue( - SingletonHelper.TREE_MAPPER.valueToTree( - deserializeCloudKnownTypes(serialized)), - clazz); - } catch (JsonProcessingException e) { - throw new RuntimeException( - "Unable to deserialize class " + clazz, e); - } - } - - /** - * Recursively walks the supplied map, looking for well-known cloud type information (keyed as - * {@link PropertyNames#OBJECT_TYPE_NAME}, matching a URI value from the {@link CloudKnownType} - * enum. Upon finding this type information, it converts it into the correspondingly typed Java - * value. - */ - @SuppressWarnings("unchecked") - private static Object deserializeCloudKnownTypes(Object src) { - if (src instanceof Map) { - Map<String, Object> srcMap = (Map<String, Object>) src; - @Nullable Object value = srcMap.get(PropertyNames.SCALAR_FIELD_NAME); - @Nullable CloudKnownType type = - CloudKnownType.forUri((String) srcMap.get(PropertyNames.OBJECT_TYPE_NAME)); - if (type != null && value != null) { - // It's a value of a well-known cloud type; let the known type handler - // handle the translation. - Object result = type.parse(value, type.defaultClass()); - return result; - } - // Otherwise, it's just an ordinary map. - Map<String, Object> dest = new HashMap<>(srcMap.size()); - for (Map.Entry<String, Object> entry : srcMap.entrySet()) { - dest.put(entry.getKey(), deserializeCloudKnownTypes(entry.getValue())); - } - return dest; - } - if (src instanceof List) { - List<Object> srcList = (List<Object>) src; - List<Object> dest = new ArrayList<>(srcList.size()); - for (Object obj : srcList) { - dest.add(deserializeCloudKnownTypes(obj)); - } - return dest; - } - // Neither a Map nor a List; no translation needed. - return src; - } - - /** - * A {@link com.fasterxml.jackson.databind.Module} that adds the type - * resolver needed for Coder definitions. - */ - static final class Jackson2Module extends SimpleModule { - /** - * The Coder custom type resolver. - * - * <p>This resolver resolves coders. If the Coder ID is a particular - * well-known identifier, it's replaced with the corresponding class. - * All other Coder instances are resolved by class name, using the package - * org.apache.beam.sdk.coders if there are no "."s in the ID. - */ - private static final class Resolver extends TypeIdResolverBase { - @SuppressWarnings("unused") // Used via @JsonTypeIdResolver annotation on Mixin - public Resolver() { - super(TypeFactory.defaultInstance().constructType(Coder.class), - TypeFactory.defaultInstance()); - } - - @Override - public JavaType typeFromId(DatabindContext context, String id) { - Class<?> clazz = getClassForId(id); - @SuppressWarnings("rawtypes") - TypeVariable[] tvs = clazz.getTypeParameters(); - JavaType[] types = new JavaType[tvs.length]; - for (int lupe = 0; lupe < tvs.length; lupe++) { - types[lupe] = TypeFactory.unknownType(); - } - return _typeFactory.constructSimpleType(clazz, types); - } - - private Class<?> getClassForId(String id) { - try { - if (id.contains(".")) { - return Class.forName(id); - } - - if (WELL_KNOWN_CODER_TYPES.containsKey(id)) { - return WELL_KNOWN_CODER_TYPES.get(id); - } - - // Otherwise, see if the ID is the name of a class in - // org.apache.beam.sdk.coders. We do this via creating - // the class object so that class loaders have a chance to get - // involved -- and since we need the class object anyway. - return Class.forName(Coder.class.getPackage().getName() + "." + id); - } catch (ClassNotFoundException e) { - throw new RuntimeException("Unable to convert coder ID " + id + " to class", e); - } - } - - @Override - public String idFromValueAndType(Object o, Class<?> clazz) { - return clazz.getName(); - } - - @Override - public String idFromValue(Object o) { - return o.getClass().getName(); - } - - @Override - public JsonTypeInfo.Id getMechanism() { - return JsonTypeInfo.Id.CUSTOM; - } - } - - /** - * The mixin class defining how Coders are handled by the deserialization - * {@link ObjectMapper}. - * - * <p>This is done via a mixin so that this resolver is <i>only</i> used - * during deserialization requested by the Apache Beam SDK. - */ - @JsonTypeIdResolver(Resolver.class) - @JsonTypeInfo(use = Id.CUSTOM, include = As.PROPERTY, property = PropertyNames.OBJECT_TYPE_NAME) - private static final class Mixin {} - - public Jackson2Module() { - super("BeamCoders"); - setMixInAnnotation(Coder.class, Mixin.class); - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/a5627b1a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/Structs.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/Structs.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/Structs.java new file mode 100644 index 0000000..1929f3b --- /dev/null +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/Structs.java @@ -0,0 +1,372 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow.util; + +import com.google.api.client.util.Data; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import javax.annotation.Nullable; +import org.apache.beam.sdk.util.StringUtils; + +/** + * A collection of static methods for manipulating datastructure representations transferred via the + * Dataflow API. + */ +public final class Structs { + private Structs() {} // Non-instantiable + + public static String getString(Map<String, Object> map, String name) { + return getValue(map, name, String.class, "a string"); + } + + public static String getString( + Map<String, Object> map, String name, @Nullable String defaultValue) { + return getValue(map, name, String.class, "a string", defaultValue); + } + + public static byte[] getBytes(Map<String, Object> map, String name) { + @Nullable byte[] result = getBytes(map, name, null); + if (result == null) { + throw new ParameterNotFoundException(name, map); + } + return result; + } + + @Nullable + public static byte[] getBytes( + Map<String, Object> map, String name, @Nullable byte[] defaultValue) { + @Nullable String jsonString = getString(map, name, null); + if (jsonString == null) { + return defaultValue; + } + // TODO: Need to agree on a format for encoding bytes in + // a string that can be sent to the backend, over the cloud + // map task work API. base64 encoding seems pretty common. Switch to it? + return StringUtils.jsonStringToByteArray(jsonString); + } + + public static Boolean getBoolean(Map<String, Object> map, String name) { + return getValue(map, name, Boolean.class, "a boolean"); + } + + @Nullable + public static Boolean getBoolean( + Map<String, Object> map, String name, @Nullable Boolean defaultValue) { + return getValue(map, name, Boolean.class, "a boolean", defaultValue); + } + + public static Long getLong(Map<String, Object> map, String name) { + return getValue(map, name, Long.class, "a long"); + } + + @Nullable + public static Long getLong(Map<String, Object> map, String name, @Nullable Long defaultValue) { + return getValue(map, name, Long.class, "a long", defaultValue); + } + + public static Integer getInt(Map<String, Object> map, String name) { + return getValue(map, name, Integer.class, "an int"); + } + + @Nullable + public static Integer getInt( + Map<String, Object> map, String name, @Nullable Integer defaultValue) { + return getValue(map, name, Integer.class, "an int", defaultValue); + } + + @Nullable + public static List<String> getStrings( + Map<String, Object> map, String name, @Nullable List<String> defaultValue) { + @Nullable Object value = map.get(name); + if (value == null) { + if (map.containsKey(name)) { + throw new IncorrectTypeException(name, map, "a string or a list"); + } + return defaultValue; + } + if (Data.isNull(value)) { + // This is a JSON literal null. When represented as a list of strings, + // this is an empty list. + return Collections.<String>emptyList(); + } + @Nullable String singletonString = decodeValue(value, String.class); + if (singletonString != null) { + return Collections.singletonList(singletonString); + } + if (!(value instanceof List)) { + throw new IncorrectTypeException(name, map, "a string or a list"); + } + @SuppressWarnings("unchecked") + List<Object> elements = (List<Object>) value; + List<String> result = new ArrayList<>(elements.size()); + for (Object o : elements) { + @Nullable String s = decodeValue(o, String.class); + if (s == null) { + throw new IncorrectTypeException(name, map, "a list of strings"); + } + result.add(s); + } + return result; + } + + public static Map<String, Object> getObject(Map<String, Object> map, String name) { + @Nullable Map<String, Object> result = getObject(map, name, null); + if (result == null) { + throw new ParameterNotFoundException(name, map); + } + return result; + } + + @Nullable + public static Map<String, Object> getObject( + Map<String, Object> map, String name, @Nullable Map<String, Object> defaultValue) { + @Nullable Object value = map.get(name); + if (value == null) { + if (map.containsKey(name)) { + throw new IncorrectTypeException(name, map, "an object"); + } + return defaultValue; + } + return checkObject(value, map, name); + } + + private static Map<String, Object> checkObject( + Object value, Map<String, Object> map, String name) { + if (Data.isNull(value)) { + // This is a JSON literal null. When represented as an object, this is an + // empty map. + return Collections.<String, Object>emptyMap(); + } + if (!(value instanceof Map)) { + throw new IncorrectTypeException(name, map, "an object (not a map)"); + } + @SuppressWarnings("unchecked") + Map<String, Object> mapValue = (Map<String, Object>) value; + if (!mapValue.containsKey(PropertyNames.OBJECT_TYPE_NAME)) { + throw new IncorrectTypeException( + name, map, "an object (no \"" + PropertyNames.OBJECT_TYPE_NAME + "\" field)"); + } + return mapValue; + } + + @Nullable + public static List<Map<String, Object>> getListOfMaps( + Map<String, Object> map, String name, @Nullable List<Map<String, Object>> defaultValue) { + @Nullable Object value = map.get(name); + if (value == null) { + if (map.containsKey(name)) { + throw new IncorrectTypeException(name, map, "a list"); + } + return defaultValue; + } + if (Data.isNull(value)) { + // This is a JSON literal null. When represented as a list, + // this is an empty list. + return Collections.<Map<String, Object>>emptyList(); + } + + if (!(value instanceof List)) { + throw new IncorrectTypeException(name, map, "a list"); + } + + List<?> elements = (List<?>) value; + for (Object elem : elements) { + if (!(elem instanceof Map)) { + throw new IncorrectTypeException(name, map, "a list of Map objects"); + } + } + + @SuppressWarnings("unchecked") + List<Map<String, Object>> result = (List<Map<String, Object>>) elements; + return result; + } + + public static Map<String, Object> getDictionary(Map<String, Object> map, String name) { + @Nullable Object value = map.get(name); + if (value == null) { + throw new ParameterNotFoundException(name, map); + } + if (Data.isNull(value)) { + // This is a JSON literal null. When represented as a dictionary, this is + // an empty map. + return Collections.<String, Object>emptyMap(); + } + if (!(value instanceof Map)) { + throw new IncorrectTypeException(name, map, "a dictionary"); + } + @SuppressWarnings("unchecked") + Map<String, Object> result = (Map<String, Object>) value; + return result; + } + + @Nullable + public static Map<String, Object> getDictionary( + Map<String, Object> map, String name, @Nullable Map<String, Object> defaultValue) { + @Nullable Object value = map.get(name); + if (value == null) { + if (map.containsKey(name)) { + throw new IncorrectTypeException(name, map, "a dictionary"); + } + return defaultValue; + } + if (Data.isNull(value)) { + // This is a JSON literal null. When represented as a dictionary, this is + // an empty map. + return Collections.<String, Object>emptyMap(); + } + if (!(value instanceof Map)) { + throw new IncorrectTypeException(name, map, "a dictionary"); + } + @SuppressWarnings("unchecked") + Map<String, Object> result = (Map<String, Object>) value; + return result; + } + + // Builder operations. + + public static void addString(Map<String, Object> map, String name, String value) { + addObject(map, name, CloudObject.forString(value)); + } + + public static void addBoolean(Map<String, Object> map, String name, boolean value) { + addObject(map, name, CloudObject.forBoolean(value)); + } + + public static void addLong(Map<String, Object> map, String name, long value) { + addObject(map, name, CloudObject.forInteger(value)); + } + + public static void addObject(Map<String, Object> map, String name, Map<String, Object> value) { + map.put(name, value); + } + + public static void addNull(Map<String, Object> map, String name) { + map.put(name, Data.nullOf(Object.class)); + } + + public static void addLongs(Map<String, Object> map, String name, long... longs) { + List<Map<String, Object>> elements = new ArrayList<>(longs.length); + for (Long value : longs) { + elements.add(CloudObject.forInteger(value)); + } + map.put(name, elements); + } + + public static void addList( + Map<String, Object> map, String name, List<? extends Map<String, Object>> elements) { + map.put(name, elements); + } + + public static void addStringList(Map<String, Object> map, String name, List<String> elements) { + ArrayList<CloudObject> objects = new ArrayList<>(elements.size()); + for (String element : elements) { + objects.add(CloudObject.forString(element)); + } + addList(map, name, objects); + } + + public static <T extends Map<String, Object>> void addList( + Map<String, Object> map, String name, T[] elements) { + map.put(name, Arrays.asList(elements)); + } + + public static void addDictionary( + Map<String, Object> map, String name, Map<String, Object> value) { + map.put(name, value); + } + + public static void addDouble(Map<String, Object> map, String name, Double value) { + addObject(map, name, CloudObject.forFloat(value)); + } + + // Helper methods for a few of the accessor methods. + + private static <T> T getValue(Map<String, Object> map, String name, Class<T> clazz, String type) { + @Nullable T result = getValue(map, name, clazz, type, null); + if (result == null) { + throw new ParameterNotFoundException(name, map); + } + return result; + } + + @Nullable + private static <T> T getValue( + Map<String, Object> map, String name, Class<T> clazz, String type, @Nullable T defaultValue) { + @Nullable Object value = map.get(name); + if (value == null) { + if (map.containsKey(name)) { + throw new IncorrectTypeException(name, map, type); + } + return defaultValue; + } + T result = decodeValue(value, clazz); + if (result == null) { + // The value exists, but can't be decoded. + throw new IncorrectTypeException(name, map, type); + } + return result; + } + + @Nullable + private static <T> T decodeValue(Object value, Class<T> clazz) { + try { + if (value.getClass() == clazz) { + // decodeValue() is only called for final classes; if the class matches, + // it's safe to just return the value, and if it doesn't match, decoding + // is needed. + return clazz.cast(value); + } + if (!(value instanceof Map)) { + return null; + } + @SuppressWarnings("unchecked") + Map<String, Object> map = (Map<String, Object>) value; + @Nullable String typeName = (String) map.get(PropertyNames.OBJECT_TYPE_NAME); + if (typeName == null) { + return null; + } + @Nullable CloudKnownType knownType = CloudKnownType.forUri(typeName); + if (knownType == null) { + return null; + } + @Nullable Object scalar = map.get(PropertyNames.SCALAR_FIELD_NAME); + if (scalar == null) { + return null; + } + return knownType.parse(scalar, clazz); + } catch (ClassCastException e) { + // If any class cast fails during decoding, the value's not decodable. + return null; + } + } + + private static final class ParameterNotFoundException extends RuntimeException { + public ParameterNotFoundException(String name, Map<String, Object> map) { + super("didn't find required parameter " + name + " in " + map); + } + } + + private static final class IncorrectTypeException extends RuntimeException { + public IncorrectTypeException(String name, Map<String, Object> map, String type) { + super("required parameter " + name + " in " + map + " not " + type); + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/a5627b1a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java index 31c47b4..41f3c92 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java @@ -17,9 +17,9 @@ */ package org.apache.beam.runners.dataflow; -import static org.apache.beam.sdk.util.Structs.addObject; -import static org.apache.beam.sdk.util.Structs.getDictionary; -import static org.apache.beam.sdk.util.Structs.getString; +import static org.apache.beam.runners.dataflow.util.Structs.addObject; +import static org.apache.beam.runners.dataflow.util.Structs.getDictionary; +import static org.apache.beam.runners.dataflow.util.Structs.getString; import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; @@ -60,6 +60,8 @@ import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.JobSpecificat import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions; import org.apache.beam.runners.dataflow.util.OutputReference; +import org.apache.beam.runners.dataflow.util.PropertyNames; +import org.apache.beam.runners.dataflow.util.Structs; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.StringUtf8Coder; @@ -80,8 +82,6 @@ import org.apache.beam.sdk.transforms.View; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.util.GcsPathValidator; import org.apache.beam.sdk.util.GcsUtil; -import org.apache.beam.sdk.util.PropertyNames; -import org.apache.beam.sdk.util.Structs; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.util.gcsfs.GcsPath; import org.apache.beam.sdk.util.state.StateSpec; http://git-wip-us.apache.org/repos/asf/beam/blob/a5627b1a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java index 2e66d43..64c0dbd 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java @@ -54,7 +54,6 @@ import org.apache.beam.sdk.transforms.join.CoGbkResultSchema; import org.apache.beam.sdk.transforms.join.UnionCoder; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.IntervalWindow; -import org.apache.beam.sdk.util.CloudObject; import org.apache.beam.sdk.util.InstanceBuilder; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.TupleTag; http://git-wip-us.apache.org/repos/asf/beam/blob/a5627b1a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/StructsTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/StructsTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/StructsTest.java new file mode 100644 index 0000000..0d2bc9f --- /dev/null +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/StructsTest.java @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow.util; + +import static org.apache.beam.runners.dataflow.util.Structs.addBoolean; +import static org.apache.beam.runners.dataflow.util.Structs.addDouble; +import static org.apache.beam.runners.dataflow.util.Structs.addList; +import static org.apache.beam.runners.dataflow.util.Structs.addLong; +import static org.apache.beam.runners.dataflow.util.Structs.addLongs; +import static org.apache.beam.runners.dataflow.util.Structs.addNull; +import static org.apache.beam.runners.dataflow.util.Structs.addString; +import static org.apache.beam.runners.dataflow.util.Structs.addStringList; +import static org.apache.beam.runners.dataflow.util.Structs.getBoolean; +import static org.apache.beam.runners.dataflow.util.Structs.getDictionary; +import static org.apache.beam.runners.dataflow.util.Structs.getInt; +import static org.apache.beam.runners.dataflow.util.Structs.getListOfMaps; +import static org.apache.beam.runners.dataflow.util.Structs.getLong; +import static org.apache.beam.runners.dataflow.util.Structs.getObject; +import static org.apache.beam.runners.dataflow.util.Structs.getString; +import static org.apache.beam.runners.dataflow.util.Structs.getStrings; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.hamcrest.Matchers; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Tests for Structs. + */ +@RunWith(JUnit4.class) +public class StructsTest { + private List<Map<String, Object>> makeCloudObjects() { + List<Map<String, Object>> objects = new ArrayList<>(); + { + CloudObject o = CloudObject.forClassName("string"); + addString(o, "singletonStringKey", "stringValue"); + objects.add(o); + } + { + CloudObject o = CloudObject.forClassName("long"); + addLong(o, "singletonLongKey", 42L); + objects.add(o); + } + return objects; + } + + private Map<String, Object> makeCloudDictionary() { + Map<String, Object> o = new HashMap<>(); + addList(o, "emptyKey", Collections.<Map<String, Object>>emptyList()); + addNull(o, "noStringsKey"); + addString(o, "singletonStringKey", "stringValue"); + addStringList(o, "multipleStringsKey", Arrays.asList("hi", "there", "bob")); + addLongs(o, "multipleLongsKey", 47L, 1L << 42, -5L); + addLong(o, "singletonLongKey", 42L); + addDouble(o, "singletonDoubleKey", 3.14); + addBoolean(o, "singletonBooleanKey", true); + addNull(o, "noObjectsKey"); + addList(o, "multipleObjectsKey", makeCloudObjects()); + return o; + } + + @Test + public void testGetStringParameter() throws Exception { + Map<String, Object> o = makeCloudDictionary(); + + Assert.assertEquals( + "stringValue", + getString(o, "singletonStringKey")); + Assert.assertEquals( + "stringValue", + getString(o, "singletonStringKey", "defaultValue")); + Assert.assertEquals( + "defaultValue", + getString(o, "missingKey", "defaultValue")); + + try { + getString(o, "missingKey"); + Assert.fail("should have thrown an exception"); + } catch (Exception exn) { + Assert.assertThat(exn.toString(), + Matchers.containsString( + "didn't find required parameter missingKey")); + } + + try { + getString(o, "noStringsKey"); + Assert.fail("should have thrown an exception"); + } catch (Exception exn) { + Assert.assertThat(exn.toString(), + Matchers.containsString("not a string")); + } + + Assert.assertThat(getStrings(o, "noStringsKey", null), Matchers.<String>emptyIterable()); + Assert.assertThat(getObject(o, "noStringsKey").keySet(), Matchers.<String>emptyIterable()); + Assert.assertThat(getDictionary(o, "noStringsKey").keySet(), Matchers.<String>emptyIterable()); + Assert.assertThat(getDictionary(o, "noStringsKey", null).keySet(), + Matchers.<String>emptyIterable()); + + try { + getString(o, "multipleStringsKey"); + Assert.fail("should have thrown an exception"); + } catch (Exception exn) { + Assert.assertThat(exn.toString(), + Matchers.containsString("not a string")); + } + + try { + getString(o, "emptyKey"); + Assert.fail("should have thrown an exception"); + } catch (Exception exn) { + Assert.assertThat(exn.toString(), + Matchers.containsString("not a string")); + } + } + + @Test + public void testGetBooleanParameter() throws Exception { + Map<String, Object> o = makeCloudDictionary(); + + Assert.assertEquals( + true, + getBoolean(o, "singletonBooleanKey", false)); + Assert.assertEquals( + false, + getBoolean(o, "missingKey", false)); + + try { + getBoolean(o, "emptyKey", false); + Assert.fail("should have thrown an exception"); + } catch (Exception exn) { + Assert.assertThat(exn.toString(), + Matchers.containsString("not a boolean")); + } + } + + @Test + public void testGetLongParameter() throws Exception { + Map<String, Object> o = makeCloudDictionary(); + + Assert.assertEquals( + (Long) 42L, + getLong(o, "singletonLongKey", 666L)); + Assert.assertEquals( + (Integer) 42, + getInt(o, "singletonLongKey", 666)); + Assert.assertEquals( + (Long) 666L, + getLong(o, "missingKey", 666L)); + + try { + getLong(o, "emptyKey", 666L); + Assert.fail("should have thrown an exception"); + } catch (Exception exn) { + Assert.assertThat(exn.toString(), + Matchers.containsString("not a long")); + } + try { + getInt(o, "emptyKey", 666); + Assert.fail("should have thrown an exception"); + } catch (Exception exn) { + Assert.assertThat(exn.toString(), + Matchers.containsString("not an int")); + } + } + + @Test + public void testGetListOfMaps() throws Exception { + Map<String, Object> o = makeCloudDictionary(); + + Assert.assertEquals( + makeCloudObjects(), + getListOfMaps(o, "multipleObjectsKey", null)); + + try { + getListOfMaps(o, "singletonLongKey", null); + Assert.fail("should have thrown an exception"); + } catch (Exception exn) { + Assert.assertThat(exn.toString(), + Matchers.containsString("not a list")); + } + } + + // TODO: Test builder operations. +} http://git-wip-us.apache.org/repos/asf/beam/blob/a5627b1a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CollectionCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CollectionCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CollectionCoder.java index 3585f3e..523b69b 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CollectionCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CollectionCoder.java @@ -17,13 +17,8 @@ */ package org.apache.beam.sdk.coders; -import static com.google.common.base.Preconditions.checkArgument; - -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; import java.util.Collection; import java.util.List; -import org.apache.beam.sdk.util.PropertyNames; import org.apache.beam.sdk.values.TypeDescriptor; import org.apache.beam.sdk.values.TypeParameter; @@ -51,14 +46,6 @@ public class CollectionCoder<T> extends IterableLikeCoder<T, Collection<T>> { return decodedElements; } - @JsonCreator - public static CollectionCoder<?> of( - @JsonProperty(PropertyNames.COMPONENT_ENCODINGS) - List<Coder<?>> components) { - checkArgument(components.size() == 1, "Expecting 1 component, got %s", components.size()); - return of(components.get(0)); - } - /** * Returns the first element in this collection if it is non-empty, * otherwise returns {@code null}. http://git-wip-us.apache.org/repos/asf/beam/blob/a5627b1a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableCoder.java index 2949ddb..02c3d0f 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableCoder.java @@ -17,12 +17,7 @@ */ package org.apache.beam.sdk.coders; -import static com.google.common.base.Preconditions.checkArgument; - -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; import java.util.List; -import org.apache.beam.sdk.util.PropertyNames; import org.apache.beam.sdk.values.TypeDescriptor; import org.apache.beam.sdk.values.TypeParameter; @@ -46,14 +41,6 @@ public class IterableCoder<T> extends IterableLikeCoder<T, Iterable<T>> { return decodedElements; } - @JsonCreator - public static IterableCoder<?> of( - @JsonProperty(PropertyNames.COMPONENT_ENCODINGS) - List<Coder<?>> components) { - checkArgument(components.size() == 1, "Expecting 1 component, got %s", components.size()); - return of(components.get(0)); - } - /** * Returns the first element in this iterable if it is non-empty, * otherwise returns {@code null}. http://git-wip-us.apache.org/repos/asf/beam/blob/a5627b1a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java index b10db3a..8a689f7 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java @@ -17,16 +17,11 @@ */ package org.apache.beam.sdk.coders; -import static com.google.common.base.Preconditions.checkArgument; - -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.util.Arrays; import java.util.List; -import org.apache.beam.sdk.util.PropertyNames; import org.apache.beam.sdk.util.common.ElementByteSizeObserver; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.TypeDescriptor; @@ -44,14 +39,6 @@ public class KvCoder<K, V> extends StructuredCoder<KV<K, V>> { return new KvCoder<>(keyCoder, valueCoder); } - @JsonCreator - public static KvCoder<?, ?> of( - @JsonProperty(PropertyNames.COMPONENT_ENCODINGS) - List<Coder<?>> components) { - checkArgument(components.size() == 2, "Expecting 2 components, got %s", components.size()); - return of(components.get(0), components.get(1)); - } - public static <K, V> List<Object> getInstanceComponents( KV<K, V> exampleValue) { return Arrays.asList( http://git-wip-us.apache.org/repos/asf/beam/blob/a5627b1a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/LengthPrefixCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/LengthPrefixCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/LengthPrefixCoder.java index be26531..685e766 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/LengthPrefixCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/LengthPrefixCoder.java @@ -17,11 +17,8 @@ */ package org.apache.beam.sdk.coders; -import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.collect.ImmutableList; import com.google.common.io.ByteStreams; import java.io.ByteArrayOutputStream; @@ -30,7 +27,6 @@ import java.io.InputStream; import java.io.OutputStream; import java.util.List; import javax.annotation.Nullable; -import org.apache.beam.sdk.util.PropertyNames; import org.apache.beam.sdk.util.VarInt; /** @@ -48,15 +44,6 @@ public class LengthPrefixCoder<T> extends StructuredCoder<T> { return new LengthPrefixCoder<>(valueCoder); } - @JsonCreator - public static LengthPrefixCoder<?> of( - @JsonProperty(PropertyNames.COMPONENT_ENCODINGS) - List<Coder<?>> components) { - checkArgument(components.size() == 1, - "Expecting 1 components, got " + components.size()); - return of(components.get(0)); - } - ///////////////////////////////////////////////////////////////////////////// private final Coder<T> valueCoder; http://git-wip-us.apache.org/repos/asf/beam/blob/a5627b1a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ListCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ListCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ListCoder.java index 6f7a0be..32467d2 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ListCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ListCoder.java @@ -17,12 +17,7 @@ */ package org.apache.beam.sdk.coders; -import static com.google.common.base.Preconditions.checkArgument; - -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; import java.util.List; -import org.apache.beam.sdk.util.PropertyNames; import org.apache.beam.sdk.values.TypeDescriptor; import org.apache.beam.sdk.values.TypeParameter; @@ -45,14 +40,6 @@ public class ListCoder<T> extends IterableLikeCoder<T, List<T>> { return decodedElements; } - @JsonCreator - public static ListCoder<?> of( - @JsonProperty(PropertyNames.COMPONENT_ENCODINGS) - List<Coder<?>> components) { - checkArgument(components.size() == 1, "Expecting 1 component, got %s", components.size()); - return of((Coder<?>) components.get(0)); - } - /** * Returns the first element in this list if it is non-empty, * otherwise returns {@code null}. http://git-wip-us.apache.org/repos/asf/beam/blob/a5627b1a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SetCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SetCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SetCoder.java index 68ef3dc..da16165 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SetCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SetCoder.java @@ -17,14 +17,9 @@ */ package org.apache.beam.sdk.coders; -import static com.google.common.base.Preconditions.checkArgument; - -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; import java.util.HashSet; import java.util.List; import java.util.Set; -import org.apache.beam.sdk.util.PropertyNames; import org.apache.beam.sdk.values.TypeDescriptor; import org.apache.beam.sdk.values.TypeParameter; @@ -44,17 +39,6 @@ public class SetCoder<T> extends IterableLikeCoder<T, Set<T>> { } /** - * Dynamically typed constructor for JSON deserialization. - */ - @JsonCreator - public static SetCoder<?> of( - @JsonProperty(PropertyNames.COMPONENT_ENCODINGS) - List<Coder<?>> components) { - checkArgument(components.size() == 1, "Expecting 1 component, got %s", components.size()); - return of(components.get(0)); - } - - /** * {@inheritDoc} * * @throws NonDeterministicException always. Sets are not ordered, but http://git-wip-us.apache.org/repos/asf/beam/blob/a5627b1a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CloudKnownType.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CloudKnownType.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CloudKnownType.java deleted file mode 100644 index c9e7427..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CloudKnownType.java +++ /dev/null @@ -1,143 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.util; - -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; -import javax.annotation.Nullable; - -/** - * A utility for manipulating well-known cloud types. - * - * @deprecated replaced by {@code org.apache.beam.runners.dataflow.CloudKnownType} - */ -@Deprecated -enum CloudKnownType { - TEXT("http://schema.org/Text", String.class) { - @Override - public <T> T parse(Object value, Class<T> clazz) { - return clazz.cast(value); - } - }, - BOOLEAN("http://schema.org/Boolean", Boolean.class) { - @Override - public <T> T parse(Object value, Class<T> clazz) { - return clazz.cast(value); - } - }, - INTEGER("http://schema.org/Integer", Long.class, Integer.class) { - @Override - public <T> T parse(Object value, Class<T> clazz) { - Object result = null; - if (value.getClass() == clazz) { - result = value; - } else if (clazz == Long.class) { - if (value instanceof Integer) { - result = ((Integer) value).longValue(); - } else if (value instanceof String) { - result = Long.valueOf((String) value); - } - } else if (clazz == Integer.class) { - if (value instanceof Long) { - result = ((Long) value).intValue(); - } else if (value instanceof String) { - result = Integer.valueOf((String) value); - } - } - return clazz.cast(result); - } - }, - FLOAT("http://schema.org/Float", Double.class, Float.class) { - @Override - public <T> T parse(Object value, Class<T> clazz) { - Object result = null; - if (value.getClass() == clazz) { - result = value; - } else if (clazz == Double.class) { - if (value instanceof Float) { - result = ((Float) value).doubleValue(); - } else if (value instanceof String) { - result = Double.valueOf((String) value); - } - } else if (clazz == Float.class) { - if (value instanceof Double) { - result = ((Double) value).floatValue(); - } else if (value instanceof String) { - result = Float.valueOf((String) value); - } - } - return clazz.cast(result); - } - }; - - private final String uri; - private final Class<?>[] classes; - - CloudKnownType(String uri, Class<?>... classes) { - this.uri = uri; - this.classes = classes; - } - - public String getUri() { - return uri; - } - - public abstract <T> T parse(Object value, Class<T> clazz); - - public Class<?> defaultClass() { - return classes[0]; - } - - private static final Map<String, CloudKnownType> typesByUri = - Collections.unmodifiableMap(buildTypesByUri()); - - private static Map<String, CloudKnownType> buildTypesByUri() { - Map<String, CloudKnownType> result = new HashMap<>(); - for (CloudKnownType ty : CloudKnownType.values()) { - result.put(ty.getUri(), ty); - } - return result; - } - - @Nullable - public static CloudKnownType forUri(@Nullable String uri) { - if (uri == null) { - return null; - } - return typesByUri.get(uri); - } - - private static final Map<Class<?>, CloudKnownType> typesByClass = - Collections.unmodifiableMap(buildTypesByClass()); - - private static Map<Class<?>, CloudKnownType> buildTypesByClass() { - Map<Class<?>, CloudKnownType> result = new HashMap<>(); - for (CloudKnownType ty : CloudKnownType.values()) { - for (Class<?> clazz : ty.classes) { - result.put(clazz, ty); - } - } - return result; - } - - @Nullable - public static CloudKnownType forClass(Class<?> clazz) { - return typesByClass.get(clazz); - } -}