This is an automated email from the ASF dual-hosted git repository. pinal pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/master by this push: new 7581672 ATLAS-4428 : Enhance Atlas hook messages to capture hook version & message source 7581672 is described below commit 7581672b7a0d2ecd4588667bc225c9da90810937 Author: chaitali borole <chaitali.bor...@freestoneinfotech.com> AuthorDate: Fri Oct 29 13:58:41 2021 +0530 ATLAS-4428 : Enhance Atlas hook messages to capture hook version & message source Signed-off-by: Pinal Shah <pinal.s...@freestoneinfotech.com> --- .../org/apache/atlas/falcon/hook/FalconHook.java | 6 ++ .../apache/atlas/hbase/bridge/HBaseAtlasHook.java | 8 +- .../java/org/apache/atlas/hive/hook/HiveHook.java | 4 + .../atlas/hive/hook/HiveMetastoreHookImpl.java | 6 ++ .../atlas/impala/hook/ImpalaLineageHook.java | 6 ++ .../org/apache/atlas/sqoop/hook/SqoopHook.java | 8 ++ .../apache/atlas/storm/hook/StormAtlasHook.java | 7 ++ .../org/apache/atlas/repository/Constants.java | 9 +++ .../notification/AtlasNotificationBaseMessage.java | 18 +++++ .../notification/AtlasNotificationMessage.java | 8 +- .../atlas/model/notification/MessageSource.java | 90 ++++++++++++++++++++++ intg/src/main/resources/atlas-buildinfo.properties | 28 +++++++ .../main/java/org/apache/atlas/hook/AtlasHook.java | 20 +++-- .../atlas/notification/AbstractNotification.java | 13 +++- .../atlas/notification/NotificationInterface.java | 3 + .../atlas/notification/spool/AtlasFileSpool.java | 15 +++- .../java/org/apache/atlas/hook/AtlasHookTest.java | 33 ++++---- .../notification/AbstractNotificationTest.java | 17 ++-- .../notification/AtlasNotificationMessageTest.java | 33 +++++++- .../entity/EntityNotificationDeserializerTest.java | 4 +- .../hook/HookNotificationDeserializerTest.java | 8 +- 21 files changed, 302 insertions(+), 42 deletions(-) diff --git a/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/hook/FalconHook.java b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/hook/FalconHook.java index 8c09d33..b8a73cb 100644 --- a/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/hook/FalconHook.java +++ b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/hook/FalconHook.java @@ -35,6 +35,7 @@ import org.slf4j.LoggerFactory; import java.net.URISyntaxException; import java.util.ArrayList; import java.util.List; +import static org.apache.atlas.repository.Constants.FALCON_SOURCE; /** * Falcon hook sends lineage information to the Atlas Service. @@ -44,6 +45,11 @@ public class FalconHook extends AtlasHook implements FalconEventPublisher { private static ConfigurationStore STORE; + @Override + public String getMessageSource() { + return FALCON_SOURCE; + } + private enum Operation { ADD, UPDATE diff --git a/addons/hbase-bridge/src/main/java/org/apache/atlas/hbase/bridge/HBaseAtlasHook.java b/addons/hbase-bridge/src/main/java/org/apache/atlas/hbase/bridge/HBaseAtlasHook.java index 0ab06f2..8e6c57d 100644 --- a/addons/hbase-bridge/src/main/java/org/apache/atlas/hbase/bridge/HBaseAtlasHook.java +++ b/addons/hbase-bridge/src/main/java/org/apache/atlas/hbase/bridge/HBaseAtlasHook.java @@ -49,11 +49,13 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import static org.apache.atlas.repository.Constants.HBASE_SOURCE; + // This will register Hbase entities into Atlas public class HBaseAtlasHook extends AtlasHook { private static final Logger LOG = LoggerFactory.getLogger(HBaseAtlasHook.class); - + public static final String ATTR_DESCRIPTION = "description"; public static final String ATTR_ATLAS_ENDPOINT = "atlas.rest.address"; public static final String ATTR_PARAMETERS = "parameters"; @@ -497,6 +499,10 @@ public class HBaseAtlasHook extends AtlasHook { return columnFamily; } + public String getMessageSource() { + return HBASE_SOURCE; + } + private String getTableName(HBaseOperationContext hbaseOperationContext) { final String ret; diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java index 3cc7b3b..6ea4848 100644 --- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java +++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java @@ -49,6 +49,7 @@ import java.util.regex.Pattern; import static org.apache.atlas.hive.hook.events.BaseHiveEvent.ATTRIBUTE_QUALIFIED_NAME; import static org.apache.atlas.hive.hook.events.BaseHiveEvent.HIVE_TYPE_DB; import static org.apache.atlas.hive.hook.events.BaseHiveEvent.HIVE_TYPE_TABLE; +import static org.apache.atlas.repository.Constants.HS2_SOURCE; public class HiveHook extends AtlasHook implements ExecuteWithHookContext { private static final Logger LOG = LoggerFactory.getLogger(HiveHook.class); @@ -176,6 +177,9 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext { super(name); } + public String getMessageSource() { + return HS2_SOURCE; + } @Override public void run(HookContext hookContext) throws Exception { diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveMetastoreHookImpl.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveMetastoreHookImpl.java index 6a492c2..33266ce 100644 --- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveMetastoreHookImpl.java +++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveMetastoreHookImpl.java @@ -35,6 +35,7 @@ import org.slf4j.LoggerFactory; import static org.apache.atlas.hive.hook.events.AlterTableRenameCol.findRenamedColumn; import static org.apache.atlas.hive.hook.events.BaseHiveEvent.toTable; +import static org.apache.atlas.repository.Constants.HMS_SOURCE; import static org.apache.hadoop.hive.ql.plan.HiveOperation.*; public class HiveMetastoreHookImpl extends MetaStoreEventListener { @@ -106,6 +107,11 @@ public class HiveMetastoreHookImpl extends MetaStoreEventListener { public HiveMetastoreHook() { } + @Override + public String getMessageSource() { + return HMS_SOURCE; + } + public void handleEvent(HiveOperationContext operContext) { ListenerEvent listenerEvent = operContext.getEvent(); diff --git a/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/ImpalaLineageHook.java b/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/ImpalaLineageHook.java index 10ae08f..907f244 100644 --- a/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/ImpalaLineageHook.java +++ b/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/ImpalaLineageHook.java @@ -36,6 +36,8 @@ import javax.security.auth.Subject; import javax.security.auth.kerberos.KerberosPrincipal; import java.util.HashSet; +import static org.apache.atlas.repository.Constants.IMPALA_SOURCE; + public class ImpalaLineageHook extends AtlasHook { private static final Logger LOG = LoggerFactory.getLogger(ImpalaLineageHook.class); public static final String ATLAS_ENDPOINT = "atlas.rest.address"; @@ -65,6 +67,10 @@ public class ImpalaLineageHook extends AtlasHook { } + public String getMessageSource() { + return IMPALA_SOURCE; + } + public void process(String impalaQueryString) throws Exception { if (StringUtils.isEmpty(impalaQueryString)) { LOG.warn("==> ImpalaLineageHook.process skips because the impalaQueryString is empty <=="); diff --git a/addons/sqoop-bridge/src/main/java/org/apache/atlas/sqoop/hook/SqoopHook.java b/addons/sqoop-bridge/src/main/java/org/apache/atlas/sqoop/hook/SqoopHook.java index 4785960..0a8cb96 100644 --- a/addons/sqoop-bridge/src/main/java/org/apache/atlas/sqoop/hook/SqoopHook.java +++ b/addons/sqoop-bridge/src/main/java/org/apache/atlas/sqoop/hook/SqoopHook.java @@ -47,6 +47,9 @@ import java.util.HashMap; import java.util.Properties; import java.util.List; import java.util.Date; + +import static org.apache.atlas.repository.Constants.SQOOP_SOURCE; + /** * AtlasHook sends lineage information to the AtlasSever. */ @@ -243,6 +246,11 @@ public class SqoopHook extends SqoopJobDataPublisher { } private static class AtlasHookImpl extends AtlasHook { + + public String getMessageSource() { + return SQOOP_SOURCE; + } + public void sendNotification(HookNotification notification) { super.notifyEntities(Collections.singletonList(notification), null); } diff --git a/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java b/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java index 779c5cb..69d58d5 100644 --- a/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java +++ b/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java @@ -53,6 +53,8 @@ import java.util.Map; import java.util.Set; import java.util.Date; +import static org.apache.atlas.repository.Constants.STORM_SOURCE; + /** * StormAtlasHook sends storm topology metadata information to Atlas * via a Kafka Broker for durability. @@ -406,4 +408,9 @@ public class StormAtlasHook extends AtlasHook implements ISubmitterHook { return clusterName; } + + @Override + public String getMessageSource() { + return STORM_SOURCE; + } } \ No newline at end of file diff --git a/common/src/main/java/org/apache/atlas/repository/Constants.java b/common/src/main/java/org/apache/atlas/repository/Constants.java index 2669c8a..7cd67a0 100644 --- a/common/src/main/java/org/apache/atlas/repository/Constants.java +++ b/common/src/main/java/org/apache/atlas/repository/Constants.java @@ -236,6 +236,15 @@ public final class Constants { public static final String PROPERTY_KEY_INDEX_RECOVERY_START_TIME = encodePropertyKey(INDEX_RECOVERY_PREFIX + "startTime"); public static final String PROPERTY_KEY_INDEX_RECOVERY_PREV_TIME = encodePropertyKey(INDEX_RECOVERY_PREFIX + "prevTime"); + public static final String SQOOP_SOURCE = "sqoop"; + public static final String FALCON_SOURCE = "falcon"; + public static final String HBASE_SOURCE = "hbase"; + public static final String HS2_SOURCE = "hive_server2"; + public static final String HMS_SOURCE = "hive_metastore"; + public static final String IMPALA_SOURCE = "impala"; + public static final String STORM_SOURCE = "storm"; + public static final String FILE_SPOOL_SOURCE = "file_spool"; + /* * All supported file-format extensions for Bulk Imports through file upload */ diff --git a/intg/src/main/java/org/apache/atlas/model/notification/AtlasNotificationBaseMessage.java b/intg/src/main/java/org/apache/atlas/model/notification/AtlasNotificationBaseMessage.java index ff45d57..97fcbac 100644 --- a/intg/src/main/java/org/apache/atlas/model/notification/AtlasNotificationBaseMessage.java +++ b/intg/src/main/java/org/apache/atlas/model/notification/AtlasNotificationBaseMessage.java @@ -55,6 +55,7 @@ public class AtlasNotificationBaseMessage { public enum CompressionKind { NONE, GZIP }; + private MessageSource source = null; private MessageVersion version = null; private String msgId = null; private CompressionKind msgCompressionKind = CompressionKind.NONE; @@ -70,9 +71,18 @@ public class AtlasNotificationBaseMessage { } public AtlasNotificationBaseMessage(MessageVersion version, String msgId, CompressionKind msgCompressionKind) { + this (version, msgId, msgCompressionKind, null); + } + + public AtlasNotificationBaseMessage(MessageVersion version, MessageSource source) { + this (version, null, CompressionKind.NONE, source); + } + + public AtlasNotificationBaseMessage(MessageVersion version, String msgId, CompressionKind msgCompressionKind, MessageSource source) { this.version = version; this.msgId = msgId; this.msgCompressionKind = msgCompressionKind; + this.source = source; } public AtlasNotificationBaseMessage(MessageVersion version, String msgId, CompressionKind msgCompressionKind, int msgSplitIdx, int msgSplitCount) { @@ -91,6 +101,14 @@ public class AtlasNotificationBaseMessage { return version; } + public void setSource(MessageSource source) { + this.source = source; + } + + public MessageSource getSource() { + return source; + } + public String getMsgId() { return msgId; } diff --git a/intg/src/main/java/org/apache/atlas/model/notification/AtlasNotificationMessage.java b/intg/src/main/java/org/apache/atlas/model/notification/AtlasNotificationMessage.java index 5869910..42032b4 100644 --- a/intg/src/main/java/org/apache/atlas/model/notification/AtlasNotificationMessage.java +++ b/intg/src/main/java/org/apache/atlas/model/notification/AtlasNotificationMessage.java @@ -60,7 +60,11 @@ public class AtlasNotificationMessage<T> extends AtlasNotificationBaseMessage { } public AtlasNotificationMessage(MessageVersion version, T message, String msgSourceIP, String createdBy, boolean spooled) { - super(version); + this(version, message, msgSourceIP, createdBy, spooled, null); + } + + public AtlasNotificationMessage(MessageVersion version, T message, String msgSourceIP, String createdBy, boolean spooled, MessageSource source) { + super(version, source); this.msgSourceIP = msgSourceIP; this.msgCreatedBy = createdBy; @@ -70,7 +74,7 @@ public class AtlasNotificationMessage<T> extends AtlasNotificationBaseMessage { } public AtlasNotificationMessage(MessageVersion version, T message, String msgSourceIP, String createdBy) { - this(version, message, msgSourceIP, createdBy, false); + this(version, message, msgSourceIP, createdBy, false, null); } public String getMsgSourceIP() { diff --git a/intg/src/main/java/org/apache/atlas/model/notification/MessageSource.java b/intg/src/main/java/org/apache/atlas/model/notification/MessageSource.java new file mode 100644 index 0000000..b5c3cdc --- /dev/null +++ b/intg/src/main/java/org/apache/atlas/model/notification/MessageSource.java @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.model.notification; + +import com.fasterxml.jackson.annotation.JsonAutoDetect; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.databind.annotation.JsonSerialize; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.xml.bind.annotation.XmlAccessType; +import javax.xml.bind.annotation.XmlAccessorType; +import javax.xml.bind.annotation.XmlRootElement; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.util.Properties; + +import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.NONE; +import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_ONLY; + +/** + * Base class of hook information. + */ +@JsonAutoDetect(getterVisibility=PUBLIC_ONLY, setterVisibility=PUBLIC_ONLY, fieldVisibility=NONE) +@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL) +@JsonIgnoreProperties(ignoreUnknown=true) +@XmlRootElement +@XmlAccessorType(XmlAccessType.PROPERTY) +public class MessageSource { + + private static final Logger LOG = LoggerFactory.getLogger(MessageSource.class); + private static final String BUILDINFO_PROPERTIES = "/atlas-buildinfo.properties"; + private static final String BUILD_VERSION_PROPERTY_KEY = "build.version"; + private static final String BUILD_VERSION_DEFAULT = "UNKNOWN"; + + private static String storedVersion; + private String name; + private String version; + + + static { + storedVersion = fetchBuildVersion(); + } + + public MessageSource() { + } + + public MessageSource(String name) { + this.version = storedVersion; + this.name = name; + } + + public String getSource () { return name; } + + public void setSource(String name) { this.name = name; } + + public String getVersion () { + return version; + } + + private static String fetchBuildVersion() { + Properties properties = new java.util.Properties(); + InputStream inputStream = MessageSource.class.getResourceAsStream(BUILDINFO_PROPERTIES); + InputStreamReader inputStreamReader = new InputStreamReader(inputStream); + try { + properties.load(inputStreamReader); + } catch (IOException e) { + LOG.error("Failed to load atlas-buildinfo properties. Will use default version.", e); + } + + return properties.getProperty(BUILD_VERSION_PROPERTY_KEY, BUILD_VERSION_DEFAULT); + } +} diff --git a/intg/src/main/resources/atlas-buildinfo.properties b/intg/src/main/resources/atlas-buildinfo.properties new file mode 100644 index 0000000..2404f8f --- /dev/null +++ b/intg/src/main/resources/atlas-buildinfo.properties @@ -0,0 +1,28 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +###################### +project.name=${pom.parent.name} +project.description=${pom.parent.description} +build.user=${user.name} +build.epoch=${timestamp} +project.version=${pom.version} +build.version=${pom.version} +vc.revision=${buildNumber} +vc.source.url=${scm.connection} +###################### diff --git a/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java b/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java index 9162ac1..24ea6ea 100644 --- a/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java +++ b/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java @@ -28,6 +28,7 @@ import org.apache.atlas.notification.NotificationException; import org.apache.atlas.notification.NotificationInterface; import org.apache.atlas.utils.AtlasConfigurationUtil; import org.apache.commons.configuration.Configuration; +import org.apache.atlas.model.notification.MessageSource; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.ShutdownHookManager; @@ -65,6 +66,7 @@ public abstract class AtlasHook { protected static Configuration atlasProperties; protected static NotificationInterface notificationInterface; + protected MessageSource source; private static final String metadataNamespace; private static final int SHUTDOWN_HOOK_WAIT_TIME_MS = 3000; @@ -143,14 +145,18 @@ public abstract class AtlasHook { } public AtlasHook() { + source = new MessageSource(getMessageSource()); notificationInterface.init(this.getClass().getSimpleName(), failedMessagesLogger); } public AtlasHook(String name) { + source = new MessageSource(getMessageSource()); LOG.info("AtlasHook: Spool name: Passed from caller.: {}", name); notificationInterface.init(name, failedMessagesLogger); } + public abstract String getMessageSource(); + /** * Notify atlas of the entity through message. The entity can be a * complex entity with reference to other entities. @@ -160,14 +166,14 @@ public abstract class AtlasHook { * @param messages hook notification messages * @param maxRetries maximum number of retries while sending message to messaging system */ - public static void notifyEntities(List<HookNotification> messages, UserGroupInformation ugi, int maxRetries) { + public static void notifyEntities(List<HookNotification> messages, UserGroupInformation ugi, int maxRetries, MessageSource source) { if (executor == null) { // send synchronously - notifyEntitiesInternal(messages, maxRetries, ugi, notificationInterface, logFailedMessages, failedMessagesLogger); + notifyEntitiesInternal(messages, maxRetries, ugi, notificationInterface, logFailedMessages, failedMessagesLogger, source); } else { executor.submit(new Runnable() { @Override public void run() { - notifyEntitiesInternal(messages, maxRetries, ugi, notificationInterface, logFailedMessages, failedMessagesLogger); + notifyEntitiesInternal(messages, maxRetries, ugi, notificationInterface, logFailedMessages, failedMessagesLogger, source); } }); } @@ -176,7 +182,7 @@ public abstract class AtlasHook { @VisibleForTesting static void notifyEntitiesInternal(List<HookNotification> messages, int maxRetries, UserGroupInformation ugi, NotificationInterface notificationInterface, - boolean shouldLogFailedMessages, FailedMessagesLogger logger) { + boolean shouldLogFailedMessages, FailedMessagesLogger logger, MessageSource source) { if (messages == null || messages.isEmpty()) { return; } @@ -199,12 +205,12 @@ public abstract class AtlasHook { try { if (ugi == null) { - notificationInterface.send(NotificationInterface.NotificationType.HOOK, messages); + notificationInterface.send(NotificationInterface.NotificationType.HOOK, messages, source); } else { PrivilegedExceptionAction<Object> privilegedNotify = new PrivilegedExceptionAction<Object>() { @Override public Object run() throws Exception { - notificationInterface.send(NotificationInterface.NotificationType.HOOK, messages); + notificationInterface.send(NotificationInterface.NotificationType.HOOK, messages, source); return messages; } }; @@ -244,7 +250,7 @@ public abstract class AtlasHook { * @param messages hook notification messages */ protected void notifyEntities(List<HookNotification> messages, UserGroupInformation ugi) { - notifyEntities(messages, ugi, notificationMaxRetries); + notifyEntities(messages, ugi, notificationMaxRetries, source); } /** diff --git a/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java b/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java index c45a1da..cca4cb8 100644 --- a/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java +++ b/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java @@ -23,6 +23,7 @@ import org.apache.atlas.model.notification.AtlasNotificationBaseMessage; import org.apache.atlas.model.notification.AtlasNotificationMessage; import org.apache.atlas.model.notification.AtlasNotificationStringMessage; import org.apache.atlas.model.notification.AtlasNotificationBaseMessage.CompressionKind; +import org.apache.atlas.model.notification.MessageSource; import org.apache.atlas.type.AtlasType; import org.apache.atlas.model.notification.MessageVersion; import org.apache.commons.configuration.Configuration; @@ -84,10 +85,15 @@ public abstract class AbstractNotification implements NotificationInterface { @Override public <T> void send(NotificationType type, List<T> messages) throws NotificationException { + send(type, messages, new MessageSource()); + } + + @Override + public <T> void send(NotificationType type, List<T> messages, MessageSource source) throws NotificationException { List<String> strMessages = new ArrayList<>(messages.size()); for (int index = 0; index < messages.size(); index++) { - createNotificationMessages(messages.get(index), strMessages); + createNotificationMessages(messages.get(index), strMessages, source); } sendInternal(type, strMessages); @@ -146,10 +152,11 @@ public abstract class AbstractNotification implements NotificationInterface { * * @param message the message in object form * + * @param source * @return the message as a JSON string */ - public static void createNotificationMessages(Object message, List<String> msgJsonList) { - AtlasNotificationMessage<?> notificationMsg = new AtlasNotificationMessage<>(CURRENT_MESSAGE_VERSION, message, getHostAddress(), getCurrentUser()); + public static void createNotificationMessages(Object message, List<String> msgJsonList, MessageSource source) { + AtlasNotificationMessage<?> notificationMsg = new AtlasNotificationMessage<>(CURRENT_MESSAGE_VERSION, message, getHostAddress(), getCurrentUser(), false, source); String msgJson = AtlasType.toV1Json(notificationMsg); boolean msgLengthExceedsLimit = (msgJson.length() * MAX_BYTES_PER_CHAR) > MESSAGE_MAX_LENGTH_BYTES; diff --git a/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java b/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java index 3d8d9cc..a9cd4a6 100644 --- a/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java +++ b/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java @@ -17,6 +17,7 @@ */ package org.apache.atlas.notification; +import org.apache.atlas.model.notification.MessageSource; import org.apache.atlas.notification.entity.EntityMessageDeserializer; import org.apache.atlas.notification.hook.HookMessageDeserializer; @@ -109,6 +110,8 @@ public interface NotificationInterface { /** * Shutdown any notification producers and consumers associated with this interface instance. */ + <T> void send(NotificationType type, List<T> messages, MessageSource source) throws NotificationException; + void close(); /** diff --git a/notification/src/main/java/org/apache/atlas/notification/spool/AtlasFileSpool.java b/notification/src/main/java/org/apache/atlas/notification/spool/AtlasFileSpool.java index 0c92c30..57c0c7d 100644 --- a/notification/src/main/java/org/apache/atlas/notification/spool/AtlasFileSpool.java +++ b/notification/src/main/java/org/apache/atlas/notification/spool/AtlasFileSpool.java @@ -19,6 +19,7 @@ package org.apache.atlas.notification.spool; import org.apache.atlas.AtlasException; import org.apache.atlas.hook.FailedMessagesLogger; +import org.apache.atlas.model.notification.MessageSource; import org.apache.atlas.notification.AbstractNotification; import org.apache.atlas.notification.NotificationConsumer; import org.apache.atlas.notification.NotificationException; @@ -30,6 +31,7 @@ import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import static org.apache.atlas.repository.Constants.FILE_SPOOL_SOURCE; public class AtlasFileSpool implements NotificationInterface { private static final Logger LOG = LoggerFactory.getLogger(AtlasFileSpool.class); @@ -103,13 +105,18 @@ public class AtlasFileSpool implements NotificationInterface { } @Override + public <T> void send(NotificationType type, List<T> messages) throws NotificationException { + send(type, messages, new MessageSource(FILE_SPOOL_SOURCE)); + } + + @Override public boolean isReady(NotificationType type) { return true; } @Override - public <T> void send(NotificationType type, List<T> messages) throws NotificationException { - List<String> serializedMessages = getSerializedMessages(messages); + public <T> void send(NotificationType type, List<T> messages, MessageSource source) throws NotificationException { + List<String> serializedMessages = getSerializedMessages(messages, source); if (hasInitSucceeded() && (this.indexManagement.isPending() || this.publisher.isDestinationDown())) { if (LOG.isDebugEnabled()) { LOG.debug("AtlasFileSpool.send(): sending to spooler"); @@ -139,10 +146,10 @@ public class AtlasFileSpool implements NotificationInterface { } } - private <T> List<String> getSerializedMessages(List<T> messages) { + private <T> List<String> getSerializedMessages(List<T> messages, MessageSource source) { List<String> serializedMessages = new ArrayList<>(messages.size()); for (int index = 0; index < messages.size(); index++) { - notificationHandler.createNotificationMessages(messages.get(index), serializedMessages); + notificationHandler.createNotificationMessages(messages.get(index), serializedMessages, source); } return serializedMessages; diff --git a/notification/src/test/java/org/apache/atlas/hook/AtlasHookTest.java b/notification/src/test/java/org/apache/atlas/hook/AtlasHookTest.java index 1ae7c27..b094247 100644 --- a/notification/src/test/java/org/apache/atlas/hook/AtlasHookTest.java +++ b/notification/src/test/java/org/apache/atlas/hook/AtlasHookTest.java @@ -18,6 +18,7 @@ package org.apache.atlas.hook; +import org.apache.atlas.model.notification.MessageSource; import org.apache.atlas.model.notification.HookNotification; import org.apache.atlas.notification.NotificationException; import org.apache.atlas.notification.NotificationInterface; @@ -52,32 +53,35 @@ public class AtlasHookTest { @Test (timeOut = 10000) public void testNotifyEntitiesDoesNotHangOnException() throws Exception { + MessageSource source = new MessageSource(this.getClass().getSimpleName()); List<HookNotification> hookNotifications = new ArrayList<>(); doThrow(new NotificationException(new Exception())).when(notificationInterface) - .send(NotificationInterface.NotificationType.HOOK, hookNotifications); + .send(NotificationInterface.NotificationType.HOOK, hookNotifications, source); AtlasHook.notifyEntitiesInternal(hookNotifications, 0, null, notificationInterface, false, - failedMessagesLogger); + failedMessagesLogger, source); // if we've reached here, the method finished OK. } @Test public void testNotifyEntitiesRetriesOnException() throws NotificationException { + MessageSource source = new MessageSource(this.getClass().getSimpleName()); List<HookNotification> hookNotifications = new ArrayList<HookNotification>() {{ add(new EntityCreateRequest("user")); } }; doThrow(new NotificationException(new Exception())).when(notificationInterface) - .send(NotificationInterface.NotificationType.HOOK, hookNotifications); + .send(NotificationInterface.NotificationType.HOOK, hookNotifications, source); AtlasHook.notifyEntitiesInternal(hookNotifications, 2, null, notificationInterface, false, - failedMessagesLogger); + failedMessagesLogger, source); verify(notificationInterface, times(2)). - send(NotificationInterface.NotificationType.HOOK, hookNotifications); + send(NotificationInterface.NotificationType.HOOK, hookNotifications, source); } @Test public void testFailedMessageIsLoggedIfRequired() throws NotificationException { + MessageSource source = new MessageSource(this.getClass().getSimpleName()); List<HookNotification> hookNotifications = new ArrayList<HookNotification>() {{ add(new EntityCreateRequest("user")); @@ -85,27 +89,29 @@ public class AtlasHookTest { }; doThrow(new NotificationException(new Exception(), Arrays.asList("test message"))) .when(notificationInterface) - .send(NotificationInterface.NotificationType.HOOK, hookNotifications); + .send(NotificationInterface.NotificationType.HOOK, hookNotifications, source); AtlasHook.notifyEntitiesInternal(hookNotifications, 2, null, notificationInterface, true, - failedMessagesLogger); + failedMessagesLogger, source); verify(failedMessagesLogger, times(1)).log("test message"); } @Test public void testFailedMessageIsNotLoggedIfNotRequired() throws NotificationException { + MessageSource source = new MessageSource(this.getClass().getSimpleName()); List<HookNotification> hookNotifications = new ArrayList<>(); doThrow(new NotificationException(new Exception(), Arrays.asList("test message"))) .when(notificationInterface) - .send(NotificationInterface.NotificationType.HOOK, hookNotifications); + .send(NotificationInterface.NotificationType.HOOK, hookNotifications, source); AtlasHook.notifyEntitiesInternal(hookNotifications, 2, null, notificationInterface, false, - failedMessagesLogger); + failedMessagesLogger, source); verifyZeroInteractions(failedMessagesLogger); } @Test public void testAllFailedMessagesAreLogged() throws NotificationException { + MessageSource source = new MessageSource(this.getClass().getSimpleName()); List<HookNotification> hookNotifications = new ArrayList<HookNotification>() {{ add(new EntityCreateRequest("user")); @@ -113,9 +119,9 @@ public class AtlasHookTest { }; doThrow(new NotificationException(new Exception(), Arrays.asList("test message1", "test message2"))) .when(notificationInterface) - .send(NotificationInterface.NotificationType.HOOK, hookNotifications); + .send(NotificationInterface.NotificationType.HOOK, hookNotifications, source); AtlasHook.notifyEntitiesInternal(hookNotifications, 2, null, notificationInterface, true, - failedMessagesLogger); + failedMessagesLogger, source); verify(failedMessagesLogger, times(1)).log("test message1"); verify(failedMessagesLogger, times(1)).log("test message2"); @@ -123,11 +129,12 @@ public class AtlasHookTest { @Test public void testFailedMessageIsNotLoggedIfNotANotificationException() throws Exception { + MessageSource source = new MessageSource(this.getClass().getSimpleName()); List<HookNotification> hookNotifications = new ArrayList<>(); doThrow(new RuntimeException("test message")).when(notificationInterface) - .send(NotificationInterface.NotificationType.HOOK, hookNotifications); + .send(NotificationInterface.NotificationType.HOOK, hookNotifications, source); AtlasHook.notifyEntitiesInternal(hookNotifications, 2, null, notificationInterface, true, - failedMessagesLogger); + failedMessagesLogger, source); verifyZeroInteractions(failedMessagesLogger); } diff --git a/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationTest.java b/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationTest.java index 8078a6c..4e1c094 100644 --- a/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationTest.java +++ b/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationTest.java @@ -19,6 +19,7 @@ package org.apache.atlas.notification; import org.apache.atlas.AtlasException; +import org.apache.atlas.model.notification.MessageSource; import org.apache.atlas.model.notification.HookNotification; import org.apache.atlas.model.notification.HookNotification.HookNotificationType; import org.apache.atlas.notification.NotificationInterface.NotificationType; @@ -40,6 +41,7 @@ public class AbstractNotificationTest { @org.testng.annotations.Test public void testSend() throws Exception { + MessageSource source = new MessageSource(); Configuration configuration = mock(Configuration.class); TestNotification notification = new TestNotification(configuration); Test message1 = new Test(HookNotificationType.ENTITY_CREATE, "user1"); @@ -47,9 +49,9 @@ public class AbstractNotificationTest { Test message3 = new Test(HookNotificationType.ENTITY_FULL_UPDATE, "user1"); List<String> messageJson = new ArrayList<>(); - AbstractNotification.createNotificationMessages(message1, messageJson); - AbstractNotification.createNotificationMessages(message2, messageJson); - AbstractNotification.createNotificationMessages(message3, messageJson); + AbstractNotification.createNotificationMessages(message1, messageJson, source); + AbstractNotification.createNotificationMessages(message2, messageJson, source); + AbstractNotification.createNotificationMessages(message3, messageJson, source); notification.send(NotificationType.HOOK, message1, message2, message3); @@ -63,6 +65,7 @@ public class AbstractNotificationTest { @org.testng.annotations.Test public void testSend2() throws Exception { + MessageSource source = new MessageSource(); Configuration configuration = mock(Configuration.class); TestNotification notification = new TestNotification(configuration); Test message1 = new Test(HookNotificationType.ENTITY_CREATE, "user1"); @@ -71,11 +74,11 @@ public class AbstractNotificationTest { List<Test> messages = Arrays.asList(message1, message2, message3); List<String> messageJson = new ArrayList<>(); - AbstractNotification.createNotificationMessages(message1, messageJson); - AbstractNotification.createNotificationMessages(message2, messageJson); - AbstractNotification.createNotificationMessages(message3, messageJson); + AbstractNotification.createNotificationMessages(message1, messageJson, source); + AbstractNotification.createNotificationMessages(message2, messageJson, source); + AbstractNotification.createNotificationMessages(message3, messageJson, source); - notification.send(NotificationInterface.NotificationType.HOOK, messages); + notification.send(NotificationInterface.NotificationType.HOOK, messages, source); assertEquals(notification.type, NotificationType.HOOK); assertEquals(notification.messages.size(), messageJson.size()); diff --git a/notification/src/test/java/org/apache/atlas/notification/AtlasNotificationMessageTest.java b/notification/src/test/java/org/apache/atlas/notification/AtlasNotificationMessageTest.java index 91a195d..463797c 100644 --- a/notification/src/test/java/org/apache/atlas/notification/AtlasNotificationMessageTest.java +++ b/notification/src/test/java/org/apache/atlas/notification/AtlasNotificationMessageTest.java @@ -19,16 +19,26 @@ package org.apache.atlas.notification; import org.apache.atlas.model.notification.AtlasNotificationMessage; +import org.apache.atlas.model.notification.MessageSource; import org.apache.atlas.model.notification.MessageVersion; +import org.apache.atlas.notification.entity.EntityNotificationTest; +import org.apache.atlas.type.AtlasType; +import org.apache.atlas.v1.model.instance.Referenceable; +import org.apache.atlas.v1.model.instance.Struct; +import org.apache.atlas.v1.model.notification.HookNotificationV1; import org.testng.annotations.Test; +import java.util.LinkedList; +import java.util.List; +import java.util.Collections; + import static org.testng.Assert.*; /** * AtlasNotificationMessage tests. */ public class AtlasNotificationMessageTest { - + @Test public void testGetVersion() throws Exception { MessageVersion version = new MessageVersion("1.0.0"); @@ -56,4 +66,25 @@ public class AtlasNotificationMessageTest { assertTrue(atlasNotificationMessage.compareVersion(version2) < 0); assertTrue(atlasNotificationMessage.compareVersion(version3) > 0); } + + @Test + public void testMessageSource() throws Exception { + Referenceable entity = generateEntityWithTrait(); + HookNotificationV1.EntityUpdateRequest message = new HookNotificationV1.EntityUpdateRequest("user1", entity); + MessageSource source = new MessageSource(this.getClass().getSimpleName()); + List<String> jsonList = new LinkedList<>(); + + AbstractNotification.createNotificationMessages(message, jsonList, source); + for(Object json : jsonList) { + AtlasNotificationMessage atlasNotificationMessage = AtlasType.fromV1Json((String) json, AtlasNotificationMessage.class); + assertEquals("\"" + source.getSource() + "\"" ,AtlasType.toV1Json(atlasNotificationMessage.getSource().getSource())); + } + } + + private Referenceable generateEntityWithTrait() { + Referenceable ret = EntityNotificationTest.getEntity("id", new Struct("MyTrait", Collections.<String, Object>emptyMap())); + + return ret; + } + } diff --git a/notification/src/test/java/org/apache/atlas/notification/entity/EntityNotificationDeserializerTest.java b/notification/src/test/java/org/apache/atlas/notification/entity/EntityNotificationDeserializerTest.java index 13eafb6..2953b63 100644 --- a/notification/src/test/java/org/apache/atlas/notification/entity/EntityNotificationDeserializerTest.java +++ b/notification/src/test/java/org/apache/atlas/notification/entity/EntityNotificationDeserializerTest.java @@ -19,6 +19,7 @@ package org.apache.atlas.notification.entity; import org.apache.atlas.model.notification.EntityNotification; +import org.apache.atlas.model.notification.MessageSource; import org.apache.atlas.v1.model.instance.Referenceable; import org.apache.atlas.v1.model.instance.Struct; import org.apache.atlas.notification.AbstractNotification; @@ -37,6 +38,7 @@ import static org.testng.Assert.assertTrue; */ public class EntityNotificationDeserializerTest { private EntityMessageDeserializer deserializer = new EntityMessageDeserializer(); + MessageSource source = new MessageSource(this.getClass().getSimpleName()); @Test public void testDeserialize() throws Exception { @@ -46,7 +48,7 @@ public class EntityNotificationDeserializerTest { EntityNotificationV1 notification = new EntityNotificationV1(entity, EntityNotificationV1.OperationType.TRAIT_ADD, traits); List<String> jsonMsgList = new ArrayList<>(); - AbstractNotification.createNotificationMessages(notification, jsonMsgList); + AbstractNotification.createNotificationMessages(notification, jsonMsgList, source); EntityNotification deserializedNotification = null; diff --git a/notification/src/test/java/org/apache/atlas/notification/hook/HookNotificationDeserializerTest.java b/notification/src/test/java/org/apache/atlas/notification/hook/HookNotificationDeserializerTest.java index d048170..bfc9b53 100644 --- a/notification/src/test/java/org/apache/atlas/notification/hook/HookNotificationDeserializerTest.java +++ b/notification/src/test/java/org/apache/atlas/notification/hook/HookNotificationDeserializerTest.java @@ -18,6 +18,7 @@ package org.apache.atlas.notification.hook; +import org.apache.atlas.model.notification.MessageSource; import org.apache.atlas.model.notification.HookNotification; import org.apache.atlas.notification.entity.EntityNotificationTest; import org.apache.atlas.v1.model.instance.Referenceable; @@ -41,6 +42,7 @@ import static org.testng.Assert.assertTrue; */ public class HookNotificationDeserializerTest { private HookMessageDeserializer deserializer = new HookMessageDeserializer(); + MessageSource source = new MessageSource(this.getClass().getSimpleName()); @Test public void testDeserialize() throws Exception { @@ -48,7 +50,7 @@ public class HookNotificationDeserializerTest { EntityUpdateRequest message = new EntityUpdateRequest("user1", entity); List<String> jsonMsgList = new ArrayList<>(); - AbstractNotification.createNotificationMessages(message, jsonMsgList); + AbstractNotification.createNotificationMessages(message, jsonMsgList, source); HookNotification deserializedMessage = deserialize(jsonMsgList); @@ -72,7 +74,7 @@ public class HookNotificationDeserializerTest { EntityUpdateRequest message = new EntityUpdateRequest("user1", entity); List<String> jsonMsgList = new ArrayList<>(); - AbstractNotification.createNotificationMessages(message, jsonMsgList); + AbstractNotification.createNotificationMessages(message, jsonMsgList, source); assertTrue(jsonMsgList.size() == 1); @@ -92,7 +94,7 @@ public class HookNotificationDeserializerTest { EntityUpdateRequest message = new EntityUpdateRequest("user1", entity); List<String> jsonMsgList = new ArrayList<>(); - AbstractNotification.createNotificationMessages(message, jsonMsgList); + AbstractNotification.createNotificationMessages(message, jsonMsgList, source); assertTrue(jsonMsgList.size() > 1);