Re: Why are there different parts in my CSV?

2015-02-14 Thread Akhil Das
You can directly write to HBase with Spark. Here's an example of doing
that: https://issues.apache.org/jira/browse/SPARK-944
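
For illustration, a minimal sketch of the TableOutputFormat pattern that the JIRA above discusses, assuming an HBase-0.98-era client on the classpath and a pre-created table; the table name "wordcounts" and column family "cf" are hypothetical placeholders, not details from this thread:

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.{SparkConf, SparkContext}

object HBaseWriteSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("hbase-write-sketch"))

    // Point the output format at the (hypothetical) pre-created HBase table.
    val hbaseConf = HBaseConfiguration.create()
    hbaseConf.set(TableOutputFormat.OUTPUT_TABLE, "wordcounts")
    val job = Job.getInstance(hbaseConf)
    job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])

    // Turn each (word, count) pair into an HBase Put and save the whole RDD.
    sc.parallelize(Seq(("spark", 3), ("hbase", 2)))
      .map { case (word, n) =>
        val put = new Put(Bytes.toBytes(word))
        put.add(Bytes.toBytes("cf"), Bytes.toBytes("count"), Bytes.toBytes(n.toString))
        (new ImmutableBytesWritable, put)
      }
      .saveAsNewAPIHadoopDataset(job.getConfiguration)

    sc.stop()
  }
}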

Thanks
Best Regards

On Sat, Feb 14, 2015 at 2:55 PM, Su She suhsheka...@gmail.com wrote:

 Hello Akhil, thank you for your continued help!

 1) So, if I can write it programmatically after every batch, then
 technically I should be able to have just the csv files in one directory.
 However, can the /desired/output/file.txt be in hdfs? If it is only local,
 I am not sure if it will help me for my use case I describe in 2)

 so can i do something like this hadoop fs -getmerge /output/dir/on/hdfs
 desired/dir/in/hdfs ?

 2) Just to make sure I am going on the right path...my end use case is to
 use hive or hbase to create a database off these csv files. Is there an
 easy way for hive to read /user/test/many sub directories/with one csv file
 in each into a table?

 Thank you!


 On Sat, Feb 14, 2015 at 12:39 AM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 Simplest way would be to merge the output files at the end of your job
 like:

 hadoop fs -getmerge /output/dir/on/hdfs/ /desired/local/output/file.txt

 If you want to do it programmatically, then you can use the
 FileUtil.copyMerge API, like:

 FileUtil.copyMerge(FileSystem of source (hdfs), /output-location,
 FileSystem of destination (hdfs), path to the merged file /merged-output,
 true (to delete the original dir), null)



 Thanks
 Best Regards

 On Sat, Feb 14, 2015 at 2:18 AM, Su She suhsheka...@gmail.com wrote:

 Thanks Akhil for the suggestion, it is now only giving me one part file.
 Is there any way I can just create a file rather than a directory? It
 doesn't seem like there is a saveAsTextFile option for
 JavaPairReceiverDStream.

 Also, for the copy/merge api, how would I add that to my spark job?

 Thanks Akhil!

 Best,

 Su

 On Thu, Feb 12, 2015 at 11:51 PM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 For a streaming application, it will create a new directory for every
 batch and put the data in it. If you don't want to have multiple part-*
 files inside the directory, then you can do a repartition before the
 saveAs* call.

 messages.repartition(1).saveAsHadoopFiles("hdfs://user/ec2-user/", "csv",
 String.class, String.class, (Class) TextOutputFormat.class);


 Thanks
 Best Regards

 On Fri, Feb 13, 2015 at 11:59 AM, Su She suhsheka...@gmail.com wrote:

 Hello Everyone,

 I am writing simple word counts to hdfs using
 messages.saveAsHadoopFiles("hdfs://user/ec2-user/", "csv", String.class,
 String.class, (Class) TextOutputFormat.class);

 1) However, every 2 seconds I am getting a new *directory* that is titled
 as a csv. So I'll have test.csv, which will be a directory that has two
 files inside of it called part-0 and part-1 (something like that).
 This obviously makes it very hard for me to read the data stored in the csv
 files. I am wondering if there is a better way to store the
 JavaPairReceiverDStream and JavaPairDStream?

 2) I know there is a copy/merge hadoop API for merging files...can
 this be done inside Java? I am not sure of the logic behind this API if I am
 using Spark Streaming, which is constantly making new files.

 Thanks a lot for the help!








Configuration Problem? (need help to get Spark job executed)

2015-02-14 Thread NORD SC
Hi all,

I am new to spark and seem to have hit a common newbie obstacle.

I have a pretty simple setup and job but I am unable to get past this error 
when executing a job:

"TaskSchedulerImpl: Initial job has not accepted any resources; check your
cluster UI to ensure that workers are registered and have sufficient memory"

I have so far gained a basic understanding of worker/executor/driver memory, 
but have run out of ideas what to try next - maybe someone has a clue.


My setup:

Three node standalone cluster with C* and spark on each node and the Datastax 
C*/Spark connector JAR placed on each node.

On the master I have the slaves configured in conf/slaves and I am using 
sbin/start-all.sh to start the whole cluster.

On each node I have this in conf/spark-defaults.conf

spark.master                   spark://devpeng-db-cassandra-1:7077
spark.eventLog.enabled         true
spark.serializer               org.apache.spark.serializer.KryoSerializer

spark.executor.extraClassPath  /opt/spark-cassandra-connector-assembly-1.2.0-alpha1.jar

and this in conf/spark-env.sh

SPARK_WORKER_MEMORY=6g



My App looks like this

object TestApp extends App {
  val conf = new SparkConf(true)
    .set("spark.cassandra.connection.host", "devpeng-db-cassandra-1.")
  val sc = new SparkContext("spark://devpeng-db-cassandra-1:7077", "testApp", conf)
  val rdd = sc.cassandraTable("test", "kv")
  println("Count: " + String.valueOf(rdd.count))
  println(rdd.first)
}

Any kind of idea what to check next would help me at this point, I think.

Jan

Log of the application start:

[info] Loading project definition from 
/Users/jan/projects/gkh/jump/workspace/gkh-spark-example/project
[info] Set current project to csconnect (in build 
file:/Users/jan/projects/gkh/jump/workspace/gkh-spark-example/)
[info] Compiling 1 Scala source to 
/Users/jan/projects/gkh/jump/workspace/gkh-spark-example/target/scala-2.10/classes...
[info] Running jump.TestApp 
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
15/02/14 10:30:11 INFO SecurityManager: Changing view acls to: jan
15/02/14 10:30:11 INFO SecurityManager: Changing modify acls to: jan
15/02/14 10:30:11 INFO SecurityManager: SecurityManager: authentication 
disabled; ui acls disabled; users with view permissions: Set(jan); users with 
modify permissions: Set(jan)
15/02/14 10:30:11 INFO Slf4jLogger: Slf4jLogger started
15/02/14 10:30:11 INFO Remoting: Starting remoting
15/02/14 10:30:12 INFO Remoting: Remoting started; listening on addresses 
:[akka.tcp://sparkDriver@xx:58197]
15/02/14 10:30:12 INFO Utils: Successfully started service 'sparkDriver' on 
port 58197.
15/02/14 10:30:12 INFO SparkEnv: Registering MapOutputTracker
15/02/14 10:30:12 INFO SparkEnv: Registering BlockManagerMaster
15/02/14 10:30:12 INFO DiskBlockManager: Created local directory at 
/var/folders/vr/w3whx92d0356g5nj1p6s59grgn/T/spark-local-20150214103012-5b53
15/02/14 10:30:12 INFO MemoryStore: MemoryStore started with capacity 530.3 MB
2015-02-14 10:30:12.304 java[24999:3b07] Unable to load realm info from 
SCDynamicStore
15/02/14 10:30:12 WARN NativeCodeLoader: Unable to load native-hadoop library 
for your platform... using builtin-java classes where applicable
15/02/14 10:30:12 INFO HttpFileServer: HTTP File server directory is 
/var/folders/vr/w3whx92d0356g5nj1p6s59grgn/T/spark-48459a22-c1ff-42d5-8b8e-cc89fe84933d
15/02/14 10:30:12 INFO HttpServer: Starting HTTP Server
15/02/14 10:30:12 INFO Utils: Successfully started service 'HTTP file server' 
on port 58198.
15/02/14 10:30:12 INFO Utils: Successfully started service 'SparkUI' on port 
4040.
15/02/14 10:30:12 INFO SparkUI: Started SparkUI at http://xx:4040
15/02/14 10:30:12 INFO AppClient$ClientActor: Connecting to master 
spark://devpeng-db-cassandra-1:7077...
15/02/14 10:30:13 INFO SparkDeploySchedulerBackend: Connected to Spark cluster 
with app ID app-20150214103013-0001
15/02/14 10:30:13 INFO AppClient$ClientActor: Executor added: 
app-20150214103013-0001/0 on 
worker-20150214102534-devpeng-db-cassandra-2.devpeng 
(devpeng-db-cassandra-2.devpeng.x:57563) with 8 cores
15/02/14 10:30:13 INFO SparkDeploySchedulerBackend: Granted executor ID 
app-20150214103013-0001/0 on hostPort devpeng-db-cassandra-2.devpeng.:57563 
with 8 cores, 512.0 MB RAM
15/02/14 10:30:13 INFO AppClient$ClientActor: Executor added: 
app-20150214103013-0001/1 on 
worker-20150214102534-devpeng-db-cassandra-3.devpeng.-38773 
(devpeng-db-cassandra-3.devpeng.xx:38773) with 8 cores
15/02/14 10:30:13 INFO SparkDeploySchedulerBackend: Granted executor ID 
app-20150214103013-0001/1 on hostPort 
devpeng-db-cassandra-3.devpeng.xe:38773 with 8 cores, 512.0 MB RAM
15/02/14 10:30:13 INFO AppClient$ClientActor: Executor updated: 
app-20150214103013-0001/0 is now LOADING
15/02/14 10:30:13 INFO AppClient$ClientActor: Executor updated: 
app-20150214103013-0001/1 is now LOADING
15/02/14 10:30:13 INFO AppClient$ClientActor: Executor updated: 

Re: Why are there different parts in my CSV?

2015-02-14 Thread Su She
http://stackoverflow.com/questions/23527941/how-to-write-to-csv-in-spark

Just read this...seems like it should be easily readable. Thanks!


On Sat, Feb 14, 2015 at 1:36 AM, Su She suhsheka...@gmail.com wrote:

 Thanks Akhil for the link. Is there a reason why there is a new directory
 created for each batch? Is this a format that is easily readable by other
 applications such as hive/impala?


 On Sat, Feb 14, 2015 at 1:28 AM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 You can directly write to HBase with Spark. Here's an example of doing
 that: https://issues.apache.org/jira/browse/SPARK-944

 Thanks
 Best Regards

 On Sat, Feb 14, 2015 at 2:55 PM, Su She suhsheka...@gmail.com wrote:

 Hello Akhil, thank you for your continued help!

 1) So, if I can write it programmatically after every batch, then
 technically I should be able to have just the csv files in one directory.
 However, can the /desired/output/file.txt be in hdfs? If it is only local,
 I am not sure if it will help me for my use case I describe in 2)

 so can i do something like this hadoop fs -getmerge /output/dir/on/hdfs
 desired/dir/in/hdfs ?

 2) Just to make sure I am going on the right path...my end use case is
 to use hive or hbase to create a database off these csv files. Is there an
 easy way for hive to read /user/test/many sub directories/with one csv file
 in each into a table?

 Thank you!


 On Sat, Feb 14, 2015 at 12:39 AM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 Simplest way would be to merge the output files at the end of your job
 like:

 hadoop fs -getmerge /output/dir/on/hdfs/ /desired/local/output/file.txt

 If you want to do it programmatically, then you can use the
 FileUtil.copyMerge API, like:

 FileUtil.copyMerge(FileSystem of source (hdfs), /output-location,
 FileSystem of destination (hdfs), path to the merged file /merged-output,
 true (to delete the original dir), null)



 Thanks
 Best Regards

 On Sat, Feb 14, 2015 at 2:18 AM, Su She suhsheka...@gmail.com wrote:

 Thanks Akhil for the suggestion, it is now only giving me one part file.
 Is there any way I can just create a file rather than a directory? It
 doesn't seem like there is a saveAsTextFile option for
 JavaPairReceiverDStream.

 Also, for the copy/merge api, how would I add that to my spark job?

 Thanks Akhil!

 Best,

 Su

 On Thu, Feb 12, 2015 at 11:51 PM, Akhil Das 
 ak...@sigmoidanalytics.com wrote:

 For a streaming application, it will create a new directory for every
 batch and put the data in it. If you don't want to have multiple part-*
 files inside the directory, then you can do a repartition before the
 saveAs* call.

 messages.repartition(1).saveAsHadoopFiles("hdfs://user/ec2-user/", "csv",
 String.class, String.class, (Class) TextOutputFormat.class);


 Thanks
 Best Regards

 On Fri, Feb 13, 2015 at 11:59 AM, Su She suhsheka...@gmail.com
 wrote:

 Hello Everyone,

 I am writing simple word counts to hdfs using
 messages.saveAsHadoopFiles("hdfs://user/ec2-user/", "csv", String.class,
 String.class, (Class) TextOutputFormat.class);

 1) However, every 2 seconds I am getting a new *directory* that is titled
 as a csv. So I'll have test.csv, which will be a directory that has two
 files inside of it called part-0 and part-1 (something like that).
 This obviously makes it very hard for me to read the data stored in the csv
 files. I am wondering if there is a better way to store the
 JavaPairReceiverDStream and JavaPairDStream?

 2) I know there is a copy/merge hadoop API for merging files...can
 this be done inside Java? I am not sure of the logic behind this API if I am
 using Spark Streaming, which is constantly making new files.

 Thanks a lot for the help!










Re: Why are there different parts in my CSV?

2015-02-14 Thread Akhil Das
Simplest way would be to merge the output files at the end of your job like:

hadoop fs -getmerge /output/dir/on/hdfs/ /desired/local/output/file.txt

If you want to do it programmatically, then you can use the
FileUtil.copyMerge API, like:

FileUtil.copyMerge(FileSystem of source (hdfs), /output-location,
FileSystem of destination (hdfs), path to the merged file /merged-output,
true (to delete the original dir), null)
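
For illustration, a minimal concrete sketch of that call, using the Hadoop FileUtil.copyMerge signature (srcFS, srcDir, dstFS, dstFile, deleteSource, conf, addString); the /streaming-output and /merged-output/output.csv paths are hypothetical placeholders, and source and destination are assumed to be the same HDFS filesystem:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}

val hadoopConf = new Configuration()
val fs = FileSystem.get(hadoopConf)               // source and destination FileSystem
FileUtil.copyMerge(
  fs, new Path("/streaming-output"),              // directory holding the part-* files
  fs, new Path("/merged-output/output.csv"),      // single merged file to create
  true,                                           // delete the source directory afterwards
  hadoopConf,
  null)                                           // no separator string between files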



Thanks
Best Regards

On Sat, Feb 14, 2015 at 2:18 AM, Su She suhsheka...@gmail.com wrote:

 Thanks Akhil for the suggestion, it is now only giving me one part file.
 Is there any way I can just create a file rather than a directory? It
 doesn't seem like there is a saveAsTextFile option for
 JavaPairReceiverDStream.

 Also, for the copy/merge api, how would I add that to my spark job?

 Thanks Akhil!

 Best,

 Su

 On Thu, Feb 12, 2015 at 11:51 PM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 For a streaming application, it will create a new directory for every
 batch and put the data in it. If you don't want to have multiple part-*
 files inside the directory, then you can do a repartition before the
 saveAs* call.

 messages.repartition(1).saveAsHadoopFiles("hdfs://user/ec2-user/", "csv",
 String.class, String.class, (Class) TextOutputFormat.class);


 Thanks
 Best Regards

 On Fri, Feb 13, 2015 at 11:59 AM, Su She suhsheka...@gmail.com wrote:

 Hello Everyone,

 I am writing simple word counts to hdfs using
 messages.saveAsHadoopFiles("hdfs://user/ec2-user/", "csv", String.class,
 String.class, (Class) TextOutputFormat.class);

 1) However, every 2 seconds I am getting a new *directory* that is titled
 as a csv. So I'll have test.csv, which will be a directory that has two
 files inside of it called part-0 and part-1 (something like that).
 This obviously makes it very hard for me to read the data stored in the csv
 files. I am wondering if there is a better way to store the
 JavaPairReceiverDStream and JavaPairDStream?

 2) I know there is a copy/merge hadoop API for merging files...can
 this be done inside Java? I am not sure of the logic behind this API if I am
 using Spark Streaming, which is constantly making new files.

 Thanks a lot for the help!






Strategy to automatically configure spark workers env params in standalone mode

2015-02-14 Thread Mike Sam
We are planning to use varying server specs (32 GB, 64 GB, 244 GB RAM or even
higher, and varying cores) for a standalone deployment of Spark, but we do
not know the spec of the server ahead of time and we need to script up some
logic that will run on the server on boot and automatically set the
following params on the server based on what it reads from the OS about cores
and memory:

SPARK_WORKER_CORES
SPARK_WORKER_MEMORY
SPARK_WORKER_INSTANCES

What should the logic of such a script be, based on the memory size and number
of cores seen by this script? In other words, what are the recommended rules
of thumb for dividing up a server (especially for larger RAM) without knowing
about the Spark application and data size ahead of time?

Thanks,
Mike


Re: SQLContext.applySchema strictness

2015-02-14 Thread nitin
AFAIK, this is the expected behavior. You have to make sure that the schema
matches the rows. It won't give any error when you apply the schema, as it
doesn't validate the nature of the data.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/SQLContext-applySchema-strictness-tp21650p21653.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: failing GraphX application ('GC overhead limit exceeded', 'Lost executor', 'Connection refused', etc.)

2015-02-14 Thread Matthew Cornell
Oops! I forgot to excerpt the errors and warnings from that file:

15/02/12 08:02:03 ERROR TaskSchedulerImpl: Lost executor 4 on 
compute-0-3.wright: remote Akka client disassociated

15/02/12 08:03:00 WARN TaskSetManager: Lost task 1.0 in stage 28.0 (TID 37, 
compute-0-1.wright): java.lang.OutOfMemoryError: GC overhead limit exceeded

15/02/12 08:05:06 WARN TaskSetManager: Lost task 0.0 in stage 31.1 (TID 48, 
compute-0-2.wright): FetchFailed(BlockManagerId(0, wright.cs.umass.edu, 60837), 
shuffleId=0, mapId=1, reduceId=1, message=
org.apache.spark.shuffle.FetchFailedException: Failed to connect to 
wright.cs.umass.edu/128.119.241.146:60837
Caused by: java.io.IOException: Failed to connect to 
wright.cs.umass.edu/128.119.241.146:60837
Caused by: java.net.ConnectException: Connection refused: 
wright.cs.umass.edu/128.119.241.146:60837


I see Lost executor messages on all nodes, not just the 0-3 one, so it's not 
node-specific.

Any ideas about how to fix this?

Thanks again,

matt


On Feb 12, 2015, at 10:46 AM, Matthew Cornell wrote:

 Hi Folks,
 
 I'm running a five-step path following-algorithm on a movie graph with 120K 
 verticies and 400K edges. The graph has vertices for actors, directors, 
 movies, users, and user ratings, and my Scala code is walking the path 
 rating -> movie -> rating -> user -> rating. There are 75K rating nodes and 
 each has ~100 edges. My program iterates over each path item, calling 
 aggregateMessages() then joinVertices() each time, and then processing that 
 result on the next iteration. The program never finishes the second 'rating' 
 step, which makes sense as, IIUC from my back-of-the-napkin estimate, the 
 intermediate result would have ~4B active vertices.
 
 Spark is version 1.2.0 and running in standalone mode on a small cluster of 
 five hosts: four compute nodes and a head node where the computes have 4 
 cores and 32GB RAM each, and the head has 32 cores and 128GB RAM. After 
 restarting Spark just now, the Master web UI shows 15 workers (5 dead), two 
 per node, with cores and memory listed as 32 (0 Used) and 125.0 GB (0.0 B 
 Used) on the two head node workers and 4 (0 Used) and 30.5 GB (0.0 B 
 Used) for the 8 workers running on the compute nodes. (Note: I don't 
 understand why it's configured to run two workers per node.) The small Spark 
 example programs run to completion.
 
 I've listed the console output at http://pastebin.com/DPECKgQ9 (I'm running 
 in spark-shell).
 
 I hope you can provide some advice on things to try next (e.g., configuration 
 vars). My guess is the cluster is running out of memory, though I think it 
 has adequate aggregate ram to handle this app.
 
 Thanks very much -- matt
 


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Why are there different parts in my CSV?

2015-02-14 Thread Su She
Hello Akhil, thank you for your continued help!

1) So, if I can write it programmatically after every batch, then
technically I should be able to have just the csv files in one directory.
However, can the /desired/output/file.txt be in hdfs? If it is only local,
I am not sure if it will help me for my use case I describe in 2)

So can I do something like this: hadoop fs -getmerge /output/dir/on/hdfs
desired/dir/in/hdfs ?

2) Just to make sure I am going on the right path...my end use case is to
use hive or hbase to create a database off these csv files. Is there an
easy way for hive to read /user/test/many sub directories/with one csv file
in each into a table?

Thank you!


On Sat, Feb 14, 2015 at 12:39 AM, Akhil Das ak...@sigmoidanalytics.com
wrote:

 Simplest way would be to merge the output files at the end of your job
 like:

 hadoop fs -getmerge /output/dir/on/hdfs/ /desired/local/output/file.txt

 If you want to do it programmatically, then you can use the
 FileUtil.copyMerge API, like:

 FileUtil.copyMerge(FileSystem of source (hdfs), /output-location,
 FileSystem of destination (hdfs), path to the merged file /merged-output,
 true (to delete the original dir), null)



 Thanks
 Best Regards

 On Sat, Feb 14, 2015 at 2:18 AM, Su She suhsheka...@gmail.com wrote:

 Thanks Akhil for the suggestion, it is now only giving me one part file.
 Is there any way I can just create a file rather than a directory? It
 doesn't seem like there is a saveAsTextFile option for
 JavaPairReceiverDStream.

 Also, for the copy/merge api, how would I add that to my spark job?

 Thanks Akhil!

 Best,

 Su

 On Thu, Feb 12, 2015 at 11:51 PM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 For a streaming application, it will create a new directory for every
 batch and put the data in it. If you don't want to have multiple part-*
 files inside the directory, then you can do a repartition before the
 saveAs* call.

 messages.repartition(1).saveAsHadoopFiles("hdfs://user/ec2-user/", "csv",
 String.class, String.class, (Class) TextOutputFormat.class);


 Thanks
 Best Regards

 On Fri, Feb 13, 2015 at 11:59 AM, Su She suhsheka...@gmail.com wrote:

 Hello Everyone,

 I am writing simple word counts to hdfs using
 messages.saveAsHadoopFiles("hdfs://user/ec2-user/", "csv", String.class,
 String.class, (Class) TextOutputFormat.class);

 1) However, every 2 seconds I am getting a new *directory* that is titled
 as a csv. So I'll have test.csv, which will be a directory that has two
 files inside of it called part-0 and part-1 (something like that).
 This obviously makes it very hard for me to read the data stored in the csv
 files. I am wondering if there is a better way to store the
 JavaPairReceiverDStream and JavaPairDStream?

 2) I know there is a copy/merge hadoop API for merging files...can
 this be done inside Java? I am not sure of the logic behind this API if I am
 using Spark Streaming, which is constantly making new files.

 Thanks a lot for the help!







Spark Web UI Doesn't Open in Yarn-Client Mode

2015-02-14 Thread Puneet Kumar Ojha
Hi,

I am running a 3-node Spark cluster on EMR. While running a job I see 1 executor
running. Does that mean only 1 of the nodes is being used? (It seems so from the
Spark documentation on the default mode (LOCAL).)

When I switch to yarn-client mode the Spark Web UI doesn't open. How do I view
the details of the running job now?





Re: Why are there different parts in my CSV?

2015-02-14 Thread Su She
Thanks Akhil for the link. Is there a reason why there is a new directory
created for each batch? Is this a format that is easily readable by other
applications such as hive/impala?


On Sat, Feb 14, 2015 at 1:28 AM, Akhil Das ak...@sigmoidanalytics.com
wrote:

 You can directly write to HBase with Spark. Here's an example of doing
 that: https://issues.apache.org/jira/browse/SPARK-944

 Thanks
 Best Regards

 On Sat, Feb 14, 2015 at 2:55 PM, Su She suhsheka...@gmail.com wrote:

 Hello Akhil, thank you for your continued help!

 1) So, if I can write it programmatically after every batch, then
 technically I should be able to have just the csv files in one directory.
 However, can the /desired/output/file.txt be in hdfs? If it is only local,
 I am not sure if it will help me for my use case I describe in 2)

 so can i do something like this hadoop fs -getmerge /output/dir/on/hdfs
 desired/dir/in/hdfs ?

 2) Just to make sure I am going on the right path...my end use case is to
 use hive or hbase to create a database off these csv files. Is there an
 easy way for hive to read /user/test/many sub directories/with one csv file
 in each into a table?

 Thank you!


 On Sat, Feb 14, 2015 at 12:39 AM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 Simplest way would be to merge the output files at the end of your job
 like:

 hadoop fs -getmerge /output/dir/on/hdfs/ /desired/local/output/file.txt

 If you want to do it programmatically, then you can use the
 FileUtil.copyMerge API, like:

 FileUtil.copyMerge(FileSystem of source (hdfs), /output-location,
 FileSystem of destination (hdfs), path to the merged file /merged-output,
 true (to delete the original dir), null)



 Thanks
 Best Regards

 On Sat, Feb 14, 2015 at 2:18 AM, Su She suhsheka...@gmail.com wrote:

 Thanks Akhil for the suggestion, it is now only giving me one part file.
 Is there any way I can just create a file rather than a directory? It
 doesn't seem like there is a saveAsTextFile option for
 JavaPairReceiverDStream.

 Also, for the copy/merge api, how would I add that to my spark job?

 Thanks Akhil!

 Best,

 Su

 On Thu, Feb 12, 2015 at 11:51 PM, Akhil Das ak...@sigmoidanalytics.com
  wrote:

 For a streaming application, it will create a new directory for every
 batch and put the data in it. If you don't want to have multiple part-*
 files inside the directory, then you can do a repartition before the
 saveAs* call.

 messages.repartition(1).saveAsHadoopFiles("hdfs://user/ec2-user/", "csv",
 String.class, String.class, (Class) TextOutputFormat.class);


 Thanks
 Best Regards

 On Fri, Feb 13, 2015 at 11:59 AM, Su She suhsheka...@gmail.com
 wrote:

 Hello Everyone,

 I am writing simple word counts to hdfs using
 messages.saveAsHadoopFiles("hdfs://user/ec2-user/", "csv", String.class,
 String.class, (Class) TextOutputFormat.class);

 1) However, every 2 seconds I am getting a new *directory* that is titled
 as a csv. So I'll have test.csv, which will be a directory that has two
 files inside of it called part-0 and part-1 (something like that).
 This obviously makes it very hard for me to read the data stored in the csv
 files. I am wondering if there is a better way to store the
 JavaPairReceiverDStream and JavaPairDStream?

 2) I know there is a copy/merge hadoop api for merging files...can
 this be done inside Java? I am not sure of the logic behind this API if I am
 using Spark Streaming, which is constantly making new files.

 Thanks a lot for the help!









Re: Why are there different parts in my CSV?

2015-02-14 Thread Sean Owen
Keep in mind that if you repartition to 1 partition, you are only
using 1 task to write the output, and potentially only 1 task to
compute some parent RDDs. You lose parallelism.  The
files-in-a-directory output scheme is standard for Hadoop and for a
reason.

Therefore I would consider separating this concern and merging the
files afterwards if you need to.

On Sat, Feb 14, 2015 at 8:39 AM, Akhil Das ak...@sigmoidanalytics.com wrote:
 Simplest way would be to merge the output files at the end of your job like:

 hadoop fs -getmerge /output/dir/on/hdfs/ /desired/local/output/file.txt

 If you want to do it programmatically, then you can use the
 FileUtil.copyMerge API, like:

 FileUtil.copyMerge(FileSystem of source (hdfs), /output-location,
 FileSystem of destination (hdfs), path to the merged file /merged-output,
 true (to delete the original dir), null)



 Thanks
 Best Regards

 On Sat, Feb 14, 2015 at 2:18 AM, Su She suhsheka...@gmail.com wrote:

 Thanks Akhil for the suggestion, it is now only giving me one part file.
 Is there any way I can just create a file rather than a directory? It doesn't
 seem like there is a saveAsTextFile option for JavaPairReceiverDStream.

 Also, for the copy/merge api, how would I add that to my spark job?

 Thanks Akhil!

 Best,

 Su

 On Thu, Feb 12, 2015 at 11:51 PM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 For a streaming application, it will create a new directory for every
 batch and put the data in it. If you don't want to have multiple part-*
 files inside the directory, then you can do a repartition before the
 saveAs* call.

 messages.repartition(1).saveAsHadoopFiles("hdfs://user/ec2-user/", "csv",
 String.class, String.class, (Class) TextOutputFormat.class);


 Thanks
 Best Regards

 On Fri, Feb 13, 2015 at 11:59 AM, Su She suhsheka...@gmail.com wrote:

 Hello Everyone,

 I am writing simple word counts to hdfs using
 messages.saveAsHadoopFiles("hdfs://user/ec2-user/", "csv", String.class,
 String.class, (Class) TextOutputFormat.class);

 1) However, every 2 seconds I am getting a new directory that is titled as a
 csv. So I'll have test.csv, which will be a directory that has two files
 inside of it called part-0 and part-1 (something like that). This
 obviously makes it very hard for me to read the data stored in the csv files. I am
 wondering if there is a better way to store the JavaPairReceiverDStream and
 JavaPairDStream?

 2) I know there is a copy/merge hadoop API for merging files...can this
 be done inside Java? I am not sure of the logic behind this API if I am using
 Spark Streaming, which is constantly making new files.

 Thanks a lot for the help!





-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



SparkStreaming Low Performance

2015-02-14 Thread Akhil Das
I'm getting low performance while parsing JSON data. My cluster setup is
Spark 1.2.0 with 10 nodes, each having 15 GB of memory and 4 cores.

I tried both scala.util.parsing.json.JSON and FasterXML's Jackson
parser.

This is what i basically do:

// Approach 1:
val jsonStream = myDStream.map(x => {
  val mapper = new ObjectMapper() with ScalaObjectMapper
  mapper.registerModule(DefaultScalaModule)
  mapper.readValue[Map[String, Any]](x)
})

jsonStream.count().print()


// Approach 2:
val jsonStream2 = myDStream.map(
  JSON.parseFull(_).get.asInstanceOf[scala.collection.immutable.Map[String, Any]])

jsonStream2.count().print()



It takes around 15-20 seconds to process/parse 35k JSON documents (containing
nested documents and arrays) which I put in the stream.

Is there any better approach/parser to process it faster? I also tried it
with mapPartitions but it did not make any difference.
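
For reference, a minimal sketch of the mapPartitions variant mentioned above, reusing one ObjectMapper per partition instead of one per record; it assumes the same jackson-module-scala classes as Approach 1 and is only a sketch, not code from this thread:

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper

// Approach 3 (sketch): build the mapper once per partition.
val jsonStream3 = myDStream.mapPartitions { records =>
  val mapper = new ObjectMapper() with ScalaObjectMapper
  mapper.registerModule(DefaultScalaModule)
  records.map(x => mapper.readValue[Map[String, Any]](x))
}

jsonStream3.count().print()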




Thanks
Best Regards


Re: Is Ubuntu server or desktop better for spark cluster

2015-02-14 Thread Sean Owen
I don't think this is very specific to Spark. Spark is not a desktop
user application. While it is perfectly possible to run it on such a
distro, I would think a server distro is more appropriate.

On Sat, Feb 14, 2015 at 3:05 PM, Joanne Contact joannenetw...@gmail.com wrote:
 Hi gurus,

 I am trying to install a real Linux machine (not a VM) where I will install
 Spark and also Hadoop. I plan on learning about clusters.

 I found Ubuntu has desktop and server versions. Does it matter?

 Thanks!!

 J
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: SparkStreaming Low Performance

2015-02-14 Thread Enno Shioji
I see. I'd really benchmark how the parsing performs outside Spark (in a
tight loop or something). If *that* is slow, you know it's the parsing. If
not, it's not the parsing.

Another thing you want to look at is CPU usage. If the actual parsing
really is the bottleneck, you should see very high CPU utilization. If not,
it's not the parsing per se but rather the ability to feed the messages to
the parsing library.



On Sat, Feb 14, 2015 at 2:30 PM, Akhil Das ak...@sigmoidanalytics.com
wrote:

 Ah my bad, it works without serializable exception. But not much
 performance difference is there though.

 Thanks
 Best Regards

 On Sat, Feb 14, 2015 at 7:45 PM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 Thanks for the suggestion, but doing that gives me this exception:

 http://pastebin.com/ni80NqKn

 Over this piece of code:

object Holder extends Serializable {
   @transient lazy val mapper = new ObjectMapper() with
 ScalaObjectMapper
   mapper.registerModule(DefaultScalaModule)
 }

 val jsonStream = myDStream.map(x= {
Holder.mapper.readValue[Map[String,Any]](x)
 })

 Thanks
 Best Regards

 On Sat, Feb 14, 2015 at 7:32 PM, Enno Shioji eshi...@gmail.com wrote:

 (adding back user)

 Fair enough. Regarding serialization exception, the hack I use is to
 have a object with a transient lazy field, like so:


 object Holder extends Serializable {
   @transient lazy val mapper = new ObjectMapper()
 }

 This way, the ObjectMapper will be instantiated at the destination and
 you can share the instance.



 ᐧ

 On Sat, Feb 14, 2015 at 1:52 PM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 Thanks for the reply Enno, in my case rate from the stream is not the
 bottleneck as i'm able to consume all those records at a time (have tested
 it). And regarding the ObjectMapper, if i take it outside of my map
 operation then it throws Serializable Exceptions (Caused by:
 java.io.NotSerializableException:
 com.fasterxml.jackson.module.scala.modifiers.SetTypeModifier).

 Thanks
 Best Regards

 On Sat, Feb 14, 2015 at 7:11 PM, Enno Shioji eshi...@gmail.com wrote:

 If I were you I'd first parse some test jsons in isolation (outside
 Spark) to determine if the bottleneck is really the parsing. There are
 plenty other places that could be affecting your performance, like the 
 rate
 you are able to read from your stream source etc.

 Apart from that, I notice that you are instantiating the ObjectMapper
 every time. This is quite expensive and jackson recommends you to share 
 the
 instance. However, if you tried other parsers / mapPartitions without
 success, this probably won't fix your problem either.





 On Sat, Feb 14, 2015 at 1:25 PM, Akhil Das ak...@sigmoidanalytics.com
  wrote:

 I'm getting a low performance while parsing json data. My cluster
 setup is 1.2.0 version of spark with 10 Nodes each having 15Gb of memory
 and 4 cores.

 I tried both scala.util.parsing.json.JSON and and fasterxml's Jackson
 parser.

 This is what i basically do:

 *//Approach 1:*
 val jsonStream = myDStream.map(x= {
   val mapper = new ObjectMapper() with ScalaObjectMapper
   mapper.registerModule(DefaultScalaModule)
   mapper.readValue[Map[String,Any]](x)
 })

 jsonStream.count().print()


 *//Approach 2:*
 val jsonStream2 =
 myDStream.map(JSON.parseFull(_).get.asInstanceOf[scala.collection.immutable.Map[String,
 Any]])

 jsonStream2.count().print()



 It takes around 15-20 Seconds to process/parse 35k json documents
 (contains nested documents and arrays) which i put in the stream.

 Is there any better approach/parser to process it faster? i also
 tried it with mapPartitions but it did not make any difference.




 Thanks
 Best Regards









Re: Spark Web UI Doesn't Open in Yarn-Client Mode

2015-02-14 Thread Silvio Fiorito
On YARN you need to first go to the ResourceManager UI, find your job, and
click the link for the UI there.

From: Puneet Kumar Ojha <puneet.ku...@pubmatic.com>
Sent: Saturday, February 14, 2015 5:25 AM
To: user@spark.apache.org

Hi,

I am running a 3-node Spark cluster on EMR. While running a job I see 1 executor
running. Does that mean only 1 of the nodes is being used? (It seems so from the
Spark documentation on the default mode (LOCAL).)

When I switch to yarn-client mode the Spark Web UI doesn't open. How do I view
the details of the running job now?





Re: SparkException: Task not serializable - Jackson Json

2015-02-14 Thread mickdelaney
To get past this you can move the mapper creation code down into the closure.
It's then created on the worker node so it doesn't need to be serialized.


// Parse it into a specific case class. We use flatMap to handle errors
// by returning an empty list (None) if we encounter an issue and a
// list with one element if everything is ok (Some(_)).
val result = input.flatMap(record => {
  try {
    val mapper = new ObjectMapper with ScalaObjectMapper
    mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
    mapper.registerModule(DefaultScalaModule)
    Some(mapper.readValue(record, classOf[Company]))
  } catch {
    case e: Exception => None
  }
})

result.map(mapper.writeValueAsString(_)).saveAsTextFile(outputFile) 
} 
}


BUT for more efficiency look into creating the mapper in a *mapPartitions* 
iterator, which means it'll be created on the worker node but only per
partition and not for every row like above.
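
For illustration only, a minimal sketch of that mapPartitions version under the same assumptions as the snippet above (input, Company and outputFile come from the original code; here the re-serialization is folded into the same pass):

// Sketch: create the mapper once per partition, not once per record.
val result = input.mapPartitions { records =>
  val mapper = new ObjectMapper with ScalaObjectMapper
  mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
  mapper.registerModule(DefaultScalaModule)
  records.flatMap { record =>
    try {
      Some(mapper.writeValueAsString(mapper.readValue(record, classOf[Company])))
    } catch {
      case e: Exception => None
    }
  }
}

result.saveAsTextFile(outputFile)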




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/SparkException-Task-not-serializable-Jackson-Json-tp21347p21655.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: SparkStreaming Low Performance

2015-02-14 Thread Akhil Das
Ah, my bad, it works without the serializable exception. But there is not much
of a performance difference though.

Thanks
Best Regards

On Sat, Feb 14, 2015 at 7:45 PM, Akhil Das ak...@sigmoidanalytics.com
wrote:

 Thanks for the suggestion, but doing that gives me this exception:

 http://pastebin.com/ni80NqKn

 Over this piece of code:

 object Holder extends Serializable {
   @transient lazy val mapper = new ObjectMapper() with ScalaObjectMapper
   mapper.registerModule(DefaultScalaModule)
 }

 val jsonStream = myDStream.map(x => {
   Holder.mapper.readValue[Map[String, Any]](x)
 })

 Thanks
 Best Regards

 On Sat, Feb 14, 2015 at 7:32 PM, Enno Shioji eshi...@gmail.com wrote:

 (adding back user)

 Fair enough. Regarding serialization exception, the hack I use is to have
 a object with a transient lazy field, like so:


 object Holder extends Serializable {
   @transient lazy val mapper = new ObjectMapper()
 }

 This way, the ObjectMapper will be instantiated at the destination and
 you can share the instance.



 ᐧ

 On Sat, Feb 14, 2015 at 1:52 PM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 Thanks for the reply Enno, in my case rate from the stream is not the
 bottleneck as i'm able to consume all those records at a time (have tested
 it). And regarding the ObjectMapper, if i take it outside of my map
 operation then it throws Serializable Exceptions (Caused by:
 java.io.NotSerializableException:
 com.fasterxml.jackson.module.scala.modifiers.SetTypeModifier).

 Thanks
 Best Regards

 On Sat, Feb 14, 2015 at 7:11 PM, Enno Shioji eshi...@gmail.com wrote:

 If I were you I'd first parse some test jsons in isolation (outside
 Spark) to determine if the bottleneck is really the parsing. There are
 plenty other places that could be affecting your performance, like the rate
 you are able to read from your stream source etc.

 Apart from that, I notice that you are instantiating the ObjectMapper
 every time. This is quite expensive and jackson recommends you to share the
 instance. However, if you tried other parsers / mapPartitions without
 success, this probably won't fix your problem either.





 On Sat, Feb 14, 2015 at 1:25 PM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 I'm getting a low performance while parsing json data. My cluster
 setup is 1.2.0 version of spark with 10 Nodes each having 15Gb of memory
 and 4 cores.

 I tried both scala.util.parsing.json.JSON and and fasterxml's Jackson
 parser.

 This is what i basically do:

 *//Approach 1:*
 val jsonStream = myDStream.map(x= {
   val mapper = new ObjectMapper() with ScalaObjectMapper
   mapper.registerModule(DefaultScalaModule)
   mapper.readValue[Map[String,Any]](x)
 })

 jsonStream.count().print()


 *//Approach 2:*
 val jsonStream2 =
 myDStream.map(JSON.parseFull(_).get.asInstanceOf[scala.collection.immutable.Map[String,
 Any]])

 jsonStream2.count().print()



 It takes around 15-20 Seconds to process/parse 35k json documents
 (contains nested documents and arrays) which i put in the stream.

 Is there any better approach/parser to process it faster? i also tried
 it with mapPartitions but it did not make any difference.




 Thanks
 Best Regards








Re: SparkStreaming Low Performance

2015-02-14 Thread Enno Shioji
Huh, that would come to 6.5ms per one JSON. That does feel like a lot but
if your JSON file is big enough, I guess you could get that sort of
processing time.

Jackson is more or less the most efficient JSON parser out there, so unless
the Scala API is somehow affecting it, I don't see any better way. If you
only need to read parts of the JSON, you could look into exploiting
Jackson's stream parsing API http://wiki.fasterxml.com/JacksonStreamingApi
.

I guess the good news is you can throw machines at it. You could also look
into other serialization frameworks.
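
To make the streaming-API idea concrete, here is a minimal sketch that pulls a single field out of each record with Jackson's streaming JsonParser instead of binding the whole document to a Map; the field name "userId" is a hypothetical placeholder and this is only an illustration of the API, not code from this thread:

import com.fasterxml.jackson.core.{JsonFactory, JsonToken}

// Returns the first occurrence of `field` anywhere in the document, if any.
def extractField(json: String, field: String): Option[String] = {
  val parser = new JsonFactory().createParser(json)
  try {
    var result: Option[String] = None
    while (result.isEmpty && parser.nextToken() != null) {
      if (parser.getCurrentToken == JsonToken.FIELD_NAME && parser.getCurrentName == field) {
        parser.nextToken()            // move to the field's value
        result = Option(parser.getText)
      }
    }
    result
  } finally {
    parser.close()
  }
}

val userIds = myDStream.map(extractField(_, "userId"))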




On Sat, Feb 14, 2015 at 2:49 PM, Akhil Das ak...@sigmoidanalytics.com
wrote:

 Thanks again!
 Its with the parser only, just tried the parser
 https://gist.github.com/akhld/3948a5d91d218eaf809d without Spark. And
 it took me 52 Sec to process 8k json records. Not sure if there's an
 efficient way to do this in Spark, i know if i use sparkSQL with schemaRDD
 and all it will be much faster, but i need that in SparkStreaming.

 Thanks
 Best Regards

 On Sat, Feb 14, 2015 at 8:04 PM, Enno Shioji eshi...@gmail.com wrote:

 I see. I'd really benchmark how the parsing performs outside Spark (in a
 tight loop or something). If *that* is slow, you know it's the parsing. If
 not, it's not the parsing.

 Another thing you want to look at is CPU usage. If the actual parsing
 really is the bottleneck, you should see very high CPU utilization. If not,
 it's not the parsing per se but rather the ability to feed the messages to
 the parsing library.


 ᐧ

 On Sat, Feb 14, 2015 at 2:30 PM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 Ah my bad, it works without serializable exception. But not much
 performance difference is there though.

 Thanks
 Best Regards

 On Sat, Feb 14, 2015 at 7:45 PM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 Thanks for the suggestion, but doing that gives me this exception:

 http://pastebin.com/ni80NqKn

 Over this piece of code:

object Holder extends Serializable {
   @transient lazy val mapper = new ObjectMapper() with
 ScalaObjectMapper
   mapper.registerModule(DefaultScalaModule)
 }

 val jsonStream = myDStream.map(x= {
Holder.mapper.readValue[Map[String,Any]](x)
 })

 Thanks
 Best Regards

 On Sat, Feb 14, 2015 at 7:32 PM, Enno Shioji eshi...@gmail.com wrote:

 (adding back user)

 Fair enough. Regarding serialization exception, the hack I use is to
 have a object with a transient lazy field, like so:


 object Holder extends Serializable {
   @transient lazy val mapper = new ObjectMapper()
 }

 This way, the ObjectMapper will be instantiated at the destination and
 you can share the instance.



 ᐧ

 On Sat, Feb 14, 2015 at 1:52 PM, Akhil Das ak...@sigmoidanalytics.com
  wrote:

 Thanks for the reply Enno, in my case rate from the stream is not the
 bottleneck as i'm able to consume all those records at a time (have 
 tested
 it). And regarding the ObjectMapper, if i take it outside of my map
 operation then it throws Serializable Exceptions (Caused by:
 java.io.NotSerializableException:
 com.fasterxml.jackson.module.scala.modifiers.SetTypeModifier).

 Thanks
 Best Regards

 On Sat, Feb 14, 2015 at 7:11 PM, Enno Shioji eshi...@gmail.com
 wrote:

 If I were you I'd first parse some test jsons in isolation (outside
 Spark) to determine if the bottleneck is really the parsing. There are
 plenty other places that could be affecting your performance, like the 
 rate
 you are able to read from your stream source etc.

 Apart from that, I notice that you are instantiating the
 ObjectMapper every time. This is quite expensive and jackson recommends 
 you
 to share the instance. However, if you tried other parsers / 
 mapPartitions
 without success, this probably won't fix your problem either.





 On Sat, Feb 14, 2015 at 1:25 PM, Akhil Das 
 ak...@sigmoidanalytics.com wrote:

 I'm getting a low performance while parsing json data. My cluster
 setup is 1.2.0 version of spark with 10 Nodes each having 15Gb of 
 memory
 and 4 cores.

 I tried both scala.util.parsing.json.JSON and and fasterxml's
 Jackson parser.

 This is what i basically do:

 *//Approach 1:*
 val jsonStream = myDStream.map(x= {
   val mapper = new ObjectMapper() with ScalaObjectMapper
   mapper.registerModule(DefaultScalaModule)
   mapper.readValue[Map[String,Any]](x)
 })

 jsonStream.count().print()


 *//Approach 2:*
 val jsonStream2 =
 myDStream.map(JSON.parseFull(_).get.asInstanceOf[scala.collection.immutable.Map[String,
 Any]])

 jsonStream2.count().print()



 It takes around 15-20 Seconds to process/parse 35k json documents
 (contains nested documents and arrays) which i put in the stream.

 Is there any better approach/parser to process it faster? i also
 tried it with mapPartitions but it did not make any difference.




 Thanks
 Best Regards











Is Ubuntu server or desktop better for spark cluster

2015-02-14 Thread Joanne Contact
Hi gurus,

I am trying to install a real Linux machine (not a VM) where I will install Spark
and also Hadoop. I plan on learning about clusters.

I found Ubuntu has desktop and server versions. Does it matter?

Thanks!!

J
-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: SparkStreaming Low Performance

2015-02-14 Thread Enno Shioji
(adding back user)

Fair enough. Regarding the serialization exception, the hack I use is to have an
object with a transient lazy field, like so:


object Holder extends Serializable {
  @transient lazy val mapper = new ObjectMapper()
}

This way, the ObjectMapper will be instantiated at the destination and you
can share the instance.




On Sat, Feb 14, 2015 at 1:52 PM, Akhil Das ak...@sigmoidanalytics.com
wrote:

 Thanks for the reply Enno, in my case rate from the stream is not the
 bottleneck as i'm able to consume all those records at a time (have tested
 it). And regarding the ObjectMapper, if i take it outside of my map
 operation then it throws Serializable Exceptions (Caused by:
 java.io.NotSerializableException:
 com.fasterxml.jackson.module.scala.modifiers.SetTypeModifier).

 Thanks
 Best Regards

 On Sat, Feb 14, 2015 at 7:11 PM, Enno Shioji eshi...@gmail.com wrote:

 If I were you I'd first parse some test jsons in isolation (outside
 Spark) to determine if the bottleneck is really the parsing. There are
 plenty other places that could be affecting your performance, like the rate
 you are able to read from your stream source etc.

 Apart from that, I notice that you are instantiating the ObjectMapper
 every time. This is quite expensive and jackson recommends you to share the
 instance. However, if you tried other parsers / mapPartitions without
 success, this probably won't fix your problem either.





 On Sat, Feb 14, 2015 at 1:25 PM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 I'm getting a low performance while parsing json data. My cluster setup
 is 1.2.0 version of spark with 10 Nodes each having 15Gb of memory and 4
 cores.

 I tried both scala.util.parsing.json.JSON and and fasterxml's Jackson
 parser.

 This is what i basically do:

 *//Approach 1:*
 val jsonStream = myDStream.map(x= {
   val mapper = new ObjectMapper() with ScalaObjectMapper
   mapper.registerModule(DefaultScalaModule)
   mapper.readValue[Map[String,Any]](x)
 })

 jsonStream.count().print()


 *//Approach 2:*
 val jsonStream2 =
 myDStream.map(JSON.parseFull(_).get.asInstanceOf[scala.collection.immutable.Map[String,
 Any]])

 jsonStream2.count().print()



 It takes around 15-20 Seconds to process/parse 35k json documents
 (contains nested documents and arrays) which i put in the stream.

 Is there any better approach/parser to process it faster? i also tried
 it with mapPartitions but it did not make any difference.




 Thanks
 Best Regards






Re: SparkStreaming Low Performance

2015-02-14 Thread Akhil Das
Thanks for the suggestion, but doing that gives me this exception:

http://pastebin.com/ni80NqKn

Over this piece of code:

object Holder extends Serializable {
  @transient lazy val mapper = new ObjectMapper() with ScalaObjectMapper
  mapper.registerModule(DefaultScalaModule)
}

val jsonStream = myDStream.map(x => {
  Holder.mapper.readValue[Map[String, Any]](x)
})

Thanks
Best Regards

On Sat, Feb 14, 2015 at 7:32 PM, Enno Shioji eshi...@gmail.com wrote:

 (adding back user)

 Fair enough. Regarding serialization exception, the hack I use is to have
 a object with a transient lazy field, like so:


 object Holder extends Serializable {
   @transient lazy val mapper = new ObjectMapper()
 }

 This way, the ObjectMapper will be instantiated at the destination and you
 can share the instance.



 ᐧ

 On Sat, Feb 14, 2015 at 1:52 PM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 Thanks for the reply Enno, in my case rate from the stream is not the
 bottleneck as i'm able to consume all those records at a time (have tested
 it). And regarding the ObjectMapper, if i take it outside of my map
 operation then it throws Serializable Exceptions (Caused by:
 java.io.NotSerializableException:
 com.fasterxml.jackson.module.scala.modifiers.SetTypeModifier).

 Thanks
 Best Regards

 On Sat, Feb 14, 2015 at 7:11 PM, Enno Shioji eshi...@gmail.com wrote:

 If I were you I'd first parse some test jsons in isolation (outside
 Spark) to determine if the bottleneck is really the parsing. There are
 plenty other places that could be affecting your performance, like the rate
 you are able to read from your stream source etc.

 Apart from that, I notice that you are instantiating the ObjectMapper
 every time. This is quite expensive and jackson recommends you to share the
 instance. However, if you tried other parsers / mapPartitions without
 success, this probably won't fix your problem either.





 On Sat, Feb 14, 2015 at 1:25 PM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 I'm getting a low performance while parsing json data. My cluster setup
 is 1.2.0 version of spark with 10 Nodes each having 15Gb of memory and 4
 cores.

 I tried both scala.util.parsing.json.JSON and and fasterxml's Jackson
 parser.

 This is what i basically do:

 *//Approach 1:*
 val jsonStream = myDStream.map(x= {
   val mapper = new ObjectMapper() with ScalaObjectMapper
   mapper.registerModule(DefaultScalaModule)
   mapper.readValue[Map[String,Any]](x)
 })

 jsonStream.count().print()


 *//Approach 2:*
 val jsonStream2 =
 myDStream.map(JSON.parseFull(_).get.asInstanceOf[scala.collection.immutable.Map[String,
 Any]])

 jsonStream2.count().print()



 It takes around 15-20 Seconds to process/parse 35k json documents
 (contains nested documents and arrays) which i put in the stream.

 Is there any better approach/parser to process it faster? i also tried
 it with mapPartitions but it did not make any difference.




 Thanks
 Best Regards







Re: SparkStreaming Low Performance

2015-02-14 Thread Akhil Das
Thanks again!
It's with the parser only; I just tried the parser
https://gist.github.com/akhld/3948a5d91d218eaf809d without Spark, and it
took me 52 sec to process 8k JSON records. Not sure if there's an efficient
way to do this in Spark; I know if I use Spark SQL with SchemaRDD and all it
will be much faster, but I need that in Spark Streaming.

Thanks
Best Regards

On Sat, Feb 14, 2015 at 8:04 PM, Enno Shioji eshi...@gmail.com wrote:

 I see. I'd really benchmark how the parsing performs outside Spark (in a
 tight loop or something). If *that* is slow, you know it's the parsing. If
 not, it's not the parsing.

 Another thing you want to look at is CPU usage. If the actual parsing
 really is the bottleneck, you should see very high CPU utilization. If not,
 it's not the parsing per se but rather the ability to feed the messages to
 the parsing library.


 ᐧ

 On Sat, Feb 14, 2015 at 2:30 PM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 Ah my bad, it works without serializable exception. But not much
 performance difference is there though.

 Thanks
 Best Regards

 On Sat, Feb 14, 2015 at 7:45 PM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 Thanks for the suggestion, but doing that gives me this exception:

 http://pastebin.com/ni80NqKn

 Over this piece of code:

object Holder extends Serializable {
   @transient lazy val mapper = new ObjectMapper() with
 ScalaObjectMapper
   mapper.registerModule(DefaultScalaModule)
 }

 val jsonStream = myDStream.map(x= {
Holder.mapper.readValue[Map[String,Any]](x)
 })

 Thanks
 Best Regards

 On Sat, Feb 14, 2015 at 7:32 PM, Enno Shioji eshi...@gmail.com wrote:

 (adding back user)

 Fair enough. Regarding serialization exception, the hack I use is to
 have a object with a transient lazy field, like so:


 object Holder extends Serializable {
   @transient lazy val mapper = new ObjectMapper()
 }

 This way, the ObjectMapper will be instantiated at the destination and
 you can share the instance.



 ᐧ

 On Sat, Feb 14, 2015 at 1:52 PM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 Thanks for the reply Enno, in my case rate from the stream is not the
 bottleneck as i'm able to consume all those records at a time (have tested
 it). And regarding the ObjectMapper, if i take it outside of my map
 operation then it throws Serializable Exceptions (Caused by:
 java.io.NotSerializableException:
 com.fasterxml.jackson.module.scala.modifiers.SetTypeModifier).

 Thanks
 Best Regards

 On Sat, Feb 14, 2015 at 7:11 PM, Enno Shioji eshi...@gmail.com
 wrote:

 If I were you I'd first parse some test jsons in isolation (outside
 Spark) to determine if the bottleneck is really the parsing. There are
 plenty other places that could be affecting your performance, like the 
 rate
 you are able to read from your stream source etc.

 Apart from that, I notice that you are instantiating the ObjectMapper
 every time. This is quite expensive and jackson recommends you to share 
 the
 instance. However, if you tried other parsers / mapPartitions without
 success, this probably won't fix your problem either.





 On Sat, Feb 14, 2015 at 1:25 PM, Akhil Das 
 ak...@sigmoidanalytics.com wrote:

 I'm getting a low performance while parsing json data. My cluster
 setup is 1.2.0 version of spark with 10 Nodes each having 15Gb of memory
 and 4 cores.

 I tried both scala.util.parsing.json.JSON and and fasterxml's
 Jackson parser.

 This is what i basically do:

 *//Approach 1:*
 val jsonStream = myDStream.map(x= {
   val mapper = new ObjectMapper() with ScalaObjectMapper
   mapper.registerModule(DefaultScalaModule)
   mapper.readValue[Map[String,Any]](x)
 })

 jsonStream.count().print()


 *//Approach 2:*
 val jsonStream2 =
 myDStream.map(JSON.parseFull(_).get.asInstanceOf[scala.collection.immutable.Map[String,
 Any]])

 jsonStream2.count().print()



 It takes around 15-20 Seconds to process/parse 35k json documents
 (contains nested documents and arrays) which i put in the stream.

 Is there any better approach/parser to process it faster? i also
 tried it with mapPartitions but it did not make any difference.




 Thanks
 Best Regards










Re: Is Ubuntu server or desktop better for spark cluster

2015-02-14 Thread Deepak Vohra
For a beginner, Ubuntu Desktop is recommended as it includes a GUI and is easier
to install. Also refer to the ServerFaq page on the Ubuntu Community Help Wiki:

ServerFaq - Community Help Wiki (help.ubuntu.com): Frequently Asked Questions about
the Ubuntu Server Edition, intended to help system administrators and users of the
Ubuntu Server edition.

    From: Joanne Contact joannenetw...@gmail.com
 To: user@spark.apache.org user@spark.apache.org 
 Sent: Saturday, February 14, 2015 7:05 AM
 Subject: Is Ubuntu server or desktop better for spark cluster
   
Hi gurus,

I am trying to install a real Linux machine (not a VM) where I will install Spark
and also Hadoop. I plan on learning about clusters.

I found Ubuntu has desktop and server versions. Does it matter?

Thanks!!

J
-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org


  

Re: SparkSQL and star schema

2015-02-14 Thread Michael Armbrust
Yes. Though for good performance it is usually important to make sure that
you have statistics for the smaller dimension tables. Today that can be
done by creating them in the Hive metastore and running ANALYZE TABLE
<table> COMPUTE STATISTICS noscan.

In Spark 1.3 this will happen automatically when you create a table using
the datasources API.

CREATE TABLE myTable
USING parquet
OPTIONS (path "/...")
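
For context, a minimal sketch of issuing both statements from Scala through a HiveContext (assuming Spark built with Hive support); the table name dim_customers and the path are hypothetical placeholders, not details from this thread:

import org.apache.spark.sql.hive.HiveContext

val hiveContext = new HiveContext(sc)   // sc: an existing SparkContext

// Collect statistics on a small dimension table so the planner can broadcast it.
hiveContext.sql("ANALYZE TABLE dim_customers COMPUTE STATISTICS noscan")

// Spark 1.3-style datasources table backed by Parquet files.
hiveContext.sql("CREATE TABLE myTable USING parquet OPTIONS (path '/data/myTable')")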


On Fri, Feb 13, 2015 at 2:06 AM, Paolo Platter paolo.plat...@agilelab.it
wrote:

  Hi,

  is SparkSQL + Parquet suitable to replicate a star schema ?

  Paolo Platter
 AgileLab CTO




Re: Shuffle write increases in spark 1.2

2015-02-14 Thread Peng Cheng
I double-checked the 1.2 feature list and found out that the new sort-based
shuffle manager has nothing to do with HashPartitioner :- Sorry for the
misinformation.

On the other hand, this may explain the increase in shuffle spill as a side effect
of the new shuffle manager; let me revert spark.shuffle.manager to hash and
see if it makes things better (or worse, as the benchmark in
https://issues.apache.org/jira/browse/SPARK-3280 indicates).
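
For reference, a minimal sketch of reverting the shuffle manager, which defaults to sort in Spark 1.2; the same key can also be set in conf/spark-defaults.conf:

val conf = new SparkConf()
  .set("spark.shuffle.manager", "hash")   // revert from the 1.2 default of "sort"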



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Shuffle-write-increases-in-spark-1-2-tp20894p21657.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Shuffle write increases in spark 1.2

2015-02-14 Thread Peng Cheng
Same problem here: shuffle write increased from 10G to over 64G. Since I'm
running on Amazon EC2, this always causes the temporary folder to consume all
the disk space. Still looking for a solution.

BTW, the 64G shuffle write is encountered when shuffling a pairRDD with
HashPartitioner, so it's not related to Spark 1.2.0's new features.
Yours Peng



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Shuffle-write-increases-in-spark-1-2-tp20894p21656.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: SQLContext.applySchema strictness

2015-02-14 Thread Michael Armbrust
Doing runtime type checking is very expensive, so we only do it when
necessary (i.e. you perform an operation like adding two columns together)
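
As a minimal sketch of that behavior (Spark 1.2-era API; sc and sqlContext are assumed to exist, and the field names are made up), applySchema succeeds even with a mismatched row, and the error only surfaces once the offending column is evaluated:

import org.apache.spark.sql._

val rows = sc.parallelize(Seq(Row("alice", "not-a-number")))  // second field is not an Int
val schema = StructType(Seq(
  StructField("name", StringType, nullable = true),
  StructField("age", IntegerType, nullable = true)))

val people = sqlContext.applySchema(rows, schema)  // no error at this point
people.registerTempTable("people")
// Only an operation that actually uses the column fails, e.g.:
// sqlContext.sql("SELECT age + 1 FROM people").collect()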

On Sat, Feb 14, 2015 at 2:19 AM, nitin nitin2go...@gmail.com wrote:

 AFAIK, this is the expected behavior. You have to make sure that the schema
 matches the row. It won't give any error when you apply the schema as it
 doesn't validate the nature of data.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/SQLContext-applySchema-strictness-tp21650p21653.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Build spark failed with maven

2015-02-14 Thread Olivier Girardot
Hi,
this was not reproduced for me, what kind of jdk are you using for the zinc
server ?

Regards,

Olivier.

2015-02-11 5:08 GMT+01:00 Yi Tian tianyi.asiai...@gmail.com:

  Hi, all

 I got an ERROR when building the Spark master branch with Maven (commit:
 2d1e916730492f5d61b97da6c483d3223ca44315)

 [INFO]
 [INFO] 
 
 [INFO] Building Spark Project Catalyst 1.3.0-SNAPSHOT
 [INFO] 
 
 [INFO]
 [INFO] --- maven-enforcer-plugin:1.3.1:enforce (enforce-versions) @ 
 spark-catalyst_2.10 ---
 [INFO]
 [INFO] --- build-helper-maven-plugin:1.8:add-source (add-scala-sources) @ 
 spark-catalyst_2.10 ---
 [INFO] Source directory: 
 /Users/tianyi/github/community/apache-spark/sql/catalyst/src/main/scala added.
 [INFO]
 [INFO] --- maven-remote-resources-plugin:1.5:process (default) @ 
 spark-catalyst_2.10 ---
 [INFO]
 [INFO] --- maven-resources-plugin:2.6:resources (default-resources) @ 
 spark-catalyst_2.10 ---
 [INFO] Using 'UTF-8' encoding to copy filtered resources.
 [INFO] skip non existing resourceDirectory 
 /Users/tianyi/github/community/apache-spark/sql/catalyst/src/main/resources
 [INFO] Copying 3 resources
 [INFO]
 [INFO] --- scala-maven-plugin:3.2.0:compile (scala-compile-first) @ 
 spark-catalyst_2.10 ---
 [INFO] Using zinc server for incremental compilation
 [INFO] compiler plugin: 
 BasicArtifact(org.scalamacros,paradise_2.10.4,2.0.1,null)
 [info] Compiling 69 Scala sources and 3 Java sources to 
 /Users/tianyi/github/community/apache-spark/sql/catalyst/target/scala-2.10/classes...[error]
  
 /Users/tianyi/github/community/apache-spark/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala:314:
  polymorphic expression cannot be instantiated to expected type;
 [error]  found   : [T(in method 
 apply)]org.apache.spark.sql.catalyst.dsl.ScalaUdfBuilder[T(in method apply)]
 [error]  required: 
 org.apache.spark.sql.catalyst.dsl.package.ScalaUdfBuilder[T(in method 
 functionToUdfBuilder)]
 [error]   implicit def functionToUdfBuilder[T: TypeTag](func: Function1[_, 
 T]): ScalaUdfBuilder[T] = ScalaUdfBuilder(func)

 Any suggestion?
 ​



Re: SQLContext.applySchema strictness

2015-02-14 Thread Nicholas Chammas
Would it make sense to add an optional validate parameter to applySchema()
which defaults to False, both to give users the option to check the schema
immediately and to make the default behavior clearer?
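
In the meantime, a purely hypothetical sketch of what such an eager check could look like in user code (this is not an existing API; it just compares each Row against the schema before calling applySchema, and only covers a few types):

import org.apache.spark.sql._

def conforms(row: Row, schema: StructType): Boolean =
  row.length == schema.fields.length &&
  schema.fields.zipWithIndex.forall { case (field, i) =>
    row.isNullAt(i) || (field.dataType match {
      case StringType  => row(i).isInstanceOf[String]
      case IntegerType => row(i).isInstanceOf[Int]
      case DoubleType  => row(i).isInstanceOf[Double]
      case _           => true  // other types left unchecked in this sketch
    })
  }

// rowRDD.filter(!conforms(_, schema)).count() would report mismatches eagerly.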
​

On Sat Feb 14 2015 at 9:18:59 AM Michael Armbrust mich...@databricks.com
wrote:

 Doing runtime type checking is very expensive, so we only do it when
 necessary (i.e. you perform an operation like adding two columns together)

 On Sat, Feb 14, 2015 at 2:19 AM, nitin nitin2go...@gmail.com wrote:

 AFAIK, this is the expected behavior. You have to make sure that the
 schema
 matches the row. It won't give any error when you apply the schema as it
 doesn't validate the nature of data.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/SQLContext-applySchema-strictness-tp21650p21653.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





Re: Why are there different parts in my CSV?

2015-02-14 Thread Su She
Thanks Sean and Akhil! I will take out the repartition(1).  Please let me
know if I understood this correctly; Spark Streaming writes data like this:

foo-1001.csv/part-x, part-x
foo-1002.csv/part-x, part-x

When I see this on Hue, the csv's appear to me as *directories*, but if I
understand correctly, they will appear as csv *files* to other hadoop
ecosystem tools? And, if I understand Tathagata's answer correctly, other
hadoop based ecosystems, such as Hive, will be able to create a table based
of the multiple foo-10x.csv directories?

Thank you, I really appreciate the help!

On Sat, Feb 14, 2015 at 3:20 AM, Sean Owen so...@cloudera.com wrote:

 Keep in mind that if you repartition to 1 partition, you are only
 using 1 task to write the output, and potentially only 1 task to
 compute some parent RDDs. You lose parallelism.  The
 files-in-a-directory output scheme is standard for Hadoop and for a
 reason.

 Therefore I would consider separating this concern and merging the
 files afterwards if you need to.

 On Sat, Feb 14, 2015 at 8:39 AM, Akhil Das ak...@sigmoidanalytics.com
 wrote:
  Simplest way would be to merge the output files at the end of your job
 like:
 
  hadoop fs -getmerge /output/dir/on/hdfs/ /desired/local/output/file.txt
 
  If you want to do it pro grammatically, then you can use the
  FileUtil.copyMerge API
  . like:
 
  FileUtil.copyMerge(FileSystem of source(hdfs), /output-location,
 FileSystem
  of destination(hdfs), Path to the merged files /merged-ouput, true(to
 delete
  the original dir),null)
 
 
 
  Thanks
  Best Regards
 
  On Sat, Feb 14, 2015 at 2:18 AM, Su She suhsheka...@gmail.com wrote:
 
  Thanks Akhil for the suggestion, it is now only giving me one part -
 .
  Is there anyway I can just create a file rather than a directory? It
 doesn't
  seem like there is just a saveAsTextFile option for
 JavaPairRecieverDstream.
 
  Also, for the copy/merge api, how would I add that to my spark job?
 
  Thanks Akhil!
 
  Best,
 
  Su
 
  On Thu, Feb 12, 2015 at 11:51 PM, Akhil Das ak...@sigmoidanalytics.com
 
  wrote:
 
  For streaming application, for every batch it will create a new
 directory
  and puts the data in it. If you don't want to have multiple files
 inside the
  directory as part- then you can do a repartition before the saveAs*
  call.
 
 
 
 messages.repartition(1).saveAsHadoopFiles(hdfs://user/ec2-user/,csv,String.class,
  String.class, (Class) TextOutputFormat.class);
 
 
  Thanks
  Best Regards
 
  On Fri, Feb 13, 2015 at 11:59 AM, Su She suhsheka...@gmail.com
 wrote:
 
  Hello Everyone,
 
  I am writing simple word counts to hdfs using
  messages.saveAsHadoopFiles(hdfs://user/ec2-user/,csv,String.class,
  String.class, (Class) TextOutputFormat.class);
 
  1) However, each 2 seconds I getting a new directory that is titled
 as a
  csv. So i'll have test.csv, which will be a directory that has two
 files
  inside of it called part-0 and part 1 (something like that).
 This
  obv makes it very hard for me to read the data stored in the csv
 files. I am
  wondering if there is a better way to store the
 JavaPairRecieverDStream and
  JavaPairDStream?
 
  2) I know there is a copy/merge hadoop api for merging files...can
 this
  be done inside java? I am not sure the logic behind this api if I am
 using
  spark streaming which is constantly making new files.
 
  Thanks a lot for the help!
 
 
 
 



Re: Why are there different parts in my CSV?

2015-02-14 Thread Su She
Okay, got it, thanks for the help Sean!


On Sat, Feb 14, 2015 at 1:08 PM, Sean Owen so...@cloudera.com wrote:

 No, they appear as directories + files to everything. Lots of tools
 are used to taking an input that is a directory of part files though.
 You can certainly point MR, Hive, etc at a directory of these files.

 On Sat, Feb 14, 2015 at 9:05 PM, Su She suhsheka...@gmail.com wrote:
  Thanks Sean and Akhil! I will take out the repartition(1).  Please let me
  know if I understood this correctly, Spark Streamingwrites data like
 this:
 
  foo-1001.csv/part -x, part-x
  foo-1002.csv/part -x, part-x
 
  When I see this on Hue, the csv's appear to me as directories, but if I
  understand correctly, they will appear as csv files to other hadoop
  ecosystem tools? And, if I understand Tathagata's answer correctly, other
  hadoop based ecosystems, such as Hive, will be able to create a table
 based
  of the multiple foo-10x.csv directories?
 
  Thank you, I really appreciate the help!
 
  On Sat, Feb 14, 2015 at 3:20 AM, Sean Owen so...@cloudera.com wrote:
 
  Keep in mind that if you repartition to 1 partition, you are only
  using 1 task to write the output, and potentially only 1 task to
  compute some parent RDDs. You lose parallelism.  The
  files-in-a-directory output scheme is standard for Hadoop and for a
  reason.
 
  Therefore I would consider separating this concern and merging the
  files afterwards if you need to.
 
  On Sat, Feb 14, 2015 at 8:39 AM, Akhil Das ak...@sigmoidanalytics.com
  wrote:
   Simplest way would be to merge the output files at the end of your job
   like:
  
   hadoop fs -getmerge /output/dir/on/hdfs/
 /desired/local/output/file.txt
  
   If you want to do it pro grammatically, then you can use the
   FileUtil.copyMerge API
   . like:
  
   FileUtil.copyMerge(FileSystem of source(hdfs), /output-location,
   FileSystem
   of destination(hdfs), Path to the merged files /merged-ouput, true(to
   delete
   the original dir),null)
  
  
  
   Thanks
   Best Regards
  
   On Sat, Feb 14, 2015 at 2:18 AM, Su She suhsheka...@gmail.com
 wrote:
  
   Thanks Akhil for the suggestion, it is now only giving me one part -
   .
   Is there anyway I can just create a file rather than a directory? It
   doesn't
   seem like there is just a saveAsTextFile option for
   JavaPairRecieverDstream.
  
   Also, for the copy/merge api, how would I add that to my spark job?
  
   Thanks Akhil!
  
   Best,
  
   Su
  
   On Thu, Feb 12, 2015 at 11:51 PM, Akhil Das
   ak...@sigmoidanalytics.com
   wrote:
  
   For streaming application, for every batch it will create a new
   directory
   and puts the data in it. If you don't want to have multiple files
   inside the
   directory as part- then you can do a repartition before the
   saveAs*
   call.
  
  
  
  
 messages.repartition(1).saveAsHadoopFiles(hdfs://user/ec2-user/,csv,String.class,
   String.class, (Class) TextOutputFormat.class);
  
  
   Thanks
   Best Regards
  
   On Fri, Feb 13, 2015 at 11:59 AM, Su She suhsheka...@gmail.com
   wrote:
  
   Hello Everyone,
  
   I am writing simple word counts to hdfs using
  
  
 messages.saveAsHadoopFiles(hdfs://user/ec2-user/,csv,String.class,
   String.class, (Class) TextOutputFormat.class);
  
   1) However, each 2 seconds I getting a new directory that is titled
   as a
   csv. So i'll have test.csv, which will be a directory that has two
   files
   inside of it called part-0 and part 1 (something like
 that).
   This
   obv makes it very hard for me to read the data stored in the csv
   files. I am
   wondering if there is a better way to store the
   JavaPairRecieverDStream and
   JavaPairDStream?
  
   2) I know there is a copy/merge hadoop api for merging files...can
   this
   be done inside java? I am not sure the logic behind this api if I
 am
   using
   spark streaming which is constantly making new files.
  
   Thanks a lot for the help!
  
  
  
  
 
 



Re: Why are there different parts in my CSV?

2015-02-14 Thread Sean Owen
No, they appear as directories + files to everything. Lots of tools
are used to taking an input that is a directory of part files though.
You can certainly point MR, Hive, etc at a directory of these files.
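
That said, if a single file is really needed, the copyMerge approach quoted below can be driven from code; a rough sketch (Hadoop 2.x API, hypothetical paths, final argument is a string appended between merged files):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}

val conf = new Configuration()
val fs = FileSystem.get(conf)

// Merge the part files of one batch directory into a single file;
// "true" deletes the source directory after a successful merge.
FileUtil.copyMerge(
  fs, new Path("hdfs:///user/ec2-user/foo-1001.csv"),
  fs, new Path("hdfs:///user/ec2-user/merged/foo-1001.csv"),
  true, conf, null)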

On Sat, Feb 14, 2015 at 9:05 PM, Su She suhsheka...@gmail.com wrote:
 Thanks Sean and Akhil! I will take out the repartition(1).  Please let me
 know if I understood this correctly, Spark Streamingwrites data like this:

 foo-1001.csv/part -x, part-x
 foo-1002.csv/part -x, part-x

 When I see this on Hue, the csv's appear to me as directories, but if I
 understand correctly, they will appear as csv files to other hadoop
 ecosystem tools? And, if I understand Tathagata's answer correctly, other
 hadoop based ecosystems, such as Hive, will be able to create a table based
 of the multiple foo-10x.csv directories?

 Thank you, I really appreciate the help!

 On Sat, Feb 14, 2015 at 3:20 AM, Sean Owen so...@cloudera.com wrote:

 Keep in mind that if you repartition to 1 partition, you are only
 using 1 task to write the output, and potentially only 1 task to
 compute some parent RDDs. You lose parallelism.  The
 files-in-a-directory output scheme is standard for Hadoop and for a
 reason.

 Therefore I would consider separating this concern and merging the
 files afterwards if you need to.

 On Sat, Feb 14, 2015 at 8:39 AM, Akhil Das ak...@sigmoidanalytics.com
 wrote:
  Simplest way would be to merge the output files at the end of your job
  like:
 
  hadoop fs -getmerge /output/dir/on/hdfs/ /desired/local/output/file.txt
 
  If you want to do it pro grammatically, then you can use the
  FileUtil.copyMerge API
  . like:
 
  FileUtil.copyMerge(FileSystem of source(hdfs), /output-location,
  FileSystem
  of destination(hdfs), Path to the merged files /merged-ouput, true(to
  delete
  the original dir),null)
 
 
 
  Thanks
  Best Regards
 
  On Sat, Feb 14, 2015 at 2:18 AM, Su She suhsheka...@gmail.com wrote:
 
  Thanks Akhil for the suggestion, it is now only giving me one part -
  .
  Is there anyway I can just create a file rather than a directory? It
  doesn't
  seem like there is just a saveAsTextFile option for
  JavaPairRecieverDstream.
 
  Also, for the copy/merge api, how would I add that to my spark job?
 
  Thanks Akhil!
 
  Best,
 
  Su
 
  On Thu, Feb 12, 2015 at 11:51 PM, Akhil Das
  ak...@sigmoidanalytics.com
  wrote:
 
  For streaming application, for every batch it will create a new
  directory
  and puts the data in it. If you don't want to have multiple files
  inside the
  directory as part- then you can do a repartition before the
  saveAs*
  call.
 
 
 
  messages.repartition(1).saveAsHadoopFiles(hdfs://user/ec2-user/,csv,String.class,
  String.class, (Class) TextOutputFormat.class);
 
 
  Thanks
  Best Regards
 
  On Fri, Feb 13, 2015 at 11:59 AM, Su She suhsheka...@gmail.com
  wrote:
 
  Hello Everyone,
 
  I am writing simple word counts to hdfs using
 
  messages.saveAsHadoopFiles(hdfs://user/ec2-user/,csv,String.class,
  String.class, (Class) TextOutputFormat.class);
 
  1) However, each 2 seconds I getting a new directory that is titled
  as a
  csv. So i'll have test.csv, which will be a directory that has two
  files
  inside of it called part-0 and part 1 (something like that).
  This
  obv makes it very hard for me to read the data stored in the csv
  files. I am
  wondering if there is a better way to store the
  JavaPairRecieverDStream and
  JavaPairDStream?
 
  2) I know there is a copy/merge hadoop api for merging files...can
  this
  be done inside java? I am not sure the logic behind this api if I am
  using
  spark streaming which is constantly making new files.
 
  Thanks a lot for the help!
 
 
 
 



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org