Re: [SparkSQL] too many open files although ulimit set to 1048576

2017-03-13 Thread darin
I think your settings are not taking effect.
Try adding `ulimit -n 10240` in spark-env.sh.




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/SparkSQL-too-many-open-files-although-ulimit-set-to-1048576-tp28490p28491.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: DataFrameWriter - Where to find list of Options applicable to particular format(datasource)

2017-03-13 Thread Hyukjin Kwon
Hi, all the options are documented in
https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrameWriter

It seems we don't have either option for writing. If the goal is trimming
the whitespace, I think we could do this with DataFrame operations (as
we discussed in the JIRA - https://issues.apache.org/jira/browse/SPARK-18579).
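For example, a minimal sketch of that approach (the input/output paths and the column name "name" are made up for illustration, and `spark` is an existing SparkSession):

import org.apache.spark.sql.functions.{col, trim}

val df = spark.read.option("header", "true").csv("/path/to/input.csv")
// Trim leading/trailing whitespace with a column expression instead of a writer option.
val trimmed = df.withColumn("name", trim(col("name")))
trimmed.write.option("header", "true").csv("/path/to/output")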



2017-03-14 9:20 GMT+09:00 Nirav Patel :

> Hi,
>
> Is there a document for each datasource (csv, tsv, parquet, json, avro)
> with available options ?  I need to find one for csv to
> "ignoreLeadingWhiteSpace" and "ignoreTrailingWhiteSpace"
>
> Thanks
>
>
>


Re: Monitoring ongoing Spark Job when run in Yarn Cluster mode

2017-03-13 Thread Marcelo Vanzin
It's linked from the YARN RM's Web UI (see the "Application Master"
link for the running application).

On Mon, Mar 13, 2017 at 6:53 AM, Sourav Mazumder
 wrote:
> Hi,
>
> Is there a way to monitor an ongoing Spark Job when running in Yarn Cluster
> mode ?
>
> In  my understanding in Yarn Cluster mode Spark Monitoring UI for the
> ongoing job would not be available in 4040 port. So is there an alternative
> ?
>
> Regards,
> Sourav



-- 
Marcelo

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Monitoring ongoing Spark Job when run in Yarn Cluster mode

2017-03-13 Thread Nirav Patel
I think it would be on port 4040 by default on the node where the driver is
running. You should be able to navigate to it via the Resource Manager's
application master link, since in cluster mode both the AM and the driver run
on the same node.


On Mon, Mar 13, 2017 at 6:53 AM, Sourav Mazumder <
sourav.mazumde...@gmail.com> wrote:

> Hi,
>
> Is there a way to monitor an ongoing Spark Job when running in Yarn
> Cluster mode ?
>
> In  my understanding in Yarn Cluster mode Spark Monitoring UI for the
> ongoing job would not be available in 4040 port. So is there an alternative
> ?
>
> Regards,
> Sourav
>




DataFrameWriter - Where to find list of Options applicable to particular format(datasource)

2017-03-13 Thread Nirav Patel
Hi,

Is there a document for each datasource (csv, tsv, parquet, json, avro)
with the available options? I need to find the csv options
"ignoreLeadingWhiteSpace" and "ignoreTrailingWhiteSpace".

Thanks




Online learning of LDA model in Spark (update an existing model)

2017-03-13 Thread matd
Hi folks,

I would like to train an LDA model in an online fashion, i.e. be able to
update the resulting model with new documents as they become available.

I understand that, under the hood, an online algorithm is implemented in
OnlineLDAOptimizer, but I don't understand from the API how I can update an
existing model with a new batch of docs.
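For context, here is roughly the kind of single training run I mean (a minimal sketch against the MLlib API; `corpus` is just a placeholder for an RDD[(Long, Vector)] of term-count vectors):

import org.apache.spark.mllib.clustering.{LDA, OnlineLDAOptimizer}
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.rdd.RDD

val corpus: RDD[(Long, Vector)] = ???  // placeholder: (document id, term counts)

// One pass of online variational Bayes over the corpus.
val ldaModel = new LDA().
  setK(20).
  setMaxIterations(50).
  setOptimizer(new OnlineLDAOptimizer().setMiniBatchFraction(0.05)).
  run(corpus)

What I can't see is how to feed a later batch of documents into the resulting model.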

Is it possible? Any hint or code sample would be appreciated.

Thanks
Mat



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Online-learning-of-LDA-model-in-Spark-update-an-existing-model-tp28489.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Java Examples @ Spark github

2017-03-13 Thread Mina Aslani
Hi,

When I go to GitHub and check the Java examples at
https://github.com/apache/spark/tree/master/examples/src/main/java/org/apache/spark/examples,
they do not look like they have been updated for the latest Spark (e.g. Spark 2.11).

Do you know by any chance where I can find the Java examples for Spark
2.11 (a book, GitHub, etc.)?

Regards,
Mina


Java.lang.IllegalArgumentException: requirement failed: Can only call getServletHandlers on a running MetricsSystem

2017-03-13 Thread Mina Aslani
Hi,

I get IllegalArgumentException: requirement failed: Can only call
getServletHandlers on a running MetricsSystem on the line marked below:

String master = "spark://:7077";

SparkConf sparkConf = new SparkConf()
    .setMaster(master)
    .setAppName(this.getClass().getSimpleName());

// ==> ERROR on the next line
JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.milliseconds(200));

Do I need to consider a specific setting? Please note that my Spark runs in a
Docker container inside a VM, so it is not running locally. I tried using
master="local[1]" and got the same problem.

Any idea?

Regards,
Mina


Re: Structured Streaming - Can I start using it?

2017-03-13 Thread Michael Armbrust
I think it's very, very unlikely that it will get withdrawn.  The primary
reason the APIs are still marked experimental is that we like to have
several releases before committing to interface stability (in particular,
the interfaces for writing custom sources and sinks are likely to evolve).
Also, there are currently quite a few limitations on the types of queries
we can run (e.g. multiple aggregations are disallowed, and we don't
support stream-stream joins yet).  In those cases, though, we explicitly say
it's not supported when you try to start your stream.

For the use cases that are supported in 2.1 (streaming ETL, event-time
aggregation, etc.), I'll say that we have been using it in production
for several months, and we have customers doing the same.
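For concreteness, a minimal sketch (the input path and schema are hypothetical) of the kind of query that falls in the supported bucket - streaming ETL plus event-time aggregation:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.window
import org.apache.spark.sql.types.{StructType, StructField, TimestampType, StringType}

val spark = SparkSession.builder.appName("EventTimeAgg").getOrCreate()
import spark.implicits._

val schema = StructType(Seq(
  StructField("eventTime", TimestampType),
  StructField("word", StringType)))

// Count words in 10-minute event-time windows over a file source.
val counts = spark.readStream.
  schema(schema).
  json("/path/to/input/dir").
  groupBy(window($"eventTime", "10 minutes"), $"word").
  count()

val query = counts.writeStream.
  outputMode("complete").
  format("console").
  start()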

On Mon, Mar 13, 2017 at 11:21 AM, Gaurav1809 
wrote:

> I read in spark documentation that Structured Streaming is still ALPHA in
> Spark 2.1 and the APIs are still experimental. Shall I use it to re write
> my
> existing spark streaming code? Looks like it is not yet production ready.
> What happens if Structured Streaming project gets withdrawn?
>
>
>
> --
> View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Structured-Streaming-Can-I-start-using-it-tp28488.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Structured Streaming - Can I start using it?

2017-03-13 Thread Gaurav1809
I read in the Spark documentation that Structured Streaming is still ALPHA in
Spark 2.1 and the APIs are still experimental. Shall I use it to rewrite my
existing Spark Streaming code? It looks like it is not yet production ready.
What happens if the Structured Streaming project gets withdrawn?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Structured-Streaming-Can-I-start-using-it-tp28488.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Monitoring ongoing Spark Job when run in Yarn Cluster mode

2017-03-13 Thread Sourav Mazumder
Hi,

Is there a way to monitor an ongoing Spark job when running in Yarn Cluster
mode?

In my understanding, in Yarn Cluster mode the Spark monitoring UI for the
ongoing job is not available on port 4040. So is there an alternative?

Regards,
Sourav


Re: Differences between scikit-learn and Spark.ml for regression toy problem

2017-03-13 Thread Dhanesh Padmanabhan
It also looks like you need to scale down the regularization for Linear
Regression by 1/2n, since the loss function is scaled by 1/2n (refer to the API
documentation for Linear Regression). I was able to get close enough
results after this modification.

--spark-ml code--

val linearModel = new LinearRegression().
  setRegParam(0.015).
  setLabelCol("label").
  setFeaturesCol("features").
  setTol(1e-12).
  setMaxIter(100).
  //setFitIntercept(false).
  //setStandardization(false).
  fit(sparkTrainingData)

println(s"Spark linear model coefficients: ${linearModel.coefficients}
Intercept: ${linearModel.intercept}")
// Spark linear model coefficients:
[0.21394341729353747,0.09257340293212045] Intercept: 0.5

--sklearn code--
# Linear regression is called Ridge in sklearn
e = Ridge(
    fit_intercept=True,
    alpha=l,
    max_iter=100,
    tol=1e-11)

e.fit(Xsc, y)

print e.coef_, e.intercept_
# => [ 0.21310109  0.09203616] 0.5


Dhanesh
+91-9741125245

On Mon, Mar 13, 2017 at 8:07 PM, Dhanesh Padmanabhan  wrote:

> [Edit] I got few details wrong in my eagerness to reply:
> 1. Spark uses the corrected standard deviation with sqrt(n-1), and scikit
> uses the one with sqrt(n).
> 2. You should scale down the regularization by sum of weights, in case you
> have a column of weights. When there are no weights, it is equivalent to
> sum of instances.
>
> Dhanesh
> +91-9741125245 <+91%2097411%2025245>
>
> On Mon, Mar 13, 2017 at 5:31 PM, Dhanesh Padmanabhan <
> dhanesh12...@gmail.com> wrote:
>
>> Hi Frank
>>
>> Thanks for this question. I have been comparing logistic regression in
>> sklearn with spark mllib as well. Your example code gave me a perfect way
>> to compare what is going on in both the packages.
>>
>> I looked at both the source codes. There are quite a few differences in
>> how the model fitting is done. I have a solution for the logistic
>> regression problem. I do not have a solution for the linear regression
>> problem yet.
>>
>> Here are the key differences:
>> 1. In spark, Regularization for L2 is divided by feature standard
>> deviation. In sklearn, it is not.
>> 2. In spark, X's are standardized. This changes the solution because of
>> regularization. In sklearn, no standardization is done.
>> 3. In Spark, Average log loss is used for training. The log loss is
>> averaged by sum of weights, which is the number of training instances.
>> Sklearn uses sum of log loss instead. So the spark regularization is very
>> heavy. You should scale down the regularization parameter by the number of
>> instances.
>>
>>
>> So, if you do the following, you should be able to match the outputs of
>> logistic regression:
>> 1. Standardize the spark and pandas dataframes in a similar fashion.
>> Note: The standardization in spark works a little differently for ensuring
>> unit variance - spark uses sqrt(n) as denominator, and sklearn's
>> standardscaler uses sqrt(n-1) (unbiased estimator when mean is not known)
>> 2. Scale down the regularization in spark by number of instances. Use
>> 0.03 in your example instead of 0.3, given you have 10 training instances.
>>
>> Hope this helps
>> -Dhanesh
>>
>> Spark ml code (I changed it to work with Spark 2.1):
>> 
>>
>> import org.apache.spark.{SparkConf, SparkContext}
>> import org.apache.spark.ml.classification.LogisticRegression
>> import org.apache.spark.ml.regression.LinearRegression
>> import org.apache.spark.ml.linalg.Vectors
>> import org.apache.spark.sql.SQLContext
>> import org.apache.spark.ml.feature.StandardScaler
>>
>> val sparkTrainingData_orig = new SQLContext(sc).
>>   createDataFrame(Seq(
>> (0.0, Vectors.dense(Array(-0.7306653538519616, 0.0))),
>> (1.0, Vectors.dense(Array(0.6750417712898752, -0.4232874171873786))),
>> (1.0, Vectors.dense(Array(0.1863463229359709, -0.8163423997075965))),
>> (0.0, Vectors.dense(Array(-0.6719842051493347, 0.0))),
>> (1.0, Vectors.dense(Array(0.9699938346531928, 0.0))),
>> (1.0, Vectors.dense(Array(0.22759406190283604, 0.0))),
>> (1.0, Vectors.dense(Array(0.9688721028330911, 0.0))),
>> (0.0, Vectors.dense(Array(0.5993795346650845, 0.0))),
>> (0.0, Vectors.dense(Array(0.9219423508390701, -0.8972778242305388))),
>> (0.0, Vectors.dense(Array(0.7006904841584055,
>> -0.5607635619919824))))).
>>   toDF("label", "features_orig")
>>
>> val sparkTrainingData = new StandardScaler().setWithMean(true).
>>   setInputCol("features_orig").setOutputCol("features").
>>   fit(sparkTrainingData_orig).transform(sparkTrainingData_orig)
>>
>> val logisticModel = new LogisticRegression().
>>   setRegParam(0.03).
>>   setLabelCol("label").
>>   setFeaturesCol("features").
>>   setTol(1e-12).
>>   setMaxIter(100).
>>   fit(sparkTrainingData)
>>
>> println(s"Spark logistic model coefficients:
>> ${logisticModel.coefficients} Intercept: ${logisticModel.intercept}")
>> // Spark logistic model coefficients: 
>> 

Re: Sorted partition ranges without overlap

2017-03-13 Thread Yong Zhang
You can implement your own partitioner based on your own logic.
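For illustration, a minimal sketch of such a partitioner, assuming integer keys in a known range [0, maxKey] (the class and variable names are made up); note that Spark's own sortByKey already uses a RangePartitioner, which gives you sorted, non-overlapping ranges out of the box:

import org.apache.spark.Partitioner

// Split the key range [0, maxKey] into contiguous, non-overlapping blocks,
// one block per partition.
class ContiguousRangePartitioner(numParts: Int, maxKey: Int) extends Partitioner {
  override def numPartitions: Int = numParts
  override def getPartition(key: Any): Int = {
    val k = key.asInstanceOf[Int]
    math.min(numParts - 1, k * numParts / (maxKey + 1))
  }
}

// Usage: keys 1..6 with 2 partitions end up as (1,2,3) and (4,5,6).
// val pairs = sc.parallelize(Seq(1, 2, 3, 4, 5, 6).map(k => (k, k)))
// val partitioned = pairs.
//   partitionBy(new ContiguousRangePartitioner(2, 6)).
//   mapPartitions(_.toSeq.sortBy(_._1).iterator, preservesPartitioning = true)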


Yong



From: Kristoffer Sjögren 
Sent: Monday, March 13, 2017 9:34 AM
To: user
Subject: Sorted partition ranges without overlap

Hi

I have an RDD that needs to be sorted lexicographically and
then processed by partition. The partitions should be split into
ranged blocks where sorted order is maintained and each partition
contains sequential, non-overlapping keys.

Given keys (1,2,3,4,5,6)

1. Correct
  - 2 partition = (1,2,3),(4,5,6).
  - 3 partition = (1,2),(3,4),(5,6)

2. Incorrect, the ranges overlap even though they're sorted.
  - 2 partitions (1,3,5) (2,4,6)
  - 3 partitions (1,3),(2,5),(4,6)


Is this possible with spark?

Cheers,
-Kristoffer

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: keep or remove sc.stop() coz of RpcEnv already stopped error

2017-03-13 Thread Alex
Hi,
I am using Spark 1.6. How can I ignore this warning? Because of this
IllegalStateException, my production jobs, which are scheduled, show as
completed abnormally... I can't even handle the exception, because after
sc.stop(), if I try to execute any code again, this exception is thrown from
the catch block. So I removed sc.stop() completely and let the application
exit on its own. Is that okay?

On Mon, Mar 13, 2017 at 7:01 PM, Yong Zhang  wrote:

> What version of Spark you are using?
>
>
> Based on Spark-12967, it is fixed on Spark 2.0 and later. If you are using
> Spark 1.x, you can ignore this Warning. It shouldn't affect any functions.
>
>
> Yong
>
> --
> *From:* nancy henry 
> *Sent:* Monday, March 13, 2017 7:08 AM
> *To:* user @spark
> *Subject:* keep or remove sc.stop() coz of RpcEnv already stopped error
>
>
>  Hi Team,
>
>
> getting this error if we put sc.stop() in application..
>
>
>
> can we remove it from application but i read if yu dont explicitly stop
> using sc.stop the yarn application will not get registered in history
> service.. SO what to do ?
>
>
>  WARN Dispatcher: Message RemoteProcessDisconnected dropped.
>
> java.lang.IllegalStateException: RpcEnv already stopped.
>
> at org.apache.spark.rpc.netty.Dispatcher.postMessage(
> Dispatcher.scala:159)
>
> at org.apache.spark.rpc.netty.Dispatcher.postToAll(
> Dispatcher.scala:109)
>
> at org.apache.spark.rpc.netty.NettyRpcHandler.
> connectionTerminated(NettyRpcEnv.scala:630)
>
> at org.apache.spark.network.server.TransportRequestHandler.
> channelUnregistered(TransportRequestHandler.java:94)
>
>
>
>
>


Re: Differences between scikit-learn and Spark.ml for regression toy problem

2017-03-13 Thread Dhanesh Padmanabhan
[Edit] I got a few details wrong in my eagerness to reply:
1. Spark uses the corrected standard deviation with sqrt(n-1), and scikit
uses the one with sqrt(n).
2. You should scale down the regularization by the sum of weights, in case you
have a column of weights. When there are no weights, it is equivalent to
the number of instances.

Dhanesh
+91-9741125245

On Mon, Mar 13, 2017 at 5:31 PM, Dhanesh Padmanabhan  wrote:

> Hi Frank
>
> Thanks for this question. I have been comparing logistic regression in
> sklearn with spark mllib as well. Your example code gave me a perfect way
> to compare what is going on in both the packages.
>
> I looked at both the source codes. There are quite a few differences in
> how the model fitting is done. I have a solution for the logistic
> regression problem. I do not have a solution for the linear regression
> problem yet.
>
> Here are the key differences:
> 1. In spark, Regularization for L2 is divided by feature standard
> deviation. In sklearn, it is not.
> 2. In spark, X's are standardized. This changes the solution because of
> regularization. In sklearn, no standardization is done.
> 3. In Spark, Average log loss is used for training. The log loss is
> averaged by sum of weights, which is the number of training instances.
> Sklearn uses sum of log loss instead. So the spark regularization is very
> heavy. You should scale down the regularization parameter by the number of
> instances.
>
>
> So, if you do the following, you should be able to match the outputs of
> logistic regression:
> 1. Standardize the spark and pandas dataframes in a similar fashion. Note:
> The standardization in spark works a little differently for ensuring unit
> variance - spark uses sqrt(n) as denominator, and sklearn's standardscaler
> uses sqrt(n-1) (unbiased estimator when mean is not known)
> 2. Scale down the regularization in spark by number of instances. Use 0.03
> in your example instead of 0.3, given you have 10 training instances.
>
> Hope this helps
> -Dhanesh
>
> Spark ml code (I changed it to work with Spark 2.1):
> 
>
> import org.apache.spark.{SparkConf, SparkContext}
> import org.apache.spark.ml.classification.LogisticRegression
> import org.apache.spark.ml.regression.LinearRegression
> import org.apache.spark.ml.linalg.Vectors
> import org.apache.spark.sql.SQLContext
> import org.apache.spark.ml.feature.StandardScaler
>
> val sparkTrainingData_orig = new SQLContext(sc).
>   createDataFrame(Seq(
> (0.0, Vectors.dense(Array(-0.7306653538519616, 0.0))),
> (1.0, Vectors.dense(Array(0.6750417712898752, -0.4232874171873786))),
> (1.0, Vectors.dense(Array(0.1863463229359709, -0.8163423997075965))),
> (0.0, Vectors.dense(Array(-0.6719842051493347, 0.0))),
> (1.0, Vectors.dense(Array(0.9699938346531928, 0.0))),
> (1.0, Vectors.dense(Array(0.22759406190283604, 0.0))),
> (1.0, Vectors.dense(Array(0.9688721028330911, 0.0))),
> (0.0, Vectors.dense(Array(0.5993795346650845, 0.0))),
> (0.0, Vectors.dense(Array(0.9219423508390701, -0.8972778242305388))),
> (0.0, Vectors.dense(Array(0.7006904841584055,
> -0.5607635619919824))))).
>   toDF("label", "features_orig")
>
> val sparkTrainingData = new StandardScaler().setWithMean(true).
>   setInputCol("features_orig").setOutputCol("features").
>   fit(sparkTrainingData_orig).transform(sparkTrainingData_orig)
>
> val logisticModel = new LogisticRegression().
>   setRegParam(0.03).
>   setLabelCol("label").
>   setFeaturesCol("features").
>   setTol(1e-12).
>   setMaxIter(100).
>   fit(sparkTrainingData)
>
> println(s"Spark logistic model coefficients: ${logisticModel.coefficients}
> Intercept: ${logisticModel.intercept}")
> // Spark logistic model coefficients: [0.8212244419577079,0.32615245441495727]
> Intercept: -0.011815325216668142
>
>
> Sklearn Code:
> -
>
> import numpy as np
> from sklearn.linear_model import LogisticRegression, Ridge
>
> X = np.array([
> [-0.7306653538519616, 0.0],
> [0.6750417712898752, -0.4232874171873786],
> [0.1863463229359709, -0.8163423997075965],
> [-0.6719842051493347, 0.0],
> [0.9699938346531928, 0.0],
> [0.22759406190283604, 0.0],
> [0.9688721028330911, 0.0],
> [0.5993795346650845, 0.0],
> [0.9219423508390701, -0.8972778242305388],
> [0.7006904841584055, -0.5607635619919824]
> ])
>
> y = np.array([
> 0.0,
> 1.0,
> 1.0,
> 0.0,
> 1.0,
> 1.0,
> 1.0,
> 0.0,
> 0.0,
> 0.0
> ])
>
> m, n = X.shape
>
> # Scale and Add intercept term to simulate inputs to GameEstimator
>
> from sklearn.preprocessing import StandardScaler
>
> # Adjust by factor sqrt(n-1)/sqrt(n) to take care of standard deviation
> formula differences
> Xsc=StandardScaler().fit_transform(X)*3/np.sqrt(10)
> Xsc_with_intercept = np.hstack((Xsc, np.ones(m)[:,np.newaxis]))
>
> l = 0.3
> e = LogisticRegression(
> fit_intercept=True,
> penalty='l2',
>  

Sorted partition ranges without overlap

2017-03-13 Thread Kristoffer Sjögren
Hi

I have an RDD that needs to be sorted lexicographically and
then processed by partition. The partitions should be split into
ranged blocks where sorted order is maintained and each partition
contains sequential, non-overlapping keys.

Given keys (1,2,3,4,5,6)

1. Correct
  - 2 partition = (1,2,3),(4,5,6).
  - 3 partition = (1,2),(3,4),(5,6)

2. Incorrect, the ranges overlap even though they're sorted.
  - 2 partitions (1,3,5) (2,4,6)
  - 3 partitions (1,3),(2,5),(4,6)


Is this possible with spark?

Cheers,
-Kristoffer

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: keep or remove sc.stop() coz of RpcEnv already stopped error

2017-03-13 Thread Yong Zhang
What version of Spark are you using?


Based on SPARK-12967, this is fixed in Spark 2.0 and later. If you are using 
Spark 1.x, you can ignore this warning. It shouldn't affect any functionality.


Yong


From: nancy henry 
Sent: Monday, March 13, 2017 7:08 AM
To: user @spark
Subject: keep or remove sc.stop() coz of RpcEnv already stopped error

 Hi Team,

getting this error if we put sc.stop() in application..

can we remove it from application but i read if yu dont explicitly stop using 
sc.stop the yarn application will not get registered in history service.. SO 
what to do ?

 WARN Dispatcher: Message RemoteProcessDisconnected dropped.
java.lang.IllegalStateException: RpcEnv already stopped.
at 
org.apache.spark.rpc.netty.Dispatcher.postMessage(Dispatcher.scala:159)
at org.apache.spark.rpc.netty.Dispatcher.postToAll(Dispatcher.scala:109)
at 
org.apache.spark.rpc.netty.NettyRpcHandler.connectionTerminated(NettyRpcEnv.scala:630)
at 
org.apache.spark.network.server.TransportRequestHandler.channelUnregistered(TransportRequestHandler.java:94)




Re: org.apache.spark.SparkException: Task not serializable

2017-03-13 Thread Yong Zhang
In fact, I would suggest a different way to handle the original problem.


The example listed originally comes with a Java Function that doesn't use any 
instance fields/methods, so serializing the whole class is an overkill solution.


Instead, you can/should make the Function static. That still works for the logic 
the function tries to do, and it is a better solution than marking the 
whole class serializable.


The whole issue is that the function is not static, even though it doesn't use any 
instance fields or other methods. When Spark ships the non-static function, it 
has to wrap the whole class which contains the function into the closure and 
send it over the network, and in this case that requires the whole class to be 
serializable.
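A rough Scala analogue of the same idea (just a sketch, not the original Java code): keep the function in a standalone object so the closure does not capture the enclosing, possibly non-serializable, class.

import org.apache.spark.SparkContext

object LineFilters {
  // Defined in an object, so serializing this function does not drag in any class instance.
  val nonEmpty: String => Boolean = line => line.nonEmpty
}

class MyJob(sc: SparkContext) {
  def countNonEmptyLines(path: String): Long =
    // Referencing LineFilters.nonEmpty (not a method on MyJob) means MyJob itself
    // never needs to be Serializable.
    sc.textFile(path).filter(LineFilters.nonEmpty).count()
}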


Yong



From: 颜发才(Yan Facai) 
Sent: Saturday, March 11, 2017 6:48 AM
To: 萝卜丝炒饭
Cc: Mina Aslani; Ankur Srivastava; user@spark.apache.org
Subject: Re: org.apache.spark.SparkException: Task not serializable

For Scala,
make your class Serializable, like this:
```
class YourClass extends Serializable {
}
```



On Sat, Mar 11, 2017 at 3:51 PM, 萝卜丝炒饭 
<1427357...@qq.com> wrote:
Hi Mina,

Can you paste your new code here please?
I met this issue too but do not get Ankur's idea.

Thanks,
Robin

---Original---
From: "Mina Aslani">
Date: 2017/3/7 05:32:10
To: "Ankur 
Srivastava">;
Cc: 
"user@spark.apache.org">;
Subject: Re: org.apache.spark.SparkException: Task not serializable

Thank you Ankur for the quick response, really appreciate it! Making the class 
serializable resolved the exception!

Best regards,
Mina

On Mon, Mar 6, 2017 at 4:20 PM, Ankur Srivastava 
> wrote:
The fix for this is to make your class Serializable. The reason is that the closures 
you have defined in the class need to be serialized and copied over to all the 
executor nodes.

Hope this helps.

Thanks
Ankur

On Mon, Mar 6, 2017 at 1:06 PM, Mina Aslani 
> wrote:

Hi,

I am trying to get started with Spark and count the number of lines of a text file 
on my Mac; however, I get

org.apache.spark.SparkException: Task not serializable error on

JavaRDD<String> logData = javaCtx.textFile(file);

Please see below for the sample of code and the stackTrace.

Any idea why this error is thrown?

Best regards,

Mina

System.out.println("Creating Spark Configuration");
SparkConf javaConf = new SparkConf();
javaConf.setAppName("My First Spark Java Application");
javaConf.setMaster("PATH to my spark");
System.out.println("Creating Spark Context");
JavaSparkContext javaCtx = new JavaSparkContext(javaConf);
System.out.println("Loading the Dataset and will further process it");
String file = "file:///file.txt";
JavaRDD<String> logData = javaCtx.textFile(file);

long numLines = logData.filter(new Function<String, Boolean>() {
    public Boolean call(String s) {
        return true;
    }
}).count();

System.out.println("Number of Lines in the Dataset "+numLines);

javaCtx.close();


Exception in thread "main" org.apache.spark.SparkException: Task not 
serializable
at 
org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
at 
org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2094)
at org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:387)
at org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:386)
at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.RDD.filter(RDD.scala:386)
at org.apache.spark.api.java.JavaRDD.filter(JavaRDD.scala:78)





Re: Spark and continuous integration

2017-03-13 Thread Sam Elamin
Hi Jörn,

Thanks for the prompt reply. Really we have two main concerns with CD:
ensuring tests pass and linting the code.

I think all platforms should handle this with ease; I was just wondering
what people are using.

Jenkins seems to have the best Spark plugins, so we are investigating that
as well as a variety of other hosted CI tools.

Happy to write a blog post detailing our findings and share it here if
people are interested.


Regards
Sam

On Mon, Mar 13, 2017 at 1:18 PM, Jörn Franke  wrote:

> Hi,
>
> Jenkins also now supports pipeline as code and multibranch pipelines. thus
> you are not so dependent on the UI and you do not need anymore a long list
> of jobs for different branches. Additionally it has a new UI (beta) called
> blueocean, which is a little bit nicer. You may also check GoCD. Aside from
> this you have a huge variety of commercial tools, e.g. Bamboo.
> In the cloud, I use for my open source github projects Travis-Ci, but
> there are also a lot of alternatives, e.g. Distelli.
>
> It really depends what you expect, e.g. If you want to Version the build
> pipeline in GIT, if you need Docker deployment etc. I am not sure if new
> starters should be responsible for the build pipeline, thus I am not sure
> that i understand  your concern in this area.
>
> From my experience, integration tests for Spark can be run on any of these
> platforms.
>
> Best regards
>
> > On 13 Mar 2017, at 10:55, Sam Elamin  wrote:
> >
> > Hi Folks
> >
> > This is more of a general question. What's everyone using for their CI
> /CD when it comes to spark
> >
> > We are using Pyspark but potentially looking to make to spark scala and
> Sbt in the future
> >
> >
> > One of the suggestions was jenkins but I know the UI isn't great for new
> starters so I'd rather avoid it. I've used team city but that was more
> focused on dot net development
> >
> >
> > What are people using?
> >
> > Kind Regards
> > Sam
>


Re: Spark and continuous integration

2017-03-13 Thread Jörn Franke
Hi,

Jenkins also now supports pipeline-as-code and multibranch pipelines, so you 
are not so dependent on the UI and you no longer need a long list of jobs 
for different branches. Additionally it has a new UI (beta) called Blue Ocean, 
which is a little bit nicer. You may also check GoCD. Aside from this you have 
a huge variety of commercial tools, e.g. Bamboo.
In the cloud, I use Travis CI for my open source GitHub projects, but there are 
also a lot of alternatives, e.g. Distelli.

It really depends what you expect, e.g. whether you want to version the build 
pipeline in Git or whether you need Docker deployment. I am not sure if new 
starters should be responsible for the build pipeline, so I am not sure that 
I understand your concern in this area.

From my experience, integration tests for Spark can be run on any of these 
platforms.

Best regards

> On 13 Mar 2017, at 10:55, Sam Elamin  wrote:
> 
> Hi Folks 
> 
> This is more of a general question. What's everyone using for their CI /CD 
> when it comes to spark 
> 
> We are using Pyspark but potentially looking to make to spark scala and Sbt 
> in the future 
> 
> 
> One of the suggestions was jenkins but I know the UI isn't great for new 
> starters so I'd rather avoid it. I've used team city but that was more 
> focused on dot net development 
> 
> 
> What are people using? 
> 
> Kind Regards 
> Sam 

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Adding metrics to spark datasource

2017-03-13 Thread AssafMendelson
Hi,
I am building a data source so I can convert a custom source to a DataFrame.
I have been going over examples such as JDBC and noticed that JDBC does the 
following:

val inputMetrics = context.taskMetrics().inputMetrics

and whenever a new record is added:


inputMetrics.incRecordsRead(1)

However, incRecordsRead is private to Spark.
I was wondering: how can I report the standard metrics from a data source?

Thanks,
Assaf.




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Adding-metrics-to-spark-datasource-tp28487.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: Differences between scikit-learn and Spark.ml for regression toy problem

2017-03-13 Thread Dhanesh Padmanabhan
Hi Frank

Thanks for this question. I have been comparing logistic regression in
sklearn with spark mllib as well. Your example code gave me a perfect way
to compare what is going on in both the packages.

I looked at both the source codes. There are quite a few differences in how
the model fitting is done. I have a solution for the logistic regression
problem. I do not have a solution for the linear regression problem yet.

Here are the key differences:
1. In spark, Regularization for L2 is divided by feature standard
deviation. In sklearn, it is not.
2. In spark, X's are standardized. This changes the solution because of
regularization. In sklearn, no standardization is done.
3. In Spark, Average log loss is used for training. The log loss is
averaged by sum of weights, which is the number of training instances.
Sklearn uses sum of log loss instead. So the spark regularization is very
heavy. You should scale down the regularization parameter by the number of
instances.


So, if you do the following, you should be able to match the outputs of
logistic regression:
1. Standardize the spark and pandas dataframes in a similar fashion. Note:
The standardization in spark works a little differently for ensuring unit
variance - spark uses sqrt(n) as denominator, and sklearn's standardscaler
uses sqrt(n-1) (unbiased estimator when mean is not known)
2. Scale down the regularization in spark by number of instances. Use 0.03
in your example instead of 0.3, given you have 10 training instances.

Hope this helps
-Dhanesh

Spark ml code (I changed it to work with Spark 2.1):


import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.SQLContext
import org.apache.spark.ml.feature.StandardScaler

val sparkTrainingData_orig = new SQLContext(sc).
  createDataFrame(Seq(
(0.0, Vectors.dense(Array(-0.7306653538519616, 0.0))),
(1.0, Vectors.dense(Array(0.6750417712898752, -0.4232874171873786))),
(1.0, Vectors.dense(Array(0.1863463229359709, -0.8163423997075965))),
(0.0, Vectors.dense(Array(-0.6719842051493347, 0.0))),
(1.0, Vectors.dense(Array(0.9699938346531928, 0.0))),
(1.0, Vectors.dense(Array(0.22759406190283604, 0.0))),
(1.0, Vectors.dense(Array(0.9688721028330911, 0.0))),
(0.0, Vectors.dense(Array(0.5993795346650845, 0.0))),
(0.0, Vectors.dense(Array(0.9219423508390701, -0.8972778242305388))),
(0.0, Vectors.dense(Array(0.7006904841584055, -0.5607635619919824))))).
  toDF("label", "features_orig")

val sparkTrainingData = new StandardScaler().
  setWithMean(true).
  setInputCol("features_orig").
  setOutputCol("features").
  fit(sparkTrainingData_orig).
  transform(sparkTrainingData_orig)

val logisticModel = new LogisticRegression().
  setRegParam(0.03).
  setLabelCol("label").
  setFeaturesCol("features").
  setTol(1e-12).
  setMaxIter(100).
  fit(sparkTrainingData)

println(s"Spark logistic model coefficients: ${logisticModel.coefficients}
Intercept: ${logisticModel.intercept}")
// Spark logistic model coefficients:
[0.8212244419577079,0.32615245441495727] Intercept: -0.011815325216668142


Sklearn Code:
-

import numpy as np
from sklearn.linear_model import LogisticRegression, Ridge

X = np.array([
[-0.7306653538519616, 0.0],
[0.6750417712898752, -0.4232874171873786],
[0.1863463229359709, -0.8163423997075965],
[-0.6719842051493347, 0.0],
[0.9699938346531928, 0.0],
[0.22759406190283604, 0.0],
[0.9688721028330911, 0.0],
[0.5993795346650845, 0.0],
[0.9219423508390701, -0.8972778242305388],
[0.7006904841584055, -0.5607635619919824]
])

y = np.array([
0.0,
1.0,
1.0,
0.0,
1.0,
1.0,
1.0,
0.0,
0.0,
0.0
])

m, n = X.shape

# Scale and Add intercept term to simulate inputs to GameEstimator

from sklearn.preprocessing import StandardScaler

# Adjust by factor sqrt(n-1)/sqrt(n) to take care of standard deviation
# formula differences
Xsc=StandardScaler().fit_transform(X)*3/np.sqrt(10)
Xsc_with_intercept = np.hstack((Xsc, np.ones(m)[:,np.newaxis]))

l = 0.3
e = LogisticRegression(
fit_intercept=True,
penalty='l2',
C=1/l,
max_iter=100,
tol=1e-11,
solver='lbfgs',verbose=1)

e.fit(Xsc, y)

print e.coef_, e.intercept_
# => [[ 0.82122437 0.32615256]] [-0.01181534]



Dhanesh
+91-9741125245

On Mon, Mar 13, 2017 at 7:50 AM, Frank Astier 
wrote:

> (this was also posted to stackoverflow on 03/10)
>
> I am setting up a very simple logistic regression problem in scikit-learn
> and in spark.ml, and the results diverge: the models they learn are
> different, but I can't figure out why (data is the same, model type is the
> same, regularization is the same...).
>
> No doubt I am missing some setting on one side or the other. Which
> 

Re: how to construct parameter for model.transform() from datafile

2017-03-13 Thread jinhong lu
Can anyone help?

> On Mar 13, 2017, at 19:38, jinhong lu  wrote:
> 
> After train the mode, I got the result look like this:
> 
> 
>   scala>  predictionResult.show()
>   
> +-++++--+
>   |label|features|   rawPrediction| 
> probability|prediction|
>   
> +-++++--+
>   |  0.0|(144109,[100],[2.0])|[-12.246737725034...|[0.96061209556737...|  
>  0.0|
>   |  0.0|(144109,[100],[2.0])|[-12.246737725034...|[0.96061209556737...|  
>  0.0|
>   |  0.0|(144109,[100],[24...|[-146.81612388602...|[9.73704654529197...|  
>  1.0|
> 
> And then, I transform() the data by these code:
> 
>   import org.apache.spark.ml.linalg.Vectors
>   import org.apache.spark.ml.linalg.Vector
>   import scala.collection.mutable
> 
>  def lineToVector(line:String ):Vector={
>   val seq = new mutable.Queue[(Int,Double)]
>   val content = line.split(" ");
>   for( s <- content){
> val index = s.split(":")(0).toInt
> val value = s.split(":")(1).toDouble
>  seq += ((index,value))
>   }
>   return Vectors.sparse(144109, seq)
> }
> 
>val df = sc.sequenceFile[org.apache.hadoop.io.LongWritable, 
> org.apache.hadoop.io.Text]("/data/gamein/gameall_sdc/wh/gameall.db/edt_udid_label_format/ds=20170312/001006_0").map(line=>line._2).map(line
>  => 
> (line.toString.split("\t")(0),lineToVector(line.toString.split("\t")(1.toDF("udid",
>  "features")
>val predictionResult = model.transform(df)
>predictionResult.show()
> 
> 
> But I got the error look like this:
> 
> Caused by: java.lang.IllegalArgumentException: requirement failed: You may 
> not write an element to index 804201 because the declared size of your vector 
> is 144109
>  at scala.Predef$.require(Predef.scala:224)
>  at org.apache.spark.ml.linalg.Vectors$.sparse(Vectors.scala:219)
>  at lineToVector(:55)
>  at $anonfun$4.apply(:50)
>  at $anonfun$4.apply(:50)
>  at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
>  at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
>  at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
>  at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(generated.java:84)
>  at 
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>  at 
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
>  at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:246)
>  at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:240)
>  at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803)
>  at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803)
> 
> So I change
> 
>   return Vectors.sparse(144109, seq)
> 
> to 
> 
>   return Vectors.sparse(804202, seq)
> 
> Another error occurs:
> 
>   Caused by: java.lang.IllegalArgumentException: requirement failed: The 
> columns of A don't match the number of elements of x. A: 144109, x: 804202
> at scala.Predef$.require(Predef.scala:224)
> at org.apache.spark.ml.linalg.BLAS$.gemv(BLAS.scala:521)
> at 
> org.apache.spark.ml.linalg.Matrix$class.multiply(Matrices.scala:110)
> at org.apache.spark.ml.linalg.DenseMatrix.multiply(Matrices.scala:176)
> 
> what should I do?
>> On Mar 13, 2017, at 16:31, jinhong lu  wrote:
>> 
>> Hi, all:
>> 
>> I got these training data:
>> 
>>  0 31607:17
>>  0 111905:36
>>  0 109:3 506:41 1509:1 2106:4 5309:1 7209:5 8406:1 27108:1 27709:1 
>> 30209:8 36109:20 41408:1 42309:1 46509:1 47709:5 57809:1 58009:1 58709:2 
>> 112109:4 123305:48 142509:1
>>  0 407:14 2905:2 5209:2 6509:2 6909:2 14509:2 18507:10
>>  0 604:3 3505:9 6401:3 6503:2 6505:3 7809:8 10509:3 12109:3 15207:19 
>> 31607:19
>>  0 19109:7 29705:4 123305:32
>>  0 15309:1 43005:1 108509:1
>>  1 604:1 6401:1 6503:1 15207:4 31607:40
>>  0 1807:19
>>  0 301:14 501:1 1502:14 2507:12 123305:4
>>  0 607:14 19109:460 123305:448
>>  0 5406:14 7209:4 10509:3 19109:6 24706:10 26106:4 31409:1 123305:48 
>> 128209:1
>>  1 1606:1 2306:3 3905:19 4408:3 4506:8 8707:3 19109:50 24809:1 26509:2 
>> 27709:2 56509:8 122705:62 123305:31 124005:2
>> 
>> And then I train the model by spark:
>> 
>>  import org.apache.spark.ml.classification.NaiveBayes
>>  import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
>>  import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
>>  import org.apache.spark.sql.SparkSession
>> 
>>  val spark = 
>> 

Re: how to construct parameter for model.transform() from datafile

2017-03-13 Thread jinhong lu
After training the model, I got results that look like this:


scala>  predictionResult.show()

+-++++--+
|label|features|   rawPrediction| 
probability|prediction|

+-++++--+
|  0.0|(144109,[100],[2.0])|[-12.246737725034...|[0.96061209556737...|  
 0.0|
|  0.0|(144109,[100],[2.0])|[-12.246737725034...|[0.96061209556737...|  
 0.0|
|  0.0|(144109,[100],[24...|[-146.81612388602...|[9.73704654529197...|  
 1.0|

And then I transform() the data with this code:

import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.ml.linalg.Vector
import scala.collection.mutable

def lineToVector(line: String): Vector = {
  val seq = new mutable.Queue[(Int, Double)]
  val content = line.split(" ")
  for (s <- content) {
    val index = s.split(":")(0).toInt
    val value = s.split(":")(1).toDouble
    seq += ((index, value))
  }
  return Vectors.sparse(144109, seq)
}

val df = sc.sequenceFile[org.apache.hadoop.io.LongWritable, org.apache.hadoop.io.Text]("/data/gamein/gameall_sdc/wh/gameall.db/edt_udid_label_format/ds=20170312/001006_0").
  map(line => line._2).
  map(line => (line.toString.split("\t")(0), lineToVector(line.toString.split("\t")(1)))).
  toDF("udid", "features")
val predictionResult = model.transform(df)
predictionResult.show()


But I got an error that looks like this:

 Caused by: java.lang.IllegalArgumentException: requirement failed: You may not 
write an element to index 804201 because the declared size of your vector is 
144109
  at scala.Predef$.require(Predef.scala:224)
  at org.apache.spark.ml.linalg.Vectors$.sparse(Vectors.scala:219)
  at lineToVector(:55)
  at $anonfun$4.apply(:50)
  at $anonfun$4.apply(:50)
  at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
  at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
  at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
  at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(generated.java:84)
  at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
  at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
  at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:246)
  at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:240)
  at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803)
  at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803)

So I change

return Vectors.sparse(144109, seq)

to 

return Vectors.sparse(804202, seq)

Another error occurs:

Caused by: java.lang.IllegalArgumentException: requirement failed: The 
columns of A don't match the number of elements of x. A: 144109, x: 804202
  at scala.Predef$.require(Predef.scala:224)
  at org.apache.spark.ml.linalg.BLAS$.gemv(BLAS.scala:521)
  at 
org.apache.spark.ml.linalg.Matrix$class.multiply(Matrices.scala:110)
  at org.apache.spark.ml.linalg.DenseMatrix.multiply(Matrices.scala:176)

what should I do?
> On Mar 13, 2017, at 16:31, jinhong lu  wrote:
> 
> Hi, all:
> 
> I got these training data:
> 
>   0 31607:17
>   0 111905:36
>   0 109:3 506:41 1509:1 2106:4 5309:1 7209:5 8406:1 27108:1 27709:1 
> 30209:8 36109:20 41408:1 42309:1 46509:1 47709:5 57809:1 58009:1 58709:2 
> 112109:4 123305:48 142509:1
>   0 407:14 2905:2 5209:2 6509:2 6909:2 14509:2 18507:10
>   0 604:3 3505:9 6401:3 6503:2 6505:3 7809:8 10509:3 12109:3 15207:19 
> 31607:19
>   0 19109:7 29705:4 123305:32
>   0 15309:1 43005:1 108509:1
>   1 604:1 6401:1 6503:1 15207:4 31607:40
>   0 1807:19
>   0 301:14 501:1 1502:14 2507:12 123305:4
>   0 607:14 19109:460 123305:448
>   0 5406:14 7209:4 10509:3 19109:6 24706:10 26106:4 31409:1 123305:48 
> 128209:1
>   1 1606:1 2306:3 3905:19 4408:3 4506:8 8707:3 19109:50 24809:1 26509:2 
> 27709:2 56509:8 122705:62 123305:31 124005:2
> 
> And then I train the model by spark:
> 
>   import org.apache.spark.ml.classification.NaiveBayes
>   import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
>   import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
>   import org.apache.spark.sql.SparkSession
> 
>   val spark = 
> SparkSession.builder.appName("NaiveBayesExample").getOrCreate()
>   val data = 
> spark.read.format("libsvm").load("/tmp/ljhn1829/aplus/training_data3")
>   val Array(trainingData, testData) = data.randomSplit(Array(0.7, 0.3), 
> 

keep or remove sc.stop() coz of RpcEnv already stopped error

2017-03-13 Thread nancy henry
 Hi Team,


We are getting this error if we put sc.stop() in the application.


Can we remove it from the application? I read that if you don't explicitly stop
using sc.stop(), the YARN application will not get registered in the history
service. So what should we do?


 WARN Dispatcher: Message RemoteProcessDisconnected dropped.

java.lang.IllegalStateException: RpcEnv already stopped.

at
org.apache.spark.rpc.netty.Dispatcher.postMessage(Dispatcher.scala:159)

at
org.apache.spark.rpc.netty.Dispatcher.postToAll(Dispatcher.scala:109)

at
org.apache.spark.rpc.netty.NettyRpcHandler.connectionTerminated(NettyRpcEnv.scala:630)

at
org.apache.spark.network.server.TransportRequestHandler.channelUnregistered(TransportRequestHandler.java:94)


Spark and continuous integration

2017-03-13 Thread Sam Elamin
Hi Folks

This is more of a general question. What is everyone using for their CI/CD
when it comes to Spark?

We are using PySpark but potentially looking to move to Spark Scala and sbt
in the future.


One of the suggestions was Jenkins, but I know the UI isn't great for new
starters so I'd rather avoid it. I've used TeamCity but that was more
focused on .NET development.


What are people using?

Kind Regards
Sam


how to construct parameter for model.transform() from datafile

2017-03-13 Thread jinhong lu
Hi, all:

I got these training data:

0 31607:17
0 111905:36
0 109:3 506:41 1509:1 2106:4 5309:1 7209:5 8406:1 27108:1 27709:1 
30209:8 36109:20 41408:1 42309:1 46509:1 47709:5 57809:1 58009:1 58709:2 
112109:4 123305:48 142509:1
0 407:14 2905:2 5209:2 6509:2 6909:2 14509:2 18507:10
0 604:3 3505:9 6401:3 6503:2 6505:3 7809:8 10509:3 12109:3 15207:19 
31607:19
0 19109:7 29705:4 123305:32
0 15309:1 43005:1 108509:1
1 604:1 6401:1 6503:1 15207:4 31607:40
0 1807:19
0 301:14 501:1 1502:14 2507:12 123305:4
0 607:14 19109:460 123305:448
0 5406:14 7209:4 10509:3 19109:6 24706:10 26106:4 31409:1 123305:48 
128209:1
1 1606:1 2306:3 3905:19 4408:3 4506:8 8707:3 19109:50 24809:1 26509:2 
27709:2 56509:8 122705:62 123305:31 124005:2

And then I train the model by spark:

import org.apache.spark.ml.classification.NaiveBayes
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.sql.SparkSession

val spark = 
SparkSession.builder.appName("NaiveBayesExample").getOrCreate()
val data = 
spark.read.format("libsvm").load("/tmp/ljhn1829/aplus/training_data3")
val Array(trainingData, testData) = data.randomSplit(Array(0.7, 0.3), 
seed = 1234L)
//val model = new NaiveBayes().fit(trainingData)
val model = new 
NaiveBayes().setThresholds(Array(10.0,1.0)).fit(trainingData)
val predictions = model.transform(testData)
predictions.show()


OK, I have got my model from the code above, but how can I use this model to 
predict the classification of other data like this:

ID1 509:2 5102:4 25909:1 31709:4 121905:19
ID2 800201:1
ID3 116005:4
ID4 800201:1
ID5 19109:1  21708:1 23208:1 49809:1 88609:1
ID6 800201:1
ID7 43505:7 106405:7

I know I can use the transform() method, but how do I construct the parameter for 
the transform() method?





Thanks,
lujinhong


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org