Re: Problems with reading data from parquet files in a HDFS remotely

2016-01-08 Thread Henrik Baastrup
Hi Ewan,

Thank you for your answer.
I have already tried what you suggest.

If I use:
"hdfs://172.27.13.57:7077/user/hdfs/parquet-multi/BICC"
I get the AssertionError exception:
Exception in thread "main" java.lang.AssertionError: assertion
failed: No predefined schema found, and no Parquet data files or summary
files found under hdfs://172.27.13.57:7077/user/hdfs/parquet-multi/BICC.
Note: The IP address of my Spark Master is: 172.27.13.57

If I do literally as you suggest:
"hdfs:///user/hdfs/parquet-multi/BICC"
I get an IOException:
Exception in thread "main" java.io.IOException: Incomplete HDFS URI,
no host: hdfs:///user/hdfs/parquet-multi/BICC

To me it seems that the Spark library tries to resolve the URI locally, and
I suspect I am missing something in my SparkContext configuration, but I
do not know what.
Or could it be that I am using the wrong port in the hdfs:// URI above?
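
A quick way to see what each URI string actually contains is to parse it with
java.net.URI. This is only a sketch: the class name HdfsUriCheck, the host
namenode.example.com and the port 8020 are hypothetical placeholders, not
values taken from this cluster. It shows that hdfs:/// carries no host at all,
and that the first URI points at port 7077, the same port the Java program
uses for the Spark master (spark://172.27.13.57:7077), so it may well not be
the port the HDFS NameNode listens on. The NameNode address is normally the
one configured as fs.defaultFS in the cluster's core-site.xml.

```java
import java.net.URI;

public class HdfsUriCheck {
    /** Returns "host:port" for a URI, or "none" when the URI has no host part. */
    public static String authorityOf(String uri) {
        URI parsed = URI.create(uri);
        if (parsed.getHost() == null) {
            return "none";
        }
        // getPort() returns -1 when no port is given; in that case show the host only
        return parsed.getPort() < 0
                ? parsed.getHost()
                : parsed.getHost() + ":" + parsed.getPort();
    }

    public static void main(String[] args) {
        // Host and port present, but 7077 is the Spark master port in this thread
        System.out.println(authorityOf("hdfs://172.27.13.57:7077/user/hdfs/parquet-multi/BICC"));
        // No host at all: the client has to get it from its own Hadoop configuration
        System.out.println(authorityOf("hdfs:///user/hdfs/parquet-multi/BICC"));
        // Hypothetical NameNode address, for comparison only
        System.out.println(authorityOf("hdfs://namenode.example.com:8020/user/hdfs/parquet-multi/BICC"));
    }
}
```

If the client has no Hadoop configuration of its own, the host-less form cannot
be completed, which would be consistent with the "Incomplete HDFS URI" error.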

Henrik




On 07/01/2016 19:41, Ewan Leith wrote:
>
> Try the path
>
>
> "hdfs:///user/hdfs/parquet-multi/BICC"
> Thanks,
> Ewan
>
>
> -- Original message--
>
> *From: *Henrik Baastrup
>
> *Date: *Thu, 7 Jan 2016 17:54
>
> *To: *user@spark.apache.org;
>
> *Cc: *Baastrup, Henrik;
>
> *Subject:*Problems with reading data from parquet files in a HDFS remotely
>
>
> Hi All,
>
> I have a small Hadoop cluster where I have stored a lot of data in parquet
> files. I have installed a Spark master service on one of the nodes and now
> would like to query my parquet files from a Spark client. When I run the
> following program from the spark-shell on the Spark Master node, everything
> works correctly:
>
> val sqlCont = new org.apache.spark.sql.SQLContext(sc)
> val reader = sqlCont.read
> val dataFrame = reader.parquet("/user/hdfs/parquet-multi/BICC")
> dataFrame.registerTempTable("BICC")
> val recSet = sqlCont.sql(
>   "SELECT protocolCode,beginTime,endTime,called,calling FROM BICC " +
>   "WHERE endTime>=14494218 AND endTime<=14494224 " +
>   "AND calling='6287870642893' AND p_endtime=14494224")
> recSet.show()
>
> But when I run the Java program below, from my client, I get: 
>
> Exception in thread "main" java.lang.AssertionError: assertion failed: No 
> predefined schema found, and no Parquet data files or summary files found 
> under file:/user/hdfs/parquet-multi/BICC.
>
> The exception occurs at the line: DataFrame df = 
> reader.parquet("/user/hdfs/parquet-multi/BICC");
>
> On the Master node I can see the client connect when the SparkContext is
> instantiated, as I get the following lines in the Spark log:
>
> 16/01/07 18:27:47 INFO Master: Registering app SparkTest
> 16/01/07 18:27:47 INFO Master: Registered app SparkTest with ID 
> app-20160107182747-00801
>
> If I create a local directory with the given path, my program goes into an
> endless loop, with the following warning on the console:
>
> WARN scheduler.TaskSchedulerImpl: Initial job has not accepted any resources; 
> check your cluster UI to ensure that workers are registered and have 
> sufficient resources
>
> To me it seems that my SQLContext does not connect to the Spark Master, but
> tries to work locally on the client, where the requested files do not exist.
>
> Java program:
>   SparkConf conf = new SparkConf()
>       .setAppName("SparkTest")
>       .setMaster("spark://172.27.13.57:7077");
>   JavaSparkContext sc = new JavaSparkContext(conf);
>   SQLContext sqlContext = new SQLContext(sc);
>
>   DataFrameReader reader = sqlContext.read();
>   DataFrame df = reader.parquet("/user/hdfs/parquet-multi/BICC");
>   DataFrame filtered = df.filter(
>       "endTime>=14494218 AND endTime<=14494224 "
>       + "AND calling='6287870642893' AND p_endtime=14494224");
>   filtered.show();
>
> Is there someone who can help me?
>
> Henrik
>



Re: Recommendations using Spark

2016-01-08 Thread Jorge Machado
Hello anjali, 

You can start here: org.apache.spark.mllib.recommendation

Then you should build a "recommender": you need to transform your trainData into
Rating objects, then you can train a model with, for example:
val model = ALS.trainImplicit(trainData, 10, 5, 0.01, 1.0)
Jorge

> On 08/01/2016, at 08:11, anjali gautam  wrote:
> 
> Hi,
> 
> Can anybody please guide me on how we can generate recommendations for a
> user using Spark?
> 
> Regards,
> Anjali Gautam


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: write new data to mysql

2016-01-08 Thread Yasemin Kaya
Hi,
There is no write function like the one Todd mentioned, or I can't find it.
The code and error are in a gist. Could you check it out, please?

Best,
yasemin

2016-01-08 18:23 GMT+02:00 Todd Nist :

> It is not clear from the information provided why the insertIntoJDBC
> failed in #2.  I would note that method on the DataFrame has been deprecated
> since 1.4, not sure what version you're on.  You should be able to do
> something like this:
>
>  DataFrame.write.mode(SaveMode.Append).jdbc(MYSQL_CONNECTION_URL_WRITE,
> "track_on_alarm", connectionProps)
>
> HTH.
>
> -Todd
>
> On Fri, Jan 8, 2016 at 10:53 AM, Ted Yu  wrote:
>
>> Which Spark release are you using ?
>>
>> For case #2, was there any error / clue in the logs ?
>>
>> Cheers
>>
>> On Fri, Jan 8, 2016 at 7:36 AM, Yasemin Kaya  wrote:
>>
>>> Hi,
>>>
>>> I want to write a DataFrame to an existing MySQL table, but when I use
>>> *peopleDataFrame.insertIntoJDBC(MYSQL_CONNECTION_URL_WRITE,
>>> "track_on_alarm",false)*
>>>
>>> it says "Table track_on_alarm already exists."
>>>
>>> And when I use *peopleDataFrame.insertIntoJDBC(MYSQL_CONNECTION_URL_WRITE,
>>> "track_on_alarm",true)*
>>>
>>> I lose the existing data.
>>>
>>> How can I write new data to the db?
>>>
>>> Best,
>>> yasemin
>>>
>>> --
>>> hiç ender hiç
>>>
>>
>>
>


-- 
hiç ender hiç


Re: What should be the ideal value(unit) for spark.memory.offheap.size

2016-01-08 Thread Umesh Kacha
Hi, for a 30 GB executor, how much off-heap memory should I give along with the
YARN memory overhead? Is that OK?
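
On the unit question quoted further down in this thread (5000 versus 5g): my
understanding, which is worth checking against the 1.6 source, is that Spark
reads this setting as a byte size, so a bare number would mean bytes and a
suffix such as g is needed for gigabytes. The sketch below is not Spark's
parser; SizeStringSketch and toBytes are made-up names, and the binary
multipliers (1k = 1024 bytes) are an assumption. It only illustrates how far
apart 5000 and 5g are under that reading.

```java
import java.util.Locale;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class SizeStringSketch {
    // Digits, an optional unit letter, and an optional trailing "b" (e.g. "5g", "5gb", "5000")
    private static final Pattern SIZE = Pattern.compile("([0-9]+)\\s*([kmgt]?)b?");

    /** Converts strings such as "5000", "512m" or "5g" to a byte count (binary units). */
    public static long toBytes(String text) {
        Matcher m = SIZE.matcher(text.trim().toLowerCase(Locale.ROOT));
        if (!m.matches()) {
            throw new IllegalArgumentException("Unrecognised size string: " + text);
        }
        long value = Long.parseLong(m.group(1));
        switch (m.group(2)) {
            case "k": return value << 10;
            case "m": return value << 20;
            case "g": return value << 30;
            case "t": return value << 40;
            default:  return value; // no suffix: assumed to mean plain bytes
        }
    }

    public static void main(String[] args) {
        System.out.println(toBytes("5000")); // a bare number stays a byte count
        System.out.println(toBytes("5g"));   // 5 * 1024 * 1024 * 1024 bytes
    }
}
```

Marcelo's "git grep -i" hint also suggests the property name differs in case
from how it is written in this thread; as far as I recall the documented
spelling is spark.memory.offHeap.size, used together with
spark.memory.offHeap.enabled.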

On Thu, Jan 7, 2016 at 4:24 AM, Ted Yu  wrote:

> Turns out that I should have specified -i to my former grep command :-)
>
> Thanks Marcelo
>
> But does this mean that specifying custom value for parameter 
> spark.memory.offheap.size
> would not take effect ?
>
> Cheers
>
> On Wed, Jan 6, 2016 at 2:47 PM, Marcelo Vanzin 
> wrote:
>
>> Try "git grep -i spark.memory.offheap.size"...
>>
>> On Wed, Jan 6, 2016 at 2:45 PM, Ted Yu  wrote:
>> > Maybe I looked in the wrong files - I searched *.scala and *.java files (in
>> > latest Spark 1.6.0 RC) for '.offheap.' but didn't find the config.
>> >
>> > Can someone enlighten me ?
>> >
>> > Thanks
>> >
>> > On Wed, Jan 6, 2016 at 2:35 PM, Jakob Odersky 
>> wrote:
>> >>
>> >> Check the configuration guide for a description on units
>> >> (http://spark.apache.org/docs/latest/configuration.html#spark-properties).
>> >> In your case, 5GB would be specified as 5g.
>> >>
>> >> On 6 January 2016 at 10:29, unk1102  wrote:
>> >>>
>> >>> Hi, as part of the Spark 1.6 release, what should be the ideal value or
>> >>> unit for spark.memory.offheap.size? I have set it to 5000; I assume it
>> >>> will be 5GB. Is that correct? Please guide.
>> >>>
>> >>>
>> >>>
>> >>> --
>> >>> View this message in context:
>> >>> http://apache-spark-user-list.1001560.n3.nabble.com/What-should-be-the-ideal-value-unit-for-spark-memory-offheap-size-tp25898.html
>> >>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>> >>>
>> >>
>> >
>>
>>
>>
>> --
>> Marcelo
>>
>
>


Re: Do we need to enabled Tungsten sort in Spark 1.6?

2016-01-08 Thread Chris Fregly
Yeah, this confused me, as well.  Good question, Umesh.

As Ted pointed out:  between Spark 1.5 and 1.6,
o.a.s.shuffle.unsafe.UnsafeShuffleManager no longer exists as a separate
shuffle manager.  Here's the old code (notice the o.a.s.shuffle.unsafe
package):

https://github.com/apache/spark/blob/branch-1.5/core/src/main/scala/org/apache/spark/shuffle/unsafe/UnsafeShuffleManager.scala

The functionality has essentially been rolled into
o.a.s.shuffle.sort.SortShuffleManager with the help of a Scala match/case
statement.  Here's the newer code (notice the o.a.s.shuffle.unsafe package
is gone):

https://github.com/apache/spark/blob/branch-1.6/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala


On Fri, Jan 8, 2016 at 1:14 PM, Ted Yu  wrote:

> For "spark.shuffle.manager", the default is "sort"
> From core/src/main/scala/org/apache/spark/SparkEnv.scala :
>
> val shuffleMgrName = conf.get("spark.shuffle.manager", "sort")
>
> "tungsten-sort" is the same as "sort" :
>
> val shortShuffleMgrNames = Map(
>   "hash" -> "org.apache.spark.shuffle.hash.HashShuffleManager",
>   "sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager",
>   "tungsten-sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager")
>
> FYI
>
> On Fri, Jan 8, 2016 at 12:59 PM, Umesh Kacha 
> wrote:
>
>> OK, thanks. So it will always be enabled by default? If yes, then why does
>> the documentation mention the default shuffle manager as sort?
>>
>> On Sat, Jan 9, 2016 at 1:55 AM, Ted Yu  wrote:
>>
>>> From
>>> sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala :
>>>
>>> case Some((SQLConf.Deprecated.TUNGSTEN_ENABLED, Some(value))) =>
>>>   val runFunc = (sqlContext: SQLContext) => {
>>>     logWarning(
>>>       s"Property ${SQLConf.Deprecated.TUNGSTEN_ENABLED} is deprecated and " +
>>>         s"will be ignored. Tungsten will continue to be used.")
>>>     Seq(Row(SQLConf.Deprecated.TUNGSTEN_ENABLED, "true"))
>>>   }
>>>
>>> FYI
>>>
>>> On Fri, Jan 8, 2016 at 12:21 PM, unk1102  wrote:
>>>
>>>> Hi, I was using Spark 1.5 with Tungsten sort and now I am using Spark 1.6.
>>>> I don't see any difference; I was expecting Spark 1.6 to be faster. Anyway,
>>>> do we need to enable the Tungsten and unsafe options, or are they enabled by
>>>> default? I see in the documentation that the default shuffle manager is
>>>> sort; I thought it was Tungsten, no? Please guide.
>>>>
>>>> --
>>>> View this message in context:
>>>> http://apache-spark-user-list.1001560.n3.nabble.com/Do-we-need-to-enabled-Tungsten-sort-in-Spark-1-6-tp25923.html
>>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>>
>>>
>>
>


-- 

*Chris Fregly*
Principal Data Solutions Engineer
IBM Spark Technology Center, San Francisco, CA
http://spark.tc | http://advancedspark.com


Re: Spark job uses only one Worker

2016-01-08 Thread Michael Pisula
Hi Annabel,

I am using Spark in stand-alone mode (deployment using the ec2 scripts
packaged with spark).

Cheers,
Michael

On 08.01.2016 00:43, Annabel Melongo wrote:
> Michael,
>
> I don't know what your environment is, but if it's Cloudera, you should
> be able to see the link to your master in Hue.
>
> Thanks
>
>
> On Thursday, January 7, 2016 5:03 PM, Michael Pisula
>  wrote:
>
>
> I had tried several parameters, including --total-executor-cores, no
> effect.
> As for the port, I tried 7077, but if I remember correctly I got some
> kind of error that suggested to try 6066, with which it worked just
> fine (apart from this issue here).
>
> Each worker has two cores. I also tried increasing cores, again no
> effect. I was able to increase the number of cores the job was using
> on one worker, but it would not use any other worker (and it would not
> start if the number of cores the job wanted was higher than the number
> available on one worker).
>
> On 07.01.2016 22:51, Igor Berman wrote:
>> read about *--total-executor-cores*
>> not sure why you specify port 6066 in master...usually it's 7077
>> verify in master ui(usually port 8080) how many cores are
>> there(depends on other configs, but usually workers connect to master
>> with all their cores)
>>
>> On 7 January 2016 at 23:46, Michael Pisula wrote:
>>
>> Hi,
>>
>> I start the cluster using the spark-ec2 scripts, so the cluster
>> is in stand-alone mode.
>> Here is how I submit my job:
>> spark/bin/spark-submit --class demo.spark.StaticDataAnalysis
>> --master spark://:6066 --deploy-mode cluster
>> demo/Demo-1.0-SNAPSHOT-all.jar
>>
>> Cheers,
>> Michael
>>
>>
>> On 07.01.2016 22:41, Igor Berman wrote:
>>> share how you submit your job
>>> what cluster(yarn, standalone)
>>>
>>> On 7 January 2016 at 23:24, Michael Pisula wrote:
>>>
>>> Hi there,
>>>
>>> I ran a simple Batch Application on a Spark Cluster on EC2. Despite having 3
>>> Worker Nodes, I could not get the application processed on more than one
>>> node, regardless of whether I submitted the Application in Cluster or Client
>>> mode.
>>> I also tried manually increasing the number of partitions in the code, no
>>> effect. I also pass the master into the application.
>>> I verified on the nodes themselves that only one node was active while the
>>> job was running.
>>> I pass enough data to make the job take 6 minutes to process.
>>> The job is simple enough, reading data from two S3 files, joining records on
>>> a shared field, filtering out some records and writing the result back to
>>> S3.
>>>
>>> Tried all kinds of stuff, but could not make it work. I did find similar
>>> questions, but had already tried the solutions that worked in those cases.
>>> Would be really happy about any pointers.
>>>
>>> Cheers,
>>> Michael
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-job-uses-only-one-Worker-tp25909.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>>
>>
>> -- 
>> Michael Pisula * michael.pis...@tngtech.com * +49-174-3180084
>> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
>> Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
>> Sitz: Unterföhring * Amtsgericht München * HRB 135082
>>
>>
>
> -- 
> Michael Pisula * michael.pis...@tngtech.com * +49-174-3180084
> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
> Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
> Sitz: Unterföhring * Amtsgericht München * HRB 135082
>
>

-- 
Michael Pisula * michael.pis...@tngtech.com * +49-174-3180084
TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
Sitz: Unterföhring * Amtsgericht München * HRB 135082



Re: Do we need to enabled Tungsten sort in Spark 1.6?

2016-01-08 Thread Ted Yu
For "spark.shuffle.manager", the default is "sort"
From core/src/main/scala/org/apache/spark/SparkEnv.scala :

val shuffleMgrName = conf.get("spark.shuffle.manager", "sort")

"tungsten-sort" is the same as "sort" :

val shortShuffleMgrNames = Map(
  "hash" -> "org.apache.spark.shuffle.hash.HashShuffleManager",
  "sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager",
  "tungsten-sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager")

FYI
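
Put differently, in 1.6 both short names resolve to the same class. The lookup
can be mimicked in plain Java. The class and method names here are made up for
illustration; the map entries are copied from the snippet above, and the
fall-through to the raw value for unknown names is my assumption about how a
fully qualified class name would be handled.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

public class ShuffleManagerNames {
    // Short names and class names copied from the SparkEnv snippet in this message
    private static final Map<String, String> SHORT_NAMES = new HashMap<>();
    static {
        SHORT_NAMES.put("hash", "org.apache.spark.shuffle.hash.HashShuffleManager");
        SHORT_NAMES.put("sort", "org.apache.spark.shuffle.sort.SortShuffleManager");
        SHORT_NAMES.put("tungsten-sort", "org.apache.spark.shuffle.sort.SortShuffleManager");
    }

    /** Maps a configured value to a class name; unknown values are returned unchanged. */
    public static String resolve(String configured) {
        return SHORT_NAMES.getOrDefault(configured.toLowerCase(Locale.ROOT), configured);
    }

    public static void main(String[] args) {
        System.out.println(resolve("sort"));
        System.out.println(resolve("tungsten-sort"));
        System.out.println(resolve("sort").equals(resolve("tungsten-sort")));
    }
}
```

So setting spark.shuffle.manager to tungsten-sort should make no difference
compared with leaving the default in place.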

On Fri, Jan 8, 2016 at 12:59 PM, Umesh Kacha  wrote:

> OK, thanks. So it will always be enabled by default? If yes, then why does
> the documentation mention the default shuffle manager as sort?
>
> On Sat, Jan 9, 2016 at 1:55 AM, Ted Yu  wrote:
>
>> From
>> sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala :
>>
>> case Some((SQLConf.Deprecated.TUNGSTEN_ENABLED, Some(value))) =>
>>   val runFunc = (sqlContext: SQLContext) => {
>>     logWarning(
>>       s"Property ${SQLConf.Deprecated.TUNGSTEN_ENABLED} is deprecated and " +
>>         s"will be ignored. Tungsten will continue to be used.")
>>     Seq(Row(SQLConf.Deprecated.TUNGSTEN_ENABLED, "true"))
>>   }
>>
>> FYI
>>
>> On Fri, Jan 8, 2016 at 12:21 PM, unk1102  wrote:
>>
>>> Hi, I was using Spark 1.5 with Tungsten sort and now I am using Spark 1.6.
>>> I don't see any difference; I was expecting Spark 1.6 to be faster. Anyway,
>>> do we need to enable the Tungsten and unsafe options, or are they enabled by
>>> default? I see in the documentation that the default shuffle manager is
>>> sort; I thought it was Tungsten, no? Please guide.
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Do-we-need-to-enabled-Tungsten-sort-in-Spark-1-6-tp25923.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>>
>>
>


Do we need to enabled Tungsten sort in Spark 1.6?

2016-01-08 Thread unk1102
Hi, I was using Spark 1.5 with Tungsten sort and now I am using Spark 1.6. I
don't see any difference; I was expecting Spark 1.6 to be faster. Anyway, do
we need to enable the Tungsten and unsafe options, or are they enabled by
default? I see in the documentation that the default shuffle manager is sort;
I thought it was Tungsten, no? Please guide.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Do-we-need-to-enabled-Tungsten-sort-in-Spark-1-6-tp25923.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Kryo serializer Exception during serialization: java.io.IOException: java.lang.IllegalArgumentException:

2016-01-08 Thread Shixiong(Ryan) Zhu
Could you disable `spark.kryo.registrationRequired`? Some classes may not
be registered but they work well with Kryo's default serializer.
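
Regarding the WrappedArray attempt quoted below: on the JVM a nested class such
as WrappedArray$ofRef is a separate class from its enclosing class, so
registering the outer one would not be expected to cover it. A small sketch
using only JDK classes shows the distinction. java.util.AbstractMap and its
nested SimpleEntry merely stand in for the Scala classes, and the class name
NestedClassNames is made up for illustration.

```java
import java.util.AbstractMap;

public class NestedClassNames {
    /** Loads a class by its binary name, where '$' separates outer and nested class. */
    public static Class<?> byBinaryName(String name) {
        try {
            return Class.forName(name);
        } catch (ClassNotFoundException e) {
            throw new IllegalArgumentException("No such class: " + name, e);
        }
    }

    public static void main(String[] args) {
        Class<?> outer = AbstractMap.class;
        Class<?> nested = byBinaryName("java.util.AbstractMap$SimpleEntry");
        System.out.println(outer.getName());
        System.out.println(nested.getName());
        // The two Class objects are different, so registering one does not register the other
        System.out.println(outer.equals(nested));
    }
}
```

Assuming the Scala library is on the classpath, the same Class.forName call
with the binary name from the error message
(scala.collection.mutable.WrappedArray$ofRef) is one way to obtain the Class
object from Java and add it to the array passed to registerKryoClasses.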

On Fri, Jan 8, 2016 at 8:58 AM, Ted Yu  wrote:

> bq. try adding scala.collection.mutable.WrappedArray
>
> But the hint said registering
> scala.collection.mutable.WrappedArray$ofRef.class, right ?
>
> On Fri, Jan 8, 2016 at 8:52 AM, jiml  wrote:
>
>> (point of post is to see if anyone has ideas about errors at end of post)
>>
>> In addition, the real way to test if it's working is to force
>> serialization:
>>
>> In Java:
>>
>> Create an array of all your classes:
>> // The Kryo serializer wants every class that needs to be serialized
>> // to be registered
>> Class[] kryoClassArray = new Class[]{DropResult.class,
>>     DropEvaluation.class, PrintHetSharing.class};
>>
>> in the builder for your SparkConf (or in conf/spark-defaults.conf)
>> .set("spark.serializer",
>>     "org.apache.spark.serializer.KryoSerializer")
>> // require registration of all classes with Kryo
>> .set("spark.kryo.registrationRequired", "true")
>> // don't forget to register ALL classes or you will get an error
>> .registerKryoClasses(kryoClassArray);
>>
>> Then you will start to get neat errors like the one I am working on:
>>
>> Exception in thread "main" org.apache.spark.SparkException: Job aborted due
>> to stage failure: Failed to serialize task 0, not attempting to retry it.
>> Exception during serialization: java.io.IOException:
>> java.lang.IllegalArgumentException: Class is not registered:
>> scala.collection.mutable.WrappedArray$ofRef
>> Note: To register this class use:
>> kryo.register(scala.collection.mutable.WrappedArray$ofRef.class);
>>
>> I did try adding scala.collection.mutable.WrappedArray to the Class array up
>> top but no luck. Thanks
>>
>>
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/kryos-serializer-tp16454p25921.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>>
>


Re: Do we need to enabled Tungsten sort in Spark 1.6?

2016-01-08 Thread Ted Yu
From sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala :

case Some((SQLConf.Deprecated.TUNGSTEN_ENABLED, Some(value))) =>
  val runFunc = (sqlContext: SQLContext) => {
    logWarning(
      s"Property ${SQLConf.Deprecated.TUNGSTEN_ENABLED} is deprecated and " +
        s"will be ignored. Tungsten will continue to be used.")
    Seq(Row(SQLConf.Deprecated.TUNGSTEN_ENABLED, "true"))
  }

FYI

On Fri, Jan 8, 2016 at 12:21 PM, unk1102  wrote:

> Hi, I was using Spark 1.5 with Tungsten sort and now I am using Spark 1.6.
> I don't see any difference; I was expecting Spark 1.6 to be faster. Anyway,
> do we need to enable the Tungsten and unsafe options, or are they enabled by
> default? I see in the documentation that the default shuffle manager is
> sort; I thought it was Tungsten, no? Please guide.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Do-we-need-to-enabled-Tungsten-sort-in-Spark-1-6-tp25923.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
>


Re: write new data to mysql

2016-01-08 Thread Yasemin Kaya
When I changed the version to 1.6.0, it worked.
Thanks.

2016-01-08 21:27 GMT+02:00 Yasemin Kaya :

> Hi,
> There is no write function like the one Todd mentioned, or I can't find it.
> The code and error are in a gist. Could you check it out, please?
>
> Best,
> yasemin
>
> 2016-01-08 18:23 GMT+02:00 Todd Nist :
>
>> It is not clear from the information provided why the insertIntoJDBC
>> failed in #2.  I would note that method on the DataFrame has been deprecated
>> since 1.4, not sure what version you're on.  You should be able to do
>> something like this:
>>
>>  DataFrame.write.mode(SaveMode.Append).jdbc(MYSQL_CONNECTION_URL_WRITE,
>> "track_on_alarm", connectionProps)
>>
>> HTH.
>>
>> -Todd
>>
>> On Fri, Jan 8, 2016 at 10:53 AM, Ted Yu  wrote:
>>
>>> Which Spark release are you using ?
>>>
>>> For case #2, was there any error / clue in the logs ?
>>>
>>> Cheers
>>>
>>> On Fri, Jan 8, 2016 at 7:36 AM, Yasemin Kaya  wrote:
>>>
>>>> Hi,
>>>>
>>>> I want to write a DataFrame to an existing MySQL table, but when I use
>>>> *peopleDataFrame.insertIntoJDBC(MYSQL_CONNECTION_URL_WRITE,
>>>> "track_on_alarm",false)*
>>>>
>>>> it says "Table track_on_alarm already exists."
>>>>
>>>> And when I use *peopleDataFrame.insertIntoJDBC(MYSQL_CONNECTION_URL_WRITE,
>>>> "track_on_alarm",true)*
>>>>
>>>> I lose the existing data.
>>>>
>>>> How can I write new data to the db?
>>>>
>>>> Best,
>>>> yasemin
>>>>
>>>> --
>>>> hiç ender hiç
>>>>
>>>
>>>
>>
>
>
> --
> hiç ender hiç
>



-- 
hiç ender hiç


Re: SparkContext SyntaxError: invalid syntax

2016-01-08 Thread Bryan Cutler
Hi Andrew,

I know that older versions of Spark could not run PySpark on YARN in
cluster mode.  I'm not sure if that is fixed in 1.6.0 though.  Can you try
setting deploy-mode option to "client" when calling spark-submit?

Bryan

On Thu, Jan 7, 2016 at 2:39 PM, weineran <
andrewweiner2...@u.northwestern.edu> wrote:

> Hello,
>
> When I try to submit a python job using spark-submit (using --master yarn
> --deploy-mode cluster), I get the following error:
>
> Traceback (most recent call last):
>   File "loss_rate_by_probe.py", line 15, in ?
>     from pyspark import SparkContext
>   File "/scratch5/hadoop/yarn/local/usercache//filecache/18/spark-assembly-1.3.1-hadoop2.4.0.jar/pyspark/__init__.py", line 41, in ?
>   File "/scratch5/hadoop/yarn/local/usercache//filecache/18/spark-assembly-1.3.1-hadoop2.4.0.jar/pyspark/context.py", line 219
>     with SparkContext._lock:
>     ^
> SyntaxError: invalid syntax
>
> This is very similar to this post from 2014
> <http://apache-spark-user-list.1001560.n3.nabble.com/SparkContext-lock-Error-td18233.html>,
> but unlike that person I am using Python 2.7.8.
>
> Here is what I'm using:
> Spark 1.3.1
> Hadoop 2.4.0.2.1.5.0-695
> Python 2.7.8
>
> Another clue:  I also installed Spark 1.6.0 and tried to submit the same
> job.  I got a similar error:
>
> Traceback (most recent call last):
>   File "loss_rate_by_probe.py", line 15, in ?
>     from pyspark import SparkContext
>   File "/scratch5/hadoop/yarn/local/usercache//appcache/application_1450370639491_0119/container_1450370639491_0119_01_01/pyspark.zip/pyspark/__init__.py", line 61
>     indent = ' ' * (min(len(m) for m in indents) if indents else 0)
>       ^
> SyntaxError: invalid syntax
>
> Any thoughts?
>
> Andrew
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/SparkContext-SyntaxError-invalid-syntax-tp25910.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
>


Re: write new data to mysql

2016-01-08 Thread Todd Nist
Sorry, did not see your update until now.

On Fri, Jan 8, 2016 at 3:52 PM, Todd Nist  wrote:

> Hi Yasemin,
>
> What version of Spark are you using?  Here is the reference; it is off of
> the DataFrame
> (https://spark.apache.org/docs/latest/api/java/index.html#org.apache.spark.sql.DataFrame)
> and provides a DataFrameWriter
> (https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/DataFrameWriter.html):
>
> DataFrameWriter write()
> Interface for saving the content of the DataFrame out into external storage.
>
> It is the very last method defined there in the api docs.
>
> HTH.
>
> -Todd
>
>
> On Fri, Jan 8, 2016 at 2:27 PM, Yasemin Kaya  wrote:
>
>> Hi,
>> There is no write function like the one Todd mentioned, or I can't find it.
>> The code and error are in a gist. Could you check it out, please?
>>
>> Best,
>> yasemin
>>
>> 2016-01-08 18:23 GMT+02:00 Todd Nist :
>>
>>> It is not clear from the information provided why the insertIntoJDBC
>>> failed in #2.  I would note that method on the DataFrame has been deprecated
>>> since 1.4, not sure what version you're on.  You should be able to do
>>> something like this:
>>>
>>>  DataFrame.write.mode(SaveMode.Append).jdbc(MYSQL_CONNECTION_URL_WRITE,
>>> "track_on_alarm", connectionProps)
>>>
>>> HTH.
>>>
>>> -Todd
>>>
>>> On Fri, Jan 8, 2016 at 10:53 AM, Ted Yu  wrote:
>>>
>>>> Which Spark release are you using ?
>>>>
>>>> For case #2, was there any error / clue in the logs ?
>>>>
>>>> Cheers
>>>>
>>>> On Fri, Jan 8, 2016 at 7:36 AM, Yasemin Kaya  wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I want to write a DataFrame to an existing MySQL table, but when I use
>>>>> *peopleDataFrame.insertIntoJDBC(MYSQL_CONNECTION_URL_WRITE,
>>>>> "track_on_alarm",false)*
>>>>>
>>>>> it says "Table track_on_alarm already exists."
>>>>>
>>>>> And when I use *peopleDataFrame.insertIntoJDBC(MYSQL_CONNECTION_URL_WRITE,
>>>>> "track_on_alarm",true)*
>>>>>
>>>>> I lose the existing data.
>>>>>
>>>>> How can I write new data to the db?
>>>>>
>>>>> Best,
>>>>> yasemin
>>>>>
>>>>> --
>>>>> hiç ender hiç
>>>>>
>>>>
>>>>
>>>
>>
>>
>> --
>> hiç ender hiç
>>
>
>


Re: write new data to mysql

2016-01-08 Thread Todd Nist
Hi Yasemin,

What version of Spark are you using?  Here is the reference; it is off of
the DataFrame
(https://spark.apache.org/docs/latest/api/java/index.html#org.apache.spark.sql.DataFrame)
and provides a DataFrameWriter
(https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/DataFrameWriter.html):

DataFrameWriter write()
Interface for saving the content of the DataFrame out into external storage.

It is the very last method defined there in the api docs.

HTH.

-Todd


On Fri, Jan 8, 2016 at 2:27 PM, Yasemin Kaya  wrote:

> Hi,
> There is no write function like the one Todd mentioned, or I can't find it.
> The code and error are in a gist. Could you check it out, please?
>
> Best,
> yasemin
>
> 2016-01-08 18:23 GMT+02:00 Todd Nist :
>
>> It is not clear from the information provided why the insertIntoJDBC
>> failed in #2.  I would note that method on the DataFrame has been deprecated
>> since 1.4, not sure what version you're on.  You should be able to do
>> something like this:
>>
>>  DataFrame.write.mode(SaveMode.Append).jdbc(MYSQL_CONNECTION_URL_WRITE,
>> "track_on_alarm", connectionProps)
>>
>> HTH.
>>
>> -Todd
>>
>> On Fri, Jan 8, 2016 at 10:53 AM, Ted Yu  wrote:
>>
>>> Which Spark release are you using ?
>>>
>>> For case #2, was there any error / clue in the logs ?
>>>
>>> Cheers
>>>
>>> On Fri, Jan 8, 2016 at 7:36 AM, Yasemin Kaya  wrote:
>>>
>>>> Hi,
>>>>
>>>> I want to write a DataFrame to an existing MySQL table, but when I use
>>>> *peopleDataFrame.insertIntoJDBC(MYSQL_CONNECTION_URL_WRITE,
>>>> "track_on_alarm",false)*
>>>>
>>>> it says "Table track_on_alarm already exists."
>>>>
>>>> And when I use *peopleDataFrame.insertIntoJDBC(MYSQL_CONNECTION_URL_WRITE,
>>>> "track_on_alarm",true)*
>>>>
>>>> I lose the existing data.
>>>>
>>>> How can I write new data to the db?
>>>>
>>>> Best,
>>>> yasemin
>>>>
>>>> --
>>>> hiç ender hiç
>>>>
>>>
>>>
>>
>
>
> --
> hiç ender hiç
>


Re: Do we need to enabled Tungsten sort in Spark 1.6?

2016-01-08 Thread Umesh Kacha
OK, thanks. So it will always be enabled by default? If yes, then why does the
documentation mention the default shuffle manager as sort?

On Sat, Jan 9, 2016 at 1:55 AM, Ted Yu  wrote:

> From sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
> :
>
> case Some((SQLConf.Deprecated.TUNGSTEN_ENABLED, Some(value))) =>
>   val runFunc = (sqlContext: SQLContext) => {
>     logWarning(
>       s"Property ${SQLConf.Deprecated.TUNGSTEN_ENABLED} is deprecated and " +
>         s"will be ignored. Tungsten will continue to be used.")
>     Seq(Row(SQLConf.Deprecated.TUNGSTEN_ENABLED, "true"))
>   }
>
> FYI
>
> On Fri, Jan 8, 2016 at 12:21 PM, unk1102  wrote:
>
>> Hi, I was using Spark 1.5 with Tungsten sort and now I am using Spark 1.6.
>> I don't see any difference; I was expecting Spark 1.6 to be faster. Anyway,
>> do we need to enable the Tungsten and unsafe options, or are they enabled by
>> default? I see in the documentation that the default shuffle manager is
>> sort; I thought it was Tungsten, no? Please guide.
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Do-we-need-to-enabled-Tungsten-sort-in-Spark-1-6-tp25923.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>>
>


Re: How to merge two large table and remove duplicates?

2016-01-08 Thread Ted Yu
Is your Parquet data source partitioned by date ?

Can you dedup within partitions ?
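
To illustrate what deduplicating within partitions could look like, here is a
plain-Java sketch with no Spark involved. Everything in it is an assumption
made for the example: events are "key@epochSeconds" strings, the bucket is the
hour of the event's own timestamp, and the class and method names are invented.
The idea is that copies of the same event always land in the same bucket, so
each bucket can be made distinct on its own instead of in one global pass.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

public class BucketDedupSketch {
    /** Groups "key@epochSeconds" strings by hour and drops duplicates inside each hour. */
    public static Map<Long, Set<String>> dedupByHour(List<String> events) {
        Map<Long, Set<String>> buckets = new TreeMap<>();
        for (String event : events) {
            long epochSeconds = Long.parseLong(event.substring(event.lastIndexOf('@') + 1));
            long hourBucket = epochSeconds / 3600;
            // A set per bucket keeps one copy of each event; duplicates are ignored on add
            buckets.computeIfAbsent(hourBucket, h -> new LinkedHashSet<>()).add(event);
        }
        return buckets;
    }

    public static void main(String[] args) {
        // "click@3700" arrives on two different days but belongs to the same hour bucket
        List<String> incoming = Arrays.asList(
                "login@3600", "click@3700",   // received on day 1
                "click@3700", "logout@7300"); // received on day 2
        System.out.println(dedupByHour(incoming));
    }
}
```

In Spark terms this would roughly correspond to writing the Parquet data
partitioned by date (or date and hour) and running distinct per partition,
which only works if the partition key is derived from the event itself rather
than from the day the event was received.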

Cheers

On Fri, Jan 8, 2016 at 2:10 PM, Gavin Yue  wrote:

> I tried on three days' data.  The total input is only 980GB, but the
> shuffle write data is about 6.2TB; then the job failed during the shuffle
> read step, which should be another 6.2TB of shuffle read.
>
> I think that to dedup, the shuffling cannot be avoided. Is there anything I
> could do to stabilize this process?
>
> Thanks.
>
>
> On Fri, Jan 8, 2016 at 2:04 PM, Gavin Yue  wrote:
>
>> Hey,
>>
>> I have an Event table for every day and want to merge them into a single
>> Event table. But there are so many duplicates among each day's data.
>>
>> I use Parquet as the data source.  What I am doing now is
>>
>> EventDay1.unionAll(EventDay2).distinct().write.parquet("a new parquet file").
>>
>> Each day's Events are stored in their own Parquet file.
>>
>> But it failed at stage 2, which keeps losing the connection to one
>> executor. I guess this is due to a memory issue.
>>
>> Any suggestions on how I can do this efficiently?
>>
>> Thanks,
>> Gavin
>>
>
>


Re: How to merge two large table and remove duplicates?

2016-01-08 Thread Gavin Yue
hey Ted,

The Event table is like this: UserID, EventType, EventKey, TimeStamp,
MetaData.  I just parse it from JSON and save it as Parquet; I did not change
the partitioning.

Annoyingly, every day's incoming Event data has duplicates of the other
days' data.  The same event could show up in Day1 and Day2 and probably Day3.

I only want to keep a single Event table, and each day brings so many
duplicates.

Is there a way I could just insert into Parquet and, if a duplicate is found,
just ignore it?

Thanks,
Gavin









Re: How to merge two large table and remove duplicates?

2016-01-08 Thread Benyi Wang
   - I assume your Parquet files are compressed. Gzip or Snappy?
   - What Spark version did you use? It seems to be at least 1.4. If you use
   spark-sql and Tungsten, you might get better performance, but Spark 1.5.2
   gave me a wrong result when the data was about 300~400GB, just for a simple
   group-by and aggregate.
   - Did you use Kryo serialization?
   - You should have spark.shuffle.compress=true; verify it.
   - How many tasks did you use? spark.default.parallelism=?
   - What about this:
      - Read the data day by day.
      - Compute a bucket id from the timestamp, e.g., the date and hour.
      - Write into different buckets (you probably need a special writer to
      write data efficiently without shuffling the data).
      - Run distinct for each bucket. Because each bucket is small, Spark can
      get it done faster than having everything in one run.
      - I think using groupBy (userId, timestamp) might be better than
      distinct. I guess distinct() will compare every field.
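
A minimal sketch of the bucket-then-dedup idea above, written in plain Python rather than Spark so that it runs without a cluster. It assumes that TimeStamp is in epoch seconds and that (UserID, TimeStamp) is enough to identify an event; both are assumptions, and `bucket_id` and `dedup_by_bucket` are hypothetical helper names, not Spark APIs.

```python
from collections import defaultdict
from datetime import datetime, timezone


def bucket_id(epoch_seconds):
    """Bucket key built from the UTC date and hour of the event."""
    ts = datetime.fromtimestamp(epoch_seconds, tz=timezone.utc)
    return ts.strftime("%Y-%m-%d-%H")


def dedup_by_bucket(days):
    """Route each event to its bucket and keep one event per key.

    `days` is an iterable of per-day event lists; each event is a dict
    with at least "UserID" and "TimeStamp" (assumed epoch seconds).
    """
    buckets = defaultdict(dict)
    for day in days:
        for event in day:
            # Assumed dedup key; add EventType/EventKey if one user can
            # legitimately have two events with the same timestamp.
            key = (event["UserID"], event["TimeStamp"])
            # setdefault keeps the first copy seen and ignores later ones.
            buckets[bucket_id(event["TimeStamp"])].setdefault(key, event)
    return {b: list(events.values()) for b, events in buckets.items()}


day1 = [{"UserID": 1, "TimeStamp": 1452211200, "EventType": "a"}]
day2 = [
    {"UserID": 1, "TimeStamp": 1452211200, "EventType": "a"},  # repeat of day1
    {"UserID": 2, "TimeStamp": 1452214800, "EventType": "b"},
]
merged = dedup_by_bucket([day1, day2])
for bucket in sorted(merged):
    print(bucket, len(merged[bucket]))
```

The point of the bucketing is that each dedup only has to hold one bucket's keys at a time, instead of comparing every row of every day in a single pass.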




Re: SparkContext SyntaxError: invalid syntax

2016-01-08 Thread Andrew Weiner
Now, for simplicity, I'm testing with wordcount.py from the provided
examples, using Spark 1.6.0.

The first error I get is:

16/01/08 19:14:46 ERROR lzo.GPLNativeCodeLoader: Could not load native gpl library
java.lang.UnsatisfiedLinkError: no gplcompression in java.library.path
    at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1864)
    at []

A bit lower down, I see this error:

16/01/08 19:14:48 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, mundonovo-priv): org.apache.spark.SparkException:
Error from python worker:
  python: module pyspark.daemon not found
PYTHONPATH was:

/scratch5/hadoop/yarn/local/usercache/awp066/filecache/22/spark-assembly-1.6.0-hadoop2.4.0.jar:/home/jpr123/hg.pacific/python-common:/home/jpr123/python-libs:/home/jpr123/lib/python2.7/site-packages:/home/zsb739/local/lib/python2.7/site-packages:/home/jpr123/mobile-cdn-analysis:/home/awp066/lib/python2.7/site-packages:/scratch4/hadoop/yarn/local/usercache/awp066/appcache/application_1450370639491_0136/container_1450370639491_0136_01_02/pyspark.zip:/scratch4/hadoop/yarn/local/usercache/awp066/appcache/application_1450370639491_0136/container_1450370639491_0136_01_02/py4j-0.9-src.zip
java.io.EOFException
    at java.io.DataInputStream.readInt(DataInputStream.java:392)
    at []

And then a few more similar pyspark.daemon not found errors...

Andrew



On Fri, Jan 8, 2016 at 2:31 PM, Bryan Cutler  wrote:

> Hi Andrew,
>
> I know that older versions of Spark could not run PySpark on YARN in
> cluster mode.  I'm not sure if that is fixed in 1.6.0 though.  Can you try
> setting deploy-mode option to "client" when calling spark-submit?
>
> Bryan
>
> On Thu, Jan 7, 2016 at 2:39 PM, weineran <
> andrewweiner2...@u.northwestern.edu> wrote:
>
>> Hello,
>>
>> When I try to submit a python job using spark-submit (using --master yarn
>> --deploy-mode cluster), I get the following error:
>>
>> Traceback (most recent call last):
>>   File "loss_rate_by_probe.py", line 15, in ?
>> from pyspark import SparkContext
>>   File "/scratch5/hadoop/yarn/local/usercache//filecache/18/spark-assembly-1.3.1-hadoop2.4.0.jar/pyspark/__init__.py", line 41, in ?
>>   File "/scratch5/hadoop/yarn/local/usercache//filecache/18/spark-assembly-1.3.1-hadoop2.4.0.jar/pyspark/context.py", line 219
>> with SparkContext._lock:
>> ^
>> SyntaxError: invalid syntax
>>
>> This is very similar to this post from 2014
>> <http://apache-spark-user-list.1001560.n3.nabble.com/SparkContext-lock-Error-td18233.html>,
>> but unlike that person I am using Python 2.7.8.
>>
>> Here is what I'm using:
>> Spark 1.3.1
>> Hadoop 2.4.0.2.1.5.0-695
>> Python 2.7.8
>>
>> Another clue:  I also installed Spark 1.6.0 and tried to submit the same
>> job.  I got a similar error:
>>
>> Traceback (most recent call last):
>>   File "loss_rate_by_probe.py", line 15, in ?
>> from pyspark import SparkContext
>>   File "/scratch5/hadoop/yarn/local/usercache//appcache/application_1450370639491_0119/container_1450370639491_0119_01_01/pyspark.zip/pyspark/__init__.py", line 61
>> indent = ' ' * (min(len(m) for m in indents) if indents else 0)
>>   ^
>> SyntaxError: invalid syntax
>>
>> Any thoughts?
>>
>> Andrew
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/SparkContext-SyntaxError-invalid-syntax-tp25910.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>


How to merge two large table and remove duplicates?

2016-01-08 Thread Gavin Yue
Hey,

I have a separate Event table for each day and want to merge them into a
single Event table, but there are many duplicates across the days' data.

I use Parquet as the data source.  What I am doing now is:

EventDay1.unionAll(EventDay2).distinct().write.parquet("a new parquet file")

Each day's Events are stored in their own Parquet file.

But it fails at stage 2, which keeps losing its connection to one executor.
I guess this is due to a memory issue.

Any suggestions on how to do this efficiently?

Thanks,
Gavin


Re: Spark job uses only one Worker

2016-01-08 Thread Prem Sure
To narrow it down, you can try the following:
1) Does the job go to the same node every time (when you execute the job
multiple times)? Enable the spark.speculation property, add a Thread.sleep of
2 minutes, and see whether the job goes to a different worker from the one
the executor was initially placed on. (This is to check that there is no
connection or setup-related issue.)
2) What is your spark.executor.memory? Try decreasing executor memory to a
value less than the data size and see if that helps with distribution.
3) While launching the cluster, play around with the number of slaves,
starting with 1:
./spark-ec2 -k  -i  -s  launch 

On Fri, Jan 8, 2016 at 2:53 PM, Michael Pisula 
wrote:

> Hi Annabel,
>
> I am using Spark in stand-alone mode (deployment using the ec2 scripts
> packaged with spark).
>
> Cheers,
> Michael
>
>
> On 08.01.2016 00:43, Annabel Melongo wrote:
>
> Michael,
>
> I don't know what's your environment but if it's Cloudera, you should be
> able to see the link to your master in the Hue.
>
> Thanks
>
>
> On Thursday, January 7, 2016 5:03 PM, Michael Pisula
>   wrote:
>
>
> I had tried several parameters, including --total-executor-cores, no
> effect.
> As for the port, I tried 7077, but if I remember correctly I got some kind
> of error that suggested to try 6066, with which it worked just fine (apart
> from this issue here).
>
> Each worker has two cores. I also tried increasing cores, again no effect.
> I was able to increase the number of cores the job was using on one worker,
> but it would not use any other worker (and it would not start if the number
> of cores the job wanted was higher than the number available on one worker).
>
> On 07.01.2016 22:51, Igor Berman wrote:
>
> read about *--total-executor-cores*
> not sure why you specify port 6066 in master...usually it's 7077
> verify in master ui(usually port 8080) how many cores are there(depends on
> other configs, but usually workers connect to master with all their cores)
>
> On 7 January 2016 at 23:46, Michael Pisula 
> wrote:
>
> Hi,
>
> I start the cluster using the spark-ec2 scripts, so the cluster is in
> stand-alone mode.
> Here is how I submit my job:
> spark/bin/spark-submit --class demo.spark.StaticDataAnalysis --master
> spark://:6066 --deploy-mode cluster demo/Demo-1.0-SNAPSHOT-all.jar
>
> Cheers,
> Michael
>
>
> On 07.01.2016 22:41, Igor Berman wrote:
>
> share how you submit your job
> what cluster(yarn, standalone)
>
> On 7 January 2016 at 23:24, Michael Pisula < 
> michael.pis...@tngtech.com> wrote:
>
> Hi there,
>
> I ran a simple Batch Application on a Spark Cluster on EC2. Despite having 3
> Worker Nodes, I could not get the application processed on more than one
> node, regardless if I submitted the Application in Cluster or Client mode.
> I also tried manually increasing the number of partitions in the code, no
> effect. I also pass the master into the application.
> I verified on the nodes themselves that only one node was active while the
> job was running.
> I pass enough data to make the job take 6 minutes to process.
> The job is simple enough, reading data from two S3 files, joining records on
> a shared field, filtering out some records and writing the result back to S3.
>
> Tried all kinds of stuff, but could not make it work. I did find similar
> questions, but had already tried the solutions that worked in those cases.
> Would be really happy about any pointers.
>
> Cheers,
> Michael
>
>
>
> --
> View this message in context:
> 
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-job-uses-only-one-Worker-tp25909.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> --
> Michael Pisula * michael.pis...@tngtech.com * +49-174-3180084
> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
> Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
> Sitz: Unterföhring * Amtsgericht München * HRB 135082


Re: How to merge two large table and remove duplicates?

2016-01-08 Thread Gavin Yue
The most frequent operation I am going to run is finding the UserIDs that
have certain events, then retrieving all the events associated with those
UserIDs.

In this case, how should I partition the data to speed up the process?

Thanks.



Create a n x n graph given only the vertices

2016-01-08 Thread praveen S
Is it possible in GraphX to create/generate an n x n graph given only n vertices?


Re: adding jars - hive on spark cdh 5.4.3

2016-01-08 Thread Ophir Etzion
It didn't work, assuming I did the right thing.
In the properties you can see:

{"key":"hive.aux.jars.path","value":"file:///data/loko/foursquare.web-hiverc/current/hadoop-hive-serde.jar,file:///data/loko/foursquare.web-hiverc/current/hadoop-hive-udf.jar","isFinal":false,"resource":"programatically"}
which includes the jar that has the class I need, but I still get:

org.apache.hive.com.esotericsoftware.kryo.KryoException: Unable to
find class: com.foursquare.hadoop.hive.io.HiveThriftSequenceFileInputFormat



On Fri, Jan 8, 2016 at 12:24 PM, Edward Capriolo 
wrote:

> You can not 'add jar' input formats and serde's. They need to be part of
> your auxlib.
>
> On Fri, Jan 8, 2016 at 12:19 PM, Ophir Etzion 
> wrote:
>
>> I tried now. still getting
>>
>> 16/01/08 16:37:34 ERROR exec.Utilities: Failed to load plan: 
>> hdfs://hadoop-alidoro-nn-vip/tmp/hive/hive/c2af9882-38a9-42b0-8d17-3f56708383e8/hive_2016-01-08_16-36-41_370_3307331506800215903-3/-mr-10004/3c90a796-47fc-4541-bbec-b196c40aefab/map.xml:
>>  org.apache.hive.com.esotericsoftware.kryo.KryoException: Unable to find 
>> class: com.foursquare.hadoop.hive.io.HiveThriftSequenceFileInputFormat
>> Serialization trace:
>> inputFileFormatClass (org.apache.hadoop.hive.ql.plan.PartitionDesc)
>> aliasToPartnInfo (org.apache.hadoop.hive.ql.plan.MapWork)
>> org.apache.hive.com.esotericsoftware.kryo.KryoException: Unable to find 
>> class: com.foursquare.hadoop.hive.io.HiveThriftSequenceFileInputFormat
>>
>>
>> HiveThriftSequenceFileInputFormat is in one of the jars I'm trying to add.
>>
>>
>> On Thu, Jan 7, 2016 at 9:58 PM, Prem Sure  wrote:
>>
>>> did you try -- jars property in spark submit? if your jar is of huge
>>> size, you can pre-load the jar on all executors in a common available
>>> directory to avoid network IO.
>>>
>>> On Thu, Jan 7, 2016 at 4:03 PM, Ophir Etzion 
>>> wrote:
>>>
>>>> I'm trying to add jars before running a query using Hive on Spark on CDH
>>>> 5.4.3.
>>>> I've tried applying the patch in
>>>> https://issues.apache.org/jira/browse/HIVE-12045 (manually, as the
>>>> patch was made against a different Hive version) but still haven't succeeded.
>>>>
>>>> Did anyone manage to do ADD JAR successfully with CDH?
>>>>
>>>> Thanks,
>>>> Ophir
>>>>
>>>
>>>
>>
>


Unable to compile from source

2016-01-08 Thread Gaini Rajeshwar
Hi All,

I am new to Apache Spark.

I have downloaded the *Spark 1.6.0 (Jan 04 2016) source code version*.

I ran the following command, as per the Spark documentation:

build/mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 -DskipTests clean package

When I run the above command, I get the following error:

[ERROR] Failed to execute goal on project spark-catalyst_2.10: Could
not resolve dependencies for project
org.apache.spark:spark-catalyst_2.10:jar:1.6.0: Failed to collect
dependencies at org.codehaus.janino:janino:jar:2.7.8: Failed to read
artifact descriptor for org.codehaus.janino:janino:jar:2.7.8: Could
not transfer artifact org.codehaus.janino:janino:pom:2.7.8 from/to
central (https://repo1.maven.org/maven2): Remote host closed
connection during handshake: SSL peer shut down incorrectly -> [Help
1]

Can anyone help with this?


Re: How to merge two large table and remove duplicates?

2016-01-08 Thread Benyi Wang
Just try setting spark.sql.shuffle.partitions to 1000, or even 2000, to see
if it works. If you see something like "Lost Executor", you had better stop
your job; otherwise you are wasting time. Usually the container of the lost
executor was killed by the NodeManager because there was not enough memory.
You can check the NodeManager's log to confirm it.

There are a couple of parameters that may affect shuffle performance:

--num-executors: use a larger number, e.g., 2 x #data nodes
--executor-cores: give a small number, 3 or 4
--executor-memory: #cores x (memory for one core)

Also increase spark.shuffle.memoryFraction.

With a larger value of spark.sql.shuffle.partitions, each partition (task)
will be smaller and fit in the memory available to one core. If you use too
large a number of partitions, performance might get worse. You have to
experiment based on your cluster's nodes and memory.
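
To put rough numbers on this, here is a small back-of-the-envelope sketch in plain Python. It uses the roughly 6.2TB shuffle size and the 200 reducers reported elsewhere in this thread; `partition_size_gb` is a hypothetical helper, and the assumption that shuffle data is spread evenly across partitions is a simplification (skewed keys will make some tasks larger).

```python
def partition_size_gb(shuffle_bytes, num_partitions):
    """Average shuffle data per partition (task), in decimal GB.

    Assumes an even spread across partitions, which real data with
    skewed keys will not give you.
    """
    return shuffle_bytes / num_partitions / 1e9


SHUFFLE_BYTES = 6_200_000_000_000  # about 6.2TB, as reported in this thread

for n in (200, 1000, 2000):
    size = partition_size_gb(SHUFFLE_BYTES, n)
    print(n, "partitions ->", round(size, 1), "GB per task on average")
```

At 200 partitions each task would have to pull about 31 GB on average, against about 3 GB at 2000, which is consistent with executors being killed for memory at the lower setting. Whether even 2000 is enough depends on the memory available per core.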

On Fri, Jan 8, 2016 at 6:29 PM, Gavin Yue  wrote:

> I used to maintain a HBase cluster. The experience with it was not happy.
>
> I just tried query the data  from each day's first and dedup with smaller
> set, the performance is acceptable.  So I guess I will use this method.
>
> Again, could anyone give advice about:
>
>- Automatically determine the number of reducers for joins and
>groupbys: Currently in Spark SQL, you need to control the degree of
>parallelism post-shuffle using “SET
>spark.sql.shuffle.partitions=[num_tasks];”.
>
> Thanks.
>
> Gavin
>
>
>
>
> On Fri, Jan 8, 2016 at 6:25 PM, Ted Yu  wrote:
>
>> bq. in an noSQL db such as Hbase
>>
>> +1 :-)
>>
>>
>> On Fri, Jan 8, 2016 at 6:25 PM, ayan guha  wrote:
>>
>>> One option you may want to explore is writing event table in an noSQL db
>>> such as Hbase. One inherent problem in your approach is you always need to
>>> load either full data set or a defined number of partitions to see if the
>>> event has already come (and no gurantee it is full proof, but lead to
>>> unnecessary loading in most cases).
>>>
>>> On Sat, Jan 9, 2016 at 12:56 PM, Gavin Yue 
>>> wrote:
>>>
>>>> Hey,
>>>> Thank you for the answer. I checked the settings you mentioned; they are
>>>> all correct.  I noticed that in the job there are always only 200 reducers
>>>> for shuffle read; I believe this is set by the SQL shuffle parallelism.
>>>>
>>>> In the doc, it mentions:
>>>>
>>>>    - Automatically determine the number of reducers for joins and
>>>>    groupbys: Currently in Spark SQL, you need to control the degree of
>>>>    parallelism post-shuffle using “SET
>>>>    spark.sql.shuffle.partitions=[num_tasks];”.
>>>>
>>>>
>>>> What would be the ideal number for this setting? Is it based on the
>>>> hardware of the cluster?
>>>>
>>>>
>>>> Thanks,
>>>>
>>>> Gavin


Re: How to merge two large table and remove duplicates?

2016-01-08 Thread Gavin Yue
I tried it on three days' data.  The total input is only 980GB, but the
shuffle write is about 6.2TB; the job then failed during the shuffle read
step, which should be another 6.2TB of shuffle read.

I think shuffling cannot be avoided when deduplicating. Is there anything I
could do to stabilize this process?

Thanks.




Re: Date Time Regression as Feature

2016-01-08 Thread Chris Fregly
Here's a good blog post by Sandy Ryza @ Cloudera on Spark + Time Series
Data:
http://blog.cloudera.com/blog/2015/12/spark-ts-a-new-library-for-analyzing-time-series-data-with-apache-spark/

Might give you some things to try.
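
On the question quoted below about turning a timestamp such as 2015-12-10-10:00 into features, here is a minimal sketch in plain Python. It is one possible encoding, not a recommendation from the blog post: the choice of features (year, month, day, weekday, hour, plus a sine/cosine pair for the hour) is an assumption, and `datetime_features` and `parse_row` are hypothetical helper names.

```python
import math
from datetime import datetime


def datetime_features(stamp):
    """Turn a 'YYYY-MM-DD-HH:MM' string into a flat list of numbers."""
    ts = datetime.strptime(stamp, "%Y-%m-%d-%H:%M")
    # The sine/cosine pair keeps 23:00 and 00:00 close together, which a
    # raw hour column does not.
    hour_angle = 2 * math.pi * ts.hour / 24
    return [
        float(ts.year),
        float(ts.month),
        float(ts.day),
        float(ts.weekday()),  # Monday = 0 ... Sunday = 6
        float(ts.hour),
        math.sin(hour_angle),
        math.cos(hour_angle),
    ]


def parse_row(line):
    """Split a 'timestamp, usage' line into (label, features)."""
    stamp, usage = (part.strip() for part in line.split(","))
    return float(usage), datetime_features(stamp)


label, features = parse_row("2015-12-10-10:00, 1200")
print(label, features)
```

The (label, features) pair has the same shape as the Vector (1200, [...]) idea in the question; the weekday and hour columns are usually what carries the signal for electricity usage, but that should be checked against the actual data.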

On Thu, Jan 7, 2016 at 11:40 PM, dEEPU  wrote:

> Maybe u want to convert the date to a duration in form of number of
> hours/days and then do calculation on it
> On Jan 8, 2016 12:39 AM, Jorge Machado 
> wrote:
> Hello all,
>
> I'm new to machine learning. I'm trying to predict some electric usage
> with a decision tree.
> The data is:
> 2015-12-10-10:00, 1200
> 2015-12-11-10:00, 1150
>
> My question is: what is the best way to turn date and time into features
> in my Vector?
>
> Something like this: Vector (1200, [2015,12,10,10,10])?
> I could not find any example of value prediction where the features had
> dates in them.
>
> Thanks
>
> Jorge Machado
> jo...@jmachado.me
>
>


-- 

*Chris Fregly*
Principal Data Solutions Engineer
IBM Spark Technology Center, San Francisco, CA
http://spark.tc | http://advancedspark.com


Standalone Scala Project "sbt package" erroring out

2016-01-08 Thread srkanth devineni
Hi all,

I am going over this official tutorial on standalone Scala projects in the
Cloudera virtual machine.

I am using Spark 1.5.0 and Scala 2.10.4, and I changed the parameters in the
sparkpi.sbt file to the following:
name := "SparkPi Project"

version := "1.0"

scalaVersion := "2.10.4"

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.5.0" 



After running sbt package, I get the error below:

[info] Updating {file:/home/cloudera/sampleData/spark/SparkPi/}sparkpi...
[info] Resolving org.fusesource.jansi#jansi;1.4 ...
[info] Done updating.
[info] Compiling 1 Scala source to
/home/cloudera/sampleData/spark/SparkPi/target/scala-2.10/classes...
[error]
/home/cloudera/sampleData/spark/SparkPi/src/main/scala/SparkPi.scala:2:
object apache is not a member of package org
[error] import org.apache.spark._
[error]^
[error]
/home/cloudera/sampleData/spark/SparkPi/src/main/scala/SparkPi.scala:8: not
found: type SparkConf
[error] val conf = new SparkConf().setAppName("Spark Pi")
[error]^
[error]
/home/cloudera/sampleData/spark/SparkPi/src/main/scala/SparkPi.scala:9: not
found: type SparkContext
[error] val spark = new SparkContext(conf)
[error] ^
[error] three errors found
[error] (compile:compileIncremental) Compilation failed
[error] Total time: 7 s, completed Jan 8, 2016 2:40:57 PM


I tried changing the sbt file based on what I found on Google, but I still
get the same error.
Early help is much appreciated.

I modified my sbt file as follows:

name := "SparkPi Project"
version := "1.5.0"
scalaVersion := "2.10"
libraryDependencies += "org.apache.spark" % System.getenv.get("SPARK_MODULE") % System.getenv.get("SPARK_VERSION")
resolvers ++= Seq(
  "Spark Release Repository" at System.getenv.get("SPARK_RELEASE_REPOSITORY"),
  "Akka Repository" at "http://repo.akka.io/releases/",
  "Spray Repository" at "http://repo.spray.cc/")


Still the same error.

--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Standalone-Scala-Project-sbt-package-erroring-out-tp25924.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: How to merge two large table and remove duplicates?

2016-01-08 Thread Ted Yu
Benyi:

bq. spark 1.5.2 gave me a wrong result when the data was about 300~400GB,
just for a simple group-by and aggregate

Can you reproduce the above using Spark 1.6.0?

Thanks



[no subject]

2016-01-08 Thread Suresh Thalamati



Re: How to merge two large table and remove duplicates?

2016-01-08 Thread Ted Yu
Gzip is relatively slow; it consumes a lot of CPU.

Snappy is faster.

LZ4 is faster than gzip and produces smaller output than Snappy.

Cheers

On Fri, Jan 8, 2016 at 7:56 PM, Gavin Yue  wrote:

> Thank you .
>
> And speaking of compression, is there big difference on performance
> between gzip and snappy? And why parquet is using gzip by default?
>
> Thanks.
>
>
> On Fri, Jan 8, 2016 at 6:39 PM, Ted Yu  wrote:
>
>> Cycling old bits:
>> http://search-hadoop.com/m/q3RTtRuvrm1CGzBJ
>>
>> Gavin:
>> Which release of hbase did you play with ?
>>
>> HBase has been evolving and is getting more stable.
>>
>> Cheers
>>
>> On Fri, Jan 8, 2016 at 6:29 PM, Gavin Yue  wrote:
>>
>>> I used to maintain a HBase cluster. The experience with it was not
>>> happy.
>>>
>>> I just tried query the data  from each day's first and dedup with
>>> smaller set, the performance is acceptable.  So I guess I will use this
>>> method.
>>>
>>> Again, could anyone give advice about:
>>>
>>>- Automatically determine the number of reducers for joins and
>>>groupbys: Currently in Spark SQL, you need to control the degree of
>>>parallelism post-shuffle using “SET
>>>spark.sql.shuffle.partitions=[num_tasks];”.
>>>
>>> Thanks.
>>>
>>> Gavin

Re: How to merge two large table and remove duplicates?

2016-01-08 Thread Ted Yu
bq. in an noSQL db such as Hbase

+1 :-)


On Fri, Jan 8, 2016 at 6:25 PM, ayan guha  wrote:

> One option you may want to explore is writing event table in an noSQL db
> such as Hbase. One inherent problem in your approach is you always need to
> load either full data set or a defined number of partitions to see if the
> event has already come (and no gurantee it is full proof, but lead to
> unnecessary loading in most cases).
>
> On Sat, Jan 9, 2016 at 12:56 PM, Gavin Yue  wrote:
>
>> Hey,
>> Thank you for the answer. I checked the setting you mentioend they are
>> all correct.  I noticed that in the job, there are always only 200 reducers
>> for shuffle read, I believe it is setting in the sql shuffle parallism.
>>
>> In the doc, it mentions:
>>
>>- Automatically determine the number of reducers for joins and
>>groupbys: Currently in Spark SQL, you need to control the degree of
>>parallelism post-shuffle using “SET
>>spark.sql.shuffle.partitions=[num_tasks];”.
>>
>>
>> What would be the ideal number for this setting? Is it based on the
>> hardware of cluster?
>>
>>
>> Thanks,
>>
>> Gavin
>>
>> On Fri, Jan 8, 2016 at 2:48 PM, Benyi Wang  wrote:
>>
>>>
>>>- I assume your parquet files are compressed. Gzip or Snappy?
>>>- What spark version did you use? It seems at least 1.4. If you use
>>>spark-sql and tungsten, you might have better performance. but spark 
>>> 1.5.2
>>>gave me a wrong result when the data was about 300~400GB, just for a 
>>> simple
>>>group-by and aggregate.
>>>- Did you use kyro serialization?
>>>- you should have spark.shuffle.compress=true, verify it.
>>>- How many tasks did you use? spark.default.parallelism=?
>>>- What about this:
>>>   - Read the data day by day
>>>   - compute a bucket id from timestamp, e.g., the date and hour
>>>   - Write into different buckets (you probably need a special
>>>   writer to write data efficiently without shuffling the data).
>>>   - distinct for each bucket. Because each bucket is small, spark
>>>   can get it done faster than having everything in one run.
>>>   - I think using groupBy (userId, timestamp) might be better than
>>>   distinct. I guess distinct() will compare every field.
> --
> Best Regards,
> Ayan Guha
>


Re: How to merge two large table and remove duplicates?

2016-01-08 Thread Gavin Yue
I used to maintain an HBase cluster. It was not a happy experience.

I just tried querying each day's data first and deduplicating against the
smaller set; the performance is acceptable.  So I guess I will use this method.

Again, could anyone give advice about:

   - Automatically determine the number of reducers for joins and groupbys:
   Currently in Spark SQL, you need to control the degree of parallelism
   post-shuffle using “SET spark.sql.shuffle.partitions=[num_tasks];”.

Thanks.

Gavin




On Fri, Jan 8, 2016 at 6:25 PM, Ted Yu  wrote:

> bq. in an noSQL db such as Hbase
>
> +1 :-)
>
>
> On Fri, Jan 8, 2016 at 6:25 PM, ayan guha  wrote:
>
>> One option you may want to explore is writing event table in an noSQL db
>> such as Hbase. One inherent problem in your approach is you always need to
>> load either full data set or a defined number of partitions to see if the
>> event has already come (and no gurantee it is full proof, but lead to
>> unnecessary loading in most cases).
>>
>> On Sat, Jan 9, 2016 at 12:56 PM, Gavin Yue 
>> wrote:
>>
>>> Hey,
>>> Thank you for the answer. I checked the setting you mentioend they are
>>> all correct.  I noticed that in the job, there are always only 200 reducers
>>> for shuffle read, I believe it is setting in the sql shuffle parallism.
>>>
>>> In the doc, it mentions:
>>>
>>>- Automatically determine the number of reducers for joins and
>>>groupbys: Currently in Spark SQL, you need to control the degree of
>>>parallelism post-shuffle using “SET
>>>spark.sql.shuffle.partitions=[num_tasks];”.
>>>
>>>
>>> What would be the ideal number for this setting? Is it based on the
>>> hardware of cluster?
>>>
>>>
>>> Thanks,
>>>
>>> Gavin

How to compile Python and use How to compile Python and use spark-submit

2016-01-08 Thread Ascot Moss
Hi,

Instead of using spark-shell, does anyone know how to build a .zip (or .egg)
for Python and run it with spark-submit?

Regards


Re: How to compile Python and use How to compile Python and use spark-submit

2016-01-08 Thread Denny Lee
Per http://spark.apache.org/docs/latest/submitting-applications.html:

For Python, you can use the --py-files argument of spark-submit to add .py,
.zip or .egg files to be distributed with your application. If you depend
on multiple Python files we recommend packaging them into a .zip or .egg.
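
As a rough sketch of the packaging step, the standard-library zipfile module
can build the archive. The package directory mypkg, the archive name
deps.zip, the script name main.py, and the helper build_deps_zip are all
placeholders, not names from this thread:

```python
import os
import zipfile


def build_deps_zip(package_dir, zip_path):
    """Zip every .py file under package_dir, keeping the package folder name."""
    parent = os.path.dirname(os.path.abspath(package_dir))
    with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zf:
        for root, _dirs, files in os.walk(package_dir):
            for name in files:
                if name.endswith(".py"):
                    full = os.path.join(root, name)
                    # Store paths relative to the parent so that
                    # "import mypkg" works once the zip is on the path.
                    zf.write(full, os.path.relpath(full, parent))
    return zip_path


# Then, from a shell (file names are placeholders):
#   spark-submit --py-files deps.zip main.py
```

Keeping the top-level package folder inside the archive is a design choice:
it lets the driver script import the code by its normal package name.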



On Fri, Jan 8, 2016 at 6:44 PM Ascot Moss  wrote:

> Hi,
>
> Instead of using Spark-shell, does anyone know how to build .zip (or .egg)
> for Python and use Spark-submit to run?
>
> Regards
>


Re: How to merge two large table and remove duplicates?

2016-01-08 Thread Gavin Yue
Thank you.

And speaking of compression, is there a big difference in performance between
gzip and snappy? And why does Parquet use gzip by default?

Thanks.


On Fri, Jan 8, 2016 at 6:39 PM, Ted Yu  wrote:

> Cycling old bits:
> http://search-hadoop.com/m/q3RTtRuvrm1CGzBJ
>
> Gavin:
> Which release of hbase did you play with ?
>
> HBase has been evolving and is getting more stable.
>
> Cheers
>
> On Fri, Jan 8, 2016 at 6:29 PM, Gavin Yue  wrote:
>
>> I used to maintain a HBase cluster. The experience with it was not happy.
>>
>> I just tried query the data  from each day's first and dedup with smaller
>> set, the performance is acceptable.  So I guess I will use this method.
>>
>> Again, could anyone give advice about:
>>
>>- Automatically determine the number of reducers for joins and
>>groupbys: Currently in Spark SQL, you need to control the degree of
>>parallelism post-shuffle using “SET
>>spark.sql.shuffle.partitions=[num_tasks];”.
>>
>> Thanks.
>>
>> Gavin
>>
>>
>>
>>
>> On Fri, Jan 8, 2016 at 6:25 PM, Ted Yu  wrote:
>>
>>> bq. in an noSQL db such as Hbase
>>>
>>> +1 :-)

Re: How to merge two large table and remove duplicates?

2016-01-08 Thread Gavin Yue
Hey,
Thank you for the answer. I checked the settings you mentioned; they are all
correct.  I noticed that the job always uses only 200 reducers for the
shuffle read. I believe this comes from the SQL shuffle parallelism setting.

In the doc, it mentions:

   - Automatically determine the number of reducers for joins and groupbys:
   Currently in Spark SQL, you need to control the degree of parallelism
   post-shuffle using “SET spark.sql.shuffle.partitions=[num_tasks];”.


What would be the ideal number for this setting? Is it based on the
hardware of the cluster?
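
No single ideal value is given in this thread. One commonly used rule of
thumb (an assumption here, not something stated in the doc quoted above) is
to size the setting so that each post-shuffle partition holds on the order of
100-200 MB. A minimal sketch, assuming a 128 MB target; the helper names
estimate_shuffle_partitions and apply_shuffle_partitions are hypothetical,
and sql_context is assumed to be a PySpark SQLContext:

```python
def estimate_shuffle_partitions(shuffle_bytes,
                                target_partition_bytes=128 * 1024 * 1024):
    """Rule-of-thumb partition count: shuffle size / target size, rounded up."""
    if shuffle_bytes <= 0:
        return 1
    # Ceiling division on integers avoids floating-point rounding.
    return -(-int(shuffle_bytes) // int(target_partition_bytes))


def apply_shuffle_partitions(sql_context, num_partitions):
    """Set spark.sql.shuffle.partitions on an existing SQLContext."""
    sql_context.setConf("spark.sql.shuffle.partitions", str(num_partitions))
```

For a shuffle of roughly 6.2 TB, as reported earlier in the thread, this
heuristic lands in the tens of thousands of partitions, far above the 200
observed in the job. Treat the result as a starting point to tune, not a
fixed answer.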


Thanks,

Gavin

On Fri, Jan 8, 2016 at 2:48 PM, Benyi Wang  wrote:

>
>- I assume your parquet files are compressed. Gzip or Snappy?
>- What spark version did you use? It seems at least 1.4. If you use
>spark-sql and tungsten, you might have better performance. but spark 1.5.2
>gave me a wrong result when the data was about 300~400GB, just for a simple
>group-by and aggregate.
>- Did you use Kryo serialization?
>- you should have spark.shuffle.compress=true, verify it.
>- How many tasks did you use? spark.default.parallelism=?
>- What about this:
>   - Read the data day by day
>   - compute a bucket id from timestamp, e.g., the date and hour
>   - Write into different buckets (you probably need a special writer
>   to write data efficiently without shuffling the data).
>   - distinct for each bucket. Because each bucket is small, spark can
>   get it done faster than having everything in one run.
>   - I think using groupBy (userId, timestamp) might be better than
>   distinct. I guess distinct() will compare every field.
>
>
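
The bucket-then-dedup idea in the list above can be sketched in plain Python,
without Spark, to show the logic. Assumptions: TimeStamp is in epoch seconds,
and (UserID, EventKey, TimeStamp) identifies a duplicate; the helper names
are hypothetical. In Spark itself, DataFrame.dropDuplicates with a column
subset may play the role of the per-key step:

```python
from collections import defaultdict
from datetime import datetime, timezone


def bucket_id(ts):
    """Bucket key: date and hour of an epoch-seconds timestamp (UTC)."""
    return datetime.fromtimestamp(ts, tz=timezone.utc).strftime("%Y-%m-%d-%H")


def dedup_by_bucket(events, key_fields=("UserID", "EventKey", "TimeStamp")):
    """Group events by bucket, then keep the first event per key in each bucket."""
    buckets = defaultdict(dict)
    for event in events:
        key = tuple(event[f] for f in key_fields)
        # setdefault keeps the first event seen for this key.
        buckets[bucket_id(event["TimeStamp"])].setdefault(key, event)
    return {b: list(seen.values()) for b, seen in buckets.items()}
```

Because duplicates of one event share a timestamp, they fall into the same
bucket, so each bucket can be deduplicated on its own and on fewer fields
than a full-row distinct().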
> On Fri, Jan 8, 2016 at 2:31 PM, Gavin Yue  wrote:
>
>> And the most frequent operation I am gonna do is find the UserID who have
>> some events, then retrieve all the events associted with the UserID.
>>
>> In this case, how should I partition to speed up the process?
>>
>> Thanks.
>>
>> On Fri, Jan 8, 2016 at 2:29 PM, Gavin Yue  wrote:
>>
>>> hey Ted,
>>>
>>> Event table is like this: UserID, EventType, EventKey, TimeStamp,
>>> MetaData.  I just parse it from Json and save as Parquet, did not change
>>> the partition.
>>>
>>> Annoyingly, every day's incoming Event data having duplicates among each
>>> other.  One same event could show up in Day1 and Day2 and probably Day3.
>>>
>>> I only want to keep single Event table and each day it come so many
>>> duplicates.
>>>
>>> Is there a way I could just insert into Parquet and if duplicate found,
>>> just ignore?
>>>
>>> Thanks,
>>> Gavin
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> On Fri, Jan 8, 2016 at 2:18 PM, Ted Yu  wrote:
>>>
>>>> Is your Parquet data source partitioned by date ?
>>>>
>>>> Can you dedup within partitions ?
>>>>
>>>> Cheers
>>>>
>>>> On Fri, Jan 8, 2016 at 2:10 PM, Gavin Yue
>>>> wrote:
>>>>
>>>>> I tried on Three day's data.  The total input is only 980GB, but the
>>>>> shuffle write Data is about 6.2TB, then the job failed during shuffle read
>>>>> step, which should be another 6.2TB shuffle read.
>>>>>
>>>>> I think to Dedup, the shuffling can not be avoided. Is there anything
>>>>> I could do to stablize this process?
>>>>>
>>>>> Thanks.
>>>>>
>>>>>
>>>>> On Fri, Jan 8, 2016 at 2:04 PM, Gavin Yue
>>>>> wrote:
>>>>>
>>>>>> Hey,
>>>>>>
>>>>>> I got everyday's Event table and want to merge them into a single
>>>>>> Event table. But there so many duplicates among each day's data.
>>>>>>
>>>>>> I use Parquet as the data source.  What I am doing now is
>>>>>>
>>>>>> EventDay1.unionAll(EventDay2).distinct().write.parquet("a new parquet
>>>>>> file").
>>>>>>
>>>>>> Each day's Event is stored in their own Parquet file
>>>>>>
>>>>>> But it failed at the stage2 which keeps losing connection to one
>>>>>> executor. I guess this is due to the memory issue.
>>>>>>
>>>>>> Any suggestion how I do this efficiently?
>>>>>>
>>>>>> Thanks,
>>>>>> Gavin
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>


Re: How to merge two large table and remove duplicates?

2016-01-08 Thread ayan guha
One option you may want to explore is writing the event table to a NoSQL
database such as HBase. One inherent problem with your approach is that you
always need to load either the full data set or a defined number of
partitions to see whether an event has already arrived (with no guarantee
that this is foolproof, and it leads to unnecessary loading in most cases).

On Sat, Jan 9, 2016 at 12:56 PM, Gavin Yue  wrote:

> Hey,
> Thank you for the answer. I checked the setting you mentioend they are all
> correct.  I noticed that in the job, there are always only 200 reducers for
> shuffle read, I believe it is setting in the sql shuffle parallism.
>
> In the doc, it mentions:
>
>- Automatically determine the number of reducers for joins and
>groupbys: Currently in Spark SQL, you need to control the degree of
>parallelism post-shuffle using “SET
>spark.sql.shuffle.partitions=[num_tasks];”.
>
>
> What would be the ideal number for this setting? Is it based on the
> hardware of cluster?
>
>
> Thanks,
>
> Gavin
>
> On Fri, Jan 8, 2016 at 2:48 PM, Benyi Wang  wrote:
>
>>
>>- I assume your parquet files are compressed. Gzip or Snappy?
>>- What spark version did you use? It seems at least 1.4. If you use
>>spark-sql and tungsten, you might have better performance. but spark 1.5.2
>>gave me a wrong result when the data was about 300~400GB, just for a 
>> simple
>>group-by and aggregate.
>>- Did you use kyro serialization?
>>- you should have spark.shuffle.compress=true, verify it.
>>- How many tasks did you use? spark.default.parallelism=?
>>- What about this:
>>   - Read the data day by day
>>   - compute a bucket id from timestamp, e.g., the date and hour
>>   - Write into different buckets (you probably need a special writer
>>   to write data efficiently without shuffling the data).
>>   - distinct for each bucket. Because each bucket is small, spark
>>   can get it done faster than having everything in one run.
>>   - I think using groupBy (userId, timestamp) might be better than
>>   distinct. I guess distinct() will compare every field.
>>
>>
>> On Fri, Jan 8, 2016 at 2:31 PM, Gavin Yue  wrote:
>>
>>> And the most frequent operation I am gonna do is find the UserID who
>>> have some events, then retrieve all the events associted with the UserID.
>>>
>>> In this case, how should I partition to speed up the process?
>>>
>>> Thanks.


-- 
Best Regards,
Ayan Guha


Re: How to merge two large table and remove duplicates?

2016-01-08 Thread Ted Yu
Cycling old bits:
http://search-hadoop.com/m/q3RTtRuvrm1CGzBJ

Gavin:
Which release of HBase did you play with?

HBase has been evolving and is getting more stable.

Cheers

On Fri, Jan 8, 2016 at 6:29 PM, Gavin Yue  wrote:

> I used to maintain a HBase cluster. The experience with it was not happy.
>
> I just tried query the data  from each day's first and dedup with smaller
> set, the performance is acceptable.  So I guess I will use this method.
>
> Again, could anyone give advice about:
>
>- Automatically determine the number of reducers for joins and
>groupbys: Currently in Spark SQL, you need to control the degree of
>parallelism post-shuffle using “SET
>spark.sql.shuffle.partitions=[num_tasks];”.
>
> Thanks.
>
> Gavin
>
>
>
>
> On Fri, Jan 8, 2016 at 6:25 PM, Ted Yu  wrote:
>
>> bq. in an noSQL db such as Hbase
>>
>> +1 :-)
>>
>>
>> On Fri, Jan 8, 2016 at 6:25 PM, ayan guha  wrote:
>>
>>> One option you may want to explore is writing event table in an noSQL db
>>> such as Hbase. One inherent problem in your approach is you always need to
>>> load either full data set or a defined number of partitions to see if the
>>> event has already come (and no gurantee it is full proof, but lead to
>>> unnecessary loading in most cases).

how garbage collection works on parallelize

2016-01-08 Thread jluan
Hi,

I am curious about garbage collection of an object that gets parallelized.
Say we have a really large array (about 40 GB in RAM) that we want to
parallelize across our machines.

I have the following function:

def doSomething(): RDD[Double] = {
  // n stands in for "some really big value" (the array length)
  val reallyBigArray = new Array[Double](n)
  sc.parallelize(reallyBigArray)
}

Theoretically, will reallyBigArray be marked for GC? Or will reallyBigArray
not be GC'd because parallelize somehow has a reference on reallyBigArray?




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/how-garbage-collection-works-on-parallelize-tp25926.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: how garbage collection works on parallelize

2016-01-08 Thread Josh Rosen
It won't be GC'd as long as the RDD which results from `parallelize()` is
kept around; that RDD keeps strong references to the parallelized
collection's elements in order to enable fault-tolerance.
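
The same reachability idea can be observed in plain Python. This is only an
analogy run under CPython, not Spark's JVM implementation: FakeRDD and
BigArray are hypothetical stand-ins, and a weak reference is used purely to
watch whether the array is still alive:

```python
import gc
import weakref


class BigArray(list):
    """Stand-in for the large array; a list subclass so it can be weakly referenced."""


class FakeRDD:
    """Hypothetical stand-in for the RDD returned by parallelize()."""

    def __init__(self, data):
        self._data = data  # strong reference, like the RDD's hold on its elements


def do_something():
    really_big_array = BigArray(range(1000))
    probe = weakref.ref(really_big_array)  # lets us observe collection
    return FakeRDD(really_big_array), probe


rdd, probe = do_something()
gc.collect()
alive_while_rdd_kept = probe() is not None

del rdd
gc.collect()
alive_after_rdd_dropped = probe() is not None
```

The local variable going out of scope does not free the array; only dropping
the object that still refers to it does.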

On Fri, Jan 8, 2016 at 6:50 PM, jluan  wrote:

> Hi,
>
> I am curious about garbage collect on an object which gets parallelized.
> Say
> if we have a really large array (say 40GB in ram) that we want to
> parallelize across our machines.
>
> I have the following function:
>
> def doSomething(): RDD[Double] = {
>   // n stands in for "some really big value" (the array length)
>   val reallyBigArray = new Array[Double](n)
>   sc.parallelize(reallyBigArray)
> }
>
> Theoretically, will reallyBigArray be marked for GC? Or will reallyBigArray
> not be GC'd because parallelize somehow has a reference on reallyBigArray?
>


pyspark: conditionals inside functions

2016-01-08 Thread Franc Carter
Hi,

I'm trying to write a short function that returns the Sunday at the end of
the week containing a given date. The code is below:

from pyspark.sql.functions import next_day, datediff

def getSunday(day):
    day = day.cast("date")
    sun = next_day(day, "Sunday")
    n = datediff(sun, day)
    if (n == 7):
        return day
    else:
        return sun


this gives me

ValueError: Cannot convert column into bool:


Can someone point out what I am doing wrong?

thanks


-- 
Franc
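
A likely cause, offered as a sketch and not a confirmed diagnosis: `n == 7` builds a lazily evaluated Column expression, and Python's `if` tries to turn it into a single bool, which raises the ValueError. The per-row equivalent is `when(...).otherwise(...)` from `pyspark.sql.functions`. The helper `sunday_on_or_after` is a hypothetical plain-Python reference added only to make the intended result checkable without a cluster.

```python
import datetime


def sunday_on_or_after(d):
    """Plain-Python reference: the Sunday on or after date d."""
    # weekday(): Monday == 0 ... Sunday == 6
    return d + datetime.timedelta(days=(6 - d.weekday()) % 7)


def get_sunday(day):
    """Column version of getSunday; `day` is a pyspark.sql.Column."""
    # Imported here so the reference function above works without Spark.
    from pyspark.sql import functions as F

    day = day.cast("date")
    sun = F.next_day(day, "Sunday")
    n = F.datediff(sun, day)
    # n == 7 is a Column expression; when/otherwise evaluates it per row
    # instead of asking Python for a single True/False.
    return F.when(n == 7, day).otherwise(sun)
```

It would be used as, for example, `df.withColumn("sunday", get_sunday(df["some_date"]))`, where `some_date` is a placeholder column name.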


Re: Unable to compile from source

2016-01-08 Thread hareesh makam
Are you behind a proxy?

Or

Try disabling the SSL check while building.

http://stackoverflow.com/questions/21252800/maven-trusting-all-certs-unlimited-java-policy

See the link above for how to disable the SSL check.

- hareesh.
On Jan 8, 2016 4:54 PM, "Gaini Rajeshwar" 
wrote:

> Hi All,
>
> I am new to apache spark.
>
> I have downloaded *Spark 1.6.0 (Jan 04 2016) source code version*.
>
> I ran the following command, as per the Spark
> documentation:
>
> build/mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 -DskipTests clean package
>
> When I run the above command, I get the following error:
>
> [ERROR] Failed to execute goal on project spark-catalyst_2.10: Could not 
> resolve dependencies for project 
> org.apache.spark:spark-catalyst_2.10:jar:1.6.0: Failed to collect 
> dependencies at org.codehaus.janino:janino:jar:2.7.8: Failed to read artifact 
> descriptor for org.codehaus.janino:janino:jar:2.7.8: Could not transfer 
> artifact org.codehaus.janino:janino:pom:2.7.8 from/to central 
> (https://repo1.maven.org/maven2): Remote host closed connection during 
> handshake: SSL peer shut down incorrectly -> [Help 1]
>
> Can anyone help with this ?
>
>
>


Re: Problems with reading data from parquet files in a HDFS remotely

2016-01-08 Thread Henrik Baastrup
I solved the problem. I needed to tell the SparkContext about my Hadoop
setup, so my program now looks as follows:

// Imports needed by the fragment below
import java.io.File;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.DataFrameReader;
import org.apache.spark.sql.SQLContext;

SparkConf conf = new SparkConf()
    .setAppName("SparkTest")
    .setMaster("spark://172.27.13.57:7077")
    // We assign 2 GB of RAM to our job on each Worker
    .set("spark.executor.memory", "2g")
    // Fix the port the driver will listen on, good for firewalls!
    .set("spark.driver.port", "51810");
JavaSparkContext sc = new JavaSparkContext(conf);

// Tell Spark about our Hadoop environment
File coreSite = new File("/etc/hadoop/conf/core-site.xml");
File hdfsSite = new File("/etc/hadoop/conf/hdfs-site.xml");
Configuration hConf = sc.hadoopConfiguration();
hConf.addResource(new Path(coreSite.getAbsolutePath()));
hConf.addResource(new Path(hdfsSite.getAbsolutePath()));

SQLContext sqlContext = new SQLContext(sc);

DataFrameReader reader = sqlContext.read();
DataFrame df = reader.parquet("/user/hdfs/parquet-multi/BICC");
// Java string literals cannot span lines, so the filter is concatenated
DataFrame filtered = df.filter("endTime>=14494218 AND "
    + "endTime<=14494224 AND calling='6287870642893' AND "
    + "p_endtime=14494224");
filtered.show();

Henrik

On 07/01/2016 19:41, Ewan Leith wrote:
>
> Try the path
>
>
> "hdfs:///user/hdfs/parquet-multi/BICC"
> Thanks,
> Ewan
>
>
> -- Original message--
>
> *From: *Henrik Baastrup
>
> *Date: *Thu, 7 Jan 2016 17:54
>
> *To: *user@spark.apache.org;
>
> *Cc: *Baastrup, Henrik;
>
> *Subject:*Problems with reading data from parquet files in a HDFS remotely
>
>
> Hi All,
>
> I have a small Hadoop cluster where I have stored a lot of data in parquet 
> files. I have installed a Spark master service on one of the nodes and now 
> would like to query my parquet files from a Spark client. When I run the 
> following program from the spark-shell on the Spark Master node, everything 
> works correctly:
>
> # val sqlCont = new org.apache.spark.sql.SQLContext(sc)
> # val reader = sqlCont.read
> # val dataFrame = reader.parquet("/user/hdfs/parquet-multi/BICC")
> # dataFrame.registerTempTable("BICC")
> # val recSet = sqlCont.sql("SELECT 
> protocolCode,beginTime,endTime,called,calling FROM BICC WHERE 
> endTime>=14494218 AND endTime<=14494224 AND 
> calling='6287870642893' AND p_endtime=14494224")
> # recSet.show()  
>
> But when I run the Java program below, from my client, I get: 
>
> Exception in thread "main" java.lang.AssertionError: assertion failed: No 
> predefined schema found, and no Parquet data files or summary files found 
> under file:/user/hdfs/parquet-multi/BICC.
>
> The exception occurs at the line: DataFrame df = 
> reader.parquet("/user/hdfs/parquet-multi/BICC");
>
> On the Master node I can see the client connect when the SparkContext is 
> instanced, as I get the following lines in the Spark log:
>
> 16/01/07 18:27:47 INFO Master: Registering app SparkTest
> 16/01/07 18:27:47 INFO Master: Registered app SparkTest with ID 
> app-20160107182747-00801
>
> If I create a local directory with the given path, my program goes in an 
> endless loop, with the following warning on the console:
>
> WARN scheduler.TaskSchedulerImpl: Initial job has not accepted any resources; 
> check your cluster UI to ensure that workers are registered and have 
> sufficient resources
>
> To me it seems that my SQLContext does not connect to the Spark Master, but 
> tries to work locally on the client, where the requested files do not exist.
>
> Java program:
>   SparkConf conf = new SparkConf()
>   .setAppName("SparkTest")
>   .setMaster("spark://172.27.13.57:7077");
>   JavaSparkContext sc = new JavaSparkContext(conf);
>   SQLContext sqlContext = new SQLContext(sc);
>   
>   DataFrameReader reader = sqlContext.read();
>   DataFrame df = reader.parquet("/user/hdfs/parquet-multi/BICC");
>   DataFrame filtered = df.filter("endTime>=14494218 AND "
>       + "endTime<=14494224 AND calling='6287870642893' AND "
>       + "p_endtime=14494224");
>   filtered.show();
>
> Is there someone who can help me?
>
> Henrik
>



subscribe

2016-01-08 Thread Jeetendra Gangele



Re: spark 1.6 Issue

2016-01-08 Thread kali.tumm...@gmail.com
Hi All, 

It worked OK after adding the following to the VM options:

-Xms128m -Xmx512m -XX:MaxPermSize=300m -ea

Thanks
Sri 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/spark-1-6-Issue-tp25893p25920.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




how deploy pmml model in spark

2016-01-08 Thread Sangameshwar Swami
Hi,


Does anyone know how to deploy a PMML model in Spark machine learning or R?
Please reply as soon as possible.







Re: adding jars - hive on spark cdh 5.4.3

2016-01-08 Thread Edward Capriolo
You cannot 'add jar' input formats and SerDes. They need to be part of
your auxlib.

On Fri, Jan 8, 2016 at 12:19 PM, Ophir Etzion  wrote:

> I tried now. still getting
>
> 16/01/08 16:37:34 ERROR exec.Utilities: Failed to load plan: 
> hdfs://hadoop-alidoro-nn-vip/tmp/hive/hive/c2af9882-38a9-42b0-8d17-3f56708383e8/hive_2016-01-08_16-36-41_370_3307331506800215903-3/-mr-10004/3c90a796-47fc-4541-bbec-b196c40aefab/map.xml:
>  org.apache.hive.com.esotericsoftware.kryo.KryoException: Unable to find 
> class: com.foursquare.hadoop.hive.io.HiveThriftSequenceFileInputFormat
> Serialization trace:
> inputFileFormatClass (org.apache.hadoop.hive.ql.plan.PartitionDesc)
> aliasToPartnInfo (org.apache.hadoop.hive.ql.plan.MapWork)
> org.apache.hive.com.esotericsoftware.kryo.KryoException: Unable to find 
> class: com.foursquare.hadoop.hive.io.HiveThriftSequenceFileInputFormat
>
>
> HiveThriftSequenceFileInputFormat is in one of the jars I'm trying to add.
>
>
> On Thu, Jan 7, 2016 at 9:58 PM, Prem Sure  wrote:
>
>> Did you try the --jars option of spark-submit? If your jar is very
>> large, you can pre-load it on all executors in a commonly available
>> directory to avoid network I/O.
>>
>> On Thu, Jan 7, 2016 at 4:03 PM, Ophir Etzion 
>> wrote:
>>
>>> I'm trying to add jars before running a query using Hive on Spark on CDH
>>> 5.4.3.
>>> I've tried applying the patch in
>>> https://issues.apache.org/jira/browse/HIVE-12045 (manually as the patch
>>> is done on a different hive version) but still hasn't succeeded.
>>>
>>> did anyone manage to do ADD JAR successfully with CDH?
>>>
>>> Thanks,
>>> Ophir
>>>
>>
>>
>


Re: subscribe

2016-01-08 Thread Denny Lee
To subscribe, please go to http://spark.apache.org/community.html to join
the mailing list.


On Fri, Jan 8, 2016 at 3:58 AM Jeetendra Gangele 
wrote:

>
>


Re: write new data to mysql

2016-01-08 Thread Todd Nist
It is not clear from the information provided why insertIntoJDBC failed
in #2. I would note that this method on DataFrame has been deprecated since
1.4; I'm not sure which version you're on. You should be able to do something
like this:

 DataFrame.write.mode(SaveMode.Append).jdbc(MYSQL_CONNECTION_URL_WRITE,
"track_on_alarm", connectionProps)

HTH.

-Todd
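
For PySpark users, the append-mode write suggested above might look like the sketch below. `append_to_table` is a hypothetical helper name, and the URL and property values in the usage note are placeholders; it relies on `DataFrameWriter.jdbc`, available since Spark 1.4.

```python
def append_to_table(df, url, table, props):
    """Append the rows of `df` to an existing JDBC table."""
    # mode="append" adds rows to the table instead of failing because it
    # exists ("error") or replacing its contents ("overwrite").
    df.write.jdbc(url, table, mode="append", properties=props)
```

A call would then be along the lines of `append_to_table(people_df, MYSQL_CONNECTION_URL_WRITE, "track_on_alarm", {"user": "...", "password": "..."})`.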

On Fri, Jan 8, 2016 at 10:53 AM, Ted Yu  wrote:

> Which Spark release are you using ?
>
> For case #2, was there any error / clue in the logs ?
>
> Cheers
>
> On Fri, Jan 8, 2016 at 7:36 AM, Yasemin Kaya  wrote:
>
>> Hi,
>>
>> I want to write dataframe existing mysql table, but when i use
>> *peopleDataFrame.insertIntoJDBC(MYSQL_CONNECTION_URL_WRITE,
>> "track_on_alarm",false)*
>>
>> it says "Table track_on_alarm already exists."
>>
>> And when i *use peopleDataFrame.insertIntoJDBC(MYSQL_CONNECTION_URL_WRITE,
>> "track_on_alarm",true)*
>>
>> i lost the existing data.
>>
>> How i can write new data to db?
>>
>> Best,
>> yasemin
>>
>> --
>> hiç ender hiç
>>
>
>


write new data to mysql

2016-01-08 Thread Yasemin Kaya
Hi,

I want to write a DataFrame to an existing MySQL table, but when I use
*peopleDataFrame.insertIntoJDBC(MYSQL_CONNECTION_URL_WRITE,
"track_on_alarm",false)*

it says "Table track_on_alarm already exists."

And when I use *peopleDataFrame.insertIntoJDBC(MYSQL_CONNECTION_URL_WRITE,
"track_on_alarm",true)*

I lose the existing data.

How can I write new data to the database?

Best,
yasemin

-- 
hiç ender hiç


Efficient join multiple times

2016-01-08 Thread Jason White
I'm trying to join a constant large-ish RDD to each RDD in a DStream, and I'm
trying to keep the join as efficient as possible so each batch finishes
within the batch window. I'm using PySpark on 1.6.

I've tried the trick of keying the large RDD into (k, v) pairs and using
.partitionBy(100).persist() to pre-partition it for each join. This works,
and definitely cuts down on the time. The dstream is also mapped to matching
(k, v) pairs.

Then I'm joining the data using:
my_dstream.transform(lambda rdd: rdd.leftOuterJoin(big_rdd))

What I'm seeing happening is that, while the right side is partitioned
exactly once, thus saving me an expensive shuffle each batch, the data is
still being transferred across the network each batch. This is putting me up
to or beyond my batch window.

I thought the point of the .partitionBy() call was to persist the data at a
fixed set of nodes, and have the data from the smaller RDD shuffled to those
nodes?

I've also tried using a .rightOuterJoin instead; it appears to make no
difference. Any suggestions?




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Efficient-join-multiple-times-tp25922.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: write new data to mysql

2016-01-08 Thread Ted Yu
Which Spark release are you using ?

For case #2, was there any error / clue in the logs ?

Cheers

On Fri, Jan 8, 2016 at 7:36 AM, Yasemin Kaya  wrote:

> Hi,
>
> I want to write dataframe existing mysql table, but when i use
> *peopleDataFrame.insertIntoJDBC(MYSQL_CONNECTION_URL_WRITE,
> "track_on_alarm",false)*
>
> it says "Table track_on_alarm already exists."
>
> And when i *use peopleDataFrame.insertIntoJDBC(MYSQL_CONNECTION_URL_WRITE,
> "track_on_alarm",true)*
>
> i lost the existing data.
>
> How i can write new data to db?
>
> Best,
> yasemin
>
> --
> hiç ender hiç
>


Re: adding jars - hive on spark cdh 5.4.3

2016-01-08 Thread Ophir Etzion
I tried that now. Still getting:

16/01/08 16:37:34 ERROR exec.Utilities: Failed to load plan:
hdfs://hadoop-alidoro-nn-vip/tmp/hive/hive/c2af9882-38a9-42b0-8d17-3f56708383e8/hive_2016-01-08_16-36-41_370_3307331506800215903-3/-mr-10004/3c90a796-47fc-4541-bbec-b196c40aefab/map.xml:
org.apache.hive.com.esotericsoftware.kryo.KryoException: Unable to
find class: com.foursquare.hadoop.hive.io.HiveThriftSequenceFileInputFormat
Serialization trace:
inputFileFormatClass (org.apache.hadoop.hive.ql.plan.PartitionDesc)
aliasToPartnInfo (org.apache.hadoop.hive.ql.plan.MapWork)
org.apache.hive.com.esotericsoftware.kryo.KryoException: Unable to
find class: com.foursquare.hadoop.hive.io.HiveThriftSequenceFileInputFormat


HiveThriftSequenceFileInputFormat is in one of the jars I'm trying to add.


On Thu, Jan 7, 2016 at 9:58 PM, Prem Sure  wrote:

> Did you try the --jars option of spark-submit? If your jar is very large,
> you can pre-load it on all executors in a commonly available directory
> to avoid network I/O.
>
> On Thu, Jan 7, 2016 at 4:03 PM, Ophir Etzion  wrote:
>
>> I'm trying to add jars before running a query using Hive on Spark on CDH
>> 5.4.3.
>> I've tried applying the patch in
>> https://issues.apache.org/jira/browse/HIVE-12045 (manually as the patch
>> is done on a different hive version) but still hasn't succeeded.
>>
>> did anyone manage to do ADD JAR successfully with CDH?
>>
>> Thanks,
>> Ophir
>>
>
>


Re: Spark Context not getting initialized in local mode

2016-01-08 Thread Dean Wampler
ClassNotFoundException usually means one of a few problems:

1. Your app assembly is missing the jar files with those classes.
2. You mixed jar files from incompatible versions in your assembly.
3. You built with one version of Spark and deployed to another.
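
The first cause in the list above can be checked directly, since a jar is a zip archive. The following is a minimal sketch using only the Python standard library; `jar_contains_class` is a hypothetical helper name, and the demo builds a throwaway jar instead of inspecting a real assembly.

```python
import os
import tempfile
import zipfile


def jar_contains_class(jar_path, class_name):
    """Return True if the jar has an entry for the fully qualified class."""
    entry = class_name.replace(".", "/") + ".class"
    with zipfile.ZipFile(jar_path) as jar:
        return entry in jar.namelist()


# Demo with a throwaway jar holding a single empty class entry.
with tempfile.TemporaryDirectory() as tmp:
    demo_jar = os.path.join(tmp, "demo.jar")
    with zipfile.ZipFile(demo_jar, "w") as jar:
        jar.writestr("org/apache/spark/rpc/akka/AkkaRpcEnvFactory.class", b"")
    found = jar_contains_class(
        demo_jar, "org.apache.spark.rpc.akka.AkkaRpcEnvFactory")
    missing = jar_contains_class(demo_jar, "org.example.NotThere")
```

Pointing `jar_contains_class` at the shaded plugin jar would show whether the class named in the exception was packaged at all; it says nothing about version mismatches, which are the second and third causes.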


Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition (O'Reilly)
Typesafe 
@deanwampler 
http://polyglotprogramming.com

On Fri, Jan 8, 2016 at 1:24 AM, Rahul Kumar 
wrote:

>
>
>
>
> Hi all,
> I am trying to start solr with a custom plugin which uses spark library. I
> am trying to initialize sparkcontext in local mode. I have made a fat jar
> for this plugin using maven shade and put it in the lib directory. *While
> starting solr it is not able to initialize sparkcontext.* It says class
> not found exception for AkkaRpcEnvFactory. Can anyone please help.
>
> *It gives the following error:*
>
> 3870 [coreLoadExecutor-4-thread-1] ERROR org.apache.spark.SparkContext  – 
> Error initializing SparkContext.
> java.lang.ClassNotFoundException:org.apache.spark.rpc.akka.AkkaRpcEnvFactory
>
> *Here is the detailed error*
>
> java -jar start.jar
> 0    [main] INFO  org.eclipse.jetty.server.Server  – jetty-8.1.10.v20130312
> 27   [main] INFO  org.eclipse.jetty.deploy.providers.ScanningAppProvider  – Deployment monitor /home/rahul/solr-4.7.2/example/contexts at interval 0
> 40   [main] INFO  org.eclipse.jetty.deploy.DeploymentManager  – Deployable added: /home/rahul/solr-4.7.2/example/contexts/solr-jetty-context.xml
> 1095 [main] INFO  org.eclipse.jetty.webapp.StandardDescriptorProcessor  – NO JSP Support for /solr, did not find org.apache.jasper.servlet.JspServlet
> 1155 [main] INFO  org.apache.solr.servlet.SolrDispatchFilter  – SolrDispatchFilter.init()
> 1189 [main] INFO  org.apache.solr.core.SolrResourceLoader  – JNDI not configured for solr (NoInitialContextEx)
> 1190 [main] INFO  org.apache.solr.core.SolrResourceLoader  – solr home defaulted to 'solr/' (could not find system property or JNDI)
> 1190 [main] INFO  org.apache.solr.core.SolrResourceLoader  – new SolrResourceLoader for directory: 'solr/'
> 1280 [main] INFO  org.apache.solr.core.ConfigSolr  – Loading container configuration from /home/rahul/solr-4.7.2/example/solr/solr.xml
> 1458 [main] INFO  org.apache.solr.core.CoresLocator  – Config-defined core root directory: /home/rahul/solr-4.7.2/example/solr
> 1465 [main] INFO  org.apache.solr.core.CoreContainer  – New CoreContainer 602710225...
> 3870 [coreLoadExecutor-4-thread-1] ERROR org.apache.spark.SparkContext  – 
> Error initializing SparkContext.
> java.lang.ClassNotFoundException: org.apache.spark.rpc.akka.AkkaRpcEnvFactory
> at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
> at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
> at java.security.AccessController.doPrivileged(Native Method)
> at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
> at org.eclipse.jetty.webapp.WebAppClassLoader.loadClass(WebAppClassLoader.java:430)
> at org.eclipse.jetty.webapp.WebAppClassLoader.loadClass(WebAppClassLoader.java:383)
> at java.lang.Class.forName0(Native Method)
> at java.lang.Class.forName(Class.java:274)
> at org.apache.spark.util.Utils$.classForName(Utils.scala:173)
> at org.apache.spark.rpc.RpcEnv$.getRpcEnvFactory(RpcEnv.scala:42)
> at org.apache.spark.rpc.RpcEnv$.create(RpcEnv.scala:53)
> at org.apache.spark.SparkEnv$.create(SparkEnv.scala:252)
> at org.apache.spark.SparkEnv$.createDriverEnv(SparkEnv.scala:193)
> at org.apache.spark.SparkContext.createSparkEnv(SparkContext.scala:276)
> at org.apache.spark.SparkContext.<init>(SparkContext.scala:441)
> at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:61)
> at com.snapdeal.search.spark.SparkLoadModel.loadModel(SparkLoadModel.java:11)
> at com.snapdeal.search.valuesource.parser.RankingModelValueSourceParser.init(RankingModelValueSourceParser.java:29)
> at org.apache.solr.core.SolrCore.createInitInstance(SolrCore.java:591)
> at org.apache.solr.core.SolrCore.initPlugins(SolrCore.java:2191)
> at org.apache.solr.core.SolrCore.initPlugins(SolrCore.java:2185)
> at org.apache.solr.core.SolrCore.initPlugins(SolrCore.java:2218)
> at org.apache.solr.core.SolrCore.initValueSourceParsers(SolrCore.java:2130)
> at org.apache.solr.core.SolrCore.<init>(SolrCore.java:765)
> at org.apache.solr.core.SolrCore.<init>(SolrCore.java:630)
> at org.apache.solr.core.CoreContainer.createFromLocal(CoreContainer.java:562)
> at org.apache.solr.core.CoreContainer.create(CoreContainer.java:597)
> at org.apache.solr.core.CoreContainer$1.call(CoreContainer.java:258)
> at 

Re: Kryo serializer Exception during serialization: java.io.IOException: java.lang.IllegalArgumentException:

2016-01-08 Thread jiml
(point of post is to see if anyone has ideas about errors at end of post)

In addition, the real way to test if it's working is to force serialization:

In Java:

Create an array of all your classes:
// The Kryo serializer wants every class that needs to be serialized
// to be registered
Class<?>[] kryoClassArray = new Class<?>[]{DropResult.class,
    DropEvaluation.class, PrintHetSharing.class};

In the builder for your SparkConf (or in conf/spark-defaults.conf):
.set("spark.serializer",
    "org.apache.spark.serializer.KryoSerializer")
// require registration of all classes with Kryo
.set("spark.kryo.registrationRequired", "true")
// don't forget to register ALL classes or you will get an error
.registerKryoClasses(kryoClassArray);

Then you will start to get neat errors like the one I am working on:

Exception in thread "main" org.apache.spark.SparkException: Job aborted due
to stage failure: Failed to serialize task 0, not attempting to retry it.
Exception during serialization: java.io.IOException:
java.lang.IllegalArgumentException: Class is not registered:
scala.collection.mutable.WrappedArray$ofRef
Note: To register this class use:
kryo.register(scala.collection.mutable.WrappedArray$ofRef.class);

I did try adding scala.collection.mutable.WrappedArray to the Class array up
top but no luck. Thanks





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/kryos-serializer-tp16454p25921.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Kryo serializer Exception during serialization: java.io.IOException: java.lang.IllegalArgumentException:

2016-01-08 Thread Ted Yu
bq. try adding scala.collection.mutable.WrappedArray

But the hint said to register
scala.collection.mutable.WrappedArray$ofRef.class, right?
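
The distinction matters because `WrappedArray` and its nested class `WrappedArray$ofRef` are different classes to the JVM, so registering the outer one does not register the nested one. One way to register a class by its binary name, sketched here under the assumption that configuration is set through properties, is the `spark.kryo.classesToRegister` setting. `kryo_conf_pairs` is a hypothetical helper that only builds the key/value pairs.

```python
def kryo_conf_pairs(class_names):
    """Build the Spark properties that enable strict Kryo registration."""
    return [
        ("spark.serializer", "org.apache.spark.serializer.KryoSerializer"),
        ("spark.kryo.registrationRequired", "true"),
        # Comma-separated JVM binary names; nested classes keep their '$'.
        ("spark.kryo.classesToRegister", ",".join(class_names)),
    ]


pairs = kryo_conf_pairs([
    "scala.collection.mutable.WrappedArray$ofRef",
])
# With PySpark installed, these can be applied via SparkConf().setAll(pairs).
```

The application's own classes would need to be appended to the list with their fully qualified names, which the post does not give.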

On Fri, Jan 8, 2016 at 8:52 AM, jiml  wrote:

> (point of post is to see if anyone has ideas about errors at end of post)
>
> In addition, the real way to test if it's working is to force
> serialization:
>
> In Java:
>
> Create array of all your classes:
> // for kyro serializer it wants to register all classes that need to be
> serialized
> Class[] kryoClassArray = new Class[]{DropResult.class,
> DropEvaluation.class,
> PrintHetSharing.class};
>
> in the builder for your SparkConf (or in conf/spark-defaults.sh)
> .set("spark.serializer",
> "org.apache.spark.serializer.KryoSerializer")
> //require registration of all classes with Kyro
> .set("spark.kryo.registrationRequired", "true")
> // don't forget to register ALL classes or will get error
> .registerKryoClasses(kryoClassArray);
>
> Then you will start to get neat errors like the one I am working on:
>
> Exception in thread "main" org.apache.spark.SparkException: Job aborted due
> to stage failure: Failed to serialize task 0, not attempting to retry it.
> Exception during serialization: java.io.IOException:
> java.lang.IllegalArgumentException: Class is not registered:
> scala.collection.mutable.WrappedArray$ofRef
> Note: To register this class use:
> kryo.register(scala.collection.mutable.WrappedArray$ofRef.class);
>
> I did try adding scala.collection.mutable.WrappedArray to the Class array
> up
> top but no luck. Thanks
>