Repository: incubator-atlas Updated Branches: refs/heads/master 9eafb165a -> a2801f0ea
ATLAS-1089: fix Storm hook to handle cyclic references in topology object Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/a2801f0e Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/a2801f0e Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/a2801f0e Branch: refs/heads/master Commit: a2801f0eaff29a834915f243245e2d28b86a2bc3 Parents: 9eafb16 Author: Madhan Neethiraj <[email protected]> Authored: Wed Aug 3 17:11:56 2016 -0700 Committer: Suma Shivaprasad <[email protected]> Committed: Wed Aug 3 23:11:40 2016 -0700 ---------------------------------------------------------------------- .../apache/atlas/storm/hook/StormAtlasHook.java | 6 +- .../atlas/storm/hook/StormTopologyUtil.java | 129 ++++++++++--------- release-log.txt | 1 + 3 files changed, 73 insertions(+), 63 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/a2801f0e/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java ---------------------------------------------------------------------- diff --git a/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java b/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java index c4b4976..5ed4d99 100644 --- a/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java +++ b/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java @@ -179,7 +179,7 @@ public class StormAtlasHook extends AtlasHook implements ISubmitterHook { private Referenceable createDataSet(String name, String topologyOwner, Serializable instance, Map stormConf, List<Referenceable> dependentEntities) throws IllegalAccessException { - Map<String, String> config = StormTopologyUtil.getFieldValues(instance, true); + Map<String, String> config = StormTopologyUtil.getFieldValues(instance, true, null); String clusterName = null; Referenceable dataSetReferenceable; @@ -298,7 +298,7 @@ public class StormAtlasHook extends AtlasHook implements ISubmitterHook { stormSpout.get_spout_object().get_serialized_java(), Serializable.class); spoutReferenceable.set("driverClass", instance.getClass().getName()); - Map<String, String> flatConfigMap = StormTopologyUtil.getFieldValues(instance, true); + Map<String, String> flatConfigMap = StormTopologyUtil.getFieldValues(instance, true, null); spoutReferenceable.set("conf", flatConfigMap); return spoutReferenceable; @@ -322,7 +322,7 @@ public class StormAtlasHook extends AtlasHook implements ISubmitterHook { stormBolt.get_bolt_object().get_serialized_java(), Serializable.class); boltReferenceable.set("driverClass", instance.getClass().getName()); - Map<String, String> flatConfigMap = StormTopologyUtil.getFieldValues(instance, true); + Map<String, String> flatConfigMap = StormTopologyUtil.getFieldValues(instance, true, null); boltReferenceable.set("conf", flatConfigMap); return boltReferenceable; http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/a2801f0e/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormTopologyUtil.java ---------------------------------------------------------------------- diff --git a/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormTopologyUtil.java b/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormTopologyUtil.java index ce6a175..dc2ae57 100644 --- a/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormTopologyUtil.java +++ b/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormTopologyUtil.java @@ -127,86 +127,95 @@ public final class StormTopologyUtil { } public static Map<String, String> getFieldValues(Object instance, - boolean prependClassName) + boolean prependClassName, + Set<Object> objectsToSkip) throws IllegalAccessException { - Class clazz = instance.getClass(); + if (objectsToSkip == null) { + objectsToSkip = new HashSet<Object>(); + } + Map<String, String> output = new HashMap<>(); - for (Class<?> c = clazz; c != null; c = c.getSuperclass()) { - Field[] fields = c.getDeclaredFields(); - for (Field field : fields) { - if (java.lang.reflect.Modifier.isStatic(field.getModifiers())) { - continue; - } - String key; - if (prependClassName) { - key = String.format("%s.%s", clazz.getSimpleName(), field.getName()); - } else { - key = field.getName(); - } + if (objectsToSkip.add(instance)) { + Class clazz = instance.getClass(); + for (Class<?> c = clazz; c != null; c = c.getSuperclass()) { + Field[] fields = c.getDeclaredFields(); + for (Field field : fields) { + if (java.lang.reflect.Modifier.isStatic(field.getModifiers())) { + continue; + } - boolean accessible = field.isAccessible(); - if (!accessible) { - field.setAccessible(true); - } - Object fieldVal = field.get(instance); - if (fieldVal == null) { - continue; - } else if (fieldVal.getClass().isPrimitive() || - isWrapperType(fieldVal.getClass())) { - if (toString(fieldVal, false).isEmpty()) continue; - output.put(key, toString(fieldVal, false)); - } else if (isMapType(fieldVal.getClass())) { - //TODO: check if it makes more sense to just stick to json - // like structure instead of a flatten output. - Map map = (Map) fieldVal; - for (Object entry : map.entrySet()) { - Object mapKey = ((Map.Entry) entry).getKey(); - Object mapVal = ((Map.Entry) entry).getValue(); - - String keyStr = getString(mapKey, false); - String valStr = getString(mapVal, false); - if ((valStr == null) || (valStr.isEmpty())) { - continue; - } else { - output.put(String.format("%s.%s", key, keyStr), valStr); - } + String key; + if (prependClassName) { + key = String.format("%s.%s", clazz.getSimpleName(), field.getName()); + } else { + key = field.getName(); } - } else if (isCollectionType(fieldVal.getClass())) { - //TODO check if it makes more sense to just stick to - // json like structure instead of a flatten output. - Collection collection = (Collection) fieldVal; - if (collection.size()==0) continue; - String outStr = ""; - for (Object o : collection) { - outStr += getString(o, false) + ","; + + boolean accessible = field.isAccessible(); + if (!accessible) { + field.setAccessible(true); } - if (outStr.length() > 0) { - outStr = outStr.substring(0, outStr.length() - 1); + Object fieldVal = field.get(instance); + if (fieldVal == null) { + continue; + } else if (fieldVal.getClass().isPrimitive() || + isWrapperType(fieldVal.getClass())) { + if (toString(fieldVal, false).isEmpty()) continue; + output.put(key, toString(fieldVal, false)); + } else if (isMapType(fieldVal.getClass())) { + //TODO: check if it makes more sense to just stick to json + // like structure instead of a flatten output. + Map map = (Map) fieldVal; + for (Object entry : map.entrySet()) { + Object mapKey = ((Map.Entry) entry).getKey(); + Object mapVal = ((Map.Entry) entry).getValue(); + + String keyStr = getString(mapKey, false, objectsToSkip); + String valStr = getString(mapVal, false, objectsToSkip); + if ((valStr == null) || (valStr.isEmpty())) { + continue; + } else { + output.put(String.format("%s.%s", key, keyStr), valStr); + } + } + } else if (isCollectionType(fieldVal.getClass())) { + //TODO check if it makes more sense to just stick to + // json like structure instead of a flatten output. + Collection collection = (Collection) fieldVal; + if (collection.size() == 0) continue; + String outStr = ""; + for (Object o : collection) { + outStr += getString(o, false, objectsToSkip) + ","; + } + if (outStr.length() > 0) { + outStr = outStr.substring(0, outStr.length() - 1); + } + output.put(key, String.format("%s", outStr)); + } else { + Map<String, String> nestedFieldValues = getFieldValues(fieldVal, false, objectsToSkip); + for (Map.Entry<String, String> entry : nestedFieldValues.entrySet()) { + output.put(String.format("%s.%s", key, entry.getKey()), entry.getValue()); + } } - output.put(key, String.format("%s", outStr)); - } else { - Map<String, String> nestedFieldValues = getFieldValues(fieldVal, false); - for (Map.Entry<String, String> entry : nestedFieldValues.entrySet()) { - output.put(String.format("%s.%s", key, entry.getKey()), entry.getValue()); + if (!accessible) { + field.setAccessible(false); } } - if (!accessible) { - field.setAccessible(false); - } } } return output; } private static String getString(Object instance, - boolean wrapWithQuote) throws IllegalAccessException { + boolean wrapWithQuote, + Set<Object> objectsToSkip) throws IllegalAccessException { if (instance == null) { return null; } else if (instance.getClass().isPrimitive() || isWrapperType(instance.getClass())) { return toString(instance, wrapWithQuote); } else { - return getString(getFieldValues(instance, false), wrapWithQuote); + return getString(getFieldValues(instance, false, objectsToSkip), wrapWithQuote); } } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/a2801f0e/release-log.txt ---------------------------------------------------------------------- diff --git a/release-log.txt b/release-log.txt index 9424c2d..d620cc6 100644 --- a/release-log.txt +++ b/release-log.txt @@ -6,6 +6,7 @@ INCOMPATIBLE CHANGES: ALL CHANGES: +ATLAS-1089 Storm hook should handle cyclic references in topology object (mneethiraj via sumasai) ATLAS-1086 Build failure in hive-bridge after security fixes in ATLAS-762 (sumasai) ATLAS-1088 Fix /search api to default to fulltext on dsl failure (sumasai) ATLAS-762 Assertion in NegativeSSLAndKerberosTest.testUnsecuredClient needs to be hardened (nixonrodrigues via sumasai)
