I changs the package way,it runs OK.But I don't know why. The wrong way is follow,and put the jar "se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar" to production. <plugin> <artifactId>maven-assembly-plugin</artifactId> <version>2.4</version> <configuration> <descriptorRefs> <descriptorRef>jar-with-dependencies</descriptorRef> </descriptorRefs> <archive> <manifest> <addClasspath>true</addClasspath> <mainClass>cn.ennwifi.storm.Application</mainClass> </manifest> </archive> </configuration> <executions> <execution> <id>make-assembly</id> <phase>package</phase> <goals> <goal>single</goal> </goals> </execution> </executions> </plugin>
Today I changes it to : <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-dependency-plugin</artifactId> <version>3.0.0</version> <executions> <execution> <id>copy-dependencies</id> <phase>package</phase> <goals> <goal>copy-dependencies</goal> </goals> <configuration> <outputDirectory>target/libs</outputDirectory> <includeScope>runtime</includeScope> </configuration> </execution> </executions> </plugin> Then I put the se-storm-0.0.1-SNAPSHOT.jar to storm/extlib,and put thest target/libs/*.jar to storm/extlib also. The problem is solved.But why is that. I see the directory of the two jar, the *.xml file and *.drl files that Drools used are in the same position。 2017-09-15 9:39 GMT+08:00 张博 <jyzhan...@gmail.com>: > The Drools's version I depend on is 7.2.0.Final.The dependency in pom.xml > is > <dependency> > <groupId>org.drools</groupId> > <artifactId>drools-compiler</artifactId> > <version>7.2.0.Final</version> > </dependency> > > And the bolt is: > public class DealLimitBolt extends BaseRichBolt { > > private OutputCollector collector; > > private KieSession kieSession; > > public void execute(Tuple input) { > // 获取数据 > String sentence = (String) input.getValue(0); > > // 数据转换 > PutDataPoint dataPoint = Json.fromJson(PutDataPoint.class, sentence); > > // 获取规则 > String key = String.valueOf(dataPoint.tags.get("gatewayId")) + > String.valueOf(dataPoint.getTags().get("deviceId")) > + dataPoint.getMetric() + "0"; > RuleLimitParam paramObj = (RuleLimitParam) MapService.getObject(key); > LOGGER.info(Json.toJson(MapService.getObjects(), JsonFormat.tidy())); > > if (paramObj != null) { > > //I used the kieSession here > LimitFact fact = new LimitFact(); > fact.setHigh(paramObj.high); > fact.setLow(paramObj.low); > fact.setOperate(paramObj.operate); > fact.setValue(Float.valueOf(dataPoint.value)); > kieSession.insert(fact); > kieSession.fireAllRules(); > > } > > collector.ack(input); > } > > > public void prepare(Map stormConf, TopologyContext context, > OutputCollector collector) { > this.collector = collector; > SpringTools instance = SpringTools.getInstance(); > ApplicationContext applicationContext = instance. > getApplicationContext(); > methodService = applicationContext.getBean(MethodService.class); > DroolsService droolsService = applicationContext.getBean( > DroolsService.class); > droolsService.getRules(); > > //This is the init. > KieServices ks = KieServices.Factory.get(); > KieContainer kContainer = ks.getKieClasspathContainer(); > kieSession = kContainer.newKieSession("all-rule"); > } > > public void declareOutputFields(OutputFieldsDeclarer declarer) { > > } > > The drools also uses a xml file that in resources/META-INF/kmodule.xml, > <?xml version="1.0" encoding="UTF-8"?> > <kmodule xmlns="http://www.drools.org/xsd/kmodule"> > <kbase name="rules" packages="com.rules"> > <ksession name="all-rule"/> > </kbase> > </kmodule> > > And the rules I put them in resources/com/rules/DealLimit.drl > > > > 2017-09-15 0:03 GMT+08:00 Stig Rohde Døssing <s...@apache.org>: > >> There are a lot of differences between local cluster and production. The >> primary difference is that local clusters run all the code in one JVM, >> whereas a production cluster runs bolts spread across multiple worker JVMs, >> and the topology wiring code in a separate JVM on the Nimbus host. >> >> There is no initialization in this prepare method: >> @Override >> public void prepare(Map stormConf, TopologyContext context, >> OutputCollector collector) { >> this.collector = collector; >> } >> >> I think it's easier to help if you post the code you're using to >> initialize Drools, as well as which Drools artifact and version you're >> depending on. >> >> 2017-09-14 12:37 GMT+02:00 张博 <jyzhan...@gmail.com>: >> >>> I only use drools in bolt.I init it in prepare method.So,I think that it >>> is not the reason.But it runs in the localcluster.Do you know the >>> difference between the localcluster and the production cluster? >>> >>> 2017-09-13 23:27 GMT+08:00 Stig Rohde Døssing <s...@apache.org>: >>> >>>> I'm not familiar with Drools, so I'm just guessing here, but are you >>>> doing any kind of setup of the KieContainer before submitting your >>>> topology? When you run your topology the bolt doesn't run in the same JVM >>>> as the topology setup code, so any setup done via static variables/methods >>>> won't transfer from the submitter JVM to the bolt JVM. >>>> >>>> If you need to run code before starting a worker, you might want to >>>> look at https://github.com/apache/storm/blob/master/storm-client/src >>>> /jvm/org/apache/storm/hooks/BaseWorkerHook.java and >>>> https://storm.apache.org/releases/1.0.3/javadocs/org/apache/ >>>> storm/topology/TopologyBuilder.html#addWorkerHook-org.apache >>>> .storm.hooks.IWorkerHook-. >>>> >>>> 2017-09-13 15:30 GMT+02:00 zhangwenwei <jerry....@icloud.com>: >>>> >>>>> According to the log info, there have a NPE occur when call method >>>>> kieContainer.newKieSession(). >>>>> >>>>> Best Regards, >>>>> Jerry Zhang >>>>> >>>>> > On 13 Sep 2017, at 14:15, 张博 <jyzhan...@gmail.com> wrote: >>>>> > >>>>> > Hi! >>>>> > Now I want to use Drools in a blot,it works normal in the >>>>> LocalCluster, but when I put it to the production cluster,it has error. >>>>> > The blot: >>>>> > public class DealLostBolt extends BaseRichBolt { >>>>> > >>>>> > private static final long serialVersionUID = 1L; >>>>> > >>>>> > private static final Logger LOGGER = LoggerFactory.getLogger("DEAL_ >>>>> LOST_BOLT"); >>>>> > >>>>> > private OutputCollector collector; >>>>> > >>>>> > private KieSession kieSession; >>>>> > >>>>> > private FactHandle factHandle; >>>>> > >>>>> > @Override >>>>> > public void execute(Tuple input) { >>>>> > // 获取数据 >>>>> > String sentence = (String) input.getValue(0); >>>>> > LOGGER.info("DealLostBolt获取到的数据:" + sentence); >>>>> > >>>>> > // 数据转换 >>>>> > PutDataPoint dataPoint = Json.fromJson(PutDataPoint.class, >>>>> sentence); >>>>> > >>>>> > KieServices ks = KieServices.Factory.get(); >>>>> > KieContainer kieContainer = ks.getKieClasspathContainer(); >>>>> > kieSession = kieContainer.newKieSession("all-rule"); >>>>> > kieSession.getAgenda().getAgendaGroup("deal-lost").setFocus(); >>>>> > >>>>> > factHandle = kieSession.insert(dataPoint); >>>>> > kieSession.fireAllRules(); >>>>> > kieSession.delete(factHandle); >>>>> > >>>>> > collector.emit(new Values(sentence)); >>>>> > } >>>>> > >>>>> > @Override >>>>> > public void declareOutputFields(OutputFieldsDeclarer declarer) { >>>>> > declarer.declare(new Fields("value")); >>>>> > >>>>> > } >>>>> > >>>>> > @Override >>>>> > public void prepare(Map stormConf, TopologyContext context, >>>>> OutputCollector collector) { >>>>> > this.collector = collector; >>>>> > } >>>>> > >>>>> > } >>>>> > The erros: >>>>> > java.lang.RuntimeException: java.lang.NullPointerException >>>>> > at org.apache.storm.utils.Disrupt >>>>> orQueue.consumeBatchToCursor(DisruptorQueue.java:495) >>>>> ~[storm-core-1.1.1.jar:1.1.1] >>>>> > at org.apache.storm.utils.Disrupt >>>>> orQueue.consumeBatchWhenAvailable(DisruptorQueue.java:460) >>>>> ~[storm-core-1.1.1.jar:1.1.1] >>>>> > at org.apache.storm.disruptor$con >>>>> sume_batch_when_available.invoke(disruptor.clj:73) >>>>> ~[storm-core-1.1.1.jar:1.1.1] >>>>> > at org.apache.storm.daemon.execut >>>>> or$fn__5030$fn__5043$fn__5096.invoke(executor.clj:848) >>>>> ~[storm-core-1.1.1.jar:1.1.1] >>>>> > at org.apache.storm.util$async_lo >>>>> op$fn__557.invoke(util.clj:484) [storm-core-1.1.1.jar:1.1.1] >>>>> > at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?] >>>>> > at java.lang.Thread.run(Thread.java:745) [?:1.8.0_121] >>>>> > Caused by: java.lang.NullPointerException >>>>> > at org.kie.internal.io.ResourceFa >>>>> ctory.newByteArrayResource(ResourceFactory.java:66) >>>>> ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?] >>>>> > at org.drools.compiler.kie.builde >>>>> r.impl.AbstractKieModule.getResource(AbstractKieModule.java:299) >>>>> ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?] >>>>> > at org.drools.compiler.kie.builde >>>>> r.impl.AbstractKieModule.addResourceToCompiler(AbstractKieModule.java:264) >>>>> ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?] >>>>> > at org.drools.compiler.kie.builde >>>>> r.impl.AbstractKieModule.addResourceToCompiler(AbstractKieModule.java:259) >>>>> ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?] >>>>> > at org.drools.compiler.kie.builde >>>>> r.impl.AbstractKieProject.buildKnowledgePackages(AbstractKieProject.java:228) >>>>> ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?] >>>>> > at org.drools.compiler.kie.builde >>>>> r.impl.AbstractKieModule.createKieBase(AbstractKieModule.java:206) >>>>> ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?] >>>>> > at org.drools.compiler.kie.builde >>>>> r.impl.KieContainerImpl.createKieBase(KieContainerImpl.java:584) >>>>> ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?] >>>>> > at org.drools.compiler.kie.builde >>>>> r.impl.KieContainerImpl.getKieBase(KieContainerImpl.java:552) >>>>> ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?] >>>>> > at org.drools.compiler.kie.builde >>>>> r.impl.KieContainerImpl.newKieSession(KieContainerImpl.java:680) >>>>> ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?] >>>>> > at org.drools.compiler.kie.builde >>>>> r.impl.KieContainerImpl.newKieSession(KieContainerImpl.java:648) >>>>> ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?] >>>>> > at cn.ennwifi.storm.bolt.DealLost >>>>> Bolt.execute(DealLostBolt.java:52) ~[se-storm-0.0.1-SNAPSHOT-jar- >>>>> with-dependencies.jar:?] >>>>> > at org.apache.storm.daemon.execut >>>>> or$fn__5030$tuple_action_fn__5032.invoke(executor.clj:729) >>>>> ~[storm-core-1.1.1.jar:1.1.1] >>>>> > at org.apache.storm.daemon.execut >>>>> or$mk_task_receiver$fn__4951.invoke(executor.clj:461) >>>>> ~[storm-core-1.1.1.jar:1.1.1] >>>>> > at org.apache.storm.disruptor$clo >>>>> jure_handler$reify__4465.onEvent(disruptor.clj:40) >>>>> ~[storm-core-1.1.1.jar:1.1.1] >>>>> > at org.apache.storm.utils.Disrupt >>>>> orQueue.consumeBatchToCursor(DisruptorQueue.java:482) >>>>> ~[storm-core-1.1.1.jar:1.1.1] >>>>> > ... 6 more >>>>> > >>>>> > Could somebody help me? >>>>> > >>>>> > Thanks! >>>>> >>>>> >>>> >>> >> >