FP growth - Items in a transaction must be unique

2017-02-01 Thread Devi P.V
Hi all,

I am trying to run the FP-growth algorithm using Spark and Scala. A sample input
dataframe is as follows:

+-------------------------------------------------------------------------------------------+
|productName                                                                                |
+-------------------------------------------------------------------------------------------+
|Apple Iphone 7 128GB Jet Black with Facetime                                               |
|Levi’s Blue Slim Fit Jeans- L5112,Rimmel London Lasting Finish Matte by Kate Moss 101 Dusky|
|Iphone 6 Plus (5.5",Limited Stocks, TRA Oman Approved)                                     |
+-------------------------------------------------------------------------------------------+

Each row contains unique items.

I converted it into an RDD as follows:

val transactions = names.as[String].rdd.map(s => s.split(","))

val fpg = new FPGrowth().
  setMinSupport(0.3).
  setNumPartitions(100)


val model = fpg.run(transactions)

But I got this error:

WARN TaskSetManager: Lost task 2.0 in stage 27.0 (TID 622, localhost):
org.apache.spark.SparkException:
Items in a transaction must be unique but got WrappedArray(
Huawei GR3 Dual Sim 16GB 13MP 5Inch 4G,
 Huawei G8 Gold 32GB,  4G,
5.5 Inches, HTC Desire 816 (Dual Sim, 3G, 8GB),
 Samsung Galaxy S7 Single Sim - 32GB,  4G LTE,
Gold, Huawei P8 Lite 16GB,  4G LTE, Huawei Y625,
Samsung Galaxy Note 5 - 32GB,  4G LTE,
Samsung Galaxy S7 Dual Sim - 32GB)


How can I solve this?


Thanks
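
For reference, a minimal sketch of one likely fix (an assumption based on the error
above, not a confirmed answer): de-duplicate and trim the items within each
transaction before handing them to FPGrowth, since splitting the product string can
yield repeated entries. It assumes names is the single-column DataFrame shown above
and that spark.implicits._ is in scope (e.g. in a spark-shell).

import org.apache.spark.mllib.fpm.FPGrowth

// Split, trim and de-duplicate the items of each transaction.
val transactions = names.as[String].rdd
  .map(_.split(",").map(_.trim).distinct)

val fpg = new FPGrowth()
  .setMinSupport(0.3)
  .setNumPartitions(100)

val model = fpg.run(transactions)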


Closing resources in the executor

2017-02-01 Thread Appu K
What would be the recommended way to close resources opened or shared by
executors?

A few use cases

#1) Let's say the enrichment process needs to convert an IP or lat/long pair to a
city/country. To achieve this, executors could open a file in HDFS and build a
map, or use a memory-mapped file - the implementation could be a transient lazy
val singleton or something similar. The UDF defined would then perform lookups on
these data structures and return geo data.

#2) Let's say there is a need to do a lookup on a KV store like redis from
the executor. Each executor would create a connection pool and provide
connections for tasks running in them to perform lookups.

In scenarios like this, when the executor is shut down, what would be the best
way to close the open resources (streams, connection pools, etc.)?


Any pointers to places where I could read up a bit more on the best practices
around this would be highly appreciated!

thanks
appu
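
For reference, a sketch of one common pattern (an assumption, not an official Spark
API for this): hold the per-executor resource in a JVM-wide lazy singleton and
register a shutdown hook so it is released when the executor process exits. The
names below are illustrative.

import java.io.Closeable

object ExecutorResource {
  // Illustrative stand-in for a connection pool or memory-mapped lookup structure.
  @volatile private var holder: Option[Closeable] = None

  def getOrOpen(open: () => Closeable): Closeable = synchronized {
    holder.getOrElse {
      val r = open()
      holder = Some(r)
      sys.addShutdownHook(r.close())  // best-effort cleanup when the executor JVM exits
      r
    }
  }
}

Tasks running on the same executor would call ExecutorResource.getOrOpen(...) from
the UDF or mapPartitions closure, so the resource is opened once per JVM and closed
on shutdown.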


Re: increasing cross join speed

2017-02-01 Thread Takeshi Yamamuro
Hi,

I'm not sure how to improve this kind of query on vanilla Spark alone, but
you can write custom physical plans for top-k queries.
You can check the links below as a reference:
benchmark: https://github.com/apache/incubator-hivemall/pull/33
manual:
https://github.com/apache/incubator-hivemall/blob/master/docs/gitbook/spark/misc/topk_join.md

I hope this helps.
Thanks,

// maropu


On Wed, Feb 1, 2017 at 6:35 AM, Kürşat Kurt  wrote:

> Hi;
>
>
>
> I have 2 dataframes. I am trying to cross join them to compute vector
> distances, so that I can then choose the most similar vectors.
>
> The cross join is too slow. How can I increase the speed, or do you have any
> suggestions for this comparison?
>
>
>
>
>
> val result = myDict.join(mainDataset).map(x => {
>
>   val orgClassName1 = x.getAs[SparseVector](1)
>   val orgClassName2 = x.getAs[SparseVector](2)
>   val f1 = x.getAs[SparseVector](3)
>   val f2 = x.getAs[SparseVector](4)
>   val dist = Vectors.sqdist(f1, f2)
>
>   (orgClassName1, orgClassName2, dist)
> }).toDF("orgClassName1", "orgClassName2", "dist")
>
>
>
>
>
>
>



-- 
---
Takeshi Yamamuro
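
For reference, a small self-contained sketch of one mitigation that works on
vanilla Spark (the two dataframes below are illustrative stand-ins for myDict and
mainDataset): broadcasting the smaller side turns the cross join into a broadcast
nested-loop join and avoids shuffling the large side.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.broadcast

val spark = SparkSession.builder.appName("crossjoin-sketch").getOrCreate()
import spark.implicits._

// Illustrative stand-ins: a small dictionary and a large dataset.
val small = Seq((1, "a"), (2, "b")).toDF("id1", "v1")
val large = Seq((10, "x"), (20, "y"), (30, "z")).toDF("id2", "v2")

// Hinting the small side as a broadcast avoids shuffling the large side.
val pairs = broadcast(small).crossJoin(large)
pairs.show()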


Re: pivot over non numerical data

2017-02-01 Thread Kevin Mellott
This should work for non-numerical data as well - can you please elaborate
on the error you are getting and provide a code sample? As a preliminary
hint, you can "aggregate" text values using *max*.

df.groupBy("someCol")
  .pivot("anotherCol")
  .agg(max($"textCol"))

Thanks,
Kevin

On Wed, Feb 1, 2017 at 2:02 PM, Darshan Pandya 
wrote:

> Hello,
>
> I am trying to transpose some data using groupBy pivot aggr as mentioned
> in this blog
> https://databricks.com/blog/2016/02/09/reshaping-data-
> with-pivot-in-apache-spark.html
>
> But this works only for numerical data.
> Any hints for doing the same thing for non numerical data ?
>
>
> --
> Sincerely,
> Darshan
>
>
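
For reference, a minimal self-contained sketch of the suggested groupBy/pivot/max
approach on purely textual data (column names and values are illustrative):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.max

val spark = SparkSession.builder.appName("pivot-sketch").getOrCreate()
import spark.implicits._

val df = Seq(
  ("row1", "colA", "foo"),
  ("row1", "colB", "bar"),
  ("row2", "colA", "baz")
).toDF("someCol", "anotherCol", "textCol")

// max acts as the "aggregation" over strings, so non-numeric values survive the pivot.
df.groupBy("someCol")
  .pivot("anotherCol")
  .agg(max($"textCol"))
  .show()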


Re: Parameterized types and Datasets - Spark 2.1.0

2017-02-01 Thread Don Drake
I imported that as my first command in my previous email.  I'm using a
spark-shell.

scala> import org.apache.spark.sql.Encoder
import org.apache.spark.sql.Encoder

scala>


Any comments regarding importing implicits in an application?

Thanks.

-Don

On Wed, Feb 1, 2017 at 6:10 PM, Michael Armbrust 
wrote:

> This is the error, you are missing an import:
>
> :13: error: not found: type Encoder
>abstract class RawTable[A : Encoder](inDir: String) {
>
> Works for me in a REPL.
> 
>
> On Wed, Feb 1, 2017 at 3:34 PM, Don Drake  wrote:
>
>> Thanks for the reply.   I did give that syntax a try [A : Encoder]
>> yesterday, but I kept getting this exception in a spark-shell and Zeppelin
>> browser.
>>
>> scala> import org.apache.spark.sql.Encoder
>> import org.apache.spark.sql.Encoder
>>
>> scala>
>>
>> scala> case class RawTemp(f1: String, f2: String, temp: Long, created_at:
>> java.sql.Timestamp, data_filename: String)
>> defined class RawTemp
>>
>> scala>
>>
>> scala> import spark.implicits._
>> import spark.implicits._
>>
>> scala>
>>
>> scala> abstract class RawTable[A : Encoder](inDir: String) {
>>  | import spark.implicits._
>>  | def load() = {
>>  | import spark.implicits._
>>  | spark.read
>>  | .option("header", "true")
>>  | .option("mode", "FAILFAST")
>>  | .option("escape", "\"")
>>  | .option("nullValue", "")
>>  | .option("indferSchema", "true")
>>  | .csv(inDir)
>>  | .as[A]
>>  | }
>>  | }
>> :13: error: not found: type Encoder
>>abstract class RawTable[A : Encoder](inDir: String) {
>>^
>> :24: error: Unable to find encoder for type stored in a
>> Dataset.  Primitive types (Int, String, etc) and Product types (case
>> classes) are supported by importing spark.implicits._  Support for
>> serializing other types will be added in future releases.
>>.as[A]
>>
>>
>> I gave it a try today in a Scala application and it seems to work.  Is
>> this a known issue in a spark-shell?
>>
>> In my Scala application, this is being defined in a separate file, etc.
>> without direct access to a Spark session.
>>
>> I had to add the following code snippet so the import spark.implicits._
>> would take effect:
>>
>> // ugly hack to get around Encoder can't be found compile time errors
>>
>> private object myImplicits extends SQLImplicits {
>>
>>   protected override def _sqlContext: SQLContext =
>> MySparkSingleton.getCurrentSession().sqlContext
>>
>> }
>>
>> import myImplicits._
>>
>> I found that in about the hundredth SO post I searched for this problem.
>> Is this the best way to let implicits do its thing?
>>
>> Thanks.
>>
>> -Don
>>
>>
>>
>> On Wed, Feb 1, 2017 at 3:16 PM, Michael Armbrust 
>> wrote:
>>
>>> You need to enforce that an Encoder is available for the type A using a 
>>> context
>>> bound .
>>>
>>> import org.apache.spark.sql.Encoder
>>> abstract class RawTable[A : Encoder](inDir: String) {
>>>   ...
>>> }
>>>
>>> On Tue, Jan 31, 2017 at 8:12 PM, Don Drake  wrote:
>>>
 I have a set of CSV that I need to perform ETL on, with the plan to
 re-use a lot of code between each file in a parent abstract class.

 I tried creating the following simple abstract class that will have a
 parameterized type of a case class that represents the schema being read 
 in.

 This won't compile, it just complains about not being able to find an
 encoder, but I'm importing the implicits and don't believe this error.


 scala> import spark.implicits._
 import spark.implicits._

 scala>

 scala> case class RawTemp(f1: String, f2: String, temp: Long,
 created_at: java.sql.Timestamp, data_filename: String)
 defined class RawTemp

 scala>

 scala> abstract class RawTable[A](inDir: String) {
  | def load() = {
  | spark.read
  | .option("header", "true")
  | .option("mode", "FAILFAST")
  | .option("escape", "\"")
  | .option("nullValue", "")
  | .option("indferSchema", "true")
  | .csv(inDir)
  | .as[A]
  | }
  | }
 :27: error: Unable to find encoder for type stored in a
 Dataset.  Primitive types (Int, String, etc) and Product types (case
 classes) are supported by importing spark.implicits._  Support for
 serializing other types will be added in future releases.
.as[A]

 scala> class TempTable extends RawTable[RawTemp]("/user/drake/t.csv")
 :13: error: not found: type RawTable
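
For the standalone-application case discussed above, one possible alternative to
subclassing SQLImplicits is to hand the class a SparkSession explicitly and keep
the Encoder context bound (a sketch, not the only way to do it; the usage at the
end is illustrative):

import org.apache.spark.sql.{Dataset, Encoder, SparkSession}

abstract class RawTable[A : Encoder](inDir: String)(implicit spark: SparkSession) {

  def load(): Dataset[A] =
    spark.read
      .option("header", "true")
      .option("mode", "FAILFAST")
      .option("escape", "\"")
      .option("nullValue", "")
      .option("inferSchema", "true")
      .csv(inDir)
      .as[A]  // resolves the Encoder supplied by the context bound

}

// Usage sketch: the concrete table passes the session (and an Encoder for
// RawTemp, e.g. via spark.implicits._) along.
// class TempTable(implicit spark: SparkSession)
//   extends RawTable[RawTemp]("/user/drake/t.csv")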

Re: JavaRDD text metadata (file name) findings

2017-02-01 Thread neil90
You can use
https://spark.apache.org/docs/latest/api/java/org/apache/spark/api/java/JavaSparkContext.html#wholeTextFiles(java.lang.String)
but it will return an RDD of (filename, content) pairs.
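
A small sketch of that approach (shown here in Scala; the Java API is analogous,
and the input path is illustrative):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("wholeTextFiles-sketch").getOrCreate()

// Each element is a (filePath, fileContent) pair, so the file name stays attached
// to its content through later transformations.
val filesWithNames = spark.sparkContext.wholeTextFiles("hdfs:///data/input/*.txt")

filesWithNames
  .map { case (path, content) => (path, content.split("\n").length) }
  .take(5)
  .foreach(println)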






Re: Parameterized types and Datasets - Spark 2.1.0

2017-02-01 Thread Michael Armbrust
This is the error, you are missing an import:

:13: error: not found: type Encoder
   abstract class RawTable[A : Encoder](inDir: String) {

Works for me in a REPL.


On Wed, Feb 1, 2017 at 3:34 PM, Don Drake  wrote:

> Thanks for the reply.   I did give that syntax a try [A : Encoder]
> yesterday, but I kept getting this exception in a spark-shell and Zeppelin
> browser.
>
> scala> import org.apache.spark.sql.Encoder
> import org.apache.spark.sql.Encoder
>
> scala>
>
> scala> case class RawTemp(f1: String, f2: String, temp: Long, created_at:
> java.sql.Timestamp, data_filename: String)
> defined class RawTemp
>
> scala>
>
> scala> import spark.implicits._
> import spark.implicits._
>
> scala>
>
> scala> abstract class RawTable[A : Encoder](inDir: String) {
>  | import spark.implicits._
>  | def load() = {
>  | import spark.implicits._
>  | spark.read
>  | .option("header", "true")
>  | .option("mode", "FAILFAST")
>  | .option("escape", "\"")
>  | .option("nullValue", "")
>  | .option("indferSchema", "true")
>  | .csv(inDir)
>  | .as[A]
>  | }
>  | }
> :13: error: not found: type Encoder
>abstract class RawTable[A : Encoder](inDir: String) {
>^
> :24: error: Unable to find encoder for type stored in a Dataset.
> Primitive types (Int, String, etc) and Product types (case classes) are
> supported by importing spark.implicits._  Support for serializing other
> types will be added in future releases.
>.as[A]
>
>
> I gave it a try today in a Scala application and it seems to work.  Is
> this a known issue in a spark-shell?
>
> In my Scala application, this is being defined in a separate file, etc.
> without direct access to a Spark session.
>
> I had to add the following code snippet so the import spark.implicits._
> would take effect:
>
> // ugly hack to get around Encoder can't be found compile time errors
>
> private object myImplicits extends SQLImplicits {
>
>   protected override def _sqlContext: SQLContext = MySparkSingleton.
> getCurrentSession().sqlContext
>
> }
>
> import myImplicits._
>
> I found that in about the hundredth SO post I searched for this problem.
> Is this the best way to let implicits do its thing?
>
> Thanks.
>
> -Don
>
>
>
> On Wed, Feb 1, 2017 at 3:16 PM, Michael Armbrust 
> wrote:
>
>> You need to enforce that an Encoder is available for the type A using a 
>> context
>> bound .
>>
>> import org.apache.spark.sql.Encoder
>> abstract class RawTable[A : Encoder](inDir: String) {
>>   ...
>> }
>>
>> On Tue, Jan 31, 2017 at 8:12 PM, Don Drake  wrote:
>>
>>> I have a set of CSV that I need to perform ETL on, with the plan to
>>> re-use a lot of code between each file in a parent abstract class.
>>>
>>> I tried creating the following simple abstract class that will have a
>>> parameterized type of a case class that represents the schema being read in.
>>>
>>> This won't compile, it just complains about not being able to find an
>>> encoder, but I'm importing the implicits and don't believe this error.
>>>
>>>
>>> scala> import spark.implicits._
>>> import spark.implicits._
>>>
>>> scala>
>>>
>>> scala> case class RawTemp(f1: String, f2: String, temp: Long,
>>> created_at: java.sql.Timestamp, data_filename: String)
>>> defined class RawTemp
>>>
>>> scala>
>>>
>>> scala> abstract class RawTable[A](inDir: String) {
>>>  | def load() = {
>>>  | spark.read
>>>  | .option("header", "true")
>>>  | .option("mode", "FAILFAST")
>>>  | .option("escape", "\"")
>>>  | .option("nullValue", "")
>>>  | .option("indferSchema", "true")
>>>  | .csv(inDir)
>>>  | .as[A]
>>>  | }
>>>  | }
>>> :27: error: Unable to find encoder for type stored in a
>>> Dataset.  Primitive types (Int, String, etc) and Product types (case
>>> classes) are supported by importing spark.implicits._  Support for
>>> serializing other types will be added in future releases.
>>>.as[A]
>>>
>>> scala> class TempTable extends RawTable[RawTemp]("/user/drake/t.csv")
>>> :13: error: not found: type RawTable
>>>class TempTable extends RawTable[RawTemp]("/user/drake/t.csv")
>>>   ^
>>>
>>> What's odd is that this output looks okay:
>>>
>>> scala> val RTEncoder = Encoders.product[RawTemp]
>>> RTEncoder: org.apache.spark.sql.Encoder[RawTemp] = class[f1[0]: string,
>>> f2[0]: string, temp[0]: bigint, created_at[0]: timestamp, data_filename[0]:
>>> string]
>>>
>>> scala> RTEncoder.schema
>>> res4: org.apache.spark.sql.

Re: Question about Multinomial LogisticRegression in spark mllib in spark 2.1.0

2017-02-01 Thread Hollin Wilkins
Hey Aseem,

If you are looking for a full-featured library to execute Spark ML
pipelines outside of Spark, take a look at MLeap:
https://github.com/combust/mleap

Not only does it support transforming single instances of a feature vector,
but you can execute your entire ML pipeline including feature extraction.

Cheers,
Hollin

On Wed, Feb 1, 2017 at 8:49 AM, Seth Hendrickson <
seth.hendrickso...@gmail.com> wrote:

> In Spark.ML the coefficients are not "pivoted" meaning that they do not
> set one of the coefficient sets equal to zero. You can read more about it
> here: https://en.wikipedia.org/wiki/Multinomial_logistic_
> regression#As_a_set_of_independent_binary_regressions
>
> You can translate your set of coefficients to a pivoted version by simply
> subtracting one of the sets of coefficients from all the others. That
> leaves the one you selected, the "pivot", as all zeros. You can then pass
> this into the mllib model, disregarding the "pivot" coefficients. The
> coefficients should be laid out like:
>
> [feature0_class0, feature1_class0, feature2_class0, intercept0,
> feature0_class1, ..., intercept1]
>
> So you have 9 coefficients and 3 intercepts, but you are going to get rid
> of one class's coefficients, leaving you with 6 coefficients and two
> intercepts - so a vector of length 8 for mllib's model.
>
> Note: if you use regularization then it is not exactly correct to convert
> from the non-pivoted version to the pivoted one, since the algorithms will
> give different results in those cases, though it is still possible to do it.
>
> On Wed, Feb 1, 2017 at 3:42 AM, Aseem Bansal  wrote:
>
>> *What I want to do*
> I have trained an ml.classification.LogisticRegressionModel using the spark
> ml package.
>>
>> It has 3 features and 3 classes. So the generated model has coefficients
>> in (3, 3) matrix and intercepts in Vector of length (3) as expected.
>>
>> Now, I want to take these coefficients and convert this
>> ml.classification.LogisticRegressionModel model to an instance of
>> mllib.classification.LogisticRegressionModel model.
>>
>> *Why I want to do this*
>> Computational Speed as SPARK-10413 is still in progress and scheduled for
>> Spark 2.2 which is not yet released.
>>
>> *Why I think this is possible*
>> I checked https://spark.apache.org/docs/latest/mllib-linear-me
>> thods.html#logistic-regression and in that example a multinomial
>> Logistic Regression is trained. So as per this the class
>> mllib.classification.LogisticRegressionModel can encapsulate these
>> parameters.
>>
>> *Problem faced*
>> The only constructor in mllib.classification.LogisticRegressionModel
>> takes a single vector as coefficients and single double as intercept but I
>> have a Matrix of coefficients and Vector of intercepts respectively.
>>
>> I tried converting matrix to a vector by just taking the values (Guess
>> work) but got
>>
>> requirement failed: LogisticRegressionModel.load with numClasses = 3 and
>> numFeatures = 3 expected weights of length 6 (without intercept) or 8 (with
>> intercept), but was given weights of length 9
>>
>> So any ideas?
>>
>
>


Re: Dataset Question: No Encoder found for Set[(scala.Long, scala.Long)]

2017-02-01 Thread Jerry Lam
Hi Koert,

Thank you for your help! GOT IT!

Best Regards,

Jerry

On Wed, Feb 1, 2017 at 6:24 PM, Koert Kuipers  wrote:

> you can still use it as Dataset[Set[X]]. all transformations should work
> correctly.
>
> however dataset.schema will show binary type, and dataset.show will show
> bytes (unfortunately).
>
> for example:
>
> scala> implicit def setEncoder[X]: Encoder[Set[X]] = Encoders.kryo[Set[X]]
> setEncoder: [X]=> org.apache.spark.sql.Encoder[Set[X]]
>
> scala> val x = Seq(Set(1,2,3)).toDS
> x: org.apache.spark.sql.Dataset[scala.collection.immutable.Set[Int]] =
> [value: binary]
>
> scala> x.map(_ + 4).collect
> res17: Array[scala.collection.immutable.Set[Int]] = Array(Set(1, 2, 3, 4))
>
> scala> x.show
> ++
> |   value|
> ++
> |[2A 01 03 02 02 0...|
> ++
>
>
> scala> x.schema
> res19: org.apache.spark.sql.types.StructType =
> StructType(StructField(value,BinaryType,true))
>
>
> On Wed, Feb 1, 2017 at 12:03 PM, Jerry Lam  wrote:
>
>> Hi Koert,
>>
>> Thanks for the tips. I tried to do that but the column's type is now
>> Binary. Do I get the Set[X] back in the Dataset?
>>
>> Best Regards,
>>
>> Jerry
>>
>>
>> On Tue, Jan 31, 2017 at 8:04 PM, Koert Kuipers  wrote:
>>
>>> set is currently not supported. you can use kryo encoder. there is no
>>> other work around that i know of.
>>>
>>> import org.apache.spark.sql.{ Encoder, Encoders }
>>> implicit def setEncoder[X]: Encoder[Set[X]] = Encoders.kryo[Set[X]]
>>>
>>> On Tue, Jan 31, 2017 at 7:33 PM, Jerry Lam  wrote:
>>>
 Hi guys,

 I got an exception like the following, when I tried to implement a user
 defined aggregation function.

  Exception in thread "main" java.lang.UnsupportedOperationException:
 No Encoder found for Set[(scala.Long, scala.Long)]

 The Set[(Long, Long)] is a field in the case class which is the output
 type for the aggregation.

 Is there a workaround for this?

 Best Regards,

 Jerry

>>>
>>>
>>
>


Re: Parameterized types and Datasets - Spark 2.1.0

2017-02-01 Thread Don Drake
Thanks for the reply.   I did give that syntax a try [A : Encoder]
yesterday, but I kept getting this exception in a spark-shell and Zeppelin
browser.

scala> import org.apache.spark.sql.Encoder
import org.apache.spark.sql.Encoder

scala>

scala> case class RawTemp(f1: String, f2: String, temp: Long, created_at:
java.sql.Timestamp, data_filename: String)
defined class RawTemp

scala>

scala> import spark.implicits._
import spark.implicits._

scala>

scala> abstract class RawTable[A : Encoder](inDir: String) {
 | import spark.implicits._
 | def load() = {
 | import spark.implicits._
 | spark.read
 | .option("header", "true")
 | .option("mode", "FAILFAST")
 | .option("escape", "\"")
 | .option("nullValue", "")
 | .option("indferSchema", "true")
 | .csv(inDir)
 | .as[A]
 | }
 | }
:13: error: not found: type Encoder
   abstract class RawTable[A : Encoder](inDir: String) {
   ^
:24: error: Unable to find encoder for type stored in a Dataset.
Primitive types (Int, String, etc) and Product types (case classes) are
supported by importing spark.implicits._  Support for serializing other
types will be added in future releases.
   .as[A]


I gave it a try today in a Scala application and it seems to work.  Is this
a known issue in a spark-shell?

In my Scala application, this is being defined in a separate file, etc.
without direct access to a Spark session.

I had to add the following code snippet so the import spark.implicits._
would take effect:

// ugly hack to get around Encoder can't be found compile time errors

private object myImplicits extends SQLImplicits {

  protected override def _sqlContext: SQLContext =
MySparkSingleton.getCurrentSession().sqlContext

}

import myImplicits._

I found that in about the hundredth SO post I searched for this problem.
Is this the best way to let implicits do its thing?

Thanks.

-Don



On Wed, Feb 1, 2017 at 3:16 PM, Michael Armbrust 
wrote:

> You need to enforce that an Encoder is available for the type A using a 
> context
> bound .
>
> import org.apache.spark.sql.Encoder
> abstract class RawTable[A : Encoder](inDir: String) {
>   ...
> }
>
> On Tue, Jan 31, 2017 at 8:12 PM, Don Drake  wrote:
>
>> I have a set of CSV that I need to perform ETL on, with the plan to
>> re-use a lot of code between each file in a parent abstract class.
>>
>> I tried creating the following simple abstract class that will have a
>> parameterized type of a case class that represents the schema being read in.
>>
>> This won't compile, it just complains about not being able to find an
>> encoder, but I'm importing the implicits and don't believe this error.
>>
>>
>> scala> import spark.implicits._
>> import spark.implicits._
>>
>> scala>
>>
>> scala> case class RawTemp(f1: String, f2: String, temp: Long, created_at:
>> java.sql.Timestamp, data_filename: String)
>> defined class RawTemp
>>
>> scala>
>>
>> scala> abstract class RawTable[A](inDir: String) {
>>  | def load() = {
>>  | spark.read
>>  | .option("header", "true")
>>  | .option("mode", "FAILFAST")
>>  | .option("escape", "\"")
>>  | .option("nullValue", "")
>>  | .option("indferSchema", "true")
>>  | .csv(inDir)
>>  | .as[A]
>>  | }
>>  | }
>> :27: error: Unable to find encoder for type stored in a
>> Dataset.  Primitive types (Int, String, etc) and Product types (case
>> classes) are supported by importing spark.implicits._  Support for
>> serializing other types will be added in future releases.
>>.as[A]
>>
>> scala> class TempTable extends RawTable[RawTemp]("/user/drake/t.csv")
>> :13: error: not found: type RawTable
>>class TempTable extends RawTable[RawTemp]("/user/drake/t.csv")
>>   ^
>>
>> What's odd is that this output looks okay:
>>
>> scala> val RTEncoder = Encoders.product[RawTemp]
>> RTEncoder: org.apache.spark.sql.Encoder[RawTemp] = class[f1[0]: string,
>> f2[0]: string, temp[0]: bigint, created_at[0]: timestamp, data_filename[0]:
>> string]
>>
>> scala> RTEncoder.schema
>> res4: org.apache.spark.sql.types.StructType =
>> StructType(StructField(f1,StringType,true),
>> StructField(f2,StringType,true), StructField(temp,LongType,false),
>> StructField(created_at,TimestampType,true),
>> StructField(data_filename,StringType,true))
>>
>> scala> RTEncoder.clsTag
>> res5: scala.reflect.ClassTag[RawTemp] = RawTemp
>>
>> Any ideas?
>>
>> --
>> Donald Drake
>> Drake Consulting
>> http://www.drakeconsulting.com/
>> https://twitter.com/dondrake 
>> 800-733-2143 <(800)%20733-2143>
>>
>
>


-- 
Donald Drake
Drake Consulting
http://www.drakeconsulting.

Re: Dataset Question: No Encoder found for Set[(scala.Long, scala.Long)]

2017-02-01 Thread Koert Kuipers
you can still use it as Dataset[Set[X]]. all transformations should work
correctly.

however dataset.schema will show binary type, and dataset.show will show
bytes (unfortunately).

for example:

scala> implicit def setEncoder[X]: Encoder[Set[X]] = Encoders.kryo[Set[X]]
setEncoder: [X]=> org.apache.spark.sql.Encoder[Set[X]]

scala> val x = Seq(Set(1,2,3)).toDS
x: org.apache.spark.sql.Dataset[scala.collection.immutable.Set[Int]] =
[value: binary]

scala> x.map(_ + 4).collect
res17: Array[scala.collection.immutable.Set[Int]] = Array(Set(1, 2, 3, 4))

scala> x.show
++
|   value|
++
|[2A 01 03 02 02 0...|
++


scala> x.schema
res19: org.apache.spark.sql.types.StructType =
StructType(StructField(value,BinaryType,true))


On Wed, Feb 1, 2017 at 12:03 PM, Jerry Lam  wrote:

> Hi Koert,
>
> Thanks for the tips. I tried to do that but the column's type is now
> Binary. Do I get the Set[X] back in the Dataset?
>
> Best Regards,
>
> Jerry
>
>
> On Tue, Jan 31, 2017 at 8:04 PM, Koert Kuipers  wrote:
>
>> set is currently not supported. you can use kryo encoder. there is no
>> other work around that i know of.
>>
>> import org.apache.spark.sql.{ Encoder, Encoders }
>> implicit def setEncoder[X]: Encoder[Set[X]] = Encoders.kryo[Set[X]]
>>
>> On Tue, Jan 31, 2017 at 7:33 PM, Jerry Lam  wrote:
>>
>>> Hi guys,
>>>
>>> I got an exception like the following, when I tried to implement a user
>>> defined aggregation function.
>>>
>>>  Exception in thread "main" java.lang.UnsupportedOperationException: No
>>> Encoder found for Set[(scala.Long, scala.Long)]
>>>
>>> The Set[(Long, Long)] is a field in the case class which is the output
>>> type for the aggregation.
>>>
>>> Is there a workaround for this?
>>>
>>> Best Regards,
>>>
>>> Jerry
>>>
>>
>>
>


RE: Jars directory in Spark 2.0

2017-02-01 Thread Sidney Feiner
Ok, good to know ☺
Shading every spark app it is then…
Thanks!

Sidney Feiner   /  SW Developer
M: +972.528197720  /  Skype: sidney.feiner.startapp


From: Marcelo Vanzin [mailto:van...@cloudera.com]
Sent: Wednesday, February 1, 2017 7:41 PM
To: Sidney Feiner 
Cc: Koert Kuipers ; user@spark.apache.org
Subject: Re: Jars directory in Spark 2.0

Spark has never shaded dependencies (in the sense of renaming the classes), 
with a couple of exceptions (Guava and Jetty). So that behavior is nothing new. 
Spark's dependencies themselves have a lot of other dependencies, so doing that 
would have limited benefits anyway.

On Tue, Jan 31, 2017 at 11:23 PM, Sidney Feiner 
mailto:sidney.fei...@startapp.com>> wrote:
Is this done on purpose? Because it really makes it hard to deploy 
applications. Is there a reason they didn't shade the jars they use to begin 
with?

Sidney Feiner   /  SW Developer
M: +972.528197720  /  Skype: sidney.feiner.startapp


From: Koert Kuipers [mailto:ko...@tresata.com]
Sent: Tuesday, January 31, 2017 7:26 PM
To: Sidney Feiner 
mailto:sidney.fei...@startapp.com>>
Cc: user@spark.apache.org
Subject: Re: Jars directory in Spark 2.0

you basically have to keep your versions of dependencies in line with sparks or 
shade your own dependencies.

you cannot just replace the jars in sparks jars folder. if you want to update 
them you have to build spark yourself with updated dependencies and confirm it 
compiles, passes tests etc.

On Tue, Jan 31, 2017 at 3:40 AM, Sidney Feiner 
mailto:sidney.fei...@startapp.com>> wrote:
Hey,
While migrating to Spark 2.X from 1.6, I've had many issues with jars that come 
preloaded with Spark in the "jars/" directory and I had to shade most of my 
packages.
Can I replace the jars in this folder with more up-to-date versions? Are those 
jars used for anything internal in Spark, which would mean I can't blindly 
replace them?

Thanks ☺


Sidney Feiner   /  SW Developer
M: +972.528197720  /  Skype: sidney.feiner.startapp




--
Marcelo
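
For reference, a sketch of what shading an application dependency can look like
when the build uses sbt-assembly (the relocated package and target prefix are only
examples; Maven's shade plugin has an equivalent relocation feature):

// build.sbt excerpt: relocate a dependency that clashes with the copy in Spark's jars/
assemblyShadeRules in assembly := Seq(
  ShadeRule.rename("com.google.common.**" -> "myapp.shaded.guava.@1").inAll
)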


Re: Parameterized types and Datasets - Spark 2.1.0

2017-02-01 Thread Michael Armbrust
You need to enforce that an Encoder is available for the type A using a context
bound .

import org.apache.spark.sql.Encoder
abstract class RawTable[A : Encoder](inDir: String) {
  ...
}

On Tue, Jan 31, 2017 at 8:12 PM, Don Drake  wrote:

> I have a set of CSV that I need to perform ETL on, with the plan to re-use
> a lot of code between each file in a parent abstract class.
>
> I tried creating the following simple abstract class that will have a
> parameterized type of a case class that represents the schema being read in.
>
> This won't compile, it just complains about not being able to find an
> encoder, but I'm importing the implicits and don't believe this error.
>
>
> scala> import spark.implicits._
> import spark.implicits._
>
> scala>
>
> scala> case class RawTemp(f1: String, f2: String, temp: Long, created_at:
> java.sql.Timestamp, data_filename: String)
> defined class RawTemp
>
> scala>
>
> scala> abstract class RawTable[A](inDir: String) {
>  | def load() = {
>  | spark.read
>  | .option("header", "true")
>  | .option("mode", "FAILFAST")
>  | .option("escape", "\"")
>  | .option("nullValue", "")
>  | .option("indferSchema", "true")
>  | .csv(inDir)
>  | .as[A]
>  | }
>  | }
> :27: error: Unable to find encoder for type stored in a Dataset.
> Primitive types (Int, String, etc) and Product types (case classes) are
> supported by importing spark.implicits._  Support for serializing other
> types will be added in future releases.
>.as[A]
>
> scala> class TempTable extends RawTable[RawTemp]("/user/drake/t.csv")
> :13: error: not found: type RawTable
>class TempTable extends RawTable[RawTemp]("/user/drake/t.csv")
>   ^
>
> What's odd is that this output looks okay:
>
> scala> val RTEncoder = Encoders.product[RawTemp]
> RTEncoder: org.apache.spark.sql.Encoder[RawTemp] = class[f1[0]: string,
> f2[0]: string, temp[0]: bigint, created_at[0]: timestamp, data_filename[0]:
> string]
>
> scala> RTEncoder.schema
> res4: org.apache.spark.sql.types.StructType = 
> StructType(StructField(f1,StringType,true),
> StructField(f2,StringType,true), StructField(temp,LongType,false),
> StructField(created_at,TimestampType,true), StructField(data_filename,
> StringType,true))
>
> scala> RTEncoder.clsTag
> res5: scala.reflect.ClassTag[RawTemp] = RawTemp
>
> Any ideas?
>
> --
> Donald Drake
> Drake Consulting
> http://www.drakeconsulting.com/
> https://twitter.com/dondrake 
> 800-733-2143 <(800)%20733-2143>
>


Re: using withWatermark on Dataset

2017-02-01 Thread Michael Armbrust
Can you give the full stack trace?  Also which version of Spark are you
running?

On Wed, Feb 1, 2017 at 10:38 AM, Jerry Lam  wrote:

> Hi everyone,
>
> Anyone knows how to use withWatermark  on Dataset?
>
> I have tried the following but hit this exception:
>
> dataset org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
> cannot be cast to "MyType"
>
> The code looks like the following:
>
> dataset
> .withWatermark("timestamp", "5 seconds")
> .groupBy("timestamp", "customer_id")
> .agg(MyAggregator)
> .writeStream
>
> Where dataset has MyType for each row.
> Where MyType is:
> case class MyTpe(customer_id: Long, timestamp: Timestamp, product_id: Long)
>
> MyAggregator which takes MyType as the input type did some maths on the
> product_id and outputs a set of product_ids.
>
> Best Regards,
>
> Jerry
>
>
>
>
>
>
>


pivot over non numerical data

2017-02-01 Thread Darshan Pandya
Hello,

I am trying to transpose some data using groupBy pivot aggr as mentioned in
this blog
https://databricks.com/blog/2016/02/09/reshaping-data-with-pivot-in-apache-spark.html

But this works only for numerical data.
Any hints for doing the same thing for non numerical data ?


-- 
Sincerely,
Darshan


using withWatermark on Dataset

2017-02-01 Thread Jerry Lam
Hi everyone,

Anyone knows how to use withWatermark  on Dataset?

I have tried the following but hit this exception:

dataset org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
cannot be cast to "MyType"

The code looks like the following:

dataset
.withWatermark("timestamp", "5 seconds")
.groupBy("timestamp", "customer_id")
.agg(MyAggregator)
.writeStream

Where dataset has MyType for each row.
Where MyType is:
case class MyTpe(customer_id: Long, timestamp: Timestamp, product_id: Long)

MyAggregator which takes MyType as the input type did some maths on the
product_id and outputs a set of product_ids.

Best Regards,

Jerry
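
For reference, the pattern documented for watermarks pairs withWatermark with an
event-time window aggregation; a generic sketch (it assumes dataset and the spark
implicits are in scope, and it is not claimed to fix the cast error above):

import org.apache.spark.sql.functions.window

val windowedCounts = dataset
  .withWatermark("timestamp", "5 seconds")
  .groupBy(window($"timestamp", "1 minute"), $"customer_id")
  .count()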


Re: Jars directory in Spark 2.0

2017-02-01 Thread Marcelo Vanzin
Spark has never shaded dependencies (in the sense of renaming the classes),
with a couple of exceptions (Guava and Jetty). So that behavior is nothing
new. Spark's dependencies themselves have a lot of other dependencies, so
doing that would have limited benefits anyway.

On Tue, Jan 31, 2017 at 11:23 PM, Sidney Feiner 
wrote:

> Is this done on purpose? Because it really makes it hard to deploy
> applications. Is there a reason they didn't shade the jars they use to
> begin with?
>
>
>
> *Sidney Feiner*   */*  SW Developer
>
> M: +972.528197720 <+972%2052-819-7720>  */*  Skype: sidney.feiner.startapp
>
>
>
>
>
>
> *From:* Koert Kuipers [mailto:ko...@tresata.com]
> *Sent:* Tuesday, January 31, 2017 7:26 PM
> *To:* Sidney Feiner 
> *Cc:* user@spark.apache.org
> *Subject:* Re: Jars directory in Spark 2.0
>
>
>
> you basically have to keep your versions of dependencies in line with
> sparks or shade your own dependencies.
>
> you cannot just replace the jars in sparks jars folder. if you want to
> update them you have to build spark yourself with updated dependencies and
> confirm it compiles, passes tests etc.
>
>
>
> On Tue, Jan 31, 2017 at 3:40 AM, Sidney Feiner 
> wrote:
>
> Hey,
>
> While migrating to Spark 2.X from 1.6, I've had many issues with jars that
> come preloaded with Spark in the "jars/" directory and I had to shade most
> of my packages.
>
> Can I replace the jars in this folder with more up-to-date versions? Are
> those jars used for anything internal in Spark, which would mean I can't
> blindly replace them?
>
>
>
> Thanks J
>
>
>
>
>
> *Sidney Feiner*   */*  SW Developer
>
> M: +972.528197720 <+972%2052-819-7720>  */*  Skype: sidney.feiner.startapp
>
>
>
>
>
>
> 
>
>   
>



-- 
Marcelo


Re: Dataset Question: No Encoder found for Set[(scala.Long, scala.Long)]

2017-02-01 Thread Jerry Lam
Hi Koert,

Thanks for the tips. I tried to do that but the column's type is now
Binary. Do I get the Set[X] back in the Dataset?

Best Regards,

Jerry


On Tue, Jan 31, 2017 at 8:04 PM, Koert Kuipers  wrote:

> set is currently not supported. you can use kryo encoder. there is no
> other work around that i know of.
>
> import org.apache.spark.sql.{ Encoder, Encoders }
> implicit def setEncoder[X]: Encoder[Set[X]] = Encoders.kryo[Set[X]]
>
> On Tue, Jan 31, 2017 at 7:33 PM, Jerry Lam  wrote:
>
>> Hi guys,
>>
>> I got an exception like the following, when I tried to implement a user
>> defined aggregation function.
>>
>>  Exception in thread "main" java.lang.UnsupportedOperationException: No
>> Encoder found for Set[(scala.Long, scala.Long)]
>>
>> The Set[(Long, Long)] is a field in the case class which is the output
>> type for the aggregation.
>>
>> Is there a workaround for this?
>>
>> Best Regards,
>>
>> Jerry
>>
>
>


Re: Question about Multinomial LogisticRegression in spark mllib in spark 2.1.0

2017-02-01 Thread Seth Hendrickson
In Spark.ML the coefficients are not "pivoted" meaning that they do not set
one of the coefficient sets equal to zero. You can read more about it here:
https://en.wikipedia.org/wiki/Multinomial_logistic_regression#As_a_set_of_independent_binary_regressions

You can translate your set of coefficients to a pivoted version by simply
subtracting one of the sets of coefficients from all the others. That
leaves the one you selected, the "pivot", as all zeros. You can then pass
this into the mllib model, disregarding the "pivot" coefficients. The
coefficients should be laid out like:

[feature0_class0, feature1_class0, feature2_class0, intercept0,
feature0_class1, ..., intercept1]

So you have 9 coefficients and 3 intercepts, but you are going to get rid
of one class's coefficients, leaving you with 6 coefficients and two
intercepts - so a vector of length 8 for mllib's model.

Note: if you use regularization then it is not exactly correct to convert
from the non-pivoted version to the pivoted one, since the algorithms will
give different results in those cases, though it is still possible to do it.
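
A rough sketch of that conversion in Scala (assumptions: mlModel is the trained
ml.classification.LogisticRegressionModel, and class 0 is taken as the pivot whose
coefficients get dropped):

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.classification.{LogisticRegressionModel => OldLRModel}

val coef = mlModel.coefficientMatrix   // numClasses x numFeatures (3 x 3 here)
val icpt = mlModel.interceptVector     // length numClasses (3 here)
val numClasses = coef.numRows
val numFeatures = coef.numCols
val pivot = 0                          // assumed reference class to drop

// For each remaining class: its features, then its intercept, all relative to the pivot.
val weights = (0 until numClasses).filter(_ != pivot).flatMap { c =>
  (0 until numFeatures).map(f => coef(c, f) - coef(pivot, f)) :+ (icpt(c) - icpt(pivot))
}.toArray

// 6 coefficients + 2 intercepts = a vector of length 8 for 3 classes / 3 features.
val oldModel = new OldLRModel(Vectors.dense(weights), 0.0, numFeatures, numClasses)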

On Wed, Feb 1, 2017 at 3:42 AM, Aseem Bansal  wrote:

> *What I want to do*
> I have trained an ml.classification.LogisticRegressionModel using the spark
> ml package.
>
> It has 3 features and 3 classes. So the generated model has coefficients
> in (3, 3) matrix and intercepts in Vector of length (3) as expected.
>
> Now, I want to take these coefficients and convert this 
> ml.classification.LogisticRegressionModel
> model to an instance of mllib.classification.LogisticRegressionModel
> model.
>
> *Why I want to do this*
> Computational Speed as SPARK-10413 is still in progress and scheduled for
> Spark 2.2 which is not yet released.
>
> *Why I think this is possible*
> I checked https://spark.apache.org/docs/latest/mllib-linear-
> methods.html#logistic-regression and in that example a multinomial
> Logistic Regression is trained. So as per this the class
> mllib.classification.LogisticRegressionModel can encapsulate these
> parameters.
>
> *Problem faced*
> The only constructor in mllib.classification.LogisticRegressionModel
> takes a single vector as coefficients and single double as intercept but I
> have a Matrix of coefficients and Vector of intercepts respectively.
>
> I tried converting matrix to a vector by just taking the values (Guess
> work) but got
>
> requirement failed: LogisticRegressionModel.load with numClasses = 3 and
> numFeatures = 3 expected weights of length 6 (without intercept) or 8 (with
> intercept), but was given weights of length 9
>
> So any ideas?
>


union of compatible types

2017-02-01 Thread Koert Kuipers
spark's onion/merging of compatible types seems kind of weak. it works on
basic types in the top level record, but it fails for nested records, maps,
arrays, etc.

are there any known workarounds or plans to improve this?

for example i get errors like this:
org.apache.spark.sql.AnalysisException: Union can only be performed on
tables with the compatible column types.
StructType(StructField(_1,StringType,true),
StructField(_2,IntegerType,false)) <>
StructType(StructField(_1,StringType,true), StructField(_2,LongType,false))
at the first column of the second table

some examples that do work:

scala> Seq(1, 2, 3).toDF union Seq(1L, 2L, 3L).toDF
res2: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [value:
bigint]

scala> Seq((1,"x"), (2,"x"), (3,"x")).toDF union Seq((1L,"x"), (2L,"x"),
(3L,"x")).toDF
res3: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [_1: bigint,
_2: string]

what i would also expect to work but currently doesn't:

scala> Seq((Seq(1),"x"), (Seq(2),"x"), (Seq(3),"x")).toDF union
Seq((Seq(1L),"x"), (Seq(2L),"x"), (Seq(3L),"x")).toDF

scala> Seq((1,("x",1)), (2,("x",2)), (3,("x",3))).toDF union
Seq((1L,("x",1L)), (2L,("x",2L)), (3L,("x", 3L))).toDF


Re: tylerchap...@yahoo-inc.com is no longer with Yahoo! (was: Question about Multinomial LogisticRegression in spark mllib in spark 2.1.0)

2017-02-01 Thread Aseem Bansal
Can an admin of the mailing list please remove this address? I get this
auto-reply every time I send an email to the mailing list.

On Wed, Feb 1, 2017 at 5:12 PM, Yahoo! No Reply 
wrote:

>
> This is an automatically generated message.
>
> tylerchap...@yahoo-inc.com is no longer with Yahoo! Inc.
>
> Your message will not be forwarded.
>
> If you have a sales inquiry, please email yahoosa...@yahoo-inc.com and
> someone will follow up with you shortly.
>
> If you require assistance with a legal matter, please send a message to
> legal-noti...@yahoo-inc.com
>
> Thank you!
>


Re: Hive Java UDF running on spark-sql issue

2017-02-01 Thread Alex
Yes...

It's taking values from a record, which is a JSON, and converting them into
multiple columns after typecasting...

On Wed, Feb 1, 2017 at 4:07 PM, Marco Mistroni  wrote:

> Hi
>  What is the UDF supposed to do? Are you trying to write a generic
> function to convert values to another type depending on what is the type of
> the original value?
> Kr
>
>
>
> On 1 Feb 2017 5:56 am, "Alex"  wrote:
>
> Hi ,
>
>
> we have Java Hive UDFs which are working perfectly fine in Hive.
>
> For better performance we are migrating the same to Spark-sql.
>
> We pass these jar files to spark-sql with the --jars argument
> and define temporary functions to make them run on spark-sql.
>
> There is this particular Java UDF which is working fine on Hive, but when
> run on spark-sql it gives the error
>
> Caused by:org.apache.hadoop.hive.ql.metadata.HiveException:
> java.lang.ClassCastException: java.lang.Long cannot be cast to
> org.apache.hadoop.io.LongWritable
> org.apache.hadoop.hive.ql.metadata.HiveException:
> java.lang.ClassCastException: java.lang.String cannot be cast to
> org.apache.hadoop.io.Text
> Caused by:org.apache.hadoop.hive.ql.metadata.HiveException:
> java.lang.ClassCastException: java.lang.Double cannot be cast to
> org.apache.hadoop.hive.serde2.io.DoubleWritable
>
> The piece of code where it is throwing the error is in the switch case
> below
>
> public String getName(int pos) {
> if (pos < 0 && pos >= colnames.size())
> return null;
> return ((StructField) colnames.get(pos)).getFieldName();
> }
>
> public int getPos(String name) {
> // System.out.println(name+transactionObject.toString());
> Integer pos = (Integer) transactionObject.get(name.toLowerCase());
> if (pos == null)
> return -1;
> return pos;
> }
>
> public Object get(Object name) {
> int pos = getPos((String) name);
> if (pos < 0)
> return null;
> String f = "string";
> Object obj = list.get(pos);
> if (obj == null)
> return null;
> ObjectInspector ins = ((StructField) colnames.get(pos)).getFieldObj
> ectInspector();
> if (ins != null)
> f = ins.getTypeName();
> switch (f) {
> case "double":
> return ((DoubleWritable) obj).get();
> case "bigint":
> return ((Long) obj).get();
> case "string":
> return ((Text) obj).toString();
> default:
> return obj;
> }
> }
>
> So I made the code change to below
>
> public int getPos(String name) {
> // System.out.println(name+transactionObject.toString());
> Integer pos = (Integer) transactionObject.get(name.toLowerCase());
> if (pos == null)
> return -1;
> return pos;
> }
>
> public Object get(Object name) {
> int pos = getPos((String) name);
> if (pos < 0)
> return null;
> String f = "string";
> Object obj = list.get(pos);
> Object result = null;
> if (obj == null)
> return null;
> ObjectInspector ins = ((StructField) colnames.get(pos)).getFieldObj
> ectInspector();
> if (ins != null)
> f = ins.getTypeName();
>
> PrimitiveObjectInspector ins2 = (PrimitiveObjectInspector) ins;
> switch (ins2.getPrimitiveCategory()) {
> case DOUBLE:
>
> Double res = (Double)(((DoubleObjectInspector) ins2).get(obj));
>
> result = (double) res;
> System.out.println("printlog when double"+result);
> return result;
>
>
> case LONG:
>
> Long res1 = (Long)(((LongObjectInspector) ins2).get(obj));
> result = (long) res1;
> System.out.println("printlog when long"+result);
> return result;
>
>
> case STRING:
> result = (((StringObjectInspector) ins2).getPrimitiveJavaObject(o
> bj)).toString();
> System.out.println("printlog when String"+result);
> return result;
>
> default:
> result = obj;
> return result;
> }
>
> }
> After making these changes, the Java Hive UDF started working fine on
> Spark-sql.
>
> But it is giving different results when the UDF is used in the query.
>
> If you think you can take a shot at solving this issue, please reach out to me
> on Hangouts or reply here.
>
>
>
>
>


Question about Multinomial LogisticRegression in spark mllib in spark 2.1.0

2017-02-01 Thread Aseem Bansal
*What I want to do*
I have trained an ml.classification.LogisticRegressionModel using the spark ml
package.

It has 3 features and 3 classes. So the generated model has coefficients in
(3, 3) matrix and intercepts in Vector of length (3) as expected.

Now, I want to take these coefficients and convert this
ml.classification.LogisticRegressionModel model to an instance of
mllib.classification.LogisticRegressionModel model.

*Why I want to do this*
Computational Speed as SPARK-10413 is still in progress and scheduled for
Spark 2.2 which is not yet released.

*Why I think this is possible*
I checked
https://spark.apache.org/docs/latest/mllib-linear-methods.html#logistic-regression
and in that example a multinomial Logistic Regression is trained. So as per
this the class mllib.classification.LogisticRegressionModel can encapsulate
these parameters.

*Problem faced*
The only constructor in mllib.classification.LogisticRegressionModel takes
a single vector as coefficients and single double as intercept but I have a
Matrix of coefficients and Vector of intercepts respectively.

I tried converting matrix to a vector by just taking the values (Guess
work) but got

requirement failed: LogisticRegressionModel.load with numClasses = 3 and
numFeatures = 3 expected weights of length 6 (without intercept) or 8 (with
intercept), but was given weights of length 9

So any ideas?


Re: Hive Java UDF running on spark-sql issue

2017-02-01 Thread Marco Mistroni
Hi
 What is the UDF supposed to do? Are you trying to write a generic function
to convert values to another type depending on what is the type of the
original value?
Kr



On 1 Feb 2017 5:56 am, "Alex"  wrote:

Hi ,


we have Java Hive UDFs which are working perfectly fine in Hive.

For better performance we are migrating the same to Spark-sql.

We pass these jar files to spark-sql with the --jars argument
and define temporary functions to make them run on spark-sql.

There is this particular Java UDF which is working fine on Hive, but when
run on spark-sql it gives the error

Caused by:org.apache.hadoop.hive.ql.metadata.HiveException:
java.lang.ClassCastException: java.lang.Long cannot be cast to
org.apache.hadoop.io.LongWritable
org.apache.hadoop.hive.ql.metadata.HiveException:
java.lang.ClassCastException: java.lang.String cannot be cast to
org.apache.hadoop.io.Text
Caused by:org.apache.hadoop.hive.ql.metadata.HiveException:
java.lang.ClassCastException: java.lang.Double cannot be cast to
org.apache.hadoop.hive.serde2.io.DoubleWritable

The piece of code where it is throwing the error is in the switch case below

public String getName(int pos) {
if (pos < 0 && pos >= colnames.size())
return null;
return ((StructField) colnames.get(pos)).getFieldName();
}

public int getPos(String name) {
// System.out.println(name+transactionObject.toString());
Integer pos = (Integer) transactionObject.get(name.toLowerCase());
if (pos == null)
return -1;
return pos;
}

public Object get(Object name) {
int pos = getPos((String) name);
if (pos < 0)
return null;
String f = "string";
Object obj = list.get(pos);
if (obj == null)
return null;
ObjectInspector ins = ((StructField) colnames.get(pos)).
getFieldObjectInspector();
if (ins != null)
f = ins.getTypeName();
switch (f) {
case "double":
return ((DoubleWritable) obj).get();
case "bigint":
return ((Long) obj).get();
case "string":
return ((Text) obj).toString();
default:
return obj;
}
}

So I made the code change to below

public int getPos(String name) {
// System.out.println(name+transactionObject.toString());
Integer pos = (Integer) transactionObject.get(name.toLowerCase());
if (pos == null)
return -1;
return pos;
}

public Object get(Object name) {
int pos = getPos((String) name);
if (pos < 0)
return null;
String f = "string";
Object obj = list.get(pos);
Object result = null;
if (obj == null)
return null;
ObjectInspector ins = ((StructField) colnames.get(pos)).
getFieldObjectInspector();
if (ins != null)
f = ins.getTypeName();

PrimitiveObjectInspector ins2 = (PrimitiveObjectInspector) ins;
switch (ins2.getPrimitiveCategory()) {
case DOUBLE:

Double res = (Double)(((DoubleObjectInspector) ins2).get(obj));

result = (double) res;
System.out.println("printlog when double"+result);
return result;


case LONG:

Long res1 = (Long)(((LongObjectInspector) ins2).get(obj));
result = (long) res1;
System.out.println("printlog when long"+result);
return result;


case STRING:
result = (((StringObjectInspector) ins2).getPrimitiveJavaObject(
obj)).toString();
System.out.println("printlog when String"+result);
return result;

default:
result = obj;
return result;
}

}
After making these changes, the Java Hive UDF started working fine on
Spark-sql.

But it is giving different results when the UDF is used in the query.

If you think you can take a shot at solving this issue, please reach out to me
on Hangouts or reply here.


A question about inconsistency during dataframe creation with RDD/dict in PySpark

2017-02-01 Thread Han-Cheol Cho
Dear spark user ml members,


I have quite messy input data, so it is difficult to load it as a dataframe
object directly.
What I did is to load it as an RDD of strings first, convert it to an RDD of
pyspark.sql.Row objects, and then use the toDF method as below.
mydf = myrdd.map(parse).toDF()

I didn't expect any problem from this very simple code at first.


But, when I tested it with a bunch of data, I found that this approach fails 
with the
following exception.
java.lang.IllegalStateException: Input row doesn't have expected number of 
values required by the schema. 10 fields are required while 9 values are 
provided.   
  
at 
org.apache.spark.sql.execution.python.EvaluatePython$.fromJava(EvaluatePython.scala:147)
at 
org.apache.spark.sql.SparkSession$$anonfun$7.apply(SparkSession.scala:665)
at 
org.apache.spark.sql.SparkSession$$anonfun$7.apply(SparkSession.scala:665)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)   
...

This exception comes from the fact that some Row objects in RDD have missing 
fields.
For example, the following example fails with the same exception
d1 = [Row(k1="value1.1", k2="value1.2")]
  
d2 = [Row(k1="value2.1")]   
  

rdd1 = spark.sparkContext.parallelize(d1)   
 
rdd2 = spark.sparkContext.parallelize(d2)   
 

urdd = rdd1.union(rdd2) 
urdd.collect()  
[Row(k1='value1.1', k2='value1.2'), Row(k1='value2.1')] 
 

urdd.toDF() 
 
DataFrame[k1: string, k2: string]   
   
urdd.toDF().show()   
--> fail with the same exception

While digging into the code, I found that Row object raises an exception if
it does not have a given key as follows.
# spark/python/pyspark/sql/types.py
def _verify_type(obj, dataType, nullable=True):
...
elif isinstance(dataType, StructType):
...
elif isinstance(obj, Row) and getattr(obj, "__from_dict__", False):
# the order in obj could be different than dataType.fields
for f in dataType.fields:
_verify_type(obj[f.name], f.dataType, f.nullable)
   --> obj[f.name] raise ValueError(item) exception if the 
key does not exist.


I think that raising an exception in this situation is a reasonable approach.
However, if I use an RDD of dict objects instead of Row objects, the conversion
process succeeds as follows, filling missing columns with null values.
dict1 = [{"k1":"v1.1", "k2":"v1.2"}]
dict2 = [{"k1":"v2.1"}]

rdd1 = spark.sparkContext.parallelize(dict1)
rdd2 = spark.sparkContext.parallelize(dict2)
rdd1.collect()
[{'k2': 'v1.2', 'k1': 'v1.1'}]
rdd2.collect()
[{'k1': 'v2.1'}]

urdd = rdd1.union(rdd2)
urdd.collect()
[{'k2': 'v1.2', 'k1': 'v1.1'}, {'k1': 'v2.1'}]

spark.createDataFrame(urdd).show()
+++
|  k1|  k2|
+++
|v1.1|v1.2|
|v2.1|null|
+++

urdd.toDF().show()
+++
|  k1|  k2|
+++
|v1.1|v1.2|
|v2.1|null|
+++


I wonder whether this difference is an expected result or not.


Best wishes,
Han-cheol



 Han-Cheol Cho  Data Laboratory   / Data Scientist  〒160-0022 東京都新宿区新宿6-27-30 新宿イーストサイドスクエア13階
Email  hancheol@nhn-techorus.com