global variable in spark streaming with no dependency on key

2015-08-18 Thread Joanne Contact
Hi Gurus,

Please help.

But please don't tell me to use updateStateByKey, because I need a
global variable (something like the clock time) across the micro
batches that does not depend on any key. In my case it is not acceptable to
maintain a state for each key, since keys arrive at different times.
Yes, my global variable is related to time, but I cannot use the machine
clock.

Any hint? Or is this lack of global variable by design?

Thanks!

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Difference btw MEMORY_ONLY and MEMORY_AND_DISK

2015-08-18 Thread Harsha HN
Hello Sparkers,

I would like to understand the difference between these storage levels for the
portion of an RDD that doesn't fit in memory.
It seems that in both storage levels, whatever portion doesn't fit in
memory will be spilled to disk. Is there any difference as such?

Thanks,
Harsha


Re: Difference btw MEMORY_ONLY and MEMORY_AND_DISK

2015-08-18 Thread Sabarish Sasidharan
With MEMORY_ONLY, partitions that don't fit in memory are simply not cached and
are recomputed on the fly when they are needed, whereas MEMORY_AND_DISK spills
them to disk and reads them back from there.
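
For illustration, the level is just chosen when persisting (a minimal sketch; the input path is a placeholder):

import org.apache.spark.storage.StorageLevel

val rdd = sc.textFile("hdfs:///some/input")

rdd.persist(StorageLevel.MEMORY_ONLY)        // partitions that don't fit are recomputed when needed
// or, alternatively:
// rdd.persist(StorageLevel.MEMORY_AND_DISK) // partitions that don't fit are spilled to disk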

Regards
Sab

On Tue, Aug 18, 2015 at 12:45 PM, Harsha HN 99harsha.h@gmail.com
wrote:

 Hello Sparkers,

 I would like to understand difference btw these Storage levels for a RDD
 portion that doesn't fit in memory.
 As it seems like in both storage levels, whatever portion doesnt fit in
 memory will be spilled to disk. Any difference as such?

 Thanks,
 Harsha




-- 

Architect - Big Data
Ph: +91 99805 99458

Manthan Systems | *Company of the year - Analytics (2014 Frost and Sullivan
India ICT)*
+++


Re: Regarding rdd.collect()

2015-08-18 Thread ayan guha
I think you are mixing the notion of a job from the Hadoop MapReduce world with
Spark. In Spark, RDDs are immutable and transformations are lazy. So the
first time an RDD actually fills up memory is when you run the first
action on it. After that, it stays in memory until either the application
is stopped or new RDDs are generated, causing the old RDD to get pushed out to
disk.
Remember that Spark does not provide fault tolerance through replication but
through lineage. So it is important to keep old RDDs around in case of any
failure in downstream transformations.
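
A small sketch of what that looks like in practice (the path is a placeholder):

val rdd = sc.textFile("hdfs:///input").map(_.toUpperCase)

rdd.cache()               // ask Spark to keep the RDD in memory once it is materialized
rdd.count()               // first action: the RDD is computed and cached
rdd.take(5)               // later actions in the same application reuse the cached data

val local = rdd.collect() // collect() only copies the data to the driver;
                          // the cached RDD itself stays on the executors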

On Tue, Aug 18, 2015 at 5:46 PM, Dawid Wysakowicz 
wysakowicz.da...@gmail.com wrote:

 No, the data is not stored between two jobs. But it is stored for a
 lifetime of a job. Job can have multiple actions run.
 For a matter of sharing an rdd between jobs you can have a look at Spark
 Job Server(spark-jobserver https://github.com/ooyala/spark-jobserver)
 or some In-Memory storages: Tachyon(http://tachyon-project.org/) or
 Ignite(https://ignite.incubator.apache.org/)

 2015-08-18 9:37 GMT+02:00 Hemant Bhanawat hemant9...@gmail.com:

 It is still in memory for future rdd transformations and actions.

 This is interesting. You mean Spark holds the data in memory between two
 job executions.  How does the second job get the handle of the data in
 memory? I am interested in knowing more about it. Can you forward me a
 spark article or JIRA that talks about it?

 On Tue, Aug 18, 2015 at 12:49 PM, Sabarish Sasidharan 
 sabarish.sasidha...@manthan.com wrote:

 It is still in memory for future rdd transformations and actions. What
 you get in driver is a copy of the data.

 Regards
 Sab

 On Tue, Aug 18, 2015 at 12:02 PM, praveen S mylogi...@gmail.com wrote:

 When I do an rdd.collect().. The data moves back to driver  Or is still
 held in memory across the executors?




 --

 Architect - Big Data
 Ph: +91 99805 99458

 Manthan Systems | *Company of the year - Analytics (2014 Frost and
 Sullivan India ICT)*
 +++






-- 
Best Regards,
Ayan Guha


Spark SQL Partition discovery - schema evolution

2015-08-18 Thread Guy Hadash
Hi all,

I'm using Spark SQL using data from Openstack Swift.
I'm trying to load parquet files with partition discovery, but I can't do 
it when the partitions don't match between two objects.
For example, container which contains:
/zone=0/object2
/zone=0/area=0/object1

Won't load, and will result with the error:
java.lang.AssertionError: assertion failed: Conflicting partition column 
names detected:
ArrayBuffer(zone, area)
ArrayBuffer(zone)

I know that there is support for schema evolution for the schemas inside 
the parquet files, but is there a schema evolution for partition discovery 
as well?
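
For reference, a layout in which every object sits under the same set of partition
columns does load with automatic partition discovery (a sketch; the paths below are
just placeholders):

// every leaf directory carries the same partition columns (zone and area)
//   .../data/zone=0/area=0/object1
//   .../data/zone=0/area=1/object2
val df = sqlContext.read.parquet("swift://container.provider/data")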

Guy Hadash
IBM Research - Haifa
03-7689436

Why there are overlapping for tasks on the EventTimeline UI

2015-08-18 Thread Todd
Hi,
The following is copied from the Spark event timeline UI. I don't understand why
there is overlap between tasks.
I think they should run sequentially, one by one, on each executor (there is one
core per executor).

The blue part of each task is the scheduler delay time. Does it mean the delay
between when the task is put into the thread pool and when it is picked up to run?




Re: Regarding rdd.collect()

2015-08-18 Thread Hemant Bhanawat
On Tue, Aug 18, 2015 at 1:16 PM, Dawid Wysakowicz 
wysakowicz.da...@gmail.com wrote:

 No, the data is not stored between two jobs. But it is stored for a
 lifetime of a job. Job can have multiple actions run.

I too thought so but wanted to confirm. Thanks.


 For a matter of sharing an rdd between jobs you can have a look at Spark
 Job Server(spark-jobserver https://github.com/ooyala/spark-jobserver)
 or some In-Memory storages: Tachyon(http://tachyon-project.org/) or
 Ignite(https://ignite.incubator.apache.org/)

 2015-08-18 9:37 GMT+02:00 Hemant Bhanawat hemant9...@gmail.com:

 It is still in memory for future rdd transformations and actions.

 This is interesting. You mean Spark holds the data in memory between two
 job executions.  How does the second job get the handle of the data in
 memory? I am interested in knowing more about it. Can you forward me a
 spark article or JIRA that talks about it?

 On Tue, Aug 18, 2015 at 12:49 PM, Sabarish Sasidharan 
 sabarish.sasidha...@manthan.com wrote:

 It is still in memory for future rdd transformations and actions. What
 you get in driver is a copy of the data.

 Regards
 Sab

 On Tue, Aug 18, 2015 at 12:02 PM, praveen S mylogi...@gmail.com wrote:

 When I do an rdd.collect().. The data moves back to driver  Or is still
 held in memory across the executors?




 --

 Architect - Big Data
 Ph: +91 99805 99458

 Manthan Systems | *Company of the year - Analytics (2014 Frost and
 Sullivan India ICT)*
 +++






Re: global variable in spark streaming with no dependency on key

2015-08-18 Thread Hemant Bhanawat
See if SparkContext.accumulator helps.
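
A minimal sketch of that idea, assuming an event-time value can be pulled out of each
record (stream, ssc and extractEventTime are placeholders): an accumulator that keeps
the maximum value seen so far, updated inside an action and read back on the driver
across micro-batches.

import org.apache.spark.AccumulatorParam

// accumulator that keeps the maximum of all values added to it
object MaxAccumulatorParam extends AccumulatorParam[Long] {
  def zero(initial: Long): Long = initial
  def addInPlace(a: Long, b: Long): Long = math.max(a, b)
}

val maxEventTime = ssc.sparkContext.accumulator(0L)(MaxAccumulatorParam)

stream.foreachRDD { rdd =>
  rdd.foreach(record => maxEventTime += extractEventTime(record))
  // maxEventTime.value is readable here, on the driver, across micro-batches
}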

On Tue, Aug 18, 2015 at 2:27 PM, Joanne Contact joannenetw...@gmail.com
wrote:

 Hi Gurus,

 Please help.

 But please don't tell me to use updateStateByKey because I need a
 global variable (something like the clock time) across the micro
 batches but not depending on key. For my case, it is not acceptable to
 maintain a state for each key since each key comes in different times.
 Yes my global variable is related to time but cannot use machine
 clock.

 Any hint? Or is this lack of global variable by design?

 Thanks!

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Why doesn't standalone mode allow setting num-executors?

2015-08-18 Thread canan chen
--num-executors only works in YARN mode. In standalone mode, I have to set
--total-executor-cores and --executor-cores instead. Isn't this way less
intuitive? Any reason for that?
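
For reference, the same limits expressed through configuration properties look roughly
like this (a sketch; the master URL is a placeholder):

import org.apache.spark.{SparkConf, SparkContext}

// spark.cores.max corresponds to --total-executor-cores,
// spark.executor.cores to --executor-cores
val conf = new SparkConf()
  .setAppName("standalone-cores-example")
  .setMaster("spark://master:7077")
  .set("spark.cores.max", "8")
  .set("spark.executor.cores", "4")
val sc = new SparkContext(conf)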


Re:Re: Regarding rdd.collect()

2015-08-18 Thread Todd


One Spark application can have many jobs, e.g. first call rdd.count and then call
rdd.collect.






At 2015-08-18 15:37:14, Hemant Bhanawat hemant9...@gmail.com wrote:

It is still in memory for future rdd transformations and actions.


This is interesting. You mean Spark holds the data in memory between two job 
executions.  How does the second job get the handle of the data in memory? I am 
interested in knowing more about it. Can you forward me a spark article or JIRA 
that talks about it? 


On Tue, Aug 18, 2015 at 12:49 PM, Sabarish Sasidharan 
sabarish.sasidha...@manthan.com wrote:

It is still in memory for future rdd transformations and actions. What you get 
in driver is a copy of the data.


Regards
Sab


On Tue, Aug 18, 2015 at 12:02 PM, praveen S mylogi...@gmail.com wrote:


When I do an rdd.collect().. The data moves back to driver  Or is still held in 
memory across the executors?






--



Architect - Big Data

Ph: +91 99805 99458


Manthan Systems | Company of the year - Analytics (2014 Frost and Sullivan 
India ICT)
+++



Regarding rdd.collect()

2015-08-18 Thread praveen S
When I do an rdd.collect(), does the data move back to the driver, or is it still
held in memory across the executors?


Re:Changed Column order in DataFrame.Columns call and insertIntoJDBC

2015-08-18 Thread Todd
Take a look at the doc for the method:

 
  /**
   * Applies a schema to an RDD of Java Beans.
   *
   * WARNING: Since there is no guaranteed ordering for fields in a Java Bean,
   *  SELECT * queries will return the columns in an undefined order.
   * @group dataframes
   * @since 1.3.0
   */
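
A hedged workaround sketch (in Scala for brevity; the column names are taken from the
post and url stands for the same JDBC URL): select the columns in an explicit order
before writing, so the order no longer depends on the bean's field ordering.

val ordered = df.select("p1", "p2", "p3")
ordered.insertIntoJDBC(url, "Test", false)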




At 2015-08-18 15:40:54, MooseSpark pandey.mayur...@gmail.com wrote:
I have an RDD which I am using to create a data frame based on one POJO, but
when the DataFrame is created, the column order gets changed.

DataFrame df = sqlCtx.createDataFrame(rdd, Pojo.class);

String[] columns = df.columns();
// columns here are in a different order from what has been defined in the POJO
// in the POJO the properties are p1, p2, p3
// but in columns it is p3, p1, p2, and the same order is being saved into JDBC

df.insertIntoJDBC("jdbc:sqlserver://xx.yyy.00.11:PORT;databaseName=spark_gpeh;user=saw;password=password@123;",
"Test", false);

Any idea?








--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Changed-Column-order-in-DataFrame-Columns-call-and-insertIntoJDBC-tp24309.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Regarding rdd.collect()

2015-08-18 Thread Dawid Wysakowicz
No, the data is not stored between two jobs. But it is stored for the
lifetime of a job, and a job can have multiple actions run.
For a matter of sharing an rdd between jobs you can have a look at Spark
Job Server(spark-jobserver https://github.com/ooyala/spark-jobserver) or
some In-Memory storages: Tachyon(http://tachyon-project.org/) or Ignite(
https://ignite.incubator.apache.org/)

2015-08-18 9:37 GMT+02:00 Hemant Bhanawat hemant9...@gmail.com:

 It is still in memory for future rdd transformations and actions.

 This is interesting. You mean Spark holds the data in memory between two
 job executions.  How does the second job get the handle of the data in
 memory? I am interested in knowing more about it. Can you forward me a
 spark article or JIRA that talks about it?

 On Tue, Aug 18, 2015 at 12:49 PM, Sabarish Sasidharan 
 sabarish.sasidha...@manthan.com wrote:

 It is still in memory for future rdd transformations and actions. What
 you get in driver is a copy of the data.

 Regards
 Sab

 On Tue, Aug 18, 2015 at 12:02 PM, praveen S mylogi...@gmail.com wrote:

 When I do an rdd.collect().. The data moves back to driver  Or is still
 held in memory across the executors?




 --

 Architect - Big Data
 Ph: +91 99805 99458

 Manthan Systems | *Company of the year - Analytics (2014 Frost and
 Sullivan India ICT)*
 +++





Re: Regarding rdd.collect()

2015-08-18 Thread Hemant Bhanawat
It is still in memory for future rdd transformations and actions.

This is interesting. You mean Spark holds the data in memory between two
job executions.  How does the second job get the handle of the data in
memory? I am interested in knowing more about it. Can you forward me a
spark article or JIRA that talks about it?

On Tue, Aug 18, 2015 at 12:49 PM, Sabarish Sasidharan 
sabarish.sasidha...@manthan.com wrote:

 It is still in memory for future rdd transformations and actions. What you
 get in driver is a copy of the data.

 Regards
 Sab

 On Tue, Aug 18, 2015 at 12:02 PM, praveen S mylogi...@gmail.com wrote:

 When I do an rdd.collect().. The data moves back to driver  Or is still
 held in memory across the executors?




 --

 Architect - Big Data
 Ph: +91 99805 99458

 Manthan Systems | *Company of the year - Analytics (2014 Frost and
 Sullivan India ICT)*
 +++



Re: Transform KafkaRDD to KafkaRDD, not plain RDD, or how to keep OffsetRanges after transformation

2015-08-18 Thread Petr Novak
The solution how to share offsetRanges after DirectKafkaInputStream is
transformed is in:
https://github.com/apache/spark/blob/master/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
https://github.com/apache/spark/blob/master/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java

One thing I would like to understand is why the Scala version uses a normal
variable while the Java version uses an AtomicReference.

Another thing which I don't get is about closure serialization. The
question is why the logger in the code below doesn't throw an NPE even though its
instance isn't copied like in the case of offsetRanges; when val offsets =
offsetRanges is removed from foreachRDD, then mapPartitionsWithIndex throws
on offsets(idx). I have something like this code:

object StreamOps {

  val logger = LoggerFactory.getLogger("StreamOps")
  var offsetRanges = Array[OffsetRange]()

  def transform[T](stream: InputDStream[Array[Byte]]): DStream[T] = {
    stream transform { rdd =>
      offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

      rdd flatMap { message =>
        Try(... decode Array[Byte] to F ...) match {
          case Success(fact) => Some(fact)
          case _ => None
        }
      }
    }
  }

  // Error handling removed for brevity
  def save[F](stream: DStream[F]): Unit = {
    stream foreachRDD { rdd =>
      // It has to be here otherwise NullPointerException
      val offsets = offsetRanges

      rdd mapPartitionsWithIndex { (idx, facts) =>
        // Use offsets here
        val writer = new MyWriter[F](offsets(idx), ...)

        facts foreach { fact =>
          writer.write(fact)
        }

        writer.close()

        // Why does logger work and not throw NullPointerException?
        logger.info(...)

        Iterator.empty
      } foreach {
        (_: Nothing) =>
      }
    }
  }
}

Many thanks for any advice, I'm sure it's a noob question.
Petr

On Mon, Aug 17, 2015 at 1:12 PM, Petr Novak oss.mli...@gmail.com wrote:

 Or can I generally create new RDD from transformation and enrich its
 partitions with some metadata so that I would copy OffsetRanges in my new
 RDD in DStream?

 On Mon, Aug 17, 2015 at 1:08 PM, Petr Novak oss.mli...@gmail.com wrote:

 Hi all,
 I need to transform KafkaRDD into a new stream of deserialized case
 classes. I want to use the new stream to save it to file and to perform
 additional transformations on it.

 To save it I want to use offsets in filenames, hence I need OffsetRanges
 in transformed RDD. But KafkaRDD is private, hence I don't know how to do
 it.

 Alternatively I could deserialize directly in messageHandler before
 KafkaRDD but it seems it is 1:1 transformation while I need to drop bad
 messages (KafkaRDD = RDD it would be flatMap).

 Is there a way how to do it using messageHandler, is there another
 approach?

 Many thanks for any help.
 Petr





Re: Regarding rdd.collect()

2015-08-18 Thread Sabarish Sasidharan
It is still in memory for future rdd transformations and actions. What you
get in driver is a copy of the data.

Regards
Sab

On Tue, Aug 18, 2015 at 12:02 PM, praveen S mylogi...@gmail.com wrote:

 When I do an rdd.collect().. The data moves back to driver  Or is still
 held in memory across the executors?




-- 

Architect - Big Data
Ph: +91 99805 99458

Manthan Systems | *Company of the year - Analytics (2014 Frost and Sullivan
India ICT)*
+++


Re: issue Running Spark Job on Yarn Cluster

2015-08-18 Thread MooseSpark
Please check the logs in your Hadoop YARN cluster; there you will find the precise
error or exception.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/issue-Running-Spark-Job-on-Yarn-Cluster-tp21779p24308.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: registering an empty RDD as a temp table in a PySpark SQL context

2015-08-18 Thread Hemant Bhanawat
It is definitely not the case for Spark SQL. A temporary table (much like a
DataFrame) is just a logical plan with a name, and it is not iterated over
unless a query is fired against it.

I am not sure that using rdd.take in the Python code to verify the schema is the right
approach, as it creates a Spark job.

BTW, why would you want to update the Spark code? rdd.take in the Python code is
the problem. All you want is to avoid the schema verification in
createDataFrame. I do not see any issue on the Spark side in the way it
handles an RDD that has no data.
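
For example, if the schema is known up front, a DataFrame with zero rows can be built
and registered without ever touching the data (a sketch in Scala; the column and table
names are placeholders):

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StringType, StructField, StructType}

val schema = StructType(Seq(StructField("col1", StringType, nullable = true)))
val emptyDF = sqlContext.createDataFrame(sc.emptyRDD[Row], schema)
emptyDF.registerTempTable("my_table")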


On Tue, Aug 18, 2015 at 1:23 AM, Eric Walker e...@node.io wrote:

 I have an RDD queried from a scan of a data source.  Sometimes the RDD has
 rows and at other times it has none.  I would like to register this RDD as
 a temporary table in a SQL context.  I suspect this will work in Scala, but
 in PySpark some code assumes that the RDD has rows in it, which are used to
 verify the schema:


 https://github.com/apache/spark/blob/branch-1.3/python/pyspark/sql/context.py#L299

 Before I attempt to extend the Scala code to handle an empty RDD or
 provide an empty DataFrame that can be registered, I was wondering what
 people recommend in this case.  Perhaps there's a simple way of registering
 an empty RDD as a temporary table in a PySpark SQL context that I'm
 overlooking.

 An alternative is to add special case logic in the client code to deal
 with an RDD backed by an empty table scan.  But since the SQL will already
 handle that, I was hoping to avoid special case logic.

 Eric




Changed Column order in DataFrame.Columns call and insertIntoJDBC

2015-08-18 Thread MooseSpark
I have an RDD which I am using to create a data frame based on one POJO, but
when the DataFrame is created, the column order gets changed.

DataFrame df = sqlCtx.createDataFrame(rdd, Pojo.class);

String[] columns = df.columns();
// columns here are in a different order from what has been defined in the POJO
// in the POJO the properties are p1, p2, p3
// but in columns it is p3, p1, p2, and the same order is being saved into JDBC

df.insertIntoJDBC("jdbc:sqlserver://xx.yyy.00.11:PORT;databaseName=spark_gpeh;user=saw;password=password@123;",
"Test", false);

Any idea?








--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Changed-Column-order-in-DataFrame-Columns-call-and-insertIntoJDBC-tp24309.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Running Spark on user-provided Hadoop installation

2015-08-18 Thread gauravsehgal
Refer: http://spark.apache.org/docs/latest/hadoop-provided.html

Specifically if you want to refer s3a paths. Please edit spark-env.sh and
add following lines at end:
SPARK_DIST_CLASSPATH=$(/path/to/hadoop/hadoop-2.7.1/bin/hadoop classpath)
export
SPARK_DIST_CLASSPATH=$SPARK_DIST_CLASSPATH:/path/to/hadoop/hadoop-2.7.1/share/hadoop/tools/lib/*



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Running-Spark-on-user-provided-Hadoop-installation-tp24076p24310.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: how do I execute a job on a single worker node in standalone mode

2015-08-18 Thread Igor Berman
By default, standalone mode creates 1 executor on every worker machine per
application.
The number of overall cores is configured with --total-executor-cores,
so in general if you specify --total-executor-cores=1 then there will
be only 1 core on some executor and you'll get what you want.

On the other hand, if your application needs all the cores of your cluster and
only some specific job should run on a single executor, there are a few methods
to achieve this,
e.g. coalesce(1) or dummyRddWithOnePartitionOnly.foreachPartition.
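
For example (a rough sketch; doWork is a placeholder for whatever the job does):

// collapse to a single partition so the work runs in one task on one executor
rdd.coalesce(1).foreachPartition { records =>
  records.foreach(doWork)
}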


On 18 August 2015 at 01:36, Axel Dahl a...@whisperstream.com wrote:

 I have a 4 node cluster and have been playing around with the
 num-executors parameters, executor-memory and executor-cores

 I set the following:
 --executor-memory=10G
 --num-executors=1
 --executor-cores=8

 But when I run the job, I see that each worker, is running one executor
 which has  2 cores and 2.5G memory.

 What I'd like to do instead is have Spark just allocate the job to a
 single worker node?

 Is that possible in standalone mode or do I need a job/resource scheduler
 like Yarn to do that?

 Thanks in advance,

 -Axel





Re: how to write any data (non RDD) to a file inside closure?

2015-08-18 Thread Robineast
Still not sure what you are trying to achieve. If you could post some code that 
doesn’t work the community can help you understand where the error (syntactic 
or conceptual) is.
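
In general the pattern below is fine, as long as the closure only does plain JVM I/O and
never touches the SparkContext or tries to build new RDDs inside it (a sketch; the
output path is a placeholder):

rdd.foreachPartition { records =>
  val out = new java.io.PrintWriter(s"/tmp/output-${java.util.UUID.randomUUID}.txt")
  try records.foreach(r => out.println(r.toString)) finally out.close()
}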
 On 17 Aug 2015, at 17:42, dianweih001 [via Apache Spark User List] 
 ml-node+s1001560n24299...@n3.nabble.com wrote:
 
 Hi Robin, 
 
 I know how to write/read file outside of RDDs and executor closure. Just not 
 sure how to write data to file inside  closure because within closure we have 
 to define RDDs which will introduce SparkContext error sometimes. 
 
 Thank you for your reply. 
 
 Dianwei 
 




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/how-to-write-any-data-non-RDD-to-a-file-inside-closure-tp24243p24315.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Evaluating spark + Cassandra for our use cases

2015-08-18 Thread Benjamin Ross
My company is interested in building a real-time time-series querying solution 
using Spark and Cassandra.  Specifically, we're interested in setting up a 
Spark system against Cassandra running a hive thrift server.  We need to be 
able to perform real-time queries on time-series data - things like, how many 
accounts have spent in total more than $300 on product X in the past 3 months, 
and purchased product Y in the past month.

These queries need to be fast - preferably sub-second but we can deal with a 
few seconds if absolutely necessary.  The data sizes are in the millions of 
records when rolled up to be per-monthly records.  Something on the order of 
100M per customer.

My question is, based on experience, how hard would it be to get Cassandra and 
Spark working together to give us sub-second response times in this use case?  
Note that we'll need to use DataStax enterprise (which is unappealing from a 
cost standpoint) because it's the only thing that provides the hive spark 
thrift server to Cassandra.

The two top contenders for our solution are Spark+Cassandra and Druid.

Neither of these solutions work perfectly out of the box:

-  Druid would need to be modified, possibly hacked, to support the 
queries we require.  I'm also not clear how operationally ready it is.

-  Cassandra and Spark would require paying money for DataStax 
enterprise.  It really feels like it's going to be tricky to configure 
Cassandra and Spark to be lightning fast for our use case.  Finally, window 
functions (which we need - see above) are not supported unless we use a 
pre-release milestone of the datastax spark Cassandra connector.

I was wondering if anyone had any thoughts.  How easy is it to get Spark and 
Cassandra down to sub-second speeds in our use case?

Thanks,
Ben


Re: Spark executor lost because of GC overhead limit exceeded even though using 20 executors using 25GB each

2015-08-18 Thread Ted Yu
Do you mind providing a bit more information ?

release of Spark

code snippet of your app

version of Java

Thanks

On Tue, Aug 18, 2015 at 8:57 AM, unk1102 umesh.ka...@gmail.com wrote:

 Hi, this GC overhead limit error is making me crazy. I have 20 executors
 using 25 GB each; I don't understand at all how it can throw a GC overhead
 error, and I also don't have that big a dataset. Once this GC error occurs in
 an executor it gets lost, and slowly other executors get lost because of
 IOException, RPC client disassociated, shuffle not found, etc. Please help me
 solve this; I am going mad as I am new to Spark. Thanks in advance.

 WARN scheduler.TaskSetManager: Lost task 7.0 in stage 363.0 (TID 3373,
 myhost.com): java.lang.OutOfMemoryError: GC overhead limit exceeded
 at
 org.apache.spark.sql.types.UTF8String.toString(UTF8String.scala:150)
 at

 org.apache.spark.sql.catalyst.expressions.GenericRow.getString(rows.scala:120)
 at
 org.apache.spark.sql.columnar.STRING$.actualSize(ColumnType.scala:312)
 at

 org.apache.spark.sql.columnar.compression.DictionaryEncoding$Encoder.gatherCompressibilityStats(compressionSchemes.scala:224)
 at

 org.apache.spark.sql.columnar.compression.CompressibleColumnBuilder$class.gatherCompressibilityStats(CompressibleColumnBuilder.scala:72)
 at

 org.apache.spark.sql.columnar.compression.CompressibleColumnBuilder$class.appendFrom(CompressibleColumnBuilder.scala:80)
 at

 org.apache.spark.sql.columnar.NativeColumnBuilder.appendFrom(ColumnBuilder.scala:87)
 at

 org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$3$$anon$1.next(InMemoryColumnarTableScan.scala:148)
 at

 org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$3$$anon$1.next(InMemoryColumnarTableScan.scala:124)
 at
 org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:277)
 at
 org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:171)
 at
 org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:242)
 at
 org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
 at
 org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
 at
 org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
 at
 org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
 at
 org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
 at
 org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
 at
 org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
 at
 org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
 at
 org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
 at
 org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
 at
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
 at
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
 at org.apache.spark.scheduler.Task.run(Task.scala:70)
 at
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-executor-lost-because-of-GC-overhead-limit-exceeded-even-though-using-20-executors-using-25GB-h-tp24322.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Spark + Jupyter (IPython Notebook)

2015-08-18 Thread Jerry Lam
Hi Guru,

Thanks! Great to hear that someone tried it in production. How do you like
it so far?

Best Regards,

Jerry


On Tue, Aug 18, 2015 at 11:38 AM, Guru Medasani gdm...@gmail.com wrote:

 Hi Jerry,

 Yes. I’ve seen customers using this in production for data science work.
 I’m currently using this for one of my projects on a cluster as well.

 Also, here is a blog that describes how to configure this.


 http://blog.cloudera.com/blog/2014/08/how-to-use-ipython-notebook-with-apache-spark/


 Guru Medasani
 gdm...@gmail.com



 On Aug 18, 2015, at 8:35 AM, Jerry Lam chiling...@gmail.com wrote:

 Hi spark users and developers,

 Did anyone have IPython Notebook (Jupyter) deployed in production that
 uses Spark as the computational engine?

 I know Databricks Cloud provides similar features with deeper integration
 with Spark. However, Databricks Cloud has to be hosted by Databricks so we
 cannot do this.

 Other solutions (e.g. Zeppelin) seem to reinvent the wheel that IPython
 has already offered years ago. It would be great if someone can educate me
 the reason behind this.

 Best Regards,

 Jerry





Re: spark streaming 1.3 doubts(force it to not consume anything)

2015-08-18 Thread Shushant Arora
But KafkaRDD[K, V, U, T, R] is not a subclass of RDD[R] as far as Java is
concerned; since Java generics do not support covariance, a derived class cannot
return a differently parameterized generic subtype from an overridden method.

On Wed, Aug 19, 2015 at 12:02 AM, Cody Koeninger c...@koeninger.org wrote:

 Option is covariant and KafkaRDD is a subclass of RDD

 On Tue, Aug 18, 2015 at 1:12 PM, Shushant Arora shushantaror...@gmail.com
  wrote:

 Is it that in scala its allowed for derived class to have any return type
 ?

  And streaming jar is originally created in scala so its allowed for
 DirectKafkaInputDStream  to return Option[KafkaRDD[K, V, U, T, R]]
 compute method ?

 On Tue, Aug 18, 2015 at 8:36 PM, Shushant Arora 
 shushantaror...@gmail.com wrote:

 looking at source code of
 org.apache.spark.streaming.kafka.DirectKafkaInputDStream

 override def compute(validTime: Time): Option[KafkaRDD[K, V, U, T, R]]
 = {
 val untilOffsets = clamp(latestLeaderOffsets(maxRetries))
 val rdd = KafkaRDD[K, V, U, T, R](
   context.sparkContext, kafkaParams, currentOffsets, untilOffsets,
 messageHandler)

 currentOffsets = untilOffsets.map(kv => kv._1 -> kv._2.offset)
 Some(rdd)
   }


 But in DStream it is def compute(validTime: Time): Option[RDD[T]].

 So what should be the return type of a custom DStream that extends
 DirectKafkaInputDStream?
 I want the behaviour to be the same as DirectKafkaInputDStream in
 normal scenarios, and to return None in a specific scenario.

 And why did the same error not come up when DirectKafkaInputDStream itself
 extends InputDStream? Its return type Option[KafkaRDD[K, V, U, T, R]] is not a
 subclass of Option[RDD[T]], so shouldn't that have failed too?




 On Tue, Aug 18, 2015 at 7:28 PM, Cody Koeninger c...@koeninger.org
 wrote:

 The superclass method in DStream is defined as returning an
 Option[RDD[T]]

 On Tue, Aug 18, 2015 at 8:07 AM, Shushant Arora 
 shushantaror...@gmail.com wrote:

 Getting compilation error while overriding compute method of
 DirectKafkaInputDStream.


 [ERROR] CustomDirectKafkaInputDstream.java:[51,83]
 compute(org.apache.spark.streaming.Time) in CustomDirectKafkaInputDstream
 cannot override compute(org.apache.spark.streaming.Time) in
 org.apache.spark.streaming.dstream.DStream; attempting to use incompatible
 return type

 [ERROR] found   :
 scala.Option<org.apache.spark.streaming.kafka.KafkaRDD<byte[],byte[],kafka.serializer.DefaultDecoder,kafka.serializer.DefaultDecoder,byte[][]>>

 [ERROR] required: scala.Option<org.apache.spark.rdd.RDD<byte[][]>>


 class :

 public class CustomDirectKafkaInputDstream extends
 DirectKafkaInputDStream<byte[], byte[], kafka.serializer.DefaultDecoder,
 kafka.serializer.DefaultDecoder, byte[][]> {

 @Override
 public Option<KafkaRDD<byte[], byte[], DefaultDecoder, DefaultDecoder,
 byte[][]>> compute(
 Time validTime) {

 int processed = processedCounter.value();
 int failed = failedProcessingsCounter.value();
 if (processed == failed) {
 System.out.println("backing off since it is a 100% failure");
 return Option.empty();
 } else {
 System.out.println("starting the stream");

 return super.compute(validTime);
 }
 }
 }


 What should be the return type of the compute method? The superclass is
 returning Option<KafkaRDD<byte[], byte[], DefaultDecoder, DefaultDecoder,
 byte[][]>>, but it is expecting
 scala.Option<org.apache.spark.rdd.RDD<byte[][]>> from the derived class. Is
 there something wrong with the code?

 On Mon, Aug 17, 2015 at 7:08 PM, Cody Koeninger c...@koeninger.org
 wrote:

 Look at the definitions of the java-specific
 KafkaUtils.createDirectStream methods (the ones that take a
 JavaStreamingContext)

 On Mon, Aug 17, 2015 at 5:13 AM, Shushant Arora 
 shushantaror...@gmail.com wrote:

 How to create a ClassTag in Java? Also, the constructor
 of DirectKafkaInputDStream takes a Function1, not a Function, but
 KafkaUtils.createDirectStream allows a Function.

 I have the below as my overridden DirectKafkaInputDStream.


 public class CustomDirectKafkaInputDstream extends
 DirectKafkaInputDStream<byte[], byte[], kafka.serializer.DefaultDecoder,
 kafka.serializer.DefaultDecoder, byte[][]> {

 public CustomDirectKafkaInputDstream(
 StreamingContext ssc_,
 Map<String, String> kafkaParams,
 Map<TopicAndPartition, Object> fromOffsets,
 Function1<MessageAndMetadata<byte[], byte[]>, byte[][]>
 messageHandler,
 ClassTag<byte[]> evidence$1, ClassTag<byte[]> evidence$2,
 ClassTag<DefaultDecoder> evidence$3,
 ClassTag<DefaultDecoder> evidence$4, ClassTag<byte[][]> evidence$5) {
 super(ssc_, kafkaParams, fromOffsets, messageHandler, evidence$1,
 evidence$2,
 evidence$3, evidence$4, evidence$5);
 }
 @Override
 public Option<KafkaRDD<byte[], byte[], DefaultDecoder,
 DefaultDecoder, byte[][]>> compute(
 Time validTime) {
 int processed = processedCounter.value();
 int failed = failedProcessingsCounter.value();
 if (processed == failed) {
 System.out.println("backing off since it is a 100% failure");
 return Option.empty();
 } else {
 System.out.println("starting the stream");

 return super.compute(validTime);
 }
 }



 To create this stream
 I am 

Re: Left outer joining big data set with small lookups

2015-08-18 Thread VIJAYAKUMAR JAWAHARLAL
Nope. A count action did not help it choose a broadcast join.

All of my tables are Hive external tables. So I tried to trigger compute
statistics from sqlContext.sql. It gives me an error saying "no such table". I
am not sure whether that is due to the following bug in 1.4.1:

https://issues.apache.org/jira/browse/SPARK-8105

I don't find a way to enable BroadcastHashJoin in my case :(

 On Aug 17, 2015, at 12:52 PM, Silvio Fiorito silvio.fior...@granturing.com 
 wrote:
 
 Try doing a count on both lookups to force the caching to occur before the 
 join.
 
 
 
 
 On 8/17/15, 12:39 PM, VIJAYAKUMAR JAWAHARLAL sparkh...@data2o.io wrote:
 
 Thanks for your help
 
 I tried to cache the lookup tables and left outer join them with the big table 
 (DF). The join does not seem to be using a broadcast join; it still goes with a hash 
 partition join and shuffles the big table. Here is the scenario
 
 
 …
 table1 as big_df
 left outer join
 table2 as lkup
 on big_df.lkupid = lkup.lkupid
 
 table1 above is well distributed across all 40 partitions because of 
 sqlContext.sql("SET spark.sql.shuffle.partitions=40"). table2 is small, 
 using just 2 partitions. After the join stage, the Spark UI showed me that all 
 activities ended up in just 2 executors. When I tried to dump the data to 
 HDFS after the join stage, all data ended up in 2 partition files and the rest of 
 the 38 files are 0 sized files.
 
 Since the above did not work, I tried to broadcast the DF and registered it as a 
 table before the join. 

 val table2_df = sqlContext.sql("select * from table2")
 val broadcast_table2 = sc.broadcast(table2_df)
 broadcast_table2.value.registerTempTable("table2")
 
 Broadcast also has the same issue as explained above. All data is processed 
 by just 2 executors due to lookup skew.
 
 Any more idea to tackle this issue in Spark Dataframe?
 
 Thanks
 Vijay
 
 
 On Aug 14, 2015, at 10:27 AM, Silvio Fiorito 
 silvio.fior...@granturing.com wrote:
 
 You could cache the lookup DataFrames, it’ll then do a broadcast join.
 
 
 
 
 On 8/14/15, 9:39 AM, VIJAYAKUMAR JAWAHARLAL sparkh...@data2o.io wrote:
 
 Hi
 
 I am facing a huge performance problem when I am trying to left outer join a 
 very big data set (~140GB) with a bunch of small lookups [star schema 
 type]. I am using data frames in Spark SQL. It looks like the data is shuffled 
 and skewed when that join happens. Is there any way to improve the performance 
 of this type of join in Spark? 
 
 How can I hint optimizer to go with replicated join etc., to avoid 
 shuffle? Would it help to create broadcast variables on small lookups?  If 
 I create broadcast variables, how can I convert them into data frame and 
 use them in sparksql type of join?
 
 Thanks
 Vijay
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org
 
 
 
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org
 



Re: spark streaming 1.3 doubts(force it to not consume anything)

2015-08-18 Thread Cody Koeninger
Option is covariant and KafkaRDD is a subclass of RDD
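
A tiny illustration of that point (hypothetical classes): Scala accepts a covariantly
refined return type in an override, which is what DirectKafkaInputDStream relies on,
while Java generics would reject the equivalent.

class Base { def compute(): Option[Seq[Int]] = Some(Seq(1)) }

// compiles: Option is covariant and List[Int] <: Seq[Int],
// so Option[List[Int]] <: Option[Seq[Int]]
class Derived extends Base {
  override def compute(): Option[List[Int]] = Some(List(1))
}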

On Tue, Aug 18, 2015 at 1:12 PM, Shushant Arora shushantaror...@gmail.com
wrote:

 Is it that in scala its allowed for derived class to have any return type ?

  And streaming jar is originally created in scala so its allowed for
 DirectKafkaInputDStream  to return Option[KafkaRDD[K, V, U, T, R]]
 compute method ?

 On Tue, Aug 18, 2015 at 8:36 PM, Shushant Arora shushantaror...@gmail.com
  wrote:

 looking at source code of
 org.apache.spark.streaming.kafka.DirectKafkaInputDStream

 override def compute(validTime: Time): Option[KafkaRDD[K, V, U, T, R]] =
 {
 val untilOffsets = clamp(latestLeaderOffsets(maxRetries))
 val rdd = KafkaRDD[K, V, U, T, R](
   context.sparkContext, kafkaParams, currentOffsets, untilOffsets,
 messageHandler)

 currentOffsets = untilOffsets.map(kv => kv._1 -> kv._2.offset)
 Some(rdd)
   }


 But in DStream its def compute (validTime: Time): Option[RDD[T]]  ,

 So what should  be the return type of custom DStream extends
 DirectKafkaInputDStream .
 Since I want the behaviour to be same as of DirectKafkaInputDStream  in
 normal scenarios and return none in specific scenario.

 And why the same error did not come while extending
 DirectKafkaInputDStream from InputDStream ? Since new return type 
 Option[KafkaRDD[K,
 V, U, T, R]] is not subclass of Option[RDD[T] so it should have been
 failed?




 On Tue, Aug 18, 2015 at 7:28 PM, Cody Koeninger c...@koeninger.org
 wrote:

 The superclass method in DStream is defined as returning an
 Option[RDD[T]]

 On Tue, Aug 18, 2015 at 8:07 AM, Shushant Arora 
 shushantaror...@gmail.com wrote:

 Getting compilation error while overriding compute method of
 DirectKafkaInputDStream.


 [ERROR] CustomDirectKafkaInputDstream.java:[51,83]
 compute(org.apache.spark.streaming.Time) in CustomDirectKafkaInputDstream
 cannot override compute(org.apache.spark.streaming.Time) in
 org.apache.spark.streaming.dstream.DStream; attempting to use incompatible
 return type

 [ERROR] found   :
 scala.Option<org.apache.spark.streaming.kafka.KafkaRDD<byte[],byte[],kafka.serializer.DefaultDecoder,kafka.serializer.DefaultDecoder,byte[][]>>

 [ERROR] required: scala.Option<org.apache.spark.rdd.RDD<byte[][]>>


 class :

 public class CustomDirectKafkaInputDstream extends
 DirectKafkaInputDStream<byte[], byte[], kafka.serializer.DefaultDecoder,
 kafka.serializer.DefaultDecoder, byte[][]> {

 @Override
 public Option<KafkaRDD<byte[], byte[], DefaultDecoder, DefaultDecoder,
 byte[][]>> compute(
 Time validTime) {

 int processed = processedCounter.value();
 int failed = failedProcessingsCounter.value();
 if (processed == failed) {
 System.out.println("backing off since it is a 100% failure");
 return Option.empty();
 } else {
 System.out.println("starting the stream");

 return super.compute(validTime);
 }
 }
 }


 What should be the return type of the compute method? The superclass is
 returning Option<KafkaRDD<byte[], byte[], DefaultDecoder, DefaultDecoder,
 byte[][]>>, but it is expecting
 scala.Option<org.apache.spark.rdd.RDD<byte[][]>> from the derived class. Is
 there something wrong with the code?

 On Mon, Aug 17, 2015 at 7:08 PM, Cody Koeninger c...@koeninger.org
 wrote:

 Look at the definitions of the java-specific
 KafkaUtils.createDirectStream methods (the ones that take a
 JavaStreamingContext)

 On Mon, Aug 17, 2015 at 5:13 AM, Shushant Arora 
 shushantaror...@gmail.com wrote:

 How to create classtag in java ?Also Constructor
 of DirectKafkaInputDStream takes Function1 not Function but
 kafkautils.createDirectStream allows function.

 I have below as overriden DirectKafkaInputDStream.


 public class CustomDirectKafkaInputDstream extends
 DirectKafkaInputDStream<byte[], byte[], kafka.serializer.DefaultDecoder,
 kafka.serializer.DefaultDecoder, byte[][]> {

 public CustomDirectKafkaInputDstream(
 StreamingContext ssc_,
 Map<String, String> kafkaParams,
 Map<TopicAndPartition, Object> fromOffsets,
 Function1<MessageAndMetadata<byte[], byte[]>, byte[][]>
 messageHandler,
 ClassTag<byte[]> evidence$1, ClassTag<byte[]> evidence$2,
 ClassTag<DefaultDecoder> evidence$3,
 ClassTag<DefaultDecoder> evidence$4, ClassTag<byte[][]> evidence$5) {
 super(ssc_, kafkaParams, fromOffsets, messageHandler, evidence$1,
 evidence$2,
 evidence$3, evidence$4, evidence$5);
 }
 @Override
 public Option<KafkaRDD<byte[], byte[], DefaultDecoder,
 DefaultDecoder, byte[][]>> compute(
 Time validTime) {
 int processed = processedCounter.value();
 int failed = failedProcessingsCounter.value();
 if (processed == failed) {
 System.out.println("backing off since it is a 100% failure");
 return Option.empty();
 } else {
 System.out.println("starting the stream");

 return super.compute(validTime);
 }
 }



 To create this stream
 I am using
 scala.collection.immutable.Map<String, String> scalakafkaParams =
 JavaConverters.mapAsScalaMapConverter(kafkaParams).asScala().toMap(Predef.<Tuple2<String,
 String>>conforms());
 scala.collection.immutable.Map<TopicAndPartition, Long>
 scalaktopicOffsetMap =
 

Re: Spark + Jupyter (IPython Notebook)

2015-08-18 Thread Prabeesh K.
Refer this post
http://blog.prabeeshk.com/blog/2015/06/19/pyspark-notebook-with-docker/

Spark + Jupyter + Docker

On 18 August 2015 at 21:29, Jerry Lam chiling...@gmail.com wrote:

 Hi Guru,

 Thanks! Great to hear that someone tried it in production. How do you like
 it so far?

 Best Regards,

 Jerry


 On Tue, Aug 18, 2015 at 11:38 AM, Guru Medasani gdm...@gmail.com wrote:

 Hi Jerry,

 Yes. I’ve seen customers using this in production for data science work.
 I’m currently using this for one of my projects on a cluster as well.

 Also, here is a blog that describes how to configure this.


 http://blog.cloudera.com/blog/2014/08/how-to-use-ipython-notebook-with-apache-spark/


 Guru Medasani
 gdm...@gmail.com



 On Aug 18, 2015, at 8:35 AM, Jerry Lam chiling...@gmail.com wrote:

 Hi spark users and developers,

 Did anyone have IPython Notebook (Jupyter) deployed in production that
 uses Spark as the computational engine?

 I know Databricks Cloud provides similar features with deeper integration
 with Spark. However, Databricks Cloud has to be hosted by Databricks so we
 cannot do this.

 Other solutions (e.g. Zeppelin) seem to reinvent the wheel that IPython
 has already offered years ago. It would be great if someone can educate me
 the reason behind this.

 Best Regards,

 Jerry






Re: What am I missing that's preventing javac from finding the libraries (CLASSPATH is setup...)?

2015-08-18 Thread Ted Yu
Normally people would set up a Maven project with Spark dependencies, or
use sbt.

Can you go with either approach ?
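
For example, a minimal build.sbt along these lines should pull in everything those
imports need (a sketch; adjust the name and versions to your setup):

name := "spark-test"

scalaVersion := "2.10.4"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.4.1" % "provided",
  "org.apache.spark" %% "spark-sql"  % "1.4.1" % "provided",
  "org.apache.spark" %% "spark-hive" % "1.4.1" % "provided"
)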

Cheers

On Tue, Aug 18, 2015 at 10:28 AM, Jerry jerry.c...@gmail.com wrote:

 Hello,

 So I setup Spark to run on my local machine to see if I can reproduce the
 issue I'm having with data frames, but I'm running into issues with the
 compiler.

 Here's what I got:

 $ echo $CLASSPATH

 /usr/lib/jvm/java-6-oracle/lib:/home/adminz/dev/spark/spark-1.4.1/lib/spark-assembly-1.4.1-hadoop2.6.0.jar


 javac Test.java
 Test.java:1: package org.apache.spark.sql.api.java does not exist
 import org.apache.spark.sql.api.java.*;
 ^
 Test.java:6: package org.apache.spark.sql does not exist
 import org.apache.spark.sql.*;
 ^
 Test.java:7: package org.apache.spark.sql.hive does not exist
 import org.apache.spark.sql.hive.*;
 


 Let me know what I'm doing wrong.

 Thanks,
 Jerry



Re: Too many files/dirs in hdfs

2015-08-18 Thread Mohit Anchlia
Is there a way to store all the results in one file and keep the file rollover
separate from the Spark Streaming batch interval?

On Mon, Aug 17, 2015 at 2:39 AM, UMESH CHAUDHARY umesh9...@gmail.com
wrote:

 In Spark Streaming you can simply check whether your RDD contains any
 records or not, and if records are there you can save them, e.g. using a
 FileOutputStream:

 dstream.foreachRDD { rdd => val count = rdd.count(); if (count > 0) { // SAVE YOUR
 STUFF } }

 This will not create unnecessary files of 0 bytes.

 On Mon, Aug 17, 2015 at 2:51 PM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 Currently, spark streaming would create a new directory for every batch
 and store the data to it (whether it has anything or not). There is no
 direct append call as of now, but you can achieve this either with
 FileUtil.copyMerge
 http://apache-spark-user-list.1001560.n3.nabble.com/save-spark-streaming-output-to-single-file-on-hdfs-td21124.html#a21167
 or have a separate program which will do the clean up for you.

 Thanks
 Best Regards

 On Sat, Aug 15, 2015 at 5:20 AM, Mohit Anchlia mohitanch...@gmail.com
 wrote:

 Spark stream seems to be creating 0 bytes files even when there is no
 data. Also, I have 2 concerns here:

 1) Extra unnecessary files is being created from the output
 2) Hadoop doesn't work really well with too many files and I see that it
 is creating a directory with a timestamp every 1 second. Is there a better
 way of writing a file, may be use some kind of append mechanism where one
 doesn't have to change the batch interval.






COMPUTE STATS on hive table - NoSuchTableException

2015-08-18 Thread VIJAYAKUMAR JAWAHARLAL
Hi

I am trying to compute stats on a lookup table from spark which resides in 
hive. I am invoking spark API as follows. It gives me NoSuchTableException. 
Table is double verified and subsequent statement “sqlContext.sql(“select * 
from cpatext.lkup”)” picks up the table correctly. I am wondering whether it is 
related to https://issues.apache.org/jira/browse/SPARK-8105 
https://issues.apache.org/jira/browse/SPARK-8105. I am using Spark 1.4.1 
Please let me know.

scala> sqlContext.sql("ANALYZE TABLE cpatext.lkup COMPUTE STATISTICS NOSCAN")
2015-08-18 18:12:19,299 INFO  [main] parse.ParseDriver 
(ParseDriver.java:parse(185)) - Parsing command: ANALYZE TABLE cpatext.lkup 
COMPUTE STATISTICS NOSCAN
2015-08-18 18:12:19,299 INFO  [main] parse.ParseDriver 
(ParseDriver.java:parse(206)) - Parse Completed
org.apache.spark.sql.catalyst.analysis.NoSuchTableException
at 
org.apache.spark.sql.hive.client.ClientInterface$$anonfun$getTable$1.apply(ClientInterface.scala:112)
at 
org.apache.spark.sql.hive.client.ClientInterface$$anonfun$getTable$1.apply(ClientInterface.scala:112)
at scala.Option.getOrElse(Option.scala:120)
at 
org.apache.spark.sql.hive.client.ClientInterface$class.getTable(ClientInterface.scala:112)
at 
org.apache.spark.sql.hive.client.ClientWrapper.getTable(ClientWrapper.scala:60)
at 
org.apache.spark.sql.hive.HiveMetastoreCatalog.lookupRelation(HiveMetastoreCatalog.scala:227)
at 
org.apache.spark.sql.hive.HiveContext$$anon$2.org$apache$spark$sql$catalyst$analysis$OverrideCatalog$$super$lookupRelation(HiveContext.scala:371)
at 
org.apache.spark.sql.catalyst.analysis.OverrideCatalog$$anonfun$lookupRelation$3.apply(Catalog.scala:165)
at 
org.apache.spark.sql.catalyst.analysis.OverrideCatalog$$anonfun$lookupRelation$3.apply(Catalog.scala:165)
at scala.Option.getOrElse(Option.scala:120)
at 
org.apache.spark.sql.catalyst.analysis.OverrideCatalog$class.lookupRelation(Catalog.scala:165)
at 
org.apache.spark.sql.hive.HiveContext$$anon$2.lookupRelation(HiveContext.scala:371)
at org.apache.spark.sql.hive.HiveContext.analyze(HiveContext.scala:293)
at 
org.apache.spark.sql.hive.execution.AnalyzeTable.run(commands.scala:43)
at 
org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:57)
at 
org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:57)

Thanks
Vijay

Re: Spark Job Hangs on our production cluster

2015-08-18 Thread Imran Rashid
just looking at the thread dump from your original email, the 3 executor
threads are all trying to load classes.  (One thread is actually loading
some class, and the others are blocked waiting to load a class, most likely
trying to load the same thing.)  That is really weird, definitely not
something which should keep things blocked for 30 min.  It suggest
something wrong w/ the jvm, or classpath configuration, or a combination.
Looks like you are trying to run in the repl, and for whatever reason the
http server for the repl to serve classes is not responsive.  I'd try
running outside of the repl and see if that works.

sorry not a full diagnosis but maybe this'll help a bit.

On Tue, Aug 11, 2015 at 3:19 PM, java8964 java8...@hotmail.com wrote:

 Currently we have a IBM BigInsight cluster with 1 namenode + 1 JobTracker
 + 42 data/task nodes, which runs with BigInsight V3.0.0.2, corresponding
 with Hadoop 2.2.0 with MR1.

 Since IBM BigInsight doesn't come with Spark, so we build Spark 1.2.2 with
 Hadoop 2.2.0 + Hive 0.12 by ourselves, and deploy it on the same cluster.

 The IBM Biginsight comes with IBM jdk 1.7, but during our experience on
 stage environment, we found out Spark works better with Oracle JVM. So we
 run spark under Oracle JDK 1.7.0_79.

 Now on production, we are facing a issue we never faced, nor can reproduce
 on our staging cluster.

 We are using Spark Standalone cluster, and here is the basic
 configurations:

 more spark-env.sh
 export JAVA_HOME=/opt/java
 export PATH=$JAVA_HOME/bin:$PATH
 export HADOOP_CONF_DIR=/opt/ibm/biginsights/hadoop-conf/
 export
 SPARK_CLASSPATH=/opt/ibm/biginsights/IHC/lib/ibm-compression.jar:/opt/ibm/biginsights/hive/lib
 /db2jcc4-10.6.jar
 export
 SPARK_LOCAL_DIRS=/data1/spark/local,/data2/spark/local,/data3/spark/local
 export SPARK_MASTER_WEBUI_PORT=8081
 export SPARK_MASTER_IP=host1
 export SPARK_MASTER_OPTS=-Dspark.deploy.defaultCores=42
 export SPARK_WORKER_MEMORY=24g
 export SPARK_WORKER_CORES=6
 export SPARK_WORKER_DIR=/tmp/spark/work
 export SPARK_DRIVER_MEMORY=2g
 export SPARK_EXECUTOR_MEMORY=2g

 more spark-defaults.conf
 spark.master spark://host1:7077
 spark.eventLog.enabled true
 spark.eventLog.dir hdfs://host1:9000/spark/eventLog
 spark.serializer org.apache.spark.serializer.KryoSerializer
 spark.executor.extraJavaOptions -verbose:gc -XX:+PrintGCDetails
 -XX:+PrintGCTimeStamps

 We are using AVRO file format a lot, and we have these 2 datasets, one is
 about 96G, and the other one is a little over 1T. Since we are using AVRO,
 so we also built spark-avro of commit 
 a788c9fce51b0ec1bb4ce88dc65c1d55aaa675b8
 https://github.com/databricks/spark-avro/tree/a788c9fce51b0ec1bb4ce88dc65c1d55aaa675b8,
 which is the latest version supporting Spark 1.2.x.

 Here is the problem we are facing on our production cluster, even the
 following simple spark-shell commands will hang in our production cluster:

 import org.apache.spark.sql.SQLContext
 val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
 import com.databricks.spark.avro._
 val bigData = sqlContext.avroFile("hdfs://namenode:9000/bigData/")
 bigData.registerTempTable("bigData")
 bigData.count()

 From the console,  we saw following:
 [Stage 0:
 (44 + 42) / 7800]

 no update for more than 30 minutes and longer.

 The big dataset with 1T should generate 7800 HDFS block, but Spark's HDFS
 client looks like having problem to read them. Since we are running Spark
 on the data nodes, all the Spark tasks are running as NODE_LOCAL on
 locality level.

 If I go to the data/task node which Spark tasks hang, and use the JStack
 to dump the thread, I got the following on the top:

 2015-08-11 15:38:38
 Full thread dump Java HotSpot(TM) 64-Bit Server VM (24.79-b02 mixed mode):

 Attach Listener daemon prio=10 tid=0x7f0660589000 nid=0x1584d
 waiting on condition [0x]
java.lang.Thread.State: RUNNABLE

 org.apache.hadoop.hdfs.PeerCache@4a88ec00 daemon prio=10
 tid=0x7f06508b7800 nid=0x13302 waiting on condition [0x7f060be94000]
java.lang.Thread.State: TIMED_WAITING (sleeping)
 at java.lang.Thread.sleep(Native Method)
 at org.apache.hadoop.hdfs.PeerCache.run(PeerCache.java:252)
 at org.apache.hadoop.hdfs.PeerCache.access$000(PeerCache.java:39)
 at org.apache.hadoop.hdfs.PeerCache$1.run(PeerCache.java:135)
 at java.lang.Thread.run(Thread.java:745)

 shuffle-client-1 daemon prio=10 tid=0x7f0650687000 nid=0x132fc
 runnable [0x7f060d198000]
java.lang.Thread.State: RUNNABLE
 at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
 at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
 at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:79)
 at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:87)
 - locked 0x00067bf47710 (a
 io.netty.channel.nio.SelectedSelectionKeySet)
 - locked 0x00067bf374e8 (a
 java.util.Collections$UnmodifiableSet)
  

What am I missing that's preventing javac from finding the libraries (CLASSPATH is setup...)?

2015-08-18 Thread Jerry
Hello,

So I setup Spark to run on my local machine to see if I can reproduce the
issue I'm having with data frames, but I'm running into issues with the
compiler.

Here's what I got:

$ echo $CLASSPATH
/usr/lib/jvm/java-6-oracle/lib:/home/adminz/dev/spark/spark-1.4.1/lib/spark-assembly-1.4.1-hadoop2.6.0.jar


javac Test.java
Test.java:1: package org.apache.spark.sql.api.java does not exist
import org.apache.spark.sql.api.java.*;
^
Test.java:6: package org.apache.spark.sql does not exist
import org.apache.spark.sql.*;
^
Test.java:7: package org.apache.spark.sql.hive does not exist
import org.apache.spark.sql.hive.*;



Let me know what I'm doing wrong.

Thanks,
Jerry


RE: Spark Job Hangs on our production cluster

2015-08-18 Thread java8964
Hi, Imran:
Thanks for your reply. I am not sure what you mean by "repl". Can you give more 
detail about that?
This only happens when Spark 1.2.2 tries to scan the big data set, and I cannot 
reproduce it when it scans the smaller dataset.
FYI, I had to build and deploy Spark 1.3.1 on our production cluster. Right 
now, I cannot reproduce this hang problem on the same cluster for the same big 
dataset. On this point, we will continue trying Spark 1.3.1 and hope we will have 
a more positive experience with it.
But just out of curiosity, what class does Spark need to load at this time? From 
my understanding, the executor has already scanned the first block of data from HDFS, 
and hangs while starting the 2nd block. All the classes should already be loaded in 
the JVM in this case.
Thanks
Yong
From: iras...@cloudera.com
Date: Tue, 18 Aug 2015 12:17:56 -0500
Subject: Re: Spark Job Hangs on our production cluster
To: java8...@hotmail.com
CC: user@spark.apache.org

just looking at the thread dump from your original email, the 3 executor 
threads are all trying to load classes.  (One thread is actually loading some 
class, and the others are blocked waiting to load a class, most likely trying 
to load the same thing.)  That is really weird, definitely not something which 
should keep things blocked for 30 min.  It suggest something wrong w/ the jvm, 
or classpath configuration, or a combination.  Looks like you are trying to run 
in the repl, and for whatever reason the http server for the repl to serve 
classes is not responsive.  I'd try running outside of the repl and see if that 
works.
sorry not a full diagnosis but maybe this'll help a bit.
On Tue, Aug 11, 2015 at 3:19 PM, java8964 java8...@hotmail.com wrote:



Currently we have an IBM BigInsight cluster with 1 namenode + 1 JobTracker + 42 
data/task nodes, which runs BigInsight V3.0.0.2, corresponding to Hadoop 
2.2.0 with MR1.
Since IBM BigInsight doesn't come with Spark, we built Spark 1.2.2 with 
Hadoop 2.2.0 + Hive 0.12 ourselves and deployed it on the same cluster.
IBM BigInsight comes with IBM JDK 1.7, but in our experience on the staging 
environment Spark works better with the Oracle JVM, so we run Spark 
under Oracle JDK 1.7.0_79.
Now on production, we are facing an issue we have never faced before, nor can 
we reproduce it on our staging cluster.
We are using a Spark Standalone cluster, and here are the basic configurations:
more spark-env.sh
export JAVA_HOME=/opt/java
export PATH=$JAVA_HOME/bin:$PATH
export HADOOP_CONF_DIR=/opt/ibm/biginsights/hadoop-conf/
export SPARK_CLASSPATH=/opt/ibm/biginsights/IHC/lib/ibm-compression.jar:/opt/ibm/biginsights/hive/lib/db2jcc4-10.6.jar
export SPARK_LOCAL_DIRS=/data1/spark/local,/data2/spark/local,/data3/spark/local
export SPARK_MASTER_WEBUI_PORT=8081
export SPARK_MASTER_IP=host1
export SPARK_MASTER_OPTS=-Dspark.deploy.defaultCores=42
export SPARK_WORKER_MEMORY=24g
export SPARK_WORKER_CORES=6
export SPARK_WORKER_DIR=/tmp/spark/work
export SPARK_DRIVER_MEMORY=2g
export SPARK_EXECUTOR_MEMORY=2g

more spark-defaults.conf
spark.master                      spark://host1:7077
spark.eventLog.enabled            true
spark.eventLog.dir                hdfs://host1:9000/spark/eventLog
spark.serializer                  org.apache.spark.serializer.KryoSerializer
spark.executor.extraJavaOptions   -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps
We use the Avro file format a lot, and we have these 2 datasets: one is about 
96G, and the other is a little over 1T. Since we are using Avro, we also 
built spark-avro at commit a788c9fce51b0ec1bb4ce88dc65c1d55aaa675b8, which is 
the latest version supporting Spark 1.2.x.
Here is the problem we are facing on our production cluster: even the following 
simple spark-shell commands will hang:
import org.apache.spark.sql.SQLContext
val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
import com.databricks.spark.avro._
val bigData = sqlContext.avroFile("hdfs://namenode:9000/bigData/")
bigData.registerTempTable("bigData")
bigData.count()

From the console, we saw the following:
[Stage 0:                                                  (44 + 42) / 7800]
with no update for 30 minutes or longer.
The 1T dataset should generate 7800 HDFS blocks, but Spark's HDFS 
client seems to have problems reading them. Since we are running Spark on 
the data nodes, all the Spark tasks are running at the NODE_LOCAL locality 
level.
If I go to the data/task node on which the Spark tasks hang and use jstack to 
dump the threads, I get the following at the top:

2015-08-11 15:38:38
Full thread dump Java HotSpot(TM) 64-Bit Server VM (24.79-b02 mixed mode):

Attach Listener daemon prio=10 tid=0x7f0660589000 nid=0x1584d waiting on 
condition [0x]
   java.lang.Thread.State: RUNNABLE

org.apache.hadoop.hdfs.PeerCache@4a88ec00 daemon prio=10 
tid=0x7f06508b7800 nid=0x13302 waiting on condition [0x7f060be94000]
   java.lang.Thread.State: TIMED_WAITING 

RE: Scala: How to match a java object????

2015-08-18 Thread Saif.A.Ellafi
Hi, thank you for further assistance

you can reproduce this by simply running

5 match { case java.math.BigDecimal => 2 }

In my personal case, I am applying a map action to a Seq[Any], so the elements 
inside are of type Any, to which I need to apply a proper 
.asInstanceOf[WhoYouShouldBe].

Saif

From: William Briggs [mailto:wrbri...@gmail.com]
Sent: Tuesday, August 18, 2015 4:46 PM
To: Ellafi, Saif A.; user@spark.apache.org
Subject: Re: Scala: How to match a java object


Could you share your pattern matching expression that is failing?

On Tue, Aug 18, 2015, 3:38 PM  
saif.a.ell...@wellsfargo.commailto:saif.a.ell...@wellsfargo.com wrote:
Hi all,

I am trying to run a spark job, in which I receive java.math.BigDecimal 
objects, instead of the scala equivalents, and I am trying to convert them into 
Doubles.
If I try to match-case this object class, I get: “error: object 
java.math.BigDecimal is not a value”

How could I get around matching java objects? I would like to avoid a multiple 
try-catch on ClassCastExceptions for all my checks.

Thank you,
Saif



Re: Why standalone mode don't allow to set num-executor ?

2015-08-18 Thread Andrew Or
Hi Canan,

This is mainly for legacy reasons. The default behavior in standalone
mode is that the application grabs all available resources in the cluster.
This effectively means we want one executor per worker, where each executor
grabs all the available cores and memory on that worker. In this model, it
doesn't really make sense to express number of executors, because that's
equivalent to the number of workers.

In 1.4+, however, we do support multiple executors per worker, but that's
not the default so we decided not to add support for the --num-executors
setting to avoid potential confusion.

-Andrew
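
To make the mapping concrete, here is a minimal sketch (not from the original
thread), assuming Spark 1.4+ standalone mode, of the equivalent SparkConf
settings: spark.cores.max corresponds to --total-executor-cores and
spark.executor.cores to --executor-cores. The master URL and values below are
placeholders.

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("core-limit-example")
  .setMaster("spark://master-host:7077")
  .set("spark.cores.max", "12")       // cap on total cores, same as --total-executor-cores
  .set("spark.executor.cores", "4")   // cores per executor, same as --executor-cores (1.4+)
val sc = new SparkContext(conf)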


2015-08-18 2:35 GMT-07:00 canan chen ccn...@gmail.com:

 num-executor only works for yarn mode. In standalone mode, I have to set
 the --total-executor-cores and --executor-cores. Isn't this way so
 intuitive ? Any reason for that ?



Re: Evaluating spark + Cassandra for our use cases

2015-08-18 Thread Jörn Franke
Hi,

First you need to make your SLAs clear. It does not sound to me like they are
defined very well, or that your solution is necessary for the scenario. I
also find it hard to believe that 1 customer has 100 million transactions
per month.

Time series data is easy to precalculate - you do not necessarily need
in-memory technology here.

I recommend your company do a Proof of Concept and get more
details/clarification on the requirements before risking millions of dollars
of investment.

Le mar. 18 août 2015 à 21:18, Benjamin Ross br...@lattice-engines.com a
écrit :

 My company is interested in building a real-time time-series querying
 solution using Spark and Cassandra.  Specifically, we’re interested in
 setting up a Spark system against Cassandra running a hive thrift server.
 We need to be able to perform real-time queries on time-series data –
 things like, how many accounts have spent in total more than $300 on
 product X in the past 3 months, and purchased product Y in the past month.



 These queries need to be fast – preferably sub-second but we can deal with
 a few seconds if absolutely necessary.  The data sizes are in the millions
 of records when rolled up to be per-monthly records.  Something on the
 order of 100M per customer.



 My question is, based on experience, how hard would it be to get Cassandra
 and Spark working together to give us sub-second response times in this use
 case?  Note that we’ll need to use DataStax enterprise (which is
 unappealing from a cost standpoint) because it’s the only thing that
 provides the hive spark thrift server to Cassandra.



 The two top contenders for our solution are Spark+Cassandra and Druid.



 Neither of these solutions work perfectly out of the box:

 -  Druid would need to be modified, possibly hacked, to support
 the queries we require.  I’m also not clear how operationally ready it is.

 -  Cassandra and Spark would require paying money for DataStax
 enterprise.  It really feels like it’s going to be tricky to configure
 Cassandra and Spark to be lightning fast for our use case.  Finally, window
 functions (which we need – see above) are not supported unless we use a
 pre-release milestone of the datastax spark Cassandra connector.



 I was wondering if anyone had any thoughts.  How easy is it to get Spark
 and Cassandra down to sub-second speeds in our use case?



 Thanks,

 Ben



RE: Evaluating spark + Cassandra for our use cases

2015-08-18 Thread Benjamin Ross
Hi Jorn,
Of course we're planning on doing a proof of concept here - the difficulty is 
that our timeline is short, so we cannot afford too many PoCs before we have to 
make a decision.  We also need to figure out *which* databases to proof of 
concept.

Note that one tricky aspect of our problem is that we need to support window 
functions partitioned on a per account basis.  I've found that support for 
window functions is very limited in most databases, and they're also generally 
slow when available.

Also, 1 customer certainly does not have 100M transactions per month.  There 
are 100M transactions total for a given customer when we roll everything up to 
be per-month.  We do not care about granularity smaller than a month.  There 
are also many columns that we care about - on the order of many thousands.

What makes you suggest that we do not need in-memory technology?

Ben



From: Jörn Franke [jornfra...@gmail.com]
Sent: Tuesday, August 18, 2015 4:14 PM
To: Benjamin Ross; user@spark.apache.org
Cc: Ron Gonzalez
Subject: Re: Evaluating spark + Cassandra for our use cases


Hi,

First you need to make your SLA clear. It does not sound for me they are 
defined very well or that your solution is necessary for the scenario. I also 
find it hard to believe that 1 customer has 100Million transactions per month.

Time series data is easy to precalculate - you do not necessarily need 
in-memory technology here.

I recommend your company to do a Proof of Concept and get more 
details/clarificarion on the requirements before risking million of dollars of 
investment.

Le mar. 18 août 2015 à 21:18, Benjamin Ross 
br...@lattice-engines.commailto:br...@lattice-engines.com a écrit :
My company is interested in building a real-time time-series querying solution 
using Spark and Cassandra.  Specifically, we’re interested in setting up a 
Spark system against Cassandra running a hive thrift server.  We need to be 
able to perform real-time queries on time-series data – things like, how many 
accounts have spent in total more than $300 on product X in the past 3 months, 
and purchased product Y in the past month.

These queries need to be fast – preferably sub-second but we can deal with a 
few seconds if absolutely necessary.  The data sizes are in the millions of 
records when rolled up to be per-monthly records.  Something on the order of 
100M per customer.

My question is, based on experience, how hard would it be to get Cassandra and 
Spark working together to give us sub-second response times in this use case?  
Note that we’ll need to use DataStax enterprise (which is unappealing from a 
cost standpoint) because it’s the only thing that provides the hive spark 
thrift server to Cassandra.

The two top contenders for our solution are Spark+Cassandra and Druid.

Neither of these solutions work perfectly out of the box:

-  Druid would need to be modified, possibly hacked, to support the 
queries we require.  I’m also not clear how operationally ready it is.

-  Cassandra and Spark would require paying money for DataStax 
enterprise.  It really feels like it’s going to be tricky to configure 
Cassandra and Spark to be lightning fast for our use case.  Finally, window 
functions (which we need – see above) are not supported unless we use a 
pre-release milestone of the datastax spark Cassandra connector.

I was wondering if anyone had any thoughts.  How easy is it to get Spark and 
Cassandra down to sub-second speeds in our use case?

Thanks,
Ben


Re: Json Serde used by Spark Sql

2015-08-18 Thread Michael Armbrust
Under the covers we use Jackson's Streaming API as of Spark 1.4.
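
As a small illustration of the dot-notation querying described in the quoted
question below, a minimal sketch assuming the Spark 1.4-era API, a hypothetical
file path, and hypothetical field names:

// assumes an existing SQLContext named sqlContext; path and fields are made up
val events = sqlContext.read.json("hdfs:///data/events.json")
events.registerTempTable("events")
// nested structs can be addressed with dot notation in the SQL query
sqlContext.sql("SELECT payload.user.id FROM events WHERE payload.kind = 'click'").show()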

On Tue, Aug 18, 2015 at 1:12 PM, Udit Mehta ume...@groupon.com wrote:

 Hi,

 I was wondering what json serde does spark sql use. I created a JsonRDD
 out of a json file and then registered it as a temp table to query. I can
 then query the table using dot notation for nested structs/arrays. I was
 wondering how does spark sql deserialize the json data based on the query.

 Thanks in advance,
 Udit



Re: Difference between Sort based and Hash based shuffle

2015-08-18 Thread Andrew Or
Hi Muhammad,

On a high level, in hash-based shuffle each of the M mappers writes R shuffle
files, one for each reducer, where R is the number of reduce partitions.
This results in M * R shuffle files. Since it is not uncommon for M and R
to be O(1000), this quickly becomes expensive. An optimization for
hash-based shuffle is consolidation, where all mappers running on the same
core write to one file per reducer, so with C cores this results in C * R
files. This is a strict improvement, but it is still relatively expensive.

Instead, in sort-based shuffle each mapper writes a single partitioned
file. This allows a particular reducer to request a specific portion of
each mapper's single output file. In more detail, the mapper first fills up
an internal buffer in memory and continually spills the contents of the
buffer to disk, then finally merges all the spilled files together to form
one final output file. This places much less stress on the file system and
requires much fewer I/O operations especially on the read side.

-Andrew
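
To put rough numbers on this: with M = 1,000 map tasks and R = 1,000 reduce
partitions, plain hash-based shuffle writes about 1,000,000 files,
consolidation with C = 8 cores per node brings that down to roughly 8 * 1,000
files per node, and sort-based shuffle writes on the order of one data file
(plus an index file) per map task. A minimal sketch, assuming the Spark 1.x
configuration keys, of how the shuffle implementation is selected (sort is
already the default from 1.2 onward; consolidation only applies to the hash
manager):

import org.apache.spark.{SparkConf, SparkContext}

// minimal sketch; master URL and app name are placeholders
val conf = new SparkConf().setAppName("shuffle-example").setMaster("local[4]")
conf.set("spark.shuffle.manager", "sort")            // "sort" (default since 1.2) or "hash"
conf.set("spark.shuffle.consolidateFiles", "true")   // only consulted by the hash-based manager
val sc = new SparkContext(conf)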



2015-08-16 11:08 GMT-07:00 Muhammad Haseeb Javed 11besemja...@seecs.edu.pk
:

 I did check it out and although I did get a general understanding of the
 various classes used to implement Sort and Hash shuffles, however these
 slides lack details as to how they are implemented and why sort generally
 has better performance than hash

 On Sun, Aug 16, 2015 at 4:31 AM, Ravi Kiran ravikiranmag...@gmail.com
 wrote:

 Have a look at this presentation.
 http://www.slideshare.net/colorant/spark-shuffle-introduction . Can be
 of help to you.

 On Sat, Aug 15, 2015 at 1:42 PM, Muhammad Haseeb Javed 
 11besemja...@seecs.edu.pk wrote:

 What are the major differences between how Sort based and Hash based
 shuffle operate and what is it that cause Sort Shuffle to perform better
 than Hash?
 Any talks that discuss both shuffles in detail, how they are implemented
 and the performance gains ?






Json Serde used by Spark Sql

2015-08-18 Thread Udit Mehta
Hi,

I was wondering what json serde does spark sql use. I created a JsonRDD out
of a json file and then registered it as a temp table to query. I can then
query the table using dot notation for nested structs/arrays. I was
wondering how does spark sql deserialize the json data based on the query.

Thanks in advance,
Udit


Re: how do I execute a job on a single worker node in standalone mode

2015-08-18 Thread Andrew Or
Hi Axel,

You can try setting `spark.deploy.spreadOut` to false (through your
conf/spark-defaults.conf file). What this does is essentially try to
schedule as many cores on one worker as possible before spilling over to
other workers. Note that you *must* restart the cluster through the sbin
scripts.

For more information see:
http://spark.apache.org/docs/latest/spark-standalone.html.

Feel free to let me know whether it works,
-Andrew
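
As a sketch of the alternative Igor mentions in the quoted message below
(forcing one particular stage onto a single executor rather than reconfiguring
the whole cluster), assuming an existing RDD named data and a hypothetical
per-record function processRecord:

// minimal sketch; `data` and `processRecord` are hypothetical
val onOneExecutor = data
  .coalesce(1)                  // collapse to a single partition = a single task
  .mapPartitions { iter =>      // the whole partition is processed by one executor
    iter.map(processRecord)
  }
onOneExecutor.count()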


2015-08-18 4:49 GMT-07:00 Igor Berman igor.ber...@gmail.com:

 by default standalone creates 1 executor on every worker machine per
 application
 number of overall cores is configured with --total-executor-cores
 so in general if you'll specify --total-executor-cores=1 then there would
 be only 1 core on some executor and you'll get what you want

 on the other hand, if your application needs all cores of your cluster and
 only some specific job should run on a single executor, there are a few methods
 to achieve this,
 e.g. coalesce(1) or dummyRddWithOnePartitionOnly.foreachPartition


 On 18 August 2015 at 01:36, Axel Dahl a...@whisperstream.com wrote:

 I have a 4 node cluster and have been playing around with the
 num-executors parameters, executor-memory and executor-cores

 I set the following:
 --executor-memory=10G
 --num-executors=1
 --executor-cores=8

 But when I run the job, I see that each worker, is running one executor
 which has  2 cores and 2.5G memory.

 What I'd like to do instead is have Spark just allocate the job to a
 single worker node?

 Is that possible in standalone mode or do I need a job/resource scheduler
 like Yarn to do that?

 Thanks in advance,

 -Axel






Re: Scala: How to match a java object????

2015-08-18 Thread Marcelo Vanzin
On Tue, Aug 18, 2015 at 12:59 PM, saif.a.ell...@wellsfargo.com wrote:

  5 match { case java.math.BigDecimal => 2 }

5 match { case _: java.math.BigDecimal => 2 }

-- 
Marcelo

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Scala: How to match a java object????

2015-08-18 Thread Marcelo Vanzin
On Tue, Aug 18, 2015 at 1:19 PM,  saif.a.ell...@wellsfargo.com wrote:
 Hi, Can you please elaborate? I am confused :-)

You did note that the two pieces of code are different, right?

See http://docs.scala-lang.org/tutorials/tour/pattern-matching.html
for how to match things in Scala, especially the typed pattern
example.
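
For the Seq[Any] map case mentioned earlier in the thread, a minimal sketch of
typed patterns (the values and branches below are made up for illustration):

val mixed: Seq[Any] = Seq(new java.math.BigDecimal("1.5"), 2.0, 3)
val doubles: Seq[Double] = mixed.map {
  case b: java.math.BigDecimal => b.doubleValue   // typed pattern checks the class and binds b
  case d: Double               => d
  case i: Int                  => i.toDouble
  case other                   => sys.error(s"unexpected type: ${other.getClass}")
}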

 -Original Message-
 From: Marcelo Vanzin [mailto:van...@cloudera.com]
 Sent: Tuesday, August 18, 2015 5:15 PM
 To: Ellafi, Saif A.
 Cc: wrbri...@gmail.com; user@spark.apache.org
 Subject: Re: Scala: How to match a java object

 On Tue, Aug 18, 2015 at 12:59 PM, saif.a.ell...@wellsfargo.com wrote:

  5 match { case java.math.BigDecimal => 2 }

 5 match { case _: java.math.BigDecimal => 2 }

 --
 Marcelo



-- 
Marcelo

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: dse spark-submit multiple jars issue

2015-08-18 Thread Andrew Or
Hi Satish,

The problem is that `--jars` accepts a comma-delimited list of jars! E.g.

spark-submit ... --jars lib1.jar,lib2.jar,lib3.jar main.jar

where main.jar is your main application jar (the one that starts a
SparkContext), and lib*.jar refer to additional libraries that your main
application jar uses.

-Andrew

2015-08-13 3:22 GMT-07:00 Javier Domingo Cansino javier.domi...@fon.com:

 Please notice that 'jars: null'

 I don't know why you put ///. but I would propose you just put normal
 absolute paths.

 dse spark-submit --master spark://10.246.43.15:7077 --class HelloWorld
 --jars /home/missingmerch/postgresql-9.4-1201.jdbc41.jar
 /home/missingmerch/dse.jar
 /home/missingmerch/spark-cassandra-connector-java_2.10-1.1.1.jar
 /home/missingmerch/etl-0.0.1-SNAPSHOT.jar

 Hope this is helpful!


 On Tue, Aug 11, 2015 at 3:42 PM, satish chandra j 
 jsatishchan...@gmail.com wrote:

 HI,

 Please find the log details below:


 dse spark-submit --verbose --master local --class HelloWorld
 etl-0.0.1-SNAPSHOT.jar --jars
 file:/home/missingmerch/postgresql-9.4-1201.jdbc41.jar
 file:/home/missingmerch/dse.jar
 file:/home/missingmerch/postgresql-9.4-1201.jdbc41.jar

 Using properties file: /etc/dse/spark/spark-defaults.conf

 Adding default property:
 spark.cassandra.connection.factory=com.datastax.bdp.spark.DseCassandraConnectionFactory

 Adding default property: spark.ssl.keyStore=.keystore

 Adding default property: spark.ssl.enabled=false

 Adding default property: spark.ssl.trustStore=.truststore

 Adding default property:
 spark.cassandra.auth.conf.factory=com.datastax.bdp.spark.DseAuthConfFactory

 Adding default property: spark.ssl.keyPassword=cassandra

 Adding default property: spark.ssl.keyStorePassword=cassandra

 Adding default property: spark.ssl.protocol=TLS

 Adding default property: spark.ssl.useNodeLocalConf=true

 Adding default property: spark.ssl.trustStorePassword=cassandra

 Adding default property:
 spark.ssl.enabledAlgorithms=TLS_RSA_WITH_AES_128_CBC_SHA,TLS_RSA_WITH_AES_256_CBC_SHA

 Parsed arguments:

   master  local

   deployMode  null

   executorMemory  null

   executorCores   null

   totalExecutorCores  null

   propertiesFile  /etc/dse/spark/spark-defaults.conf

   driverMemory512M

   driverCores null

   driverExtraClassPathnull

   driverExtraLibraryPath  null

   driverExtraJavaOptions  -Dcassandra.username=missingmerch
 -Dcassandra.password=STMbrjrlb -XX:MaxPermSize=256M

   supervise   false

   queue   null

   numExecutorsnull

   files   null

   pyFiles null

   archivesnull

   mainClass   HelloWorld

   primaryResource file:/home/missingmerch/etl-0.0.1-SNAPSHOT.jar

   nameHelloWorld

   childArgs   [--jars
 file:/home/missingmerch/postgresql-9.4-1201.jdbc41.jar
 file:/home/missingmerch/dse.jar
 file:/home/missingmerch/postgresql-9.4-1201.jdbc41.jar]

   jarsnull

   verbose true



 Spark properties used, including those specified through

 --conf and those from the properties file
 /etc/dse/spark/spark-defaults.conf:

   spark.cassandra.connection.factory -
 com.datastax.bdp.spark.DseCassandraConnectionFactory

   spark.ssl.useNodeLocalConf - true

   spark.ssl.enabled - false

   spark.executor.extraJavaOptions - -XX:MaxPermSize=256M

   spark.ssl.keyStore - .keystore

   spark.ssl.trustStore - .truststore

   spark.ssl.trustStorePassword - cassandra

   spark.ssl.enabledAlgorithms -
 TLS_RSA_WITH_AES_128_CBC_SHA,TLS_RSA_WITH_AES_256_CBC_SHA

   spark.cassandra.auth.conf.factory -
 com.datastax.bdp.spark.DseAuthConfFactory

   spark.ssl.protocol - TLS

   spark.ssl.keyPassword - cassandra

   spark.ssl.keyStorePassword - cassandra





 Main class:

 HelloWorld

 Arguments:

 --jars

 file:/home/missingmerch/postgresql-9.4-1201.jdbc41.jar

 file:/home/missingmerch/dse.jar

 file:/home/missingmerch/postgresql-9.4-1201.jdbc41.jar

 System properties:

 spark.cassandra.connection.factory -
 com.datastax.bdp.spark.DseCassandraConnectionFactory

 spark.driver.memory - 512M

 spark.ssl.useNodeLocalConf - true

 spark.ssl.enabled - false

 SPARK_SUBMIT - true

 spark.executor.extraJavaOptions - -XX:MaxPermSize=256M

 spark.app.name - HelloWorld

 spark.ssl.enabledAlgorithms -
 TLS_RSA_WITH_AES_128_CBC_SHA,TLS_RSA_WITH_AES_256_CBC_SHA

 spark.ssl.trustStorePassword - cassandra

 spark.driver.extraJavaOptions - -Dcassandra.username=missingmerch
 -Dcassandra.password=STMbrjrlb -XX:MaxPermSize=256M

 spark.ssl.keyStore - .keystore

 spark.ssl.trustStore - .truststore

 spark.jars - 

Re: Is it this a BUG?: Why Spark Flume Streaming job is not deploying the Receiver to the specified host?

2015-08-18 Thread Tathagata Das
Are you using the Flume polling stream or the older stream?

Such problems of binding used to occur in the older push-based approach,
hence we built the polling stream (pull-based).


On Tue, Aug 18, 2015 at 4:45 AM, diplomatic Guru diplomaticg...@gmail.com
wrote:

 I'm testing the Flume + Spark integration example (flume count).

 I'm deploying the job using yarn cluster mode.

 I first logged into the Yarn cluster, then submitted the job and passed in
 a specific worker node's IP to deploy the job. But when I checked the
  WebUI, it failed to bind to the specified IP because the receiver was
  deployed to a different host, not the one I asked for. Do you know why?

 For your information,  I've also tried passing the IP address used by the
 resource manager to find resources but no joy. But when I set the host to
 'localhost' and deploy to the cluster it is binding a worker node that is
 selected by the resource manager.





Re: Spark Job Hangs on our production cluster

2015-08-18 Thread Imran Rashid
sorry, by repl I mean spark-shell, I guess I'm used to them being used
interchangeably.  From that thread dump, the one thread that isn't stuck is
trying to get classes specifically related to the shell / repl:

   java.lang.Thread.State: RUNNABLE
 at java.net.SocketInputStream.socketRead0(Native Method)
 at java.net.SocketInputStream.read(SocketInputStream.java:152)
 at java.net.SocketInputStream.read(SocketInputStream.java:122)
 at java.io.BufferedInputStream.fill(BufferedInputStream.java:235)
 at java.io.BufferedInputStream.read1(BufferedInputStream.java:275)
 at java.io.BufferedInputStream.read(BufferedInputStream.java:334)
 - locked 0x00072477d530 (a java.io.BufferedInputStream)
 at sun.net.www.http.HttpClient.parseHTTPHeader(HttpClient.java:689)
 at sun.net.www.http.HttpClient.parseHTTP(HttpClient.java:633)
 at
 sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1324)
 - locked 0x000724772bf8 (a
 sun.net.www.protocol.http.HttpURLConnection)
 at java.net.URL.openStream(URL.java:1037)
 at
 org.apache.spark.repl.ExecutorClassLoader.findClassLocally(ExecutorClassLoader.scala:86)
 at
 org.apache.spark.repl.ExecutorClassLoader.findClass(ExecutorClassLoader.scala:63)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:425)

...

thats because the repl needs to package up the code for every single line,
and it serves those compiled classes to each executor over http.  This
particular executor seems to be stuck pulling one of those lines compiled
in the repl.  (This is all assuming that the thread dump is the same over
the entire 30 minutes that spark seems to be stuck.)

Yes, the classes should be loaded for the first partition that is
processed. (there certainly could be cases where different classes are
needed for each partition, but it doesn't sound like you are doing anything
that would trigger this.)  But to be clear, in repl mode, there will be
additional classes to be sent with every single job.

Hope that helps a little more ... maybe there was some issue w/ 1.2.2,
though I didn't see anything with a quick search, hopefully you'll have
more luck w/ 1.3.1

On Tue, Aug 18, 2015 at 2:23 PM, java8964 java8...@hotmail.com wrote:

 Hi, Imran:

 Thanks for your reply. I am not sure what do you mean repl. Can you be
 more detail about that?

 This is only happened when the Spark 1.2.2 try to scan big data set, and
 cannot reproduce if it scans smaller dataset.

 FYI, I have to build and deploy Spark 1.3.1 on our production cluster.
 Right now, I cannot reproduce this hang problem on the same cluster for the
 same big dataset. On this point, we will continue trying Spark 1.3.1, hope
 we will have more positive experience with it.

 But just for wondering, what class Spark needs to be loaded at this time?
 From my understanding, the executor already scan the first block data from
 HDFS, and hanging while starting the 2nd block. All the class should be
 already loaded in JVM in this case.

 Thanks

 Yong

 --
 From: iras...@cloudera.com
 Date: Tue, 18 Aug 2015 12:17:56 -0500
 Subject: Re: Spark Job Hangs on our production cluster
 To: java8...@hotmail.com
 CC: user@spark.apache.org


 just looking at the thread dump from your original email, the 3 executor
 threads are all trying to load classes.  (One thread is actually loading
 some class, and the others are blocked waiting to load a class, most likely
 trying to load the same thing.)  That is really weird, definitely not
 something which should keep things blocked for 30 min.  It suggest
 something wrong w/ the jvm, or classpath configuration, or a combination.
 Looks like you are trying to run in the repl, and for whatever reason the
 http server for the repl to serve classes is not responsive.  I'd try
 running outside of the repl and see if that works.

 sorry not a full diagnosis but maybe this'll help a bit.

 On Tue, Aug 11, 2015 at 3:19 PM, java8964 java8...@hotmail.com wrote:

 Currently we have a IBM BigInsight cluster with 1 namenode + 1 JobTracker
 + 42 data/task nodes, which runs with BigInsight V3.0.0.2, corresponding
 with Hadoop 2.2.0 with MR1.

 Since IBM BigInsight doesn't come with Spark, so we build Spark 1.2.2 with
 Hadoop 2.2.0 + Hive 0.12 by ourselves, and deploy it on the same cluster.

 The IBM Biginsight comes with IBM jdk 1.7, but during our experience on
 stage environment, we found out Spark works better with Oracle JVM. So we
 run spark under Oracle JDK 1.7.0_79.

 Now on production, we are facing a issue we never faced, nor can reproduce
 on our staging cluster.

 We are using Spark Standalone cluster, and here is the basic
 configurations:

 more spark-env.sh
 export JAVA_HOME=/opt/java
 export PATH=$JAVA_HOME/bin:$PATH
 export HADOOP_CONF_DIR=/opt/ibm/biginsights/hadoop-conf/
 export
 

Re: Scala: How to match a java object????

2015-08-18 Thread William Briggs
Could you share your pattern matching expression that is failing?

On Tue, Aug 18, 2015, 3:38 PM  saif.a.ell...@wellsfargo.com wrote:

 Hi all,

 I am trying to run a spark job, in which I receive *java.math.BigDecimal* 
 objects,
 instead of the scala equivalents, and I am trying to convert them into
 Doubles.
 If I try to match-case this object class, I get: *“**error: object
 java.math.BigDecimal is not a value**”*

 How could I get around matching java objects? I would like to avoid a
 multiple try-catch on ClassCastExceptions for all my checks.

 Thank you,
 Saif




to retrieve full stack trace

2015-08-18 Thread satish chandra j
HI All,
Please let me know if there are any arguments to be passed on the CLI to retrieve the FULL
STACK TRACE in Apache Spark.

I am stuck on an issue for which it would be helpful to analyze the full stack
trace.

Regards,
Satish Chandra


Re: to retrieve full stack trace

2015-08-18 Thread Koert Kuipers
if your error is on the executors, you need to check the executor logs for the full
stacktrace

On Tue, Aug 18, 2015 at 10:01 PM, satish chandra j jsatishchan...@gmail.com
 wrote:

 HI All,
 Please let me know if any arguments to be passed in CLI to retrieve FULL
 STACK TRACE in Apache Spark

 I am stuck in a issue for which it would be helpful to analyze full stack
 trace

 Regards,
 Satish Chandra



RE: Scala: How to match a java object????

2015-08-18 Thread Saif.A.Ellafi
Hi, Can you please elaborate? I am confused :-)

Saif

-Original Message-
From: Marcelo Vanzin [mailto:van...@cloudera.com] 
Sent: Tuesday, August 18, 2015 5:15 PM
To: Ellafi, Saif A.
Cc: wrbri...@gmail.com; user@spark.apache.org
Subject: Re: Scala: How to match a java object

On Tue, Aug 18, 2015 at 12:59 PM, saif.a.ell...@wellsfargo.com wrote:

 5 match { case java.math.BigDecimal => 2 }

5 match { case _: java.math.BigDecimal => 2 }

-- 
Marcelo


NaN in GraphX PageRank answer

2015-08-18 Thread Khaled Ammar
Hi all,

I was trying to use GraphX to compute pagerank and found that pagerank
value for several vertices is NaN.

I am using Spark 1.3. Any idea how to fix that?

-- 
Thanks,
-Khaled


Re: Programmatically create SparkContext on YARN

2015-08-18 Thread Andrew Or
Hi Andreas,

I believe the distinction is not between standalone and YARN mode, but
between client and cluster mode.

In client mode, your Spark submit JVM runs your driver code. In cluster
mode, one of the workers (or NodeManagers if you're using YARN) in the
cluster runs your driver code. In the latter case, it doesn't really make
sense to call `setMaster` in your driver because Spark needs to know which
cluster you're submitting the application to.

Instead, the recommended way is to set the master through the `--master`
flag in the command line, e.g.

$ bin/spark-submit
--master spark://1.2.3.4:7077
--class some.user.Clazz
--name "My app name"
--jars lib1.jar,lib2.jar
--deploy-mode cluster
app.jar

Both YARN and standalone modes support client and cluster modes, and the
spark-submit script is the common interface through which you can launch
your application. In other words, you shouldn't have to do anything more
than providing a different value to `--master` to use YARN.

-Andrew
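
For the client-mode case the original question is about, a minimal sketch of
creating the context programmatically against YARN, assuming Spark 1.x,
HADOOP_CONF_DIR/YARN_CONF_DIR pointing at the cluster configuration on the
machine running this code, and a jars sequence like the one in the snippet
quoted below:

import org.apache.spark.{SparkConf, SparkContext}

// minimal sketch, Spark 1.x client mode on YARN; "yarn-client" keeps the driver in this JVM
val conf = new SparkConf()
  .setAppName("MySparkApp")
  .setMaster("yarn-client")
  .setJars(jars)               // `jars` is the application's dependency list, as in the quote below
val sc = new SparkContext(conf)
// ... run jobs ...
sc.stop()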

2015-08-17 0:34 GMT-07:00 Andreas Fritzler andreas.fritz...@gmail.com:

 Hi all,

 when runnig the Spark cluster in standalone mode I am able to create the
 Spark context from Java via the following code snippet:

 SparkConf conf = new SparkConf()
.setAppName("MySparkApp")
.setMaster("spark://SPARK_MASTER:7077")
.setJars(jars);
 JavaSparkContext sc = new JavaSparkContext(conf);


 As soon as I'm done with my processing, I can just close it via

 sc.stop();

 Now my question: Is the same also possible when running Spark on YARN? I
 currently don't see how this should be possible without submitting your
 application as a packaged jar file. Is there a way to get this kind of
 interactivity from within your Scala/Java code?

 Regards,
 Andrea



Spark scala addFile retrieving file with incorrect size

2015-08-18 Thread Bernardo Vecchia Stein
Hi all,

I'm trying to run a spark job (written in scala) that uses addFile to
download some small files to each node. However, one of the downloaded
files has an incorrect size (the other ones are ok), which causes an error
when using it in the code.

I have looked more into the issue and hexdump'ed both the original and the
spark-retrieved files. The beginning of the files are exactly equal, but
the spark-retrieved one just gets truncated at a random position. This
position appears random, however I noticed that it is exactly half the size
of the original file. Not sure if a coincidence or not.

The original file has a size of 296 bytes (the others are a little bit
bigger, around 13 kbytes).

I'm kinda new to spark, so I'm stuck at this point trying to figure out
what is the problem. Does anyone have any idea of what might be the problem
here?

Thank you,
Bernardo


Re: Scala: How to match a java object????

2015-08-18 Thread Sujit Pal
Hi Saif,

Would this work?

import scala.collection.JavaConversions._

new java.math.BigDecimal(5) match { case x: java.math.BigDecimal =>
x.doubleValue }

It gives me on the scala console.

res9: Double = 5.0

Assuming you had a stream of BigDecimals, you could just call map on it.

myBigDecimals.map(_.doubleValue)

to get your Seq of Doubles. You will need the JavaConversions._ import to
allow Java Doubles to be treated by Scala as Scala Doubles.

-sujit

On Tue, Aug 18, 2015 at 12:59 PM, saif.a.ell...@wellsfargo.com wrote:

 Hi, thank you for further assistance



 you can reproduce this by simply running



  *5 match { case java.math.BigDecimal => 2 }*



  In my personal case, I am applying a map action to a Seq[Any], so the
  elements inside are of type Any, to which I need to apply a proper
  .asInstanceOf[WhoYouShouldBe].



 Saif



 *From:* William Briggs [mailto:wrbri...@gmail.com]
 *Sent:* Tuesday, August 18, 2015 4:46 PM
 *To:* Ellafi, Saif A.; user@spark.apache.org
 *Subject:* Re: Scala: How to match a java object



 Could you share your pattern matching expression that is failing?



 On Tue, Aug 18, 2015, 3:38 PM  saif.a.ell...@wellsfargo.com wrote:

 Hi all,



 I am trying to run a spark job, in which I receive *java.math.BigDecimal 
 *objects,
 instead of the scala equivalents, and I am trying to convert them into
 Doubles.

 If I try to match-case this object class, I get: *“error: object
 java.math.BigDecimal is not a value”*



 How could I get around matching java objects? I would like to avoid a
 multiple try-catch on ClassCastExceptions for all my checks.



 Thank you,

 Saif






What is the reason for ExecutorLostFailure?

2015-08-18 Thread VIJAYAKUMAR JAWAHARLAL
Hi All

Why am I getting ExecutorLostFailure, and why are the executors completely lost for the rest 
of the processing? Eventually it makes the job fail. One thing is for sure: a lot 
of shuffling happens across executors in my program. 

Is there a way to understand and debug ExecutorLostFailure? Any pointers 
regarding “ExecutorLostFailure” would help me a lot.

Thanks
Vijay

Re:Why there are overlapping for tasks on the EventTimeline UI

2015-08-18 Thread Todd
I think I found the answer.
On the UI, the recorded time of each task is when it is put into the thread 
pool. Then the UI makes sense.





At 2015-08-18 17:40:07, Todd bit1...@163.com wrote:

Hi,
The following is copied from the Spark EventTimeline UI. I don't understand why 
there is overlap between tasks.
I think they should run sequentially, one by one, in one executor (there is one 
core per executor).

The blue part of each task is the scheduler delay time. Does it mean it is the 
delay between when the task is put into the thread pool and when the task is picked to run?




Re: global variable in spark streaming with no dependency on key

2015-08-18 Thread Joanne Contact
Thanks. I tried. The problem is that I have to use updateStateByKey to maintain
other states related to keys.

I am not sure where to pass this accumulator variable into updateStateByKey.
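
For reference, a minimal sketch (not from the thread) of one way to combine the
two: create the accumulator on the driver and let the updateStateByKey closure
capture it. Note that accumulator updates made inside transformations are not
guaranteed exactly-once under task retries, and the value is only reliably
readable on the driver. The names ssc and events below are hypothetical.

// assumes an existing StreamingContext `ssc` and a DStream[(String, Long)] named `events`
// updateStateByKey also requires checkpointing, e.g. ssc.checkpoint("hdfs:///tmp/checkpoints")
val globalCounter = ssc.sparkContext.accumulator(0L, "global-counter")

val counts = events.updateStateByKey[Long] { (values: Seq[Long], state: Option[Long]) =>
  globalCounter += values.size.toLong          // the closure captures the accumulator
  Some(state.getOrElse(0L) + values.sum)
}
counts.print()
// globalCounter.value can then be read on the driver, e.g. inside a foreachRDD callback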


On Tue, Aug 18, 2015 at 2:17 AM, Hemant Bhanawat hemant9...@gmail.com wrote:
 See if SparkContext.accumulator helps.

 On Tue, Aug 18, 2015 at 2:27 PM, Joanne Contact joannenetw...@gmail.com
 wrote:

 Hi Gurus,

 Please help.

 But please don't tell me to use updateStateByKey because I need a
 global variable (something like the clock time) across the micro
 batches but not depending on key. For my case, it is not acceptable to
 maintain a state for each key since each key comes in different times.
 Yes my global variable is related to time but cannot use machine
 clock.

 Any hint? Or is this lack of global variable by design?

 Thanks!

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



[mllib] Random forest maxBins and confidence in training points

2015-08-18 Thread Mark Alen
Hi everyone, 
I have two questions regarding the random forest implementation in mllib
1- maxBins: Say the value of a feature is between [0,100]. In my dataset there 
are a lot of data points between [0,10], one datapoint at 100, and nothing 
between (10, 100). I am wondering how the binning works in this case? I 
obviously don't want all my points that are between [0,10] to fall into the 
same bin and the other bins to be empty. Would mllib do any smart reallocation of 
bins such that each bin gets some datapoints and one bin does not get 
all the datapoints?
2- Is there any way to do this in Spark? 
http://stats.stackexchange.com/questions/165062/incorporating-the-confidence-in-the-training-data-into-the-ml-model
Thanks a lot
Mark



Re: broadcast variable of Kafka producer throws ConcurrentModificationException

2015-08-18 Thread Shenghua(Daniel) Wan
All of you are right.

I was trying to create too many producers. My idea was to create a pool (for
now the pool contains only one producer) shared by all the executors.
After I realized it was related to serializability issues (though I did
not find clear clues in the source code to indicate that the broadcast template
type parameter must implement Serializable), I followed the spark-cassandra-connector
design and created a singleton of Kafka producer pools. No exception is
noticed anymore.

Thanks for all your comments.
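
For readers hitting the same problem, a minimal sketch of the executor-side
singleton approach described above, assuming the 0.8.2+ Java producer API; the
broker address, topic name, and the messages DStream are hypothetical:

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object ProducerHolder {
  // created lazily once per executor JVM; nothing is serialized or broadcast from the driver
  lazy val producer: KafkaProducer[String, String] = {
    val props = new Properties()
    props.put("bootstrap.servers", "broker1:9092")   // hypothetical broker
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    new KafkaProducer[String, String](props)
  }
}

messages.foreachRDD { rdd =>
  rdd.foreachPartition { records =>
    records.foreach { value =>
      ProducerHolder.producer.send(new ProducerRecord[String, String]("output-topic", value))
    }
  }
}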


On Tue, Aug 18, 2015 at 4:28 PM, Tathagata Das t...@databricks.com wrote:

 Why are you even trying to broadcast a producer? A broadcast variable is
 some immutable piece of serializable DATA that can be used for processing
 on the executors. A Kafka producer is neither DATA nor immutable, and
 definitely not serializable.
 The right way to do this is to create the producer in the executors.
 Please see the discussion in the programming guide

 http://spark.apache.org/docs/latest/streaming-programming-guide.html#output-operations-on-dstreams

 On Tue, Aug 18, 2015 at 3:08 PM, Cody Koeninger c...@koeninger.org
 wrote:

 I wouldn't expect a kafka producer to be serializable at all... among
 other things, it has a background thread

 On Tue, Aug 18, 2015 at 4:55 PM, Shenghua(Daniel) Wan 
 wansheng...@gmail.com wrote:

 Hi,
 Did anyone see java.util.ConcurrentModificationException when using
 broadcast variables?
 I encountered this exception when wrapping a Kafka producer like this in
 the spark streaming driver.

 Here is what I did.
  KafkaProducer<String, String> producer = new KafkaProducer<String,
  String>(properties);
  final Broadcast<KafkaDataProducer> bCastProducer
  = streamingContext.sparkContext().broadcast(producer);

 Then within an closure called by a foreachRDD, I was trying to get the
 wrapped producer, i.e.
   KafkaProducer<String, String> p = bCastProducer.value();

 after rebuilding and rerunning, I got the stack trace like this

 Exception in thread main com.esotericsoftware.kryo.KryoException:
 java.util.ConcurrentModificationException
 Serialization trace:
 classes (sun.misc.Launcher$AppClassLoader)
 classloader (java.security.ProtectionDomain)
 context (java.security.AccessControlContext)
 acc (org.apache.spark.util.MutableURLClassLoader)
 contextClassLoader (org.apache.kafka.common.utils.KafkaThread)
 ioThread (org.apache.kafka.clients.producer.KafkaProducer)
 producer (my driver)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:585)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
 at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
 at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
 at
 com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:318)
 at
 com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:293)
 at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
 at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:549)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:570)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
 at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
 at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
 at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
 at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
 at
 org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:148)
 at
 org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:203)
 at
 org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:102)
 at
  org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:85)
 at
 

Re: Spark + Jupyter (IPython Notebook)

2015-08-18 Thread Jerry Lam
Hi Prabeesh,

That's even better!

Thanks for sharing

Jerry


On Tue, Aug 18, 2015 at 1:31 PM, Prabeesh K. prabsma...@gmail.com wrote:

 Refer this post
 http://blog.prabeeshk.com/blog/2015/06/19/pyspark-notebook-with-docker/

 Spark + Jupyter + Docker

 On 18 August 2015 at 21:29, Jerry Lam chiling...@gmail.com wrote:

 Hi Guru,

 Thanks! Great to hear that someone tried it in production. How do you
 like it so far?

 Best Regards,

 Jerry


 On Tue, Aug 18, 2015 at 11:38 AM, Guru Medasani gdm...@gmail.com wrote:

 Hi Jerry,

 Yes. I’ve seen customers using this in production for data science work.
 I’m currently using this for one of my projects on a cluster as well.

 Also, here is a blog that describes how to configure this.


 http://blog.cloudera.com/blog/2014/08/how-to-use-ipython-notebook-with-apache-spark/


 Guru Medasani
 gdm...@gmail.com



 On Aug 18, 2015, at 8:35 AM, Jerry Lam chiling...@gmail.com wrote:

 Hi spark users and developers,

 Did anyone have IPython Notebook (Jupyter) deployed in production that
 uses Spark as the computational engine?

 I know Databricks Cloud provides similar features with deeper
 integration with Spark. However, Databricks Cloud has to be hosted by
 Databricks so we cannot do this.

 Other solutions (e.g. Zeppelin) seem to reinvent the wheel that IPython
 has already offered years ago. It would be great if someone can educate me
 the reason behind this.

 Best Regards,

 Jerry







Re: Is it this a BUG?: Why Spark Flume Streaming job is not deploying the Receiver to the specified host?

2015-08-18 Thread diplomatic Guru
Thank you Tathagata for your response.  Yes, I'm using push model on Spark
1.2. For my scenario I do prefer the push model. Is this the case on the
later version 1.4 too?

I think I can find a workaround for this issue, but only if I know how to
obtain the worker (executor) ID. I can get the detail of the driver like
this:

ss.ssc().env().blockManager().blockManagerId().host()

But I am not sure how I could get the executor ID from the driver.

When the job is submitted, I can see the block manager being registered
with the driver and executor IP addresses:

15/08/18 23:31:40 INFO YarnClientSchedulerBackend: Registered executor:
Actor[akka.tcp://sparkExecutor@05151113997207:41630/user/Executor#1210147506]
with ID 1
15/08/18 23:31:40 INFO RackResolver: Resolved 05151113997207 to /0513_R-0050/RJ05
15/08/18 23:31:41 INFO BlockManagerMasterActor: Registering block manager
05151113997207:56921 with 530.3 MB RAM, BlockManagerId(1, 05151113997207, 56921)

The BlockManagerMasterActor appears to be doing the registering. Is there
any way I can access this from the SparkContext?

Thanks.



On 18 August 2015 at 22:40, Tathagata Das t...@databricks.com wrote:

 Are you using the Flume polling stream or the older stream?

 Such problems of binding used to occur in the older push-based approach,
 hence we built the polling stream (pull-based).


 On Tue, Aug 18, 2015 at 4:45 AM, diplomatic Guru diplomaticg...@gmail.com
  wrote:

 I'm testing the Flume + Spark integration example (flume count).

 I'm deploying the job using yarn cluster mode.

 I first logged into the Yarn cluster, then submitted the job and passed
 in a specific worker node's IP to deploy the job. But when I checked the
 WebUI, it failed to bind to the specified IP because the receiver was
 deployed to a different host, not the one I asked it to. Do you know?

 For your information,  I've also tried passing the IP address used by the
 resource manager to find resources but no joy. But when I set the host to
 'localhost' and deploy to the cluster it is binding a worker node that is
 selected by the resource manager.






Re: What is the reason for ExecutorLostFailure?

2015-08-18 Thread Corey Nolet
Usually more information as to the cause of this will be found down in your
logs. I generally see this happen when an out-of-memory exception has
occurred for one reason or another on an executor. It's possible your
memory settings are too small per executor, or the number of concurrent
tasks you are running is too large for some of the executors. Other times,
using RDD functions like groupBy(), which collect an unbounded
number of items into memory, could be causing it.

Either way, the logs for the executors should be able to give you some
insight; have you looked at those yet?
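
As one concrete illustration of the groupBy() point above, a minimal sketch
(using a hypothetical pair RDD named pairs) contrasting groupByKey, which
buffers every value for a key in memory, with reduceByKey, which combines
values map-side and only keeps running totals:

// hypothetical RDD[(String, Long)] named `pairs`
// groupByKey materializes all values per key on one executor before summing:
val totalsViaGroup = pairs.groupByKey().mapValues(_.sum)
// reduceByKey aggregates incrementally on the map side, using far less memory:
val totals = pairs.reduceByKey(_ + _)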

On Tue, Aug 18, 2015 at 6:26 PM, VIJAYAKUMAR JAWAHARLAL sparkh...@data2o.io
 wrote:

 Hi All

 Why am I getting ExecutorLostFailure and executors are completely lost
 for rest of the processing? Eventually it makes job to fail. One thing for
 sure that lot of shuffling happens across executors in my program.

 Is there a way to understand and debug ExecutorLostFailure? Any pointers
 regarding “ExecutorLostFailure” would help me a lot.

 Thanks
 Vijay



Re: Is it this a BUG?: Why Spark Flume Streaming job is not deploying the Receiver to the specified host?

2015-08-18 Thread Tathagata Das
I don't think there is a super clean way of doing this. Here is an idea:
run a dummy job with a large number of partitions/tasks, which will access
SparkEnv.get.blockManager().blockManagerId().host() and return it.

sc.makeRDD(1 to 100, 100).map { _ =>
  SparkEnv.get.blockManager().blockManagerId().host()
}.collect().distinct()

But that said, I do recommend using the pull-based model. With the push-based
model it becomes really hard to deal with scenarios where the whole
node (where the receiver is supposed to run) goes down and the receiver
cannot run anywhere else.


On Tue, Aug 18, 2015 at 5:25 PM, diplomatic Guru diplomaticg...@gmail.com
wrote:

 Thank you Tathagata for your response.  Yes, I'm using push model on Spark
 1.2. For my scenario I do prefer the push model. Is this the case on the
 later version 1.4 too?

 I think I can find a workaround for this issue, but only if I know how to
 obtain the worker (executor) ID. I can get the detail of the driver like
 this:

 ss.ssc().env().blockManager().blockManagerId().host()

 But I am not sure how I could get the executor ID from the driver.

 When the job is submitted, I can see the block manager being registered
 with the driver and executor IP addresses:

 15/08/18 23:31:40 INFO YarnClientSchedulerBackend: Registered executor:
 Actor[akka.tcp://sparkExecutor@05151113997207:41630/user/Executor#1210147506]
 with ID 1
 15/08/18 23:31:40 INFO RackResolver: Resolved 05151113997207 to /0513_R-0050/RJ05
 15/08/18 23:31:41 INFO BlockManagerMasterActor: Registering block manager
 05151113997207:56921 with 530.3 MB RAM, BlockManagerId(1, 05151113997207, 56921)

 The BlockManagerMasterActor appears to be doing the registering. Is there
 any way I can access this from the SparkContext?

 Thanks.



 On 18 August 2015 at 22:40, Tathagata Das t...@databricks.com wrote:

 Are you using the Flume polling stream or the older stream?

 Such problems of binding used to occur in the older push-based approach,
 hence we built the polling stream (pull-based).


 On Tue, Aug 18, 2015 at 4:45 AM, diplomatic Guru 
 diplomaticg...@gmail.com wrote:

 I'm testing the Flume + Spark integration example (flume count).

 I'm deploying the job using yarn cluster mode.

 I first logged into the Yarn cluster, then submitted the job and passed
 in a specific worker node's IP to deploy the job. But when I checked the
 WebUI, it failed to bind to the specified IP because the receiver was
 deployed to a different host, not the one I asked it to. Do you know?

 For your information,  I've also tried passing the IP address used by
 the resource manager to find resources but no joy. But when I set the host
 to 'localhost' and deploy to the cluster it is binding a worker node that
 is selected by the resource manager.







Re: broadcast variable of Kafka producer throws ConcurrentModificationException

2015-08-18 Thread Tathagata Das
Why are you even trying to broadcast a producer? A broadcast variable is
some immutable piece of serializable DATA that can be used for processing
on the executors. A Kafka producer is neither DATA nor immutable, and
definitely not serializable.
The right way to do this is to create the producer in the executors. Please
see the discussion in the programming guide
http://spark.apache.org/docs/latest/streaming-programming-guide.html#output-operations-on-dstreams

On Tue, Aug 18, 2015 at 3:08 PM, Cody Koeninger c...@koeninger.org wrote:

 I wouldn't expect a kafka producer to be serializable at all... among
 other things, it has a background thread

 On Tue, Aug 18, 2015 at 4:55 PM, Shenghua(Daniel) Wan 
 wansheng...@gmail.com wrote:

 Hi,
 Did anyone see java.util.ConcurrentModificationException when using
 broadcast variables?
 I encountered this exception when wrapping a Kafka producer like this in
 the spark streaming driver.

 Here is what I did.
  KafkaProducer<String, String> producer = new KafkaProducer<String,
  String>(properties);
  final Broadcast<KafkaDataProducer> bCastProducer
  = streamingContext.sparkContext().broadcast(producer);

 Then within an closure called by a foreachRDD, I was trying to get the
 wrapped producer, i.e.
   KafkaProducer<String, String> p = bCastProducer.value();

 after rebuilding and rerunning, I got the stack trace like this

 Exception in thread main com.esotericsoftware.kryo.KryoException:
 java.util.ConcurrentModificationException
 Serialization trace:
 classes (sun.misc.Launcher$AppClassLoader)
 classloader (java.security.ProtectionDomain)
 context (java.security.AccessControlContext)
 acc (org.apache.spark.util.MutableURLClassLoader)
 contextClassLoader (org.apache.kafka.common.utils.KafkaThread)
 ioThread (org.apache.kafka.clients.producer.KafkaProducer)
 producer (my driver)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:585)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
 at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
 at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
 at
 com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:318)
 at
 com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:293)
 at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
 at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:549)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:570)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
 at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
 at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
 at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
 at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
 at
 org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:148)
 at
 org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:203)
 at
 org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:102)
 at
  org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:85)
 at
 org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
 at
 org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62)
 at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1289)
 at
 org.apache.spark.api.java.JavaSparkContext.broadcast(JavaSparkContext.scala:648)
 at my driver
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)
 at
 

Re: Spark + Jupyter (IPython Notebook)

2015-08-18 Thread Guru Medasani
For python it is really great. 

There is some work in progress in bringing Scala support to Jupyter as well.

https://github.com/hohonuuli/sparknotebook 
https://github.com/hohonuuli/sparknotebook

https://github.com/alexarchambault/jupyter-scala 
https://github.com/alexarchambault/jupyter-scala


Guru Medasani
gdm...@gmail.com



 On Aug 18, 2015, at 12:29 PM, Jerry Lam chiling...@gmail.com wrote:
 
 Hi Guru,
 
 Thanks! Great to hear that someone tried it in production. How do you like it 
 so far?
 
 Best Regards,
 
 Jerry
 
 
 On Tue, Aug 18, 2015 at 11:38 AM, Guru Medasani gdm...@gmail.com 
 mailto:gdm...@gmail.com wrote:
 Hi Jerry,
 
 Yes. I’ve seen customers using this in production for data science work. I’m 
 currently using this for one of my projects on a cluster as well. 
 
 Also, here is a blog that describes how to configure this. 
 
 http://blog.cloudera.com/blog/2014/08/how-to-use-ipython-notebook-with-apache-spark/
 
 
 Guru Medasani
 gdm...@gmail.com
 
 
 
 On Aug 18, 2015, at 8:35 AM, Jerry Lam chiling...@gmail.com wrote:
 
 Hi spark users and developers,
 
 Did anyone have IPython Notebook (Jupyter) deployed in production that uses 
 Spark as the computational engine? 
 
 I know Databricks Cloud provides similar features with deeper integration 
 with Spark. However, Databricks Cloud has to be hosted by Databricks so we 
 cannot do this. 
 
 Other solutions (e.g. Zeppelin) seem to reinvent the wheel that IPython has 
 already offered years ago. It would be great if someone can educate me the 
 reason behind this.
 
 Best Regards,
 
 Jerry
 
 



Re: Spark + Jupyter (IPython Notebook)

2015-08-18 Thread andy petrella
Hey,

Actually, for Scala, I'd rather suggest using
https://github.com/andypetrella/spark-notebook/

It's deployed at several places like *Alibaba*, *EBI*, *Cray* and is
supported by both the Scala community and the company Data Fellas.
For instance, it was part of the Big Scala Pipeline training given on
August 16th at Galvanize in San Francisco with the collaboration of *Datastax,
Mesosphere, Databricks, Confluent and Typesafe*:
http://scala.bythebay.io/pipeline.html. It was a successful training day
with 100+ attendees.

Also, it's the only one that is fully reactive, including a reactive plotting
library in Scala, allowing you to plot a moving average computed in a DStream,
a dynamically updated D3 graph layout, or even a dynamic map of the received
tweets that have geolocation set. Of course, you can plot lines, pies, bars,
histograms and boxplots for any kind of data, whether DataFrames, SQL results,
Seq, List, Map, or any collection of tuples or classes.

Checkout http://spark-notebook.io/, for your specific distro.
Note that you can also use it directly on DCOS.

For any question, I'll be glad helping you on the ~200 crowded gitter
chatroom: https://gitter.im/andypetrella/spark-notebook

cheers and have fun :-)


On Tue, Aug 18, 2015 at 10:24 PM Guru Medasani gdm...@gmail.com wrote:

 For python it is really great.

 There is some work in progress in bringing Scala support to Jupyter as
 well.

 https://github.com/hohonuuli/sparknotebook

 https://github.com/alexarchambault/jupyter-scala


 Guru Medasani
 gdm...@gmail.com



 On Aug 18, 2015, at 12:29 PM, Jerry Lam chiling...@gmail.com wrote:

 Hi Guru,

 Thanks! Great to hear that someone tried it in production. How do you like
 it so far?

 Best Regards,

 Jerry


 On Tue, Aug 18, 2015 at 11:38 AM, Guru Medasani gdm...@gmail.com wrote:

 Hi Jerry,

 Yes. I’ve seen customers using this in production for data science work.
 I’m currently using this for one of my projects on a cluster as well.

 Also, here is a blog that describes how to configure this.


 http://blog.cloudera.com/blog/2014/08/how-to-use-ipython-notebook-with-apache-spark/


 Guru Medasani
 gdm...@gmail.com



 On Aug 18, 2015, at 8:35 AM, Jerry Lam chiling...@gmail.com wrote:

 Hi spark users and developers,

 Did anyone have IPython Notebook (Jupyter) deployed in production that
 uses Spark as the computational engine?

 I know Databricks Cloud provides similar features with deeper integration
 with Spark. However, Databricks Cloud has to be hosted by Databricks so we
 cannot do this.

 Other solutions (e.g. Zeppelin) seem to reinvent the wheel that IPython
 has already offered years ago. It would be great if someone can educate me
 the reason behind this.

 Best Regards,

 Jerry




 --
andy


Failed to fetch block error

2015-08-18 Thread swetha
Hi,

I see the following error in my Spark job even after using around 100 cores
and 16G of memory. Did any of you experience the same problem earlier?

15/08/18 21:51:23 ERROR shuffle.RetryingBlockFetcher: Failed to fetch block
input-0-1439959114400, and will not retry (0 retries)
java.lang.RuntimeException: java.io.FileNotFoundException:
/data1/spark/spark-aed30958-2ee1-4eb7-984e-6402fb0a0503/blockmgr-ded36b52-ccc7-48dc-ba05-65bb21fc4136/34/input-0-1439959114400
(Too many open files)
at java.io.RandomAccessFile.open(Native Method)
at java.io.RandomAccessFile.init(RandomAccessFile.java:241)
at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:110)
at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:134)
at 
org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:511)
at
org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:302)
at
org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
at
org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Failed-to-fetch-block-error-tp24335.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Too many files/dirs in hdfs

2015-08-18 Thread UMESH CHAUDHARY
Of course, Java or Scala can do that:
1) Create a FileWriter with an append or roll-over option
2) For each RDD create a StringBuilder after applying your filters
3) Write this StringBuilder to the file whenever you want to write (the
duration can be defined as a condition) - see the sketch below
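
A minimal Scala sketch of that idea (the DStream name `lines`, the output path,
and the assumption that each batch is small enough to collect to the driver are
all illustrative, not from this thread):

import java.io.{BufferedWriter, FileWriter}

lines.foreachRDD { rdd =>
  if (!rdd.isEmpty()) {                        // skip empty batches, so no 0-byte files
    val batch = rdd.collect().mkString("\n")   // only safe for small batches
    // FileWriter in append mode keeps everything in one file; roll-over logic
    // (e.g. switching the path once the file grows too large) can be added here.
    val writer = new BufferedWriter(new FileWriter("/data/stream-output.log", true))
    try {
      writer.write(batch)
      writer.newLine()
    } finally {
      writer.close()
    }
  }
}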

On Tue, Aug 18, 2015 at 11:05 PM, Mohit Anchlia mohitanch...@gmail.com
wrote:

 Is there a way to store all the results in one file and keep the file roll
 over separate than the spark streaming batch interval?

 On Mon, Aug 17, 2015 at 2:39 AM, UMESH CHAUDHARY umesh9...@gmail.com
 wrote:

 In Spark Streaming you can simply check whether your RDD contains any
 records or not and if records are there you can save them using
 FIleOutputStream:

  dStream.foreachRDD(t => { val count = t.count(); if (count > 0) { // SAVE
  YOUR STUFF } })

 This will not create unnecessary files of 0 bytes.

 On Mon, Aug 17, 2015 at 2:51 PM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 Currently, spark streaming would create a new directory for every batch
 and store the data to it (whether it has anything or not). There is no
 direct append call as of now, but you can achieve this either with
 FileUtil.copyMerge
 http://apache-spark-user-list.1001560.n3.nabble.com/save-spark-streaming-output-to-single-file-on-hdfs-td21124.html#a21167
 or have a separate program which will do the clean up for you.
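
For reference, a minimal sketch of the FileUtil.copyMerge approach mentioned
above (the paths are placeholders; copyMerge concatenates all the part files in
a batch directory into one file):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}

val hadoopConf = new Configuration()
val fs = FileSystem.get(hadoopConf)
FileUtil.copyMerge(
  fs, new Path("/streaming/out/batch-1439959114400"),  // source dir with part files
  fs, new Path("/streaming/out/merged.txt"),           // single destination file
  false,                                               // deleteSource: keep the originals
  hadoopConf,
  null)                                                // addString: nothing inserted between files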

 Thanks
 Best Regards

 On Sat, Aug 15, 2015 at 5:20 AM, Mohit Anchlia mohitanch...@gmail.com
 wrote:

 Spark stream seems to be creating 0 bytes files even when there is no
 data. Also, I have 2 concerns here:

 1) Extra unnecessary files is being created from the output
 2) Hadoop doesn't work really well with too many files and I see that
 it is creating a directory with a timestamp every 1 second. Is there a
 better way of writing a file, may be use some kind of append mechanism
 where one doesn't have to change the batch interval.







Re: broadcast variable of Kafka producer throws ConcurrentModificationException

2015-08-18 Thread Marcin Kuthan
As long as the Kafka producer is thread-safe you don't need any pool at all.
Just share a single producer on every executor. Please look at my blog post
for more details. http://allegro.tech/spark-kafka-integration.html
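
A rough sketch of that pattern (the broker address, topic name and the `stream`
DStream[String] are placeholders, not taken from this thread): keep the producer
in a Scala object so it is created lazily, once per executor JVM, and never
serialized or broadcast.

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object ExecutorKafkaProducer {
  // Initialized on first use inside each executor JVM.
  lazy val producer: KafkaProducer[String, String] = {
    val props = new Properties()
    props.put("bootstrap.servers", "broker1:9092")  // placeholder broker list
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    new KafkaProducer[String, String](props)
  }
}

// In the streaming job: the object is resolved on the executor, no broadcast needed.
stream.foreachRDD { rdd =>
  rdd.foreachPartition { records =>
    records.foreach { value =>
      ExecutorKafkaProducer.producer.send(new ProducerRecord[String, String]("results", value))
    }
  }
}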
On 19 Aug 2015 at 2:00 AM, Shenghua(Daniel) Wan wansheng...@gmail.com
wrote:

 All of you are right.

 I was trying to create too many producers. My idea was to create a
 pool(for now the pool contains only one producer) shared by all the
 executors.
 After I realized it was related to serialization issues (though I did
 not find clear clues in the source code to indicate that the broadcast template
 type parameter must implement Serializable), I followed the spark-cassandra-connector
 design and created a singleton of Kafka producer pools. No exception is
 noticed now.

 Thanks for all your comments.


 On Tue, Aug 18, 2015 at 4:28 PM, Tathagata Das t...@databricks.com
 wrote:

 Why are you even trying to broadcast a producer? A broadcast variable is
 some immutable piece of serializable DATA that can be used for processing
 on the executors. A Kafka producer is neither DATA nor immutable, and
 definitely not serializable.
 The right way to do this is to create the producer in the executors.
 Please see the discussion in the programming guide

 http://spark.apache.org/docs/latest/streaming-programming-guide.html#output-operations-on-dstreams

 On Tue, Aug 18, 2015 at 3:08 PM, Cody Koeninger c...@koeninger.org
 wrote:

 I wouldn't expect a kafka producer to be serializable at all... among
 other things, it has a background thread

 On Tue, Aug 18, 2015 at 4:55 PM, Shenghua(Daniel) Wan 
 wansheng...@gmail.com wrote:

 Hi,
 Did anyone see java.util.ConcurrentModificationException when using
 broadcast variables?
 I encountered this exception when wrapping a Kafka producer like this
 in the spark streaming driver.

 Here is what I did.
 KafkaProducerString, String producer = new KafkaProducerString,
 String(properties);
 final BroadcastKafkaDataProducer bCastProducer
 = streamingContext.sparkContext().broadcast(producer);

 Then within an closure called by a foreachRDD, I was trying to get the
 wrapped producer, i.e.
  KafkaProducerString, String p = bCastProducer.value();

 after rebuilding and rerunning, I got the stack trace like this

 Exception in thread main com.esotericsoftware.kryo.KryoException:
 java.util.ConcurrentModificationException
 Serialization trace:
 classes (sun.misc.Launcher$AppClassLoader)
 classloader (java.security.ProtectionDomain)
 context (java.security.AccessControlContext)
 acc (org.apache.spark.util.MutableURLClassLoader)
 contextClassLoader (org.apache.kafka.common.utils.KafkaThread)
 ioThread (org.apache.kafka.clients.producer.KafkaProducer)
 producer (my driver)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:585)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
 at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
 at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
 at
 com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:318)
 at
 com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:293)
 at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
 at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:549)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:570)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
 at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
 at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
 at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
 at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
 at
 org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:148)
 at
 

SparkR csv without headers

2015-08-18 Thread Franc Carter
Hi,

Does anyone have an example of how to create a DataFrame in SparkR  which
specifies the column names - the csv files I have do not have column names
in the first row. I can get read a csv nicely with
com.databricks:spark-csv_2.10:1.0.3, but I end up with column names C1, C2,
C3 etc


thanks

-- 

*Franc Carter* | Systems Architect | RoZetta Technology

L4. 55 Harrington Street, THE ROCKS, NSW, 2000

PO Box H58, Australia Square, Sydney NSW, 1215, AUSTRALIA

*T* +61 2 8355 2515 | www.rozettatechnology.com


Repartitioning external table in Spark sql

2015-08-18 Thread James Pirz
I am using Spark 1.4.1 , in stand-alone mode, on a cluster of 3 nodes.

Using Spark sql and Hive Context, I am trying to run a simple scan query on
an existing Hive table (which is an external table consisting of rows in
text files stored in HDFS - it is NOT parquet, ORC or any other richer
format).

DataFrame res = hiveCtx.sql("SELECT * FROM lineitem WHERE L_LINENUMBER > 0");

What I observe is that the performance of this full scan in Spark is not
comparable with Hive (it is almost 4 times slower). Checking the resource
usage, I see that the workers/executors do not scan in parallel but on a
per-node basis: first the executors on node 1 read from disk, while the other
two nodes do no I/O and just receive data from the first node over the
network; then the 2nd node does its scan, and then the third one.
I also realized that if I load this data file directly from my spark
context (using textFile() ) and run count() on that (not using spark sql)
then I can get a better performance by increasing number of partitions. I
am just trying to do the same thing (increasing number of partitions in the
beginning) in Spark sql as:

var tab = sqlContext.read.table("lineitem");
tab.repartition(1000);
OR
tab.coalesce(1000);

but none of repartition() or coalesce() methods actually work - they do not
return an error, but if I check

var p = tab.rdd.partitions.size;

before and after calling any of them, it returns the same number of
partitions.

I am just wondering how I can change the number of partitions for a Hive
external table, in Spark Sql.

Any help/suggestion would be appreciated.
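
A note on the snippet above: repartition() and coalesce() are transformations
that return a new DataFrame rather than changing `tab` in place, so the result
has to be captured. A minimal sketch, reusing the same illustrative table name:

val tab = sqlContext.read.table("lineitem")
val repartitioned = tab.repartition(1000)   // capture the returned DataFrame
println(tab.rdd.partitions.size)            // unchanged
println(repartitioned.rdd.partitions.size)  // reflects the requested 1000 partitions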


Re: broadcast variable of Kafka producer throws ConcurrentModificationException

2015-08-18 Thread Tathagata Das
It's a cool blog post! Tweeted it!
Broadcasting the configuration necessary for lazily instantiating the
producer is a good idea.

Nitpick: The first code example has an extra `}` ;)

On Tue, Aug 18, 2015 at 10:49 PM, Marcin Kuthan marcin.kut...@gmail.com
wrote:

 As long as Kafka producent is thread-safe you don't need any pool at all.
 Just share single producer on every executor. Please look at my blog post
 for more details. http://allegro.tech/spark-kafka-integration.html
 19 sie 2015 2:00 AM Shenghua(Daniel) Wan wansheng...@gmail.com
 napisał(a):

 All of you are right.

 I was trying to create too many producers. My idea was to create a
 pool(for now the pool contains only one producer) shared by all the
 executors.
 After I realized it was related to the serializable issues (though I did
 not find clear clues in the source code to indicate the broacast template
 type parameter must be implement serializable),  I followed spark cassandra
 connector design and created a singleton of Kafka producer pools. There is
 not exception noticed.

 Thanks for all your comments.


 On Tue, Aug 18, 2015 at 4:28 PM, Tathagata Das t...@databricks.com
 wrote:

 Why are you even trying to broadcast a producer? A broadcast variable is
 some immutable piece of serializable DATA that can be used for processing
 on the executors. A Kafka producer is neither DATA nor immutable, and
 definitely not serializable.
 The right way to do this is to create the producer in the executors.
 Please see the discussion in the programming guide

 http://spark.apache.org/docs/latest/streaming-programming-guide.html#output-operations-on-dstreams

 On Tue, Aug 18, 2015 at 3:08 PM, Cody Koeninger c...@koeninger.org
 wrote:

 I wouldn't expect a kafka producer to be serializable at all... among
 other things, it has a background thread

 On Tue, Aug 18, 2015 at 4:55 PM, Shenghua(Daniel) Wan 
 wansheng...@gmail.com wrote:

 Hi,
 Did anyone see java.util.ConcurrentModificationException when using
 broadcast variables?
 I encountered this exception when wrapping a Kafka producer like this
 in the spark streaming driver.

 Here is what I did.
 KafkaProducerString, String producer = new KafkaProducerString,
 String(properties);
 final BroadcastKafkaDataProducer bCastProducer
 = streamingContext.sparkContext().broadcast(producer);

 Then within an closure called by a foreachRDD, I was trying to get the
 wrapped producer, i.e.
  KafkaProducerString, String p = bCastProducer.value();

 after rebuilding and rerunning, I got the stack trace like this

 Exception in thread main com.esotericsoftware.kryo.KryoException:
 java.util.ConcurrentModificationException
 Serialization trace:
 classes (sun.misc.Launcher$AppClassLoader)
 classloader (java.security.ProtectionDomain)
 context (java.security.AccessControlContext)
 acc (org.apache.spark.util.MutableURLClassLoader)
 contextClassLoader (org.apache.kafka.common.utils.KafkaThread)
 ioThread (org.apache.kafka.clients.producer.KafkaProducer)
 producer (my driver)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:585)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
 at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
 at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
 at
 com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:318)
 at
 com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:293)
 at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
 at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:549)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:570)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
 at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
 at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
 at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
 at

Re: spark streaming 1.3 doubts(force it to not consume anything)

2015-08-18 Thread Cody Koeninger
The superclass method in DStream is defined as returning an Option[RDD[T]]

On Tue, Aug 18, 2015 at 8:07 AM, Shushant Arora shushantaror...@gmail.com
wrote:

 Getting compilation error while overriding compute method of
 DirectKafkaInputDStream.


 [ERROR] CustomDirectKafkaInputDstream.java:[51,83]
 compute(org.apache.spark.streaming.Time) in CustomDirectKafkaInputDstream
 cannot override compute(org.apache.spark.streaming.Time) in
 org.apache.spark.streaming.dstream.DStream; attempting to use incompatible
 return type

 [ERROR] found   :
 scala.Option<org.apache.spark.streaming.kafka.KafkaRDD<byte[],byte[],kafka.serializer.DefaultDecoder,kafka.serializer.DefaultDecoder,byte[][]>>
 
 [ERROR] required: scala.Option<org.apache.spark.rdd.RDD<byte[][]>>


 class :

 public class CustomDirectKafkaInputDstream extends
     DirectKafkaInputDStream<byte[], byte[], kafka.serializer.DefaultDecoder,
         kafka.serializer.DefaultDecoder, byte[][]> {
 
   @Override
   public Option<KafkaRDD<byte[], byte[], DefaultDecoder, DefaultDecoder, byte[][]>> compute(
       Time validTime) {
 
     int processed = processedCounter.value();
     int failed = failedProcessingsCounter.value();
     if (processed == failed) {
       System.out.println("backing off since its 100 % failure");
       return Option.empty();
     } else {
       System.out.println("starting the stream ");
       return super.compute(validTime);
     }
   }
 }


 What should be the return type of the compute method? The super class is
 returning Option<KafkaRDD<byte[], byte[], DefaultDecoder, DefaultDecoder,
 byte[][]>> but it is expecting
 scala.Option<org.apache.spark.rdd.RDD<byte[][]>> from the derived class. Is
 there something wrong with the code?

 On Mon, Aug 17, 2015 at 7:08 PM, Cody Koeninger c...@koeninger.org
 wrote:

 Look at the definitions of the java-specific
 KafkaUtils.createDirectStream methods (the ones that take a
 JavaStreamingContext)

 On Mon, Aug 17, 2015 at 5:13 AM, Shushant Arora 
 shushantaror...@gmail.com wrote:

 How to create a ClassTag in Java? Also, the constructor
 of DirectKafkaInputDStream takes Function1, not Function, but
 KafkaUtils.createDirectStream allows a Function.

 I have the below as an overridden DirectKafkaInputDStream.


 public class CustomDirectKafkaInputDstream extends
     DirectKafkaInputDStream<byte[], byte[], kafka.serializer.DefaultDecoder,
         kafka.serializer.DefaultDecoder, byte[][]> {

   public CustomDirectKafkaInputDstream(
       StreamingContext ssc_,
       Map<String, String> kafkaParams,
       Map<TopicAndPartition, Object> fromOffsets,
       Function1<MessageAndMetadata<byte[], byte[]>, byte[][]> messageHandler,
       ClassTag<byte[]> evidence$1, ClassTag<byte[]> evidence$2,
       ClassTag<DefaultDecoder> evidence$3,
       ClassTag<DefaultDecoder> evidence$4, ClassTag<byte[][]> evidence$5) {
     super(ssc_, kafkaParams, fromOffsets, messageHandler, evidence$1, evidence$2,
         evidence$3, evidence$4, evidence$5);
   }

   @Override
   public Option<KafkaRDD<byte[], byte[], DefaultDecoder, DefaultDecoder, byte[][]>> compute(
       Time validTime) {
     int processed = processedCounter.value();
     int failed = failedProcessingsCounter.value();
     if (processed == failed) {
       System.out.println("backing off since its 100 % failure");
       return Option.empty();
     } else {
       System.out.println("starting the stream ");
       return super.compute(validTime);
     }
   }



 To create this stream I am using:

 scala.collection.immutable.Map<String, String> scalakafkaParams =
     JavaConverters.mapAsScalaMapConverter(kafkaParams).asScala().toMap(Predef.<Tuple2<String,
     String>>conforms());
 scala.collection.immutable.Map<TopicAndPartition, Long> scalaktopicOffsetMap =
     JavaConverters.mapAsScalaMapConverter(topicOffsetMap).asScala().toMap(Predef.<Tuple2<TopicAndPartition,
     Long>>conforms());

 scala.Function1<MessageAndMetadata<byte[], byte[]>, byte[][]> handler =
     new Function<MessageAndMetadata<byte[], byte[]>, byte[][]>() {
 ..});
 JavaDStream<byte[][]> directKafkaStream = new
     CustomDirectKafkaInputDstream(jssc, scalakafkaParams, scalaktopicOffsetMap,
     handler, byte[].class, byte[].class, kafka.serializer.DefaultDecoder.class,
     kafka.serializer.DefaultDecoder.class, byte[][].class);



 How to pass classTag to constructor in CustomDirectKafkaInputDstream ?
 And how to use Function instead of Function1 ?



 On Thu, Aug 13, 2015 at 12:16 AM, Cody Koeninger c...@koeninger.org
 wrote:

 I'm not aware of an existing api per se, but you could create your own
 subclass of the DStream that returns None for compute() under certain
 conditions.



 On Wed, Aug 12, 2015 at 1:03 PM, Shushant Arora 
 shushantaror...@gmail.com wrote:

 Hi Cody

 Can you help here if streaming 1.3 has any api for not consuming any
 message in next few runs?

 Thanks

 -- Forwarded message --
 From: Shushant Arora shushantaror...@gmail.com
 Date: Wed, Aug 12, 2015 at 11:23 PM
 Subject: spark streaming 1.3 doubts(force it to not consume anything)
 To: user user@spark.apache.org


 I can't make my stream application's batch interval change at run
 time. It's always fixed and it always creates jobs at the specified batch
 interval and enqueues them if the earlier batch is not finished.

 

Re: Spark 1.4.1 - Mac OSX Yosemite

2015-08-18 Thread Charlie Hack
Looks like Scala 2.11.6 and Java 1.7.0_79.

✔ ~
09:17 $ scala
Welcome to Scala version 2.11.6 (Java HotSpot(TM) 64-Bit Server VM, Java
1.7.0_79).
Type in expressions to have them evaluated.
Type :help for more information.

scala

✔ ~
09:26 $ echo $JAVA_HOME
/Library/Java/JavaVirtualMachines/jdk1.7.0_79.jdk/Contents/Home



On Mon, Aug 17, 2015 at 11:11 PM, Alun Champion a...@achampion.net wrote:

 Yes, they both are set. Just recompiled and still no success, silent
 failure.
 Which versions of java and scala are you using?


 On 17 August 2015 at 19:59, Charlie Hack charles.t.h...@gmail.com wrote:

 I had success earlier today on OSX Yosemite 10.10.4 building Spark 1.4.1
 using these instructions
 http://genomegeek.blogspot.com/2014/11/how-to-install-apache-spark-on-mac-os-x.html
  (using
 `$ sbt/sbt clean assembly`, with the additional step of downloading the
 proper sbt-launch.jar (0.13.7) from here
 http://dl.bintray.com/typesafe/ivy-releases/org.scala-sbt/sbt-launch/0.13.7/
 and replacing the one that is in build/ as you noted. You've set SCALA_HOME
 and JAVA_HOME environment variables?

 On Mon, Aug 17, 2015 at 8:36 PM, Alun Champion a...@achampion.net
 wrote:

 Has anyone experienced issues running Spark 1.4.1 on a Mac OSX Yosemite?

 I've been running a standalone 1.3.1 fine but it failed when trying to
 run 1.4.1. (I also tried 1.4.0.)

 I've tried both the pre-built packages as well as compiling from source,
 both with the same results. (I can successfully compile with both mvn and
 sbt, after fixing the sbt.jar, which was corrupt.)
 After downloading/building spark and running ./bin/pyspark or
 ./bin/spark-shell it silently exits with a code 1.
 Creating a context in python I get: Exception: Java gateway process
 exited before sending the driver its port number

 I couldn't find any specific resolutions on the web.
 I did add 'pyspark-shell' to the PYSPARK_SUBMIT_ARGS but to no effect.

 Anyone have any further ideas I can explore?
 Cheers
-Alun.




 --
 # +17344761472





-- 
# +17344761472


Re: Transform KafkaRDD to KafkaRDD, not plain RDD, or how to keep OffsetRanges after transformation

2015-08-18 Thread Cody Koeninger
Python version has been available since 1.4.  It should be close to feature
parity with the jvm version in 1.5

On Tue, Aug 18, 2015 at 9:36 AM, ayan guha guha.a...@gmail.com wrote:

 Hi Cody

 A non-related question. Any idea when Python-version of direct receiver is
 expected? Me personally looking forward to it :)

 On Wed, Aug 19, 2015 at 12:02 AM, Cody Koeninger c...@koeninger.org
 wrote:

 The solution you found is also in the docs:

 http://spark.apache.org/docs/latest/streaming-kafka-integration.html

 Java uses an atomic reference because Java doesn't allow you to close
 over non-final references.

 I'm not clear on your other question.

 On Tue, Aug 18, 2015 at 3:43 AM, Petr Novak oss.mli...@gmail.com wrote:

 The solution how to share offsetRanges after DirectKafkaInputStream is
 transformed is in:

 https://github.com/apache/spark/blob/master/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala

 https://github.com/apache/spark/blob/master/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java

 One thing I would like to understand is why the Scala version is using a
 normal variable while the Java version uses an AtomicReference.

 Another thing which I don't get is about closure serialization. The
 question is why the logger in the code below doesn't throw an NPE even though
 its instance isn't copied like in the case of offsetRanges; when val offsets =
 offsetRanges is removed from foreachRDD, then mapPartitionsWithIndex throws
 on offsets(idx). I have something like this code:

 object StreamOps {

   val logger = LoggerFactory.getLogger("StreamOps")
   var offsetRanges = Array[OffsetRange]()

   def transform[T](stream: InputDStream[Array[Byte]]): DStream[T] = {
     stream transform { rdd =>
       offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

       rdd flatMap { message =>
         Try(... decode Array[Byte] to F...) match {
           case Success(fact) => Some(fact)
           case _ => None
         }
       }
     }
   }

   // Error handling removed for brevity
   def save[F](stream: DStream[F]): Unit = {
     stream foreachRDD { rdd =>
       // It has to be here otherwise NullPointerException
       val offsets = offsetRanges

       rdd mapPartitionsWithIndex { (idx, facts) =>
         // Use offsets here
         val writer = new MyWriter[F](offsets(idx), ...)

         facts foreach { fact =>
           writer.write(fact)
         }

         writer.close()

         // Why does the logger work and not throw NullPointerException?
         logger.info(...)

         Iterator.empty
       } foreach {
         (_: Nothing) =>
       }
     }
   }

 Many thanks for any advice, I'm sure its a noob question.
 Petr

 On Mon, Aug 17, 2015 at 1:12 PM, Petr Novak oss.mli...@gmail.com
 wrote:

 Or can I generally create new RDD from transformation and enrich its
 partitions with some metadata so that I would copy OffsetRanges in my new
 RDD in DStream?

 On Mon, Aug 17, 2015 at 1:08 PM, Petr Novak oss.mli...@gmail.com
 wrote:

 Hi all,
 I need to transform KafkaRDD into a new stream of deserialized case
 classes. I want to use the new stream to save it to file and to perform
 additional transformations on it.

 To save it I want to use offsets in filenames, hence I need
 OffsetRanges in transformed RDD. But KafkaRDD is private, hence I don't
 know how to do it.

 Alternatively I could deserialize directly in messageHandler before
 KafkaRDD, but it seems it is a 1:1 transformation while I need to drop bad
 messages (for KafkaRDD => RDD it would be flatMap).

 Is there a way how to do it using messageHandler, is there another
 approach?

 Many thanks for any help.
 Petr







 --
 Best Regards,
 Ayan Guha



Re: spark streaming 1.3 doubts(force it to not consume anything)

2015-08-18 Thread Shushant Arora
looking at source code of
org.apache.spark.streaming.kafka.DirectKafkaInputDStream

override def compute(validTime: Time): Option[KafkaRDD[K, V, U, T, R]] = {
  val untilOffsets = clamp(latestLeaderOffsets(maxRetries))
  val rdd = KafkaRDD[K, V, U, T, R](
    context.sparkContext, kafkaParams, currentOffsets, untilOffsets, messageHandler)

  currentOffsets = untilOffsets.map(kv => kv._1 -> kv._2.offset)
  Some(rdd)
}


But in DStream it is def compute(validTime: Time): Option[RDD[T]].

So what should be the return type of a custom DStream that extends
DirectKafkaInputDStream, given that I want the behaviour to be the same as
DirectKafkaInputDStream in normal scenarios and to return None in the
specific scenario?

And why did the same error not come up when DirectKafkaInputDStream itself
extends InputDStream? Since the return type Option[KafkaRDD[K, V, U, T, R]]
is not a subclass of Option[RDD[T]], shouldn't that have failed too?




On Tue, Aug 18, 2015 at 7:28 PM, Cody Koeninger c...@koeninger.org wrote:

 The superclass method in DStream is defined as returning an Option[RDD[T]]

 On Tue, Aug 18, 2015 at 8:07 AM, Shushant Arora shushantaror...@gmail.com
  wrote:

 Getting compilation error while overriding compute method of
 DirectKafkaInputDStream.


 [ERROR] CustomDirectKafkaInputDstream.java:[51,83]
 compute(org.apache.spark.streaming.Time) in CustomDirectKafkaInputDstream
 cannot override compute(org.apache.spark.streaming.Time) in
 org.apache.spark.streaming.dstream.DStream; attempting to use incompatible
 return type

 [ERROR] found   :
 scala.Optionorg.apache.spark.streaming.kafka.KafkaRDDbyte[],byte[],kafka.serializer.DefaultDecoder,kafka.serializer.DefaultDecoder,byte[][]

 [ERROR] required: scala.Optionorg.apache.spark.rdd.RDDbyte[][]


 class :

 public class CustomDirectKafkaInputDstream extends
 DirectKafkaInputDStreambyte[], byte[], kafka.serializer.DefaultDecoder,
 kafka.serializer.DefaultDecoder, byte[][]{

 @Override
 public OptionKafkaRDDbyte[], byte[], DefaultDecoder, DefaultDecoder,
 byte[][] compute(
 Time validTime) {

 int processed=processedCounter.value();
 int failed = failedProcessingsCounter.value();
 if((processed==failed)){
 System.out.println(backing off since its 100 % failure);
 return Option.empty();
 }else{
 System.out.println(starting the stream );

 return super.compute(validTime);
 }
 }
 }


 What should be the return type of compute method ? super class is
 returning OptionKafkaRDDbyte[], byte[], DefaultDecoder, DefaultDecoder,
 byte[][]  but its expecting
  scala.Optionorg.apache.spark.rdd.RDDbyte[][] from derived  class . Is
 there something wring with code?

 On Mon, Aug 17, 2015 at 7:08 PM, Cody Koeninger c...@koeninger.org
 wrote:

 Look at the definitions of the java-specific
 KafkaUtils.createDirectStream methods (the ones that take a
 JavaStreamingContext)

 On Mon, Aug 17, 2015 at 5:13 AM, Shushant Arora 
 shushantaror...@gmail.com wrote:

 How to create classtag in java ?Also Constructor
 of DirectKafkaInputDStream takes Function1 not Function but
 kafkautils.createDirectStream allows function.

 I have below as overriden DirectKafkaInputDStream.


 public class CustomDirectKafkaInputDstream extends
 DirectKafkaInputDStreambyte[], byte[], kafka.serializer.DefaultDecoder,
 kafka.serializer.DefaultDecoder, byte[][]{

 public CustomDirectKafkaInputDstream(
 StreamingContext ssc_,
 MapString, String kafkaParams,
 MapTopicAndPartition, Object fromOffsets,
 Function1MessageAndMetadatabyte[], byte[], byte[][] messageHandler,
 ClassTagbyte[] evidence$1, ClassTagbyte[] evidence$2,
 ClassTagDefaultDecoder evidence$3,
 ClassTagDefaultDecoder evidence$4, ClassTagbyte[][] evidence$5) {
 super(ssc_, kafkaParams, fromOffsets, messageHandler, evidence$1,
 evidence$2,
 evidence$3, evidence$4, evidence$5);
 }
 @Override
 public OptionKafkaRDDbyte[], byte[], DefaultDecoder, DefaultDecoder,
 byte[][] compute(
 Time validTime) {
 int processe=processedCounter.value();
 int failed = failedProcessingsCounter.value();
 if((processed==failed)){
 System.out.println(backing off since its 100 % failure);
 return Option.empty();
 }else{
 System.out.println(starting the stream );

 return super.compute(validTime);
 }
 }



 To create this stream
 I am using
 scala.collection.immutable.MapString, String scalakafkaParams =
 JavaConverters.mapAsScalaMapConverter(kafkaParams).asScala().toMap(Predef.Tuple2String,
 Stringconforms());
 scala.collection.immutable.MapTopicAndPartition, Long
 scalaktopicOffsetMap=
 JavaConverters.mapAsScalaMapConverter(topicOffsetMap).asScala().toMap(Predef.Tuple2TopicAndPartition,
 Longconforms());

 scala.Function1MessageAndMetadatabyte[], byte[], byte[][] handler =
 new FunctionMessageAndMetadatabyte[], byte[], byte[][]() {
 ..});
 JavaDStreambyte[][] directKafkaStream = new
 CustomDirectKafkaInputDstream(jssc,scalakafkaParams ,scalaktopicOffsetMap,
 handler,byte[].class,byte[].class, kafka.serializer.DefaultDecoder.class,
 

Spark + Jupyter (IPython Notebook)

2015-08-18 Thread Jerry Lam
Hi spark users and developers,

Did anyone have IPython Notebook (Jupyter) deployed in production that uses
Spark as the computational engine?

I know Databricks Cloud provides similar features with deeper integration
with Spark. However, Databricks Cloud has to be hosted by Databricks so we
cannot do this.

Other solutions (e.g. Zeppelin) seem to reinvent the wheel that IPython has
already offered years ago. It would be great if someone can educate me the
reason behind this.

Best Regards,

Jerry


Re: Transform KafkaRDD to KafkaRDD, not plain RDD, or how to keep OffsetRanges after transformation

2015-08-18 Thread Cody Koeninger
The solution you found is also in the docs:

http://spark.apache.org/docs/latest/streaming-kafka-integration.html

Java uses an atomic reference because Java doesn't allow you to close over
non-final references.

I'm not clear on your other question.

On Tue, Aug 18, 2015 at 3:43 AM, Petr Novak oss.mli...@gmail.com wrote:

 The solution how to share offsetRanges after DirectKafkaInputStream is
 transformed is in:

 https://github.com/apache/spark/blob/master/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala

 https://github.com/apache/spark/blob/master/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java

 One thing I would like to understand is why Scala version is using normal
 variable while Java version uses AtomicReference.

 Another thing which I don't get is about closure serialization. The
 question why logger in the below code doesn't throw NPE even its instance
 isn't copied like in the case of offsetRanges, when val offsets =
 offsetRanges is removed form foreachRDD then mapPratitionsWithIndex throws
 on offsets(idx). I have something like this code:

 object StreamOps {

   val logger = LoggerFactory.getLogger(StreamOps)
   var offsetRanges = Array[OffsetRange]()

 def transform[T](stream: InputDStream[Array[Byte]]): DStream[T] = {
   stream transform { rdd =
 offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

 rdd flatmap { message =
   Try(... decode Array[Byte] to F...) match {
 case Success(fact) = Some(fact)
 case _ = None
 }
   }
 }

 // Error handling removed for brevity
 def save[F](stream: DStream[F]): Unit {
   stream foreachRDD { rdd =
 // It has to be here otherwise NullPointerException
 val offsets = offsetRanges

 rdd mapartitionWithIndex { (idx, facts) =
   // Use offsets here
   val writer = new MyWriter[F](offsets(idx), ...)

   facts foreach { fact =
 writer.write(fact)
   }

   writer.close()

   // Why logger works and doesn't throw NullPointerException?
   logger.info(...)

   Iterator.empty
 } foreach {
   (_: Nothing) =
 }
   }
 }

 Many thanks for any advice, I'm sure its a noob question.
 Petr

 On Mon, Aug 17, 2015 at 1:12 PM, Petr Novak oss.mli...@gmail.com wrote:

 Or can I generally create new RDD from transformation and enrich its
 partitions with some metadata so that I would copy OffsetRanges in my new
 RDD in DStream?

 On Mon, Aug 17, 2015 at 1:08 PM, Petr Novak oss.mli...@gmail.com wrote:

 Hi all,
 I need to transform KafkaRDD into a new stream of deserialized case
 classes. I want to use the new stream to save it to file and to perform
 additional transformations on it.

 To save it I want to use offsets in filenames, hence I need OffsetRanges
 in transformed RDD. But KafkaRDD is private, hence I don't know how to do
 it.

 Alternatively I could deserialize directly in messageHandler before
 KafkaRDD but it seems it is 1:1 transformation while I need to drop bad
 messages (KafkaRDD = RDD it would be flatMap).

 Is there a way how to do it using messageHandler, is there another
 approach?

 Many thanks for any help.
 Petr






Java 8 lambdas

2015-08-18 Thread Kristoffer Sjögren
Hi

Is there a way to execute spark jobs with Java 8 lambdas instead of
using anonymous inner classes as seen in the examples?

I think I remember seeing real lambdas in the examples before and in
articles [1]?

Cheers,
-Kristoffer

[1] 
http://blog.cloudera.com/blog/2014/04/making-apache-spark-easier-to-use-in-java-with-java-8

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Spark works with the data in another cluster(Elasticsearch)

2015-08-18 Thread gen tang
Hi,

Currently I have my data in an Elasticsearch cluster and I am trying to use
Spark to analyse those data.
The Elasticsearch cluster and the Spark cluster are two different clusters,
and I use the Hadoop input format (es-hadoop) to read the data in ES.

I am wondering how this environment affects the speed of analysis.
If I understand correctly, Spark will read data from the ES cluster and do the
computation on its own cluster (including writing shuffle results on its own
machines). Is this right? If this is correct, I think the performance will be
just a little bit slower than if the data were stored on the same cluster.
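
For reference, a minimal sketch of how such a read typically looks with the
elasticsearch-hadoop (elasticsearch-spark) connector; the node address and the
index/type name are placeholders:

import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark._            // adds esRDD to SparkContext

val conf = new SparkConf()
  .setAppName("es-read")
  .set("es.nodes", "es-node-1:9200")        // placeholder: the remote ES cluster
val sc = new SparkContext(conf)

// The connector creates one Spark partition per ES shard; documents are pulled
// over the network into the Spark executors, and shuffle data stays on the
// Spark cluster.
val docs = sc.esRDD("myindex/mytype")       // placeholder index/type
println(docs.count())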

I would appreciate it if someone could share his/her experience about using
Spark with Elasticsearch.

Thanks a lot in advance for your help.

Cheers
Gen


Re: Java 8 lambdas

2015-08-18 Thread Sean Owen
Yes, it should Just Work. lambdas can be used for any method that
takes an instance of an interface with one method, and that describes
Function, PairFunction, etc.

On Tue, Aug 18, 2015 at 3:23 PM, Kristoffer Sjögren sto...@gmail.com wrote:
 Hi

 Is there a way to execute spark jobs with Java 8 lambdas instead of
 using anonymous inner classes as seen in the examples?

 I think I remember seeing real lambdas in the examples before and in
 articles [1]?

 Cheers,
 -Kristoffer

 [1] 
 http://blog.cloudera.com/blog/2014/04/making-apache-spark-easier-to-use-in-java-with-java-8

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Scala: How to match a java object????

2015-08-18 Thread Saif.A.Ellafi
Hi all,

I am trying to run a Spark job in which I receive java.math.BigDecimal
objects, instead of the Scala equivalents, and I am trying to convert them
into Doubles.
If I try to match-case on this object's class, I get: error: object
java.math.BigDecimal is not a value

How could I get around matching Java objects? I would like to avoid multiple
try-catches on ClassCastException for all my checks.

Thank you,
Saif
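
For reference, the class name alone cannot be used as a value in a pattern,
which is what produces the error above; a typed pattern binds and tests the
value's class instead and also handles the conversion. `toDoubleOpt` is an
illustrative helper name:

def toDoubleOpt(x: Any): Option[Double] = x match {
  case d: java.math.BigDecimal  => Some(d.doubleValue())  // bind and test the Java class
  case d: scala.math.BigDecimal => Some(d.toDouble)
  case d: Double                => Some(d)
  case _                        => None
}

// toDoubleOpt(new java.math.BigDecimal("3.14"))  // Some(3.14)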



broadcast variable of Kafka producer throws ConcurrentModificationException

2015-08-18 Thread Shenghua(Daniel) Wan
Hi,
Did anyone see java.util.ConcurrentModificationException when using
broadcast variables?
I encountered this exception when wrapping a Kafka producer like this in
the spark streaming driver.

Here is what I did.
KafkaProducer<String, String> producer = new KafkaProducer<String,
String>(properties);
final Broadcast<KafkaDataProducer> bCastProducer
= streamingContext.sparkContext().broadcast(producer);

Then within a closure called by a foreachRDD, I was trying to get the
wrapped producer, i.e.
 KafkaProducer<String, String> p = bCastProducer.value();

after rebuilding and rerunning, I got the stack trace like this

Exception in thread main com.esotericsoftware.kryo.KryoException:
java.util.ConcurrentModificationException
Serialization trace:
classes (sun.misc.Launcher$AppClassLoader)
classloader (java.security.ProtectionDomain)
context (java.security.AccessControlContext)
acc (org.apache.spark.util.MutableURLClassLoader)
contextClassLoader (org.apache.kafka.common.utils.KafkaThread)
ioThread (org.apache.kafka.clients.producer.KafkaProducer)
producer (my driver)
at
com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:585)
at
com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
at
com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
at
com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
at
com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:318)
at
com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:293)
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
at
com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
at
com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:549)
at
com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:570)
at
com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
at
com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
at
com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
at
com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
at
com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
at
com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
at
com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
at
org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:148)
at
org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:203)
at
org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:102)
at
org.apache.spark.broadcast.TorrentBroadcast.init(TorrentBroadcast.scala:85)
at
org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
at
org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62)
at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1289)
at
org.apache.spark.api.java.JavaSparkContext.broadcast(JavaSparkContext.scala:648)
at my driver
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:664)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:169)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:192)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:111)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.util.ConcurrentModificationException
at java.util.Vector$Itr.checkForComodification(Vector.java:1156)
at java.util.Vector$Itr.next(Vector.java:1133)
at
com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:67)
at
com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:18)
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
at

Re: broadcast variable of Kafka producer throws ConcurrentModificationException

2015-08-18 Thread Cody Koeninger
I wouldn't expect a kafka producer to be serializable at all... among other
things, it has a background thread

On Tue, Aug 18, 2015 at 4:55 PM, Shenghua(Daniel) Wan wansheng...@gmail.com
 wrote:

 Hi,
 Did anyone see java.util.ConcurrentModificationException when using
 broadcast variables?
 I encountered this exception when wrapping a Kafka producer like this in
 the spark streaming driver.

 Here is what I did.
 KafkaProducerString, String producer = new KafkaProducerString,
 String(properties);
 final BroadcastKafkaDataProducer bCastProducer
 = streamingContext.sparkContext().broadcast(producer);

 Then within an closure called by a foreachRDD, I was trying to get the
 wrapped producer, i.e.
  KafkaProducerString, String p = bCastProducer.value();

 after rebuilding and rerunning, I got the stack trace like this

 Exception in thread main com.esotericsoftware.kryo.KryoException:
 java.util.ConcurrentModificationException
 Serialization trace:
 classes (sun.misc.Launcher$AppClassLoader)
 classloader (java.security.ProtectionDomain)
 context (java.security.AccessControlContext)
 acc (org.apache.spark.util.MutableURLClassLoader)
 contextClassLoader (org.apache.kafka.common.utils.KafkaThread)
 ioThread (org.apache.kafka.clients.producer.KafkaProducer)
 producer (my driver)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:585)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
 at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
 at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
 at
 com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:318)
 at
 com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:293)
 at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
 at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:549)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:570)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
 at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
 at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
 at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
 at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
 at
 org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:148)
 at
 org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:203)
 at
 org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:102)
 at
 org.apache.spark.broadcast.TorrentBroadcast.init(TorrentBroadcast.scala:85)
 at
 org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
 at
 org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62)
 at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1289)
 at
 org.apache.spark.api.java.JavaSparkContext.broadcast(JavaSparkContext.scala:648)
 at my driver
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)
 at
 org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:664)
 at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:169)
 at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:192)
 at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:111)
 at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
 Caused by: java.util.ConcurrentModificationException
 at java.util.Vector$Itr.checkForComodification(Vector.java:1156)
 at 

Spark and ActorSystem

2015-08-18 Thread maxdml
Hi,

I'd like to know where I could find more information related to the
deprecation of the actor system in Spark (from 1.4.x).

I'm interested in the reasons for this decision.

Cheers



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-and-ActorSystem-tp24321.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark + Jupyter (IPython Notebook)

2015-08-18 Thread Guru Medasani
Hi Jerry,

Yes. I’ve seen customers using this in production for data science work. I’m 
currently using this for one of my projects on a cluster as well. 

Also, here is a blog that describes how to configure this. 

http://blog.cloudera.com/blog/2014/08/how-to-use-ipython-notebook-with-apache-spark/


Guru Medasani
gdm...@gmail.com



 On Aug 18, 2015, at 8:35 AM, Jerry Lam chiling...@gmail.com wrote:
 
 Hi spark users and developers,
 
 Did anyone have IPython Notebook (Jupyter) deployed in production that uses 
 Spark as the computational engine? 
 
 I know Databricks Cloud provides similar features with deeper integration 
 with Spark. However, Databricks Cloud has to be hosted by Databricks so we 
 cannot do this. 
 
 Other solutions (e.g. Zeppelin) seem to reinvent the wheel that IPython has 
 already offered years ago. It would be great if someone can educate me the 
 reason behind this.
 
 Best Regards,
 
 Jerry



Spark executor lost because of GC overhead limit exceeded even though using 20 executors using 25GB each

2015-08-18 Thread unk1102
Hi, this GC overhead limit error is making me crazy. I have 20 executors
using 25 GB each, and I don't understand at all how it can throw a GC
overhead error; I don't have that big a dataset either. Once this GC error
occurs in an executor, that executor gets lost, and slowly other executors
get lost because of IOException, RPC client disassociated, shuffle not
found, etc. Please help me solve this, I am getting mad as I am new to
Spark. Thanks in advance.

WARN scheduler.TaskSetManager: Lost task 7.0 in stage 363.0 (TID 3373,
myhost.com): java.lang.OutOfMemoryError: GC overhead limit exceeded
at
org.apache.spark.sql.types.UTF8String.toString(UTF8String.scala:150)
at
org.apache.spark.sql.catalyst.expressions.GenericRow.getString(rows.scala:120)
at
org.apache.spark.sql.columnar.STRING$.actualSize(ColumnType.scala:312)
at
org.apache.spark.sql.columnar.compression.DictionaryEncoding$Encoder.gatherCompressibilityStats(compressionSchemes.scala:224)
at
org.apache.spark.sql.columnar.compression.CompressibleColumnBuilder$class.gatherCompressibilityStats(CompressibleColumnBuilder.scala:72)
at
org.apache.spark.sql.columnar.compression.CompressibleColumnBuilder$class.appendFrom(CompressibleColumnBuilder.scala:80)
at
org.apache.spark.sql.columnar.NativeColumnBuilder.appendFrom(ColumnBuilder.scala:87)
at
org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$3$$anon$1.next(InMemoryColumnarTableScan.scala:148)
at
org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$3$$anon$1.next(InMemoryColumnarTableScan.scala:124)
at
org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:277)
at
org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:171)
at
org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:242)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-executor-lost-because-of-GC-overhead-limit-exceeded-even-though-using-20-executors-using-25GB-h-tp24322.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org