ATLAS-631 Introduce Versioning to Atlas Notification Payload (tbeerbower via shwethags)
Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/b2ae1371 Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/b2ae1371 Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/b2ae1371 Branch: refs/heads/master Commit: b2ae1371be24cfcb13f12dcb4ebad6920b9bfd80 Parents: 73640cc Author: Shwetha GS <[email protected]> Authored: Wed May 4 11:38:06 2016 +0530 Committer: Shwetha GS <[email protected]> Committed: Wed May 4 11:38:06 2016 +0530 ---------------------------------------------------------------------- .../org/apache/atlas/kafka/KafkaConsumer.java | 16 +- .../apache/atlas/kafka/KafkaNotification.java | 51 ++-- .../AbstractMessageDeserializer.java | 165 ++++++++++++ .../notification/AbstractNotification.java | 107 +++++++- .../AbstractNotificationConsumer.java | 154 +---------- .../IncompatibleVersionException.java | 32 +++ .../atlas/notification/MessageDeserializer.java | 33 +++ .../atlas/notification/MessageVersion.java | 133 ++++++++++ .../notification/NotificationInterface.java | 44 +++- .../atlas/notification/VersionedMessage.java | 75 ++++++ .../VersionedMessageDeserializer.java | 107 ++++++++ .../entity/EntityMessageDeserializer.java | 76 ++++++ .../hook/HookMessageDeserializer.java | 60 +++++ .../apache/atlas/kafka/KafkaConsumerTest.java | 176 +++++++++++++ .../atlas/kafka/KafkaNotificationTest.java | 7 +- .../AbstractNotificationConsumerTest.java | 264 +++++++++++++++++++ .../notification/AbstractNotificationTest.java | 120 +++++++++ .../atlas/notification/MessageVersionTest.java | 125 +++++++++ .../notification/VersionedMessageTest.java | 57 ++++ .../entity/EntityMessageDeserializerTest.java | 61 +++++ .../entity/EntityNotificationImplTest.java | 2 +- .../hook/HookMessageDeserializerTest.java | 70 +++++ .../notification/hook/HookNotificationTest.java | 20 +- release-log.txt | 1 + 24 files changed, 1758 insertions(+), 198 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b2ae1371/notification/src/main/java/org/apache/atlas/kafka/KafkaConsumer.java ---------------------------------------------------------------------- diff --git a/notification/src/main/java/org/apache/atlas/kafka/KafkaConsumer.java b/notification/src/main/java/org/apache/atlas/kafka/KafkaConsumer.java index 029a072..f1c9742 100644 --- a/notification/src/main/java/org/apache/atlas/kafka/KafkaConsumer.java +++ b/notification/src/main/java/org/apache/atlas/kafka/KafkaConsumer.java @@ -21,6 +21,7 @@ import kafka.consumer.ConsumerIterator; import kafka.consumer.KafkaStream; import kafka.message.MessageAndMetadata; import org.apache.atlas.notification.AbstractNotificationConsumer; +import org.apache.atlas.notification.MessageDeserializer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,13 +42,16 @@ public class KafkaConsumer<T> extends AbstractNotificationConsumer<T> { /** * Create a Kafka consumer. * - * @param type the notification type returned by this consumer - * @param stream the underlying Kafka stream - * @param consumerId an id value for this consumer + * @param type the notification type returned by this consumer + * @param deserializer the message deserializer used for this consumer + * @param stream the underlying Kafka stream + * @param consumerId an id value for this consumer */ - public KafkaConsumer(Class<T> type, KafkaStream<String, String> stream, int consumerId) { - super(type); - this.iterator = stream.iterator(); + public KafkaConsumer(Class<T> type, + MessageDeserializer<T> deserializer, KafkaStream<String, String> stream, int consumerId) { + super(deserializer); + + this.iterator = stream.iterator(); this.consumerId = consumerId; } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b2ae1371/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java ---------------------------------------------------------------------- diff --git a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java index 889af11..cfffec4 100644 --- a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java +++ b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java @@ -28,6 +28,7 @@ import kafka.utils.Time; import org.apache.atlas.ApplicationProperties; import org.apache.atlas.AtlasException; import org.apache.atlas.notification.AbstractNotification; +import org.apache.atlas.notification.MessageDeserializer; import org.apache.atlas.notification.NotificationConsumer; import org.apache.atlas.notification.NotificationException; import org.apache.atlas.service.Service; @@ -172,7 +173,10 @@ public class KafkaNotification extends AbstractNotification implements Service { List<NotificationConsumer<T>> consumers = new ArrayList<>(numConsumers); int consumerId = 0; for (KafkaStream stream : kafkaConsumers) { - consumers.add(createKafkaConsumer(notificationType.getClassType(), stream, consumerId++)); + KafkaConsumer<T> kafkaConsumer = + createKafkaConsumer(notificationType.getClassType(), notificationType.getDeserializer(), + stream, consumerId++); + consumers.add(kafkaConsumer); } consumerConnectors.add(consumerConnector); @@ -180,6 +184,22 @@ public class KafkaNotification extends AbstractNotification implements Service { } @Override + public void close() { + if (producer != null) { + producer.close(); + producer = null; + } + + for (ConsumerConnector consumerConnector : consumerConnectors) { + consumerConnector.shutdown(); + } + consumerConnectors.clear(); + } + + + // ----- AbstractNotification -------------------------------------------- + + @Override public void sendInternal(NotificationType type, String... messages) throws NotificationException { if (producer == null) { createProducer(); @@ -197,27 +217,13 @@ public class KafkaNotification extends AbstractNotification implements Service { try { RecordMetadata response = future.get(); LOG.debug("Sent message for topic - {}, partition - {}, offset - {}", response.topic(), - response.partition(), response.offset()); + response.partition(), response.offset()); } catch (Exception e) { throw new NotificationException(e); } } } - @Override - public void close() { - if (producer != null) { - producer.close(); - producer = null; - } - - for (ConsumerConnector consumerConnector : consumerConnectors) { - consumerConnector.shutdown(); - } - consumerConnectors.clear(); - } - - // ----- helper methods -------------------------------------------------- /** @@ -234,14 +240,17 @@ public class KafkaNotification extends AbstractNotification implements Service { /** * Create a Kafka consumer from the given Kafka stream. * - * @param stream the Kafka stream - * @param consumerId the id for the new consumer + * @param type the notification type to be returned by the consumer + * @param deserializer the deserializer for the created consumers + * @param stream the Kafka stream + * @param consumerId the id for the new consumer * * @return a new Kafka consumer */ - protected <T> org.apache.atlas.kafka.KafkaConsumer<T> createKafkaConsumer(Class<T> type, KafkaStream stream, - int consumerId) { - return new org.apache.atlas.kafka.KafkaConsumer<T>(type, stream, consumerId); + protected <T> org.apache.atlas.kafka.KafkaConsumer<T> createKafkaConsumer(Class<T> type, + MessageDeserializer<T> deserializer, KafkaStream stream, + int consumerId) { + return new org.apache.atlas.kafka.KafkaConsumer<T>(type, deserializer, stream, consumerId); } // Get properties for consumer request http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b2ae1371/notification/src/main/java/org/apache/atlas/notification/AbstractMessageDeserializer.java ---------------------------------------------------------------------- diff --git a/notification/src/main/java/org/apache/atlas/notification/AbstractMessageDeserializer.java b/notification/src/main/java/org/apache/atlas/notification/AbstractMessageDeserializer.java new file mode 100644 index 0000000..9585827 --- /dev/null +++ b/notification/src/main/java/org/apache/atlas/notification/AbstractMessageDeserializer.java @@ -0,0 +1,165 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.notification; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.google.gson.JsonDeserializationContext; +import com.google.gson.JsonDeserializer; +import com.google.gson.JsonElement; +import com.google.gson.JsonParseException; +import com.google.gson.reflect.TypeToken; +import org.apache.atlas.typesystem.IReferenceableInstance; +import org.apache.atlas.typesystem.IStruct; +import org.apache.atlas.typesystem.Referenceable; +import org.apache.atlas.typesystem.Struct; +import org.apache.atlas.typesystem.json.InstanceSerialization; +import org.codehaus.jettison.json.JSONArray; +import org.codehaus.jettison.json.JSONException; +import org.slf4j.Logger; + +import java.lang.reflect.Type; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Base notification message deserializer. + */ +public abstract class AbstractMessageDeserializer<T> extends VersionedMessageDeserializer<T> { + + private static final Map<Type, JsonDeserializer> DESERIALIZER_MAP = new HashMap<>(); + + static { + DESERIALIZER_MAP.put(ImmutableList.class, new ImmutableListDeserializer()); + DESERIALIZER_MAP.put(ImmutableMap.class, new ImmutableMapDeserializer()); + DESERIALIZER_MAP.put(JSONArray.class, new JSONArrayDeserializer()); + DESERIALIZER_MAP.put(IStruct.class, new StructDeserializer()); + DESERIALIZER_MAP.put(IReferenceableInstance.class, new ReferenceableDeserializer()); + DESERIALIZER_MAP.put(Referenceable.class, new ReferenceableDeserializer()); + } + + + // ----- Constructors ---------------------------------------------------- + + /** + * Create a deserializer. + * + * @param versionedMessageType the type of the versioned message + * @param expectedVersion the expected message version + * @param deserializerMap map of individual deserializers used to define this message deserializer + * @param notificationLogger logger for message version mismatch + */ + public AbstractMessageDeserializer(Type versionedMessageType, + MessageVersion expectedVersion, + Map<Type, JsonDeserializer> deserializerMap, + Logger notificationLogger) { + super(versionedMessageType, expectedVersion, getDeserializer(deserializerMap), notificationLogger); + } + + + // ----- helper methods -------------------------------------------------- + + private static Gson getDeserializer(Map<Type, JsonDeserializer> deserializerMap) { + GsonBuilder builder = new GsonBuilder(); + + for (Map.Entry<Type, JsonDeserializer> entry : DESERIALIZER_MAP.entrySet()) { + builder.registerTypeAdapter(entry.getKey(), entry.getValue()); + } + + for (Map.Entry<Type, JsonDeserializer> entry : deserializerMap.entrySet()) { + builder.registerTypeAdapter(entry.getKey(), entry.getValue()); + } + return builder.create(); + } + + + // ----- deserializer classes -------------------------------------------- + + /** + * Deserializer for ImmutableList. + */ + protected static class ImmutableListDeserializer implements JsonDeserializer<ImmutableList<?>> { + public static final Type LIST_TYPE = new TypeToken<List<?>>() { + }.getType(); + + @Override + public ImmutableList<?> deserialize(JsonElement json, Type type, + JsonDeserializationContext context) { + final List<?> list = context.deserialize(json, LIST_TYPE); + return ImmutableList.copyOf(list); + } + } + + /** + * Deserializer for ImmutableMap. + */ + protected static class ImmutableMapDeserializer implements JsonDeserializer<ImmutableMap<?, ?>> { + + public static final Type MAP_TYPE = new TypeToken<Map<?, ?>>() { + }.getType(); + + @Override + public ImmutableMap<?, ?> deserialize(JsonElement json, Type type, + JsonDeserializationContext context) { + final Map<?, ?> map = context.deserialize(json, MAP_TYPE); + return ImmutableMap.copyOf(map); + } + } + + /** + * Deserializer for JSONArray. + */ + protected static final class JSONArrayDeserializer implements JsonDeserializer<JSONArray> { + @Override + public JSONArray deserialize(final JsonElement json, final Type type, + final JsonDeserializationContext context) { + try { + return new JSONArray(json.toString()); + } catch (JSONException e) { + throw new JsonParseException(e.getMessage(), e); + } + } + } + + /** + * Deserializer for Struct. + */ + protected static final class StructDeserializer implements JsonDeserializer<IStruct> { + @Override + public IStruct deserialize(final JsonElement json, final Type type, + final JsonDeserializationContext context) { + return context.deserialize(json, Struct.class); + } + } + + /** + * Deserializer for Referenceable. + */ + protected static final class ReferenceableDeserializer implements JsonDeserializer<IReferenceableInstance> { + @Override + public IReferenceableInstance deserialize(final JsonElement json, final Type type, + final JsonDeserializationContext context) { + + return InstanceSerialization.fromJsonReferenceable(json.toString(), true); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b2ae1371/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java ---------------------------------------------------------------------- diff --git a/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java b/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java index 596f988..7d22126 100644 --- a/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java +++ b/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java @@ -17,10 +17,22 @@ */ package org.apache.atlas.notification; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.google.gson.JsonElement; +import com.google.gson.JsonParser; +import com.google.gson.JsonSerializationContext; +import com.google.gson.JsonSerializer; import org.apache.atlas.AtlasException; import org.apache.atlas.ha.HAConfiguration; +import org.apache.atlas.typesystem.IReferenceableInstance; +import org.apache.atlas.typesystem.Referenceable; +import org.apache.atlas.typesystem.json.InstanceSerialization; import org.apache.commons.configuration.Configuration; +import org.codehaus.jettison.json.JSONArray; + +import java.lang.reflect.Type; import java.util.Arrays; import java.util.List; @@ -29,12 +41,26 @@ import java.util.List; */ public abstract class AbstractNotification implements NotificationInterface { + /** + * The current expected version for notification messages. + */ + public static final MessageVersion CURRENT_MESSAGE_VERSION = new MessageVersion("1.0.0"); + private static final String PROPERTY_EMBEDDED = PROPERTY_PREFIX + ".embedded"; private final boolean embedded; private final boolean isHAEnabled; + /** + * Used for message serialization. + */ + public static final Gson GSON = new GsonBuilder(). + registerTypeAdapter(IReferenceableInstance.class, new ReferenceableSerializer()). + registerTypeAdapter(Referenceable.class, new ReferenceableSerializer()). + registerTypeAdapter(JSONArray.class, new JSONArraySerializer()). + create(); + - // ----- Constructors ------------------------------------------------------ + // ----- Constructors ---------------------------------------------------- public AbstractNotification(Configuration applicationProperties) throws AtlasException { this.embedded = applicationProperties.getBoolean(PROPERTY_EMBEDDED, false); @@ -42,7 +68,23 @@ public abstract class AbstractNotification implements NotificationInterface { } - // ----- AbstractNotificationInterface ------------------------------------- + // ----- NotificationInterface ------------------------------------------- + + @Override + public <T> void send(NotificationType type, List<T> messages) throws NotificationException { + String[] strMessages = new String[messages.size()]; + for (int index = 0; index < messages.size(); index++) { + strMessages[index] = getMessageJson(messages.get(index)); + } + sendInternal(type, strMessages); + } + + @Override + public <T> void send(NotificationType type, T... messages) throws NotificationException { + send(type, Arrays.asList(messages)); + } + + // ----- AbstractNotification -------------------------------------------- /** * Determine whether or not the notification service embedded in Atlas server. @@ -53,23 +95,62 @@ public abstract class AbstractNotification implements NotificationInterface { return embedded; } + /** + * Determine whether or not the high availability feature is enabled. + * + * @return true if the high availability feature is enabled. + */ protected final boolean isHAEnabled() { return isHAEnabled; } - @Override - public <T> void send(NotificationType type, List<T> messages) throws NotificationException { - String[] strMessages = new String[messages.size()]; - for (int index = 0; index < messages.size(); index++) { - strMessages[index] = AbstractNotificationConsumer.GSON.toJson(messages.get(index)); - } - sendInternal(type, strMessages); + /** + * Send the given messages. + * + * @param type the message type + * @param messages the array of messages to send + * + * @throws NotificationException if an error occurs while sending + */ + protected abstract void sendInternal(NotificationType type, String[] messages) throws NotificationException; + + + // ----- utility methods ------------------------------------------------- + + /** + * Get the notification message JSON from the given object. + * + * @param message the message in object form + * + * @return the message as a JSON string + */ + public static String getMessageJson(Object message) { + VersionedMessage<?> versionedMessage = new VersionedMessage<>(CURRENT_MESSAGE_VERSION, message); + + return GSON.toJson(versionedMessage); } - @Override - public <T> void send(NotificationType type, T... messages) throws NotificationException { - send(type, Arrays.asList(messages)); + + // ----- serializers ----------------------------------------------------- + + /** + * Serializer for Referenceable. + */ + public static final class ReferenceableSerializer implements JsonSerializer<IReferenceableInstance> { + @Override + public JsonElement serialize(IReferenceableInstance src, Type typeOfSrc, JsonSerializationContext context) { + String instanceJson = InstanceSerialization.toJson(src, true); + return new JsonParser().parse(instanceJson).getAsJsonObject(); + } } - protected abstract void sendInternal(NotificationType type, String[] messages) throws NotificationException; + /** + * Serializer for JSONArray. + */ + public static final class JSONArraySerializer implements JsonSerializer<JSONArray> { + @Override + public JsonElement serialize(JSONArray src, Type typeOfSrc, JsonSerializationContext context) { + return new JsonParser().parse(src.toString()).getAsJsonArray(); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b2ae1371/notification/src/main/java/org/apache/atlas/notification/AbstractNotificationConsumer.java ---------------------------------------------------------------------- diff --git a/notification/src/main/java/org/apache/atlas/notification/AbstractNotificationConsumer.java b/notification/src/main/java/org/apache/atlas/notification/AbstractNotificationConsumer.java index 1cadb99..f00bbca 100644 --- a/notification/src/main/java/org/apache/atlas/notification/AbstractNotificationConsumer.java +++ b/notification/src/main/java/org/apache/atlas/notification/AbstractNotificationConsumer.java @@ -17,50 +17,15 @@ */ package org.apache.atlas.notification; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; -import com.google.gson.Gson; -import com.google.gson.GsonBuilder; -import com.google.gson.JsonDeserializationContext; -import com.google.gson.JsonDeserializer; -import com.google.gson.JsonElement; -import com.google.gson.JsonParseException; -import com.google.gson.JsonParser; -import com.google.gson.JsonSerializationContext; -import com.google.gson.JsonSerializer; -import com.google.gson.reflect.TypeToken; -import org.apache.atlas.notification.entity.EntityNotification; -import org.apache.atlas.notification.entity.EntityNotificationImpl; -import org.apache.atlas.notification.hook.HookNotification; -import org.apache.atlas.typesystem.IReferenceableInstance; -import org.apache.atlas.typesystem.IStruct; -import org.apache.atlas.typesystem.Referenceable; -import org.apache.atlas.typesystem.Struct; -import org.apache.atlas.typesystem.json.InstanceSerialization; -import org.codehaus.jettison.json.JSONArray; -import org.codehaus.jettison.json.JSONException; - -import java.lang.reflect.Type; -import java.util.List; -import java.util.Map; - /** * Abstract notification consumer. */ public abstract class AbstractNotificationConsumer<T> implements NotificationConsumer<T> { - public static final Gson GSON = new GsonBuilder(). - registerTypeAdapter(ImmutableList.class, new ImmutableListDeserializer()). - registerTypeAdapter(ImmutableMap.class, new ImmutableMapDeserializer()). - registerTypeAdapter(EntityNotification.class, new EntityNotificationDeserializer()). - registerTypeAdapter(IStruct.class, new StructDeserializer()). - registerTypeAdapter(IReferenceableInstance.class, new ReferenceableSerializerDeserializer()). - registerTypeAdapter(Referenceable.class, new ReferenceableSerializerDeserializer()). - registerTypeAdapter(JSONArray.class, new JSONArraySerializerDeserializer()). - registerTypeAdapter(HookNotification.HookNotificationMessage.class, new HookNotification()). - create(); - - private final Class<T> type; + /** + * Deserializer used to deserialize notification messages for this consumer. + */ + private final MessageDeserializer<T> deserializer; // ----- Constructors ---------------------------------------------------- @@ -68,10 +33,10 @@ public abstract class AbstractNotificationConsumer<T> implements NotificationCon /** * Construct an AbstractNotificationConsumer. * - * @param type the notification type + * @param deserializer the message deserializer used by this consumer */ - public AbstractNotificationConsumer(Class<T> type) { - this.type = type; + public AbstractNotificationConsumer(MessageDeserializer<T> deserializer) { + this.deserializer = deserializer; } @@ -96,112 +61,11 @@ public abstract class AbstractNotificationConsumer<T> implements NotificationCon @Override public T next() { - return GSON.fromJson(getNext(), type); + return deserializer.deserialize(getNext()); } @Override public T peek() { - return GSON.fromJson(peekMessage(), type); - } - - - /** - * Deserializer for ImmutableList used by AbstractNotificationConsumer.GSON. - */ - public static class ImmutableListDeserializer implements JsonDeserializer<ImmutableList<?>> { - public static final Type LIST_TYPE = new TypeToken<List<?>>() { - }.getType(); - - @Override - public ImmutableList<?> deserialize(JsonElement json, Type type, - JsonDeserializationContext context) { - final List<?> list = context.deserialize(json, LIST_TYPE); - return ImmutableList.copyOf(list); - } - } - - /** - * Deserializer for ImmutableMap used by AbstractNotificationConsumer.GSON. - */ - public static class ImmutableMapDeserializer implements JsonDeserializer<ImmutableMap<?, ?>> { - - public static final Type MAP_TYPE = new TypeToken<Map<?, ?>>() { - }.getType(); - - @Override - public ImmutableMap<?, ?> deserialize(JsonElement json, Type type, - JsonDeserializationContext context) { - final Map<?, ?> map = context.deserialize(json, MAP_TYPE); - return ImmutableMap.copyOf(map); - } - } - - - /** - * Deserializer for EntityNotification used by AbstractNotificationConsumer.GSON. - */ - public static final class EntityNotificationDeserializer implements JsonDeserializer<EntityNotification> { - @Override - public EntityNotification deserialize(final JsonElement json, final Type type, - final JsonDeserializationContext context) { - return context.deserialize(json, EntityNotificationImpl.class); - } - } - - /** - * Serde for Struct used by AbstractNotificationConsumer.GSON. - */ - public static final class StructDeserializer implements JsonDeserializer<IStruct>, JsonSerializer<IStruct> { - @Override - public IStruct deserialize(final JsonElement json, final Type type, - final JsonDeserializationContext context) { - return context.deserialize(json, Struct.class); - } - - @Override - public JsonElement serialize(IStruct src, Type typeOfSrc, JsonSerializationContext context) { - String instanceJson = InstanceSerialization.toJson(src, true); - return new JsonParser().parse(instanceJson).getAsJsonObject(); - } - } - - /** - * Serde for Referenceable used by AbstractNotificationConsumer.GSON. - */ - public static final class ReferenceableSerializerDeserializer implements JsonDeserializer<IStruct>, - JsonSerializer<IReferenceableInstance> { - @Override - public IReferenceableInstance deserialize(final JsonElement json, final Type type, - final JsonDeserializationContext context) { - - return InstanceSerialization.fromJsonReferenceable(json.toString(), true); - } - - @Override - public JsonElement serialize(IReferenceableInstance src, Type typeOfSrc, JsonSerializationContext context) { - String instanceJson = InstanceSerialization.toJson(src, true); - return new JsonParser().parse(instanceJson).getAsJsonObject(); - } - } - - /** - * Serde for JSONArray used by AbstractNotificationConsumer.GSON. - */ - public static final class JSONArraySerializerDeserializer implements JsonDeserializer<JSONArray>, - JsonSerializer<JSONArray> { - @Override - public JSONArray deserialize(final JsonElement json, final Type type, - final JsonDeserializationContext context) { - try { - return new JSONArray(json.toString()); - } catch (JSONException e) { - throw new JsonParseException(e.getMessage(), e); - } - } - - @Override - public JsonElement serialize(JSONArray src, Type typeOfSrc, JsonSerializationContext context) { - return new JsonParser().parse(src.toString()).getAsJsonArray(); - } + return deserializer.deserialize(peekMessage()); } } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b2ae1371/notification/src/main/java/org/apache/atlas/notification/IncompatibleVersionException.java ---------------------------------------------------------------------- diff --git a/notification/src/main/java/org/apache/atlas/notification/IncompatibleVersionException.java b/notification/src/main/java/org/apache/atlas/notification/IncompatibleVersionException.java new file mode 100644 index 0000000..6a59014 --- /dev/null +++ b/notification/src/main/java/org/apache/atlas/notification/IncompatibleVersionException.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.notification; + +/** + * Exception thrown when notification message is consumed that has a version that is incompatable with + * the expected version. + */ +public class IncompatibleVersionException extends RuntimeException { + + // ----- Constructors ---------------------------------------------------- + + public IncompatibleVersionException(String message) { + super(message); + } +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b2ae1371/notification/src/main/java/org/apache/atlas/notification/MessageDeserializer.java ---------------------------------------------------------------------- diff --git a/notification/src/main/java/org/apache/atlas/notification/MessageDeserializer.java b/notification/src/main/java/org/apache/atlas/notification/MessageDeserializer.java new file mode 100644 index 0000000..7778908 --- /dev/null +++ b/notification/src/main/java/org/apache/atlas/notification/MessageDeserializer.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.notification; + +/** + * Deserializer for JSON messages. + */ +public interface MessageDeserializer<T> { + /** + * Get a message of type T from the given JSON message string. + * + * @param json the JSON message + * + * @return the message deserialized from the given JSON + */ + T deserialize(String json); +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b2ae1371/notification/src/main/java/org/apache/atlas/notification/MessageVersion.java ---------------------------------------------------------------------- diff --git a/notification/src/main/java/org/apache/atlas/notification/MessageVersion.java b/notification/src/main/java/org/apache/atlas/notification/MessageVersion.java new file mode 100644 index 0000000..3f16a9a --- /dev/null +++ b/notification/src/main/java/org/apache/atlas/notification/MessageVersion.java @@ -0,0 +1,133 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.notification; + +import java.util.ArrayList; +import java.util.Arrays; + +/** + * Represents the version of a notification message. + */ +public class MessageVersion implements Comparable<MessageVersion> { + /** + * Used for message with no version (old format). + */ + public static final MessageVersion NO_VERSION = new MessageVersion("0"); + + private final String version; + + + // ----- Constructors ---------------------------------------------------- + + /** + * Create a message version. + * + * @param version the version string + */ + public MessageVersion(String version) { + this.version = version; + + try { + getVersionParts(); + } catch (NumberFormatException e) { + throw new IllegalArgumentException(String.format("Invalid version string : %s.", version), e); + } + } + + + // ----- Comparable ------------------------------------------------------ + + @Override + public int compareTo(MessageVersion that) { + if (that == null) { + return 1; + } + + Integer[] thisParts = getVersionParts(); + Integer[] thatParts = that.getVersionParts(); + + int length = Math.max(thisParts.length, thatParts.length); + + for (int i = 0; i < length; i++) { + + int comp = getVersionPart(thisParts, i) - getVersionPart(thatParts, i); + + if (comp != 0) { + return comp; + } + } + return 0; + } + + + // ----- Object overrides ------------------------------------------------ + + @Override + public boolean equals(Object that) { + if (this == that){ + return true; + } + + if (that == null || getClass() != that.getClass()) { + return false; + } + + return compareTo((MessageVersion) that) == 0; + } + + @Override + public int hashCode() { + return Arrays.hashCode(getVersionParts()); + } + + + // ----- helper methods -------------------------------------------------- + + /** + * Get the version parts array by splitting the version string. + * Strip the trailing zeros (i.e. '1.0.0' equals '1'). + * + * @return the version parts array + */ + protected Integer[] getVersionParts() { + + String[] sParts = version.split("\\."); + ArrayList<Integer> iParts = new ArrayList<>(); + int trailingZeros = 0; + + for (String sPart : sParts) { + Integer iPart = new Integer(sPart); + + if (iPart == 0) { + ++trailingZeros; + } else { + for (int i = 0; i < trailingZeros; ++i) { + iParts.add(0); + } + trailingZeros = 0; + iParts.add(iPart); + } + } + return iParts.toArray(new Integer[iParts.size()]); + } + + private Integer getVersionPart(Integer[] versionParts, int i) { + return i < versionParts.length ? versionParts[i] : 0; + } +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b2ae1371/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java ---------------------------------------------------------------------- diff --git a/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java b/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java index ac285aa..384f383 100644 --- a/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java +++ b/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java @@ -17,9 +17,13 @@ */ package org.apache.atlas.notification; +import org.apache.atlas.notification.entity.EntityMessageDeserializer; import org.apache.atlas.notification.entity.EntityNotification; +import org.apache.atlas.notification.hook.HookMessageDeserializer; import org.apache.atlas.notification.hook.HookNotification; +import com.google.gson.reflect.TypeToken; +import java.lang.reflect.Type; import java.util.List; /** @@ -37,25 +41,59 @@ public interface NotificationInterface { String PROPERTY_PREFIX = "atlas.notification"; /** + * Notification message class types. + */ + Class<HookNotification.HookNotificationMessage> HOOK_NOTIFICATION_CLASS = + HookNotification.HookNotificationMessage.class; + + Class<EntityNotification> ENTITY_NOTIFICATION_CLASS = EntityNotification.class; + + /** + * Versioned notification message class types. + */ + Type HOOK_VERSIONED_MESSAGE_TYPE = + new TypeToken<VersionedMessage<HookNotification.HookNotificationMessage>>(){}.getType(); + + Type ENTITY_VERSIONED_MESSAGE_TYPE = new TypeToken<VersionedMessage<EntityNotification>>(){}.getType(); + + /** * Atlas notification types. */ enum NotificationType { - HOOK(HookNotification.HookNotificationMessage.class), // notifications from the Atlas integration hook producers - ENTITIES(EntityNotification.class); // notifications to entity change consumers + // Notifications from the Atlas integration hooks. + HOOK(HOOK_NOTIFICATION_CLASS, new HookMessageDeserializer()), + + // Notifications to entity change consumers. + ENTITIES(ENTITY_NOTIFICATION_CLASS, new EntityMessageDeserializer()); + /** * The notification class associated with this type. */ private final Class classType; - NotificationType(Class classType) { + /** + * The message deserializer for this type. + */ + private final MessageDeserializer deserializer; + + + NotificationType(Class classType, MessageDeserializer<?> deserializer) { this.classType = classType; + this.deserializer = deserializer; } + + // ----- accessors --------------------------------------------------- + public Class getClassType() { return classType; } + + public MessageDeserializer getDeserializer() { + return deserializer; + } } /** http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b2ae1371/notification/src/main/java/org/apache/atlas/notification/VersionedMessage.java ---------------------------------------------------------------------- diff --git a/notification/src/main/java/org/apache/atlas/notification/VersionedMessage.java b/notification/src/main/java/org/apache/atlas/notification/VersionedMessage.java new file mode 100644 index 0000000..1929eb4 --- /dev/null +++ b/notification/src/main/java/org/apache/atlas/notification/VersionedMessage.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.notification; + +/** + * Represents a notification message that is associated with a version. + */ +public class VersionedMessage<T> { + + /** + * The version of the message. + */ + private final MessageVersion version; + + /** + * The actual message. + */ + private final T message; + + + // ----- Constructors ---------------------------------------------------- + + /** + * Create a versioned message. + * + * @param version the message version + * @param message the actual message + */ + public VersionedMessage(MessageVersion version, T message) { + this.version = version; + this.message = message; + } + + + // ----- VersionedMessage ------------------------------------------------ + + /** + * Compare the version of this message with the given version. + * + * @param compareToVersion the version to compare to + * + * @return a negative integer, zero, or a positive integer as this message's version is less than, equal to, + * or greater than the given version. + */ + public int compareVersion(MessageVersion compareToVersion) { + return version.compareTo(compareToVersion); + } + + + // ----- accessors ------------------------------------------------------- + + public MessageVersion getVersion() { + return version; + } + + public T getMessage() { + return message; + } +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b2ae1371/notification/src/main/java/org/apache/atlas/notification/VersionedMessageDeserializer.java ---------------------------------------------------------------------- diff --git a/notification/src/main/java/org/apache/atlas/notification/VersionedMessageDeserializer.java b/notification/src/main/java/org/apache/atlas/notification/VersionedMessageDeserializer.java new file mode 100644 index 0000000..290be59 --- /dev/null +++ b/notification/src/main/java/org/apache/atlas/notification/VersionedMessageDeserializer.java @@ -0,0 +1,107 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.notification; + +import com.google.gson.Gson; +import org.slf4j.Logger; + +import java.lang.reflect.ParameterizedType; +import java.lang.reflect.Type; + +/** + * Deserializer that works with versioned messages. The version of each deserialized message is checked against an + * expected version. + */ +public abstract class VersionedMessageDeserializer<T> implements MessageDeserializer<T> { + + public static final String VERSION_MISMATCH_MSG = + "Notification message version mismatch. Expected %s but recieved %s"; + + private final Type versionedMessageType; + private final MessageVersion expectedVersion; + private final Logger notificationLogger; + private final Gson gson; + + + // ----- Constructors ---------------------------------------------------- + + /** + * Create a versioned message deserializer. + * + * @param versionedMessageType the type of the versioned message + * @param expectedVersion the expected message version + * @param gson JSON serialization/deserialization + * @param notificationLogger logger for message version mismatch + */ + public VersionedMessageDeserializer(Type versionedMessageType, MessageVersion expectedVersion, + Gson gson, Logger notificationLogger) { + this.versionedMessageType = versionedMessageType; + this.expectedVersion = expectedVersion; + this.gson = gson; + this.notificationLogger = notificationLogger; + } + + + // ----- MessageDeserializer --------------------------------------------- + + @Override + public T deserialize(String messageJson) { + VersionedMessage<T> versionedMessage = gson.fromJson(messageJson, versionedMessageType); + + // older style messages not wrapped with VersionedMessage + if (versionedMessage.getVersion() == null) { + Type t = ((ParameterizedType) versionedMessageType).getActualTypeArguments()[0]; + versionedMessage = new VersionedMessage<>(MessageVersion.NO_VERSION, gson.<T>fromJson(messageJson, t)); + } + checkVersion(versionedMessage, messageJson); + + return versionedMessage.getMessage(); + } + + + // ----- helper methods -------------------------------------------------- + + /** + * Check the message version against the expected version. + * + * @param versionedMessage the versioned message + * @param messageJson the notification message json + * + * @throws IncompatibleVersionException if the message version is incompatable with the expected version + */ + protected void checkVersion(VersionedMessage<T> versionedMessage, String messageJson) { + int comp = versionedMessage.compareVersion(expectedVersion); + + // message has newer version + if (comp > 0) { + String msg = String.format(VERSION_MISMATCH_MSG, expectedVersion, versionedMessage.getVersion()); + notificationLogger.error(msg); + notificationLogger.info(messageJson); + throw new IncompatibleVersionException(msg); + } + + // message has older version + if (comp < 0) { + notificationLogger.info( + String.format(VERSION_MISMATCH_MSG, expectedVersion, versionedMessage.getVersion())); + + notificationLogger.info(messageJson); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b2ae1371/notification/src/main/java/org/apache/atlas/notification/entity/EntityMessageDeserializer.java ---------------------------------------------------------------------- diff --git a/notification/src/main/java/org/apache/atlas/notification/entity/EntityMessageDeserializer.java b/notification/src/main/java/org/apache/atlas/notification/entity/EntityMessageDeserializer.java new file mode 100644 index 0000000..a6f7e64 --- /dev/null +++ b/notification/src/main/java/org/apache/atlas/notification/entity/EntityMessageDeserializer.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.notification.entity; + +import com.google.gson.JsonDeserializationContext; +import com.google.gson.JsonDeserializer; +import com.google.gson.JsonElement; +import org.apache.atlas.notification.AbstractMessageDeserializer; +import org.apache.atlas.notification.AbstractNotification; +import org.apache.atlas.notification.NotificationInterface; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.lang.reflect.Type; +import java.util.Collections; +import java.util.Map; + +/** + * Entity notification message deserializer. + */ +public class EntityMessageDeserializer extends AbstractMessageDeserializer<EntityNotification> { + + /** + * Logger for entity notification messages. + */ + private static final Logger NOTIFICATION_LOGGER = LoggerFactory.getLogger(EntityMessageDeserializer.class); + + + // ----- Constructors ---------------------------------------------------- + + /** + * Create an entity notification message deserializer. + */ + public EntityMessageDeserializer() { + super(NotificationInterface.ENTITY_VERSIONED_MESSAGE_TYPE, + AbstractNotification.CURRENT_MESSAGE_VERSION, getDeserializerMap(), NOTIFICATION_LOGGER); + } + + + // ----- helper methods -------------------------------------------------- + + private static Map<Type, JsonDeserializer> getDeserializerMap() { + return Collections.<Type, JsonDeserializer>singletonMap( + NotificationInterface.ENTITY_NOTIFICATION_CLASS, new EntityNotificationDeserializer()); + } + + + // ----- deserializer classes -------------------------------------------- + + /** + * Deserializer for EntityNotification. + */ + protected static final class EntityNotificationDeserializer implements JsonDeserializer<EntityNotification> { + @Override + public EntityNotification deserialize(final JsonElement json, final Type type, + final JsonDeserializationContext context) { + return context.deserialize(json, EntityNotificationImpl.class); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b2ae1371/notification/src/main/java/org/apache/atlas/notification/hook/HookMessageDeserializer.java ---------------------------------------------------------------------- diff --git a/notification/src/main/java/org/apache/atlas/notification/hook/HookMessageDeserializer.java b/notification/src/main/java/org/apache/atlas/notification/hook/HookMessageDeserializer.java new file mode 100644 index 0000000..8337de0 --- /dev/null +++ b/notification/src/main/java/org/apache/atlas/notification/hook/HookMessageDeserializer.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.notification.hook; + +import com.google.gson.JsonDeserializer; +import org.apache.atlas.notification.AbstractMessageDeserializer; +import org.apache.atlas.notification.AbstractNotification; +import org.apache.atlas.notification.NotificationInterface; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.lang.reflect.Type; +import java.util.Collections; +import java.util.Map; + +/** + * Hook notification message deserializer. + */ +public class HookMessageDeserializer extends AbstractMessageDeserializer<HookNotification.HookNotificationMessage> { + + /** + * Logger for hook notification messages. + */ + private static final Logger NOTIFICATION_LOGGER = LoggerFactory.getLogger(HookMessageDeserializer.class); + + + // ----- Constructors ---------------------------------------------------- + + /** + * Create a hook notification message deserializer. + */ + public HookMessageDeserializer() { + super(NotificationInterface.HOOK_VERSIONED_MESSAGE_TYPE, + AbstractNotification.CURRENT_MESSAGE_VERSION, getDeserializerMap(), NOTIFICATION_LOGGER); + } + + + // ----- helper methods -------------------------------------------------- + + private static Map<Type, JsonDeserializer> getDeserializerMap() { + return Collections.<Type, JsonDeserializer>singletonMap( + NotificationInterface.HOOK_NOTIFICATION_CLASS, new HookNotification()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b2ae1371/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java ---------------------------------------------------------------------- diff --git a/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java b/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java new file mode 100644 index 0000000..7f607c6 --- /dev/null +++ b/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java @@ -0,0 +1,176 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.kafka; + +import kafka.consumer.ConsumerIterator; +import kafka.consumer.KafkaStream; +import kafka.message.MessageAndMetadata; +import org.apache.atlas.notification.AbstractNotification; +import org.apache.atlas.notification.MessageVersion; +import org.apache.atlas.notification.NotificationConsumer; +import org.apache.atlas.notification.NotificationInterface; +import org.apache.atlas.notification.IncompatibleVersionException; +import org.apache.atlas.notification.VersionedMessage; +import org.apache.atlas.notification.entity.EntityNotificationImplTest; +import org.apache.atlas.notification.hook.HookNotification; +import org.apache.atlas.typesystem.IStruct; +import org.apache.atlas.typesystem.Referenceable; +import org.apache.atlas.typesystem.Struct; +import org.codehaus.jettison.json.JSONException; +import org.testng.annotations.Test; + +import java.util.Collections; +import java.util.LinkedList; +import java.util.List; +import java.util.NoSuchElementException; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.testng.Assert.*; + +/** + * KafkaConsumer tests. + */ +public class KafkaConsumerTest { + + private static final String TRAIT_NAME = "MyTrait"; + + @Test + public void testNext() throws Exception { + KafkaStream<String, String> stream = mock(KafkaStream.class); + ConsumerIterator<String, String> iterator = mock(ConsumerIterator.class); + MessageAndMetadata<String, String> messageAndMetadata = mock(MessageAndMetadata.class); + + Referenceable entity = getEntity(TRAIT_NAME); + + HookNotification.EntityUpdateRequest message = + new HookNotification.EntityUpdateRequest("user1", entity); + + String json = AbstractNotification.GSON.toJson(new VersionedMessage<>(new MessageVersion("1.0.0"), message)); + + when(stream.iterator()).thenReturn(iterator); + when(iterator.hasNext()).thenReturn(true).thenReturn(false); + when(iterator.next()).thenReturn(messageAndMetadata).thenThrow(new NoSuchElementException()); + when(messageAndMetadata.message()).thenReturn(json); + + NotificationConsumer<HookNotification.HookNotificationMessage> consumer = + new KafkaConsumer<>(NotificationInterface.NotificationType.HOOK.getClassType(), + NotificationInterface.NotificationType.HOOK.getDeserializer(), stream, 99); + + assertTrue(consumer.hasNext()); + + HookNotification.HookNotificationMessage consumedMessage = consumer.next(); + + assertMessagesEqual(message, consumedMessage, entity); + + assertFalse(consumer.hasNext()); + } + + @Test + public void testNextVersionMismatch() throws Exception { + KafkaStream<String, String> stream = mock(KafkaStream.class); + ConsumerIterator<String, String> iterator = mock(ConsumerIterator.class); + MessageAndMetadata<String, String> messageAndMetadata = mock(MessageAndMetadata.class); + + Referenceable entity = getEntity(TRAIT_NAME); + + HookNotification.EntityUpdateRequest message = + new HookNotification.EntityUpdateRequest("user1", entity); + + String json = AbstractNotification.GSON.toJson(new VersionedMessage<>(new MessageVersion("2.0.0"), message)); + + when(stream.iterator()).thenReturn(iterator); + when(iterator.hasNext()).thenReturn(true).thenReturn(false); + when(iterator.next()).thenReturn(messageAndMetadata).thenThrow(new NoSuchElementException()); + when(messageAndMetadata.message()).thenReturn(json); + + NotificationConsumer<HookNotification.HookNotificationMessage> consumer = + new KafkaConsumer<>(NotificationInterface.NotificationType.HOOK.getClassType(), + NotificationInterface.NotificationType.HOOK.getDeserializer(), stream, 99); + + assertTrue(consumer.hasNext()); + + try { + consumer.next(); + fail("Expected VersionMismatchException!"); + } catch (IncompatibleVersionException e) { + e.printStackTrace(); + } + + assertFalse(consumer.hasNext()); + } + + @Test + public void testPeekMessage() throws Exception { + KafkaStream<String, String> stream = mock(KafkaStream.class); + ConsumerIterator<String, String> iterator = mock(ConsumerIterator.class); + MessageAndMetadata<String, String> messageAndMetadata = mock(MessageAndMetadata.class); + + Referenceable entity = getEntity(TRAIT_NAME); + + HookNotification.EntityUpdateRequest message = + new HookNotification.EntityUpdateRequest("user1", entity); + + String json = AbstractNotification.GSON.toJson(new VersionedMessage<>(new MessageVersion("1.0.0"), message)); + + when(stream.iterator()).thenReturn(iterator); + when(iterator.hasNext()).thenReturn(true); + when(iterator.peek()).thenReturn(messageAndMetadata); + when(messageAndMetadata.message()).thenReturn(json); + + NotificationConsumer<HookNotification.HookNotificationMessage> consumer = + new KafkaConsumer<>(NotificationInterface.NotificationType.HOOK.getClassType(), + NotificationInterface.NotificationType.HOOK.getDeserializer(), stream, 99); + + assertTrue(consumer.hasNext()); + + HookNotification.HookNotificationMessage consumedMessage = consumer.peek(); + + assertMessagesEqual(message, consumedMessage, entity); + + assertTrue(consumer.hasNext()); + } + + private Referenceable getEntity(String traitName) { + Referenceable entity = EntityNotificationImplTest.getEntity("id"); + List<IStruct> traitInfo = new LinkedList<>(); + IStruct trait = new Struct(traitName, Collections.<String, Object>emptyMap()); + traitInfo.add(trait); + return entity; + } + + private void assertMessagesEqual(HookNotification.EntityUpdateRequest message, + HookNotification.HookNotificationMessage consumedMessage, + Referenceable entity) throws JSONException { + + assertEquals(consumedMessage.getType(), message.getType()); + assertEquals(consumedMessage.getUser(), message.getUser()); + + assertTrue(consumedMessage instanceof HookNotification.EntityUpdateRequest); + + HookNotification.EntityUpdateRequest deserializedEntityUpdateRequest = + (HookNotification.EntityUpdateRequest) consumedMessage; + + Referenceable deserializedEntity = deserializedEntityUpdateRequest.getEntities().get(0); + assertEquals(deserializedEntity.getId(), entity.getId()); + assertEquals(deserializedEntity.getTypeName(), entity.getTypeName()); + assertEquals(deserializedEntity.getTraits(), entity.getTraits()); + assertEquals(deserializedEntity.getTrait(TRAIT_NAME), entity.getTrait(TRAIT_NAME)); + } +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b2ae1371/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java ---------------------------------------------------------------------- diff --git a/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java b/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java index db34815..17fda25 100644 --- a/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java +++ b/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java @@ -22,6 +22,7 @@ import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector; import kafka.serializer.StringDecoder; import org.apache.atlas.AtlasException; +import org.apache.atlas.notification.MessageDeserializer; import org.apache.atlas.notification.NotificationConsumer; import org.apache.atlas.notification.NotificationInterface; import org.apache.atlas.notification.NotificationModule; @@ -130,10 +131,12 @@ public class KafkaNotificationTest { } @Override - protected <T> org.apache.atlas.kafka.KafkaConsumer<T> createKafkaConsumer(Class<T> type, KafkaStream stream, + protected <T> org.apache.atlas.kafka.KafkaConsumer<T> createKafkaConsumer(Class<T> type, + MessageDeserializer<T> deserializer, + KafkaStream stream, int consumerId) { kafkaStreams.add(stream); - return super.createKafkaConsumer(type, stream, consumerId); + return super.createKafkaConsumer(type, deserializer, stream, consumerId); } } } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b2ae1371/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java ---------------------------------------------------------------------- diff --git a/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java b/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java new file mode 100644 index 0000000..e63175d --- /dev/null +++ b/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java @@ -0,0 +1,264 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.notification; + +import com.google.gson.Gson; +import com.google.gson.reflect.TypeToken; +import org.slf4j.Logger; +import org.testng.annotations.Test; + +import java.lang.reflect.Type; +import java.util.LinkedList; +import java.util.List; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.testng.Assert.*; + +/** + * AbstractNotificationConsumer tests. + */ +public class AbstractNotificationConsumerTest { + + private static final Gson GSON = new Gson(); + + @Test + public void testNext() throws Exception { + Logger logger = mock(Logger.class); + + TestMessage testMessage1 = new TestMessage("sValue1", 99); + TestMessage testMessage2 = new TestMessage("sValue2", 98); + TestMessage testMessage3 = new TestMessage("sValue3", 97); + TestMessage testMessage4 = new TestMessage("sValue4", 96); + + List<String> jsonList = new LinkedList<>(); + + jsonList.add(GSON.toJson(new VersionedMessage<>(new MessageVersion("1.0.0"), testMessage1))); + jsonList.add(GSON.toJson(new VersionedMessage<>(new MessageVersion("1.0.0"), testMessage2))); + jsonList.add(GSON.toJson(new VersionedMessage<>(new MessageVersion("1.0.0"), testMessage3))); + jsonList.add(GSON.toJson(new VersionedMessage<>(new MessageVersion("1.0.0"), testMessage4))); + + Type versionedMessageType = new TypeToken<VersionedMessage<TestMessage>>(){}.getType(); + + NotificationConsumer<TestMessage> consumer = + new TestNotificationConsumer<>(versionedMessageType, jsonList, logger); + + assertTrue(consumer.hasNext()); + + assertEquals(testMessage1, consumer.next()); + + assertTrue(consumer.hasNext()); + + assertEquals(testMessage2, consumer.next()); + + assertTrue(consumer.hasNext()); + + assertEquals(testMessage3, consumer.next()); + + assertTrue(consumer.hasNext()); + + assertEquals(testMessage4, consumer.next()); + + assertFalse(consumer.hasNext()); + } + + @Test + public void testNextBackVersion() throws Exception { + Logger logger = mock(Logger.class); + + TestMessage testMessage1 = new TestMessage("sValue1", 99); + TestMessage testMessage2 = new TestMessage("sValue2", 98); + TestMessage testMessage3 = new TestMessage("sValue3", 97); + TestMessage testMessage4 = new TestMessage("sValue4", 96); + + List<String> jsonList = new LinkedList<>(); + + String json1 = GSON.toJson(new VersionedMessage<>(new MessageVersion("1.0.0"), testMessage1)); + String json2 = GSON.toJson(new VersionedMessage<>(new MessageVersion("0.0.5"), testMessage2)); + String json3 = GSON.toJson(new VersionedMessage<>(new MessageVersion("0.5.0"), testMessage3)); + String json4 = GSON.toJson(testMessage4); + + jsonList.add(json1); + jsonList.add(json2); + jsonList.add(json3); + jsonList.add(json4); + + Type versionedMessageType = new TypeToken<VersionedMessage<TestMessage>>(){}.getType(); + + NotificationConsumer<TestMessage> consumer = + new TestNotificationConsumer<>(versionedMessageType, jsonList, logger); + assertTrue(consumer.hasNext()); + + assertEquals(new TestMessage("sValue1", 99), consumer.next()); + + assertTrue(consumer.hasNext()); + + assertEquals(new TestMessage("sValue2", 98), consumer.next()); + verify(logger).info(json2); + + assertTrue(consumer.hasNext()); + + assertEquals(new TestMessage("sValue3", 97), consumer.next()); + verify(logger).info(json3); + + assertTrue(consumer.hasNext()); + + assertEquals(new TestMessage("sValue4", 96), consumer.next()); + verify(logger).info(json4); + + assertFalse(consumer.hasNext()); + } + + @Test + public void testNextForwardVersion() throws Exception { + Logger logger = mock(Logger.class); + + TestMessage testMessage1 = new TestMessage("sValue1", 99); + TestMessage testMessage2 = new TestMessage("sValue2", 98); + + List<String> jsonList = new LinkedList<>(); + + String json1 = GSON.toJson(new VersionedMessage<>(new MessageVersion("1.0.0"), testMessage1)); + String json2 = GSON.toJson(new VersionedMessage<>(new MessageVersion("2.0.0"), testMessage2)); + + jsonList.add(json1); + jsonList.add(json2); + + Type versionedMessageType = new TypeToken<VersionedMessage<TestMessage>>(){}.getType(); + + NotificationConsumer<TestMessage> consumer = + new TestNotificationConsumer<>(versionedMessageType, jsonList, logger); + assertTrue(consumer.hasNext()); + + assertEquals(testMessage1, consumer.next()); + + assertTrue(consumer.hasNext()); + + try { + consumer.next(); + fail("Expected VersionMismatchException!"); + } catch (IncompatibleVersionException e) { + verify(logger).info(json2); + } + + assertFalse(consumer.hasNext()); + } + + @Test + public void testPeek() throws Exception { + Logger logger = mock(Logger.class); + + TestMessage testMessage1 = new TestMessage("sValue1", 99); + TestMessage testMessage2 = new TestMessage("sValue2", 98); + TestMessage testMessage3 = new TestMessage("sValue3", 97); + TestMessage testMessage4 = new TestMessage("sValue4", 96); + + List<String> jsonList = new LinkedList<>(); + + jsonList.add(GSON.toJson(new VersionedMessage<>(new MessageVersion("1.0.0"), testMessage1))); + jsonList.add(GSON.toJson(new VersionedMessage<>(new MessageVersion("1.0.0"), testMessage2))); + jsonList.add(GSON.toJson(new VersionedMessage<>(new MessageVersion("1.0.0"), testMessage3))); + jsonList.add(GSON.toJson(new VersionedMessage<>(new MessageVersion("1.0.0"), testMessage4))); + + Type versionedMessageType = new TypeToken<VersionedMessage<TestMessage>>(){}.getType(); + + NotificationConsumer<TestMessage> consumer = + new TestNotificationConsumer<>(versionedMessageType, jsonList, logger); + assertTrue(consumer.hasNext()); + + assertEquals(testMessage1, consumer.peek()); + + assertTrue(consumer.hasNext()); + + assertEquals(testMessage1, consumer.peek()); + + assertTrue(consumer.hasNext()); + } + + private static class TestMessage { + private String s; + private int i; + + public TestMessage(String s, int i) { + this.s = s; + this.i = i; + } + + public String getS() { + return s; + } + + public void setS(String s) { + this.s = s; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + TestMessage that = (TestMessage) o; + + return i == that.i && (s != null ? s.equals(that.s) : that.s == null); + } + + @Override + public int hashCode() { + int result = s != null ? s.hashCode() : 0; + result = 31 * result + i; + return result; + } + } + + private static class TestNotificationConsumer<T> extends AbstractNotificationConsumer<T> { + private final List<String> messageList; + private int index = 0; + + public TestNotificationConsumer(Type versionedMessageType, List<String> messages, Logger logger) { + super(new TestDeserializer<T>(versionedMessageType, logger)); + this.messageList = messages; + } + + @Override + protected String getNext() { + return messageList.get(index++); + } + + @Override + protected String peekMessage() { + return messageList.get(index); + } + + @Override + public boolean hasNext() { + return index < messageList.size(); + } + } + + private static final class TestDeserializer<T> extends VersionedMessageDeserializer<T> { + + private TestDeserializer(Type versionedMessageType, Logger logger) { + super(versionedMessageType, AbstractNotification.CURRENT_MESSAGE_VERSION, GSON, logger); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b2ae1371/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationTest.java ---------------------------------------------------------------------- diff --git a/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationTest.java b/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationTest.java new file mode 100644 index 0000000..61107a9 --- /dev/null +++ b/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationTest.java @@ -0,0 +1,120 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.notification; + +import org.apache.atlas.AtlasException; +import org.apache.atlas.notification.hook.HookNotification; +import org.apache.commons.configuration.Configuration; +import org.testng.annotations.Test; + +import java.util.LinkedList; +import java.util.List; + +import static org.mockito.Mockito.mock; +import static org.testng.Assert.*; + +/** + * AbstractNotification tests. + */ +public class AbstractNotificationTest { + + @Test + public void testSend() throws Exception { + Configuration configuration = mock(Configuration.class); + + TestNotification notification = new TestNotification(configuration); + + TestMessage message1 = new TestMessage(HookNotification.HookNotificationType.ENTITY_CREATE, "user1"); + TestMessage message2 = new TestMessage(HookNotification.HookNotificationType.TYPE_CREATE, "user1"); + TestMessage message3 = new TestMessage(HookNotification.HookNotificationType.ENTITY_FULL_UPDATE, "user1"); + + String messageJson1 = AbstractNotification.getMessageJson(message1); + String messageJson2 = AbstractNotification.getMessageJson(message2); + String messageJson3 = AbstractNotification.getMessageJson(message3); + + notification.send(NotificationInterface.NotificationType.HOOK, message1, message2, message3); + + assertEquals(NotificationInterface.NotificationType.HOOK, notification.type); + assertEquals(3, notification.messages.length); + assertEquals(messageJson1, notification.messages[0]); + assertEquals(messageJson2, notification.messages[1]); + assertEquals(messageJson3, notification.messages[2]); + } + + @Test + public void testSend2() throws Exception { + Configuration configuration = mock(Configuration.class); + + TestNotification notification = new TestNotification(configuration); + + TestMessage message1 = new TestMessage(HookNotification.HookNotificationType.ENTITY_CREATE, "user1"); + TestMessage message2 = new TestMessage(HookNotification.HookNotificationType.TYPE_CREATE, "user1"); + TestMessage message3 = new TestMessage(HookNotification.HookNotificationType.ENTITY_FULL_UPDATE, "user1"); + + List<TestMessage> messages = new LinkedList<>(); + messages.add(message1); + messages.add(message2); + messages.add(message3); + + String messageJson1 = AbstractNotification.getMessageJson(message1); + String messageJson2 = AbstractNotification.getMessageJson(message2); + String messageJson3 = AbstractNotification.getMessageJson(message3); + + notification.send(NotificationInterface.NotificationType.HOOK, messages); + + assertEquals(NotificationInterface.NotificationType.HOOK, notification.type); + assertEquals(3, notification.messages.length); + assertEquals(messageJson1, notification.messages[0]); + assertEquals(messageJson2, notification.messages[1]); + assertEquals(messageJson3, notification.messages[2]); + } + + public static class TestMessage extends HookNotification.HookNotificationMessage { + + public TestMessage(HookNotification.HookNotificationType type, String user) { + super(type, user); + } + } + + public static class TestNotification extends AbstractNotification { + private NotificationType type; + private String[] messages; + + public TestNotification(Configuration applicationProperties) throws AtlasException { + super(applicationProperties); + } + + @Override + protected void sendInternal(NotificationType notificationType, String[] notificationMessages) + throws NotificationException { + + type = notificationType; + messages = notificationMessages; + } + + @Override + public <T> List<NotificationConsumer<T>> createConsumers(NotificationType notificationType, int numConsumers) { + return null; + } + + @Override + public void close() { + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b2ae1371/notification/src/test/java/org/apache/atlas/notification/MessageVersionTest.java ---------------------------------------------------------------------- diff --git a/notification/src/test/java/org/apache/atlas/notification/MessageVersionTest.java b/notification/src/test/java/org/apache/atlas/notification/MessageVersionTest.java new file mode 100644 index 0000000..d1af4b0 --- /dev/null +++ b/notification/src/test/java/org/apache/atlas/notification/MessageVersionTest.java @@ -0,0 +1,125 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.notification; + +import org.testng.annotations.Test; + +import java.util.Arrays; + +import static org.testng.Assert.*; + +/** + * MessageVersion tests. + */ +public class MessageVersionTest { + + @Test + public void testConstructor() throws Exception { + new MessageVersion("1.0.0"); + + try { + new MessageVersion("foo"); + fail("Expected IllegalArgumentException"); + } catch (IllegalArgumentException e) { + // expected + } + + try { + new MessageVersion("A.0.0"); + fail("Expected IllegalArgumentException"); + } catch (IllegalArgumentException e) { + // expected + } + + try { + new MessageVersion("1.0.0a"); + fail("Expected IllegalArgumentException"); + } catch (IllegalArgumentException e) { + // expected + } + } + + @Test + public void testCompareTo() throws Exception { + MessageVersion version1 = new MessageVersion("1.0.0"); + MessageVersion version2 = new MessageVersion("1.0.0"); + MessageVersion version3 = new MessageVersion("2.0.0"); + MessageVersion version4 = new MessageVersion("1"); + MessageVersion version5 = new MessageVersion("1.5"); + MessageVersion version6 = new MessageVersion("1.0.5"); + + assertTrue(version1.compareTo(version2) == 0); + assertTrue(version2.compareTo(version1) == 0); + assertTrue(version1.compareTo(version3) < 0); + assertTrue(version3.compareTo(version1) > 0); + assertTrue(version1.compareTo(version4) == 0); + assertTrue(version4.compareTo(version1) == 0); + assertTrue(version1.compareTo(version5) < 0); + assertTrue(version5.compareTo(version1) > 0); + assertTrue(version1.compareTo(version6) < 0); + assertTrue(version6.compareTo(version1) > 0); + } + + @Test + public void testEquals() throws Exception { + MessageVersion version1 = new MessageVersion("1.0.0"); + MessageVersion version2 = new MessageVersion("1.0.0"); + MessageVersion version3 = new MessageVersion("2.0.0"); + MessageVersion version4 = new MessageVersion("1"); + MessageVersion version5 = new MessageVersion("1.5"); + MessageVersion version6 = new MessageVersion("1.0.5"); + + assertTrue(version1.equals(version2)); + assertTrue(version2.equals(version1)); + assertFalse(version1.equals(version3)); + assertFalse(version3.equals(version1)); + assertTrue(version1.equals(version4)); + assertTrue(version4.equals(version1)); + assertFalse(version1.equals(version5)); + assertFalse(version5.equals(version1)); + assertFalse(version1.equals(version6)); + assertFalse(version6.equals(version1)); + } + + @Test + public void testHashCode() throws Exception { + MessageVersion version1 = new MessageVersion("1.0.0"); + MessageVersion version2 = new MessageVersion("1.0.0"); + MessageVersion version3 = new MessageVersion("1"); + + assertEquals(version1.hashCode(), version2.hashCode()); + assertEquals(version1.hashCode(), version3.hashCode()); + } + + @Test + public void testGetVersionParts() throws Exception { + + MessageVersion version = new MessageVersion("1.0.0"); + assertTrue(Arrays.equals(new Integer[]{1}, version.getVersionParts())); + + version = new MessageVersion("1.0"); + assertTrue(Arrays.equals(new Integer[]{1}, version.getVersionParts())); + + version = new MessageVersion("1"); + assertTrue(Arrays.equals(new Integer[]{1}, version.getVersionParts())); + + version = new MessageVersion("1.0.2"); + assertTrue(Arrays.equals(new Integer[]{1, 0, 2}, version.getVersionParts())); + } +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b2ae1371/notification/src/test/java/org/apache/atlas/notification/VersionedMessageTest.java ---------------------------------------------------------------------- diff --git a/notification/src/test/java/org/apache/atlas/notification/VersionedMessageTest.java b/notification/src/test/java/org/apache/atlas/notification/VersionedMessageTest.java new file mode 100644 index 0000000..587b7eb --- /dev/null +++ b/notification/src/test/java/org/apache/atlas/notification/VersionedMessageTest.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.notification; + +import org.testng.annotations.Test; + +import static org.testng.Assert.*; + +/** + * VersionedMessage tests. + */ +public class VersionedMessageTest { + + @Test + public void testGetVersion() throws Exception { + MessageVersion version = new MessageVersion("1.0.0"); + VersionedMessage<String> versionedMessage = new VersionedMessage<>(version, "a"); + assertEquals(versionedMessage.getVersion(), version); + } + + @Test + public void testGetMessage() throws Exception { + String message = "a"; + MessageVersion version = new MessageVersion("1.0.0"); + VersionedMessage<String> versionedMessage = new VersionedMessage<>(version, message); + assertEquals(versionedMessage.getMessage(), message); + } + + @Test + public void testCompareVersion() throws Exception { + MessageVersion version1 = new MessageVersion("1.0.0"); + MessageVersion version2 = new MessageVersion("2.0.0"); + MessageVersion version3 = new MessageVersion("0.5.0"); + + VersionedMessage<String> versionedMessage = new VersionedMessage<>(version1, "a"); + + assertTrue(versionedMessage.compareVersion(version1) == 0); + assertTrue(versionedMessage.compareVersion(version2) < 0); + assertTrue(versionedMessage.compareVersion(version3) > 0); + } +}
