Re: ROSE: Spark + R on the JVM.

2016-01-13 Thread Richard Siebeling
Hi David,

the use case is that we're building a data processing system with an
intuitive user interface where Spark is used as the data processing
framework.
We would like to provide an HTML user interface to R where the user types or
copy-pastes R code; the system should then send this R code (using
ROSE) to R, process it and give the results back to the user. The RDD would
be used so that the data can be further processed by the system, but we
would also like to be able to show the messages printed to STDOUT
and the images (plots) that are generated by R. The plots seem to be
available in the OpenCPU API, see below

[image: Inline image 1]

So the case is not that we're trying to process millions of images but
rather that we would like to show the user the plots (like a regression
plot) that are generated in R. There could be several plots
generated by the code, but certainly not thousands or even hundreds, only a
few.

Hope that this would be possible using ROSE because it seems a really good
fit,
thanks in advance,
Richard

On Wed, Jan 13, 2016 at 3:39 AM, David Russell <
themarchoffo...@protonmail.com> wrote:

> Hi Richard,
>
> > Would it be possible to access the session API from within ROSE,
> > to get for example the images that are generated by R / openCPU
>
> Technically it would be possible although there would be some potentially
> significant runtime costs per task in doing so, primarily those related to
> extracting image data from the R session, serializing and then moving that
> data across the cluster for each and every image.
>
> From a design perspective ROSE was intended to be used within Spark scale
> applications where R object data was seen as the primary task output. An
> output in a format that could be rapidly serialized and easily processed.
> Are there real world use cases where Spark scale applications capable of
> generating 10k, 100k, or even millions of image files would actually
> need to capture and store images? If so, how practically speaking, would
> these images ever be used? I'm just not sure. Maybe you could describe your
> own use case to provide some insights?
>
> > and the logging to stdout that is logged by R?
>
> If you are referring to the R console output (generated within the R
> session during the execution of an OCPUTask) then this data could certainly
> (optionally) be captured and returned on an OCPUResult. Again, can you
> provide any details for how you might use this console output in a real
> world application?
>
> As an aside, for simple standalone Spark applications that will only ever
> run on a single host (no cluster) you could consider using an alternative
> library called *fluent-r*. This library is also available under my GitHub
> repo, see here: https://github.com/onetapbeyond/fluent-r. The fluent-r
> library already has support for the retrieval of R objects, R console
> output and R graphics device image/plots. However it is not as lightweight
> as ROSE and it is not designed to work in a clustered environment. ROSE on the
> other hand is designed for scale.
>
> David
>
> "All that is gold does not glitter, Not all those who wander are lost."
>
>
> -------- Original Message --------
> Subject: Re: ROSE: Spark + R on the JVM.
> Local Time: January 12 2016 6:56 pm
> UTC Time: January 12 2016 11:56 pm
> From: rsiebel...@gmail.com
> To: m...@vijaykiran.com
> CC: cjno...@gmail.com,themarchoffo...@protonmail.com,user@spark.apache.org
> ,d...@spark.apache.org
>
> Hi,
>
> this looks great and seems to be very usable.
> Would it be possible to access the session API from within ROSE, to get
> for example the images that are generated by R / openCPU and the logging to
> stdout that is logged by R?
>
> thanks in advance,
> Richard
>
> On Tue, Jan 12, 2016 at 10:16 PM, Vijay Kiran  wrote:
>
>> I think it would be this:
>> https://github.com/onetapbeyond/opencpu-spark-executor
>>
>> > On 12 Jan 2016, at 18:32, Corey Nolet  wrote:
>> >
>> > David,
>> >
>> > Thank you very much for announcing this! It looks like it could be very
>> useful. Would you mind providing a link to the github?
>> >
>> > On Tue, Jan 12, 2016 at 10:03 AM, David 
>> wrote:
>> > Hi all,
>> >
>> > I'd like to share news of the recent release of a new Spark package,
>> ROSE.
>> >
>> > ROSE is a Scala library offering access to the full scientific
>> computing power of the R programming language to Apache Spark batch and
>> streaming applications on the JVM. Where Apache SparkR lets data scientists
>> use Spark from R, ROSE is designed to let Scala and Java developers use R
>> from Spark.
>> >
>> > The project is available and documented on GitHub and I would encourage
>> you to take a look. Any feedback, questions etc very welcome.
>> >
>> > David
>> >
>> > "All that is gold does not glitter, Not all those who wander are lost."
>> >
>>
>>
>> 

RE: spark job failure - akka error Association with remote system has failed

2016-01-13 Thread vivek.meghanathan
Identified the problem - the Cassandra seed ip we use was down!

From: Vivek Meghanathan (WT01 - NEP)
Sent: 13 January 2016 13:06
To: 'user@spark.apache.org' 
Subject: RE: spark job failure - akka error Association with remote system has 
failed

I have used master_ip as the IP address, and the Spark conf also has the IP address. But
the following log shows the hostname. (The Spark UI shows the master details as an IP.)


16/01/13 12:31:38 WARN ReliableDeliverySupervisor: Association with remote 
system [akka.tcp://sparkDriver@masternode1:36537] has failed, address is now 
gated for [5000] ms. Reason is: [Disassociated].

From: Vivek Meghanathan (WT01 - NEP)
Sent: 13 January 2016 12:18
To: user@spark.apache.org
Subject: spark job failure - akka error Association with remote system has 
failed

Hi All,
I am running a Spark 1.3.0 cluster in standalone mode. We rebooted the cluster
servers (system reboot), and since then the Spark jobs fail with the
following error (within 7-8 seconds), although 2 of the jobs are running fine.
All the jobs used to be stable before the system reboot. We have not configured
anything in the conf directory other than spark-env.sh, slaves and
log4j.properties.

Warning in the master log:

16/01/13 11:58:16 WARN ReliableDeliverySupervisor: Association with remote 
system [akka.tcp://sparkDriver@masternode1:41419] has failed, address is now 
gated for [5000] ms. Reason is: [Disassociated].

Regards,
Vivek M


Re: Concurrent Read of Accumulator's Value

2016-01-13 Thread Ted Yu
One option is to use a NoSQL data store, such as HBase, for the two actions
to exchange status information.
Write to the data store in action 1 and read from it in action 2.
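
As a rough illustration of that pattern (plain Python with threads rather than Spark, and a dict standing in for the data store; `status_store`, `long_action` and `read_progress` are hypothetical names, not part of any Spark or HBase API), one task writes its progress and another reads it while the first is still running:

```python
import threading
import time

# Stand-in for an external data store such as HBase (assumption: a real
# deployment would use the store's client API instead of a dict).
status_store = {}
store_lock = threading.Lock()


def long_action(total_records):
    """Plays the role of action 1: does the work and writes its progress."""
    for done in range(1, total_records + 1):
        time.sleep(0.001)  # simulate per-record work
        with store_lock:
            status_store["ac1_processed"] = done


def read_progress():
    """Plays the role of action 2: reads whatever status has been written."""
    with store_lock:
        return status_store.get("ac1_processed", 0)


worker = threading.Thread(target=long_action, args=(50,))
worker.start()
while worker.is_alive():
    # Poll the store while the long action is still running.
    print("processed so far:", read_progress())
    time.sleep(0.01)
worker.join()
print("final:", read_progress())
```

In a real job the writes would come from the tasks of action 1, so the store has to tolerate concurrent writers from several executors.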

Cheers

On Wed, Jan 13, 2016 at 2:20 AM, Kira  wrote:

> Hi,
>
> I have a relatively long-running action on an RDD; let's call it ac1.
> What I want to do is execute another action (ac2) on the same RDD to follow
> the progress of the first one (ac1). To this end I want to use an
> accumulator and read its value progressively to see the changes (on
> the fly) while ac1 is still running. My problem is that the accumulator is
> only updated once ac1 has finished, which is not helpful for me :/
>
> I've seen here
> <
> http://apache-spark-user-list.1001560.n3.nabble.com/Asynchronous-Broadcast-from-driver-to-workers-is-it-possible-td15758.html
> >
> what may seem like a solution for me, but it doesn't work: "While Spark
> already offers support for asynchronous reduce (collect data from workers,
> while not interrupting execution of a parallel transformation) through
> accumulator"
>
> Another post suggested using SparkListener to do that.
>
> Are these solutions correct? If so, could you give me a simple example?
> Are there other solutions?
>
> thank you.
> Regards
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Concurrent-Read-of-Accumulator-s-Value-tp25957.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Merging compatible schemas on Spark 1.6.0

2016-01-13 Thread emlyn
I have a series of directories on S3 with parquet data, all with compatible
(but not identical) schemas. We verify that the schemas stay compatible when
they evolve using
org.apache.avro.SchemaCompatibility.checkReaderWriterCompatibility. On Spark
1.5, I could read these into a DataFrame with sqlCtx.read().parquet(path1,
path2), and Spark would take care of merging the compatible schemas.
I have just been trying to run on Spark 1.6, and that is now giving an
error, saying:

java.lang.AssertionError: assertion failed: Conflicting directory structures
detected. Suspicious paths:
s3n://bucket/data/app1/version1/event1
s3n://bucket/data/app2/version1/event1
If provided paths are partition directories, please set "basePath" in the
options of the data source to specify the root directory of the table. If
there are multiple root directories, please load them separately and then
union them.

Under these paths I have partitioned data, like
s3n://bucket/data/appN/versionN/eventN/dat_received=-MM-DD/fingerprint=/part-r--.lzo.parquet
If I load both paths into separate DataFrames and then try to union them, as
suggested in the error message, that fails with:

org.apache.spark.sql.AnalysisException: unresolved operator 'Union;
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:38)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:44)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:203)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:50)
    at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:105)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:50)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:44)
    at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:34)
    at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:133)
    at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$withPlan(DataFrame.scala:2165)
    at org.apache.spark.sql.DataFrame.unionAll(DataFrame.scala:1052)

How can I combine these data sets in Spark 1.6? Is there a way to union
DataFrames with different but compatible schemas?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Merging-compatible-schemas-on-Spark-1-6-0-tp25958.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Kafka Streaming and partitioning

2016-01-13 Thread ddav
Hi, 

I have the following use case:

1. Reference data stored in an RDD that is persisted and partitioned using a
simple custom partitioner. 
2. Input stream from kafka that uses the same partitioner algorithm as the
ref data RDD - this partitioning is done in kafka. 

I am using Kafka direct streams, so the number of Kafka partitions maps to the
number of partitions in the Spark RDD. From testing and the documentation I
see that Spark does not know anything about how the data has been partitioned in
Kafka.

In my use case I need to join the reference data RDD and the input stream
RDD.  Due to the fact I have manually ensured the incoming data from kafka
uses the same partitioning algorithm I know the data has been grouped
correctly in the input stream RDD in Spark but I cannot do a join without a
shuffle step due to the fact Spark has no knowledge of how the data has been
partitioned.
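
To make the co-partitioning argument concrete, here is a minimal plain-Python sketch (not Spark code; `partition_for`, `partition` and `local_join` are hypothetical names, and the byte-sum hash is only a stand-in for the custom partitioner). When both sides are split with the same deterministic function, matching keys always sit at the same partition index, so each pair of partitions can be joined without moving data:

```python
NUM_PARTITIONS = 4


def partition_for(key):
    # Stand-in for the custom partitioner (assumption): any deterministic
    # function of the key works, as long as both sides use the same one.
    return sum(key.encode("utf-8")) % NUM_PARTITIONS


def partition(records):
    """Split (key, value) records into NUM_PARTITIONS lists."""
    parts = [[] for _ in range(NUM_PARTITIONS)]
    for key, value in records:
        parts[partition_for(key)].append((key, value))
    return parts


def local_join(ref_part, in_part):
    """Join two partitions that share the same index, with no data movement."""
    lookup = dict(ref_part)
    return [(k, (lookup[k], v)) for k, v in in_part if k in lookup]


reference = partition([("a", "ref-a"), ("b", "ref-b"), ("c", "ref-c")])
incoming = partition([("a", 1), ("c", 3), ("a", 2)])

# Partition i of the incoming data only ever needs partition i of the
# reference data, which is the property a shuffle-free join relies on.
joined = [row
          for i in range(NUM_PARTITIONS)
          for row in local_join(reference[i], incoming[i])]
print(joined)
```

Spark can only exploit this layout if it knows the partitioner of both RDDs, which is exactly the metadata missing from the Kafka input RDD.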

I have two ways to do this. 
1. Explicitly call partitionBy(CustomPartitioner) on the input stream RDD
followed by a join. This results in a shuffle of the input stream RDD,
after which the co-partitioned join takes place.
2. Call join on the reference data RDD passing in the input stream RDD.
Spark will do a shuffle under the hood in this case and the join will take
place. The join will do its best to run on a node that has local access to
the reference data RDD. 

Is there any difference between the 2 methods above or will both cause the
same sequence of events to take place in Spark?
Is all I have stated above correct?

Finally, is there any roadmap feature that would allow the user to
push a partitioner into an already created RDD without doing a shuffle?
Spark in this case would trust that the data is set up correctly (as in the use
case above) and simply fill in the necessary metadata on the RDD
partitions, i.e. it would check the first entry in each partition to determine the
partition number of the data.

Thank you in advance for any help on this issue. 
Dave.  



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Kafka-Streaming-and-partitioning-tp25955.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: [KafkaRDD]: rdd.cache() does not seem to work

2016-01-13 Thread Tathagata Das
Can you simplify the code and run a few counts to see if the cache is
being used (i.e. whether later jobs are faster)? In addition, use the Spark UI to
see whether the RDD is cached, and look at the DAG visualization of the job to see
whether it is using the cached RDD (the DAG will show a green vertex if the RDD is
cached).

TD

On Tue, Jan 12, 2016 at 12:46 AM, Понькин Алексей 
wrote:

> Hi Charles,
>
> I have created very simplified job -
> https://github.com/ponkin/KafkaSnapshot to illustrate the problem.
> https://github.com/ponkin/KafkaSnapshot/blob/master/src/main/scala/ru/ponkin/KafkaSnapshot.scala
>
> In short - maybe the persist method is working, but not the way I expected.
> I thought that Spark would fetch all data from the Kafka topic once and cache
> it in memory; instead the RDD is recomputed every time I call the
> saveAsObjectFile method.
>
>
>
> 12.01.2016, 10:56, "charles li" :
> > cache() is persist() with the default storage level, and it is lazy [ not actually cached ] until the first time the RDD is computed.
> >
> >
> > On Tue, Jan 12, 2016 at 5:13 AM, ponkin  wrote:
> >> Hi,
> >>
> >> Here is my use case:
> >> I have a Kafka topic. The job is fairly simple - it reads the topic and saves data to several HDFS paths.
> >> I create the RDD with the following code
> >>  val r = KafkaUtils.createRDD[Array[Byte],Array[Byte],DefaultDecoder,DefaultDecoder](context,kafkaParams,range)
> >> Then I am trying to cache that RDD with
> >>  r.cache()
> >> and then save this RDD to several HDFS locations.
> >> But it seems that KafkaRDD is fetching data from the Kafka broker every time I call saveAsNewAPIHadoopFile.
> >>
> >> How can I cache data from Kafka in memory?
> >>
> >> P.S. When I repartition the RDD it seems to work properly (Kafka is read only once), but Spark stores the shuffled data locally.
> >> Is it possible to keep the data in memory?
> >>
> >> View this message in context: [KafkaRDD]: rdd.cache() does not seem to work
> >> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
>
>


Concurrent Read of Accumulator's Value

2016-01-13 Thread Kira
Hi,

I have a relatively long-running action on an RDD; let's call it ac1.
What I want to do is execute another action (ac2) on the same RDD to follow
the progress of the first one (ac1). To this end I want to use an
accumulator and read its value progressively to see the changes (on
the fly) while ac1 is still running. My problem is that the accumulator is
only updated once ac1 has finished, which is not helpful for me :/

I've seen here
http://apache-spark-user-list.1001560.n3.nabble.com/Asynchronous-Broadcast-from-driver-to-workers-is-it-possible-td15758.html
what may seem like a solution for me, but it doesn't work: "While Spark
already offers support for asynchronous reduce (collect data from workers,
while not interrupting execution of a parallel transformation) through
accumulator"

Another post suggested using SparkListener to do that.

Are these solutions correct? If so, could you give me a simple example?
Are there other solutions?

thank you.
Regards



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Concurrent-Read-of-Accumulator-s-Value-tp25957.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: FPGrowth does not handle large result sets

2016-01-13 Thread Sean Owen
As I said in your JIRA, the collect() in question is bringing results
back to the driver to return them. The assumption is that there aren't
a vast number of frequent items. If there are, they aren't 'frequent'
and your min support is too low.
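
To put rough numbers on that, here is a small back-of-the-envelope sketch using the figures from the quoted message below. It assumes the absolute threshold is minSupport times the number of transactions, rounded up; `min_count` is a hypothetical helper, not an MLlib function:

```python
import math


def min_count(min_support, num_transactions):
    # Assumption: an itemset counts as frequent once it appears in at least
    # ceil(min_support * num_transactions) transactions.
    return math.ceil(min_support * num_transactions)


# Figures quoted in the original question.
threshold = min_count(0.001, 200000)

# Upper bound on the non-empty itemsets contained in ONE 50-item transaction.
subsets_per_transaction = 2 ** 50 - 1

print(threshold)
print(subsets_per_transaction)
```

With a threshold that low relative to 200,000 transactions, a very large share of the possible item combinations can qualify, and all of them end up on the driver.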

On Wed, Jan 13, 2016 at 12:43 AM, Ritu Raj Tiwari
 wrote:
> Folks:
> We are running into a problem where FPGrowth seems to choke on data sets
> that we think are not too large. We have about 200,000 transactions. Each
> transaction is composed of 50 items on average. There are about 17,000
> unique items (SKUs) that might show up in any transaction.
>
> When running locally with 12G ram given to the PySpark process, the FPGrowth
> code fails with out of memory error for minSupport of 0.001. The failure
> occurs when we try to enumerate and save the frequent itemsets. Looking at
> the FPGrowth code
> (https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/fpm/FPGrowth.scala),
> it seems this is because the genFreqItems() method tries to collect() all
> items. Is there a way the code could be rewritten so it does not try to
> collect and therefore store all frequent item sets in memory?
>
> Thanks for any insights.
>
> -Raj




Spark ignores SPARK_WORKER_MEMORY?

2016-01-13 Thread Barak Yaish
Hello,

Although I'm setting SPARK_WORKER_MEMORY in spark-env.sh, it looks like this
setting is ignored. I can't find any indication in the scripts under
bin/sbin that -Xms/-Xmx are set.

If I ps the worker pid, it looks like the memory is set to 1G:

[hadoop@sl-env1-hadoop1 spark-1.5.2-bin-hadoop2.6]$ ps -ef | grep 20232
hadoop   20232 1  0 02:01 ?00:00:22 /usr/java/latest//bin/java
-cp
/workspace/3rd-party/spark/spark-1.5.2-bin-hadoop2.6/sbin/../conf/:/workspace/3rd-party/spark/spark-1.5.2-bin-hadoop2.6/lib/spark-assembly-1.5.2-hadoop2.6.0.jar:/workspace/3rd-party/spark/spark-1.5.2-bin-hadoop2.6/lib/datanucleus-api-jdo-3.2.6.jar:/workspace/3rd-party/spark/spark-1.5.2-bin-hadoop2.6/lib/datanucleus-rdbms-3.2.9.jar:/workspace/3rd-party/spark/spark-1.5.2-bin-hadoop2.6/lib/datanucleus-core-3.2.10.jar:/workspace/3rd-party/hadoop/2.6.3//etc/hadoop/
-Xms1g -Xmx1g org.apache.spark.deploy.worker.Worker --webui-port 8081
spark://10.52.39.92:7077

Am I missing something?

Thanks.


Co-Partitioned Joins

2016-01-13 Thread ddav
Hi,

I am quite new to Spark and have some questions on joins and
co-partitioning. 

Are the following assumptions correct?

When a join takes place and one of the RDDs has been partitioned, does
Spark make a best effort to execute the join for a specific partition where
the partitioned data resides locally, i.e. in memory?
Does the order in which I write the join make any difference in this case?
For example:

rdd1 - hash partitioned 
rdd2 - not partitioned

rdd1.join(rdd2);
rdd2.join(rdd1);

--
If we have two RDDs that have both been partitioned using the same
partitioner and both have been evaluated into memory on the cluster, how
does Spark decide which node to run the join on for each partition? Is the
following order correct?

1. Spark will try to run the join for a partition on a node where both RDDs'
partitions are co-located.
2. If both RDDs have already been evaluated into memory on the cluster but
the partitions are not co-located, it will try to run on the node that has
local access to the larger of the two RDDs' partitions.
3. Continuing on from 2, it will fall back to local access to the smaller RDD
partition.
4. Run on any node.

In the above scenario, does the order in which I write the join have any
effect on how Spark decides where to run the join?

rdd1.join(rdd2);
rdd2.join(rdd1);

---
A follow-on from above. If there are two partitioned RDDs but only a single
RDD has been evaluated into memory in the cluster, is the following order
correct?

1. Spark will try to run the join on the node where the RDD partition has
been evaluated into memory.
2. Run on any node.

Again, does the order I write the join in make any difference?

rdd1.join(rdd2);
rdd2.join(rdd1);

Thank you in advance for any help on this topic. 
Dave. 




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Co-Partitioned-Joins-tp25956.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Is it possible to use SparkSQL JDBC ThriftServer without Hive

2016-01-13 Thread angela.whelan
hi,
I'm wondering if it is possible to use the SparkSQL JDBC ThriftServer
without Hive?

The reason I'm asking is that we are unsure about the speed of Hive with
SparkSQL JDBC connectivity.

I can't find any article online about using SparkSQL JDBC ThriftServer
without Hive.

Many thanks in advance for any help on this.

Thanks, Angela



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Is-it-possible-to-use-SparkSQL-JDBC-ThriftServer-without-Hive-tp25959.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Read Accumulator value while running

2016-01-13 Thread Kira
Hi, 

I have a relatively long-running action on an RDD; let's call it ac1.
What I want to do is execute another action (ac2) on the same RDD to follow
the progress of the first one (ac1). To this end I want to use an
accumulator and read its value progressively to see the changes (on
the fly) while ac1 is still running. My problem is that the accumulator is
only updated once ac1 has finished, which is not helpful for me :/

I've seen here
http://apache-spark-user-list.1001560.n3.nabble.com/Asynchronous-Broadcast-from-driver-to-workers-is-it-possible-td15758.html
what may seem like a solution for me, but it doesn't work: "While Spark
already offers support for asynchronous reduce (collect data from workers,
while not interrupting execution of a parallel transformation) through
accumulator"

Another post suggested using SparkListener to do that.

Are these solutions correct? If so, could you give me a simple example?
Are there other solutions?

thank you. 
Regards



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Read-Accumulator-value-while-running-tp25960.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Spark Cassandra Java Connector: records missing despite consistency=ALL

2016-01-13 Thread Dennis Birkholz

Hi all,

we use Cassandra to log event data and process it every 15 minutes with
Spark. We are using the Cassandra Java Connector for Spark.

Randomly our Spark runs produce too few output records because no data
is returned from Cassandra for a window of several minutes of input data.
When querying the data (with cqlsh), after multiple tries, the data
eventually becomes available.


To solve the problem, we tried to use consistency=ALL when reading the
data in Spark. We use the
CassandraJavaUtil.javaFunctions().cassandraTable() method and have set
"spark.cassandra.input.consistency.level"="ALL" on the config when
creating the Spark context. The problem persists, even though according to
http://stackoverflow.com/a/25043599 using a consistency level of ONE on
the write side (which we use) and ALL on the read side should be
sufficient for data consistency.
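
The rule behind that claim is that a read overlaps the latest acknowledged write whenever the number of replicas read plus the number of replicas written exceeds the replication factor. A minimal sketch of that arithmetic (`replicas_required` and `read_sees_latest_write` are hypothetical helper names, not part of the Cassandra driver or the connector):

```python
def replicas_required(level, replication_factor):
    """Number of replicas that must respond for a given consistency level."""
    if level == "ONE":
        return 1
    if level == "QUORUM":
        return replication_factor // 2 + 1
    if level == "ALL":
        return replication_factor
    raise ValueError("unsupported consistency level: " + level)


def read_sees_latest_write(write_level, read_level, replication_factor):
    # Overlap rule: R + W > N guarantees that at least one replica in the
    # read set also acknowledged the write.
    w = replicas_required(write_level, replication_factor)
    r = replicas_required(read_level, replication_factor)
    return r + w > replication_factor


# Our setup: replication factor 2, writes at ONE, reads at ALL.
print(read_sees_latest_write("ONE", "ALL", 2))
# For comparison: reading at ONE would not give the guarantee.
print(read_sees_latest_write("ONE", "ONE", 2))
```

By this rule our combination should be consistent for writes that were acknowledged, which is why the missing records are surprising.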


I would really appreciate if someone could give me a hint how to fix 
this problem, thanks!


Greets,
Dennis

P.s.:
some information about our setup:
Cassandra 2.1.12 in a two Node configuration with replication factor=2
Spark 1.5.1
Cassandra Java Driver 2.2.0-rc3
Spark Cassandra Java Connector 2.10-1.5.0-M2




Re: ROSE: Spark + R on the JVM.

2016-01-13 Thread David Russell
Hi Richard,

Thanks for providing the background on your application.

> the user types or copy-pastes his R code,
> the system should then send this R code (using ROSE) to R

Unfortunately this type of ad hoc R analysis is not supported. ROSE supports 
the execution of any R function or script within an existing R package on CRAN, 
Bioc, or github. It does not support the direct execution of arbitrary blocks 
of R code as you described.

You may want to look at [DeployR](http://deployr.revolutionanalytics.com/), 
it's an open source R integration server that provides APIs in Java, JavaScript 
and .NET that can easily support your use case. The outputs of your DeployR 
integration could then become inputs to your data processing system.

David

"All that is gold does not glitter, Not all those who wander are lost."



-------- Original Message --------
Subject: Re: ROSE: Spark + R on the JVM.
Local Time: January 13 2016 3:18 am
UTC Time: January 13 2016 8:18 am
From: rsiebel...@gmail.com
To: themarchoffo...@protonmail.com
CC: 
m...@vijaykiran.com,cjno...@gmail.com,user@spark.apache.org,d...@spark.apache.org


Hi David,

the use case is that we're building a data processing system with an intuitive 
user interface where Spark is used as the data processing framework.
We would like to provide an HTML user interface to R where the user types or 
copy-pastes R code; the system should then send this R code (using ROSE) to 
R, process it and give the results back to the user. The RDD would be used so 
that the data can be further processed by the system, but we would also like to 
be able to show the messages printed to STDOUT and the images 
(plots) that are generated by R. The plots seem to be available in the OpenCPU 
API, see below

Inline image 1

So the case is not that we're trying to process millions of images but rather 
that we would like to show the user the plots (like a regression plot) that are 
generated in R. There could be several plots generated by the code, 
but certainly not thousands or even hundreds, only a few.

Hope that this would be possible using ROSE because it seems a really good fit,
thanks in advance,
Richard



On Wed, Jan 13, 2016 at 3:39 AM, David Russell  
wrote:

Hi Richard,


> Would it be possible to access the session API from within ROSE,
> to get for example the images that are generated by R / openCPU

Technically it would be possible although there would be some potentially 
significant runtime costs per task in doing so, primarily those related to 
extracting image data from the R session, serializing and then moving that data 
across the cluster for each and every image.

From a design perspective ROSE was intended to be used within Spark scale 
applications where R object data was seen as the primary task output. An output 
in a format that could be rapidly serialized and easily processed. Are there 
real world use cases where Spark scale applications capable of generating 10k, 
100k, or even millions of image files would actually need to capture and store 
images? If so, how practically speaking, would these images ever be used? I'm 
just not sure. Maybe you could describe your own use case to provide some 
insights?


> and the logging to stdout that is logged by R?

If you are referring to the R console output (generated within the R session 
during the execution of an OCPUTask) then this data could certainly 
(optionally) be captured and returned on an OCPUResult. Again, can you provide 
any details for how you might use this console output in a real world 
application?

As an aside, for simple standalone Spark applications that will only ever run 
on a single host (no cluster) you could consider using an alternative library 
called fluent-r. This library is also available under my GitHub repo, [see 
here](https://github.com/onetapbeyond/fluent-r). The fluent-r library already 
has support for the retrieval of R objects, R console output and R graphics 
device image/plots. However it is not as lightweight as ROSE and it is not 
designed to work in a clustered environment. ROSE on the other hand is designed 
for scale.


David

"All that is gold does not glitter, Not all those who wander are lost."




-------- Original Message --------
Subject: Re: ROSE: Spark + R on the JVM.
Local Time: January 12 2016 6:56 pm
UTC Time: January 12 2016 11:56 pm
From: rsiebel...@gmail.com
To: m...@vijaykiran.com
CC: 
cjno...@gmail.com,themarchoffo...@protonmail.com,user@spark.apache.org,d...@spark.apache.org



Hi,

this looks great and seems to be very usable.
Would it be possible to access the session API from within ROSE, to get for 
example the images that are generated by R / openCPU and the logging to stdout 
that is logged by R?

thanks in advance,
Richard



On Tue, Jan 12, 2016 at 10:16 PM, Vijay Kiran  wrote:

I think it would be this: https://github.com/onetapbeyond/opencpu-spark-executor


Error in Spark Executors when trying to read HBase table from Spark with Kerberos enabled

2016-01-13 Thread Vinay Kashyap
Hi all,

I am using Spark 1.5.1 in YARN cluster mode in CDH 5.5.
I am trying to create an RDD by reading HBase table with kerberos enabled.
I am able to launch the spark job to read the HBase table, but I notice
that the executors launched for the job cannot proceed due to an issue with
Kerberos and they are stuck indefinitely.

Below is my code to read a HBase table.


  Configuration configuration = HBaseConfiguration.create();
  configuration.set(TableInputFormat.INPUT_TABLE,
      frameStorage.getHbaseStorage().getTableId());
  String hbaseKerberosUser = "sparkUser";
  String hbaseKerberosKeytab = "";
  if (!hbaseKerberosUser.trim().isEmpty()
      && !hbaseKerberosKeytab.trim().isEmpty()) {
    configuration.set("hadoop.security.authentication", "kerberos");
    configuration.set("hbase.security.authentication", "kerberos");
    configuration.set("hbase.security.authorization", "true");
    configuration.set("hbase.rpc.protection", "authentication");
    configuration.set("hbase.master.kerberos.principal", "hbase/_HOST@CERT.LOCAL");
    configuration.set("hbase.regionserver.kerberos.principal", "hbase/_HOST@CERT.LOCAL");
    configuration.set("hbase.rest.kerberos.principal", "hbase/_HOST@CERT.LOCAL");
    configuration.set("hbase.thrift.kerberos.principal", "hbase/_HOST@CERT.LOCAL");
    configuration.set("hbase.master.keytab.file", hbaseKerberosKeytab);
    configuration.set("hbase.regionserver.keytab.file", hbaseKerberosKeytab);
    configuration.set("hbase.rest.authentication.kerberos.keytab", hbaseKerberosKeytab);
    configuration.set("hbase.thrift.keytab.file", hbaseKerberosKeytab);
    UserGroupInformation.setConfiguration(configuration);
    if (UserGroupInformation.isSecurityEnabled()) {
      UserGroupInformation ugi = UserGroupInformation
          .loginUserFromKeytabAndReturnUGI(hbaseKerberosUser, hbaseKerberosKeytab);
      TokenUtil.obtainAndCacheToken(configuration, ugi);
    }
  }

  System.out.println("loading HBase Table RDD ...");
  JavaPairRDD<ImmutableBytesWritable, Result> hbaseTableRDD =
      this.sparkContext.newAPIHadoopRDD(
          configuration, TableInputFormat.class,
          ImmutableBytesWritable.class, Result.class);
  JavaRDD tableRDD = getTableRDD(hbaseTableRDD, dataFrameModel);
  System.out.println("Count :: " + tableRDD.count());

Following is the error which I can see in the container logs:

16/01/13 10:01:42 WARN security.UserGroupInformation: PriviledgedActionException as:sparkUser (auth:SIMPLE) cause:javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)]
16/01/13 10:01:42 WARN ipc.RpcClient: Exception encountered while connecting to the server : javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)]
16/01/13 10:01:42 ERROR ipc.RpcClient: SASL authentication failed. The most likely cause is missing or invalid credentials. Consider 'kinit'.
javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)]
    at com.sun.security.sasl.gsskerb.GssKrb5Client.evaluateChallenge(GssKrb5Client.java:212)
    at org.apache.hadoop.hbase.security.HBaseSaslRpcClient.saslConnect(HBaseSaslRpcClient.java:179)
    at org.apache.hadoop.hbase.ipc.RpcClient$Connection.setupSaslConnection(RpcClient.java:770)
    at org.apache.hadoop.hbase.ipc.RpcClient$Connection.access$600(RpcClient.java:357)
    at org.apache.hadoop.hbase.ipc.RpcClient$Connection$2.run(RpcClient.java:891)
    at org.apache.hadoop.hbase.ipc.RpcClient$Connection$2.run(RpcClient.java:888)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)

I have a valid Kerberos ticket, as can be seen below:

sparkUser@infra:/ebs1/agent$ klist
Ticket cache: FILE:/tmp/krb5cc_1001
Default principal: sparkUser@CERT.LOCAL

Valid starting    Expires           Service principal
13/01/2016 12:07  14/01/2016 12:07  krbtgt/CERT.LOCAL@CERT.LOCAL

Also, I confirmed that only reading from HBase gives this problem: I can
read a simple file in HDFS and create the RDD as required.
After some searching online, I found a JIRA ticket that looks similar to
what I am experiencing:
https://issues.apache.org/jira/browse/SPARK-12279

Is that the same issue I am facing? And is there any workaround so that I
can proceed with my requirement of reading from the HBase table?

-- 
*Thanks and regards*
*Vinay Kashyap*


Error connecting to temporary derby metastore used by Spark, when running multiple jobs on the same SparkContext

2016-01-13 Thread Deenar Toraskar
Hi

I am using the spark-jobserver and see the following messages when a lot of
jobs are submitted simultaneously to the same SparkContext. Any ideas as to
what might cause this?

[2016-01-13 13:12:11,753] ERROR com.jolbox.bonecp.BoneCP []
[akka://JobServer/user/context-supervisor/ingest-context] - Failed to
acquire connection to
jdbc:derby:;databaseName=/tmp/spark-0b6014b0-96b8-419e-9f9f-10fa9392d9f4/metastore;create=true.
Sleeping for 7000 ms. Attempts left: 1
java.sql.SQLException: No suitable driver found for
jdbc:derby:;databaseName=/tmp/spark-0b6014b0-96b8-419e-9f9f-10fa9392d9f4/metastore;create=true

Regards
Deenar


Re: How to optimiz and make this code faster using coalesce(1) and mapPartitionIndex

2016-01-13 Thread unk1102
Hi, thanks for the reply. Actually I can't share details, as the work is
classified and pretty complex to understand. It is not a general problem: I am
trying to solve something related to the execution order of dynamic SQL in a
database. I need to use Spark because my other jobs, which don't use coalesce,
use Spark. My source data is Hive ORC table partitions, and with Spark it is
easy to load ORC files into a DataFrame. Initially I have 24 ORC files/splits
and hence 24 partitions, but when I call
sourceFrame.toJavaRDD().coalesce(1,true) it gets stuck: it hangs for hours and
does nothing. I am sure it is not even hitting the 2GB limit, as the data set
is small, so I don't understand why it just hangs there. I have seen the same
code run fine over the weekend, when the dataset is smaller than usual.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-optimiz-and-make-this-code-faster-using-coalesce-1-and-mapPartitionIndex-tp25947p25966.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Optimized way to multiply two large matrices and save output using Spark and Scala

2016-01-13 Thread Devi P.V
I want to multiply two large matrices (from CSV files) using Spark and Scala
and save the output. I use the following code:

  val rows = file1.coalesce(1, false).map(x => {
    val line = x.split(delimiter).map(_.toDouble)
    Vectors.sparse(line.length,
      line.zipWithIndex.map(e => (e._2, e._1)).filter(_._2 != 0.0))
  })

  val rmat = new RowMatrix(rows)

  val dm = file2.coalesce(1, false).map(x => {
    val line = x.split(delimiter).map(_.toDouble)
    Vectors.dense(line)
  })

  val ma = dm.map(_.toArray).take(dm.count.toInt)
  val localMat = Matrices.dense(dm.count.toInt,
    dm.take(1)(0).size,
    transpose(ma).flatten)

  // Multiply two matrices
  val s = rmat.multiply(localMat).rows

  s.map(x => x.toArray.mkString(delimiter)).saveAsTextFile(OutputPath)
}

def transpose(m: Array[Array[Double]]): Array[Array[Double]] = {
  (for {
    c <- m(0).indices
  } yield m.map(_(c))).toArray
}

Saving the file takes a long time and the output file is very large.
What is the optimized way to multiply two large files and save the
output to a text file?


Re: yarn-client: SparkSubmitDriverBootstrapper not found in yarn client mode (1.6.0)

2016-01-13 Thread Kevin Mellott
Lin - if you add "--verbose" to your original *spark-submit* command, it
will let you know the location in which Spark is running. As Marcelo
pointed out, this will likely indicate version 1.3, which may help you
confirm if this is your problem.

On Wed, Jan 13, 2016 at 12:06 PM, Marcelo Vanzin 
wrote:

> SparkSubmitDriverBootstrapper was removed back in Spark 1.4, so it
> seems you have a mixbag of 1.3 / 1.6 in your path / classpath and
> things are failing because of that.
>
> On Wed, Jan 13, 2016 at 9:31 AM, Lin Zhao  wrote:
> > My job runs fine in yarn cluster mode but I have reason to use client
> mode
> > instead. But I'm hitting this error when submitting:
> >> spark-submit --class com.exabeam.martini.scripts.SparkStreamingTest
> >> --master yarn --deploy-mode client --executor-memory 90G
> --num-executors 3
> >> --executor-cores 14 Martini-assembly-0.1.jar yarn-client
> >
> > Error: Could not find or load main class
> > org.apache.spark.deploy.SparkSubmitDriverBootstrapper
> >
> >
> >  If I replace deploy-mode to cluster the job is submitted successfully.
> Is
> > there a dependency missing from my project? Right now only one I
> included is
> > spark-streaming 1.6.0.
>
>
>
> --
> Marcelo
>
>
>


Re: Kafka Streaming and partitioning

2016-01-13 Thread Cody Koeninger
The idea here is that the custom partitioner shouldn't actually get used
for repartitioning the kafka stream (because that would involve a shuffle,
which is what you're trying to avoid).  You're just assigning a partitioner
because you know how it already is partitioned.
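
A rough sketch of the wrapper idea discussed in this thread, for illustration
only. The class name AssumePartitionedRDD is hypothetical (it is not part of
Spark), and the sketch assumes that partition i of the wrapped RDD already
holds exactly the keys that the given partitioner maps to i. If that
assumption is wrong, a join will silently return incorrect results.

```scala
import scala.reflect.ClassTag

import org.apache.spark.{Partition, Partitioner, TaskContext}
import org.apache.spark.rdd.RDD

// Hypothetical wrapper (not part of Spark): it only *declares* a partitioner.
// No data is moved, so it is correct only if every record already sits in the
// partition that `part.getPartition(key)` would choose.
class AssumePartitionedRDD[K: ClassTag, V: ClassTag](
    prev: RDD[(K, V)],
    part: Partitioner)
  extends RDD[(K, V)](prev) {

  require(prev.partitions.length == part.numPartitions,
    "wrapped RDD and partitioner must agree on the number of partitions")

  // Advertise the partitioner so that a join against an RDD with an equal
  // partitioner can be planned as a narrow dependency.
  override val partitioner: Option[Partitioner] = Some(part)

  // Reuse the parent's partitions and records unchanged.
  override protected def getPartitions: Array[Partition] =
    firstParent[(K, V)].partitions

  override def compute(split: Partition, context: TaskContext): Iterator[(K, V)] =
    firstParent[(K, V)].iterator(split, context)
}
```

Spark compares partitioners with equals, so a custom partitioner would need a
meaningful equals/hashCode for the two sides of the join to be recognised as
co-partitioned.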


On Wed, Jan 13, 2016 at 11:22 AM, Dave  wrote:

> So for case 1 below
> - subclass or modify the direct stream and kafkardd.  They're private, so
> you'd need to rebuild just the external kafka project, not all of spark
>
> When the data is read from Kafka it will be partitioned correctly with the
> custom partitioner passed in to the new direct stream and Kafka RDD
> implementations.
>
> For case 2
> - write a wrapper subclass of rdd that takes a given custom partitioner
> and rdd in the constructor, overrides partitioner, and delegates every
> other method to the wrapped rdd.  This should be possible without
> modification to any existing spark code.  You'd use it something like
>
> Am I correct in saying that the data from Kafka will not be read into
> memory in the cluster (the Kafka server is not located on the Spark cluster
> in my use case) until the following code is executed?
>
> stream.transform { rdd =>
>   val wrapped = YourWrapper(cp, rdd)
>   wrapped.join(reference)
> }
>
> In which case it will run through the partitioner of the wrapped RDD when
> it arrives in the cluster for the first time, i.e. no shuffle.
>
> Thanks,
> Dave.
>
>
>
> On 13/01/16 17:00, Cody Koeninger wrote:
>
> In the case here of a kafkaRDD, the data doesn't reside on the cluster,
> it's not cached by default.  If you're running kafka on the same nodes as
> spark, then data locality would play a factor, but that should be handled
> by the existing getPreferredLocations method.
>
> On Wed, Jan 13, 2016 at 10:46 AM, Dave  wrote:
>
>> Thanks Cody, appreciate the response.
>>
>> With this pattern the partitioners will now match when the join is
>> executed.
>> However, does the wrapper RDD not need to set the partition metadata on
>> the wrapped RDD in order to allow Spark to know where the data for each
>> partition resides in the cluster?
>>
>> Thanks,
>> Dave.
>>
>>
>> On 13/01/16 16:21, Cody Koeninger wrote:
>>
>> If two rdds have an identical partitioner, joining should not involve a
>> shuffle.
>>
>> You should be able to override the partitioner without calling
>> partitionBy.
>>
>> Two ways I can think of to do this:
>> - subclass or modify the direct stream and kafkardd.  They're private, so
>> you'd need to rebuild just the external kafka project, not all of spark
>>
>> - write a wrapper subclass of rdd that takes a given custom partitioner
>> and rdd in the constructor, overrides partitioner, and delegates every
>> other method to the wrapped rdd.  This should be possible without
>> modification to any existing spark code.  You'd use it something like
>>
>> val cp = YourCustomPartitioner(...)
>> val reference = YourReferenceRDD(cp, ...)
>> val stream = KafkaUtils
>>
>> stream.transform { rdd =>
>>   val wrapped = YourWrapper(cp, rdd)
>>   wrapped.join(reference)
>> }
>>
>>
>> I haven't had reason to do either one of those approaches, so YMMV, but I
>> believe others have
>>
>>
>>
>>
>> On Wed, Jan 13, 2016 at 3:40 AM, ddav < 
>> dave.davo...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I have the following use case:
>>>
>>> 1. Reference data stored in an RDD that is persisted and partitioned
>>> using a
>>> simple custom partitioner.
>>> 2. Input stream from kafka that uses the same partitioner algorithm as
>>> the
>>> ref data RDD - this partitioning is done in kafka.
>>>
>>> I am using kafka direct streams so the number of kafka partitions map to
>>> the
>>> number of partitions in the spark RDD. From testing and the
>>> documentation I
>>> see Spark does not know anything about how the data has been partitioned
>>> in
>>> kafka.
>>>
>>> In my use case I need to join the reference data RDD and the input stream
>>> RDD.  Due to the fact I have manually ensured the incoming data from
>>> kafka
>>> uses the same partitioning algorithm I know the data has been grouped
>>> correctly in the input stream RDD in Spark but I cannot do a join
>>> without a
>>> shuffle step due to the fact Spark has no knowledge of how the data has
>>> been
>>> partitioned.
>>>
>>> I have two ways to do this.
>>> 1. Explicitly call partitionBy(CustomPartitioner) on the input stream RDD
>>> followed by a join. This results in a shuffle of the input stream RDD,
>>> after which the co-partitioned join takes place.
>>> 2. Call join on the reference data RDD passing in the input stream RDD.
>>> Spark will do a shuffle under the hood in this case and the join will
>>> take
>>> place. The join will do its best to run on a node that has local access
>>> to
>>> the reference data RDD.
>>>
>>> Is there any difference between the 2 methods above or will both cause
>>> the
>>> same sequence of events to take place in Spark?

Re: automatically unpersist RDDs which are not used for 24 hours?

2016-01-13 Thread Andrew Or
Hi Alex,

Yes, you can set `spark.cleaner.ttl`:
http://spark.apache.org/docs/1.6.0/configuration.html, but I would not
recommend it!

We are actually removing this property in Spark 2.0 because it has caused
problems for many users in the past. In particular, if you accidentally use
a variable that has been automatically cleaned, then you will run into
problems like shuffle fetch failures or broadcast variable not found etc,
which may fail your job.

Alternatively, Spark already automatically cleans up all variables that
have been garbage collected, including RDDs, shuffle dependencies,
broadcast variables and accumulators. This context-based cleaning has been
enabled by default for many versions by now so it should be reliable. The
only caveat is that it may not work super well in a shell environment,
where some variables may never exit the scope.

Please let me know if you have more questions,
-Andrew
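
As a complement to the automatic cleaning described above, cached data can
also be released explicitly once it is no longer needed. The following is a
minimal sketch of that idea and is not something the reply above prescribes.
The object and method names (CacheLifecycle, withCached) are hypothetical;
persist, unpersist and StorageLevel are the standard RDD API.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

object CacheLifecycle {
  // Caches `data`, runs `body` against it, and always releases the cached
  // blocks afterwards instead of waiting for a TTL or for garbage collection.
  // Assumes `data` has not already been persisted with a different level.
  def withCached[T, R](data: RDD[T])(body: RDD[T] => R): R = {
    val cached = data.persist(StorageLevel.MEMORY_ONLY)
    try body(cached)
    finally cached.unpersist(blocking = true)
  }
}
```

Usage would look something like
CacheLifecycle.withCached(someRdd) { rdd => rdd.count() }, where someRdd is
whatever RDD the job needs to reuse.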


2016-01-13 11:36 GMT-08:00 Alexander Pivovarov :

> Is it possible to automatically unpersist RDDs which are not used for 24
> hours?
>


Re: SQL UDF problem (with re to types)

2016-01-13 Thread Ted Yu
Looks like BigDecimal was passed to your call() method.

Can you modify your udf to see if using BigDecimal works ?

Cheers
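
To illustrate the failure and the suggested change outside of Spark, here is a
small plain-Scala sketch. It only simulates the situation: it assumes the
argument arrives as a java.math.BigDecimal, as the reported ClassCastException
indicates, and the object and method names are made up for the example.

```scala
import java.math.BigDecimal

object UdfArgumentCast {
  // Mirrors the original call(Double x): the runtime cast fails when the
  // value handed over is a java.math.BigDecimal, as in the reported error.
  def callAsDouble(x: Any): String = {
    val d = x.asInstanceOf[java.lang.Double]
    s"testing ${d.doubleValue}"
  }

  // Variant along the lines suggested above: accept BigDecimal and convert
  // to a double only where one is actually needed.
  def callAsDecimal(x: Any): String = {
    val d = x.asInstanceOf[BigDecimal]
    s"testing ${d.doubleValue}"
  }
}
```

In the real UDF this would correspond to declaring the input type of the
registered function as BigDecimal rather than Double.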

On Wed, Jan 13, 2016 at 11:58 AM, raghukiran  wrote:

> While registering and using SQL UDFs, I am running into the following
> problem:
>
> UDF registered:
>
> ctx.udf().register("Test", new UDF1<Double, String>() {
>     private static final long serialVersionUID = -8231917155671435931L;
>
>     public String call(Double x) throws Exception {
>         return "testing";
>     }
> }, DataTypes.StringType);
>
> Usage:
> query = "SELECT Test(82.4)";
> result = sqlCtx.sql(query).first();
> System.out.println(result.toString());
>
> Problem: Class Cast exception thrown
> Caused by: java.lang.ClassCastException: java.math.BigDecimal cannot be
> cast
> to java.lang.Double
>
> This problem occurs with Spark v1.5.2 and 1.6.0.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/SQL-UDF-problem-with-re-to-types-tp25968.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
>
>


Re: Read Accumulator value while running

2016-01-13 Thread Andrew Or
Hi Kira,

As you suspected, accumulator values are only updated after the task
completes. We do send accumulator updates from the executors to the driver
on periodic heartbeats, but these only concern internal accumulators, not
the ones created by the user.

In short, I'm afraid there is not currently a way (in Spark 1.6 and before)
to access the accumulator values until after the tasks that updated them
have completed. This will change in Spark 2.0, the next version, however.

Please let me know if you have more questions.
-Andrew

2016-01-13 11:24 GMT-08:00 Daniel Imberman :

> Hi Kira,
>
> I'm having some trouble understanding your question. Could you please give
> a code example?
>
>
>
> From what I think you're asking there are two issues with what you're
> looking to do. (Please keep in mind I could be totally wrong on both of
> these assumptions, but this is what I've been led to believe)
>
> 1. The contract of an accumulator is that you can't actually read the
> value as the function is performing because the values in the accumulator
> don't actually mean anything until they are reduced. If you were looking
> for progress in a local context, you could do mapPartitions and have a
> local accumulator per partition, but I don't think it's possible to get the
> actual accumulator value in the middle of the map job.
>
> 2. As far as performing ac2 while ac1 is "always running", I'm pretty sure
> that's not possible. The way that lazy evaluation works in Spark, the
> transformations have to be done serially. Having it any other way would
> actually be really bad because then you could have ac1 changing the data
> thereby making ac2's output unpredictable.
>
> That being said, with a more specific example it might be possible to help
> figure out a solution that accomplishes what you are trying to do.
>
> On Wed, Jan 13, 2016 at 5:43 AM Kira  wrote:
>
>> Hi,
>>
>> So I have an action on one RDD that is relatively long, let's call it ac1;
>> what I want to do is execute another action (ac2) on the same RDD to see
>> the evolution of the first one (ac1). To this end I want to use an
>> accumulator and read its value progressively to see the changes to it (on
>> the fly) while ac1 is always running. My problem is that the accumulator is
>> only updated once ac1 has finished, which is not helpful for me :/ .
>>
>> I've seen here
>> <
>> http://apache-spark-user-list.1001560.n3.nabble.com/Asynchronous-Broadcast-from-driver-to-workers-is-it-possible-td15758.html
>> >
>> what may seem like a solution for me, but it doesn't work: "While Spark
>> already offers support for asynchronous reduce (collect data from workers,
>> while not interrupting execution of a parallel transformation) through
>> accumulator"
>>
>> Another post suggested using SparkListener to do that.
>>
>> Are these solutions correct? If yes, could you give me a simple example?
>> Are there other solutions?
>>
>> thank you.
>> Regards
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Read-Accumulator-value-while-running-tp25960.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>>
>>


Re: Optimized way to multiply two large matrices and save output using Spark and Scala

2016-01-13 Thread Burak Yavuz
BlockMatrix.multiply is the suggested method of multiplying two large
matrices. Is there a reason that you didn't use BlockMatrices?

You can load the matrices and convert to and from RowMatrix. If it's in
sparse format (i, j, v), then you can also use the CoordinateMatrix to
load, BlockMatrix to multiply, and CoordinateMatrix to save it back again.

Thanks,
Burak
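
A minimal sketch of the CoordinateMatrix / BlockMatrix route described above.
It assumes each input line has the form "i,j,v" with zero-based indices; the
object name, method names and the default block size are illustrative choices,
not tuned recommendations.

```scala
import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry}
import org.apache.spark.rdd.RDD

object BlockMultiply {
  // Parses lines of the (assumed) form "i<delimiter>j<delimiter>v".
  // Dimensions are inferred from the largest indices seen; if trailing rows
  // or columns can be entirely zero, pass explicit sizes to the
  // CoordinateMatrix constructor instead so that the inner dimensions match.
  def loadCoordinate(lines: RDD[String], delimiter: String): CoordinateMatrix = {
    val entries = lines.map { line =>
      val Array(i, j, v) = line.split(delimiter)
      MatrixEntry(i.trim.toLong, j.trim.toLong, v.trim.toDouble)
    }
    new CoordinateMatrix(entries)
  }

  // Distributed multiply: both sides use the same square block size, which
  // satisfies BlockMatrix.multiply's requirement on matching block shapes.
  def multiply(a: CoordinateMatrix, b: CoordinateMatrix,
               blockSize: Int = 1024): CoordinateMatrix = {
    val left = a.toBlockMatrix(blockSize, blockSize).cache()
    val right = b.toBlockMatrix(blockSize, blockSize).cache()
    left.multiply(right).toCoordinateMatrix()
  }

  // Writes only the non-zero entries, one "i<delimiter>j<delimiter>v" per line.
  def save(m: CoordinateMatrix, delimiter: String, path: String): Unit =
    m.entries
      .map(e => Seq(e.i, e.j, e.value).mkString(delimiter))
      .saveAsTextFile(path)
}
```

Keeping the output in (i, j, v) form means zeros are never written, which may
help with the output size when the product is sparse.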

On Wed, Jan 13, 2016 at 8:16 PM, Devi P.V  wrote:

> I want to multiply two large matrices (from csv files)using Spark and
> Scala and save output.I use the following code
>
>   val rows=file1.coalesce(1,false).map(x=>{
>   val line=x.split(delimiter).map(_.toDouble)
>   Vectors.sparse(line.length,
> line.zipWithIndex.map(e => (e._2, e._1)).filter(_._2 != 0.0))
>
> })
>
> val rmat = new RowMatrix(rows)
>
> val dm=file2.coalesce(1,false).map(x=>{
>   val line=x.split(delimiter).map(_.toDouble)
>   Vectors.dense(line)
> })
>
> val ma = dm.map(_.toArray).take(dm.count.toInt)
> val localMat = Matrices.dense( dm.count.toInt,
>   dm.take(1)(0).size,
>
>   transpose(ma).flatten)
>
> // Multiply two matrices
> val s=rmat.multiply(localMat).rows
>
> s.map(x=>x.toArray.mkString(delimiter)).saveAsTextFile(OutputPath)
>
>   }
>
>   def transpose(m: Array[Array[Double]]): Array[Array[Double]] = {
> (for {
>   c <- m(0).indices
> } yield m.map(_(c)) ).toArray
>   }
>
> When I save file it takes more time and output file has very large in
> size.what is the optimized way to multiply two large files and save the
> output to a text file ?
>


Sending large objects to specific RDDs

2016-01-13 Thread Daniel Imberman
I'm looking for a way to send structures to pre-determined partitions so that
they can be used by another RDD in a mapPartitions.

Essentially I'm given an RDD of SparseVectors and an RDD of inverted
indexes. The inverted index objects are quite large.

My hope is to do a mapPartitions within the RDD of vectors where I can
compare each vector to the inverted index. The issue is that I only NEED one
inverted index object per partition (which would have the same key as the
values within that partition).


val vectors: RDD[(Int, SparseVector)]

val invertedIndexes: RDD[(Int, InvIndex)] =
  a.reduceByKey(generateInvertedIndex)

vectors.mapPartitions { iter =>
  // pseudocode: look up the one index that shares this partition's key
  val invIndex = invertedIndexes(samePartitionKey)
  iter.map(invIndex.calculateSimilarity(_))
}

How could I go about setting up the Partition such that the specific data
structure I need will be present for the mapPartition but I won't have the
extra overhead of sending over all values (which would happen if I were to
make a broadcast variable)?

One thought I have been having is to store the objects in HDFS but I'm not
sure if that would be a suboptimal solution (It seems like it could slow
down the process a lot)

Another thought I am currently exploring is whether there is some way I can
create a custom Partition or Partitioner that could hold the data structure
(Although that might get too complicated and become problematic)

Any thoughts on how I could attack this issue would be highly appreciated.

Thank you for your help!



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Sending-large-objects-to-specific-RDDs-tp25967.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Sending large objects to specific RDDs

2016-01-13 Thread Ted Yu
Another approach is to store the objects in a NoSQL store such as HBase.

Looking up an object should be very fast.

Cheers

On Wed, Jan 13, 2016 at 11:29 AM, Daniel Imberman  wrote:

> I'm looking for a way to send structures to pre-determined partitions so
> that
> they can be used by another RDD in a mapPartition.
>
> Essentially I'm given and RDD of SparseVectors and an RDD of inverted
> indexes. The inverted index objects are quite large.
>
> My hope is to do a MapPartitions within the RDD of vectors where I can
> compare each vector to the inverted index. The issue is that I only NEED
> one
> inverted index object per partition (which would have the same key as the
> values within that partition).
>
>
> val vectors:RDD[(Int, SparseVector)]
>
> val invertedIndexes:RDD[(Int, InvIndex)] =
> a.reduceByKey(generateInvertedIndex)
> vectors:RDD.mapPartitions{
> iter =>
>  val invIndex = invertedIndexes(samePartitionKey)
>  iter.map(invIndex.calculateSimilarity(_))
>  )
> }
>
> How could I go about setting up the Partition such that the specific data
> structure I need will be present for the mapPartition but I won't have the
> extra overhead of sending over all values (which would happen if I were to
> make a broadcast variable).
>
> One thought I have been having is to store the objects in HDFS but I'm not
> sure if that would be a suboptimal solution (It seems like it could slow
> down the process a lot)
>
> Another thought I am currently exploring is whether there is some way I can
> create a custom Partition or Partitioner that could hold the data structure
> (Although that might get too complicated and become problematic)
>
> Any thoughts on how I could attack this issue would be highly appreciated.
>
> thank you for your help!
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Sending-large-objects-to-specific-RDDs-tp25967.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
>
>


automatically unpersist RDDs which are not used for 24 hours?

2016-01-13 Thread Alexander Pivovarov
Is it possible to automatically unpersist RDDs which are not used for 24
hours?


Re: Sending large objects to specific RDDs

2016-01-13 Thread Daniel Imberman
Thank you Ted! That sounds like it would probably be the most efficient
(with the least overhead) way of handling this situation.

On Wed, Jan 13, 2016 at 11:36 AM Ted Yu  wrote:

> Another approach is to store the objects in NoSQL store such as HBase.
>
> Looking up object should be very fast.
>
> Cheers
>
> On Wed, Jan 13, 2016 at 11:29 AM, Daniel Imberman <
> daniel.imber...@gmail.com> wrote:
>
>> I'm looking for a way to send structures to pre-determined partitions so
>> that
>> they can be used by another RDD in a mapPartition.
>>
>> Essentially I'm given and RDD of SparseVectors and an RDD of inverted
>> indexes. The inverted index objects are quite large.
>>
>> My hope is to do a MapPartitions within the RDD of vectors where I can
>> compare each vector to the inverted index. The issue is that I only NEED
>> one
>> inverted index object per partition (which would have the same key as the
>> values within that partition).
>>
>>
>> val vectors:RDD[(Int, SparseVector)]
>>
>> val invertedIndexes:RDD[(Int, InvIndex)] =
>> a.reduceByKey(generateInvertedIndex)
>> vectors:RDD.mapPartitions{
>> iter =>
>>  val invIndex = invertedIndexes(samePartitionKey)
>>  iter.map(invIndex.calculateSimilarity(_))
>>  )
>> }
>>
>> How could I go about setting up the Partition such that the specific data
>> structure I need will be present for the mapPartition but I won't have the
>> extra overhead of sending over all values (which would happen if I were to
>> make a broadcast variable).
>>
>> One thought I have been having is to store the objects in HDFS but I'm not
>> sure if that would be a suboptimal solution (It seems like it could slow
>> down the process a lot)
>>
>> Another thought I am currently exploring is whether there is some way I
>> can
>> create a custom Partition or Partitioner that could hold the data
>> structure
>> (Although that might get too complicated and become problematic)
>>
>> Any thoughts on how I could attack this issue would be highly appreciated.
>>
>> thank you for your help!
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Sending-large-objects-to-specific-RDDs-tp25967.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>>
>>
>


Re: How to make Dataset api as fast as DataFrame

2016-01-13 Thread Michael Armbrust
The focus of this release was to get the API out there, and there are a lot
of low-hanging performance optimizations.  That said, there is likely always
going to be some cost of materializing objects.

Another note: any time you're comparing performance, it's useful to include
the output of explain so we can sanity check the chosen query plan.
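
For reference, a small sketch of how the plan could be captured for posting.
The helper names are hypothetical; df.explain(true) prints the plans to
stdout, while queryExecution (a developer-level API) exposes broadly the same
information as a string. The broadcast check is only a rough string match on
the physical plan.

```scala
import org.apache.spark.sql.DataFrame

object PlanReport {
  // Parsed, analyzed, optimized and physical plans as one string, which is
  // broadly what df.explain(true) prints.
  def planText(df: DataFrame): String = df.queryExecution.toString

  // Rough heuristic: true if the physical plan mentions a broadcast operator.
  def usesBroadcastJoin(df: DataFrame): Boolean =
    df.queryExecution.executedPlan.toString.contains("Broadcast")
}
```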

On Wed, Jan 13, 2016 at 6:39 AM, Arkadiusz Bicz 
wrote:

> Hi,
>
> I have done some performance tests by repeating execution with
> different numbers of executors and memory for YARN-clustered Spark
> (version 1.6.0) (the cluster contains 6 large nodes).
>
> I found Dataset joinWith or cogroup to be 3 to 5 times slower than a
> broadcast join in DataFrame. How can I make it at least similarly fast?
>
> Examples of my code :
>
> DataFrame:
> // 500 million rows
> val r = results.select("tradeId", "tradeVersion", "values").as("r")
> // 100 thousand rows
> val t = trades.select("tradeId", "tradeVersion", "agreement").distinct.as("t")
>
> val j = r.join(broadcast(t), r("tradeId") === t("tradeId") &&
>   r("tradeVersion") === t("tradeVersion"))
> val s = j.select(r("tradeId"), t("tradeVersion"), t("agreement"), r("values"))
> val g = s.groupBy(t("agreement"))
>
> val maxvec = new MaxVectorAggFunction
> val agg = g.agg(maxvec(r("values")).as("maxvalues"))
> agg.write.parquet("hdfs:.../tmp/somelocation")
>
> DataSet:
>
> case class ResultsA(tradeId: String, tradeVersion: String, resultType: Int,
>   values: Array[Double])
>
> case class TradesA(tradeId: String, tradeVersion: String, tradeType: String,
>   notional: BigDecimal, currency: String, asset: String, trader: String,
>   productCode: String, counterParty: String, counterPartyAccronym: String,
>   tradeStatus: String, portfolio: String, internalPortfolio: String,
>   ptsBook: String, validFrom: String, validTill: String, tradeDate: String,
>   maturity: String, buySellIndicator: String, agreement: String)
>
> case class ResultSmallA(tradeId: String, tradeVersion: String,
>   values: Array[Double])
> case class ResultAgreementA(tradeId: String, tradeVersion: String,
>   agreement: String, values: Array[Double])
> case class TradeSmallA(tradeId: String, tradeVersion: String, agreement: String)
>
> lazy val dsresults = results.as[ResultsA]
>   .map(r => ResultSmallA(r.tradeId, r.tradeVersion, r.values)).as("r")
> lazy val dstrades = trades.as[TradesA]
>   .map(t => TradeSmallA(t.tradeId, t.tradeVersion, t.agreement)).distinct.as("t")
> lazy val j = dsresults.joinWith(dstrades,
>   $"r.tradeId" === $"t.tradeId" && $"r.tradeVersion" === $"t.tradeVersion", "inner")
>
> //1. MapGrouped
>
> val group = j.groupBy { v => v match {
>     case (r: ResultSmallA, t: TradeSmallA) => t
>   }
> }
>
> val reduced = group.mapGroups { case (t, iter) =>
>   (t.tradeId, t.tradeVersion, t.agreement,
>     iter.map { case (r, t) => r.values }.reduce((l, r) => {
>       val min = new MinVectorAggFunction(); min.mergeArrays(l, r)
>     }))
> }
>
> //2. Reduce
>
> val group2 = j.groupBy(_._2)
>
> val reduced2 = group2.reduce((i1, i2) => {
>   val r1 = i1._1
>   val r2 = i2._1
>   import r1._
>   val min = new MinVectorAggFunction();
>   (ResultSmallA(tradeId, tradeVersion, min.mergeArrays(values, r2.values)), i1._2)
> })
>
> val reduced = reduced2.map { case (t, (r, _)) =>
>   (r.tradeId, r.tradeVersion, t.agreement, r.values)
> }
>
>
> //3. Cogroup
>
> val cogrouped1 = dsresults.groupBy(r => (r.tradeId, r.tradeVersion))
>   .cogroup(dstrades.groupBy(t => (t.tradeId, t.tradeVersion))) {
>     case (key, data1, data2) =>
>       if (data2.isEmpty || data1.isEmpty) Iterator()
>       else {
>         val t = data2.next()
>         val min = new MinVectorAggFunction()
>         Iterator((t.tradeId, t.tradeVersion, t.agreement,
>           data1.map(_.values).reduce(min.mergeArrays)))
>       }
>   }
>
> // MinVectorAggFunction just merges two arrays of Double
>
>
>


Re: Read Accumulator value while running

2016-01-13 Thread Daniel Imberman
Hi Kira,

I'm having some trouble understanding your question. Could you please give
a code example?



From what I think you're asking there are two issues with what you're
looking to do. (Please keep in mind I could be totally wrong on both of
these assumptions, but this is what I've been led to believe)

1. The contract of an accumulator is that you can't actually read the value
as the function is performing because the values in the accumulator don't
actually mean anything until they are reduced. If you were looking for
progress in a local context, you could do mapPartitions and have a local
accumulator per partition, but I don't think it's possible to get the
actual accumulator value in the middle of the map job.

2. As far as performing ac2 while ac1 is "always running", I'm pretty sure
that's not possible. The way that lazy evaluation works in Spark, the
transformations have to be done serially. Having it any other way would
actually be really bad, because then you could have ac1 changing the data,
thereby making ac2's output unpredictable.

That being said, with a more specific example it might be possible to help
figure out a solution that accomplishes what you are trying to do.
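
The question quoted below mentions SparkListener. A minimal sketch of that idea, assuming Spark 1.x on the classpath and a local master: the driver counts task-completion events while a long action runs. The object name, the sample job and the printed message are illustrative. This reports task-level progress only and does not read an accumulator in the middle of a task.

```scala
import java.util.concurrent.atomic.AtomicInteger

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

object TaskProgress {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("task-progress").setMaster("local[2]")
    val sc = new SparkContext(conf)

    // Listener callbacks run on Spark's listener thread, hence the atomic counter.
    val finished = new AtomicInteger(0)
    sc.addSparkListener(new SparkListener {
      override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit =
        println(s"tasks finished so far: ${finished.incrementAndGet()}")
    })

    // Stand-in for the long-running action "ac1": 20 partitions, hence 20 tasks.
    val total = sc.parallelize(1 to 1000000, 20).map(_.toLong).reduce(_ + _)
    println(s"sum = $total")

    sc.stop()
  }
}
```

The granularity is one update per finished task, so a job with few, long tasks will report progress only rarely.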

On Wed, Jan 13, 2016 at 5:43 AM Kira  wrote:

> Hi,
>
> I have an action on an RDD that takes a relatively long time; let's call it
> ac1. I want to execute another action (ac2) on the same RDD to follow the
> progress of the first one. To that end I want to use an accumulator and
> read its value progressively to see the changes (on the fly) while ac1 is
> still running. My problem is that the accumulator is only updated once ac1
> has finished, which is not helpful for me :/
>
> I've seen here
> <
> http://apache-spark-user-list.1001560.n3.nabble.com/Asynchronous-Broadcast-from-driver-to-workers-is-it-possible-td15758.html
> >
> what may seem like a solution for me, but it doesn't work: "While Spark
> already offers support for asynchronous reduce (collect data from workers,
> while not interrupting execution of a parallel transformation) through
> accumulator"
>
> Another post suggested using SparkListener to do that.
>
> Are these solutions correct? If yes, could you give me a simple example?
> Are there other solutions?
>
> thank you.
> Regards
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Read-Accumulator-value-while-running-tp25960.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>


Re: Best practice for retrieving over 1 million files from S3

2016-01-13 Thread Steve Loughran

Use s3a://, especially on Hadoop 2.7+. It uses the Amazon libraries and is faster
for directory lookups than jets3t.

> On 13 Jan 2016, at 11:42, Darin McBeath  wrote:
> 
> I'm looking for some suggestions based on others' experiences.
> 
> I currently have a job that I need to run periodically where I need to read 
> on the order of 1+ million files from an S3 bucket.  It is not the entire 
> bucket (nor does it match a pattern).  Instead, I have a list of random keys 
> that are 'names' for the files in this S3 bucket.  The bucket itself will 
> contain upwards of 60M or more files.
> 
> My current approach has been to get my list of keys, partition on the key, 
> and then map this to an underlying class that uses the most recent AWS SDK to 
> retrieve the file from S3 using this key, which then returns the file.  So, 
> in the end, I have an RDD.  This works, but I really wonder if this 
> is the best way.  I suspect there might be a better/faster way.
> 
> One thing I've been considering is passing all of the keys (using s3n: urls) 
> to sc.textFile or sc.wholeTextFiles (since some of my files can have embedded
> newlines).  But, I wonder how either of these would behave if I passed 
> literally a million (or more) 'filenames'.
> 
> Before I spend time exploring, I wanted to seek some input.
> 
> Any thoughts would be appreciated.
> 
> Darin.
> 





Re: Long running jobs in CDH

2016-01-13 Thread Jorge Machado
Hi Jan, 

Oozie, or you can check the --supervise option of spark-submit:
http://spark.apache.org/docs/latest/submitting-applications.html 
 

> On 11/01/2016, at 14:23, Jan Holmberg  wrote:
> 
> Hi,
> any preferences how to run constantly running jobs (streaming) in CDH? Oozie? 
> Cmdline? Something else?
> 
> cheers,
> -jan



Re: Best practice for retrieving over 1 million files from S3

2016-01-13 Thread Daniel Imberman
I guess my big question would be why do you have so many files? Is there no
possibility that you can merge a lot of those files together before
processing them?

On Wed, Jan 13, 2016 at 11:59 AM Darin McBeath  wrote:

> Thanks for the tip, as I had not seen this before.  That's pretty much
> what I'm doing already.  Was just thinking there might be a better way.
>
> Darin.
> --
> *From:* Daniel Imberman 
> *To:* Darin McBeath ; User 
> *Sent:* Wednesday, January 13, 2016 2:48 PM
> *Subject:* Re: Best practice for retrieving over 1 million files from S3
>
> Hi Darin,
>
> You should read this article. TextFile is very inefficient in S3.
>
> http://tech.kinja.com/how-not-to-pull-from-s3-using-apache-spark-1704509219
>
> Cheers
>
> On Wed, Jan 13, 2016 at 11:43 AM Darin McBeath 
> wrote:
>
> I'm looking for some suggestions based on others' experiences.
>
> I currently have a job that I need to run periodically where I need to
> read on the order of 1+ million files from an S3 bucket.  It is not the
> entire bucket (nor does it match a pattern).  Instead, I have a list of
> random keys that are 'names' for the files in this S3 bucket.  The bucket
> itself will contain upwards of 60M or more files.
>
> My current approach has been to get my list of keys, partition on the key,
> and then map this to an underlying class that uses the most recent AWS SDK
> to retrieve the file from S3 using this key, which then returns the file.
> So, in the end, I have an RDD.  This works, but I really wonder if
> this is the best way.  I suspect there might be a better/faster way.
>
> One thing I've been considering is passing all of the keys (using s3n:
> urls) to sc.textFile or sc.wholeTextFiles (since some of my files can have
> embedded newlines).  But, I wonder how either of these would behave if I
> passed literally a million (or more) 'filenames'.
>
> Before I spend time exploring, I wanted to seek some input.
>
> Any thoughts would be appreciated.
>
> Darin.
>


Re: Best practice for retrieving over 1 million files from S3

2016-01-13 Thread Daniel Imberman
Hi Darin,

You should read this article. TextFile is very inefficient in S3.

http://tech.kinja.com/how-not-to-pull-from-s3-using-apache-spark-1704509219

Cheers

On Wed, Jan 13, 2016 at 11:43 AM Darin McBeath 
wrote:

> I'm looking for some suggestions based on others' experiences.
>
> I currently have a job that I need to run periodically where I need to
> read on the order of 1+ million files from an S3 bucket.  It is not the
> entire bucket (nor does it match a pattern).  Instead, I have a list of
> random keys that are 'names' for the files in this S3 bucket.  The bucket
> itself will contain upwards of 60M or more files.
>
> My current approach has been to get my list of keys, partition on the key,
> and then map this to an underlying class that uses the most recent AWS SDK
> to retrieve the file from S3 using this key, which then returns the file.
> So, in the end, I have an RDD.  This works, but I really wonder if
> this is the best way.  I suspect there might be a better/faster way.
>
> One thing I've been considering is passing all of the keys (using s3n:
> urls) to sc.textFile or sc.wholeTextFiles (since some of my files can have
> embedded newlines).  But, I wonder how either of these would behave if I
> passed literally a million (or more) 'filenames'.
>
> Before I spend time exploring, I wanted to seek some input.
>
> Any thoughts would be appreciated.
>
> Darin.
>


Re: FPGrowth does not handle large result sets

2016-01-13 Thread Ritu Raj Tiwari
Thanks Sean! I'll start with a higher support threshold and work my way down.

On Wednesday, January 13, 2016 8:57 AM, Sean Owen  
wrote:
 

You're looking for subsets of items that appear in at least 200 of
200,000 transactions, which could be a whole lot. Keep in mind there
are 25,000 items, sure, but already 625,000,000 possible pairs of
items, and trillions of possible 3-item subsets. A minimum support of
0.001 sounds far too low. Start with 0.1 and work down. I don't think
there's a general formula: if each transaction contained just 1 item, no
sets would be frequent, and if every transaction had every item, then
all sets would be frequent and that number is indescribably large.
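
To make the scale concrete, the counts mentioned above can be checked with exact arithmetic. The sketch below is plain Scala (no Spark needed); the object and method names are illustrative. Note that 625,000,000 is 25,000 squared, which counts ordered pairs; the number of unordered 2-item sets is roughly half of that.

```scala
object ItemsetCounts {
  // Number of distinct k-item subsets of n items, C(n, k), computed exactly.
  // Each intermediate value is itself a binomial coefficient, so the division is exact.
  def choose(n: Int, k: Int): BigInt =
    (1 to k).foldLeft(BigInt(1)) { (acc, i) => acc * (n - k + i) / i }

  def main(args: Array[String]): Unit = {
    val items = 25000
    println(s"ordered pairs:  ${BigInt(items) * items}")
    println(s"2-item subsets: ${choose(items, 2)}")
    println(s"3-item subsets: ${choose(items, 3)}")
  }
}
```

With 25,000 items there are 312,487,500 candidate 2-item sets and about 2.6 trillion candidate 3-item sets, which is why collecting every itemset above a very low support threshold can exhaust driver memory.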

On Wed, Jan 13, 2016 at 4:32 PM, Ritu Raj Tiwari
 wrote:
> Hi Sean:
> Thanks for checking out my question here. It's possible I am making a newbie
> error. Based on my dataset of about 200,000 transactions and a minimum
> support level of 0.001, I am looking for items that appear at least 200
> times. Given that the items in my transactions are drawn from a set of about
> 25,000 (I previously thought 17,000), what would be a rational way to
> determine the (peak) memory needs of my driver node?
>
> -Raj
>
>
> On Wednesday, January 13, 2016 1:18 AM, Sean Owen 
> wrote:
>
>
> As I said in your JIRA, the collect() in question is bringing results
> back to the driver to return them. The assumption is that there aren't
> a vast number of frequent items. If they are, they aren't 'frequent'
> and your min support is too low.
>
> On Wed, Jan 13, 2016 at 12:43 AM, Ritu Raj Tiwari
>  wrote:
>> Folks:
>> We are running into a problem where FPGrowth seems to choke on data sets
>> that we think are not too large. We have about 200,000 transactions. Each
>> transaction is composed of, on average, 50 items. There are about 17,000
>> unique items (SKUs) that might show up in any transaction.
>>
>> When running locally with 12G of RAM given to the PySpark process, the
>> FPGrowth code fails with an out-of-memory error for a minSupport of 0.001.
>> The failure occurs when we try to enumerate and save the frequent itemsets.
>> Looking at the FPGrowth code
>>
>> (https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/fpm/FPGrowth.scala),
>> it seems this is because the genFreqItems() method tries to collect() all
>> items. Is there a way the code could be rewritten so it does not try to
>> collect and therefore store all frequent item sets in memory?
>>
>> Thanks for any insights.
>>
>> -Raj
>
>




   

Re: How to make Dataset api as fast as DataFrame

2016-01-13 Thread Arkadiusz Bicz
Hi,

Including the query plans:
DataFrame:

== Physical Plan ==
SortBasedAggregate(key=[agreement#23],
functions=[(MaxVectorAggFunction(values#3),mode=Final,isDistinct=false)],
output=[agreement#23,maxvalues#27])
+- ConvertToSafe
   +- Sort [agreement#23 ASC], false, 0
  +- TungstenExchange hashpartitioning(agreement#23,48), None
 +- ConvertToUnsafe
+- SortBasedAggregate(key=[agreement#23],
functions=[(MaxVectorAggFunction(values#3),mode=Partial,isDistinct=false)],
output=[agreement#23,values#26])
   +- ConvertToSafe
  +- Sort [agreement#23 ASC], false, 0
 +- Project [agreement#23,values#3]
+- BroadcastHashJoin
[tradeId#0,tradeVersion#1], [tradeId#4,tradeVersion#5], BuildRight
   :- Scan
ParquetRelation[hdfs://namenode1.babar.poc:8020/tmp/arekresultspart1/_SUCCESS,hdfs://namenode1.babar.poc:8020/tmp/arekresultspart1/_common_metadata,hdfs://namenode1.babar.poc:8020/tmp/arekresultspart1/_metadata,hdfs://namenode1.babar.poc:8020/tmp/arekresultspart1/part-r-0-1e0f161d-2082-4a5a-a8a0-7b76fd1a80bc.gz.parquet,hdfs://namenode1.babar.poc:8020/tmp/arekresultspart1/part-r-1-1e0f161d-2082-4a5a-a8a0-7b76fd1a80bc.gz.parquet,hdfs://namenode1.babar.poc:8020/tmp/arekresultspart1/part-r-2-1e0f161d-2082-4a5a-a8a0-7b76fd1a80bc.gz.parquet,hdfs://namenode1.babar.poc:8020/tmp/arekresultspart1/part-r-3-1e0f161d-2082-4a5a-a8a0-7b76fd1a80bc.gz.parquet]
PushedFilter: [] [tradeId#0,tradeVersion#1,values#3]
   +-
TungstenAggregate(key=[tradeId#4,tradeVersion#5,agreement#23],
functions=[], output=[tradeId#4,tradeVersion#5,agreement#23])
  +- TungstenExchange
hashpartitioning(tradeId#4,tradeVersion#5,agreement#23,48), None
 +-
TungstenAggregate(key=[tradeId#4,tradeVersion#5,agreement#23],
functions=[], output=[tradeId#4,tradeVersion#5,agreement#23])
+- Scan ParquetRelation[hdfs://



//1. MapGrouped
== Physical Plan ==
!MapGroups , class[tradeId[0]: string, tradeVersion[0]:
string, agreement[0]: string], class[_1[0]:
struct,
_2[0]: struct],
class[_1[0]: string, _2[0]: string, _3[0]: string, _4[0]:
array], [tradeId#79,tradeVersion#80,agreement#81],
[_1#88,_2#89,_3#90,_4#91]
+- ConvertToSafe
   +- Sort [tradeId#79 ASC,tradeVersion#80 ASC,agreement#81 ASC], false, 0
  +- TungstenExchange
hashpartitioning(tradeId#79,tradeVersion#80,agreement#81,48), None
 +- ConvertToUnsafe
+- !AppendColumns , class[_1[0]:
struct,
_2[0]: struct],
class[tradeId[0]: string, tradeVersion[0]: string, agreement[0]:
string], [tradeId#79,tradeVersion#80,agreement#81]
   +- Project
[struct(tradeId#38,tradeVersion#39,values#40) AS
_1#73,struct(tradeId#67,tradeVersion#68,agreement#69) AS _2#74]
  +- BroadcastHashJoin [tradeId#38,tradeVersion#39],
[tradeId#67,tradeVersion#68], BuildRight
 :- ConvertToUnsafe
 :  +- !MapPartitions ,
class[tradeId[0]: string, tradeVersion[0]: string, resultType[0]: int,
values[0]: array], class[tradeId[0]: string, tradeVersion[0]:
string, values[0]: array],
[tradeId#38,tradeVersion#39,values#40]
 : +- ConvertToSafe
 :+- Scan
ParquetRelation[hdfs://namenode1.babar.poc:8020/tmp/arekresultspart1/_SUCCESS,hdfs://namenode1.babar.poc:8020/tmp/arekresultspart1/_common_metadata,hdfs://namenode1.babar.poc:8020/tmp/arekresultspart1/_metadata,hdfs://namenode1.babar.poc:8020/tmp/arekresultspart1/part-r-0-1e0f161d-2082-4a5a-a8a0-7b76fd1a80bc.gz.parquet,hdfs://namenode1.babar.poc:8020/tmp/arekresultspart1/part-r-1-1e0f161d-2082-4a5a-a8a0-7b76fd1a80bc.gz.parquet,hdfs://namenode1.babar.poc:8020/tmp/arekresultspart1/part-r-2-1e0f161d-2082-4a5a-a8a0-7b76fd1a80bc.gz.parquet,hdfs://namenode1.babar.poc:8020/tmp/arekresultspart1/part-r-3-1e0f161d-2082-4a5a-a8a0-7b76fd1a80bc.gz.parquet]
PushedFilter: [] [tradeId#0,tradeVersion#1,resultType#2,values#3]
 +-
TungstenAggregate(key=[tradeId#67,tradeVersion#68,agreement#69],
functions=[], output=[tradeId#67,tradeVersion#68,agreement#69])
+- TungstenExchange
hashpartitioning(tradeId#67,tradeVersion#68,agreement#69,48), None
   +-
TungstenAggregate(key=[tradeId#67,tradeVersion#68,agreement#69],
functions=[], output=[tradeId#67,tradeVersion#68,agreement#69])
  +- !MapPartitions ,
class[tradeId[0]: string, tradeVersion[0]: string, tradeType[0]:
string, notional[0]: decimal(38,18), currency[0]: string, asset[0]:
string, trader[0]: string, productCode[0]: string, counterParty[0]:
string, counterPartyAccronym[0]: string, 

Re: Kafka Streaming and partitioning

2016-01-13 Thread David D
Yep that's exactly what we want. Thanks for all the info Cody.
Dave.
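
The wrapper approach Cody describes in the quoted messages below could look roughly like this. It is an untested sketch against the Spark 1.x RDD API; the class name AssumePartitionedRDD is hypothetical. It is only correct if every record in partition i of the wrapped RDD really maps to partition i under the supplied partitioner, because Spark will trust the claim and skip the shuffle.

```scala
import org.apache.spark.{Partition, Partitioner, TaskContext}
import org.apache.spark.rdd.RDD

// Hypothetical wrapper: reports `part` as its partitioner and delegates
// everything else to the wrapped RDD. No data is moved or re-partitioned.
class AssumePartitionedRDD[K, V](prev: RDD[(K, V)], part: Partitioner)
  extends RDD[(K, V)](prev) {

  require(part.numPartitions == prev.partitions.length,
    "partitioner and wrapped RDD must have the same number of partitions")

  // The only behavioural change: tell Spark how the data is already laid out.
  override val partitioner: Option[Partitioner] = Some(part)

  override protected def getPartitions: Array[Partition] =
    firstParent[(K, V)].partitions

  override def compute(split: Partition, context: TaskContext): Iterator[(K, V)] =
    firstParent[(K, V)].iterator(split, context)
}

// Usage inside the stream, mirroring the example in the thread
// (cp and reference come from the application):
//   stream.transform { rdd => new AssumePartitionedRDD(rdd, cp).join(reference) }
```

If the Kafka producer's partitioning ever diverges from the partitioner passed in, the join will silently miss matches, so the assumption deserves a test of its own.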
On 13 Jan 2016 18:29, "Cody Koeninger"  wrote:

> The idea here is that the custom partitioner shouldn't actually get used
> for repartitioning the kafka stream (because that would involve a shuffle,
> which is what you're trying to avoid).  You're just assigning a partitioner
> because you know how it already is partitioned.
>
>
> On Wed, Jan 13, 2016 at 11:22 AM, Dave  wrote:
>
>> So for case 1 below
>> - subclass or modify the direct stream and kafkardd.  They're private, so
>> you'd need to rebuild just the external kafka project, not all of spark
>> When the data is read from Kafka it will be partitioned correctly with
>> the Custom Partitioner passed in to the new direct stream and kafka RDD
>> implementations.
>>
>> For case 2
>> - write a wrapper subclass of rdd that takes a given custom partitioner
>> and rdd in the constructor, overrides partitioner, and delegates every
>> other method to the wrapped rdd.  This should be possible without
>> modification to any existing spark code.  You'd use it something like 
>> Am I correct in saying that the data from Kafka will not be read into
>> memory in the cluster (kafka server is not located on the Spark Cluster in
>> my use case) until the following code is executed
>> stream.transform { rdd =>
>>   val wrapped = YourWrapper(cp, rdd)
>>   wrapped.join(reference)
>> }
>> In which case it will run through the partitioner of the wrapped RDD when
>> it arrives in the cluster for the first time i.e. no shuffle.
>>
>> Thanks,
>> Dave.
>>
>>
>>
>> On 13/01/16 17:00, Cody Koeninger wrote:
>>
>> In the case here of a kafkaRDD, the data doesn't reside on the cluster,
>> it's not cached by default.  If you're running kafka on the same nodes as
>> spark, then data locality would play a factor, but that should be handled
>> by the existing getPreferredLocations method.
>>
>> On Wed, Jan 13, 2016 at 10:46 AM, Dave  wrote:
>>
>>> Thanks Cody, appreciate the response.
>>>
>>> With this pattern the partitioners will now match when the join is
>>> executed.
>>> However, does the wrapper RDD not need to set the partition meta data on
>>> the wrapped RDD in order to allow Spark to know where the data for each
>>> partition resides in the cluster.
>>>
>>> Thanks,
>>> Dave.
>>>
>>>
>>> On 13/01/16 16:21, Cody Koeninger wrote:
>>>
>>> If two rdds have an identical partitioner, joining should not involve a
>>> shuffle.
>>>
>>> You should be able to override the partitioner without calling
>>> partitionBy.
>>>
>>> Two ways I can think of to do this:
>>> - subclass or modify the direct stream and kafkardd.  They're private,
>>> so you'd need to rebuild just the external kafka project, not all of spark
>>>
>>> - write a wrapper subclass of rdd that takes a given custom partitioner
>>> and rdd in the constructor, overrides partitioner, and delegates every
>>> other method to the wrapped rdd.  This should be possible without
>>> modification to any existing spark code.  You'd use it something like
>>>
>>> val cp = YourCustomPartitioner(...)
>>> val reference = YourReferenceRDD(cp, ...)
>>> val stream = KafkaUtils
>>>
>>> stream.transform { rdd =>
>>>   val wrapped = YourWrapper(cp, rdd)
>>>   wrapped.join(reference)
>>> }
>>>
>>>
>>> I haven't had reason to do either one of those approaches, so YMMV, but
>>> I believe others have
>>>
>>>
>>>
>>>
>>> On Wed, Jan 13, 2016 at 3:40 AM, ddav < 
>>> dave.davo...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I have the following use case:
>>>>
>>>> 1. Reference data stored in an RDD that is persisted and partitioned
>>>> using a simple custom partitioner.
>>>> 2. Input stream from kafka that uses the same partitioner algorithm as
>>>> the ref data RDD - this partitioning is done in kafka.
>>>>
>>>> I am using kafka direct streams, so the number of kafka partitions maps
>>>> to the number of partitions in the spark RDD. From testing and the
>>>> documentation I see Spark does not know anything about how the data has
>>>> been partitioned in kafka.
>>>>
>>>> In my use case I need to join the reference data RDD and the input
>>>> stream RDD. Because I have manually ensured the incoming data from kafka
>>>> uses the same partitioning algorithm, I know the data has been grouped
>>>> correctly in the input stream RDD in Spark, but I cannot do a join
>>>> without a shuffle step because Spark has no knowledge of how the data
>>>> has been partitioned.
>>>>
>>>> I have two ways to do this.
>>>> 1. Explicitly call partitionBy(CustomPartitioner) on the input stream RDD
>>>> followed by a join. This results in a shuffle of the input stream RDD,
>>>> after which the co-partitioned join takes place.
>>>> 2. Call join on the reference data RDD passing in the input stream RDD.
>>>> Spark will do a shuffle under the hood in this case 

Re: Serializing DataSets

2016-01-13 Thread Michael Armbrust
Yeah, that's the best way for now (note the conversion is purely logical, so
there is no cost to calling toDF()).  We'll likely be combining the classes
in Spark 2.0 to remove this awkwardness.
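
A minimal sketch of that conversion, assuming the Spark 1.6 API and a local master. The Trade case class, the object name and the output path are hypothetical and chosen only for illustration.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Hypothetical record type used only for this example.
case class Trade(tradeId: String, agreement: String)

object WriteDataset {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("write-dataset").setMaster("local[2]")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    val ds = Seq(Trade("t1", "a1"), Trade("t2", "a2")).toDS()

    // toDF() only changes the logical view; the DataFrame writer does the I/O.
    ds.toDF().write.mode("overwrite").parquet("/tmp/trades.parquet")

    sc.stop()
  }
}
```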

On Tue, Jan 12, 2016 at 11:20 PM, Simon Hafner 
wrote:

> What's the proper way to write DataSets to disk? Convert them to a
> DataFrame and use the writers there?
>


Best practice for retrieving over 1 million files from S3

2016-01-13 Thread Darin McBeath
I'm looking for some suggestions based on others' experiences.

I currently have a job that I need to run periodically where I need to read on 
the order of 1+ million files from an S3 bucket.  It is not the entire bucket 
(nor does it match a pattern).  Instead, I have a list of random keys that are 
'names' for the files in this S3 bucket.  The bucket itself will contain 
upwards of 60M or more files.

My current approach has been to get my list of keys, partition on the key, and 
then map this to an underlying class that uses the most recent AWS SDK to 
retrieve the file from S3 using this key, which then returns the file.  So, in 
the end, I have an RDD.  This works, but I really wonder if this is the 
best way.  I suspect there might be a better/faster way.

One thing I've been considering is passing all of the keys (using s3n: urls) to 
sc.textFile or sc.wholeTextFiles (since some of my files can have embedded
newlines).  But, I wonder how either of these would behave if I passed 
literally a million (or more) 'filenames'.

Before I spend time exploring, I wanted to seek some input.

Any thoughts would be appreciated.

Darin.
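
The approach described above (distribute the key list, then fetch each object inside the tasks) can be sketched as follows. newClient and fetch are hypothetical placeholders for whatever S3 client code the job uses; no AWS SDK calls are shown, and both functions must be serializable so Spark can ship them to the executors.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

object FetchByKey {
  // Distributes the keys over numPartitions tasks and fetches each object
  // on the executors. One client is created per partition, not per key.
  def fetchAll[C](sc: SparkContext, keys: Seq[String], numPartitions: Int)
                 (newClient: () => C)
                 (fetch: (C, String) => String): RDD[(String, String)] =
    sc.parallelize(keys, numPartitions).mapPartitions { part =>
      val client = newClient()                       // hypothetical client factory
      part.map(key => (key, fetch(client, key)))     // hypothetical download call
    }
}
```

The number of partitions controls how many downloads run concurrently, so it is the main knob to tune for a list of a million keys.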




SQL UDF problem (with re to types)

2016-01-13 Thread raghukiran
While registering and using SQL UDFs, I am running into the following
problem:

UDF registered:

ctx.udf().register("Test", new UDF1<Double, String>() {
    private static final long serialVersionUID = -8231917155671435931L;

    public String call(Double x) throws Exception {
        return "testing";
    }
}, DataTypes.StringType);

Usage:
query = "SELECT Test(82.4)";
result = sqlCtx.sql(query).first();
System.out.println(result.toString());

Problem: Class Cast exception thrown
Caused by: java.lang.ClassCastException: java.math.BigDecimal cannot be cast
to java.lang.Double

This problem occurs with Spark v1.5.2 and 1.6.0.
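
The stack trace suggests the literal 82.4 reaches the UDF as a java.math.BigDecimal rather than a Double. The cast failure itself can be reproduced in plain Scala without Spark; the object name and the asDouble helper below are illustrative. On the Spark side, an explicit cast in the query, such as SELECT Test(CAST(82.4 AS DOUBLE)), should avoid the mismatch, though that was not confirmed in this thread.

```scala
object DecimalLiteralCast {
  // A blind cast to Double; fails if the value is actually a BigDecimal.
  def blindCast(value: Any): Double =
    value.asInstanceOf[java.lang.Double].doubleValue

  // Defensive alternative: accept either representation.
  def asDouble(value: Any): Double = value match {
    case d: java.lang.Double     => d.doubleValue
    case b: java.math.BigDecimal => b.doubleValue
    case other =>
      throw new IllegalArgumentException(s"unsupported numeric type: ${other.getClass.getName}")
  }

  def main(args: Array[String]): Unit = {
    val literal: Any = new java.math.BigDecimal("82.4")
    println(scala.util.Try(blindCast(literal)))  // a Failure wrapping ClassCastException
    println(asDouble(literal))
  }
}
```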



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/SQL-UDF-problem-with-re-to-types-tp25968.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Usage of SparkContext within a Web container

2016-01-13 Thread praveen S
Is using a SparkContext from within a web container the right way to run Spark
jobs, or should we launch spark-submit through a ProcessBuilder?

Are there any pros or cons of using SparkContext from a web container?

How does Zeppelin trigger Spark jobs from the web context?


[discuss] dropping Hadoop 2.2 and 2.3 support in Spark 2.0?

2016-01-13 Thread Reynold Xin
We've dropped Hadoop 1.x support in Spark 2.0.

There is also a proposal to drop Hadoop 2.2 and 2.3, i.e. the minimal
Hadoop version we support would be Hadoop 2.4. The main advantage is then
we'd be able to focus our Jenkins resources (and the associated maintenance
of Jenkins) to create builds for Hadoop 2.6/2.7. It is my understanding
that all Hadoop vendors have moved away from 2.2/2.3, but there might be
some users that are on these older versions.

What do you think about this idea?


Re: spark job failure - akka error Association with remote system has failed

2016-01-13 Thread vivek.meghanathan
Mohammed,

As I mentioned in my latest email, it was failing due to a communication
issue with Cassandra. Once I fixed that, the issue went away.


Regards,
Vivek M



From: Mohammed Guller 
Sent: Thursday, January 14, 2016 4:38 AM
To: Vivek Meghanathan (WT01 - NEP); user@spark.apache.org
Subject: RE: spark job failure - akka error Association with remote system has 
failed


Check the entries in your /etc/hosts file.



Also check what the hostname command returns.



Mohammed



From: vivek.meghanat...@wipro.com [mailto:vivek.meghanat...@wipro.com]
Sent: Tuesday, January 12, 2016 11:36 PM
To: user@spark.apache.org
Subject: RE: spark job failure - akka error Association with remote system has 
failed



I have set master_ip to an IP address, and the Spark conf also uses the IP address. But
the following log shows the hostname. (The Spark UI shows the master details by IP.)





16/01/13 12:31:38 WARN ReliableDeliverySupervisor: Association with remote 
system [akka.tcp://sparkDriver@masternode1:36537] has failed, address is now 
gated for [5000] ms. Reason is: [Disassociated].



From: Vivek Meghanathan (WT01 - NEP)
Sent: 13 January 2016 12:18
To: user@spark.apache.org
Subject: spark job failure - akka error Association with remote system has 
failed



Hi All,

I am running a Spark 1.3.0 cluster in standalone mode. We rebooted the cluster
servers (system reboot), and since then the Spark jobs fail with the
following error (within 7-8 seconds). Two of the jobs are running fine.
All the jobs used to be stable before the reboot. We have not enabled
any of the default configuration files in the conf directory other than
spark-env.sh, slaves and log4j.properties.



Warning in the master log:



16/01/13 11:58:16 WARN ReliableDeliverySupervisor: Association with remote 
system [akka.tcp://sparkDriver@masternode1:41419] has failed, address is now 
gated for [5000] ms. Reason is: [Disassociated].



Regards,
Vivek M

The information contained in this electronic message and any attachments to 
this message are intended for the exclusive use of the addressee(s) and may 
contain proprietary, confidential or privileged information. If you are not the 
intended recipient, you should not disseminate, distribute or copy this e-mail. 
Please notify the sender immediately and destroy all copies of this message and 
any attachments. WARNING: Computer viruses can be transmitted via email. The 
recipient should check this email and any attachments for the presence of 
viruses. The company accepts no liability for any damage caused by any virus 
transmitted by this email. www.wipro.com



Re: Re: spark streaming context trigger invoke stop why?

2016-01-13 Thread Triones,Deng(vip.com)
What's more, I am running a 24x7 job, so I never call System.exit()
myself. I therefore believe something in the driver is shutting it down.

From: 邓刚 [Technology Center]
Sent: 14 January 2016 15:45
To: 'Yogesh Mahajan'
Cc: user
Subject: Re: Re: spark streaming context trigger invoke stop why?

Thanks for your response. ApplicationMaster is only used in YARN mode; I am using
standalone mode. Could you please let me know what triggers the shutdown
hook?

From: Yogesh Mahajan [mailto:ymaha...@snappydata.io]
Sent: 14 January 2016 12:42
To: 邓刚 [Technology Center]
Cc: user
Subject: Re: Re: spark streaming context trigger invoke stop why?

All the action happens in ApplicationMaster, especially in its run method.
Check ApplicationMaster#startUserApplication: userThread (the driver), which invokes
the ApplicationMaster#finish method. You can also try System.exit in your program.

Regards,
Yogesh Mahajan,
SnappyData Inc, snappydata.io

On Thu, Jan 14, 2016 at 9:56 AM, Yogesh Mahajan 
> wrote:
Hi Triones,

Check the org.apache.spark.util.ShutdownHookManager : It adds this ShutDownHook 
when you start a StreamingContext

Here is the code in StreamingContext.start()

shutdownHookRef = ShutdownHookManager.addShutdownHook(
  StreamingContext.SHUTDOWN_HOOK_PRIORITY)(stopOnShutdown)

Also look at the following def in StreamingContext, which actually stops the
context from the shutdown hook:

private def stopOnShutdown(): Unit = {
  val stopGracefully =
    conf.getBoolean("spark.streaming.stopGracefullyOnShutdown", false)
  logInfo(s"Invoking stop(stopGracefully=$stopGracefully) from shutdown hook")
  // Do not stop SparkContext, let its own shutdown hook stop it
  stop(stopSparkContext = false, stopGracefully = stopGracefully)
}
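
The hook registered by StreamingContext ultimately runs from a JVM shutdown hook, so it fires whenever the driver JVM begins to exit: for example when System.exit is called, when the process receives SIGTERM, or when the last non-daemon thread finishes (for instance after main ends with an uncaught exception). A plain-Scala sketch of that mechanism follows; no Spark is involved, and the object name, helper and messages are illustrative.

```scala
object ShutdownHookDemo {
  // Registers `action` to run when the JVM starts shutting down and
  // returns the hook thread so that a caller could deregister it.
  def register(action: () => Unit): Thread = {
    val hook = new Thread(new Runnable {
      override def run(): Unit = action()
    })
    Runtime.getRuntime.addShutdownHook(hook)
    hook
  }

  def main(args: Array[String]): Unit = {
    register(() => println("shutdown hook: stopping the streaming context"))
    // Stand-in for a job failure that propagates out of the driver's main method.
    throw new IllegalStateException("simulated: job aborted due to stage failure")
  }
}
```

Seen this way, the log line "Invoking stop(...) from shutdown hook" is a symptom of the driver process exiting, so the cause to look for is whatever ended the driver, such as the aborted job's exception.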

Regards,
Yogesh Mahajan,
SnappyData Inc, snappydata.io

On Thu, Jan 14, 2016 at 8:55 AM, Triones,Deng(vip.com) 
> wrote:
More info

I am using spark version 1.5.2


From: Triones,Deng(vip.com)
[mailto:triones.d...@vipshop.com]
Sent: 14 January 2016 11:24
To: user
Subject: spark streaming context trigger invoke stop why?

Hi all
 According to the driver log, a task failed 4 times in one stage, and the stage
was dropped because the input block had been deleted before it could be used. After that
the StreamingContext invoked stop.  Does anyone know what kind of akka message
triggers the stop, or which code triggers the shutdown hook?


Thanks




Driver log:

 Job aborted due to stage failure: Task 410 in stage 215.0 failed 4 times
[org.apache.spark.streaming.StreamingContext---Thread-0]: Invoking 
stop(stopGracefully=false) from shutdown hook
本电子邮件可能为保密文件。如果阁下非电子邮件所指定之收件人,谨请立即通知本人。敬请阁下不要使用、保存、复印、打印、散布本电子邮件及其内容,或将其用于其他任何目的或向任何人披露。谢谢您的合作!
 This communication is intended only for the addressee(s) and may contain 
information that is privileged and confidential. You are hereby notified that, 
if you are not an intended recipient listed above, or an authorized employee or 
agent of an addressee of this communication responsible for delivering e-mail 
messages to an intended recipient, any dissemination, distribution or 
reproduction of this communication (including any attachments hereto) is 
strictly prohibited. If you have received this communication in error, please 
notify us immediately by a reply e-mail addressed to the sender and permanently 
delete the original e-mail communication and any attachments from all storage 
devices without making or otherwise retaining a copy.

Re: Re: spark streaming context trigger invoke stop why?

2016-01-13 Thread Triones,Deng(vip.com)
Thanks for your response. ApplicationMaster is only for YARN mode; I am using 
standalone mode. Could you please let me know what triggers the shutdown 
hook?

From: Yogesh Mahajan [mailto:ymaha...@snappydata.io]
Sent: January 14, 2016 12:42
To: 邓刚 [Technology Center]
Cc: user
Subject: Re: Re: spark streaming context trigger invoke stop why?

All the action happens in ApplicationMaster, especially in the run method.
Check ApplicationMaster#startUserApplication: userThread (Driver), which invokes the 
ApplicationMaster#finish method. You can also try System.exit in your program.

Regards,
Yogesh Mahajan,
SnappyData Inc, snappydata.io

On Thu, Jan 14, 2016 at 9:56 AM, Yogesh Mahajan wrote:
Hi Triones,

Check the org.apache.spark.util.ShutdownHookManager : It adds this ShutDownHook 
when you start a StreamingContext

Here is the code in StreamingContext.start()

shutdownHookRef = ShutdownHookManager.addShutdownHook(
  StreamingContext.SHUTDOWN_HOOK_PRIORITY)(stopOnShutdown)

Also look at the following def in StreamingContext, which actually stops the 
context from the shutdown hook:
private def stopOnShutdown(): Unit = {
  val stopGracefully =
    conf.getBoolean("spark.streaming.stopGracefullyOnShutdown", false)
  logInfo(s"Invoking stop(stopGracefully=$stopGracefully) from shutdown hook")
  // Do not stop SparkContext, let its own shutdown hook stop it
  stop(stopSparkContext = false, stopGracefully = stopGracefully)
}
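
For readers unfamiliar with JVM shutdown hooks, the mechanism behind the "Invoking stop(...) from shutdown hook" log line can be sketched in plain Java. This is not Spark's code: the class `ShutdownHookSketch`, the method names and the `events` list are illustrative stand-ins, and Spark's ShutdownHookManager layers priority ordering on top of the JVM facility. The sketch only shows that a registered hook runs once the JVM starts to exit, so the log line says that the driver JVM was going down, not why.

```java
import java.util.ArrayList;
import java.util.List;

public class ShutdownHookSketch {
    // Records what the hook did so the behaviour can be inspected.
    static final List<String> events = new ArrayList<>();

    // Hypothetical stand-in for StreamingContext.stopOnShutdown().
    static void stopOnShutdown(boolean stopGracefully) {
        events.add("Invoking stop(stopGracefully=" + stopGracefully + ") from shutdown hook");
    }

    // Registers a JVM shutdown hook and returns the hook thread.
    static Thread registerHook(boolean stopGracefully) {
        Thread hook = new Thread(() -> {
            stopOnShutdown(stopGracefully);
            System.out.println(events.get(events.size() - 1));
        });
        Runtime.getRuntime().addShutdownHook(hook);
        return hook;
    }

    public static void main(String[] args) {
        registerHook(false);
        System.out.println("main is returning; the JVM will now begin shutdown");
        // The hook also fires on System.exit, on SIGTERM, or when the main
        // thread dies from an uncaught exception and no other non-daemon
        // threads remain.
    }
}
```

If the driver's main thread ends because a job failure propagates as an exception, the same sequence would follow: the JVM exits and the hook calls stop.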

Regards,
Yogesh Mahajan,
SnappyData Inc, snappydata.io

On Thu, Jan 14, 2016 at 8:55 AM, Triones,Deng(vip.com) wrote:
More info

I am using spark version 1.5.2


From: Triones,Deng(vip.com) [mailto:triones.d...@vipshop.com]
Sent: January 14, 2016 11:24
To: user
Subject: spark streaming context trigger invoke stop why?

Hi all,
 From the driver log, I see that a task failed 4 times in a stage; the stage 
is dropped when the input block has been deleted before it is used. After that, 
the StreamingContext invokes stop. Does anyone know what kind of akka message 
triggers the stop, or which code triggers the shutdown hook?


Thanks




Driver log:

 Job aborted due to stage failure: Task 410 in stage 215.0 failed 4 times
[org.apache.spark.streaming.StreamingContext---Thread-0]: Invoking 
stop(stopGracefully=false) from shutdown hook

Re: SparkContext SyntaxError: invalid syntax

2016-01-13 Thread Bryan Cutler
Hi Andrew,

There are a couple of things to check.  First, is Python 2.7 the default
version on all nodes in the cluster, or is it an alternate install? That is,
what is the output of the command "$> python --version"? If it is an
alternate install, you could set the environment variable PYSPARK_PYTHON,
which names the Python binary executable to use for PySpark in both driver
and workers (default is python).

Did you try to submit the Python example under client mode?  Otherwise, the
command looks fine, you don't use the --class option for submitting python
files
* ./bin/spark-submit  --master yarn --deploy-mode client
--driver-memory 4g --executor-memory 2g --executor-cores 1
./examples/src/main/python/pi.py 10*

That is a good sign that local jobs and Java examples work, probably just a
small configuration issue :)

Bryan

On Wed, Jan 13, 2016 at 3:51 PM, Andrew Weiner <
andrewweiner2...@u.northwestern.edu> wrote:

> Thanks for your continuing help.  Here is some additional info.
>
> *OS/architecture*
> output of *cat /proc/version*:
> Linux version 2.6.18-400.1.1.el5 (mockbu...@x86-012.build.bos.redhat.com)
>
> output of *lsb_release -a*:
> LSB Version:
>  
> :core-4.0-amd64:core-4.0-ia32:core-4.0-noarch:graphics-4.0-amd64:graphics-4.0-ia32:graphics-4.0-noarch:printing-4.0-amd64:printing-4.0-ia32:printing-4.0-noarch
> Distributor ID: RedHatEnterpriseServer
> Description:Red Hat Enterprise Linux Server release 5.11 (Tikanga)
> Release:5.11
> Codename:   Tikanga
>
> *Running a local job*
> I have confirmed that I can successfully run python jobs using
> bin/spark-submit --master local[*]
> Specifically, this is the command I am using:
> *./bin/spark-submit --master local[8]
> ./examples/src/main/python/wordcount.py
> file:/home//spark-1.6.0-bin-hadoop2.4/README.md*
> And it works!
>
> *Additional info*
> I am also able to successfully run the Java SparkPi example using yarn in
> cluster mode using this command:
> * ./bin/spark-submit --class org.apache.spark.examples.SparkPi
> --master yarn --deploy-mode cluster --driver-memory 4g
> --executor-memory 2g --executor-cores 1 lib/spark-examples*.jar
> 10*
> This Java job also runs successfully when I change --deploy-mode to
> client.  The fact that I can run Java jobs in cluster mode makes me think
> that everything is installed correctly--is that a valid assumption?
>
> The problem remains that I cannot submit python jobs.  Here is the command
> that I am using to try to submit python jobs:
> * ./bin/spark-submit  --master yarn --deploy-mode cluster
> --driver-memory 4g --executor-memory 2g --executor-cores 1
> ./examples/src/main/python/pi.py 10*
> Does that look like a correct command?  I wasn't sure what to put for
> --class so I omitted it.  At any rate, the result of the above command is a
> syntax error, similar to the one I posted in the original email:
>
> Traceback (most recent call last):
>   File "pi.py", line 24, in ?
>     from pyspark import SparkContext
>   File "/home//spark-1.6.0-bin-hadoop2.4/python/pyspark/__init__.py", line 61
>     indent = ' ' * (min(len(m) for m in indents) if indents else 0)
>   ^
> SyntaxError: invalid syntax
>
>
> This really looks to me like a problem with the python version.  Python
> 2.4 would throw this syntax error but Python 2.7 would not.  And yet I am
> using Python 2.7.8.  Is there any chance that Spark or Yarn is somehow
> using an older version of Python without my knowledge?
>
> Finally, when I try to run the same command in client mode...
> * ./bin/spark-submit  --master yarn --deploy-mode client
> --driver-memory 4g --executor-memory 2g --executor-cores 1
> ./examples/src/main/python/pi.py 10*
> I get the error I mentioned in the prior email:
> Error from python worker:
>   python: module pyspark.daemon not found
>
> Any thoughts?
>
> Best,
> Andrew
>
>
> On Mon, Jan 11, 2016 at 12:25 PM, Bryan Cutler  wrote:
>
>> This could be an environment issue, could you give more details about the
>> OS/architecture that you are using?  If you are sure everything is
>> installed correctly on each node following the guide on "Running Spark on
>> Yarn" http://spark.apache.org/docs/latest/running-on-yarn.html and that
>> the spark assembly jar is reachable, then I would check to see if you can
>> submit a local job to just run on one node.
>>
>> On Fri, Jan 8, 2016 at 5:22 PM, Andrew Weiner <
>> andrewweiner2...@u.northwestern.edu> wrote:
>>
>>> Now for simplicity I'm testing with wordcount.py from the provided
>>> examples, and using Spark 1.6.0
>>>
>>> The first error I get is:
>>>
>>> 16/01/08 19:14:46 ERROR lzo.GPLNativeCodeLoader: Could not load native
>>> gpl library
>>> java.lang.UnsatisfiedLinkError: no gplcompression in java.library.path
>>> at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1864)
>>> at []
>>>
>>> A bit lower down, I see 

Re: SQL UDF problem (with re to types)

2016-01-13 Thread Ted Yu
Please take a look
at sql/hive/src/test/java/org/apache/spark/sql/hive/aggregate/MyDoubleSum.java
which shows a UserDefinedAggregateFunction that works on DoubleType column.

sql/hive/src/test/java/org/apache/spark/sql/hive/JavaDataFrameSuite.java
shows how it is registered.

Cheers

On Wed, Jan 13, 2016 at 11:58 AM, raghukiran  wrote:

> While registering and using SQL UDFs, I am running into the following
> problem:
>
> UDF registered:
>
> ctx.udf().register("Test", new UDF1<Double, String>() {
>     private static final long serialVersionUID = -8231917155671435931L;
>
>     public String call(Double x) throws Exception {
>         return "testing";
>     }
> }, DataTypes.StringType);
>
> Usage:
> query = "SELECT Test(82.4)";
> result = sqlCtx.sql(query).first();
> System.out.println(result.toString());
>
> Problem: ClassCastException thrown
> Caused by: java.lang.ClassCastException: java.math.BigDecimal cannot be cast to java.lang.Double
>
> This problem occurs with Spark v1.5.2 and 1.6.0.
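
The exception suggests that the literal 82.4 reaches the UDF as a java.math.BigDecimal rather than a Double. The failure can be reproduced without Spark, which also shows one defensive option: accept any Number and convert explicitly. The class and method names below are illustrative and not part of the Spark API; whether declaring the UDF argument as BigDecimal, or casting the literal in SQL, is the better fix would need to be checked against the Spark version in use.

```java
import java.math.BigDecimal;

public class UdfArgSketch {
    // Mirrors the failing UDF: the cast to Double fails for BigDecimal input.
    static String callStrict(Object arg) {
        Double x = (Double) arg; // ClassCastException if arg is a BigDecimal
        return "testing " + x;
    }

    // Tolerant variant: any Number (Double, BigDecimal, Integer, ...) is converted.
    static String callTolerant(Object arg) {
        double x = ((Number) arg).doubleValue();
        return "testing " + x;
    }

    public static void main(String[] args) {
        // Assumption: this is what the SQL layer hands to the UDF for "82.4".
        Object literal = new BigDecimal("82.4");
        try {
            callStrict(literal);
        } catch (ClassCastException e) {
            System.out.println("strict variant failed: " + e.getClass().getSimpleName());
        }
        System.out.println("tolerant variant: " + callTolerant(literal));
    }
}
```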
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/SQL-UDF-problem-with-re-to-types-tp25968.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


RE: Is it possible to use SparkSQL JDBC ThriftServer without Hive

2016-01-13 Thread Mohammed Guller
Hi Angela,
Yes, you can use Spark SQL JDBC/ThriftServer without Hive.

Mohammed


-Original Message-
From: angela.whelan [mailto:angela.whe...@synchronoss.com] 
Sent: Wednesday, January 13, 2016 3:37 AM
To: user@spark.apache.org
Subject: Is it possible to use SparkSQL JDBC ThriftServer without Hive

hi,
I'm wondering if it is possible to use the SparkSQL JDBC ThriftServer without 
Hive?

The reason I'm asking is that we are unsure about the speed of Hive with 
SparkSQL JDBC connectivity.

I can't find any article online about using SparkSQL JDBC ThriftServer without 
Hive.

Many thanks in advance for any help on this.

Thanks, Angela



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Is-it-possible-to-use-SparkSQL-JDBC-ThriftServer-without-Hive-tp25959.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional 
commands, e-mail: user-h...@spark.apache.org





RE: Spark ignores SPARK_WORKER_MEMORY?

2016-01-13 Thread Mohammed Guller
Barak,

The SPARK_WORKER_MEMORY setting is used for allocating memory to executors.

You can use SPARK_DAEMON_MEMORY to set memory for the worker JVM.

Mohammed

From: Barak Yaish [mailto:barak.ya...@gmail.com]
Sent: Wednesday, January 13, 2016 12:59 AM
To: user@spark.apache.org
Subject: Spark ignores SPARK_WORKER_MEMORY?

Hello,

Although I'm setting SPARK_WORKER_MEMORY in spark-env.sh, it looks like this 
setting is ignored. I can't find any indication in the scripts under bin/sbin 
that -Xms/-Xmx are set.

If I ps the worker pid, it looks like memory is set to 1G:

[hadoop@sl-env1-hadoop1 spark-1.5.2-bin-hadoop2.6]$ ps -ef | grep 20232
hadoop   20232 1  0 02:01 ?00:00:22 /usr/java/latest//bin/java -cp 
/workspace/3rd-party/spark/spark-1.5.2-bin-hadoop2.6/sbin/../conf/:/workspace/3rd-party/spark/spark-1.5.2-bin-hadoop2.6/lib/spark-assembly-1.5.2-hadoop2.6.0.jar:/workspace/3rd-party/spark/spark-1.5.2-bin-hadoop2.6/lib/datanucleus-api-jdo-3.2.6.jar:/workspace/3rd-party/spark/spark-1.5.2-bin-hadoop2.6/lib/datanucleus-rdbms-3.2.9.jar:/workspace/3rd-party/spark/spark-1.5.2-bin-hadoop2.6/lib/datanucleus-core-3.2.10.jar:/workspace/3rd-party/hadoop/2.6.3//etc/hadoop/
 -Xms1g -Xmx1g org.apache.spark.deploy.worker.Worker --webui-port 8081 
spark://10.52.39.92:7077

Am I missing something?

Thanks.


Random Forest FeatureImportance throwing NullPointerException

2016-01-13 Thread Rachana Srivastava
I have a random forest model for which I am trying to get the 
featureImportances vector.

Map categoricalFeaturesParam = new HashMap<>();
scala.collection.immutable.Map categoricalFeatures =  
(scala.collection.immutable.Map)
scala.collection.immutable.Map$.MODULE$.apply(JavaConversions.mapAsScalaMap(categoricalFeaturesParam).toSeq());
int numberOfClasses =2;
RandomForestClassifier rfc = new RandomForestClassifier();
RandomForestClassificationModel rfm = 
RandomForestClassificationModel.fromOld(model, rfc, categoricalFeatures, 
numberOfClasses);
System.out.println(rfm.featureImportances());

When I run the above code, I find that featureImportances is null.  Do I need to 
set anything specific to get the feature importances for the random forest model?

Thanks,

Rachana


Re: trouble calculating TF-IDF data type mismatch: '(tf * idf)' requires numeric type, not vector;

2016-01-13 Thread Andy Davidson
You'll need the following function if you want to run the test code.

Kind regards 

Andy

private DataFrame createData(JavaRDD<Row> rdd) {
    StructField id = new StructField("id", DataTypes.IntegerType, false,
            Metadata.empty());

    StructField label = new StructField("label", DataTypes.DoubleType, false,
            Metadata.empty());

    StructField words = new StructField("words",
            DataTypes.createArrayType(DataTypes.StringType), false, Metadata.empty());

    StructType schema = new StructType(new StructField[] { id, label, words });
    DataFrame ret = sqlContext.createDataFrame(rdd, schema);

    return ret;
}


From:  Andrew Davidson 
Date:  Wednesday, January 13, 2016 at 2:52 PM
To:  "user @spark" 
Subject:  trouble calculating TF-IDF data type mismatch: '(tf * idf)'
requires numeric type, not vector;

> Below is a little snippet of my Java test code. Any idea how I implement
> member-wise vector multiplication?
> 
> Also notice the idf value for 'Chinese' is 0.0? The calculation is ln((4+1) /
> (6/4 + 1)) = ln(2) = 0.6931 ??
> 
> Also any idea if this code would work in a pipeline? I.e., is the pipeline
> smart about using cache()?
> 
> Kind regards
> 
> Andy
> 
> transformed df printSchema()
> 
> root
>  |-- id: integer (nullable = false)
>  |-- label: double (nullable = false)
>  |-- words: array (nullable = false)
>  |    |-- element: string (containsNull = true)
>  |-- tf: vector (nullable = true)
>  |-- idf: vector (nullable = true)
> 
> +---+-----+----------------------------+-------------------------+-------------------------------------------------------+
> |id |label|words                       |tf                       |idf                                                    |
> +---+-----+----------------------------+-------------------------+-------------------------------------------------------+
> |0  |0.0  |[Chinese, Beijing, Chinese] |(7,[1,2],[2.0,1.0])      |(7,[1,2],[0.0,0.9162907318741551])                     |
> |1  |0.0  |[Chinese, Chinese, Shanghai]|(7,[1,4],[2.0,1.0])      |(7,[1,4],[0.0,0.9162907318741551])                     |
> |2  |0.0  |[Chinese, Macao]            |(7,[1,6],[1.0,1.0])      |(7,[1,6],[0.0,0.9162907318741551])                     |
> |3  |1.0  |[Tokyo, Japan, Chinese]     |(7,[1,3,5],[1.0,1.0,1.0])|(7,[1,3,5],[0.0,0.9162907318741551,0.9162907318741551])|
> +---+-----+----------------------------+-------------------------+-------------------------------------------------------+
> 
> 
> @Test
> 
> public void test() {
> 
> DataFrame rawTrainingDF = createTrainingData();
> 
> DataFrame trainingDF = runPipleLineTF_IDF(rawTrainingDF);
> 
> . . .
> 
> }
> 
>private DataFrame runPipleLineTF_IDF(DataFrame rawDF) {
> 
> HashingTF hashingTF = new HashingTF()
> 
> .setInputCol("words")
> 
> .setOutputCol("tf")
> 
> .setNumFeatures(dictionarySize);
> 
> 
> 
> DataFrame termFrequenceDF = hashingTF.transform(rawDF);
> 
> 
> 
> termFrequenceDF.cache(); // idf needs to make 2 passes over data set
> 
> IDFModel idf = new IDF()
> 
> //.setMinDocFreq(1) // our vocabulary has 6 words we
> hash into 7
> 
> .setInputCol(hashingTF.getOutputCol())
> 
> .setOutputCol("idf")
> 
> .fit(termFrequenceDF);
> 
> 
> 
> DataFrame tmp = idf.transform(termFrequenceDF);
> 
> 
> 
> DataFrame ret = tmp.withColumn("features",
> tmp.col("tf").multiply(tmp.col("idf")));
> 
> logger.warn("\ntransformed df printSchema()");
> 
> ret.printSchema();
> 
> ret.show(false);
> 
> 
> 
> return ret;
> 
> }
> 
> 
> 
> org.apache.spark.sql.AnalysisException: cannot resolve '(tf * idf)' due to
> data type mismatch: '(tf * idf)' requires numeric type, not vector;
> 
> 
> 
> 
> 
> private DataFrame createTrainingData() {
> 
> // make sure we only use dictionarySize words
> 
> JavaRDD rdd = javaSparkContext.parallelize(Arrays.asList(
> 
> // 0 is Chinese
> 
> // 1 in notChinese
> 
> RowFactory.create(0, 0.0, Arrays.asList("Chinese", "Beijing",
> "Chinese")),
> 
> RowFactory.create(1, 0.0, Arrays.asList("Chinese", "Chinese",
> "Shanghai")),
> 
> RowFactory.create(2, 0.0, Arrays.asList("Chinese", "Macao")),
> 
> RowFactory.create(3, 1.0, Arrays.asList("Tokyo", "Japan",
> "Chinese"))));
> 
>
> 
> return createData(rdd);
> 
> }
> 
> 
> 
> private DataFrame createTestData() {
> 
> JavaRDD rdd = javaSparkContext.parallelize(Arrays.asList(
> 
>   

trouble calculating TF-IDF data type mismatch: '(tf * idf)' requires numeric type, not vector;

2016-01-13 Thread Andy Davidson
Below is a little snippet of my Java test code. Any idea how I implement
member-wise vector multiplication?

Also notice the idf value for 'Chinese' is 0.0? The calculation is ln((4+1)
/ (6/4 + 1)) = ln(2) = 0.6931 ??

Also any idea if this code would work in a pipeline? I.e., is the pipeline
smart about using cache()?
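
On the idf question, a worked check may help. The sketch assumes the smoothed formula given in the Spark MLlib documentation, idf = ln((m + 1) / (df + 1)), where m is the number of documents and df is the number of documents containing the term (a document count, not an average such as 6/4). Under that assumption 'Chinese', which occurs in all 4 documents, gets ln(5/5) = 0, and a term that occurs in a single document gets ln(5/2), which agrees with the values in the table below. The class and method names are illustrative.

```java
public class IdfSketch {
    // idf = ln((numDocs + 1) / (docFreq + 1)); the formula is an assumption
    // taken from the Spark documentation, not read from the Spark source here.
    static double idf(long numDocs, long docFreq) {
        return Math.log((numDocs + 1.0) / (docFreq + 1.0));
    }

    public static void main(String[] args) {
        // 'Chinese' appears in all 4 training documents.
        System.out.println("Chinese: " + idf(4, 4));
        // 'Beijing' appears in exactly 1 of the 4 documents.
        System.out.println("Beijing: " + idf(4, 1));
    }
}
```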

Kind regards

Andy

transformed df printSchema()

root
 |-- id: integer (nullable = false)
 |-- label: double (nullable = false)
 |-- words: array (nullable = false)
 |    |-- element: string (containsNull = true)
 |-- tf: vector (nullable = true)
 |-- idf: vector (nullable = true)

+---+-----+----------------------------+-------------------------+-------------------------------------------------------+
|id |label|words                       |tf                       |idf                                                    |
+---+-----+----------------------------+-------------------------+-------------------------------------------------------+
|0  |0.0  |[Chinese, Beijing, Chinese] |(7,[1,2],[2.0,1.0])      |(7,[1,2],[0.0,0.9162907318741551])                     |
|1  |0.0  |[Chinese, Chinese, Shanghai]|(7,[1,4],[2.0,1.0])      |(7,[1,4],[0.0,0.9162907318741551])                     |
|2  |0.0  |[Chinese, Macao]            |(7,[1,6],[1.0,1.0])      |(7,[1,6],[0.0,0.9162907318741551])                     |
|3  |1.0  |[Tokyo, Japan, Chinese]     |(7,[1,3,5],[1.0,1.0,1.0])|(7,[1,3,5],[0.0,0.9162907318741551,0.9162907318741551])|
+---+-----+----------------------------+-------------------------+-------------------------------------------------------+


@Test
public void test() {
    DataFrame rawTrainingDF = createTrainingData();
    DataFrame trainingDF = runPipleLineTF_IDF(rawTrainingDF);
    . . .
}

private DataFrame runPipleLineTF_IDF(DataFrame rawDF) {
    HashingTF hashingTF = new HashingTF()
            .setInputCol("words")
            .setOutputCol("tf")
            .setNumFeatures(dictionarySize);

    DataFrame termFrequenceDF = hashingTF.transform(rawDF);

    termFrequenceDF.cache(); // idf needs to make 2 passes over data set
    IDFModel idf = new IDF()
            //.setMinDocFreq(1) // our vocabulary has 6 words we hash into 7
            .setInputCol(hashingTF.getOutputCol())
            .setOutputCol("idf")
            .fit(termFrequenceDF);

    DataFrame tmp = idf.transform(termFrequenceDF);

    DataFrame ret = tmp.withColumn("features",
            tmp.col("tf").multiply(tmp.col("idf")));
    logger.warn("\ntransformed df printSchema()");
    ret.printSchema();
    ret.show(false);

    return ret;
}



org.apache.spark.sql.AnalysisException: cannot resolve '(tf * idf)' due to
data type mismatch: '(tf * idf)' requires numeric type, not vector;
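
The error arises because Column.multiply is defined for numeric columns, not for vector columns. According to the Spark ML documentation, IDFModel.transform rescales its input vector by the idf weights, so the "idf" column may already hold the TF-IDF product; with this data set the two cannot be told apart, because every term with a nonzero idf has tf 1.0. If a member-wise product is still wanted, the arithmetic itself is simple. The sketch below uses plain arrays in the (size, indices, values) layout of the sparse vectors shown above; the class and method names are hypothetical, and wiring it into a DataFrame (for example through a UDF) is left out.

```java
import java.util.Arrays;

public class ElementwiseSketch {
    // Member-wise product of two sparse vectors given as sorted index arrays
    // plus values. Only indices present in both inputs can be nonzero.
    static double[] multiply(int size, int[] idxA, double[] valA,
                             int[] idxB, double[] valB) {
        double[] out = new double[size];
        int i = 0;
        int j = 0;
        while (i < idxA.length && j < idxB.length) {
            if (idxA[i] == idxB[j]) {
                out[idxA[i]] = valA[i] * valB[j];
                i++;
                j++;
            } else if (idxA[i] < idxB[j]) {
                i++;
            } else {
                j++;
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Row 0 of the table: tf = (7,[1,2],[2.0,1.0]).
        // The raw idf weights 0.0 and ln(5/2) are an assumption for illustration.
        double[] product = multiply(7,
                new int[] {1, 2}, new double[] {2.0, 1.0},
                new int[] {1, 2}, new double[] {0.0, Math.log(2.5)});
        System.out.println(Arrays.toString(product));
    }
}
```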





private DataFrame createTrainingData() {
    // make sure we only use dictionarySize words
    JavaRDD<Row> rdd = javaSparkContext.parallelize(Arrays.asList(
            // 0 is Chinese
            // 1 is notChinese
            RowFactory.create(0, 0.0, Arrays.asList("Chinese", "Beijing", "Chinese")),
            RowFactory.create(1, 0.0, Arrays.asList("Chinese", "Chinese", "Shanghai")),
            RowFactory.create(2, 0.0, Arrays.asList("Chinese", "Macao")),
            RowFactory.create(3, 1.0, Arrays.asList("Tokyo", "Japan", "Chinese"))));

    return createData(rdd);
}

private DataFrame createTestData() {
    JavaRDD<Row> rdd = javaSparkContext.parallelize(Arrays.asList(
            // 0 is Chinese
            // 1 is notChinese
            // "bernoulli" requires label to be IntegerType
            RowFactory.create(4, 1.0, Arrays.asList("Chinese", "Chinese", "Chinese",
                    "Tokyo", "Japan"))));
    return createData(rdd);
}




Re: SparkContext SyntaxError: invalid syntax

2016-01-13 Thread Andrew Weiner
Thanks for your continuing help.  Here is some additional info.

*OS/architecture*
output of *cat /proc/version*:
Linux version 2.6.18-400.1.1.el5 (mockbu...@x86-012.build.bos.redhat.com)

output of *lsb_release -a*:
LSB Version:
 
:core-4.0-amd64:core-4.0-ia32:core-4.0-noarch:graphics-4.0-amd64:graphics-4.0-ia32:graphics-4.0-noarch:printing-4.0-amd64:printing-4.0-ia32:printing-4.0-noarch
Distributor ID: RedHatEnterpriseServer
Description:Red Hat Enterprise Linux Server release 5.11 (Tikanga)
Release:5.11
Codename:   Tikanga

*Running a local job*
I have confirmed that I can successfully run python jobs using
bin/spark-submit --master local[*]
Specifically, this is the command I am using:
*./bin/spark-submit --master local[8]
./examples/src/main/python/wordcount.py
file:/home//spark-1.6.0-bin-hadoop2.4/README.md*
And it works!

*Additional info*
I am also able to successfully run the Java SparkPi example using yarn in
cluster mode using this command:
* ./bin/spark-submit --class org.apache.spark.examples.SparkPi --master
yarn --deploy-mode cluster --driver-memory 4g --executor-memory
2g --executor-cores 1 lib/spark-examples*.jar 10*
This Java job also runs successfully when I change --deploy-mode to
client.  The fact that I can run Java jobs in cluster mode makes me think
that everything is installed correctly--is that a valid assumption?

The problem remains that I cannot submit python jobs.  Here is the command
that I am using to try to submit python jobs:
* ./bin/spark-submit  --master yarn --deploy-mode cluster
--driver-memory 4g --executor-memory 2g --executor-cores 1
./examples/src/main/python/pi.py 10*
Does that look like a correct command?  I wasn't sure what to put for
--class so I omitted it.  At any rate, the result of the above command is a
syntax error, similar to the one I posted in the original email:

Traceback (most recent call last):
  File "pi.py", line 24, in ?
    from pyspark import SparkContext
  File "/home//spark-1.6.0-bin-hadoop2.4/python/pyspark/__init__.py", line 61
    indent = ' ' * (min(len(m) for m in indents) if indents else 0)
  ^
SyntaxError: invalid syntax


This really looks to me like a problem with the python version.  Python 2.4
would throw this syntax error but Python 2.7 would not.  And yet I am using
Python 2.7.8.  Is there any chance that Spark or Yarn is somehow using an
older version of Python without my knowledge?

Finally, when I try to run the same command in client mode...
* ./bin/spark-submit  --master yarn --deploy-mode client
--driver-memory 4g --executor-memory 2g --executor-cores 1
./examples/src/main/python/pi.py 10*
I get the error I mentioned in the prior email:
Error from python worker:
  python: module pyspark.daemon not found

Any thoughts?

Best,
Andrew


On Mon, Jan 11, 2016 at 12:25 PM, Bryan Cutler  wrote:

> This could be an environment issue, could you give more details about the
> OS/architecture that you are using?  If you are sure everything is
> installed correctly on each node following the guide on "Running Spark on
> Yarn" http://spark.apache.org/docs/latest/running-on-yarn.html and that
> the spark assembly jar is reachable, then I would check to see if you can
> submit a local job to just run on one node.
>
> On Fri, Jan 8, 2016 at 5:22 PM, Andrew Weiner <
> andrewweiner2...@u.northwestern.edu> wrote:
>
>> Now for simplicity I'm testing with wordcount.py from the provided
>> examples, and using Spark 1.6.0
>>
>> The first error I get is:
>>
>> 16/01/08 19:14:46 ERROR lzo.GPLNativeCodeLoader: Could not load native
>> gpl library
>> java.lang.UnsatisfiedLinkError: no gplcompression in java.library.path
>> at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1864)
>> at []
>>
>> A bit lower down, I see this error:
>>
>> 16/01/08 19:14:48 WARN scheduler.TaskSetManager: Lost task 0.0 in stage
>> 0.0 (TID 0, mundonovo-priv): org.apache.spark.SparkException:
>> Error from python worker:
>>   python: module pyspark.daemon not found
>> PYTHONPATH was:
>>
>> /scratch5/hadoop/yarn/local/usercache//filecache/22/spark-assembly-1.6.0-hadoop2.4.0.jar:/home/jpr123/hg.pacific/python-common:/home/jpr123/python-libs:/home/jpr123/lib/python2.7/site-packages:/home/zsb739/local/lib/python2.7/site-packages:/home/jpr123/mobile-cdn-analysis:/home//lib/python2.7/site-packages:/scratch4/hadoop/yarn/local/usercache//appcache/application_1450370639491_0136/container_1450370639491_0136_01_02/pyspark.zip:/scratch4/hadoop/yarn/local/usercache//appcache/application_1450370639491_0136/container_1450370639491_0136_01_02/py4j-0.9-src.zip
>> java.io.EOFException
>> at java.io.DataInputStream.readInt(DataInputStream.java:392)
>> at []
>>
>> And then a few more similar pyspark.daemon not found errors...
>>
>> Andrew
>>
>>
>>
>> On Fri, Jan 8, 2016 at 2:31 PM, Bryan Cutler 

Re: Exception in Spark-sql insertIntoJDBC command

2016-01-13 Thread RichG
Hi All, 

I am having the same issue in Spark 1.5.1. I've tried through Scala and
Python, and neither the 'append' nor the 'overwrite' mode allows me to insert
rows from a dataframe into an existing MS SQL Server 2012 table. When the table
doesn't exist, this works fine and creates the table with the rows from the
dataframe.

Is there a work-around or is this a known issue?

Here is an example of what I've tried in the Spark Shell:
df: org.apache.spark.sql.DataFrame = [AMOUNT: bigint, ID: bigint, NAME:
string]

scala> val url = "jdbc:sqlserver://"
url: String = 

scala> import java.util.Properties
import java.util.Properties

scala> val prop = new Properties()
prop: java.util.Properties = {}

scala> prop.put("user",)
res0: Object = null

scala> prop.put("password",)
res1: Object = null

scala> df.write.mode("append").jdbc(url,"small_test", prop)
com.microsoft.sqlserver.jdbc.SQLServerException: There is already an object
named 'small_test' in the database.
at
com.microsoft.sqlserver.jdbc.SQLServerException.makeFromDatabaseError(SQLServerException.java:216)
at
com.microsoft.sqlserver.jdbc.SQLServerStatement.getNextResult(SQLServerStatement.java:1515)
at
com.microsoft.sqlserver.jdbc.SQLServerPreparedStatement.doExecutePreparedStatement(SQLServerPreparedStatement.java:404)
at
com.microsoft.sqlserver.jdbc.SQLServerPreparedStatement$PrepStmtExecCmd.doExecute(SQLServerPreparedStatement.java:350)
at
com.microsoft.sqlserver.jdbc.TDSCommand.execute(IOBuffer.java:5696)
at
com.microsoft.sqlserver.jdbc.SQLServerConnection.executeCommand(SQLServerConnection.java:1715)
at
com.microsoft.sqlserver.jdbc.SQLServerStatement.executeCommand(SQLServerStatement.java:180)
at
com.microsoft.sqlserver.jdbc.SQLServerStatement.executeStatement(SQLServerStatement.java:155)
at
com.microsoft.sqlserver.jdbc.SQLServerPreparedStatement.executeUpdate(SQLServerPreparedStatement.java:314)
at
org.apache.spark.sql.DataFrameWriter.jdbc(DataFrameWriter.scala:275)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:27)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:32)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:34)
at $iwC$$iwC$$iwC$$iwC$$iwC.(:36)
at $iwC$$iwC$$iwC$$iwC.(:38)
at $iwC$$iwC$$iwC.(:40)
at $iwC$$iwC.(:42)
at $iwC.(:44)
at (:46)
at .(:50)
at .()
at .(:7)
at .()
at $print()
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at
org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
at
org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1340)
at
org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
at
org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:857)
at
org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902)
at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814)
at
org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:657)
at
org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:665)
at
org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670)
at
org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:997)
at
org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
at
org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
at
scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
at
org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945)
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1059)
at org.apache.spark.repl.Main$.main(Main.scala:31)
at org.apache.spark.repl.Main.main(Main.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:672)
at
org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
at

RE: spark job failure - akka error Association with remote system has failed

2016-01-13 Thread Mohammed Guller
Check the entries in your /etc/hosts file.

Also check what the hostname command returns.

Mohammed

From: vivek.meghanat...@wipro.com [mailto:vivek.meghanat...@wipro.com]
Sent: Tuesday, January 12, 2016 11:36 PM
To: user@spark.apache.org
Subject: RE: spark job failure - akka error Association with remote system has 
failed

I have used master_ip as the IP address, and the Spark conf also has the IP 
address, but the following log shows the hostname. (The Spark UI shows the master details by IP.)


16/01/13 12:31:38 WARN ReliableDeliverySupervisor: Association with remote 
system [akka.tcp://sparkDriver@masternode1:36537] has failed, address is now 
gated for [5000] ms. Reason is: [Disassociated].

From: Vivek Meghanathan (WT01 - NEP)
Sent: 13 January 2016 12:18
To: user@spark.apache.org
Subject: spark job failure - akka error Association with remote system has 
failed

Hi All,
I am running a Spark 1.3.0 standalone cluster. We rebooted the cluster servers 
(system reboot), and since then the Spark jobs fail with the following error 
(within 7-8 seconds). 2 of the jobs still run fine. All the jobs were stable 
before the reboot. We have not changed any default configuration in the conf 
files other than spark-env.sh, slaves and log4j.properties.

Warning in the master log:

16/01/13 11:58:16 WARN ReliableDeliverySupervisor: Association with remote 
system [akka.tcp://sparkDriver@masternode1:41419] has failed, address is now 
gated for [5000] ms. Reason is: [Disassociated].

Regards,
Vivek M
The information contained in this electronic message and any attachments to 
this message are intended for the exclusive use of the addressee(s) and may 
contain proprietary, confidential or privileged information. If you are not the 
intended recipient, you should not disseminate, distribute or copy this e-mail. 
Please notify the sender immediately and destroy all copies of this message and 
any attachments. WARNING: Computer viruses can be transmitted via email. The 
recipient should check this email and any attachments for the presence of 
viruses. The company accepts no liability for any damage caused by any virus 
transmitted by this email. www.wipro.com


Re: Reply: spark streaming context trigger invoke stop why?

2016-01-13 Thread Yogesh Mahajan
Hi Triones,

Check org.apache.spark.util.ShutdownHookManager: a shutdown hook is added
through it when you start a StreamingContext.

Here is the code in StreamingContext.start():

shutdownHookRef = ShutdownHookManager.addShutdownHook(
  StreamingContext.SHUTDOWN_HOOK_PRIORITY)(stopOnShutdown)

Also look at the following def in StreamingContext, which actually stops
the context from the shutdown hook:

private def stopOnShutdown(): Unit = {
  val stopGracefully =
    conf.getBoolean("spark.streaming.stopGracefullyOnShutdown", false)
  logInfo(s"Invoking stop(stopGracefully=$stopGracefully) from shutdown hook")
  // Do not stop SparkContext, let its own shutdown hook stop it
  stop(stopSparkContext = false, stopGracefully = stopGracefully)
}
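
The mechanism can be seen in isolation with a plain JVM shutdown hook.
The sketch below is not Spark code: HookDemo and stopMessage are made-up
names, and it only assumes that the hook is registered with the standard
Runtime.addShutdownHook call. The hook runs when the JVM begins shutting
down (main returns, System.exit is called, or the process is signalled),
not in response to a message.

```java
/** Hypothetical stand-in for the StreamingContext hook; not Spark code. */
class HookDemo {
    // Builds the same kind of log line that stopOnShutdown() writes.
    static String stopMessage(boolean stopGracefully) {
        return "Invoking stop(stopGracefully=" + stopGracefully + ") from shutdown hook";
    }

    public static void main(String[] args) {
        // Register a hook; the JVM starts this thread once shutdown begins.
        Runtime.getRuntime().addShutdownHook(
            new Thread(() -> System.out.println(stopMessage(false))));
        System.out.println("main finished");
        // After main returns, the JVM shuts down and the hook prints its line.
    }
}
```

Seeing that log line in the driver log therefore suggests the driver JVM
was already exiting when stop was invoked.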

Regards,
Yogesh Mahajan,
SnappyData Inc, snappydata.io

On Thu, Jan 14, 2016 at 8:55 AM, Triones,Deng(vip.com) <
triones.d...@vipshop.com> wrote:

> More info
>
>
>
> I am using spark version 1.5.2
>
>
>
>
>
> *From:* Triones,Deng(vip.com) [mailto:triones.d...@vipshop.com]
> *Sent:* January 14, 2016 11:24
> *To:* user
> *Subject:* spark streaming context trigger invoke stop why?
>
>
>
> Hi all
>
>  As I saw the driver log, the task failed 4 times in a stage, the
> stage will be dropped when the input block was deleted before make use of.
> After that the StreamingContext invoke stop.  Does anyone know what kind of
> akka message trigger the stop or which code trigger the shutdown hook?
>
>
>
>
>
> Thanks
>
>
>
>
>
>
>
>
>
> Driver log:
>
>
>
>  Job aborted due to stage failure: Task 410 in stage 215.0 failed 4 times
>
> [org.apache.spark.streaming.StreamingContext---Thread-0]: Invoking
> stop(stopGracefully=false) from shutdown hook
>
>
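> This e-mail may be confidential. If you are not the intended recipient,
> please notify the sender immediately. Please do not use, save, copy, print
> or distribute this e-mail or its contents, use it for any other purpose,
> or disclose it to anyone. Thank you for your cooperation!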
> This communication is intended only for the addressee(s) and may contain
> information that is privileged and confidential. You are hereby notified
> that, if you are not an intended recipient listed above, or an authorized
> employee or agent of an addressee of this communication responsible for
> delivering e-mail messages to an intended recipient, any dissemination,
> distribution or reproduction of this communication (including any
> attachments hereto) is strictly prohibited. If you have received this
> communication in error, please notify us immediately by a reply e-mail
> addressed to the sender and permanently delete the original e-mail
> communication and any attachments from all storage devices without making
> or otherwise retaining a copy.
>


Re: Manipulate Twitter Stream Filter on runtime

2016-01-13 Thread Yogesh Mahajan
Hi Alem,

I haven't tried it, but can you try calling TwitterStream.cleanUp and then
adding your modified filter, to see if it works? I am using twitter4j 4.0.4
with Spark Streaming.

Regards,
Yogesh Mahajan
SnappyData Inc, snappydata.io

On Mon, Jan 11, 2016 at 6:43 PM, Filli Alem  wrote:

> Hi,
>
>
>
> I try to implement a twitter stream processing, where I would want to
> change the filtered keywords during run time. I implemented the twitter
> stream with a custom receiver which works fine. I’m stuck with the runtime
> alteration now.
>
>
>
> Any ideas?
>
>
>
> Thanks
>
> Alem
>
>
>
>
> 
>


Hive is unable to avro file written by spark avro

2016-01-13 Thread Siva
Hi Everyone,

Avro data written to HDFS by a DataFrame cannot be read by Hive. I am saving
the data in Avro format with the statement below.

df.save("com.databricks.spark.avro", SaveMode.Append, Map("path" -> path))

I created a Hive Avro external table, and when reading it I see all nulls.
Has anyone faced a similar issue? What is the best way to write data in Avro
format from Spark so that it is also readable by Hive?

Thanks,
Sivakumar Bhavanari.


Reply: spark streaming context trigger invoke stop why?

2016-01-13 Thread Triones,Deng(vip.com)
More info

I am using spark version 1.5.2


From: Triones,Deng(vip.com) [mailto:triones.d...@vipshop.com]
Sent: January 14, 2016 11:24
To: user
Subject: spark streaming context trigger invoke stop why?

Hi all
 According to the driver log, a task failed 4 times in a stage; the stage 
was dropped because the input block was deleted before it could be used. After 
that, the StreamingContext invoked stop. Does anyone know what kind of akka 
message triggers the stop, or which code triggers the shutdown hook?


Thanks




Driver log:

 Job aborted due to stage failure: Task 410 in stage 215.0 failed 4 times
[org.apache.spark.streaming.StreamingContext---Thread-0]: Invoking 
stop(stopGracefully=false) from shutdown hook


spark streaming context trigger invoke stop why?

2016-01-13 Thread Triones,Deng(vip.com)
Hi all
 According to the driver log, a task failed 4 times in a stage; the stage 
was dropped because the input block was deleted before it could be used. After 
that, the StreamingContext invoked stop. Does anyone know what kind of akka 
message triggers the stop, or which code triggers the shutdown hook?


Thanks




Driver log:

 Job aborted due to stage failure: Task 410 in stage 215.0 failed 4 times
[org.apache.spark.streaming.StreamingContext---Thread-0]: Invoking 
stop(stopGracefully=false) from shutdown hook


[Spark Streaming] "Could not compute split, block input-0-1452563923800 not found” when trying to recover from checkpoint data

2016-01-13 Thread Collin Shi
Hi

I was running a simple updateByKey transformation followed by print on data 
received from a socket, on Spark 1.4.0. The first submit went fine, but after 
I killed the job (CTRL + C) and submitted it again, Spark apparently tried to 
recover from the checkpoint data and then the exception occurred. I'm 
wondering why this happens and how to fix it. 

The answers I found on Google are "Set the storage level of input DStream to 
MEMORY_AND_DISK" or "set spark.streaming.stopGracefullyOnShutdown to true", but 
neither works. 

I was using nc -lk  as the data server. Note that the problem always happens 
when I send a few messages and shut down the job immediately. 

The code and driver logs are in the attachments.  



Thanks

Collin


driver logs.md
Description: Binary data


TestRestart.scala
Description: Binary data

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org

Re: Hive is unable to avro file written by spark avro

2016-01-13 Thread Kevin Mellott
Hi Sivakumar,

I have run into this issue in the past, and we were able to fix it by using
an explicit schema when saving the DataFrame to the Avro file. This schema
was an exact match to the one associated with the metadata on the Hive
database table, which allowed the Hive queries to work even after updating
the underlying Avro file via Spark.

We are using Spark 1.3.0, and I was hoping to find a better solution to
this problem once we upgrade to Spark 1.5.0 (we manage versions via CDH).
This one works, but the coding involved can be a little tedious based on
the complexity of your data.

If memory serves correctly, the explicit schema was necessary because our
data structure contained optional nested properties. The DataFrame writer
will automatically create a schema for you, but ours was differing based on
the data being saved (i.e. whether it did or did not contain a nested
element).

- Kevin

On Wed, Jan 13, 2016 at 7:20 PM, Siva  wrote:

> Hi Everyone,
>
> Avro data written by dataframe in hdfs in not able to read by hive. Saving
> data avro format with below statement.
>
> df.save("com.databricks.spark.avro", SaveMode.Append, Map("path" -> path))
>
> Created hive avro external table and while reading I see all nulls. Did
> anyone face similar issue, what is the best way to write the data in avro
> format from spark, so that it can also readable by hive.
>
> Thanks,
> Sivakumar Bhavanari.
>


Re: SQL UDF problem (with re to types)

2016-01-13 Thread Raghu Ganti
So, when I try BigDecimal, it works. But, should it not parse based on what
the UDF defines? Am I missing something here?
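
The cast failure in the quoted report can be reproduced in plain Java,
without Spark. This is only a sketch: DecimalArgDemo and its methods are
made-up names, and the BigDecimal value stands in for how the literal
82.4 reportedly reaches the UDF. It shows why a parameter typed as Double
fails, and one possible way to tolerate either type by accepting Number.

```java
import java.math.BigDecimal;

/** Hypothetical illustration; not Spark's UDF1 interface. */
class DecimalArgDemo {
    // Accepts any boxed numeric type instead of insisting on Double.
    static String describe(Number x) {
        return "testing " + x.doubleValue();
    }

    // Returns true when casting the argument to Double throws ClassCastException.
    static boolean castToDoubleFails(Object arg) {
        try {
            Double d = (Double) arg;
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        Object literal = new BigDecimal("82.4"); // stand-in for the SQL literal
        System.out.println(castToDoubleFails(literal)); // BigDecimal is not a Double
        System.out.println(describe((Number) literal));
    }
}
```

Whether Spark should convert the literal to match the UDF's declared
parameter type is a separate question that this sketch does not answer.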

On Wed, Jan 13, 2016 at 4:57 PM, Ted Yu  wrote:

> Please take a look
> at sql/hive/src/test/java/org/apache/spark/sql/hive/aggregate/MyDoubleSum.java
> which shows a UserDefinedAggregateFunction that works on DoubleType column.
>
> sql/hive/src/test/java/org/apache/spark/sql/hive/JavaDataFrameSuite.java
> shows how it is registered.
>
> Cheers
>
> On Wed, Jan 13, 2016 at 11:58 AM, raghukiran  wrote:
>
>> While registering and using SQL UDFs, I am running into the following
>> problem:
>>
>> UDF registered:
>>
>> ctx.udf().register("Test", new UDF1() {
>> /**
>>  *
>>  */
>> private static final long serialVersionUID =
>> -8231917155671435931L;
>>
>> public String call(Double x) throws Exception {
>> return "testing";
>> }
>> }, DataTypes.StringType);
>>
>> Usage:
>> query = "SELECT Test(82.4)";
>> result = sqlCtx.sql(query).first();
>> System.out.println(result.toString());
>>
>> Problem: Class Cast exception thrown
>> Caused by: java.lang.ClassCastException: java.math.BigDecimal cannot be
>> cast
>> to java.lang.Double
>>
>> This problem occurs with Spark v1.5.2 and 1.6.0.
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/SQL-UDF-problem-with-re-to-types-tp25968.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>>
>>
>


Re: Reply: spark streaming context trigger invoke stop why?

2016-01-13 Thread Yogesh Mahajan
All the action happens in ApplicationMaster, especially in its run method.
Check ApplicationMaster#startUserApplication: the userThread (the driver)
invokes the ApplicationMaster#finish method. You can also try System.exit in
your program.

Regards,
Yogesh Mahajan,
SnappyData Inc, snappydata.io

On Thu, Jan 14, 2016 at 9:56 AM, Yogesh Mahajan 
wrote:

> Hi Triones,
>
> Check the org.apache.spark.util.ShutdownHookManager : It adds this
> ShutDownHook when you start a StreamingContext
>
> Here is the code in StreamingContext.start()
>
> shutdownHookRef = ShutdownHookManager.addShutdownHook(
>   StreamingContext.SHUTDOWN_HOOK_PRIORITY)(stopOnShutdown)
>
> Also looke at the following def in StreamingContext which actually stops
> the context from shutdown hook :
> private def stopOnShutdown(): Unit = {
> val stopGracefully =
> conf.getBoolean("spark.streaming.stopGracefullyOnShutdown", false)
> logInfo(s"Invoking stop(stopGracefully=$stopGracefully) from shutdown hook")
> // Do not stop SparkContext, let its own shutdown hook stop it
> stop(stopSparkContext = false, stopGracefully = stopGracefully)
> }
>
> Regards,
> Yogesh Mahajan,
> SnappyData Inc, snappydata.io
>
> On Thu, Jan 14, 2016 at 8:55 AM, Triones,Deng(vip.com) <
> triones.d...@vipshop.com> wrote:
>
>> More info
>>
>>
>>
>> I am using spark version 1.5.2
>>
>>
>>
>>
>>
>> *From:* Triones,Deng(vip.com) [mailto:triones.d...@vipshop.com]
>> *Sent:* January 14, 2016 11:24
>> *To:* user
>> *Subject:* spark streaming context trigger invoke stop why?
>>
>>
>>
>> Hi all
>>
>>  As I saw the driver log, the task failed 4 times in a stage, the
>> stage will be dropped when the input block was deleted before make use of.
>> After that the StreamingContext invoke stop.  Does anyone know what kind of
>> akka message trigger the stop or which code trigger the shutdown hook?
>>
>>
>>
>>
>>
>> Thanks
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> Driver log:
>>
>>
>>
>>  Job aborted due to stage failure: Task 410 in stage 215.0 failed 4 times
>>
>> [org.apache.spark.streaming.StreamingContext---Thread-0]: Invoking
>> stop(stopGracefully=false) from shutdown hook
>>
>>
>>
>
>


Spark on YARN job continuously reports "Application does not exist in cache"

2016-01-13 Thread Prabhu Joseph
Hi All,

  When we submit Spark jobs on YARN, we see a lot of jobs reporting the error
messages below during RM failover.


*2016-01-11 09:41:06,682 INFO
org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService:
Unregistering app attempt : appattempt_1450676950893_0280_01*
2016-01-11 09:41:06,683 INFO
org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl:
appattempt_1450676950893_0280_01 State change from FINAL_SAVING to
FAILED
2016-01-11 09:41:06,683 INFO
org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl:
application_1450676950893_0280 State change from RUNNING to ACCEPTED
2016-01-11 09:41:06,683 INFO
org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler:
Application appattempt_1450676950893_0280_01 is done. finalState=FAILED
2016-01-11 09:41:06,683 INFO
org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService:
Registering app attempt : appattempt_1450676950893_0280_02
2016-01-11 09:41:06,683 INFO
org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo:
Application application_1450676950893_0280 requests cleared
2016-01-11 09:41:06,683 INFO
org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl:
appattempt_1450676950893_0280_02 State change from NEW to SUBMITTED
2016-01-11 09:41:06,683 INFO
org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncher:
Cleaning master appattempt_1450676950893_0280_01
2016-01-11 09:41:06,683 INFO
org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler:
Added Application Attempt appattempt_1450676950893_0280_02 to scheduler
from user: glenm
2016-01-11 09:41:06,683 INFO
org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl:
appattempt_1450676950893_0280_02 State change from SUBMITTED to
SCHEDULED




*2016-01-11 09:41:06,747 ERROR
org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService:
AppAttemptId doesnt exist in cache appattempt_1450676950893_0280_01*
The ResourceManager has a ConcurrentMap into which it puts the applicationId
when an application attempt is registered. When a finishApplicationMaster
request arrives, it looks up the entry in the ConcurrentMap and, if no entry
is present, logs that ERROR message. When an application attempt is
unregistered, the entry is removed.

So, after the application attempt is unregistered, there are many
finishApplicationMaster requests, which cause the ERROR.
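
The bookkeeping described above can be modelled in a few lines. This is
a sketch of the described behaviour only, not the YARN source: the class
AttemptCache, its method names and the attempt id are all made up for
illustration.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical model of the attempt cache described above; not YARN code. */
class AttemptCache {
    private final ConcurrentMap<String, Boolean> attempts = new ConcurrentHashMap<>();

    // Registering an attempt adds an entry.
    void register(String attemptId) { attempts.put(attemptId, Boolean.TRUE); }

    // Unregistering an attempt removes the entry.
    void unregister(String attemptId) { attempts.remove(attemptId); }

    // A finish request for an attempt that is no longer cached yields the error.
    String finish(String attemptId) {
        if (!attempts.containsKey(attemptId)) {
            return "ERROR AppAttemptId doesnt exist in cache " + attemptId;
        }
        return "OK " + attemptId;
    }

    public static void main(String[] args) {
        AttemptCache cache = new AttemptCache();
        String id = "appattempt_example_01"; // made-up id
        cache.register(id);
        System.out.println(cache.finish(id)); // entry present
        cache.unregister(id);
        System.out.println(cache.finish(id)); // entry gone, error reported
    }
}
```

In this model, any finish request that arrives after the unregister step
hits the error path, which matches the ordering seen in the log above.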

We need your help to understand in what scenario this happens.


Related JIRAs:

https://issues.apache.org/jira/browse/SPARK-1032
https://issues.apache.org/jira/browse/SPARK-3072



Thanks,
Prabhu Joseph


How to get the working directory in executor

2016-01-13 Thread Byron Wang
I am using the following command to submit a Spark job. I want to send a jar
and config files to each executor and load them there.

spark-submit --verbose \
  --files=/tmp/metrics.properties \
  --jars /tmp/datainsights-metrics-source-aembly-1.0.jar \
  --total-executor-cores 4 \
  --conf "spark.metrics.conf=metrics.properties" \
  --conf "spark.executor.extraClassPath=datainsights-metrics-source-assembly-1.0.jar" \
  --class org.microsoft.ofe.datainsights.StartServiceSignalPipeline \
  ./target/datainsights-1.0-jar-with-dependencies.jar

--files and --jars are used to send files to executors. I found that the
files are sent to the working directory of the executor, like
'worker/app-x-/0/

But when the job is running, the executor always throws an exception saying
that it could not find the file 'metrics.properties' or the class contained
in 'datainsights-metrics-source-assembly-1.0.jar'. It seems that the job is
looking for files in a directory other than the working directory.

Do you know how to load a file that has been sent to the executors?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-get-the-working-directory-in-executor-tp25962.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Kafka Streaming and partitioning

2016-01-13 Thread Cody Koeninger
If two rdds have an identical partitioner, joining should not involve a
shuffle.

You should be able to override the partitioner without calling partitionBy.

Two ways I can think of to do this:
- subclass or modify the direct stream and kafkardd.  They're private, so
you'd need to rebuild just the external kafka project, not all of spark

- write a wrapper subclass of rdd that takes a given custom partitioner and
rdd in the constructor, overrides partitioner, and delegates every other
method to the wrapped rdd.  This should be possible without modification to
any existing spark code.  You'd use it something like

val cp = YourCustomPartitioner(...)
val reference = YourReferenceRDD(cp, ...)
val stream = KafkaUtils

stream.transform { rdd =>
  val wrapped = YourWrapper(cp, rdd)
  wrapped.join(reference)
}


I haven't had reason to do either one of those approaches, so YMMV, but I
believe others have
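
The reason an identical partitioner makes the shuffle unnecessary can be
shown without Spark. The sketch below uses made-up names (CoPartitionDemo,
partitionOf, localJoin) and a simple hash-modulo partitioner as an
assumption. When both sides are bucketed by the same function, a key can
only match inside the partition with the same index, so each partition
pair can be joined locally.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical illustration of co-partitioned joins; not Spark code. */
class CoPartitionDemo {
    // Assumed partitioner: non-negative hash modulo the partition count.
    static int partitionOf(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Buckets keys into partitions using the shared partitioner.
    static List<List<String>> partition(List<String> keys, int numPartitions) {
        List<List<String>> parts = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) parts.add(new ArrayList<>());
        for (String k : keys) parts.get(partitionOf(k, numPartitions)).add(k);
        return parts;
    }

    // Joins partition i of the left side only against partition i of the right side.
    static List<String> localJoin(List<List<String>> left, List<List<String>> right) {
        List<String> matches = new ArrayList<>();
        for (int i = 0; i < left.size(); i++) {
            for (String k : left.get(i)) {
                if (right.get(i).contains(k)) matches.add(k);
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        List<List<String>> reference = partition(Arrays.asList("a", "b", "c"), 4);
        List<List<String>> incoming = partition(Arrays.asList("b", "c", "d"), 4);
        // No data moves between partitions, yet every common key is found.
        System.out.println(localJoin(reference, incoming));
    }
}
```

The wrapper RDD approach amounts to telling Spark that this property
already holds for the Kafka data, so correctness depends on the Kafka
producer really using the same partitioning function.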




On Wed, Jan 13, 2016 at 3:40 AM, ddav  wrote:

> Hi,
>
> I have the following use case:
>
> 1. Reference data stored in an RDD that is persisted and partitioned using
> a
> simple custom partitioner.
> 2. Input stream from kafka that uses the same partitioner algorithm as the
> ref data RDD - this partitioning is done in kafka.
>
> I am using kafka direct streams so the number of kafka partitions map to
> the
> number of partitions in the spark RDD. From testing and the documentation I
> see Spark does not know anything about how the data has been partitioned in
> kafka.
>
> In my use case I need to join the reference data RDD and the input stream
> RDD.  Due to the fact I have manually ensured the incoming data from kafka
> uses the same partitioning algorithm I know the data has been grouped
> correctly in the input stream RDD in Spark but I cannot do a join without a
> shuffle step due to the fact Spark has no knowledge of how the data has
> been
> partitioned.
>
> I have two ways to do this.
> 1. Explicitly call PartitionBy(CutomParitioner) on the input stream RDD
> followed by a join. This results in a shuffle of the input stream RDD and
> then the co-partitioned join to take place.
> 2. Call join on the reference data RDD passing in the input stream RDD.
> Spark will do a shuffle under the hood in this case and the join will take
> place. The join will do its best to run on a node that has local access to
> the reference data RDD.
>
> Is there any difference between the 2 methods above or will both cause the
> same sequence of events to take place in Spark?
> Is all I have stated above correct?
>
> Finally, is there any road map feature for looking at allowing the user to
> push a partitioner into an already created RDD and not to do a shuffle.
> Spark in this case trusts that the data is setup correctly (as in the use
> case above) and simply fills in the necessary meta data on the RDD
> partitions i.e. check the first entry in each partition to determine the
> partition number of the data.
>
> Thank you in advance for any help on this issue.
> Dave.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Kafka-Streaming-and-partitioning-tp25955.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
>
>


Spark 1.6 and Application History not working correctly

2016-01-13 Thread Darin McBeath
I tried using Spark 1.6 in a stand-alone cluster this morning.

I submitted 2 jobs (and they both executed fine).  In fact, they are the exact 
same jobs with just some different parameters.

I was able to view the application history for the first job.

However, when I tried to view the second job, I get the following error message.

Application history not found (app-20160113140054-0001)
No event logs found for application SparkSync Application in 
file:///root/spark/applicationHistory. Did you specify the correct logging 
directory?


Everything works fine with Spark 1.5.  I'm able to view the application history 
for both jobs.

Has anyone else noticed this issue?  Any suggestions? 

Thanks.

Darin.




Re: ml.classification.NaiveBayesModel how to reshape theta

2016-01-13 Thread Yanbo Liang
Yes, the number of rows of the theta matrix is the number of classes, and the
number of columns is the number of features.
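
For the reshaping itself, a plain-Java sketch follows. It assumes the
matrix values are available as a flat column-major array, as the Javadoc
mentioned in the question states; ThetaReshape and fromColumnMajor are
made-up names, not part of the Spark API. In column-major order, element
(i, j) is stored at index i + numRows * j.

```java
/** Hypothetical helper; not part of Spark's Matrix API. */
class ThetaReshape {
    // Rebuilds a numRows x numCols array from column-major values.
    static double[][] fromColumnMajor(double[] values, int numRows, int numCols) {
        if (values.length != numRows * numCols) {
            throw new IllegalArgumentException("values length must equal numRows * numCols");
        }
        double[][] out = new double[numRows][numCols];
        for (int j = 0; j < numCols; j++) {
            for (int i = 0; i < numRows; i++) {
                out[i][j] = values[i + numRows * j];
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // 2 classes (rows) x 3 features (columns), stored column by column.
        double[] flat = {1, 2, 3, 4, 5, 6};
        double[][] theta = fromColumnMajor(flat, 2, 3);
        // Row 0 holds the values for class 0: 1.0, 3.0, 5.0
        System.out.println(java.util.Arrays.toString(theta[0]));
    }
}
```

With numRows taken from numClasses() and numCols from numFeatures(), row
i of the result holds the per-feature values for class i.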

2016-01-13 10:47 GMT+08:00 Andy Davidson :

> I am trying to debug my trained model by exploring theta
> Theta is a Matrix. The java Doc for Matrix says that it is column major
> formate
>
> I have trained a NaiveBayesModel. Is the number of classes == to the
> number of rows?
>
> int numRows = nbModel.numClasses();
>
> int numColumns = nbModel.numFeatures();
>
>
> Kind regards
>
>
> Andy
>


How to make Dataset api as fast as DataFrame

2016-01-13 Thread Arkadiusz Bicz
Hi,

I have run some performance tests, repeating the execution with
different numbers of executors and amounts of memory on a YARN-clustered
Spark (version 1.6.0); the cluster contains 6 large nodes.

I found Dataset joinWith or cogroup to be 3 to 5 times slower than a
broadcast join on DataFrames. How can I make it at least similarly fast?

Examples of my code:

DataFrame:
// 500 million rows
val r = results.select("tradeId", "tradeVersion", "values").as("r")
// 100 thousand rows
val t = trades.select("tradeId", "tradeVersion", "agreement").distinct.as("t")

val j = r.join(broadcast(t), r("tradeId") === t("tradeId") &&
r("tradeVersion") === t("tradeVersion"))
val s = j.select(r("tradeId"), t("tradeVersion"), t("agreement"), r("values"))
val g = s.groupBy(t("agreement"))

val maxvec = new MaxVectorAggFunction
val agg = g.agg(maxvec(r("values")).as("maxvalues"))
agg.write.parquet("hdfs:.../tmp/somelocation")

DataSet

case class ResultsA(tradeId: String, tradeVersion: String, resultType: Int,
  values: Array[Double])

case class TradesA(tradeId: String, tradeVersion: String, tradeType: String,
  notional: BigDecimal, currency: String, asset: String, trader: String,
  productCode: String, counterParty: String, counterPartyAccronym: String,
  tradeStatus: String, portfolio: String, internalPortfolio: String,
  ptsBook: String, validFrom: String, validTill: String, tradeDate: String,
  maturity: String, buySellIndicator: String, agreement: String)

case class ResultSmallA(tradeId: String, tradeVersion: String,
  values: Array[Double])
case class ResultAgreementA(tradeId: String, tradeVersion: String,
  agreement: String, values: Array[Double])
case class TradeSmallA(tradeId: String, tradeVersion: String, agreement: String)

lazy val dsresults = results.as[ResultsA].map(r =>
  ResultSmallA(r.tradeId, r.tradeVersion, r.values)).as("r")
lazy val dstrades = trades.as[TradesA].map(t =>
  TradeSmallA(t.tradeId, t.tradeVersion, t.agreement)).distinct.as("t")
lazy val j = dsresults.joinWith(dstrades,
  $"r.tradeId" === $"t.tradeId" && $"r.tradeVersion" === $"t.tradeVersion",
  "inner")

//1. MapGrouped

val group = j.groupBy { v => v match {
    case (r: ResultSmallA, t: TradeSmallA) => t
  }
}

val reduced = group.mapGroups { case (t, iter) =>
  (t.tradeId, t.tradeVersion, t.agreement,
    iter.map { case (r, t) => r.values }.reduce((l, r) => {
      val min = new MinVectorAggFunction(); min.mergeArrays(l, r)
    }))
}

//2. Reduce

val group2 = j.groupBy(_._2)

val reduced2 = group2.reduce((i1, i2) => {
  val r1 = i1._1
  val r2 = i2._1
  import r1._
  val min = new MinVectorAggFunction();
  (ResultSmallA(tradeId, tradeVersion, min.mergeArrays(values, r2.values)),
    i1._2)
})

val reduced = reduced2.map { case (t, (r, _)) =>
  (r.tradeId, r.tradeVersion, t.agreement, r.values) }


//3. Cogroup

val cogrouped1 = dsresults.groupBy(r => (r.tradeId, r.tradeVersion)).cogroup(
    dstrades.groupBy(t => (t.tradeId, t.tradeVersion))) {
  case (key, data1, data2) =>
    if (data2.isEmpty || data1.isEmpty) Iterator()
    else {
      val t = data2.next()
      val min = new MinVectorAggFunction()
      Iterator((t.tradeId, t.tradeVersion, t.agreement,
        data1.map(_.values).reduce(min.mergeArrays)))
    }
}

// MinVectorAggFunction just merges two arrays of Double




Need 'Learning Spark' Partner

2016-01-13 Thread King sami
Hi,

As I'm a beginner in Spark, I'm looking for someone who is also a beginner to
learn and practice Spark with.

Please contact me if you are interested.

Cordially,


Re: How to get the working directory in executor

2016-01-13 Thread Ted Yu
Can you place metrics.properties and
datainsights-metrics-source-assembly-1.0.jar
on HDFS?

Cheers

On Wed, Jan 13, 2016 at 8:01 AM, Byron Wang  wrote:

> I am using the following command to submit Spark job, I hope to send jar
> and
> config files to each executor and load it there
>
> spark-submit --verbose \
> --files=/tmp/metrics.properties \
> --jars /tmp/datainsights-metrics-source-aembly-1.0.jar \
> --total-executor-cores 4\
> --conf "spark.metrics.conf=metrics.properties" \
> --conf
>
> "spark.executor.extraClassPath=datainsights-metrics-source-assembly-1.0.jar"
> \
> --class org.microsoft.ofe.datainsights.StartServiceSignalPipeline \
> ./target/datainsights-1.0-jar-with-dependencies.jar
>
> --files and --jars is used to send files to executors, I found that the
> files are sent to the working directory of executor like
> 'worker/app-x-/0/
>
> But when job is running, the executor always throws exception saying that
> it
> could not find the file 'metrics.properties'or the class which is contained
> in 'datainsights-metrics-source-assembly-1.0.jar'. It seems that the job is
> looking for files under another dir other than working directory.
>
> Do you know how to load the file which is sent to executors?
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-get-the-working-directory-in-executor-tp25962.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
>
>


Re: Spark 1.6 and Application History not working correctly

2016-01-13 Thread Steve Loughran

> On 13 Jan 2016, at 07:15, Darin McBeath  wrote:
> 
> Thanks.
> 
> I already set the following in spark-defaults.conf so I don't think that is 
> going to fix my problem.
> 
> spark.eventLog.dir file:///root/spark/applicationHistory
> spark.eventLog.enabled true
> 
> 
> I suspect my problem must be something else.
> Darin.
> 


Step 1 has to be: is the log file there? 
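
A minimal sketch of that first check, assuming the event log directory is on the local filesystem (as with the file:///root/spark/applicationHistory setting quoted here) and that log entries are named after the application ID, possibly with a suffix such as .inprogress. The object and method names are made up for illustration:

```scala
import java.io.File

// Hypothetical helper: lists entries in the event log directory whose name
// starts with the given application ID.
object EventLogCheck {
  def logsFor(logDir: File, appId: String): List[String] =
    Option(logDir.list())            // null if logDir is missing or not a directory
      .getOrElse(Array.empty[String])
      .filter(_.startsWith(appId))
      .sorted
      .toList
}
```

Calling EventLogCheck.logsFor(new File("/root/spark/applicationHistory"), "app-20160113140054-0001") on the machine that wrote the logs would return an empty list if nothing was written for that application.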

> 
> From: Don Drake 
> To: Darin McBeath  
> Cc: User 
> Sent: Wednesday, January 13, 2016 10:10 AM
> Subject: Re: Spark 1.6 and Application History not working correctly
> 
> 
> 
> I noticed a similar problem going from 1.5.x to 1.6.0 on YARN.
> 
> I resolved it by setting the following command-line parameters:
> 
> spark.eventLog.enabled=true
> 
> spark.eventLog.dir=
> 
> 
> -Don
> 
> 
> On Wed, Jan 13, 2016 at 8:29 AM, Darin McBeath  
> wrote:
> 
> I tried using Spark 1.6 in a stand-alone cluster this morning.
>> 
>> I submitted 2 jobs (and they both executed fine).  In fact, they are the 
>> exact same jobs with just some different parameters.
>> 
>> I was able to view the application history for the first job.
>> 
>> However, when I tried to view the second job, I get the following error 
>> message.
>> 
>> Application history not found (app-20160113140054-0001)
>> No event logs found for application SparkSync Application in 
>> file:///root/spark/applicationHistory. Did you specify the correct logging 
>> directory?
>> 
>> 
>> Everything works fine with Spark 1.5.  I'm able to view the application 
>> history for both jobs.
>> 
>> Has anyone else noticed this issue?  Any suggestions?
>> 
>> Thanks.
>> 
>> Darin.
>> 
>> 
>> 
> 
> 
> -- 
> 
> Donald Drake
> Drake Consulting
> http://www.drakeconsulting.com/
> https://twitter.com/dondrake
> 800-733-2143
> 
> 
> 





distributeBy using advantage of HDFS or RDD partitioning

2016-01-13 Thread Deenar Toraskar
Hi

I have data in HDFS partitioned by a logical key and would like to preserve
the partitioning when creating a dataframe for the same. Is it possible to
create a dataframe that preserves partitioning from HDFS or the underlying
RDD?

Regards
Deenar


Re: Spark 1.6 and Application History not working correctly

2016-01-13 Thread Darin McBeath
Thanks.

I already set the following in spark-defaults.conf so I don't think that is 
going to fix my problem.

spark.eventLog.dir file:///root/spark/applicationHistory
spark.eventLog.enabled true


I suspect my problem must be something else.
Darin.


From: Don Drake 
To: Darin McBeath  
Cc: User 
Sent: Wednesday, January 13, 2016 10:10 AM
Subject: Re: Spark 1.6 and Application History not working correctly



I noticed a similar problem going from 1.5.x to 1.6.0 on YARN.

I resolved it by setting the following command-line parameters:

spark.eventLog.enabled=true

spark.eventLog.dir=


-Don


On Wed, Jan 13, 2016 at 8:29 AM, Darin McBeath  
wrote:

I tried using Spark 1.6 in a stand-alone cluster this morning.
>
>I submitted 2 jobs (and they both executed fine).  In fact, they are the exact 
>same jobs with just some different parameters.
>
>I was able to view the application history for the first job.
>
>However, when I tried to view the second job, I get the following error 
>message.
>
>Application history not found (app-20160113140054-0001)
>No event logs found for application SparkSync Application in 
>file:///root/spark/applicationHistory. Did you specify the correct logging 
>directory?
>
>
>Everything works fine with Spark 1.5.  I'm able to view the application 
>history for both jobs.
>
>Has anyone else noticed this issue?  Any suggestions?
>
>Thanks.
>
>Darin.
>
>
>


-- 

Donald Drake
Drake Consulting
http://www.drakeconsulting.com/
https://twitter.com/dondrake
800-733-2143




Spark Thrift Server 2 problem

2016-01-13 Thread Бобров Виктор
Hi,

I'm trying to connect Tableau to Spark SQL, following this guide:
https://community.tableau.com/docs/DOC-7638.

When I start the Thrift server with

$SPARK_HOME/sbin/start-thriftserver.sh --master spark://data01:7077 --driver-class-path $CLASSPATH --hiveconf hive.server2.thrift.bind.host 127.0.0.1 --hiveconf hive.server2.thrift.port 10001

it starts without any problem, but its state changes to KILLED after about 10 seconds.

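My hive-site.xml:

<configuration>
  <property>
    <name>javax.jdo.option.ConnectionURL</name>
    <value>jdbc:mysql://127.0.0.1:3306/metastore_db?createDatabaseIfNotExist=true</value>
    <description>metadata is stored in a MySQL server</description>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionDriverName</name>
    <value>com.mysql.jdbc.Driver</value>
    <description>MySQL JDBC driver class</description>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionUserName</name>
    <value>hiveuser</value>
    <description>user name for connecting to mysql server</description>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionPassword</name>
    <value>hive123</value>
    <description>password for connecting to mysql server</description>
  </property>
</configuration>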
 

stderr log:

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
16/01/13 17:10:36 INFO CoarseGrainedExecutorBackend: Registered signal handlers for [TERM, HUP, INT]
16/01/13 17:10:37 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/01/13 17:10:37 INFO SecurityManager: Changing view acls to: hduser
16/01/13 17:10:37 INFO SecurityManager: Changing modify acls to: hduser
16/01/13 17:10:37 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(hduser); users with modify permissions: Set(hduser)
16/01/13 17:10:41 WARN ThreadLocalRandom: Failed to generate a seed from SecureRandom within 3 seconds. Not enough entrophy?
16/01/13 17:10:41 INFO SecurityManager: Changing view acls to: hduser
16/01/13 17:10:41 INFO SecurityManager: Changing modify acls to: hduser
16/01/13 17:10:41 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(hduser); users with modify permissions: Set(hduser)
16/01/13 17:10:42 INFO Slf4jLogger: Slf4jLogger started
16/01/13 17:10:42 INFO Remoting: Starting remoting
16/01/13 17:10:42 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkExecutorActorSystem@10.0.10.212:42412]
16/01/13 17:10:42 INFO Utils: Successfully started service 'sparkExecutorActorSystem' on port 42412.
16/01/13 17:10:42 INFO DiskBlockManager: Created local directory at /tmp/spark-f3208756-f443-487a-8b5d-84c78956d47c/executor-b8f6ff53-e4f0-4e1f-a57f-e9275e0c9acb/blockmgr-89d68d63-2df7-4ae4-a451-6091e7e6ef95
16/01/13 17:10:42 INFO MemoryStore: MemoryStore started with capacity 511.5 MB
16/01/13 17:10:42 INFO CoarseGrainedExecutorBackend: Connecting to driver: spark://CoarseGrainedScheduler@10.0.10.212:49076
16/01/13 17:10:42 INFO WorkerWatcher: Connecting to worker spark://Worker@10.0.10.212:41507
16/01/13 17:10:42 INFO CoarseGrainedExecutorBackend: Successfully registered with driver
16/01/13 17:10:42 INFO Executor: Starting executor ID 0 on host localhost
16/01/13 17:10:42 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 54055.
16/01/13 17:10:42 INFO NettyBlockTransferService: Server created on 54055
16/01/13 17:10:42 INFO BlockManagerMaster: Trying to register BlockManager
16/01/13 17:10:42 INFO BlockManagerMaster: Registered BlockManager
16/01/13 17:10:45 INFO CoarseGrainedExecutorBackend: Driver commanded a shutdown
16/01/13 17:10:45 INFO MemoryStore: MemoryStore cleared
16/01/13 17:10:45 INFO BlockManager: BlockManager stopped
16/01/13 17:10:45 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
16/01/13 17:10:45 WARN CoarseGrainedExecutorBackend: An unknown (localhost:49076) driver disconnected.




Re: Spark 1.6 and Application History not working correctly

2016-01-13 Thread Don Drake
I noticed a similar problem going from 1.5.x to 1.6.0 on YARN.

I resolved it by setting the following command-line parameters:

spark.eventLog.enabled=true
spark.eventLog.dir=

-Don

On Wed, Jan 13, 2016 at 8:29 AM, Darin McBeath 
wrote:

> I tried using Spark 1.6 in a stand-alone cluster this morning.
>
> I submitted 2 jobs (and they both executed fine).  In fact, they are the
> exact same jobs with just some different parameters.
>
> I was able to view the application history for the first job.
>
> However, when I tried to view the second job, I get the following error
> message.
>
> Application history not found (app-20160113140054-0001)
> No event logs found for application SparkSync Application in
> file:///root/spark/applicationHistory. Did you specify the correct logging
> directory?
>
>
> Everything works fine with Spark 1.5.  I'm able to view the application
> history for both jobs.
>
> Has anyone else noticed this issue?  Any suggestions?
>
> Thanks.
>
> Darin.
>
>
>


-- 
Donald Drake
Drake Consulting
http://www.drakeconsulting.com/
https://twitter.com/dondrake 
800-733-2143


Re: Kafka Streaming and partitioning

2016-01-13 Thread Dave

Thanks Cody, appreciate the response.

With this pattern the partitioners will now match when the join is 
executed.
However, does the wrapper RDD not need to set the partition metadata on 
the wrapped RDD in order to allow Spark to know where the data for each 
partition resides in the cluster?


Thanks,
Dave.

On 13/01/16 16:21, Cody Koeninger wrote:
If two rdds have an identical partitioner, joining should not involve 
a shuffle.


You should be able to override the partitioner without calling 
partitionBy.


Two ways I can think of to do this:
- subclass or modify the direct stream and kafkardd. They're private, 
so you'd need to rebuild just the external kafka project, not all of spark


- write a wrapper subclass of rdd that takes a given custom 
partitioner and rdd in the constructor, overrides partitioner, and 
delegates every other method to the wrapped rdd.  This should be 
possible without modification to any existing spark code.  You'd use 
it something like


val cp = YourCustomPartitioner(...)
val reference = YourReferenceRDD(cp, ...)
val stream = KafkaUtils

stream.transform { rdd =>
  val wrapped = YourWrapper(cp, rdd)
  wrapped.join(reference)
}


I haven't had reason to do either one of those approaches, so YMMV, 
but I believe others have
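
A minimal sketch of the second approach (the wrapper RDD), written against the public RDD developer API. The class name AssumePartitionedRDD is made up for illustration, and the sketch has not been run against a Kafka direct stream. It only tells Spark which partitioner to assume; if the data is not actually laid out that way, a join would silently give wrong results.

```scala
import scala.reflect.ClassTag
import org.apache.spark.{Partition, Partitioner, TaskContext}
import org.apache.spark.rdd.RDD

// Hypothetical wrapper: reports `part` as its partitioner and delegates
// everything else to the wrapped RDD. No data is moved or checked.
class AssumePartitionedRDD[K: ClassTag, V: ClassTag](
    prev: RDD[(K, V)],
    part: Partitioner)
  extends RDD[(K, V)](prev) {

  // Fail fast if the partition counts cannot possibly line up.
  require(prev.partitions.length == part.numPartitions,
    "wrapped RDD and partitioner must have the same number of partitions")

  override val partitioner: Option[Partitioner] = Some(part)

  // One-to-one with the parent, so the parent's partitions are reused.
  override protected def getPartitions: Array[Partition] = prev.partitions

  override def compute(split: Partition, context: TaskContext): Iterator[(K, V)] =
    prev.iterator(split, context)

  // Keep whatever locality information the wrapped RDD already exposes.
  override protected def getPreferredLocations(split: Partition): Seq[String] =
    prev.preferredLocations(split)
}
```

Inside stream.transform this would be used as new AssumePartitionedRDD(rdd, cp).join(reference), where cp is the same partitioner instance (or an equal one) that the reference RDD was partitioned with.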





On Wed, Jan 13, 2016 at 3:40 AM, ddav wrote:


Hi,

I have the following use case:

1. Reference data stored in an RDD that is persisted and
partitioned using a
simple custom partitioner.
2. Input stream from kafka that uses the same partitioner
algorithm as the
ref data RDD - this partitioning is done in kafka.

I am using kafka direct streams so the number of kafka partitions
map to the
number of partitions in the spark RDD. From testing and the
documentation I
see Spark does not know anything about how the data has been
partitioned in
kafka.

In my use case I need to join the reference data RDD and the input
stream
RDD.  Due to the fact I have manually ensured the incoming data
from kafka
uses the same partitioning algorithm I know the data has been grouped
correctly in the input stream RDD in Spark but I cannot do a join
without a
shuffle step due to the fact Spark has no knowledge of how the
data has been
partitioned.

I have two ways to do this.
1. Explicitly call partitionBy(CustomPartitioner) on the input
stream RDD
followed by a join. This results in a shuffle of the input stream
RDD and
then the co-partitioned join to take place.
2. Call join on the reference data RDD passing in the input stream
RDD.
Spark will do a shuffle under the hood in this case and the join
will take
place. The join will do its best to run on a node that has local
access to
the reference data RDD.

Is there any difference between the 2 methods above or will both
cause the
same sequence of events to take place in Spark?
Is all I have stated above correct?

Finally, is there any road map feature for looking at allowing the
user to
push a partitioner into an already created RDD and not to do a
shuffle.
Spark in this case trusts that the data is setup correctly (as in
the use
case above) and simply fills in the necessary meta data on the RDD
partitions i.e. check the first entry in each partition to
determine the
partition number of the data.

Thank you in advance for any help on this issue.
Dave.



--
View this message in context:

http://apache-spark-user-list.1001560.n3.nabble.com/Kafka-Streaming-and-partitioning-tp25955.html
Sent from the Apache Spark User List mailing list archive at
Nabble.com.








Re: Kafka Streaming and partitioning

2016-01-13 Thread Cody Koeninger
In the case here of a kafkaRDD, the data doesn't reside on the cluster,
it's not cached by default.  If you're running kafka on the same nodes as
spark, then data locality would play a factor, but that should be handled
by the existing getPreferredLocations method.

On Wed, Jan 13, 2016 at 10:46 AM, Dave  wrote:

> Thanks Cody, appreciate the response.
>
> With this pattern the partitioners will now match when the join is
> executed.
> However, does the wrapper RDD not need to set the partition meta data on
> the wrapped RDD in order to allow Spark to know where the data for each
> partition resides in the cluster.
>
> Thanks,
> Dave.
>
>
> On 13/01/16 16:21, Cody Koeninger wrote:
>
> If two rdds have an identical partitioner, joining should not involve a
> shuffle.
>
> You should be able to override the partitioner without calling partitionBy.
>
> Two ways I can think of to do this:
> - subclass or modify the direct stream and kafkardd.  They're private, so
> you'd need to rebuild just the external kafka project, not all of spark
>
> - write a wrapper subclass of rdd that takes a given custom partitioner
> and rdd in the constructor, overrides partitioner, and delegates every
> other method to the wrapped rdd.  This should be possible without
> modification to any existing spark code.  You'd use it something like
>
> val cp = YourCustomPartitioner(...)
> val reference = YourReferenceRDD(cp, ...)
> val stream = KafkaUtils
>
> stream.transform { rdd =>
>   val wrapped = YourWrapper(cp, rdd)
>   wrapped.join(reference)
> }
>
>
> I haven't had reason to do either one of those approaches, so YMMV, but I
> believe others have
>
>
>
>
> On Wed, Jan 13, 2016 at 3:40 AM, ddav  wrote:
>
>> Hi,
>>
>> I have the following use case:
>>
>> 1. Reference data stored in an RDD that is persisted and partitioned
>> using a
>> simple custom partitioner.
>> 2. Input stream from kafka that uses the same partitioner algorithm as the
>> ref data RDD - this partitioning is done in kafka.
>>
>> I am using kafka direct streams so the number of kafka partitions map to
>> the
>> number of partitions in the spark RDD. From testing and the documentation
>> I
>> see Spark does not know anything about how the data has been partitioned
>> in
>> kafka.
>>
>> In my use case I need to join the reference data RDD and the input stream
>> RDD.  Due to the fact I have manually ensured the incoming data from kafka
>> uses the same partitioning algorithm I know the data has been grouped
>> correctly in the input stream RDD in Spark but I cannot do a join without
>> a
>> shuffle step due to the fact Spark has no knowledge of how the data has
>> been
>> partitioned.
>>
>> I have two ways to do this.
>> 1. Explicitly call partitionBy(CustomPartitioner) on the input stream RDD
>> followed by a join. This results in a shuffle of the input stream RDD and
>> then the co-partitioned join to take place.
>> 2. Call join on the reference data RDD passing in the input stream RDD.
>> Spark will do a shuffle under the hood in this case and the join will take
>> place. The join will do its best to run on a node that has local access to
>> the reference data RDD.
>>
>> Is there any difference between the 2 methods above or will both cause the
>> same sequence of events to take place in Spark?
>> Is all I have stated above correct?
>>
>> Finally, is there any road map feature for looking at allowing the user to
>> push a partitioner into an already created RDD and not to do a shuffle.
>> Spark in this case trusts that the data is setup correctly (as in the use
>> case above) and simply fills in the necessary meta data on the RDD
>> partitions i.e. check the first entry in each partition to determine the
>> partition number of the data.
>>
>> Thank you in advance for any help on this issue.
>> Dave.
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Kafka-Streaming-and-partitioning-tp25955.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>>
>>
>
>


Re: distributeBy using advantage of HDFS or RDD partitioning

2016-01-13 Thread Simon Elliston Ball
If you load data using ORC or parquet, the RDD will have a partition per file, 
so in fact your data frame will not directly match the partitioning of the 
table. 

If you want to process partition by partition and guarantee that the partitioning
is preserved, then mapPartitions etc. will be useful. 

Note that if you perform any DataFrame operations which shuffle, you will end 
up implicitly re-partitioning to spark.sql.shuffle.partitions (default 200).

Simon

> On 13 Jan 2016, at 10:09, Deenar Toraskar  wrote:
> 
> Hi
> 
> I have data in HDFS partitioned by a logical key and would like to preserve 
> the partitioning when creating a dataframe for the same. Is it possible to 
> create a dataframe that preserves partitioning from HDFS or the underlying 
> RDD?
> 
> Regards
> Deenar





Re: FPGrowth does not handle large result sets

2016-01-13 Thread Ritu Raj Tiwari
Hi Sean: Thanks for checking out my question here. It's possible I am making a 
newbie error. Based on my dataset of about 200,000 transactions and a minimum 
support level of 0.001, I am looking for items that appear at least 200 times. 
Given that the items in my transactions are drawn from a set of about 25,000 (I 
previously thought 17,000), what would be a rational way to determine the 
(peak) memory needs of my driver node?
-Raj 

On Wednesday, January 13, 2016 1:18 AM, Sean Owen  
wrote:
 

 As I said in your JIRA, the collect() in question is bringing results
back to the driver to return them. The assumption is that there aren't
a vast number of frequent items. If they are, they aren't 'frequent'
and your min support is too low.

On Wed, Jan 13, 2016 at 12:43 AM, Ritu Raj Tiwari
 wrote:
> Folks:
> We are running into a problem where FPGrowth seems to choke on data sets
> that we think are not too large. We have about 200,000 transactions. Each
> transaction is composed of on an average 50 items. There are about 17,000
> unique item (SKUs) that might show up in any transaction.
>
> When running locally with 12G ram given to the PySpark process, the FPGrowth
> code fails with out of memory error for minSupport of 0.001. The failure
> occurs when we try to enumerate and save the frequent itemsets. Looking at
> the FPGrowth code
> (https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/fpm/FPGrowth.scala),
> it seems this is because the genFreqItems() method tries to collect() all
> items. Is there a way the code could be rewritten so it does not try to
> collect and therefore store all frequent item sets in memory?
>
> Thanks for any insights.
>
> -Raj




   

Re: FPGrowth does not handle large result sets

2016-01-13 Thread Sean Owen
You're looking for subsets of items that appear in at least 200 of
200,000 transactions, which could be a whole lot. Keep in mind there
are 25,000 items, sure, but already over 300 million possible pairs of
items, and trillions of possible 3-item subsets. This sounds like it's
just far too low. Start with 0.1 and work down. I don't think there's
a general formula, since if each transaction contained just 1 item, no
sets of more than one item would be frequent, and if every transaction
had every item, then all sets would be frequent and that number is
indescribably large.
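
As a worked check of those orders of magnitude, here is a small sketch that counts unordered k-item subsets of n items exactly (the object and method names are made up for illustration). For pairs the result is about half of 25,000 squared, because the order of the two items does not matter:

```scala
// Illustrative helper: exact binomial coefficient C(n, k) using BigInt.
object ItemsetCounts {
  /** Number of k-item subsets that can be drawn from n distinct items. */
  def choose(n: Int, k: Int): BigInt =
    // After step i the accumulator equals C(n - k + i, i), so every division is exact.
    (1 to k).foldLeft(BigInt(1)) { (acc, i) => acc * (n - k + i) / i }
}
```

ItemsetCounts.choose(25000, 2) is 312,487,500 and ItemsetCounts.choose(25000, 3) is 2,603,854,175,000, so even a small fraction of the candidate 3-item sets clearing the 200-transaction threshold would be a very large result to collect on the driver.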

On Wed, Jan 13, 2016 at 4:32 PM, Ritu Raj Tiwari
 wrote:
> Hi Sean:
> Thanks for checking out my question here. Its possible I am making a newbie
> error. Based on my dataset of about 200,000 transactions and a minimum
> support level of 0.001, I am looking for items that appear at least 200
> times. Given that the items in my transactions are drawn from a set of about
> 25,000 (I previously thought 17,000), what would be a rational way to
> determine the (peak) memory needs of my driver node?
>
> -Raj
>
>
> On Wednesday, January 13, 2016 1:18 AM, Sean Owen 
> wrote:
>
>
> As I said in your JIRA, the collect() in question is bringing results
> back to the driver to return them. The assumption is that there aren't
> a vast number of frequent items. If they are, they aren't 'frequent'
> and your min support is too low.
>
> On Wed, Jan 13, 2016 at 12:43 AM, Ritu Raj Tiwari
>  wrote:
>> Folks:
>> We are running into a problem where FPGrowth seems to choke on data sets
>> that we think are not too large. We have about 200,000 transactions. Each
>> transaction is composed of on an average 50 items. There are about 17,000
>> unique item (SKUs) that might show up in any transaction.
>>
>> When running locally with 12G ram given to the PySpark process, the
>> FPGrowth
>> code fails with out of memory error for minSupport of 0.001. The failure
>> occurs when we try to enumerate and save the frequent itemsets. Looking at
>> the FPGrowth code
>>
>> (https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/fpm/FPGrowth.scala),
>> it seems this is because the genFreqItems() method tries to collect() all
>> items. Is there a way the code could be rewritten so it does not try to
>> collect and therefore store all frequent item sets in memory?
>>
>> Thanks for any insights.
>>
>> -Raj
>
>
>
>
>
>




Running window functions in spark dataframe

2016-01-13 Thread rakesh sharma
Hi all

I am getting a HiveContext error when trying to run window functions, such as 
an OVER clause with ordering. Any help on how to go about this would be
appreciated. I am running Spark locally.



-- Forwarded message --
From: "King sami"
Date: Wed, Jan 13, 2016 at 7:20 AM -0800
Subject: Need 'Learning Spark' Partner
To: "user@spark.apache.org"


Hi,

As I'm a beginner in Spark, I'm looking for someone who's also a beginner to learn 
and train on Spark together.

Please contact me if interested

Cordially,


Re: How to get the working directory in executor

2016-01-13 Thread Ted Yu
In a bit more detail:
You upload the files using 'hdfs dfs -copyFromLocal' command
Then specify hdfs location of the files on the command line.

Cheers

On Wed, Jan 13, 2016 at 8:05 AM, Ted Yu  wrote:

> Can you place metrics.properties and 
> datainsights-metrics-source-assembly-1.0.jar
> on hdfs ?
>
> Cheers
>
> On Wed, Jan 13, 2016 at 8:01 AM, Byron Wang  wrote:
>
>> I am using the following command to submit Spark job, I hope to send jar
>> and
>> config files to each executor and load it there
>>
>> spark-submit --verbose \
>> --files=/tmp/metrics.properties \
>> --jars /tmp/datainsights-metrics-source-aembly-1.0.jar \
>> --total-executor-cores 4\
>> --conf "spark.metrics.conf=metrics.properties" \
>> --conf
>>
>> "spark.executor.extraClassPath=datainsights-metrics-source-assembly-1.0.jar"
>> \
>> --class org.microsoft.ofe.datainsights.StartServiceSignalPipeline \
>> ./target/datainsights-1.0-jar-with-dependencies.jar
>>
>> --files and --jars is used to send files to executors, I found that the
>> files are sent to the working directory of executor like
>> 'worker/app-x-/0/
>>
>> But when job is running, the executor always throws exception saying that
>> it
>> could not find the file 'metrics.properties'or the class which is
>> contained
>> in 'datainsights-metrics-source-assembly-1.0.jar'. It seems that the job
>> is
>> looking for files under another dir other than working directory.
>>
>> Do you know how to load the file which is sent to executors?
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-get-the-working-directory-in-executor-tp25962.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>>
>>
>


Re: PCA OutOfMemoryError

2016-01-13 Thread Alex Gittens
The PCA.fit function calls the RowMatrix PCA routine, which attempts to
construct the covariance matrix locally on the driver, and then computes
the SVD of that to get the PCs. I'm not sure what's causing the memory
error: RowMatrix.scala:124 is only using 3.5 GB of memory (n*(n+1)/2 with
n=29604 and double precision), so unless you're filling up the memory with
other RDDs, you should have plenty of space on the driver.
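
The arithmetic behind those figures can be checked with a short sketch (object and method names are made up for illustration). It also suggests one plausible reading of the stack trace quoted further down: the failure happens in ByteArrayOutputStream.grow while a closure is being serialized, and a single JVM byte array cannot hold more than about Int.MaxValue bytes (roughly 2.1 GB), so a 3.5 GB array could not be serialized that way regardless of heap size. That reading is an assumption and has not been verified against the Spark source.

```scala
// Illustrative arithmetic only; not part of Spark's API.
object GramianMemory {
  private val BytesPerDouble = 8L

  /** Bytes for the packed upper triangle: n*(n+1)/2 doubles. */
  def packedBytes(n: Long): Long = n * (n + 1) / 2 * BytesPerDouble

  /** Bytes for the full dense n-by-n matrix of doubles. */
  def denseBytes(n: Long): Long = n * n * BytesPerDouble
}
```

For n = 29604, packedBytes gives 3,505,705,680 bytes (the 3.5 GB mentioned above) and denseBytes gives 7,011,174,528 bytes, which matches the 7011 megabytes in the RowMatrix warning. Both exceed Int.MaxValue = 2,147,483,647.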

One alternative is to manually center your RDD (so make one pass over it to
compute the mean, then another to subtract it out and form a new RDD), then
directly call the computeSVD routine in RowMatrix to compute the SVD of the
gramian matrix of this RDD (i.e., the covariance matrix of the original
RDD, up to a constant scale factor) in a distributed manner, so the
covariance matrix doesn't need to be
formed explicitly. You can look at the getLowRankFactorization and
convertLowRankFactorizationToEOFs routines at
https://github.com/rustandruin/large-scale-climate/blob/master/src/main/scala/eofs.scala
for example of this approach (call the second on the results of the first
to get the SVD of the input matrix to the first; EOF is another name for
PCA).

This takes about 30 minutes to compute the top 20 PCs of a 46.7K-by-6.3M
dense matrix of doubles (~2 TB), with most of the time spent on the
distributed matrix-vector multiplies.

Best,
Alex


On Tue, Jan 12, 2016 at 6:39 PM, Bharath Ravi Kumar 
wrote:

> Any suggestion/opinion?
> On 12-Jan-2016 2:06 pm, "Bharath Ravi Kumar"  wrote:
>
>> We're running PCA (selecting 100 principal components) on a dataset that
>> has ~29K columns and is 70G in size stored in ~600 parts on HDFS. The
>> matrix in question is mostly sparse with tens of columns populated in most
>> rows, but a few rows with thousands of columns populated. We're running
>> spark on mesos with driver memory set to 40G and executor memory set to
>> 80G. We're however encountering an out of memory error (included at the end
>> of the message) regardless of the number of rdd partitions or the degree of
>> task parallelism being set. I noticed a warning at the beginning of the PCA
>> computation stage: " WARN
>> org.apache.spark.mllib.linalg.distributed.RowMatrix: 29604 columns will
>> require at least 7011 megabyte  of memory!"
>> I don't understand which memory this refers to. Is this the executor
>> memory?  The driver memory? Any other?
>> The stacktrace appears to indicate that a large array is probably being
>> passed along with the task. Could this array have been passed as a
>> broadcast variable instead ? Any suggestions / workarounds other than
>> re-implementing the algorithm?
>>
>> Thanks,
>> Bharath
>>
>> 
>>
>> Exception in thread "main" java.lang.OutOfMemoryError: Requested array
>> size exceeds VM limit
>> at java.util.Arrays.copyOf(Arrays.java:2271)
>> at
>> java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113)
>> at
>> java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
>> at
>> java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140)
>> at
>> java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876)
>> at
>> java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1785)
>> at
>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1188)
>> at
>> java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
>> at
>> org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44)
>> at
>> org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:84)
>> at
>> org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)
>> at
>> org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
>> at
>> org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
>> at org.apache.spark.SparkContext.clean(SparkContext.scala:2030)
>> at
>> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:703)
>> at
>> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:702)
>> at
>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
>> at
>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
>> at org.apache.spark.rdd.RDD.withScope(RDD.scala:306)
>> at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:702)
>> at
>> org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1.apply(RDD.scala:1100)
>> at
>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
>> at
>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
>> at org.apache.spark.rdd.RDD.withScope(RDD.scala:306)
>> at 

Re: Spark SQL UDF with Struct input parameters

2016-01-13 Thread Deenar Toraskar
I have raised a JIRA to cover this
https://issues.apache.org/jira/browse/SPARK-12809

On 13 January 2016 at 16:05, Deenar Toraskar <
deenar.toras...@thinkreactive.co.uk> wrote:

> Frank
>
> Sorry got my wires crossed, I had come across another issue. Now I
> remember this issue I got around this splitting the structure into 2 arrays
> and then zipping them in the UDF. So
>
> def effectiveExpectedExposure(expectedExposures: Seq[(Seq[Float],
> Seq[Float])])=expectedExposures.map(x=> x._1 *
> x._2).sum/expectedExposures.map(x=>x._1).sum
>
>
> became
>
>
> def expectedPositiveExposureSeq(expectedExposures: Seq[Float],
> timeIntervals : Seq[Float])= timeIntervals.zip(expectedExposures).map(x=>
> (x._1 * x._2)).sum/timeIntervals.sum
>
> Deenar
>
>
>
>
>
> *Think Reactive Ltd*
> deenar.toras...@thinkreactive.co.uk
> 07714140812
>
>
>
> On 13 January 2016 at 15:42, Rosner, Frank (Allianz SE) <
> frank.ros...@allianz.com> wrote:
>
>> The problem is that I cannot use a UDF that has a structtype as input
>> (which seems to be the same problem that you were facing). Which key and
>> value are you talking about? They are both Seq[Float] in your example.
>>
>>
>>
>> In my example when I try to call a udf that takes a struct type I get
>>
>>
>>
>> cannot resolve 'UDF(myColumn)' due to data type mismatch: argument 1
>> requires array> type, however, 'myColumn' is of
>> array> type.
>>
>>
>>
>> When I then created a case class instead of using a tuple (so that the
>> fields have the correct names rather than _1) it compiles. But when I execute
>> it, the data cannot be cast to the case class, because obviously the data
>> does not contain the case class.
>>
>>
>>
>> How would rewriting collect as a Spark UDAF help there?
>>
>>
>>
>> Thanks for your quick response!
>>
>> Frank
>>
>>
>>
>> *From:* Deenar Toraskar [mailto:deenar.toras...@thinkreactive.co.uk]
>> *Sent:* Mittwoch, 13. Januar 2016 15:56
>> *To:* Rosner, Frank (Allianz SE)
>> *Subject:* Re: Spark SQL UDF with Struct input parameters
>>
>>
>>
>> Frank
>>
>>
>>
>> I did not find a solution; as a workaround I made both the key and value
>> the same data type. I am going to rewrite collect as a Spark UDAF
>> when I have some spare time. You may want to do this if this is a
>> showstopper for you.
>>
>>
>>
>> Regards
>>
>> Deenar
>>
>>
>>
>>
>> *Think Reactive Ltd*
>>
>> deenar.toras...@thinkreactive.co.uk
>>
>> 07714140812
>>
>>
>>
>>
>>
>> On 13 January 2016 at 13:50, Rosner, Frank (Allianz SE) <
>> frank.ros...@allianz.com> wrote:
>>
>> Hey!
>>
>> Did you solve the issue? I am facing the same issue and cannot find a
>> solution.
>>
>> Thanks
>> Frank
>>
>> Hi
>>
>>
>>
>> I am trying to define an UDF that can take an array of tuples as input
>>
>>
>>
>> def effectiveExpectedExposure(expectedExposures: Seq[(Seq[Float], Seq[Float])]) =
>>   expectedExposures.map(x => x._1 * x._2).sum / expectedExposures.map(x => x._1).sum
>>
>> sqlContext.udf.register("expectedPositiveExposure", expectedPositiveExposure _)
>>
>>
>>
>> I get the following message when I try calling this function, where
>> noOfMonths and ee are both floats
>>
>> val df = sqlContext.sql(s"select (collect(struct(noOfMonths, ee))) as eee from netExposureCpty where counterparty = 'xyz'")
>> df.registerTempTable("test")
>> sqlContext.sql("select effectiveExpectedExposure(eee)  from test")
>>
>> Error in SQL statement: AnalysisException: cannot resolve 'UDF(eee)' due to
>> data type mismatch: argument 1 requires array> type, however, 'eee' is of
>> array> type.; line 1 pos 33
>>
>>
>>
>> Deenar
>>
>>
>>
>>
>>
>>
>>
>
>
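
The workaround described in this thread (pass two parallel arrays instead of an array of structs, then zip them inside the function) can be sketched in plain Scala as below. This is only an illustration under stated assumptions: the object name ExposureExample and the length check are additions for the example, and no Spark classes are involved.

```scala
// Hypothetical standalone sketch of the "two arrays, zipped" workaround.
// ExposureExample and the require() check are illustrative additions.
object ExposureExample {

  // Time-weighted average exposure: sum(t_i * e_i) / sum(t_i).
  def expectedPositiveExposureSeq(expectedExposures: Seq[Float],
                                  timeIntervals: Seq[Float]): Float = {
    // zip silently truncates to the shorter input, so fail fast on a mismatch.
    require(expectedExposures.length == timeIntervals.length,
      "expectedExposures and timeIntervals must have the same length")
    val weighted = timeIntervals.zip(expectedExposures).map { case (t, e) => t * e }.sum
    weighted / timeIntervals.sum
  }
}
```

Because both parameters are plain Seq[Float], a function of this shape avoids the struct-typed argument that triggered the AnalysisException above; it would presumably be registered the same way as in the original post, with sqlContext.udf.register.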


Re: Kafka Streaming and partitioning

2016-01-13 Thread Dave

So for case 1 below
> - subclass or modify the direct stream and kafkardd.  They're private,
> so you'd need to rebuild just the external kafka project, not all of spark
When the data is read from Kafka it will be partitioned correctly with
the custom partitioner passed in to the new direct stream and Kafka RDD
implementations.


For case 2
> - write a wrapper subclass of rdd that takes a given custom partitioner
> and rdd in the constructor, overrides partitioner, and delegates every
> other method to the wrapped rdd.  This should be possible without
> modification to any existing spark code.  You'd use it something like
Am I correct in saying that the data from Kafka will not be read into
memory in the cluster (the Kafka server is not located on the Spark cluster
in my use case) until the following code is executed?

> stream.transform { rdd =>
>   val wrapped = YourWrapper(cp, rdd)
>   wrapped.join(reference)
> }
In which case it will run through the partitioner of the wrapped RDD
when it arrives in the cluster for the first time, i.e. no shuffle.


Thanks,
Dave.



On 13/01/16 17:00, Cody Koeninger wrote:
In the case here of a kafkaRDD, the data doesn't reside on the 
cluster, it's not cached by default.  If you're running kafka on the 
same nodes as spark, then data locality would play a factor, but that 
should be handled by the existing getPreferredLocations method.


On Wed, Jan 13, 2016 at 10:46 AM, Dave wrote:


Thanks Cody, appreciate the response.

With this pattern the partitioners will now match when the join is
executed.
However, does the wrapper RDD not need to set the partition metadata
on the wrapped RDD in order to allow Spark to know where the
data for each partition resides in the cluster?

Thanks,
Dave.


On 13/01/16 16:21, Cody Koeninger wrote:

If two rdds have an identical partitioner, joining should not
involve a shuffle.

You should be able to override the partitioner without calling
partitionBy.

Two ways I can think of to do this:
- subclass or modify the direct stream and kafkardd.  They're
private, so you'd need to rebuild just the external kafka
project, not all of spark

- write a wrapper subclass of rdd that takes a given custom
partitioner and rdd in the constructor, overrides partitioner,
and delegates every other method to the wrapped rdd.  This should
be possible without modification to any existing spark code. 
You'd use it something like


val cp = YourCustomPartitioner(...)
val reference = YourReferenceRDD(cp, ...)
val stream = KafkaUtils

stream.transform { rdd =>
  val wrapped = YourWrapper(cp, rdd)
  wrapped.join(reference)
}


I haven't had reason to do either one of those approaches, so
YMMV, but I believe others have




On Wed, Jan 13, 2016 at 3:40 AM, ddav wrote:

Hi,

I have the following use case:

1. Reference data stored in an RDD that is persisted and
partitioned using a
simple custom partitioner.
2. Input stream from kafka that uses the same partitioner
algorithm as the
ref data RDD - this partitioning is done in kafka.

I am using kafka direct streams so the number of kafka
partitions map to the
number of partitions in the spark RDD. From testing and the
documentation I
see Spark does not know anything about how the data has been
partitioned in
kafka.

In my use case I need to join the reference data RDD and the
input stream
RDD.  Due to the fact I have manually ensured the incoming
data from kafka
uses the same partitioning algorithm I know the data has been
grouped
correctly in the input stream RDD in Spark but I cannot do a
join without a
shuffle step due to the fact Spark has no knowledge of how
the data has been
partitioned.

I have two ways to do this.
1. Explicitly call partitionBy(CustomPartitioner) on the input
stream RDD followed by a join. This results in a shuffle of the input
stream RDD, after which the co-partitioned join takes place.
2. Call join on the reference data RDD passing in the input
stream RDD.
Spark will do a shuffle under the hood in this case and the
join will take
place. The join will do its best to run on a node that has
local access to
the reference data RDD.

Is there any difference between the 2 methods above or will
both cause the
same sequence of events to take place in Spark?
Is all I have stated above correct?

Finally, is there any road map feature for looking at
allowing the user to
push a partitioner into an already 

yarn-client: SparkSubmitDriverBootstrapper not found in yarn client mode (1.6.0)

2016-01-13 Thread Lin Zhao
My job runs fine in yarn cluster mode but I have reason to use client mode 
instead. But I'm hitting this error when submitting:
> spark-submit --class com.exabeam.martini.scripts.SparkStreamingTest --master 
> yarn --deploy-mode client --executor-memory 90G --num-executors 3  
> --executor-cores 14 Martini-assembly-0.1.jar yarn-client

Error: Could not find or load main class 
org.apache.spark.deploy.SparkSubmitDriverBootstrapper

If I change deploy-mode to cluster the job is submitted successfully. Is 
there a dependency missing from my project? Right now the only one I included is 
spark-streaming 1.6.0.


Re: yarn-client: SparkSubmitDriverBootstrapper not found in yarn client mode (1.6.0)

2016-01-13 Thread Ted Yu
Can you show the complete stack trace for the error ?

I searched 1.6.0 code base but didn't find the
class SparkSubmitDriverBootstrapper

Thanks

On Wed, Jan 13, 2016 at 9:31 AM, Lin Zhao  wrote:

> My job runs fine in yarn cluster mode but I have reason to use client mode
> instead. But I'm hitting this error when submitting:
> > spark-submit --class com.exabeam.martini.scripts.SparkStreamingTest
> --master yarn --deploy-mode client --executor-memory 90G --num-executors 3
> --executor-cores 14 Martini-assembly-0.1.jar yarn-client
>
> Error: Could not find or load main class
> org.apache.spark.deploy.SparkSubmitDriverBootstrapper
>
>  If I replace deploy-mode to cluster the job is submitted successfully. Is
> there a dependency missing from my project? Right now only one I included
> is spark-streaming 1.6.0.
>


Re: Is it possible to use SparkSQL JDBC ThriftServer without Hive

2016-01-13 Thread Antonio Piccolboni
Thriftserver creates a HiveContext hence the hive libs, but you don't need
to have Hive running at all. Depending on what you mean by "without Hive"
that could be a positive or negative answer to your question. You need the
dependencies, but you don't need a running instance.

On Wed, Jan 13, 2016 at 3:37 AM angela.whelan 
wrote:

> hi,
> I'm wondering if it is possible to use the SparkSQL JDBC ThriftServer
> without Hive?
>
> The reason I'm asking is that we are unsure about the speed of Hive with
> SparkSQL JDBC connectivity.
>
> I can't find any article online about using SparkSQL JDBC ThriftServer
> without Hive.
>
> Many thanks in advance for any help on this.
>
> Thanks, Angela
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Is-it-possible-to-use-SparkSQL-JDBC-ThriftServer-without-Hive-tp25959.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: yarn-client: SparkSubmitDriverBootstrapper not found in yarn client mode (1.6.0)

2016-01-13 Thread Jeff Zhang
I also didn't find SparkSubmitDriverBootstrapper. Which version of Spark
are you using?

On Wed, Jan 13, 2016 at 9:36 AM, Ted Yu  wrote:

> Can you show the complete stack trace for the error ?
>
> I searched 1.6.0 code base but didn't find the
> class SparkSubmitDriverBootstrapper
>
> Thanks
>
> On Wed, Jan 13, 2016 at 9:31 AM, Lin Zhao  wrote:
>
>> My job runs fine in yarn cluster mode but I have reason to use client
>> mode instead. But I'm hitting this error when submitting:
>> > spark-submit --class com.exabeam.martini.scripts.SparkStreamingTest
>> --master yarn --deploy-mode client --executor-memory 90G --num-executors 3
>> --executor-cores 14 Martini-assembly-0.1.jar yarn-client
>>
>> Error: Could not find or load main class
>> org.apache.spark.deploy.SparkSubmitDriverBootstrapper
>>
>>  If I replace deploy-mode to cluster the job is submitted successfully.
>> Is there a dependency missing from my project? Right now only one I
>> included is spark-streaming 1.6.0.
>>
>
>


-- 
Best Regards

Jeff Zhang


Re: ml.classification.NaiveBayesModel how to reshape theta

2016-01-13 Thread Andy Davidson
Thanks

Andy



From:  Yanbo Liang 
Date:  Wednesday, January 13, 2016 at 6:29 AM
To:  Andrew Davidson 
Cc:  "user @spark" 
Subject:  Re: ml.classification.NaiveBayesModel how to reshape theta

> Yep, the number of rows of the Matrix theta is the number of classes and the
> number of columns is the number of features.
> 
> 2016-01-13 10:47 GMT+08:00 Andy Davidson :
>> I am trying to debug my trained model by exploring theta.
>> Theta is a Matrix. The Javadoc for Matrix says that it is in column-major
>> format.
>> 
>> I have trained a NaiveBayesModel. Is the number of classes equal to the number
>> of rows? 
>> 
>> int numRows = nbModel.numClasses();
>> 
>> int numColumns = nbModel.numFeatures();
>> 
>> 
>> 
>> Kind regards
>> 
>> 
>> 
>> Andy
> 
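
Since theta is stored in column-major order with one row per class and one column per feature, reshaping a flat value array into per-class rows is a matter of index arithmetic. The sketch below is plain Scala and does not touch Spark; the object name ColumnMajorExample is hypothetical, and the flat array is assumed to be what a column-major matrix exposes (for example via the Matrix toArray method).

```scala
// Hypothetical helper for reading a column-major flat array.
// ASSUMPTION: `values` holds numRows * numCols entries, column by column.
object ColumnMajorExample {

  // Element (row, col): column `col` starts at offset col * numRows.
  def at(values: Array[Double], numRows: Int, numCols: Int, row: Int, col: Int): Double = {
    require(values.length == numRows * numCols, "array size must be numRows * numCols")
    require(row >= 0 && row < numRows && col >= 0 && col < numCols, "index out of range")
    values(row + col * numRows)
  }

  // Reshape into one array per row (for theta: one array of feature values per class).
  def rows(values: Array[Double], numRows: Int, numCols: Int): Array[Array[Double]] = {
    require(values.length == numRows * numCols, "array size must be numRows * numCols")
    Array.tabulate(numRows, numCols)((r, c) => values(r + c * numRows))
  }
}
```

For a trained model, numRows would be nbModel.numClasses() and numCols would be nbModel.numFeatures(), matching the answer quoted above.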




Re: yarn-client: SparkSubmitDriverBootstrapper not found in yarn client mode (1.6.0)

2016-01-13 Thread Marcelo Vanzin
SparkSubmitDriverBootstrapper was removed back in Spark 1.4, so it
seems you have a mixed bag of 1.3 / 1.6 in your path / classpath and
things are failing because of that.

On Wed, Jan 13, 2016 at 9:31 AM, Lin Zhao  wrote:
> My job runs fine in yarn cluster mode but I have reason to use client mode
> instead. But I'm hitting this error when submitting:
>> spark-submit --class com.exabeam.martini.scripts.SparkStreamingTest
>> --master yarn --deploy-mode client --executor-memory 90G --num-executors 3
>> --executor-cores 14 Martini-assembly-0.1.jar yarn-client
>
> Error: Could not find or load main class
> org.apache.spark.deploy.SparkSubmitDriverBootstrapper
>
>
>  If I replace deploy-mode to cluster the job is submitted successfully. Is
> there a dependency missing from my project? Right now only one I included is
> spark-streaming 1.6.0.



-- 
Marcelo
