How to specify the numFeatures in HashingTF
Hi, There is a parameter in HashingTF called "numFeatures". I was wondering what the best way is to set this parameter. In the use case of text categorization, do you need to know the number of words in your vocabulary in advance? Or do you set it to a large value, greater than the number of words in your vocabulary? Thanks, Jianguo
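For illustration, a minimal sketch of setting numFeatures on MLlib's HashingTF; the 2^18 size and the sample tokens are assumptions rather than advice from this thread. The usual rule of thumb is to pick a power of two comfortably larger than the expected vocabulary so hash collisions stay rare (the class's own default is 2^20).

import org.apache.spark.mllib.feature.HashingTF

// Hypothetical document, already tokenized into terms.
val tokens = Seq("spark", "hashing", "tf", "spark")

// numFeatures is the size of the hashed feature space, not the vocabulary
// size; 1 << 18 here is only an illustrative choice.
val tf = new HashingTF(numFeatures = 1 << 18)
val vector = tf.transform(tokens)   // sparse vector of term frequencies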
Re: workaround for groupByKey
Thanks. Yes, unfortunately, they all need to be grouped. I guess I can partition the records by user id. However, I have millions of users; do you think partitioning by user id will help? Jianguo On Mon, Jun 22, 2015 at 6:28 PM, Silvio Fiorito < silvio.fior...@granturing.com> wrote: > You’re right of course, I’m sorry. I was typing before thinking about > what you actually asked! > > On a second thought, what is the ultimate outcome for what you want the > sequence of pages for? Do they need to actually all be grouped? Could you > instead partition by user id then use a mapPartitions perhaps? > > From: Jianguo Li > Date: Monday, June 22, 2015 at 6:21 PM > To: Silvio Fiorito > Cc: "user@spark.apache.org" > Subject: Re: workaround for groupByKey > > Thanks for your suggestion. I guess aggregateByKey is similar to > combineByKey. I read in Learning Spark > > *We can disable map-side aggregation in combineByKey() if we know that > our data won’t benefit from it. For example, groupByKey() disables map-side > aggregation as the aggregation function (appending to a list) does not save > any space. If we want to disable map-side combines, we need to specify the > partitioner; for now you can just use the partitioner on the source RDD by > passing rdd.partitioner* > > It seems that when the map-side aggregation function is to append > something to a list (as opposed to summing over all the numbers), this > map-side aggregation does not offer any benefit since appending to a list > does not save any space. Is my understanding correct? > > Thanks, > > Jianguo > > On Mon, Jun 22, 2015 at 4:43 PM, Silvio Fiorito < > silvio.fior...@granturing.com> wrote: > >> You can use aggregateByKey as one option: >> >> val input: RDD[(Int, String)] = ... >> >> val test = input.aggregateByKey(ListBuffer.empty[String])((a, b) => a >> += b, (a, b) => a ++ b) >> >> From: Jianguo Li >> Date: Monday, June 22, 2015 at 5:12 PM >> To: "user@spark.apache.org" >> Subject: workaround for groupByKey >> >> Hi, >> >> I am processing an RDD of key-value pairs. The key is a user_id, and >> the value is a website url the user has visited. >> >> Since I need to know all the urls each user has visited, I am tempted >> to call groupByKey on this RDD. However, since there could be millions >> of users and urls, the shuffling caused by groupByKey proves to be a major >> bottleneck to getting the job done. Is there any workaround? I want to end up >> with an RDD of key-value pairs, where the key is a user_id and the value is a >> list of all the urls visited by the user. >> >> Thanks, >> >> Jianguo >> > >
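Following up on the partition-then-mapPartitions idea above, here is a minimal sketch; the RDD name, the key type, and the partition count of 200 are illustrative assumptions, not values from the thread.

import org.apache.spark.HashPartitioner
import org.apache.spark.rdd.RDD
import scala.collection.mutable

// Hypothetical RDD of (user_id, url) pairs.
val visits: RDD[(Long, String)] = ???

val urlsPerUser = visits
  .partitionBy(new HashPartitioner(200))   // co-locate each user's records
  .mapPartitions { iter =>
    // Build the per-user URL lists inside the partition instead of asking
    // groupByKey to materialize one grouped RDD of Iterables.
    val acc = mutable.Map.empty[Long, mutable.ListBuffer[String]]
    iter.foreach { case (user, url) =>
      acc.getOrElseUpdate(user, mutable.ListBuffer.empty[String]) += url
    }
    acc.iterator.map { case (user, urls) => (user, urls.toList) }
  }

Note that this still shuffles the data once and holds each partition's groups in memory, so it mainly helps when the downstream work can stay inside mapPartitions.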
Re: workaround for groupByKey
Thanks for your suggestion. I guess aggregateByKey is similar to combineByKey. I read in Learning Spark *We can disable map-side aggregation in combineByKey() if we know that our data won’t benefit from it. For example, groupByKey() disables map-side aggregation as the aggregation function (appending to a list) does not save any space. If we want to disable map-side combines, we need to specify the partitioner; for now you can just use the partitioner on the source RDD by passing rdd.partitioner* It seems that when the map-side aggregation function is to append something to a list (as opposed to summing over all the numbers), this map-side aggregation does not offer any benefit since appending to a list does not save any space. Is my understanding correct? Thanks, Jianguo On Mon, Jun 22, 2015 at 4:43 PM, Silvio Fiorito < silvio.fior...@granturing.com> wrote: > You can use aggregateByKey as one option: > > val input: RDD[(Int, String)] = ... > > val test = input.aggregateByKey(ListBuffer.empty[String])((a, b) => a += > b, (a, b) => a ++ b) > > From: Jianguo Li > Date: Monday, June 22, 2015 at 5:12 PM > To: "user@spark.apache.org" > Subject: workaround for groupByKey > > Hi, > > I am processing an RDD of key-value pairs. The key is a user_id, and > the value is a website url the user has visited. > > Since I need to know all the urls each user has visited, I am tempted > to call groupByKey on this RDD. However, since there could be millions > of users and urls, the shuffling caused by groupByKey proves to be a major > bottleneck to getting the job done. Is there any workaround? I want to end up > with an RDD of key-value pairs, where the key is a user_id and the value is a > list of all the urls visited by the user. > > Thanks, > > Jianguo >
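For reference, a cleaned-up, self-contained sketch of the aggregateByKey approach quoted above; the RDD and its key type are hypothetical placeholders.

import org.apache.spark.rdd.RDD
import scala.collection.mutable.ListBuffer

// Hypothetical RDD of (user_id, url) pairs.
val input: RDD[(Int, String)] = ???

// seqOp appends a url to a per-partition buffer; combOp concatenates the
// partial buffers coming from different partitions.
val urlsPerUser = input.aggregateByKey(ListBuffer.empty[String])(
  (buf, url) => buf += url,
  (a, b) => a ++= b
)

As the Learning Spark passage notes, a combine step that only appends to a list does not shrink the shuffle, so the shuffle volume here is comparable to groupByKey's; the gain is mostly in controlling the accumulator type and zero value.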
workaround for groupByKey
Hi, I am processing an RDD of key-value pairs. The key is a user_id, and the value is a website url the user has visited. Since I need to know all the urls each user has visited, I am tempted to call groupByKey on this RDD. However, since there could be millions of users and urls, the shuffling caused by groupByKey proves to be a major bottleneck to getting the job done. Is there any workaround? I want to end up with an RDD of key-value pairs, where the key is a user_id and the value is a list of all the urls visited by the user. Thanks, Jianguo
spark ml model info
Hi, I am training a model using the logistic regression algorithm in ML. I was wondering if there is any API to access the weight vector (i.e., the coefficients for each feature). I need those coefficients for real-time predictions. Thanks, Jianguo
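Assuming the model comes from MLlib's LogisticRegressionWithLBFGS (the spark.ml pipeline version exposes a similar weights field on its model), the coefficients are plain members of the fitted model and can be read off directly; a minimal sketch with a hypothetical training set:

import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.RDD

// Hypothetical training set of LabeledPoint(label, features).
val training: RDD[LabeledPoint] = ???

val model = new LogisticRegressionWithLBFGS().run(training)

// The fitted coefficients and intercept can be exported and used for
// scoring outside Spark.
val coefficients = model.weights    // org.apache.spark.mllib.linalg.Vector
val intercept    = model.intercept  // Double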
feature scaling in GeneralizedLinearAlgorithm.scala
Hi, In GeneralizedLinearAlgorithm, which Logistic Regression relies on, it says "if useFeatureScaling is enabled, we will standardize the training features, and train the model in the scaled space. Then we transform the coefficients from the scaled space to the original space ...". My understanding, then, is that we do not need to scale the test data since the coefficients are already in the original space. Is this correct? Thanks, Jianguo
Spark ML pipeline
Hi, I really like the pipeline API in spark.ml in the Spark 1.2 release. Will there be more machine learning algorithms implemented for the pipeline framework in the next major release? Any idea when the next major release will come out? Thanks, Jianguo
Re: Does the kFold in Spark always give you the same split?
Thanks. I did specify a seed parameter. It seems that the problem is not caused by kFold. I actually ran another experiment without cross validation. I just built a model with the training data and then tested the model on the test data. However, the accuracy still varies from one run to another. Interestingly, this only happens when I run the experiment on our cluster. If I run the experiment on my local machine, I can reproduce the result each time. Has anybody encountered a similar issue before? Thanks, Jianguo On Fri, Jan 30, 2015 at 11:22 AM, Sean Owen wrote: > Have a look at the source code for MLUtils.kFold. Yes, there is a > random element. That's good; you want the folds to be randomly chosen. > Note there is a seed parameter, as in a lot of the APIs, that lets you > fix the RNG seed and so get the same result every time, if you need > to. > > On Fri, Jan 30, 2015 at 4:12 PM, Jianguo Li > wrote: > > Hi, > > > > I am using the utility function kFold provided in Spark for doing k-fold > > cross validation using logistic regression. However, each time I run the > > experiment, I get a different result. Since everything else stays > > constant, I was wondering if this is due to the kFold function I used. > Does > > anyone know if kFold gives you a different split on a data set each > time > > you call it? > > > > Thanks, > > > > Jianguo >
Does the kFold in Spark always give you the same split?
Hi, I am using the utility function kFold provided in Spark for doing k-fold cross validation using logistic regression. However, each time I run the experiment, I get a different result. Since everything else stays constant, I was wondering if this is due to the kFold function I used. Does anyone know if kFold gives you a different split on a data set each time you call it? Thanks, Jianguo
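For reference, a minimal sketch of fixing the RNG seed in MLUtils.kFold, the parameter mentioned in the reply above; the data set, fold count, and seed value are placeholders.

import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.rdd.RDD

val data: RDD[LabeledPoint] = ???   // hypothetical labeled data set

// Passing the same seed makes the k folds identical across runs.
val folds: Array[(RDD[LabeledPoint], RDD[LabeledPoint])] =
  MLUtils.kFold(data, numFolds = 5, seed = 42)

folds.foreach { case (training, validation) =>
  // train on `training`, evaluate on `validation`
}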
unit tests with "java.io.IOException: Could not create FileClient"
Hi, I created some unit tests to test some of the functions in my project which use Spark. However, when I used the sbt tool to build the project and then ran "sbt test", I ran into "java.io.IOException: Could not create FileClient": 2015-01-19 08:50:38,1894 ERROR Client fs/client/fileclient/cc/client.cc:385 Thread: -2 Failed to initialize client for cluster 127.0.0.1:7222, error Unknown error(108) num lines: 21 [info] TextFileAdapterTestSuite: [info] - Checking the RDD Vector Length *** FAILED *** [info] java.io.IOException: Could not create FileClient [info] at com.mapr.fs.MapRFileSystem.lookupClient(MapRFileSystem.java:351) [info] at com.mapr.fs.MapRFileSystem.lookupClient(MapRFileSystem.java:363) [info] at com.mapr.fs.MapRFileSystem.getMapRFileStatus(MapRFileSystem.java:795) [info] at com.mapr.fs.MapRFileSystem.getFileStatus(MapRFileSystem.java:822) [info] at org.apache.hadoop.fs.FileSystem.getFileStatus(FileSystem.java:1419) [info] at org.apache.hadoop.fs.FileSystem.globStatusInternal(FileSystem.java:1092) [info] at org.apache.hadoop.fs.FileSystem.globStatus(FileSystem.java:1031) [info] at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:231) [info] at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:277) [info] at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:199) [info] ... The only tests that failed, which I believe led to this exception, are the ones where my functions call SparkContext's textFile() function. I tried to debug this, and found that the exception seems to take place within the textFile() function. Does anybody know what the issue is and how to fix it? I used localhost for the SparkContext; does that have anything to do with this exception? Thanks, Jianguo
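The stack trace goes through com.mapr.fs.MapRFileSystem, which suggests the test JVM is resolving paths through the cluster's default (MapR) filesystem rather than the local one. One hedged workaround for purely local unit tests is to force the local filesystem explicitly; the master setting, app name, and path below are assumptions for illustration, not a confirmed fix.

import org.apache.spark.{SparkConf, SparkContext}

// Local-only test context; "local[2]" and the app name are illustrative.
val conf = new SparkConf()
  .setMaster("local[2]")
  .setAppName("TextFileAdapterTestSuite")
val sc = new SparkContext(conf)

// An explicit file:// scheme keeps textFile() on the local filesystem
// instead of whatever fs.defaultFS the Hadoop/MapR client configures.
val lines = sc.textFile("file:///tmp/test-data/sample.txt")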
Re: component.AbstractLifeCycle: FAILED SelectChannelConnector@0.0.0.0:4040: java.net.BindException: Address already in use
I solved the issue. In case anyone else is looking for an answer, by default, scalatest executes all the tests in parallel. To disable this, just put the following line in your build.sbt parallelExecution in Test := false Thanks On Wed, Jan 14, 2015 at 2:30 PM, Jianguo Li wrote: > Hi, > > I am using the sbt tool to build and run the scala tests related to spark. > In my /src/test/scala directory, there are two test classes (TestA, TestB), > both of which use the class in Spark for creating SparkContext, something > like > > trait LocalTestSparkContext extends BeforeAndAfterAll { self: Suite => > @transient var sc: SparkContext = _ > > override def beforeAll() { > super.beforeAll() > val conf = new SparkConf() > .setMaster("local[2]") > .setAppName("LocalSparkUnitTest") > sc = new SparkContext(conf) > } > > override def afterAll() { > if (sc != null) { > sc.stop() > } > super.afterAll() > } > } > > So, TestA and TestB are defined as > > class TestA extends FunSuite with LocalTestSparkContext > class TestB extends FunSuite with LocalTestSparkContext > > However, when I built the project using sbt and ran sbt test, I got the > following error. However, no error occurred if I only had one test. Is this > related to the SparkContext? Only one sc should be active? However, I > thought the LocalTestSparkContext should already take care of this since it > stops sc at the end of each class. I am totally lost, could someone let me > know what is the issue and how to resolve it? Thanks a lot. > > 15/01/14 14:12:43 WARN component.AbstractLifeCycle: FAILED > SelectChannelConnector@0.0.0.0:4040: java.net.BindException: Address > already in use: bind > java.net.BindException: Address already in use: bind > at sun.nio.ch.Net.bind0(Native Method) > at sun.nio.ch.Net.bind(Unknown Source) > at sun.nio.ch.Net.bind(Unknown Source) > at sun.nio.ch.ServerSocketChannelImpl.bind(Unknown Source) > at sun.nio.ch.ServerSocketAdaptor.bind(Unknown Source) > at > org.eclipse.jetty.server.nio.SelectChannelConnector.open(SelectChannelConnector.java:187) > at > org.eclipse.jetty.server.AbstractConnector.doStart(AbstractConnector.java:316) > at > org.eclipse.jetty.server.nio.SelectChannelConnector.doStart(SelectChannelConnector.java:265) > at > org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:64) > at org.eclipse.jetty.server.Server.doStart(Server.java:293) > at > org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:64) > at > org.apache.spark.ui.JettyUtils$.org$apache$spark$ui$JettyUtils$$connect$1(JettyUtils.scala:195) > at org.apache.spark.ui.JettyUtils$$anonfun$4.apply(JettyUtils.scala:205) > at org.apache.spark.ui.JettyUtils$$anonfun$4.apply(JettyUtils.scala:205) > at > org.apache.spark.util.Utils$$anonfun$startServiceOnPort$1.apply$mcVI$sp(Utils.scala:1504) > at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141) > at org.apache.spark.util.Utils$.startServiceOnPort(Utils.scala:1495) > at org.apache.spark.ui.JettyUtils$.startJettyServer(JettyUtils.scala:205) > at org.apache.spark.ui.WebUI.bind(WebUI.scala:102) > at org.apache.spark.SparkContext$$anonfun$10.apply(SparkContext.scala:234) > at org.apache.spark.SparkContext$$anonfun$10.apply(SparkContext.scala:234) > at scala.Option.foreach(Option.scala:236) > at org.apache.spark.SparkContext.(SparkContext.scala:234) > at > com.unittest.LocalTestSparkContext$class.beforeAll(LocalTestSparkContext.scala:35) > at com.unittestt.TestB.beforeAll(TestB.scala:14) > at > 
org.scalatest.BeforeAndAfterAll$class.beforeAll(BeforeAndAfterAll.scala:187) > at com.unittest.TestB.beforeAll(TestB.scala:14) > at org.scalatest.BeforeAndAfterAll$class.run(BeforeAndAfterAll.scala:253) > at com.unittest.TestB.run(TestB.scala:14) > at org.scalatest.tools.Framework.org > $scalatest$tools$Framework$$runSuite(Framework.scala:444) > at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:651) > at sbt.TestRunner.runTest$1(TestFramework.scala:76) > at sbt.TestRunner.run(TestFramework.scala:85) > at > sbt.TestFramework$$anon$2$$anonfun$$init$$1$$anonfun$apply$8.apply(TestFramework.scala:202) > at > sbt.TestFramework$$anon$2$$anonfun$$init$$1$$anonfun$apply$8.apply(TestFramework.scala:202) > at > sbt.TestFramework$.sbt$TestFramework$$withContextLoader(TestFramework.scala:185) > at > sbt.TestFramework$$anon$2$$anonfun$$init$$1.apply(TestFramework.scala:202) > at > sbt.TestFramework$$anon$2$$anonfun$$init$$1.apply(TestFramework.scala:202) > at sbt.TestFunction.apply(TestFramework.scala:207) > at sbt.Tests$$anonfun$9.apply(Tests.scala:216) > at sbt.Tests$$anonfun$9.app
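To spell out the fix above in context, a minimal build.sbt sketch; the forked-JVM line is an optional extra assumption, not part of the original answer.

// build.sbt
// Run test suites sequentially so that only one SparkContext (and only one
// Spark UI trying to bind port 4040) exists at a time.
parallelExecution in Test := false

// Optional extra isolation (an assumption, not part of the fix above):
// run the tests in a forked JVM.
fork in Test := true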
component.AbstractLifeCycle: FAILED SelectChannelConnector@0.0.0.0:4040: java.net.BindException: Address already in use
Hi, I am using the sbt tool to build and run the scala tests related to spark. In my /src/test/scala directory, there are two test classes (TestA, TestB), both of which use the class in Spark for creating SparkContext, something like trait LocalTestSparkContext extends BeforeAndAfterAll { self: Suite => @transient var sc: SparkContext = _ override def beforeAll() { super.beforeAll() val conf = new SparkConf() .setMaster("local[2]") .setAppName("LocalSparkUnitTest") sc = new SparkContext(conf) } override def afterAll() { if (sc != null) { sc.stop() } super.afterAll() } } So, TestA and TestB are defined as class TestA extends FunSuite with LocalTestSparkContext class TestB extends FunSuite with LocalTestSparkContext However, when I built the project using sbt and ran sbt test, I got the following error. However, no error occurred if I only had one test. Is this related to the SparkContext? Only one sc should be active? However, I thought the LocalTestSparkContext should already take care of this since it stops sc at the end of each class. I am totally lost, could someone let me know what is the issue and how to resolve it? Thanks a lot. 15/01/14 14:12:43 WARN component.AbstractLifeCycle: FAILED SelectChannelConnector@0.0.0.0:4040: java.net.BindException: Address already in use: bind java.net.BindException: Address already in use: bind at sun.nio.ch.Net.bind0(Native Method) at sun.nio.ch.Net.bind(Unknown Source) at sun.nio.ch.Net.bind(Unknown Source) at sun.nio.ch.ServerSocketChannelImpl.bind(Unknown Source) at sun.nio.ch.ServerSocketAdaptor.bind(Unknown Source) at org.eclipse.jetty.server.nio.SelectChannelConnector.open(SelectChannelConnector.java:187) at org.eclipse.jetty.server.AbstractConnector.doStart(AbstractConnector.java:316) at org.eclipse.jetty.server.nio.SelectChannelConnector.doStart(SelectChannelConnector.java:265) at org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:64) at org.eclipse.jetty.server.Server.doStart(Server.java:293) at org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:64) at org.apache.spark.ui.JettyUtils$.org$apache$spark$ui$JettyUtils$$connect$1(JettyUtils.scala:195) at org.apache.spark.ui.JettyUtils$$anonfun$4.apply(JettyUtils.scala:205) at org.apache.spark.ui.JettyUtils$$anonfun$4.apply(JettyUtils.scala:205) at org.apache.spark.util.Utils$$anonfun$startServiceOnPort$1.apply$mcVI$sp(Utils.scala:1504) at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141) at org.apache.spark.util.Utils$.startServiceOnPort(Utils.scala:1495) at org.apache.spark.ui.JettyUtils$.startJettyServer(JettyUtils.scala:205) at org.apache.spark.ui.WebUI.bind(WebUI.scala:102) at org.apache.spark.SparkContext$$anonfun$10.apply(SparkContext.scala:234) at org.apache.spark.SparkContext$$anonfun$10.apply(SparkContext.scala:234) at scala.Option.foreach(Option.scala:236) at org.apache.spark.SparkContext.(SparkContext.scala:234) at com.unittest.LocalTestSparkContext$class.beforeAll(LocalTestSparkContext.scala:35) at com.unittestt.TestB.beforeAll(TestB.scala:14) at org.scalatest.BeforeAndAfterAll$class.beforeAll(BeforeAndAfterAll.scala:187) at com.unittest.TestB.beforeAll(TestB.scala:14) at org.scalatest.BeforeAndAfterAll$class.run(BeforeAndAfterAll.scala:253) at com.unittest.TestB.run(TestB.scala:14) at org.scalatest.tools.Framework.org $scalatest$tools$Framework$$runSuite(Framework.scala:444) at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:651) at sbt.TestRunner.runTest$1(TestFramework.scala:76) at 
sbt.TestRunner.run(TestFramework.scala:85) at sbt.TestFramework$$anon$2$$anonfun$$init$$1$$anonfun$apply$8.apply(TestFramework.scala:202) at sbt.TestFramework$$anon$2$$anonfun$$init$$1$$anonfun$apply$8.apply(TestFramework.scala:202) at sbt.TestFramework$.sbt$TestFramework$$withContextLoader(TestFramework.scala:185) at sbt.TestFramework$$anon$2$$anonfun$$init$$1.apply(TestFramework.scala:202) at sbt.TestFramework$$anon$2$$anonfun$$init$$1.apply(TestFramework.scala:202) at sbt.TestFunction.apply(TestFramework.scala:207) at sbt.Tests$$anonfun$9.apply(Tests.scala:216) at sbt.Tests$$anonfun$9.apply(Tests.scala:216) at sbt.std.Transform$$anon$3$$anonfun$apply$2.apply(System.scala:44) at sbt.std.Transform$$anon$3$$anonfun$apply$2.apply(System.scala:44) at sbt.std.Transform$$anon$4.work(System.scala:63) at sbt.Execute$$anonfun$submit$1$$anonfun$apply$1.apply(Execute.scala:226) at sbt.Execute$$anonfun$submit$1$$anonfun$apply$1.apply(Execute.scala:226) at sbt.ErrorHandling$.wideConvert(ErrorHandling.scala:17) at sbt.Execute.work(Execute.scala:235) at sbt.Execute$$anonfun$submit$1.apply(Execute.scala:226) at sbt.Execute$$anonfun$submit$1.apply(Execute.scala:226) at sbt.ConcurrentRestrictions$$anon$4$$anonfun$1.apply(ConcurrentRestrictions.scala:159) at sbt.CompletionService$$anon$2.call(CompletionService.scala:28) at java.util.concurrent.FutureTask.run(Unknown Source) at java.util.
java.lang.NoClassDefFoundError: io/netty/util/TimerTask Error when running sbt test
I am using Spark-1.1.1. When I ran "sbt test", I ran into the following exception. Any idea how to solve it? Thanks! I think somebody posted this question before, but no one seemed to have answered it. Could it be the version of "io.netty" I put in my build.sbt? I included a dependency "libraryDependencies += "io.netty" % "netty" % "3.6.6.Final"" in my build.sbt file. java.lang.NoClassDefFoundError: io/netty/util/TimerTask at org.apache.spark.storage.BlockManager.<init>(BlockManager.scala:72) at org.apache.spark.storage.BlockManager.<init>(BlockManager.scala:168) at org.apache.spark.SparkEnv$.create(SparkEnv.scala:230) at org.apache.spark.SparkContext.<init>(SparkContext.scala:204) at spark.jobserver.util.DefaultSparkContextFactory.makeContext(SparkContextFactory.scala:34) at spark.jobserver.JobManagerActor.createContextFromConfig(JobManagerActor.scala:255) at spark.jobserver.JobManagerActor$$anonfun$wrappedReceive$1.applyOrElse(JobManagerActor.scala:104) at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33) at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33) at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)...
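One hedged guess: io.netty.util.TimerTask lives in Netty 4.x, while the "netty" 3.6.6.Final artifact ships the older org.jboss.netty classes, so pinning only the 3.x jar does not supply the class Spark is looking for and may shadow the Netty 4 jar Spark pulls in. A build.sbt sketch under that assumption (the 4.0.23.Final version is illustrative; check which version your Spark build actually resolves):

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"  % "1.1.1",
  "org.apache.spark" %% "spark-mllib" % "1.1.1",
  // Netty 4 is what provides io.netty.util.TimerTask; the 3.x "netty"
  // artifact pinned above does not contain that class.
  "io.netty" % "netty-all" % "4.0.23.Final"
)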
including the spark-mllib in build.sbt
Hi, I am trying to build my own Scala project using sbt. The project depends on both spark-core and spark-mllib. I included the following two dependencies in my build.sbt file: libraryDependencies += "org.apache.spark" %% "spark-mllib" % "1.1.1" libraryDependencies += "org.apache.spark" %% "spark-core" % "1.1.1" However, when I ran the "package" command in sbt, I got an error message indicating that "object mllib is not a member of package org.apache.spark". Did I do anything wrong? Thanks, Jianguo
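For comparison, a minimal build.sbt sketch with a known-good shape; the project name is a placeholder and the Scala version is an assumption (Spark 1.1.x artifacts were published for Scala 2.10, so a scalaVersion outside that line would break the %% lookup). If the build file already looks like this, re-running sbt reload and update is worth checking as well.

// build.sbt
name := "my-spark-project"

scalaVersion := "2.10.4"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"  % "1.1.1",
  "org.apache.spark" %% "spark-mllib" % "1.1.1"
)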
confidence/probability for prediction in MLlib
Hi, A while ago, somebody asked about getting a confidence value for a prediction from MLlib's Naive Bayes classifier. I was wondering if there is any plan in the near future for the predict function to return both a label and a confidence/probability? Or could the private variables in the various machine learning models be exposed so we could write our own functions that return both? Having a confidence/probability could be very useful in real applications. For one thing, you can choose to trust the predicted label only if it has a high confidence level. Also, if you want to combine the results from multiple classifiers, the confidence/probability could be used as some kind of weight for combining. Thanks, Jianguo
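This does not cover the Naive Bayes case, but for reference, MLlib's binary logistic regression (and SVM) models can already return a raw score instead of a 0/1 label by clearing the decision threshold; a minimal sketch with hypothetical training data and an illustrative feature vector:

import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.RDD

val training: RDD[LabeledPoint] = ???   // hypothetical training data

val model = new LogisticRegressionWithLBFGS().run(training)

// With the threshold cleared, predict() returns the class-1 probability
// rather than a thresholded 0/1 label, which can serve as a confidence.
model.clearThreshold()
val confidence = model.predict(Vectors.dense(0.5, 1.2, -0.3))   // values are illustrative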