Re: Using Spark in mixed Java/Scala project
Hi Jakob,

Thanks a lot for your help. I'll try this.

Zoran

On Wed, Jan 27, 2016 at 10:49 AM, Jakob Odersky wrote:

JavaSparkContext has a wrapper constructor for the "scala" SparkContext. In this case all you need to do is declare a SparkContext that is accessible from both the Java and Scala sides of your project and wrap the context with a JavaSparkContext.

Search for "java source compatibility with scala" for more information on how to interface Java with Scala (the other way around is trivial). Essentially, as long as you declare your SparkContext either in Java or as a val/var/def in a plain Scala class, you are good.
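A minimal sketch of the pattern Jakob describes, assuming a shared context object (the name AppContext and the SparkConf settings are illustrative, not from the thread): the SparkContext lives in a plain Scala object, and the Java side obtains a JavaSparkContext through the wrapper constructor.

  import org.apache.spark.{SparkConf, SparkContext}
  import org.apache.spark.api.java.JavaSparkContext

  // A plain Scala object holding the single shared context.
  object AppContext {
    val sc: SparkContext = new SparkContext(
      new SparkConf().setAppName("mixed-java-scala").setMaster("local[*]"))

    // Callable from Scala code directly.
    def getSC: SparkContext = sc

    // new JavaSparkContext(sc) is the wrapper constructor Jakob mentions.
    def getJavaSC: JavaSparkContext = new JavaSparkContext(sc)
  }

From Java this reads as AppContext.getJavaSC(), since Scala generates static forwarders for object methods; both sides then operate on the same underlying context.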
Re: Problem to persist Hibernate entity from Spark job
Hi guys,

I'm still trying to solve the issue with saving Hibernate entities from Spark. After several attempts to redesign my own code I ended up with a HelloWorld example which clearly demonstrates that the problem is not the complexity of my code or session sharing across threads. The code given below creates one simple Hibernate entity and tries to save it. If the number of Spark partitions is greater than one, the line println("Saved tag:" + newTag) is never executed. If I have only one partition, everything works fine. I would appreciate it very much if somebody could explain what the problem in this code is.

  val sc = SparkContextLoader.getSC
  val scalaUsersIds = Seq[Long](2, 8, 15, 14, 6, 17, 21, 34, 75, 128, 304)
  val usersRDD: RDD[Long] = sc.parallelize(scalaUsersIds)
  usersRDD.foreachPartition {
    val session: Session = HibernateUtil.getSessionFactory().openSession()
    users => {
      users.foreach { userid =>
        val newTag: Tag = new Tag
        newTag.setTitle("title" + userid)
        try {
          val isActive: Boolean = session.getTransaction().isActive()
          if (!isActive) {
            session.beginTransaction()
          }
          println("Saving tag:" + newTag)
          session.save(newTag)
          println("Saved tag:" + newTag)
          session.getTransaction().commit()
        } catch {
          case ex: Exception =>
            if (session.getTransaction() != null) {
              session.getTransaction().rollback()
              ex.printStackTrace()
            }
        }
      }
      session.close()
    }
  }

Thanks,
Zoran

On Sun, Sep 6, 2015 at 1:42 PM, Zoran Jeremic <zoran.jere...@gmail.com> wrote:

I have a GenericDAO class which is initialized for each partition. This class uses SessionFactory.openSession() to open a new session in its constructor. As per my understanding, this means that each partition has a different session, but they use the same SessionFactory to open it.

"why not create the session at the start of the saveInBatch method and close it at the end"

This won't work for me, or at least I think it won't. At the beginning of the process I load some entities (e.g. User, UserPreference...) from Hibernate and then use them across the process, even after I perform saveInBatch. They need to stay in session so that I can pull the data I need and update it later, so I can't open another session inside the existing one.

On Sun, Sep 6, 2015 at 1:40 AM, Matthew Johnson <matt.john...@algomi.com> wrote:

I agree with Igor - I would either make sure the session is ThreadLocal or, more simply, why not create the session at the start of the saveInBatch method and close it at the end? Creating a SessionFactory is an expensive operation, but creating a Session is a relatively cheap one.

On 6 Sep 2015 07:27, "Igor Berman" <igor.ber...@gmail.com> wrote:

How do you create your session? Do you reuse it across threads? How do you create/close the session manager? Look for the problem in session creation; probably something deadlocked. As far as I remember, a Hibernate session should be created per thread.

On 6 September 2015 at 07:11, Zoran Jeremic <zoran.jere...@gmail.com> wrote:

Hi,

I'm developing a long-running process that should find the RSS feeds all users in the system have registered to follow, parse those feeds, extract new entries and store them back to the database as Hibernate entities, so users can retrieve them. I want to use Apache Spark to enable parallel processing, since this process might take several hours depending on the number of users.
The approach I thought should work was to use useridsRDD.foreachPartition, so I can have a separate Hibernate session for each partition. I created a database session manager that is initialized for each partition and keeps the Hibernate session alive until the process is over.

Once all RSS feeds from one source are parsed and Feed entities are created, I send the whole list to the database manager method that saves the whole list in batch:
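For reference, a minimal sketch of the fix Matthew Johnson suggests in the quoted exchange above: open and close the session inside the partition function, so it is created on the executor that processes the partition and is never captured by the closure on the driver. usersRDD, HibernateUtil and Tag are the names from the HelloWorld example above; the try/finally and the trimmed transaction handling are my additions.

  import org.hibernate.Session

  usersRDD.foreachPartition { users =>
    // Opened inside the partition function, i.e. on the executor, per partition.
    val session: Session = HibernateUtil.getSessionFactory().openSession()
    try {
      users.foreach { userid =>
        val newTag: Tag = new Tag
        newTag.setTitle("title" + userid)
        session.beginTransaction()
        session.save(newTag)
        session.getTransaction().commit()
      }
    } finally {
      session.close() // always release the connection, even after a failure
    }
  }

Note the contrast with the HelloWorld code above, where openSession() runs before the users => lambda: there the session is created once on the driver and captured by the closure, which is exactly the sharing the replies warn against.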
Re: Problem to persist Hibernate entity from Spark job
I have a GenericDAO class which is initialized for each partition. This class uses SessionFactory.openSession() to open a new session in its constructor. As per my understanding, this means that each partition has a different session, but they use the same SessionFactory to open it.

"why not create the session at the start of the saveInBatch method and close it at the end"

This won't work for me, or at least I think it won't. At the beginning of the process I load some entities (e.g. User, UserPreference...) from Hibernate and then use them across the process, even after I perform saveInBatch. They need to stay in session so that I can pull the data I need and update it later, so I can't open another session inside the existing one.

On Sun, Sep 6, 2015 at 1:40 AM, Matthew Johnson <matt.john...@algomi.com> wrote:

I agree with Igor - I would either make sure the session is ThreadLocal or, more simply, why not create the session at the start of the saveInBatch method and close it at the end? Creating a SessionFactory is an expensive operation, but creating a Session is a relatively cheap one.

On 6 Sep 2015 07:27, "Igor Berman" <igor.ber...@gmail.com> wrote:

How do you create your session? Do you reuse it across threads? How do you create/close the session manager? Look for the problem in session creation; probably something deadlocked. As far as I remember, a Hibernate session should be created per thread.

On 6 September 2015 at 07:11, Zoran Jeremic <zoran.jere...@gmail.com> wrote:

Hi,

I'm developing a long-running process that should find the RSS feeds all users in the system have registered to follow, parse those feeds, extract new entries and store them back to the database as Hibernate entities, so users can retrieve them. I want to use Apache Spark to enable parallel processing, since this process might take several hours depending on the number of users.

The approach I thought should work was to use useridsRDD.foreachPartition, so I can have a separate Hibernate session for each partition. I created a database session manager that is initialized for each partition and keeps the Hibernate session alive until the process is over.

Once all RSS feeds from one source are parsed and Feed entities are created, I send the whole list to the database manager method that saves the whole list in batch:

  public void saveInBatch(List entities) {
    try {
      boolean isActive = session.getTransaction().isActive();
      if (!isActive) {
        session.beginTransaction();
      }
      for (Object entity : entities) {
        session.save(entity);
      }
      session.getTransaction().commit();
    } catch (Exception ex) {
      if (session.getTransaction() != null) {
        session.getTransaction().rollback();
        ex.printStackTrace();
      }
    }
  }

However, this works only if I have one Spark partition. If there are two or more partitions, the whole process is blocked once I try to save the first entity. In order to make things simpler, I tried to simplify the Feed entity so that it doesn't refer to and is not referred from any other entity. It also doesn't have any collection.
I hope that some of you have already tried something similar and could give me an idea how to solve this problem.

Thanks,
Zoran

--
Zoran Jeremic, PhD
Senior System Analyst & Programmer
Athabasca University
Tel: +1 604 92 89 944
E-mail: zoran.jere...@gmail.com, zoran.jere...@va.mod.gov.rs
Homepage: http://zoranjeremic.org
Re: Twitter streaming with apache spark stream only a small amount of tweets
Can you send me the subject of that email? I can't find any email suggesting a solution to that problem. There is an email, "Twitter4j streaming question", but it doesn't have any sample code. It just confirms what I explained earlier: without filtering, Twitter will limit you to 1% of tweets, and if you use the filter API, Twitter limits you to 400 hashtags you can follow.

Thanks,
Zoran

On Wed, Jul 29, 2015 at 8:40 AM, Peyman Mohajerian <mohaj...@gmail.com> wrote:

This question was answered with sample code a couple of days ago, please look back.

On Sat, Jul 25, 2015 at 11:43 PM, Zoran Jeremic <zoran.jere...@gmail.com> wrote:

Hi,

I discovered what the problem is here. The Twitter public stream is limited to 1% of overall tweets (https://goo.gl/kDwnyS), so that's why I can't access all the tweets posted with a specific hashtag using the approach I posted in my previous email; I guess this approach would not work for me. The other problem is that filtering has a limit of 400 hashtags (https://goo.gl/BywrAk), so in order to follow more than 400 hashtags I need more parallel streams. This brings me back to my previous question (https://goo.gl/bVDkHx).

In my application I need to follow more than 400 hashtags, and I need to collect each tweet having one of these hashtags. Another complication is that users can add new hashtags or remove old ones, so I have to update the stream in real time. My earlier approach, without Apache Spark, was to create a twitter4j user stream with an initial filter, and each time a new hashtag had to be added, stop the stream, add the hashtag and run it again. When a stream reached 400 hashtags, I initialized a new stream with new credentials. This was really complex, and I was hoping that Apache Spark would make it simpler. However, I have been trying for days to find a solution, with no success. If I have to use the same approach I used with twitter4j, I have to solve two problems:

- how to run multiple Twitter streams in the same Spark context
- how to add new hashtags to an existing filter

I hope that somebody has a more elegant solution or idea and will tell me that I missed something obvious.

Thanks,
Zoran

On Sat, Jul 25, 2015 at 8:44 PM, Zoran Jeremic <zoran.jere...@gmail.com> wrote:

Hi,

I've implemented Twitter streaming as in the code given at the bottom of this email. It finds some tweets based on the hashtags I'm following. However, it seems that a large number of tweets are missing. I've tried to post some tweets containing hashtags I'm following in the application, and none of them was received. I also checked some hashtags (e.g. #android) on Twitter using Live, and I could see that almost every second something was posted with that hashtag, while my application received only 3-4 posts in one minute. I didn't have this problem in the earlier, non-Spark version of the application, which used twitter4j to access the user stream API. I guess this is some trending stream, but I couldn't find anything that explains which Twitter API is used in Spark Twitter streaming and how to create a stream that will access everything posted on Twitter. I hope somebody can explain what the problem is and how to solve it.
Thanks,
Zoran

  def initializeStreaming() {
    val config = getTwitterConfigurationBuilder.build()
    val auth: Option[twitter4j.auth.Authorization] = Some(new twitter4j.auth.OAuthAuthorization(config))
    val stream: DStream[Status] = TwitterUtils.createStream(ssc, auth)
    val filtered_statuses = stream.transform(rdd => {
      val filtered = rdd.filter(status => {
        var found = false
        for (tag <- hashTagsList) {
          if (status.getText.toLowerCase.contains(tag)) {
            found = true
          }
        }
        found
      })
      filtered
    })
    filtered_statuses.foreachRDD(rdd => {
      rdd.collect.foreach(t => {
        println(t)
      })
    })
    ssc.start()
  }

--
Zoran Jeremic, PhD
Senior System Analyst & Programmer
Athabasca University
Tel: +1 604 92 89 944
E-mail: zoran.jere...@gmail.com, zoran.jere...@va.mod.gov.rs
Homepage: http://zoranjeremic.org
Re: Twitter streaming with apache spark stream only a small amount of tweets
Actually, I posted that question :) I already implemented the solution Akhil suggested there, and that solution uses the sample tweets API, which returns only 1% of tweets. It would not work in my scenario. For the hashtags I'm interested in, I need to catch every single tweet, not only some of them. So for me only the Twitter filtering API would work, but as I already wrote, there is another problem: Twitter limits you to a maximum of 400 hashtags in the filter. That means I need several parallel Twitter streams in order to follow more hashtags. That was the problem I could not solve with Spark Twitter streaming: I could not start parallel streams. The other problem is that I need to add and remove hashtags from the running streams, that is, I need to clean up the stream and initialize the filter again. I managed to implement this with twitter4j directly, but not with spark-twitter streaming.

Zoran

On Wed, Jul 29, 2015 at 10:40 AM, Peyman Mohajerian <mohaj...@gmail.com> wrote:

"How to restart Twitter spark stream". It may not be exactly what you are looking for, but I thought it touched on some aspects of your question.

On Wed, Jul 29, 2015 at 10:26 AM, Zoran Jeremic <zoran.jere...@gmail.com> wrote:

Can you send me the subject of that email? I can't find any email suggesting a solution to that problem. There is an email, "Twitter4j streaming question", but it doesn't have any sample code. It just confirms what I explained earlier: without filtering, Twitter will limit you to 1% of tweets, and if you use the filter API, Twitter limits you to 400 hashtags you can follow.

Thanks,
Zoran

On Wed, Jul 29, 2015 at 8:40 AM, Peyman Mohajerian <mohaj...@gmail.com> wrote:

This question was answered with sample code a couple of days ago, please look back.

On Sat, Jul 25, 2015 at 11:43 PM, Zoran Jeremic <zoran.jere...@gmail.com> wrote:

Hi,

I discovered what the problem is here. The Twitter public stream is limited to 1% of overall tweets (https://goo.gl/kDwnyS), so that's why I can't access all the tweets posted with a specific hashtag using the approach I posted in my previous email; I guess this approach would not work for me. The other problem is that filtering has a limit of 400 hashtags (https://goo.gl/BywrAk), so in order to follow more than 400 hashtags I need more parallel streams. This brings me back to my previous question (https://goo.gl/bVDkHx).

In my application I need to follow more than 400 hashtags, and I need to collect each tweet having one of these hashtags. Another complication is that users can add new hashtags or remove old ones, so I have to update the stream in real time. My earlier approach, without Apache Spark, was to create a twitter4j user stream with an initial filter, and each time a new hashtag had to be added, stop the stream, add the hashtag and run it again. When a stream reached 400 hashtags, I initialized a new stream with new credentials. This was really complex, and I was hoping that Apache Spark would make it simpler. However, I have been trying for days to find a solution, with no success. If I have to use the same approach I used with twitter4j, I have to solve two problems:

- how to run multiple Twitter streams in the same Spark context
- how to add new hashtags to the existing filter

I hope that somebody has a more elegant solution or idea and will tell me that I missed something obvious.

Thanks,
Zoran

On Sat, Jul 25, 2015 at 8:44 PM, Zoran Jeremic <zoran.jere...@gmail.com> wrote:

Hi,

I've implemented Twitter streaming as in the code given at the bottom of this email.
It finds some tweets based on the hashtags I'm following. However, it seems that a large number of tweets are missing. I've tried to post some tweets containing hashtags I'm following in the application, and none of them was received. I also checked some hashtags (e.g. #android) on Twitter using Live, and I could see that almost every second something was posted with that hashtag, while my application received only 3-4 posts in one minute. I didn't have this problem in the earlier, non-Spark version of the application, which used twitter4j to access the user stream API. I guess this is some trending stream, but I couldn't find anything that explains which Twitter API is used in Spark Twitter streaming and how to create a stream that will access everything posted on Twitter. I hope somebody can explain what the problem is and how to solve it.

Thanks,
Zoran

  def initializeStreaming() {
    val config = getTwitterConfigurationBuilder.build()
    val auth: Option[twitter4j.auth.Authorization] = Some(new twitter4j.auth.OAuthAuthorization(config))
    val stream: DStream[Status] = TwitterUtils.createStream(ssc, auth)
    val filtered_statuses = stream.transform(rdd => {
      val filtered = rdd.filter(status => {
        var found = false
        for (tag <- hashTagsList) {
          if (status.getText.toLowerCase.contains(tag)) {
            found = true
Re: Twitter streaming with apache spark stream only a small amount of tweets
Hi,

I discovered what the problem is here. The Twitter public stream is limited to 1% of overall tweets (https://goo.gl/kDwnyS), so that's why I can't access all the tweets posted with a specific hashtag using the approach I posted in my previous email; I guess this approach would not work for me. The other problem is that filtering has a limit of 400 hashtags (https://goo.gl/BywrAk), so in order to follow more than 400 hashtags I need more parallel streams. This brings me back to my previous question (https://goo.gl/bVDkHx).

In my application I need to follow more than 400 hashtags, and I need to collect each tweet having one of these hashtags. Another complication is that users can add new hashtags or remove old ones, so I have to update the stream in real time. My earlier approach, without Apache Spark, was to create a twitter4j user stream with an initial filter, and each time a new hashtag had to be added, stop the stream, add the hashtag and run it again. When a stream reached 400 hashtags, I initialized a new stream with new credentials. This was really complex, and I was hoping that Apache Spark would make it simpler. However, I have been trying for days to find a solution, with no success. If I have to use the same approach I used with twitter4j, I have to solve two problems:

- how to run multiple Twitter streams in the same Spark context
- how to add new hashtags to the existing filter

I hope that somebody has a more elegant solution or idea and will tell me that I missed something obvious.

Thanks,
Zoran

On Sat, Jul 25, 2015 at 8:44 PM, Zoran Jeremic <zoran.jere...@gmail.com> wrote:

Hi,

I've implemented Twitter streaming as in the code given at the bottom of this email. It finds some tweets based on the hashtags I'm following. However, it seems that a large number of tweets are missing. I've tried to post some tweets containing hashtags I'm following in the application, and none of them was received. I also checked some hashtags (e.g. #android) on Twitter using Live, and I could see that almost every second something was posted with that hashtag, while my application received only 3-4 posts in one minute. I didn't have this problem in the earlier, non-Spark version of the application, which used twitter4j to access the user stream API. I guess this is some trending stream, but I couldn't find anything that explains which Twitter API is used in Spark Twitter streaming and how to create a stream that will access everything posted on Twitter. I hope somebody can explain what the problem is and how to solve it.

Thanks,
Zoran

  def initializeStreaming() {
    val config = getTwitterConfigurationBuilder.build()
    val auth: Option[twitter4j.auth.Authorization] = Some(new twitter4j.auth.OAuthAuthorization(config))
    val stream: DStream[Status] = TwitterUtils.createStream(ssc, auth)
    val filtered_statuses = stream.transform(rdd => {
      val filtered = rdd.filter(status => {
        var found = false
        for (tag <- hashTagsList) {
          if (status.getText.toLowerCase.contains(tag)) {
            found = true
          }
        }
        found
      })
      filtered
    })
    filtered_statuses.foreachRDD(rdd => {
      rdd.collect.foreach(t => {
        println(t)
      })
    })
    ssc.start()
  }
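On the first open question (multiple Twitter streams in one Spark context), the streaming API itself does allow several receivers whose DStreams are combined with union. A sketch under the assumption that the hashtags are split into batches of at most 400, each batch with its own credentials; openStreams and its parameters are illustrative names, not from this thread:

  import org.apache.spark.streaming.StreamingContext
  import org.apache.spark.streaming.dstream.DStream
  import org.apache.spark.streaming.twitter.TwitterUtils
  import twitter4j.Status
  import twitter4j.auth.Authorization

  // One credential set per batch of at most 400 hashtags (Twitter's filter limit).
  def openStreams(ssc: StreamingContext,
                  creds: Seq[Authorization],
                  hashtagBatches: Seq[Seq[String]]): DStream[Status] = {
    val streams = creds.zip(hashtagBatches).map { case (auth, tags) =>
      // Each call starts its own receiver with a server-side filter.
      TwitterUtils.createStream(ssc, Some(auth), tags)
    }
    // All receivers feed one logical stream.
    ssc.union(streams)
  }

Note that each receiver occupies one executor core, so running many parallel streams needs a correspondingly sized cluster; the second question (changing the filter at runtime) is addressed further down in the digest.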
Re: Spark on Tomcat has exception IncompatibleClassChangeError: Implementing class
Yes. You're right. I didn't get it till now. Thanks.

On Sun, Jul 26, 2015 at 7:36 AM, Ted Yu <yuzhih...@gmail.com> wrote:

bq. [INFO] \- org.apache.spark:spark-core_2.10:jar:1.4.0:compile

I think the above notation means spark-core_2.10 is the last dependency.

Cheers

On Thu, Jul 23, 2015 at 9:22 PM, Zoran Jeremic <zoran.jere...@gmail.com> wrote:

Hi Yana,

Sorry for the late response. I just saw your email. In the end I ended up with the following pom: https://www.dropbox.com/s/19fldb9qnnfieck/pom.xml?dl=0

There were multiple problems I had to struggle with. One of them was that my application's REST layer was implemented with JBoss Jersey, which conflicted with the Sun Jersey embedded in the Spark libraries, so after several days of trying to fix that, I changed my REST implementation and switched to Sun Jersey.

"Although I'm not sure why spark-core shows under gson:
[INFO] +- com.google.code.gson:gson:jar:2.2.2:compile
[INFO] \- org.apache.spark:spark-core_2.10:jar:1.4.0:compile"

It's not actually under gson. Both gson and spark-core are root libraries. spark-core has child libraries, which is why it has \- in front of it.

Zoran

On Mon, Jul 13, 2015 at 1:17 PM, Yana Kadiyska <yana.kadiy...@gmail.com> wrote:

Oh, this is very interesting -- can you explain your dependencies? I'm running Tomcat 7 and ended up using spark-assembly from WEB-INF/lib and removing the javax/servlet package out of it... but it's a pain in the neck. If I'm reading your first message correctly, you use hadoop-common and spark-core? Although I'm not sure why spark-core shows under gson:

[INFO] +- com.google.code.gson:gson:jar:2.2.2:compile
[INFO] \- org.apache.spark:spark-core_2.10:jar:1.4.0:compile

Do you fat-jar spark-core? Do you have spark-assembly in Tomcat's runtime classpath anywhere? Curious what a minimal setup here is. Thanks a lot (I'd love to see your .pom if you have it on github or somewhere accessible).

On Fri, Jul 10, 2015 at 2:24 PM, Zoran Jeremic <zoran.jere...@gmail.com> wrote:

It looks like there is no problem with Tomcat 8.

On Fri, Jul 10, 2015 at 11:12 AM, Zoran Jeremic <zoran.jere...@gmail.com> wrote:

Hi Ted,

I'm running Tomcat 7 with Java:

  java version "1.8.0_45"
  Java(TM) SE Runtime Environment (build 1.8.0_45-b14)
  Java HotSpot(TM) 64-Bit Server VM (build 25.45-b02, mixed mode)

Zoran

On Fri, Jul 10, 2015 at 10:45 AM, Ted Yu <yuzhih...@gmail.com> wrote:

What version of Java is Tomcat run with?

Thanks

On Jul 10, 2015, at 10:09 AM, Zoran Jeremic <zoran.jere...@gmail.com> wrote:

Hi,

I've developed a Maven application that uses the mongo-hadoop connector to pull data from MongoDB and process it using Apache Spark. The whole process runs smoothly if I run it on an embedded Jetty server.
However, if I deploy it to Tomcat 7, it is always interrupted at the line of code which collects data from a JavaPairRDD, with an exception that doesn't give me any clue what the problem might be:

  15/07/09 20:42:05 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:58302 (size: 6.3 KB, free: 946.6 MB)
  15/07/09 20:42:05 INFO spark.SparkContext: Created broadcast 0 from newAPIHadoopRDD at SparkCategoryRecommender.java:106
  Exception in thread "Thread-6" java.lang.IncompatibleClassChangeError: Implementing class
      at java.lang.ClassLoader.defineClass1(Native Method)
      at java.lang.ClassLoader.defineClass(ClassLoader.java:760)
      at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
      at org.apache.catalina.loader.WebappClassLoader.findClassInternal(WebappClassLoader.java:2918)
      at org.apache.catalina.loader.WebappClassLoader.findClass(WebappClassLoader.java:1174)
      at org.apache.catalina.loader.WebappClassLoader.loadClass(WebappClassLoader.java:1669)
      at org.apache.catalina.loader.WebappClassLoader.loadClass(WebappClassLoader.java:1547)
      at java.lang.Class.forName0(Native Method)
      at java.lang.Class.forName(Class.java:264)
      at org.apache.spark.mapreduce.SparkHadoopMapReduceUtil$class.firstAvailableClass(SparkHadoopMapReduceUtil.scala:74)
      at org.apache.spark.mapreduce.SparkHadoopMapReduceUtil$class.newJobContext(SparkHadoopMapReduceUtil.scala:28)
      at org.apache.spark.rdd.NewHadoopRDD.newJobContext(NewHadoopRDD.scala:66)
      at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:94)
      at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
      at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
      at scala.Option.getOrElse(Option.scala:120)
      at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
      at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
      at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
      at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
      at scala.Option.getOrElse(Option.scala:120
Twitter streaming with apache spark stream only a small amount of tweets
Hi,

I've implemented Twitter streaming as in the code given at the bottom of this email. It finds some tweets based on the hashtags I'm following. However, it seems that a large number of tweets are missing. I've tried to post some tweets containing hashtags I'm following in the application, and none of them was received. I also checked some hashtags (e.g. #android) on Twitter using Live, and I could see that almost every second something was posted with that hashtag, while my application received only 3-4 posts in one minute. I didn't have this problem in the earlier, non-Spark version of the application, which used twitter4j to access the user stream API. I guess this is some trending stream, but I couldn't find anything that explains which Twitter API is used in Spark Twitter streaming and how to create a stream that will access everything posted on Twitter. I hope somebody can explain what the problem is and how to solve it.

Thanks,
Zoran

  def initializeStreaming() {
    val config = getTwitterConfigurationBuilder.build()
    val auth: Option[twitter4j.auth.Authorization] = Some(new twitter4j.auth.OAuthAuthorization(config))
    val stream: DStream[Status] = TwitterUtils.createStream(ssc, auth)
    val filtered_statuses = stream.transform(rdd => {
      val filtered = rdd.filter(status => {
        var found = false
        for (tag <- hashTagsList) {
          if (status.getText.toLowerCase.contains(tag)) {
            found = true
          }
        }
        found
      })
      filtered
    })
    filtered_statuses.foreachRDD(rdd => {
      rdd.collect.foreach(t => {
        println(t)
      })
    })
    ssc.start()
  }
Re: How to restart Twitter spark stream
Hi Akhil,

That's exactly what I needed. You saved my day :)

Thanks a lot,
Best,
Zoran

On Fri, Jul 24, 2015 at 12:28 AM, Akhil Das <ak...@sigmoidanalytics.com> wrote:

Yes, that is correct, sorry for confusing you. But I guess this is what you are looking for; let me know if it doesn't help:

  val filtered_statuses = stream.transform(rdd => {
    // Instead of hardcoding, you can fetch these from MySQL or a file or whatever
    val sampleHashTags = Array("#teenchoice".toLowerCase, "#android".toLowerCase, "#iphone".toLowerCase)
    val filtered = rdd.filter(status => {
      var found = false
      for (tag <- sampleHashTags) {
        if (status.getText.toLowerCase.contains(tag)) found = true
      }
      found
    })
    filtered
  })

Thanks
Best Regards

On Fri, Jul 24, 2015 at 11:25 AM, Zoran Jeremic <zoran.jere...@gmail.com> wrote:

Hi Akhil,

Thank you for sending this code. My apologies if I'm asking something obvious here, since I'm a newbie in Scala, but I still don't see how I can use this code. Maybe my original question was not very clear. What I need is to get each Twitter status that contains one of the hashtags I'm following. I'm implementing a learning platform where each course will have at least one hashtag, e.g. #cs270computersciencecourse. If somebody posts anything on Twitter with that hashtag, I want to get it and save the Twitter status in the system, so it can be shown in the application. Other tweets should be ignored, but each tweet containing one of the hashtags I'm following should be stored in the application, so I can't process only the most popular tweets or anything similar where it's possible that I miss somebody's post. There is a list of hashtags followed by the stream, and it should be possible for users to update this list.

If I understood well, the code you sent me extracts hashtags from the statuses received through the stream and continues processing these hashtags, but at the end I will have only hashtags without statuses. Is that correct, or did I miss something?

Thanks,
Zoran

On Wed, Jul 22, 2015 at 12:41 AM, Akhil Das <ak...@sigmoidanalytics.com> wrote:

That was pseudo code; a working version would look like this:

  val stream = TwitterUtils.createStream(ssc, None)
  val hashTags = stream.flatMap(status => status.getText.split(" ").filter(_.startsWith("#"))).map(x => x.toLowerCase)
  val topCounts10 = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(10))
    .map { case (topic, count) => (count, topic) }
    .transform(_.sortByKey(false)).map(x => x._2)
  topCounts10.print()

  val filteredStream = topCounts10.transform(rdd => {
    val samplehashtags = ssc.sparkContext.parallelize(Array("#RobinWilliams".toLowerCase, "#android".toLowerCase, "#iphone".toLowerCase))
    val newRDD = samplehashtags.map { x => (x, 1) }
    val joined = newRDD.join(rdd)
    joined
  })
  filteredStream.print()

Thanks
Best Regards

On Wed, Jul 22, 2015 at 3:58 AM, Zoran Jeremic <zoran.jere...@gmail.com> wrote:

Hi Akhil and Jorn,

I tried, as you suggested, to create a simple scenario, but I get an error on rdd.join(newRDD): "value join is not a member of org.apache.spark.rdd.RDD[twitter4j.Status]". The code looks like:

  val stream = TwitterUtils.createStream(ssc, auth)
  val filteredStream = stream.transform(rdd => {
    val samplehashtags = Array("music", "film")
    val newRDD = samplehashtags.map { x => (x, 1) }
    rdd.join(newRDD)
  })

Did I miss something here?

Thanks,
Zoran

On Mon, Jul 20, 2015 at 9:54 AM, Zoran Jeremic <zoran.jere...@gmail.com> wrote:

Thanks for the explanation. If I understand this correctly, in this approach I would actually stream everything from Twitter and perform the filtering in my application using Spark.
Isn't this too much overhead if my application is interested in listening for a couple of hundred or thousand hashtags? On the one hand this is the better approach, since I won't have the problem of opening new streams when the number of hashtags goes over 400, which is the Twitter limit for user stream filtering; on the other hand, I'm concerned about how much it will affect application performance if I stream everything posted on Twitter and filter it locally. It would be great if somebody with experience on this could comment on these concerns.

Thanks,
Zoran

On Mon, Jul 20, 2015 at 12:19 AM, Akhil Das <ak...@sigmoidanalytics.com> wrote:

Jorn meant something like this:

  val filteredStream = twitterStream.transform(rdd => {
    val newRDD = scc.sc.textFile("/this/file/will/be/updated/frequently").map(x => (x, 1))
    rdd.join(newRDD)
  })

newRDD will work like a filter when you do the join.

Thanks
Best Regards

On Sun, Jul 19, 2015 at 9:32 PM, Zoran Jeremic <zoran.jere...@gmail.com> wrote:

Hi Jorn,

I didn't know that it is possible to change the filter without re-opening the Twitter stream. Actually, I already had that question
Re: How to restart Twitter spark stream
Hi Akhil,

Thank you for sending this code. My apologies if I'm asking something obvious here, since I'm a newbie in Scala, but I still don't see how I can use this code. Maybe my original question was not very clear. What I need is to get each Twitter status that contains one of the hashtags I'm following. I'm implementing a learning platform where each course will have at least one hashtag, e.g. #cs270computersciencecourse. If somebody posts anything on Twitter with that hashtag, I want to get it and save the Twitter status in the system, so it can be shown in the application. Other tweets should be ignored, but each tweet containing one of the hashtags I'm following should be stored in the application, so I can't process only the most popular tweets or anything similar where it's possible that I miss somebody's post. There is a list of hashtags followed by the stream, and it should be possible for users to update this list.

If I understood well, the code you sent me extracts hashtags from the statuses received through the stream and continues processing these hashtags, but at the end I will have only hashtags without statuses. Is that correct, or did I miss something?

Thanks,
Zoran

On Wed, Jul 22, 2015 at 12:41 AM, Akhil Das <ak...@sigmoidanalytics.com> wrote:

That was pseudo code; a working version would look like this:

  val stream = TwitterUtils.createStream(ssc, None)
  val hashTags = stream.flatMap(status => status.getText.split(" ").filter(_.startsWith("#"))).map(x => x.toLowerCase)
  val topCounts10 = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(10))
    .map { case (topic, count) => (count, topic) }
    .transform(_.sortByKey(false)).map(x => x._2)
  topCounts10.print()

  val filteredStream = topCounts10.transform(rdd => {
    val samplehashtags = ssc.sparkContext.parallelize(Array("#RobinWilliams".toLowerCase, "#android".toLowerCase, "#iphone".toLowerCase))
    val newRDD = samplehashtags.map { x => (x, 1) }
    val joined = newRDD.join(rdd)
    joined
  })
  filteredStream.print()

Thanks
Best Regards

On Wed, Jul 22, 2015 at 3:58 AM, Zoran Jeremic <zoran.jere...@gmail.com> wrote:

Hi Akhil and Jorn,

I tried, as you suggested, to create a simple scenario, but I get an error on rdd.join(newRDD): "value join is not a member of org.apache.spark.rdd.RDD[twitter4j.Status]". The code looks like:

  val stream = TwitterUtils.createStream(ssc, auth)
  val filteredStream = stream.transform(rdd => {
    val samplehashtags = Array("music", "film")
    val newRDD = samplehashtags.map { x => (x, 1) }
    rdd.join(newRDD)
  })

Did I miss something here?

Thanks,
Zoran

On Mon, Jul 20, 2015 at 9:54 AM, Zoran Jeremic <zoran.jere...@gmail.com> wrote:

Thanks for the explanation. If I understand this correctly, in this approach I would actually stream everything from Twitter and perform the filtering in my application using Spark. Isn't this too much overhead if my application is interested in listening for a couple of hundred or thousand hashtags? On the one hand this is the better approach, since I won't have the problem of opening new streams when the number of hashtags goes over 400, which is the Twitter limit for user stream filtering; on the other hand, I'm concerned about how much it will affect application performance if I stream everything posted on Twitter and filter it locally. It would be great if somebody with experience on this could comment on these concerns.
Thanks,
Zoran

On Mon, Jul 20, 2015 at 12:19 AM, Akhil Das <ak...@sigmoidanalytics.com> wrote:

Jorn meant something like this:

  val filteredStream = twitterStream.transform(rdd => {
    val newRDD = scc.sc.textFile("/this/file/will/be/updated/frequently").map(x => (x, 1))
    rdd.join(newRDD)
  })

newRDD will work like a filter when you do the join.

Thanks
Best Regards

On Sun, Jul 19, 2015 at 9:32 PM, Zoran Jeremic <zoran.jere...@gmail.com> wrote:

Hi Jorn,

I didn't know that it is possible to change the filter without re-opening the Twitter stream. Actually, I already had that question earlier on Stack Overflow (http://stackoverflow.com/questions/30960984/apache-spark-twitter-streaming) and I got the answer that it's not possible, but it would be even better if there is some other way to add new hashtags, or to remove old hashtags that users stopped following. I guess the second request would be more difficult. However, it would be great if you could give me a short example of how to do this. I didn't understand well from your explanation what you mean by "join it with an RDD loading the newest hashtags from disk at a regular interval".

Thanks,
Zoran

On Sun, Jul 19, 2015 at 5:01 AM, Jörn Franke <jornfra...@gmail.com> wrote:

Why do you even want to stop it? You can join it with an RDD loading the newest hashtags from disk at a regular interval.

On Sun, 19 Jul 2015 at 07:40, Zoran Jeremic <zoran.jere...@gmail.com> wrote:

Hi,

I have a Twitter Spark stream
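On the recurring question in this thread — whether the join-based filter loses the Status objects — it doesn't have to: the statuses can be keyed by the hashtags they contain before joining, so the join carries each Status through as the value. A sketch, assuming followedTags is a (tag, 1) RDD with tags lowercased and including the leading '#'; the closing distinct() assumes twitter4j's Status deduplicates sensibly under equality (if in doubt, key by status.getId instead):

  import org.apache.spark.rdd.RDD
  import twitter4j.Status

  def filterByTags(statuses: RDD[Status], followedTags: RDD[(String, Int)]): RDD[Status] = {
    // Key every status by each hashtag it contains...
    val keyed: RDD[(String, Status)] = statuses.flatMap { status =>
      status.getText.split(" ")
        .filter(_.startsWith("#"))
        .map(tag => (tag.toLowerCase, status))
    }
    // ...so the join keeps the full Status as the value, not just the tag.
    keyed.join(followedTags)
      .map { case (_, (status, _)) => status }
      .distinct() // a tweet matching several followed tags should appear once
  }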
Re: Spark on Tomcat has exception IncompatibleClassChangeError: Implementing class
Hi Yana,

Sorry for the late response. I just saw your email. In the end I ended up with the following pom: https://www.dropbox.com/s/19fldb9qnnfieck/pom.xml?dl=0

There were multiple problems I had to struggle with. One of them was that my application's REST layer was implemented with JBoss Jersey, which conflicted with the Sun Jersey embedded in the Spark libraries, so after several days of trying to fix that, I changed my REST implementation and switched to Sun Jersey.

"Although I'm not sure why spark-core shows under gson:
[INFO] +- com.google.code.gson:gson:jar:2.2.2:compile
[INFO] \- org.apache.spark:spark-core_2.10:jar:1.4.0:compile"

It's not actually under gson. Both gson and spark-core are root libraries. spark-core has child libraries, which is why it has \- in front of it.

Zoran

On Mon, Jul 13, 2015 at 1:17 PM, Yana Kadiyska <yana.kadiy...@gmail.com> wrote:

Oh, this is very interesting -- can you explain your dependencies? I'm running Tomcat 7 and ended up using spark-assembly from WEB-INF/lib and removing the javax/servlet package out of it... but it's a pain in the neck. If I'm reading your first message correctly, you use hadoop-common and spark-core? Although I'm not sure why spark-core shows under gson:

[INFO] +- com.google.code.gson:gson:jar:2.2.2:compile
[INFO] \- org.apache.spark:spark-core_2.10:jar:1.4.0:compile

Do you fat-jar spark-core? Do you have spark-assembly in Tomcat's runtime classpath anywhere? Curious what a minimal setup here is. Thanks a lot (I'd love to see your .pom if you have it on github or somewhere accessible).

On Fri, Jul 10, 2015 at 2:24 PM, Zoran Jeremic <zoran.jere...@gmail.com> wrote:

It looks like there is no problem with Tomcat 8.

On Fri, Jul 10, 2015 at 11:12 AM, Zoran Jeremic <zoran.jere...@gmail.com> wrote:

Hi Ted,

I'm running Tomcat 7 with Java:

  java version "1.8.0_45"
  Java(TM) SE Runtime Environment (build 1.8.0_45-b14)
  Java HotSpot(TM) 64-Bit Server VM (build 25.45-b02, mixed mode)

Zoran

On Fri, Jul 10, 2015 at 10:45 AM, Ted Yu <yuzhih...@gmail.com> wrote:

What version of Java is Tomcat run with?

Thanks

On Jul 10, 2015, at 10:09 AM, Zoran Jeremic <zoran.jere...@gmail.com> wrote:

Hi,

I've developed a Maven application that uses the mongo-hadoop connector to pull data from MongoDB and process it using Apache Spark. The whole process runs smoothly if I run it on an embedded Jetty server.
However, if I deploy it to Tomcat 7, it is always interrupted at the line of code which collects data from a JavaPairRDD, with an exception that doesn't give me any clue what the problem might be:

  15/07/09 20:42:05 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:58302 (size: 6.3 KB, free: 946.6 MB)
  15/07/09 20:42:05 INFO spark.SparkContext: Created broadcast 0 from newAPIHadoopRDD at SparkCategoryRecommender.java:106
  Exception in thread "Thread-6" java.lang.IncompatibleClassChangeError: Implementing class
      at java.lang.ClassLoader.defineClass1(Native Method)
      at java.lang.ClassLoader.defineClass(ClassLoader.java:760)
      at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
      at org.apache.catalina.loader.WebappClassLoader.findClassInternal(WebappClassLoader.java:2918)
      at org.apache.catalina.loader.WebappClassLoader.findClass(WebappClassLoader.java:1174)
      at org.apache.catalina.loader.WebappClassLoader.loadClass(WebappClassLoader.java:1669)
      at org.apache.catalina.loader.WebappClassLoader.loadClass(WebappClassLoader.java:1547)
      at java.lang.Class.forName0(Native Method)
      at java.lang.Class.forName(Class.java:264)
      at org.apache.spark.mapreduce.SparkHadoopMapReduceUtil$class.firstAvailableClass(SparkHadoopMapReduceUtil.scala:74)
      at org.apache.spark.mapreduce.SparkHadoopMapReduceUtil$class.newJobContext(SparkHadoopMapReduceUtil.scala:28)
      at org.apache.spark.rdd.NewHadoopRDD.newJobContext(NewHadoopRDD.scala:66)
      at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:94)
      at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
      at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
      at scala.Option.getOrElse(Option.scala:120)
      at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
      at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
      at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
      at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
      at scala.Option.getOrElse(Option.scala:120)
      at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
      at org.apache.spark.SparkContext.runJob(SparkContext.scala:1779)
      at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:885)
      at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:148)
      at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:109
Re: How to restart Twitter spark stream
Hi Akhil and Jorn,

I tried, as you suggested, to create a simple scenario, but I get an error on rdd.join(newRDD): "value join is not a member of org.apache.spark.rdd.RDD[twitter4j.Status]". The code looks like:

  val stream = TwitterUtils.createStream(ssc, auth)
  val filteredStream = stream.transform(rdd => {
    val samplehashtags = Array("music", "film")
    val newRDD = samplehashtags.map { x => (x, 1) }
    rdd.join(newRDD)
  })

Did I miss something here?

Thanks,
Zoran

On Mon, Jul 20, 2015 at 9:54 AM, Zoran Jeremic <zoran.jere...@gmail.com> wrote:

Thanks for the explanation. If I understand this correctly, in this approach I would actually stream everything from Twitter and perform the filtering in my application using Spark. Isn't this too much overhead if my application is interested in listening for a couple of hundred or thousand hashtags? On the one hand this is the better approach, since I won't have the problem of opening new streams when the number of hashtags goes over 400, which is the Twitter limit for user stream filtering; on the other hand, I'm concerned about how much it will affect application performance if I stream everything posted on Twitter and filter it locally. It would be great if somebody with experience on this could comment on these concerns.

Thanks,
Zoran

On Mon, Jul 20, 2015 at 12:19 AM, Akhil Das <ak...@sigmoidanalytics.com> wrote:

Jorn meant something like this:

  val filteredStream = twitterStream.transform(rdd => {
    val newRDD = scc.sc.textFile("/this/file/will/be/updated/frequently").map(x => (x, 1))
    rdd.join(newRDD)
  })

newRDD will work like a filter when you do the join.

Thanks
Best Regards

On Sun, Jul 19, 2015 at 9:32 PM, Zoran Jeremic <zoran.jere...@gmail.com> wrote:

Hi Jorn,

I didn't know that it is possible to change the filter without re-opening the Twitter stream. Actually, I already had that question earlier on Stack Overflow (http://stackoverflow.com/questions/30960984/apache-spark-twitter-streaming) and I got the answer that it's not possible, but it would be even better if there is some other way to add new hashtags, or to remove old hashtags that users stopped following. I guess the second request would be more difficult. However, it would be great if you could give me a short example of how to do this. I didn't understand well from your explanation what you mean by "join it with an RDD loading the newest hashtags from disk at a regular interval".

Thanks,
Zoran

On Sun, Jul 19, 2015 at 5:01 AM, Jörn Franke <jornfra...@gmail.com> wrote:

Why do you even want to stop it? You can join it with an RDD loading the newest hashtags from disk at a regular interval.

On Sun, 19 Jul 2015 at 07:40, Zoran Jeremic <zoran.jere...@gmail.com> wrote:

Hi,

I have a Twitter Spark stream initialized in the following way:

  val ssc: StreamingContext = SparkLauncher.getSparkScalaStreamingContext()
  val config = getTwitterConfigurationBuilder.build()
  val auth: Option[twitter4j.auth.Authorization] = Some(new twitter4j.auth.OAuthAuthorization(config))
  val stream = TwitterUtils.createStream(ssc, auth, filters)

This works fine when I initially start it. However, at some point I need to update the filters, since users might add new hashtags they want to follow. I tried to stop the running stream and the Spark streaming context without stopping the Spark context, e.g.:

  stream.stop()
  ssc.stop(false)

Afterwards, I try to initialize a new Twitter stream like I did previously.
However, I get this exception:

  Exception in thread "Firestorm JMX Monitor" java.lang.IllegalStateException: Adding new inputs, transformations, and output operations after stopping a context is not supported
      at org.apache.spark.streaming.dstream.DStream.validateAtInit(DStream.scala:224)
      at org.apache.spark.streaming.dstream.DStream.<init>(DStream.scala:64)
      at org.apache.spark.streaming.dstream.InputDStream.<init>(InputDStream.scala:41)
      at org.apache.spark.streaming.dstream.ReceiverInputDStream.<init>(ReceiverInputDStream.scala:41)
      at org.apache.spark.streaming.twitter.TwitterInputDStream.<init>(TwitterInputDStream.scala:46)
      at org.apache.spark.streaming.twitter.TwitterUtils$.createStream(TwitterUtils.scala:44)
      at org.prosolo.bigdata.scala.twitter.TwitterHashtagsStreamsManager$.initializeNewStream(TwitterHashtagsStreamsManager.scala:113)
      at org.prosolo.bigdata.scala.twitter.TwitterHashtagsStreamsManager$.restartHashTagsStream(TwitterHashtagsStreamsManager.scala:174)
      at org.prosolo.bigdata.scala.twitter.TwitterHashtagsStreamsManager$.addHashTagsFromBufferAndRestartStream(TwitterHashtagsStreamsManager.scala:162)
      at org.prosolo.bigdata.scala.twitter.HashtagsUpdatesBuffer$.processBufferEvents(HashtagsUpdatesBuffer.scala:41)
      at org.prosolo.bigdata.scala.twitter.HashtagsUpdatesBuffer$$anon$1.run
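For context on the compile error quoted earlier in this message: join comes from PairRDDFunctions, so it only exists on RDDs of (key, value) pairs; RDD[twitter4j.Status] has no such method, and the Array produced by samplehashtags.map is not an RDD at all. A hypothetical repair of that snippet, reusing its names; keying each status by the hashtags it contains is one illustrative choice of join key:

  val stream = TwitterUtils.createStream(ssc, auth)
  val filteredStream = stream.transform { rdd =>
    val samplehashtags = Array("music", "film")
    // Parallelize the tag list so both sides of the join are pair RDDs.
    val tagsRDD = rdd.sparkContext.parallelize(samplehashtags).map(x => (x, 1))
    // Turn RDD[Status] into a pair RDD so that join resolves.
    val keyedStatuses = rdd.flatMap { status =>
      status.getText.split(" ").filter(_.startsWith("#"))
        .map(tag => (tag.toLowerCase.stripPrefix("#"), status))
    }
    keyedStatuses.join(tagsRDD).map { case (_, (status, _)) => status }
  }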
Re: How to restart Twitter spark stream
Thanks for the explanation. If I understand this correctly, in this approach I would actually stream everything from Twitter and perform the filtering in my application using Spark. Isn't this too much overhead if my application is interested in listening for a couple of hundred or thousand hashtags? On the one hand this is the better approach, since I won't have the problem of opening new streams when the number of hashtags goes over 400, which is the Twitter limit for user stream filtering; on the other hand, I'm concerned about how much it will affect application performance if I stream everything posted on Twitter and filter it locally. It would be great if somebody with experience on this could comment on these concerns.

Thanks,
Zoran

On Mon, Jul 20, 2015 at 12:19 AM, Akhil Das <ak...@sigmoidanalytics.com> wrote:

Jorn meant something like this:

  val filteredStream = twitterStream.transform(rdd => {
    val newRDD = scc.sc.textFile("/this/file/will/be/updated/frequently").map(x => (x, 1))
    rdd.join(newRDD)
  })

newRDD will work like a filter when you do the join.

Thanks
Best Regards

On Sun, Jul 19, 2015 at 9:32 PM, Zoran Jeremic <zoran.jere...@gmail.com> wrote:

Hi Jorn,

I didn't know that it is possible to change the filter without re-opening the Twitter stream. Actually, I already had that question earlier on Stack Overflow (http://stackoverflow.com/questions/30960984/apache-spark-twitter-streaming) and I got the answer that it's not possible, but it would be even better if there is some other way to add new hashtags, or to remove old hashtags that users stopped following. I guess the second request would be more difficult. However, it would be great if you could give me a short example of how to do this. I didn't understand well from your explanation what you mean by "join it with an RDD loading the newest hashtags from disk at a regular interval".

Thanks,
Zoran

On Sun, Jul 19, 2015 at 5:01 AM, Jörn Franke <jornfra...@gmail.com> wrote:

Why do you even want to stop it? You can join it with an RDD loading the newest hashtags from disk at a regular interval.

On Sun, 19 Jul 2015 at 07:40, Zoran Jeremic <zoran.jere...@gmail.com> wrote:

Hi,

I have a Twitter Spark stream initialized in the following way:

  val ssc: StreamingContext = SparkLauncher.getSparkScalaStreamingContext()
  val config = getTwitterConfigurationBuilder.build()
  val auth: Option[twitter4j.auth.Authorization] = Some(new twitter4j.auth.OAuthAuthorization(config))
  val stream = TwitterUtils.createStream(ssc, auth, filters)

This works fine when I initially start it. However, at some point I need to update the filters, since users might add new hashtags they want to follow. I tried to stop the running stream and the Spark streaming context without stopping the Spark context, e.g.:

  stream.stop()
  ssc.stop(false)

Afterwards, I try to initialize a new Twitter stream like I did previously.
However, I get this exception:

  Exception in thread "Firestorm JMX Monitor" java.lang.IllegalStateException: Adding new inputs, transformations, and output operations after stopping a context is not supported
      at org.apache.spark.streaming.dstream.DStream.validateAtInit(DStream.scala:224)
      at org.apache.spark.streaming.dstream.DStream.<init>(DStream.scala:64)
      at org.apache.spark.streaming.dstream.InputDStream.<init>(InputDStream.scala:41)
      at org.apache.spark.streaming.dstream.ReceiverInputDStream.<init>(ReceiverInputDStream.scala:41)
      at org.apache.spark.streaming.twitter.TwitterInputDStream.<init>(TwitterInputDStream.scala:46)
      at org.apache.spark.streaming.twitter.TwitterUtils$.createStream(TwitterUtils.scala:44)
      at org.prosolo.bigdata.scala.twitter.TwitterHashtagsStreamsManager$.initializeNewStream(TwitterHashtagsStreamsManager.scala:113)
      at org.prosolo.bigdata.scala.twitter.TwitterHashtagsStreamsManager$.restartHashTagsStream(TwitterHashtagsStreamsManager.scala:174)
      at org.prosolo.bigdata.scala.twitter.TwitterHashtagsStreamsManager$.addHashTagsFromBufferAndRestartStream(TwitterHashtagsStreamsManager.scala:162)
      at org.prosolo.bigdata.scala.twitter.HashtagsUpdatesBuffer$.processBufferEvents(HashtagsUpdatesBuffer.scala:41)
      at org.prosolo.bigdata.scala.twitter.HashtagsUpdatesBuffer$$anon$1.run(HashtagsUpdatesBuffer.scala:19)
      at java.util.TimerThread.mainLoop(Timer.java:555)
      at java.util.TimerThread.run(Timer.java:505)
  INFO [2015-07-18 22:24:23,430] [Twitter Stream consumer-1[Disposing thread]] twitter4j.TwitterStreamImpl (SLF4JLogger.java:83) Inflater has been closed
  ERROR [2015-07-18 22:24:32,503] [sparkDriver-akka.actor.default-dispatcher-3] streaming.receiver.ReceiverSupervisorImpl (Logging.scala:75) Error stopping receiver 0 org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:116)

Can anybody explain how to solve
Re: How to restart Twitter spark stream
Hi Jorn,

I didn't know that it is possible to change the filter without re-opening the Twitter stream. Actually, I already had that question earlier on Stack Overflow (http://stackoverflow.com/questions/30960984/apache-spark-twitter-streaming) and I got the answer that it's not possible, but it would be even better if there is some other way to add new hashtags, or to remove old hashtags that users stopped following. I guess the second request would be more difficult. However, it would be great if you could give me a short example of how to do this. I didn't understand well from your explanation what you mean by "join it with an RDD loading the newest hashtags from disk at a regular interval".

Thanks,
Zoran

On Sun, Jul 19, 2015 at 5:01 AM, Jörn Franke <jornfra...@gmail.com> wrote:

Why do you even want to stop it? You can join it with an RDD loading the newest hashtags from disk at a regular interval.

On Sun, 19 Jul 2015 at 07:40, Zoran Jeremic <zoran.jere...@gmail.com> wrote:

Hi,

I have a Twitter Spark stream initialized in the following way:

  val ssc: StreamingContext = SparkLauncher.getSparkScalaStreamingContext()
  val config = getTwitterConfigurationBuilder.build()
  val auth: Option[twitter4j.auth.Authorization] = Some(new twitter4j.auth.OAuthAuthorization(config))
  val stream = TwitterUtils.createStream(ssc, auth, filters)

This works fine when I initially start it. However, at some point I need to update the filters, since users might add new hashtags they want to follow. I tried to stop the running stream and the Spark streaming context without stopping the Spark context, e.g.:

  stream.stop()
  ssc.stop(false)

Afterwards, I try to initialize a new Twitter stream like I did previously. However, I get this exception:

  Exception in thread "Firestorm JMX Monitor" java.lang.IllegalStateException: Adding new inputs, transformations, and output operations after stopping a context is not supported
      at org.apache.spark.streaming.dstream.DStream.validateAtInit(DStream.scala:224)
      at org.apache.spark.streaming.dstream.DStream.<init>(DStream.scala:64)
      at org.apache.spark.streaming.dstream.InputDStream.<init>(InputDStream.scala:41)
      at org.apache.spark.streaming.dstream.ReceiverInputDStream.<init>(ReceiverInputDStream.scala:41)
      at org.apache.spark.streaming.twitter.TwitterInputDStream.<init>(TwitterInputDStream.scala:46)
      at org.apache.spark.streaming.twitter.TwitterUtils$.createStream(TwitterUtils.scala:44)
      at org.prosolo.bigdata.scala.twitter.TwitterHashtagsStreamsManager$.initializeNewStream(TwitterHashtagsStreamsManager.scala:113)
      at org.prosolo.bigdata.scala.twitter.TwitterHashtagsStreamsManager$.restartHashTagsStream(TwitterHashtagsStreamsManager.scala:174)
      at org.prosolo.bigdata.scala.twitter.TwitterHashtagsStreamsManager$.addHashTagsFromBufferAndRestartStream(TwitterHashtagsStreamsManager.scala:162)
      at org.prosolo.bigdata.scala.twitter.HashtagsUpdatesBuffer$.processBufferEvents(HashtagsUpdatesBuffer.scala:41)
      at org.prosolo.bigdata.scala.twitter.HashtagsUpdatesBuffer$$anon$1.run(HashtagsUpdatesBuffer.scala:19)
      at java.util.TimerThread.mainLoop(Timer.java:555)
      at java.util.TimerThread.run(Timer.java:505)
  INFO [2015-07-18 22:24:23,430] [Twitter Stream consumer-1[Disposing thread]] twitter4j.TwitterStreamImpl (SLF4JLogger.java:83) Inflater has been closed
  ERROR [2015-07-18 22:24:32,503] [sparkDriver-akka.actor.default-dispatcher-3] streaming.receiver.ReceiverSupervisorImpl (Logging.scala:75) Error stopping receiver 0 org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:116)

Can anybody explain how to solve this issue?
Thanks,
Zoran

--
Zoran Jeremic, PhD
Senior System Analyst & Programmer
Athabasca University
Tel: +1 604 92 89 944
E-mail: zoran.jere...@gmail.com, zoran.jere...@va.mod.gov.rs
Homepage: http://zoranjeremic.org
How to restart Twitter spark stream
Hi,

I have a Twitter Spark stream initialized in the following way:

  val ssc: StreamingContext = SparkLauncher.getSparkScalaStreamingContext()
  val config = getTwitterConfigurationBuilder.build()
  val auth: Option[twitter4j.auth.Authorization] = Some(new twitter4j.auth.OAuthAuthorization(config))
  val stream = TwitterUtils.createStream(ssc, auth, filters)

This works fine when I initially start it. However, at some point I need to update the filters, since users might add new hashtags they want to follow. I tried to stop the running stream and the Spark streaming context without stopping the Spark context, e.g.:

  stream.stop()
  ssc.stop(false)

Afterwards, I try to initialize a new Twitter stream like I did previously. However, I get this exception:

  Exception in thread "Firestorm JMX Monitor" java.lang.IllegalStateException: Adding new inputs, transformations, and output operations after stopping a context is not supported
      at org.apache.spark.streaming.dstream.DStream.validateAtInit(DStream.scala:224)
      at org.apache.spark.streaming.dstream.DStream.<init>(DStream.scala:64)
      at org.apache.spark.streaming.dstream.InputDStream.<init>(InputDStream.scala:41)
      at org.apache.spark.streaming.dstream.ReceiverInputDStream.<init>(ReceiverInputDStream.scala:41)
      at org.apache.spark.streaming.twitter.TwitterInputDStream.<init>(TwitterInputDStream.scala:46)
      at org.apache.spark.streaming.twitter.TwitterUtils$.createStream(TwitterUtils.scala:44)
      at org.prosolo.bigdata.scala.twitter.TwitterHashtagsStreamsManager$.initializeNewStream(TwitterHashtagsStreamsManager.scala:113)
      at org.prosolo.bigdata.scala.twitter.TwitterHashtagsStreamsManager$.restartHashTagsStream(TwitterHashtagsStreamsManager.scala:174)
      at org.prosolo.bigdata.scala.twitter.TwitterHashtagsStreamsManager$.addHashTagsFromBufferAndRestartStream(TwitterHashtagsStreamsManager.scala:162)
      at org.prosolo.bigdata.scala.twitter.HashtagsUpdatesBuffer$.processBufferEvents(HashtagsUpdatesBuffer.scala:41)
      at org.prosolo.bigdata.scala.twitter.HashtagsUpdatesBuffer$$anon$1.run(HashtagsUpdatesBuffer.scala:19)
      at java.util.TimerThread.mainLoop(Timer.java:555)
      at java.util.TimerThread.run(Timer.java:505)
  INFO [2015-07-18 22:24:23,430] [Twitter Stream consumer-1[Disposing thread]] twitter4j.TwitterStreamImpl (SLF4JLogger.java:83) Inflater has been closed
  ERROR [2015-07-18 22:24:32,503] [sparkDriver-akka.actor.default-dispatcher-3] streaming.receiver.ReceiverSupervisorImpl (Logging.scala:75) Error stopping receiver 0 org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:116)

Can anybody explain how to solve this issue?

Thanks,
Zoran
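The replies in this thread converge on not restarting at all: keep one unfiltered stream running and apply the changing hashtag set inside transform, reloading it on every batch. A sketch along the lines of Akhil's and Jörn's suggestion; the file path is a placeholder:

  val stream = TwitterUtils.createStream(ssc, auth) // no server-side filter
  val filtered = stream.transform { rdd =>
    // transform's function runs on the driver once per batch, so the tag file
    // is re-read every interval and user changes take effect without touching
    // the receiver or the StreamingContext.
    val tags = rdd.sparkContext
      .textFile("/path/to/followed/hashtags") // placeholder path
      .map(_.toLowerCase)
      .collect()
      .toSet
    rdd.filter { status =>
      val text = status.getText.toLowerCase
      tags.exists(tag => text.contains(tag))
    }
  }

Because the receiver is never stopped, the IllegalStateException above never arises; the cost is re-reading a small file once per batch interval, plus the 1% sampling limit of the unfiltered public stream discussed elsewhere in this digest.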
Re: Spark on Tomcat has exception IncompatibleClassChangeError: Implementing class
It looks like there is no problem with Tomcat 8.

On Fri, Jul 10, 2015 at 11:12 AM, Zoran Jeremic zoran.jere...@gmail.com wrote:
> Hi Ted,
> I'm running Tomcat 7 with Java:
> java version "1.8.0_45"
> Java(TM) SE Runtime Environment (build 1.8.0_45-b14)
> Java HotSpot(TM) 64-Bit Server VM (build 25.45-b02, mixed mode)
> Zoran
> On Fri, Jul 10, 2015 at 10:45 AM, Ted Yu yuzhih...@gmail.com wrote:
>> What version of Java is Tomcat run with?
>> Thanks
>> On Jul 10, 2015, at 10:09 AM, Zoran Jeremic zoran.jere...@gmail.com wrote:
>>> Hi,
>>> I've developed a Maven application that uses the mongo-hadoop connector to pull data from MongoDB and process it using Apache Spark. The whole process runs smoothly on an embedded Jetty server, but deployed to Tomcat 7 it is always interrupted at the line of code that collects data from a JavaPairRDD:
>>> Exception in thread "Thread-6" java.lang.IncompatibleClassChangeError: Implementing class
>>> [...]
>>> I guess there is some library conflict happening on Tomcat, but I have no idea where to look for the problem. This is my whole dependency tree:
>>> [...]
Re: Spark on Tomcat has exception IncompatibleClassChangeError: Implementing class
Hi Ted,

I'm running Tomcat 7 with Java:

java version "1.8.0_45"
Java(TM) SE Runtime Environment (build 1.8.0_45-b14)
Java HotSpot(TM) 64-Bit Server VM (build 25.45-b02, mixed mode)

Zoran

On Fri, Jul 10, 2015 at 10:45 AM, Ted Yu yuzhih...@gmail.com wrote:
> What version of Java is Tomcat run with?
> Thanks
> On Jul 10, 2015, at 10:09 AM, Zoran Jeremic zoran.jere...@gmail.com wrote:
>> Hi,
>> I've developed a Maven application that uses the mongo-hadoop connector to pull data from MongoDB and process it using Apache Spark. The whole process runs smoothly on an embedded Jetty server, but deployed to Tomcat 7 it is always interrupted at the line of code that collects data from a JavaPairRDD:
>> Exception in thread "Thread-6" java.lang.IncompatibleClassChangeError: Implementing class
>> [...]
>> I guess there is some library conflict happening on Tomcat, but I have no idea where to look for the problem. This is my whole dependency tree:
>> [...]
Spark on Tomcat has exception IncompatibleClassChangeError: Implementing class
Hi,

I've developed a Maven application that uses the mongo-hadoop connector to pull data from MongoDB and process it using Apache Spark. The whole process runs smoothly if I run it on an embedded Jetty server. However, if I deploy it to Tomcat 7, it is always interrupted at the line of code that collects data from a JavaPairRDD, with an exception that doesn't give me any clue what the problem might be:

15/07/09 20:42:05 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:58302 (size: 6.3 KB, free: 946.6 MB)
15/07/09 20:42:05 INFO spark.SparkContext: Created broadcast 0 from newAPIHadoopRDD at SparkCategoryRecommender.java:106
Exception in thread "Thread-6" java.lang.IncompatibleClassChangeError: Implementing class
        at java.lang.ClassLoader.defineClass1(Native Method)
        at java.lang.ClassLoader.defineClass(ClassLoader.java:760)
        at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
        at org.apache.catalina.loader.WebappClassLoader.findClassInternal(WebappClassLoader.java:2918)
        at org.apache.catalina.loader.WebappClassLoader.findClass(WebappClassLoader.java:1174)
        at org.apache.catalina.loader.WebappClassLoader.loadClass(WebappClassLoader.java:1669)
        at org.apache.catalina.loader.WebappClassLoader.loadClass(WebappClassLoader.java:1547)
        at java.lang.Class.forName0(Native Method)
        at java.lang.Class.forName(Class.java:264)
        at org.apache.spark.mapreduce.SparkHadoopMapReduceUtil$class.firstAvailableClass(SparkHadoopMapReduceUtil.scala:74)
        at org.apache.spark.mapreduce.SparkHadoopMapReduceUtil$class.newJobContext(SparkHadoopMapReduceUtil.scala:28)
        at org.apache.spark.rdd.NewHadoopRDD.newJobContext(NewHadoopRDD.scala:66)
        at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:94)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
        at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1779)
        at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:885)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:148)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:109)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:286)
        at org.apache.spark.rdd.RDD.collect(RDD.scala:884)
        at org.apache.spark.api.java.JavaRDDLike$class.collect(JavaRDDLike.scala:335)
        at org.apache.spark.api.java.AbstractJavaRDDLike.collect(JavaRDDLike.scala:47)
        at com.warrantylife.iqmetrix.spark.SparkCategoryRecommender.buildDataModelOnProductid(SparkCategoryRecommender.java:149)
        at com.warrantylife.iqmetrix.spark.SparkCategoryRecommender.computeAndStoreRecommendationsForProductsInCategory(SparkCategoryRecommender.java:179)
        at com.warrantylife.recommendation.impl.CategoryRecommendationManagerImpl.computeRecommendationsForCategory(CategoryRecommendationManagerImpl.java:105)
        at com.warrantylife.testdata.TestDataGeneratorService$1.run(TestDataGeneratorService.java:55)
        at java.lang.Thread.run(Thread.java:745)

I guess there is some library conflict happening on Tomcat, but I have no idea where to look for the problem.
This is my whole dependency tree:

[INFO] --- maven-dependency-plugin:2.1:tree (default-cli) @ warranty-analytics ---
[INFO] org.warrantylife:warranty-analytics:war:0.1.0-SNAPSHOT
[INFO] +- javax.servlet:javax.servlet-api:jar:3.0.1:provided
[INFO] +- com.sun.jersey:jersey-client:jar:1.19:compile
[INFO] +- com.sun.jersey:jersey-server:jar:1.19:compile
[INFO] +- com.sun.jersey:jersey-core:jar:1.19:compile
[INFO] |  \- javax.ws.rs:jsr311-api:jar:1.1.1:compile
[INFO] +- com.sun.jersey:jersey-servlet:jar:1.19:compile
[INFO] +- org.apache.hadoop:hadoop-common:jar:2.4.1:compile
[INFO] |  +- org.apache.hadoop:hadoop-annotations:jar:2.4.1:compile
[INFO] |  |  \- jdk.tools:jdk.tools:jar:1.6:system
[INFO] |  +- com.google.guava:guava:jar:11.0.2:compile
[INFO] |  +- commons-cli:commons-cli:jar:1.2:compile
[INFO] |  +- xmlenc:xmlenc:jar:0.52:compile
[INFO] |  +- commons-httpclient:commons-httpclient:jar:3.1:compile
[INFO] |  +- commons-codec:commons-codec:jar:1.4:compile
[INFO] |  +- commons-io:commons-io:jar:2.4:compile
[INFO] |  +- commons-net:commons-net:jar:3.1:compile
[INFO] |  +- commons-collections:commons-collections:jar:3.2.1:compile
[INFO] |  +- javax.servlet:servlet-api:jar:2.5:compile
[INFO] |  +- org.mortbay.jetty:jetty:jar:6.1.26:compile
[INFO] |  +- org.mortbay.jetty:jetty-util:jar:6.1.26:compile
[INFO] |
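For anyone who lands on this thread: an IncompatibleClassChangeError thrown from SparkHadoopMapReduceUtil.firstAvailableClass is the classic symptom of Hadoop 1 and Hadoop 2 jars mixed on one classpath (org.apache.hadoop.mapreduce.JobContext was a class in Hadoop 1 but became an interface in Hadoop 2, and Tomcat's WebappClassLoader can resolve the conflicting jar in a different order than an embedded Jetty classpath does). One way to test that theory is to pin every transitive hadoop-* artifact to a single version. A hedged Maven sketch; the mongo-hadoop coordinates and version are assumptions, and the wildcard exclusion needs Maven 3.2.1+:

<dependency>
  <groupId>org.mongodb.mongo-hadoop</groupId>
  <artifactId>mongo-hadoop-core</artifactId>
  <version>1.4.0</version>
  <exclusions>
    <!-- keep mongo-hadoop from dragging in its own Hadoop jars;
         the webapp's hadoop-common 2.4.1 above should win instead -->
    <exclusion>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>*</artifactId>
    </exclusion>
  </exclusions>
</dependency>

Afterwards, mvn dependency:tree -Dverbose -Dincludes=org.apache.hadoop should show exactly one Hadoop version; the observation above that the error disappears on Tomcat 8 is consistent with a classloading-order conflict rather than a missing class.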