This is an automated email from the ASF dual-hosted git repository.
mandarambawane pushed a commit to branch atlas-2.5
in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/atlas-2.5 by this push:
new 7a2157921 ATLAS-4977 : storm-bridge, storm-bridge-shim modules: update
for code readability improvement
7a2157921 is described below
commit 7a2157921494e5e03ea3be4fc20f5aa5735e28b7
Author: Mandar Ambawane <[email protected]>
AuthorDate: Fri Feb 27 11:14:18 2026 +0530
ATLAS-4977 : storm-bridge, storm-bridge-shim modules: update for code
readability improvement
---
.../apache/atlas/storm/hook/StormAtlasHook.java | 11 ++---
.../apache/atlas/storm/hook/StormAtlasHook.java | 16 ++++----
.../apache/atlas/storm/hook/StormTopologyUtil.java | 47 +++++++++++++---------
.../apache/atlas/storm/model/StormDataTypes.java | 8 +---
.../apache/atlas/storm/hook/StormAtlasHookIT.java | 14 +++----
.../org/apache/atlas/storm/hook/StormTestUtil.java | 3 +-
6 files changed, 47 insertions(+), 52 deletions(-)
diff --git
a/addons/storm-bridge-shim/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java
b/addons/storm-bridge-shim/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java
index 0ce7633aa..e30a674db 100644
---
a/addons/storm-bridge-shim/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java
+++
b/addons/storm-bridge-shim/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java
@@ -18,7 +18,6 @@
package org.apache.atlas.storm.hook;
-
import org.apache.atlas.plugin.classloader.AtlasPluginClassLoader;
import org.apache.storm.ISubmitterHook;
import org.apache.storm.generated.StormTopology;
@@ -33,14 +32,11 @@ import java.util.Map;
*/
public class StormAtlasHook implements ISubmitterHook {
private static final Logger LOG =
LoggerFactory.getLogger(StormAtlasHook.class);
-
-
private static final String ATLAS_PLUGIN_TYPE = "storm";
private static final String ATLAS_STORM_HOOK_IMPL_CLASSNAME =
"org.apache.atlas.storm.hook.StormAtlasHook";
- private AtlasPluginClassLoader atlasPluginClassLoader = null;
- private ISubmitterHook stormHook = null;
-
+ private AtlasPluginClassLoader atlasPluginClassLoader;
+ private ISubmitterHook stormHook;
public StormAtlasHook() {
this.initialize();
@@ -48,11 +44,10 @@ public class StormAtlasHook implements ISubmitterHook {
@Override
public void notify(TopologyInfo topologyInfo, Map stormConf, StormTopology
stormTopology)
- throws IllegalAccessException {
+ throws IllegalAccessException {
if (LOG.isDebugEnabled()) {
LOG.debug("==> StormAtlasHook.notify({}, {}, {})", topologyInfo,
stormConf, stormTopology);
}
-
try {
activatePluginClassLoader();
stormHook.notify(topologyInfo, stormConf, stormTopology);
diff --git
a/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java
b/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java
index fe44d36cb..6c339e7c1 100644
---
a/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java
+++
b/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java
@@ -18,15 +18,20 @@
package org.apache.atlas.storm.hook;
+import org.apache.atlas.AtlasClient;
+import org.apache.atlas.AtlasConstants;
+import org.apache.atlas.hive.bridge.HiveMetaStoreBridge;
+import org.apache.atlas.hook.AtlasHook;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityExtInfo;
import org.apache.atlas.model.notification.HookNotification;
import
org.apache.atlas.model.notification.HookNotification.EntityCreateRequestV2;
+import org.apache.atlas.storm.model.StormDataTypes;
import org.apache.atlas.type.AtlasTypeUtil;
import org.apache.atlas.utils.HdfsNameServiceResolver;
import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
@@ -37,21 +42,16 @@ import org.apache.storm.generated.SpoutSpec;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.generated.TopologyInfo;
import org.apache.storm.utils.Utils;
-import org.apache.atlas.AtlasClient;
-import org.apache.atlas.AtlasConstants;
-import org.apache.atlas.hive.bridge.HiveMetaStoreBridge;
-import org.apache.atlas.hook.AtlasHook;
-import org.apache.atlas.storm.model.StormDataTypes;
import org.slf4j.Logger;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.Date;
import static org.apache.atlas.repository.Constants.STORM_SOURCE;
@@ -264,7 +264,7 @@ public class StormAtlasHook extends AtlasHook implements
ISubmitterHook {
final String dbName =
config.get("HiveBolt.options.databaseName");
final String tblName =
config.get("HiveBolt.options.tableName");
- if (dbName == null || tblName ==null) {
+ if (dbName == null || tblName == null) {
LOG.error("Hive database or table name not found");
} else {
AtlasEntity dbEntity = new AtlasEntity("hive_db");
diff --git
a/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormTopologyUtil.java
b/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormTopologyUtil.java
index d9fd64f1b..91bd47399 100644
---
a/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormTopologyUtil.java
+++
b/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormTopologyUtil.java
@@ -94,25 +94,26 @@ public final class StormTopologyUtil {
public static Set<String> removeSystemComponents(Set<String> components) {
Set<String> userComponents = new HashSet<>();
for (String component : components) {
- if (!isSystemComponent(component))
+ if (!isSystemComponent(component)) {
userComponents.add(component);
+ }
}
return userComponents;
}
private static final Set<Class> WRAPPER_TYPES = new HashSet<Class>() {{
- add(Boolean.class);
- add(Character.class);
- add(Byte.class);
- add(Short.class);
- add(Integer.class);
- add(Long.class);
- add(Float.class);
- add(Double.class);
- add(Void.class);
- add(String.class);
- }};
+ add(Boolean.class);
+ add(Character.class);
+ add(Byte.class);
+ add(Short.class);
+ add(Integer.class);
+ add(Long.class);
+ add(Float.class);
+ add(Double.class);
+ add(Void.class);
+ add(String.class);
+ }};
public static boolean isWrapperType(Class clazz) {
return WRAPPER_TYPES.contains(clazz);
@@ -161,7 +162,9 @@ public final class StormTopologyUtil {
continue;
} else if (fieldVal.getClass().isPrimitive() ||
isWrapperType(fieldVal.getClass())) {
- if (toString(fieldVal, false).isEmpty()) continue;
+ if (toString(fieldVal, false).isEmpty()) {
+ continue;
+ }
output.put(key, toString(fieldVal, false));
} else if (isMapType(fieldVal.getClass())) {
//TODO: check if it makes more sense to just stick
to json
@@ -181,7 +184,9 @@ public final class StormTopologyUtil {
//TODO check if it makes more sense to just stick
to
// json like structure instead of a flatten output.
Collection collection = (Collection) fieldVal;
- if (collection.size() == 0) continue;
+ if (collection.size() == 0) {
+ continue;
+ }
String outStr = "";
for (Object o : collection) {
outStr += getString(o, false, objectsToSkip) +
",";
@@ -203,7 +208,7 @@ public final class StormTopologyUtil {
}
}
}
- catch (Exception e){
+ catch (Exception e) {
LOG.warn("Exception while constructing topology", e);
}
return output;
@@ -237,12 +242,16 @@ public final class StormTopologyUtil {
}
private static String toString(Object instance, boolean wrapWithQuote) {
- if (instance instanceof String)
- if (wrapWithQuote)
+ if (instance instanceof String) {
+ if (wrapWithQuote) {
return "\"" + instance + "\"";
- else
+ }
+ else {
return instance.toString();
- else
+ }
+ }
+ else {
return instance.toString();
+ }
}
}
diff --git
a/addons/storm-bridge/src/main/java/org/apache/atlas/storm/model/StormDataTypes.java
b/addons/storm-bridge/src/main/java/org/apache/atlas/storm/model/StormDataTypes.java
index 7eb1e3cb8..fd29146a4 100644
---
a/addons/storm-bridge/src/main/java/org/apache/atlas/storm/model/StormDataTypes.java
+++
b/addons/storm-bridge/src/main/java/org/apache/atlas/storm/model/StormDataTypes.java
@@ -15,15 +15,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.atlas.storm.model;
-
-
/**
* Storm Data Types for model and hook.
*/
public enum StormDataTypes {
-
// Topology Classes
STORM_TOPOLOGY, // represents the topology containing the DAG
@@ -34,9 +30,7 @@ public enum StormDataTypes {
// Data Sets
KAFKA_TOPIC, // kafka data set
JMS_TOPIC, // jms data set
- HBASE_TABLE, // hbase table data set
- ;
-
+ HBASE_TABLE; // hbase table data set
public String getName() {
return name().toLowerCase();
}
diff --git
a/addons/storm-bridge/src/test/java/org/apache/atlas/storm/hook/StormAtlasHookIT.java
b/addons/storm-bridge/src/test/java/org/apache/atlas/storm/hook/StormAtlasHookIT.java
index e11e1b8b0..36c19c7a7 100644
---
a/addons/storm-bridge/src/test/java/org/apache/atlas/storm/hook/StormAtlasHookIT.java
+++
b/addons/storm-bridge/src/test/java/org/apache/atlas/storm/hook/StormAtlasHookIT.java
@@ -19,26 +19,25 @@
package org.apache.atlas.storm.hook;
import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.node.ArrayNode;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.hive.bridge.HiveMetaStoreBridge;
-import org.apache.atlas.v1.model.instance.Referenceable;
import org.apache.atlas.storm.model.StormDataTypes;
import org.apache.atlas.utils.AuthenticationUtil;
+import org.apache.atlas.v1.model.instance.Referenceable;
import org.apache.commons.configuration.Configuration;
import org.apache.storm.ILocalCluster;
import org.apache.storm.generated.StormTopology;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
+import static org.testng.Assert.assertNotNull;
+
@Test
public class StormAtlasHookIT {
-
public static final Logger LOG =
LoggerFactory.getLogger(StormAtlasHookIT.class);
private static final String ATLAS_URL = "http://localhost:21000/";
@@ -55,13 +54,12 @@ public class StormAtlasHookIT {
Configuration configuration = ApplicationProperties.get();
if (!AuthenticationUtil.isKerberosAuthenticationEnabled()) {
- atlasClient = new
AtlasClient(configuration.getStringArray(HiveMetaStoreBridge.ATLAS_ENDPOINT),
new String[]{"admin", "admin"});
+ atlasClient = new
AtlasClient(configuration.getStringArray(HiveMetaStoreBridge.ATLAS_ENDPOINT),
new String[] {"admin", "admin"});
} else {
atlasClient = new
AtlasClient(configuration.getStringArray(HiveMetaStoreBridge.ATLAS_ENDPOINT));
}
}
-
@AfterClass
public void tearDown() throws Exception {
LOG.info("Shutting down storm local cluster");
@@ -78,11 +76,11 @@ public class StormAtlasHookIT {
// todo: test if topology metadata is registered in atlas
String guid = getTopologyGUID();
- Assert.assertNotNull(guid);
+ assertNotNull(guid);
LOG.info("GUID is {}", guid);
Referenceable topologyReferenceable = atlasClient.getEntity(guid);
- Assert.assertNotNull(topologyReferenceable);
+ assertNotNull(topologyReferenceable);
}
private String getTopologyGUID() throws Exception {
diff --git
a/addons/storm-bridge/src/test/java/org/apache/atlas/storm/hook/StormTestUtil.java
b/addons/storm-bridge/src/test/java/org/apache/atlas/storm/hook/StormTestUtil.java
index d869f18cd..eb0171084 100644
---
a/addons/storm-bridge/src/test/java/org/apache/atlas/storm/hook/StormTestUtil.java
+++
b/addons/storm-bridge/src/test/java/org/apache/atlas/storm/hook/StormTestUtil.java
@@ -34,13 +34,12 @@ import java.util.HashMap;
* An until to create a test topology.
*/
final class StormTestUtil {
-
private StormTestUtil() {
}
public static ILocalCluster createLocalStormCluster() {
// start a local storm cluster
- HashMap<String,Object> localClusterConf = new HashMap<>();
+ HashMap<String, Object> localClusterConf = new HashMap<>();
localClusterConf.put("nimbus-daemon", true);
return Testing.getLocalCluster(localClusterConf);
}