Re: [Spark] RDDs are not persisting in memory
Hello team. I found and resolved the issue. In case anyone runs into the same problem, here is what it was.

Each node was allocated 1397 MB of memory for storage:

16/10/11 13:16:58 INFO storage.MemoryStore: MemoryStore started with capacity 1397.3 MB

However, my RDD exceeded the storage limit (although it says it had only computed 1224.3 MB so far):

16/10/11 13:18:36 WARN storage.MemoryStore: Not enough space to cache rdd_6_0 in memory! (computed 1224.3 MB so far)
16/10/11 13:18:36 INFO storage.MemoryStore: Memory use = 331.8 KB (blocks) + 1224.3 MB (scratch space shared across 2 tasks(s)) = 1224.6 MB. Storage limit = 1397.3 MB.

Therefore, I repartitioned the RDDs for better memory utilisation, which resolved the issue.

Kind regards,
Guru

On 11 October 2016 at 11:23, diplomatic Guru <diplomaticg...@gmail.com> wrote:
> @Song, I have called an action but it did not cache, as you can see in the
> provided screenshot on my original email. It has cached to disk but not
> memory.
>
> @Chin Wei Low, I have 15GB memory allocated, which is more than the dataset
> size.
>
> Any other suggestion please?
>
> Kind regards,
> Guru
>
> On 11 October 2016 at 03:34, Chin Wei Low <lowchin...@gmail.com> wrote:
>> Hi,
>>
>> Your RDD is 5GB; perhaps it is too large to fit into the executor's storage
>> memory. You can refer to the Executors tab in the Spark UI to check the
>> available storage memory for each executor.
>>
>> Regards,
>> Chin Wei
>>
>> On Tue, Oct 11, 2016 at 6:14 AM, diplomatic Guru <diplomaticg...@gmail.com> wrote:
>>> Hello team,
>>>
>>> Spark version: 1.6.0
>>>
>>> I'm trying to persist some data in memory for reuse. However, when I call
>>> rdd.cache() or rdd.persist(StorageLevel.MEMORY_ONLY()) it does not store
>>> the data: I cannot see any RDD information under the Web UI (Storage tab).
>>>
>>> Therefore I tried rdd.persist(StorageLevel.MEMORY_AND_DISK()), for which
>>> it stored the data to disk only, as shown in the screenshot below:
>>>
>>> [image: Inline images 2]
>>>
>>> Do you know why the memory is not being used?
>>>
>>> Is there a configuration at cluster level that stops jobs from storing data
>>> in memory altogether?
>>>
>>> Please let me know.
>>>
>>> Thanks
>>> Guru
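For anyone landing here from search: the fix above — repartitioning so that each cached block fits under the per-executor storage limit — comes down to simple sizing arithmetic. A sketch (the class, method names, and numbers are mine, not from the thread; the Spark calls are shown in comments):

```java
// Sizing helper behind the repartition fix: choose enough partitions that
// each cached block stays under the executor's storage capacity.
public final class RepartitionSizing {

    // Ceiling division: smallest partition count such that every block is
    // at most maxBlockMb (e.g. under the 1397.3 MB limit from the logs above).
    public static int partitionsFor(long rddSizeMb, long maxBlockMb) {
        return (int) Math.max(1, (rddSizeMb + maxBlockMb - 1) / maxBlockMb);
    }

    // In the job itself (sketch, assuming a JavaRDD<T> named rdd):
    //   JavaRDD<T> chunked = rdd.repartition(partitionsFor(5000, 1397))
    //                           .persist(StorageLevel.MEMORY_ONLY());
    //   chunked.count();  // persist is lazy; an action materialises the cache
}
```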
Re: [Spark] RDDs are not persisting in memory
@Song, I have called an action but it did not cache, as you can see in the provided screenshot on my original email. It has cached to disk but not memory.

@Chin Wei Low, I have 15GB memory allocated, which is more than the dataset size.

Any other suggestion please?

Kind regards,
Guru

On 11 October 2016 at 03:34, Chin Wei Low <lowchin...@gmail.com> wrote:
> Hi,
>
> Your RDD is 5GB; perhaps it is too large to fit into the executor's storage
> memory. You can refer to the Executors tab in the Spark UI to check the
> available storage memory for each executor.
>
> Regards,
> Chin Wei
>
> On Tue, Oct 11, 2016 at 6:14 AM, diplomatic Guru <diplomaticg...@gmail.com> wrote:
>> Hello team,
>>
>> Spark version: 1.6.0
>>
>> I'm trying to persist some data in memory for reuse. However, when I call
>> rdd.cache() or rdd.persist(StorageLevel.MEMORY_ONLY()) it does not store
>> the data: I cannot see any RDD information under the Web UI (Storage tab).
>>
>> Therefore I tried rdd.persist(StorageLevel.MEMORY_AND_DISK()), for which
>> it stored the data to disk only, as shown in the screenshot below:
>>
>> [image: Inline images 2]
>>
>> Do you know why the memory is not being used?
>>
>> Is there a configuration at cluster level that stops jobs from storing data
>> in memory altogether?
>>
>> Please let me know.
>>
>> Thanks
>> Guru
[Spark] RDDs are not persisting in memory
Hello team,

Spark version: 1.6.0

I'm trying to persist some data in memory for reuse. However, when I call rdd.cache() or rdd.persist(StorageLevel.MEMORY_ONLY()) it does not store the data: I cannot see any RDD information under the Web UI (Storage tab).

Therefore I tried rdd.persist(StorageLevel.MEMORY_AND_DISK()), for which it stored the data to disk only, as shown in the screenshot below:

[image: Inline images 2]

Do you know why the memory is not being used?

Is there a configuration at cluster level that stops jobs from storing data in memory altogether?

Please let me know.

Thanks
Guru
[Spark + MLlib] how to update offline model with the online model
Hello all,

I have built a Spark batch model using MLlib, and a streaming online model. Now I would like to load the offline model in the streaming job, then apply and update it. Could you please advise me how to do this? Is there an example to look at? The streaming model does not allow saving or loading a model; the primary functions it provides are trainOn and predictOn.

Thanks.
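One workaround (my suggestion, not confirmed anywhere in this thread) is to save the batch model offline and seed the streaming model with its weights via setInitialWeights, so the online updates continue from where the batch training left off. A Java sketch against the 1.x MLlib API; the path and the surrounding streams are placeholders:

```java
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.regression.LinearRegressionModel;
import org.apache.spark.mllib.regression.StreamingLinearRegressionWithSGD;

// Load the offline (batch) model previously saved with model.save(sc, path).
// 'sc' is the SparkContext (jsc.sc() from Java); the path is hypothetical.
LinearRegressionModel offline = LinearRegressionModel.load(sc, "hdfs:///models/offline-lr");
Vector warmStart = offline.weights();

// Seed the streaming model with the batch weights, then keep updating online.
StreamingLinearRegressionWithSGD online =
        new StreamingLinearRegressionWithSGD().setInitialWeights(warmStart);
online.trainOn(trainingStream);   // trainingStream: JavaDStream<LabeledPoint>, assumed in scope
online.predictOn(featureStream);  // featureStream: JavaDStream<Vector>, assumed in scope
```

The streaming model itself still cannot be saved, but the latest weights are reachable via online.latestModel(), which could be persisted between restarts.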
Fwd: [Spark + MLlib] How to prevent negative values in Linear regression?
Hello Sean,

Absolutely, there is nothing wrong with predicting negative values in general, but for my scenario I do not want to predict any negative value (also, all the data that is fed into the model is positive). Is there any way I could stop predicting negative values? I assume it is not possible, but I wanted to find out.

Thanks.

On 21 June 2016 at 13:55, Sean Owen <so...@cloudera.com> wrote:
> There's nothing inherently wrong with a regression predicting a
> negative value. What is the issue, more specifically?
>
> On Tue, Jun 21, 2016 at 1:38 PM, diplomatic Guru
> <diplomaticg...@gmail.com> wrote:
> > Hello all,
> >
> > I have a job for forecasting using linear regression, but sometimes I'm
> > getting a negative prediction. How do I prevent this?
> >
> > Thanks.
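Two pragmatic workarounds (my suggestions, not from Sean's reply): clamp the raw prediction at zero, or train on log-transformed labels so the inverse transform is strictly positive. Shown as pure helpers; the class and method names are illustrative:

```java
// Two ways to keep a regression prediction non-negative.
public final class NonNegative {

    // Option 1: clamp the raw prediction at zero after predicting.
    public static double clamp(double prediction) {
        return Math.max(0.0, prediction);
    }

    // Option 2: model log(label) instead of label. The labels are all
    // positive per the thread, so log is defined, and exp of any real
    // prediction is strictly positive, so it can never go negative.
    public static double toLogSpace(double label) {
        return Math.log(label);
    }

    public static double fromLogSpace(double logPrediction) {
        return Math.exp(logPrediction);
    }
}
```

Option 2 is usually preferable for count-like data (page views, visitors) because it also stops the model wasting capacity near zero.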
[Spark + MLlib] How to prevent negative values in Linear regression?
Hello all, I have a job for forecasting using linear regression, but sometimes I'm getting a negative prediction. How do I prevent this? Thanks.
StreamingLinearRegression Java example
Hello,

I'm trying to find an example of using StreamingLinearRegression in Java, but couldn't find any. There are examples for Scala but not for Java. Has anyone got an example that I could take a look at?

Thanks.
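For the record, a minimal Java sketch of the streaming API (untested; the input stream, text format, and feature dimension are placeholders of mine):

```java
import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.mllib.regression.LabeledPoint;
import org.apache.spark.mllib.regression.StreamingLinearRegressionWithSGD;
import org.apache.spark.streaming.api.java.JavaDStream;

// Parse a text stream of "label,f1,f2,f3" lines into LabeledPoints.
// 'lines' is assumed to be a JavaDStream<String>, e.g. from ssc.textFileStream(...).
JavaDStream<LabeledPoint> training = lines.map(line -> {
    String[] parts = line.split(",");
    double[] features = new double[parts.length - 1];
    for (int i = 1; i < parts.length; i++) {
        features[i - 1] = Double.parseDouble(parts[i]);
    }
    return new LabeledPoint(Double.parseDouble(parts[0]), Vectors.dense(features));
});

StreamingLinearRegressionWithSGD model = new StreamingLinearRegressionWithSGD()
        .setInitialWeights(Vectors.zeros(3))   // feature dimension assumed to be 3
        .setStepSize(0.1)
        .setNumIterations(10);

model.trainOn(training);                          // update weights on every batch
model.predictOn(training.map(lp -> lp.features())).print();
```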
Could we use Sparkling Water Lib with Spark Streaming
Hello all, I was wondering if it is possible to use H2O with Spark Streaming for online prediction?
H2O + Spark Streaming?
Hello all, I was wondering if it is possible to use H2O with Spark Streaming for online prediction?
Re: [Streaming + MLlib] Is it only Linear regression supported by online learning?
Could someone verify this for me?

On 8 March 2016 at 14:06, diplomatic Guru <diplomaticg...@gmail.com> wrote:
> Hello all,
>
> I'm using Random Forest for my machine learning (batch) job, and I would
> like to use online prediction in a Streaming job. However, the document
> only states a linear algorithm for regression jobs. Could we not use other
> algorithms?
[Streaming + MLlib] Is it only Linear regression supported by online learning?
Hello all,

I'm using Random Forest for my machine learning (batch) job, and I would like to use online prediction in a Streaming job. However, the document only states a linear algorithm for regression jobs. Could we not use other algorithms?
Re: [MLlib] How to set Loss to Gradient Boosted Tree in Java
Thank you very much Kevin.

On 29 February 2016 at 16:20, Kevin Mellott <kevin.r.mell...@gmail.com> wrote:
> I found a helper class that I think should do the trick. Take a look at
> https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/tree/loss/Losses.scala
>
> When passing the Loss, you should be able to do something like:
>
> Losses.fromString("leastSquaresError")
>
> On Mon, Feb 29, 2016 at 10:03 AM, diplomatic Guru <diplomaticg...@gmail.com> wrote:
>> It's strange, as you are correct that the doc does state it. But it's
>> complaining about the constructor.
>>
>> When I clicked on the org.apache.spark.mllib.tree.loss.AbsoluteError
>> class, this is what I see:
>>
>> @Since("1.2.0")
>> @DeveloperApi
>> object AbsoluteError extends Loss {
>>
>>   /**
>>    * Method to calculate the gradients for the gradient boosting
>>    * calculation for least absolute error calculation.
>>    * The gradient with respect to F(x) is: sign(F(x) - y)
>>    * @param prediction Predicted label.
>>    * @param label True label.
>>    * @return Loss gradient
>>    */
>>   @Since("1.2.0")
>>   override def gradient(prediction: Double, label: Double): Double = {
>>     if (label - prediction < 0) 1.0 else -1.0
>>   }
>>
>>   override private[mllib] def computeError(prediction: Double, label: Double): Double = {
>>     val err = label - prediction
>>     math.abs(err)
>>   }
>> }
>>
>> On 29 February 2016 at 15:49, Kevin Mellott <kevin.r.mell...@gmail.com> wrote:
>>> Looks like it should be present in 1.3 at
>>> org.apache.spark.mllib.tree.loss.AbsoluteError
>>>
>>> spark.apache.org/docs/1.3.0/api/java/org/apache/spark/mllib/tree/loss/AbsoluteError.html
>>>
>>> On Mon, Feb 29, 2016 at 9:46 AM, diplomatic Guru <diplomaticg...@gmail.com> wrote:
>>>> AbsoluteError() constructor is undefined.
>>>>
>>>> I'm using Spark 1.3.0; maybe it is not ready for this version?
>>>>
>>>> On 29 February 2016 at 15:38, Kevin Mellott <kevin.r.mell...@gmail.com> wrote:
>>>>> I believe that you can instantiate an instance of the AbsoluteError
>>>>> class for the *Loss* object, since that object implements the Loss
>>>>> interface. For example:
>>>>>
>>>>> val loss = new AbsoluteError()
>>>>> boostingStrategy.setLoss(loss)
>>>>>
>>>>> On Mon, Feb 29, 2016 at 9:33 AM, diplomatic Guru <diplomaticg...@gmail.com> wrote:
>>>>>> Hi Kevin,
>>>>>>
>>>>>> Yes, I've set the boostingStrategy like that using the example. But
>>>>>> I'm not sure how to create and pass the Loss object.
>>>>>>
>>>>>> e.g.
>>>>>>
>>>>>> boostingStrategy.setLoss(..);
>>>>>>
>>>>>> Not sure how to pass the selected Loss.
>>>>>>
>>>>>> How do I set the Absolute Error in the setLoss() function?
>>>>>>
>>>>>> On 29 February 2016 at 15:26, Kevin Mellott <kevin.r.mell...@gmail.com> wrote:
>>>>>>> You can use the constructor that accepts a BoostingStrategy object,
>>>>>>> which will allow you to set the tree strategy (and other
>>>>>>> hyperparameters as well).
>>>>>>>
>>>>>>> *GradientBoostedTrees
>>>>>>> <http://spark.apache.org/docs/latest/api/java/org/apache/spark/mllib/tree/GradientBoostedTrees.html#GradientBoostedTrees(org.apache.spark.mllib.tree.configuration.BoostingStrategy)>*
>>>>>>> (BoostingStrategy
>>>>>>> <http://spark.apache.org/docs/latest/api/java/org/apache/spark/mllib/tree/configuration/BoostingStrategy.html>
>>>>>>> boostingStrategy)
>>>>>>>
>>>>>>> On Mon, Feb 29, 2016 at 9:21 AM, diplomatic Guru <diplomaticg...@gmail.com> wrote:
>>>>>>>> Hello guys,
>>>>>>>>
>>>>>>>> I think the default Loss algorithm is Squared Error for regression,
>>>>>>>> but how do I change that to Absolute Error in Java?
>>>>>>>>
>>>>>>>> Could you please show me an example?
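Pulling Kevin's pointer together into one Java snippet: AbsoluteError is a Scala singleton object, so it has no public constructor callable from Java (hence the compiler error above); the Losses.fromString helper returns the shared instance instead. A sketch (hyperparameters are illustrative):

```java
import org.apache.spark.mllib.tree.GradientBoostedTrees;
import org.apache.spark.mllib.tree.configuration.BoostingStrategy;
import org.apache.spark.mllib.tree.loss.Losses;
import org.apache.spark.mllib.tree.model.GradientBoostedTreesModel;

// Default regression strategy uses squared error; swap in absolute error.
BoostingStrategy boostingStrategy = BoostingStrategy.defaultParams("Regression");
boostingStrategy.setLoss(Losses.fromString("leastAbsoluteError"));
boostingStrategy.getTreeStrategy().setMaxDepth(5);   // illustrative hyperparameter

// trainingData: JavaRDD<LabeledPoint>, assumed in scope
GradientBoostedTreesModel model =
        GradientBoostedTrees.train(trainingData, boostingStrategy);
```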
Re: [MLlib] How to set Loss to Gradient Boosted Tree in Java
It's strange, as you are correct that the doc does state it. But it's complaining about the constructor.

When I clicked on the org.apache.spark.mllib.tree.loss.AbsoluteError class, this is what I see:

@Since("1.2.0")
@DeveloperApi
object AbsoluteError extends Loss {

  /**
   * Method to calculate the gradients for the gradient boosting
   * calculation for least absolute error calculation.
   * The gradient with respect to F(x) is: sign(F(x) - y)
   * @param prediction Predicted label.
   * @param label True label.
   * @return Loss gradient
   */
  @Since("1.2.0")
  override def gradient(prediction: Double, label: Double): Double = {
    if (label - prediction < 0) 1.0 else -1.0
  }

  override private[mllib] def computeError(prediction: Double, label: Double): Double = {
    val err = label - prediction
    math.abs(err)
  }
}

On 29 February 2016 at 15:49, Kevin Mellott <kevin.r.mell...@gmail.com> wrote:
> Looks like it should be present in 1.3 at
> org.apache.spark.mllib.tree.loss.AbsoluteError
>
> spark.apache.org/docs/1.3.0/api/java/org/apache/spark/mllib/tree/loss/AbsoluteError.html
>
> On Mon, Feb 29, 2016 at 9:46 AM, diplomatic Guru <diplomaticg...@gmail.com> wrote:
>> AbsoluteError() constructor is undefined.
>>
>> I'm using Spark 1.3.0; maybe it is not ready for this version?
>>
>> On 29 February 2016 at 15:38, Kevin Mellott <kevin.r.mell...@gmail.com> wrote:
>>> I believe that you can instantiate an instance of the AbsoluteError
>>> class for the *Loss* object, since that object implements the Loss
>>> interface. For example:
>>>
>>> val loss = new AbsoluteError()
>>> boostingStrategy.setLoss(loss)
>>>
>>> On Mon, Feb 29, 2016 at 9:33 AM, diplomatic Guru <diplomaticg...@gmail.com> wrote:
>>>> Hi Kevin,
>>>>
>>>> Yes, I've set the boostingStrategy like that using the example. But I'm
>>>> not sure how to create and pass the Loss object.
>>>>
>>>> e.g.
>>>>
>>>> boostingStrategy.setLoss(..);
>>>>
>>>> Not sure how to pass the selected Loss.
>>>>
>>>> How do I set the Absolute Error in the setLoss() function?
>>>>
>>>> On 29 February 2016 at 15:26, Kevin Mellott <kevin.r.mell...@gmail.com> wrote:
>>>>> You can use the constructor that accepts a BoostingStrategy object,
>>>>> which will allow you to set the tree strategy (and other hyperparameters
>>>>> as well).
>>>>>
>>>>> *GradientBoostedTrees
>>>>> <http://spark.apache.org/docs/latest/api/java/org/apache/spark/mllib/tree/GradientBoostedTrees.html#GradientBoostedTrees(org.apache.spark.mllib.tree.configuration.BoostingStrategy)>*
>>>>> (BoostingStrategy
>>>>> <http://spark.apache.org/docs/latest/api/java/org/apache/spark/mllib/tree/configuration/BoostingStrategy.html>
>>>>> boostingStrategy)
>>>>>
>>>>> On Mon, Feb 29, 2016 at 9:21 AM, diplomatic Guru <diplomaticg...@gmail.com> wrote:
>>>>>> Hello guys,
>>>>>>
>>>>>> I think the default Loss algorithm is Squared Error for regression,
>>>>>> but how do I change that to Absolute Error in Java?
>>>>>>
>>>>>> Could you please show me an example?
Re: [MLlib] How to set Loss to Gradient Boosted Tree in Java
AbsoluteError() constructor is undefined.

I'm using Spark 1.3.0; maybe it is not ready for this version?

On 29 February 2016 at 15:38, Kevin Mellott <kevin.r.mell...@gmail.com> wrote:
> I believe that you can instantiate an instance of the AbsoluteError class
> for the *Loss* object, since that object implements the Loss interface.
> For example:
>
> val loss = new AbsoluteError()
> boostingStrategy.setLoss(loss)
>
> On Mon, Feb 29, 2016 at 9:33 AM, diplomatic Guru <diplomaticg...@gmail.com> wrote:
>> Hi Kevin,
>>
>> Yes, I've set the boostingStrategy like that using the example. But I'm
>> not sure how to create and pass the Loss object.
>>
>> e.g.
>>
>> boostingStrategy.setLoss(..);
>>
>> Not sure how to pass the selected Loss.
>>
>> How do I set the Absolute Error in the setLoss() function?
>>
>> On 29 February 2016 at 15:26, Kevin Mellott <kevin.r.mell...@gmail.com> wrote:
>>> You can use the constructor that accepts a BoostingStrategy object,
>>> which will allow you to set the tree strategy (and other hyperparameters
>>> as well).
>>>
>>> *GradientBoostedTrees
>>> <http://spark.apache.org/docs/latest/api/java/org/apache/spark/mllib/tree/GradientBoostedTrees.html#GradientBoostedTrees(org.apache.spark.mllib.tree.configuration.BoostingStrategy)>*
>>> (BoostingStrategy
>>> <http://spark.apache.org/docs/latest/api/java/org/apache/spark/mllib/tree/configuration/BoostingStrategy.html>
>>> boostingStrategy)
>>>
>>> On Mon, Feb 29, 2016 at 9:21 AM, diplomatic Guru <diplomaticg...@gmail.com> wrote:
>>>> Hello guys,
>>>>
>>>> I think the default Loss algorithm is Squared Error for regression, but
>>>> how do I change that to Absolute Error in Java?
>>>>
>>>> Could you please show me an example?
[MLlib] How to set Loss to Gradient Boosted Tree in Java
Hello guys,

I think the default Loss algorithm is Squared Error for regression, but how do I change that to Absolute Error in Java?

Could you please show me an example?
Re: [MLlib] What is the best way to forecast the next month page visit?
Hi Jorge,

Thanks for the example. I managed to get the job to run, but the results are appalling. The best I could get is:

Test Mean Squared Error: 684.3709679595169
Learned regression tree model:
DecisionTreeModel regressor of depth 30 with 6905 nodes

I tried tweaking maxDepth and maxBins, but I couldn't get any better results. Do you know how I could improve the results?

On 5 February 2016 at 08:34, Jorge Machado <jom...@me.com> wrote:
> Hi,
>
> For example, an array:
>
> 3 categories: Nov, Dec, Jan.
>
> Nov = 1,0,0
> Dec = 0,1,0
> Jan = 0,0,1
>
> For the complete year you would have 12 categories, like Jan =
> 1,0,0,0,0,0,0,0,0,0,0,0
>
> Pages:
> PageA: 0,0,0,1
> PageB: 0,0,1,0
> PageC: 0,1,0,0
> PageD: 1,0,0,0
>
> If you are using a decision tree, I think you do not need to normalize the
> other values.
>
> You should have at the end, for January and PageA, something like:
>
> LabeledPoint (label, (0,0,1,0,0,01,1.0,2.0,3.0))
>
> Pass the LabeledPoint to the ML model, and test it.
>
> PS: label is what you want to predict.
>
> On 02/02/2016, at 20:44, diplomatic Guru <diplomaticg...@gmail.com> wrote:
>> Hi Jorge,
>>
>> Unfortunately, I couldn't transform the data as you suggested.
>>
>> This is what I get:
>>
>> +---+---------+-------------+
>> | id|pageIndex|      pageVec|
>> +---+---------+-------------+
>> |0.0|      3.0|    (3,[],[])|
>> |1.0|      0.0|(3,[0],[1.0])|
>> |2.0|      2.0|(3,[2],[1.0])|
>> |3.0|      1.0|(3,[1],[1.0])|
>> +---+---------+-------------+
>>
>> This is the snippet:
>>
>> JavaRDD<Row> jrdd = jsc.parallelize(Arrays.asList(
>>     RowFactory.create(0.0, "PageA", 1.0, 2.0, 3.0),
>>     RowFactory.create(1.0, "PageB", 4.0, 5.0, 6.0),
>>     RowFactory.create(2.0, "PageC", 7.0, 8.0, 9.0),
>>     RowFactory.create(3.0, "PageD", 10.0, 11.0, 12.0)
>> ));
>>
>> StructType schema = new StructType(new StructField[] {
>>     new StructField("id", DataTypes.DoubleType, false, Metadata.empty()),
>>     new StructField("page", DataTypes.StringType, false, Metadata.empty()),
>>     new StructField("Nov", DataTypes.DoubleType, false, Metadata.empty()),
>>     new StructField("Dec", DataTypes.DoubleType, false, Metadata.empty()),
>>     new StructField("Jan", DataTypes.DoubleType, false, Metadata.empty()) });
>>
>> DataFrame df = sqlContext.createDataFrame(jrdd, schema);
>>
>> StringIndexerModel indexer = new StringIndexer().setInputCol("page").setInputCol("Nov")
>>     .setInputCol("Dec").setInputCol("Jan").setOutputCol("pageIndex").fit(df);
>>
>> OneHotEncoder encoder = new OneHotEncoder().setInputCol("pageIndex").setOutputCol("pageVec");
>>
>> DataFrame indexed = indexer.transform(df);
>>
>> DataFrame encoded = encoder.transform(indexed);
>> encoded.select("id", "pageIndex", "pageVec").show();
>>
>> Could you please let me know what I'm doing wrong?
>>
>> PS: My cluster is running Spark 1.3.0, which doesn't support
>> StringIndexer or OneHotEncoder, but for testing this I've installed 1.6.0
>> on my local machine.
>>
>> Cheers.
>>
>> On 2 February 2016 at 10:25, Jorge Machado <jom...@me.com> wrote:
>>> Hi Guru,
>>>
>>> Any results? :)
>>>
>>> On 01/02/2016, at 14:34, diplomatic Guru <diplomaticg...@gmail.com> wrote:
>>>> Hi Jorge,
>>>>
>>>> Thank you for the reply and your example. I'll try your suggestion and
>>>> will let you know the outcome.
>>>>
>>>> Cheers
>>>>
>>>> On 1 February 2016 at 13:17, Jorge Machado <jom...@me.com> wrote:
>>>>> Hi Guru,
>>>>>
>>>>> So first transform your page names with OneHotEncoder
>>>>> (https://spark.apache.org/docs/latest/ml-features.html#onehotencoder),
>>>>> then do the same thing for months.
>>>>>
>>>>> You will end up with something like (the first three are the page name,
>>>>> the others the month):
>>>>>
>>>>> (0,0,1,0,0,1)
>>>>>
>>>>> Then you have your label, which is what you want to predict. At the end
>>>>> you will have a LabeledPoint with (1 -> (0,0,1,0,0,1)); this will
>>>>> represent (1 -> (PageA, UV_NOV)).
>>>>>
>>>>> After that, try a regression tree with:
>>>>>
>>>>> val model = DecisionTree.trainRegressor(trainingData,
>>>>>     categoricalFeaturesInfo, impurity, maxDepth, maxBins)
>>>>>
>>>>> Regards
>>>>> Jorge
>>>>>
>>>>> On 01/02/2016, at 12:29, di
Re: [MLlib] What is the best way to forecast the next month page visit?
Hi Jorge,

Unfortunately, I couldn't transform the data as you suggested.

This is what I get:

+---+---------+-------------+
| id|pageIndex|      pageVec|
+---+---------+-------------+
|0.0|      3.0|    (3,[],[])|
|1.0|      0.0|(3,[0],[1.0])|
|2.0|      2.0|(3,[2],[1.0])|
|3.0|      1.0|(3,[1],[1.0])|
+---+---------+-------------+

This is the snippet:

JavaRDD<Row> jrdd = jsc.parallelize(Arrays.asList(
    RowFactory.create(0.0, "PageA", 1.0, 2.0, 3.0),
    RowFactory.create(1.0, "PageB", 4.0, 5.0, 6.0),
    RowFactory.create(2.0, "PageC", 7.0, 8.0, 9.0),
    RowFactory.create(3.0, "PageD", 10.0, 11.0, 12.0)
));

StructType schema = new StructType(new StructField[] {
    new StructField("id", DataTypes.DoubleType, false, Metadata.empty()),
    new StructField("page", DataTypes.StringType, false, Metadata.empty()),
    new StructField("Nov", DataTypes.DoubleType, false, Metadata.empty()),
    new StructField("Dec", DataTypes.DoubleType, false, Metadata.empty()),
    new StructField("Jan", DataTypes.DoubleType, false, Metadata.empty()) });

DataFrame df = sqlContext.createDataFrame(jrdd, schema);

StringIndexerModel indexer = new StringIndexer().setInputCol("page").setInputCol("Nov")
    .setInputCol("Dec").setInputCol("Jan").setOutputCol("pageIndex").fit(df);

OneHotEncoder encoder = new OneHotEncoder().setInputCol("pageIndex").setOutputCol("pageVec");

DataFrame indexed = indexer.transform(df);

DataFrame encoded = encoder.transform(indexed);
encoded.select("id", "pageIndex", "pageVec").show();

Could you please let me know what I'm doing wrong?

PS: My cluster is running Spark 1.3.0, which doesn't support StringIndexer or OneHotEncoder, but for testing this I've installed 1.6.0 on my local machine.

Cheers.

On 2 February 2016 at 10:25, Jorge Machado <jom...@me.com> wrote:
> Hi Guru,
>
> Any results? :)
>
> On 01/02/2016, at 14:34, diplomatic Guru <diplomaticg...@gmail.com> wrote:
>> Hi Jorge,
>>
>> Thank you for the reply and your example. I'll try your suggestion and
>> will let you know the outcome.
>>
>> Cheers
>>
>> On 1 February 2016 at 13:17, Jorge Machado <jom...@me.com> wrote:
>>> Hi Guru,
>>>
>>> So first transform your page names with OneHotEncoder
>>> (https://spark.apache.org/docs/latest/ml-features.html#onehotencoder),
>>> then do the same thing for months.
>>>
>>> You will end up with something like (the first three are the page name,
>>> the others the month):
>>>
>>> (0,0,1,0,0,1)
>>>
>>> Then you have your label, which is what you want to predict. At the end
>>> you will have a LabeledPoint with (1 -> (0,0,1,0,0,1)); this will
>>> represent (1 -> (PageA, UV_NOV)).
>>>
>>> After that, try a regression tree with:
>>>
>>> val model = DecisionTree.trainRegressor(trainingData,
>>>     categoricalFeaturesInfo, impurity, maxDepth, maxBins)
>>>
>>> Regards
>>> Jorge
>>>
>>> On 01/02/2016, at 12:29, diplomatic Guru <diplomaticg...@gmail.com> wrote:
>>>> Any suggestions please?
>>>>
>>>> On 29 January 2016 at 22:31, diplomatic Guru <diplomaticg...@gmail.com> wrote:
>>>>> Hello guys,
>>>>>
>>>>> I'm trying to understand how I could predict the next month's page
>>>>> views based on the previous access pattern.
>>>>>
>>>>> For example, I've collected statistics on page views:
>>>>>
>>>>> e.g.
>>>>> Page,UniqueView
>>>>> ---------------
>>>>> pageA,1
>>>>> pageB,999
>>>>> ...
>>>>> pageZ,200
>>>>>
>>>>> I aggregate the statistics monthly.
>>>>>
>>>>> I've prepared a file containing the last 3 months, like this:
>>>>>
>>>>> e.g.
>>>>> Page,UV_NOV,UV_DEC,UV_JAN
>>>>> -------------------------
>>>>> pageA,1,9989,11000
>>>>> pageB,999,500,700
>>>>> ...
>>>>> pageZ,200,50,34
>>>>>
>>>>> Based on the above information, I want to predict the next month (FEB).
>>>>>
>>>>> Which algorithm do you think will suit most? I think linear regression
>>>>> is the safe bet. However, I'm struggling to prepare this data for LR ML,
>>>>> especially how to prepare the X,Y relationship.
>>>>>
>>>>> The Y is easy (unique visitors), but I'm not sure about the X (it
>>>>> should be Page, right?). However, how do I plot those three months of
>>>>> data?
>>>>>
>>>>> Could you give me an example based on the above example data?
>>>>>
>>>>> Page,UV_NOV,UV_DEC,UV_JAN
>>>>> -------------------------
>>>>> 1,1,9989,11000
>>>>> 2,999,500,700
>>>>> ...
>>>>> 26,200,50,34
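For what it's worth, the empty row |0.0| 3.0|(3,[],[])| in the output above comes from the chained setInputCol calls: each call overwrites the previous one, so the StringIndexer ends up fitted on the "Jan" column rather than "page" (and the encoder then drops the last category). A corrected sketch against the 1.6-era spark.ml API, with one input column per transformer (column names as in the snippet):

```java
import org.apache.spark.ml.feature.OneHotEncoder;
import org.apache.spark.ml.feature.StringIndexer;
import org.apache.spark.ml.feature.StringIndexerModel;
import org.apache.spark.sql.DataFrame;

// Index only the categorical column; the numeric month columns stay as they are.
StringIndexerModel indexer = new StringIndexer()
        .setInputCol("page")       // exactly one input column
        .setOutputCol("pageIndex")
        .fit(df);                  // 'df' as built in the snippet above

OneHotEncoder encoder = new OneHotEncoder()
        .setInputCol("pageIndex")
        .setOutputCol("pageVec");

DataFrame encoded = encoder.transform(indexer.transform(df));
encoded.select("id", "pageIndex", "pageVec").show();
```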
Re: [MLlib] What is the best way to forecast the next month page visit?
Any suggestions please?

On 29 January 2016 at 22:31, diplomatic Guru <diplomaticg...@gmail.com> wrote:
> Hello guys,
>
> I'm trying to understand how I could predict the next month's page views
> based on the previous access pattern.
>
> For example, I've collected statistics on page views:
>
> e.g.
> Page,UniqueView
> ---------------
> pageA,1
> pageB,999
> ...
> pageZ,200
>
> I aggregate the statistics monthly.
>
> I've prepared a file containing the last 3 months, like this:
>
> e.g.
> Page,UV_NOV,UV_DEC,UV_JAN
> -------------------------
> pageA,1,9989,11000
> pageB,999,500,700
> ...
> pageZ,200,50,34
>
> Based on the above information, I want to predict the next month (FEB).
>
> Which algorithm do you think will suit most? I think linear regression is
> the safe bet. However, I'm struggling to prepare this data for LR ML,
> especially how to prepare the X,Y relationship.
>
> The Y is easy (unique visitors), but I'm not sure about the X (it should
> be Page, right?). However, how do I plot those three months of data?
>
> Could you give me an example based on the above example data?
>
> Page,UV_NOV,UV_DEC,UV_JAN
> -------------------------
> 1,1,9989,11000
> 2,999,500,700
> ...
> 26,200,50,34
[MLlib] What is the best way to forecast the next month page visit?
Hello guys,

I'm trying to understand how I could predict the next month's page views based on the previous access pattern.

For example, I've collected statistics on page views:

e.g.
Page,UniqueView
---------------
pageA,1
pageB,999
...
pageZ,200

I aggregate the statistics monthly.

I've prepared a file containing the last 3 months, like this:

e.g.
Page,UV_NOV,UV_DEC,UV_JAN
-------------------------
pageA,1,9989,11000
pageB,999,500,700
...
pageZ,200,50,34

Based on the above information, I want to predict the next month (FEB).

Which algorithm do you think will suit most? I think linear regression is the safe bet. However, I'm struggling to prepare this data for LR ML, especially how to prepare the X,Y relationship.

The Y is easy (unique visitors), but I'm not sure about the X (it should be Page, right?). However, how do I plot those three months of data?

Could you give me an example based on the above example data?

Page,UV_NOV,UV_DEC,UV_JAN
-------------------------
1,1,9989,11000
2,999,500,700
...
26,200,50,34
[Spark] Reading avro file in Spark 1.3.0
Hello guys,

I've been trying to read an Avro file using Spark's DataFrame API, but it's throwing this error:

java.lang.NoSuchMethodError: org.apache.spark.sql.SQLContext.read()Lorg/apache/spark/sql/DataFrameReader;

This is what I've done so far. I've added the dependency to pom.xml:

<dependency>
  <groupId>com.databricks</groupId>
  <artifactId>spark-avro_2.10</artifactId>
  <version>1.0.0</version>
</dependency>

Java code:

JavaSparkContext sc = new JavaSparkContext(sparkConf);
SQLContext sqlContext = new SQLContext(sc);
DataFrame df = sqlContext.read().format("com.databricks.spark.avro").load(args[0]);

Could you please let me know what I am doing wrong? Thanks.
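The likely cause (my reading, not confirmed in the archive): SQLContext.read() and DataFrameReader were only introduced in Spark 1.4, so on a 1.3.0 cluster the 1.4-style call fails at runtime with exactly this NoSuchMethodError. On 1.3 the equivalent is the older load method, roughly:

```java
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;

// 'sparkConf' and 'args' as in the snippet above.
JavaSparkContext sc = new JavaSparkContext(sparkConf);
SQLContext sqlContext = new SQLContext(sc);

// Spark 1.3.x API: load(path, source) predates DataFrameReader.
DataFrame df = sqlContext.load(args[0], "com.databricks.spark.avro");
```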
[SPARK] Obtaining metrics of an individual Spark job
Hello team,

I need to present the Spark job performance to my management. I could get the execution time by measuring the start and finish times of the job (which includes overhead). However, I'm not sure how to get the other metrics, e.g. CPU, I/O, memory, etc. I want to measure the individual job, not the whole cluster.

Please let me know the best way to do it; if there are any useful resources, then please provide links.

Thank you.
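One programmatic route (a sketch against the 1.x listener API; which metric fields to aggregate is my choice) is to register a listener and sum per-task metrics for the job:

```java
import java.util.concurrent.atomic.AtomicLong;

import org.apache.spark.JavaSparkListener;
import org.apache.spark.scheduler.SparkListenerTaskEnd;

// Aggregates executor run time and memory spill across all tasks of the job.
public class JobMetricsListener extends JavaSparkListener {
    public final AtomicLong totalRunTimeMs = new AtomicLong();
    public final AtomicLong totalBytesSpilled = new AtomicLong();

    @Override
    public void onTaskEnd(SparkListenerTaskEnd taskEnd) {
        if (taskEnd.taskMetrics() != null) {
            totalRunTimeMs.addAndGet(taskEnd.taskMetrics().executorRunTime());
            totalBytesSpilled.addAndGet(taskEnd.taskMetrics().memoryBytesSpilled());
        }
    }
}

// Register before running the job:
//   sc.sc().addSparkListener(new JobMetricsListener());
```

The same task-level metrics are also exposed per job/stage in the Spark UI and the event logs, which may be enough for a management report without any code.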
Obtaining metrics of an individual Spark job
Hello team,

I need to present the Spark job performance to my management. I could get the execution time by measuring the start and finish times of the job (which includes overhead). However, I'm not sure how to get the other metrics, e.g. CPU, I/O, memory, etc. I want to measure the individual job, not the whole cluster.

Please let me know the best way to do it; if there are any useful resources, then please provide links.

Thank you.
[Spark Streaming] How to clear old data from Stream State?
Hello,

I know how I could clear the old state depending on the input value: if some condition matches, determining that the state is old, then returning null will invalidate the record. But this is only feasible if a new record arrives that matches the old key. What if no new data arrives for the old data? How could I invalidate it?

e.g. a key/value arrives like this:

Key: 12-11-2015:10:00  Value: test,1,2,12-11-2015:10:00

The above key will be updated in the state. Every time there is a value for this '12-11-2015:10:00' key, it will be aggregated and updated. If the job is running 24/7, then this state will be kept forever until we restart the job. I could have a validation within the updateStateByKey function to check and delete the record if value[3] < SYSTIME-1, but this is only effective if a new record arrives that matches 12-11-2015:10:00 in the later days. What if no new values are received for the key 12-11-2015:10:00? I assume it will remain in the state, am I correct? If so, how do I clear the state?

Thank you.
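For the record, updateStateByKey invokes the update function for every existing key on every batch, including keys that received no new values in that batch, so a stale key can be expired without waiting for a matching new record. A sketch (the 24-hour TTL and state layout are my assumptions; the Spark wiring is shown in comments because the 1.x Java API returns Guava's Optional):

```java
// Pure expiry rule used inside an updateStateByKey function: a key with no
// new values whose last update is older than the TTL should be dropped.
public final class StateExpiry {
    public static final long TTL_MS = 24L * 60 * 60 * 1000;  // illustrative 24h cutoff

    public static boolean isExpired(boolean hasNewValues, long lastSeenMs, long nowMs) {
        return !hasNewValues && (nowMs - lastSeenMs) > TTL_MS;
    }

    // Spark wiring (sketch): inside the Function2 passed to updateStateByKey,
    // with the state carrying its own last-updated timestamp:
    //   boolean hasNew = !newValues.isEmpty();
    //   if (StateExpiry.isExpired(hasNew, lastSeenMs, System.currentTimeMillis()))
    //       return Optional.absent();   // removes the key from the state
    //   return Optional.of(updatedState);
}
```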
[SPARK STREAMING] multiple hosts and multiple ports for Stream job
Hello team,

I was wondering whether it is a good idea to have multiple hosts and multiple ports for a Spark Streaming job. Let's say there are two hosts and each has two ports. Is this a good idea? If it's not an issue, then what is the best way to do it? Currently, we pass them as a comma-separated argument.
How to enable debug in Spark Streaming?
I have an issue with a Spark Streaming job that appears to be running but not producing any results. Therefore, I would like to enable debug mode to get as much logging as possible.
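One way (standard log4j configuration, not specific to this thread) is to lower the logger thresholds in conf/log4j.properties, which must be in place on the driver and the executors:

```properties
# conf/log4j.properties — based on the log4j.properties.template shipped with Spark
log4j.rootCategory=INFO, console

# Verbose logging for Spark, and specifically for Spark Streaming's scheduler:
log4j.logger.org.apache.spark=DEBUG
log4j.logger.org.apache.spark.streaming=DEBUG
```

If I remember correctly, from Spark 1.4 on the same thing can be done at runtime from the driver with sparkContext.setLogLevel("DEBUG").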
[Spark Streaming] Why are some uncached RDDs growing?
Hello All,

When I checked my running streaming job on the WebUI, I can see that some RDDs are listed that were not requested to be cached. What's more, they are growing! What are they? Are they the state (updateStateByKey)?

Only the rows in white were requested to be cached; where are the RDDs highlighted in yellow from?
Re: [Spark Streaming] Connect to Database only once at the start of Streaming job
I know it uses a lazy model, which is why I was wondering.

On 27 October 2015 at 19:02, Uthayan Suthakar wrote:
> Hello all,
>
> What I wanted to do is configure the Spark Streaming job to read the
> database using JdbcRDD and cache the results. This should occur only once,
> at the start of the job. It should not make any further connections to the
> DB afterwards. Is it possible to do that?
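Because of that laziness, the usual trick is to force the read once and persist the result; afterwards the cached partitions are served from memory, and the database is only contacted again if a cached partition is evicted and has to be recomputed. A sketch with JdbcRDD.create (URL, query, bounds, and partition count are placeholders):

```java
import java.sql.DriverManager;
import java.sql.ResultSet;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.rdd.JdbcRDD;
import org.apache.spark.storage.StorageLevel;

// Read once at job start; 'jsc' is the JavaSparkContext, assumed in scope.
JavaRDD<String> reference = JdbcRDD.create(
        jsc,
        () -> DriverManager.getConnection("jdbc:mysql://dbhost/db", "user", "pass"),
        "SELECT name FROM lookup WHERE id >= ? AND id <= ?",
        1L, 1000000L, 4,                 // key bounds and partition count
        (ResultSet rs) -> rs.getString(1));

reference.persist(StorageLevel.MEMORY_ONLY());
reference.count();  // force the lazy JDBC read to happen now, once

// The streaming job can then join against 'reference' on every batch
// without re-querying the DB (unless a partition is lost and recomputed).
```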
How to check whether the RDD is empty or not
Hello All,

I have a Spark Streaming job that should do some action only if the RDD is not empty. This can be done easily with a Spark batch RDD, as I could call .take(1) and check whether it is empty or not. But this cannot be done with a Spark Streaming DStream:

JavaPairInputDStream<LongWritable, Text> input = ssc.fileStream(iFolder,
    LongWritable.class, Text.class, TextInputFormat.class);

if (inputLines != null) {
    //do some action if it is not empty
}

Any ideas please?
Re: How to check whether the RDD is empty or not
I tried the code below, but it still carries out the action even though there is no new data.

JavaPairInputDStream<LongWritable, Text> input = ssc.fileStream(iFolder,
    LongWritable.class, Text.class, TextInputFormat.class);

if (input != null) {
    //do some action if it is not empty
}

On 21 October 2015 at 18:00, diplomatic Guru <diplomaticg...@gmail.com> wrote:
> Hello All,
>
> I have a Spark Streaming job that should do some action only if the RDD is
> not empty. This can be done easily with a Spark batch RDD, as I could call
> .take(1) and check whether it is empty or not. But this cannot be done with
> a Spark Streaming DStream:
>
> JavaPairInputDStream<LongWritable, Text> input = ssc.fileStream(iFolder,
>     LongWritable.class, Text.class, TextInputFormat.class);
>
> if (inputLines != null) {
>     //do some action if it is not empty
> }
>
> Any ideas please?
Re: How to check whether the RDD is empty or not
Tathagata, thank you for the response. I have two receivers in my Spark Streaming job: one reads an endless stream of data from Flume, and the other reads data from an HDFS directory. However, files do not get moved into HDFS frequently (let's say a file gets moved every 10 minutes). This is why I need to check if there are any events in the HDFS stream before doing any action on it. RDD.isEmpty() is available on JavaRDD and JavaPairRDD but not on JavaDStream or JavaPairDStream; I could use foreachRDD and then check each RDD, but that is long-winded. On 21 October 2015 at 20:00, Tathagata Das <t...@databricks.com> wrote: > What do you mean by checking when a "DStream is empty"? A DStream represents > an endless stream of data, and at a point of time checking whether it is > empty or not does not make sense. > > FYI, there is RDD.isEmpty() > > On Wed, Oct 21, 2015 at 10:03 AM, diplomatic Guru < > diplomaticg...@gmail.com> wrote: [earlier messages snipped]
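As an aside (editor's sketch, not from the thread): RDD.isEmpty() amounts to checking whether take(1) yields anything. Stripped of Spark, the check looks like this in plain Java; `partitions` here is a hypothetical stand-in for an RDD's partition iterators:

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

public class EmptyCheck {
    // Mirrors the idea behind RDD.isEmpty(): scan partitions until one
    // yields a first element; the data is empty iff none of them does
    // (i.e. take(1) comes back with nothing).
    static <T> boolean isEmpty(List<Iterator<T>> partitions) {
        for (Iterator<T> p : partitions) {
            if (p.hasNext()) {
                return false; // found at least one element
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<Iterator<Integer>> empty =
                Collections.singletonList(Collections.<Integer>emptyIterator());
        List<Iterator<Integer>> nonEmpty =
                Collections.singletonList(Arrays.asList(1, 2, 3).iterator());
        System.out.println(isEmpty(empty));    // true
        System.out.println(isEmpty(nonEmpty)); // false
    }
}
```

Inside a streaming job this check would sit in a foreachRDD-style callback on each batch's RDD, rather than on the DStream handle itself.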
Re: How to calculate average from multiple values
Hi Robin, you are a star! Thank you for the explanation and example. I converted your code into Java without any hassle, and it is working as I expected. I carried out the final calculation (5th/6th fields) using mapValues, and it works nicely. But I was wondering, is there a better way to do it other than using mapValues? Cheers, Raj On 16 September 2015 at 20:13, Robin East <robin.e...@xense.co.uk> wrote: > One way is to use foldByKey, which is similar to reduceByKey but you supply > a 'zero' value for the start of the computation. The idea is to add an > extra element to the returned string to represent the count of the 5th > element. You can then use the 5th and 6th elements to calculate the mean. > The 'zero' value you supply to foldByKey is the all-zeros string > "0,0,0,0,0,0". > > Below is some example Scala code that implements this idea - I'm sure > Spark Java experts on the forum could turn this into the equivalent Java. > > initial.foldByKey("0,0,0,0,0,0")( (a, b) => { > val iFirst = a.split(",")(0).toInt > val iFirstB = b.split(",")(0).toInt > val iFifth = a.split(",")(4).toInt > val iFifthB = b.split(",")(4).toInt > val countA = if (a.split(",").size > 5) a.split(",")(5).toInt else 1 > val countB = if (b.split(",").size > 5) b.split(",")(5).toInt else 1 > s"${iFirst + iFirstB},0,0,0,${iFifth + iFifthB},${countA + countB}" > }).collect > > This returns a collection of keys and 6-element strings, where the 5th > element is the sum of all the fifth entries and the 6th element is the > running count of entries. > > --- > Robin East > *Spark GraphX in Action* Michael Malak and Robin East > Manning Publications Co. > http://www.manning.com/books/spark-graphx-in-action > > On 16 Sep 2015, at 15:46, diplomatic Guru <diplomaticg...@gmail.com> > wrote: [original question snipped]
How to calculate average from multiple values
have a mapper that emits key/value pairs (composite keys and composite values separated by commas). e.g. *key:* a,b,c,d *Value:* 1,2,3,4,5 *key:* a1,b1,c1,d1 *Value:* 5,4,3,2,1 ... ... *key:* a,b,c,d *Value:* 5,4,3,2,1 I could easily SUM these values using reduceByKey. e.g.

reduceByKey(new Function2<String, String, String>() {

    @Override
    public String call(String value1, String value2) {
        String oldValue[] = value1.split(",");
        String newValue[] = value2.split(",");

        int iFirst = Integer.parseInt(oldValue[0]) + Integer.parseInt(newValue[0]);
        int iSecond = Integer.parseInt(oldValue[1]) + Integer.parseInt(newValue[1]);
        int iThird = Integer.parseInt(oldValue[2]) + Integer.parseInt(newValue[2]);
        int iFourth = Integer.parseInt(oldValue[3]) + Integer.parseInt(newValue[3]);
        int iFifth = Integer.parseInt(oldValue[4]) + Integer.parseInt(newValue[4]);

        return iFirst + "," + iSecond + "," + iThird + "," + iFourth + "," + iFifth;
    }
});

But the problem is how do I find the average of just one of these values? Let's assume I want to SUM iFirst, iSecond, iThird and iFourth, but I want to find the average of iFifth. How do I do it? With simple key/value pairs I could use the mapValues function, but I'm not sure how I could do it with my example. Please advise.
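One common trick is to carry a running record count as an extra sixth field through the reduce and divide at the end. A plain-Java sketch (editor's addition; the method names are mine, and in the real job combine() would be the body of the reduceByKey function while withAverage() would be the mapValues step):

```java
public class AvgCombine {
    // Merge two comma-separated value strings of the form
    // "v1,v2,v3,v4,v5[,count]": sum all five values and track how many
    // records contributed via a sixth "count" field (absent means 1).
    static String combine(String a, String b) {
        String[] x = a.split(",");
        String[] y = b.split(",");
        int[] sum = new int[5];
        for (int i = 0; i < 5; i++) {
            sum[i] = Integer.parseInt(x[i]) + Integer.parseInt(y[i]);
        }
        int countA = x.length > 5 ? Integer.parseInt(x[5]) : 1;
        int countB = y.length > 5 ? Integer.parseInt(y[5]) : 1;
        return sum[0] + "," + sum[1] + "," + sum[2] + ","
                + sum[3] + "," + sum[4] + "," + (countA + countB);
    }

    // Final pass (the mapValues step): replace the fifth field with its
    // average, using the carried count, and drop the count field.
    static String withAverage(String v) {
        String[] x = v.split(",");
        int count = Integer.parseInt(x[5]);
        double avg = (double) Integer.parseInt(x[4]) / count;
        return x[0] + "," + x[1] + "," + x[2] + "," + x[3] + "," + avg;
    }

    public static void main(String[] args) {
        String merged = combine("1,2,3,4,5", "5,4,3,2,1"); // "6,6,6,6,6,2"
        System.out.println(withAverage(merged));           // "6,6,6,6,3.0"
    }
}
```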
Re: Is this a BUG? Why is the Spark Flume Streaming job not deploying the Receiver to the specified host?
Thank you Tathagata for your response. Yes, I'm using the push model on Spark 1.2. For my scenario I do prefer the push model. Is this also the case on the later version, 1.4? I think I can find a workaround for this issue, but only if I know how to obtain the worker (executor) ID. I can get the details of the driver like this: ss.ssc().env().blockManager().blockManagerId().host() But I'm not sure how I could get the executor ID from the driver. When the job is submitted, I can see the block manager being registered with the driver and executor IP addresses:

15/08/18 23:31:40 INFO YarnClientSchedulerBackend: Registered executor: Actor[akka.tcp://sparkExecutor@05151113997207:41630/user/Executor#1210147506] with ID 1
15/08/18 23:31:40 INFO RackResolver: Resolved 05151113997207 to /0513_R-0050/RJ05
15/08/18 23:31:41 INFO BlockManagerMasterActor: Registering block manager 05151113997207:56921 with 530.3 MB RAM, BlockManagerId(1, 05151113997207, 56921)

The BlockManagerMasterActor appears to be doing the registering. Is there any way I can access this from the SparkContext? Thanks. On 18 August 2015 at 22:40, Tathagata Das t...@databricks.com wrote: Are you using the Flume polling stream or the older stream? Such problems of binding used to occur in the older push-based approach, hence we built the polling stream (pull-based). On Tue, Aug 18, 2015 at 4:45 AM, diplomatic Guru diplomaticg...@gmail.com wrote: I'm testing the Flume + Spark integration example (flume count). I'm deploying the job using yarn cluster mode. I first logged into the Yarn cluster, then submitted the job, passing in a specific worker node's IP to deploy the job to. But when I checked the WebUI, it failed to bind to the specified IP because the receiver was deployed to a different host, not the one I asked for. Do you know why? For your information, I've also tried passing the IP address used by the resource manager to find resources, but no joy.
But when I set the host to 'localhost' and deploy it to the cluster, it binds to a worker node selected by the resource manager.
Re: Performance issue with Spark's foreachPartition method
Bagavath, sometimes we need to merge existing records due to recomputation of the whole dataset. I don't think we could achieve that with pure inserts, or is there a way? On 24 July 2015 at 08:53, Bagavath bagav...@gmail.com wrote: Try using insert instead of merge. Typically we use insert append to do bulk inserts to Oracle. On Thu, Jul 23, 2015 at 1:12 AM, diplomatic Guru diplomaticg...@gmail.com wrote: [earlier messages snipped]
Performance issue with Spark's foreachPartition method
Hello all, We are having a major performance issue with Spark, which is keeping us from going live. We have a job that carries out computation on log files and writes the results into an Oracle DB. The reducer 'reduceByKey' has been set to a parallelism of 4, as we don't want to establish too many DB connections. We then call foreachPartition on the RDD pairs that were reduced by key. Within this foreachPartition method we establish a DB connection, iterate the results, prepare the Oracle statement for batch insertion, then commit the batch and close the connection. All of this works fine. However, when we execute the job to process 12GB of data, it takes forever to complete, especially at the foreachPartition stage. We submitted the job with 6 executors, 2 cores, and 6GB memory, of which 0.3 is assigned to spark.storage.memoryFraction. The job takes about 50 minutes to complete, which is not ideal. I'm not sure how we could enhance the performance. I've provided the main body of the code; please take a look and advise:

From the driver:

reduceResultsRDD.foreachPartition(new DB.InsertFunction(dbuser, dbpass, batchsize));

DB class:

public class DB {
    private static final Logger logger = LoggerFactory.getLogger(DB.class);

    public static class InsertFunction
            implements VoidFunction<Iterator<Tuple2<String, String>>> {

        private static final long serialVersionUID = 55766876878L;
        private String dbuser = "";
        private String dbpass = "";
        private int batchsize;

        public InsertFunction(String dbuser, String dbpass, int batchsize) {
            super();
            this.dbuser = dbuser;
            this.dbpass = dbpass;
            this.batchsize = batchsize;
        }

        @Override
        public void call(Iterator<Tuple2<String, String>> results) {
            Connection connect = null;
            PreparedStatement pstmt = null;
            try {
                connect = getDBConnection(dbuser, dbpass);
                int count = 0;
                if (batchsize <= 0) {
                    batchsize = 1;
                }
                pstmt = connect.prepareStatement(
                        "MERGE INTO SOME TABLE IF RECORD FOUND, IF NOT INSERT");
                while (results.hasNext()) {
                    Tuple2<String, String> kv = results.next();
                    String[] data = kv._1.concat("," + kv._2).split(",");
                    pstmt.setString(1, data[0]);
                    pstmt.setString(2, data[1]);
                    // ... further setString calls elided ...
                    pstmt.addBatch();
                    count++;
                    if (count == batchsize) {
                        logger.info("BulkCount: " + count);
                        pstmt.executeBatch();
                        connect.commit();
                        count = 0;
                    }
                }
                // flush the final partial batch once, after the loop
                pstmt.executeBatch();
                connect.commit();
            } catch (Exception e) {
                logger.error("InsertFunction error: " + e.getMessage());
            } finally {
                try {
                    if (pstmt != null) {
                        pstmt.close();
                    }
                    if (connect != null) {
                        connect.close();
                    }
                } catch (SQLException e) {
                    logger.error("InsertFunction close error: " + e.getMessage());
                }
            }
        }
    }
}
Re: Performance issue with Spark's foreachPartition method
Thanks Robin for your reply. I'm pretty sure that writing to Oracle is what's taking so long, as writing to HDFS takes only ~5 minutes. The job writes about ~5 million records. I've set the job to call executeBatch() when the batch size reaches 200,000 records, so I assume commit is invoked for every 200K batch. In that case it should only call commit 25 times; is this too much? I wouldn't want to increase the batch size any further, as it may cause Java heap issues. I don't have much knowledge of the Oracle side, so any advice on the configuration would be appreciated. Thanks, Raj On 22 July 2015 at 20:20, Robin East robin.e...@xense.co.uk wrote: The first question I would ask is have you determined whether you have a performance issue writing to Oracle? In particular, how many commits are you making? If you are issuing a lot of commits that would be a performance problem. Robin On 22 Jul 2015, at 19:11, diplomatic Guru diplomaticg...@gmail.com wrote: [original message snipped]
Re: query on Spark + Flume integration using push model
Hi Akhil, thank you for your reply. Does that mean stock Spark Streaming only supports Avro? If that's the case, then why only Avro? Is there a particular reason? The project linked is in Scala, but I'm using Java. Is there an equivalent project? On 10 July 2015 at 08:46, Akhil Das ak...@sigmoidanalytics.com wrote: Here's an example https://github.com/przemek1990/spark-streaming Thanks Best Regards On Thu, Jul 9, 2015 at 4:35 PM, diplomatic Guru diplomaticg...@gmail.com wrote: [original message snipped]
query on Spark + Flume integration using push model
Hello all, I'm trying to configure Flume to push data into a sink so that my streaming job can pick up the data. My events are in JSON format, but the Spark + Flume integration document [1] only refers to an Avro sink. [1] https://spark.apache.org/docs/latest/streaming-flume-integration.html I looked at some examples online, and they all refer to the avro type: agent.sinks.avroSink.type = avro If I set the type to avro and send the data in JSON, will it work? I'm unable to try this because the streaming job throws an Avro 'org.apache.flume.source.avro.AvroFlumeEvent' exception. Please advise how to handle this situation. Many thanks
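For the record (editor's note, hedged): in the push model, 'avro' is only the wire protocol between Flume's sink and Spark's receiver; the event body is opaque bytes, so a JSON payload passes through untouched. A minimal agent config would look something like this (the agent, sink, and channel names are placeholders):

```properties
# Avro sink pointing at the host/port where the Spark Flume receiver listens
agent.sinks = sparkSink
agent.sinks.sparkSink.type = avro
agent.sinks.sparkSink.channel = memoryChannel
agent.sinks.sparkSink.hostname = <receiver-host>
agent.sinks.sparkSink.port = 4141
```

On the Spark side, FlumeUtils.createStream(ssc, host, port) then yields a stream of SparkFlumeEvent objects; as far as I recall, calling event().getBody() on each gives the raw bytes of the JSON to decode yourself.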
Spark performance issue
Hello guys, I'm after some advice on Spark performance. I have a MapReduce job that reads inputs, carries out a simple calculation, and writes the results into HDFS. I've implemented the same logic in a Spark job. When I tried both jobs on the same datasets, I got different execution times, which is expected. BUT... in my case, the MapReduce job is performing much better than Spark. The difference is that I'm not changing much in the MR job configuration, e.g. memory, cores, etc., but this is not the case with Spark, as it's very flexible. So I'm sure my configuration isn't correct, which is why MR is outperforming Spark, but I need your advice. For example: Test 1: 4.5GB data - the MR job took ~55 seconds to compute, but Spark took ~3 minutes and 20 seconds. Test 2: 25GB data - MR took 2 minutes and 15 seconds, whereas the Spark job is still running, and it's already been 15 minutes. I have a cluster of 15 nodes. The maximum memory that I could allocate to each executor is 6GB. Therefore, for Test 1, this is the config I used: --executor-memory 6G --num-executors 4 --driver-memory 6G --executor-cores 2 (I also set spark.storage.memoryFraction to 0.3) For Test 2: --executor-memory 6G --num-executors 10 --driver-memory 6G --executor-cores 2 (I also set spark.storage.memoryFraction to 0.3) I tried all possible combinations but couldn't get better performance. Any suggestions will be much appreciated.
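For what it's worth (editor's sketch), on a 15-node cluster the usual first experiment is to spread the same resources over more executors and raise the default parallelism. A submit line along these lines would be one such experiment; the values are illustrative only, and the class and jar names are hypothetical:

```shell
# Illustrative only: more executors and explicit parallelism
# (Spark 1.x-era flags; tune the numbers against your own cluster).
spark-submit \
  --master yarn-client \
  --class com.example.LogJob \
  --num-executors 14 \
  --executor-cores 2 \
  --executor-memory 6G \
  --driver-memory 2G \
  --conf spark.default.parallelism=120 \
  --conf spark.storage.memoryFraction=0.3 \
  logjob.jar /input /output
```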
load Java properties file in Spark
I want to store the Spark application arguments, such as the input file and output file, in a Java properties file and pass that file to the Spark driver. I'm using spark-submit to submit the job but couldn't find a parameter to pass the properties file with. Have you got any suggestions?
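One straightforward option (editor's sketch) is to read the file yourself in the driver with java.util.Properties, passing its path as an ordinary program argument after the application jar; spark-submit's --properties-file flag is, as far as I know, reserved for Spark's own spark.* configuration keys. The key names below are made up:

```java
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.util.Properties;

public class AppConfig {
    // Load application arguments (input/output paths etc.) from any
    // Reader. In the Spark driver you would call this at the top of
    // main() with a FileReader over the path passed as args[0].
    static Properties load(Reader source) {
        Properties props = new Properties();
        try {
            props.load(source);
        } catch (IOException e) {
            throw new RuntimeException("failed to read properties", e);
        }
        return props;
    }

    public static void main(String[] args) {
        // Inline stand-in for a real .properties file on disk.
        String file = "input.path=hdfs:///data/in\noutput.path=hdfs:///data/out\n";
        Properties props = load(new StringReader(file));
        System.out.println(props.getProperty("input.path"));
    }
}
```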
Spark job throwing “java.lang.OutOfMemoryError: GC overhead limit exceeded”
Hello All, I have a Spark job that throws "java.lang.OutOfMemoryError: GC overhead limit exceeded". The job is trying to process a file of size 4.5GB. I've tried the following Spark configuration: --num-executors 6 --executor-memory 6G --executor-cores 6 --driver-memory 3G I tried adding more cores and executors, which sometimes works, but then it takes over 20 minutes to process the file. Could I do something to improve the performance, or stop the Java heap issue? Thank you. Best regards, Raj.
Could Spark batch processing live within Spark Streaming?
Hello all, I was wondering if it is possible to have a high-latency batch processing job coexist within a Spark Streaming job? If it's possible, could we share the state of the batch job with the Spark Streaming job? For example, when the long-running batch computation is complete, could we inform the Spark Streaming side that the batch job is complete? Kind regards, Raj
Re: How do I access the SPARK SQL
Many thanks for your prompt reply. I'll try your suggestions and will get back to you. On 24 April 2014 18:17, Michael Armbrust mich...@databricks.com wrote: Oh, and you'll also need to add a dependency on spark-sql_2.10. On Thu, Apr 24, 2014 at 10:13 AM, Michael Armbrust mich...@databricks.com wrote: Yeah, you'll need to run `sbt publish-local` to push the jars to your local maven repository (~/.m2) and then depend on version 1.0.0-SNAPSHOT. On Thu, Apr 24, 2014 at 9:58 AM, diplomatic Guru diplomaticg...@gmail.com wrote: It's a simple application based on the People example. I'm using Maven for building, and below is the pom.xml. Perhaps I need to change the version?

<project>
  <groupId>Uthay.Test.App</groupId>
  <artifactId>test-app</artifactId>
  <modelVersion>4.0.0</modelVersion>
  <name>TestApp</name>
  <packaging>jar</packaging>
  <version>1.0</version>
  <repositories>
    <repository>
      <id>Akka repository</id>
      <url>http://repo.akka.io/releases</url>
    </repository>
  </repositories>
  <dependencies>
    <!-- Spark dependency -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.10</artifactId>
      <version>0.9.1</version>
    </dependency>
  </dependencies>
</project>

On 24 April 2014 17:47, Michael Armbrust mich...@databricks.com wrote: You shouldn't need to set SPARK_HIVE=true unless you want to use the JavaHiveContext. You should be able to access org.apache.spark.sql.api.java.JavaSQLContext with the default build. How are you building your application? Michael On Thu, Apr 24, 2014 at 9:17 AM, Andrew Or and...@databricks.com wrote: Did you build it with SPARK_HIVE=true? On Thu, Apr 24, 2014 at 7:00 AM, diplomatic Guru diplomaticg...@gmail.com wrote: Hi Matei, I checked out the git repository and built it. However, I'm still getting the error below. It couldn't find those SQL packages. Please advise.

package org.apache.spark.sql.api.java does not exist
[ERROR] /home/VirtualBoxImages.com/Documents/projects/errCount/src/main/java/errorCount/TransDriverSQL.java:[49,8] cannot find symbol
[ERROR] symbol : class JavaSchemaRDD

Kind regards, Raj. On 23 April 2014 22:09, Matei Zaharia matei.zaha...@gmail.com wrote: It's currently in the master branch, on https://github.com/apache/spark. You can check that out from git, build it with sbt/sbt assembly, and then try it out. We're also going to post some release candidates soon that will be pre-built. Matei On Apr 23, 2014, at 1:30 PM, diplomatic Guru diplomaticg...@gmail.com wrote: [original message snipped]
Re: How do I access the SPARK SQL
It worked!! Many thanks for your brilliant support. On 24 April 2014 18:20, diplomatic Guru diplomaticg...@gmail.com wrote: [earlier thread snipped]
How do I access the SPARK SQL
Hello Team, I'm new to Spark and just came across Spark SQL, which appears to be interesting, but I'm not sure how I could get it. I know it's an Alpha version, but I'm not sure if it's available to the community yet. Many thanks. Raj.