Add source->stream->fields mapping to multi-lang handshake.

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/427cef54
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/427cef54
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/427cef54

Branch: refs/heads/master
Commit: 427cef54c74204a257bc97a3e8fb793854603aaf
Parents: a1912ea
Author: Dan Blanchard <d...@parsely.com>
Authored: Tue Jul 7 10:25:58 2015 -0400
Committer: Dan Blanchard <d...@parsely.com>
Committed: Tue Jul 7 10:25:58 2015 -0400

----------------------------------------------------------------------
 docs/documentation/Multilang-protocol.md        |  9 ++++++--
 .../backtype/storm/task/TopologyContext.java    | 23 ++++++++++++++++++++
 2 files changed, 30 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/427cef54/docs/documentation/Multilang-protocol.md
----------------------------------------------------------------------
diff --git a/docs/documentation/Multilang-protocol.md b/docs/documentation/Multilang-protocol.md
index 017ad32..2a90059 100644
--- a/docs/documentation/Multilang-protocol.md
+++ b/docs/documentation/Multilang-protocol.md
@@ -66,7 +66,7 @@ The initial handshake is the same for both types of shell components:
             "4": "example-bolt2"
         },
         "taskid": 3,
-        // Everything below this line is only available in Storm 0.11.0+
+        // Everything below this line is only available in Storm 0.10.0+
         "componentid": "example-bolt"
         "stream->target->grouping": {
                "default": {
@@ -82,6 +82,11 @@ The initial handshake is the same for both types of shell components:
                        }
                }
            }
+           "source->stream->fields": {
+               "example-spout": {
+                       "default": ["word"]
+               }
+           }
        }
 }
 ```
@@ -90,7 +95,7 @@ Your script should create an empty file named with its PID in this directory. e.
 the PID is 1234, so an empty file named 1234 is created in the directory. This
 file lets the supervisor know the PID so it can shutdown the process later on.
 
-As of Storm 0.11.0, the context sent by Storm to shell components has been
+As of Storm 0.10.0, the context sent by Storm to shell components has been
 enhanced substantially to include all aspects of the topology context available
 to JVM components.  One key addition is the ability to determine a shell
 component's source and targets (i.e., inputs and outputs) in the topology via

http://git-wip-us.apache.org/repos/asf/storm/blob/427cef54/storm-core/src/jvm/backtype/storm/task/TopologyContext.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/task/TopologyContext.java b/storm-core/src/jvm/backtype/storm/task/TopologyContext.java
index b3c6d22..cefa207 100644
--- a/storm-core/src/jvm/backtype/storm/task/TopologyContext.java
+++ b/storm-core/src/jvm/backtype/storm/task/TopologyContext.java
@@ -191,6 +191,28 @@ public class TopologyContext extends WorkerTopologyContext implements IMetricsCo
     }
 
     /**
+     * Gets the declared input fields for this component.
+     *
+     * @return A map from sources to streams to fields.
+     */
+    public Map<String, Map<String, List<String>>> getThisInputFields() {
+       Map<String, Map<String, List<String>>> outputMap = new HashMap<>();
+        for (Map.Entry<GlobalStreamId, Grouping> entry : this.getThisSources().entrySet()) {
+               String componentId = entry.getKey().get_componentId();
+               Set<String> streams = getComponentStreams(componentId);
+               for (String stream : streams) {
+                       Map<String, List<String>> streamFieldMap = outputMap.get(componentId);
+                       if (streamFieldMap == null) {
+                               streamFieldMap = new HashMap<>();
+                               outputMap.put(componentId, streamFieldMap);
+                       }
+                       streamFieldMap.put(stream, getComponentOutputFields(componentId, stream).toList());
+               }
+        }
+        return outputMap;
+    }
+
+    /**
      * Gets the declared inputs to this component.
      *
      * @return A map from subscribed component/stream to the grouping subscribed with.
@@ -274,6 +296,7 @@ public class TopologyContext extends WorkerTopologyContext implements IMetricsCo
                stringSourceMap.put(gid.get_streamId(), groupingToJSONableMap(entry.getValue()));               
         }
         obj.put("source->stream->grouping", stringSources);
+        obj.put("source->stream->fields", this.getThisInputFields());
         return JSONValue.toJSONString(obj);
     }
 

Reply via email to