I changs the package way,it runs OK.But I don't know why.
The wrong way is follow,and put the jar
"se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar" to production.
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.4</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<addClasspath>true</addClasspath>
<mainClass>cn.ennwifi.storm.Application</mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
Today I changes it to :
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<version>3.0.0</version>
<executions>
<execution>
<id>copy-dependencies</id>
<phase>package</phase>
<goals>
<goal>copy-dependencies</goal>
</goals>
<configuration>
<outputDirectory>target/libs</outputDirectory>
<includeScope>runtime</includeScope>
</configuration>
</execution>
</executions>
</plugin>
Then I put the se-storm-0.0.1-SNAPSHOT.jar to storm/extlib,and put thest
target/libs/*.jar to storm/extlib also. The problem is solved.But why is
that.
I see the directory of the two jar, the *.xml file and *.drl files that
Drools used are in the same position。
2017-09-15 9:39 GMT+08:00 张博 <[email protected]>:
> The Drools's version I depend on is 7.2.0.Final.The dependency in pom.xml
> is
> <dependency>
> <groupId>org.drools</groupId>
> <artifactId>drools-compiler</artifactId>
> <version>7.2.0.Final</version>
> </dependency>
>
> And the bolt is:
> public class DealLimitBolt extends BaseRichBolt {
>
> private OutputCollector collector;
>
> private KieSession kieSession;
>
> public void execute(Tuple input) {
> // 获取数据
> String sentence = (String) input.getValue(0);
>
> // 数据转换
> PutDataPoint dataPoint = Json.fromJson(PutDataPoint.class, sentence);
>
> // 获取规则
> String key = String.valueOf(dataPoint.tags.get("gatewayId")) +
> String.valueOf(dataPoint.getTags().get("deviceId"))
> + dataPoint.getMetric() + "0";
> RuleLimitParam paramObj = (RuleLimitParam) MapService.getObject(key);
> LOGGER.info(Json.toJson(MapService.getObjects(), JsonFormat.tidy()));
>
> if (paramObj != null) {
>
> //I used the kieSession here
> LimitFact fact = new LimitFact();
> fact.setHigh(paramObj.high);
> fact.setLow(paramObj.low);
> fact.setOperate(paramObj.operate);
> fact.setValue(Float.valueOf(dataPoint.value));
> kieSession.insert(fact);
> kieSession.fireAllRules();
>
> }
>
> collector.ack(input);
> }
>
>
> public void prepare(Map stormConf, TopologyContext context,
> OutputCollector collector) {
> this.collector = collector;
> SpringTools instance = SpringTools.getInstance();
> ApplicationContext applicationContext = instance.
> getApplicationContext();
> methodService = applicationContext.getBean(MethodService.class);
> DroolsService droolsService = applicationContext.getBean(
> DroolsService.class);
> droolsService.getRules();
>
> //This is the init.
> KieServices ks = KieServices.Factory.get();
> KieContainer kContainer = ks.getKieClasspathContainer();
> kieSession = kContainer.newKieSession("all-rule");
> }
>
> public void declareOutputFields(OutputFieldsDeclarer declarer) {
>
> }
>
> The drools also uses a xml file that in resources/META-INF/kmodule.xml,
> <?xml version="1.0" encoding="UTF-8"?>
> <kmodule xmlns="http://www.drools.org/xsd/kmodule">
> <kbase name="rules" packages="com.rules">
> <ksession name="all-rule"/>
> </kbase>
> </kmodule>
>
> And the rules I put them in resources/com/rules/DealLimit.drl
>
>
>
> 2017-09-15 0:03 GMT+08:00 Stig Rohde Døssing <[email protected]>:
>
>> There are a lot of differences between local cluster and production. The
>> primary difference is that local clusters run all the code in one JVM,
>> whereas a production cluster runs bolts spread across multiple worker JVMs,
>> and the topology wiring code in a separate JVM on the Nimbus host.
>>
>> There is no initialization in this prepare method:
>> @Override
>> public void prepare(Map stormConf, TopologyContext context,
>> OutputCollector collector) {
>> this.collector = collector;
>> }
>>
>> I think it's easier to help if you post the code you're using to
>> initialize Drools, as well as which Drools artifact and version you're
>> depending on.
>>
>> 2017-09-14 12:37 GMT+02:00 张博 <[email protected]>:
>>
>>> I only use drools in bolt.I init it in prepare method.So,I think that it
>>> is not the reason.But it runs in the localcluster.Do you know the
>>> difference between the localcluster and the production cluster?
>>>
>>> 2017-09-13 23:27 GMT+08:00 Stig Rohde Døssing <[email protected]>:
>>>
>>>> I'm not familiar with Drools, so I'm just guessing here, but are you
>>>> doing any kind of setup of the KieContainer before submitting your
>>>> topology? When you run your topology the bolt doesn't run in the same JVM
>>>> as the topology setup code, so any setup done via static variables/methods
>>>> won't transfer from the submitter JVM to the bolt JVM.
>>>>
>>>> If you need to run code before starting a worker, you might want to
>>>> look at https://github.com/apache/storm/blob/master/storm-client/src
>>>> /jvm/org/apache/storm/hooks/BaseWorkerHook.java and
>>>> https://storm.apache.org/releases/1.0.3/javadocs/org/apache/
>>>> storm/topology/TopologyBuilder.html#addWorkerHook-org.apache
>>>> .storm.hooks.IWorkerHook-.
>>>>
>>>> 2017-09-13 15:30 GMT+02:00 zhangwenwei <[email protected]>:
>>>>
>>>>> According to the log info, there have a NPE occur when call method
>>>>> kieContainer.newKieSession().
>>>>>
>>>>> Best Regards,
>>>>> Jerry Zhang
>>>>>
>>>>> > On 13 Sep 2017, at 14:15, 张博 <[email protected]> wrote:
>>>>> >
>>>>> > Hi!
>>>>> > Now I want to use Drools in a blot,it works normal in the
>>>>> LocalCluster, but when I put it to the production cluster,it has error.
>>>>> > The blot:
>>>>> > public class DealLostBolt extends BaseRichBolt {
>>>>> >
>>>>> > private static final long serialVersionUID = 1L;
>>>>> >
>>>>> > private static final Logger LOGGER = LoggerFactory.getLogger("DEAL_
>>>>> LOST_BOLT");
>>>>> >
>>>>> > private OutputCollector collector;
>>>>> >
>>>>> > private KieSession kieSession;
>>>>> >
>>>>> > private FactHandle factHandle;
>>>>> >
>>>>> > @Override
>>>>> > public void execute(Tuple input) {
>>>>> > // 获取数据
>>>>> > String sentence = (String) input.getValue(0);
>>>>> > LOGGER.info("DealLostBolt获取到的数据:" + sentence);
>>>>> >
>>>>> > // 数据转换
>>>>> > PutDataPoint dataPoint = Json.fromJson(PutDataPoint.class,
>>>>> sentence);
>>>>> >
>>>>> > KieServices ks = KieServices.Factory.get();
>>>>> > KieContainer kieContainer = ks.getKieClasspathContainer();
>>>>> > kieSession = kieContainer.newKieSession("all-rule");
>>>>> > kieSession.getAgenda().getAgendaGroup("deal-lost").setFocus();
>>>>> >
>>>>> > factHandle = kieSession.insert(dataPoint);
>>>>> > kieSession.fireAllRules();
>>>>> > kieSession.delete(factHandle);
>>>>> >
>>>>> > collector.emit(new Values(sentence));
>>>>> > }
>>>>> >
>>>>> > @Override
>>>>> > public void declareOutputFields(OutputFieldsDeclarer declarer) {
>>>>> > declarer.declare(new Fields("value"));
>>>>> >
>>>>> > }
>>>>> >
>>>>> > @Override
>>>>> > public void prepare(Map stormConf, TopologyContext context,
>>>>> OutputCollector collector) {
>>>>> > this.collector = collector;
>>>>> > }
>>>>> >
>>>>> > }
>>>>> > The erros:
>>>>> > java.lang.RuntimeException: java.lang.NullPointerException
>>>>> > at org.apache.storm.utils.Disrupt
>>>>> orQueue.consumeBatchToCursor(DisruptorQueue.java:495)
>>>>> ~[storm-core-1.1.1.jar:1.1.1]
>>>>> > at org.apache.storm.utils.Disrupt
>>>>> orQueue.consumeBatchWhenAvailable(DisruptorQueue.java:460)
>>>>> ~[storm-core-1.1.1.jar:1.1.1]
>>>>> > at org.apache.storm.disruptor$con
>>>>> sume_batch_when_available.invoke(disruptor.clj:73)
>>>>> ~[storm-core-1.1.1.jar:1.1.1]
>>>>> > at org.apache.storm.daemon.execut
>>>>> or$fn__5030$fn__5043$fn__5096.invoke(executor.clj:848)
>>>>> ~[storm-core-1.1.1.jar:1.1.1]
>>>>> > at org.apache.storm.util$async_lo
>>>>> op$fn__557.invoke(util.clj:484) [storm-core-1.1.1.jar:1.1.1]
>>>>> > at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
>>>>> > at java.lang.Thread.run(Thread.java:745) [?:1.8.0_121]
>>>>> > Caused by: java.lang.NullPointerException
>>>>> > at org.kie.internal.io.ResourceFa
>>>>> ctory.newByteArrayResource(ResourceFactory.java:66)
>>>>> ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
>>>>> > at org.drools.compiler.kie.builde
>>>>> r.impl.AbstractKieModule.getResource(AbstractKieModule.java:299)
>>>>> ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
>>>>> > at org.drools.compiler.kie.builde
>>>>> r.impl.AbstractKieModule.addResourceToCompiler(AbstractKieModule.java:264)
>>>>> ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
>>>>> > at org.drools.compiler.kie.builde
>>>>> r.impl.AbstractKieModule.addResourceToCompiler(AbstractKieModule.java:259)
>>>>> ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
>>>>> > at org.drools.compiler.kie.builde
>>>>> r.impl.AbstractKieProject.buildKnowledgePackages(AbstractKieProject.java:228)
>>>>> ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
>>>>> > at org.drools.compiler.kie.builde
>>>>> r.impl.AbstractKieModule.createKieBase(AbstractKieModule.java:206)
>>>>> ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
>>>>> > at org.drools.compiler.kie.builde
>>>>> r.impl.KieContainerImpl.createKieBase(KieContainerImpl.java:584)
>>>>> ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
>>>>> > at org.drools.compiler.kie.builde
>>>>> r.impl.KieContainerImpl.getKieBase(KieContainerImpl.java:552)
>>>>> ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
>>>>> > at org.drools.compiler.kie.builde
>>>>> r.impl.KieContainerImpl.newKieSession(KieContainerImpl.java:680)
>>>>> ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
>>>>> > at org.drools.compiler.kie.builde
>>>>> r.impl.KieContainerImpl.newKieSession(KieContainerImpl.java:648)
>>>>> ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
>>>>> > at cn.ennwifi.storm.bolt.DealLost
>>>>> Bolt.execute(DealLostBolt.java:52) ~[se-storm-0.0.1-SNAPSHOT-jar-
>>>>> with-dependencies.jar:?]
>>>>> > at org.apache.storm.daemon.execut
>>>>> or$fn__5030$tuple_action_fn__5032.invoke(executor.clj:729)
>>>>> ~[storm-core-1.1.1.jar:1.1.1]
>>>>> > at org.apache.storm.daemon.execut
>>>>> or$mk_task_receiver$fn__4951.invoke(executor.clj:461)
>>>>> ~[storm-core-1.1.1.jar:1.1.1]
>>>>> > at org.apache.storm.disruptor$clo
>>>>> jure_handler$reify__4465.onEvent(disruptor.clj:40)
>>>>> ~[storm-core-1.1.1.jar:1.1.1]
>>>>> > at org.apache.storm.utils.Disrupt
>>>>> orQueue.consumeBatchToCursor(DisruptorQueue.java:482)
>>>>> ~[storm-core-1.1.1.jar:1.1.1]
>>>>> > ... 6 more
>>>>> >
>>>>> > Could somebody help me?
>>>>> >
>>>>> > Thanks!
>>>>>
>>>>>
>>>>
>>>
>>
>