Repository: incubator-eagle
Updated Branches:
  refs/heads/master f1cd71e90 -> f6fad2ebe


[EAGLE-580] : fix spoutSpecs load met invalid FieldName(Dot) exception

Author: Zeng, Bryant
Reviewer: ralphsu

This closes #467


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/f6fad2eb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/f6fad2eb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/f6fad2eb

Branch: refs/heads/master
Commit: f6fad2ebe8a384a6d17e40c609041ff77f6a692c
Parents: f1cd71e
Author: mizeng <miz...@ebaysf.com>
Authored: Thu Sep 29 20:38:22 2016 +0800
Committer: Ralph, Su <suliang...@gmail.com>
Committed: Thu Sep 29 10:29:45 2016 -0700

----------------------------------------------------------------------
 .../metadata/impl/MongoMetadataDaoImpl.java     | 89 ++++++++++++++++----
 .../alert/resource/impl/MongoImplTest.java      | 20 +++++
 2 files changed, 94 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f6fad2eb/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/MongoMetadataDaoImpl.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/MongoMetadataDaoImpl.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/MongoMetadataDaoImpl.java
index 2f03bf9..aaf059b 100644
--- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/MongoMetadataDaoImpl.java
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/MongoMetadataDaoImpl.java
@@ -48,10 +48,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 
 /**
  * @since Apr 11, 2016.
@@ -98,7 +95,7 @@ public class MongoMetadataDaoImpl implements IMetadataDao {
 
     private void init() {
         db = client.getDatabase(DB_NAME);
-        IndexOptions io = new 
IndexOptions().background(true).unique(true).name("nameIndex");
+        IndexOptions io = new 
IndexOptions().background(true).name("nameIndex");
         BsonDocument doc = new BsonDocument();
         doc.append("name", new BsonInt32(1));
         cluster = db.getCollection("clusters");
@@ -128,14 +125,14 @@ public class MongoMetadataDaoImpl implements IMetadataDao 
{
 
         // below is for schedule_specs and its splitted collections
         BsonDocument doc1 = new BsonDocument();
-        IndexOptions io1 = new 
IndexOptions().background(true).unique(true).name("versionIndex");
+        IndexOptions io1 = new 
IndexOptions().background(true).name("versionIndex");
         doc1.append("version", new BsonInt32(1));
         scheduleStates = db.getCollection("schedule_specs");
         scheduleStates.createIndex(doc1, io1);
 
         spoutSpecs = db.getCollection("spoutSpecs");
         {
-            IndexOptions ioInternal = new 
IndexOptions().background(true).unique(true).name("topologyIdIndex");
+            IndexOptions ioInternal = new 
IndexOptions().background(true).name("topologyIdIndex");
             BsonDocument docInternal = new BsonDocument();
             docInternal.append("topologyId", new BsonInt32(1));
             spoutSpecs.createIndex(docInternal, ioInternal);
@@ -143,7 +140,7 @@ public class MongoMetadataDaoImpl implements IMetadataDao {
 
         alertSpecs = db.getCollection("alertSpecs");
         {
-            IndexOptions ioInternal = new 
IndexOptions().background(true).unique(true).name("topologyNameIndex");
+            IndexOptions ioInternal = new 
IndexOptions().background(true).name("topologyNameIndex");
             BsonDocument docInternal = new BsonDocument();
             docInternal.append("topologyName", new BsonInt32(1));
             alertSpecs.createIndex(docInternal, ioInternal);
@@ -304,15 +301,57 @@ public class MongoMetadataDaoImpl implements IMetadataDao 
{
 
     private <T> OpResult addOne(MongoCollection<Document> collection, T t) {
         OpResult result = new OpResult();
+        String json = "";
         try {
-            String json = mapper.writeValueAsString(t);
+            json = mapper.writeValueAsString(t);
             collection.insertOne(Document.parse(json));
             result.code = 200;
-            result.message = String.format("add one document to collection %s 
succeed!", collection.getNamespace());
+            result.message = String.format("add one document [%s] to 
collection [%s] succeed!", json, collection.getNamespace());
+            LOG.info(result.message);
         } catch (Exception e) {
             result.code = 400;
             result.message = e.getMessage();
-            LOG.error("", e);
+            LOG.error(String.format("Add one document [%s] to collection [%s] 
failed!", json, collection.getNamespace()), e);
+        }
+        return result;
+    }
+
+    /**
+     * Some field names in SpoutSpec contain a dot (.), which is invalid in a 
Mongo field name, so we need to transform the
+     * format before storing it in Mongo.
+     * @return opresult
+     */
+    private <T> OpResult addOneSpoutSpec(T t) {
+        OpResult result = new OpResult();
+        String json = "";
+        try {
+            json = mapper.writeValueAsString(t);
+            Document doc = Document.parse(json);
+
+            String [] metadataMapArrays = {"kafka2TupleMetadataMap", 
"tuple2StreamMetadataMap", "streamRepartitionMetadataMap"};
+            for (String metadataMapName: metadataMapArrays) {
+                Document _metadataMapDoc = (Document) doc.get(metadataMapName);
+                doc.remove(metadataMapName);
+
+                ArrayList<Document> _metadataMapArray = new ArrayList<>();
+
+                for ( String key : _metadataMapDoc.keySet()) {
+                    Document _subDoc = new Document();
+                    _subDoc.put("topicName", key);
+                    _subDoc.put(metadataMapName, _metadataMapDoc.get(key));
+                    _metadataMapArray.add(_subDoc);
+                }
+                doc.append(metadataMapName, _metadataMapArray);
+            }
+
+            spoutSpecs.insertOne(doc);
+            result.code = 200;
+            result.message = String.format("add one document [%s] to 
collection [%s] succeed!", doc.toJson(), spoutSpecs.getNamespace());
+            LOG.info(result.message);
+        } catch (Exception e) {
+            result.code = 400;
+            result.message = e.getMessage();
+            LOG.error(String.format("Add one document [%s] to collection [%s] 
failed!", json, spoutSpecs.getNamespace()), e);
         }
         return result;
     }
@@ -427,7 +466,26 @@ public class MongoMetadataDaoImpl implements IMetadataDao {
             public void apply(Document document) {
                 String json = document.toJson();
                 try {
-                    maps.put(document.getString(mapKey), 
mapper.readValue(json, clz));
+                    // Some field names in SpoutSpec contain a dot (.), 
which is invalid in a Mongo field name,
+                    // so we need to transform the format back while reading from 
Mongo.
+                    if (clz == SpoutSpec.class) {
+                        Document doc = Document.parse(json);
+                        String [] metadataMapArrays = 
{"kafka2TupleMetadataMap", "tuple2StreamMetadataMap", 
"streamRepartitionMetadataMap"};
+                        for (String metadataMapName: metadataMapArrays) {
+                            ArrayList<Document> subDocs = (ArrayList) 
doc.get(metadataMapName);
+                            doc.remove(metadataMapName);
+
+                            Document replaceDoc = new Document();
+                            for ( Document subDoc : subDocs) {
+                                replaceDoc.put((String) 
subDoc.get("topicName"), subDoc.get(metadataMapName));
+                            }
+                            doc.put(metadataMapName, replaceDoc);
+                        }
+
+                        json = doc.toJson();
+                    }
+                    T t = mapper.readValue(json, clz);
+                    maps.put(document.getString(mapKey), t);
                 } catch (IOException e) {
                     LOG.error("deserialize config item failed!", e);
                 }
@@ -487,7 +545,7 @@ public class MongoMetadataDaoImpl implements IMetadataDao {
         try {
             for (String key : state.getSpoutSpecs().keySet()) {
                 SpoutSpec spoutSpec = state.getSpoutSpecs().get(key);
-                addOne(spoutSpecs, spoutSpec);
+                addOneSpoutSpec(spoutSpec);
             }
 
             for (String key : state.getAlertSpecs().keySet()) {
@@ -522,8 +580,8 @@ public class MongoMetadataDaoImpl implements IMetadataDao {
             }
 
             ScheduleStateBase stateBase = new ScheduleStateBase(
-                state.getVersion(), state.getGenerateTime(), state.getCode(),
-                state.getMessage(), state.getScheduleTimeMillis());
+                    state.getVersion(), state.getGenerateTime(), 
state.getCode(),
+                    state.getMessage(), state.getScheduleTimeMillis());
 
             addOne(scheduleStates, stateBase);
 
@@ -581,4 +639,5 @@ public class MongoMetadataDaoImpl implements IMetadataDao {
     public void close() throws IOException {
         client.close();
     }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f6fad2eb/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/service/alert/resource/impl/MongoImplTest.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/service/alert/resource/impl/MongoImplTest.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/service/alert/resource/impl/MongoImplTest.java
index 63871c3..06f5034 100644
--- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/service/alert/resource/impl/MongoImplTest.java
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/service/alert/resource/impl/MongoImplTest.java
@@ -223,11 +223,31 @@ public class MongoImplTest {
         SpoutSpec spoutSpec1 = new SpoutSpec();
         String topologyId1 = "testUnitTopology1_" + timestamp;
         spoutSpec1.setTopologyId(topologyId1);
+
+        Map<String, Kafka2TupleMetadata> kafka2TupleMetadataMap = new 
HashMap<>();
+        Kafka2TupleMetadata kafka2TupleMetadata = new Kafka2TupleMetadata();
+        kafka2TupleMetadata.setType("KAFKA");
+        kafka2TupleMetadataMap.put("preprocess.network-sherlock.events", 
kafka2TupleMetadata);
+        spoutSpec1.setKafka2TupleMetadataMap(kafka2TupleMetadataMap);
+
+        Map<String, List<StreamRepartitionMetadata>> 
streamRepartitionMetadataMap= new HashMap<>();
+        List<StreamRepartitionMetadata> StreamRepartitionMetadataList = new 
ArrayList<>();
+        StreamRepartitionMetadata streamRepartitionMetadata = new 
StreamRepartitionMetadata();
+        List<StreamRepartitionStrategy> groupingStrategies = new ArrayList();
+        StreamRepartitionStrategy streamRepartitionStrategy = new 
StreamRepartitionStrategy();
+        streamRepartitionStrategy.setStartSequence(4);
+        groupingStrategies.add(streamRepartitionStrategy);
+        streamRepartitionMetadata.setGroupingStrategies(groupingStrategies);
+        StreamRepartitionMetadataList.add(streamRepartitionMetadata);
+        
streamRepartitionMetadataMap.put("preprocess.network-nervecenter.events", 
StreamRepartitionMetadataList);
+        
spoutSpec1.setStreamRepartitionMetadataMap(streamRepartitionMetadataMap);
         spoutSpecsMap.put(topologyId1, spoutSpec1);
 
         SpoutSpec spoutSpec2 = new SpoutSpec();
         String topologyId2 = "testUnitTopology2_" + timestamp;
         spoutSpec2.setTopologyId(topologyId2);
+        spoutSpec2.setKafka2TupleMetadataMap(kafka2TupleMetadataMap);
+        
spoutSpec2.setStreamRepartitionMetadataMap(streamRepartitionMetadataMap);
         spoutSpecsMap.put(topologyId2, spoutSpec2);
 
         // Alert Spec

Reply via email to