ATLAS-835 Falcon Integration with Atlas (sowmyaramesh via shwethags)
Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/e30ab3d8 Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/e30ab3d8 Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/e30ab3d8 Branch: refs/heads/master Commit: e30ab3d8d78cfe3dae70babff9d3c6bcf9065f20 Parents: 436a524 Author: Shwetha GS <[email protected]> Authored: Mon Jun 20 12:21:55 2016 +0530 Committer: Shwetha GS <[email protected]> Committed: Mon Jun 20 12:21:55 2016 +0530 ---------------------------------------------------------------------- .../org/apache/atlas/falcon/Util/EventUtil.java | 64 +++ .../atlas/falcon/bridge/FalconBridge.java | 401 +++++++++++++++++++ .../apache/atlas/falcon/event/FalconEvent.java | 72 ++++ .../apache/atlas/falcon/hook/FalconHook.java | 246 ++++-------- .../falcon/model/FalconDataModelGenerator.java | 142 +++++-- .../atlas/falcon/model/FalconDataTypes.java | 18 +- .../falcon/publisher/FalconEventPublisher.java | 41 ++ .../atlas/falcon/service/AtlasService.java | 141 +++++++ .../org/apache/falcon/atlas/Util/EventUtil.java | 68 ---- .../apache/falcon/atlas/event/FalconEvent.java | 65 --- .../atlas/publisher/FalconEventPublisher.java | 41 -- .../falcon/atlas/service/AtlasService.java | 115 ------ .../apache/atlas/falcon/hook/FalconHookIT.java | 242 +++++++---- .../src/test/resources/feed-replication.xml | 43 ++ .../org/apache/atlas/hive/hook/HiveHook.java | 51 +-- .../org/apache/atlas/hive/hook/HiveHookIT.java | 3 - distro/src/conf/atlas-log4j.xml | 12 +- docs/src/site/twiki/Bridge-Falcon.twiki | 17 +- .../java/org/apache/atlas/hook/AtlasHook.java | 5 +- .../org/apache/atlas/hook/AtlasHookTest.java | 18 +- release-log.txt | 1 + .../main/resources/atlas-application.properties | 4 + typesystem/src/main/resources/atlas-log4j.xml | 4 +- .../apache/atlas/web/filters/AuditFilter.java | 4 +- 24 files changed, 1191 insertions(+), 627 deletions(-) 
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/e30ab3d8/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/Util/EventUtil.java
----------------------------------------------------------------------
diff --git a/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/Util/EventUtil.java b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/Util/EventUtil.java
new file mode 100644
index 0000000..c1ccd05
--- /dev/null
+++ b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/Util/EventUtil.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.falcon.Util;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.falcon.FalconException;
+import org.apache.falcon.security.CurrentUser;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Falcon event util
+ */
+public final class EventUtil {
+
+    private EventUtil() {}
+
+    /**
+     * Parses a comma-separated list of {@code key=value} pairs into a map.
+     *
+     * @param keyValueString string such as {@code "owner=etl,tier=gold"}; may be null or blank
+     * @return map of key to value, or {@code null} when the input is null or blank.
+     *         Entries that contain no {@code '='} are skipped.
+     */
+    public static Map<String, String> convertKeyValueStringToMap(final String keyValueString) {
+        if (StringUtils.isBlank(keyValueString)) {
+            return null;
+        }
+
+        Map<String, String> keyValueMap = new HashMap<>();
+
+        String[] tags = keyValueString.split(",");
+        for (String tag : tags) {
+            int index = tag.indexOf('=');
+            if (index < 0) {
+                // Malformed entry: substring(0, -1) would throw StringIndexOutOfBoundsException.
+                continue;
+            }
+            String tagKey = tag.substring(0, index);
+            String tagValue = tag.substring(index + 1);
+            keyValueMap.put(tagKey, tagValue);
+        }
+        return keyValueMap;
+    }
+
+    /**
+     * Returns the UGI of the user authenticated with Falcon's {@code CurrentUser}.
+     *
+     * @throws FalconException wrapping any {@link IOException} raised while resolving the UGI
+     */
+    public static UserGroupInformation getUgi() throws FalconException {
+        UserGroupInformation ugi;
+        try {
+            ugi = CurrentUser.getAuthenticatedUGI();
+        } catch (IOException ioe) {
+            throw new FalconException(ioe);
+        }
+        return ugi;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/e30ab3d8/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/bridge/FalconBridge.java
----------------------------------------------------------------------
diff --git a/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/bridge/FalconBridge.java b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/bridge/FalconBridge.java
new file mode 100644
index 0000000..1621d95
--- /dev/null
+++ b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/bridge/FalconBridge.java
@@ -0,0 +1,435 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.falcon.bridge;
+
+import org.apache.atlas.AtlasClient;
+import org.apache.atlas.AtlasConstants;
+import org.apache.atlas.falcon.Util.EventUtil;
+import org.apache.atlas.falcon.model.FalconDataModelGenerator;
+import org.apache.atlas.falcon.model.FalconDataTypes;
+import org.apache.atlas.fs.model.FSDataTypes;
+import org.apache.atlas.hive.bridge.HiveMetaStoreBridge;
+import org.apache.atlas.hive.model.HiveDataModelGenerator;
+import org.apache.atlas.hive.model.HiveDataTypes;
+import org.apache.atlas.typesystem.Referenceable;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.falcon.entity.CatalogStorage;
+import org.apache.falcon.entity.FeedHelper;
+import org.apache.falcon.entity.FileSystemStorage;
+import org.apache.falcon.entity.ProcessHelper;
+import org.apache.falcon.entity.store.ConfigurationStore;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.feed.CatalogTable;
+import org.apache.falcon.entity.v0.feed.ClusterType;
+import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.entity.v0.feed.Location;
+import org.apache.falcon.entity.v0.feed.LocationType;
+import org.apache.falcon.entity.v0.process.Cluster;
+import org.apache.falcon.entity.v0.process.Input;
+import org.apache.falcon.entity.v0.process.Output;
+import org.apache.falcon.entity.v0.process.Workflow;
+import org.apache.falcon.workflow.WorkflowExecutionArgs;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A Bridge Utility to register Falcon entities metadata to Atlas.
+ */
+public class FalconBridge {
+    private static final Logger LOG = LoggerFactory.getLogger(FalconBridge.class);
+
+    /**
+     * Creates the Atlas cluster entity ({@code FalconDataTypes.FALCON_CLUSTER}) for a Falcon cluster.
+     *
+     * @param cluster   Falcon cluster definition
+     * @param user      value stored in the USER attribute
+     * @param timestamp value stored in the TIMESTAMP attribute
+     * @return cluster instance reference
+     */
+    public static Referenceable createClusterEntity(final org.apache.falcon.entity.v0.cluster.Cluster cluster,
+                                                    final String user,
+                                                    final Date timestamp) throws Exception {
+        LOG.info("Creating cluster Entity : {}", cluster.getName());
+
+        Referenceable clusterRef = new Referenceable(FalconDataTypes.FALCON_CLUSTER.getName());
+
+        clusterRef.set(FalconDataModelGenerator.NAME, cluster.getName());
+        clusterRef.set("description", cluster.getDescription());
+        clusterRef.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, cluster.getName());
+
+        clusterRef.set(FalconDataModelGenerator.TIMESTAMP, timestamp);
+        clusterRef.set(FalconDataModelGenerator.COLO, cluster.getColo());
+
+        clusterRef.set(FalconDataModelGenerator.USER, user);
+
+        if (StringUtils.isNotEmpty(cluster.getTags())) {
+            clusterRef.set(FalconDataModelGenerator.TAGS,
+                    EventUtil.convertKeyValueStringToMap(cluster.getTags()));
+        }
+
+        return clusterRef;
+    }
+
+    /**
+     * Creates the feed dataset entity of {@code feed} stored in the given cluster. Its qualified name is
+     * {@code feedName@clusterName}, with the cluster name read from {@code clusterReferenceable}.
+     */
+    private static Referenceable createFeedEntity(Feed feed, Referenceable clusterReferenceable,
+                                                  String user, Date timestamp) throws Exception {
+        LOG.info("Creating feed dataset: {}", feed.getName());
+
+        Referenceable datasetReferenceable = new Referenceable(FalconDataTypes.FALCON_FEED.getName());
+        datasetReferenceable.set(FalconDataModelGenerator.NAME, feed.getName());
+        String feedQualifiedName =
+                getFeedQualifiedName(feed.getName(), (String) clusterReferenceable.get(FalconDataModelGenerator.NAME));
+        datasetReferenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, feedQualifiedName);
+        datasetReferenceable.set(FalconDataModelGenerator.TIMESTAMP, timestamp);
+
+        datasetReferenceable.set(FalconDataModelGenerator.STOREDIN, clusterReferenceable);
+        datasetReferenceable.set(FalconDataModelGenerator.USER, user);
+
+        if (StringUtils.isNotEmpty(feed.getTags())) {
+            datasetReferenceable.set(FalconDataModelGenerator.TAGS,
+                    EventUtil.convertKeyValueStringToMap(feed.getTags()));
+        }
+
+        if (feed.getGroups() != null) {
+            datasetReferenceable.set(FalconDataModelGenerator.GROUPS, feed.getGroups());
+        }
+
+        return datasetReferenceable;
+    }
+
+    /**
+     * Creates the entities describing {@code feed}. For every cluster of the feed this adds the cluster
+     * reference, the feed's source dataset (hive db and table, or hdfs path), the feed dataset and a
+     * feed-creation entity linking them. When the feed has both SOURCE and TARGET clusters, a
+     * feed-replication entity from the source feed datasets to the target feed datasets is added as well.
+     *
+     * @param feed        Falcon feed
+     * @param falconStore configuration store used to resolve the feed's clusters
+     * @param user        value stored in the USER attribute
+     * @param timestamp   value stored in the TIMESTAMP attribute
+     * @return entities to register; referenced entities precede the entities that refer to them
+     */
+    public static List<Referenceable> createFeedCreationEntity(Feed feed, ConfigurationStore falconStore, String user,
+                                                               Date timestamp) throws Exception {
+        LOG.info("Creating feed : {}", feed.getName());
+
+        List<Referenceable> entities = new ArrayList<>();
+
+        if (feed.getClusters() != null) {
+            List<Referenceable> replicationInputs = new ArrayList<>();
+            List<Referenceable> replicationOutputs = new ArrayList<>();
+
+            for (org.apache.falcon.entity.v0.feed.Cluster feedCluster : feed.getClusters().getClusters()) {
+                org.apache.falcon.entity.v0.cluster.Cluster cluster = falconStore.get(EntityType.CLUSTER,
+                        feedCluster.getName());
+
+                // set cluster
+                Referenceable clusterReferenceable = getClusterEntityReference(cluster.getName(), cluster.getColo());
+                entities.add(clusterReferenceable);
+
+                // input as hive_table or hdfs_path, output as falcon_feed dataset
+                List<Referenceable> inputs = new ArrayList<>();
+                List<Referenceable> inputReferenceables = getInputEntities(cluster, feed);
+                if (inputReferenceables != null) {
+                    entities.addAll(inputReferenceables);
+                    inputs.add(inputReferenceables.get(inputReferenceables.size() - 1));
+                }
+
+                List<Referenceable> outputs = new ArrayList<>();
+                Referenceable feedEntity = createFeedEntity(feed, clusterReferenceable, user, timestamp);
+                if (feedEntity != null) {
+                    entities.add(feedEntity);
+                    outputs.add(feedEntity);
+                }
+
+                if (!inputs.isEmpty() || !outputs.isEmpty()) {
+                    Referenceable feedCreateEntity = new Referenceable(FalconDataTypes.FALCON_FEED_CREATION.getName());
+                    String feedQualifiedName = getFeedQualifiedName(feed.getName(), cluster.getName());
+
+                    feedCreateEntity.set(FalconDataModelGenerator.NAME, feed.getName());
+                    feedCreateEntity.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, feedQualifiedName);
+
+                    feedCreateEntity.set(FalconDataModelGenerator.TIMESTAMP, timestamp);
+                    if (!inputs.isEmpty()) {
+                        feedCreateEntity.set(FalconDataModelGenerator.INPUTS, inputs);
+                    }
+                    if (!outputs.isEmpty()) {
+                        feedCreateEntity.set(FalconDataModelGenerator.OUTPUTS, outputs);
+                    }
+
+                    feedCreateEntity.set(FalconDataModelGenerator.STOREDIN, clusterReferenceable);
+                    feedCreateEntity.set(FalconDataModelGenerator.USER, user);
+                    entities.add(feedCreateEntity);
+                }
+
+                if (ClusterType.SOURCE == feedCluster.getType()) {
+                    replicationInputs.add(feedEntity);
+                } else if (ClusterType.TARGET == feedCluster.getType()) {
+                    replicationOutputs.add(feedEntity);
+                }
+            }
+
+            // Replication needs both a source and a target feed dataset.
+            if (!replicationInputs.isEmpty() && !replicationOutputs.isEmpty()) {
+                Referenceable feedReplicationEntity = new Referenceable(FalconDataTypes
+                        .FALCON_FEED_REPLICATION.getName());
+
+                feedReplicationEntity.set(FalconDataModelGenerator.NAME, feed.getName());
+                feedReplicationEntity.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, feed.getName());
+
+                feedReplicationEntity.set(FalconDataModelGenerator.TIMESTAMP, timestamp);
+                feedReplicationEntity.set(FalconDataModelGenerator.INPUTS, replicationInputs);
+                feedReplicationEntity.set(FalconDataModelGenerator.OUTPUTS, replicationOutputs);
+                feedReplicationEntity.set(FalconDataModelGenerator.USER, user);
+                entities.add(feedReplicationEntity);
+            }
+
+        }
+        return entities;
+    }
+
+    /**
+     * Creates the process entities of a Falcon process: for every cluster the process runs on (and that has
+     * inputs or outputs), a process entity with qualified name {@code processName@clusterName}, preceded by
+     * the cluster reference and the feed dataset references it consumes and produces.
+     *
+     * @param process     Falcon process
+     * @param falconStore configuration store used to resolve the process clusters
+     * @param user        value stored in the USER attribute
+     * @param timestamp   value stored in the TIMESTAMP attribute
+     * @return process instance references
+     */
+    public static List<Referenceable> createProcessEntity(org.apache.falcon.entity.v0.process.Process process,
+                                                          ConfigurationStore falconStore, String user,
+                                                          Date timestamp) throws Exception {
+        LOG.info("Creating process Entity : {}", process.getName());
+
+        // For each cluster, create a process entity with qualified name
+        // processName@clusterName (see getProcessQualifiedName)
+        List<Referenceable> entities = new ArrayList<>();
+
+        if (process.getClusters() != null) {
+
+            for (Cluster processCluster : process.getClusters().getClusters()) {
+                org.apache.falcon.entity.v0.cluster.Cluster cluster =
+                        falconStore.get(EntityType.CLUSTER, processCluster.getName());
+                Referenceable clusterReferenceable = getClusterEntityReference(cluster.getName(), cluster.getColo());
+                entities.add(clusterReferenceable);
+
+                List<Referenceable> inputs = new ArrayList<>();
+                if (process.getInputs() != null) {
+                    for (Input input : process.getInputs().getInputs()) {
+                        Referenceable inputReferenceable = getFeedDataSetReference(getFeedQualifiedName(input.getFeed(),
+                                cluster.getName()), clusterReferenceable);
+                        entities.add(inputReferenceable);
+                        inputs.add(inputReferenceable);
+                    }
+                }
+
+                List<Referenceable> outputs = new ArrayList<>();
+                if (process.getOutputs() != null) {
+                    for (Output output : process.getOutputs().getOutputs()) {
+                        Referenceable outputReferenceable = getFeedDataSetReference(getFeedQualifiedName(output.getFeed(),
+                                cluster.getName()), clusterReferenceable);
+                        entities.add(outputReferenceable);
+                        outputs.add(outputReferenceable);
+                    }
+                }
+
+                if (!inputs.isEmpty() || !outputs.isEmpty()) {
+
+                    Referenceable processEntity = new Referenceable(FalconDataTypes.FALCON_PROCESS.getName());
+                    processEntity.set(FalconDataModelGenerator.NAME, process.getName());
+                    processEntity.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
+                            getProcessQualifiedName(process.getName(), cluster.getName()));
+                    processEntity.set(FalconDataModelGenerator.TIMESTAMP, timestamp);
+
+                    if (!inputs.isEmpty()) {
+                        processEntity.set(FalconDataModelGenerator.INPUTS, inputs);
+                    }
+                    if (!outputs.isEmpty()) {
+                        processEntity.set(FalconDataModelGenerator.OUTPUTS, outputs);
+                    }
+
+                    // set cluster
+                    processEntity.set(FalconDataModelGenerator.RUNSON, clusterReferenceable);
+
+                    // Set user
+                    processEntity.set(FalconDataModelGenerator.USER, user);
+
+                    if (StringUtils.isNotEmpty(process.getTags())) {
+                        processEntity.set(FalconDataModelGenerator.TAGS,
+                                EventUtil.convertKeyValueStringToMap(process.getTags()));
+                    }
+
+                    if (process.getPipelines() != null) {
+                        processEntity.set(FalconDataModelGenerator.PIPELINES, process.getPipelines());
+                    }
+
+                    processEntity.set(FalconDataModelGenerator.WFPROPERTIES,
+                            getProcessEntityWFProperties(process.getWorkflow(),
+                                    process.getName()));
+
+                    entities.add(processEntity);
+                }
+
+            }
+        }
+        return entities;
+    }
+
+    /**
+     * Returns the entities backing {@code feed} on {@code cluster}: the hive db and table (table last) when
+     * the feed is table based, otherwise a single hdfs path entity for the feed's DATA location.
+     * Returns {@code null} when no usable DATA location is found; callers treat that as "no inputs".
+     */
+    private static List<Referenceable> getInputEntities(org.apache.falcon.entity.v0.cluster.Cluster cluster,
+                                                        Feed feed) throws Exception {
+        org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(feed, cluster.getName());
+
+        final CatalogTable table = getTable(feedCluster, feed);
+        if (table != null) {
+            CatalogStorage storage = new CatalogStorage(cluster, table);
+            return createHiveTableInstance(cluster.getName(), storage.getDatabase().toLowerCase(),
+                    storage.getTable().toLowerCase());
+        } else {
+            List<Location> locations = FeedHelper.getLocations(feedCluster, feed);
+            Location dataLocation = FileSystemStorage.getLocation(locations, LocationType.DATA);
+            // NOTE(review): getLocation presumably returns null when no DATA location is defined; guard
+            // against it (and a blank path) instead of failing the whole event with an NPE.
+            final String pathUri = dataLocation == null ? null : normalize(dataLocation.getPath());
+            if (pathUri == null) {
+                LOG.warn("No data location found for feed {} on cluster {}", feed.getName(), cluster.getName());
+                return null;
+            }
+            LOG.info("Registering DFS Path {} ", pathUri);
+            return fillHDFSDataSet(pathUri, cluster.getName());
+        }
+    }
+
+    private static CatalogTable getTable(org.apache.falcon.entity.v0.feed.Cluster cluster, Feed feed) {
+        // check if table is overridden in cluster
+        if (cluster.getTable() != null) {
+            return cluster.getTable();
+        }
+
+        return feed.getTable();
+    }
+
+    private static List<Referenceable> fillHDFSDataSet(final String pathUri, final String clusterName) {
+        List<Referenceable> entities = new ArrayList<>();
+        Referenceable ref = new Referenceable(FSDataTypes.HDFS_PATH().toString());
+        ref.set("path", pathUri);
+        //TODO - Fix after ATLAS-542 to shorter Name
+        ref.set(FalconDataModelGenerator.NAME, pathUri);
+        ref.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, pathUri);
+        ref.set(AtlasConstants.CLUSTER_NAME_ATTRIBUTE, clusterName);
+        entities.add(ref);
+        return entities;
+    }
+
+    private static Referenceable createHiveDatabaseInstance(String clusterName, String dbName)
+            throws Exception {
+        Referenceable dbRef = new Referenceable(HiveDataTypes.HIVE_DB.getName());
+        dbRef.set(AtlasConstants.CLUSTER_NAME_ATTRIBUTE, clusterName);
+        dbRef.set(HiveDataModelGenerator.NAME, dbName);
+        dbRef.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
+                HiveMetaStoreBridge.getDBQualifiedName(clusterName, dbName));
+        return dbRef;
+    }
+
+    private static List<Referenceable> createHiveTableInstance(String clusterName, String dbName,
+                                                               String tableName) throws Exception {
+        List<Referenceable> entities = new ArrayList<>();
+        Referenceable dbRef = createHiveDatabaseInstance(clusterName, dbName);
+        entities.add(dbRef);
+
+        Referenceable tableRef = new Referenceable(HiveDataTypes.HIVE_TABLE.getName());
+        tableRef.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
+                HiveMetaStoreBridge.getTableQualifiedName(clusterName, dbName, tableName));
+        tableRef.set(HiveDataModelGenerator.NAME, tableName.toLowerCase());
+        tableRef.set(HiveDataModelGenerator.DB, dbRef);
+        entities.add(tableRef);
+
+        return entities;
+    }
+
+    private static Referenceable getClusterEntityReference(final String clusterName,
+                                                           final String colo) {
+        LOG.info("Getting reference for entity {}", clusterName);
+        Referenceable clusterRef = new Referenceable(FalconDataTypes.FALCON_CLUSTER.getName());
+        clusterRef.set(FalconDataModelGenerator.NAME, String.format("%s", clusterName));
+        clusterRef.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, clusterName);
+        clusterRef.set(FalconDataModelGenerator.COLO, colo);
+        return clusterRef;
+    }
+
+    private static Referenceable getFeedDataSetReference(final String feedDatasetName,
+                                                         Referenceable clusterReference) {
+        LOG.info("Getting reference for entity {}", feedDatasetName);
+        Referenceable feedDatasetRef = new Referenceable(FalconDataTypes.FALCON_FEED.getName());
+        feedDatasetRef.set(FalconDataModelGenerator.NAME, feedDatasetName);
+        feedDatasetRef.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, feedDatasetName);
+        feedDatasetRef.set(FalconDataModelGenerator.STOREDIN, clusterReference);
+        return feedDatasetRef;
+    }
+
+    private static Map<String, String> getProcessEntityWFProperties(final Workflow workflow,
+                                                                    final String processName) {
+        Map<String, String> wfProperties = new HashMap<>();
+        wfProperties.put(WorkflowExecutionArgs.USER_WORKFLOW_NAME.getName(),
+                ProcessHelper.getProcessWorkflowName(workflow.getName(), processName));
+        wfProperties.put(WorkflowExecutionArgs.USER_WORKFLOW_VERSION.getName(),
+                workflow.getVersion());
+        wfProperties.put(WorkflowExecutionArgs.USER_WORKFLOW_ENGINE.getName(),
+                workflow.getEngine().value());
+
+        return wfProperties;
+    }
+
+    /** Returns {@code feedName@clusterName}, the qualified name used for a feed on a cluster. */
+    public static String getFeedQualifiedName(final String feedName, final String clusterName) {
+        return String.format("%s@%s", feedName, clusterName);
+    }
+
+    /** Returns {@code processName@clusterName}, the qualified name used for a process on a cluster. */
+    public static String getProcessQualifiedName(final String processName, final String clusterName) {
+        return String.format("%s@%s", processName, clusterName);
+    }
+
+    /**
+     * Trims and lower-cases {@code str}; returns {@code null} when it is null or blank.
+     * NOTE(review): HDFS paths are case-sensitive, so lower-casing may make distinct paths collide.
+     */
+    public static String normalize(final String str) {
+        if (StringUtils.isBlank(str)) {
+            return null;
+        }
+        return str.toLowerCase().trim();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/e30ab3d8/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/event/FalconEvent.java
----------------------------------------------------------------------
diff --git a/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/event/FalconEvent.java b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/event/FalconEvent.java
new file mode 100644
index 0000000..37df6da
--- /dev/null
+++ b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/event/FalconEvent.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.falcon.event;
+
+import org.apache.falcon.entity.v0.Entity;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import java.util.Date;
+
+/**
+ * Falcon event to interface with Atlas Service.
+ */
+public class FalconEvent {
+    protected String user;
+    protected UserGroupInformation ugi;
+    protected OPERATION operation;
+    protected long timestamp;
+    protected Entity entity;
+
+    public FalconEvent(String doAsUser, UserGroupInformation ugi, OPERATION falconOperation, long timestamp, Entity entity) {
+        this.user = doAsUser;
+        this.ugi = ugi;
+        this.operation = falconOperation;
+        this.timestamp = timestamp;
+        this.entity = entity;
+    }
+
+    public enum OPERATION {
+        ADD_CLUSTER,
+        UPDATE_CLUSTER,
+        ADD_FEED,
+        UPDATE_FEED,
+        ADD_PROCESS,
+        UPDATE_PROCESS,
+    }
+
+    public String getUser() {
+        return user;
+    }
+
+    public UserGroupInformation getUgi() {
+        return ugi;
+    }
+
+    public OPERATION getOperation() {
+        return operation;
+    }
+
+    public Date getTimestamp() {
+        return new Date(timestamp);
+    }
+
+    public Entity getEntity() {
+        return entity;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/e30ab3d8/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/hook/FalconHook.java
----------------------------------------------------------------------
diff --git a/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/hook/FalconHook.java b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/hook/FalconHook.java
index f27a8b0..95f255e 100644
--- a/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/hook/FalconHook.java
+++ b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/hook/FalconHook.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -21,32 +21,17 @@ package org.apache.atlas.falcon.hook;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.google.inject.Guice;
 import com.google.inject.Injector;
-import org.apache.atlas.AtlasClient;
-import org.apache.atlas.AtlasConstants;
-import org.apache.atlas.falcon.model.FalconDataModelGenerator;
-import org.apache.atlas.falcon.model.FalconDataTypes;
-import org.apache.atlas.hive.bridge.HiveMetaStoreBridge;
-import org.apache.atlas.hive.model.HiveDataModelGenerator;
-import org.apache.atlas.hive.model.HiveDataTypes;
+import org.apache.atlas.falcon.bridge.FalconBridge;
 import org.apache.atlas.hook.AtlasHook;
 import org.apache.atlas.notification.NotificationInterface;
 import org.apache.atlas.notification.NotificationModule;
+import org.apache.atlas.notification.hook.HookNotification;
 import org.apache.atlas.typesystem.Referenceable;
-import org.apache.commons.lang.StringUtils;
-import org.apache.falcon.atlas.Util.EventUtil;
-import org.apache.falcon.atlas.event.FalconEvent;
-import org.apache.falcon.atlas.publisher.FalconEventPublisher;
-import org.apache.falcon.entity.CatalogStorage;
-import org.apache.falcon.entity.FeedHelper;
+import org.apache.atlas.falcon.event.FalconEvent;
+import org.apache.atlas.falcon.publisher.FalconEventPublisher;
 import org.apache.falcon.entity.store.ConfigurationStore;
-import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.entity.v0.feed.CatalogTable;
 import org.apache.falcon.entity.v0.feed.Feed;
-import org.apache.falcon.entity.v0.process.Cluster;
-import org.apache.falcon.entity.v0.process.Input;
-import org.apache.falcon.entity.v0.process.Output;
 import org.apache.falcon.entity.v0.process.Process;
-import org.apache.falcon.security.CurrentUser;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -86,6 +71,11 @@ public class FalconHook extends AtlasHook implements FalconEventPublisher {
 
     private static ConfigurationStore STORE;
 
+    private enum Operation {
+        ADD,
+        UPDATE
+    }
+
     static {
         try {
             // initialize the async facility to process hook calls. We don't
@@ -115,12 +105,14 @@ public class FalconHook extends AtlasHook implements FalconEventPublisher {
             });
 
             STORE = ConfigurationStore.get();
+
+            Injector injector = Guice.createInjector(new NotificationModule());
+            notifInterface = injector.getInstance(NotificationInterface.class);
+
         } catch (Exception e) {
-            LOG.info("Caught exception initializing the falcon hook.", e);
+            LOG.error("Caught exception initializing the falcon hook.", e);
         }
 
-        Injector injector = Guice.createInjector(new NotificationModule());
-        notifInterface = injector.getInstance(NotificationInterface.class);
 
         LOG.info("Created Atlas Hook for Falcon");
     }
@@ -128,166 +120,92 @@ public class FalconHook extends AtlasHook implements FalconEventPublisher {
     @Override
     public void publish(final Data data) throws Exception {
         final FalconEvent event = data.getEvent();
-        if (sync) {
-            fireAndForget(event);
-        } else {
-            executor.submit(new Runnable() {
-                @Override
-                public void run() {
-                    try {
-                        fireAndForget(event);
-                    } catch (Throwable e) {
-                        LOG.info("Atlas hook failed", e);
+        try {
+            if (sync) {
+                fireAndForget(event);
+            } else {
+                executor.submit(new Runnable() {
+                    @Override
+                    public void run() {
+                        try {
+                            fireAndForget(event);
+                        } catch (Throwable e) {
+                            LOG.error("Atlas hook failed", e);
+                        }
                     }
-                }
-            });
+                });
+            }
+        } catch (Throwable t) {
+            LOG.warn("Error in processing data {}", data, t);
         }
     }
 
+    @Override
+    protected String getNumberOfRetriesPropertyKey() {
+        return HOOK_NUM_RETRIES;
+    }
+
     private void fireAndForget(FalconEvent event) throws Exception {
         LOG.info("Entered Atlas hook for Falcon hook operation {}", event.getOperation());
+        List<HookNotification.HookNotificationMessage> messages = new ArrayList<>();
 
-        notifyEntities(getAuthenticatedUser(), createEntities(event));
-    }
+        Operation op = getOperation(event.getOperation());
+        String user = getUser(event.getUser());
+        LOG.info("fireAndForget user:{}, ugi: {}", user, event.getUgi());
+        switch (op) {
+        case ADD:
+            messages.add(new HookNotification.EntityCreateRequest(user, createEntities(event, user)));
+            break;
 
-    private String getAuthenticatedUser() {
-        String user = null;
-        try {
-            user = CurrentUser.getAuthenticatedUser();
-        } catch (IllegalArgumentException e) {
-            LOG.warn("Failed to get user from CurrentUser.getAuthenticatedUser");
         }
-        return getUser(user, null);
+        notifyEntities(messages);
     }
 
-    private List<Referenceable> createEntities(FalconEvent event) throws Exception {
-        switch (event.getOperation()) {
-        case ADD_PROCESS:
-            return createProcessInstance((Process) event.getEntity(), event.getUser(), event.getTimestamp());
-        }
-
-        return null;
-    }
-
-    /**
-    + * Creates process entity
-    + *
-    + * @param event process entity event
-    + * @return process instance reference
-    + */
-    public List<Referenceable> createProcessInstance(Process process, String user, long timestamp) throws Exception {
-        LOG.info("Creating process Instance : {}", process.getName());
-
-        // The requirement is for each cluster, create a process entity with name
-        // clustername.processname
+    private List<Referenceable> createEntities(FalconEvent event, String user) throws Exception {
         List<Referenceable> entities = new ArrayList<>();
-        if (process.getClusters() != null) {
-
-            for (Cluster processCluster : process.getClusters().getClusters()) {
-                org.apache.falcon.entity.v0.cluster.Cluster cluster = STORE.get(EntityType.CLUSTER, processCluster.getName());
-
-                List<Referenceable> inputs = new ArrayList<>();
-                if (process.getInputs() != null) {
-                    for (Input input : process.getInputs().getInputs()) {
-                        List<Referenceable> clusterInputs = getInputOutputEntity(cluster, input.getFeed());
-                        if (clusterInputs != null) {
-                            entities.addAll(clusterInputs);
-                            inputs.add(clusterInputs.get(clusterInputs.size() - 1));
-                        }
-                    }
-                }
-
-                List<Referenceable> outputs = new ArrayList<>();
-                if (process.getOutputs() != null) {
-                    for (Output output : process.getOutputs().getOutputs()) {
-                        List<Referenceable> clusterOutputs = getInputOutputEntity(cluster, output.getFeed());
-                        if (clusterOutputs != null) {
-                            entities.addAll(clusterOutputs);
-                            outputs.add(clusterOutputs.get(clusterOutputs.size() - 1));
-                        }
-                    }
-                }
-
-                if (!inputs.isEmpty() || !outputs.isEmpty()) {
-                    Referenceable processEntity = new Referenceable(FalconDataTypes.FALCON_PROCESS_ENTITY.getName());
-                    processEntity.set(FalconDataModelGenerator.NAME, String.format("%s", process.getName(),
-                            cluster.getName()));
-                    processEntity.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, String.format("%s@%s", process.getName(),
-                            cluster.getName()));
-                    processEntity.set(FalconDataModelGenerator.TIMESTAMP, timestamp);
-                    if (!inputs.isEmpty()) {
-                        processEntity.set(FalconDataModelGenerator.INPUTS, inputs);
-                    }
-                    if (!outputs.isEmpty()) {
-                        processEntity.set(FalconDataModelGenerator.OUTPUTS, outputs);
-                    }
-                    processEntity.set(FalconDataModelGenerator.USER, user);
-
-                    if (StringUtils.isNotEmpty(process.getTags())) {
-                        processEntity.set(FalconDataModelGenerator.TAGS,
-                                EventUtil.convertKeyValueStringToMap(process.getTags()));
-                    }
-                    entities.add(processEntity);
-                }
-
-            }
+        switch (event.getOperation()) {
+        case ADD_CLUSTER:
+            entities.add(FalconBridge
+                    .createClusterEntity((org.apache.falcon.entity.v0.cluster.Cluster) event.getEntity(), user,
+                            event.getTimestamp()));
+            break;
+
+        case ADD_PROCESS:
+            entities.addAll(FalconBridge.createProcessEntity((Process) event.getEntity(), STORE,
+                    user, event.getTimestamp()));
+            break;
+
+        case ADD_FEED:
+            entities.addAll(FalconBridge.createFeedCreationEntity((Feed) event.getEntity(), STORE,
+                    user, event.getTimestamp()));
+            break;
+
+        case UPDATE_CLUSTER:
+        case UPDATE_FEED:
+        case UPDATE_PROCESS:
+        default:
+            LOG.info("Falcon operation {} is not valid or supported", event.getOperation());
         }
 
         return entities;
     }
 
-    private List<Referenceable> getInputOutputEntity(org.apache.falcon.entity.v0.cluster.Cluster cluster, String feedName)
-            throws Exception {
-        Feed feed = STORE.get(EntityType.FEED, feedName);
-        org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(feed, cluster.getName());
+    private static Operation getOperation(final FalconEvent.OPERATION op) throws Exception {
+        switch (op) {
+        case ADD_CLUSTER:
+        case ADD_FEED:
+        case ADD_PROCESS:
+            return Operation.ADD;
 
-        final CatalogTable table = getTable(feedCluster, feed);
-        if (table != null) {
-            CatalogStorage storage = new CatalogStorage(cluster, table);
-            return createHiveTableInstance(cluster.getName(), storage.getDatabase().toLowerCase(),
-                    storage.getTable().toLowerCase());
-        }
-
-        return null;
-    }
+        case UPDATE_CLUSTER:
+        case UPDATE_FEED:
+        case UPDATE_PROCESS:
+            return Operation.UPDATE;
 
-    private CatalogTable getTable(org.apache.falcon.entity.v0.feed.Cluster cluster, Feed feed) {
-        // check if table is overridden in cluster
-        if (cluster.getTable() != null) {
-            return cluster.getTable();
+        default:
+            throw new Exception("Falcon operation " + op + " is not valid or supported");
         }
-
-        return feed.getTable();
-    }
-
-    private Referenceable createHiveDatabaseInstance(String clusterName, String dbName)
-            throws Exception {
-        Referenceable dbRef = new Referenceable(HiveDataTypes.HIVE_DB.getName());
-        dbRef.set(AtlasConstants.CLUSTER_NAME_ATTRIBUTE, clusterName);
-        dbRef.set(HiveDataModelGenerator.NAME, dbName);
-        dbRef.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
-                HiveMetaStoreBridge.getDBQualifiedName(clusterName, dbName));
-        return dbRef;
-    }
-
-    private List<Referenceable> createHiveTableInstance(String clusterName, String dbName, String tableName) throws Exception {
-        List<Referenceable> entities = new ArrayList<>();
-        Referenceable dbRef = createHiveDatabaseInstance(clusterName, dbName);
-        entities.add(dbRef);
-
-        Referenceable tableRef = new Referenceable(HiveDataTypes.HIVE_TABLE.getName());
-        tableRef.set(HiveDataModelGenerator.NAME,
-                tableName);
-        tableRef.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
-                HiveMetaStoreBridge.getTableQualifiedName(clusterName, dbName, tableName));
-        tableRef.set(HiveDataModelGenerator.DB, dbRef);
-        entities.add(tableRef);
-
-        return entities;
-    }
-
-    @Override
-    protected String getNumberOfRetriesPropertyKey() {
-        return HOOK_NUM_RETRIES;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/e30ab3d8/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/model/FalconDataModelGenerator.java
----------------------------------------------------------------------
diff --git a/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/model/FalconDataModelGenerator.java b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/model/FalconDataModelGenerator.java
index 397dea4..81cd5e0 100644
--- a/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/model/FalconDataModelGenerator.java
+++ b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/model/FalconDataModelGenerator.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -20,7 +20,6 @@ package org.apache.atlas.falcon.model; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; - import org.apache.atlas.AtlasClient; import org.apache.atlas.AtlasException; import org.apache.atlas.addons.ModelDefinitionDump; @@ -53,48 +52,46 @@ public class FalconDataModelGenerator { private static final Logger LOG = LoggerFactory.getLogger(FalconDataModelGenerator.class); private final Map<String, HierarchicalTypeDefinition<ClassType>> classTypeDefinitions; - private final Map<String, EnumTypeDefinition> enumTypeDefinitionMap; - private final Map<String, StructTypeDefinition> structTypeDefinitionMap; public static final String NAME = "name"; public static final String TIMESTAMP = "timestamp"; - public static final String USER = "owned-by"; - public static final String TAGS = "tag-classification"; + public static final String COLO = "colo"; + public static final String USER = "owner"; + public static final String TAGS = "tags"; + public static final String GROUPS = "groups"; + public static final String PIPELINES = "pipelines"; + public static final String WFPROPERTIES = "workflow-properties"; + public static final String RUNSON = "runs-on"; + public static final String STOREDIN = "stored-in"; // multiple inputs and outputs for process public static final String INPUTS = "inputs"; public static final String OUTPUTS = "outputs"; - public FalconDataModelGenerator() { classTypeDefinitions = new HashMap<>(); - enumTypeDefinitionMap = new HashMap<>(); - structTypeDefinitionMap = new HashMap<>(); } public void createDataModel() throws AtlasException { LOG.info("Generating the Falcon Data Model"); - createProcessEntityClass(); + // classes + createClusterEntityClass(); + createProcessEntityClass(); + createFeedEntityClass(); + createFeedDatasetClass(); + createReplicationFeedEntityClass(); } private TypesDef getTypesDef() { - return TypesUtil.getTypesDef(getEnumTypeDefinitions(), getStructTypeDefinitions(), 
getTraitTypeDefinitions(), - getClassTypeDefinitions()); + return TypesUtil.getTypesDef(ImmutableList.<EnumTypeDefinition>of(), ImmutableList.<StructTypeDefinition>of(), + getTraitTypeDefinitions(), getClassTypeDefinitions()); } public String getDataModelAsJSON() { return TypesSerialization.toJson(getTypesDef()); } - private ImmutableList<EnumTypeDefinition> getEnumTypeDefinitions() { - return ImmutableList.copyOf(enumTypeDefinitionMap.values()); - } - - private ImmutableList<StructTypeDefinition> getStructTypeDefinitions() { - return ImmutableList.copyOf(structTypeDefinitionMap.values()); - } - private ImmutableList<HierarchicalTypeDefinition<ClassType>> getClassTypeDefinitions() { return ImmutableList.copyOf(classTypeDefinitions.values()); } @@ -103,24 +100,103 @@ public class FalconDataModelGenerator { return ImmutableList.of(); } + private void createClusterEntityClass() throws AtlasException { + AttributeDefinition[] attributeDefinitions = new AttributeDefinition[]{ + new AttributeDefinition(TIMESTAMP, DataTypes.DATE_TYPE.getName(), Multiplicity.OPTIONAL, false, + null), + new AttributeDefinition(COLO, DataTypes.STRING_TYPE.getName(), Multiplicity.REQUIRED, false, + null), + new AttributeDefinition(USER, DataTypes.STRING_TYPE.getName(), Multiplicity.OPTIONAL, false, + null), + // map of tags + new AttributeDefinition(TAGS, + DataTypes.mapTypeName(DataTypes.STRING_TYPE.getName(), DataTypes.STRING_TYPE.getName()), + Multiplicity.OPTIONAL, false, null),}; - private void createProcessEntityClass() throws AtlasException { + HierarchicalTypeDefinition<ClassType> definition = + new HierarchicalTypeDefinition<>(ClassType.class, FalconDataTypes.FALCON_CLUSTER.getName(), null, + ImmutableSet.of(AtlasClient.INFRASTRUCTURE_SUPER_TYPE), attributeDefinitions); + classTypeDefinitions.put(FalconDataTypes.FALCON_CLUSTER.getName(), definition); + LOG.debug("Created definition for {}", FalconDataTypes.FALCON_CLUSTER.getName()); + } + + private void createFeedEntityClass() throws 
AtlasException { AttributeDefinition[] attributeDefinitions = new AttributeDefinition[]{ - new AttributeDefinition(TIMESTAMP, DataTypes.LONG_TYPE.getName(), Multiplicity.REQUIRED, false, + new AttributeDefinition(TIMESTAMP, DataTypes.DATE_TYPE.getName(), Multiplicity.REQUIRED, false, null), + new AttributeDefinition(STOREDIN, FalconDataTypes.FALCON_CLUSTER.getName(), Multiplicity.REQUIRED, + false, null), new AttributeDefinition(USER, DataTypes.STRING_TYPE.getName(), Multiplicity.REQUIRED, false, + null)}; + + HierarchicalTypeDefinition<ClassType> definition = + new HierarchicalTypeDefinition<>(ClassType.class, FalconDataTypes.FALCON_FEED_CREATION.getName(), null, + ImmutableSet.of(AtlasClient.PROCESS_SUPER_TYPE), attributeDefinitions); + classTypeDefinitions.put(FalconDataTypes.FALCON_FEED_CREATION.getName(), definition); + LOG.debug("Created definition for {}", FalconDataTypes.FALCON_FEED_CREATION.getName()); + } + + private void createFeedDatasetClass() throws AtlasException { + AttributeDefinition[] attributeDefinitions = new AttributeDefinition[]{ + new AttributeDefinition(TIMESTAMP, DataTypes.DATE_TYPE.getName(), Multiplicity.OPTIONAL, false, + null), + new AttributeDefinition(STOREDIN, FalconDataTypes.FALCON_CLUSTER.getName(), Multiplicity.REQUIRED, + false, null), + new AttributeDefinition(USER, DataTypes.STRING_TYPE.getName(), Multiplicity.OPTIONAL, false, null), + new AttributeDefinition(GROUPS, DataTypes.STRING_TYPE.getName(), Multiplicity.OPTIONAL, false, null), // map of tags - new AttributeDefinition(TAGS, DataTypes.mapTypeName(DataTypes.STRING_TYPE.getName(), DataTypes.STRING_TYPE.getName()), + new AttributeDefinition(TAGS, + DataTypes.mapTypeName(DataTypes.STRING_TYPE.getName(), DataTypes.STRING_TYPE.getName()), Multiplicity.OPTIONAL, false, null),}; HierarchicalTypeDefinition<ClassType> definition = - new HierarchicalTypeDefinition<>(ClassType.class, FalconDataTypes.FALCON_PROCESS_ENTITY.getName(), null, - 
ImmutableSet.of(AtlasClient.PROCESS_SUPER_TYPE), attributeDefinitions); - classTypeDefinitions.put(FalconDataTypes.FALCON_PROCESS_ENTITY.getName(), definition); - LOG.debug("Created definition for {}", FalconDataTypes.FALCON_PROCESS_ENTITY.getName()); + new HierarchicalTypeDefinition<>(ClassType.class, FalconDataTypes.FALCON_FEED.getName(), null, + ImmutableSet.of(AtlasClient.DATA_SET_SUPER_TYPE), attributeDefinitions); + classTypeDefinitions.put(FalconDataTypes.FALCON_FEED.getName(), definition); + LOG.debug("Created definition for {}", FalconDataTypes.FALCON_FEED.getName()); + } + + + private void createReplicationFeedEntityClass() throws AtlasException { + AttributeDefinition[] attributeDefinitions = new AttributeDefinition[]{ + new AttributeDefinition(TIMESTAMP, DataTypes.DATE_TYPE.getName(), Multiplicity.REQUIRED, false, + null), + new AttributeDefinition(USER, DataTypes.STRING_TYPE.getName(), Multiplicity.REQUIRED, false, + null)}; + + HierarchicalTypeDefinition<ClassType> definition = + new HierarchicalTypeDefinition<>(ClassType.class, + FalconDataTypes.FALCON_FEED_REPLICATION.getName(), null, + ImmutableSet.of(AtlasClient.PROCESS_SUPER_TYPE), attributeDefinitions); + classTypeDefinitions.put(FalconDataTypes.FALCON_FEED_REPLICATION.getName(), definition); + LOG.debug("Created definition for {}", FalconDataTypes.FALCON_FEED_REPLICATION.getName()); } + private void createProcessEntityClass() throws AtlasException { + AttributeDefinition[] attributeDefinitions = new AttributeDefinition[]{ + new AttributeDefinition(TIMESTAMP, DataTypes.DATE_TYPE.getName(), Multiplicity.REQUIRED, false, + null), + new AttributeDefinition(RUNSON, FalconDataTypes.FALCON_CLUSTER.getName(), Multiplicity.REQUIRED, + false, null), + new AttributeDefinition(USER, DataTypes.STRING_TYPE.getName(), Multiplicity.REQUIRED, false, + null), + // map of tags + new AttributeDefinition(TAGS, + DataTypes.mapTypeName(DataTypes.STRING_TYPE.getName(), DataTypes.STRING_TYPE.getName()), + 
Multiplicity.OPTIONAL, false, null), + new AttributeDefinition(PIPELINES, DataTypes.STRING_TYPE.getName(), Multiplicity.OPTIONAL, false, null), + // wf properties + new AttributeDefinition(WFPROPERTIES, + DataTypes.mapTypeName(DataTypes.STRING_TYPE.getName(), DataTypes.STRING_TYPE.getName()), + Multiplicity.OPTIONAL, false, null),}; + + HierarchicalTypeDefinition<ClassType> definition = + new HierarchicalTypeDefinition<>(ClassType.class, FalconDataTypes.FALCON_PROCESS.getName(), null, + ImmutableSet.of(AtlasClient.PROCESS_SUPER_TYPE), attributeDefinitions); + classTypeDefinitions.put(FalconDataTypes.FALCON_PROCESS.getName(), definition); + LOG.debug("Created definition for {}", FalconDataTypes.FALCON_PROCESS.getName()); + } public String getModelAsJson() throws AtlasException { @@ -145,11 +221,13 @@ public class FalconDataModelGenerator { Arrays.toString(enumType.enumValues))); } for (StructTypeDefinition structType : typesDef.structTypesAsJavaList()) { - System.out.println(String.format("%s(%s) - attributes %s", structType.typeName, StructType.class.getSimpleName(), - Arrays.toString(structType.attributeDefinitions))); + System.out.println( + String.format("%s(%s) - attributes %s", structType.typeName, StructType.class.getSimpleName(), + Arrays.toString(structType.attributeDefinitions))); } for (HierarchicalTypeDefinition<ClassType> classType : typesDef.classTypesAsJavaList()) { - System.out.println(String.format("%s(%s) - super types [%s] - attributes %s", classType.typeName, ClassType.class.getSimpleName(), + System.out.println(String.format("%s(%s) - super types [%s] - attributes %s", classType.typeName, + ClassType.class.getSimpleName(), StringUtils.join(classType.superTypes, ","), Arrays.toString(classType.attributeDefinitions))); } for (HierarchicalTypeDefinition<TraitType> traitType : typesDef.traitTypesAsJavaList()) { 
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/e30ab3d8/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/model/FalconDataTypes.java ---------------------------------------------------------------------- diff --git a/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/model/FalconDataTypes.java b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/model/FalconDataTypes.java index f1f350b..e36ff23 100644 --- a/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/model/FalconDataTypes.java +++ b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/model/FalconDataTypes.java @@ -22,19 +22,15 @@ package org.apache.atlas.falcon.model; * Falcon Data Types for model and bridge. */ public enum FalconDataTypes { - - - FALCON_PROCESS_ENTITY("falcon_process"), - ; - - private final String name; - - FalconDataTypes(java.lang.String name) { - this.name = name; - } + // Classes + FALCON_CLUSTER, + FALCON_FEED_CREATION, + FALCON_FEED, + FALCON_FEED_REPLICATION, + FALCON_PROCESS; public String getName() { - return name; + return name().toLowerCase(); } } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/e30ab3d8/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/publisher/FalconEventPublisher.java ---------------------------------------------------------------------- diff --git a/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/publisher/FalconEventPublisher.java b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/publisher/FalconEventPublisher.java new file mode 100644 index 0000000..ea81226 --- /dev/null +++ b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/publisher/FalconEventPublisher.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.falcon.publisher; + + +import org.apache.atlas.falcon.event.FalconEvent; + +/** + * Falcon publisher for Atlas + */ +public interface FalconEventPublisher { + class Data { + private FalconEvent event; + + public Data(FalconEvent event) { + this.event = event; + } + + public FalconEvent getEvent() { + return event; + } + } + + void publish(final Data data) throws Exception; +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/e30ab3d8/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/service/AtlasService.java ---------------------------------------------------------------------- diff --git a/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/service/AtlasService.java b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/service/AtlasService.java new file mode 100644 index 0000000..c92bd43 --- /dev/null +++ b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/service/AtlasService.java @@ -0,0 +1,141 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.falcon.service; + +import org.apache.atlas.falcon.Util.EventUtil; +import org.apache.atlas.falcon.event.FalconEvent; +import org.apache.atlas.falcon.hook.FalconHook; +import org.apache.atlas.falcon.publisher.FalconEventPublisher; +import org.apache.falcon.FalconException; +import org.apache.falcon.entity.store.ConfigurationStore; +import org.apache.falcon.entity.v0.Entity; +import org.apache.falcon.entity.v0.EntityType; +import org.apache.falcon.service.ConfigurationChangeListener; +import org.apache.falcon.service.FalconService; +import org.apache.hadoop.security.UserGroupInformation; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Atlas service to publish Falcon events + */ +public class AtlasService implements FalconService, ConfigurationChangeListener { + + private static final Logger LOG = LoggerFactory.getLogger(AtlasService.class); + private FalconEventPublisher publisher; + + /** + * Constant for the service name. 
+ */ + public static final String SERVICE_NAME = AtlasService.class.getSimpleName(); + + @Override + public String getName() { + return SERVICE_NAME; + } + + @Override + public void init() throws FalconException { + ConfigurationStore.get().registerListener(this); + publisher = new FalconHook(); + } + + @Override + public void destroy() throws FalconException { + ConfigurationStore.get().unregisterListener(this); + } + + @Override + public void onAdd(Entity entity) throws FalconException { + try { + EntityType entityType = entity.getEntityType(); + switch (entityType) { + case CLUSTER: + addEntity(entity, FalconEvent.OPERATION.ADD_CLUSTER); + break; + + case PROCESS: + addEntity(entity, FalconEvent.OPERATION.ADD_PROCESS); + break; + + case FEED: + addEntity(entity, FalconEvent.OPERATION.ADD_FEED); + break; + + default: + LOG.debug("Entity type not processed {}", entityType); + } + } catch(Throwable t) { + LOG.warn("Error handling entity {}", entity, t); + } + } + + @Override + public void onRemove(Entity entity) throws FalconException { + } + + @Override + public void onChange(Entity oldEntity, Entity newEntity) throws FalconException { + /** + * Skipping update for now - update uses full update currently and this might result in all attributes wiped for hive entities + EntityType entityType = newEntity.getEntityType(); + switch (entityType) { + case CLUSTER: + addEntity(newEntity, FalconEvent.OPERATION.UPDATE_CLUSTER); + break; + + case PROCESS: + addEntity(newEntity, FalconEvent.OPERATION.UPDATE_PROCESS); + break; + + case FEED: + FalconEvent.OPERATION operation = isReplicationFeed((Feed) newEntity) ? 
+ FalconEvent.OPERATION.UPDATE_REPLICATION_FEED : + FalconEvent.OPERATION.UPDATE_FEED; + addEntity(newEntity, operation); + break; + + default: + LOG.debug("Entity type not processed {}", entityType); + } + **/ + } + + @Override + public void onReload(Entity entity) throws FalconException { + //Since there is no import script that can import existing falcon entities to atlas, adding on falcon service start + onAdd(entity); + } + + private void addEntity(Entity entity, FalconEvent.OPERATION operation) throws FalconException { + LOG.info("Adding {} entity to Atlas: {}", entity.getEntityType().name(), entity.getName()); + + try { + String user = entity.getACL() != null ? entity.getACL().getOwner() : + UserGroupInformation.getLoginUser().getShortUserName(); + FalconEvent event = + new FalconEvent(user, EventUtil.getUgi(), operation, System.currentTimeMillis(), entity); + FalconEventPublisher.Data data = new FalconEventPublisher.Data(event); + publisher.publish(data); + } catch (Exception ex) { + throw new FalconException("Unable to publish data to publisher " + ex.getMessage(), ex); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/e30ab3d8/addons/falcon-bridge/src/main/java/org/apache/falcon/atlas/Util/EventUtil.java ---------------------------------------------------------------------- diff --git a/addons/falcon-bridge/src/main/java/org/apache/falcon/atlas/Util/EventUtil.java b/addons/falcon-bridge/src/main/java/org/apache/falcon/atlas/Util/EventUtil.java deleted file mode 100644 index 7f67407..0000000 --- a/addons/falcon-bridge/src/main/java/org/apache/falcon/atlas/Util/EventUtil.java +++ /dev/null @@ -1,68 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.falcon.atlas.Util; - -import org.apache.commons.lang3.StringUtils; -import org.apache.falcon.FalconException; -import org.apache.falcon.security.CurrentUser; -import org.apache.hadoop.security.UserGroupInformation; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.HashMap; -import java.util.Map; - -/** - * Falcon event util - */ -public final class EventUtil { - private static final Logger LOG = LoggerFactory.getLogger(EventUtil.class); - - private EventUtil() {} - - - public static Map<String, String> convertKeyValueStringToMap(final String keyValueString) { - if (StringUtils.isBlank(keyValueString)) { - return null; - } - - Map<String, String> keyValueMap = new HashMap<>(); - - String[] tags = keyValueString.split(","); - for (String tag : tags) { - int index = tag.indexOf("="); - String tagKey = tag.substring(0, index); - String tagValue = tag.substring(index + 1, tag.length()); - keyValueMap.put(tagKey, tagValue); - } - return keyValueMap; - } - - - public static UserGroupInformation getUgi() throws FalconException { - UserGroupInformation ugi; - try { - ugi = CurrentUser.getAuthenticatedUGI(); - } catch (IOException ioe) { - throw new FalconException(ioe); - } - return ugi; - } -} 
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/e30ab3d8/addons/falcon-bridge/src/main/java/org/apache/falcon/atlas/event/FalconEvent.java ---------------------------------------------------------------------- diff --git a/addons/falcon-bridge/src/main/java/org/apache/falcon/atlas/event/FalconEvent.java b/addons/falcon-bridge/src/main/java/org/apache/falcon/atlas/event/FalconEvent.java deleted file mode 100644 index e587e73..0000000 --- a/addons/falcon-bridge/src/main/java/org/apache/falcon/atlas/event/FalconEvent.java +++ /dev/null @@ -1,65 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.falcon.atlas.event; - -import org.apache.falcon.entity.v0.Entity; -import org.apache.hadoop.security.UserGroupInformation; - -/** - * Falcon event to interface with Atlas Service. 
- */ -public class FalconEvent { - protected String user; - protected UserGroupInformation ugi; - protected OPERATION operation; - protected long timestamp; - protected Entity entity; - - public FalconEvent(String doAsUser, UserGroupInformation ugi, OPERATION falconOperation, long timestamp, Entity entity) { - this.user = doAsUser; - this.ugi = ugi; - this.operation = falconOperation; - this.timestamp = timestamp; - this.entity = entity; - } - - public enum OPERATION { - ADD_PROCESS, UPDATE_PROCESS - } - - public String getUser() { - return user; - } - - public UserGroupInformation getUgi() { - return ugi; - } - - public OPERATION getOperation() { - return operation; - } - - public long getTimestamp() { - return timestamp; - } - - public Entity getEntity() { - return entity; - } -} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/e30ab3d8/addons/falcon-bridge/src/main/java/org/apache/falcon/atlas/publisher/FalconEventPublisher.java ---------------------------------------------------------------------- diff --git a/addons/falcon-bridge/src/main/java/org/apache/falcon/atlas/publisher/FalconEventPublisher.java b/addons/falcon-bridge/src/main/java/org/apache/falcon/atlas/publisher/FalconEventPublisher.java deleted file mode 100644 index 8029be9..0000000 --- a/addons/falcon-bridge/src/main/java/org/apache/falcon/atlas/publisher/FalconEventPublisher.java +++ /dev/null @@ -1,41 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.falcon.atlas.publisher; - - -import org.apache.falcon.atlas.event.FalconEvent; - -/** - * Falcon publisher for Atlas - */ -public interface FalconEventPublisher { - class Data { - private FalconEvent event; - - public Data(FalconEvent event) { - this.event = event; - } - - public FalconEvent getEvent() { - return event; - } - } - - void publish(final Data data) throws Exception; -} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/e30ab3d8/addons/falcon-bridge/src/main/java/org/apache/falcon/atlas/service/AtlasService.java ---------------------------------------------------------------------- diff --git a/addons/falcon-bridge/src/main/java/org/apache/falcon/atlas/service/AtlasService.java b/addons/falcon-bridge/src/main/java/org/apache/falcon/atlas/service/AtlasService.java deleted file mode 100644 index 373846d..0000000 --- a/addons/falcon-bridge/src/main/java/org/apache/falcon/atlas/service/AtlasService.java +++ /dev/null @@ -1,115 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.falcon.atlas.service; - -import org.apache.atlas.falcon.hook.FalconHook; -import org.apache.falcon.FalconException; -import org.apache.falcon.atlas.Util.EventUtil; -import org.apache.falcon.atlas.event.FalconEvent; -import org.apache.falcon.atlas.publisher.FalconEventPublisher; -import org.apache.falcon.entity.store.ConfigurationStore; -import org.apache.falcon.entity.v0.Entity; -import org.apache.falcon.entity.v0.EntityType; -import org.apache.falcon.entity.v0.process.Process; -import org.apache.falcon.service.ConfigurationChangeListener; -import org.apache.falcon.service.FalconService; -import org.apache.hadoop.security.UserGroupInformation; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Atlas service to publish Falcon events - */ -public class AtlasService implements FalconService, ConfigurationChangeListener { - - private static final Logger LOG = LoggerFactory.getLogger(AtlasService.class); - private FalconEventPublisher publisher; - - /** - * Constant for the service name. 
- */ - public static final String SERVICE_NAME = AtlasService.class.getSimpleName(); - - @Override - public String getName() { - return SERVICE_NAME; - } - - @Override - public void init() throws FalconException { - ConfigurationStore.get().registerListener(this); - publisher = new FalconHook(); - } - - - @Override - public void destroy() throws FalconException { - ConfigurationStore.get().unregisterListener(this); - } - - @Override - public void onAdd(Entity entity) throws FalconException { - EntityType entityType = entity.getEntityType(); - switch (entityType) { - case PROCESS: - addProcessEntity((Process) entity, FalconEvent.OPERATION.ADD_PROCESS); - break; - - default: - LOG.debug("Entity type not processed " + entityType); - } - } - - @Override - public void onRemove(Entity entity) throws FalconException { - } - - @Override - public void onChange(Entity oldEntity, Entity newEntity) throws FalconException { - EntityType entityType = newEntity.getEntityType(); - switch (entityType) { - case PROCESS: - addProcessEntity((Process) newEntity, FalconEvent.OPERATION.UPDATE_PROCESS); - break; - - default: - LOG.debug("Entity type not processed " + entityType); - } - } - - @Override - public void onReload(Entity entity) throws FalconException { - //Since there is no import script that can import existing falcon entities to atlas, adding on falcon service start - onAdd(entity); - } - - private void addProcessEntity(Process entity, FalconEvent.OPERATION operation) throws FalconException { - LOG.info("Adding process entity to Atlas: {}", entity.getName()); - - try { - String user = entity.getACL() != null ? 
entity.getACL().getOwner() : - UserGroupInformation.getLoginUser().getShortUserName(); - FalconEvent event = new FalconEvent(user, EventUtil.getUgi(), operation, System.currentTimeMillis(), entity); - FalconEventPublisher.Data data = new FalconEventPublisher.Data(event); - publisher.publish(data); - } catch (Exception ex) { - throw new FalconException("Unable to publish data to publisher " + ex.getMessage(), ex); - } - } -}
