Re: using pyspark with standalone cluster

2015-06-02 Thread Akhil Das
If you want to submit applications to a remote cluster whose port 7077
is opened publicly, then you would need to set *spark.driver.host* (the
public IP of your laptop) and *spark.driver.port* (optional if there's
no firewall between your laptop and the remote cluster). Keeping port 7077
open to the public is a bad idea; you can read more here:
https://www.sigmoid.com/securing-apache-spark-cluster/
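
A minimal sketch of those two settings (the addresses are placeholders, not taken
from this thread; the same properties can be passed to pyspark via --conf or
spark-defaults.conf):

import org.apache.spark.{SparkConf, SparkContext}

// spark.driver.host must be an address the cluster can reach back to;
// pinning spark.driver.port only matters if a firewall sits in between.
val conf = new SparkConf()
  .setAppName("remote-submit-example")
  .setMaster("spark://<master-public-ip>:7077")
  .set("spark.driver.host", "<laptop-public-ip>")
  .set("spark.driver.port", "51000")
val sc = new SparkContext(conf)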

Thanks
Best Regards

On Mon, Jun 1, 2015 at 11:48 PM, AlexG swift...@gmail.com wrote:

 I've followed the instructions for setting up a standalone spark cluster
 (on
 EC2):
 - install spark on all the machines
 - enabled passwordless ssh
 - setup the conf/slaves file
 - start the master and slaves with the provided scripts

 The status on the 8080 port of the master tells me that the master and
 executors are all running. I can successfully use pyspark from the master.

 However, if I try to call pyspark remotely from my laptop, with
 MASTER=spark://ip:7077 pyspark,
 I get these errors:
 15/06/01 10:02:14 INFO AppClient$ClientActor: Connecting to master
 akka.tcp://sparkMaster@IP:7077/user/Master...
 15/06/01 10:02:34 INFO AppClient$ClientActor: Connecting to master
 akka.tcp://sparkMaster@IP:7077/user/Master...
 15/06/01 10:02:54 INFO AppClient$ClientActor: Connecting to master
 akka.tcp://sparkMaster@IP:7077/user/Master...
 15/06/01 10:03:14 ERROR SparkDeploySchedulerBackend: Application has been
 killed. Reason: All masters are unresponsive! Giving up.
 15/06/01 10:03:14 ERROR TaskSchedulerImpl: Exiting due to error from
 cluster
 scheduler: All masters are unresponsive! Giving up.

 Any idea what's going on here? I set port 7077 to be publicly accessible...




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/using-pyspark-with-standalone-cluster-tp23099.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Insert overwrite to hive - ArrayIndexOutOfBoundsException

2015-06-02 Thread patcharee

Hi,

I am using spark 1.3.1. I tried to insert (a new partition) into an 
existing partitioned hive table, but got ArrayIndexOutOfBoundsException. 
Below is a code snippet and the debug log. Any suggestions please.


+
case class Record4Dim(key: String, date: Int, hh: Int, x: Int, y: Int, 
z: Int, height: Float,

  u: Float , v: Float, w: Float, ph: Float, phb: Float,
  t: Float, p: Float, pb: Float, qvapor: Float, qgraup: Float,
  qnice: Float, qnrain: Float, tke_pbl: Float, el_pbl: Float)

def flatKeyFromWrf(x: (String, (Map[String,Float], Float))): Record4Dim 
= {  }




val varWHeightFlatRDD = 
varWHeightRDD.map(FlatMapUtilClass().flatKeyFromWrf).toDF()

varWHeightFlatRDD.registerTempTable("table_4Dim")
for (zz <- 1 to 51)
  hiveContext.sql("INSERT OVERWRITE table 4dim partition (zone=" +
    ZONE + ",z=" + zz + ",year=" + YEAR + ",month=" + MONTH + ") " +
    "select date, hh, x, y, height, u, v, w, ph, phb, t, pb, pb, " +
    "qvapor, qgraup, qnice, tke_pbl, el_pbl from table_4Dim where z=" + zz);


+

15/06/01 21:07:20 DEBUG YarnHistoryService: Enqueue [1433192840040]: 
SparkListenerTaskEnd(4,0,ResultTask,ExceptionFailure(java.lang.ArrayIndexOutOfBoundsException,18,[Ljava.lang.StackTraceElement;@5783ce22,java.lang.ArrayIndexOutOfBoundsException: 
18
at 
org.apache.spark.sql.catalyst.expressions.GenericRow.isNullAt(rows.scala:79)
at 
org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$org$apache$spark$sql$hive$execution$InsertIntoHiveTable$$writeToFile$1$1.apply(InsertIntoHiveTable.scala:103)
at 
org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$org$apache$spark$sql$hive$execution$InsertIntoHiveTable$$writeToFile$1$1.apply(InsertIntoHiveTable.scala:100)

at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at 
org.apache.spark.sql.hive.execution.InsertIntoHiveTable.org$apache$spark$sql$hive$execution$InsertIntoHiveTable$$writeToFile$1(InsertIntoHiveTable.scala:100)
at 
org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$3.apply(InsertIntoHiveTable.scala:82)
at 
org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$3.apply(InsertIntoHiveTable.scala:82)

at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at 
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)

at java.lang.Thread.run(Thread.java:745)


Best,
Patcharee

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Union data type

2015-06-02 Thread Agarwal, Shagun
Hi,

As per the doc https://github.com/databricks/spark-avro/blob/master/README.md, 
the Union type doesn't support all kinds of combinations.
Is there any plan to support a union type of string and long in the near future?

Thanks
Shagun Agarwal





spark sql - reading data from sql tables having space in column names

2015-06-02 Thread Sachin Goyal
Hi,

We are using spark sql (1.3.1) to load data from Microsoft sql server using
jdbc (as described in
https://spark.apache.org/docs/latest/sql-programming-guide.html#jdbc-to-other-databases
).

It is working fine except when there is a space in column names (we can't
modify the schemas to remove space as it is a legacy database).

Sqoop is able to handle such scenarios by enclosing column names in '[ ]' -
the recommended method from microsoft sql server. (
https://github.com/apache/sqoop/blob/trunk/src/java/org/apache/sqoop/manager/SQLServerManager.java
- line no 319)

Is there a way to handle this in spark sql?

Thanks,
sachin


Re: flatMap output on disk / flatMap memory overhead

2015-06-02 Thread Akhil Das
You could try rdd.persist(MEMORY_AND_DISK/DISK_ONLY).flatMap(...). I think
StorageLevel MEMORY_AND_DISK means Spark will try to keep the data in
memory, and if there isn't sufficient space the partitions that don't fit
will be spilled to disk.
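
A minimal, hypothetical sketch of that ordering (persist the input before the
flatMap so partitions can spill; the path and the pair-building logic are made
up for illustration):

import org.apache.spark.SparkContext
import org.apache.spark.storage.StorageLevel

def countPairs(sc: SparkContext): Unit = {
  // Keep the pre-flatMap RDD at MEMORY_AND_DISK so partitions that don't
  // fit in memory are written to disk instead of being recomputed.
  val docs = sc.textFile("hdfs:///tmp/docs").persist(StorageLevel.MEMORY_AND_DISK)
  val pairCounts = docs
    .flatMap { line =>
      val words = line.split("\\s+").distinct
      for (a <- words; b <- words if a < b) yield ((a, b), 1)
    }
    .reduceByKey(_ + _)
  pairCounts.take(10).foreach(println)
}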

Thanks
Best Regards

On Mon, Jun 1, 2015 at 11:02 PM, octavian.ganea octavian.ga...@inf.ethz.ch
wrote:

 Hi,

 Is there any way to force the output RDD of a flatMap op to be stored in
 both memory and disk as it is computed? My RAM would not be able to fit the
 entire output of the flatMap, so it really needs to start using disk once the
 RAM gets full. I didn't find any way to force this.

 Also, what is the memory overhead of flatMap? From my computations, the
 output RDD should fit in memory, but I get the following error after a while
 (and I know it's because of memory issues, since running the program with
 1/3 of the input data finishes successfully)

 15/06/01 19:02:49 ERROR BlockFetcherIterator$BasicBlockFetcherIterator:
 Could not get block(s) from
 ConnectionManagerId(dco-node036-mgt.dco.ethz.ch,57478)
 java.io.IOException: sendMessageReliably failed because ack was not
 received
 within 60 sec
 at

 org.apache.spark.network.ConnectionManager$$anon$10$$anonfun$run$15.apply(ConnectionManager.scala:866)
 at

 org.apache.spark.network.ConnectionManager$$anon$10$$anonfun$run$15.apply(ConnectionManager.scala:865)
 at scala.Option.foreach(Option.scala:236)
 at

 org.apache.spark.network.ConnectionManager$$anon$10.run(ConnectionManager.scala:865)
 at

 io.netty.util.HashedWheelTimer$HashedWheelTimeout.expire(HashedWheelTimer.java:581)
 at

 io.netty.util.HashedWheelTimer$HashedWheelBucket.expireTimeouts(HashedWheelTimer.java:656)
 at
 io.netty.util.HashedWheelTimer$Worker.run(HashedWheelTimer.java:367)
 at java.lang.Thread.run(Thread.java:745)


 Also, I've seen this:
 https://spark.apache.org/docs/latest/programming-guide.html#rdd-persistence
 but my understanding is that one should apply something like
 rdd.flatMap(...).persist(MEMORY_AND_DISK), which assumes that the entire
 output of flatMap is first stored in memory (which is not possible in my
 case) and, only when it's done, is stored on disk. Please correct me if
 I'm wrong. Anyway, I've tried using this, but I got the same error.

 My config:

 conf.set("spark.cores.max", "128")
 conf.set("spark.akka.frameSize", "1024")
 conf.set("spark.executor.memory", "125g")
 conf.set("spark.shuffle.file.buffer.kb", "1000")
 conf.set("spark.shuffle.consolidateFiles", "true")



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/flatMap-output-on-disk-flatMap-memory-overhead-tp23098.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: HDFS Rest Service not available

2015-06-02 Thread Akhil Das
It says your namenode is down (connection refused on 8020). You can restart
HDFS by going into the hadoop directory and running sbin/stop-dfs.sh and
then sbin/start-dfs.sh.

Thanks
Best Regards

On Tue, Jun 2, 2015 at 5:03 AM, Su She suhsheka...@gmail.com wrote:

 Hello All,

 A bit scared I did something stupid...I killed a few PIDs that were
 listening to ports 2183 (kafka), 4042 (spark app), some of the PIDs
 didn't even seem to be stopped as they still are running when i do

 lsof -i:[port number]

 I'm not sure if the problem started after or before I did these kill
 commands, but I now can't connect to HDFS or start spark. I can't seem
 to access Hue. I am afraid I accidentally killed an important process
 related to HDFS. But, I am not sure what it would be as I couldn't
 even kill the PIDs.

 Is it a coincidence that HDFS failed randomly? Likely that I killed an
 important PID? How can I maybe restart HDFS?

 Thanks a lot!

 Error on Hue:

 Cannot access: /user/ec2-user. The HDFS REST service is not available.
 Note: You are a Hue admin but not a HDFS superuser (which is hdfs).

 HTTPConnectionPool(host='ec2-ip-address.us-west-1.compute.amazonaws.com',
 port=50070): Max retries exceeded with url:
  /webhdfs/v1/user/ec2-user?op=GETFILESTATUS&user.name=hue&doas=ec2-user
 (Caused by class 'socket.error': [Errno 111] Connection refused)

 Error when I try to open spark-shell or a spark app:
 java.net.ConnectException: Call From
 ip-10-0-2-216.us-west-1.compute.internal/10.0.2.216 to
 ip-10-0-2-216.us-west-1.compute.internal:8020 failed on connection
 exception: java.net.ConnectException: Connection refused; For more
 details see:  http://wiki.apache.org/hadoop/ConnectionRefused
 at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
 Method)
 at
 sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
 at
 sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
 at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
 at
 org.apache.hadoop.net.NetUtils.wrapWithMessage(NetUtils.java:783)
 at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:730)
 at org.apache.hadoop.ipc.Client.call(Client.java:1415)
 at org.apache.hadoop.ipc.Client.call(Client.java:1364)
 at
 org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206)
 at com.sun.proxy.$Proxy20.getFileInfo(Unknown Source)
 at
 org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getFileInfo(ClientNamenodeProtocolTranslatorPB.java:744)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)
 at
 org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187)
 at
 org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
 at com.sun.proxy.$Proxy21.getFileInfo(Unknown Source)
 at
 org.apache.hadoop.hdfs.DFSClient.getFileInfo(DFSClient.java:1921)
 at
 org.apache.hadoop.hdfs.DistributedFileSystem$17.doCall(DistributedFileSystem.java:1089)
 at
 org.apache.hadoop.hdfs.DistributedFileSystem$17.doCall(DistributedFileSystem.java:1085)
 at
 org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
 at
 org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1085)
 at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1400)
 at
 org.apache.spark.util.FileLogger.createLogDir(FileLogger.scala:123)
 at org.apache.spark.util.FileLogger.start(FileLogger.scala:115)
 at
 org.apache.spark.scheduler.EventLoggingListener.start(EventLoggingListener.scala:74)
  at org.apache.spark.SparkContext.<init>(SparkContext.scala:353)
 at
 org.apache.spark.repl.SparkILoop.createSparkContext(SparkILoop.scala:986)
  at $iwC$$iwC.<init>(<console>:9)
  at $iwC.<init>(<console>:18)
  at <init>(<console>:20)
  at .<init>(<console>:24)
  at .<clinit>(<console>)
  at .<init>(<console>:7)
  at .<clinit>(<console>)
  at $print(<console>)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)
 at
 org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:852)
 at
 org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1125)
 at
 

Re: Spark stages very slow to complete

2015-06-02 Thread Karlson
Hi, the code is some hundreds of lines of Python. I can try to compose a 
minimal example as soon as I find the time, though. Any ideas until 
then?



Would you mind posting the code?
On 2 Jun 2015 00:53, Karlson ksonsp...@siberie.de wrote:


Hi,

In all (pyspark) Spark jobs that become somewhat more involved, I am
experiencing the issue that some stages take a very long time to complete
and sometimes don't finish at all. This clearly correlates with the size of my
input data. Looking at the stage details for one such stage, I am wondering
where Spark spends all this time. Take this table of the stage's task
metrics for example:

Metric                      Min           25th percentile  Median        75th percentile  Max
Duration                    1.4 min       1.5 min          1.7 min       1.9 min          2.3 min
Scheduler Delay             1 ms          3 ms             4 ms          5 ms             23 ms
Task Deserialization Time   1 ms          2 ms             3 ms          8 ms             22 ms
GC Time                     0 ms          0 ms             0 ms          0 ms             0 ms
Result Serialization Time   0 ms          0 ms             0 ms          0 ms             1 ms
Getting Result Time         0 ms          0 ms             0 ms          0 ms             0 ms
Input Size / Records        23.9 KB / 1   24.0 KB / 1      24.1 KB / 1   24.1 KB / 1      24.3 KB / 1

Why is the overall duration almost 2min? Where is all this time spent,
when no progress of the stages is visible? The progress bar simply 
displays
0 succeeded tasks for a very long time before sometimes slowly 
progressing.


Also, the name of the stage displayed above is `javaToPython at 
null:-1`,
which I find very uninformative. I don't even know which action 
exactly is
responsible for this stage. Does anyone experience similar issues or 
have

any advice for me?

Thanks!

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org




-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: What is shuffle read and what is shuffle write ?

2015-06-02 Thread Akhil Das
I found an interesting presentation:
http://www.slideshare.net/colorant/spark-shuffle-introduction
Also go through this thread:
http://apache-spark-user-list.1001560.n3.nabble.com/How-does-shuffle-work-in-spark-td584.html

Thanks
Best Regards

On Tue, Jun 2, 2015 at 3:06 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote:

 Is it input and output bytes/record size?

 --
 Deepak




Re: Spark 1.3.1 bundle does not build - unresolved dependency

2015-06-02 Thread Akhil Das
You can try to skip the tests, try with:

mvn -Dhadoop.version=2.4.0 -Pyarn *-DskipTests* clean package


Thanks
Best Regards

On Tue, Jun 2, 2015 at 2:51 AM, Stephen Boesch java...@gmail.com wrote:

 I downloaded the 1.3.1 distro tarball

 $ll ../spark-1.3.1.tar.gz
 -rw-r-@ 1 steve  staff  8500861 Apr 23 09:58 ../spark-1.3.1.tar.gz

 However the build on it is failing with an unresolved dependency: 
 *configuration
 not public*

 $ build/sbt   assembly -Dhadoop.version=2.5.2 -Pyarn -Phadoop-2.4

 [error] (network-shuffle/*:update) sbt.ResolveException: *unresolved
 dependency: *org.apache.spark#spark-network-common_2.10;1.3.1: *configuration
 not public* in org.apache.spark#spark-network-common_2.10;1.3.1: 'test'.
 It was required from org.apache.spark#spark-network-shuffle_2.10;1.3.1 test

 Is there a known workaround for this?

 thanks




Re: flatMap output on disk / flatMap memory overhead

2015-06-02 Thread octavian.ganea
I tried using reduceByKey, without success.

I also tried this: rdd.persist(MEMORY_AND_DISK).flatMap(...).reduceByKey.
However, I got the same error as before, namely the error described here: 
http://apache-spark-user-list.1001560.n3.nabble.com/flatMap-output-on-disk-flatMap-memory-overhead-td23098.html

My task is to count the frequencies of pairs of words that occur in a set of
documents at least 5 times. I know that this final output is sparse and
should comfortably fit in memory. However, the intermediate pairs that are
spilled by flatMap might need to be stored on the disk, but I don't
understand why the persist option does not work and my job fails.

My code:

rdd.persist(StorageLevel.MEMORY_AND_DISK)
  .flatMap(x => outputPairsOfWords(x)) // outputs pairs of type ((word1, word2), 1)
  .reduceByKey((a, b) => (a + b).toShort)
  .filter({ case ((x, y), count) => count >= 5 })

My cluster has 8 nodes, each with 129 GB of RAM and 16 cores per node. One
node I keep for the master, 7 nodes for the workers.

my conf:

conf.set("spark.cores.max", "128")
conf.set("spark.akka.frameSize", "1024")
conf.set("spark.executor.memory", "115g")
conf.set("spark.shuffle.file.buffer.kb", "1000")

my spark-env.sh:
 ulimit -n 20
 SPARK_JAVA_OPTS="-Xss1g -Xmx129g -d64 -XX:-UseGCOverheadLimit -XX:-UseCompressedOops"
 SPARK_DRIVER_MEMORY=129G

spark version: 1.1.1

Thank you a lot for your help!



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/flatMap-output-on-disk-flatMap-memory-overhead-tp23098p23108.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Streaming K-medoids

2015-06-02 Thread Marko Dinic

Erik,

Thank you for your answer. It seems really good, but unfortunately I'm 
not very familiar with Scala, so I have only partly understood it.


Could you please explain your idea with Spark implementation?

Best regards,
Marko

On Mon 01 Jun 2015 06:35:17 PM CEST, Erik Erlandson wrote:


I haven't given any thought to streaming it, but in case it's useful I do have 
a k-medoids implementation for Spark:
http://silex.freevariable.com/latest/api/#com.redhat.et.silex.cluster.KMedoids

Also a blog post about multi-threading it:
http://erikerlandson.github.io/blog/2015/05/06/parallel-k-medoids-using-scala-parseq/



- Original Message -

Hello everyone,

I have an idea and I would like to get a validation from community about
it.

In Mahout there is an implementation of Streaming K-means. I'm
interested in your opinion: would it make sense to make a similar
implementation of Streaming K-medoids?

K-medoids has even bigger problems than K-means because it's not
scalable, but it can be useful in some cases (e.g., it allows more
sophisticated distance measures).

What is your opinion about such an approach? Does anyone see problems
with it?

Best regards,
Marko

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org




-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Spark 1.4.0-rc3: Actor not found

2015-06-02 Thread Anders Arpteg
Just compiled Spark 1.4.0-rc3 for Yarn 2.2 and tried running a job that
worked fine for Spark 1.3. The job starts on the cluster (yarn-cluster
mode), initial stage starts, but the job fails before any task succeeds
with the following error. Any hints?

[ERROR] [06/02/2015 09:05:36.962] [Executor task launch worker-0]
[akka.tcp://sparkDriver@10.254.6.15:33986/user/CoarseGrainedScheduler]
swallowing exception during message send
(akka.remote.RemoteTransportExceptionNoStackTrace)
Exception in thread main akka.actor.ActorNotFound: Actor not found for:
ActorSelection[Anchor(akka.tcp://sparkDriver@10.254.6.15:33986/),
Path(/user/OutputCommitCoordinator)]
at
akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:65)
at
akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:63)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
at
akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:67)
at
akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:82)
at
akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
at
akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
at
scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
at
akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:58)
at
akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.unbatchedExecute(Future.scala:74)
at
akka.dispatch.BatchingExecutor$class.execute(BatchingExecutor.scala:110)
at
akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.execute(Future.scala:73)
at
scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
at
scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:267)
at akka.actor.EmptyLocalActorRef.specialHandle(ActorRef.scala:508)
at akka.actor.DeadLetterActorRef.specialHandle(ActorRef.scala:541)
at akka.actor.DeadLetterActorRef.$bang(ActorRef.scala:531)
at
akka.remote.RemoteActorRefProvider$RemoteDeadLetterActorRef.$bang(RemoteActorRefProvider.scala:87)
at
akka.remote.EndpointManager$$anonfun$1.applyOrElse(Remoting.scala:575)
at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
at akka.remote.EndpointManager.aroundReceive(Remoting.scala:395)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
at
scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)


What is shuffle read and what is shuffle write ?

2015-06-02 Thread ๏̯͡๏
Is it input and output bytes/record size?

-- 
Deepak


Re: Embedding your own transformer in Spark.ml Pipleline

2015-06-02 Thread Peter Rudenko

Hi Dimple,
take a look at the existing transformers:
https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/feature/OneHotEncoder.scala
https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/feature/Tokenizer.scala
https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/feature/HashingTF.scala
(*it's for spark-1.4)

The idea is just to implement a class that extends Transformer with
HasInputCol with HasOutputCol (if your transformer is a 1:1 column
transformer) and has a

def transform(dataset: DataFrame): DataFrame

method.
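
For illustration, here is a minimal 1:1 column transformer sketched against the
spark.ml API around 1.4, modeled on the Tokenizer linked above (the class name
and the upper-casing logic are made up; exact abstract members can differ
between Spark versions):

import org.apache.spark.ml.UnaryTransformer
import org.apache.spark.ml.util.Identifiable
import org.apache.spark.sql.types.{DataType, StringType}

// Hypothetical transformer: upper-cases a string column.
class UpperCaser(override val uid: String)
    extends UnaryTransformer[String, String, UpperCaser] {

  def this() = this(Identifiable.randomUID("upperCaser"))

  // The 1:1 mapping applied to every value of the input column.
  override protected def createTransformFunc: String => String = _.toUpperCase

  override protected def validateInputType(inputType: DataType): Unit =
    require(inputType == StringType, s"Input type must be StringType but got $inputType.")

  override protected def outputDataType: DataType = StringType
}

// Usage sketch: new UpperCaser().setInputCol("text").setOutputCol("textUpper").transform(df)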

Thanks,
Peter
On 2015-06-02 20:19, dimple wrote:

Hi,
I would like to embed my own transformer in the Spark.ml Pipleline but do
not see an example of it. Can someone share an example of which
classes/interfaces I need to extend/implement in order to do so. Thanks.

Dimple



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Embedding-your-own-transformer-in-Spark-ml-Pipleline-tp23112.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org





Re: flatMap output on disk / flatMap memory overhead

2015-06-02 Thread Richard Marscher
Are you sure it's memory related? What is the disk utilization and IO
performance on the workers? The error you posted looks to be related to a
shuffle trying to obtain block data from another worker node and failing to
do so in a reasonable amount of time. It may still be memory related, but I'm
not sure that other resources are ruled out yet.

On Tue, Jun 2, 2015 at 5:10 AM, octavian.ganea octavian.ga...@inf.ethz.ch
wrote:

 I tried using reduceByKey, without success.

 I also tried this: rdd.persist(MEMORY_AND_DISK).flatMap(...).reduceByKey.
 However, I got the same error as before, namely the error described here:

 http://apache-spark-user-list.1001560.n3.nabble.com/flatMap-output-on-disk-flatMap-memory-overhead-td23098.html

 My task is to count the frequencies of pairs of words that occur in a set
 of
 documents at least 5 times. I know that this final output is sparse and
 should comfortably fit in memory. However, the intermediate pairs that are
 spilled by flatMap might need to be stored on the disk, but I don't
 understand why the persist option does not work and my job fails.

 My code:

 rdd.persist(StorageLevel.MEMORY_AND_DISK)
   .flatMap(x => outputPairsOfWords(x)) // outputs pairs of type ((word1, word2), 1)
   .reduceByKey((a, b) => (a + b).toShort)
   .filter({ case ((x, y), count) => count >= 5 })


 My cluster has 8 nodes, each with 129 GB of RAM and 16 cores per node. One
 node I keep for the master, 7 nodes for the workers.

 my conf:

 conf.set("spark.cores.max", "128")
 conf.set("spark.akka.frameSize", "1024")
 conf.set("spark.executor.memory", "115g")
 conf.set("spark.shuffle.file.buffer.kb", "1000")

 my spark-env.sh:
  ulimit -n 20
  SPARK_JAVA_OPTS="-Xss1g -Xmx129g -d64 -XX:-UseGCOverheadLimit -XX:-UseCompressedOops"
  SPARK_DRIVER_MEMORY=129G

 spark version: 1.1.1

 Thank you a lot for your help!



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/flatMap-output-on-disk-flatMap-memory-overhead-tp23098p23108.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: updateStateByKey and kafka direct approach without shuffle

2015-06-02 Thread Cody Koeninger
If you're using the spark partition id directly as the key, then you don't
need to access offset ranges at all, right?
You can create a single instance of a partitioner in advance, and all it
needs to know is the number of partitions (which is just the count of all
the kafka topic/partitions).
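
For instance, a small sketch of such a fixed partitioner (numParts would be the
total topic/partition count, obtained however you like ahead of time):

import org.apache.spark.Partitioner

// Assumes the keys are the Spark partition ids set via TaskContext.get().partitionId().
class PartitionIdPassthrough(numParts: Int) extends Partitioner {
  override def numPartitions: Int = numParts
  override def getPartition(key: Any): Int = key.asInstanceOf[Int]
}

// Built once, up front: val partitioner = new PartitionIdPassthrough(topicPartitionCount)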

On Tue, Jun 2, 2015 at 12:40 PM, Krot Viacheslav krot.vyaches...@gmail.com
wrote:

 Cody,

 Thanks, good point. I fixed getting partition id to:

 class MyPartitioner(offsetRanges: Array[OffsetRange]) extends Partitioner {
   override def numPartitions: Int = offsetRanges.size

   override def getPartition(key: Any): Int = {
     // this is set in .map(m => (TaskContext.get().partitionId(), m.value))
     key.asInstanceOf[Int]
   }
 }

 inputStream
   .map(m => (TaskContext.get().partitionId(), m.value))
   .transform { rdd =>
     val part = new MyPartitioner(rdd.asInstanceOf[HasOffsetRanges].offsetRanges)
     new ProxyRDDWithPartitioner(rdd, part)
   }
 ...

 But how can I create same partitioner during updateStateByKey call?   I
 have no idea how to access rdd when calling updateStateByKey.

 Tue, 2 Jun 2015 at 19:15, Cody Koeninger c...@koeninger.org:

 I think the general idea is worth pursuing.

 However, this line:

  override def getPartition(key: Any): Int = {
 key.asInstanceOf[(String, Int)]._2
   }

 is using the kafka partition id, not the spark partition index, so it's
 going to give you fewer partitions / incorrect index

 Cast the rdd to HasOffsetRanges, get the offsetRanges from it.  The index
 into the offset range array matches the (spark) partition id.  That will
 also tell you what the value of numPartitions should be.







 On Tue, Jun 2, 2015 at 11:50 AM, Krot Viacheslav 
 krot.vyaches...@gmail.com wrote:

 Hi all,
 In my streaming job I'm using the kafka streaming direct approach and want
 to maintain state with updateStateByKey. My PairRDD has the message's topic
 name + partition id as the key. So I assumed that updateStateByKey could work
 within the same partition as the KafkaRDD and not lead to shuffles. Actually this
 is not true, because updateStateByKey leads to a cogroup transformation that
 thinks the state rdd and the kafka rdd are not co-partitioned, as the kafka rdd
 does not have a partitioner at all. So the dependency is considered to be wide
 and leads to a shuffle.

 I tried to avoid shuffling by providing custom partitioner to
 updateStateByKey, but KafkaRDD need to use same partitioner. For this I
 created a proxy RDD that just returns my partitioner.

 class ProxyRDDWithPartitioner[T: ClassTag](prev: RDD[T], part:
 Partitioner) extends RDD[T](prev) {

   override val partitioner = Some(part)

   override def compute(split: Partition, context: TaskContext):
 Iterator[T] = prev.compute(split, context)

   override protected def getPartitions: Array[Partition] =
 prev.partitions

   override def getPreferredLocations(thePart: Partition): Seq[String] =
 prev.preferredLocations(thePart)
 }

 I use it as:
 val partitioner = new Partitioner {
   // TODO this should be retrieved from kafka
   override def numPartitions: Int = 2

   override def getPartition(key: Any): Int = {
 key.asInstanceOf[(String, Int)]._2
   }
 }

 inputStream
   .map(m => ((m.topic, m.partition), m.value))
   .transform(new ProxyRDDWithPartitioner(_, partitioner))
   .updateStateByKey(func, partitioner)
   

 The question is - is it safe to do such trick?





Re: Spark 1.4.0-rc3: Actor not found

2015-06-02 Thread Yin Huai
Does it happen every time you read a parquet source?

On Tue, Jun 2, 2015 at 3:42 AM, Anders Arpteg arp...@spotify.com wrote:

 The log is from the log aggregation tool (hortonworks, yarn logs ...),
 so both executors and driver. I'll send a private mail to you with the full
 logs. Also, tried another job as you suggested, and it actually worked
 fine. The first job was reading from a parquet source, and the second from
 an avro source. Could there be some issues with the parquet reader?

 Thanks,
 Anders

 On Tue, Jun 2, 2015 at 11:53 AM, Shixiong Zhu zsxw...@gmail.com wrote:

 How about other jobs? Is it an executor log, or a driver log? Could you
 post other logs near this error, please? Thank you.

 Best Regards,
 Shixiong Zhu

 2015-06-02 17:11 GMT+08:00 Anders Arpteg arp...@spotify.com:

 Just compiled Spark 1.4.0-rc3 for Yarn 2.2 and tried running a job that
 worked fine for Spark 1.3. The job starts on the cluster (yarn-cluster
 mode), initial stage starts, but the job fails before any task succeeds
 with the following error. Any hints?

 [ERROR] [06/02/2015 09:05:36.962] [Executor task launch worker-0]
 [akka.tcp://sparkDriver@10.254.6.15:33986/user/CoarseGrainedScheduler]
 swallowing exception during message send
 (akka.remote.RemoteTransportExceptionNoStackTrace)
 Exception in thread main akka.actor.ActorNotFound: Actor not found
 for: ActorSelection[Anchor(akka.tcp://sparkDriver@10.254.6.15:33986/),
 Path(/user/OutputCommitCoordinator)]
 at
 akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:65)
 at
 akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:63)
 at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
 at
 akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:67)
 at
 akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:82)
 at
 akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
 at
 akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
 at
 scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
 at
 akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:58)
 at
 akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.unbatchedExecute(Future.scala:74)
 at
 akka.dispatch.BatchingExecutor$class.execute(BatchingExecutor.scala:110)
 at
 akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.execute(Future.scala:73)
 at
 scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
 at
 scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
 at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:267)
 at
 akka.actor.EmptyLocalActorRef.specialHandle(ActorRef.scala:508)
 at
 akka.actor.DeadLetterActorRef.specialHandle(ActorRef.scala:541)
 at akka.actor.DeadLetterActorRef.$bang(ActorRef.scala:531)
 at
 akka.remote.RemoteActorRefProvider$RemoteDeadLetterActorRef.$bang(RemoteActorRefProvider.scala:87)
 at
 akka.remote.EndpointManager$$anonfun$1.applyOrElse(Remoting.scala:575)
 at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
 at akka.remote.EndpointManager.aroundReceive(Remoting.scala:395)
 at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
 at akka.actor.ActorCell.invoke(ActorCell.scala:487)
 at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
 at akka.dispatch.Mailbox.run(Mailbox.scala:220)
 at
 akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
 at
 scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
 at
 scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)






Re: Can't build Spark 1.3

2015-06-02 Thread Ritesh Kumar Singh
It did hang for me too. High RAM consumption during the build. I had to free a
lot of RAM and introduce swap memory just to get it built on my 3rd attempt.
Everything else looks fine. You can download the prebuilt versions from the
Spark homepage to save yourself all this trouble.

Thanks,
Ritesh


Re: HDFS Rest Service not available

2015-06-02 Thread Su She
Ahh, this did the trick. I had to get the namenode out of safe mode,
however, before it fully worked.

Thanks!

On Tue, Jun 2, 2015 at 12:09 AM, Akhil Das ak...@sigmoidanalytics.com wrote:
 It says your namenode is down (connection refused on 8020), you can restart
 your HDFS by going into hadoop directory and typing sbin/stop-dfs.sh and
 then sbin/start-dfs.sh

 Thanks
 Best Regards

 On Tue, Jun 2, 2015 at 5:03 AM, Su She suhsheka...@gmail.com wrote:

 Hello All,

 A bit scared I did something stupid...I killed a few PIDs that were
 listening to ports 2183 (kafka), 4042 (spark app), some of the PIDs
 didn't even seem to be stopped as they still are running when i do

 lsof -i:[port number]

 I'm not sure if the problem started after or before I did these kill
 commands, but I now can't connect to HDFS or start spark. I can't seem
 to access Hue. I am afraid I accidentally killed an important process
 related to HDFS. But, I am not sure what it would be as I couldn't
 even kill the PIDs.

 Is it a coincidence that HDFS failed randomly? Likely that I killed an
 important PID? How can I maybe restart HDFS?

 Thanks a lot!

 Error on Hue:

 Cannot access: /user/ec2-user. The HDFS REST service is not available.
 Note: You are a Hue admin but not a HDFS superuser (which is hdfs).

 HTTPConnectionPool(host='ec2-ip-address.us-west-1.compute.amazonaws.com',
 port=50070): Max retries exceeded with url:
  /webhdfs/v1/user/ec2-user?op=GETFILESTATUS&user.name=hue&doas=ec2-user
 (Caused by class 'socket.error': [Errno 111] Connection refused)

 Error when I try to open spark-shell or a spark app:
 java.net.ConnectException: Call From
 ip-10-0-2-216.us-west-1.compute.internal/10.0.2.216 to
 ip-10-0-2-216.us-west-1.compute.internal:8020 failed on connection
 exception: java.net.ConnectException: Connection refused; For more
 details see:  http://wiki.apache.org/hadoop/ConnectionRefused
 at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
 Method)
 at
 sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
 at
 sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
 at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
 at
 org.apache.hadoop.net.NetUtils.wrapWithMessage(NetUtils.java:783)
 at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:730)
 at org.apache.hadoop.ipc.Client.call(Client.java:1415)
 at org.apache.hadoop.ipc.Client.call(Client.java:1364)
 at
 org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206)
 at com.sun.proxy.$Proxy20.getFileInfo(Unknown Source)
 at
 org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getFileInfo(ClientNamenodeProtocolTranslatorPB.java:744)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)
 at
 org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187)
 at
 org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
 at com.sun.proxy.$Proxy21.getFileInfo(Unknown Source)
 at
 org.apache.hadoop.hdfs.DFSClient.getFileInfo(DFSClient.java:1921)
 at
 org.apache.hadoop.hdfs.DistributedFileSystem$17.doCall(DistributedFileSystem.java:1089)
 at
 org.apache.hadoop.hdfs.DistributedFileSystem$17.doCall(DistributedFileSystem.java:1085)
 at
 org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
 at
 org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1085)
 at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1400)
 at
 org.apache.spark.util.FileLogger.createLogDir(FileLogger.scala:123)
 at org.apache.spark.util.FileLogger.start(FileLogger.scala:115)
 at
 org.apache.spark.scheduler.EventLoggingListener.start(EventLoggingListener.scala:74)
  at org.apache.spark.SparkContext.<init>(SparkContext.scala:353)
 at
 org.apache.spark.repl.SparkILoop.createSparkContext(SparkILoop.scala:986)
  at $iwC$$iwC.<init>(<console>:9)
  at $iwC.<init>(<console>:18)
  at <init>(<console>:20)
  at .<init>(<console>:24)
  at .<clinit>(<console>)
  at .<init>(<console>:7)
  at .<clinit>(<console>)
  at $print(<console>)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)
 

Re: data localisation in spark

2015-06-02 Thread Shushant Arora
 So in spark, after acquiring executors from the ClusterManager, are tasks
scheduled on executors based on data locality? I mean, if in an
application there are 2 jobs and the output of job 1 is used as the input of
another job,
and in job 1 I persisted some RDD, then while running job 2 will it use
the same executor where job 1's output was persisted, or will it acquire an
executor again so that data movement happens?

And is it true that the number of executors in an application is fixed and
acquired at the start of the application and remains the same throughout the
application? If yes, how does it take care of an explicit number of reducers
in some of the APIs, say rdd.reduceByKey(func, 10)?

When converting the DAG to stages, does it calculate the executors required and
then acquire executors/worker nodes?


On Tue, Jun 2, 2015 at 11:06 PM, Sandy Ryza sandy.r...@cloudera.com wrote:

 It is not possible with JavaSparkContext either.  The API mentioned below
 currently does not have any effect (we should document this).

 The primary difference between MR and Spark here is that MR runs each task
 in its own YARN container, while Spark runs multiple tasks within an
 executor, which needs to be requested before Spark knows what tasks it will
 run.  Although dynamic allocation improves that last part.
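
 (As a reference sketch of that last part: dynamic allocation is turned on via
 configuration, assuming the external shuffle service is set up on the
 NodeManagers; the min/max values below are only illustrative.)

 import org.apache.spark.SparkConf

 val conf = new SparkConf()
   .set("spark.dynamicAllocation.enabled", "true")
   .set("spark.dynamicAllocation.minExecutors", "2")
   .set("spark.dynamicAllocation.maxExecutors", "50")
   .set("spark.shuffle.service.enabled", "true") // required by dynamic allocation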

 -Sandy

 On Tue, Jun 2, 2015 at 9:55 AM, Shushant Arora shushantaror...@gmail.com
 wrote:

 Is it possible with JavaSparkContext?

 JavaSparkContext jsc = new JavaSparkContext(conf);
 JavaRDD<String> lines = jsc.textFile(args[0]);

 If yes, is it the programmer's responsibility to first calculate split
 locations and then instantiate the spark context with preferred locations?

 How is this achieved in MR2 with YARN? Does the ApplicationMaster
 specify split locations to the ResourceManager before acquiring the node
 managers?



 On Mon, Jun 1, 2015 at 7:24 AM, bit1...@163.com bit1...@163.com wrote:

 Take a look at the following SparkContext constructor variant that
 tries to honor the data locality in YARN mode.

   /**
 * :: DeveloperApi ::
 * Alternative constructor for setting preferred locations where Spark
 will create executors.
 *
 * @param preferredNodeLocationData used in YARN mode to select nodes to
 launch containers on.
 * Can be generated using
 [[org.apache.spark.scheduler.InputFormatInfo.computePreferredLocations]]
 * from a list of input files or InputFormats for the application.
 */
 @DeveloperApi
 def this(config: SparkConf, preferredNodeLocationData: Map[String,
 Set[SplitInfo]]) = {
 this(config)
 this.preferredNodeLocationData = preferredNodeLocationData
 }

 --
 bit1...@163.com


 *From:* Shushant Arora shushantaror...@gmail.com
 *Date:* 2015-05-31 22:54
 *To:* user user@spark.apache.org
 *Subject:* data localisation in spark

 I want to understand how spark takes care of data localisation in
 cluster mode when run on YARN.

 1. The driver program asks the ResourceManager for executors. Does it tell YARN's
 RM to check the HDFS blocks of the input data and then allocate executors to them?
 And do the executors remain fixed throughout the application, or does the driver
 program ask for new executors when it submits another job in the same application,
 since in spark a new job is created for each action? If executors are fixed, then
 is achieving data localisation for the second job impossible?



 2. When executors are done with their processing, are they marked as
 free in the ResourceManager's resource queue, and do the executors tell this
 to the RM directly instead of via the driver?

 Thanks
 Shushant






Can't build Spark 1.3

2015-06-02 Thread Yakubovich, Alexey
I downloaded the latest Spark (1.3.) from github. Then I tried to build it.
First for scala 2.10 (and hadoop 2.4):

build/mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 -DskipTests clean package

That resulted in a hang-up after printing a bunch of lines like

[INFO] Dependency-reduced POM written at ……
Then I tried for scala 2.11

mvn -Pyarn -Phadoop-2.4 -Dscala-2.11 -DskipTests clean package

That resulted in multiple compilation errors.

What I actually want is:
mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 -Phive -Phive-0.12.0 
-Phive-thriftserver -DskipTests clean package

Is it only me who can't build Spark 1.3?
And is there any site to download Spark prebuilt for Hadoop 2.5 and Hive?

Thank you for any help.
Alexey

This message, including any attachments, is the property of Sears Holdings 
Corporation and/or one of its subsidiaries. It is confidential and may contain 
proprietary or legally privileged information. If you are not the intended 
recipient, please delete it without reading the contents. Thank you.


DataFrames coming in SparkR in Apache Spark 1.4.0

2015-06-02 Thread Emaasit
For the impatient R-user, here is a  link
http://people.apache.org/~pwendell/spark-nightly/spark-1.4-docs/latest/sparkr.html
  
to get started working with DataFrames using SparkR.

Or copy and paste this link into your web browser:
http://people.apache.org/~pwendell/spark-nightly/spark-1.4-docs/latest/sparkr.html

Happy coding,
Daniel



-
Daniel Emaasit, 
Ph.D. Research Assistant
Transportation Research Center (TRC)
University of Nevada, Las Vegas
Las Vegas, NV 89154-4015
Cell: 615-649-2489
www.danielemaasit.com 
--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/DataFrames-coming-in-SparkR-in-Apache-Spark-1-4-0-tp23116.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: updateStateByKey and kafka direct approach without shuffle

2015-06-02 Thread Krot Viacheslav
Thanks, this works.
Hopefully I didn't miss something important with this approach.

Tue, 2 Jun 2015 at 20:15, Cody Koeninger c...@koeninger.org:

 If you're using the spark partition id directly as the key, then you don't
 need to access offset ranges at all, right?
 You can create a single instance of a partitioner in advance, and all it
 needs to know is the number of partitions (which is just the count of all
 the kafka topic/partitions).

 On Tue, Jun 2, 2015 at 12:40 PM, Krot Viacheslav 
 krot.vyaches...@gmail.com wrote:

 Cody,

 Thanks, good point. I fixed getting partition id to:

  class MyPartitioner(offsetRanges: Array[OffsetRange]) extends Partitioner {
    override def numPartitions: Int = offsetRanges.size

    override def getPartition(key: Any): Int = {
      // this is set in .map(m => (TaskContext.get().partitionId(), m.value))
      key.asInstanceOf[Int]
    }
  }

  inputStream
    .map(m => (TaskContext.get().partitionId(), m.value))
    .transform { rdd =>
      val part = new MyPartitioner(rdd.asInstanceOf[HasOffsetRanges].offsetRanges)
      new ProxyRDDWithPartitioner(rdd, part)
    }
  ...

 But how can I create same partitioner during updateStateByKey call?   I
 have no idea how to access rdd when calling updateStateByKey.

  Tue, 2 Jun 2015 at 19:15, Cody Koeninger c...@koeninger.org:

 I think the general idea is worth pursuing.

 However, this line:

  override def getPartition(key: Any): Int = {
 key.asInstanceOf[(String, Int)]._2
   }

 is using the kafka partition id, not the spark partition index, so it's
 going to give you fewer partitions / incorrect index

 Cast the rdd to HasOffsetRanges, get the offsetRanges from it.  The
 index into the offset range array matches the (spark) partition id.  That
 will also tell you what the value of numPartitions should be.







 On Tue, Jun 2, 2015 at 11:50 AM, Krot Viacheslav 
 krot.vyaches...@gmail.com wrote:

 Hi all,
 In my streaming job I'm using kafka streaming direct approach and want
 to maintain state with updateStateByKey. My PairRDD has message's topic
 name + partition id as a key. So, I assume that updateByState could work
 within same partition as KafkaRDD and not lead to shuffles. Actually this
 is not true, because updateStateByKey leads to cogroup transformation that
 thinks, that state rdd and kafka rdd are not co-partitioned, as kafka rdd
 does not have partitioner at all. So, dependency is considered to be wide
 and leads to shuffle.

 I tried to avoid shuffling by providing custom partitioner to
 updateStateByKey, but KafkaRDD need to use same partitioner. For this I
 created a proxy RDD that just returns my partitioner.

 class ProxyRDDWithPartitioner[T: ClassTag](prev: RDD[T], part:
 Partitioner) extends RDD[T](prev) {

   override val partitioner = Some(part)

   override def compute(split: Partition, context: TaskContext):
 Iterator[T] = prev.compute(split, context)

   override protected def getPartitions: Array[Partition] =
 prev.partitions

   override def getPreferredLocations(thePart: Partition): Seq[String] =
 prev.preferredLocations(thePart)
 }

 I use it as:
 val partitioner = new Partitioner {
   // TODO this should be retrieved from kafka
   override def numPartitions: Int = 2

   override def getPartition(key: Any): Int = {
 key.asInstanceOf[(String, Int)]._2
   }
 }

 inputStream
   .map(m => ((m.topic, m.partition), m.value))
   .transform(new ProxyRDDWithPartitioner(_, partitioner))
   .updateStateByKey(func, partitioner)
   

 The question is - is it safe to do such trick?






Re: Embedding your own transformer in Spark.ml Pipleline

2015-06-02 Thread Dimp Bhat
Thanks Peter. Can you share the Tokenizer.java class for Spark 1.2.1.

Dimple

On Tue, Jun 2, 2015 at 10:51 AM, Peter Rudenko petro.rude...@gmail.com
wrote:

  Hi Dimple,
 take a look to existing transformers:

 https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/feature/OneHotEncoder.scala

 https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/feature/Tokenizer.scala

 https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/feature/HashingTF.scala
 (*it's for spark-1.4)

  The idea is just to implement a class that extends Transformer with
  HasInputCol with HasOutputCol (if your transformer is a 1:1 column
  transformer) and has a

 def transform(dataset: DataFrame): DataFrame

 method.

 Thanks,
 Peter
 On 2015-06-02 20:19, dimple wrote:

 Hi,
 I would like to embed my own transformer in the Spark.ml Pipleline but do
 not see an example of it. Can someone share an example of which
 classes/interfaces I need to extend/implement in order to do so. Thanks.

 Dimple



 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Embedding-your-own-transformer-in-Spark-ml-Pipleline-tp23112.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





Re: spark sql - reading data from sql tables having space in column names

2015-06-02 Thread David Mitchell
I am having the same problem reading JSON. There does not seem to be a way
of selecting a field that has a space, such as "Executor Info" from the Spark logs.

I suggest that we open a JIRA ticket to address this issue.
 On Jun 2, 2015 10:08 AM, ayan guha guha.a...@gmail.com wrote:

 I would think the easiest way would be to create a view in DB with column
 names with no space.

 In fact, you can pass a sql in place of a real table.

 From documentation: The JDBC table that should be read. Note that
 anything that is valid in a `FROM` clause of a SQL query can be used. For
 example, instead of a full table you could also use a subquery in
 parentheses.
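
 A hedged sketch of that idea against the Spark 1.3 JDBC data source (the URL,
 table, and column names are made up, and sqlContext is assumed to be in scope;
 the point is aliasing the bracketed column inside a subquery so Spark SQL never
 sees the space):

 val options = Map(
   "url"     -> "jdbc:sqlserver://host:1433;databaseName=legacy", // placeholder
   "dbtable" -> "(SELECT [Customer Name] AS customer_name, id FROM dbo.Customers) AS t"
 )
 val df = sqlContext.load("jdbc", options)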

 Kindly let the community know if this works

 On Tue, Jun 2, 2015 at 6:43 PM, Sachin Goyal sachin.go...@jabong.com
 wrote:

 Hi,

 We are using spark sql (1.3.1) to load data from Microsoft sql server
 using jdbc (as described in
 https://spark.apache.org/docs/latest/sql-programming-guide.html#jdbc-to-other-databases
 ).

 It is working fine except when there is a space in column names (we can't
 modify the schemas to remove space as it is a legacy database).

 Sqoop is able to handle such scenarios by enclosing column names in '[ ]'
 - the recommended method from microsoft sql server. (
 https://github.com/apache/sqoop/blob/trunk/src/java/org/apache/sqoop/manager/SQLServerManager.java
 - line no 319)

 Is there a way to handle this in spark sql?

 Thanks,
 sachin




 --
 Best Regards,
 Ayan Guha



[OFFTOPIC] Big Data Application Meetup

2015-06-02 Thread Alex Baranau
Hi everyone,

I wanted to drop a note about a newly organized developer meetup in Bay
Area: the Big Data Application Meetup (http://meetup.com/bigdataapps) and
call for speakers. The plan is for meetup topics to be focused on
application use-cases: how developers can build end-to-end solutions with
open-source big data technologies.

If you want to share your experience, please email me back to become a
speaker. If you have any questions - I will be happy to answer them.

We plan for the first event to be hosted by Cask at its HQ in Palo Alto at the
end of June. We will also be promoting the meetup at Hadoop Summit and Spark
Summit in the following weeks.

Thank you,
Alex Baranau


Re: IDE for sparkR

2015-06-02 Thread Emaasit
Rstudio is the best IDE for running sparkR.
Instructions for this can be found at this  link
https://github.com/apache/spark/tree/branch-1.4/R  . You will need to set
some environment variables as described below.

*Using SparkR from RStudio*

If you wish to use SparkR from RStudio or other R frontends you will need to
set some environment variables which point SparkR to your Spark
installation. For example

# Set this to where Spark is installed
Sys.setenv(SPARK_HOME="/Users/shivaram/spark")
# This line loads SparkR from the installed directory
.libPaths(c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib"), .libPaths()))
library(SparkR)
sc <- sparkR.init(master="local")



-
Daniel Emaasit, 
Ph.D. Research Assistant
Transportation Research Center (TRC)
University of Nevada, Las Vegas
Las Vegas, NV 89154-4015
Cell: 615-649-2489
www.danielemaasit.com 
--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/IDE-for-sparkR-tp4764p23115.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Embedding your own transformer in Spark.ml Pipleline

2015-06-02 Thread Dimp Bhat
Thanks for the quick reply Ram.  Will take a look at the Tokenizer code and
try it out.

Dimple

On Tue, Jun 2, 2015 at 10:42 AM, Ram Sriharsha sriharsha@gmail.com
wrote:

 Hi

 We are in the process of adding examples for feature transformations (
 https://issues.apache.org/jira/browse/SPARK-7546) and this should be
 available shortly on Spark Master.
 In the meanwhile, the best place to start would be to look at how the
 Tokenizer works here:

 https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/feature/Tokenizer.scala

 You need to implement the Transformer interface as above. In this case a
 UnaryTransformer since the feature transformer acts on one column,
 transforms it and outputs another column.

 and an example of how to build a pipeline that includes a feature
 transformer (the HashingTF is the feature transformer analogous to what you
 would build):

 https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/ml/SimpleTextClassificationPipeline.scala
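
 For reference, a hedged sketch of assembling such a pipeline, following the
 linked SimpleTextClassificationPipeline example (column names and trainingDF
 are assumptions for illustration):

 import org.apache.spark.ml.Pipeline
 import org.apache.spark.ml.classification.LogisticRegression
 import org.apache.spark.ml.feature.{HashingTF, Tokenizer}

 // Tokenizer and HashingTF are the feature transformers; your own transformer
 // would slot into the stages array the same way.
 val tokenizer = new Tokenizer().setInputCol("text").setOutputCol("words")
 val hashingTF = new HashingTF().setInputCol("words").setOutputCol("features")
 val lr = new LogisticRegression().setMaxIter(10)
 val pipeline = new Pipeline().setStages(Array(tokenizer, hashingTF, lr))
 // val model = pipeline.fit(trainingDF)   // trainingDF assumed to exist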

 but stay tuned, we should have examples in Python, Scala and Java soon

 Ram

 On Tue, Jun 2, 2015 at 10:19 AM, dimple dimp201...@gmail.com wrote:

 Hi,
 I would like to embed my own transformer in the Spark.ml Pipleline but do
 not see an example of it. Can someone share an example of which
 classes/interfaces I need to extend/implement in order to do so. Thanks.

 Dimple



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Embedding-your-own-transformer-in-Spark-ml-Pipleline-tp23112.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





Issues with Spark Streaming and Manual Clock used for Unit Tests

2015-06-02 Thread mobsniuk
I have a situation where I have multiple tests that use Spark streaming with a
manual clock. The first run is OK and processes the data when I increment
the clock to the sliding window duration. The second test deviates and
doesn't process any data. The traces follow, and they indicate that the memory
store is called right after the receiver has finished loading the data for that
set. The second test only calls the memory store after the batch has started
processing, after the manual clock is incremented.

The following is a trace that works.

15/06/02 10:39:18 INFO ManualLoadFileBasedReceiverDnsData: Took 78913544
nanos to load data
15/06/02 10:39:18 INFO MemoryStore: ensureFreeSpace(1624896) called with
curMem=14071, maxMem=2061647216
15/06/02 10:39:18 INFO MemoryStore: Block input-0-1433266758000 stored as
values in memory (estimated size 1586.8 KB, free 1964.6 MB)
15/06/02 10:39:18 INFO BlockManagerInfo: Added input-0-1433266758000 in
memory on localhost:54349 (size: 1586.8 KB, free: 1964.6 MB)
15/06/02 10:39:18 INFO BlockManagerMaster: Updated info of block
input-0-1433266758000
15/06/02 10:39:18 INFO BlockGenerator: Pushed block input-0-1433266758000
15/06/02 10:39:37 INFO ReceiveDataFromFileEndToEndDNSTTest: Clock time is
2000
15/06/02 10:39:37 INFO FlatMapValuedDStream: Time 2000 ms is invalid as
zeroTime is 0 ms and slideDuration is 1 ms and difference is 2000 ms
15/06/02 10:39:37 INFO JobScheduler: No jobs added for time 2000 ms
15/06/02 10:39:37 INFO JobGenerator: Checkpointing graph for time 2000 ms
15/06/02 10:39:37 INFO DStreamGraph: Updating checkpoint data for time 2000
ms
15/06/02 10:39:38 INFO DStreamGraph: Updated checkpoint data for time 2000
ms
15/06/02 10:39:38 INFO ReceiveDataFromFileEndToEndDNSTTest: Clock time is
4000
15/06/02 10:39:38 INFO ReceiveDataFromFileEndToEndDNSTTest: Clock time is
6000
15/06/02 10:39:38 INFO FlatMapValuedDStream: Time 4000 ms is invalid as
zeroTime is 0 ms and slideDuration is 1 ms and difference is 4000 ms
15/06/02 10:39:38 INFO JobScheduler: No jobs added for time 4000 ms
15/06/02 10:39:38 INFO FlatMapValuedDStream: Time 6000 ms is invalid as
zeroTime is 0 ms and slideDuration is 1 ms and difference is 6000 ms
15/06/02 10:39:38 INFO JobScheduler: No jobs added for time 6000 ms
15/06/02 10:39:38 INFO JobGenerator: Checkpointing graph for time 4000 ms
15/06/02 10:39:38 INFO DStreamGraph: Updating checkpoint data for time 4000
ms
15/06/02 10:39:38 INFO DStreamGraph: Updated checkpoint data for time 4000
ms
15/06/02 10:39:38 INFO CheckpointWriter: Saving checkpoint for time 2000 ms
to file
'file:/Users/mobsniuk/perforce/IB/proj/analytics-dnst/platform/checkpoint/checkpoint-2000'
15/06/02 10:39:38 WARN NativeCodeLoader: Unable to load native-hadoop
library for your platform... using builtin-java classes where applicable
15/06/02 10:39:38 INFO CheckpointWriter: Checkpoint for time 2000 ms saved
to file
'file:/Users/mobsniuk/perforce/IB/proj/analytics-dnst/platform/checkpoint/checkpoint-2000',
took 18291958 bytes and 87 ms
15/06/02 10:39:38 INFO ReceiveDataFromFileEndToEndDNSTTest: Clock time is
8000
15/06/02 10:39:38 INFO JobGenerator: Checkpointing graph for time 6000 ms
15/06/02 10:39:38 INFO DStreamGraph: Updating checkpoint data for time 6000
ms
15/06/02 10:39:38 INFO CheckpointWriter: Saving checkpoint for time 4000 ms
to file
'file:/Users/mobsniuk/perforce/IB/proj/analytics-dnst/platform/checkpoint/checkpoint-4000'
15/06/02 10:39:38 INFO DStreamGraph: Updated checkpoint data for time 6000
ms
15/06/02 10:39:38 INFO CheckpointWriter: Checkpoint for time 4000 ms saved
to file
'file:/Users/mobsniuk/perforce/IB/proj/analytics-dnst/platform/checkpoint/checkpoint-4000',
took 18291960 bytes and 28 ms
15/06/02 10:39:38 INFO ReceiveDataFromFileEndToEndDNSTTest: Clock time is
1
15/06/02 10:39:38 INFO CheckpointWriter: Saving checkpoint for time 6000 ms
to file
'file:/Users/mobsniuk/perforce/IB/proj/analytics-dnst/platform/checkpoint/checkpoint-6000'
15/06/02 10:39:38 INFO DStreamGraph: Clearing checkpoint data for time 2000
ms
15/06/02 10:39:38 INFO DStreamGraph: Cleared checkpoint data for time 2000
ms
15/06/02 10:39:38 INFO ReceivedBlockTracker: Deleting batches ArrayBuffer()
15/06/02 10:39:38 INFO FlatMapValuedDStream: Time 8000 ms is invalid as
zeroTime is 0 ms and slideDuration is 1 ms and difference is 8000 ms
15/06/02 10:39:38 INFO JobScheduler: No jobs added for time 8000 ms
15/06/02 10:39:38 INFO DStreamGraph: Clearing checkpoint data for time 4000
ms
15/06/02 10:39:38 INFO DStreamGraph: Cleared checkpoint data for time 4000
ms
15/06/02 10:39:38 INFO ReceivedBlockTracker: Deleting batches ArrayBuffer()
15/06/02 10:39:38 INFO StateDStream: Time 0 ms is invalid as zeroTime is 0
ms and slideDuration is 1 ms and difference is 0 ms
15/06/02 10:39:38 INFO FlatMappedDStream: Slicing from 2000 ms to 1 ms
(aligned to 2000 ms and 1 ms)
15/06/02 10:39:38 INFO CheckpointWriter: Checkpoint for time 6000 ms saved
to file

Re: map - reduce only with disk

2015-06-02 Thread Matei Zaharia
You shouldn't have to persist the RDD at all, just call flatMap and reduce on 
it directly. If you try to persist it, that will try to load the original data 
into memory, but here you are only scanning through it once.

Matei
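
For reference, a minimal sketch of that approach applied to the pair-counting
job quoted below (outputPairsOfWords stands in for the poster's own extractor
and the paths are placeholders):

  import org.apache.spark.{SparkConf, SparkContext}
  import org.apache.spark.SparkContext._

  object PairCount {
    // Stand-in for the poster's extractor: emits ((word1, word2), 1) pairs per document.
    def outputPairsOfWords(doc: String): Seq[((String, String), Short)] = {
      val words = doc.split("\\s+").distinct
      for (a <- words.toSeq; b <- words if a < b) yield ((a, b), 1.toShort)
    }

    def main(args: Array[String]): Unit = {
      val sc = new SparkContext(new SparkConf().setAppName("pair-count"))
      sc.textFile("hdfs:///input")                // no persist: the input is scanned once
        .flatMap(outputPairsOfWords)
        .reduceByKey((a, b) => (a + b).toShort)   // aggregation spills to disk when needed
        .filter { case (_, count) => count >= 5 }
        .saveAsTextFile("hdfs:///output")
      sc.stop()
    }
  }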

 On Jun 2, 2015, at 2:09 AM, Octavian Ganea octavian.ga...@inf.ethz.ch wrote:
 
 Thanks,
 
 I was actually using reduceByKey, not groupByKey. 
 
 I also tried this: rdd.persist(MEMORY_AND_DISK).flatMap(...).reduceByKey . 
 However, I got the same error as before, namely the error described here: 
 http://apache-spark-user-list.1001560.n3.nabble.com/flatMap-output-on-disk-flatMap-memory-overhead-td23098.html
  
 http://apache-spark-user-list.1001560.n3.nabble.com/flatMap-output-on-disk-flatMap-memory-overhead-td23098.html
 
 My task is to count the frequencies of pairs of words that occur in a set of 
 documents at least 5 times. I know that this final output is sparse and 
 should comfortably fit in memory. However, the intermediate pairs that are 
 spilled by flatMap might need to be stored on the disk, but I don't 
 understand why the persist option does not work and my job fails.
 
 My code:
 
 rdd.persist(StorageLevel.MEMORY_AND_DISK)
   .flatMap(x => outputPairsOfWords(x)) // outputs pairs of type ((word1,word2), 1)
   .reduceByKey((a,b) => (a + b).toShort)
   .filter({case((x,y),count) => count >= 5})
  
 
 My cluster has 8 nodes, each with 129 GB of RAM and 16 cores per node. One 
 node I keep for the master, 7 nodes for the workers.
 
 my conf:
 
 conf.set("spark.cores.max", "128")
 conf.set("spark.akka.frameSize", "1024")
 conf.set("spark.executor.memory", "115g")
 conf.set("spark.shuffle.file.buffer.kb", "1000")
 
 my spark-env.sh:
  ulimit -n 20
  SPARK_JAVA_OPTS="-Xss1g -Xmx129g -d64 -XX:-UseGCOverheadLimit -XX:-UseCompressedOops"
  SPARK_DRIVER_MEMORY=129G
 
 spark version: 1.1.1
 
 Thank you a lot for your help!
 
 
 2015-06-02 4:40 GMT+02:00 Matei Zaharia matei.zaha...@gmail.com 
 mailto:matei.zaha...@gmail.com:
 As long as you don't use cache(), these operations will go from disk to disk, 
 and will only use a fixed amount of memory to build some intermediate 
 results. However, note that because you're using groupByKey, that needs the 
 values for each key to all fit in memory at once. In this case, if you're 
 going to reduce right after, you should use reduceByKey, which will be more 
 efficient.
 
 Matei
 
  On Jun 1, 2015, at 2:21 PM, octavian.ganea octavian.ga...@inf.ethz.ch 
  mailto:octavian.ga...@inf.ethz.ch wrote:
 
  Dear all,
 
  Does anyone know how can I force Spark to use only the disk when doing a
  simple flatMap(..).groupByKey.reduce(_ + _) ? Thank you!
 
 
 
  --
  View this message in context: 
  http://apache-spark-user-list.1001560.n3.nabble.com/map-reduce-only-with-disk-tp23102.html
   
  http://apache-spark-user-list.1001560.n3.nabble.com/map-reduce-only-with-disk-tp23102.html
  Sent from the Apache Spark User List mailing list archive at Nabble.com.
 
  -
  To unsubscribe, e-mail: user-unsubscr...@spark.apache.org 
  mailto:user-unsubscr...@spark.apache.org
  For additional commands, e-mail: user-h...@spark.apache.org 
  mailto:user-h...@spark.apache.org
 
 
 
 
 
 -- 
 Octavian Ganea
 
 Research assistant at ETH Zurich
 octavian.ga...@inf.ethz.ch mailto:octavian.ga...@inf.ethz.ch
 http://da.inf.ethz.ch/people/OctavianGanea/ 
 http://da.inf.ethz.ch/people/OctavianGanea/



Re: Compute Median in Spark Dataframe

2015-06-02 Thread Olivier Girardot
Nice to hear from you Holden ! I ended up trying exactly that (Column) -
but I may have done it wrong :

In [*5*]: g.agg(Column("percentile(value, 0.5)"))
Py4JError: An error occurred while calling o97.agg. Trace:
py4j.Py4JException: Method agg([class java.lang.String, class
scala.collection.immutable.Nil$]) does not exist
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:333)

Any idea ?

Olivier.
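
For reference, the grouped case can also be expressed through HiveContext SQL
directly. A rough sketch (the column names are illustrative; sqlContext must
be a HiveContext, and Hive's percentile() expects an integral column, with
percentile_approx() for doubles):

  import sqlContext.implicits._

  case class KeyValue(grp: Int, key: Int)
  val df = sc.parallelize(1 to 50).map(i => KeyValue(i % 3, i)).toDF()
  df.registerTempTable("table")
  sqlContext.sql(
    "select grp, percentile(key, 0.5) as median from table group by grp").show()
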
On Tue, Jun 2, 2015 at 18:02, Holden Karau hol...@pigscanfly.ca wrote:

 Not super easily, the GroupedData class uses a strToExpr function which
 has a pretty limited set of functions, so we can't pass in the name of an
 arbitrary hive UDAF (unless I'm missing something). We can instead
 construct a column with the expression you want and then pass it in to
 agg() that way (although then you need to call the hive UDAF there). There
 are some private classes in hiveUdfs.scala which expose hiveUdaf's as Spark
 SQL AggregateExpressions, but they are private.

 On Tue, Jun 2, 2015 at 8:28 AM, Olivier Girardot 
 o.girar...@lateral-thoughts.com wrote:

 I've finally come to the same conclusion, but isn't there any way to call
 this Hive UDAFs from the agg(percentile(key,0.5)) ??

 On Tue, Jun 2, 2015 at 15:37, Yana Kadiyska yana.kadiy...@gmail.com wrote:

 Like this...sqlContext should be a HiveContext instance

 case class KeyValue(key: Int, value: String)
 val df = sc.parallelize(1 to 50).map(i => KeyValue(i, i.toString)).toDF
 df.registerTempTable("table")
 sqlContext.sql("select percentile(key,0.5) from table").show()

 ​

 On Tue, Jun 2, 2015 at 8:07 AM, Olivier Girardot 
 o.girar...@lateral-thoughts.com wrote:

 Hi everyone,
 Is there any way to compute a median on a column using Spark's
 Dataframe. I know you can use stats in a RDD but I'd rather stay within a
 dataframe.
 Hive seems to imply that using ntile one can compute percentiles,
 quartiles and therefore a median.
 Does anyone have experience with this ?

 Regards,

 Olivier.





 --
 Cell : 425-233-8271
 Twitter: https://twitter.com/holdenkarau
 Linked In: https://www.linkedin.com/in/holdenkarau



Re: Can't build Spark 1.3

2015-06-02 Thread Ted Yu
Have you run zinc during build ?

See build/mvn which installs zinc.

Cheers

On Tue, Jun 2, 2015 at 12:26 PM, Ritesh Kumar Singh 
riteshoneinamill...@gmail.com wrote:

 It did hang for me too. High RAM consumption during build. Had to free a
 lot of RAM and introduce swap memory just to get it built in my 3rd attempt.
 Everything else looks fine. You can download the prebuilt versions from
 the Spark homepage to save yourself from all this trouble.

 Thanks,
 Ritesh



Re: Compute Median in Spark Dataframe

2015-06-02 Thread Holden Karau
So for column you need to pass in a Java function, I have some sample code
which does this but it does terrible things to access Spark internals.

On Tuesday, June 2, 2015, Olivier Girardot o.girar...@lateral-thoughts.com
wrote:

 Nice to hear from you Holden ! I ended up trying exactly that (Column) -
 but I may have done it wrong :

 In [*5*]: g.agg(Column("percentile(value, 0.5)"))
 Py4JError: An error occurred while calling o97.agg. Trace:
 py4j.Py4JException: Method agg([class java.lang.String, class
 scala.collection.immutable.Nil$]) does not exist
 at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:333)

 Any idea ?

 Olivier.
 On Tue, Jun 2, 2015 at 18:02, Holden Karau hol...@pigscanfly.ca wrote:

 Not super easily, the GroupedData class uses a strToExpr function which
 has a pretty limited set of functions, so we can't pass in the name of an
 arbitrary hive UDAF (unless I'm missing something). We can instead
 construct a column with the expression you want and then pass it in to
 agg() that way (although then you need to call the hive UDAF there). There
 are some private classes in hiveUdfs.scala which expose hiveUdaf's as Spark
 SQL AggregateExpressions, but they are private.

 On Tue, Jun 2, 2015 at 8:28 AM, Olivier Girardot 
 o.girar...@lateral-thoughts.com
 wrote:

 I've finally come to the same conclusion, but isn't there any way to
 call this Hive UDAFs from the agg(percentile(key,0.5)) ??

 On Tue, Jun 2, 2015 at 15:37, Yana Kadiyska yana.kadiy...@gmail.com wrote:

 Like this...sqlContext should be a HiveContext instance

 case class KeyValue(key: Int, value: String)
 val df = sc.parallelize(1 to 50).map(i => KeyValue(i, i.toString)).toDF
 df.registerTempTable("table")
 sqlContext.sql("select percentile(key,0.5) from table").show()

 ​

 On Tue, Jun 2, 2015 at 8:07 AM, Olivier Girardot 
 o.girar...@lateral-thoughts.com
 wrote:

 Hi everyone,
 Is there any way to compute a median on a column using Spark's
 Dataframe. I know you can use stats in a RDD but I'd rather stay within a
 dataframe.
 Hive seems to imply that using ntile one can compute percentiles,
 quartiles and therefore a median.
 Does anyone have experience with this ?

 Regards,

 Olivier.





 --
 Cell : 425-233-8271
 Twitter: https://twitter.com/holdenkarau
 Linked In: https://www.linkedin.com/in/holdenkarau



-- 
Cell : 425-233-8271
Twitter: https://twitter.com/holdenkarau
Linked In: https://www.linkedin.com/in/holdenkarau


Re: Best strategy for Pandas - Spark

2015-06-02 Thread Olivier Girardot
Thanks for the answer, I'm currently doing exactly that.
I'll try to sum up the usual Pandas => Spark DataFrame caveats soon.

Regards,

Olivier.

On Tue, Jun 2, 2015 at 02:38, Davies Liu dav...@databricks.com wrote:

 The second one sounds reasonable, I think.

 On Thu, Apr 30, 2015 at 1:42 AM, Olivier Girardot
 o.girar...@lateral-thoughts.com wrote:
  Hi everyone,
  Let's assume I have a complex workflow of more than 10 datasources as
 input
  - 20 computations (some creating intermediary datasets and some merging
  everything for the final computation) - some taking on average 1 minute
 to
  complete and some taking more than 30 minutes.
 
  What would be for you the best strategy to port this to Apache Spark ?
 
  Transform the whole flow into a Spark Job (PySpark or Scala)
  Transform only part of the flow (the heavy lifting ~30 min parts) using
 the
  same language (PySpark)
  Transform only part of the flow and pipe the rest from Scala to Python
 
  Regards,
 
  Olivier.



Re: Re: spark 1.3.1 jars in repo1.maven.org

2015-06-02 Thread Shixiong Zhu
Ryan - I sent a PR to fix your issue:
https://github.com/apache/spark/pull/6599

Edward - I have no idea why the following error happened. ContextCleaner
doesn't use any Hadoop API. Could you try Spark 1.3.0? It's supposed to
support both hadoop 1 and hadoop 2.

* Exception in thread Spark Context Cleaner
java.lang.NoClassDefFoundError: 0
at
org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply$mcV$sp(ContextCleaner.scala:149)


Best Regards,
Shixiong Zhu

2015-06-03 0:08 GMT+08:00 Ryan Williams ryan.blake.willi...@gmail.com:

 I think this is causing issues upgrading ADAM
 https://github.com/bigdatagenomics/adam to Spark 1.3.1 (cf. adam#690
 https://github.com/bigdatagenomics/adam/pull/690#issuecomment-107769383);
 attempting to build against Hadoop 1.0.4 yields errors like:

 2015-06-02 15:57:44 ERROR Executor:96 - Exception in task 0.0 in stage 0.0
 (TID 0)
 *java.lang.IncompatibleClassChangeError: Found class
 org.apache.hadoop.mapreduce.TaskAttemptContext, but interface was expected*
 at
 org.apache.spark.mapred.SparkHadoopMapRedUtil$.commitTask(SparkHadoopMapRedUtil.scala:95)
 at org.apache.spark.SparkHadoopWriter.commit(SparkHadoopWriter.scala:106)
 at
 org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:1082)
 at
 org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:1059)
 at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
 at org.apache.spark.scheduler.Task.run(Task.scala:64)
 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
 at java.lang.Thread.run(Thread.java:745)
 2015-06-02 15:57:44 WARN  TaskSetManager:71 - Lost task 0.0 in stage 0.0
 (TID 0, localhost): java.lang.IncompatibleClassChangeError: Found class
 org.apache.hadoop.mapreduce.TaskAttemptContext, but interface was expected
 at
 org.apache.spark.mapred.SparkHadoopMapRedUtil$.commitTask(SparkHadoopMapRedUtil.scala:95)
 at org.apache.spark.SparkHadoopWriter.commit(SparkHadoopWriter.scala:106)
 at
 org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:1082)
 at
 org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:1059)
 at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
 at org.apache.spark.scheduler.Task.run(Task.scala:64)
 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
 at java.lang.Thread.run(Thread.java:745)

 TaskAttemptContext is a class in Hadoop 1.0.4, but an interface in Hadoop
 2; Spark 1.3.1 expects the interface but is getting the class.

 It sounds like, while I *can* depend on Spark 1.3.1 and Hadoop 1.0.4, I
 then need to hope that I don't exercise certain Spark code paths that run
 afoul of differences between Hadoop 1 and 2; does that seem correct?

 Thanks!

 On Wed, May 20, 2015 at 1:52 PM Sean Owen so...@cloudera.com wrote:

 I don't think any of those problems are related to Hadoop. Have you
 looked at userClassPathFirst settings?

 On Wed, May 20, 2015 at 6:46 PM, Edward Sargisson ejsa...@gmail.com
 wrote:

 Hi Sean and Ted,
 Thanks for your replies.

 I don't have our current problems nicely written up as good questions
 yet. I'm still sorting out classpath issues, etc.
 In case it is of help, I'm seeing:
 * Exception in thread Spark Context Cleaner
 java.lang.NoClassDefFoundError: 0
 at
 org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply$mcV$sp(ContextCleaner.scala:149)
 * We've been having clashing dependencies between a colleague and I
 because of the aforementioned classpath issue
 * The clashing dependencies are also causing issues with what jetty
 libraries are available in the classloader from Spark and don't clash with
 existing libraries we have.

 More anon,

 Cheers,
 Edward



  Original Message 
  Subject: Re: spark 1.3.1 jars in repo1.maven.org Date: 2015-05-20 00:38
 From: Sean Owen so...@cloudera.com To: Edward Sargisson 
 esa...@pobox.com Cc: user user@spark.apache.org


 Yes, the published artifacts can only refer to one version of anything
 (OK, modulo publishing a large number of variants under classifiers).

 You aren't intended to rely on Spark's transitive dependencies for
 anything. Compiling against the Spark API has no relation to what
 version of Hadoop it binds against because it's not part of any API.
 You mark the Spark dependency even as provided in your build and get
 all the Spark/Hadoop bindings at runtime from our cluster.
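
 In an sbt build that looks roughly like this (the artifact names are the
 standard ones; the versions here are only an example):

   // build.sbt sketch: compile against Spark, but take Spark/Hadoop from the cluster at runtime.
   libraryDependencies ++= Seq(
     "org.apache.spark" %% "spark-core" % "1.3.1" % "provided",
     "org.apache.spark" %% "spark-sql"  % "1.3.1" % "provided"
   )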

 What problem are you experiencing?


 On Wed, May 20, 2015 at 2:17 AM, Edward Sargisson esa...@pobox.com
 wrote:

 Hi,
 

Re: Re: spark 1.3.1 jars in repo1.maven.org

2015-06-02 Thread Ryan Williams
Thanks so much Shixiong! This is great.

On Tue, Jun 2, 2015 at 8:26 PM Shixiong Zhu zsxw...@gmail.com wrote:

 Ryan - I sent a PR to fix your issue:
 https://github.com/apache/spark/pull/6599

 Edward - I have no idea why the following error happened. ContextCleaner
 doesn't use any Hadoop API. Could you try Spark 1.3.0? It's supposed to
 support both hadoop 1 and hadoop 2.


 * Exception in thread Spark Context Cleaner
 java.lang.NoClassDefFoundError: 0
 at
 org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply$mcV$sp(ContextCleaner.scala:149)


 Best Regards,
 Shixiong Zhu

 2015-06-03 0:08 GMT+08:00 Ryan Williams ryan.blake.willi...@gmail.com:

 I think this is causing issues upgrading ADAM
 https://github.com/bigdatagenomics/adam to Spark 1.3.1 (cf. adam#690
 https://github.com/bigdatagenomics/adam/pull/690#issuecomment-107769383);
 attempting to build against Hadoop 1.0.4 yields errors like:

 2015-06-02 15:57:44 ERROR Executor:96 - Exception in task 0.0 in stage
 0.0 (TID 0)
 *java.lang.IncompatibleClassChangeError: Found class
 org.apache.hadoop.mapreduce.TaskAttemptContext, but interface was expected*
 at
 org.apache.spark.mapred.SparkHadoopMapRedUtil$.commitTask(SparkHadoopMapRedUtil.scala:95)
 at org.apache.spark.SparkHadoopWriter.commit(SparkHadoopWriter.scala:106)
 at
 org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:1082)
 at
 org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:1059)
 at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
 at org.apache.spark.scheduler.Task.run(Task.scala:64)
 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
 at java.lang.Thread.run(Thread.java:745)
 2015-06-02 15:57:44 WARN  TaskSetManager:71 - Lost task 0.0 in stage 0.0
 (TID 0, localhost): java.lang.IncompatibleClassChangeError: Found class
 org.apache.hadoop.mapreduce.TaskAttemptContext, but interface was expected
 at
 org.apache.spark.mapred.SparkHadoopMapRedUtil$.commitTask(SparkHadoopMapRedUtil.scala:95)
 at org.apache.spark.SparkHadoopWriter.commit(SparkHadoopWriter.scala:106)
 at
 org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:1082)
 at
 org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:1059)
 at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
 at org.apache.spark.scheduler.Task.run(Task.scala:64)
 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
 at java.lang.Thread.run(Thread.java:745)

 TaskAttemptContext is a class in Hadoop 1.0.4, but an interface in Hadoop
 2; Spark 1.3.1 expects the interface but is getting the class.

 It sounds like, while I *can* depend on Spark 1.3.1 and Hadoop 1.0.4, I
 then need to hope that I don't exercise certain Spark code paths that run
 afoul of differences between Hadoop 1 and 2; does that seem correct?

 Thanks!

 On Wed, May 20, 2015 at 1:52 PM Sean Owen so...@cloudera.com wrote:

 I don't think any of those problems are related to Hadoop. Have you
 looked at userClassPathFirst settings?

 On Wed, May 20, 2015 at 6:46 PM, Edward Sargisson ejsa...@gmail.com
 wrote:

 Hi Sean and Ted,
 Thanks for your replies.

 I don't have our current problems nicely written up as good questions
 yet. I'm still sorting out classpath issues, etc.
 In case it is of help, I'm seeing:
 * Exception in thread Spark Context Cleaner
 java.lang.NoClassDefFoundError: 0
 at
 org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply$mcV$sp(ContextCleaner.scala:149)
 * We've been having clashing dependencies between a colleague and I
 because of the aforementioned classpath issue
 * The clashing dependencies are also causing issues with what jetty
 libraries are available in the classloader from Spark and don't clash with
 existing libraries we have.

 More anon,

 Cheers,
 Edward



  Original Message 
  Subject: Re: spark 1.3.1 jars in repo1.maven.org Date: 2015-05-20
 00:38 From: Sean Owen so...@cloudera.com To: Edward Sargisson 
 esa...@pobox.com Cc: user user@spark.apache.org


 Yes, the published artifacts can only refer to one version of anything
 (OK, modulo publishing a large number of variants under classifiers).

 You aren't intended to rely on Spark's transitive dependencies for
 anything. Compiling against the Spark API has no relation to what
 version of Hadoop it binds against because it's not part of any API.
 You mark the Spark dependency even as provided in your build and get
 all the Spark/Hadoop bindings at runtime from our cluster.


Re: Spark 1.4 YARN Application Master fails with 500 connect refused

2015-06-02 Thread Night Wolf
 Just testing with Spark 1.3, it looks like it sets the proxy correctly to
be the YARN RM host (0101)

15/06/03 10:34:19 INFO yarn.ApplicationMaster: Registered signal handlers
for [TERM, HUP, INT]
15/06/03 10:34:20 INFO yarn.ApplicationMaster: ApplicationAttemptId:
appattempt_1432690361766_0596_01
15/06/03 10:34:20 INFO spark.SecurityManager: Changing view acls to: nw
15/06/03 10:34:20 INFO spark.SecurityManager: Changing modify acls to: nw
15/06/03 10:34:20 INFO spark.SecurityManager: SecurityManager:
authentication disabled; ui acls disabled; users with view permissions:
Set(nw); users with modify permissions: Set(nw)
15/06/03 10:34:20 INFO slf4j.Slf4jLogger: Slf4jLogger started
15/06/03 10:34:21 INFO Remoting: Starting remoting
15/06/03 10:34:21 INFO Remoting: Remoting started; listening on addresses
:[akka.tcp://sparkYarnAM@qtausc-pphd0137.hadoop.local:43972]
15/06/03 10:34:21 INFO util.Utils: Successfully started service
'sparkYarnAM' on port 43972.
15/06/03 10:34:21 INFO yarn.ApplicationMaster: Waiting for Spark driver to
be reachable.
15/06/03 10:34:21 INFO yarn.ApplicationMaster: Driver now available:
edge-node-77.skynet.hadoop:36387
15/06/03 10:34:21 INFO yarn.ApplicationMaster: Listen to driver:
akka.tcp://sparkDriver@edge-node-77.skynet.hadoop:36387/user/YarnScheduler
*15/06/03 10:34:21 INFO yarn.ApplicationMaster: Add WebUI Filter.
AddWebUIFilter(org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter,Map(PROXY_HOSTS
- qtausc-pphd0101.hadoop.local, PROXY_URI_BASES -
http://qtausc-pphd0101.hadoop.local:8088/proxy/application_1432690361766_0596
http://qtausc-pphd0101.hadoop.local:8088/proxy/application_1432690361766_0596),/proxy/application_1432690361766_0596)*
15/06/03 10:34:21 INFO yarn.YarnRMClient: Registering the ApplicationMaster
15/06/03 10:34:21 INFO yarn.YarnAllocator: Will request 2 executor
containers, each with 1 cores and 1408 MB memory including 384 MB overhead
15/06/03 10:34:21 INFO yarn.YarnAllocator: Container request (host: Any,
capability: memory:1408, vCores:1, disks:0.0)
15/06/03 10:34:21 INFO yarn.YarnAllocator: Container request (host: Any,
capability: memory:1408, vCores:1, disks:0.0)
15/06/03 10:34:21 INFO yarn.ApplicationMaster: Started progress reporter
thread - sleep time : 5000
15/06/03 10:34:21 INFO impl.AMRMClientImpl: Received new token for :
qtausc-pphd0151.hadoop.local:50941
15/06/03 10:34:21 INFO yarn.YarnAllocator: Launching container
container_1432690361766_0596_01_02 for on host
qtausc-pphd0151.hadoop.local
15/06/03 10:34:21 INFO yarn.YarnAllocator: Launching ExecutorRunnable.
driverUrl: 
akka.tcp://sparkDriver@edge-node-77.skynet.hadoop:36387/user/CoarseGrainedScheduler,
 executorHostname: qtausc-pphd0151.hadoop.local
15/06/03 10:34:21 INFO yarn.YarnAllocator: Received 1 containers from YARN,
launching executors on 1 of them.
15/06/03 10:34:21 INFO yarn.ExecutorRunnable: Starting Executor Container
15/06/03 10:34:21 INFO impl.ContainerManagementProtocolProxy:
yarn.client.max-nodemanagers-proxies : 500
15/06/03 10:34:21 INFO yarn.ExecutorRunnable: Setting up
ContainerLaunchContext
15/06/03 10:34:21 INFO yarn.ExecutorRunnable: Preparing Local resources
15/06/03 10:34:21 INFO yarn.ExecutorRunnable: Prepared Local resources
Map(__spark__.jar - resource { scheme: maprfs port: -1 file:
/user/nw/.sparkStaging/application_1432690361766_0596/spark-assembly-1.3.1-hadoop2.5.1-mapr-1501.jar
} size: 130013450 timestamp: 1433291656330 type: FILE visibility: PRIVATE)
15/06/03 10:34:21 INFO yarn.ExecutorRunnable: Setting up executor with
environment: Map(CLASSPATH -
{{PWD}}CPS{{PWD}}/__spark__.jarCPS$HADOOP_CONF_DIRCPS$HADOOP_COMMON_HOME/share/hadoop/common/*CPS$HADOOP_COMMON_HOME/share/hadoop/common/lib/*CPS$HADOOP_HDFS_HOME/share/hadoop/hdfs/*CPS$HADOOP_HDFS_HOME/share/hadoop/hdfs/lib/*CPS$HADOOP_YARN_HOME/share/hadoop/yarn/*CPS$HADOOP_YARN_HOME/share/hadoop/yarn/lib/*CPS/opt/mapr/lib/*:/opt/mapr/hadoop/hadoop-2.5.1/share/hadoop/yarn/*:/opt/mapr/hadoop/hadoop-2.5.1/share/hadoop/common/lib/*:/opt/mapr/hive/hive-current/lib/*,
SPARK_LOG_URL_STDERR -
http://qtausc-pphd0151.hadoop.local:8042/node/containerlogs/container_1432690361766_0596_01_02/nw/stderr?start=0,
SPARK_DIST_CLASSPATH -
/opt/mapr/lib/*:/opt/mapr/hadoop/hadoop-2.5.1/share/hadoop/yarn/*:/opt/mapr/hadoop/hadoop-2.5.1/share/hadoop/common/lib/*:/opt/mapr/hive/hive-current/lib/*,
SPARK_YARN_STAGING_DIR - .sparkStaging/application_1432690361766_0596,
SPARK_YARN_CACHE_FILES_FILE_SIZES - 130013450, SPARK_USER - nw,
SPARK_YARN_CACHE_FILES_VISIBILITIES - PRIVATE, SPARK_YARN_MODE - true,
SPARK_YARN_CACHE_FILES_TIME_STAMPS - 1433291656330, SPARK_LOG_URL_STDOUT
-
http://qtausc-pphd0151.hadoop.local:8042/node/containerlogs/container_1432690361766_0596_01_02/nw/stdout?start=0,
SPARK_YARN_CACHE_FILES -
maprfs:/user/nw/.sparkStaging/application_1432690361766_0596/spark-assembly-1.3.1-hadoop2.5.1-mapr-1501.jar#__spark__.jar)
15/06/03 10:34:21 INFO yarn.ExecutorRunnable: Setting up executor with
commands: 

Re: Spark 1.4 YARN Application Master fails with 500 connect refused

2015-06-02 Thread Marcelo Vanzin
That code hasn't changed at all between 1.3 and 1.4; it also has been
working fine for me.

Are you sure you're using exactly the same Hadoop libraries (since you're
building with -Phadoop-provided) and Hadoop configuration in both cases?

On Tue, Jun 2, 2015 at 5:29 PM, Night Wolf nightwolf...@gmail.com wrote:

 Hi all,

 Trying out Spark 1.4 on MapR Hadoop 2.5.1 running in yarn-client mode.
 Seems the application master doesn't work anymore, I get a 500 connect
 refused, even when I hit the IP/port of the spark UI directly. The logs
 don't show much.

 I built Spark with Java 6, Hive & Scala 2.10 and 2.11. I've tried with and
 without -Phadoop-provided

 *Build command;*

 ./make-distribution.sh --name mapr4.0.2_yarn_j6_2.10 --tgz -Pyarn -Pmapr4
 -Phadoop-2.4 -Pmapr4 -Phive -Phadoop-provided
 -Dhadoop.version=2.5.1-mapr-1501 -Dyarn.version=2.5.1-mapr-1501 -DskipTests
 -e -X

 *Logs from spark shell;*

 15/06/03 00:10:56 INFO server.AbstractConnector: Started
 SelectChannelConnector@0.0.0.0:4040
 15/06/03 00:10:56 INFO util.Utils: Successfully started service 'SparkUI'
 on port 4040.
 15/06/03 00:10:56 INFO ui.SparkUI: Started SparkUI at
 http://172.31.10.14:4040
 15/06/03 00:10:57 INFO yarn.Client: Requesting a new application from
 cluster with 71 NodeManagers
 15/06/03 00:10:57 INFO yarn.Client: Verifying our application has not
 requested more than the maximum memory capability of the cluster (112640 MB
 per container)
 15/06/03 00:10:57 INFO yarn.Client: Will allocate AM container, with 896
 MB memory including 384 MB overhead
 15/06/03 00:10:57 INFO yarn.Client: Setting up container launch context
 for our AM
 15/06/03 00:10:57 INFO yarn.Client: Preparing resources for our AM
 container
 15/06/03 00:10:57 INFO yarn.Client: Uploading resource
 file:///apps/spark/spark-1.4.0-SNAPSHOT-bin-mapr4.0.2_yarn_j6_2.11/lib/spark-assembly-1.4.0-SNAPSHOT-hadoop2.5.1-mapr-1501.jar
 -
 maprfs:/user/nw/.sparkStaging/application_1432690361766_0593/spark-assembly-1.4.0-SNAPSHOT-hadoop2.5.1-mapr-1501.jar
 15/06/03 00:10:58 INFO yarn.Client: Uploading resource
 file:/tmp/spark-5e42f904-ff83-4c93-bd35-4c3e20226a8a/__hadoop_conf__983379693214711.zip
 -
 maprfs:/user/nw/.sparkStaging/application_1432690361766_0593/__hadoop_conf__983379693214711.zip
 15/06/03 00:10:58 INFO yarn.Client: Setting up the launch environment for
 our AM container
 15/06/03 00:10:58 INFO spark.SecurityManager: Changing view acls to: nw
 15/06/03 00:10:58 INFO spark.SecurityManager: Changing modify acls to: nw
 15/06/03 00:10:58 INFO spark.SecurityManager: SecurityManager:
 authentication disabled; ui acls disabled; users with view permissions:
 Set(nw); users with modify permissions: Set(nw)
 15/06/03 00:10:58 INFO yarn.Client: Submitting application 593 to
 ResourceManager
 15/06/03 00:10:58 INFO security.ExternalTokenManagerFactory: Initialized
 external token manager class -
 com.mapr.hadoop.yarn.security.MapRTicketManager
 15/06/03 00:10:58 INFO impl.YarnClientImpl: Submitted application
 application_1432690361766_0593
 15/06/03 00:10:59 INFO yarn.Client: Application report for
 application_1432690361766_0593 (state: ACCEPTED)
 15/06/03 00:10:59 INFO yarn.Client:
  client token: N/A
  diagnostics: N/A
  ApplicationMaster host: N/A
  ApplicationMaster RPC port: -1
  queue: default
  start time: 1433290258143
  final status: UNDEFINED
  tracking URL:
 http://qtausc-pphd0101.hadoop.local:8088/proxy/application_1432690361766_0593/
  user: nw
 15/06/03 00:11:00 INFO yarn.Client: Application report for
 application_1432690361766_0593 (state: ACCEPTED)
 15/06/03 00:11:01 INFO yarn.Client: Application report for
 application_1432690361766_0593 (state: ACCEPTED)
 15/06/03 00:11:02 INFO yarn.Client: Application report for
 application_1432690361766_0593 (state: ACCEPTED)
 15/06/03 00:11:03 INFO yarn.Client: Application report for
 application_1432690361766_0593 (state: ACCEPTED)
 15/06/03 00:11:03 INFO cluster.YarnSchedulerBackend$YarnSchedulerEndpoint:
 ApplicationMaster registered as AkkaRpcEndpointRef(Actor[akka.tcp://
 sparkYarnAM@192.168.81.167:36542/user/YarnAM#1631897818])
 15/06/03 00:11:03 INFO cluster.YarnClientSchedulerBackend: Add WebUI
 Filter. org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter,
 Map(PROXY_HOSTS - qtausc-pphd0167.hadoop.local, PROXY_URI_BASES -
 http://qtausc-pphd0167.hadoop.local:8088/proxy/application_1432690361766_0593),
 /proxy/application_1432690361766_0593
 15/06/03 00:11:03 INFO ui.JettyUtils: Adding filter:
 org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
 15/06/03 00:11:04 INFO yarn.Client: Application report for
 application_1432690361766_0593 (state: RUNNING)
 15/06/03 00:11:04 INFO yarn.Client:
  client token: N/A
  diagnostics: N/A
  ApplicationMaster host: 192.168.81.167
  ApplicationMaster RPC port: 0
  queue: default
  start time: 1433290258143
  final status: UNDEFINED
  tracking URL:
 http://qtausc-pphd0101.hadoop.local:8088/proxy/application_1432690361766_0593/
  user: nw
 15/06/03 

Re: Re: spark 1.3.1 jars in repo1.maven.org

2015-06-02 Thread Sean Owen
We are having a separate discussion about this, but I don't understand why
this is a problem. You're supposed to build Spark for Hadoop 1 if you run it
on Hadoop 1, and I am not sure that is happening here, given the error. I
do not think this should change, as I do not see that it fixes anything.

Let's please concentrate the follow up on the JIRA since you already made
one.

On Wed, Jun 3, 2015 at 2:26 AM, Shixiong Zhu zsxw...@gmail.com wrote:

 Ryan - I sent a PR to fix your issue:
 https://github.com/apache/spark/pull/6599

 Edward - I have no idea why the following error happened. ContextCleaner
 doesn't use any Hadoop API. Could you try Spark 1.3.0? It's supposed to
 support both hadoop 1 and hadoop 2.

 * Exception in thread Spark Context Cleaner
 java.lang.NoClassDefFoundError: 0
 at
 org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply$mcV$sp(ContextCleaner.scala:149)


 Best Regards,
 Shixiong Zhu

 2015-06-03 0:08 GMT+08:00 Ryan Williams ryan.blake.willi...@gmail.com:

 I think this is causing issues upgrading ADAM
 https://github.com/bigdatagenomics/adam to Spark 1.3.1 (cf. adam#690
 https://github.com/bigdatagenomics/adam/pull/690#issuecomment-107769383);
 attempting to build against Hadoop 1.0.4 yields errors like:

 2015-06-02 15:57:44 ERROR Executor:96 - Exception in task 0.0 in stage
 0.0 (TID 0)
 *java.lang.IncompatibleClassChangeError: Found class
 org.apache.hadoop.mapreduce.TaskAttemptContext, but interface was expected*
 at
 org.apache.spark.mapred.SparkHadoopMapRedUtil$.commitTask(SparkHadoopMapRedUtil.scala:95)
 at org.apache.spark.SparkHadoopWriter.commit(SparkHadoopWriter.scala:106)
 at
 org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:1082)
 at
 org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:1059)
 at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
 at org.apache.spark.scheduler.Task.run(Task.scala:64)
 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
 at java.lang.Thread.run(Thread.java:745)
 2015-06-02 15:57:44 WARN  TaskSetManager:71 - Lost task 0.0 in stage 0.0
 (TID 0, localhost): java.lang.IncompatibleClassChangeError: Found class
 org.apache.hadoop.mapreduce.TaskAttemptContext, but interface was expected
 at
 org.apache.spark.mapred.SparkHadoopMapRedUtil$.commitTask(SparkHadoopMapRedUtil.scala:95)
 at org.apache.spark.SparkHadoopWriter.commit(SparkHadoopWriter.scala:106)
 at
 org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:1082)
 at
 org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:1059)
 at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
 at org.apache.spark.scheduler.Task.run(Task.scala:64)
 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
 at java.lang.Thread.run(Thread.java:745)

 TaskAttemptContext is a class in Hadoop 1.0.4, but an interface in Hadoop
 2; Spark 1.3.1 expects the interface but is getting the class.

 It sounds like, while I *can* depend on Spark 1.3.1 and Hadoop 1.0.4, I
 then need to hope that I don't exercise certain Spark code paths that run
 afoul of differences between Hadoop 1 and 2; does that seem correct?

 Thanks!

 On Wed, May 20, 2015 at 1:52 PM Sean Owen so...@cloudera.com wrote:

 I don't think any of those problems are related to Hadoop. Have you
 looked at userClassPathFirst settings?

 On Wed, May 20, 2015 at 6:46 PM, Edward Sargisson ejsa...@gmail.com
 wrote:

 Hi Sean and Ted,
 Thanks for your replies.

 I don't have our current problems nicely written up as good questions
 yet. I'm still sorting out classpath issues, etc.
 In case it is of help, I'm seeing:
 * Exception in thread Spark Context Cleaner
 java.lang.NoClassDefFoundError: 0
 at
 org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply$mcV$sp(ContextCleaner.scala:149)
 * We've been having clashing dependencies between a colleague and I
 because of the aforementioned classpath issue
 * The clashing dependencies are also causing issues with what jetty
 libraries are available in the classloader from Spark and don't clash with
 existing libraries we have.

 More anon,

 Cheers,
 Edward



  Original Message 
  Subject: Re: spark 1.3.1 jars in repo1.maven.org Date: 2015-05-20
 00:38 From: Sean Owen so...@cloudera.com To: Edward Sargisson 
 esa...@pobox.com Cc: user user@spark.apache.org


 Yes, the published artifacts can only refer to one version of anything
 (OK, modulo publishing a large number of variants under 

Re: Can't build Spark

2015-06-02 Thread Mulugeta Mammo
Spark 1.3.1, Scala 2.11.6, Maven 3.3.3, I'm behind proxy, have set my proxy
settings in maven settings.

Thanks,

On Tue, Jun 2, 2015 at 2:54 PM, Ted Yu yuzhih...@gmail.com wrote:

 Can you give us some more information ?
 Such as:
 which Spark release you were building
 what command you used
 Scala version you used

 Thanks

 On Tue, Jun 2, 2015 at 2:50 PM, Mulugeta Mammo mulugeta.abe...@gmail.com
 wrote:

 building Spark is throwing errors, any ideas?


 [FATAL] Non-resolvable parent POM: Could not transfer artifact 
 org.apache:apache:pom:14 from/to central ( 
 http://repo.maven.apache.org/maven2): Error transferring file: 
 repo.maven.apache.org from  
 http://repo.maven.apache.org/maven2/org/apache/apache/14/apache-14.pom and 
 'parent.relativePath' points at wrong local POM @ line 21, column 11

 at 
 org.apache.maven.model.building.DefaultModelProblemCollector.newModelBuildingException(DefaultModelProblemCollector.java:195)
 at 
 org.apache.maven.model.building.DefaultModelBuilder.readParentExternally(DefaultModelBuilder.java:841)





Re: Embedding your own transformer in Spark.ml Pipleline

2015-06-02 Thread Dimp Bhat
I found this :
https://spark.apache.org/docs/1.2.0/api/java/org/apache/spark/ml/feature/Tokenizer.html
which indicates the Tokenizer did exist in Spark 1.2.0; so was it there in
1.2.0 but not in 1.2.1?

On Tue, Jun 2, 2015 at 12:45 PM, Peter Rudenko petro.rude...@gmail.com
wrote:

  I'm afraid there's no such class for 1.2.1. This API was added to 1.3.0
 AFAIK.


 On 2015-06-02 21:40, Dimp Bhat wrote:

 Thanks Peter. Can you share the Tokenizer.java class for Spark 1.2.1.

  Dimple

 On Tue, Jun 2, 2015 at 10:51 AM, Peter Rudenko petro.rude...@gmail.com
 wrote:

  Hi Dimple,
 take a look to existing transformers:

 https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/feature/OneHotEncoder.scala

 https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/feature/Tokenizer.scala

 https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/feature/HashingTF.scala
 (*it's for spark-1.4)

 The idea is just to implement a class that extends Transformer with
 HasInputCol with HasOutputCol (if your transformer is a 1:1 column
 transformer) and has

 def transform(dataset: DataFrame): DataFrame

 method.

 Thanks,
 Peter
 On 2015-06-02 20:19, dimple wrote:

 Hi,
 I would like to embed my own transformer in the Spark.ml Pipleline but do
 not see an example of it. Can someone share an example of which
 classes/interfaces I need to extend/implement in order to do so. Thanks.

 Dimple



 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Embedding-your-own-transformer-in-Spark-ml-Pipleline-tp23112.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org







Re: Can't build Spark

2015-06-02 Thread Ted Yu
Can you give us some more information ?
Such as:
which Spark release you were building
what command you used
Scala version you used

Thanks

On Tue, Jun 2, 2015 at 2:50 PM, Mulugeta Mammo mulugeta.abe...@gmail.com
wrote:

 building Spark is throwing errors, any ideas?


 [FATAL] Non-resolvable parent POM: Could not transfer artifact 
 org.apache:apache:pom:14 from/to central ( 
 http://repo.maven.apache.org/maven2): Error transferring file: 
 repo.maven.apache.org from  
 http://repo.maven.apache.org/maven2/org/apache/apache/14/apache-14.pom and 
 'parent.relativePath' points at wrong local POM @ line 21, column 11

 at 
 org.apache.maven.model.building.DefaultModelProblemCollector.newModelBuildingException(DefaultModelProblemCollector.java:195)
 at 
 org.apache.maven.model.building.DefaultModelBuilder.readParentExternally(DefaultModelBuilder.java:841)




Scripting with groovy

2015-06-02 Thread Paolo Platter
Hi all,

Has anyone tried to add scripting capabilities to Spark Streaming using Groovy?
I would like to stop the streaming context, update a transformation function 
written in Groovy (for example, to manipulate JSON), restart the streaming 
context and obtain the new behavior without re-submitting the application.

Is it possible? Do you think it makes sense, or is there a smarter way to 
accomplish that?

Thanks
Paolo

Inviata dal mio Windows Phone


Re: map - reduce only with disk

2015-06-02 Thread Matei Zaharia
Yup, exactly.

All the workers will use local disk in addition to RAM, but maybe one thing you 
need to configure is the directory to use for that. It should be set through 
spark.local.dir. By default it's /tmp, which on some machines is also in RAM, 
so that could be a problem. You should set it to a folder on a real disk, or 
even better, a comma-separated list of disks (e.g. /mnt1,/mnt2) if you have 
multiple disks.

Matei
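
A minimal sketch of that setting (the mount points are examples only; the same
key can also go into spark-defaults.conf):

  import org.apache.spark.{SparkConf, SparkContext}

  val conf = new SparkConf()
    .setAppName("pair-count")
    // Shuffle/spill files go here instead of /tmp; list one directory per physical disk.
    .set("spark.local.dir", "/mnt1/spark,/mnt2/spark")
  val sc = new SparkContext(conf)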

 On Jun 2, 2015, at 1:03 PM, Octavian Ganea octavian.ga...@inf.ethz.ch wrote:
 
 Thanks a lot! 
 
 So my understanding is that you call persist only if you need to use the rdd 
 at least twice to compute different things. So, if I just need the RDD for a 
 single scan, like in a simple flatMap(..).reduceByKey(..).saveAsTextFile(..), 
 how do I force the slaves to use the hard-disk (in addition to the RAM) when 
 the RAM is full and not to fail like they do now?
 
 Thank you! 
 
 2015-06-02 21:25 GMT+02:00 Matei Zaharia matei.zaha...@gmail.com 
 mailto:matei.zaha...@gmail.com:
 You shouldn't have to persist the RDD at all, just call flatMap and reduce on 
 it directly. If you try to persist it, that will try to load the original data 
 into memory, but here you are only scanning through it once.
 
 Matei
 
 On Jun 2, 2015, at 2:09 AM, Octavian Ganea octavian.ga...@inf.ethz.ch 
 mailto:octavian.ga...@inf.ethz.ch wrote:
 
 Thanks,
 
 I was actually using reduceByKey, not groupByKey. 
 
 I also tried this: rdd.persist(MEMORY_AND_DISK).flatMap(...).reduceByKey . 
 However, I got the same error as before, namely the error described here: 
 http://apache-spark-user-list.1001560.n3.nabble.com/flatMap-output-on-disk-flatMap-memory-overhead-td23098.html
  
 http://apache-spark-user-list.1001560.n3.nabble.com/flatMap-output-on-disk-flatMap-memory-overhead-td23098.html
 
 My task is to count the frequencies of pairs of words that occur in a set of 
 documents at least 5 times. I know that this final output is sparse and 
 should comfortably fit in memory. However, the intermediate pairs that are 
 spilled by flatMap might need to be stored on the disk, but I don't 
 understand why the persist option does not work and my job fails.
 
 My code:
 
 rdd.persist(StorageLevel.MEMORY_AND_DISK)
   .flatMap(x => outputPairsOfWords(x)) // outputs pairs of type ((word1,word2), 1)
   .reduceByKey((a,b) => (a + b).toShort)
   .filter({case((x,y),count) => count >= 5})
  
 
 My cluster has 8 nodes, each with 129 GB of RAM and 16 cores per node. One 
 node I keep for the master, 7 nodes for the workers.
 
 my conf:
 
 conf.set("spark.cores.max", "128")
 conf.set("spark.akka.frameSize", "1024")
 conf.set("spark.executor.memory", "115g")
 conf.set("spark.shuffle.file.buffer.kb", "1000")
 
 my spark-env.sh:
  ulimit -n 20
  SPARK_JAVA_OPTS="-Xss1g -Xmx129g -d64 -XX:-UseGCOverheadLimit -XX:-UseCompressedOops"
  SPARK_DRIVER_MEMORY=129G
 
 spark version: 1.1.1
 
 Thank you a lot for your help!
 
 
 2015-06-02 4:40 GMT+02:00 Matei Zaharia matei.zaha...@gmail.com 
 mailto:matei.zaha...@gmail.com:
 As long as you don't use cache(), these operations will go from disk to 
 disk, and will only use a fixed amount of memory to build some intermediate 
 results. However, note that because you're using groupByKey, that needs the 
 values for each key to all fit in memory at once. In this case, if you're 
 going to reduce right after, you should use reduceByKey, which will be more 
 efficient.
 
 Matei
 
  On Jun 1, 2015, at 2:21 PM, octavian.ganea octavian.ga...@inf.ethz.ch 
  mailto:octavian.ga...@inf.ethz.ch wrote:
 
  Dear all,
 
  Does anyone know how can I force Spark to use only the disk when doing a
  simple flatMap(..).groupByKey.reduce(_ + _) ? Thank you!
 
 
 
  --
  View this message in context: 
  http://apache-spark-user-list.1001560.n3.nabble.com/map-reduce-only-with-disk-tp23102.html
   
  http://apache-spark-user-list.1001560.n3.nabble.com/map-reduce-only-with-disk-tp23102.html
  Sent from the Apache Spark User List mailing list archive at Nabble.com 
  http://nabble.com/.
 
  -
  To unsubscribe, e-mail: user-unsubscr...@spark.apache.org 
  mailto:user-unsubscr...@spark.apache.org
  For additional commands, e-mail: user-h...@spark.apache.org 
  mailto:user-h...@spark.apache.org
 
 
 
 
 
 -- 
 Octavian Ganea
 
 Research assistant at ETH Zurich
 octavian.ga...@inf.ethz.ch mailto:octavian.ga...@inf.ethz.ch
 http://da.inf.ethz.ch/people/OctavianGanea/ 
 http://da.inf.ethz.ch/people/OctavianGanea/
 
 
 
 
 -- 
 Octavian Ganea
 
 Research assistant at ETH Zurich
 octavian.ga...@inf.ethz.ch mailto:octavian.ga...@inf.ethz.ch
 http://da.inf.ethz.ch/people/OctavianGanea/ 
 http://da.inf.ethz.ch/people/OctavianGanea/



Can't build Spark

2015-06-02 Thread Mulugeta Mammo
building Spark is throwing errors, any ideas?


[FATAL] Non-resolvable parent POM: Could not transfer artifact
org.apache:apache:pom:14 from/to central (
http://repo.maven.apache.org/maven2): Error transferring file:
repo.maven.apache.org from
http://repo.maven.apache.org/maven2/org/apache/apache/14/apache-14.pom
and 'parent.relativePath' points at wrong local POM @ line 21, column
11

at 
org.apache.maven.model.building.DefaultModelProblemCollector.newModelBuildingException(DefaultModelProblemCollector.java:195)
at 
org.apache.maven.model.building.DefaultModelBuilder.readParentExternally(DefaultModelBuilder.java:841)


Re: How to monitor Spark Streaming from Kafka?

2015-06-02 Thread Ruslan Dautkhanov
Nobody mentioned CM yet? Kafka is now supported by CM/CDH 5.4

http://www.cloudera.com/content/cloudera/en/documentation/cloudera-kafka/latest/PDF/cloudera-kafka.pdf




-- 
Ruslan Dautkhanov

On Mon, Jun 1, 2015 at 5:19 PM, Dmitry Goldenberg dgoldenberg...@gmail.com
wrote:

 Thank you, Tathagata, Cody, Otis.

 - Dmitry


 On Mon, Jun 1, 2015 at 6:57 PM, Otis Gospodnetic 
 otis.gospodne...@gmail.com wrote:

 I think you can use SPM - http://sematext.com/spm - it will give you all
 Spark and all Kafka metrics, including offsets broken down by topic, etc.
 out of the box.  I see more and more people using it to monitor various
 components in data processing pipelines, a la
 http://blog.sematext.com/2015/04/22/monitoring-stream-processing-tools-cassandra-kafka-and-spark/

 Otis

 On Mon, Jun 1, 2015 at 5:23 PM, dgoldenberg dgoldenberg...@gmail.com
 wrote:

 Hi,

 What are some of the good/adopted approaches to monitoring Spark
 Streaming
 from Kafka?  I see that there are things like
 http://quantifind.github.io/KafkaOffsetMonitor, for example.  Do they
 all
 assume that Receiver-based streaming is used?

 Then: "Note that one disadvantage of this approach (Receiverless Approach,
 #2) is that it does not update offsets in Zookeeper, hence Zookeeper-based
 Kafka monitoring tools will not show progress. However, you can access the
 offsets processed by this approach in each batch and update Zookeeper
 yourself."

 The code sample, however, seems sparse. What do you need to do here? -
  directKafkaStream.foreachRDD(
    new Function<JavaPairRDD<String, String>, Void>() {
      @Override
      public Void call(JavaPairRDD<String, Integer> rdd) throws IOException {
        OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd).offsetRanges
        // offsetRanges.length = # of Kafka partitions being consumed
        ...
        return null;
      }
    }
  );

 and if these are updated, will KafkaOffsetMonitor work?

 Monitoring seems to center around the notion of a consumer group.  But in
 the receiverless approach, code on the Spark consumer side doesn't seem
 to
 expose a consumer group parameter.  Where does it go?  Can I/should I
 just
 pass in group.id as part of the kafkaParams HashMap?
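
For what it's worth, a small sketch of the direct-stream kafkaParams (Scala;
the broker addresses are placeholders). group.id can be included, but per the
docs quoted above the direct approach itself won't commit offsets to ZooKeeper
under that group:

  val kafkaParams = Map(
    "metadata.broker.list" -> "broker1:9092,broker2:9092", // required by the direct API
    "group.id" -> "my-consumer-group" // only meaningful if you update offsets yourself
  )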

 Thanks



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/How-to-monitor-Spark-Streaming-from-Kafka-tp23103.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org






Re: Can't build Spark

2015-06-02 Thread Ted Yu
I ran dev/change-version-to-2.11.sh first.

I used the following command but didn't reproduce the error below:

mvn -DskipTests -Phadoop-2.4 -Pyarn -Phive clean package

My env: maven 3.3.1

Possibly the error was related to proxy setting.

FYI

On Tue, Jun 2, 2015 at 3:14 PM, Mulugeta Mammo mulugeta.abe...@gmail.com
wrote:

 Spark 1.3.1, Scala 2.11.6, Maven 3.3.3, I'm behind proxy, have set my
 proxy settings in maven settings.

 Thanks,

 On Tue, Jun 2, 2015 at 2:54 PM, Ted Yu yuzhih...@gmail.com wrote:

 Can you give us some more information ?
 Such as:
 which Spark release you were building
 what command you used
 Scala version you used

 Thanks

 On Tue, Jun 2, 2015 at 2:50 PM, Mulugeta Mammo mulugeta.abe...@gmail.com
  wrote:

 building Spark is throwing errors, any ideas?


 [FATAL] Non-resolvable parent POM: Could not transfer artifact 
 org.apache:apache:pom:14 from/to central ( 
 http://repo.maven.apache.org/maven2): Error transferring file: 
 repo.maven.apache.org from  
 http://repo.maven.apache.org/maven2/org/apache/apache/14/apache-14.pom and 
 'parent.relativePath' points at wrong local POM @ line 21, column 11

 at 
 org.apache.maven.model.building.DefaultModelProblemCollector.newModelBuildingException(DefaultModelProblemCollector.java:195)
 at 
 org.apache.maven.model.building.DefaultModelBuilder.readParentExternally(DefaultModelBuilder.java:841)






Behavior of the spark.streaming.kafka.maxRatePerPartition config param?

2015-06-02 Thread dgoldenberg
Hi,

Could someone explain the behavior of the
spark.streaming.kafka.maxRatePerPartition parameter? The doc says: "An
important (configuration) is spark.streaming.kafka.maxRatePerPartition which
is the maximum rate at which each Kafka partition will be read by (the)
direct API."

What is the default behavior for this parameter? From some testing it
appears that with it not being set, the RDD size tends to be quite low. With
it set, we're seeing the consumer picking up items off the topic much more
actively, e.g. -Dspark.streaming.kafka.maxRatePerPartition=1000 in
--driver-java-options.
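
Equivalently, the value can be set on the SparkConf before the StreamingContext
is created. A minimal sketch:

  import org.apache.spark.SparkConf

  val conf = new SparkConf()
    .setAppName("kafka-direct")
    // Cap each Kafka partition at 1000 messages per second for the direct stream.
    .set("spark.streaming.kafka.maxRatePerPartition", "1000")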

Does this parameter set the RDD size to a very low value? 

It seems to default to 0... but what's the effect of that?
  protected val maxMessagesPerPartition: Option[Long] = {
    val ratePerSec = context.sparkContext.getConf.getInt(
      "spark.streaming.kafka.maxRatePerPartition", 0)
    if (ratePerSec > 0) {
      val secsPerBatch = context.graph.batchDuration.milliseconds.toDouble / 1000
      Some((secsPerBatch * ratePerSec).toLong)
    } else {
      None
    }
  }

  // limits the maximum number of messages per partition
  protected def clamp(
      leaderOffsets: Map[TopicAndPartition, LeaderOffset]):
    Map[TopicAndPartition, LeaderOffset] = {
    maxMessagesPerPartition.map { mmp =>
      leaderOffsets.map { case (tp, lo) =>
        tp -> lo.copy(offset = Math.min(currentOffsets(tp) + mmp, lo.offset))
      }
    }.getOrElse(leaderOffsets)
  }

What would we limit to by default? And once Spark Streaming does pick up
messages, would it be at the maximum value? Does it ever fall below the max even
if there are max or more than max messages in the topic? Thanks.




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Behavior-of-the-spark-streaming-kafka-maxRatePerPartition-config-param-tp23117.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark Streming yarn-cluster Mode Off-heap Memory Is Constantly Growing

2015-06-02 Thread Ji ZHANG
Hi,

Thanks for the information. I'll give Spark 1.4 a try when it's released.

On Wed, Jun 3, 2015 at 11:31 AM, Tathagata Das t...@databricks.com wrote:

 Could you try it out with Spark 1.4 RC3?

 Also pinging, Cloudera folks, they may be aware of something.

 BTW, the way I have debugged memory leaks in the past is as follows.

 Run with a small driver memory, say 1 GB. Periodically (maybe a script),
 take snapshots of histogram and also do memory dumps. Say every hour. And
 then compare the difference between two histo/dumps that are few hours
 separated (more the better). Diffing histo is easy. Diff two dumps can be
 done in JVisualVM, it will show the diff in the objects that got added in
 the later dump. That makes it easy to debug what is not getting cleaned.

 TD


 On Tue, Jun 2, 2015 at 7:33 PM, Ji ZHANG zhangj...@gmail.com wrote:

 Hi,

 Thanks for your reply. Here are the top 30 entries of the jmap -histo:live
 result:

  num #instances #bytes  class name
 --
1: 40802  145083848  [B
2: 99264   12716112  methodKlass
3: 99264   12291480  constMethodKlass
4:  84729144816  constantPoolKlass
5:  84727625192  instanceKlassKlass
6:   1866097824
  [Lscala.concurrent.forkjoin.ForkJoinTask;
7:  70454804832  constantPoolCacheKlass
8:1391684453376  java.util.HashMap$Entry
9:  94273542512  methodDataKlass
   10:1413123391488
  io.netty.buffer.PoolThreadCache$MemoryRegionCache$Entry
   11:1354913251784  java.lang.Long
   12: 261922765496  [C
   13:   8131140560  [Ljava.util.HashMap$Entry;
   14:  89971061936  java.lang.Class
   15: 16022 851384  [[I
   16: 16447 789456  java.util.zip.Inflater
   17: 13855 723376  [S
   18: 17282 691280  java.lang.ref.Finalizer
   19: 25725 617400  java.lang.String
   20:   320 570368
  [Lio.netty.buffer.PoolThreadCache$MemoryRegionCache$Entry;
   21: 16066 514112
  java.util.concurrent.ConcurrentHashMap$HashEntry
   22: 12288 491520
  org.jboss.netty.util.internal.ConcurrentIdentityHashMap$Segment
   23: 13343 426976
  java.util.concurrent.locks.ReentrantLock$NonfairSync
   24: 12288 396416
  [Lorg.jboss.netty.util.internal.ConcurrentIdentityHashMap$HashEntry;
   25: 16447 394728  java.util.zip.ZStreamRef
   26:   565 370080  [I
   27:   508 272288  objArrayKlassKlass
   28: 16233 259728  java.lang.Object
   29:   771 209232
  [Ljava.util.concurrent.ConcurrentHashMap$HashEntry;
   30:  2524 192312  [Ljava.lang.Object;

 But as I mentioned above, the heap memory seems OK, the extra memory is
 consumed by some off-heap data. I can't find a way to figure out what is in
 there.

 Besides, I did some extra experiments, i.e. ran the same program in
 different environments to test whether it has the off-heap memory issue:

 spark1.0 + standalone = no
 spark1.0 + yarn = no
 spark1.3 + standalone = no
 spark1.3 + yarn = yes

 I'm using CDH5.1, so the spark1.0 is provided by cdh, and
 spark-1.3.1-bin-hadoop2.3 is downloaded from the official website.

 I could use spark1.0 + yarn, but I can't find a way to handle the logs
 (level and rolling), so it'll fill up the hard drive.

 Currently I'll stick to spark1.0 + standalone, until our ops team decides
 to upgrade cdh.



 On Tue, Jun 2, 2015 at 2:58 PM, Tathagata Das t...@databricks.com
 wrote:

 While you are running is it possible for you login into the YARN node
 and get histograms of live objects using jmap -histo:live. That may
 reveal something.


 On Thursday, May 28, 2015, Ji ZHANG zhangj...@gmail.com wrote:

 Hi,

 Unfortunately, they're still growing, both driver and executors.

 I run the same job with local mode, everything is fine.

 On Thu, May 28, 2015 at 5:26 PM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 Can you replace your counting part with this?

 logs.filter(_.s_id > 0).foreachRDD(rdd => logger.info(rdd.count()))



 Thanks
 Best Regards

 On Thu, May 28, 2015 at 1:02 PM, Ji ZHANG zhangj...@gmail.com wrote:

 Hi,

 I wrote a simple test job; it only does very basic operations. For example:

 val lines = KafkaUtils.createStream(ssc, zkQuorum, group,
   Map(topic -> 1)).map(_._2)
 val logs = lines.flatMap { line =>
   try {
     Some(parse(line).extract[Impression])
   } catch {
     case _: Exception => None
   }
 }

 logs.filter(_.s_id > 0).count.foreachRDD { rdd =>
   rdd.foreachPartition { iter =>
     iter.foreach(count => logger.info(count.toString))
   }
 }

 It receives messages from Kafka, parse the json, filter and count 

Application is always in process when I check out logs of completed application

2015-06-02 Thread amghost
I run a Spark application in a Spark standalone cluster with client deploy mode.
I want to check out the logs of my finished application, but I always get a
page telling me "Application history not found - Application xxx is still in
process."
I am pretty sure that the application has indeed completed, because I can see
it in the Completed Applications list shown by the Spark WebUI, and I have also
found the log file with the suffix .inprocess in the directory set by
spark.eventLog.dir in my spark-defaults.conf.

Oh, BTW, I am using Spark 1.3.0.

So, is there anything I missed?
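
One likely cause, offered only as a hedged guess: the event log keeps its in-progress suffix when the SparkContext is never stopped, and in that case the standalone master cannot rebuild the history UI even though the application shows up under Completed Applications. A minimal sketch of a driver that always stops the context (structure and names are illustrative):

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("MyApp"))
try {
  // ... application logic ...
} finally {
  sc.stop() // finalizes the event log so the completed application's UI can be rendered
}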



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Application-is-always-in-process-when-I-check-out-logs-of-completed-application-tp23123.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark Streming yarn-cluster Mode Off-heap Memory Is Constantly Growing

2015-06-02 Thread Tathagata Das
Could you try it out with Spark 1.4 RC3?

Also pinging, Cloudera folks, they may be aware of something.

BTW, the way I have debugged memory leaks in the past is as follows.

Run with a small driver memory, say 1 GB. Periodically (maybe a script),
take snapshots of histogram and also do memory dumps. Say every hour. And
then compare the difference between two histo/dumps that are few hours
separated (more the better). Diffing histo is easy. Diff two dumps can be
done in JVisualVM, it will show the diff in the objects that got added in
the later dump. That makes it easy to debug what is not getting cleaned.

TD


On Tue, Jun 2, 2015 at 7:33 PM, Ji ZHANG zhangj...@gmail.com wrote:

 Hi,

 Thanks for your reply. Here's the top 30 entries of jmap -histo:live result:

  num #instances #bytes  class name
 --
1: 40802  145083848  [B
2: 99264   12716112  methodKlass
3: 99264   12291480  constMethodKlass
4:  84729144816  constantPoolKlass
5:  84727625192  instanceKlassKlass
6:   1866097824
  [Lscala.concurrent.forkjoin.ForkJoinTask;
7:  70454804832  constantPoolCacheKlass
8:1391684453376  java.util.HashMap$Entry
9:  94273542512  methodDataKlass
   10:1413123391488
  io.netty.buffer.PoolThreadCache$MemoryRegionCache$Entry
   11:1354913251784  java.lang.Long
   12: 261922765496  [C
   13:   8131140560  [Ljava.util.HashMap$Entry;
   14:  89971061936  java.lang.Class
   15: 16022 851384  [[I
   16: 16447 789456  java.util.zip.Inflater
   17: 13855 723376  [S
   18: 17282 691280  java.lang.ref.Finalizer
   19: 25725 617400  java.lang.String
   20:   320 570368
  [Lio.netty.buffer.PoolThreadCache$MemoryRegionCache$Entry;
   21: 16066 514112
  java.util.concurrent.ConcurrentHashMap$HashEntry
   22: 12288 491520
  org.jboss.netty.util.internal.ConcurrentIdentityHashMap$Segment
   23: 13343 426976
  java.util.concurrent.locks.ReentrantLock$NonfairSync
   24: 12288 396416
  [Lorg.jboss.netty.util.internal.ConcurrentIdentityHashMap$HashEntry;
   25: 16447 394728  java.util.zip.ZStreamRef
   26:   565 370080  [I
   27:   508 272288  objArrayKlassKlass
   28: 16233 259728  java.lang.Object
   29:   771 209232
  [Ljava.util.concurrent.ConcurrentHashMap$HashEntry;
   30:  2524 192312  [Ljava.lang.Object;

 But as I mentioned above, the heap memory seems OK, the extra memory is
 consumed by some off-heap data. I can't find a way to figure out what is in
 there.

 Besides, I did some extra experiments, i.e. ran the same program in
 different environments to test whether it has the off-heap memory issue:

 spark1.0 + standalone = no
 spark1.0 + yarn = no
 spark1.3 + standalone = no
 spark1.3 + yarn = yes

 I'm using CDH5.1, so the spark1.0 is provided by cdh, and
 spark-1.3.1-bin-hadoop2.3 is downloaded from the official website.

 I could use spark1.0 + yarn, but I can't find a way to handle the logs
 (level and rolling), so it'll fill up the hard drive.

 Currently I'll stick to spark1.0 + standalone, until our ops team decides
 to upgrade cdh.



 On Tue, Jun 2, 2015 at 2:58 PM, Tathagata Das t...@databricks.com wrote:

 While you are running is it possible for you login into the YARN node and
 get histograms of live objects using jmap -histo:live. That may reveal
 something.


 On Thursday, May 28, 2015, Ji ZHANG zhangj...@gmail.com wrote:

 Hi,

 Unfortunately, they're still growing, both driver and executors.

 I run the same job with local mode, everything is fine.

 On Thu, May 28, 2015 at 5:26 PM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 Can you replace your counting part with this?

 logs.filter(_.s_id > 0).foreachRDD(rdd => logger.info(rdd.count()))



 Thanks
 Best Regards

 On Thu, May 28, 2015 at 1:02 PM, Ji ZHANG zhangj...@gmail.com wrote:

 Hi,

 I wrote a simple test job; it only does very basic operations. For example:

 val lines = KafkaUtils.createStream(ssc, zkQuorum, group,
   Map(topic -> 1)).map(_._2)
 val logs = lines.flatMap { line =>
   try {
     Some(parse(line).extract[Impression])
   } catch {
     case _: Exception => None
   }
 }

 logs.filter(_.s_id > 0).count.foreachRDD { rdd =>
   rdd.foreachPartition { iter =>
     iter.foreach(count => logger.info(count.toString))
   }
 }

 It receives messages from Kafka, parses the JSON, filters and counts the
 records, and then prints the result to the logs.

 Thanks.


 On Thu, May 28, 2015 at 3:07 PM, Akhil Das ak...@sigmoidanalytics.com
  wrote:

 Hi Zhang,

 Could you paste your 

Re: Spark Streming yarn-cluster Mode Off-heap Memory Is Constantly Growing

2015-06-02 Thread Ji ZHANG
Hi,

Thanks for your reply. Here's the top 30 entries of jmap -histo:live result:

 num #instances #bytes  class name
--
   1: 40802  145083848  [B
   2: 99264   12716112  methodKlass
   3: 99264   12291480  constMethodKlass
   4:  84729144816  constantPoolKlass
   5:  84727625192  instanceKlassKlass
   6:   1866097824
 [Lscala.concurrent.forkjoin.ForkJoinTask;
   7:  70454804832  constantPoolCacheKlass
   8:1391684453376  java.util.HashMap$Entry
   9:  94273542512  methodDataKlass
  10:1413123391488
 io.netty.buffer.PoolThreadCache$MemoryRegionCache$Entry
  11:1354913251784  java.lang.Long
  12: 261922765496  [C
  13:   8131140560  [Ljava.util.HashMap$Entry;
  14:  89971061936  java.lang.Class
  15: 16022 851384  [[I
  16: 16447 789456  java.util.zip.Inflater
  17: 13855 723376  [S
  18: 17282 691280  java.lang.ref.Finalizer
  19: 25725 617400  java.lang.String
  20:   320 570368
 [Lio.netty.buffer.PoolThreadCache$MemoryRegionCache$Entry;
  21: 16066 514112
 java.util.concurrent.ConcurrentHashMap$HashEntry
  22: 12288 491520
 org.jboss.netty.util.internal.ConcurrentIdentityHashMap$Segment
  23: 13343 426976
 java.util.concurrent.locks.ReentrantLock$NonfairSync
  24: 12288 396416
 [Lorg.jboss.netty.util.internal.ConcurrentIdentityHashMap$HashEntry;
  25: 16447 394728  java.util.zip.ZStreamRef
  26:   565 370080  [I
  27:   508 272288  objArrayKlassKlass
  28: 16233 259728  java.lang.Object
  29:   771 209232
 [Ljava.util.concurrent.ConcurrentHashMap$HashEntry;
  30:  2524 192312  [Ljava.lang.Object;

But as I mentioned above, the heap memory seems OK, the extra memory is
consumed by some off-heap data. I can't find a way to figure out what is in
there.

Besides, I did some extra experiments, i.e. ran the same program in
different environments to test whether it has the off-heap memory issue:

spark1.0 + standalone = no
spark1.0 + yarn = no
spark1.3 + standalone = no
spark1.3 + yarn = yes

I'm using CDH5.1, so the spark1.0 is provided by cdh, and
spark-1.3.1-bin-hadoop2.3 is downloaded from the official website.

I could use spark1.0 + yarn, but I can't find a way to handle the logs
(level and rolling), so it'll fill up the hard drive.

Currently I'll stick to spark1.0 + standalone, until our ops team decides
to upgrade cdh.



On Tue, Jun 2, 2015 at 2:58 PM, Tathagata Das t...@databricks.com wrote:

 While you are running is it possible for you login into the YARN node and
 get histograms of live objects using jmap -histo:live. That may reveal
 something.


 On Thursday, May 28, 2015, Ji ZHANG zhangj...@gmail.com wrote:

 Hi,

 Unfortunately, they're still growing, both driver and executors.

 I run the same job with local mode, everything is fine.

 On Thu, May 28, 2015 at 5:26 PM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 Can you replace your counting part with this?

 logs.filter(_.s_id > 0).foreachRDD(rdd => logger.info(rdd.count()))



 Thanks
 Best Regards

 On Thu, May 28, 2015 at 1:02 PM, Ji ZHANG zhangj...@gmail.com wrote:

 Hi,

 I wrote a simple test job; it only does very basic operations. For example:

 val lines = KafkaUtils.createStream(ssc, zkQuorum, group,
   Map(topic -> 1)).map(_._2)
 val logs = lines.flatMap { line =>
   try {
     Some(parse(line).extract[Impression])
   } catch {
     case _: Exception => None
   }
 }

 logs.filter(_.s_id > 0).count.foreachRDD { rdd =>
   rdd.foreachPartition { iter =>
     iter.foreach(count => logger.info(count.toString))
   }
 }

 It receives messages from Kafka, parses the JSON, filters and counts the
 records, and then prints the result to the logs.

 Thanks.


 On Thu, May 28, 2015 at 3:07 PM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 Hi Zhang,

 Could you paste your code in a gist? Not sure what you are doing
 inside the code to fill up memory.

 Thanks
 Best Regards

 On Thu, May 28, 2015 at 10:08 AM, Ji ZHANG zhangj...@gmail.com
 wrote:

 Hi,

 Yes, I'm using createStream, but the storageLevel param is by default
 MEMORY_AND_DISK_SER_2. Besides, the driver's memory is also growing. I
 don't think Kafka messages will be cached in driver.


 On Thu, May 28, 2015 at 12:24 AM, Akhil Das 
 ak...@sigmoidanalytics.com wrote:

 Are you using the createStream or createDirectStream api? If it's the
 former, you can try setting the StorageLevel to MEMORY_AND_DISK (it might
 slow things down though). Another way would be to try the latter one.
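
A minimal sketch of the two options mentioned above (ZooKeeper quorum, broker list, and topic name are placeholder assumptions; ssc is the StreamingContext used in the test job):

import kafka.serializer.StringDecoder
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils

// Receiver-based stream with an explicit storage level
val lines = KafkaUtils.createStream(ssc, "zk-host:2181", "my-group",
  Map("my-topic" -> 1), StorageLevel.MEMORY_AND_DISK).map(_._2)

// Receiver-less direct stream, which keeps no receiver-side blocks at all
val direct = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, Map("metadata.broker.list" -> "broker-host:9092"), Set("my-topic")).map(_._2)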

 Thanks
 Best Regards

 On Wed, May 27, 2015 at 1:00 PM, Ji ZHANG 

Re: in GraphX,program with Pregel runs slower and slower after several iterations

2015-06-02 Thread Cheuk Lam
I've been encountering something similar too. I suspected it was related
to the lineage growth of the graph/RDDs. So I checkpoint the graph every 60
Pregel rounds, after which my program doesn't slow down any more
(except that every checkpoint takes some extra time).
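
A minimal sketch of that periodic-checkpoint workaround (the checkpoint directory, the surrounding iteration loop, and the 60-round interval are illustrative assumptions; sc is an existing SparkContext):

import org.apache.spark.graphx.Graph

sc.setCheckpointDir("hdfs:///tmp/graph-checkpoints")

def maybeCheckpoint[VD, ED](graph: Graph[VD, ED], round: Int): Unit =
  if (round % 60 == 0) {
    graph.checkpoint()      // truncates the lineage accumulated over the rounds
    graph.vertices.count()  // force materialization so the checkpoint is actually written
    graph.edges.count()
  }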



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/in-GraphX-program-with-Pregel-runs-slower-and-slower-after-several-iterations-tp23121p23122.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark 1.4 YARN Application Master fails with 500 connect refused

2015-06-02 Thread Night Wolf
Thanks Marcelo - looks like it was my fault. It seems that when we deployed the new
version of spark it was picking up the wrong yarn site and setting the
wrong proxy host. All good now!



On Wed, Jun 3, 2015 at 11:01 AM, Marcelo Vanzin van...@cloudera.com wrote:

 That code hasn't changed at all between 1.3 and 1.4; it also has been
 working fine for me.

 Are you sure you're using exactly the same Hadoop libraries (since you're
 building with -Phadoop-provided) and Hadoop configuration in both cases?

 On Tue, Jun 2, 2015 at 5:29 PM, Night Wolf nightwolf...@gmail.com wrote:

 Hi all,

 Trying out Spark 1.4 on MapR Hadoop 2.5.1 running in yarn-client mode.
 Seems the application master doesn't work anymore, I get a 500 connect
 refused, even when I hit the IP/port of the spark UI directly. The logs
 don't show much.

I built spark with Java 6, hive & scala 2.10 and 2.11. I've tried with
and without -Phadoop-provided.

 *Build command;*

 ./make-distribution.sh --name mapr4.0.2_yarn_j6_2.10 --tgz -Pyarn -Pmapr4
 -Phadoop-2.4 -Pmapr4 -Phive -Phadoop-provided
 -Dhadoop.version=2.5.1-mapr-1501 -Dyarn.version=2.5.1-mapr-1501 -DskipTests
 -e -X

 *Logs from spark shell;*

 15/06/03 00:10:56 INFO server.AbstractConnector: Started
 SelectChannelConnector@0.0.0.0:4040
 15/06/03 00:10:56 INFO util.Utils: Successfully started service 'SparkUI'
 on port 4040.
 15/06/03 00:10:56 INFO ui.SparkUI: Started SparkUI at
 http://172.31.10.14:4040
 15/06/03 00:10:57 INFO yarn.Client: Requesting a new application from
 cluster with 71 NodeManagers
 15/06/03 00:10:57 INFO yarn.Client: Verifying our application has not
 requested more than the maximum memory capability of the cluster (112640 MB
 per container)
 15/06/03 00:10:57 INFO yarn.Client: Will allocate AM container, with 896
 MB memory including 384 MB overhead
 15/06/03 00:10:57 INFO yarn.Client: Setting up container launch context
 for our AM
 15/06/03 00:10:57 INFO yarn.Client: Preparing resources for our AM
 container
 15/06/03 00:10:57 INFO yarn.Client: Uploading resource
 file:///apps/spark/spark-1.4.0-SNAPSHOT-bin-mapr4.0.2_yarn_j6_2.11/lib/spark-assembly-1.4.0-SNAPSHOT-hadoop2.5.1-mapr-1501.jar
 -
 maprfs:/user/nw/.sparkStaging/application_1432690361766_0593/spark-assembly-1.4.0-SNAPSHOT-hadoop2.5.1-mapr-1501.jar
 15/06/03 00:10:58 INFO yarn.Client: Uploading resource
 file:/tmp/spark-5e42f904-ff83-4c93-bd35-4c3e20226a8a/__hadoop_conf__983379693214711.zip
 -
 maprfs:/user/nw/.sparkStaging/application_1432690361766_0593/__hadoop_conf__983379693214711.zip
 15/06/03 00:10:58 INFO yarn.Client: Setting up the launch environment for
 our AM container
 15/06/03 00:10:58 INFO spark.SecurityManager: Changing view acls to: nw
 15/06/03 00:10:58 INFO spark.SecurityManager: Changing modify acls to: nw
 15/06/03 00:10:58 INFO spark.SecurityManager: SecurityManager:
 authentication disabled; ui acls disabled; users with view permissions:
 Set(nw); users with modify permissions: Set(nw)
 15/06/03 00:10:58 INFO yarn.Client: Submitting application 593 to
 ResourceManager
 15/06/03 00:10:58 INFO security.ExternalTokenManagerFactory: Initialized
 external token manager class -
 com.mapr.hadoop.yarn.security.MapRTicketManager
 15/06/03 00:10:58 INFO impl.YarnClientImpl: Submitted application
 application_1432690361766_0593
 15/06/03 00:10:59 INFO yarn.Client: Application report for
 application_1432690361766_0593 (state: ACCEPTED)
 15/06/03 00:10:59 INFO yarn.Client:
  client token: N/A
  diagnostics: N/A
  ApplicationMaster host: N/A
  ApplicationMaster RPC port: -1
  queue: default
  start time: 1433290258143
  final status: UNDEFINED
  tracking URL:
 http://qtausc-pphd0101.hadoop.local:8088/proxy/application_1432690361766_0593/
  user: nw
 15/06/03 00:11:00 INFO yarn.Client: Application report for
 application_1432690361766_0593 (state: ACCEPTED)
 15/06/03 00:11:01 INFO yarn.Client: Application report for
 application_1432690361766_0593 (state: ACCEPTED)
 15/06/03 00:11:02 INFO yarn.Client: Application report for
 application_1432690361766_0593 (state: ACCEPTED)
 15/06/03 00:11:03 INFO yarn.Client: Application report for
 application_1432690361766_0593 (state: ACCEPTED)
 15/06/03 00:11:03 INFO
 cluster.YarnSchedulerBackend$YarnSchedulerEndpoint: ApplicationMaster
 registered as AkkaRpcEndpointRef(Actor[akka.tcp://
 sparkYarnAM@192.168.81.167:36542/user/YarnAM#1631897818])
 15/06/03 00:11:03 INFO cluster.YarnClientSchedulerBackend: Add WebUI
 Filter. org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter,
 Map(PROXY_HOSTS - qtausc-pphd0167.hadoop.local, PROXY_URI_BASES -
 http://qtausc-pphd0167.hadoop.local:8088/proxy/application_1432690361766_0593),
 /proxy/application_1432690361766_0593
 15/06/03 00:11:03 INFO ui.JettyUtils: Adding filter:
 org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
 15/06/03 00:11:04 INFO yarn.Client: Application report for
 application_1432690361766_0593 (state: RUNNING)
 15/06/03 00:11:04 INFO yarn.Client:
  client token: N/A
  diagnostics: N/A
 

Filter operation to return two RDDs at once.

2015-06-02 Thread ๏̯͡๏
I want to do this

val qtSessionsWithQt = rawQtSession.filter(_._2.qualifiedTreatmentId !=
NULL_VALUE)

val guidUidMapSessions = rawQtSession.filter(_._2.qualifiedTreatmentId
== NULL_VALUE)

This will run two different stages; can this be done in one stage?

val (qtSessionsWithQt, guidUidMapSessions) = rawQtSession.magicFilter(
  _._2.qualifiedTreatmentId != NULL_VALUE)
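
No single RDD transformation returns two RDDs, but a hedged sketch of the usual workaround is to persist the parent once so that both filters reuse one upstream computation (names and types follow the snippet above):

rawQtSession.persist(org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK)

val qtSessionsWithQt   = rawQtSession.filter(_._2.qualifiedTreatmentId != NULL_VALUE)
val guidUidMapSessions = rawQtSession.filter(_._2.qualifiedTreatmentId == NULL_VALUE)
// Two jobs still run, but the expensive upstream stages are computed only once.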




-- 
Deepak


How to create fewer output files for Spark job ?

2015-06-02 Thread ๏̯͡๏
I am running a series of Spark functions with 9000 executors, and it is
resulting in 9000+ files, which is exceeding the namespace file count quota.

How can Spark be configured to use CombinedOutputFormat?
{code}
protected def writeOutputRecords(detailRecords: RDD[(AvroKey[DetailOutputRecord], NullWritable)], outputDir: String) {
  val writeJob = new Job()
  val schema = SchemaUtil.outputSchema(_detail)
  AvroJob.setOutputKeySchema(writeJob, schema)
  detailRecords.saveAsNewAPIHadoopFile(outputDir,
    classOf[AvroKey[GenericRecord]],
    classOf[org.apache.hadoop.io.NullWritable],
    classOf[AvroKeyOutputFormat[GenericRecord]],
    writeJob.getConfiguration)
}
{code}
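
A hedged sketch of one common way to reduce the file count (the 500-partition figure is an arbitrary example): coalesce the RDD before saving, so the number of part files is bounded by the number of output partitions rather than by the 9000 tasks.

{code}
detailRecords
  .coalesce(500) // at most 500 output partitions => ~500 part files
  .saveAsNewAPIHadoopFile(outputDir,
    classOf[AvroKey[GenericRecord]],
    classOf[org.apache.hadoop.io.NullWritable],
    classOf[AvroKeyOutputFormat[GenericRecord]],
    writeJob.getConfiguration)
{code}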

-- 
Deepak


build jar with all dependencies

2015-06-02 Thread Pa Rö
hello community,

i have built a jar file from my spark app with maven (mvn clean compile
assembly:single) and the following pom file:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>mgm.tp.bigdata</groupId>
  <artifactId>ma-spark</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>

  <name>ma-spark</name>
  <url>http://maven.apache.org</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>

  <repositories>
    <repository>
      <id>cloudera</id>
      <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
    </repository>
  </repositories>

  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.10</artifactId>
      <version>1.1.0-cdh5.2.5</version>
    </dependency>
    <dependency>
      <groupId>mgm.tp.bigdata</groupId>
      <artifactId>ma-commons</artifactId>
      <version>0.0.1-SNAPSHOT</version>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <plugin>
        <artifactId>maven-assembly-plugin</artifactId>
        <configuration>
          <archive>
            <manifest>
              <mainClass>mgm.tp.bigdata.ma_spark.SparkMain</mainClass>
            </manifest>
          </archive>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
        </configuration>
      </plugin>
    </plugins>
  </build>
</project>

if i run my app with  java -jar
ma-spark-0.0.1-SNAPSHOT-jar-with-dependencies.jar on terminal, i get the
following error message:

proewer@proewer-VirtualBox:~/Schreibtisch$ java -jar
ma-spark-0.0.1-SNAPSHOT-jar-with-dependencies.jar
2015-Jun-02 12:53:36,348 [main] org.apache.spark.util.Utils
 WARN  - Your hostname, proewer-VirtualBox resolves to a loopback address:
127.0.1.1; using 10.0.2.15 instead (on interface eth0)
2015-Jun-02 12:53:36,350 [main] org.apache.spark.util.Utils
 WARN  - Set SPARK_LOCAL_IP if you need to bind to another address
2015-Jun-02 12:53:36,401 [main] org.apache.spark.SecurityManager
 INFO  - Changing view acls to: proewer
2015-Jun-02 12:53:36,402 [main] org.apache.spark.SecurityManager
 INFO  - Changing modify acls to: proewer
2015-Jun-02 12:53:36,403 [main] org.apache.spark.SecurityManager
 INFO  - SecurityManager: authentication disabled; ui acls disabled; users
with view permissions: Set(proewer); users with modify permissions:
Set(proewer)
Exception in thread "main" com.typesafe.config.ConfigException$Missing: No
configuration setting found for key 'akka.version'
at com.typesafe.config.impl.SimpleConfig.findKey(SimpleConfig.java:115)
at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:136)
at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:142)
at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:150)
at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:155)
at
com.typesafe.config.impl.SimpleConfig.getString(SimpleConfig.java:197)
at akka.actor.ActorSystem$Settings.<init>(ActorSystem.scala:136)
at akka.actor.ActorSystemImpl.<init>(ActorSystem.scala:470)
at akka.actor.ActorSystem$.apply(ActorSystem.scala:111)
at akka.actor.ActorSystem$.apply(ActorSystem.scala:104)
at
org.apache.spark.util.AkkaUtils$.org$apache$spark$util$AkkaUtils$$doCreateActorSystem(AkkaUtils.scala:121)
at org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:54)
at org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:53)
at
org.apache.spark.util.Utils$$anonfun$startServiceOnPort$1.apply$mcVI$sp(Utils.scala:1454)
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
at org.apache.spark.util.Utils$.startServiceOnPort(Utils.scala:1450)
at
org.apache.spark.util.AkkaUtils$.createActorSystem(AkkaUtils.scala:56)
at org.apache.spark.SparkEnv$.create(SparkEnv.scala:156)
at org.apache.spark.SparkContext.<init>(SparkContext.scala:203)
at
org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:53)
at mgm.tp.bigdata.ma_spark.SparkMain.main(SparkMain.java:38)

what am i doing wrong?

best regards,
paul


Re: Spark 1.4.0-rc3: Actor not found

2015-06-02 Thread Shixiong Zhu
How about other jobs? Is it an executor log, or a driver log? Could you
post other logs near this error, please? Thank you.

Best Regards,
Shixiong Zhu

2015-06-02 17:11 GMT+08:00 Anders Arpteg arp...@spotify.com:

 Just compiled Spark 1.4.0-rc3 for Yarn 2.2 and tried running a job that
 worked fine for Spark 1.3. The job starts on the cluster (yarn-cluster
 mode), initial stage starts, but the job fails before any task succeeds
 with the following error. Any hints?

 [ERROR] [06/02/2015 09:05:36.962] [Executor task launch worker-0]
 [akka.tcp://sparkDriver@10.254.6.15:33986/user/CoarseGrainedScheduler]
 swallowing exception during message send
 (akka.remote.RemoteTransportExceptionNoStackTrace)
 Exception in thread "main" akka.actor.ActorNotFound: Actor not found for:
 ActorSelection[Anchor(akka.tcp://sparkDriver@10.254.6.15:33986/),
 Path(/user/OutputCommitCoordinator)]
 at
 akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:65)
 at
 akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:63)
 at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
 at
 akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:67)
 at
 akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:82)
 at
 akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
 at
 akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
 at
 scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
 at
 akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:58)
 at
 akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.unbatchedExecute(Future.scala:74)
 at
 akka.dispatch.BatchingExecutor$class.execute(BatchingExecutor.scala:110)
 at
 akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.execute(Future.scala:73)
 at
 scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
 at
 scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
 at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:267)
 at akka.actor.EmptyLocalActorRef.specialHandle(ActorRef.scala:508)
 at akka.actor.DeadLetterActorRef.specialHandle(ActorRef.scala:541)
 at akka.actor.DeadLetterActorRef.$bang(ActorRef.scala:531)
 at
 akka.remote.RemoteActorRefProvider$RemoteDeadLetterActorRef.$bang(RemoteActorRefProvider.scala:87)
 at
 akka.remote.EndpointManager$$anonfun$1.applyOrElse(Remoting.scala:575)
 at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
 at akka.remote.EndpointManager.aroundReceive(Remoting.scala:395)
 at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
 at akka.actor.ActorCell.invoke(ActorCell.scala:487)
 at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
 at akka.dispatch.Mailbox.run(Mailbox.scala:220)
 at
 akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
 at
 scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
 at
 scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)




Shared / NFS filesystems

2015-06-02 Thread Pradyumna Achar
Hello!

I have Spark running in standalone mode, and there are a bunch of worker
nodes connected to the master.
The workers have a shared file system, but the master node doesn't.
Is this something that's not going to work? i.e., should the master node
also be on the same shared filesystem mounted on the same path as all of
the workers?

Thanks!
Pradyumna


Re: Shared / NFS filesystems

2015-06-02 Thread Akhil Das
You can run/submit your code from one of the workers which has access to the
file system, and it should be fine I think. Give it a try.

Thanks
Best Regards

On Tue, Jun 2, 2015 at 3:22 PM, Pradyumna Achar pradyumna.ac...@gmail.com
wrote:

 Hello!

 I have Spark running in standalone mode, and there are a bunch of worker
 nodes connected to the master.
 The workers have a shared file system, but the master node doesn't.
 Is this something that's not going to work? i.e., should the master node
 also be on the same shared filesystem mounted on the same path as all of
 the workers?

 Thanks!
 Pradyumna



Re: Spark 1.4.0-rc3: Actor not found

2015-06-02 Thread Anders Arpteg
The log is from the log aggregation tool (hortonworks, yarn logs ...), so
both executors and driver. I'll send a private mail to you with the full
logs. Also, tried another job as you suggested, and it actually worked
fine. The first job was reading from a parquet source, and the second from
an avro source. Could there be some issues with the parquet reader?

Thanks,
Anders

On Tue, Jun 2, 2015 at 11:53 AM, Shixiong Zhu zsxw...@gmail.com wrote:

 How about other jobs? Is it an executor log, or a driver log? Could you
 post other logs near this error, please? Thank you.

 Best Regards,
 Shixiong Zhu

 2015-06-02 17:11 GMT+08:00 Anders Arpteg arp...@spotify.com:

 Just compiled Spark 1.4.0-rc3 for Yarn 2.2 and tried running a job that
 worked fine for Spark 1.3. The job starts on the cluster (yarn-cluster
 mode), initial stage starts, but the job fails before any task succeeds
 with the following error. Any hints?

 [ERROR] [06/02/2015 09:05:36.962] [Executor task launch worker-0]
 [akka.tcp://sparkDriver@10.254.6.15:33986/user/CoarseGrainedScheduler]
 swallowing exception during message send
 (akka.remote.RemoteTransportExceptionNoStackTrace)
 Exception in thread "main" akka.actor.ActorNotFound: Actor not found for:
 ActorSelection[Anchor(akka.tcp://sparkDriver@10.254.6.15:33986/),
 Path(/user/OutputCommitCoordinator)]
 at
 akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:65)
 at
 akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:63)
 at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
 at
 akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:67)
 at
 akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:82)
 at
 akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
 at
 akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
 at
 scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
 at
 akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:58)
 at
 akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.unbatchedExecute(Future.scala:74)
 at
 akka.dispatch.BatchingExecutor$class.execute(BatchingExecutor.scala:110)
 at
 akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.execute(Future.scala:73)
 at
 scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
 at
 scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
 at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:267)
 at akka.actor.EmptyLocalActorRef.specialHandle(ActorRef.scala:508)
 at akka.actor.DeadLetterActorRef.specialHandle(ActorRef.scala:541)
 at akka.actor.DeadLetterActorRef.$bang(ActorRef.scala:531)
 at
 akka.remote.RemoteActorRefProvider$RemoteDeadLetterActorRef.$bang(RemoteActorRefProvider.scala:87)
 at
 akka.remote.EndpointManager$$anonfun$1.applyOrElse(Remoting.scala:575)
 at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
 at akka.remote.EndpointManager.aroundReceive(Remoting.scala:395)
 at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
 at akka.actor.ActorCell.invoke(ActorCell.scala:487)
 at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
 at akka.dispatch.Mailbox.run(Mailbox.scala:220)
 at
 akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
 at
 scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
 at
 scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)





Re: build jar with all dependencies

2015-06-02 Thread Yana Kadiyska
Can you run using spark-submit? What is happening is that you are running a
simple java program -- you've wrapped spark-core in your fat jar, but at
runtime you likely need the whole Spark system in order to run your
application. I would mark spark-core as provided (so you don't wrap it
in your fat jar) and run via spark-submit. If you insist on running via
java for whatever reason, see the runtime path that spark-submit sets up
and make sure you include all of these jars when you run your app.

On Tue, Jun 2, 2015 at 9:57 AM, Pa Rö paul.roewer1...@googlemail.com
wrote:

 okay, but how can i compile my app so it runs without
 -Dconfig.file=alt_reference1.conf?

 2015-06-02 15:43 GMT+02:00 Yana Kadiyska yana.kadiy...@gmail.com:

 This looks like your app is not finding your Typesafe config. The config
 should usually be placed in a particular folder under your app to be seen
 correctly. If it's in a non-standard location you can
 pass -Dconfig.file=alt_reference1.conf to java to tell it where to look.
 If this is a config that belongs to Spark and not your app, I'd recommend
 running your jar via spark-submit (that should run) and dump out the
 classpath/variables that spark-submit sets up...

 On Tue, Jun 2, 2015 at 6:58 AM, Pa Rö paul.roewer1...@googlemail.com
 wrote:

 hello community,

 i have build a jar file from my spark app with maven (mvn clean compile
 assembly:single) and the following pom file:

 [pom.xml and the java -jar error output quoted verbatim; see the original message earlier in this thread]

Re: spark sql - reading data from sql tables having space in column names

2015-06-02 Thread ayan guha
I would think the easiest way would be to create a view in the DB with column
names that have no spaces.

In fact, you can pass a SQL query in place of a real table.

From the documentation: "The JDBC table that should be read. Note that anything
that is valid in a `FROM` clause of a SQL query can be used. For example,
instead of a full table you could also use a subquery in parentheses."

Kindly let the community know if this works.
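
A hedged sketch of that subquery approach with the Spark 1.3 JDBC source (the connection string, table, and column names are made-up examples); the bracket-quoted columns are aliased to names without spaces so nothing downstream needs to quote them:

val options = Map(
  "url" -> "jdbc:sqlserver://dbhost:1433;databaseName=legacy;user=u;password=p",
  "dbtable" -> "(SELECT [Order Id] AS order_id, [Ship City] AS ship_city FROM dbo.Orders) AS t")
val df = sqlContext.load("jdbc", options)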

On Tue, Jun 2, 2015 at 6:43 PM, Sachin Goyal sachin.go...@jabong.com
wrote:

 Hi,

 We are using spark sql (1.3.1) to load data from Microsoft sql server
 using jdbc (as described in
 https://spark.apache.org/docs/latest/sql-programming-guide.html#jdbc-to-other-databases
 ).

 It is working fine except when there is a space in column names (we can't
 modify the schemas to remove space as it is a legacy database).

 Sqoop is able to handle such scenarios by enclosing column names in '[ ]'
 - the recommended method from microsoft sql server. (
 https://github.com/apache/sqoop/blob/trunk/src/java/org/apache/sqoop/manager/SQLServerManager.java
 - line no 319)

 Is there a way to handle this in spark sql?

 Thanks,
 sachin




-- 
Best Regards,
Ayan Guha


Re: build jar with all dependencies

2015-06-02 Thread Pa Rö
okay, but how can i compile my app so it runs without
-Dconfig.file=alt_reference1.conf?

2015-06-02 15:43 GMT+02:00 Yana Kadiyska yana.kadiy...@gmail.com:

 This looks like your app is not finding your Typesafe config. The config
 should usually be placed in a particular folder under your app to be seen
 correctly. If it's in a non-standard location you can
 pass -Dconfig.file=alt_reference1.conf to java to tell it where to look.
 If this is a config that belongs to Spark and not your app, I'd recommend
 running your jar via spark-submit (that should run) and dump out the
 classpath/variables that spark-submit sets up...

 On Tue, Jun 2, 2015 at 6:58 AM, Pa Rö paul.roewer1...@googlemail.com
 wrote:

 hello community,

 i have build a jar file from my spark app with maven (mvn clean compile
 assembly:single) and the following pom file:

 [pom.xml and the java -jar error output quoted verbatim; see the original message earlier in this thread]

Re: build jar with all dependencies

2015-06-02 Thread Yana Kadiyska
This looks like your app is not finding your Typesafe config. The config
should usually be placed in a particular folder under your app to be seen
correctly. If it's in a non-standard location you can
pass -Dconfig.file=alt_reference1.conf to java to tell it where to look.
If this is a config that belongs to Spark and not your app, I'd recommend
running your jar via spark-submit (that should run) and dump out the
classpath/variables that spark-submit sets up...

On Tue, Jun 2, 2015 at 6:58 AM, Pa Rö paul.roewer1...@googlemail.com
wrote:

 hello community,

 i have build a jar file from my spark app with maven (mvn clean compile
 assembly:single) and the following pom file:

 [pom.xml and the java -jar error output quoted verbatim; see the original message earlier in this thread]

Re: How to read sequence File.

2015-06-02 Thread ๏̯͡๏
Spark Shell:

val x =
sc.sequenceFile("/sys/edw/dw_lstg_item/snapshot/2015/06/01/00/part-r-00761",
classOf[org.apache.hadoop.io.Text], classOf[org.apache.hadoop.io.Text])

OR

val x =
sc.sequenceFile("/sys/edw/dw_lstg_item/snapshot/2015/06/01/00/part-r-00761",
classOf[org.apache.hadoop.io.Text],
classOf[org.apache.hadoop.io.LongWritable])

OR

val x =
sc.sequenceFile("/sys/edw/dw_lstg_item/snapshot/2015/06/01/00/part-r-00761",
classOf[org.apache.hadoop.io.LongWritable],
classOf[org.apache.hadoop.io.Text])

x.take(10).foreach(println)

is throwing

===
Exception:

15/06/02 05:49:51 ERROR executor.Executor: Exception in task 0.0 in stage
2.0 (TID 2)

java.io.NotSerializableException: org.apache.hadoop.io.Text

Serialization stack:

- object not serializable (class: org.apache.hadoop.io.Text, value:
290090268£2013112699)

- field (class: scala.Tuple2, name: _1, type: class java.lang.Object)

- object (class scala.Tuple2,
(290090268£2013112699,2900902682000-04-012000-03-221969-12-3110111-9925370390166893734173-9991.001.0019.00.00.0020.0020.0020.00113NY000-99902000-04-01
05:02:21-992000-07-01DW_BATCH2005-01-13 12:09:50DW_BATCH))

- element of array (index: 0)

- array (class [Lscala.Tuple2;, size 10)

at
org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:38)

at
org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)

at
org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:80)

at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)

at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)

at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)

at java.lang.Thread.run(Thread.java:745)

15/06/02 05:49:51 ERROR scheduler.TaskSetManager: Task 0.0 in stage 2.0
(TID 2) had a not serializable result: org.apache.hadoop.io.Text

Serialization stack:

- object not serializable (class: org.apache.hadoop.io.Text, value:
290090268£2013112699)

- field (class: scala.Tuple2, name: _1, type: class java.lang.Object)

- object (class scala.Tuple2,
(290090268£2013112699,2900902682000-04-012000-03-221969-12-3110111-9925370390166893734173-9991.001.0019.00.00.0020.0020.0020.00113NY000-99902000-04-01
05:02:21-992000-07-01DW_BATCH2005-01-13 12:09:50DW_BATCH))

- element of array (index: 0)

- array (class [Lscala.Tuple2;, size 10); not retrying

15/06/02 05:49:51 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 2.0,
whose tasks have all completed, from pool

15/06/02 05:49:51 INFO scheduler.TaskSchedulerImpl: Cancelling stage 2

15/06/02 05:49:51 INFO scheduler.DAGScheduler: Stage 2 (take at
console:24) failed in 0.032 s

15/06/02 05:49:51 INFO scheduler.DAGScheduler: Job 2 failed: take at
console:24, took 0.041207 s

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0
in stage 2.0 (TID 2) had a not serializable result:
org.apache.hadoop.io.Text

Serialization stack:

- object not serializable (class: org.apache.hadoop.io.Text, value:
290090268£2013112699)

- field (class: scala.Tuple2, name: _1, type: class java.lang.Object)

- object (class scala.Tuple2,
(290090268£2013112699,2900902682000-04-012000-03-221969-12-3110111-9925370390166893734173-9991.001.0019.00.00.0020.0020.0020.00113NY000-99902000-04-01
05:02:21-992000-07-01DW_BATCH2005-01-13 12:09:50DW_BATCH))

- element of array (index: 0)

- array (class [Lscala.Tuple2;, size 10)

at org.apache.spark.scheduler.DAGScheduler.org
$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1204)

at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1193)

at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)

at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)

at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)

at
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1192)

at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)

at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)

at scala.Option.foreach(Option.scala:236)

at
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)

at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393)

at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)

at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)


===

./bin/spark-shell -v  --driver-class-path

Re: Compute Median in Spark Dataframe

2015-06-02 Thread Yana Kadiyska
Like this...sqlContext should be a HiveContext instance

case class KeyValue(key: Int, value: String)
val df = sc.parallelize(1 to 50).map(i => KeyValue(i, i.toString)).toDF
df.registerTempTable("table")
sqlContext.sql("select percentile(key, 0.5) from table").show()


On Tue, Jun 2, 2015 at 8:07 AM, Olivier Girardot 
o.girar...@lateral-thoughts.com wrote:

 Hi everyone,
Is there any way to compute a median on a column using Spark's DataFrame API?
I know you can use stats on an RDD, but I'd rather stay within a DataFrame.
 Hive seems to imply that using ntile one can compute percentiles,
 quartiles and therefore a median.
 Does anyone have experience with this ?

 Regards,

 Olivier.



Re: How to read sequence File.

2015-06-02 Thread Akhil Das
Basically, you need to convert it to a serializable format before doing the
collect/take.

You can fire up a spark shell and paste this:

import org.apache.hadoop.io.{LongWritable, Text}

val sFile = sc.sequenceFile[LongWritable, Text]("/home/akhld/sequence/sigmoid")
  .map(_._2.toString)
sFile.take(5).foreach(println)


Use the attached sequence file generator and the generated sequence file that I
used for testing.

Also note: if you don't do the .map to convert to String, then it will end
up with the serialization exception that you are hitting.
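
If both the key and the value are needed, a similar hedged sketch (the path is a placeholder) copies each Writable into a plain Scala value inside the map, since Hadoop reuses the Writable instances and they are not serializable:

import org.apache.hadoop.io.{LongWritable, Text}

val pairs = sc.sequenceFile("/path/to/seqfile", classOf[LongWritable], classOf[Text])
  .map { case (k, v) => (k.get, v.toString) } // copy before any collect/take
pairs.take(5).foreach(println)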



Thanks
Best Regards

On Tue, Jun 2, 2015 at 6:21 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote:

 Spark Shell:

 val x =
 sc.sequenceFile("/sys/edw/dw_lstg_item/snapshot/2015/06/01/00/part-r-00761",
 classOf[org.apache.hadoop.io.Text], classOf[org.apache.hadoop.io.Text])

 OR

 val x =
 sc.sequenceFile("/sys/edw/dw_lstg_item/snapshot/2015/06/01/00/part-r-00761",
 classOf[org.apache.hadoop.io.Text],
 classOf[org.apache.hadoop.io.LongWritable])

 OR

 val x =
 sc.sequenceFile("/sys/edw/dw_lstg_item/snapshot/2015/06/01/00/part-r-00761",
 classOf[org.apache.hadoop.io.LongWritable],
 classOf[org.apache.hadoop.io.Text])

 x.take(10).foreach(println)

 is throwing

 ===
 Exception:

 15/06/02 05:49:51 ERROR executor.Executor: Exception in task 0.0 in stage
 2.0 (TID 2)

 java.io.NotSerializableException: org.apache.hadoop.io.Text

 Serialization stack:

 - object not serializable (class: org.apache.hadoop.io.Text, value:
 290090268£2013112699)

 - field (class: scala.Tuple2, name: _1, type: class java.lang.Object)

 - object (class scala.Tuple2,
 (290090268£2013112699,2900902682000-04-012000-03-221969-12-3110111-9925370390166893734173-9991.001.0019.00.00.0020.0020.0020.00113NY000-99902000-04-01
 05:02:21-992000-07-01DW_BATCH2005-01-13 12:09:50DW_BATCH))

 - element of array (index: 0)

 - array (class [Lscala.Tuple2;, size 10)

 at
 org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:38)

 at
 org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)

 at
 org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:80)

 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)

 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)

 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)

 at java.lang.Thread.run(Thread.java:745)

 15/06/02 05:49:51 ERROR scheduler.TaskSetManager: Task 0.0 in stage 2.0
 (TID 2) had a not serializable result: org.apache.hadoop.io.Text

 Serialization stack:

 - object not serializable (class: org.apache.hadoop.io.Text, value:
 290090268£2013112699)

 - field (class: scala.Tuple2, name: _1, type: class java.lang.Object)

 - object (class scala.Tuple2,
 (290090268£2013112699,2900902682000-04-012000-03-221969-12-3110111-9925370390166893734173-9991.001.0019.00.00.0020.0020.0020.00113NY000-99902000-04-01
 05:02:21-992000-07-01DW_BATCH2005-01-13 12:09:50DW_BATCH))

 - element of array (index: 0)

 - array (class [Lscala.Tuple2;, size 10); not retrying

 15/06/02 05:49:51 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 2.0,
 whose tasks have all completed, from pool

 15/06/02 05:49:51 INFO scheduler.TaskSchedulerImpl: Cancelling stage 2

 15/06/02 05:49:51 INFO scheduler.DAGScheduler: Stage 2 (take at
 console:24) failed in 0.032 s

 15/06/02 05:49:51 INFO scheduler.DAGScheduler: Job 2 failed: take at
 console:24, took 0.041207 s

 org.apache.spark.SparkException: Job aborted due to stage failure: Task
 0.0 in stage 2.0 (TID 2) had a not serializable result:
 org.apache.hadoop.io.Text

 Serialization stack:

 - object not serializable (class: org.apache.hadoop.io.Text, value:
 290090268£2013112699)

 - field (class: scala.Tuple2, name: _1, type: class java.lang.Object)

 - object (class scala.Tuple2,
 (290090268£2013112699,2900902682000-04-012000-03-221969-12-3110111-9925370390166893734173-9991.001.0019.00.00.0020.0020.0020.00113NY000-99902000-04-01
 05:02:21-992000-07-01DW_BATCH2005-01-13 12:09:50DW_BATCH))

 - element of array (index: 0)

 - array (class [Lscala.Tuple2;, size 10)

 at org.apache.spark.scheduler.DAGScheduler.org
 $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1204)

 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1193)

 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)

 at
 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)

 at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)

 at
 org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1192)

 at
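
 A note on the failure above: Hadoop Writable types such as Text are not
 java-serializable, so take() fails when the driver tries to collect the raw
 (Text, Text) records. A common workaround (a sketch, assuming the keys and
 values are plain text; not necessarily this thread's final resolution) is to
 map the Writables to JVM types before collecting:

 val x = sc.sequenceFile("/sys/edw/dw_lstg_item/snapshot/2015/06/01/00/part-r-00761",
   classOf[org.apache.hadoop.io.Text], classOf[org.apache.hadoop.io.Text])
 // Convert the non-serializable Writables to plain Strings before take()
 x.map { case (k, v) => (k.toString, v.toString) }.take(10).foreach(println)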
 

Transactional guarantee while saving DataFrame into a DB

2015-06-02 Thread Mohammad Tariq
Hi list,

With the help of the Spark DataFrame API we can save a DataFrame into a
database table through the insertIntoJDBC() call. However, I could not find
any info about what transactional guarantees it provides. What if my program
gets killed during processing? Would that end up as a partial load?

Is it somehow possible to handle these kinds of scenarios? A rollback or
something of that sort?

Many thanks.

P.S : I am using spark-1.3.1-bin-hadoop2.4 with java 1.7
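
As far as I can tell, insertIntoJDBC() writes each partition over its own JDBC
connection, so there is no job-wide transaction and a killed job can leave a
partial load. One common pattern (a sketch of a workaround, not something
insertIntoJDBC() does for you; the URL and table names below are placeholders)
is to load into a staging table and promote it in a single database-side
transaction only after the Spark job succeeds:

  val jdbcUrl = "jdbc:postgresql://dbhost/mydb"  // placeholder

  // Step 1: let Spark load the staging table (a partial load only ever hits staging).
  df.insertIntoJDBC(jdbcUrl, "events_staging", true)

  // Step 2: runs only if the write above completed without throwing.
  val conn = java.sql.DriverManager.getConnection(jdbcUrl)
  try {
    conn.setAutoCommit(false)
    val stmt = conn.createStatement()
    stmt.executeUpdate("DELETE FROM events")
    stmt.executeUpdate("INSERT INTO events SELECT * FROM events_staging")
    conn.commit()
  } finally {
    conn.close()
  }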

Tariq, Mohammad
about.me/mti


Re: Compute Median in Spark Dataframe

2015-06-02 Thread Olivier Girardot
I've finally come to the same conclusion, but isn't there any way to call
this Hive UDAF from agg, e.g. agg(percentile(key, 0.5))?

On Tue, 2 Jun 2015 at 15:37, Yana Kadiyska yana.kadiy...@gmail.com
wrote:

 Like this...sqlContext should be a HiveContext instance

 case class KeyValue(key: Int, value: String)
 val df = sc.parallelize(1 to 50).map(i => KeyValue(i, i.toString)).toDF
 df.registerTempTable("table")
 sqlContext.sql("select percentile(key, 0.5) from table").show()

 On Tue, Jun 2, 2015 at 8:07 AM, Olivier Girardot 
 o.girar...@lateral-thoughts.com wrote:

 Hi everyone,
 Is there any way to compute a median on a column using Spark's Dataframe.
 I know you can use stats in a RDD but I'd rather stay within a dataframe.
 Hive seems to imply that using ntile one can compute percentiles,
 quartiles and therefore a median.
 Does anyone have experience with this ?

 Regards,

 Olivier.





Re: build jar with all dependencies

2015-06-02 Thread Paul Röwer

Which Maven dependency do I need, too?

http://www.cloudera.com/content/cloudera/en/documentation/core/v5-2-x/topics/cdh_vd_cdh5_maven_repo.html

Am 02.06.2015 um 16:04 schrieb Yana Kadiyska:
Can you run it using spark-submit? What is happening is that you are
running a plain Java program -- you've wrapped spark-core in your fat
jar, but at runtime you likely need the whole Spark system in order to
run your application. I would mark spark-core as provided (so you
don't wrap it in your fat jar) and run via spark-submit. If you
insist on running via java for whatever reason, look at the runtime
classpath that spark-submit sets up and make sure you include all of
those jars when you run your app.
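
As a sketch of the two pieces (the master URL below is a placeholder; adjust
the jar name to whatever your build produces):

  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>1.1.0-cdh5.2.5</version>
    <scope>provided</scope>
  </dependency>

  spark-submit \
    --class mgm.tp.bigdata.ma_spark.SparkMain \
    --master spark://your-master:7077 \
    ma-spark-0.0.1-SNAPSHOT-jar-with-dependencies.jar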


On Tue, Jun 2, 2015 at 9:57 AM, Pa Rö paul.roewer1...@googlemail.com wrote:


Okay, but how can I build my app so that it runs without
-Dconfig.file=alt_reference1.conf?

2015-06-02 15:43 GMT+02:00 Yana Kadiyska yana.kadiy...@gmail.com:

This looks like your app is not finding your Typesafe config.
The config usually has to be placed in a particular folder under
your app to be picked up correctly. If it's in a non-standard
location you can pass -Dconfig.file=alt_reference1.conf to
java to tell it where to look. If this is a config that belongs
to Spark and not your app, I'd recommend running your jar via
spark-submit (that should run) and dumping out the
classpath/variables that spark-submit sets up...

On Tue, Jun 2, 2015 at 6:58 AM, Pa Rö
paul.roewer1...@googlemail.com wrote:

hello community,

i have build a jar file from my spark app with maven (mvn
clean compile assembly:single) and the following pom file:

<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>mgm.tp.bigdata</groupId>
  <artifactId>ma-spark</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>

  <name>ma-spark</name>
  <url>http://maven.apache.org</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>

  <repositories>
    <repository>
      <id>cloudera</id>
      <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
    </repository>
  </repositories>

  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.10</artifactId>
      <version>1.1.0-cdh5.2.5</version>
    </dependency>
    <dependency>
      <groupId>mgm.tp.bigdata</groupId>
      <artifactId>ma-commons</artifactId>
      <version>0.0.1-SNAPSHOT</version>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <plugin>
        <artifactId>maven-assembly-plugin</artifactId>
        <configuration>
          <archive>
            <manifest>
              <mainClass>mgm.tp.bigdata.ma_spark.SparkMain</mainClass>
            </manifest>
          </archive>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
        </configuration>
      </plugin>
    </plugins>
  </build>
</project>

If I run my app with java -jar
ma-spark-0.0.1-SNAPSHOT-jar-with-dependencies.jar in a
terminal, I get the following error message:

proewer@proewer-VirtualBox:~/Schreibtisch$ java -jar
ma-spark-0.0.1-SNAPSHOT-jar-with-dependencies.jar
2015-Jun-02 12:53:36,348 [main] org.apache.spark.util.Utils
 WARN  - Your hostname, proewer-VirtualBox resolves to a
loopback address: 127.0.1.1; using 10.0.2.15 instead (on
interface eth0)
2015-Jun-02 12:53:36,350 [main] org.apache.spark.util.Utils
 WARN  - Set SPARK_LOCAL_IP if you need to bind to another
address
2015-Jun-02 12:53:36,401 [main]
org.apache.spark.SecurityManager
 INFO  - Changing view acls to: proewer
2015-Jun-02 12:53:36,402 [main]
org.apache.spark.SecurityManager
 INFO  - Changing modify acls to: proewer
2015-Jun-02 12:53:36,403 [main]

Re: Compute Median in Spark Dataframe

2015-06-02 Thread Holden Karau
Not super easily. The GroupedData class uses a strToExpr function which
supports a pretty limited set of functions, so we can't pass in the name of an
arbitrary Hive UDAF (unless I'm missing something). We can instead
construct a Column with the expression you want and then pass it in to
agg() that way (although then you need to call the Hive UDAF there). There
are some classes in hiveUdfs.scala which expose Hive UDAFs as Spark
SQL AggregateExpressions, but they are private.

On Tue, Jun 2, 2015 at 8:28 AM, Olivier Girardot 
o.girar...@lateral-thoughts.com wrote:

 I've finally come to the same conclusion, but isn't there any way to call
 this Hive UDAF from agg, e.g. agg(percentile(key, 0.5))?

 On Tue, 2 Jun 2015 at 15:37, Yana Kadiyska yana.kadiy...@gmail.com
 wrote:

 Like this...sqlContext should be a HiveContext instance

 case class KeyValue(key: Int, value: String)
 val df = sc.parallelize(1 to 50).map(i => KeyValue(i, i.toString)).toDF
 df.registerTempTable("table")
 sqlContext.sql("select percentile(key, 0.5) from table").show()

 On Tue, Jun 2, 2015 at 8:07 AM, Olivier Girardot 
 o.girar...@lateral-thoughts.com wrote:

 Hi everyone,
 Is there any way to compute a median on a column using Spark's
 Dataframe. I know you can use stats in a RDD but I'd rather stay within a
 dataframe.
 Hive seems to imply that using ntile one can compute percentiles,
 quartiles and therefore a median.
 Does anyone have experience with this ?

 Regards,

 Olivier.





-- 
Cell : 425-233-8271
Twitter: https://twitter.com/holdenkarau
Linked In: https://www.linkedin.com/in/holdenkarau


Re: Re: spark 1.3.1 jars in repo1.maven.org

2015-06-02 Thread Ryan Williams
I think this is causing issues upgrading ADAM
https://github.com/bigdatagenomics/adam to Spark 1.3.1 (cf. adam#690
https://github.com/bigdatagenomics/adam/pull/690#issuecomment-107769383);
attempting to build against Hadoop 1.0.4 yields errors like:

2015-06-02 15:57:44 ERROR Executor:96 - Exception in task 0.0 in stage 0.0
(TID 0)
*java.lang.IncompatibleClassChangeError: Found class
org.apache.hadoop.mapreduce.TaskAttemptContext, but interface was expected*
at
org.apache.spark.mapred.SparkHadoopMapRedUtil$.commitTask(SparkHadoopMapRedUtil.scala:95)
at org.apache.spark.SparkHadoopWriter.commit(SparkHadoopWriter.scala:106)
at
org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:1082)
at
org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:1059)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
2015-06-02 15:57:44 WARN  TaskSetManager:71 - Lost task 0.0 in stage 0.0
(TID 0, localhost): java.lang.IncompatibleClassChangeError: Found class
org.apache.hadoop.mapreduce.TaskAttemptContext, but interface was expected
at
org.apache.spark.mapred.SparkHadoopMapRedUtil$.commitTask(SparkHadoopMapRedUtil.scala:95)
at org.apache.spark.SparkHadoopWriter.commit(SparkHadoopWriter.scala:106)
at
org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:1082)
at
org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:1059)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

TaskAttemptContext is a class in Hadoop 1.0.4, but an interface in Hadoop
2; Spark 1.3.1 expects the interface but is getting the class.

It sounds like, while I *can* depend on Spark 1.3.1 and Hadoop 1.0.4, I
then need to hope that I don't exercise certain Spark code paths that run
afoul of differences between Hadoop 1 and 2; does that seem correct?

Thanks!

On Wed, May 20, 2015 at 1:52 PM Sean Owen so...@cloudera.com wrote:

 I don't think any of those problems are related to Hadoop. Have you looked
 at userClassPathFirst settings?

 On Wed, May 20, 2015 at 6:46 PM, Edward Sargisson ejsa...@gmail.com
 wrote:

 Hi Sean and Ted,
 Thanks for your replies.

 I don't have our current problems nicely written up as good questions
 yet. I'm still sorting out classpath issues, etc.
 In case it is of help, I'm seeing:
 * Exception in thread Spark Context Cleaner
 java.lang.NoClassDefFoundError: 0
 at
 org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply$mcV$sp(ContextCleaner.scala:149)
 * We've been having clashing dependencies between a colleague and I
 because of the aforementioned classpath issue
 * The clashing dependencies are also causing issues with what jetty
 libraries are available in the classloader from Spark and don't clash with
 existing libraries we have.

 More anon,

 Cheers,
 Edward



  Original Message 
  Subject: Re: spark 1.3.1 jars in repo1.maven.org Date: 2015-05-20 00:38
 From: Sean Owen so...@cloudera.com To: Edward Sargisson 
 esa...@pobox.com Cc: user user@spark.apache.org


 Yes, the published artifacts can only refer to one version of anything
 (OK, modulo publishing a large number of variants under classifiers).

 You aren't intended to rely on Spark's transitive dependencies for
 anything. Compiling against the Spark API has no relation to what
 version of Hadoop it binds against because it's not part of any API.
 You mark the Spark dependency even as provided in your build and get
 all the Spark/Hadoop bindings at runtime from our cluster.

 What problem are you experiencing?


 On Wed, May 20, 2015 at 2:17 AM, Edward Sargisson esa...@pobox.com
 wrote:

 Hi,
 I'd like to confirm an observation I've just made. Specifically that spark
 is only available in repo1.maven.org for one Hadoop variant.

 The Spark source can be compiled against a number of different Hadoops
 using
 profiles. Yay.
 However, the spark jars in repo1.maven.org appear to be compiled against
 one
 specific Hadoop and no other differentiation is made. (I can see a
 difference with hadoop-client being 2.2.0 in repo1.maven.org and 1.0.4 in
 the version I compiled locally).

 The implication here is that if you have a pom file asking for
 spark-core_2.10 version 1.3.1 then Maven will only give you an Hadoop 2
 

updateStateByKey and kafka direct approach without shuffle

2015-06-02 Thread Krot Viacheslav
Hi all,
In my streaming job I'm using the kafka streaming direct approach and want to
maintain state with updateStateByKey. My PairRDD has the message's topic name +
partition id as a key. So I assumed that updateStateByKey could work within the
same partition as the KafkaRDD and not lead to shuffles. Actually this is not
true, because updateStateByKey leads to a cogroup transformation that thinks
the state rdd and the kafka rdd are not co-partitioned, as the kafka rdd does
not have a partitioner at all. So the dependency is considered to be wide and
leads to a shuffle.

I tried to avoid shuffling by providing a custom partitioner to
updateStateByKey, but the KafkaRDD needs to use the same partitioner. For this I
created a proxy RDD that just returns my partitioner.

class ProxyRDDWithPartitioner[T: ClassTag](prev: RDD[T], part: Partitioner)
extends RDD[T](prev) {

  override val partitioner = Some(part)

  override def compute(split: Partition, context: TaskContext): Iterator[T]
= prev.compute(split, context)

  override protected def getPartitions: Array[Partition] = prev.partitions

  override def getPreferredLocations(thePart: Partition): Seq[String] =
prev.preferredLocations(thePart)
}

I use it as:
val partitioner = new Partitioner {
  // TODO this should be retrieved from kafka
  override def numPartitions: Int = 2

  override def getPartition(key: Any): Int = {
key.asInstanceOf[(String, Int)]._2
  }
}

inputStream
  .map(m => ((m.topic, m.partition), m.value))
  .transform(new ProxyRDDWithPartitioner(_, partitioner))
  .updateStateByKey(func, partitioner)
  

The question is - is it safe to do such trick?


Re: union and reduceByKey wrong shuffle?

2015-06-02 Thread Josh Rosen
Ah, interesting.  While working on my new Tungsten shuffle manager, I came
up with some nice testing interfaces for allowing me to manually trigger
spills in order to deterministically test those code paths without
requiring large amounts of data to be shuffled.  Maybe I could make similar
test interface changes to the existing shuffle code, which might make it
easier to reproduce this in an isolated environment.

On Mon, Jun 1, 2015 at 11:41 PM, Igor Berman igor.ber...@gmail.com wrote:

 Hi,
 small mock data doesn't reproduce the problem. IMHO the problem is reproduced
 when we make the shuffle big enough to spill data to disk.
 We will work on understanding and reproducing the problem (not first
 priority though...)


 On 1 June 2015 at 23:02, Josh Rosen rosenvi...@gmail.com wrote:

 How much work is to produce a small standalone reproduction?  Can you
 create an Avro file with some mock data, maybe 10 or so records, then
 reproduce this locally?

 On Mon, Jun 1, 2015 at 12:31 PM, Igor Berman igor.ber...@gmail.com
 wrote:

 switching to simple pojos instead of avro for spark
 serialization solved the problem (I mean reading avro from s3 and then
 mapping each avro object to its serializable pojo counterpart with the same
 fields; the pojo is registered with kryo)
 Any thoughts on where to look for a problem/misconfiguration?

 On 31 May 2015 at 22:48, Igor Berman igor.ber...@gmail.com wrote:

 Hi
 We are using spark 1.3.1
 Avro-chill (tomorrow will check if its important) we register avro
 classes from java
 Avro 1.7.6
 On May 31, 2015 22:37, Josh Rosen rosenvi...@gmail.com wrote:

 Which Spark version are you using?  I'd like to understand whether
 this change could be caused by recent Kryo serializer re-use changes in
 master / Spark 1.4.

 On Sun, May 31, 2015 at 11:31 AM, igor.berman igor.ber...@gmail.com
 wrote:

 after investigation the problem is somehow connected to avro
 serialization
 with kryo + chill-avro (mapping the avro object to a simple scala case class
 and
 running reduce on these case class objects solves the problem)




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/union-and-reduceByKey-wrong-shuffle-tp23092p23093.html
 Sent from the Apache Spark User List mailing list archive at
 Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org








Re: data localisation in spark

2015-06-02 Thread Shushant Arora
Is it possible in JavaSparkContext ?

JavaSparkContext jsc = new JavaSparkContext(conf);
JavaRDD<String> lines = jsc.textFile(args[0]);

If yes, is it the programmer's responsibility to first calculate the split
locations and then instantiate the spark context with those preferred locations?

How is this achieved in MR2 with YARN, where the Application Master
specifies split locations to the ResourceManager before acquiring the node
managers?



On Mon, Jun 1, 2015 at 7:24 AM, bit1...@163.com bit1...@163.com wrote:

 Take a look at the following SparkContext constructor variant that tries
 to honor the data locality in YARN mode.

   /**
 * :: DeveloperApi ::
 * Alternative constructor for setting preferred locations where Spark will
 create executors.
 *
 * @param preferredNodeLocationData used in YARN mode to select nodes to
 launch containers on.
 * Can be generated using
 [[org.apache.spark.scheduler.InputFormatInfo.computePreferredLocations]]
 * from a list of input files or InputFormats for the application.
 */
 @DeveloperApi
 def this(config: SparkConf, preferredNodeLocationData: Map[String,
 Set[SplitInfo]]) = {
 this(config)
 this.preferredNodeLocationData = preferredNodeLocationData
 }

 --
 bit1...@163.com


 *From:* Shushant Arora shushantaror...@gmail.com
 *Date:* 2015-05-31 22:54
 *To:* user user@spark.apache.org
 *Subject:* data localisation in spark

 I want to understand how spark takes care of data localisation in cluster
 mode when run on YARN.

 1. The driver program asks the ResourceManager for executors. Does it tell
 YARN's RM to check the HDFS blocks of the input data and then allocate
 executors next to them? And do the executors remain fixed throughout the
 application, or does the driver program ask for new executors when it submits
 another job in the same application (since in spark a new job is created for
 each action)? If the executors are fixed, then is achieving data localisation
 for the second job impossible?



 2. When the executors are done with their processing, are they marked as
 free in the ResourceManager's resource queue, and do the executors tell this
 to the RM directly instead of via the driver?

 Thanks
 Shushant




Re: java.io.IOException: FAILED_TO_UNCOMPRESS(5)

2015-06-02 Thread Josh Rosen
My suggestion is that you change the Spark setting which controls the
compression codec that Spark uses for internal data transfers. Set
spark.io.compression.codec
to lzf in your SparkConf.
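
For example, a minimal sketch (the app name is a placeholder; the rest of the
conf is whatever you already use):

  import org.apache.spark.SparkConf

  // "lzf" avoids the snappy-related corruption discussed below
  val conf = new SparkConf()
    .setAppName("my-app")
    .set("spark.io.compression.codec", "lzf")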

On Mon, Jun 1, 2015 at 8:46 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote:

 Hello Josh,
 Are you suggesting storing the source data with LZF compression and using the
 same Spark code as is?
 Currently it's stored in sequence file format and compressed with GZIP.

 First line of the data:

 (SEQorg.apache.hadoop.io.Textorg.apache.hadoop.io.Text'
 org.apache.hadoop.io.compress.GzipCodec?v?
 )

 Regards,
 Deepak

 On Tue, Jun 2, 2015 at 4:16 AM, Josh Rosen rosenvi...@gmail.com wrote:

 If you can't run a patched Spark version, then you could also consider
 using LZF compression instead, since that codec isn't affected by this bug.

 On Mon, Jun 1, 2015 at 3:32 PM, Andrew Or and...@databricks.com wrote:

 Hi Deepak,

 This is a notorious bug that is being tracked at
 https://issues.apache.org/jira/browse/SPARK-4105. We have fixed one
 source of this bug (it turns out Snappy had a bug in buffer reuse that
 caused data corruption). There are other known sources that are being
 addressed in outstanding patches currently.

 Since you're using 1.3.1 my guess is that you don't have this patch:
 https://github.com/apache/spark/pull/6176, which I believe should fix
 the issue in your case. It's merged for 1.3.2 (not yet released) but not in
 time for 1.3.1, so feel free to patch it yourself and see if it works.

 -Andrew


 2015-06-01 8:00 GMT-07:00 ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com:

 Any suggestions ?

 I am using Spark 1.3.1 to read a sequence file stored in SequenceFile
 format
 (SEQorg.apache.hadoop.io.Textorg.apache.hadoop.io.Text'org.apache.hadoop.io.compress.GzipCodec?v?
 )

 with this code and settings
 sc.sequenceFile(dwTable, classOf[Text], classOf[Text]).partitionBy(new
 org.apache.spark.HashPartitioner(2053))
 .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
   .set("spark.kryoserializer.buffer.mb",
 arguments.get("buffersize").get)
   .set("spark.kryoserializer.buffer.max.mb",
 arguments.get("maxbuffersize").get)
   .set("spark.driver.maxResultSize",
 arguments.get("maxResultSize").get)
   .set("spark.yarn.maxAppAttempts", "0")
   //.set("spark.akka.askTimeout", arguments.get("askTimeout").get)
   //.set("spark.akka.timeout", arguments.get("akkaTimeout").get)
   //.set("spark.worker.timeout", arguments.get("workerTimeout").get)

 .registerKryoClasses(Array(classOf[com.ebay.ep.poc.spark.reporting.process.model.dw.SpsLevelMetricSum]))


 and values are
 buffersize=128 maxbuffersize=1068 maxResultSize=200G


 And i see this exception in each executor task

 FetchFailed(BlockManagerId(278, phxaishdc9dn1830.stratus.phx.ebay.com,
 54757), shuffleId=6, mapId=2810, reduceId=1117, message=

 org.apache.spark.shuffle.FetchFailedException: FAILED_TO_UNCOMPRESS(5)

 at
 org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.org$apache$spark$shuffle$hash$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:67)

 at
 org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)

 at
 org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)

 at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)

 at
 org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)

 at
 org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)

 at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)

 at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)

 at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)

 at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)

 at
 org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:248)

 at
 org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:172)

 at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:79)

 at org.apache.spark.rdd.RDD.iterator(RDD.scala:242)

 at
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)

 at
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)

 at org.apache.spark.scheduler.Task.run(Task.scala:64)

 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)

 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)

 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)

 at java.lang.Thread.run(Thread.java:745)

 *Caused by: java.io.IOException: FAILED_TO_UNCOMPRESS(5)*

 at org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:84)

 at org.xerial.snappy.SnappyNative.rawUncompress(Native Method)

 at org.xerial.snappy.Snappy.rawUncompress(Snappy.java:444)

 at org.xerial.snappy.Snappy.uncompress(Snappy.java:480)

 at
 org.xerial.snappy.SnappyInputStream.readFully(SnappyInputStream.java:135)

 at
 

Re: Spark 1.3.0: how to let Spark history load old records?

2015-06-02 Thread Otis Gospodnetic
I think Spark doesn't keep historical metrics. You can use something like
SPM for that -
http://blog.sematext.com/2014/01/30/announcement-apache-storm-monitoring-in-spm/

Otis
--
Monitoring * Alerting * Anomaly Detection * Centralized Log Management
Solr & Elasticsearch Support * http://sematext.com/
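
If the goal is to see applications that finished before the UI was restarted,
the usual route (a sketch of the standard settings, not something the
standalone master keeps by itself; the HDFS path is a placeholder) is to
enable event logging and point the history server at the same directory:

  # conf/spark-defaults.conf
  spark.eventLog.enabled           true
  spark.eventLog.dir               hdfs:///spark-event-logs
  spark.history.fs.logDirectory    hdfs:///spark-event-logs

  # then start the history server (serves on port 18080 by default)
  ./sbin/start-history-server.sh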


On Mon, Jun 1, 2015 at 11:36 PM, Haopu Wang hw...@qilinsoft.com wrote:

 When I start the Spark master process, the old records are not shown in
 the monitoring UI.

 How to show the old records? Thank you very much!


 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Scala By the Bay + Big Data Scala 2015 Program Announced

2015-06-02 Thread Alexy Khrabrov
The programs and schedules for Scala By the Bay (SBTB) and Big Data Scala
By the Bay (BDS) 2015 conferences are announced and published:

Scala By the Bay http://scala.bythebay.io — August 13-16 (
scala.bythebay.io)

Big Data Scala By the Bay http://bigdatascala.bythebay.io — August 16-18 (
bigdatascala.bythebay.io)

There are 77 best talks from the leading companies using Scala, Spark, and
other Scala-based projects in production, including Twitter, Salesforce,
Cloudera, Verizon, Comcast, Spotify, Hootsuite, Typesafe, Databricks,
Nitro, Liveperson, Tableau, and many others.

SBTB + BDS Schedule http://scala.bythebay.io/schedule.html

SBTB and BDS are separate conferences, with BDS expanding into data science
and data management.  They share an innovative end-to-end pipeline training
on 8/16, when in one day, we’ll teach hundreds of developers to build a
web-scale startup on Mesos, Akka, Kafka, Spark, and Cassandra, taught by
engineers from Mesosphere, Typesafe, Confluent, Databricks, and DataStax,
respectively.


For the first time in the history of any of the Scala conferences, Twitter
adds a whole Finagle Day to SBTB, teaching OSS developers the biggest
real-time Scala stack in production via hands-on workshops taught by
Twitter engineers, and a series of talks from Finagle creators and users
inside and outside Twitter.


SBTB+BDS topics include higher-order abstractions for multiple application
areas, data pipelines, “big” data analytics, Machine Learning and Natural
Language Processing, datacenter management with Mesos, and more.  The key
themes unifying both conferences is applying rigorous Functional
Programming principles for DRY and elegant codebases that can grow with
smart teams as companies go web-scale, and the emergence of Reactive
Systems that replace ETL with common object models applied across all
stages of an application — from API to message bus to real-time analytics.
Several versions of the resulting “lambda architectures” will be presented.

Martin Odersky, the creator of Scala, will keynote the By the Bay
conferences for the first time.

Jonas Bonér, the CTO of Typesafe, is developing a completely new talk for
Scala By the Bay, which he will keynote together with Dean Wampler, Dick
Wall, Vidhya Narayanan, and Andrew Headrick.  Vidhya leads the Verizon
OnCue Scala team, with some of the highest concentration of FP talent in
the world, and Andrew, formerly the Akka architect at Ticketfly, is now a
CEO of InnoVint, a local startup managing wineries with Scala — proving  we
have the most fun local Scala conference.

Special thanks go to Cloudera, who crystallized the Big Data Scala
conference, and whose Chairman and Chief Strategy Officer, Mike Olson, will
keynote it together with Martin Odersky, Matei Zaharia, Jay Kreps, and
Debora Donato.  Cloudera is backing Scala and Spark across the two key
themes of Big Data Scala — end-to-end data pipelines in Scala and Data
Science on the JVM.  Big Data Scala includes several topics which are the
focus of the SF Text, Text By the Bay, SF Spark and Friends communities (
sftext.org, text.bythebay.io, sfspark.org, respectively).


We had a record number of submissions this year and had to make tough
choices to keep the conferences to two tracks each.  This is the biggest
and the best Scala By the Bay conferences we’ve put together so far.


Given the program is finally published, we’re pushing back late bird to
June 15th.  We have about 400 seats capacity and a significant portion was
already claimed even before the schedule was announced, mostly by folks
returning from the previous years.  All the previous By the Bay conferences
sold out, and conferences will sell out quickly this time, so reserve your
seat soon.  We're welcoming sponsors, old and new, who get a block of seats
as well -- email spons...@scalabythebay.org for prospectus.  All 18
sponsors of the 2014 edition were hiring.

We have special programs for non-profits/making the world better kinds of
projects, email organiz...@scalabythebay.org or organiz...@bigdatascala.org
if you use Scala for Good and want to attend SBTB+BDS, and get community
support for your projects and teams (even teams of one!).

See you on the shores of Lake Merritt in August!
A+  SBTB+BDS


Embedding your own transformer in Spark.ml Pipeline

2015-06-02 Thread dimple
Hi,
I would like to embed my own transformer in a Spark.ml Pipeline but do
not see an example of it. Can someone share an example of which
classes/interfaces I need to extend/implement in order to do so? Thanks.

Dimple



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Embedding-your-own-transformer-in-Spark-ml-Pipleline-tp23112.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: data localisation in spark

2015-06-02 Thread Sandy Ryza
It is not possible with JavaSparkContext either.  The API mentioned below
currently does not have any effect (we should document this).

The primary difference between MR and Spark here is that MR runs each task
in its own YARN container, while Spark runs multiple tasks within an
executor, which needs to be requested before Spark knows what tasks it will
run.  Although dynamic allocation improves that last part.
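
For reference, dynamic allocation is turned on with a couple of settings; a
minimal sketch, assuming YARN and that the external shuffle service is
configured on the NodeManagers (the executor counts are placeholders):

  val conf = new org.apache.spark.SparkConf()
    .set("spark.dynamicAllocation.enabled", "true")
    .set("spark.shuffle.service.enabled", "true")
    .set("spark.dynamicAllocation.minExecutors", "2")
    .set("spark.dynamicAllocation.maxExecutors", "20")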

-Sandy

On Tue, Jun 2, 2015 at 9:55 AM, Shushant Arora shushantaror...@gmail.com
wrote:

 Is it possible in JavaSparkContext ?

 JavaSparkContext jsc = new JavaSparkContext(conf);
 JavaRDD<String> lines = jsc.textFile(args[0]);

 If yes, is it the programmer's responsibility to first calculate the split
 locations and then instantiate the spark context with those preferred locations?

 How is this achieved in MR2 with YARN, where the Application Master
 specifies split locations to the ResourceManager before acquiring the node
 managers?



 On Mon, Jun 1, 2015 at 7:24 AM, bit1...@163.com bit1...@163.com wrote:

 Take a look at the following SparkContext constructor variant that tries
 to honor the data locality in YARN mode.

   /**
 * :: DeveloperApi ::
 * Alternative constructor for setting preferred locations where Spark
 will create executors.
 *
 * @param preferredNodeLocationData used in YARN mode to select nodes to
 launch containers on.
 * Can be generated using
 [[org.apache.spark.scheduler.InputFormatInfo.computePreferredLocations]]
 * from a list of input files or InputFormats for the application.
 */
 @DeveloperApi
 def this(config: SparkConf, preferredNodeLocationData: Map[String,
 Set[SplitInfo]]) = {
 this(config)
 this.preferredNodeLocationData = preferredNodeLocationData
 }

 --
 bit1...@163.com


 *From:* Shushant Arora shushantaror...@gmail.com
 *Date:* 2015-05-31 22:54
 *To:* user user@spark.apache.org
 *Subject:* data localisation in spark

 I want to understand how spark takes care of data localisation in
 cluster mode when run on YARN.

 1. The driver program asks the ResourceManager for executors. Does it tell
 YARN's RM to check the HDFS blocks of the input data and then allocate
 executors next to them? And do the executors remain fixed throughout the
 application, or does the driver program ask for new executors when it submits
 another job in the same application (since in spark a new job is created for
 each action)? If the executors are fixed, then is achieving data localisation
 for the second job impossible?



 2. When the executors are done with their processing, are they marked as
 free in the ResourceManager's resource queue, and do the executors tell this
 to the RM directly instead of via the driver?

 Thanks
 Shushant





Re: updateStateByKey and kafka direct approach without shuffle

2015-06-02 Thread Krot Viacheslav
Cody,

Thanks, good point. I fixed getting partition id to:

class MyPartitioner(offsetRanges: Array[OffsetRange]) extends Partitioner {
  override def numPartitions: Int = offsetRanges.size

  override def getPartition(key: Any): Int = {
// this is set in .map(m => (TaskContext.get().partitionId(), m.value))
key.asInstanceOf[Int]
  }
}

inputStream
  .map(m => (TaskContext.get().partitionId(), m.value))
  .transform { rdd =>
    val part = new MyPartitioner(rdd.asInstanceOf[HasOffsetRanges].offsetRanges)
    new ProxyRDDWithPartitioner(rdd, part)
  }
...

But how can I create the same partitioner during the updateStateByKey call? I
have no idea how to access the rdd when calling updateStateByKey.

On Tue, 2 Jun 2015 at 19:15, Cody Koeninger c...@koeninger.org:

 I think the general idea is worth pursuing.

 However, this line:

  override def getPartition(key: Any): Int = {
 key.asInstanceOf[(String, Int)]._2
   }

 is using the kafka partition id, not the spark partition index, so it's
 going to give you fewer partitions / incorrect index

 Cast the rdd to HasOffsetRanges, get the offsetRanges from it.  The index
 into the offset range array matches the (spark) partition id.  That will
 also tell you what the value of numPartitions should be.







 On Tue, Jun 2, 2015 at 11:50 AM, Krot Viacheslav 
 krot.vyaches...@gmail.com wrote:

 Hi all,
 In my streaming job I'm using the kafka streaming direct approach and want to
 maintain state with updateStateByKey. My PairRDD has the message's topic name +
 partition id as a key. So I assumed that updateStateByKey could work within the
 same partition as the KafkaRDD and not lead to shuffles. Actually this is not
 true, because updateStateByKey leads to a cogroup transformation that thinks
 the state rdd and the kafka rdd are not co-partitioned, as the kafka rdd does
 not have a partitioner at all. So the dependency is considered to be wide and
 leads to a shuffle.

 I tried to avoid shuffling by providing a custom partitioner to
 updateStateByKey, but the KafkaRDD needs to use the same partitioner. For this I
 created a proxy RDD that just returns my partitioner.

 class ProxyRDDWithPartitioner[T: ClassTag](prev: RDD[T], part:
 Partitioner) extends RDD[T](prev) {

   override val partitioner = Some(part)

   override def compute(split: Partition, context: TaskContext):
 Iterator[T] = prev.compute(split, context)

   override protected def getPartitions: Array[Partition] = prev.partitions

   override def getPreferredLocations(thePart: Partition): Seq[String] =
 prev.preferredLocations(thePart)
 }

 I use it as:
 val partitioner = new Partitioner {
   // TODO this should be retrieved from kafka
   override def numPartitions: Int = 2

   override def getPartition(key: Any): Int = {
 key.asInstanceOf[(String, Int)]._2
   }
 }

 inputStream
   .map(m => ((m.topic, m.partition), m.value))
   .transform(new ProxyRDDWithPartitioner(_, partitioner))
   .updateStateByKey(func, partitioner)
   

 The question is - is it safe to do such trick?





Re: Embedding your own transformer in Spark.ml Pipeline

2015-06-02 Thread Ram Sriharsha
Hi

We are in the process of adding examples for feature transformations (
https://issues.apache.org/jira/browse/SPARK-7546) and this should be
available shortly on Spark Master.
In the meanwhile, the best place to start would be to look at how the
Tokenizer works here:
https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/feature/Tokenizer.scala

You need to implement the Transformer interface as above. In this case that
means a UnaryTransformer, since the feature transformer acts on one column,
transforms it, and outputs another column.
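
As a rough illustration, a minimal custom transformer might look like the
sketch below (written against the 1.4-style UnaryTransformer API used by
Tokenizer on master; the class name and uid prefix are made up, and on 1.3
the uid plumbing differs slightly):

  import org.apache.spark.ml.UnaryTransformer
  import org.apache.spark.ml.util.Identifiable
  import org.apache.spark.sql.types.{DataType, StringType}

  // Upper-cases a string input column; the output column gets the transformed values.
  class UpperCaser(override val uid: String)
    extends UnaryTransformer[String, String, UpperCaser] {

    def this() = this(Identifiable.randomUID("upperCaser"))

    override protected def createTransformFunc: String => String = _.toUpperCase

    override protected def validateInputType(inputType: DataType): Unit =
      require(inputType == StringType, s"Input type must be StringType but got $inputType")

    override protected def outputDataType: DataType = StringType
  }

Such a transformer can then be dropped into a Pipeline's stages array next to
HashingTF, LogisticRegression, etc., as in the SimpleTextClassificationPipeline
example linked below.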

and an example of how to build a pipeline that includes a feature
transformer (the HashingTF is the feature transformer analogous to what you
would build):
https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/ml/SimpleTextClassificationPipeline.scala

but stay tuned, we should have examples in Python, Scala and Java soon

Ram

On Tue, Jun 2, 2015 at 10:19 AM, dimple dimp201...@gmail.com wrote:

 Hi,
 I would like to embed my own transformer in a Spark.ml Pipeline but do
 not see an example of it. Can someone share an example of which
 classes/interfaces I need to extend/implement in order to do so? Thanks.

 Dimple



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Embedding-your-own-transformer-in-Spark-ml-Pipleline-tp23112.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Join highly skewed datasets

2015-06-02 Thread ๏̯͡๏
We use Scoobi + MR to perform joins and we particularly use blockJoin() API
of scoobi


/** Perform an equijoin with another distributed list where this list is considerably smaller
 * than the right (but too large to fit in memory), and where the keys of right may be
 * particularly skewed. */
def blockJoin[B : WireFormat](right: DList[(K, B)]): DList[(K, (A, B))] =
  Relational.blockJoin(left, right)


I am trying to do a POC; which Spark join API(s) are recommended to
achieve something similar?

Please suggest.
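
Spark core has no direct blockJoin() equivalent. If the smaller side fits in
memory, a broadcast join (collect it as a map and broadcast it) sidesteps the
skew entirely; if it does not, one common workaround is to salt the skewed
keys and replicate the smaller side. A rough sketch of the salting approach
(SALT_FACTOR and the RDD names are placeholders; both sides are assumed to be
pair RDDs keyed by K):

  import scala.util.Random

  val SALT_FACTOR = 50

  // Spread each skewed key of the large side across SALT_FACTOR sub-keys.
  val saltedLarge = largeRdd.map { case (k, v) => ((k, Random.nextInt(SALT_FACTOR)), v) }

  // Replicate every record of the smaller side once per sub-key.
  val replicatedSmall = smallRdd.flatMap { case (k, v) =>
    (0 until SALT_FACTOR).map(salt => ((k, salt), v))
  }

  // Join on the salted key, then drop the salt to recover the original key.
  val joined = saltedLarge.join(replicatedSmall)
    .map { case ((k, _), (a, b)) => (k, (a, b)) }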

-- 
Deepak


SparkSQL: How to specify replication factor on the persisted parquet files?

2015-06-02 Thread Haopu Wang
Hi,

I'm trying to save SparkSQL DataFrame to a persistent Hive table using
the default parquet data source.

I don't know how to change the replication factor of the generated
parquet files on HDFS.

I tried to set dfs.replication on HiveContext but that didn't work.
Any suggestions are appreciated very much!
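
One thing that might be worth trying (an untested sketch; whether the parquet
writer honors it can depend on the output committer, and the table name below
is a placeholder) is setting the replication on the SparkContext's Hadoop
configuration before the save:

  // Ask HDFS for 2 replicas for files written through this context.
  sc.hadoopConfiguration.set("dfs.replication", "2")
  df.saveAsTable("my_4dim_table", "parquet", org.apache.spark.sql.SaveMode.Overwrite)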


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: union and reduceByKey wrong shuffle?

2015-06-02 Thread Igor Berman
Hi,
small mock data doesn't reproduce the problem. IMHO the problem is reproduced
when we make the shuffle big enough to spill data to disk.
We will work on understanding and reproducing the problem (not first
priority though...)


On 1 June 2015 at 23:02, Josh Rosen rosenvi...@gmail.com wrote:

 How much work is to produce a small standalone reproduction?  Can you
 create an Avro file with some mock data, maybe 10 or so records, then
 reproduce this locally?

 On Mon, Jun 1, 2015 at 12:31 PM, Igor Berman igor.ber...@gmail.com
 wrote:

 switching to simple pojos instead of avro for spark
 serialization solved the problem (I mean reading avro from s3 and then
 mapping each avro object to its serializable pojo counterpart with the same
 fields; the pojo is registered with kryo)
 Any thoughts on where to look for a problem/misconfiguration?

 On 31 May 2015 at 22:48, Igor Berman igor.ber...@gmail.com wrote:

 Hi
 We are using spark 1.3.1
 Avro-chill (tomorrow will check if its important) we register avro
 classes from java
 Avro 1.7.6
 On May 31, 2015 22:37, Josh Rosen rosenvi...@gmail.com wrote:

 Which Spark version are you using?  I'd like to understand whether this
 change could be caused by recent Kryo serializer re-use changes in master /
 Spark 1.4.

 On Sun, May 31, 2015 at 11:31 AM, igor.berman igor.ber...@gmail.com
 wrote:

 after investigation the problem is somehow connected to avro
 serialization
 with kryo + chill-avro (mapping the avro object to a simple scala case class
 and
 running reduce on these case class objects solves the problem)




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/union-and-reduceByKey-wrong-shuffle-tp23092p23093.html
 Sent from the Apache Spark User List mailing list archive at
 Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org