Re: Hive context datanucleus error

2015-03-23 Thread Udit Mehta
Has this issue been fixed in Spark 1.2:
https://issues.apache.org/jira/browse/SPARK-2624

On Mon, Mar 23, 2015 at 9:19 PM, Udit Mehta  wrote:

> I am trying to run a simple query to view tables in my hive metastore
> using hive context.
> I am getting this error:
> spark Persistence process has been specified to use a *ClassLoader
> Resolve* of name "datanucleus" yet this has not been found by the
> DataNucleus plugin mechanism. Please check your CLASSPATH and plugin
> specification.
> 
>
> I am able to access the metastore using the spark-sql.
> Can someone point out what the issue could be?
>
> thanks
>


Difference in PCA of MLlib vs H2O in R

2015-03-23 Thread roni
I am trying to compute PCA using computePrincipalComponents.
I also computed PCA using H2O in R and with R's prcomp. The answers I get from
H2O and R's prcomp (non-H2O) are the same when I set the options for H2O to
standardized = FALSE and for R's prcomp to center = FALSE.

How do I make sure that the settings for MLlib PCA are the same as the ones I am
using for H2O or prcomp?

Thanks
Roni
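
For reference, a minimal sketch of the MLlib side of this comparison, assuming the
input is a plain text file of comma-separated doubles (the path and the number of
components are placeholders):

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.distributed.RowMatrix

// Build a RowMatrix from a hypothetical CSV of doubles and take the top 2 components.
val rows = sc.textFile("/path/to/data.csv")
  .map(_.split(",").map(_.toDouble))
  .map(v => Vectors.dense(v))
val mat = new RowMatrix(rows)
val pc = mat.computePrincipalComponents(2)   // principal components as a local Matrix
val projected = mat.multiply(pc)             // rows projected onto those components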


RE: Use pig load function in spark

2015-03-23 Thread Dai, Kevin
Hi, Yin

But our data is stored in customized sequence files which can be read by our
customized load function in Pig.

And I want to reuse these load functions in Spark to read the data and turn it
into RDDs.

Best Regards,
Kevin.

From: Yin Huai [mailto:yh...@databricks.com]
Sent: 2015年3月24日 11:53
To: Dai, Kevin
Cc: Paul Brown; user@spark.apache.org
Subject: Re: Use pig load function in spark

Hello Kevin,

You can take a look at our generic load function.

For example, you can use

val df = sqlContext.load("/myData", "parquet")

to load a parquet dataset stored in "/myData" as a DataFrame.

You can use it to load data stored in various formats, like json (Spark
built-in), parquet (Spark built-in), avro, and csv.

Thanks,

Yin

On Mon, Mar 23, 2015 at 7:14 PM, Dai, Kevin 
mailto:yun...@ebay.com>> wrote:
Hi, Paul

You are right.

The story is that we have a lot of Pig load functions to load our different data.

And now we want to use spark to read and process these data.

So we want to figure out a way to reuse our existing load functions in Spark to
read this data.

Any idea?

Best Regards,
Kevin.

From: Paul Brown [mailto:p...@mult.ifario.us]
Sent: 2015年3月24日 4:11
To: Dai, Kevin
Subject: Re: Use pig load function in spark


The answer is "Maybe, but you probably don't want to do that.".

A typical Pig load function is devoted to bridging external data into Pig's 
type system, but you don't really need to do that in Spark because it is 
(thankfully) not encumbered by Pig's type system.  What you probably want to do 
is to figure out a way to use native Spark facilities (e.g., textFile) coupled 
with some of the logic out of your Pig load function necessary to turn your 
external data into an RDD.


—
p...@mult.ifario.us | Multifarious, Inc. | 
http://mult.ifario.us/
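
A rough sketch of the approach Paul describes, assuming (hypothetically) a
SequenceFile with Text values and a parseRecord function standing in for whatever
the Pig LoadFunc currently does:

import org.apache.hadoop.io.{LongWritable, Text}

// Placeholder for the parsing logic currently inside the Pig load function.
def parseRecord(line: String): Array[String] = line.split('\t')

val raw = sc.sequenceFile("hdfs:///path/to/data", classOf[LongWritable], classOf[Text])
val records = raw.map { case (_, value) => parseRecord(value.toString) }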

On Mon, Mar 23, 2015 at 2:29 AM, Dai, Kevin 
mailto:yun...@ebay.com>> wrote:
Hi, all

Can spark use pig’s load function to load data?

Best Regards,
Kevin.




Re: Is it possible to use json4s 3.2.11 with Spark 1.3.0?

2015-03-23 Thread Alexey Zinoviev
Thanks Marcelo, these options solved the problem (I'm using 1.3.0), but it
works only if I remove "extends Logging" from the object; with "extends
Logging" it returns:

Exception in thread "main" java.lang.LinkageError: loader constraint
violation in interface itable initialization: when resolving method
"App1$.logInfo(Lscala/Function0;Ljava/lang/Throwable;)V" the class loader
(instance of org/apache/spark/util/ChildFirstURLClassLoader) of the current
class, App1$, and the class loader (instance of
sun/misc/Launcher$AppClassLoader) for interface org/apache/spark/Logging
have different Class objects for the type scala/Function0 used in the
signature
at App1.main(App1.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569)
at
org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166)
at
org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Do you have any idea what's wrong with Logging?

PS: I'm running it with spark-1.3.0/bin/spark-submit --class App1 --conf
spark.driver.userClassPathFirst=true --conf
spark.executor.userClassPathFirst=true
$HOME/projects/sparkapp/target/scala-2.10/sparkapp-assembly-1.0.jar

Thanks,
Alexey


On Tue, Mar 24, 2015 at 5:03 AM, Marcelo Vanzin  wrote:

> You could build a far jar for your application containing both your
> code and the json4s library, and then run Spark with these two
> options:
>
>   spark.driver.userClassPathFirst=true
>   spark.executor.userClassPathFirst=true
>
> Both only work in 1.3. (1.2 has spark.files.userClassPathFirst, but
> that only works for executors.)
>
>
> On Mon, Mar 23, 2015 at 2:12 PM, Alexey Zinoviev
>  wrote:
> > Spark has a dependency on json4s 3.2.10, but this version has several
> bugs
> > and I need to use 3.2.11. I added json4s-native 3.2.11 dependency to
> > build.sbt and everything compiled fine. But when I spark-submit my JAR it
> > provides me with 3.2.10.
> >
> >
> > build.sbt
> >
> > import sbt.Keys._
> >
> > name := "sparkapp"
> >
> > version := "1.0"
> >
> > scalaVersion := "2.10.4"
> >
> > libraryDependencies += "org.apache.spark" %% "spark-core"  % "1.3.0" %
> > "provided"
> >
> > libraryDependencies += "org.json4s" %% "json4s-native" % "3.2.11"`
> >
> >
> > plugins.sbt
> >
> > logLevel := Level.Warn
> >
> > resolvers += Resolver.url("artifactory",
> > url("http://scalasbt.artifactoryonline.com/scalasbt/sbt-plugin-releases
> "))(Resolver.ivyStylePatterns)
> >
> > addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.13.0")
> >
> >
> > App1.scala
> >
> > import org.apache.spark.SparkConf
> > import org.apache.spark.rdd.RDD
> > import org.apache.spark.{Logging, SparkConf, SparkContext}
> > import org.apache.spark.SparkContext._
> >
> > object App1 extends Logging {
> >   def main(args: Array[String]) = {
> > val conf = new SparkConf().setAppName("App1")
> > val sc = new SparkContext(conf)
> > println(s"json4s version: ${org.json4s.BuildInfo.version.toString}")
> >   }
> > }
> >
> >
> >
> > sbt 0.13.7, sbt-assembly 0.13.0, Scala 2.10.4
> >
> > Is it possible to force 3.2.11 version usage?
> >
> > Thanks,
> > Alexey
>
>
>
> --
> Marcelo
>


Re: Is it possible to use json4s 3.2.11 with Spark 1.3.0?

2015-03-23 Thread Alexey Zinoviev
Thanks Ted, I'll try, hope there's no transitive dependencies on 3.2.10.

On Tue, Mar 24, 2015 at 4:21 AM, Ted Yu  wrote:

> Looking at core/pom.xml :
> <dependency>
>   <groupId>org.json4s</groupId>
>   <artifactId>json4s-jackson_${scala.binary.version}</artifactId>
>   <version>3.2.10</version>
> </dependency>
>
> The version is hard coded.
>
> You can rebuild Spark 1.3.0 with json4s 3.2.11
>
> Cheers
>
> On Mon, Mar 23, 2015 at 2:12 PM, Alexey Zinoviev <
> alexey.zinov...@gmail.com> wrote:
>
>> Spark has a dependency on json4s 3.2.10, but this version has several
>> bugs and I need to use 3.2.11. I added json4s-native 3.2.11 dependency to
>> build.sbt and everything compiled fine. But when I spark-submit my JAR it
>> provides me with 3.2.10.
>>
>>
>> build.sbt
>>
>> import sbt.Keys._
>>
>> name := "sparkapp"
>>
>> version := "1.0"
>>
>> scalaVersion := "2.10.4"
>>
>> libraryDependencies += "org.apache.spark" %% "spark-core"  % "1.3.0" %
>> "provided"
>>
>> libraryDependencies += "org.json4s" %% "json4s-native" % "3.2.11"`
>>
>>
>> plugins.sbt
>>
>> logLevel := Level.Warn
>>
>> resolvers += Resolver.url("artifactory", url("
>> http://scalasbt.artifactoryonline.com/scalasbt/sbt-plugin-releases
>> "))(Resolver.ivyStylePatterns)
>>
>> addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.13.0")
>>
>>
>> App1.scala
>>
>> import org.apache.spark.SparkConf
>> import org.apache.spark.rdd.RDD
>> import org.apache.spark.{Logging, SparkConf, SparkContext}
>> import org.apache.spark.SparkContext._
>>
>> object App1 extends Logging {
>>   def main(args: Array[String]) = {
>> val conf = new SparkConf().setAppName("App1")
>> val sc = new SparkContext(conf)
>> println(s"json4s version: ${org.json4s.BuildInfo.version.toString}")
>>   }
>> }
>>
>>
>>
>> sbt 0.13.7, sbt-assembly 0.13.0, Scala 2.10.4
>>
>> Is it possible to force 3.2.11 version usage?
>>
>> Thanks,
>> Alexey
>>
>
>


Hive context datanucleus error

2015-03-23 Thread Udit Mehta
I am trying to run a simple query to view tables in my hive metastore using
hive context.
I am getting this error:
spark Persistence process has been specified to use a *ClassLoader Resolve* of
name "datanucleus" yet this has not been found by the DataNucleus plugin
mechanism. Please check your CLASSPATH and plugin specification.


I am able to access the metastore using the spark-sql.
Can someone point out what the issue could be?

thanks


Re: Date and decimal datatype not working

2015-03-23 Thread Yin Huai
To store to a CSV file, you can use the Spark-CSV library.
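
For plain delimited output without an extra library, one hedged option (the query
and output path are placeholders) is to format each Row yourself and use
saveAsTextFile:

val result = sqlContext.sql("SELECT * FROM TABLE_A")   // your query here
result.map(_.mkString("|")).saveAsTextFile("/Myhome/SPARK/files/output_a")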

On Mon, Mar 23, 2015 at 5:35 PM, BASAK, ANANDA  wrote:

>  Thanks. This worked well as per your suggestions. I had to run following:
>
> val TABLE_A =
> sc.textFile("/Myhome/SPARK/files/table_a_file.txt").map(_.split("|")).map(p
> => ROW_A(p(0).trim.toLong, p(1), p(2).trim.toInt, p(3), BigDecimal(p(4)),
> BigDecimal(p(5)), BigDecimal(p(6
>
>
>
> Now I am stuck at another step. I have run a SQL query, where I am
> Selecting from all the fields with some where clause , TSTAMP filtered with
> date range and order by TSTAMP clause. That is running fine.
>
>
>
> Then I am trying to store the output in a CSV file. I am using
> saveAsTextFile(“filename”) function. But it is giving error. Can you please
> help me to write a proper syntax to store output in a CSV file?
>
>
>
>
>
> Thanks & Regards
>
> ---
>
> Ananda Basak
>
> Ph: 425-213-7092
>
>
>
> *From:* BASAK, ANANDA
> *Sent:* Tuesday, March 17, 2015 3:08 PM
> *To:* Yin Huai
> *Cc:* user@spark.apache.org
> *Subject:* RE: Date and decimal datatype not working
>
>
>
> Ok, thanks for the suggestions. Let me try and will confirm all.
>
>
>
> Regards
>
> Ananda
>
>
>
> *From:* Yin Huai [mailto:yh...@databricks.com]
> *Sent:* Tuesday, March 17, 2015 3:04 PM
> *To:* BASAK, ANANDA
> *Cc:* user@spark.apache.org
> *Subject:* Re: Date and decimal datatype not working
>
>
>
> p(0) is a String. So, you need to explicitly convert it to a Long. e.g.
> p(0).trim.toLong. You also need to do it for p(2). For those BigDecimals
> value, you need to create BigDecimal objects from your String values.
>
>
>
> On Tue, Mar 17, 2015 at 5:55 PM, BASAK, ANANDA  wrote:
>
>   Hi All,
>
> I am very new in Spark world. Just started some test coding from last
> week. I am using spark-1.2.1-bin-hadoop2.4 and scala coding.
>
> I am having issues while using Date and decimal data types. Following is
> my code that I am simply running on scala prompt. I am trying to define a
> table and point that to my flat file containing raw data (pipe delimited
> format). Once that is done, I will run some SQL queries and put the output
> data in to another flat file with pipe delimited format.
>
>
>
> ***
>
> val sqlContext = new org.apache.spark.sql.SQLContext(sc)
>
> import sqlContext.createSchemaRDD
>
>
>
>
>
> // Define row and table
>
> case class ROW_A(
>
>   TSTAMP:   Long,
>
>   USIDAN: String,
>
>   SECNT:Int,
>
>   SECT:   String,
>
>   BLOCK_NUM:BigDecimal,
>
>   BLOCK_DEN:BigDecimal,
>
>   BLOCK_PCT:BigDecimal)
>
>
>
> val TABLE_A =
> sc.textFile("/Myhome/SPARK/files/table_a_file.txt").map(_.split("|")).map(p
> => ROW_A(p(0), p(1), p(2), p(3), p(4), p(5), p(6)))
>
>
>
> TABLE_A.registerTempTable("TABLE_A")
>
>
>
> ***
>
>
>
> The second last command is giving error, like following:
>
> :17: error: type mismatch;
>
> found   : String
>
> required: Long
>
>
>
> Looks like the content from my flat file are considered as String always
> and not as Date or decimal. How can I make Spark to take them as Date or
> decimal types?
>
>
>
> Regards
>
> Ananda
>
>
>


Re: Use pig load function in spark

2015-03-23 Thread Yin Huai
Hello Kevin,

You can take a look at our generic load function.

For example, you can use

val df = sqlContext.load("/myData", "parquet")

to load a parquet dataset stored in "/myData" as a DataFrame.

You can use it to load data stored in various formats, like json (Spark
built-in), parquet (Spark built-in), avro, and csv.

Thanks,

Yin
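
A couple of hedged variations on the same call (the paths are placeholders; the
external source names assume the spark-csv / spark-avro packages are on the
classpath):

val jsonDF = sqlContext.load("/myData.json", "json")                     // built-in source
val csvDF  = sqlContext.load("/myData.csv", "com.databricks.spark.csv")  // external package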

On Mon, Mar 23, 2015 at 7:14 PM, Dai, Kevin  wrote:

>  Hi, Paul
>
>
>
> You are right.
>
>
>
> The story is that we have a lot of pig load function to load our different
> data.
>
>
>
> And now we want to use spark to read and process these data.
>
>
>
> So we want to figure out a way to reuse our existing load function in
> spark to read these data.
>
>
>
> Any idea?
>
>
>
> Best Regards,
>
> Kevin.
>
>
>
> *From:* Paul Brown [mailto:p...@mult.ifario.us]
> *Sent:* 2015年3月24日 4:11
> *To:* Dai, Kevin
> *Subject:* Re: Use pig load function in spark
>
>
>
>
>
> The answer is "Maybe, but you probably don't want to do that.".
>
>
>
> A typical Pig load function is devoted to bridging external data into
> Pig's type system, but you don't really need to do that in Spark because it
> is (thankfully) not encumbered by Pig's type system.  What you probably
> want to do is to figure out a way to use native Spark facilities (e.g.,
> textFile) coupled with some of the logic out of your Pig load function
> necessary to turn your external data into an RDD.
>
>
>
>
>   —
> p...@mult.ifario.us | Multifarious, Inc. | http://mult.ifario.us/
>
>
>
> On Mon, Mar 23, 2015 at 2:29 AM, Dai, Kevin  wrote:
>
> Hi, all
>
>
>
> Can spark use pig’s load function to load data?
>
>
>
> Best Regards,
>
> Kevin.
>
>
>


how to cache table with OFF_HEAP storage level in SparkSQL thriftserver

2015-03-23 Thread LiuZeshan
hi all:

I have a Spark on YARN cluster (spark-1.3.0, hadoop-2.2.0) with hive-0.12.0 and
tachyon-0.6.1. I start the SparkSQL thriftserver with start-thriftserver.sh and use
beeline to connect to the thriftserver, following the Spark documentation.


My question is: how do I cache a table with a specific storage level, such as
OFF_HEAP, in this setup?


I have dug through the Spark documentation and the spark-user mailing list, and
did not find an answer.


If I run `cache table TABLENAME` at the beeline prompt, the monitoring UI shows
the RDD cached at the default storage level (MEMORY_ONLY), which is not what I
want.

Thanks
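
One hedged sketch, from application code rather than beeline, assuming the table is
visible to the SQLContext: persist the table's DataFrame with an explicit storage
level instead of relying on CACHE TABLE's default.

import org.apache.spark.storage.StorageLevel

val df = sqlContext.table("TABLENAME")   // TABLENAME is a placeholder
df.persist(StorageLevel.OFF_HEAP)
df.count()                               // materialize the cache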



Re: newbie question - spark with mesos

2015-03-23 Thread Anirudha Jadhav
My bad there, I was using the correct link for the docs. The spark shell runs
correctly, and the framework is registered fine on Mesos.

Is there some setting I am missing? This is my spark-env.sh:

export MESOS_NATIVE_LIBRARY=/usr/local/lib/libmesos.so
export SPARK_EXECUTOR_URI=http://100.125.5.93/sparkx.tgz
export SPARK_LOCAL_IP=127.0.0.1



Here is what I see on the slave node.

less
20150226-160708-78932-5050-8971-S0/frameworks/20150323-205508-78932-5050-29804-0012/executors/20150226-160708-78932-5050-8971-S0/runs/cceea834-c4d9-49d6-a579-8352f1889b56/stderr
>>>>>

WARNING: Logging before InitGoogleLogging() is written to STDERR
I0324 02:30:29.389225 27755 fetcher.cpp:76] Fetching URI '
http://100.125.5.93/sparkx.tgz'
I0324 02:30:29.389361 27755 fetcher.cpp:126] Downloading '
http://100.125.5.93/sparkx.tgz' to
'/tmp/mesos/slaves/20150226-160708-78932-5050-8971-S0/frameworks/20150323-205508-78932-5050-29804-0012/executors/20150226-160708-78932-5050-8971-S0/runs/cceea834-c4d9-49d6-a579-8352f1889b56/sparkx.tgz'
I0324 02:30:35.353446 27755 fetcher.cpp:64] Extracted resource
'/tmp/mesos/slaves/20150226-160708-78932-5050-8971-S0/frameworks/20150323-205508-78932-5050-29804-0012/executors/20150226-160708-78932-5050-8971-S0/runs/cceea834-c4d9-49d6-a579-8352f1889b56/sparkx.tgz'
into
'/tmp/mesos/slaves/20150226-160708-78932-5050-8971-S0/frameworks/20150323-205508-78932-5050-29804-0012/executors/20150226-160708-78932-5050-8971-S0/runs/cceea834-c4d9-49d6-a579-8352f1889b56'
Spark assembly has been built with Hive, including Datanucleus jars on
classpath
Using Spark's default log4j profile:
org/apache/spark/log4j-defaults.properties
15/03/24 02:30:37 INFO MesosExecutorBackend: Registered signal handlers for
[TERM, HUP, INT]
I0324 02:30:37.071077 27863 exec.cpp:132] Version: 0.21.1
I0324 02:30:37.080971 27885 exec.cpp:206] Executor registered on slave
20150226-160708-78932-5050-8971-S0
15/03/24 02:30:37 INFO MesosExecutorBackend: Registered with Mesos as
executor ID 20150226-160708-78932-5050-8971-S0 with 1 cpus
15/03/24 02:30:37 INFO SecurityManager: Changing view acls to: ubuntu
15/03/24 02:30:37 INFO SecurityManager: Changing modify acls to: ubuntu
15/03/24 02:30:37 INFO SecurityManager: SecurityManager: authentication
disabled; ui acls disabled; users with view permissions: Set(ubuntu); users
with modify permissions: Set(ubuntu)
15/03/24 02:30:37 INFO Slf4jLogger: Slf4jLogger started
15/03/24 02:30:37 INFO Remoting: Starting remoting
15/03/24 02:30:38 INFO Remoting: Remoting started; listening on addresses
:[akka.tcp://sparkexecu...@mesos-si2.dny1.bcpc.bloomberg.com:50542]
15/03/24 02:30:38 INFO Utils: Successfully started service 'sparkExecutor'
on port 50542.
15/03/24 02:30:38 INFO AkkaUtils: Connecting to MapOutputTracker:
akka.tcp://sparkDriver@localhost:51849/user/MapOutputTracker
15/03/24 02:30:38 WARN Remoting: Tried to associate with unreachable remote
address [akka.tcp://sparkDriver@localhost:51849]. Address is now gated for
5000 ms, all messages to this address will be delivered to dead letters.
Reason: Connection refused: localhost/127.0.0.1:51849
akka.actor.ActorNotFound: Actor not found for:
ActorSelection[Anchor(akka.tcp://sparkDriver@localhost:51849/),
Path(/user/MapOutputTracker)]
at
akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:65)
at
akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:63)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
at
akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:67)
at
akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:82)
at
akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
at
akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
at
scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
at
akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:58)
at
akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.unbatchedExecute(Future.scala:74)
at
akka.dispatch.BatchingExecutor$class.execute(BatchingExecutor.scala:110)
at
akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.execute(Future.scala:73)
at
scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
at
scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:267)
at akka.actor.EmptyLocalActorRef.specialHandle(ActorRef.scala:508)
at akka.actor.DeadLetterActorRef.specialHandle(ActorRef.scala:541)
at akka.actor.DeadLetterActorRef.$bang(ActorRef.scala:531)
at
akka.remote.RemoteActorRefProv

RE: Use pig load function in spark

2015-03-23 Thread Dai, Kevin
Hi, Paul

You are right.

The story is that we have a lot of Pig load functions to load our different data.

And now we want to use spark to read and process these data.

So we want to figure out a way to reuse our existing load functions in Spark to
read this data.

Any idea?

Best Regards,
Kevin.

From: Paul Brown [mailto:p...@mult.ifario.us]
Sent: 2015年3月24日 4:11
To: Dai, Kevin
Subject: Re: Use pig load function in spark


The answer is "Maybe, but you probably don't want to do that.".

A typical Pig load function is devoted to bridging external data into Pig's 
type system, but you don't really need to do that in Spark because it is 
(thankfully) not encumbered by Pig's type system.  What you probably want to do 
is to figure out a way to use native Spark facilities (e.g., textFile) coupled 
with some of the logic out of your Pig load function necessary to turn your 
external data into an RDD.


—
p...@mult.ifario.us | Multifarious, Inc. | 
http://mult.ifario.us/

On Mon, Mar 23, 2015 at 2:29 AM, Dai, Kevin 
mailto:yun...@ebay.com>> wrote:
Hi, all

Can spark use pig’s load function to load data?

Best Regards,
Kevin.



Re: Invalid ContainerId ... Caused by: java.lang.NumberFormatException: For input string: "e04"

2015-03-23 Thread Marcelo Vanzin
This happens most probably because the Spark 1.3 you have downloaded
is built against an older version of the Hadoop libraries than those
used by CDH, and those libraries cannot parse the container IDs
generated by CDH.

You can try to work around this by manually adding CDH jars to the
front of the classpath by setting "spark.driver.extraClassPath" and
"spark.executor.extraClassPath" to "/usr/lib/hadoop/client/*" (or the
respective location if you're using parcels).
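
For example, in spark-defaults.conf (or via --conf on spark-submit), using the
path Marcelo mentions (adjust it for your layout):

spark.driver.extraClassPath    /usr/lib/hadoop/client/*
spark.executor.extraClassPath  /usr/lib/hadoop/client/*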


On Mon, Mar 23, 2015 at 6:32 PM, Manoj Samel  wrote:
> Spark 1.3, CDH 5.3.2, Kerberos
>
> Setup works fine with base configuration, spark-shell can be used in yarn
> client mode etc.
>
> When work recovery feature is enabled via
> http://www.cloudera.com/content/cloudera/en/documentation/core/latest/topics/admin_ha_yarn_work_preserving_recovery.html,
> the spark-shell fails with following log
>
> 15/03/24 01:20:16 ERROR yarn.ApplicationMaster: Uncaught exception:
> java.lang.IllegalArgumentException: Invalid ContainerId:
> container_e04_1427159778706_0002_01_01
> at
> org.apache.hadoop.yarn.util.ConverterUtils.toContainerId(ConverterUtils.java:182)
> at
> org.apache.spark.deploy.yarn.YarnRMClient.getAttemptId(YarnRMClient.scala:93)
> at
> org.apache.spark.deploy.yarn.ApplicationMaster.run(ApplicationMaster.scala:83)
> at
> org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$main$1.apply$mcV$sp(ApplicationMaster.scala:576)
> at
> org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:60)
> at
> org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:59)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
> at
> org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:59)
> at
> org.apache.spark.deploy.yarn.ApplicationMaster$.main(ApplicationMaster.scala:574)
> at
> org.apache.spark.deploy.yarn.ExecutorLauncher$.main(ApplicationMaster.scala:597)
> at
> org.apache.spark.deploy.yarn.ExecutorLauncher.main(ApplicationMaster.scala)
> Caused by: java.lang.NumberFormatException: For input string: "e04"
> at
> java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
> at java.lang.Long.parseLong(Long.java:589)
> at java.lang.Long.parseLong(Long.java:631)
> at
> org.apache.hadoop.yarn.util.ConverterUtils.toApplicationAttemptId(ConverterUtils.java:137)
> at
> org.apache.hadoop.yarn.util.ConverterUtils.toContainerId(ConverterUtils.java:177)
> ... 12 more
> 15/03/24 01:20:16 INFO yarn.ApplicationMaster: Final app status: FAILED,
> exitCode: 10, (reason: Uncaught exception: Invalid ContainerId:
> container_e04_1427159778706_0002_01_01)
>
>



-- 
Marcelo

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Is it possible to use json4s 3.2.11 with Spark 1.3.0?

2015-03-23 Thread Marcelo Vanzin
You could build a far jar for your application containing both your
code and the json4s library, and then run Spark with these two
options:

  spark.driver.userClassPathFirst=true
  spark.executor.userClassPathFirst=true

Both only work in 1.3. (1.2 has spark.files.userClassPathFirst, but
that only works for executors.)


On Mon, Mar 23, 2015 at 2:12 PM, Alexey Zinoviev
 wrote:
> Spark has a dependency on json4s 3.2.10, but this version has several bugs
> and I need to use 3.2.11. I added json4s-native 3.2.11 dependency to
> build.sbt and everything compiled fine. But when I spark-submit my JAR it
> provides me with 3.2.10.
>
>
> build.sbt
>
> import sbt.Keys._
>
> name := "sparkapp"
>
> version := "1.0"
>
> scalaVersion := "2.10.4"
>
> libraryDependencies += "org.apache.spark" %% "spark-core"  % "1.3.0" %
> "provided"
>
> libraryDependencies += "org.json4s" %% "json4s-native" % "3.2.11"`
>
>
> plugins.sbt
>
> logLevel := Level.Warn
>
> resolvers += Resolver.url("artifactory",
> url("http://scalasbt.artifactoryonline.com/scalasbt/sbt-plugin-releases";))(Resolver.ivyStylePatterns)
>
> addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.13.0")
>
>
> App1.scala
>
> import org.apache.spark.SparkConf
> import org.apache.spark.rdd.RDD
> import org.apache.spark.{Logging, SparkConf, SparkContext}
> import org.apache.spark.SparkContext._
>
> object App1 extends Logging {
>   def main(args: Array[String]) = {
> val conf = new SparkConf().setAppName("App1")
> val sc = new SparkContext(conf)
> println(s"json4s version: ${org.json4s.BuildInfo.version.toString}")
>   }
> }
>
>
>
> sbt 0.13.7, sbt-assembly 0.13.0, Scala 2.10.4
>
> Is it possible to force 3.2.11 version usage?
>
> Thanks,
> Alexey



-- 
Marcelo

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Invalid ContainerId ... Caused by: java.lang.NumberFormatException: For input string: "e04"

2015-03-23 Thread Manoj Samel
Spark 1.3, CDH 5.3.2, Kerberos

Setup works fine with base configuration, spark-shell can be used in yarn
client mode etc.

When work recovery feature is enabled via
http://www.cloudera.com/content/cloudera/en/documentation/core/latest/topics/admin_ha_yarn_work_preserving_recovery.html,
the spark-shell fails with following log

15/03/24 01:20:16 ERROR yarn.ApplicationMaster: Uncaught exception:
java.lang.IllegalArgumentException: Invalid ContainerId:
container_e04_1427159778706_0002_01_01
at
org.apache.hadoop.yarn.util.ConverterUtils.toContainerId(ConverterUtils.java:182)
at
org.apache.spark.deploy.yarn.YarnRMClient.getAttemptId(YarnRMClient.scala:93)
at
org.apache.spark.deploy.yarn.ApplicationMaster.run(ApplicationMaster.scala:83)
at
org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$main$1.apply$mcV$sp(ApplicationMaster.scala:576)
at
org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:60)
at
org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:59)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
at
org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:59)
at
org.apache.spark.deploy.yarn.ApplicationMaster$.main(ApplicationMaster.scala:574)
at
org.apache.spark.deploy.yarn.ExecutorLauncher$.main(ApplicationMaster.scala:597)
at
org.apache.spark.deploy.yarn.ExecutorLauncher.main(ApplicationMaster.scala)
Caused by: java.lang.NumberFormatException: For input string: "e04"
at
java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
at java.lang.Long.parseLong(Long.java:589)
at java.lang.Long.parseLong(Long.java:631)
at
org.apache.hadoop.yarn.util.ConverterUtils.toApplicationAttemptId(ConverterUtils.java:137)
at
org.apache.hadoop.yarn.util.ConverterUtils.toContainerId(ConverterUtils.java:177)
... 12 more
15/03/24 01:20:16 INFO yarn.ApplicationMaster: Final app status: FAILED,
exitCode: 10, (reason: Uncaught exception: Invalid ContainerId:
container_e04_1427159778706_0002_01_01)


Re: RE: Spark SQL udf(ScalaUdf) is very slow

2015-03-23 Thread zzcclp
Hi, Cheng Hao, thank you for your reply. 


I created an issue, https://issues.apache.org/jira/browse/SPARK-6483, for this.








-- Original --
From:  "Cheng, Hao";;
Date:  Mon, Mar 23, 2015 10:08 PM
To:  "zzcclp" <441586...@qq.com>; 
"user@spark.apache.org"; 

Subject:  RE: Spark SQL udf(ScalaUdf) is very slow



  
This is a very interesting issue. The root reason for the lower performance is
probably that, in a Scala UDF, Spark SQL converts the data type from the internal
representation to the Scala representation via Scala reflection, recursively.

Can you create a JIRA issue for tracking this? I can start to work on the
improvement soon.

From: zzcclp [mailto:441586...@qq.com] 
 Sent: Monday, March 23, 2015 5:10 PM
 To: user@spark.apache.org
 Subject: Spark SQL udf(ScalaUdf) is very slow
 
 
 
My test env:
1. Spark version is 1.3.0
2. 3 nodes, each with 80G / 20 cores
3. read 250G parquet files from hdfs

Test cases:

1. register "floor" func with command:
     sqlContext.udf.register("floor", (ts: Int) => ts - ts % 300)
   then run with sql "select chan, floor(ts) as tt, sum(size) from qlogbase3
   group by chan, floor(ts)"; it takes 17 minutes.

   == Physical Plan ==
   Aggregate false, [chan#23015,PartialGroup#23500], [chan#23015,PartialGroup#23500 AS tt#23494,CombineSum(PartialSum#23499L) AS c2#23495L]
    Exchange (HashPartitioning [chan#23015,PartialGroup#23500], 54)
     Aggregate true, [chan#23015,scalaUDF(ts#23016)], [chan#23015,scalaUDF(ts#23016) AS PartialGroup#23500,SUM(size#23023L) AS PartialSum#23499L]
      PhysicalRDD [chan#23015,ts#23016,size#23023L], MapPartitionsRDD[115] at map at newParquet.scala:562

2. run with sql "select chan, (ts - ts % 300) as tt, sum(size) from qlogbase3
   group by chan, (ts - ts % 300)"; it takes only 5 minutes.

   == Physical Plan ==
   Aggregate false, [chan#23015,PartialGroup#23349], [chan#23015,PartialGroup#23349 AS tt#23343,CombineSum(PartialSum#23348L) AS c2#23344L]
    Exchange (HashPartitioning [chan#23015,PartialGroup#23349], 54)
     Aggregate true, [chan#23015,(ts#23016 - (ts#23016 % 300))], [chan#23015,(ts#23016 - (ts#23016 % 300)) AS PartialGroup#23349,SUM(size#23023L) AS PartialSum#23348L]
      PhysicalRDD [chan#23015,ts#23016,size#23023L], MapPartitionsRDD[83] at map at newParquet.scala:562

3. use HiveContext with sql "select chan, floor((ts - ts % 300)) as tt, sum(size)
   from qlogbase3 group by chan, floor((ts - ts % 300))"; it takes only 5 minutes too.

   == Physical Plan ==
   Aggregate false, [chan#23015,PartialGroup#23108L], [chan#23015,PartialGroup#23108L AS tt#23102L,CombineSum(PartialSum#23107L) AS _c2#23103L]
    Exchange (HashPartitioning [chan#23015,PartialGroup#23108L], 54)
     Aggregate true, [chan#23015,HiveGenericUdf#org.apache.hadoop.hive.ql.udf.generic.GenericUDFFloor((ts#23016 - (ts#23016 % 300)))], [chan#23015,HiveGenericUdf#org.apache.hadoop.hive.ql.udf.generic.GenericUDFFloor((ts#23016 - (ts#23016 % 300))) AS PartialGroup#23108L,SUM(size#23023L) AS PartialSum#23107L]
      PhysicalRDD [chan#23015,ts#23016,size#23023L], MapPartitionsRDD[28] at map at newParquet.scala:562

Why? ScalaUdf is so slow?? How to improve it?
  
 
 
View this message in context:  Spark SQL udf(ScalaUdf) is very slow
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: Is it possible to use json4s 3.2.11 with Spark 1.3.0?

2015-03-23 Thread Ted Yu
Looking at core/pom.xml :

<dependency>
  <groupId>org.json4s</groupId>
  <artifactId>json4s-jackson_${scala.binary.version}</artifactId>
  <version>3.2.10</version>
</dependency>

The version is hard coded.

You can rebuild Spark 1.3.0 with json4s 3.2.11

Cheers

On Mon, Mar 23, 2015 at 2:12 PM, Alexey Zinoviev 
wrote:

> Spark has a dependency on json4s 3.2.10, but this version has several bugs
> and I need to use 3.2.11. I added json4s-native 3.2.11 dependency to
> build.sbt and everything compiled fine. But when I spark-submit my JAR it
> provides me with 3.2.10.
>
>
> build.sbt
>
> import sbt.Keys._
>
> name := "sparkapp"
>
> version := "1.0"
>
> scalaVersion := "2.10.4"
>
> libraryDependencies += "org.apache.spark" %% "spark-core"  % "1.3.0" %
> "provided"
>
> libraryDependencies += "org.json4s" %% "json4s-native" % "3.2.11"`
>
>
> plugins.sbt
>
> logLevel := Level.Warn
>
> resolvers += Resolver.url("artifactory", url("
> http://scalasbt.artifactoryonline.com/scalasbt/sbt-plugin-releases
> "))(Resolver.ivyStylePatterns)
>
> addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.13.0")
>
>
> App1.scala
>
> import org.apache.spark.SparkConf
> import org.apache.spark.rdd.RDD
> import org.apache.spark.{Logging, SparkConf, SparkContext}
> import org.apache.spark.SparkContext._
>
> object App1 extends Logging {
>   def main(args: Array[String]) = {
> val conf = new SparkConf().setAppName("App1")
> val sc = new SparkContext(conf)
> println(s"json4s version: ${org.json4s.BuildInfo.version.toString}")
>   }
> }
>
>
>
> sbt 0.13.7, sbt-assembly 0.13.0, Scala 2.10.4
>
> Is it possible to force 3.2.11 version usage?
>
> Thanks,
> Alexey
>


Re: Should I do spark-sql query on HDFS or hive?

2015-03-23 Thread Denny Lee
From the standpoint of Spark SQL accessing the files - when it is hitting
Hive, it is in effect hitting HDFS as well.  Hive provides a great
framework where the table structure is already well defined.  But
underneath it, Hive is just accessing files from HDFS, so you are hitting
HDFS either way.  HTH!
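
A small sketch of the two entry points (the table name and path are placeholders):

val hc = new org.apache.spark.sql.hive.HiveContext(sc)
val viaHive  = hc.sql("SELECT * FROM my_hive_table")    // schema comes from the metastore
val viaFiles = hc.parquetFile("hdfs:///path/to/files")  // same HDFS files, read directly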

On Tue, Mar 17, 2015 at 3:41 AM 李铖  wrote:

> Hi,everybody.
>
> I am new in spark. Now I want to do interactive sql query using spark sql.
> spark sql can run under hive or loading files from hdfs.
>
> Which is better or faster?
>
> Thanks.
>


RE: Date and decimal datatype not working

2015-03-23 Thread BASAK, ANANDA
Thanks. This worked well as per your suggestions. I had to run following:
val TABLE_A = 
sc.textFile("/Myhome/SPARK/files/table_a_file.txt").map(_.split("|")).map(p => 
ROW_A(p(0).trim.toLong, p(1), p(2).trim.toInt, p(3), BigDecimal(p(4)), 
BigDecimal(p(5)), BigDecimal(p(6

Now I am stuck at another step. I have run a SQL query where I am selecting all
the fields, with a where clause filtering TSTAMP by a date range and an order by
TSTAMP clause. That is running fine.

Then I am trying to store the output in a CSV file. I am using the
saveAsTextFile(“filename”) function, but it is giving an error. Can you please
help me with the proper syntax to store the output in a CSV file?


Thanks & Regards
---
Ananda Basak
Ph: 425-213-7092

From: BASAK, ANANDA
Sent: Tuesday, March 17, 2015 3:08 PM
To: Yin Huai
Cc: user@spark.apache.org
Subject: RE: Date and decimal datatype not working

Ok, thanks for the suggestions. Let me try and will confirm all.

Regards
Ananda

From: Yin Huai [mailto:yh...@databricks.com]
Sent: Tuesday, March 17, 2015 3:04 PM
To: BASAK, ANANDA
Cc: user@spark.apache.org
Subject: Re: Date and decimal datatype not working

p(0) is a String. So, you need to explicitly convert it to a Long. e.g. 
p(0).trim.toLong. You also need to do it for p(2). For those BigDecimals value, 
you need to create BigDecimal objects from your String values.

On Tue, Mar 17, 2015 at 5:55 PM, BASAK, ANANDA 
mailto:ab9...@att.com>> wrote:
Hi All,
I am very new in Spark world. Just started some test coding from last week. I 
am using spark-1.2.1-bin-hadoop2.4 and scala coding.
I am having issues while using Date and decimal data types. Following is my 
code that I am simply running on scala prompt. I am trying to define a table 
and point that to my flat file containing raw data (pipe delimited format). 
Once that is done, I will run some SQL queries and put the output data in to 
another flat file with pipe delimited format.

***
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.createSchemaRDD


// Define row and table
case class ROW_A(
  TSTAMP:   Long,
  USIDAN: String,
  SECNT:Int,
  SECT:   String,
  BLOCK_NUM:BigDecimal,
  BLOCK_DEN:BigDecimal,
  BLOCK_PCT:BigDecimal)

val TABLE_A = 
sc.textFile("/Myhome/SPARK/files/table_a_file.txt").map(_.split("|")).map(p => 
ROW_A(p(0), p(1), p(2), p(3), p(4), p(5), p(6)))

TABLE_A.registerTempTable("TABLE_A")

***

The second last command is giving error, like following:
:17: error: type mismatch;
found   : String
required: Long

Looks like the contents of my flat file are always treated as String and never
as Date or decimal. How can I make Spark take them as Date or decimal types?

Regards
Ananda



GraphX Pregel optimization

2015-03-23 Thread Clare Huang
Hi all,

I have been testing Spark GraphX to do large sparse matrix multiplication for
3D image reconstruction.  I used the Pregel API to forward- and back-project
the images based on a graph representation of a large sparse matrix.  I was
wondering how one can optimize the Pregel operation with respect to the number
of partitions used for graph parallelization, and how to cache the intermediate
variables with respect to the system I will run on (a local multicore server
vs. a supercomputer cluster).

Thanks a lot for your time and help in advance!

Cheers,
Clare
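
For the partitioning part of the question, a hedged sketch of the knobs involved
(the edge-list path, partition strategy, and partition count are just examples):

import org.apache.spark.graphx.{GraphLoader, PartitionStrategy}

// Load a (hypothetical) edge list with an explicit number of edge partitions,
// repartition the edges, and cache the graph before running Pregel.
val graph = GraphLoader.edgeListFile(sc, "hdfs:///path/to/edges.txt", numEdgePartitions = 64)
val partitioned = graph.partitionBy(PartitionStrategy.EdgePartition2D).cache()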


Weird exception in Spark job

2015-03-23 Thread nitinkak001
I am trying to run a Hive query from Spark code through HiveContext. Does anybody
know what these exceptions mean? I have no clue.

LogType: stderr
LogLength: 3345
Log Contents:
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in
[jar:file:/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/jars/avro-tools-1.7.6-cdh5.3.2.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in
[jar:file:/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/jars/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in
[jar:file:/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/jars/pig-0.12.0-cdh5.3.2.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in
[jar:file:/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/jars/slf4j-simple-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
Exception in thread "main" java.lang.reflect.UndeclaredThrowableException
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1655)
at
org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:59)
at
org.apache.spark.executor.CoarseGrainedExecutorBackend$.run(CoarseGrainedExecutorBackend.scala:115)
at
org.apache.spark.executor.CoarseGrainedExecutorBackend$.main(CoarseGrainedExecutorBackend.scala:161)
at
org.apache.spark.executor.CoarseGrainedExecutorBackend.main(CoarseGrainedExecutorBackend.scala)
Caused by: java.util.concurrent.TimeoutException: Futures timed out after
[1 milliseconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
at 
scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
at
scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
at scala.concurrent.Await$.result(package.scala:107)
at akka.remote.Remoting.start(Remoting.scala:173)
at
akka.remote.RemoteActorRefProvider.init(RemoteActorRefProvider.scala:184)
at akka.actor.ActorSystemImpl._start$lzycompute(ActorSystem.scala:579)
at akka.actor.ActorSystemImpl._start(ActorSystem.scala:577)
at akka.actor.ActorSystemImpl.start(ActorSystem.scala:588)
at akka.actor.ActorSystem$.apply(ActorSystem.scala:111)
at akka.actor.ActorSystem$.apply(ActorSystem.scala:104)
at
org.apache.spark.util.AkkaUtils$.org$apache$spark$util$AkkaUtils$$doCreateActorSystem(AkkaUtils.scala:121)
at org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:54)
at org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:53)
at
org.apache.spark.util.Utils$$anonfun$startServiceOnPort$1.apply$mcVI$sp(Utils.scala:1684)
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
at org.apache.spark.util.Utils$.startServiceOnPort(Utils.scala:1675)
at 
org.apache.spark.util.AkkaUtils$.createActorSystem(AkkaUtils.scala:56)
at
org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$run$1.apply$mcV$sp(CoarseGrainedExecutorBackend.scala:122)
at
org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:60)
at
org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:59)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1642)
... 4 more

LogType: stdout
LogLength: 7460
Log Contents:
2015-03-23 18:54:55,571 INFO  [main] executor.CoarseGrainedExecutorBackend
(SignalLogger.scala:register(47)) - Registered signal handlers for [TERM,
HUP, INT]
2015-03-23 18:54:56,898 INFO  [main] spark.SecurityManager
(Logging.scala:logInfo(59)) - Changing view acls to: yarn,kakn
2015-03-23 18:54:56,901 INFO  [main] spark.SecurityManager
(Logging.scala:logInfo(59)) - Changing modify acls to: yarn,kakn
2015-03-23 18:54:56,911 INFO  [main] spark.SecurityManager
(Logging.scala:logInfo(59)) - SecurityManager: authentication disabled; ui
acls disabled; users with view permissions: Set(yarn, kakn); users with
modify permissions: Set(yarn, kakn)
2015-03-23 18:54:57,725 INFO 
[driverPropsFetcher-akka.actor.default-dispatcher-3] slf4j.Slf4jLogger
(Slf4jLogger.scala:applyOrElse(80)) - Slf4jLogger started
2015-03-23 18:54:57,810 INFO 
[driverPropsFetcher-akka.actor.default-dispatcher-3] Remoting
(Slf4jLogger.scala:apply$mcV$sp(74)) - Starting remoting
2015-03-23 18:54:57,871 ERROR
[driverPropsFetcher-akka.actor.default-dispatcher-5] actor.ActorSystemImpl
(Slf4jLogger.scala:apply$mcV$sp(66)) - Uncaught fatal error from thread
[driverPropsFetcher-akka.actor.default-dispatcher-4] shutting down
ActorSystem [dri

Re: DataFrame operation on parquet: GC overhead limit exceeded

2015-03-23 Thread Patrick Wendell
Hey Yiannis,

If you just perform a count on each "name", "date" pair... can it succeed?
If so, can you do a count and then order by to find the largest one?

I'm wondering if there is a single pathologically large group here that is
somehow causing OOM.

Also, to be clear, you are getting GC limit warnings on the executors, not
the driver. Correct?

- Patrick
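
A minimal sketch of the check Patrick suggests, assuming the aggregation is over a
DataFrame df with "name" and "date" columns:

val counts = df.groupBy("name", "date").count()
counts.orderBy(counts("count").desc).show(20)   // largest groups first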

On Mon, Mar 23, 2015 at 10:21 AM, Martin Goodson 
wrote:

> Have you tried to repartition() your original data to make more partitions
> before you aggregate?
>
>
> --
> Martin Goodson  |  VP Data Science
> (0)20 3397 1240
> [image: Inline image 1]
>
> On Mon, Mar 23, 2015 at 4:12 PM, Yiannis Gkoufas 
> wrote:
>
>> Hi Yin,
>>
>> Yes, I have set spark.executor.memory to 8g and the worker memory to 16g
>> without any success.
>> I cannot figure out how to increase the number of mapPartitions tasks.
>>
>> Thanks a lot
>>
>> On 20 March 2015 at 18:44, Yin Huai  wrote:
>>
>>> spark.sql.shuffle.partitions only control the number of tasks in the
>>> second stage (the number of reducers). For your case, I'd say that the
>>> number of tasks in the first state (number of mappers) will be the number
>>> of files you have.
>>>
>>> Actually, have you changed "spark.executor.memory" (it controls the
>>> memory for an executor of your application)? I did not see it in your
>>> original email. The difference between worker memory and executor memory
>>> can be found at (
>>> http://spark.apache.org/docs/1.3.0/spark-standalone.html),
>>>
>>> SPARK_WORKER_MEMORY
>>> Total amount of memory to allow Spark applications to use on the
>>> machine, e.g. 1000m, 2g (default: total memory minus 1 GB); note that
>>> each application's individual memory is configured using its
>>> spark.executor.memory property.
>>>
>>>
>>> On Fri, Mar 20, 2015 at 9:25 AM, Yiannis Gkoufas 
>>> wrote:
>>>
 Actually I realized that the correct way is:

 sqlContext.sql("set spark.sql.shuffle.partitions=1000")

 but I am still experiencing the same behavior/error.

 On 20 March 2015 at 16:04, Yiannis Gkoufas 
 wrote:

> Hi Yin,
>
> the way I set the configuration is:
>
> val sqlContext = new org.apache.spark.sql.SQLContext(sc)
> sqlContext.setConf("spark.sql.shuffle.partitions","1000");
>
> it is the correct way right?
> In the mapPartitions task (the first task which is launched), I get
> again the same number of tasks and again the same error. :(
>
> Thanks a lot!
>
> On 19 March 2015 at 17:40, Yiannis Gkoufas 
> wrote:
>
>> Hi Yin,
>>
>> thanks a lot for that! Will give it a shot and let you know.
>>
>> On 19 March 2015 at 16:30, Yin Huai  wrote:
>>
>>> Was the OOM thrown during the execution of first stage (map) or the
>>> second stage (reduce)? If it was the second stage, can you increase the
>>> value of spark.sql.shuffle.partitions and see if the OOM disappears?
>>>
>>> This setting controls the number of reduces Spark SQL will use and
>>> the default is 200. Maybe there are too many distinct values and the 
>>> memory
>>> pressure on every task (of those 200 reducers) is pretty high. You can
>>> start with 400 and increase it until the OOM disappears. Hopefully this
>>> will help.
>>>
>>> Thanks,
>>>
>>> Yin
>>>
>>>
>>> On Wed, Mar 18, 2015 at 4:46 PM, Yiannis Gkoufas <
>>> johngou...@gmail.com> wrote:
>>>
 Hi Yin,

 Thanks for your feedback. I have 1700 parquet files, sized 100MB
 each. The number of tasks launched is equal to the number of parquet 
 files.
 Do you have any idea on how to deal with this situation?

 Thanks a lot
 On 18 Mar 2015 17:35, "Yin Huai"  wrote:

> Seems there are too many distinct groups processed in a task,
> which trigger the problem.
>
> How many files do your dataset have and how large is a file? Seems
> your query will be executed with two stages, table scan and map-side
> aggregation in the first stage and the final round of reduce-side
> aggregation in the second stage. Can you take a look at the numbers of
> tasks launched in these two stages?
>
> Thanks,
>
> Yin
>
> On Wed, Mar 18, 2015 at 11:42 AM, Yiannis Gkoufas <
> johngou...@gmail.com> wrote:
>
>> Hi there, I set the executor memory to 8g but it didn't help
>>
>> On 18 March 2015 at 13:59, Cheng Lian 
>> wrote:
>>
>>> You should probably increase executor memory by setting
>>> "spark.executor.memory".
>>>
>>> Full list of available configurations can be found here
>>> http://spark.apache.org/docs/latest/configuration.html
>>>
>>> Cheng
>>>
>>>
>>> On 3/18/15 9:15 PM, Yiannis Gkoufas

Re: Spark-thriftserver Issue

2015-03-23 Thread Zhan Zhang
Probably the port is already in use by something else, e.g., Hive. You can change
the port as shown below:


 ./sbin/start-thriftserver.sh --master yarn --executor-memory 512m --hiveconf 
hive.server2.thrift.port=10001

Thanks.

Zhan Zhang

On Mar 23, 2015, at 12:01 PM, Neil Dev 
mailto:neilk...@gmail.com>> wrote:

Hi,

I am having an issue starting the spark-thriftserver. I'm running Spark 1.3 with
Hadoop 2.4.0. I would like to be able to change its port too, so I can have the
hive-thriftserver as well as the spark-thriftserver running at the same time.

Starting sparkthrift server:-
sudo ./start-thriftserver.sh --master spark://ip-172-31-10-124:7077
--executor-memory 2G

Error:-
I created the folder manually but still getting the following error
Exception in thread "main" java.lang.IllegalArgumentException: Log
directory /tmp/spark-events does not exist.


I am getting the following error
15/03/23 15:07:02 ERROR thrift.ThriftCLIService: Error:
org.apache.thrift.transport.TTransportException: Could not create
ServerSocket on address 0.0.0.0/0.0.0.0:1.
   at
org.apache.thrift.transport.TServerSocket.<init>(TServerSocket.java:93)
   at
org.apache.thrift.transport.TServerSocket.<init>(TServerSocket.java:79)
   at
org.apache.hive.service.auth.HiveAuthFactory.getServerSocket(HiveAuthFactory.java:236)
   at
org.apache.hive.service.cli.thrift.ThriftBinaryCLIService.run(ThriftBinaryCLIService.java:69)
   at java.lang.Thread.run(Thread.java:745)

Thanks
Neil



Re: Shuffle Spill Memory and Shuffle Spill Disk

2015-03-23 Thread Sandy Ryza
Hi Bijay,

The Shuffle Spill (Disk) is the total number of bytes written to disk by
records spilled during the shuffle.  The Shuffle Spill (Memory) is the
amount of space the spilled records occupied in memory before they were
spilled.  These differ because the serialized format is more compact, and
the on-disk version can be compressed as well.

-Sandy

On Mon, Mar 23, 2015 at 5:29 PM, Bijay Pathak 
wrote:

> Hello,
>
> I am running  TeraSort  on
> 100GB of data. The final metrics I am getting on Shuffle Spill are:
>
> Shuffle Spill(Memory): 122.5 GB
> Shuffle Spill(Disk): 3.4 GB
>
> What's the difference and relation between these two metrics? Does these
> mean 122.5 GB was spill from memory during the shuffle?
>
> thank you,
> bijay
>


Re: Spark 1.3 Dynamic Allocation - Requesting 0 new executor(s) because tasks are backlogged

2015-03-23 Thread Manoj Samel
Log shows stack traces that seem to match the assert in JIRA so it seems I
am hitting the issue. Thanks for the heads up ...

15/03/23 20:29:50 ERROR actor.OneForOneStrategy: assertion failed:
Allocator killed more executors than are allocated!
java.lang.AssertionError: assertion failed: Allocator killed more executors
than are allocated!
at scala.Predef$.assert(Predef.scala:179)
at
org.apache.spark.deploy.yarn.YarnAllocator.killExecutor(YarnAllocator.scala:152)
at
org.apache.spark.deploy.yarn.ApplicationMaster$AMActor$$anonfun$receive$1$$anonfun$applyOrElse$6.apply(ApplicationMaster.scala:547)
at
org.apache.spark.deploy.yarn.ApplicationMaster$AMActor$$anonfun$receive$1$$anonfun$applyOrElse$6.apply(ApplicationMaster.scala:547)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at
scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at
org.apache.spark.deploy.yarn.ApplicationMaster$AMActor$$anonfun$receive$1.applyOrElse(ApplicationMaster.scala:547)
at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
at
org.apache.spark.deploy.yarn.ApplicationMaster$AMActor.aroundReceive(ApplicationMaster.scala:506)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
at
scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)

On Mon, Mar 23, 2015 at 2:25 PM, Marcelo Vanzin  wrote:

> On Mon, Mar 23, 2015 at 2:15 PM, Manoj Samel 
> wrote:
> > Found the issue above error - the setting for spark_shuffle was
> incomplete.
> >
> > Now it is able to ask and get additional executors. The issue is once
> they
> > are released, it is not able to proceed with next query.
>
> That looks like SPARK-6325, which unfortunately was not fixed in time
> for 1.3.0...
>
> --
> Marcelo
>


Shuffle Spill Memory and Shuffle Spill Disk

2015-03-23 Thread Bijay Pathak
Hello,

I am running TeraSort on 100GB
of data. The final metrics I am getting on Shuffle Spill are:

Shuffle Spill(Memory): 122.5 GB
Shuffle Spill(Disk): 3.4 GB

What's the difference and relation between these two metrics? Does these
mean 122.5 GB was spill from memory during the shuffle?

thank you,
bijay


Re: How to use DataFrame with MySQL

2015-03-23 Thread Rishi Yadav
For me, it's only working if I set --driver-class-path to the MySQL connector library.
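
For reference, a minimal sketch of the Spark 1.3 JDBC data source this thread is
about (URL, credentials, and table name are placeholders), with the MySQL connector
jar passed via --driver-class-path as described above:

val jdbcDF = sqlContext.load("jdbc", Map(
  "url"     -> "jdbc:mysql://dbhost:3306/mydb?user=me&password=secret",
  "dbtable" -> "mytable"))
jdbcDF.show()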

On Sun, Mar 22, 2015 at 11:29 PM, gavin zhang  wrote:

> OK,I found what the problem is: It couldn't work with
> mysql-connector-5.0.8.
> I updated the connector version to 5.1.34 and it worked.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-use-DataFrame-with-MySQL-tp22178p22182.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Spark 1.3 Dynamic Allocation - Requesting 0 new executor(s) because tasks are backlogged

2015-03-23 Thread Marcelo Vanzin
On Mon, Mar 23, 2015 at 2:15 PM, Manoj Samel  wrote:
> Found the issue above error - the setting for spark_shuffle was incomplete.
>
> Now it is able to ask and get additional executors. The issue is once they
> are released, it is not able to proceed with next query.

That looks like SPARK-6325, which unfortunately was not fixed in time
for 1.3.0...

-- 
Marcelo

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark 1.3 Dynamic Allocation - Requesting 0 new executor(s) because tasks are backlogged

2015-03-23 Thread Manoj Samel
Found the issue behind the above error - the setting for spark_shuffle was incomplete.

Now it is able to ask and get additional executors. The issue is once they
are released, it is not able to proceed with next query.

The environment is CDH 5.3.2 (Hadoop 2.5) with Kerberos & Spark 1.3

After the idle timeout, the release of executors gives a stack trace under WARN and
returns to the prompt (in spark-shell):

15/03/23 20:55:50 INFO YarnClientSchedulerBackend: Requesting to kill
executor(s) 2
15/03/23 20:55:50 INFO ExecutorAllocationManager: Removing executor 2
because it has been idle for 60 seconds (new desired total will be 6)
15/03/23 20:55:50 INFO YarnClientSchedulerBackend: Requesting to kill
executor(s) 5
15/03/23 20:55:50 INFO ExecutorAllocationManager: Removing executor 5
because it has been idle for 60 seconds (new desired total will be 5)
15/03/23 20:55:50 INFO YarnClientSchedulerBackend: Requesting to kill
executor(s) 1
15/03/23 20:55:51 WARN ReliableDeliverySupervisor: Association with remote
system [akka.tcp://sparkExecutor@xxxl:47358] has failed, address is now
gated for [5000] ms. Reason is: [Disassociated].
15/03/23 20:55:51 WARN ReliableDeliverySupervisor: Association with remote
system [akka.tcp://sparkExecutor@yyy:51807] has failed, address is now
gated for [5000] ms. Reason is: [Disassociated].
15/03/23 20:55:52 WARN ReliableDeliverySupervisor: Association with remote
system [akka.tcp://sparkExecutor@zzz:54623] has failed, address is now
gated for [5000] ms. Reason is: [Disassociated].
15/03/23 20:56:20 WARN AkkaUtils: Error sending message [message =
KillExecutors(ArrayBuffer(1))] in 1 attempts
java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
at
scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
at scala.concurrent.Await$.result(package.scala:107)
at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:195)
at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:171)
at
org.apache.spark.scheduler.cluster.YarnSchedulerBackend$YarnSchedulerActor$$anonfun$receive$1$$anonfun$applyOrElse$4.apply$mcV$sp(YarnSchedulerBackend.scala:136)
at
org.apache.spark.scheduler.cluster.YarnSchedulerBackend$YarnSchedulerActor$$anonfun$receive$1$$anonfun$applyOrElse$4.apply(YarnSchedulerBackend.scala:136)
at
org.apache.spark.scheduler.cluster.YarnSchedulerBackend$YarnSchedulerActor$$anonfun$receive$1$$anonfun$applyOrElse$4.apply(YarnSchedulerBackend.scala:136)
at
scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at
scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
15/03/23 20:56:20 WARN AkkaUtils: Error sending message [message =
KillExecutors(ArrayBuffer(1))] in 1 attempts
java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
at
scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
at scala.concurrent.Await$.result(package.scala:107)
at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:195)
at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:171)
at
org.apache.spark.scheduler.cluster.YarnSchedulerBackend.doKillExecutors(YarnSchedulerBackend.scala:68)
at
org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.killExecutors(CoarseGrainedSchedulerBackend.scala:375)
at org.apache.spark.SparkContext.killExecutors(SparkContext.scala:1173)
at
org.apache.spark.ExecutorAllocationClient$class.killExecutor(ExecutorAllocationClient.scala:49)
at org.apache.spark.SparkContext.killExecutor(SparkContext.scala:1186)
at org.apache.spark.ExecutorAllocationManager.org
$apache$spark$ExecutorAllocationManager$$removeExecutor(ExecutorAllocationManager.scala:353)
at
org.apache.spark.ExecutorAllocationManager$$anonfun$org$apache$spark$ExecutorAllocationManager$$schedule$1.apply(ExecutorAllocationManager.scala:237)
at
org.apache.spark.ExecutorAllocationManager$$anonfun$org$apache$spark$ExecutorAllocationManager$$schedule$1.apply(ExecutorAllocationManager.scala:234)
at
scala.collection.mutable.MapLike$$anonfun$retain$2.apply(MapLike.scala:213)
at
scala.collection.mutable.MapLike$$anonfun$retain$2.apply(MapLike.scala:212)
at
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
at scala.collection.immutable.List.foreach(List.scala:318)
at
scala.collection.Traversabl

Is it possible to use json4s 3.2.11 with Spark 1.3.0?

2015-03-23 Thread Alexey Zinoviev
Spark has a dependency on json4s 3.2.10, but this version has several bugs
and I need to use 3.2.11. I added json4s-native 3.2.11 dependency to
build.sbt and everything compiled fine. But when I spark-submit my JAR it
provides me with 3.2.10.


build.sbt

import sbt.Keys._

name := "sparkapp"

version := "1.0"

scalaVersion := "2.10.4"

libraryDependencies += "org.apache.spark" %% "spark-core"  % "1.3.0" %
"provided"

libraryDependencies += "org.json4s" %% "json4s-native" % "3.2.11"`


plugins.sbt

logLevel := Level.Warn

resolvers += Resolver.url("artifactory", url("
http://scalasbt.artifactoryonline.com/scalasbt/sbt-plugin-releases
"))(Resolver.ivyStylePatterns)

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.13.0")


App1.scala

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.{Logging, SparkConf, SparkContext}
import org.apache.spark.SparkContext._

object App1 extends Logging {
  def main(args: Array[String]) = {
val conf = new SparkConf().setAppName("App1")
val sc = new SparkContext(conf)
println(s"json4s version: ${org.json4s.BuildInfo.version.toString}")
  }
}



sbt 0.13.7, sbt-assembly 0.13.0, Scala 2.10.4

Is it possible to force 3.2.11 version usage?
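One thing that may be worth trying, for what it's worth (hedged: spark.driver.userClassPathFirst and spark.executor.userClassPathFirst are experimental flags, and the assembly jar name below is only a guess from the build definition above):

spark-submit --class App1 \
  --conf spark.driver.userClassPathFirst=true \
  --conf spark.executor.userClassPathFirst=true \
  target/scala-2.10/sparkapp-assembly-1.0.jar

With those set, classes from the assembly (including json4s 3.2.11) should be preferred over the copies bundled with Spark, though mixing class loaders like this can have side effects of its own.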

Thanks,
Alexey


Re: Using a different spark jars than the one on the cluster

2015-03-23 Thread Denny Lee
+1  - I currently am doing what Marcelo is suggesting as I have a CDH 5.2
cluster (with Spark 1.1) and I'm also running Spark 1.3.0+ side-by-side in
my cluster.

On Wed, Mar 18, 2015 at 1:23 PM Marcelo Vanzin  wrote:

> Since you're using YARN, you should be able to download a Spark 1.3.0
> tarball from Spark's website and use spark-submit from that
> installation to launch your app against the YARN cluster.
>
> So effectively you would have 1.2.0 and 1.3.0 side-by-side in your cluster.
>
> On Wed, Mar 18, 2015 at 11:09 AM, jaykatukuri  wrote:
> > Hi all,
> > I am trying to run my job which needs spark-sql_2.11-1.3.0.jar.
> > The cluster that I am running on is still on spark-1.2.0.
> >
> > I tried the following :
> >
> > spark-submit --class class-name --num-executors 100 --master yarn
> > application_jar--jars hdfs:///path/spark-sql_2.11-1.3.0.jar
> > hdfs:///input_data
> >
> > But, this did not work, I get an error that it is not able to find a
> > class/method that is in spark-sql_2.11-1.3.0.jar .
> >
> > org.apache.spark.sql.SQLContext.implicits()Lorg/
> apache/spark/sql/SQLContext$implicits$
> >
> > The question in general is how do we use a different version of spark
> jars
> > (spark-core, spark-sql, spark-ml etc) than the one's running on a
> cluster ?
> >
> > Thanks,
> > Jay
> >
> >
> >
> >
> >
> > --
> > View this message in context: http://apache-spark-user-list.
> 1001560.n3.nabble.com/Using-a-different-spark-jars-than-the-
> one-on-the-cluster-tp22125.html
> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
> >
> > -
> > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> > For additional commands, e-mail: user-h...@spark.apache.org
> >
>
>
>
> --
> Marcelo
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


hadoop input/output format advanced control

2015-03-23 Thread Koert Kuipers
currently its pretty hard to control the Hadoop Input/Output formats used
in Spark. The conventions seems to be to add extra parameters to all
methods and then somewhere deep inside the code (for example in
PairRDDFunctions.saveAsHadoopFile) all these parameters get translated into
settings on the Hadoop Configuration object.

for example for compression i see "codec: Option[Class[_ <:
CompressionCodec]] = None" added to a bunch of methods.

how scalable is this solution really?

for example i need to read from a hadoop dataset and i dont want the input
(part) files to get split up. the way to do this is to set
"mapred.min.split.size". now i dont want to set this at the level of the
SparkContext (which can be done), since i dont want it to apply to input
formats in general. i want it to apply to just this one specific input
dataset i need to read. which leaves me with no options currently. i could
go add yet another input parameter to all the methods
(SparkContext.textFile, SparkContext.hadoopFile, SparkContext.objectFile,
etc.). but that seems ineffective.

why can we not expose a Map[String, String] or some other generic way to
manipulate settings for hadoop input/output formats? it would require
adding one more parameter to all methods to deal with hadoop input/output
formats, but after that its done. one parameter to rule them all

then i could do:
val x = sc.textFile("/some/path", formatSettings =
Map("mapred.min.split.size" -> "12345"))

or
rdd.saveAsTextFile("/some/path, formatSettings =
Map(mapred.output.compress" -> "true", "mapred.output.compression.codec" ->
"somecodec"))


Re: Getting around Serializability issues for types not in my control

2015-03-23 Thread Adelbert Chang
Instantiating the instance? The actual instance it's complaining about is:

https://github.com/scalaz/scalaz/blob/16838556c9309225013f917e577072476f46dc14/core/src/main/scala/scalaz/std/Option.scala#L10-11

The specific import where it's picking up the instance is:

https://github.com/scalaz/scalaz/blob/16838556c9309225013f917e577072476f46dc14/core/src/main/scala/scalaz/std/Option.scala#L227


Note the object extends OptionInstances which contains that instance.

Is the suggestion to pass in something like new OptionInstances { } into
the RDD#aggregate call?

On Mon, Mar 23, 2015 at 1:09 PM, Cody Koeninger  wrote:

> Have you tried instantiating the instance inside the closure, rather than
> outside of it?
>
> If that works, you may need to switch to use mapPartition /
> foreachPartition for efficiency reasons.
>
>
> On Mon, Mar 23, 2015 at 3:03 PM, Adelbert Chang 
> wrote:
>
>> Is there no way to pull out the bits of the instance I want before I sent
>> it through the closure for aggregate? I did try pulling things out, along
>> the lines of
>>
>> def foo[G[_], B](blah: Blah)(implicit G: Applicative[G]) = {
>>   val lift: B => G[RDD[B]] = b =>
>> G.point(sparkContext.parallelize(List(b)))
>>
>>   rdd.aggregate(/* use lift in here */)
>> }
>>
>> But that doesn't seem to work either, still seems to be trying to
>> serialize the Applicative... :(
>>
>> On Mon, Mar 23, 2015 at 12:27 PM, Dean Wampler 
>> wrote:
>>
>>> Well, it's complaining about trait OptionInstances which is defined in
>>> Option.scala in the std package. Use scalap or javap on the scalaz library
>>> to find out which member of the trait is the problem, but since it says
>>> "$$anon$1", I suspect it's the first value member, "implicit val
>>> optionInstance", which has a long list of mixin traits, one of which is
>>> probably at fault. OptionInstances is huge, so there might be other
>>> offenders.
>>>
>>> Scalaz wasn't designed for distributed systems like this, so you'll
>>> probably find many examples of nonserializability. An alternative is to
>>> avoid using Scalaz in any closures passed to Spark methods, but that's
>>> probably not what you want.
>>>
>>> dean
>>>
>>> Dean Wampler, Ph.D.
>>> Author: Programming Scala, 2nd Edition
>>>  (O'Reilly)
>>> Typesafe 
>>> @deanwampler 
>>> http://polyglotprogramming.com
>>>
>>> On Mon, Mar 23, 2015 at 12:03 PM, adelbertc  wrote:
>>>
 Hey all,

 I'd like to use the Scalaz library in some of my Spark jobs, but am
 running
 into issues where some stuff I use from Scalaz is not serializable. For
 instance, in Scalaz there is a trait

 /** In Scalaz */
 trait Applicative[F[_]] {
   def apply2[A, B, C](fa: F[A], fb: F[B])(f: (A, B) => C): F[C]
   def point[A](a: => A): F[A]
 }

 But when I try to use it in say, in an `RDD#aggregate` call I get:


 Caused by: java.io.NotSerializableException:
 scalaz.std.OptionInstances$$anon$1
 Serialization stack:
 - object not serializable (class:
 scalaz.std.OptionInstances$$anon$1,
 value: scalaz.std.OptionInstances$$anon$1@4516ee8c)
 - field (class: dielectric.syntax.RDDOps$$anonfun$1, name: G$1,
 type:
 interface scalaz.Applicative)
 - object (class dielectric.syntax.RDDOps$$anonfun$1,
 )
 - field (class:
 dielectric.syntax.RDDOps$$anonfun$traverse$extension$1,
 name: apConcat$1, type: interface scala.Function2)
 - object (class
 dielectric.syntax.RDDOps$$anonfun$traverse$extension$1,
 )

 Outside of submitting a PR to Scalaz to make things Serializable, what
 can I
 do to make things Serializable? I considered something like

 implicit def applicativeSerializable[F[_]](implicit F: Applicative[F]):
 SomeSerializableType[F] =
   new SomeSerializableType { ... } ??

 Not sure how to go about doing it - I looked at java.io.Externalizable
 but
 given `scalaz.Applicative` has no value members I'm not sure how to
 implement the interface.

 Any guidance would be much appreciated - thanks!



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Getting-around-Serializability-issues-for-types-not-in-my-control-tp22193.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


>>>
>>
>>
>> --
>> Adelbert (Allen) Chang
>>
>
>


-- 
Adelbert (Allen) Chang


Re: objectFile uses only java serializer?

2015-03-23 Thread Ted Yu
bq. it uses Utils.deserialize, which is always using Java serialization.

I agree with your finding.

On Mon, Mar 23, 2015 at 1:14 PM, Koert Kuipers  wrote:

> in the comments on SparkContext.objectFile it says:
> "It will also be pretty slow if you use the default serializer (Java
> serialization)"
>
> this suggests the spark.serializer is used, which means i can switch to
> the much faster kryo serializer. however when i look at the code it uses
> Utils.deserialize, which is always using Java serialization.
>
> did i get that right? and is this desired?
> it seems straightforward to switch objectFile to use the serializer as
> specified by spark.serializer (although it might being in new classloader
> issues).
>


Re: spark disk-to-disk

2015-03-23 Thread Reynold Xin
Maybe implement a very simple function that uses the Hadoop API to read in
based on file names (i.e. parts)?
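Something along these lines, maybe (only a sketch: it assumes the directory was written by saveAsObjectFile, restores the part-file ordering but not the partitioner, and a single part file can still be split into more than one partition):

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.SparkContext
import scala.reflect.ClassTag

def objectFilePartsInOrder[T: ClassTag](sc: SparkContext, dir: String) = {
  val fs = FileSystem.get(sc.hadoopConfiguration)
  // list the part files and union them in name order
  val parts = fs.listStatus(new Path(dir)).map(_.getPath)
    .filter(_.getName.startsWith("part-"))
    .sortBy(_.getName)
  sc.union(parts.map(p => sc.objectFile[T](p.toString)).toSeq)
}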

On Mon, Mar 23, 2015 at 10:55 AM, Koert Kuipers  wrote:

> there is a way to reinstate the partitioner, but that requires
> sc.objectFile to read exactly what i wrote, which means sc.objectFile
> should never split files on reading (a feature of hadoop file inputformat
> that gets in the way here).
>
> On Mon, Mar 23, 2015 at 1:39 PM, Koert Kuipers  wrote:
>
>> i just realized the major limitation is that i lose partitioning info...
>>
>> On Mon, Mar 23, 2015 at 1:34 AM, Reynold Xin  wrote:
>>
>>>
>>> On Sun, Mar 22, 2015 at 6:03 PM, Koert Kuipers 
>>> wrote:
>>>
 so finally i can resort to:
 rdd.saveAsObjectFile(...)
 sc.objectFile(...)
 but that seems like a rather broken abstraction.


>>> This seems like a fine solution to me.
>>>
>>>
>>
>


objectFile uses only java serializer?

2015-03-23 Thread Koert Kuipers
in the comments on SparkContext.objectFile it says:
"It will also be pretty slow if you use the default serializer (Java
serialization)"

this suggests the spark.serializer is used, which means i can switch to the
much faster kryo serializer. however when i look at the code it uses
Utils.deserialize, which is always using Java serialization.

did i get that right? and is this desired?
it seems straightforward to switch objectFile to use the serializer as
specified by spark.serializer (although it might being in new classloader
issues).
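
a possible workaround in the meantime (only a sketch, not a Spark API: it writes raw bytes to a sequence file using whatever spark.serializer is configured, and the matching read side is left out):

import org.apache.hadoop.io.{BytesWritable, NullWritable}
import org.apache.spark.SparkContext._
import org.apache.spark.SparkEnv
import org.apache.spark.rdd.RDD
import scala.reflect.ClassTag

def saveWithConfiguredSerializer[T: ClassTag](rdd: RDD[T], path: String): Unit = {
  rdd.mapPartitions { iter =>
    // one SerializerInstance per partition, created on the executor
    val ser = SparkEnv.get.serializer.newInstance()
    iter.map(x => (NullWritable.get(), new BytesWritable(ser.serialize(x).array())))
  }.saveAsSequenceFile(path)
}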


Re: Spark per app logging

2015-03-23 Thread Udit Mehta
Yes, each application can use its own log4j.properties, but I am not sure how
to configure log4j so that the driver and executors write to a file. This is
because if we set "spark.executor.extraJavaOptions" it will read the log4j
configuration from a file, and that is not what I need.
How do I configure log4j from the app so that the driver and the executors
use these configs?
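
One pattern that might work (just a sketch: the path and layout are made up, it assumes plain log4j 1.2 on the classpath, and it has to be invoked once in the driver and once per executor JVM, e.g. lazily from a singleton referenced in your tasks):

import org.apache.log4j.{FileAppender, Level, Logger, PatternLayout}

def configureAppLogging(appName: String): Unit = {
  val layout = new PatternLayout("%d %p %c - %m%n")
  // append to an application-specific file instead of the shared one
  val appender = new FileAppender(layout, s"/var/log/spark-apps/$appName.log", true)
  appender.setName(appName)
  Logger.getRootLogger.addAppender(appender)
  Logger.getRootLogger.setLevel(Level.INFO)
}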

Thanks,
Udit

On Sat, Mar 21, 2015 at 3:13 AM, Jeffrey Jedele 
wrote:

> Hi,
> I'm not completely sure about this either, but this is what we are doing
> currently:
> Configure your logging to write to STDOUT, not to a file explicitely.
> Spark will capture stdout and stderr and separate the messages into a
> app/driver folder structure in the configured worker directory.
>
> We then use logstash to collect the logs and index them to a elasticsearch
> cluster (Spark seems to produce a lot of logging data). With some simple
> regex processing, you also get the application id as searchable field.
>
> Regards,
> Jeff
>
> 2015-03-20 22:37 GMT+01:00 Ted Yu :
>
>> Are these jobs the same jobs, just run by different users or, different
>> jobs ?
>> If the latter, can each application use its own log4j.properties ?
>>
>> Cheers
>>
>> On Fri, Mar 20, 2015 at 1:43 PM, Udit Mehta  wrote:
>>
>>> Hi,
>>>
>>> We have spark setup such that there are various users running multiple
>>> jobs at the same time. Currently all the logs go to 1 file specified in the
>>> log4j.properties.
>>> Is it possible to configure log4j in spark for per app/user logging
>>> instead of sending all logs to 1 file mentioned in the log4j.properties?
>>>
>>> Thanks
>>> Udit
>>>
>>
>>
>


Re: Getting around Serializability issues for types not in my control

2015-03-23 Thread Cody Koeninger
Have you tried instantiating the instance inside the closure, rather than
outside of it?

If that works, you may need to switch to use mapPartition /
foreachPartition for efficiency reasons.
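
Roughly like this, for example (a sketch only; the names are illustrative, not from the original code — the point is that the instance is looked up inside the task body, so the closure never has to capture and serialize it):

import org.apache.spark.rdd.RDD
import scalaz.Applicative

def sumPairs(rdd: RDD[Option[Int]]): RDD[Option[Int]] =
  rdd.mapPartitions { iter =>
    // resolved on the executor, per partition, instead of being captured by the closure
    val G: Applicative[Option] = scalaz.std.option.optionInstance
    iter.map(x => G.apply2(x, x)(_ + _))
  }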


On Mon, Mar 23, 2015 at 3:03 PM, Adelbert Chang  wrote:

> Is there no way to pull out the bits of the instance I want before I sent
> it through the closure for aggregate? I did try pulling things out, along
> the lines of
>
> def foo[G[_], B](blah: Blah)(implicit G: Applicative[G]) = {
>   val lift: B => G[RDD[B]] = b =>
> G.point(sparkContext.parallelize(List(b)))
>
>   rdd.aggregate(/* use lift in here */)
> }
>
> But that doesn't seem to work either, still seems to be trying to
> serialize the Applicative... :(
>
> On Mon, Mar 23, 2015 at 12:27 PM, Dean Wampler 
> wrote:
>
>> Well, it's complaining about trait OptionInstances which is defined in
>> Option.scala in the std package. Use scalap or javap on the scalaz library
>> to find out which member of the trait is the problem, but since it says
>> "$$anon$1", I suspect it's the first value member, "implicit val
>> optionInstance", which has a long list of mixin traits, one of which is
>> probably at fault. OptionInstances is huge, so there might be other
>> offenders.
>>
>> Scalaz wasn't designed for distributed systems like this, so you'll
>> probably find many examples of nonserializability. An alternative is to
>> avoid using Scalaz in any closures passed to Spark methods, but that's
>> probably not what you want.
>>
>> dean
>>
>> Dean Wampler, Ph.D.
>> Author: Programming Scala, 2nd Edition
>>  (O'Reilly)
>> Typesafe 
>> @deanwampler 
>> http://polyglotprogramming.com
>>
>> On Mon, Mar 23, 2015 at 12:03 PM, adelbertc  wrote:
>>
>>> Hey all,
>>>
>>> I'd like to use the Scalaz library in some of my Spark jobs, but am
>>> running
>>> into issues where some stuff I use from Scalaz is not serializable. For
>>> instance, in Scalaz there is a trait
>>>
>>> /** In Scalaz */
>>> trait Applicative[F[_]] {
>>>   def apply2[A, B, C](fa: F[A], fb: F[B])(f: (A, B) => C): F[C]
>>>   def point[A](a: => A): F[A]
>>> }
>>>
>>> But when I try to use it in say, in an `RDD#aggregate` call I get:
>>>
>>>
>>> Caused by: java.io.NotSerializableException:
>>> scalaz.std.OptionInstances$$anon$1
>>> Serialization stack:
>>> - object not serializable (class:
>>> scalaz.std.OptionInstances$$anon$1,
>>> value: scalaz.std.OptionInstances$$anon$1@4516ee8c)
>>> - field (class: dielectric.syntax.RDDOps$$anonfun$1, name: G$1,
>>> type:
>>> interface scalaz.Applicative)
>>> - object (class dielectric.syntax.RDDOps$$anonfun$1, )
>>> - field (class:
>>> dielectric.syntax.RDDOps$$anonfun$traverse$extension$1,
>>> name: apConcat$1, type: interface scala.Function2)
>>> - object (class
>>> dielectric.syntax.RDDOps$$anonfun$traverse$extension$1,
>>> )
>>>
>>> Outside of submitting a PR to Scalaz to make things Serializable, what
>>> can I
>>> do to make things Serializable? I considered something like
>>>
>>> implicit def applicativeSerializable[F[_]](implicit F: Applicative[F]):
>>> SomeSerializableType[F] =
>>>   new SomeSerializableType { ... } ??
>>>
>>> Not sure how to go about doing it - I looked at java.io.Externalizable
>>> but
>>> given `scalaz.Applicative` has no value members I'm not sure how to
>>> implement the interface.
>>>
>>> Any guidance would be much appreciated - thanks!
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Getting-around-Serializability-issues-for-types-not-in-my-control-tp22193.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>
>>
>
>
> --
> Adelbert (Allen) Chang
>


Re: SchemaRDD/DataFrame result partitioned according to the underlying datasource partitions

2015-03-23 Thread Michael Armbrust
There is not an interface to this at this time, and in general I'm hesitant
to open up interfaces where the user could make a mistake where they think
something is going to improve performance but will actually impact
correctness.  Since, as you say, we are picking the partitioner
automatically in the query planner its much harder to know if you are
actually going to preserve the expected partitioning.

Additionally, its also a little more complicated when reading in data as
even if you have files that are partitioned correctly, the InputFormat is
free to split those files, violating our assumptions about partitioning.

On Mon, Mar 23, 2015 at 10:22 AM, Stephen Boesch  wrote:

>
> Is there a way to take advantage of the underlying datasource partitions
> when generating a DataFrame/SchemaRDD via catalyst?  It seems from the sql
> module that the only options are RangePartitioner and HashPartitioner - and
> further that those are selected automatically by the code .  It was not
> apparent that either the underlying partitioning were translated to the
> partitions presented in the rdd or that a custom partitioner were possible
> to be provided.
>
> The motivation would be to subsequently use df.map (with
> preservesPartitioning=true) and/or df.mapPartitions (likewise) to perform
> operations that work within the original datasource partitions - thus
> avoiding a shuffle.
>
>
>
>
>
>
>


Re: Getting around Serializability issues for types not in my control

2015-03-23 Thread Adelbert Chang
Is there no way to pull out the bits of the instance I want before I sent
it through the closure for aggregate? I did try pulling things out, along
the lines of

def foo[G[_], B](blah: Blah)(implicit G: Applicative[G]) = {
  val lift: B => G[RDD[B]] = b => G.point(sparkContext.parallelize(List(b)))

  rdd.aggregate(/* use lift in here */)
}

But that doesn't seem to work either, still seems to be trying to serialize
the Applicative... :(

On Mon, Mar 23, 2015 at 12:27 PM, Dean Wampler 
wrote:

> Well, it's complaining about trait OptionInstances which is defined in
> Option.scala in the std package. Use scalap or javap on the scalaz library
> to find out which member of the trait is the problem, but since it says
> "$$anon$1", I suspect it's the first value member, "implicit val
> optionInstance", which has a long list of mixin traits, one of which is
> probably at fault. OptionInstances is huge, so there might be other
> offenders.
>
> Scalaz wasn't designed for distributed systems like this, so you'll
> probably find many examples of nonserializability. An alternative is to
> avoid using Scalaz in any closures passed to Spark methods, but that's
> probably not what you want.
>
> dean
>
> Dean Wampler, Ph.D.
> Author: Programming Scala, 2nd Edition
>  (O'Reilly)
> Typesafe 
> @deanwampler 
> http://polyglotprogramming.com
>
> On Mon, Mar 23, 2015 at 12:03 PM, adelbertc  wrote:
>
>> Hey all,
>>
>> I'd like to use the Scalaz library in some of my Spark jobs, but am
>> running
>> into issues where some stuff I use from Scalaz is not serializable. For
>> instance, in Scalaz there is a trait
>>
>> /** In Scalaz */
>> trait Applicative[F[_]] {
>>   def apply2[A, B, C](fa: F[A], fb: F[B])(f: (A, B) => C): F[C]
>>   def point[A](a: => A): F[A]
>> }
>>
>> But when I try to use it in say, in an `RDD#aggregate` call I get:
>>
>>
>> Caused by: java.io.NotSerializableException:
>> scalaz.std.OptionInstances$$anon$1
>> Serialization stack:
>> - object not serializable (class:
>> scalaz.std.OptionInstances$$anon$1,
>> value: scalaz.std.OptionInstances$$anon$1@4516ee8c)
>> - field (class: dielectric.syntax.RDDOps$$anonfun$1, name: G$1,
>> type:
>> interface scalaz.Applicative)
>> - object (class dielectric.syntax.RDDOps$$anonfun$1, )
>> - field (class:
>> dielectric.syntax.RDDOps$$anonfun$traverse$extension$1,
>> name: apConcat$1, type: interface scala.Function2)
>> - object (class
>> dielectric.syntax.RDDOps$$anonfun$traverse$extension$1,
>> )
>>
>> Outside of submitting a PR to Scalaz to make things Serializable, what
>> can I
>> do to make things Serializable? I considered something like
>>
>> implicit def applicativeSerializable[F[_]](implicit F: Applicative[F]):
>> SomeSerializableType[F] =
>>   new SomeSerializableType { ... } ??
>>
>> Not sure how to go about doing it - I looked at java.io.Externalizable but
>> given `scalaz.Applicative` has no value members I'm not sure how to
>> implement the interface.
>>
>> Any guidance would be much appreciated - thanks!
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Getting-around-Serializability-issues-for-types-not-in-my-control-tp22193.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


-- 
Adelbert (Allen) Chang


Re: JDBC DF using DB2

2015-03-23 Thread Ted Yu
bq. is to modify compute_classpath.sh on all worker nodes to include your
driver JARs.

Please follow the above advice.
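
For example, something along these lines (paths are placeholders; --driver-class-path covers the driver side and spark.executor.extraClassPath the executors, and it assumes the jars exist at the same path on every node):

spark-submit --master yarn-client \
  --driver-class-path /opt/db2/db2jcc.jar:/opt/db2/db2jcc_license_cu.jar \
  --conf spark.executor.extraClassPath=/opt/db2/db2jcc.jar:/opt/db2/db2jcc_license_cu.jar \
  ... rest of your arguments ...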

Cheers

On Mon, Mar 23, 2015 at 12:34 PM, Jack Arenas  wrote:

> Hi Team,
>
>
>
> I’m trying to create a DF using jdbc as detailed here
> 
>  –
> I’m currently using DB2 v9.7.0.6 and I’ve tried to use the db2jcc.jar and
> db2jcc_license_cu.jar combo, and while it works in --master local using the
> command below, I get some strange behavior in --master yarn-client. Here is
> the command:
>
>
>
> *val df = sql.load("jdbc", Map("url" ->
> "jdbc:db2://:/:currentSchema=;user=;password=;",
> "driver" -> "com.ibm.db2.jcc.DB2Driver", "dbtable" -> ""))*
>
>
>
> It seems to also be working on yarn-client because once executed I get the
> following log:
>
> *df: org.apache.spark.sql.DataFrame = [DATE_FIELD: date, INT_FIELD: int,
> DOUBLE_FIELD: double]*
>
>
>
> Which indicates me that Spark was able to connect to the DB. But once I
> run *df.count() *or *df.take(5).foreach(println)* in order to operate on
> the data and get a result, I get back a ‘*No suitable driver found*’
> exception, which makes me think the driver wasn’t shipped with the spark
> job.
>
>
>
> I’ve tried using *--driver-class-path, --jars, SPARK_CLASSPATH* to add
> the jars to the spark job. I also have the jars in my*$CLASSPATH* and
> *$HADOOP_CLASSPATH*.
>
>
>
> I also saw this in the trouble shooting section, but quite frankly I’m not
> sure what primordial class loader it’s talking about:
>
>
>
> The JDBC driver class must be visible to the primordial class loader on
> the client session and on all executors. This is because Java’s
> DriverManager class does a security check that results in it ignoring all
> drivers not visible to the primordial class loader when one goes to open a
> connection. One convenient way to do this is to modify compute_classpath.sh
> on all worker nodes to include your driver JARs.
>
>
> Any advice is welcome!
>
>
>
> Thanks,
>
> Jack
>
>
>
>
>
>
>


Re: Use pig load function in spark

2015-03-23 Thread Denny Lee
You may be able to utilize Spork (Pig on Apache Spark) as a mechanism to do
this: https://github.com/sigmoidanalytics/spork


On Mon, Mar 23, 2015 at 2:29 AM Dai, Kevin  wrote:

>  Hi, all
>
>
>
> Can spark use pig’s load function to load data?
>
>
>
> Best Regards,
>
> Kevin.
>


JDBC DF using DB2

2015-03-23 Thread Jack Arenas
Hi Team, 
 
I’m trying to create a DF using jdbc as detailed here – I’m currently using DB2 
v9.7.0.6 and I’ve tried to use the db2jcc.jar and db2jcc_license_cu.jar combo, 
and while it works in --master local using the command below, I get some 
strange behavior in --master yarn-client. Here is the command:
 
val df = sql.load("jdbc", Map("url" -> 
"jdbc:db2://:/:currentSchema=;user=;password=;",
 "driver" -> "com.ibm.db2.jcc.DB2Driver", "dbtable" -> ""))
 
It seems to also be working on yarn-client because once executed I get the 
following log:
df: org.apache.spark.sql.DataFrame = [DATE_FIELD: date, INT_FIELD: int, 
DOUBLE_FIELD: double]
 
Which indicates to me that Spark was able to connect to the DB. But once I run 
df.count() or df.take(5).foreach(println) in order to operate on the data and 
get a result, I get back a ‘No suitable driver found’ exception, which makes me 
think the driver wasn’t shipped with the spark job.
 
I’ve tried using --driver-class-path, --jars, SPARK_CLASSPATH to add the jars 
to the spark job. I also have the jars in my $CLASSPATH and $HADOOP_CLASSPATH.
 
I also saw this in the trouble shooting section, but quite frankly I’m not sure 
what primordial class loader it’s talking about:
 
The JDBC driver class must be visible to the primordial class loader on the 
client session and on all executors. This is because Java’s DriverManager class 
does a security check that results in it ignoring all drivers not visible to 
the primordial class loader when one goes to open a connection. One convenient 
way to do this is to modify compute_classpath.sh on all worker nodes to include 
your driver JARs.

Any advice is welcome!
 
Thanks,
Jack
 
 
 

Re: Getting around Serializability issues for types not in my control

2015-03-23 Thread Dean Wampler
Well, it's complaining about trait OptionInstances which is defined in
Option.scala in the std package. Use scalap or javap on the scalaz library
to find out which member of the trait is the problem, but since it says
"$$anon$1", I suspect it's the first value member, "implicit val
optionInstance", which has a long list of mixin traits, one of which is
probably at fault. OptionInstances is huge, so there might be other
offenders.

Scalaz wasn't designed for distributed systems like this, so you'll
probably find many examples of nonserializability. An alternative is to
avoid using Scalaz in any closures passed to Spark methods, but that's
probably not what you want.

dean

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
 (O'Reilly)
Typesafe 
@deanwampler 
http://polyglotprogramming.com

On Mon, Mar 23, 2015 at 12:03 PM, adelbertc  wrote:

> Hey all,
>
> I'd like to use the Scalaz library in some of my Spark jobs, but am running
> into issues where some stuff I use from Scalaz is not serializable. For
> instance, in Scalaz there is a trait
>
> /** In Scalaz */
> trait Applicative[F[_]] {
>   def apply2[A, B, C](fa: F[A], fb: F[B])(f: (A, B) => C): F[C]
>   def point[A](a: => A): F[A]
> }
>
> But when I try to use it in say, in an `RDD#aggregate` call I get:
>
>
> Caused by: java.io.NotSerializableException:
> scalaz.std.OptionInstances$$anon$1
> Serialization stack:
> - object not serializable (class:
> scalaz.std.OptionInstances$$anon$1,
> value: scalaz.std.OptionInstances$$anon$1@4516ee8c)
> - field (class: dielectric.syntax.RDDOps$$anonfun$1, name: G$1,
> type:
> interface scalaz.Applicative)
> - object (class dielectric.syntax.RDDOps$$anonfun$1, )
> - field (class:
> dielectric.syntax.RDDOps$$anonfun$traverse$extension$1,
> name: apConcat$1, type: interface scala.Function2)
> - object (class
> dielectric.syntax.RDDOps$$anonfun$traverse$extension$1,
> )
>
> Outside of submitting a PR to Scalaz to make things Serializable, what can
> I
> do to make things Serializable? I considered something like
>
> implicit def applicativeSerializable[F[_]](implicit F: Applicative[F]):
> SomeSerializableType[F] =
>   new SomeSerializableType { ... } ??
>
> Not sure how to go about doing it - I looked at java.io.Externalizable but
> given `scalaz.Applicative` has no value members I'm not sure how to
> implement the interface.
>
> Any guidance would be much appreciated - thanks!
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Getting-around-Serializability-issues-for-types-not-in-my-control-tp22193.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


SparkEnv

2015-03-23 Thread Koert Kuipers
is it safe to access SparkEnv.get inside say mapPartitions?
i need to get a Serializer (so SparkEnv.get.serializer)
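
for reference, the pattern in question would look roughly like this (a sketch; it assumes SparkEnv.get is populated inside tasks on the executors, and creates one SerializerInstance per partition since instances are not guaranteed to be thread-safe):

import org.apache.spark.SparkEnv
import org.apache.spark.rdd.RDD
import scala.reflect.ClassTag

def serializeElements[T: ClassTag](rdd: RDD[T]): RDD[Array[Byte]] =
  rdd.mapPartitions { iter =>
    val ser = SparkEnv.get.serializer.newInstance()
    iter.map(x => ser.serialize(x).array())
  }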

thanks


Re: Strange behavior with PySpark when using Join() and zip()

2015-03-23 Thread Ofer Mendelevitch
Thanks Sean,

Sorting definitely solves it, but I was hoping it could be avoided :)

In the documentation for Classification in ML-Lib for example, zip() is used to 
create labelsAndPredictions:

-
from pyspark.mllib.tree import RandomForest
from pyspark.mllib.util import MLUtils

# Load and parse the data file into an RDD of LabeledPoint.
data = MLUtils.loadLibSVMFile(sc, 'data/mllib/sample_libsvm_data.txt')
# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data.randomSplit([0.7, 0.3])

# Train a RandomForest model.
#  Empty categoricalFeaturesInfo indicates all features are continuous.
#  Note: Use larger numTrees in practice.
#  Setting featureSubsetStrategy="auto" lets the algorithm choose.
model = RandomForest.trainClassifier(trainingData, numClasses=2, 
categoricalFeaturesInfo={}, numTrees=3, featureSubsetStrategy="auto", 
impurity='gini', maxDepth=4, maxBins=32)

# Evaluate model on test instances and compute test error
predictions = model.predict(testData.map(lambda x: x.features))
labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions)
testErr = labelsAndPredictions.filter(lambda (v, p): v != p).count() / 
float(testData.count())
print('Test Error = ' + str(testErr))
print('Learned classification forest model:')
print(model.toDebugString())



The reason the zip() works here is because the RDD is loaded from a file.
If it was generated with something that includes a JOIN() it won’t work due to 
this same issue.

Maybe worth mentioning in the docs then?

Ofer



> On Mar 23, 2015, at 11:40 AM, Sean Owen  wrote:
> 
> I think the explanation is that the join does not guarantee any order,
> since it causes a shuffle in general, and it is computed twice in the
> first example, resulting in a difference for d1 and d2.
> 
> You can persist() the result of the join and in practice I believe
> you'd find it behaves as expected, although that is even not 100%
> guaranteed since a block could be lost and recomputed (differently).
> 
> If order matters, and it does for zip(), then the reliable way to
> guarantee a well defined ordering for zipping is to sort the RDDs.
> 
> On Mon, Mar 23, 2015 at 6:27 PM, Ofer Mendelevitch
>  wrote:
>> Hi,
>> 
>> I am running into a strange issue when doing a JOIN of two RDDs followed by
>> ZIP from PySpark.
>> It’s part of a more complex application, but was able to narrow it down to a
>> simplified example that’s easy to replicate and causes the same problem to
>> appear:
>> 
>> 
>> raw = sc.parallelize([('k'+str(x),'v'+str(x)) for x in range(100)])
>> data = raw.join(raw).mapValues(lambda x: [x[0]]+[x[1]]).map(lambda pair:
>> ','.join([x for x in pair[1]]))
>> d1 = data.map(lambda s: s.split(',')[0])
>> d2 = data.map(lambda s: s.split(',')[1])
>> x = d1.zip(d2)
>> 
>> print x.take(10)
>> 
>> 
>> The output is:
>> 
>> 
>> [('v44', 'v80'), ('v79', 'v44'), ('v80', 'v79'), ('v45', 'v78'), ('v81',
>> 'v81'), ('v78', 'v45'), ('v99', 'v99'), ('v82', 'v82'), ('v46', 'v46'),
>> ('v83', 'v83')]
>> 
>> 
>> As you can see, the ordering of items is not preserved anymore in all cases.
>> (e.g., ‘v81’ is preserved, and ‘v45’ is not)
>> Is it not supposed to be preserved?
>> 
>> If I do the same thing without the JOIN:
>> 
>> data = sc.parallelize('v'+str(x)+',v'+str(x) for x in range(100))
>> d1 = data.map(lambda s: s.split(',')[0])
>> d2 = data.map(lambda s: s.split(',')[1])
>> x = d1.zip(d2)
>> 
>> print x.take(10)
>> 
>> The output is:
>> 
>> 
>> [('v0', 'v0'), ('v1', 'v1'), ('v2', 'v2'), ('v3', 'v3'), ('v4', 'v4'),
>> ('v5', 'v5'), ('v6', 'v6'), ('v7', 'v7'), ('v8', 'v8'), ('v9', 'v9')]
>> 
>> 
>> As expected.
>> 
>> Anyone run into this or a similar issue?
>> 
>> Ofer



Re: Strange behavior with PySpark when using Join() and zip()

2015-03-23 Thread Sean Owen
I think this is a bad example since testData is not deterministic at
all. I thought we had fixed this or similar examples in the past? As
in https://github.com/apache/spark/pull/1250/files

Hm, anyone see a reason that shouldn't be changed too?

On Mon, Mar 23, 2015 at 7:00 PM, Ofer Mendelevitch
 wrote:
> Thanks Sean,
>
> Sorting definitely solves it, but I was hoping it could be avoided :)
>
> In the documentation for Classification in ML-Lib for example, zip() is used
> to create labelsAndPredictions:
>
> from pyspark.mllib.tree import RandomForest
> from pyspark.mllib.util import MLUtils
>
> # Load and parse the data file into an RDD of LabeledPoint.
> data = MLUtils.loadLibSVMFile(sc, 'data/mllib/sample_libsvm_data.txt')
> # Split the data into training and test sets (30% held out for testing)
> (trainingData, testData) = data.randomSplit([0.7, 0.3])
>
> # Train a RandomForest model.
> #  Empty categoricalFeaturesInfo indicates all features are continuous.
> #  Note: Use larger numTrees in practice.
> #  Setting featureSubsetStrategy="auto" lets the algorithm choose.
> model = RandomForest.trainClassifier(trainingData, numClasses=2,
> categoricalFeaturesInfo={},
>  numTrees=3,
> featureSubsetStrategy="auto",
>  impurity='gini', maxDepth=4,
> maxBins=32)
>
> # Evaluate model on test instances and compute test error
> predictions = model.predict(testData.map(lambda x: x.features))
> labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions)
> testErr = labelsAndPredictions.filter(lambda (v, p): v != p).count() /
> float(testData.count())
> print('Test Error = ' + str(testErr))
> print('Learned classification forest model:')
> print(model.toDebugString())
>
>
> The reason the zip() works here is because the RDD is loaded from a file.
> If it was generated with something that includes a JOIN() it won’t work due
> to this same issue.
>
> Maybe worth mentioning in the docs then?
>
> Ofer
>
>
>
> On Mar 23, 2015, at 11:40 AM, Sean Owen  wrote:
>
> I think the explanation is that the join does not guarantee any order,
> since it causes a shuffle in general, and it is computed twice in the
> first example, resulting in a difference for d1 and d2.
>
> You can persist() the result of the join and in practice I believe
> you'd find it behaves as expected, although that is even not 100%
> guaranteed since a block could be lost and recomputed (differently).
>
> If order matters, and it does for zip(), then the reliable way to
> guarantee a well defined ordering for zipping is to sort the RDDs.
>
> On Mon, Mar 23, 2015 at 6:27 PM, Ofer Mendelevitch
>  wrote:
>
> Hi,
>
> I am running into a strange issue when doing a JOIN of two RDDs followed by
> ZIP from PySpark.
> It’s part of a more complex application, but was able to narrow it down to a
> simplified example that’s easy to replicate and causes the same problem to
> appear:
>
>
> raw = sc.parallelize([('k'+str(x),'v'+str(x)) for x in range(100)])
> data = raw.join(raw).mapValues(lambda x: [x[0]]+[x[1]]).map(lambda pair:
> ','.join([x for x in pair[1]]))
> d1 = data.map(lambda s: s.split(',')[0])
> d2 = data.map(lambda s: s.split(',')[1])
> x = d1.zip(d2)
>
> print x.take(10)
>
>
> The output is:
>
>
> [('v44', 'v80'), ('v79', 'v44'), ('v80', 'v79'), ('v45', 'v78'), ('v81',
> 'v81'), ('v78', 'v45'), ('v99', 'v99'), ('v82', 'v82'), ('v46', 'v46'),
> ('v83', 'v83')]
>
>
> As you can see, the ordering of items is not preserved anymore in all cases.
> (e.g., ‘v81’ is preserved, and ‘v45’ is not)
> Is it not supposed to be preserved?
>
> If I do the same thing without the JOIN:
>
> data = sc.parallelize('v'+str(x)+',v'+str(x) for x in range(100))
> d1 = data.map(lambda s: s.split(',')[0])
> d2 = data.map(lambda s: s.split(',')[1])
> x = d1.zip(d2)
>
> print x.take(10)
>
> The output is:
>
>
> [('v0', 'v0'), ('v1', 'v1'), ('v2', 'v2'), ('v3', 'v3'), ('v4', 'v4'),
> ('v5', 'v5'), ('v6', 'v6'), ('v7', 'v7'), ('v8', 'v8'), ('v9', 'v9')]
>
>
> As expected.
>
> Anyone run into this or a similar issue?
>
> Ofer
>
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Converting SparkSQL query to Scala query

2015-03-23 Thread Dean Wampler
There isn't any automated way. Note that as the DataFrame implementation
improves, it will probably do a better job with query optimization than
hand-rolled Scala code. I don't know if that's true yet, though.

For now, there are a few examples at the beginning of the DataFrame scaladocs
page showing typical scenarios. Also, the DataFrameSuite of tests in the Spark
source code has many examples.

If you scan through the list of methods, you'll see that many of them have
the same name as the corresponding SQL keyword.
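
As a rough illustration (assuming a, b and c are already DataFrames with those column names; this is a sketch, not a drop-in translation, and the optimizer may reorder the joins anyway):

val result = a.join(b, a("x") === b("x"))
              .join(c, b("y") === c("y"))
              .select(a("a"), b("b"), c("c"))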

HTH,

Dean

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
 (O'Reilly)
Typesafe 
@deanwampler 
http://polyglotprogramming.com

On Mon, Mar 23, 2015 at 11:42 AM, nishitd  wrote:

> I have a complex SparkSQL query of the nature
>
> select a.a, b.b, c.c from a,b,c where a.x = b.x and b.y = c.y
>
> How do I convert this efficiently into scala query of
>
> a.join(b,..,..)
>
> and so on. Can anyone help me with this? If my question needs more
> clarification, please let me know.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Converting-SparkSQL-query-to-Scala-query-tp22192.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Getting around Serializability issues for types not in my control

2015-03-23 Thread adelbertc
Hey all,

I'd like to use the Scalaz library in some of my Spark jobs, but am running
into issues where some stuff I use from Scalaz is not serializable. For
instance, in Scalaz there is a trait

/** In Scalaz */
trait Applicative[F[_]] {
  def apply2[A, B, C](fa: F[A], fb: F[B])(f: (A, B) => C): F[C]
  def point[A](a: => A): F[A]
}

But when I try to use it in say, in an `RDD#aggregate` call I get:


Caused by: java.io.NotSerializableException:
scalaz.std.OptionInstances$$anon$1
Serialization stack:
- object not serializable (class: scalaz.std.OptionInstances$$anon$1,
value: scalaz.std.OptionInstances$$anon$1@4516ee8c)
- field (class: dielectric.syntax.RDDOps$$anonfun$1, name: G$1, type:
interface scalaz.Applicative)
- object (class dielectric.syntax.RDDOps$$anonfun$1, )
- field (class: dielectric.syntax.RDDOps$$anonfun$traverse$extension$1,
name: apConcat$1, type: interface scala.Function2)
- object (class dielectric.syntax.RDDOps$$anonfun$traverse$extension$1,
)

Outside of submitting a PR to Scalaz to make things Serializable, what can I
do to make things Serializable? I considered something like

implicit def applicativeSerializable[F[_]](implicit F: Applicative[F]):
SomeSerializableType[F] =
  new SomeSerializableType { ... } ??

Not sure how to go about doing it - I looked at java.io.Externalizable but
given `scalaz.Applicative` has no value members I'm not sure how to
implement the interface.

Any guidance would be much appreciated - thanks!



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Getting-around-Serializability-issues-for-types-not-in-my-control-tp22193.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: newbie quesiton - spark with mesos

2015-03-23 Thread Dean Wampler
That's a very old page, try this instead:

http://spark.apache.org/docs/latest/running-on-mesos.html

When you run your Spark job on Mesos, tasks will be started on the slave
nodes as needed, since "fine-grained" mode is the default.

For a job like your example, very few tasks will be needed. Actually only
one would be enough, but the default number of partitions will be used. I
believe 8 is the default for Mesos. For local mode ("local[*]"), it's the
number of cores. You can also set the property "spark.default.parallelism".

HTH,

Dean

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
 (O'Reilly)
Typesafe 
@deanwampler 
http://polyglotprogramming.com

On Mon, Mar 23, 2015 at 11:46 AM, Anirudha Jadhav  wrote:

> i have a mesos cluster, which i deploy spark to by using instructions on
> http://spark.apache.org/docs/0.7.2/running-on-mesos.html
>
> after that the spark shell starts up fine.
> then i try the following on the shell:
>
> val data = 1 to 1
>
> val distData = sc.parallelize(data)
>
> distData.filter(_< 10).collect()
>
> open spark web ui at host:4040 and see an active job.
>
> NOW, how do i start workers or spark workers on mesos ? who completes my
> job?
> thanks,
>
> --
> Ani
>


Spark-thriftserver Issue

2015-03-23 Thread Neil Dev
Hi,

I am having an issue starting spark-thriftserver. I'm running Spark 1.3 with
Hadoop 2.4.0. I would also like to be able to change its port so I can have the
hive-thriftserver as well as the spark-thriftserver running at the same time.

Starting sparkthrift server:-
sudo ./start-thriftserver.sh --master spark://ip-172-31-10-124:7077
--executor-memory 2G

Error:-
I created the folder manually but still getting the following error
Exception in thread "main" java.lang.IllegalArgumentException: Log
directory /tmp/spark-events does not exist.


I am getting the following error
15/03/23 15:07:02 ERROR thrift.ThriftCLIService: Error:
org.apache.thrift.transport.TTransportException: Could not create
ServerSocket on address0.0.0.0/0.0.0.0:1.
at
org.apache.thrift.transport.TServerSocket.(TServerSocket.java:93)
at
org.apache.thrift.transport.TServerSocket.(TServerSocket.java:79)
at
org.apache.hive.service.auth.HiveAuthFactory.getServerSocket(HiveAuthFactory.java:236)
at
org.apache.hive.service.cli.thrift.ThriftBinaryCLIService.run(ThriftBinaryCLIService.java:69)
at java.lang.Thread.run(Thread.java:745)
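
On the port clash, one option may be to move the Spark thrift server off the default port with a Hive override when starting it (10001 below is only an example; exporting HIVE_SERVER2_THRIFT_PORT before launching should be an alternative):

sudo ./start-thriftserver.sh --master spark://ip-172-31-10-124:7077 \
  --executor-memory 2G \
  --hiveconf hive.server2.thrift.port=10001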

Thanks
Neil


newbie quesiton - spark with mesos

2015-03-23 Thread Anirudha Jadhav
i have a mesos cluster, which i deploy spark to by using instructions on
http://spark.apache.org/docs/0.7.2/running-on-mesos.html

after that the spark shell starts up fine.
then i try the following on the shell:

val data = 1 to 1

val distData = sc.parallelize(data)

distData.filter(_< 10).collect()

open spark web ui at host:4040 and see an active job.

NOW, how do i start workers or spark workers on mesos ? who completes my
job?
thanks,

-- 
Ani


Converting SparkSQL query to Scala query

2015-03-23 Thread nishitd
I have a complex SparkSQL query of the nature

select a.a, b.b, c.c from a,b,c where a.x = b.x and b.y = c.y

How do I convert this efficiently into scala query of

a.join(b,..,..)

and so on. Can anyone help me with this? If my question needs more
clarification, please let me know.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Converting-SparkSQL-query-to-Scala-query-tp22192.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Strange behavior with PySpark when using Join() and zip()

2015-03-23 Thread Sean Owen
I think the explanation is that the join does not guarantee any order,
since it causes a shuffle in general, and it is computed twice in the
first example, resulting in a difference for d1 and d2.

You can persist() the result of the join and in practice I believe
you'd find it behaves as expected, although that is even not 100%
guaranteed since a block could be lost and recomputed (differently).

If order matters, and it does for zip(), then the reliable way to
guarantee a well defined ordering for zipping is to sort the RDDs.

On Mon, Mar 23, 2015 at 6:27 PM, Ofer Mendelevitch
 wrote:
> Hi,
>
> I am running into a strange issue when doing a JOIN of two RDDs followed by
> ZIP from PySpark.
> It’s part of a more complex application, but was able to narrow it down to a
> simplified example that’s easy to replicate and causes the same problem to
> appear:
>
>
> raw = sc.parallelize([('k'+str(x),'v'+str(x)) for x in range(100)])
> data = raw.join(raw).mapValues(lambda x: [x[0]]+[x[1]]).map(lambda pair:
> ','.join([x for x in pair[1]]))
> d1 = data.map(lambda s: s.split(',')[0])
> d2 = data.map(lambda s: s.split(',')[1])
> x = d1.zip(d2)
>
> print x.take(10)
>
>
> The output is:
>
>
> [('v44', 'v80'), ('v79', 'v44'), ('v80', 'v79'), ('v45', 'v78'), ('v81',
> 'v81'), ('v78', 'v45'), ('v99', 'v99'), ('v82', 'v82'), ('v46', 'v46'),
> ('v83', 'v83')]
>
>
> As you can see, the ordering of items is not preserved anymore in all cases.
> (e.g., ‘v81’ is preserved, and ‘v45’ is not)
> Is it not supposed to be preserved?
>
> If I do the same thing without the JOIN:
>
> data = sc.parallelize('v'+str(x)+',v'+str(x) for x in range(100))
> d1 = data.map(lambda s: s.split(',')[0])
> d2 = data.map(lambda s: s.split(',')[1])
> x = d1.zip(d2)
>
> print x.take(10)
>
> The output is:
>
>
> [('v0', 'v0'), ('v1', 'v1'), ('v2', 'v2'), ('v3', 'v3'), ('v4', 'v4'),
> ('v5', 'v5'), ('v6', 'v6'), ('v7', 'v7'), ('v8', 'v8'), ('v9', 'v9')]
>
>
> As expected.
>
> Anyone run into this or a similar issue?
>
> Ofer

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark error NoClassDefFoundError: org/apache/hadoop/mapred/InputSplit

2015-03-23 Thread , Roy
now the job starts, but it gets stuck at a different point:

15/03/23 14:09:00 INFO BlockManagerInfo: Added rdd_10_0 on disk on
ip-10-0-3-171.ec2.internal:58704 (size: 138.8 MB)

so i checked the yarn logs on ip-10-0-3-171.ec2.internal but I didn't see any
errors?

anyone know what's going on here?



*Thanks...*

*Roy*

On Mon, Mar 23, 2015 at 12:13 PM, Ted Yu  wrote:

> InputSplit is in hadoop-mapreduce-client-core jar
>
> Please check that the jar is in your classpath.
>
> Cheers
>
> On Mon, Mar 23, 2015 at 8:10 AM, , Roy  wrote:
>
>> Hi,
>>
>>
>>   I am using CDH 5.3.2 packages installation through Cloudera Manager
>> 5.3.2
>>
>> I am trying to run one spark job with following command
>>
>> PYTHONPATH=~/code/utils/ spark-submit --master yarn --executor-memory 3G
>> --num-executors 30 --driver-memory 2G --executor-cores 2 --name=analytics
>> /home/abc/code/updb/spark/UPDB3analytics.py -date 2015-03-01
>>
>> but I am getting following error
>>
>> 15/03/23 11:06:49 WARN TaskSetManager: Lost task 9.0 in stage 0.0 (TID 7,
>> hdp003.dev.xyz.com): java.lang.NoClassDefFoundError:
>> org/apache/hadoop/mapred/InputSplit
>> at java.lang.Class.getDeclaredConstructors0(Native Method)
>> at java.lang.Class.privateGetDeclaredConstructors(Class.java:2532)
>> at java.lang.Class.getDeclaredConstructors(Class.java:1901)
>> at
>> java.io.ObjectStreamClass.computeDefaultSUID(ObjectStreamClass.java:1749)
>> at java.io.ObjectStreamClass.access$100(ObjectStreamClass.java:72)
>> at java.io.ObjectStreamClass$1.run(ObjectStreamClass.java:250)
>> at java.io.ObjectStreamClass$1.run(ObjectStreamClass.java:248)
>> at java.security.AccessController.doPrivileged(Native Method)
>> at
>> java.io.ObjectStreamClass.getSerialVersionUID(ObjectStreamClass.java:247)
>> at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:613)
>> at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1622)
>> at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
>> at
>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>> at
>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>> at
>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>> at
>> org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
>> at
>> org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87)
>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
>> at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>> at java.lang.Thread.run(Thread.java:745)
>> Caused by: java.lang.ClassNotFoundException:
>> org.apache.hadoop.mapred.InputSplit
>> at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
>> at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
>> at java.security.AccessController.doPrivileged(Native Method)
>> at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
>> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
>> ... 25 more
>>
>> here is the full trace
>>
>> https://gist.github.com/anonymous/3492f0ec63d7a23c47cf
>>
>>
>


Re: How to check that a dataset is sorted after it has been written out?

2015-03-23 Thread Michael Albert
Thanks for the information! (to all who responded)
The code below *seems* to work. Any hidden gotchas that anyone sees?
And still, in "terasort", how did they check that the data was actually sorted? :-)
-Mike

class MyInputFormat[T] extends parquet.hadoop.ParquetInputFormat[T] {
  override def getSplits(jobContext: org.apache.hadoop.mapreduce.JobContext)
      : java.util.List[org.apache.hadoop.mapreduce.InputSplit] = {
    val splits = super.getSplits(jobContext)
    import scala.collection.JavaConversions._
    splits.sortBy { split => split match {
      case fileSplit: org.apache.hadoop.mapreduce.lib.input.FileSplit =>
        (fileSplit.getPath.getName, fileSplit.getStart)
      case _ => ("", -1L)
    } }
  }
}

From: Sean Owen
To: Michael Albert
Cc: User
Sent: Monday, March 23, 2015 7:31 AM
Subject: Re: How to check that a dataset is sorted after it has been written out?
Data is not (necessarily) sorted when read from disk, no. A file might
have many blocks even, and while a block yields a partition in
general, the order in which those partitions appear in the RDD is not
defined. This is why you'd sort if you need the data sorted.

I think you could conceivably make some custom RDD or InputFormat that
reads blocks in a well-defined order and, assuming the data is sorted
in some knowable way on disk, then must have them sorted. I think
that's even been brought up.

Deciding whether the data is sorted is quite different. You'd have to
decide what ordering you expect (is part 0 before part 1? should it be
sorted in a part file?) and then just verify that externally.



On Fri, Mar 20, 2015 at 10:41 PM, Michael Albert
 wrote:
> Greetings!
>
> I sorted a dataset in Spark and then wrote it out in avro/parquet.
>
> Then I wanted to check that it was sorted.
>
> It looks like each partition has been sorted, but when reading in, the first
> "partition" (i.e., as
> seen in the partition index of mapPartitionsWithIndex) is not the same  as
> implied by
> the names of the parquet files (even when the number of partitions is the
> same in the
> rdd which was read as on disk).
>
> If I "take()" a few hundred values, they are sorted, but they are *not* the
> same as if I
> explicitly open "part-r-0.parquet" and take values from that.
>
> It seems that when opening the rdd, the "partitions" of the rdd are not in
> the same
> order as implied by the data on disk (i.e., "part-r-0.parquet,
> part-r-1.parquet, etc).
>
> So, how might one read the data so that one maintains the sort order?
>
> And while on the subject, after the "terasort", how did they check that the
> data was actually sorted correctly? (or did they :-) ? ).
>
> Is there any way to read the data back in so as to preserve the sort, or do
> I need to
> "zipWithIndex" before writing it out, and write the index at that time? (I
> haven't tried the
> latter yet).
>
> Thanks!
> -Mike
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



  

Strange behavior with PySpark when using Join() and zip()

2015-03-23 Thread Ofer Mendelevitch
Hi,

I am running into a strange issue when doing a JOIN of two RDDs followed by ZIP 
from PySpark.
It’s part of a more complex application, but was able to narrow it down to a 
simplified example that’s easy to replicate and causes the same problem to 
appear:


raw = sc.parallelize([('k'+str(x),'v'+str(x)) for x in range(100)])
data = raw.join(raw).mapValues(lambda x: [x[0]]+[x[1]]).map(lambda pair: 
','.join([x for x in pair[1]]))
d1 = data.map(lambda s: s.split(',')[0])
d2 = data.map(lambda s: s.split(',')[1])
x = d1.zip(d2)

print x.take(10)


The output is:


[('v44', 'v80'), ('v79', 'v44'), ('v80', 'v79'), ('v45', 'v78'), ('v81', 
'v81'), ('v78', 'v45'), ('v99', 'v99'), ('v82', 'v82'), ('v46', 'v46'), ('v83', 
'v83')]

As you can see, the ordering of items is not preserved anymore in all cases. 
(e.g., ‘v81’ is preserved, and ‘v45’ is not)
Is it not supposed to be preserved?

If I do the same thing without the JOIN:

data = sc.parallelize('v'+str(x)+',v'+str(x) for x in range(100))
d1 = data.map(lambda s: s.split(',')[0])
d2 = data.map(lambda s: s.split(',')[1])
x = d1.zip(d2)

print x.take(10)


The output is:


[('v0', 'v0'), ('v1', 'v1'), ('v2', 'v2'), ('v3', 'v3'), ('v4', 'v4'), ('v5', 
'v5'), ('v6', 'v6'), ('v7', 'v7'), ('v8', 'v8'), ('v9', 'v9')]

As expected.

Anyone run into this or a similar issue?

Ofer


Re: Spark streaming alerting

2015-03-23 Thread Tathagata Das
Something like that is not really supported out of the box. You will have
to implement your own RPC mechanism (sending stuff back to the driver for
forwarding) for that.
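
A rough sketch of the "post back to a queue" idea from the original question (publishAlert below is a placeholder for whatever queue or notification client is reachable from the driver; it is not a Spark API):

import org.apache.spark.streaming.dstream.DStream

def forwardAlerts(matches: DStream[String])(publishAlert: Seq[String] => Unit): Unit =
  matches.foreachRDD { rdd =>
    // take() runs on the driver, so only a bounded sample of alerts leaves the cluster
    val alerts = rdd.take(100).toSeq
    if (alerts.nonEmpty) publishAlert(alerts)
  }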

TD

On Mon, Mar 23, 2015 at 9:43 AM, Mohit Anchlia 
wrote:

> I think I didn't explain myself properly :) What I meant to say was that
> generally spark worker runs on either on HDFS's data nodes or on Cassandra
> nodes, which typically is in a private network (protected). When a
> condition is matched it's difficult to send out the alerts directly from
> the worker nodes because of the security concerns. I was wondering if there
> is a way to listen on the events as they occur on the sliding window scale
> or is the best way to accomplish is to post back to a queue?
>
> On Mon, Mar 23, 2015 at 2:22 AM, Khanderao Kand Gmail <
> khanderao.k...@gmail.com> wrote:
>
>> Akhil
>>
>> You are right in your answer to what Mohit wrote. However, what Mohit
>> seems to be alluding to, but did not write properly, might be different.
>>
>> Mohit
>>
>> You are wrong in saying that "generally" streaming works on HDFS and Cassandra.
>> Streaming typically works with a streaming or queuing source like Kafka,
>> Kinesis, Twitter, Flume, ZeroMQ, etc. (but it can also read from HDFS and S3).
>> However, the streaming context (the "receiver" within the streaming context)
>> gets events/messages/records and forms a time-window-based batch (RDD) -
>>
>> So there is a maximum gap of window time from alert message was available
>> to spark and when the processing happens. I think you meant about this.
>>
>> As per spark programming model, RDD is the right way to deal with data.
>> If you are fine with the minimum delay of say a sec (based on min time
>> window that dstreaming can support) then what Rohit gave is a right model.
>>
>> Khanderao
>>
>> On Mar 22, 2015, at 11:39 PM, Akhil Das 
>> wrote:
>>
>> What do you mean you can't send it directly from spark workers? Here's a
>> simple approach which you could do:
>>
>> val data = ssc.textFileStream("sigmoid/")
>> val dist = data.filter(_.contains("ERROR")).foreachRDD(rdd =>
>> alert("Errors :" + rdd.count()))
>>
>> And the alert() function could be anything triggering an email or sending
>> an SMS alert.
>>
>> Thanks
>> Best Regards
>>
>> On Sun, Mar 22, 2015 at 1:52 AM, Mohit Anchlia 
>> wrote:
>>
>>> Is there a module in spark streaming that lets you listen to
>>> the alerts/conditions as they happen in the streaming module? Generally
>>> spark streaming components will execute on large set of clusters like hdfs
>>> or Cassandra, however when it comes to alerting you generally can't send it
>>> directly from the spark workers, which means you need a way to listen to
>>> the alerts.
>>>
>>
>>
>


Re: Write to Parquet File in Python

2015-03-23 Thread chuwiey
Hey Akriti23,

pyspark gives you a saveAsParquetFile() api, to save your rdd as parquet.
You will however, need to infer the schema or describe it manually before
you can do so. Here are some docs about that (v1.2.1, you can search for the
others, they're relatively similar 1.1 and up): 
http://spark.apache.org/docs/1.2.1/sql-programming-guide.html#inferring-the-schema-using-reflection
http://spark.apache.org/docs/1.2.1/sql-programming-guide.html#parquet-files

As for whether it is the most efficient way to do a range query, that's a
more difficult question and it would be helpful if you could give some more
information. Another thing to think about is that you could just use a temp
table, and not store the parquet all together. <- same docs, just read
through them



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Write-to-Parquet-File-in-Python-tp22186p22191.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: version conflict common-net

2015-03-23 Thread Sean Owen
I think it's spark.yarn.user.classpath.first in 1.2, and
spark.{driver,executor}.extraClassPath in 1.3.  Obviously that's for
if you are using YARN, in the first instance.

On Mon, Mar 23, 2015 at 5:41 PM, Jacob Abraham  wrote:
> Hi Sean,
>
> Thanks a ton for your reply.
>
> The particular situation I have is case (3) that you have mentioned. The
> class that I am using from commons-net is FTPClient(). This class is present
> in both the 2.2 version and the 3.3 version. However, in the 3.3 version
> there are two additional methods (among several others)
> "setAutoDetectUTF8()" and "setControlKeepAliveTimeout()" that we require to
> use.
>
> The class FTPClient() and the two methods are a part of a custom receiver
> ("ZipStream"), that we wrote. Our spark app is deployed on YARN. Looking
> online and doing more research I found this link - SPARK-939
>
> In the comments section, I found a mention that there is already a flag
> "spark.yarn.user.classpath.first" to make this happen. I have not yet tried
> it out. I guess, this should do the trick right? Also, there are some more
> JIRA items I see that, indicate combining the
> "spark.files.userClassPathFirst" and "spark.yarn.user.classpath.first". It
> has been marked as resolved. However I am a bit confused about the state of
> the JIRA as far as Cloudera version of spark. We are using spark that comes
> with CDH 5.3.2 (Spark 1.2.0). I think this combined flag is marked for spark
> 1.3.
>
>
> If all else fails shade 3.3... :)
>
>
> Thanks again,
> -Jacob
>
>
>
>
>
>
>
>
>
>
>
>
>
> On Fri, Mar 20, 2015 at 10:07 AM, Sean Owen  wrote:
>>
>> It's not a crazy question, no. I'm having a bit of trouble figuring
>> out what's happening. Commons Net 2.2 is what's used by Spark. The
>> error appears to come from Spark. But the error is not finding a
>> method that did not exist in 2.2. I am not sure what ZipStream is, for
>> example. This could be a bizarre situation where classloader rules
>> mean that part of 2.2 and part of 3.3 are being used. For example,
>> let's say:
>>
>> - your receiver uses 3.3 classes that are only in 3.3, so they are
>> found in your user classloader
>> - 3.3 classes call some class that also existed in 2.2, but those are
>> found in the Spark classloader.
>> - 2.2 class doesn't have methods that 3.3 expects
>>
>> userClassPathFirst is often a remedy. There are several versions of
>> this flag though. For example you need a different one if on YARN to
>> have it take effect.
>>
>> It's worth ruling that out first. If all else fails you can shade 3.3.
>>
>> On Fri, Mar 20, 2015 at 11:44 AM, Jacob Abraham 
>> wrote:
>> > Anyone? or is this question nonsensical... and I am doing something
>> > fundamentally wrong?
>> >
>> >
>> >
>> > On Mon, Mar 16, 2015 at 5:33 PM, Jacob Abraham 
>> > wrote:
>> >>
>> >> Hi Folks,
>> >>
>> >> I have a situation where I am getting a version conflict between java
>> >> libraries that is used by my application and ones used by spark.
>> >>
>> >> Following are the details -
>> >>
>> >> I use spark provided by Cloudera running on the CDH5.3.2 cluster (Spark
>> >> 1.2.0-cdh5.3.2). The library that is causing the conflict is
>> >> commons-net.
>> >>
>> >> In our spark application we use commons-net with version 3.3.
>> >>
>> >> However I found out that spark uses commons-net version 2.2.
>> >>
>> >> Hence when we try to submit our application using spark-submit, I end
>> >> up
>> >> getting, a NoSuchMethodError()
>> >>
>> >> Error starting receiver 5 -
>> >>
>> >>
>> >> java.lang.NoSuchMethodError:
>> >> org.apache.commons.net.ftp.FTPClient.setAutodetectUTF8(Z)V
>> >>
>> >>  at ZipStream.onStart(ZipStream.java:55)
>> >>  at
>> >>
>> >> org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:121)
>> >>  at
>> >>
>> >> org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:106)
>> >>  at
>> >>
>> >> org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$8.apply(ReceiverTracker.scala:277)
>> >>  at
>> >>
>> >> org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$8.apply(ReceiverTracker.scala:269)
>> >>
>> >>   .
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >> Now, if I change the commons-net version to 2.2, the job runs fine
>> >> (except
>> >> for the fact that some of the features we use from the commons-net 3.3
>> >> are
>> >> not there).
>> >>
>> >>
>> >>
>> >> How does one resolve such an issue where sparks uses one set of
>> >> libraries
>> >> and our user application requires the same set of libraries, but just a
>> >> different version of it (In my case commons-net 2.2 vs 3.3).
>> >>
>> >>
>> >> I see that there is a setting that I can supply -
>> >> "spark.files.userClassPathFirst", but the documentation says that it is
>> >> experimental and for us this did not work at all.
>> >>
>> >>
>> >> Thanks in advance.
>> >>
>> >>
>> >> Regards,
>> >>
>> >> -Jacob
>> >>
>> >>
>> 

Re: spark disk-to-disk

2015-03-23 Thread Koert Kuipers
there is a way to reinstate the partitioner, but that requires
sc.objectFile to read exactly what i wrote, which means sc.objectFile
should never split files on reading (a feature of hadoop file inputformat
that gets in the way here).
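
A small sketch of that limitation (paths and numbers are illustrative): the partitioner is not persisted by saveAsObjectFile, so it comes back as None after the round trip and has to be reinstated by hand.

import org.apache.spark.HashPartitioner

val pairs = sc.parallelize(1 to 1000).map(i => (i, i)).partitionBy(new HashPartitioner(8))
println(pairs.partitioner)      // Some(HashPartitioner@...)

pairs.saveAsObjectFile("/tmp/disk-to-disk-demo")
val restored = sc.objectFile[(Int, Int)]("/tmp/disk-to-disk-demo")
println(restored.partitioner)   // None -- the partitioning info was lost on the round trip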

On Mon, Mar 23, 2015 at 1:39 PM, Koert Kuipers  wrote:

> i just realized the major limitation is that i lose partitioning info...
>
> On Mon, Mar 23, 2015 at 1:34 AM, Reynold Xin  wrote:
>
>>
>> On Sun, Mar 22, 2015 at 6:03 PM, Koert Kuipers  wrote:
>>
>>> so finally i can resort to:
>>> rdd.saveAsObjectFile(...)
>>> sc.objectFile(...)
>>> but that seems like a rather broken abstraction.
>>>
>>>
>> This seems like a fine solution to me.
>>
>>
>


Re: version conflict common-net

2015-03-23 Thread Jacob Abraham
Hi Sean,

Thanks a ton for your reply.

The particular situation I have is case (3) that you have mentioned. The
class that I am using from commons-net is FTPClient(). This class is
present in both the 2.2 version and the 3.3 version. However, in the 3.3
version there are two additional methods (among several others)
"setAutoDetectUTF8()" and "setControlKeepAliveTimeout()" that we require to
use.

The class FTPClient() and the two methods are a part of a custom receiver
("ZipStream"), that we wrote. Our spark app is deployed on YARN. Looking
online and doing more research I found this link - SPARK-939


In the comments section, I found a mention that there is already a flag "
spark.yarn.user.classpath.first" to make this happen. I have not yet tried
it out. I guess, this should do the trick right? Also, there are some more
JIRA items I see that, indicate combining the "
spark.files.userClassPathFirst" and "spark.yarn.user.classpath.first". It
has been marked as resolved. However I am a bit confused about the state of
the JIRA as far as Cloudera version of spark. We are using spark that comes
with CDH 5.3.2 (Spark 1.2.0). I think this combined flag is marked for
spark 1.3.


If all else fails shade 3.3... :)


Thanks again,
-Jacob













On Fri, Mar 20, 2015 at 10:07 AM, Sean Owen  wrote:

> It's not a crazy question, no. I'm having a bit of trouble figuring
> out what's happening. Commons Net 2.2 is what's used by Spark. The
> error appears to come from Spark. But the error is not finding a
> method that did not exist in 2.2. I am not sure what ZipStream is, for
> example. This could be a bizarre situation where classloader rules
> mean that part of 2.2 and part of 3.3 are being used. For example,
> let's say:
>
> - your receiver uses 3.3 classes that are only in 3.3, so they are
> found in your user classloader
> - 3.3 classes call some class that also existed in 2.2, but those are
> found in the Spark classloader.
> - 2.2 class doesn't have methods that 3.3 expects
>
> userClassPathFirst is often a remedy. There are several versions of
> this flag though. For example you need a different one if on YARN to
> have it take effect.
>
> It's worth ruling that out first. If all else fails you can shade 3.3.
>
> On Fri, Mar 20, 2015 at 11:44 AM, Jacob Abraham 
> wrote:
> > Anyone? or is this question nonsensical... and I am doing something
> > fundamentally wrong?
> >
> >
> >
> > On Mon, Mar 16, 2015 at 5:33 PM, Jacob Abraham 
> wrote:
> >>
> >> Hi Folks,
> >>
> >> I have a situation where I am getting a version conflict between java
> >> libraries that is used by my application and ones used by spark.
> >>
> >> Following are the details -
> >>
> >> I use spark provided by Cloudera running on the CDH5.3.2 cluster (Spark
> >> 1.2.0-cdh5.3.2). The library that is causing the conflict is
> commons-net.
> >>
> >> In our spark application we use commons-net with version 3.3.
> >>
> >> However I found out that spark uses commons-net version 2.2.
> >>
> >> Hence when we try to submit our application using spark-submit, I end up
> >> getting, a NoSuchMethodError()
> >>
> >> Error starting receiver 5 -
> >>
> >>
> >> java.lang.NoSuchMethodError:
> >> org.apache.commons.net.ftp.FTPClient.setAutodetectUTF8(Z)V
> >>
> >>  at ZipStream.onStart(ZipStream.java:55)
> >>  at
> >>
> org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:121)
> >>  at
> >>
> org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:106)
> >>  at
> >>
> org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$8.apply(ReceiverTracker.scala:277)
> >>  at
> >>
> org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$8.apply(ReceiverTracker.scala:269)
> >>
> >>   .
> >>
> >>
> >>
> >>
> >>
> >>
> >> Now, if I change the commons-net version to 2.2, the job runs fine
> (except
> >> for the fact that some of the features we use from the commons-net 3.3
> are
> >> not there).
> >>
> >>
> >>
> >> How does one resolve such an issue where sparks uses one set of
> libraries
> >> and our user application requires the same set of libraries, but just a
> >> different version of it (In my case commons-net 2.2 vs 3.3).
> >>
> >>
> >> I see that there is a setting that I can supply -
> >> "spark.files.userClassPathFirst", but the documentation says that it is
> >> experimental and for us this did not work at all.
> >>
> >>
> >> Thanks in advance.
> >>
> >>
> >> Regards,
> >>
> >> -Jacob
> >>
> >>
> >>
> >>
> >
>


Re: spark disk-to-disk

2015-03-23 Thread Koert Kuipers
i just realized the major limitation is that i lose partitioning info...

On Mon, Mar 23, 2015 at 1:34 AM, Reynold Xin  wrote:

>
> On Sun, Mar 22, 2015 at 6:03 PM, Koert Kuipers  wrote:
>
>> so finally i can resort to:
>> rdd.saveAsObjectFile(...)
>> sc.objectFile(...)
>> but that seems like a rather broken abstraction.
>>
>>
> This seems like a fine solution to me.
>
>


SchemaRDD/DataFrame result partitioned according to the underlying datasource partitions

2015-03-23 Thread Stephen Boesch
Is there a way to take advantage of the underlying datasource partitions
when generating a DataFrame/SchemaRDD via catalyst?  It seems from the sql
module that the only options are RangePartitioner and HashPartitioner - and
further that those are selected automatically by the code. It was not
apparent either that the underlying partitioning is translated to the
partitions presented in the rdd, or that a custom partitioner can
be provided.

The motivation would be to subsequently use df.map (with
preservesPartitioning=true) and/or df.mapPartitions (likewise) to perform
operations that work within the original datasource partitions - thus
avoiding a shuffle.


Re: DataFrame operation on parquet: GC overhead limit exceeded

2015-03-23 Thread Martin Goodson
Have you tried to repartition() your original data to make more partitions
before you aggregate?
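
For example, roughly (a sketch only; the column names come from earlier in this thread and the partition count is an arbitrary placeholder):

import org.apache.spark.sql.functions._

val people = sqlContext.parquetFile("/data.parquet")
val res = people.repartition(2000)                 // pick a count suited to your cluster
  .groupBy("name", "date")
  .agg(sum("power"), sum("supply"))
  .take(10)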


-- 
Martin Goodson  |  VP Data Science
(0)20 3397 1240

On Mon, Mar 23, 2015 at 4:12 PM, Yiannis Gkoufas 
wrote:

> Hi Yin,
>
> Yes, I have set spark.executor.memory to 8g and the worker memory to 16g
> without any success.
> I cannot figure out how to increase the number of mapPartitions tasks.
>
> Thanks a lot
>
> On 20 March 2015 at 18:44, Yin Huai  wrote:
>
>> spark.sql.shuffle.partitions only control the number of tasks in the
>> second stage (the number of reducers). For your case, I'd say that the
>> number of tasks in the first state (number of mappers) will be the number
>> of files you have.
>>
>> Actually, have you changed "spark.executor.memory" (it controls the
>> memory for an executor of your application)? I did not see it in your
>> original email. The difference between worker memory and executor memory
>> can be found at (http://spark.apache.org/docs/1.3.0/spark-standalone.html
>> ),
>>
>> SPARK_WORKER_MEMORY
>> Total amount of memory to allow Spark applications to use on the machine,
>> e.g. 1000m, 2g (default: total memory minus 1 GB); note that each
>> application's individual memory is configured using its
>> spark.executor.memory property.
>>
>>
>> On Fri, Mar 20, 2015 at 9:25 AM, Yiannis Gkoufas 
>> wrote:
>>
>>> Actually I realized that the correct way is:
>>>
>>> sqlContext.sql("set spark.sql.shuffle.partitions=1000")
>>>
>>> but I am still experiencing the same behavior/error.
>>>
>>> On 20 March 2015 at 16:04, Yiannis Gkoufas  wrote:
>>>
 Hi Yin,

 the way I set the configuration is:

 val sqlContext = new org.apache.spark.sql.SQLContext(sc)
 sqlContext.setConf("spark.sql.shuffle.partitions","1000");

 it is the correct way right?
 In the mapPartitions task (the first task which is launched), I get
 again the same number of tasks and again the same error. :(

 Thanks a lot!

 On 19 March 2015 at 17:40, Yiannis Gkoufas 
 wrote:

> Hi Yin,
>
> thanks a lot for that! Will give it a shot and let you know.
>
> On 19 March 2015 at 16:30, Yin Huai  wrote:
>
>> Was the OOM thrown during the execution of first stage (map) or the
>> second stage (reduce)? If it was the second stage, can you increase the
>> value of spark.sql.shuffle.partitions and see if the OOM disappears?
>>
>> This setting controls the number of reduces Spark SQL will use and
>> the default is 200. Maybe there are too many distinct values and the 
>> memory
>> pressure on every task (of those 200 reducers) is pretty high. You can
>> start with 400 and increase it until the OOM disappears. Hopefully this
>> will help.
>>
>> Thanks,
>>
>> Yin
>>
>>
>> On Wed, Mar 18, 2015 at 4:46 PM, Yiannis Gkoufas <
>> johngou...@gmail.com> wrote:
>>
>>> Hi Yin,
>>>
>>> Thanks for your feedback. I have 1700 parquet files, sized 100MB
>>> each. The number of tasks launched is equal to the number of parquet 
>>> files.
>>> Do you have any idea on how to deal with this situation?
>>>
>>> Thanks a lot
>>> On 18 Mar 2015 17:35, "Yin Huai"  wrote:
>>>
 Seems there are too many distinct groups processed in a task, which
 trigger the problem.

 How many files do your dataset have and how large is a file? Seems
 your query will be executed with two stages, table scan and map-side
 aggregation in the first stage and the final round of reduce-side
 aggregation in the second stage. Can you take a look at the numbers of
 tasks launched in these two stages?

 Thanks,

 Yin

 On Wed, Mar 18, 2015 at 11:42 AM, Yiannis Gkoufas <
 johngou...@gmail.com> wrote:

> Hi there, I set the executor memory to 8g but it didn't help
>
> On 18 March 2015 at 13:59, Cheng Lian 
> wrote:
>
>> You should probably increase executor memory by setting
>> "spark.executor.memory".
>>
>> Full list of available configurations can be found here
>> http://spark.apache.org/docs/latest/configuration.html
>>
>> Cheng
>>
>>
>> On 3/18/15 9:15 PM, Yiannis Gkoufas wrote:
>>
>>> Hi there,
>>>
>>> I was trying the new DataFrame API with some basic operations on
>>> a parquet dataset.
>>> I have 7 nodes of 12 cores and 8GB RAM allocated to each worker
>>> in a standalone cluster mode.
>>> The code is the following:
>>>
>>> val people = sqlContext.parquetFile("/data.parquet");
>>> val res = people.groupBy("name","date").
>>> agg(sum("power"),sum("supply")).take(10);
>>> System.out.println(res);
>>>
>>> The

Re: Why doesn't the --conf parameter work in yarn-cluster mode (but works in yarn-client and local)?

2015-03-23 Thread Sandy Ryza
Hi Emre,

The --conf property is meant to work with yarn-cluster mode.
System.getProperty("key") isn't guaranteed, but new SparkConf().get("key")
should.  Does it not?
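
For instance, a hedged sketch of what that looks like inside the application (the key name is made up; if in doubt, use a key prefixed with "spark.", since some code paths only forward spark.* properties):

import org.apache.spark.SparkConf

// submitted with:  spark-submit ... --conf "spark.myapp.greeting=hello"
val conf = new SparkConf()
val greeting = conf.get("spark.myapp.greeting", "default-value")
println(s"greeting = $greeting")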

-Sandy

On Mon, Mar 23, 2015 at 8:39 AM, Emre Sevinc  wrote:

> Hello,
>
> According to Spark Documentation at
> https://spark.apache.org/docs/1.2.1/submitting-applications.html :
>
>   --conf: Arbitrary Spark configuration property in key=value format. For
> values that contain spaces wrap “key=value” in quotes (as shown).
>
> And indeed, when I use that parameter, in my Spark program I can retrieve
> the value of the key by using:
>
> System.getProperty("key");
>
> This works when I test my program locally, and also in yarn-client mode, I
> can log the value of the key and see that it matches what I wrote in the
> command line, but it returns *null* when I submit the very same program in
> *yarn-cluster* mode.
>
> Why can't I retrieve the value of key given as --conf "key=value" when I
> submit my Spark application in *yarn-cluster* mode?
>
> Any ideas and/or workarounds?
>
>
> --
> Emre Sevinç
> http://www.bigindustries.be/
>
>


Re: Is yarn-standalone mode deprecated?

2015-03-23 Thread Sandy Ryza
The former is deprecated.  However, the latter is functionally equivalent
to it.  Both launch an app in what is now called "yarn-cluster" mode.

Oozie now also has a native Spark action, though I'm not familiar on the
specifics.

-Sandy

On Mon, Mar 23, 2015 at 1:01 PM, Nitin kak  wrote:

> To be more clear, I am talking about
>
> SPARK_JAR= ./bin/spark-class 
> org.apache.spark.deploy.yarn.Client \
>   --jar  \
>   --class  \
>   --args  \
>   --num-workers  \
>   --master-class 
>   --master-memory  \
>   --worker-memory  \
>   --worker-cores  \
>   --name  \
>   --queue  \
>   --addJars  \
>   --files  \
>   --archives 
>
> which I thought was the yarn-standalone mode
>
> vs
>
> spark-submit
>
> ./bin/spark-submit --class org.apache.spark.examples.SparkPi \
> --master yarn-cluster \
> --num-executors 3 \
> --driver-memory 4g \
> --executor-memory 2g \
> --executor-cores 1 \
> --queue thequeue \
> lib/spark-examples*.jar
>
>
> I didn't see an example of ./bin/spark-class in the 1.2.0 documentation, so I am
> wondering if that is deprecated.
>
>
>
>
>
> On Mon, Mar 23, 2015 at 12:11 PM, Sandy Ryza 
> wrote:
>
>> The mode is not deprecated, but the name "yarn-standalone" is now
>> deprecated.  It's now referred to as "yarn-cluster".
>>
>> -Sandy
>>
>> On Mon, Mar 23, 2015 at 11:49 AM, nitinkak001 
>> wrote:
>>
>>> Is yarn-standalone mode deprecated in Spark now. The reason I am asking
>>> is
>>> because while I can find it in 0.9.0
>>> documentation(https://spark.apache.org/docs/0.9.0/running-on-yarn.html).
>>> I
>>> am not able to find it in 1.2.0.
>>>
>>> I am using this mode to run the Spark jobs from Oozie as a java action.
>>> Removing this mode will prevent me from doing that. Are there any other
>>> ways
>>> of running a Spark job from Oozie other than Shell action?
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Is-yarn-standalone-mode-deprecated-tp22188.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>
>>
>


Parquet file + increase read parallelism

2015-03-23 Thread SamyaMaiti
Hi All,

Suppose I have a parquet file of 100 MB in HDFS & my HDFS block size is 64 MB, so
I have 2 blocks of data.

When I do *sqlContext.parquetFile("path")* followed by an action, two
tasks are started on two partitions.

My intent is to read these 2 blocks into more partitions to fully utilize my
cluster resources & increase parallelism.

Is there a way to do so, as in the case of
sc.textFile("path", *numberOfPartitions*)?

Please note, I don't want to do *repartition* as that would result in a lot of
shuffle.

Thanks in advance.

Regards,
Sam



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Parquet-file-increase-read-parallelism-tp22190.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark error NoClassDefFoundError: org/apache/hadoop/mapred/InputSplit

2015-03-23 Thread Marcelo Vanzin
This feels very CDH-specific (or even CM-specific), I'd suggest
following up on cdh-u...@cloudera.org instead.

On Mon, Mar 23, 2015 at 8:10 AM, , Roy  wrote:
> 15/03/23 11:06:49 WARN TaskSetManager: Lost task 9.0 in stage 0.0 (TID 7,
> hdp003.dev.xyz.com): java.lang.NoClassDefFoundError:
> org/apache/hadoop/mapred/InputSplit

Is the error always on this host? If it is, I'd double check in CM
that this host is properly configured as a YARN and Spark gateway.

To double check, run "hadoop --config /etc/hadoop/conf classpath" on
the node where you're launching the job, then compare the entries to
the filesystem on that node, and make sure all the paths exist.

-- 
Marcelo

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark streaming alerting

2015-03-23 Thread Mohit Anchlia
I think I didn't explain myself properly :) What I meant to say was that
generally spark worker runs on either on HDFS's data nodes or on Cassandra
nodes, which typically is in a private network (protected). When a
condition is matched it's difficult to send out the alerts directly from
the worker nodes because of the security concerns. I was wondering if there
is a way to listen on the events as they occur on the sliding window scale
or is the best way to accomplish is to post back to a queue?

On Mon, Mar 23, 2015 at 2:22 AM, Khanderao Kand Gmail <
khanderao.k...@gmail.com> wrote:

> Akhil
>
> You are right in your answer to what Mohit wrote. However, what Mohit seems
> to be alluding to, but did not write properly, might be different.
>
> Mohit
>
> You are wrong in saying that "generally" streaming works on HDFS and Cassandra.
> Streaming typically works with a streaming or queuing source like Kafka,
> Kinesis, Twitter, Flume, ZeroMQ, etc. (but it can also read from HDFS and S3).
> However, the streaming context (the "receiver" within the streaming context)
> gets events/messages/records and forms a time-window-based batch (RDD) -
>
> So there is a maximum gap of window time from alert message was available
> to spark and when the processing happens. I think you meant about this.
>
> As per spark programming model, RDD is the right way to deal with data.
> If you are fine with the minimum delay of say a sec (based on min time
> window that dstreaming can support) then what Rohit gave is a right model.
>
> Khanderao
>
> On Mar 22, 2015, at 11:39 PM, Akhil Das 
> wrote:
>
> What do you mean you can't send it directly from spark workers? Here's a
> simple approach which you could do:
>
> val data = ssc.textFileStream("sigmoid/")
> val dist = data.filter(_.contains("ERROR")).foreachRDD(rdd =>
> alert("Errors :" + rdd.count()))
>
> And the alert() function could be anything triggering an email or sending
> an SMS alert.
>
> Thanks
> Best Regards
>
> On Sun, Mar 22, 2015 at 1:52 AM, Mohit Anchlia 
> wrote:
>
>> Is there a module in spark streaming that lets you listen to
>> the alerts/conditions as they happen in the streaming module? Generally
>> spark streaming components will execute on large set of clusters like hdfs
>> or Cassandra, however when it comes to alerting you generally can't send it
>> directly from the spark workers, which means you need a way to listen to
>> the alerts.
>>
>
>


Re: PySpark, ResultIterable and taking a list and saving it into different parquet files

2015-03-23 Thread chuwiey
In case anyone wants to learn about my solution for this:
groupByKey is highly inefficient due to the swapping of elements between the
different partitions as well as requiring enough mem in each worker to
handle the elements for each group. So instead of using groupByKey, I ended
up taking the flatMap result, and using subtractByKey in such a way that I
ended up with multiple rdds only including the key I wanted; Now I can
iterate over each rdd independently and end up with multiple parquets.

Thinking of submitting a splitByKeys() pull request, that would take an
array of keys and an rdd, and return an array of rdds each with only one of
the keys. Any thoughts around this?

Thanks



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/PySpark-ResultIterable-and-taking-a-list-and-saving-it-into-different-parquet-files-tp22152p22189.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: DataFrame operation on parquet: GC overhead limit exceeded

2015-03-23 Thread Yiannis Gkoufas
Hi Yin,

Yes, I have set spark.executor.memory to 8g and the worker memory to 16g
without any success.
I cannot figure out how to increase the number of mapPartitions tasks.

Thanks a lot

On 20 March 2015 at 18:44, Yin Huai  wrote:

> spark.sql.shuffle.partitions only control the number of tasks in the
> second stage (the number of reducers). For your case, I'd say that the
> number of tasks in the first state (number of mappers) will be the number
> of files you have.
>
> Actually, have you changed "spark.executor.memory" (it controls the
> memory for an executor of your application)? I did not see it in your
> original email. The difference between worker memory and executor memory
> can be found at (http://spark.apache.org/docs/1.3.0/spark-standalone.html
> ),
>
> SPARK_WORKER_MEMORY
> Total amount of memory to allow Spark applications to use on the machine,
> e.g. 1000m, 2g (default: total memory minus 1 GB); note that each
> application's individual memory is configured using its
> spark.executor.memory property.
>
>
> On Fri, Mar 20, 2015 at 9:25 AM, Yiannis Gkoufas 
> wrote:
>
>> Actually I realized that the correct way is:
>>
>> sqlContext.sql("set spark.sql.shuffle.partitions=1000")
>>
>> but I am still experiencing the same behavior/error.
>>
>> On 20 March 2015 at 16:04, Yiannis Gkoufas  wrote:
>>
>>> Hi Yin,
>>>
>>> the way I set the configuration is:
>>>
>>> val sqlContext = new org.apache.spark.sql.SQLContext(sc)
>>> sqlContext.setConf("spark.sql.shuffle.partitions","1000");
>>>
>>> it is the correct way right?
>>> In the mapPartitions task (the first task which is launched), I get
>>> again the same number of tasks and again the same error. :(
>>>
>>> Thanks a lot!
>>>
>>> On 19 March 2015 at 17:40, Yiannis Gkoufas  wrote:
>>>
 Hi Yin,

 thanks a lot for that! Will give it a shot and let you know.

 On 19 March 2015 at 16:30, Yin Huai  wrote:

> Was the OOM thrown during the execution of first stage (map) or the
> second stage (reduce)? If it was the second stage, can you increase the
> value of spark.sql.shuffle.partitions and see if the OOM disappears?
>
> This setting controls the number of reduces Spark SQL will use and the
> default is 200. Maybe there are too many distinct values and the memory
> pressure on every task (of those 200 reducers) is pretty high. You can
> start with 400 and increase it until the OOM disappears. Hopefully this
> will help.
>
> Thanks,
>
> Yin
>
>
> On Wed, Mar 18, 2015 at 4:46 PM, Yiannis Gkoufas  > wrote:
>
>> Hi Yin,
>>
>> Thanks for your feedback. I have 1700 parquet files, sized 100MB
>> each. The number of tasks launched is equal to the number of parquet 
>> files.
>> Do you have any idea on how to deal with this situation?
>>
>> Thanks a lot
>> On 18 Mar 2015 17:35, "Yin Huai"  wrote:
>>
>>> Seems there are too many distinct groups processed in a task, which
>>> trigger the problem.
>>>
>>> How many files do your dataset have and how large is a file? Seems
>>> your query will be executed with two stages, table scan and map-side
>>> aggregation in the first stage and the final round of reduce-side
>>> aggregation in the second stage. Can you take a look at the numbers of
>>> tasks launched in these two stages?
>>>
>>> Thanks,
>>>
>>> Yin
>>>
>>> On Wed, Mar 18, 2015 at 11:42 AM, Yiannis Gkoufas <
>>> johngou...@gmail.com> wrote:
>>>
 Hi there, I set the executor memory to 8g but it didn't help

 On 18 March 2015 at 13:59, Cheng Lian 
 wrote:

> You should probably increase executor memory by setting
> "spark.executor.memory".
>
> Full list of available configurations can be found here
> http://spark.apache.org/docs/latest/configuration.html
>
> Cheng
>
>
> On 3/18/15 9:15 PM, Yiannis Gkoufas wrote:
>
>> Hi there,
>>
>> I was trying the new DataFrame API with some basic operations on
>> a parquet dataset.
>> I have 7 nodes of 12 cores and 8GB RAM allocated to each worker
>> in a standalone cluster mode.
>> The code is the following:
>>
>> val people = sqlContext.parquetFile("/data.parquet");
>> val res = people.groupBy("name","date").
>> agg(sum("power"),sum("supply")).take(10);
>> System.out.println(res);
>>
>> The dataset consists of 16 billion entries.
>> The error I get is java.lang.OutOfMemoryError: GC overhead limit
>> exceeded
>>
>> My configuration is:
>>
>> spark.serializer org.apache.spark.serializer.KryoSerializer
>> spark.driver.memory6g
>> spark.executor.extraJavaOptions -XX:+UseCompressedOops
>> spark.shuffle.managersort
>>>

Re: Spark error NoClassDefFoundError: org/apache/hadoop/mapred/InputSplit

2015-03-23 Thread Ted Yu
InputSplit is in hadoop-mapreduce-client-core jar

Please check that the jar is in your classpath.

Cheers

On Mon, Mar 23, 2015 at 8:10 AM, , Roy  wrote:

> Hi,
>
>
>   I am using CDH 5.3.2 packages installation through Cloudera Manager 5.3.2
>
> I am trying to run one spark job with following command
>
> PYTHONPATH=~/code/utils/ spark-submit --master yarn --executor-memory 3G
> --num-executors 30 --driver-memory 2G --executor-cores 2 --name=analytics
> /home/abc/code/updb/spark/UPDB3analytics.py -date 2015-03-01
>
> but I am getting following error
>
> 15/03/23 11:06:49 WARN TaskSetManager: Lost task 9.0 in stage 0.0 (TID 7,
> hdp003.dev.xyz.com): java.lang.NoClassDefFoundError:
> org/apache/hadoop/mapred/InputSplit
> at java.lang.Class.getDeclaredConstructors0(Native Method)
> at java.lang.Class.privateGetDeclaredConstructors(Class.java:2532)
> at java.lang.Class.getDeclaredConstructors(Class.java:1901)
> at
> java.io.ObjectStreamClass.computeDefaultSUID(ObjectStreamClass.java:1749)
> at java.io.ObjectStreamClass.access$100(ObjectStreamClass.java:72)
> at java.io.ObjectStreamClass$1.run(ObjectStreamClass.java:250)
> at java.io.ObjectStreamClass$1.run(ObjectStreamClass.java:248)
> at java.security.AccessController.doPrivileged(Native Method)
> at
> java.io.ObjectStreamClass.getSerialVersionUID(ObjectStreamClass.java:247)
> at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:613)
> at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1622)
> at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
> at
> org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
> at
> org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.ClassNotFoundException:
> org.apache.hadoop.mapred.InputSplit
> at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
> at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
> at java.security.AccessController.doPrivileged(Native Method)
> at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
> ... 25 more
>
> here is the full trace
>
> https://gist.github.com/anonymous/3492f0ec63d7a23c47cf
>
>


Re: Is yarn-standalone mode deprecated?

2015-03-23 Thread Sandy Ryza
The mode is not deprecated, but the name "yarn-standalone" is now
deprecated.  It's now referred to as "yarn-cluster".

-Sandy

On Mon, Mar 23, 2015 at 11:49 AM, nitinkak001  wrote:

> Is yarn-standalone mode deprecated in Spark now. The reason I am asking is
> because while I can find it in 0.9.0
> documentation(https://spark.apache.org/docs/0.9.0/running-on-yarn.html). I
> am not able to find it in 1.2.0.
>
> I am using this mode to run the Spark jobs from Oozie as a java action.
> Removing this mode will prevent me from doing that. Are there any other
> ways
> of running a Spark job from Oozie other than Shell action?
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Is-yarn-standalone-mode-deprecated-tp22188.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Is yarn-standalone mode deprecated?

2015-03-23 Thread nitinkak001
Is yarn-standalone mode deprecated in Spark now? The reason I am asking is
that while I can find it in the 0.9.0
documentation (https://spark.apache.org/docs/0.9.0/running-on-yarn.html), I
am not able to find it in 1.2.0.

I am using this mode to run Spark jobs from Oozie as a Java action.
Removing this mode will prevent me from doing that. Are there any other ways
of running a Spark job from Oozie other than the Shell action?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Is-yarn-standalone-mode-deprecated-tp22188.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark 1.2. loses often all executors

2015-03-23 Thread Ted Yu
In this thread:
http://search-hadoop.com/m/JW1q5DM69G

I only saw two replies. Maybe some people forgot to use 'Reply to All' ?

Cheers

On Mon, Mar 23, 2015 at 8:19 AM, mrm  wrote:

> Hi,
>
> I have received three replies to my question on my personal e-mail, why
> don't they also show up on the mailing list? I would like to reply to the 3
> users through a thread.
>
> Thanks,
> Maria
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-2-loses-often-all-executors-tp22162p22187.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Spark 1.2. loses often all executors

2015-03-23 Thread mrm
Hi, 

I have received three replies to my question on my personal e-mail, why
don't they also show up on the mailing list? I would like to reply to the 3
users through a thread.

Thanks,
Maria



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-2-loses-often-all-executors-tp22162p22187.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: join two DataFrames, same column name

2015-03-23 Thread Eric Friedman
>
> You can include * and a column alias in the same select clause
> var df1 = sqlContext.sql("select *, column_id AS table1_id from table1")


FYI, this does not ultimately work as the * still includes column_id and
you cannot have two columns of that name in the joined DataFrame.  So I
ended up aliasing both sides of the join.
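
In case it helps anyone else, a rough sketch of that both-sides aliasing (one plausible reading of it; the extra column names here are hypothetical, and the columns are listed explicitly so the joined result has no duplicate names):

val df1 = sqlContext.sql("select column_id as table1_id, col_a, col_b from table1")
val df2 = sqlContext.sql("select column_id as table2_id, col_x from table2")

val joined = df1.join(df2, df1("table1_id") === df2("table2_id"))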

On Sun, Mar 22, 2015 at 1:25 PM, Michael Armbrust 
wrote:

> You can include * and a column alias in the same select clause
> var df1 = sqlContext.sql("select *, column_id AS table1_id from table1")
>
>
> I'm also hoping to resolve SPARK-6376
>  before Spark 1.3.1
> which will let you do something like:
> var df1 = sqlContext.sql("select * from table1").as("t1")
> var df2 = sqlContext.sql("select * from table2").as("t2")
> df1.join(df2, df1("column_id") === df2("column_id")).select("t1.column_id")
>
> Finally, there is SPARK-6380
>  that hopes to simplify
> this particular case.
>
> Michael
>
> On Sat, Mar 21, 2015 at 3:02 PM, Eric Friedman 
> wrote:
>
>> I have a couple of data frames that I pulled from SparkSQL and the
>> primary key of one is a foreign key of the same name in the other.  I'd
>> rather not have to specify each column in the SELECT statement just so that
>> I can rename this single column.
>>
>> When I try to join the data frames, I get an exception because it finds
>> the two columns of the same name to be ambiguous.  Is there a way to
>> specify which side of the join comes from data frame A and which comes from
>> B?
>>
>> var df1 = sqlContext.sql("select * from table1")
>> var df2 = sqlContext.sql("select * from table2")
>>
>> df1.join(df2, df1("column_id") === df2("column_id"))
>>
>
>


Re: JAVA_HOME problem with upgrade to 1.3.0

2015-03-23 Thread Williams, Ken


> From: , Ken Williams 
> mailto:ken.willi...@windlogics.com>>
> Date: Thursday, March 19, 2015 at 10:59 AM
> To: Spark list mailto:user@spark.apache.org>>
> Subject: JAVA_HOME problem with upgrade to 1.3.0
>
> […]
> Finally, I go and check the YARN app master’s web interface (since the job is 
> shown, I know it at least made it that far), and the
> only logs it shows are these:
>
> Log Type: stderr
> Log Length: 61
> /bin/bash: {{JAVA_HOME}}/bin/java: No such file or directory
>
> Log Type: stdout
> Log Length: 0

I’m still interested in a solution to this issue if anyone has comments.  I 
also posted to SO if that’s more convenient:


http://stackoverflow.com/questions/29170280/java-home-error-with-upgrade-to-spark-1-3-0

Thanks,

  -Ken




CONFIDENTIALITY NOTICE: This e-mail message is for the sole use of the intended 
recipient(s) and may contain confidential and privileged information. Any 
unauthorized review, use, disclosure or distribution of any kind is strictly 
prohibited. If you are not the intended recipient, please contact the sender 
via reply e-mail and destroy all copies of the original message. Thank you.


Spark error NoClassDefFoundError: org/apache/hadoop/mapred/InputSplit

2015-03-23 Thread , Roy
Hi,


  I am using CDH 5.3.2 packages installation through Cloudera Manager 5.3.2

I am trying to run one spark job with following command

PYTHONPATH=~/code/utils/ spark-submit --master yarn --executor-memory 3G
--num-executors 30 --driver-memory 2G --executor-cores 2 --name=analytics
/home/abc/code/updb/spark/UPDB3analytics.py -date 2015-03-01

but I am getting following error

15/03/23 11:06:49 WARN TaskSetManager: Lost task 9.0 in stage 0.0 (TID 7,
hdp003.dev.xyz.com): java.lang.NoClassDefFoundError:
org/apache/hadoop/mapred/InputSplit
at java.lang.Class.getDeclaredConstructors0(Native Method)
at java.lang.Class.privateGetDeclaredConstructors(Class.java:2532)
at java.lang.Class.getDeclaredConstructors(Class.java:1901)
at java.io.ObjectStreamClass.computeDefaultSUID(ObjectStreamClass.java:1749)
at java.io.ObjectStreamClass.access$100(ObjectStreamClass.java:72)
at java.io.ObjectStreamClass$1.run(ObjectStreamClass.java:250)
at java.io.ObjectStreamClass$1.run(ObjectStreamClass.java:248)
at java.security.AccessController.doPrivileged(Native Method)
at java.io.ObjectStreamClass.getSerialVersionUID(ObjectStreamClass.java:247)
at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:613)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1622)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
at
org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
at
org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassNotFoundException:
org.apache.hadoop.mapred.InputSplit
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
... 25 more

here is the full trace

https://gist.github.com/anonymous/3492f0ec63d7a23c47cf


Re: join two DataFrames, same column name

2015-03-23 Thread Eric Friedman
Michael, thank you for the workaround and for letting me know of the
upcoming enhancements, both of which sound appealing.

On Sun, Mar 22, 2015 at 1:25 PM, Michael Armbrust 
wrote:

> You can include * and a column alias in the same select clause
> var df1 = sqlContext.sql("select *, column_id AS table1_id from table1")
>
>
> I'm also hoping to resolve SPARK-6376
>  before Spark 1.3.1
> which will let you do something like:
> var df1 = sqlContext.sql("select * from table1").as("t1")
> var df2 = sqlContext.sql("select * from table2").as("t2")
> df1.join(df2, df1("column_id") === df2("column_id")).select("t1.column_id")
>
> Finally, there is SPARK-6380
>  that hopes to simplify
> this particular case.
>
> Michael
>
> On Sat, Mar 21, 2015 at 3:02 PM, Eric Friedman 
> wrote:
>
>> I have a couple of data frames that I pulled from SparkSQL and the
>> primary key of one is a foreign key of the same name in the other.  I'd
>> rather not have to specify each column in the SELECT statement just so that
>> I can rename this single column.
>>
>> When I try to join the data frames, I get an exception because it finds
>> the two columns of the same name to be ambiguous.  Is there a way to
>> specify which side of the join comes from data frame A and which comes from
>> B?
>>
>> var df1 = sqlContext.sql("select * from table1")
>> var df2 = sqlContext.sql("select * from table2")
>>
>> df1.join(df2, df1("column_id") === df2("column_id"))
>>
>
>


Re: RDD storage in spark steaming

2015-03-23 Thread Jeffrey Jedele
Hey Abhi,
many of StreamingContext's methods to create input streams take a
StorageLevel parameter to configure this behavior. RDD partitions are
generally stored in the in-memory cache of worker nodes I think. You can
also configure replication and spilling to disk if needed.
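
A small sketch of those knobs (host, port and batch interval are placeholders):

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext(sc, Seconds(10))

// Received blocks are kept serialized in memory, spill to disk when needed,
// and are replicated to a second worker.
val lines = ssc.socketTextStream("some-host", 9999, StorageLevel.MEMORY_AND_DISK_SER_2)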

Regards,
Jeff

2015-03-23 15:26 GMT+01:00 abhi :

> HI,
> i have a simple question about creating RDD . Whenever RDD is created in
> spark streaming for the particular time window .When does the RDD gets
> stored .
>
> 1. Does it get stored at the Driver machine ? or it gets stored on all the
> machines in the cluster ?
> 2. Does the data gets stored in memory by default ? Can it store at the
> memory and disk ? How can it configured ?
>
>
> Thanks,
> Abhi
>
>


RDD storage in spark steaming

2015-03-23 Thread abhi
Hi,
I have a simple question about creating RDDs. Whenever an RDD is created in
Spark Streaming for a particular time window, when does the RDD get
stored?

1. Does it get stored on the driver machine, or is it stored on all the
machines in the cluster?
2. Does the data get stored in memory by default? Can it be stored in both
memory and disk? How can it be configured?


Thanks,
Abhi


Spark RDD mapped to Hbase to be updateable

2015-03-23 Thread Siddharth Ubale
Hi,

We have a JavaRDD mapped to an HBase table, and when we query the HBase table
using the Spark SQL API we can access the data. However, when we update the HBase
table while the SparkSQL & SparkConf are initialised, we cannot see the updated data.
Is there any way we can have the RDD mapped to HBase updated as and when the HBase
table reflects any change?

Thanks,
Siddharth Ubale,
Synchronized Communications
#43, Velankani Tech Park, Block No. II,
3rd Floor, Electronic City Phase I,
Bangalore – 560 100
Tel : +91 80 3202 4060
Web: www.syncoms.com
London|Bangalore|Orlando

we innovate, plan, execute, and transform the business​



RE: Spark SQL udf(ScalaUdf) is very slow

2015-03-23 Thread Cheng, Hao
This is a very interesting issue. The root reason for the lower performance is
probably that, for a Scala UDF, Spark SQL converts the data from its internal
representation to the Scala representation via Scala reflection, recursively.

Can you create a Jira issue for tracking this? I can start to work on the 
improvement soon.

From: zzcclp [mailto:441586...@qq.com]
Sent: Monday, March 23, 2015 5:10 PM
To: user@spark.apache.org
Subject: Spark SQL udf(ScalaUdf) is very slow

My test env:
1. Spark version is 1.3.0
2. 3 node per 80G/20C
3. read 250G parquet files from hdfs

Test case:

1. register "floor" func with command:
   sqlContext.udf.register("floor", (ts: Int) => ts - ts % 300)
   then run with sql "select chan, floor(ts) as tt, sum(size) from qlogbase3 group by chan, floor(ts)", it takes 17 minutes.

== Physical Plan ==
Aggregate false, [chan#23015,PartialGroup#23500], [chan#23015,PartialGroup#23500 AS tt#23494,CombineSum(PartialSum#23499L) AS c2#23495L]
 Exchange (HashPartitioning [chan#23015,PartialGroup#23500], 54)
  Aggregate true, [chan#23015,scalaUDF(ts#23016)], [chan#23015,scalaUDF(ts#23016) AS PartialGroup#23500,SUM(size#23023L) AS PartialSum#23499L]
   PhysicalRDD [chan#23015,ts#23016,size#23023L], MapPartitionsRDD[115] at map at newParquet.scala:562

2. run with sql "select chan, (ts - ts % 300) as tt, sum(size) from qlogbase3 group by chan, (ts - ts % 300)", it takes only 5 minutes.

== Physical Plan ==
Aggregate false, [chan#23015,PartialGroup#23349], [chan#23015,PartialGroup#23349 AS tt#23343,CombineSum(PartialSum#23348L) AS c2#23344L]
 Exchange (HashPartitioning [chan#23015,PartialGroup#23349], 54)
  Aggregate true, [chan#23015,(ts#23016 - (ts#23016 % 300))], [chan#23015,(ts#23016 - (ts#23016 % 300)) AS PartialGroup#23349,SUM(size#23023L) AS PartialSum#23348L]
   PhysicalRDD [chan#23015,ts#23016,size#23023L], MapPartitionsRDD[83] at map at newParquet.scala:562

3. use HiveContext with sql "select chan, floor((ts - ts % 300)) as tt, sum(size) from qlogbase3 group by chan, floor((ts - ts % 300))", it takes only 5 minutes too.

== Physical Plan ==
Aggregate false, [chan#23015,PartialGroup#23108L], [chan#23015,PartialGroup#23108L AS tt#23102L,CombineSum(PartialSum#23107L) AS _c2#23103L]
 Exchange (HashPartitioning [chan#23015,PartialGroup#23108L], 54)
  Aggregate true, [chan#23015,HiveGenericUdf#org.apache.hadoop.hive.ql.udf.generic.GenericUDFFloor((ts#23016 - (ts#23016 % 300)))], [chan#23015,HiveGenericUdf#org.apache.hadoop.hive.ql.udf.generic.GenericUDFFloor((ts#23016 - (ts#23016 % 300))) AS PartialGroup#23108L,SUM(size#23023L) AS PartialSum#23107L]
   PhysicalRDD [chan#23015,ts#23016,size#23023L], MapPartitionsRDD[28] at map at newParquet.scala:562

Why? ScalaUdf is so slow?? How to improve it?

View this message in context: Spark SQL udf(ScalaUdf) is very 
slow
Sent from the Apache Spark User List mailing list 
archive at Nabble.com.


Re: registerTempTable is not a member of RDD on spark 1.2?

2015-03-23 Thread IT CTO
Yes!

Any reason this happen in my environment and not in any sample code I found?
Should I fix something in the path or env?

Eran

On Mon, Mar 23, 2015 at 3:50 PM, Ted Yu  wrote:

> Have you tried adding the following ?
>
> import org.apache.spark.sql.SQLContext
>
> Cheers
>
> On Mon, Mar 23, 2015 at 6:45 AM, IT CTO  wrote:
>
>> Thanks.
>> I am new to the environment and running cloudera CDH5.3 with spark in it.
>>
>> apparently when running in spark-shell this command  val sqlContext = new
>> SQLContext(sc)
>> I am failing with the not found type SQLContext
>>
>> Any idea why?
>>
>> On Mon, Mar 23, 2015 at 3:05 PM, Dean Wampler 
>> wrote:
>>
>>> In 1.2 it's a member of SchemaRDD and it becomes available on RDD
>>> (through the "type class" mechanism) when you add a SQLContext, like so.
>>>
>>> val sqlContext = new SQLContext(sc)
>>> import sqlContext._
>>>
>>>
>>> In 1.3, the method has moved to the new DataFrame type.
>>>
>>> Dean Wampler, Ph.D.
>>> Author: Programming Scala, 2nd Edition
>>>  (O'Reilly)
>>> Typesafe 
>>> @deanwampler 
>>> http://polyglotprogramming.com
>>>
>>> On Mon, Mar 23, 2015 at 5:25 AM, IT CTO  wrote:
>>>
 Hi,

 I am running spark when I use sc.version I get 1.2 but when I call
 registerTempTable("MyTable") I get error saying registedTempTable is not a
 member of RDD

 Why?

 --
 Eran | CTO

>>>
>>>
>>
>>
>> --
>> Eran | CTO
>>
>
>


-- 
Eran | CTO


Re: registerTempTable is not a member of RDD on spark 1.2?

2015-03-23 Thread Ted Yu
Have you tried adding the following ?

import org.apache.spark.sql.SQLContext

Cheers

On Mon, Mar 23, 2015 at 6:45 AM, IT CTO  wrote:

> Thanks.
> I am new to the environment and running cloudera CDH5.3 with spark in it.
>
> apparently when running in spark-shell this command  val sqlContext = new
> SQLContext(sc)
> I am failing with the not found type SQLContext
>
> Any idea why?
>
> On Mon, Mar 23, 2015 at 3:05 PM, Dean Wampler 
> wrote:
>
>> In 1.2 it's a member of SchemaRDD and it becomes available on RDD
>> (through the "type class" mechanism) when you add a SQLContext, like so.
>>
>> val sqlContext = new SQLContext(sc)
>> import sqlContext._
>>
>>
>> In 1.3, the method has moved to the new DataFrame type.
>>
>> Dean Wampler, Ph.D.
>> Author: Programming Scala, 2nd Edition
>>  (O'Reilly)
>> Typesafe 
>> @deanwampler 
>> http://polyglotprogramming.com
>>
>> On Mon, Mar 23, 2015 at 5:25 AM, IT CTO  wrote:
>>
>>> Hi,
>>>
>>> I am running spark when I use sc.version I get 1.2 but when I call
>>> registerTempTable("MyTable") I get error saying registedTempTable is not a
>>> member of RDD
>>>
>>> Why?
>>>
>>> --
>>> Eran | CTO
>>>
>>
>>
>
>
> --
> Eran | CTO
>


Re: registerTempTable is not a member of RDD on spark 1.2?

2015-03-23 Thread IT CTO
Thanks.
I am new to the environment and running cloudera CDH5.3 with spark in it.

apparently when running in spark-shell this command  val sqlContext = new
SQLContext(sc)
I am failing with the not found type SQLContext

Any idea why?

On Mon, Mar 23, 2015 at 3:05 PM, Dean Wampler  wrote:

> In 1.2 it's a member of SchemaRDD and it becomes available on RDD (through
> the "type class" mechanism) when you add a SQLContext, like so.
>
> val sqlContext = new SQLContext(sc)
> import sqlContext._
>
>
> In 1.3, the method has moved to the new DataFrame type.
>
> Dean Wampler, Ph.D.
> Author: Programming Scala, 2nd Edition
>  (O'Reilly)
> Typesafe 
> @deanwampler 
> http://polyglotprogramming.com
>
> On Mon, Mar 23, 2015 at 5:25 AM, IT CTO  wrote:
>
>> Hi,
>>
>> I am running spark when I use sc.version I get 1.2 but when I call
>> registerTempTable("MyTable") I get error saying registedTempTable is not a
>> member of RDD
>>
>> Why?
>>
>> --
>> Eran | CTO
>>
>
>


-- 
Eran | CTO


Re: Saving Dstream into a single file

2015-03-23 Thread Dean Wampler
You can use the coalesce method to reduce the number of partitions. You can
reduce to one if the data is not too big. Then write the output.
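
A rough sketch for the word-count DStream from the original question (the output path is illustrative; wordCounts is the DStream being saved):

wordCounts.foreachRDD { (rdd, time) =>
  // one partition per batch => one part file per batch
  rdd.coalesce(1)
     .saveAsTextFile(s"hdfs:///user/hive/warehouse/wordcounts/batch-${time.milliseconds}")
}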

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
 (O'Reilly)
Typesafe 
@deanwampler 
http://polyglotprogramming.com

On Mon, Mar 16, 2015 at 2:42 PM, Zhan Zhang  wrote:

> Each RDD has multiple partitions, each of them will produce one hdfs file
> when saving output. I don’t think you are allowed to have multiple file
> handler writing to the same hdfs file.  You still can load multiple files
> into hive tables, right?
>
> Thanks..
>
> Zhan Zhang
>
> On Mar 15, 2015, at 7:31 AM, tarek_abouzeid 
> wrote:
>
> > I am doing the word count example on a Flume stream and trying to save the output
> > as text files in HDFS, but in the save directory I get multiple sub-directories,
> > each containing small files. I wonder if there is a way to append to one large file
> > instead of saving many small ones, since I intend to save the output in a Hive HDFS
> > directory so I can query the result using Hive.
> >
> > I hope someone has a workaround for this issue. Thanks in advance.
> >
> >
> >
> > --
> > View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Saving-Dstream-into-a-single-file-tp22058.html
> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
> >
> > -
> > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> > For additional commands, e-mail: user-h...@spark.apache.org
> >
>
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


[no subject]

2015-03-23 Thread Udbhav Agarwal
Hi,
I am querying HBase via Spark SQL using the Java APIs.
Step 1: create a JavaPairRDD, then a JavaRDD, then a JavaSchemaRDD via applySchema.
Step 2: run sqlContext.sql(sql query).

If I update my HBase table between these two steps (via the HBase shell in another
console), the query in step 2 does not pick up the updated data from the table; it
still shows the old data. Can somebody tell me how to let Spark know that I have
updated my database after Spark has created the RDDs?




Thanks,
Udbhav Agarwal



Re: registerTempTable is not a member of RDD on spark 1.2?

2015-03-23 Thread Dean Wampler
In 1.2 it's a member of SchemaRDD and it becomes available on RDD (through
the "type class" mechanism) when you add a SQLContext, like so.

val sqlContext = new SQLContext(sc)
import sqlContext._


In 1.3, the method has moved to the new DataFrame type.
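
A sketch of both variants, using an illustrative case class and data set that are not
part of the original thread:

import org.apache.spark.sql.SQLContext

case class Person(name: String, age: Int)

val sqlContext = new SQLContext(sc)   // sc is the existing SparkContext

// Spark 1.2.x: the implicit conversion to SchemaRDD supplies registerTempTable
import sqlContext.createSchemaRDD
val people = sc.parallelize(Seq(Person("Ann", 30), Person("Bob", 25)))
people.registerTempTable("MyTable")

// Spark 1.3.x equivalent: convert to a DataFrame first
//   import sqlContext.implicits._
//   people.toDF().registerTempTable("MyTable")

sqlContext.sql("SELECT name FROM MyTable WHERE age > 26").collect().foreach(println)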

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
 (O'Reilly)
Typesafe 
@deanwampler 
http://polyglotprogramming.com

On Mon, Mar 23, 2015 at 5:25 AM, IT CTO  wrote:

> Hi,
>
> I am running spark when I use sc.version I get 1.2 but when I call
> registerTempTable("MyTable") I get error saying registedTempTable is not a
> member of RDD
>
> Why?
>
> --
> Eran | CTO
>


Why doesn't the --conf parameter work in yarn-cluster mode (but works in yarn-client and local)?

2015-03-23 Thread Emre Sevinc
Hello,

According to Spark Documentation at
https://spark.apache.org/docs/1.2.1/submitting-applications.html :

  --conf: Arbitrary Spark configuration property in key=value format. For
values that contain spaces wrap “key=value” in quotes (as shown).

And indeed, when I use that parameter, in my Spark program I can retrieve
the value of the key by using:

System.getProperty("key");

This works when I test my program locally and also in yarn-client mode: I
can log the value of the key and see that it matches what I wrote on the
command line. But it returns *null* when I submit the very same program in
*yarn-cluster* mode.

Why can't I retrieve the value of key given as --conf "key=value" when I
submit my Spark application in *yarn-cluster* mode?

Any ideas and/or workarounds?
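
One hedged workaround sketch (my assumption, not something confirmed in this thread):
keep the key in the spark.* namespace, since spark-submit may silently drop other
prefixes, and read it from the SparkConf rather than from JVM system properties. In
yarn-cluster mode the driver runs in a different JVM than the one spark-submit
launched, which is likely why System.getProperty returns null there.

// Submit with: spark-submit --conf "spark.myapp.key=value" ...
val sc = new org.apache.spark.SparkContext(new org.apache.spark.SparkConf())
val value = sc.getConf.get("spark.myapp.key", "default")   // falls back to "default" if unset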


-- 
Emre Sevinç
http://www.bigindustries.be/


registerTempTable is not a member of RDD on spark 1.2?

2015-03-23 Thread IT CTO
Hi,

I am running Spark; sc.version reports 1.2, but when I call
registerTempTable("MyTable") I get an error saying registerTempTable is not a
member of RDD.

Why?

-- 
Eran | CTO


Re: How to check that a dataset is sorted after it has been written out?

2015-03-23 Thread Sean Owen
Data is not (necessarily) sorted when read from disk, no. A file might
have many blocks even, and while a block yields a partition in
general, the order in which those partitions appear in the RDD is not
defined. This is why you'd sort if you need the data sorted.

I think you could conceivably make some custom RDD or InputFormat that
reads blocks in a well-defined order and, assuming the data is sorted
in some knowable way on disk, then must have them sorted. I think
that's even been brought up.

Deciding whether the data is sorted is quite different. You'd have to
decide what ordering you expect (is part 0 before part 1? should it be
sorted in a part file?) and then just verify that externally.
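
As one way of verifying it externally, here is a hedged sketch (mine, not from this
thread) for an RDD[Long] read back from disk: every partition must be locally sorted,
and partition boundaries must not overlap. How you load the parquet/avro output back
into readBack is up to you.

import org.apache.spark.rdd.RDD

def isGloballySorted(readBack: RDD[Long]): Boolean = {
  // (partition index, first value, last value, locally sorted?) for each non-empty partition
  val stats = readBack.mapPartitionsWithIndex { (idx, iter) =>
    val values = iter.toArray   // fine for a check, assuming one partition fits in memory
    if (values.isEmpty) Iterator.empty
    else Iterator((idx, values.head, values.last,
                   values.sliding(2).forall(p => p.length < 2 || p(0) <= p(1))))
  }.collect().sortBy(_._1)

  stats.forall(_._4) &&                                               // each partition sorted
    stats.sliding(2).forall(p => p.length < 2 || p(0)._3 <= p(1)._2)  // boundaries in order
}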

On Fri, Mar 20, 2015 at 10:41 PM, Michael Albert
 wrote:
> Greetings!
>
> I sorted a dataset in Spark and then wrote it out in avro/parquet.
>
> Then I wanted to check that it was sorted.
>
> It looks like each partition has been sorted, but when reading in, the first
> "partition" (i.e., as
> seen in the partition index of mapPartitionsWithIndex) is not the same  as
> implied by
> the names of the parquet files (even when the number of partitions is the
> same in the
> rdd which was read as on disk).
>
> If I "take()" a few hundred values, they are sorted, but they are *not* the
> same as if I
> explicitly open "part-r-0.parquet" and take values from that.
>
> It seems that when opening the rdd, the "partitions" of the rdd are not in
> the same
> order as implied by the data on disk (i.e., "part-r-0.parquet,
> part-r-1.parquet, etc).
>
> So, how might one read the data so that one maintains the sort order?
>
> And while on the subject, after the "terasort", how did they check that the
> data was actually sorted correctly? (or did they :-) ? ).
>
> Is there any way to read the data back in so as to preserve the sort, or do
> I need to
> "zipWithIndex" before writing it out, and write the index at that time? (I
> haven't tried the
> latter yet).
>
> Thanks!
> -Mike
>




Re: How to handle under-performing nodes in the cluster

2015-03-23 Thread Sean Owen
Why is it under-performing? this just says it executed fewer tasks,
which could be because of data locality, etc. Better is to look at
whether stages are slowed down by straggler tasks, and if so, whether
they come from one machine, and if so what may be different about that
one.

On Fri, Mar 20, 2015 at 2:35 PM, Yiannis Gkoufas  wrote:
> Hi all,
>
> I have 6 nodes in the cluster and one of the nodes is clearly
> under-performing:
>
>
>
> I was wondering what the impact of having such issues is. Also, what is the
> recommended way to work around it?
>
> Thanks a lot,
> Yiannis




Re: Data/File structure Validation

2015-03-23 Thread Ahmed Nawar
Dear Raunak,

   The source system provides logs that contain some malformed records. I need to make
sure each row is in the correct format (the number of columns/attributes and the data
types are correct) and move the incorrect rows to a separate list.

Of course I can implement this logic myself, but I need to make sure there is no direct
way first; a rough sketch of the manual route is shown below.
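
A rough sketch of that manual route (the path, delimiter, column count, and type check
are illustrative assumptions, not the real schema):

val raw = sc.textFile("hdfs:///logs/input.csv")   // illustrative path

val tagged = raw.map { line =>
  val cols = line.split(",", -1)
  val valid = cols.length == 3 && scala.util.Try(cols(2).toInt).isSuccess
  (valid, line)
}

val goodRows = tagged.filter(_._1).map(_._2)            // rows matching the expected schema
val badRows  = tagged.filter(t => !t._1).map(_._2)      // quarantined for inspection
badRows.saveAsTextFile("hdfs:///logs/rejected")         // illustrative path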


Thanks,
Nawwar


On Mon, Mar 23, 2015 at 1:14 PM, Raunak Jhawar 
wrote:

> CSV is a structured format and JSON is not (semi structured). It is
> obvious for different JSON documents to have differing schema? What are you
> trying to do here?
>
> --
> Thanks,
> Raunak Jhawar
> m: 09820890034
>
>
>
>
>
>
> On Mon, Mar 23, 2015 at 2:18 PM, Ahmed Nawar 
> wrote:
>
>> Dears,
>>
>> Is there any way to validate the CSV, Json ... Files while loading to
>> DataFrame.
>> I need to ignore corrupted rows.(Rows with not matching with the
>> schema).
>>
>>
>> Thanks,
>> Ahmed Nawwar
>>
>
>


Re: How Does aggregate work

2015-03-23 Thread Paweł Szulc
It is actually the number of cores. If your processor has hyperthreading, it
will be more (the number of logical processors your OS sees).
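
A quick way to check both the effective value and whether the setting was made
explicitly (a sketch, assuming a running SparkContext sc):

println(sc.defaultParallelism)                              // what Spark will actually use
println(sc.getConf.getOption("spark.default.parallelism"))  // Some(...) only if explicitly set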

On Sun, 22 Mar 2015 at 4:51 PM, Ted Yu  wrote:

> I assume spark.default.parallelism is 4 in the VM Ashish was using.
>
> Cheers
>


Re: Spark Sql with python udf fail

2015-03-23 Thread lonely Feb
sql("SELECT * FROM ").foreach(println)

can be executed successfully, so the problem may still be in the UDF code. How
can I print the line that triggers the ArrayIndexOutOfBoundsException in Catalyst?

2015-03-23 17:04 GMT+08:00 lonely Feb :

> ok i'll try asap
>
> 2015-03-23 17:00 GMT+08:00 Cheng Lian :
>
>>  I suspect there is a malformed row in your input dataset. Could you try
>> something like this to confirm:
>>
>> sql("SELECT * FROM ").foreach(println)
>>
>> If there does exist a malformed line, you should see similar exception.
>> And you can catch it with the help of the output. Notice that the messages
>> are printed to stdout on executor side.
>>
>> On 3/23/15 4:36 PM, lonely Feb wrote:
>>
>>   I caught exceptions in the Python UDF code, flushed them into a
>> single file, and made sure the column count of the output lines is the
>> same as the SQL schema.
>>
>>  Sth. interesting is that my output line of the UDF code is just 10
>> columns, and the exception above is java.lang.
>> ArrayIndexOutOfBoundsException: 9, is there any inspirations?
>>
>> 2015-03-23 16:24 GMT+08:00 Cheng Lian :
>>
>>> Could you elaborate on the UDF code?
>>>
>>>
>>> On 3/23/15 3:43 PM, lonely Feb wrote:
>>>
 Hi all, I tried to migrate some Hive jobs to Spark SQL. When I ran a
 SQL job with a Python UDF I got an exception:

 java.lang.ArrayIndexOutOfBoundsException: 9
 at
 org.apache.spark.sql.catalyst.expressions.GenericRow.apply(Row.scala:142)
 at
 org.apache.spark.sql.catalyst.expressions.BoundReference.eval(BoundAttribute.scala:37)
 at
 org.apache.spark.sql.catalyst.expressions.EqualTo.eval(predicates.scala:166)
 at
 org.apache.spark.sql.catalyst.expressions.InterpretedPredicate$anonfun$apply$1.apply(predicates.scala:30)
 at
 org.apache.spark.sql.catalyst.expressions.InterpretedPredicate$anonfun$apply$1.apply(predicates.scala:30)
 at scala.collection.Iterator$anon$14.hasNext(Iterator.scala:390)
 at scala.collection.Iterator$anon$11.hasNext(Iterator.scala:327)
 at
 org.apache.spark.sql.execution.Aggregate$anonfun$execute$1$anonfun$7.apply(Aggregate.scala:156)
 at
 org.apache.spark.sql.execution.Aggregate$anonfun$execute$1$anonfun$7.apply(Aggregate.scala:151)
 at org.apache.spark.rdd.RDD$anonfun$13.apply(RDD.scala:601)
 at org.apache.spark.rdd.RDD$anonfun$13.apply(RDD.scala:601)
 at
 org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
 at
 org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
 at
 org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
 at
 org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
 at
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
 at
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
 at org.apache.spark.scheduler.Task.run(Task.scala:56)
 at
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:197)
 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:744)

 I suspected there was an odd line in the input file. But the input file
 is so large that I could not find any abnormal lines even with several jobs to
 check. How can I find the abnormal line here?

>>>
>>>
>>​
>>
>
>


Use pig load function in spark

2015-03-23 Thread Dai, Kevin
Hi, all

Can spark use pig's load function to load data?

Best Regards,
Kevin.


Re: Data/File structure Validation

2015-03-23 Thread Ahmed Nawar
Dear Taotao,

Yes, I tried sparkCSV.


Thanks,
Nawwar


On Mon, Mar 23, 2015 at 12:20 PM, Taotao.Li  wrote:

> can it load successfully if the format is invalid?
>
> --
> *From: *"Ahmed Nawar" 
> *To: *user@spark.apache.org
> *Sent: *Monday, March 23, 2015 4:48:54 PM
> *Subject: *Data/File structure Validation
>
> Dears,
>
> Is there any way to validate the CSV, Json ... Files while loading to
> DataFrame.
> I need to ignore corrupted rows.(Rows with not matching with the
> schema).
>
>
> Thanks,
> Ahmed Nawwar
>
>
>
> --
>
>
> *---*
>
> *Thanks & Best regards*
>
> 李涛涛 Taotao · Li  |  Fixed Income@Datayes  |  Software Engineer
>
> Address: Wanxiang Tower 8F, Lujiazui West Rd. No. 99, Pudong New District,
> Shanghai, 200120
>
> Phone: 021-60216502    Mobile: +86-18202171279
>
>


Re: Spark streaming alerting

2015-03-23 Thread Khanderao Kand Gmail
Akhil 

You are right in your answer to what Mohit wrote. However, what Mohit seems to
be alluding to, but did not state clearly, might be something different.

Mohit,

You are wrong in saying that streaming "generally" works with HDFS and Cassandra.
Streaming typically works with a streaming or queueing source such as Kafka, Kinesis,
Twitter, Flume, ZeroMQ, etc. (though it can also read from HDFS and S3). The
streaming context (the "receiver" within the streaming context) receives
events/messages/records and forms a time-window-based batch (an RDD).

So there is a gap of at most one window interval between when an alert message becomes
available to Spark and when the processing happens. I think this is what you meant.

As per the Spark programming model, the RDD is the right way to deal with data. If you
are fine with a minimum delay of, say, a second (based on the smallest batch window that
DStreams support), then what Rohit gave is the right model; a sketch of it follows below.

Khanderao
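
A compact sketch of that model (mine, not from this thread): lines stands for whatever
DStream you already have, and alert() is a placeholder for your email/SMS/webhook call.

import org.apache.spark.streaming.dstream.DStream

def wireAlerts(lines: DStream[String], alert: String => Unit): Unit =
  lines.filter(_.contains("ERROR")).foreachRDD { rdd =>
    val n = rdd.count()   // runs on the driver once per batch window
    if (n > 0) alert(n + " error lines in the last batch window")
  }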

> On Mar 22, 2015, at 11:39 PM, Akhil Das  wrote:
> 
> What do you mean you can't send it directly from spark workers? Here's a 
> simple approach which you could do:
> 
> val data = ssc.textFileStream("sigmoid/")
> val dist = data.filter(_.contains("ERROR")).foreachRDD(rdd => 
> alert("Errors :" + rdd.count()))
> 
> And the alert() function could be anything triggering an email or sending an 
> SMS alert.
> 
> Thanks
> Best Regards
> 
>> On Sun, Mar 22, 2015 at 1:52 AM, Mohit Anchlia  
>> wrote:
>> Is there a module in spark streaming that lets you listen to the 
>> alerts/conditions as they happen in the streaming module? Generally spark 
>> streaming components will execute on large set of clusters like hdfs or 
>> Cassandra, however when it comes to alerting you generally can't send it 
>> directly from the spark workers, which means you need a way to listen to the 
>> alerts.
> 

