Hi Ravi , Need your help . So I created a local cluster and deployed my topology to it . Inside my Spout and Bolts , I am launching a Spring Boot application wrapped inside a singleton to initialise my context . Unfortunately , it appears to me that it is not working :(((( and annotations like @EnableAutoConfiguration is not picking up yml files from the classpath and injecting their values in the bean. And I am getting exceptions like
Error creating bean with name 'inputQueueManager': Injection of autowired dependencies failed; nested exception is org.springframework.beans.factory.BeanCreationException: Could not autowire field: private int mqclient.rabbitmq.manager.impl.InputQueueManagerImpl.rabbitMqPort; nested exception is org.springframework.beans.TypeMismatchException: Failed to convert value of type 'java.lang.String' to required type 'int'; nested exception is java.lang.NumberFormatException: For input string: "${input.rabbitmq.port}" at has anyone here ever tried injecting dependencies from Spring . I am not sure why this is not working . It works like a charm in Local Cluster and now I am not passing context as a constructor argument , rather declaring and initializing it inside each spout and bolts :( . Is there any reason why Spring Annotations dont work inside a Remote Cluster . Need help urgently here . Thanks Ankur On Sun, Oct 11, 2015 at 1:01 PM, Ankur Garg <ankurga...@gmail.com> wrote: > I think I don't need to Autowire beans inside my spout and bolts . > > All I want my context to be available . Since I use Spring Boot , I am > delegating it to initialise all the beans and set up every bean (reading > yml file and create DB connections , connections to Message brokers etc ) . > > On my local cluster I am passing it as a constructor argument to Spouts > and Bolts . Since all r running in same jvm its available to all spouts and > bolts . > > But in a distributed cluster , this will blow up as Context is not > serializable and cannot be passed like above . > > So the problem is only to make this context available once per jvm . Hence > I thought I will wrap it under a singleton and make this available to all > spouts and bolts per jvm. > > Once I have this context initialized and loaded all I need to do is to get > the bean which I will do the same way I am doing inside local cluster > spouts and bolts . > > > > > > On Sun, Oct 11, 2015 at 12:46 PM, Ravi Sharma <ping2r...@gmail.com> wrote: > >> Yes ur assumption is right >> Jvm1 will create application contexts say ac1 >> >> And jvm2 will create another application instance ac2 >> >> And all of it can be done via singleton classes. >> >> All bolts and spouts in same jvm instance need to access same application >> context. >> >> I have done same in cluster and it works >> >> Remember all spring beans need to be transient and also u need to set >> required=false in case u r going create spout and bolt using spring >> >> Public class mybolt { >> @aurowired(required=false) >> Private transient MyServiceBean myServiceBean; >> >> .... >> ... >> } >> >> Ravi >> On 11 Oct 2015 07:59, "Ankur Garg" <ankurga...@gmail.com> wrote: >> >>> Also , I think there can be some instances of spouts/bolts running on >>> JVM 1 and some on JVM 2 and so on... >>> >>> Is it possible for spouts and bolts running on same jvm to access same >>> applicationContext . >>> >>> I am thinking that I can make the place where I launch my spring Boot >>> application inside a singleton class , and so all the spouts and bolts >>> running on say JVM1 will have access to same context (instead of launching >>> it in all spouts and bolts) . And for those in JVM 2 they will still >>> initialise it once and all the rest will get the same application Context . >>> >>> But all above is theoretical assumption . I still need to try it out >>> (unfortunately i dont have a cluster setup at my end) but if possible >>> please let me know if this can work . >>> >>> Thanks >>> Ankur >>> >>> On Sun, Oct 11, 2015 at 11:48 AM, Ankur Garg <ankurga...@gmail.com> >>> wrote: >>> >>>> Thanks for replying Ravi . >>>> >>>> I think your suggestion to make wrapper to read json or xml is a very >>>> nice Idea indeed . >>>> >>>> But , the problem for me here is to have the context (with all beans >>>> loaded and initialized ) available inside the Spouts and Bolts and that >>>> means inside every running instance of Spouts and Bolts which may be >>>> running on different machines and different jvm. >>>> >>>> Agree that when defining topology I dont need Spring Context as I just >>>> have to define spouts and bolts there. I used context here to send them to >>>> spout and bolt through constructor but it appears from comments above that >>>> it wont work on distributed cluster . >>>> >>>> So , is there some way that once topology gets submitted to run in a >>>> distributed cluster , I can initialize my context there and someway they >>>> are available to all Spouts and Bolts ..Basically some shared location >>>> where my application Context can be initialized (once and only once) and >>>> this context can be accessed by >>>> all instances of Spouts and Bolts ? >>>> >>>> Thanks >>>> >>>> On Sun, Oct 11, 2015 at 11:20 AM, Ravi Sharma <ping2r...@gmail.com> >>>> wrote: >>>> >>>>> Basically u will have two context defined at different time/phase >>>>> >>>>> When u r about to submit the topology, u need to build topology, that >>>>> context only need information about spouts and bolts. You don't need any >>>>> application bean like database accessories or ur services etc, as at this >>>>> level u r not running ur application but u r just creating a topology and >>>>> defining how bolts and spouts are connected to each other etc etc >>>>> >>>>> Now once topology is submitted, topology will be moved to one of the >>>>> supervisor node and will start running, all spouts and bolts will be >>>>> initialized, at this moment u will need ur application context, which >>>>> doesn't need ur earlier topology context >>>>> >>>>> So I will suggest keep both context separate. >>>>> >>>>> Topology is not complex to build, smaller topology can be built via >>>>> code only, I. E. Which bolt listening to which spout, but if u want to go >>>>> with good design, I say just write a small wrapper to read some json where >>>>> u can define ur bolts and spouts and use that to build topology (u can use >>>>> spring but it's not much needed) >>>>> >>>>> In past I have done it using both json setting (without spring) and >>>>> xml setting (with spring) both works good >>>>> >>>>> Ravi >>>>> On 11 Oct 2015 06:38, "Ankur Garg" <ankurga...@gmail.com> wrote: >>>>> >>>>>> Oh The problem here is I have many beans and which need to be >>>>>> initialized (some are reading conf from yml files , database connection , >>>>>> thread pool initialization etc) . >>>>>> >>>>>> >>>>>> Now , I have written a spring boot application which takes care of >>>>>> all the above and I define my topology inside one of the beans , Here is >>>>>> my >>>>>> bean >>>>>> >>>>>> @Autowired >>>>>> ApplicationContext appContext; >>>>>> >>>>>> @Bean >>>>>> public void submitTopology() throws >>>>>> AlreadyAliveException,InvalidTopologyException { >>>>>> >>>>>> TopologyBuilder builder = new TopologyBuilder(); >>>>>> >>>>>> builder.setSpout("rabbitMqSpout", new RabbitListnerSpout( >>>>>> appContext), 10); >>>>>> >>>>>> builder.setBolt("mapBolt", new GroupingBolt(appContext), >>>>>> 10).shuffleGrouping("rabbitMqSpout"); >>>>>> >>>>>> builder.setBolt("reduceBolt", new PublishingBolt(appContext), >>>>>> 10).shuffleGrouping("mapBolt"); >>>>>> >>>>>> Config conf = new Config(); >>>>>> >>>>>> conf.registerSerialization(EventBean.class); // To be registered >>>>>> with Kyro for Storm >>>>>> >>>>>> conf.registerSerialization(InputQueueManagerImpl.class); >>>>>> >>>>>> conf.setDebug(true); >>>>>> >>>>>> conf.setMessageTimeoutSecs(200); >>>>>> >>>>>> LocalCluster cluster = new LocalCluster(); >>>>>> >>>>>> cluster.submitTopology("test", conf, builder.createTopology()); >>>>>> >>>>>> } >>>>>> >>>>>> >>>>>> When this bean is initialized , I already have appContext initialized >>>>>> by my Spring Boot Application . So , the thing is , I am using SpringBoot >>>>>> to initialize and load my context with all beans . >>>>>> >>>>>> Now this is the context which I want to leverage in my spouts and >>>>>> bolts . >>>>>> >>>>>> So , if what I suggested earlier does not work on Storm Distributed >>>>>> Cluster , I need to find a way of initializing my AppContext somehow:( >>>>>> >>>>>> I would be really thankful if anyone here can help me :( >>>>>> >>>>>> >>>>>> Thanks >>>>>> >>>>>> Ankur >>>>>> >>>>>> On Sun, Oct 11, 2015 at 5:54 AM, Javier Gonzalez <jagon...@gmail.com> >>>>>> wrote: >>>>>> >>>>>>> The local cluster runs completely within a single JVM AFAIK. The >>>>>>> local cluster is useful for development, testing your topology, etc. The >>>>>>> real deployment has to go through nimbus, run on workers started by >>>>>>> supervisors on one or more nodes, etc. Kind of difficult to simulate all >>>>>>> that on a single box. >>>>>>> >>>>>>> On Sat, Oct 10, 2015 at 1:45 PM, Ankur Garg <ankurga...@gmail.com> >>>>>>> wrote: >>>>>>> >>>>>>>> Oh ...So I will have to test it in a cluster. >>>>>>>> >>>>>>>> Having said that, how is local cluster which we use is too >>>>>>>> different from normal cluster.. Ideally ,it shud simulate normal >>>>>>>> cluster.. >>>>>>>> On Oct 10, 2015 7:51 PM, "Ravi Sharma" <ping2r...@gmail.com> wrote: >>>>>>>> >>>>>>>>> Hi Ankur, >>>>>>>>> local it may be working but It wont work in Actual cluster. >>>>>>>>> >>>>>>>>> Think about SpringContext is collection of your so many resoucres, >>>>>>>>> like Database connections , may be HTTP connections , Thread pools >>>>>>>>> etc. >>>>>>>>> These things wont get serialised and just go to other machines and >>>>>>>>> start working. >>>>>>>>> >>>>>>>>> SO basically in init methods of bolt and spout, you need to call >>>>>>>>> some singloton class like this >>>>>>>>> >>>>>>>>> ApplicationContext ac = SingletonApplicationContext.getContext(); >>>>>>>>> >>>>>>>>> SingletonApplicationContext will have a static variable >>>>>>>>> ApplicationContext and in getContext you will check if static >>>>>>>>> variable has >>>>>>>>> been initialised if not then u will initilize it, and then return >>>>>>>>> it(normal >>>>>>>>> Singleton class) >>>>>>>>> >>>>>>>>> >>>>>>>>> Now when Topolgy will move to any other node, Bolt and spouts will >>>>>>>>> start and first init call will initialize it and other bolt/spouts >>>>>>>>> will >>>>>>>>> just use that. >>>>>>>>> >>>>>>>>> As John mentioned, its very important to mark all Spring beans and >>>>>>>>> Context as transient. >>>>>>>>> >>>>>>>>> Hope it helps. >>>>>>>>> >>>>>>>>> Ravi. >>>>>>>>> >>>>>>>>> >>>>>>>>> >>>>>>>>> >>>>>>>>> >>>>>>>>> On Sat, Oct 10, 2015 at 6:25 AM, Ankur Garg <ankurga...@gmail.com> >>>>>>>>> wrote: >>>>>>>>> >>>>>>>>>> Hi Javier , >>>>>>>>>> >>>>>>>>>> So , I am using a Local cluster on my dev machine where I am >>>>>>>>>> using Eclipse . Here , I am passing Springs ApplicationContext as >>>>>>>>>> constructor argument to spouts and bolts . >>>>>>>>>> >>>>>>>>>> TopologyBuilder builder = new TopologyBuilder(); >>>>>>>>>> >>>>>>>>>> builder.setSpout("rabbitMqSpout", new RabbitListnerSpout( >>>>>>>>>> appContext), 10); >>>>>>>>>> >>>>>>>>>> builder.setBolt("mapBolt", new GroupingBolt(appContext), >>>>>>>>>> 10).shuffleGrouping("rabbitMqSpout"); >>>>>>>>>> >>>>>>>>>> builder.setBolt("reduceBolt", new PublishingBolt(appContext), >>>>>>>>>> 10).shuffleGrouping("mapBolt"); >>>>>>>>>> >>>>>>>>>> Config conf = new Config(); >>>>>>>>>> >>>>>>>>>> conf.registerSerialization(EventBean.class); / >>>>>>>>>> >>>>>>>>>> conf.registerSerialization(InputQueueManagerImpl.class); >>>>>>>>>> >>>>>>>>>> conf.setDebug(true); >>>>>>>>>> >>>>>>>>>> LocalCluster cluster = new LocalCluster(); >>>>>>>>>> >>>>>>>>>> cluster.submitTopology("test", conf, builder.createTopology()); >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> And in my spouts and Bolts , >>>>>>>>>> >>>>>>>>>> I make my Application Context variable as static . So when it is >>>>>>>>>> launched by c;uster.submitTopology , my context is still avalilable >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> private static ApplicationContext ctx; >>>>>>>>>> >>>>>>>>>> public RabbitListnerSpout(ApplicationContext appContext) { >>>>>>>>>> >>>>>>>>>> LOG.info("RabbitListner Constructor called"); >>>>>>>>>> >>>>>>>>>> ctx = appContext; >>>>>>>>>> >>>>>>>>>> } >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> @SuppressWarnings("rawtypes") >>>>>>>>>> >>>>>>>>>> @Override >>>>>>>>>> >>>>>>>>>> public void open(Map conf, TopologyContext >>>>>>>>>> context,SpoutOutputCollector >>>>>>>>>> collector) { >>>>>>>>>> >>>>>>>>>> LOG.info("Inside the open Method for RabbitListner Spout"); >>>>>>>>>> >>>>>>>>>> inputManager = (InputQueueManagerImpl) ctx >>>>>>>>>> .getBean(InputQueueManagerImpl.class); >>>>>>>>>> >>>>>>>>>> notificationManager = (NotificationQueueManagerImpl) ctx >>>>>>>>>> .getBean(NotificationQueueManagerImpl.class); >>>>>>>>>> >>>>>>>>>> eventExchange = ctx.getEnvironment().getProperty( >>>>>>>>>> "input.rabbitmq.events.exchange"); >>>>>>>>>> >>>>>>>>>> routingKey = ctx.getEnvironment().getProperty( >>>>>>>>>> "input.rabbitmq.events.routingKey"); >>>>>>>>>> >>>>>>>>>> eventQueue = ctx.getEnvironment().getProperty( >>>>>>>>>> "input.rabbitmq.events.queue"); >>>>>>>>>> >>>>>>>>>> _collector = collector; >>>>>>>>>> >>>>>>>>>> LOG.info("Exiting the open Method for RabbitListner Spout"); >>>>>>>>>> >>>>>>>>>> } >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> This is working like a charm (my ApplicationContext is >>>>>>>>>> initialized seperately ) . As we all know , ApplicationContext is not >>>>>>>>>> serializable . But this works well in LocalCluster. >>>>>>>>>> >>>>>>>>>> My assumption is that it will work in a seperate Cluster too . Is >>>>>>>>>> my assumption correct ?? >>>>>>>>>> >>>>>>>>>> On Fri, Oct 9, 2015 at 9:04 PM, Javier Gonzalez < >>>>>>>>>> jagon...@gmail.com> wrote: >>>>>>>>>> >>>>>>>>>>> IIRC, only if everything you use in your spouts and bolts is >>>>>>>>>>> serializable. >>>>>>>>>>> On Oct 6, 2015 11:29 PM, "Ankur Garg" <ankurga...@gmail.com> >>>>>>>>>>> wrote: >>>>>>>>>>> >>>>>>>>>>>> Hi Ravi , >>>>>>>>>>>> >>>>>>>>>>>> I was able to make an Integration with Spring but the problem >>>>>>>>>>>> is that I have to autowire for every bolt and spout . That means >>>>>>>>>>>> that even >>>>>>>>>>>> if i parallelize spout and bolt it will get started to each >>>>>>>>>>>> instance . Is >>>>>>>>>>>> there some way that I only have to do for bolts and spouts once (I >>>>>>>>>>>> mean if >>>>>>>>>>>> I parallelize bolts or spouts individually it can share the conf >>>>>>>>>>>> from >>>>>>>>>>>> somewhere) . IS this possible?? >>>>>>>>>>>> >>>>>>>>>>>> Thanks >>>>>>>>>>>> Ankur >>>>>>>>>>>> >>>>>>>>>>>> On Tue, Sep 29, 2015 at 7:57 PM, Ravi Sharma < >>>>>>>>>>>> ping2r...@gmail.com> wrote: >>>>>>>>>>>> >>>>>>>>>>>>> Yes this is for annotation also... >>>>>>>>>>>>> >>>>>>>>>>>>> you can call this method in prepare() method of bolt and >>>>>>>>>>>>> onOpen() method >>>>>>>>>>>>> in every Spout and make sure you don't use any autowire bean >>>>>>>>>>>>> before this >>>>>>>>>>>>> call. >>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>>> Ravi. >>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>>> On Tue, Sep 29, 2015 at 2:22 PM, Ankur Garg < >>>>>>>>>>>>> ankurga...@gmail.com> wrote: >>>>>>>>>>>>> >>>>>>>>>>>>> > Hi Ravi , >>>>>>>>>>>>> > >>>>>>>>>>>>> > Thanks for your reply . I am using annotation based >>>>>>>>>>>>> configuration and using >>>>>>>>>>>>> > Spring Boot. >>>>>>>>>>>>> > >>>>>>>>>>>>> > Any idea how to do it using annotations ? >>>>>>>>>>>>> > >>>>>>>>>>>>> > >>>>>>>>>>>>> > >>>>>>>>>>>>> > On Tue, Sep 29, 2015 at 6:41 PM, Ravi Sharma < >>>>>>>>>>>>> ping2r...@gmail.com> wrote: >>>>>>>>>>>>> > >>>>>>>>>>>>> > > Bolts and Spouts are created by Storm and not known to >>>>>>>>>>>>> Spring Context. >>>>>>>>>>>>> > You >>>>>>>>>>>>> > > need to manually add them to SpringContext, there are few >>>>>>>>>>>>> methods >>>>>>>>>>>>> > available >>>>>>>>>>>>> > > i.e. >>>>>>>>>>>>> > > >>>>>>>>>>>>> > > >>>>>>>>>>>>> > > >>>>>>>>>>>>> > >>>>>>>>>>>>> SpringContext.getContext().getAutowireCapableBeanFactory().autowireBeanProperties(this, >>>>>>>>>>>>> > > AutowireCapableBeanFactory.AUTOWIRE_AUTODETECT, false); >>>>>>>>>>>>> > > >>>>>>>>>>>>> > > SpringContext is my own class where i have injected >>>>>>>>>>>>> SpringContext so >>>>>>>>>>>>> > > SpringContext.getContext() returns the actuall Spring >>>>>>>>>>>>> Context >>>>>>>>>>>>> > > >>>>>>>>>>>>> > > >>>>>>>>>>>>> > > >>>>>>>>>>>>> > > >>>>>>>>>>>>> > > Ravi. >>>>>>>>>>>>> > > >>>>>>>>>>>>> > > >>>>>>>>>>>>> > > On Tue, Sep 29, 2015 at 1:03 PM, Ankur Garg < >>>>>>>>>>>>> ankurga...@gmail.com> >>>>>>>>>>>>> > wrote: >>>>>>>>>>>>> > > >>>>>>>>>>>>> > > > Hi , >>>>>>>>>>>>> > > > >>>>>>>>>>>>> > > > I am building a Storm topology with set of Spouts and >>>>>>>>>>>>> Bolts and also >>>>>>>>>>>>> > > using >>>>>>>>>>>>> > > > Spring for Dependency Injection . >>>>>>>>>>>>> > > > >>>>>>>>>>>>> > > > Unfortunately , none of my fields are getting autowired >>>>>>>>>>>>> even though I >>>>>>>>>>>>> > > have >>>>>>>>>>>>> > > > declared all my spouts and Bolts as @Components . >>>>>>>>>>>>> > > > >>>>>>>>>>>>> > > > However the place where I am declaring my topology , >>>>>>>>>>>>> Spring is working >>>>>>>>>>>>> > > fine >>>>>>>>>>>>> > > > . >>>>>>>>>>>>> > > > >>>>>>>>>>>>> > > > Is it because cluster.submitTopology("test", conf, >>>>>>>>>>>>> > > > builder.createTopology()) >>>>>>>>>>>>> > > > submits the topology to a cluster (locally it spawns >>>>>>>>>>>>> different thread >>>>>>>>>>>>> > > for >>>>>>>>>>>>> > > > Spouts and Bolts) that Autowiring is not working? >>>>>>>>>>>>> > > > >>>>>>>>>>>>> > > > Please suggest . >>>>>>>>>>>>> > > > >>>>>>>>>>>>> > > >>>>>>>>>>>>> > >>>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>> >>>>>>>>> >>>>>>> >>>>>>> >>>>>>> -- >>>>>>> Javier González Nicolini >>>>>>> >>>>>> >>>>>> >>>> >>> >