How to specify the numFeatures in HashingTF
Hi, HashingTF has a parameter called "numFeatures". What is the best way to set its value? For text categorization, do you need to know the number of words in your vocabulary in advance, or do you simply set it to a large value, greater than the number of words in your vocabulary? Thanks, Jianguo
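The trade-off behind numFeatures can be sketched without Spark. The snippet below is only an illustration of the hashing trick, not HashingTF's actual implementation: `indexOf` and `collisions` are hypothetical helpers, and the synthetic vocabulary is an assumption. It shows that the vocabulary does not have to be known in advance, and that a larger numFeatures means fewer distinct terms end up sharing a bucket.

```scala
object HashingTrickSketch {
  // Map a term to a bucket in [0, numFeatures). Hypothetical helper that
  // mirrors the idea of the hashing trick: hash the term, then take a
  // non-negative modulus.
  def indexOf(term: String, numFeatures: Int): Int = {
    val raw = term.hashCode % numFeatures
    if (raw < 0) raw + numFeatures else raw
  }

  // Number of distinct terms that do not get a bucket of their own.
  def collisions(vocab: Seq[String], numFeatures: Int): Int = {
    val distinctTerms = vocab.distinct
    distinctTerms.size - distinctTerms.map(indexOf(_, numFeatures)).distinct.size
  }

  def main(args: Array[String]): Unit = {
    // Synthetic vocabulary of 10,000 terms (an assumption for illustration).
    val vocab = (1 to 10000).map(i => s"word$i")
    for (bits <- Seq(10, 14, 18, 20)) {
      val n = 1 << bits
      println(s"numFeatures = 2^$bits: ${collisions(vocab, n)} colliding terms")
    }
  }
}
```

In practice this suggests picking a value comfortably larger than the expected vocabulary size, commonly a power of two, and checking whether accuracy changes when it is increased.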
Re: workaround for groupByKey
Thanks. Yes, unfortunately, they all need to be grouped. I guess I can partition the records by user id. However, I have millions of users; do you think partitioning by user id will help? Jianguo

On Mon, Jun 22, 2015 at 6:28 PM, Silvio Fiorito silvio.fior...@granturing.com wrote:

You’re right of course, I’m sorry. I was typing before thinking about what you actually asked! On second thought, what is the ultimate outcome you want the sequence of pages for? Do they actually all need to be grouped? Could you instead partition by user id and then use mapPartitions, perhaps?

From: Jianguo Li Date: Monday, June 22, 2015 at 6:21 PM To: Silvio Fiorito Cc: user@spark.apache.org Subject: Re: workaround for groupByKey

Thanks for your suggestion. I guess aggregateByKey is similar to combineByKey. I read in Learning Spark:

*We can disable map-side aggregation in combineByKey() if we know that our data won’t benefit from it. For example, groupByKey() disables map-side aggregation as the aggregation function (appending to a list) does not save any space. If we want to disable map-side combines, we need to specify the partitioner; for now you can just use the partitioner on the source RDD by passing rdd.partitioner*

It seems that when the map-side aggregation function appends to a list (as opposed to, say, summing numbers), map-side aggregation offers no benefit, since appending to a list does not save any space. Is my understanding correct? Thanks, Jianguo

On Mon, Jun 22, 2015 at 4:43 PM, Silvio Fiorito silvio.fior...@granturing.com wrote:

You can use aggregateByKey as one option:

    val input: RDD[(Int, String)] = ...
    val test = input.aggregateByKey(ListBuffer.empty[String])((a, b) => a += b, (a, b) => a ++ b)
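The point about map-side aggregation in this thread can be illustrated without Spark. The sketch below is an assumption-laden simulation, not Spark code: partitions are plain Scala sequences, and `valuesShuffledAsLists` and `valuesShuffledAsCounts` are hypothetical helpers that count how many values would still have to cross the network after a per-partition combine.

```scala
object MapSideCombineSketch {
  type Partition = Seq[(String, String)]

  // Map-side combine that appends every value to a per-key list.
  def combineToLists(p: Partition): Map[String, List[String]] =
    p.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2).toList }

  // Map-side combine that keeps only a count per key.
  def combineToCounts(p: Partition): Map[String, Int] =
    p.groupBy(_._1).map { case (k, kvs) => k -> kvs.size }

  // Values left to shuffle when the combiner builds lists:
  // every original value is still present inside some list.
  def valuesShuffledAsLists(partitions: Seq[Partition]): Int =
    partitions.map(p => combineToLists(p).values.map(_.size).sum).sum

  // Values left to shuffle when the combiner reduces to one number per key.
  def valuesShuffledAsCounts(partitions: Seq[Partition]): Int =
    partitions.map(p => combineToCounts(p).size).sum

  def main(args: Array[String]): Unit = {
    val partitions: Seq[Partition] = Seq(
      Seq("u1" -> "a.com", "u1" -> "b.com", "u2" -> "a.com"),
      Seq("u1" -> "c.com", "u2" -> "d.com", "u2" -> "e.com")
    )
    println(s"raw values:           ${partitions.map(_.size).sum}")
    println(s"after list combine:   ${valuesShuffledAsLists(partitions)}")
    println(s"after count combine:  ${valuesShuffledAsCounts(partitions)}")
  }
}
```

With list-building the number of values to move is unchanged, which matches the book's remark that such a combiner saves no space; a reducing combiner such as a count or sum shrinks each partition to one value per key.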
workaround for groupByKey
Hi, I am processing an RDD of key-value pairs. The key is a user_id, and the value is a website URL the user has visited. Since I need to know all the URLs each user has visited, I am tempted to call groupByKey on this RDD. However, since there could be millions of users and URLs, the shuffle caused by groupByKey proves to be a major bottleneck. Is there any workaround? I want to end up with an RDD of key-value pairs, where the key is a user_id and the value is a list of all the URLs visited by that user. Thanks, Jianguo
spark ml model info
Hi, I am training a model using the logistic regression algorithm in ML. I was wondering if there is any API to access the weight vector (i.e., the coefficient for each feature). I need those coefficients for real-time predictions. Thanks, Jianguo
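Once the coefficients and the intercept have been read from a trained model, scoring outside Spark is a small computation. The sketch below assumes they are available as plain arrays and doubles; `LogisticScorer` and its parameter names are hypothetical, and the exact accessor on the Spark model depends on the Spark version.

```scala
object LogisticScorer {
  // Probability of the positive class for a single feature vector.
  def predictProbability(weights: Array[Double],
                         intercept: Double,
                         features: Array[Double]): Double = {
    require(weights.length == features.length,
      "weights and features must have the same length")
    val margin = weights.zip(features).map { case (w, x) => w * x }.sum + intercept
    1.0 / (1.0 + math.exp(-margin))
  }

  // Hard label obtained by thresholding the probability (0.5 by default).
  def predictLabel(weights: Array[Double],
                   intercept: Double,
                   features: Array[Double],
                   threshold: Double = 0.5): Double =
    if (predictProbability(weights, intercept, features) > threshold) 1.0 else 0.0

  def main(args: Array[String]): Unit = {
    // Made-up coefficients, for illustration only.
    val weights = Array(0.8, -1.2, 0.3)
    val intercept = 0.1
    val features = Array(1.0, 0.5, 2.0)
    println(predictProbability(weights, intercept, features))
    println(predictLabel(weights, intercept, features))
  }
}
```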
feature scaling in GeneralizedLinearAlgorithm.scala
Hi, In GeneralizedLinearAlgorithm, which logistic regression relies on, the comments say that if useFeatureScaling is enabled, the training features are standardized and the model is trained in the scaled space; the coefficients are then transformed from the scaled space back to the original space. My understanding is that we therefore do not need to scale the test data, since the coefficients are already in the original space. Is this correct? Thanks, Jianguo
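The reasoning in the question can be checked with a few lines of arithmetic. The sketch below assumes scaling divides each feature by its standard deviation, without mean centering, and ignores the intercept; the helper names are hypothetical. Under that assumption, a coefficient learned in the scaled space maps back as w_original = w_scaled / std, and the margin on raw test features equals the margin on scaled test features.

```scala
object ScaledCoefficientsSketch {
  def dot(a: Array[Double], b: Array[Double]): Double =
    a.zip(b).map { case (x, y) => x * y }.sum

  // Scale features by 1/std (no mean centering: an assumption of this sketch).
  def scale(x: Array[Double], std: Array[Double]): Array[Double] =
    x.zip(std).map { case (v, s) => v / s }

  // Map coefficients learned in the scaled space back to the original space.
  def toOriginalSpace(wScaled: Array[Double], std: Array[Double]): Array[Double] =
    wScaled.zip(std).map { case (w, s) => w / s }

  def main(args: Array[String]): Unit = {
    val std = Array(2.0, 4.0)      // per-feature standard deviations (made up)
    val wScaled = Array(1.0, 2.0)  // coefficients in the scaled space (made up)
    val x = Array(4.0, 8.0)        // an unscaled test point

    val marginScaled = dot(wScaled, scale(x, std))
    val marginOriginal = dot(toOriginalSpace(wScaled, std), x)
    println(s"scaled space: $marginScaled, original space: $marginOriginal")
  }
}
```

Both margins agree, so with coefficients already mapped back, the test data is used unscaled.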
Does the kFold in Spark always give you the same split?
Hi, I am using the utility function kFold provided in Spark to do k-fold cross validation with logistic regression. However, each time I run the experiment, I get a different result. Since everything else stays constant, I was wondering if this is due to the kFold function. Does anyone know whether kFold gives you a different split of a data set each time you call it? Thanks, Jianguo
Re: Does the kFold in Spark always give you the same split?
Thanks. I did specify a seed parameter, so it seems the problem is not caused by kFold. I actually ran another experiment without cross validation: I just built a model with the training data and then tested it on the test data. However, the accuracy still varies from one run to another. Interestingly, this only happens when I run the experiment on our cluster; on my local machine, I can reproduce the result each time. Has anybody encountered a similar issue before? Thanks, Jianguo

On Fri, Jan 30, 2015 at 11:22 AM, Sean Owen so...@cloudera.com wrote:

Have a look at the source code for MLUtils.kFold. Yes, there is a random element. That's good; you want the folds to be randomly chosen. Note there is a seed parameter, as in a lot of the APIs, that lets you fix the RNG seed and so get the same result every time, if you need to.
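The role of the seed mentioned in this thread can be sketched in plain Scala. This is not how MLUtils.kFold is implemented; `assignFolds` is a hypothetical helper that assigns each element to a fold with a seeded random number generator. It shows that the same seed on the same input in the same order reproduces the split. The same seed applied to the input in a different order generally does not, which may be worth checking when the data is loaded or partitioned differently between runs (an assumption, not something the thread confirms).

```scala
import scala.util.Random

object SeededFoldsSketch {
  // Assign each element to one of numFolds folds using a seeded RNG.
  def assignFolds[T](data: Seq[T], numFolds: Int, seed: Long): Seq[(T, Int)] = {
    val rng = new Random(seed)
    data.map(x => (x, rng.nextInt(numFolds)))
  }

  def main(args: Array[String]): Unit = {
    val data = (1 to 20).toVector
    val first = assignFolds(data, 3, 42L)
    val second = assignFolds(data, 3, 42L)
    println(s"same seed, same order gives the same folds: ${first == second}")

    // Same seed but reversed input: the element-to-fold mapping usually changes.
    val reversed = assignFolds(data.reverse, 3, 42L).toMap
    println(s"same seed, reversed order gives the same folds: ${first.toMap == reversed}")
  }
}
```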
unit tests with java.io.IOException: Could not create FileClient
Hi, I created some unit tests for functions in my project that use Spark. However, when I built the project with sbt and ran sbt test, I ran into java.io.IOException: Could not create FileClient:

2015-01-19 08:50:38,1894 ERROR Client fs/client/fileclient/cc/client.cc:385 Thread: -2 Failed to initialize client for cluster 127.0.0.1:7222, error Unknown error(108)
num lines: 21
[info] TextFileAdapterTestSuite:
[info] - Checking the RDD Vector Length *** FAILED ***
[info] java.io.IOException: Could not create FileClient
[info] at com.mapr.fs.MapRFileSystem.lookupClient(MapRFileSystem.java:351)
[info] at com.mapr.fs.MapRFileSystem.lookupClient(MapRFileSystem.java:363)
[info] at com.mapr.fs.MapRFileSystem.getMapRFileStatus(MapRFileSystem.java:795)
[info] at com.mapr.fs.MapRFileSystem.getFileStatus(MapRFileSystem.java:822)
[info] at org.apache.hadoop.fs.FileSystem.getFileStatus(FileSystem.java:1419)
[info] at org.apache.hadoop.fs.FileSystem.globStatusInternal(FileSystem.java:1092)
[info] at org.apache.hadoop.fs.FileSystem.globStatus(FileSystem.java:1031)
[info] at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:231)
[info] at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:277)
[info] at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:199)
[info] ...

The only tests that failed, and which I believe led to this exception, are the ones where my functions call SparkContext's textFile(). I tried to debug this and found that the exception seems to occur within textFile(). Does anybody know what the issue is and how to fix it? I used the local host as the master for the SparkContext; does that have anything to do with this exception? Thanks, Jianguo
component.AbstractLifeCycle: FAILED SelectChannelConnector@0.0.0.0:4040: java.net.BindException: Address already in use
Hi, I am using sbt to build and run Scala tests that use Spark. In my /src/test/scala directory there are two test classes (TestA, TestB), both of which mix in a trait that creates the SparkContext, something like:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.scalatest.{BeforeAndAfterAll, Suite}

    trait LocalTestSparkContext extends BeforeAndAfterAll { self: Suite =>
      @transient var sc: SparkContext = _

      // Create one local SparkContext before any test in the suite runs.
      override def beforeAll() {
        super.beforeAll()
        val conf = new SparkConf()
          .setMaster("local[2]")
          .setAppName("LocalSparkUnitTest")
        sc = new SparkContext(conf)
      }

      // Stop the SparkContext once all tests in the suite have finished.
      override def afterAll() {
        if (sc != null) {
          sc.stop()
        }
        super.afterAll()
      }
    }

TestA and TestB are defined as:

    class TestA extends FunSuite with LocalTestSparkContext
    class TestB extends FunSuite with LocalTestSparkContext

However, when I built the project with sbt and ran sbt test, I got the error below. No error occurred when I had only one test class. Is this related to the SparkContext? Should only one sc be active at a time? I thought LocalTestSparkContext already took care of this, since it stops sc at the end of each class. I am totally lost; could someone let me know what the issue is and how to resolve it? Thanks a lot.
15/01/14 14:12:43 WARN component.AbstractLifeCycle: FAILED SelectChannelConnector@0.0.0.0:4040: java.net.BindException: Address already in use: bind
java.net.BindException: Address already in use: bind
    at sun.nio.ch.Net.bind0(Native Method)
    at sun.nio.ch.Net.bind(Unknown Source)
    at sun.nio.ch.Net.bind(Unknown Source)
    at sun.nio.ch.ServerSocketChannelImpl.bind(Unknown Source)
    at sun.nio.ch.ServerSocketAdaptor.bind(Unknown Source)
    at org.eclipse.jetty.server.nio.SelectChannelConnector.open(SelectChannelConnector.java:187)
    at org.eclipse.jetty.server.AbstractConnector.doStart(AbstractConnector.java:316)
    at org.eclipse.jetty.server.nio.SelectChannelConnector.doStart(SelectChannelConnector.java:265)
    at org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:64)
    at org.eclipse.jetty.server.Server.doStart(Server.java:293)
    at org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:64)
    at org.apache.spark.ui.JettyUtils$.org$apache$spark$ui$JettyUtils$$connect$1(JettyUtils.scala:195)
    at org.apache.spark.ui.JettyUtils$$anonfun$4.apply(JettyUtils.scala:205)
    at org.apache.spark.ui.JettyUtils$$anonfun$4.apply(JettyUtils.scala:205)
    at org.apache.spark.util.Utils$$anonfun$startServiceOnPort$1.apply$mcVI$sp(Utils.scala:1504)
    at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
    at org.apache.spark.util.Utils$.startServiceOnPort(Utils.scala:1495)
    at org.apache.spark.ui.JettyUtils$.startJettyServer(JettyUtils.scala:205)
    at org.apache.spark.ui.WebUI.bind(WebUI.scala:102)
    at org.apache.spark.SparkContext$$anonfun$10.apply(SparkContext.scala:234)
    at org.apache.spark.SparkContext$$anonfun$10.apply(SparkContext.scala:234)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.SparkContext.init(SparkContext.scala:234)
    at com.unittest.LocalTestSparkContext$class.beforeAll(LocalTestSparkContext.scala:35)
    at com.unittestt.TestB.beforeAll(TestB.scala:14)
    at org.scalatest.BeforeAndAfterAll$class.beforeAll(BeforeAndAfterAll.scala:187)
    at com.unittest.TestB.beforeAll(TestB.scala:14)
    at org.scalatest.BeforeAndAfterAll$class.run(BeforeAndAfterAll.scala:253)
    at com.unittest.TestB.run(TestB.scala:14)
    at org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:444)
    at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:651)
    at sbt.TestRunner.runTest$1(TestFramework.scala:76)
    at sbt.TestRunner.run(TestFramework.scala:85)
    at sbt.TestFramework$$anon$2$$anonfun$$init$$1$$anonfun$apply$8.apply(TestFramework.scala:202)
    at sbt.TestFramework$$anon$2$$anonfun$$init$$1$$anonfun$apply$8.apply(TestFramework.scala:202)
    at sbt.TestFramework$.sbt$TestFramework$$withContextLoader(TestFramework.scala:185)
    at sbt.TestFramework$$anon$2$$anonfun$$init$$1.apply(TestFramework.scala:202)
    at sbt.TestFramework$$anon$2$$anonfun$$init$$1.apply(TestFramework.scala:202)
    at sbt.TestFunction.apply(TestFramework.scala:207)
    at sbt.Tests$$anonfun$9.apply(Tests.scala:216)
    at sbt.Tests$$anonfun$9.apply(Tests.scala:216)
    at sbt.std.Transform$$anon$3$$anonfun$apply$2.apply(System.scala:44)
    at sbt.std.Transform$$anon$3$$anonfun$apply$2.apply(System.scala:44)
    at sbt.std.Transform$$anon$4.work(System.scala:63)
    at sbt.Execute$$anonfun$submit$1$$anonfun$apply$1.apply(Execute.scala:226)
    at sbt.Execute$$anonfun$submit$1$$anonfun$apply$1.apply(Execute.scala:226)
    at sbt.ErrorHandling$.wideConvert(ErrorHandling.scala:17)
    at sbt.Execute.work(Execute.scala:235)
    at sbt.Execute$$anonfun$submit$1.apply(Execute.scala:226)
    at sbt.Execute$$anonfun$submit$1.apply(Execute.scala:226)
    at sbt.ConcurrentRestrictions$$anon$4$$anonfun$1.apply(ConcurrentRestrictions.scala:159)
    at sbt.CompletionService$$anon$2.call(CompletionService.scala:28)
    at java.util.concurrent.FutureTask.run(Unknown Source)
    at
Re: component.AbstractLifeCycle: FAILED SelectChannelConnector@0.0.0.0:4040: java.net.BindException: Address already in use
I solved the issue. In case anyone else is looking for an answer: by default, sbt runs the test suites in parallel, so both suites can create their SparkContext at the same time. To disable this, put the following line in your build.sbt:

    parallelExecution in Test := false

Thanks
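For reference, the setting from the reply above as it would appear in a build file. The first form is the sbt 0.13 syntax used in the post; the commented-out line is the slash syntax newer sbt versions accept for the same setting. Which one applies depends on your sbt version.

```scala
// build.sbt
// Run test suites one after another so that only one suite at a time
// creates a SparkContext (sbt 0.13 syntax, as in the post).
parallelExecution in Test := false

// Equivalent slash syntax for sbt 1.x; use one form or the other, not both.
// Test / parallelExecution := false
```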
java.lang.NoClassDefFoundError: io/netty/util/TimerTask Error when running sbt test
I am using Spark 1.1.1. When I ran sbt test, I got the following exception. Any idea how to solve it? Thanks! I think somebody posted this question before, but no one seems to have answered it. Could it be the version of io.netty I put in my build.sbt? I included the dependency

    libraryDependencies += "io.netty" % "netty" % "3.6.6.Final"

in my build.sbt file.

java.lang.NoClassDefFoundError: io/netty/util/TimerTask
    at org.apache.spark.storage.BlockManager.init(BlockManager.scala:72)
    at org.apache.spark.storage.BlockManager.init(BlockManager.scala:168)
    at org.apache.spark.SparkEnv$.create(SparkEnv.scala:230)
    at org.apache.spark.SparkContext.init(SparkContext.scala:204)
    at spark.jobserver.util.DefaultSparkContextFactory.makeContext(SparkContextFactory.scala:34)
    at spark.jobserver.JobManagerActor.createContextFromConfig(JobManagerActor.scala:255)
    at spark.jobserver.JobManagerActor$$anonfun$wrappedReceive$1.applyOrElse(JobManagerActor.scala:104)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
    ...
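One possible explanation, offered as a guess rather than a confirmed diagnosis: the missing class lives in the io.netty package, which is the Netty 4 namespace, while the Netty 3.x artifact ("io.netty" % "netty") ships its classes under org.jboss.netty and so cannot supply io.netty.util.TimerTask. A build.sbt sketch that puts a Netty 4 jar on the test classpath might look like the following; the netty-all version is an assumption and should match whatever your Spark build expects.

```scala
// build.sbt sketch; versions are assumptions, adjust to your setup
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.1.1"

// Netty 4.x provides the io.netty.* packages, including io.netty.util.TimerTask.
// The 3.x artifact ("io.netty" % "netty") only provides org.jboss.netty.*.
libraryDependencies += "io.netty" % "netty-all" % "4.0.23.Final" // assumed version
```

If the class is still missing, inspecting the resolved test classpath for conflicting or evicted Netty versions would be the next thing to check.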
including the spark-mllib in build.sbt
Hi, I am trying to build my own Scala project using sbt. The project depends on both spark-core and spark-mllib. I included the following two dependencies in my build.sbt file:

    libraryDependencies += "org.apache.spark" %% "spark-mllib" % "1.1.1"
    libraryDependencies += "org.apache.spark" %% "spark-core" % "1.1.1"

However, when I run the package command in sbt, I get an error message saying that object mllib is not a member of package org.apache.spark. Did I do anything wrong? Thanks, Jianguo
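For comparison, a minimal build.sbt that declares both dependencies is sketched below. The project name, version, and Scala patch version are assumptions; the relevant points are that the group, artifact, and version are quoted strings, that `%%` appends the Scala binary version to the artifact name, and that scalaVersion is therefore set to a Scala release the Spark 1.1.1 artifacts were published for (2.10.x). Whether any of these is the cause of the error in the post is not confirmed.

```scala
// build.sbt sketch; name, version, and Scala patch version are assumptions
name := "my-spark-project"

version := "0.1"

scalaVersion := "2.10.4"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"  % "1.1.1",
  "org.apache.spark" %% "spark-mllib" % "1.1.1"
)
```

After changing the file, running `reload` and then `update` in the sbt shell makes sure the new dependencies are actually resolved before `package` is run again.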
confidence/probability for prediction in MLlib
Hi, A while ago, somebody asked about getting a confidence value for a prediction from MLlib's implementation of Naive Bayes classification. I was wondering if there is any plan in the near future for the predict function to return both a label and a confidence/probability. Alternatively, could the private variables in the various machine learning models be exposed so we could write our own functions that return both? Having a confidence/probability could be very useful in real applications. For one thing, you can choose to trust the predicted label only if it has a high confidence level. Also, if you want to combine the results from multiple classifiers, the confidence/probability could be used as a kind of weight when combining them. Thanks, Jianguo
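To make the request concrete, here is a sketch of what such a function could compute for a multinomial naive Bayes model, assuming the model's parameters are available as plain arrays. The names `pi` (log class priors), `theta` (log feature probabilities per class), and `labels` are assumptions of this sketch, as are both helper functions; nothing here is an existing MLlib API.

```scala
object NaiveBayesPosteriorSketch {
  // pi(c): log prior of class c; theta(c)(j): log probability of feature j given class c.
  // Returns the normalized posterior probability of each class.
  def posteriors(pi: Array[Double],
                 theta: Array[Array[Double]],
                 features: Array[Double]): Array[Double] = {
    val logScores = pi.indices.map { c =>
      pi(c) + theta(c).zip(features).map { case (t, x) => t * x }.sum
    }
    // Subtract the maximum before exponentiating, for numerical stability.
    val maxLog = logScores.max
    val unnormalized = logScores.map(s => math.exp(s - maxLog))
    val total = unnormalized.sum
    unnormalized.map(_ / total).toArray
  }

  // Returns the most probable label together with its posterior probability.
  def predictWithConfidence(labels: Array[Double],
                            pi: Array[Double],
                            theta: Array[Array[Double]],
                            features: Array[Double]): (Double, Double) = {
    val probs = posteriors(pi, theta, features)
    val best = probs.indices.maxBy(probs(_))
    (labels(best), probs(best))
  }

  def main(args: Array[String]): Unit = {
    // Made-up two-class model with a single feature.
    val labels = Array(0.0, 1.0)
    val pi = Array(math.log(0.5), math.log(0.5))
    val theta = Array(Array(math.log(0.9)), Array(math.log(0.1)))
    println(predictWithConfidence(labels, pi, theta, Array(1.0)))
  }
}
```

The returned probability could then serve as the trust threshold or combination weight described in the message, with the usual caveat that naive Bayes posteriors tend to be poorly calibrated.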