Re: how to load compressed (gzip) csv file using spark-csv

2016-06-16 Thread Vamsi Krishna
Thanks. It works.

On Thu, Jun 16, 2016 at 5:32 PM Hyukjin Kwon  wrote:

> It will 'auto-detect' the compression codec by the file extension and then
> will decompress and read it correctly.
>
> Thanks!
>
> 2016-06-16 20:27 GMT+09:00 Vamsi Krishna :
>
>> Hi,
>>
>> I'm using Spark 1.4.1 (HDP 2.3.2).
>> As per the spark-csv documentation (
>> https://github.com/databricks/spark-csv), I see that we can write to a
>> csv file in compressed form using the 'codec' option.
>> But I didn't see support for a 'codec' option when reading a csv file.
>>
>> Is there a way to read a compressed (gzip) file using spark-csv?
>>
>> Thanks,
>> Vamsi Attluri
>> --
>> Vamsi Attluri
>>
>
> --
Vamsi Attluri


Re: test - what is the wrong while adding one column in the dataframe

2016-06-16 Thread Zhiliang Zhu
Just a test, since the user mailing list seemed to have a problem earlier; 
it is okay now.



On Friday, June 17, 2016 12:18 PM, Zhiliang Zhu 
 wrote:
 

 

 On Tuesday, May 17, 2016 10:44 AM, Zhiliang Zhu 
 wrote:
 

  Hi All,
I have a DataFrame created by a Hive SQL query, and I need to add one more 
column derived from an existing column while keeping all the previous columns 
in the resulting DataFrame.

final double DAYS_30 = 1000 * 60 * 60 * 24 * 30.0;
// DAYS_30 seems difficult to reference inside the SQL?
DataFrame behavior_df = jhql.sql(
    "SELECT cast(user_id as double) as user_id, "
    + "cast(server_timestamp as double) as server_timestamp, "
    + "url, referer, source, app_version, params FROM log.request");
// This runs, but behavior_df.printSchema() shows no change:
behavior_df.withColumn("daysLater30", 
    behavior_df.col("server_timestamp").plus(DAYS_30));

// This also runs, but behavior_df.printSchema() shows only one column, 
// daysLater30. I expected the schema to keep all the previous columns plus 
// the added daysLater30.
behavior_df = behavior_df.withColumn("daysLater30", 
    behavior_df.col("server_timestamp").plus(DAYS_30));
How should I do this?
Thank you, 

 

the issue was resolved.
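
The thread does not say how this was resolved. For anyone landing here, a 
minimal sketch of the usual pattern, written in PySpark rather than the Java 
of the question: DataFrames are immutable, so withColumn returns a new 
DataFrame and the result has to be kept. The function name is hypothetical, 
and server_timestamp is assumed to be in milliseconds, as the constant in the 
question suggests.

```python
DAYS_30_MS = 1000 * 60 * 60 * 24 * 30.0  # 30 days, in milliseconds


def add_days_later_30(df):
    """Return a new DataFrame with all of df's columns plus daysLater30."""
    # withColumn does not modify df in place; the caller must keep the result.
    return df.withColumn("daysLater30", df["server_timestamp"] + DAYS_30_MS)


# Usage, with behavior_df loaded as in the question:
# behavior_df = add_days_later_30(behavior_df)
# behavior_df.printSchema()
```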

   

  

Re: Spark crashes worker nodes with multiple application starts

2016-06-16 Thread Deepak Goel
Well, my only guess (and it is just a guess, as I don't have access to the
machines that require a hard reset) is that the system is running into some
kind of race condition while accessing the disk and is not able to resolve
it, hence it hangs. This is a pretty vague statement, but it seems it will
take some trial and error to figure out exactly why the system is hanging.
Also, I believe you are using HDFS for data storage. HDFS relaxes some POSIX
requirements for faster data access; I wonder if this is the cause.

Hey

Namaskara~Nalama~Guten Tag~Bonjour


   --
Keigu

Deepak
73500 12833
www.simtree.net, dee...@simtree.net
deic...@gmail.com

LinkedIn: www.linkedin.com/in/deicool
Skype: thumsupdeicool
Google talk: deicool
Blog: http://loveandfearless.wordpress.com
Facebook: http://www.facebook.com/deicool

"Contribute to the world, environment and more : http://www.gridrepublic.org
"

On Thu, Jun 16, 2016 at 10:54 PM, Carlile, Ken 
wrote:

> Hi Deepak,
>
> Yes, that’s about the size of it. The spark job isn’t filling the disk by
> any stretch of the imagination; in fact the only stuff that’s writing to
> the disk from Spark in certain of these instances is the logging.
>
> Thanks,
> —Ken
>
> On Jun 16, 2016, at 12:17 PM, Deepak Goel  wrote:
>
> I guess what you are saying is:
>
> 1. The nodes work perfectly well, without iowait, before the Spark job.
> 2. After you have run the Spark job and killed it, the iowait persists.
>
> So it seems the Spark job is altering the disk in such a way that other
> programs can't access the disk after the Spark job is killed. (A naive
> thought) I wonder if the Spark job fills up the disk so that no other
> program on your node can write to it, hence the iowait.
>
> Also, facter normally just reads your system, so it shouldn't block it.
> There must be some other background scripts running on your node which
> are perhaps writing to the disk.
>
>
> On Thu, Jun 16, 2016 at 5:56 PM, Carlile, Ken 
> wrote:
>
>> 1. There are 320 nodes in total, with 96 dedicated to Spark. In this
>> particular case, 21 are in the Spark cluster. In typical Spark usage, maybe
>> 1-3 nodes will crash in a day, with probably an average of 4-5 Spark
>> clusters running at a given time. In THIS case, 7-12 nodes will crash
>> simultaneously on application termination (not Spark cluster termination,
>> but termination of a Spark application/jupyter kernel)
>> 2. I’ve turned off puppet, no effect. I’ve not fully disabled facter. The
>> iowait persists after the scheduler kills the Spark job (that still works,
>> at least)
>> 3. He’s attempted to run with 15 cores out of 16 and 25GB of RAM out of
>> 128. He still lost nodes.
>> 4. He’s currently running storage benchmarking tests, which consist
>> mainly of shuffles.
>>
>> Thanks!
>> Ken
>>
>> On Jun 16, 2016, at 8:00 AM, Deepak Goel  wrote:
>>
>> I am no expert, but some naive thoughts...
>>
>> 1. How many HPC nodes do you have? How many of them crash (what do you
>> mean by multiple)? Do all of them crash?
>>
>> 2. What things are you running on Puppet? Can't you switch it off and
>> test Spark? You can also switch off Facter. Btw, your observation that
>> there is iowait on these applications might be because they have lower
>> priority than Spark and hence are waiting for Spark to finish. So the
>> real bottleneck might be Spark and not these background processes.
>>
>> 3. Limiting CPUs and memory for Spark might have an inverse effect on
>> iowait, as more Spark processes would have to access the disk due to
>> the reduced memory and CPU.
>>
>> 4. Of course, you might have to give more info on what kind of
>> applications you are running on Spark, as they might be the main culprit.
>>
>> Deepak
>>
>>
>> On Thu, Jun 16, 2016 at 5:10 PM, Carlile, Ken 
>> wrote:
>>
>>> We run Spark on a general purpose HPC cluster (using standalone mode and
>>> the HPC scheduler), and are currently on Spark 1.6.1. One of the primary
>>> users has been testing various storage and other 

test - what is the wrong while adding one column in the dataframe

2016-06-16 Thread Zhiliang Zhu


 On Tuesday, May 17, 2016 10:44 AM, Zhiliang Zhu 
 wrote:
 

  Hi All,
I have a DataFrame created by a Hive SQL query, and I need to add one more 
column derived from an existing column while keeping all the previous columns 
in the resulting DataFrame.

final double DAYS_30 = 1000 * 60 * 60 * 24 * 30.0;
// DAYS_30 seems difficult to reference inside the SQL?
DataFrame behavior_df = jhql.sql(
    "SELECT cast(user_id as double) as user_id, "
    + "cast(server_timestamp as double) as server_timestamp, "
    + "url, referer, source, app_version, params FROM log.request");
// This runs, but behavior_df.printSchema() shows no change:
behavior_df.withColumn("daysLater30", 
    behavior_df.col("server_timestamp").plus(DAYS_30));

// This also runs, but behavior_df.printSchema() shows only one column, 
// daysLater30. I expected the schema to keep all the previous columns plus 
// the added daysLater30.
behavior_df = behavior_df.withColumn("daysLater30", 
    behavior_df.col("server_timestamp").plus(DAYS_30));
How should I do this?
Thank you, 

 

  

Re: Spark Kafka stream processing time increasing gradually

2016-06-16 Thread Roshan Singh
Hi,
According to the docs (
https://spark.apache.org/docs/latest/api/python/pyspark.streaming.html#pyspark.streaming.DStream.reduceByKeyAndWindow),
filterFunc can be used to retain expiring keys. I do not want to retain any
expiring key, so I do not understand how this can help me stabilize it.
Please correct me if this is not the case.

I am also specifying both reduceFunc and invReduceFunc. Can you share
sample code of what you are using?

Thanks.

On Fri, Jun 17, 2016 at 3:43 AM, N B  wrote:

> We had this same issue with the reduceByKeyAndWindow API that you are
> using. To fix it, you have to use a different flavor of that API,
> specifically the 2 versions that allow you to give a 'Filter function'
> to them. Putting in the filter functions helped stabilize our application
> too.
>
> HTH
> NB
>
>
> On Sun, Jun 12, 2016 at 11:19 PM, Roshan Singh 
> wrote:
>
>> Hi all,
>> I have a python streaming job which is supposed to run 24x7. I am unable
>> to stabilize it. The job just counts the number of links shared in a 30
>> minute sliding window. I am using the reduceByKeyAndWindow operation with
>> a batch of 30 seconds and a slide interval of 60 seconds.
>>
>> The kafka queue has a rate of nearly 2200 messages/second which can
>> increase to 3000 but the mean is 2200.
>>
>> I have played around with batch size, slide interval, and by increasing
>> parallelism with no fruitful result. These just delay the destabilization.
>>
>> GC time is usually between 60-100 ms.
>>
>> I also noticed in the Spark UI that the jobs were not distributed to other
>> nodes, so I configured spark.locality.wait as 100ms. After that, the job
>> is getting distributed properly.
>>
>> I have a cluster of 6 slaves and one master each with 16 cores and 15gb
>> of ram.
>>
>> Code and configuration: http://pastebin.com/93DMSiji
>>
>> Streaming screenshot: http://imgur.com/psNfjwJ
>>
>> I need help in debugging the issue. Any help will be appreciated.
>>
>> --
>> Roshan Singh
>>
>>
>


-- 
Roshan Singh
http://roshansingh.in


sparkR.init() can not load sparkPackages.

2016-06-16 Thread Joseph
Hi all,

I found an issue in SparkR; maybe it's a bug:

Reading a csv file works when the package is passed this way:
${SPARK_HOME}/bin/spark-submit --packages com.databricks:spark-csv_2.11:1.4.0 example.R

But the following way gives an error:
sc <- sparkR.init(sparkPackages="com.databricks:spark-csv_2.11:1.4.0")

16/06/17 09:54:12 ERROR RBackendHandler: loadDF on 
org.apache.spark.sql.api.r.SQLUtils failed
Error in invokeJava(isStatic = TRUE, className, methodName, ...) : 
  java.lang.ClassNotFoundException: Failed to find data source: csv. Please 
find packages at http://spark-packages.org
    at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.lookupDataSource(ResolvedDataSource.scala:77)

It is obvious that the sparkR.init() does not load the specified package!
-

Appendix:
The complete code for example.R:

if (nchar(Sys.getenv("SPARK_HOME")) < 1) {
  Sys.setenv(SPARK_HOME = "/home/hadoop/spark-1.6.1-bin-hadoop2.6")
}

library(SparkR, lib.loc = c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib")))

sc <- sparkR.init(master = "local[2]", sparkEnvir = 
list(spark.driver.memory="1g"), 
sparkPackages="com.databricks:spark-csv_2.11:1.4.0")

sqlContext <- sparkRSQL.init(sc)
people <- read.df(sqlContext, 
"file:/home/hadoop/spark-1.6.1-bin-hadoop2.6/data/mllib/sample_tree_data.csv", 
"csv")
registerTempTable(people, "people")
teenagers <- sql(sqlContext, "SELECT * FROM people")
head(teenagers)



Joseph


Re: Reporting warnings from workers

2016-06-16 Thread Mathieu Longtin
It turns out you can easily use a Python set, so I can send back a list of
failed files. Thanks.
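
For readers who want to try this, here is a minimal sketch of a set-valued 
accumulator, presumably what is meant above given the accumulators pointer in 
the quoted reply. The class name and the commented usage (including parse) 
are hypothetical; AccumulatorParam with zero() and addInPlace() is the 
PySpark extension point. The import fallback is only there so the class can 
be exercised without a Spark install.

```python
try:
    from pyspark.accumulators import AccumulatorParam
except ImportError:  # lets the class be exercised without a Spark install
    AccumulatorParam = object


class SetAccumulatorParam(AccumulatorParam):
    """Accumulator whose value is a Python set (hypothetical helper name)."""

    def zero(self, initial_value):
        # Starting value for each task-local copy of the accumulator.
        return set()

    def addInPlace(self, acc, other):
        # Merge two partial results by set union.
        acc |= other
        return acc


# Usage sketch on the driver, assuming an existing SparkContext `sc`:
# failed = sc.accumulator(set(), SetAccumulatorParam())
#
# def somefunction(path):
#     try:
#         return parse(path)          # hypothetical parser
#     except ValueError:
#         failed.add({path})          # add a one-element set
#         return None
#
# newrdd = rdd.map(somefunction)
# newrdd.count()                      # values arrive only after an action runs
# print(failed.value)
```

Because set union is idempotent, a task that is retried and reports the same 
file twice does not distort the result.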

On Wed, Jun 15, 2016 at 4:28 PM Ted Yu  wrote:

> Have you looked at:
>
> https://spark.apache.org/docs/latest/programming-guide.html#accumulators
>
> On Wed, Jun 15, 2016 at 1:24 PM, Mathieu Longtin 
> wrote:
>
>> Is there a way to report warnings from the workers back to the driver
>> process?
>>
>> Let's say I have an RDD and do this:
>>
>> newrdd = rdd.map(somefunction)
>>
>> In *somefunction*, I want to catch when there are invalid values in *rdd*
>> and either put them in another RDD or send some sort of message back.
>>
>> Is that possible?
>> --
>> Mathieu Longtin
>> 1-514-803-8977
>>
>
> --
Mathieu Longtin
1-514-803-8977


RE: Can I control the execution of Spark jobs?

2016-06-16 Thread Haopu Wang
Jacek,

For example, one ETL job saves raw events and updates a file.
The other job uses that file's content to process the data set.

In this case, the first job has to be done before the second one. That's what I 
mean by dependency. Any suggestions/comments are appreciated.

-Original Message-
From: Jacek Laskowski [mailto:ja...@japila.pl] 
Sent: June 16, 2016 19:09
To: user
Subject: Re: Can I control the execution of Spark jobs?

Hi,

When you say "several ETL types of things", what is this exactly? What
would an example of "dependency between these jobs" be?

Pozdrawiam,
Jacek Laskowski

https://medium.com/@jaceklaskowski/
Mastering Apache Spark http://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski


On Thu, Jun 16, 2016 at 11:36 AM, Haopu Wang  wrote:
> Hi,
>
>
>
> Suppose I have a spark application which is doing several ETL types of
> things.
>
> I understand Spark can analyze and generate several jobs to execute.
>
> The question is: is it possible to control the dependency between these
> jobs?
>
>
>
> Thanks!
>
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org





Re: Unable to execute sparkr jobs through Chronos

2016-06-16 Thread Sun Rui
I saw in the job definition an Env Var: SPARKR_MASTER. What is that for? I 
don’t think SparkR uses it.
> On Jun 17, 2016, at 10:08, Sun Rui  wrote:
> 
> It seems that spark master URL is not correct. What is it?
>> On Jun 16, 2016, at 18:57, Rodrick Brown wrote:
>> 
>> Master must start with yarn, spark, mesos, or local
> 



Re: Unable to execute sparkr jobs through Chronos

2016-06-16 Thread Sun Rui
It seems that spark master URL is not correct. What is it?
> On Jun 16, 2016, at 18:57, Rodrick Brown  wrote:
> 
> Master must start with yarn, spark, mesos, or local



RE: Spark SQL driver memory keeps rising

2016-06-16 Thread Mohammed Guller
I haven’t read the code yet, but when you invoke spark-submit, where are you 
specifying --master yarn --deploy-mode client? Is it in the default config file 
and are you sure that spark-submit is reading the right file?

Mohammed
Author: Big Data Analytics with 
Spark

From: Khaled Hammouda [mailto:khaled.hammo...@kik.com]
Sent: Thursday, June 16, 2016 11:45 AM
To: Mohammed Guller
Cc: user
Subject: Re: Spark SQL driver memory keeps rising

I'm using pyspark and running in YARN client mode. I managed to anonymize the 
code a bit and pasted it below.

You'll notice that I don't collect any output in the driver, instead the data 
is written to parquet directly. Also notice that I increased 
spark.driver.maxResultSize to 10g because the job was failing with the error 
"serialized results of x tasks () is bigger than spark.driver.maxResultSize 
(xxx)", which means the tasks were sending something to the driver, and that's 
probably what's causing the driver memory usage to keep rising. This happens at 
stages that read/write shuffled data (as opposed to input stages).

I also noticed this message appear many times in the output "INFO: 
MapOutputTrackerMasterEndpoint: Asked to send map output locations for shuffle 
4 to ip-xx-xx-xx-xx.ec2.internal:49490". I'm not sure if it's relevant.

Cluster setup:
- AWS EMR 4.7.1 using Spark 1.6.1
- 200 nodes (r3.2xlarge, 61 GB memory), and the master node is r3.4xlarge (122 
GB memory)
- 1.4 TB json logs split across ~70k files in S3


# spark-submit --driver-memory 100g --executor-memory 44g --executor-cores 8 \
#   --conf spark.yarn.executor.memoryOverhead=9000 --num-executors 199 \
#   log_cleansing.py

from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext

conf = SparkConf()
conf.setAppName('log_cleansing') \
    .set("spark.driver.maxResultSize", "10g") \
    .set("spark.hadoop.spark.sql.parquet.output.committer.class",
         "org.apache.spark.sql.execution.datasources.parquet.DirectParquetOutputCommitter") \
    .set("spark.hadoop.mapred.output.committer.class",
         "org.apache.hadoop.mapred.DirectFileOutputCommitter") \
    .set("spark.hadoop.mapreduce.use.directfileoutputcommitter", "true")
sc = SparkContext(conf=conf)

sqlContext = SQLContext(sc)
sqlContext.setConf("spark.sql.shuffle.partitions", "3600")

# raw data is stored in s3 and is split across 10s of thousands of files
# (each file is about 20MB, total data set size is around 1.4 TB)
# so we coalesce to a manageable number of partitions and store it in hdfs
text = sc.textFile("s3://json_logs/today/*/*").coalesce(400)
text.saveAsTextFile("hdfs:///json_logs/today")

# now read the json data set from hdfs
json_logs = sqlContext.read.json("hdfs:///json_logs/today")
sqlContext.registerDataFrameAsTable(json_logs, "json_logs")

# this cleans the date and only pulls in events within two days of their
# ingestion ts, and eliminates pure duplicates
clean_date_and_gap = sqlContext.sql(
 '''SELECT DISTINCT t.*
FROM json_logs t
WHERE datediff(to_utc_timestamp(`ingestionTime`, 'UTC'), 
to_utc_timestamp(`timestamp`, 'UTC')) < 2
  AND to_utc_timestamp(`timestamp`, 'UTC') IS NOT NULL''')
sqlContext.registerDataFrameAsTable(clean_date_and_gap, "clean_date_and_gap")
clean_date_and_gap.cache()

# extract unique event instanceIds and their min ts
distinct_intanceIds = sqlContext.sql(
 '''SELECT
  instanceId,
  min(to_utc_timestamp(`ingestionTime`, 'UTC')) min_ingestion_ts
FROM clean_date_and_gap
GROUP BY instanceId''')
sqlContext.registerDataFrameAsTable(distinct_intanceIds, "distinct_intanceIds")

# create table with only the min ingestion ts records
deduped_day = sqlContext.sql(
 '''SELECT t.*
FROM clean_date_and_gap t
JOIN distinct_intanceIds d
  ON (d.instanceId = t.instanceId AND
  to_utc_timestamp(`ingestionTime`, 'UTC') = d.min_ingestion_ts)''')

# drop weird columns
deduped_day = deduped_day.drop('weird column name1') \
 .drop('weird column name2') \
 .drop('weird column name3')

# standardize all column names so that Parquet is happy
# rename() adds clean alias names to hide problematic column names
oldCols = deduped_day.schema.names
deduped_day_1 = deduped_day.select(*(rename(oldCols)))

sqlContext.registerDataFrameAsTable(deduped_day_1, 'deduped_day_1')
deduped_day_1.cache()

clean_date_and_gap.unpersist()

# load prior days' instanceIds for deduping purposes
last1_instanceIds = sqlContext.read.parquet(
    "s3://unique_instanceids/today_minus_1day/*")
last2_instanceIds = sqlContext.read.parquet(
    "s3://unique_instanceids/today_minus_2day/*")
prior_instanceIds = last1_instanceIds.unionAll(last2_instanceIds).distinct().cache()
sqlContext.registerDataFrameAsTable(prior_instanceIds, 'prior_instanceIds')

# determine the overlap from prior days
overlap_from_prior_days = sqlContext.sql(
 '''SELECT DISTINCT
  d.Instanceid,
  case when y.Instanceid is not null then 1 else 0 end as seen_before
FROM deduped_day_1 d

Re: Spark Memory Error - Not enough space to cache broadcast

2016-06-16 Thread Deepak Goel
Seems like the executor memory is not enough for your job and it is writing
objects to disk.
On Jun 17, 2016 2:25 AM, "Cassa L"  wrote:

>
>
> On Thu, Jun 16, 2016 at 5:27 AM, Deepak Goel  wrote:
>
>> What is your hardware configuration like which you are running Spark on?
>>
>> It  is 24core, 128GB RAM
>
>>
>> On Thu, Jun 16, 2016 at 5:33 PM, Jacek Laskowski  wrote:
>>
>>> Hi,
>>>
>>> What do you see under Executors and Details for Stage (for the
>>> affected stages)? Anything weird memory-related?
>>>
>>> How does your "I am reading data from Kafka into Spark and writing it
>>> into Cassandra after processing it." pipeline look like?
>>>
>>> Pozdrawiam,
>>> Jacek Laskowski
>>> 
>>> https://medium.com/@jaceklaskowski/
>>> Mastering Apache Spark http://bit.ly/mastering-apache-spark
>>> Follow me at https://twitter.com/jaceklaskowski
>>>
>>>
>>> On Mon, Jun 13, 2016 at 11:56 PM, Cassa L  wrote:
>>> > Hi,
>>> >
>>> > I'm using spark 1.5.1 version. I am reading data from Kafka into Spark
>>> and
>>> > writing it into Cassandra after processing it. Spark job starts fine
>>> and
>>> > runs all good for some time until I start getting below errors. Once
>>> these
>>> > errors come, job start to lag behind and I see that job has scheduling
>>> and
>>> > processing delays in streaming  UI.
>>> >
>>> > Worker memory is 6GB, executor-memory is 5GB, I also tried to tweak
>>> > memoryFraction parameters. Nothing works.
>>> >
>>> >
>>> > 16/06/13 21:26:02 INFO MemoryStore: ensureFreeSpace(4044) called with
>>> > curMem=565394, maxMem=2778495713
>>> > 16/06/13 21:26:02 INFO MemoryStore: Block broadcast_69652_piece0
>>> stored as
>>> > bytes in memory (estimated size 3.9 KB, free 2.6 GB)
>>> > 16/06/13 21:26:02 INFO TorrentBroadcast: Reading broadcast variable
>>> 69652
>>> > took 2 ms
>>> > 16/06/13 21:26:02 WARN MemoryStore: Failed to reserve initial memory
>>> > threshold of 1024.0 KB for computing block broadcast_69652 in memory.
>>> > 16/06/13 21:26:02 WARN MemoryStore: Not enough space to cache
>>> > broadcast_69652 in memory! (computed 496.0 B so far)
>>> > 16/06/13 21:26:02 INFO MemoryStore: Memory use = 556.1 KB (blocks) +
>>> 2.6 GB
>>> > (scratch space shared across 0 tasks(s)) = 2.6 GB. Storage limit = 2.6
>>> GB.
>>> > 16/06/13 21:26:02 WARN MemoryStore: Persisting block broadcast_69652
>>> to disk
>>> > instead.
>>> > 16/06/13 21:26:02 INFO BlockManager: Found block rdd_100761_1 locally
>>> > 16/06/13 21:26:02 INFO Executor: Finished task 0.0 in stage 71577.0
>>> (TID
>>> > 452316). 2043 bytes result sent to driver
>>> >
>>> >
>>> > Thanks,
>>> >
>>> > L
>>>
>>>
>>>
>>
>


Skew data

2016-06-16 Thread Selvam Raman
Hi,

What is skewed data?

I read that if the data is skewed, a join can take a long time to finish
(99 percent of the tasks finish in seconds, while the remaining 1 percent
take minutes to hours).

How do I handle skewed data in Spark?

Thanks,
Selvam R
+91-97877-87724


Re: how to load compressed (gzip) csv file using spark-csv

2016-06-16 Thread Hyukjin Kwon
It will 'auto-detect' the compression codec by the file extension and then
will decompress and read it correctly.
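
A minimal sketch of what that looks like from PySpark, assuming Spark 1.4+ 
with the spark-csv package on the classpath. The helper name, the example 
path, and the header option are assumptions for illustration; the point is 
only that no 'codec' option is passed on read.

```python
def read_gzip_csv(sqlContext, path):
    """Load a gzip-compressed CSV through spark-csv.

    No 'codec' option is passed on read: the codec is inferred from the
    '.gz' file extension, as described above.
    """
    return (sqlContext.read
            .format("com.databricks.spark.csv")
            .option("header", "true")   # assumption: the file has a header row
            .load(path))


# Usage (hypothetical path), with an existing SQLContext:
# df = read_gzip_csv(sqlContext, "hdfs:///data/events.csv.gz")
```

Note that a gzip file is not splittable, so each .gz file is read as a single 
partition; repartition after loading if the file is large.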

Thanks!

2016-06-16 20:27 GMT+09:00 Vamsi Krishna :

> Hi,
>
> I'm using Spark 1.4.1 (HDP 2.3.2).
> As per the spark-csv documentation (
> https://github.com/databricks/spark-csv), I see that we can write to a
> csv file in compressed form using the 'codec' option.
> But I didn't see support for a 'codec' option when reading a csv file.
>
> Is there a way to read a compressed (gzip) file using spark-csv?
>
> Thanks,
> Vamsi Attluri
> --
> Vamsi Attluri
>


Spark Streaming WAL issue**: File exists and there is no append support!

2016-06-16 Thread tosaigan...@gmail.com
Hello

I am using Azure Blob storage for WAL persistence. I am getting the below
warnings in the driver logs.

Is it something related to compatibility/throttling issues with the storage?

java.lang.IllegalStateException: File exists and there is no append support!
    at org.apache.spark.streaming.util.HdfsUtils$.getOutputStream(HdfsUtils.scala:35)
    at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.org$apache$spark$streaming$util$FileBasedWriteAheadLogWriter$$stream$lzycompute(FileBasedWriteAheadLogWriter.scala:33)
    at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.org$apache$spark$streaming$util$FileBasedWriteAheadLogWriter$$stream(FileBasedWriteAheadLogWriter.scala:33)
    at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.<init>(FileBasedWriteAheadLogWriter.scala:41)
    at org.apache.spark.streaming.util.FileBasedWriteAheadLog.getLogWriter(FileBasedWriteAheadLog.scala:217)
    at org.apache.spark.streaming.util.FileBasedWriteAheadLog.write(FileBasedWriteAheadLog.scala:86)
    at org.apache.spark.streaming.util.FileBasedWriteAheadLog.write(FileBasedWriteAheadLog.scala:48)
    at org.apache.spark.streaming.util.BatchedWriteAheadLog.org$apache$spark$streaming$util$BatchedWriteAheadLog$$flushRecords(BatchedWriteAheadLog.scala:173)
    at org.apache.spark



-
Sai Ganesh
--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-WAL-issue-File-exists-and-there-is-no-append-support-tp27185.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Neither previous window has value for key, nor new values found.

2016-06-16 Thread N B
That post from TD that you reference has a good explanation of the issue
you are encountering. The issue in my view here is that the reduce and the
inverseReduce function that you have specified are not perfect opposites of
each other. Consider the following strings:

"a"
"b"
"a"

forward reduce will form them into "aba".
When the first "a" falls off the window, your inverse reduce function will
produce "b" and not "ba" as would be required by the opposite of the
concatenation. Now when the second "a" falls off, the check for this being
inconsistent is triggered with the exception you see.
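
The example above can be reproduced in plain Python (str.replace removes 
every occurrence, like Java's String.replace). The Counter-based pair at the 
end is only one possible invertible alternative and is an assumption, not 
something from the original code; it gives up the ordering of the values.

```python
from collections import Counter


def append_string(s1, s2):
    return s1 + s2


def remove_string(s1, s2):
    # Same behaviour as Java's String.replace: removes EVERY occurrence of s2.
    return s1.replace(s2, "")


window = append_string(append_string("a", "b"), "a")    # "aba"
after_first_a_leaves = remove_string(window, "a")        # "b", but "ba" was needed


# One invertible alternative (an assumption, not from the thread): keep a
# multiset of values, so a removal undoes exactly one earlier addition.
def add_values(c1, c2):
    return c1 + c2


def remove_values(c1, c2):
    return c1 - c2


counts = add_values(add_values(Counter(["a"]), Counter(["b"])), Counter(["a"]))
counts_after = remove_values(counts, Counter(["a"]))     # one "a" and one "b" remain
```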

HTH
NB


On Fri, Jun 10, 2016 at 1:19 PM, Marco1982  wrote:

> Hi all,
>
> I'm running a Spark Streaming application that uses reduceByKeyAndWindow().
> The window interval is 2 hours, while the slide interval is 1 hour. I have
> a JavaPairRDD in which both keys and values are strings. Each time the
> reduceByKeyAndWindow() function is called, it uses the appendString() and
> removeString() functions below to incrementally build a windowed stream of
> data:
>
> Function2<String, String, String> appendString =
>     new Function2<String, String, String>() {
>   @Override
>   public String call(String s1, String s2) {
>     return s1 + s2;
>   }
> };
>
> Function2<String, String, String> removeString =
>     new Function2<String, String, String>() {
>   @Override
>   public String call(String s1, String s2) {
>     return s1.replace(s2, "");
>   }
> };
>
> filterEmptyRecords() removes keys that eventually won't contain any value:
>
> Function<scala.Tuple2<String, String>, Boolean> filterEmptyRecords =
>     new Function<scala.Tuple2<String, String>, Boolean>() {
>   @Override
>   public Boolean call(scala.Tuple2<String, String> t) {
>     return (!t._2().isEmpty());
>   }
> };
>
> The windowed operation is then:
>
> JavaPairDStream<String, String> cdr_kv =
>     cdr_filtered.reduceByKeyAndWindow(appendString, removeString,
>         Durations.seconds(WINDOW_DURATION), Durations.seconds(SLIDE_DURATION),
>         PARTITIONS, filterEmptyRecords);
>
> After a few hours of operation, this function raises the following
> exception:
> "Neither previous window has value for key, nor new values found. Are you
> sure your key class hashes consistently?"
>
> I've found this post from 2013:
> https://groups.google.com/forum/#!msg/spark-users/9OM1YvWzwgE/PhFgdSTP2OQJ
> which however doesn't solve my problem. I'm using String to represent keys,
> which I'm pretty sure hash consistently.
>
> Any clue why this happens and possible suggestions to fix it?
>
> Thanks!
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Neither-previous-window-has-value-for-key-nor-new-values-found-tp27140.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
>
>


Re: Spark Kafka stream processing time increasing gradually

2016-06-16 Thread N B
We had this same issue with the reduceByKeyAndWindow API that you are
using. To fix it, you have to use a different flavor of that API,
specifically the 2 versions that allow you to give a 'Filter function'
to them. Putting in the filter functions helped stabilize our application
too.
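
To illustrate why the filter matters, here is a plain-Python sketch of the 
incremental bookkeeping (it mimics reduce / inverse reduce and is not Spark 
code; slide_window and the sample URLs are made up). Without a filter, keys 
whose count has dropped to 0 stay in the window state forever, so the state 
keeps growing. The commented PySpark call at the end uses the filterFunc 
parameter of reduceByKeyAndWindow, with the 30 minute window and 60 second 
slide from the original post; the inverse-reduce form also requires 
checkpointing to be enabled.

```python
def slide_window(state, entering, leaving, filter_func=None):
    """One slide of an incremental windowed count, in plain Python.

    state, entering and leaving map key -> count.
    """
    new_state = dict(state)
    for key, n in entering.items():
        new_state[key] = new_state.get(key, 0) + n       # reduce func
    for key, n in leaving.items():
        new_state[key] = new_state.get(key, 0) - n       # inverse reduce func
    if filter_func is not None:
        # Keep only the (key, value) pairs that satisfy the filter.
        new_state = {k: v for k, v in new_state.items() if filter_func((k, v))}
    return new_state


batch1 = {"http://a": 2}
batch2 = {"http://b": 1}

# Without a filter, the expired key stays in the state with a count of 0.
unfiltered = slide_window(slide_window({}, batch1, {}), batch2, batch1)


# With a filter, keys whose count dropped to 0 are removed.
def keep_nonzero(kv):
    return kv[1] > 0


filtered = slide_window(slide_window({}, batch1, {}, keep_nonzero),
                        batch2, batch1, keep_nonzero)

# The corresponding PySpark call would look roughly like this:
# counts = pairs.reduceByKeyAndWindow(lambda a, b: a + b,
#                                     lambda a, b: a - b,
#                                     1800, 60,
#                                     filterFunc=lambda kv: kv[1] > 0)
```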

HTH
NB


On Sun, Jun 12, 2016 at 11:19 PM, Roshan Singh 
wrote:

> Hi all,
> I have a python streaming job which is supposed to run 24x7. I am unable
> to stabilize it. The job just counts the number of links shared in a 30
> minute sliding window. I am using the reduceByKeyAndWindow operation with
> a batch of 30 seconds and a slide interval of 60 seconds.
>
> The kafka queue has a rate of nearly 2200 messages/second which can
> increase to 3000 but the mean is 2200.
>
> I have played around with batch size, slide interval, and by increasing
> parallelism with no fruitful result. These just delay the destabilization.
>
> GC time is usually between 60-100 ms.
>
> I also noticed in the Spark UI that the jobs were not distributed to other
> nodes, so I configured spark.locality.wait as 100ms. After that, the job
> is getting distributed properly.
>
> I have a cluster of 6 slaves and one master each with 16 cores and 15gb of
> ram.
>
> Code and configuration: http://pastebin.com/93DMSiji
>
> Streaming screenshot: http://imgur.com/psNfjwJ
>
> I need help in debugging the issue. Any help will be appreciated.
>
> --
> Roshan Singh
>
>


Update Batch DF with Streaming

2016-06-16 Thread Amit Assudani
Hi All,


Can I update batch data frames loaded in memory with streaming data?

For example:

I have an employee DF registered as a temporary table. It has EmployeeID, Name, 
Address, etc. fields; assume it is very big and takes time to load in 
memory.

I have two types of employee events (both with the empID bundled in the 
payload) coming in as streams:

1) one that looks up a particular empID in the batch data, does some 
calculation, and persists the results,

2) one that has updated values of some of the fields for an empID.

I want to keep the employee DF up to date with the updates coming in type 2 
events, for future type 1 events to use.

The question is: can I update the employee DF in memory with type 2 events, 
or do I need to refresh the whole DF?

P.S. I can join the stream with the batch data and get the joined table, but I 
am not sure how to get and use a handle to the joined data for subsequent 
events.


Regards,

Amit










Re: Spark jobs without a login

2016-06-16 Thread Ted Yu
Can you describe more about the container ?

Please show complete stack trace for the exception.

Thanks

On Thu, Jun 16, 2016 at 1:32 PM, jay vyas 
wrote:

> Hi spark:
>
> Is it possible to avoid reliance on a login user when running a spark job?
>
> I'm running out of a container that doesn't supply a valid user name,
> and thus, I'm getting the following exception:
>
>
> org.apache.hadoop.security.UserGroupInformation.getLoginUser(UserGroupInformation.java:675)
>
> I'm not too worried about this - but it seems like it might be nice if
> maybe we could specify a user name as part of Spark's context or as part of
> an external parameter rather than having to use the Java-based
> user/group extractor.
>
>
>
> --
> jay vyas
>


Re: Spark Memory Error - Not enough space to cache broadcast

2016-06-16 Thread Cassa L
On Thu, Jun 16, 2016 at 5:27 AM, Deepak Goel  wrote:

> What is the hardware configuration you are running Spark on?

It is 24 cores, 128 GB RAM.

>
> On Thu, Jun 16, 2016 at 5:33 PM, Jacek Laskowski  wrote:
>
>> Hi,
>>
>> What do you see under Executors and Details for Stage (for the
>> affected stages)? Anything weird memory-related?
>>
>> How does your "I am reading data from Kafka into Spark and writing it
>> into Cassandra after processing it." pipeline look like?
>>
>> Pozdrawiam,
>> Jacek Laskowski
>> 
>> https://medium.com/@jaceklaskowski/
>> Mastering Apache Spark http://bit.ly/mastering-apache-spark
>> Follow me at https://twitter.com/jaceklaskowski
>>
>>
>> On Mon, Jun 13, 2016 at 11:56 PM, Cassa L  wrote:
>> > Hi,
>> >
>> > I'm using Spark version 1.5.1. I am reading data from Kafka into Spark and
>> > writing it into Cassandra after processing it. The Spark job starts fine and
>> > runs well for some time until I start getting the errors below. Once these
>> > errors appear, the job starts to lag behind and I see scheduling and
>> > processing delays in the streaming UI.
>> >
>> > Worker memory is 6 GB and executor-memory is 5 GB. I also tried to tweak the
>> > memoryFraction parameters. Nothing works.
>> >
>> >
>> > 16/06/13 21:26:02 INFO MemoryStore: ensureFreeSpace(4044) called with curMem=565394, maxMem=2778495713
>> > 16/06/13 21:26:02 INFO MemoryStore: Block broadcast_69652_piece0 stored as bytes in memory (estimated size 3.9 KB, free 2.6 GB)
>> > 16/06/13 21:26:02 INFO TorrentBroadcast: Reading broadcast variable 69652 took 2 ms
>> > 16/06/13 21:26:02 WARN MemoryStore: Failed to reserve initial memory threshold of 1024.0 KB for computing block broadcast_69652 in memory.
>> > 16/06/13 21:26:02 WARN MemoryStore: Not enough space to cache broadcast_69652 in memory! (computed 496.0 B so far)
>> > 16/06/13 21:26:02 INFO MemoryStore: Memory use = 556.1 KB (blocks) + 2.6 GB (scratch space shared across 0 tasks(s)) = 2.6 GB. Storage limit = 2.6 GB.
>> > 16/06/13 21:26:02 WARN MemoryStore: Persisting block broadcast_69652 to disk instead.
>> > 16/06/13 21:26:02 INFO BlockManager: Found block rdd_100761_1 locally
>> > 16/06/13 21:26:02 INFO Executor: Finished task 0.0 in stage 71577.0 (TID 452316). 2043 bytes result sent to driver
>> >
>> >
>> > Thanks,
>> >
>> > L
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>
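
The maxMem figure in the log quoted above can be related to the executor heap with a little arithmetic. The sketch below assumes the legacy Spark 1.5 memory model with its default settings (spark.storage.memoryFraction = 0.6 and spark.storage.safetyFraction = 0.9); the thread mentions that memoryFraction was tweaked at some point, in which case the numbers shift accordingly. The function names are illustrative only.

```python
# Relate the MemoryStore's maxMem (bytes, from the log) to the executor heap.
# Assumption: Spark 1.5 legacy memory model with default fractions.
GIB = 1024 ** 3

MEMORY_FRACTION = 0.6   # spark.storage.memoryFraction (default)
SAFETY_FRACTION = 0.9   # spark.storage.safetyFraction (default)


def storage_limit_bytes(heap_bytes):
    """Storage memory available to the MemoryStore for a given usable heap."""
    return heap_bytes * MEMORY_FRACTION * SAFETY_FRACTION


def implied_heap_bytes(max_mem_bytes):
    """Usable JVM heap implied by a logged maxMem value."""
    return max_mem_bytes / (MEMORY_FRACTION * SAFETY_FRACTION)


max_mem = 2778495713  # maxMem from the log
print("storage limit: %.1f GiB" % (max_mem / GIB))
print("implied heap:  %.1f GiB" % (implied_heap_bytes(max_mem) / GIB))
```

Under these assumptions the implied heap is roughly 4.8 GiB, which is consistent with the 5 GB executor-memory setting. The block being cached is tiny (496 B computed so far), so the warning appears to be about how the scratch space is accounted for rather than about the size of the broadcast itself.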


Re: Spark Memory Error - Not enough space to cache broadcast

2016-06-16 Thread Cassa L
Hi,

>
> What do you see under Executors and Details for Stage (for the
> affected stages)? Anything weird memory-related?
>
Under the Executors tab, the logs show these warnings:

16/06/16 20:45:40 INFO TorrentBroadcast: Reading broadcast variable 422145 took 1 ms
16/06/16 20:45:40 WARN MemoryStore: Failed to reserve initial memory threshold of 1024.0 KB for computing block broadcast_422145 in memory.
16/06/16 20:45:40 WARN MemoryStore: Not enough space to cache broadcast_422145 in memory! (computed 496.0 B so far)
16/06/16 20:45:40 INFO MemoryStore: Memory use = 147.9 KB (blocks) + 2.2 GB (scratch space shared across 0 tasks(s)) = 2.2 GB. Storage limit = 2.2 GB.
16/06/16 20:45:40 WARN MemoryStore: Persisting block broadcast_422145 to disk instead.
16/06/16 20:45:40 INFO MapOutputTrackerWorker: Don't have map outputs for shuffle 70278, fetching them
16/06/16 20:45:40 INFO MapOutputTrackerWorker: Doing the fetch; tracker endpoint = AkkaRpcEndpointRef(Actor[akka.tcp://sparkDriver@17.40.240.71:46187/user/MapOutputTracker#-1794035569])

I don't see any memory-related errors on the Stages tab.

>
> How does your "I am reading data from Kafka into Spark and writing it
> into Cassandra after processing it." pipeline look like?
>
This part has no issues. Reading from Kafka is always up to date. There
are no offset lags. Writing to Cassandra is also fine, with less than 1 ms
to write data.


> Pozdrawiam,
> Jacek Laskowski
> 
> https://medium.com/@jaceklaskowski/
> Mastering Apache Spark http://bit.ly/mastering-apache-spark
> Follow me at https://twitter.com/jaceklaskowski
>
>
> On Mon, Jun 13, 2016 at 11:56 PM, Cassa L  wrote:
> > Hi,
> >
> > I'm using Spark version 1.5.1. I am reading data from Kafka into Spark and
> > writing it into Cassandra after processing it. The Spark job starts fine and
> > runs well for some time until I start getting the errors below. Once these
> > errors appear, the job starts to lag behind and I see scheduling and
> > processing delays in the streaming UI.
> >
> > Worker memory is 6 GB and executor-memory is 5 GB. I also tried to tweak the
> > memoryFraction parameters. Nothing works.
> >
> >
> > 16/06/13 21:26:02 INFO MemoryStore: ensureFreeSpace(4044) called with curMem=565394, maxMem=2778495713
> > 16/06/13 21:26:02 INFO MemoryStore: Block broadcast_69652_piece0 stored as bytes in memory (estimated size 3.9 KB, free 2.6 GB)
> > 16/06/13 21:26:02 INFO TorrentBroadcast: Reading broadcast variable 69652 took 2 ms
> > 16/06/13 21:26:02 WARN MemoryStore: Failed to reserve initial memory threshold of 1024.0 KB for computing block broadcast_69652 in memory.
> > 16/06/13 21:26:02 WARN MemoryStore: Not enough space to cache broadcast_69652 in memory! (computed 496.0 B so far)
> > 16/06/13 21:26:02 INFO MemoryStore: Memory use = 556.1 KB (blocks) + 2.6 GB (scratch space shared across 0 tasks(s)) = 2.6 GB. Storage limit = 2.6 GB.
> > 16/06/13 21:26:02 WARN MemoryStore: Persisting block broadcast_69652 to disk instead.
> > 16/06/13 21:26:02 INFO BlockManager: Found block rdd_100761_1 locally
> > 16/06/13 21:26:02 INFO Executor: Finished task 0.0 in stage 71577.0 (TID 452316). 2043 bytes result sent to driver
> >
> >
> > Thanks,
> >
> > L
>


Re: How to enable core dump in spark

2016-06-16 Thread prateek arora
hi

I am using Spark with YARN. How can I make sure that the ulimit settings
are applied to the Spark process?

I set the core dump limit to unlimited on all nodes by editing
/etc/security/limits.conf and adding the line " * soft core unlimited ".

I rechecked using:  $ ulimit -all

core file size          (blocks, -c) unlimited
data seg size           (kbytes, -d) unlimited
scheduling priority             (-e) 0
file size               (blocks, -f) unlimited
pending signals                 (-i) 241204
max locked memory       (kbytes, -l) 64
max memory size         (kbytes, -m) unlimited
open files                      (-n) 1024
pipe size            (512 bytes, -p) 8
POSIX message queues     (bytes, -q) 819200
real-time priority              (-r) 0
stack size              (kbytes, -s) 8192
cpu time               (seconds, -t) unlimited
max user processes              (-u) 241204
virtual memory          (kbytes, -v) unlimited
file locks                      (-x) unlimited

Regards
Prateek


On Thu, Jun 16, 2016 at 4:46 AM, Jacek Laskowski  wrote:

> Hi,
>
> Can you make sure that the ulimit settings are applied to the Spark
> process? Is this Spark on YARN or Standalone?
>
> Pozdrawiam,
> Jacek Laskowski
> 
> https://medium.com/@jaceklaskowski/
> Mastering Apache Spark http://bit.ly/mastering-apache-spark
> Follow me at https://twitter.com/jaceklaskowski
>
>
> On Wed, Jun 1, 2016 at 7:55 PM, prateek arora
>  wrote:
> > Hi
> >
> > I am using cloudera to  setup spark 1.6.0  on ubuntu 14.04 .
> >
> > I set core dump limit to unlimited in all nodes .
> >Edit  /etc/security/limits.conf file and add  " * soft core unlimited
> "
> > line.
> >
> > i rechecked  using :  $ ulimit -all
> >
> > core file size  (blocks, -c) unlimited
> > data seg size   (kbytes, -d) unlimited
> > scheduling priority (-e) 0
> > file size   (blocks, -f) unlimited
> > pending signals (-i) 241204
> > max locked memory   (kbytes, -l) 64
> > max memory size (kbytes, -m) unlimited
> > open files  (-n) 1024
> > pipe size(512 bytes, -p) 8
> > POSIX message queues (bytes, -q) 819200
> > real-time priority  (-r) 0
> > stack size  (kbytes, -s) 8192
> > cpu time   (seconds, -t) unlimited
> > max user processes  (-u) 241204
> > virtual memory  (kbytes, -v) unlimited
> > file locks  (-x) unlimited
> >
> > but when I am running my spark application with some third party native
> > libraries . but it crashes some time and show error " Failed to write
> core
> > dump. Core dumps have been disabled. To enable core dumping, try "ulimit
> -c
> > unlimited" before starting Java again " .
> >
> > Below are the log :
> >
> >  A fatal error has been detected by the Java Runtime Environment:
> > #
> > #  SIGSEGV (0xb) at pc=0x7fd44b491fb9, pid=20458, tid=140549318547200
> > #
> > # JRE version: Java(TM) SE Runtime Environment (7.0_67-b01) (build
> > 1.7.0_67-b01)
> > # Java VM: Java HotSpot(TM) 64-Bit Server VM (24.65-b04 mixed mode
> > linux-amd64 compressed oops)
> > # Problematic frame:
> > # V  [libjvm.so+0x650fb9]  jni_SetByteArrayRegion+0xa9
> > #
> > # Failed to write core dump. Core dumps have been disabled. To enable
> core
> > dumping, try "ulimit -c unlimited" before starting Java again
> > #
> > # An error report file with more information is saved as:
> > #
> >
> /yarn/nm/usercache/master/appcache/application_1462930975871_0004/container_1462930975871_0004_01_66/hs_err_pid20458.log
> > #
> > # If you would like to submit a bug report, please visit:
> > #   http://bugreport.sun.com/bugreport/crash.jsp
> > #
> >
> >
> > so how can i enable core dump and save it some place ?
> >
> > Regards
> > Prateek
> >
> >
> >
> > --
> > View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-enable-core-dump-in-spark-tp27065.html
> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
> >
> >
>
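
The `ulimit` output in this thread describes the interactive shell, not necessarily the JVM that YARN launches for the container. One way to check what a running process actually received is to read /proc/<pid>/limits on the node. The sketch below is a minimal example assuming Linux; the function names and the sample text are illustrative, and the pid would come from the node itself (the crash report above is named after pid 20458, for instance). Since limits.conf is applied at login time, a daemon started by the init system may not pick it up.

```python
# Check the core-dump limit that a *running* process actually has, by parsing
# the text of /proc/<pid>/limits (Linux only).


def core_limits(limits_text):
    """Return (soft, hard) 'Max core file size' values from limits text."""
    label = "Max core file size"
    for line in limits_text.splitlines():
        if line.startswith(label):
            fields = line[len(label):].split()
            return fields[0], fields[1]
    raise ValueError("no 'Max core file size' line found")


def core_limits_for_pid(pid):
    """Read the limits of a live process, e.g. a YARN container's JVM."""
    with open("/proc/%d/limits" % pid) as f:
        return core_limits(f.read())


# Illustrative sample in the format the kernel prints (values are made up).
SAMPLE = """\
Limit                     Soft Limit           Hard Limit           Units
Max cpu time              unlimited            unlimited            seconds
Max core file size        0                    unlimited            bytes
Max open files            1024                 4096                 files
"""

soft, hard = core_limits(SAMPLE)
print("soft:", soft, "hard:", hard)  # a soft limit of 0 disables core dumps
```

If the soft limit reported for the executor JVM is 0, the setting has not reached that process, whatever the login shell shows.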


Spark jobs without a login

2016-06-16 Thread jay vyas
Hi spark:

Is it possible to avoid reliance on a login user when running a spark job?

I'm running out of a container that doesn't supply a valid user name,
and thus I'm getting the following exception:

org.apache.hadoop.security.UserGroupInformation.getLoginUser(UserGroupInformation.java:675)

I'm not too worried about this, but it seems like it might be nice if
we could specify a user name as part of Spark's context or as
an external parameter rather than having to
use the Java-based user/group extractor.



-- 
jay vyas


Re: choice of RDD function

2016-06-16 Thread Sivakumaran S
Dear Jacek and Cody,


I receive a stream of JSON (exactly this much: 4 JSON objects) once every 30
seconds from Kafka, as follows (I have changed my data source to include more
fields):
{"windspeed":4.23,"pressure":1008.39,"location":"Dundee","latitude":56.5,"longitude":-2.96667,"id":2650752,"humidity":97.0,"temp":12.54,"winddirection":12.0003}
{"windspeed":4.23,"pressure":1008.39,"location":"Saint Andrews","latitude":56.338711,"longitude":-2.79902,"id":2638864,"humidity":97.0,"temp":12.54,"winddirection":12.0003}
{"windspeed":5.53,"pressure":1016.25,"location":"Arbroath","latitude":56.563171,"longitude":-2.58736,"id":2657215,"humidity":96.0,"temp":11.59,"winddirection":9.50031}
{"windspeed":4.73,"pressure":994.0,"location":"Aberdeen","latitude":57.143688,"longitude":-2.09814,"id":2657832,"humidity":1.0,"temp":0.0,"winddirection":357.0}
{"windspeed":6.13,"pressure":994.0,"location":"Peterhead","latitude":57.50584,"longitude":-1.79806,"id":2640351,"humidity":1.0,"temp":0.0,"winddirection":8.50031}

In my Spark app, I have set the batch duration to 60 seconds. As per the
1.6.1 documentation, "Spark SQL can automatically infer the schema of a JSON
dataset and load it as a DataFrame. This conversion can be done using
SQLContext.read.json() on either an RDD of String, or a JSON file." But what
both of you pointed out is correct: it consumes the RDD twice, and I do not
understand why. Below is a snapshot of the DAG.

I do not need stateful calculations and I need to write this result to a 
database at a later stage. Any input to improve this solution is appreciated. 




Regards,

Siva
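
The double consumption discussed in this thread can be pictured without Spark. The sketch below is plain Python, not the Spark API: `CountingSource`, `infer_schema` and `load` are hypothetical stand-ins that only show why inferring a schema needs one full pass over the records before the pass that builds the rows, and why supplying the schema up front removes the first pass.

```python
import json


class CountingSource:
    """Stands in for an RDD of JSON strings and counts full passes over it."""

    def __init__(self, lines):
        self.lines = list(lines)
        self.passes = 0

    def __iter__(self):
        self.passes += 1
        return iter(self.lines)


def infer_schema(source):
    """Pass over every record to collect field names and value types."""
    schema = {}
    for line in source:
        for key, value in json.loads(line).items():
            schema.setdefault(key, type(value).__name__)
    return schema


def load(source, schema=None):
    """Parse records; infer the schema first when none is supplied."""
    if schema is None:
        schema = infer_schema(source)  # extra pass over the data
    rows = []
    for line in source:
        record = json.loads(line)
        rows.append({key: record.get(key) for key in schema})
    return schema, rows


LINES = [
    '{"id": 2650752, "temp": 12.54, "windspeed": 4.23}',
    '{"id": 2657215, "temp": 11.59, "windspeed": 5.53}',
]

inferred = CountingSource(LINES)
load(inferred)
explicit = CountingSource(LINES)
load(explicit, schema={"id": "int", "temp": "float", "windspeed": "float"})
print("passes with inference:", inferred.passes)
print("passes with explicit schema:", explicit.passes)
```

In Spark itself the analogous step would be to hand a schema to the reader with `sqlContext.read.schema(...)` before calling `.json(rdd)`, so that the inference pass is not needed.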

> On 16-Jun-2016, at 12:48 PM, Sivakumaran S  wrote:
> 
> Hi Jacek and Cody,
> 
> First of all, thanks for helping me out.
> 
> I started with using combineByKey while testing with just one field. Of 
> course it worked fine, but I was worried that the code would become 
> unreadable if there were many fields. Which is why I shifted to sqlContext 
> because the code is comprehensible. Let me work out the stream statistics and 
> update you in a while. 
> 
> 
> 
> Regards,
> 
> Siva
> 
> 
> 
>> On 16-Jun-2016, at 11:29 AM, Jacek Laskowski  wrote:
>> 
>> Rather
>> 
>> val df = sqlContext.read.json(rdd)
>> 
>> Pozdrawiam,
>> Jacek Laskowski
>> 
>> https://medium.com/@jaceklaskowski/
>> Mastering Apache Spark http://bit.ly/mastering-apache-spark
>> Follow me at https://twitter.com/jaceklaskowski
>> 
>> 
>> On Wed, Jun 15, 2016 at 11:55 PM, Sivakumaran S  wrote:
>>> Cody,
>>> 
>>> Are you referring to the  val lines = messages.map(_._2)?
>>> 
>>> Regards,
>>> 
>>> Siva
>>> 
>>>> On 15-Jun-2016, at 10:32 PM, Cody Koeninger  wrote:
>>>>
>>>> Doesn't that result in consuming each RDD twice, in order to infer the
>>>> json schema?
>>>>
>>>> On Wed, Jun 15, 2016 at 11:19 AM, Sivakumaran S
>>>> wrote:
> Of course :)
> 
> import kafka.serializer.StringDecoder
> import org.apache.spark.{SparkConf, SparkContext}
> import org.apache.spark.sql.SQLContext
> import org.apache.spark.streaming.{Seconds, StreamingContext}
> import org.apache.spark.streaming.kafka.KafkaUtils
>
> object sparkStreaming {
>   def main(args: Array[String]) {
>     // Set reasonable logging levels for streaming if the user has not
>     // configured log4j (helper from the Spark streaming examples).
>     StreamingExamples.setStreamingLogLevels()
>     val topics = "test"
>     val brokers = "localhost:9092"
>     val topicsSet = topics.split(",").toSet
>     // Alternative master: spark://localhost:7077
>     val sparkConf = new SparkConf().setAppName("KafkaDroneCalc").setMaster("local")
>     val sc = new SparkContext(sparkConf)
>     val ssc = new StreamingContext(sc, Seconds(30))
>     val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
>     val messages = KafkaUtils.createDirectStream[String, String,
>       StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
>     // Keep only the message value (the JSON string) of each Kafka record.
>     val lines = messages.map(_._2)
>     val sqlContext = new SQLContext(sc)
>     lines.foreachRDD { rdd =>
>       val df = sqlContext.read.json(rdd)
>       df.registerTempTable("drone")
>       sqlContext.sql("SELECT id, AVG(temp), AVG(rotor_rpm), " +
>         "AVG(winddirection), AVG(windspeed) FROM drone GROUP BY id").show()
>     }
>     ssc.start()
>     ssc.awaitTermination()
>   }
> }
> 
> I haven’t checked long running performance though.
> 
> Regards,
> 
> Siva
> 
> On 15-Jun-2016, at 5:02 PM, Jacek Laskowski  wrote:
> 
> Hi,
> 
> Good to hear so! Mind sharing a few snippets of your solution?
> 
> Pozdrawiam,
> Jacek Laskowski
> 
> https://medium.com/@jaceklaskowski/
> Mastering Apache Spark http://bit.ly/mastering-apache-spark
> Follow me at https://twitter.com/jaceklaskowski
> 
> 
> On Wed, Jun 15, 2016 at 5:03 PM, Sivakumaran S  
> wrote:
> 
> Thanks Jacek,
> 
> Job completed!! :) Just used data frames and sql query. Very clean and
> functional code.
> 
> Siva
> 
> On 15-Jun-2016, at 3:10 PM, Jacek Laskowski  wrote:
> 
> 

spark sql broadcast join ?

2016-06-16 Thread kali.tumm...@gmail.com
Hi All, 

I have used broadcast joins in Spark Scala applications, where I used
partitionBy (HashPartitioner) and then persist for wide dependencies. The
project I am working on at present is pretty much a Hive migration to
Spark SQL, which is plain SQL to be honest, with no Scala or Python apps.

My question: how do I achieve a broadcast join in plain Spark SQL? At the
moment a join between two tables is taking ages.

Thanks
Sri 
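
For readers following the question, here is what a broadcast (map-side) hash join does, sketched in plain Python rather than Spark SQL. The table contents and the function name are hypothetical. In Spark SQL 1.x the planner is expected to choose this strategy on its own when the estimated size of one side is below spark.sql.autoBroadcastJoinThreshold (10 MB by default); for Hive metastore tables that estimate depends on table statistics being available, so this is a description of the mechanism rather than a guaranteed fix.

```python
def broadcast_hash_join(large_rows, small_rows, key):
    """Inner-join two lists of dicts on `key` by hashing the small side.

    The small side is turned into an in-memory lookup table once (this is the
    part that would be broadcast to every executor); the large side is then
    streamed past it without being shuffled or sorted.
    """
    lookup = {}
    for row in small_rows:
        lookup.setdefault(row[key], []).append(row)

    joined = []
    for row in large_rows:
        for match in lookup.get(row[key], []):
            merged = dict(match)
            merged.update(row)
            joined.append(merged)
    return joined


# Hypothetical tables, for illustration only.
orders = [
    {"cust_id": 1, "amount": 10.0},
    {"cust_id": 2, "amount": 7.5},
    {"cust_id": 1, "amount": 3.0},
    {"cust_id": 9, "amount": 1.0},  # no matching customer: dropped
]
customers = [{"cust_id": 1, "name": "A"}, {"cust_id": 2, "name": "B"}]

result = broadcast_hash_join(orders, customers, "cust_id")
for row in result:
    print(row)
```

The point of the design is that only the small table is copied around; the large table never moves, which is why the approach helps when one side of a slow join is small.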



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/spark-sql-broadcast-join-tp27184.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Recommended way to push data into HBase through Spark streaming

2016-06-16 Thread Mohammad Tariq
Forgot to add, I'm on HBase 1.0.0-cdh5.4.5, so can't use HBaseContext. And
spark version is 1.6.1



Tariq, Mohammad
about.me/mti



On Thu, Jun 16, 2016 at 10:12 PM, Mohammad Tariq  wrote:

> Hi group,
>
> I have a streaming job which reads data from Kafka, performs some
> computation and pushes the result into HBase. Actually the results are
> pushed into 3 different HBase tables. So I was wondering what could be the
> best way to achieve this.
>
> Since each executor will open its own HBase connection and write data to a
> regionserver independent of rest of the executors I feel this is a bit of
> overkill. How about collecting the results of each micro batch and putting
> them in one shot at the end of that batch?
>
> If so what should be the way to go about this?
>
> Many thanks!
>
>
> Tariq, Mohammad
> about.me/mti
> 
>
>


Re: Spark SQL driver memory keeps rising

2016-06-16 Thread Khaled Hammouda
I'm using pyspark and running in YARN client mode. I managed to anonymize
the code a bit and pasted it below.

You'll notice that I don't collect any output in the driver, instead the
data is written to parquet directly. Also notice that I increased
spark.driver.maxResultSize to 10g because the job was failing with the
error "serialized results of x tasks () is bigger than
spark.driver.maxResultSize (xxx)", which means the tasks were sending
something to the driver, and that's probably what's causing the driver
memory usage to keep rising. This happens at stages that read/write
shuffled data (as opposed to input stages).

I also noticed this message appear many times in the output "INFO:
MapOutputTrackerMasterEndpoint: Asked to send map output locations for
shuffle 4 to ip-xx-xx-xx-xx.ec2.internal:49490". I'm not sure if it's
relevant.

Cluster setup:
- AWS EMR 4.7.1 using Spark 1.6.1
- 200 nodes (r3.2xlarge, 61 GB memory), and the master node is r3.4xlarge
(122 GB memory)
- 1.4 TB json logs split across ~70k files in S3


# spark-submit --driver-memory 100g --executor-memory 44g --executor-cores 8 \
#   --conf spark.yarn.executor.memoryOverhead=9000 --num-executors 199 \
#   log_cleansing.py

from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext

conf = (SparkConf()
        .setAppName('log_cleansing')
        .set("spark.driver.maxResultSize", "10g")
        .set("spark.hadoop.spark.sql.parquet.output.committer.class",
             "org.apache.spark.sql.execution.datasources.parquet.DirectParquetOutputCommitter")
        .set("spark.hadoop.mapred.output.committer.class",
             "org.apache.hadoop.mapred.DirectFileOutputCommitter")
        .set("spark.hadoop.mapreduce.use.directfileoutputcommitter", "true"))
sc = SparkContext(conf=conf)

sqlContext = SQLContext(sc)
sqlContext.setConf("spark.sql.shuffle.partitions", "3600")

# raw data is stored in s3 and is split across tens of thousands of files
# (each file is about 20MB, total data set size is around 1.4 TB)
# so we coalesce to a manageable number of partitions and store it in hdfs
text = sc.textFile("s3://json_logs/today/*/*").coalesce(400)
text.saveAsTextFile("hdfs:///json_logs/today")

# now read the json data set from hdfs
json_logs = sqlContext.read.json("hdfs:///json_logs/today")
sqlContext.registerDataFrameAsTable(json_logs, "json_logs")

# this cleans the date and only pulls in events within two days of their
# ingestion ts, eliminating pure duplicates
clean_date_and_gap = sqlContext.sql(
 '''SELECT DISTINCT t.*
FROM json_logs t
WHERE datediff(to_utc_timestamp(`ingestionTime`, 'UTC'),
to_utc_timestamp(`timestamp`, 'UTC')) < 2
  AND to_utc_timestamp(`timestamp`, 'UTC') IS NOT NULL''')
sqlContext.registerDataFrameAsTable(clean_date_and_gap,
"clean_date_and_gap")
clean_date_and_gap.cache()

# extract unique event instanceIds and their min ts
distinct_intanceIds = sqlContext.sql(
 '''SELECT
  instanceId,
  min(to_utc_timestamp(`ingestionTime`, 'UTC')) min_ingestion_ts
FROM clean_date_and_gap
GROUP BY instanceId''')
sqlContext.registerDataFrameAsTable(distinct_intanceIds,
"distinct_intanceIds")

# create table with only the min ingestion ts records
deduped_day = sqlContext.sql(
 '''SELECT t.*
FROM clean_date_and_gap t
JOIN distinct_intanceIds d
  ON (d.instanceId = t.instanceId AND
  to_utc_timestamp(`ingestionTime`, 'UTC') = d.min_ingestion_ts)''')

# drop weird columns
deduped_day = deduped_day.drop('weird column name1') \
 .drop('weird column name2') \
 .drop('weird column name3')

# standardize all column names so that Parquet is happy
# rename() adds clean alias names to hide problematic column names
oldCols = deduped_day.schema.names
deduped_day_1 = deduped_day.select(*(rename(oldCols)))

sqlContext.registerDataFrameAsTable(deduped_day_1, 'deduped_day_1')
deduped_day_1.cache()

clean_date_and_gap.unpersist()

# load prior days' instanceIds for deduping purposes
last1_instanceIds = sqlContext.read.parquet(
    "s3://unique_instanceids/today_minus_1day/*")
last2_instanceIds = sqlContext.read.parquet(
    "s3://unique_instanceids/today_minus_2day/*")
prior_instanceIds = last1_instanceIds.unionAll(
    last2_instanceIds).distinct().cache()
sqlContext.registerDataFrameAsTable(prior_instanceIds, 'prior_instanceIds')

# determine the overlap from prior days
overlap_from_prior_days = sqlContext.sql(
 '''SELECT DISTINCT
  d.Instanceid,
  case when y.Instanceid is not null then 1 else 0 end as seen_before
FROM deduped_day_1 d
LEFT JOIN prior_instanceIds y
  ON (y.Instanceid=d.Instanceid)''')
sqlContext.registerDataFrameAsTable(overlap_from_prior_days,'overlap_from_prior_days')

# save the final data out to a parquet file
final_cleaned_data = sqlContext.sql(
 '''SELECT d.*
FROM deduped_day_1 d
JOIN overlap_from_prior_days o
  ON (d.Instanceid = o.Instanceid)
WHERE o.seen_before = 0''')

final_cleaned_data = final_cleaned_data.coalesce(200)
final_cleaned_data.write.parquet("s3://cleaned_data/today.parquet")
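
The script above calls a rename() helper that is not shown in the message. The sketch below is not that helper; it is a hypothetical stand-in (`clean_names` is an invented name) showing one way to derive Parquet-friendly aliases from problematic column names. The set of rejected characters is an assumption based on what Spark's Parquet writer is known to complain about and may differ between versions.

```python
import re

# Characters assumed to be rejected in Parquet field names by Spark.
INVALID_CHARS = re.compile(r"[ ,;{}()\n\t=]")


def clean_names(names):
    """Map each original column name to a cleaned, unique alias."""
    used = set()
    mapping = []
    for name in names:
        base = INVALID_CHARS.sub("_", name).strip("_") or "col"
        alias = base
        suffix = 1
        while alias in used:  # keep aliases unique after cleaning
            alias = "%s_%d" % (base, suffix)
            suffix += 1
        used.add(alias)
        mapping.append((name, alias))
    return mapping


pairs = clean_names(["event id", "user(name)", "event;id", "ts"])
for old, new in pairs:
    print(repr(old), "->", new)
```

In PySpark, pairs like these would typically be turned into aliased column expressions and passed to select(); that step is left out here so the example runs without Spark.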


On Wed, Jun 15, 2016 at 10:23 PM, 

Re: What is the interpretation of Cores in Spark doc

2016-06-16 Thread Mark Hamstra
I mean only that hardware-level threads and the processor's scheduling of
those threads is only one segment of the total space of threads and thread
scheduling, and that saying things like cores have threads or only the core
schedules threads can be more confusing than helpful.
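
A small illustration of the distinction being drawn in this thread, under the assumption that the number reported by the operating system is the count of logical processors: the program below starts four times as many software threads as there are logical CPUs, and all of them still run to completion because scheduling them onto the hardware is not the application's job.

```python
import os
import threading

logical_cpus = os.cpu_count() or 1  # logical processors seen by the OS
n_threads = logical_cpus * 4        # deliberately more threads than CPUs

results = [None] * n_threads


def work(i):
    # Each software thread does a little work; the OS decides when it runs.
    results[i] = sum(range(1000 + i))


threads = [threading.Thread(target=work, args=(i,)) for i in range(n_threads)]
for t in threads:
    t.start()
for t in threads:
    t.join()

finished = sum(r is not None for r in results)
print("logical CPUs:", logical_cpus)
print("software threads that finished:", finished)
```

In Spark terms, local[N] sets the number of worker threads, which is a software-level choice; it can be lower or higher than the number of logical processors on the machine.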

On Thu, Jun 16, 2016 at 11:33 AM, Mich Talebzadeh  wrote:

> Well LOL
>
> Given a set of parameters one can argue from any angle.
>
> It is not obvious what you are trying to state here. "It is not strictly
> true"? Yeah, OK.
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 16 June 2016 at 19:07, Mark Hamstra  wrote:
>
>> In addition, it is the core (not the OS) that determines when the thread
>>> is executed.
>>
>>
>> That's also not strictly true.  "Thread" is a concept that can exist at
>> multiple levels -- even concurrently at multiple levels for a single
>> running program.  Different entities will be responsible for scheduling the
>> execution of threads at these different levels, and the CPU is only in
>> direct control at the lowest level, that of so-called hardware threads.  Of
>> course, higher-level threads eventually need to be run as lower-level
>> hardware tasks, and the mappings between various types of application-level
>> threads and OS- and/or hardware-level threads can be complicated, but it is
>> still not helpful to think of the CPU as being the only entity responsible
>> for the scheduling of threads.
>>
>> On Thu, Jun 16, 2016 at 7:45 AM, Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> Thanks all.
>>>
>>> I think we are diverging but IMO it is a worthwhile discussion
>>>
>>> Actually, threads are a hardware implementation - hence the whole notion
>>> of “multi-threaded cores”.   What happens is that the cores often have
>>> duplicate registers, etc. for holding execution state.   While it is
>>> correct that only a single process is executing at a time, a single core
>>> will have execution states of multiple processes preserved in these
>>> registers. In addition, it is the core (not the OS) that determines when
>>> the thread is executed. The approach often varies according to the CPU
>>> manufacturer, but the most simple approach is when one thread of execution
>>> executes a multi-cycle operation (e.g. a fetch from main memory, etc.), the
>>> core simply stops processing that thread saves the execution state to a set
>>> of registers, loads instructions from the other set of registers and goes
>>> on.  On the Oracle SPARC chips, it will actually check the next thread to
>>> see if the reason it was ‘parked’ has completed and if not, skip it for the
>>> subsequent thread. The OS is only aware of what are cores and what are
>>> logical processors - and dispatches accordingly.  *Execution is up to
>>> the cores*. .
>>>
>>> Cheers
>>>
>>>
>>>
>>> Dr Mich Talebzadeh
>>>
>>>
>>>
>>> LinkedIn * 
>>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>> *
>>>
>>>
>>>
>>> http://talebzadehmich.wordpress.com
>>>
>>>
>>>
>>> On 16 June 2016 at 13:02, Robin East  wrote:
>>>
>>>> Mich
>>>>
>>>> >> A core may have one or more threads
>>>> It would be more accurate to say that a core could *run* one or more
>>>> threads scheduled for execution. Threads are a software/OS concept that
>>>> represent executable code that is scheduled to run by the OS; A CPU, core
>>>> or virtual core/virtual processor execute that code. Threads are not CPUs
>>>> or cores whether physical or logical - any Spark documentation that implies
>>>> this is mistaken. I’ve looked at the documentation you mention and I don’t
>>>> read it to mean that threads are logical processors.
>>>>
>>>> To go back to your original question, if you set local[6] and you have
>>>> 12 logical processors then you are likely to have half your CPU resources
>>>> unused by Spark.
>>>>
>>>>
>>>> On 15 Jun 2016, at 23:08, Mich Talebzadeh
>>>> wrote:
>>>>
>>>> I think it is slightly more than that.
>>>>
>>>> These days  software is licensed by core (generally speaking).   That
>>>> is the physical processor.   * A core may have one or more threads -
>>>> or logical processors*. Virtualization adds some fun to the mix.
>>>> Generally what they present is ‘virtual processors’.   What that equates to
>>>> depends on the virtualization layer itself.   In some simpler VM’s - it is
>>>> virtual=logical.   In others, virtual=logical but they are constrained to
>>>> be from the same cores - e.g. if you get 6 virtual processors, it really is
>>>> 3 full cores with 2 threads each.   Rational is due to the way OS
>>>> dispatching works on

Re: What is the interpretation of Cores in Spark doc

2016-06-16 Thread Mich Talebzadeh
Well LOL

Given a set of parameters one can argue from any angle.

It is not obvious what you are trying to state here. "It is not strictly
true"? Yeah, OK.

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com



On 16 June 2016 at 19:07, Mark Hamstra  wrote:

> In addition, it is the core (not the OS) that determines when the thread
>> is executed.
>
>
> That's also not strictly true.  "Thread" is a concept that can exist at
> multiple levels -- even concurrently at multiple levels for a single
> running program.  Different entities will be responsible for scheduling the
> execution of threads at these different levels, and the CPU is only in
> direct control at the lowest level, that of so-called hardware threads.  Of
> course, higher-level threads eventually need to be run as lower-level
> hardware tasks, and the mappings between various types of application-level
> threads and OS- and/or hardware-level threads can be complicated, but it is
> still not helpful to think of the CPU as being the only entity responsible
> for the scheduling of threads.
>
> On Thu, Jun 16, 2016 at 7:45 AM, Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>> Thanks all.
>>
>> I think we are diverging but IMO it is a worthwhile discussion
>>
>> Actually, threads are a hardware implementation - hence the whole notion
>> of “multi-threaded cores”.   What happens is that the cores often have
>> duplicate registers, etc. for holding execution state.   While it is
>> correct that only a single process is executing at a time, a single core
>> will have execution states of multiple processes preserved in these
>> registers. In addition, it is the core (not the OS) that determines when
>> the thread is executed. The approach often varies according to the CPU
>> manufacturer, but the most simple approach is when one thread of execution
>> executes a multi-cycle operation (e.g. a fetch from main memory, etc.), the
>> core simply stops processing that thread saves the execution state to a set
>> of registers, loads instructions from the other set of registers and goes
>> on.  On the Oracle SPARC chips, it will actually check the next thread to
>> see if the reason it was ‘parked’ has completed and if not, skip it for the
>> subsequent thread. The OS is only aware of what are cores and what are
>> logical processors - and dispatches accordingly.  *Execution is up to
>> the cores*. .
>>
>> Cheers
>>
>>
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> *
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>>
>> On 16 June 2016 at 13:02, Robin East  wrote:
>>
>>> Mich
>>>
>>> >> A core may have one or more threads
>>> It would be more accurate to say that a core could *run* one or more
>>> threads scheduled for execution. Threads are a software/OS concept that
>>> represent executable code that is scheduled to run by the OS; A CPU, core
>>> or virtual core/virtual processor execute that code. Threads are not CPUs
>>> or cores whether physical or logical - any Spark documentation that implies
>>> this is mistaken. I’ve looked at the documentation you mention and I don’t
>>> read it to mean that threads are logical processors.
>>>
>>> To go back to your original question, if you set local[6] and you have
>>> 12 logical processors then you are likely to have half your CPU resources
>>> unused by Spark.
>>>
>>>
>>> On 15 Jun 2016, at 23:08, Mich Talebzadeh 
>>> wrote:
>>>
>>> I think it is slightly more than that.
>>>
>>> These days  software is licensed by core (generally speaking).   That
>>> is the physical processor.   * A core may have one or more threads - or
>>> logical processors*. Virtualization adds some fun to the mix.
>>> Generally what they present is ‘virtual processors’.   What that equates to
>>> depends on the virtualization layer itself.   In some simpler VM’s - it is
>>> virtual=logical.   In others, virtual=logical but they are constrained to
>>> be from the same cores - e.g. if you get 6 virtual processors, it really is
>>> 3 full cores with 2 threads each.   Rational is due to the way OS
>>> dispatching works on ‘logical’ processors vs. cores and POSIX threaded
>>> applications.
>>>
>>> HTH
>>>
>>> Dr Mich Talebzadeh
>>>
>>>
>>> LinkedIn * 
>>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>> *
>>>
>>>
>>> http://talebzadehmich.wordpress.com
>>>
>>>
>>>
>>> On 13 June 2016 at 18:17, Mark Hamstra  wrote:
>>>
>>>> I don't know what documentation you were

Re: cache datframe

2016-06-16 Thread Jacek Laskowski
Yes. Yes.

What's the use case?

Jacek
On 16 Jun 2016 2:17 p.m., "pseudo oduesp"  wrote:

> Hi,
> if I cache a data frame, then transform it and add columns, should I cache
> it a second time?
>
> df.cache()
>
>   transformation
>   add new columns
>
> df.cache()
> ?
>
>


Re: ERROR TaskResultGetter: Exception while getting task result java.io.IOException: java.lang.ClassNotFoundException: scala.Some

2016-06-16 Thread Jacek Laskowski
Hi,

Why did you mark spark-core as provided while the others are not? How do
you assemble the app? How do you submit it for execution? What's the
deployment environment?

More info...more info...

Jacek
On 15 Jun 2016 10:26 p.m., "S Sarkar"  wrote:

Hello,

I built a package for a Spark application with the following sbt file:

name := "Simple Project"

version := "1.0"

scalaVersion := "2.10.3"

libraryDependencies ++= Seq(
  "org.apache.spark"  %% "spark-core"  % "1.4.0" % "provided",
  "org.apache.spark"  %% "spark-mllib" % "1.4.0",
  "org.apache.spark"  %% "spark-sql"   % "1.4.0"
)

resolvers += "Akka Repository" at "http://repo.akka.io/releases/"

I am getting a TaskResultGetter error with a ClassNotFoundException for
scala.Some.

Could I please get some help on how to fix it?

Thanks,
S. Sarkar



--
View this message in context:
http://apache-spark-user-list.1001560.n3.nabble.com/ERROR-TaskResultGetter-Exception-while-getting-task-result-java-io-IOException-java-lang-ClassNotFoue-tp27178.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org


Re: What is the interpretation of Cores in Spark doc

2016-06-16 Thread Mark Hamstra
>
> In addition, it is the core (not the OS) that determines when the thread
> is executed.


That's also not strictly true.  "Thread" is a concept that can exist at
multiple levels -- even concurrently at multiple levels for a single
running program.  Different entities will be responsible for scheduling the
execution of threads at these different levels, and the CPU is only in
direct control at the lowest level, that of so-called hardware threads.  Of
course, higher-level threads eventually need to be run as lower-level
hardware tasks, and the mappings between various types of application-level
threads and OS- and/or hardware-level threads can be complicated, but it is
still not helpful to think of the CPU as being the only entity responsible
for the scheduling of threads.
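
To illustrate the point that threads are a software construct scheduled onto
whatever cores exist, here is a minimal Python sketch (not Spark code; the 4x
multiplier is an arbitrary assumption). It starts several times more threads
than the machine has logical processors, and all of them still run to
completion because the OS time-slices them over the available cores.

```python
import os
import threading

# Number of logical processors the OS reports (falls back to 1 if unknown).
logical_cpus = os.cpu_count() or 1

# Deliberately create more threads than there are logical processors.
thread_count = logical_cpus * 4

finished = []
lock = threading.Lock()


def worker(index):
    # Record that this thread ran; the lock protects the shared list.
    with lock:
        finished.append(index)


threads = [threading.Thread(target=worker, args=(i,)) for i in range(thread_count)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

The thread count here is chosen by the program, not by the hardware, which is
the sense in which a core "runs" threads rather than "has" them.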

On Thu, Jun 16, 2016 at 7:45 AM, Mich Talebzadeh 
wrote:

> Thanks all.
>
> I think we are diverging but IMO it is a worthwhile discussion
>
> Actually, threads are a hardware implementation - hence the whole notion
> of “multi-threaded cores”.   What happens is that the cores often have
> duplicate registers, etc. for holding execution state.   While it is
> correct that only a single process is executing at a time, a single core
> will have execution states of multiple processes preserved in these
> registers. In addition, it is the core (not the OS) that determines when
> the thread is executed. The approach often varies according to the CPU
> manufacturer, but the simplest approach is that when one thread of execution
> executes a multi-cycle operation (e.g. a fetch from main memory), the
> core simply stops processing that thread, saves the execution state to a set
> of registers, loads instructions from the other set of registers and goes
> on.  On the Oracle SPARC chips, it will actually check the next thread to
> see if the reason it was ‘parked’ has completed and, if not, skip it for the
> subsequent thread. The OS is only aware of what are cores and what are
> logical processors - and dispatches accordingly.  *Execution is up to the
> cores*.
>
> Cheers
>
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 16 June 2016 at 13:02, Robin East  wrote:
>
>> Mich
>>
>> >> A core may have one or more threads
>> It would be more accurate to say that a core could *run* one or more
>> threads scheduled for execution. Threads are a software/OS concept that
>> represent executable code that is scheduled to run by the OS; A CPU, core
>> or virtual core/virtual processor execute that code. Threads are not CPUs
>> or cores whether physical or logical - any Spark documentation that implies
>> this is mistaken. I’ve looked at the documentation you mention and I don’t
>> read it to mean that threads are logical processors.
>>
>> To go back to your original question, if you set local[6] and you have 12
>> logical processors then you are likely to have half your CPU resources
>> unused by Spark.
>>
>>
>> On 15 Jun 2016, at 23:08, Mich Talebzadeh 
>> wrote:
>>
>> I think it is slightly more than that.
>>
>> These days  software is licensed by core (generally speaking).   That is
>> the physical processor.   * A core may have one or more threads - or
>> logical processors*. Virtualization adds some fun to the mix.
>> Generally what they present is ‘virtual processors’.   What that equates to
>> depends on the virtualization layer itself.   In some simpler VM’s - it is
>> virtual=logical.   In others, virtual=logical but they are constrained to
>> be from the same cores - e.g. if you get 6 virtual processors, it really is
>> 3 full cores with 2 threads each.   The rationale is the way OS
>> dispatching works on ‘logical’ processors vs. cores and POSIX threaded
>> applications.
>>
>> HTH
>>
>> Dr Mich Talebzadeh
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> *
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>>
>> On 13 June 2016 at 18:17, Mark Hamstra  wrote:
>>
>>> I don't know what documentation you were referring to, but this is
>>> clearly an erroneous statement: "Threads are virtual cores."  At best it is
>>> terminology abuse by a hardware manufacturer.  Regardless, Spark can't get
>>> too concerned about how any particular hardware vendor wants to refer to
>>> the specific components of their CPU architecture.  For us, a core is a
>>> logical execution unit, something on which a thread of execution can run.
>>> That can map in different ways to different physical or virtual hardware.
>>>
>>> On Mon, Jun 13, 2016 at 12:02 AM, Mich Talebzadeh <
>>> mich.talebza...@gmail.com> 

Re: What is the interpretation of Cores in Spark doc

2016-06-16 Thread Mark Hamstra
>
> Actually, threads are a hardware implementation - hence the whole notion
> of “multi-threaded cores”.


No, a multi-threaded core is a core that supports multiple concurrent
threads of execution, not a core that has multiple threads.  The
terminology and marketing around multi-core processors, hyper threading and
virtualization are confusing enough without taking the further step of
misapplying software-specific terms to hardware components.

On Thu, Jun 16, 2016 at 7:45 AM, Mich Talebzadeh 
wrote:

> Thanks all.
>
> I think we are diverging but IMO it is a worthwhile discussion
>
> Actually, threads are a hardware implementation - hence the whole notion
> of “multi-threaded cores”.   What happens is that the cores often have
> duplicate registers, etc. for holding execution state.   While it is
> correct that only a single process is executing at a time, a single core
> will have execution states of multiple processes preserved in these
> registers. In addition, it is the core (not the OS) that determines when
> the thread is executed. The approach often varies according to the CPU
> manufacturer, but the simplest approach is that when one thread of execution
> executes a multi-cycle operation (e.g. a fetch from main memory), the
> core simply stops processing that thread, saves the execution state to a set
> of registers, loads instructions from the other set of registers and goes
> on.  On the Oracle SPARC chips, it will actually check the next thread to
> see if the reason it was ‘parked’ has completed and, if not, skip it for the
> subsequent thread. The OS is only aware of what are cores and what are
> logical processors - and dispatches accordingly.  *Execution is up to the
> cores*.
>
> Cheers
>
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 16 June 2016 at 13:02, Robin East  wrote:
>
>> Mich
>>
>> >> A core may have one or more threads
>> It would be more accurate to say that a core could *run* one or more
>> threads scheduled for execution. Threads are a software/OS concept that
>> represent executable code that is scheduled to run by the OS; A CPU, core
>> or virtual core/virtual processor execute that code. Threads are not CPUs
>> or cores whether physical or logical - any Spark documentation that implies
>> this is mistaken. I’ve looked at the documentation you mention and I don’t
>> read it to mean that threads are logical processors.
>>
>> To go back to your original question, if you set local[6] and you have 12
>> logical processors then you are likely to have half your CPU resources
>> unused by Spark.
>>
>>
>> On 15 Jun 2016, at 23:08, Mich Talebzadeh 
>> wrote:
>>
>> I think it is slightly more than that.
>>
>> These days  software is licensed by core (generally speaking).   That is
>> the physical processor.   * A core may have one or more threads - or
>> logical processors*. Virtualization adds some fun to the mix.
>> Generally what they present is ‘virtual processors’.   What that equates to
>> depends on the virtualization layer itself.   In some simpler VM’s - it is
>> virtual=logical.   In others, virtual=logical but they are constrained to
>> be from the same cores - e.g. if you get 6 virtual processors, it really is
>> 3 full cores with 2 threads each.   The rationale is the way OS
>> dispatching works on ‘logical’ processors vs. cores and POSIX threaded
>> applications.
>>
>> HTH
>>
>> Dr Mich Talebzadeh
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> *
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>>
>> On 13 June 2016 at 18:17, Mark Hamstra  wrote:
>>
>>> I don't know what documentation you were referring to, but this is
>>> clearly an erroneous statement: "Threads are virtual cores."  At best it is
>>> terminology abuse by a hardware manufacturer.  Regardless, Spark can't get
>>> too concerned about how any particular hardware vendor wants to refer to
>>> the specific components of their CPU architecture.  For us, a core is a
>>> logical execution unit, something on which a thread of execution can run.
>>> That can map in different ways to different physical or virtual hardware.
>>>
>>> On Mon, Jun 13, 2016 at 12:02 AM, Mich Talebzadeh <
>>> mich.talebza...@gmail.com> wrote:
>>>
 Hi,

 It is not the issue of testing anything. I was referring to
 documentation that clearly use the term "threads". As I said and showed
 before, one line is using the term "thread" and the next one "logical
 cores".


 HTH

 Dr Mich Talebzadeh


 LinkedIn * 
 

Re: Anyone has used Apache nifi

2016-06-16 Thread u...@moosheimer.com
Hi Mich,

we use NiFi and it's really great.

My company made an architecture blueprint based on NiFi and Spark.

https://www.mysecondway.com/en/BOSON-Architecture

Best regards
Kay-Uwe Moosheimer

> On 16.06.2016 at 11:10, Mich Talebzadeh wrote:
> 
> Hi,
> 
> Has anyone used Apache NiFi for data ingestion?
> 
> There was a presentation yesterday in Hortonworks' London office titled 
> "Learn more about Data Ingest at Hortonworks". It is about HDF ( .. Data 
> Flow) including nifi (Niagara Files?)  as its core solution.
> 
> It looks impressive for low to medium size loads. So I was wondering whether
> anyone has tried it out.
> 
> 
> 
> My interest was NIFI 
> Dr Mich Talebzadeh
>  
> LinkedIn  
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>  
> http://talebzadehmich.wordpress.com
>  


Re: Spark crashes worker nodes with multiple application starts

2016-06-16 Thread Carlile, Ken



Hi Deepak, 


Yes, that’s about the size of it. The spark job isn’t filling the disk by any stretch of the imagination; in fact the only stuff that’s writing to the disk from Spark in certain of these instances is the logging. 


Thanks,
—Ken



On Jun 16, 2016, at 12:17 PM, Deepak Goel  wrote:


I guess what you are saying is:


1. The nodes work perfectly OK, without iowait, before the Spark job.
2. After you have run the Spark job and killed it, the iowait persists.


So it seems the Spark job is altering the disk in such a way that other programs can't access the disk after the Spark job is killed. (A naive thought) I wonder if the Spark job fills up the disk so that no other program on your node can write to it, hence the iowait.

Also, facter normally just reads your system, so it shouldn't block it. Perhaps there are some other background scripts running on your node which are writing to the disk.






Hey

Namaskara~Nalama~Guten Tag~Bonjour


   -- 
Keigu

Deepak
73500 12833
www.simtree.net, 
dee...@simtree.net
deic...@gmail.com

LinkedIn: www.linkedin.com/in/deicool
Skype: thumsupdeicool
Google talk: deicool
Blog: http://loveandfearless.wordpress.com
Facebook: http://www.facebook.com/deicool

"Contribute to the world, environment and more : 
http://www.gridrepublic.org
"






On Thu, Jun 16, 2016 at 5:56 PM, Carlile, Ken 
 wrote:

1. There are 320 nodes in total, with 96 dedicated to Spark. In this particular case, 21 are in the Spark cluster. In typical Spark usage, maybe 1-3 nodes will crash in a day, with probably an average of 4-5 Spark clusters running at a given time. In THIS case, 7-12 nodes will crash simultaneously on application termination (not Spark cluster termination, but termination of a Spark application/jupyter kernel)
2. I’ve turned off puppet, no effect. I’ve not fully disabled facter. The iowait persists after the scheduler kills the Spark job (that still works, at least)
3. He’s attempted to run with 15 cores out of 16 and 25GB of RAM out of 128. He still lost nodes. 
4. He’s currently running storage benchmarking tests, which consist mainly of shuffles. 


Thanks!

Ken




On Jun 16, 2016, at 8:00 AM, Deepak Goel  wrote:


I am no expert, but some naive thoughts...



1. How many HPC nodes do you have? How many of them crash (What do you mean by multiple)? Do all of them crash?


2. What things are you running on Puppet? Can't you switch it off and test Spark? You can also switch off Facter. Btw, your observation that there is iowait on these applications might be because they have lower priority than Spark, so they are waiting for Spark to finish. The real bottleneck might be Spark and not these background processes.


3. Limiting CPUs and memory for Spark might have an adverse effect on iowait, as more Spark processes would have to access the disk due to reduced memory and CPU.


4. Of course, you might have to give more info on what kind of applications you are running on Spark, as they might be the main culprit.



Deepak





Hey

Namaskara~Nalama~Guten Tag~Bonjour


   -- 
Keigu

Deepak
73500 12833
www.simtree.net, 
dee...@simtree.net
deic...@gmail.com

LinkedIn: www.linkedin.com/in/deicool
Skype: thumsupdeicool
Google talk: deicool
Blog: http://loveandfearless.wordpress.com
Facebook: http://www.facebook.com/deicool

"Contribute to the world, environment and more : 
http://www.gridrepublic.org
"






On Thu, Jun 16, 2016 at 5:10 PM, Carlile, Ken 
 wrote:

We run Spark on a general purpose HPC cluster (using standalone mode and the HPC scheduler), and are currently on Spark 1.6.1. One of the primary users has been testing various storage and other parameters for Spark, which involves doing multiple shuffles and shutting down and starting many applications serially on a single cluster instance. He is using pyspark (via jupyter notebooks). Python version is 2.7.6.

We have been seeing multiple HPC node hard locks in this scenario, all at the termination of a jupyter kernel (read Spark application). The symptom is that the load on the node keeps going higher. We have determined this is because of iowait on background processes (namely puppet and facter, clean up scripts, etc). What he sees is that when he starts a new kernel (application), the executor on those nodes will not start. We can no longer ssh into the nodes, and no commands can be run on them; everything goes into iowait. The only solution is to do a hard reset on the nodes.

Obviously this is very disruptive, both to us sysadmins and to him. We have a limited number of HPC nodes that are permitted to run spark clusters, so this is a big problem.

I have attempted to limit the background processes, but it doesn’t seem to matter; it can be any 

Re: difference between dataframe and dataframwrite

2016-06-16 Thread Richard Catlin
I believe it depends on your Spark application.

To write to Hive, use 
dataframe.saveAsTable

To write to S3, use
dataframe.write.parquet("s3://")

Hope this helps.
Richard

> On Jun 16, 2016, at 9:54 AM, Natu Lauchande  wrote:
> 
> Does



RE: difference between dataframe and dataframwrite

2016-06-16 Thread Natu Lauchande
Hi

Does anyone know which one AWS EMR uses by default?

Thanks,
Natu
On Jun 16, 2016 5:12 PM, "David Newberger" 
wrote:

> DataFrame is a collection of data which is organized into named columns.
>
> DataFrame.write is an interface for saving the contents of a DataFrame to
> external storage.
>
>
>
> Hope this helps
>
>
>
> *David Newberger*
>
>
>
>
>
> *From:* pseudo oduesp [mailto:pseudo20...@gmail.com]
> *Sent:* Thursday, June 16, 2016 9:43 AM
> *To:* user@spark.apache.org
> *Subject:* difference between dataframe and dataframwrite
>
>
>
> hi,
>
>
>
> What is the difference between dataframe and dataframewrite?
>
>
>


Re: How to deal with tasks running too long?

2016-06-16 Thread Utkarsh Sengar
Thanks all. I know I have data skew, but the data is unpredictable and the
skew is hard to find every time.
Do you think this workaround is reasonable?

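ExecutorService executor = Executors.newCachedThreadPool();
Callable<Result> task = () -> simulation.run();
Future<Result> future = executor.submit(task);
try {
    simResult = future.get(20, TimeUnit.MINUTES);
} catch (TimeoutException ex) {
    SPARKLOG.info("Task timed out");
    // Without this the task keeps running in the background; cancel(true)
    // interrupts it, which only helps if simulation.run() honours interrupts.
    future.cancel(true);
}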

It forces a timeout on the task if it runs for more than 20 minutes.
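
A caveat on this kind of workaround, sketched here in Python rather than Java:
a timeout on the future only stops the caller from waiting. The worker keeps
running unless it is told to stop and cooperates. The names run_with_timeout,
slow_simulation and fast_simulation are made up for this illustration, and
nothing here kills a Spark task running on an executor.

```python
import threading
from concurrent.futures import ThreadPoolExecutor, TimeoutError


def run_with_timeout(work, timeout_seconds):
    """Run work(stop_event) in a worker thread; return (result, timed_out)."""
    stop_event = threading.Event()
    executor = ThreadPoolExecutor(max_workers=1)
    future = executor.submit(work, stop_event)
    try:
        return future.result(timeout=timeout_seconds), False
    except TimeoutError:
        # The wait gave up, but the worker is still alive: ask it to stop.
        stop_event.set()
        return None, True
    finally:
        # Blocks until the worker has actually finished.
        executor.shutdown(wait=True)


def slow_simulation(stop_event):
    # Cooperative worker: polls the stop flag instead of running forever.
    while not stop_event.is_set():
        stop_event.wait(0.01)
    return "stopped early"


def fast_simulation(stop_event):
    return "done"
```

If the worker never checks the stop flag, the final shutdown would block
forever, which is the same situation as a Java task that ignores interrupts.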


On Thu, Jun 16, 2016 at 5:00 AM, Jacek Laskowski  wrote:

> Hi,
>
> I'd check Details for Stage page in web UI.
>
> Regards,
> Jacek Laskowski
> 
> https://medium.com/@jaceklaskowski/
> Mastering Apache Spark http://bit.ly/mastering-apache-spark
> Follow me at https://twitter.com/jaceklaskowski
>
>
> On Thu, Jun 16, 2016 at 6:45 AM, Utkarsh Sengar 
> wrote:
> > This SO question was asked about 1yr ago.
> > http://stackoverflow.com/questions/31799755/how-to-deal-with-tasks-running-too-long-comparing-to-others-in-job-in-yarn-cli
> >
> > I answered this question with a suggestion to try speculation but it
> > doesn't quite do what the OP expects. I have been running into this issue
> > more these days. Out of 5000 tasks, 4950 complete in 5mins but the last 50
> > never really complete; I have tried waiting for 4hrs. This can be a memory
> > issue or maybe the way spark's fine grained mode works with mesos. I am
> > trying to enable jmxsink to get a heap dump.
> >
> > But in the mean time, is there a better fix for this? (in any version of
> > spark, I am using 1.5.1 but can upgrade). It would be great if the last 50
> > tasks in my example can be killed (timed out) and the stage completes
> > successfully.
> >
> > --
> > Thanks,
> > -Utkarsh
>



-- 
Thanks,
-Utkarsh


Recommended way to push data into HBase through Spark streaming

2016-06-16 Thread Mohammad Tariq
Hi group,

I have a streaming job which reads data from Kafka, performs some
computation and pushes the result into HBase. Actually the results are
pushed into 3 different HBase tables. So I was wondering what could be the
best way to achieve this.

Since each executor will open its own HBase connection and write data to a
regionserver independent of rest of the executors I feel this is a bit of
overkill. How about collecting the results of each micro batch and putting
them in one shot at the end of that batch?

If so what should be the way to go about this?
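
One common pattern for this kind of job is to open one connection per
partition and send buffered batches per table, rather than one put per record
or collecting everything on the driver. The sketch below shows only the
buffering logic in plain Python. FakeConnection and put_batch are hypothetical
stand-ins, not a real HBase client API; in a streaming job a function like
write_partition would typically be passed to foreachPartition inside
foreachRDD.

```python
from collections import defaultdict


class FakeConnection:
    """Hypothetical stand-in for an HBase client; it only records batches."""

    def __init__(self):
        self.batches = []

    def put_batch(self, table, rows):
        self.batches.append((table, list(rows)))

    def close(self):
        pass


def write_partition(records, connect=FakeConnection, batch_size=2):
    """Write (table, row) pairs from one partition over a single connection."""
    conn = connect()
    buffers = defaultdict(list)
    try:
        for table, row in records:
            buffers[table].append(row)
            # Flush a table's buffer as soon as it reaches the batch size.
            if len(buffers[table]) >= batch_size:
                conn.put_batch(table, buffers[table])
                buffers[table] = []
        # Flush whatever is left over at the end of the partition.
        for table, rows in buffers.items():
            if rows:
                conn.put_batch(table, rows)
    finally:
        conn.close()
    return conn
```

With three target tables, each partition would keep three small buffers and
still use only one connection.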

Many thanks!



Tariq, Mohammad
about.me/mti



Re: Kerberos setup in Apache spark connecting to remote HDFS/Yarn

2016-06-16 Thread akhandeshi
Rest of the stacktrace.  

[WARNING]
java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:294)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.IllegalArgumentException: Can't get Kerberos realm
    at org.apache.hadoop.security.HadoopKerberosName.setConfiguration(HadoopKerberosName.java:65)
    at org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:227)
    at org.apache.hadoop.security.UserGroupInformation.ensureInitialized(UserGroupInformation.java:214)
    at org.apache.hadoop.security.UserGroupInformation.isAuthenticationMethodEnabled(UserGroupInformation.java:275)
    at org.apache.hadoop.security.UserGroupInformation.isSecurityEnabled(UserGroupInformation.java:269)
    at org.apache.hadoop.security.UserGroupInformation.loginUserFromKeytab(UserGroupInformation.java:820)
    at org.apache.spark.examples.SparkYarn$.launchClient(SparkYarn.scala:57)
    at org.apache.spark.examples.SparkYarn$.main(SparkYarn.scala:84)
    at org.apache.spark.examples.SparkYarn.main(SparkYarn.scala)
    ... 6 more
Caused by: java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.apache.hadoop.security.authentication.util.KerberosUtil.getDefaultRealm(KerberosUtil.java:75)
    at org.apache.hadoop.security.HadoopKerberosName.setConfiguration(HadoopKerberosName.java:63)
    ... 14 more
Caused by: KrbException: Cannot locate default realm
    at sun.security.krb5.Config.getDefaultRealm(Config.java:1029)

I did add krb5.config to the classpath and also defined KRB5_CONFIG.




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Kerberos-setup-in-Apache-spark-connecting-to-remote-HDFS-Yarn-tp27181p27183.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




converting timestamp from UTC to many time zones

2016-06-16 Thread ericjhilton
This is using python with Spark 1.6.1 and dataframes.

I have timestamps in UTC that I want to convert to local time, but a given
row could be in any of several timezones. I have an 'offset' value (or
alternatively, the local timezone abbreviation). I can adjust all the
timestamps to a single zone or with a single offset easily enough, but I
can't figure out how to make the adjustment dependent on the 'offset' or
'tz' column.

There appear to be 2 main ways of adjusting a timestamp: using the
'INTERVAL' method, or using pyspark.sql.functions.from_utc_timestamp.

Here's an example:
---

from pyspark.sql import functions as F

data = [
    ("2015-01-01 23:59:59", "2015-01-02 00:01:02", 1, 300, "MST"),
    ("2015-01-02 23:00:00", "2015-01-02 23:59:59", 2, 60, "EST"),
    ("2015-01-02 22:59:58", "2015-01-02 23:59:59", 3, 120, "EST"),
    ("2015-03-02 15:59:58", "2015-01-02 23:59:59", 4, 120, "PST"),
    ("2015-03-16 15:15:58", "2015-01-02 23:59:59", 5, 120, "PST"),
    ("2015-10-02 18:59:58", "2015-01-02 23:59:59", 4, 120, "PST"),
    ("2015-11-16 18:58:58", "2015-01-02 23:59:59", 5, 120, "PST"),
    ("2015-03-02 15:59:58", "2015-01-02 23:59:59", 4, 120, "MST"),
    ("2015-03-16 15:15:58", "2015-01-02 23:59:59", 5, 120, "MST"),
    ("2015-10-02 18:59:58", "2015-01-02 23:59:59", 4, 120, "MST"),
    ("2015-11-16 18:58:58", "2015-01-02 23:59:59", 5, 120, "MST"),
]

df = sqlCtx.createDataFrame(
    data, ["start_time", "end_time", "id", "offset", "tz"])

df.withColumn('testthis', F.from_utc_timestamp(df.start_time, "PST")).show()
df.withColumn('testThat',
              df.start_time.cast("timestamp") - F.expr("INTERVAL 50 MINUTES")).show()


Those last 2 lines work as expected, but I want to replace "PST" with the
df.tz column or use the df.offset column with INTERVAL.


Here's the error I get. Is there a workaround to this?

---
TypeError Traceback (most recent call last)
 in ()
> 1 df.withColumn('testthis', F.from_utc_timestamp(df.start_time,
df.tz)).show()

/opt/spark-1.6.1/python/pyspark/sql/functions.py in
from_utc_timestamp(timestamp, tz)
967 """
968 sc = SparkContext._active_spark_context
--> 969 return
Column(sc._jvm.functions.from_utc_timestamp(_to_java_column(timestamp), tz))
970 
971 

/opt/spark-1.6.1/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py in
__call__(self, *args)
796 def __call__(self, *args):
797 if self.converters is not None and len(self.converters) > 0:
--> 798 (new_args, temp_args) = self._get_args(args)
799 else:
800 new_args = args

/opt/spark-1.6.1/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py in
_get_args(self, args)
783 for converter in self.gateway_client.converters:
784 if converter.can_convert(arg):
--> 785 temp_arg = converter.convert(arg,
self.gateway_client)
786 temp_args.append(temp_arg)
787 new_args.append(temp_arg)

/opt/spark-1.6.1/python/lib/py4j-0.9-src.zip/py4j/java_collections.py in
convert(self, object, gateway_client)
510 HashMap = JavaClass("java.util.HashMap", gateway_client)
511 java_map = HashMap()
--> 512 for key in object.keys():
513 java_map[key] = object[key]
514 return java_map

TypeError: 'Column' object is not callable
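
One possible workaround is to do the shift per row in a plain Python function,
which could then be wrapped with pyspark.sql.functions.udf and applied to the
start_time and offset columns. The sketch below shows only the per-row logic
and runs without Spark. It assumes that 'offset' holds the number of minutes
to subtract from UTC (as in the INTERVAL example above); the function name
shift_by_offset is made up for this illustration.

```python
from datetime import datetime, timedelta

FMT = "%Y-%m-%d %H:%M:%S"


def shift_by_offset(utc_string, offset_minutes):
    """Return the UTC timestamp string moved back by offset_minutes."""
    utc = datetime.strptime(utc_string, FMT)
    return (utc - timedelta(minutes=offset_minutes)).strftime(FMT)


# (start_time, offset) pairs taken from the first two sample rows above.
rows = [("2015-01-01 23:59:59", 300), ("2015-01-02 23:00:00", 60)]
local_times = [shift_by_offset(ts, off) for ts, off in rows]
```

A fixed offset in minutes ignores daylight saving changes, so this only fits
data where the offset column already reflects the correct shift for each row.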



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/converting-timestamp-from-UTC-to-many-time-zones-tp27182.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Spark crashes worker nodes with multiple application starts

2016-06-16 Thread Deepak Goel
I guess what you are saying is:

1. The nodes work perfectly OK, without iowait, before the Spark job.
2. After you have run the Spark job and killed it, the iowait persists.

So it seems the Spark job is altering the disk in such a way that other
programs can't access the disk after the Spark job is killed. (A naive
thought) I wonder if the Spark job fills up the disk so that no other
program on your node can write to it, hence the iowait.

Also, facter normally just reads your system, so it shouldn't block it.
Perhaps there are some other background scripts running on your node
which are writing to the disk.

Hey

Namaskara~Nalama~Guten Tag~Bonjour


   --
Keigu

Deepak
73500 12833
www.simtree.net, dee...@simtree.net
deic...@gmail.com

LinkedIn: www.linkedin.com/in/deicool
Skype: thumsupdeicool
Google talk: deicool
Blog: http://loveandfearless.wordpress.com
Facebook: http://www.facebook.com/deicool

"Contribute to the world, environment and more : http://www.gridrepublic.org
"

On Thu, Jun 16, 2016 at 5:56 PM, Carlile, Ken 
wrote:

> 1. There are 320 nodes in total, with 96 dedicated to Spark. In this
> particular case, 21 are in the Spark cluster. In typical Spark usage, maybe
> 1-3 nodes will crash in a day, with probably an average of 4-5 Spark
> clusters running at a given time. In THIS case, 7-12 nodes will crash
> simultaneously on application termination (not Spark cluster termination,
> but termination of a Spark application/jupyter kernel)
> 2. I’ve turned off puppet, no effect. I’ve not fully disabled facter. The
> iowait persists after the scheduler kills the Spark job (that still works,
> at least)
> 3. He’s attempted to run with 15 cores out of 16 and 25GB of RAM out of
> 128. He still lost nodes.
> 4. He’s currently running storage benchmarking tests, which consist mainly
> of shuffles.
>
> Thanks!
> Ken
>
> On Jun 16, 2016, at 8:00 AM, Deepak Goel  wrote:
>
> I am no expert, but some naive thoughts...
>
> 1. How many HPC nodes do you have? How many of them crash (What do you
> mean by multiple)? Do all of them crash?
>
> 2. What things are you running on Puppet? Can't you switch it off and test
> Spark? You can also switch off Facter. Btw, your observation that there is
> iowait on these applications might be because they have lower priority than
> Spark, so they are waiting for Spark to finish. The real bottleneck
> might be Spark and not these background processes.
>
> 3. Limiting CPUs and memory for Spark might have an adverse effect on
> iowait, as more Spark processes would have to access the disk due to
> reduced memory and CPU.
>
> 4. Of course, you might have to give more info on what kind of
> applications you are running on Spark, as they might be the main culprit.
>
> Deepak
>
> Hey
>
> Namaskara~Nalama~Guten Tag~Bonjour
>
>
>--
> Keigu
>
> Deepak
> 73500 12833
> www.simtree.net, dee...@simtree.net
> deic...@gmail.com
>
> LinkedIn: www.linkedin.com/in/deicool
> Skype: thumsupdeicool
> Google talk: deicool
> Blog: http://loveandfearless.wordpress.com
> Facebook: http://www.facebook.com/deicool
>
> "Contribute to the world, environment and more :
> http://www.gridrepublic.org
> "
>
> On Thu, Jun 16, 2016 at 5:10 PM, Carlile, Ken 
> wrote:
>
>> We run Spark on a general purpose HPC cluster (using standalone mode and
>> the HPC scheduler), and are currently on Spark 1.6.1. One of the primary
>> users has been testing various storage and other parameters for Spark,
>> which involves doing multiple shuffles and shutting down and starting many
>> applications serially on a single cluster instance. He is using pyspark
>> (via jupyter notebooks). Python version is 2.7.6.
>>
>> We have been seeing multiple HPC node hard locks in this scenario, all at
>> the termination of a jupyter kernel (read Spark application). The symptom
>> is that the load on the node keeps going higher. We have determined this is
>> because of iowait on background processes (namely puppet and facter, clean
>> up scripts, etc). What he sees is that when he starts a new kernel
>> (application), the executor on those nodes will not start. We can no longer
>> ssh into the nodes, and no commands can be run on them; everything goes
>> into iowait. The only solution is to do a hard reset on the nodes.
>>
>> Obviously this is very disruptive, both to us sysadmins and to him. We
>> have a limited number of HPC nodes that are permitted to run spark
>> clusters, so this is a big problem.
>>
>> I have attempted to limit the background processes, but it doesn’t seem
>> to matter; it can be any process that attempts io on the boot drive. He has
>> tried various things (limiting CPU cores used by Spark, reducing the
>> memory, etc.), but we have been unable to find a solution, or really, a
>> cause.
>>
>> Has anyone seen anything like this? Any ideas where to look next?
>>
>> Thanks,
>> Ken
>> 

Re: Kerberos setup in Apache spark connecting to remote HDFS/Yarn

2016-06-16 Thread Ami Khandeshi
Spark 1.6.1; Java 7; Hadoop 2.6

On Thursday, June 16, 2016, Ted Yu  wrote:

> bq. Caused by: KrbException: Cannot locate default realm
>
> Can you show the rest of the stack trace ?
>
> What versions of Spark / Hadoop are you using ?
>
> Which version of Java are you using (local and in cluster) ?
>
> Thanks
>


Re: What is the interpretation of Cores in Spark doc

2016-06-16 Thread Deepak Goel
Just wondering: if threads were purely a hardware implementation, then if
my Java application had one thread and was run on a multicore machine,
that thread could be split up into small parts and run on
different cores simultaneously. However, this would raise synchronization
problems.

So while it is true that threads are implemented at the hardware level, there
are also threads at the software level.


Hey

Namaskara~Nalama~Guten Tag~Bonjour


   --
Keigu

Deepak
73500 12833
www.simtree.net, dee...@simtree.net
deic...@gmail.com

LinkedIn: www.linkedin.com/in/deicool
Skype: thumsupdeicool
Google talk: deicool
Blog: http://loveandfearless.wordpress.com
Facebook: http://www.facebook.com/deicool

"Contribute to the world, environment and more : http://www.gridrepublic.org
"


Unsubscribe

2016-06-16 Thread Sanjeev Sagar
Unsubscribe 

Sent from my iPhone

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: difference between dataframe and dataframwrite

2016-06-16 Thread David Newberger
A DataFrame is a collection of data organized into named columns.
DataFrame.write is an interface for saving the contents of a DataFrame to
external storage.
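
To make the distinction concrete, here is a minimal sketch, assuming Spark 2.0
or later (for SparkSession) running in local mode. The object name, the sample
rows and the output path are made up for illustration; only the DataFrame and
DataFrame.write concepts come from the answer above.

```scala
import org.apache.spark.sql.{DataFrame, DataFrameWriter, Row, SparkSession}

object DataFrameVsWriter {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("df-vs-writer")
      .getOrCreate()
    import spark.implicits._

    // A DataFrame: rows organized into named columns.
    val df: DataFrame = Seq(("alice", 30), ("bob", 25)).toDF("name", "age")
    df.printSchema()

    // df.write returns a DataFrameWriter, which only describes how to save the data.
    val writer: DataFrameWriter[Row] = df.write.mode("overwrite")

    // Hypothetical output path; the data is written when a save method such as parquet() is called.
    writer.parquet("/tmp/people.parquet")

    spark.stop()
  }
}
```

In other words, the DataFrame holds the data and its transformations, while the
writer is a small builder object used only at the point of saving.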

Hope this helps

David Newberger


From: pseudo oduesp [mailto:pseudo20...@gmail.com]
Sent: Thursday, June 16, 2016 9:43 AM
To: user@spark.apache.org
Subject: difference between dataframe and dataframwrite

hi,

What is the difference between DataFrame and DataFrameWriter?



difference between dataframe and dataframwrite

2016-06-16 Thread pseudo oduesp
hi,

What is the difference between DataFrame and DataFrameWriter?


Re: What is the interpretation of Cores in Spark doc

2016-06-16 Thread Mich Talebzadeh
Thanks all.

I think we are diverging, but IMO it is a worthwhile discussion.

Actually, threads are a hardware implementation - hence the whole notion of
“multi-threaded cores”. What happens is that the cores often have
duplicate registers, etc. for holding execution state. While it is
correct that only a single process is executing at a time, a single core
will have the execution states of multiple processes preserved in these
registers. In addition, it is the core (not the OS) that determines when
the thread is executed. The approach often varies according to the CPU
manufacturer, but the simplest approach is that when one thread of execution
executes a multi-cycle operation (e.g. a fetch from main memory), the
core simply stops processing that thread, saves the execution state to a set
of registers, loads instructions from the other set of registers and goes
on. On the Oracle SPARC chips, it will actually check the next thread to
see if the reason it was ‘parked’ has completed and, if not, skip it for the
subsequent thread. The OS is only aware of what are cores and what are
logical processors - and dispatches accordingly. *Execution is up to the
cores*.

Cheers



Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com



On 16 June 2016 at 13:02, Robin East  wrote:

> Mich
>
> >> A core may have one or more threads
> It would be more accurate to say that a core could *run* one or more
> threads scheduled for execution. Threads are a software/OS concept that
> represent executable code that is scheduled to run by the OS; A CPU, core
> or virtual core/virtual processor execute that code. Threads are not CPUs
> or cores whether physical or logical - any Spark documentation that implies
> this is mistaken. I’ve looked at the documentation you mention and I don’t
> read it to mean that threads are logical processors.
>
> To go back to your original question, if you set local[6] and you have 12
> logical processors then you are likely to have half your CPU resources
> unused by Spark.
>
>
> On 15 Jun 2016, at 23:08, Mich Talebzadeh 
> wrote:
>
> I think it is slightly more than that.
>
> These days software is licensed by core (generally speaking). That is
> the physical processor. *A core may have one or more threads - or
> logical processors*. Virtualization adds some fun to the mix. Generally
> what they present is ‘virtual processors’. What that equates to depends
> on the virtualization layer itself. In some simpler VMs, it is
> virtual=logical. In others, virtual=logical but they are constrained to
> be from the same cores - e.g. if you get 6 virtual processors, it really is
> 3 full cores with 2 threads each. The rationale is the way OS
> dispatching works on ‘logical’ processors vs. cores and POSIX threaded
> applications.
>
> HTH
>
> Dr Mich Talebzadeh
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 13 June 2016 at 18:17, Mark Hamstra  wrote:
>
>> I don't know what documentation you were referring to, but this is
>> clearly an erroneous statement: "Threads are virtual cores."  At best it is
>> terminology abuse by a hardware manufacturer.  Regardless, Spark can't get
>> too concerned about how any particular hardware vendor wants to refer to
>> the specific components of their CPU architecture.  For us, a core is a
>> logical execution unit, something on which a thread of execution can run.
>> That can map in different ways to different physical or virtual hardware.
>>
>> On Mon, Jun 13, 2016 at 12:02 AM, Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> It is not the issue of testing anything. I was referring to
>>> documentation that clearly use the term "threads". As I said and showed
>>> before, one line is using the term "thread" and the next one "logical
>>> cores".
>>>
>>>
>>> HTH
>>>
>>> Dr Mich Talebzadeh
>>>
>>>
>>> LinkedIn * 
>>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>> *
>>>
>>>
>>> http://talebzadehmich.wordpress.com
>>>
>>>
>>>
>>> On 12 June 2016 at 23:57, Daniel Darabos <
>>> daniel.dara...@lynxanalytics.com> wrote:
>>>
>>>> Spark is a software product. In software a "core" is something that a
>>>> process can run on. So it's a "virtual core". (Do not call these "threads".
>>>> A "thread" is not something a process can run on.)
>>>>
>>>> local[*] uses java.lang.Runtime.availableProcessors()
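
As a rough illustration of the local[*] and local[6] points in this thread, the
sketch below asks the JVM how many logical processors it sees and works out
what share of them a given local[N] setting would leave idle. The object name
LocalCores and the helper unusedFraction are made up for this example; only
Runtime.availableProcessors() comes from the thread.

```scala
object LocalCores {
  // Number of logical processors the JVM reports; local[*] is said above to use this value.
  def logicalProcessors: Int = Runtime.getRuntime.availableProcessors()

  // Fraction of logical processors left idle when Spark is given `requested` threads.
  def unusedFraction(requested: Int, available: Int): Double = {
    require(requested > 0 && available > 0, "counts must be positive")
    math.max(0, available - requested).toDouble / available
  }

  def main(args: Array[String]): Unit = {
    println(s"JVM sees $logicalProcessors logical processors")
    // The case discussed in the thread: local[6] on a machine with 12 logical processors.
    println(f"local[6] on 12 processors leaves ${unusedFraction(6, 12) * 100}%.0f%% unused")
  }
}
```

Note that the number reported is a count of logical processors as the OS
presents them to the JVM, which may or may not match physical cores.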
 

Re: Kerberos setup in Apache spark connecting to remote HDFS/Yarn

2016-06-16 Thread Ted Yu
bq. Caused by: KrbException: Cannot locate default realm

Can you show the rest of the stack trace ?

What versions of Spark / Hadoop are you using ?

Which version of Java are you using (local and in cluster) ?

Thanks
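
For background while those details are gathered: "KrbException: Cannot locate
default realm" is raised by the JDK when it finds neither a default_realm in a
krb5.conf file nor the java.security.krb5.realm system property. Whether that
is the cause here is not confirmed in this thread. A minimal sketch of setting
the standard JDK properties before any UserGroupInformation call follows; the
object name Krb5Setup and the example values are made up.

```scala
object Krb5Setup {
  // Point the JVM at an explicit krb5.conf file.
  def useConfFile(path: String): Unit =
    System.setProperty("java.security.krb5.conf", path)

  // Alternatively, supply the default realm and KDC directly; the JDK expects both to be set together.
  def useRealmAndKdc(realm: String, kdc: String): Unit = {
    System.setProperty("java.security.krb5.realm", realm)
    System.setProperty("java.security.krb5.kdc", kdc)
  }

  def main(args: Array[String]): Unit = {
    // Placeholder path, not taken from the thread.
    useConfFile("/etc/krb5.conf")
    println(System.getProperty("java.security.krb5.conf"))
  }
}
```

The same effect can be had without code by passing
-Djava.security.krb5.conf=/path/to/krb5.conf to the driver JVM, which is often
needed when launching from an IDE on a machine that is not part of the cluster.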



Re: cache datframe

2016-06-16 Thread Alexey Pechorin
What's the reason for your first cache call? It looks like you've used the
data only once to transform it without reusing the data, so there's no
reason for the first cache call, and you need only the second call (and
that also depends on the rest of your code).
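
The point above can be sketched as follows. This is a minimal example, assuming
Spark 2.0 or later (for SparkSession) in local mode; the object name, column
names and sample rows are made up. The untransformed DataFrame is used only
once, so only the transformed one, which is reused, gets cached.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

object CacheOnce {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("cache-once")
      .getOrCreate()
    import spark.implicits._

    // Source data: read once and transformed immediately, so it is not cached.
    val raw = Seq((1, "a"), (2, "b"), (3, "c")).toDF("id", "label")

    // Transformation that adds a new column.
    val enriched = raw.withColumn("idTimesTwo", col("id") * 2)

    // Cache the result because it is used by more than one action below.
    enriched.cache()

    println(enriched.count())
    println(enriched.filter(col("id") > 1).count())

    spark.stop()
  }
}
```

Since DataFrames are immutable, calling cache() on the original does not carry
over to a DataFrame derived from it; each one is cached (or not) separately.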

On Thu, Jun 16, 2016 at 3:17 PM, pseudo oduesp 
wrote:

> Hi,
> if I cache a data frame, then transform it and add columns, should I cache
> it a second time?
>
> df.cache()
>
>   transformation
>   add new columns
>
> df.cache()
> ?
>
>


Re: advise please

2016-06-16 Thread pseudo oduesp
Hi,
I use PySpark 1.5.0 on a YARN cluster with 19 nodes, 200 GB and 4 cores each
(including the driver).

2016-06-16 15:42 GMT+02:00 pseudo oduesp :

> Hi,
> How can I quickly dummy-encode a large set of columns with StringIndexer?
> I tested with 89 variables, each with at most 10 distinct values,
> and it takes a lot of time.
> Thanks
>


advise please

2016-06-16 Thread pseudo oduesp
Hi,
How can I quickly dummy-encode a large set of columns with StringIndexer?
I tested with 89 variables, each with at most 10 distinct values,
and it takes a lot of time.
Thanks


Kerberos setup in Apache spark connecting to remote HDFS/Yarn

2016-06-16 Thread akhandeshi
I am trying to set up my IDE to run a Scala Spark application. I want to access
HDFS files on a remote Hadoop server that has Kerberos enabled. My
understanding is that I should be able to do that from Spark. Here is my code so
far:

// Imports this snippet relies on (StringUtils is assumed to be the Apache Commons Lang class)
import java.io.IOException
import org.apache.commons.lang.StringUtils
import org.apache.hadoop.security.UserGroupInformation
import org.apache.spark.{SparkConf, SparkContext}

// appName, master, jars, properties, principal and keytab are defined elsewhere
val sparkConf = new SparkConf().setAppName(appName).setMaster(master)

if (jars.length > 0) {
  sparkConf.setJars(jars)
}

if (!properties.isEmpty) {
  // Apply the user-supplied settings
  for ((k, v) <- properties)
    sparkConf.set(k, v)
} else {
  // Fall back to small defaults
  sparkConf
    .set("spark.executor.memory", "1024m")
    .set("spark.cores.max", "1")
    .set("spark.default.parallelism", "4")
}

try {
  if (!StringUtils.isBlank(principal) && !StringUtils.isBlank(keytab)) {
    //UserGroupInformation.setConfiguration(config);
    UserGroupInformation.loginUserFromKeytab(principal, keytab)
  }
} catch {
  case ioe: IOException =>
    println("Failed to login to Hadoop [principal = " + principal + ", keytab = " + keytab + "]")
    ioe.printStackTrace()
}

val sc = new SparkContext(sparkConf)
val MY_FILE: String = "hdfs://remoteserver:port/file.out"
val rDD = sc.textFile(MY_FILE, 10)
println("Lines " + rDD.count)

I have core-site.xml on my classpath. I changed hadoop.ssl.enabled to false
because it was expecting a secret key. The principal I am using is correct. I
tried username/_HOST@fully.qualified.domain and
username@fully.qualified.domain with no success. I tried running Spark in
local mode and in yarn-client mode. I am hoping someone has a recipe for this
or has solved this problem. Any pointers to help set up or debug this would be
helpful.

I am getting the following error message:

Caused by: java.lang.IllegalArgumentException: Can't get Kerberos realm
    at org.apache.hadoop.security.HadoopKerberosName.setConfiguration(HadoopKerberosName.java:65)
    at org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:227)
    at org.apache.hadoop.security.UserGroupInformation.setConfiguration(UserGroupInformation.java:249)
    at org.apache.spark.examples.SparkYarn$.launchClient(SparkYarn.scala:55)
    at org.apache.spark.examples.SparkYarn$.main(SparkYarn.scala:83)
    at org.apache.spark.examples.SparkYarn.main(SparkYarn.scala)
    ... 6 more
Caused by: java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.apache.hadoop.security.authentication.util.KerberosUtil.getDefaultRealm(KerberosUtil.java:75)
    at org.apache.hadoop.security.HadoopKerberosName.setConfiguration(HadoopKerberosName.java:63)
    ... 11 more
Caused by: KrbException: Cannot locate default realm



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Kerberos-setup-in-Apache-spark-connecting-to-remote-HDFS-Yarn-tp27181.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




RE: streaming example has error

2016-06-16 Thread David Newberger
Try adding wordCounts.print() before ssc.start()
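
The suggestion above can be sketched as a standalone application. A DStream
pipeline needs at least one output operation (such as print()) registered
before start(), which is what the "No output operations registered" error in
the quoted log complains about. The port 9999 is a placeholder, since the
quoted message does not show one, and the object name is made up.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object NetworkWordCountWithOutput {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
    val ssc = new StreamingContext(conf, Seconds(1))

    // Placeholder port; the port in the quoted message is not shown.
    val lines = ssc.socketTextStream("localhost", 9999)
    val words = lines.flatMap(_.split(" "))
    val pairs = words.map(word => (word, 1))
    val wordCounts = pairs.reduceByKey(_ + _)

    // Output operation: without at least one, start() fails validation.
    wordCounts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```

The transformations (flatMap, map, reduceByKey) are lazy and only describe the
computation; print() is what gives the streaming graph something to execute.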


David Newberger

From: Lee Ho Yeung [mailto:jobmatt...@gmail.com]
Sent: Wednesday, June 15, 2016 9:16 PM
To: David Newberger
Cc: user@spark.apache.org
Subject: Re: streaming example has error

got another error StreamingContext: Error starting the context, marking it as stopped

/home/martin/Downloads/spark-1.6.1/bin/spark-shell --packages com.databricks:spark-csv_2.11:1.4.0
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount").set("spark.driver.allowMultipleContexts", "true")
val ssc = new StreamingContext(conf, Seconds(1))
val lines = ssc.socketTextStream("localhost", )
val words = lines.flatMap(_.split(" "))
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
ssc.start()
ssc.awaitTermination()



scala> val pairs = words.map(word => (word, 1))
pairs: org.apache.spark.streaming.dstream.DStream[(String, Int)] = org.apache.spark.streaming.dstream.MappedDStream@61a5e7

scala> val wordCounts = pairs.reduceByKey(_ + _)
wordCounts: org.apache.spark.streaming.dstream.DStream[(String, Int)] = org.apache.spark.streaming.dstream.ShuffledDStream@a522f1

scala> ssc.start()
16/06/15 19:14:10 ERROR StreamingContext: Error starting the context, marking it as stopped
java.lang.IllegalArgumentException: requirement failed: No output operations registered, so nothing to execute
    at scala.Predef$.require(Predef.scala:233)
    at org.apache.spark.streaming.DStreamGraph.validate(DStreamGraph.scala:161)
    at org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:542)
    at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:601)
    at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:600)
    at $line42.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:39)
    at $line42.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:44)
    at $line42.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:46)
    at $line42.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:48)
    at $line42.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:50)
    at $line42.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:52)
    at $line42.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:54)
    at $line42.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:56)
    at $line42.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:58)
    at $line42.$read$$iwC$$iwC$$iwC$$iwC$$iwC.(:60)
    at $line42.$read$$iwC$$iwC$$iwC$$iwC.(:62)
    at $line42.$read$$iwC$$iwC$$iwC.(:64)
    at $line42.$read$$iwC$$iwC.(:66)
    at $line42.$read$$iwC.(:68)
    at $line42.$read.(:70)
    at $line42.$read$.(:74)
    at $line42.$read$.()
    at $line42.$eval$.(:7)
    at $line42.$eval$.()
    at $line42.$eval.$print()
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
    at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1346)
    at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
    at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:857)
    at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902)
    at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814)
    at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:657)
    at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:665)
    at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670)
    at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:997)
    at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
    at

Re: Spark Memory Error - Not enough space to cache broadcast

2016-06-16 Thread Deepak Goel
What hardware configuration are you running Spark on?


On Thu, Jun 16, 2016 at 5:33 PM, Jacek Laskowski  wrote:

> Hi,
>
> What do you see under Executors and Details for Stage (for the
> affected stages)? Anything weird memory-related?
>
> How does your "I am reading data from Kafka into Spark and writing it
> into Cassandra after processing it." pipeline look like?
>
> Pozdrawiam,
> Jacek Laskowski
> 
> https://medium.com/@jaceklaskowski/
> Mastering Apache Spark http://bit.ly/mastering-apache-spark
> Follow me at https://twitter.com/jaceklaskowski
>
>
> On Mon, Jun 13, 2016 at 11:56 PM, Cassa L  wrote:
> > Hi,
> >
> > I'm using spark 1.5.1 version. I am reading data from Kafka into Spark
> and
> > writing it into Cassandra after processing it. Spark job starts fine and
> > runs all good for some time until I start getting below errors. Once
> these
> > errors come, job start to lag behind and I see that job has scheduling
> and
> > processing delays in streaming  UI.
> >
> > Worker memory is 6GB, executor-memory is 5GB, I also tried to tweak
> > memoryFraction parameters. Nothing works.
> >
> >
> > 16/06/13 21:26:02 INFO MemoryStore: ensureFreeSpace(4044) called with
> > curMem=565394, maxMem=2778495713
> > 16/06/13 21:26:02 INFO MemoryStore: Block broadcast_69652_piece0 stored
> as
> > bytes in memory (estimated size 3.9 KB, free 2.6 GB)
> > 16/06/13 21:26:02 INFO TorrentBroadcast: Reading broadcast variable 69652
> > took 2 ms
> > 16/06/13 21:26:02 WARN MemoryStore: Failed to reserve initial memory
> > threshold of 1024.0 KB for computing block broadcast_69652 in memory.
> > 16/06/13 21:26:02 WARN MemoryStore: Not enough space to cache
> > broadcast_69652 in memory! (computed 496.0 B so far)
> > 16/06/13 21:26:02 INFO MemoryStore: Memory use = 556.1 KB (blocks) + 2.6
> GB
> > (scratch space shared across 0 tasks(s)) = 2.6 GB. Storage limit = 2.6
> GB.
> > 16/06/13 21:26:02 WARN MemoryStore: Persisting block broadcast_69652 to
> disk
> > instead.
> > 16/06/13 21:26:02 INFO BlockManager: Found block rdd_100761_1 locally
> > 16/06/13 21:26:02 INFO Executor: Finished task 0.0 in stage 71577.0 (TID
> > 452316). 2043 bytes result sent to driver
> >
> >
> > Thanks,
> >
> > L
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Spark crashes worker nodes with multiple application starts

2016-06-16 Thread Carlile, Ken



1. There are 320 nodes in total, with 96 dedicated to Spark. In this particular case, 21 are in the Spark cluster. In typical Spark usage, maybe 1-3 nodes will crash in a day, with probably an average of 4-5 Spark clusters running at a given time. In THIS case, 7-12 nodes will crash simultaneously on application termination (not Spark cluster termination, but termination of a Spark application/jupyter kernel)
2. I’ve turned off puppet, no effect. I’ve not fully disabled facter. The iowait persists after the scheduler kills the Spark job (that still works, at least)
3. He’s attempted to run with 15 cores out of 16 and 25GB of RAM out of 128. He still lost nodes. 
4. He’s currently running storage benchmarking tests, which consist mainly of shuffles. 


Thanks!
Ken


On Jun 16, 2016, at 8:00 AM, Deepak Goel  wrote:


I am no expert, but some naive thoughts...

1. How many HPC nodes do you have? How many of them crash (what do you mean by multiple)? Do all of them crash?

2. What things are you running on Puppet? Can't you switch it off and test Spark? You can also switch off Facter. By the way, your observation that there is iowait on these applications might be because they have lower priority than Spark and are therefore waiting for Spark to finish. So the real bottleneck might be Spark and not these background processes.

3. Limiting CPUs and memory for Spark might have an inverse effect on iowait, as more Spark processes would have to access the disk due to the reduced memory and CPU.

4. Of course, you might have to give more info on what kind of applications you are running on Spark, as they might be the main culprit.



Deepak











On Thu, Jun 16, 2016 at 5:10 PM, Carlile, Ken 
 wrote:

We run Spark on a general purpose HPC cluster (using standalone mode and the HPC scheduler), and are currently on Spark 1.6.1. One of the primary users has been testing various storage and other parameters for Spark, which involves doing multiple shuffles and shutting down and starting many applications serially on a single cluster instance. He is using pyspark (via jupyter notebooks). Python version is 2.7.6.

We have been seeing multiple HPC node hard locks in this scenario, all at the termination of a jupyter kernel (read Spark application). The symptom is that the load on the node keeps going higher. We have determined this is because of iowait on background processes (namely puppet and facter, clean up scripts, etc). What he sees is that when he starts a new kernel (application), the executor on those nodes will not start. We can no longer ssh into the nodes, and no commands can be run on them; everything goes into iowait. The only solution is to do a hard reset on the nodes.

Obviously this is very disruptive, both to us sysadmins and to him. We have a limited number of HPC nodes that are permitted to run spark clusters, so this is a big problem.

I have attempted to limit the background processes, but it doesn’t seem to matter; it can be any process that attempts io on the boot drive. He has tried various things (limiting CPU cores used by Spark, reducing the memory, etc.), but we have been unable to find a solution, or really, a cause.

Has anyone seen anything like this? Any ideas where to look next?

Thanks,
Ken
-
To unsubscribe, e-mail: 
user-unsubscr...@spark.apache.org
For additional commands, e-mail: 
user-h...@spark.apache.org


Re: [scala-user] ERROR TaskResultGetter: Exception while getting task result java.io.IOException: java.lang.ClassNotFoundException: scala.Some

2016-06-16 Thread Oliver Ruebenacker
 Hello,

  It would be useful to see the code that throws the exception. It probably
means that the Scala standard library is not being uploaded to the
executors. Try adding the Scala standard library to the SBT file
("org.scala-lang" % "scala-library" % "2.10.3"), or check your
configuration. Also, did you launch using spark-submit?

 Best, Oliver

On Wed, Jun 15, 2016 at 4:16 PM,  wrote:

> Hello,
>
> I am building package for spark application with the following sbt file:
>
> name := "Simple Project"
>
> version := "1.0"
>
> scalaVersion := "2.10.3"
>
> libraryDependencies ++= Seq(
>   "org.apache.spark"  %% "spark-core"  % "1.4.0" % "provided",
>   "org.apache.spark"  %% "spark-mllib" % "1.4.0",
>   "org.apache.spark"  %% "spark-sql"   % "1.4.0",
>   "org.apache.spark"  %% "spark-sql"   % "1.4.0"
>   )
> resolvers += "Akka Repository" at "http://repo.akka.io/releases/"
>
> I am getting TaskResultGetter error with ClassNotFoundException for
> scala.Some .
>
> Can I please get some help how to fix it?
>
> Thanks,
> S. Sarkar
>
> --
> You received this message because you are subscribed to the Google Groups
> "scala-user" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to scala-user+unsubscr...@googlegroups.com.
> For more options, visit https://groups.google.com/d/optout.
>



-- 
Oliver Ruebenacker
Senior Software Engineer, Diabetes Portal, Broad Institute



unsubscribe

2016-06-16 Thread Marco Platania
unsubscribe

cache dataframe

2016-06-16 Thread pseudo oduesp
Hi,
If I cache a data frame, then transform it and add columns, should I cache
it a second time?

df.cache()

  transformation
  add new columns

df.cache()
?


Re: Spark Memory Error - Not enough space to cache broadcast

2016-06-16 Thread Jacek Laskowski
Hi,

What do you see under Executors and Details for Stage (for the
affected stages)? Anything weird memory-related?

What does your "I am reading data from Kafka into Spark and writing it
into Cassandra after processing it." pipeline look like?

Pozdrawiam,
Jacek Laskowski

https://medium.com/@jaceklaskowski/
Mastering Apache Spark http://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski


On Mon, Jun 13, 2016 at 11:56 PM, Cassa L  wrote:
> Hi,
>
> I'm using Spark 1.5.1. I am reading data from Kafka into Spark and
> writing it into Cassandra after processing it. The Spark job starts fine and
> runs well for some time until I start getting the errors below. Once these
> errors appear, the job starts to lag behind and I see scheduling and
> processing delays in the streaming UI.
>
> Worker memory is 6GB, executor-memory is 5GB, I also tried to tweak
> memoryFraction parameters. Nothing works.
>
>
> 16/06/13 21:26:02 INFO MemoryStore: ensureFreeSpace(4044) called with
> curMem=565394, maxMem=2778495713
> 16/06/13 21:26:02 INFO MemoryStore: Block broadcast_69652_piece0 stored as
> bytes in memory (estimated size 3.9 KB, free 2.6 GB)
> 16/06/13 21:26:02 INFO TorrentBroadcast: Reading broadcast variable 69652
> took 2 ms
> 16/06/13 21:26:02 WARN MemoryStore: Failed to reserve initial memory
> threshold of 1024.0 KB for computing block broadcast_69652 in memory.
> 16/06/13 21:26:02 WARN MemoryStore: Not enough space to cache
> broadcast_69652 in memory! (computed 496.0 B so far)
> 16/06/13 21:26:02 INFO MemoryStore: Memory use = 556.1 KB (blocks) + 2.6 GB
> (scratch space shared across 0 tasks(s)) = 2.6 GB. Storage limit = 2.6 GB.
> 16/06/13 21:26:02 WARN MemoryStore: Persisting block broadcast_69652 to disk
> instead.
> 16/06/13 21:26:02 INFO BlockManager: Found block rdd_100761_1 locally
> 16/06/13 21:26:02 INFO Executor: Finished task 0.0 in stage 71577.0 (TID
> 452316). 2043 bytes result sent to driver
>
>
> Thanks,
>
> L

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: What is the interpretation of Cores in Spark doc

2016-06-16 Thread Robin East
Mich

>> A core may have one or more threads
It would be more accurate to say that a core could run one or more threads 
scheduled for execution. Threads are a software/OS concept that represents 
executable code that is scheduled to run by the OS; a CPU, core or virtual 
core/virtual processor executes that code. Threads are not CPUs or cores, whether 
physical or logical; any Spark documentation that implies this is mistaken. 
I’ve looked at the documentation you mention and I don’t read it to mean that 
threads are logical processors.

To go back to your original question, if you set local[6] and you have 12 
logical processors then you are likely to have half your CPU resources unused 
by Spark.
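
The local[6] arithmetic can be sketched in a few lines of Python. This is only an illustration: os.cpu_count() reports logical CPUs, much like the java.lang.Runtime.availableProcessors() call mentioned in this thread, the helper name idle_share is made up, and it assumes each local[N] thread keeps exactly one logical processor busy.

```python
import os


def idle_share(local_threads, logical_cpus):
    """Fraction of logical processors left without a Spark task thread,
    assuming each local[N] thread keeps exactly one logical processor busy."""
    if logical_cpus <= 0:
        raise ValueError("logical_cpus must be positive")
    used = min(local_threads, logical_cpus)
    return (logical_cpus - used) / logical_cpus


# Logical processors visible to the OS on this machine (None if unknown).
detected = os.cpu_count()

# The host from the original question: 12 logical processors, local[6].
print(idle_share(6, 12))
```

On the 12-logical-processor host this gives one half, which matches the estimate above; real utilisation also depends on what the tasks actually do.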


> On 15 Jun 2016, at 23:08, Mich Talebzadeh  wrote:
> 
> I think it is slightly more than that.
> 
> These days software is licensed by core (generally speaking). That is the 
> physical processor. A core may have one or more threads - or logical 
> processors. Virtualization adds some fun to the mix. Generally what they 
> present is ‘virtual processors’. What that equates to depends on the 
> virtualization layer itself. In some simpler VMs it is virtual=logical. 
> In others, virtual=logical but they are constrained to be from the same 
> cores - e.g. if you get 6 virtual processors, it really is 3 full cores with 
> 2 threads each. The rationale is the way OS dispatching works on 
> ‘logical’ processors vs. cores and POSIX threaded applications.
> 
> HTH
> 
> Dr Mich Talebzadeh
>  
> LinkedIn  
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>  
> 
>  
> http://talebzadehmich.wordpress.com 
>  
> 
> On 13 June 2016 at 18:17, Mark Hamstra wrote:
> I don't know what documentation you were referring to, but this is clearly an 
> erroneous statement: "Threads are virtual cores."  At best it is terminology 
> abuse by a hardware manufacturer.  Regardless, Spark can't get too concerned 
> about how any particular hardware vendor wants to refer to the specific 
> components of their CPU architecture.  For us, a core is a logical execution 
> unit, something on which a thread of execution can run.  That can map in 
> different ways to different physical or virtual hardware. 
> 
> On Mon, Jun 13, 2016 at 12:02 AM, Mich Talebzadeh wrote:
> Hi,
> 
> It is not an issue of testing anything. I was referring to documentation 
> that clearly uses the term "threads". As I said and showed before, one line 
> uses the term "thread" and the next one "logical cores".
> 
> 
> HTH
> 
> Dr Mich Talebzadeh
>  
> LinkedIn  
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>  
> 
>  
> http://talebzadehmich.wordpress.com 
>  
> 
> On 12 June 2016 at 23:57, Daniel Darabos wrote:
> Spark is a software product. In software a "core" is something that a process 
> can run on. So it's a "virtual core". (Do not call these "threads". A 
> "thread" is not something a process can run on.)
> 
> local[*] uses java.lang.Runtime.availableProcessors(). Since Java is software,
> this also returns the number of virtual cores. (You can test this easily.)
> 
> 
> On Sun, Jun 12, 2016 at 9:23 PM, Mich Talebzadeh wrote:
> 
> Hi,
> 
> I was writing some docs on Spark P and came across this.
> 
> It is about the terminology or interpretation of that in Spark doc.
> 
> This is my understanding of cores and threads.
> 
> Cores are physical cores. Threads are virtual cores. A core with 2 threads 
> uses hyper-threading technology, so 2 threads per core make the core work 
> on two loads at the same time. In other words, every thread takes care of one 
> load.
> 
> A core has its own memory. So if you have a dual core with hyper-threading, each 
> core works on 2 loads at the same time because of the 2 threads per core, 
> but these 2 threads will share memory in that core.
> 
> Some vendors, as I am sure most of you are aware, charge licensing per core.
> 
> For example on the same host that I have Spark, I have a SAP product that 
> checks the licensing and shuts the application down if the license does not 
> agree with the cores speced.
> 
> This is what it says
> 
> ./cpuinfo
> License hostid:00e04c69159a 0050b60fd1e7
> Detected 12 logical processor(s), 6 core(s), in 1 chip(s)
> 
> So here I have 12 logical 

Re: Spark crashes worker nodes with multiple application starts

2016-06-16 Thread Deepak Goel
I am no expert, but some naive thoughts...

1. How many HPC nodes do you have? How many of them crash (What do you mean
by multiple)? Do all of them crash?

2. What things are you running on Puppet? Can't you switch it off and test
Spark? You can also switch off Facter. Btw, the iowait you observe on
these processes might be because they have lower priority than
Spark, so they are waiting for Spark to finish. The real bottleneck
might therefore be Spark and not these background processes.

3. Limiting CPUs and memory for Spark might have the opposite effect on
iowait, as more Spark processes would have to access the disk because of the
reduced memory and CPU.

4. Of course, you might have to give more info on what kind of applications
you are running on Spark, as they might be the main culprit.
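
To put a number on the iowait while testing the points above, one option is to compare two snapshots of /proc/stat. The sketch below is Linux-only and is not something the thread prescribes; the function names are made up for illustration, and it only gives the node-wide share, not which process or disk is responsible.

```python
def cpu_times(stat_text):
    """Return the aggregate 'cpu' counters from /proc/stat text as a list of ints."""
    for line in stat_text.splitlines():
        fields = line.split()
        if fields and fields[0] == "cpu":
            return [int(value) for value in fields[1:]]
    raise ValueError("no aggregate 'cpu' line found")


def iowait_share(before_text, after_text):
    """Share of CPU time spent in iowait between two /proc/stat snapshots.

    Linux field order: user nice system idle iowait irq softirq steal ...
    """
    before, after = cpu_times(before_text), cpu_times(after_text)
    deltas = [a - b for a, b in zip(after, before)]
    total = sum(deltas[:8])  # guest time is already included in user/nice
    return deltas[4] / total if total else 0.0
```

Reading /proc/stat twice a few seconds apart (before and after shutting down a kernel, for instance) and passing both texts to iowait_share would show whether the share climbs at application shutdown.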

Deepak

Hey

Namaskara~Nalama~Guten Tag~Bonjour


   --
Keigu

Deepak
73500 12833
www.simtree.net, dee...@simtree.net
deic...@gmail.com

LinkedIn: www.linkedin.com/in/deicool
Skype: thumsupdeicool
Google talk: deicool
Blog: http://loveandfearless.wordpress.com
Facebook: http://www.facebook.com/deicool

"Contribute to the world, environment and more : http://www.gridrepublic.org
"

On Thu, Jun 16, 2016 at 5:10 PM, Carlile, Ken 
wrote:

> We run Spark on a general purpose HPC cluster (using standalone mode and
> the HPC scheduler), and are currently on Spark 1.6.1. One of the primary
> users has been testing various storage and other parameters for Spark,
> which involves doing multiple shuffles and shutting down and starting many
> applications serially on a single cluster instance. He is using pyspark
> (via jupyter notebooks). Python version is 2.7.6.
>
> We have been seeing multiple HPC node hard locks in this scenario, all at
> the termination of a jupyter kernel (read Spark application). The symptom
> is that the load on the node keeps going higher. We have determined this is
> because of iowait on background processes (namely puppet and facter, clean
> up scripts, etc). What he sees is that when he starts a new kernel
> (application), the executor on those nodes will not start. We can no longer
> ssh into the nodes, and no commands can be run on them; everything goes
> into iowait. The only solution is to do a hard reset on the nodes.
>
> Obviously this is very disruptive, both to us sysadmins and to him. We
> have a limited number of HPC nodes that are permitted to run spark
> clusters, so this is a big problem.
>
> I have attempted to limit the background processes, but it doesn’t seem to
> matter; it can be any process that attempts io on the boot drive. He has
> tried various things (limiting CPU cores used by Spark, reducing the
> memory, etc.), but we have been unable to find a solution, or really, a
> cause.
>
> Has anyone seen anything like this? Any ideas where to look next?
>
> Thanks,
> Ken
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: How to deal with tasks running too long?

2016-06-16 Thread Jacek Laskowski
Hi,

I'd check Details for Stage page in web UI.

Pozdrawiam,
Jacek Laskowski

https://medium.com/@jaceklaskowski/
Mastering Apache Spark http://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski


On Thu, Jun 16, 2016 at 6:45 AM, Utkarsh Sengar  wrote:
> This SO question was asked about 1yr ago.
> http://stackoverflow.com/questions/31799755/how-to-deal-with-tasks-running-too-long-comparing-to-others-in-job-in-yarn-cli
>
> I answered this question with a suggestion to try speculation but it doesn't
> quite do what the OP expects. I have been running into this issue more these
> days. Out of 5000 tasks, 4950 complete in 5 mins but the last 50 never
> really complete; I have tried waiting for 4 hrs. This can be a memory issue or
> maybe the way Spark's fine-grained mode works with Mesos. I am trying to
> enable jmxsink to get a heap dump.
>
> But in the meantime, is there a better fix for this? (in any version of
> Spark; I am using 1.5.1 but can upgrade). It would be great if the last 50
> tasks in my example could be killed (timed out) and the stage completed
> successfully.
>
> --
> Thanks,
> -Utkarsh

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Error Running SparkPi.scala Example

2016-06-16 Thread Jacek Laskowski
Hi,

Before you try to do it inside another environment like an IDE, could
you build Spark using mvn or sbt and only when successful try to run
SparkPi using spark-submit run-example. With that, you could try to
have a complete environment inside your beloved IDE (and I'm very glad
to hear it's IDEA :))

Pozdrawiam,
Jacek Laskowski

https://medium.com/@jaceklaskowski/
Mastering Apache Spark http://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski


On Thu, Jun 16, 2016 at 1:37 AM, Krishna Kalyan
 wrote:
> Hello,
> I am faced with problems when I try to run SparkPi.scala.
> I took the following steps below:
> a) git pull https://github.com/apache/spark
> b) Import the project in Intellij as a maven project
> c) Run 'SparkPi'
>
> Error Below:
> Information:16/06/16 01:34 - Compilation completed with 10 errors and 5
> warnings in 5s 843ms
> Warning:scalac: Class org.jboss.netty.channel.ChannelFactory not found -
> continuing with a stub.
> Warning:scalac: Class org.jboss.netty.channel.ChannelPipelineFactory not
> found - continuing with a stub.
> Warning:scalac: Class org.jboss.netty.handler.execution.ExecutionHandler not
> found - continuing with a stub.
> Warning:scalac: Class org.jboss.netty.channel.group.ChannelGroup not found -
> continuing with a stub.
> Warning:scalac: Class com.google.common.collect.ImmutableMap not found -
> continuing with a stub.
> /Users/krishna/Experiment/spark/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala
> Error:(45, 66) not found: type SparkFlumeProtocol
>   val transactionTimeout: Int, val backOffInterval: Int) extends
> SparkFlumeProtocol with Logging {
>  ^
> Error:(70, 39) not found: type EventBatch
>   override def getEventBatch(n: Int): EventBatch = {
>   ^
> Error:(85, 13) not found: type EventBatch
> new EventBatch("Spark sink has been stopped!", "",
> java.util.Collections.emptyList())
> ^
> /Users/krishna/Experiment/spark/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/TransactionProcessor.scala
> Error:(80, 22) not found: type EventBatch
>   def getEventBatch: EventBatch = {
>  ^
> Error:(48, 37) not found: type EventBatch
>   @volatile private var eventBatch: EventBatch = new EventBatch("Unknown
> Error", "",
> ^
> Error:(48, 54) not found: type EventBatch
>   @volatile private var eventBatch: EventBatch = new EventBatch("Unknown
> Error", "",
>  ^
> Error:(115, 41) not found: type SparkSinkEvent
> val events = new util.ArrayList[SparkSinkEvent](maxBatchSize)
> ^
> Error:(146, 28) not found: type EventBatch
>   eventBatch = new EventBatch("", seqNum, events)
>^
> /Users/krishna/Experiment/spark/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSinkUtils.scala
> Error:(25, 27) not found: type EventBatch
>   def isErrorBatch(batch: EventBatch): Boolean = {
>   ^
> /Users/krishna/Experiment/spark/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSink.scala
> Error:(86, 51) not found: type SparkFlumeProtocol
> val responder = new SpecificResponder(classOf[SparkFlumeProtocol],
> handler.get)
>
> Thanks,
> Krishan

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: choice of RDD function

2016-06-16 Thread Sivakumaran S
Hi Jacek and Cody,

First of all, thanks for helping me out.

I started with using combineByKey while testing with just one field. Of course 
it worked fine, but I was worried that the code would become unreadable if 
there were many fields. Which is why I shifted to sqlContext because the code 
is comprehensible. Let me work out the stream statistics and update you in a 
while. 
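
For comparison, here is roughly what the combineByKey style looks like once there are several fields. It is a plain-Python emulation, not Spark code: the three functions mirror the createCombiner / mergeValue / mergeCombiners roles, the field names are taken from the SQL query quoted below, and averages_by_id and the list-of-partitions input are made up for illustration.

```python
FIELDS = ("temp", "rotor_rpm", "winddirection", "windspeed")


def create_combiner(rec):
    # First record seen for a key in a partition: per-field sums plus a count.
    return ([rec[f] for f in FIELDS], 1)


def merge_value(acc, rec):
    # Fold one more record of the same key into a partition-local accumulator.
    sums, count = acc
    return ([s + rec[f] for s, f in zip(sums, FIELDS)], count + 1)


def merge_combiners(a, b):
    # Combine accumulators of the same key coming from different partitions.
    return ([x + y for x, y in zip(a[0], b[0])], a[1] + b[1])


def averages_by_id(partitions):
    """Emulate combineByKey over a list of partitions (lists of dict records)."""
    merged = {}
    for part in partitions:
        local = {}
        for rec in part:
            key = rec["id"]
            local[key] = merge_value(local[key], rec) if key in local else create_combiner(rec)
        for key, acc in local.items():
            merged[key] = merge_combiners(merged[key], acc) if key in merged else acc
    return {key: dict(zip(FIELDS, [s / n for s in sums]))
            for key, (sums, n) in merged.items()}
```

Every extra field or statistic has to be threaded through all three functions, which is the readability concern; the GROUP BY query states the same thing in one line.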



Regards,

Siva



> On 16-Jun-2016, at 11:29 AM, Jacek Laskowski  wrote:
> 
> Rather
> 
> val df = sqlContext.read.json(rdd)
> 
> Pozdrawiam,
> Jacek Laskowski
> 
> https://medium.com/@jaceklaskowski/
> Mastering Apache Spark http://bit.ly/mastering-apache-spark
> Follow me at https://twitter.com/jaceklaskowski
> 
> 
> On Wed, Jun 15, 2016 at 11:55 PM, Sivakumaran S  wrote:
>> Cody,
>> 
>> Are you referring to the  val lines = messages.map(_._2)?
>> 
>> Regards,
>> 
>> Siva
>> 
>>> On 15-Jun-2016, at 10:32 PM, Cody Koeninger  wrote:
>>> 
>>> Doesn't that result in consuming each RDD twice, in order to infer the
>>> json schema?
>>> 
>>> On Wed, Jun 15, 2016 at 11:19 AM, Sivakumaran S  wrote:
 Of course :)
 
 import kafka.serializer.StringDecoder
 import org.apache.spark.{SparkConf, SparkContext}
 // Assumes the spark-examples jar is on the classpath.
 import org.apache.spark.examples.streaming.StreamingExamples
 import org.apache.spark.streaming.{Seconds, StreamingContext}
 import org.apache.spark.streaming.kafka.KafkaUtils

 object sparkStreaming {
   def main(args: Array[String]) {
     // Set reasonable logging levels for streaming if the user has not configured log4j.
     StreamingExamples.setStreamingLogLevels()
     val topics = "test"
     val brokers = "localhost:9092"
     val topicsSet = topics.split(",").toSet
     // Alternative master: spark://localhost:7077
     val sparkConf = new SparkConf().setAppName("KafkaDroneCalc").setMaster("local")
     val sc = new SparkContext(sparkConf)
     val ssc = new StreamingContext(sc, Seconds(30))
     val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
     val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
       ssc, kafkaParams, topicsSet)
     // Keep only the message value (a JSON string).
     val lines = messages.map(_._2)
     val sqlContext = new org.apache.spark.sql.SQLContext(sc)
     lines.foreachRDD( rdd => {
       // The schema is inferred from the JSON strings of each batch.
       val df = sqlContext.read.json(rdd)
       df.registerTempTable("drone")
       sqlContext.sql("SELECT id, AVG(temp), AVG(rotor_rpm), " +
         "AVG(winddirection), AVG(windspeed) FROM drone GROUP BY id").show()
     })
     ssc.start()
     ssc.awaitTermination()
   }
 }
 
 I haven’t checked long running performance though.
 
 Regards,
 
 Siva
 
 On 15-Jun-2016, at 5:02 PM, Jacek Laskowski  wrote:
 
 Hi,
 
 Good to hear so! Mind sharing a few snippets of your solution?
 
 Pozdrawiam,
 Jacek Laskowski
 
 https://medium.com/@jaceklaskowski/
 Mastering Apache Spark http://bit.ly/mastering-apache-spark
 Follow me at https://twitter.com/jaceklaskowski
 
 
 On Wed, Jun 15, 2016 at 5:03 PM, Sivakumaran S  wrote:
 
 Thanks Jacek,
 
 Job completed!! :) Just used data frames and sql query. Very clean and
 functional code.
 
 Siva
 
 On 15-Jun-2016, at 3:10 PM, Jacek Laskowski  wrote:
 
 mapWithState
 
 
 
>> 
>> 
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>> 
> 
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
> 


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: How to enable core dump in spark

2016-06-16 Thread Jacek Laskowski
Hi,

Can you make sure that the ulimit settings are applied to the Spark
process? Is this Spark on YARN or Standalone?
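
One way to check this is to look at the limits the running process actually has rather than at the login shell. The sketch below is Linux-only and the helper name is made up; it parses the "Max core file size" row of /proc/<pid>/limits, and separately prints the limit of the current Python process through the standard resource module. Limits set in /etc/security/limits.conf are applied to login sessions, so a daemon started by a service manager may not inherit them.

```python
import resource


def core_limit_from_proc(limits_text):
    """Extract (soft, hard) for 'Max core file size' from /proc/<pid>/limits text."""
    label = "Max core file size"
    for line in limits_text.splitlines():
        if line.startswith(label):
            soft, hard = line[len(label):].split()[:2]
            return soft, hard
    raise ValueError("no 'Max core file size' line found")


# Limit of the current Python process, as a cross-check of the shell setting.
# resource.RLIM_INFINITY means unlimited.
soft, hard = resource.getrlimit(resource.RLIMIT_CORE)
print("this process:", soft, hard)
```

Passing the contents of /proc/<pid>/limits for a live executor or NodeManager JVM to core_limit_from_proc shows whether the soft limit there is still 0.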

Pozdrawiam,
Jacek Laskowski

https://medium.com/@jaceklaskowski/
Mastering Apache Spark http://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski


On Wed, Jun 1, 2016 at 7:55 PM, prateek arora
 wrote:
> Hi
>
> I am using Cloudera to set up Spark 1.6.0 on Ubuntu 14.04.
>
> I set the core dump limit to unlimited on all nodes:
> edit the /etc/security/limits.conf file and add the " * soft core unlimited "
> line.
>
> I rechecked using: $ ulimit -all
>
> core file size          (blocks, -c) unlimited
> data seg size           (kbytes, -d) unlimited
> scheduling priority             (-e) 0
> file size               (blocks, -f) unlimited
> pending signals                 (-i) 241204
> max locked memory       (kbytes, -l) 64
> max memory size         (kbytes, -m) unlimited
> open files                      (-n) 1024
> pipe size            (512 bytes, -p) 8
> POSIX message queues     (bytes, -q) 819200
> real-time priority              (-r) 0
> stack size              (kbytes, -s) 8192
> cpu time               (seconds, -t) unlimited
> max user processes              (-u) 241204
> virtual memory          (kbytes, -v) unlimited
> file locks                      (-x) unlimited
>
> But when I run my Spark application with some third-party native
> libraries, it sometimes crashes and shows the error " Failed to write core
> dump. Core dumps have been disabled. To enable core dumping, try "ulimit -c
> unlimited" before starting Java again ".
>
> Below is the log:
>
> # A fatal error has been detected by the Java Runtime Environment:
> #
> #  SIGSEGV (0xb) at pc=0x7fd44b491fb9, pid=20458, tid=140549318547200
> #
> # JRE version: Java(TM) SE Runtime Environment (7.0_67-b01) (build
> 1.7.0_67-b01)
> # Java VM: Java HotSpot(TM) 64-Bit Server VM (24.65-b04 mixed mode
> linux-amd64 compressed oops)
> # Problematic frame:
> # V  [libjvm.so+0x650fb9]  jni_SetByteArrayRegion+0xa9
> #
> # Failed to write core dump. Core dumps have been disabled. To enable core
> dumping, try "ulimit -c unlimited" before starting Java again
> #
> # An error report file with more information is saved as:
> #
> /yarn/nm/usercache/master/appcache/application_1462930975871_0004/container_1462930975871_0004_01_66/hs_err_pid20458.log
> #
> # If you would like to submit a bug report, please visit:
> #   http://bugreport.sun.com/bugreport/crash.jsp
> #
>
>
> So how can I enable core dumps and save them somewhere?
>
> Regards
> Prateek
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-enable-core-dump-in-spark-tp27065.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Spark crashes worker nodes with multiple application starts

2016-06-16 Thread Carlile, Ken
We run Spark on a general purpose HPC cluster (using standalone mode and the 
HPC scheduler), and are currently on Spark 1.6.1. One of the primary users has 
been testing various storage and other parameters for Spark, which involves 
doing multiple shuffles and shutting down and starting many applications 
serially on a single cluster instance. He is using pyspark (via jupyter 
notebooks). Python version is 2.7.6. 

We have been seeing multiple HPC node hard locks in this scenario, all at the 
termination of a jupyter kernel (read Spark application). The symptom is that 
the load on the node keeps going higher. We have determined this is because of 
iowait on background processes (namely puppet and facter, clean up scripts, 
etc). What he sees is that when he starts a new kernel (application), the 
executor on those nodes will not start. We can no longer ssh into the nodes, 
and no commands can be run on them; everything goes into iowait. The only 
solution is to do a hard reset on the nodes. 

Obviously this is very disruptive, both to us sysadmins and to him. We have a 
limited number of HPC nodes that are permitted to run spark clusters, so this 
is a big problem. 

I have attempted to limit the background processes, but it doesn’t seem to 
matter; it can be any process that attempts io on the boot drive. He has tried 
various things (limiting CPU cores used by Spark, reducing the memory, etc.), 
but we have been unable to find a solution, or really, a cause. 

Has anyone seen anything like this? Any ideas where to look next? 

Thanks, 
Ken
-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



[YARN] Questions about YARN's queues and Spark's FAIR scheduler

2016-06-16 Thread Jacek Laskowski
Hi,

I'm trying to get my head around the different parts of Spark on YARN
architecture with YARN's schedulers and queues as well as Spark's own
schedulers - FAIR and FIFO.

I'd appreciate if you could read how I see things and correct me where
I'm wrong. Thanks!

The default scheduler in YARN is Capacity Scheduler [1]. It comes with
the notion of queues. When you spark-submit a Spark application with
--master yarn, you can specify --queue for the scheduling queue and it
is **only** to offer the right share of CPUs and memory to the
application. There could be more resources in the cluster, but that
particular queue has only that exact share of vcores and memory.

In other words, Spark does not know about any other resources but the
ones available in the queue.

Is this correct?

You can also spark-submit a Spark application using FAIR scheduler
(the default is FIFO) using -c spark.scheduler.mode=FAIR.

In FAIR mode, there's also a notion of queue-like (Schedulable) pools.
They can also control the resource shares assigned to Spark
jobs/applications. You could sc.setLocalProperty to control what pool
to use.

Is this correct?
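
To keep the two knobs apart, here is a small sketch that only assembles the spark-submit command line; it does not run anything. The queue name "etl", the file app.jar and the helper name are made up. As far as I understand the job scheduling docs, --queue picks the YARN queue the whole application draws its resources from, while spark.scheduler.mode decides how jobs inside that one application share what it got.

```python
import shlex


def spark_submit_command(app, queue, scheduler_mode="FIFO", extra_conf=None):
    """Build a spark-submit argument list for Spark on YARN."""
    if scheduler_mode not in ("FIFO", "FAIR"):
        raise ValueError("scheduler_mode must be FIFO or FAIR")
    conf = {"spark.scheduler.mode": scheduler_mode}
    conf.update(extra_conf or {})
    args = ["spark-submit", "--master", "yarn", "--queue", queue]
    for key in sorted(conf):
        args += ["--conf", "%s=%s" % (key, conf[key])]
    return args + [app]


# Hypothetical queue and application names.
cmd = spark_submit_command("app.jar", queue="etl", scheduler_mode="FAIR")
print(" ".join(shlex.quote(arg) for arg in cmd))
```

The pool for a given thread is then chosen inside the application with sc.setLocalProperty, using the spark.scheduler.pool key.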

If both are yes, why would I want to go as far as using queues and
FAIR scheduling mode with pools? What are the benefits? Is this for
multi-tenant environments? Do you have any use cases that would fit
better with FAIR scheduling mode? What about YARN's queues with Spark
on YARN?

Share as much as you could since the topic bothers me so much (and
without your support I won't be able to recover from this painful
mental state :))

Thanks for reading so far! Appreciate any help.

[1] 
https://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site/CapacityScheduler.html

Pozdrawiam,
Jacek Laskowski

https://medium.com/@jaceklaskowski/
Mastering Apache Spark http://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Can I control the execution of Spark jobs?

2016-06-16 Thread Alonso Isidoro Roman
Hi Wang,

maybe you could consider using an integration framework like Apache Camel in
order to run different jobs...

Alonso Isidoro Roman
about.me/alonso.isidoro.roman


2016-06-16 13:08 GMT+02:00 Jacek Laskowski :

> Hi,
>
> When you say "several ETL types of things", what is this exactly? What
> would an example of "dependency between these jobs" be?
>
> Pozdrawiam,
> Jacek Laskowski
> 
> https://medium.com/@jaceklaskowski/
> Mastering Apache Spark http://bit.ly/mastering-apache-spark
> Follow me at https://twitter.com/jaceklaskowski
>
>
> On Thu, Jun 16, 2016 at 11:36 AM, Haopu Wang  wrote:
> > Hi,
> >
> >
> >
> > Suppose I have a spark application which is doing several ETL types of
> > things.
> >
> > I understand Spark can analyze and generate several jobs to execute.
> >
> > The question is: is it possible to control the dependency between these
> > jobs?
> >
> >
> >
> > Thanks!
> >
> >
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


how to load compressed (gzip) csv file using spark-csv

2016-06-16 Thread Vamsi Krishna
Hi,

I'm using Spark 1.4.1 (HDP 2.3.2).
As per the spark-csv documentation (https://github.com/databricks/spark-csv),
I see that we can write to a csv file in compressed form using the 'codec'
option.
But, didn't see the support for 'codec' option to read a csv file.

Is there a way to read a compressed (gzip) file using spark-csv?

Thanks,
Vamsi Attluri
-- 
Vamsi Attluri


Re: In yarn-cluster mode, provide system prop to the client jvm

2016-06-16 Thread Jacek Laskowski
Hi,

You could use --properties-file to point to the properties file with
properties or use spark.driver.extraJavaOptions.
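
A minimal sketch of the first option, assuming a throwaway properties file is acceptable: it writes the file in the whitespace-separated form used by spark-defaults.conf and builds the matching spark-submit arguments. The log4j path, app.jar and the helper name are made up, and whether spark.driver.extraJavaOptions reaches the submitting JVM in yarn-cluster mode (rather than only the driver) is something to verify.

```python
import os
import tempfile


def write_properties(props, directory=None):
    """Write key/value pairs, one 'key value' pair per line, and return the path."""
    fd, path = tempfile.mkstemp(suffix=".conf", dir=directory, text=True)
    with os.fdopen(fd, "w") as handle:
        for key in sorted(props):
            handle.write("%s %s\n" % (key, props[key]))
    return path


path = write_properties({
    # Hypothetical log4j file location; -Dlog4j.configuration is from the question.
    "spark.driver.extraJavaOptions": "-Dlog4j.configuration=file:/tmp/my-log4j.properties",
})
submit_args = ["spark-submit", "--master", "yarn-cluster",
               "--properties-file", path, "app.jar"]
```

Note that a file given with --properties-file is read instead of the default conf/spark-defaults.conf, so any settings needed from there would have to be copied over.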

Pozdrawiam,
Jacek Laskowski

https://medium.com/@jaceklaskowski/
Mastering Apache Spark http://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski


On Thu, Jun 16, 2016 at 1:02 PM, Ellis, Tom (Financial Markets IT)
 wrote:
> Hi,
>
>
>
> I was wondering if it was possible to submit a java system property to the
> JVM that does the submission of a yarn-cluster application, for instance,
> -Dlog4j.configuration. I believe it will default to using the
> SPARK_CONF_DIR’s log4j.properties, is it possible to override this, as I do
> not have access?
>
>
>
> Cheers,
>
>
>
> Tom Ellis
> Consultant Developer – Excelian
>
> Data Lake | Financial Markets IT
> LLOYDS BANK COMMERCIAL BANKING
>
> 
>
>
> E: tom.el...@lloydsbanking.com
> Website: www.lloydsbankcommercial.com
> Reduce printing. Lloyds Banking Group is helping to build the low carbon
> economy.
> Corporate Responsibility Report: www.lloydsbankinggroup-cr.com/downloads
>
>
>
>
> Lloyds Banking Group plc. Registered Office: The Mound, Edinburgh EH1 1YZ.
> Registered in Scotland no. SC95000. Telephone: 0131 225 4555. Lloyds Bank
> plc. Registered Office: 25 Gresham Street, London EC2V 7HN. Registered in
> England and Wales no. 2065. Telephone 0207626 1500. Bank of Scotland plc.
> Registered Office: The Mound, Edinburgh EH1 1YZ. Registered in Scotland no.
> SC327000. Telephone: 03457 801 801. Cheltenham & Gloucester plc. Registered
> Office: Barnett Way, Gloucester GL4 3RL. Registered in England and Wales
> 2299428. Telephone: 0345 603 1637
>
> Lloyds Bank plc, Bank of Scotland plc are authorised by the Prudential
> Regulation Authority and regulated by the Financial Conduct Authority and
> Prudential Regulation Authority.
>
> Cheltenham & Gloucester plc is authorised and regulated by the Financial
> Conduct Authority.
>
> Halifax is a division of Bank of Scotland plc. Cheltenham & Gloucester
> Savings is a division of Lloyds Bank plc.
>
> HBOS plc. Registered Office: The Mound, Edinburgh EH1 1YZ. Registered in
> Scotland no. SC218813.
>
> This e-mail (including any attachments) is private and confidential and may
> contain privileged material. If you have received this e-mail in error,
> please notify the sender and delete it (including any attachments)
> immediately. You must not copy, distribute, disclose or use any of the
> information in it or any attachments. Telephone calls may be monitored or
> recorded.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Can I control the execution of Spark jobs?

2016-06-16 Thread Jacek Laskowski
Hi,

When you say "several ETL types of things", what is this exactly? What
would an example of "dependency between these jobs" be?

Pozdrawiam,
Jacek Laskowski

https://medium.com/@jaceklaskowski/
Mastering Apache Spark http://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski


On Thu, Jun 16, 2016 at 11:36 AM, Haopu Wang  wrote:
> Hi,
>
>
>
> Suppose I have a spark application which is doing several ETL types of
> things.
>
> I understand Spark can analyze and generate several jobs to execute.
>
> The question is: is it possible to control the dependency between these
> jobs?
>
>
>
> Thanks!
>
>




In yarn-cluster mode, provide system prop to the client jvm

2016-06-16 Thread Ellis, Tom (Financial Markets IT)
Hi,

I was wondering whether it is possible to pass a Java system property, for
instance -Dlog4j.configuration, to the JVM that submits a yarn-cluster
application. I believe it defaults to the log4j.properties in SPARK_CONF_DIR.
Is it possible to override this, as I do not have access to it?

Cheers,

Tom Ellis
Consultant Developer - Excelian
Data Lake | Financial Markets IT
LLOYDS BANK COMMERCIAL BANKING


E: tom.el...@lloydsbanking.com
Website: www.lloydsbankcommercial.com
Reduce printing. Lloyds Banking Group is helping to build the low carbon 
economy.
Corporate Responsibility Report: 
www.lloydsbankinggroup-cr.com/downloads





Unable to execute sparkr jobs through Chronos

2016-06-16 Thread Rodrick Brown
We use Chronos extensively for running our batch-based Spark jobs and recently
started using SparkR. However, SparkR jobs launched through Chronos always
fail with the generic error below.

  

Launching java with spark-submit command /opt/spark-1.6.1/bin/spark-submit --jars /data/orchard/etc/config/load-tradedata-accumulo-prod.jar,/data/orchard/jars/dataloader-library-626d9b10bef28a163ad61d8a2e87bb71d455ea34-assembled.jar --driver-memory "6000M" sparkr-shell /tmp/RtmpzJUGsJ/backend_port11c5c2f3ca40b
Error: Master must start with yarn, spark, mesos, or local
Run with --help for usage help or --verbose for debug output
Error in sparkR.init(master = sparkr_master, appName = job_name, sparkJars = sparkr_jars, :
  JVM is not ready after 10 seconds
Calls: orchard_sparkR_init -> sparkR.init
In addition: Warning message:
In processSparkJars(sparkJars) :
  sparkJars as a comma-separated string is deprecated, use character vector instead
Execution halted

  

I believe it may have something to do with the sparkr-shell process that is
forked from R.

If I run this job outside of Chronos, directly from the CLI, everything runs
fine: it connects to my Mesos cluster and succeeds.

I'm not sure what could be causing this issue.

I've tried both R CMD BATCH myscript.R and Rscript --vanilla myscript.R; both
suffer from the same issue.

  

Here is the job definition

  

{  
   "name": "PROD_applepie_loan_detail_R",  
   "command": "env  exec /data/orchard/bin/orchard_job_executor.py
-j PROD_applepie_loan_detail_R",  
   "shell": true,  
   "epsilon": "PT15M",  
   "executor": "",  
   "executorFlags": "",  
   "retries": 3,  
   "owner": "o...@orchardplatform.com",  
   "ownerName": "",  
   "runAsUser": "orchard",  
   "description": "",  
   "async": false,  
   "successCount": 0,  
   "errorCount": 0,  
   "lastSuccess": "",  
   "lastError": "",  
   "cpus": 0.1,  
   "disk": 10,  
   "mem": 6000,  
   "disabled": false,  
   "softError": false,  
   "dataProcessingJobType": false,  
   "errorsSinceLastSuccess": 0,  
   "uris": [ "file:///data/orchard/R/sparkr_env.R",
"file:///data/orchard/R/applepie_loan_detail.R"],  
   "environmentVariables": [  
 {  
   "name": "SPARKR_MASTER",  
   "value": "mesos://leader.mesos:5050"  
 }  
   ],  
   "arguments": [],  
   "highPriority": false,  
   "runAsUser": "fedora",  
   "constraints": [[ "rack", "EQUALS", "spark" ]],  
   "schedule": "R/2016-06-09T02:00:00.000Z/P1W",  
   "scheduleTimeZone": "UTC"  
 }

  

  

--

**Rodrick Brown** / Systems Engineer 

+1 917 445 6839 /
[rodr...@orchardplatform.com](mailto:char...@orchardplatform.com)

**Orchard Platform** 

101 5th Avenue, 4th Floor, New York, NY 10003

[http://www.orchardplatform.com](http://www.orchardplatform.com/)

[Orchard Blog](http://www.orchardplatform.com/blog/) | [Marketplace Lending
Meetup](http://www.meetup.com/Peer-to-Peer-Lending-P2P/)


-- 
*NOTICE TO RECIPIENTS*: This communication is confidential and intended for 
the use of the addressee only. If you are not an intended recipient of this 
communication, please delete it immediately and notify the sender by return 
email. Unauthorized reading, dissemination, distribution or copying of this 
communication is prohibited. This communication does not constitute an 
offer to sell or a solicitation of an indication of interest to purchase 
any loan, security or any other financial product or instrument, nor is it 
an offer to sell or a solicitation of an indication of interest to purchase 
any products or services to any persons who are prohibited from receiving 
such information under applicable law. The contents of this communication 
may not be accurate or complete and are subject to change without notice. 
As such, Orchard App, Inc. (including its subsidiaries and affiliates, 
"Orchard") makes no representation regarding the accuracy or completeness 
of the information contained herein. The intended recipient is advised to 
consult its own professional advisors, including those specializing in 
legal, tax and accounting matters. Orchard does not provide legal, tax or 
accounting advice.


Re: HIVE Query 25x faster than SPARK Query

2016-06-16 Thread Mich Talebzadeh
Hi,

Your statement

"I have a system with 64 GB RAM and SSD and its performance on local
cluster SPARK is way better"

Is this a host with 64 GB of RAM, and is your data stored on local Solid State
Disks?

Can you kindly provide the parameters you pass to spark-submit:

${SPARK_HOME}/bin/spark-submit \
    --master local[?] \
    --driver-memory ?G \
    --num-executors 1 \
    --executor-memory ?G \


Thanks



Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com



On 16 June 2016 at 11:40, Gourav Sengupta  wrote:

> Hi,
>
> We do have a dimension table with around few hundred columns from which we
> need only a few columns to join with the main fact table which has a few
> million rows. I do not know how one off this case sounds like but  since I
> have been working in data warehousing it sounds like a fairly general used
> case.
>
> Spark in local mode will be way faster compared to SPARK running on
> HADOOP. I have a system with 64 GB RAM and SSD and its performance on local
> cluster SPARK is way better.
>
> Did your join include the same number of columns and rows for the
> dimension table?
>
>
> Regards,
> Gourav Sengupta
>
> On Thu, Jun 16, 2016 at 9:35 AM, Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>> sounds like this is a one off case.
>>
>> Do you have any other use case where you have Hive on MR outperforms
>> Spark?
>>
>> I did some tests on 1 billion row table getting the selectivity of a
>> column using Hive on MR, Hive on Spark engine and Spark running on local
>> mode (to keep it simple)
>>
>>
>> Hive 2, Spark 1.6.1
>>
>> Results:
>>
>> Hive with map-reduce --> 18  minutes
>> Hive on Spark engine -->  6 minutes
>> Spark-->  2 minutes
>>
>>
>> HTH
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> *
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>>
>> On 16 June 2016 at 08:43, Jörn Franke  wrote:
>>
>>> I agree here.
>>>
>>> However it depends always on your use case !
>>>
>>> Best regards
>>>
>>> On 16 Jun 2016, at 04:58, Gourav Sengupta 
>>> wrote:
>>>
>>> Hi Mahender,
>>>
>>> please ensure that for dimension tables you are enabling the broadcast
>>> method. You must be able to see surprising gains @12x.
>>>
>>> Overall I think that SPARK cannot figure out whether to scan all the
>>> columns in a table or just the ones which are being used causing this
>>> issue.
>>>
>>> When you start using HIVE with ORC and TEZ  (*) you will see some
>>> amazing results, and leaves SPARK way way behind. So pretty much you need
>>> to have your data in memory for matching the performance claims of SPARK
>>> and the advantage in that case you are getting is not because of SPARK
>>> algorithms but just fast I/O from RAM. The advantage of SPARK is that it
>>> makes accessible analytics, querying, and streaming frameworks together.
>>>
>>>
>>> In case you are following the optimisations mentioned in the link you
>>> hardly have any reasons for using SPARK SQL:
>>> http://hortonworks.com/blog/5-ways-make-hive-queries-run-faster/ . And
>>> imagine being able to do all of that without having machines which requires
>>> huge RAM, or in short you are achieving those performance gains using
>>> commodity low cost systems around which HADOOP was designed.
>>>
>>> I think that Hortonworks is giving a stiff competition here :)
>>>
>>> Regards,
>>> Gourav Sengupta
>>>
>>> On Wed, Jun 15, 2016 at 11:35 PM, Mahender Sarangam <
>>> mahender.bigd...@outlook.com> wrote:
>>>
>>>> +1,
>>>>
>>>> Even see performance degradation while comparing SPark SQL with Hive.
>>>> We have table of 260 columns. We have executed in hive and SPARK. In
>>>> Hive, it is taking 66 sec for 1 gb of data whereas in Spark, it is taking 4
>>>> mins of time.
>>>> On 6/9/2016 3:19 PM, Gavin Yue wrote:
>>>>
>>>> Could you print out the sql execution plan? My guess is about broadcast
>>>> join.
>>>>
>>>>
>>>>
>>>> On Jun 9, 2016, at 07:14, Gourav Sengupta <
>>>> gourav.sengu...@gmail.com> wrote:
>>>>
>>>> Hi,
>>>>
>>>> Query1 is almost 25x faster in HIVE than in SPARK. What is happening
>>>> here and is there a way we can optimize the queries in SPARK without the
>>>> obvious hack in Query2.
>>>>
>>>>
>>>> ---
>>>> ENVIRONMENT:
>>>> ---
>>>>
>>>> > Table A 533 columns x 24 million rows and Table B has 2 columns x 3
>>>> million rows. Both the files are single gzipped csv file.
>>>> > Both table A and B are external tables in AWS S3 and created in HIVE
>>>> accessed through SPARK

Re: HIVE Query 25x faster than SPARK Query

2016-06-16 Thread Gourav Sengupta
Hi,

We do have a dimension table with a few hundred columns, from which we need
only a few to join with the main fact table, which has a few million rows. I
do not know how much of a one-off this case sounds, but from my experience in
data warehousing it is a fairly common use case.

Spark in local mode will be way faster compared to SPARK running on HADOOP.
I have a system with 64 GB RAM and SSD and its performance on local cluster
SPARK is way better.

Did your join include the same number of columns and rows for the dimension
table?


Regards,
Gourav Sengupta

On Thu, Jun 16, 2016 at 9:35 AM, Mich Talebzadeh 
wrote:

> sounds like this is a one off case.
>
> Do you have any other use case where you have Hive on MR outperforms Spark?
>
> I did some tests on 1 billion row table getting the selectivity of a
> column using Hive on MR, Hive on Spark engine and Spark running on local
> mode (to keep it simple)
>
>
> Hive 2, Spark 1.6.1
>
> Results:
>
> Hive with map-reduce --> 18  minutes
> Hive on Spark engine -->  6 minutes
> Spark-->  2 minutes
>
>
> HTH
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 16 June 2016 at 08:43, Jörn Franke  wrote:
>
>> I agree here.
>>
>> However it depends always on your use case !
>>
>> Best regards
>>
>> On 16 Jun 2016, at 04:58, Gourav Sengupta 
>> wrote:
>>
>> Hi Mahender,
>>
>> please ensure that for dimension tables you are enabling the broadcast
>> method. You must be able to see surprising gains @12x.
>>
>> Overall I think that SPARK cannot figure out whether to scan all the
>> columns in a table or just the ones which are being used causing this
>> issue.
>>
>> When you start using HIVE with ORC and TEZ  (*) you will see some amazing
>> results, and leaves SPARK way way behind. So pretty much you need to have
>> your data in memory for matching the performance claims of SPARK and the
>> advantage in that case you are getting is not because of SPARK algorithms
>> but just fast I/O from RAM. The advantage of SPARK is that it makes
>> accessible analytics, querying, and streaming frameworks together.
>>
>>
>> In case you are following the optimisations mentioned in the link you
>> hardly have any reasons for using SPARK SQL:
>> http://hortonworks.com/blog/5-ways-make-hive-queries-run-faster/ . And
>> imagine being able to do all of that without having machines which requires
>> huge RAM, or in short you are achieving those performance gains using
>> commodity low cost systems around which HADOOP was designed.
>>
>> I think that Hortonworks is giving a stiff competition here :)
>>
>> Regards,
>> Gourav Sengupta
>>
>> On Wed, Jun 15, 2016 at 11:35 PM, Mahender Sarangam <
>> mahender.bigd...@outlook.com> wrote:
>>
>>> +1,
>>>
>>> Even see performance degradation while comparing SPark SQL with Hive.
>>> We have table of 260 columns. We have executed in hive and SPARK. In
>>> Hive, it is taking 66 sec for 1 gb of data whereas in Spark, it is taking 4
>>> mins of time.
>>> On 6/9/2016 3:19 PM, Gavin Yue wrote:
>>>
>>> Could you print out the sql execution plan? My guess is about broadcast
>>> join.
>>>
>>>
>>>
>>> On Jun 9, 2016, at 07:14, Gourav Sengupta < 
>>> gourav.sengu...@gmail.com> wrote:
>>>
>>> Hi,
>>>
>>> Query1 is almost 25x faster in HIVE than in SPARK. What is happening
>>> here and is there a way we can optimize the queries in SPARK without the
>>> obvious hack in Query2.
>>>
>>>
>>> ---
>>> ENVIRONMENT:
>>> ---
>>>
>>> > Table A 533 columns x 24 million rows and Table B has 2 columns x 3
>>> million rows. Both the files are single gzipped csv file.
>>> > Both table A and B are external tables in AWS S3 and created in HIVE
>>> accessed through SPARK using HiveContext
>>> > EMR 4.6, Spark 1.6.1 and Hive 1.0.0 (clusters started using
>>> allowMaximumResource allocation and node types are c3.4xlarge).
>>>
>>> --
>>> QUERY1:
>>> --
>>> select A.PK, B.FK
>>> from A
>>> left outer join B on (A.PK = B.FK)
>>> where B.FK is not null;
>>>
>>>
>>>
>>> This query takes 4 mins in HIVE and 1.1 hours in SPARK
>>>
>>>
>>> --
>>> QUERY 2:
>>> --
>>>
>>> select A.PK, B.FK
>>> from (select PK from A) A
>>> left outer join B on (A.PK = B.FK)
>>> where B.FK is not null;
>>>
>>> This query takes 4.5 mins in SPARK
>>>
>>>
>>>
>>> Regards,
>>> Gourav Sengupta
>>>
>>>
>>>
>>>
>>>
>>
>


Re: choice of RDD function

2016-06-16 Thread Jacek Laskowski
Rather

val df = sqlContext.read.json(rdd)

Pozdrawiam,
Jacek Laskowski

https://medium.com/@jaceklaskowski/
Mastering Apache Spark http://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski


On Wed, Jun 15, 2016 at 11:55 PM, Sivakumaran S  wrote:
> Cody,
>
> Are you referring to the  val lines = messages.map(_._2)?
>
> Regards,
>
> Siva
>
>> On 15-Jun-2016, at 10:32 PM, Cody Koeninger  wrote:
>>
>> Doesn't that result in consuming each RDD twice, in order to infer the
>> json schema?
>>
>> On Wed, Jun 15, 2016 at 11:19 AM, Sivakumaran S  wrote:
>>> Of course :)
>>>
>>> import kafka.serializer.StringDecoder
>>> import org.apache.spark.{SparkConf, SparkContext}
>>> // StreamingExamples comes from the Spark examples module.
>>> import org.apache.spark.examples.streaming.StreamingExamples
>>> import org.apache.spark.streaming.{Seconds, StreamingContext}
>>> import org.apache.spark.streaming.kafka.KafkaUtils
>>>
>>> object sparkStreaming {
>>>   def main(args: Array[String]) {
>>>     // Set reasonable logging levels for streaming if the user has not
>>>     // configured log4j.
>>>     StreamingExamples.setStreamingLogLevels()
>>>     val topics = "test"
>>>     val brokers = "localhost:9092"
>>>     val topicsSet = topics.split(",").toSet
>>>     // Alternative master: spark://localhost:7077
>>>     val sparkConf = new SparkConf().setAppName("KafkaDroneCalc").setMaster("local")
>>>     val sc = new SparkContext(sparkConf)
>>>     val ssc = new StreamingContext(sc, Seconds(30))
>>>     val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
>>>     val messages = KafkaUtils.createDirectStream[String, String,
>>>       StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
>>>     val lines = messages.map(_._2) // keep only the message value
>>>     val sqlContext = new org.apache.spark.sql.SQLContext(sc)
>>>     lines.foreachRDD( rdd => {
>>>       val df = sqlContext.read.json(rdd)
>>>       df.registerTempTable("drone")
>>>       sqlContext.sql("SELECT id, AVG(temp), AVG(rotor_rpm), " +
>>>         "AVG(winddirection), AVG(windspeed) FROM drone GROUP BY id").show()
>>>     })
>>>     ssc.start()
>>>     ssc.awaitTermination()
>>>   }
>>> }
>>>
>>> I haven’t checked long running performance though.
>>>
>>> Regards,
>>>
>>> Siva
>>>
>>> On 15-Jun-2016, at 5:02 PM, Jacek Laskowski  wrote:
>>>
>>> Hi,
>>>
>>> Good to hear so! Mind sharing a few snippets of your solution?
>>>
>>> Pozdrawiam,
>>> Jacek Laskowski
>>> 
>>> https://medium.com/@jaceklaskowski/
>>> Mastering Apache Spark http://bit.ly/mastering-apache-spark
>>> Follow me at https://twitter.com/jaceklaskowski
>>>
>>>
>>> On Wed, Jun 15, 2016 at 5:03 PM, Sivakumaran S  wrote:
>>>
>>> Thanks Jacek,
>>>
>>> Job completed!! :) Just used data frames and sql query. Very clean and
>>> functional code.
>>>
>>> Siva
>>>
>>> On 15-Jun-2016, at 3:10 PM, Jacek Laskowski  wrote:
>>>
>>> mapWithState
>>>
>>>
>>>
>
>
>




Re: choice of RDD function

2016-06-16 Thread Jacek Laskowski
Hi,

That's one of my concerns with the code. What concerned me the most is
that the RDD(s) were converted to DataFrames only to registerTempTable
and execute SQLs. I think it'd have better performance if DataFrame
operators were used instead. Wish I had numbers.
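
Both concerns (the extra pass for schema inference and the temp table plus
SQL) could be addressed together. Below is a minimal PySpark sketch, not a
tested replacement for the code in this thread. It assumes the field names and
types listed in DRONE_FIELDS, which are only guessed from the SQL in the quoted
code, and a PySpark version whose read.json accepts an RDD of JSON strings.
The names DRONE_FIELDS, drone_schema and average_by_id are hypothetical.

```python
# Assumed layout of one drone message; adjust to the real JSON.
DRONE_FIELDS = [
    ("id", "string"),
    ("temp", "double"),
    ("rotor_rpm", "double"),
    ("winddirection", "double"),
    ("windspeed", "double"),
]


def drone_schema():
    """Build a StructType from DRONE_FIELDS (PySpark is imported lazily)."""
    from pyspark.sql.types import (DoubleType, StringType, StructField,
                                   StructType)
    type_map = {"string": StringType, "double": DoubleType}
    return StructType([StructField(name, type_map[kind](), True)
                       for name, kind in DRONE_FIELDS])


def average_by_id(sql_context, rdd):
    """Parse an RDD of JSON strings with a fixed schema and average per id."""
    # With an explicit schema there is no need to scan the RDD to infer one.
    df = sql_context.read.schema(drone_schema()).json(rdd)
    numeric = [name for name, kind in DRONE_FIELDS if kind == "double"]
    # DataFrame operators instead of registerTempTable + SQL.
    return df.groupBy("id").avg(*numeric)
```

Inside foreachRDD one would then call something like
average_by_id(sqlContext, rdd).show(). Whether this is measurably faster would
still need numbers.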

Pozdrawiam,
Jacek Laskowski

https://medium.com/@jaceklaskowski/
Mastering Apache Spark http://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski


On Wed, Jun 15, 2016 at 11:32 PM, Cody Koeninger  wrote:
> Doesn't that result in consuming each RDD twice, in order to infer the
> json schema?
>
> On Wed, Jun 15, 2016 at 11:19 AM, Sivakumaran S  wrote:
>> Of course :)
>>
>> import kafka.serializer.StringDecoder
>> import org.apache.spark.{SparkConf, SparkContext}
>> // StreamingExamples comes from the Spark examples module.
>> import org.apache.spark.examples.streaming.StreamingExamples
>> import org.apache.spark.streaming.{Seconds, StreamingContext}
>> import org.apache.spark.streaming.kafka.KafkaUtils
>>
>> object sparkStreaming {
>>   def main(args: Array[String]) {
>>     // Set reasonable logging levels for streaming if the user has not
>>     // configured log4j.
>>     StreamingExamples.setStreamingLogLevels()
>>     val topics = "test"
>>     val brokers = "localhost:9092"
>>     val topicsSet = topics.split(",").toSet
>>     // Alternative master: spark://localhost:7077
>>     val sparkConf = new SparkConf().setAppName("KafkaDroneCalc").setMaster("local")
>>     val sc = new SparkContext(sparkConf)
>>     val ssc = new StreamingContext(sc, Seconds(30))
>>     val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
>>     val messages = KafkaUtils.createDirectStream[String, String,
>>       StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
>>     val lines = messages.map(_._2) // keep only the message value
>>     val sqlContext = new org.apache.spark.sql.SQLContext(sc)
>>     lines.foreachRDD( rdd => {
>>       val df = sqlContext.read.json(rdd)
>>       df.registerTempTable("drone")
>>       sqlContext.sql("SELECT id, AVG(temp), AVG(rotor_rpm), " +
>>         "AVG(winddirection), AVG(windspeed) FROM drone GROUP BY id").show()
>>     })
>>     ssc.start()
>>     ssc.awaitTermination()
>>   }
>> }
>>
>> I haven’t checked long running performance though.
>>
>> Regards,
>>
>> Siva
>>
>> On 15-Jun-2016, at 5:02 PM, Jacek Laskowski  wrote:
>>
>> Hi,
>>
>> Good to hear so! Mind sharing a few snippets of your solution?
>>
>> Pozdrawiam,
>> Jacek Laskowski
>> 
>> https://medium.com/@jaceklaskowski/
>> Mastering Apache Spark http://bit.ly/mastering-apache-spark
>> Follow me at https://twitter.com/jaceklaskowski
>>
>>
>> On Wed, Jun 15, 2016 at 5:03 PM, Sivakumaran S  wrote:
>>
>> Thanks Jacek,
>>
>> Job completed!! :) Just used data frames and sql query. Very clean and
>> functional code.
>>
>> Siva
>>
>> On 15-Jun-2016, at 3:10 PM, Jacek Laskowski  wrote:
>>
>> mapWithState
>>
>>
>>




String indexer

2016-06-16 Thread pseudo oduesp
Hi,
What is the limit on the number of modalities (distinct values) that
StringIndexer can handle?

If I have columns with 1000 modalities, is it a good idea to use StringIndexer,
or should I try another function, and if so, which one?

Thanks
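
For context, StringIndexer maps each distinct label to a numeric index ordered
by frequency, so the most frequent label gets index 0. The plain-Python sketch
below only mimics that idea; it is not Spark's implementation, the function
names are hypothetical, and the alphabetical tie-break is an assumption of the
sketch. It suggests that a column with 1000 modalities simply yields indices
0 to 999, with one mapping entry per distinct label.

```python
from collections import Counter


def fit_string_indexer(values):
    """Map each distinct label to an index, most frequent label first."""
    counts = Counter(values)
    # Descending frequency; ties broken alphabetically (sketch assumption).
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(index) for index, label in enumerate(ordered)}


def transform(values, mapping):
    """Replace every label by its index."""
    return [mapping[value] for value in values]


column = ["a", "b", "c", "a", "a", "c"]
mapping = fit_string_indexer(column)
indexed = transform(column, mapping)
print(mapping)
print(indexed)
```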


Can I control the execution of Spark jobs?

2016-06-16 Thread Haopu Wang
Hi,

 

Suppose I have a spark application which is doing several ETL types of
things.

I understand Spark can analyze and generate several jobs to execute.

The question is: is it possible to control the dependency between these
jobs?

 

Thanks!
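
One way to think about this: Spark starts a job each time the driver program
calls an action, so jobs issued from a single thread run one after another in
program order, while jobs submitted from separate threads can run
concurrently. The plain-Python sketch below illustrates expressing a
dependency in the driver. The three functions are hypothetical placeholders
standing in for Spark actions; no Spark API is called.

```python
from concurrent.futures import ThreadPoolExecutor


def extract_customers():
    # Placeholder for a Spark action, e.g. a count() or a write.
    return "customers"


def extract_orders():
    # Placeholder for a second, independent Spark action.
    return "orders"


def join_and_load(customers, orders):
    # Placeholder for a step that needs both earlier results.
    return "loaded(%s, %s)" % (customers, orders)


def run_etl():
    with ThreadPoolExecutor(max_workers=2) as pool:
        # The two independent steps are submitted from separate threads.
        customers_future = pool.submit(extract_customers)
        orders_future = pool.submit(extract_orders)
        # result() blocks, so the dependent step starts only after both finish.
        return join_and_load(customers_future.result(), orders_future.result())


print(run_etl())
```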

 



STringindexer

2016-06-16 Thread pseudo oduesp
Hi,
I have a DataFrame with 1000 columns to turn into dummies with StringIndexer.
When I apply the pipeline, it takes a long time once I want to merge the result
with the other DataFrame, that is:
 original DataFrame + columns indexed by StringIndexer

Problem: the save stage is slow. Why?

Code:

 from pyspark.ml import Pipeline
 from pyspark.ml.feature import StringIndexer

 # l is the list of column names to index
 indexers = [StringIndexer(inputCol=i, outputCol=i+"_index").fit(df) for i in l]
 li = [i+"_index" for i in l]
 pipeline = Pipeline(stages=indexers)
 df_r = pipeline.fit(df).transform(df)
 df_r = df_r.repartition(500)
 df_r.persist()
 # write is a property in PySpark, not a method
 df_r.write.parquet(paths)


Spark cache behaviour when the source table is modified

2016-06-16 Thread Anjali Chadha
Hi all,

I am having a hard time understanding the caching concepts in Spark.

I have a hive table("person"), which is cached in Spark.

//Create a new table
sqlContext.sql("create table person (name string, age int)")
//Add some values to the table
...
...
//Cache the table in Spark
sqlContext.cacheTable("person")
sqlContext.isCached("person") //Returns true
//Insert some other value in the table
sqlContext.sql("insert into table person values ('Foo', 25)")

//Check caching status again
sqlContext.isCached("person") //Returns true

sqlContext is *HiveContext*.

Will the entries inserted after the *cacheTable("person")* statement be cached?
In other words, is the ("Foo", 25) entry cached in Spark or not?

If not, how can I cache only the entries inserted later? I don't want to
first uncache and then again cache the whole table.

Any relevant web link or information will be appreciated.

- Anjali Chadha


Anyone has used Apache nifi

2016-06-16 Thread Mich Talebzadeh
Hi,

Has anyone used Apache NiFi for data ingestion?

There was a presentation yesterday in Hortonworks' London office titled
"Learn more about Data Ingest at Hortonworks". It is about HDF ( .. Data
Flow) including NiFi (Niagara Files?) as its core solution.

It looks impressive for low to medium size loads, so I was wondering whether
anyone has tried it out.



My main interest was NiFi.

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


Re: HIVE Query 25x faster than SPARK Query

2016-06-16 Thread Mich Talebzadeh
This sounds like a one-off case.

Do you have any other use case where Hive on MR outperforms Spark?

I did some tests on a 1 billion row table, getting the selectivity of a column
using Hive on MR, Hive on Spark engine and Spark running in local mode (to
keep it simple).


Hive 2, Spark 1.6.1

Results:

Hive with map-reduce --> 18 minutes
Hive on Spark engine -->  6 minutes
Spark                -->  2 minutes


HTH

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com



On 16 June 2016 at 08:43, Jörn Franke  wrote:

> I agree here.
>
> However it depends always on your use case !
>
> Best regards
>
> On 16 Jun 2016, at 04:58, Gourav Sengupta 
> wrote:
>
> Hi Mahender,
>
> please ensure that for dimension tables you are enabling the broadcast
> method. You must be able to see surprising gains @12x.
>
> Overall I think that SPARK cannot figure out whether to scan all the
> columns in a table or just the ones which are being used causing this
> issue.
>
> When you start using HIVE with ORC and TEZ  (*) you will see some amazing
> results, and leaves SPARK way way behind. So pretty much you need to have
> your data in memory for matching the performance claims of SPARK and the
> advantage in that case you are getting is not because of SPARK algorithms
> but just fast I/O from RAM. The advantage of SPARK is that it makes
> accessible analytics, querying, and streaming frameworks together.
>
>
> In case you are following the optimisations mentioned in the link you
> hardly have any reasons for using SPARK SQL:
> http://hortonworks.com/blog/5-ways-make-hive-queries-run-faster/ . And
> imagine being able to do all of that without having machines which requires
> huge RAM, or in short you are achieving those performance gains using
> commodity low cost systems around which HADOOP was designed.
>
> I think that Hortonworks is giving a stiff competition here :)
>
> Regards,
> Gourav Sengupta
>
> On Wed, Jun 15, 2016 at 11:35 PM, Mahender Sarangam <
> mahender.bigd...@outlook.com> wrote:
>
>> +1,
>>
>> Even see performance degradation while comparing SPark SQL with Hive.
>> We have table of 260 columns. We have executed in hive and SPARK. In
>> Hive, it is taking 66 sec for 1 gb of data whereas in Spark, it is taking 4
>> mins of time.
>> On 6/9/2016 3:19 PM, Gavin Yue wrote:
>>
>> Could you print out the sql execution plan? My guess is about broadcast
>> join.
>>
>>
>>
>> On Jun 9, 2016, at 07:14, Gourav Sengupta < 
>> gourav.sengu...@gmail.com> wrote:
>>
>> Hi,
>>
>> Query1 is almost 25x faster in HIVE than in SPARK. What is happening here
>> and is there a way we can optimize the queries in SPARK without the obvious
>> hack in Query2.
>>
>>
>> ---
>> ENVIRONMENT:
>> ---
>>
>> > Table A 533 columns x 24 million rows and Table B has 2 columns x 3
>> million rows. Both the files are single gzipped csv file.
>> > Both table A and B are external tables in AWS S3 and created in HIVE
>> accessed through SPARK using HiveContext
>> > EMR 4.6, Spark 1.6.1 and Hive 1.0.0 (clusters started using
>> allowMaximumResource allocation and node types are c3.4xlarge).
>>
>> --
>> QUERY1:
>> --
>> select A.PK, B.FK
>> from A
>> left outer join B on (A.PK = B.FK)
>> where B.FK is not null;
>>
>>
>>
>> This query takes 4 mins in HIVE and 1.1 hours in SPARK
>>
>>
>> --
>> QUERY 2:
>> --
>>
>> select A.PK, B.FK
>> from (select PK from A) A
>> left outer join B on (A.PK = B.FK)
>> where B.FK is not null;
>>
>> This query takes 4.5 mins in SPARK
>>
>>
>>
>> Regards,
>> Gourav Sengupta
>>
>>
>>
>>
>>
>


Re: HIVE Query 25x faster than SPARK Query

2016-06-16 Thread Jörn Franke
I agree here.

However, it always depends on your use case!

Best regards

> On 16 Jun 2016, at 04:58, Gourav Sengupta  wrote:
> 
> Hi Mahender, 
> 
> please ensure that for dimension tables you are enabling the broadcast 
> method. You must be able to see surprising gains @12x. 
> 
> Overall I think that SPARK cannot figure out whether to scan all the columns 
> in a table or just the ones which are being used causing this issue. 
> 
> When you start using HIVE with ORC and TEZ  (*) you will see some amazing 
> results, and leaves SPARK way way behind. So pretty much you need to have 
> your data in memory for matching the performance claims of SPARK and the 
> advantage in that case you are getting is not because of SPARK algorithms but 
> just fast I/O from RAM. The advantage of SPARK is that it makes accessible 
> analytics, querying, and streaming frameworks together.
> 
> 
> In case you are following the optimisations mentioned in the link you hardly 
> have any reasons for using SPARK SQL: 
> http://hortonworks.com/blog/5-ways-make-hive-queries-run-faster/ . And 
> imagine being able to do all of that without having machines which requires 
> huge RAM, or in short you are achieving those performance gains using 
> commodity low cost systems around which HADOOP was designed. 
> 
> I think that Hortonworks is giving a stiff competition here :)
> 
> Regards,
> Gourav Sengupta
> 
>> On Wed, Jun 15, 2016 at 11:35 PM, Mahender Sarangam 
>>  wrote:
>> +1,
>> 
>> Even see performance degradation while comparing SPark SQL with Hive. 
>> We have table of 260 columns. We have executed in hive and SPARK. In Hive, 
>> it is taking 66 sec for 1 gb of data whereas in Spark, it is taking 4 mins 
>> of time. 
>>> On 6/9/2016 3:19 PM, Gavin Yue wrote:
>>> Could you print out the sql execution plan? My guess is about broadcast 
>>> join. 
>>> 
>>> 
>>> 
>>> On Jun 9, 2016, at 07:14, Gourav Sengupta  wrote:
>>> 
>>>> Hi,
>>>>
>>>> Query1 is almost 25x faster in HIVE than in SPARK. What is happening here
>>>> and is there a way we can optimize the queries in SPARK without the
>>>> obvious hack in Query2.
>>>>
>>>>
>>>> ---
>>>> ENVIRONMENT:
>>>> ---
>>>>
>>>> > Table A 533 columns x 24 million rows and Table B has 2 columns x 3
>>>> > million rows. Both the files are single gzipped csv file.
>>>> > Both table A and B are external tables in AWS S3 and created in HIVE
>>>> > accessed through SPARK using HiveContext
>>>> > EMR 4.6, Spark 1.6.1 and Hive 1.0.0 (clusters started using
>>>> > allowMaximumResource allocation and node types are c3.4xlarge).
>>>>
>>>> --
>>>> QUERY1:
>>>> --
>>>> select A.PK, B.FK
>>>> from A
>>>> left outer join B on (A.PK = B.FK)
>>>> where B.FK is not null;
>>>>
>>>>
>>>>
>>>> This query takes 4 mins in HIVE and 1.1 hours in SPARK
>>>>
>>>>
>>>> --
>>>> QUERY 2:
>>>> --
>>>>
>>>> select A.PK, B.FK
>>>> from (select PK from A) A
>>>> left outer join B on (A.PK = B.FK)
>>>> where B.FK is not null;
>>>>
>>>> This query takes 4.5 mins in SPARK
>>>>
>>>>
>>>>
>>>> Regards,
>>>> Gourav Sengupta
> 
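
As a side note on the two queries quoted above: they are logically equivalent,
and because of the "where B.FK is not null" filter both also return the same
rows as a plain inner join. The sketch below checks this on toy data using
SQLite from the Python standard library. SQLite is only a stand-in here, the
sample rows and the extra columns payload and other are made up, and the
result says nothing about Spark or Hive performance. It only shows that the
difference between Query1 and Query2 is the explicit column selection, not the
result.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE A (PK INTEGER, payload TEXT);
    CREATE TABLE B (FK INTEGER, other TEXT);
    INSERT INTO A VALUES (1, 'x'), (2, 'y'), (3, 'z');
    INSERT INTO B VALUES (1, 'p'), (3, 'q'), (4, 'r');
""")

# Query1 from the thread (ORDER BY added to make the output deterministic).
query1 = """
    SELECT A.PK, B.FK FROM A
    LEFT OUTER JOIN B ON (A.PK = B.FK)
    WHERE B.FK IS NOT NULL ORDER BY A.PK
"""
# Query2 from the thread: same join, but A is reduced to the PK column first.
query2 = """
    SELECT A.PK, B.FK FROM (SELECT PK FROM A) A
    LEFT OUTER JOIN B ON (A.PK = B.FK)
    WHERE B.FK IS NOT NULL ORDER BY A.PK
"""
# Plain inner join for comparison.
inner = """
    SELECT A.PK, B.FK FROM A
    JOIN B ON (A.PK = B.FK) ORDER BY A.PK
"""

rows1 = conn.execute(query1).fetchall()
rows2 = conn.execute(query2).fetchall()
rows_inner = conn.execute(inner).fetchall()
print(rows1)
```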


Re: How to deal with tasks running too long?

2016-06-16 Thread Jeff Zhang
This may be due to data skew.
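
If skew is indeed the cause, one common mitigation is to "salt" the hot key so
that its records are spread over several smaller keys. The plain-Python sketch
below only illustrates the idea on made-up data; the function names are
hypothetical and no Spark API is used. In a keyed shuffle all records with the
same key land in one task, so the largest per-key group is a lower bound on
the input of the slowest task.

```python
from collections import Counter


def largest_group(keys):
    """Size of the biggest per-key group."""
    return max(Counter(keys).values())


def salt(keys, buckets):
    """Append a round-robin suffix so one hot key becomes several smaller keys."""
    return ["%s#%d" % (key, i % buckets) for i, key in enumerate(keys)]


# Made-up data: one hot key with 900 records and 100 keys with one record each.
keys = ["hot"] * 900 + ["k%d" % i for i in range(100)]
before = largest_group(keys)
after = largest_group(salt(keys, 8))
print(before, after)
```

With salted keys an aggregation needs a second step that combines the partial
results per original key.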

On Thu, Jun 16, 2016 at 12:45 PM, Utkarsh Sengar 
wrote:

> This SO question was asked about 1yr ago.
>
> http://stackoverflow.com/questions/31799755/how-to-deal-with-tasks-running-too-long-comparing-to-others-in-job-in-yarn-cli
>
> I answered this question with a suggestion to try speculation but it
> doesn't quite do what the OP expects. I have been running into this issue
> more these days. Out of 5000 tasks, 4950 completes in 5mins but the last 50
> never really completes, have tried waiting for 4hrs. This can be a memory
> issue or maybe the way spark's fine grained mode works with mesos, I am
> trying to enable jmxsink to get a heap dump.
>
> But in the mean time, is there a better fix for this? (in any version of
> spark, I am using 1.5.1 but can upgrade). It would be great if the last 50
> tasks in my example can be killed (timed out) and the stage completes
> successfully.
>
> --
> Thanks,
> -Utkarsh
>



-- 
Best Regards

Jeff Zhang


Re: Spark Memory Error - Not enough space to cache broadcast

2016-06-16 Thread Takeshi Yamamuro
Hi,

Have you checked the statistics of storage memory, or something?

// maropu

On Thu, Jun 16, 2016 at 1:37 PM, Cassa L  wrote:

> Hi,
> I did set --driver-memory 4G. I still run into this issue after 1 hour
> of data load.
>
> I also tried version 1.6 in a test environment. I hit this issue much faster
> there than in the 1.5.1 setup.
> LCassa
>
> On Tue, Jun 14, 2016 at 3:57 PM, Gaurav Bhatnagar 
> wrote:
>
>> Try setting the option --driver-memory 4G.
>>
>> On Tue, Jun 14, 2016 at 3:52 PM, Ben Slater 
>> wrote:
>>
>>> A high level shot in the dark but in our testing we found Spark 1.6 a
>>> lot more reliable in low memory situations (presumably due to
>>> https://issues.apache.org/jira/browse/SPARK-1). If it’s an option,
>>> probably worth a try.
>>>
>>> Cheers
>>> Ben
>>>
>>> On Wed, 15 Jun 2016 at 08:48 Cassa L  wrote:
>>>
>>>> Hi,
>>>> I would appreciate any clue on this. It has become a bottleneck for our
>>>> Spark job.
>>>>
>>>> On Mon, Jun 13, 2016 at 2:56 PM, Cassa L  wrote:

> Hi,
>
> I'm using Spark 1.5.1. I am reading data from Kafka into Spark and
> writing it into Cassandra after processing it. The Spark job starts fine
> and runs well for some time, until I start getting the errors below. Once
> these errors appear, the job starts to lag behind, and I see scheduling
> and processing delays in the streaming UI.
>
> Worker memory is 6GB and executor-memory is 5GB. I also tried to tweak the
> memoryFraction parameters. Nothing works.
>
>
> 16/06/13 21:26:02 INFO MemoryStore: ensureFreeSpace(4044) called with 
> curMem=565394, maxMem=2778495713
> 16/06/13 21:26:02 INFO MemoryStore: Block broadcast_69652_piece0 stored 
> as bytes in memory (estimated size 3.9 KB, free 2.6 GB)
> 16/06/13 21:26:02 INFO TorrentBroadcast: Reading broadcast variable 69652 
> took 2 ms
> 16/06/13 21:26:02 WARN MemoryStore: Failed to reserve initial memory 
> threshold of 1024.0 KB for computing block broadcast_69652 in memory.
> 16/06/13 21:26:02 WARN MemoryStore: Not enough space to cache 
> broadcast_69652 in memory! (computed 496.0 B so far)
> 16/06/13 21:26:02 INFO MemoryStore: Memory use = 556.1 KB (blocks) + 2.6 
> GB (scratch space shared across 0 tasks(s)) = 2.6 GB. Storage limit = 2.6 
> GB.
> 16/06/13 21:26:02 WARN MemoryStore: Persisting block broadcast_69652 to 
> disk instead.
> 16/06/13 21:26:02 INFO BlockManager: Found block rdd_100761_1 locally
> 16/06/13 21:26:02 INFO Executor: Finished task 0.0 in stage 71577.0 (TID 
> 452316). 2043 bytes result sent to driver
>
>
> Thanks,
>
> L
>
>
 --
>>> 
>>> Ben Slater
>>> Chief Product Officer
>>> Instaclustr: Cassandra + Spark - Managed | Consulting | Support
>>> +61 437 929 798
>>>
>>
>>
>


-- 
---
Takeshi Yamamuro
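
As a starting point for checking the storage memory numbers, the figures in the quoted executor log can be reproduced with a little arithmetic. The sketch below uses only the values printed in the log (curMem, maxMem and the 4044 byte block). The last step is an assumption: it supposes the legacy memory settings of Spark 1.5 at their defaults, spark.storage.memoryFraction=0.6 and spark.storage.safetyFraction=0.9. The original poster mentions tweaking memoryFraction, so the actual values on that cluster may differ.

```python
# Values copied from the quoted executor log.
cur_mem = 565394        # curMem, in bytes
max_mem = 2778495713    # maxMem, in bytes
new_block = 4044        # size passed to ensureFreeSpace, in bytes

KB = 1024
GB = 1024 ** 3

# Block memory after broadcast_69652_piece0 is stored, in KB
# (compare with "Memory use = ... (blocks)" in the log).
blocks_kb = round((cur_mem + new_block) / KB, 1)

# Storage limit in GB (compare with "Storage limit = ..." in the log).
limit_gb = round(max_mem / GB, 1)

# ASSUMPTION: default legacy fractions 0.6 and 0.9 (not confirmed by the thread).
# Under that assumption, maxMem is 54% of the JVM heap seen by the executor.
implied_heap_gb = round(max_mem / (0.6 * 0.9) / GB, 2)

print(blocks_kb, limit_gb, implied_heap_gb)
```

The block memory itself is tiny compared with the limit. What fills the limit in the log is the scratch space, which is reported as shared across 0 tasks, so that figure is probably the one worth tracking over the lifetime of the streaming job.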