Re: Parquet problems

2015-07-22 Thread Anders Arpteg
No, never really resolved the problem, except by increasing the PermGen
space, which only partially solved it. Still have to restart the job
multiple times to make the whole job complete (it stores intermediate
results).

The parquet data sources have about 70 columns, and yes Cheng, it works
fine when only loading a small sample of the data.

Thankful for any hints,
Anders

On Wed, Jul 22, 2015 at 5:29 PM Cheng Lian lian.cs@gmail.com wrote:

  How many columns are there in these Parquet files? Could you load a small
 portion of the original large dataset successfully?

 Cheng


 On 6/25/15 5:52 PM, Anders Arpteg wrote:

 Yes, both the driver and the executors. Works a little bit better with
 more space, but still a leak that will cause failure after a number of
 reads. There are about 700 different data sources that needs to be loaded,
 lots of data...

  tor 25 jun 2015 08:02 Sabarish Sasidharan 
 sabarish.sasidha...@manthan.comsabarish.sasidha...@manthan.com skrev:

 Did you try increasing the perm gen for the driver?

 Regards
 Sab

 On 24-Jun-2015 4:40 pm, Anders Arpteg arp...@spotify.com wrote:

 When reading large (and many) datasets with the Spark 1.4.0 DataFrames
 parquet reader (the org.apache.spark.sql.parquet format), the following
 exceptions are thrown:

  Exception in thread task-result-getter-0

 Exception: java.lang.OutOfMemoryError thrown from the
 UncaughtExceptionHandler in thread task-result-getter-0
 Exception in thread task-result-getter-3 java.lang.OutOfMemoryError:
 PermGen space
 Exception in thread task-result-getter-1 java.lang.OutOfMemoryError:
 PermGen space
 Exception in thread task-result-getter-2 java.lang.OutOfMemoryError:
 PermGen space


  and many more like these from different threads. I've tried increasing
 the PermGen space using the -XX:MaxPermSize VM setting, but even after
 tripling the space, the same errors occur. I've also tried storing
 intermediate results, and am able to get the full job completed by running
 it multiple times and restarting from the last successful intermediate result.
 There seems to be some memory leak in the Parquet format. Any hints on how
 to fix this problem?

  Thanks,
 Anders
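
For reference, a minimal illustration of how the -XX:MaxPermSize setting discussed above can be applied to both the driver and the executors, e.g. via spark-defaults.conf (the 512m value is only an example, not a recommendation from this thread):

    spark.driver.extraJavaOptions    -XX:MaxPermSize=512m
    spark.executor.extraJavaOptions  -XX:MaxPermSize=512m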




spark.deploy.spreadOut core allocation

2015-07-22 Thread Srikanth
Hello,

I've set spark.deploy.spreadOut=false in spark-env.sh.

 export SPARK_MASTER_OPTS="-Dspark.deploy.defaultCores=4 -Dspark.deploy.spreadOut=false"


There are 3 workers, each with 4 cores. Spark-shell was started with the number of
cores set to 6.
The Spark UI shows that one executor was used with 6 cores.

Is this a bug? This is with Spark 1.4.


Srikanth


Re: user threads in executors

2015-07-22 Thread Shushant Arora
Thanks !

I am using Spark Streaming 1.3, and if some post fails for any
reason, I will store the offset of that message in another Kafka topic. I
want to read these offsets in another Spark job and then fetch the original
Kafka topic's messages based on those offsets.
So is it possible in a Spark job to get Kafka messages for specific, arbitrary
offsets? Or is there any better alternative for handling failure of the post
request?

On Wed, Jul 22, 2015 at 11:31 AM, Tathagata Das t...@databricks.com wrote:

 Yes, you could unroll from the iterator in batches of 100-200 and then post
 them in multiple rounds.
 If you are using the Kafka receiver-based approach (not Direct), then the
 raw Kafka data is stored in the executor memory. If you are using Direct
 Kafka, then it is read from Kafka directly at the time of filtering.

 TD

 On Tue, Jul 21, 2015 at 9:34 PM, Shushant Arora shushantaror...@gmail.com
  wrote:

 I can post multiple items at a time.

 Data is read from Kafka and filtered, and after that it is posted. Does
 foreachPartition load the complete partition in memory, or does it use an
 iterator over batches under the hood? If the complete partition is not loaded,
 will using a custom size of 100-200 requests per batch for posting help,
 instead of posting the whole partition at once?

 On Wed, Jul 22, 2015 at 12:18 AM, Tathagata Das t...@databricks.com
 wrote:

 If you can post multiple items at a time, then use foreachPartition to
 post the whole partition in a single request.

 On Tue, Jul 21, 2015 at 9:35 AM, Richard Marscher 
 rmarsc...@localytics.com wrote:

 You can certainly create threads in a map transformation. We do this to
 do concurrent DB lookups during one stage for example. I would recommend,
 however, that you switch to mapPartitions from map as this allows you to
 create a fixed size thread pool to share across items on a partition as
 opposed to spawning a future per record in the RDD for example.

 On Tue, Jul 21, 2015 at 4:11 AM, Shushant Arora 
 shushantaror...@gmail.com wrote:

 Hi

 Can I create user threads in executors?
 I have a streaming app where, after processing, I have a requirement to
 push events to an external system. Each post request costs ~90-100 ms.

 To make the posts parallel, I cannot rely on the same thread because that is
 limited by the number of cores available in the system. Can I use user threads in a
 Spark app? I tried to create 2 threads in a map task and it worked.

 Is there any upper limit on the number of user threads in a Spark executor? Is
 it a good idea to create user threads in a Spark map task?

 Thanks




 --
 *Richard Marscher*
 Software Engineer
 Localytics
 Localytics.com http://localytics.com/ | Our Blog
 http://localytics.com/blog | Twitter http://twitter.com/localytics
  | Facebook http://facebook.com/localytics | LinkedIn
 http://www.linkedin.com/company/1148792?trk=tyah
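
A minimal sketch of the two suggestions above, combined: unroll the partition iterator in batches of ~200 and post the batches from a small fixed-size thread pool. Here rdd stands for the filtered RDD of records to post, postBatch is a hypothetical helper that issues one HTTP POST per batch, and the pool size, batch size and timeout are only illustrative:

    import java.util.concurrent.Executors
    import scala.concurrent.{Await, ExecutionContext, Future}
    import scala.concurrent.duration._

    def postBatch(batch: Seq[String]): Unit = {
      // hypothetical: issue a single POST containing the whole batch
    }

    rdd.foreachPartition { iter =>
      val pool = Executors.newFixedThreadPool(4)        // fixed pool shared across the partition
      implicit val ec = ExecutionContext.fromExecutorService(pool)
      try {
        val futures = iter.grouped(200).map { batch =>  // batches of ~200 records
          Future { postBatch(batch) }
        }.toList
        futures.foreach(f => Await.result(f, 60.seconds))
      } finally {
        pool.shutdown()
      }
    }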







problems running Spark on a firewalled remote YARN cluster via SOCKS proxy

2015-07-22 Thread rok
I am trying to run Spark applications with the driver running locally and
interacting with a firewalled remote cluster via a SOCKS proxy. 

I have to modify the hadoop configuration on the *local machine* to try to
make this work, adding 

<property>
   <name>hadoop.rpc.socket.factory.class.default</name>
   <value>org.apache.hadoop.net.SocksSocketFactory</value>
</property>
<property>
   <name>hadoop.socks.server</name>
   <value>localhost:9998</value>
</property>

and on the *remote cluster* side

<property>
   <name>hadoop.rpc.socket.factory.class.default</name>
   <value>org.apache.hadoop.net.StandardSocketFactory</value>
   <final>true</final>
</property>

With this setup, and running ssh -D 9998 gateway.host to start the proxy
connection, MapReduce jobs started on the local machine execute fine on the
remote cluster. However, trying to launch a Spark job fails with the nodes
of the cluster apparently unable to communicate with one another: 

java.io.IOException: Failed on local exception: java.net.SocketException:
Connection refused; Host Details : local host is: node3/10.211.55.103;
destination host is: node1:8030;

Looking at the packets being sent to node1 from node3, it's clear that no
requests are made on port 8030, hinting that the connection is somehow being
proxied. 

Is it possible that the Spark job is not honoring the socket.factory
settings on the *cluster* side for some reason? 

Note that  Spark JIRA 5004
https://issues.apache.org/jira/browse/SPARK-5004   addresses a similar
problem, though it looks like they are actually not the same (since in that
case it sounds like a standalone cluster is being used). 
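
One thing that may be worth checking (an assumption, not a confirmed fix for this case): Spark copies any configuration property prefixed with spark.hadoop. into the Hadoop Configuration it creates, so the socket factory settings could also be passed explicitly to the Spark application, for example:

    val conf = new org.apache.spark.SparkConf()
      .set("spark.hadoop.hadoop.rpc.socket.factory.class.default",
           "org.apache.hadoop.net.SocksSocketFactory")
      .set("spark.hadoop.hadoop.socks.server", "localhost:9998")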



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/problems-running-Spark-on-a-firewalled-remote-YARN-cluster-via-SOCKS-proxy-tp23955.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: user threads in executors

2015-07-22 Thread Cody Koeninger
Yes, look at KafkaUtils.createRDD
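
For example, something along these lines (a sketch against the Spark 1.3 Kafka direct API; broker, topic, partition and offsets are made up, and sc is the SparkContext):

    import kafka.serializer.StringDecoder
    import org.apache.spark.streaming.kafka.{KafkaUtils, OffsetRange}

    val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")
    val offsetRanges = Array(
      OffsetRange("original-topic", 0, 1000L, 1005L)  // topic, partition, fromOffset, untilOffset
    )
    val rdd = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](
      sc, kafkaParams, offsetRanges)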

On Wed, Jul 22, 2015 at 11:17 AM, Shushant Arora shushantaror...@gmail.com
wrote:

 Thanks !

 I am using Spark Streaming 1.3, and if some post fails for any
 reason, I will store the offset of that message in another Kafka topic. I
 want to read these offsets in another Spark job and then fetch the original
 Kafka topic's messages based on those offsets.
  So is it possible in a Spark job to get Kafka messages for specific, arbitrary
 offsets? Or is there any better alternative for handling failure of the post
 request?

 On Wed, Jul 22, 2015 at 11:31 AM, Tathagata Das t...@databricks.com
 wrote:

 Yes, you could unroll from the iterator in batch of 100-200 and then post
 them in multiple rounds.
 If you are using the Kafka receiver based approach (not Direct), then the
 raw Kafka data is stored in the executor memory. If you are using Direct
 Kafka, then it is read from Kafka directly at the time of filtering.

 TD

 On Tue, Jul 21, 2015 at 9:34 PM, Shushant Arora 
 shushantaror...@gmail.com wrote:

 I can post multiple items at a time.

 Data is read from Kafka and filtered, and after that it is posted. Does
 foreachPartition load the complete partition in memory, or does it use an
 iterator over batches under the hood? If the complete partition is not loaded,
 will using a custom size of 100-200 requests per batch for posting help,
 instead of posting the whole partition at once?

 On Wed, Jul 22, 2015 at 12:18 AM, Tathagata Das t...@databricks.com
 wrote:

 If you can post multiple items at a time, then use foreachPartition to
 post the whole partition in a single request.

 On Tue, Jul 21, 2015 at 9:35 AM, Richard Marscher 
 rmarsc...@localytics.com wrote:

 You can certainly create threads in a map transformation. We do this
 to do concurrent DB lookups during one stage for example. I would
 recommend, however, that you switch to mapPartitions from map as this
 allows you to create a fixed size thread pool to share across items on a
 partition as opposed to spawning a future per record in the RDD for 
 example.

 On Tue, Jul 21, 2015 at 4:11 AM, Shushant Arora 
 shushantaror...@gmail.com wrote:

 Hi

 Can I create user threads in executors?
 I have a streaming app where, after processing, I have a requirement to
 push events to an external system. Each post request costs ~90-100 ms.

 To make the posts parallel, I cannot rely on the same thread because that is
 limited by the number of cores available in the system. Can I use user threads in a
 Spark app? I tried to create 2 threads in a map task and it worked.

 Is there any upper limit on the number of user threads in a Spark executor? Is
 it a good idea to create user threads in a Spark map task?

 Thanks




 --
 *Richard Marscher*
 Software Engineer
 Localytics
 Localytics.com http://localytics.com/ | Our Blog
 http://localytics.com/blog | Twitter http://twitter.com/localytics
  | Facebook http://facebook.com/localytics | LinkedIn
 http://www.linkedin.com/company/1148792?trk=tyah








Re: Parquet problems

2015-07-22 Thread Cheng Lian
How many columns are there in these Parquet files? Could you load a 
small portion of the original large dataset successfully?


Cheng

On 6/25/15 5:52 PM, Anders Arpteg wrote:


Yes, both the driver and the executors. Works a little bit better with 
more space, but still a leak that will cause failure after a number of 
reads. There are about 700 different data sources that needs to be 
loaded, lots of data...



tor 25 jun 2015 08:02 Sabarish Sasidharan 
sabarish.sasidha...@manthan.com 
mailto:sabarish.sasidha...@manthan.com skrev:


Did you try increasing the perm gen for the driver?

Regards
Sab

On 24-Jun-2015 4:40 pm, Anders Arpteg arp...@spotify.com
mailto:arp...@spotify.com wrote:

When reading large (and many) datasets with the Spark 1.4.0
DataFrames parquet reader (the org.apache.spark.sql.parquet
format), the following exceptions are thrown:

Exception in thread sk-result-getter-0
Exception: java.lang.OutOfMemoryError thrown from the
UncaughtExceptionHandler in thread task-result-getter-0
Exception in thread task-result-getter-3
java.lang.OutOfMemoryError: PermGen space
Exception in thread task-result-getter-1
java.lang.OutOfMemoryError: PermGen space
Exception in thread task-result-getter-2
java.lang.OutOfMemoryError: PermGen space

and many more like these from different threads. I've tried
increasing the PermGen space using the -XX:MaxPermSize VM
setting, but even after tripling the space, the same errors
occur. I've also tried storing intermediate results, and am
able to get the full job completed by running it multiple
times and starting for the last successful intermediate
result. There seems to be some memory leak in the parquet
format. Any hints on how to fix this problem?

Thanks,
Anders





Re: Spark-hive parquet schema evolution

2015-07-22 Thread Cheng Lian
Yeah, the benefit of `saveAsTable` is that you don't need to deal with 
schema explicitly, while the benefit of ALTER TABLE is you still have a 
standard vanilla Hive table.


Cheng

On 7/22/15 11:00 PM, Dean Wampler wrote:
While it's not recommended to overwrite files Hive thinks it 
understands, you can add the column to Hive's metastore using an ALTER 
TABLE command using HiveQL in the Hive shell or using HiveContext.sql():


ALTER TABLE mytable ADD COLUMNS (col_name data_type)

See 
https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL#LanguageManualDDL-AlterTable/Partition/Column 
for full details.
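
For example, from Spark this could look like the following (the table name, column name and type here are only illustrative):

    hiveContext.sql("ALTER TABLE mytable ADD COLUMNS (new_col STRING)")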


dean

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition 
http://shop.oreilly.com/product/0636920033073.do (O'Reilly)

Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com

On Wed, Jul 22, 2015 at 4:36 AM, Cheng Lian lian.cs@gmail.com wrote:

Since Hive doesn't support schema evolution, you'll have to update
the schema stored in the metastore somehow. For example, you can
create a new external table with the merged schema. Say you have a
Hive table t1:

    CREATE TABLE t1 (c0 INT, c1 DOUBLE);

By default, this table is stored in the HDFS path
hdfs://some-host:9000/user/hive/warehouse/t1. Now you append
some Parquet data with an extra column c2 to the same directory:

    import org.apache.spark.sql.types._
    val path = "hdfs://some-host:9000/user/hive/warehouse/t1"
    val df1 = sqlContext.range(10).select('id as 'c0, 'id cast DoubleType as 'c1, 'id cast StringType as 'c2)
    df1.write.mode("append").parquet(path)

Now you can create a new external table t2 like this:

    val df2 = sqlContext.read.option("mergeSchema", "true").parquet(path)
    df2.write.path(path).saveAsTable("t2")

Since we specified a path above, the newly created t2 is an
external table pointing to the original HDFS location. But the
schema of t2 is the merged version.

The drawback of this approach is that t2 is actually a Spark
SQL specific data source table rather than a genuine Hive table.
This means it can be accessed by Spark SQL only. We're just using
the Hive metastore to help persist metadata of the data source
table. However, since you're asking how to access the new table
via the Spark SQL CLI, this should work for you. We are working on
making Parquet and ORC data source tables accessible via Hive in
Spark 1.5.0.

Cheng

On 7/22/15 10:32 AM, Jerrick Hoang wrote:


Hi Lian,

Sorry, I'm new to Spark so I did not express myself very clearly.
I'm concerned about the situation when, let's say, I have a Parquet
table with some partitions and I add a new column A to the Parquet schema
and write some data with the new schema to a new partition in the
table. If I'm not mistaken, if I do a
sqlContext.read.parquet(table_path).printSchema() it will print
the correct schema with the new column A. But if I do a 'describe
table' from the Spark SQL CLI I won't see the new column being added. I
understand that this is because Hive doesn't support schema
evolution. So what is the best way to support CLI queries in this
situation? Do I need to manually alter the table every time the
underlying schema changes?

Thanks

On Tue, Jul 21, 2015 at 4:37 PM, Cheng Lian
lian.cs@gmail.com mailto:lian.cs@gmail.com wrote:

Hey Jerrick,

What do you mean by schema evolution with Hive metastore
tables? Hive doesn't take schema evolution into account.
Could you please give a concrete use case? Are you trying to
write Parquet data with extra columns into an existing
metastore Parquet table?

Cheng


On 7/21/15 1:04 AM, Jerrick Hoang wrote:

I'm new to Spark, any ideas would be much appreciated! Thanks

On Sat, Jul 18, 2015 at 11:11 AM, Jerrick Hoang
jerrickho...@gmail.com mailto:jerrickho...@gmail.com wrote:

Hi all,

I'm aware of the support for schema evolution via
DataFrame API. Just wondering what would be the best way
to go about dealing with schema evolution with Hive
metastore tables. So, say I create a table via SparkSQL
CLI, how would I deal with Parquet schema evolution?

Thanks,
J












Performance issue with Spak's foreachpartition method

2015-07-22 Thread diplomatic Guru
Hello all,

We are having a major performance issue with the Spark, which is holding us
from going live.

We have a job that carries out computation on log files and write the
results into Oracle DB.

The reducer 'reduceByKey' has been set to a parallelism of 4 as we don't
want to establish too many DB connections.

We are then calling the foreachPartition on the RDD pairs that were reduced
by the key. Within this foreachPartition method we establish DB connection,
then iterate the results, prepare the Oracle statement for batch insertion
then we commit the batch and close the connection. All these are working
fine.

However, when we execute the job to process 12GB of data, it takes forever
to complete, especially at the foreachPartition stage.

We submitted the job with 6 executors, 2 cores, and 6GB memory of which 0.3
is assigned to spark.storage.memoryFraction.

The job is taking about 50 minutes to complete, which is not ideal. I'm not
sure how we could enhance the performance. I've provided the main body of
the codes, please take a look and advice:

From Driver:

reduceResultsRDD.foreachPartition(new DB.InsertFunction(
dbuser,dbpass,batchsize));


DB class:

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Iterator;

import org.apache.spark.api.java.function.VoidFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import scala.Tuple2;

public class DB {
    private static final Logger logger = LoggerFactory.getLogger(DB.class);

    public static class InsertFunction implements
            VoidFunction<Iterator<Tuple2<String, String>>> {

        private static final long serialVersionUID = 55766876878L;
        private String dbuser = "";
        private String dbpass = "";
        private int batchsize;

        public InsertFunction(String dbuser, String dbpass, int batchsize) {
            super();
            this.dbuser = dbuser;
            this.dbpass = dbpass;
            this.batchsize = batchsize;
        }

        @Override
        public void call(Iterator<Tuple2<String, String>> results) throws Exception {
            Connection connect = null;
            PreparedStatement pstmt = null;
            try {
                connect = getDBConnection(dbuser, dbpass);

                int count = 0;

                if (batchsize <= 0) {
                    batchsize = 1;
                }

                pstmt = connect.prepareStatement("MERGE INTO SOME TABLE IF RECORD FOUND, IF NOT INSERT");

                while (results.hasNext()) {

                    Tuple2<String, String> kv = results.next();
                    String[] data = kv._1.concat("," + kv._2).split(",");

                    pstmt.setString(1, data[0].toString());
                    pstmt.setString(2, data[1].toString());
                    // ...

                    pstmt.addBatch();

                    count++;

                    if (count == batchsize) {
                        logger.info("BulkCount : " + count);
                        pstmt.executeBatch();
                        connect.commit();
                        count = 0;
                    }

                    // Note: these two calls run inside the while loop, so the batch is
                    // flushed and committed once per record regardless of batchsize.
                    pstmt.executeBatch();
                    connect.commit();

                }

                pstmt.executeBatch();
                connect.commit();

            } catch (Exception e) {
                logger.error("InsertFunction error: " + e.getMessage());
            } finally {

                if (pstmt != null) {
                    pstmt.close();
                }

                try {
                    connect.close();
                } catch (SQLException e) {
                    logger.error("InsertFunction Connection Close error: "
                            + e.getMessage());
                }
            }
        }

    }
}


Help accessing protected S3

2015-07-22 Thread Greg Anderson
I have a protected s3 bucket that requires a certain IAM role to access.  When 
I start my cluster using the spark-ec2 script, everything works just fine until 
I try to read from that part of s3.  Here is the command I am using:

./spark-ec2 -k KEY -i KEY_FILE.pem --additional-security-group=IAM_ROLE 
--copy-aws-credentials --zone=us-east-1e -t m1.large --worker-instances=3 
--hadoop-major-version=2.7.1 --user-data=test.sh launch my-cluster

I have read through this article: 
http://apache-spark-user-list.1001560.n3.nabble.com/S3-Bucket-Access-td16303.html

The problem seems to be very similar, but I wasn't able to find a solution in 
it for me.  I'm not sure what else to provide here, just let me know what you 
need.  Thanks in advance!



Re: Parquet problems

2015-07-22 Thread Jerry Lam
Hi guys,

I noticed that too. Anders, can you confirm that it works on Spark 1.5
snapshot? This is what I tried at the end. It seems it is 1.4 issue.

Best Regards,

Jerry

On Wed, Jul 22, 2015 at 11:46 AM, Anders Arpteg arp...@spotify.com wrote:

 No, never really resolved the problem, except by increasing the permgem
 space which only partially solved it. Still have to restart the job
 multiple times so make the whole job complete (it stores intermediate
 results).

 The parquet data sources have about 70 columns, and yes Cheng, it works
 fine when only loading a small sample of the data.

 Thankful for any hints,
 Anders

 On Wed, Jul 22, 2015 at 5:29 PM Cheng Lian lian.cs@gmail.com wrote:

  How many columns are there in these Parquet files? Could you load a
 small portion of the original large dataset successfully?

 Cheng


 On 6/25/15 5:52 PM, Anders Arpteg wrote:

 Yes, both the driver and the executors. Works a little bit better with
 more space, but still a leak that will cause failure after a number of
 reads. There are about 700 different data sources that needs to be loaded,
 lots of data...

  tor 25 jun 2015 08:02 Sabarish Sasidharan 
 sabarish.sasidha...@manthan.comsabarish.sasidha...@manthan.com skrev:

 Did you try increasing the perm gen for the driver?

 Regards
 Sab

 On 24-Jun-2015 4:40 pm, Anders Arpteg arp...@spotify.com wrote:

 When reading large (and many) datasets with the Spark 1.4.0 DataFrames
 parquet reader (the org.apache.spark.sql.parquet format), the following
 exceptions are thrown:

  Exception in thread sk-result-getter-0

 Exception: java.lang.OutOfMemoryError thrown from the
 UncaughtExceptionHandler in thread task-result-getter-0
 Exception in thread task-result-getter-3 java.lang.OutOfMemoryError:
 PermGen space
 Exception in thread task-result-getter-1 java.lang.OutOfMemoryError:
 PermGen space
 Exception in thread task-result-getter-2 java.lang.OutOfMemoryError:
 PermGen space


  and many more like these from different threads. I've tried
 increasing the PermGen space using the -XX:MaxPermSize VM setting, but even
 after tripling the space, the same errors occur. I've also tried storing
 intermediate results, and am able to get the full job completed by running
 it multiple times and starting for the last successful intermediate result.
 There seems to be some memory leak in the parquet format. Any hints on how
 to fix this problem?

  Thanks,
 Anders




Re: How to unpersist RDDs generated by ALS/MatrixFactorizationModel

2015-07-22 Thread Stahlman, Jonathan
Hi Burak,

Looking at the source code, the intermediate RDDs used in ALS.train() are
persisted during the computation using intermediateRDDStorageLevel (default
value is StorageLevel.MEMORY_AND_DISK) - see
https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala#L546,
https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala#L548,
and
https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala#L556.
At the end of the ALS calculation, these RDDs are no longer needed nor
returned, so I would assume the logical choice would be to unpersist() these
RDDs. The strategy in the code seems to be set by finalRDDStorageLevel, which
for some reason only calls unpersist() on the intermediate RDDs if
finalRDDStorageLevel != StorageLevel.NONE, which seems counter-intuitive to me.

Jonathan

From: Burak Yavuz brk...@gmail.commailto:brk...@gmail.com
Date: Wednesday, July 22, 2015 at 10:47 AM
To: Stahlman Jonathan 
jonathan.stahl...@capitalone.commailto:jonathan.stahl...@capitalone.com
Cc: user@spark.apache.orgmailto:user@spark.apache.org 
user@spark.apache.orgmailto:user@spark.apache.org
Subject: Re: How to unpersist RDDs generated by ALS/MatrixFactorizationModel

Hi Jonathan,

I believe calling persist with StorageLevel.NONE doesn't do anything. That's 
why the unpersist has an if statement before it.
Could you give more information about your setup please? Number of cores, 
memory, number of partitions of ratings_train?

Thanks,
Burak

On Wed, Jul 22, 2015 at 10:38 AM, Stahlman, Jonathan 
jonathan.stahl...@capitalone.commailto:jonathan.stahl...@capitalone.com 
wrote:
Hello again,

In trying to understand the caching of intermediate RDDs by ALS, I looked into 
the source code and found what may be a bug.  Looking here:

https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala#L230

you see that ALS.train() is being called with finalRDDStorageLevel = 
StorageLevel.NONE, which I would understand to mean that the intermediate RDDs 
will not be persisted.  Looking here:

https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala#L631

unpersist() is only being called on the intermediate RDDs (all the *Blocks RDDs 
listed in my first post) if finalRDDStorageLevel != StorageLevel.NONE.

This doesn’t make sense to me – I would expect the RDDs to be removed from the 
cache if finalRDDStorageLevel == StorageLevel.NONE, not the other way around.

Jonathan


From: Stahlman, Stahlman Jonathan 
jonathan.stahl...@capitalone.commailto:jonathan.stahl...@capitalone.com
Date: Thursday, July 16, 2015 at 2:18 PM
To: user@spark.apache.orgmailto:user@spark.apache.org 
user@spark.apache.orgmailto:user@spark.apache.org
Subject: How to unpersist RDDs generated by ALS/MatrixFactorizationModel

Hello all,

I am running the Spark recommendation algorithm in MLlib and I have been 
studying its output with various model configurations.  Ideally I would like to 
be able to run one job that trains the recommendation model with many different 
configurations to try to optimize for performance.  A sample code in python is 
copied below.

The issue I have is that each new model which is trained caches a set of RDDs 
and eventually the executors run out of memory.  Is there any way in Pyspark to 
unpersist() these RDDs after each iteration?  The names of the RDDs which I 
gather from the UI is:

itemInBlocks
itemOutBlocks
Products
ratingBlocks
userInBlocks
userOutBlocks
users

I am using Spark 1.3.  Thank you for any help!

Regards,
Jonathan




  data_train, data_cv, data_test = data.randomSplit([99,1,1], 2)
  functions = [rating] #defined elsewhere
  ranks = [10,20]
  iterations = [10,20]
  lambdas = [0.01,0.1]
  alphas  = [1.0,50.0]

  results = []
  for ratingFunction, rank, numIterations, m_lambda, m_alpha in itertools.product( functions, ranks, iterations, lambdas, alphas ):
    #train model
    ratings_train = data_train.map(lambda l: Rating( l.user, l.product, ratingFunction(l) ) )
    model = ALS.trainImplicit( ratings_train, rank, numIterations, lambda_=float(m_lambda), alpha=float(m_alpha) )

    #test performance on CV data
    ratings_cv = data_cv.map(lambda l: Rating( l.user, l.product, ratingFunction(l) ) )
    auc = areaUnderCurve( ratings_cv, model.predictAll )

    #save results
    result = ",".join(str(l) for l in [ratingFunction.__name__,rank,numIterations,m_lambda,m_alpha,auc])
    results.append(result)
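
A possible workaround, sketched with the Scala API (sc is the SparkContext; this assumes the PySpark 1.3 API does not expose the persistent-RDD map directly): between trainings, unpersist everything that is still cached, including the ALS block RDDs listed above.

    sc.getPersistentRDDs.values.foreach(_.unpersist(blocking = false))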




Re: How to unpersist RDDs generated by ALS/MatrixFactorizationModel

2015-07-22 Thread Burak Yavuz
Hi Jonathan,

I believe calling persist with StorageLevel.NONE doesn't do anything.
That's why the unpersist has an if statement before it.
Could you give more information about your setup please? Number of cores,
memory, number of partitions of ratings_train?

Thanks,
Burak

On Wed, Jul 22, 2015 at 10:38 AM, Stahlman, Jonathan 
jonathan.stahl...@capitalone.com wrote:

 Hello again,

 In trying to understand the caching of intermediate RDDs by ALS, I looked
 into the source code and found what may be a bug.  Looking here:


 https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala#L230

 you see that ALS.train() is being called with finalRDDStorageLevel =
 StorageLevel.NONE, which I would understand to mean that the intermediate
 RDDs will not be persisted.  Looking here:


 https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala#L631

 unpersist() is only being called on the intermediate RDDs (all the *Blocks
 RDDs listed in my first post) if finalRDDStorageLevel != StorageLevel.NONE.


 This doesn’t make sense to me – I would expect the RDDs to be removed from
 the cache if finalRDDStorageLevel == StorageLevel.NONE, not the other way
 around.

 Jonathan


 From: Stahlman, Stahlman Jonathan jonathan.stahl...@capitalone.com
 Date: Thursday, July 16, 2015 at 2:18 PM
 To: user@spark.apache.org user@spark.apache.org
 Subject: How to unpersist RDDs generated by ALS/MatrixFactorizationModel

 Hello all,

 I am running the Spark recommendation algorithm in MLlib and I have been
 studying its output with various model configurations.  Ideally I would
 like to be able to run one job that trains the recommendation model with
 many different configurations to try to optimize for performance.  A sample
 code in python is copied below.

 The issue I have is that each new model which is trained caches a set of
 RDDs and eventually the executors run out of memory.  Is there any way in
 Pyspark to unpersist() these RDDs after each iteration?  The names of the
 RDDs which I gather from the UI is:

 itemInBlocks
 itemOutBlocks
 Products
 ratingBlocks
 userInBlocks
 userOutBlocks
 users

 I am using Spark 1.3.  Thank you for any help!

 Regards,
 Jonathan




   data_train, data_cv, data_test = data.randomSplit([99,1,1], 2)
   functions = [rating] #defined elsewhere
   ranks = [10,20]
   iterations = [10,20]
   lambdas = [0.01,0.1]
   alphas  = [1.0,50.0]

   results = []
   for ratingFunction, rank, numIterations, m_lambda, m_alpha in
 itertools.product( functions, ranks, iterations, lambdas, alphas ):
 #train model
 ratings_train = data_train.map(lambda l: Rating( l.user, l.product,
 ratingFunction(l) ) )
 model   = ALS.trainImplicit( ratings_train, rank, numIterations,
 lambda_=float(m_lambda), alpha=float(m_alpha) )

 #test performance on CV data
 ratings_cv = data_cv.map(lambda l: Rating( l.uesr, l.product,
 ratingFunction(l) ) )
 auc = areaUnderCurve( ratings_cv, model.predictAll )

 #save results
 result = ,.join(str(l) for l in
 [ratingFunction.__name__,rank,numIterations,m_lambda,m_alpha,auc])
 results.append(result)

 --




How to keep RDDs in memory between two different batch jobs?

2015-07-22 Thread swetha
Hi,

We have a requirement wherein we need to keep RDDs in memory between Spark
batch processing that happens every one hour. The idea here is to have RDDs
that have active user sessions in memory between two jobs so that once a job
processing is  done and another job is run after an hour the RDDs with
active sessions are still available for joining with those in the current
job. So, what do we need to keep the data in memory in between two batch
jobs? Can we use Tachyon?

Thanks,
Swetha



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-keep-RDDs-in-memory-between-two-different-batch-jobs-tp23957.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




RE: How to unpersist RDDs generated by ALS/MatrixFactorizationModel

2015-07-22 Thread Ganelin, Ilya
To be unpersisted, the RDD must be persisted first. If it's set to None, then
it's not persisted, and as such does not need to be freed. Does that make
sense?



Thank you,
Ilya Ganelin



-Original Message-
From: Stahlman, Jonathan 
[jonathan.stahl...@capitalone.commailto:jonathan.stahl...@capitalone.com]
Sent: Wednesday, July 22, 2015 01:42 PM Eastern Standard Time
To: user@spark.apache.org
Subject: Re: How to unpersist RDDs generated by ALS/MatrixFactorizationModel

Hello again,

In trying to understand the caching of intermediate RDDs by ALS, I looked into 
the source code and found what may be a bug.  Looking here:

https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala#L230

you see that ALS.train() is being called with finalRDDStorageLevel = 
StorageLevel.NONE, which I would understand to mean that the intermediate RDDs 
will not be persisted.  Looking here:

https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala#L631

unpersist() is only being called on the intermediate RDDs (all the *Blocks RDDs 
listed in my first post) if finalRDDStorageLevel != StorageLevel.NONE.

This doesn’t make sense to me – I would expect the RDDs to be removed from the 
cache if finalRDDStorageLevel == StorageLevel.NONE, not the other way around.

Jonathan


From: Stahlman, Stahlman Jonathan 
jonathan.stahl...@capitalone.commailto:jonathan.stahl...@capitalone.com
Date: Thursday, July 16, 2015 at 2:18 PM
To: user@spark.apache.orgmailto:user@spark.apache.org 
user@spark.apache.orgmailto:user@spark.apache.org
Subject: How to unpersist RDDs generated by ALS/MatrixFactorizationModel

Hello all,

I am running the Spark recommendation algorithm in MLlib and I have been 
studying its output with various model configurations.  Ideally I would like to 
be able to run one job that trains the recommendation model with many different 
configurations to try to optimize for performance.  A sample code in python is 
copied below.

The issue I have is that each new model which is trained caches a set of RDDs 
and eventually the executors run out of memory.  Is there any way in Pyspark to 
unpersist() these RDDs after each iteration?  The names of the RDDs which I 
gather from the UI is:

itemInBlocks
itemOutBlocks
Products
ratingBlocks
userInBlocks
userOutBlocks
users

I am using Spark 1.3.  Thank you for any help!

Regards,
Jonathan




  data_train, data_cv, data_test = data.randomSplit([99,1,1], 2)
  functions = [rating] #defined elsewhere
  ranks = [10,20]
  iterations = [10,20]
  lambdas = [0.01,0.1]
  alphas  = [1.0,50.0]

  results = []
  for ratingFunction, rank, numIterations, m_lambda, m_alpha in 
itertools.product( functions, ranks, iterations, lambdas, alphas ):
#train model
ratings_train = data_train.map(lambda l: Rating( l.user, l.product, 
ratingFunction(l) ) )
model   = ALS.trainImplicit( ratings_train, rank, numIterations, 
lambda_=float(m_lambda), alpha=float(m_alpha) )

#test performance on CV data
ratings_cv = data_cv.map(lambda l: Rating( l.uesr, l.product, 
ratingFunction(l) ) )
auc = areaUnderCurve( ratings_cv, model.predictAll )

#save results
result = ,.join(str(l) for l in 
[ratingFunction.__name__,rank,numIterations,m_lambda,m_alpha,auc])
results.append(result)





Re: Spark spark.shuffle.memoryFraction has no affect

2015-07-22 Thread Walid Baroni
Hi Andrew

I tried many different combinations, but still no change in the amount of 
shuffle bytes spilled to disk by checking the UI. I made sure the configuration 
have been applied by checking Spark UI/Environment. I only see changes in 
shuffle bytes spilled if I disable spark.shuffle.spill


 On Jul 22, 2015, at 3:15 AM, Andrew Or and...@databricks.com wrote:
 
 Hi,
 
 The setting of 0.2 / 0.6 looks reasonable to me. Since you are not using 
 caching at all, have you tried trying something more extreme, like 0.1 / 0.9? 
 Since disabling spark.shuffle.spill didn't cause an OOM this setting should 
 be fine. Also, one thing you could do is to verify the shuffle bytes spilled 
 on the UI before and after the change.
 
 Let me know if that helped.
 -Andrew
 
 2015-07-21 13:50 GMT-07:00 wdbaruni wdbar...@gmail.com 
 mailto:wdbar...@gmail.com:
 Hi
 I am testing Spark on Amazon EMR using Python and the basic wordcount
 example shipped with Spark.
 
 After running the application, I realized that in Stage 0 reduceByKey(add),
 around 2.5GB shuffle is spilled to memory and 4GB shuffle is spilled to
 disk. Since in the wordcount example I am not caching or persisting any
 data, so I thought I can increase the performance of this application by
 giving more shuffle memoryFraction. So, in spark-defaults.conf, I added the
 following:
 
 spark.storage.memoryFraction0.2
 spark.shuffle.memoryFraction0.6
 
 However, I am still getting the same performance and the same amount of
 shuffle data is being spilled to disk and memory. I validated that Spark is
 reading these configurations using Spark UI/Environment and I can see my
 changes. Moreover, I tried setting spark.shuffle.spill to false and I got
 the performance I am looking for and all shuffle data was spilled to memory
 only.
 
 So, what am I getting wrong here and why not the extra shuffle memory
 fraction is not utilized?
 
 *My environment:*
 Amazon EMR with Spark 1.3.1 running using -x argument
 1 Master node: m3.xlarge
 3 Core nodes: m3.xlarge
 Application: wordcount.py
 Input: 10 .gz files 90MB each (~350MB unarchived) stored in S3
 
 *Submit command:*
 /home/hadoop/spark/bin/spark-submit --deploy-mode client /mnt/wordcount.py
 s3n://input location
 
 *spark-defaults.conf:*
 spark.eventLog.enabled  false
 spark.executor.extraJavaOptions -verbose:gc -XX:+PrintGCDetails
 -XX:+PrintGCDateStamps -XX:+UseConcMarkSweepGC
 -XX:CMSInitiatingOccupancyFraction=70 -XX:MaxHeapFreeRatio=70
 spark.driver.extraJavaOptions   -Dspark.driver.log.level=INFO
 spark.masteryarn
 spark.executor.instances3
 spark.executor.cores4
 spark.executor.memory   9404M
 spark.default.parallelism   12
 spark.eventLog.enabled  true
 spark.eventLog.dir  hdfs:///spark-logs/
 spark.storage.memoryFraction0.2
 spark.shuffle.memoryFraction0.6
 
 
 
 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-spark-shuffle-memoryFraction-has-no-affect-tp23944.html
  
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-spark-shuffle-memoryFraction-has-no-affect-tp23944.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.
 
 
 



Re: Parquet problems

2015-07-22 Thread Michael Misiewicz
For what it's worth, my data set has around 85 columns in Parquet format as
well. I have tried bumping the permgen up to 512m but I'm still getting
errors in the driver thread.

On Wed, Jul 22, 2015 at 1:20 PM, Jerry Lam chiling...@gmail.com wrote:

 Hi guys,

 I noticed that too. Anders, can you confirm that it works on Spark 1.5
 snapshot? This is what I tried at the end. It seems it is 1.4 issue.

 Best Regards,

 Jerry

 On Wed, Jul 22, 2015 at 11:46 AM, Anders Arpteg arp...@spotify.com
 wrote:

 No, never really resolved the problem, except by increasing the permgem
 space which only partially solved it. Still have to restart the job
 multiple times so make the whole job complete (it stores intermediate
 results).

 The parquet data sources have about 70 columns, and yes Cheng, it works
 fine when only loading a small sample of the data.

 Thankful for any hints,
 Anders

 On Wed, Jul 22, 2015 at 5:29 PM Cheng Lian lian.cs@gmail.com wrote:

  How many columns are there in these Parquet files? Could you load a
 small portion of the original large dataset successfully?

 Cheng


 On 6/25/15 5:52 PM, Anders Arpteg wrote:

 Yes, both the driver and the executors. Works a little bit better with
 more space, but still a leak that will cause failure after a number of
 reads. There are about 700 different data sources that needs to be loaded,
 lots of data...

  tor 25 jun 2015 08:02 Sabarish Sasidharan 
 sabarish.sasidha...@manthan.comsabarish.sasidha...@manthan.com skrev:

 Did you try increasing the perm gen for the driver?

 Regards
 Sab

 On 24-Jun-2015 4:40 pm, Anders Arpteg arp...@spotify.com wrote:

 When reading large (and many) datasets with the Spark 1.4.0 DataFrames
 parquet reader (the org.apache.spark.sql.parquet format), the following
 exceptions are thrown:

  Exception in thread sk-result-getter-0

 Exception: java.lang.OutOfMemoryError thrown from the
 UncaughtExceptionHandler in thread task-result-getter-0
 Exception in thread task-result-getter-3 java.lang.OutOfMemoryError:
 PermGen space
 Exception in thread task-result-getter-1 java.lang.OutOfMemoryError:
 PermGen space
 Exception in thread task-result-getter-2 java.lang.OutOfMemoryError:
 PermGen space


  and many more like these from different threads. I've tried
 increasing the PermGen space using the -XX:MaxPermSize VM setting, but 
 even
 after tripling the space, the same errors occur. I've also tried storing
 intermediate results, and am able to get the full job completed by running
 it multiple times and starting for the last successful intermediate 
 result.
 There seems to be some memory leak in the parquet format. Any hints on how
 to fix this problem?

  Thanks,
 Anders





Re: How to share a Map among RDDS?

2015-07-22 Thread Dan Dong
Hi, Andrew,
  If I broadcast the Map:
val map2 = sc.broadcast(map1)

I will get a compilation error:
org.apache.spark.broadcast.Broadcast[scala.collection.immutable.Map[Int,String]]
does not take parameters
[error]  val matchs = Vecs.map(term => term.map{ case (a,b) => (map2(a),b) })

Seems it's still an RDD, so how do I access it by value = map2(key)? Thanks!

Cheers,
Dan



2015-07-22 2:20 GMT-05:00 Andrew Or and...@databricks.com:

 Hi Dan,

 If the map is small enough, you can just broadcast it, can't you? It
 doesn't have to be an RDD. Here's an example of broadcasting an array and
 using it on the executors:
 https://github.com/apache/spark/blob/c03299a18b4e076cabb4b7833a1e7632c5c0dabe/examples/src/main/scala/org/apache/spark/examples/BroadcastTest.scala
 .

 -Andrew

 2015-07-21 19:56 GMT-07:00 ayan guha guha.a...@gmail.com:

 Either you have to do rdd.collect and then broadcast or you can do a join
 On 22 Jul 2015 07:54, Dan Dong dongda...@gmail.com wrote:

 Hi, All,


 I am trying to access a Map from RDDs that are on different compute
 nodes, but without success. The Map is like:

 val map1 = Map("aa" -> 1, "bb" -> 2, "cc" -> 3, ...)

 All RDDs will have to check against it to see whether the key is in the Map
 or not, so it seems I have to make the Map itself global. The problem is that
 if the Map is stored as RDDs and spread across the different nodes, each
 node will only see a piece of the Map and the info will not be complete to
 check against the Map (and then replace the key with the corresponding
 value). E.g.:

 val matchs = Vecs.map(term => term.map{ case (a,b) => (map1(a),b) })

 But if the Map is not an RDD, how to share it like sc.broadcast(map1)

 Any idea about this? Thanks!


 Cheers,
 Dan





Re: How to share a Map among RDDS?

2015-07-22 Thread Dan Dong
Thanks Andrew, exactly.

2015-07-22 14:26 GMT-05:00 Andrew Or and...@databricks.com:

 Hi Dan,

 `map2` is a broadcast variable, not your map. To access the map on the
 executors you need to do `map2.value(a)`.

 -Andrew

 2015-07-22 12:20 GMT-07:00 Dan Dong dongda...@gmail.com:

 Hi, Andrew,
   If I broadcast the Map:
 val map2=sc.broadcast(map1)

 I will get compilation error:
 org.apache.spark.broadcast.Broadcast[scala.collection.immutable.Map[Int,String]]
 does not take parameters
 [error]  val matchs= Vecs.map(term=term.map{case (a,b)=(map2(a),b)})

 Seems it's still an RDD, so how to access it by value=map2(key) ? Thanks!

 Cheers,
 Dan



 2015-07-22 2:20 GMT-05:00 Andrew Or and...@databricks.com:

 Hi Dan,

 If the map is small enough, you can just broadcast it, can't you? It
 doesn't have to be an RDD. Here's an example of broadcasting an array and
 using it on the executors:
 https://github.com/apache/spark/blob/c03299a18b4e076cabb4b7833a1e7632c5c0dabe/examples/src/main/scala/org/apache/spark/examples/BroadcastTest.scala
 .

 -Andrew

 2015-07-21 19:56 GMT-07:00 ayan guha guha.a...@gmail.com:

 Either you have to do rdd.collect and then broadcast or you can do a
 join
 On 22 Jul 2015 07:54, Dan Dong dongda...@gmail.com wrote:

 Hi, All,


 I am trying to access a Map from RDDs that are on different compute
 nodes, but without success. The Map is like:

 val map1 = Map(aa-1,bb-2,cc-3,...)

 All RDDs will have to check against it to see if the key is in the Map
 or not, so seems I have to make the Map itself global, the problem is that
 if the Map is stored as RDDs and spread across the different nodes, each
 node will only see a piece of the Map and the info will not be complete to
 check against the Map( an then replace the key with the corresponding
 value) E,g:

 val matchs= Vecs.map(term=term.map{case (a,b)=(map1(a),b)})

 But if the Map is not an RDD, how to share it like sc.broadcast(map1)

 Any idea about this? Thanks!


 Cheers,
 Dan







Re: How to share a Map among RDDS?

2015-07-22 Thread Andrew Or
Hi Dan,

`map2` is a broadcast variable, not your map. To access the map on the
executors you need to do `map2.value(a)`.

-Andrew
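
For example, a minimal sketch of the pattern (map contents and variable names follow the snippet quoted below):

    val map1 = Map("aa" -> 1, "bb" -> 2, "cc" -> 3)
    val map2 = sc.broadcast(map1)
    val matchs = Vecs.map(term => term.map { case (a, b) => (map2.value(a), b) })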

2015-07-22 12:20 GMT-07:00 Dan Dong dongda...@gmail.com:

 Hi, Andrew,
   If I broadcast the Map:
 val map2=sc.broadcast(map1)

 I will get compilation error:
 org.apache.spark.broadcast.Broadcast[scala.collection.immutable.Map[Int,String]]
 does not take parameters
 [error]  val matchs= Vecs.map(term=term.map{case (a,b)=(map2(a),b)})

 Seems it's still an RDD, so how to access it by value=map2(key) ? Thanks!

 Cheers,
 Dan



 2015-07-22 2:20 GMT-05:00 Andrew Or and...@databricks.com:

 Hi Dan,

 If the map is small enough, you can just broadcast it, can't you? It
 doesn't have to be an RDD. Here's an example of broadcasting an array and
 using it on the executors:
 https://github.com/apache/spark/blob/c03299a18b4e076cabb4b7833a1e7632c5c0dabe/examples/src/main/scala/org/apache/spark/examples/BroadcastTest.scala
 .

 -Andrew

 2015-07-21 19:56 GMT-07:00 ayan guha guha.a...@gmail.com:

 Either you have to do rdd.collect and then broadcast or you can do a join
 On 22 Jul 2015 07:54, Dan Dong dongda...@gmail.com wrote:

 Hi, All,


 I am trying to access a Map from RDDs that are on different compute
 nodes, but without success. The Map is like:

 val map1 = Map(aa-1,bb-2,cc-3,...)

 All RDDs will have to check against it to see if the key is in the Map
 or not, so seems I have to make the Map itself global, the problem is that
 if the Map is stored as RDDs and spread across the different nodes, each
 node will only see a piece of the Map and the info will not be complete to
 check against the Map( an then replace the key with the corresponding
 value) E,g:

 val matchs= Vecs.map(term=term.map{case (a,b)=(map1(a),b)})

 But if the Map is not an RDD, how to share it like sc.broadcast(map1)

 Any idea about this? Thanks!


 Cheers,
 Dan






Re: spark.executor.memory and spark.driver.memory have no effect in yarn-cluster mode (1.4.x)?

2015-07-22 Thread Michael Misiewicz
That makes a lot of sense, thanks for the concise answer!

On Wed, Jul 22, 2015 at 4:10 PM, Andrew Or and...@databricks.com wrote:

 Hi Michael,

 In general, driver related properties should not be set through the
 SparkConf. This is because by the time the SparkConf is created, we have
 already started the driver JVM, so it's too late to change the memory,
 class paths and other properties.

 In cluster mode, executor related properties should also not be set
 through the SparkConf. This is because the driver is run on the cluster
 just like the executors, and the executors are launched independently by
 whatever the cluster manager (e.g. YARN) is configured to do.

 The recommended way of setting these properties is either through the
 conf/spark-defaults.conf properties file, or through the spark-submit
 command line, e.g.:

 bin/spark-shell --master yarn --executor-memory 2g --driver-memory 5g

 Let me know if that answers your question,
 -Andrew
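
 For example, the equivalent spark-defaults.conf entries (the values here simply mirror the command line above):

     spark.driver.memory    5g
     spark.executor.memory  2g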


 2015-07-22 12:38 GMT-07:00 Michael Misiewicz mmisiew...@gmail.com:

 Hi group,

 I seem to have encountered a weird problem with 'spark-submit' and
 manually setting sparkconf values in my applications.

 It seems like setting the configuration values spark.executor.memory
 and spark.driver.memory don't have any effect, when they are set from
 within my application (i.e. prior to creating a SparkContext).

 In yarn-cluster mode, only the values specified on the command line via
 spark-submit for driver and executor memory are respected, and if not, it
 appears spark falls back to defaults. For example,

 Correct behavior noted in Driver's logs on YARN when --executor-memory is
 specified:

 15/07/22 19:25:59 INFO yarn.YarnAllocator: Will request 200 executor 
 containers, each with 1 cores and 13824 MB memory including 1536 MB overhead
 15/07/22 19:25:59 INFO yarn.YarnAllocator: Container request (host: Any, 
 capability: memory:13824, vCores:1)


 But not when spark.executor.memory is specified prior to spark context 
 initialization:

 15/07/22 19:22:22 INFO yarn.YarnAllocator: Will request 200 executor 
 containers, each with 1 cores and 2560 MB memory including 1536 MB overhead
 15/07/22 19:22:22 INFO yarn.YarnAllocator: Container request (host: Any, 
 capability: memory:2560, vCores:1)


 In both cases, executor mem should be 10g. Interestingly, I set a parameter 
 spark.yarn.executor.memoryOverhead which appears to be respected whether or 
 not I'm in yarn-cluster or yarn-client mode.


 Has anyone seen this before? Any idea what might be causing this behavior?





Re: spark.deploy.spreadOut core allocation

2015-07-22 Thread Andrew Or
Hi Srikanth,

It does look like a bug. Did you set `spark.executor.cores` in your
application by any chance?

-Andrew

2015-07-22 8:05 GMT-07:00 Srikanth srikanth...@gmail.com:

 Hello,

 I've set spark.deploy.spreadOut=false in spark-env.sh.

 export SPARK_MASTER_OPTS="-Dspark.deploy.defaultCores=4 -Dspark.deploy.spreadOut=false"


 There are 3 workers, each with 4 cores. Spark-shell was started with the number of
 cores set to 6.
 The Spark UI shows that one executor was used with 6 cores.

 Is this a bug? This is with Spark 1.4.


 Srikanth



Re: How to keep RDDs in memory between two different batch jobs?

2015-07-22 Thread ericacm
Tachyon is one way.  Also check out the  Spark Job Server
https://github.com/spark-jobserver/spark-jobserver  .



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-keep-RDDs-in-memory-between-two-different-batch-jobs-tp23957p23958.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: How to keep RDDs in memory between two different batch jobs?

2015-07-22 Thread harirajaram
I was about say whatever the previous post said,so +1 to the previous
post,from my understanding (gut feeling) of your requirement it very easy to
do this with spark-job-server.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-keep-RDDs-in-memory-between-two-different-batch-jobs-tp23957p23960.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Spark DataFrame created from JavaRDD<Row> copies all columns' data into first column

2015-07-22 Thread unk1102
Hi, I have a DataFrame which I need to convert into a JavaRDD<Row> and back to a
DataFrame. I have the following code:

DataFrame sourceFrame =
    hiveContext.read().format("orc").load("/path/to/orc/file");
// I do an order by on the sourceFrame above and then I convert it into a JavaRDD
JavaRDD<Row> modifiedRDD = sourceFrame.toJavaRDD().map(new Function<Row, Row>() {
    public Row call(Row row) throws Exception {
        if (row != null) {
            // updated row by creating a new Row
            return RowFactory.create(updateRow);
        }
        return null;
    }
});
// now I convert the above JavaRDD<Row> back into a DataFrame using the following
DataFrame modifiedFrame = sqlContext.createDataFrame(modifiedRDD, schema);

The sourceFrame and modifiedFrame schemas are the same. When I call sourceFrame.show()
the output is as expected: I see every column has its corresponding values and no column
is empty. But when I call modifiedFrame.show() I see all the column values
merged into the first column. For example, assume the source DataFrame has 3
columns as shown below:

_col1  _col2  _col3
ABC    10     DEF
GHI    20     JKL

When I print modifiedFrame, which I converted from the JavaRDD, it shows the
following:

_col1       _col2  _col3
ABC,10,DEF
GHI,20,JKL

As shown above, _col1 has all the values and _col2 and _col3 are
empty. I don't know what I am doing wrong, please guide me. I am new to Spark,
thanks in advance.
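
One thing worth checking (an assumption based on the symptom, not a confirmed diagnosis): RowFactory.create takes one argument per column, so passing a single comma-joined string produces a one-field Row. A minimal Scala illustration:

    import org.apache.spark.sql.RowFactory

    val threeColumns = RowFactory.create("ABC", "10", "DEF")  // one value per column
    val oneColumn    = RowFactory.create("ABC,10,DEF")        // a single column holding the joined string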



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-DataFrame-created-from-JavaRDD-Row-copies-all-columns-data-into-first-column-tp23961.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Performance issue with Spak's foreachpartition method

2015-07-22 Thread Robin East
The first question I would ask is have you determined whether you have a 
performance issue writing to Oracle? In particular how many commits are you 
making? If you are issuing a lot of commits that would be a performance problem.

Robin
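
To make that concrete, a rough sketch (Scala with plain JDBC; the connection string, table and batch size are made up, and rdd stands for the reduced pair RDD) of committing once per batch rather than once per record:

    import java.sql.DriverManager

    rdd.foreachPartition { iter =>
      val conn = DriverManager.getConnection("jdbc:oracle:thin:@//db-host:1521/SVC", "user", "pass")
      conn.setAutoCommit(false)
      val stmt = conn.prepareStatement("INSERT INTO some_table (k, v) VALUES (?, ?)")
      try {
        var count = 0
        iter.foreach { case (k, v) =>
          stmt.setString(1, k)
          stmt.setString(2, v)
          stmt.addBatch()
          count += 1
          if (count % 1000 == 0) {   // flush and commit per batch, not per row
            stmt.executeBatch()
            conn.commit()
          }
        }
        stmt.executeBatch()          // flush whatever is left at the end of the partition
        conn.commit()
      } finally {
        stmt.close()
        conn.close()
      }
    }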

 On 22 Jul 2015, at 19:11, diplomatic Guru diplomaticg...@gmail.com wrote:
 
 Hello all,
 
 We are having a major performance issue with the Spark, which is holding us 
 from going live.
 
 We have a job that carries out computation on log files and write the results 
 into Oracle DB.
 
 The reducer 'reduceByKey'  have been set to parallelize by 4 as we don't want 
 to establish too many DB connections.
 
 We are then calling the foreachPartition on the RDD pairs that were reduced 
 by the key. Within this foreachPartition method we establish DB connection, 
 then iterate the results, prepare the Oracle statement for batch insertion 
 then we commit the batch and close the connection. All these are working fine.
 
 However, when we execute the job to process 12GB of data, it takes forever to 
 complete, especially at the foreachPartition stage.
 
 We submitted the job with 6 executors, 2 cores, and 6GB memory of which 0.3 
 is assigned to spark.storage.memoryFraction.
 
 The job is taking about 50 minutes to complete, which is not ideal. I'm not 
 sure how we could enhance the performance. I've provided the main body of the 
 codes, please take a look and advice:
 
 From Driver:
 
 reduceResultsRDD.foreachPartition(new DB.InsertFunction( 
 dbuser,dbpass,batchsize));
 
 
 DB class:
 
 public class DB {
     private static final Logger logger = LoggerFactory.getLogger(DB.class);
 
     public static class InsertFunction implements
             VoidFunction<Iterator<Tuple2<String, String>>> {
 
         private static final long serialVersionUID = 55766876878L;
         private String dbuser = "";
         private String dbpass = "";
         private int batchsize;
 
         public InsertFunction(String dbuser, String dbpass, int batchsize) {
             super();
             this.dbuser = dbuser;
             this.dbpass = dbpass;
             this.batchsize = batchsize;
         }
 
         @Override
         public void call(Iterator<Tuple2<String, String>> results) throws Exception {
             Connection connect = null;
             PreparedStatement pstmt = null;
             try {
                 connect = getDBConnection(dbuser, dbpass);
 
                 int count = 0;
 
                 if (batchsize <= 0) {
                     batchsize = 1;
                 }
 
                 pstmt = connect.prepareStatement(
                         "MERGE INTO SOME TABLE IF RECORD FOUND, IF NOT INSERT");
 
                 while (results.hasNext()) {
 
                     Tuple2<String, String> kv = results.next();
 
                     String[] data = kv._1.concat("," + kv._2).split(",");
 
                     pstmt.setString(1, data[0].toString());
                     pstmt.setString(2, data[1].toString());
                     // ... more setString(...) calls for the remaining columns
 
                     pstmt.addBatch();
 
                     count++;
 
                     if (count == batchsize) {
                         logger.info("BulkCount : " + count);
                         pstmt.executeBatch();
                         connect.commit();
                         count = 0;
                     }
 
                     // NOTE: as posted, the batch is also executed and committed here
                     // for every single record, not just once per batch.
                     pstmt.executeBatch();
                     connect.commit();
 
                 }
 
                 // flush the final partial batch
                 pstmt.executeBatch();
                 connect.commit();
 
             } catch (Exception e) {
                 logger.error("InsertFunction error: " + e.getMessage());
             } finally {
 
                 if (pstmt != null) {
                     pstmt.close();
                 }
 
                 try {
                     connect.close();
                 } catch (SQLException e) {
                     logger.error("InsertFunction Connection Close error: "
                             + e.getMessage());
                 }
             }
         }
 
     }
 }



spark.executor.memory and spark.driver.memory have no effect in yarn-cluster mode (1.4.x)?

2015-07-22 Thread Michael Misiewicz
Hi group,

I seem to have encountered a weird problem with 'spark-submit' and manually
setting sparkconf values in my applications.

It seems like setting the configuration values spark.executor.memory
and spark.driver.memory don't have any effect, when they are set from
within my application (i.e. prior to creating a SparkContext).

In yarn-cluster mode, only the values specified on the command line via
spark-submit for driver and executor memory are respected, and if not, it
appears spark falls back to defaults. For example,

Correct behavior noted in Driver's logs on YARN when --executor-memory is
specified:

15/07/22 19:25:59 INFO yarn.YarnAllocator: Will request 200 executor
containers, each with 1 cores and 13824 MB memory including 1536 MB
overhead
15/07/22 19:25:59 INFO yarn.YarnAllocator: Container request (host:
Any, capability: memory:13824, vCores:1)


But not when spark.executor.memory is specified prior to spark context
initialization:

15/07/22 19:22:22 INFO yarn.YarnAllocator: Will request 200 executor
containers, each with 1 cores and 2560 MB memory including 1536 MB
overhead
15/07/22 19:22:22 INFO yarn.YarnAllocator: Container request (host:
Any, capability: memory:2560, vCores:1)


In both cases, executor mem should be 10g. Interestingly, I set a
parameter spark.yarn.executor.memoryOverhead which appears to be
respected whether or not I'm in yarn-cluster or yarn-client mode.


Has anyone seen this before? Any idea what might be causing this behavior?
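
In the meantime, a workaround (assuming the job is launched via spark-submit; the class
and jar names below are placeholders) is to pass the memory settings on the command line
or via --conf, which is the path the driver logs above show being respected:

    spark-submit --master yarn-cluster \
      --driver-memory 10g \
      --executor-memory 10g \
      --conf spark.yarn.executor.memoryOverhead=1536 \
      --class com.example.MyApp myapp.jar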


databricks spark sql csv FAILFAST not failing, Spark 1.3.1 Java 7

2015-07-22 Thread Adam Pritchard
Hi all,

I am using the databricks csv library to load some data into a data frame.
https://github.com/databricks/spark-csv


I am trying to confirm that failfast mode works correctly and aborts
execution upon receiving an invalid csv file.  But have not been able to
see it fail yet after testing numerous invalid csv files.  Any advice?

spark 1.3.1 running on mapr vm 4.1.0 java 1.7


SparkConf conf = new SparkConf().setAppName("Dataframe testing");

JavaSparkContext sc = new JavaSparkContext(conf);

SQLContext sqlContext = new SQLContext(sc);
HashMap<String, String> options = new HashMap<String, String>();
options.put("header", "true");
options.put("path", args[0]);
options.put("mode", "FAILFAST");
// partner data
DataFrame partnerData = sqlContext.load("com.databricks.spark.csv", options);
// register partnerData table in spark sql
partnerData.registerTempTable("partnerData");

partnerData.printSchema();
partnerData.show();


It just runs like normal, and will output the data, even with an invalid
csv file.


Thanks!


Re: spark.deploy.spreadOut core allocation

2015-07-22 Thread Srikanth
Cool. Thanks!

Srikanth

On Wed, Jul 22, 2015 at 3:12 PM, Andrew Or and...@databricks.com wrote:

 Hi Srikanth,

 I was able to reproduce the issue by setting `spark.cores.max` to a number
 greater than the number of cores on a worker. I've filed SPARK-9260 which I
 believe is already being fixed in
 https://github.com/apache/spark/pull/7274.

 Thanks for reporting the issue!
 -Andrew

 2015-07-22 11:49 GMT-07:00 Andrew Or and...@databricks.com:

 Hi Srikanth,

 It does look like a bug. Did you set `spark.executor.cores` in your
 application by any chance?

 -Andrew

 2015-07-22 8:05 GMT-07:00 Srikanth srikanth...@gmail.com:

 Hello,

 I've set spark.deploy.spreadOut=false in spark-env.sh.

 export SPARK_MASTER_OPTS=-Dspark.deploy.defaultCores=4
 -Dspark.deploy.spreadOut=false


 There are 3 workers each with 4 cores. Spark-shell was started with noof
 cores = 6.
 Spark UI show that one executor was used with 6 cores.

 Is this a bug? This is with Spark 1.4.

 [image: Inline image 1]

 Srikanth






Re: How to unpersist RDDs generated by ALS/MatrixFactorizationModel

2015-07-22 Thread Stahlman, Jonathan
Hello again,

In trying to understand the caching of intermediate RDDs by ALS, I looked into 
the source code and found what may be a bug.  Looking here:

https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala#L230

you see that ALS.train() is being called with finalRDDStorageLevel = 
StorageLevel.NONE, which I would understand to mean that the intermediate RDDs 
will not be persisted.  Looking here:

https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala#L631

unpersist() is only being called on the intermediate RDDs (all the *Blocks RDDs 
listed in my first post) if finalRDDStorageLevel != StorageLevel.NONE.

This doesn’t make sense to me – I would expect the RDDs to be removed from the 
cache if finalRDDStorageLevel == StorageLevel.NONE, not the other way around.

Jonathan


From: Stahlman, Jonathan jonathan.stahl...@capitalone.com
Date: Thursday, July 16, 2015 at 2:18 PM
To: user@spark.apache.org
Subject: How to unpersist RDDs generated by ALS/MatrixFactorizationModel

Hello all,

I am running the Spark recommendation algorithm in MLlib and I have been 
studying its output with various model configurations.  Ideally I would like to 
be able to run one job that trains the recommendation model with many different 
configurations to try to optimize for performance.  A sample code in python is 
copied below.

The issue I have is that each new model which is trained caches a set of RDDs 
and eventually the executors run out of memory.  Is there any way in Pyspark to 
unpersist() these RDDs after each iteration?  The names of the RDDs which I 
gather from the UI is:

itemInBlocks
itemOutBlocks
Products
ratingBlocks
userInBlocks
userOutBlocks
users

I am using Spark 1.3.  Thank you for any help!

Regards,
Jonathan




  data_train, data_cv, data_test = data.randomSplit([99,1,1], 2)
  functions = [rating]  # defined elsewhere
  ranks = [10,20]
  iterations = [10,20]
  lambdas = [0.01,0.1]
  alphas  = [1.0,50.0]

  results = []
  for ratingFunction, rank, numIterations, m_lambda, m_alpha in \
      itertools.product( functions, ranks, iterations, lambdas, alphas ):
    # train model
    ratings_train = data_train.map(lambda l: Rating( l.user, l.product, ratingFunction(l) ) )
    model = ALS.trainImplicit( ratings_train, rank, numIterations,
                               lambda_=float(m_lambda), alpha=float(m_alpha) )

    # test performance on CV data
    ratings_cv = data_cv.map(lambda l: Rating( l.user, l.product, ratingFunction(l) ) )
    auc = areaUnderCurve( ratings_cv, model.predictAll )

    # save results
    result = ",".join(str(l) for l in
                      [ratingFunction.__name__, rank, numIterations, m_lambda, m_alpha, auc])
    results.append(result)
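
For what it's worth, one workaround on the Scala side (only a sketch -- these RDDs are
not directly exposed in PySpark 1.3, so this is not a confirmed fix for the Python case)
is to drop everything Spark still has cached between iterations:

    // Unpersist every RDD that is currently marked as persisted,
    // e.g. after each trained model has been evaluated.
    sc.getPersistentRDDs.values.foreach(_.unpersist(blocking = false))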




Re: How to keep RDDs in memory between two different batch jobs?

2015-07-22 Thread ericacm
Actually, I should clarify - Tachyon is a way to keep your data in RAM, but
it's not exactly the same as keeping it cached in Spark.  Spark Job Server
is a way to keep it cached in Spark.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-keep-RDDs-in-memory-between-two-different-batch-jobs-tp23957p23959.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: spark streaming 1.3 issues

2015-07-22 Thread Shushant Arora
In spark streaming 1.3 -

Say I have 10 executors each with 4 cores, so in total 40 tasks in parallel
at once. If I repartition the kafka directstream to 40 partitions, vs say I have
300 partitions in the kafka topic - which one will be more efficient? Should I
repartition the kafka stream to equal the number of cores or keep it the same as 300?

 If I have a number of partitions greater than the parallel tasks, will that not
cause overhead in task scheduling?

On Wed, Jul 22, 2015 at 11:37 AM, Tathagata Das t...@databricks.com wrote:

 For Java, do

 OffsetRange[] offsetRanges = ((HasOffsetRanges)rdd*.rdd()*).
 offsetRanges();

 If you fix that error, you should be seeing data.

 You can call arbitrary RDD operations on a DStream, using
 DStream.transform. Take a look at the docs.

 For the direct kafka approach you are doing,
 - tasks do get launched for empty partitions
 - driver may make multiple calls to Kafka brokers to get all the offset
 information. But that does not mean you should reduce partitions. The whole
 point of having a large number of partitions is to consume the data in
 parallel. If you reduce the number of partitions, that defeats the purpose
 of having partitions at all. And the driver making calls for getting
 metadata (i.e. offsets) isn't very costly, nor is it a bottleneck usually.
 Rather receiving and processing the actual data is usually the bottleneck
 and to increase throughput you should have larger number of partitions.



 On Tue, Jul 21, 2015 at 1:02 AM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 I'd suggest you upgrading to 1.4 as it has better metrices and UI.

 Thanks
 Best Regards

 On Mon, Jul 20, 2015 at 7:01 PM, Shushant Arora 
 shushantaror...@gmail.com wrote:

 Is coalesce not applicable to kafkaStream ? How to do coalesce on
 kafkadirectstream its not there in api ?
 Shall calling repartition on directstream with number of executors as
 numpartitions will imrove perfromance ?

 Does in 1.3 tasks get launched for partitions which are empty? Does
 driver makes call for getting offsets of each partition separately or in
 single call it gets all partitions new offsets ? I mean will reducing no of
  partitions oin kafka help improving the performance?

 On Mon, Jul 20, 2015 at 4:52 PM, Shushant Arora 
 shushantaror...@gmail.com wrote:

 Hi

 1.I am using spark streaming 1.3 for reading from a kafka queue and
 pushing events to external source.

 I passed in my job 20 executors but it is showing only 6 in executor
 tab ?
 When I used highlevel streaming 1.2 - its showing 20 executors. My
 cluster is 10 node yarn cluster with each node has 8 cores.

 I am calling the script as :

 spark-submit --class classname --num-executors 10 --executor-cores 2
 --master yarn-client jarfile

 2. On Streaming UI

 Started at: Mon Jul 20 11:02:10 GMT+00:00 2015
 Time since start: 13 minutes 28 seconds
 Network receivers: 0
 Batch interval: 1 second
 Processed batches: 807
 Waiting batches: 0
 Received records: 0
 Processed records: 0

 Received records and processed records are always 0 . And Speed of
 processing is slow compare to highlevel api.

 I am procesing the stream using mapPartition.

 When I used
 directKafkaStream.foreachRDD(new Function<JavaPairRDD<byte[], byte[]>,
 Void>() {
  @Override
 public Void call(JavaPairRDD<byte[], byte[]> rdd) throws Exception {
 // TODO Auto-generated method stub
 OffsetRange[] offsetRanges = ((HasOffsetRanges)rdd).offsetRanges();
 }
 }

 It throws an exception
 java.lang.ClassCastException: org.apache.spark.api.java.JavaPairRDD
 cannot be cast to org.apache.spark.streaming.kafka.HasOffsetRanges

 Thanks
 Shushant












Re: spark.deploy.spreadOut core allocation

2015-07-22 Thread Andrew Or
Hi Srikanth,

I was able to reproduce the issue by setting `spark.cores.max` to a number
greater than the number of cores on a worker. I've filed SPARK-9260 which I
believe is already being fixed in https://github.com/apache/spark/pull/7274.

Thanks for reporting the issue!
-Andrew

2015-07-22 11:49 GMT-07:00 Andrew Or and...@databricks.com:

 Hi Srikanth,

 It does look like a bug. Did you set `spark.executor.cores` in your
 application by any chance?

 -Andrew

 2015-07-22 8:05 GMT-07:00 Srikanth srikanth...@gmail.com:

 Hello,

 I've set spark.deploy.spreadOut=false in spark-env.sh.

 export SPARK_MASTER_OPTS=-Dspark.deploy.defaultCores=4
 -Dspark.deploy.spreadOut=false


 There are 3 workers each with 4 cores. Spark-shell was started with noof
 cores = 6.
 Spark UI show that one executor was used with 6 cores.

 Is this a bug? This is with Spark 1.4.

 [image: Inline image 1]

 Srikanth





Re: spark 1.3.1 : unable to access s3n:// urls (no file system for scheme s3n:)

2015-07-22 Thread Eugene Morozov
Hi, 

I’m stuck with the same issue, but I see
org.apache.hadoop.fs.s3native.NativeS3FileSystem in hadoop-core:1.0.4
(that’s the current hadoop-client I use), and so far that is a transitive dependency
that comes from Spark itself. I’m using a custom build of Spark 1.3.1 with
hadoop-client 1.0.4.

[INFO] +- 
org.apache.spark:spark-core_2.10:jar:1.3.1-hadoop-client-1.0.4:provided
...
[INFO] |  +- org.apache.hadoop:hadoop-client:jar:1.0.4:provided
[INFO] |  |  \- org.apache.hadoop:hadoop-core:jar:1.0.4:provided

The thing is I don’t have any direct usages of any hadoop-client version, so in 
my understanding I should be able to run my jar on any version of spark (1.3.1 
with hadoop-client 2.2.0 up to 2.2.6 or 1.3.1 with hadoop-client 1.0.4 up to 
1.2.1), but in reality, running it on a live cluster I’m getting class not 
found exception. I’ve checked über-jar of spark itself, and NativeS3FileSystem 
is there, so I don’t really understand where it comes from.

java.lang.RuntimeException: java.lang.ClassNotFoundException: Class 
org.apache.hadoop.fs.s3native.NativeS3FileSystem not found
at 
org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2074)


I’ve just got an idea. Is it possible that Executor’s classpath is different 
from the Worker classpath? How can I check Executor’s classpath?
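
One quick way to inspect it (a rough sketch, assuming a working SparkContext) is to run a
small job that reports each executor's JVM classpath, and compare it with the driver's
classpath shown on the Environment tab of the web UI:

    // Runs a number of dummy tasks and collects the distinct classpath strings seen on executors.
    sc.parallelize(1 to 100, 100)
      .map(_ => System.getProperty("java.class.path"))
      .distinct()
      .collect()
      .foreach(println)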

On 23 Apr 2015, at 17:35, Ted Yu yuzhih...@gmail.com wrote:

 NativeS3FileSystem class is in hadoop-aws jar.
 Looks like it was not on classpath.
 
 Cheers
 
 On Thu, Apr 23, 2015 at 7:30 AM, Sujee Maniyam su...@sujee.net wrote:
 Thanks all...
 
 btw, s3n load works without any issues with spark-1.3.1-built-for-hadoop 2.4
 
 I tried this on 1.3.1-hadoop26
   sc.hadoopConfiguration.set("fs.s3n.impl",
     "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
   val f = sc.textFile("s3n://bucket/file")
   f.count
 
 No it can't find the implementation path.  Looks like some jar is missing ?
 
 java.lang.RuntimeException: java.lang.ClassNotFoundException: Class 
 org.apache.hadoop.fs.s3native.NativeS3FileSystem not found
   at 
 org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2074)
   at 
 org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2578)
   at 
 org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2591)
   at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)
 
 On Wednesday, April 22, 2015, Shuai Zheng szheng.c...@gmail.com wrote:
 Below is my code to access s3n without problem (only for 1.3.1. there is a 
 bug in 1.3.0).
 
  
 
   Configuration hadoopConf = ctx.hadoopConfiguration();
 
   hadoopConf.set("fs.s3n.impl",
 "org.apache.hadoop.fs.s3native.NativeS3FileSystem");
 
   hadoopConf.set("fs.s3n.awsAccessKeyId", awsAccessKeyId);
 
   hadoopConf.set("fs.s3n.awsSecretAccessKey", awsSecretAccessKey);
 
  
 
 Regards,
 
  
 
 Shuai
 
  
 
 From: Sujee Maniyam [mailto:su...@sujee.net] 
 Sent: Wednesday, April 22, 2015 12:45 PM
 To: Spark User List
 Subject: spark 1.3.1 : unable to access s3n:// urls (no file system for 
 scheme s3n:)
 
  
 
 Hi all
 
 I am unable to access s3n://  urls using   sc.textFile().. getting 'no file 
 system for scheme s3n://'  error.
 
  
 
 a bug or some conf settings missing?
 
  
 
 See below for details:
 
  
 
 env variables : 
 
 AWS_SECRET_ACCESS_KEY=set
 
 AWS_ACCESS_KEY_ID=set
 
  
 
 spark/RELAESE :
 
 Spark 1.3.1 (git revision 908a0bf) built for Hadoop 2.6.0
 
 Build flags: -Phadoop-2.4 -Dhadoop.version=2.6.0 -Phive -Phive-thriftserver 
 -Pyarn -DzincPort=3034
 
  
 
  
 
 ./bin/spark-shell
 
  val f = sc.textFile("s3n://bucket/file")
 
  f.count
 
  
 
 error== 
 
 java.io.IOException: No FileSystem for scheme: s3n
 
 at 
 org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2584)
 
 at 
 org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2591)
 
 at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)
 
 at 
 org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2630)
 
 at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2612)
 
 at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:370)
 
 at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
 
 at 
 org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:256)
 
 at 
 org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:228)
 
 at 
 org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:313)
 
 at 
 org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:203)
 
 at 
 org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
 
 at 
 org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
 
 at scala.Option.getOrElse(Option.scala:120)
 
 at 

Re: Which memory fraction is Spark using to compute RDDs that are not going to be persisted

2015-07-22 Thread Andrew Or
Hi,

It would be whatever's left in the JVM. This is not explicitly controlled
by a fraction like storage or shuffle. However, the computation usually
doesn't need to use that much space. In my experience it's almost always
the caching or the aggregation during shuffles that's the most memory
intensive.

-Andrew

2015-07-21 13:47 GMT-07:00 wdbaruni wdbar...@gmail.com:

 I am new to Spark and I understand that Spark divides the executor memory
 into the following fractions:

 *RDD Storage:* Which Spark uses to store persisted RDDs using .persist() or
 .cache() and can be defined by setting spark.storage.memoryFraction
 (default
 0.6)

 *Shuffle and aggregation buffers:* Which Spark uses to store shuffle
 outputs. It can defined using spark.shuffle.memoryFraction. If shuffle
 output exceeds this fraction, then Spark will spill data to disk (default
 0.2)

 *User code:* Spark uses this fraction to execute arbitrary user code
 (default 0.2)

 I am not mentioning the storage and shuffle safety fractions for
 simplicity.

 My question is, which memory fraction is Spark using to compute and
 transform RDDs that are not going to be persisted? For example:

 lines = sc.textFile("i am a big file.txt")
 count = lines.flatMap(lambda x: x.split(' ')).map(lambda x: (x,
 1)).reduceByKey(add)
 count.saveAsTextFile("output")

 Here Spark will not load the whole file at once and will partition the
 input
 file and do all these transformations per partition in a single stage.
 However, which memory fraction Spark will use to load the partitioned
 lines,
 compute flatMap() and map()?

 Thanks



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Which-memory-fraction-is-Spark-using-to-compute-RDDs-that-are-not-going-to-be-persisted-tp23942.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.





Re: Does Spark streaming support is there with RabbitMQ

2015-07-22 Thread Abel Rincón
Hi,

We tested this receiver internally in Stratio Sparkta, and it works fine.
If you try the receiver, we're open to your collaboration; your issues
will be welcome.

Regards

A.Rincón
Stratio software architect



2015-07-22 8:15 GMT+02:00 Tathagata Das t...@databricks.com:

 You could contact the authors of the spark-packages.. maybe that will help?

 On Mon, Jul 20, 2015 at 6:41 AM, Jeetendra Gangele gangele...@gmail.com
 wrote:

 Thanks Todd,

 I m not sure whether somebody has used it or not. can somebody confirm if
 this integrate nicely with Spark streaming?



 On 20 July 2015 at 18:43, Todd Nist tsind...@gmail.com wrote:

 There is one package available on the spark-packages site,

 http://spark-packages.org/package/Stratio/RabbitMQ-Receiver

 The source is here:

 https://github.com/Stratio/RabbitMQ-Receiver

 Not sure that meets your needs or not.

 -Todd

 On Mon, Jul 20, 2015 at 8:52 AM, Jeetendra Gangele gangele...@gmail.com
  wrote:

 Does Apache spark support RabbitMQ. I have messages on RabbitMQ and I
 want to process them using Apache Spark streaming does it scale?

 Regards
 Jeetendra








Re: many-to-many join

2015-07-22 Thread Sonal Goyal
If I understand this correctly, you could join area_code_user and
area_code_state and then flat map to get
user, areacode, state. Then groupby/reduce by user.

You can also try some join optimizations like partitioning on area code or
broadcasting smaller table depending on size of area_code_state.
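
A rough sketch of the first suggestion in Scala (assuming area_code_user has one
(areaCode, userId) pair per record and area_code_state is (areaCode, state); the RDD
names below are placeholders):

    val userToStates = areaCodeUser.join(areaCodeState)            // (areaCode, (userId, state))
      .map { case (_, (userId, state)) => (userId, Set(state)) }
      .reduceByKey(_ ++ _)                                         // distinct states per user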
On Jul 22, 2015 10:15 AM, John Berryman jo...@eventbrite.com wrote:

 Quick example problem that's stumping me:

 * Users have 1 or more phone numbers and therefore one or more area codes.
 * There are 100M users.
 * States have one or more area codes.
 * I would like to get the states for the users (as indicated by phone area
 code).

 I was thinking about something like this:

 If area_code_user looks like (area_code,[user_id]) ex: (615,[1234567])
 and area_code_state looks like (area_code,state) ex: (615, [Tennessee])
 then we could do

 states_and_users_mixed = area_code_user.join(area_code_state) \
 .reduceByKey(lambda a,b: a+b) \
 .values()

 user_state_pairs = states_and_users_mixed.flatMap(
 emit_cartesian_prod_of_userids_and_states )
 user_to_states =   user_state_pairs.reduceByKey(lambda a,b: a+b)

 user_to_states.first(1)

  (1234567,[Tennessee,Tennessee,California])

 This would work, but the user_state_pairs is just a list of user_ids and
 state names mixed together and emit_cartesian_prod_of_userids_and_states
 has to correctly pair them. This is problematic because 1) it's weird and
 sloppy and 2) there will be lots of users per state and having so many
 users in a single row is going to make
 emit_cartesian_prod_of_userids_and_states work extra hard to first locate
 states and then emit all userid-state pairs.

 How should I be doing this?

 Thanks,
 -John



Re: Spark spark.shuffle.memoryFraction has no effect

2015-07-22 Thread Andrew Or
Hi,

The setting of 0.2 / 0.6 looks reasonable to me. Since you are not using
caching at all, have you tried trying something more extreme, like 0.1 /
0.9? Since disabling spark.shuffle.spill didn't cause an OOM this setting
should be fine. Also, one thing you could do is to verify the shuffle bytes
spilled on the UI before and after the change.

Let me know if that helped.
-Andrew

2015-07-21 13:50 GMT-07:00 wdbaruni wdbar...@gmail.com:

 Hi
 I am testing Spark on Amazon EMR using Python and the basic wordcount
 example shipped with Spark.

 After running the application, I realized that in Stage 0 reduceByKey(add),
 around 2.5GB shuffle is spilled to memory and 4GB shuffle is spilled to
 disk. Since in the wordcount example I am not caching or persisting any
 data, so I thought I can increase the performance of this application by
 giving more shuffle memoryFraction. So, in spark-defaults.conf, I added the
 following:

 spark.storage.memoryFraction    0.2
 spark.shuffle.memoryFraction    0.6

 However, I am still getting the same performance and the same amount of
 shuffle data is being spilled to disk and memory. I validated that Spark is
 reading these configurations using Spark UI/Environment and I can see my
 changes. Moreover, I tried setting spark.shuffle.spill to false and I got
 the performance I am looking for and all shuffle data was spilled to memory
 only.

 So, what am I getting wrong here and why not the extra shuffle memory
 fraction is not utilized?

 *My environment:*
 Amazon EMR with Spark 1.3.1 running using -x argument
 1 Master node: m3.xlarge
 3 Core nodes: m3.xlarge
 Application: wordcount.py
 Input: 10 .gz files 90MB each (~350MB unarchived) stored in S3

 *Submit command:*
 /home/hadoop/spark/bin/spark-submit --deploy-mode client /mnt/wordcount.py
 s3n://input location

 *spark-defaults.conf:*
 spark.eventLog.enabled  false
 spark.executor.extraJavaOptions -verbose:gc -XX:+PrintGCDetails
 -XX:+PrintGCDateStamps -XX:+UseConcMarkSweepGC
 -XX:CMSInitiatingOccupancyFraction=70 -XX:MaxHeapFreeRatio=70
 spark.driver.extraJavaOptions   -Dspark.driver.log.level=INFO
 spark.master                    yarn
 spark.executor.instances        3
 spark.executor.cores            4
 spark.executor.memory   9404M
 spark.default.parallelism   12
 spark.eventLog.enabled  true
 spark.eventLog.dir  hdfs:///spark-logs/
 spark.storage.memoryFraction    0.2
 spark.shuffle.memoryFraction    0.6



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-spark-shuffle-memoryFraction-has-no-affect-tp23944.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.





Re: How to share a Map among RDDS?

2015-07-22 Thread Andrew Or
Hi Dan,

If the map is small enough, you can just broadcast it, can't you? It
doesn't have to be an RDD. Here's an example of broadcasting an array and
using it on the executors:
https://github.com/apache/spark/blob/c03299a18b4e076cabb4b7833a1e7632c5c0dabe/examples/src/main/scala/org/apache/spark/examples/BroadcastTest.scala
.

-Andrew
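
For reference, a minimal sketch of that approach (reusing the names from the question
below; not tested against the original code):

    val map1 = Map("aa" -> 1, "bb" -> 2, "cc" -> 3)
    val bcMap = sc.broadcast(map1)        // shipped once to every executor
    val matchs = Vecs.map(term => term.map { case (a, b) => (bcMap.value(a), b) })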

2015-07-21 19:56 GMT-07:00 ayan guha guha.a...@gmail.com:

 Either you have to do rdd.collect and then broadcast or you can do a join
 On 22 Jul 2015 07:54, Dan Dong dongda...@gmail.com wrote:

 Hi, All,


 I am trying to access a Map from RDDs that are on different compute
 nodes, but without success. The Map is like:

 val map1 = Map("aa" -> 1, "bb" -> 2, "cc" -> 3, ...)

 All RDDs will have to check against it to see if the key is in the Map or
 not, so it seems I have to make the Map itself global. The problem is that if
 the Map is stored as RDDs and spread across the different nodes, each node
 will only see a piece of the Map and the info will not be complete to check
 against the Map (and then replace the key with the corresponding value). E.g.:

 val matchs = Vecs.map(term => term.map { case (a, b) => (map1(a), b) })

 But if the Map is not an RDD, how to share it like sc.broadcast(map1)

 Any idea about this? Thanks!


 Cheers,
 Dan




Re: How to restart Twitter spark stream

2015-07-22 Thread Akhil Das
That was a pseudo code, working version would look like this:

val stream = TwitterUtils.createStream(ssc, None)

val hashTags = stream.flatMap(status => status.getText.split(" ")
  .filter(_.startsWith("#"))).map(x => (x.toLowerCase, 1))

val topCounts10 = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _,
Seconds(10))
  .map{ case (topic, count) => (count, topic) }
  .transform(_.sortByKey(false)).map(x => x._2)

topCounts10.print()


val filteredStream = topCounts10.transform(rdd => {
  *val samplehashtags =
ssc.sparkContext.parallelize(Array("#RobinWilliams".toLowerCase, "#android".toLowerCase, "#iphone".toLowerCase))*
  val newRDD = samplehashtags.map { x => (x, 1) }
  val joined = newRDD.join(rdd)

  joined
})

filteredStream.print()

Thanks
Best Regards

On Wed, Jul 22, 2015 at 3:58 AM, Zoran Jeremic zoran.jere...@gmail.com
wrote:

 Hi Akhil and Jorn,

 I tried as you suggested to create some simple scenario, but I have an
 error on rdd.join(newRDD):  value join is not a member of
 org.apache.spark.rdd.RDD[twitter4j.Status]. The code looks like:

 val stream = TwitterUtils.createStream(ssc, auth)
 val filteredStream = stream.transform(rdd => {
 val samplehashtags = Array("music", "film")
 val newRDD = samplehashtags.map { x => (x, 1) }
 rdd.join(newRDD)
  })


 Did I miss something here?

 Thanks,
 Zoran

 On Mon, Jul 20, 2015 at 9:54 AM, Zoran Jeremic zoran.jere...@gmail.com
 wrote:

 Thanks for explanation.

 If I understand this correctly, in this approach I would actually stream
 everything from Twitter, and perform filtering in my application using
 Spark. Isn't this too much overhead if my application is interested in
 listening for couple of hundreds or thousands hashtags?
 On one side, this will be better approach since I will not have the
 problem to open new streams if number of hashtags go over 400 which is the
 Twitter limit for User stream filtering, but on the other side I'm concern
 about how much it will affect application performance if I stream
 everything that is posted on Twitter and filter it locally. It would be
 great if somebody with experience on this could comment on these concerns.

 Thanks,
 Zoran

 On Mon, Jul 20, 2015 at 12:19 AM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 Jorn meant something like this:

  val filteredStream = twitterStream.transform(rdd => {

  val newRDD =
  scc.sc.textFile("/this/file/will/be/updated/frequently").map(x => (x, 1))

  rdd.join(newRDD)

  })

 ​newRDD will work like a filter when you do the join.​


 Thanks
 Best Regards

 On Sun, Jul 19, 2015 at 9:32 PM, Zoran Jeremic zoran.jere...@gmail.com
 wrote:

 Hi Jorn,

 I didn't know that it is possible to change filter without re-opening
 twitter stream. Actually, I already had that question earlier at the
 stackoverflow
 http://stackoverflow.com/questions/30960984/apache-spark-twitter-streaming
 and I got the answer that it's not possible, but it would be even better if
 there is some other way to add new hashtags or to remove old hashtags that
 user stopped following. I guess the second request would be more difficult.

 However, it would be great if you can give me some short example how to
 make this. I didn't understand well from your explanation what you mean by
 join it with a rdd loading the newest hash tags from disk in a regular
 interval.

 Thanks,
 Zoran

 On Sun, Jul 19, 2015 at 5:01 AM, Jörn Franke jornfra...@gmail.com
 wrote:

 Why do you even want to stop it? You can join it with a rdd loading
 the newest hash tags from disk in a regular interval

 Le dim. 19 juil. 2015 à 7:40, Zoran Jeremic zoran.jere...@gmail.com
 a écrit :

 Hi,

 I have a twitter spark stream initialized in the following way:

   val ssc:StreamingContext =
 SparkLauncher.getSparkScalaStreamingContext()
   val config = getTwitterConfigurationBuilder.build()
   val auth: Option[twitter4j.auth.Authorization] =
 Some(new

 twitter4j.auth.OAuthAuthorization(config))
   val stream = TwitterUtils.createStream(ssc, auth,
 filters)


 This works fine when I initialy start it. However, at some point I
 need to update filters since users might add new hashtags they want to
 follow. I tried to stop the running stream and spark streaming context
 without stoping spark context, e.g:


stream.stop()
ssc.stop(false)


 Afterward, I'm trying to initialize a new Twitter stream like I did
 previously. However, I got this exception:

 Exception in thread Firestorm JMX Monitor
 java.lang.IllegalStateException: Adding new inputs, transformations, and
 output operations after stopping a context is not supported
 at
 org.apache.spark.streaming.dstream.DStream.validateAtInit(DStream.scala:224)
 at
 org.apache.spark.streaming.dstream.DStream.init(DStream.scala:64)
 at
 org.apache.spark.streaming.dstream.InputDStream.init(InputDStream.scala:41)
 at
 

Mesos + Spark

2015-07-22 Thread boci
Hi guys!

I'm new to Mesos. I have two Spark applications (one streaming and one
batch). I want to run both apps in a Mesos cluster. For testing I want to
run it in Docker containers, so I started a simple redjack/mesos-master, but
a lot of things are unclear to me (both Mesos and Spark-on-Mesos).

If I have a Mesos cluster (for testing it will be some Docker containers), do I
need a separate machine (container) to run my Spark job? Or can I submit to
the cluster and schedule it (with Chronos or something)?
How can I run the streaming job? What happens if the controller dies? Or
if I call spark-submit with master=mesos, is my application started and can I
forget about it? How can I run a job every 10 min without submitting it every 10 min? How
can I run my streaming app in HA mode?

Thanks

b0c1

--
Skype: boci13, Hangout: boci.b...@gmail.com


Re: user threads in executors

2015-07-22 Thread Tathagata Das
Yes, you could unroll from the iterator in batch of 100-200 and then post
them in multiple rounds.
If you are using the Kafka receiver based approach (not Direct), then the
raw Kafka data is stored in the executor memory. If you are using Direct
Kafka, then it is read from Kafka directly at the time of filtering.

TD
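
A rough sketch of that batching pattern (postToExternalSystem() is a placeholder for the
HTTP POST logic, stream is assumed to be the Kafka DStream, and the batch size is
arbitrary):

    stream.foreachRDD { rdd =>
      rdd.foreachPartition { iter =>
        // grouped() pulls at most 200 records from the iterator at a time,
        // so the whole partition is never materialized in memory.
        iter.grouped(200).foreach(batch => postToExternalSystem(batch))
      }
    }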

On Tue, Jul 21, 2015 at 9:34 PM, Shushant Arora shushantaror...@gmail.com
wrote:

 I can post multiple items at a time.

 Data is being read from kafka and filtered, and after that it is posted. Does
 foreachPartition
 load the complete partition in memory or use an iterator of batches under the hood? If
 the complete batch is not loaded, will using a custom size of 100-200 requests in
 one batch for the post help, instead of the whole partition?

 On Wed, Jul 22, 2015 at 12:18 AM, Tathagata Das t...@databricks.com
 wrote:

 If you can post multiple items at a time, then use foreachPartition to
 post the whole partition in a single request.

 On Tue, Jul 21, 2015 at 9:35 AM, Richard Marscher 
 rmarsc...@localytics.com wrote:

 You can certainly create threads in a map transformation. We do this to
 do concurrent DB lookups during one stage for example. I would recommend,
 however, that you switch to mapPartitions from map as this allows you to
 create a fixed size thread pool to share across items on a partition as
 opposed to spawning a future per record in the RDD for example.

 On Tue, Jul 21, 2015 at 4:11 AM, Shushant Arora 
 shushantaror...@gmail.com wrote:

 Hi

 Can I create user threads in executors?
 I have a streaming app where, after processing, I have a requirement to
 push events to an external system. Each post request costs ~90-100 ms.

 To make the posts parallel, I cannot use the same thread because that is limited
 by the number of cores available in the system. Can I use user threads in a Spark app? I
 tried to create 2 threads in a map task and it worked.

 Is there any upper limit on the number of user threads in a Spark executor? Is it
 a good idea to create user threads in a Spark map task?

 Thanks




 --
 *Richard Marscher*
 Software Engineer
 Localytics
 Localytics.com http://localytics.com/ | Our Blog
 http://localytics.com/blog | Twitter http://twitter.com/localytics
  | Facebook http://facebook.com/localytics | LinkedIn
 http://www.linkedin.com/company/1148792?trk=tyah






Re: spark streaming 1.3 issues

2015-07-22 Thread Tathagata Das
For Java, do

OffsetRange[] offsetRanges = ((HasOffsetRanges)rdd*.rdd()*).offsetRanges();

If you fix that error, you should be seeing data.

You can call arbitrary RDD operations on a DStream, using
DStream.transform. Take a look at the docs.

For the direct kafka approach you are doing,
- tasks do get launched for empty partitions
- driver may make multiple calls to Kafka brokers to get all the offset
information. But that does not mean you should reduce partitions. The whole
point of having a large number of partitions is to consume the data in
parallel. If you reduce the number of partitions, that defeats the purpose
of having partitions at all. And the driver making calls for getting
metadata (i.e. offsets) isn't very costly, nor is it a bottleneck usually.
Rather receiving and processing the actual data is usually the bottleneck
and to increase throughput you should have larger number of partitions.



On Tue, Jul 21, 2015 at 1:02 AM, Akhil Das ak...@sigmoidanalytics.com
wrote:

 I'd suggest you upgrading to 1.4 as it has better metrices and UI.

 Thanks
 Best Regards

 On Mon, Jul 20, 2015 at 7:01 PM, Shushant Arora shushantaror...@gmail.com
  wrote:

 Is coalesce not applicable to kafkaStream ? How to do coalesce on
 kafkadirectstream its not there in api ?
 Shall calling repartition on directstream with number of executors as
 numpartitions will imrove perfromance ?

 Does in 1.3 tasks get launched for partitions which are empty? Does
 driver makes call for getting offsets of each partition separately or in
 single call it gets all partitions new offsets ? I mean will reducing no of
  partitions oin kafka help improving the performance?

 On Mon, Jul 20, 2015 at 4:52 PM, Shushant Arora 
 shushantaror...@gmail.com wrote:

 Hi

 1.I am using spark streaming 1.3 for reading from a kafka queue and
 pushing events to external source.

 I passed in my job 20 executors but it is showing only 6 in executor tab
 ?
 When I used highlevel streaming 1.2 - its showing 20 executors. My
 cluster is 10 node yarn cluster with each node has 8 cores.

 I am calling the script as :

 spark-submit --class classname --num-executors 10 --executor-cores 2
 --master yarn-client jarfile

 2. On Streaming UI

 Started at: Mon Jul 20 11:02:10 GMT+00:00 2015
 Time since start: 13 minutes 28 seconds
 Network receivers: 0
 Batch interval: 1 second
 Processed batches: 807
 Waiting batches: 0
 Received records: 0
 Processed records: 0

 Received records and processed records are always 0 . And Speed of
 processing is slow compare to highlevel api.

 I am procesing the stream using mapPartition.

 When I used
 directKafkaStream.foreachRDD(new Function<JavaPairRDD<byte[], byte[]>,
 Void>() {
  @Override
 public Void call(JavaPairRDD<byte[], byte[]> rdd) throws Exception {
 // TODO Auto-generated method stub
 OffsetRange[] offsetRanges = ((HasOffsetRanges)rdd).offsetRanges();
 }
 }

 It throws an exception
 java.lang.ClassCastException: org.apache.spark.api.java.JavaPairRDD
 cannot be cast to org.apache.spark.streaming.kafka.HasOffsetRanges

 Thanks
 Shushant











Re: spark streaming 1.3 coalesce on kafkadirectstream

2015-07-22 Thread Tathagata Das
With DirectKafkaStream there are two approaches.
1. you increase the number of KAfka partitions Spark will automatically
read in parallel
2. if that's not possible, then explicitly repartition only if there are
more cores in the cluster than the number of Kafka partitions, AND the
first map-like state on the directKafkaDStream is heavy enough to warrant
the cost of repartitioning (shuffles the data around).
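
As a rough illustration of option 2 (process() and numCores are placeholders; the stream
is assumed to come from KafkaUtils.createDirectStream):

    // Only worth doing when the downstream work per record is heavy enough
    // to justify the extra shuffle.
    val widened = directKafkaStream.repartition(numCores)
    val processed = widened.mapPartitions(iter => iter.map(process))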

On Mon, Jul 20, 2015 at 8:31 PM, Shushant Arora shushantaror...@gmail.com
wrote:

 does spark streaming 1.3 launches task for each partition offset range
 whether that is 0 or not ?

 If yes, how can I enforce it to not to launch tasks for empty rdds.Not
 able t o use coalesce on directKafkaStream.

 Shall we enforce repartitioning always before processing direct stream ?

 use case is :

 directKafkaStream.repartition(numexecutors).mapPartitions(new
 FlatMapFunctionIteratorTuple2byte[],byte[], String(){
 ...
 }

 Thanks



Re: Does Spark streaming support is there with RabbitMQ

2015-07-22 Thread Tathagata Das
You could contact the authors of the spark-packages.. maybe that will help?

On Mon, Jul 20, 2015 at 6:41 AM, Jeetendra Gangele gangele...@gmail.com
wrote:

 Thanks Todd,

 I m not sure whether somebody has used it or not. can somebody confirm if
 this integrate nicely with Spark streaming?



 On 20 July 2015 at 18:43, Todd Nist tsind...@gmail.com wrote:

 There is one package available on the spark-packages site,

 http://spark-packages.org/package/Stratio/RabbitMQ-Receiver

 The source is here:

 https://github.com/Stratio/RabbitMQ-Receiver

 Not sure that meets your needs or not.

 -Todd

 On Mon, Jul 20, 2015 at 8:52 AM, Jeetendra Gangele gangele...@gmail.com
 wrote:

 Does Apache spark support RabbitMQ. I have messages on RabbitMQ and I
 want to process them using Apache Spark streaming does it scale?

 Regards
 Jeetendra







Re: Spark-hive parquet schema evolution

2015-07-22 Thread Cheng Lian
Since Hive doesn’t support schema evolution, you’ll have to update the 
schema stored in metastore somehow. For example, you can create a new 
external table with the merged schema. Say you have a Hive table t1:


    CREATE TABLE t1 (c0 INT, c1 DOUBLE);

By default, this table is stored in the HDFS path
hdfs://some-host:9000/user/hive/warehouse/t1. Now you append some
Parquet data with an extra column c2 to the same directory:


    import org.apache.spark.sql.types._

    val path = "hdfs://some-host:9000/user/hive/warehouse/t1"
    val df1 = sqlContext.range(10).select(
      'id as 'c0, 'id cast DoubleType as 'c1, 'id cast StringType as 'c2)
    df1.write.mode("append").parquet(path)


Now you can create a new external table t2 like this:

    val df2 = sqlContext.read.option("mergeSchema", "true").parquet(path)
    df2.write.path(path).saveAsTable("t2")


Since we specified a path above, the newly created t2 is an external 
table pointing to the original HDFS location. But the schema of t2 is 
the merged version.


The drawback of this approach is that t2 is actually a Spark SQL 
specific data source table rather than a genuine Hive table. This means, 
it can be accessed by Spark SQL only. We’re just using Hive metastore to 
help persisting metadata of the data source table. However, since you’re 
asking how to access the new table via Spark SQL CLI, this should work 
for you. We are working on making Parquet and ORC data source tables 
accessible via Hive in Spark 1.5.0.


Cheng

On 7/22/15 10:32 AM, Jerrick Hoang wrote:


Hi Lian,

Sorry I'm new to Spark so I did not express myself very clearly. I'm 
concerned about the situation when let's say I have a Parquet table 
some partitions and I add a new column A to parquet schema and write 
some data with the new schema to a new partition in the table. If i'm 
not mistaken, if I do a 
sqlContext.read.parquet(table_path).printSchema() it will print the 
correct schema with new column A. But if I do a 'describe table' from 
SparkSQLCLI I won't see the new column being added. I understand that 
this is because Hive doesn't support schema evolution. So what is the 
best way to support CLI queries in this situation? Do I need to 
manually alter the table everytime the underlying schema changes?


Thanks

On Tue, Jul 21, 2015 at 4:37 PM, Cheng Lian lian.cs@gmail.com wrote:


Hey Jerrick,

What do you mean by schema evolution with Hive metastore tables?
Hive doesn't take schema evolution into account. Could you please
give a concrete use case? Are you trying to write Parquet data
with extra columns into an existing metastore Parquet table?

Cheng


On 7/21/15 1:04 AM, Jerrick Hoang wrote:

I'm new to Spark, any ideas would be much appreciated! Thanks

On Sat, Jul 18, 2015 at 11:11 AM, Jerrick Hoang jerrickho...@gmail.com wrote:

Hi all,

I'm aware of the support for schema evolution via DataFrame
API. Just wondering what would be the best way to go about
dealing with schema evolution with Hive metastore tables. So,
say I create a table via SparkSQL CLI, how would I deal with
Parquet schema evolution?

Thanks,
J








Re: many-to-many join

2015-07-22 Thread ayan guha
Hi

RDD solution:
>>> u = [(615,1),(720,1),(615,2)]
>>> urdd = sc.parallelize(u,1)
>>> a1 = [(615,'T'),(720,'C')]
>>> ardd = sc.parallelize(a1,1)
>>> def addString(s1,s2):
...     return s1+','+s2
>>> j = urdd.join(ardd).map(lambda t:t[1]).reduceByKey(addString)
>>> print j.collect()
[(2, 'T'), (1, 'C,T')]

However, if you can assume number of users  is far far greater than
number of distinct area codes, you may think to broadcast variable in a
dict format and look up in the map. Like this

>>> u = [(1,615),(1,720),(2,615)]
>>> a = {615:'T',720:'C'}
>>> urdd = sc.parallelize(u)
>>> def usr_area_state(tup):
...     uid = tup[0]
...     aid = tup[1]
...     sid = bc.value[aid]
...     return uid,(sid,)
...
>>> bc = sc.broadcast(a)
>>> usrdd = urdd.map(usr_area_state)
>>> def addTuple(t1,t2):
...     return t1+t2
...
>>> out = usrdd.reduceByKey(addTuple)
>>> print out.collect()
[(1, ('T', 'C')), (2, ('T',))]

Best
Ayan

On Wed, Jul 22, 2015 at 5:14 PM, Sonal Goyal sonalgoy...@gmail.com wrote:

 If I understand this correctly, you could join area_code_user and
 area_code_state and then flat map to get
 user, areacode, state. Then groupby/reduce by user.

 You can also try some join optimizations like partitioning on area code or
 broadcasting smaller table depending on size of area_code_state.
 On Jul 22, 2015 10:15 AM, John Berryman jo...@eventbrite.com wrote:

 Quick example problem that's stumping me:

 * Users have 1 or more phone numbers and therefore one or more area codes.
 * There are 100M users.
 * States have one or more area codes.
 * I would like to get the states for the users (as indicated by phone area
 code).

 I was thinking about something like this:

 If area_code_user looks like (area_code,[user_id]) ex: (615,[1234567])
 and area_code_state looks like (area_code,state) ex: (615, [Tennessee])
 then we could do

 states_and_users_mixed = area_code_user.join(area_code_state) \
 .reduceByKey(lambda a,b: a+b) \
 .values()

 user_state_pairs = states_and_users_mixed.flatMap(
 emit_cartesian_prod_of_userids_and_states )
 user_to_states =   user_state_pairs.reduceByKey(lambda a,b: a+b)

 user_to_states.first(1)

  (1234567,[Tennessee,Tennessee,California])

 This would work, but the user_state_pairs is just a list of user_ids and
 state names mixed together and emit_cartesian_prod_of_userids_and_states
 has to correctly pair them. This is problematic because 1) it's weird and
 sloppy and 2) there will be lots of users per state and having so many
 users in a single row is going to make
 emit_cartesian_prod_of_userids_and_states work extra hard to first locate
 states and then emit all userid-state pairs.

 How should I be doing this?

 Thanks,
 -John




-- 
Best Regards,
Ayan Guha


Scaling spark cluster for a running application

2015-07-22 Thread phagunbaya
I have a spark cluster running in client mode with driver outside the spark
cluster. I want to scale the cluster after an application is submitted. In
order to do this, I'm creating new workers and they are getting registered
with the master, but the issue I'm seeing is that the running application does not
use the newly added worker. Hence I cannot add more resources to an existing running
application.

Is there any other way or config to deal with this use case? How can I make a
running application ask for executors from a newly added worker node?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Scaling-spark-cluster-for-a-running-application-tp23951.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Is spark suitable for real time query

2015-07-22 Thread Robin East
Real-time is, of course, relative but you’ve mentioned microsecond level. Spark 
is designed to process large amounts of data in a distributed fashion. No 
distributed system I know of could give any kind of guarantees at the 
microsecond level.

Robin

 On 22 Jul 2015, at 11:14, Louis Hust louis.h...@gmail.com wrote:
 
 Hi, all
 
 I am using spark jar in standalone mode, fetch data from different mysql 
 instance and do some action, but i found the time is at second level.
 
 So i want to know if spark job is suitable for real time query which at 
 microseconds?





Proper saving/loading of MatrixFactorizationModel

2015-07-22 Thread PShestov
Hi all!
I have a MatrixFactorizationModel object. If I try to recommend products
to a single user right after constructing the model through ALS.train(...), it
takes 300ms (for my data and hardware). But if I save the model to disk and load
it back, the recommendation takes almost 2000ms. Also Spark warns:
15/07/17 11:05:47 WARN MatrixFactorizationModel: User factor does not have a
partitioner. Prediction on individual records could be slow.
15/07/17 11:05:47 WARN MatrixFactorizationModel: User factor is not cached.
Prediction could be slow.
15/07/17 11:05:47 WARN MatrixFactorizationModel: Product factor does not
have a partitioner. Prediction on individual records could be slow.
15/07/17 11:05:47 WARN MatrixFactorizationModel: Product factor is not
cached. Prediction could be slow.
How can I create/set partitioner and cache user and product factors after
loading model? Following approach didn't help:
model.userFeatures().cache();
model.productFeatures().cache();
Also I was trying to repartition those rdds and create new model from
repartitioned versions but that also didn't help.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Proper-saving-loading-of-MatrixFactorizationModel-tp23952.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Is spark suitable for real time query

2015-07-22 Thread Louis Hust
Hi, all

I am using a Spark jar in standalone mode, fetching data from different MySQL
instances and doing some actions, but I found the time is at the second level.

So I want to know whether a Spark job is suitable for real-time queries at the
microsecond level?


Re: Broadcast variables in R

2015-07-22 Thread FRANCHOIS Serge
Thank you very much Shivaram. I’ve got it working on Mac now by specifying the 
namespace.
Using SparkR:::parallelize() instead of just parallelize().

Wkr,
Serge



On 21 Jul 2015, at 17:20, Shivaram Venkataraman shiva...@eecs.berkeley.edu wrote:

There shouldn't be anything Mac OS specific about this feature. One point of 
warning though -- As mentioned previously in this thread the APIs were made 
private because we aren't sure we will be supporting them in the future. If you 
are using these APIs it would be good to chime in on the JIRA with your use-case

Thanks
Shivaram

On Tue, Jul 21, 2015 at 2:34 AM, Serge Franchois serge.franch...@altran.com wrote:
I might add to this that I've done the same exercise on Linux (CentOS 6) and
there, broadcast variables ARE working. Is this functionality perhaps not
exposed on Mac OS X?  Or has it to do with the fact there are no native
Hadoop libs for Mac?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Broadcast-variables-in-R-tp23914p23927.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.






Re: Is spark suitable for real time query

2015-07-22 Thread Louis Hust
I did a simple test using Spark in standalone mode (not a cluster)
and found a simple action takes a few seconds; the data size is small, just a
few rows.
So does each Spark job cost some time for init or prepare work no matter
what the job is?
I mean, does the basic framework of a Spark job cost seconds?

2015-07-22 19:17 GMT+08:00 Robin East robin.e...@xense.co.uk:

 Real-time is, of course, relative but you’ve mentioned microsecond level.
 Spark is designed to process large amounts of data in a distributed
 fashion. No distributed system I know of could give any kind of guarantees
 at the microsecond level.

 Robin

  On 22 Jul 2015, at 11:14, Louis Hust louis.h...@gmail.com wrote:
 
  Hi, all
 
  I am using spark jar in standalone mode, fetch data from different mysql
 instance and do some action, but i found the time is at second level.
 
  So i want to know if spark job is suitable for real time query which at
 microseconds?




Need help in SparkSQL

2015-07-22 Thread Jeetendra Gangele
Hi All,

I have data in MongoDB (a few TBs) which I want to migrate to HDFS to do
complex query analysis on this data, e.g. AND queries involving
multiple fields.

So my question is: in which format should I store the data in HDFS so
that processing will be fast for such kinds of queries?


Regards
Jeetendra


Re: How to build Spark with my own version of Hadoop?

2015-07-22 Thread jay vyas
As you know, the hadoop versions and so on are available in the spark build
files; iirc the top level pom.xml has all the maven variables for versions.

So I think if you just build hadoop locally (i.e. version it as something like
2.2.1234-SNAPSHOT and mvn install it), you should be able to change the
corresponding variable in the top level spark pom.xml.
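
For example (a sketch only; it reuses the 2.2.1234-SNAPSHOT version string from above,
and you would add the -Phadoop-* profile matching your Hadoop line):

    # install the patched Hadoop into the local Maven repo
    cd hadoop && mvn install -DskipTests
    # then build Spark against that version
    cd spark && mvn -Pyarn -Dhadoop.version=2.2.1234-SNAPSHOT -DskipTests clean package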

.

Of course this is a Pandora's box where now you need to also deploy your
custom YARN on your cluster, make sure it matches the Spark target, and so
on (if you're running Spark on YARN). RPMs and DEB packages tend to be
useful for this kind of thing, since you can easily sync the /etc/ config
files and uniformly manage/upgrade versions etc. Thus, if you're
really serious about building a custom distribution, mixing & matching
Hadoop components separately, you might want to consider using Apache
BigTop -- just bring this up on that mailing list. We curate a Hadoop
distribution builder that builds Spark, Hadoop, Hive, Ignite, Kafka,
ZooKeeper, HBase and so on. Since BigTop has all the tooling necessary
to fully build, test, and deploy your Hadoop bits on VMs/containers, it
might make your life a little easier.



On Tue, Jul 21, 2015 at 11:11 PM, Dogtail Ray spark.ru...@gmail.com wrote:

 Hi,

 I have modified some Hadoop code, and want to build Spark with the
 modified version of Hadoop. Do I need to change the compilation dependency
 files? How to then? Great thanks!




-- 
jay vyas


Re: Is spark suitable for real time query

2015-07-22 Thread Igor Berman
You can use the Spark REST job server (or any other solution that provides a
long-running Spark context) so that you won't pay this bootstrap time on each
query.
In addition, if you have some RDD that you want your queries to be executed
on, you can cache this RDD in memory (depending on your cluster memory size) so
that you won't pay the reading-from-disk time.
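
A tiny sketch of the second point (the path is hypothetical; any cached RDD or DataFrame
works the same way):

    val events = sqlContext.read.parquet("hdfs://namenode:9000/data/events")
    events.cache()
    events.count()   // materializes the cache; later queries read from memory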


On 22 July 2015 at 14:46, Louis Hust louis.h...@gmail.com wrote:

 I do a simple test using spark in standalone mode(not cluster),
  and found a simple action take a few seconds, the data size is small,
 just few rows.
 So each spark job will cost some time for init or prepare work no matter
 what the job is?
 I mean if the basic framework of spark job will cost seconds?

 2015-07-22 19:17 GMT+08:00 Robin East robin.e...@xense.co.uk:

 Real-time is, of course, relative but you’ve mentioned microsecond level.
 Spark is designed to process large amounts of data in a distributed
 fashion. No distributed system I know of could give any kind of guarantees
 at the microsecond level.

 Robin

  On 22 Jul 2015, at 11:14, Louis Hust louis.h...@gmail.com wrote:
 
  Hi, all
 
  I am using spark jar in standalone mode, fetch data from different
 mysql instance and do some action, but i found the time is at second level.
 
  So i want to know if spark job is suitable for real time query which at
 microseconds?





java.lang.IllegalArgumentException: problem reading type: type = group, name = param, original type = null

2015-07-22 Thread SkyFox
Hello!

I don't understand why, but I can't read data from my parquet file. I made
the parquet file from a json file and read it into a data frame:

df.printSchema()

 |-- param: struct (nullable = true)
 |    |-- FORM: string (nullable = true)
 |    |-- URL: string (nullable = true)

When I try to read any record I get an error:

df.select("param").first()

15/07/22 13:06:15 ERROR Executor: Exception in task 0.0 in stage 8.0 (TID 4)
java.lang.IllegalArgumentException: problem reading type: type = group, name
= param, original type = null
at
parquet.schema.MessageTypeParser.addGroupType(MessageTypeParser.java:132)
at
parquet.schema.MessageTypeParser.addType(MessageTypeParser.java:106)
at
parquet.schema.MessageTypeParser.addGroupTypeFields(MessageTypeParser.java:96)
at parquet.schema.MessageTypeParser.parse(MessageTypeParser.java:89)
at
parquet.schema.MessageTypeParser.parseMessageType(MessageTypeParser.java:79)
at
parquet.hadoop.ParquetRecordReader.initializeInternalReader(ParquetRecordReader.java:189)
at
parquet.hadoop.ParquetRecordReader.initialize(ParquetRecordReader.java:138)
at
org.apache.spark.sql.sources.SqlNewHadoopRDD$$anon$1.init(SqlNewHadoopRDD.scala:153)
at
org.apache.spark.sql.sources.SqlNewHadoopRDD.compute(SqlNewHadoopRDD.scala:124)
at
org.apache.spark.sql.sources.SqlNewHadoopRDD.compute(SqlNewHadoopRDD.scala:66)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.IllegalArgumentException: expected one of [REQUIRED,
OPTIONAL, REPEATED] got utm_medium at line 29: optional binary
amp;utm_medium
at
parquet.schema.MessageTypeParser.asRepetition(MessageTypeParser.java:203)
at
parquet.schema.MessageTypeParser.addType(MessageTypeParser.java:101)
at
parquet.schema.MessageTypeParser.addGroupTypeFields(MessageTypeParser.java:96)
at
parquet.schema.MessageTypeParser.addGroupType(MessageTypeParser.java:130)
... 24 more
Caused by: java.lang.IllegalArgumentException: No enum constant
parquet.schema.Type.Repetition.UTM_MEDIUM
at java.lang.Enum.valueOf(Enum.java:238)
at parquet.schema.Type$Repetition.valueOf(Type.java:70)
at
parquet.schema.MessageTypeParser.asRepetition(MessageTypeParser.java:201)
... 27 more
15/07/22 13:06:15 WARN TaskSetManager: Lost task 0.0 in stage 8.0 (TID 4,
localhost): java.lang.IllegalArgumentException: problem reading type: type =
group, name = param, original type = null
at
parquet.schema.MessageTypeParser.addGroupType(MessageTypeParser.java:132)
at
parquet.schema.MessageTypeParser.addType(MessageTypeParser.java:106)
at
parquet.schema.MessageTypeParser.addGroupTypeFields(MessageTypeParser.java:96)
at parquet.schema.MessageTypeParser.parse(MessageTypeParser.java:89)
at
parquet.schema.MessageTypeParser.parseMessageType(MessageTypeParser.java:79)
at
parquet.hadoop.ParquetRecordReader.initializeInternalReader(ParquetRecordReader.java:189)
at
parquet.hadoop.ParquetRecordReader.initialize(ParquetRecordReader.java:138)
at
org.apache.spark.sql.sources.SqlNewHadoopRDD$$anon$1.init(SqlNewHadoopRDD.scala:153)
at
org.apache.spark.sql.sources.SqlNewHadoopRDD.compute(SqlNewHadoopRDD.scala:124)
at
org.apache.spark.sql.sources.SqlNewHadoopRDD.compute(SqlNewHadoopRDD.scala:66)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at 

No suitable driver found for jdbc:mysql://

2015-07-22 Thread roni
Hi All,
 I have a cluster with spark 1.4.
I am trying to save data to mysql but getting error

Exception in thread main java.sql.SQLException: No suitable driver found
for jdbc:mysql://.rds.amazonaws.com:3306/DAE_kmer?user=password=


*I looked at - https://issues.apache.org/jira/browse/SPARK-8463
https://issues.apache.org/jira/browse/SPARK-8463 and added the connector
jar to the same location as on Master using copy-dir script.*

*But I am still getting the same error. This used to work with 1.3.*

*This is my command  to run the program - **$SPARK_HOME/bin/spark-submit
--jars
/root/spark/lib/mysql-connector-java-5.1.35/mysql-connector-java-5.1.35-bin.jar
--conf
spark.executor.extraClassPath=/root/spark/lib/mysql-connector-java-5.1.35/mysql-connector-java-5.1.35-bin.jar
--conf spark.executor.memory=55g --driver-memory=55g
--master=spark://ec2-52-25-191-999.us-west-2.compute.amazonaws.com:7077
http://ec2-52-25-191-999.us-west-2.compute.amazonaws.com:7077  --class
saveBedToDB  target/scala-2.10/adam-project_2.10-1.0.jar*

*What else can I Do ?*

*Thanks*

*-Roni*


Re: Need help in SparkSQL

2015-07-22 Thread Jörn Franke
Can you provide an example of an AND query? If you only do look-ups you
should try HBase/Phoenix; otherwise you can try ORC with storage index
and/or compression, but this depends on what your queries look like.

On Wed, 22 Jul 2015 at 14:48, Jeetendra Gangele gangele...@gmail.com
wrote:

 HI All,

 I have data in MongoDb(few TBs) which I want to migrate to HDFS to do
 complex queries analysis on this data.Queries like AND queries involved
 multiple fields

 So my question in which which format I should store the data in HDFS so
 that processing will be fast for such kind of queries?


 Regards
 Jeetendra




Re: Performance issue with Spak's foreachpartition method

2015-07-22 Thread diplomatic Guru
Thanks Robin for your reply.

I'm pretty sure that writing to Oracle is taking longer as when writing to
HDFS is only taking ~5minutes.

The job is writing about ~5 million records. I've set the job to call
executeBatch() when the batch size reaches 200,000 records, so I assume
that commit will be invoked for every 200K batch. In this case, it should
only call commit 25 times; is that too much? I wouldn't want to increase
the batch size any further as it may cause Java heap issues. I do not have
much knowledge on the Oracle side, so any advice on the configuration would be
appreciated.

Thanks,

Raj
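For reference, a minimal Scala sketch of the same pattern that flushes a JDBC
batch every N rows but commits only once per partition (url, user, pass,
mergeSql and batchSize are assumed to be defined elsewhere; this is an
illustration of the commit pattern, not the original job):

reduceResultsRDD.foreachPartition { rows =>
  val conn = java.sql.DriverManager.getConnection(url, user, pass)
  conn.setAutoCommit(false)
  val stmt = conn.prepareStatement(mergeSql)
  try {
    var count = 0
    rows.foreach { case (k, v) =>
      stmt.setString(1, k)
      stmt.setString(2, v)
      stmt.addBatch()
      count += 1
      if (count % batchSize == 0) stmt.executeBatch()   // flush, but do not commit yet
    }
    stmt.executeBatch()
    conn.commit()                                       // one commit per partition
  } finally {
    stmt.close()
    conn.close()
  }
}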





On 22 July 2015 at 20:20, Robin East robin.e...@xense.co.uk wrote:

 The first question I would ask is have you determined whether you have a
 performance issue writing to Oracle? In particular how many commits are you
 making? If you are issuing a lot of commits that would be a performance
 problem.

 Robin

 On 22 Jul 2015, at 19:11, diplomatic Guru diplomaticg...@gmail.com
 wrote:

 Hello all,

 We are having a major performance issue with the Spark, which is holding
 us from going live.

 We have a job that carries out computation on log files and write the
 results into Oracle DB.

 The reducer 'reduceByKey' has been set to parallelize by 4 as we don't
 want to establish too many DB connections.

 We are then calling the foreachPartition on the RDD pairs that were
 reduced by the key. Within this foreachPartition method we establish DB
 connection, then iterate the results, prepare the Oracle statement for
 batch insertion then we commit the batch and close the connection. All
 these are working fine.

 However, when we execute the job to process 12GB of data, it takes forever
 to complete, especially at the foreachPartition stage.

 We submitted the job with 6 executors, 2 cores, and 6GB memory of which
 0.3 is assigned to spark.storage.memoryFraction.

 The job is taking about 50 minutes to complete, which is not ideal. I'm
 not sure how we could enhance the performance. I've provided the main body
 of the codes, please take a look and advice:

 From Driver:

 reduceResultsRDD.foreachPartition(new DB.InsertFunction(
 dbuser,dbpass,batchsize));


 DB class:

 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.SQLException;
 import java.util.Iterator;

 import org.apache.spark.api.java.function.VoidFunction;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

 import scala.Tuple2;

 public class DB {
   private static final Logger logger = LoggerFactory
       .getLogger(DB.class);

   public static class InsertFunction implements
       VoidFunction<Iterator<Tuple2<String, String>>> {

     private static final long serialVersionUID = 55766876878L;
     private String dbuser = "";
     private String dbpass = "";
     private int batchsize;

     public InsertFunction(String dbuser, String dbpass, int batchsize) {
       super();
       this.dbuser = dbuser;
       this.dbpass = dbpass;
       this.batchsize = batchsize;
     }

     @Override
     public void call(Iterator<Tuple2<String, String>> results) {
       Connection connect = null;
       PreparedStatement pstmt = null;
       try {
         // getDBConnection(...) is the poster's own helper (not shown)
         connect = getDBConnection(dbuser, dbpass);

         int count = 0;

         if (batchsize <= 0) {
           batchsize = 1;
         }

         pstmt = connect
             .prepareStatement("MERGE INTO SOME TABLE IF RECORD FOUND, IF NOT INSERT");

         while (results.hasNext()) {

           Tuple2<String, String> kv = results.next();
           String[] data = kv._1.concat("," + kv._2).split(",");

           pstmt.setString(1, data[0].toString());
           pstmt.setString(2, data[1].toString());
           // ...

           pstmt.addBatch();

           count++;

           if (count == batchsize) {
             logger.info("BulkCount : " + count);
             pstmt.executeBatch();
             connect.commit();
             count = 0;
           }

           // note: this executes and commits the batch on every record,
           // not only when the batch size is reached
           pstmt.executeBatch();
           connect.commit();

         }

         pstmt.executeBatch();
         connect.commit();

       } catch (Exception e) {
         logger.error("InsertFunction error: " + e.getMessage());
       } finally {

         try {
           if (pstmt != null) {
             pstmt.close();
           }
         } catch (SQLException e) {
           logger.error("InsertFunction Statement Close error: " + e.getMessage());
         }

         try {
           if (connect != null) {
             connect.close();
           }
         } catch (SQLException e) {
           logger.error("InsertFunction Connection Close error: "
               + e.getMessage());
         }
       }
     }

   }
 }





ShuffledHashJoin instead of CartesianProduct

2015-07-22 Thread Srikanth
Hello,

I'm trying to link records from two large data sources. Both datasets have
almost the same number of rows.
The goal is to match records based on multiple columns.

val matchId =
 SFAccountDF.as("SF").join(ELAccountDF.as("EL")).where($"SF.Email" ===
 $"EL.EmailAddress" || $"SF.Phone" === $"EL.Phone")


Joining with an OR (||) will result in a CartesianProduct. I'm trying to
avoid that.
One way to do this is to join on each column and UNION the results.


 val phoneMatch = SFAccountDF.as("SF").filter("Phone != ''")
   .join(ELAccountDF.as("EL").filter("BusinessPhone != ''"))
   .where($"SF.Phone" === $"EL.BusinessPhone")
 val emailMatch = SFAccountDF.as("SF").filter("Email != ''")
   .join(ELAccountDF.as("EL").filter("EmailAddress != ''"))
   .where($"SF.Email" === $"EL.EmailAddress")

 val matchId =
   phoneMatch.unionAll(emailMatch.unionAll(faxMatch.unionAll(mobileMatch)))
 matchId.cache().registerTempTable("matchId")


Is there a more elegant way to do this?

On a related note, has anyone worked on record linkage using Bloom Filters,
Levenshtein distance, etc in Spark?

Srikanth


What if request cores are not satisfied

2015-07-22 Thread bit1...@163.com
Hi,
Assume the following scenario:
The Spark standalone cluster has 10 cores in total, and I have an application that 
will request 12 cores. Will the application run with fewer cores than requested, 
or will it simply wait forever since there are only 10 cores available?
I would guess it will run with fewer cores, but I didn't get a chance to 
try/test it. 
Thanks.




bit1...@163.com


Hive Session gets overwritten in ClientWrapper

2015-07-22 Thread Vishak
I'm currently using Spark 1.4 in standalone mode.

I've forked the Apache Hive branch from  https://github.com/pwendell/hive
https://github.com/pwendell/hive   and customised in the following way.

Added a thread local variable in SessionManager class. And I'm setting the
session variable in my Custom Authenticator class. 

 For achieving the above, I've built the necessary
jars(hive-common-0.13.1c.jar, hive-exec-0.13.1c.jar,
hive-metastore-0.13.1c.jar, hive-serde-0.13.1c.jar,
hive-service-0.13.1c.jar) from https://github.com/pwendell/hive and added to
Spark's classpath. 
 The above feature works in Spark 1.3.1, but is broken in Spark 1.4.
When I looked into it, I found out that the ClientWrapper class is creating
a new Session State and using it thereafter. As a result I'm not able to
retrieve the info which i had stored earlier in the session. Also I'm not
able to retrieve a value from hiveconf which was set earlier.

  When I looked into the source code for ClientWrapper.scala, I found
the following.

   // Create an internal session state for this ClientWrapper.
  val state = {
val original = Thread.currentThread().getContextClassLoader
// Switch to the initClassLoader.
Thread.currentThread().setContextClassLoader(initClassLoader)

From what I can understand, the above tries to use the existing Hive
session, else it creates its own session. Am I right? If so, is there a bug
causing the ClientWrapper not to use the existing session? Or should I
implement my requirement in a different way?

My requirement is to have a custom session variable and use it throughout
the session.

My usage is as folows:

To set the value:
SessionManager.setSessionVar(value);

To retrieve the value:
SessionManager.getSessionVar();

To set a hiveconf:
hiveConf.set(conf, ConfVars.VAR, val);

To retrieve:
hiveConf.get(ConfVars.VAR);
SessionState.get().getConf().getVar(ConfVars.VAR)




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Hive-Session-gets-overwritten-in-ClientWrapper-tp23962.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Need help in SparkSQL

2015-07-22 Thread Jörn Franke
I do not think you can put all your queries into the row key without
duplicating the data for each query. However, this would be more of a last
resort.

Have you checked out Phoenix for HBase? It might suit your needs. It
makes things much simpler, because it provides SQL on top of HBase.

Nevertheless, Hive could also be a viable alternative, depending on how
often you run queries etc.

On Thu, 23 Jul 2015 at 7:14, Jeetendra Gangele gangele...@gmail.com
wrote:

 Query will be something like that

 1. how many users visited 1 BHK flat in last 1 hour in given particular
 area
 2. how many visitor for flats in give area
 3. list all user who bought given property in last 30 days

 Further it may go too complex involving multiple parameters in my query.

 The problem is HBase is designing row key to get this data efficiently.

 Since I have multiple fields to query upon base may not be a good choice?

 i dont dont to iterate the result set which Hbase returns and give the
 result because this will kill the performance?

 On 23 July 2015 at 01:02, Jörn Franke jornfra...@gmail.com wrote:

 Can you provide an example of an and query ? If you do just look-up you
 should try Hbase/ phoenix, otherwise you can try orc with storage index
 and/or compression, but this depends on how your queries look like

 On Wed, 22 Jul 2015 at 14:48, Jeetendra Gangele gangele...@gmail.com
 wrote:

 HI All,

 I have data in MongoDb(few TBs) which I want to migrate to HDFS to do
 complex queries analysis on this data.Queries like AND queries involved
 multiple fields

 So my question in which which format I should store the data in HDFS so
 that processing will be fast for such kind of queries?


 Regards
 Jeetendra




 --
 Hi,

 Find my attached resume. I have total around 7 years of work experience.
 I worked for Amazon and Expedia in my previous assignments and currently I
 am working with start- up technology company called Insideview in hyderabad.

 Regards
 Jeetendra



Re: Need help in setting up spark cluster

2015-07-22 Thread Jeetendra Gangele
Can anybody help here?

On 22 July 2015 at 10:38, Jeetendra Gangele gangele...@gmail.com wrote:

 Hi All,

 I am trying to capture user activities for a real estate portal.

 I am using a RabbitMQ and Spark Streaming combination, where I push all the events
 to RabbitMQ and then consume them with Spark Streaming in 1-second micro-batches.

 Later on I am thinking of storing the consumed data for analytics or near
 real time recommendations.

 Should I store this data in Spark RDDs themselves, so that people can query it with
 SparkSQL for analytics or real time recommendations? This data is not huge,
 currently about 10 GB per day.

 Another alternative would be either HBase or Cassandra; which one will be
 better?

 Any suggestions?


 Also, for this use case should I use an existing big data platform like
 Hortonworks, or can I deploy a standalone Spark cluster?



Re: Need help in SparkSQL

2015-07-22 Thread Jeetendra Gangele
Query will be something like that

1. How many users visited a 1 BHK flat in the last hour in a given particular area?
2. How many visitors were there for flats in a given area?
3. List all users who bought a given property in the last 30 days.

Further, it may get more complex, involving multiple parameters in my query.

The problem with HBase is designing a row key to get this data efficiently.

Since I have multiple fields to query upon, HBase may not be a good choice?

I don't want to iterate over the result set which HBase returns and then compute the
result, because this will kill the performance.

On 23 July 2015 at 01:02, Jörn Franke jornfra...@gmail.com wrote:

 Can you provide an example of an and query ? If you do just look-up you
 should try Hbase/ phoenix, otherwise you can try orc with storage index
 and/or compression, but this depends on how your queries look like

 On Wed, 22 Jul 2015 at 14:48, Jeetendra Gangele gangele...@gmail.com
 wrote:

 HI All,

 I have data in MongoDb(few TBs) which I want to migrate to HDFS to do
 complex queries analysis on this data.Queries like AND queries involved
 multiple fields

 So my question in which which format I should store the data in HDFS so
 that processing will be fast for such kind of queries?


 Regards
 Jeetendra




-- 
Hi,

Find my attached resume. I have total around 7 years of work experience.
I worked for Amazon and Expedia in my previous assignments and currently I
am working with start- up technology company called Insideview in hyderabad.

Regards
Jeetendra


Re: Package Release Annoucement: Spark SQL on HBase Astro

2015-07-22 Thread Debasish Das
Does it also support insert operations ?
On Jul 22, 2015 4:53 PM, Bing Xiao (Bing) bing.x...@huawei.com wrote:

  We are happy to announce the availability of the Spark SQL on HBase
 1.0.0 release.
 http://spark-packages.org/package/Huawei-Spark/Spark-SQL-on-HBase

 The main features in this package, dubbed “Astro”, include:

 · Systematic and powerful handling of data pruning and
 intelligent scan, based on partial evaluation technique

 · HBase pushdown capabilities like custom filters and coprocessor
 to support ultra low latency processing

 · SQL, Data Frame support

 · More SQL capabilities made possible (Secondary index, bloom
 filter, Primary Key, Bulk load, Update)

 · Joins with data from other sources

 · Python/Java/Scala support

 · Support latest Spark 1.4.0 release



 The tests by Huawei team and community contributors covered the areas:
 bulk load; projection pruning; partition pruning; partial evaluation; code
 generation; coprocessor; customer filtering; DML; complex filtering on keys
 and non-keys; Join/union with non-Hbase data; Data Frame; multi-column
 family test.  We will post the test results including performance tests the
 middle of August.

 You are very welcomed to try out or deploy the package, and help improve
 the integration tests with various combinations of the settings, extensive
 Data Frame tests, complex join/union test and extensive performance tests.
 Please use the “Issues” “Pull Requests” links at this package homepage, if
 you want to report bugs, improvement or feature requests.

 Special thanks to project owner and technical leader Yan Zhou, Huawei
 global team, community contributors and Databricks.   Databricks has been
 providing great assistance from the design to the release.

 “Astro”, the Spark SQL on HBase package will be useful for ultra low
 latency query and analytics of large scale data sets in vertical
 enterprises. We will continue to work with the community to develop
 new features and improve code base.  Your comments and suggestions are
 greatly appreciated.



 Yan Zhou / Bing Xiao

 Huawei Big Data team





Issue with column named count in a DataFrame

2015-07-22 Thread Young, Matthew T
I'm trying to do some simple counting and aggregation in an IPython notebook 
with Spark 1.4.0 and I have encountered behavior that looks like a bug.

When I try to filter rows out of an RDD with a column name of count I get a 
large error message. I would just avoid naming things count, except for the 
fact that this is the default column name created with the count() operation in 
pyspark.sql.GroupedData

The small example program below demonstrates the issue.

from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
dataFrame = sc.parallelize([("foo",), ("foo",), ("bar",)]).toDF(["title"])
counts = dataFrame.groupBy('title').count()
counts.filter("title = 'foo'").show() # Works
counts.filter("count > 1").show() # Errors out


I can even reproduce the issue in a PySpark shell session by entering these 
commands.

I suspect that the error has something to with Spark wanting to call the 
count() function in place of looking at the count column.

The error message is as follows:


Py4JJavaError Traceback (most recent call last)
<ipython-input-29-62a1b7c71f21> in <module>()
----> 1 counts.filter("count > 1").show() # Errors Out

C:\Users\User\Downloads\spark-1.4.0-bin-hadoop2.6\python\pyspark\sql\dataframe.pyc
 in filter(self, condition)
774 
775 if isinstance(condition, basestring):
--> 776 jdf = self._jdf.filter(condition)
777 elif isinstance(condition, Column):
778 jdf = self._jdf.filter(condition._jc)

C:\Python27\lib\site-packages\py4j\java_gateway.pyc in __call__(self, *args)
536 answer = self.gateway_client.send_command(command)
537 return_value = get_return_value(answer, self.gateway_client,
--> 538 self.target_id, self.name)
539
540 for temp_arg in temp_args:

C:\Python27\lib\site-packages\py4j\protocol.pyc in get_return_value(answer, 
gateway_client, target_id, name)
298 raise Py4JJavaError(
299 'An error occurred while calling {0}{1}{2}.\n'.
--> 300 format(target_id, '.', name), value)
301 else:
302 raise Py4JError(

Py4JJavaError: An error occurred while calling o229.filter.
: java.lang.RuntimeException: [1.7] failure: ``('' expected but `>' found

count > 1
      ^
at scala.sys.package$.error(package.scala:27)
at 
org.apache.spark.sql.catalyst.SqlParser.parseExpression(SqlParser.scala:45)
at org.apache.spark.sql.DataFrame.filter(DataFrame.scala:652)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.lang.reflect.Method.invoke(Unknown Source)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
at py4j.Gateway.invoke(Gateway.java:259)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:207)
at java.lang.Thread.run(Unknown Source)



Is there a recommended workaround to the inability to filter on a column named 
count? Do I have to make a new DataFrame and rename the column just to work 
around this bug? What's the best way to do that?

Thanks,

-- Matthew Young

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: Need help in SparkSQL

2015-07-22 Thread Mohammed Guller
Parquet

Mohammed
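For illustration, a minimal sketch of the Parquet route in Spark SQL (the paths,
column names, and the assumption that the MongoDB data has first been exported as
JSON onto HDFS are all placeholders):

val events = sqlContext.read.json("hdfs:///staging/mongo_export/")    // one-off import
events.write.parquet("hdfs:///warehouse/events_parquet")              // columnar storage on HDFS

val parquetDF = sqlContext.read.parquet("hdfs:///warehouse/events_parquet")
parquetDF.registerTempTable("events")
// AND-style predicates only read the referenced columns and can be pushed down to Parquet
sqlContext.sql("select count(*) from events where flat_type = '1BHK' and area = 'XYZ'").show()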

From: Jeetendra Gangele [mailto:gangele...@gmail.com]
Sent: Wednesday, July 22, 2015 5:48 AM
To: user
Subject: Need help in SparkSQL

HI All,

I have data in MongoDB (a few TBs) which I want to migrate to HDFS to do complex 
query analysis on. The queries are AND queries involving multiple fields.

So my question is: in which format should I store the data in HDFS so that 
processing will be fast for such kinds of queries?


Regards
Jeetendra



spark-submit and spark-shell behaviors mismatch.

2015-07-22 Thread Dan Dong
Hi,

  I have a simple test spark program as below, the strange thing is that it
runs well under a spark-shell, but will get a runtime error of

java.lang.NoSuchMethodError:

in spark-submit, which indicates that the line:

val maps2=maps.collect.toMap

has a problem. But why does compilation show no problem, and why does it work well under
spark-shell (maps2: scala.collection.immutable.Map[Int,String] =
Map(269953 -> once, 97 -> a, 451002 -> upon, 117481 -> was, 226916 ->
there, 414413 -> time, 146327 -> king))? Thanks!

import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.mllib.feature.HashingTF
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext
import org.apache.spark._
import SparkContext._


val docs=sc.parallelize(Array(Array("once", "upon", "a", "time"),
Array("there", "was", "a", "king")))

val hashingTF = new HashingTF()

val maps=docs.flatMap{term=term.map(ele=(hashingTF.indexOf(ele),ele))}

val maps2=maps.collect.toMap


Cheers,

Dan


Re: How to keep RDDs in memory between two different batch jobs?

2015-07-22 Thread Haoyuan Li
Yes. Tachyon can handle this well: http://tachyon-project.org/

Best,

Haoyuan
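For illustration, a minimal sketch of handing the data off through Tachyon between
the two jobs (assuming the Tachyon client is on the classpath and the tachyon://
filesystem is configured; the master address, path, and the activeSessions /
currentSessions DataFrames are placeholders):

// Job 1: persist the active-session data to Tachyon before the job ends
activeSessions.write.parquet("tachyon://tachyon-master:19998/sessions/active")

// Job 2 (an hour later): read it back and join with the current batch
val previous = sqlContext.read.parquet("tachyon://tachyon-master:19998/sessions/active")
val joined = currentSessions.join(previous, "sessionId")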

On Wed, Jul 22, 2015 at 10:56 AM, swetha swethakasire...@gmail.com wrote:

 Hi,

 We have a requirement wherein we need to keep RDDs in memory between Spark
 batch processing that happens every one hour. The idea here is to have RDDs
 that have active user sessions in memory between two jobs so that once a
 job
 processing is  done and another job is run after an hour the RDDs with
 active sessions are still available for joining with those in the current
 job. So, what do we need to keep the data in memory in between two batch
 jobs? Can we use Tachyon?

 Thanks,
 Swetha



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/How-to-keep-RDDs-in-memory-between-two-different-batch-jobs-tp23957.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




-- 
Haoyuan Li
CEO, Tachyon Nexus http://www.tachyonnexus.com/


Using Wurfl in Spark

2015-07-22 Thread Zhongxiao Ma
Hi all,

I am trying to do wurfl lookup in a spark cluster and getting exceptions, I am 
pretty sure that the same thing works in small scale. But it fails when I tried 
to do it in spark. I used spark-ec2/copy-dir to copy the wurfl library to 
workers already and launched the spark-shell with parameter —jars including 
wurfl and its dependencies in the lib/ directory.

To reconstruct the error, let’s say that I have a userAgentRdd already, which 
is RDD[String] and a userAgentSample of Array[String]. I am trying to reuse the 
wurfl engine by doing mapPartitions so I can save time for reloading it.

import net.sourceforge.wurfl.core.GeneralWURFLEngine

def lookupModel(wurfl: GeneralWURFLEngine)(userAgent: String) = {
  val device = wurfl.getDeviceForRequest(userAgent)
  val brand = device.getCapability("brand_name")
  val model = device.getCapability("model_name")
  (brand, model)
}

def lookupModelPartitions(wurflXmlPath: String)(userAgentIterator: Iterator[String]) = {
  val wurfl = new GeneralWURFLEngine(wurflXmlPath)
  wurfl.setEngineTarget(EngineTarget.accuracy)
  userAgentIterator.map(lookupModel(wurfl))
}

// the following will work
val wurflEngine = new GeneralWURFLEngine("/root/wurfl-1.6.1.0-release/wurfl.zip")
val userAgentSample = // my local dataset
val modelSample = userAgentSample.map(lookupModel(wurflEngine))

// the following will also work
val userAgentRdd = // my spark dataset
val modelRdd = userAgentRdd.mapPartitions(lookupModelPartitions("/root/wurfl-1.6.1.0-release/wurfl.zip"))
modelRdd.take(10)

// but the following will not work
modelRdd.count

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in 
stage 3.0 failed 4 times, most recent failure: Lost task 0.3 in stage 3.0 (TID 
491, 10.128.224.227): net.sourceforge.wurfl.core.exc.WURFLRuntimeException: 
WURFL unexpected exception
at 
net.sourceforge.wurfl.core.GeneralWURFLEngine.initIfNeeded(GeneralWURFLEngine.java:286)
at 
net.sourceforge.wurfl.core.GeneralWURFLEngine.getDeviceForRequest(GeneralWURFLEngine.java:425)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.lookupModel(console:23)
at 
$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$lookupModelPartitions$1.apply(console:27)
at 
$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$lookupModelPartitions$1.apply(console:27)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1628)
at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1099)
at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1099)
at 
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1767)
at 
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1767)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: net.sourceforge.wurfl.core.resource.exc.WURFLResourceException: 
WURFL unexpected exception
at 
net.sourceforge.wurfl.core.resource.XMLResource.readData(XMLResource.java:350)
at 
net.sourceforge.wurfl.core.resource.XMLResource.getData(XMLResource.java:154)
at 
net.sourceforge.wurfl.core.resource.DefaultWURFLModel.init(DefaultWURFLModel.java:118)
at 
net.sourceforge.wurfl.core.resource.DefaultWURFLModel.init(DefaultWURFLModel.java:110)
at 
net.sourceforge.wurfl.core.GeneralWURFLEngine.init(GeneralWURFLEngine.java:304)
at 
net.sourceforge.wurfl.core.GeneralWURFLEngine.initIfNeeded(GeneralWURFLEngine.java:283)
... 16 more
Caused by: net.sourceforge.wurfl.core.resource.exc.WURFLParsingException: The 
devices with id generic define more is_wireless_device
at 
net.sourceforge.wurfl.core.resource.XMLResource$WURFLSAXHandler.startCapabilityElement(XMLResource.java:680)
at 
net.sourceforge.wurfl.core.resource.XMLResource$WURFLSAXHandler.startElement(XMLResource.java:534)
at 
com.sun.org.apache.xerces.internal.parsers.AbstractSAXParser.startElement(AbstractSAXParser.java:509)
at 
com.sun.org.apache.xerces.internal.parsers.AbstractXMLDocumentParser.emptyElement(AbstractXMLDocumentParser.java:182)
at 
com.sun.org.apache.xerces.internal.impl.XMLDocumentFragmentScannerImpl.scanStartElement(XMLDocumentFragmentScannerImpl.java:1343)
at 
com.sun.org.apache.xerces.internal.impl.XMLDocumentFragmentScannerImpl$FragmentContentDriver.next(XMLDocumentFragmentScannerImpl.java:2786)
at 
com.sun.org.apache.xerces.internal.impl.XMLDocumentScannerImpl.next(XMLDocumentScannerImpl.java:606)
at 

Package Release Annoucement: Spark SQL on HBase Astro

2015-07-22 Thread Bing Xiao (Bing)
We are happy to announce the availability of the Spark SQL on HBase 1.0.0 
release.  http://spark-packages.org/package/Huawei-Spark/Spark-SQL-on-HBase
The main features in this package, dubbed Astro, include:

* Systematic and powerful handling of data pruning and intelligent 
scan, based on partial evaluation technique

* HBase pushdown capabilities like custom filters and coprocessor to 
support ultra low latency processing

* SQL, Data Frame support

* More SQL capabilities made possible (Secondary index, bloom filter, 
Primary Key, Bulk load, Update)

* Joins with data from other sources

* Python/Java/Scala support

* Support latest Spark 1.4.0 release


The tests by Huawei team and community contributors covered the areas: bulk 
load; projection pruning; partition pruning; partial evaluation; code 
generation; coprocessor; customer filtering; DML; complex filtering on keys and 
non-keys; Join/union with non-Hbase data; Data Frame; multi-column family test. 
 We will post the test results including performance tests the middle of August.
You are very welcomed to try out or deploy the package, and help improve the 
integration tests with various combinations of the settings, extensive Data 
Frame tests, complex join/union test and extensive performance tests.  Please 
use the Issues Pull Requests links at this package homepage, if you want to 
report bugs, improvement or feature requests.
Special thanks to project owner and technical leader Yan Zhou, Huawei global 
team, community contributors and Databricks.   Databricks has been providing 
great assistance from the design to the release.
Astro, the Spark SQL on HBase package will be useful for ultra low latency 
query and analytics of large scale data sets in vertical enterprises. We will 
continue to work with the community to develop new features and improve code 
base.  Your comments and suggestions are greatly appreciated.

Yan Zhou / Bing Xiao
Huawei Big Data team



Re: No suitable driver found for jdbc:mysql://

2015-07-22 Thread Rishi Yadav
try setting --driver-class-path
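In addition to putting the connector jar on the executor classpath, the driver JVM
also needs to see it, e.g. by adding
--driver-class-path /root/spark/lib/mysql-connector-java-5.1.35/mysql-connector-java-5.1.35-bin.jar
to the spark-submit command above. A small sketch of the write path with the driver
registered explicitly (URL, table and credentials are placeholders, and df stands
for the DataFrame being saved):

import java.util.Properties
Class.forName("com.mysql.jdbc.Driver")          // make sure the MySQL driver is registered
val props = new Properties()
props.setProperty("user", "dbuser")
props.setProperty("password", "dbpass")
df.write.jdbc("jdbc:mysql://host:3306/DAE_kmer", "my_table", props)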

On Wed, Jul 22, 2015 at 3:45 PM, roni roni.epi...@gmail.com wrote:

 Hi All,
  I have a cluster with spark 1.4.
 I am trying to save data to mysql but getting error

 Exception in thread main java.sql.SQLException: No suitable driver found
 for jdbc:mysql://.rds.amazonaws.com:3306/DAE_kmer?user=password=


 *I looked at - https://issues.apache.org/jira/browse/SPARK-8463
 https://issues.apache.org/jira/browse/SPARK-8463 and added the connector
 jar to the same location as on Master using copy-dir script.*

 *But I am still getting the same error. This sued to work with 1.3.*

 *This is my command  to run the program - **$SPARK_HOME/bin/spark-submit
 --jars
 /root/spark/lib/mysql-connector-java-5.1.35/mysql-connector-java-5.1.35-bin.jar
 --conf
 spark.executor.extraClassPath=/root/spark/lib/mysql-connector-java-5.1.35/mysql-connector-java-5.1.35-bin.jar
 --conf spark.executor.memory=55g --driver-memory=55g
 --master=spark://ec2-52-25-191-999.us-west-2.compute.amazonaws.com:7077
 http://ec2-52-25-191-999.us-west-2.compute.amazonaws.com:7077  --class
 saveBedToDB  target/scala-2.10/adam-project_2.10-1.0.jar*

 *What else can I Do ?*

 *Thanks*

 *-Roni*



Re: spark-submit and spark-shell behaviors mismatch.

2015-07-22 Thread Yana Kadiyska
Is it complaining about collect or toMap? In either case this error is
indicative of an old version usually -- any chance you have an old
installation of Spark somehow? Or scala? You can try running spark-submit
with --verbose. Also, when you say it runs with spark-shell do you run
spark shell in local mode or with --master? I'd try with --master whatever
master you use for spark-submit

Also, if you're using standalone mode I believe the worker log contains the
launch command for the executor -- you probably want to examine that
classpath carefully

On Wed, Jul 22, 2015 at 5:25 PM, Dan Dong dongda...@gmail.com wrote:

 Hi,

   I have a simple test spark program as below, the strange thing is that
 it runs well under a spark-shell, but will get a runtime error of

 java.lang.NoSuchMethodError:

 in spark-submit, which indicate the line of:

 val maps2=maps.collect.toMap

 has problem. But why the compilation has no problem and it works well
 under spark-shell(==maps2: scala.collection.immutable.Map[Int,String] =
 Map(269953 - once, 97 - a, 451002 - upon, 117481 - was, 226916 -
 there, 414413 - time, 146327 - king) )? Thanks!

 import org.apache.spark.SparkContext._
 import org.apache.spark.SparkConf
 import org.apache.spark.mllib.feature.HashingTF
 import org.apache.spark.mllib.linalg.Vector
 import org.apache.spark.rdd.RDD
 import org.apache.spark.SparkContext
 import org.apache.spark._
 import SparkContext._


 val docs=sc.parallelize(Array(Array("once", "upon", "a", "time"), 
 Array("there", "was", "a", "king")))

 val hashingTF = new HashingTF()

 val maps=docs.flatMap{term=term.map(ele=(hashingTF.indexOf(ele),ele))}

 val maps2=maps.collect.toMap


 Cheers,

 Dan




Re: Spark SQL Table Caching

2015-07-22 Thread Pedro Rodriguez
I would be interested in the answer to this question, plus the relationship
between those and registerTempTable()

Pedro
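A small sketch of how the pieces relate, as far as I understand the 1.4 behavior
(the table name is a placeholder; both df.cache() on a DataFrame and cacheTable on
its registered name should end up in the same in-memory columnar cache, and caching
is lazy, so the first query does the warm-up scan):

df.registerTempTable("events")        // only registers a name; nothing is cached yet
sqlContext.cacheTable("events")       // marks the table for in-memory columnar caching
sqlContext.sql("select count(*) from events").show()   // first query materializes the cache
sqlContext.uncacheTable("events")     // drops it from memory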

On Tue, Jul 21, 2015 at 1:59 PM, Brandon White bwwintheho...@gmail.com
wrote:

 A few questions about caching a table in Spark SQL.

 1) Is there any difference between caching the dataframe and the table?

 df.cache() vs sqlContext.cacheTable(tableName)

 2) Do you need to warm up the cache before seeing the performance
 benefits? Is the cache LRU? Do you need to run some queries on the table
 before it is cached in memory?

 3) Is caching the table much faster than .saveAsTable? I am only seeing a
 10 %- 20% performance increase.




-- 
Pedro Rodriguez
UCBerkeley 2014 | Computer Science
SnowGeek http://SnowGeek.org
pedro-rodriguez.com
ski.rodrig...@gmail.com
208-340-1703


Re: Issue with column named count in a DataFrame

2015-07-22 Thread Michael Armbrust
Additionally have you tried enclosing count in `backticks`?
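For illustration, sketches of the workarounds mentioned in this thread, in Scala
syntax (the PySpark calls are analogous); whether the parser accepts backticks
depends on the SQL dialect in use, so treat the first line as something to try
rather than a guarantee:

counts.filter("`count` > 1").show()            // backticks, as suggested above
counts.filter(counts("count") > 1).show()      // a Column expression avoids the string parser entirely
counts.withColumnRenamed("count", "cnt").filter("cnt > 1").show()   // or rename the column first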

On Wed, Jul 22, 2015 at 4:25 PM, Michael Armbrust mich...@databricks.com
wrote:

 I believe this will be fixed in Spark 1.5

 https://github.com/apache/spark/pull/7237

 On Wed, Jul 22, 2015 at 3:04 PM, Young, Matthew T 
 matthew.t.yo...@intel.com wrote:

 I'm trying to do some simple counting and aggregation in an IPython
 notebook with Spark 1.4.0 and I have encountered behavior that looks like a
 bug.

 When I try to filter rows out of an RDD with a column name of count I get
 a large error message. I would just avoid naming things count, except for
 the fact that this is the default column name created with the count()
 operation in pyspark.sql.GroupedData

 The small example program below demonstrates the issue.

 from pyspark.sql import SQLContext
 sqlContext = SQLContext(sc)
 dataFrame = sc.parallelize([("foo",), ("foo",), ("bar",)]).toDF(["title"])
 counts = dataFrame.groupBy('title').count()
 counts.filter("title = 'foo'").show() # Works
 counts.filter("count > 1").show() # Errors out


 I can even reproduce the issue in a PySpark shell session by entering
 these commands.

 I suspect that the error has something to with Spark wanting to call the
 count() function in place of looking at the count column.

 The error message is as follows:


 Py4JJavaError Traceback (most recent call
 last)
 ipython-input-29-62a1b7c71f21 in module()
  1 counts.filter(count  1).show() # Errors Out

 C:\Users\User\Downloads\spark-1.4.0-bin-hadoop2.6\python\pyspark\sql\dataframe.pyc
 in filter(self, condition)
 774 
 775 if isinstance(condition, basestring):
 -- 776 jdf = self._jdf.filter(condition)
 777 elif isinstance(condition, Column):
 778 jdf = self._jdf.filter(condition._jc)

 C:\Python27\lib\site-packages\py4j\java_gateway.pyc in __call__(self,
 *args)
 536 answer = self.gateway_client.send_command(command)
 537 return_value = get_return_value(answer,
 self.gateway_client,
 -- 538 self.target_id, self.name)

 539
 540 for temp_arg in temp_args:

 C:\Python27\lib\site-packages\py4j\protocol.pyc in
 get_return_value(answer, gateway_client, target_id, name)
 298 raise Py4JJavaError(
 299 'An error occurred while calling
 {0}{1}{2}.\n'.
 -- 300 format(target_id, '.', name), value)
 301 else:
 302 raise Py4JError(

 Py4JJavaError: An error occurred while calling o229.filter.
 : java.lang.RuntimeException: [1.7] failure: ``('' expected but `' found

 count  1
   ^
 at scala.sys.package$.error(package.scala:27)
 at
 org.apache.spark.sql.catalyst.SqlParser.parseExpression(SqlParser.scala:45)
 at org.apache.spark.sql.DataFrame.filter(DataFrame.scala:652)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
 at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
 at java.lang.reflect.Method.invoke(Unknown Source)
 at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
 at
 py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
 at py4j.Gateway.invoke(Gateway.java:259)
 at
 py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
 at py4j.commands.CallCommand.execute(CallCommand.java:79)
 at py4j.GatewayConnection.run(GatewayConnection.java:207)
 at java.lang.Thread.run(Unknown Source)



 Is there a recommended workaround to the inability to filter on a column
 named count? Do I have to make a new DataFrame and rename the column just
 to work around this bug? What's the best way to do that?

 Thanks,

 -- Matthew Young

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





Re: Issue with column named count in a DataFrame

2015-07-22 Thread Michael Armbrust
I believe this will be fixed in Spark 1.5

https://github.com/apache/spark/pull/7237

On Wed, Jul 22, 2015 at 3:04 PM, Young, Matthew T matthew.t.yo...@intel.com
 wrote:

 I'm trying to do some simple counting and aggregation in an IPython
 notebook with Spark 1.4.0 and I have encountered behavior that looks like a
 bug.

 When I try to filter rows out of an RDD with a column name of count I get
 a large error message. I would just avoid naming things count, except for
 the fact that this is the default column name created with the count()
 operation in pyspark.sql.GroupedData

 The small example program below demonstrates the issue.

 from pyspark.sql import SQLContext
 sqlContext = SQLContext(sc)
 dataFrame = sc.parallelize([("foo",), ("foo",), ("bar",)]).toDF(["title"])
 counts = dataFrame.groupBy('title').count()
 counts.filter("title = 'foo'").show() # Works
 counts.filter("count > 1").show() # Errors out


 I can even reproduce the issue in a PySpark shell session by entering
 these commands.

 I suspect that the error has something to with Spark wanting to call the
 count() function in place of looking at the count column.

 The error message is as follows:


 Py4JJavaError Traceback (most recent call last)
 ipython-input-29-62a1b7c71f21 in module()
  1 counts.filter(count  1).show() # Errors Out

 C:\Users\User\Downloads\spark-1.4.0-bin-hadoop2.6\python\pyspark\sql\dataframe.pyc
 in filter(self, condition)
 774 
 775 if isinstance(condition, basestring):
 -- 776 jdf = self._jdf.filter(condition)
 777 elif isinstance(condition, Column):
 778 jdf = self._jdf.filter(condition._jc)

 C:\Python27\lib\site-packages\py4j\java_gateway.pyc in __call__(self,
 *args)
 536 answer = self.gateway_client.send_command(command)
 537 return_value = get_return_value(answer,
 self.gateway_client,
 -- 538 self.target_id, self.name)
 539
 540 for temp_arg in temp_args:

 C:\Python27\lib\site-packages\py4j\protocol.pyc in
 get_return_value(answer, gateway_client, target_id, name)
 298 raise Py4JJavaError(
 299 'An error occurred while calling {0}{1}{2}.\n'.
 -- 300 format(target_id, '.', name), value)
 301 else:
 302 raise Py4JError(

 Py4JJavaError: An error occurred while calling o229.filter.
 : java.lang.RuntimeException: [1.7] failure: ``('' expected but `' found

 count  1
   ^
 at scala.sys.package$.error(package.scala:27)
 at
 org.apache.spark.sql.catalyst.SqlParser.parseExpression(SqlParser.scala:45)
 at org.apache.spark.sql.DataFrame.filter(DataFrame.scala:652)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
 at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
 at java.lang.reflect.Method.invoke(Unknown Source)
 at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
 at
 py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
 at py4j.Gateway.invoke(Gateway.java:259)
 at
 py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
 at py4j.commands.CallCommand.execute(CallCommand.java:79)
 at py4j.GatewayConnection.run(GatewayConnection.java:207)
 at java.lang.Thread.run(Unknown Source)



 Is there a recommended workaround to the inability to filter on a column
 named count? Do I have to make a new DataFrame and rename the column just
 to work around this bug? What's the best way to do that?

 Thanks,

 -- Matthew Young

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: assertion failed error with GraphX

2015-07-22 Thread Roman Sokolov
I am also having problems with triangle count - it seems this algorithm
is very memory consuming (I could not process even small graphs, ~5 million
vertices and 70 million edges, with less than 32 GB RAM on EACH machine).
What if I have graphs with a billion edges, what amount of RAM do I need then?

So now I am trying to understand how it works and maybe rewrite it. I would
like to process big graphs with not so much RAM on each machine.
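One thing worth checking before rewriting anything (a sketch, with a placeholder
path): GraphX's triangleCount assumes the edges are in canonical orientation
(srcId < dstId) and that the graph has been partitioned with partitionBy; violating
either assumption is a common cause of the assertion error quoted below. It does
not reduce the memory cost, though.

import org.apache.spark.graphx.{GraphLoader, PartitionStrategy}

val graph = GraphLoader.edgeListFile(sc, "hdfs:///graphs/edges.txt",
    canonicalOrientation = true)                      // enforce srcId < dstId while loading
  .partitionBy(PartitionStrategy.RandomVertexCut)     // required before triangleCount
println("triangleCount: %s".format(graph.triangleCount().vertices.count()))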
On 20.07.2015 at 04:27, Jack Yang j...@uow.edu.au wrote:

  Hi there,



 I got an error when running one simple graphX program.

 My setting is: spark 1.4.0, Hadoop yarn 2.5. scala 2.10. with four virtual
 machines.



 if I constructed one small graph (6 nodes, 4 edges), I run:

 println("triangleCount: %s".format(
 hdfs_graph.triangleCount().vertices.count() ))

 that returns me the correct results.



 But I import a much larger graph (with 85 nodes, 500 edges), the
 error is

 15/07/20 12:03:36 WARN scheduler.TaskSetManager: Lost task 2.0 in stage
 11.0 (TID 32, 192.168.157.131): java.lang.AssertionError: assertion failed

 at scala.Predef$.assert(Predef.scala:165)

 at
 org.apache.spark.graphx.lib.TriangleCount$$anonfun$7.apply(TriangleCount.scala:90)

 at
 org.apache.spark.graphx.lib.TriangleCount$$anonfun$7.apply(TriangleCount.scala:87)

 at
 org.apache.spark.graphx.impl.VertexPartitionBaseOps.leftJoin(VertexPartitionBaseOps.scala:140)

 at
 org.apache.spark.graphx.impl.VertexRDDImpl$$anonfun$3.apply(VertexRDDImpl.scala:159)

 at
 org.apache.spark.graphx.impl.VertexRDDImpl$$anonfun$3.apply(VertexRDDImpl.scala:156)

 at
 org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:88)

 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)

 at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)





 I run the above two graphs using the same submit command:

 spark-submit --class sparkUI.GraphApp --master spark://master:7077
 --executor-memory 2G  --total-executor-cores 4 myjar.jar



 any thought? anything wrong with my machine or configuration?









 Best regards,

 Jack





Re: Parquet problems

2015-07-22 Thread Michael Misiewicz
Hi Anders,

Did you ever get to the bottom of this issue? I'm encountering it too, but
only in yarn-cluster mode running on spark 1.4.0. I was thinking of
trying 1.4.1 today.

Michael

On Thu, Jun 25, 2015 at 5:52 AM, Anders Arpteg arp...@spotify.com wrote:

 Yes, both the driver and the executors. Works a little bit better with
 more space, but still a leak that will cause failure after a number of
 reads. There are about 700 different data sources that needs to be loaded,
 lots of data...

 tor 25 jun 2015 08:02 Sabarish Sasidharan sabarish.sasidha...@manthan.com
 skrev:

 Did you try increasing the perm gen for the driver?

 Regards
 Sab
 On 24-Jun-2015 4:40 pm, Anders Arpteg arp...@spotify.com wrote:

 When reading large (and many) datasets with the Spark 1.4.0 DataFrames
 parquet reader (the org.apache.spark.sql.parquet format), the following
 exceptions are thrown:

 Exception in thread task-result-getter-0
 Exception: java.lang.OutOfMemoryError thrown from the
 UncaughtExceptionHandler in thread task-result-getter-0
 Exception in thread task-result-getter-3 java.lang.OutOfMemoryError:
 PermGen space
 Exception in thread task-result-getter-1 java.lang.OutOfMemoryError:
 PermGen space
 Exception in thread task-result-getter-2 java.lang.OutOfMemoryError:
 PermGen space

 and many more like these from different threads. I've tried increasing
 the PermGen space using the -XX:MaxPermSize VM setting, but even after
 tripling the space, the same errors occur. I've also tried storing
 intermediate results, and am able to get the full job completed by running
 it multiple times and starting for the last successful intermediate result.
 There seems to be some memory leak in the parquet format. Any hints on how
 to fix this problem?

 Thanks,
 Anders




Re: Mesos + Spark

2015-07-22 Thread Dean Wampler
This page, http://spark.apache.org/docs/latest/running-on-mesos.html,
covers many of these questions. If you submit a job with the option
--supervise, it will be restarted if it fails.

You can use Chronos for scheduling. You can create a single streaming job
with a 10 minute batch interval, if that works for your every 10-min. need.
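A minimal sketch of that single streaming job (the Mesos master URL is a placeholder):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Minutes, StreamingContext}

val conf = new SparkConf()
  .setAppName("every-10-minutes")
  .setMaster("mesos://zk://mesos-master:2181/mesos")
val ssc = new StreamingContext(conf, Minutes(10))   // one batch every 10 minutes
// ... define the input DStream and the per-batch work here ...
ssc.start()
ssc.awaitTermination()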

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com

On Wed, Jul 22, 2015 at 3:53 AM, boci boci.b...@gmail.com wrote:

 Hi guys!

 I'm a new in mesos. I have two spark application (one streaming and one
 batch). I want to run both app in mesos cluster. Now for testing I want to
 run in docker container so I started a simple redjack/mesos-master, but I
 think a lot of think unclear for me (both mesos and spark-mesos).

 If I have a mesos cluster (for testing it will be some docker container) i
 need a separate machine (container) to run my spark job? Or can I submit
 the cluster and schedule (chronos or I dunno)?
 How can I run the streaming job? What happened if the controller died?
 Or if I call spark-submit with master=mesos my application started and I
 can forget? How can I run in every 10 min without submit in every 10 min?
 How can I run my streaming app in HA mode?

 Thanks

 b0c1


 --
 Skype: boci13, Hangout: boci.b...@gmail.com



Re: Spark-hive parquet schema evolution

2015-07-22 Thread Dean Wampler
While it's not recommended to overwrite files Hive thinks it understands,
you can add the column to Hive's metastore using an ALTER TABLE command
using HiveQL in the Hive shell or using HiveContext.sql():

ALTER TABLE mytable ADD COLUMNS (col_name data_type);

See
https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL#LanguageManualDDL-AlterTable/Partition/Column
for full details.

dean

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com

On Wed, Jul 22, 2015 at 4:36 AM, Cheng Lian lian.cs@gmail.com wrote:

  Since Hive doesn’t support schema evolution, you’ll have to update the
 schema stored in metastore somehow. For example, you can create a new
 external table with the merged schema. Say you have a Hive table t1:

 CREATE TABLE t1 (c0 INT, c1 DOUBLE);

 By default, this table is stored in HDFS path
 hdfs://some-host:9000/user/hive/warehouse/t1. Now you append some Parquet
 data with an extra column c2 to the same directory:

 import org.apache.spark.sql.types._
 val path = "hdfs://some-host:9000/user/hive/warehouse/t1"
 val df1 = sqlContext.range(10).select('id as 'c0, 'id cast DoubleType as 'c1, 'id cast StringType as 'c2)
 df1.write.mode("append").parquet(path)

 Now you can create a new external table t2 like this:

 val df2 = sqlContext.read.option("mergeSchema", "true").parquet(path)
 df2.write.path(path).saveAsTable("t2")

 Since we specified a path above, the newly created t2 is an external
 table pointing to the original HDFS location. But the schema of t2 is the
 merged version.

 The drawback of this approach is that, t2 is actually a Spark SQL
 specific data source table rather than a genuine Hive table. This means, it
 can be accessed by Spark SQL only. We’re just using Hive metastore to help
 persisting metadata of the data source table. However, since you’re asking
 how to access the new table via Spark SQL CLI, this should work for you. We
 are working on making Parquet and ORC data source tables accessible via
 Hive in Spark 1.5.0.

 Cheng

 On 7/22/15 10:32 AM, Jerrick Hoang wrote:

   Hi Lian,

  Sorry I'm new to Spark so I did not express myself very clearly. I'm
 concerned about the situation when let's say I have a Parquet table some
 partitions and I add a new column A to parquet schema and write some data
 with the new schema to a new partition in the table. If i'm not mistaken,
 if I do a sqlContext.read.parquet(table_path).printSchema() it will print
 the correct schema with new column A. But if I do a 'describe table' from
 SparkSQLCLI I won't see the new column being added. I understand that this
 is because Hive doesn't support schema evolution. So what is the best way
 to support CLI queries in this situation? Do I need to manually alter the
 table everytime the underlying schema changes?

  Thanks

 On Tue, Jul 21, 2015 at 4:37 PM, Cheng Lian lian.cs@gmail.com wrote:

  Hey Jerrick,

 What do you mean by schema evolution with Hive metastore tables? Hive
 doesn't take schema evolution into account. Could you please give a
 concrete use case? Are you trying to write Parquet data with extra columns
 into an existing metastore Parquet table?

 Cheng


 On 7/21/15 1:04 AM, Jerrick Hoang wrote:

 I'm new to Spark, any ideas would be much appreciated! Thanks

 On Sat, Jul 18, 2015 at 11:11 AM, Jerrick Hoang 
 jerrickho...@gmail.comjerrickho...@gmail.com wrote:

 Hi all,

  I'm aware of the support for schema evolution via DataFrame API. Just
 wondering what would be the best way to go about dealing with schema
 evolution with Hive metastore tables. So, say I create a table via SparkSQL
 CLI, how would I deal with Parquet schema evolution?

  Thanks,
 J







Re: Is spark suitable for real time query

2015-07-22 Thread Louis Hust
My code like below:
Map<String, String> t11opt = new HashMap<String, String>();
t11opt.put("url", DB_URL);
t11opt.put("dbtable", "t11");
DataFrame t11 = sqlContext.load("jdbc", t11opt);
t11.registerTempTable("t11");

...the same for t12, t21, t22



DataFrame t1 = t11.unionAll(t12);
t1.registerTempTable("t1");
DataFrame t2 = t21.unionAll(t22);
t2.registerTempTable("t2");
for (int i = 0; i < 10; i++) {
    System.out.println(new Date(System.currentTimeMillis()));
    DataFrame crossjoin = sqlContext.sql("select txt from t1 join t2 on t1.id = t2.id");
    crossjoin.show();
    System.out.println(new Date(System.currentTimeMillis()));
}

Where t11, t12, t21, t22 are all DataFrames loaded via jdbc from a MySQL
database which is local to the Spark job.

But each loop iteration takes about 3 seconds, and I do not know why it costs so much time.
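One thing worth trying (a sketch, assuming the four MySQL tables fit in cluster
memory): cache the unioned DataFrames once, so the sql(...) call inside the loop
reads from Spark's in-memory cache instead of re-fetching over JDBC on every
iteration; a second or so of per-job scheduling overhead is still expected, though.

t1.cache()
t2.cache()
t1.count()   // force the JDBC reads once, up front
t2.count()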




2015-07-22 19:52 GMT+08:00 Robin East robin.e...@xense.co.uk:

 Here’s an example using spark-shell on my laptop:

 sc.textFile("LICENSE").filter(_ contains "Spark").count

 This takes less than a second the first time I run it and is instantaneous
 on every subsequent run.

 What code are you running?


 On 22 Jul 2015, at 12:34, Louis Hust louis.h...@gmail.com wrote:

 I did a simple test using Spark in standalone mode (not cluster) and found
 that a simple action takes a few seconds, even though the data size is small,
 just a few rows.
 So does each Spark job cost some time for init or prepare work, no matter
 what the job is?
 I mean, does the basic framework of a Spark job cost seconds?

 2015-07-22 19:17 GMT+08:00 Robin East robin.e...@xense.co.uk:

 Real-time is, of course, relative but you’ve mentioned microsecond level.
 Spark is designed to process large amounts of data in a distributed
 fashion. No distributed system I know of could give any kind of guarantees
 at the microsecond level.

 Robin

  On 22 Jul 2015, at 11:14, Louis Hust louis.h...@gmail.com wrote:
 
   Hi, all
  
   I am using a Spark jar in standalone mode, fetching data from different
  MySQL instances and doing some actions, but I found the time is at the
  second level.
  
   So I want to know whether a Spark job is suitable for real-time queries
  at the microsecond level?






Re: use S3-Compatible Storage with spark

2015-07-22 Thread Schmirr Wurst
I could get a little further:
- installed spark-1.4.1-without-hadoop
- unpacked hadoop 2.7.1
- added the following in spark-env.sh:

HADOOP_HOME=/opt/hadoop-2.7.1/
SPARK_DIST_CLASSPATH=/opt/hadoop-2.7.1/share/hadoop/tools/lib/*:/opt/hadoop-2.7.1/etc/hadoop:/opt/hadoop-2.7.1/share/hadoop/common/lib/*:/opt/had$

and start spark-shell with :
bin/spark-shell --jars
/opt/hadoop-2.7.1/share/hadoop/tools/lib/hadoop-aws-2.7.1.jar

Now spark-shell is starting with
spark.SparkContext: Added JAR
file:/opt/hadoop-2.7.1/share/hadoop/tools/lib/hadoop-aws-2.7.1.jar at
http://185.19.29.91:46368/jars/hadoop-aws-2.7.1.jar with timestamp
1437575186830

But when trying to access S3 I get
java.util.ServiceConfigurationError: org.apache.hadoop.fs.FileSystem:
Provider org.apache.hadoop.fs.s3a.S3AFileSystem could not be
instantiated

In fact it doesn't even matter whether I try to use s3n or s3a, the error is
the same (strange!)
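
One thing I still plan to try is adding the bundled AWS SDK jar to the
classpath as well, in case S3AFileSystem cannot be instantiated because of a
missing dependency (I'm guessing at the SDK jar name that ships with Hadoop
2.7.1):

bin/spark-shell --jars /opt/hadoop-2.7.1/share/hadoop/tools/lib/hadoop-aws-2.7.1.jar,/opt/hadoop-2.7.1/share/hadoop/tools/lib/aws-java-sdk-1.7.4.jar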

2015-07-22 12:19 GMT+02:00 Thomas Demoor thomas.dem...@hgst.com:
 You need to get the hadoop-aws.jar from hadoop-tools (use Hadoop 2.7+) - you
 can get the source and build it with mvn, or get it from a prebuilt Hadoop
 distro. Then when you run your Spark job, add --jars path/to/thejar

 
 From: Schmirr Wurst schmirrwu...@gmail.com
 Sent: Wednesday, July 22, 2015 12:06 PM
 To: Thomas Demoor
 Subject: Re: use S3-Compatible Storage with spark

 Hi Thomas, thanks, could you just tell me what exactly I need to do?
 I'm not familiar with Java programming.
 - where do I get the jar from, do I need to compile it with mvn?
 - where should I update the classpath and how?



 2015-07-22 11:55 GMT+02:00 Thomas Demoor thomas.dem...@hgst.com:
 The classes are not found. Is the jar on your classpath?

 Take care: there are multiple S3 connectors in Hadoop: the legacy s3n, based
 on a 3rd-party S3 lib (Jets3t), and the recent s3a (functional since Hadoop
 2.7), based on the Amazon SDK. Make sure you stick to one: so use the
 fs.s3a.endpoint property and URL s3a://bucket/object, or fs.s3n.endpoint and
 s3n://bucket/object. I recommend s3a but I'm biased :P
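
 For example, sticking with s3a, something along these lines from spark-shell
 (the endpoint, keys and bucket below are placeholders):

 sc.hadoopConfiguration.set("fs.s3a.endpoint", "s3.my-storage.example.com")
 sc.hadoopConfiguration.set("fs.s3a.access.key", "MY_ACCESS_KEY")
 sc.hadoopConfiguration.set("fs.s3a.secret.key", "MY_SECRET_KEY")

 // Read with the matching scheme so the s3a connector (and its endpoint
 // setting) is actually the one being used.
 val lines = sc.textFile("s3a://my-bucket/some/object")
 lines.count()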

 Regards,
 Thomas

 
 From: Schmirr Wurst schmirrwu...@gmail.com
 Sent: Tuesday, July 21, 2015 11:59 AM
 To: Akhil Das
 Cc: user@spark.apache.org
 Subject: Re: use S3-Compatible Storage with spark

 Which version do you have?

 - I tried with Spark 1.4.1 for hdp 2.6, but there I had an issue that
 the AWS module is not there somehow:
 java.io.IOException: No FileSystem for scheme: s3n
 and the same for s3a:
 java.lang.RuntimeException: java.lang.ClassNotFoundException: Class
 org.apache.hadoop.fs.s3a.S3AFileSystem not found

 - On Spark 1.4.1 for hdp 2.4, the module is there and works out of
 the box for s3n (except for the endpoint).
 But I get java.io.IOException: No FileSystem for scheme: s3a

 :-|

 2015-07-21 11:09 GMT+02:00 Akhil Das ak...@sigmoidanalytics.com:
 Did you try with s3a? It seems it's more like an issue with Hadoop.

 Thanks
 Best Regards

 On Tue, Jul 21, 2015 at 2:31 PM, Schmirr Wurst schmirrwu...@gmail.com
 wrote:

 It seems to work for the credentials, but the endpoint is ignored:
 I've changed it to
 sc.hadoopConfiguration.set("fs.s3n.endpoint", "test.com")

 And I continue to get my data from Amazon - how can that be? (I also
 use s3n in my text url)

 2015-07-21 9:30 GMT+02:00 Akhil Das ak...@sigmoidanalytics.com:
  You can add the jar to the classpath, and you can set the property like:
 
  sc.hadoopConfiguration.set("fs.s3a.endpoint", "storage.sigmoid.com")
 
 
 
  Thanks
  Best Regards
 
  On Mon, Jul 20, 2015 at 9:41 PM, Schmirr Wurst schmirrwu...@gmail.com
  wrote:
 
  Thanks, that is what I was looking for...
 
  Any idea where I have to store and reference the corresponding
  hadoop-aws-2.6.0.jar?:
 
  java.io.IOException: No FileSystem for scheme: s3n
 
  2015-07-20 8:33 GMT+02:00 Akhil Das ak...@sigmoidanalytics.com:
   Not in the URI, but you can specify it in the Hadoop configuration.
  
    <property>
      <name>fs.s3a.endpoint</name>
      <description>AWS S3 endpoint to connect to. An up-to-date list is
        provided in the AWS Documentation: regions and endpoints. Without
        this property, the standard region (s3.amazonaws.com) is assumed.
      </description>
    </property>
  
  
   Thanks
   Best Regards
  
   On Sun, Jul 19, 2015 at 9:13 PM, Schmirr Wurst
   schmirrwu...@gmail.com
   wrote:
  
    I want to use Pithos. Where can I specify that endpoint? Is it
    possible in the URL?
  
   2015-07-19 17:22 GMT+02:00 Akhil Das ak...@sigmoidanalytics.com:
 Could you name the storage service that you are using? Most of them
 provide an S3-like REST API endpoint for you to hit.
   
Thanks
Best Regards
   
On Fri, Jul 17, 2015 at 2:06 PM, Schmirr Wurst
schmirrwu...@gmail.com
wrote:
   
Hi,
   
 I wonder how to use S3-compatible storage in Spark?
 If I'm using the s3n:// URL scheme, then it will point to 

Re: Scaling spark cluster for a running application

2015-07-22 Thread Romi Kuntsman
Are you running the Spark cluster in standalone or YARN?
In standalone, the application gets the available resources when it starts.
With YARN, you can try to turn on the setting
*spark.dynamicAllocation.enabled*
See https://spark.apache.org/docs/latest/configuration.html
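
Roughly something like this when creating the context (the executor counts
are just placeholders; dynamic allocation also requires the external shuffle
service to be enabled on the nodes):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.shuffle.service.enabled", "true") // needed by dynamic allocation
  .set("spark.dynamicAllocation.minExecutors", "2")
  .set("spark.dynamicAllocation.maxExecutors", "20")
val sc = new SparkContext(conf)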

On Wed, Jul 22, 2015 at 2:20 PM phagunbaya phagun.b...@falkonry.com wrote:

 I have a Spark cluster running in client mode with the driver outside the
 Spark cluster. I want to scale the cluster after an application is submitted.
 In order to do this, I'm creating new workers and they are getting registered
 with the master, but the issue I'm seeing is that the running application
 does not use the newly added worker. Hence I cannot add more resources to an
 existing running application.

 Is there any other way or config to deal with this use case? How can I make a
 running application ask for executors from a newly added worker node?







R: Is spark suitable for real time query

2015-07-22 Thread Paolo Platter
Are you using the JDBC server?

Paolo

Sent from my Windows Phone

From: Louis Hust louis.h...@gmail.com
Sent: 22/07/2015 13:47
To: Robin East robin.e...@xense.co.uk
Cc: user@spark.apache.org
Subject: Re: Is spark suitable for real time query

I did a simple test using Spark in standalone mode (not cluster) and found
that a simple action takes a few seconds, even though the data size is small,
just a few rows.
So does each Spark job cost some time for init or prepare work, no matter what
the job is?
I mean, does the basic framework of a Spark job cost seconds?

2015-07-22 19:17 GMT+08:00 Robin East robin.e...@xense.co.uk:
Real-time is, of course, relative but you’ve mentioned microsecond level. Spark 
is designed to process large amounts of data in a distributed fashion. No 
distributed system I know of could give any kind of guarantees at the 
microsecond level.

Robin

 On 22 Jul 2015, at 11:14, Louis Hust louis.h...@gmail.com wrote:

 Hi, all

 I am using a Spark jar in standalone mode, fetching data from different MySQL
 instances and doing some actions, but I found the time is at the second level.

 So I want to know whether a Spark job is suitable for real-time queries at
 the microsecond level?




Application metrics inseparable from Master metrics?

2015-07-22 Thread Romi Kuntsman
Hi,

I tried to enable the Master metrics source (to get the number of
running/waiting applications etc.) and connected it to Graphite.
However, when this is enabled, application metrics are also sent.

Is it possible to separate them and send only master metrics, without
application metrics?

I see that the Master class is registering both:
https://github.com/apache/spark/blob/b9a922e260bec1b211437f020be37fab46a85db0/core/src/main/scala/org/apache/spark/deploy/master/Master.scala#L91
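
What I was hoping for is something along these lines in conf/metrics.properties,
attaching the Graphite sink only to the master instance instead of using the
*. wildcard - though I'm not sure yet whether that leaves the applications
metrics out (host/port below are placeholders):

master.sink.graphite.class=org.apache.spark.metrics.sink.GraphiteSink
master.sink.graphite.host=graphite.example.com
master.sink.graphite.port=2003
master.sink.graphite.period=10
master.sink.graphite.unit=seconds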

Thanks,
RK.