Repository: atlas Updated Branches: refs/heads/branch-0.8 e0697265f -> ba717c6e4
ATLAS-2886: Support for fully qualified server name Signed-off-by: Madhan Neethiraj <mad...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/atlas/repo Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/ba717c6e Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/ba717c6e Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/ba717c6e Branch: refs/heads/branch-0.8 Commit: ba717c6e4bdfc9e9d59698ac150d8f12efe3e56e Parents: e069726 Author: Ashutosh Mestry <ames...@hortonworks.com> Authored: Sun Sep 23 08:50:06 2018 -0700 Committer: Madhan Neethiraj <mad...@apache.org> Committed: Sun Sep 23 09:24:01 2018 -0700 ---------------------------------------------------------------------- addons/models/0010-base_model.json | 10 +- .../apache/atlas/model/impexp/AtlasServer.java | 114 +++++++++++-------- .../atlas/repository/impexp/AuditsWriter.java | 41 ++++--- .../atlas/repository/ogm/AtlasServerDTO.java | 15 ++- .../impexp/AtlasServerServiceTest.java | 2 +- .../impexp/ExportImportAuditServiceTest.java | 2 +- .../IncrementalExportEntityProviderTest.java | 1 - .../impexp/ReplicationEntityAttributeTest.java | 14 ++- .../stocksDB-Entities/export-replicatedTo.json | 2 +- .../import-replicatedFrom.json | 2 +- 10 files changed, 123 insertions(+), 80 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/atlas/blob/ba717c6e/addons/models/0010-base_model.json ---------------------------------------------------------------------- diff --git a/addons/models/0010-base_model.json b/addons/models/0010-base_model.json index dc9e9d6..1bfbf2f 100644 --- a/addons/models/0010-base_model.json +++ b/addons/models/0010-base_model.json @@ -122,6 +122,14 @@ ], "attributeDefs": [ { + "name": "name", + "typeName": "string", + "cardinality": "SINGLE", + "isIndexable": true, + "isOptional": false, + "isUnique": false + }, + { "name": "displayName", "typeName": "string", "cardinality": "SINGLE", @@ -130,7 +138,7 @@ "isUnique": false }, { - "name": "qualifiedName", + "name": "fullName", "typeName": "string", "cardinality": "SINGLE", "isIndexable": true, http://git-wip-us.apache.org/repos/asf/atlas/blob/ba717c6e/intg/src/main/java/org/apache/atlas/model/impexp/AtlasServer.java ---------------------------------------------------------------------- diff --git a/intg/src/main/java/org/apache/atlas/model/impexp/AtlasServer.java b/intg/src/main/java/org/apache/atlas/model/impexp/AtlasServer.java index 67d0110..8809094 100644 --- a/intg/src/main/java/org/apache/atlas/model/impexp/AtlasServer.java +++ b/intg/src/main/java/org/apache/atlas/model/impexp/AtlasServer.java @@ -40,20 +40,23 @@ public class AtlasServer extends AtlasBaseModelObject implements Serializable { public static final String KEY_REPLICATION_DETAILS = "REPL_DETAILS"; - private String name; - private String qualifiedName; - private Map<String, String> additionalInfo; - private List<String> urls; + private String name; + private String fullName; + private String displayName; + private Map<String, String> additionalInfo = new HashMap<>(); + private List<String> urls = new ArrayList<>(); public AtlasServer() { - urls = new ArrayList<>(); - additionalInfo = new HashMap<>(); } - public AtlasServer(String name, String qualifiedName) { - this(); - this.name = name; - this.qualifiedName = qualifiedName; + public AtlasServer(String name, String fullName) { + this(name, name, fullName); + } + + public AtlasServer(String name, String displayName, String fullName) { + this.name = name; + this.displayName = displayName; + this.fullName = fullName; } public void setName(String name) { @@ -64,12 +67,45 @@ public class AtlasServer extends AtlasBaseModelObject implements Serializable { return this.name; } + public String getFullName() { + return fullName; + } + + public void setFullName(String fullName) { + this.fullName = fullName; + } + + public String getDisplayName() { + return displayName; + } + + public void setDisplayName(String displayName) { + this.displayName = displayName; + } + public void setAdditionalInfo(Map<String, String> additionalInfo) { this.additionalInfo = additionalInfo; } + public Map<String, String> getAdditionalInfo() { + return this.additionalInfo; + } + + public String getAdditionalInfo(String key) { + return additionalInfo.get(key); + } + + public void setUrls(List<String> urls) { + this.urls = urls; + } + + public List<String> getUrls() { + return this.urls; + } + + public void setAdditionalInfo(String key, String value) { - if(additionalInfo == null) { + if (additionalInfo == null) { additionalInfo = new HashMap<>(); } @@ -79,15 +115,15 @@ public class AtlasServer extends AtlasBaseModelObject implements Serializable { public void setAdditionalInfoRepl(String guid, long modifiedTimestamp) { Map<String, Object> replicationDetailsMap = null; - if(additionalInfo != null && additionalInfo.containsKey(KEY_REPLICATION_DETAILS)) { + if (additionalInfo != null && additionalInfo.containsKey(KEY_REPLICATION_DETAILS)) { replicationDetailsMap = AtlasType.fromJson(getAdditionalInfo().get(KEY_REPLICATION_DETAILS), Map.class); } - if(replicationDetailsMap == null) { + if (replicationDetailsMap == null) { replicationDetailsMap = new HashMap<>(); } - if(modifiedTimestamp == 0) { + if (modifiedTimestamp == 0) { replicationDetailsMap.remove(guid); } else { replicationDetailsMap.put(guid, modifiedTimestamp); @@ -96,59 +132,37 @@ public class AtlasServer extends AtlasBaseModelObject implements Serializable { updateReplicationMap(replicationDetailsMap); } - private void updateReplicationMap(Map<String, Object> replicationDetailsMap) { - String json = AtlasType.toJson(replicationDetailsMap); - setAdditionalInfo(KEY_REPLICATION_DETAILS, json); - } - - public Object getAdditionalInfoRepl(String guid) { - if(additionalInfo == null || !additionalInfo.containsKey(KEY_REPLICATION_DETAILS)) { + if (additionalInfo == null || !additionalInfo.containsKey(KEY_REPLICATION_DETAILS)) { return null; } - String key = guid; + String key = guid; String mapJson = additionalInfo.get(KEY_REPLICATION_DETAILS); Map<String, String> replicationDetailsMap = AtlasType.fromJson(mapJson, Map.class); - if(!replicationDetailsMap.containsKey(key)) { + + if (!replicationDetailsMap.containsKey(key)) { return null; } return replicationDetailsMap.get(key); } - public Map<String, String> getAdditionalInfo() { - return this.additionalInfo; - } - - public String getAdditionalInfo(String key) { - return additionalInfo.get(key); - } - - public String getQualifiedName() { - return qualifiedName; - } - - public void setQualifiedName(String qualifiedName) { - this.qualifiedName = qualifiedName; - } - - public void setUrls(List<String> urls) { - this.urls = urls; - } - - public List<String> getUrls() { - return this.urls; - } - @Override public StringBuilder toString(StringBuilder sb) { sb.append(", name=").append(name); - sb.append(", qualifiedName=").append(getQualifiedName()); - sb.append(", urls=").append(urls); + sb.append(", fullName=").append(fullName); + sb.append(", displayName=").append(displayName); sb.append(", additionalInfo=").append(additionalInfo); - sb.append("}"); + sb.append(", urls=").append(urls); + return sb; } + + private void updateReplicationMap(Map<String, Object> replicationDetailsMap) { + String json = AtlasType.toJson(replicationDetailsMap); + + setAdditionalInfo(KEY_REPLICATION_DETAILS, json); + } } http://git-wip-us.apache.org/repos/asf/atlas/blob/ba717c6e/repository/src/main/java/org/apache/atlas/repository/impexp/AuditsWriter.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/AuditsWriter.java b/repository/src/main/java/org/apache/atlas/repository/impexp/AuditsWriter.java index 08576fe..f3f49c9 100644 --- a/repository/src/main/java/org/apache/atlas/repository/impexp/AuditsWriter.java +++ b/repository/src/main/java/org/apache/atlas/repository/impexp/AuditsWriter.java @@ -44,6 +44,7 @@ import java.util.Map; public class AuditsWriter { private static final Logger LOG = LoggerFactory.getLogger(AuditsWriter.class); private static final String CLUSTER_NAME_DEFAULT = "default"; + private static final String DC_SERVER_NAME_SEPARATOR = "$"; private AtlasServerService atlasServerService; private ExportImportAuditService auditService; @@ -74,7 +75,7 @@ public class AuditsWriter { } private void updateReplicationAttribute(boolean isReplicationSet, - String serverName, + String serverName, String serverFullName, List<String> exportedGuids, String attrNameReplicated, long lastModifiedTimestamp) throws AtlasBaseException { @@ -82,7 +83,7 @@ public class AuditsWriter { return; } - AtlasServer server = saveServer(serverName, exportedGuids.get(0), lastModifiedTimestamp); + AtlasServer server = saveServer(serverName, serverFullName, exportedGuids.get(0), lastModifiedTimestamp); atlasServerService.updateEntitiesWithServer(server, exportedGuids, attrNameReplicated); } @@ -92,16 +93,16 @@ public class AuditsWriter { : StringUtils.EMPTY; } - private AtlasServer saveServer(String clusterName) throws AtlasBaseException { - AtlasServer cluster = new AtlasServer(clusterName, clusterName); + private AtlasServer saveServer(String clusterName, String serverFullName) throws AtlasBaseException { + AtlasServer cluster = new AtlasServer(clusterName, serverFullName); return atlasServerService.save(cluster); } - private AtlasServer saveServer(String clusterName, + private AtlasServer saveServer(String clusterName, String serverFullName, String entityGuid, long lastModifiedTimestamp) throws AtlasBaseException { - AtlasServer server = new AtlasServer(clusterName, clusterName); + AtlasServer server = new AtlasServer(clusterName, serverFullName); server.setAdditionalInfoRepl(entityGuid, lastModifiedTimestamp); if (LOG.isDebugEnabled()) { @@ -121,11 +122,20 @@ public class AuditsWriter { return StringUtils.EMPTY; } + static String getServerNameFromFullName(String fullName) { + if (StringUtils.isEmpty(fullName) || !fullName.contains(DC_SERVER_NAME_SEPARATOR)) { + return fullName; + } + + return StringUtils.split(fullName, "$")[1]; + } + private class ExportAudits { private AtlasExportRequest request; private String targetServerName; private String optionKeyReplicatedTo; private boolean replicationOptionState; + private String targetServerFullName; public void add(String userName, AtlasExportResult result, long startTime, long endTime, @@ -144,16 +154,17 @@ public class AuditsWriter { return; } - updateReplicationAttribute(replicationOptionState, targetServerName, + updateReplicationAttribute(replicationOptionState, targetServerName, targetServerFullName, entityGuids, Constants.ATTR_NAME_REPLICATED_TO, result.getChangeMarker()); } private void saveServers() throws AtlasBaseException { - saveServer(getCurrentClusterName()); + saveServer(getCurrentClusterName(), getCurrentClusterName()); - targetServerName = getClusterNameFromOptions(request.getOptions(), optionKeyReplicatedTo); + targetServerFullName = getClusterNameFromOptions(request.getOptions(), optionKeyReplicatedTo); + targetServerName = getServerNameFromFullName(targetServerFullName); if(StringUtils.isNotEmpty(targetServerName)) { - saveServer(targetServerName); + saveServer(targetServerName, targetServerFullName); } } } @@ -163,6 +174,7 @@ public class AuditsWriter { private boolean replicationOptionState; private String sourceServerName; private String optionKeyReplicatedFrom; + private String sourceServerFullName; public void add(String userName, AtlasImportResult result, long startTime, long endTime, @@ -182,16 +194,17 @@ public class AuditsWriter { return; } - updateReplicationAttribute(replicationOptionState, this.sourceServerName, entityGuids, + updateReplicationAttribute(replicationOptionState, sourceServerName, sourceServerFullName, entityGuids, Constants.ATTR_NAME_REPLICATED_FROM, result.getExportResult().getChangeMarker()); } private void saveServers() throws AtlasBaseException { - saveServer(getCurrentClusterName()); + saveServer(getCurrentClusterName(), getCurrentClusterName()); - sourceServerName = getClusterNameFromOptionsState(); + sourceServerFullName = getClusterNameFromOptionsState(); + sourceServerName = getServerNameFromFullName(sourceServerFullName); if(StringUtils.isNotEmpty(sourceServerName)) { - saveServer(sourceServerName); + saveServer(sourceServerName, sourceServerFullName); } } http://git-wip-us.apache.org/repos/asf/atlas/blob/ba717c6e/repository/src/main/java/org/apache/atlas/repository/ogm/AtlasServerDTO.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/ogm/AtlasServerDTO.java b/repository/src/main/java/org/apache/atlas/repository/ogm/AtlasServerDTO.java index 6343a5d..d4ccb00 100644 --- a/repository/src/main/java/org/apache/atlas/repository/ogm/AtlasServerDTO.java +++ b/repository/src/main/java/org/apache/atlas/repository/ogm/AtlasServerDTO.java @@ -27,8 +27,9 @@ import java.util.List; import java.util.Map; public class AtlasServerDTO extends AbstractDataTransferObject<AtlasServer> { + private final String PROPERTY_NAME = "name"; private final String PROPERTY_DISPLAY_NAME = "displayName"; - private final String PROPERTY_QUALIFIED_NAME = "qualifiedName"; + private final String PROPERTY_FULL_NAME = "fullName"; private final String PROPERTY_ADDITIONAL_INFO = "additionalInfo"; private final String PROPERTY_URLS = "urls"; @@ -40,8 +41,9 @@ public class AtlasServerDTO extends AbstractDataTransferObject<AtlasServer> { AtlasServer cluster = new AtlasServer(); setGuid(cluster, entity); - cluster.setName((String) entity.getAttribute(PROPERTY_DISPLAY_NAME)); - cluster.setQualifiedName((String) entity.getAttribute(PROPERTY_QUALIFIED_NAME)); + cluster.setName((String) entity.getAttribute(PROPERTY_NAME)); + cluster.setFullName((String) entity.getAttribute(PROPERTY_FULL_NAME)); + cluster.setDisplayName((String) entity.getAttribute(PROPERTY_DISPLAY_NAME)); cluster.setAdditionalInfo((Map<String,String>) entity.getAttribute(PROPERTY_ADDITIONAL_INFO)); cluster.setUrls((List<String>) entity.getAttribute(PROPERTY_URLS)); @@ -56,8 +58,9 @@ public class AtlasServerDTO extends AbstractDataTransferObject<AtlasServer> { public AtlasEntity toEntity(AtlasServer obj) { AtlasEntity entity = getDefaultAtlasEntity(obj); - entity.setAttribute(PROPERTY_DISPLAY_NAME, obj.getName()); - entity.setAttribute(PROPERTY_QUALIFIED_NAME, obj.getQualifiedName()); + entity.setAttribute(PROPERTY_NAME, obj.getName()); + entity.setAttribute(PROPERTY_DISPLAY_NAME, obj.getDisplayName()); + entity.setAttribute(PROPERTY_FULL_NAME, obj.getFullName()); entity.setAttribute(PROPERTY_ADDITIONAL_INFO, obj.getAdditionalInfo()); entity.setAttribute(PROPERTY_URLS, obj.getUrls()); @@ -72,7 +75,7 @@ public class AtlasServerDTO extends AbstractDataTransferObject<AtlasServer> { @Override public Map<String, Object> getUniqueAttributes(final AtlasServer obj) { return new HashMap<String, Object>() {{ - put(PROPERTY_QUALIFIED_NAME, obj.getQualifiedName()); + put(PROPERTY_FULL_NAME, obj.getFullName()); }}; } } http://git-wip-us.apache.org/repos/asf/atlas/blob/ba717c6e/repository/src/test/java/org/apache/atlas/repository/impexp/AtlasServerServiceTest.java ---------------------------------------------------------------------- diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/AtlasServerServiceTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/AtlasServerServiceTest.java index 243c64f..7ab0f32 100644 --- a/repository/src/test/java/org/apache/atlas/repository/impexp/AtlasServerServiceTest.java +++ b/repository/src/test/java/org/apache/atlas/repository/impexp/AtlasServerServiceTest.java @@ -87,7 +87,7 @@ public class AtlasServerServiceTest { assertEquals(actual.getName(), expected.getName()); - assertEquals(actual.getQualifiedName(), expected.getQualifiedName()); + assertEquals(actual.getFullName(), expected.getFullName()); } private AtlasServer getServer(String serverName, String topLevelEntity, String operation, long nextModifiedTimestamp, String targetServerName) { http://git-wip-us.apache.org/repos/asf/atlas/blob/ba717c6e/repository/src/test/java/org/apache/atlas/repository/impexp/ExportImportAuditServiceTest.java ---------------------------------------------------------------------- diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ExportImportAuditServiceTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/ExportImportAuditServiceTest.java index 16fd39d..ba7a8a0 100644 --- a/repository/src/test/java/org/apache/atlas/repository/impexp/ExportImportAuditServiceTest.java +++ b/repository/src/test/java/org/apache/atlas/repository/impexp/ExportImportAuditServiceTest.java @@ -61,7 +61,7 @@ public class ExportImportAuditServiceTest extends ExportImportTestBase { } @Test - public void saveLogEntry() throws AtlasBaseException, InterruptedException { + public void saveLogEntry() throws AtlasBaseException { final String source1 = "clx"; final String target1 = "cly"; ExportImportAuditEntry entry = saveAndGet(source1, ExportImportAuditEntry.OPERATION_EXPORT, target1); http://git-wip-us.apache.org/repos/asf/atlas/blob/ba717c6e/repository/src/test/java/org/apache/atlas/repository/impexp/IncrementalExportEntityProviderTest.java ---------------------------------------------------------------------- diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/IncrementalExportEntityProviderTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/IncrementalExportEntityProviderTest.java index 22ad66f..99d88eb 100644 --- a/repository/src/test/java/org/apache/atlas/repository/impexp/IncrementalExportEntityProviderTest.java +++ b/repository/src/test/java/org/apache/atlas/repository/impexp/IncrementalExportEntityProviderTest.java @@ -64,7 +64,6 @@ public class IncrementalExportEntityProviderTest extends ExportImportTestBase { verifyCreatedEntities(entityStore, entityGuids, 2); gremlinScriptEngine = atlasGraph.getGremlinScriptEngine(); - EntityGraphRetriever entityGraphRetriever = new EntityGraphRetriever(this.typeRegistry); incrementalExportEntityProvider = new IncrementalExportEntityProvider(atlasGraph, gremlinScriptEngine); } http://git-wip-us.apache.org/repos/asf/atlas/blob/ba717c6e/repository/src/test/java/org/apache/atlas/repository/impexp/ReplicationEntityAttributeTest.java ---------------------------------------------------------------------- diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ReplicationEntityAttributeTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/ReplicationEntityAttributeTest.java index 8de7368..9b9bdd9 100644 --- a/repository/src/test/java/org/apache/atlas/repository/impexp/ReplicationEntityAttributeTest.java +++ b/repository/src/test/java/org/apache/atlas/repository/impexp/ReplicationEntityAttributeTest.java @@ -116,7 +116,10 @@ public class ReplicationEntityAttributeTest extends ExportImportTestBase { assertNotNull(zipSource.getCreationOrder()); assertEquals(zipSource.getCreationOrder().size(), expectedEntityCount); - assertCluster(REPLICATED_TO_CLUSTER_NAME, null); + assertCluster( + AuditsWriter.getServerNameFromFullName(REPLICATED_TO_CLUSTER_NAME), + REPLICATED_TO_CLUSTER_NAME, null); + assertReplicationAttribute(Constants.ATTR_NAME_REPLICATED_TO); } @@ -125,7 +128,9 @@ public class ReplicationEntityAttributeTest extends ExportImportTestBase { AtlasImportRequest request = getImportRequestWithReplicationOption(); AtlasImportResult importResult = runImportWithParameters(importService, request, zipSource); - assertCluster(REPLICATED_FROM_CLUSTER_NAME, importResult); + assertCluster( + AuditsWriter.getServerNameFromFullName(REPLICATED_FROM_CLUSTER_NAME), + REPLICATED_FROM_CLUSTER_NAME, importResult); assertReplicationAttribute(Constants.ATTR_NAME_REPLICATED_FROM); } @@ -141,11 +146,12 @@ public class ReplicationEntityAttributeTest extends ExportImportTestBase { } } - private void assertCluster(String name, AtlasImportResult importResult) throws AtlasBaseException { - AtlasServer actual = atlasServerService.get(new AtlasServer(name, name)); + private void assertCluster(String name, String fullName, AtlasImportResult importResult) throws AtlasBaseException { + AtlasServer actual = atlasServerService.get(new AtlasServer(name, fullName)); assertNotNull(actual); assertEquals(actual.getName(), name); + assertEquals(actual.getFullName(), fullName); if(importResult != null) { assertClusterAdditionalInfo(actual, importResult); http://git-wip-us.apache.org/repos/asf/atlas/blob/ba717c6e/repository/src/test/resources/json/stocksDB-Entities/export-replicatedTo.json ---------------------------------------------------------------------- diff --git a/repository/src/test/resources/json/stocksDB-Entities/export-replicatedTo.json b/repository/src/test/resources/json/stocksDB-Entities/export-replicatedTo.json index a69fe9e..a6fec6c 100644 --- a/repository/src/test/resources/json/stocksDB-Entities/export-replicatedTo.json +++ b/repository/src/test/resources/json/stocksDB-Entities/export-replicatedTo.json @@ -6,6 +6,6 @@ ], "options": { "fetchType": "full", - "replicatedTo": "clTarget" + "replicatedTo": "dc2$clTarget" } } http://git-wip-us.apache.org/repos/asf/atlas/blob/ba717c6e/repository/src/test/resources/json/stocksDB-Entities/import-replicatedFrom.json ---------------------------------------------------------------------- diff --git a/repository/src/test/resources/json/stocksDB-Entities/import-replicatedFrom.json b/repository/src/test/resources/json/stocksDB-Entities/import-replicatedFrom.json index 1ce00ad..29268ef 100644 --- a/repository/src/test/resources/json/stocksDB-Entities/import-replicatedFrom.json +++ b/repository/src/test/resources/json/stocksDB-Entities/import-replicatedFrom.json @@ -1,5 +1,5 @@ { "options": { - "replicatedFrom": "clSource" + "replicatedFrom": "dc1$clSource" } }