Re: read compressed hdfs files using SparkContext.textFile?

2015-09-08 Thread shenyan zhen
Realized I was using spark-shell, which was resolving the path as a local file.
By submitting a Spark job, the same code worked fine.
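
For reference, a minimal sketch of the explicit-scheme alternative, assuming the
cluster's default filesystem is HDFS; the namenode host and port below are
placeholders, and the directory is the one from the post below. textFile reads
the .gz part files and decompresses them transparently via the Hadoop gzip codec.

  // Qualify the path so spark-shell does not resolve it against the local filesystem.
  val t1 = sc.textFile("hdfs:///lz/streaming/am/144173460")
  // Or name the namenode explicitly (host/port are placeholders):
  // val t1 = sc.textFile("hdfs://namenode-host:8020/lz/streaming/am/144173460")
  println(t1.take(1).head)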

On Tue, Sep 8, 2015 at 3:13 PM, shenyan zhen  wrote:

> Hi,
>
> For HDFS files written with the code below:
>
> rdd.saveAsTextFile(getHdfsPath(...), classOf[org.apache.hadoop.io.compress.GzipCodec])
>
>
> I can see the HDFS files being generated:
>
>
> 0  /lz/streaming/am/144173460/_SUCCESS
>
> 1.6 M  /lz/streaming/am/144173460/part-0.gz
>
> 1.6 M  /lz/streaming/am/144173460/part-1.gz
>
> 1.6 M  /lz/streaming/am/144173460/part-2.gz
>
> ...
>
>
> How do I read it using SparkContext?
>
>
> My naive attempt:
>
> val t1 = sc.textFile("/lz/streaming/am/144173460")
>
> t1.take(1).head
>
> did not work:
>
>
> org.apache.hadoop.mapred.InvalidInputException: Input path does not exist:
> file:/lz/streaming/am/144173460
>
> at
> org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:285)
>
> at
> org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:228)
>
> at
> org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:304)
>
> at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:207)
>
> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
>
> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
>
> at scala.Option.getOrElse(Option.scala:120)
>
> at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
>
>
> Thanks,
>
> Shenyan
>
>
>
>
>


read compressed hdfs files using SparkContext.textFile?

2015-09-08 Thread shenyan zhen
Hi,

For HDFS files written with the code below:

rdd.saveAsTextFile(getHdfsPath(...), classOf[org.apache.hadoop.io.compress.GzipCodec])


I can see the HDFS files being generated:


0  /lz/streaming/am/144173460/_SUCCESS

1.6 M  /lz/streaming/am/144173460/part-0.gz

1.6 M  /lz/streaming/am/144173460/part-1.gz

1.6 M  /lz/streaming/am/144173460/part-2.gz

...


How do I read it using SparkContext?


My naive attempt:

val t1 = sc.textFile("/lz/streaming/am/144173460")

t1.take(1).head

did not work:


org.apache.hadoop.mapred.InvalidInputException: Input path does not exist:
file:/lz/streaming/am/144173460

at
org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:285)

at
org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:228)

at
org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:304)

at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:207)

at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)

at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)

at scala.Option.getOrElse(Option.scala:120)

at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)


Thanks,

Shenyan


Re: SparkContext initialization error- java.io.IOException: No space left on device

2015-09-06 Thread shenyan zhen
Thank you both - yup: the /tmp disk space was filled up:)
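
A hedged sketch of one way to avoid this in the future: point Spark's scratch
space at a larger disk. spark.local.dir is a standard Spark property, but the
path below is a placeholder, and YARN may override local dirs for executors; the
conf archive in the stack trace below is written under the driver JVM's temp
dir, so java.io.tmpdir may also need to point somewhere with free space.

  import org.apache.spark.{SparkConf, SparkContext}

  // Placeholder path; use a disk with enough free space and free inodes.
  val conf = new SparkConf()
    .setAppName("example")
    .set("spark.local.dir", "/data/spark-tmp")
  val sc = new SparkContext(conf)

  // For the driver JVM's temp dir, something like the following may help:
  //   spark-submit --driver-java-options "-Djava.io.tmpdir=/data/spark-tmp" ...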

On Sun, Sep 6, 2015 at 11:51 AM, Ted Yu  wrote:

> Use the following command if needed:
> df -i /tmp
>
> See
> https://wiki.gentoo.org/wiki/Knowledge_Base:No_space_left_on_device_while_there_is_plenty_of_space_available
>
> On Sun, Sep 6, 2015 at 6:15 AM, Shixiong Zhu  wrote:
>
>> The folder is in "/tmp" by default. Could you use "df -h" to check the
>> free space of /tmp?
>>
>> Best Regards,
>> Shixiong Zhu
>>
>> 2015-09-05 9:50 GMT+08:00 shenyan zhen :
>>
>>> Has anyone seen this error? Not sure which dir the program was trying to
>>> write to.
>>>
>>> I am running Spark 1.4.1, submitting a Spark job to YARN in yarn-client
>>> mode.
>>>
>>> 15/09/04 21:36:06 ERROR SparkContext: Error adding jar
>>> (java.io.IOException: No space left on device), was the --addJars option
>>> used?
>>>
>>> 15/09/04 21:36:08 ERROR SparkContext: Error initializing SparkContext.
>>>
>>> java.io.IOException: No space left on device
>>>
>>> at java.io.FileOutputStream.writeBytes(Native Method)
>>>
>>> at java.io.FileOutputStream.write(FileOutputStream.java:300)
>>>
>>> at
>>> java.util.zip.DeflaterOutputStream.deflate(DeflaterOutputStream.java:178)
>>>
>>> at java.util.zip.ZipOutputStream.closeEntry(ZipOutputStream.java:213)
>>>
>>> at java.util.zip.ZipOutputStream.finish(ZipOutputStream.java:318)
>>>
>>> at
>>> java.util.zip.DeflaterOutputStream.close(DeflaterOutputStream.java:163)
>>>
>>> at java.util.zip.ZipOutputStream.close(ZipOutputStream.java:338)
>>>
>>> at
>>> org.apache.spark.deploy.yarn.Client.createConfArchive(Client.scala:432)
>>>
>>> at
>>> org.apache.spark.deploy.yarn.Client.prepareLocalResources(Client.scala:338)
>>>
>>> at
>>> org.apache.spark.deploy.yarn.Client.createContainerLaunchContext(Client.scala:561)
>>>
>>> at
>>> org.apache.spark.deploy.yarn.Client.submitApplication(Client.scala:115)
>>>
>>> at
>>> org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.start(YarnClientSchedulerBackend.scala:57)
>>>
>>> at
>>> org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:141)
>>>
>>> at org.apache.spark.SparkContext.(SparkContext.scala:497)
>>>
>>> Thanks,
>>> Shenyan
>>>
>>
>>
>


SparkContext initialization error- java.io.IOException: No space left on device

2015-09-04 Thread shenyan zhen
Has anyone seen this error? Not sure which dir the program was trying to
write to.

I am running Spark 1.4.1, submitting a Spark job to YARN in yarn-client mode.

15/09/04 21:36:06 ERROR SparkContext: Error adding jar
(java.io.IOException: No space left on device), was the --addJars option
used?

15/09/04 21:36:08 ERROR SparkContext: Error initializing SparkContext.

java.io.IOException: No space left on device

at java.io.FileOutputStream.writeBytes(Native Method)

at java.io.FileOutputStream.write(FileOutputStream.java:300)

at java.util.zip.DeflaterOutputStream.deflate(DeflaterOutputStream.java:178)

at java.util.zip.ZipOutputStream.closeEntry(ZipOutputStream.java:213)

at java.util.zip.ZipOutputStream.finish(ZipOutputStream.java:318)

at java.util.zip.DeflaterOutputStream.close(DeflaterOutputStream.java:163)

at java.util.zip.ZipOutputStream.close(ZipOutputStream.java:338)

at org.apache.spark.deploy.yarn.Client.createConfArchive(Client.scala:432)

at
org.apache.spark.deploy.yarn.Client.prepareLocalResources(Client.scala:338)

at
org.apache.spark.deploy.yarn.Client.createContainerLaunchContext(Client.scala:561)

at org.apache.spark.deploy.yarn.Client.submitApplication(Client.scala:115)

at
org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.start(YarnClientSchedulerBackend.scala:57)

at
org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:141)

at org.apache.spark.SparkContext.(SparkContext.scala:497)

Thanks,
Shenyan


Re: Fighting against performance: JDBC RDD badly distributed

2015-07-28 Thread shenyan zhen
Saif,

I am guessing, since I'm not sure about your use case. Are you retrieving the
entire table into Spark? If yes, do you have a primary key on the table? If so,
JdbcRDD should be efficient. DataFrameReader.jdbc gives you more options;
again, it depends on your use case.

Could you describe your objective and show a code snippet?
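
For reference, a minimal sketch of the DataFrameReader.jdbc route in Spark 1.4
with explicit partitioning; the URL, table, column, bounds, and credentials are
placeholders, not values from this thread, and sqlContext is assumed to be a
SQLContext.

  import java.util.Properties

  val props = new Properties()
  props.setProperty("user", "...")      // placeholder credentials
  props.setProperty("password", "...")

  // columnName / lowerBound / upperBound / numPartitions control how the read
  // is split across partitions.
  val df = sqlContext.read.jdbc(
    "jdbc:...",    // placeholder vendor-specific JDBC URL
    "MY_TABLE",    // placeholder table name
    "ID",          // placeholder numeric partition column
    1L,            // lowerBound
    1000000L,      // upperBound
    32,            // numPartitions
    props)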

Shenyan


On Tue, Jul 28, 2015 at 3:23 PM,  wrote:

>  Thank you for your response Zhen,
>
>
>
> I am using some vendor-specific JDBC driver JAR file (honestly, I don't know
> where it came from). Its API is NOT like JdbcRDD; instead, it is more like jdbc
> from DataFrameReader
>
>
> https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrameReader
>
>
>
> So I ask two questions now:
>
> 1.   Will running a query using JdbcRDD prove better than bringing in an
> entire table as a DataFrame? Later on, I am converting back to RDDs.
>
> 2.   I lack proper criteria for deciding on a suitable column for
> distribution. My table has more than 400 columns.
>
>
>
> Saif
>
>
>
> *From:* shenyan zhen [mailto:shenya...@gmail.com]
> *Sent:* Tuesday, July 28, 2015 4:16 PM
> *To:* Ellafi, Saif A.
> *Cc:* user@spark.apache.org
> *Subject:* Re: Fighting against performance: JDBC RDD badly distributed
>
>
>
> Hi Saif,
>
>
>
> Are you using JdbcRDD directly from Spark?
>
> If yes, then the poor distribution could be due to the bound key you used.
>
>
>
> See the JdbcRDD Scala doc at
> https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.JdbcRDD
> :
>
> *sql*
>
> the text of the query. The query must contain two ? placeholders for
> parameters used to partition the results. E.g. "select title, author from
> books where ? <= id and id <= ?"
>
> *lowerBound*
>
> the minimum value of the first placeholder
>
> *upperBound*
>
> the maximum value of the second placeholder. The lower and upper bounds are
> inclusive.
>
> *numPartitions *
>
> the number of partitions. Given a lowerBound of 1, an upperBound of 20,
> and a numPartitions of 2, the query would be executed twice, once with (1,
> 10) and once with (11, 20)
>
>
>
> Shenyan
>
>
>
>
>
> On Tue, Jul 28, 2015 at 2:41 PM,  wrote:
>
> Hi all,
>
>
>
> I am experimenting and learning performance on big tasks locally, with a
> 32 cores node and more than 64GB of Ram, data is loaded from a database
> through JDBC driver, and launching heavy computations against it. I am
> presented with two questions:
>
>
>
> 1.   My RDD is poorly distributed. I am partitioning into 32 pieces,
> but the first 31 pieces are extremely lightweight compared to piece 32.
>
>
>
> 15/07/28 13:37:48 INFO Executor: Finished task 30.0 in stage 0.0 (TID 30).
> 1419 bytes result sent to driver
>
> 15/07/28 13:37:48 INFO TaskSetManager: Starting task 31.0 in stage 0.0
> (TID 31, localhost, PROCESS_LOCAL, 1539 bytes)
>
> 15/07/28 13:37:48 INFO Executor: Running task 31.0 in stage 0.0 (TID 31)
>
> 15/07/28 13:37:48 INFO TaskSetManager: Finished task 30.0 in stage 0.0
> (TID 30) in 2798 ms on localhost (31/32)
>
> 15/07/28 13:37:48 INFO CacheManager: Partition rdd_2_31 not found,
> computing it
>
> *...All pieces take 3 seconds while the last one takes around 15 minutes to
> compute...*
>
>
>
> Is there anything I can do about this, preferably without reshuffling, e.g.
> via the DataFrameReader JDBC options (lowerBound, upperBound, partition
> column)?
>
>
>
> 2.   After a long time of processing, I sometimes get OOMs; I haven't found
> a how-to for falling back and retrying from already persisted data to avoid
> losing time.
>
>
>
> Thanks,
>
> Saif
>
>
>
>
>


Re: Fighting against performance: JDBC RDD badly distributed

2015-07-28 Thread shenyan zhen
Hi Saif,

Are you using JdbcRDD directly from Spark?
If yes, then the poor distribution could be due to the bound key you used.

See the JdbcRDD Scala doc at
https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.JdbcRDD
:
sql

the text of the query. The query must contain two ? placeholders for
parameters used to partition the results. E.g. "select title, author from
books where ? <= id and id <= ?"
lowerBound

the minimum value of the first placeholder
upperBound

the maximum value of the second placeholder. The lower and upper bounds are
inclusive.
numPartitions

the number of partitions. Given a lowerBound of 1, an upperBound of 20, and
a numPartitions of 2, the query would be executed twice, once with (1, 10)
and once with (11, 20)
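
Putting the fields above together, a minimal JdbcRDD sketch; the connection URL
is a placeholder and the query is the example from the doc.

  import java.sql.DriverManager
  import org.apache.spark.rdd.JdbcRDD

  val booksRdd = new JdbcRDD(
    sc,
    () => DriverManager.getConnection("jdbc:..."),   // placeholder connection URL
    "select title, author from books where ? <= id and id <= ?",
    1,     // lowerBound, bound to the first ?
    20,    // upperBound, bound to the second ? (inclusive)
    2,     // numPartitions: the query runs with (1, 10) and (11, 20)
    rs => (rs.getString("title"), rs.getString("author")))   // mapRow

As noted above, if the values of the bound column are skewed across that range,
the resulting partitions will be skewed as well.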

Shenyan


On Tue, Jul 28, 2015 at 2:41 PM,  wrote:

>  Hi all,
>
> I am experimenting and learning performance on big tasks locally, with a
> 32 cores node and more than 64GB of Ram, data is loaded from a database
> through JDBC driver, and launching heavy computations against it. I am
> presented with two questions:
>
>
>    1. My RDD is poorly distributed. I am partitioning into 32 pieces, but
>    the first 31 pieces are extremely lightweight compared to piece 32.
>
>
> 15/07/28 13:37:48 INFO Executor: Finished task 30.0 in stage 0.0 (TID 30).
> 1419 bytes result sent to driver
> 15/07/28 13:37:48 INFO TaskSetManager: Starting task 31.0 in stage 0.0
> (TID 31, localhost, PROCESS_LOCAL, 1539 bytes)
> 15/07/28 13:37:48 INFO Executor: Running task 31.0 in stage 0.0 (TID 31)
> 15/07/28 13:37:48 INFO TaskSetManager: Finished task 30.0 in stage 0.0
> (TID 30) in 2798 ms on localhost (31/32)
> 15/07/28 13:37:48 INFO CacheManager: Partition rdd_2_31 not found,
> computing it
> *...All pieces take 3 seconds while the last one takes around 15 minutes to
> compute...*
>
> Is there anything I can do about this, preferably without reshuffling, e.g.
> via the DataFrameReader JDBC options (lowerBound, upperBound, partition
> column)?
>
>
>    1. After a long time of processing, I sometimes get OOMs; I haven't found
>    a how-to for falling back and retrying from already persisted data to avoid
>    losing time.
>
>
> Thanks,
> Saif
>
>


Re: Meets class not found error in spark console with newly hive context

2015-07-02 Thread shenyan zhen
In case it helps: I got around it temporarily by saving and resetting the
context class loader around creating the HiveContext.
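
A minimal sketch of that workaround, assuming sc is the shell's SparkContext;
whether it is still needed depends on the Spark version (see SPARK-8368 below).

  import org.apache.spark.sql.hive.HiveContext

  // Save the thread's context class loader, create the HiveContext, then restore it.
  val savedLoader = Thread.currentThread().getContextClassLoader
  val hiveContext =
    try {
      new HiveContext(sc)
    } finally {
      Thread.currentThread().setContextClassLoader(savedLoader)
    }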
On Jul 2, 2015 4:36 AM, "Terry Hole"  wrote:

> Found this a bug in spark 1.4.0: SPARK-8368
> 
>
> Thanks!
> Terry
>
> On Thu, Jul 2, 2015 at 1:20 PM, Terry Hole  wrote:
>
>> All,
>>
>> I am using the Spark console 1.4.0 to do some tests. When I create a new
>> HiveContext (line 18 in the code) in my test function, it always throws an
>> exception like the one below (it works in the Spark console 1.3.0), but if I
>> remove the HiveContext (line 18 in the code) from my function, it works fine.
>> Any idea what's wrong with this?
>>
>> java.lang.ClassNotFoundException:
>> $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$
>> iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$streamingTest$1$$anonfun$apply$1
>> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>> at java.lang.Class.forName0(Native Method)
>> at java.lang.Class.forName(Class.java:348)
>> at
>> org.apache.spark.util.InnerClosureFinder$$anon$4.visitMethodInsn(ClosureCleaner.scala:455)
>> at
>> com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.ClassReader.accept(Unknown
>> Source)
>> at
>> com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.ClassReader.accept(Unknown
>> Source)
>> at
>> org.apache.spark.util.ClosureCleaner$.getInnerClosureClasses(ClosureCleaner.scala:101)
>> at
>> org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:197)
>> at
>> org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:132)
>> at org.apache.spark.SparkContext.clean(SparkContext.scala:1891)
>> at
>> org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply$mcV$sp(DStream.scala:630)
>> at
>> org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:629)
>> at
>> org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:629)
>> at
>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:148)
>> at
>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:109)
>> at org.apache.spark.SparkContext.withScope(SparkContext.scala:681)
>> at
>> org.apache.spark.streaming.StreamingContext.withScope(StreamingContext.scala:258)
>> at
>> org.apache.spark.streaming.dstream.DStream.foreachRDD(DStream.scala:629)
>> at
>> $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC
>> $$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.streamingTest(:98)
>> at
>> $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC
>> $$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:93)
>> at
>> $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC
>> $$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:98)
>>
>>
>>
>>  1 import org.apache.spark._
>>  2 import org.apache.spark.SparkContext._
>>  3 import org.apache.spark.streaming.{ StreamingContext, Seconds, Minutes, Time }
>>  4 import org.apache.spark.streaming.StreamingContext._
>>  5 import org.apache.spark.rdd.RDD
>>  6 import org.apache.spark.streaming.dstream.DStream
>>  7 import org.apache.spark.HashPartitioner
>>  8 import org.apache.spark.storage.StorageLevel
>>  9 import org.apache.spark.sql._
>> 10 import org.apache.spark.sql.hive._
>> 11 import scala.collection.mutable.{Queue}
>> 12 import scala.concurrent.Future
>> 13 import scala.concurrent.ExecutionContext.Implicits.global
>> 14
>> 15 def streamingTest(args: Array[String]) {
>> 16   println(">>> create streamingContext.")
>> 17   val ssc = new StreamingContext(sc, Seconds(1))
>> 18   *val sqlContext2 = new HiveContext(sc)*
>> 19
>> 20   val accum = sc.accumulator(0, "End Accumulator")
>> 21   val queue = scala.collection.mutable.Queue(sc.textFile("G:/pipe/source"))
>> 22   val textSource = ssc.queueStream(queue, true)
>> 23   textSource.foreachRDD(rdd => { rdd.foreach( item => {accum += 1} ) })
>> 24   textSource.foreachRDD(rdd => {
>> 25     var sample = rdd.take(10)
>> 26     if (sample.length > 0) {
>> 27       sample.foreach(item => println("#= " + item))
>> 28     }
>> 29   })
>> 30   println(">>> Start streaming context.")
>> 31   ssc.start()
>> 32   val stopFunc = Future {var isRun = true; var