This is an automated email from the ASF dual-hosted git repository.

amestry pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/master by this push: new 36678ab ATLAS-4306: Support for User-specific Spool Directory 36678ab is described below commit 36678ab1f331eb717578c3b2ed6677544ac3aa2a Author: Ashutosh Mestry <ames...@cloudera.com> AuthorDate: Mon May 31 21:34:44 2021 -0700 ATLAS-4306: Support for User-specific Spool Directory --- .../atlas/notification/spool/AtlasFileSpool.java | 4 ++- .../atlas/notification/spool/IndexManagement.java | 6 ++--- .../notification/spool/SpoolConfiguration.java | 22 +++++++++++++--- .../atlas/notification/spool/SpoolUtils.java | 30 +++++++++++++++++++++- .../apache/atlas/notification/spool/BaseTest.java | 2 +- 5 files changed, 54 insertions(+), 10 deletions(-) diff --git a/notification/src/main/java/org/apache/atlas/notification/spool/AtlasFileSpool.java b/notification/src/main/java/org/apache/atlas/notification/spool/AtlasFileSpool.java index ea31284..0c92c30 100644 --- a/notification/src/main/java/org/apache/atlas/notification/spool/AtlasFileSpool.java +++ b/notification/src/main/java/org/apache/atlas/notification/spool/AtlasFileSpool.java @@ -41,6 +41,7 @@ public class AtlasFileSpool implements NotificationInterface { private final Publisher publisher; private Thread publisherThread; private Boolean initDone = null; + private String currentUser; public AtlasFileSpool(Configuration configuration, AbstractNotification notificationHandler) { this.notificationHandler = notificationHandler; @@ -56,7 +57,7 @@ public class AtlasFileSpool implements NotificationInterface { if (!isInitDone()) { try { - config.setSource(source); + config.setSource(source, this.currentUser); LOG.info("{}: Initialization: Starting...", this.config.getSourceName()); @@ -86,6 +87,7 @@ public class AtlasFileSpool implements NotificationInterface { @Override public void setCurrentUser(String user) { this.notificationHandler.setCurrentUser(user); + this.currentUser = user; } @Override diff --git 
a/notification/src/main/java/org/apache/atlas/notification/spool/IndexManagement.java b/notification/src/main/java/org/apache/atlas/notification/spool/IndexManagement.java index f018983..adbb8d1 100644 --- a/notification/src/main/java/org/apache/atlas/notification/spool/IndexManagement.java +++ b/notification/src/main/java/org/apache/atlas/notification/spool/IndexManagement.java @@ -54,14 +54,14 @@ public class IndexManagement { public void init() throws IOException, AtlasException { String sourceName = config.getSourceName(); - File spoolDir = SpoolUtils.getCreateDirectory(config.getSpoolDir()); - + File spoolDir = SpoolUtils.getCreateDirectoryWithPermissionCheck(config.getSpoolDir(), config.getUser()); if (spoolDir == null) { throw new AtlasException(String.format("%s: %s not found or inaccessible!", sourceName, spoolDir.getAbsolutePath())); } - File archiveDir = SpoolUtils.getCreateDirectory(config.getArchiveDir()); + config.setSpoolDir(spoolDir.getAbsolutePath()); + File archiveDir = SpoolUtils.getCreateDirectory(config.getArchiveDir()); if (archiveDir == null) { throw new AtlasException(String.format("%s: %s not found or inaccessible!", sourceName, archiveDir.getAbsolutePath())); } diff --git a/notification/src/main/java/org/apache/atlas/notification/spool/SpoolConfiguration.java b/notification/src/main/java/org/apache/atlas/notification/spool/SpoolConfiguration.java index 76f05ef..36ea7be 100644 --- a/notification/src/main/java/org/apache/atlas/notification/spool/SpoolConfiguration.java +++ b/notification/src/main/java/org/apache/atlas/notification/spool/SpoolConfiguration.java @@ -40,19 +40,23 @@ public class SpoolConfiguration { public static final String PROP_FILE_SPOOL_PAUSE_BEFORE_SEND_SEC = PROPERTY_PREFIX_SPOOL + "pause.before.send.sec"; private static final String PROP_HIVE_METASTORE_NAME = PROPERTY_PREFIX_SPOOL + "hivemetastore.name"; + private final Configuration config; + private final String messageHandlerName; private final int 
maxArchivedFilesCount; private final int messageBatchSize; private final int retryDestinationMS; private final int fileRollOverSec; private final int fileSpoolMaxFilesCount; - private final String spoolDirPath; - private final String archiveDir; + private String spoolDirPath; + private String archiveDir; private final int pauseBeforeSendSec; private final String hiveMetaStoreName; private String sourceName; + private String user; public SpoolConfiguration(Configuration cfg, String messageHandlerName) { + this.config = cfg; this.messageHandlerName = messageHandlerName; this.maxArchivedFilesCount = cfg.getInt(PROP_FILE_SPOOL_ARCHIVE_MAX_FILES_COUNT, PROP_FILE_SPOOL_ARCHIVE_MAX_FILES_COUNT_DEFAULT); this.messageBatchSize = cfg.getInt(PROP_MESSAGE_BATCH_SIZE, PROP_FILE_MESSAGE_BATCH_SIZE_DEFAULT); @@ -65,8 +69,9 @@ public class SpoolConfiguration { this.hiveMetaStoreName = cfg.getString(PROP_HIVE_METASTORE_NAME, PROP_HIVE_METASTORE_NAME_DEFAULT); } - public void setSource(String val) { - this.sourceName = val; + public void setSource(String source, String user) { + this.sourceName = source; + this.user = user; } public String getSourceName() { @@ -97,7 +102,12 @@ public class SpoolConfiguration { return new File(getSpoolDirPath()); } + public void setSpoolDir(String absolutePath) { + this.spoolDirPath = absolutePath; + } + public File getArchiveDir() { + this.archiveDir = config.getString(PROP_FILE_SPOOL_ARCHIVE_DIR, new File(getSpoolDirPath(), PROP_FILE_SPOOL_ARCHIVE_DIR_DEFAULT).toString()); return new File(this.archiveDir); } @@ -136,4 +146,8 @@ public class SpoolConfiguration { public boolean isHiveMetaStore() { return this.sourceName.equals(this.hiveMetaStoreName); } + + public String getUser() { + return this.user; + } } diff --git a/notification/src/main/java/org/apache/atlas/notification/spool/SpoolUtils.java b/notification/src/main/java/org/apache/atlas/notification/spool/SpoolUtils.java index abbe33d..9ee4c80 100644 --- 
a/notification/src/main/java/org/apache/atlas/notification/spool/SpoolUtils.java +++ b/notification/src/main/java/org/apache/atlas/notification/spool/SpoolUtils.java @@ -37,6 +37,7 @@ import java.text.SimpleDateFormat; public class SpoolUtils { private static final Logger LOG = LoggerFactory.getLogger(SpoolUtils.class); + private static final String USER_SPECIFIC_PATH_NAME_FORMAT = "%s-%s"; public static final String DEFAULT_CHAR_SET = "UTF-8"; private static final String DEFAULT_LINE_SEPARATOR = System.getProperty("line.separator"); private static final String FILE_EXT_JSON = ".json"; @@ -72,6 +73,33 @@ public class SpoolUtils { return ret; } + public static File getCreateDirectoryWithPermissionCheck(File file, String user) { + File ret = getCreateDirectory(file); + + LOG.info("SpoolUtils.getCreateDirectory({}): Checking permissions..."); + if (!file.canWrite() || !file.canRead()) { + File fileWithUserSuffix = getFileWithUserSuffix(file, user); + LOG.error("SpoolUtils.getCreateDirectory({}, {}): Insufficient permissions for user: {}! 
Will create: {}", + file.getAbsolutePath(), user, user, fileWithUserSuffix); + ret = getCreateDirectory(fileWithUserSuffix); + } + + return ret; + } + + private static File getFileWithUserSuffix(File file, String user) { + if (!file.isDirectory()) { + return file; + } + + String absolutePath = file.getAbsolutePath(); + if (absolutePath.endsWith(File.pathSeparator)) { + absolutePath = StringUtils.removeEnd(absolutePath, File.pathSeparator); + } + + return new File(String.format(USER_SPECIFIC_PATH_NAME_FORMAT, absolutePath, user)); + } + public static File getCreateDirectory(File file) { File ret = file; @@ -79,7 +107,7 @@ public class SpoolUtils { boolean result = file.mkdirs(); if (!file.isDirectory() || !result) { - LOG.error("SpoolUtils.getCreateDirectory({}): inaccessible!", file.toString()); + LOG.error("SpoolUtils.getCreateDirectory({}): cannot be created!", file); ret = null; } diff --git a/notification/src/test/java/org/apache/atlas/notification/spool/BaseTest.java b/notification/src/test/java/org/apache/atlas/notification/spool/BaseTest.java index 304c821..83971f6 100644 --- a/notification/src/test/java/org/apache/atlas/notification/spool/BaseTest.java +++ b/notification/src/test/java/org/apache/atlas/notification/spool/BaseTest.java @@ -50,7 +50,7 @@ public class BaseTest { public SpoolConfiguration getSpoolConfiguration(String spoolDir, String handlerName) { SpoolConfiguration cfg = new SpoolConfiguration(getConfiguration(spoolDir), handlerName); - cfg.setSource(SOURCE_TEST); + cfg.setSource(SOURCE_TEST, "testuser"); return cfg; }