RE: Why is my spark executor is terminated?

2015-10-14 Thread Wang, Ningjun (LNG-NPV)
I checked the master log before and did not find anything wrong. Unfortunately I
have lost the master log now.

So you think the master log will tell you why the executor is down?

Regards,

Ningjun Wang


-Original Message-
From: Jean-Baptiste Onofré [mailto:j...@nanthrax.net] 
Sent: Tuesday, October 13, 2015 10:42 AM
To: user@spark.apache.org
Subject: Re: Why is my spark executor is terminated?

Hi Ningjun,

Nothing special in the master log ?

Regards
JB

On 10/13/2015 04:34 PM, Wang, Ningjun (LNG-NPV) wrote:
> We use Spark on Windows 2008 R2 servers. We use one Spark context
> which creates one Spark executor. We run the Spark master, slave, driver, and
> executor on one single machine.
>
>  From time to time, we found that the executor Java process was
> terminated. I cannot figure out why it was terminated. Can anybody help
> me on how to find out why the executor was terminated?
>
> The Spark slave log shows that the worker kills the executor process:
>
> 2015-10-13 09:58:06,087 INFO
> [sparkWorker-akka.actor.default-dispatcher-16] worker.Worker
> (Logging.scala:logInfo(59)) - Asked to kill executor
> app-20151009201453-/0
>
> But why does it do that?
>
> Here are the detailed logs from the Spark slave:
>
> 2015-10-13 09:58:04,915 WARN
> [sparkWorker-akka.actor.default-dispatcher-16]
> remote.ReliableDeliverySupervisor (Slf4jLogger.scala:apply$mcV$sp(71)) 
> - Association with remote system 
> [akka.tcp://sparkexecu...@qa1-cas01.pcc.lexisnexis.com:61234] has 
> failed, address is now gated for [5000] ms. Reason is: [Disassociated].
>
> 2015-10-13 09:58:05,134 INFO
> [sparkWorker-akka.actor.default-dispatcher-16] actor.LocalActorRef
> (Slf4jLogger.scala:apply$mcV$sp(74)) - Message 
> [akka.remote.EndpointWriter$AckIdleCheckTimer$] from 
> Actor[akka://sparkWorker/system/endpointManager/reliableEndpointWriter
> -akka.tcp%3A%2F%2FsparkExecutor%40QA1-CAS01.pcc.lexisnexis.com%3A61234
> -2/endpointWriter#-175670388]
> to
> Actor[akka://sparkWorker/system/endpointManager/reliableEndpointWriter
> -akka.tcp%3A%2F%2FsparkExecutor%40QA1-CAS01.pcc.lexisnexis.com%3A61234
> -2/endpointWriter#-175670388] was not delivered. [2] dead letters 
> encountered. This logging can be turned off or adjusted with 
> configuration settings 'akka.log-dead-letters' and 
> 'akka.log-dead-letters-during-shutdown'.
>
> 2015-10-13 09:58:05,134 INFO
> [sparkWorker-akka.actor.default-dispatcher-16] actor.LocalActorRef
> (Slf4jLogger.scala:apply$mcV$sp(74)) - Message 
> [akka.remote.transport.AssociationHandle$Disassociated] from 
> Actor[akka://sparkWorker/deadLetters] to 
> Actor[akka://sparkWorker/system/transports/akkaprotocolmanager.tcp0/ak
> kaProtocol-tcp%3A%2F%2FsparkWorker%4010.196.116.184%3A61236-3#-1210125
> 680] was not delivered. [3] dead letters encountered. This logging can 
> be turned off or adjusted with configuration settings 
> 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
>
> 2015-10-13 09:58:05,134 INFO
> [sparkWorker-akka.actor.default-dispatcher-16] actor.LocalActorRef
> (Slf4jLogger.scala:apply$mcV$sp(74)) - Message 
> [akka.remote.transport.ActorTransportAdapter$DisassociateUnderlying]
> from Actor[akka://sparkWorker/deadLetters] to 
> Actor[akka://sparkWorker/system/transports/akkaprotocolmanager.tcp0/ak
> kaProtocol-tcp%3A%2F%2FsparkWorker%4010.196.116.184%3A61236-3#-1210125
> 680] was not delivered. [4] dead letters encountered. This logging can 
> be turned off or adjusted with configuration settings 
> 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
>
> 2015-10-13 09:58:06,087 INFO
> [sparkWorker-akka.actor.default-dispatcher-16] worker.Worker
> (Logging.scala:logInfo(59)) - Asked to kill executor
> app-20151009201453-/0
>
> 2015-10-13 09:58:06,103 INFO  [ExecutorRunner for 
> app-20151009201453-/0] worker.ExecutorRunner
> (Logging.scala:logInfo(59)) - Runner thread for executor
> app-20151009201453-/0 interrupted
>
> 2015-10-13 09:58:06,118 INFO  [ExecutorRunner for 
> app-20151009201453-/0] worker.ExecutorRunner
> (Logging.scala:logInfo(59)) - Killing process!
>
> 2015-10-13 09:58:06,509 INFO
> [sparkWorker-akka.actor.default-dispatcher-16] worker.Worker
> (Logging.scala:logInfo(59)) - Executor app-20151009201453-/0 
> finished with state KILLED exitStatus 1
>
> 2015-10-13 09:58:06,509 INFO
> [sparkWorker-akka.actor.default-dispatcher-16] worker.Worker
> (Logging.scala:logInfo(59)) - Cleaning up local directories for 
> application app-20151009201453-
>
> Thanks
>
> Ningjun Wang
>

--
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com



Dynamic partitioning pruning

2015-10-14 Thread Younes Naguib
Hi,

This feature was added in Hive 1.3:
https://issues.apache.org/jira/browse/HIVE-9152
Any idea when this will be in Spark? Or is it there already?

Is there any workaround in Spark 1.5.1?

Thanks,
Younes



RE: Node afinity for Kafka-Direct Stream

2015-10-14 Thread prajod.vettiyattil
Hi,

Another point is that in the receiver-based approach, all the data from Kafka
first goes to the worker where the receiver runs:
https://github.com/koeninger/kafka-exactly-once/blob/master/blogpost.md

Also, if you create one stream (which is the normal case) and you have many
worker instances, only one worker does all the reading. Once that worker reads,
the data can be “repartitioned” to distribute the load (see the sketch after the
quoted docs below). This repartitioning is a data-movement overhead in the
receiver-based approach.
http://spark.apache.org/docs/latest/streaming-kafka-integration.html
{
In Receiver approach:
Multiple Kafka input DStreams can be created with different groups and topics 
for parallel receiving of data using multiple receivers.


In Direct approach:
Simplified Parallelism: No need to create multiple input Kafka streams and 
union them.
}
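
For illustration, a minimal sketch of the receiver-based read plus the repartition
step (requires the spark-streaming-kafka artifact; the ZooKeeper address, consumer
group, topic name, and partition counts are placeholder assumptions, not values
from this thread):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val conf = new SparkConf().setAppName("receiver-repartition-sketch")
val ssc = new StreamingContext(conf, Seconds(10))

// One receiver-based stream: a single worker ingests everything for this stream.
val stream = KafkaUtils.createStream(ssc, "zkhost:2181", "my-group", Map("events" -> 1))

// Redistribute the received blocks across the cluster before the heavy work;
// this shuffle is the data-movement overhead mentioned above.
val distributed = stream.repartition(8)
distributed.map(_._2).count().print()

ssc.start()
ssc.awaitTermination()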

Prajod
From: Gerard Maas [mailto:gerard.m...@gmail.com]
Sent: 14 October 2015 18:53
To: Saisai Shao 
Cc: Rishitesh Mishra ; spark users 

Subject: Re: Node afinity for Kafka-Direct Stream

Thanks Saisai, Mishra,

Indeed, that hint will only work on a case where the Spark executor is 
co-located with the Kafka broker.
I think the answer to my question as stated is that there's no guarantee of
where the task will execute, as it will depend on the scheduler and cluster
resources available (Mesos in our case).
Therefore, any assumptions made about data locality using the consumer-based 
approach need to be reconsidered when migrating to the direct stream.

((In our case, we were using local caches to decide when a given secondary 
index for a record should be produced and written.))

-kr, Gerard.




On Wed, Oct 14, 2015 at 2:58 PM, Saisai Shao 
> wrote:
This preferred locality is a hint to spark to schedule Kafka tasks on the 
preferred nodes, if Kafka and Spark are two separate cluster, obviously this 
locality hint takes no effect, and spark will schedule tasks following 
node-local -> rack-local -> any pattern, like any other spark tasks.

On Wed, Oct 14, 2015 at 8:10 PM, Rishitesh Mishra 
> wrote:
Hi Gerard,
I am also trying to understand the same issue. From whatever code I have seen, it
looks like once the Kafka RDD is constructed, the execution of that RDD is up to
the task scheduler, and it can schedule the partitions based on the load on the
nodes. There is a preferred node specified in the Kafka RDD, but AFAIK it maps to
the Kafka partition's host. So if Kafka and Spark are co-hosted, this will
probably work. If not, I am not sure how to get data locality for a partition.
Others,
correct me if there is a way.

On Wed, Oct 14, 2015 at 3:08 PM, Gerard Maas 
> wrote:
In the receiver-based Kafka streaming model, given that each receiver starts as
a long-running task, one can rely on a certain degree of data locality based on
the Kafka partitioning: data published on a given topic/partition will land on
the same Spark Streaming receiving node until the receiver dies and needs to be
restarted somewhere else.

As I understand, the direct-kafka streaming model just computes offsets and 
relays the work to a KafkaRDD. How is the execution locality compared to the 
receiver-based approach?

thanks, Gerard.



--

Regards,
Rishitesh Mishra,
SnappyData . (http://www.snappydata.io/)

https://in.linkedin.com/in/rishiteshmishra




Get *document*-topic distribution from PySpark LDA model?

2015-10-14 Thread moustachio
Hi! I already have a StackOverflow question on this (see here), but haven't
received any responses, so I thought I'd try here!

Long story short, I'm working in PySpark and have successfully generated an
LDA topic model, but can't figure out how to (or if I can) extract the topic
distributions for each document from the model. I understand the LDA
functionality is still in development, but getting document topic
distributions is arguably the principal use case here, and is not (as far as
I can tell) implemented in the Python API. I can easily get the *word*-topic
distribution, by calling model.topicsMatrix(), but this isn't what I need,
and there don't seem to be any other useful methods in the Python LDA model
class.

The only glimmer of hope came from finding the documentation for
DistributedLDAModel in the Java API, which has a topicDistributions() method
that I think is just what I need here (but I'm not 100% sure if the LDAModel in
PySpark is in fact a DistributedLDAModel under the hood...).

In any case, I am able to indirectly call this method like so, without any
overt failures:

In [127]: model.call('topicDistributions')
Out[127]: MapPartitionsRDD[3156] at mapPartitions at
PythonMLLibAPI.scala:1480

But if I actually look at the results, all I get are strings telling me that
the result is actually a Scala tuple (I think):

In [128]: model.call('topicDistributions').take(5)
Out[128]:
[{u'__class__': u'scala.Tuple2'},
 {u'__class__': u'scala.Tuple2'},
 {u'__class__': u'scala.Tuple2'},
 {u'__class__': u'scala.Tuple2'},
 {u'__class__': u'scala.Tuple2'}]

Maybe this is generally the right approach, but is there a way to get the
actual results?

Thanks in advance for any guidance you can offer!



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Get-document-topic-distribution-from-PySpark-LDA-model-tp25063.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Node afinity for Kafka-Direct Stream

2015-10-14 Thread Cody Koeninger
Assumptions about locality in spark are not very reliable, regardless of
what consumer you use.  Even if you have locality preferences, and locality
wait turned up really high, you still have to account for losing executors.
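
For context, a hedged sketch of how the locality-wait knob is set (the values and
app name are illustrative assumptions, not recommendations):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("locality-wait-sketch")
  .set("spark.locality.wait", "10s")       // default is 3s; applies to all levels
  .set("spark.locality.wait.node", "10s")  // per-level override, if needed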

On Wed, Oct 14, 2015 at 8:23 AM, Gerard Maas  wrote:

> Thanks Saisai, Mishra,
>
> Indeed, that hint will only work on a case where the Spark executor is
> co-located with the Kafka broker.
> I think the answer to my question as stated is that there's no guarantee
> of where the task will execute, as it will depend on the scheduler and
> cluster resources available  (Mesos in our case).
> Therefore, any assumptions made about data locality using the
> consumer-based approach need to be reconsidered when migrating to the
> direct stream.
>
> ((In our case, we were using local caches to decide when a given secondary
> index for a record should be produced and written.))
>
> -kr, Gerard.
>
>
>
>
> On Wed, Oct 14, 2015 at 2:58 PM, Saisai Shao 
> wrote:
>
>> This preferred locality is a hint to spark to schedule Kafka tasks on the
>> preferred nodes, if Kafka and Spark are two separate cluster, obviously
>> this locality hint takes no effect, and spark will schedule tasks following
>> node-local -> rack-local -> any pattern, like any other spark tasks.
>>
>> On Wed, Oct 14, 2015 at 8:10 PM, Rishitesh Mishra 
>> wrote:
>>
>>> Hi Gerard,
>>> I am also trying to understand the same issue. Whatever code I have seen
>>> it looks like once Kafka RDD is constructed the execution of that RDD is
>>> upto the task scheduler and it can schedule the partitions based on the
>>> load on nodes. There is a preferred node specified in the Kafka RDD. But AFAIK it
>>> maps to the Kafka partition's host. So if Kafka and Spark are co-hosted
>>> probably this will work. If not, I am not sure how to get data locality for
>>> a partition.
>>> Others,
>>> correct me if there is a way.
>>>
>>> On Wed, Oct 14, 2015 at 3:08 PM, Gerard Maas 
>>> wrote:
>>>
 In the receiver-based kafka streaming model, given that each receiver
 starts as a long-running task, one can rely on a certain degree of data
 locality based on the kafka partitioning:  Data published on a given
 topic/partition will land on the same spark streaming receiving node until
 the receiver dies and needs to be restarted somewhere else.

 As I understand, the direct-kafka streaming model just computes offsets
 and relays the work to a KafkaRDD. How is the execution locality compared
 to the receiver-based approach?

 thanks, Gerard.

>>>
>>>
>>>
>>> --
>>>
>>> Regards,
>>> Rishitesh Mishra,
>>> SnappyData . (http://www.snappydata.io/)
>>>
>>> https://in.linkedin.com/in/rishiteshmishra
>>>
>>
>>
>


NullPointerException when adding to accumulator

2015-10-14 Thread Sela, Amit
I'm running a simple streaming application that reads from Kafka, maps the 
events and prints them and I'm trying to use accumulators to count the number 
of mapped records.
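
For reference, a minimal sketch of the pattern I mean (stream construction elided;
ssc and kafkaStream are assumed to exist, and all names are illustrative):

// Counting mapped records with an accumulator created on the driver.
val mappedCount = ssc.sparkContext.accumulator(0L, "mapped records")

val mapped = kafkaStream.map { record =>
  mappedCount += 1L   // the accumulator update in question
  record
}
mapped.print()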

While this works in standalone mode (IDE), when submitting to YARN I get a
NullPointerException on accumulator.add(1) or accumulator += 1.

Anyone using accumulators in .map() with Spark 1.5 and YARN ?

Thanks,
Amit





Re: Why is the Spark Web GUI failing with JavaScript "Uncaught SyntaxError"?

2015-10-14 Thread Jonathan Kelly
Ah, yes, it will use private IPs, so you may need to update your FoxyProxy
settings to include the private IPs in the regex as well as the public IPs.

Also, yes, for completed applications you may use the Spark History Server
on port 18080. The YARN ProxyServer will automatically redirect to the
Spark History Server once a job has completed, so you can still start from
the YARN ResourceManager. In the case of completed applications, the link
on the YARN ResourceManager will be "History" instead of
"ApplicationMaster".

~ Jonathan

On Wed, Oct 14, 2015 at 12:57 AM, Joshua Fox  wrote:

> Thank you!
>
> It seems that the history server at port 18080 also gives access to
> the Spark GUI, as shown below.
>
> Following your tip, I see that the  YARN ResourceManager GUI on 8088
> indeed has that ApplicationMaster link, though to a private rather than
> public IP; replacing IPs brings me to the same Spark GUI.
>
> Joshua
> [image: Inline image 3]
>
>
>
>
> On Tue, Oct 13, 2015 at 6:23 PM, Jonathan Kelly 
> wrote:
>
>> Joshua,
>>
>> Since Spark is configured to run on YARN in EMR, instead of viewing the
>> Spark application UI at port 4040, you should instead start from the YARN
>> ResourceManager (on port 8088), then click on the ApplicationMaster link
>> for the Spark application you are interested in. This will take you to the
>> YARN ProxyServer on port 20888, which will proxy you through to the Spark
>> UI for the application (which renders correctly when viewed this way). This
>> works even if the Spark UI is running on a port other than 4040 and even in
>> yarn-cluster mode when the Spark driver is running on a slave node.
>>
>> Hope this helps,
>> Jonathan
>>
>> On Tue, Oct 13, 2015 at 7:19 AM, Jean-Baptiste Onofré 
>> wrote:
>>
>>> Thanks for the update Joshua.
>>>
>>> Let me try with Spark 1.4.1.
>>>
>>> I keep you posted.
>>>
>>> Regards
>>> JB
>>>
>>> On 10/13/2015 04:17 PM, Joshua Fox wrote:
>>>
   * Spark 1.4.1, part of EMR emr-4.0.0
   * Chrome Version 41.0.2272.118 (64-bit) on Ubuntu


 On Tue, Oct 13, 2015 at 3:27 PM, Jean-Baptiste Onofré > wrote:

 Hi Joshua,

 What's the Spark version and what's your browser ?

 I just tried on Spark 1.6-SNAPSHOT with firefox and it works fine.

 Thanks
 Regards
 JB

 On 10/13/2015 02:17 PM, Joshua Fox wrote:

 I am accessing the Spark Jobs Web GUI, running on AWS EMR.

 I can access this webapp (port 4040 as per default), but it only
 half-renders, producing "Uncaught SyntaxError: Unexpected token
 <"

 Here is a screenshot, including the Chrome
 Developer Console.

 Here are some of the error messages in my Chrome console.

 /Uncaught SyntaxError: Unexpected token <
 (index):3 Resource interpreted as Script but transferred with
 MIME type
 text/html:
 "
 http://ec2-52-89-59-167.us-west-2.compute.amazonaws.com:4040/jobs/;.
 (index):74 Uncaught ReferenceError: drawApplicationTimeline is
 not defined
 (index):12 Resource interpreted as Image but transferred with
 MIME type
 text/html:
 "
 http://ec2-52-89-59-167.us-west-2.compute.amazonaws.com:4040/jobs/;

 /
 Note that the History GUI at port 18080 and the Hadoop GUI at
 port 8088
 work fine, and the Spark jobs GUI does partly render. So, it
 seems that
 my browser proxy is not the cause of this problem.

 Joshua


 --
 Jean-Baptiste Onofré
 jbono...@apache.org 
 http://blog.nanthrax.net
 Talend - http://www.talend.com


 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 
 For additional commands, e-mail: user-h...@spark.apache.org
 



>>> --
>>> Jean-Baptiste Onofré
>>> jbono...@apache.org
>>> http://blog.nanthrax.net
>>> Talend - http://www.talend.com
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>
>>
>


Question about data frame partitioning in Spark 1.3.0

2015-10-14 Thread Cesar Flores
My current version of Spark is 1.3.0 and my question is the following:

I have large data frames where the main field is a user id. I need to do
many group by's and joins using that field. Will the performance
increase if, before doing any group by or join operation, I first convert to
an rdd and partition by the user id? In other words, will trying something like
the next lines on all my user data tables improve the performance in the
long run?:

val partitioned_rdd = unpartitioned_df
   .map(row=>(row.getLong(0), row))
   .partitionBy(new HashPartitioner(200))
   .map(x => x._2)

val partitioned_df = hc.createDataFrame(partitioned_rdd,
unpartitioned_df.schema)




Thanks a lot
-- 
Cesar Flores


Re: How to calculate percentile of a column of DataFrame?

2015-10-14 Thread Umesh Kacha
Hi Ted, thanks much for your help. So the fix is in SPARK-10671 and it is supposed
to be released in Spark 1.6.0, right? Until 1.6.0 is released I won't be able to
invoke callUdf using a string and percentile_approx with lit as an argument,
right?
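
As a hedged side note, one possible interim workaround (an assumption on my part,
not something confirmed in this thread) is to go through the SQL string API of a
HiveContext, which resolves percentile_approx as a Hive UDAF; the temp table name
below is illustrative:

// Hedged workaround sketch: sourceframe and mycol are from my earlier snippet,
// and sqlContext is assumed to be a HiveContext.
sourceframe.registerTempTable("source_tbl")
val quartile = sqlContext.sql(
  "SELECT percentile_approx(mycol, 0.25) AS p25 FROM source_tbl")
quartile.show()
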
On Oct 14, 2015 03:26, "Ted Yu"  wrote:

> I modified DataFrameSuite, in master branch, to call percentile_approx
> instead of simpleUDF :
>
> - deprecated callUdf in SQLContext
> - callUDF in SQLContext *** FAILED ***
>   org.apache.spark.sql.AnalysisException: undefined function
> percentile_approx;
>   at
> org.apache.spark.sql.catalyst.analysis.SimpleFunctionRegistry$$anonfun$2.apply(FunctionRegistry.scala:64)
>   at
> org.apache.spark.sql.catalyst.analysis.SimpleFunctionRegistry$$anonfun$2.apply(FunctionRegistry.scala:64)
>   at scala.Option.getOrElse(Option.scala:120)
>   at
> org.apache.spark.sql.catalyst.analysis.SimpleFunctionRegistry.lookupFunction(FunctionRegistry.scala:63)
>   at
> org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$$anonfun$apply$10$$anonfun$applyOrElse$5$$anonfun$applyOrElse$24.apply(Analyzer.scala:506)
>   at
> org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$$anonfun$apply$10$$anonfun$applyOrElse$5$$anonfun$applyOrElse$24.apply(Analyzer.scala:506)
>   at
> org.apache.spark.sql.catalyst.analysis.package$.withPosition(package.scala:48)
>   at
> org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$$anonfun$apply$10$$anonfun$applyOrElse$5.applyOrElse(Analyzer.scala:505)
>   at
> org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$$anonfun$apply$10$$anonfun$applyOrElse$5.applyOrElse(Analyzer.scala:502)
>   at
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:227)
>
> SPARK-10671 is included.
> For 1.5.1, I guess the absence of SPARK-10671 means that SparkSQL
> treats percentile_approx as normal UDF.
>
> Experts can correct me, if there is any misunderstanding.
>
> Cheers
>
> On Tue, Oct 13, 2015 at 6:09 AM, Umesh Kacha 
> wrote:
>
>> Hi Ted, I am using the following line of code. I can't paste the entire code,
>> sorry, but just this one line doesn't compile in my Spark job:
>>
>>  sourceframe.select(callUDF("percentile_approx",col("mycol"), lit(0.25)))
>>
>> I am using Intellij editor java and maven dependencies of spark core
>> spark sql spark hive version 1.5.1
>> On Oct 13, 2015 18:21, "Ted Yu"  wrote:
>>
>>> Can you pastebin your Java code and the command you used to compile ?
>>>
>>> Thanks
>>>
>>> On Oct 13, 2015, at 1:42 AM, Umesh Kacha  wrote:
>>>
>>> Hi Ted if fix went after 1.5.1 release then how come it's working with
>>> 1.5.1 binary in spark-shell.
>>> On Oct 13, 2015 1:32 PM, "Ted Yu"  wrote:
>>>
 Looks like the fix went in after 1.5.1 was released.

 You may verify using master branch build.

 Cheers

 On Oct 13, 2015, at 12:21 AM, Umesh Kacha 
 wrote:

 Hi Ted, thanks much I tried using percentile_approx in Spark-shell like
 you mentioned it works using 1.5.1 but it doesn't compile in Java using
 1.5.1 maven libraries it still complains same that callUdf can have string
 and column types only. Please guide.
 On Oct 13, 2015 12:34 AM, "Ted Yu"  wrote:

> SQL context available as sqlContext.
>
> scala> val df = Seq(("id1", 1), ("id2", 4), ("id3", 5)).toDF("id",
> "value")
> df: org.apache.spark.sql.DataFrame = [id: string, value: int]
>
> scala> df.select(callUDF("percentile_approx",col("value"),
> lit(0.25))).show()
> +--+
> |'percentile_approx(value,0.25)|
> +--+
> |   1.0|
> +--+
>
> Can you upgrade to 1.5.1 ?
>
> Cheers
>
> On Mon, Oct 12, 2015 at 11:55 AM, Umesh Kacha 
> wrote:
>
>> Sorry forgot to tell that I am using Spark 1.4.1 as callUdf is
>> available in Spark 1.4.0 as per JAvadocx
>>
>> On Tue, Oct 13, 2015 at 12:22 AM, Umesh Kacha 
>> wrote:
>>
>>> Hi Ted thanks much for the detailed answer and appreciate your
>>> efforts. Do we need to register Hive UDFs?
>>>
>>> sqlContext.udf.register("percentile_approx");???//is it valid?
>>>
>>> I am calling Hive UDF percentile_approx in the following manner
>>> which gives compilation error
>>>
>>> df.select("col1").groupby("col1").agg(callUdf("percentile_approx",col("col1"),lit(0.25)));//compile
>>> error
>>>
>>> //compile error because callUdf() takes String and Column* as
>>> arguments.
>>>
>>> Please guide. Thanks much.
>>>
>>> On Mon, Oct 12, 2015 at 11:44 PM, Ted Yu 
>>> wrote:
>>>
 Using spark-shell, I did the following exercise 

Re: Why is my spark executor is terminated?

2015-10-14 Thread Jean-Baptiste Onofré

Hi Ningjun

I just wanted to check that the master didn't "kick out" the worker, as 
the "Disassociated" can come from the master.


Here it looks like the worker killed the executor before shutting down 
itself.


What's the Spark version ?

Regards
JB

On 10/14/2015 04:42 PM, Wang, Ningjun (LNG-NPV) wrote:

I checked the master log before and did not find anything wrong. Unfortunately I
have lost the master log now.

So you think the master log will tell you why the executor is down?

Regards,

Ningjun Wang


-Original Message-
From: Jean-Baptiste Onofré [mailto:j...@nanthrax.net]
Sent: Tuesday, October 13, 2015 10:42 AM
To: user@spark.apache.org
Subject: Re: Why is my spark executor is terminated?

Hi Ningjun,

Nothing special in the master log ?

Regards
JB

On 10/13/2015 04:34 PM, Wang, Ningjun (LNG-NPV) wrote:

We use Spark on Windows 2008 R2 servers. We use one Spark context
which creates one Spark executor. We run the Spark master, slave, driver, and
executor on one single machine.

  From time to time, we found that the executor Java process was
terminated. I cannot figure out why it was terminated. Can anybody help
me on how to find out why the executor was terminated?

The Spark slave log shows that the worker kills the executor process:

2015-10-13 09:58:06,087 INFO
[sparkWorker-akka.actor.default-dispatcher-16] worker.Worker
(Logging.scala:logInfo(59)) - Asked to kill executor
app-20151009201453-/0

But why does it do that?

Here are the detailed logs from the Spark slave:

2015-10-13 09:58:04,915 WARN
[sparkWorker-akka.actor.default-dispatcher-16]
remote.ReliableDeliverySupervisor (Slf4jLogger.scala:apply$mcV$sp(71))
- Association with remote system
[akka.tcp://sparkexecu...@qa1-cas01.pcc.lexisnexis.com:61234] has
failed, address is now gated for [5000] ms. Reason is: [Disassociated].

2015-10-13 09:58:05,134 INFO
[sparkWorker-akka.actor.default-dispatcher-16] actor.LocalActorRef
(Slf4jLogger.scala:apply$mcV$sp(74)) - Message
[akka.remote.EndpointWriter$AckIdleCheckTimer$] from
Actor[akka://sparkWorker/system/endpointManager/reliableEndpointWriter
-akka.tcp%3A%2F%2FsparkExecutor%40QA1-CAS01.pcc.lexisnexis.com%3A61234
-2/endpointWriter#-175670388]
to
Actor[akka://sparkWorker/system/endpointManager/reliableEndpointWriter
-akka.tcp%3A%2F%2FsparkExecutor%40QA1-CAS01.pcc.lexisnexis.com%3A61234
-2/endpointWriter#-175670388] was not delivered. [2] dead letters
encountered. This logging can be turned off or adjusted with
configuration settings 'akka.log-dead-letters' and
'akka.log-dead-letters-during-shutdown'.

2015-10-13 09:58:05,134 INFO
[sparkWorker-akka.actor.default-dispatcher-16] actor.LocalActorRef
(Slf4jLogger.scala:apply$mcV$sp(74)) - Message
[akka.remote.transport.AssociationHandle$Disassociated] from
Actor[akka://sparkWorker/deadLetters] to
Actor[akka://sparkWorker/system/transports/akkaprotocolmanager.tcp0/ak
kaProtocol-tcp%3A%2F%2FsparkWorker%4010.196.116.184%3A61236-3#-1210125
680] was not delivered. [3] dead letters encountered. This logging can
be turned off or adjusted with configuration settings
'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.

2015-10-13 09:58:05,134 INFO
[sparkWorker-akka.actor.default-dispatcher-16] actor.LocalActorRef
(Slf4jLogger.scala:apply$mcV$sp(74)) - Message
[akka.remote.transport.ActorTransportAdapter$DisassociateUnderlying]
from Actor[akka://sparkWorker/deadLetters] to
Actor[akka://sparkWorker/system/transports/akkaprotocolmanager.tcp0/ak
kaProtocol-tcp%3A%2F%2FsparkWorker%4010.196.116.184%3A61236-3#-1210125
680] was not delivered. [4] dead letters encountered. This logging can
be turned off or adjusted with configuration settings
'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.

2015-10-13 09:58:06,087 INFO
[sparkWorker-akka.actor.default-dispatcher-16] worker.Worker
(Logging.scala:logInfo(59)) - Asked to kill executor
app-20151009201453-/0

2015-10-13 09:58:06,103 INFO  [ExecutorRunner for
app-20151009201453-/0] worker.ExecutorRunner
(Logging.scala:logInfo(59)) - Runner thread for executor
app-20151009201453-/0 interrupted

2015-10-13 09:58:06,118 INFO  [ExecutorRunner for
app-20151009201453-/0] worker.ExecutorRunner
(Logging.scala:logInfo(59)) - Killing process!

2015-10-13 09:58:06,509 INFO
[sparkWorker-akka.actor.default-dispatcher-16] worker.Worker
(Logging.scala:logInfo(59)) - Executor app-20151009201453-/0
finished with state KILLED exitStatus 1

2015-10-13 09:58:06,509 INFO
[sparkWorker-akka.actor.default-dispatcher-16] worker.Worker
(Logging.scala:logInfo(59)) - Cleaning up local directories for
application app-20151009201453-

Thanks

Ningjun Wang



--
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional 
commands, e-mail: user-h...@spark.apache.org



--
Jean-Baptiste Onofré
jbono...@apache.org

Re: Question about data frame partitioning in Spark 1.3.0

2015-10-14 Thread Cesar Flores
Thanks Michael for your input.


By 1) do you mean:

   - caching the partitioned_rdd,
   - caching the partitioned_df,
   - *or* just caching unpartitioned_df, without the need of creating the
     partitioned_rdd variable?


Can you expand a little bit more on 2)?


Thanks!

On Wed, Oct 14, 2015 at 12:11 PM, Michael Armbrust 
wrote:

> This won't help as for two reasons:
>  1) Its all still just creating lineage since you aren't caching the
> partitioned data.  It will still fetch the shuffled blocks for each query.
>  2) The query optimizer is not aware of RDD level partitioning since its
> mostly a blackbox.
>
> 1) could be fixed by adding caching.  2) is on our roadmap (though you'd
> have to use logical DataFrame expressions to do the partitioning instead of
> a class based partitioner).
>
> On Wed, Oct 14, 2015 at 8:45 AM, Cesar Flores  wrote:
>
>>
>> My current version of Spark is 1.3.0 and my question is the following:
>>
>> I have large data frames where the main field is a user id. I need to do
>> many group by's and joins using that field. Will the performance
>> increase if, before doing any group by or join operation, I first convert to
>> an rdd and partition by the user id? In other words, will trying something like
>> the next lines on all my user data tables improve the performance in the
>> long run?:
>>
>> val partitioned_rdd = unpartitioned_df
>>.map(row=>(row.getLong(0), row))
>>.partitionBy(new HashPartitioner(200))
>>.map(x => x._2)
>>
>> val partitioned_df = hc.createDataFrame(partitioned_rdd,
>> unpartitioned_df.schema)
>>
>>
>>
>>
>> Thanks a lot
>> --
>> Cesar Flores
>>
>
>


-- 
Cesar Flores


Re: Programmatically connect to remote YARN in yarn-client mode

2015-10-14 Thread Marcelo Vanzin
On Wed, Oct 14, 2015 at 10:01 AM, Florian Kaspar
 wrote:
> we are working on a project running on Spark. Currently we connect to a 
> remote Spark-Cluster in Standalone mode to obtain the SparkContext using
>
> new JavaSparkContext(new 
> SparkConf().setAppName("").setMaster("spark://:7077"));

> Can anyone tell me how to create a Spark context programmatically connecting 
> to a remote YARN cluster?

You should be able to replace the standalone URL with "yarn-client",
and it should work, assuming you have the HADOOP_CONF_DIR (or
YARN_CONF_DIR) env variable pointing at a valid YARN configuration.
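
A minimal sketch of that (Scala shown for brevity; the JavaSparkContext version
is analogous, and the app name is illustrative):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("remote-yarn-example")
  .setMaster("yarn-client")              // replaces "spark://<host>:7077"
val sc = new SparkContext(conf)
println(sc.parallelize(1 to 3).count())  // quick sanity check against the cluster
sc.stop()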

Note that if the machine running this code is far from the cluster
performance might not be that great.

-- 
Marcelo




Re: Spark 1.5 java.net.ConnectException: Connection refused

2015-10-14 Thread Spark Newbie
>> Is it slowing things down or blocking progress?
I didn't see any slowing of processing, but I do see jobs aborted
consecutively for a period of 18 batches (5-minute batch intervals). So I
am worried about what happened to the records that these jobs were
processing.
Also, one more thing to mention is that the
StreamingListenerBatchCompleted.numRecords information shows all received
records as processed even if the batch/job failed. The processing time
likewise shows the same time it takes for a successful batch.
It seems that numRecords is simply the input to the batch,
regardless of whether the records were successfully processed or not.

On Wed, Oct 14, 2015 at 11:01 AM, Spark Newbie 
wrote:

> I ran 2 different spark 1.5 clusters that have been running for more than
> a day now. I do see jobs getting aborted due to task retries maxing out
> (default 4) due to ConnectionException. It seems like the executors die and
> get restarted and I was unable to find the root cause (same app code and
> conf used on spark 1.4.1 I don't see ConnectionException).
>
> Another question related to this, what happens to the kinesis records
> received when Job gets aborted? In Spark-1.5 and kinesis-asl-1.5 (which I
> am using) does the job gets resubmitted with the same received records? Or
> does the kinesis-asl library get those records again based on sequence
> numbers it tracks? It would be good for me to understand the story around
> lossless processing of kinesis records in Spark-1.5 + kinesis-asl-1.5 when
> jobs are aborted. Any pointers or quick explanation would be very helpful.
>
>
> On Tue, Oct 13, 2015 at 4:04 PM, Tathagata Das 
> wrote:
>
>> Is this happening too often? Is it slowing things down or blocking
>> progress. Failures once in a while is part of the norm, and the system
>> should take care of itself.
>>
>> On Tue, Oct 13, 2015 at 2:47 PM, Spark Newbie 
>> wrote:
>>
>>> Hi Spark users,
>>>
>>> I'm seeing the below exception in my spark streaming application. It
>>> happens in the first stage where the kinesis receivers receive records and
>>> perform a flatMap operation on the unioned Dstream. A coalesce step also
>>> happens as a part of that stage for optimizing the performance.
>>>
>>> This is happening on my spark 1.5 instance using kinesis-asl-1.5. When I
>>> look at the executor logs I do not see any exceptions indicating the root
>>> cause of why there is no connectivity on xxx.xx.xx.xxx:36684 or when did
>>> that service go down.
>>>
>>> Any help debugging this problem will be helpful.
>>>
>>> 15/10/13 16:36:07 ERROR shuffle.RetryingBlockFetcher: Exception while
>>> beginning fetch of 1 outstanding blocks
>>> java.io.IOException: Failed to connect to
>>> ip-xxx-xx-xx-xxx.ec2.internal/xxx.xx.xx.xxx:36684
>>> at
>>> org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:193)
>>> at
>>> org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156)
>>> at
>>> org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:88)
>>> at
>>> org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
>>> at
>>> org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:120)
>>> at
>>> org.apache.spark.network.netty.NettyBlockTransferService.fetchBlocks(NettyBlockTransferService.scala:97)
>>> at
>>> org.apache.spark.network.BlockTransferService.fetchBlockSync(BlockTransferService.scala:89)
>>> at
>>> org.apache.spark.storage.BlockManager$$anonfun$doGetRemote$2.apply(BlockManager.scala:595)
>>> at
>>> org.apache.spark.storage.BlockManager$$anonfun$doGetRemote$2.apply(BlockManager.scala:593)
>>> at
>>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>> at
>>> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>> at
>>> org.apache.spark.storage.BlockManager.doGetRemote(BlockManager.scala:593)
>>> at
>>> org.apache.spark.storage.BlockManager.getRemote(BlockManager.scala:579)
>>> at
>>> org.apache.spark.storage.BlockManager.get(BlockManager.scala:623)
>>> at
>>> org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:44)
>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:262)
>>> at
>>> org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:139)
>>> at
>>> org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:135)
>>> at
>>> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
>>> at scala.collection.immutable.List.foreach(List.scala:318)
>>> at
>>> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
>>> 

Re: [SQL] Memory leak with spark streaming and spark sql in spark 1.5.1

2015-10-14 Thread Reynold Xin
+dev list

On Wed, Oct 14, 2015 at 1:07 AM, Terry Hoo  wrote:

> All,
>
> Does anyone meet memory leak issue with spark streaming and spark sql in
> spark 1.5.1? I can see the memory is increasing all the time when running
> this simple sample:
>
> val sc = new SparkContext(conf)
> val sqlContext = new HiveContext(sc)
> import sqlContext.implicits._
> val ssc = new StreamingContext(sc, Seconds(1))
> val s1 = ssc.socketTextStream("localhost", ).map(x =>
> (x,1)).reduceByKey((x : Int, y : Int) => x + y)
> s1.print
> s1.foreachRDD(rdd => {
>   rdd.foreach(_ => Unit)
>   sqlContext.createDataFrame(rdd).registerTempTable("A")
>   sqlContext.sql("""select * from A""").show(1)
> })
>
> After dumping the Java heap, I can see there are about 22K entries
> in SQLListener._stageIdToStageMetrics after 2 hours of running (other maps in
> this SQLListener has about 1K entries), is this a leak in SQLListener?
>
> Thanks!
> Terry
>


Strange spark problems among different versions

2015-10-14 Thread xia zhao
  Hi. I try to run the SparkPi example on the cluster; some strange errors happen
and I do not know what causes them.

  When I am using hadoop-2.6 and spark-1.5.1-bin-hadoop2.6, the error log
is below:


118 10/01/01 11:59:14 ERROR yarn.ApplicationMaster: User class threw
exception: java.lang.reflect.InvocationTargetException

119 java.lang.reflect.InvocationTargetException

Caused by: java.lang.IllegalArgumentException:
java.lang.UnsatisfiedLinkError:
/opt/hadoop/tmp/nm-local-dir/usercache/root/appcache/application_1444839345484_0006/container_1444839345484_0006_01_01/tmp/snappy-1.0.4.1-8378427e-4d5c-42b1-ae49-c9600
c204bd7-libsnappyjava.so:
/opt/hadoop/tmp/nm-local-dir/usercache/root/appcache/application_1444839345484_0006/container_14448
39345484_0006_01_01/tmp/snappy-1.0.4.1-8378427e-4d5c-42b1-ae49-c9600c204bd7-libsnappyjava.so:
cannot open shared object file: No such file or directory



  When I am using hadoop-2.6 and spark-1.3.0-bin-hadoop2.4, the error
seems different and the log is below:


   108 org.apache.spark.SparkException: Job aborted due to stage failure:
Task serialization failed: java.lang.reflect.InvocationTargetException
109 sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
Method)
110
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
111
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
112 java.lang.reflect.Constructor.newInstance(Constructor.java:408)
113
org.apache.spark.io.CompressionCodec$.createCodec(CompressionCodec.scala:68)
114
org.apache.spark.io.CompressionCodec$.createCodec(CompressionCodec.scala:60)
115
org.apache.spark.broadcast.TorrentBroadcast.org$apache$spark$broadcast$TorrentBroadcast$$setConf(TorrentBroadcast.scala:73)
116
org.apache.spark.broadcast.TorrentBroadcast.(TorrentBroadcast.scala:79)
117
org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
118
org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:29)
119
org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62)
120 org.apache.spark.SparkContext.broadcast(SparkContext.scala:1051)
121
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:839)
122
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:778)
123
org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:762)
124
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1362)
125
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
126 org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)


What causes the error? The Java compatibility or the Hadoop compatibility?
Thank you for your help.




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Strange-spark-problems-among-different-versions-tp25065.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Building with SBT and Scala 2.11

2015-10-14 Thread Jakob Odersky
[Repost to mailing list]

Hey,
Sorry about the typo, I of course meant hadoop-2.6, not 2.11.
I suspect something bad happened with my Ivy cache, since when reverting
back to scala 2.10, I got a very strange IllegalStateException (something
something IvyNode, I can't remember the details).
Killing the cache made 2.10 work at least; I'll retry with 2.11.

Thx for your help
On Oct 14, 2015 6:52 AM, "Ted Yu"  wrote:

> Adrian:
> Likely you were using maven.
>
> Jakob's report was with sbt.
>
> Cheers
>
> On Tue, Oct 13, 2015 at 10:05 PM, Adrian Tanase  wrote:
>
>> Do you mean hadoop-2.4 or 2.6? not sure if this is the issue but I'm also
>> compiling the 1.5.1 version with scala 2.11 and hadoop 2.6 and it works.
>>
>> -adrian
>>
>> Sent from my iPhone
>>
>> On 14 Oct 2015, at 03:53, Jakob Odersky  wrote:
>>
>> I'm having trouble compiling Spark with SBT for Scala 2.11. The command I
>> use is:
>>
>> dev/change-version-to-2.11.sh
>> build/sbt -Pyarn -Phadoop-2.11 -Dscala-2.11
>>
>> followed by
>>
>> compile
>>
>> in the sbt shell.
>>
>> The error I get specifically is:
>>
>> spark/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala:308:
>> no valid targets for annotation on value conf - it is discarded unused. You
>> may specify targets with meta-annotations, e.g. @(transient @param)
>> [error] private[netty] class NettyRpcEndpointRef(@transient conf:
>> SparkConf)
>> [error]
>>
>> However I am also getting a large amount of deprecation warnings, making
>> me wonder if I am supplying some incompatible/unsupported options to sbt. I
>> am using Java 1.8 and the latest Spark master sources.
>> Does someone know if I am doing anything wrong or is the sbt build broken?
>>
>> thanks for you help,
>> --Jakob
>>
>>
>


Re: Question about data frame partitioning in Spark 1.3.0

2015-10-14 Thread Michael Armbrust
This won't help, for two reasons:
 1) It's all still just creating lineage since you aren't caching the
partitioned data.  It will still fetch the shuffled blocks for each query.
 2) The query optimizer is not aware of RDD-level partitioning since it's
mostly a black box.

1) could be fixed by adding caching.  2) is on our roadmap (though you'd
have to use logical DataFrame expressions to do the partitioning instead of
a class-based partitioner).
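
A minimal sketch of 1) (caching the repartitioned data so later group-bys/joins
reuse it instead of re-fetching shuffle blocks; unpartitioned_df and the
HiveContext hc are taken from the original post, and the user id is assumed to be
a Long in column 0, as there):

import org.apache.spark.HashPartitioner

val partitioned_rdd = unpartitioned_df.rdd
  .map(row => (row.getLong(0), row))
  .partitionBy(new HashPartitioner(200))
  .map(_._2)

val partitioned_df = hc.createDataFrame(partitioned_rdd, unpartitioned_df.schema)

partitioned_df.cache()
partitioned_df.count()   // force materialization of the cached, partitioned data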

On Wed, Oct 14, 2015 at 8:45 AM, Cesar Flores  wrote:

>
> My current version of Spark is 1.3.0 and my question is the following:
>
> I have large data frames where the main field is a user id. I need to do
> many group by's and joins using that field. Will the performance
> increase if, before doing any group by or join operation, I first convert to
> an rdd and partition by the user id? In other words, will trying something like
> the next lines on all my user data tables improve the performance in the
> long run?:
>
> val partitioned_rdd = unpartitioned_df
>.map(row=>(row.getLong(0), row))
>.partitionBy(new HashPartitioner(200))
>.map(x => x._2)
>
> val partitioned_df = hc.createDataFrame(partitioned_rdd,
> unpartitioned_df.schema)
>
>
>
>
> Thanks a lot
> --
> Cesar Flores
>


Re: Programmatically connect to remote YARN in yarn-client mode

2015-10-14 Thread Florian Kaspar

Thank you, Marcelo,

So it is possible to simply copy the YARN configuration from the remote
cluster to the local machine (assuming the local machine can resolve
the YARN host etc.) and just let Spark do the rest?

This would actually be great!
Our "local" machine is just another virtual machine running in the same 
environment, connected to the cluster via a virtual network.


Cheers
Florian

Am 14.10.2015 um 19:13 schrieb Marcelo Vanzin:

On Wed, Oct 14, 2015 at 10:01 AM, Florian Kaspar
 wrote:

we are working on a project running on Spark. Currently we connect to a remote 
Spark-Cluster in Standalone mode to obtain the SparkContext using

new JavaSparkContext(new 
SparkConf().setAppName("").setMaster("spark://:7077"));
Can anyone tell me how to create a Spark context programmatically connecting to 
a remote YARN cluster?

You should be able to replace the standalone URL with "yarn-client",
and it should work, assuming you have the HADOOP_CONF_DIR (or
YARN_CONF_DIR) env variable pointing at a valid YARN configuration.

Note that if the machine running this code is far from the cluster
performance might not be that great.



--
Florian Kaspar

ONE LOGIC

Dr. Hans-Kapfinger-Str. 3, DE 94032 Passau
T +49 851 22590 25
florian.kas...@onelogic.de
www.onelogic.de


ONE LOGIC GmbH, HRB 7780 Amtsgericht Passau
Geschäftsführung Andreas Böhm, Prof. Dr. Andreas Pfeifer



Re: Programmatically connect to remote YARN in yarn-client mode

2015-10-14 Thread Marcelo Vanzin
On Wed, Oct 14, 2015 at 10:29 AM, Florian Kaspar  wrote:

> so it is possible to simply copy the YARN configuration from the remote
> cluster to the local machine (assuming, the local machine can resolve the
> YARN host etc.) and just letting Spark do the rest?
>

Yes, that should be all.

-- 
Marcelo


Programmatically connect to remote YARN in yarn-client mode

2015-10-14 Thread Florian Kaspar

Hey everyone,

we are working on a project running on Spark. Currently we connect to a 
remote Spark-Cluster in Standalone mode to obtain the SparkContext using


new JavaSparkContext(new 
SparkConf().setAppName("").setMaster("spark://:7077"));

Currently, we try to connect to a remote (!) YARN cluster instead. This 
should also happen programmatically. We use the Spark context for the 
whole lifetime of a Web-Application.
spark-submit is essentially not a good option for us because we want to
have a local Spark driver connecting to a remote cluster, with the driver
interacting with other locally deployed modules.
Can anyone tell me how to create a Spark context programmatically 
connecting to a remote YARN cluster?
Tutorials online seem to have the precondition that the current 
application runs inside the cluster but not remotely.

Thank you in advance for your support!

Kind regards
Florian

--
Florian Kaspar

ONE LOGIC

Dr. Hans-Kapfinger-Str. 3, DE 94032 Passau
T +49 851 22590 25
florian.kas...@onelogic.de
www.onelogic.de


ONE LOGIC GmbH, HRB 7780 Amtsgericht Passau
Geschäftsführung Andreas Böhm, Prof. Dr. Andreas Pfeifer



Re: Building with SBT and Scala 2.11

2015-10-14 Thread Adrian Tanase
You are correct, of course. I gave up on sbt for Spark long ago; I never managed
to get it working, while mvn works great.

Sent from my iPhone

On 14 Oct 2015, at 16:52, Ted Yu 
> wrote:

Adrian:
Likely you were using maven.

Jakob's report was with sbt.

Cheers

On Tue, Oct 13, 2015 at 10:05 PM, Adrian Tanase 
> wrote:
Do you mean hadoop-2.4 or 2.6? not sure if this is the issue but I'm also 
compiling the 1.5.1 version with scala 2.11 and hadoop 2.6 and it works.

-adrian

Sent from my iPhone

On 14 Oct 2015, at 03:53, Jakob Odersky 
> wrote:

I'm having trouble compiling Spark with SBT for Scala 2.11. The command I use 
is:

dev/change-version-to-2.11.sh
build/sbt -Pyarn -Phadoop-2.11 -Dscala-2.11

followed by

compile

in the sbt shell.

The error I get specifically is:

spark/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala:308: no 
valid targets for annotation on value conf - it is discarded unused. You may 
specify targets with meta-annotations, e.g. @(transient @param)
[error] private[netty] class NettyRpcEndpointRef(@transient conf: SparkConf)
[error]

However I am also getting a large amount of deprecation warnings, making me 
wonder if I am supplying some incompatible/unsupported options to sbt. I am 
using Java 1.8 and the latest Spark master sources.
Does someone know if I am doing anything wrong or is the sbt build broken?

thanks for you help,
--Jakob




Reusing Spark Functions

2015-10-14 Thread Starch, Michael D (398M)
All,

Is a Function object in Spark reused on a given executor, or is it sent and
deserialized with each new task?

On my project, we have functions that incur a very large setup cost, but then
could be called many times.  Currently, I am using object deserialization to
run this intensive setup, and I am wondering if this function is reused (within
the context of the executor), or am I deserializing this object over and over
again for each task sent to a given worker.

Are there other ways to share objects between tasks on the same executor?
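
One pattern I have seen discussed for this (a hedged sketch only, assuming the
expensive state can be rebuilt independently on each executor; all names and the
stand-in Map are illustrative) is a lazily initialized singleton object, built at
most once per executor JVM rather than per task:

object ExpensiveResource {
  // Built at most once per executor JVM, on first use by any task.
  lazy val model: Map[String, Double] = {
    // stand-in for the intensive setup (loading a model, opening a client, ...)
    Map("weight" -> 0.42)
  }
}

// Inside tasks, every task scheduled on the same executor reuses the instance:
// rdd.map(key => ExpensiveResource.model.getOrElse(key, 0.0))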

Many thanks,

Michael



Re: SPARK SQL Error

2015-10-14 Thread pnpritchard
I think the stack trace is quite informative.

Assuming line 10 of CsvDataSource is "val df =
sqlContext.load("com.databricks.spark.csv", Map("path" ->
args(1),"header"->"true"))", then the "args(1)" call is throwing an
ArrayIndexOutOfBoundsException. The reason for this is because you aren't
passing any command line arguments to your application. When using
spark-submit, you should put all of your app command line arguments at the
end, after the jar. In your example, I think you'd want:

spark-submit --master yarn --class org.spark.apache.CsvDataSource --files
hdfs:///people_csv /home/cloudera/Desktop/TestMain.jar hdfs:///people_csv

Also, I don't think it is necessary for you to have "--files
hdfs:///people_csv". The documentation for this option says "Comma-separated
list of files to be placed in the working directory of each executor." Since
you are going to read the "people_csv" file from hdfs, rather than the local
file system, it seems unnecessary.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/SPARK-SQL-Error-tp25050p25064.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: thriftserver: access temp dataframe from in-memory of spark-shell

2015-10-14 Thread Michael Armbrust
Yes, call startWithContext from the spark shell:
https://github.com/apache/spark/blob/master/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala#L56
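
A minimal sketch of that (assuming spark-shell was built/launched with Hive and
thrift-server support, so sqlContext is a HiveContext; the JSON path and table
name are illustrative):

import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.hive.thriftserver.HiveThriftServer2

val hiveContext = sqlContext.asInstanceOf[HiveContext]

val df = hiveContext.read.json("/path/to/events.json")
df.registerTempTable("events_tmp")   // temp table lives in this context

// Expose this context over JDBC; beeline clients can now query events_tmp.
HiveThriftServer2.startWithContext(hiveContext)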

On Wed, Oct 14, 2015 at 7:10 AM,  wrote:

> Hi,
>
> Is it possible to load a spark-shell, in which we do any number of
> operations in a dataframe, then register it as a temporary table and get to
> see it through thriftserver?
> ps. or even better, submit a full job and store the dataframe in
> thriftserver in-memory before the job completes.
>
> I have been trying this without success; beeline does not see the dataframes
> of the spark-shell’s hive context.
> If any of you confirms this possibility, I will try further ahead. So far
> it only seems to be able to manually read from persistent tables.
>
> Thanks for any insights,
> Saif
>
>


Re: Spark 1.5 java.net.ConnectException: Connection refused

2015-10-14 Thread Spark Newbie
I ran 2 different spark 1.5 clusters that have been running for more than a
day now. I do see jobs getting aborted due to task retries maxing out
(default 4) due to ConnectionException. It seems like the executors die and
get restarted and I was unable to find the root cause (same app code and
conf used on spark 1.4.1 I don't see ConnectionException).

Another question related to this, what happens to the kinesis records
received when Job gets aborted? In Spark-1.5 and kinesis-asl-1.5 (which I
am using) does the job gets resubmitted with the same received records? Or
does the kinesis-asl library get those records again based on sequence
numbers it tracks? It would be good for me to understand the story around
lossless processing of kinesis records in Spark-1.5 + kinesis-asl-1.5 when
jobs are aborted. Any pointers or quick explanation would be very helpful.


On Tue, Oct 13, 2015 at 4:04 PM, Tathagata Das  wrote:

> Is this happening too often? Is it slowing things down or blocking
> progress. Failures once in a while is part of the norm, and the system
> should take care of itself.
>
> On Tue, Oct 13, 2015 at 2:47 PM, Spark Newbie 
> wrote:
>
>> Hi Spark users,
>>
>> I'm seeing the below exception in my spark streaming application. It
>> happens in the first stage where the kinesis receivers receive records and
>> perform a flatMap operation on the unioned Dstream. A coalesce step also
>> happens as a part of that stage for optimizing the performance.
>>
>> This is happening on my spark 1.5 instance using kinesis-asl-1.5. When I
>> look at the executor logs I do not see any exceptions indicating the root
>> cause of why there is no connectivity on xxx.xx.xx.xxx:36684 or when did
>> that service go down.
>>
>> Any help debugging this problem will be helpful.
>>
>> 15/10/13 16:36:07 ERROR shuffle.RetryingBlockFetcher: Exception while
>> beginning fetch of 1 outstanding blocks
>> java.io.IOException: Failed to connect to
>> ip-xxx-xx-xx-xxx.ec2.internal/xxx.xx.xx.xxx:36684
>> at
>> org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:193)
>> at
>> org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156)
>> at
>> org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:88)
>> at
>> org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
>> at
>> org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:120)
>> at
>> org.apache.spark.network.netty.NettyBlockTransferService.fetchBlocks(NettyBlockTransferService.scala:97)
>> at
>> org.apache.spark.network.BlockTransferService.fetchBlockSync(BlockTransferService.scala:89)
>> at
>> org.apache.spark.storage.BlockManager$$anonfun$doGetRemote$2.apply(BlockManager.scala:595)
>> at
>> org.apache.spark.storage.BlockManager$$anonfun$doGetRemote$2.apply(BlockManager.scala:593)
>> at
>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>> at
>> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>> at
>> org.apache.spark.storage.BlockManager.doGetRemote(BlockManager.scala:593)
>> at
>> org.apache.spark.storage.BlockManager.getRemote(BlockManager.scala:579)
>> at
>> org.apache.spark.storage.BlockManager.get(BlockManager.scala:623)
>> at
>> org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:44)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:262)
>> at
>> org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:139)
>> at
>> org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:135)
>> at
>> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
>> at scala.collection.immutable.List.foreach(List.scala:318)
>> at
>> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
>> at
>> org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:135)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>> at
>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>> at
>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>> at
>> org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:262)
>> at
>> 

Re: Spark DataFrame GroupBy into List

2015-10-14 Thread Deenar Toraskar
collect_set and collect_list are built-in (Hive) user-defined functions; see
https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF
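
A hedged sketch of using them through the SQL string API (assuming a HiveContext
named sqlContext and a DataFrame df with the category/id columns from the example
quoted below; the temp table name is illustrative):

df.registerTempTable("t")
val grouped = sqlContext.sql(
  "SELECT category, collect_list(id) AS id_list FROM t GROUP BY category")
grouped.show()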

On 14 October 2015 at 03:45, SLiZn Liu  wrote:

> Hi Michael,
>
> Can you be more specific on `collect_set`? Is it a built-in function or,
> if it is an UDF, how it is defined?
>
> BR,
> Todd Leo
>
> On Wed, Oct 14, 2015 at 2:12 AM Michael Armbrust 
> wrote:
>
>> import org.apache.spark.sql.functions._
>>
>> df.groupBy("category")
>>   .agg(callUDF("collect_set", df("id")).as("id_list"))
>>
>> On Mon, Oct 12, 2015 at 11:08 PM, SLiZn Liu 
>> wrote:
>>
>>> Hey Spark users,
>>>
>>> I'm trying to group by a dataframe, by appending occurrences into a list
>>> instead of count.
>>>
>>> Let's say we have a dataframe as shown below:
>>>
>>> | category | id |
>>> |  |:--:|
>>> | A| 1  |
>>> | A| 2  |
>>> | B| 3  |
>>> | B| 4  |
>>> | C| 5  |
>>>
>>> ideally, after some magic group by (reverse explode?):
>>>
>>> | category | id_list  |
>>> |  |  |
>>> | A| 1,2  |
>>> | B| 3,4  |
>>> | C| 5|
>>>
>>> any tricks to achieve that? Scala Spark API is preferred. =D
>>>
>>> BR,
>>> Todd Leo
>>>
>>>
>>>
>>>
>>


Re: OutOfMemoryError When Reading Many json Files

2015-10-14 Thread Deenar Toraskar
Hi

Why don't you check whether you can process the large file standalone first,
and then add the outer loop?

sqlContext.read.json(jsonFile)
  .select($"some", $"fields")
  .withColumn("new_col", some_transformations($"col"))
  .rdd.map( x: Row => (k, v) )
  .combineByKey()
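
Spelled out as a self-contained sketch of the same single-file pipeline (the
path, columns, key/value extraction and the reduce step below are placeholders),
which can be used to check whether one large file on its own triggers the OOM:

import org.apache.spark.sql.Row
import sqlContext.implicits._

val singleFile = "/data/big-file.json"              // hypothetical path
val perFileResult = sqlContext.read.json(singleFile)
  .select($"some", $"fields")                       // placeholder columns
  .rdd
  .map { case Row(k: String, v: Long) => (k, v) }   // placeholder key/value extraction
  .reduceByKey(_ + _)                               // stand-in for the combineByKey step
perFileResult.count()                               // force evaluation of just this file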

Deenar

On 14 October 2015 at 05:18, SLiZn Liu  wrote:

> Hey Spark Users,
>
> I kept getting java.lang.OutOfMemoryError: Java heap space as I read a
> massive number of json files, iteratively via read.json(). Even though the
> resulting RDD is rather small, I still get the OOM error. The brief structure
> of my program reads as follows, in pseudo-code:
>
> file_path_list.map{ jsonFile: String =>
>   sqlContext.read.json(jsonFile)
> .select($"some", $"fields")
> .withColumn("new_col", some_transformations($"col"))
> .rdd.map( x: Row => (k, v) )
> .combineByKey() // which groups a column into item lists by another 
> column as keys
> }.reduce( (i, j) => i.union(j) )
> .combineByKey() // which combines results from all json files
>
> I confess some of the json files are gigabytes in size, yet the combined RDD
> is only a few megabytes. I'm not familiar with the under-the-hood mechanism,
> but my intuitive understanding of how the code executes is: read the files
> one at a time (where I can easily change map to foreach when fetching from
> file_path_list, if that's the case), do the inner transformation on the DF
> and combine, then reduce and do the outer combine immediately, which
> doesn't require holding all the RDDs generated from all files in memory.
> Obviously, as my code raises the OOM error, I must have missed something
> important.
>
> From the debug log, I can tell the OOM Error happens when reading the same
> file, which is in a modest size of 2GB, while driver.memory is set to 13GB,
> and the available memory size before the code execution is around 8GB, on
> my standalone machine running as “local[8]”.
>
> To overcome this, I also tried to initialize an empty universal RDD
> variable, iteratively read one file at a time using foreach, then instead
> of reduce, simply combine each RDD generated by the json files, except the
> OOM Error remains.
>
> Other configurations:
>
>- set(“spark.storage.memoryFraction”, “0.1”) // no cache of RDD is used
>- set(“spark.serializer”, “org.apache.spark.serializer.KryoSerializer”)
>
> Any suggestions other than scale up/out the spark cluster?
>
> BR,
> Todd Leo
> ​
>


spark sql OOM

2015-10-14 Thread Andy Zhao
Hi guys,

I'm testing sparkSql 1.5.1, and I use hadoop-2.5.0-cdh5.3.2.
One SQL query that runs successfully in Hive fails when I run it with Spark SQL.
I get an OutOfMemoryError.

I read the source code. It seems that the compute method of HadoopRDD is
called an infinite number of times; each time it gets called, new instances
need to be allocated on the heap, until eventually the OOM occurs.

Does anyone have same problem?

Thanks



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/spark-sql-OOM-tp25059.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Machine learning with spark (book code example error)

2015-10-14 Thread Nick Pentreath
Hi there. I'm the author of the book (thanks for buying it by the way :)

Ideally if you're having any trouble with the book or code, it's best to
contact the publisher and submit a query (
https://www.packtpub.com/books/content/support/17400)

However, I can help with this issue. The problem is that the "testLabels"
code needs to be indented over multiple lines:

val testPath = "/PATH/20news-bydate-test/*"
val testRDD = sc.wholeTextFiles(testPath)
val testLabels = testRDD.map { case (file, text) =>
val topic = file.split("/").takeRight(2).head
newsgroupsMap(topic)
}

As it is in the sample code attached. If you copy the whole indented block
(or line by line) into the console, it should work - I've tested all the
sample code again and indeed it works for me.

Hope this helps
Nick

On Tue, Oct 13, 2015 at 8:31 PM, Zsombor Egyed 
wrote:

> Hi!
>
> I was reading the ML with spark book, and I was very interested about the
> 9. chapter (text mining), so I tried code examples.
>
> Everything was fine, but in this line:
>
> val testLabels = testRDD.map {
>
> case (file, text) => val topic = file.split("/").takeRight(2).head
>
> newsgroupsMap(topic) }
>
> I got an error: "value newsgroupsMap is not a member of String"
>
> Other relevant part of the code:
> val path = "/PATH/20news-bydate-train/*"
> val rdd = sc.wholeTextFiles(path)
> val newsgroups = rdd.map { case (file, text) =>
> file.split("/").takeRight(2).head }
>
> val tf = hashingTF.transform(tokens)
> val idf = new IDF().fit(tf)
> val tfidf = idf.transform(tf)
>
> val newsgroupsMap = newsgroups.distinct.collect().zipWithIndex.toMap
> val zipped = newsgroups.zip(tfidf)
> val train = zipped.map { case (topic, vector)
> =>LabeledPoint(newsgroupsMap(topic), vector) }
> train.cache
>
> val model = NaiveBayes.train(train, lambda = 0.1)
>
> val testPath = "/PATH//20news-bydate-test/*"
> val testRDD = sc.wholeTextFiles(testPath)
> val testLabels = testRDD.map { case (file, text) => val topic =
> file.split("/").takeRight(2).head newsgroupsMap(topic) }
>
> I attached the whole program code.
> Can anyone help, what the problem is?
>
> Regards,
> Zsombor
>
>
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>


[SQL] Memory leak with spark streaming and spark sql in spark 1.5.1

2015-10-14 Thread Terry Hoo
All,

Does anyone meet memory leak issue with spark streaming and spark sql in
spark 1.5.1? I can see the memory is increasing all the time when running
this simple sample:

val sc = new SparkContext(conf)
val sqlContext = new HiveContext(sc)
import sqlContext.implicits._
val ssc = new StreamingContext(sc, Seconds(1))
val s1 = ssc.socketTextStream("localhost", ).map(x =>
(x,1)).reduceByKey((x : Int, y : Int) => x + y)
s1.print
s1.foreachRDD(rdd => {
  rdd.foreach(_ => Unit)
  sqlContext.createDataFrame(rdd).registerTempTable("A")
  sqlContext.sql("""select * from A""").show(1)
})

After dump the the java heap, I can see there is about 22K entries
in SQLListener._stageIdToStageMetrics after 2 hour running (other maps in
this SQLListener has about 1K entries), is this a leak in SQLListener?

Thanks!
Terry


Re: HiveThriftServer not registering with Zookeeper

2015-10-14 Thread Xiaoyu Wang

I create a jira and pull request for this issue.
https://issues.apache.org/jira/browse/SPARK-11100

在 2015年10月13日 16:36, Xiaoyu Wang 写道:

I have the same issue.
I think the Spark Thrift Server does not support HA with ZooKeeper yet.

在 2015年09月01日 18:10, sreeramvenkat 写道:

Hi,

  I am trying to set up dynamic service discovery for HiveThriftServer in a
two-node cluster.

In the thrift server logs, I am not seeing it register itself with ZooKeeper -
no znode is getting created.

Pasting relevant section from my $SPARK_HOME/conf/hive-site.xml


<property>
  <name>hive.zookeeper.quorum</name>
  <value>host1:port1,host2:port2</value>
</property>

<property>
  <name>hive.server2.support.dynamic.service.discovery</name>
  <value>true</value>
</property>

<property>
  <name>hive.server2.zookeeper.namespace</name>
  <value>hivethriftserver2</value>
</property>

<property>
  <name>hive.zookeeper.client.port</name>
  <value>2181</value>
</property>



  Any help is appreciated.

PS: Zookeeper is working fine and zknodes are getting created with
hiveserver2. This issue happens only with hivethriftserver.

Regards,
Sreeram



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/HiveThriftServer-not-registering-with-Zookeeper-tp24534.html

Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org






-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Node afinity for Kafka-Direct Stream

2015-10-14 Thread Gerard Maas
In the receiver-based kafka streaming model, given that each receiver
starts as a long-running task, one can rely on a certain degree of data
locality based on the kafka partitioning:  Data published on a given
topic/partition will land on the same spark streaming receiving node until
the receiver dies and needs to be restarted somewhere else.

As I understand, the direct-kafka streaming model just computes offsets and
relays the work to a KafkaRDD. How is the execution locality compared to
the receiver-based approach?

thanks, Gerard.


EdgeTriplet showing two versions of the same vertex

2015-10-14 Thread rohit13k
Hi

I have a scenario where I am doing graph.vertices.collect() on the graph and
getting the 5 vertices I added. Each of my vertices is a Scala object, as shown
below:

class NodeExact(nodeId: Long, summ: Array[collection.mutable.Map[Long, Long]])
  extends Serializable {
  var node: Long = nodeId
  var currentsuperstep = 0
  var summary: Array[collection.mutable.Map[Long, Long]] = summ
  var ischanged = false
  def getsummary(window: Long): Int = {
    var sum = summary.clone()
    sum = sum.filter({ p => p != null })
    // filter returns a new map, so keep the result instead of discarding it
    sum = sum.map(f => f.filter { case (value, time) => time > window })
    var temp: scala.collection.Set[Long] = null
    for (i <- 0 to sum.length - 1) {
      if (temp == null)
        temp = sum(i).keySet
      else
        temp = temp ++ sum(i).keySet  // accumulate the union of all key sets
    }
    return temp.size
  }
}

There are multiple edges between the nodes in the graph, i.e. both a -> b and
b -> a. Now when I do
graph.triplets.collect()

I am getting edge triplets with source id a, but for some edge triplets the
srcAttr of a is not the same as the value of a in the vertexRDD, whereas for
other edge triplets it is the same as in the vertexRDD.

I do not understand how the srcAttr of the same vertex can have different
values in two edge triplets. Shouldn't it always have the same attribute as in
the vertexRDD?

Please let me know if I am missing something.

Thanks




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/EdgeTriplet-showing-two-versions-of-the-same-vertex-tp25058.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: graphx - mutable?

2015-10-14 Thread rohit13k
Hi

I am also working in the same area, where the graph evolves over time, and the
current approach of rebuilding the graph again and again is very slow and
memory-consuming. Did you find any workaround?
What was your use case?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/graphx-mutable-tp15777p25057.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Why is the Spark Web GUI failing with JavaScript "Uncaught SyntaxError"?

2015-10-14 Thread Joshua Fox
Thank you!

It seems that the history server at port 18080 also gives access to the
Spark GUI, as shown below.

Following your tip, I see that the  YARN ResourceManager GUI on 8088 indeed
has that ApplicationMaster link, though to a private rather than public IP;
replacing IPs brings me to the same Spark GUI.

Joshua
[image: Inline image 3]




On Tue, Oct 13, 2015 at 6:23 PM, Jonathan Kelly 
wrote:

> Joshua,
>
> Since Spark is configured to run on YARN in EMR, instead of viewing the
> Spark application UI at port 4040, you should instead start from the YARN
> ResourceManager (on port 8088), then click on the ApplicationMaster link
> for the Spark application you are interested in. This will take you to the
> YARN ProxyServer on port 20888, which will proxy you through to the Spark
> UI for the application (which renders correctly when viewed this way). This
> works even if the Spark UI is running on a port other than 4040 and even in
> yarn-cluster mode when the Spark driver is running on a slave node.
>
> Hope this helps,
> Jonathan
>
> On Tue, Oct 13, 2015 at 7:19 AM, Jean-Baptiste Onofré 
> wrote:
>
>> Thanks for the update Joshua.
>>
>> Let me try with Spark 1.4.1.
>>
>> I keep you posted.
>>
>> Regards
>> JB
>>
>> On 10/13/2015 04:17 PM, Joshua Fox wrote:
>>
>>>   * Spark 1.4.1, part of EMR emr-4.0.0
>>>   * Chrome Version 41.0.2272.118 (64-bit) on Ubuntu
>>>
>>>
>>> On Tue, Oct 13, 2015 at 3:27 PM, Jean-Baptiste Onofré >> > wrote:
>>>
>>> Hi Joshua,
>>>
>>> What's the Spark version and what's your browser ?
>>>
>>> I just tried on Spark 1.6-SNAPSHOT with firefox and it works fine.
>>>
>>> Thanks
>>> Regards
>>> JB
>>>
>>> On 10/13/2015 02:17 PM, Joshua Fox wrote:
>>>
>>> I am accessing the Spark Jobs Web GUI, running on AWS EMR.
>>>
>>> I can access this webapp (port 4040 as per default), but it only
>>> half-renders, producing "Uncaught SyntaxError: Unexpected token
>>> <"
>>>
>>> Here is a screenshot  including
>>> Chrome
>>> Developer Console.
>>>
>>> Screenshot 
>>>
>>> Here are some of the error messages in my Chrome console.
>>>
>>> /Uncaught SyntaxError: Unexpected token <
>>> (index):3 Resource interpreted as Script but transferred with
>>> MIME type
>>> text/html:
>>> "
>>> http://ec2-52-89-59-167.us-west-2.compute.amazonaws.com:4040/jobs/;.
>>> (index):74 Uncaught ReferenceError: drawApplicationTimeline is
>>> not defined
>>> (index):12 Resource interpreted as Image but transferred with
>>> MIME type
>>> text/html:
>>> "
>>> http://ec2-52-89-59-167.us-west-2.compute.amazonaws.com:4040/jobs/;
>>>
>>> /
>>> Note that the History GUI at port 18080 and the Hadoop GUI at
>>> port 8088
>>> work fine, and the Spark jobs GUI does partly render. So, it
>>> seems that
>>> my browser proxy is not the cause of this problem.
>>>
>>> Joshua
>>>
>>>
>>> --
>>> Jean-Baptiste Onofré
>>> jbono...@apache.org 
>>> http://blog.nanthrax.net
>>> Talend - http://www.talend.com
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> 
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>> 
>>>
>>>
>>>
>> --
>> Jean-Baptiste Onofré
>> jbono...@apache.org
>> http://blog.nanthrax.net
>> Talend - http://www.talend.com
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


java.io.InvalidClassException using spark1.4.1 for Terasort

2015-10-14 Thread Shreeharsha G Neelakantachar
Hi,
  I am running Terasort on Spark 1.4.1 with Hadoop 2.7 for a data size of
1TB. When I change my OS user from spark1 to hduser, I observe the exception
below. Please let me know what is wrong here. I tried updating Scala 2.10 to
2.11 and compiled the Terasort Scala code with sbt, but nothing has helped. I
had created the spark1 (spark1 to spark4) users for a multi-tenancy run.

2015-10-13 03:42:54,840 [sparkDriver-akka.actor.default-dispatcher-2] INFO 
 org.apache.spark.scheduler.TaskSetManager - Starting task 199.0 in stage 
0.0 (TID 199, 9.37.251.65, ANY, 1544 bytes)
2015-10-13 03:42:54,843 [task-result-getter-2] WARN 
org.apache.spark.scheduler.TaskSetManager - Lost task 173.0 in stage 0.0 
(TID 173, 9.37.251.65): java.io.InvalidClassException: 
scala.reflect.ClassTag$$anon$1; local class incompatible: stream classdesc 
serialVersionUID = -4937928798201944954, local class serialVersionUID = 
-8102093212602380348
at 
java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:621)
at 
java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1623)
at 
java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
at 
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
at 
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at 
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
at 
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at 
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at 
java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
at 
org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:69)
at 
org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:95)
at 
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:194)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)


Any help in this regard would be greatly appreciated.

Regards
Shreeharsha GN




Re: Machine learning with spark (book code example error)

2015-10-14 Thread Fengdong Yu
I don't recommend that code style; it's better to brace the function body
explicitly:

val testLabels = testRDD.map { case (file, text) => {
  val topic = file.split("/").takeRight(2).head
  newsgroupsMap(topic)
} }


> On Oct 14, 2015, at 15:46, Nick Pentreath  wrote:
> 
> Hi there. I'm the author of the book (thanks for buying it by the way :)
> 
> Ideally if you're having any trouble with the book or code, it's best to 
> contact the publisher and submit a query 
> (https://www.packtpub.com/books/content/support/17400 
> ) 
> 
> However, I can help with this issue. The problem is that the "testLabels" 
> code needs to be indented over multiple lines:
> 
> val testPath = "/PATH/20news-bydate-test/*"
> val testRDD = sc.wholeTextFiles(testPath)
> val testLabels = testRDD.map { case (file, text) => 
>   val topic = file.split("/").takeRight(2).head
>   newsgroupsMap(topic)
> }
> 
> As it is in the sample code attached. If you copy the whole indented block 
> (or line by line) into the console, it should work - I've tested all the 
> sample code again and indeed it works for me.
> 
> Hope this helps
> Nick
> 
> On Tue, Oct 13, 2015 at 8:31 PM, Zsombor Egyed  > wrote:
> Hi!
> 
> I was reading the ML with spark book, and I was very interested about the 9. 
> chapter (text mining), so I tried code examples. 
> 
> Everything was fine, but in this line:
> val testLabels = testRDD.map { 
> case (file, text) => val topic = file.split("/").takeRight(2).head
> newsgroupsMap(topic) }
> I got an error: "value newsgroupsMap is not a member of String"
> 
> Other relevant part of the code:
> val path = "/PATH/20news-bydate-train/*"
> val rdd = sc.wholeTextFiles(path) 
> val newsgroups = rdd.map { case (file, text) => 
> file.split("/").takeRight(2).head }
> 
> val tf = hashingTF.transform(tokens)
> val idf = new IDF().fit(tf)
> val tfidf = idf.transform(tf)
> 
> val newsgroupsMap = newsgroups.distinct.collect().zipWithIndex.toMap
> val zipped = newsgroups.zip(tfidf)
> val train = zipped.map { case (topic, vector) 
> =>LabeledPoint(newsgroupsMap(topic), vector) }
> train.cache
> 
> val model = NaiveBayes.train(train, lambda = 0.1)
> 
> val testPath = "/PATH//20news-bydate-test/*"
> val testRDD = sc.wholeTextFiles(testPath)
> val testLabels = testRDD.map { case (file, text) => val topic = 
> file.split("/").takeRight(2).head newsgroupsMap(topic) }
> 
> I attached the whole program code. 
> Can anyone help, what the problem is?
> 
> Regards,
> Zsombor
> 
> 
> 
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org 
> 
> For additional commands, e-mail: user-h...@spark.apache.org 
> 
> 



If you use Spark 1.5 and disabled Tungsten mode ...

2015-10-14 Thread Reynold Xin
Can you reply to this email and provide us with reasons why you disable it?

Thanks.


Re: TTL for saveAsObjectFile()

2015-10-14 Thread Calvin Jia
Hi Antonio,

I don't think Spark provides a way to pass down params with
saveAsObjectFile. One way could be to pass a default TTL in the
configuration, but the approach doesn't make much sense since TTL is not
necessarily uniform.

Baidu will be talking about their use of TTL in Tachyon with Spark in this
meetup, which may be helpful for understanding different ways to integrate.

Hope this helps,
Calvin

On Tue, Oct 13, 2015 at 1:07 PM, antoniosi  wrote:

> Hi,
>
> I am using RDD.saveAsObjectFile() to save the RDD dataset to Tachyon. In
> version 0.8, Tachyon will support for TTL for saved file. Is that supported
> from Spark as well? Is there a way I could specify an TTL for a saved
> object
> file?
>
> Thanks.
>
> Antonio.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/TTL-for-saveAsObjectFile-tp25051.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Node afinity for Kafka-Direct Stream

2015-10-14 Thread Gerard Maas
Hi Cody,

I think I misused the term 'data locality'. I should rather call it "node
affinity" instead, as this is what I would like to have:
For as long as an executor is available, I would like to have the same
kafka partition processed by the same node in order to take advantage of
local in-memory structures.

In the receiver-based mode this was a given. Any ideas how to achieve that
with the direct stream approach?

-greetz, Gerard.


On Wed, Oct 14, 2015 at 4:31 PM, Cody Koeninger  wrote:

> Assumptions about locality in spark are not very reliable, regardless of
> what consumer you use.  Even if you have locality preferences, and locality
> wait turned up really high, you still have to account for losing executors.
>
> On Wed, Oct 14, 2015 at 8:23 AM, Gerard Maas 
> wrote:
>
>> Thanks Saisai, Mishra,
>>
>> Indeed, that hint will only work on a case where the Spark executor is
>> co-located with the Kafka broker.
>> I think the answer to my question as stated  is that there's no warranty
>> of where the task will execute as it will depend on the scheduler and
>> cluster resources available  (Mesos in our case).
>> Therefore, any assumptions made about data locality using the
>> consumer-based approach need to be reconsidered when migrating to the
>> direct stream.
>>
>> ((In our case, we were using local caches to decide when a given
>> secondary index for a record should be produced and written.))
>>
>> -kr, Gerard.
>>
>>
>>
>>
>> On Wed, Oct 14, 2015 at 2:58 PM, Saisai Shao 
>> wrote:
>>
>>> This preferred locality is a hint to spark to schedule Kafka tasks on
>>> the preferred nodes, if Kafka and Spark are two separate cluster, obviously
>>> this locality hint takes no effect, and spark will schedule tasks following
>>> node-local -> rack-local -> any pattern, like any other spark tasks.
>>>
>>> On Wed, Oct 14, 2015 at 8:10 PM, Rishitesh Mishra >> > wrote:
>>>
 Hi Gerard,
 I am also trying to understand the same issue. Whatever code I have
 seen it looks like once Kafka RDD is constructed the execution of that RDD
 is upto the task scheduler and it can schedule the partitions based on the
 load on nodes. There is preferred node specified in Kafks RDD. But ASFIK it
 maps to the Kafka partitions host . So if Kafka and Spark are co hosted
 probably this will work. If not, I am not sure how to get data locality for
 a partition.
 Others,
 correct me if there is a way.

 On Wed, Oct 14, 2015 at 3:08 PM, Gerard Maas 
 wrote:

> In the receiver-based kafka streaming model, given that each receiver
> starts as a long-running task, one can rely in a certain degree of data
> locality based on the kafka partitioning:  Data published on a given
> topic/partition will land on the same spark streaming receiving node until
> the receiver dies and needs to be restarted somewhere else.
>
> As I understand, the direct-kafka streaming model just computes
> offsets and relays the work to a KafkaRDD. How is the execution locality
> compared to the receiver-based approach?
>
> thanks, Gerard.
>



 --

 Regards,
 Rishitesh Mishra,
 SnappyData . (http://www.snappydata.io/)

 https://in.linkedin.com/in/rishiteshmishra

>>>
>>>
>>
>


Re: Node afinity for Kafka-Direct Stream

2015-10-14 Thread Cody Koeninger
What I'm saying is that it's not a given with spark, even in receiver-based
mode, because as soon as you lose an executor you'll have a rebalance.

Spark's model in general isn't a good fit for pinning work to specific
nodes.

If you really want to try and fake this, you can override
getPreferredLocations and set spark.locality.wait to a high value.
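
For the configuration half of that workaround, a minimal sketch (the wait value
is illustrative only; the default is 3s):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("kafka-direct-affinity")   // hypothetical app name
  // make the scheduler wait longer for a task's preferred location
  // before falling back to a non-local executor
  .set("spark.locality.wait", "30s")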



On Wed, Oct 14, 2015 at 2:45 PM, Gerard Maas  wrote:

> Hi Cody,
>
> I think that I misused the term 'data locality'. I think I should better
> call it "node affinity"  instead, as this is what I would like to have:
> For as long as an executor is available, I would like to have the same
> kafka partition processed by the same node in order to take advantage of
> local in-memory structures.
>
> In the receiver-based mode this was a given. Any ideas how to achieve that
> with the direct stream approach?
>
> -greetz, Gerard.
>
>
> On Wed, Oct 14, 2015 at 4:31 PM, Cody Koeninger 
> wrote:
>
>> Assumptions about locality in spark are not very reliable, regardless of
>> what consumer you use.  Even if you have locality preferences, and locality
>> wait turned up really high, you still have to account for losing executors.
>>
>> On Wed, Oct 14, 2015 at 8:23 AM, Gerard Maas 
>> wrote:
>>
>>> Thanks Saisai, Mishra,
>>>
>>> Indeed, that hint will only work on a case where the Spark executor is
>>> co-located with the Kafka broker.
>>> I think the answer to my question as stated  is that there's no warranty
>>> of where the task will execute as it will depend on the scheduler and
>>> cluster resources available  (Mesos in our case).
>>> Therefore, any assumptions made about data locality using the
>>> consumer-based approach need to be reconsidered when migrating to the
>>> direct stream.
>>>
>>> ((In our case, we were using local caches to decide when a given
>>> secondary index for a record should be produced and written.))
>>>
>>> -kr, Gerard.
>>>
>>>
>>>
>>>
>>> On Wed, Oct 14, 2015 at 2:58 PM, Saisai Shao 
>>> wrote:
>>>
 This preferred locality is a hint to spark to schedule Kafka tasks on
 the preferred nodes, if Kafka and Spark are two separate cluster, obviously
 this locality hint takes no effect, and spark will schedule tasks following
 node-local -> rack-local -> any pattern, like any other spark tasks.

 On Wed, Oct 14, 2015 at 8:10 PM, Rishitesh Mishra <
 rmis...@snappydata.io> wrote:

> Hi Gerard,
> I am also trying to understand the same issue. Whatever code I have
> seen it looks like once Kafka RDD is constructed the execution of that RDD
> is upto the task scheduler and it can schedule the partitions based on the
> load on nodes. There is preferred node specified in Kafks RDD. But ASFIK 
> it
> maps to the Kafka partitions host . So if Kafka and Spark are co hosted
> probably this will work. If not, I am not sure how to get data locality 
> for
> a partition.
> Others,
> correct me if there is a way.
>
> On Wed, Oct 14, 2015 at 3:08 PM, Gerard Maas 
> wrote:
>
>> In the receiver-based kafka streaming model, given that each receiver
>> starts as a long-running task, one can rely in a certain degree of data
>> locality based on the kafka partitioning:  Data published on a given
>> topic/partition will land on the same spark streaming receiving node 
>> until
>> the receiver dies and needs to be restarted somewhere else.
>>
>> As I understand, the direct-kafka streaming model just computes
>> offsets and relays the work to a KafkaRDD. How is the execution locality
>> compared to the receiver-based approach?
>>
>> thanks, Gerard.
>>
>
>
>
> --
>
> Regards,
> Rishitesh Mishra,
> SnappyData . (http://www.snappydata.io/)
>
> https://in.linkedin.com/in/rishiteshmishra
>


>>>
>>
>


Re: Spark 1.5 java.net.ConnectException: Connection refused

2015-10-14 Thread Tathagata Das
When a job gets aborted, it means that the internal tasks were retried a
number of times before the system gave up. You can control the number of
retries (see Spark's configuration page). By default, the job does not get
resubmitted.
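
For example, the retry limit referred to above is spark.task.maxFailures, which
defaults to 4; a sketch of raising it when building the context:

import org.apache.spark.SparkConf

// allow each task to fail up to 8 times before the stage (and hence the job) is aborted
val conf = new SparkConf().set("spark.task.maxFailures", "8")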

You could try getting the logs of the failed executor, to see what caused
the failure. Could be a memory limit issue, and YARN killing it somehow.



On Wed, Oct 14, 2015 at 11:05 AM, Spark Newbie 
wrote:

> Is it slowing things down or blocking progress.
> >> I didn't see slowing of processing, but I do see jobs aborted
> consecutively for a period of 18 batches (5 minute batch intervals). So I
> am worried about what happened to the records that these jobs were
> processing.
> Also, one more thing to mention is that the
> StreamingListenerBatchCompleted.numRecords information shows all received
> records as processed even if the batch/job failed. The processing time as
> well shows as the same time it takes for a successful batch.
> It seems like it is the numRecords which was the input to the batch
> regardless of whether they were successfully processed or not.
>
> On Wed, Oct 14, 2015 at 11:01 AM, Spark Newbie 
> wrote:
>
>> I ran 2 different spark 1.5 clusters that have been running for more than
>> a day now. I do see jobs getting aborted due to task retry's maxing out
>> (default 4) due to ConnectionException. It seems like the executors die and
>> get restarted and I was unable to find the root cause (same app code and
>> conf used on spark 1.4.1 I don't see ConnectionException).
>>
>> Another question related to this, what happens to the kinesis records
>> received when Job gets aborted? In Spark-1.5 and kinesis-asl-1.5 (which I
>> am using), does the job get resubmitted with the same received records? Or
>> does the kinesis-asl library get those records again based on the sequence
>> numbers it tracks? It would be good for me to understand the story around
>> lossless processing of kinesis records in Spark-1.5 + kinesis-asl-1.5 when
>> jobs are aborted. Any pointers or quick explanation would be very helpful.
>>
>>
>> On Tue, Oct 13, 2015 at 4:04 PM, Tathagata Das 
>> wrote:
>>
>>> Is this happening too often? Is it slowing things down or blocking
>>> progress. Failures once in a while is part of the norm, and the system
>>> should take care of itself.
>>>
>>> On Tue, Oct 13, 2015 at 2:47 PM, Spark Newbie >> > wrote:
>>>
 Hi Spark users,

 I'm seeing the below exception in my spark streaming application. It
 happens in the first stage where the kinesis receivers receive records and
 perform a flatMap operation on the unioned Dstream. A coalesce step also
 happens as a part of that stage for optimizing the performance.

 This is happening on my spark 1.5 instance using kinesis-asl-1.5. When
 I look at the executor logs I do not see any exceptions indicating the root
 cause of why there is no connectivity on xxx.xx.xx.xxx:36684 or when did
 that service go down.

 Any help debugging this problem will be helpful.

 15/10/13 16:36:07 ERROR shuffle.RetryingBlockFetcher: Exception while
 beginning fetch of 1 outstanding blocks
 java.io.IOException: Failed to connect to
 ip-xxx-xx-xx-xxx.ec2.internal/xxx.xx.xx.xxx:36684
 at
 org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:193)
 at
 org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156)
 at
 org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:88)
 at
 org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
 at
 org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:120)
 at
 org.apache.spark.network.netty.NettyBlockTransferService.fetchBlocks(NettyBlockTransferService.scala:97)
 at
 org.apache.spark.network.BlockTransferService.fetchBlockSync(BlockTransferService.scala:89)
 at
 org.apache.spark.storage.BlockManager$$anonfun$doGetRemote$2.apply(BlockManager.scala:595)
 at
 org.apache.spark.storage.BlockManager$$anonfun$doGetRemote$2.apply(BlockManager.scala:593)
 at
 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 at
 scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
 at
 org.apache.spark.storage.BlockManager.doGetRemote(BlockManager.scala:593)
 at
 org.apache.spark.storage.BlockManager.getRemote(BlockManager.scala:579)
 at
 org.apache.spark.storage.BlockManager.get(BlockManager.scala:623)
 at
 

Re: spark-avro 2.0.1 generates strange schema (spark-avro 1.0.0 is fine)

2015-10-14 Thread Josh Rosen
Can you report this as an issue at
https://github.com/databricks/spark-avro/issues so that it's easier to
track? Thanks!

On Wed, Oct 14, 2015 at 1:38 PM, Alex Nastetsky <
alex.nastet...@vervemobile.com> wrote:

> I save my dataframe to avro with spark-avro 1.0.0 and it looks like this
> (using avro-tools tojson):
>
> {"field1":"value1","field2":976200}
> {"field1":"value2","field2":976200}
> {"field1":"value3","field2":614100}
>
> But when I use spark-avro 2.0.1, it looks like this:
>
> {"field1":{"string":"value1"},"field2":{"long":976200}}
> {"field1":{"string":"value2"},"field2":{"long":976200}}
> {"field1":{"string":"value3"},"field2":{"long":614100}}
>
> At this point I'd be happy to use spark-avro 1.0.0, except that it doesn't
> seem to support specifying a compression codec (I want deflate).
>


Re: Question about data frame partitioning in Spark 1.3.0

2015-10-14 Thread Michael Armbrust
Caching the partitioned_df <- this one, but you have to do the partitioning
using something like sql("SELECT * FROM ... CLUSTER BY a"), as there is no such
operation exposed on DataFrames.
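
A minimal sketch of that approach, with a hypothetical registered table t and
clustering column a:

val partitioned = sqlContext.sql("SELECT * FROM t CLUSTER BY a")
partitioned.registerTempTable("t_clustered")
sqlContext.cacheTable("t_clustered")   // cache the clustered result so later queries reuse it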

2) Here is the JIRA: https://issues.apache.org/jira/browse/SPARK-5354


Re: Problem installing Sparck on Windows 8

2015-10-14 Thread Marco Mistroni
Thanks Steve,
I followed the instructions; Spark is started and I can see the web UI.
However, after launching spark-shell I am getting another exception. Is this
preventing me from actually using Spark?

kind regards
 marco

15/10/14 20:52:35 WARN : Your hostname, MarcoLaptop resolves to a
loopback/non-r
eachable address: fe80:0:0:0:c5ed:a66d:9d95:5caa%wlan2, but we couldn't
find any
 external IP address!
java.lang.RuntimeException: java.lang.RuntimeException: The root scratch
dir: /t
mp/hive on HDFS should be writable. Current permissions are: -
at
org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.jav
a:522)
at
org.apache.spark.sql.hive.client.ClientWrapper.(ClientWrapper.s
cala:171)
at
org.apache.spark.sql.hive.HiveContext.executionHive$lzycompute(HiveCo
ntext.scala:162)
at
org.apache.spark.sql.hive.HiveContext.executionHive(HiveContext.scala
:160)
at
org.apache.spark.sql.hive.HiveContext.(HiveContext.scala:167)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
Method)

at
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstruct
orAccessorImpl.java:62)
at
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingC
onstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:422)
at
org.apache.spark.repl.SparkILoop.createSQLContext(SparkILoop.scala:10
28)
at $iwC$$iwC.(:9)
at $iwC.(:18)
at (:20)
at .(:24)
at .()
at .(:7)
at .()
at $print()
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.
java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAcces
sorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at
org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:
1065)
at
org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:
1340)
at
org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840
)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
at
org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:8
57)
at
org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.sca
la:902)
at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814)
at
org.apache.spark.repl.SparkILoopInit$$anonfun$initializeSpark$1.apply
(SparkILoopInit.scala:132)
at
org.apache.spark.repl.SparkILoopInit$$anonfun$initializeSpark$1.apply
(SparkILoopInit.scala:124)
at
org.apache.spark.repl.SparkIMain.beQuietDuring(SparkIMain.scala:324)
at
org.apache.spark.repl.SparkILoopInit$class.initializeSpark(SparkILoop
Init.scala:124)
at
org.apache.spark.repl.SparkILoop.initializeSpark(SparkILoop.scala:64)

at
org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$Spark
ILoop$$process$1$$anonfun$apply$mcZ$sp$5.apply$mcV$sp(SparkILoop.scala:974)
at
org.apache.spark.repl.SparkILoopInit$class.runThunks(SparkILoopInit.s
cala:159)
at org.apache.spark.repl.SparkILoop.runThunks(SparkILoop.scala:64)
at
org.apache.spark.repl.SparkILoopInit$class.postInitialization(SparkIL
oopInit.scala:108)
at
org.apache.spark.repl.SparkILoop.postInitialization(SparkILoop.scala:
64)
at
org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$Spark
ILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:991)
at
org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$Spark
ILoop$$process$1.apply(SparkILoop.scala:945)
at
org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$Spark
ILoop$$process$1.apply(SparkILoop.scala:945)
at
scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClass
Loader.scala:135)
at org.apache.spark.repl.SparkILoop.org
$apache$spark$repl$SparkILoop$$pr
ocess(SparkILoop.scala:945)
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1059)
at org.apache.spark.repl.Main$.main(Main.scala:31)
at org.apache.spark.repl.Main.main(Main.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.
java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAcces
sorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSub
mit$$runMain(SparkSubmit.scala:672)
at
org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:18
0)
at
org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
at 

Re: Running in cluster mode causes native library linking to fail

2015-10-14 Thread Bernardo Vecchia Stein
Hi Deenar,

Yes, the native library is installed on all machines of the cluster. I
tried a simpler approach by just using System.load() and passing the exact
path of the library, and things still won't work (I get exactly the same
error and message).

Any ideas of what might be failing?

Thank you,
Bernardo

On 14 October 2015 at 02:50, Deenar Toraskar 
wrote:

> Hi Bernardo
>
> Is the native library installed on all machines of your cluster and are
> you setting both the spark.driver.extraLibraryPath and
> spark.executor.extraLibraryPath ?
>
> Deenar
>
>
>
> On 14 October 2015 at 05:44, Bernardo Vecchia Stein <
> bernardovst...@gmail.com> wrote:
>
>> Hello,
>>
>> I am trying to run some scala code in cluster mode using spark-submit.
>> This code uses addLibrary to link with a .so that exists in the machine,
>> and this library has a function to be called natively (there's a native
>> definition as needed in the code).
>>
>> The problem I'm facing is: whenever I try to run this code in cluster
>> mode, spark fails with the following message when trying to execute the
>> native function:
>> java.lang.UnsatisfiedLinkError:
>> org.name.othername.ClassName.nativeMethod([B[B)[B
>>
>> Apparently, the library is being found by spark, but the required
>> function isn't found.
>>
>> When trying to run in client mode, however, this doesn't fail and
>> everything works as expected.
>>
>> Does anybody have any idea of what might be the problem here? Is there
>> any bug that could be related to this when running in cluster mode?
>>
>> I appreciate any help.
>> Thanks,
>> Bernardo
>>
>
>


Re: Running in cluster mode causes native library linking to fail

2015-10-14 Thread Renato Marroquín Mogrovejo
Hi Bernardo,

So is this in distributed mode? or single node? Maybe fix the issue with a
single node first ;)
You are right that Spark finds the library but not the *.so file. I also
use System.load() with LD_LIBRARY_PATH set, and I am able to
execute without issues. Maybe you'd like to double check paths, env
variables, or the parameters spark.driver.extraLibraryPath,
spark.executor.extraLibraryPath.
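
For reference, a minimal sketch of that combination; the library name and
install path are placeholders:

// Assume libmylib.so is installed under /opt/native on every node, and
// spark-submit is launched with both settings:
//   --conf spark.driver.extraLibraryPath=/opt/native \
//   --conf spark.executor.extraLibraryPath=/opt/native
object NativeBridge {
  System.loadLibrary("mylib")   // resolves to libmylib.so via java.library.path
  @native def nativeMethod(a: Array[Byte], b: Array[Byte]): Array[Byte]
}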


Best,

Renato M.

2015-10-14 21:40 GMT+02:00 Bernardo Vecchia Stein 
:

> Hi Renato,
>
> I have done that as well, but so far no luck. I believe spark is finding
> the library correctly, otherwise the error message would be "no libraryname
> found" or something like that. The problem seems to be something else, and
> I'm not sure how to find it.
>
> Thanks,
> Bernardo
>
> On 14 October 2015 at 16:28, Renato Marroquín Mogrovejo <
> renatoj.marroq...@gmail.com> wrote:
>
>> You can also try setting the env variable LD_LIBRARY_PATH to point where
>> your compiled libraries are.
>>
>>
>> Renato M.
>>
>> 2015-10-14 21:07 GMT+02:00 Bernardo Vecchia Stein <
>> bernardovst...@gmail.com>:
>>
>>> Hi Deenar,
>>>
>>> Yes, the native library is installed on all machines of the cluster. I
>>> tried a simpler approach by just using System.load() and passing the exact
>>> path of the library, and things still won't work (I get exactly the same
>>> error and message).
>>>
>>> Any ideas of what might be failing?
>>>
>>> Thank you,
>>> Bernardo
>>>
>>> On 14 October 2015 at 02:50, Deenar Toraskar 
>>> wrote:
>>>
 Hi Bernardo

 Is the native library installed on all machines of your cluster and are
 you setting both the spark.driver.extraLibraryPath and
 spark.executor.extraLibraryPath ?

 Deenar



 On 14 October 2015 at 05:44, Bernardo Vecchia Stein <
 bernardovst...@gmail.com> wrote:

> Hello,
>
> I am trying to run some scala code in cluster mode using spark-submit.
> This code uses addLibrary to link with a .so that exists in the machine,
> and this library has a function to be called natively (there's a native
> definition as needed in the code).
>
> The problem I'm facing is: whenever I try to run this code in cluster
> mode, spark fails with the following message when trying to execute the
> native function:
> java.lang.UnsatisfiedLinkError:
> org.name.othername.ClassName.nativeMethod([B[B)[B
>
> Apparently, the library is being found by spark, but the required
> function isn't found.
>
> When trying to run in client mode, however, this doesn't fail and
> everything works as expected.
>
> Does anybody have any idea of what might be the problem here? Is there
> any bug that could be related to this when running in cluster mode?
>
> I appreciate any help.
> Thanks,
> Bernardo
>


>>>
>>
>


Re: Running in cluster mode causes native library linking to fail

2015-10-14 Thread Bernardo Vecchia Stein
Hi Renato,

I am using a single master and a single worker node, both in the same
machine, to simplify everything. I have tested with System.loadLibrary() as
well (setting all the necessary paths) and get the same error. Just double
checked everything and the parameters are fine.

Bernardo

On 14 October 2015 at 16:54, Renato Marroquín Mogrovejo <
renatoj.marroq...@gmail.com> wrote:

> Sorry Bernardo, I just double checked. I use: System.loadLibrary();
> Could you also try that?
>
>
> Renato M.
>
> 2015-10-14 21:51 GMT+02:00 Renato Marroquín Mogrovejo <
> renatoj.marroq...@gmail.com>:
>
>> Hi Bernardo,
>>
>> So is this in distributed mode? or single node? Maybe fix the issue with
>> a single node first ;)
>> You are right that Spark finds the library but not the *.so file. I also
>> use System.load() with LD_LIBRARY_PATH set, and I am able to
>> execute without issues. Maybe you'd like to double check paths, env
>> variables, or the parameters spark.driver.extraLibraryPath,
>> spark.executor.extraLibraryPath.
>>
>>
>> Best,
>>
>> Renato M.
>>
>> 2015-10-14 21:40 GMT+02:00 Bernardo Vecchia Stein <
>> bernardovst...@gmail.com>:
>>
>>> Hi Renato,
>>>
>>> I have done that as well, but so far no luck. I believe spark is finding
>>> the library correctly, otherwise the error message would be "no libraryname
>>> found" or something like that. The problem seems to be something else, and
>>> I'm not sure how to find it.
>>>
>>> Thanks,
>>> Bernardo
>>>
>>> On 14 October 2015 at 16:28, Renato Marroquín Mogrovejo <
>>> renatoj.marroq...@gmail.com> wrote:
>>>
 You can also try setting the env variable LD_LIBRARY_PATH to point
 where your compiled libraries are.


 Renato M.

 2015-10-14 21:07 GMT+02:00 Bernardo Vecchia Stein <
 bernardovst...@gmail.com>:

> Hi Deenar,
>
> Yes, the native library is installed on all machines of the cluster. I
> tried a simpler approach by just using System.load() and passing the exact
> path of the library, and things still won't work (I get exactly the same
> error and message).
>
> Any ideas of what might be failing?
>
> Thank you,
> Bernardo
>
> On 14 October 2015 at 02:50, Deenar Toraskar <
> deenar.toras...@gmail.com> wrote:
>
>> Hi Bernardo
>>
>> Is the native library installed on all machines of your cluster and
>> are you setting both the spark.driver.extraLibraryPath and
>> spark.executor.extraLibraryPath ?
>>
>> Deenar
>>
>>
>>
>> On 14 October 2015 at 05:44, Bernardo Vecchia Stein <
>> bernardovst...@gmail.com> wrote:
>>
>>> Hello,
>>>
>>> I am trying to run some scala code in cluster mode using
>>> spark-submit. This code uses addLibrary to link with a .so that exists 
>>> in
>>> the machine, and this library has a function to be called natively 
>>> (there's
>>> a native definition as needed in the code).
>>>
>>> The problem I'm facing is: whenever I try to run this code in
>>> cluster mode, spark fails with the following message when trying to 
>>> execute
>>> the native function:
>>> java.lang.UnsatisfiedLinkError:
>>> org.name.othername.ClassName.nativeMethod([B[B)[B
>>>
>>> Apparently, the library is being found by spark, but the required
>>> function isn't found.
>>>
>>> When trying to run in client mode, however, this doesn't fail and
>>> everything works as expected.
>>>
>>> Does anybody have any idea of what might be the problem here? Is
>>> there any bug that could be related to this when running in cluster 
>>> mode?
>>>
>>> I appreciate any help.
>>> Thanks,
>>> Bernardo
>>>
>>
>>
>

>>>
>>
>


spark-shell :javap fails with complaint about JAVA_HOME, but it is set correctly

2015-10-14 Thread Robert Dodier
Hi,

I am working with Spark 1.5.1 (official release), with Oracle Java8,
on Ubuntu 14.04. echo $JAVA_HOME says "/usr/lib/jvm/java-8-oracle".

I'd like to use :javap in spark-shell, but I get an error message:

scala> :javap java.lang.Object
Failed: Could not load javap tool. Check that JAVA_HOME is correct.

However ls $JAVA_HOME/lib/tools.jar shows that it is there.

I tried starting spark-shell with -toolcp $JAVA_HOME/lib/tools.jar but
I get the same error.

For comparison, if execute scala and enter :javap java.lang.Object, it
works as expected.

Not sure where to go from here. Thanks for any advice.

best,

Robert Dodier

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Problem installing Sparck on Windows 8

2015-10-14 Thread Steve Loughran

On 14 Oct 2015, at 20:56, Marco Mistroni 
> wrote:


15/10/14 20:52:35 WARN : Your hostname, MarcoLaptop resolves to a loopback/non-r
eachable address: fe80:0:0:0:c5ed:a66d:9d95:5caa%wlan2, but we couldn't find any
 external IP address!
java.lang.RuntimeException: java.lang.RuntimeException: The root scratch dir: /t
mp/hive on HDFS should be writable. Current permissions are: -
at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.jav
a:522)
at org.apache.spark.sql.hive.client.ClientWrapper.(ClientWrapper.s
cala:171)
at org.apache.spark.sql.hive.HiveContext.executionHive$lzycompute(HiveCo

now, that I haven't seen. Looks like it thinks the permissions are wrong, 
doesn't it?
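
A commonly reported workaround on Windows, assuming the Hadoop winutils.exe
binary is available (the paths below are placeholders), is to open up the
permissions on the Hive scratch directory from a command prompt:

REM adjust the winutils.exe location and scratch dir to the local setup
C:\hadoop\bin\winutils.exe chmod 777 C:\tmp\hive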


Re: Running in cluster mode causes native library linking to fail

2015-10-14 Thread Renato Marroquín Mogrovejo
Sorry Bernardo, I just double checked. I use: System.loadLibrary();
Could you also try that?


Renato M.

2015-10-14 21:51 GMT+02:00 Renato Marroquín Mogrovejo <
renatoj.marroq...@gmail.com>:

> Hi Bernardo,
>
> So is this in distributed mode? or single node? Maybe fix the issue with a
> single node first ;)
> You are right that Spark finds the library but not the *.so file. I also
> use System.load() with LD_LIBRARY_PATH set, and I am able to
> execute without issues. Maybe you'd like to double check paths, env
> variables, or the parameters spark.driver.extraLibraryPath,
> spark.executor.extraLibraryPath.
>
>
> Best,
>
> Renato M.
>
> 2015-10-14 21:40 GMT+02:00 Bernardo Vecchia Stein <
> bernardovst...@gmail.com>:
>
>> Hi Renato,
>>
>> I have done that as well, but so far no luck. I believe spark is finding
>> the library correctly, otherwise the error message would be "no libraryname
>> found" or something like that. The problem seems to be something else, and
>> I'm not sure how to find it.
>>
>> Thanks,
>> Bernardo
>>
>> On 14 October 2015 at 16:28, Renato Marroquín Mogrovejo <
>> renatoj.marroq...@gmail.com> wrote:
>>
>>> You can also try setting the env variable LD_LIBRARY_PATH to point where
>>> your compiled libraries are.
>>>
>>>
>>> Renato M.
>>>
>>> 2015-10-14 21:07 GMT+02:00 Bernardo Vecchia Stein <
>>> bernardovst...@gmail.com>:
>>>
 Hi Deenar,

 Yes, the native library is installed on all machines of the cluster. I
 tried a simpler approach by just using System.load() and passing the exact
 path of the library, and things still won't work (I get exactly the same
 error and message).

 Any ideas of what might be failing?

 Thank you,
 Bernardo

 On 14 October 2015 at 02:50, Deenar Toraskar  wrote:

> Hi Bernardo
>
> Is the native library installed on all machines of your cluster and
> are you setting both the spark.driver.extraLibraryPath and
> spark.executor.extraLibraryPath ?
>
> Deenar
>
>
>
> On 14 October 2015 at 05:44, Bernardo Vecchia Stein <
> bernardovst...@gmail.com> wrote:
>
>> Hello,
>>
>> I am trying to run some scala code in cluster mode using
>> spark-submit. This code uses addLibrary to link with a .so that exists in
>> the machine, and this library has a function to be called natively 
>> (there's
>> a native definition as needed in the code).
>>
>> The problem I'm facing is: whenever I try to run this code in cluster
>> mode, spark fails with the following message when trying to execute the
>> native function:
>> java.lang.UnsatisfiedLinkError:
>> org.name.othername.ClassName.nativeMethod([B[B)[B
>>
>> Apparently, the library is being found by spark, but the required
>> function isn't found.
>>
>> When trying to run in client mode, however, this doesn't fail and
>> everything works as expected.
>>
>> Does anybody have any idea of what might be the problem here? Is
>> there any bug that could be related to this when running in cluster mode?
>>
>> I appreciate any help.
>> Thanks,
>> Bernardo
>>
>
>

>>>
>>
>


Spark streaming checkpoint against s3

2015-10-14 Thread Tian Zhang
Hi, I am trying to set the Spark Streaming checkpoint to s3. Here is basically
what I did:

val checkpointDir = "s3://myBucket/checkpoint"
val ssc = StreamingContext.getOrCreate(checkpointDir,
  () => getStreamingContext(sparkJobName,
                            batchDurationSec,
                            classOf[MyClassKryoRegistrator],
                            checkpointDir),
  getHadoopConfiguration)

def getHadoopConfiguration: Configuration = {
  val hadoopConf = new Configuration()
  hadoopConf.set("fs.defaultFS", "s3://" + myBucket + "/")
  hadoopConf.set("fs.s3.awsAccessKeyId", "myAccessKey")
  hadoopConf.set("fs.s3.awsSecretAccessKey", "mySecretKey")
  hadoopConf.set("fs.s3n.awsAccessKeyId", "myAccessKey")
  hadoopConf.set("fs.s3n.awsSecretAccessKey", "mySecretKey")
  hadoopConf
}

It is working, in the sense that I can see it trying to retrieve the checkpoint from s3.

However, it did more than I intended. I saw the following in the log:
15/10/14 19:58:47 ERROR spark.SparkContext: Jar not found at
file:/media/ephemeral0/oncue/mesos-slave/slaves/20151007-172900-436893194-5050-2984-S9/frameworks/20150825-180042-604730890-5050-4268-0003/executors/tian-act-reg.47368a1a-71f9-11e5-ad61-de5fb3a867da/runs/dfc28a6c-48a0-464b-bdb1-d6dd057acd51/artifacts/rna-spark-streaming.jar

Now SparkContext is trying to look for the jar at the following path instead of the local one:

file:/media/ephemeral0/oncue/mesos-slave/slaves/20151007-172900-436893194-5050-2984-S9/frameworks/20150825-180042-604730890-5050-4268-0003/executors/tian-act-reg.47368a1a-71f9-11e5-ad61-de5fb3a867da/runs/dfc28a6c-48a0-464b-bdb1-d6dd057acd51/artifacts/rna-spark-streaming.jar

How do I get SparkContext to look at just
/media/ephemeral0/oncue/mesos-slave/slaves/20151007-172900-436893194-5050-2984-S9/frameworks/20150825-180042-604730890-5050-4268-0003/executors/tian-act-reg.47368a1a-71f9-11e5-ad61-de5fb3a867da/runs/dfc28a6c-48a0-464b-bdb1-d6dd057acd51/artifacts/rna-spark-streaming.jar?






--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-checkpoint-against-s3-tp25068.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Running in cluster mode causes native library linking to fail

2015-10-14 Thread Renato Marroquín Mogrovejo
You can also try setting the env variable LD_LIBRARY_PATH to point where
your compiled libraries are.


Renato M.

2015-10-14 21:07 GMT+02:00 Bernardo Vecchia Stein 
:

> Hi Deenar,
>
> Yes, the native library is installed on all machines of the cluster. I
> tried a simpler approach by just using System.load() and passing the exact
> path of the library, and things still won't work (I get exactly the same
> error and message).
>
> Any ideas of what might be failing?
>
> Thank you,
> Bernardo
>
> On 14 October 2015 at 02:50, Deenar Toraskar 
> wrote:
>
>> Hi Bernardo
>>
>> Is the native library installed on all machines of your cluster and are
>> you setting both the spark.driver.extraLibraryPath and
>> spark.executor.extraLibraryPath ?
>>
>> Deenar
>>
>>
>>
>> On 14 October 2015 at 05:44, Bernardo Vecchia Stein <
>> bernardovst...@gmail.com> wrote:
>>
>>> Hello,
>>>
>>> I am trying to run some scala code in cluster mode using spark-submit.
>>> This code uses addLibrary to link with a .so that exists in the machine,
>>> and this library has a function to be called natively (there's a native
>>> definition as needed in the code).
>>>
>>> The problem I'm facing is: whenever I try to run this code in cluster
>>> mode, spark fails with the following message when trying to execute the
>>> native function:
>>> java.lang.UnsatisfiedLinkError:
>>> org.name.othername.ClassName.nativeMethod([B[B)[B
>>>
>>> Apparently, the library is being found by spark, but the required
>>> function isn't found.
>>>
>>> When trying to run in client mode, however, this doesn't fail and
>>> everything works as expected.
>>>
>>> Does anybody have any idea of what might be the problem here? Is there
>>> any bug that could be related to this when running in cluster mode?
>>>
>>> I appreciate any help.
>>> Thanks,
>>> Bernardo
>>>
>>
>>
>


Re: Running in cluster mode causes native library linking to fail

2015-10-14 Thread Bernardo Vecchia Stein
Hi Renato,

I have done that as well, but so far no luck. I believe Spark is finding
the library correctly; otherwise the error message would be something like "no
libraryname found". The problem seems to be something else, and
I'm not sure how to find it.
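(For reference, a hedged sketch of the library-path configuration mentioned earlier in this thread; all paths and names below are placeholders, not taken from this setup, and in practice the two settings are usually passed as --conf flags to spark-submit rather than set programmatically.)

import org.apache.spark.{SparkConf, SparkContext}

// Loaded at most once per executor JVM; later tasks reuse it.
object NativeLib {
  lazy val loaded: Boolean = {
    System.loadLibrary("mylib") // resolves libmylib.so via the library path
    true
  }
}

object NativeLibExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("native-lib-example")
      // /opt/native/lib must exist at the same path on every node.
      .set("spark.driver.extraLibraryPath", "/opt/native/lib")
      .set("spark.executor.extraLibraryPath", "/opt/native/lib")
    val sc = new SparkContext(conf)
    try {
      // Touch the library inside a task so the executor JVMs load it too.
      sc.parallelize(1 to 4, 4).foreach(_ => NativeLib.loaded)
    } finally {
      sc.stop()
    }
  }
}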

Thanks,
Bernardo

On 14 October 2015 at 16:28, Renato Marroquín Mogrovejo <
renatoj.marroq...@gmail.com> wrote:

> You can also try setting the env variable LD_LIBRARY_PATH to point where
> your compiled libraries are.
>
>
> Renato M.
>
> 2015-10-14 21:07 GMT+02:00 Bernardo Vecchia Stein <
> bernardovst...@gmail.com>:
>
>> Hi Deenar,
>>
>> Yes, the native library is installed on all machines of the cluster. I
>> tried a simpler approach by just using System.load() and passing the exact
>> path of the library, and things still won't work (I get exactly the same
>> error and message).
>>
>> Any ideas of what might be failing?
>>
>> Thank you,
>> Bernardo
>>
>> On 14 October 2015 at 02:50, Deenar Toraskar 
>> wrote:
>>
>>> Hi Bernardo
>>>
>>> Is the native library installed on all machines of your cluster and are
>>> you setting both the spark.driver.extraLibraryPath and
>>> spark.executor.extraLibraryPath ?
>>>
>>> Deenar
>>>
>>>
>>>
>>> On 14 October 2015 at 05:44, Bernardo Vecchia Stein <
>>> bernardovst...@gmail.com> wrote:
>>>
 Hello,

 I am trying to run some scala code in cluster mode using spark-submit.
 This code uses addLibrary to link with a .so that exists in the machine,
 and this library has a function to be called natively (there's a native
 definition as needed in the code).

 The problem I'm facing is: whenever I try to run this code in cluster
 mode, spark fails with the following message when trying to execute the
 native function:
 java.lang.UnsatisfiedLinkError:
 org.name.othername.ClassName.nativeMethod([B[B)[B

 Apparently, the library is being found by spark, but the required
 function isn't found.

 When trying to run in client mode, however, this doesn't fail and
 everything works as expected.

 Does anybody have any idea of what might be the problem here? Is there
 any bug that could be related to this when running in cluster mode?

 I appreciate any help.
 Thanks,
 Bernardo

>>>
>>>
>>
>


spark-avro 2.0.1 generates strange schema (spark-avro 1.0.0 is fine)

2015-10-14 Thread Alex Nastetsky
I save my dataframe to avro with spark-avro 1.0.0 and it looks like this
(using avro-tools tojson):

{"field1":"value1","field2":976200}
{"field1":"value2","field2":976200}
{"field1":"value3","field2":614100}

But when I use spark-avro 2.0.1, it looks like this:

{"field1":{"string":"value1"},"field2":{"long":976200}}
{"field1":{"string":"value2"},"field2":{"long":976200}}
{"field1":{"string":"value3"},"field2":{"long":614100}}

At this point I'd be happy to use spark-avro 1.0.0, except that it doesn't
seem to support specifying a compression codec (I want deflate).
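(A hedged sketch for reference, assuming the codec options and reader/writer helpers documented for spark-avro 2.0.x; paths, app name and deflate level are placeholders. The wrapped values above look like Avro's union encoding for nullable fields, which is how avro-tools tojson renders them.)

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import com.databricks.spark.avro._

val sc = new SparkContext(new SparkConf().setAppName("avro-deflate-example"))
val sqlContext = new SQLContext(sc)

// Codec options as documented for spark-avro 2.0.x (level is a placeholder).
sqlContext.setConf("spark.sql.avro.compression.codec", "deflate")
sqlContext.setConf("spark.sql.avro.deflate.level", "5")

val df = sqlContext.read.avro("/tmp/input")   // reader/writer helpers come from the implicits
df.write.avro("/tmp/output-deflate")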


PySpark - Hive Context Does Not Return Results but SQL Context Does for Similar Query.

2015-10-14 Thread charles.drotar
I have duplicated my submission to stack overflow below since it is exactly
the same question I would like to post here as well. Please don't judge me
too harshly for my laziness 


 

*The questions I am concerned with are the same ones listed in the
"QUESTIONS" section, namely:*

*1) Has anyone noticed anything similar to this?
2) What is happening on the backend that could be causing this consumption
of resources, and what could I do to avoid it?*



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/PySpark-Hive-Context-Does-Not-Return-Results-but-SQL-Context-Does-for-Similar-Query-tp25067.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Spark 1.5.1 ClassNotFoundException in cluster mode.

2015-10-14 Thread Renato Perini

Hello.
I have developed a Spark job using a jersey client (1.9 included with 
Spark) to make some service calls during data computations.

Data is read and written on an Apache Cassandra 2.2.1 database.
When I run the job in local mode, everything works nicely. But when I 
execute my job in cluster mode (spark standalone) I receive the 
following exception:
I have no clue where this exception occurs. Any idea / advice on what
I can check?


[Stage 38:==> (8 + 
2) / 200]15/10/14 15:54:07 WARN ThrowableSerializationWrapper: Task 
exception could
java.lang.ClassNotFoundException: 
com.datastax.spark.connector.types.TypeConversionException

at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:278)
at 
org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:67)
at 
java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)

at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)

at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
at 
org.apache.spark.ThrowableSerializationWrapper.readObject(TaskEndReason.scala:167)

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:606)
at 
java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
at 
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1897)
at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)

at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
at 
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)

at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
at 
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)

at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
at 
org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:72)
at 
org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:98)
at 
org.apache.spark.scheduler.TaskResultGetter$$anon$3$$anonfun$run$2.apply$mcV$sp(TaskResultGetter.scala:108)
at 
org.apache.spark.scheduler.TaskResultGetter$$anon$3$$anonfun$run$2.apply(TaskResultGetter.scala:105)
at 
org.apache.spark.scheduler.TaskResultGetter$$anon$3$$anonfun$run$2.apply(TaskResultGetter.scala:105)

at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
at 
org.apache.spark.scheduler.TaskResultGetter$$anon$3.run(TaskResultGetter.scala:105)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)

at java.lang.Thread.run(Thread.java:745)
15/10/14 15:54:07 WARN ThrowableSerializationWrapper: Task exception 
could not be deserialized
java.lang.ClassNotFoundException: 
com.datastax.spark.connector.types.TypeConversionException

at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:278)
at 
org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:67)
at 
java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)

at 

Re: spark-avro 2.0.1 generates strange schema (spark-avro 1.0.0 is fine)

2015-10-14 Thread Alex Nastetsky
Here you go: https://github.com/databricks/spark-avro/issues/92

Thanks.

On Wed, Oct 14, 2015 at 4:41 PM, Josh Rosen  wrote:

> Can you report this as an issue at
> https://github.com/databricks/spark-avro/issues so that it's easier to
> track? Thanks!
>
> On Wed, Oct 14, 2015 at 1:38 PM, Alex Nastetsky <
> alex.nastet...@vervemobile.com> wrote:
>
>> I save my dataframe to avro with spark-avro 1.0.0 and it looks like this
>> (using avro-tools tojson):
>>
>> {"field1":"value1","field2":976200}
>> {"field1":"value2","field2":976200}
>> {"field1":"value3","field2":614100}
>>
>> But when I use spark-avro 2.0.1, it looks like this:
>>
>> {"field1":{"string":"value1"},"field2":{"long":976200}}
>> {"field1":{"string":"value2"},"field2":{"long":976200}}
>> {"field1":{"string":"value3"},"field2":{"long":614100}}
>>
>> At this point I'd be happy to use spark-avro 1.0.0, except that it
>> doesn't seem to support specifying a compression codec (I want deflate).
>>
>
>


IPv6 regression in Spark 1.5.1

2015-10-14 Thread Thomas Dudziak
It looks like Spark 1.5.1 does not work with IPv6. When
adding -Djava.net.preferIPv6Addresses=true on my dual stack server, the
driver fails with:

15/10/14 14:36:01 ERROR SparkContext: Error initializing SparkContext.
java.lang.AssertionError: assertion failed: Expected hostname
at scala.Predef$.assert(Predef.scala:179)
at org.apache.spark.util.Utils$.checkHost(Utils.scala:805)
at org.apache.spark.storage.BlockManagerId.(BlockManagerId.scala:48)
at org.apache.spark.storage.BlockManagerId$.apply(BlockManagerId.scala:107)
at org.apache.spark.storage.BlockManager.initialize(BlockManager.scala:190)
at org.apache.spark.SparkContext.(SparkContext.scala:528)
at
org.apache.spark.repl.SparkILoop.createSparkContext(SparkILoop.scala:1017)

Looking at the checkHost method, it clearly does not work for IPv6 as it
assumes : is not a valid part of the hostname. I think this code should use
Guava's HostAndPort or related classes to properly deal with IPv4 and IPv6
(and other parts of Utils already use Guava).

cheers,
Tom


Re: IPv6 regression in Spark 1.5.1

2015-10-14 Thread Thomas Dudziak
Specifically, something like this should probably do the trick:

  // Uses Guava's com.google.common.net.HostAndPort, which is already on
  // Spark's classpath:
  def checkHost(host: String, message: String = "") {
    assert(!HostAndPort.fromString(host).hasPort, message)
  }

  def checkHostPort(hostPort: String, message: String = "") {
    assert(HostAndPort.fromString(hostPort).hasPort, message)
  }


On Wed, Oct 14, 2015 at 2:40 PM, Thomas Dudziak  wrote:

> It looks like Spark 1.5.1 does not work with IPv6. When
> adding -Djava.net.preferIPv6Addresses=true on my dual stack server, the
> driver fails with:
>
> 15/10/14 14:36:01 ERROR SparkContext: Error initializing SparkContext.
> java.lang.AssertionError: assertion failed: Expected hostname
> at scala.Predef$.assert(Predef.scala:179)
> at org.apache.spark.util.Utils$.checkHost(Utils.scala:805)
> at org.apache.spark.storage.BlockManagerId.(BlockManagerId.scala:48)
> at org.apache.spark.storage.BlockManagerId$.apply(BlockManagerId.scala:107)
> at org.apache.spark.storage.BlockManager.initialize(BlockManager.scala:190)
> at org.apache.spark.SparkContext.(SparkContext.scala:528)
> at
> org.apache.spark.repl.SparkILoop.createSparkContext(SparkILoop.scala:1017)
>
> Looking at the checkHost method, it clearly does not work for IPv6 as it
> assumes : is not a valid part of the hostname. I think this code should use
> Guava's HostAndPort or related classes to properly deal with IPv4 and IPv6
> (and other parts of Utils already use Guava).
>
> cheers,
> Tom
>


Spark Master Dying saying TimeoutException

2015-10-14 Thread Kartik Mathur
Hi,

I have some nightly jobs which run every night but sometimes die because
of an unresponsive master. The Spark master logs say:

Not seeing much else there; what could possibly cause an exception like
this?

*Exception in thread "main" java.util.concurrent.TimeoutException: Futures
timed out after [1 milliseconds]*

at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)

at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)

at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)

at
scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)

at scala.concurrent.Await$.result(package.scala:107)

at akka.remote.Remoting.start(Remoting.scala:180)

at akka.remote.RemoteActorRefProvider.init(RemoteActorRefProvider.scala:184)

at akka.actor.ActorSystemImpl.liftedTree2$1(ActorSystem.scala:618)

at akka.actor.ActorSystemImpl._start$lzycompute(ActorSystem.scala:615)

at akka.actor.ActorSystemImpl._start(ActorSystem.scala:615)

at akka.actor.ActorSystemImpl.start(ActorSystem.scala:632)

at akka.actor.ActorSystem$.apply(ActorSystem.scala:141)

2015-10-14 05:43:04 ERROR Remoting:65 - Remoting error: [Startup timed out]
[

akka.remote.RemoteTransportException: Startup timed out

at
akka.remote.Remoting.akka$remote$Remoting$$notifyError(Remoting.scala:136)

at akka.remote.Remoting.start(Remoting.scala:198)

at akka.remote.RemoteActorRefProvider.init(RemoteActorRefProvider.scala:184)

at akka.actor.ActorSystemImpl.liftedTree2$1(ActorSystem.scala:618)

at akka.actor.ActorSystemImpl._start$lzycompute(ActorSystem.scala:615)

at akka.actor.ActorSystemImpl._start(ActorSystem.scala:615)

at akka.actor.ActorSystemImpl.start(ActorSystem.scala:632)

at akka.actor.ActorSystem$.apply(ActorSystem.scala:141)

at akka.actor.ActorSystem$.apply(ActorSystem.scala:118)

at
org.apache.spark.util.AkkaUtils$.org$apache$spark$util$AkkaUtils$$doCreateActorSystem(AkkaUtils.scala:122)

at org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:55)

at org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:54)

at
org.apache.spark.util.Utils$$anonfun$startServiceOnPort$1.apply$mcVI$sp(Utils.scala:1837)

at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)

at org.apache.spark.util.Utils$.startServiceOnPort(Utils.scala:1828)

at org.apache.spark.util.AkkaUtils$.createActorSystem(AkkaUtils.scala:57)

at
org.apache.spark.deploy.master.Master$.startSystemAndActor(Master.scala:906)

at org.apache.spark.deploy.master.Master$.main(Master.scala:869)

at org.apache.spark.deploy.master.Master.main(Master.scala)

Caused by: java.util.concurrent.TimeoutException: Futures timed out after
[1 milliseconds]

at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)

at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)

at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)

at
scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)

at scala.concurrent.Await$.result(package.scala:107)

at akka.remote.Remoting.start(Remoting.scala:180)

... 17 more


Re: Spark 1.5.1 ClassNotFoundException in cluster mode.

2015-10-14 Thread Dean Wampler
There is a Datastax Spark connector library jar file that you probably have
on your CLASSPATH locally, but not on the cluster. If you know where it is,
you could either install it on each node in some location on their
CLASSPATHs, or when you submit the job, pass the jar file using the "--jars"
option. Note that the latter may not be an ideal solution if it has other
dependencies that also need to be passed.
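(For illustration, a hedged sketch of the programmatic equivalent; the jar path is a placeholder.)

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("cassandra-job")
  // Shipped to every executor; equivalent to spark-submit --jars <path>.
  .setJars(Seq("/path/to/spark-cassandra-connector-assembly.jar"))
val sc = new SparkContext(conf)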

dean

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
 (O'Reilly)
Typesafe 
@deanwampler 
http://polyglotprogramming.com

On Wed, Oct 14, 2015 at 5:05 PM, Renato Perini 
wrote:

> Hello.
> I have developed a Spark job using a jersey client (1.9 included with
> Spark) to make some service calls during data computations.
> Data is read and written on an Apache Cassandra 2.2.1 database.
> When I run the job in local mode, everything works nicely. But when I
> execute my job in cluster mode (spark standalone) I receive the following
> exception:
> I have no clue on where this exception occurs. Any idea / advice on what
> can I check?
>
> [Stage 38:==> (8 + 2)
> / 200]15/10/14 15:54:07 WARN ThrowableSerializationWrapper: Task exception
> could
> java.lang.ClassNotFoundException:
> com.datastax.spark.connector.types.TypeConversionException
> at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
> at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
> at java.security.AccessController.doPrivileged(Native Method)
> at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
> at java.lang.Class.forName0(Native Method)
> at java.lang.Class.forName(Class.java:278)
> at
> org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:67)
> at
> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
> at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
> at
> org.apache.spark.ThrowableSerializationWrapper.readObject(TaskEndReason.scala:167)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:606)
> at
> java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
> at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1897)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
> at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
> at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
> at
> org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:72)
> at
> org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:98)
> at
> org.apache.spark.scheduler.TaskResultGetter$$anon$3$$anonfun$run$2.apply$mcV$sp(TaskResultGetter.scala:108)
> at
> org.apache.spark.scheduler.TaskResultGetter$$anon$3$$anonfun$run$2.apply(TaskResultGetter.scala:105)
> at
> org.apache.spark.scheduler.TaskResultGetter$$anon$3$$anonfun$run$2.apply(TaskResultGetter.scala:105)
> at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
> at
> org.apache.spark.scheduler.TaskResultGetter$$anon$3.run(TaskResultGetter.scala:105)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
> 15/10/14 15:54:07 WARN ThrowableSerializationWrapper: Task exception could
> 

Re: Spark streaming checkpoint against s3

2015-10-14 Thread Tian Zhang
It looks like the reconstruction of the SparkContext from checkpoint data is
trying to look for the jar file of previous failed runs. It cannot find the
jar files, as our jar files are on local machines and were cleaned up after
each failed run.








--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-checkpoint-against-s3-tp25068p25070.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: PySpark - Hive Context Does Not Return Results but SQL Context Does for Similar Query.

2015-10-14 Thread Michael Armbrust
No link to the original stack overflow so I can up my reputation? :)

This is likely not a difference between HiveContext/SQLContext, but instead
a difference between a table where the metadata is coming from the
HiveMetastore vs the SparkSQL Data Source API.  I would guess that if you
create the table the same way, the performance would be similar.

In the data source API we have spent a fair amount of time optimizing the
discovery and handling of many partitions, and in general I would say this
path is easier to use / faster.

Likely the problem with the hive table, is downloading all of the partition
metadata from the metastore and converting it to our internal format.  We
do this for all partitions, even though in this case you only want the
first ~20 rows.
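(A hedged sketch of the two code paths, shown in Scala for brevity; it assumes an existing SparkContext sc, and the table name and path are placeholders.)

import org.apache.spark.sql.hive.HiveContext

val sqlContext = new HiveContext(sc)

// Hive metastore path: metadata for every partition is fetched from the
// metastore and converted, even though only ~20 rows are wanted.
sqlContext.table("my_partitioned_table").limit(20).show()

// Data Source API path: Spark SQL discovers and prunes partitions itself.
sqlContext.read.parquet("hdfs:///warehouse/my_partitioned_table").limit(20).show()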

On Wed, Oct 14, 2015 at 1:38 PM, charles.drotar <
charles.dro...@capitalone.com> wrote:

> I have duplicated my submission to stack overflow below since it is exactly
> the same question I would like to post here as well. Please don't judge me
> too harshly for my laziness
>
> <
> http://apache-spark-user-list.1001560.n3.nabble.com/file/n25067/Screen_Shot_2015-10-14_at_3.png
> >
>
> *The questions I am concerned with are the same ones listed in the
> "QUESTIONS" section namely:*
>
> */1) Has anyone noticed anything similar to this?
> 2) What is happening on the backend that could be causing this consumption
> of resources and what could I do to avoid it?/*
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/PySpark-Hive-Context-Does-Not-Return-Results-but-SQL-Context-Does-for-Similar-Query-tp25067.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


stability of Spark 1.4.1 with Python 3 versions

2015-10-14 Thread shoira.mukhsin...@bnpparibasfortis.com
Dear Spark Community,

The official documentation of Spark 1.4.1 mentions that Spark runs on Python 
2.6+ http://spark.apache.org/docs/1.4.1/
It is not clear whether by "Python 2.6+" you also mean Python 3.4 or not.

There is a resolved issue on this point which makes me believe that it does run 
on Python 3.4: https://issues.apache.org/jira/i#browse/SPARK-9705
Maybe the documentation is simply not up to date? The programming guide
mentions that it does not work for Python 3: 
https://spark.apache.org/docs/1.4.1/programming-guide.html

Do you confirm that Spark 1.4.1 does run on Python 3.4?

Thanks in advance for your reaction!

Regards,
Shoira



==
BNP Paribas Fortis disclaimer:
http://www.bnpparibasfortis.com/e-mail-disclaimer.html
 
BNP Paribas Fortis privacy policy:
http://www.bnpparibasfortis.com/privacy-policy.html
 
==


Re: java.io.InvalidClassException using spark1.4.1 for Terasort

2015-10-14 Thread Sonal Goyal
This is probably a versioning issue. Are you sure your code is compiling
and running against the same versions?
On Oct 14, 2015 2:19 PM, "Shreeharsha G Neelakantachar" <
shreeharsh...@in.ibm.com> wrote:

> Hi,
>   I have Terasort being executed on Spark 1.4.1 with Hadoop 2.7 for a
> data size of 1TB. When I change my OS user from spark1 to hduser, I am
> observing the below exception.  Please let me know what is wrong here. I tried
> to update Scala 2.10 to 2.11 and compiled the Terasort Scala code using sbt.
> But nothing is helping here. I had created the spark1 (spark1 to spark4)
> user for a multi-tenancy run.
>
> 2015-10-13 03:42:54,840 [sparkDriver-akka.actor.default-dispatcher-2] INFO
>  org.apache.spark.scheduler.TaskSetManager - Starting task 199.0 in stage
> 0.0 (TID 199, 9.37.251.65, ANY, 1544 bytes)
> 2015-10-13 03:42:54,843 [task-result-getter-2] WARN
>  org.apache.spark.scheduler.TaskSetManager - Lost task 173.0 in stage 0.0
> (TID 173, 9.37.251.65): java.io.InvalidClassException:
> scala.reflect.ClassTag$$anon$1; local class incompatible: stream classdesc
> serialVersionUID = -4937928798201944954, local class serialVersionUID =
> -8102093212602380348
> at
> java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:621)
> at
> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1623)
> at
> java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
> at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
> at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
> at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
> at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
> at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
> at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
> at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
> at
> org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:69)
> at
> org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:95)
> at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:194)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
>
>
> Any help in this regard would be of great help.
>
> Regards
> Shreeharsha GN
>
>


Re: spark sql OOM

2015-10-14 Thread Andy Zhao
I increased executor memory from 6g to 10g, but it still failed and reported
the same error, and because of my company's security policy, I cannot post the
SQL. But I'm sure that this error occurred in the compute method of
HadoopRDD, and it happened on one of the executors.




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/spark-sql-OOM-tp25060p25061.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: OutOfMemoryError When Reading Many json Files

2015-10-14 Thread SLiZn Liu
Yes, it went wrong only when processing a large file. I removed the
transformations on the DF, and it worked just fine. But doing a simple filter
operation on the DF became the last straw that broke the camel's back.
That's confusing.
​

On Wed, Oct 14, 2015 at 2:11 PM Deenar Toraskar 
wrote:

> Hi
>
> Why dont you check if you can just process the large file standalone and
> then do the outer loop next.
>
> sqlContext.read.json(jsonFile) .select($"some", $"fields") .withColumn(
> "new_col", some_transformations($"col")) .rdd.map( x: Row => (k, v) )
> .combineByKey()
>
> Deenar
>
> On 14 October 2015 at 05:18, SLiZn Liu  wrote:
>
>> Hey Spark Users,
>>
>> I kept getting java.lang.OutOfMemoryError: Java heap space as I read a
>> massive amount of json files, iteratively via read.json(). Even the
>> result RDD is rather small, I still get the OOM Error. The brief structure
>> of my program reads as following, in psuedo-code:
>>
>> file_path_list.map{ jsonFile: String =>
>>   sqlContext.read.json(jsonFile)
>> .select($"some", $"fields")
>> .withColumn("new_col", some_transformations($"col"))
>> .rdd.map( x: Row => (k, v) )
>> .combineByKey() // which groups a column into item lists by another 
>> column as keys
>> }.reduce( (i, j) => i.union(j) )
>> .combineByKey() // which combines results from all json files
>>
>> I confess some of the json files are Gigabytes huge, yet the combined RDD
>> is in a few Megabytes. I’m not familiar with the under-the-hood mechanism,
>> but my intuitive understanding of how the code executes is, read the file
>> once a time (where I can easily modify map to foreach when fetching from
>> file_path_list, if that’s the case), do the inner transformation on DF
>> and combine, then reduce and do the outer combine immediately, which
>> doesn’t require to hold all RDDs generated from all files in the memory.
>> Obviously, as my code raises OOM Error, I must have missed something
>> important.
>>
>> From the debug log, I can tell the OOM Error happens when reading the
>> same file, which is in a modest size of 2GB, while driver.memory is set to
>> 13GB, and the available memory size before the code execution is around
>> 8GB, on my standalone machine running as “local[8]”.
>>
>> To overcome this, I also tried to initialize an empty universal RDD
>> variable, iteratively read one file at a time using foreach, then
>> instead of reduce, simply combine each RDD generated by the json files,
>> except the OOM Error remains.
>>
>> Other configurations:
>>
>>- set(“spark.storage.memoryFraction”, “0.1”) // no cache of RDD is
>>used
>>- set(“spark.serializer”,
>>“org.apache.spark.serializer.KryoSerializer”)
>>
>> Any suggestions other than scale up/out the spark cluster?
>>
>> BR,
>> Todd Leo
>> ​
>>
>
>


Re: spark sql OOM

2015-10-14 Thread cherrywayb...@gmail.com
Hi, please increase your memory.



cherrywayb...@gmail.com
 
From: Andy Zhao
Date: 2015-10-14 17:40
To: user
Subject: spark sql OOM
Hi guys, 
 
I'm testing Spark SQL 1.5.1, and I use hadoop-2.5.0-cdh5.3.2.
One SQL query which ran successfully using Hive failed when I ran it using
Spark SQL.
I got the following error:
 

 
 
I read the source code. It seems that the compute method of HadoopRDD is
called an infinite number of times; every time it gets called, some new
instances need to be allocated on the heap, and finally OOM.
 
Does anyone have same problem? 
 
Thanks
 
 
 
--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/spark-sql-OOM-tp25060.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
 
-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
 


Fwd: Partition Column in JDBCRDD or Datasource API

2015-10-14 Thread satish chandra j
Hi All,
Please give me some input on the *Partition Column* to be used in the
DataSource API or JDBCRDD to define the lowerBound and upperBound values,
which in turn determine the number of partitions. The issue is that my source
table does not have a numeric column which is sequential and unique, so that
proper partitioning can take place.

And what if we don't give any input for the lowerBound, upperBound and
number of partitions? How can Spark perform the execution in distributed /
parallel mode, and is there any default way to run in distributed / parallel
mode without the above-mentioned inputs?
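(For reference, a hedged sketch of the JDBC data source options in question; connection details and column names are placeholders, and an existing sqlContext is assumed. If none of the partitioning options are given, the whole table is read as a single partition.)

val df = sqlContext.read.format("jdbc").options(Map(
  "url"             -> "jdbc:postgresql://dbhost:5432/mydb",
  "dbtable"         -> "my_table",
  "partitionColumn" -> "row_id",   // must be a numeric column
  "lowerBound"      -> "1",
  "upperBound"      -> "1000000",
  "numPartitions"   -> "8"         // bounds + count define the per-partition ranges
)).load()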

Regards,
Satish Chandra Jummula
-- Forwarded message --
From: satish chandra j 
Date: Wed, Sep 30, 2015 at 2:10 PM
Subject: Partition Column in JDBCRDD or Datasource API
To: user 


HI All,
Please provide your inputs on Partition Column to be used in DataSourceAPI
or JDBCRDD in a scenerio where the source table does not have a Numeric
Columns which is sequential and unique such that proper partitioning can
take place in Spark

Regards,
Satish


Re: Problem installing Sparck on Windows 8

2015-10-14 Thread Raghavendra Pandey
Looks like you are facing an IPv6 issue. Can you try turning the preferIPv4
property on?
On Oct 15, 2015 2:10 AM, "Steve Loughran"  wrote:

>
> On 14 Oct 2015, at 20:56, Marco Mistroni  wrote:
>
>
> 15/10/14 20:52:35 WARN : Your hostname, MarcoLaptop resolves to a
> loopback/non-r
> eachable address: fe80:0:0:0:c5ed:a66d:9d95:5caa%wlan2, but we couldn't
> find any
>  external IP address!
> java.lang.RuntimeException: java.lang.RuntimeException: The root scratch
> dir: /t
> mp/hive on HDFS should be writable. Current permissions are: -
> at
> org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.jav
> a:522)
> at
> org.apache.spark.sql.hive.client.ClientWrapper.(ClientWrapper.s
> cala:171)
> at
> org.apache.spark.sql.hive.HiveContext.executionHive$lzycompute(HiveCo
>
>
> now, that I haven't seen. Looks like it thinks the permissions are wrong,
> doesn't it?
>


Re: Application not found in Spark historyserver in yarn-client mode

2015-10-14 Thread Ted Yu
Which Spark release are you using ?

Thanks

On Wed, Oct 14, 2015 at 4:20 PM, Anfernee Xu  wrote:

> Hi,
>
> Here's the problem I'm facing: I have a standalone Java application which
> periodically submits Spark jobs to my YARN cluster; btw, I'm not using
> 'spark-submit' or 'org.apache.spark.launcher' to submit my jobs. These jobs
> are successful and I can see them on the YARN RM web UI, but when I want to
> follow the link to the app history, I always get a 404 (application not
> found) from the Spark history server.
>
> My code looks likes as below
>
>
> SparkConf conf = new
> SparkConf().setAppName("testSpak").setMaster("yarn-client")
> .setJars(new String[]{IOUtil.getJar(MySparkApp.class)});
>
> conf.set("spark.yarn.historyServer.address", "10.247.44.155:18080");
> conf.set("spark.history.fs.logDirectory",
> "
> hdfs://myHdfsNameNode:55310/scratch/tie/spark/applicationHistory");
>
> JavaSparkContext sc = new JavaSparkContext(conf);
>
> try {
>
>  ... my application code
>
> }finally{
>   sc.stop();
>
> }
>
> --
> --Anfernee
>


Re: Spark Master Dying saying TimeoutException

2015-10-14 Thread Raghavendra Pandey
I fixed these timeout errors by retrying...
On Oct 15, 2015 3:41 AM, "Kartik Mathur"  wrote:

> Hi,
>
> I have some nightly jobs which run every night but sometimes die because
> of an unresponsive master. The Spark master logs say:
>
> Not seeing much else there; what could possibly cause an exception like
> this?
>
> *Exception in thread "main" java.util.concurrent.TimeoutException: Futures
> timed out after [1 milliseconds]*
>
> at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
>
> at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
>
> at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
>
> at
> scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
>
> at scala.concurrent.Await$.result(package.scala:107)
>
> at akka.remote.Remoting.start(Remoting.scala:180)
>
> at
> akka.remote.RemoteActorRefProvider.init(RemoteActorRefProvider.scala:184)
>
> at akka.actor.ActorSystemImpl.liftedTree2$1(ActorSystem.scala:618)
>
> at akka.actor.ActorSystemImpl._start$lzycompute(ActorSystem.scala:615)
>
> at akka.actor.ActorSystemImpl._start(ActorSystem.scala:615)
>
> at akka.actor.ActorSystemImpl.start(ActorSystem.scala:632)
>
> at akka.actor.ActorSystem$.apply(ActorSystem.scala:141)
>
> 2015-10-14 05:43:04 ERROR Remoting:65 - Remoting error: [Startup timed
> out] [
>
> akka.remote.RemoteTransportException: Startup timed out
>
> at
> akka.remote.Remoting.akka$remote$Remoting$$notifyError(Remoting.scala:136)
>
> at akka.remote.Remoting.start(Remoting.scala:198)
>
> at
> akka.remote.RemoteActorRefProvider.init(RemoteActorRefProvider.scala:184)
>
> at akka.actor.ActorSystemImpl.liftedTree2$1(ActorSystem.scala:618)
>
> at akka.actor.ActorSystemImpl._start$lzycompute(ActorSystem.scala:615)
>
> at akka.actor.ActorSystemImpl._start(ActorSystem.scala:615)
>
> at akka.actor.ActorSystemImpl.start(ActorSystem.scala:632)
>
> at akka.actor.ActorSystem$.apply(ActorSystem.scala:141)
>
> at akka.actor.ActorSystem$.apply(ActorSystem.scala:118)
>
> at
> org.apache.spark.util.AkkaUtils$.org$apache$spark$util$AkkaUtils$$doCreateActorSystem(AkkaUtils.scala:122)
>
> at org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:55)
>
> at org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:54)
>
> at
> org.apache.spark.util.Utils$$anonfun$startServiceOnPort$1.apply$mcVI$sp(Utils.scala:1837)
>
> at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
>
> at org.apache.spark.util.Utils$.startServiceOnPort(Utils.scala:1828)
>
> at org.apache.spark.util.AkkaUtils$.createActorSystem(AkkaUtils.scala:57)
>
> at
> org.apache.spark.deploy.master.Master$.startSystemAndActor(Master.scala:906)
>
> at org.apache.spark.deploy.master.Master$.main(Master.scala:869)
>
> at org.apache.spark.deploy.master.Master.main(Master.scala)
>
> Caused by: java.util.concurrent.TimeoutException: Futures timed out after
> [1 milliseconds]
>
> at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
>
> at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
>
> at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
>
> at
> scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
>
> at scala.concurrent.Await$.result(package.scala:107)
>
> at akka.remote.Remoting.start(Remoting.scala:180)
>
> ... 17 more
>
>
>


Re: "java.io.IOException: Filesystem closed" on executors

2015-10-14 Thread Lan Jiang
Thank you, Akhil. Actually the problem was solved last week and I did not
have time to report back. The error was caused by YARN killing the
container because the executors used more off-heap memory than they were
assigned. There was nothing in the executor log, but the AM log clearly
states this is the problem.

After I increased spark.yarn.executor.memoryOverhead, it was working
fine. I was using Spark 1.3, which has a default value of executorMemory *
0.07, with a minimum of 384. In Spark 1.4 and later, the default value was
changed to executorMemory * 0.10, with a minimum of 384.
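(A hedged sketch of the setting for reference; the sizes are placeholders, not from this job, and the same value is commonly passed as --conf spark.yarn.executor.memoryOverhead=... to spark-submit.)

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("avro-etl")
  .set("spark.executor.memory", "6g")
  .set("spark.yarn.executor.memoryOverhead", "1024") // MB of off-heap headroom per executor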

Lan

On Mon, Oct 12, 2015 at 8:34 AM, Akhil Das 
wrote:

> Can you look a bit deeper in the executor logs? It could be filling up the
> memory and getting killed.
>
> Thanks
> Best Regards
>
> On Mon, Oct 5, 2015 at 8:55 PM, Lan Jiang  wrote:
>
>> I am still facing this issue. Executor dies due to
>>
>> org.apache.avro.AvroRuntimeException: java.io.IOException: Filesystem
>> closed
>> at
>> org.apache.avro.file.DataFileStream.hasNextBlock(DataFileStream.java:278)
>> at org.apache.avro.file.DataFileStream.hasNext(DataFileStream.java:197)
>> at org.apache.avro.mapred.AvroRecordReader.next(AvroRecordReader.java:64)
>> ...
>> Caused by: java.io.IOException: Filesystem closed
>> at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:794)
>> at
>> org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:833)
>> at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:897)
>> at java.io.DataInputStream.read(DataInputStream.java:149)
>>
>> Spark automatically launched new executors and the whole job completed
>> fine. Anyone has a clue what's going on?
>>
>> The spark job reads avro files from a directory, do some basic map/filter
>> and then repartition to 1, write the result to HDFS. I use spark 1.3 with
>> spark-avro (1.0.0). The error only happens when running on the whole
>> dataset. When running on 1/3 of the files, the same job completes without
>> error.
>>
>>
>> On Thu, Oct 1, 2015 at 2:41 PM, Lan Jiang  wrote:
>>
>>> Hi, there
>>>
>>> Here is the problem I ran into when executing a Spark Job (Spark 1.3).
>>> The spark job is loading a bunch of avro files using Spark SQL spark-avro
>>> 1.0.0 library. Then it does some filter/map transformation, repartition to
>>> 1 partition and then write to HDFS. It creates 2 stages. The total HDFS
>>> block number is around 12000, thus it creates 12000 partitions, thus 12000
>>> tasks for the first stage. I have total 9 executors launched with 5 thread
>>> for each. The job has run fine until the very end.  When it reaches
>>> 19980/2 tasks succeeded, it suddenly failed the last 20 tasks and I
>>> lost 2 executors. The spark did launched 2 new executors and finishes the
>>> job eventually by reprocessing the 20 tasks.
>>>
>>> I only ran into this issue when I run the spark application on the full
>>> dataset. When I run the 1/3 of the dataset, everything finishes fine
>>> without error.
>>>
>>> Question 1: What is the root cause of this issue? It is simiar to
>>> http://stackoverflow.com/questions/24038908/spark-fails-on-big-shuffle-jobs-with-java-io-ioexception-filesystem-closed
>>> and https://issues.apache.org/jira/browse/SPARK-3052, but it says the
>>> issue has been fixed since 1.2
>>> Quesiton 2: I am a little surprised that after the 2 new executors were
>>> launched,  replacing the two failed executors, they simply reprocessed the
>>> failed 20 tasks/partitions.  What about the results for other parititons
>>> processed by the 2 failed executors before? I assumed the results of these
>>> parititons are stored to the local disk and thus do not need to be computed
>>> by the new exectuors?  When are the data stored locally? Is it
>>> configuration? This question is for my own understanding about the spark
>>> framework.
>>>
>>> The exception causing the exectuor failure is below
>>>
>>> org.apache.avro.AvroRuntimeException: java.io.IOException: Filesystem
>>> closed
>>> at
>>> org.apache.avro.file.DataFileStream.hasNextBlock(DataFileStream.java:278)
>>> at org.apache.avro.file.DataFileStream.hasNext(DataFileStream.java:197)
>>> at org.apache.avro.mapred.AvroRecordReader.next(AvroRecordReader.java:64)
>>> at org.apache.avro.mapred.AvroRecordReader.next(AvroRecordReader.java:32)
>>> at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:245)
>>> at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:212)
>>> at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
>>> at
>>> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
>>> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>> at 

spark streaming filestream API

2015-10-14 Thread Chandra Mohan, Ananda Vel Murugan
Hi All,

I have a directory hdfs which I want to monitor and whenever there is a new 
file in it, I want to parse that file and load the contents into a HIVE table. 
File format is proprietary and I have java parsers for parsing it. I am 
building a spark streaming application for this workflow. For doing this, I 
found JavaStreamingContext.filestream API. It takes four arguments directory 
path, key class, value class and inputformat. What should be values of key and 
value class? Please suggest. Thank you.


Regards,
Anand.C


Re: spark streaming filestream API

2015-10-14 Thread Akhil Das
Key and Value are the ones that you are using with your InputFormat. Eg:

JavaPairInputDStream<LongWritable, Text> lines = jssc.fileStream("/sigmoid",
    LongWritable.class, Text.class, TextInputFormat.class);


TextInputFormat uses LongWritable as the key class and Text as the value
class. If your data is plain CSV or text data, then you can use
*jssc.textFileStream("/sigmoid")* without worrying about the InputFormat,
key and value classes.



Thanks
Best Regards

On Wed, Oct 14, 2015 at 5:12 PM, Chandra Mohan, Ananda Vel Murugan <
ananda.muru...@honeywell.com> wrote:

> Hi All,
>
>
>
> I have a directory hdfs which I want to monitor and whenever there is a
> new file in it, I want to parse that file and load the contents into a HIVE
> table. File format is proprietary and I have java parsers for parsing it. I
> am building a spark streaming application for this workflow. For doing
> this, I found JavaStreamingContext.filestream API. It takes four arguments
> directory path, key class, value class and inputformat. What should be
> values of key and value class? Please suggest. Thank you.
>
>
>
>
>
> Regards,
>
> Anand.C
>


Re: how to use SharedSparkContext

2015-10-14 Thread Fengdong Yu
oh, 
Yes. Thanks much.



> On Oct 14, 2015, at 18:47, Akhil Das  wrote:
> 
> com.holdenkarau.spark.testing



Re: Node afinity for Kafka-Direct Stream

2015-10-14 Thread Saisai Shao
You could check the code of KafkaRDD, the locality (host) is got from
Kafka's partition and set in KafkaRDD, this will a hint for Spark to
schedule task on the preferred location.

override def getPreferredLocations(thePart: Partition): Seq[String] = {
  val part = thePart.asInstanceOf[KafkaRDDPartition]
  // TODO is additional hostname resolution necessary here
  Seq(part.host)
}


On Wed, Oct 14, 2015 at 5:38 PM, Gerard Maas  wrote:

> In the receiver-based kafka streaming model, given that each receiver
> starts as a long-running task, one can rely in a certain degree of data
> locality based on the kafka partitioning:  Data published on a given
> topic/partition will land on the same spark streaming receiving node until
> the receiver dies and needs to be restarted somewhere else.
>
> As I understand, the direct-kafka streaming model just computes offsets
> and relays the work to a KafkaRDD. How is the execution locality compared
> to the receiver-based approach?
>
> thanks, Gerard.
>


Re: Changing application log level in standalone cluster

2015-10-14 Thread Akhil Das
You should be able to do that from your application. In the beginning of
the application, just add:

import org.apache.log4j.Logger
import org.apache.log4j.Level

Logger.getLogger("org").setLevel(Level.OFF)
Logger.getLogger("akka").setLevel(Level.OFF)

That will switch off the logs.


Thanks
Best Regards

On Tue, Oct 13, 2015 at 11:30 PM, Tom Graves 
wrote:

> I would like to change the logging level for my application running on a
> standalone Spark cluster.  Is there an easy way to do that  without
> changing the log4j.properties on each individual node?
>
> Thanks,
> Tom
>


Re: Node afinity for Kafka-Direct Stream

2015-10-14 Thread Rishitesh Mishra
Hi Gerard,
I am also trying to understand the same issue. From whatever code I have
seen, it looks like once the KafkaRDD is constructed, the execution of that
RDD is up to the task scheduler, and it can schedule the partitions based on
the load on the nodes. There is a preferred node specified in the KafkaRDD,
but AFAIK it maps to the Kafka partition's host. So if Kafka and Spark are
co-hosted, this will probably work. If not, I am not sure how to get data
locality for a partition.
Others,
correct me if there is a way.

On Wed, Oct 14, 2015 at 3:08 PM, Gerard Maas  wrote:

> In the receiver-based kafka streaming model, given that each receiver
> starts as a long-running task, one can rely in a certain degree of data
> locality based on the kafka partitioning:  Data published on a given
> topic/partition will land on the same spark streaming receiving node until
> the receiver dies and needs to be restarted somewhere else.
>
> As I understand, the direct-kafka streaming model just computes offsets
> and relays the work to a KafkaRDD. How is the execution locality compared
> to the receiver-based approach?
>
> thanks, Gerard.
>



-- 

Regards,
Rishitesh Mishra,
SnappyData . (http://www.snappydata.io/)

https://in.linkedin.com/in/rishiteshmishra


RE: Spark DataFrame GroupBy into List

2015-10-14 Thread java8964
My guess is that it is the same as the (collect_set) UDAF in Hive.
https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF#LanguageManualUDF-Built-inAggregateFunctions(UDAF)
Yong

From: sliznmail...@gmail.com
Date: Wed, 14 Oct 2015 02:45:48 +
Subject: Re: Spark DataFrame GroupBy into List
To: mich...@databricks.com
CC: user@spark.apache.org

Hi Michael,

Can you be more specific on `collect_set`? Is it a built-in function or, if it
is an UDF, how it is defined?

BR,
Todd Leo

On Wed, Oct 14, 2015 at 2:12 AM Michael Armbrust  wrote:

import org.apache.spark.sql.functions._

df.groupBy("category")
  .agg(callUDF("collect_set", df("id")).as("id_list"))

On Mon, Oct 12, 2015 at 11:08 PM, SLiZn Liu  wrote:

Hey Spark users,

I'm trying to group by a dataframe, by appending occurrences into a list
instead of count.

Let's say we have a dataframe as shown below:

| category | id |
|  |:--:|
| A| 1  |
| A| 2  |
| B| 3  |
| B| 4  |
| C| 5  |

ideally, after some magic group by (reverse explode?):

| category | id_list  |
|  |  |
| A| 1,2  |
| B| 3,4  |
| C| 5|

any tricks to achieve that? Scala Spark API is preferred. =D

BR,
Todd Leo





  

Re: spark sql OOM

2015-10-14 Thread Fengdong Yu
Can you search the mail archive before asking the question? At least search for
how to ask the question.

Nobody can give you an answer if you don't paste your SQL or Spark SQL code.


> On Oct 14, 2015, at 17:40, Andy Zhao  wrote:
> 
> Hi guys, 
> 
> I'm testing sparkSql 1.5.1, and I use hadoop-2.5.0-cdh5.3.2. 
> One sql which can ran successfully using hive failed when I ran it using
> sparkSql. 
> I got the following errno: 
> 
> 
>  
> 
> I read the source code, It seems that the compute method of HadoopRDD is
> called infinite times, every time it got called, some new instance need to
> be allocated on the heap and finally OOM. 
> 
> Does anyone have same problem? 
> 
> Thanks
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/spark-sql-OOM-tp25060.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
> 


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Cannot connect to standalone spark cluster

2015-10-14 Thread Akhil Das
Open a spark-shell with:

MASTER=spark://Ellens-MacBook-Pro.local:7077 bin/spark-shell

And if it is able to connect, then check your Java project's build file and
make sure you are using the proper Spark version.

Thanks
Best Regards

On Sat, Oct 10, 2015 at 3:07 AM, ekraffmiller 
wrote:

> Hi,
> I'm trying to run a java application that connects to a local standalone
> spark cluster.  I start the cluster with the default configuration, using
> start-all.sh.  When I go to the web page for the cluster, it is started ok.
> I can connect to this cluster with SparkR, but when I use the same master
> URL to connect from within Java, I get an error message.
>
> I'm using Spark 1.5.
>
> Here is a snippet of the error message:
>
>  ReliableDeliverySupervisor: Association with remote system
> [akka.tcp://sparkMaster@Ellens-MacBook-Pro.local:7077] has failed, address
> is now gated for [5000] ms. Reason: [Disassociated]
> 15/10/09 17:31:41 INFO AppClient$ClientEndpoint: Connecting to master
> spark://Ellens-MacBook-Pro.local:7077...
> 15/10/09 17:31:41 WARN ReliableDeliverySupervisor: Association with remote
> system [akka.tcp://sparkMaster@Ellens-MacBook-Pro.local:7077] has failed,
> address is now gated for [5000] ms. Reason: [Disassociated]
> 15/10/09 17:32:01 INFO AppClient$ClientEndpoint: Connecting to master
> spark://Ellens-MacBook-Pro.local:7077...
> 15/10/09 17:32:01 ERROR SparkUncaughtExceptionHandler: Uncaught exception
> in
> thread Thread[appclient-registration-retry-thread,5,main]
> java.util.concurrent.RejectedExecutionException: Task
> java.util.concurrent.FutureTask@54e2b678 rejected from
> java.util.concurrent.ThreadPoolExecutor@5d9f3e0d[Running, pool size = 1,
> active threads = 1, queued tasks = 0, completed tasks = 2]
> at
>
> java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
> at
> java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
>
> Thanks,
> Ellen
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Cannot-connect-to-standalone-spark-cluster-tp25004.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: how to use SharedSparkContext

2015-10-14 Thread Akhil Das
Did a quick search and found the following, I haven't tested it myself.

Add the following to your build.sbt

libraryDependencies += "com.holdenkarau" % "spark-testing-base_2.10" %
"1.5.0_1.4.0_1.4.1_0.1.2"



Create a class extending com.holdenkarau.spark.testing.SharedSparkContext

And you should be able to use it.
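(A hedged, untested sketch of such a suite; the ScalaTest mix-in and the assertion below are assumptions, not from this thread.)

import com.holdenkarau.spark.testing.SharedSparkContext
import org.scalatest.FunSuite

class WordCountSuite extends FunSuite with SharedSparkContext {
  test("counts values") {
    // `sc` is provided by SharedSparkContext and shared across the suite.
    val counts = sc.parallelize(Seq("a", "b", "a")).countByValue()
    assert(counts("a") == 2)
  }
}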

Thanks
Best Regards

On Mon, Oct 12, 2015 at 2:18 PM, Fengdong Yu 
wrote:

> Hi,
> How to add dependency in build.sbt  if I want to use SharedSparkContext?
>
> I’ve added spark-core, but it doesn’t work.(cannot find SharedSparkContext)
>
>
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Sensitivity analysis using Spark MlLib

2015-10-14 Thread Sourav Mazumder
Is there any algorithm implemented in Spark MLlib which supports
parameter sensitivity analysis?

After the model is created using a training data set, can the model tell,
among the various features used, which ones are the most important (from
the perspective of their contribution to the dependent variable)?

Regards,
Sourav


Re: Spark DataFrame GroupBy into List

2015-10-14 Thread Michael Armbrust
That's correct. It is a Hive UDAF.
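(A hedged, self-contained sketch for reference: since collect_set is a Hive UDAF, it needs a HiveContext rather than a plain SQLContext. The column names follow the thread; everything else is a placeholder.)

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.hive.HiveContext

val sc = new SparkContext(new SparkConf().setAppName("groupby-into-list"))
val sqlContext = new HiveContext(sc)   // needed so Hive UDAFs like collect_set resolve
import sqlContext.implicits._

val df = Seq(("A", 1), ("A", 2), ("B", 3), ("B", 4), ("C", 5)).toDF("category", "id")

val grouped = df.groupBy("category")
  .agg(callUDF("collect_set", df("id")).as("id_list"))
grouped.show()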

On Wed, Oct 14, 2015 at 6:45 AM, java8964  wrote:

> My guess is the same as UDAF of (collect_set) in Hive.
>
>
> https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF#LanguageManualUDF-Built-inAggregateFunctions(UDAF)
>
> Yong
>
> --
> From: sliznmail...@gmail.com
> Date: Wed, 14 Oct 2015 02:45:48 +
> Subject: Re: Spark DataFrame GroupBy into List
> To: mich...@databricks.com
> CC: user@spark.apache.org
>
>
> Hi Michael,
>
> Can you be more specific on `collect_set`? Is it a built-in function or,
> if it is an UDF, how it is defined?
>
> BR,
> Todd Leo
>
> On Wed, Oct 14, 2015 at 2:12 AM Michael Armbrust 
> wrote:
>
> import org.apache.spark.sql.functions._
>
> df.groupBy("category")
>   .agg(callUDF("collect_set", df("id")).as("id_list"))
>
> On Mon, Oct 12, 2015 at 11:08 PM, SLiZn Liu 
> wrote:
>
> Hey Spark users,
>
> I'm trying to group by a dataframe, by appending occurrences into a list
> instead of count.
>
> Let's say we have a dataframe as shown below:
>
> | category | id |
> |  |:--:|
> | A| 1  |
> | A| 2  |
> | B| 3  |
> | B| 4  |
> | C| 5  |
>
> ideally, after some magic group by (reverse explode?):
>
> | category | id_list  |
> |  |  |
> | A| 1,2  |
> | B| 3,4  |
> | C| 5|
>
> any tricks to achieve that? Scala Spark API is preferred. =D
>
> BR,
> Todd Leo
>
>
>
>
>


Re: Reusing Spark Functions

2015-10-14 Thread Michael Armbrust
Unless it's a broadcast variable, a new copy will be deserialized for every
task.
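(For reference, a hedged sketch of two common ways to reuse expensive setup across tasks on the same executor; all names and values below are illustrative placeholders, not from this thread.)

import org.apache.spark.{SparkConf, SparkContext}

// Initialised lazily, once per executor JVM, then reused by every task
// scheduled on that executor.
object SlowResource {
  lazy val instance: Map[String, Int] = {
    // stand-in for very expensive setup (loading a model, opening a pool, ...)
    Map("threshold" -> 10)
  }
}

object ReuseExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("reuse-example"))
    try {
      val rdd = sc.parallelize(1 to 100, 8)

      // Pattern 1: executor-local singleton (one copy per JVM, not per task).
      val viaSingleton = rdd.filter(_ > SlowResource.instance("threshold")).count()

      // Pattern 2: broadcast variable -- shipped once per executor and shared
      // read-only by all of its tasks.
      val settings = sc.broadcast(Map("threshold" -> 10))
      val viaBroadcast = rdd.filter(_ > settings.value("threshold")).count()

      println(s"singleton=$viaSingleton broadcast=$viaBroadcast")
    } finally {
      sc.stop()
    }
  }
}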

On Wed, Oct 14, 2015 at 10:18 AM, Starch, Michael D (398M) <
michael.d.sta...@jpl.nasa.gov> wrote:

> All,
>
> Is a Function object in Spark reused on a given executor, or is it sent and
> deserialized with each new task?
>
> On my project, we have functions that incur a very large setup cost, but
> could then be called many times.  Currently, I am using object
> deserialization to run this intensive setup. I am wondering if this
> function is reused (within the context of the executor), or am I
> deserializing this object over and over again for each task sent to a given
> worker.
>
> Are there other ways to share objects between tasks on the same executor?
>
> Many thanks,
>
> Michael
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: stability of Spark 1.4.1 with Python 3 versions

2015-10-14 Thread Nicholas Chammas
The Spark 1.4 release notes say that Python 3 is supported. The 1.4 docs are
incorrect, and the 1.5 programming guide has been updated to indicate Python 3
support.

On Wed, Oct 14, 2015 at 7:06 AM shoira.mukhsin...@bnpparibasfortis.com <
shoira.mukhsin...@bnpparibasfortis.com> wrote:

> Dear Spark Community,
>
>
>
> The official documentation of Spark 1.4.1 mentions that Spark runs on Python
> 2.6+ http://spark.apache.org/docs/1.4.1/
>
> It is not clear whether by “Python 2.6+” you also mean Python 3.4 or not.
>
>
>
> There is a resolved issue on this point which makes me believe that it
> does run on Python 3.4: https://issues.apache.org/jira/i#browse/SPARK-9705
>
> Maybe the documentation is simply not up to date? The programming guide
> mentions that it does not work for Python 3:
> https://spark.apache.org/docs/1.4.1/programming-guide.html
>
>
>
> Do you confirm that Spark 1.4.1 does run on Python 3.4?
>
>
>
> Thanks in advance for your reaction!
>
>
>
> Regards,
>
> Shoira
>
>


Re: writing to hive

2015-10-14 Thread Ted Yu
Can you show your query ?

Thanks

> On Oct 13, 2015, at 12:29 AM, Hafiz Mujadid  wrote:
> 
> hi!
> 
> I am following  this
> 
>   
> tutorial to read and write from hive. But i am facing following exception
> when i run the code.
> 
> 15/10/12 14:57:36 INFO storage.BlockManagerMaster: Registered BlockManager
> 15/10/12 14:57:38 INFO scheduler.EventLoggingListener: Logging events to
> hdfs://host:9000/spark/logs/local-1444676256555
> Exception in thread "main" java.lang.VerifyError: Bad return type
> Exception Details:
>  Location:
> 
> org/apache/spark/sql/catalyst/expressions/Pmod.inputType()Lorg/apache/spark/sql/types/AbstractDataType;
> @3: areturn
>  Reason:
>Type 'org/apache/spark/sql/types/NumericType$' (current frame, stack[0])
> is not assignable to 'org/apache/spark/sql/types/AbstractDataType' (from
> method signature)
>  Current Frame:
>bci: @3
>flags: { }
>locals: { 'org/apache/spark/sql/catalyst/expressions/Pmod' }
>stack: { 'org/apache/spark/sql/types/NumericType$' }
>  Bytecode:
>000: b200 63b0
> 
>at java.lang.Class.getDeclaredConstructors0(Native Method)
>at java.lang.Class.privateGetDeclaredConstructors(Class.java:2595)
>at java.lang.Class.getConstructor0(Class.java:2895)
>at java.lang.Class.getDeclaredConstructor(Class.java:2066)
>at
> org.apache.spark.sql.catalyst.analysis.FunctionRegistry$$anonfun$4.apply(FunctionRegistry.scala:267)
>at
> org.apache.spark.sql.catalyst.analysis.FunctionRegistry$$anonfun$4.apply(FunctionRegistry.scala:267)
>at scala.util.Try$.apply(Try.scala:161)
>at
> org.apache.spark.sql.catalyst.analysis.FunctionRegistry$.expression(FunctionRegistry.scala:267)
>at
> org.apache.spark.sql.catalyst.analysis.FunctionRegistry$.<init>(FunctionRegistry.scala:148)
>at
> org.apache.spark.sql.catalyst.analysis.FunctionRegistry$.<clinit>(FunctionRegistry.scala)
>at
> org.apache.spark.sql.hive.HiveContext.functionRegistry$lzycompute(HiveContext.scala:414)
>at
> org.apache.spark.sql.hive.HiveContext.functionRegistry(HiveContext.scala:413)
>at
> org.apache.spark.sql.UDFRegistration.<init>(UDFRegistration.scala:39)
>at org.apache.spark.sql.SQLContext.<init>(SQLContext.scala:203)
>at
> org.apache.spark.sql.hive.HiveContext.<init>(HiveContext.scala:72)
> 
> 
> Is there any suggestion how to read and write in hive?
> 
> thanks
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/writing-to-hive-tp25046.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
> 

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
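Incidentally, a VerifyError like the one above usually points to mismatched Spark
versions on the classpath (compile-time vs. runtime jars) rather than to the Hive API
itself. For the read/write part, a minimal HiveContext sketch (Spark 1.x; the table
name is illustrative):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

object HiveReadWriteSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("hive-read-write"))
    val hiveContext = new HiveContext(sc)

    // Write: save a DataFrame as a Hive table.
    val df = hiveContext.createDataFrame(Seq((1, "one"), (2, "two"))).toDF("key", "value")
    df.write.mode("overwrite").saveAsTable("demo_table")

    // Read: query it back with HiveQL.
    hiveContext.sql("SELECT key, value FROM demo_table").show()

    sc.stop()
  }
}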



RE: spark streaming filestream API

2015-10-14 Thread Chandra Mohan, Ananda Vel Murugan
Hi,

Thanks for your response. My input format is one I have created to handle the
files as a whole, i.e. WholeFileInputFormat. I wrote it based on this example:
https://code.google.com/p/hadoop-course/source/browse/HadoopSamples/src/main/java/mr/wholeFile/WholeFileInputFormat.java?r=3
In this case, the key would be NullWritable and the value would be BytesWritable, right?

Unfortunately my files are binary and not text files.

Regards,
Anand.C

From: Akhil Das [mailto:ak...@sigmoidanalytics.com]
Sent: Wednesday, October 14, 2015 5:31 PM
To: Chandra Mohan, Ananda Vel Murugan
Cc: user
Subject: Re: spark streaming filestream API

Key and Value are the ones that you are using with your InputFormat. Eg:

JavaReceiverInputDStream lines = jssc.fileStream("/sigmoid", 
LongWritable.class, Text.class, TextInputFormat.class);

TextInputFormat uses the LongWritable as Key and Text as Value classes. If your 
data is plain CSV or text data then you can use the 
jssc.textFileStream("/sigmoid") without worrying about the InputFormat, Key and 
Value classes.



Thanks
Best Regards

On Wed, Oct 14, 2015 at 5:12 PM, Chandra Mohan, Ananda Vel Murugan 
> wrote:
Hi All,

I have a directory in HDFS which I want to monitor, and whenever there is a new
file in it, I want to parse that file and load the contents into a HIVE table.
The file format is proprietary and I have Java parsers for parsing it. I am
building a Spark Streaming application for this workflow. For doing this, I
found the JavaStreamingContext.fileStream API. It takes four arguments: directory
path, key class, value class and input format. What should the values of the key
and value classes be? Please suggest. Thank you.


Regards,
Anand.C



Re: spark streaming filestream API

2015-10-14 Thread Akhil Das
Yes, that is correct. When you import the K,V classes, make sure you import
them from the hadoop.io package.

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;


Thanks
Best Regards

On Wed, Oct 14, 2015 at 6:26 PM, Chandra Mohan, Ananda Vel Murugan <
ananda.muru...@honeywell.com> wrote:

> Hi,
>
>
>
> Thanks for your response. My input format is the one I have created to
> handle the files as a whole i.e. WholeFileInputFormat I wrote one based on
> this example
> https://code.google.com/p/hadoop-course/source/browse/HadoopSamples/src/main/java/mr/wholeFile/WholeFileInputFormat.java?r=3
> In this case, key would be Nullwritable and value would be BytesWritable
> right?
>
>
>
> Unfortunately my files are binary and not text files.
>
>
>
> Regards,
>
> Anand.C
>
>
>
> *From:* Akhil Das [mailto:ak...@sigmoidanalytics.com]
> *Sent:* Wednesday, October 14, 2015 5:31 PM
> *To:* Chandra Mohan, Ananda Vel Murugan
> *Cc:* user
> *Subject:* Re: spark streaming filestream API
>
>
>
> Key and Value are the ones that you are using with your InputFormat. Eg:
>
>
>
> JavaReceiverInputDStream lines = jssc.fileStream("/sigmoid",
> LongWritable.class, Text.class, TextInputFormat.class);
>
>
>
> TextInputFormat uses the LongWritable as Key and Text as Value classes. If
> your data is plain CSV or text data then you can use the
> *jssc.textFileStream("/sigmoid")* without worrying about the InputFormat,
> Key and Value classes.
>
>
>
>
>
>
> Thanks
>
> Best Regards
>
>
>
> On Wed, Oct 14, 2015 at 5:12 PM, Chandra Mohan, Ananda Vel Murugan <
> ananda.muru...@honeywell.com> wrote:
>
> Hi All,
>
>
>
> I have a directory hdfs which I want to monitor and whenever there is a
> new file in it, I want to parse that file and load the contents into a HIVE
> table. File format is proprietary and I have java parsers for parsing it. I
> am building a spark streaming application for this workflow. For doing
> this, I found JavaStreamingContext.filestream API. It takes four arguments
> directory path, key class, value class and inputformat. What should be
> values of key and value class? Please suggest. Thank you.
>
>
>
>
>
> Regards,
>
> Anand.C
>
>
>
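Putting the two answers together, a sketch in the Scala API (the Java fileStream call
is analogous). WholeFileInputFormat here is the user's own class from the linked
example and is assumed to extend the new-API FileInputFormat[NullWritable,
BytesWritable]; the directory path is illustrative:

import org.apache.hadoop.io.{BytesWritable, NullWritable}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object BinaryFileStreamSketch {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(new SparkConf().setAppName("binary-file-stream"), Seconds(30))

    // Whole files arrive as (NullWritable, BytesWritable) pairs.
    val files = ssc.fileStream[NullWritable, BytesWritable, mr.wholeFile.WholeFileInputFormat](
      "hdfs:///incoming")

    files.foreachRDD { rdd =>
      rdd.foreach { case (_, bytes) =>
        val payload: Array[Byte] = bytes.copyBytes()
        // hand payload to the proprietary parser, then load into Hive
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}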


Re: Node afinity for Kafka-Direct Stream

2015-10-14 Thread Saisai Shao
This preferred locality is a hint to Spark to schedule Kafka tasks on the
preferred nodes. If Kafka and Spark are two separate clusters, this locality
hint obviously takes no effect, and Spark will schedule tasks following the
node-local -> rack-local -> any pattern, like any other Spark tasks.

On Wed, Oct 14, 2015 at 8:10 PM, Rishitesh Mishra 
wrote:

> Hi Gerard,
> I am also trying to understand the same issue. From whatever code I have seen,
> it looks like once the Kafka RDD is constructed, the execution of that RDD is
> up to the task scheduler, and it can schedule the partitions based on the
> load on the nodes. There is a preferred node specified in the Kafka RDD, but AFAIK it
> maps to the Kafka partition's host. So if Kafka and Spark are co-hosted,
> this will probably work. If not, I am not sure how to get data locality for
> a partition.
> Others,
> correct me if there is a way.
>
> On Wed, Oct 14, 2015 at 3:08 PM, Gerard Maas 
> wrote:
>
>> In the receiver-based kafka streaming model, given that each receiver
>> starts as a long-running task, one can rely on a certain degree of data
>> locality based on the kafka partitioning: data published on a given
>> topic/partition will land on the same spark streaming receiving node until
>> the receiver dies and needs to be restarted somewhere else.
>>
>> As I understand, the direct-kafka streaming model just computes offsets
>> and relays the work to a KafkaRDD. How is the execution locality compared
>> to the receiver-based approach?
>>
>> thanks, Gerard.
>>
>
>
>
> --
>
> Regards,
> Rishitesh Mishra,
> SnappyData . (http://www.snappydata.io/)
>
> https://in.linkedin.com/in/rishiteshmishra
>


Re: unresolved dependency: org.apache.spark#spark-streaming_2.10;1.5.0: not found

2015-10-14 Thread Ted Yu
This might be related:

http://search-hadoop.com/m/q3RTta8AxS1UjMSI=Cannot+get+spark+streaming_2+10+1+5+0+pom+from+the+maven+repository

> On Oct 12, 2015, at 11:30 PM, Akhil Das  wrote:
> 
> You need to add "org.apache.spark" % "spark-streaming_2.10" % "1.5.0" to the 
> dependencies list.
> 
> Thanks
> Best Regards
> 
>> On Tue, Oct 6, 2015 at 3:20 PM, shahab  wrote:
>> Hi, 
>> 
>> I am trying to use Spark 1.5 MLlib, but I keep getting
>> "sbt.ResolveException: unresolved dependency:
>> org.apache.spark#spark-streaming_2.10;1.5.0: not found".
>> 
>> It is weird that this happens, but I could not find any solution for this.
>> Has anyone faced the same issue?
>> 
>> 
>> best,
>> /Shahab
>> 
>> Here is my SBT library dependencies:
>> libraryDependencies ++= Seq(
>> 
>> "com.google.guava" % "guava" % "16.0"  ,
>> 
>> "org.apache.spark" % "spark-unsafe_2.10" % "1.5.0",
>> 
>> "org.apache.spark" % "spark-core_2.10" % "1.5.0",
>> 
>> "org.apache.spark" % "spark-mllib_2.10" % "1.5.0",   
>> 
>> "org.apache.hadoop" % "hadoop-client" % "2.6.0",
>> 
>> "net.java.dev.jets3t" % "jets3t" % "0.9.0" % "provided",
>> 
>> "com.github.nscala-time" %% "nscala-time" % "1.0.0",
>> 
>> "org.scalatest" % "scalatest_2.10" % "2.1.3",
>> 
>> "junit" % "junit" % "4.8.1" % "test",
>> 
>> "net.jpountz.lz4" % "lz4" % "1.2.0" % "provided",
>> 
>> "org.clapper" %% "grizzled-slf4j" % "1.0.2",
>> 
>> "net.jpountz.lz4" % "lz4" % "1.2.0" % "provided"
>> 
>>)
>> 
> 


Re: Node afinity for Kafka-Direct Stream

2015-10-14 Thread Gerard Maas
Thanks Saisai, Mishra,

Indeed, that hint will only work in a case where the Spark executor is
co-located with the Kafka broker.
I think the answer to my question as stated is that there's no guarantee of
where the task will execute, as it will depend on the scheduler and the cluster
resources available (Mesos in our case).
Therefore, any assumptions made about data locality using the
receiver-based approach need to be reconsidered when migrating to the
direct stream.

((In our case, we were using local caches to decide when a given secondary
index for a record should be produced and written.))

-kr, Gerard.




On Wed, Oct 14, 2015 at 2:58 PM, Saisai Shao  wrote:

> This preferred locality is a hint to spark to schedule Kafka tasks on the
> preferred nodes, if Kafka and Spark are two separate cluster, obviously
> this locality hint takes no effect, and spark will schedule tasks following
> node-local -> rack-local -> any pattern, like any other spark tasks.
>
> On Wed, Oct 14, 2015 at 8:10 PM, Rishitesh Mishra 
> wrote:
>
>> Hi Gerard,
>> I am also trying to understand the same issue. Whatever code I have seen
>> it looks like once Kafka RDD is constructed the execution of that RDD is
>> upto the task scheduler and it can schedule the partitions based on the
>> load on nodes. There is preferred node specified in Kafks RDD. But ASFIK it
>> maps to the Kafka partitions host . So if Kafka and Spark are co hosted
>> probably this will work. If not, I am not sure how to get data locality for
>> a partition.
>> Others,
>> correct me if there is a way.
>>
>> On Wed, Oct 14, 2015 at 3:08 PM, Gerard Maas 
>> wrote:
>>
>>> In the receiver-based kafka streaming model, given that each receiver
>>> starts as a long-running task, one can rely in a certain degree of data
>>> locality based on the kafka partitioning:  Data published on a given
>>> topic/partition will land on the same spark streaming receiving node until
>>> the receiver dies and needs to be restarted somewhere else.
>>>
>>> As I understand, the direct-kafka streaming model just computes offsets
>>> and relays the work to a KafkaRDD. How is the execution locality compared
>>> to the receiver-based approach?
>>>
>>> thanks, Gerard.
>>>
>>
>>
>>
>> --
>>
>> Regards,
>> Rishitesh Mishra,
>> SnappyData . (http://www.snappydata.io/)
>>
>> https://in.linkedin.com/in/rishiteshmishra
>>
>
>


dataframes and numPartitions

2015-10-14 Thread Alex Nastetsky
A lot of RDD methods take a numPartitions parameter that lets you specify
the number of partitions in the result. For example, groupByKey.

The DataFrame counterparts don't have a numPartitions parameter, e.g.
groupBy only takes a bunch of Columns as params.

I understand that the DataFrame API is supposed to be smarter and go
through a LogicalPlan, and perhaps determine the optimal number of
partitions for you, but sometimes you want to specify the number of
partitions yourself. One such use case is when you are preparing to do a
"merge" join with another dataset that is similarly partitioned with the
same number of partitions.
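Two knobs cover most of this, sketched below in spark-shell style (the DataFrame,
column names, and partition count are illustrative): spark.sql.shuffle.partitions
controls how many partitions a groupBy/join shuffle produces, and
DataFrame.repartition(n) lets you repartition explicitly, e.g. before a merge-style
join:

import org.apache.spark.sql.functions._

// 1) Shuffle-side control: partitions produced by groupBy/join shuffles.
sqlContext.setConf("spark.sql.shuffle.partitions", "200")

// 2) Explicit repartitioning of the DataFrame itself.
val df = sqlContext.createDataFrame(Seq(("A", 1), ("A", 2), ("B", 3))).toDF("category", "id")
val prepared = df.repartition(200)
println(prepared.rdd.partitions.length)  // 200

val counts = prepared.groupBy("category").agg(count("id"))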


Re: Spark DataFrame GroupBy into List

2015-10-14 Thread SLiZn Liu
Thanks, Michael and java8964!

Does HiveContext also provide a UDF for combining existing lists into a
flattened (not nested) list? (list -> list of lists -[flatten]-> list)

On Thu, Oct 15, 2015 at 1:16 AM Michael Armbrust 
wrote:

> Thats correct.  It is a Hive UDAF.
>
> On Wed, Oct 14, 2015 at 6:45 AM, java8964  wrote:
>
>> My guess is the same as UDAF of (collect_set) in Hive.
>>
>>
>> https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF#LanguageManualUDF-Built-inAggregateFunctions(UDAF)
>>
>> Yong
>>
>> --
>> From: sliznmail...@gmail.com
>> Date: Wed, 14 Oct 2015 02:45:48 +
>> Subject: Re: Spark DataFrame GroupBy into List
>> To: mich...@databricks.com
>> CC: user@spark.apache.org
>>
>>
>> Hi Michael,
>>
>> Can you be more specific on `collect_set`? Is it a built-in function or,
>> if it is an UDF, how it is defined?
>>
>> BR,
>> Todd Leo
>>
>> On Wed, Oct 14, 2015 at 2:12 AM Michael Armbrust 
>> wrote:
>>
>> import org.apache.spark.sql.functions._
>>
>> df.groupBy("category")
>>   .agg(callUDF("collect_set", df("id")).as("id_list"))
>>
>> On Mon, Oct 12, 2015 at 11:08 PM, SLiZn Liu 
>> wrote:
>>
>> Hey Spark users,
>>
>> I'm trying to group by a dataframe, by appending occurrences into a list
>> instead of count.
>>
>> Let's say we have a dataframe as shown below:
>>
>> | category | id |
>> |  |:--:|
>> | A| 1  |
>> | A| 2  |
>> | B| 3  |
>> | B| 4  |
>> | C| 5  |
>>
>> ideally, after some magic group by (reverse explode?):
>>
>> | category | id_list  |
>> |  |  |
>> | A| 1,2  |
>> | B| 3,4  |
>> | C| 5|
>>
>> any tricks to achieve that? Scala Spark API is preferred. =D
>>
>> BR,
>> Todd Leo
>>
>>
>>
>>
>>
>
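On the flattening question: one way, sketched below, is to explode the array column
back to one element per row and then re-aggregate with collect_set, which yields a
single flat, de-duplicated list per key. This assumes a HiveContext-backed DataFrame
`grouped` with columns category and id_list (array), as in the thread above; explode
here is the DataFrame generator from org.apache.spark.sql.functions:

import org.apache.spark.sql.functions._

val exploded = grouped.select(col("category"), explode(col("id_list")).as("id"))

val flat = exploded
  .groupBy("category")
  .agg(callUDF("collect_set", col("id")).as("flat_ids"))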


Re: Building with SBT and Scala 2.11

2015-10-14 Thread Ted Yu
Adrian:
Likely you were using maven.

Jakob's report was with sbt.

Cheers

On Tue, Oct 13, 2015 at 10:05 PM, Adrian Tanase  wrote:

> Do you mean hadoop-2.4 or 2.6? Not sure if this is the issue, but I'm also
> compiling the 1.5.1 version with Scala 2.11 and Hadoop 2.6 and it works.
>
> -adrian
>
> Sent from my iPhone
>
> On 14 Oct 2015, at 03:53, Jakob Odersky  wrote:
>
> I'm having trouble compiling Spark with SBT for Scala 2.11. The command I
> use is:
>
> dev/change-version-to-2.11.sh
> build/sbt -Pyarn -Phadoop-2.11 -Dscala-2.11
>
> followed by
>
> compile
>
> in the sbt shell.
>
> The error I get specifically is:
>
> spark/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala:308:
> no valid targets for annotation on value conf - it is discarded unused. You
> may specify targets with meta-annotations, e.g. @(transient @param)
> [error] private[netty] class NettyRpcEndpointRef(@transient conf:
> SparkConf)
> [error]
>
> However, I am also getting a large number of deprecation warnings, making
> me wonder if I am supplying some incompatible/unsupported options to sbt. I
> am using Java 1.8 and the latest Spark master sources.
> Does someone know if I am doing anything wrong, or is the sbt build broken?
>
> thanks for your help,
> --Jakob
>
>


thriftserver: access temp dataframe from in-memory of spark-shell

2015-10-14 Thread Saif.A.Ellafi
Hi,

Is it possible to load a spark-shell, do any number of operations on a
dataframe, register it as a temporary table, and then see it through the
thriftserver?
PS: or even better, submit a full job and keep the dataframe in the thriftserver's
memory before the job completes.

I have been trying this without success; beeline does not see the dataframes of the
spark-shell's hive context.
If any of you confirms this is possible, I will investigate further. So far it
only seems to be able to read from persistent tables manually.

Thanks for any insights,
Saif
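One approach that matches this use case is to start the thrift server inside the same
JVM as the shell (or job), so it shares the HiveContext that holds the temporary
tables. A sketch for the spark-shell, assuming a Spark build that includes the thrift
server module; table and column names are illustrative:

import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.hive.thriftserver.HiveThriftServer2

val hiveContext = new HiveContext(sc)
val df = hiveContext.createDataFrame(Seq((1, "a"), (2, "b"))).toDF("id", "value")
df.registerTempTable("my_temp_table")

// Temporary tables are only visible to the context that registered them,
// so the thrift server has to be started with that same context.
HiveThriftServer2.startWithContext(hiveContext)
// A beeline session connected to this embedded server can now query my_temp_table.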



Application not found in Spark historyserver in yarn-client mode

2015-10-14 Thread Anfernee Xu
Hi,

Here's the problem I'm facing: I have a standalone Java application which
periodically submits Spark jobs to my YARN cluster; by the way, I'm not using
'spark-submit' or 'org.apache.spark.launcher' to submit my jobs. These jobs
are successful and I can see them in the YARN RM web UI, but when I follow
the link to the application history, I always get a 404 (application not
found) from the Spark history server.

My code looks like this:


SparkConf conf = new SparkConf()
    .setAppName("testSpak")
    .setMaster("yarn-client")
    .setJars(new String[]{IOUtil.getJar(MySparkApp.class)});

conf.set("spark.yarn.historyServer.address", "10.247.44.155:18080");
conf.set("spark.history.fs.logDirectory",
    "hdfs://myHdfsNameNode:55310/scratch/tie/spark/applicationHistory");

JavaSparkContext sc = new JavaSparkContext(conf);

try {
  // ... my application code
} finally {
  sc.stop();
}

-- 
--Anfernee
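One thing to check: the history server only shows applications whose drivers wrote
event logs, and spark.history.fs.logDirectory is a history-server-side setting, so the
driver configuration also needs the spark.eventLog.* properties. A sketch of the
driver-side conf (in Scala; hosts and paths are the ones from the mail and purely
illustrative):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("testSpak")
  .setMaster("yarn-client")
  // Without event logging enabled, nothing is written for the history server to read.
  .set("spark.eventLog.enabled", "true")
  .set("spark.eventLog.dir", "hdfs://myHdfsNameNode:55310/scratch/tie/spark/applicationHistory")
  .set("spark.yarn.historyServer.address", "10.247.44.155:18080")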

