Re: Spark 2.1.1: A bug in org.apache.spark.ml.linalg.* when using VectorAssembler.scala

2017-07-13 Thread Yan Facai
Hi, junjie.

As Nick said,
spark.ml indeed contains Vector, Vectors and VectorUDT by itself, see:
mllib-local/src/main/scala/org/apache/spark/ml/linalg/Vectors.scala:36:
sealed trait Vector extends Serializable

So, which bug did you find with VectorAssembler? Could you give more details?
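For reference, a minimal sketch of the intended usage (column names are hypothetical); the output column holds org.apache.spark.ml.linalg vectors:

```scala
import org.apache.spark.ml.feature.VectorAssembler

val df = Seq((1.0, 2.0, 3.0)).toDF("x1", "x2", "x3")

val assembler = new VectorAssembler()
  .setInputCols(Array("x1", "x2", "x3"))
  .setOutputCol("v")   // column of org.apache.spark.ml.linalg.Vector

assembler.transform(df).show(false)
```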









On Thu, Jul 13, 2017 at 5:15 PM,  wrote:

> Dear Developers:
>
> Here is a bug in org.apache.spark.ml.linalg.*:
> Class Vector, Vectors are not included in org.apache.spark.ml.linalg.*,
> but they are used in VectorAssembler.scala as follows:
>
> *import *org.apache.spark.ml.linalg.{Vector, Vectors, VectorUDT}
>
> Therefore, a bug was reported when I was using VectorAssembler.
>
> Since org.apache.spark.mllib.linalg.* contains the class {Vector,
> Vectors, VectorUDT}, I rewrote VectorAssembler.scala as
> XVectorAssembler.scala by mainly changing "*import *org.apache.spark.*ml*
> .linalg.{Vector, Vectors, VectorUDT}" to
> "*import *org.apache.spark.*mllib*.linalg.{Vector, Vectors, VectorUDT}"
>
> But a bug occurred as follows:
>
> " Column v must be of type org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7
> but was actually org.apache.spark.mllib.linalg.VectorUDT@f71b0bce "
>
> Would you please help fix the bug?
>
> Thank you very much!
>
> Best regards
> --xiongjunjie



On Thu, Jul 13, 2017 at 6:08 PM, Nick Pentreath 
wrote:

> There are Vector classes under ml.linalg package - And VectorAssembler and
> other feature transformers all work with ml.linalg vectors.
>
> If you try to use mllib.linalg vectors instead you will get an error as
> the user defined type for SQL is not correct
>
>
> On Thu, 13 Jul 2017 at 11:23,  wrote:
>
>> Dear Developers:
>>
>> Here is a bug in org.apache.spark.ml.linalg.*:
>> Class Vector, Vectors are not included in org.apache.spark.ml.linalg.*,
>> but they are used in VectorAssembler.scala as follows:
>>
>> *import *org.apache.spark.ml.linalg.{Vector, Vectors, VectorUDT}
>>
>> Therefore, a bug was reported when I was using VectorAssembler.
>>
>> Since org.apache.spark.mllib.linalg.* contains the class {Vector,
>> Vectors, VectorUDT}, I rewrote VectorAssembler.scala as
>> XVectorAssembler.scala by mainly changing "*import *org.apache.spark.*ml*
>> .linalg.{Vector, Vectors, VectorUDT}" to
>> "*import *org.apache.spark.*mllib*.linalg.{Vector, Vectors, VectorUDT}"
>>
>> But a bug occurred as follows:
>>
>> " Column v must be of type org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7
>> but was actually org.apache.spark.mllib.linalg.VectorUDT@f71b0bce "
>>
>> Would you please help fix the bug?
>>
>> Thank you very much!
>>
>> Best regards
>> --xiongjunjie
>
>


Re: [ML] Stop conditions for RandomForest

2017-06-28 Thread Yan Facai
It seems that splitting always stops when a node's row count is less than
max(X, Y).
Hence, are X and Y really different in practice?
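For reference, a minimal sketch (illustrative values) of setting the per-child minimum on the ml regressor, i.e. the thread's Y:

```scala
import org.apache.spark.ml.regression.RandomForestRegressor

val rf = new RandomForestRegressor()
  .setMinInstancesPerNode(15)  // each child of an accepted split keeps at least 15 rows
  .setMaxDepth(10)
```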



On Tue, Jun 27, 2017 at 11:07 PM, OBones  wrote:

> Hello,
>
> Reading around on the theory behind tree based regression, I concluded
> that there are various reasons to stop exploring the tree when a given node
> has been reached. Among these, I have those two:
>
> 1. When starting to process a node, if its size (row count) is less than X
> then consider it a leaf
> 2. When a split for a node is considered, if any side of the split has its
> size less than Y, then ignore it when selecting the best split
>
> As an example, let's consider a node with 45 rows, that for a given split
> creates two children, containing 5 and 35 rows respectively.
>
> If I set X to 50, then the node is a leaf and no split is attempted
> if I set X to 10 and Y to 15, then the splits are computed but because one
> of them has less than 15 rows, that split is ignored.
>
> I'm using DecisionTreeRegressor and RandomForestRegressor on our data and
> because the former is implemented using the latter, they both share the
> same parameters.
> Going through those parameters, I found minInstancesPerNode which to me is
> the Y value, but I could not find any parameter for the X value.
> Have I missed something?
> If not, would there be a way to implement this?
>
> Regards
>
>
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Best alternative for Category Type in Spark Dataframe

2017-06-17 Thread Yan Facai
Yes, perhaps we could use SQLTransformer as well.

http://spark.apache.org/docs/latest/ml-features.html#sqltransformer
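For reference, a minimal sketch (assumed column and category names) of doing the filter with SQLTransformer, so that the step can live inside a Pipeline:

```scala
import org.apache.spark.ml.feature.SQLTransformer

val filterEmotion = new SQLTransformer().setStatement(
  "SELECT * FROM __THIS__ WHERE EMOTION IN ('HAPPY','SAD','ANGRY','NEUTRAL','NA')")

// The transformer can then be used as an ordinary pipeline stage, e.g.
// new Pipeline().setStages(Array(filterEmotion, indexer, ...))
```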

On Sun, Jun 18, 2017 at 10:47 AM, Pralabh Kumar <pralabhku...@gmail.com>
wrote:

> Hi Yan
>
> Yes sql is good option , but if we have to create ML Pipeline , then
> having transformers and set it into pipeline stages ,would be better option
> .
>
> Regards
> Pralabh Kumar
>
> On Sun, Jun 18, 2017 at 4:23 AM, 颜发才(Yan Facai) <facai@gmail.com>
> wrote:
>
>> To filter data, how about using sql?
>>
>> df.createOrReplaceTempView("df")
>> val sqlDF = spark.sql("SELECT * FROM df WHERE EMOTION IN
>> ('HAPPY','SAD','ANGRY','NEUTRAL','NA')")
>>
>> https://spark.apache.org/docs/latest/sql-programming-guide.html#sql
>>
>>
>>
>> On Fri, Jun 16, 2017 at 11:28 PM, Pralabh Kumar <pralabhku...@gmail.com>
>> wrote:
>>
>>> Hi Saatvik
>>>
>>> You can write your own transformer to make sure that column contains
>>> ,value which u provided , and filter out rows which doesn't follow the
>>> same.
>>>
>>> Something like this
>>>
>>>
>>> case class CategoryTransformer(override val uid : String) extends
>>> Transformer{
>>>   override def transform(inputData: DataFrame): DataFrame = {
>>> inputData.select("col1").filter("col1 in ('happy')")
>>>   }
>>>   override def copy(extra: ParamMap): Transformer = ???
>>>   @DeveloperApi
>>>   override def transformSchema(schema: StructType): StructType ={
>>>schema
>>>   }
>>> }
>>>
>>>
>>> Usage
>>>
>>> val data = sc.parallelize(List("abce","happy")).toDF("col1")
>>> val trans = new CategoryTransformer("1")
>>> data.show()
>>> trans.transform(data).show()
>>>
>>>
>>> This transformer will make sure , you always have values in col1 as
>>> provided by you.
>>>
>>>
>>> Regards
>>> Pralabh Kumar
>>>
>>> On Fri, Jun 16, 2017 at 8:10 PM, Saatvik Shah <saatvikshah1...@gmail.com
>>> > wrote:
>>>
>>>> Hi Pralabh,
>>>>
>>>> I want the ability to create a column such that its values be
>>>> restricted to a specific set of predefined values.
>>>> For example, suppose I have a column called EMOTION: I want to ensure
>>>> each row value is one of HAPPY,SAD,ANGRY,NEUTRAL,NA.
>>>>
>>>> Thanks and Regards,
>>>> Saatvik Shah
>>>>
>>>>
>>>> On Fri, Jun 16, 2017 at 10:30 AM, Pralabh Kumar <pralabhku...@gmail.com
>>>> > wrote:
>>>>
>>>>> Hi satvik
>>>>>
>>>>> Can u please provide an example of what exactly you want.
>>>>>
>>>>>
>>>>>
>>>>> On 16-Jun-2017 7:40 PM, "Saatvik Shah" <saatvikshah1...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Yan,
>>>>>>
>>>>>> Basically the reason I was looking for the categorical datatype is as
>>>>>> given here
>>>>>> <https://pandas.pydata.org/pandas-docs/stable/categorical.html>:
>>>>>> ability to fix column values to specific categories. Is it possible to
>>>>>> create a user defined data type which could do so?
>>>>>>
>>>>>> Thanks and Regards,
>>>>>> Saatvik Shah
>>>>>>
>>>>>> On Fri, Jun 16, 2017 at 1:42 AM, 颜发才(Yan Facai) <facai@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> You can use some Transformers to handle categorical data,
>>>>>>> For example,
>>>>>>> StringIndexer encodes a string column of labels to a column of
>>>>>>> label indices:
>>>>>>> http://spark.apache.org/docs/latest/ml-features.html#stringindexer
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Jun 15, 2017 at 10:19 PM, saatvikshah1994 <
>>>>>>> saatvikshah1...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>> I'm trying to convert a Pandas -> Spark dataframe. One of the
>>>>>>>> columns I have
>>>>>>>> is of the Category type in Pandas. But there does not seem to be
>>>>>>>> support for
>>>>>>>> this same type in Spark. What is the best alternative?
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>> View this message in context: http://apache-spark-user-list.
>>>>>>>> 1001560.n3.nabble.com/Best-alternative-for-Category-Type-in-
>>>>>>>> Spark-Dataframe-tp28764.html
>>>>>>>> Sent from the Apache Spark User List mailing list archive at
>>>>>>>> Nabble.com.
>>>>>>>>
>>>>>>>> 
>>>>>>>> -
>>>>>>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> *Saatvik Shah,*
>>>>>> *1st  Year,*
>>>>>> *Masters in the School of Computer Science,*
>>>>>> *Carnegie Mellon University*
>>>>>>
>>>>>> *https://saatvikshah1994.github.io/
>>>>>> <https://saatvikshah1994.github.io/>*
>>>>>>
>>>>>
>>>>
>>>>
>>>> --
>>>> *Saatvik Shah,*
>>>> *1st  Year,*
>>>> *Masters in the School of Computer Science,*
>>>> *Carnegie Mellon University*
>>>>
>>>> *https://saatvikshah1994.github.io/
>>>> <https://saatvikshah1994.github.io/>*
>>>>
>>>
>>>
>>
>


Re: Best alternative for Category Type in Spark Dataframe

2017-06-17 Thread Yan Facai
To filter data, how about using sql?

df.createOrReplaceTempView("df")
val sqlDF = spark.sql("SELECT * FROM df WHERE EMOTION IN
('HAPPY','SAD','ANGRY','NEUTRAL','NA')")

https://spark.apache.org/docs/latest/sql-programming-guide.html#sql



On Fri, Jun 16, 2017 at 11:28 PM, Pralabh Kumar <pralabhku...@gmail.com>
wrote:

> Hi Saatvik
>
> You can write your own transformer to make sure that column contains
> ,value which u provided , and filter out rows which doesn't follow the
> same.
>
> Something like this
>
>
> case class CategoryTransformer(override val uid : String) extends
> Transformer{
>   override def transform(inputData: DataFrame): DataFrame = {
> inputData.select("col1").filter("col1 in ('happy')")
>   }
>   override def copy(extra: ParamMap): Transformer = ???
>   @DeveloperApi
>   override def transformSchema(schema: StructType): StructType ={
>schema
>   }
> }
>
>
> Usage
>
> val data = sc.parallelize(List("abce","happy")).toDF("col1")
> val trans = new CategoryTransformer("1")
> data.show()
> trans.transform(data).show()
>
>
> This transformer will make sure , you always have values in col1 as
> provided by you.
>
>
> Regards
> Pralabh Kumar
>
> On Fri, Jun 16, 2017 at 8:10 PM, Saatvik Shah <saatvikshah1...@gmail.com>
> wrote:
>
>> Hi Pralabh,
>>
>> I want the ability to create a column such that its values be restricted
>> to a specific set of predefined values.
>> For example, suppose I have a column called EMOTION: I want to ensure
>> each row value is one of HAPPY,SAD,ANGRY,NEUTRAL,NA.
>>
>> Thanks and Regards,
>> Saatvik Shah
>>
>>
>> On Fri, Jun 16, 2017 at 10:30 AM, Pralabh Kumar <pralabhku...@gmail.com>
>> wrote:
>>
>>> Hi satvik
>>>
>>> Can u please provide an example of what exactly you want.
>>>
>>>
>>>
>>> On 16-Jun-2017 7:40 PM, "Saatvik Shah" <saatvikshah1...@gmail.com>
>>> wrote:
>>>
>>>> Hi Yan,
>>>>
>>>> Basically the reason I was looking for the categorical datatype is as
>>>> given here
>>>> <https://pandas.pydata.org/pandas-docs/stable/categorical.html>:
>>>> ability to fix column values to specific categories. Is it possible to
>>>> create a user defined data type which could do so?
>>>>
>>>> Thanks and Regards,
>>>> Saatvik Shah
>>>>
>>>> On Fri, Jun 16, 2017 at 1:42 AM, 颜发才(Yan Facai) <facai@gmail.com>
>>>> wrote:
>>>>
>>>>> You can use some Transformers to handle categorical data,
>>>>> For example,
>>>>> StringIndexer encodes a string column of labels to a column of label
>>>>> indices:
>>>>> http://spark.apache.org/docs/latest/ml-features.html#stringindexer
>>>>>
>>>>>
>>>>> On Thu, Jun 15, 2017 at 10:19 PM, saatvikshah1994 <
>>>>> saatvikshah1...@gmail.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>> I'm trying to convert a Pandas -> Spark dataframe. One of the columns
>>>>>> I have
>>>>>> is of the Category type in Pandas. But there does not seem to be
>>>>>> support for
>>>>>> this same type in Spark. What is the best alternative?
>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> View this message in context: http://apache-spark-user-list.
>>>>>> 1001560.n3.nabble.com/Best-alternative-for-Category-Type-in-
>>>>>> Spark-Dataframe-tp28764.html
>>>>>> Sent from the Apache Spark User List mailing list archive at
>>>>>> Nabble.com.
>>>>>>
>>>>>> -
>>>>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>>>>>
>>>>>>
>>>>>
>>>>
>>>>
>>>> --
>>>> *Saatvik Shah,*
>>>> *1st  Year,*
>>>> *Masters in the School of Computer Science,*
>>>> *Carnegie Mellon University*
>>>>
>>>> *https://saatvikshah1994.github.io/
>>>> <https://saatvikshah1994.github.io/>*
>>>>
>>>
>>
>>
>> --
>> *Saatvik Shah,*
>> *1st  Year,*
>> *Masters in the School of Computer Science,*
>> *Carnegie Mellon University*
>>
>> *https://saatvikshah1994.github.io/ <https://saatvikshah1994.github.io/>*
>>
>
>


Re: Best alternative for Category Type in Spark Dataframe

2017-06-15 Thread Yan Facai
You can use some Transformers to handle categorical data,
For example,
StringIndexer encodes a string column of labels to a column of label
indices:
http://spark.apache.org/docs/latest/ml-features.html#stringindexer
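For reference, a minimal sketch (the column name "category" is hypothetical):

```scala
import org.apache.spark.ml.feature.StringIndexer

val indexer = new StringIndexer()
  .setInputCol("category")
  .setOutputCol("categoryIndex")

val indexed = indexer.fit(df).transform(df)  // adds a numeric index column
```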


On Thu, Jun 15, 2017 at 10:19 PM, saatvikshah1994  wrote:

> Hi,
> I'm trying to convert a Pandas -> Spark dataframe. One of the columns I
> have
> is of the Category type in Pandas. But there does not seem to be support
> for
> this same type in Spark. What is the best alternative?
>
>
>
> --
> View this message in context: http://apache-spark-user-list.
> 1001560.n3.nabble.com/Best-alternative-for-Category-Type-
> in-Spark-Dataframe-tp28764.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: [How-To] Migrating from mllib.tree.DecisionTree to ml.regression.DecisionTreeRegressor

2017-06-15 Thread Yan Facai
Hi, OBones.

1. which columns are features?
For ml,
use `setFeaturesCol` and `setLabelCol` to assign the feature and label columns:
https://spark.apache.org/docs/2.1.0/api/scala/index.html#
org.apache.spark.ml.classification.DecisionTreeClassifier

2. which ones are categorical?
For ml, use a Transformer to create the features Vector.
In your case, use VectorIndexer:
http://spark.apache.org/docs/latest/ml-features.html#vectorindexer

In short,
use a Transformer (such as VectorAssembler or VectorIndexer) to build the
features Vector column, and use an Estimator to train and test.
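A minimal sketch of the ml workflow with the thread's column names (X1, X2, X3 as features, Y as the label); treat it as illustrative rather than a drop-in replacement:

```scala
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.regression.DecisionTreeRegressor

val assembler = new VectorAssembler()
  .setInputCols(Array("X1", "X2", "X3"))
  .setOutputCol("features")

val assembled = assembler.transform(data)

val dt = new DecisionTreeRegressor()
  .setFeaturesCol("features")
  .setLabelCol("Y")
  .setImpurity("variance")
  .setMaxDepth(30)
  .setMaxBins(32)

val model = dt.fit(assembled)
println(model.featureImportances)
```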





On Thu, Jun 15, 2017 at 5:59 PM, OBones  wrote:

> Hello,
>
> I have written the following scala code to train a regression tree, based
> on mllib:
>
> val conf = new SparkConf().setAppName("DecisionTreeRegressionExample")
> val sc = new SparkContext(conf)
> val spark = new SparkSession.Builder().getOrCreate()
>
> val sourceData = spark.read.format("com.databri
> cks.spark.csv").option("header", "true").option("delimiter",
> ";").load("C:\\Data\\source_file.csv")
>
> val data = sourceData.select($"X3".cast("double"),
> $"Y".cast("double"), $"X1".cast("double"), $"X2".cast("double"))
>
> val featureIndices = List("X1", "X2", "X3").map(data.columns.indexOf
> (_))
> val targetIndex = data.columns.indexOf("Y")
>
> // WARNING: Indices in categoricalFeatures info are those inside the
> vector we build from the featureIndices list
> // Column 0 has two modalities, Column 1 has three
> val categoricalFeaturesInfo = Map[Int, Int]((0, 2), (1, 3))
> val impurity = "variance"
> val maxDepth = 30
> val maxBins = 32
>
> val labeled = data.map(row => LabeledPoint(row.getDouble(targetIndex),
> Vectors.dense(featureIndices.map(row.getDouble(_)).toArray)))
>
> val model = DecisionTree.trainRegressor(labeled.rdd,
> categoricalFeaturesInfo, impurity, maxDepth, maxBins)
>
> println(model.toDebugString)
>
> This works quite well, but I want some information from the model, one of
> them being the features importance values. As it turns out, this is not
> available on DecisionTreeModel but is available on
> DecisionTreeRegressionModel from the ml package.
> I then discovered that the ml package is more recent than the mllib
> package which explains why it gives me more control over the trees I'm
> building.
> So, I tried to rewrite my sample code using the ml package and it is very
> much easier to use, no need for the LabeledPoint transformation. Here is
> the code I came up with:
>
> val dt = new DecisionTreeRegressor()
>   .setPredictionCol("Y")
>   .setImpurity("variance")
>   .setMaxDepth(30)
>   .setMaxBins(32)
>
> val model = dt.fit(data)
>
> println(model.toDebugString)
> println(model.featureImportances.toString)
>
> However, I cannot find a way to specify which columns are features, which
> ones are categorical and how many categories they have, like I used to do
> with the mllib package.
> I did look at the DecisionTreeRegressionExample.scala example found in
> the source package, but it uses a VectorIndexer to automatically discover
> the above information which is an unnecessary step in my case because I
> already have the information at hand.
>
> The documentation found online (http://spark.apache.org/docs/
> latest/api/scala/index.html#org.apache.spark.ml.regression.D
> ecisionTreeRegressor) did not help either because it does not indicate
> the format for the featuresCol string property.
>
> Thanks in advance for your help.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: LibSVM should have just one input file

2017-06-11 Thread Yan Facai
Hi, yaphet.
It seems that the code you pasted comes from the libsvm data source, rather
than from SVM itself.
Do I misunderstand?

For LibSVMDataSource,
1. if numFeatures is unspecified, only one file is valid input.

val df = spark.read.format("libsvm")
  .load("data/mllib/sample_libsvm_data.txt")

2. otherwise, multiple files are OK.

val df = spark.read.format("libsvm")
  .option("numFeatures", "780")
  .load("data/mllib/sample_libsvm_data.txt")


For more, see: http://spark.apache.org/docs/latest/api/scala/index.html#
org.apache.spark.ml.source.libsvm.LibSVMDataSource


On Mon, Jun 12, 2017 at 11:46 AM, darion.yaphet  wrote:

> Hi team :
>
> Currently when we using SVM to train dataset we found the input
> files limit only one .
>
> the source code as following :
>
> val path = if (dataFiles.length == 1) {
> dataFiles.head.getPath.toUri.toString
> } else if (dataFiles.isEmpty) {
> throw new IOException("No input path specified for libsvm data")
> } else {
> throw new IOException("Multiple input paths are not supported for libsvm
> data.")
> }
>
> The file store on the Distributed File System such as HDFS is split into
> mutil piece and I think this limit is not necessary . I'm not sure is it a
> bug ? or something I'm using not correctly .
>
> thanks a lot ~~~
>
>
>
>


Re: Convert the feature vector to raw data

2017-06-07 Thread Yan Facai
Hi, kumar.

How about removing the `select` in your code?
namely,

Dataset<Row> result = model.transform(testData);
result.show(1000, false);




On Wed, Jun 7, 2017 at 5:00 PM, kundan kumar  wrote:

> I am using
>
> Dataset<Row> result = model.transform(testData).select("probability",
> "label","features");
>  result.show(1000, false);
>
> In this case the feature vector is being printed as output. Is there a way
> that my original raw data gets printed instead of the feature vector OR is
> there a way to reverse extract my raw data from the feature vector. All of
> the features that my dataset have is categorical in nature.
>
> Thanks,
> Kundan
>


Re: Adding header to an rdd before saving to text file

2017-06-05 Thread Yan Facai
Hi, upendra.
It will be easier to use a DataFrame to read/save a CSV file with a header,
if you'd like.
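A minimal sketch (paths are hypothetical); the CSV reader/writer handles the header row for you:

```scala
val df = spark.read
  .option("header", "true")
  .csv("hdfs:///input/source.csv")   // header row becomes the column names

// ... transformations on df ...

df.write
  .option("header", "true")
  .csv("hdfs:///output/result")      // header row is written back out
```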

On Tue, Jun 6, 2017 at 5:15 AM, upendra 1991 
wrote:

> I am reading a CSV(file has headers header 1st,header2) and generating
> rdd,
> After few transformations I create an rdd and finally write it to a txt
> file.
>
> What's the best way to add the header from source file, into rdd and have
> it available as header into new file I.e, when I transform the rdd into
> textfile using saveAsTexFile("newfile") the header 1, header 2 shall be
> available.
>
>
> Thanks,
> Upendra
>


Re: Sharing my DataFrame (DataSet) cheat sheet.

2017-05-25 Thread Yan Facai
Thanks, Yuhao.
Similarly, I wrote a "10 minutes to Spark DataFrame" guide to share the code
snippets I have collected myself.

+
https://github.com/facaiy/Spark-for-the-Impatient/blob/master/doc/10_minuters_to_spark_dataframe.md
+ https://facaiy.github.io/misc/2017/05/24/collection-of-spark-doc.html

I hope that is useful.


On Sun, Mar 5, 2017 at 4:55 AM, Yuhao Yang  wrote:

>
> Sharing some snippets I accumulated during developing with Apache Spark
> DataFrame (DataSet). Hope it can help you in some way.
>
> https://github.com/hhbyyh/DataFrameCheatSheet.
>
> [image: inline image 1]
>
>
>
>
>
> Regards,
> Yuhao Yang
>


Re: Could any one please tell me why this takes forever to finish?

2017-05-01 Thread Yan Facai
Hi,
10.x.x.x is a private network address, see https://en.wikipedia.org/wiki/IP_address.
You should use the public IP of your AWS instance instead.

On Sat, Apr 29, 2017 at 6:35 AM, Yuan Fang 
wrote:

>
> object SparkPi {
>   private val logger = Logger(this.getClass)
>
>   val sparkConf = new SparkConf()
> .setAppName("Spark Pi")
> .setMaster("spark://10.100.103.192:7077")
>
>   lazy val sc = new SparkContext(sparkConf)
>   sc.addJar("/Users/yfang/workspace/mcs/target/scala-2.11/
> root-assembly-0.1.0.jar")
>
>   def main(args: Array[String]) {
> val x = (1 to 4)
> val a = sc.parallelize(x)
> val mean = a.mean()
> print(mean)
>   }
> }
>
>
> spark://10.100.103.192:7077 is a remote standalone cluster I created on
> AWS.
> I ran it locally with IntelliJ.
> I can see the job is submitted. But the calculation can never finish.
>
> The log shows:
> 15:34:21.674 [Timer-0] WARN org.apache.spark.scheduler.TaskSchedulerImpl
> - Initial job has not accepted any resources; check your cluster UI to
> ensure that workers are registered and have sufficient resources
>
> Any help will be highly appreciated!
>
> Thanks!
>
> Yuan
>
>
> This message is intended exclusively for the individual or entity to which
> it is addressed. This communication may contain information that is
> proprietary, privileged or confidential or otherwise legally prohibited
> from disclosure. If you are not the named addressee, you are not authorized
> to read, print, retain, copy or disseminate this message or any part of it.
> If you have received this message in error, please notify the sender
> immediately by e-mail and delete all copies of the message.


Re: one hot encode a column of vector

2017-04-24 Thread Yan Facai
How about using CountVectorizer?
http://spark.apache.org/docs/latest/ml-features.html#countvectorizer
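For reference, a minimal sketch (column names are assumptions); CountVectorizer turns an array-of-strings column such as ['TG', 'CA'] into a sparse term-count vector, and setBinary(true) makes it a multi-hot (0/1) encoding:

```scala
import org.apache.spark.ml.feature.CountVectorizer

val cv = new CountVectorizer()
  .setInputCol("tokens")       // array<string> column, e.g. ["TG", "CA"]
  .setOutputCol("tokens_vec")
  .setBinary(true)             // 0/1 presence instead of raw counts

val encoded = cv.fit(df).transform(df)
```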





On Tue, Apr 25, 2017 at 9:31 AM, Zeming Yu  wrote:

> how do I do one hot encode on a column of array? e.g. ['TG', 'CA']
>
>
> FYI here's my code for one hot encoding normal categorical columns. How do I 
> make it work for a column of array?
>
>
> from pyspark.ml import Pipeline
> from pyspark.ml.feature import StringIndexer
>
> indexers = [StringIndexer(inputCol=column, 
> outputCol=column+"_index").fit(flight3) for column in list(set['ColA', 
> 'ColB', 'ColC'])]
>
> pipeline = Pipeline(stages=indexers)
> flight4 = pipeline.fit(flight3).transform(flight3)
>
>
>
>


Re: how to add new column using regular expression within pyspark dataframe

2017-04-24 Thread Yan Facai
Don't use a UDF, as `minute` and `unix_timestamp` are native functions in
spark.sql.functions:


scala> df.withColumn("minute", minute(unix_timestamp($"str",
"HH'h'mm'm'").cast("timestamp"))).show





On Tue, Apr 25, 2017 at 7:55 AM, Zeming Yu <zemin...@gmail.com> wrote:

> I tried this, but doesn't seem to work. Do you know how ot fix it?
>
> def getMinutes(aString):
> return minute(unix_timestamp(aString, "HH'h'mm'm'").cast("timestamp"))
>
> udfGetMinutes = udf(getMinutes, IntegerType())
>
> flight2 = (flight2.withColumn('stop_duration1',
> udfGetMinutes(flight2.stop_duration1))
>   )
>
>
>
> On Sat, Apr 22, 2017 at 8:51 PM, 颜发才(Yan Facai) <facai@gmail.com>
> wrote:
>
>> Hi, Zeming.
>>
>> I prefer to convert String to DateTime, like this:
>>
>> scala> val df = Seq("15h10m", "17h0m", "21h25m").toDF("str")
>>
>> scala> val ts = unix_timestamp($"str", "HH'h'mm'm'").cast("timestamp")
>>
>> scala> df.withColumn("minute", minute(ts)).show
>> +------+------+
>> |   str|minute|
>> +------+------+
>> |15h10m|    10|
>> | 17h0m|     0|
>> |21h25m|    25|
>> +------+------+
>>
>>
>> By the way, check Date-time function section of API:
>> http://spark.apache.org/docs/latest/api/scala/index.html#org
>> .apache.spark.sql.functions$
>>
>>
>>
>>
>> On Sat, Apr 22, 2017 at 6:27 PM, Zeming Yu <zemin...@gmail.com> wrote:
>>
>>> Thanks a lot!
>>>
>>> Just another question, how can I extract the minutes as a number?
>>>
>>> I can use:
>>> .withColumn('duration_m',split(flight.duration,'h').getItem(1)
>>>
>>> to get strings like '10m'
>>>
>>> but how do I drop the charater "m" at the end? I can use substr(), but
>>> what's the function to get the length of the string so that I can do
>>> something like substr(1, len(...)-1)?
>>>
>>> On Thu, Apr 20, 2017 at 11:36 PM, Pushkar.Gujar <pushkarvgu...@gmail.com
>>> > wrote:
>>>
>>>> Can be as  simple as -
>>>>
>>>> from pyspark.sql.functions import split
>>>>
>>>> flight.withColumn('hour',split(flight.duration,'h').getItem(0))
>>>>
>>>>
>>>> Thank you,
>>>> *Pushkar Gujar*
>>>>
>>>>
>>>> On Thu, Apr 20, 2017 at 4:35 AM, Zeming Yu <zemin...@gmail.com> wrote:
>>>>
>>>>> Any examples?
>>>>>
>>>>> On 20 Apr. 2017 3:44 pm, "颜发才(Yan Facai)" <facai@gmail.com> wrote:
>>>>>
>>>>>> How about using `withColumn` and UDF?
>>>>>>
>>>>>> example:
>>>>>> + https://gist.github.com/zoltanctoth/2deccd69e3d1cde1dd78
>>>>>> <https://gist.github.com/zoltanctoth/2deccd69e3d1cde1dd78>
>>>>>> + https://ragrawal.wordpress.com/2015/10/02/spark-custom-udf-example/
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Mon, Apr 17, 2017 at 8:25 PM, Zeming Yu <zemin...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> I've got a dataframe with a column looking like this:
>>>>>>>
>>>>>>> display(flight.select("duration").show())
>>>>>>>
>>>>>>> ++
>>>>>>> |duration|
>>>>>>> ++
>>>>>>> |  15h10m|
>>>>>>> |   17h0m|
>>>>>>> |  21h25m|
>>>>>>> |  14h30m|
>>>>>>> |  24h50m|
>>>>>>> |  26h10m|
>>>>>>> |  14h30m|
>>>>>>> |   23h5m|
>>>>>>> |  21h30m|
>>>>>>> |  11h50m|
>>>>>>> |  16h10m|
>>>>>>> |  15h15m|
>>>>>>> |  21h25m|
>>>>>>> |  14h25m|
>>>>>>> |  14h40m|
>>>>>>> |   16h0m|
>>>>>>> |  24h20m|
>>>>>>> |  14h30m|
>>>>>>> |  14h25m|
>>>>>>> |  14h30m|
>>>>>>> ++
>>>>>>> only showing top 20 rows
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> I need to extr

Re: Spark-shell's performance

2017-04-20 Thread Yan Facai
Hi, Hanson.
Perhaps I'm digressing here; if I'm wrong or mistaken, please correct me.

SPARK_WORKER_* is the configuration for the whole cluster, and it's fine to
set those global variables in spark-env.sh.
However,
SPARK_DRIVER_* and SPARK_EXECUTOR_* are the configuration for the application
(your code), so it's probably better to pass the arguments to spark-shell
directly, like:
```bash
spark-shell --driver-memory 8G --executor-cores 4 --executor-memory 2G
```

Tuning the application configuration is a good start, and passing the options
to spark-shell directly makes them easier to test.

For more details see:
+ `spark-shell -h`
+ http://spark.apache.org/docs/latest/submitting-applications.html
+ http://spark.apache.org/docs/latest/spark-standalone.html




On Mon, Apr 17, 2017 at 6:18 PM, Richard Hanson  wrote:

> I am playing with some data using (stand alone) spark-shell (Spark version
> 1.6.0) by executing `spark-shell`. The flow is simple; a bit like cp -
> basically moving local 100k files (the max size is 190k) to S3. Memory is
> configured as below
>
>
> export SPARK_DRIVER_MEMORY=8192M
> export SPARK_WORKER_CORES=1
> export SPARK_WORKER_MEMORY=8192M
> export SPARK_EXECUTOR_CORES=4
> export SPARK_EXECUTOR_MEMORY=2048M
>
>
> But total time spent on moving those files to S3 took roughly 30 mins. The
> resident memory I found is roughly 3.820g (checking with top -p ).
> This seems to me there are still rooms to speed it up, though this is only
> for testing purpose. So I would like to know if any other parameters I can
> change to improve spark-shell's performance? Is the memory setup above
> correct?
>
>
> Thanks.
>


Re: how to add new column using regular expression within pyspark dataframe

2017-04-19 Thread Yan Facai
How about using `withColumn` and UDF?

example:
+ https://gist.github.com/zoltanctoth/2deccd69e3d1cde1dd78

+ https://ragrawal.wordpress.com/2015/10/02/spark-custom-udf-example/



On Mon, Apr 17, 2017 at 8:25 PM, Zeming Yu  wrote:

> I've got a dataframe with a column looking like this:
>
> display(flight.select("duration").show())
>
> ++
> |duration|
> ++
> |  15h10m|
> |   17h0m|
> |  21h25m|
> |  14h30m|
> |  24h50m|
> |  26h10m|
> |  14h30m|
> |   23h5m|
> |  21h30m|
> |  11h50m|
> |  16h10m|
> |  15h15m|
> |  21h25m|
> |  14h25m|
> |  14h40m|
> |   16h0m|
> |  24h20m|
> |  14h30m|
> |  14h25m|
> |  14h30m|
> ++
> only showing top 20 rows
>
>
>
> I need to extract the hour as a number and store it as an additional
> column within the same dataframe. What's the best way to do that?
>
>
> I tried the following, but it failed:
>
> import re
> def getHours(x):
>   return re.match('([0-9]+(?=h))', x)
> temp = flight.select("duration").rdd.map(lambda x:getHours(x[0])).toDF()
> temp.select("duration").show()
>
>
> error message:
>
>
> ---Py4JJavaError
>  Traceback (most recent call 
> last) in ()  2 def getHours(x):
>   3   return re.match('([0-9]+(?=h))', x)> 4 temp = 
> flight.select("duration").rdd.map(lambda x:getHours(x[0])).toDF()  5 
> temp.select("duration").show()
> C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\python\pyspark\sql\session.py
>  in toDF(self, schema, sampleRatio) 55 [Row(name=u'Alice', 
> age=1)] 56 """---> 57 return 
> sparkSession.createDataFrame(self, schema, sampleRatio) 58  59 
> RDD.toDF = toDF
> C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\python\pyspark\sql\session.py
>  in createDataFrame(self, data, schema, samplingRatio, verifySchema)518   
>   519 if isinstance(data, RDD):--> 520 rdd, schema = 
> self._createFromRDD(data.map(prepare), schema, samplingRatio)521 
> else:522 rdd, schema = self._createFromLocal(map(prepare, 
> data), schema)
> C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\python\pyspark\sql\session.py
>  in _createFromRDD(self, rdd, schema, samplingRatio)358 """
> 359 if schema is None or isinstance(schema, (list, tuple)):--> 360
>  struct = self._inferSchema(rdd, samplingRatio)361 
> converter = _create_converter(struct)362 rdd = 
> rdd.map(converter)
> C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\python\pyspark\sql\session.py
>  in _inferSchema(self, rdd, samplingRatio)329 :return: 
> :class:`pyspark.sql.types.StructType`330 """--> 331 first 
> = rdd.first()332 if not first:333 raise 
> ValueError("The first row in RDD is empty, "
> C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\python\pyspark\rdd.py 
> in first(self)   1359 ValueError: RDD is empty   1360 """-> 
> 1361 rs = self.take(1)   1362 if rs:   1363 
> return rs[0]
> C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\python\pyspark\rdd.py 
> in take(self, num)   13411342 p = range(partsScanned, 
> min(partsScanned + numPartsToTry, totalParts))-> 1343 res = 
> self.context.runJob(self, takeUpToNumLeft, p)   13441345 
> items += res
> C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\python\pyspark\context.py
>  in runJob(self, rdd, partitionFunc, partitions, allowLocal)963 # 
> SparkContext#runJob.964 mappedRDD = 
> rdd.mapPartitions(partitionFunc)--> 965 port = 
> self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions)
> 966 return list(_load_from_socket(port, 
> mappedRDD._jrdd_deserializer))967
> C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\python\lib\py4j-0.10.4-src.zip\py4j\java_gateway.py
>  in __call__(self, *args)   1131 answer = 
> self.gateway_client.send_command(command)   1132 return_value = 
> get_return_value(-> 1133 answer, self.gateway_client, 
> self.target_id, self.name)   11341135 for temp_arg in temp_args:
> C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\python\pyspark\sql\utils.py
>  in deco(*a, **kw) 61 def deco(*a, **kw): 62 try:---> 63  
>return f(*a, **kw) 64 except 
> py4j.protocol.Py4JJavaError as e: 65 s = 
> e.java_exception.toString()
> C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\python\lib\py4j-0.10.4-src.zip\py4j\protocol.py
>  in get_return_value(answer, gateway_client, target_id, name)317  
>raise Py4JJavaError(318 "An error occurred 
> while calling 

Re: How to convert Spark MLlib vector to ML Vector?

2017-04-09 Thread Yan Facai
By the way, always try to use `ml`, instead of `mllib`.

import org.apache.spark.ml.feature.LabeledPoint
import org.apache.spark.ml.classification.RandomForestClassifier
or
import org.apache.spark.ml.regression.RandomForestRegressor


more details, see
http://spark.apache.org/docs/latest/ml-classification-regression.html.
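For reference, a minimal all-`ml` sketch of the thread's flow (libsvm load, PCA, then a random forest); the path and parameter values follow the quoted code and are illustrative only:

```scala
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.RandomForestClassifier
import org.apache.spark.ml.feature.PCA

val dataset = spark.read.format("libsvm").load("data/mnist.bz2")
val Array(train, test) = dataset.randomSplit(Array(0.7, 0.3), seed = 12345L)

val pca = new PCA()
  .setInputCol("features")
  .setOutputCol("pcaFeatures")
  .setK(100)

val rf = new RandomForestClassifier()
  .setLabelCol("label")
  .setFeaturesCol("pcaFeatures")
  .setNumTrees(10)

val model = new Pipeline().setStages(Array(pca, rf)).fit(train)
val predictions = model.transform(test)
```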



On Mon, Apr 10, 2017 at 1:45 PM, 颜发才(Yan Facai) <facai@gmail.com> wrote:

> how about using
>
> val dataset = spark.read.format("libsvm")
>   .option("numFeatures", "780")
>   .load("data/mllib/sample_libsvm_data.txt")
>
> instead of
> val dataset = MLUtils.loadLibSVMFile(spark.sparkContext, "data/mnist.bz2")
>
>
>
>
>
> On Mon, Apr 10, 2017 at 11:19 AM, Ryan <ryan.hd@gmail.com> wrote:
>
>> you could write a udf using the asML method along with some type casting,
>> then apply the udf to data after pca.
>>
>> when using pipeline, that udf need to be wrapped in a customized
>> transformer, I think.
>>
>> On Sun, Apr 9, 2017 at 10:07 PM, Nick Pentreath <nick.pentre...@gmail.com
>> > wrote:
>>
>>> Why not use the RandomForest from Spark ML?
>>>
>>> On Sun, 9 Apr 2017 at 16:01, Md. Rezaul Karim <
>>> rezaul.ka...@insight-centre.org> wrote:
>>>
>>>> I have already posted this question to the StackOverflow
>>>> <http://stackoverflow.com/questions/43263942/how-to-convert-spark-mllib-vector-to-ml-vector>.
>>>> However, not getting any response from someone else. I'm trying to use
>>>> RandomForest algorithm for the classification after applying the PCA
>>>> technique since the dataset is pretty high-dimensional. Here's my source
>>>> code:
>>>>
>>>> import org.apache.spark.mllib.util.MLUtils
>>>> import org.apache.spark.mllib.tree.RandomForest
>>>> import org.apache.spark.mllib.tree.model.RandomForestModel
>>>> import org.apache.spark.mllib.regression.LabeledPoint
>>>> import org.apache.spark.ml.linalg.{Vectors, VectorUDT}
>>>> import org.apache.spark.sql._
>>>> import org.apache.spark.sql.SQLContext
>>>> import org.apache.spark.sql.SparkSession
>>>>
>>>> import org.apache.spark.ml.feature.PCA
>>>> import org.apache.spark.rdd.RDD
>>>>
>>>> object PCAExample {
>>>>   def main(args: Array[String]): Unit = {
>>>> val spark = SparkSession
>>>>   .builder
>>>>   .master("local[*]")
>>>>   .config("spark.sql.warehouse.dir", "E:/Exp/")
>>>>   .appName(s"OneVsRestExample")
>>>>   .getOrCreate()
>>>>
>>>> val dataset = MLUtils.loadLibSVMFile(spark.sparkContext, 
>>>> "data/mnist.bz2")
>>>>
>>>> val splits = dataset.randomSplit(Array(0.7, 0.3), seed = 12345L)
>>>> val (trainingData, testData) = (splits(0), splits(1))
>>>>
>>>> val sqlContext = new SQLContext(spark.sparkContext)
>>>> import sqlContext.implicits._
>>>> val trainingDF = trainingData.toDF("label", "features")
>>>>
>>>> val pca = new PCA()
>>>>   .setInputCol("features")
>>>>   .setOutputCol("pcaFeatures")
>>>>   .setK(100)
>>>>   .fit(trainingDF)
>>>>
>>>> val pcaTrainingData = pca.transform(trainingDF)
>>>> //pcaTrainingData.show()
>>>>
>>>> val labeled = pca.transform(trainingDF).rdd.map(row => LabeledPoint(
>>>>   row.getAs[Double]("label"),
>>>>   row.getAs[org.apache.spark.mllib.linalg.Vector]("pcaFeatures")))
>>>>
>>>> //val labeled = pca.transform(trainingDF).rdd.map(row => 
>>>> LabeledPoint(row.getAs[Double]("label"),
>>>> //  
>>>> Vector.fromML(row.getAs[org.apache.spark.ml.linalg.SparseVector]("features"
>>>>
>>>> val numClasses = 10
>>>> val categoricalFeaturesInfo = Map[Int, Int]()
>>>> val numTrees = 10 // Use more in practice.
>>>> val featureSubsetStrategy = "auto" // Let the algorithm choose.
>>>> val impurity = "gini"
>>>> val maxDepth = 20
>>>> val maxBins = 32
>>>>
>>>> val model = RandomForest.trainClassifier(labeled, numClasses, 
>>>> categoricalFeaturesInfo,
>>>>   numTrees, featureSubsetStrategy, impurity, maxDepth, maxBins)
>>>>   }
>>>> }
>>>>
>>>> However, I'm getting the following error:
>>>>
>>>> *Exception in thread "main" java.lang.IllegalArgumentException:
>>>> requirement failed: Column features must be of type
>>>> org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7 but was actually
>>>> org.apache.spark.mllib.linalg.VectorUDT@f71b0bce.*
>>>>
>>>> What am I doing wrong in my code?  Actually, I'm getting the above
>>>> exception in this line:
>>>>
>>>> val pca = new PCA()
>>>>   .setInputCol("features")
>>>>   .setOutputCol("pcaFeatures")
>>>>   .setK(100)
>>>>   .fit(trainingDF) /// GETTING EXCEPTION HERE
>>>>
>>>> Please, someone, help me to solve the problem.
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> Kind regards,
>>>> *Md. Rezaul Karim*
>>>>
>>>
>>
>


Re: How to convert Spark MLlib vector to ML Vector?

2017-04-09 Thread Yan Facai
how about using

val dataset = spark.read.format("libsvm")
  .option("numFeatures", "780")
  .load("data/mllib/sample_libsvm_data.txt")

instead of
val dataset = MLUtils.loadLibSVMFile(spark.sparkContext, "data/mnist.bz2")





On Mon, Apr 10, 2017 at 11:19 AM, Ryan  wrote:

> you could write a udf using the asML method along with some type casting,
> then apply the udf to data after pca.
>
> when using pipeline, that udf need to be wrapped in a customized
> transformer, I think.
>
> On Sun, Apr 9, 2017 at 10:07 PM, Nick Pentreath 
> wrote:
>
>> Why not use the RandomForest from Spark ML?
>>
>> On Sun, 9 Apr 2017 at 16:01, Md. Rezaul Karim <
>> rezaul.ka...@insight-centre.org> wrote:
>>
>>> I have already posted this question to the StackOverflow
>>> .
>>> However, not getting any response from someone else. I'm trying to use
>>> RandomForest algorithm for the classification after applying the PCA
>>> technique since the dataset is pretty high-dimensional. Here's my source
>>> code:
>>>
>>> import org.apache.spark.mllib.util.MLUtils
>>> import org.apache.spark.mllib.tree.RandomForest
>>> import org.apache.spark.mllib.tree.model.RandomForestModel
>>> import org.apache.spark.mllib.regression.LabeledPoint
>>> import org.apache.spark.ml.linalg.{Vectors, VectorUDT}
>>> import org.apache.spark.sql._
>>> import org.apache.spark.sql.SQLContext
>>> import org.apache.spark.sql.SparkSession
>>>
>>> import org.apache.spark.ml.feature.PCA
>>> import org.apache.spark.rdd.RDD
>>>
>>> object PCAExample {
>>>   def main(args: Array[String]): Unit = {
>>> val spark = SparkSession
>>>   .builder
>>>   .master("local[*]")
>>>   .config("spark.sql.warehouse.dir", "E:/Exp/")
>>>   .appName(s"OneVsRestExample")
>>>   .getOrCreate()
>>>
>>> val dataset = MLUtils.loadLibSVMFile(spark.sparkContext, 
>>> "data/mnist.bz2")
>>>
>>> val splits = dataset.randomSplit(Array(0.7, 0.3), seed = 12345L)
>>> val (trainingData, testData) = (splits(0), splits(1))
>>>
>>> val sqlContext = new SQLContext(spark.sparkContext)
>>> import sqlContext.implicits._
>>> val trainingDF = trainingData.toDF("label", "features")
>>>
>>> val pca = new PCA()
>>>   .setInputCol("features")
>>>   .setOutputCol("pcaFeatures")
>>>   .setK(100)
>>>   .fit(trainingDF)
>>>
>>> val pcaTrainingData = pca.transform(trainingDF)
>>> //pcaTrainingData.show()
>>>
>>> val labeled = pca.transform(trainingDF).rdd.map(row => LabeledPoint(
>>>   row.getAs[Double]("label"),
>>>   row.getAs[org.apache.spark.mllib.linalg.Vector]("pcaFeatures")))
>>>
>>> //val labeled = pca.transform(trainingDF).rdd.map(row => 
>>> LabeledPoint(row.getAs[Double]("label"),
>>> //  
>>> Vector.fromML(row.getAs[org.apache.spark.ml.linalg.SparseVector]("features"
>>>
>>> val numClasses = 10
>>> val categoricalFeaturesInfo = Map[Int, Int]()
>>> val numTrees = 10 // Use more in practice.
>>> val featureSubsetStrategy = "auto" // Let the algorithm choose.
>>> val impurity = "gini"
>>> val maxDepth = 20
>>> val maxBins = 32
>>>
>>> val model = RandomForest.trainClassifier(labeled, numClasses, 
>>> categoricalFeaturesInfo,
>>>   numTrees, featureSubsetStrategy, impurity, maxDepth, maxBins)
>>>   }
>>> }
>>>
>>> However, I'm getting the following error:
>>>
>>> *Exception in thread "main" java.lang.IllegalArgumentException:
>>> requirement failed: Column features must be of type
>>> org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7 but was actually
>>> org.apache.spark.mllib.linalg.VectorUDT@f71b0bce.*
>>>
>>> What am I doing wrong in my code?  Actually, I'm getting the above
>>> exception in this line:
>>>
>>> val pca = new PCA()
>>>   .setInputCol("features")
>>>   .setOutputCol("pcaFeatures")
>>>   .setK(100)
>>>   .fit(trainingDF) /// GETTING EXCEPTION HERE
>>>
>>> Please, someone, help me to solve the problem.
>>>
>>>
>>>
>>>
>>>
>>> Kind regards,
>>> *Md. Rezaul Karim*
>>>
>>
>


Re: Returning DataFrame for text file

2017-04-06 Thread Yan Facai
SparkSession.read returns a DataFrameReader.
DataFrameReader supports a series of formats, such as csv, json, and text, as
you mentioned.

check API to find more details:
+ http://spark.apache.org/docs/latest/api/scala/index.html#org
.apache.spark.sql.SparkSession
+ http://spark.apache.org/docs/latest/api/scala/index.html#org
.apache.spark.sql.DataFrameReader
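For reference, a minimal sketch (the file path is hypothetical): `text` returns a DataFrame with a single string column named "value", while `textFile` returns a Dataset[String]:

```scala
val dfLines = spark.read.text("data/input.txt")       // DataFrame with a "value" column
dfLines.printSchema()                                 // root |-- value: string (nullable = true)

val dsLines = spark.read.textFile("data/input.txt")   // Dataset[String]
```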




On Thu, Mar 30, 2017 at 2:58 AM, George Obama  wrote:

> Hi,
>
> I saw that the API, either R or Scala, we are returning DataFrame for
> sparkSession.read.text()
>
> What’s the rational behind this?
>
> Regards,
> George
>


Re: Master-Worker communication on Standalone cluster issues

2017-04-06 Thread Yan Facai
1. For worker and master:
spark.worker.timeout 60s
see: http://spark.apache.org/docs/latest/spark-standalone.html

2. For executor and driver:
spark.executor.heartbeatInterval 10s
see: http://spark.apache.org/docs/latest/configuration.html


Please correct me if I'm wrong.



On Thu, Apr 6, 2017 at 5:01 AM, map reduced  wrote:

> Hi,
>
> I was wondering on how often does Worker pings Master to check on Master's
> liveness? Or is it the Master (Resource manager) that pings Workers to
> check on their liveness and if any workers are dead to spawn ? Or is it
> both?
>
> Some info:
> Standalone cluster
> 1 Master - 8core 12Gb
> 32 workers - each 8 core and 8 Gb
>
> My main problem - Here's what happened:
>
> Master M - running with 32 workers
> Worker 1 and 2 died at 03:55:00 - so now the cluster is 30 workers
>
> Worker 1' came up at 03:55:12.000 AM - it connected to M
> Worker 2' came up at 03:55:16.000 AM - it connected to M
>
> Master M *dies* at 03:56.00 AM
> New master NM' comes up at 03:56:30 AM
> Worker 1' and 2' - *DO NOT* connect to NM
> Remaining 30 workers connect to NM.
>
> So NM now has 30 workers.
>
> I was wondering on why those two won't connect to new master NM even
> though master M is dead for sure.
>
> PS:I have a LB setup for Master which means that whenever a new master
> comes in LB will start pointing to new one.
>
> Thanks,
> KP
>
>


Re: Why chinese character gash appear when i use spark textFile?

2017-04-05 Thread Yan Facai
Perhaps your file is not utf-8.

I cannot reproduce it.

### HADOOP:
~/Downloads ❯❯❯ hdfs -cat hdfs:///test.txt
17/04/06 13:43:58 WARN util.NativeCodeLoader: Unable to load native-hadoop
library for your platform... using builtin-java classes where applicable
1.0 862910025238798 100733314   18_百度输入法:100733314
8919173c6d49abfab02853458247e5841:129:18_百度输入法:1.0%

### SPARK
scala> val t = sc.textFile("hdfs:///test.txt")
t: org.apache.spark.rdd.RDD[String] = hdfs:///test.txt
MapPartitionsRDD[3] at textFile at :24

scala> t.first
res2: String = 1.0 862910025238798 100733314   18_百度输入法:100733314






On Thu, Apr 6, 2017 at 12:47 PM, Jone Zhang  wrote:

> var textFile = sc.textFile("xxx");
> textFile.first();
> res1: String = 1.0 100733314   18_?:100733314
> 8919173c6d49abfab02853458247e5841:129:18_?:1.0
>
>
> hadoop fs -cat xxx
> 1.0100733314   18_百度输入法:100733314 8919173c6d49abfab02853458247e584
>  1:129:18_百度输入法:1.0
>
> Why  chinese character gash appear when i use spark textFile?
> The code of hdfs file is utf-8.
>
>
> Thanks
>


Re: Read file and represent rows as Vectors

2017-04-05 Thread Yan Facai
You can try the `mapPartitions` method.

example as below:
http://homepage.cs.latrobe.edu.au/zhe/ZhenHeSparkRDDAPIExamples.html#mapPartitions
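For the concrete task in the question, here is a minimal sketch (assuming whitespace-separated "docId wordId count" triples after the three header lines, and that wordId already fits the vector index range) built with groupByKey rather than mapPartitions; it uses mllib.linalg to match the snippet in the question, and you can swap in org.apache.spark.ml.linalg for the DataFrame-based PCA:

```scala
import org.apache.spark.mllib.linalg.Vectors

val lines = sc.textFile("docword.txt.gz")
val header = lines.take(3).map(_.trim.toInt)
val vocabSize = header(1)                      // second header line = vocabulary size

val docVectors = lines
  .zipWithIndex()
  .filter { case (_, idx) => idx >= 3 }        // drop the three header lines
  .map { case (line, _) =>
    val Array(docId, wordId, count) = line.trim.split("\\s+").map(_.toInt)
    (docId, (wordId, count.toDouble))
  }
  .groupByKey()
  .map { case (docId, pairs) =>
    (docId, Vectors.sparse(vocabSize, pairs.toSeq))  // one sparse vector per document
  }
```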

On Mon, Apr 3, 2017 at 8:05 PM, Old-School 
wrote:

> I have a dataset that contains DocID, WordID and frequency (count) as shown
> below. Note that the first three numbers represent 1. the number of
> documents, 2. the number of words in the vocabulary and 3. the total number
> of words in the collection.
>
> 189
> 1430
> 12300
> 1 2 1
> 1 39 1
> 1 42 3
> 1 77 1
> 1 95 1
> 1 96 1
> 2 105 1
> 2 108 1
> 3 133 3
>
>
> What I want to do is to read the data (ignore the first three lines),
> combine the words per document and finally represent each document as a
> vector that contains the frequency of the wordID.
>
> Based on the above dataset the representation of documents 1, 2 and 3 will
> be (note that vocab_size can be extracted by the second line of the data):
>
> val data = Array(
> Vectors.sparse(vocab_size, Seq((2, 1.0), (39, 1.0), (42, 3.0), (77, 1.0),
> (95, 1.0), (96, 1.0))),
> Vectors.sparse(vocab_size, Seq((105, 1.0), (108, 1.0))),
> Vectors.sparse(vocab_size, Seq((133, 3.0
>
>
> The problem is that I am not quite sure how to read the .txt.gz file as RDD
> and create an Array of sparse vectors as described above. Please note that
> I
> actually want to pass the data array in the PCA transformer.
>
>
>
> --
> View this message in context: http://apache-spark-user-list.
> 1001560.n3.nabble.com/Read-file-and-represent-rows-as-Vectors-tp28562.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Spark data frame map problem

2017-03-23 Thread Yan Facai
Could you give more details of your code?



On Wed, Mar 22, 2017 at 2:40 AM, Shashank Mandil 
wrote:

> Hi All,
>
> I have a spark data frame which has 992 rows inside it.
> When I run a map on this data frame I expect that the map should work for
> all the 992 rows.
>
> As a mapper runs on an executor on  a cluster I did a distributed count of
> the number of rows the mapper is being run on.
>
> dataframe.map(r => {
>//distributed count inside here using zookeeper
> })
>
> I have found that this distributed count inside the mapper is not exactly
> 992. I have found this number to vary with different runs.
>
> Does anybody have any idea what might be happening ? By the way, I am
> using spark 1.6.1
>
> Thanks,
> Shashank
>
>


Re: How does preprocessing fit into Spark MLlib pipeline

2017-03-20 Thread Yan Facai
SQLTransformer is a good solution if all of the operations can be expressed in SQL.

By the way,
if you would like to get your hands dirty,
writing a custom Transformer in Scala is not hard,
and producing multiple output columns is valid in that case.
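A minimal sketch of such a custom Transformer (the user_id / event_type column names come from the thread; the class name EventPivoter is made up), wrapping the group-by-and-pivot step so it can be used as a pipeline stage:

```scala
import org.apache.spark.ml.Transformer
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.ml.util.Identifiable
import org.apache.spark.sql.{DataFrame, Dataset}
import org.apache.spark.sql.types.StructType

class EventPivoter(override val uid: String) extends Transformer {
  def this() = this(Identifiable.randomUID("eventPivoter"))

  override def transform(dataset: Dataset[_]): DataFrame =
    dataset.groupBy("user_id").pivot("event_type").count()

  // The real output columns depend on the data, so they cannot be derived
  // from the input schema alone; returning the schema unchanged is a
  // simplification in this sketch.
  override def transformSchema(schema: StructType): StructType = schema

  override def copy(extra: ParamMap): EventPivoter = defaultCopy(extra)
}
```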




On Fri, Mar 17, 2017 at 9:10 PM, Yanbo Liang  wrote:

> Hi Adrian,
>
> Did you try SQLTransformer? Your preprocessing steps are SQL operations
> and can be handled by SQLTransformer in MLlib pipeline scope.
>
> Thanks
> Yanbo
>
> On Thu, Mar 9, 2017 at 11:02 AM, aATv  wrote:
>
>> I want to start using PySpark Mllib pipelines, but I don't understand
>> how/where preprocessing fits into the pipeline.
>>
>> My preprocessing steps are generally in the following form:
>>1) Load log files(from s3) and parse into a spark Dataframe with
>> columns
>> user_id, event_type, timestamp, etc
>>2) Group by a column, then pivot and count another column
>>   - e.g. df.groupby("user_id").pivot("event_type").count()
>>   - We can think of the columns that this creates besides user_id as
>> features, where the number of each event type is a different feature
>>3) Join the data from step 1 with other metadata, usually stored in
>> Cassandra. Then perform a transformation similar to one from step 2),
>> where
>> the column that is pivoted and counted is a column that came from the data
>> stored in Cassandra.
>>
>> After this preprocessing, I would use transformers to create other
>> features
>> and feed it into a model, lets say Logistic Regression for example.
>>
>> I would like to make at lease step 2 a custom transformer and add that to
>> a
>> pipeline, but it doesn't fit the transformer abstraction. This is because
>> it
>> takes a single input column and outputs multiple columns.  It also has a
>> different number of input rows than output rows due to the group by
>> operation.
>>
>> Given that, how do I fit this into a Mllib pipeline, and it if doesn't fit
>> as part of a pipeline, what is the best way to include it in my code so
>> that
>> it can easily be reused both for training and testing, as well as in
>> production.
>>
>> I'm using pyspark 2.1 and here is an example of 2)
>>
>>
>>
>>
>> Note: My question is in some way related to this question, but I don't
>> think
>> it is answered here:
>> http://apache-spark-developers-list.1001551.n3.nabble.com/
>> Why-can-t-a-Transformer-have-multiple-output-columns-td18689.html
>> > Why-can-t-a-Transformer-have-multiple-output-columns-td18689.html>
>>
>> Thanks
>> Adrian
>>
>>
>>
>>
>> --
>> View this message in context: http://apache-spark-user-list.
>> 1001560.n3.nabble.com/How-does-preprocessing-fit-into-Spark-
>> MLlib-pipeline-tp28473.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>
>


Re: how to retain part of the features in LogisticRegressionModel (spark2.0)

2017-03-20 Thread Yan Facai
Hi, jinhong.
Do you use `setRegParam`? It is 0.0 by default.

Both elasticNetParam and regParam are required if regularization is needed;
internally:

val regParamL1 = $(elasticNetParam) * $(regParam)
val regParamL2 = (1.0 - $(elasticNetParam)) * $(regParam)
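A minimal sketch (the values are illustrative) of turning on pure L1 regularization, which is what drives coefficients to exactly zero:

```scala
import org.apache.spark.ml.classification.LogisticRegression

val lr = new LogisticRegression()
  .setElasticNetParam(1.0)   // 1.0 = pure L1 (lasso), 0.0 = pure L2 (ridge)
  .setRegParam(0.01)         // overall regularization strength
```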




On Mon, Mar 20, 2017 at 6:31 PM, Yanbo Liang  wrote:

> Do you want to get sparse model that most of the coefficients are zeros?
> If yes, using L1 regularization leads to sparsity. But the
> LogisticRegressionModel coefficients vector's size is still equal with the
> number of features, you can get the non-zero elements manually. Actually,
> it would be a sparse vector (or matrix for multinomial case) if it's sparse
> enough.
>
> Thanks
> Yanbo
>
> On Sun, Mar 19, 2017 at 5:02 AM, Dhanesh Padmanabhan <
> dhanesh12...@gmail.com> wrote:
>
>> It shouldn't be difficult to convert the coefficients to a sparse vector.
>> Not sure if that is what you are looking for
>>
>> -Dhanesh
>>
>> On Sun, Mar 19, 2017 at 5:02 PM jinhong lu  wrote:
>>
>> Thanks Dhanesh,  and how about the features question?
>>
>> 在 2017年3月19日,19:08,Dhanesh Padmanabhan  写道:
>>
>> Dhanesh
>>
>>
>> Thanks,
>> lujinhong
>>
>> --
>> Dhanesh
>> +91-9741125245 <+91%2097411%2025245>
>>
>
>


Re: How to improve performance of saveAsTextFile()

2017-03-11 Thread Yan Facai
How about increasing the RDD's number of partitions / rebalancing the data?
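A minimal sketch (the partition count and path are illustrative; rdd stands for the JavaRDD/RDD being saved):

```scala
// Spread the write across more tasks before saving.
rdd.repartition(200).saveAsTextFile("hdfs:///path/to/output")
```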

On Sat, Mar 11, 2017 at 2:33 PM, Parsian, Mahmoud 
wrote:

> How to improve performance of JavaRDD.saveAsTextFile(“hdfs://…“).
> This is taking over 30 minutes on a cluster of 10 nodes.
> Running Spark on YARN.
>
> JavaRDD has 120 million entries.
>
> Thank you,
> Best regards,
> Mahmoud
>


Re: org.apache.spark.SparkException: Task not serializable

2017-03-11 Thread Yan Facai
For scala,
make your class Serializable, like this
```
class YourClass extends Serializable {
  // ...
}
```

On Sat, Mar 11, 2017 at 3:51 PM, 萝卜丝炒饭 <1427357...@qq.com> wrote:

> hi mina,
>
> can you paste your new code here pleasel
> i meet this issue too but do not get Ankur's idea.
>
> thanks
> Robin
>
> ---Original---
> *From:* "Mina Aslani"
> *Date:* 2017/3/7 05:32:10
> *To:* "Ankur Srivastava";
> *Cc:* "user@spark.apache.org";
> *Subject:* Re: org.apache.spark.SparkException: Task not serializable
>
> Thank you Ankur for the quick response, really appreciate it! Making the
> class serializable resolved the exception!
>
> Best regards,
> Mina
>
> On Mon, Mar 6, 2017 at 4:20 PM, Ankur Srivastava <
> ankur.srivast...@gmail.com> wrote:
>
>> The fix for this make your class Serializable. The reason being the
>> closures you have defined in the class need to be serialized and copied
>> over to all executor nodes.
>>
>> Hope this helps.
>>
>> Thanks
>> Ankur
>>
>> On Mon, Mar 6, 2017 at 1:06 PM, Mina Aslani  wrote:
>>
>>> Hi,
>>>
>>> I am trying to start with spark and get number of lines of a text file in 
>>> my mac, however I get
>>>
>>> org.apache.spark.SparkException: Task not serializable error on
>>>
>>> JavaRDD<String> logData = javaCtx.textFile(file);
>>>
>>> Please see below for the sample of code and the stackTrace.
>>>
>>> Any idea why this error is thrown?
>>>
>>> Best regards,
>>>
>>> Mina
>>>
>>> System.out.println("Creating Spark Configuration");
>>> SparkConf javaConf = new SparkConf();
>>> javaConf.setAppName("My First Spark Java Application");
>>> javaConf.setMaster("PATH to my spark");
>>> System.out.println("Creating Spark Context");
>>> JavaSparkContext javaCtx = new JavaSparkContext(javaConf);
>>> System.out.println("Loading the Dataset and will further process it");
>>> String file = "file:///file.txt";
>>> JavaRDD<String> logData = javaCtx.textFile(file);
>>>
>>> long numLines = logData.filter(new Function<String, Boolean>() {
>>>public Boolean call(String s) {
>>>   return true;
>>>}
>>> }).count();
>>>
>>> System.out.println("Number of Lines in the Dataset "+numLines);
>>>
>>> javaCtx.close();
>>>
>>> Exception in thread "main" org.apache.spark.SparkException: Task not 
>>> serializable
>>> at 
>>> org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
>>> at 
>>> org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
>>> at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
>>> at org.apache.spark.SparkContext.clean(SparkContext.scala:2094)
>>> at org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:387)
>>> at org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:386)
>>> at 
>>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>>> at 
>>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
>>> at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
>>> at org.apache.spark.rdd.RDD.filter(RDD.scala:386)
>>> at org.apache.spark.api.java.JavaRDD.filter(JavaRDD.scala:78)
>>>
>>>
>>
>


Re: Sharing my DataFrame (DataSet) cheat sheet.

2017-03-05 Thread Yan Facai
Thanks,
very useful!

On Sun, Mar 5, 2017 at 4:55 AM, Yuhao Yang  wrote:

>
> Sharing some snippets I accumulated during developing with Apache Spark
> DataFrame (DataSet). Hope it can help you in some way.
>
> https://github.com/hhbyyh/DataFrameCheatSheet.
>
> [image: inline image 1]
>
>
>
>
>
> Regards,
> Yuhao Yang
>


Re: attempting to map Dataset[Row]

2017-02-27 Thread Yan Facai
Hi, Fletcher.
A case class can help construct a complex structure; RDD, StructType and
StructField are also helpful if you need them.

However,
the code is a little confusing,

source.map{ row => {
  val key = row(0)
  val buff = new ArrayBuffer[Row]()
  buff += row
  (key,buff)
   }
}

The expected result is (row(0), an ArrayBuffer containing row), right?
Would you like to explain its purpose?








On Sun, Feb 26, 2017 at 8:36 PM, Stephen Fletcher <
stephen.fletc...@gmail.com> wrote:

> sorry here's the whole code
>
> val source = spark.read.format("parquet").load("/emrdata/sources/very_lar
> ge_ds")
>
> implicit val mapEncoder = org.apache.spark.sql.Encoders.
> kryo[(Any,ArrayBuffer[Row])]
>
> source.map{ row => {
>   val key = row(0)
>   val buff = new ArrayBuffer[Row]()
>   buff += row
>   (key,buff)
>}
> }
>
> ...
>
> On Sun, Feb 26, 2017 at 7:31 AM, Stephen Fletcher <
> stephen.fletc...@gmail.com> wrote:
>
>> I'm attempting to perform a map on a Dataset[Row] but getting an error on
>> decode when attempting to pass a custom encoder.
>>  My code looks similar to the following:
>>
>>
>> val source = spark.read.format("parquet").load("/emrdata/sources/very_lar
>> ge_ds")
>>
>>
>>
>> source.map{ row => {
>>   val key = row(0)
>>
>>}
>> }
>>
>
>


Re: Spark runs out of memory with small file

2017-02-26 Thread Yan Facai
Hi, Tremblay.
Your file is in .gz format, which is not splittable for Hadoop, so the whole
file is probably loaded by a single task.
How many executors do you start?
Calling repartition after the load might solve it, I guess.


On Sun, Feb 26, 2017 at 3:33 AM, Henry Tremblay 
wrote:

> I am reading in a single small file from hadoop with wholeText. If I
> process each line and create a row with two cells, the first cell equal to
> the name of the file, the second cell equal to the line. That code runs
> fine.
>
> But if I just add two line of code and change the first cell based on
> parsing a line, spark runs out of memory. Any idea why such a simple
> process that would succeed quickly in a non spark application fails?
>
> Thanks!
>
> Henry
>
> CODE:
>
> [hadoop@ip-172-31-35-67 ~]$ hadoop fs -du /mnt/temp
> 3816096 /mnt/temp/CC-MAIN-20170116095123-00570-ip-10-171-10-70.ec2.
> internal.warc.gz
>
>
> In [1]: rdd1 = sc.wholeTextFiles("/mnt/temp")
> In [2]: rdd1.count()
> Out[2]: 1
>
>
> In [4]: def process_file(s):
>...: text = s[1]
>...: the_id = s[0]
>...: d = {}
>...: l =  text.split("\n")
>...: final = []
>...: for line in l:
>...: d[the_id] = line
>...: final.append(Row(**d))
>...: return final
>...:
>
> In [5]: rdd2 = rdd1.map(process_file)
>
> In [6]: rdd2.count()
> Out[6]: 1
>
> In [7]: rdd3 = rdd2.flatMap(lambda x: x)
>
> In [8]: rdd3.count()
> Out[8]: 508310
>
> In [9]: rdd3.take(1)
> Out[9]: [Row(hdfs://ip-172-31-35-67.us-west-2.compute.internal:8020/
> mnt/temp/CC-MAIN-20170116095123-00570-ip-10-171-10-70.ec2.
> internal.warc.gz='WARC/1.0\r')]
>
> In [10]: def process_file(s):
> ...: text = s[1]
> ...: d = {}
> ...: l =  text.split("\n")
> ...: final = []
> ...: the_id = "init"
> ...: for line in l:
> ...: if line[0:15] == 'WARC-Record-ID:':
> ...: the_id = line[15:]
> ...: d[the_id] = line
> ...: final.append(Row(**d))
> ...: return final
>
> In [12]: rdd2 = rdd1.map(process_file)
>
> In [13]: rdd2.count()
> 17/02/25 19:03:03 ERROR YarnScheduler: Lost executor 5 on
> ip-172-31-41-89.us-west-2.compute.internal: Container killed by YARN for
> exceeding memory limits. 10.3 GB of 10.3 GB physical memory used. Consider
> boosting spark.yarn.executor.memoryOverhead.
> 17/02/25 19:03:03 WARN YarnSchedulerBackend$YarnSchedulerEndpoint:
> Container killed by YARN for exceeding memory limits. 10.3 GB of 10.3 GB
> physical memory used. Consider boosting spark.yarn.executor.memoryOver
> head.
> 17/02/25 19:03:03 WARN TaskSetManager: Lost task 0.0 in stage 5.0 (TID 5,
> ip-172-31-41-89.us-west-2.compute.internal, executor 5):
> ExecutorLostFailure (executor 5 exited caused by one of the running tasks)
> Reason: Container killed by YARN for exceeding memory limits. 10.3 GB of
> 10.3 GB physical memory used. Consider boosting
> spark.yarn.executor.memoryOverhead.
>
>
> --
> Henry Tremblay
> Robert Half Technology
>
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: question on SPARK_WORKER_CORES

2017-02-18 Thread Yan Facai
Hi, kodali.

SPARK_WORKER_CORES is a setting for the standalone worker daemons (it controls
how many cores each worker offers to the cluster manager); see
http://spark.apache.org/docs/latest/cluster-overview.html if interested.

For standalone mode,
you should use the following three arguments to allocate resources for your
application:

   - --executor-memory
   - --executor-cores
   - --total-executor-cores

and their meanings are as follows:

   - Executor memory: --executor-memory
   - Executor cores: --executor-cores
   - Number of executors: --total-executor-cores / --executor-cores

More details: http://spark.apache.org/docs/latest/submitting-applications.html.
A concrete spark-submit example is sketched below.
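For example, in standalone mode a submission like the following (an illustrative
command only; the master URL, class name and jar are placeholders) gives each
executor 4 cores and caps the application at 16 cores, i.e. 4 executors:

```
./bin/spark-submit \
  --master spark://<master-host>:7077 \
  --class com.example.MyApp \
  --executor-memory 4G \
  --executor-cores 4 \
  --total-executor-cores 16 \
  myapp.jar
```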




On Sat, Feb 18, 2017 at 9:20 AM, kant kodali  wrote:

> Hi Satish,
>
> I am using spark 2.0.2.  And no I have not passed those variables because
> I didn't want to shoot in the dark. According to the documentation it looks
> like SPARK_WORKER_CORES is the one which should do it. If not, can you
> please explain how these variables inter play together?
>
> --num-executors
> --executor-cores
> –total-executor-cores
> SPARK_WORKER_CORES
>
> Thanks!
>
>
> On Fri, Feb 17, 2017 at 5:13 PM, Satish Lalam 
> wrote:
>
>> Have you tried passing --executor-cores or –total-executor-cores as
>> arguments, , depending on the spark version?
>>
>>
>>
>>
>>
>> *From:* kant kodali [mailto:kanth...@gmail.com]
>> *Sent:* Friday, February 17, 2017 5:03 PM
>> *To:* Alex Kozlov 
>> *Cc:* user @spark 
>> *Subject:* Re: question on SPARK_WORKER_CORES
>>
>>
>>
>> Standalone.
>>
>>
>>
>> On Fri, Feb 17, 2017 at 5:01 PM, Alex Kozlov  wrote:
>>
>> What Spark mode are you running the program in?
>>
>>
>>
>> On Fri, Feb 17, 2017 at 4:55 PM, kant kodali  wrote:
>>
>> when I submit a job using spark shell I get something like this
>>
>>
>>
>> [Stage 0:>(36814 + 4) / 220129]
>>
>>
>>
>> Now all I want is I want to increase number of parallel tasks running
>> from 4 to 16 so I exported an env variable called SPARK_WORKER_CORES=16 in
>> conf/spark-env.sh. I though that should do it but it doesn't. It still
>> shows me 4. any idea?
>>
>>
>>
>> Thanks much!
>>
>>
>>
>>
>>
>> --
>>
>> Alex Kozlov
>> (408) 507-4987
>> (650) 887-2135 efax
>> ale...@gmail.com
>>
>>
>>
>
>


Re: Query data in subdirectories in Hive Partitions using Spark SQL

2017-02-17 Thread Yan Facai
Hi, Abdelfatah,
How do you read these files: spark.read.parquet or spark.sql?
Could you show some code?
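In case it helps while you put the code together: one workaround I have seen
(only a sketch, the table path here is made up) is to enumerate the nested
directories with a glob pattern instead of relying on the Hive recursion flags:

```scala
// keep basePath so the partition columns are still discovered
val df = spark.read
  .option("basePath", "/warehouse/my_table")
  .parquet("/warehouse/my_table/*/*")
df.count()
```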


On Wed, Feb 15, 2017 at 8:47 PM, Ahmed Kamal Abdelfatah <
ahmed.abdelfa...@careem.com> wrote:

> Hi folks,
>
>
>
> How can I force spark sql to recursively get data stored in parquet format
> from subdirectories ?  In Hive, I could achieve this by setting few Hive
> configs.
>
>
>
> set hive.input.dir.recursive=true;
>
> set hive.mapred.supports.subdirectories=true;
>
> set hive.supports.subdirectories=true;
>
> set mapred.input.dir.recursive=true;
>
>
>
> I tried to set these configs through spark sql queries but I get 0 records
> all the times compared to hive which get me the expected results. I also
> put these confs in hive-site.xml file but nothing changed. How can I handle
> this issue ?
>
>
>
> Spark Version : 2.1.0
>
> I used Hive 2.1.1  on emr-5.3.1
>
>
>
> *Regards, *
>
>
>
>
> *Ahmed Kamal*
> *MTS in Data Science*
>
> *Email: **ahmed.abdelfa...@careem.com *
>
>
>
>
>


Re: How to specify default value for StructField?

2017-02-17 Thread Yan Facai
I agree with Yong Zhang;
perhaps Spark SQL with Hive support could solve the problem:

http://spark.apache.org/docs/latest/sql-programming-guide.html#hive-tables




On Thu, Feb 16, 2017 at 12:42 AM, Yong Zhang  wrote:

> If it works under hive, do you try just create the DF from Hive table
> directly in Spark? That should work, right?
>
>
> Yong
>
>
> --
> *From:* Begar, Veena 
> *Sent:* Wednesday, February 15, 2017 10:16 AM
> *To:* Yong Zhang; smartzjp; user@spark.apache.org
>
> *Subject:* RE: How to specify default value for StructField?
>
>
> Thanks Yong.
>
>
>
> I know about merging the schema option.
>
> Using Hive we can read AVRO files having different schemas. And also we
> can do the same in Spark also.
>
> Similarly we can read ORC files having different schemas in Hive. But, we
> can’t do the same in Spark using dataframe. How we can do it using
> dataframe?
>
>
>
> Thanks.
>
> *From:* Yong Zhang [mailto:java8...@hotmail.com]
> *Sent:* Tuesday, February 14, 2017 8:31 PM
> *To:* Begar, Veena ; smartzjp ;
> user@spark.apache.org
> *Subject:* Re: How to specify default value for StructField?
>
>
>
> You maybe are looking for something like "spark.sql.parquet.mergeSchema"
> for ORC. Unfortunately, I don't think it is available, unless someone tells
> me I am wrong.
>
>
> You can create a JIRA to request this feature, but we all know that
> Parquet is the first citizen format
>
>
>
> Yong
>
>
> --
>
> *From:* Begar, Veena 
> *Sent:* Tuesday, February 14, 2017 10:37 AM
> *To:* smartzjp; user@spark.apache.org
> *Subject:* RE: How to specify default value for StructField?
>
>
>
> Thanks, it didn't work. Because, the folder has files from 2 different
> schemas.
> It fails with the following exception:
> org.apache.spark.sql.AnalysisException: cannot resolve '`f2`' given input
> columns: [f1];
>
>
> -Original Message-
> From: smartzjp [mailto:zjp_j...@163.com ]
> Sent: Tuesday, February 14, 2017 10:32 AM
> To: Begar, Veena ; user@spark.apache.org
> Subject: Re: How to specify default value for StructField?
>
> You can try the below code.
>
> val df = spark.read.format("orc").load("/user/hos/orc_files_test_
> together")
> df.select(“f1”,”f2”).show
>
>
>
>
>
> On 2017/2/14 
> 

Re: How to convert RDD to DF for this case -

2017-02-17 Thread Yan Facai
Hi, Basu,
if all columns are separated by the delimiter "\t", the csv parser might be a
better choice.
For example:

```scala
spark.read
  .option("sep", "\t")
  .option("header", false)
  .option("inferSchema", true)
  .csv("/user/root/spark_demo/scala/data/Stations.txt")
```
More details in [DataFrameReader API](
http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrameReader
)

Then we get two DataFrames;
you can register them as temporary views and use SQL to join, as sketched below.
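For example, a sketch with the Spark 2.x API (file paths and column names taken
from the quoted steps below):

```scala
val stations = spark.read.option("sep", "\t").option("inferSchema", true)
  .csv("/user/root/spark_demo/scala/data/Stations.txt")
  .toDF("StationKey", "StationName", "Temparature", "StationID")
val storms = spark.read.option("sep", "\t").option("inferSchema", true)
  .csv("/user/root/spark_demo/scala/data/Storms.txt")
  .toDF("ID", "Name", "Temp", "Code")

stations.createOrReplaceTempView("Stations")
storms.createOrReplaceTempView("Storms")

val joined = spark.sql(
  """SELECT s.StationName, s.StationID
    |FROM Stations s JOIN Storms t ON t.Code = s.StationKey
    |WHERE s.Temparature > 35""".stripMargin)
joined.show()
```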





On Fri, Feb 17, 2017 at 10:33 PM, Aakash Basu 
wrote:

> Hey Chris,
>
> Thanks for your quick help. Actually the dataset had issues, otherwise the
> logic I implemented was not wrong.
>
> I did this -
>
> 1)  *V.Imp *– Creating row by segregating columns after reading the
> tab delimited file before converting into DF=
>
> val stati = stat.map(x => (x.split("\t")(0), x.split("\t")(1),
> x.split("\t")(2).toInt, x.split("\t")(3).toInt))
>
>
>
> Do a take to see if it throws an error or not (this step is just for
> ensuring if everything is going fine (as it is a lazy execution, that’s
> why)=
>
> stati.take(2)
>
> *Ans:* res8: Array[(String, String, Int, Int)] = Array((uihgf,Pune,56,5),
> (asfsds,***,43,1))
>
> If this comes out, it means it is working fine. We can proceed.
>
> 2)  *V.Imp* - Now converting into DF=
>
> val station = stati.toDF("StationKey","StationName","Temparature","Station
> ID")
>
>
>
> Now doing a show to see how it looks like=
>
> station.show
>
> *Ans:*
>
> * +--+---+---+-+*
>
> *|StationKey|StationName|Temparature|StationID|*
>
> *+--+---+---+-+*
>
> *| uihgf|   Pune| 56|5|*
>
> *|asfsds|***| 43|1|*
>
> *|fkwsdf| Mumbai| 45|6|*
>
> *|  gddg|   ABCD| 32|2|*
>
> *| grgzg| *CSD**| 35|3|*
>
> *| gsrsn| Howrah| 22|4|*
>
> *| ffafv|***| 34|7|*
>
> *+--+---+---+-+*
>
>
>
> 3)  Do the same for the other dataset -
>
> i) val storr = stor.map(p => (p.split("\t")(0).toInt,
> p.split("\t")(1), p.split("\t")(2).toInt, p.split("\t")(3)))
>
> ii)storr.take(2)
>
> iii)   val storm = storr.toDF("ID","Name","Temp","Code")
>
> iv)   storm.show
>
>
>
>
>
> 4)  Registering as table=
>
>  val stations2 = station.registerTempTable("Stations")
>
> val storms2 = storm.registerTempTable("Storms")
>
>
>
> 5)  Querying on the joinedDF as per requirements=
>
> val joinedDF = sqlContext.sql("Select Stations.StationName as StationName,
> Stations.StationID as StationID from Stations inner join Storms on
> Storms.Code = Stations.StationKey where Stations.Temparature > 35")
>
>
>
> 6)  joinedDF.show
>
> +---+-+
>
> |StationName|StationID|
>
> +---+-+
>
> |   Pune|5|
>
> +---+-+
>
> 7)  Saving the file as CSV=
>
> joinedDF.coalesce(1).rdd.map(_.mkString(",")).saveAsTextFile
> ("/user/root/spark_demo/scala/data/output/Question6Soln")
>
>
>
> Thanks,
>
> Aakash.
>
> On Fri, Feb 17, 2017 at 4:17 PM, Christophe Préaud <
> christophe.pre...@kelkoo.com> wrote:
>
>> Hi Aakash,
>>
>> You can try this:
>> import org.apache.spark.sql.Row
>> import org.apache.spark.sql.types.{StringType, StructField, StructType}
>>
>> val header = Array("col1", "col2", "col3", "col4")
>> val schema = StructType(header.map(StructField(_, StringType, true)))
>>
>> val statRow = stat.map(line => Row(line.split("\t"):_*))
>> val df = spark.createDataFrame(statRow, schema)
>>
>> df.show
>> +--+--+++
>> |  col1|  col2|col3|col4|
>> +--+--+++
>> | uihgf| Paris|  56|   5|
>> |asfsds|   ***|  43|   1|
>> |fkwsdf|London|  45|   6|
>> |  gddg|  ABCD|  32|   2|
>> | grgzg|  *CSD|  35|   3|
>> | gsrsn|  ADR*|  22|   4|
>> +--+--+++
>>
>> Please let me know if this works for you.
>>
>> Regards,
>> Christophe.
>>
>>
>> On 17/02/17 10:37, Aakash Basu wrote:
>>
>> Hi all,
>>
>>
>> Without using case class I tried making a DF to work on the join and
>> other filtration later. But I'm getting an ArrayIndexOutOfBoundException
>> error while doing a show of the DF.
>>
>>
>> 1)  Importing SQLContext=
>>
>> import org.apache.spark.sql.SQLContext._
>>
>> import org.apache.spark.sql.SQLContext
>>
>>
>>
>> 2)  Initializing SQLContext=
>>
>> val sqlContext = new SQLContext(sc)
>>
>>
>>
>> 3)  Importing implicits package for toDF conversion=
>>
>> import sqlContext.implicits._
>>
>>
>>
>> 4)  Reading the Station and Storm Files=
>>
>> val stat = sc.textFile("/user/root/spark_demo/scala/data/Stations.txt")
>>
>> val stor = sc.textFile("/user/root/spark_demo/scala/data/Storms.txt")
>>
>>
>>
>>
>>
>> stat.foreach(println)
>>
>>
>> uihgf   Paris   56   5
>>
>> asfsds   ***   43   

Re: is dataframe thread safe?

2017-02-12 Thread Yan Facai
DataFrame is immutable, so it should be thread safe, right?
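For what it's worth, here is a runnable sketch of the pattern Assaf describes
(output paths and groupings are made up), using plain Scala Futures in the
spark-shell to fire two actions on the same cached DataFrame from separate
threads:

```scala
import org.apache.spark.sql.functions.col
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

val df = spark.range(0, 1000000).toDF("id").cache()
df.count()  // materialize the cache once

// two independent actions on the same cached DataFrame, submitted concurrently
val f1 = Future { df.groupBy(col("id") % 10).count().write.parquet("/tmp/agg_mod10") }
val f2 = Future { df.groupBy(col("id") % 7).count().write.parquet("/tmp/agg_mod7") }

Await.result(Future.sequence(Seq(f1, f2)), Duration.Inf)
df.unpersist()
```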

On Sun, Feb 12, 2017 at 6:45 PM, Sean Owen  wrote:

> No this use case is perfectly sensible. Yes it is thread safe.
>
>
> On Sun, Feb 12, 2017, 10:30 Jörn Franke  wrote:
>
>> I think you should have a look at the spark documentation. It has
>> something called scheduler who does exactly this. In more sophisticated
>> environments yarn or mesos do this for you.
>>
>> Using threads for transformations does not make sense.
>>
>> On 12 Feb 2017, at 09:50, Mendelson, Assaf 
>> wrote:
>>
>> I know spark takes care of executing everything in a distributed manner,
>> however, spark also supports having multiple threads on the same spark
>> session/context and knows (Through fair scheduler) to distribute the tasks
>> from them in a round robin.
>>
>>
>>
>> The question is, can those two actions (with a different set of
>> transformations) be applied to the SAME dataframe.
>>
>>
>>
>> Let’s say I want to do something like:
>>
>>
>>
>>
>>
>>
>>
>> Val df = ???
>>
>> df.cache()
>>
>> df.count()
>>
>>
>>
>> def f1(df: DataFrame): Unit = {
>>
>>   val df1 = df.groupby(something).agg(some aggs)
>>
>>   df1.write.parquet(“some path”)
>>
>> }
>>
>>
>>
>> def f2(df: DataFrame): Unit = {
>>
>>   val df2 = df.groupby(something else).agg(some different aggs)
>>
>>   df2.write.parquet(“some path 2”)
>>
>> }
>>
>>
>>
>> f1(df)
>>
>> f2(df)
>>
>>
>>
>> df.unpersist()
>>
>>
>>
>> if the aggregations do not use the full cluster (e.g. because of data
>> skewness, because there aren’t enough partitions or any other reason) then
>> this would leave the cluster under utilized.
>>
>>
>>
>> However, if I would call f1 and f2 on different threads, then df2 can use
>> free resources f1 has not consumed and the overall utilization would
>> improve.
>>
>>
>>
>> Of course, I can do this only if the operations on the dataframe are
>> thread safe. For example, if I would do a cache in f1 and an unpersist in
>> f2 I would get an inconsistent result. So my question is, what, if any are
>> the legal operations to use on a dataframe so I could do the above.
>>
>>
>>
>> Thanks,
>>
>> Assaf.
>>
>>
>>
>> *From:* Jörn Franke [mailto:jornfra...@gmail.com ]
>> *Sent:* Sunday, February 12, 2017 10:39 AM
>> *To:* Mendelson, Assaf
>> *Cc:* user
>> *Subject:* Re: is dataframe thread safe?
>>
>>
>>
>> I am not sure what you are trying to achieve here. Spark is taking care
>> of executing the transformations in a distributed fashion. This means you
>> must not use threads - it does not make sense. Hence, you do not find
>> documentation about it.
>>
>>
>> On 12 Feb 2017, at 09:06, Mendelson, Assaf 
>> wrote:
>>
>> Hi,
>>
>> I was wondering if dataframe is considered thread safe. I know the spark
>> session and spark context are thread safe (and actually have tools to
>> manage jobs from different threads) but the question is, can I use the same
>> dataframe in both threads.
>>
>> The idea would be to create a dataframe in the main thread and then in
>> two sub threads do different transformations and actions on it.
>>
>> I understand that some things might not be thread safe (e.g. if I
>> unpersist in one thread it would affect the other. Checkpointing would
>> cause similar issues), however, I can’t find any documentation as to what
>> operations (if any) are thread safe.
>>
>>
>>
>> Thanks,
>>
>> Assaf.
>>
>>
>>
>>


Re: Kryo On Spark 1.6.0

2017-01-14 Thread Yan Facai
For Scala, you could fix it by using:
conf.registerKryoClasses(Array(Class.forName("scala.collection.mutable.WrappedArray$ofRef")))


By the way,
if the class is an array of a Java primitive type, say byte[], then use:
Class.forName("[B")

if it is an array of a reference class, say scala.collection.mutable.WrappedArray$ofRef,
then use:
Class.forName("[Lscala.collection.mutable.WrappedArray$ofRef;")
(note the trailing ";" that the JVM binary name of an object array requires)

ref:
https://docs.oracle.com/javase/8/docs/api/java/lang/Class.html#getName--
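Putting it together, a sketch (extend the class list with whatever your job
actually serializes):

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrationRequired", "true")
  .registerKryoClasses(Array(
    Class.forName("scala.collection.mutable.WrappedArray$ofRef"),
    Class.forName("[B"),       // byte[]
    classOf[Array[Double]]     // double[], i.e. Class.forName("[D")
  ))
```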





On Tue, Jan 10, 2017 at 11:11 PM, Yang Cao  wrote:

> If you don’t mind, could please share me with the scala solution? I tried
> to use kryo but seamed not work at all. I hope to get some practical
> example. THX
>
> On 2017年1月10日, at 19:10, Enrico DUrso  wrote:
>
> Hi,
>
> I am trying to use Kryo on Spark 1.6.0.
> I am able to register my own classes and it works, but when I set
> “spark.kryo.registrationRequired “ to true, I get an error about a scala
> class:
> “Class is not registered: scala.collection.mutable.WrappedArray$ofRef”.
>
> Any of you has already solved this issue in Java? I found the code to
> solve it in Scala, but unable to register this class in Java.
>
> Cheers,
>
> enrico
>
> --
>
> CONFIDENTIALITY WARNING.
> This message and the information contained in or attached to it are
> private and confidential and intended exclusively for the addressee. everis
> informs to whom it may receive it in error that it contains privileged
> information and its use, copy, reproduction or distribution is prohibited.
> If you are not an intended recipient of this E-mail, please notify the
> sender, delete it and do not read, act upon, print, disclose, copy, retain
> or redistribute any portion of this E-mail.
>
>
>


Re: Spark 2.0.2, KyroSerializer, double[] is not registered.

2017-01-08 Thread Yan Facai
Hi, Owen,
it is fixed after registering manually:
conf.registerKryoClasses(Array(Class.forName("[D")))


I believe that Kryo (latest version) has supported double[]:
addDefaultSerializer(double[].class, DoubleArraySerializer.class);


Why does it break in Spark?



On Sun, Jan 8, 2017 at 6:03 PM, Sean Owen  wrote:

> Double[] is not of the same class as double[]. Kryo should already know
> how to serialize double[], but I doubt Double[] is registered.
>
> The error does seem to clearly indicate double[] though. That surprises
> me.  Can you try manually registering it to see if that fixes it?
> But then I'm not sure why tests wouldn't catch this.
>
> On Sun, Jan 8, 2017 at 7:30 AM smartzjp  wrote:
>
>> You can have a try the following code.
>>
>> ObjectArraySerializer serializer = new ObjectArraySerializer(kryo, Double
>> [].class);
>> kryo.register(Double[].class, serializer);
>>
>>
>> ---
>>
>> Hi, all.
>> I enable kyro in spark with spark-defaults.conf:
>>  spark.serializer org.apache.spark.serializer.
>> KryoSerializer
>>  spark.kryo.registrationRequired  true
>>
>> A KryoException is raised when a logistic regression algorithm is running:
>>  Note: To register this class use: kryo.register(double[].class);
>>  Serialization trace:
>>  currL1 (org.apache.spark.mllib.stat.MultivariateOnlineSummarizer)
>> at com.esotericsoftware.kryo.serializers.FieldSerializer$
>> ObjectField.write(FieldSerializer.java:585)
>> at com.esotericsoftware.kryo.serializers.FieldSerializer.
>> write(FieldSerializer.java:213)
>> at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.
>> java:568)
>> at com.twitter.chill.Tuple2Serializer.write(
>> TupleSerializers.scala:36)
>> at com.twitter.chill.Tuple2Serializer.write(
>> TupleSerializers.scala:33)
>>at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.
>> java:568)
>>
>> My question is:
>> Doesn't double[].class be supported by default?
>>
>> Thanks.
>>
>>


Spark 2.0.2, KyroSerializer, double[] is not registered.

2017-01-07 Thread Yan Facai
Hi, all.
I enable Kryo in Spark with spark-defaults.conf:
 spark.serializer
org.apache.spark.serializer.KryoSerializer
 spark.kryo.registrationRequired  true

A KryoException is raised when a logistic regression algorithm is running:
 Note: To register this class use: kryo.register(double[].class);
 Serialization trace:
 currL1 (org.apache.spark.mllib.stat.MultivariateOnlineSummarizer)
at
com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:585)
at
com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
at
com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
at
com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:36)
at
com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:33)
   at
com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)

My question is:
Isn't double[].class supported by default?

Thanks.


Re: Best practice for preprocessing feature with DataFrame

2016-11-22 Thread Yan Facai
Thanks, White.

On Thu, Nov 17, 2016 at 11:15 PM, Stuart White <stuart.whi...@gmail.com>
wrote:

> Sorry.  Small typo.  That last part should be:
>
> val modifiedRows = rows
>   .select(
> substring('age, 0, 2) as "age",
> when('gender === 1, "male").otherwise(when('gender === 2,
> "female").otherwise("unknown")) as "gender"
>   )
> modifiedRows.show
>
> +---+---+
> |age| gender|
> +---+---+
> | 90|   male|
> | 80| female|
> | 80|unknown|
> +---+---+
>
> On Thu, Nov 17, 2016 at 8:57 AM, Stuart White <stuart.whi...@gmail.com>
> wrote:
> > import org.apache.spark.sql.functions._
> >
> > val rows = Seq(("90s", 1), ("80s", 2), ("80s", 3)).toDF("age", "gender")
> > rows.show
> >
> > +---+--+
> > |age|gender|
> > +---+--+
> > |90s| 1|
> > |80s| 2|
> > |80s| 3|
> > +---+--+
> >
> > val modifiedRows
> >   .select(
> > substring('age, 0, 2) as "age",
> > when('gender === 1, "male").otherwise(when('gender === 2,
> > "female").otherwise("unknown")) as "gender"
> >   )
> > modifiedRows.show
> >
> > +---+---+
> > |age| gender|
> > +---+---+
> > | 90|   male|
> > | 80| female|
> > | 80|unknown|
> > +---+---+
> >
> > On Thu, Nov 17, 2016 at 3:37 AM, 颜发才(Yan Facai) <yaf...@gmail.com>
> wrote:
> >> Could you give me an example, how to use Column function?
> >> Thanks very much.
> >>
> >> On Thu, Nov 17, 2016 at 12:23 PM, Divya Gehlot <divya.htco...@gmail.com
> >
> >> wrote:
> >>>
> >>> Hi,
> >>>
> >>> You can use the Column functions provided by Spark API
> >>>
> >>>
> >>> https://spark.apache.org/docs/1.6.2/api/java/org/apache/
> spark/sql/functions.html
> >>>
> >>> Hope this helps .
> >>>
> >>> Thanks,
> >>> Divya
> >>>
> >>>
> >>> On 17 November 2016 at 12:08, 颜发才(Yan Facai) <yaf...@gmail.com> wrote:
> >>>>
> >>>> Hi,
> >>>> I have a sample, like:
> >>>> +---+--++
> >>>> |age|gender| city_id|
> >>>> +---+--++
> >>>> |   | 1|1042015:city_2044...|
> >>>> |90s| 2|1042015:city_2035...|
> >>>> |80s| 2|1042015:city_2061...|
> >>>> +---+--++
> >>>>
> >>>> and expectation is:
> >>>> "age":  90s -> 90, 80s -> 80
> >>>> "gender": 1 -> "male", 2 -> "female"
> >>>>
> >>>> I have two solutions:
> >>>> 1. Handle each column separately,  and then join all by index.
> >>>> val age = input.select("age").map(...)
> >>>> val gender = input.select("gender").map(...)
> >>>> val result = ...
> >>>>
> >>>> 2. Write utf function for each column, and then use in together:
> >>>>  val result = input.select(ageUDF($"age"), genderUDF($"gender"))
> >>>>
> >>>> However, both are awkward,
> >>>>
> >>>> Does anyone have a better work flow?
> >>>> Write some custom Transforms and use pipeline?
> >>>>
> >>>> Thanks.
> >>>>
> >>>>
> >>>>
> >>>
> >>
>


Re: Best practice for preprocessing feature with DataFrame

2016-11-17 Thread Yan Facai
Could you give me an example of how to use the Column functions?
Thanks very much.

On Thu, Nov 17, 2016 at 12:23 PM, Divya Gehlot <divya.htco...@gmail.com>
wrote:

> Hi,
>
> You can use the Column functions provided by Spark API
>
> https://spark.apache.org/docs/1.6.2/api/java/org/apache/
> spark/sql/functions.html
>
> Hope this helps .
>
> Thanks,
> Divya
>
>
> On 17 November 2016 at 12:08, 颜发才(Yan Facai) <yaf...@gmail.com> wrote:
>
>> Hi,
>> I have a sample, like:
>> +---+--++
>> |age|gender| city_id|
>> +---+--++
>> |   | 1|1042015:city_2044...|
>> |90s| 2|1042015:city_2035...|
>> |80s| 2|1042015:city_2061...|
>> +---+--++
>>
>> and expectation is:
>> "age":  90s -> 90, 80s -> 80
>> "gender": 1 -> "male", 2 -> "female"
>>
>> I have two solutions:
>> 1. Handle each column separately,  and then join all by index.
>> val age = input.select("age").map(...)
>> val gender = input.select("gender").map(...)
>> val result = ...
>>
>> 2. Write utf function for each column, and then use in together:
>>  val result = input.select(ageUDF($"age"), genderUDF($"gender"))
>>
>> However, both are awkward,
>>
>> Does anyone have a better work flow?
>> Write some custom Transforms and use pipeline?
>>
>> Thanks.
>>
>>
>>
>>
>


Best practice for preprocessing feature with DataFrame

2016-11-16 Thread Yan Facai
Hi,
I have a sample, like:
+---+--++
|age|gender| city_id|
+---+--++
|   | 1|1042015:city_2044...|
|90s| 2|1042015:city_2035...|
|80s| 2|1042015:city_2061...|
+---+--++

and expectation is:
"age":  90s -> 90, 80s -> 80
"gender": 1 -> "male", 2 -> "female"

I have two solutions:
1. Handle each column separately,  and then join all by index.
val age = input.select("age").map(...)
val gender = input.select("gender").map(...)
val result = ...

2. Write a udf for each column, and then use them together:
 val result = input.select(ageUDF($"age"), genderUDF($"gender"))
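For concreteness, a sketch of option 2 (assuming both columns are strings; the
parsing rules are made up to match the sample above):

```scala
import org.apache.spark.sql.functions.{col, udf}

val ageUDF = udf { s: String =>
  val t = if (s == null) "" else s.trim
  if (t.isEmpty) None else Some(t.take(2).toInt)   // "90s" -> 90, "80s" -> 80
}
val genderUDF = udf { g: String =>
  g match {
    case "1" => "male"
    case "2" => "female"
    case _   => "unknown"
  }
}

val result = input.select(ageUDF(col("age")).as("age"),
                          genderUDF(col("gender")).as("gender"))
```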

However, both are awkward.

Does anyone have a better workflow?
Should I write some custom Transformers and use a Pipeline?

Thanks.


Re: Spark joins using row id

2016-11-13 Thread Yan Facai
A pair RDD can use its (hash) partitioning information to optimize joins, while
I am not sure whether a Dataset can.

On Sat, Nov 12, 2016 at 7:11 PM, Rohit Verma 
wrote:

> For datasets structured as
>
> ds1
> rowN col1
> 1   A
> 2   B
> 3   C
> 4   C
> …
>
> and
>
> ds2
> rowN col2
> 1   X
> 2   Y
> 3   Z
> …
>
> I want to do a left join
>
> Dataset joined = ds1.join(ds2,”rowN”,”left outer”);
>
> I somewhere read in SO or this mailing list that if spark is aware of
> datasets being sorted it will use some optimizations for joins.
> Is it possible to make this join more efficient/faster.
>
> Rohit


Vector is not found in case class after import

2016-11-04 Thread Yan Facai
 Hi,
My spark-shell version is 2.0.1.

I import Vector and hope to use it in a case class, but spark-shell
throws an error: not found.

scala> import org.apache.spark.ml.linalg.{Vector => OldVector}
import org.apache.spark.ml.linalg.{Vector=>OldVector}

scala> case class Movie(vec: OldVector)
:11: error: not found: type OldVector
   case class Movie(vec: OldVector)
 ^

Is it a bug?


Re: How to return a case class in map function?

2016-11-03 Thread Yan Facai
2.0.1 has fixed the bug.
Thanks very much.

On Thu, Nov 3, 2016 at 6:22 PM, 颜发才(Yan Facai) <yaf...@gmail.com> wrote:

> Thanks, Armbrust.
> I'm using 2.0.0.
> Does 2.0.1 stable version fix it?
>
> On Thu, Nov 3, 2016 at 2:01 AM, Michael Armbrust <mich...@databricks.com>
> wrote:
>
>> Thats a bug.  Which version of Spark are you running?  Have you tried
>> 2.0.2?
>>
>> On Wed, Nov 2, 2016 at 12:01 AM, 颜发才(Yan Facai) <yaf...@gmail.com> wrote:
>>
>>> Hi, all.
>>> When I use a case class as return value in map function, spark always
>>> raise a ClassCastException.
>>>
>>> I write an demo, like:
>>>
>>> scala> case class Record(key: Int, value: String)
>>>
>>> scala> case class ID(key: Int)
>>>
>>> scala> val df = Seq(Record(1, "a"), Record(2, "b")).toDF
>>>
>>> scala> df.map{x => ID(x.getInt(0))}.show
>>>
>>> 16/11/02 14:52:34 ERROR Executor: Exception in task 0.0 in stage 166.0
>>> (TID 175)
>>> java.lang.ClassCastException: $line1401.$read$$iw$$iw$ID cannot be cast
>>> to $line1401.$read$$iw$$iw$ID
>>> at org.apache.spark.sql.catalyst.expressions.GeneratedClass$Gen
>>> eratedIterator.processNext(Unknown Source)
>>>
>>>
>>> Please tell me if I'm wrong.
>>> Thanks.
>>>
>>>
>>
>>
>


Re: How to return a case class in map function?

2016-11-03 Thread Yan Facai
Thanks, Armbrust.
I'm using 2.0.0.
Does 2.0.1 stable version fix it?

On Thu, Nov 3, 2016 at 2:01 AM, Michael Armbrust <mich...@databricks.com>
wrote:

> Thats a bug.  Which version of Spark are you running?  Have you tried
> 2.0.2?
>
> On Wed, Nov 2, 2016 at 12:01 AM, 颜发才(Yan Facai) <yaf...@gmail.com> wrote:
>
>> Hi, all.
>> When I use a case class as return value in map function, spark always
>> raise a ClassCastException.
>>
>> I write an demo, like:
>>
>> scala> case class Record(key: Int, value: String)
>>
>> scala> case class ID(key: Int)
>>
>> scala> val df = Seq(Record(1, "a"), Record(2, "b")).toDF
>>
>> scala> df.map{x => ID(x.getInt(0))}.show
>>
>> 16/11/02 14:52:34 ERROR Executor: Exception in task 0.0 in stage 166.0
>> (TID 175)
>> java.lang.ClassCastException: $line1401.$read$$iw$$iw$ID cannot be cast
>> to $line1401.$read$$iw$$iw$ID
>> at org.apache.spark.sql.catalyst.expressions.GeneratedClass$Gen
>> eratedIterator.processNext(Unknown Source)
>>
>>
>> Please tell me if I'm wrong.
>> Thanks.
>>
>>
>
>


How to return a case class in map function?

2016-11-02 Thread Yan Facai
Hi, all.
When I use a case class as the return value in a map function, Spark always
raises a ClassCastException.

I wrote a demo, like:

scala> case class Record(key: Int, value: String)

scala> case class ID(key: Int)

scala> val df = Seq(Record(1, "a"), Record(2, "b")).toDF

scala> df.map{x => ID(x.getInt(0))}.show

16/11/02 14:52:34 ERROR Executor: Exception in task 0.0 in stage 166.0 (TID
175)
java.lang.ClassCastException: $line1401.$read$$iw$$iw$ID cannot be cast to
$line1401.$read$$iw$$iw$ID
at
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
Source)


Please tell me if I'm wrong.
Thanks.


Addition of two SparseVector

2016-11-01 Thread Yan Facai
Hi, all.
How can I add a Vector to another one?


scala> val a = Vectors.sparse(20, Seq((1,1.0), (2,2.0)))
a: org.apache.spark.ml.linalg.Vector = (20,[1,2],[1.0,2.0])

scala> val b = Vectors.sparse(20, Seq((2,2.0), (3,3.0)))
b: org.apache.spark.ml.linalg.Vector = (20,[2,3],[2.0,3.0])

scala> a + b
:38: error: type mismatch;
 found   : org.apache.spark.ml.linalg.Vector
 required: String
   a + b
   ^
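A minimal workaround sketch, since element-wise addition is not part of the
ml.linalg API (densify, add, and re-sparsify; fine for small vectors, wasteful
for very long ones):

```scala
import org.apache.spark.ml.linalg.{Vector, Vectors}

def add(a: Vector, b: Vector): Vector = {
  require(a.size == b.size, "vectors must have the same length")
  val sum = a.toArray.zip(b.toArray).map { case (x, y) => x + y }
  val nonZero = sum.zipWithIndex.collect { case (v, i) if v != 0.0 => (i, v) }
  Vectors.sparse(a.size, nonZero.toSeq)
}

add(a, b)  // (20,[1,2,3],[1.0,4.0,3.0])
```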


Re: How to iterate the element of an array in DataFrame?

2016-10-24 Thread Yan Facai
scala> mblog_tags.dtypes
res13: Array[(String, String)] =
Array((tags,ArrayType(StructType(StructField(category,StringType,true),
StructField(weight,StringType,true)),true)))

scala> val testUDF = udf{ s: Seq[Tags] => s(0).weight }
testUDF: org.apache.spark.sql.expressions.UserDefinedFunction =
UserDefinedFunction(,StringType,Some(List(ArrayType(StructType(StructField(category,StringType,true),
StructField(weight,StringType,true)),true

What is wrong with the udf function `testUDF`?
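If it helps, a sketch of the usual workaround: inside a udf, an
array<struct<...>> column arrives as Seq[Row] (GenericRowWithSchema), so read
the struct fields from the Row instead of casting to a case class:

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.{col, udf}

val testUDF = udf { s: Seq[Row] => s(0).getAs[String]("weight") }
mblog_tags.withColumn("test", testUDF(col("tags"))).show(false)
```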





On Tue, Oct 25, 2016 at 10:41 AM, 颜发才(Yan Facai) <yaf...@gmail.com> wrote:

> Thanks, Cheng Lian.
>
> I try to use case class:
>
> scala> case class Tags (category: String, weight: String)
>
> scala> val testUDF = udf{ s: Seq[Tags] => s(0).weight }
>
> testUDF: org.apache.spark.sql.expressions.UserDefinedFunction =
> UserDefinedFunction(,StringType,Some(List(ArrayType(StructType(
> StructField(category,StringType,true), StructField(weight,StringType,
> true)),true
>
>
> but it raises an ClassCastException when run:
>
> scala> mblog_tags.withColumn("test", testUDF(col("tags"))).show(false)
>
> 16/10/25 10:39:54 ERROR Executor: Exception in task 0.0 in stage 4.0 (TID
> 4)
> java.lang.ClassCastException: 
> org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
> cannot be cast to $line58.$read$$iw$$iw$Tags
> at $line59.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.
> apply(:27)
> at $line59.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.
> apply(:27)
> ...
>
>
> Where did I do wrong?
>
>
>
>
> On Sat, Oct 22, 2016 at 6:37 AM, Cheng Lian <l...@databricks.com> wrote:
>
>> You may either use SQL function "array" and "named_struct" or define a
>> case class with expected field names.
>>
>> Cheng
>>
>> On 10/21/16 2:45 AM, 颜发才(Yan Facai) wrote:
>>
>> My expectation is:
>> root
>> |-- tag: vector
>>
>> namely, I want to extract from:
>> [[tagCategory_060, 0.8], [tagCategory_029, 0.7]]|
>> to:
>> Vectors.sparse(100, Array(60, 29),  Array(0.8, 0.7))
>>
>> I believe it needs two step:
>> 1. val tag2vec = {tag: Array[Structure] => Vector}
>> 2. mblog_tags.withColumn("vec", tag2vec(col("tag"))
>>
>> But, I have no idea of how to describe the Array[Structure] in the
>> DataFrame.
>>
>>
>>
>>
>>
>> On Fri, Oct 21, 2016 at 4:51 PM, lk_spark <lk_sp...@163.com> wrote:
>>
>>> how about change Schema from
>>> root
>>>  |-- category.firstCategory: array (nullable = true)
>>>  ||-- element: struct (containsNull = true)
>>>  |||-- category: string (nullable = true)
>>>  |||-- weight: string (nullable = true)
>>> to:
>>>
>>> root
>>>  |-- category: string (nullable = true)
>>>  |-- weight: string (nullable = true)
>>>
>>> 2016-10-21
>>> --
>>> lk_spark
>>> --
>>>
>>> *发件人:*颜发才(Yan Facai) <yaf...@gmail.com>
>>> *发送时间:*2016-10-21 15:35
>>> *主题:*Re: How to iterate the element of an array in DataFrame?
>>> *收件人:*"user.spark"<user@spark.apache.org>
>>> *抄送:*
>>>
>>> I don't know how to construct `array<struct<category:string,
>>> weight:string>>`.
>>> Could anyone help me?
>>>
>>> I try to get the array by :
>>> scala> mblog_tags.map(_.getSeq[(String, String)](0))
>>>
>>> while the result is:
>>> res40: org.apache.spark.sql.Dataset[Seq[(String, String)]] = [value:
>>> array<struct<_1:string,_2:string>>]
>>>
>>>
>>> How to express `struct<string, string>` ?
>>>
>>>
>>>
>>> On Thu, Oct 20, 2016 at 4:34 PM, 颜发才(Yan Facai) <yaf...@gmail.com>
>>> wrote:
>>>
>>>> Hi, I want to extract the attribute `weight` of an array, and combine
>>>> them to construct a sparse vector.
>>>>
>>>> ### My data is like this:
>>>>
>>>> scala> mblog_tags.printSchema
>>>> root
>>>>  |-- category.firstCategory: array (nullable = true)
>>>>  ||-- element: struct (containsNull = true)
>>>>  |||-- category: string (nullable = true)
>>>>  |||-- weight: string (nullable = true)
>>>>
>>>>
>>>> scala> mblog_tags.show(false)
>>>> +--+
>>>> |category.firstCategory|
>>>> +--+
>>>> |[[tagCategory_060, 0.8], [tagCategory_029, 0.7]]|
>>>> |[[tagCategory_029, 0.9]]  |
>>>> |[[tagCategory_029, 0.8]]  |
>>>> +--+
>>>>
>>>>
>>>> ### And expected:
>>>> Vectors.sparse(100, Array(60, 29),  Array(0.8, 0.7))
>>>> Vectors.sparse(100, Array(29),  Array(0.9))
>>>> Vectors.sparse(100, Array(29),  Array(0.8))
>>>>
>>>> How to iterate an array in DataFrame?
>>>> Thanks.
>>>>
>>>>
>>>>
>>>>
>>>
>>
>>
>


Re: How to iterate the element of an array in DataFrame?

2016-10-24 Thread Yan Facai
Thanks, Cheng Lian.

I tried to use a case class:

scala> case class Tags (category: String, weight: String)

scala> val testUDF = udf{ s: Seq[Tags] => s(0).weight }

testUDF: org.apache.spark.sql.expressions.UserDefinedFunction =
UserDefinedFunction(,StringType,Some(List(ArrayType(StructType(StructField(category,StringType,true),
StructField(weight,StringType,true)),true


but it raises a ClassCastException when run:

scala> mblog_tags.withColumn("test", testUDF(col("tags"))).show(false)

16/10/25 10:39:54 ERROR Executor: Exception in task 0.0 in stage 4.0 (TID 4)
java.lang.ClassCastException:
org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be
cast to $line58.$read$$iw$$iw$Tags
at
$line59.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(:27)
at
$line59.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(:27)
...


Where did I go wrong?




On Sat, Oct 22, 2016 at 6:37 AM, Cheng Lian <l...@databricks.com> wrote:

> You may either use SQL function "array" and "named_struct" or define a
> case class with expected field names.
>
> Cheng
>
> On 10/21/16 2:45 AM, 颜发才(Yan Facai) wrote:
>
> My expectation is:
> root
> |-- tag: vector
>
> namely, I want to extract from:
> [[tagCategory_060, 0.8], [tagCategory_029, 0.7]]|
> to:
> Vectors.sparse(100, Array(60, 29),  Array(0.8, 0.7))
>
> I believe it needs two step:
> 1. val tag2vec = {tag: Array[Structure] => Vector}
> 2. mblog_tags.withColumn("vec", tag2vec(col("tag"))
>
> But, I have no idea of how to describe the Array[Structure] in the
> DataFrame.
>
>
>
>
>
> On Fri, Oct 21, 2016 at 4:51 PM, lk_spark <lk_sp...@163.com> wrote:
>
>> how about change Schema from
>> root
>>  |-- category.firstCategory: array (nullable = true)
>>  ||-- element: struct (containsNull = true)
>>  |||-- category: string (nullable = true)
>>  |||-- weight: string (nullable = true)
>> to:
>>
>> root
>>  |-- category: string (nullable = true)
>>  |-- weight: string (nullable = true)
>>
>> 2016-10-21
>> --
>> lk_spark
>> --
>>
>> *发件人:*颜发才(Yan Facai) <yaf...@gmail.com>
>> *发送时间:*2016-10-21 15:35
>> *主题:*Re: How to iterate the element of an array in DataFrame?
>> *收件人:*"user.spark"<user@spark.apache.org>
>> *抄送:*
>>
>> I don't know how to construct `array<struct<category:string,
>> weight:string>>`.
>> Could anyone help me?
>>
>> I try to get the array by :
>> scala> mblog_tags.map(_.getSeq[(String, String)](0))
>>
>> while the result is:
>> res40: org.apache.spark.sql.Dataset[Seq[(String, String)]] = [value:
>> array<struct<_1:string,_2:string>>]
>>
>>
>> How to express `struct<string, string>` ?
>>
>>
>>
>> On Thu, Oct 20, 2016 at 4:34 PM, 颜发才(Yan Facai) <yaf...@gmail.com> wrote:
>>
>>> Hi, I want to extract the attribute `weight` of an array, and combine
>>> them to construct a sparse vector.
>>>
>>> ### My data is like this:
>>>
>>> scala> mblog_tags.printSchema
>>> root
>>>  |-- category.firstCategory: array (nullable = true)
>>>  ||-- element: struct (containsNull = true)
>>>  |||-- category: string (nullable = true)
>>>  |||-- weight: string (nullable = true)
>>>
>>>
>>> scala> mblog_tags.show(false)
>>> +--+
>>> |category.firstCategory|
>>> +--+
>>> |[[tagCategory_060, 0.8], [tagCategory_029, 0.7]]|
>>> |[[tagCategory_029, 0.9]]  |
>>> |[[tagCategory_029, 0.8]]  |
>>> +--+
>>>
>>>
>>> ### And expected:
>>> Vectors.sparse(100, Array(60, 29),  Array(0.8, 0.7))
>>> Vectors.sparse(100, Array(29),  Array(0.9))
>>> Vectors.sparse(100, Array(29),  Array(0.8))
>>>
>>> How to iterate an array in DataFrame?
>>> Thanks.
>>>
>>>
>>>
>>>
>>
>
>


Re: Re: How to iterate the element of an array in DataFrame?

2016-10-21 Thread Yan Facai
My expectation is:
root
|-- tag: vector

namely, I want to extract from:
[[tagCategory_060, 0.8], [tagCategory_029, 0.7]]|
to:
Vectors.sparse(100, Array(60, 29),  Array(0.8, 0.7))

I believe it needs two steps:
1. val tag2vec = {tag: Array[Structure] => Vector}
2. mblog_tags.withColumn("vec", tag2vec(col("tag")))

But I have no idea how to describe the Array[Structure] in the
DataFrame.





On Fri, Oct 21, 2016 at 4:51 PM, lk_spark <lk_sp...@163.com> wrote:

> how about change Schema from
> root
>  |-- category.firstCategory: array (nullable = true)
>  ||-- element: struct (containsNull = true)
>  |||-- category: string (nullable = true)
>  |||-- weight: string (nullable = true)
> to:
>
> root
>  |-- category: string (nullable = true)
>  |-- weight: string (nullable = true)
>
> 2016-10-21
> ----------
> lk_spark
> --
>
> *发件人:*颜发才(Yan Facai) <yaf...@gmail.com>
> *发送时间:*2016-10-21 15:35
> *主题:*Re: How to iterate the element of an array in DataFrame?
> *收件人:*"user.spark"<user@spark.apache.org>
> *抄送:*
>
> I don't know how to construct `array<struct<category:string,
> weight:string>>`.
> Could anyone help me?
>
> I try to get the array by :
> scala> mblog_tags.map(_.getSeq[(String, String)](0))
>
> while the result is:
> res40: org.apache.spark.sql.Dataset[Seq[(String, String)]] = [value:
> array<struct<_1:string,_2:string>>]
>
>
> How to express `struct<string, string>` ?
>
>
>
> On Thu, Oct 20, 2016 at 4:34 PM, 颜发才(Yan Facai) <yaf...@gmail.com> wrote:
>
>> Hi, I want to extract the attribute `weight` of an array, and combine
>> them to construct a sparse vector.
>>
>> ### My data is like this:
>>
>> scala> mblog_tags.printSchema
>> root
>>  |-- category.firstCategory: array (nullable = true)
>>  ||-- element: struct (containsNull = true)
>>  |||-- category: string (nullable = true)
>>  |||-- weight: string (nullable = true)
>>
>>
>> scala> mblog_tags.show(false)
>> +--+
>> |category.firstCategory|
>> +--+
>> |[[tagCategory_060, 0.8], [tagCategory_029, 0.7]]|
>> |[[tagCategory_029, 0.9]]  |
>> |[[tagCategory_029, 0.8]]  |
>> +--+
>>
>>
>> ### And expected:
>> Vectors.sparse(100, Array(60, 29),  Array(0.8, 0.7))
>> Vectors.sparse(100, Array(29),  Array(0.9))
>> Vectors.sparse(100, Array(29),  Array(0.8))
>>
>> How to iterate an array in DataFrame?
>> Thanks.
>>
>>
>>
>>
>


Re: How to iterate the element of an array in DataFrame?

2016-10-21 Thread Yan Facai
I don't know how to construct
`array<struct<category:string,weight:string>>`.
Could anyone help me?

I try to get the array by :
scala> mblog_tags.map(_.getSeq[(String, String)](0))

while the result is:
res40: org.apache.spark.sql.Dataset[Seq[(String, String)]] = [value:
array<struct<_1:string,_2:string>>]


How to express `struct<string, string>` ?



On Thu, Oct 20, 2016 at 4:34 PM, 颜发才(Yan Facai) <yaf...@gmail.com> wrote:

> Hi, I want to extract the attribute `weight` of an array, and combine them
> to construct a sparse vector.
>
> ### My data is like this:
>
> scala> mblog_tags.printSchema
> root
>  |-- category.firstCategory: array (nullable = true)
>  ||-- element: struct (containsNull = true)
>  |||-- category: string (nullable = true)
>  |||-- weight: string (nullable = true)
>
>
> scala> mblog_tags.show(false)
> +--+
> |category.firstCategory|
> +--+
> |[[tagCategory_060, 0.8], [tagCategory_029, 0.7]]|
> |[[tagCategory_029, 0.9]]  |
> |[[tagCategory_029, 0.8]]  |
> +--+
>
>
> ### And expected:
> Vectors.sparse(100, Array(60, 29),  Array(0.8, 0.7))
> Vectors.sparse(100, Array(29),  Array(0.9))
> Vectors.sparse(100, Array(29),  Array(0.8))
>
> How to iterate an array in DataFrame?
> Thanks.
>
>
>
>


How to iterate the element of an array in DataFrame?

2016-10-20 Thread Yan Facai
Hi, I want to extract the attribute `weight` of an array, and combine them
to construct a sparse vector.

### My data is like this:

scala> mblog_tags.printSchema
root
 |-- category.firstCategory: array (nullable = true)
 ||-- element: struct (containsNull = true)
 |||-- category: string (nullable = true)
 |||-- weight: string (nullable = true)


scala> mblog_tags.show(false)
+--+
|category.firstCategory|
+--+
|[[tagCategory_060, 0.8], [tagCategory_029, 0.7]]|
|[[tagCategory_029, 0.9]]  |
|[[tagCategory_029, 0.8]]  |
+--+


### And expected:
Vectors.sparse(100, Array(60, 29),  Array(0.8, 0.7))
Vectors.sparse(100, Array(29),  Array(0.9))
Vectors.sparse(100, Array(29),  Array(0.8))

How to iterate an array in DataFrame?
Thanks.


Re: Dataframe, Java: How to convert String to Vector ?

2016-10-02 Thread Yan Facai
Hi, Peter.

It's interesting that `DecisionTreeRegressor.transformImpl` also uses a udf to
transform the DataFrame, instead of using map:

https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/regression/DecisionTreeRegressor.scala#L175


On Wed, Sep 21, 2016 at 10:22 PM, Peter Figliozzi <pete.figlio...@gmail.com>
wrote:

> I'm sure there's another way to do it; I hope someone can show us.  I
> couldn't figure out how to use `map` either.
>
> On Wed, Sep 21, 2016 at 3:32 AM, 颜发才(Yan Facai) <yaf...@gmail.com> wrote:
>
>> Thanks, Peter.
>> It works!
>>
>> Why udf is needed?
>>
>>
>>
>>
>> On Wed, Sep 21, 2016 at 12:00 AM, Peter Figliozzi <
>> pete.figlio...@gmail.com> wrote:
>>
>>> Hi Yan, I agree, it IS really confusing.  Here is the technique for
>>> transforming a column.  It is very general because you can make "myConvert"
>>> do whatever you want.
>>>
>>> import org.apache.spark.mllib.linalg.Vectors
>>> val df = Seq((0, "[1,3,5]"), (1, "[2,4,6]")).toDF
>>>
>>> df.show()
>>> // The columns were named "_1" and "_2"
>>> // Very confusing, because it looks like a Scala wildcard when we refer
>>> to it in code
>>>
>>> val myConvert = (x: String) => { Vectors.parse(x) }
>>> val myConvertUDF = udf(myConvert)
>>>
>>> val newDf = df.withColumn("parsed", myConvertUDF(col("_2")))
>>>
>>> newDf.show()
>>>
>>> On Mon, Sep 19, 2016 at 3:29 AM, 颜发才(Yan Facai) <yaf...@gmail.com>
>>> wrote:
>>>
>>>> Hi, all.
>>>> I find that it's really confuse.
>>>>
>>>> I can use Vectors.parse to create a DataFrame contains Vector type.
>>>>
>>>> scala> val dataVec = Seq((0, Vectors.parse("[1,3,5]")), (1,
>>>> Vectors.parse("[2,4,6]"))).toDF
>>>> dataVec: org.apache.spark.sql.DataFrame = [_1: int, _2: vector]
>>>>
>>>>
>>>> But using map to convert String to Vector throws an error:
>>>>
>>>> scala> val dataStr = Seq((0, "[1,3,5]"), (1, "[2,4,6]")).toDF
>>>> dataStr: org.apache.spark.sql.DataFrame = [_1: int, _2: string]
>>>>
>>>> scala> dataStr.map(row => Vectors.parse(row.getString(1)))
>>>> :30: error: Unable to find encoder for type stored in a
>>>> Dataset.  Primitive types (Int, String, etc) and Product types (case
>>>> classes) are supported by importing spark.implicits._  Support for
>>>> serializing other types will be added in future releases.
>>>>   dataStr.map(row => Vectors.parse(row.getString(1)))
>>>>
>>>>
>>>> Dose anyone can help me,
>>>> thanks very much!
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> On Tue, Sep 6, 2016 at 9:58 PM, Peter Figliozzi <
>>>> pete.figlio...@gmail.com> wrote:
>>>>
>>>>> Hi Yan, I think you'll have to map the features column to a new
>>>>> numerical features column.
>>>>>
>>>>> Here's one way to do the individual transform:
>>>>>
>>>>> scala> val x = "[1, 2, 3, 4, 5]"
>>>>> x: String = [1, 2, 3, 4, 5]
>>>>>
>>>>> scala> val y:Array[Int] = x slice(1, x.length - 1) replace(",", "")
>>>>> split(" ") map(_.toInt)
>>>>> y: Array[Int] = Array(1, 2, 3, 4, 5)
>>>>>
>>>>> If you don't know about the Scala command line, just type "scala" in a
>>>>> terminal window.  It's a good place to try things out.
>>>>>
>>>>> You can make a function out of this transformation and apply it to
>>>>> your features column to make a new column.  Then add this with
>>>>> Dataset.withColumn.
>>>>>
>>>>> See here
>>>>> <http://stackoverflow.com/questions/35227568/applying-function-to-spark-dataframe-column>
>>>>> on how to apply a function to a Column to make a new column.
>>>>>
>>>>> On Tue, Sep 6, 2016 at 1:56 AM, 颜发才(Yan Facai) <yaf...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi,
>>>>>> I have a csv file like:
>>>>>> uid  mid  features   label
>>>>>> 1235231[0, 1, 3, ...]True
>>>>>>
>>>>>> Both  "features" and "label" columns are used for GBTClassifier.
>>>>>>
>>>>>> However, when I read the file:
>>>>>> Dataset samples = sparkSession.read().csv(file);
>>>>>> The type of samples.select("features") is String.
>>>>>>
>>>>>> My question is:
>>>>>> How to map samples.select("features") to Vector or any appropriate
>>>>>> type,
>>>>>> so I can use it to train like:
>>>>>> GBTClassifier gbdt = new GBTClassifier()
>>>>>> .setLabelCol("label")
>>>>>> .setFeaturesCol("features")
>>>>>> .setMaxIter(2)
>>>>>> .setMaxDepth(7);
>>>>>>
>>>>>> Thanks.
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>


Re: Spark ML Decision Trees Algorithm

2016-10-02 Thread Yan Facai
Perhaps the best way is to read the code.
The decision tree is implemented as a single-tree random forest, whose entry
point is the `run` method:
https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/tree/impl/RandomForest.scala#L88

I'm not familiar with the classic decision tree algorithms, such as
ID3 and CART. However, I believe that sklearn's implementation of decision trees
is quite similar to Spark's, and some differences are listed below:
1. Continuous features.
sklearn uses all candidate values to find the best split, while Spark groups
the candidate values into a fixed number of bins.

2. Tree building.
sklearn provides two methods, depth-first and best-first, while Spark
has only one: depth-first.

3. Split number.
sklearn creates one split per iteration, while Spark can evaluate splits for
many nodes in parallel.

If I'm wrong, please let me know.
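As a small illustration of point 1, that binning is what maxBins controls on the
ML estimators (a sketch; the column names are placeholders):

```scala
import org.apache.spark.ml.classification.DecisionTreeClassifier

val dt = new DecisionTreeClassifier()
  .setLabelCol("label")
  .setFeaturesCol("features")
  .setMaxBins(64)   // at most 64 candidate thresholds per continuous feature
  .setMaxDepth(5)
```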



On Sat, Oct 1, 2016 at 10:34 AM, janardhan shetty 
wrote:

> It would be good to know which paper has inspired to implement the version
> which we use in spark  2.0 decision trees ?
>
> On Fri, Sep 30, 2016 at 4:44 PM, Peter Figliozzi  > wrote:
>
>> It's a good question.  People have been publishing papers on decision
>> trees and various methods of constructing and pruning them for over 30
>> years.  I think it's rather a question for a historian at this point.
>>
>> On Fri, Sep 30, 2016 at 5:08 PM, janardhan shetty > > wrote:
>>
>>> Read this explanation but wondering if this algorithm has the base from
>>> a research paper for detail understanding.
>>>
>>> On Fri, Sep 30, 2016 at 1:36 PM, Kevin Mellott <
>>> kevin.r.mell...@gmail.com> wrote:
>>>
 The documentation details the algorithm being used at
 http://spark.apache.org/docs/latest/mllib-decision-tree.html

 Thanks,
 Kevin

 On Fri, Sep 30, 2016 at 1:14 AM, janardhan shetty <
 janardhan...@gmail.com> wrote:

> Hi,
>
> Any help here is appreciated ..
>
> On Wed, Sep 28, 2016 at 11:34 AM, janardhan shetty <
> janardhan...@gmail.com> wrote:
>
>> Is there a reference to the research paper which is implemented in
>> spark 2.0 ?
>>
>> On Wed, Sep 28, 2016 at 9:52 AM, janardhan shetty <
>> janardhan...@gmail.com> wrote:
>>
>>> Which algorithm is used under the covers while doing decision trees
>>> FOR SPARK ?
>>> for example: scikit-learn (python) uses an optimised version of the
>>> CART algorithm.
>>>
>>
>>
>

>>>
>>
>


Re: Dataframe, Java: How to convert String to Vector ?

2016-09-21 Thread Yan Facai
Thanks, Peter.
It works!

Why is a udf needed?
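If useful, a sketch of why: Dataset.map needs an implicit Encoder for the
result type, and spark.implicits._ does not provide one for Vector. Supplying
one explicitly (Kryo-based here) also works, though the column is then stored
as opaque binary:

```scala
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.sql.Encoders

implicit val vectorEncoder = Encoders.kryo[Vector]
val parsed = dataStr.map(row => Vectors.parse(row.getString(1)))
```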




On Wed, Sep 21, 2016 at 12:00 AM, Peter Figliozzi <pete.figlio...@gmail.com>
wrote:

> Hi Yan, I agree, it IS really confusing.  Here is the technique for
> transforming a column.  It is very general because you can make "myConvert"
> do whatever you want.
>
> import org.apache.spark.mllib.linalg.Vectors
> val df = Seq((0, "[1,3,5]"), (1, "[2,4,6]")).toDF
>
> df.show()
> // The columns were named "_1" and "_2"
> // Very confusing, because it looks like a Scala wildcard when we refer to
> it in code
>
> val myConvert = (x: String) => { Vectors.parse(x) }
> val myConvertUDF = udf(myConvert)
>
> val newDf = df.withColumn("parsed", myConvertUDF(col("_2")))
>
> newDf.show()
>
> On Mon, Sep 19, 2016 at 3:29 AM, 颜发才(Yan Facai) <yaf...@gmail.com> wrote:
>
>> Hi, all.
>> I find that it's really confuse.
>>
>> I can use Vectors.parse to create a DataFrame contains Vector type.
>>
>> scala> val dataVec = Seq((0, Vectors.parse("[1,3,5]")), (1,
>> Vectors.parse("[2,4,6]"))).toDF
>> dataVec: org.apache.spark.sql.DataFrame = [_1: int, _2: vector]
>>
>>
>> But using map to convert String to Vector throws an error:
>>
>> scala> val dataStr = Seq((0, "[1,3,5]"), (1, "[2,4,6]")).toDF
>> dataStr: org.apache.spark.sql.DataFrame = [_1: int, _2: string]
>>
>> scala> dataStr.map(row => Vectors.parse(row.getString(1)))
>> :30: error: Unable to find encoder for type stored in a
>> Dataset.  Primitive types (Int, String, etc) and Product types (case
>> classes) are supported by importing spark.implicits._  Support for
>> serializing other types will be added in future releases.
>>   dataStr.map(row => Vectors.parse(row.getString(1)))
>>
>>
>> Dose anyone can help me,
>> thanks very much!
>>
>>
>>
>>
>>
>>
>>
>> On Tue, Sep 6, 2016 at 9:58 PM, Peter Figliozzi <pete.figlio...@gmail.com
>> > wrote:
>>
>>> Hi Yan, I think you'll have to map the features column to a new
>>> numerical features column.
>>>
>>> Here's one way to do the individual transform:
>>>
>>> scala> val x = "[1, 2, 3, 4, 5]"
>>> x: String = [1, 2, 3, 4, 5]
>>>
>>> scala> val y:Array[Int] = x slice(1, x.length - 1) replace(",", "")
>>> split(" ") map(_.toInt)
>>> y: Array[Int] = Array(1, 2, 3, 4, 5)
>>>
>>> If you don't know about the Scala command line, just type "scala" in a
>>> terminal window.  It's a good place to try things out.
>>>
>>> You can make a function out of this transformation and apply it to your
>>> features column to make a new column.  Then add this with
>>> Dataset.withColumn.
>>>
>>> See here
>>> <http://stackoverflow.com/questions/35227568/applying-function-to-spark-dataframe-column>
>>> on how to apply a function to a Column to make a new column.
>>>
>>> On Tue, Sep 6, 2016 at 1:56 AM, 颜发才(Yan Facai) <yaf...@gmail.com> wrote:
>>>
>>>> Hi,
>>>> I have a csv file like:
>>>> uid  mid  features   label
>>>> 1235231[0, 1, 3, ...]True
>>>>
>>>> Both  "features" and "label" columns are used for GBTClassifier.
>>>>
>>>> However, when I read the file:
>>>> Dataset samples = sparkSession.read().csv(file);
>>>> The type of samples.select("features") is String.
>>>>
>>>> My question is:
>>>> How to map samples.select("features") to Vector or any appropriate type,
>>>> so I can use it to train like:
>>>> GBTClassifier gbdt = new GBTClassifier()
>>>> .setLabelCol("label")
>>>> .setFeaturesCol("features")
>>>> .setMaxIter(2)
>>>> .setMaxDepth(7);
>>>>
>>>> Thanks.
>>>>
>>>
>>>
>>
>


Re: Dataframe, Java: How to convert String to Vector ?

2016-09-19 Thread Yan Facai
Hi, all.
I find that it's really confusing.

I can use Vectors.parse to create a DataFrame contains Vector type.

scala> val dataVec = Seq((0, Vectors.parse("[1,3,5]")), (1,
Vectors.parse("[2,4,6]"))).toDF
dataVec: org.apache.spark.sql.DataFrame = [_1: int, _2: vector]


But using map to convert String to Vector throws an error:

scala> val dataStr = Seq((0, "[1,3,5]"), (1, "[2,4,6]")).toDF
dataStr: org.apache.spark.sql.DataFrame = [_1: int, _2: string]

scala> dataStr.map(row => Vectors.parse(row.getString(1)))
:30: error: Unable to find encoder for type stored in a
Dataset.  Primitive types (Int, String, etc) and Product types (case
classes) are supported by importing spark.implicits._  Support for
serializing other types will be added in future releases.
  dataStr.map(row => Vectors.parse(row.getString(1)))


Dose anyone can help me,
thanks very much!







On Tue, Sep 6, 2016 at 9:58 PM, Peter Figliozzi <pete.figlio...@gmail.com>
wrote:

> Hi Yan, I think you'll have to map the features column to a new numerical
> features column.
>
> Here's one way to do the individual transform:
>
> scala> val x = "[1, 2, 3, 4, 5]"
> x: String = [1, 2, 3, 4, 5]
>
> scala> val y:Array[Int] = x slice(1, x.length - 1) replace(",", "")
> split(" ") map(_.toInt)
> y: Array[Int] = Array(1, 2, 3, 4, 5)
>
> If you don't know about the Scala command line, just type "scala" in a
> terminal window.  It's a good place to try things out.
>
> You can make a function out of this transformation and apply it to your
> features column to make a new column.  Then add this with
> Dataset.withColumn.
>
> See here
> <http://stackoverflow.com/questions/35227568/applying-function-to-spark-dataframe-column>
> on how to apply a function to a Column to make a new column.
>
> On Tue, Sep 6, 2016 at 1:56 AM, 颜发才(Yan Facai) <yaf...@gmail.com> wrote:
>
>> Hi,
>> I have a csv file like:
>> uid  mid  features   label
>> 1235231[0, 1, 3, ...]True
>>
>> Both  "features" and "label" columns are used for GBTClassifier.
>>
>> However, when I read the file:
>> Dataset samples = sparkSession.read().csv(file);
>> The type of samples.select("features") is String.
>>
>> My question is:
>> How to map samples.select("features") to Vector or any appropriate type,
>> so I can use it to train like:
>> GBTClassifier gbdt = new GBTClassifier()
>> .setLabelCol("label")
>> .setFeaturesCol("features")
>> .setMaxIter(2)
>> .setMaxDepth(7);
>>
>> Thanks.
>>
>
>


study materials for operators on Dataframe

2016-09-18 Thread Yan Facai
Hi,
I am a newbie,
and the official Spark documentation is too concise for me, especially the
introduction of operators on DataFrame.

For Python, pandas gives very detailed documentation: [Pandas](
http://pandas.pydata.org/pandas-docs/stable/index.html)
So,
does anyone know some sites or cookbooks that are more helpful for a newbie?

Thanks.


Re: Does it run distributed if class not Serializable

2016-09-10 Thread Yan Facai
I believe that Serializable is necessary for any object captured in a closure
and shipped to the executors; without it the job fails with a "Task not
serializable" error rather than silently running only on the master.

On Fri, Sep 9, 2016 at 7:10 PM, Gourav Sengupta 
wrote:

> And you are using JAVA?
>
> AND WHY?
>
> Regards,
> Gourav
>
> On Fri, Sep 9, 2016 at 11:47 AM, Yusuf Can Gürkan 
> wrote:
>
>> Hi,
>>
>> If i don't make a class Serializable (... extends Serializable) will it
>> run distributed with executors or it will only run on master machine?
>>
>> Thanks
>>
>
>


Re: Dataframe, Java: How to convert String to Vector ?

2016-09-08 Thread Yan Facai
many thanks, Peter.

On Wed, Sep 7, 2016 at 10:14 PM, Peter Figliozzi <pete.figlio...@gmail.com>
wrote:

> Here's a decent GitHub book: Mastering Apache Spark
> <https://www.gitbook.com/book/jaceklaskowski/mastering-apache-spark/details>
> .
>
> I'm new at Scala too.  I found it very helpful to study the Scala language
> without Spark.  The documentation found here
> <http://docs.scala-lang.org/index.html> is excellent.
>
> Pete
>
> On Wed, Sep 7, 2016 at 1:39 AM, 颜发才(Yan Facai) <yaf...@gmail.com> wrote:
>
>> Hi Peter,
>> I'm familiar with Pandas / NumPy in Python, while Spark / Scala is
>> totally new to me.
>> Pandas provides detailed documentation on things like how to slice data,
>> parse files, and use apply and filter functions.
>>
>> Does Spark have similarly detailed documentation?
>>
>>
>>
>> On Tue, Sep 6, 2016 at 9:58 PM, Peter Figliozzi <pete.figlio...@gmail.com
>> > wrote:
>>
>>> Hi Yan, I think you'll have to map the features column to a new
>>> numerical features column.
>>>
>>> Here's one way to do the individual transform:
>>>
>>> scala> val x = "[1, 2, 3, 4, 5]"
>>> x: String = [1, 2, 3, 4, 5]
>>>
>>> scala> val y:Array[Int] = x slice(1, x.length - 1) replace(",", "")
>>> split(" ") map(_.toInt)
>>> y: Array[Int] = Array(1, 2, 3, 4, 5)
>>>
>>> If you don't know about the Scala command line, just type "scala" in a
>>> terminal window.  It's a good place to try things out.
>>>
>>> You can make a function out of this transformation and apply it to your
>>> features column to make a new column.  Then add this with
>>> Dataset.withColumn.
>>>
>>> See here
>>> <http://stackoverflow.com/questions/35227568/applying-function-to-spark-dataframe-column>
>>> on how to apply a function to a Column to make a new column.
>>>
>>> On Tue, Sep 6, 2016 at 1:56 AM, 颜发才(Yan Facai) <yaf...@gmail.com> wrote:
>>>
>>>> Hi,
>>>> I have a csv file like:
>>>> uid  mid  features   label
>>>> 1235231[0, 1, 3, ...]True
>>>>
>>>> Both  "features" and "label" columns are used for GBTClassifier.
>>>>
>>>> However, when I read the file:
>>>> Dataset<Row> samples = sparkSession.read().csv(file);
>>>> The type of samples.select("features") is String.
>>>>
>>>> My question is:
>>>> How to map samples.select("features") to Vector or any appropriate type,
>>>> so I can use it to train like:
>>>> GBTClassifier gbdt = new GBTClassifier()
>>>> .setLabelCol("label")
>>>> .setFeaturesCol("features")
>>>> .setMaxIter(2)
>>>> .setMaxDepth(7);
>>>>
>>>> Thanks.
>>>>
>>>
>>>
>>
>


Re: Dataframe, Java: How to convert String to Vector ?

2016-09-07 Thread Yan Facai
Hi Peter,
I'm familiar with Pandas / NumPy in Python, while Spark / Scala is totally
new to me.
Pandas provides detailed documentation on things like how to slice data,
parse files, and use apply and filter functions.

Does Spark have similarly detailed documentation?



On Tue, Sep 6, 2016 at 9:58 PM, Peter Figliozzi <pete.figlio...@gmail.com>
wrote:

> Hi Yan, I think you'll have to map the features column to a new numerical
> features column.
>
> Here's one way to do the individual transform:
>
> scala> val x = "[1, 2, 3, 4, 5]"
> x: String = [1, 2, 3, 4, 5]
>
> scala> val y:Array[Int] = x slice(1, x.length - 1) replace(",", "")
> split(" ") map(_.toInt)
> y: Array[Int] = Array(1, 2, 3, 4, 5)
>
> If you don't know about the Scala command line, just type "scala" in a
> terminal window.  It's a good place to try things out.
>
> You can make a function out of this transformation and apply it to your
> features column to make a new column.  Then add this with
> Dataset.withColumn.
>
> See here
> <http://stackoverflow.com/questions/35227568/applying-function-to-spark-dataframe-column>
> on how to apply a function to a Column to make a new column.
>
> On Tue, Sep 6, 2016 at 1:56 AM, 颜发才(Yan Facai) <yaf...@gmail.com> wrote:
>
>> Hi,
>> I have a csv file like:
>> uid  mid  features   label
>> 1235231[0, 1, 3, ...]True
>>
>> Both  "features" and "label" columns are used for GBTClassifier.
>>
>> However, when I read the file:
>> Dataset<Row> samples = sparkSession.read().csv(file);
>> The type of samples.select("features") is String.
>>
>> My question is:
>> How to map samples.select("features") to Vector or any appropriate type,
>> so I can use it to train like:
>> GBTClassifier gbdt = new GBTClassifier()
>> .setLabelCol("label")
>> .setFeaturesCol("features")
>> .setMaxIter(2)
>> .setMaxDepth(7);
>>
>> Thanks.
>>
>
>


How to convert String to Vector ?

2016-09-06 Thread Yan Facai
Hi,
I have a csv file like:
uid  mid  features   label
1235231[0, 1, 3, ...]True

Both  "features" and "label" columns are used for GBTClassifier.

However, when I read the file:
Dataset<Row> samples = sparkSession.read().csv(file);
The type of samples.select("features") is String.

My question is:
How to map samples.select("features") to Vector or any appropriate type,
so I can use it to train like:
GBTClassifier gbdt = new GBTClassifier()
.setLabelCol("label")
.setFeaturesCol("features")
.setMaxIter(2)
.setMaxDepth(7);


Thanks.
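
Sketching one possible answer in Scala (hedged: the file name, the tab
delimiter, the header option and the True/False label encoding are all guesses
about the csv, not facts from the mail): parse the features column into an
ml.linalg.Vector with a udf, turn the label into 0.0/1.0, and fit the
GBTClassifier on the result.

```scala
import org.apache.spark.ml.classification.GBTClassifier
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.functions.{col, udf, when}

// Assumed layout: tab-separated file with a header row (uid, mid, features, label).
val raw = spark.read.option("header", "true").option("sep", "\t").csv("samples.tsv")

// "[0, 1, 3, ...]" -> ml.linalg.Vector
val parseFeatures = udf((s: String) =>
  Vectors.dense(s.stripPrefix("[").stripSuffix("]").split(",").map(_.trim.toDouble)))

val samples = raw
  .withColumn("features", parseFeatures(col("features")))
  .withColumn("label", when(col("label") === "True", 1.0).otherwise(0.0))

val gbdt = new GBTClassifier()
  .setLabelCol("label")
  .setFeaturesCol("features")
  .setMaxIter(2)
  .setMaxDepth(7)

val model = gbdt.fit(samples)
```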


Dataframe, Java: How to convert String to Vector ?

2016-09-06 Thread Yan Facai
Hi,
I have a csv file like:
uid  mid  features   label
1235231[0, 1, 3, ...]True

Both  "features" and "label" columns are used for GBTClassifier.

However, when I read the file:
Dataset<Row> samples = sparkSession.read().csv(file);
The type of samples.select("features") is String.

My question is:
How to map samples.select("features") to Vector or any appropriate type,
so I can use it to train like:
GBTClassifier gbdt = new GBTClassifier()
.setLabelCol("label")
.setFeaturesCol("features")
.setMaxIter(2)
.setMaxDepth(7);

Thanks.


[Spark 2.0] ClassNotFoundException is thrown when using Hive

2016-08-18 Thread Yan Facai
Hi, all.

I copied hdfs-site.xml, core-site.xml and hive-site.xml to
$SPARK_HOME/conf.
Then spark-submit is used to submit the task to YARN, running in **client**
mode.
However, a ClassNotFoundException is thrown.

Some details of the logs are listed below:
```
16/08/12 17:07:32 INFO hive.HiveUtils: Initializing HiveMetastoreConnection
version 0.13.1 using file:/data0/facai/lib/hive-0.13.1/lib:file:/data0/facai/lib/hadoop-2.4.1/share/hadoop
16/08/12 17:07:32 ERROR yarn.ApplicationMaster: User class threw exception:
java.lang.ClassNotFoundException: java.lang.NoClassDefFoundError:
org/apache/hadoop/hive/ql/session/SessionState when creating Hive client
using classpath: file:/data0/facai/lib/hive-0.13.1/lib,
file:/data0/facai/lib/hadoop-2.4.1/share/hadoop
```

In fact, all the jars needed by Hive are in the directory:
```Bash
[hadoop@h107713699 spark_test]$ ls /data0/facai/lib/hive-0.13.1/lib/ | grep hive
hive-ant-0.13.1.jar
hive-beeline-0.13.1.jar
hive-cli-0.13.1.jar
hive-common-0.13.1.jar
...
```

So, my question is:
why can't Spark find the jars it needs?

Any help will be appreciated, thanks.


[Spark 2.0] spark.sql.hive.metastore.jars doesn't work

2016-08-12 Thread Yan Facai
Hi, everyone.

According to the official guide, I copied hdfs-site.xml, core-site.xml and
hive-site.xml to $SPARK_HOME/conf, and wrote the code below:

```Java
SparkSession spark = SparkSession
.builder()
.appName("Test Hive for Spark")
.config("spark.sql.hive.metastore.version", "0.13.1")
.config("spark.sql.hive.metastore.jars",
"/data0/facai/lib/hive-0.13.1/lib:/data0/facai/lib/hadoop-2.
4.1/share/hadoop")
.enableHiveSupport()
.getOrCreate();
```


When I use spark-submit to submit tasks to YARN in **client** mode, a
ClassNotFoundException is thrown, and some details of the logs are listed
below:
```
16/08/12 17:07:28 INFO execution.SparkSqlParser: Parsing command: SHOW
TABLES
16/08/12 17:07:30 INFO cluster.YarnSchedulerBackend$YarnDriverEndpoint:
Registered executor NettyRpcEndpointRef(null) (10.77.113.154:52806) with ID
1
16/08/12 17:07:31 INFO storage.BlockManagerMasterEndpoint: Registering
block manager h113154.mars.grid.sina.com.cn:44756 with 912.3 MB RAM,
BlockManagerId(1, h113154.mars.grid.sina.com.cn, 44756)
16/08/12 17:07:32 INFO hive.HiveUtils: Initializing HiveMetastoreConnection
version 0.13.1 using file:/data0/facai/lib/hive-0.13.1/lib:file:/data0/facai/lib/hadoop-2.4.1/share/hadoop
16/08/12 17:07:32 ERROR yarn.ApplicationMaster: User class threw exception:
java.lang.ClassNotFoundException: java.lang.NoClassDefFoundError:
org/apache/hadoop/hive/ql/session/SessionState when creating Hive client
using classpath: file:/data0/facai/lib/hive-0.13.1/lib,
file:/data0/facai/lib/hadoop-2.4.1/share/hadoop
```

However, all the jars needed by Hive are indeed in the directory:
```Bash
[hadoop@h107713699 spark_test]$ ls /data0/facai/lib/hive-0.13.1/lib/ | grep hive
hive-ant-0.13.1.jar
hive-beeline-0.13.1.jar
hive-cli-0.13.1.jar
hive-common-0.13.1.jar
hive-contrib-0.13.1.jar
hive-exec-0.13.1.jar
hive-hbase-handler-0.13.1.jar
hive-hwi-0.13.1.jar
hive-jdbc-0.13.1.jar
hive-metastore-0.13.1.jar
hive-serde-0.13.1.jar
hive-service-0.13.1.jar
hive-shims-0.13.1.jar
hive-shims-0.20-0.13.1.jar
hive-shims-0.20S-0.13.1.jar
hive-shims-0.23-0.13.1.jar
hive-shims-common-0.13.1.jar
hive-shims-common-secure-0.13.1.jar
hive-testutils-0.13.1.jar
```

So,
I wonder: why can't Spark find the jars it needs?

Any help will be appreciated, thanks.
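
One guess about what may be going wrong, offered with the caveat that I have
not reproduced it: the entries passed to spark.sql.hive.metastore.jars are bare
directories, and a plain directory on a JVM-style classpath only exposes .class
files, not the jar files sitting inside it. If that is the cause, pointing the
property at the jars themselves should help. A hedged sketch (rewritten in
Scala, with the paths copied from the mail and the /* wildcards added by me):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder()
  .appName("Test Hive for Spark")
  .config("spark.sql.hive.metastore.version", "0.13.1")
  // "/*" makes the jar files under each directory visible; the bare directory does not.
  // Note: the Hadoop jars may live in subdirectories (share/hadoop/common, .../hdfs, ...),
  // in which case those subdirectories would need their own "/*" entries as well.
  .config("spark.sql.hive.metastore.jars",
    "/data0/facai/lib/hive-0.13.1/lib/*:" +
    "/data0/facai/lib/hadoop-2.4.1/share/hadoop/*")
  .enableHiveSupport()
  .getOrCreate()
```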