Why do you even want to stop it? You can join it with an RDD that loads the newest hashtags from disk at a regular interval.
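A minimal sketch of that idea, assuming the followed hashtags are kept in a text file with one tag per line, and that the stream was created with filters broad enough to include tweets for tags added later. The names `HashtagJoin`, `hashtagsOf`, `followed` and `hashtagsPath` are made up for illustration. The function passed to `transform` is called for every batch, so the file is re-read at each batch interval and new tags are picked up without stopping the context:

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.DStream
import twitter4j.Status

object HashtagJoin {
  // Hypothetical helper: lowercase hashtags (without '#') found in a tweet's text.
  def hashtagsOf(text: String): List[String] =
    "#(\\w+)".r.findAllMatchIn(text).map(_.group(1).toLowerCase).toList

  // Keeps only the tweets that carry a hashtag currently listed in the file
  // at hashtagsPath (assumed format: one hashtag per line, without '#').
  def followed(stream: DStream[Status], hashtagsPath: String): DStream[(String, Status)] = {
    // Key every tweet by each hashtag it contains.
    val byTag: DStream[(String, Status)] =
      stream.flatMap(status => hashtagsOf(status.getText).map(tag => (tag, status)))

    // The body below runs once per batch, so the tag list is reloaded each time.
    byTag.transform { rdd: RDD[(String, Status)] =>
      val tags: RDD[(String, Unit)] = rdd.sparkContext
        .textFile(hashtagsPath)
        .map(_.trim.toLowerCase)
        .filter(_.nonEmpty)
        .distinct()
        .map(tag => (tag, ()))
      // Inner join drops tweets whose hashtag is not in the file.
      // On Spark versions before 1.3, also import org.apache.spark.SparkContext._
      rdd.join(tags).mapValues(_._1)
    }
  }
}
```

With your `stream` from `TwitterUtils.createStream`, something like `HashtagJoin.followed(stream, "/path/to/hashtags.txt")` (the path is a placeholder) would have to be set up before `ssc.start()`. After that, updating the file is enough.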

On Sun, 19 Jul 2015 at 7:40, Zoran Jeremic <zoran.jere...@gmail.com> wrote:

> Hi,
>
> I have a twitter spark stream initialized in the following way:
>
>> val ssc:StreamingContext =
>>   SparkLauncher.getSparkScalaStreamingContext()
>> val config = getTwitterConfigurationBuilder.build()
>> val auth: Option[twitter4j.auth.Authorization] =
>>   Some(new twitter4j.auth.OAuthAuthorization(config))
>> val stream = TwitterUtils.createStream(ssc, auth, filters)
>
> This works fine when I initially start it. However, at some point I need to
> update filters since users might add new hashtags they want to follow. I
> tried to stop the running stream and spark streaming context without
> stopping spark context, e.g.:
>
>> stream.stop()
>> ssc.stop(false)
>
> Afterward, I'm trying to initialize a new Twitter stream like I did
> previously. However, I got this exception:
>
>> Exception in thread "Firestorm JMX Monitor"
>> java.lang.IllegalStateException: Adding new inputs, transformations, and
>> output operations after stopping a context is not supported
>>   at org.apache.spark.streaming.dstream.DStream.validateAtInit(DStream.scala:224)
>>   at org.apache.spark.streaming.dstream.DStream.<init>(DStream.scala:64)
>>   at org.apache.spark.streaming.dstream.InputDStream.<init>(InputDStream.scala:41)
>>   at org.apache.spark.streaming.dstream.ReceiverInputDStream.<init>(ReceiverInputDStream.scala:41)
>>   at org.apache.spark.streaming.twitter.TwitterInputDStream.<init>(TwitterInputDStream.scala:46)
>>   at org.apache.spark.streaming.twitter.TwitterUtils$.createStream(TwitterUtils.scala:44)
>>   at org.prosolo.bigdata.scala.twitter.TwitterHashtagsStreamsManager$.initializeNewStream(TwitterHashtagsStreamsManager.scala:113)
>>   at org.prosolo.bigdata.scala.twitter.TwitterHashtagsStreamsManager$.restartHashTagsStream(TwitterHashtagsStreamsManager.scala:174)
>>   at org.prosolo.bigdata.scala.twitter.TwitterHashtagsStreamsManager$.addHashTagsFromBufferAndRestartStream(TwitterHashtagsStreamsManager.scala:162)
>>   at org.prosolo.bigdata.scala.twitter.HashtagsUpdatesBuffer$.processBufferEvents(HashtagsUpdatesBuffer.scala:41)
>>   at org.prosolo.bigdata.scala.twitter.HashtagsUpdatesBuffer$$anon$1.run(HashtagsUpdatesBuffer.scala:19)
>>   at java.util.TimerThread.mainLoop(Timer.java:555)
>>   at java.util.TimerThread.run(Timer.java:505)
>> INFO [2015-07-18 22:24:23,430] [Twitter Stream consumer-1[Disposing
>> thread]] twitter4j.TwitterStreamImpl (SLF4JLogger.java:83) Inflater
>> has been closed
>> ERROR [2015-07-18 22:24:32,503]
>> [sparkDriver-akka.actor.default-dispatcher-3]
>> streaming.receiver.ReceiverSupervisorImpl (Logging.scala:75) Error
>> stopping receiver
>> 0org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:116)
>
> Can anybody explain how to solve this issue?
>
> Thanks,
> Zoran