Re: Improving performance of a kafka spark streaming app

2016-06-22 Thread Colin Kincaid Williams
Streaming UI tab showing empty events and very different metrics than on 1.5.2

On Thu, Jun 23, 2016 at 5:06 AM, Colin Kincaid Williams  wrote:
> After a bit of effort I moved from a Spark cluster running 1.5.2, to a
> Yarn cluster running 1.6.1 jars. I'm still setting the maxRPP. The
> completed batches are no longer showing the number of events processed
> in the Streaming UI tab . I'm getting around 4k inserts per second in
> hbase, but I haven't yet tried to remove or reset the mRPP.  I will
> attach a screenshot of the UI tab. It shows significantly lower
> figures for processing and delay times, than the previous posted shot.
> It also shows the batches as empty, however I see the requests hitting
> hbase.
>
> Then it's possible my issues were related to running on the Spark
> 1.5.2 cluster. Also is the missing event count in the completed
> batches a bug? Should I file an issue?
>
> On Tue, Jun 21, 2016 at 9:04 PM, Colin Kincaid Williams  
> wrote:
>> Thanks @Cody, I will try that out. In the interm, I tried to validate
>> my Hbase cluster by running a random write test and see 30-40K writes
>> per second. This suggests there is noticeable room for improvement.
>>
>> On Tue, Jun 21, 2016 at 8:32 PM, Cody Koeninger  wrote:
>>> Take HBase out of the equation and just measure what your read
>>> performance is by doing something like
>>>
>>> createDirectStream(...).foreach(_.println)
>>>
>>> not take() or print()
>>>
>>> On Tue, Jun 21, 2016 at 3:19 PM, Colin Kincaid Williams  
>>> wrote:
 @Cody I was able to bring my processing time down to a second by
 setting maxRatePerPartition as discussed. My bad that I didn't
 recognize it as the cause of my scheduling delay.

 Since then I've tried experimenting with a larger Spark Context
 duration. I've been trying to get some noticeable improvement
 inserting messages from Kafka -> Hbase using the above application.
 I'm currently getting around 3500 inserts / second on a 9 node hbase
 cluster. So far, I haven't been able to get much more throughput. Then
 I'm looking for advice here how I should tune Kafka and Spark for this
 job.

 I can create a kafka topic with as many partitions that I want. I can
 set the Duration and maxRatePerPartition. I have 1.7 billion messages
 that I can insert rather quickly into the Kafka queue, and I'd like to
 get them into Hbase as quickly as possible.

 I'm looking for advice regarding # Kafka Topic Partitions / Streaming
 Duration / maxRatePerPartition / any other spark settings or code
 changes that I should make to try to get a better consumption rate.

 Thanks for all the help so far, this is the first Spark application I
 have written.

 On Mon, Jun 20, 2016 at 12:32 PM, Colin Kincaid Williams  
 wrote:
> I'll try dropping the maxRatePerPartition=400, or maybe even lower.
> However even at application starts up I have this large scheduling
> delay. I will report my progress later on.
>
> On Mon, Jun 20, 2016 at 2:12 PM, Cody Koeninger  
> wrote:
>> If your batch time is 1 second and your average processing time is
>> 1.16 seconds, you're always going to be falling behind.  That would
>> explain why you've built up an hour of scheduling delay after eight
>> hours of running.
>>
>> On Sat, Jun 18, 2016 at 4:40 PM, Colin Kincaid Williams  
>> wrote:
>>> Hi Mich again,
>>>
>>> Regarding batch window, etc. I have provided the sources, but I'm not
>>> currently calling the window function. Did you see the program source?
>>> It's only 100 lines.
>>>
>>> https://gist.github.com/drocsid/b0efa4ff6ff4a7c3c8bb56767d0b6877
>>>
>>> Then I would expect I'm using defaults, other than what has been shown
>>> in the configuration.
>>>
>>> For example:
>>>
>>> In the launcher configuration I set --conf
>>> spark.streaming.kafka.maxRatePerPartition=500 \ and I believe there
>>> are 500 messages for the duration set in the application:
>>> JavaStreamingContext jssc = new JavaStreamingContext(jsc, new
>>> Duration(1000));
>>>
>>>
>>> Then with the --num-executors 6 \ submit flag, and the
>>> spark.streaming.kafka.maxRatePerPartition=500 I think that's how we
>>> arrive at the 3000 events per batch in the UI, pasted above.
>>>
>>> Feel free to correct me if I'm wrong.
>>>
>>> Then are you suggesting that I set the window?
>>>
>>> Maybe following this as reference:
>>>
>>> https://databricks.gitbooks.io/databricks-spark-reference-applications/content/logs_analyzer/chapter1/windows.html
>>>
>>> On Sat, Jun 18, 2016 at 8:08 PM, Mich Talebzadeh
>>>  wrote:
 Ok

 What is the set up for 

Re: Improving performance of a kafka spark streaming app

2016-06-22 Thread Colin Kincaid Williams
After a bit of effort I moved from a Spark cluster running 1.5.2, to a
Yarn cluster running 1.6.1 jars. I'm still setting the maxRPP. The
completed batches are no longer showing the number of events processed
in the Streaming UI tab . I'm getting around 4k inserts per second in
hbase, but I haven't yet tried to remove or reset the mRPP.  I will
attach a screenshot of the UI tab. It shows significantly lower
figures for processing and delay times, than the previous posted shot.
It also shows the batches as empty, however I see the requests hitting
hbase.

Then it's possible my issues were related to running on the Spark
1.5.2 cluster. Also is the missing event count in the completed
batches a bug? Should I file an issue?

On Tue, Jun 21, 2016 at 9:04 PM, Colin Kincaid Williams  wrote:
> Thanks @Cody, I will try that out. In the interm, I tried to validate
> my Hbase cluster by running a random write test and see 30-40K writes
> per second. This suggests there is noticeable room for improvement.
>
> On Tue, Jun 21, 2016 at 8:32 PM, Cody Koeninger  wrote:
>> Take HBase out of the equation and just measure what your read
>> performance is by doing something like
>>
>> createDirectStream(...).foreach(_.println)
>>
>> not take() or print()
>>
>> On Tue, Jun 21, 2016 at 3:19 PM, Colin Kincaid Williams  
>> wrote:
>>> @Cody I was able to bring my processing time down to a second by
>>> setting maxRatePerPartition as discussed. My bad that I didn't
>>> recognize it as the cause of my scheduling delay.
>>>
>>> Since then I've tried experimenting with a larger Spark Context
>>> duration. I've been trying to get some noticeable improvement
>>> inserting messages from Kafka -> Hbase using the above application.
>>> I'm currently getting around 3500 inserts / second on a 9 node hbase
>>> cluster. So far, I haven't been able to get much more throughput. Then
>>> I'm looking for advice here how I should tune Kafka and Spark for this
>>> job.
>>>
>>> I can create a kafka topic with as many partitions that I want. I can
>>> set the Duration and maxRatePerPartition. I have 1.7 billion messages
>>> that I can insert rather quickly into the Kafka queue, and I'd like to
>>> get them into Hbase as quickly as possible.
>>>
>>> I'm looking for advice regarding # Kafka Topic Partitions / Streaming
>>> Duration / maxRatePerPartition / any other spark settings or code
>>> changes that I should make to try to get a better consumption rate.
>>>
>>> Thanks for all the help so far, this is the first Spark application I
>>> have written.
>>>
>>> On Mon, Jun 20, 2016 at 12:32 PM, Colin Kincaid Williams  
>>> wrote:
 I'll try dropping the maxRatePerPartition=400, or maybe even lower.
 However even at application starts up I have this large scheduling
 delay. I will report my progress later on.

 On Mon, Jun 20, 2016 at 2:12 PM, Cody Koeninger  wrote:
> If your batch time is 1 second and your average processing time is
> 1.16 seconds, you're always going to be falling behind.  That would
> explain why you've built up an hour of scheduling delay after eight
> hours of running.
>
> On Sat, Jun 18, 2016 at 4:40 PM, Colin Kincaid Williams  
> wrote:
>> Hi Mich again,
>>
>> Regarding batch window, etc. I have provided the sources, but I'm not
>> currently calling the window function. Did you see the program source?
>> It's only 100 lines.
>>
>> https://gist.github.com/drocsid/b0efa4ff6ff4a7c3c8bb56767d0b6877
>>
>> Then I would expect I'm using defaults, other than what has been shown
>> in the configuration.
>>
>> For example:
>>
>> In the launcher configuration I set --conf
>> spark.streaming.kafka.maxRatePerPartition=500 \ and I believe there
>> are 500 messages for the duration set in the application:
>> JavaStreamingContext jssc = new JavaStreamingContext(jsc, new
>> Duration(1000));
>>
>>
>> Then with the --num-executors 6 \ submit flag, and the
>> spark.streaming.kafka.maxRatePerPartition=500 I think that's how we
>> arrive at the 3000 events per batch in the UI, pasted above.
>>
>> Feel free to correct me if I'm wrong.
>>
>> Then are you suggesting that I set the window?
>>
>> Maybe following this as reference:
>>
>> https://databricks.gitbooks.io/databricks-spark-reference-applications/content/logs_analyzer/chapter1/windows.html
>>
>> On Sat, Jun 18, 2016 at 8:08 PM, Mich Talebzadeh
>>  wrote:
>>> Ok
>>>
>>> What is the set up for these please?
>>>
>>> batch window
>>> window length
>>> sliding interval
>>>
>>> And also in each batch window how much data do you get in (no of 
>>> messages in
>>> the topic whatever)?
>>>
>>>
>>>
>>>
>>> Dr Mich Talebzadeh
>>>

Re: Spark Kafka stream processing time increasing gradually

2016-06-22 Thread Roshan Singh
Thanks for the detailed explanation. Just tested it, worked like a charm.

On Mon, Jun 20, 2016 at 1:02 PM, N B  wrote:

> Its actually necessary to retire keys that become "Zero" or "Empty" so to
> speak. In your case, the key is "imageURL" and values are a dictionary, one
> of whose fields is "count" that you are maintaining. For simplicity and
> illustration's sake I will assume imageURL to be a strings like "abc". Your
> slide duration is 60 and window duration is 1800 seconds.
>
> Now consider the following chain of events in your stream.
>
> batch 1 : "abc"
> batch 2 : "xyz"
> batch 3 : "abc"
>
> and now for the rest of the stream, the keys "abc" or "xyz" never occur.
>
> At the end of the third batch, the generated window rdd has
> { "abc" -> count = 2, "xyz" -> count = 1 }.
> When the first batch falls off after 1800 seconds, it will become
> { "abc -> count = 1, "xyz" -> count = 1 }.
> 60 seconds later, it will become
> { "abc" -> count = 1, "xyz" -> count = 0 }
> and a further 60 seconds later, the 3rd batch is removed from the window
> and the new window rdd becomes
> { "abc" -> count = 0, "xyz" -> count = 0 }.
>
> I hope you can see what is wrong with this. These keys will be perpetually
> held in memory even though there is no need for them to be there. This
> growth in the size of the generated window rdd is what's giving rise to the
> deteriorating processing time in your case.
>
> A filter function that's equivalent of "count != 0" will suffice to
> remember only those keys that have not become "Zero".
>
> HTH,
> NB
>
>
>
> On Thu, Jun 16, 2016 at 8:12 PM, Roshan Singh 
> wrote:
>
>> Hi,
>> According to the docs (
>> https://spark.apache.org/docs/latest/api/python/pyspark.streaming.html#pyspark.streaming.DStream.reduceByKeyAndWindow),
>> filerFunc can be used to retain expiring keys. I do not want to retain any
>> expiring key, so I do not understand how can this help me stabilize it.
>> Please correct me if this is not the case.
>>
>> I am also specifying both reduceFunc and invReduceFunc. Can you can a
>> sample code of what you are using.
>>
>> Thanks.
>>
>> On Fri, Jun 17, 2016 at 3:43 AM, N B  wrote:
>>
>>> We had this same issue with the reduceByKeyAndWindow API that you are
>>> using. For fixing this issue, you have to use  different flavor of that
>>> API, specifically the 2 versions that allow you to give a 'Filter function'
>>> to them. Putting in the filter functions helped stabilize our application
>>> too.
>>>
>>> HTH
>>> NB
>>>
>>>
>>> On Sun, Jun 12, 2016 at 11:19 PM, Roshan Singh >> > wrote:
>>>
 Hi all,
 I have a python streaming job which is supposed to run 24x7. I am
 unable to stabilize it. The job just counts no of links shared in a 30
 minute sliding window. I am using reduceByKeyAndWindow operation with a
 batch of 30 seconds, slide interval of 60 seconds.

 The kafka queue has a rate of nearly 2200 messages/second which can
 increase to 3000 but the mean is 2200.

 I have played around with batch size, slide interval, and by increasing
 parallelism with no fruitful result. These just delay the destabilization.

 GC time is usually between 60-100 ms.

 I also noticed that the jobs were not distributed to other nodes in the
 spark UI, for which I have used configured spark.locality.wait as 100ms.
 After which I have noticed that the job is getting distributed properly.

 I have a cluster of 6 slaves and one master each with 16 cores and 15gb
 of ram.

 Code and configuration: http://pastebin.com/93DMSiji

 Streaming screenshot: http://imgur.com/psNfjwJ

 I need help in debugging the issue. Any help will be appreciated.

 --
 Roshan Singh


>>>
>>
>>
>> --
>> Roshan Singh
>> http://roshansingh.in
>>
>
>


-- 
Roshan Singh
http://roshansingh.in


Re: Building Spark 2.X in Intellij

2016-06-22 Thread Praveen R
I had some errors like SqlBaseParser class missing, and figured out I
needed to get these classes from SqlBase.g4 using antlr4. It works fine now.

On Thu, Jun 23, 2016 at 9:20 AM, Jeff Zhang  wrote:

> It works well with me. You can try reimport it into intellij.
>
> On Thu, Jun 23, 2016 at 10:25 AM, Stephen Boesch 
> wrote:
>
>>
>> Building inside intellij is an ever moving target. Anyone have the
>> magical procedures to get it going for 2.X?
>>
>> There are numerous library references that - although included in the
>> pom.xml build - are for some reason not found when processed within
>> Intellij.
>>
>
>
>
> --
> Best Regards
>
> Jeff Zhang
>


Re: Building Spark 2.X in Intellij

2016-06-22 Thread Jeff Zhang
It works well with me. You can try reimport it into intellij.

On Thu, Jun 23, 2016 at 10:25 AM, Stephen Boesch  wrote:

>
> Building inside intellij is an ever moving target. Anyone have the magical
> procedures to get it going for 2.X?
>
> There are numerous library references that - although included in the
> pom.xml build - are for some reason not found when processed within
> Intellij.
>



-- 
Best Regards

Jeff Zhang


Re: NullPointerException when starting StreamingContext

2016-06-22 Thread Ted Yu
Which Scala version / Spark release are you using ?

Cheers

On Wed, Jun 22, 2016 at 8:20 PM, Sunita Arvind 
wrote:

> Hello Experts,
>
> I am getting this error repeatedly:
>
> 16/06/23 03:06:59 ERROR streaming.StreamingContext: Error starting the 
> context, marking it as stopped
> java.lang.NullPointerException
>   at 
> com.typesafe.config.impl.SerializedConfigValue.writeOrigin(SerializedConfigValue.java:202)
>   at 
> com.typesafe.config.impl.ConfigImplUtil.writeOrigin(ConfigImplUtil.java:228)
>   at 
> com.typesafe.config.ConfigException.writeObject(ConfigException.java:58)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:606)
>   at 
> java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
>   at 
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495)
>   at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>   at 
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>   at 
> java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:440)
>   at java.lang.Throwable.writeObject(Throwable.java:985)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:606)
>   at 
> java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
>   at 
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495)
>   at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>   at 
> java.io.ObjectOutputStream.writeFatalException(ObjectOutputStream.java:1576)
>   at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:350)
>   at 
> org.apache.spark.streaming.Checkpoint$$anonfun$serialize$1.apply$mcV$sp(Checkpoint.scala:141)
>   at 
> org.apache.spark.streaming.Checkpoint$$anonfun$serialize$1.apply(Checkpoint.scala:141)
>   at 
> org.apache.spark.streaming.Checkpoint$$anonfun$serialize$1.apply(Checkpoint.scala:141)
>   at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1251)
>   at 
> org.apache.spark.streaming.Checkpoint$.serialize(Checkpoint.scala:142)
>   at 
> org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:554)
>   at 
> org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:601)
>   at 
> org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:600)
>   at 
> com.edgecast.engine.ProcessingEngine$$anonfun$main$1.apply(ProcessingEngine.scala:73)
>   at 
> com.edgecast.engine.ProcessingEngine$$anonfun$main$1.apply(ProcessingEngine.scala:67)
>   at scala.collection.immutable.List.foreach(List.scala:318)
>   at com.edgecast.engine.ProcessingEngine$.main(ProcessingEngine.scala:67)
>   at com.edgecast.engine.ProcessingEngine.main(ProcessingEngine.scala)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:606)
>   at 
> org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:542)
>
>
> It seems to be a typical issue. All I am doing here is as below:
>
> Object ProcessingEngine{
>
> def initializeSpark(customer:String):StreamingContext={
>   LogHandler.log.info("InitialeSpark")
>   val custConf = ConfigFactory.load(customer + 
> ".conf").getConfig(customer).withFallback(AppConf)
>   implicit val sparkConf: SparkConf = new SparkConf().setAppName(customer)
>   val ssc: StreamingContext = new StreamingContext(sparkConf, 
> Seconds(custConf.getLong("batchDurSec")))
>   ssc.checkpoint(custConf.getString("checkpointDir"))
>   ssc
> }
>
> def createDataStreamFromKafka(customer:String, ssc: 
> StreamingContext):DStream[Array[Byte]]={
>   val custConf = ConfigFactory.load(customer + 
> ".conf").getConfig(customer).withFallback(ConfigFactory.load())
>   LogHandler.log.info("createDataStreamFromKafka")
>   KafkaUtils.createDirectStream[String,
> Array[Byte],
> StringDecoder,
> DefaultDecoder](
> ssc,
> Map[String, String]("metadata.broker.list" -> 
> 

NullPointerException when starting StreamingContext

2016-06-22 Thread Sunita Arvind
Hello Experts,

I am getting this error repeatedly:

16/06/23 03:06:59 ERROR streaming.StreamingContext: Error starting the
context, marking it as stopped
java.lang.NullPointerException
at 
com.typesafe.config.impl.SerializedConfigValue.writeOrigin(SerializedConfigValue.java:202)
at 
com.typesafe.config.impl.ConfigImplUtil.writeOrigin(ConfigImplUtil.java:228)
at 
com.typesafe.config.ConfigException.writeObject(ConfigException.java:58)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at 
java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
at 
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495)
at 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at 
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at 
java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:440)
at java.lang.Throwable.writeObject(Throwable.java:985)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at 
java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
at 
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495)
at 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at 
java.io.ObjectOutputStream.writeFatalException(ObjectOutputStream.java:1576)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:350)
at 
org.apache.spark.streaming.Checkpoint$$anonfun$serialize$1.apply$mcV$sp(Checkpoint.scala:141)
at 
org.apache.spark.streaming.Checkpoint$$anonfun$serialize$1.apply(Checkpoint.scala:141)
at 
org.apache.spark.streaming.Checkpoint$$anonfun$serialize$1.apply(Checkpoint.scala:141)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1251)
at 
org.apache.spark.streaming.Checkpoint$.serialize(Checkpoint.scala:142)
at 
org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:554)
at 
org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:601)
at 
org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:600)
at 
com.edgecast.engine.ProcessingEngine$$anonfun$main$1.apply(ProcessingEngine.scala:73)
at 
com.edgecast.engine.ProcessingEngine$$anonfun$main$1.apply(ProcessingEngine.scala:67)
at scala.collection.immutable.List.foreach(List.scala:318)
at com.edgecast.engine.ProcessingEngine$.main(ProcessingEngine.scala:67)
at com.edgecast.engine.ProcessingEngine.main(ProcessingEngine.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at 
org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:542)


It seems to be a typical issue. All I am doing here is as below:

Object ProcessingEngine{

def initializeSpark(customer:String):StreamingContext={
  LogHandler.log.info("InitialeSpark")
  val custConf = ConfigFactory.load(customer +
".conf").getConfig(customer).withFallback(AppConf)
  implicit val sparkConf: SparkConf = new SparkConf().setAppName(customer)
  val ssc: StreamingContext = new StreamingContext(sparkConf,
Seconds(custConf.getLong("batchDurSec")))
  ssc.checkpoint(custConf.getString("checkpointDir"))
  ssc
}

def createDataStreamFromKafka(customer:String, ssc:
StreamingContext):DStream[Array[Byte]]={
  val custConf = ConfigFactory.load(customer +
".conf").getConfig(customer).withFallback(ConfigFactory.load())
  LogHandler.log.info("createDataStreamFromKafka")
  KafkaUtils.createDirectStream[String,
Array[Byte],
StringDecoder,
DefaultDecoder](
ssc,
Map[String, String]("metadata.broker.list" ->
custConf.getString("brokers"), "group.id" ->
custConf.getString("groupId")),
Set(custConf.getString("topics")))

}

def main(args: Array[String]): Unit = {
  val AppConf = ConfigFactory.load()
  LogHandler.log.info("Starting the processing Engine")
  getListOfCustomers().foreach{cust 

why did spark2.0 Disallow ROW FORMAT and STORED AS (parquet | orc | avro etc.)

2016-06-22 Thread linxi zeng
Hi All,
I have tried the spark sql of Spark branch-2.0 and countered an
unexpected problem:

Operation not allowed: ROW FORMAT DELIMITED is only compatible with
'textfile', not 'orc'(line 1, pos 0)

the sql is like:

CREATE TABLE IF NOT EXISTS test.test_orc
(
 ...
)
PARTITIONED BY (xxx)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\n'
stored as orc

I found this JIRA: https://issues.apache.org/jira/browse/SPARK-15279,
but still can't understand why?

and by the way, this sql is work fine on spark1.4.


Building Spark 2.X in Intellij

2016-06-22 Thread Stephen Boesch
Building inside intellij is an ever moving target. Anyone have the magical
procedures to get it going for 2.X?

There are numerous library references that - although included in the
pom.xml build - are for some reason not found when processed within
Intellij.


Re: spark-1.6.1-bin-without-hadoop can not use spark-sql

2016-06-22 Thread Ted Yu
The tar ball was built against hadoop 2.6 which is compatible with hadoop
2.7.2
So the answer should be yes.

On Wed, Jun 22, 2016 at 7:10 PM, 喜之郎 <251922...@qq.com> wrote:

> Thanks.
>
> In addition,I want to know, if I can use  spark-1.6.1-bin-hadoop2.6.tgz
>  which
> is a pre-built package on hadoop 2.7.2?
>
>
>
> -- 原始邮件 --
> *发件人:* "Ted Yu";;
> *发送时间:* 2016年6月22日(星期三) 晚上11:51
> *收件人:* "喜之郎"<251922...@qq.com>;
> *抄送:* "user";
> *主题:* Re: spark-1.6.1-bin-without-hadoop can not use spark-sql
>
> build/mvn clean -Phive -Phive-thriftserver -Pyarn -Phadoop-2.6 -Psparkr
> -Dhadoop.version=2.7.2 package
>
> On Wed, Jun 22, 2016 at 8:00 AM, 251922566 <251922...@qq.com> wrote:
>
>> ok,i will rebuild myself. if i want to use spark with hadoop 2.7.2, when
>> i build spark, i should put what on param --hadoop, 2.7.2 or others?
>>
>> 来自我的华为手机
>>
>>
>>  原始邮件 
>> 主题:Re: spark-1.6.1-bin-without-hadoop can not use spark-sql
>> 发件人:Ted Yu
>> 收件人:喜之郎 <251922...@qq.com>
>> 抄送:user
>>
>>
>> I wonder if the tar ball was built with:
>>
>> -Phive -Phive-thriftserver
>>
>> Maybe rebuild by yourself with the above ?
>>
>> FYI
>>
>> On Wed, Jun 22, 2016 at 4:38 AM, 喜之郎 <251922...@qq.com> wrote:
>>
>>> Hi all.
>>> I download spark-1.6.1-bin-without-hadoop.tgz
>>>  
>>> from
>>> website.
>>> And I configured "SPARK_DIST_CLASSPATH" in spark-env.sh.
>>> Now spark-shell run well. But spark-sql can not run.
>>> My hadoop version is 2.7.2.
>>> This is error infos:
>>>
>>> bin/spark-sql
>>> java.lang.ClassNotFoundException:
>>> org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver
>>> at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
>>> at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
>>> at java.security.AccessController.doPrivileged(Native Method)
>>> at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
>>> at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
>>> at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
>>> at java.lang.Class.forName0(Native Method)
>>> at java.lang.Class.forName(Class.java:278)
>>> at org.apache.spark.util.Utils$.classForName(Utils.scala:174)
>>> at
>>> org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:689)
>>> at
>>> org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
>>> at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
>>> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
>>> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>>> Failed to load main class
>>> org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.
>>> You need to build Spark with -Phive and -Phive-thriftserver.
>>>
>>> Do I need configure something else in spark-env.sh or spark-default.conf?
>>> Suggestions are appreciated ,thanks.
>>>
>>>
>>>
>>>
>>
>


Re: Creating a python port for a Scala Spark Projeect

2016-06-22 Thread Daniel Imberman
Thank you Holden, I look forward to watching your talk!

On Wed, Jun 22, 2016 at 7:12 PM Holden Karau  wrote:

> PySpark RDDs are (on the Java side) are essentially RDD of pickled objects
> and mostly (but not entirely) opaque to the JVM. It is possible (by using
> some internals) to pass a PySpark DataFrame to a Scala library (you may or
> may not find the talk I gave at Spark Summit useful
> https://www.youtube.com/watch?v=V6DkTVvy9vk as well as some of the Python
> examples in
> https://github.com/high-performance-spark/high-performance-spark-examples
> ). Good luck! :)
>
> On Wed, Jun 22, 2016 at 7:07 PM, Daniel Imberman <
> daniel.imber...@gmail.com> wrote:
>
>> Hi All,
>>
>> I've developed a spark module in scala that I would like to add a python
>> port for. I want to be able to allow users to create a pyspark RDD and send
>> it to my system. I've been looking into the pyspark source code as well as
>> py4J and was wondering if there has been anything like this implemented
>> before.
>>
>> Thank you
>>
>
>
>
> --
> Cell : 425-233-8271
> Twitter: https://twitter.com/holdenkarau
>


Re: Creating a python port for a Scala Spark Projeect

2016-06-22 Thread Holden Karau
PySpark RDDs are (on the Java side) are essentially RDD of pickled objects
and mostly (but not entirely) opaque to the JVM. It is possible (by using
some internals) to pass a PySpark DataFrame to a Scala library (you may or
may not find the talk I gave at Spark Summit useful
https://www.youtube.com/watch?v=V6DkTVvy9vk as well as some of the Python
examples in
https://github.com/high-performance-spark/high-performance-spark-examples
). Good luck! :)

On Wed, Jun 22, 2016 at 7:07 PM, Daniel Imberman 
wrote:

> Hi All,
>
> I've developed a spark module in scala that I would like to add a python
> port for. I want to be able to allow users to create a pyspark RDD and send
> it to my system. I've been looking into the pyspark source code as well as
> py4J and was wondering if there has been anything like this implemented
> before.
>
> Thank you
>



-- 
Cell : 425-233-8271
Twitter: https://twitter.com/holdenkarau


?????? spark-1.6.1-bin-without-hadoop can not use spark-sql

2016-06-22 Thread ??????
Thanks.


In addition??I want to know, if I can use  spark-1.6.1-bin-hadoop2.6.tgz which 
is a pre-built package on hadoop 2.7.2??






--  --
??: "Ted Yu";;
: 2016??6??22??(??) 11:51
??: "??"<251922...@qq.com>; 
: "user"; 
: Re: spark-1.6.1-bin-without-hadoop can not use spark-sql



build/mvn clean -Phive -Phive-thriftserver -Pyarn -Phadoop-2.6 -Psparkr 
-Dhadoop.version=2.7.2 package


On Wed, Jun 22, 2016 at 8:00 AM, 251922566 <251922...@qq.com> wrote:
ok,i will rebuild myself.  if i want to use spark with hadoop 2.7.2, when i 
build spark, i should put what on param --hadoop, 2.7.2 or others?



  
??Re: spark-1.6.1-bin-without-hadoop can not use spark-sql
Ted Yu 
?? <251922...@qq.com>
??user 


I wonder if the tar ball was built with:
-Phive -Phive-thriftserver




Maybe rebuild by yourself with the above ?


FYI


On Wed, Jun 22, 2016 at 4:38 AM, ?? <251922...@qq.com> wrote:
Hi all.
I download spark-1.6.1-bin-without-hadoop.tgz from website.
And I configured "SPARK_DIST_CLASSPATH" in spark-env.sh.
Now spark-shell run well. But spark-sql can not run.
My hadoop version is 2.7.2.
This is error infos:


bin/spark-sql 
java.lang.ClassNotFoundException: 
org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:278)
at org.apache.spark.util.Utils$.classForName(Utils.scala:174)
at 
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:689)
at 
org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Failed to load main class 
org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.
You need to build Spark with -Phive and -Phive-thriftserver.



Do I need configure something else in spark-env.sh or spark-default.conf?
Suggestions are appreciated ,thanks.

Creating a python port for a Scala Spark Projeect

2016-06-22 Thread Daniel Imberman
Hi All,

I've developed a spark module in scala that I would like to add a python
port for. I want to be able to allow users to create a pyspark RDD and send
it to my system. I've been looking into the pyspark source code as well as
py4J and was wondering if there has been anything like this implemented
before.

Thank you


Re: OOM on the driver after increasing partitions

2016-06-22 Thread Raghava Mutharaju
Thank you. Sure, if I find something I will post it.

Regards,
Raghava.


On Wed, Jun 22, 2016 at 7:43 PM, Nirav Patel  wrote:

> I believe it would be task, partitions, task status etc information. I do
> not know exact of those things but I had OOM on driver with 512MB and
> increasing it did help. Someone else might be able to answer about exact
> memory usage of driver better.
>
> You also seem to use broadcast means sending something from dirver jvm.
> You can try taking memory dump when your driver memory is about full or set
> jvm args to take it automatically on OutOfMemory error. Analyze it and
> share your finding :)
>
>
>
> On Wed, Jun 22, 2016 at 4:33 PM, Raghava Mutharaju <
> m.vijayaragh...@gmail.com> wrote:
>
>> Ok. Would be able to shed more light on what exact meta data it manages
>> and what is the relationship with more number of partitions/nodes?
>>
>> There is one executor running on each node -- so there are 64 executors
>> in total. Each executor, including the driver are give 12GB and this is the
>> maximum available limit. So the other options are
>>
>> 1) Separate the driver from master, i.e., run them on two separate nodes
>> 2) Increase the RAM capacity on the driver/master node.
>>
>> Regards,
>> Raghava.
>>
>>
>> On Wed, Jun 22, 2016 at 7:05 PM, Nirav Patel 
>> wrote:
>>
>>> Yes driver keeps fair amount of meta data to manage scheduling across
>>> all your executors. I assume with 64 nodes you have more executors as well.
>>> Simple way to test is to increase driver memory.
>>>
>>> On Wed, Jun 22, 2016 at 10:10 AM, Raghava Mutharaju <
>>> m.vijayaragh...@gmail.com> wrote:
>>>
 It is an iterative algorithm which uses map, mapPartitions, join,
 union, filter, broadcast and count. The goal is to compute a set of tuples
 and in each iteration few tuples are added to it. Outline is given below

 1) Start with initial set of tuples, T
 2) In each iteration compute deltaT, and add them to T, i.e., T = T +
 deltaT
 3) Stop when current T size (count) is same as previous T size, i.e.,
 deltaT is 0.

 Do you think something happens on the driver due to the application
 logic and when the partitions are increased?

 Regards,
 Raghava.


 On Wed, Jun 22, 2016 at 12:33 PM, Sonal Goyal 
 wrote:

> What does your application do?
>
> Best Regards,
> Sonal
> Founder, Nube Technologies 
> Reifier at Strata Hadoop World
> 
> Reifier at Spark Summit 2015
> 
>
> 
>
>
>
> On Wed, Jun 22, 2016 at 9:57 PM, Raghava Mutharaju <
> m.vijayaragh...@gmail.com> wrote:
>
>> Hello All,
>>
>> We have a Spark cluster where driver and master are running on the
>> same node. We are using Spark Standalone cluster manager. If the number 
>> of
>> nodes (and the partitions) are increased, the same dataset that used to 
>> run
>> to completion on lesser number of nodes is now giving an out of memory on
>> the driver.
>>
>> For example, a dataset that runs on 32 nodes with number of
>> partitions set to 256 completes whereas the same dataset when run on 64
>> nodes with number of partitions as 512 gives an OOM on the driver side.
>>
>> From what I read in the Spark documentation and other articles,
>> following are the responsibilities of the driver/master.
>>
>> 1) create spark context
>> 2) build DAG of operations
>> 3) schedule tasks
>>
>> I am guessing that 1) and 2) should not change w.r.t number of
>> nodes/partitions. So is it that since the driver has to keep track of lot
>> more tasks, that it gives an OOM?
>>
>> What could be the possible reasons behind the driver-side OOM when
>> the number of partitions are increased?
>>
>> Regards,
>> Raghava.
>>
>
>


 --
 Regards,
 Raghava
 http://raghavam.github.io

>>>
>>>
>>>
>>>
>>> [image: What's New with Xactly] 
>>>
>>>   [image: LinkedIn]
>>>   [image: Twitter]
>>>   [image: Facebook]
>>>   [image: YouTube]
>>> 
>>
>>
>>
>>
>> --
>> Regards,
>> Raghava
>> http://raghavam.github.io
>>
>
>
>
>
> [image: What's New with Xactly] 
>
>   [image: LinkedIn]
>   [image: Twitter]
>   [image: Facebook]
> 

Re: Spark ml and PMML export

2016-06-22 Thread jayantshekhar
I have the same question on Spark ML and PMML export as Philippe.

Is there a way to export Spark ML generated models to PMML?

Jayant



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-ml-and-PMML-export-tp26773p27213.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: GraphX performance and settings

2016-06-22 Thread Maja Kabiljo
Thank you for the reply Deepak.

I know with more executors / memory per executor it will work, we actually have 
a bunch of experiments we ran with various setups. I'm just trying to confirm 
that limits we are hitting are right, or there are some other configuration 
parameters we didn't try yet which would move the limits further. Since without 
any tuning limits for what we can run were much worse off.

Errors would be various executors lost: after heartbeat timeout of 10 minutes, 
out of memory errors or job just not making any progress (not completing any 
tasks) for many hours after which we'd kill them.

Maja

From: Deepak Goel >
Date: Wednesday, June 15, 2016 at 7:13 PM
To: Maja Kabiljo >
Cc: "user @spark" >
Subject: Re: GraphX performance and settings


I am not an expert but some thoughts inline

On Jun 16, 2016 6:31 AM, "Maja Kabiljo" 
> wrote:
>
> Hi,
>
> We are running some experiments with GraphX in order to compare it with other 
> systems. There are multiple settings which significantly affect performance, 
> and we experimented a lot in order to tune them well. I'll share here what 
> are the best we found so far and which results we got with them, and would 
> really appreciate if anyone who used GraphX before has any advice on what 
> else can make it even better, or confirm that these results are as good as it 
> gets.
>
> Algorithms we used are pagerank and connected components. We used Twitter and 
> UK graphs from the GraphX paper 
> (https://amplab.cs.berkeley.edu/wp-content/uploads/2014/09/graphx.pdf),
>  and also generated graphs with properties similar to Facebook social graph 
> with various number of edges. Apart from performance we tried to see what is 
> the minimum amount of resources it requires in order to handle graph of some 
> size.
>
> We ran experiments using Spark 1.6.1, on machines which have 20 cores with 
> 2-way SMT, always fixing number of executors (min=max=initial), giving 40GB 
> or 80GB per executor, and making sure we run only a single executor per 
> machine.

***Deepak***
I guess you have 16 machines in your test. Is that right?
**Deepak***

Additionally we used:
> spark.shuffle.manager=hash, spark.shuffle.service.enabled=false
> Parallel GC
> PartitionStrategy.EdgePartition2D
> 8*numberOfExecutors partitions
> Here are some data points which we got:
> Running on Facebook-like graph with 2 billion edges, using 4 executors with 
> 80GB each it took 451 seconds to do 20 iterations of pagerank and 236 seconds 
> to find connected components. It failed when we tried to use 2 executors, or 
> 4 executors with 40GB each.
> For graph with 10 billion edges we needed 16 executors with 80GB each (it 
> failed with 8), 1041 seconds for 20 iterations of pagerank and 716 seconds 
> for connected component

**Deepak*
The executors are not scaling linearly. You should need max of 10 executors. 
Also what is the error it is showing for 8 executors?
*Deepak**

> Twitter-2010 graph (1.5 billion edges), 8 executors, 40GB each, pagerank 
> 473s, connected components 264s. With 4 executors 80GB each it worked but was 
> struggling (pr 2475s, cc 4499s), with 8 executors 80GB pr 362s, cc 255s.

*Deepak*
For 4 executors can you try with 160GB. Also if you could spell out the system 
statistics during the test it would be great. My guess is with 4 connectors a 
lot of spilling is happening
*Deepak***

> One more thing, we were not able to reproduce what's mentioned in the paper 
> about fault tolerance (section 5.2). If we kill an executor during first few 
> iterations it recovers successfully, but if killed in later iterations 
> reconstruction of each iteration starts taking exponentially longer and 
> doesn't finish after letting it run for a few hours. Are there some 
> additional parameters which we need to set in order for this to work?
>
> Any feedback would be highly appreciated!
>
> Thank you,
> Maja


Re: OOM on the driver after increasing partitions

2016-06-22 Thread Nirav Patel
I believe it would be task, partitions, task status etc information. I do
not know exact of those things but I had OOM on driver with 512MB and
increasing it did help. Someone else might be able to answer about exact
memory usage of driver better.

You also seem to use broadcast means sending something from dirver jvm. You
can try taking memory dump when your driver memory is about full or set jvm
args to take it automatically on OutOfMemory error. Analyze it and share
your finding :)



On Wed, Jun 22, 2016 at 4:33 PM, Raghava Mutharaju <
m.vijayaragh...@gmail.com> wrote:

> Ok. Would be able to shed more light on what exact meta data it manages
> and what is the relationship with more number of partitions/nodes?
>
> There is one executor running on each node -- so there are 64 executors in
> total. Each executor, including the driver are give 12GB and this is the
> maximum available limit. So the other options are
>
> 1) Separate the driver from master, i.e., run them on two separate nodes
> 2) Increase the RAM capacity on the driver/master node.
>
> Regards,
> Raghava.
>
>
> On Wed, Jun 22, 2016 at 7:05 PM, Nirav Patel 
> wrote:
>
>> Yes driver keeps fair amount of meta data to manage scheduling across all
>> your executors. I assume with 64 nodes you have more executors as well.
>> Simple way to test is to increase driver memory.
>>
>> On Wed, Jun 22, 2016 at 10:10 AM, Raghava Mutharaju <
>> m.vijayaragh...@gmail.com> wrote:
>>
>>> It is an iterative algorithm which uses map, mapPartitions, join, union,
>>> filter, broadcast and count. The goal is to compute a set of tuples and in
>>> each iteration few tuples are added to it. Outline is given below
>>>
>>> 1) Start with initial set of tuples, T
>>> 2) In each iteration compute deltaT, and add them to T, i.e., T = T +
>>> deltaT
>>> 3) Stop when current T size (count) is same as previous T size, i.e.,
>>> deltaT is 0.
>>>
>>> Do you think something happens on the driver due to the application
>>> logic and when the partitions are increased?
>>>
>>> Regards,
>>> Raghava.
>>>
>>>
>>> On Wed, Jun 22, 2016 at 12:33 PM, Sonal Goyal 
>>> wrote:
>>>
 What does your application do?

 Best Regards,
 Sonal
 Founder, Nube Technologies 
 Reifier at Strata Hadoop World
 
 Reifier at Spark Summit 2015
 

 



 On Wed, Jun 22, 2016 at 9:57 PM, Raghava Mutharaju <
 m.vijayaragh...@gmail.com> wrote:

> Hello All,
>
> We have a Spark cluster where driver and master are running on the
> same node. We are using Spark Standalone cluster manager. If the number of
> nodes (and the partitions) are increased, the same dataset that used to 
> run
> to completion on lesser number of nodes is now giving an out of memory on
> the driver.
>
> For example, a dataset that runs on 32 nodes with number of partitions
> set to 256 completes whereas the same dataset when run on 64 nodes with
> number of partitions as 512 gives an OOM on the driver side.
>
> From what I read in the Spark documentation and other articles,
> following are the responsibilities of the driver/master.
>
> 1) create spark context
> 2) build DAG of operations
> 3) schedule tasks
>
> I am guessing that 1) and 2) should not change w.r.t number of
> nodes/partitions. So is it that since the driver has to keep track of lot
> more tasks, that it gives an OOM?
>
> What could be the possible reasons behind the driver-side OOM when the
> number of partitions are increased?
>
> Regards,
> Raghava.
>


>>>
>>>
>>> --
>>> Regards,
>>> Raghava
>>> http://raghavam.github.io
>>>
>>
>>
>>
>>
>> [image: What's New with Xactly] 
>>
>>   [image: LinkedIn]
>>   [image: Twitter]
>>   [image: Facebook]
>>   [image: YouTube]
>> 
>
>
>
>
> --
> Regards,
> Raghava
> http://raghavam.github.io
>

-- 


[image: What's New with Xactly] 

  [image: LinkedIn] 
  [image: Twitter] 
  [image: Facebook] 
  [image: YouTube] 



Re: OOM on the driver after increasing partitions

2016-06-22 Thread Raghava Mutharaju
Ok. Would be able to shed more light on what exact meta data it manages and
what is the relationship with more number of partitions/nodes?

There is one executor running on each node -- so there are 64 executors in
total. Each executor, including the driver are give 12GB and this is the
maximum available limit. So the other options are

1) Separate the driver from master, i.e., run them on two separate nodes
2) Increase the RAM capacity on the driver/master node.

Regards,
Raghava.


On Wed, Jun 22, 2016 at 7:05 PM, Nirav Patel  wrote:

> Yes driver keeps fair amount of meta data to manage scheduling across all
> your executors. I assume with 64 nodes you have more executors as well.
> Simple way to test is to increase driver memory.
>
> On Wed, Jun 22, 2016 at 10:10 AM, Raghava Mutharaju <
> m.vijayaragh...@gmail.com> wrote:
>
>> It is an iterative algorithm which uses map, mapPartitions, join, union,
>> filter, broadcast and count. The goal is to compute a set of tuples and in
>> each iteration few tuples are added to it. Outline is given below
>>
>> 1) Start with initial set of tuples, T
>> 2) In each iteration compute deltaT, and add them to T, i.e., T = T +
>> deltaT
>> 3) Stop when current T size (count) is same as previous T size, i.e.,
>> deltaT is 0.
>>
>> Do you think something happens on the driver due to the application logic
>> and when the partitions are increased?
>>
>> Regards,
>> Raghava.
>>
>>
>> On Wed, Jun 22, 2016 at 12:33 PM, Sonal Goyal 
>> wrote:
>>
>>> What does your application do?
>>>
>>> Best Regards,
>>> Sonal
>>> Founder, Nube Technologies 
>>> Reifier at Strata Hadoop World
>>> 
>>> Reifier at Spark Summit 2015
>>> 
>>>
>>> 
>>>
>>>
>>>
>>> On Wed, Jun 22, 2016 at 9:57 PM, Raghava Mutharaju <
>>> m.vijayaragh...@gmail.com> wrote:
>>>
 Hello All,

 We have a Spark cluster where driver and master are running on the same
 node. We are using Spark Standalone cluster manager. If the number of nodes
 (and the partitions) are increased, the same dataset that used to run to
 completion on lesser number of nodes is now giving an out of memory on the
 driver.

 For example, a dataset that runs on 32 nodes with number of partitions
 set to 256 completes whereas the same dataset when run on 64 nodes with
 number of partitions as 512 gives an OOM on the driver side.

 From what I read in the Spark documentation and other articles,
 following are the responsibilities of the driver/master.

 1) create spark context
 2) build DAG of operations
 3) schedule tasks

 I am guessing that 1) and 2) should not change w.r.t number of
 nodes/partitions. So is it that since the driver has to keep track of lot
 more tasks, that it gives an OOM?

 What could be the possible reasons behind the driver-side OOM when the
 number of partitions are increased?

 Regards,
 Raghava.

>>>
>>>
>>
>>
>> --
>> Regards,
>> Raghava
>> http://raghavam.github.io
>>
>
>
>
>
> [image: What's New with Xactly] 
>
>   [image: LinkedIn]
>   [image: Twitter]
>   [image: Facebook]
>   [image: YouTube]
> 




-- 
Regards,
Raghava
http://raghavam.github.io


Re: Shuffle service fails to register driver - Spark - Mesos

2016-06-22 Thread Feller, Eugen
Make sure you are running the MesosShuffleService and not the standard shuffle 
service:

 *   org.apache.spark.deploy.mesos.MesosExternalShuffleService v.s. 
org.apache.spark.deploy.ExternalShuffleService
 *   start-mesos-shuffle-service.sh v.s. start-shuffle-service.sh

Thanks to Timothy Chen for the hint.

- Eugen Feller


Re: Explode row with start and end dates into row for each date

2016-06-22 Thread Saurabh Sardeshpande
I don't think there would be any issues since MLlib is part of Spark as
against being an external package. Most of the problems I've had to deal
were because of the existence of both versions of Python on a system, and
not Python 3 itself.

On Wed, Jun 22, 2016 at 3:51 PM, John Aherne 
wrote:

> Thanks Saurabh!
>
> That explode function looks like it is exactly what I need.
>
> We will be using MLlib quite a lot - Do I have to worry about python
> versions for that?
>
> John
>
> On Wed, Jun 22, 2016 at 4:34 PM, Saurabh Sardeshpande <
> saurabh...@gmail.com> wrote:
>
>> Hi John,
>>
>> If you can do it in Hive, you should be able to do it in Spark. Just make
>> sure you import HiveContext instead of SQLContext.
>>
>> If your intent is to explore rather than get stuff done, I've not aware
>> of any RDD operations that do this for you, but there is a DataFrame
>> operation called 'explode' which does this -
>> https://spark.apache.org/docs/1.6.1/api/python/pyspark.sql.html#pyspark.sql.functions.explode.
>> You'll just have to generate the array of dates using something like this -
>> http://stackoverflow.com/questions/7274267/print-all-day-dates-between-two-dates
>> .
>>
>> It's generally recommended to use Python 3 if you're starting a new
>> project and don't have old dependencies. But remember that there is still
>> quite a lot of stuff that is not yet ported to Python 3.
>>
>> Regards,
>> Saurabh
>>
>> On Wed, Jun 22, 2016 at 3:20 PM, John Aherne 
>> wrote:
>>
>>> Hi Everyone,
>>>
>>> I am pretty new to Spark (and the mailing list), so forgive me if the
>>> answer is obvious.
>>>
>>> I have a dataset, and each row contains a start date and end date.
>>>
>>> I would like to explode each row so that each day between the start and
>>> end dates becomes its own row.
>>> e.g.
>>> row1  2015-01-01  2015-01-03
>>> becomes
>>> row1   2015-01-01
>>> row1   2015-01-02
>>> row1   2015-01-03
>>>
>>> So, my questions are:
>>> Is Spark a good place to do that?
>>> I can do it in Hive, but it's a bit messy, and this seems like a good
>>> problem to use for learning Spark (and Python).
>>>
>>> If so, any pointers on what methods I should use? Particularly how to
>>> split one row into multiples.
>>>
>>> Lastly, I am a bit hesitant to ask but is there a recommendation on
>>> which version of python to use? Not interested in which is better, just
>>> want to know if they are both supported equally.
>>>
>>> I am using Spark 1.6.1 (Hortonworks distro).
>>>
>>> Thanks!
>>> John
>>>
>>> --
>>>
>>> John Aherne
>>> Big Data and SQL Developer
>>>
>>> [image: JustEnough Logo]
>>>
>>> Cell:
>>> Email:
>>> Skype:
>>> Web:
>>>
>>> +1 (303) 809-9718
>>> john.ahe...@justenough.com
>>> john.aherne.je
>>> www.justenough.com
>>>
>>>
>>> Confidentiality Note: The information contained in this email and 
>>> document(s) attached are for the exclusive use of the addressee and may 
>>> contain confidential, privileged and non-disclosable information. If the 
>>> recipient of this email is not the addressee, such recipient is strictly 
>>> prohibited from reading, photocopying, distribution or otherwise using this 
>>> email or its contents in any way.
>>>
>>>
>>
>
>
> --
>
> John Aherne
> Big Data and SQL Developer
>
> [image: JustEnough Logo]
>
> Cell:
> Email:
> Skype:
> Web:
>
> +1 (303) 809-9718
> john.ahe...@justenough.com
> john.aherne.je
> www.justenough.com
>
>
> Confidentiality Note: The information contained in this email and document(s) 
> attached are for the exclusive use of the addressee and may contain 
> confidential, privileged and non-disclosable information. If the recipient of 
> this email is not the addressee, such recipient is strictly prohibited from 
> reading, photocopying, distribution or otherwise using this email or its 
> contents in any way.
>
>


Re: OOM on the driver after increasing partitions

2016-06-22 Thread Nirav Patel
Yes driver keeps fair amount of meta data to manage scheduling across all
your executors. I assume with 64 nodes you have more executors as well.
Simple way to test is to increase driver memory.

On Wed, Jun 22, 2016 at 10:10 AM, Raghava Mutharaju <
m.vijayaragh...@gmail.com> wrote:

> It is an iterative algorithm which uses map, mapPartitions, join, union,
> filter, broadcast and count. The goal is to compute a set of tuples and in
> each iteration few tuples are added to it. Outline is given below
>
> 1) Start with initial set of tuples, T
> 2) In each iteration compute deltaT, and add them to T, i.e., T = T +
> deltaT
> 3) Stop when current T size (count) is same as previous T size, i.e.,
> deltaT is 0.
>
> Do you think something happens on the driver due to the application logic
> and when the partitions are increased?
>
> Regards,
> Raghava.
>
>
> On Wed, Jun 22, 2016 at 12:33 PM, Sonal Goyal 
> wrote:
>
>> What does your application do?
>>
>> Best Regards,
>> Sonal
>> Founder, Nube Technologies 
>> Reifier at Strata Hadoop World
>> 
>> Reifier at Spark Summit 2015
>> 
>>
>> 
>>
>>
>>
>> On Wed, Jun 22, 2016 at 9:57 PM, Raghava Mutharaju <
>> m.vijayaragh...@gmail.com> wrote:
>>
>>> Hello All,
>>>
>>> We have a Spark cluster where driver and master are running on the same
>>> node. We are using Spark Standalone cluster manager. If the number of nodes
>>> (and the partitions) are increased, the same dataset that used to run to
>>> completion on lesser number of nodes is now giving an out of memory on the
>>> driver.
>>>
>>> For example, a dataset that runs on 32 nodes with number of partitions
>>> set to 256 completes whereas the same dataset when run on 64 nodes with
>>> number of partitions as 512 gives an OOM on the driver side.
>>>
>>> From what I read in the Spark documentation and other articles,
>>> following are the responsibilities of the driver/master.
>>>
>>> 1) create spark context
>>> 2) build DAG of operations
>>> 3) schedule tasks
>>>
>>> I am guessing that 1) and 2) should not change w.r.t number of
>>> nodes/partitions. So is it that since the driver has to keep track of lot
>>> more tasks, that it gives an OOM?
>>>
>>> What could be the possible reasons behind the driver-side OOM when the
>>> number of partitions are increased?
>>>
>>> Regards,
>>> Raghava.
>>>
>>
>>
>
>
> --
> Regards,
> Raghava
> http://raghavam.github.io
>

-- 


[image: What's New with Xactly] 

  [image: LinkedIn] 
  [image: Twitter] 
  [image: Facebook] 
  [image: YouTube] 



Re: Explode row with start and end dates into row for each date

2016-06-22 Thread John Aherne
Thanks Saurabh!

That explode function looks like it is exactly what I need.

We will be using MLlib quite a lot - Do I have to worry about python
versions for that?

John

On Wed, Jun 22, 2016 at 4:34 PM, Saurabh Sardeshpande 
wrote:

> Hi John,
>
> If you can do it in Hive, you should be able to do it in Spark. Just make
> sure you import HiveContext instead of SQLContext.
>
> If your intent is to explore rather than get stuff done, I've not aware of
> any RDD operations that do this for you, but there is a DataFrame operation
> called 'explode' which does this -
> https://spark.apache.org/docs/1.6.1/api/python/pyspark.sql.html#pyspark.sql.functions.explode.
> You'll just have to generate the array of dates using something like this -
> http://stackoverflow.com/questions/7274267/print-all-day-dates-between-two-dates
> .
>
> It's generally recommended to use Python 3 if you're starting a new
> project and don't have old dependencies. But remember that there is still
> quite a lot of stuff that is not yet ported to Python 3.
>
> Regards,
> Saurabh
>
> On Wed, Jun 22, 2016 at 3:20 PM, John Aherne 
> wrote:
>
>> Hi Everyone,
>>
>> I am pretty new to Spark (and the mailing list), so forgive me if the
>> answer is obvious.
>>
>> I have a dataset, and each row contains a start date and end date.
>>
>> I would like to explode each row so that each day between the start and
>> end dates becomes its own row.
>> e.g.
>> row1  2015-01-01  2015-01-03
>> becomes
>> row1   2015-01-01
>> row1   2015-01-02
>> row1   2015-01-03
>>
>> So, my questions are:
>> Is Spark a good place to do that?
>> I can do it in Hive, but it's a bit messy, and this seems like a good
>> problem to use for learning Spark (and Python).
>>
>> If so, any pointers on what methods I should use? Particularly how to
>> split one row into multiples.
>>
>> Lastly, I am a bit hesitant to ask but is there a recommendation on which
>> version of python to use? Not interested in which is better, just want to
>> know if they are both supported equally.
>>
>> I am using Spark 1.6.1 (Hortonworks distro).
>>
>> Thanks!
>> John
>>
>> --
>>
>> John Aherne
>> Big Data and SQL Developer
>>
>> [image: JustEnough Logo]
>>
>> Cell:
>> Email:
>> Skype:
>> Web:
>>
>> +1 (303) 809-9718
>> john.ahe...@justenough.com
>> john.aherne.je
>> www.justenough.com
>>
>>
>> Confidentiality Note: The information contained in this email and 
>> document(s) attached are for the exclusive use of the addressee and may 
>> contain confidential, privileged and non-disclosable information. If the 
>> recipient of this email is not the addressee, such recipient is strictly 
>> prohibited from reading, photocopying, distribution or otherwise using this 
>> email or its contents in any way.
>>
>>
>


-- 

John Aherne
Big Data and SQL Developer

[image: JustEnough Logo]

Cell:
Email:
Skype:
Web:

+1 (303) 809-9718
john.ahe...@justenough.com
john.aherne.je
www.justenough.com


Confidentiality Note: The information contained in this email and
document(s) attached are for the exclusive use of the addressee and
may contain confidential, privileged and non-disclosable information.
If the recipient of this email is not the addressee, such recipient is
strictly prohibited from reading, photocopying, distribution or
otherwise using this email or its contents in any way.


Re: Spark 1.5.2 - Different results from reduceByKey over multiple iterations

2016-06-22 Thread Nirav Patel
PS. In my reduceByKey operation I have two mutable object. What I do is
merge mutable2 into mutable1 and return mutable1. I read that it works for
aggregateByKey so thought it will work for reduceByKey as well. I might be
wrong here. Can someone verify if this will work or be un predictable?

On Wed, Jun 22, 2016 at 11:52 AM, Nirav Patel  wrote:

> Hi,
>
> I do not see any indication of errors or executor getting killed in spark
> UI - jobs, stages, event timelines. No task failures. I also don't see any
> errors in executor logs.
>
> Thanks
>
> On Wed, Jun 22, 2016 at 2:32 AM, Ted Yu  wrote:
>
>> For the run which returned incorrect result, did you observe any error
>> (on workers) ?
>>
>> Cheers
>>
>> On Tue, Jun 21, 2016 at 10:42 PM, Nirav Patel 
>> wrote:
>>
>>> I have an RDD[String, MyObj] which is a result of Join + Map operation.
>>> It has no partitioner info. I run reduceByKey without passing any
>>> Partitioner or partition counts.  I observed that output aggregation result
>>> for given key is incorrect sometime. like 1 out of 5 times. It looks like
>>> reduce operation is joining values from two different keys. There is no
>>> configuration change between multiple runs. I am scratching my head over
>>> this. I verified results by printing out RDD before and after reduce
>>> operation; collecting subset at driver.
>>>
>>> Besides shuffle and storage memory fraction I use following options:
>>>
>>> sparkConf.set("spark.driver.userClassPathFirst","true")
>>> sparkConf.set("spark.unsafe.offHeap","true")
>>> sparkConf.set("spark.reducer.maxSizeInFlight","128m")
>>> sparkConf.set("spark.serializer",
>>> "org.apache.spark.serializer.KryoSerializer")
>>>
>>>
>>>
>>> [image: What's New with Xactly] 
>>>
>>>   [image: LinkedIn]
>>>   [image: Twitter]
>>>   [image: Facebook]
>>>   [image: YouTube]
>>> 
>>
>>
>>
>

-- 


[image: What's New with Xactly] 

  [image: LinkedIn] 
  [image: Twitter] 
  [image: Facebook] 
  [image: YouTube] 



Re: Silly question about Yarn client vs Yarn cluster modes...

2016-06-22 Thread Michael Segel
The only documentation on this… in terms of direction … (that I could find)

If your client is not close to the cluster (e.g. your PC) then you definitely 
want to go cluster to improve performance.
If your client is close to the cluster (e.g. an edge node) then you could go 
either client or cluster.  Note that by going client, more resources are going 
to be used on the edge node.

HTH

-Mike

> On Jun 22, 2016, at 1:51 PM, Marcelo Vanzin  wrote:
> 
> On Wed, Jun 22, 2016 at 1:32 PM, Mich Talebzadeh
>  wrote:
>> Does it also depend on the number of Spark nodes involved in choosing which
>> way to go?
> 
> Not really.
> 
> -- 
> Marcelo
> 


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Explode row with start and end dates into row for each date

2016-06-22 Thread Saurabh Sardeshpande
Hi John,

If you can do it in Hive, you should be able to do it in Spark. Just make
sure you import HiveContext instead of SQLContext.

If your intent is to explore rather than get stuff done, I've not aware of
any RDD operations that do this for you, but there is a DataFrame operation
called 'explode' which does this -
https://spark.apache.org/docs/1.6.1/api/python/pyspark.sql.html#pyspark.sql.functions.explode.
You'll just have to generate the array of dates using something like this -
http://stackoverflow.com/questions/7274267/print-all-day-dates-between-two-dates
.

It's generally recommended to use Python 3 if you're starting a new project
and don't have old dependencies. But remember that there is still quite a
lot of stuff that is not yet ported to Python 3.

Regards,
Saurabh

On Wed, Jun 22, 2016 at 3:20 PM, John Aherne 
wrote:

> Hi Everyone,
>
> I am pretty new to Spark (and the mailing list), so forgive me if the
> answer is obvious.
>
> I have a dataset, and each row contains a start date and end date.
>
> I would like to explode each row so that each day between the start and
> end dates becomes its own row.
> e.g.
> row1  2015-01-01  2015-01-03
> becomes
> row1   2015-01-01
> row1   2015-01-02
> row1   2015-01-03
>
> So, my questions are:
> Is Spark a good place to do that?
> I can do it in Hive, but it's a bit messy, and this seems like a good
> problem to use for learning Spark (and Python).
>
> If so, any pointers on what methods I should use? Particularly how to
> split one row into multiples.
>
> Lastly, I am a bit hesitant to ask but is there a recommendation on which
> version of python to use? Not interested in which is better, just want to
> know if they are both supported equally.
>
> I am using Spark 1.6.1 (Hortonworks distro).
>
> Thanks!
> John
>
> --
>
> John Aherne
> Big Data and SQL Developer
>
> [image: JustEnough Logo]
>
> Cell:
> Email:
> Skype:
> Web:
>
> +1 (303) 809-9718
> john.ahe...@justenough.com
> john.aherne.je
> www.justenough.com
>
>
> Confidentiality Note: The information contained in this email and document(s) 
> attached are for the exclusive use of the addressee and may contain 
> confidential, privileged and non-disclosable information. If the recipient of 
> this email is not the addressee, such recipient is strictly prohibited from 
> reading, photocopying, distribution or otherwise using this email or its 
> contents in any way.
>
>


Explode row with start and end dates into row for each date

2016-06-22 Thread John Aherne
Hi Everyone,

I am pretty new to Spark (and the mailing list), so forgive me if the
answer is obvious.

I have a dataset, and each row contains a start date and end date.

I would like to explode each row so that each day between the start and end
dates becomes its own row.
e.g.
row1  2015-01-01  2015-01-03
becomes
row1   2015-01-01
row1   2015-01-02
row1   2015-01-03

So, my questions are:
Is Spark a good place to do that?
I can do it in Hive, but it's a bit messy, and this seems like a good
problem to use for learning Spark (and Python).

If so, any pointers on what methods I should use? Particularly how to split
one row into multiples.

Lastly, I am a bit hesitant to ask but is there a recommendation on which
version of python to use? Not interested in which is better, just want to
know if they are both supported equally.

I am using Spark 1.6.1 (Hortonworks distro).

Thanks!
John

-- 

John Aherne
Big Data and SQL Developer

[image: JustEnough Logo]

Cell:
Email:
Skype:
Web:

+1 (303) 809-9718
john.ahe...@justenough.com
john.aherne.je
www.justenough.com


Confidentiality Note: The information contained in this email and
document(s) attached are for the exclusive use of the addressee and
may contain confidential, privileged and non-disclosable information.
If the recipient of this email is not the addressee, such recipient is
strictly prohibited from reading, photocopying, distribution or
otherwise using this email or its contents in any way.


Recovery techniques for Spark Streaming scheduling delay

2016-06-22 Thread C. Josephson
We have a Spark Streaming application that has basically zero scheduling
delay for hours, but then suddenly it jumps up to multiple minutes and
spirals out of control (see screenshot of job manager here:
http://i.stack.imgur.com/kSftN.png)

This is happens after a while even if we double the batch interval.

We are not sure what causes the delay to happen (theories include garbage
collection). The cluster has generally low CPU utilization regardless of
whether we use 3, 5 or 10 slaves.

We are really reluctant to further increase the batch interval, since the
delay is zero for such long periods. Are there any techniques to improve
recovery time from a sudden spike in scheduling delay? We've tried seeing
if it will recover on its own, but it takes hours if it even recovers at all

Thanks,
-cjoseph


Re: Silly question about Yarn client vs Yarn cluster modes...

2016-06-22 Thread Marcelo Vanzin
On Wed, Jun 22, 2016 at 1:32 PM, Mich Talebzadeh
 wrote:
> Does it also depend on the number of Spark nodes involved in choosing which
> way to go?

Not really.

-- 
Marcelo

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Silly question about Yarn client vs Yarn cluster modes...

2016-06-22 Thread Mich Talebzadeh
Thanks Marcelo,

Sounds like cluster mode is more resilient than the client-mode.

Does it also depend on the number of Spark nodes involved in choosing which
way to go?

Cheers


Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com



On 22 June 2016 at 21:27, Marcelo Vanzin  wrote:

> Trying to keep the answer short and simple...
>
> On Wed, Jun 22, 2016 at 1:19 PM, Michael Segel
>  wrote:
> > But this gets to the question… what are the real differences between
> client
> > and cluster modes?
> > What are the pros/cons and use cases where one has advantages over the
> > other?
>
> - client mode requires the process that launched the app remain alive.
> Meaning the host where it lives has to stay alive, and it may not be
> super-friendly to ssh sessions dying, for example, unless you use
> nohup.
>
> - client mode driver logs are printed to stderr by default. yes you
> can change that, but in cluster mode, they're all collected by yarn
> without any user intervention.
>
> - if your edge node (from where the app is launched) isn't really part
> of the cluster (e.g., lives in an outside network with firewalls or
> higher latency), you may run into issues.
>
> - in cluster mode, your driver's cpu / memory usage is accounted for
> in YARN; this matters if your edge node is part of the cluster (and
> could be running yarn containers), since in client mode your driver
> will potentially use a lot of memory / cpu.
>
> - finally, in cluster mode YARN can restart your application without
> user interference. this is useful for things that need to stay up
> (think a long running streaming job, for example).
>
>
> --
> Marcelo
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Silly question about Yarn client vs Yarn cluster modes...

2016-06-22 Thread Marcelo Vanzin
Trying to keep the answer short and simple...

On Wed, Jun 22, 2016 at 1:19 PM, Michael Segel
 wrote:
> But this gets to the question… what are the real differences between client
> and cluster modes?
> What are the pros/cons and use cases where one has advantages over the
> other?

- client mode requires the process that launched the app remain alive.
Meaning the host where it lives has to stay alive, and it may not be
super-friendly to ssh sessions dying, for example, unless you use
nohup.

- client mode driver logs are printed to stderr by default. yes you
can change that, but in cluster mode, they're all collected by yarn
without any user intervention.

- if your edge node (from where the app is launched) isn't really part
of the cluster (e.g., lives in an outside network with firewalls or
higher latency), you may run into issues.

- in cluster mode, your driver's cpu / memory usage is accounted for
in YARN; this matters if your edge node is part of the cluster (and
could be running yarn containers), since in client mode your driver
will potentially use a lot of memory / cpu.

- finally, in cluster mode YARN can restart your application without
user interference. this is useful for things that need to stay up
(think a long running streaming job, for example).


-- 
Marcelo

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Silly question about Yarn client vs Yarn cluster modes...

2016-06-22 Thread Michael Segel
LOL… I hate YARN, but unfortunately I don’t get to make the call on which tools 
we’re going to use, I just get paid to make stuff work on the tools provided. 
;-) 

Testing is somewhat problematic.  You have to really test at some large enough 
fraction of scale. 
Fortunately for this issue (YARN client/cluster) is just in how you launch a 
job so its really just benchmarking the time difference. 

But this gets to the question… what are the real differences between client and 
cluster modes? 

What are the pros/cons and use cases where one has advantages over the other?  
I couldn’t find anything on the web, so I’m here asking these ‘silly’ 
questions… ;-)

The issue of Mesos over YARN is that you now have to look at Myriad to help 
glue stuff together, or so I’m told. 
And you now may have to add an additional vendor support contract unless your 
Hadoop vendor is willing to support Mesos. 

But of course, YMMV and maybe someone else can add something of value than my 
$0.02 worth. ;-) 

-Mike

> On Jun 22, 2016, at 12:04 PM, Mich Talebzadeh  
> wrote:
> 
> This is exactly the sort of topics that distinguish lab work from enterprise 
> practice :)
> 
> The question on YARN client versus YARN cluster mode. I am not sure how much 
> in real life it is going to make an impact if I choose one over the other?
> 
> These days I yell developers that it is perfectly valid to use Spark local 
> mode to their dev/unit testing. at least they know how to look after it with 
> the help of Spark WEB GUI.
> 
> Also anyone has tried using Mesos instead of Spark?. What does it offer above 
> YARN.
> 
> Cheers
> 
> Dr Mich Talebzadeh
>  
> LinkedIn  
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>  
> 
>  
> http://talebzadehmich.wordpress.com 
>  
> 
> On 22 June 2016 at 19:04, Michael Segel  > wrote:
> JDBC reliability problem? 
> 
> Ok… a bit more explanation… 
> 
> Usually when you have to go back to a legacy system, its because the data set 
> is usually metadata and is relatively small.  Its not the sort of data that 
> gets ingested in to a data lake unless you’re also ingesting the metadata and 
> are using HBase/MapRDB , Cassandra or something like that. 
> 
> On top of all of this… when dealing with PII, you have to consider data at 
> rest. This is also why you will see some things that make parts of the design 
> a bit counter intuitive and why you start to think of storage/compute models 
> over traditional cluster design. 
> 
> A single threaded JDBC connection to build a local RDD should be fine and I 
> would really be concerned if that was unreliable.  That would mean tools like 
> Spark or Drill would have serious reliability issues when considering legacy 
> system access. 
> 
> And of course I’m still trying to dig in to YARN’s client vs cluster option. 
> 
> 
> thx
> 
>> On Jun 22, 2016, at 9:36 AM, Mich Talebzadeh > > wrote:
>> 
>> Thanks Mike for clarification.
>> 
>> I think there is another option to get data out of RDBMS through some form 
>> of SELECT ALL COLUMNS TAB SEPARATED OR OTHER and put them in a flat file or 
>> files. scp that file from the RDBMS directory to a private directory on HDFS 
>> system  and push it into HDFS. That will by-pass the JDBC reliability 
>> problem and I guess in this case one is in more control.
>> 
>> I do concur that there are security issues with this. For example that local 
>> file system may have to have encryption etc  that will make it tedious. I 
>> believe Jorn mentioned this somewhere.
>> 
>> HTH
>> 
>> Dr Mich Talebzadeh
>>  
>> LinkedIn  
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>  
>> 
>>  
>> http://talebzadehmich.wordpress.com 
>>  
>> 
>> On 22 June 2016 at 15:59, Michael Segel > > wrote:
>> Hi, 
>> 
>> Just to clear a few things up… 
>> 
>> First I know its hard to describe some problems because they deal with 
>> client confidential information. 
>> (Also some basic ‘dead hooker’ thought problems to work through before 
>> facing them at a client.) 
>> The questions I pose here are very general and deal with some basic design 
>> issues/consideration. 
>> 
>> 
>> W.R.T JDBC / Beeline:
>> There are many use cases where you don’t want to migrate some or all data to 
>> HDFS.  This is why tools like Apache Drill exist. At the same time… there 
>> are different cluster design patterns.  One such pattern is a 
>> storage/compute model where you have multiple clusters acting either as 
>> compute clusters which pull data 

Executors killed in Workers with Error: invalid log directory

2016-06-22 Thread Yiannis Gkoufas
Hi there,

I have been getting a strange error in spark-1.6.1
The job submitted uses only the executor launched on the Master node while
the other workers are idle.
When I check the errors from the web ui to investigate on the killed
executors I see the error:
Error: invalid log directory
/disk/spark-1.6.1-bin-hadoop2.6/work/app-20160622195714-0001/2/

I made sure I gave the correct permissions on the directory, plus the
Worker logs are clean.
Any hints on that?

Thanks a lot!


Re: Silly question about Yarn client vs Yarn cluster modes...

2016-06-22 Thread Mich Talebzadeh
This is exactly the sort of topics that distinguish lab work from
enterprise practice :)

The question on YARN client versus YARN cluster mode. I am not sure how
much in real life it is going to make an impact if I choose one over the
other?

These days I yell developers that it is perfectly valid to use Spark local
mode to their dev/unit testing. at least they know how to look after it
with the help of Spark WEB GUI.

Also anyone has tried using Mesos instead of Spark?. What does it offer
above YARN.

Cheers

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com



On 22 June 2016 at 19:04, Michael Segel  wrote:

> JDBC reliability problem?
>
> Ok… a bit more explanation…
>
> Usually when you have to go back to a legacy system, its because the data
> set is usually metadata and is relatively small.  Its not the sort of data
> that gets ingested in to a data lake unless you’re also ingesting the
> metadata and are using HBase/MapRDB , Cassandra or something like that.
>
> On top of all of this… when dealing with PII, you have to consider data at
> rest. This is also why you will see some things that make parts of the
> design a bit counter intuitive and why you start to think of
> storage/compute models over traditional cluster design.
>
> A single threaded JDBC connection to build a local RDD should be fine and
> I would really be concerned if that was unreliable.  That would mean tools
> like Spark or Drill would have serious reliability issues when considering
> legacy system access.
>
> And of course I’m still trying to dig in to YARN’s client vs cluster
> option.
>
>
> thx
>
> On Jun 22, 2016, at 9:36 AM, Mich Talebzadeh 
> wrote:
>
> Thanks Mike for clarification.
>
> I think there is another option to get data out of RDBMS through some form
> of SELECT ALL COLUMNS TAB SEPARATED OR OTHER and put them in a flat file or
> files. scp that file from the RDBMS directory to a private directory on
> HDFS system  and push it into HDFS. That will by-pass the JDBC reliability
> problem and I guess in this case one is in more control.
>
> I do concur that there are security issues with this. For example that
> local file system may have to have encryption etc  that will make it
> tedious. I believe Jorn mentioned this somewhere.
>
> HTH
>
> Dr Mich Talebzadeh
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 22 June 2016 at 15:59, Michael Segel  wrote:
>
>> Hi,
>>
>> Just to clear a few things up…
>>
>> First I know its hard to describe some problems because they deal with
>> client confidential information.
>> (Also some basic ‘dead hooker’ thought problems to work through before
>> facing them at a client.)
>> The questions I pose here are very general and deal with some basic
>> design issues/consideration.
>>
>>
>> W.R.T JDBC / Beeline:
>>
>> There are many use cases where you don’t want to migrate some or all data
>> to HDFS.  This is why tools like Apache Drill exist. At the same time…
>> there are different cluster design patterns.  One such pattern is a
>> storage/compute model where you have multiple clusters acting either as
>> compute clusters which pull data from storage clusters. An example would be
>> spinning up an EMR cluster and running a M/R job where you read from S3 and
>> output to S3.  Or within your enterprise you have your Data Lake (Data
>> Sewer) and then a compute cluster for analytics.
>>
>> In addition, you have some very nasty design issues to deal with like
>> security. Yes, that’s the very dirty word nobody wants to deal with and in
>> most of these tools, security is an afterthought.
>> So you may not have direct access to the cluster or an edge node. You
>> only have access to a single port on a single machine through the firewall,
>> which is running beeline so you can pull data from your storage cluster.
>>
>> Its very possible that you have to pull data from the cluster thru
>> beeline to store the data within a spark job running on the cluster. (Oh
>> the irony! ;-)
>>
>> Its important to understand that due to design constraints, options like
>> sqoop or running a query directly against Hive may not be possible and
>> these use cases do exist when dealing with PII information.
>>
>> Its also important to realize that you may have to pull data from
>> multiple data sources for some not so obvious but important reasons…
>> So your spark app has to be able to generate an RDD from data in an RDBS
>> (Oracle, DB2, etc …) persist it for local lookups and then pull data from
>> the cluster and then output it either back to the 

Re: Spark 1.5.2 - Different results from reduceByKey over multiple iterations

2016-06-22 Thread Nirav Patel
Hi,

I do not see any indication of errors or executor getting killed in spark
UI - jobs, stages, event timelines. No task failures. I also don't see any
errors in executor logs.

Thanks

On Wed, Jun 22, 2016 at 2:32 AM, Ted Yu  wrote:

> For the run which returned incorrect result, did you observe any error (on
> workers) ?
>
> Cheers
>
> On Tue, Jun 21, 2016 at 10:42 PM, Nirav Patel 
> wrote:
>
>> I have an RDD[String, MyObj] which is a result of Join + Map operation.
>> It has no partitioner info. I run reduceByKey without passing any
>> Partitioner or partition counts.  I observed that output aggregation result
>> for given key is incorrect sometime. like 1 out of 5 times. It looks like
>> reduce operation is joining values from two different keys. There is no
>> configuration change between multiple runs. I am scratching my head over
>> this. I verified results by printing out RDD before and after reduce
>> operation; collecting subset at driver.
>>
>> Besides shuffle and storage memory fraction I use following options:
>>
>> sparkConf.set("spark.driver.userClassPathFirst","true")
>> sparkConf.set("spark.unsafe.offHeap","true")
>> sparkConf.set("spark.reducer.maxSizeInFlight","128m")
>> sparkConf.set("spark.serializer",
>> "org.apache.spark.serializer.KryoSerializer")
>>
>>
>>
>> [image: What's New with Xactly] 
>>
>>   [image: LinkedIn]
>>   [image: Twitter]
>>   [image: Facebook]
>>   [image: YouTube]
>> 
>
>
>

-- 


[image: What's New with Xactly] 

  [image: LinkedIn] 
  [image: Twitter] 
  [image: Facebook] 
  [image: YouTube] 



Re: spark streaming questions

2016-06-22 Thread pandees waran
For my question (2), From my understanding checkpointing ensures the recovery 
from failures.
Sent from my iPhone

> On Jun 22, 2016, at 10:27 AM, pandees waran  wrote:
> 
> In general, if you have multiple steps in a workflow :
> For every batch 
> 1.stream data from s3 
> 2.write it to hbase
> 3.execute a hive step using the data in s3 
> 
> In this case all these 3 steps are part of the workflow. That's the reason I 
> mentioned about workflow orchestration.
> 
> The other question (2) is about how to manage the clusters without any 
> downtime / data loss .(especially when you want k being down the cluster and 
> create a new one for running spark streaming )
> 
> 
> Sent from my iPhone
> 
>> On Jun 22, 2016, at 10:17 AM, Mich Talebzadeh  
>> wrote:
>> 
>> Hi Pandees,
>> 
>> can you kindly explain what you are trying to achieve by incorporating Spark 
>> streaming with workflow orchestration. Is this some form of back-to-back 
>> seamless integration.
>> 
>> I have not used it myself but would be interested in knowing more about your 
>> use case.
>> 
>> Cheers,
>> 
>> 
>> 
>> 
>> Dr Mich Talebzadeh
>>  
>> LinkedIn  
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>  
>> http://talebzadehmich.wordpress.com
>>  
>> 
>>> On 22 June 2016 at 15:54, pandees waran  wrote:
>>> Hi Mich, please let me know if you have any thoughts on the below. 
>>> 
>>> -- Forwarded message --
>>> From: pandees waran 
>>> Date: Wed, Jun 22, 2016 at 7:53 AM
>>> Subject: spark streaming questions
>>> To: user@spark.apache.org
>>> 
>>> 
>>> Hello all,
>>> 
>>> I have few questions regarding spark streaming :
>>> 
>>> * I am wondering anyone uses spark streaming with workflow orchestrators 
>>> such as data pipeline/SWF/any other framework. Is there any advantages 
>>> /drawbacks on using a workflow orchestrator for spark streaming?
>>> 
>>> *How do you guys manage the cluster(bringing down /creating a new cluster ) 
>>> without any data loss in streaming? 
>>> 
>>> I would like to hear your thoughts on this.
>>> 
>>> 
>>> 
>>> 
>>> -- 
>>> Thanks,
>>> Pandeeswaran
>> 


Networking Exceptions in Spark 1.6.1 with Dynamic Allocation and YARN Pre-Emption

2016-06-22 Thread Nick Peterson
Hey all,

We're working on setting up a Spark 1.6.1 cluster on Amazon EC2, and
encountering some problems related to pre-emption.  We have followed all
the instructions for setting up dynamic allocation, including enabling the
external spark shuffle service in the YARN NodeManagers.

When a container is pre-empted, we see lots of RPC calls continuing to be
made to its host/port; obviously, these fail.  Eventually, a combination of
exceptions manages to actually kill our job.  Interestingly, downgrading to
1.4.1 and switching the spark.shuffle.blocktransferservice to nio fixes the
problem completely; but, we need functionality developed in later versions,
so we can't actually go back to Spark 1.4.1.

Is this a network configuration issue, or a resource allocation issue, or
something to that effect?  Or, is there actually a bug here?  Have any of
you set up a cluster for production use that runs Spark 1.6.1, Hadoop
2.7.2, Yarn FairScheduler with preemption, and dynamic allocation?

Some examples of exceptions are below; thank you very much for any pointers
you can provide.

Thanks!
Nick

---

16/06/22 08:13:30 ERROR spark.ContextCleaner: Error cleaning RDD 49
java.io.IOException: Failed to send RPC 5721681506291542850 to
nodexx.xx..ddns.xx.com/xx.xx.xx.xx:42857:
java.nio.channels.ClosedChannelException
at
org.apache.spark.network.client.TransportClient$3.operationComplete(TransportClient.java:239)
at
org.apache.spark.network.client.TransportClient$3.operationComplete(TransportClient.java:226)
at
io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:680)
at
io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:567)
at
io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:424)
at
io.netty.channel.AbstractChannel$AbstractUnsafe.safeSetFailure(AbstractChannel.java:801)
at
io.netty.channel.AbstractChannel$AbstractUnsafe.write(AbstractChannel.java:699)
at
io.netty.channel.DefaultChannelPipeline$HeadContext.write(DefaultChannelPipeline.java:1122)
at
io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:633)
at
io.netty.channel.AbstractChannelHandlerContext.access$1900(AbstractChannelHandlerContext.java:32)
at
io.netty.channel.AbstractChannelHandlerContext$AbstractWriteTask.write(AbstractChannelHandlerContext.java:908)
at
io.netty.channel.AbstractChannelHandlerContext$WriteAndFlushTask.write(AbstractChannelHandlerContext.java:960)
at
io.netty.channel.AbstractChannelHandlerContext$AbstractWriteTask.run(AbstractChannelHandlerContext.java:893)
at
io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357)
at
io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.nio.channels.ClosedChannelException



16/06/19 22:33:14 INFO storage.BlockManager: Removing RDD 122
16/06/19 22:33:14 WARN server.TransportChannelHandler: Exception in
connection from nodexx-xx-xx.xx.ddns.xx.com/xx.xx.xx.xx:56618
java.io.IOException: Connection reset by peer
at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
at sun.nio.ch.IOUtil.read(IOUtil.java:192)
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
at
io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:313)
at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:881)
at
io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:242)
at
io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:119)
at
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
at
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at
io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
at java.lang.Thread.run(Thread.java:745)
16/06/19 22:33:14 ERROR client.TransportResponseHandler: Still have 2
requests outstanding when connection from
nodexx-xx-xx..ddns.xx.com/xx.xx.xx.xx:56618 is closed.


Re: Silly question about Yarn client vs Yarn cluster modes...

2016-06-22 Thread Michael Segel
JDBC reliability problem? 

Ok… a bit more explanation… 

Usually when you have to go back to a legacy system, its because the data set 
is usually metadata and is relatively small.  Its not the sort of data that 
gets ingested in to a data lake unless you’re also ingesting the metadata and 
are using HBase/MapRDB , Cassandra or something like that. 

On top of all of this… when dealing with PII, you have to consider data at 
rest. This is also why you will see some things that make parts of the design a 
bit counter intuitive and why you start to think of storage/compute models over 
traditional cluster design. 

A single threaded JDBC connection to build a local RDD should be fine and I 
would really be concerned if that was unreliable.  That would mean tools like 
Spark or Drill would have serious reliability issues when considering legacy 
system access. 

And of course I’m still trying to dig in to YARN’s client vs cluster option. 


thx

> On Jun 22, 2016, at 9:36 AM, Mich Talebzadeh  
> wrote:
> 
> Thanks Mike for clarification.
> 
> I think there is another option to get data out of RDBMS through some form of 
> SELECT ALL COLUMNS TAB SEPARATED OR OTHER and put them in a flat file or 
> files. scp that file from the RDBMS directory to a private directory on HDFS 
> system  and push it into HDFS. That will by-pass the JDBC reliability problem 
> and I guess in this case one is in more control.
> 
> I do concur that there are security issues with this. For example that local 
> file system may have to have encryption etc  that will make it tedious. I 
> believe Jorn mentioned this somewhere.
> 
> HTH
> 
> Dr Mich Talebzadeh
>  
> LinkedIn  
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>  
> 
>  
> http://talebzadehmich.wordpress.com 
>  
> 
> On 22 June 2016 at 15:59, Michael Segel  > wrote:
> Hi, 
> 
> Just to clear a few things up… 
> 
> First I know its hard to describe some problems because they deal with client 
> confidential information. 
> (Also some basic ‘dead hooker’ thought problems to work through before facing 
> them at a client.) 
> The questions I pose here are very general and deal with some basic design 
> issues/consideration. 
> 
> 
> W.R.T JDBC / Beeline:
> There are many use cases where you don’t want to migrate some or all data to 
> HDFS.  This is why tools like Apache Drill exist. At the same time… there are 
> different cluster design patterns.  One such pattern is a storage/compute 
> model where you have multiple clusters acting either as compute clusters 
> which pull data from storage clusters. An example would be spinning up an EMR 
> cluster and running a M/R job where you read from S3 and output to S3.  Or 
> within your enterprise you have your Data Lake (Data Sewer) and then a 
> compute cluster for analytics. 
> 
> In addition, you have some very nasty design issues to deal with like 
> security. Yes, that’s the very dirty word nobody wants to deal with and in 
> most of these tools, security is an afterthought.  
> So you may not have direct access to the cluster or an edge node. You only 
> have access to a single port on a single machine through the firewall, which 
> is running beeline so you can pull data from your storage cluster. 
> 
> Its very possible that you have to pull data from the cluster thru beeline to 
> store the data within a spark job running on the cluster. (Oh the irony! ;-) 
> 
> Its important to understand that due to design constraints, options like 
> sqoop or running a query directly against Hive may not be possible and these 
> use cases do exist when dealing with PII information.
> 
> Its also important to realize that you may have to pull data from multiple 
> data sources for some not so obvious but important reasons… 
> So your spark app has to be able to generate an RDD from data in an RDBS 
> (Oracle, DB2, etc …) persist it for local lookups and then pull data from the 
> cluster and then output it either back to the cluster, another cluster, or 
> someplace else.  All of these design issues occur when you’re dealing with 
> large enterprises. 
> 
> 
> But I digress… 
> 
> The reason I started this thread was to get a better handle on where things 
> run when we make decisions to run either in client or cluster mode in YARN. 
> Issues like starting/stopping long running apps are an issue in production.  
> Tying up cluster resources while your application remains dormant waiting for 
> the next batch of events to come through. 
> Trying to understand the gotchas of each design consideration so that we can 
> weigh the pros and cons of a decision when designing a solution…. 
> 
> 
> Spark is relatively new and its a bit disruptive because it forces you to 
> think in terms of 

Re: spark streaming questions

2016-06-22 Thread pandees waran
In general, if you have multiple steps in a workflow :
For every batch 
1.stream data from s3 
2.write it to hbase
3.execute a hive step using the data in s3 

In this case all these 3 steps are part of the workflow. That's the reason I 
mentioned about workflow orchestration.

The other question (2) is about how to manage the clusters without any downtime 
/ data loss .(especially when you want k being down the cluster and create a 
new one for running spark streaming )


Sent from my iPhone

> On Jun 22, 2016, at 10:17 AM, Mich Talebzadeh  
> wrote:
> 
> Hi Pandees,
> 
> can you kindly explain what you are trying to achieve by incorporating Spark 
> streaming with workflow orchestration. Is this some form of back-to-back 
> seamless integration.
> 
> I have not used it myself but would be interested in knowing more about your 
> use case.
> 
> Cheers,
> 
> 
> 
> 
> Dr Mich Talebzadeh
>  
> LinkedIn  
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>  
> http://talebzadehmich.wordpress.com
>  
> 
>> On 22 June 2016 at 15:54, pandees waran  wrote:
>> Hi Mich, please let me know if you have any thoughts on the below. 
>> 
>> -- Forwarded message --
>> From: pandees waran 
>> Date: Wed, Jun 22, 2016 at 7:53 AM
>> Subject: spark streaming questions
>> To: user@spark.apache.org
>> 
>> 
>> Hello all,
>> 
>> I have few questions regarding spark streaming :
>> 
>> * I am wondering anyone uses spark streaming with workflow orchestrators 
>> such as data pipeline/SWF/any other framework. Is there any advantages 
>> /drawbacks on using a workflow orchestrator for spark streaming?
>> 
>> *How do you guys manage the cluster(bringing down /creating a new cluster ) 
>> without any data loss in streaming? 
>> 
>> I would like to hear your thoughts on this.
>> 
>> 
>> 
>> 
>> -- 
>> Thanks,
>> Pandeeswaran
> 


Re: spark streaming questions

2016-06-22 Thread Mich Talebzadeh
Hi Pandees,

can you kindly explain what you are trying to achieve by incorporating
Spark streaming with workflow orchestration. Is this some form of
back-to-back seamless integration.

I have not used it myself but would be interested in knowing more about
your use case.

Cheers,




Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com



On 22 June 2016 at 15:54, pandees waran  wrote:

> Hi Mich, please let me know if you have any thoughts on the below.
>
> -- Forwarded message --
> From: pandees waran 
> Date: Wed, Jun 22, 2016 at 7:53 AM
> Subject: spark streaming questions
> To: user@spark.apache.org
>
>
> Hello all,
>
> I have few questions regarding spark streaming :
>
> * I am wondering anyone uses spark streaming with workflow orchestrators
> such as data pipeline/SWF/any other framework. Is there any advantages
> /drawbacks on using a workflow orchestrator for spark streaming?
>
> *How do you guys manage the cluster(bringing down /creating a new cluster
> ) without any data loss in streaming?
>
> I would like to hear your thoughts on this.
>
>
>
>
> --
> Thanks,
> Pandeeswaran
>


Re: OOM on the driver after increasing partitions

2016-06-22 Thread Raghava Mutharaju
It is an iterative algorithm which uses map, mapPartitions, join, union,
filter, broadcast and count. The goal is to compute a set of tuples and in
each iteration few tuples are added to it. Outline is given below

1) Start with initial set of tuples, T
2) In each iteration compute deltaT, and add them to T, i.e., T = T + deltaT
3) Stop when current T size (count) is same as previous T size, i.e.,
deltaT is 0.

Do you think something happens on the driver due to the application logic
and when the partitions are increased?

Regards,
Raghava.


On Wed, Jun 22, 2016 at 12:33 PM, Sonal Goyal  wrote:

> What does your application do?
>
> Best Regards,
> Sonal
> Founder, Nube Technologies 
> Reifier at Strata Hadoop World
> 
> Reifier at Spark Summit 2015
> 
>
> 
>
>
>
> On Wed, Jun 22, 2016 at 9:57 PM, Raghava Mutharaju <
> m.vijayaragh...@gmail.com> wrote:
>
>> Hello All,
>>
>> We have a Spark cluster where driver and master are running on the same
>> node. We are using Spark Standalone cluster manager. If the number of nodes
>> (and the partitions) are increased, the same dataset that used to run to
>> completion on lesser number of nodes is now giving an out of memory on the
>> driver.
>>
>> For example, a dataset that runs on 32 nodes with number of partitions
>> set to 256 completes whereas the same dataset when run on 64 nodes with
>> number of partitions as 512 gives an OOM on the driver side.
>>
>> From what I read in the Spark documentation and other articles, following
>> are the responsibilities of the driver/master.
>>
>> 1) create spark context
>> 2) build DAG of operations
>> 3) schedule tasks
>>
>> I am guessing that 1) and 2) should not change w.r.t number of
>> nodes/partitions. So is it that since the driver has to keep track of lot
>> more tasks, that it gives an OOM?
>>
>> What could be the possible reasons behind the driver-side OOM when the
>> number of partitions are increased?
>>
>> Regards,
>> Raghava.
>>
>
>


-- 
Regards,
Raghava
http://raghavam.github.io


Re: Spark support for update/delete operations on Hive ORC transactional tables

2016-06-22 Thread Ajay Chander
Thanks for the confirmation Mich!

On Wednesday, June 22, 2016, Mich Talebzadeh 
wrote:

> Hi Ajay,
>
> I am afraid for now transaction heart beat do not work through Spark, so I
> have no other solution.
>
> This is interesting point as with Hive running on Spark engine there is no
> issue with this as Hive handles the transactions,
>
> I gather in simplest form Hive has to deal with its metadata for
> transaction logic but Spark somehow cannot do that.
>
> In short that is it. You need to do that through Hive.
>
> Cheers,
>
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 22 June 2016 at 16:08, Ajay Chander  > wrote:
>
>> Hi Mich,
>>
>> Right now I have a similar usecase where I have to delete some rows
>> from a hive table. My hive table is of type ORC, Bucketed and included
>> transactional property. I can delete from hive shell but not from my
>> spark-shell or spark app. Were you able to find any work around? Thank
>> you.
>>
>> Regards,
>> Ajay
>>
>>
>> On Thursday, June 2, 2016, Mich Talebzadeh > > wrote:
>>
>>> thanks for that.
>>>
>>> I will have a look
>>>
>>> Dr Mich Talebzadeh
>>>
>>>
>>>
>>> LinkedIn * 
>>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>> *
>>>
>>>
>>>
>>> http://talebzadehmich.wordpress.com
>>>
>>>
>>>
>>> On 2 June 2016 at 10:46, Elliot West  wrote:
>>>
 Related to this, there exists an API in Hive to simplify the
 integrations of other frameworks with Hive's ACID feature:

 See:
 https://cwiki.apache.org/confluence/display/Hive/HCatalog+Streaming+Mutation+API

 It contains code for maintaining heartbeats, handling locks and
 transactions, and submitting mutations in a distributed environment.

 We have used it to write to transactional tables from Cascading based
 processes.

 Elliot.


 On 2 June 2016 at 09:54, Mich Talebzadeh 
 wrote:

>
> Hi,
>
> Spark does not support transactions because as I understand there is
> a piece in the execution side that needs to send heartbeats to Hive
> metastore saying a transaction is still alive". That has not been
> implemented in Spark yet to my knowledge."
>
> Any idea on the timelines when we are going to have support for
> transactions in Spark for Hive ORC tables. This will really be useful.
>
>
> Thanks,
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>


>>>
>


Re: Confusing argument of sql.functions.count

2016-06-22 Thread Xinh Huynh
I can see how the linked documentation could be confusing:
"Aggregate function: returns the number of items in a group."

What it doesn't mention is that it returns the number of rows for which the
given column is non-null.

Xinh

On Wed, Jun 22, 2016 at 9:31 AM, Takeshi Yamamuro 
wrote:

> Hi,
>
> An argument for `functions.count` is needed for per-column counting;
> df.groupBy($"a").agg(count($"b"))
>
> // maropu
>
> On Thu, Jun 23, 2016 at 1:27 AM, Ted Yu  wrote:
>
>> See the first example in:
>>
>> http://www.w3schools.com/sql/sql_func_count.asp
>>
>> On Wed, Jun 22, 2016 at 9:21 AM, Jakub Dubovsky <
>> spark.dubovsky.ja...@gmail.com> wrote:
>>
>>> Hey Ted,
>>>
>>> thanks for reacting.
>>>
>>> I am refering to both of them. They both take column as parameter
>>> regardless of its type. Intuition here is that count should take no
>>> parameter. Or am I missing something?
>>>
>>> Jakub
>>>
>>> On Wed, Jun 22, 2016 at 6:19 PM, Ted Yu  wrote:
>>>
 Are you referring to the following method in
 sql/core/src/main/scala/org/apache/spark/sql/functions.scala :

   def count(e: Column): Column = withAggregateFunction {

 Did you notice this method ?

   def count(columnName: String): TypedColumn[Any, Long] =

 On Wed, Jun 22, 2016 at 9:06 AM, Jakub Dubovsky <
 spark.dubovsky.ja...@gmail.com> wrote:

> Hey sparkers,
>
> an aggregate function *count* in *org.apache.spark.sql.functions*
> package takes a *column* as an argument. Is this needed for
> something? I find it confusing that I need to supply a column there. It
> feels like it might be distinct count or something. This can be seen in 
> latest
> documentation
> 
> .
>
> I am considering filling this in spark bug tracker. Any opinions on
> this?
>
> Thanks
>
> Jakub
>
>

>>>
>>
>
>
> --
> ---
> Takeshi Yamamuro
>


Re: Spark support for update/delete operations on Hive ORC transactional tables

2016-06-22 Thread Mich Talebzadeh
Hi Ajay,

I am afraid for now transaction heart beat do not work through Spark, so I
have no other solution.

This is interesting point as with Hive running on Spark engine there is no
issue with this as Hive handles the transactions,

I gather in simplest form Hive has to deal with its metadata for
transaction logic but Spark somehow cannot do that.

In short that is it. You need to do that through Hive.

Cheers,



Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com



On 22 June 2016 at 16:08, Ajay Chander  wrote:

> Hi Mich,
>
> Right now I have a similar usecase where I have to delete some rows from a
> hive table. My hive table is of type ORC, Bucketed and included
> transactional property. I can delete from hive shell but not from my
> spark-shell or spark app. Were you able to find any work around? Thank
> you.
>
> Regards,
> Ajay
>
>
> On Thursday, June 2, 2016, Mich Talebzadeh 
> wrote:
>
>> thanks for that.
>>
>> I will have a look
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> *
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>>
>> On 2 June 2016 at 10:46, Elliot West  wrote:
>>
>>> Related to this, there exists an API in Hive to simplify the
>>> integrations of other frameworks with Hive's ACID feature:
>>>
>>> See:
>>> https://cwiki.apache.org/confluence/display/Hive/HCatalog+Streaming+Mutation+API
>>>
>>> It contains code for maintaining heartbeats, handling locks and
>>> transactions, and submitting mutations in a distributed environment.
>>>
>>> We have used it to write to transactional tables from Cascading based
>>> processes.
>>>
>>> Elliot.
>>>
>>>
>>> On 2 June 2016 at 09:54, Mich Talebzadeh 
>>> wrote:
>>>

 Hi,

 Spark does not support transactions because as I understand there is a
 piece in the execution side that needs to send heartbeats to Hive metastore
 saying a transaction is still alive". That has not been implemented in
 Spark yet to my knowledge."

 Any idea on the timelines when we are going to have support for
 transactions in Spark for Hive ORC tables. This will really be useful.


 Thanks,


 Dr Mich Talebzadeh



 LinkedIn * 
 https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
 *



 http://talebzadehmich.wordpress.com



>>>
>>>
>>


Re: Confusing argument of sql.functions.count

2016-06-22 Thread Jakub Dubovsky
Nice reactions. My comments:

@Ted.Yu: I see now that count(*) works for what I want
@Takeshi: I understand this is the syntax but it was not clear to me what
this $"b" column will be used for...

My line of thinking was this:

I started with
1) someDF.groupBy("colA").count()

and then I realized I need an average of colB per group so I tried
2) someDF.groupBy("colA").agg( avg("colB"), count() )

but it failed because count needs an argument. I understand the situation
now. Thank you guys for clarification! However having future generations in
mind :) I still want to poke around:

- Usages of count in 1) and 2) are still a bit inconsistent to me. If this
is the way have 2) works why there is no column arg in 1)?
- I would expect a glimpse of all of this would be in scaladoc for the
methods. The difference between their scala doc strings are hard to catch:
- - usage 1) in org.apache.spark.sql.GroupedData: Count the number of *rows*
for each group...
- - usage 2) in org.apache.spark.sql.functions: ...returns the number of
*items* in a group...

Thanks

On Wed, Jun 22, 2016 at 6:31 PM, Takeshi Yamamuro 
wrote:

> Hi,
>
> An argument for `functions.count` is needed for per-column counting;
> df.groupBy($"a").agg(count($"b"))
>
> // maropu
>
> On Thu, Jun 23, 2016 at 1:27 AM, Ted Yu  wrote:
>
>> See the first example in:
>>
>> http://www.w3schools.com/sql/sql_func_count.asp
>>
>> On Wed, Jun 22, 2016 at 9:21 AM, Jakub Dubovsky <
>> spark.dubovsky.ja...@gmail.com> wrote:
>>
>>> Hey Ted,
>>>
>>> thanks for reacting.
>>>
>>> I am refering to both of them. They both take column as parameter
>>> regardless of its type. Intuition here is that count should take no
>>> parameter. Or am I missing something?
>>>
>>> Jakub
>>>
>>> On Wed, Jun 22, 2016 at 6:19 PM, Ted Yu  wrote:
>>>
 Are you referring to the following method in
 sql/core/src/main/scala/org/apache/spark/sql/functions.scala :

   def count(e: Column): Column = withAggregateFunction {

 Did you notice this method ?

   def count(columnName: String): TypedColumn[Any, Long] =

 On Wed, Jun 22, 2016 at 9:06 AM, Jakub Dubovsky <
 spark.dubovsky.ja...@gmail.com> wrote:

> Hey sparkers,
>
> an aggregate function *count* in *org.apache.spark.sql.functions*
> package takes a *column* as an argument. Is this needed for
> something? I find it confusing that I need to supply a column there. It
> feels like it might be distinct count or something. This can be seen in 
> latest
> documentation
> 
> .
>
> I am considering filling this in spark bug tracker. Any opinions on
> this?
>
> Thanks
>
> Jakub
>
>

>>>
>>
>
>
> --
> ---
> Takeshi Yamamuro
>


Re: Silly question about Yarn client vs Yarn cluster modes...

2016-06-22 Thread Mich Talebzadeh
Thanks Mike for clarification.

I think there is another option to get data out of RDBMS through some form
of SELECT ALL COLUMNS TAB SEPARATED OR OTHER and put them in a flat file or
files. scp that file from the RDBMS directory to a private directory on
HDFS system  and push it into HDFS. That will by-pass the JDBC reliability
problem and I guess in this case one is in more control.

I do concur that there are security issues with this. For example that
local file system may have to have encryption etc  that will make it
tedious. I believe Jorn mentioned this somewhere.

HTH

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com



On 22 June 2016 at 15:59, Michael Segel  wrote:

> Hi,
>
> Just to clear a few things up…
>
> First I know its hard to describe some problems because they deal with
> client confidential information.
> (Also some basic ‘dead hooker’ thought problems to work through before
> facing them at a client.)
> The questions I pose here are very general and deal with some basic design
> issues/consideration.
>
>
> W.R.T JDBC / Beeline:
>
> There are many use cases where you don’t want to migrate some or all data
> to HDFS.  This is why tools like Apache Drill exist. At the same time…
> there are different cluster design patterns.  One such pattern is a
> storage/compute model where you have multiple clusters acting either as
> compute clusters which pull data from storage clusters. An example would be
> spinning up an EMR cluster and running a M/R job where you read from S3 and
> output to S3.  Or within your enterprise you have your Data Lake (Data
> Sewer) and then a compute cluster for analytics.
>
> In addition, you have some very nasty design issues to deal with like
> security. Yes, that’s the very dirty word nobody wants to deal with and in
> most of these tools, security is an afterthought.
> So you may not have direct access to the cluster or an edge node. You only
> have access to a single port on a single machine through the firewall,
> which is running beeline so you can pull data from your storage cluster.
>
> Its very possible that you have to pull data from the cluster thru beeline
> to store the data within a spark job running on the cluster. (Oh the irony!
> ;-)
>
> Its important to understand that due to design constraints, options like
> sqoop or running a query directly against Hive may not be possible and
> these use cases do exist when dealing with PII information.
>
> Its also important to realize that you may have to pull data from multiple
> data sources for some not so obvious but important reasons…
> So your spark app has to be able to generate an RDD from data in an RDBS
> (Oracle, DB2, etc …) persist it for local lookups and then pull data from
> the cluster and then output it either back to the cluster, another cluster,
> or someplace else.  All of these design issues occur when you’re dealing
> with large enterprises.
>
>
> But I digress…
>
> The reason I started this thread was to get a better handle on where
> things run when we make decisions to run either in client or cluster mode
> in YARN.
> Issues like starting/stopping long running apps are an issue in
> production.  Tying up cluster resources while your application remains
> dormant waiting for the next batch of events to come through.
> Trying to understand the gotchas of each design consideration so that we
> can weigh the pros and cons of a decision when designing a solution….
>
>
> Spark is relatively new and its a bit disruptive because it forces you to
> think in terms of storage/compute models or Jim Scott’s holistic ‘Zeta
> Architecture’.  In addition, it forces you to rethink your cluster hardware
> design too.
>
> HTH to clarify the questions and of course thanks for the replies.
>
> -Mike
>
> On Jun 21, 2016, at 11:17 PM, Mich Talebzadeh 
> wrote:
>
> If you are going to get data out of an RDBMS like Oracle then the correct
> procedure is:
>
>
>1. Use Hive on Spark execution engine. That improves Hive performance
>2. You can use JDBC through Spark itself. No issue there. It will use
>JDBC provided by HiveContext
>3. JDBC is fine. Every vendor has a utility to migrate a full database
>from one to another using JDBC. For example SAP relies on JDBC to migrate a
>whole Oracle schema to SAP ASE
>4. I have imported an Oracle table of 1 billion rows through Spark
>into Hive ORC table. It works fine. Actually I use Spark to do the job for
>these type of imports. Register a tempTable from DF and use it to put data
>into Hive table. You can create Hive table explicitly in Spark and do an
>INSERT/SELECT into it rather than save etc.
>5. You can access Hive tables through HiveContext. Beeline is a client
>tool 

Re: OOM on the driver after increasing partitions

2016-06-22 Thread Sonal Goyal
What does your application do?

Best Regards,
Sonal
Founder, Nube Technologies 
Reifier at Strata Hadoop World 
Reifier at Spark Summit 2015






On Wed, Jun 22, 2016 at 9:57 PM, Raghava Mutharaju <
m.vijayaragh...@gmail.com> wrote:

> Hello All,
>
> We have a Spark cluster where driver and master are running on the same
> node. We are using Spark Standalone cluster manager. If the number of nodes
> (and the partitions) are increased, the same dataset that used to run to
> completion on lesser number of nodes is now giving an out of memory on the
> driver.
>
> For example, a dataset that runs on 32 nodes with number of partitions set
> to 256 completes whereas the same dataset when run on 64 nodes with number
> of partitions as 512 gives an OOM on the driver side.
>
> From what I read in the Spark documentation and other articles, following
> are the responsibilities of the driver/master.
>
> 1) create spark context
> 2) build DAG of operations
> 3) schedule tasks
>
> I am guessing that 1) and 2) should not change w.r.t number of
> nodes/partitions. So is it that since the driver has to keep track of lot
> more tasks, that it gives an OOM?
>
> What could be the possible reasons behind the driver-side OOM when the
> number of partitions are increased?
>
> Regards,
> Raghava.
>


Re: Confusing argument of sql.functions.count

2016-06-22 Thread Takeshi Yamamuro
Hi,

An argument for `functions.count` is needed for per-column counting;
df.groupBy($"a").agg(count($"b"))

// maropu

On Thu, Jun 23, 2016 at 1:27 AM, Ted Yu  wrote:

> See the first example in:
>
> http://www.w3schools.com/sql/sql_func_count.asp
>
> On Wed, Jun 22, 2016 at 9:21 AM, Jakub Dubovsky <
> spark.dubovsky.ja...@gmail.com> wrote:
>
>> Hey Ted,
>>
>> thanks for reacting.
>>
>> I am refering to both of them. They both take column as parameter
>> regardless of its type. Intuition here is that count should take no
>> parameter. Or am I missing something?
>>
>> Jakub
>>
>> On Wed, Jun 22, 2016 at 6:19 PM, Ted Yu  wrote:
>>
>>> Are you referring to the following method in
>>> sql/core/src/main/scala/org/apache/spark/sql/functions.scala :
>>>
>>>   def count(e: Column): Column = withAggregateFunction {
>>>
>>> Did you notice this method ?
>>>
>>>   def count(columnName: String): TypedColumn[Any, Long] =
>>>
>>> On Wed, Jun 22, 2016 at 9:06 AM, Jakub Dubovsky <
>>> spark.dubovsky.ja...@gmail.com> wrote:
>>>
 Hey sparkers,

 an aggregate function *count* in *org.apache.spark.sql.functions*
 package takes a *column* as an argument. Is this needed for something?
 I find it confusing that I need to supply a column there. It feels like it
 might be distinct count or something. This can be seen in latest
 documentation
 
 .

 I am considering filling this in spark bug tracker. Any opinions on
 this?

 Thanks

 Jakub


>>>
>>
>


-- 
---
Takeshi Yamamuro


OOM on the driver after increasing partitions

2016-06-22 Thread Raghava Mutharaju
Hello All,

We have a Spark cluster where driver and master are running on the same
node. We are using Spark Standalone cluster manager. If the number of nodes
(and the partitions) are increased, the same dataset that used to run to
completion on lesser number of nodes is now giving an out of memory on the
driver.

For example, a dataset that runs on 32 nodes with number of partitions set
to 256 completes whereas the same dataset when run on 64 nodes with number
of partitions as 512 gives an OOM on the driver side.

>From what I read in the Spark documentation and other articles, following
are the responsibilities of the driver/master.

1) create spark context
2) build DAG of operations
3) schedule tasks

I am guessing that 1) and 2) should not change w.r.t number of
nodes/partitions. So is it that since the driver has to keep track of lot
more tasks, that it gives an OOM?

What could be the possible reasons behind the driver-side OOM when the
number of partitions are increased?

Regards,
Raghava.


Re: Confusing argument of sql.functions.count

2016-06-22 Thread Ted Yu
See the first example in:

http://www.w3schools.com/sql/sql_func_count.asp

On Wed, Jun 22, 2016 at 9:21 AM, Jakub Dubovsky <
spark.dubovsky.ja...@gmail.com> wrote:

> Hey Ted,
>
> thanks for reacting.
>
> I am refering to both of them. They both take column as parameter
> regardless of its type. Intuition here is that count should take no
> parameter. Or am I missing something?
>
> Jakub
>
> On Wed, Jun 22, 2016 at 6:19 PM, Ted Yu  wrote:
>
>> Are you referring to the following method in
>> sql/core/src/main/scala/org/apache/spark/sql/functions.scala :
>>
>>   def count(e: Column): Column = withAggregateFunction {
>>
>> Did you notice this method ?
>>
>>   def count(columnName: String): TypedColumn[Any, Long] =
>>
>> On Wed, Jun 22, 2016 at 9:06 AM, Jakub Dubovsky <
>> spark.dubovsky.ja...@gmail.com> wrote:
>>
>>> Hey sparkers,
>>>
>>> an aggregate function *count* in *org.apache.spark.sql.functions*
>>> package takes a *column* as an argument. Is this needed for something?
>>> I find it confusing that I need to supply a column there. It feels like it
>>> might be distinct count or something. This can be seen in latest
>>> documentation
>>> 
>>> .
>>>
>>> I am considering filling this in spark bug tracker. Any opinions on this?
>>>
>>> Thanks
>>>
>>> Jakub
>>>
>>>
>>
>


Re: Confusing argument of sql.functions.count

2016-06-22 Thread Jakub Dubovsky
Hey Ted,

thanks for reacting.

I am refering to both of them. They both take column as parameter
regardless of its type. Intuition here is that count should take no
parameter. Or am I missing something?

Jakub

On Wed, Jun 22, 2016 at 6:19 PM, Ted Yu  wrote:

> Are you referring to the following method in
> sql/core/src/main/scala/org/apache/spark/sql/functions.scala :
>
>   def count(e: Column): Column = withAggregateFunction {
>
> Did you notice this method ?
>
>   def count(columnName: String): TypedColumn[Any, Long] =
>
> On Wed, Jun 22, 2016 at 9:06 AM, Jakub Dubovsky <
> spark.dubovsky.ja...@gmail.com> wrote:
>
>> Hey sparkers,
>>
>> an aggregate function *count* in *org.apache.spark.sql.functions*
>> package takes a *column* as an argument. Is this needed for something? I
>> find it confusing that I need to supply a column there. It feels like it
>> might be distinct count or something. This can be seen in latest
>> documentation
>> 
>> .
>>
>> I am considering filling this in spark bug tracker. Any opinions on this?
>>
>> Thanks
>>
>> Jakub
>>
>>
>


Re: Confusing argument of sql.functions.count

2016-06-22 Thread Ted Yu
Are you referring to the following method in
sql/core/src/main/scala/org/apache/spark/sql/functions.scala :

  def count(e: Column): Column = withAggregateFunction {

Did you notice this method ?

  def count(columnName: String): TypedColumn[Any, Long] =

On Wed, Jun 22, 2016 at 9:06 AM, Jakub Dubovsky <
spark.dubovsky.ja...@gmail.com> wrote:

> Hey sparkers,
>
> an aggregate function *count* in *org.apache.spark.sql.functions* package
> takes a *column* as an argument. Is this needed for something? I find it
> confusing that I need to supply a column there. It feels like it might be
> distinct count or something. This can be seen in latest documentation
> 
> .
>
> I am considering filling this in spark bug tracker. Any opinions on this?
>
> Thanks
>
> Jakub
>
>


Re: how to avoid duplicate messages with spark streaming using checkpoint after restart in case of failure

2016-06-22 Thread Jörn Franke

That is the cost of exactly once :) 

> On 22 Jun 2016, at 12:54, sandesh deshmane  wrote:
> 
> We are going with checkpointing . we don't have identifier available to 
> identify if the message is already processed or not .
> Even if we had it, then it will slow down the processing as we do get 300k 
> messages per sec , so lookup will slow down.
> 
> Thanks
> Sandesh
> 
>> On Wed, Jun 22, 2016 at 3:28 PM, Jörn Franke  wrote:
>> 
>> Spark Streamig does not guarantee exactly once for output action. It means 
>> that one item is only processed in an RDD.
>> You can achieve at most once or at least once.
>> You could however do at least once (via checkpoing) and record which 
>> messages have been proceed (some identifier available?) and do not re 
>> process them  You could also store (safely) what range has been already 
>> processed etc
>> 
>> Think about the business case if exactly once is needed or if it can be 
>> replaced by one of the others.
>> Exactly once, it needed requires in any system including spark more effort 
>> and usually the throughput is lower. A risk evaluation from a business point 
>> of view has to be done anyway...
>> 
>> > On 22 Jun 2016, at 09:09, sandesh deshmane  wrote:
>> >
>> > Hi,
>> >
>> > I am writing spark streaming application which reads messages from Kafka.
>> >
>> > I am using checkpointing and write ahead logs ( WAL) to achieve fault 
>> > tolerance .
>> >
>> > I have created batch size of 10 sec for reading messages from kafka.
>> >
>> > I read messages for kakfa and generate the count of messages as per values 
>> > received from Kafka message.
>> >
>> > In case there is failure and my spark streaming application is restarted I 
>> > see duplicate messages processed ( which is close to 2 batches)
>> >
>> > The problem that I have is per sec I get around 300k messages and In case 
>> > application is restarted I see around 3-5 million duplicate counts.
>> >
>> > How to avoid such duplicates?
>> >
>> > what is best to way to recover from such failures ?
>> >
>> > Thanks
>> > Sandesh
> 


Confusing argument of sql.functions.count

2016-06-22 Thread Jakub Dubovsky
Hey sparkers,

an aggregate function *count* in *org.apache.spark.sql.functions* package
takes a *column* as an argument. Is this needed for something? I find it
confusing that I need to supply a column there. It feels like it might be
distinct count or something. This can be seen in latest documentation

.

I am considering filling this in spark bug tracker. Any opinions on this?

Thanks

Jakub


Re: spark-1.6.1-bin-without-hadoop can not use spark-sql

2016-06-22 Thread Ted Yu
build/mvn clean -Phive -Phive-thriftserver -Pyarn -Phadoop-2.6 -Psparkr
-Dhadoop.version=2.7.2 package

On Wed, Jun 22, 2016 at 8:00 AM, 251922566 <251922...@qq.com> wrote:

> ok,i will rebuild myself. if i want to use spark with hadoop 2.7.2, when i
> build spark, i should put what on param --hadoop, 2.7.2 or others?
>
> 来自我的华为手机
>
>
>  原始邮件 
> 主题:Re: spark-1.6.1-bin-without-hadoop can not use spark-sql
> 发件人:Ted Yu
> 收件人:喜之郎 <251922...@qq.com>
> 抄送:user
>
>
> I wonder if the tar ball was built with:
>
> -Phive -Phive-thriftserver
>
> Maybe rebuild by yourself with the above ?
>
> FYI
>
> On Wed, Jun 22, 2016 at 4:38 AM, 喜之郎 <251922...@qq.com> wrote:
>
>> Hi all.
>> I download spark-1.6.1-bin-without-hadoop.tgz
>>  
>> from
>> website.
>> And I configured "SPARK_DIST_CLASSPATH" in spark-env.sh.
>> Now spark-shell run well. But spark-sql can not run.
>> My hadoop version is 2.7.2.
>> This is error infos:
>>
>> bin/spark-sql
>> java.lang.ClassNotFoundException:
>> org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver
>> at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
>> at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
>> at java.security.AccessController.doPrivileged(Native Method)
>> at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
>> at java.lang.Class.forName0(Native Method)
>> at java.lang.Class.forName(Class.java:278)
>> at org.apache.spark.util.Utils$.classForName(Utils.scala:174)
>> at
>> org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:689)
>> at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
>> at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
>> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
>> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>> Failed to load main class
>> org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.
>> You need to build Spark with -Phive and -Phive-thriftserver.
>>
>> Do I need configure something else in spark-env.sh or spark-default.conf?
>> Suggestions are appreciated ,thanks.
>>
>>
>>
>>
>


Re: Spark support for update/delete operations on Hive ORC transactional tables

2016-06-22 Thread Ajay Chander
Hi Mich,

Right now I have a similar usecase where I have to delete some rows from a
hive table. My hive table is of type ORC, Bucketed and included
transactional property. I can delete from hive shell but not from my
spark-shell or spark app. Were you able to find any work around? Thank you.

Regards,
Ajay

On Thursday, June 2, 2016, Mich Talebzadeh 
wrote:

> thanks for that.
>
> I will have a look
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 2 June 2016 at 10:46, Elliot West  > wrote:
>
>> Related to this, there exists an API in Hive to simplify the integrations
>> of other frameworks with Hive's ACID feature:
>>
>> See:
>> https://cwiki.apache.org/confluence/display/Hive/HCatalog+Streaming+Mutation+API
>>
>> It contains code for maintaining heartbeats, handling locks and
>> transactions, and submitting mutations in a distributed environment.
>>
>> We have used it to write to transactional tables from Cascading based
>> processes.
>>
>> Elliot.
>>
>>
>> On 2 June 2016 at 09:54, Mich Talebzadeh > > wrote:
>>
>>>
>>> Hi,
>>>
>>> Spark does not support transactions because as I understand there is a
>>> piece in the execution side that needs to send heartbeats to Hive metastore
>>> saying a transaction is still alive". That has not been implemented in
>>> Spark yet to my knowledge."
>>>
>>> Any idea on the timelines when we are going to have support for
>>> transactions in Spark for Hive ORC tables. This will really be useful.
>>>
>>>
>>> Thanks,
>>>
>>>
>>> Dr Mich Talebzadeh
>>>
>>>
>>>
>>> LinkedIn * 
>>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>> *
>>>
>>>
>>>
>>> http://talebzadehmich.wordpress.com
>>>
>>>
>>>
>>
>>
>


回复: spark-1.6.1-bin-without-hadoop can not use spark-sql

2016-06-22 Thread 251922566
ok,i will rebuild myself.  if i want to use spark with hadoop 2.7.2, when i build spark, i should put what on param --hadoop, 2.7.2 or others?来自我的华为手机 原始邮件 主题:Re: spark-1.6.1-bin-without-hadoop can not use spark-sql发件人:Ted Yu 收件人:喜之郎 <251922...@qq.com>抄送:user I wonder if the tar ball was built with:-Phive -Phive-thriftserverMaybe rebuild by yourself with the above ?FYIOn Wed, Jun 22, 2016 at 4:38 AM, 喜之郎 <251922...@qq.com> wrote:Hi all.I download spark-1.6.1-bin-without-hadoop.tgz from website.And I configured "SPARK_DIST_CLASSPATH" in spark-env.sh.Now spark-shell run well. But spark-sql can not run.My hadoop version is 2.7.2.This is error infos:bin/spark-sql java.lang.ClassNotFoundException: org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver	at java.net.URLClassLoader$1.run(URLClassLoader.java:366)	at java.net.URLClassLoader$1.run(URLClassLoader.java:355)	at java.security.AccessController.doPrivileged(Native Method)	at java.net.URLClassLoader.findClass(URLClassLoader.java:354)	at java.lang.ClassLoader.loadClass(ClassLoader.java:425)	at java.lang.ClassLoader.loadClass(ClassLoader.java:358)	at java.lang.Class.forName0(Native Method)	at java.lang.Class.forName(Class.java:278)	at org.apache.spark.util.Utils$.classForName(Utils.scala:174)	at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:689)	at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)	at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)Failed to load main class org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.You need to build Spark with -Phive and -Phive-thriftserver.Do I need configure something else in spark-env.sh or spark-default.conf?Suggestions are appreciated ,thanks.


Re: Silly question about Yarn client vs Yarn cluster modes...

2016-06-22 Thread Michael Segel
Hi, 

Just to clear a few things up… 

First I know its hard to describe some problems because they deal with client 
confidential information. 
(Also some basic ‘dead hooker’ thought problems to work through before facing 
them at a client.) 
The questions I pose here are very general and deal with some basic design 
issues/consideration. 


W.R.T JDBC / Beeline:
There are many use cases where you don’t want to migrate some or all data to 
HDFS.  This is why tools like Apache Drill exist. At the same time… there are 
different cluster design patterns.  One such pattern is a storage/compute model 
where you have multiple clusters acting either as compute clusters which pull 
data from storage clusters. An example would be spinning up an EMR cluster and 
running a M/R job where you read from S3 and output to S3.  Or within your 
enterprise you have your Data Lake (Data Sewer) and then a compute cluster for 
analytics. 

In addition, you have some very nasty design issues to deal with like security. 
Yes, that’s the very dirty word nobody wants to deal with and in most of these 
tools, security is an afterthought.  
So you may not have direct access to the cluster or an edge node. You only have 
access to a single port on a single machine through the firewall, which is 
running beeline so you can pull data from your storage cluster. 

Its very possible that you have to pull data from the cluster thru beeline to 
store the data within a spark job running on the cluster. (Oh the irony! ;-) 

Its important to understand that due to design constraints, options like sqoop 
or running a query directly against Hive may not be possible and these use 
cases do exist when dealing with PII information.

Its also important to realize that you may have to pull data from multiple data 
sources for some not so obvious but important reasons… 
So your spark app has to be able to generate an RDD from data in an RDBS 
(Oracle, DB2, etc …) persist it for local lookups and then pull data from the 
cluster and then output it either back to the cluster, another cluster, or 
someplace else.  All of these design issues occur when you’re dealing with 
large enterprises. 


But I digress… 

The reason I started this thread was to get a better handle on where things run 
when we make decisions to run either in client or cluster mode in YARN. 
Issues like starting/stopping long running apps are an issue in production.  
Tying up cluster resources while your application remains dormant waiting for 
the next batch of events to come through. 
Trying to understand the gotchas of each design consideration so that we can 
weigh the pros and cons of a decision when designing a solution…. 


Spark is relatively new and its a bit disruptive because it forces you to think 
in terms of storage/compute models or Jim Scott’s holistic ‘Zeta Architecture’. 
 In addition, it forces you to rethink your cluster hardware design too. 

HTH to clarify the questions and of course thanks for the replies. 

-Mike

> On Jun 21, 2016, at 11:17 PM, Mich Talebzadeh  
> wrote:
> 
> If you are going to get data out of an RDBMS like Oracle then the correct 
> procedure is:
> 
> Use Hive on Spark execution engine. That improves Hive performance
> You can use JDBC through Spark itself. No issue there. It will use JDBC 
> provided by HiveContext
> JDBC is fine. Every vendor has a utility to migrate a full database from one 
> to another using JDBC. For example SAP relies on JDBC to migrate a whole 
> Oracle schema to SAP ASE
> I have imported an Oracle table of 1 billion rows through Spark into Hive ORC 
> table. It works fine. Actually I use Spark to do the job for these type of 
> imports. Register a tempTable from DF and use it to put data into Hive table. 
> You can create Hive table explicitly in Spark and do an INSERT/SELECT into it 
> rather than save etc.  
> You can access Hive tables through HiveContext. Beeline is a client tool that 
> connects to Hive thrift server. I don't think it comes into equation here
> Finally one experiment worth multiples of these speculation. Try for yourself 
> and fine out.
> If you want to use JDBC for an RDBMS table then you will need to download the 
> relevant JAR file. For example for Oracle it is ojdbc6.jar etc
> Like anything else your mileage varies and need to experiment with it. 
> Otherwise these are all opinions.
> 
> HTH
> 
> 
> 
> Dr Mich Talebzadeh
>  
> LinkedIn  
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>  
> 
>  
> http://talebzadehmich.wordpress.com 
>  
> 
> On 22 June 2016 at 06:46, Jörn Franke  > wrote:
> I would import data via sqoop and put it on HDFS. It has some mechanisms to 
> handle the lack of reliability by jdbc. 
> 
> Then you can process the data via Spark. 

spark streaming questions

2016-06-22 Thread pandees waran
Hello all,

I have few questions regarding spark streaming :

* I am wondering anyone uses spark streaming with workflow orchestrators
such as data pipeline/SWF/any other framework. Is there any advantages
/drawbacks on using a workflow orchestrator for spark streaming?

*How do you guys manage the cluster(bringing down /creating a new cluster )
without any data loss in streaming?

I would like to hear your thoughts on this.


Re: spark-1.6.1-bin-without-hadoop can not use spark-sql

2016-06-22 Thread Ted Yu
I wonder if the tar ball was built with:

-Phive -Phive-thriftserver

Maybe rebuild by yourself with the above ?

FYI

On Wed, Jun 22, 2016 at 4:38 AM, 喜之郎 <251922...@qq.com> wrote:

> Hi all.
> I download spark-1.6.1-bin-without-hadoop.tgz
>  from
> website.
> And I configured "SPARK_DIST_CLASSPATH" in spark-env.sh.
> Now spark-shell run well. But spark-sql can not run.
> My hadoop version is 2.7.2.
> This is error infos:
>
> bin/spark-sql
> java.lang.ClassNotFoundException:
> org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver
> at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
> at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
> at java.security.AccessController.doPrivileged(Native Method)
> at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
> at java.lang.Class.forName0(Native Method)
> at java.lang.Class.forName(Class.java:278)
> at org.apache.spark.util.Utils$.classForName(Utils.scala:174)
> at
> org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:689)
> at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
> at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
> Failed to load main class
> org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.
> You need to build Spark with -Phive and -Phive-thriftserver.
>
> Do I need configure something else in spark-env.sh or spark-default.conf?
> Suggestions are appreciated ,thanks.
>
>
>
>


?????? Could not find or load main class org.apache.spark.deploy.yarn.ExecutorLauncher

2016-06-22 Thread ????????
thanks  you-all  patience help very much??i change the para
spark.yarn.jar   spark.yarn.jar 
hdfs://master:9000/user/shihj/spark_lib/spark-examples-1.6.1-hadoop2.6.0.jar
to
spark.yarn.jar   spark.yarn.jar 
hdfs://master:9000/user/shihj/spark_lib/spark-assembly-1.6.1-hadoop2.6.0.jar


then run well??




thanks you-all again??






--  --
??: "";<958943...@qq.com>;
: 2016??6??22??(??) 3:10
??: "Yash Sharma"; 
: "Saisai Shao"; "user"; 
: ?? Could not find or load main class 
org.apache.spark.deploy.yarn.ExecutorLauncher



yes??it run well








shihj@master:/usr/local/spark/spark-1.6.1-bin-hadoop2.6$ ./bin/spark-submit \
> --class org.apache.spark.examples.SparkPi \
> --master local[4] \
> lib/spark-examples-1.6.1-hadoop2.6.0.jar 10
16/06/22 15:08:14 INFO SparkContext: Running Spark version 1.6.1
16/06/22 15:08:14 WARN SparkConf: 
SPARK_WORKER_INSTANCES was detected (set to '1').
This is deprecated in Spark 1.0+.


Please instead use:
 - ./spark-submit with --num-executors to specify the number of executors
 - Or set SPARK_EXECUTOR_INSTANCES
 - spark.executor.instances to configure the number of instances in the spark 
config.

16/06/22 15:08:15 INFO SecurityManager: Changing view acls to: shihj
16/06/22 15:08:15 INFO SecurityManager: Changing modify acls to: shihj
16/06/22 15:08:15 INFO SecurityManager: SecurityManager: authentication 
disabled; ui acls disabled; users with view permissions: Set(shihj); users with 
modify permissions: Set(shihj)
16/06/22 15:08:16 INFO Utils: Successfully started service 'sparkDriver' on 
port 43865.
16/06/22 15:08:16 INFO Slf4jLogger: Slf4jLogger started
16/06/22 15:08:16 INFO Remoting: Starting remoting
16/06/22 15:08:17 INFO Remoting: Remoting started; listening on addresses 
:[akka.tcp://sparkDriverActorSystem@192.168.20.137:39308]
16/06/22 15:08:17 INFO Utils: Successfully started service 
'sparkDriverActorSystem' on port 39308.
16/06/22 15:08:17 INFO SparkEnv: Registering MapOutputTracker
16/06/22 15:08:17 INFO SparkEnv: Registering BlockManagerMaster
16/06/22 15:08:17 INFO DiskBlockManager: Created local directory at 
/tmp/blockmgr-3195b7f2-126d-4734-a681-6ec00727352a
16/06/22 15:08:17 INFO MemoryStore: MemoryStore started with capacity 511.1 MB
16/06/22 15:08:17 INFO SparkEnv: Registering OutputCommitCoordinator
16/06/22 15:08:18 INFO Utils: Successfully started service 'SparkUI' on port 
4040.
16/06/22 15:08:18 INFO SparkUI: Started SparkUI at http://192.168.20.137:4040
16/06/22 15:08:18 INFO HttpFileServer: HTTP File server directory is 
/tmp/spark-c91a579d-1a18-4f75-ae05-137d9a286080/httpd-961023ad-cc05-4e3e-b648-19581093df11
16/06/22 15:08:18 INFO HttpServer: Starting HTTP Server
16/06/22 15:08:18 INFO Utils: Successfully started service 'HTTP file server' 
on port 49924.
16/06/22 15:08:22 INFO SparkContext: Added JAR 
file:/usr/local/spark/spark-1.6.1-bin-hadoop2.6/lib/spark-examples-1.6.1-hadoop2.6.0.jar
 at http://192.168.20.137:49924/jars/spark-examples-1.6.1-hadoop2.6.0.jar with 
timestamp 1466579302122
16/06/22 15:08:22 INFO Executor: Starting executor ID driver on host localhost
16/06/22 15:08:22 INFO Utils: Successfully started service 
'org.apache.spark.network.netty.NettyBlockTransferService' on port 33520.
16/06/22 15:08:22 INFO NettyBlockTransferService: Server created on 33520
16/06/22 15:08:22 INFO BlockManagerMaster: Trying to register BlockManager
16/06/22 15:08:22 INFO BlockManagerMasterEndpoint: Registering block manager 
localhost:33520 with 511.1 MB RAM, BlockManagerId(driver, localhost, 33520)
16/06/22 15:08:22 INFO BlockManagerMaster: Registered BlockManager
16/06/22 15:08:23 INFO SparkContext: Starting job: reduce at SparkPi.scala:36
16/06/22 15:08:23 INFO DAGScheduler: Got job 0 (reduce at SparkPi.scala:36) 
with 10 output partitions
16/06/22 15:08:23 INFO DAGScheduler: Final stage: ResultStage 0 (reduce at 
SparkPi.scala:36)
16/06/22 15:08:23 INFO DAGScheduler: Parents of final stage: List()
16/06/22 15:08:23 INFO DAGScheduler: Missing parents: List()
16/06/22 15:08:23 INFO DAGScheduler: Submitting ResultStage 0 
(MapPartitionsRDD[1] at map at SparkPi.scala:32), which has no missing parents
16/06/22 15:08:23 WARN SizeEstimator: Failed to check whether UseCompressedOops 
is set; assuming yes
16/06/22 15:08:23 INFO MemoryStore: Block broadcast_0 stored as values in 
memory (estimated size 1904.0 B, free 1904.0 B)
16/06/22 15:08:24 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in 
memory (estimated size 1216.0 B, free 3.0 KB)
16/06/22 15:08:24 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 
localhost:33520 (size: 1216.0 B, free: 511.1 MB)
16/06/22 15:08:24 INFO SparkContext: Created broadcast 0 from broadcast at 
DAGScheduler.scala:1006
16/06/22 15:08:24 INFO DAGScheduler: Submitting 10 missing tasks from 
ResultStage 0 (MapPartitionsRDD[1] at map at 

Re: how to avoid duplicate messages with spark streaming using checkpoint after restart in case of failure

2016-06-22 Thread Cody Koeninger
The direct stream doesn't automagically give you exactly-once
semantics.  Indeed, you should be pretty suspicious of anything that
claims to give you end-to-end exactly-once semantics without any
additional work on your part.

To the original poster, have you read / watched the materials linked
from the page below?  That should clarify what your options are.

https://github.com/koeninger/kafka-exactly-once

On Wed, Jun 22, 2016 at 5:55 AM, Denys Cherepanin  wrote:
> Hi Sandesh,
>
> As I understand you are using "receiver based" approach to integrate kafka
> with spark streaming.
>
> Did you tried "direct" approach ? In this case offsets will be tracked by
> streaming app via check-pointing and you should achieve exactly-once
> semantics
>
> On Wed, Jun 22, 2016 at 5:58 AM, Jörn Franke  wrote:
>>
>>
>> Spark Streamig does not guarantee exactly once for output action. It means
>> that one item is only processed in an RDD.
>> You can achieve at most once or at least once.
>> You could however do at least once (via checkpoing) and record which
>> messages have been proceed (some identifier available?) and do not re
>> process them  You could also store (safely) what range has been already
>> processed etc
>>
>> Think about the business case if exactly once is needed or if it can be
>> replaced by one of the others.
>> Exactly once, it needed requires in any system including spark more effort
>> and usually the throughput is lower. A risk evaluation from a business point
>> of view has to be done anyway...
>>
>> > On 22 Jun 2016, at 09:09, sandesh deshmane 
>> > wrote:
>> >
>> > Hi,
>> >
>> > I am writing spark streaming application which reads messages from
>> > Kafka.
>> >
>> > I am using checkpointing and write ahead logs ( WAL) to achieve fault
>> > tolerance .
>> >
>> > I have created batch size of 10 sec for reading messages from kafka.
>> >
>> > I read messages for kakfa and generate the count of messages as per
>> > values received from Kafka message.
>> >
>> > In case there is failure and my spark streaming application is restarted
>> > I see duplicate messages processed ( which is close to 2 batches)
>> >
>> > The problem that I have is per sec I get around 300k messages and In
>> > case application is restarted I see around 3-5 million duplicate counts.
>> >
>> > How to avoid such duplicates?
>> >
>> > what is best to way to recover from such failures ?
>> >
>> > Thanks
>> > Sandesh
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>
>
>
> --
> Yours faithfully, Denys Cherepanin

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



[Spark + MLlib] how to update offline model with the online model

2016-06-22 Thread diplomatic Guru
Hello all,

I have built a spark batch model using MLlib and a Streaming online model.
Now I would like to load the offline model in streaming job and apply and
update the model. Could to please advise me how to do it. is there an
example to look at. The streaming model does not allow saving or loading a
model. The primary function it provides is trainOn and predictOn.

Thanks.


Running JavaBased Implementation of StreamingKmeans Spark

2016-06-22 Thread Biplob Biswas
Hi, 

I implemented the streamingKmeans example provided in the spark website but
in Java. 
The full implementation is here, 

http://pastebin.com/CJQfWNvk

But i am not getting anything in the output except occasional timestamps
like one below: 

--- 
Time: 1466176935000 ms 
--- 

Also, i have 2 directories: 
"D:\spark\streaming example\Data Sets\training" 
"D:\spark\streaming example\Data Sets\test" 

and inside these directories i have 1 file each "samplegpsdata_train.txt"
and "samplegpsdata_test.txt" with training data having 500 datapoints and
test data with 60 datapoints. 

I am very new to the spark systems and any help is highly appreciated. 

Thank you so much 
Biplob Biswas



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Running-JavaBased-Implementation-of-StreamingKmeans-Spark-tp27206.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Can I use log4j2.xml in my Apache Saprk application

2016-06-22 Thread Prajwal Tuladhar
One way to integrate log4j2 would be to enable flags
`spark.executor.userClassPathFirst` and `spark.driver.userClassPathFirst`
when submitting the application. This would cause application class loader
to load first, initializing log4j2 logging context. But this can also
potentially break other things (like: dependencies that Spark master
required initializing overridden by Spark app and so on) so, you will need
to verify.

More info about those flags @
https://spark.apache.org/docs/latest/configuration.html


On Wed, Jun 22, 2016 at 7:11 AM, Charan Adabala 
wrote:

> Hi,
>
> We are trying to integrate log4j2.xml instead of log4j.properties in Apache
> Spark application, We integrated log4j2.xml but, the problem is unable to
> write the worker log of the application and there is no problem for writing
> driver log. Can any one suggest how to integrate log4j2.xml in Apache Spark
> application with successful writing of both worker and driver log.
>
> Thanks in advance..,
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Can-I-use-log4j2-xml-in-my-Apache-Saprk-application-tp27205.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


-- 
--
Cheers,
Praj


spark-1.6.1-bin-without-hadoop can not use spark-sql

2016-06-22 Thread ??????
Hi all.
I download spark-1.6.1-bin-without-hadoop.tgz from website.
And I configured "SPARK_DIST_CLASSPATH" in spark-env.sh.
Now spark-shell run well. But spark-sql can not run.
My hadoop version is 2.7.2.
This is error infos:


bin/spark-sql 
java.lang.ClassNotFoundException: 
org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:278)
at org.apache.spark.util.Utils$.classForName(Utils.scala:174)
at 
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:689)
at 
org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Failed to load main class 
org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.
You need to build Spark with -Phive and -Phive-thriftserver.



Do I need configure something else in spark-env.sh or spark-default.conf?
Suggestions are appreciated ,thanks.

Can I use log4j2.xml in my Apache Saprk application

2016-06-22 Thread Charan Adabala
Hi,

We are trying to integrate log4j2.xml instead of log4j.properties in Apache
Spark application, We integrated log4j2.xml but, the problem is unable to
write the worker log of the application and there is no problem for writing
driver log. Can any one suggest how to integrate log4j2.xml in Apache Spark
application with successful writing of both worker and driver log.

Thanks in advance..,




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Can-I-use-log4j2-xml-in-my-Apache-Saprk-application-tp27205.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Spark Task failure with File segment length as negative

2016-06-22 Thread Priya Ch
Hi All,

I am running Spark Application with 1.8TB of data (which is stored in Hive
tables format).  I am reading the data using HiveContect and processing it.
The cluster has 5 nodes total, 25 cores per machine and 250Gb per node. I
am launching the application with 25 executors with 5 cores each and 45GB
per executor. Also, specified the property
spark.yarn.executor.memoryOverhead=2024.

During the execution, tasks are lost and ShuffleMapTasks are re-submitted.
I am seeing that tasks are failing with the following message -

*java.lang.IllegalArgumentException: requirement failed: File segment
length cannot be negative (got -27045427)*









* at scala.Predef$.require(Predef.scala:233)*









* at org.apache.spark.storage.FileSegment.(FileSegment.scala:28)*









* at
org.apache.spark.storage.DiskBlockObjectWriter.fileSegment(DiskBlockObjectWriter.scala:220)*









* at
org.apache.spark.shuffle.sort.ShuffleExternalSorter.writeSortedFile(ShuffleExternalSorter.java:184)*









* at
org.apache.spark.shuffle.sort.ShuffleExternalSorter.closeAndGetSpills(ShuffleExternalSorter.java:398)*









* at
org.apache.spark.shuffle.sort.UnsafeShuffleWriter.closeAndWriteOutput(UnsafeShuffleWriter.java:206)*









* at
org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:166)*









* at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)*









* at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)*









* at org.apache.spark.scheduler.Task.run(Task.scala:89)*









* at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)*









* at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)*









* at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)*









I understood that its because the shuffle block is > 2G, the Int value is
taking negative and throwing the above exeception.

Can someone throw light on this ? What is the fix for this ?

Thanks,
Padma CH


Re: how to avoid duplicate messages with spark streaming using checkpoint after restart in case of failure

2016-06-22 Thread sandesh deshmane
We have not tried direct approach . We are using receiver based approach (
we use zookeepers to connect from spark)

We have around 20+ Kafka and some times we replace the kafka brokers ( they
go down ). So each time I need to change list at spark application and I
need to restart the streaming app.

Thanks
Sandesh

On Wed, Jun 22, 2016 at 4:25 PM, Denys Cherepanin 
wrote:

> Hi Sandesh,
>
> As I understand you are using "receiver based" approach to integrate kafka
> with spark streaming.
>
> Did you tried "direct" approach
> 
>  ?
> In this case offsets will be tracked by streaming app via check-pointing
> and you should achieve exactly-once semantics
>
> On Wed, Jun 22, 2016 at 5:58 AM, Jörn Franke  wrote:
>
>>
>> Spark Streamig does not guarantee exactly once for output action. It
>> means that one item is only processed in an RDD.
>> You can achieve at most once or at least once.
>> You could however do at least once (via checkpoing) and record which
>> messages have been proceed (some identifier available?) and do not re
>> process them  You could also store (safely) what range has been already
>> processed etc
>>
>> Think about the business case if exactly once is needed or if it can be
>> replaced by one of the others.
>> Exactly once, it needed requires in any system including spark more
>> effort and usually the throughput is lower. A risk evaluation from a
>> business point of view has to be done anyway...
>>
>> > On 22 Jun 2016, at 09:09, sandesh deshmane 
>> wrote:
>> >
>> > Hi,
>> >
>> > I am writing spark streaming application which reads messages from
>> Kafka.
>> >
>> > I am using checkpointing and write ahead logs ( WAL) to achieve fault
>> tolerance .
>> >
>> > I have created batch size of 10 sec for reading messages from kafka.
>> >
>> > I read messages for kakfa and generate the count of messages as per
>> values received from Kafka message.
>> >
>> > In case there is failure and my spark streaming application is
>> restarted I see duplicate messages processed ( which is close to 2 batches)
>> >
>> > The problem that I have is per sec I get around 300k messages and In
>> case application is restarted I see around 3-5 million duplicate counts.
>> >
>> > How to avoid such duplicates?
>> >
>> > what is best to way to recover from such failures ?
>> >
>> > Thanks
>> > Sandesh
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>
>
> --
> Yours faithfully, Denys Cherepanin
>


Re: how to avoid duplicate messages with spark streaming using checkpoint after restart in case of failure

2016-06-22 Thread Denys Cherepanin
Hi Sandesh,

As I understand you are using "receiver based" approach to integrate kafka
with spark streaming.

Did you tried "direct" approach

?
In this case offsets will be tracked by streaming app via check-pointing
and you should achieve exactly-once semantics

On Wed, Jun 22, 2016 at 5:58 AM, Jörn Franke  wrote:

>
> Spark Streamig does not guarantee exactly once for output action. It means
> that one item is only processed in an RDD.
> You can achieve at most once or at least once.
> You could however do at least once (via checkpoing) and record which
> messages have been proceed (some identifier available?) and do not re
> process them  You could also store (safely) what range has been already
> processed etc
>
> Think about the business case if exactly once is needed or if it can be
> replaced by one of the others.
> Exactly once, it needed requires in any system including spark more effort
> and usually the throughput is lower. A risk evaluation from a business
> point of view has to be done anyway...
>
> > On 22 Jun 2016, at 09:09, sandesh deshmane 
> wrote:
> >
> > Hi,
> >
> > I am writing spark streaming application which reads messages from Kafka.
> >
> > I am using checkpointing and write ahead logs ( WAL) to achieve fault
> tolerance .
> >
> > I have created batch size of 10 sec for reading messages from kafka.
> >
> > I read messages for kakfa and generate the count of messages as per
> values received from Kafka message.
> >
> > In case there is failure and my spark streaming application is restarted
> I see duplicate messages processed ( which is close to 2 batches)
> >
> > The problem that I have is per sec I get around 300k messages and In
> case application is restarted I see around 3-5 million duplicate counts.
> >
> > How to avoid such duplicates?
> >
> > what is best to way to recover from such failures ?
> >
> > Thanks
> > Sandesh
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


-- 
Yours faithfully, Denys Cherepanin


Re: how to avoid duplicate messages with spark streaming using checkpoint after restart in case of failure

2016-06-22 Thread sandesh deshmane
We are going with checkpointing . we don't have identifier available to
identify if the message is already processed or not .
Even if we had it, then it will slow down the processing as we do get 300k
messages per sec , so lookup will slow down.

Thanks
Sandesh

On Wed, Jun 22, 2016 at 3:28 PM, Jörn Franke  wrote:

>
> Spark Streamig does not guarantee exactly once for output action. It means
> that one item is only processed in an RDD.
> You can achieve at most once or at least once.
> You could however do at least once (via checkpoing) and record which
> messages have been proceed (some identifier available?) and do not re
> process them  You could also store (safely) what range has been already
> processed etc
>
> Think about the business case if exactly once is needed or if it can be
> replaced by one of the others.
> Exactly once, it needed requires in any system including spark more effort
> and usually the throughput is lower. A risk evaluation from a business
> point of view has to be done anyway...
>
> > On 22 Jun 2016, at 09:09, sandesh deshmane 
> wrote:
> >
> > Hi,
> >
> > I am writing spark streaming application which reads messages from Kafka.
> >
> > I am using checkpointing and write ahead logs ( WAL) to achieve fault
> tolerance .
> >
> > I have created batch size of 10 sec for reading messages from kafka.
> >
> > I read messages for kakfa and generate the count of messages as per
> values received from Kafka message.
> >
> > In case there is failure and my spark streaming application is restarted
> I see duplicate messages processed ( which is close to 2 batches)
> >
> > The problem that I have is per sec I get around 300k messages and In
> case application is restarted I see around 3-5 million duplicate counts.
> >
> > How to avoid such duplicates?
> >
> > what is best to way to recover from such failures ?
> >
> > Thanks
> > Sandesh
>


Re: how to avoid duplicate messages with spark streaming using checkpoint after restart in case of failure

2016-06-22 Thread Jörn Franke

Spark Streamig does not guarantee exactly once for output action. It means that 
one item is only processed in an RDD.
You can achieve at most once or at least once.
You could however do at least once (via checkpoing) and record which messages 
have been proceed (some identifier available?) and do not re process them  
You could also store (safely) what range has been already processed etc

Think about the business case if exactly once is needed or if it can be 
replaced by one of the others.
Exactly once, it needed requires in any system including spark more effort and 
usually the throughput is lower. A risk evaluation from a business point of 
view has to be done anyway...

> On 22 Jun 2016, at 09:09, sandesh deshmane  wrote:
> 
> Hi,
> 
> I am writing spark streaming application which reads messages from Kafka.
> 
> I am using checkpointing and write ahead logs ( WAL) to achieve fault 
> tolerance .
> 
> I have created batch size of 10 sec for reading messages from kafka.
> 
> I read messages for kakfa and generate the count of messages as per values 
> received from Kafka message.
> 
> In case there is failure and my spark streaming application is restarted I 
> see duplicate messages processed ( which is close to 2 batches)
> 
> The problem that I have is per sec I get around 300k messages and In case 
> application is restarted I see around 3-5 million duplicate counts.
> 
> How to avoid such duplicates?
> 
> what is best to way to recover from such failures ?
> 
> Thanks
> Sandesh

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: javax.net.ssl.SSLHandshakeException: unable to find valid certification path to requested target

2016-06-22 Thread Steve Loughran

On 21 Jun 2016, at 00:03, Utkarsh Sengar 
> wrote:

We are intermittently getting this error when spark tried to load data from 
S3:Caused by: sun.security.provider.certpath.SunCertPathBuilderException: 
unable to find valid certification path to requested target.

https://gist.githubusercontent.com/utkarsh2012/1c4cd2dc82c20c6f389b783927371bd7/raw/a1be6617d23b1744631427fe90aaa1cce4313f36/stacktrace

Running spark 1.5.1 on java8 and mesos-0.23.0
jets3t is 0.9.4

What can be the possible issue here - network, mesos or s3? It was working fine 
earlier.

--
Thanks,
-Utkarsh

Not S3, unless it's jets3t not being able to authenticate the AWS S3 endpoint. 
What's the full stack?


Re: how to avoid duplicate messages with spark streaming using checkpoint after restart in case of failure

2016-06-22 Thread Mich Talebzadeh
Hi Sandesh,

Where these messages end up? Are they written to a sink (file, database etc)

What is the reason your app fails. Can that be remedied to reduce the
impact.

How do you identify that duplicates are sent and processed?

Cheers,

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com



On 22 June 2016 at 10:38, sandesh deshmane  wrote:

> Mich Talebzadeh thanks for reply.
>
> we have retention policy of 4 hours for kafka messages and we have
> multiple other consumers which reads from kafka cluster. ( spark is one of
> them)
>
> we have timestamp in message, but we actually have multiple message with
> same time stamp. its very hard to differentiate.
>
> But we have some offset with kafka and consumer keeps tracks for offset
> and Consumer should read from offset.
>
> so its problem with kafka , not with Spark?
>
> Any way to rectify this?
> we don't have id in messages. if we keep id also , then we keep map where
> we will put the ids and mark them processed, but at run time i need to do
> that lookup and for us , the number of messages is very high, so look up
> will ad up in processing time ?
>
> Thanks
> Sandesh Deshmane
>
> On Wed, Jun 22, 2016 at 2:36 PM, Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>> Yes this is more of Kafka issue as Kafka send the messages again.
>>
>> In your topic do messages come with an ID or timestamp where you can
>> reject them if they have already been processed.
>>
>> In other words do you have a way what message was last processed via
>> Spark before failing.
>>
>> You can of course  reset Kafka retention time and purge it
>>
>>
>> ${KAFKA_HOME}/bin/kafka-topics.sh --zookeeper rhes564:2181 --alter
>> --topic newtopic --config retention.ms=1000
>>
>>
>>
>> Wait for a minute and then reset back
>>
>>
>>
>> ${KAFKA_HOME}/bin/kafka-topics.sh --zookeeper rhes564:2181 --alter
>> --topic newtopic --config retention.ms=60
>>
>>
>>
>> ${KAFKA_HOME}/bin/kafka-console-consumer.sh --zookeeper rhes564:2181
>> --from-beginning --topic newtopic
>>
>>
>>
>> HTH
>>
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> *
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>>
>> On 22 June 2016 at 09:57, sandesh deshmane 
>> wrote:
>>
>>> Here I refer to failure in spark app.
>>>
>>> So When I restart , i see duplicate messages.
>>>
>>> To replicate the scenario , i just do kill mysparkapp and then restart .
>>>
>>> On Wed, Jun 22, 2016 at 1:10 PM, Mich Talebzadeh <
>>> mich.talebza...@gmail.com> wrote:
>>>
 As I see it you are using Spark streaming to read data from source
 through Kafka. Your batch interval is 10 sec, so in that interval you have
 10*300K = 3Milion messages

 When you say there is failure are you referring to the failure in the
 source or in Spark streaming app?

 HTH

 Dr Mich Talebzadeh



 LinkedIn * 
 https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
 *



 http://talebzadehmich.wordpress.com



 On 22 June 2016 at 08:09, sandesh deshmane 
 wrote:

> Hi,
>
> I am writing spark streaming application which reads messages from
> Kafka.
>
> I am using checkpointing and write ahead logs ( WAL) to achieve fault
> tolerance .
>
> I have created batch size of 10 sec for reading messages from kafka.
>
> I read messages for kakfa and generate the count of messages as per
> values received from Kafka message.
>
> In case there is failure and my spark streaming application is
> restarted I see duplicate messages processed ( which is close to 2 
> batches)
>
> The problem that I have is per sec I get around 300k messages and In
> case application is restarted I see around 3-5 million duplicate counts.
>
> How to avoid such duplicates?
>
> what is best to way to recover from such failures ?
>
> Thanks
> Sandesh
>


>>>
>>
>


Re: how to avoid duplicate messages with spark streaming using checkpoint after restart in case of failure

2016-06-22 Thread sandesh deshmane
Mich Talebzadeh thanks for reply.

we have retention policy of 4 hours for kafka messages and we have multiple
other consumers which reads from kafka cluster. ( spark is one of them)

we have timestamp in message, but we actually have multiple message with
same time stamp. its very hard to differentiate.

But we have some offset with kafka and consumer keeps tracks for offset and
Consumer should read from offset.

so its problem with kafka , not with Spark?

Any way to rectify this?
we don't have id in messages. if we keep id also , then we keep map where
we will put the ids and mark them processed, but at run time i need to do
that lookup and for us , the number of messages is very high, so look up
will ad up in processing time ?

Thanks
Sandesh Deshmane

On Wed, Jun 22, 2016 at 2:36 PM, Mich Talebzadeh 
wrote:

> Yes this is more of Kafka issue as Kafka send the messages again.
>
> In your topic do messages come with an ID or timestamp where you can
> reject them if they have already been processed.
>
> In other words do you have a way what message was last processed via Spark
> before failing.
>
> You can of course  reset Kafka retention time and purge it
>
>
> ${KAFKA_HOME}/bin/kafka-topics.sh --zookeeper rhes564:2181 --alter --topic
> newtopic --config retention.ms=1000
>
>
>
> Wait for a minute and then reset back
>
>
>
> ${KAFKA_HOME}/bin/kafka-topics.sh --zookeeper rhes564:2181 --alter --topic
> newtopic --config retention.ms=60
>
>
>
> ${KAFKA_HOME}/bin/kafka-console-consumer.sh --zookeeper rhes564:2181
> --from-beginning --topic newtopic
>
>
>
> HTH
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 22 June 2016 at 09:57, sandesh deshmane  wrote:
>
>> Here I refer to failure in spark app.
>>
>> So When I restart , i see duplicate messages.
>>
>> To replicate the scenario , i just do kill mysparkapp and then restart .
>>
>> On Wed, Jun 22, 2016 at 1:10 PM, Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> As I see it you are using Spark streaming to read data from source
>>> through Kafka. Your batch interval is 10 sec, so in that interval you have
>>> 10*300K = 3Milion messages
>>>
>>> When you say there is failure are you referring to the failure in the
>>> source or in Spark streaming app?
>>>
>>> HTH
>>>
>>> Dr Mich Talebzadeh
>>>
>>>
>>>
>>> LinkedIn * 
>>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>> *
>>>
>>>
>>>
>>> http://talebzadehmich.wordpress.com
>>>
>>>
>>>
>>> On 22 June 2016 at 08:09, sandesh deshmane 
>>> wrote:
>>>
 Hi,

 I am writing spark streaming application which reads messages from
 Kafka.

 I am using checkpointing and write ahead logs ( WAL) to achieve fault
 tolerance .

 I have created batch size of 10 sec for reading messages from kafka.

 I read messages for kakfa and generate the count of messages as per
 values received from Kafka message.

 In case there is failure and my spark streaming application is
 restarted I see duplicate messages processed ( which is close to 2 batches)

 The problem that I have is per sec I get around 300k messages and In
 case application is restarted I see around 3-5 million duplicate counts.

 How to avoid such duplicates?

 what is best to way to recover from such failures ?

 Thanks
 Sandesh

>>>
>>>
>>
>


Re: Spark 1.5.2 - Different results from reduceByKey over multiple iterations

2016-06-22 Thread Ted Yu
For the run which returned incorrect result, did you observe any error (on
workers) ?

Cheers

On Tue, Jun 21, 2016 at 10:42 PM, Nirav Patel  wrote:

> I have an RDD[String, MyObj] which is a result of Join + Map operation. It
> has no partitioner info. I run reduceByKey without passing any Partitioner
> or partition counts.  I observed that output aggregation result for given
> key is incorrect sometime. like 1 out of 5 times. It looks like reduce
> operation is joining values from two different keys. There is no
> configuration change between multiple runs. I am scratching my head over
> this. I verified results by printing out RDD before and after reduce
> operation; collecting subset at driver.
>
> Besides shuffle and storage memory fraction I use following options:
>
> sparkConf.set("spark.driver.userClassPathFirst","true")
> sparkConf.set("spark.unsafe.offHeap","true")
> sparkConf.set("spark.reducer.maxSizeInFlight","128m")
> sparkConf.set("spark.serializer",
> "org.apache.spark.serializer.KryoSerializer")
>
>
>
> [image: What's New with Xactly] 
>
>   [image: LinkedIn]
>   [image: Twitter]
>   [image: Facebook]
>   [image: YouTube]
> 


Re: how to avoid duplicate messages with spark streaming using checkpoint after restart in case of failure

2016-06-22 Thread Mich Talebzadeh
Yes this is more of Kafka issue as Kafka send the messages again.

In your topic do messages come with an ID or timestamp where you can reject
them if they have already been processed.

In other words do you have a way what message was last processed via Spark
before failing.

You can of course  reset Kafka retention time and purge it


${KAFKA_HOME}/bin/kafka-topics.sh --zookeeper rhes564:2181 --alter --topic
newtopic --config retention.ms=1000



Wait for a minute and then reset back



${KAFKA_HOME}/bin/kafka-topics.sh --zookeeper rhes564:2181 --alter --topic
newtopic --config retention.ms=60



${KAFKA_HOME}/bin/kafka-console-consumer.sh --zookeeper rhes564:2181
--from-beginning --topic newtopic



HTH


Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com



On 22 June 2016 at 09:57, sandesh deshmane  wrote:

> Here I refer to failure in spark app.
>
> So When I restart , i see duplicate messages.
>
> To replicate the scenario , i just do kill mysparkapp and then restart .
>
> On Wed, Jun 22, 2016 at 1:10 PM, Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>> As I see it you are using Spark streaming to read data from source
>> through Kafka. Your batch interval is 10 sec, so in that interval you have
>> 10*300K = 3Milion messages
>>
>> When you say there is failure are you referring to the failure in the
>> source or in Spark streaming app?
>>
>> HTH
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> *
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>>
>> On 22 June 2016 at 08:09, sandesh deshmane 
>> wrote:
>>
>>> Hi,
>>>
>>> I am writing spark streaming application which reads messages from Kafka.
>>>
>>> I am using checkpointing and write ahead logs ( WAL) to achieve fault
>>> tolerance .
>>>
>>> I have created batch size of 10 sec for reading messages from kafka.
>>>
>>> I read messages for kakfa and generate the count of messages as per
>>> values received from Kafka message.
>>>
>>> In case there is failure and my spark streaming application is restarted
>>> I see duplicate messages processed ( which is close to 2 batches)
>>>
>>> The problem that I have is per sec I get around 300k messages and In
>>> case application is restarted I see around 3-5 million duplicate counts.
>>>
>>> How to avoid such duplicates?
>>>
>>> what is best to way to recover from such failures ?
>>>
>>> Thanks
>>> Sandesh
>>>
>>
>>
>


Re: how to avoid duplicate messages with spark streaming using checkpoint after restart in case of failure

2016-06-22 Thread sandesh deshmane
Here I refer to failure in spark app.

So When I restart , i see duplicate messages.

To replicate the scenario , i just do kill mysparkapp and then restart .

On Wed, Jun 22, 2016 at 1:10 PM, Mich Talebzadeh 
wrote:

> As I see it you are using Spark streaming to read data from source through
> Kafka. Your batch interval is 10 sec, so in that interval you have 10*300K
> = 3Milion messages
>
> When you say there is failure are you referring to the failure in the
> source or in Spark streaming app?
>
> HTH
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 22 June 2016 at 08:09, sandesh deshmane  wrote:
>
>> Hi,
>>
>> I am writing spark streaming application which reads messages from Kafka.
>>
>> I am using checkpointing and write ahead logs ( WAL) to achieve fault
>> tolerance .
>>
>> I have created batch size of 10 sec for reading messages from kafka.
>>
>> I read messages for kakfa and generate the count of messages as per
>> values received from Kafka message.
>>
>> In case there is failure and my spark streaming application is restarted
>> I see duplicate messages processed ( which is close to 2 batches)
>>
>> The problem that I have is per sec I get around 300k messages and In case
>> application is restarted I see around 3-5 million duplicate counts.
>>
>> How to avoid such duplicates?
>>
>> what is best to way to recover from such failures ?
>>
>> Thanks
>> Sandesh
>>
>
>


Re: FullOuterJoin on Spark

2016-06-22 Thread Gourav Sengupta
+1 for the guidance from Nirvan. Also it would be better to repartition and
store the data in parquet format in case you are planning to do the joins
more than once or with other data sources. Parquet with SPARK works likes a
charm. Over S3 I have seen its performance being quite close to cached data
over a few million records.

Regards,
Gourav

On Wed, Jun 22, 2016 at 7:11 AM, Nirav Patel  wrote:

> Can your domain list fit in memory of one executor. if so you can use
> broadcast join.
>
> You can always narrow down to inner join and derive rest from original set
> if memory is issue there. If you are just concerned about shuffle memory
> then to reduce amount of shuffle you can do following:
> 1) partition both rdd (dataframes) with same partitioner with same count
> so corresponding data will on on same node at least
> 2) increase shuffle.memoryfraction
>
> you can use dataframes with spark 1.6 or greater to further reduce memory
> footprint. I haven't tested that though.
>
>
> On Tue, Jun 21, 2016 at 6:16 AM, Rychnovsky, Dusan <
> dusan.rychnov...@firma.seznam.cz> wrote:
>
>> Hi,
>>
>>
>> can somebody please explain the way FullOuterJoin works on Spark? Does
>> each intersection get fully loaded to memory?
>>
>> My problem is as follows:
>>
>>
>> I have two large data-sets:
>>
>>
>> * a list of web pages,
>>
>> * a list of domain-names with specific rules for processing pages from
>> that domain.
>>
>>
>> I am joining these web-pages with processing rules.
>>
>>
>> For certain domains there are millions of web-pages.
>>
>>
>> Based on the memory demands the join is having it looks like the whole
>> intersection (i.e. a domain + all corresponding pages) are kept in memory
>> while processing.
>>
>>
>> What I really need in this case, though, is to hold just the domain and
>> iterate over all corresponding pages, one at a time.
>>
>>
>> What would be the best way to do this on Spark?
>>
>> Thank you,
>>
>> Dusan Rychnovsky
>>
>>
>>
>
>
>
> [image: What's New with Xactly] 
>
>   [image: LinkedIn]
>   [image: Twitter]
>   [image: Facebook]
>   [image: YouTube]
> 


'numBins' property not honoured in BinaryClassificationMetrics class when spark.default.parallelism is not set to 1

2016-06-22 Thread sneha29shukla
Hi, 

I'm trying to use the BinaryClassificationMetrics class to compute the pr
curve as below - 

import org.apache.avro.generic.GenericRecord
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.{SparkConf, SparkContext}

object TestCode {

  def main(args: Array[String]): Unit = {

val sparkConf = new
SparkConf().setAppName("HBaseRead").setMaster("local")

sparkConf.set("spark.default.parallelism","1")

sparkConf.set("spark.serializer",
"org.apache.spark.serializer.KryoSerializer")
sparkConf.registerKryoClasses(Array(classOf[GenericRecord],
classOf[LabelledData], classOf[Configuration]))

val sc = new SparkContext(sparkConf)

val jobConf = new JobConf(sc.hadoopConfiguration)

val rdd = sc.hadoopFile(
  "sampleData",
  classOf[org.apache.avro.mapred.AvroInputFormat[GenericRecord]],
  classOf[org.apache.avro.mapred.AvroWrapper[GenericRecord]],
  classOf[org.apache.hadoop.io.NullWritable],2)

println("Original Partitions : "+rdd.partitions.size)

val anotherRDD = rdd.map(row => row._1.datum).map(rowValue =>
rowValue.get("value").toString.split("\\|"))

println("Another RDD partitions : "+anotherRDD.partitions.size)

var res = scala.collection.mutable.ListBuffer[(Double, Double)]()

val yetAnotherRDD = anotherRDD.mapPartitions[(Double, Double)](iterator
=> {
  while (iterator.hasNext) {
val array = iterator.next
val iter = array.iterator
val prediction = iter.next.toDouble
val label = iter.next.toDouble
val t = (prediction, label)
res += t
  }
  res.iterator
}).map(doubles => (doubles._1, doubles._2))

println("yet anohter rdd partitions : "+yetAnotherRDD.partitions.size)

//Sample data in yetAnotherRDD
//(0.0025952152930881676,0.0)
//(8.08581095750238E-5,0.0)
//(0.1420529729314534,0.0)
//(1.287933787473423,0.0)
//(0.007534799826226573,0.0)
//(0.008488829931163747,0.0)
//(1.441921051791096,0.0)
//(0.0036552783890398343,0.0)
//(2.3833004789198267,0.0)
//(0.3695065893117973,0.0)

//Metrics Calculation. Explicitly setting numBins to 10
val metrics = new BinaryClassificationMetrics(yetAnotherRDD, 10)

val pr = metrics.pr().collect()

val thr = metrics.thresholds().collect()

val joined =
metrics.precisionByThreshold().join(metrics.recallByThreshold()).collect()

println(joined.size)

println(thr.size)

println(pr.size)
  }

}

In the local mode, my local machine as 2 cores, and hence I set the
minPartitions in the original RDD to 2 (based on suggestions here :
http://apache-spark-user-list.1001560.n3.nabble.com/Error-when-testing-with-large-sparse-svm-td9592.html#a10010)

However, upon experimenting a bit, it turns out that the numBins property in
BinaryClassificationMetrics class is not honoured in case the
"spark.default.parallelism" property is not set to 1.
AFAIU, the numBins should downsample the input RDD, as documented here :
https://spark.apache.org/docs/1.6.1/api/java/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.html
 

 When "spark.default.parallelism" is set to 1, the size of the thesholds and
pr curve is near about the numBins, as documented here
In case I make it 100, the size of the thresholds in the
BinaryClassification class becomes ~100 and so on. 

Am I missing something here? In case the dataset on which pr is being
computed is huge, wouldn't setting parallelism to 1 impact performance? 

I am using spark 1.6.1 in local mode for this experiment. Using spark 1.5.1
in cluster mode has a similar results. 

Any pointers/help would be appreciated!

Thanks!



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/numBins-property-not-honoured-in-BinaryClassificationMetrics-class-when-spark-default-parallelism-is1-tp27204.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: how to avoid duplicate messages with spark streaming using checkpoint after restart in case of failure

2016-06-22 Thread Mich Talebzadeh
As I see it you are using Spark streaming to read data from source through
Kafka. Your batch interval is 10 sec, so in that interval you have 10*300K
= 3Milion messages

When you say there is failure are you referring to the failure in the
source or in Spark streaming app?

HTH

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com



On 22 June 2016 at 08:09, sandesh deshmane  wrote:

> Hi,
>
> I am writing spark streaming application which reads messages from Kafka.
>
> I am using checkpointing and write ahead logs ( WAL) to achieve fault
> tolerance .
>
> I have created batch size of 10 sec for reading messages from kafka.
>
> I read messages for kakfa and generate the count of messages as per values
> received from Kafka message.
>
> In case there is failure and my spark streaming application is restarted I
> see duplicate messages processed ( which is close to 2 batches)
>
> The problem that I have is per sec I get around 300k messages and In case
> application is restarted I see around 3-5 million duplicate counts.
>
> How to avoid such duplicates?
>
> what is best to way to recover from such failures ?
>
> Thanks
> Sandesh
>


Re: Does saveAsHadoopFile depend on master?

2016-06-22 Thread Spico Florin
Hi!
 I had a similar issue when the user that submit the job to the spark
cluster didn't have permission to write into the hdfs. If you have the hdfs
GUI then you can check which users are and what permissions. Also can in
hdfs browser:(
http://stackoverflow.com/questions/27996034/opening-a-hdfs-file-in-browser.
) if your folder structure was created.

If you have  yarn, an example ciuld be  spark-submit --class "yopur.class"
--master yarn-client --num-executors 12 --executor-memory 16g
--driver-memory 8g --executor-cores 8 .jar
 hdfs:output

I hope it helps.
 Regards,\ Florin

On Wed, Jun 22, 2016 at 4:57 AM, Jeff Zhang  wrote:

> Please check the driver and executor log, there should be logs about where
> the data is written.
>
>
>
> On Wed, Jun 22, 2016 at 2:03 AM, Pierre Villard <
> pierre.villard...@gmail.com> wrote:
>
>> Hi,
>>
>> I have a Spark job writing files to HDFS using .saveAsHadoopFile method.
>>
>> If I run my job in local/client mode, it works as expected and I get all
>> my files written in HDFS. However if I change to yarn/cluster mode, I don't
>> see any error logs (the job is successful) and there is no files written to
>> HDFS.
>>
>> Is there any reason for this behavior? Any thoughts on how to track down
>> what is happening here?
>>
>> Thanks!
>>
>> Pierre.
>>
>
>
>
> --
> Best Regards
>
> Jeff Zhang
>


Re: Spark 1.5.2 - Different results from reduceByKey over multiple iterations

2016-06-22 Thread Takeshi Yamamuro
Hi,

Could you check the issue also occurs in v1.6.1 and v2.0?

// maropu

On Wed, Jun 22, 2016 at 2:42 PM, Nirav Patel  wrote:

> I have an RDD[String, MyObj] which is a result of Join + Map operation. It
> has no partitioner info. I run reduceByKey without passing any Partitioner
> or partition counts.  I observed that output aggregation result for given
> key is incorrect sometime. like 1 out of 5 times. It looks like reduce
> operation is joining values from two different keys. There is no
> configuration change between multiple runs. I am scratching my head over
> this. I verified results by printing out RDD before and after reduce
> operation; collecting subset at driver.
>
> Besides shuffle and storage memory fraction I use following options:
>
> sparkConf.set("spark.driver.userClassPathFirst","true")
> sparkConf.set("spark.unsafe.offHeap","true")
> sparkConf.set("spark.reducer.maxSizeInFlight","128m")
> sparkConf.set("spark.serializer",
> "org.apache.spark.serializer.KryoSerializer")
>
>
>
> [image: What's New with Xactly] 
>
>   [image: LinkedIn]
>   [image: Twitter]
>   [image: Facebook]
>   [image: YouTube]
> 




-- 
---
Takeshi Yamamuro


?????? Could not find or load main class org.apache.spark.deploy.yarn.ExecutorLauncher

2016-06-22 Thread ????????
yes??it run well








shihj@master:/usr/local/spark/spark-1.6.1-bin-hadoop2.6$ ./bin/spark-submit \
> --class org.apache.spark.examples.SparkPi \
> --master local[4] \
> lib/spark-examples-1.6.1-hadoop2.6.0.jar 10
16/06/22 15:08:14 INFO SparkContext: Running Spark version 1.6.1
16/06/22 15:08:14 WARN SparkConf: 
SPARK_WORKER_INSTANCES was detected (set to '1').
This is deprecated in Spark 1.0+.


Please instead use:
 - ./spark-submit with --num-executors to specify the number of executors
 - Or set SPARK_EXECUTOR_INSTANCES
 - spark.executor.instances to configure the number of instances in the spark 
config.

16/06/22 15:08:15 INFO SecurityManager: Changing view acls to: shihj
16/06/22 15:08:15 INFO SecurityManager: Changing modify acls to: shihj
16/06/22 15:08:15 INFO SecurityManager: SecurityManager: authentication 
disabled; ui acls disabled; users with view permissions: Set(shihj); users with 
modify permissions: Set(shihj)
16/06/22 15:08:16 INFO Utils: Successfully started service 'sparkDriver' on 
port 43865.
16/06/22 15:08:16 INFO Slf4jLogger: Slf4jLogger started
16/06/22 15:08:16 INFO Remoting: Starting remoting
16/06/22 15:08:17 INFO Remoting: Remoting started; listening on addresses 
:[akka.tcp://sparkDriverActorSystem@192.168.20.137:39308]
16/06/22 15:08:17 INFO Utils: Successfully started service 
'sparkDriverActorSystem' on port 39308.
16/06/22 15:08:17 INFO SparkEnv: Registering MapOutputTracker
16/06/22 15:08:17 INFO SparkEnv: Registering BlockManagerMaster
16/06/22 15:08:17 INFO DiskBlockManager: Created local directory at 
/tmp/blockmgr-3195b7f2-126d-4734-a681-6ec00727352a
16/06/22 15:08:17 INFO MemoryStore: MemoryStore started with capacity 511.1 MB
16/06/22 15:08:17 INFO SparkEnv: Registering OutputCommitCoordinator
16/06/22 15:08:18 INFO Utils: Successfully started service 'SparkUI' on port 
4040.
16/06/22 15:08:18 INFO SparkUI: Started SparkUI at http://192.168.20.137:4040
16/06/22 15:08:18 INFO HttpFileServer: HTTP File server directory is 
/tmp/spark-c91a579d-1a18-4f75-ae05-137d9a286080/httpd-961023ad-cc05-4e3e-b648-19581093df11
16/06/22 15:08:18 INFO HttpServer: Starting HTTP Server
16/06/22 15:08:18 INFO Utils: Successfully started service 'HTTP file server' 
on port 49924.
16/06/22 15:08:22 INFO SparkContext: Added JAR 
file:/usr/local/spark/spark-1.6.1-bin-hadoop2.6/lib/spark-examples-1.6.1-hadoop2.6.0.jar
 at http://192.168.20.137:49924/jars/spark-examples-1.6.1-hadoop2.6.0.jar with 
timestamp 1466579302122
16/06/22 15:08:22 INFO Executor: Starting executor ID driver on host localhost
16/06/22 15:08:22 INFO Utils: Successfully started service 
'org.apache.spark.network.netty.NettyBlockTransferService' on port 33520.
16/06/22 15:08:22 INFO NettyBlockTransferService: Server created on 33520
16/06/22 15:08:22 INFO BlockManagerMaster: Trying to register BlockManager
16/06/22 15:08:22 INFO BlockManagerMasterEndpoint: Registering block manager 
localhost:33520 with 511.1 MB RAM, BlockManagerId(driver, localhost, 33520)
16/06/22 15:08:22 INFO BlockManagerMaster: Registered BlockManager
16/06/22 15:08:23 INFO SparkContext: Starting job: reduce at SparkPi.scala:36
16/06/22 15:08:23 INFO DAGScheduler: Got job 0 (reduce at SparkPi.scala:36) 
with 10 output partitions
16/06/22 15:08:23 INFO DAGScheduler: Final stage: ResultStage 0 (reduce at 
SparkPi.scala:36)
16/06/22 15:08:23 INFO DAGScheduler: Parents of final stage: List()
16/06/22 15:08:23 INFO DAGScheduler: Missing parents: List()
16/06/22 15:08:23 INFO DAGScheduler: Submitting ResultStage 0 
(MapPartitionsRDD[1] at map at SparkPi.scala:32), which has no missing parents
16/06/22 15:08:23 WARN SizeEstimator: Failed to check whether UseCompressedOops 
is set; assuming yes
16/06/22 15:08:23 INFO MemoryStore: Block broadcast_0 stored as values in 
memory (estimated size 1904.0 B, free 1904.0 B)
16/06/22 15:08:24 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in 
memory (estimated size 1216.0 B, free 3.0 KB)
16/06/22 15:08:24 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 
localhost:33520 (size: 1216.0 B, free: 511.1 MB)
16/06/22 15:08:24 INFO SparkContext: Created broadcast 0 from broadcast at 
DAGScheduler.scala:1006
16/06/22 15:08:24 INFO DAGScheduler: Submitting 10 missing tasks from 
ResultStage 0 (MapPartitionsRDD[1] at map at SparkPi.scala:32)
16/06/22 15:08:24 INFO TaskSchedulerImpl: Adding task set 0.0 with 10 tasks
16/06/22 15:08:24 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, 
localhost, partition 0,PROCESS_LOCAL, 2157 bytes)
16/06/22 15:08:24 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, 
localhost, partition 1,PROCESS_LOCAL, 2157 bytes)
16/06/22 15:08:24 INFO TaskSetManager: Starting task 2.0 in stage 0.0 (TID 2, 
localhost, partition 2,PROCESS_LOCAL, 2157 bytes)
16/06/22 15:08:24 INFO TaskSetManager: Starting task 3.0 in stage 0.0 (TID 3, 
localhost, partition 3,PROCESS_LOCAL, 2157 bytes)
16/06/22 15:08:24 INFO Executor: Running task 2.0 in stage 0.0 (TID 2)

how to avoid duplicate messages with spark streaming using checkpoint after restart in case of failure

2016-06-22 Thread sandesh deshmane
Hi,

I am writing spark streaming application which reads messages from Kafka.

I am using checkpointing and write ahead logs ( WAL) to achieve fault
tolerance .

I have created batch size of 10 sec for reading messages from kafka.

I read messages for kakfa and generate the count of messages as per values
received from Kafka message.

In case there is failure and my spark streaming application is restarted I
see duplicate messages processed ( which is close to 2 batches)

The problem that I have is per sec I get around 300k messages and In case
application is restarted I see around 3-5 million duplicate counts.

How to avoid such duplicates?

what is best to way to recover from such failures ?

Thanks
Sandesh


Re: Could not find or load main class org.apache.spark.deploy.yarn.ExecutorLauncher

2016-06-22 Thread Yash Sharma
I cannot get a lot of info from these logs but it surely seems like yarn
setup issue. Did you try the local mode to check if it works -

./bin/spark-submit \
> --class org.apache.spark.examples.SparkPi \
> --master local[4] \
> spark-examples-1.6.1-hadoop2.6.0.jar 10


Note - the jar is a local one

On Wed, Jun 22, 2016 at 4:50 PM, 另一片天 <958943...@qq.com> wrote:

>  Application application_1466568126079_0006 failed 2 times due to AM
> Container for appattempt_1466568126079_0006_02 exited with exitCode: 1
> For more detailed output, check application tracking page:
> http://master:8088/proxy/application_1466568126079_0006/Then, click on
> links to logs of each attempt.
> Diagnostics: Exception from container-launch.
> Container id: container_1466568126079_0006_02_01
> Exit code: 1
> Stack trace: ExitCodeException exitCode=1:
> at org.apache.hadoop.util.Shell.runCommand(Shell.java:538)
> at org.apache.hadoop.util.Shell.run(Shell.java:455)
> at
> org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:715)
> at
> org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:212)
> at
> org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302)
> at
> org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> Container exited with a non-zero exit code 1
> Failing this attempt. Failing the application.
>
> but command get error
>
> shihj@master:~/workspace/hadoop-2.6.4$ yarn logs -applicationId
> application_1466568126079_0006
> Usage: yarn [options]
>
> yarn: error: no such option: -a
>
>
>
> -- 原始邮件 --
> *发件人:* "Yash Sharma";;
> *发送时间:* 2016年6月22日(星期三) 下午2:46
> *收件人:* "另一片天"<958943...@qq.com>;
> *抄送:* "Saisai Shao"; "user";
>
> *主题:* Re: Could not find or load main class
> org.apache.spark.deploy.yarn.ExecutorLauncher
>
> Are you able to run anything else on the cluster, I suspect its yarn that
> not able to run the class. If you could just share the logs in pastebin we
> could confirm that.
>
> On Wed, Jun 22, 2016 at 4:43 PM, 另一片天 <958943...@qq.com> wrote:
>
>> i  want to avoid Uploading resource file (especially jar package),because
>> them very big,the application will wait for too long,there are good method??
>> so i config that para, but not get the my want to effect。
>>
>>
>> -- 原始邮件 --
>> *发件人:* "Yash Sharma";;
>> *发送时间:* 2016年6月22日(星期三) 下午2:34
>> *收件人:* "另一片天"<958943...@qq.com>;
>> *抄送:* "Saisai Shao"; "user";
>>
>> *主题:* Re: Could not find or load main class
>> org.apache.spark.deploy.yarn.ExecutorLauncher
>>
>> Try with : --master yarn-cluster
>>
>> On Wed, Jun 22, 2016 at 4:30 PM, 另一片天 <958943...@qq.com> wrote:
>>
>>> ./bin/spark-submit --class org.apache.spark.examples.SparkPi --master
>>> yarn-client --driver-memory 512m --num-executors 2 --executor-memory 512m
>>> --executor-cores 2
>>> hdfs://master:9000/user/shihj/spark_lib/spark-examples-1.6.1-hadoop2.6.0.jar
>>> 10
>>> Warning: Skip remote jar
>>> hdfs://master:9000/user/shihj/spark_lib/spark-examples-1.6.1-hadoop2.6.0.jar.
>>> java.lang.ClassNotFoundException: org.apache.spark.examples.SparkPi
>>> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>>> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>>> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>>> at java.lang.Class.forName0(Native Method)
>>> at java.lang.Class.forName(Class.java:348)
>>> at org.apache.spark.util.Utils$.classForName(Utils.scala:174)
>>> at
>>> org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:689)
>>> at
>>> org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
>>> at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
>>> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
>>> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>>>
>>>
>>>
>>> -- 原始邮件 --
>>> *发件人:* "Yash Sharma";;
>>> *发送时间:* 2016年6月22日(星期三) 下午2:28
>>> *收件人:* "另一片天"<958943...@qq.com>;
>>> *抄送:* "Saisai Shao"; "user"<
>>> user@spark.apache.org>;
>>> *主题:* Re: Could not find or load main class
>>> org.apache.spark.deploy.yarn.ExecutorLauncher
>>>
>>> Or better , try the master as yarn-cluster,
>>>
>>> ./bin/spark-submit \
>>> --class org.apache.spark.examples.SparkPi \
>>> --master yarn-cluster \
>>> --driver-memory 512m \
>>> --num-executors 

?????? Could not find or load main class org.apache.spark.deploy.yarn.ExecutorLauncher

2016-06-22 Thread ????????
Application application_1466568126079_0006 failed 2 times due to AM Container 
for appattempt_1466568126079_0006_02 exited with exitCode: 1
For more detailed output, check application tracking 
page:http://master:8088/proxy/application_1466568126079_0006/Then, click on 
links to logs of each attempt.
Diagnostics: Exception from container-launch.
Container id: container_1466568126079_0006_02_01
Exit code: 1
Stack trace: ExitCodeException exitCode=1:
at org.apache.hadoop.util.Shell.runCommand(Shell.java:538)
at org.apache.hadoop.util.Shell.run(Shell.java:455)
at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:715)
at 
org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:212)
at 
org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302)
at 
org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Container exited with a non-zero exit code 1
Failing this attempt. Failing the application. 



but command get error


shihj@master:~/workspace/hadoop-2.6.4$ yarn logs -applicationId 
application_1466568126079_0006
Usage: yarn [options]


yarn: error: no such option: -a







--  --
??: "Yash Sharma";;
: 2016??6??22??(??) 2:46
??: ""<958943...@qq.com>; 
: "Saisai Shao"; "user"; 
: Re: Could not find or load main class 
org.apache.spark.deploy.yarn.ExecutorLauncher



Are you able to run anything else on the cluster, I suspect its yarn that not 
able to run the class. If you could just share the logs in pastebin we could 
confirm that.

On Wed, Jun 22, 2016 at 4:43 PM,  <958943...@qq.com> wrote:
i  want to avoid Uploading resource file ??especially jar packagebecause 
them very big??the application will wait for too long??there are good method
so i config that para?? but not get the my want to effect??




--  --
??: "Yash Sharma";;
: 2016??6??22??(??) 2:34
??: ""<958943...@qq.com>; 
: "Saisai Shao"; "user"; 
: Re: Could not find or load main class 
org.apache.spark.deploy.yarn.ExecutorLauncher





Try with : --master yarn-cluster

On Wed, Jun 22, 2016 at 4:30 PM,  <958943...@qq.com> wrote:
./bin/spark-submit --class org.apache.spark.examples.SparkPi --master 
yarn-client --driver-memory 512m --num-executors 2 --executor-memory 512m 
--executor-cores 2   
hdfs://master:9000/user/shihj/spark_lib/spark-examples-1.6.1-hadoop2.6.0.jar 10
Warning: Skip remote jar 
hdfs://master:9000/user/shihj/spark_lib/spark-examples-1.6.1-hadoop2.6.0.jar.
java.lang.ClassNotFoundException: org.apache.spark.examples.SparkPi
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at org.apache.spark.util.Utils$.classForName(Utils.scala:174)
at 
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:689)
at 
org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)







--  --
??: "Yash Sharma";;
: 2016??6??22??(??) 2:28
??: ""<958943...@qq.com>; 
: "Saisai Shao"; "user"; 
: Re: Could not find or load main class 
org.apache.spark.deploy.yarn.ExecutorLauncher





Or better , try the master as yarn-cluster,

./bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn-cluster \
--driver-memory 512m \
--num-executors 2 \
--executor-memory 512m \
--executor-cores 2 \
hdfs://master:9000/user/shihj/spark_lib/spark-examples-1.6.1-hadoop2.6.0.jar



On Wed, Jun 22, 2016 at 4:27 PM,  <958943...@qq.com> wrote:
Is it able to run on local mode ?


what mean?? standalone mode ?




--  --
??: "Yash Sharma";;
: 2016??6??22??(??) 2:18
??: "Saisai Shao"; 
: 

Re: Could not find or load main class org.apache.spark.deploy.yarn.ExecutorLauncher

2016-06-22 Thread Yash Sharma
Are you able to run anything else on the cluster, I suspect its yarn that
not able to run the class. If you could just share the logs in pastebin we
could confirm that.

On Wed, Jun 22, 2016 at 4:43 PM, 另一片天 <958943...@qq.com> wrote:

> i  want to avoid Uploading resource file (especially jar package),because
> them very big,the application will wait for too long,there are good method??
> so i config that para, but not get the my want to effect。
>
>
> -- 原始邮件 --
> *发件人:* "Yash Sharma";;
> *发送时间:* 2016年6月22日(星期三) 下午2:34
> *收件人:* "另一片天"<958943...@qq.com>;
> *抄送:* "Saisai Shao"; "user";
>
> *主题:* Re: Could not find or load main class
> org.apache.spark.deploy.yarn.ExecutorLauncher
>
> Try with : --master yarn-cluster
>
> On Wed, Jun 22, 2016 at 4:30 PM, 另一片天 <958943...@qq.com> wrote:
>
>> ./bin/spark-submit --class org.apache.spark.examples.SparkPi --master
>> yarn-client --driver-memory 512m --num-executors 2 --executor-memory 512m
>> --executor-cores 2
>> hdfs://master:9000/user/shihj/spark_lib/spark-examples-1.6.1-hadoop2.6.0.jar
>> 10
>> Warning: Skip remote jar
>> hdfs://master:9000/user/shihj/spark_lib/spark-examples-1.6.1-hadoop2.6.0.jar.
>> java.lang.ClassNotFoundException: org.apache.spark.examples.SparkPi
>> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>> at java.lang.Class.forName0(Native Method)
>> at java.lang.Class.forName(Class.java:348)
>> at org.apache.spark.util.Utils$.classForName(Utils.scala:174)
>> at
>> org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:689)
>> at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
>> at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
>> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
>> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>>
>>
>>
>> -- 原始邮件 --
>> *发件人:* "Yash Sharma";;
>> *发送时间:* 2016年6月22日(星期三) 下午2:28
>> *收件人:* "另一片天"<958943...@qq.com>;
>> *抄送:* "Saisai Shao"; "user";
>>
>> *主题:* Re: Could not find or load main class
>> org.apache.spark.deploy.yarn.ExecutorLauncher
>>
>> Or better , try the master as yarn-cluster,
>>
>> ./bin/spark-submit \
>> --class org.apache.spark.examples.SparkPi \
>> --master yarn-cluster \
>> --driver-memory 512m \
>> --num-executors 2 \
>> --executor-memory 512m \
>> --executor-cores 2 \
>> hdfs://master:9000/user/shihj/spark_lib/spark-examples-1.6.
>> 1-hadoop2.6.0.jar
>>
>> On Wed, Jun 22, 2016 at 4:27 PM, 另一片天 <958943...@qq.com> wrote:
>>
>>> Is it able to run on local mode ?
>>>
>>> what mean?? standalone mode ?
>>>
>>>
>>> -- 原始邮件 --
>>> *发件人:* "Yash Sharma";;
>>> *发送时间:* 2016年6月22日(星期三) 下午2:18
>>> *收件人:* "Saisai Shao";
>>> *抄送:* "另一片天"<958943...@qq.com>; "user";
>>> *主题:* Re: Could not find or load main class
>>> org.apache.spark.deploy.yarn.ExecutorLauncher
>>>
>>> Try providing the jar with the hdfs prefix. Its probably just because
>>> its not able to find the jar on all nodes.
>>>
>>> hdfs://master:9000/user/shihj/spark_lib/spark-examples-1.6.
>>> 1-hadoop2.6.0.jar
>>>
>>> Is it able to run on local mode ?
>>>
>>> On Wed, Jun 22, 2016 at 4:14 PM, Saisai Shao 
>>> wrote:
>>>
 spark.yarn.jar (none) The location of the Spark jar file, in case
 overriding the default location is desired. By default, Spark on YARN will
 use a Spark jar installed locally, but the Spark jar can also be in a
 world-readable location on HDFS. This allows YARN to cache it on nodes so
 that it doesn't need to be distributed each time an application runs. To
 point to a jar on HDFS, for example, set this configuration to
 hdfs:///some/path.

 spark.yarn.jar is used for spark run-time system jar, which is spark
 assembly jar, not the application jar (example-assembly jar). So in your
 case you upload the example-assembly jar into hdfs, in which spark system
 jars are not packed, so ExecutorLaucher cannot be found.

 Thanks
 Saisai

 On Wed, Jun 22, 2016 at 2:10 PM, 另一片天 <958943...@qq.com> wrote:

> shihj@master:/usr/local/spark/spark-1.6.1-bin-hadoop2.6$
> ./bin/spark-submit --class org.apache.spark.examples.SparkPi --master
> yarn-client --driver-memory 512m --num-executors 2 --executor-memory 512m
> --executor-cores 2
> /user/shihj/spark_lib/spark-examples-1.6.1-hadoop2.6.0.jar 10
> Warning: Local jar
> /user/shihj/spark_lib/spark-examples-1.6.1-hadoop2.6.0.jar does not exist,
> skipping.
> java.lang.ClassNotFoundException: org.apache.spark.examples.SparkPi

?????? Could not find or load main class org.apache.spark.deploy.yarn.ExecutorLauncher

2016-06-22 Thread ????????
i  want to avoid Uploading resource file ??especially jar packagebecause 
them very big??the application will wait for too long??there are good method
so i config that para?? but not get the my want to effect??




--  --
??: "Yash Sharma";;
: 2016??6??22??(??) 2:34
??: ""<958943...@qq.com>; 
: "Saisai Shao"; "user"; 
: Re: Could not find or load main class 
org.apache.spark.deploy.yarn.ExecutorLauncher



Try with : --master yarn-cluster

On Wed, Jun 22, 2016 at 4:30 PM,  <958943...@qq.com> wrote:
./bin/spark-submit --class org.apache.spark.examples.SparkPi --master 
yarn-client --driver-memory 512m --num-executors 2 --executor-memory 512m 
--executor-cores 2   
hdfs://master:9000/user/shihj/spark_lib/spark-examples-1.6.1-hadoop2.6.0.jar 10
Warning: Skip remote jar 
hdfs://master:9000/user/shihj/spark_lib/spark-examples-1.6.1-hadoop2.6.0.jar.
java.lang.ClassNotFoundException: org.apache.spark.examples.SparkPi
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at org.apache.spark.util.Utils$.classForName(Utils.scala:174)
at 
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:689)
at 
org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)







--  --
??: "Yash Sharma";;
: 2016??6??22??(??) 2:28
??: ""<958943...@qq.com>; 
: "Saisai Shao"; "user"; 
: Re: Could not find or load main class 
org.apache.spark.deploy.yarn.ExecutorLauncher





Or better , try the master as yarn-cluster,

./bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn-cluster \
--driver-memory 512m \
--num-executors 2 \
--executor-memory 512m \
--executor-cores 2 \
hdfs://master:9000/user/shihj/spark_lib/spark-examples-1.6.1-hadoop2.6.0.jar



On Wed, Jun 22, 2016 at 4:27 PM,  <958943...@qq.com> wrote:
Is it able to run on local mode ?


what mean?? standalone mode ?




--  --
??: "Yash Sharma";;
: 2016??6??22??(??) 2:18
??: "Saisai Shao"; 
: ""<958943...@qq.com>; "user"; 
: Re: Could not find or load main class 
org.apache.spark.deploy.yarn.ExecutorLauncher





Try providing the jar with the hdfs prefix. Its probably just because its not 
able to find the jar on all nodes.

hdfs://master:9000/user/shihj/spark_lib/spark-examples-1.6.1-hadoop2.6.0.jar



Is it able to run on local mode ?


On Wed, Jun 22, 2016 at 4:14 PM, Saisai Shao  wrote:
spark.yarn.jar(none)The location of the Spark jar file, in case overriding the 
default location is desired. By default, Spark on YARN will use a Spark jar 
installed locally, but the Spark jar can also be in a world-readable location 
on HDFS. This allows YARN to cache it on nodes so that it doesn't need to be 
distributed each time an application runs. To point to a jar on HDFS, for 
example, set this configuration to hdfs:///some/path.


spark.yarn.jar is used for spark run-time system jar, which is spark assembly 
jar, not the application jar (example-assembly jar). So in your case you upload 
the example-assembly jar into hdfs, in which spark system jars are not packed, 
so ExecutorLaucher cannot be found.


Thanks
Saisai


On Wed, Jun 22, 2016 at 2:10 PM,  <958943...@qq.com> wrote:
shihj@master:/usr/local/spark/spark-1.6.1-bin-hadoop2.6$ ./bin/spark-submit 
--class org.apache.spark.examples.SparkPi --master yarn-client --driver-memory 
512m --num-executors 2 --executor-memory 512m --executor-cores 2   
/user/shihj/spark_lib/spark-examples-1.6.1-hadoop2.6.0.jar 10
Warning: Local jar /user/shihj/spark_lib/spark-examples-1.6.1-hadoop2.6.0.jar 
does not exist, skipping.
java.lang.ClassNotFoundException: org.apache.spark.examples.SparkPi
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at org.apache.spark.util.Utils$.classForName(Utils.scala:174)
at 

Re: Could not find or load main class org.apache.spark.deploy.yarn.ExecutorLauncher

2016-06-22 Thread Yash Sharma
Ok, we moved to the next level :)

Could you share more info on the error. You could get logs by the command -

yarn logs -applicationId application_1466568126079_0006


On Wed, Jun 22, 2016 at 4:38 PM, 另一片天 <958943...@qq.com> wrote:

> shihj@master:/usr/local/spark/spark-1.6.1-bin-hadoop2.6$
> ./bin/spark-submit \
> > --class org.apache.spark.examples.SparkPi \
> > --master yarn-cluster \
> > --driver-memory 512m \
> > --num-executors 2 \
> > --executor-memory 512m \
> > --executor-cores 2 \
> >
> hdfs://master:9000/user/shihj/spark_lib/spark-examples-1.6.1-hadoop2.6.0.jar
> 16/06/22 14:36:10 INFO RMProxy: Connecting to ResourceManager at master/
> 192.168.20.137:8032
> 16/06/22 14:36:10 INFO Client: Requesting a new application from cluster
> with 2 NodeManagers
> 16/06/22 14:36:10 INFO Client: Verifying our application has not requested
> more than the maximum memory capability of the cluster (8192 MB per
> container)
> 16/06/22 14:36:10 INFO Client: Will allocate AM container, with 896 MB
> memory including 384 MB overhead
> 16/06/22 14:36:10 INFO Client: Setting up container launch context for our
> AM
> 16/06/22 14:36:10 INFO Client: Setting up the launch environment for our
> AM container
> 16/06/22 14:36:10 INFO Client: Preparing resources for our AM container
> Java HotSpot(TM) Server VM warning: You have loaded library
> /tmp/libnetty-transport-native-epoll3453573359049032130.so which might have
> disabled stack guard. The VM will try to fix the stack guard now.
> It's highly recommended that you fix the library with 'execstack -c
> ', or link it with '-z noexecstack'.
> 16/06/22 14:36:11 INFO Client: Source and destination file systems are the
> same. Not copying
> hdfs://master:9000/user/shihj/spark_lib/spark-examples-1.6.1-hadoop2.6.0.jar
> 16/06/22 14:36:11 WARN Client: Resource
> hdfs://master:9000/user/shihj/spark_lib/spark-examples-1.6.1-hadoop2.6.0.jar
> added multiple times to distributed cache.
> 16/06/22 14:36:11 INFO Client: Uploading resource
> file:/tmp/spark-cf23c5a3-d3fb-4f98-9cd2-bbf268766bbc/__spark_conf__7248368026523433025.zip
> ->
> hdfs://master:9000/user/shihj/.sparkStaging/application_1466568126079_0006/__spark_conf__7248368026523433025.zip
> 16/06/22 14:36:13 INFO SecurityManager: Changing view acls to: shihj
> 16/06/22 14:36:13 INFO SecurityManager: Changing modify acls to: shihj
> 16/06/22 14:36:13 INFO SecurityManager: SecurityManager: authentication
> disabled; ui acls disabled; users with view permissions: Set(shihj); users
> with modify permissions: Set(shihj)
> 16/06/22 14:36:13 INFO Client: Submitting application 6 to ResourceManager
> 16/06/22 14:36:13 INFO YarnClientImpl: Submitted application
> application_1466568126079_0006
> 16/06/22 14:36:14 INFO Client: Application report for
> application_1466568126079_0006 (state: ACCEPTED)
> 16/06/22 14:36:14 INFO Client:
> client token: N/A
> diagnostics: N/A
> ApplicationMaster host: N/A
> ApplicationMaster RPC port: -1
> queue: default
> start time: 1466577373576
> final status: UNDEFINED
> tracking URL: http://master:8088/proxy/application_1466568126079_0006/
> user: shihj
> 16/06/22 14:36:15 INFO Client: Application report for
> application_1466568126079_0006 (state: ACCEPTED)
> 16/06/22 14:36:16 INFO Client: Application report for
> application_1466568126079_0006 (state: ACCEPTED)
> 16/06/22 14:36:17 INFO Client: Application report for
> application_1466568126079_0006 (state: ACCEPTED)
> 16/06/22 14:36:18 INFO Client: Application report for
> application_1466568126079_0006 (state: ACCEPTED)
> 16/06/22 14:36:19 INFO Client: Application report for
> application_1466568126079_0006 (state: ACCEPTED)
> 16/06/22 14:36:20 INFO Client: Application report for
> application_1466568126079_0006 (state: ACCEPTED)
> 16/06/22 14:36:21 INFO Client: Application report for
> application_1466568126079_0006 (state: ACCEPTED)
> 16/06/22 14:36:22 INFO Client: Application report for
> application_1466568126079_0006 (state: ACCEPTED)
> 16/06/22 14:36:23 INFO Client: Application report for
> application_1466568126079_0006 (state: ACCEPTED)
> 16/06/22 14:36:24 INFO Client: Application report for
> application_1466568126079_0006 (state: ACCEPTED)
> 16/06/22 14:36:25 INFO Client: Application report for
> application_1466568126079_0006 (state: ACCEPTED)
> 16/06/22 14:36:26 INFO Client: Application report for
> application_1466568126079_0006 (state: ACCEPTED)
> 16/06/22 14:36:27 INFO Client: Application report for
> application_1466568126079_0006 (state: FAILED)
> 16/06/22 14:36:27 INFO Client:
> client token: N/A
> diagnostics: Application application_1466568126079_0006 failed 2 times due
> to AM Container for appattempt_1466568126079_0006_02 exited with
>  exitCode: 1
> For more detailed output, check application tracking page:
> http://master:8088/proxy/application_1466568126079_0006/Then, click on
> links to logs of each attempt.
> Diagnostics: Exception from container-launch.
> Container id: container_1466568126079_0006_02_01
> 

?????? Could not find or load main class org.apache.spark.deploy.yarn.ExecutorLauncher

2016-06-22 Thread ????????
shihj@master:/usr/local/spark/spark-1.6.1-bin-hadoop2.6$ ./bin/spark-submit \
> --class org.apache.spark.examples.SparkPi \
> --master yarn-cluster \
> --driver-memory 512m \
> --num-executors 2 \
> --executor-memory 512m \
> --executor-cores 2 \
> hdfs://master:9000/user/shihj/spark_lib/spark-examples-1.6.1-hadoop2.6.0.jar
16/06/22 14:36:10 INFO RMProxy: Connecting to ResourceManager at 
master/192.168.20.137:8032
16/06/22 14:36:10 INFO Client: Requesting a new application from cluster with 2 
NodeManagers
16/06/22 14:36:10 INFO Client: Verifying our application has not requested more 
than the maximum memory capability of the cluster (8192 MB per container)
16/06/22 14:36:10 INFO Client: Will allocate AM container, with 896 MB memory 
including 384 MB overhead
16/06/22 14:36:10 INFO Client: Setting up container launch context for our AM
16/06/22 14:36:10 INFO Client: Setting up the launch environment for our AM 
container
16/06/22 14:36:10 INFO Client: Preparing resources for our AM container
Java HotSpot(TM) Server VM warning: You have loaded library 
/tmp/libnetty-transport-native-epoll3453573359049032130.so which might have 
disabled stack guard. The VM will try to fix the stack guard now.
It's highly recommended that you fix the library with 'execstack -c ', 
or link it with '-z noexecstack'.
16/06/22 14:36:11 INFO Client: Source and destination file systems are the 
same. Not copying 
hdfs://master:9000/user/shihj/spark_lib/spark-examples-1.6.1-hadoop2.6.0.jar
16/06/22 14:36:11 WARN Client: Resource 
hdfs://master:9000/user/shihj/spark_lib/spark-examples-1.6.1-hadoop2.6.0.jar 
added multiple times to distributed cache.
16/06/22 14:36:11 INFO Client: Uploading resource 
file:/tmp/spark-cf23c5a3-d3fb-4f98-9cd2-bbf268766bbc/__spark_conf__7248368026523433025.zip
 -> 
hdfs://master:9000/user/shihj/.sparkStaging/application_1466568126079_0006/__spark_conf__7248368026523433025.zip
16/06/22 14:36:13 INFO SecurityManager: Changing view acls to: shihj
16/06/22 14:36:13 INFO SecurityManager: Changing modify acls to: shihj
16/06/22 14:36:13 INFO SecurityManager: SecurityManager: authentication 
disabled; ui acls disabled; users with view permissions: Set(shihj); users with 
modify permissions: Set(shihj)
16/06/22 14:36:13 INFO Client: Submitting application 6 to ResourceManager
16/06/22 14:36:13 INFO YarnClientImpl: Submitted application 
application_1466568126079_0006
16/06/22 14:36:14 INFO Client: Application report for 
application_1466568126079_0006 (state: ACCEPTED)
16/06/22 14:36:14 INFO Client: 
 client token: N/A
 diagnostics: N/A
 ApplicationMaster host: N/A
 ApplicationMaster RPC port: -1
 queue: default
 start time: 1466577373576
 final status: UNDEFINED
 tracking URL: http://master:8088/proxy/application_1466568126079_0006/
 user: shihj
16/06/22 14:36:15 INFO Client: Application report for 
application_1466568126079_0006 (state: ACCEPTED)
16/06/22 14:36:16 INFO Client: Application report for 
application_1466568126079_0006 (state: ACCEPTED)
16/06/22 14:36:17 INFO Client: Application report for 
application_1466568126079_0006 (state: ACCEPTED)
16/06/22 14:36:18 INFO Client: Application report for 
application_1466568126079_0006 (state: ACCEPTED)
16/06/22 14:36:19 INFO Client: Application report for 
application_1466568126079_0006 (state: ACCEPTED)
16/06/22 14:36:20 INFO Client: Application report for 
application_1466568126079_0006 (state: ACCEPTED)
16/06/22 14:36:21 INFO Client: Application report for 
application_1466568126079_0006 (state: ACCEPTED)
16/06/22 14:36:22 INFO Client: Application report for 
application_1466568126079_0006 (state: ACCEPTED)
16/06/22 14:36:23 INFO Client: Application report for 
application_1466568126079_0006 (state: ACCEPTED)
16/06/22 14:36:24 INFO Client: Application report for 
application_1466568126079_0006 (state: ACCEPTED)
16/06/22 14:36:25 INFO Client: Application report for 
application_1466568126079_0006 (state: ACCEPTED)
16/06/22 14:36:26 INFO Client: Application report for 
application_1466568126079_0006 (state: ACCEPTED)
16/06/22 14:36:27 INFO Client: Application report for 
application_1466568126079_0006 (state: FAILED)
16/06/22 14:36:27 INFO Client: 
 client token: N/A
 diagnostics: Application application_1466568126079_0006 failed 2 times 
due to AM Container for appattempt_1466568126079_0006_02 exited with  
exitCode: 1
For more detailed output, check application tracking 
page:http://master:8088/proxy/application_1466568126079_0006/Then, click on 
links to logs of each attempt.
Diagnostics: Exception from container-launch.
Container id: container_1466568126079_0006_02_01
Exit code: 1
Stack trace: ExitCodeException exitCode=1: 
at org.apache.hadoop.util.Shell.runCommand(Shell.java:538)
at org.apache.hadoop.util.Shell.run(Shell.java:455)
at 
org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:715)
at 

Re: Could not find or load main class org.apache.spark.deploy.yarn.ExecutorLauncher

2016-06-22 Thread Yash Sharma
Try with : --master yarn-cluster

On Wed, Jun 22, 2016 at 4:30 PM, 另一片天 <958943...@qq.com> wrote:

> ./bin/spark-submit --class org.apache.spark.examples.SparkPi --master
> yarn-client --driver-memory 512m --num-executors 2 --executor-memory 512m
> --executor-cores 2
> hdfs://master:9000/user/shihj/spark_lib/spark-examples-1.6.1-hadoop2.6.0.jar
> 10
> Warning: Skip remote jar
> hdfs://master:9000/user/shihj/spark_lib/spark-examples-1.6.1-hadoop2.6.0.jar.
> java.lang.ClassNotFoundException: org.apache.spark.examples.SparkPi
> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> at java.lang.Class.forName0(Native Method)
> at java.lang.Class.forName(Class.java:348)
> at org.apache.spark.util.Utils$.classForName(Utils.scala:174)
> at
> org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:689)
> at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
> at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>
>
>
> -- 原始邮件 --
> *发件人:* "Yash Sharma";;
> *发送时间:* 2016年6月22日(星期三) 下午2:28
> *收件人:* "另一片天"<958943...@qq.com>;
> *抄送:* "Saisai Shao"; "user";
>
> *主题:* Re: Could not find or load main class
> org.apache.spark.deploy.yarn.ExecutorLauncher
>
> Or better , try the master as yarn-cluster,
>
> ./bin/spark-submit \
> --class org.apache.spark.examples.SparkPi \
> --master yarn-cluster \
> --driver-memory 512m \
> --num-executors 2 \
> --executor-memory 512m \
> --executor-cores 2 \
> hdfs://master:9000/user/shihj/spark_lib/spark-examples-1.6.
> 1-hadoop2.6.0.jar
>
> On Wed, Jun 22, 2016 at 4:27 PM, 另一片天 <958943...@qq.com> wrote:
>
>> Is it able to run on local mode ?
>>
>> what mean?? standalone mode ?
>>
>>
>> -- 原始邮件 --
>> *发件人:* "Yash Sharma";;
>> *发送时间:* 2016年6月22日(星期三) 下午2:18
>> *收件人:* "Saisai Shao";
>> *抄送:* "另一片天"<958943...@qq.com>; "user";
>> *主题:* Re: Could not find or load main class
>> org.apache.spark.deploy.yarn.ExecutorLauncher
>>
>> Try providing the jar with the hdfs prefix. Its probably just because its
>> not able to find the jar on all nodes.
>>
>> hdfs://master:9000/user/shihj/spark_lib/spark-examples-1.6.
>> 1-hadoop2.6.0.jar
>>
>> Is it able to run on local mode ?
>>
>> On Wed, Jun 22, 2016 at 4:14 PM, Saisai Shao 
>> wrote:
>>
>>> spark.yarn.jar (none) The location of the Spark jar file, in case
>>> overriding the default location is desired. By default, Spark on YARN will
>>> use a Spark jar installed locally, but the Spark jar can also be in a
>>> world-readable location on HDFS. This allows YARN to cache it on nodes so
>>> that it doesn't need to be distributed each time an application runs. To
>>> point to a jar on HDFS, for example, set this configuration to
>>> hdfs:///some/path.
>>>
>>> spark.yarn.jar is used for spark run-time system jar, which is spark
>>> assembly jar, not the application jar (example-assembly jar). So in your
>>> case you upload the example-assembly jar into hdfs, in which spark system
>>> jars are not packed, so ExecutorLaucher cannot be found.
>>>
>>> Thanks
>>> Saisai
>>>
>>> On Wed, Jun 22, 2016 at 2:10 PM, 另一片天 <958943...@qq.com> wrote:
>>>
 shihj@master:/usr/local/spark/spark-1.6.1-bin-hadoop2.6$
 ./bin/spark-submit --class org.apache.spark.examples.SparkPi --master
 yarn-client --driver-memory 512m --num-executors 2 --executor-memory 512m
 --executor-cores 2
 /user/shihj/spark_lib/spark-examples-1.6.1-hadoop2.6.0.jar 10
 Warning: Local jar
 /user/shihj/spark_lib/spark-examples-1.6.1-hadoop2.6.0.jar does not exist,
 skipping.
 java.lang.ClassNotFoundException: org.apache.spark.examples.SparkPi
 at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
 at java.lang.Class.forName0(Native Method)
 at java.lang.Class.forName(Class.java:348)
 at org.apache.spark.util.Utils$.classForName(Utils.scala:174)
 at
 org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:689)
 at
 org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
 at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
 at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
 at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
 get error at once
 -- 原始邮件 --
 *发件人:* "Yash Sharma";;

Re: Spark 2.0.0 : GLM problem

2016-06-22 Thread april_ZMQ
The picture below shows the reply from the creator for this package, Yanbo
Liang( https://github.com/yanboliang   )

 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-2-0-0-GLM-problem-tp27145p27203.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



?????? Could not find or load main class org.apache.spark.deploy.yarn.ExecutorLauncher

2016-06-22 Thread ????????
./bin/spark-submit --class org.apache.spark.examples.SparkPi --master 
yarn-client --driver-memory 512m --num-executors 2 --executor-memory 512m 
--executor-cores 2   
hdfs://master:9000/user/shihj/spark_lib/spark-examples-1.6.1-hadoop2.6.0.jar 10
Warning: Skip remote jar 
hdfs://master:9000/user/shihj/spark_lib/spark-examples-1.6.1-hadoop2.6.0.jar.
java.lang.ClassNotFoundException: org.apache.spark.examples.SparkPi
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at org.apache.spark.util.Utils$.classForName(Utils.scala:174)
at 
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:689)
at 
org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)







--  --
??: "Yash Sharma";;
: 2016??6??22??(??) 2:28
??: ""<958943...@qq.com>; 
: "Saisai Shao"; "user"; 
: Re: Could not find or load main class 
org.apache.spark.deploy.yarn.ExecutorLauncher



Or better , try the master as yarn-cluster,

./bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn-cluster \
--driver-memory 512m \
--num-executors 2 \
--executor-memory 512m \
--executor-cores 2 \
hdfs://master:9000/user/shihj/spark_lib/spark-examples-1.6.1-hadoop2.6.0.jar



On Wed, Jun 22, 2016 at 4:27 PM,  <958943...@qq.com> wrote:
Is it able to run on local mode ?


what mean?? standalone mode ?




--  --
??: "Yash Sharma";;
: 2016??6??22??(??) 2:18
??: "Saisai Shao"; 
: ""<958943...@qq.com>; "user"; 
: Re: Could not find or load main class 
org.apache.spark.deploy.yarn.ExecutorLauncher





Try providing the jar with the hdfs prefix. Its probably just because its not 
able to find the jar on all nodes.

hdfs://master:9000/user/shihj/spark_lib/spark-examples-1.6.1-hadoop2.6.0.jar



Is it able to run on local mode ?


On Wed, Jun 22, 2016 at 4:14 PM, Saisai Shao  wrote:
spark.yarn.jar(none)The location of the Spark jar file, in case overriding the 
default location is desired. By default, Spark on YARN will use a Spark jar 
installed locally, but the Spark jar can also be in a world-readable location 
on HDFS. This allows YARN to cache it on nodes so that it doesn't need to be 
distributed each time an application runs. To point to a jar on HDFS, for 
example, set this configuration to hdfs:///some/path.


spark.yarn.jar is used for spark run-time system jar, which is spark assembly 
jar, not the application jar (example-assembly jar). So in your case you upload 
the example-assembly jar into hdfs, in which spark system jars are not packed, 
so ExecutorLaucher cannot be found.


Thanks
Saisai


On Wed, Jun 22, 2016 at 2:10 PM,  <958943...@qq.com> wrote:
shihj@master:/usr/local/spark/spark-1.6.1-bin-hadoop2.6$ ./bin/spark-submit 
--class org.apache.spark.examples.SparkPi --master yarn-client --driver-memory 
512m --num-executors 2 --executor-memory 512m --executor-cores 2   
/user/shihj/spark_lib/spark-examples-1.6.1-hadoop2.6.0.jar 10
Warning: Local jar /user/shihj/spark_lib/spark-examples-1.6.1-hadoop2.6.0.jar 
does not exist, skipping.
java.lang.ClassNotFoundException: org.apache.spark.examples.SparkPi
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at org.apache.spark.util.Utils$.classForName(Utils.scala:174)
at 
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:689)
at 
org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

get error at once
--  --
??: "Yash Sharma";;
: 2016??6??22??(??) 2:04
??: ""<958943...@qq.com>; 
: "user"; 
: Re: Could not find or load main class 

Re: Could not find or load main class org.apache.spark.deploy.yarn.ExecutorLauncher

2016-06-22 Thread Yash Sharma
Or better , try the master as yarn-cluster,

./bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn-cluster \
--driver-memory 512m \
--num-executors 2 \
--executor-memory 512m \
--executor-cores 2 \
hdfs://master:9000/user/shihj/spark_lib/spark-examples-1.6.1-hadoop2.6.0.jar

On Wed, Jun 22, 2016 at 4:27 PM, 另一片天 <958943...@qq.com> wrote:

> Is it able to run on local mode ?
>
> what mean?? standalone mode ?
>
>
> -- 原始邮件 --
> *发件人:* "Yash Sharma";;
> *发送时间:* 2016年6月22日(星期三) 下午2:18
> *收件人:* "Saisai Shao";
> *抄送:* "另一片天"<958943...@qq.com>; "user";
> *主题:* Re: Could not find or load main class
> org.apache.spark.deploy.yarn.ExecutorLauncher
>
> Try providing the jar with the hdfs prefix. Its probably just because its
> not able to find the jar on all nodes.
>
> hdfs://master:9000/user/shihj/spark_lib/spark-examples-1.6.
> 1-hadoop2.6.0.jar
>
> Is it able to run on local mode ?
>
> On Wed, Jun 22, 2016 at 4:14 PM, Saisai Shao 
> wrote:
>
>> spark.yarn.jar (none) The location of the Spark jar file, in case
>> overriding the default location is desired. By default, Spark on YARN will
>> use a Spark jar installed locally, but the Spark jar can also be in a
>> world-readable location on HDFS. This allows YARN to cache it on nodes so
>> that it doesn't need to be distributed each time an application runs. To
>> point to a jar on HDFS, for example, set this configuration to
>> hdfs:///some/path.
>>
>> spark.yarn.jar is used for spark run-time system jar, which is spark
>> assembly jar, not the application jar (example-assembly jar). So in your
>> case you upload the example-assembly jar into hdfs, in which spark system
>> jars are not packed, so ExecutorLaucher cannot be found.
>>
>> Thanks
>> Saisai
>>
>> On Wed, Jun 22, 2016 at 2:10 PM, 另一片天 <958943...@qq.com> wrote:
>>
>>> shihj@master:/usr/local/spark/spark-1.6.1-bin-hadoop2.6$
>>> ./bin/spark-submit --class org.apache.spark.examples.SparkPi --master
>>> yarn-client --driver-memory 512m --num-executors 2 --executor-memory 512m
>>> --executor-cores 2
>>> /user/shihj/spark_lib/spark-examples-1.6.1-hadoop2.6.0.jar 10
>>> Warning: Local jar
>>> /user/shihj/spark_lib/spark-examples-1.6.1-hadoop2.6.0.jar does not exist,
>>> skipping.
>>> java.lang.ClassNotFoundException: org.apache.spark.examples.SparkPi
>>> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>>> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>>> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>>> at java.lang.Class.forName0(Native Method)
>>> at java.lang.Class.forName(Class.java:348)
>>> at org.apache.spark.util.Utils$.classForName(Utils.scala:174)
>>> at
>>> org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:689)
>>> at
>>> org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
>>> at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
>>> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
>>> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>>> get error at once
>>> -- 原始邮件 --
>>> *发件人:* "Yash Sharma";;
>>> *发送时间:* 2016年6月22日(星期三) 下午2:04
>>> *收件人:* "另一片天"<958943...@qq.com>;
>>> *抄送:* "user";
>>> *主题:* Re: Could not find or load main class
>>> org.apache.spark.deploy.yarn.ExecutorLauncher
>>>
>>> How about supplying the jar directly in spark submit -
>>>
>>> ./bin/spark-submit \
 --class org.apache.spark.examples.SparkPi \
 --master yarn-client \
 --driver-memory 512m \
 --num-executors 2 \
 --executor-memory 512m \
 --executor-cores 2 \
 /user/shihj/spark_lib/spark-examples-1.6.1-hadoop2.6.0.jar
>>>
>>>
>>> On Wed, Jun 22, 2016 at 3:59 PM, 另一片天 <958943...@qq.com> wrote:
>>>
 i  config this  para  at spark-defaults.conf
 spark.yarn.jar
 hdfs://master:9000/user/shihj/spark_lib/spark-examples-1.6.1-hadoop2.6.0.jar

 then ./bin/spark-submit --class org.apache.spark.examples.SparkPi
 --master yarn-client --driver-memory 512m --num-executors 2
 --executor-memory 512m --executor-cores 210:



- Error: Could not find or load main class
org.apache.spark.deploy.yarn.ExecutorLauncher

 but  i don't config that para ,there no error  why???that para is only
 avoid Uploading resource file(jar package)??

>>>
>>>
>>
>


Re: Could not find or load main class org.apache.spark.deploy.yarn.ExecutorLauncher

2016-06-22 Thread Yash Sharma
I meant try having the full path in the spark submit command-

./bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn-client \
--driver-memory 512m \
--num-executors 2 \
--executor-memory 512m \
--executor-cores 2 \
hdfs://master:9000/user/shihj/spark_lib/spark-examples-1.6.1-hadoop2.6.0.jar



On Wed, Jun 22, 2016 at 4:23 PM, 另一片天 <958943...@qq.com> wrote:

> shihj@master:~/workspace/hadoop-2.6.4$ bin/hadoop fs -ls
> hdfs://master:9000/user/shihj/spark_lib
> Found 1 items
> -rw-r--r--   3 shihj supergroup  118955968 2016-06-22 10:24
> hdfs://master:9000/user/shihj/spark_lib/spark-examples-1.6.1-hadoop2.6.0.jar
> shihj@master:~/workspace/hadoop-2.6.4$
> can find the jar on all nodes.
>
>
> -- 原始邮件 --
> *发件人:* "Yash Sharma";;
> *发送时间:* 2016年6月22日(星期三) 下午2:18
> *收件人:* "Saisai Shao";
> *抄送:* "另一片天"<958943...@qq.com>; "user";
> *主题:* Re: Could not find or load main class
> org.apache.spark.deploy.yarn.ExecutorLauncher
>
> Try providing the jar with the hdfs prefix. Its probably just because its
> not able to find the jar on all nodes.
>
> hdfs://master:9000/user/shihj/spark_lib/spark-examples-1.6.
> 1-hadoop2.6.0.jar
>
> Is it able to run on local mode ?
>
> On Wed, Jun 22, 2016 at 4:14 PM, Saisai Shao 
> wrote:
>
>> spark.yarn.jar (none) The location of the Spark jar file, in case
>> overriding the default location is desired. By default, Spark on YARN will
>> use a Spark jar installed locally, but the Spark jar can also be in a
>> world-readable location on HDFS. This allows YARN to cache it on nodes so
>> that it doesn't need to be distributed each time an application runs. To
>> point to a jar on HDFS, for example, set this configuration to
>> hdfs:///some/path.
>>
>> spark.yarn.jar is used for spark run-time system jar, which is spark
>> assembly jar, not the application jar (example-assembly jar). So in your
>> case you upload the example-assembly jar into hdfs, in which spark system
>> jars are not packed, so ExecutorLaucher cannot be found.
>>
>> Thanks
>> Saisai
>>
>> On Wed, Jun 22, 2016 at 2:10 PM, 另一片天 <958943...@qq.com> wrote:
>>
>>> shihj@master:/usr/local/spark/spark-1.6.1-bin-hadoop2.6$
>>> ./bin/spark-submit --class org.apache.spark.examples.SparkPi --master
>>> yarn-client --driver-memory 512m --num-executors 2 --executor-memory 512m
>>> --executor-cores 2
>>> /user/shihj/spark_lib/spark-examples-1.6.1-hadoop2.6.0.jar 10
>>> Warning: Local jar
>>> /user/shihj/spark_lib/spark-examples-1.6.1-hadoop2.6.0.jar does not exist,
>>> skipping.
>>> java.lang.ClassNotFoundException: org.apache.spark.examples.SparkPi
>>> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>>> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>>> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>>> at java.lang.Class.forName0(Native Method)
>>> at java.lang.Class.forName(Class.java:348)
>>> at org.apache.spark.util.Utils$.classForName(Utils.scala:174)
>>> at
>>> org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:689)
>>> at
>>> org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
>>> at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
>>> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
>>> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>>> get error at once
>>> -- 原始邮件 --
>>> *发件人:* "Yash Sharma";;
>>> *发送时间:* 2016年6月22日(星期三) 下午2:04
>>> *收件人:* "另一片天"<958943...@qq.com>;
>>> *抄送:* "user";
>>> *主题:* Re: Could not find or load main class
>>> org.apache.spark.deploy.yarn.ExecutorLauncher
>>>
>>> How about supplying the jar directly in spark submit -
>>>
>>> ./bin/spark-submit \
 --class org.apache.spark.examples.SparkPi \
 --master yarn-client \
 --driver-memory 512m \
 --num-executors 2 \
 --executor-memory 512m \
 --executor-cores 2 \
 /user/shihj/spark_lib/spark-examples-1.6.1-hadoop2.6.0.jar
>>>
>>>
>>> On Wed, Jun 22, 2016 at 3:59 PM, 另一片天 <958943...@qq.com> wrote:
>>>
 i  config this  para  at spark-defaults.conf
 spark.yarn.jar
 hdfs://master:9000/user/shihj/spark_lib/spark-examples-1.6.1-hadoop2.6.0.jar

 then ./bin/spark-submit --class org.apache.spark.examples.SparkPi
 --master yarn-client --driver-memory 512m --num-executors 2
 --executor-memory 512m --executor-cores 210:



- Error: Could not find or load main class
org.apache.spark.deploy.yarn.ExecutorLauncher

 but  i don't config that para ,there no error  why???that para is only
 avoid Uploading resource file(jar package)??

>>>
>>>
>>
>


?????? Could not find or load main class org.apache.spark.deploy.yarn.ExecutorLauncher

2016-06-22 Thread ????????
Is it able to run on local mode ?


what mean?? standalone mode ?




--  --
??: "Yash Sharma";;
: 2016??6??22??(??) 2:18
??: "Saisai Shao"; 
: ""<958943...@qq.com>; "user"; 
: Re: Could not find or load main class 
org.apache.spark.deploy.yarn.ExecutorLauncher



Try providing the jar with the hdfs prefix. Its probably just because its not 
able to find the jar on all nodes.

hdfs://master:9000/user/shihj/spark_lib/spark-examples-1.6.1-hadoop2.6.0.jar



Is it able to run on local mode ?


On Wed, Jun 22, 2016 at 4:14 PM, Saisai Shao  wrote:
spark.yarn.jar(none)The location of the Spark jar file, in case overriding the 
default location is desired. By default, Spark on YARN will use a Spark jar 
installed locally, but the Spark jar can also be in a world-readable location 
on HDFS. This allows YARN to cache it on nodes so that it doesn't need to be 
distributed each time an application runs. To point to a jar on HDFS, for 
example, set this configuration to hdfs:///some/path.


spark.yarn.jar is used for spark run-time system jar, which is spark assembly 
jar, not the application jar (example-assembly jar). So in your case you upload 
the example-assembly jar into hdfs, in which spark system jars are not packed, 
so ExecutorLaucher cannot be found.


Thanks
Saisai


On Wed, Jun 22, 2016 at 2:10 PM,  <958943...@qq.com> wrote:
shihj@master:/usr/local/spark/spark-1.6.1-bin-hadoop2.6$ ./bin/spark-submit 
--class org.apache.spark.examples.SparkPi --master yarn-client --driver-memory 
512m --num-executors 2 --executor-memory 512m --executor-cores 2   
/user/shihj/spark_lib/spark-examples-1.6.1-hadoop2.6.0.jar 10
Warning: Local jar /user/shihj/spark_lib/spark-examples-1.6.1-hadoop2.6.0.jar 
does not exist, skipping.
java.lang.ClassNotFoundException: org.apache.spark.examples.SparkPi
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at org.apache.spark.util.Utils$.classForName(Utils.scala:174)
at 
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:689)
at 
org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

get error at once
--  --
??: "Yash Sharma";;
: 2016??6??22??(??) 2:04
??: ""<958943...@qq.com>; 
: "user"; 
: Re: Could not find or load main class 
org.apache.spark.deploy.yarn.ExecutorLauncher



How about supplying the jar directly in spark submit - 

./bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn-client \
--driver-memory 512m \
--num-executors 2 \
--executor-memory 512m \
--executor-cores 2 \
/user/shihj/spark_lib/spark-examples-1.6.1-hadoop2.6.0.jar


On Wed, Jun 22, 2016 at 3:59 PM,  <958943...@qq.com> wrote:
i  config this  para  at spark-defaults.conf
spark.yarn.jar 
hdfs://master:9000/user/shihj/spark_lib/spark-examples-1.6.1-hadoop2.6.0.jar


then ./bin/spark-submit --class org.apache.spark.examples.SparkPi --master 
yarn-client --driver-memory 512m --num-executors 2 --executor-memory 512m 
--executor-cores 210:




Error: Could not find or load main class 
org.apache.spark.deploy.yarn.ExecutorLauncher  

but  i don't config that para ,there no error  why???that para is only avoid 
Uploading resource file(jar package)??

?????? Could not find or load main class org.apache.spark.deploy.yarn.ExecutorLauncher

2016-06-22 Thread ????????
shihj@master:~/workspace/hadoop-2.6.4$ bin/hadoop fs -ls 
hdfs://master:9000/user/shihj/spark_lib
Found 1 items
-rw-r--r--   3 shihj supergroup  118955968 2016-06-22 10:24 
hdfs://master:9000/user/shihj/spark_lib/spark-examples-1.6.1-hadoop2.6.0.jar
shihj@master:~/workspace/hadoop-2.6.4$ 

can find the jar on all nodes.




--  --
??: "Yash Sharma";;
: 2016??6??22??(??) 2:18
??: "Saisai Shao"; 
: ""<958943...@qq.com>; "user"; 
: Re: Could not find or load main class 
org.apache.spark.deploy.yarn.ExecutorLauncher



Try providing the jar with the hdfs prefix. Its probably just because its not 
able to find the jar on all nodes.

hdfs://master:9000/user/shihj/spark_lib/spark-examples-1.6.1-hadoop2.6.0.jar



Is it able to run on local mode ?


On Wed, Jun 22, 2016 at 4:14 PM, Saisai Shao  wrote:
spark.yarn.jar(none)The location of the Spark jar file, in case overriding the 
default location is desired. By default, Spark on YARN will use a Spark jar 
installed locally, but the Spark jar can also be in a world-readable location 
on HDFS. This allows YARN to cache it on nodes so that it doesn't need to be 
distributed each time an application runs. To point to a jar on HDFS, for 
example, set this configuration to hdfs:///some/path.


spark.yarn.jar is used for spark run-time system jar, which is spark assembly 
jar, not the application jar (example-assembly jar). So in your case you upload 
the example-assembly jar into hdfs, in which spark system jars are not packed, 
so ExecutorLaucher cannot be found.


Thanks
Saisai


On Wed, Jun 22, 2016 at 2:10 PM,  <958943...@qq.com> wrote:
shihj@master:/usr/local/spark/spark-1.6.1-bin-hadoop2.6$ ./bin/spark-submit 
--class org.apache.spark.examples.SparkPi --master yarn-client --driver-memory 
512m --num-executors 2 --executor-memory 512m --executor-cores 2   
/user/shihj/spark_lib/spark-examples-1.6.1-hadoop2.6.0.jar 10
Warning: Local jar /user/shihj/spark_lib/spark-examples-1.6.1-hadoop2.6.0.jar 
does not exist, skipping.
java.lang.ClassNotFoundException: org.apache.spark.examples.SparkPi
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at org.apache.spark.util.Utils$.classForName(Utils.scala:174)
at 
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:689)
at 
org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

get error at once
--  --
??: "Yash Sharma";;
: 2016??6??22??(??) 2:04
??: ""<958943...@qq.com>; 
: "user"; 
: Re: Could not find or load main class 
org.apache.spark.deploy.yarn.ExecutorLauncher



How about supplying the jar directly in spark submit - 

./bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn-client \
--driver-memory 512m \
--num-executors 2 \
--executor-memory 512m \
--executor-cores 2 \
/user/shihj/spark_lib/spark-examples-1.6.1-hadoop2.6.0.jar


On Wed, Jun 22, 2016 at 3:59 PM,  <958943...@qq.com> wrote:
i  config this  para  at spark-defaults.conf
spark.yarn.jar 
hdfs://master:9000/user/shihj/spark_lib/spark-examples-1.6.1-hadoop2.6.0.jar


then ./bin/spark-submit --class org.apache.spark.examples.SparkPi --master 
yarn-client --driver-memory 512m --num-executors 2 --executor-memory 512m 
--executor-cores 210:




Error: Could not find or load main class 
org.apache.spark.deploy.yarn.ExecutorLauncher  

but  i don't config that para ,there no error  why???that para is only avoid 
Uploading resource file(jar package)??

Re: Could not find or load main class org.apache.spark.deploy.yarn.ExecutorLauncher

2016-06-22 Thread Yash Sharma
Try providing the jar with the hdfs prefix. Its probably just because its
not able to find the jar on all nodes.

hdfs://master:9000/user/shihj/spark_lib/spark-examples-1.6.1-hadoop2.6.0.jar

Is it able to run on local mode ?

On Wed, Jun 22, 2016 at 4:14 PM, Saisai Shao  wrote:

> spark.yarn.jar (none) The location of the Spark jar file, in case
> overriding the default location is desired. By default, Spark on YARN will
> use a Spark jar installed locally, but the Spark jar can also be in a
> world-readable location on HDFS. This allows YARN to cache it on nodes so
> that it doesn't need to be distributed each time an application runs. To
> point to a jar on HDFS, for example, set this configuration to
> hdfs:///some/path.
>
> spark.yarn.jar is used for spark run-time system jar, which is spark
> assembly jar, not the application jar (example-assembly jar). So in your
> case you upload the example-assembly jar into hdfs, in which spark system
> jars are not packed, so ExecutorLaucher cannot be found.
>
> Thanks
> Saisai
>
> On Wed, Jun 22, 2016 at 2:10 PM, 另一片天 <958943...@qq.com> wrote:
>
>> shihj@master:/usr/local/spark/spark-1.6.1-bin-hadoop2.6$
>> ./bin/spark-submit --class org.apache.spark.examples.SparkPi --master
>> yarn-client --driver-memory 512m --num-executors 2 --executor-memory 512m
>> --executor-cores 2
>> /user/shihj/spark_lib/spark-examples-1.6.1-hadoop2.6.0.jar 10
>> Warning: Local jar
>> /user/shihj/spark_lib/spark-examples-1.6.1-hadoop2.6.0.jar does not exist,
>> skipping.
>> java.lang.ClassNotFoundException: org.apache.spark.examples.SparkPi
>> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>> at java.lang.Class.forName0(Native Method)
>> at java.lang.Class.forName(Class.java:348)
>> at org.apache.spark.util.Utils$.classForName(Utils.scala:174)
>> at
>> org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:689)
>> at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
>> at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
>> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
>> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>> get error at once
>> -- 原始邮件 --
>> *发件人:* "Yash Sharma";;
>> *发送时间:* 2016年6月22日(星期三) 下午2:04
>> *收件人:* "另一片天"<958943...@qq.com>;
>> *抄送:* "user";
>> *主题:* Re: Could not find or load main class
>> org.apache.spark.deploy.yarn.ExecutorLauncher
>>
>> How about supplying the jar directly in spark submit -
>>
>> ./bin/spark-submit \
>>> --class org.apache.spark.examples.SparkPi \
>>> --master yarn-client \
>>> --driver-memory 512m \
>>> --num-executors 2 \
>>> --executor-memory 512m \
>>> --executor-cores 2 \
>>> /user/shihj/spark_lib/spark-examples-1.6.1-hadoop2.6.0.jar
>>
>>
>> On Wed, Jun 22, 2016 at 3:59 PM, 另一片天 <958943...@qq.com> wrote:
>>
>>> i  config this  para  at spark-defaults.conf
>>> spark.yarn.jar
>>> hdfs://master:9000/user/shihj/spark_lib/spark-examples-1.6.1-hadoop2.6.0.jar
>>>
>>> then ./bin/spark-submit --class org.apache.spark.examples.SparkPi
>>> --master yarn-client --driver-memory 512m --num-executors 2
>>> --executor-memory 512m --executor-cores 210:
>>>
>>>
>>>
>>>- Error: Could not find or load main class
>>>org.apache.spark.deploy.yarn.ExecutorLauncher
>>>
>>> but  i don't config that para ,there no error  why???that para is only
>>> avoid Uploading resource file(jar package)??
>>>
>>
>>
>


Re: Silly question about Yarn client vs Yarn cluster modes...

2016-06-22 Thread Mich Talebzadeh
If you are going to get data out of an RDBMS like Oracle then the correct
procedure is:


   1. Use Hive on Spark execution engine. That improves Hive performance
   2. You can use JDBC through Spark itself. No issue there. It will use
   JDBC provided by HiveContext
   3. JDBC is fine. Every vendor has a utility to migrate a full database
   from one to another using JDBC. For example SAP relies on JDBC to migrate a
   whole Oracle schema to SAP ASE
   4. I have imported an Oracle table of 1 billion rows through Spark into
   Hive ORC table. It works fine. Actually I use Spark to do the job for these
   type of imports. Register a tempTable from DF and use it to put data into
   Hive table. You can create Hive table explicitly in Spark and do an
   INSERT/SELECT into it rather than save etc.
   5. You can access Hive tables through HiveContext. Beeline is a client
   tool that connects to Hive thrift server. I don't think it comes into
   equation here
   6. Finally one experiment worth multiples of these speculation. Try for
   yourself and fine out.
   7. If you want to use JDBC for an RDBMS table then you will need to
   download the relevant JAR file. For example for Oracle it is ojdbc6.jar etc

Like anything else your mileage varies and need to experiment with it.
Otherwise these are all opinions.

HTH



Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com



On 22 June 2016 at 06:46, Jörn Franke  wrote:

> I would import data via sqoop and put it on HDFS. It has some mechanisms
> to handle the lack of reliability by jdbc.
>
> Then you can process the data via Spark. You could also use jdbc rdd but I
> do not recommend to use it, because you do not want to pull data all the
> time out of the database when you execute your application. Furthermore,
> you have to handle connection interruptions, the multiple
> serialization/deserialization efforts, if one executor crashes you have to
> repull some or all of the data from the database etc
>
> Within the cluster it does not make sense to me to pull data via jdbc from
> hive. All the benefits such as data locality, reliability etc would be gone.
>
> Hive supports different execution engines (TEZ, Spark), formats (Orc,
> parquet) and further optimizations to make the analysis fast. It always
> depends on your use case.
>
> On 22 Jun 2016, at 05:47, Michael Segel  wrote:
>
>
> Sorry, I think you misunderstood.
> Spark can read from JDBC sources so to say using beeline as a way to
> access data is not a spark application isn’t really true.  Would you say
> the same if you were pulling data in to spark from Oracle or DB2?
> There are a couple of different design patterns and use cases where data
> could be stored in Hive yet your only access method is via a JDBC or
> Thift/Rest service.  Think also of compute / storage cluster
> implementations.
>
> WRT to #2, not exactly what I meant, by exposing the data… and there are
> limitations to the thift service…
>
> On Jun 21, 2016, at 5:44 PM, ayan guha  wrote:
>
> 1. Yes, in the sense you control number of executors from spark
> application config.
> 2. Any IO will be done from executors (never ever on driver, unless you
> explicitly call collect()). For example, connection to a DB happens one for
> each worker (and used by local executors). Also, if you run a reduceByKey
> job and write to hdfs, you will find a bunch of files were written from
> various executors. What happens when you want to expose the data to world:
> Spark Thrift Server (STS), which is a long running spark application (ie
> spark context) which can serve data from RDDs.
>
> Suppose I have a data source… like a couple of hive tables and I access
> the tables via beeline. (JDBC)  -
> This is NOT a spark application, and there is no RDD created. Beeline is
> just a jdbc client tool. You use beeline to connect to HS2 or STS.
>
> In this case… Hive generates a map/reduce job and then would stream the
> result set back to the client node where the RDD result set would be built.
>  --
> This is never true. When you connect Hive from spark, spark actually reads
> hive metastore and streams data directly from HDFS. Hive MR jobs do not
> play any role here, making spark faster than hive.
>
> HTH
>
> Ayan
>
> On Wed, Jun 22, 2016 at 9:58 AM, Michael Segel 
> wrote:
>
>> Ok, its at the end of the day and I’m trying to make sure I understand
>> the locale of where things are running.
>>
>> I have an application where I have to query a bunch of sources, creating
>> some RDDs and then I need to join off the RDDs and some other lookup tables.
>>
>>
>> Yarn has two modes… client and cluster.
>>
>> I get it that in cluster mode… everything is running on the cluster.
>> But in 

  1   2   >