Repository: beam Updated Branches: refs/heads/master 7f50ea2e5 -> 3afd338fd
Copy CloudObject to the Dataflow Module Once migrated on the Dataflow worker, these classes can be removed from the sdk. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/79b364f7 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/79b364f7 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/79b364f7 Branch: refs/heads/master Commit: 79b364f7df6122e438a4ee0f12b5cdc7cb694d91 Parents: 7f50ea2 Author: Thomas Groh <tg...@google.com> Authored: Mon May 1 14:31:17 2017 -0700 Committer: Luke Cwik <lc...@google.com> Committed: Mon May 1 18:02:23 2017 -0700 ---------------------------------------------------------------------- .../runners/dataflow/util/CloudKnownType.java | 138 ++++++++++ .../beam/runners/dataflow/util/CloudObject.java | 185 +++++++++++++ .../beam/runners/dataflow/util/Serializer.java | 262 +++++++++++++++++++ .../apache/beam/sdk/util/CloudKnownType.java | 7 +- .../org/apache/beam/sdk/util/CloudObject.java | 3 + .../org/apache/beam/sdk/util/CoderUtils.java | 15 +- .../org/apache/beam/sdk/util/Serializer.java | 3 + .../apache/beam/sdk/util/CoderUtilsTest.java | 104 -------- 8 files changed, 600 insertions(+), 117 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/79b364f7/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudKnownType.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudKnownType.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudKnownType.java new file mode 100644 index 0000000..ce23a1b --- /dev/null +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudKnownType.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation 
(ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow.util; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import javax.annotation.Nullable; + +/** A utility for manipulating well-known cloud types. */ +enum CloudKnownType { + TEXT("http://schema.org/Text", String.class) { + @Override + public <T> T parse(Object value, Class<T> clazz) { + return clazz.cast(value); + } + }, + BOOLEAN("http://schema.org/Boolean", Boolean.class) { + @Override + public <T> T parse(Object value, Class<T> clazz) { + return clazz.cast(value); + } + }, + INTEGER("http://schema.org/Integer", Long.class, Integer.class) { + @Override + public <T> T parse(Object value, Class<T> clazz) { + Object result = null; + if (value.getClass() == clazz) { + result = value; + } else if (clazz == Long.class) { + if (value instanceof Integer) { + result = ((Integer) value).longValue(); + } else if (value instanceof String) { + result = Long.valueOf((String) value); + } + } else if (clazz == Integer.class) { + if (value instanceof Long) { + result = ((Long) value).intValue(); + } else if (value instanceof String) { + result = Integer.valueOf((String) value); + } + } + return clazz.cast(result); + } + }, + 
FLOAT("http://schema.org/Float", Double.class, Float.class) { + @Override + public <T> T parse(Object value, Class<T> clazz) { + Object result = null; + if (value.getClass() == clazz) { + result = value; + } else if (clazz == Double.class) { + if (value instanceof Float) { + result = ((Float) value).doubleValue(); + } else if (value instanceof String) { + result = Double.valueOf((String) value); + } + } else if (clazz == Float.class) { + if (value instanceof Double) { + result = ((Double) value).floatValue(); + } else if (value instanceof String) { + result = Float.valueOf((String) value); + } + } + return clazz.cast(result); + } + }; + + private final String uri; + private final Class<?>[] classes; + + CloudKnownType(String uri, Class<?>... classes) { + this.uri = uri; + this.classes = classes; + } + + public String getUri() { + return uri; + } + + public abstract <T> T parse(Object value, Class<T> clazz); + + public Class<?> defaultClass() { + return classes[0]; + } + + private static final Map<String, CloudKnownType> typesByUri = + Collections.unmodifiableMap(buildTypesByUri()); + + private static Map<String, CloudKnownType> buildTypesByUri() { + Map<String, CloudKnownType> result = new HashMap<>(); + for (CloudKnownType ty : CloudKnownType.values()) { + result.put(ty.getUri(), ty); + } + return result; + } + + @Nullable + public static CloudKnownType forUri(@Nullable String uri) { + if (uri == null) { + return null; + } + return typesByUri.get(uri); + } + + private static final Map<Class<?>, CloudKnownType> typesByClass = + Collections.unmodifiableMap(buildTypesByClass()); + + private static Map<Class<?>, CloudKnownType> buildTypesByClass() { + Map<Class<?>, CloudKnownType> result = new HashMap<>(); + for (CloudKnownType ty : CloudKnownType.values()) { + for (Class<?> clazz : ty.classes) { + result.put(clazz, ty); + } + } + return result; + } + + @Nullable + public static CloudKnownType forClass(Class<?> clazz) { + return typesByClass.get(clazz); + } +} 
http://git-wip-us.apache.org/repos/asf/beam/blob/79b364f7/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObject.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObject.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObject.java new file mode 100644 index 0000000..e4dd9be --- /dev/null +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObject.java @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow.util; + + +import static com.google.common.base.Preconditions.checkNotNull; + +import com.google.api.client.json.GenericJson; +import com.google.api.client.util.Key; +import java.util.Map; +import javax.annotation.Nullable; +import org.apache.beam.sdk.util.PropertyNames; + +/** + * A representation of an arbitrary Java object to be instantiated by Dataflow + * workers. 
+ * + * <p>Typically, an object to be written by the SDK to the Dataflow service will + * implement a method (typically called {@code asCloudObject()}) that returns a + * {@code CloudObject} to represent the object in the protocol. Once the + * {@code CloudObject} is constructed, the method should explicitly add + * additional properties to be presented during deserialization, representing + * child objects by building additional {@code CloudObject}s. + */ +public final class CloudObject extends GenericJson { + /** + * Constructs a {@code CloudObject} by copying the supplied serialized object + * spec, which must represent an SDK object serialized for transport via the + * Dataflow API. + * + * <p>The most common use of this method is during deserialization on the worker, + * where it's used as a binding type during instance construction. + * + * @param spec supplies the serialized form of the object as a nested map + * @throws RuntimeException if the supplied map does not represent an SDK object + */ + public static CloudObject fromSpec(Map<String, Object> spec) { + CloudObject result = new CloudObject(); + result.putAll(spec); + if (result.className == null) { + throw new RuntimeException("Unable to create an SDK object from " + spec + + ": Object class not specified (missing \"" + + PropertyNames.OBJECT_TYPE_NAME + "\" field)"); + } + return result; + } + + /** + * Constructs a {@code CloudObject} to be used for serializing an instance of + * the supplied class for transport via the Dataflow API. The instance + * parameters to be serialized must be supplied explicitly after the + * {@code CloudObject} is created, by using {@link CloudObject#put}. 
+ * + * @param cls the class to use when deserializing the object on the worker + */ + public static CloudObject forClass(Class<?> cls) { + CloudObject result = new CloudObject(); + result.className = checkNotNull(cls).getName(); + return result; + } + + /** + * Constructs a {@code CloudObject} to be used for serializing data to be + * deserialized using the supplied class name for + * transport via the Dataflow API. The instance parameters to be serialized + * must be supplied explicitly after the {@code CloudObject} is created, by + * using {@link CloudObject#put}. + * + * @param className the class to use when deserializing the object on the worker + */ + public static CloudObject forClassName(String className) { + CloudObject result = new CloudObject(); + result.className = checkNotNull(className); + return result; + } + + /** + * Constructs a {@code CloudObject} representing the given value. + * @param value the scalar value to represent. + */ + public static CloudObject forString(String value) { + CloudObject result = forClassName(CloudKnownType.TEXT.getUri()); + result.put(PropertyNames.SCALAR_FIELD_NAME, value); + return result; + } + + /** + * Constructs a {@code CloudObject} representing the given value. + * @param value the scalar value to represent. + */ + public static CloudObject forBoolean(Boolean value) { + CloudObject result = forClassName(CloudKnownType.BOOLEAN.getUri()); + result.put(PropertyNames.SCALAR_FIELD_NAME, value); + return result; + } + + /** + * Constructs a {@code CloudObject} representing the given value. + * @param value the scalar value to represent. + */ + public static CloudObject forInteger(Long value) { + CloudObject result = forClassName(CloudKnownType.INTEGER.getUri()); + result.put(PropertyNames.SCALAR_FIELD_NAME, value); + return result; + } + + /** + * Constructs a {@code CloudObject} representing the given value. + * @param value the scalar value to represent.
+ */ + public static CloudObject forInteger(Integer value) { + CloudObject result = forClassName(CloudKnownType.INTEGER.getUri()); + result.put(PropertyNames.SCALAR_FIELD_NAME, value); + return result; + } + + /** + * Constructs a {@code CloudObject} representing the given value. + * @param value the scalar value to represent. + */ + public static CloudObject forFloat(Float value) { + CloudObject result = forClassName(CloudKnownType.FLOAT.getUri()); + result.put(PropertyNames.SCALAR_FIELD_NAME, value); + return result; + } + + /** + * Constructs a {@code CloudObject} representing the given value. + * @param value the scalar value to represent. + */ + public static CloudObject forFloat(Double value) { + CloudObject result = forClassName(CloudKnownType.FLOAT.getUri()); + result.put(PropertyNames.SCALAR_FIELD_NAME, value); + return result; + } + + /** + * Constructs a {@code CloudObject} representing the given value of a + * well-known cloud object type. + * @param value the scalar value to represent. + * @throws RuntimeException if the value does not have a + * {@link CloudKnownType} mapping + */ + public static CloudObject forKnownType(Object value) { + @Nullable CloudKnownType ty = CloudKnownType.forClass(value.getClass()); + if (ty == null) { + throw new RuntimeException("Unable to represent value via the Dataflow API: " + value); + } + CloudObject result = forClassName(ty.getUri()); + result.put(PropertyNames.SCALAR_FIELD_NAME, value); + return result; + } + + @Key(PropertyNames.OBJECT_TYPE_NAME) + private String className; + + private CloudObject() {} + + /** + * Gets the name of the Java class that this CloudObject represents. 
+ */ + public String getClassName() { + return className; + } + + @Override + public CloudObject clone() { + return (CloudObject) super.clone(); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/79b364f7/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/Serializer.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/Serializer.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/Serializer.java new file mode 100644 index 0000000..e2bcafe --- /dev/null +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/Serializer.java @@ -0,0 +1,262 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.dataflow.util; + +import com.fasterxml.jackson.annotation.JsonTypeInfo; +import com.fasterxml.jackson.annotation.JsonTypeInfo.As; +import com.fasterxml.jackson.annotation.JsonTypeInfo.Id; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.DatabindContext; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.JavaType; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.annotation.JsonTypeIdResolver; +import com.fasterxml.jackson.databind.jsontype.impl.TypeIdResolverBase; +import com.fasterxml.jackson.databind.module.SimpleModule; +import com.fasterxml.jackson.databind.type.TypeFactory; +import com.google.common.collect.ImmutableMap; +import java.lang.reflect.TypeVariable; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import javax.annotation.Nullable; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.IterableCoder; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.LengthPrefixCoder; +import org.apache.beam.sdk.transforms.windowing.GlobalWindow; +import org.apache.beam.sdk.transforms.windowing.IntervalWindow; +import org.apache.beam.sdk.util.PropertyNames; +import org.apache.beam.sdk.util.WindowedValue; + +/** + * Utility for converting objects between Java and Cloud representations. + * + * @deprecated Will no longer be used once all coders are converted via {@link CloudObjects}. + */ +@Deprecated +public final class Serializer { + /** A mapping from well known coder types to their implementing classes. 
*/ + private static final Map<String, Class<?>> WELL_KNOWN_CODER_TYPES = + ImmutableMap.<String, Class<?>>builder() + .put("kind:pair", KvCoder.class) + .put("kind:stream", IterableCoder.class) + .put("kind:global_window", GlobalWindow.Coder.class) + .put("kind:interval_window", IntervalWindow.IntervalWindowCoder.class) + .put("kind:length_prefix", LengthPrefixCoder.class) + .put("kind:windowed_value", WindowedValue.FullWindowedValueCoder.class) + .build(); + + // Delay initialization of statics until the first call to Serializer. + private static class SingletonHelper { + static final ObjectMapper OBJECT_MAPPER = createObjectMapper(); + static final ObjectMapper TREE_MAPPER = createTreeMapper(); + + /** + * Creates the object mapper that will be used for serializing Google API + * client maps into Jackson trees. + */ + private static ObjectMapper createTreeMapper() { + return new ObjectMapper(); + } + + /** + * Creates the object mapper that will be used for deserializing Jackson + * trees into objects. + */ + private static ObjectMapper createObjectMapper() { + ObjectMapper m = new ObjectMapper(); + // Ignore properties that are not used by the object. + m.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES); + + // For parameters of type Object, use the @type property to determine the + // class to instantiate. + // + // TODO: It would be ideal to do this for all non-final classes. The + // problem with using DefaultTyping.NON_FINAL is that it insists on having + // type information in the JSON for classes with useful default + // implementations, such as List. Ideally, we'd combine these defaults + // with available type information if that information's present. + m.enableDefaultTypingAsProperty( + ObjectMapper.DefaultTyping.JAVA_LANG_OBJECT, + PropertyNames.OBJECT_TYPE_NAME); + + m.registerModule(new Jackson2Module()); + + return m; + } + } + + /** + * Deserializes an object from a Dataflow structured encoding (represented in + * Java as a map). 
+ * + * <p>The standard Dataflow SDK object serialization protocol is based on JSON. + * Data is typically encoded as a JSON object whose fields represent the + * object's data. + * + * <p>The actual deserialization is performed by Jackson, which can deserialize + * public fields, use JavaBean setters, or use injection annotations to + * indicate how to construct the object. The {@link ObjectMapper} used is + * configured to use the "@type" field as the name of the class to instantiate + * (supporting polymorphic types), and may be further configured by + * annotations or via {@link ObjectMapper#registerModule}. + * + * @see <a href="http://wiki.fasterxml.com/JacksonFAQ#Data_Binding.2C_general"> + * Jackson Data-Binding</a> + * @see <a href="https://github.com/FasterXML/jackson-annotations/wiki/Jackson-Annotations"> + * Jackson-Annotations</a> + * @param serialized the object in untyped decoded form (i.e. a nested {@link Map}) + * @param clazz the expected object class + */ + public static <T> T deserialize(Map<String, Object> serialized, Class<T> clazz) { + try { + return SingletonHelper.OBJECT_MAPPER.treeToValue( + SingletonHelper.TREE_MAPPER.valueToTree( + deserializeCloudKnownTypes(serialized)), + clazz); + } catch (JsonProcessingException e) { + throw new RuntimeException( + "Unable to deserialize class " + clazz, e); + } + } + + /** + * Recursively walks the supplied map, looking for well-known cloud type information (keyed as + * {@link PropertyNames#OBJECT_TYPE_NAME}, matching a URI value from the {@link CloudKnownType} + * enum). Upon finding this type information, it converts it into the correspondingly typed Java + * value.
+ */ + @SuppressWarnings("unchecked") + private static Object deserializeCloudKnownTypes(Object src) { + if (src instanceof Map) { + Map<String, Object> srcMap = (Map<String, Object>) src; + @Nullable Object value = srcMap.get(PropertyNames.SCALAR_FIELD_NAME); + @Nullable CloudKnownType type = + CloudKnownType.forUri((String) srcMap.get(PropertyNames.OBJECT_TYPE_NAME)); + if (type != null && value != null) { + // It's a value of a well-known cloud type; let the known type handler + // handle the translation. + Object result = type.parse(value, type.defaultClass()); + return result; + } + // Otherwise, it's just an ordinary map. + Map<String, Object> dest = new HashMap<>(srcMap.size()); + for (Map.Entry<String, Object> entry : srcMap.entrySet()) { + dest.put(entry.getKey(), deserializeCloudKnownTypes(entry.getValue())); + } + return dest; + } + if (src instanceof List) { + List<Object> srcList = (List<Object>) src; + List<Object> dest = new ArrayList<>(srcList.size()); + for (Object obj : srcList) { + dest.add(deserializeCloudKnownTypes(obj)); + } + return dest; + } + // Neither a Map nor a List; no translation needed. + return src; + } + + /** + * A {@link com.fasterxml.jackson.databind.Module} that adds the type + * resolver needed for Coder definitions. + */ + static final class Jackson2Module extends SimpleModule { + /** + * The Coder custom type resolver. + * + * <p>This resolver resolves coders. If the Coder ID is a particular + * well-known identifier, it's replaced with the corresponding class. + * All other Coder instances are resolved by class name, using the package + * org.apache.beam.sdk.coders if there are no "."s in the ID. 
+ */ + private static final class Resolver extends TypeIdResolverBase { + @SuppressWarnings("unused") // Used via @JsonTypeIdResolver annotation on Mixin + public Resolver() { + super(TypeFactory.defaultInstance().constructType(Coder.class), + TypeFactory.defaultInstance()); + } + + @Override + public JavaType typeFromId(DatabindContext context, String id) { + Class<?> clazz = getClassForId(id); + @SuppressWarnings("rawtypes") + TypeVariable[] tvs = clazz.getTypeParameters(); + JavaType[] types = new JavaType[tvs.length]; + for (int lupe = 0; lupe < tvs.length; lupe++) { + types[lupe] = TypeFactory.unknownType(); + } + return _typeFactory.constructSimpleType(clazz, types); + } + + private Class<?> getClassForId(String id) { + try { + if (id.contains(".")) { + return Class.forName(id); + } + + if (WELL_KNOWN_CODER_TYPES.containsKey(id)) { + return WELL_KNOWN_CODER_TYPES.get(id); + } + + // Otherwise, see if the ID is the name of a class in + // org.apache.beam.sdk.coders. We do this via creating + // the class object so that class loaders have a chance to get + // involved -- and since we need the class object anyway. + return Class.forName(Coder.class.getPackage().getName() + "." + id); + } catch (ClassNotFoundException e) { + throw new RuntimeException("Unable to convert coder ID " + id + " to class", e); + } + } + + @Override + public String idFromValueAndType(Object o, Class<?> clazz) { + return clazz.getName(); + } + + @Override + public String idFromValue(Object o) { + return o.getClass().getName(); + } + + @Override + public JsonTypeInfo.Id getMechanism() { + return JsonTypeInfo.Id.CUSTOM; + } + } + + /** + * The mixin class defining how Coders are handled by the deserialization + * {@link ObjectMapper}. + * + * <p>This is done via a mixin so that this resolver is <i>only</i> used + * during deserialization requested by the Apache Beam SDK. 
+ */ + @JsonTypeIdResolver(Resolver.class) + @JsonTypeInfo(use = Id.CUSTOM, include = As.PROPERTY, property = PropertyNames.OBJECT_TYPE_NAME) + private static final class Mixin {} + + public Jackson2Module() { + super("BeamCoders"); + setMixInAnnotation(Coder.class, Mixin.class); + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/79b364f7/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CloudKnownType.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CloudKnownType.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CloudKnownType.java index 430319b..c9e7427 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CloudKnownType.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CloudKnownType.java @@ -22,7 +22,12 @@ import java.util.HashMap; import java.util.Map; import javax.annotation.Nullable; -/** A utility for manipulating well-known cloud types. */ +/** + * A utility for manipulating well-known cloud types. 
+ * + * @deprecated replaced by {@code org.apache.beam.runners.dataflow.util.CloudKnownType} + */ +@Deprecated enum CloudKnownType { TEXT("http://schema.org/Text", String.class) { @Override http://git-wip-us.apache.org/repos/asf/beam/blob/79b364f7/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CloudObject.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CloudObject.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CloudObject.java index 9cab453..061e56a 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CloudObject.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CloudObject.java @@ -35,7 +35,10 @@ import javax.annotation.Nullable; * {@code CloudObject} is constructed, the method should explicitly add * additional properties to be presented during deserialization, representing * child objects by building additional {@code CloudObject}s. + * + * @deprecated replaced by {@code org.apache.beam.runners.dataflow.util.CloudObject} */ +@Deprecated public final class CloudObject extends GenericJson { /** * Constructs a {@code CloudObject} by copying the supplied serialized object http://git-wip-us.apache.org/repos/asf/beam/blob/79b364f7/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CoderUtils.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CoderUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CoderUtils.java index 857f903..2d21561 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CoderUtils.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CoderUtils.java @@ -17,8 +17,6 @@ */ package org.apache.beam.sdk.util; -import static org.apache.beam.sdk.util.Structs.addList; - import com.fasterxml.jackson.annotation.JsonTypeInfo; import com.fasterxml.jackson.annotation.JsonTypeInfo.As; import
com.fasterxml.jackson.annotation.JsonTypeInfo.Id; @@ -217,20 +215,13 @@ public final class CoderUtils { return codedType; } - public static CloudObject makeCloudEncoding( - String type, - CloudObject... componentSpecs) { - CloudObject encoding = CloudObject.forClassName(type); - if (componentSpecs.length > 0) { - addList(encoding, PropertyNames.COMPONENT_ENCODINGS, componentSpecs); - } - return encoding; - } - /** * A {@link com.fasterxml.jackson.databind.Module} that adds the type * resolver needed for Coder definitions. + * + * <p>Used only in {@link Serializer}, which is moving to the Dataflow runner module. */ +@Deprecated static final class Jackson2Module extends SimpleModule { /** * The Coder custom type resolver. http://git-wip-us.apache.org/repos/asf/beam/blob/79b364f7/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Serializer.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Serializer.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Serializer.java index 86a3b8e..166e4e7 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Serializer.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Serializer.java @@ -28,7 +28,10 @@ import javax.annotation.Nullable; /** * Utility for converting objects between Java and Cloud representations. + * + * @deprecated replaced by {@code org.apache.beam.runners.dataflow.util.Serializer} */ +@Deprecated public final class Serializer { // Delay initialization of statics until the first call to Serializer.
private static class SingletonHelper { http://git-wip-us.apache.org/repos/asf/beam/blob/79b364f7/sdks/java/core/src/test/java/org/apache/beam/sdk/util/CoderUtilsTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/CoderUtilsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/CoderUtilsTest.java index 32c2af4..0db5355 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/CoderUtilsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/CoderUtilsTest.java @@ -17,7 +17,6 @@ */ package org.apache.beam.sdk.util; -import static org.apache.beam.sdk.util.CoderUtils.makeCloudEncoding; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.doThrow; @@ -25,18 +24,11 @@ import static org.mockito.Mockito.mock; import java.io.InputStream; import java.io.OutputStream; -import org.apache.beam.sdk.coders.BigEndianIntegerCoder; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.Coder.Context; import org.apache.beam.sdk.coders.CoderException; import org.apache.beam.sdk.coders.CustomCoder; -import org.apache.beam.sdk.coders.IterableCoder; -import org.apache.beam.sdk.coders.KvCoder; -import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.coders.VoidCoder; import org.apache.beam.sdk.testing.CoderPropertiesTest.ClosingCoder; -import org.hamcrest.CoreMatchers; -import org.junit.Assert; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; @@ -89,102 +81,6 @@ public class CoderUtilsTest { } @Test - public void testCreateAtomicCoders() throws Exception { - Assert.assertEquals( - BigEndianIntegerCoder.of(), - Serializer.deserialize(makeCloudEncoding("BigEndianIntegerCoder"), Coder.class)); - Assert.assertEquals( - StringUtf8Coder.of(), - Serializer.deserialize( - 
makeCloudEncoding(StringUtf8Coder.class.getName()), Coder.class)); - Assert.assertEquals( - VoidCoder.of(), - Serializer.deserialize(makeCloudEncoding("VoidCoder"), Coder.class)); - Assert.assertEquals( - TestCoder.of(), - Serializer.deserialize(makeCloudEncoding(TestCoder.class.getName()), Coder.class)); - } - - @Test - public void testCreateCompositeCoders() throws Exception { - Assert.assertEquals( - IterableCoder.of(StringUtf8Coder.of()), - Serializer.deserialize( - makeCloudEncoding("IterableCoder", - makeCloudEncoding("StringUtf8Coder")), Coder.class)); - Assert.assertEquals( - KvCoder.of(BigEndianIntegerCoder.of(), VoidCoder.of()), - Serializer.deserialize( - makeCloudEncoding( - "KvCoder", - makeCloudEncoding(BigEndianIntegerCoder.class.getName()), - makeCloudEncoding("VoidCoder")), Coder.class)); - Assert.assertEquals( - IterableCoder.of( - KvCoder.of(IterableCoder.of(BigEndianIntegerCoder.of()), - KvCoder.of(VoidCoder.of(), - TestCoder.of()))), - Serializer.deserialize( - makeCloudEncoding( - IterableCoder.class.getName(), - makeCloudEncoding( - KvCoder.class.getName(), - makeCloudEncoding( - "IterableCoder", - makeCloudEncoding("BigEndianIntegerCoder")), - makeCloudEncoding( - "KvCoder", - makeCloudEncoding("VoidCoder"), - makeCloudEncoding(TestCoder.class.getName())))), Coder.class)); - } - - @Test - public void testCreateUntypedCoders() throws Exception { - Assert.assertEquals( - IterableCoder.of(StringUtf8Coder.of()), - Serializer.deserialize( - makeCloudEncoding( - "kind:stream", - makeCloudEncoding("StringUtf8Coder")), Coder.class)); - Assert.assertEquals( - KvCoder.of(BigEndianIntegerCoder.of(), VoidCoder.of()), - Serializer.deserialize( - makeCloudEncoding( - "kind:pair", - makeCloudEncoding(BigEndianIntegerCoder.class.getName()), - makeCloudEncoding("VoidCoder")), Coder.class)); - Assert.assertEquals( - IterableCoder.of( - KvCoder.of(IterableCoder.of(BigEndianIntegerCoder.of()), - KvCoder.of(VoidCoder.of(), - TestCoder.of()))), - 
Serializer.deserialize( - makeCloudEncoding( - "kind:stream", - makeCloudEncoding( - "kind:pair", - makeCloudEncoding( - "kind:stream", - makeCloudEncoding("BigEndianIntegerCoder")), - makeCloudEncoding( - "kind:pair", - makeCloudEncoding("VoidCoder"), - makeCloudEncoding(TestCoder.class.getName())))), Coder.class)); - } - - @Test - public void testCreateUnknownCoder() throws Exception { - try { - Serializer.deserialize(makeCloudEncoding("UnknownCoder"), Coder.class); - Assert.fail("should have thrown an exception"); - } catch (Exception exn) { - Assert.assertThat(exn.toString(), - CoreMatchers.containsString( - "Unable to convert coder ID UnknownCoder to class")); - } - } - - @Test public void testClosingCoderFailsWhenDecodingBase64() throws Exception { expectedException.expect(UnsupportedOperationException.class); expectedException.expectMessage("Caller does not own the underlying");