Re: Task not Serializable: Graph is unexpectedly null when DStream is being serialized

2015-04-20 Thread Jean-Pascal Billaud
Hey, so I start the context at the very end when all the piping is done.
BTW a foreachRDD will be called on the resulting dstream.map() right after
that.

The puzzling thing is why removing the context bounds solves the problem...
What does this exception mean in general?

On Mon, Apr 20, 2015 at 1:33 PM, Tathagata Das t...@databricks.com wrote:

 When are you getting this exception? After starting the context?

 TD

 On Mon, Apr 20, 2015 at 10:44 AM, Jean-Pascal Billaud j...@tellapart.com
 wrote:

 Hi,

 I am getting this serialization exception and I am not too sure what
 "Graph is unexpectedly null when DStream is being serialized" means:

 15/04/20 06:12:38 INFO yarn.ApplicationMaster: Final app status: FAILED,
 exitCode: 15, (reason: User class threw exception: Task not serializable)
 Exception in thread Driver org.apache.spark.SparkException: Task not
 serializable
 at org.apache.spark.util.ClosureCleaner$.ensureSerializable(
 ClosureCleaner.scala:166)
 at org.apache.spark.util.ClosureCleaner$.clean(
 ClosureCleaner.scala:158)
 at org.apache.spark.SparkContext.clean(SparkContext.scala:1435)
 at org.apache.spark.streaming.dstream.DStream.map(DStream.
 scala:438)
 [...]
 Caused by: java.io.NotSerializableException: Graph is unexpectedly null
 when DStream is being serialized.
 at org.apache.spark.streaming.dstream.DStream$anonfun$
 writeObject$1.apply$mcV$sp(DStream.scala:420)
 at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:985)
 at org.apache.spark.streaming.dstream.DStream.writeObject(
 DStream.scala:403)

 The operation comes down to something like this:

 dstream.map(tuple => {
   val w = StreamState.fetch[K,W](state.prefixKey, tuple._1)
   (tuple._1, (tuple._2, w)) })

 And StreamState being a very simple standalone object:

 object StreamState {
   def fetch[K : ClassTag : Ordering, V : ClassTag](prefixKey: String,
 key: K) : Option[V] = None
 }

 However, if I remove the context bounds from K in fetch, i.e. removing
 ClassTag and Ordering, then everything is fine.
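
 My current guess, heavily hedged (type and value names are from my snippet above; everything else is an assumption): with the context bounds in place, the compiler resolves the ClassTag/Ordering evidence for K from the enclosing instance, so the map closure captures `this` and, with it, the DStream. Copying the evidence and the prefix into local vals looks like it would keep the enclosing instance out of the closure:

 import scala.reflect.ClassTag

 // sketch only: assumes K and W are type parameters with context bounds in the enclosing scope
 val prefix = state.prefixKey              // plain String, serializable
 val kTag   = implicitly[ClassTag[K]]      // capture the implicit evidence locally
 val kOrd   = implicitly[Ordering[K]]
 val wTag   = implicitly[ClassTag[W]]

 dstream.map { tuple =>
   // pass the evidence explicitly so nothing is pulled from the enclosing scope
   val w = StreamState.fetch[K, W](prefix, tuple._1)(kTag, kOrd, wTag)
   (tuple._1, (tuple._2, w))
 }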

 If anyone has some pointers, I'd really appreciate it.

 Thanks,





Why is Columnar Parquet used as default for saving Row-based DataFrames/RDD?

2015-04-20 Thread Duy Lan Nguyen
Hello,

I have the above naive question, if anyone could help: why not use a
row-based file format to save row-based DataFrames/RDDs?

Thanks,
Lan


Re: Streaming Linear Regression problem

2015-04-20 Thread Xiangrui Meng
Did you keep adding new files under the `train/` folder? What was the
exact warn message? -Xiangrui
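
One thing to check, as a hedged sketch of the mechanics (paths from the program quoted below; file name and contents are assumed): textFileStream only picks up files that appear in the watched directory after ssc.start(), so pre-existing files are ignored. The usual pattern is to write each new file elsewhere and move it in atomically while the stream is running:

 import java.nio.file.{Files, Paths, StandardCopyOption}

 // assumed LabeledPoint.toString-style line with numFeatures = 3, as in the program below
 val tmp = Paths.get("/tmp/part-00042")
 Files.write(tmp, "(1.0,[0.5,0.3,0.1])".getBytes)
 // the atomic move makes the file visible to textFileStream in the next batch
 // (assumes /tmp and the train folder are on the same filesystem)
 Files.move(tmp, Paths.get("/home/barisakgu/Desktop/Spark/train/part-00042"),
   StandardCopyOption.ATOMIC_MOVE)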

On Fri, Apr 17, 2015 at 4:56 AM, barisak baris.akg...@gmail.com wrote:
 Hi,

 I wrote this code just to train the streaming linear regression, but I got a
 "no data found" warning, so the weights were not updated.

 Is there any solution for this ?

 Thanks

 import org.apache.spark.mllib.linalg.Vectors
 import org.apache.spark.mllib.regression.{LabeledPoint,
 StreamingLinearRegressionWithSGD}
 import org.apache.spark.SparkConf
 import org.apache.spark.streaming.{Seconds, StreamingContext}


 object StreamingLinearRegression {

   def main(args: Array[String]) {

 val numFeatures=3

 val conf = new SparkConf().setMaster("local[2]").setAppName("StreamingLinearRegression")
 val ssc = new StreamingContext(conf, Seconds(30))

 val trainingData = ssc.textFileStream("/home/barisakgu/Desktop/Spark/train").map(LabeledPoint.parse).cache()
 val testData = ssc.textFileStream("/home/barisakgu/Desktop/Spark/test").map(LabeledPoint.parse)

 val model = new StreamingLinearRegressionWithSGD().setInitialWeights(Vectors.zeros(numFeatures))

 model.trainOn(trainingData)
 model.predictOnValues(testData.map(lp => (lp.label, lp.features))).print()

 ssc.start()
 ssc.awaitTermination()

   }

 }



 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Streaming-Linear-Regression-problem-tp22539.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.






ChiSquared Test from user response flat files to RDD[Vector]?

2015-04-20 Thread Dan DeCapria, CivicScience
Hi Spark community,

I'm very new to the Apache Spark community; but if this (very active) group
is anything like the other Apache project user groups I've worked with, I'm
going to enjoy discussions here very much. Thanks in advance!

Use Case:
I am trying to go from flat files of user response data, to contingency
tables of frequency counts, to Pearson Chi-Square correlation statistics
and perform a Chi-Squared hypothesis test.  The user response data
represents a multiple choice question-answer (MCQ) format. The goal is to
compute all choose-two combinations of question answers (precondition,
question X question) contingency tables. Each cell of the contingency table
is the intersection of the users who responded to each option of each
question in the table.

An overview of the problem:
// data ingestion and typing schema: Observation (u: String, d:
java.util.Date, t: String, q: String, v: String, a: Int)
// a question (q) has a finite set of response options (v) per which a user
(u) responds
// additional response fields are not required per this test
for (precondition a) {
  for (q_i in lex ordered questions) {
for (q_j in lex ordered questions, q_j > q_i) {
forall v_k \in q_i get set of distinct users {u}_ik
forall v_l \in q_j get set of distinct users {u}_jl
forall cells per table (a,q_i,q_j) defn C_ijkl = |intersect({u}_ik,
{u}_jl)| // contingency table construct
compute chisq test per this contingency table and persist
}
  }
}

The Scala main I'm testing is provided below. I was planning to use the
provided example
https://spark.apache.org/docs/1.3.1/mllib-statistics.html, however
I am not sure how to go from my RDD[Observation] to the RDD[Vector]
precondition it needs for ingestion.

  def main(args: Array[String]): Unit = {
    // setup main space for test
    val conf = new SparkConf().setAppName("TestMain")
    val sc = new SparkContext(conf)

    // data ETL and typing schema
    case class Observation(u: String, d: java.util.Date, t: String, q: String, v: String, a: Int)
    val date_format = new java.text.SimpleDateFormat("MMdd")
    val data_project_abs_dir = "/my/path/to/data/files"
    val data_files = data_project_abs_dir + "/part-*.gz"
    val data = sc.textFile(data_files)
    val observations = data.map(line => line.split(",").map(_.trim))
      .map(r => Observation(r(0).toString, date_format.parse(r(1).toString),
        r(2).toString, r(3).toString, r(4).toString, r(5).toInt))
    observations.cache

    // ToDo: the basic keying of the space, possibly...
    val qvu = observations.map(o => ((o.a, o.q, o.v), o.u)).distinct

    // ToDo: ok, so now how to get this into the precondition RDD[Vector]
    // from the Spark example to make a contingency table?...

    // ToDo: perform then persist the resulting chisq and p-value on these
    // contingency tables...
  }


Any help is appreciated.

Thanks!  -Dan


RE: GSSException when submitting Spark job in yarn-cluster mode with HiveContext APIs on Kerberos cluster

2015-04-20 Thread Andrew Lee
Hi Marcelo,
Exactly what I need to track, thanks for the JIRA pointer.

 Date: Mon, 20 Apr 2015 14:03:55 -0700
 Subject: Re: GSSException when submitting Spark job in yarn-cluster mode with 
 HiveContext APIs on Kerberos cluster
 From: van...@cloudera.com
 To: alee...@hotmail.com
 CC: user@spark.apache.org
 
 I think you want to take a look at:
 https://issues.apache.org/jira/browse/SPARK-6207
 
 On Mon, Apr 20, 2015 at 1:58 PM, Andrew Lee alee...@hotmail.com wrote:
  Hi All,
 
  Affected version: spark 1.2.1 / 1.2.2 / 1.3-rc1
 
  Posting this problem to user group first to see if someone is encountering
  the same problem.
 
  When submitting spark jobs that invokes HiveContext APIs on a Kerberos
  Hadoop + YARN (2.4.1) cluster,
  I'm getting this error.
 
  javax.security.sasl.SaslException: GSS initiate failed [Caused by
  GSSException: No valid credentials provided (Mechanism level: Failed to find
  any Kerberos tgt)]
 
  Apparently, the Kerberos ticket is not on the remote data node nor computing
  node since we don't
  deploy Kerberos tickets, and that is not a good practice either. On the
  other hand, we can't just SSH to every machine and run kinit for those users.
  This is not practical and it is insecure.
 
  The point here is: shouldn't there be a delegation token during the doAs,
  so the token is used instead of the ticket?
  I'm trying to understand what is missing in Spark's HiveContext API while a
  normal MapReduce job that invokes Hive APIs will work, but not in Spark SQL.
  Any insights or feedback are appreciated.
 
  Anyone got this running without pre-deploying (pre-initializing) all tickets
  node by node? Is this worth filing a JIRA?
 
 
 
  15/03/25 18:59:08 INFO hive.metastore: Trying to connect to metastore with
  URI thrift://alee-cluster.test.testserver.com:9083
  15/03/25 18:59:08 ERROR transport.TSaslTransport: SASL negotiation failure
  javax.security.sasl.SaslException: GSS initiate failed [Caused by
  GSSException: No valid credentials provided (Mechanism level: Failed to find
  any Kerberos tgt)]
  at
  com.sun.security.sasl.gsskerb.GssKrb5Client.evaluateChallenge(GssKrb5Client.java:212)
  at
  org.apache.thrift.transport.TSaslClientTransport.handleSaslStartMessage(TSaslClientTransport.java:94)
  at org.apache.thrift.transport.TSaslTransport.open(TSaslTransport.java:253)
  at
  org.apache.thrift.transport.TSaslClientTransport.open(TSaslClientTransport.java:37)
  at
  org.apache.hadoop.hive.thrift.client.TUGIAssumingTransport$1.run(TUGIAssumingTransport.java:52)
  at
  org.apache.hadoop.hive.thrift.client.TUGIAssumingTransport$1.run(TUGIAssumingTransport.java:49)
  at java.security.AccessController.doPrivileged(Native Method)
  at javax.security.auth.Subject.doAs(Subject.java:415)
  at
  org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1556)
  at
  org.apache.hadoop.hive.thrift.client.TUGIAssumingTransport.open(TUGIAssumingTransport.java:49)
  at
  org.apache.hadoop.hive.metastore.HiveMetaStoreClient.open(HiveMetaStoreClient.java:336)
  at
  org.apache.hadoop.hive.metastore.HiveMetaStoreClient.init(HiveMetaStoreClient.java:214)
  at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
  at
  sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
  at
  sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
  at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
  at
  org.apache.hadoop.hive.metastore.MetaStoreUtils.newInstance(MetaStoreUtils.java:1410)
  at
  org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.init(RetryingMetaStoreClient.java:62)
  at
  org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:72)
  at
  org.apache.hadoop.hive.ql.metadata.Hive.createMetaStoreClient(Hive.java:2453)
  at org.apache.hadoop.hive.ql.metadata.Hive.getMSC(Hive.java:2465)
  at
  org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:340)
  at
  org.apache.spark.sql.hive.HiveContext$$anonfun$4.apply(HiveContext.scala:235)
  at
  org.apache.spark.sql.hive.HiveContext$$anonfun$4.apply(HiveContext.scala:231)
  at scala.Option.orElse(Option.scala:257)
  at
  org.apache.spark.sql.hive.HiveContext.x$3$lzycompute(HiveContext.scala:231)
  at org.apache.spark.sql.hive.HiveContext.x$3(HiveContext.scala:229)
  at
  org.apache.spark.sql.hive.HiveContext.hiveconf$lzycompute(HiveContext.scala:229)
  at org.apache.spark.sql.hive.HiveContext.hiveconf(HiveContext.scala:229)
  at
  org.apache.spark.sql.hive.HiveMetastoreCatalog.init(HiveMetastoreCatalog.scala:55)
  at
  org.apache.spark.sql.hive.HiveContext$$anon$2.init(HiveContext.scala:253)
  at
  org.apache.spark.sql.hive.HiveContext.catalog$lzycompute(HiveContext.scala:253)
  at org.apache.spark.sql.hive.HiveContext.catalog(HiveContext.scala:253)
  at
  org.apache.spark.sql.hive.HiveContext$$anon$4.init(HiveContext.scala:263)
  at
  

Re: SparkSQL performance

2015-04-20 Thread Renato Marroquín Mogrovejo
Does anybody have an idea? a clue? a hint?
Thanks!


Renato M.

2015-04-20 9:31 GMT+02:00 Renato Marroquín Mogrovejo 
renatoj.marroq...@gmail.com:

 Hi all,

 I have a simple query "Select * from tableX where attribute1 between 0 and
 5" that I run over a Kryo file with four partitions that ends up being
 around 3.5 million rows in our case.
 If I run this query by doing a simple map().filter() it takes around ~9.6
 seconds, but when I apply a schema, register the table into a SqlContext, and
 then run the query, it takes around ~16 seconds. This is using Spark 1.2.1
 with Scala 2.10.0.
 I am wondering why there is such a big gap in performance if it is just a
 filter. Internally, the relation files are mapped to a JavaBean. Could this
 different data representation (JavaBeans vs SparkSQL internal representation)
 lead to such a difference? Is there anything I could do to make the
 performance get closer to the hard-coded option?
 Thanks in advance for any suggestions or ideas.


 Renato M.



Re: SparkSQL performance

2015-04-20 Thread ayan guha
SparkSQL optimizes better by column pruning and predicate pushdown,
primarily. Here you are not taking advantage of either.

I am curious to know what goes into your filter function, as you are not
using a filter on the SQL side.
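
For reference, a rough sketch of what taking advantage of both would look like (table and column names are from your mail; the DataFrame form assumes Spark 1.3's API):

 val pruned = sqlContext.sql(
   "SELECT attribute1 FROM tableX WHERE attribute1 BETWEEN 0 AND 5")
 // or, on a registered DataFrame df (1.3+):
 // df.select("attribute1").filter("attribute1 BETWEEN 0 AND 5")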

Best
Ayan
On 21 Apr 2015 08:05, Renato Marroquín Mogrovejo 
renatoj.marroq...@gmail.com wrote:






Re: ChiSquared Test from user response flat files to RDD[Vector]?

2015-04-20 Thread Xiangrui Meng
You can find the user guide for vector creation here:
http://spark.apache.org/docs/latest/mllib-data-types.html#local-vector.
-Xiangrui
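
For the contingency-table part, a minimal sketch (the counts are made up, not your data): Statistics.chiSqTest accepts an MLlib Matrix directly, so you may not need RDD[Vector] at all for the pairwise tests; an RDD input is only required for the feature-versus-label variant.

 import org.apache.spark.mllib.linalg.Matrices
 import org.apache.spark.mllib.stat.Statistics

 // rows = options v_k of q_i, cols = options v_l of q_j,
 // cell = |intersect({u}_ik, {u}_jl)| as in your pseudo-code
 val table = Matrices.dense(2, 3, Array(12.0, 7.0, 30.0, 25.0, 9.0, 4.0))
 val result = Statistics.chiSqTest(table)
 println(s"statistic=${result.statistic}, p-value=${result.pValue}")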

On Mon, Apr 20, 2015 at 2:32 PM, Dan DeCapria, CivicScience
dan.decap...@civicscience.com wrote:





Re: SparkStreaming onStart not being invoked on CustomReceiver attached to master with multiple workers

2015-04-20 Thread Tathagata Das
Responses inline.

On Mon, Apr 20, 2015 at 3:27 PM, Ankit Patel patel7...@hotmail.com wrote:

 What you said is correct and I am expecting the printlns to be in my
 console or my SparkUI. I do not see them in either place.

Can you actually log into the machine running the executor which runs the
receiver, and then look at the executor's logs on that machine, to see
whether the expected onStart logs are there?


 However, if you run the program then the printlns do print for the
 constructor of the receiver and for the foreach statements with total
 count 0.

That's expected. In all cases, with or without a master, the receiver object
is constructed in the driver. With a master, the receiver gets serialized and
sent to a worker machine, and the rest of the prints go to the executor logs.


 When you run it in regular mode with no master attached, then you will see
 the counts being printed out in the console as well. Please compile my
 program and try it out; I have spent significant time on debugging where it
 can go wrong and could not find an answer.


As I said, check out the executor logs on the worker machine running the
receiver. You can identify that machine from the streaming tab in the Spark
UI.


 I also see the starting receiver logs from Spark when no master is
 defined, but do not see them when there is one. Also, I am running some other
 simple code with spark-submit with printlns and I do see them in my
 SparkUI, but not for Spark Streaming.

That's expected, for the same reason. Without a master, the receiver logs are
in the same driver process. With a master, they go to the executor logs.

As such, it's not clear what you are trying to debug. If you just want to
see printlns, then this seems to be the expected behavior; nothing is
wrong. Run without a master for convenient debugging (you can see all logs
and prints), and then run in distributed mode for actual deployment
(where it's harder to see those logs and prints).


 Thanks,
 Ankit

 --
 From: t...@databricks.com
 Date: Mon, 20 Apr 2015 13:29:31 -0700
 Subject: Re: SparkStreaming onStart not being invoked on CustomReceiver
 attached to master with multiple workers
 To: patel7...@hotmail.com
 CC: ak...@sigmoidanalytics.com; user@spark.apache.org


 Well, the receiver always runs as part of an executor. When running
 locally (that is, spark-submit without --master), the executor is in the
 same process as the driver, so you see the printlns. If you are running
 with --master spark://cluster, then the executors are running in different
 processes and possibly on different nodes. Hence you don't see the printlns
 in the output of the driver process. If you look at the output of the
 executors in the Spark UI, then you may find those prints.

 TD

 On Mon, Apr 20, 2015 at 5:16 AM, Ankit Patel patel7...@hotmail.com
 wrote:

 The code I've written is simple as it just invokes a thread and calls a
 store method on the Receiver class.

  I see this code with printlns working fine when I try spark-submit --jars
 jar --class test.TestCustomReceiver jar

 However it does not work when I try the same command above with --master
 spark://masterURL:
 spark-submit --master spark://masterURL --jars jar --class
 test.TestCustomReceiver jar

 I also tried setting the master in the conf that I created, but that
 does not work either. I do not see the onStart println being printed when I
 use the --master option. Please advise.

 Also, the master I am attaching to has multiple workers across hosts with
 many threads available to it.

 The code is pasted below (Classes: TestReceiver, TestCustomReceiver):



 package test;
 import org.apache.spark.storage.StorageLevel;
 import org.apache.spark.streaming.receiver.Receiver;

 public class TestReceiver extends Receiver<String> {

   public TestReceiver() {
     super(StorageLevel.MEMORY_ONLY());
     System.out.println("Ankit: Created TestReceiver");
   }

   @Override
   public void onStart() {
     System.out.println("Start TestReceiver");
     new TestThread().start();
   }

   public void onStop() {}

   @SuppressWarnings("unused")
   private class TestThread extends Thread {
     @Override
     public void run() {
       while (true) {
         try {
           sleep((long) (Math.random() * 3000));
         } catch (Exception e) {
           e.printStackTrace();
         }
         store("Time: " + System.currentTimeMillis());
       }
     }
   }
 }



 package test;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.function.Function;
 import org.apache.spark.streaming.Duration;
 import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
 import 

HiveContext vs SQLContext

2015-04-20 Thread Daniel Mahler
Is HiveContext still preferred over SQLContext?
What are the current (1.3.1) differences between them?

thanks
Daniel


Re: MLlib - Naive Bayes Problem

2015-04-20 Thread Xiangrui Meng
Could you attach the full stack trace? Please also include the stack
trace from executors, which you can find on the Spark WebUI. -Xiangrui

On Thu, Apr 16, 2015 at 1:00 PM, riginos samarasrigi...@gmail.com wrote:
 I have a big dataset of categories of cars and descriptions of cars. I want
 to give a description of a car and have the program classify the category
 of that car.
 So I decided to use multinomial naive Bayes. I created a unique id for each
 word and replaced my whole category/description data with those ids.

 //My input
 2,25187 15095 22608 28756 17862 29523 499 32681 9830 24957 18993 19501 16596
 17953 16596
 20,1846 29058 16252 20446 9835
 52,16861 808 26785 17874 18993 18993 18993 18269 34157 33811 18437 6004 2791
 27923 19141
 ...
 ...

 Why do I have errors like:

 //Errors

 3 ERROR Executor: Exception in task 0.0 in stage 211.0 (TID 392)
 java.lang.IndexOutOfBoundsException: 13 not in [-13,13)

 ERROR Executor: Exception in task 1.0 in stage 211.0 (TID 393)
 java.lang.IndexOutOfBoundsException: 17 not in [-17,17)

 ERROR TaskSetManager: Task 0 in stage 211.0 failed 1 times; aborting job
 org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in
 stage 211.0 failed 1 times, most recent failure: Lost task 0.0 in stage
 211.0 (TID 392, localhost): java.lang.IndexOutOfBoundsException: 13 not in
 [-13,13)

 Driver stacktrace:
 at
 org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203)






 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/MLlib-Naive-Bayes-Problem-tp22531.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.






Instantiating/starting Spark jobs programmatically

2015-04-20 Thread Ajay Singal
Greetings,

We have an analytics workflow system in production.  This system is built in
Java and utilizes other services (including Apache Solr).  It works fine
with moderate level of data/processing load.  However, when the load goes
beyond certain limit (e.g., more than 10 million messages/documents) delays
start to show up.  No doubt this is a scalability issue, and Hadoop
ecosystem, especially Spark, can be handy in this situation.  The simplest
approach would be to rebuild the entire workflow using Spark, Kafka and
other components.  However, we decided to handle the problem in a couple of
phases.  In first phase we identified a few pain points (areas where
performance suffers most) and have started building corresponding mini Spark
applications (so as to take advantage of parallelism).

For now my question is: how can we instantiate/start our mini Spark jobs
programmatically (e.g., from Java applications)? The only option I see in the
documentation is to run the jobs through the command line (using spark-submit).
Any insight in this area would be highly appreciated.

In the longer term, I want to construct a collection of mini Spark applications
(each performing one specific task, similar to web services), and
architect/design bigger Spark-based applications which in turn will call
these mini Spark applications programmatically. There is a possibility that
the Spark community has already started building such a collection of
services. Can you please provide some information/tips/best-practices in
this regard?

Cheers!
Ajay




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Instantiating-starting-Spark-jobs-programmatically-tp22577.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Spark SQL vs map reduce tableInputOutput

2015-04-20 Thread Ted Yu
Please take a look at https://issues.apache.org/jira/browse/PHOENIX-1815

On Mon, Apr 20, 2015 at 10:11 AM, Jeetendra Gangele gangele...@gmail.com
wrote:

 Thanks for the reply.

 Would using Phoenix inside Spark be useful?

 What is the best way to bring data from HBase into Spark in terms of
 application performance?

 Regards
 Jeetendra

 On 20 April 2015 at 20:49, Ted Yu yuzhih...@gmail.com wrote:

 To my knowledge, Spark SQL currently doesn't provide range scan
 capability against hbase.

 Cheers



  On Apr 20, 2015, at 7:54 AM, Jeetendra Gangele gangele...@gmail.com
 wrote:
 
  HI All,
 
  I am querying HBase, combining the result, and using it in my Spark job.
  I am querying HBase using the HBase client API inside my Spark job.
  Can anybody suggest whether Spark SQL will be fast enough and provide
  range queries?
 
  Regards
  Jeetendra
 








Re: Spark SQL vs map reduce tableInputOutput

2015-04-20 Thread ayan guha
I think the recommended approach would be creating a DataFrame using HBase as
the source; then you can run any SQL on that DF.
In 1.2 you can create a base RDD and then apply a schema in the same manner.
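
A hedged sketch of that path (the table, column family and column names are assumptions, not from this thread):

 import org.apache.hadoop.hbase.HBaseConfiguration
 import org.apache.hadoop.hbase.client.Result
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable
 import org.apache.hadoop.hbase.mapreduce.TableInputFormat
 import org.apache.hadoop.hbase.util.Bytes

 case class Item(key: String, value: String)

 val hconf = HBaseConfiguration.create()
 hconf.set(TableInputFormat.INPUT_TABLE, "my_table")              // assumed table name
 val hbaseRdd = sc.newAPIHadoopRDD(hconf, classOf[TableInputFormat],
   classOf[ImmutableBytesWritable], classOf[Result])
 val items = hbaseRdd.map { case (k, r) =>
   Item(Bytes.toString(k.get()),
        Bytes.toString(r.getValue(Bytes.toBytes("cf"), Bytes.toBytes("col"))))
 }
 val df = sqlContext.createDataFrame(items)                        // applySchema on 1.2
 df.registerTempTable("items")
 sqlContext.sql("SELECT * FROM items WHERE key >= '0' AND key < '5'").show()

Note that, as Ted said, the range predicate here is applied after a full scan; it is not pushed down to HBase.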
On 21 Apr 2015 03:12, Jeetendra Gangele gangele...@gmail.com wrote:

  Jeetendra
 








Re: SparkSQL performance

2015-04-20 Thread Michael Armbrust
There is a cost to converting from JavaBeans to Rows and this code path has
not been optimized.  That is likely what you are seeing.
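
If the conversion turns out to be the bottleneck, one hedged sketch of a way around it (all names here are assumed, not from your code) is to build Rows with an explicit schema instead of going through bean reflection:

 import org.apache.spark.sql.Row
 import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

 val schema = StructType(Seq(StructField("attribute1", IntegerType, nullable = false)))
 // beanRdd: RDD[MyBean] is assumed; extract the fields once instead of reflecting per row
 val rowRdd = beanRdd.map(b => Row(b.getAttribute1))
 val df = sqlContext.createDataFrame(rowRdd, schema)   // applySchema on 1.2.x
 df.registerTempTable("tableX")

Whether that actually closes the gap would need measuring.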

On Mon, Apr 20, 2015 at 3:55 PM, ayan guha guha.a...@gmail.com wrote:






Re: spark sql error with proto/parquet

2015-04-20 Thread Michael Armbrust
You are probably using an encoding that we don't support.  I think this PR
may be adding that support: https://github.com/apache/spark/pull/5422

On Sat, Apr 18, 2015 at 5:40 PM, Abhishek R. Singh 
abhis...@tetrationanalytics.com wrote:

 I have created a bunch of protobuf based parquet files that I want to
 read/inspect using Spark SQL. However, I am running into exceptions and not
 able to proceed much further:

 This succeeds (probably because there is no action yet). I can also
 printSchema() and count() without any issues:

 scala> val df = sqlContext.load("my_root_dir/201504101000", "parquet")

 scala> df.select(df("summary")).first

 15/04/18 17:03:03 WARN scheduler.TaskSetManager: Lost task 0.0 in stage
 5.0 (TID 27, xxx.yyy.com): parquet.io.ParquetDecodingException: Can not
 read value at 0 in block -1 in file
 hdfs://xxx.yyy.com:8020/my_root_dir/201504101000/0.parquet
 at
 parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:213)
 at
 parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:204)
 at
 org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:143)
 at
 org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
 at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
 at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
 at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:308)
 at scala.collection.Iterator$class.foreach(Iterator.scala:727)
 at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
 at
 scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
 at
 scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
 at
 scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
 at scala.collection.TraversableOnce$class.to
 (TraversableOnce.scala:273)
 at scala.collection.AbstractIterator.to(Iterator.scala:1157)
 at
 scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
 at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
 at
 scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
 at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
 at
 org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:122)
 at
 org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:122)
 at
 org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1498)
 at
 org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1498)
 at
 org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
 at org.apache.spark.scheduler.Task.run(Task.scala:64)
 at
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
 at java.lang.Thread.run(Thread.java:745)
 Caused by: parquet.io.ParquetDecodingException: The requested schema is
 not compatible with the file schema. incompatible types: optional group …


 I could convert my protos into json and then back to parquet, but that
 seems wasteful !

 Also, I will be happy to contribute and make protobuf work with Spark SQL
 if I can get some guidance/help/pointers. Help appreciated.

 -Abhishek-



Re: Updating a Column in a DataFrame

2015-04-20 Thread ayan guha
You can always create another DF using a map. In reality, operations are
lazy, so only the final value will get computed.
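
A small sketch of that idea (the column name is assumed; withColumn needs 1.3+):

 import org.apache.spark.sql.functions._

 // the "update" is really a projection whose column is derived from the old one
 val updated = df.withColumn("price", col("price") * 1.1)
 // or go through the RDD: df.map(row => ...) and re-apply the schema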

Can you provide the use case in a little more detail?
On 21 Apr 2015 08:39, ARose ashley.r...@telarix.com wrote:

 In my Java application, I want to update the values of a Column in a given
 DataFrame. However, I realize DataFrames are immutable, and therefore
 cannot
 be updated by conventional means. Is there a workaround for this sort of
 transformation? If so, can someone provide an example?



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Updating-a-Column-in-a-DataFrame-tp22578.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.





Why is Columnar Parquet used as default for saving Row-based DataFrames/RDD?

2015-04-20 Thread Lan
Hello,

I have the above naive question, if anyone could help: why not use a
row-based file format to save row-based DataFrames/RDDs?

Thanks,
Lan



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Why-is-Columnar-Parquet-used-as-default-for-saving-Row-based-DataFrames-RDD-tp22579.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: how to make a spark cluster ?

2015-04-20 Thread haihar nahak
Thank you :)

On Mon, Apr 20, 2015 at 4:46 PM, Jörn Franke jornfra...@gmail.com wrote:

 Hi, If you have just one physical machine then I would try out Docker
 instead of a full VM (would be waste of memory and CPU).

 Best regards
 Le 20 avr. 2015 00:11, hnahak harihar1...@gmail.com a écrit :

 Hi All,

 I've got a big physical machine with 16 CPUs, 256 GB RAM, and a 20 TB hard
 disk. I just need to know what the best solution would be to make a Spark
 cluster.

 If I need to process TBs of data, then:
 1. Only one machine, which contains driver, executor, job tracker and task
 tracker, everything.
 2. Create 4 VMs, each with 4 CPUs and 64 GB RAM.
 3. Create 8 VMs, each with 2 CPUs and 32 GB RAM.

 Please give me your views/suggestions.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/how-to-make-a-spark-cluster-tp22563.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.





-- 
{{{H2N}}}-(@:


RE: SparkStreaming onStart not being invoked on CustomReceiver attached to master with multiple workers

2015-04-20 Thread Ankit Patel
What you said is correct, and I am expecting the printlns to be in my console or
my SparkUI. I do not see them in either place. However, if you run the program
then the printlns do print for the constructor of the receiver and for the
foreach statements with total count 0. When you run it in regular mode with no
master attached, you will see the counts being printed out in the console
as well. Please compile my program and try it out; I have spent significant
time on debugging where it can go wrong and could not find an answer. I also
see the starting receiver logs from Spark when no master is defined, but do not
see them when there is one. Also, I am running some other simple code with
spark-submit with printlns and I do see them in my SparkUI, but not for Spark
Streaming.

Thanks,
Ankit

From: t...@databricks.com
Date: Mon, 20 Apr 2015 13:29:31 -0700
Subject: Re: SparkStreaming onStart not being invoked on CustomReceiver 
attached to master with multiple workers
To: patel7...@hotmail.com
CC: ak...@sigmoidanalytics.com; user@spark.apache.org

Well, the receiver always runs as part of an executor. When running locally
(that is, spark-submit without --master), the executor is in the same process
as the driver, so you see the printlns. If you are running with --master
spark://cluster, then the executors are running in different processes and
possibly on different nodes. Hence you don't see the printlns in the output of
the driver process. If you look at the output of the executors in the Spark UI,
then you may find those prints.
TD
On Mon, Apr 20, 2015 at 5:16 AM, Ankit Patel patel7...@hotmail.com wrote:



The code I've written is simple as it just invokes a thread and calls a store
method on the Receiver class.

I see this code with printlns working fine when I try:
spark-submit --jars jar --class test.TestCustomReceiver jar

However it does not work when I try the same command above with --master
spark://masterURL:
spark-submit --master spark://masterURL --jars jar --class test.TestCustomReceiver jar

I also tried setting the master in the conf that I created, but that does
not work either. I do not see the onStart println being printed when I use the
--master option. Please advise.

Also, the master I am attaching to has multiple workers across hosts with many
threads available to it.

The code is pasted below (Classes: TestReceiver, TestCustomReceiver):


package test;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.receiver.Receiver;

public class TestReceiver extends Receiver<String> {

  public TestReceiver() {
    super(StorageLevel.MEMORY_ONLY());
    System.out.println("Ankit: Created TestReceiver");
  }

  @Override
  public void onStart() {
    System.out.println("Start TestReceiver");
    new TestThread().start();
  }

  public void onStop() {}

  @SuppressWarnings("unused")
  private class TestThread extends Thread {
    @Override
    public void run() {
      while (true) {
        try {
          sleep((long) (Math.random() * 3000));
        } catch (Exception e) {
          e.printStackTrace();
        }
        store("Time: " + System.currentTimeMillis());
      }
    }
  }
}


package test;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class TestCustomReceiver {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf();
    JavaStreamingContext ssc = new JavaStreamingContext(conf, new Duration(1000));
    TestReceiver receiver = new TestReceiver();
    JavaReceiverInputDStream<String> stream = ssc.receiverStream(receiver);
    stream.map(new Function() {
      @Override
      public Object call(Object arg0) throws Exception {
        System.out.println("Received: " + arg0);
        return arg0;
      }
    }).foreachRDD(new Function() {
      @Override
      public Object call(Object arg0) throws Exception {
        System.out.println("Total Count: " + ((org.apache.spark.api.java.JavaRDD) arg0).count());
        return arg0;
      }
    });
    ssc.start();
    ssc.awaitTermination();
  }
}

Thanks,
Ankit

Date: Mon, 20 Apr 2015 12:22:03 +0530
Subject: Re: SparkStreaming onStart not being invoked on CustomReceiver 
attached to master with multiple workers
From: 

Updating a Column in a DataFrame

2015-04-20 Thread ARose
In my Java application, I want to update the values of a Column in a given
DataFrame. However, I realize DataFrames are immutable, and therefore cannot
be updated by conventional means. Is there a workaround for this sort of
transformation? If so, can someone provide an example?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Updating-a-Column-in-a-DataFrame-tp22578.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: SQL UserDefinedType can't be saved in parquet file when using assembly jar

2015-04-20 Thread Xiangrui Meng
You should check where MyDenseVectorUDT is defined and whether it was
on the classpath (or in the assembly jar) at runtime. Make sure the
full class name (with package name) is used. Btw, UDTs are not public
yet, so please use it with caution. -Xiangrui
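
A hedged sketch of the first point (the package name is an assumption): the metadata in your error records the UDT under the bare name MyDenseVectorUDT, so declaring the classes inside a package makes the recorded name fully qualified and loadable by name from the assembly jar at runtime.

 // sketch only; imports and the class bodies as in your mail
 package com.example.udt

 @SQLUserDefinedType(udt = classOf[MyDenseVectorUDT])
 class MyDenseVector(val data: Array[Double]) extends Serializable {
   // ... equals as before
 }

 class MyDenseVectorUDT extends UserDefinedType[MyDenseVector] {
   // ... sqlType/serialize/deserialize/userClass as before; the UDT is now
   // recorded as com.example.udt.MyDenseVectorUDT in the Parquet metadata
 }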

On Fri, Apr 17, 2015 at 12:45 AM, Jaonary Rabarisoa jaon...@gmail.com wrote:
 Dear all,

 Here is an example of code to reproduce the issue I mentioned in a previous
 mail about saving an UserDefinedType into a parquet file. The problem here
 is that the code works when I run it inside intellij idea but fails when I
 create the assembly jar and run it with spark-submit. I use the master
 version of  Spark.

 @SQLUserDefinedType(udt = classOf[MyDenseVectorUDT])
 class MyDenseVector(val data: Array[Double]) extends Serializable {
   override def equals(other: Any): Boolean = other match {
  case v: MyDenseVector =>
   java.util.Arrays.equals(this.data, v.data)
  case _ => false
   }
 }

 class MyDenseVectorUDT extends UserDefinedType[MyDenseVector] {
   override def sqlType: DataType = ArrayType(DoubleType, containsNull =
 false)
   override def serialize(obj: Any): Seq[Double] = {
 obj match {
    case features: MyDenseVector =>
 features.data.toSeq
 }
   }

   override def deserialize(datum: Any): MyDenseVector = {
 datum match {
    case data: Seq[_] =>
 new MyDenseVector(data.asInstanceOf[Seq[Double]].toArray)
 }
   }

   override def userClass: Class[MyDenseVector] = classOf[MyDenseVector]

 }

 case class Toto(imageAnnotation: MyDenseVector)

 object TestUserDefinedType {

   case class Params(input: String = null,
partitions: Int = 12,
  outputDir: String = "images.parquet")

   def main(args: Array[String]): Unit = {

  val conf = new SparkConf().setAppName("ImportImageFolder").setMaster("local[4]")

 val sc = new SparkContext(conf)
 val sqlContext = new SQLContext(sc)

 import sqlContext.implicits._

  val rawImages = sc.parallelize((1 to 5).map(x => Toto(new MyDenseVector(Array[Double](x.toDouble))))).toDF

 rawImages.printSchema()

 rawImages.show()

  rawImages.save("toto.parquet") // This fails with assembly jar
 sc.stop()

   }
 }


 My build.sbt is as follow :

  libraryDependencies ++= Seq(
    "org.apache.spark" %% "spark-core" % sparkVersion % "provided",
    "org.apache.spark" %% "spark-sql" % sparkVersion,
    "org.apache.spark" %% "spark-mllib" % sparkVersion
  )

  assemblyMergeStrategy in assembly := {
    case PathList("javax", "servlet", xs @ _*) => MergeStrategy.first
    case PathList("org", "apache", xs @ _*) => MergeStrategy.first
    case PathList("org", "jboss", xs @ _*) => MergeStrategy.first
    //  case PathList(ps @ _*) if ps.last endsWith ".html" => MergeStrategy.first
    //  case "application.conf" => MergeStrategy.concat
    case m if m.startsWith("META-INF") => MergeStrategy.discard
    //case x =>
    //  val oldStrategy = (assemblyMergeStrategy in assembly).value
    //  oldStrategy(x)
    case _ => MergeStrategy.first
  }


  As I said, this code works without problem when I execute it inside IntelliJ
  IDEA. But when I generate the assembly jar with sbt-assembly and use
  spark-submit, I get the following error:

 15/04/17 09:34:01 INFO ParquetOutputFormat: Writer version is: PARQUET_1_0
 15/04/17 09:34:01 ERROR Executor: Exception in task 3.0 in stage 2.0 (TID 7)
 java.lang.IllegalArgumentException: Unsupported dataType:
 {type:struct,fields:[{name:imageAnnotation,type:{type:udt,class:MyDenseVectorUDT,pyClass:null,sqlType:{type:array,elementType:double,containsNull:false}},nullable:true,metadata:{}}]},
 [1.1] failure: `TimestampType' expected but `{' found

 {type:struct,fields:[{name:imageAnnotation,type:{type:udt,class:MyDenseVectorUDT,pyClass:null,sqlType:{type:array,elementType:double,containsNull:false}},nullable:true,metadata:{}}]}
 ^
   at
 org.apache.spark.sql.types.DataType$CaseClassStringParser$.apply(dataTypes.scala:163)
   at
 org.apache.spark.sql.types.DataType$.fromCaseClassString(dataTypes.scala:98)
   at
 org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$6.apply(ParquetTypes.scala:402)
   at
 org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$6.apply(ParquetTypes.scala:402)
   at scala.util.Try.getOrElse(Try.scala:77)
   at
 org.apache.spark.sql.parquet.ParquetTypesConverter$.convertFromString(ParquetTypes.scala:402)
   at
 org.apache.spark.sql.parquet.RowWriteSupport.init(ParquetTableSupport.scala:145)
   at
 parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:278)
   at
 parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:252)
   at
 org.apache.spark.sql.parquet.ParquetRelation2.org$apache$spark$sql$parquet$ParquetRelation2$$writeShard$1(newParquet.scala:694)
   at
 org.apache.spark.sql.parquet.ParquetRelation2$$anonfun$insert$2.apply(newParquet.scala:716)
   at
 

Re: Spark Performance on Yarn

2015-04-20 Thread Peng Cheng
I got exactly the same problem, except that I'm running on a standalone
master. Can you tell me the counterpart parameter on a standalone master for
increasing the same memory overhead?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Performance-on-Yarn-tp21729p22580.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




MLlib - Collaborative Filtering - trainImplicit task size

2015-04-20 Thread Christian S. Perone
I keep seeing these warnings when using trainImplicit:

WARN TaskSetManager: Stage 246 contains a task of very large size (208 KB).
The maximum recommended task size is 100 KB.

And then the task size starts to increase. Is this a known issue ?

Thanks !

-- 
Blog http://blog.christianperone.com | Github https://github.com/perone
| Twitter https://twitter.com/tarantulae
Forgive, O Lord, my little jokes on Thee, and I'll forgive Thy great big
joke on me.


meet weird exception when studying rdd caching

2015-04-20 Thread donhoff_h
Hi,

I am studying the RDD caching function and wrote a small program to verify it.
I run the program in a Spark 1.3.0 environment on a YARN cluster, but I am
meeting a weird exception. It isn't always generated in the log; only sometimes
can I see it, and it does not affect the output of my program. Could anyone
explain why this happens and how to eliminate it?

My program and the exception is listed in the following. Thanks very much for 
the help!

*Program*
object TestSparkCaching01 {
 def main(args: Array[String]) {
   val conf = new SparkConf()
   conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
   conf.set("spark.kryo.registrationRequired", "true")
   conf.registerKryoClasses(Array(classOf[MyClass1], classOf[Array[MyClass1]]))
   val inFile = "hdfs://bgdt-dev-hrb/user/spark/tst/charset/A_utf8.txt"
   val sc = new SparkContext(conf)
   val rdd = sc.textFile(inFile)
   rdd.cache()
   rdd.map("Cache String: " + _).foreach(println)
   sc.stop()
 }
}

*Exception*
15/04/21 09:58:25 WARN channel.DefaultChannelPipeline: An exception was thrown 
by an exception handler.
java.util.concurrent.RejectedExecutionException: Worker has already been 
shutdown
at 
org.jboss.netty.channel.socket.nio.AbstractNioSelector.registerTask(AbstractNioSelector.java:120)
at 
org.jboss.netty.channel.socket.nio.AbstractNioWorker.executeInIoThread(AbstractNioWorker.java:72)
at 
org.jboss.netty.channel.socket.nio.NioWorker.executeInIoThread(NioWorker.java:36)
at 
org.jboss.netty.channel.socket.nio.AbstractNioWorker.executeInIoThread(AbstractNioWorker.java:56)
at 
org.jboss.netty.channel.socket.nio.NioWorker.executeInIoThread(NioWorker.java:36)
at 
org.jboss.netty.channel.socket.nio.AbstractNioChannelSink.execute(AbstractNioChannelSink.java:34)
at 
org.jboss.netty.channel.Channels.fireExceptionCaughtLater(Channels.java:496)
at 
org.jboss.netty.channel.AbstractChannelSink.exceptionCaught(AbstractChannelSink.java:46)
at 
org.jboss.netty.handler.codec.oneone.OneToOneEncoder.handleDownstream(OneToOneEncoder.java:54)
at 
org.jboss.netty.channel.Channels.disconnect(Channels.java:781)
at 
org.jboss.netty.channel.AbstractChannel.disconnect(AbstractChannel.java:211)
at 
akka.remote.transport.netty.NettyTransport$$anonfun$gracefulClose$1.apply(NettyTransport.scala:223)
at 
akka.remote.transport.netty.NettyTransport$$anonfun$gracefulClose$1.apply(NettyTransport.scala:222)
at scala.util.Success.foreach(Try.scala:205)
at 
scala.concurrent.Future$$anonfun$foreach$1.apply(Future.scala:204)
at 
scala.concurrent.Future$$anonfun$foreach$1.apply(Future.scala:204)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
at 
akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:67)
at 
akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:82)
at 
akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
at 
akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
at 
scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
at 
akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:58)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
at 
scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
15/04/21 09:58:25 INFO remote.RemoteActorRefProvider$RemotingTerminator: 
Remoting shut down.

Re: Equi Join is taking for ever. 1 Task is Running while other 199 are complete

2015-04-20 Thread ๏̯͡๏
It also is a little more evidence to Jonathan's suggestion that there is a
null / 0 record that is getting grouped together.

To fix this, do I need to run a filter?

val viEventsRaw = details.map { vi => (vi.get(14).asInstanceOf[Long], vi) }

val viEvents = viEventsRaw.filter { case (itemId, viEvent) => itemId != 0 }
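
Before (or instead of) filtering blindly, I guess I could run a quick histogram along the lines of Jonathan's earlier suggestion (a sketch using the RDD names from this thread; everything else assumed):

val leftHist  = lstgItem.mapValues(_ => 1L).reduceByKey(_ + _)
val rightHist = viEvents.mapValues(_ => 1L).reduceByKey(_ + _)
// heaviest join keys on each side; one huge key (e.g. itemId 0 or a null
// catch-all) would explain the single long-running task
leftHist.top(20)(Ordering.by(_._2)).foreach(println)
rightHist.top(20)(Ordering.by(_._2)).foreach(println)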



On Wed, Apr 15, 2015 at 2:04 AM, Imran Rashid iras...@cloudera.com wrote:

 Shuffle write could be a good indication of skew, but it looks like the
 task in question hasn't generated any shuffle write yet, because its still
 working on the shuffle-read side.   So I wouldn't read too much into the
 fact that the shuffle write is 0 for a task that is still running.

 The shuffle read is larger than for the other tasks (3.0GB vs. 2.2 GB, or
 more importantly, 55M records vs 1M records).  So it might not be that the
 raw data volume is much higher on that task, but its getting a ton more
 small records, which will also generate a lot of work.  It also is a little
 more evidence to Jonathan's suggestion that there is a null / 0 record that
 is getting grouped together.


 On Mon, Apr 13, 2015 at 12:40 PM, Jonathan Coveney jcove...@gmail.com
 wrote:

 I'm not 100% sure of spark's implementation but in the MR frameworks, it
 would have a much larger shuffle write size becasue that node is dealing
 with a lot more data and as a result has a lot  more to shuffle

 2015-04-13 13:20 GMT-04:00 java8964 java8...@hotmail.com:

  If it is really due to data skew, will the hanging task have a much bigger
  Shuffle Write Size in this case?

 In this case, the shuffle write size for that task is 0, and the rest IO
 of this task is not much larger than the fast finished tasks, is that
 normal?

 I am also interested in this case, as from statistics on the UI, how it
 indicates the task could have skew data?

 Yong

 --
 Date: Mon, 13 Apr 2015 12:58:12 -0400
 Subject: Re: Equi Join is taking for ever. 1 Task is Running while other
 199 are complete
 From: jcove...@gmail.com
 To: deepuj...@gmail.com
 CC: user@spark.apache.org


 I can promise you that this is also a problem in the pig world :) not
 sure why it's not a problem for this data set, though... are you sure that
 the two are doing the exact same code?

 you should inspect your source data. Make a histogram for each and see
 what the data distribution looks like. If there is a value or bucket with a
 disproportionate set of values you know you have an issue

 2015-04-13 12:50 GMT-04:00 ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com:

 You mean there is a tuple in either RDD, that has itemID = 0 or null ?
 And what is catch all ?

 That implies is it a good idea to run a filter on each RDD first ? We do
 not do this using Pig on M/R. Is it required in Spark world ?

 On Mon, Apr 13, 2015 at 9:58 PM, Jonathan Coveney jcove...@gmail.com
 wrote:

 My guess would be data skew. Do you know if there is some item id that
 is a catch all? can it be null? item id 0? lots of data sets have this sort
 of value and it always kills joins

 2015-04-13 11:32 GMT-04:00 ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com:

 Code:

  val viEventsWithListings: RDD[(Long, (DetailInputRecord, VISummary, Long))] =
    lstgItem.join(viEvents).map {
      case (itemId, (listing, viDetail)) =>
        val viSummary = new VISummary
        viSummary.leafCategoryId = listing.getLeafCategId().toInt
        viSummary.itemSiteId = listing.getItemSiteId().toInt
        viSummary.auctionTypeCode = listing.getAuctTypeCode().toInt
        viSummary.sellerCountryId = listing.getSlrCntryId().toInt
        viSummary.buyerSegment = 0
        viSummary.isBin = (if (listing.getBinPriceLstgCurncy.doubleValue() > 0) 1 else 0)
        val sellerId = listing.getSlrId.toLong
        (sellerId, (viDetail, viSummary, itemId))
    }

 Running Tasks:
  Tasks (Index | ID | Attempt | Status | Locality Level | Executor ID / Host | Launch Time | Duration | GC Time | Shuffle Read Size / Records | Write Time | Shuffle Write Size / Records | Shuffle Spill (Memory) | Shuffle Spill (Disk)):
  0 | 216 | 0 | RUNNING | PROCESS_LOCAL | 181 / phxaishdc9dn0474.phx.ebay.com | 2015/04/13 06:43:53 | 1.7 h | 13 min | 3.0 GB / 56964921 | - | 0.0 B / 0 | 21.2 GB | 1902.6 MB
  2 | 218 | 0 | SUCCESS | PROCESS_LOCAL | 582 / phxaishdc9dn0235.phx.ebay.com | 2015/04/13 06:43:53 | 15 min | 31 s | 2.2 GB / 1666851 | 0.1 s | 3.0 MB / 2062 | 54.8 GB | 1924.5 MB
  1 | 217 | 0 | SUCCESS | PROCESS_LOCAL | 202 / phxdpehdc9dn2683.stratus.phx.ebay.com | 2015/04/13 06:43:53 | 19 min | 1.3 min | 2.2 GB / 1687086 | 75 ms | 3.9 MB / 2692 | 33.7 GB | 1960.4 MB
  4 | 220 | 0 | SUCCESS | PROCESS_LOCAL | 218 / phxaishdc9dn0855.phx.ebay.com | 2015/04/13 06:43:53 | 15 min | 56 s | 2.2 GB / 1675654 | 40 ms | 3.3 MB / 2260 | 26.2 GB | 1928.4 MB



 Command:
 ./bin/spark-submit -v --master yarn-cluster --driver-class-path
 

Re: Map-Side Join in Spark

2015-04-20 Thread ayan guha
In my understanding you need to create a key out of the data and
repartition both datasets to achieve map side join.
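
If one side is small enough to fit in memory, there is also the broadcast (hash) variant; a self-contained hedged sketch, not taken from any particular thread:

import org.apache.spark.{SparkConf, SparkContext}

object MapSideJoinSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("MapSideJoinSketch").setMaster("local[2]"))
    val small = sc.parallelize(Seq((1L, "a"), (2L, "b")))           // must fit in driver memory
    val big   = sc.parallelize(Seq((1L, 10), (1L, 11), (2L, 20), (3L, 30)))

    val smallMap = sc.broadcast(small.collectAsMap())               // shipped once per executor
    val joined = big.mapPartitions { iter =>
      val lookup = smallMap.value
      iter.flatMap { case (k, v) => lookup.get(k).map(s => (k, (v, s))) }  // no shuffle
    }
    joined.collect().foreach(println)
    sc.stop()
  }
}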
On 21 Apr 2015 14:10, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote:

 Can someone share their working code of Map Side join in Spark + Scala.
 (No Spark-SQL)

 The only resource I could find was this (open it in Chrome with the
 Chinese-to-English translator):

 http://dongxicheng.org/framework-on-yarn/apache-spark-join-two-tables/



 --
 Deepak




Spark and accumulo

2015-04-20 Thread madhvi

Hi all,

Is there any way to integrate Spark with Accumulo, or to have Spark
process data stored in Accumulo?


Thanks
Madhvi Gupta

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Instantiating/starting Spark jobs programmatically

2015-04-20 Thread firemonk9
I have built a data analytics SaaS platform by creating REST endpoints; based
on the type of job request, I invoke the necessary Spark job(s) and return the
results as JSON (asynchronously). I used yarn-client mode to submit the jobs to
the YARN cluster.

hope this helps. 

   



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Instantiating-starting-Spark-jobs-programmatically-tp22577p22584.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Equi Join is taking for ever. 1 Task is Running while other 199 are complete

2015-04-20 Thread ๏̯͡๏
After the above changes

1) filter shows this
Tasks (Index | ID | Attempt | Status | Locality | Executor ID / Host | Launch Time | Duration | GC Time | Input Size / Records | Write Time | Shuffle Write Size / Records):
0 | 1 | 0 | SUCCESS | ANY | 1 / phxaishdc9dn1571.stratus.phx.ebay.com | 2015/04/20 20:55:31 | 7.4 min | 21 s | 129.7 MB (hadoop) / 100 | 18 s | 1106.2 MB / 718687
2) lstgItem.join(viEvents).map [Equi Join] shows this

Tasks (Index | ID | Attempt | Status | Locality | Executor ID / Host | Launch Time | Duration | GC Time | Shuffle Read Size / Records | Write Time | Shuffle Write Size / Records | Shuffle Spill (Memory) | Shuffle Spill (Disk)):
0 | 17 | 0 | RUNNING | PROCESS_LOCAL | 8 / phxaishdc9dn0556.phx.ebay.com | 2015/04/20 21:02:56 | 4.3 min | 20 s | 1097.3 MB / 55906817 | - | 0.0 B / 0 | 0.0 B | 0.0 B
1 | 18 | 0 | SUCCESS | PROCESS_LOCAL | 3 / phxaishdc9dn0374.phx.ebay.com | 2015/04/20 21:02:56 | 1.4 min | 1 s | 251.0 MB / 831341 | 2 ms | 377.8 KB / 226 | 9.6 GB | 173.3 MB
2 | 19 | 0 | SUCCESS | PROCESS_LOCAL | 9 / phxaishdc9dn0121.phx.ebay.com | 2015/04/20 21:02:56 | 2.1 min | 4 s | 250.6 MB / 830896 | 89 ms | 280.4 KB / 168 | 4.4 GB | 267.9 MB
3 | 20 | 0 | SUCCESS | PROCESS_LOCAL | 4 / phxaishdc9dn1703.stratus.phx.ebay.com | 2015/04/20 21:02:56 | 1.9 min | 1.0 s | 250.6 MB / 831180 | 2 ms | 330.3 KB / 198 | 7.4 GB | 285.2 MB
4 | 21 | 0 | SUCCESS | PROCESS_LOCAL | 5 / phxaishdc9dn1350.stratus.phx.ebay.com | 2015/04/20 21:02:56 | 2.1 min | 3 s | 249.7 MB / 830966 | 3 ms | 303.9 KB / 182 | 3.7 GB | 282.8 MB

Task 0/17 will run for 30 minutes.
I was wondering if increasing the input data size & executors will solve
this problem?
Will a map-side join help?


On Tue, Apr 21, 2015 at 9:23 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote:

 It also is a little more evidence to Jonathan's suggestion that there is a
 null / 0 record that is getting grouped together.

 To fix this, do I need to run a filter?

 val viEventsRaw = details.map { vi => (vi.get(14).asInstanceOf[Long], vi) }

 val viEvents = viEventsRaw.filter { case (itemId, viEvent) => itemId != 0 }
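 A quick sketch (using the names above) for confirming the skew before filtering: count the records per key and look at the heaviest keys.

   val keyCounts = viEventsRaw.map { case (itemId, _) => (itemId, 1L) }.reduceByKey(_ + _)
   keyCounts.sortBy(_._2, ascending = false).take(20).foreach(println)
   // a key such as 0 (or a null-like catch-all) with a huge count indicates skew;
   // the same check can be run on lstgItem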



 On Wed, Apr 15, 2015 at 2:04 AM, Imran Rashid iras...@cloudera.com
 wrote:

 Shuffle write could be a good indication of skew, but it looks like the
 task in question hasn't generated any shuffle write yet, because its still
 working on the shuffle-read side.   So I wouldn't read too much into the
 fact that the shuffle write is 0 for a task that is still running.

 The shuffle read is larger than for the other tasks (3.0GB vs. 2.2 GB, or
 more importantly, 55M records vs 1M records).  So it might not be that the
 raw data volume is much higher on that task, but its getting a ton more
 small records, which will also generate a lot of work.  It also is a little
 more evidence to Jonathan's suggestion that there is a null / 0 record that
 is getting grouped together.


 On Mon, Apr 13, 2015 at 12:40 PM, Jonathan Coveney jcove...@gmail.com
 wrote:

 I'm not 100% sure of spark's implementation but in the MR frameworks, it
 would have a much larger shuffle write size becasue that node is dealing
 with a lot more data and as a result has a lot  more to shuffle

 2015-04-13 13:20 GMT-04:00 java8964 java8...@hotmail.com:

 If it is really due to data skew, will the hanging task have a much bigger
 Shuffle Write Size in this case?

 In this case, the shuffle write size for that task is 0, and the rest of
 the I/O for this task is not much larger than that of the quickly finished
 tasks; is that normal?

 I am also interested in this case: from the statistics on the UI, how can
 one tell that the task could have skewed data?

 Yong

 --
 Date: Mon, 13 Apr 2015 12:58:12 -0400
 Subject: Re: Equi Join is taking for ever. 1 Task is Running while
 other 199 are complete
 From: jcove...@gmail.com
 To: deepuj...@gmail.com
 CC: user@spark.apache.org


 I can promise you that this is also a problem in the pig world :) not
 sure why it's not a problem for this data set, though... are you sure that
 the two are doing the exact same code?

 you should inspect your source data. Make a histogram for each and see
 what the data distribution looks like. If there is a value or bucket with a
 disproportionate set of values you know you have an issue

 2015-04-13 12:50 GMT-04:00 ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com:

 You mean there is a tuple in either RDD, that has itemID = 0 or null ?
 And what is catch all ?

 That implies is it a good idea to run a filter on each RDD first ? We
 do not do this using Pig on M/R. Is it required in Spark world ?

 On Mon, Apr 13, 2015 at 9:58 PM, Jonathan Coveney jcove...@gmail.com
 wrote:

 My guess would be data skew. Do you know if there is some item id that
 is a catch all? can it be null? item id 0? lots of data sets have this sort
 of value and it always kills joins

 2015-04-13 11:32 GMT-04:00 ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com:

 Code:

 val viEventsWithListings: RDD[(Long, (DetailInputRecord, VISummary,
 Long))] = lstgItem.join(viEvents).map {
   case (itemId, (listing, viDetail)) =
 val viSummary = new VISummary
 viSummary.leafCategoryId = 

Map-Side Join in Spark

2015-04-20 Thread ๏̯͡๏
Can someone share their working code of Map Side join in Spark + Scala. (No
Spark-SQL)

The only resource i could find was this (Open in chrome with Chinese to
english translator)

http://dongxicheng.org/framework-on-yarn/apache-spark-join-two-tables/



-- 
Deepak


Re: Map-Side Join in Spark

2015-04-20 Thread Punyashloka Biswal
Could you do it using flatMap?

Punya

On Tue, Apr 21, 2015 at 12:19 AM ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote:

 The reason am asking this is, i am not able to understand how do i do a
 skip.

 1) Broadcast small table-1 as map.
 2) I jun do .map() on large table-2.
When you do .map() you must map each element to a new element.
  However with map-side join, when i get the broadcasted map, i will search
 in it with a key, and if that element in not found in map then i want to
 skip that input all together. (This is what happens when you do .join, it
 skips automatically). With map side join you need to do it. I am assuming
 you do it with mapPartitions  yield.

 A working code will help me understand it better.

 On Tue, Apr 21, 2015 at 9:40 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com
 wrote:

 Can someone share their working code of Map Side join in Spark + Scala.
 (No Spark-SQL)

 The only resource i could find was this (Open in chrome with Chinese to
 english translator)

 http://dongxicheng.org/framework-on-yarn/apache-spark-join-two-tables/



 --
 Deepak




 --
 Deepak




Re: Map-Side Join in Spark

2015-04-20 Thread ๏̯͡๏
I did this


val lstgItemMap = listings.map { lstg => (lstg.getItemId().toLong, lstg) }.collectAsMap
val broadCastMap = sc.broadcast(lstgItemMap)

val viEventsWithListings: RDD[(Long, (DetailInputRecord, VISummary, Long))] =
  viEvents.mapPartitions({ iter =>
    val lstgItemMap = broadCastMap.value
    for {
      (itemId, viDetail) <- iter
      if (lstgItemMap.contains(itemId))
    } yield ({
      val listing = lstgItemMap.get(itemId).get
      val viSummary = new VISummary
      viSummary.leafCategoryId = listing.getLeafCategId().toInt
      viSummary.itemSiteId = listing.getItemSiteId().toInt
      viSummary.auctionTypeCode = listing.getAuctTypeCode().toInt
      viSummary.sellerCountryId = listing.getSlrCntryId().toInt
      viSummary.buyerSegment = 0
      viSummary.isBin = (if (listing.getBinPriceLstgCurncy.doubleValue() > 0) 1 else 0)

      val sellerId = listing.getSlrId.toLong
      (sellerId, (viDetail, viSummary, itemId))
    })
  })


Earlier:

val viEvents = details.map { vi => (vi.get(14).asInstanceOf[Long], vi) }
val lstgItem = listings.map { lstg => (lstg.getItemId().toLong, lstg) }

val viEventsWithListings: RDD[(Long, (DetailInputRecord, VISummary, Long))] =
  lstgItem.join(viEvents).map {
    case (itemId, (listing, viDetail)) =>
      val viSummary = new VISummary
      viSummary.leafCategoryId = listing.getLeafCategId().toInt
      viSummary.itemSiteId = listing.getItemSiteId().toInt
      viSummary.auctionTypeCode = listing.getAuctTypeCode().toInt
      viSummary.sellerCountryId = listing.getSlrCntryId().toInt
      viSummary.buyerSegment = 0
      viSummary.isBin = (if (listing.getBinPriceLstgCurncy.doubleValue() > 0) 1 else 0)
      val sellerId = listing.getSlrId.toLong
      (sellerId, (viDetail, viSummary, itemId))
  }


Waiting for run to complete.


On Tue, Apr 21, 2015 at 9:54 AM, Punyashloka Biswal punya.bis...@gmail.com
wrote:

 Could you do it using flatMap?

 Punya

 On Tue, Apr 21, 2015 at 12:19 AM ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com
 wrote:

 The reason am asking this is, i am not able to understand how do i do a
 skip.

 1) Broadcast small table-1 as map.
 2) I jun do .map() on large table-2.
When you do .map() you must map each element to a new element.
  However with map-side join, when i get the broadcasted map, i will
 search in it with a key, and if that element in not found in map then i
 want to skip that input all together. (This is what happens when you do
 .join, it skips automatically). With map side join you need to do it. I am
 assuming you do it with mapPartitions  yield.

 A working code will help me understand it better.

 On Tue, Apr 21, 2015 at 9:40 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com
 wrote:

 Can someone share their working code of Map Side join in Spark + Scala.
 (No Spark-SQL)

 The only resource i could find was this (Open in chrome with Chinese to
 english translator)

 http://dongxicheng.org/framework-on-yarn/apache-spark-join-two-tables/



 --
 Deepak




 --
 Deepak




-- 
Deepak


Re: invalidate caching for hadoopFile input?

2015-04-20 Thread ayan guha
You can use rdd.unpersist(); it's documented in the Spark programming guide
under the Removing Data section.
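For the case in the question that would look roughly like this (spark-avro's avroFile as used above; the path is a placeholder):

  data.unpersist()                                            // drop the cached copy
  val refreshed = sqlContext.avroFile("hdfs://path/to/dir/*").cache()
  refreshed.count()                                           // re-reads the directory, picking up new files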

Ayan
On 21 Apr 2015 13:16, Wei Wei vivie...@gmail.com wrote:

 Hey folks,

 I am trying to load a directory of avro files like this in spark-shell:

 val data = sqlContext.avroFile(hdfs://path/to/dir/*).cache
 data.count

 This works fine, but when more files are uploaded to that directory
 running these two lines again yields the same result. I suspect there
 is some metadata caching in HadoopRDD, thus new files are ignored.

 Does anyone know why this is happening? Is there a way to force reload
 the whole directory without restarting spark-shell?

 Thanks.
 W

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




invalidate caching for hadoopFile input?

2015-04-20 Thread Wei Wei
Hey folks,

I am trying to load a directory of avro files like this in spark-shell:

val data = sqlContext.avroFile(hdfs://path/to/dir/*).cache
data.count

This works fine, but when more files are uploaded to that directory
running these two lines again yields the same result. I suspect there
is some metadata caching in HadoopRDD, thus new files are ignored.

Does anyone know why this is happening? Is there a way to force reload
the whole directory without restarting spark-shell?

Thanks.
W

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Map-Side Join in Spark

2015-04-20 Thread ๏̯͡๏
The reason I am asking this is that I am not able to understand how to do a
skip.

1) Broadcast the small table-1 as a map.
2) Just do .map() on the large table-2.
   When you do .map() you must map each element to a new element.
However, with a map-side join, when I get the broadcasted map I will search
in it with a key, and if that element is not found in the map then I want to
skip that input altogether. (This is what happens when you do .join; it
skips automatically.) With a map-side join you need to do it yourself. I am
assuming you do it with mapPartitions & yield.

A working code will help me understand it better.
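A minimal sketch of the skip (assuming the small table fits in memory on the driver; names follow this thread): broadcast it as a map and use flatMap, emitting nothing when the key is missing.

  val smallMap = lstgItem.collectAsMap()            // Map[itemId -> listing]
  val bcMap = sc.broadcast(smallMap)

  val joined = viEvents.flatMap { case (itemId, viDetail) =>
    bcMap.value.get(itemId) match {
      case Some(listing) => Some((itemId, (listing, viDetail)))  // key found: keep it
      case None          => None                                 // key missing: skip this record
    }
  }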

On Tue, Apr 21, 2015 at 9:40 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote:

 Can someone share their working code of Map Side join in Spark + Scala.
 (No Spark-SQL)

 The only resource i could find was this (Open in chrome with Chinese to
 english translator)

 http://dongxicheng.org/framework-on-yarn/apache-spark-join-two-tables/



 --
 Deepak




-- 
Deepak


Re: Map-Side Join in Spark

2015-04-20 Thread ๏̯͡๏
What is re-partition ?

On Tue, Apr 21, 2015 at 10:23 AM, ayan guha guha.a...@gmail.com wrote:

 In my understanding you need to create a key out of the data and
 repartition both datasets to achieve map side join.
 On 21 Apr 2015 14:10, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote:

 Can someone share their working code of Map Side join in Spark + Scala.
 (No Spark-SQL)

 The only resource i could find was this (Open in chrome with Chinese to
 english translator)

 http://dongxicheng.org/framework-on-yarn/apache-spark-join-two-tables/



 --
 Deepak




-- 
Deepak


Re: Streaming problems running 24x7

2015-04-20 Thread Luis Ángel Vicente Sánchez
You have a window operation; I have seen that behaviour before with window
operations in spark streaming. My solution was to move away from window
operations using probabilistic data structures; it might not be an option
for you.

2015-04-20 10:29 GMT+01:00 Marius Soutier mps@gmail.com:

 The processing speed displayed in the UI doesn’t seem to take everything
 into account. I also had a low processing time but had to increase batch
 duration from 30 seconds to 1 minute because waiting batches kept
 increasing. Now it runs fine.

 On 17.04.2015, at 13:30, González Salgado, Miquel 
 miquel.gonza...@tecsidel.es wrote:

 Hi,

 Thank you for your response,
 I think it is not because of the processing speed; in fact the delay is
 under 1 second, while the batch interval is 10 seconds… The data volume is
 low (10 lines / second).

 Changing to local[8] made the problem worse (CPU increased more quickly).

 By the way, I have seen some results after changing to this call of KafkaUtils:

 KafkaUtils.createDirectStream

 CPU usage is low and stable, but memory is slowly increasing… But at least
 the process lasts longer.

 Best regards,
 Miquel


 *From:* bit1...@163.com [mailto:bit1...@163.com]
 *Sent:* Thursday, April 16, 2015 10:58
 *To:* González Salgado, Miquel; user
 *Subject:* Re: Streaming problems running 24x7

 From your description, looks like the data processing speed is far behind
 the data receiving speed

 Could you try to increase the core number when you submit the application?
 such as local[8]?

 --
 bit1...@163.com


 *From:* Miquel miquel.gonza...@tecsidel.es
 *Date:* 2015-04-16 16:39
 *To:* user user@spark.apache.org
 *Subject:* Streaming problems running 24x7
 Hello,
 I'm having problems running a Spark Streaming job for more than a few hours
 (3 or 4). It begins working OK, but it degrades until failure. Some of the
 symptoms:

 - Consumed memory and CPU keeps getting higher ang higher, and finally some
 error is being thrown (java.lang.Exception: Could not compute split, block
 input-0-1429168311800 not found) and data stops being calculated.

 - The delay showed in web UI keeps also increasing.

 - After some hours disk space is being consumed. There are a lot of
 directories with name like
 /tmp/spark-e3505437-f509-4b5b-92d2-ae2559badb3c

 The job is basically reading information from kafka topic, and calculate
 several topN tables for some key and value camps related with netflow data,
 some of the parameters are this:
 - batch interval: 10 seconds
 - window calculation: 1 minute
 - spark.cleaner.ttl: 5 minutes

 The execution is standalone on one machine (16GB RAM , 12 cores), and the
 options to run it is as follows:
 /opt/spark/bin/spark-submit --driver-java-options -XX:+UseCompressedOops
 --jars $JARS --class $APPCLASS --master local[2] $APPJAR

 Does anyone have any clues about the problem? I don't know if it is a
 configuration problem or an error in the code that is causing memory
 leaks.

 Thank you in advance!
 Miquel

 PS: the code is basically this:

 object NetflowTopn {

  var appPath = "."
  var zkQuorum = ""
  var group = ""
  var topics = ""
  var numThreads = 1

  var batch_interval = 10
  var n_window = 1
  var n_slide = 1
  var topnsize = 10

  var hm = Map[String, Int]()
  hm += ( "unix_secs"   -> 0 )
  hm += ( "unix_nsecs"  -> 1 )
  hm += ( "sysuptime"   -> 2 )
  hm += ( "exaddr"      -> 3 )
  hm += ( "dpkts"       -> 4 )
  hm += ( "doctets"     -> 5 )
  hm += ( "first"       -> 6 )
  hm += ( "last"        -> 7 )
  hm += ( "engine_type" -> 8 )
  hm += ( "engine_id"   -> 9 )
  hm += ( "srcaddr"     -> 10 )
  hm += ( "dstaddr"     -> 11 )
  hm += ( "nexthop"     -> 12 )
  hm += ( "input"       -> 13 )
  hm += ( "output"      -> 14 )
  hm += ( "srcport"     -> 15 )
  hm += ( "dstport"     -> 16 )
  hm += ( "prot"        -> 17 )
  hm += ( "tos"         -> 18 )
  hm += ( "tcp_flags"   -> 19 )
  hm += ( "src_mask"    -> 20 )
  hm += ( "dst_mask"    -> 21 )
  hm += ( "src_as"      -> 22 )
  hm += ( "dst_as"      -> 23 )

  def getKey(lcamps: Array[String], camp: String): String = {
    if (camp == "total") return "total"
    else return lcamps(hm(camp))
  }

  def getVal(lcamps: Array[String], camp: String): Long = {
    if (camp == "flows") return 1L
    else return lcamps(hm(camp)).toLong
  }

  def getKeyVal(line: String, keycamps: List[String], valcamp: String) = {
    val arr = line.split(",")
    (keycamps.map(getKey(arr, _)).mkString(","), getVal(arr, valcamp))
  }

  def writeOutput(data: Array[(Long, String)], keycamps_str: String,
                  csvheader: String, valcamp: String, prefix: String) = {

    val ts = System.currentTimeMillis
    val f1 = appPath + "/data/" + prefix + "_" + keycamps_str + "_" + valcamp + ".csv"
    val f1f = new File(f1)
    val ftmpf = new File(f1 + ts)
    val pw = new PrintWriter(ftmpf)
    pw.println(csvheader)
    data.foreach {
      t =>

Re: writing to hdfs on master node much faster

2015-04-20 Thread Sean Owen
What machines are HDFS data nodes -- just your master? that would
explain it. Otherwise, is it actually the write that's slow or is
something else you're doing much faster on the master for other
reasons maybe? like you're actually shipping data via the master first
in some local computation? so the master's executor has the result
much faster?

On Mon, Apr 20, 2015 at 12:21 PM, jamborta jambo...@gmail.com wrote:
 Hi all,

 I have a three node cluster with identical hardware. I am trying a workflow
 where it reads data from hdfs, repartitions it and runs a few map operations
 then writes the results back to hdfs.

 It looks like that all the computation, including the repartitioning and the
 maps complete within similar time intervals on all the nodes, except when it
 writes it back to HDFS when the master node does the job way much faster
 then the slaves (15s for each block as opposed to 1.2 min for the slaves).

 Any suggestion what the reason might be?

 thanks,



 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/writing-to-hdfs-on-master-node-much-faster-tp22570.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



When the old data dropped from the cache?

2015-04-20 Thread Tash Chainar
Hi all,

On https://spark.apache.org/docs/latest/programming-guide.html
under the RDD Persistence > Removing Data section, it states:

"Spark automatically monitors cache usage on each node and drops out old
 data partitions in a least-recently-used (LRU) fashion."


 Can it be understood that the cache will be automatically refreshed with
new data? If yes, when and how? How does Spark determine which data is old?
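A small sketch contrasting LRU eviction with explicit removal (the path is a placeholder):

  val rdd = sc.textFile("hdfs://some/path").cache()
  rdd.count()        // materializes the cached partitions
  // cached partitions are only dropped when storage memory runs short (LRU),
  // or explicitly:
  rdd.unpersist()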

Regards.


Re: writing to hdfs on master node much faster

2015-04-20 Thread Tamas Jambor
Not sure what would slow it down, as the repartition completes equally fast
on all nodes, implying that the data is available on all of them; then there
are a few computation steps, none of them local to the master.

On Mon, Apr 20, 2015 at 12:57 PM, Sean Owen so...@cloudera.com wrote:

 What machines are HDFS data nodes -- just your master? that would
 explain it. Otherwise, is it actually the write that's slow or is
 something else you're doing much faster on the master for other
 reasons maybe? like you're actually shipping data via the master first
 in some local computation? so the master's executor has the result
 much faster?

 On Mon, Apr 20, 2015 at 12:21 PM, jamborta jambo...@gmail.com wrote:
  Hi all,
 
  I have a three node cluster with identical hardware. I am trying a
 workflow
  where it reads data from hdfs, repartitions it and runs a few map
 operations
  then writes the results back to hdfs.
 
  It looks like that all the computation, including the repartitioning and
 the
  maps complete within similar time intervals on all the nodes, except
 when it
  writes it back to HDFS when the master node does the job way much faster
  then the slaves (15s for each block as opposed to 1.2 min for the
 slaves).
 
  Any suggestion what the reason might be?
 
  thanks,
 
 
 
  --
  View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/writing-to-hdfs-on-master-node-much-faster-tp22570.html
  Sent from the Apache Spark User List mailing list archive at Nabble.com.
 
  -
  To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
  For additional commands, e-mail: user-h...@spark.apache.org
 



RE: writing to hdfs on master node much faster

2015-04-20 Thread Evo Eftimov
Check whether your partitioning results in balanced partitions, i.e. partitions
with similar sizes. One of the reasons for the performance differences you
observed may be that, after your explicit repartitioning, the partition on your
master node is much smaller than the RDD partitions on the other 2 nodes.
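A rough way to check that (rdd stands for the repartitioned dataset; a sketch only):

  val partitionSizes = rdd
    .mapPartitionsWithIndex { (idx, it) => Iterator((idx, it.size)) }
    .collect()
  partitionSizes.sortBy(-_._2).foreach { case (idx, n) => println(s"partition $idx: $n records") }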

-Original Message-
From: Sean Owen [mailto:so...@cloudera.com] 
Sent: Monday, April 20, 2015 12:57 PM
To: jamborta
Cc: user@spark.apache.org
Subject: Re: writing to hdfs on master node much faster

What machines are HDFS data nodes -- just your master? that would explain it. 
Otherwise, is it actually the write that's slow or is something else you're 
doing much faster on the master for other reasons maybe? like you're actually 
shipping data via the master first in some local computation? so the master's 
executor has the result much faster?

On Mon, Apr 20, 2015 at 12:21 PM, jamborta jambo...@gmail.com wrote:
 Hi all,

 I have a three node cluster with identical hardware. I am trying a 
 workflow where it reads data from hdfs, repartitions it and runs a few 
 map operations then writes the results back to hdfs.

 It looks like that all the computation, including the repartitioning 
 and the maps complete within similar time intervals on all the nodes, 
 except when it writes it back to HDFS when the master node does the 
 job way much faster then the slaves (15s for each block as opposed to 1.2 min 
 for the slaves).

 Any suggestion what the reason might be?

 thanks,



 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/writing-to-hdfs-on
 -master-node-much-faster-tp22570.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For 
 additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional 
commands, e-mail: user-h...@spark.apache.org



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Spark 1.3.1 - SQL Issues

2015-04-20 Thread ayan guha
Hi
Just upgraded to Spark 1.3.1.

I am getting an warning

Warning (from warnings module):
  File
D:\spark\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\python\pyspark\sql\context.py,
line 191
warnings.warn(inferSchema is deprecated, please use createDataFrame
instead)
UserWarning: inferSchema is deprecated, please use createDataFrame instead

However, documentation still says to use inferSchema.
Here: http://spark.apache.org/docs/latest/sql-programming-guide.htm in
section

Also, I am getting an error in the mllib ALS.train function when passing a
DataFrame (do I need to convert the DF to an RDD?)

Code:
training = ssc.sql(select userId,movieId,rating from ratings where
partitionKey  6).cache()
print type(training)
model = ALS.train(training,rank,numIter,lmbda)

Error:
class 'pyspark.sql.dataframe.DataFrame'
Rank:8 Lmbda:1.0 iteration:10

Traceback (most recent call last):
  File D:\Project\Spark\code\movie_sql.py, line 109, in module
bestConf = getBestModel(sc,ssc,training,validation,validationNoRating)
  File D:\Project\Spark\code\movie_sql.py, line 54, in getBestModel
model = ALS.train(trainingRDD,rank,numIter,lmbda)
  File
D:\spark\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\python\pyspark\mllib\recommendation.py,
line 139, in train
model = callMLlibFunc(trainALSModel, cls._prepare(ratings), rank,
iterations,
  File
D:\spark\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\python\pyspark\mllib\recommendation.py,
line 127, in _prepare
assert isinstance(ratings, RDD), ratings should be RDD
AssertionError: ratings should be RDD
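A minimal sketch of the conversion the assertion seems to ask for (column names taken from the query above; the variable names are otherwise the ones already in the script):

  from pyspark.mllib.recommendation import ALS, Rating

  ratings_rdd = training.rdd.map(
      lambda r: Rating(int(r.userId), int(r.movieId), float(r.rating)))
  model = ALS.train(ratings_rdd, rank, numIter, lmbda)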

-- 
Best Regards,
Ayan Guha


Re: Running spark over HDFS

2015-04-20 Thread SURAJ SHETH
I think the memory requested by your job, 2.0 GB, is higher than what is
available.
Please request 256 MB explicitly while creating the Spark Context and try
again.

Thanks and Regards,
Suraj Sheth

On Mon, Apr 20, 2015 at 2:44 PM, madhvi madhvi.gu...@orkash.com wrote:

  PFA screenshot of my cluster UI

 Thanks
 On Monday 20 April 2015 02:27 PM, Akhil Das wrote:

  Are you seeing your task being submitted to the UI? Under completed or
 running tasks? How much resources are you allocating for your job? Can you
 share a screenshot of your cluster UI and the code snippet that you are
 trying to run?

  Thanks
 Best Regards

 On Mon, Apr 20, 2015 at 12:37 PM, madhvi madhvi.gu...@orkash.com wrote:

  Hi,

 I Did the same you told but now it is giving the following error:
 ERROR TaskSchedulerImpl: Exiting due to error from cluster scheduler: All
 masters are unresponsive! Giving up.

 On UI it is showing that master is working

 Thanks
 Madhvi

 On Monday 20 April 2015 12:28 PM, Akhil Das wrote:

  In your eclipse, while you create your SparkContext, set the master uri
 as shown in the web UI's top left corner like: spark://someIPorHost:7077
 and it should be fine.

  Thanks
 Best Regards

 On Mon, Apr 20, 2015 at 12:22 PM, madhvi madhvi.gu...@orkash.com wrote:

 Hi All,

 I am new to spark and have installed spark cluster over my system having
 hadoop cluster.I want to process data stored in HDFS through spark.

 When I am running code in eclipse it is giving the following warning
 repeatedly:
 scheduler.TaskSchedulerImpl: Initial job has not accepted any resources;
 check your cluster UI to ensure that workers are registered and have
 sufficient resources.

 I have made changes to spark-env.sh file as below
 export SPARK_WORKER_INSTANCES=1
 export HADOOP_CONF_DIR=/root/Documents/hadoop/etc/hadoop
 export SPARK_WORKER_MEMORY=1g
 export SPARK_WORKER_CORES=2
 export SPARK_EXECUTOR_MEMORY=1g

 I am running the spark standalone cluster.In cluster UI it is showing
 all workers with allocated resources but still its not working.
 what other configurations are needed to be changed?

 Thanks
 Madhvi Gupta

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org







 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org



Re: Running spark over HDFS

2015-04-20 Thread Archit Thakur
There are lots of similar problems shared and resolved by users on this same
portal. I have been part of those discussions before. Search for those, please
try them, and let us know if you still face problems.

Thanks and Regards,
Archit Thakur.

On Mon, Apr 20, 2015 at 3:05 PM, madhvi madhvi.gu...@orkash.com wrote:

  On Monday 20 April 2015 02:52 PM, SURAJ SHETH wrote:

   Hi Madhvi,
 I think the memory requested by your job, i.e. 2.0 GB is higher than what
 is available.
 Please request for 256 MB explicitly while creating Spark Context and try
 again.

  Thanks and Regards,
 Suraj Sheth


   Tried the same but still no luck:|

 Madhvi



Re: mapPartitions vs foreachPartition

2015-04-20 Thread Archit Thakur
The same difference as between map and foreach: mapPartitions takes an
iterator and returns an iterator, while foreachPartition takes an iterator and
returns Unit.
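A minimal sketch of both (the connection helpers are hypothetical):

  // transformation: returns a new RDD
  val lengths = lines.mapPartitions { iter => iter.map(_.length) }

  // action: returns Unit, typically used for per-partition side effects
  lines.foreachPartition { iter =>
    val conn = openConnection()          // hypothetical helper, e.g. one DB connection per partition
    iter.foreach(line => conn.send(line))
    conn.close()
  }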

On Mon, Apr 20, 2015 at 4:05 PM, Arun Patel arunp.bigd...@gmail.com wrote:

 What is difference between mapPartitions vs foreachPartition?

 When to use these?

 Thanks,
 Arun



Re: Running spark over HDFS

2015-04-20 Thread madhvi

On Monday 20 April 2015 03:18 PM, Archit Thakur wrote:
There are lot of similar problems shared and resolved by users on this 
same portal. I have been part of those discussions before, Search 
those, Please Try them and let us know, if you still face problems.


Thanks and Regards,
Archit Thakur.

On Mon, Apr 20, 2015 at 3:05 PM, madhvi madhvi.gu...@orkash.com wrote:


On Monday 20 April 2015 02:52 PM, SURAJ SHETH wrote:

Hi Madhvi,
I think the memory requested by your job, i.e. 2.0 GB is higher
than what is available.
Please request for 256 MB explicitly while creating Spark Context
and try again.

Thanks and Regards,
Suraj Sheth



Tried the same but still no luck:|

Madhvi



Hi,

It's still not working. I don't see where I am mistaken or what I am doing
wrong. Following are the configurations in my spark-env.sh file:

export HADOOP_CONF_DIR=/root/Documents/hadoop/etc/hadoop
export SPARK_EXECUTOR_MEMORY=256m
export SPARK_DRIVER_MEMORY=256m
I am running the command on the shell:
./bin/spark-submit --class Spark.testSpark.JavaWordCount --master 
yarn-client --num-executors 2 --driver-memory 256m 
--executor-memory 256m --executor-cores 1 lib/Untitled.jar


Madhvi



RE: SparkStreaming onStart not being invoked on CustomReceiver attached to master with multiple workers

2015-04-20 Thread Ankit Patel
The code I've written is simple: it just starts a thread and calls the store
method on the Receiver class.

I see this code (with printlns) working fine when I try:
spark-submit --jars jar --class test.TestCustomReceiver jar

However, it does not work when I try the same command above with --master
spark://masterURL:
spark-submit --master spark://masterURL --jars jar --class test.TestCustomReceiver jar

I also tried setting the master in the conf that I created, but that does not
work either. I do not see the onStart println being printed when I use the
--master option. Please advise.

Also, the master I am attaching to has multiple workers across hosts with many
threads available to it.

The code is pasted below (Classes: TestReceiver, TestCustomReceiver):


package test;

import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.receiver.Receiver;

public class TestReceiver extends Receiver<String> {

  public TestReceiver() {
    super(StorageLevel.MEMORY_ONLY());
    System.out.println("Ankit: Created TestReceiver");
  }

  @Override
  public void onStart() {
    System.out.println("Start TestReceiver");
    new TestThread().start();
  }

  public void onStop() {}

  @SuppressWarnings("unused")
  private class TestThread extends Thread {
    @Override
    public void run() {
      while (true) {
        try {
          sleep((long) (Math.random() * 3000));
        } catch (Exception e) {
          e.printStackTrace();
        }
        store("Time: " + System.currentTimeMillis());
      }
    }
  }
}


package test;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class TestCustomReceiver {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf();
    JavaStreamingContext ssc = new JavaStreamingContext(conf, new Duration(1000));
    TestReceiver receiver = new TestReceiver();
    JavaReceiverInputDStream<String> stream = ssc.receiverStream(receiver);
    stream.map(new Function() {
      @Override
      public Object call(Object arg0) throws Exception {
        System.out.println("Received: " + arg0);
        return arg0;
      }
    }).foreachRDD(new Function() {
      @Override
      public Object call(Object arg0) throws Exception {
        System.out.println("Total Count: " + ((org.apache.spark.api.java.JavaRDD) arg0).count());
        return arg0;
      }
    });
    ssc.start();
    ssc.awaitTermination();
  }
}

Thanks,Ankit

Date: Mon, 20 Apr 2015 12:22:03 +0530
Subject: Re: SparkStreaming onStart not being invoked on CustomReceiver 
attached to master with multiple workers
From: ak...@sigmoidanalytics.com
To: patel7...@hotmail.com
CC: user@spark.apache.org

Would be good, if you can paste your custom receiver code and the code that you 
used to invoke it.ThanksBest Regards

On Mon, Apr 20, 2015 at 9:43 AM, Ankit Patel patel7...@hotmail.com wrote:







I am experiencing problem with SparkStreaming (Spark 1.2.0), the onStart method 
is never called on CustomReceiver when calling spark-submit against a master 
node with multiple workers. However, SparkStreaming works fine with no master 
node set. Anyone notice this issue? 
   

  

Re: history-server does't read logs which are on FS

2015-04-20 Thread Serega Sheypak
Thanks, it helped.
We can't use Spark 1.3 because Cassandra DSE doesn't support it.

2015-04-17 21:48 GMT+02:00 Imran Rashid iras...@cloudera.com:

 are you calling sc.stop() at the end of your applications?

 The history server only displays completed applications, but if you don't
 call sc.stop(), it doesn't know that those applications have been stopped.

 Note that in spark 1.3, the history server can also display running
 applications (including completed applications, but that it thinks are
 still running), which improves things a little bit.
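 In practice that just means making sure the context is always stopped, e.g. something like:

   val sc = new SparkContext(conf)
   try {
     // ... job body ...
   } finally {
     sc.stop()   // marks the application as completed for the history server
   }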

 On Fri, Apr 17, 2015 at 10:13 AM, Serega Sheypak serega.shey...@gmail.com
  wrote:

 Hi, started history-server
 Here is UI output:


- *Event log directory:* file:/var/log/spark/applicationHistory/

 No completed applications found!

 Did you specify the correct logging directory? Please verify your setting
 of spark.history.fs.logDirectory and whether you have the permissions to
 access it.
 It is also possible that your application did not run to completion or
 did not stop the SparkContext.

 Spark 1.2.0

 I goto node where server runs and:

 ls -la /var/log/spark/applicationHistory/

 total 44

 drwxrwxrwx 11 root  root4096 Apr 17 14:50 .

 drwxrwxrwx  3 cassandra root4096 Apr 16 15:31 ..

 drwxrwxrwx  2 vagrant   vagrant 4096 Apr 17 10:06 app-20150417100630-

 drwxrwxrwx  2 vagrant   vagrant 4096 Apr 17 11:01 app-20150417110140-0001

 drwxrwxrwx  2 vagrant   vagrant 4096 Apr 17 11:12 app-20150417111216-0002

 drwxrwxrwx  2 vagrant   vagrant 4096 Apr 17 11:14 app-20150417111441-0003

 drwxrwx---  2 vagrant   vagrant 4096 Apr 17 11:20
 *app-20150417112028-0004*

 drwxrwx---  2 vagrant   vagrant 4096 Apr 17 14:17
 *app-20150417141733-0005*

 drwxrwx---  2 vagrant   vagrant 4096 Apr 17 14:32
 *app-20150417143237-0006*

 drwxrwx---  2 vagrant   vagrant 4096 Apr 17 14:49
 *app-20150417144902-0007*

 drwxrwx---  2 vagrant   vagrant 4096 Apr 17 14:50
 *app-20150417145025-0008*


 So there are logs, but history-server doesn't want to display them.

 I've checked workers, they are pointed to that dir also, I run app, I see
 new log.


 Here is history-server log output:

 vagrant@dsenode01:/usr/lib/spark/logs$ cat
 spark-root-org.apache.spark.deploy.history.HistoryServer-1-dsenode01.out

 Spark assembly has been built with Hive, including Datanucleus jars on
 classpath

 Spark Command: java -cp
 ::/usr/lib/spark/sbin/../conf:/usr/lib/spark/lib/spark-assembly-1.2.0-hadoop2.4.0.jar:/usr/lib/spark/lib/datanucleus-api-jdo-3.2.6.jar:/usr/lib/spark/lib/datanucleus-rdbms-3.2.9.jar:/usr/lib/spark/lib/datanucleus-core-3.2.10.jar
 -XX:MaxPermSize=128m -Dspark.akka.logLifecycleEvents=true
 -Dspark.history.fs.logDirectory=/var/log/spark/applicationHistory
 -Dspark.eventLog.enabled=true -Xms512m -Xmx512m
 org.apache.spark.deploy.history.HistoryServer

 


 Using Spark's default log4j profile:
 org/apache/spark/log4j-defaults.properties

 15/04/17 09:55:21 INFO HistoryServer: Registered signal handlers for
 [TERM, HUP, INT]

 15/04/17 09:55:21 INFO SecurityManager: Changing view acls to: root

 15/04/17 09:55:21 INFO SecurityManager: Changing modify acls to: root

 15/04/17 09:55:21 INFO SecurityManager: SecurityManager: authentication
 disabled; ui acls disabled; users with view permissions: Set(root); users
 with modify permissions: Set(root)

 15/04/17 09:55:22 WARN NativeCodeLoader: Unable to load native-hadoop
 library for your platform... using builtin-java classes where applicable

 15/04/17 09:55:24 INFO Utils: Successfully started service on port 18080.

 15/04/17 09:55:24 INFO HistoryServer: Started HistoryServer at
 http://dsenode01:18080


 What could be wrong with it?





Re: mapPartitions vs foreachPartition

2015-04-20 Thread Archit Thakur
True.

On Mon, Apr 20, 2015 at 4:14 PM, Arun Patel arunp.bigd...@gmail.com wrote:

 mapPartitions is a transformation and foreachPartition is a an action?

 Thanks
 Arun

 On Mon, Apr 20, 2015 at 4:38 AM, Archit Thakur archit279tha...@gmail.com
 wrote:

 The same, which is between map and foreach. map takes iterator returns
 iterator foreach takes iterator returns Unit.

 On Mon, Apr 20, 2015 at 4:05 PM, Arun Patel arunp.bigd...@gmail.com
 wrote:

 What is difference between mapPartitions vs foreachPartition?

 When to use these?

 Thanks,
 Arun






SparkSQL performance

2015-04-20 Thread Renato Marroquín Mogrovejo
Hi all,

I have a simple query, "select * from tableX where attribute1 between 0 and
5", that I run over a Kryo file with four partitions that ends up being
around 3.5 million rows in our case.
If I run this query by doing a simple map().filter() it takes around ~9.6
seconds, but when I apply a schema, register the table in a SQLContext, and
then run the query, it takes around ~16 seconds. This is using Spark 1.2.1
with Scala 2.10.0.
I am wondering why there is such a big gap in performance if it is just a
filter. Internally, the relation files are mapped to a JavaBean. Could this
different data representation (JavaBeans vs. SparkSQL's internal
representation) lead to such a difference? Is there anything I could do to
make the performance get closer to the hard-coded option?
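For reference, a rough sketch of the two variants being compared (a case class and the name "records" are used here for brevity, while the original uses JavaBeans; 1.2.x API names may differ slightly):

  case class Rec(attribute1: Int /* , other fields */)

  // hand-coded variant
  val direct = records.filter(r => r.attribute1 >= 0 && r.attribute1 <= 5)

  // SQL variant
  import sqlContext.createSchemaRDD
  records.registerTempTable("tableX")
  val viaSql = sqlContext.sql("SELECT * FROM tableX WHERE attribute1 BETWEEN 0 AND 5")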
Thanks in advance for any suggestions or ideas.


Renato M.


[pyspark] Starting workers in a virtualenv

2015-04-20 Thread Karlson

Hi all,

I am running the Python process that communicates with Spark in a 
virtualenv. Is there any way I can make sure that the Python processes 
of the workers are also started in a virtualenv? Currently I am getting 
ImportErrors when the worker tries to unpickle stuff that is not 
installed system-wide. For now both the worker and the driver run on the 
same machine in local mode.
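One possible approach (assuming the same virtualenv path exists on every machine
that runs a worker; the path is a placeholder) is to point PYSPARK_PYTHON at the
virtualenv's interpreter before starting the application:

  export PYSPARK_PYTHON=/path/to/venv/bin/python
  # then launch pyspark / spark-submit as usual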


Thanks in advance!

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Running spark over HDFS

2015-04-20 Thread Akhil Das
Are you seeing your task being submitted to the UI? Under completed or
running tasks? How much resources are you allocating for your job? Can you
share a screenshot of your cluster UI and the code snippet that you are
trying to run?

Thanks
Best Regards

On Mon, Apr 20, 2015 at 12:37 PM, madhvi madhvi.gu...@orkash.com wrote:

  Hi,

 I Did the same you told but now it is giving the following error:
 ERROR TaskSchedulerImpl: Exiting due to error from cluster scheduler: All
 masters are unresponsive! Giving up.

 On UI it is showing that master is working

 Thanks
 Madhvi

 On Monday 20 April 2015 12:28 PM, Akhil Das wrote:

  In your eclipse, while you create your SparkContext, set the master uri
 as shown in the web UI's top left corner like: spark://someIPorHost:7077
 and it should be fine.
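  A small sketch of what that looks like (host, port and memory values are placeholders):

    val conf = new SparkConf()
      .setAppName("JavaWordCount")
      .setMaster("spark://<master-host>:7077")   // the URL shown at the top of the master web UI
      .set("spark.executor.memory", "256m")      // keep the request within what the workers offer
    val sc = new SparkContext(conf)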

  Thanks
 Best Regards

 On Mon, Apr 20, 2015 at 12:22 PM, madhvi madhvi.gu...@orkash.com wrote:

 Hi All,

 I am new to spark and have installed spark cluster over my system having
 hadoop cluster.I want to process data stored in HDFS through spark.

 When I am running code in eclipse it is giving the following warning
 repeatedly:
 scheduler.TaskSchedulerImpl: Initial job has not accepted any resources;
 check your cluster UI to ensure that workers are registered and have
 sufficient resources.

 I have made changes to spark-env.sh file as below
 export SPARK_WORKER_INSTANCES=1
 export HADOOP_CONF_DIR=/root/Documents/hadoop/etc/hadoop
 export SPARK_WORKER_MEMORY=1g
 export SPARK_WORKER_CORES=2
 export SPARK_EXECUTOR_MEMORY=1g

 I am running the spark standalone cluster.In cluster UI it is showing all
 workers with allocated resources but still its not working.
 what other configurations are needed to be changed?

 Thanks
 Madhvi Gupta

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org






Re: NEWBIE/not able to connect to postgresql using jdbc

2015-04-20 Thread Akhil Das
Try doing sc.addJar("path\to\your\postgres\jar").
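Alternatively, a sketch of passing the driver jar on the command line (the script name is a placeholder; the jar is the one from the original post):

  spark-submit --driver-class-path postgresql-9.2-1002.jdbc3.jar \
               --jars postgresql-9.2-1002.jdbc3.jar \
               your_script.py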

Thanks
Best Regards

On Mon, Apr 20, 2015 at 12:26 PM, shashanksoni shashankso...@gmail.com
wrote:

 I am using a Spark 1.3 standalone cluster on my local Windows machine and am
 trying to load data from one of our servers. Below is my code -

 import os
 os.environ['SPARK_CLASSPATH'] = \
 "C:\Users\ACERNEW3\Desktop\Spark\spark-1.3.0-bin-hadoop2.4\postgresql-9.2-1002.jdbc3.jar"

 from pyspark import SparkContext, SparkConf, SQLContext

 sc = SparkContext(appName="SampleApp")
 sqlContext = SQLContext(sc)
 df = sqlContext.load(source="jdbc",
 url="jdbc:postgresql://54.189.136.218/reporting",
 dbtable="public.course_mast")


 When I run it, it throws error - No suitable driver found for
 jdbc:postgresql. Please help me out.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/NEWBIE-not-able-to-connect-to-postgresql-using-jdbc-tp22569.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




writing to hdfs on master node much faster

2015-04-20 Thread jamborta
Hi all,

I have a three node cluster with identical hardware. I am trying a workflow
that reads data from HDFS, repartitions it, runs a few map operations and
then writes the results back to HDFS.

It looks like all the computation, including the repartitioning and the
maps, completes within similar time intervals on all the nodes, except when
it writes back to HDFS, where the master node does the job much faster
than the slaves (15s for each block as opposed to 1.2 min for the slaves).

Any suggestion what the reason might be?

thanks,



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/writing-to-hdfs-on-master-node-much-faster-tp22570.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Running spark over HDFS

2015-04-20 Thread madhvi

On Monday 20 April 2015 02:52 PM, SURAJ SHETH wrote:

Hi Madhvi,
I think the memory requested by your job, i.e. 2.0 GB is higher than 
what is available.
Please request for 256 MB explicitly while creating Spark Context and 
try again.


Thanks and Regards,
Suraj Sheth



Tried the same but still no luck:|

Madhvi


Re: Spark-1.2.2-bin-hadoop2.4.tgz missing

2015-04-20 Thread Conor Fennell
I looking for that build too.

-Conor

On Mon, Apr 20, 2015 at 9:18 AM, Marius Soutier mps@gmail.com wrote:

 Same problem here...

  On 20.04.2015, at 09:59, Zsolt Tóth toth.zsolt@gmail.com wrote:
 
  Hi all,
 
  it looks like the 1.2.2 pre-built version for hadoop2.4 is not available
 on the mirror sites. Am I missing something?
 
  Regards,
  Zsolt


 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Spark-1.2.2-bin-hadoop2.4.tgz missing

2015-04-20 Thread Zsolt Tóth
Hi all,

it looks like the 1.2.2 pre-built version for hadoop2.4 is not available on
the mirror sites. Am I missing something?

Regards,
Zsolt


RE: compliation error

2015-04-20 Thread Brahma Reddy Battula
Any pointers to this issue..?



Thanks  Regards
Brahma Reddy Battula





From: Brahma Reddy Battula [brahmareddy.batt...@huawei.com]
Sent: Monday, April 20, 2015 9:30 AM
To: Sean Owen; Ted Yu
Cc: user@spark.apache.org
Subject: RE: compliation error

Thanks a lot for your replies..

@Ted, V100R001C00 is our internal Hadoop version, which is based on Hadoop
2.4.1.

@Sean Owen, yes, you are correct. I just wanted to know what leads to this
problem.


Thanks  Regards
Brahma Reddy Battula

From: Sean Owen [so...@cloudera.com]
Sent: Monday, April 20, 2015 9:14 AM
To: Ted Yu
Cc: Brahma Reddy Battula; user@spark.apache.org
Subject: Re: compliation error

Brahma since you can see the continuous integration builds are
passing, it's got to be something specific to your environment, right?
this is not even an error from Spark, but from Maven plugins.

On Mon, Apr 20, 2015 at 4:42 AM, Ted Yu yuzhih...@gmail.com wrote:
 bq. -Dhadoop.version=V100R001C00

 First time I saw above hadoop version. Doesn't look like Apache release.

 I checked my local maven repo but didn't find impl under
 ~/.m2/repository/com/ibm/icu

 FYI

 On Sun, Apr 19, 2015 at 8:04 PM, Brahma Reddy Battula
 brahmareddy.batt...@huawei.com wrote:

 Hey Todd

 Thanks a lot for your reply...Kindly check following details..

 spark version :1.1.0
 jdk:jdk1.7.0_60 ,
 command:mvn -Pbigtop-dist  -Phive -Pyarn -Phadoop-2.4
 -Dhadoop.version=V100R001C00 -DskipTests package


 Thanks  Regards



 Brahma Reddy Battula


 
 From: Ted Yu [yuzhih...@gmail.com]
 Sent: Monday, April 20, 2015 8:07 AM
 To: Brahma Reddy Battula
 Cc: user@spark.apache.org
 Subject: Re: compliation error

 What JDK release are you using ?

 Can you give the complete command you used ?

 Which Spark branch are you working with ?

 Cheers

 On Sun, Apr 19, 2015 at 7:25 PM, Brahma Reddy Battula
 brahmareddy.batt...@huawei.com wrote:

 Hi All

 Getting following error, when I am compiling spark..What did I miss..?
 Even googled and did not find the exact solution for this...


 [ERROR] Failed to execute goal
 org.apache.maven.plugins:maven-shade-plugin:2.2:shade (default) on project
 spark-assembly_2.10: Error creating shaded jar: Error in ASM processing
 class com/ibm/icu/impl/data/LocaleElements_zh__PINYIN.class: 48188 - [Help
 1]
 org.apache.maven.lifecycle.LifecycleExecutionException: Failed to execute
 goal org.apache.maven.plugins:maven-shade-plugin:2.2:shade (default) on
 project spark-assembly_2.10: Error creating shaded jar: Error in ASM
 processing class com/ibm/icu/impl/data/LocaleElements_zh__PINYIN.class
 at
 org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:217)
 at
 org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:153)
 at
 org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:145)
 at
 org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject(LifecycleModuleBuilder.java:84)
 at
 org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject(LifecycleModuleBuilder.java:59)
 at
 org.apache.maven.lifecycle.internal.LifecycleStarter.singleThreadedBuild(LifecycleStarter.java:183)
 at
 org.apache.maven.lifecycle.internal.LifecycleStarter.execute(LifecycleStarter.java:161)
 at org.apache.maven.DefaultMaven.doExecute(DefaultMaven.java:320)



 Thanks  Regards

 Brahma Reddy Battula







-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark-1.2.2-bin-hadoop2.4.tgz missing

2015-04-20 Thread Marius Soutier
Same problem here...

 On 20.04.2015, at 09:59, Zsolt Tóth toth.zsolt@gmail.com wrote:
 
 Hi all,
 
 it looks like the 1.2.2 pre-built version for hadoop2.4 is not available on 
 the mirror sites. Am I missing something?
 
 Regards,
 Zsolt


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: sparksql - HiveConf not found during task deserialization

2015-04-20 Thread Akhil Das
Can you try sc.addJar("/path/to/your/hive/jar")? I think it will resolve it.

Thanks
Best Regards

On Mon, Apr 20, 2015 at 12:26 PM, Manku Timma manku.tim...@gmail.com
wrote:

 Akhil,
 But the first case of creating HiveConf on the executor works fine (map
 case). Only the second case fails. I was suspecting some foul play with
 classloaders.

 On 20 April 2015 at 12:20, Akhil Das ak...@sigmoidanalytics.com wrote:

 Looks like a missing jar, try to print the classpath and make sure the
 hive jar is present.

 Thanks
 Best Regards

 On Mon, Apr 20, 2015 at 11:52 AM, Manku Timma manku.tim...@gmail.com
 wrote:

 I am using spark-1.3 with hadoop-provided and hive-provided and
 hive-0.13.1 profiles. I am running a simple spark job on a yarn cluster by
 adding all hadoop2 and hive13 jars to the spark classpaths.

 If I remove the hive-provided while building spark, I dont face any
 issue. But with hive-provided I am getting a
 java.lang.NoClassDefFoundError: org/apache/hadoop/hive/conf/HiveConf in
 the yarn executor.

 Code is below:
 import org.apache.spark._
 import org.apache.spark.sql._
 import org.apache.hadoop.hive.conf.HiveConf

  object Simple {
    def main(args: Array[String]) = {
      val sc = new SparkContext(new SparkConf())
      val sqlC = new org.apache.spark.sql.hive.HiveContext(sc)

      val x = sc.parallelize(1 to 2).map(x =>
        { val h = new HiveConf; h.getBoolean("hive.test", false) })
      x.collect.foreach(x => println(s"-  $x"))

      val result = sqlC.sql(
        "select * from products_avro order by month, name, price"
      )
      result.collect.foreach(println)
    }
  }

 The first job (involving map) runs fine. HiveConf is instantiated and
 the conf variable is looked up etc. But the second job (involving the
 select * query) throws the class not found exception.

 The task deserializer is the one throwing the exception. It is unable to
 find the class in its classpath. Not sure what is different from the first
 job which also involved HiveConf.

 157573 [task-result-getter-3] 2015/04/20 11:01:48:287 WARN
 TaskSetManager: Lost task 0.2 in stage 2.0 (TID 4, localhost):
 java.lang.NoClassDefFoundError: org/apache/hadoop/hive/conf/HiveConf
 at java.lang.Class.getDeclaredFields0(Native Method)
 at java.lang.Class.privateGetDeclaredFields(Class.java:2436)
 at java.lang.Class.getDeclaredField(Class.java:1946)
 at
 java.io.ObjectStreamClass.getDeclaredSUID(ObjectStreamClass.java:1659)
 at java.io.ObjectStreamClass.access$700(ObjectStreamClass.java:72)
 at java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:480)
 at java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:468)
 at java.security.AccessController.doPrivileged(Native Method)
 at java.io.ObjectStreamClass.init(ObjectStreamClass.java:468)
 at java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:365)
 at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:602)
 at
 java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1622)
 at
 java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
 at
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
 at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
 at
 java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
 at
 java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
 at
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
 at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
 at
 java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
 at
 java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
 at
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
 at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
 at
 java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
 at
 java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
 at
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
 at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
 at
 java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
 at
 java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
 at
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
 at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
 at
 java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
 at
 java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
 at
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
 at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
 at
 java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
 at
 

Re: Running spark over HDFS

2015-04-20 Thread madhvi

Hi,

I Did the same you told but now it is giving the following error:
ERROR TaskSchedulerImpl: Exiting due to error from cluster scheduler: 
All masters are unresponsive! Giving up.


On UI it is showing that master is working

Thanks
Madhvi
On Monday 20 April 2015 12:28 PM, Akhil Das wrote:
In your eclipse, while you create your SparkContext, set the master 
uri as shown in the web UI's top left corner like: 
spark://someIPorHost:7077 and it should be fine.


Thanks
Best Regards

On Mon, Apr 20, 2015 at 12:22 PM, madhvi madhvi.gu...@orkash.com wrote:


Hi All,

I am new to spark and have installed spark cluster over my system
having hadoop cluster.I want to process data stored in HDFS
through spark.

When I am running code in eclipse it is giving the following
warning repeatedly:
scheduler.TaskSchedulerImpl: Initial job has not accepted any
resources; check your cluster UI to ensure that workers are
registered and have sufficient resources.

I have made changes to spark-env.sh file as below
export SPARK_WORKER_INSTANCES=1
export HADOOP_CONF_DIR=/root/Documents/hadoop/etc/hadoop
export SPARK_WORKER_MEMORY=1g
export SPARK_WORKER_CORES=2
export SPARK_EXECUTOR_MEMORY=1g

I am running the spark standalone cluster.In cluster UI it is
showing all workers with allocated resources but still its not
working.
what other configurations are needed to be changed?

Thanks
Madhvi Gupta

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org






RE: shuffle.FetchFailedException in spark on YARN job

2015-04-20 Thread Shao, Saisai
I don't think this problem is related to Netty or NIO; switching to nio will
not change the part of the code path that reads the index file for the
sort-based shuffle reader.

I think you could check your system from a few angles:

1. Is there any hardware problem, such as a full disk, that could make this
file lost or missing? That can introduce such an exception.
2. Do you have any other exceptions besides this one? A shuffle fetch failure
generally means your job is in an abnormal state, and other problems may also
introduce this error.

Thanks
Jerry

From: Akhil Das [mailto:ak...@sigmoidanalytics.com]
Sent: Monday, April 20, 2015 2:56 PM
To: roy
Cc: user@spark.apache.org
Subject: Re: shuffle.FetchFailedException in spark on YARN job

Which version of Spark are you using? Did you try using 
spark.shuffle.blockTransferService=nio

Thanks
Best Regards

On Sat, Apr 18, 2015 at 11:14 PM, roy rp...@njit.edu
wrote:
Hi,

 My spark job is failing with following error message

org.apache.spark.shuffle.FetchFailedException:
/mnt/ephemeral12/yarn/nm/usercache/abc/appcache/application_1429353954024_1691/spark-local-20150418132335-0723/28/shuffle_3_1_0.index
(No such file or directory)
at
org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.org$apache$spark$shuffle$hash$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:67)
at
org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
at
org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at
org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
at
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at
org.apache.spark.graphx.impl.EdgePartition.updateVertices(EdgePartition.scala:89)
at
org.apache.spark.graphx.impl.ReplicatedVertexView$$anonfun$2$$anonfun$apply$1.apply(ReplicatedVertexView.scala:75)
at
org.apache.spark.graphx.impl.ReplicatedVertexView$$anonfun$2$$anonfun$apply$1.apply(ReplicatedVertexView.scala:73)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at
org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:210)
at
org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:65)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:56)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.FileNotFoundException:
/mnt/ephemeral12/yarn/nm/usercache/abc/appcache/application_1429353954024_1691/spark-local-20150418132335-0723/28/shuffle_3_1_0.index
(No such file or directory)
at java.io.FileInputStream.open(Native Method)
at java.io.FileInputStream.init(FileInputStream.java:146)
at
org.apache.spark.shuffle.IndexShuffleBlockManager.getBlockData(IndexShuffleBlockManager.scala:109)
at
org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:305)
at
org.apache.spark.storage.ShuffleBlockFetcherIterator.fetchLocalBlocks(ShuffleBlockFetcherIterator.scala:235)
at
org.apache.spark.storage.ShuffleBlockFetcherIterator.initialize(ShuffleBlockFetcherIterator.scala:268)
at
org.apache.spark.storage.ShuffleBlockFetcherIterator.init(ShuffleBlockFetcherIterator.scala:115)
at
org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.fetch(BlockStoreShuffleFetcher.scala:76)
at
org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:40)
at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at
org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:88)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
... 7 more

)


my job 

Re: Addition of new Metrics for killed executors.

2015-04-20 Thread Archit Thakur
Hi Twinkle,

We have a use case where we want to debug how and why an
executor got killed.
It could be because of a stack overflow, GC, or any other unexpected scenario.
The driver UI shows no information about killed
executors, so I was just curious how people usually debug these things
apart from scanning and interpreting logs. The metrics we are planning
to add are similar to those for non-killed executors - per-stage data
specifically - numFailedTasks, executorRunTime, inputBytes,
memoryBytesSpilled, etc.

Apart from that we also intend to add all information present in an
executor tabs for running executors.

Thanks,
Archit Thakur.

On Mon, Apr 20, 2015 at 1:31 PM, twinkle sachdeva 
twinkle.sachd...@gmail.com wrote:

 Hi Archit,

 What is your use case and what kind of metrics are you planning to add?

 Thanks,
 Twinkle

 On Fri, Apr 17, 2015 at 4:07 PM, Archit Thakur archit279tha...@gmail.com
 wrote:

 Hi,

 We are planning to add new Metrics in Spark for the executors that got
 killed during the execution. Was just curious, why this info is not already
 present. Is there some reason for not adding it.?
 Any ideas around are welcome.

 Thanks and Regards,
 Archit Thakur.





Re: Streaming problems running 24x7

2015-04-20 Thread Marius Soutier
The processing speed displayed in the UI doesn’t seem to take everything into 
account. I also had a low processing time but had to increase batch duration 
from 30 seconds to 1 minute because waiting batches kept increasing. Now it 
runs fine.

 On 17.04.2015, at 13:30, González Salgado, Miquel 
 miquel.gonza...@tecsidel.es wrote:
 
 Hi,
  
 Thank you for your response, 
 I think it is not because of the processing speed, in fact the delay is under 
 1 second, while the batch interval is 10 seconds… The data volume is low (10 
 lines / second)
  
 Changing to local[8] made the problem worse (CPU increased more quickly)
  
 By the way, I have seen some improvement by changing to this KafkaUtils call: 
 
 KafkaUtils.createDirectStream
  
 CPU usage is low and stable, but memory is slowly increasing… At least 
 the process lasts longer now..
  
 Best regards, 
 Miquel
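
For reference, a minimal sketch of the receiver-less createDirectStream call mentioned
above (Spark 1.3+ API), in Scala; the broker list and topic name below are placeholders,
not the original job's values:

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val conf = new SparkConf().setAppName("NetflowTopn").setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(10))
// Direct stream: no long-running receiver; offsets are tracked by the stream itself
val kafkaParams = Map("metadata.broker.list" -> "broker1:9092,broker2:9092")
val lines = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Set("netflow")).map(_._2)   // keep only the message value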
  
  
 From: bit1...@163.com [mailto:bit1...@163.com] 
 Sent: Thursday, 16 April 2015 10:58
 To: González Salgado, Miquel; user
 Subject: Re: Streaming problems running 24x7
  
 From your description, looks like the data processing speed is far behind the 
 data receiving speed
  
 Could you try to increase the core number when you submit the application? 
 such as local[8]?
  
 bit1...@163.com
  
 From: Miquel miquel.gonza...@tecsidel.es
 Date: 2015-04-16 16:39
 To: user@spark.apache.org
 Subject: Streaming problems running 24x7
 Hello,
 I'm finding problems to run a spark streaming job for more than a few hours
 (3 or 4). It begins working OK, but it degrades until failure. Some of the
 symptoms:
  
 - Consumed memory and CPU keeps getting higher ang higher, and finally some
 error is being thrown (java.lang.Exception: Could not compute split, block
 input-0-1429168311800 not found) and data stops being calculated. 
  
 - The delay showed in web UI keeps also increasing.
  
 - After some hours disk space is being consumed. There are a lot of
 directories with name like /tmp/spark-e3505437-f509-4b5b-92d2-ae2559badb3c
  
 The job is basically reading information from kafka topic, and calculate
 several topN tables for some key and value camps related with netflow data,
 some of the parameters are this:
 - batch interval: 10 seconds
 - window calculation: 1 minute
 - spark.cleaner.ttl: 5 minutes
  
 The execution is standalone on one machine (16GB RAM , 12 cores), and the
 options to run it is as follows:
 /opt/spark/bin/spark-submit --driver-java-options -XX:+UseCompressedOops
 --jars $JARS --class $APPCLASS --master local[2] $APPJAR 
  
 someone has some clues about the problem? I don't know if it is a
 configuration problem or some error in the code that is causing memory
 leaks..
  
 Thank you in advance!
 Miquel
  
 PD: the code is basically this:--
  
 object NetflowTopn {

   var appPath = "."
   var zkQuorum = ""
   var group = ""
   var topics = ""
   var numThreads = 1

   var batch_interval = 10
   var n_window = 1
   var n_slide = 1
   var topnsize = 10

   var hm = Map[String, Int]()
   hm += ( "unix_secs"   ->  0 )
   hm += ( "unix_nsecs"  ->  1 )
   hm += ( "sysuptime"   ->  2 )
   hm += ( "exaddr"      ->  3 )
   hm += ( "dpkts"       ->  4 )
   hm += ( "doctets"     ->  5 )
   hm += ( "first"       ->  6 )
   hm += ( "last"        ->  7 )
   hm += ( "engine_type" ->  8 )
   hm += ( "engine_id"   ->  9 )
   hm += ( "srcaddr"     -> 10 )
   hm += ( "dstaddr"     -> 11 )
   hm += ( "nexthop"     -> 12 )
   hm += ( "input"       -> 13 )
   hm += ( "output"      -> 14 )
   hm += ( "srcport"     -> 15 )
   hm += ( "dstport"     -> 16 )
   hm += ( "prot"        -> 17 )
   hm += ( "tos"         -> 18 )
   hm += ( "tcp_flags"   -> 19 )
   hm += ( "src_mask"    -> 20 )
   hm += ( "dst_mask"    -> 21 )
   hm += ( "src_as"      -> 22 )
   hm += ( "dst_as"      -> 23 )

   def getKey(lcamps: Array[String], camp: String): String = {
     if (camp == "total") return "total"
     else return lcamps(hm(camp))
   }

   def getVal(lcamps: Array[String], camp: String): Long = {
     if (camp == "flows") return 1L
     else return lcamps(hm(camp)).toLong
   }

   def getKeyVal(line: String, keycamps: List[String], valcamp: String) = {
     val arr = line.split(",")
     (keycamps.map(getKey(arr, _)).mkString(","), getVal(arr, valcamp))
   }

   def writeOutput(data: Array[(Long, String)], keycamps_str: String,
                   csvheader: String, valcamp: String, prefix: String) = {

     val ts = System.currentTimeMillis
     val f1 = appPath + "/data/" + prefix + "_" + keycamps_str + "_" + valcamp + ".csv"
     val f1f = new File(f1)
     val ftmpf = new File(f1 + ts)
     val pw = new PrintWriter(ftmpf)
     pw.println(csvheader)
     data.foreach {
       t => pw.println(t._2 + "," + t._1)
     }
     pw.close()
     ftmpf.renameTo(f1f)
   }

   def main(args: Array[String]) {

     if (args.length < 1) {

Re: Addition of new Metrics for killed executors.

2015-04-20 Thread twinkle sachdeva
Hi Archit,

What is your use case and what kind of metrics are you planning to add?

Thanks,
Twinkle

On Fri, Apr 17, 2015 at 4:07 PM, Archit Thakur archit279tha...@gmail.com
wrote:

 Hi,

 We are planning to add new Metrics in Spark for the executors that got
 killed during the execution. Was just curious, why this info is not already
 present. Is there some reason for not adding it.?
 Any ideas around are welcome.

 Thanks and Regards,
 Archit Thakur.



Re: Running spark over HDFS

2015-04-20 Thread madhvi
No, I am not getting the task I am running on the UI. Also I have
set instances=1, but the UI is showing 2 workers. I am running the Java
word count code exactly as-is, but with the text file in HDFS. Following is
the part of my code I wrote to make the connection:


SparkConf sparkConf = new SparkConf().setAppName("JavaWordCount");
sparkConf.setMaster("spark://192.168.0.119:7077");
JavaSparkContext ctx = new JavaSparkContext(sparkConf);
Configuration conf = new Configuration();
conf.set("fs.default.name", "hdfs://192.168.0.119:9000");
FileSystem dfs = FileSystem.get(conf);
JavaRDD<String> lines =
ctx.textFile(dfs.getWorkingDirectory() + "/spark.txt", 1);


Thanks
On Monday 20 April 2015 02:27 PM, Akhil Das wrote:
Are you seeing your task being submitted to the UI? Under completed or 
running tasks? How much resources are you allocating for your job? Can 
you share a screenshot of your cluster UI and the code snippet that 
you are trying to run?


Thanks
Best Regards

On Mon, Apr 20, 2015 at 12:37 PM, madhvi madhvi.gu...@orkash.com wrote:


Hi,

I Did the same you told but now it is giving the following error:
ERROR TaskSchedulerImpl: Exiting due to error from cluster
scheduler: All masters are unresponsive! Giving up.

On UI it is showing that master is working

Thanks
Madhvi

On Monday 20 April 2015 12:28 PM, Akhil Das wrote:

In your eclipse, while you create your SparkContext, set the
master uri as shown in the web UI's top left corner like:
spark://someIPorHost:7077 and it should be fine.

Thanks
Best Regards

On Mon, Apr 20, 2015 at 12:22 PM, madhvi madhvi.gu...@orkash.com wrote:

Hi All,

I am new to spark and have installed spark cluster over my
system having hadoop cluster.I want to process data stored in
HDFS through spark.

When I am running code in eclipse it is giving the following
warning repeatedly:
scheduler.TaskSchedulerImpl: Initial job has not accepted any
resources; check your cluster UI to ensure that workers are
registered and have sufficient resources.

I have made changes to spark-env.sh file as below
export SPARK_WORKER_INSTANCES=1
export HADOOP_CONF_DIR=/root/Documents/hadoop/etc/hadoop
export SPARK_WORKER_MEMORY=1g
export SPARK_WORKER_CORES=2
export SPARK_EXECUTOR_MEMORY=1g

I am running the spark standalone cluster.In cluster UI it is
showing all workers with allocated resources but still its
not working.
what other configurations are needed to be changed?

Thanks
Madhvi Gupta

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org









mapPartitions vs foreachPartition

2015-04-20 Thread Arun Patel
What is difference between mapPartitions vs foreachPartition?

When to use these?

Thanks,
Arun


Re: Running spark over HDFS

2015-04-20 Thread SURAJ SHETH
Hi Madhvi,
I think the memory requested by your job, i.e. 2.0 GB, is higher than what
is available.
Please request 256 MB explicitly while creating the Spark Context and try
again.

Thanks and Regards,
Suraj Sheth
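
A minimal sketch of what that could look like in the application code, in Scala for
brevity (the same properties can be set from the Java API); the values below are
examples only, sized to fit the 1g workers from spark-env.sh:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("JavaWordCount")
  .setMaster("spark://192.168.0.119:7077")
  .set("spark.executor.memory", "256m")  // per-executor heap; must fit inside the 1g worker
  .set("spark.cores.max", "2")           // cap the total cores the app requests from the cluster
val sc = new SparkContext(conf)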


Order of execution of tasks inside of a stage and computing the number of stages

2015-04-20 Thread Spico Florin
Hello!
I'm a newbie in Spark and I would like to understand some basic mechanisms of how
it works behind the scenes.
I have attached the lineage of my RDD and I have the following questions:
1. Why do I have 8 stages instead of 5? From the book Learning Spark
(Chapter 8 - http://bit.ly/1E0Hah7), I understood that RDDs that
exist at the same level of indentation as their
parents will be pipelined [into the same physical stage] during physical
execution. Since I have 5 parents, I expected to have 5 stages. Still,
the Spark UI stages view shows 8 stages.
Also, what does the (8) shown in the debug string represent? Is there a bug in
this function?
2. At the stage level, what is the execution order among the tasks? Can they
all be executed in parallel (for example: test4spark.csv
HadoopRDD[0] || test4spark.csv MappedRDD[1] || MapPartitionsRDD[4] ||
ZippedWithIndexRDD[6]), or does each task wait for the previous one to
complete (test4spark.csv HadoopRDD[0]=completed= test4spark.csv
MappedRDD[1]=completed= etc.)?
3. Between stages, the order is given by the execution plan, so each stage
waits until the ones before it have completed. Is this a correct
assumption?

I look forward for your answers.
Regards,
 Florin


(8) MappedRDD[21] at map at WAChunkSepvgFilterNewModel.scala:298 []
 |  MappedRDD[20] at map at WAChunkSepvgFilterNewModel.scala:182 []
 |  ShuffledRDD[19] at sortByKey at WAChunkSepvgFilterNewModel.scala:182 []
 +-(8) ShuffledRDD[16] at aggregateByKey at
WAChunkSepvgFilterNewModel.scala:182 []
+-(8) FlatMappedRDD[15] at flatMap at
WAChunkSepvgFilterNewModel.scala:174 []
   |  ZippedWithIndexRDD[14] at zipWithIndex at
WAChunkSepvgFilterNewModel.scala:174 []
   |  MappedRDD[13] at map at WAChunkSepvgFilterNewModel.scala:272 []
   |  MappedRDD[12] at map at WAChunkSepvgFilterNewModel.scala:161 []
   |  ShuffledRDD[11] at sortByKey at
WAChunkSepvgFilterNewModel.scala:161 []
   +-(8) ShuffledRDD[8] at aggregateByKey at
WAChunkSepvgFilterNewModel.scala:161 []
  +-(8) FlatMappedRDD[7] at flatMap at
WAChunkSepvgFilterNewModel.scala:153 []
 |  ZippedWithIndexRDD[6] at zipWithIndex at
WAChunkSepvgFilterNewModel.scala:153 []
 |  MappedRDD[5] at map at WAChunkSepvgFilterNewModel.scala:248
[]
 |  MapPartitionsRDD[4] at mapPartitionsWithIndex at
WAChunkSepvgFilterNewModel.scala:114 []
 |  test4spark.csv MappedRDD[1] at textFile at
WAChunkSepvgFilterNewModel.scala:215 []
 |  test4spark.csv HadoopRDD[0] at textFile at
WAChunkSepvgFilterNewModel.scala:215 []

[image: Inline image 1]

Excerpt from the book: The lineage output shown in
Example 8-8 uses indentation levels to show where RDDs are going to be
pipelined
together into physical stages. RDDs that exist at the same level of
indentation as their
parents will be pipelined during physical execution
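
Two details may help here: the "(8)" prefix in toDebugString is the number of partitions
of that RDD, not a stage count, and some of the extra stages likely come from operations
that run additional jobs internally (zipWithIndex computes per-partition offsets with its
own job, and sortByKey samples the data to build its range partitioner). A small sketch,
assuming a local master and any small CSV file, showing how narrow transformations are
pipelined while each wide transformation adds a shuffle boundary:

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("stages-demo").setMaster("local[2]"))

val result = sc.textFile("test4spark.csv")   // HadoopRDD + MappedRDD, same pipeline
  .flatMap(_.split(","))                     // still narrow: same stage
  .map(token => (token, 1L))                 // still narrow: same stage
  .aggregateByKey(0L)(_ + _, _ + _)          // wide: shuffle boundary, new stage
  .sortByKey()                               // wide again: another boundary (plus a sampling job)

println(result.toDebugString)  // the "(N)" on each line is that RDD's partition count
result.count()                 // trigger execution, then compare with the Stages tab in the UI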



Re: mapPartitions vs foreachPartition

2015-04-20 Thread Arun Patel
mapPartitions is a transformation and foreachPartition is a an action?

Thanks
Arun

On Mon, Apr 20, 2015 at 4:38 AM, Archit Thakur archit279tha...@gmail.com
wrote:

 The same, which is between map and foreach. map takes iterator returns
 iterator foreach takes iterator returns Unit.

 On Mon, Apr 20, 2015 at 4:05 PM, Arun Patel arunp.bigd...@gmail.com
 wrote:

 What is difference between mapPartitions vs foreachPartition?

 When to use these?

 Thanks,
 Arun
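
A small illustrative sketch of the difference (the per-partition setup comments are
placeholders): mapPartitions is a lazy transformation that maps an iterator to an
iterator and yields a new RDD, while foreachPartition is an action that consumes the
iterator for side effects and returns Unit.

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("partitions-demo").setMaster("local[2]"))
val nums = sc.parallelize(1 to 100, 4)

// Transformation: do per-partition setup once, return an iterator, get an RDD back
val doubled = nums.mapPartitions { iter =>
  // e.g. build one expensive parser / connection per partition here
  iter.map(_ * 2)
}
println(doubled.count())   // nothing ran until this action

// Action: consume each partition purely for its side effects, returns Unit
nums.foreachPartition { iter =>
  // e.g. open one connection per partition and write the records in a batch
  iter.foreach(println)
}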





Custom Partitioning Spark

2015-04-20 Thread mas
Hi,

I aim to do custom partitioning on a text file. I first convert it into
pairRDD and then try to use my custom partitioner. However, somehow it is
not working. My code snippet is given below.

val file = sc.textFile(filePath)
val locLines = file.map(line => line.split("\t")).map(line =>
((line(2).toDouble, line(3).toDouble), line(5).toLong))
val ck = locLines.partitionBy(new HashPartitioner(50)) // new
CustomPartitioner(50) -- neither way is working here.

When reading the file using the textFile method, it automatically partitions
the file. However, when I explicitly want to partition the new RDD
locLines, it doesn't appear to do anything, and even the number of
partitions stays the same as the one created by sc.textFile().

Any help in this regard will be highly appreciated.
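
For what it's worth, a minimal sketch of a custom Partitioner for the
((Double, Double), Long) pairs above; the bucketing rule is only an example, and note
that partitionBy returns a new RDD, so the partition count should be checked on ck
rather than on locLines or on the RDD returned by textFile:

import org.apache.spark.{Partitioner, SparkConf, SparkContext}

class CustomPartitioner(parts: Int) extends Partitioner {
  override def numPartitions: Int = parts
  override def getPartition(key: Any): Int = key match {
    case (lat: Double, lon: Double) =>
      ((lat, lon).hashCode & Int.MaxValue) % parts   // example rule only, replace with your own
    case _ => 0
  }
  override def equals(other: Any): Boolean = other match {
    case p: CustomPartitioner => p.numPartitions == numPartitions
    case _                    => false
  }
  override def hashCode: Int = numPartitions
}

val sc = new SparkContext(new SparkConf().setAppName("custom-partitioning").setMaster("local[2]"))
val file = sc.textFile("filePath")
val locLines = file.map(_.split("\t"))
  .map(line => ((line(2).toDouble, line(3).toDouble), line(5).toLong))

val ck = locLines.partitionBy(new CustomPartitioner(50))
println(locLines.partitions.length)  // unchanged: still the textFile split count
println(ck.partitions.length)        // 50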




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Custom-Partitioning-Spark-tp22571.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Understanding the build params for spark with sbt.

2015-04-20 Thread Shiyao Ma
Hi.

My usage only involves Spark core and HDFS, so no Spark SQL,
MLlib, or other components are involved.


I saw the hint on the
http://spark.apache.org/docs/latest/building-spark.html, with a sample
like:
build/sbt -Pyarn -Phadoop-2.3 assembly. (what's the -P for?)


Fundamentally, I'd like to let sbt only compile and package the core
and the hadoop.

Meanwhile, it would be appreciated if you could inform me what's the
scala file that controls the logic of -Pyarn, so that I can dig into
the build source and have a finer control.



Thanks.

-- 

I am a cat. My homepage is http://introo.me.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Issue of running partitioned loading (RDD) in Spark External Datasource on Mesos

2015-04-20 Thread Yang Lei
I implemented two kinds of DataSource: one loads data during buildScan, the
other returns my RDD class with partition information for future loading.

My RDD's compute gets the actorSystem from SparkEnv.get.actorSystem, then uses
Spray to interact with an HTTP endpoint, which is the same flow as loading
data in buildScan. All the Spray dependencies are included in a jar and
passed to spark-submit using --jars.

The job is defined in Python.

Both scenarios work when testing locally using --master local[4]. On Mesos, the
non-partitioned loading works too, but the partitioned loading hits the
following exception.

Traceback (most recent call last):

  File /root/spark-1.3.1-bin-hadoop2.4/../CloudantApp.py, line 78, in
module

for code in airportData.collect():

  File /root/spark-1.3.1-bin-hadoop2.4/python/pyspark/sql/dataframe.py,
line 293, in collect

port =
self._sc._jvm.PythonRDD.collectAndServe(self._jdf.javaToPython().rdd())

  File
/root/spark-1.3.1-bin-hadoop2.4/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py,
line 538, in __call__

  File
/root/spark-1.3.1-bin-hadoop2.4/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py,
line 300, in get_return_value

py4j.protocol.Py4JJavaError: An error occurred while calling
o60.javaToPython.

: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0
in stage 36.0 failed 4 times, most recent failure: Lost task 0.3 in stage
36.0 (TID 147, 198.11.207.72): com.typesafe.config.ConfigException$Missing:
No configuration setting found for key 'spray'

at com.typesafe.config.impl.SimpleConfig.findKey(SimpleConfig.java:124)

at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:147)

at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:159)

at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:164)

at com.typesafe.config.impl.SimpleConfig.getObject(SimpleConfig.java:218)

at com.typesafe.config.impl.SimpleConfig.getConfig(SimpleConfig.java:224)

at com.typesafe.config.impl.SimpleConfig.getConfig(SimpleConfig.java:33)

at spray.can.HttpExt.init(Http.scala:143)

at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)

at
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)

at
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)

at java.lang.reflect.Constructor.newInstance(Constructor.java:526)

at
akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$2.apply(DynamicAccess.scala:78)

at scala.util.Try$.apply(Try.scala:161)

at
akka.actor.ReflectiveDynamicAccess.createInstanceFor(DynamicAccess.scala:73)

at akka.actor.ExtensionKey.createExtension(Extension.scala:153)

at akka.actor.ActorSystemImpl.registerExtension(ActorSystem.scala:711)

at akka.actor.ExtensionId$class.apply(Extension.scala:79)

at akka.actor.ExtensionKey.apply(Extension.scala:149)

at akka.io.IO$.apply(IO.scala:30)

at spray.client.pipelining$.sendReceive(pipelining.scala:35)

at
com.cloudant.spark.common.JsonStoreDataAccess.getQueryResult(JsonStoreDataAccess.scala:118)

at
com.cloudant.spark.common.JsonStoreDataAccess.getIterator(JsonStoreDataAccess.scala:71)

at com.cloudant.spark.common.JsonStoreRDD.compute(JsonStoreRDD.scala:86)

at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)

at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)

at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)

at org.apache.spark.scheduler.Task.run(Task.scala:64)

at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)

at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)

at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)

at java.lang.Thread.run(Thread.java:745)

Is this due to some kind of classpath setup issue on the executor for the
external jar for handing RDD?

Thanks in advance for any suggestions on how to resolve this.

Yang
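
One plausible explanation is that SparkEnv.get.actorSystem was created with a Config
resolved before the --jars classloader could contribute spray-can's reference.conf,
so registering the spray.can.Http extension cannot find the "spray" section. A hedged
sketch of one workaround, assuming akka 2.3 / Typesafe Config APIs; the object and
actor-system names below are made up for illustration:

import akka.actor.ActorSystem
import com.typesafe.config.ConfigFactory

// Hypothetical helper: build a private ActorSystem on the executor using the
// context classloader, which can see the jars shipped via --jars, so that
// ConfigFactory.load picks up spray-can's reference.conf ("spray" section).
object HttpActorSystem {
  lazy val system: ActorSystem = {
    val cl = Thread.currentThread().getContextClassLoader
    ActorSystem("json-store-http", ConfigFactory.load(cl), cl)
  }
}

// Inside JsonStoreRDD.compute / JsonStoreDataAccess, HttpActorSystem.system could
// then be used instead of SparkEnv.get.actorSystem when wiring up sendReceive.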


Re: Equal number of RDD Blocks

2015-04-20 Thread Laeeq Ahmed
I also see that it is creating both receivers on the same executor, and that might
be the cause of one executor having more RDD blocks than the other. Can I tell
Spark to create each receiver on a separate executor?

Regards,
Laeeq


 On Monday, April 20, 2015 4:51 PM, Evo Eftimov evo.efti...@isecc.com 
wrote:
   

And what is the message rate of each topic mate – that was the other part of the
required clarifications

From: Laeeq Ahmed [mailto:laeeqsp...@yahoo.com]
Sent: Monday, April 20, 2015 3:38 PM
To: Evo Eftimov; user@spark.apache.org
Subject: Re: Equal number of RDD Blocks

Hi,

I have two different topics and two Kafka receivers, one for each topic.

Regards,
Laeeq

On Monday, April 20, 2015 4:28 PM, Evo Eftimov evo.efti...@isecc.com wrote:

What is meant by “streams” here:

1. Two different DStream Receivers producing two different DStreams consuming
from two different kafka topics, each with a different message rate
2. One kafka topic (hence only one message rate to consider) but with two
different DStream receivers (ie running in parallel) giving a start of two
different DStreams

From: Laeeq Ahmed [mailto:laeeqsp...@yahoo.com.INVALID]
Sent: Monday, April 20, 2015 3:15 PM
To: user@spark.apache.org
Subject: Equal number of RDD Blocks

Hi,

I have two streams of data from kafka. How can I make approx. equal numbers of
RDD blocks on two executors? Please see the attachment, one worker has 1785 RDD
blocks and the other has 26.

Regards,
Laeeq

  

Spark SQL vs map reduce tableInputOutput

2015-04-20 Thread Jeetendra Gangele
Hi All,

I am querying HBase, combining the results, and using them in my Spark job.
I am querying HBase using the HBase client API inside my Spark job.
Can anybody tell me whether Spark SQL will be fast enough and provide range
queries?

Regards
Jeetendra


RE: Equal number of RDD Blocks

2015-04-20 Thread Evo Eftimov
What is meant by “streams” here:

 

1.   Two different DSTream Receivers producing two different DSTreams 
consuming from two different kafka topics, each with different message rate 

2.   One kafka topic (hence only one message rate to consider) but with two 
different DStream receivers (ie running in parallel) giving a start of two 
different DSTreams 

 

From: Laeeq Ahmed [mailto:laeeqsp...@yahoo.com.INVALID] 
Sent: Monday, April 20, 2015 3:15 PM
To: user@spark.apache.org
Subject: Equal number of RDD Blocks

 

Hi,

 

I have two streams of data from kafka. How can I make approx. equal numbers of 
RDD blocks on two executors?

Please see the attachment, one worker has 1785 RDD blocks and the other has 
26. 

 

Regards,

Laeeq

 

 



Fail to read files from s3 after upgrading to 1.3

2015-04-20 Thread Ophir Cohen
Hi,
Today I upgraded our code and cluster to 1.3.
We are using Spark 1.3 in Amazon EMR, ami 3.6, include history server and
Ganglia.

I also migrated all deprecated SchemaRDD into DataFrame.
Now when I'm trying to read a parquet files from s3 I get the below
exception.
Actually it is not a problem in my code, because I get the same failures using
the Spark shell.
Any ideas?

Thanks,
Ophir


15/04/20 13:49:20 WARN internal.S3MetadataResponseHandler: Unable to parse
last modified date: Wed, 04 Mar 2015 16:20:05 GMT
java.lang.IllegalStateException: Joda-time 2.2 or later version is
required, but found version: null
at com.amazonaws.util.DateUtils.handleException(DateUtils.java:147)
at com.amazonaws.util.DateUtils.parseRFC822Date(DateUtils.java:195)
at
com.amazonaws.services.s3.internal.ServiceUtils.parseRfc822Date(ServiceUtils.java:73)
at
com.amazonaws.services.s3.internal.AbstractS3ResponseHandler.populateObjectMetadata(AbstractS3ResponseHandler.java:115)
at
com.amazonaws.services.s3.internal.S3MetadataResponseHandler.handle(S3MetadataResponseHandler.java:32)
at
com.amazonaws.services.s3.internal.S3MetadataResponseHandler.handle(S3MetadataResponseHandler.java:25)
at
com.amazonaws.http.AmazonHttpClient.handleResponse(AmazonHttpClient.java:975)
at
com.amazonaws.http.AmazonHttpClient.executeOneRequest(AmazonHttpClient.java:702)
at
com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:461)
at
com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:296)
at
com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3735)
at
com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1026)
at
com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1004)
at
com.amazon.ws.emr.hadoop.fs.s3n.Jets3tNativeFileSystemStore.retrieveMetadata(Jets3tNativeFileSystemStore.java:199)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at
org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:190)
at
org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:103)
at com.sun.proxy.$Proxy34.retrieveMetadata(Unknown Source)
at
com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.getFileStatus(S3NativeFileSystem.java:743)
at
com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.open(S3NativeFileSystem.java:1098)
at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:768)
at
com.amazon.ws.emr.hadoop.fs.EmrFileSystem.open(EmrFileSystem.java:171)
at
parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:402)
at
org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$6.apply(newParquet.scala:278)
at
org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$6.apply(newParquet.scala:277)
at
scala.collection.parallel.mutable.ParArray$Map.leaf(ParArray.scala:658)
at
scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:54)
at
scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
at
scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:56)
at
scala.collection.parallel.mutable.ParArray$Map.tryLeaf(ParArray.scala:650)
at
scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:165)
at
scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:514)
at
scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.IllegalArgumentException: Invalid format: Wed, 04 Mar
2015 16:20:05 GMT is malformed at GMT
at
org.joda.time.format.DateTimeFormatter.parseMillis(DateTimeFormatter.java:747)
at com.amazonaws.util.DateUtils.parseRFC822Date(DateUtils.java:193)
... 39 more


Re: Equal number of RDD Blocks

2015-04-20 Thread Laeeq Ahmed
They both have the same message rate, 300 records/sec. 


 On Monday, April 20, 2015 4:51 PM, Evo Eftimov evo.efti...@isecc.com 
wrote:
   

And what is the message rate of each topic mate – that was the other part of the
required clarifications

From: Laeeq Ahmed [mailto:laeeqsp...@yahoo.com]
Sent: Monday, April 20, 2015 3:38 PM
To: Evo Eftimov; user@spark.apache.org
Subject: Re: Equal number of RDD Blocks

Hi,

I have two different topics and two Kafka receivers, one for each topic.

Regards,
Laeeq

On Monday, April 20, 2015 4:28 PM, Evo Eftimov evo.efti...@isecc.com wrote:

What is meant by “streams” here:

1. Two different DStream Receivers producing two different DStreams consuming
from two different kafka topics, each with a different message rate
2. One kafka topic (hence only one message rate to consider) but with two
different DStream receivers (ie running in parallel) giving a start of two
different DStreams

From: Laeeq Ahmed [mailto:laeeqsp...@yahoo.com.INVALID]
Sent: Monday, April 20, 2015 3:15 PM
To: user@spark.apache.org
Subject: Equal number of RDD Blocks

Hi,

I have two streams of data from kafka. How can I make approx. equal numbers of
RDD blocks on two executors? Please see the attachment, one worker has 1785 RDD
blocks and the other has 26.

Regards,
Laeeq

  

Re: Fail to read files from s3 after upgrading to 1.3

2015-04-20 Thread Ophir Cohen
Interesting:
Removing the history server and the '-a' option, and using AMI 3.5, fixed the problem.
Now the question is: what made the difference?...
I vote for the '-a', but let me update...

On Mon, Apr 20, 2015 at 5:43 PM, Ophir Cohen oph...@gmail.com wrote:

 Hi,
 Today I upgraded our code and cluster to 1.3.
 We are using Spark 1.3 in Amazon EMR, ami 3.6, include history server and
 Ganglia.

 I also migrated all deprecated SchemaRDD into DataFrame.
 Now when I'm trying to read a parquet files from s3 I get the below
 exception.
 Actually it not a problem if my code because I get the same failures using
 Spark shell.
 Any ideas?

 Thanks,
 Ophir


 15/04/20 13:49:20 WARN internal.S3MetadataResponseHandler: Unable to parse
 last modified date: Wed, 04 Mar 2015 16:20:05 GMT
 java.lang.IllegalStateException: Joda-time 2.2 or later version is
 required, but found version: null
 at com.amazonaws.util.DateUtils.handleException(DateUtils.java:147)
 at com.amazonaws.util.DateUtils.parseRFC822Date(DateUtils.java:195)
 at
 com.amazonaws.services.s3.internal.ServiceUtils.parseRfc822Date(ServiceUtils.java:73)
 at
 com.amazonaws.services.s3.internal.AbstractS3ResponseHandler.populateObjectMetadata(AbstractS3ResponseHandler.java:115)
 at
 com.amazonaws.services.s3.internal.S3MetadataResponseHandler.handle(S3MetadataResponseHandler.java:32)
 at
 com.amazonaws.services.s3.internal.S3MetadataResponseHandler.handle(S3MetadataResponseHandler.java:25)
 at
 com.amazonaws.http.AmazonHttpClient.handleResponse(AmazonHttpClient.java:975)
 at
 com.amazonaws.http.AmazonHttpClient.executeOneRequest(AmazonHttpClient.java:702)
 at
 com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:461)
 at
 com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:296)
 at
 com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3735)
 at
 com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1026)
 at
 com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1004)
 at
 com.amazon.ws.emr.hadoop.fs.s3n.Jets3tNativeFileSystemStore.retrieveMetadata(Jets3tNativeFileSystemStore.java:199)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)
 at
 org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:190)
 at
 org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:103)
 at com.sun.proxy.$Proxy34.retrieveMetadata(Unknown Source)
 at
 com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.getFileStatus(S3NativeFileSystem.java:743)
 at
 com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.open(S3NativeFileSystem.java:1098)
 at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:768)
 at
 com.amazon.ws.emr.hadoop.fs.EmrFileSystem.open(EmrFileSystem.java:171)
 at
 parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:402)
 at
 org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$6.apply(newParquet.scala:278)
 at
 org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$6.apply(newParquet.scala:277)
 at
 scala.collection.parallel.mutable.ParArray$Map.leaf(ParArray.scala:658)
 at
 scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:54)
 at
 scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
 at
 scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
 at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:56)
 at
 scala.collection.parallel.mutable.ParArray$Map.tryLeaf(ParArray.scala:650)
 at
 scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:165)
 at
 scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:514)
 at
 scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
 at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
 at
 scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
 at
 scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
 at
 scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
 Caused by: java.lang.IllegalArgumentException: Invalid format: Wed, 04
 Mar 2015 16:20:05 GMT is malformed at GMT
 at
 org.joda.time.format.DateTimeFormatter.parseMillis(DateTimeFormatter.java:747)
 at com.amazonaws.util.DateUtils.parseRFC822Date(DateUtils.java:193)
 ... 39 more




Re: Configuring logging properties for executor

2015-04-20 Thread Lan Jiang
Rename your log4j_special.properties file to log4j.properties and place it 
under the root of your jar file; you should be fine.

If you are using Maven to build your jar, place the log4j.properties in the 
src/main/resources folder.

However, please note that if another dependency jar on the classpath contains 
its own log4j.properties file, this might not work, since the first 
log4j.properties file that is loaded will be used.

You can also do spark-submit --files log4j_special.properties … , which should 
transfer your log4j properties file to the worker nodes automatically without you 
copying it manually.

Lan
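
A rough, untested sketch of the equivalent settings when building the SparkConf
directly; the "file:" prefix is an assumption here so that log4j resolves the name
against the executor's working directory (where files shipped via spark.files are
placed) instead of against the classpath, and the app name is a placeholder:

import org.apache.spark.SparkConf

val sparkConf = new SparkConf()
  .setAppName("MyApp")  // placeholder name
  // ship the properties file itself to every executor's working directory
  .set("spark.files", "/local/path/log4j_special.properties")
  // point log4j at the shipped copy; "file:" keeps it out of classpath lookup
  .set("spark.executor.extraJavaOptions",
       "-Dlog4j.configuration=file:log4j_special.properties")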


 On Apr 20, 2015, at 9:26 AM, Michael Ryabtsev michael...@gmail.com wrote:
 
 Hi all,
 
 I need to configure spark executor log4j.properties on a standalone cluster. 
 It looks like placing the relevant properties file in the spark
 configuration folder and  setting the spark.executor.extraJavaOptions from
 my application code:
 sparkConf.set(spark.executor.extraJavaOptions,
 -Dlog4j.configuration=log4j_special.properties);
 does the work, and the executor logs are written in the required place and
 level. As far as I understand, it works, because the spark configuration
 folder is on the class path, and passing parameter without path works here.
 However, I would like to avoid deploying these properties to each worker
 spark configuration folder.
 I wonder, if I put the properties in my application jar, is there any way of
 telling executor to load them?
 
 Thanks,
 
 
 
 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Configuring-logging-properties-for-executor-tp22572.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.
 
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org
 


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Configuring logging properties for executor

2015-04-20 Thread Michael Ryabtsev
Hi all,

I need to configure spark executor log4j.properties on a standalone cluster. 
It looks like placing the relevant properties file in the spark
configuration folder and  setting the spark.executor.extraJavaOptions from
my application code:
sparkConf.set(spark.executor.extraJavaOptions,
-Dlog4j.configuration=log4j_special.properties);
does the work, and the executor logs are written in the required place and
level. As far as I understand, it works, because the spark configuration
folder is on the class path, and passing parameter without path works here.
However, I would like to avoid deploying these properties to each worker
spark configuration folder.
I wonder, if I put the properties in my application jar, is there any way of
telling executor to load them?

Thanks,



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Configuring-logging-properties-for-executor-tp22572.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Equal number of RDD Blocks

2015-04-20 Thread Laeeq Ahmed
Hi,
I have two different topics and two Kafka receivers, one for each topic.
Regards,
Laeeq
 


 On Monday, April 20, 2015 4:28 PM, Evo Eftimov evo.efti...@isecc.com 
wrote:
   

What is meant by “streams” here:

1. Two different DStream Receivers producing two different DStreams consuming
from two different kafka topics, each with a different message rate
2. One kafka topic (hence only one message rate to consider) but with two
different DStream receivers (ie running in parallel) giving a start of two
different DStreams

From: Laeeq Ahmed [mailto:laeeqsp...@yahoo.com.INVALID]
Sent: Monday, April 20, 2015 3:15 PM
To: user@spark.apache.org
Subject: Equal number of RDD Blocks

Hi,

I have two streams of data from kafka. How can I make approx. equal numbers of
RDD blocks on two executors? Please see the attachment, one worker has 1785 RDD
blocks and the other has 26.

Regards,
Laeeq

  

Equal number of RDD Blocks

2015-04-20 Thread Laeeq Ahmed
Hi,
I have two streams of data from kafka. How can I make approx. equal numbers of
RDD blocks on two executors? Please see the attachment, one worker has 1785
RDD blocks and the other has 26.

Regards,
Laeeq


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org

RE: Equal number of RDD Blocks

2015-04-20 Thread Evo Eftimov
And what is the message rate of each topic mate – that was the other part of 
the required clarifications 

 

From: Laeeq Ahmed [mailto:laeeqsp...@yahoo.com] 
Sent: Monday, April 20, 2015 3:38 PM
To: Evo Eftimov; user@spark.apache.org
Subject: Re: Equal number of RDD Blocks

 

Hi,

 

I have two different topics and two Kafka receivers, one for each topic.

 

Regards,

Laeeq

 

 

 

On Monday, April 20, 2015 4:28 PM, Evo Eftimov evo.efti...@isecc.com wrote:

 

What is meant by “streams” here:

 

1.   Two different DSTream Receivers producing two different DSTreams 
consuming from two different kafka topics, each with different message rate 

2.   One kafka topic (hence only one message rate to consider) but with two 
different DStream receivers (ie running in parallel) giving a start of two 
different DSTreams 

 

From: Laeeq Ahmed [mailto:laeeqsp...@yahoo.com.INVALID] 
Sent: Monday, April 20, 2015 3:15 PM
To: user@spark.apache.org
Subject: Equal number of RDD Blocks

 

Hi,

 

I have two streams of data from kafka. How can I make approx. equal numbers of 
RDD blocks on two executors?

Please see the attachment, one worker has 1785 RDD blocks and the other has 
26. 

 

Regards,

Laeeq

 

 

 



RE: Equal number of RDD Blocks

2015-04-20 Thread Evo Eftimov
Well, Spark Streaming is supposed to create / distribute the Receivers on 
different cluster nodes. If you are saying that your receivers are actually 
running on the same node, probably that node is getting most of the data, to 
minimize the network transfer costs. 

 

If you want to distribute your data more evenly you can partition it explicitly 

 

Also contact Databricks about why the Receivers are not being distributed on 
different cluster nodes 
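
A minimal sketch of the explicit partitioning idea, with placeholder topic and group
names and assuming a StreamingContext ssc and zkQuorum defined elsewhere: union the two
receiver streams and repartition, so the resulting blocks are spread across the
executors regardless of where the receivers happen to run.

import org.apache.spark.streaming.kafka.KafkaUtils

// ssc: StreamingContext and zkQuorum: String are assumed to exist already
val streamA = KafkaUtils.createStream(ssc, zkQuorum, "groupA", Map("topicA" -> 1))
val streamB = KafkaUtils.createStream(ssc, zkQuorum, "groupB", Map("topicB" -> 1))

val balanced = streamA.union(streamB)
  .repartition(ssc.sparkContext.defaultParallelism)  // shuffle blocks across all executors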

 

From: Laeeq Ahmed [mailto:laeeqsp...@yahoo.com] 
Sent: Monday, April 20, 2015 3:57 PM
To: Evo Eftimov; user@spark.apache.org
Subject: Re: Equal number of RDD Blocks

 

I also see that its creating both receivers on the same executor and that might 
be the cause of having more RDDs on executor than the other. Can I suggest 
spark to create each receiver on a each executor 

 

Regards,

Laeeq

 

 

On Monday, April 20, 2015 4:51 PM, Evo Eftimov evo.efti...@isecc.com wrote:

 

And what is the message rate of each topic mate – that was the other part of 
the required clarifications 

 

From: Laeeq Ahmed [mailto:laeeqsp...@yahoo.com] 
Sent: Monday, April 20, 2015 3:38 PM
To: Evo Eftimov; user@spark.apache.org
Subject: Re: Equal number of RDD Blocks

 

Hi,

 

I have two different topics and two Kafka receivers, one for each topic.

 

Regards,

Laeeq

 

 

 

On Monday, April 20, 2015 4:28 PM, Evo Eftimov evo.efti...@isecc.com wrote:

 

What is meant by “streams” here:

 

1.   Two different DSTream Receivers producing two different DSTreams 
consuming from two different kafka topics, each with different message rate 

2.   One kafka topic (hence only one message rate to consider) but with two 
different DStream receivers (ie running in parallel) giving a start of two 
different DSTreams 

 

From: Laeeq Ahmed [mailto:laeeqsp...@yahoo.com.INVALID] 
Sent: Monday, April 20, 2015 3:15 PM
To: user@spark.apache.org
Subject: Equal number of RDD Blocks

 

Hi,

 

I have two streams of data from kafka. How can I make approx. equal numbers of 
RDD blocks on two executors?

Please see the attachment, one worker has 1785 RDD blocks and the other has 
26. 

 

Regards,

Laeeq

 

 

 

 



Shuffle files not cleaned up (Spark 1.2.1)

2015-04-20 Thread N B
Hi all,

I had posed this query as part of a different thread but did not get a
response there. So creating a new thread hoping to catch someone's
attention.

We are experiencing this issue of shuffle files being left behind and not
being cleaned up by Spark. Since this is a Spark streaming application, it
is expected to stay up indefinitely, so shuffle files not being cleaned up
is a big problem right now. Our max window size is 6 hours, so we have set
up a cron job to clean up shuffle files older than 12 hours otherwise it
will eat up all our disk space.

Please see the following. It seems the non-cleaning of shuffle files is
being documented in 1.3.1.

https://github.com/apache/spark/pull/5074/files
https://issues.apache.org/jira/browse/SPARK-5836


Also, for some reason, the following JIRAs that were reported as functional
issues were closed as Duplicates of the above Documentation bug. Does this
mean that this issue won't be tackled at all?

https://issues.apache.org/jira/browse/SPARK-3563
https://issues.apache.org/jira/browse/SPARK-4796
https://issues.apache.org/jira/browse/SPARK-6011

Any further insight into whether this is being looked into and meanwhile
how to handle shuffle files will be greatly appreciated.

Thanks
NB


Task not Serializable: Graph is unexpectedly null when DStream is being serialized

2015-04-20 Thread Jean-Pascal Billaud
Hi,

I am getting this serialization exception and I am not too sure what Graph
is unexpectedly null when DStream is being serialized means?

15/04/20 06:12:38 INFO yarn.ApplicationMaster: Final app status: FAILED,
exitCode: 15, (reason: User class threw exception: Task not serializable)
Exception in thread Driver org.apache.spark.SparkException: Task not
serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(
ClosureCleaner.scala:166)
at org.apache.spark.util.ClosureCleaner$.clean(
ClosureCleaner.scala:158)
at org.apache.spark.SparkContext.clean(SparkContext.scala:1435)
at org.apache.spark.streaming.dstream.DStream.map(DStream.scala:438)
[...]
Caused by: java.io.NotSerializableException: Graph is unexpectedly null
when DStream is being serialized.
at org.apache.spark.streaming.dstream.DStream$anonfun$
writeObject$1.apply$mcV$sp(DStream.scala:420)
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:985)
at org.apache.spark.streaming.dstream.DStream.writeObject(
DStream.scala:403)

The operation comes down to something like this:

dstream.map(tuple => {
val w = StreamState.fetch[K,W](state.prefixKey, tuple._1)
(tuple._1, (tuple._2, w)) })

And StreamState being a very simple standalone object:

object StreamState {
  def fetch[K : ClassTag : Ordering, V : ClassTag](prefixKey: String, key:
K) : Option[V] = None
}

However if I remove the context bounds from K in fetch e.g. removing
ClassTag and Ordering then everything is fine.

If anyone has some pointers, I'd really appreciate it.

Thanks,
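
One possible explanation, sketched below: a context bound is just sugar for an extra
implicit parameter list, and those implicits are resolved at the call site, i.e. inside
the map closure. If any of them (the Ordering[K] is the usual suspect) is supplied by a
member of the enclosing class that also holds the DStream, that enclosing instance gets
captured in the closure, Spark then tries to serialize it together with the DStream
graph, and this exception appears. Treat this as a hypothesis to check rather than a
confirmed diagnosis.

import scala.reflect.ClassTag

object StreamStateDesugared {
  // What the compiler generates for
  //   def fetch[K : ClassTag : Ordering, V : ClassTag](prefixKey: String, key: K): Option[V]
  def fetch[K, V](prefixKey: String, key: K)
                 (implicit kTag: ClassTag[K], kOrd: Ordering[K], vTag: ClassTag[V]): Option[V] = None
}

// At the call site inside dstream.map { ... } the compiler must find a ClassTag[K],
// an Ordering[K] and a ClassTag[W]; if one of them comes from the surrounding class
// instance, that instance is pulled into the closure. Passing them explicitly, or
// capturing them in local vals outside the map, keeps the closure free of the
// enclosing object.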


Re: Shuffle files not cleaned up (Spark 1.2.1)

2015-04-20 Thread Jeetendra Gangele
Write a cron job for this like below

12 * * * *  find $SPARK_HOME/work -cmin +1440 -prune -exec rm -rf {} \+
32 * * * *  find /tmp -type d -cmin +1440 -name spark-*-*-* -prune -exec
rm -rf {} \+
52 * * * *  find $SPARK_LOCAL_DIR -mindepth 1 -maxdepth 1 -type d -cmin
+1440 -name spark-*-*-* -prune -exec rm -rf {} \+

On 20 April 2015 at 23:12, N B nb.nos...@gmail.com wrote:

 Hi all,

 I had posed this query as part of a different thread but did not get a
 response there. So creating a new thread hoping to catch someone's
 attention.

 We are experiencing this issue of shuffle files being left behind and not
 being cleaned up by Spark. Since this is a Spark streaming application, it
 is expected to stay up indefinitely, so shuffle files not being cleaned up
 is a big problem right now. Our max window size is 6 hours, so we have set
 up a cron job to clean up shuffle files older than 12 hours otherwise it
 will eat up all our disk space.

 Please see the following. It seems the non-cleaning of shuffle files is
 being documented in 1.3.1.

 https://github.com/apache/spark/pull/5074/files
 https://issues.apache.org/jira/browse/SPARK-5836


 Also, for some reason, the following JIRAs that were reported as
 functional issues were closed as Duplicates of the above Documentation bug.
 Does this mean that this issue won't be tackled at all?

 https://issues.apache.org/jira/browse/SPARK-3563
 https://issues.apache.org/jira/browse/SPARK-4796
 https://issues.apache.org/jira/browse/SPARK-6011

 Any further insight into whether this is being looked into and meanwhile
 how to handle shuffle files will be greatly appreciated.

 Thanks
 NB




Re: Did anybody run Spark-perf on powerpc?

2015-04-20 Thread zapstar
This appears to be a problem with SSL.

I'm facing the same issue.. Did you get around this somehow?

I'm running IBM Java 8, on linux ppc64le.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Did-anybody-run-Spark-perf-on-powerpc-tp22329p22575.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Configuring logging properties for executor

2015-04-20 Thread Michael Ryabtsev
Hi Lan,

Thanks for the fast response. It could be a solution if it works. I have more
than one log4j properties file, for different run modes like
debug/production, for the executor and for the application core. I think I would
like to keep them separate. Then, I suppose I should give all the other
properties files special names and keep the executor configuration with
the default name? Can I conclude that going this way I will not be able to
run several applications on the same cluster in parallel?

Regarding spark-submit, I am not using it now (I submit from code), but I
think I should consider this option.
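
For example, a sketch of the combination I have in mind when submitting from code
(assuming the properties file is shipped to every executor's working directory via
spark.files, the programmatic equivalent of --files; the file name and path are made
up):

import org.apache.spark.SparkConf

val sparkConf = new SparkConf()
  .setAppName("MyApp")  // hypothetical
  // Ship the executor log4j file alongside the job.
  .set("spark.files", "/deploy/conf/log4j_executor_debug.properties")
  // Point the executor JVMs at it by name; depending on the cluster manager a
  // file: URL or absolute path may be needed instead.
  .set("spark.executor.extraJavaOptions",
       "-Dlog4j.configuration=log4j_executor_debug.properties")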

Thanks.

On Mon, Apr 20, 2015 at 5:59 PM, Lan Jiang ljia...@gmail.com wrote:

 Rename your log4j_special.properties file as log4j.properties and place it
 under the root of your jar file, you should be fine.

 If you are using Maven to build your jar, please place the log4j.properties
 file in the src/main/resources folder.

 However, please note that if another dependency jar on the classpath
 contains its own log4j.properties file, this might not work, since the first
 log4j.properties file that is loaded will be used.

 You can also do spark-submit --files log4j_special.properties … , which
 should transfer your log4j properties file to the worker nodes automatically
 without you copying it manually.

 Lan


  On Apr 20, 2015, at 9:26 AM, Michael Ryabtsev michael...@gmail.com
 wrote:
 
  Hi all,
 
  I need to configure spark executor log4j.properties on a standalone
 cluster.
  It looks like placing the relevant properties file in the spark
  configuration folder and  setting the spark.executor.extraJavaOptions
 from
  my application code:
  sparkConf.set("spark.executor.extraJavaOptions",
  "-Dlog4j.configuration=log4j_special.properties");
  does the work, and the executor logs are written in the required place
 and
  level. As far as I understand, it works, because the spark configuration
  folder is on the class path, and passing parameter without path works
 here.
  However, I would like to avoid deploying these properties to each worker
  spark configuration folder.
  I wonder, if I put the properties in my application jar, is there any
 way of
  telling executor to load them?
 
  Thanks,
 
 
 
  --
  View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Configuring-logging-properties-for-executor-tp22572.html
  Sent from the Apache Spark User List mailing list archive at Nabble.com.
 
  -
  To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
  For additional commands, e-mail: user-h...@spark.apache.org
 




Re: Configuring logging properties for executor

2015-04-20 Thread Lan Jiang
Each application gets its own executor processes,  so there should be no 
problem running them in parallel. 

Lan


 On Apr 20, 2015, at 10:25 AM, Michael Ryabtsev michael...@gmail.com wrote:
 
 Hi Lan, 
 
 Thanks for the fast response. It could be a solution if it works. I have more 
 than one log4j properties file, for different run modes like debug/production, 
 for the executor and for the application core. I think I would like to keep them 
 separate. Then, I suppose I should give all the other properties files special 
 names and keep the executor configuration with the default name? Can I 
 conclude that going this way I will not be able to run several applications 
 on the same cluster in parallel?
 
 Regarding spark-submit, I am not using it now (I submit from code), but I think 
 I should consider this option.
 
 Thanks.
 
 On Mon, Apr 20, 2015 at 5:59 PM, Lan Jiang ljia...@gmail.com wrote:
 Rename your log4j_special.properties file as log4j.properties and place it 
 under the root of your jar file, you should be fine.
 
 If you are using Maven to build your jar, please place the log4j.properties file in 
 the src/main/resources folder.
 
 However, please note that if another dependency jar on the classpath contains its 
 own log4j.properties file, this might not work, since the first log4j.properties file 
 that is loaded will be used.
 
 You can also do spark-submit --files log4j_special.properties … , which should 
 transfer your log4j properties file to the worker nodes automatically without 
 you copying it manually.
 
 Lan
 
 
  On Apr 20, 2015, at 9:26 AM, Michael Ryabtsev michael...@gmail.com wrote:
 
  Hi all,
 
  I need to configure spark executor log4j.properties on a standalone cluster.
  It looks like placing the relevant properties file in the spark
  configuration folder and  setting the spark.executor.extraJavaOptions from
  my application code:
  sparkConf.set("spark.executor.extraJavaOptions",
  "-Dlog4j.configuration=log4j_special.properties");
  does the work, and the executor logs are written in the required place and
  level. As far as I understand, it works, because the spark configuration
  folder is on the class path, and passing parameter without path works here.
  However, I would like to avoid deploying these properties to each worker
  spark configuration folder.
  I wonder, if I put the properties in my application jar, is there any way of
  telling executor to load them?
 
  Thanks,
 
 
 
  --
  View this message in context: 
  http://apache-spark-user-list.1001560.n3.nabble.com/Configuring-logging-properties-for-executor-tp22572.html
  Sent from the Apache Spark User List mailing list archive at Nabble.com.
 
  -
  To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
  For additional commands, e-mail: user-h...@spark.apache.org
 
 
 



How can i custom the external initialize when start the spark cluster

2015-04-20 Thread ??????????
Hi All,
I have a question about the Spark Thrift Server.
I want to load some tables when the Spark server starts.
How can I configure this kind of external initialization in Spark? I guess Spark 
should have an interface, configurable in spark-defaults.conf, through which we can 
implement an initialization function that adds business logic such as caching 
tables, creating temporary tables, etc.
Please help verify whether this is correct, or point out another correct way. Thank you 
very much.
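
For example, a rough sketch of what I have in mind (assuming the Hive Thrift server
classes are on the classpath; the table name and path below are made up):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.hive.thriftserver.HiveThriftServer2

object PreloadedThriftServer {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("PreloadedThriftServer"))
    val hive = new HiveContext(sc)

    // Business-specific warm-up: register and cache tables before serving queries.
    hive.parquetFile("/data/events").registerTempTable("events")  // hypothetical path/table
    hive.cacheTable("events")

    // Expose the warmed-up context over JDBC/ODBC instead of using the stock
    // start-thriftserver.sh script.
    HiveThriftServer2.startWithContext(hive)
  }
}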


Best Regards .
Jacky .

Re: Did anybody run Spark-perf on powerpc?

2015-04-20 Thread zapstar
This appears to be a problem with SSL.

I'm facing the same issue. Did you get around this somehow?

I'm running IBM Java 8, on linux ppc64le.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Did-anybody-run-Spark-perf-on-powerpc-tp22329p22576.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Fail to read files from s3 after upgrading to 1.3

2015-04-20 Thread Ophir Cohen
And the winner is: ami 3.6.
Apparently it does not work with it... ami 3.5 works great.
Interesting: removing the history server and the '-a' option, and using ami 3.5,
fixed the problem.
Now the question is: what made the difference?...
I vote for the '-a', but let me update...
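
In case it helps anyone who has to stay on ami 3.6: judging only from the
"Joda-time 2.2 or later version is required" message below, one thing to try (a
guess on my side, not something I have verified) is pinning a recent joda-time in
the application build so the AWS SDK does not see an older copy first, e.g. in
build.sbt:

// build.sbt (sketch): force a Joda-Time >= 2.2 onto the application classpath.
libraryDependencies += "joda-time" % "joda-time" % "2.7"
dependencyOverrides += "joda-time" % "joda-time" % "2.7"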

On Mon, Apr 20, 2015 at 5:43 PM, Ophir Cohen oph...@gmail.com wrote:

 Hi,
 Today I upgraded our code and cluster to 1.3.
 We are using Spark 1.3 in Amazon EMR, ami 3.6, including the history server and
 Ganglia.

 I also migrated all deprecated SchemaRDD into DataFrame.
 Now when I'm trying to read parquet files from S3 I get the below
 exception.
 Actually it is not a problem in my code, because I get the same failures using
 the Spark shell.
 Any ideas?

 Thanks,
 Ophir


 15/04/20 13:49:20 WARN internal.S3MetadataResponseHandler: Unable to parse
 last modified date: Wed, 04 Mar 2015 16:20:05 GMT
 java.lang.IllegalStateException: Joda-time 2.2 or later version is
 required, but found version: null
 at com.amazonaws.util.DateUtils.handleException(DateUtils.java:147)
 at com.amazonaws.util.DateUtils.parseRFC822Date(DateUtils.java:195)
 at
 com.amazonaws.services.s3.internal.ServiceUtils.parseRfc822Date(ServiceUtils.java:73)
 at
 com.amazonaws.services.s3.internal.AbstractS3ResponseHandler.populateObjectMetadata(AbstractS3ResponseHandler.java:115)
 at
 com.amazonaws.services.s3.internal.S3MetadataResponseHandler.handle(S3MetadataResponseHandler.java:32)
 at
 com.amazonaws.services.s3.internal.S3MetadataResponseHandler.handle(S3MetadataResponseHandler.java:25)
 at
 com.amazonaws.http.AmazonHttpClient.handleResponse(AmazonHttpClient.java:975)
 at
 com.amazonaws.http.AmazonHttpClient.executeOneRequest(AmazonHttpClient.java:702)
 at
 com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:461)
 at
 com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:296)
 at
 com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3735)
 at
 com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1026)
 at
 com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1004)
 at
 com.amazon.ws.emr.hadoop.fs.s3n.Jets3tNativeFileSystemStore.retrieveMetadata(Jets3tNativeFileSystemStore.java:199)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)
 at
 org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:190)
 at
 org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:103)
 at com.sun.proxy.$Proxy34.retrieveMetadata(Unknown Source)
 at
 com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.getFileStatus(S3NativeFileSystem.java:743)
 at
 com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.open(S3NativeFileSystem.java:1098)
 at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:768)
 at
 com.amazon.ws.emr.hadoop.fs.EmrFileSystem.open(EmrFileSystem.java:171)
 at
 parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:402)
 at
 org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$6.apply(newParquet.scala:278)
 at
 org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$6.apply(newParquet.scala:277)
 at
 scala.collection.parallel.mutable.ParArray$Map.leaf(ParArray.scala:658)
 at
 scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:54)
 at
 scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
 at
 scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
 at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:56)
 at
 scala.collection.parallel.mutable.ParArray$Map.tryLeaf(ParArray.scala:650)
 at
 scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:165)
 at
 scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:514)
 at
 scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
 at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
 at
 scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
 at
 scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
 at
 scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
 Caused by: java.lang.IllegalArgumentException: Invalid format: Wed, 04
 Mar 2015 16:20:05 GMT is malformed at GMT
 at
 org.joda.time.format.DateTimeFormatter.parseMillis(DateTimeFormatter.java:747)
 at com.amazonaws.util.DateUtils.parseRFC822Date(DateUtils.java:193)
 ... 39 more




Re: Spark SQL vs map reduce tableInputOutput

2015-04-20 Thread Jeetendra Gangele
Thanks for the reply.

Would using Phoenix inside Spark be useful?

What is the best way to bring data from HBase into Spark in terms of
application performance?
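
For example, is the TableInputFormat route below the right direction? (A minimal
sketch, assuming the standard HBase mapreduce classes are on the classpath; the
table name and row-key bounds are made up.)

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("HBaseRangeScan"))

val hbaseConf = HBaseConfiguration.create()
hbaseConf.set(TableInputFormat.INPUT_TABLE, "my_table")    // hypothetical table
hbaseConf.set(TableInputFormat.SCAN_ROW_START, "row-000")  // optional range bounds
hbaseConf.set(TableInputFormat.SCAN_ROW_STOP, "row-999")

// Each element is (row key, Result); the scan is split across regions, so the
// read is distributed instead of going through a single HBase client.
val hbaseRdd = sc.newAPIHadoopRDD(hbaseConf, classOf[TableInputFormat],
  classOf[ImmutableBytesWritable], classOf[Result])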

Regards
Jeetendra

On 20 April 2015 at 20:49, Ted Yu yuzhih...@gmail.com wrote:

 To my knowledge, Spark SQL currently doesn't provide range scan capability
 against hbase.

 Cheers



  On Apr 20, 2015, at 7:54 AM, Jeetendra Gangele gangele...@gmail.com
 wrote:
 
  HI All,
 
  I am querying HBase and combining the results for use in my Spark job.
  I am querying HBase using the HBase client API inside my Spark job.
  Can anybody suggest whether Spark SQL will be fast enough and provide
 range queries?
 
  Regards
  Jeetendra
 



Re: Spark SQL vs map reduce tableInputOutput

2015-04-20 Thread Ted Yu
To my knowledge, Spark SQL currently doesn't provide range scan capability 
against hbase. 

Cheers



 On Apr 20, 2015, at 7:54 AM, Jeetendra Gangele gangele...@gmail.com wrote:
 
 HI All,
 
 I am querying HBase and combining the results for use in my Spark job.
 I am querying HBase using the HBase client API inside my Spark job.
 Can anybody suggest whether Spark SQL will be fast enough and provide range 
 queries?
 
 Regards
 Jeetendra
 

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Unsupported types in org.apache.spark.sql.jdbc.JDBCRDD$.getCatalystType

2015-04-20 Thread ARose
So I am trying to pull data from an external database using JDBC

Map<String, String> options = new HashMap<String, String>();
options.put("driver", driver);
options.put("url", dburl);
options.put("dbtable", tmpTrunk);

DataFrame tbTrunkInfo = sqlContext.load("jdbc", options);


And the following exception gets thrown: java.sql.SQLException: Unsupported
type -9
at
org.apache.spark.sql.jdbc.JDBCRDD$.getCatalystType(JDBCRDD.scala:76)
at org.apache.spark.sql.jdbc.JDBCRDD$.resolveTable(JDBCRDD.scala:110)
at org.apache.spark.sql.jdbc.JDBCRelation.init(JDBCRelation.scala:125)
at
org.apache.spark.sql.jdbc.DefaultSource.createRelation(JDBCRelation.scala:114)
at org.apache.spark.sql.sources.ResolvedDataSource$.apply(ddl.scala:290)
at org.apache.spark.sql.SQLContext.load(SQLContext.scala:679)
at org.apache.spark.sql.SQLContext.load(SQLContext.scala:667)

I looked at the schema for the table tmpTrunk and at the function
org.apache.spark.sql.jdbc.JDBCRDD$.getCatalystType (JDBCRDD.scala:76).

It appears that the datatype NVARCHAR is not supported, among several
other commonly used datatypes. Is it possible to request an expansion of
this method to include more java.sql.Types?
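
In the meantime, the workaround I am considering (a sketch in Scala for brevity;
the column names and the cast are made up and untested) is to hide the NVARCHAR
columns behind a cast, since the JDBC source accepts a parenthesized subquery as
the dbtable option:

val options = Map(
  "driver"  -> driver,
  "url"     -> dburl,
  // Cast NVARCHAR columns to a type getCatalystType already knows how to map.
  "dbtable" -> "(SELECT CAST(trunk_name AS VARCHAR(255)) AS trunk_name, trunk_id FROM tmpTrunk) AS t"
)
val tbTrunkInfo = sqlContext.load("jdbc", options)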




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Unsupported-types-in-org-apache-spark-sql-jdbc-JDBCRDD-getCatalystType-tp22573.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: Super slow caching in 1.3?

2015-04-20 Thread Evo Eftimov
Now this is very important:

 

“Normal RDDs” refers to “batch RDDs”. However, the default in-memory 
serialization of RDDs which are part of a DStream is “serialized” rather than 
actual (hydrated) objects. The Spark documentation states that serialization 
is required for space and garbage-collection efficiency (but creates higher CPU 
load), which makes sense considering the large number of RDDs which get discarded 
in a streaming app.

 

So what does Databricks actually recommend as the object-oriented model for RDD 
elements used in Spark Streaming apps, flat or not, and can you provide a 
detailed description / spec of both?
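
For reference, here is the kind of override I have in mind in either direction (a
sketch, assuming Spark 1.3; batchRdd and dstream are placeholders):

import org.apache.spark.storage.StorageLevel

// A batch RDD defaults to MEMORY_ONLY (deserialized objects); opt in to the
// space-efficient serialized form explicitly.
batchRdd.persist(StorageLevel.MEMORY_ONLY_SER)

// A DStream's generated RDDs default to a serialized level; keep them as
// deserialized objects instead when CPU, not memory, is the bottleneck.
dstream.persist(StorageLevel.MEMORY_ONLY)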

 

From: Michael Armbrust [mailto:mich...@databricks.com] 
Sent: Thursday, April 16, 2015 7:23 PM
To: Evo Eftimov
Cc: Christian Perez; user
Subject: Re: Super slow caching in 1.3?

 

Here are the types that we specialize; other types will be much slower.  This 
is only for Spark SQL; normal RDDs do not serialize data that is cached.  I'll 
also note that until yesterday we were missing FloatType:

https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala#L154

 

Christian, can you provide the schema of the fast and slow datasets?

 

On Thu, Apr 16, 2015 at 10:14 AM, Evo Eftimov evo.efti...@isecc.com wrote:

Michael what exactly do you mean by flattened version/structure here e.g.:

1. An Object with only primitive data types as attributes
2. An Object with  no more than one level of other Objects as attributes
3. An Array/List of primitive types
4. An Array/List of Objects

This question is in general about RDDs not necessarily RDDs in the context of 
SparkSQL

When answering can you also score how bad the performance of each of the above 
options is


-Original Message-
From: Christian Perez [mailto:christ...@svds.com]
Sent: Thursday, April 16, 2015 6:09 PM
To: Michael Armbrust
Cc: user
Subject: Re: Super slow caching in 1.3?

Hi Michael,

Good question! We checked 1.2 and found that it is also slow caching the same 
flat parquet file. Caching other file formats of the same data was faster by 
up to a factor of ~2. Note that the parquet file was created in Impala but the 
other formats were written by Spark SQL.

Cheers,

Christian

On Mon, Apr 6, 2015 at 6:17 PM, Michael Armbrust mich...@databricks.com wrote:
 Do you think you are seeing a regression from 1.2?  Also, are you
 caching nested data or flat rows?  The in-memory caching is not really
 designed for nested data and so performs pretty slowly here (it's just
 falling back to kryo, and even then there are some locking issues).

 If so, would it be possible to try caching a flattened version?

 CACHE TABLE flattenedTable AS SELECT ... FROM parquetTable

 On Mon, Apr 6, 2015 at 5:00 PM, Christian Perez christ...@svds.com wrote:

 Hi all,

 Has anyone else noticed very slow time to cache a Parquet file? It
 takes 14 s per 235 MB (1 block) uncompressed node local Parquet file
 on M2 EC2 instances. Or are my expectations way off...

 Cheers,

 Christian

 --
 Christian Perez
 Silicon Valley Data Science
 Data Analyst
 christ...@svds.com
 @cp_phd

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For
 additional commands, e-mail: user-h...@spark.apache.org





--
Christian Perez
Silicon Valley Data Science
Data Analyst
christ...@svds.com
@cp_phd

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional 
commands, e-mail: user-h...@spark.apache.org



 



Custom paritioning of DSTream

2015-04-20 Thread Evo Eftimov
Is the only way to implement custom partitioning of a DStream via the foreach
approach, so as to gain access to the actual RDDs comprising the DStream and
hence their partitionBy method?

DStream has only a repartition method accepting only the number of
partitions, but not the method of partitioning.
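
The transform-based route I am currently considering looks like this (a sketch,
assuming a DStream of key/value pairs, here called pairDStream; the partition count
is arbitrary):

import org.apache.spark.HashPartitioner

// transform exposes each batch RDD, so any Partitioner can be applied per
// batch even though DStream.repartition only takes a partition count.
val partitioner = new HashPartitioner(32)
val repartitioned = pairDStream.transform(rdd => rdd.partitionBy(partitioner))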



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Custom-paritioning-of-DSTream-tp22574.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org


