http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-examples/src/main/resources/simple_hdfs.yaml ---------------------------------------------------------------------- diff --cc external/flux/flux-examples/src/main/resources/simple_hdfs.yaml index 0000000,0000000..9007869 new file mode 100644 --- /dev/null +++ b/external/flux/flux-examples/src/main/resources/simple_hdfs.yaml @@@ -1,0 -1,0 +1,105 @@@ ++# Licensed to the Apache Software Foundation (ASF) under one ++# or more contributor license agreements. See the NOTICE file ++# distributed with this work for additional information ++# regarding copyright ownership. The ASF licenses this file ++# to you under the Apache License, Version 2.0 (the ++# "License"); you may not use this file except in compliance ++# with the License. You may obtain a copy of the License at ++# ++# http://www.apache.org/licenses/LICENSE-2.0 ++# ++# Unless required by applicable law or agreed to in writing, software ++# distributed under the License is distributed on an "AS IS" BASIS, ++# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++# See the License for the specific language governing permissions and ++# limitations under the License. ++ ++# Test ability to wire together shell spouts/bolts ++--- ++ ++# topology definition ++# name to be used when submitting ++name: "hdfs-topology" ++ ++# Components ++# Components are analogous to Spring beans. They are meant to be used as constructor, ++# property(setter), and builder arguments. 
++# ++# for the time being, components must be declared in the order they are referenced ++components: ++ - id: "syncPolicy" ++ className: "org.apache.storm.hdfs.bolt.sync.CountSyncPolicy" ++ constructorArgs: ++ - 1000 ++ - id: "rotationPolicy" ++ className: "org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy" ++ constructorArgs: ++ - 30 ++ - SECONDS ++ ++ - id: "fileNameFormat" ++ className: "org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat" ++ configMethods: ++ - name: "withPath" ++ args: ["${hdfs.write.dir}"] ++ - name: "withExtension" ++ args: [".txt"] ++ ++ - id: "recordFormat" ++ className: "org.apache.storm.hdfs.bolt.format.DelimitedRecordFormat" ++ configMethods: ++ - name: "withFieldDelimiter" ++ args: ["|"] ++ ++ - id: "rotationAction" ++ className: "org.apache.storm.hdfs.common.rotation.MoveFileAction" ++ configMethods: ++ - name: "toDestination" ++ args: ["${hdfs.dest.dir}"] ++ ++# spout definitions ++spouts: ++ - id: "spout-1" ++ className: "backtype.storm.testing.TestWordSpout" ++ parallelism: 1 ++ # ... ++ ++# bolt definitions ++ ++bolts: ++ - id: "bolt-1" ++ className: "org.apache.storm.hdfs.bolt.HdfsBolt" ++ configMethods: ++ - name: "withConfigKey" ++ args: ["hdfs.config"] ++ - name: "withFsUrl" ++ args: ["${hdfs.url}"] ++ - name: "withFileNameFormat" ++ args: [ref: "fileNameFormat"] ++ - name: "withRecordFormat" ++ args: [ref: "recordFormat"] ++ - name: "withRotationPolicy" ++ args: [ref: "rotationPolicy"] ++ - name: "withSyncPolicy" ++ args: [ref: "syncPolicy"] ++ - name: "addRotationAction" ++ args: [ref: "rotationAction"] ++ parallelism: 1 ++ # ... ++ ++ - id: "bolt-2" ++ className: "org.apache.storm.flux.wrappers.bolts.LogInfoBolt" ++ parallelism: 1 ++ ++streams: ++ - name: "" # name isn't used (placeholder for logging, UI, etc.) ++ from: "spout-1" ++ to: "bolt-1" ++ grouping: ++ type: SHUFFLE ++ ++ - name: "" # name isn't used (placeholder for logging, UI, etc.) ++ from: "spout-1" ++ to: "bolt-2" ++ grouping: ++ type: SHUFFLE
http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-examples/src/main/resources/simple_wordcount.yaml ---------------------------------------------------------------------- diff --cc external/flux/flux-examples/src/main/resources/simple_wordcount.yaml index 0000000,0000000..380f9d2 new file mode 100644 --- /dev/null +++ b/external/flux/flux-examples/src/main/resources/simple_wordcount.yaml @@@ -1,0 -1,0 +1,68 @@@ ++# Licensed to the Apache Software Foundation (ASF) under one ++# or more contributor license agreements. See the NOTICE file ++# distributed with this work for additional information ++# regarding copyright ownership. The ASF licenses this file ++# to you under the Apache License, Version 2.0 (the ++# "License"); you may not use this file except in compliance ++# with the License. You may obtain a copy of the License at ++# ++# http://www.apache.org/licenses/LICENSE-2.0 ++# ++# Unless required by applicable law or agreed to in writing, software ++# distributed under the License is distributed on an "AS IS" BASIS, ++# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++# See the License for the specific language governing permissions and ++# limitations under the License. ++ ++--- ++ ++# topology definition ++# name to be used when submitting ++name: "yaml-topology" ++ ++# topology configuration ++# this will be passed to the submitter as a map of config options ++# ++config: ++ topology.workers: 1 ++ ++# spout definitions ++spouts: ++ - id: "spout-1" ++ className: "backtype.storm.testing.TestWordSpout" ++ parallelism: 1 ++ ++# bolt definitions ++bolts: ++ - id: "bolt-1" ++ className: "backtype.storm.testing.TestWordCounter" ++ parallelism: 1 ++ ++ - id: "bolt-2" ++ className: "org.apache.storm.flux.wrappers.bolts.LogInfoBolt" ++ parallelism: 1 ++ ++#stream definitions ++# stream definitions define connections between spouts and bolts. 
++# note that such connections can be cyclical ++streams: ++ - name: "spout-1 --> bolt-1" # name isn't used (placeholder for logging, UI, etc.) ++# id: "connection-1" ++ from: "spout-1" ++ to: "bolt-1" ++ grouping: ++ type: FIELDS ++ args: ["word"] ++ ++ - name: "bolt-1 --> bolt2" ++ from: "bolt-1" ++ to: "bolt-2" ++ grouping: ++ type: SHUFFLE ++ ++ ++ ++ ++ ++ ++ http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-ui/README.md ---------------------------------------------------------------------- diff --cc external/flux/flux-ui/README.md index 0000000,0000000..8b6bd5f new file mode 100644 --- /dev/null +++ b/external/flux/flux-ui/README.md @@@ -1,0 -1,0 +1,3 @@@ ++# Flux-UI ++ ++Placeholder for Flux GUI http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-wrappers/pom.xml ---------------------------------------------------------------------- diff --cc external/flux/flux-wrappers/pom.xml index 0000000,0000000..6784141 new file mode 100644 --- /dev/null +++ b/external/flux/flux-wrappers/pom.xml @@@ -1,0 -1,0 +1,35 @@@ ++<?xml version="1.0" encoding="UTF-8"?> ++<!-- ++ Licensed to the Apache Software Foundation (ASF) under one or more ++ contributor license agreements. See the NOTICE file distributed with ++ this work for additional information regarding copyright ownership. ++ The ASF licenses this file to You under the Apache License, Version 2.0 ++ (the "License"); you may not use this file except in compliance with ++ the License. You may obtain a copy of the License at ++ ++ http://www.apache.org/licenses/LICENSE-2.0 ++ ++ Unless required by applicable law or agreed to in writing, software ++ distributed under the License is distributed on an "AS IS" BASIS, ++ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ See the License for the specific language governing permissions and ++ limitations under the License. 
++--> ++<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> ++ <modelVersion>4.0.0</modelVersion> ++ ++ <parent> ++ <groupId>com.github.ptgoetz</groupId> ++ <artifactId>flux</artifactId> ++ <version>0.3.1-SNAPSHOT</version> ++ <relativePath>../pom.xml</relativePath> ++ </parent> ++ ++ <groupId>com.github.ptgoetz</groupId> ++ <artifactId>flux-wrappers</artifactId> ++ <packaging>jar</packaging> ++ ++ <name>flux-wrappers</name> ++ <url>https://github.com/ptgoetz/flux</url> ++ ++</project> http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-wrappers/src/main/java/org/apache/storm/flux/wrappers/bolts/FluxShellBolt.java ---------------------------------------------------------------------- diff --cc external/flux/flux-wrappers/src/main/java/org/apache/storm/flux/wrappers/bolts/FluxShellBolt.java index 0000000,0000000..4e0f91c new file mode 100644 --- /dev/null +++ b/external/flux/flux-wrappers/src/main/java/org/apache/storm/flux/wrappers/bolts/FluxShellBolt.java @@@ -1,0 -1,0 +1,56 @@@ ++/* ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
++ * See the License for the specific language governing permissions and ++ * limitations under the License. ++ */ ++package org.apache.storm.flux.wrappers.bolts; ++ ++import backtype.storm.task.ShellBolt; ++import backtype.storm.topology.IRichBolt; ++import backtype.storm.topology.OutputFieldsDeclarer; ++import backtype.storm.tuple.Fields; ++ ++import java.util.Map; ++ ++/** ++ * A generic `ShellBolt` implementation that allows you specify output fields ++ * without having to subclass `ShellBolt` to do so. ++ * ++ */ ++public class FluxShellBolt extends ShellBolt implements IRichBolt{ ++ private String[] outputFields; ++ private Map<String, Object> componentConfig; ++ ++ /** ++ * Create a ShellBolt with command line arguments and output fields ++ * @param command Command line arguments for the bolt ++ * @param outputFields Names of fields the bolt will emit (if any). ++ */ ++ ++ public FluxShellBolt(String[] command, String[] outputFields){ ++ super(command); ++ this.outputFields = outputFields; ++ } ++ ++ @Override ++ public void declareOutputFields(OutputFieldsDeclarer declarer) { ++ declarer.declare(new Fields(this.outputFields)); ++ } ++ ++ @Override ++ public Map<String, Object> getComponentConfiguration() { ++ return this.componentConfig; ++ } ++} http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-wrappers/src/main/java/org/apache/storm/flux/wrappers/bolts/LogInfoBolt.java ---------------------------------------------------------------------- diff --cc external/flux/flux-wrappers/src/main/java/org/apache/storm/flux/wrappers/bolts/LogInfoBolt.java index 0000000,0000000..a42d7c3 new file mode 100644 --- /dev/null +++ b/external/flux/flux-wrappers/src/main/java/org/apache/storm/flux/wrappers/bolts/LogInfoBolt.java @@@ -1,0 -1,0 +1,44 @@@ ++/* ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. 
See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. ++ */ ++ ++package org.apache.storm.flux.wrappers.bolts; ++ ++import backtype.storm.topology.BasicOutputCollector; ++import backtype.storm.topology.OutputFieldsDeclarer; ++import backtype.storm.topology.base.BaseBasicBolt; ++import backtype.storm.tuple.Tuple; ++import org.slf4j.Logger; ++import org.slf4j.LoggerFactory; ++ ++/** ++ * Simple bolt that does nothing other than LOG.info() every tuple received. 
++ * ++ */ ++public class LogInfoBolt extends BaseBasicBolt { ++ private static final Logger LOG = LoggerFactory.getLogger(LogInfoBolt.class); ++ ++ @Override ++ public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) { ++ LOG.info("{}", tuple); ++ } ++ ++ @Override ++ public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { ++ ++ } ++} http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-wrappers/src/main/java/org/apache/storm/flux/wrappers/spouts/FluxShellSpout.java ---------------------------------------------------------------------- diff --cc external/flux/flux-wrappers/src/main/java/org/apache/storm/flux/wrappers/spouts/FluxShellSpout.java index 0000000,0000000..c7e9058 new file mode 100644 --- /dev/null +++ b/external/flux/flux-wrappers/src/main/java/org/apache/storm/flux/wrappers/spouts/FluxShellSpout.java @@@ -1,0 -1,0 +1,55 @@@ ++/* ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. 
++ */ ++package org.apache.storm.flux.wrappers.spouts; ++ ++import backtype.storm.spout.ShellSpout; ++import backtype.storm.topology.IRichSpout; ++import backtype.storm.topology.OutputFieldsDeclarer; ++import backtype.storm.tuple.Fields; ++ ++import java.util.Map; ++ ++/** ++ * A generic `ShellSpout` implementation that allows you specify output fields ++ * without having to subclass `ShellSpout` to do so. ++ * ++ */ ++public class FluxShellSpout extends ShellSpout implements IRichSpout { ++ private String[] outputFields; ++ private Map<String, Object> componentConfig; ++ ++ /** ++ * Create a ShellSpout with command line arguments and output fields ++ * @param args Command line arguments for the spout ++ * @param outputFields Names of fields the spout will emit. ++ */ ++ public FluxShellSpout(String[] args, String[] outputFields){ ++ super(args); ++ this.outputFields = outputFields; ++ } ++ ++ @Override ++ public void declareOutputFields(OutputFieldsDeclarer declarer) { ++ declarer.declare(new Fields(this.outputFields)); ++ } ++ ++ @Override ++ public Map<String, Object> getComponentConfiguration() { ++ return this.componentConfig; ++ } ++} http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-wrappers/src/main/resources/resources/randomsentence.js ---------------------------------------------------------------------- diff --cc external/flux/flux-wrappers/src/main/resources/resources/randomsentence.js index 0000000,0000000..36fc5f5 new file mode 100644 --- /dev/null +++ b/external/flux/flux-wrappers/src/main/resources/resources/randomsentence.js @@@ -1,0 -1,0 +1,93 @@@ ++/* ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. 
The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. ++ */ ++ ++/** ++ * Example for storm spout. Emits random sentences. ++ * The original class in java - storm.starter.spout.RandomSentenceSpout. ++ * ++ */ ++ ++var storm = require('./storm'); ++var Spout = storm.Spout; ++ ++ ++var SENTENCES = [ ++ "the cow jumped over the moon", ++ "an apple a day keeps the doctor away", ++ "four score and seven years ago", ++ "snow white and the seven dwarfs", ++ "i am at two with nature"] ++ ++function RandomSentenceSpout(sentences) { ++ Spout.call(this); ++ this.runningTupleId = 0; ++ this.sentences = sentences; ++ this.pending = {}; ++}; ++ ++RandomSentenceSpout.prototype = Object.create(Spout.prototype); ++RandomSentenceSpout.prototype.constructor = RandomSentenceSpout; ++ ++RandomSentenceSpout.prototype.getRandomSentence = function() { ++ return this.sentences[getRandomInt(0, this.sentences.length - 1)]; ++} ++ ++RandomSentenceSpout.prototype.nextTuple = function(done) { ++ var self = this; ++ var sentence = this.getRandomSentence(); ++ var tup = [sentence]; ++ var id = this.createNextTupleId(); ++ this.pending[id] = tup; ++ //This timeout can be removed if TOPOLOGY_SLEEP_SPOUT_WAIT_STRATEGY_TIME_MS is configured to 100 ++ setTimeout(function() { ++ self.emit({tuple: tup, id: id}, function(taskIds) { ++ self.log(tup + ' sent to task ids - ' + taskIds); ++ }); ++ done(); ++ },100); ++} ++ ++RandomSentenceSpout.prototype.createNextTupleId = 
function() { ++ var id = this.runningTupleId; ++ this.runningTupleId++; ++ return id; ++} ++ ++RandomSentenceSpout.prototype.ack = function(id, done) { ++ this.log('Received ack for - ' + id); ++ delete this.pending[id]; ++ done(); ++} ++ ++RandomSentenceSpout.prototype.fail = function(id, done) { ++ var self = this; ++ this.log('Received fail for - ' + id + '. Retrying.'); ++ this.emit({tuple: this.pending[id], id:id}, function(taskIds) { ++ self.log(self.pending[id] + ' sent to task ids - ' + taskIds); ++ }); ++ done(); ++} ++ ++/** ++ * Returns a random integer between min (inclusive) and max (inclusive) ++ */ ++function getRandomInt(min, max) { ++ return Math.floor(Math.random() * (max - min + 1)) + min; ++} ++ ++new RandomSentenceSpout(SENTENCES).run(); http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-wrappers/src/main/resources/resources/splitsentence.py ---------------------------------------------------------------------- diff --cc external/flux/flux-wrappers/src/main/resources/resources/splitsentence.py index 0000000,0000000..300105f new file mode 100644 --- /dev/null +++ b/external/flux/flux-wrappers/src/main/resources/resources/splitsentence.py @@@ -1,0 -1,0 +1,24 @@@ ++# Licensed to the Apache Software Foundation (ASF) under one ++# or more contributor license agreements. See the NOTICE file ++# distributed with this work for additional information ++# regarding copyright ownership. The ASF licenses this file ++# to you under the Apache License, Version 2.0 (the ++# "License"); you may not use this file except in compliance ++# with the License. You may obtain a copy of the License at ++# ++# http://www.apache.org/licenses/LICENSE-2.0 ++# ++# Unless required by applicable law or agreed to in writing, software ++# distributed under the License is distributed on an "AS IS" BASIS, ++# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
++# See the License for the specific language governing permissions and ++# limitations under the License. ++import storm ++ ++class SplitSentenceBolt(storm.BasicBolt): ++ def process(self, tup): ++ words = tup.values[0].split(" ") ++ for word in words: ++ storm.emit([word]) ++ ++SplitSentenceBolt().run() http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-wrappers/src/main/resources/resources/storm.js ---------------------------------------------------------------------- diff --cc external/flux/flux-wrappers/src/main/resources/resources/storm.js index 0000000,0000000..355c2d2 new file mode 100755 --- /dev/null +++ b/external/flux/flux-wrappers/src/main/resources/resources/storm.js @@@ -1,0 -1,0 +1,373 @@@ ++/* ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. ++ */ ++ ++/** ++ * Base classes in node-js for storm Bolt and Spout. ++ * Implements the storm multilang protocol for nodejs. 
++ */ ++ ++ ++var fs = require('fs'); ++ ++function Storm() { ++ this.messagePart = ""; ++ this.taskIdsCallbacks = []; ++ this.isFirstMessage = true; ++ this.separator = '\nend\n'; ++} ++ ++Storm.prototype.sendMsgToParent = function(msg) { ++ var str = JSON.stringify(msg); ++ process.stdout.write(str + this.separator); ++} ++ ++Storm.prototype.sync = function() { ++ this.sendMsgToParent({"command":"sync"}); ++} ++ ++Storm.prototype.sendPid = function(heartbeatdir) { ++ var pid = process.pid; ++ fs.closeSync(fs.openSync(heartbeatdir + "/" + pid, "w")); ++ this.sendMsgToParent({"pid": pid}) ++} ++ ++Storm.prototype.log = function(msg) { ++ this.sendMsgToParent({"command": "log", "msg": msg}); ++} ++ ++Storm.prototype.initSetupInfo = function(setupInfo) { ++ var self = this; ++ var callback = function() { ++ self.sendPid(setupInfo['pidDir']); ++ } ++ this.initialize(setupInfo['conf'], setupInfo['context'], callback); ++} ++ ++Storm.prototype.startReadingInput = function() { ++ var self = this; ++ process.stdin.on('readable', function() { ++ var chunk = process.stdin.read(); ++ var messages = self.handleNewChunk(chunk); ++ messages.forEach(function(message) { ++ self.handleNewMessage(message); ++ }) ++ ++ }); ++} ++ ++/** ++ * receives a new string chunk and returns a list of new messages with the separator removed ++ * stores state in this.messagePart ++ * @param chunk ++ */ ++Storm.prototype.handleNewChunk = function(chunk) { ++ //invariant: this.messagePart has no separator otherwise we would have parsed it already ++ var messages = []; ++ if (chunk && chunk.length !== 0) { ++ //"{}".split("\nend\n") ==> ['{}'] ++ //"\nend\n".split("\nend\n") ==> ['' , ''] ++ //"{}\nend\n".split("\nend\n") ==> ['{}', ''] ++ //"\nend\n{}".split("\nend\n") ==> ['' , '{}'] ++ // "{}\nend\n{}".split("\nend\n") ==> ['{}', '{}' ] ++ this.messagePart = this.messagePart + chunk; ++ var newMessageParts = this.messagePart.split(this.separator); ++ while (newMessageParts.length > 0) { ++ var 
potentialMessage = newMessageParts.shift(); ++ var anotherMessageAhead = newMessageParts.length > 0; ++ if (!anotherMessageAhead) { ++ this.messagePart = potentialMessage; ++ } ++ else if (potentialMessage.length > 0) { ++ messages.push(potentialMessage); ++ } ++ } ++ } ++ return messages; ++} ++ ++Storm.prototype.isTaskIds = function(msg) { ++ return (msg instanceof Array); ++} ++ ++Storm.prototype.handleNewMessage = function(msg) { ++ var parsedMsg = JSON.parse(msg); ++ ++ if (this.isFirstMessage) { ++ this.initSetupInfo(parsedMsg); ++ this.isFirstMessage = false; ++ } else if (this.isTaskIds(parsedMsg)) { ++ this.handleNewTaskId(parsedMsg); ++ } else { ++ this.handleNewCommand(parsedMsg); ++ } ++} ++ ++Storm.prototype.handleNewTaskId = function(taskIds) { ++ //When new list of task ids arrives, the callback that was passed with the corresponding emit should be called. ++ //Storm assures that the task ids will be sent in the same order as their corresponding emits so it we can simply ++ //take the first callback in the list and be sure it is the right one. ++ ++ var callback = this.taskIdsCallbacks.shift(); ++ if (callback) { ++ callback(taskIds); ++ } else { ++ throw new Error('Something went wrong, we off the split of task id callbacks'); ++ } ++} ++ ++ ++ ++/** ++ * ++ * @param messageDetails json with the emit details. ++ * ++ * For bolt, the json must contain the required fields: ++ * - tuple - the value to emit ++ * - anchorTupleId - the value of the anchor tuple (the input tuple that lead to this emit). Used to track the source ++ * tuple and return ack when all components successfully finished to process it. ++ * and may contain the optional fields: ++ * - stream (if empty - emit to default stream) ++ * ++ * For spout, the json must contain the required fields: ++ * - tuple - the value to emit ++ * ++ * and may contain the optional fields: ++ * - id - pass id for reliable emit (and receive ack/fail later). 
++ * - stream - if empty - emit to default stream. ++ * ++ * @param onTaskIds function that will be called with list of task ids the message was emitted to (when received). ++ */ ++Storm.prototype.emit = function(messageDetails, onTaskIds) { ++ //Every emit triggers a response - list of task ids to which the tuple was emitted. The task ids are accessible ++ //through the callback (will be called when the response arrives). The callback is stored in a list until the ++ //corresponding task id list arrives. ++ if (messageDetails.task) { ++ throw new Error('Illegal input - task. To emit to specific task use emit direct!'); ++ } ++ ++ if (!onTaskIds) { ++ throw new Error('You must pass a onTaskIds callback when using emit!') ++ } ++ ++ this.taskIdsCallbacks.push(onTaskIds); ++ this.__emit(messageDetails);; ++} ++ ++ ++/** ++ * Emit message to specific task. ++ * @param messageDetails json with the emit details. ++ * ++ * For bolt, the json must contain the required fields: ++ * - tuple - the value to emit ++ * - anchorTupleId - the value of the anchor tuple (the input tuple that lead to this emit). Used to track the source ++ * tuple and return ack when all components successfully finished to process it. ++ * - task - indicate the task to send the tuple to. ++ * and may contain the optional fields: ++ * - stream (if empty - emit to default stream) ++ * ++ * For spout, the json must contain the required fields: ++ * - tuple - the value to emit ++ * - task - indicate the task to send the tuple to. ++ * and may contain the optional fields: ++ * - id - pass id for reliable emit (and receive ack/fail later). ++ * - stream - if empty - emit to default stream. ++ * ++ * @param onTaskIds function that will be called with list of task ids the message was emitted to (when received). 
++ */ ++Storm.prototype.emitDirect = function(commandDetails) { ++ if (!commandDetails.task) { ++ throw new Error("Emit direct must receive task id!") ++ } ++ this.__emit(commandDetails); ++} ++ ++/** ++ * Initialize storm component according to the configuration received. ++ * @param conf configuration object according to storm protocol. ++ * @param context context object according to storm protocol. ++ * @param done callback. Call this method when finished initializing. ++ */ ++Storm.prototype.initialize = function(conf, context, done) { ++ done(); ++} ++ ++Storm.prototype.run = function() { ++ process.stdout.setEncoding('utf8'); ++ process.stdin.setEncoding('utf8'); ++ this.startReadingInput(); ++} ++ ++function Tuple(id, component, stream, task, values) { ++ this.id = id; ++ this.component = component; ++ this.stream = stream; ++ this.task = task; ++ this.values = values; ++} ++ ++/** ++ * Base class for storm bolt. ++ * To create a bolt implement 'process' method. ++ * You may also implement initialize method. ++ */ ++function BasicBolt() { ++ Storm.call(this); ++ this.anchorTuple = null; ++}; ++ ++BasicBolt.prototype = Object.create(Storm.prototype); ++BasicBolt.prototype.constructor = BasicBolt; ++ ++/** ++ * Emit message. ++ * @param commandDetails json with the required fields: ++ * - tuple - the value to emit ++ * - anchorTupleId - the value of the anchor tuple (the input tuple that lead to this emit). Used to track the source ++ * tuple and return ack when all components successfully finished to process it. 
++ * and the optional fields: ++ * - stream (if empty - emit to default stream) ++ * - task (pass only to emit to specific task) ++ */ ++BasicBolt.prototype.__emit = function(commandDetails) { ++ var self = this; ++ ++ var message = { ++ command: "emit", ++ tuple: commandDetails.tuple, ++ stream: commandDetails.stream, ++ task: commandDetails.task, ++ anchors: [commandDetails.anchorTupleId] ++ }; ++ ++ this.sendMsgToParent(message); ++} ++ ++BasicBolt.prototype.handleNewCommand = function(command) { ++ var self = this; ++ var tup = new Tuple(command["id"], command["comp"], command["stream"], command["task"], command["tuple"]); ++ ++ if (tup.task === -1 && tup.stream === "__heartbeat") { ++ self.sync(); ++ return; ++ } ++ ++ var callback = function(err) { ++ if (err) { ++ self.fail(tup, err); ++ return; ++ } ++ self.ack(tup); ++ } ++ this.process(tup, callback); ++} ++ ++/** ++ * Implement this method when creating a bolt. This is the main method that provides the logic of the bolt (what ++ * should it do?). ++ * @param tuple the input of the bolt - what to process. ++ * @param done call this method when done processing. ++ */ ++BasicBolt.prototype.process = function(tuple, done) {}; ++ ++BasicBolt.prototype.ack = function(tup) { ++ this.sendMsgToParent({"command": "ack", "id": tup.id}); ++} ++ ++BasicBolt.prototype.fail = function(tup, err) { ++ this.sendMsgToParent({"command": "fail", "id": tup.id}); ++} ++ ++ ++/** ++ * Base class for storm spout. ++ * To create a spout implement the following methods: nextTuple, ack and fail (nextTuple - mandatory, ack and fail ++ * can stay empty). ++ * You may also implement initialize method. ++ * ++ */ ++function Spout() { ++ Storm.call(this); ++}; ++ ++Spout.prototype = Object.create(Storm.prototype); ++ ++Spout.prototype.constructor = Spout; ++ ++/** ++ * This method will be called when an ack is received for previously sent tuple. One may implement it. ++ * @param id The id of the tuple. 
++ * @param done Call this method when finished and ready to receive more tuples. ++ */ ++Spout.prototype.ack = function(id, done) {}; ++ ++/** ++ * This method will be called when a fail is received for previously sent tuple. One may implement it (for example - ++ * log the failure or send the tuple again). ++ * @param id The id of the tuple. ++ * @param done Call this method when finished and ready to receive more tuples. ++ */ ++Spout.prototype.fail = function(id, done) {}; ++ ++/** ++ * Method that indicates it's time to emit the next tuple. ++ * @param done call this method when done sending the output. ++ */ ++Spout.prototype.nextTuple = function(done) {}; ++ ++Spout.prototype.handleNewCommand = function(command) { ++ var self = this; ++ var callback = function() { ++ self.sync(); ++ } ++ ++ if (command["command"] === "next") { ++ this.nextTuple(callback); ++ } ++ ++ if (command["command"] === "ack") { ++ this.ack(command["id"], callback); ++ } ++ ++ if (command["command"] === "fail") { ++ this.fail(command["id"], callback); ++ } ++} ++ ++/** ++ * @param commandDetails json with the required fields: ++ * - tuple - the value to emit. ++ * and the optional fields: ++ * - id - pass id for reliable emit (and receive ack/fail later). ++ * - stream - if empty - emit to default stream. ++ * - task - pass only to emit to specific task. 
++ */ ++Spout.prototype.__emit = function(commandDetails) { ++ var message = { ++ command: "emit", ++ tuple: commandDetails.tuple, ++ id: commandDetails.id, ++ stream: commandDetails.stream, ++ task: commandDetails.task ++ }; ++ ++ this.sendMsgToParent(message); ++} ++ ++module.exports.BasicBolt = BasicBolt; ++module.exports.Spout = Spout; http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-wrappers/src/main/resources/resources/storm.py ---------------------------------------------------------------------- diff --cc external/flux/flux-wrappers/src/main/resources/resources/storm.py index 0000000,0000000..642c393 new file mode 100644 --- /dev/null +++ b/external/flux/flux-wrappers/src/main/resources/resources/storm.py @@@ -1,0 -1,0 +1,260 @@@ ++# -*- coding: utf-8 -*- ++ ++# Licensed to the Apache Software Foundation (ASF) under one ++# or more contributor license agreements. See the NOTICE file ++# distributed with this work for additional information ++# regarding copyright ownership. The ASF licenses this file ++# to you under the Apache License, Version 2.0 (the ++# "License"); you may not use this file except in compliance ++# with the License. You may obtain a copy of the License at ++# ++# http://www.apache.org/licenses/LICENSE-2.0 ++# ++# Unless required by applicable law or agreed to in writing, software ++# distributed under the License is distributed on an "AS IS" BASIS, ++# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++# See the License for the specific language governing permissions and ++# limitations under the License. 
++ ++import sys ++import os ++import traceback ++from collections import deque ++ ++try: ++ import simplejson as json ++except ImportError: ++ import json ++ ++json_encode = lambda x: json.dumps(x) ++json_decode = lambda x: json.loads(x) ++ ++#reads lines and reconstructs newlines appropriately ++def readMsg(): ++ msg = "" ++ while True: ++ line = sys.stdin.readline() ++ if not line: ++ raise Exception('Read EOF from stdin') ++ if line[0:-1] == "end": ++ break ++ msg = msg + line ++ return json_decode(msg[0:-1]) ++ ++MODE = None ++ANCHOR_TUPLE = None ++ ++#queue up commands we read while trying to read taskids ++pending_commands = deque() ++ ++def readTaskIds(): ++ if pending_taskids: ++ return pending_taskids.popleft() ++ else: ++ msg = readMsg() ++ while type(msg) is not list: ++ pending_commands.append(msg) ++ msg = readMsg() ++ return msg ++ ++#queue up taskids we read while trying to read commands/tuples ++pending_taskids = deque() ++ ++def readCommand(): ++ if pending_commands: ++ return pending_commands.popleft() ++ else: ++ msg = readMsg() ++ while type(msg) is list: ++ pending_taskids.append(msg) ++ msg = readMsg() ++ return msg ++ ++def readTuple(): ++ cmd = readCommand() ++ return Tuple(cmd["id"], cmd["comp"], cmd["stream"], cmd["task"], cmd["tuple"]) ++ ++def sendMsgToParent(msg): ++ print json_encode(msg) ++ print "end" ++ sys.stdout.flush() ++ ++def sync(): ++ sendMsgToParent({'command':'sync'}) ++ ++def sendpid(heartbeatdir): ++ pid = os.getpid() ++ sendMsgToParent({'pid':pid}) ++ open(heartbeatdir + "/" + str(pid), "w").close() ++ ++def emit(*args, **kwargs): ++ __emit(*args, **kwargs) ++ return readTaskIds() ++ ++def emitDirect(task, *args, **kwargs): ++ kwargs["directTask"] = task ++ __emit(*args, **kwargs) ++ ++def __emit(*args, **kwargs): ++ global MODE ++ if MODE == Bolt: ++ emitBolt(*args, **kwargs) ++ elif MODE == Spout: ++ emitSpout(*args, **kwargs) ++ ++def emitBolt(tup, stream=None, anchors = [], directTask=None): ++ global ANCHOR_TUPLE ++ 
if ANCHOR_TUPLE is not None: ++ anchors = [ANCHOR_TUPLE] ++ m = {"command": "emit"} ++ if stream is not None: ++ m["stream"] = stream ++ m["anchors"] = map(lambda a: a.id, anchors) ++ if directTask is not None: ++ m["task"] = directTask ++ m["tuple"] = tup ++ sendMsgToParent(m) ++ ++def emitSpout(tup, stream=None, id=None, directTask=None): ++ m = {"command": "emit"} ++ if id is not None: ++ m["id"] = id ++ if stream is not None: ++ m["stream"] = stream ++ if directTask is not None: ++ m["task"] = directTask ++ m["tuple"] = tup ++ sendMsgToParent(m) ++ ++def ack(tup): ++ sendMsgToParent({"command": "ack", "id": tup.id}) ++ ++def fail(tup): ++ sendMsgToParent({"command": "fail", "id": tup.id}) ++ ++def reportError(msg): ++ sendMsgToParent({"command": "error", "msg": msg}) ++ ++def log(msg, level=2): ++ sendMsgToParent({"command": "log", "msg": msg, "level":level}) ++ ++def logTrace(msg): ++ log(msg, 0) ++ ++def logDebug(msg): ++ log(msg, 1) ++ ++def logInfo(msg): ++ log(msg, 2) ++ ++def logWarn(msg): ++ log(msg, 3) ++ ++def logError(msg): ++ log(msg, 4) ++ ++def rpcMetrics(name, params): ++ sendMsgToParent({"command": "metrics", "name": name, "params": params}) ++ ++def initComponent(): ++ setupInfo = readMsg() ++ sendpid(setupInfo['pidDir']) ++ return [setupInfo['conf'], setupInfo['context']] ++ ++class Tuple(object): ++ def __init__(self, id, component, stream, task, values): ++ self.id = id ++ self.component = component ++ self.stream = stream ++ self.task = task ++ self.values = values ++ ++ def __repr__(self): ++ return '<%s%s>' % ( ++ self.__class__.__name__, ++ ''.join(' %s=%r' % (k, self.__dict__[k]) for k in sorted(self.__dict__.keys()))) ++ ++ def is_heartbeat_tuple(self): ++ return self.task == -1 and self.stream == "__heartbeat" ++ ++class Bolt(object): ++ def initialize(self, stormconf, context): ++ pass ++ ++ def process(self, tuple): ++ pass ++ ++ def run(self): ++ global MODE ++ MODE = Bolt ++ conf, context = initComponent() ++ try: ++ 
self.initialize(conf, context) ++ while True: ++ tup = readTuple() ++ if tup.is_heartbeat_tuple(): ++ sync() ++ else: ++ self.process(tup) ++ except Exception, e: ++ reportError(traceback.format_exc(e)) ++ ++class BasicBolt(object): ++ def initialize(self, stormconf, context): ++ pass ++ ++ def process(self, tuple): ++ pass ++ ++ def run(self): ++ global MODE ++ MODE = Bolt ++ global ANCHOR_TUPLE ++ conf, context = initComponent() ++ try: ++ self.initialize(conf, context) ++ while True: ++ tup = readTuple() ++ if tup.is_heartbeat_tuple(): ++ sync() ++ else: ++ ANCHOR_TUPLE = tup ++ try: ++ self.process(tup) ++ ack(tup) ++ except Exception, e: ++ reportError(traceback.format_exc(e)) ++ fail(tup) ++ except Exception, e: ++ reportError(traceback.format_exc(e)) ++ ++class Spout(object): ++ def initialize(self, conf, context): ++ pass ++ ++ def ack(self, id): ++ pass ++ ++ def fail(self, id): ++ pass ++ ++ def nextTuple(self): ++ pass ++ ++ def run(self): ++ global MODE ++ MODE = Spout ++ conf, context = initComponent() ++ try: ++ self.initialize(conf, context) ++ while True: ++ msg = readCommand() ++ if msg["command"] == "next": ++ self.nextTuple() ++ if msg["command"] == "ack": ++ self.ack(msg["id"]) ++ if msg["command"] == "fail": ++ self.fail(msg["id"]) ++ sync() ++ except Exception, e: ++ reportError(traceback.format_exc(e)) http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/pom.xml ---------------------------------------------------------------------- diff --cc external/flux/pom.xml index 0000000,0000000..5ea1b40 new file mode 100644 --- /dev/null +++ b/external/flux/pom.xml @@@ -1,0 -1,0 +1,126 @@@ ++<?xml version="1.0" encoding="UTF-8"?> ++<!-- ++ Licensed to the Apache Software Foundation (ASF) under one or more ++ contributor license agreements. See the NOTICE file distributed with ++ this work for additional information regarding copyright ownership. 
++ The ASF licenses this file to You under the Apache License, Version 2.0 ++ (the "License"); you may not use this file except in compliance with ++ the License. You may obtain a copy of the License at ++ ++ http://www.apache.org/licenses/LICENSE-2.0 ++ ++ Unless required by applicable law or agreed to in writing, software ++ distributed under the License is distributed on an "AS IS" BASIS, ++ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ See the License for the specific language governing permissions and ++ limitations under the License. ++--> ++<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> ++ <modelVersion>4.0.0</modelVersion> ++ ++ <groupId>com.github.ptgoetz</groupId> ++ <artifactId>flux</artifactId> ++ <version>0.3.1-SNAPSHOT</version> ++ <packaging>pom</packaging> ++ <name>flux</name> ++ <url>https://github.com/ptgoetz/flux</url> ++ ++ <parent> ++ <groupId>org.sonatype.oss</groupId> ++ <artifactId>oss-parent</artifactId> ++ <version>7</version> ++ </parent> ++ <scm> ++ <connection>scm:git:[email protected]:ptgoetz/flux.git</connection> ++ <developerConnection>scm:git:[email protected]:ptgoetz/flux.git</developerConnection> ++ <url>:[email protected]:ptgoetz/flux.git</url> ++ </scm> ++ ++ <developers> ++ <developer> ++ <id>ptgoetz</id> ++ <name>P. Taylor Goetz</name> ++ <email>[email protected]</email> ++ </developer> ++ </developers> ++ ++ <properties> ++ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> ++ <storm.version>0.9.3</storm.version> ++ <!-- see comment below... This fixes an annoyance with intellij --> ++ <provided.scope>provided</provided.scope> ++ </properties> ++ ++ <profiles> ++ <!-- ++ Hack to make intellij behave. ++ If you use intellij, enable this profile in your IDE. ++ It should make life easier. 
++ --> ++ <profile> ++ <id>intellij</id> ++ <properties> ++ <provided.scope>compile</provided.scope> ++ </properties> ++ </profile> ++ </profiles> ++ ++ <modules> ++ <module>flux-wrappers</module> ++ <module>flux-core</module> ++ <module>flux-examples</module> ++ </modules> ++ ++ <dependencies> ++ <dependency> ++ <groupId>org.apache.storm</groupId> ++ <artifactId>storm-core</artifactId> ++ <version>${storm.version}</version> ++ <scope>${provided.scope}</scope> ++ </dependency> ++ <dependency> ++ <groupId>commons-cli</groupId> ++ <artifactId>commons-cli</artifactId> ++ <version>1.2</version> ++ </dependency> ++ <dependency> ++ <groupId>org.apache.kafka</groupId> ++ <artifactId>kafka_2.10</artifactId> ++ <version>0.8.1.1</version> ++ <scope>test</scope> ++ <exclusions> ++ <exclusion> ++ <groupId>org.apache.zookeeper</groupId> ++ <artifactId>zookeeper</artifactId> ++ </exclusion> ++ <exclusion> ++ <groupId>log4j</groupId> ++ <artifactId>log4j</artifactId> ++ </exclusion> ++ </exclusions> ++ </dependency> ++ <dependency> ++ <groupId>junit</groupId> ++ <artifactId>junit</artifactId> ++ <version>4.11</version> ++ <scope>test</scope> ++ </dependency> ++ </dependencies> ++ <build> ++ <resources> ++ ++ </resources> ++ <plugins> ++ <plugin> ++ <groupId>org.apache.maven.plugins</groupId> ++ <artifactId>maven-compiler-plugin</artifactId> ++ <version>3.3</version> ++ <configuration> ++ <source>1.6</source> ++ <target>1.6</target> ++ <encoding>UTF-8</encoding> ++ </configuration> ++ </plugin> ++ </plugins> ++ </build> ++</project>
