http://git-wip-us.apache.org/repos/asf/beam/blob/a5627b1a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CloudObject.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CloudObject.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CloudObject.java deleted file mode 100644 index 061e56a..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CloudObject.java +++ /dev/null @@ -1,187 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.util; - - -import static com.google.common.base.Preconditions.checkNotNull; - -import com.google.api.client.json.GenericJson; -import com.google.api.client.util.Key; -import java.util.Map; -import javax.annotation.Nullable; - -/** - * A representation of an arbitrary Java object to be instantiated by Dataflow - * workers. - * - * <p>Typically, an object to be written by the SDK to the Dataflow service will - * implement a method (typically called {@code asCloudObject()}) that returns a - * {@code CloudObject} to represent the object in the protocol. Once the - * {@code CloudObject} is constructed, the method should explicitly add - * additional properties to be presented during deserialization, representing - * child objects by building additional {@code CloudObject}s. - * - * @deprecated replaced by {@code org.apache.beam.runners.dataflow.CloudKnownType} - */ -@Deprecated -public final class CloudObject extends GenericJson { - /** - * Constructs a {@code CloudObject} by copying the supplied serialized object - * spec, which must represent an SDK object serialized for transport via the - * Dataflow API. - * - * <p>The most common use of this method is during deserialization on the worker, - * where it's used as a binding type during instance construction. - * - * @param spec supplies the serialized form of the object as a nested map - * @throws RuntimeException if the supplied map does not represent an SDK object - */ - public static CloudObject fromSpec(Map<String, Object> spec) { - CloudObject result = new CloudObject(); - result.putAll(spec); - if (result.className == null) { - throw new RuntimeException("Unable to create an SDK object from " + spec - + ": Object class not specified (missing \"" - + PropertyNames.OBJECT_TYPE_NAME + "\" field)"); - } - return result; - } - - /** - * Constructs a {@code CloudObject} to be used for serializing an instance of - * the supplied class for transport via the Dataflow API. The instance - * parameters to be serialized must be supplied explicitly after the - * {@code CloudObject} is created, by using {@link CloudObject#put}. - * - * @param cls the class to use when deserializing the object on the worker - */ - public static CloudObject forClass(Class<?> cls) { - CloudObject result = new CloudObject(); - result.className = checkNotNull(cls).getName(); - return result; - } - - /** - * Constructs a {@code CloudObject} to be used for serializing data to be - * deserialized using the supplied class name the supplied class name for - * transport via the Dataflow API. The instance parameters to be serialized - * must be supplied explicitly after the {@code CloudObject} is created, by - * using {@link CloudObject#put}. - * - * @param className the class to use when deserializing the object on the worker - */ - public static CloudObject forClassName(String className) { - CloudObject result = new CloudObject(); - result.className = checkNotNull(className); - return result; - } - - /** - * Constructs a {@code CloudObject} representing the given value. - * @param value the scalar value to represent. - */ - public static CloudObject forString(String value) { - CloudObject result = forClassName(CloudKnownType.TEXT.getUri()); - result.put(PropertyNames.SCALAR_FIELD_NAME, value); - return result; - } - - /** - * Constructs a {@code CloudObject} representing the given value. - * @param value the scalar value to represent. - */ - public static CloudObject forBoolean(Boolean value) { - CloudObject result = forClassName(CloudKnownType.BOOLEAN.getUri()); - result.put(PropertyNames.SCALAR_FIELD_NAME, value); - return result; - } - - /** - * Constructs a {@code CloudObject} representing the given value. - * @param value the scalar value to represent. - */ - public static CloudObject forInteger(Long value) { - CloudObject result = forClassName(CloudKnownType.INTEGER.getUri()); - result.put(PropertyNames.SCALAR_FIELD_NAME, value); - return result; - } - - /** - * Constructs a {@code CloudObject} representing the given value. - * @param value the scalar value to represent. - */ - public static CloudObject forInteger(Integer value) { - CloudObject result = forClassName(CloudKnownType.INTEGER.getUri()); - result.put(PropertyNames.SCALAR_FIELD_NAME, value); - return result; - } - - /** - * Constructs a {@code CloudObject} representing the given value. - * @param value the scalar value to represent. - */ - public static CloudObject forFloat(Float value) { - CloudObject result = forClassName(CloudKnownType.FLOAT.getUri()); - result.put(PropertyNames.SCALAR_FIELD_NAME, value); - return result; - } - - /** - * Constructs a {@code CloudObject} representing the given value. - * @param value the scalar value to represent. - */ - public static CloudObject forFloat(Double value) { - CloudObject result = forClassName(CloudKnownType.FLOAT.getUri()); - result.put(PropertyNames.SCALAR_FIELD_NAME, value); - return result; - } - - /** - * Constructs a {@code CloudObject} representing the given value of a - * well-known cloud object type. - * @param value the scalar value to represent. - * @throws RuntimeException if the value does not have a - * {@link CloudKnownType} mapping - */ - public static CloudObject forKnownType(Object value) { - @Nullable CloudKnownType ty = CloudKnownType.forClass(value.getClass()); - if (ty == null) { - throw new RuntimeException("Unable to represent value via the Dataflow API: " + value); - } - CloudObject result = forClassName(ty.getUri()); - result.put(PropertyNames.SCALAR_FIELD_NAME, value); - return result; - } - - @Key(PropertyNames.OBJECT_TYPE_NAME) - private String className; - - private CloudObject() {} - - /** - * Gets the name of the Java class that this CloudObject represents. - */ - public String getClassName() { - return className; - } - - @Override - public CloudObject clone() { - return (CloudObject) super.clone(); - } -}
http://git-wip-us.apache.org/repos/asf/beam/blob/a5627b1a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CoderUtils.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CoderUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CoderUtils.java index 2d21561..3380a10 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CoderUtils.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CoderUtils.java @@ -17,19 +17,8 @@ */ package org.apache.beam.sdk.util; -import com.fasterxml.jackson.annotation.JsonTypeInfo; -import com.fasterxml.jackson.annotation.JsonTypeInfo.As; -import com.fasterxml.jackson.annotation.JsonTypeInfo.Id; -import com.fasterxml.jackson.databind.DatabindContext; -import com.fasterxml.jackson.databind.JavaType; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.annotation.JsonTypeIdResolver; -import com.fasterxml.jackson.databind.jsontype.impl.TypeIdResolverBase; -import com.fasterxml.jackson.databind.module.SimpleModule; -import com.fasterxml.jackson.databind.type.TypeFactory; import com.google.api.client.util.Base64; import com.google.common.base.Throwables; -import com.google.common.collect.ImmutableMap; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; @@ -37,15 +26,8 @@ import java.io.InputStream; import java.io.OutputStream; import java.lang.ref.SoftReference; import java.lang.reflect.ParameterizedType; -import java.lang.reflect.TypeVariable; -import java.util.Map; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderException; -import org.apache.beam.sdk.coders.IterableCoder; -import org.apache.beam.sdk.coders.KvCoder; -import org.apache.beam.sdk.coders.LengthPrefixCoder; -import org.apache.beam.sdk.transforms.windowing.GlobalWindow; -import org.apache.beam.sdk.transforms.windowing.IntervalWindow; import org.apache.beam.sdk.values.TypeDescriptor; /** @@ -54,17 +36,6 @@ import org.apache.beam.sdk.values.TypeDescriptor; public final class CoderUtils { private CoderUtils() {} // Non-instantiable - /** A mapping from well known coder types to their implementing classes. */ - private static final Map<String, Class<?>> WELL_KNOWN_CODER_TYPES = - ImmutableMap.<String, Class<?>>builder() - .put("kind:pair", KvCoder.class) - .put("kind:stream", IterableCoder.class) - .put("kind:global_window", GlobalWindow.Coder.class) - .put("kind:interval_window", IntervalWindow.IntervalWindowCoder.class) - .put("kind:length_prefix", LengthPrefixCoder.class) - .put("kind:windowed_value", WindowedValue.FullWindowedValueCoder.class) - .build(); - private static ThreadLocal<SoftReference<ExposedByteArrayOutputStream>> threadLocalOutputStream = new ThreadLocal<>(); @@ -214,92 +185,4 @@ public final class CoderUtils { TypeDescriptor codedType = TypeDescriptor.of(coderType.getActualTypeArguments()[0]); return codedType; } - - /** - * A {@link com.fasterxml.jackson.databind.Module} that adds the type - * resolver needed for Coder definitions. - * - * <p>Used only in {@link Serializer}, which will move modules - */ - @Deprecated - static final class Jackson2Module extends SimpleModule { - /** - * The Coder custom type resolver. - * - * <p>This resolver resolves coders. If the Coder ID is a particular - * well-known identifier, it's replaced with the corresponding class. - * All other Coder instances are resolved by class name, using the package - * org.apache.beam.sdk.coders if there are no "."s in the ID. - */ - private static final class Resolver extends TypeIdResolverBase { - @SuppressWarnings("unused") // Used via @JsonTypeIdResolver annotation on Mixin - public Resolver() { - super(TypeFactory.defaultInstance().constructType(Coder.class), - TypeFactory.defaultInstance()); - } - - @Override - public JavaType typeFromId(DatabindContext context, String id) { - Class<?> clazz = getClassForId(id); - @SuppressWarnings("rawtypes") - TypeVariable[] tvs = clazz.getTypeParameters(); - JavaType[] types = new JavaType[tvs.length]; - for (int lupe = 0; lupe < tvs.length; lupe++) { - types[lupe] = TypeFactory.unknownType(); - } - return _typeFactory.constructSimpleType(clazz, types); - } - - private Class<?> getClassForId(String id) { - try { - if (id.contains(".")) { - return Class.forName(id); - } - - if (WELL_KNOWN_CODER_TYPES.containsKey(id)) { - return WELL_KNOWN_CODER_TYPES.get(id); - } - - // Otherwise, see if the ID is the name of a class in - // org.apache.beam.sdk.coders. We do this via creating - // the class object so that class loaders have a chance to get - // involved -- and since we need the class object anyway. - return Class.forName(Coder.class.getPackage().getName() + "." + id); - } catch (ClassNotFoundException e) { - throw new RuntimeException("Unable to convert coder ID " + id + " to class", e); - } - } - - @Override - public String idFromValueAndType(Object o, Class<?> clazz) { - return clazz.getName(); - } - - @Override - public String idFromValue(Object o) { - return o.getClass().getName(); - } - - @Override - public JsonTypeInfo.Id getMechanism() { - return JsonTypeInfo.Id.CUSTOM; - } - } - - /** - * The mixin class defining how Coders are handled by the deserialization - * {@link ObjectMapper}. - * - * <p>This is done via a mixin so that this resolver is <i>only</i> used - * during deserialization requested by the Apache Beam SDK. - */ - @JsonTypeIdResolver(Resolver.class) - @JsonTypeInfo(use = Id.CUSTOM, include = As.PROPERTY, property = PropertyNames.OBJECT_TYPE_NAME) - private static final class Mixin {} - - public Jackson2Module() { - super("BeamCoders"); - setMixInAnnotation(Coder.class, Mixin.class); - } - } } http://git-wip-us.apache.org/repos/asf/beam/blob/a5627b1a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PropertyNames.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PropertyNames.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PropertyNames.java deleted file mode 100644 index aa5855b..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PropertyNames.java +++ /dev/null @@ -1,112 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.util; - -/** - * Constant property names used by the SDK in CloudWorkflow specifications. - */ -public class PropertyNames { - public static final String ALLOWED_ENCODINGS = "allowed_encodings"; - public static final String APPEND_TRAILING_NEWLINES = "append_trailing_newlines"; - public static final String BIGQUERY_CREATE_DISPOSITION = "create_disposition"; - public static final String BIGQUERY_DATASET = "dataset"; - public static final String BIGQUERY_PROJECT = "project"; - public static final String BIGQUERY_SCHEMA = "schema"; - public static final String BIGQUERY_TABLE = "table"; - public static final String BIGQUERY_QUERY = "bigquery_query"; - public static final String BIGQUERY_FLATTEN_RESULTS = "bigquery_flatten_results"; - public static final String BIGQUERY_USE_LEGACY_SQL = "bigquery_use_legacy_sql"; - public static final String BIGQUERY_WRITE_DISPOSITION = "write_disposition"; - public static final String BIGQUERY_EXPORT_FORMAT = "bigquery_export_format"; - public static final String BIGQUERY_EXPORT_SCHEMA = "bigquery_export_schema"; - public static final String CO_GBK_RESULT_SCHEMA = "co_gbk_result_schema"; - public static final String COMBINE_FN = "combine_fn"; - public static final String COMPONENT_ENCODINGS = "component_encodings"; - public static final String COMPRESSION_TYPE = "compression_type"; - public static final String CUSTOM_SOURCE_FORMAT = "custom_source"; - public static final String CONCAT_SOURCE_SOURCES = "sources"; - public static final String CONCAT_SOURCE_BASE_SPECS = "base_specs"; - public static final String SOURCE_STEP_INPUT = "custom_source_step_input"; - public static final String SOURCE_SPEC = "spec"; - public static final String SOURCE_METADATA = "metadata"; - public static final String SOURCE_DOES_NOT_NEED_SPLITTING = "does_not_need_splitting"; - public static final String SOURCE_PRODUCES_SORTED_KEYS = "produces_sorted_keys"; - public static final String SOURCE_IS_INFINITE = "is_infinite"; - public static final String SOURCE_ESTIMATED_SIZE_BYTES = "estimated_size_bytes"; - public static final String ELEMENT = "element"; - public static final String ELEMENTS = "elements"; - public static final String ENCODING = "encoding"; - public static final String ENCODING_ID = "encoding_id"; - public static final String END_INDEX = "end_index"; - public static final String END_OFFSET = "end_offset"; - public static final String END_SHUFFLE_POSITION = "end_shuffle_position"; - public static final String ENVIRONMENT_VERSION_JOB_TYPE_KEY = "job_type"; - public static final String ENVIRONMENT_VERSION_MAJOR_KEY = "major"; - public static final String FILENAME = "filename"; - public static final String FILENAME_PREFIX = "filename_prefix"; - public static final String FILENAME_SUFFIX = "filename_suffix"; - public static final String FILEPATTERN = "filepattern"; - public static final String FOOTER = "footer"; - public static final String FORMAT = "format"; - public static final String HEADER = "header"; - public static final String INPUTS = "inputs"; - public static final String INPUT_CODER = "input_coder"; - public static final String IS_GENERATED = "is_generated"; - public static final String IS_MERGING_WINDOW_FN = "is_merging_window_fn"; - public static final String IS_PAIR_LIKE = "is_pair_like"; - public static final String IS_STREAM_LIKE = "is_stream_like"; - public static final String IS_WRAPPER = "is_wrapper"; - public static final String DISALLOW_COMBINER_LIFTING = "disallow_combiner_lifting"; - public static final String NON_PARALLEL_INPUTS = "non_parallel_inputs"; - public static final String NUM_SHARD_CODERS = "num_shard_coders"; - public static final String NUM_METADATA_SHARD_CODERS = "num_metadata_shard_coders"; - public static final String NUM_SHARDS = "num_shards"; - public static final String OBJECT_TYPE_NAME = "@type"; - public static final String OUTPUT = "output"; - public static final String OUTPUT_INFO = "output_info"; - public static final String OUTPUT_NAME = "output_name"; - public static final String PARALLEL_INPUT = "parallel_input"; - public static final String PHASE = "phase"; - public static final String PUBSUB_ID_ATTRIBUTE = "pubsub_id_label"; - public static final String PUBSUB_SERIALIZED_ATTRIBUTES_FN = "pubsub_serialized_attributes_fn"; - public static final String PUBSUB_SUBSCRIPTION = "pubsub_subscription"; - public static final String PUBSUB_SUBSCRIPTION_OVERRIDE = "pubsub_subscription_runtime_override"; - public static final String PUBSUB_TIMESTAMP_ATTRIBUTE = "pubsub_timestamp_label"; - public static final String PUBSUB_TOPIC = "pubsub_topic"; - public static final String PUBSUB_TOPIC_OVERRIDE = "pubsub_topic_runtime_override"; - public static final String SCALAR_FIELD_NAME = "value"; - public static final String SERIALIZED_FN = "serialized_fn"; - public static final String SHARD_NAME_TEMPLATE = "shard_template"; - public static final String SHUFFLE_KIND = "shuffle_kind"; - public static final String SHUFFLE_READER_CONFIG = "shuffle_reader_config"; - public static final String SHUFFLE_WRITER_CONFIG = "shuffle_writer_config"; - public static final String SORT_VALUES = "sort_values"; - public static final String START_INDEX = "start_index"; - public static final String START_OFFSET = "start_offset"; - public static final String START_SHUFFLE_POSITION = "start_shuffle_position"; - public static final String STRIP_TRAILING_NEWLINES = "strip_trailing_newlines"; - public static final String TUPLE_TAGS = "tuple_tags"; - public static final String USE_INDEXED_FORMAT = "use_indexed_format"; - public static final String USER_FN = "user_fn"; - public static final String USER_NAME = "user_name"; - public static final String USES_KEYED_STATE = "uses_keyed_state"; - public static final String VALIDATE_SINK = "validate_sink"; - public static final String VALIDATE_SOURCE = "validate_source"; - public static final String VALUE = "value"; - public static final String DISPLAY_DATA = "display_data"; -} http://git-wip-us.apache.org/repos/asf/beam/blob/a5627b1a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Serializer.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Serializer.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Serializer.java deleted file mode 100644 index 166e4e7..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Serializer.java +++ /dev/null @@ -1,147 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.util; - -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.DeserializationFeature; -import com.fasterxml.jackson.databind.ObjectMapper; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import javax.annotation.Nullable; - -/** - * Utility for converting objects between Java and Cloud representations. - * - * @deprecated replaced by {@code org.apache.beam.runners.dataflow.util.Serializer} - */ -@Deprecated -public final class Serializer { - // Delay initialization of statics until the first call to Serializer. - private static class SingletonHelper { - static final ObjectMapper OBJECT_MAPPER = createObjectMapper(); - static final ObjectMapper TREE_MAPPER = createTreeMapper(); - - /** - * Creates the object mapper that will be used for serializing Google API - * client maps into Jackson trees. - */ - private static ObjectMapper createTreeMapper() { - return new ObjectMapper(); - } - - /** - * Creates the object mapper that will be used for deserializing Jackson - * trees into objects. - */ - private static ObjectMapper createObjectMapper() { - ObjectMapper m = new ObjectMapper(); - // Ignore properties that are not used by the object. - m.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES); - - // For parameters of type Object, use the @type property to determine the - // class to instantiate. - // - // TODO: It would be ideal to do this for all non-final classes. The - // problem with using DefaultTyping.NON_FINAL is that it insists on having - // type information in the JSON for classes with useful default - // implementations, such as List. Ideally, we'd combine these defaults - // with available type information if that information's present. - m.enableDefaultTypingAsProperty( - ObjectMapper.DefaultTyping.JAVA_LANG_OBJECT, - PropertyNames.OBJECT_TYPE_NAME); - - m.registerModule(new CoderUtils.Jackson2Module()); - - return m; - } - } - - /** - * Deserializes an object from a Dataflow structured encoding (represented in - * Java as a map). - * - * <p>The standard Dataflow SDK object serialization protocol is based on JSON. - * Data is typically encoded as a JSON object whose fields represent the - * object's data. - * - * <p>The actual deserialization is performed by Jackson, which can deserialize - * public fields, use JavaBean setters, or use injection annotations to - * indicate how to construct the object. The {@link ObjectMapper} used is - * configured to use the "@type" field as the name of the class to instantiate - * (supporting polymorphic types), and may be further configured by - * annotations or via {@link ObjectMapper#registerModule}. - * - * @see <a href="http://wiki.fasterxml.com/JacksonFAQ#Data_Binding.2C_general"> - * Jackson Data-Binding</a> - * @see <a href="https://github.com/FasterXML/jackson-annotations/wiki/Jackson-Annotations"> - * Jackson-Annotations</a> - * @param serialized the object in untyped decoded form (i.e. a nested {@link Map}) - * @param clazz the expected object class - */ - public static <T> T deserialize(Map<String, Object> serialized, Class<T> clazz) { - try { - return SingletonHelper.OBJECT_MAPPER.treeToValue( - SingletonHelper.TREE_MAPPER.valueToTree( - deserializeCloudKnownTypes(serialized)), - clazz); - } catch (JsonProcessingException e) { - throw new RuntimeException( - "Unable to deserialize class " + clazz, e); - } - } - - /** - * Recursively walks the supplied map, looking for well-known cloud type - * information (keyed as {@link PropertyNames#OBJECT_TYPE_NAME}, matching a - * URI value from the {@link CloudKnownType} enum. Upon finding this type - * information, it converts it into the correspondingly typed Java value. - */ - @SuppressWarnings("unchecked") - private static Object deserializeCloudKnownTypes(Object src) { - if (src instanceof Map) { - Map<String, Object> srcMap = (Map<String, Object>) src; - @Nullable Object value = srcMap.get(PropertyNames.SCALAR_FIELD_NAME); - @Nullable CloudKnownType type = - CloudKnownType.forUri((String) srcMap.get(PropertyNames.OBJECT_TYPE_NAME)); - if (type != null && value != null) { - // It's a value of a well-known cloud type; let the known type handler - // handle the translation. - Object result = type.parse(value, type.defaultClass()); - return result; - } - // Otherwise, it's just an ordinary map. - Map<String, Object> dest = new HashMap<>(srcMap.size()); - for (Map.Entry<String, Object> entry : srcMap.entrySet()) { - dest.put(entry.getKey(), deserializeCloudKnownTypes(entry.getValue())); - } - return dest; - } - if (src instanceof List) { - List<Object> srcList = (List<Object>) src; - List<Object> dest = new ArrayList<>(srcList.size()); - for (Object obj : srcList) { - dest.add(deserializeCloudKnownTypes(obj)); - } - return dest; - } - // Neither a Map nor a List; no translation needed. - return src; - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/a5627b1a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Structs.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Structs.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Structs.java deleted file mode 100644 index a4be054..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Structs.java +++ /dev/null @@ -1,371 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.util; - -import com.google.api.client.util.Data; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import javax.annotation.Nullable; - -/** - * A collection of static methods for manipulating datastructure representations transferred via the - * Dataflow API. - */ -public final class Structs { - private Structs() {} // Non-instantiable - - public static String getString(Map<String, Object> map, String name) { - return getValue(map, name, String.class, "a string"); - } - - public static String getString( - Map<String, Object> map, String name, @Nullable String defaultValue) { - return getValue(map, name, String.class, "a string", defaultValue); - } - - public static byte[] getBytes(Map<String, Object> map, String name) { - @Nullable byte[] result = getBytes(map, name, null); - if (result == null) { - throw new ParameterNotFoundException(name, map); - } - return result; - } - - @Nullable - public static byte[] getBytes( - Map<String, Object> map, String name, @Nullable byte[] defaultValue) { - @Nullable String jsonString = getString(map, name, null); - if (jsonString == null) { - return defaultValue; - } - // TODO: Need to agree on a format for encoding bytes in - // a string that can be sent to the backend, over the cloud - // map task work API. base64 encoding seems pretty common. Switch to it? - return StringUtils.jsonStringToByteArray(jsonString); - } - - public static Boolean getBoolean(Map<String, Object> map, String name) { - return getValue(map, name, Boolean.class, "a boolean"); - } - - @Nullable - public static Boolean getBoolean( - Map<String, Object> map, String name, @Nullable Boolean defaultValue) { - return getValue(map, name, Boolean.class, "a boolean", defaultValue); - } - - public static Long getLong(Map<String, Object> map, String name) { - return getValue(map, name, Long.class, "a long"); - } - - @Nullable - public static Long getLong(Map<String, Object> map, String name, @Nullable Long defaultValue) { - return getValue(map, name, Long.class, "a long", defaultValue); - } - - public static Integer getInt(Map<String, Object> map, String name) { - return getValue(map, name, Integer.class, "an int"); - } - - @Nullable - public static Integer getInt( - Map<String, Object> map, String name, @Nullable Integer defaultValue) { - return getValue(map, name, Integer.class, "an int", defaultValue); - } - - @Nullable - public static List<String> getStrings( - Map<String, Object> map, String name, @Nullable List<String> defaultValue) { - @Nullable Object value = map.get(name); - if (value == null) { - if (map.containsKey(name)) { - throw new IncorrectTypeException(name, map, "a string or a list"); - } - return defaultValue; - } - if (Data.isNull(value)) { - // This is a JSON literal null. When represented as a list of strings, - // this is an empty list. - return Collections.<String>emptyList(); - } - @Nullable String singletonString = decodeValue(value, String.class); - if (singletonString != null) { - return Collections.singletonList(singletonString); - } - if (!(value instanceof List)) { - throw new IncorrectTypeException(name, map, "a string or a list"); - } - @SuppressWarnings("unchecked") - List<Object> elements = (List<Object>) value; - List<String> result = new ArrayList<>(elements.size()); - for (Object o : elements) { - @Nullable String s = decodeValue(o, String.class); - if (s == null) { - throw new IncorrectTypeException(name, map, "a list of strings"); - } - result.add(s); - } - return result; - } - - public static Map<String, Object> getObject(Map<String, Object> map, String name) { - @Nullable Map<String, Object> result = getObject(map, name, null); - if (result == null) { - throw new ParameterNotFoundException(name, map); - } - return result; - } - - @Nullable - public static Map<String, Object> getObject( - Map<String, Object> map, String name, @Nullable Map<String, Object> defaultValue) { - @Nullable Object value = map.get(name); - if (value == null) { - if (map.containsKey(name)) { - throw new IncorrectTypeException(name, map, "an object"); - } - return defaultValue; - } - return checkObject(value, map, name); - } - - private static Map<String, Object> checkObject( - Object value, Map<String, Object> map, String name) { - if (Data.isNull(value)) { - // This is a JSON literal null. When represented as an object, this is an - // empty map. - return Collections.<String, Object>emptyMap(); - } - if (!(value instanceof Map)) { - throw new IncorrectTypeException(name, map, "an object (not a map)"); - } - @SuppressWarnings("unchecked") - Map<String, Object> mapValue = (Map<String, Object>) value; - if (!mapValue.containsKey(PropertyNames.OBJECT_TYPE_NAME)) { - throw new IncorrectTypeException( - name, map, "an object (no \"" + PropertyNames.OBJECT_TYPE_NAME + "\" field)"); - } - return mapValue; - } - - @Nullable - public static List<Map<String, Object>> getListOfMaps( - Map<String, Object> map, String name, @Nullable List<Map<String, Object>> defaultValue) { - @Nullable Object value = map.get(name); - if (value == null) { - if (map.containsKey(name)) { - throw new IncorrectTypeException(name, map, "a list"); - } - return defaultValue; - } - if (Data.isNull(value)) { - // This is a JSON literal null. When represented as a list, - // this is an empty list. - return Collections.<Map<String, Object>>emptyList(); - } - - if (!(value instanceof List)) { - throw new IncorrectTypeException(name, map, "a list"); - } - - List<?> elements = (List<?>) value; - for (Object elem : elements) { - if (!(elem instanceof Map)) { - throw new IncorrectTypeException(name, map, "a list of Map objects"); - } - } - - @SuppressWarnings("unchecked") - List<Map<String, Object>> result = (List<Map<String, Object>>) elements; - return result; - } - - public static Map<String, Object> getDictionary(Map<String, Object> map, String name) { - @Nullable Object value = map.get(name); - if (value == null) { - throw new ParameterNotFoundException(name, map); - } - if (Data.isNull(value)) { - // This is a JSON literal null. When represented as a dictionary, this is - // an empty map. - return Collections.<String, Object>emptyMap(); - } - if (!(value instanceof Map)) { - throw new IncorrectTypeException(name, map, "a dictionary"); - } - @SuppressWarnings("unchecked") - Map<String, Object> result = (Map<String, Object>) value; - return result; - } - - @Nullable - public static Map<String, Object> getDictionary( - Map<String, Object> map, String name, @Nullable Map<String, Object> defaultValue) { - @Nullable Object value = map.get(name); - if (value == null) { - if (map.containsKey(name)) { - throw new IncorrectTypeException(name, map, "a dictionary"); - } - return defaultValue; - } - if (Data.isNull(value)) { - // This is a JSON literal null. When represented as a dictionary, this is - // an empty map. - return Collections.<String, Object>emptyMap(); - } - if (!(value instanceof Map)) { - throw new IncorrectTypeException(name, map, "a dictionary"); - } - @SuppressWarnings("unchecked") - Map<String, Object> result = (Map<String, Object>) value; - return result; - } - - // Builder operations. - - public static void addString(Map<String, Object> map, String name, String value) { - addObject(map, name, CloudObject.forString(value)); - } - - public static void addBoolean(Map<String, Object> map, String name, boolean value) { - addObject(map, name, CloudObject.forBoolean(value)); - } - - public static void addLong(Map<String, Object> map, String name, long value) { - addObject(map, name, CloudObject.forInteger(value)); - } - - public static void addObject(Map<String, Object> map, String name, Map<String, Object> value) { - map.put(name, value); - } - - public static void addNull(Map<String, Object> map, String name) { - map.put(name, Data.nullOf(Object.class)); - } - - public static void addLongs(Map<String, Object> map, String name, long... longs) { - List<Map<String, Object>> elements = new ArrayList<>(longs.length); - for (Long value : longs) { - elements.add(CloudObject.forInteger(value)); - } - map.put(name, elements); - } - - public static void addList( - Map<String, Object> map, String name, List<? extends Map<String, Object>> elements) { - map.put(name, elements); - } - - public static void addStringList(Map<String, Object> map, String name, List<String> elements) { - ArrayList<CloudObject> objects = new ArrayList<>(elements.size()); - for (String element : elements) { - objects.add(CloudObject.forString(element)); - } - addList(map, name, objects); - } - - public static <T extends Map<String, Object>> void addList( - Map<String, Object> map, String name, T[] elements) { - map.put(name, Arrays.asList(elements)); - } - - public static void addDictionary( - Map<String, Object> map, String name, Map<String, Object> value) { - map.put(name, value); - } - - public static void addDouble(Map<String, Object> map, String name, Double value) { - addObject(map, name, CloudObject.forFloat(value)); - } - - // Helper methods for a few of the accessor methods. - - private static <T> T getValue(Map<String, Object> map, String name, Class<T> clazz, String type) { - @Nullable T result = getValue(map, name, clazz, type, null); - if (result == null) { - throw new ParameterNotFoundException(name, map); - } - return result; - } - - @Nullable - private static <T> T getValue( - Map<String, Object> map, String name, Class<T> clazz, String type, @Nullable T defaultValue) { - @Nullable Object value = map.get(name); - if (value == null) { - if (map.containsKey(name)) { - throw new IncorrectTypeException(name, map, type); - } - return defaultValue; - } - T result = decodeValue(value, clazz); - if (result == null) { - // The value exists, but can't be decoded. - throw new IncorrectTypeException(name, map, type); - } - return result; - } - - @Nullable - private static <T> T decodeValue(Object value, Class<T> clazz) { - try { - if (value.getClass() == clazz) { - // decodeValue() is only called for final classes; if the class matches, - // it's safe to just return the value, and if it doesn't match, decoding - // is needed. - return clazz.cast(value); - } - if (!(value instanceof Map)) { - return null; - } - @SuppressWarnings("unchecked") - Map<String, Object> map = (Map<String, Object>) value; - @Nullable String typeName = (String) map.get(PropertyNames.OBJECT_TYPE_NAME); - if (typeName == null) { - return null; - } - @Nullable CloudKnownType knownType = CloudKnownType.forUri(typeName); - if (knownType == null) { - return null; - } - @Nullable Object scalar = map.get(PropertyNames.SCALAR_FIELD_NAME); - if (scalar == null) { - return null; - } - return knownType.parse(scalar, clazz); - } catch (ClassCastException e) { - // If any class cast fails during decoding, the value's not decodable. - return null; - } - } - - private static final class ParameterNotFoundException extends RuntimeException { - public ParameterNotFoundException(String name, Map<String, Object> map) { - super("didn't find required parameter " + name + " in " + map); - } - } - - private static final class IncorrectTypeException extends RuntimeException { - public IncorrectTypeException(String name, Map<String, Object> map, String type) { - super("required parameter " + name + " in " + map + " not " + type); - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/a5627b1a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Values.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Values.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Values.java deleted file mode 100644 index d8aa046..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Values.java +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.util; - -import java.util.Map; -import javax.annotation.Nullable; - -/** - * A collection of static methods for manipulating value representations - * transfered via the Dataflow API. - */ -public final class Values { - private Values() {} // Non-instantiable - - public static Boolean asBoolean(Object value) throws ClassCastException { - @Nullable Boolean knownResult = checkKnownValue(CloudKnownType.BOOLEAN, value, Boolean.class); - if (knownResult != null) { - return knownResult; - } - return Boolean.class.cast(value); - } - - public static Double asDouble(Object value) throws ClassCastException { - @Nullable Double knownResult = checkKnownValue(CloudKnownType.FLOAT, value, Double.class); - if (knownResult != null) { - return knownResult; - } - if (value instanceof Double) { - return (Double) value; - } - return ((Float) value).doubleValue(); - } - - public static Long asLong(Object value) throws ClassCastException { - @Nullable Long knownResult = checkKnownValue(CloudKnownType.INTEGER, value, Long.class); - if (knownResult != null) { - return knownResult; - } - if (value instanceof Long) { - return (Long) value; - } - return ((Integer) value).longValue(); - } - - public static String asString(Object value) throws ClassCastException { - @Nullable String knownResult = checkKnownValue(CloudKnownType.TEXT, value, String.class); - if (knownResult != null) { - return knownResult; - } - return String.class.cast(value); - } - - @Nullable - private static <T> T checkKnownValue(CloudKnownType type, Object value, Class<T> clazz) { - if (!(value instanceof Map)) { - return null; - } - Map<String, Object> map = (Map<String, Object>) value; - @Nullable String typeName = (String) map.get(PropertyNames.OBJECT_TYPE_NAME); - if (typeName == null) { - return null; - } - @Nullable CloudKnownType knownType = CloudKnownType.forUri(typeName); - if (knownType == null || knownType != type) { - return null; - } - @Nullable Object scalar = map.get(PropertyNames.SCALAR_FIELD_NAME); - if (scalar == null) { - return null; - } - return knownType.parse(scalar, clazz); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/a5627b1a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java index 23666ca..13e499d 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java @@ -17,11 +17,8 @@ */ package org.apache.beam.sdk.util; -import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.MoreObjects; import com.google.common.collect.ImmutableList; import java.io.IOException; @@ -610,17 +607,6 @@ public abstract class WindowedValue<T> { return new FullWindowedValueCoder<>(valueCoder, windowCoder); } - @JsonCreator - public static FullWindowedValueCoder<?> of( - @JsonProperty(PropertyNames.COMPONENT_ENCODINGS) - List<Coder<?>> components) { - checkArgument(components.size() == 2, - "Expecting 2 components, got " + components.size()); - @SuppressWarnings("unchecked") - Coder<? extends BoundedWindow> window = (Coder<? extends BoundedWindow>) components.get(1); - return of(components.get(0), window); - } - FullWindowedValueCoder(Coder<T> valueCoder, Coder<? extends BoundedWindow> windowCoder) { super(valueCoder); @@ -717,14 +703,6 @@ public abstract class WindowedValue<T> { return new ValueOnlyWindowedValueCoder<>(valueCoder); } - @JsonCreator - public static ValueOnlyWindowedValueCoder<?> of( - @JsonProperty(PropertyNames.COMPONENT_ENCODINGS) - List<Coder<?>> components) { - checkArgument(components.size() == 1, "Expecting 1 component, got " + components.size()); - return of(components.get(0)); - } - ValueOnlyWindowedValueCoder(Coder<T> valueCoder) { super(valueCoder); } http://git-wip-us.apache.org/repos/asf/beam/blob/a5627b1a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TupleTag.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TupleTag.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TupleTag.java index 37d41f7..c5e04b0 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TupleTag.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TupleTag.java @@ -17,18 +17,11 @@ */ package org.apache.beam.sdk.values; -import static org.apache.beam.sdk.util.Structs.addBoolean; -import static org.apache.beam.sdk.util.Structs.addString; - -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.collect.HashMultiset; import com.google.common.collect.Multiset; import java.io.Serializable; import java.util.Random; import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.util.CloudObject; -import org.apache.beam.sdk.util.PropertyNames; /** * A {@link TupleTag} is a typed tag to use as the key of a heterogeneously typed tuple, like {@link @@ -153,26 +146,11 @@ public class TupleTag<V> implements Serializable { return caller + "#" + nonce; } - @JsonCreator - @SuppressWarnings("unused") - private static TupleTag<?> fromJson( - @JsonProperty(PropertyNames.VALUE) String id, - @JsonProperty(PropertyNames.IS_GENERATED) boolean generated) { - return new TupleTag<>(id, generated); - } - private TupleTag(String id, boolean generated) { this.id = id; this.generated = generated; } - public CloudObject asCloudObject() { - CloudObject result = CloudObject.forClass(getClass()); - addString(result, PropertyNames.VALUE, id); - addBoolean(result, PropertyNames.IS_GENERATED, generated); - return result; - } - @Override public boolean equals(Object that) { if (that instanceof TupleTag) { http://git-wip-us.apache.org/repos/asf/beam/blob/a5627b1a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/SerializerTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/SerializerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/SerializerTest.java deleted file mode 100644 index f6bacc4..0000000 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/SerializerTest.java +++ /dev/null @@ -1,162 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.util; - -import static org.apache.beam.sdk.util.Structs.addBoolean; -import static org.apache.beam.sdk.util.Structs.addDouble; -import static org.apache.beam.sdk.util.Structs.addLong; -import static org.apache.beam.sdk.util.Structs.addString; - -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; -import com.fasterxml.jackson.annotation.JsonTypeInfo; -import org.hamcrest.Matchers; -import org.junit.Assert; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -/** - * Tests Serializer implementation. - */ -@RunWith(JUnit4.class) -public class SerializerTest { - /** - * A POJO to use for testing serialization. - */ - @JsonTypeInfo(use = JsonTypeInfo.Id.CLASS, include = JsonTypeInfo.As.PROPERTY, - property = PropertyNames.OBJECT_TYPE_NAME) - public static class TestRecord { - // TODO: When we apply property name typing to all non-final classes, the - // annotation on this class should be removed. - public String name; - public boolean ok; - public int value; - public double dValue; - } - - @Test - public void testStatefulDeserialization() { - CloudObject object = CloudObject.forClass(TestRecord.class); - - addString(object, "name", "foobar"); - addBoolean(object, "ok", true); - addLong(object, "value", 42L); - addDouble(object, "dValue", .25); - - TestRecord record = Serializer.deserialize(object, TestRecord.class); - Assert.assertEquals("foobar", record.name); - Assert.assertEquals(true, record.ok); - Assert.assertEquals(42L, record.value); - Assert.assertEquals(0.25, record.dValue, 0.0001); - } - - private static class InjectedTestRecord { - private final String n; - private final int v; - - @SuppressWarnings("unused") // used for JSON serialization - public InjectedTestRecord( - @JsonProperty("name") String name, - @JsonProperty("value") int value) { - this.n = name; - this.v = value; - } - - public String getName() { - return n; - } - public int getValue() { - return v; - } - } - - @Test - public void testDeserializationInjection() { - CloudObject object = CloudObject.forClass(InjectedTestRecord.class); - addString(object, "name", "foobar"); - addLong(object, "value", 42L); - - InjectedTestRecord record = - Serializer.deserialize(object, InjectedTestRecord.class); - - Assert.assertEquals("foobar", record.getName()); - Assert.assertEquals(42L, record.getValue()); - } - - private static class FactoryInjectedTestRecord { - @JsonCreator - public static FactoryInjectedTestRecord of( - @JsonProperty("name") String name, - @JsonProperty("value") int value) { - return new FactoryInjectedTestRecord(name, value); - } - - private final String n; - private final int v; - - private FactoryInjectedTestRecord(String name, int value) { - this.n = name; - this.v = value; - } - - public String getName() { - return n; - } - public int getValue() { - return v; - } - } - - @Test - public void testDeserializationFactoryInjection() { - CloudObject object = CloudObject.forClass(FactoryInjectedTestRecord.class); - addString(object, "name", "foobar"); - addLong(object, "value", 42L); - - FactoryInjectedTestRecord record = - Serializer.deserialize(object, FactoryInjectedTestRecord.class); - Assert.assertEquals("foobar", record.getName()); - Assert.assertEquals(42L, record.getValue()); - } - - private static class DerivedTestRecord extends TestRecord { - public String derived; - } - - @Test - public void testSubclassDeserialization() { - CloudObject object = CloudObject.forClass(DerivedTestRecord.class); - - addString(object, "name", "foobar"); - addBoolean(object, "ok", true); - addLong(object, "value", 42L); - addDouble(object, "dValue", .25); - addString(object, "derived", "baz"); - - TestRecord result = Serializer.deserialize(object, TestRecord.class); - Assert.assertThat(result, Matchers.instanceOf(DerivedTestRecord.class)); - - DerivedTestRecord record = (DerivedTestRecord) result; - Assert.assertEquals("foobar", record.name); - Assert.assertEquals(true, record.ok); - Assert.assertEquals(42L, record.value); - Assert.assertEquals(0.25, record.dValue, 0.0001); - Assert.assertEquals("baz", record.derived); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/a5627b1a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/StructsTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/StructsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/StructsTest.java deleted file mode 100644 index 91090d1..0000000 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/StructsTest.java +++ /dev/null @@ -1,206 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.util; - -import static org.apache.beam.sdk.util.Structs.addBoolean; -import static org.apache.beam.sdk.util.Structs.addDouble; -import static org.apache.beam.sdk.util.Structs.addList; -import static org.apache.beam.sdk.util.Structs.addLong; -import static org.apache.beam.sdk.util.Structs.addLongs; -import static org.apache.beam.sdk.util.Structs.addNull; -import static org.apache.beam.sdk.util.Structs.addString; -import static org.apache.beam.sdk.util.Structs.addStringList; -import static org.apache.beam.sdk.util.Structs.getBoolean; -import static org.apache.beam.sdk.util.Structs.getDictionary; -import static org.apache.beam.sdk.util.Structs.getInt; -import static org.apache.beam.sdk.util.Structs.getListOfMaps; -import static org.apache.beam.sdk.util.Structs.getLong; -import static org.apache.beam.sdk.util.Structs.getObject; -import static org.apache.beam.sdk.util.Structs.getString; -import static org.apache.beam.sdk.util.Structs.getStrings; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import org.hamcrest.Matchers; -import org.junit.Assert; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -/** - * Tests for Structs. - */ -@RunWith(JUnit4.class) -public class StructsTest { - private List<Map<String, Object>> makeCloudObjects() { - List<Map<String, Object>> objects = new ArrayList<>(); - { - CloudObject o = CloudObject.forClassName("string"); - addString(o, "singletonStringKey", "stringValue"); - objects.add(o); - } - { - CloudObject o = CloudObject.forClassName("long"); - addLong(o, "singletonLongKey", 42L); - objects.add(o); - } - return objects; - } - - private Map<String, Object> makeCloudDictionary() { - Map<String, Object> o = new HashMap<>(); - addList(o, "emptyKey", Collections.<Map<String, Object>>emptyList()); - addNull(o, "noStringsKey"); - addString(o, "singletonStringKey", "stringValue"); - addStringList(o, "multipleStringsKey", Arrays.asList("hi", "there", "bob")); - addLongs(o, "multipleLongsKey", 47L, 1L << 42, -5L); - addLong(o, "singletonLongKey", 42L); - addDouble(o, "singletonDoubleKey", 3.14); - addBoolean(o, "singletonBooleanKey", true); - addNull(o, "noObjectsKey"); - addList(o, "multipleObjectsKey", makeCloudObjects()); - return o; - } - - @Test - public void testGetStringParameter() throws Exception { - Map<String, Object> o = makeCloudDictionary(); - - Assert.assertEquals( - "stringValue", - getString(o, "singletonStringKey")); - Assert.assertEquals( - "stringValue", - getString(o, "singletonStringKey", "defaultValue")); - Assert.assertEquals( - "defaultValue", - getString(o, "missingKey", "defaultValue")); - - try { - getString(o, "missingKey"); - Assert.fail("should have thrown an exception"); - } catch (Exception exn) { - Assert.assertThat(exn.toString(), - Matchers.containsString( - "didn't find required parameter missingKey")); - } - - try { - getString(o, "noStringsKey"); - Assert.fail("should have thrown an exception"); - } catch (Exception exn) { - Assert.assertThat(exn.toString(), - Matchers.containsString("not a string")); - } - - Assert.assertThat(getStrings(o, "noStringsKey", null), Matchers.<String>emptyIterable()); - Assert.assertThat(getObject(o, "noStringsKey").keySet(), Matchers.<String>emptyIterable()); - Assert.assertThat(getDictionary(o, "noStringsKey").keySet(), Matchers.<String>emptyIterable()); - Assert.assertThat(getDictionary(o, "noStringsKey", null).keySet(), - Matchers.<String>emptyIterable()); - - try { - getString(o, "multipleStringsKey"); - Assert.fail("should have thrown an exception"); - } catch (Exception exn) { - Assert.assertThat(exn.toString(), - Matchers.containsString("not a string")); - } - - try { - getString(o, "emptyKey"); - Assert.fail("should have thrown an exception"); - } catch (Exception exn) { - Assert.assertThat(exn.toString(), - Matchers.containsString("not a string")); - } - } - - @Test - public void testGetBooleanParameter() throws Exception { - Map<String, Object> o = makeCloudDictionary(); - - Assert.assertEquals( - true, - getBoolean(o, "singletonBooleanKey", false)); - Assert.assertEquals( - false, - getBoolean(o, "missingKey", false)); - - try { - getBoolean(o, "emptyKey", false); - Assert.fail("should have thrown an exception"); - } catch (Exception exn) { - Assert.assertThat(exn.toString(), - Matchers.containsString("not a boolean")); - } - } - - @Test - public void testGetLongParameter() throws Exception { - Map<String, Object> o = makeCloudDictionary(); - - Assert.assertEquals( - (Long) 42L, - getLong(o, "singletonLongKey", 666L)); - Assert.assertEquals( - (Integer) 42, - getInt(o, "singletonLongKey", 666)); - Assert.assertEquals( - (Long) 666L, - getLong(o, "missingKey", 666L)); - - try { - getLong(o, "emptyKey", 666L); - Assert.fail("should have thrown an exception"); - } catch (Exception exn) { - Assert.assertThat(exn.toString(), - Matchers.containsString("not a long")); - } - try { - getInt(o, "emptyKey", 666); - Assert.fail("should have thrown an exception"); - } catch (Exception exn) { - Assert.assertThat(exn.toString(), - Matchers.containsString("not an int")); - } - } - - @Test - public void testGetListOfMaps() throws Exception { - Map<String, Object> o = makeCloudDictionary(); - - Assert.assertEquals( - makeCloudObjects(), - getListOfMaps(o, "multipleObjectsKey", null)); - - try { - getListOfMaps(o, "singletonLongKey", null); - Assert.fail("should have thrown an exception"); - } catch (Exception exn) { - Assert.assertThat(exn.toString(), - Matchers.containsString("not a list")); - } - } - - // TODO: Test builder operations. -}