Re: flatMap output on disk / flatMap memory overhead

2015-08-01 Thread Puneet Kapoor
Hi Octavian,

Just out of curiosity, did you try persisting your RDD in a serialized
format, "MEMORY_AND_DISK_SER" or "MEMORY_ONLY_SER"? That is, changing your
"rdd.persist(MEMORY_AND_DISK)"
to
"rdd.persist(MEMORY_ONLY_SER)"

Regards

On Wed, Jun 10, 2015 at 7:27 AM, Imran Rashid  wrote:

> I agree with Richard.  It looks like the issue here is shuffling, and
> shuffle data is always written to disk, so the issue is definitely not that
> all the output of flatMap has to be stored in memory.
>
> If at all possible, I'd first suggest upgrading to a new version of spark
> -- even in 1.2, there were big improvements to shuffle with sort based
> shuffle as the default.
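On Spark 1.1.x the sort-based shuffle already exists; it is just not the
default yet. A minimal sketch of enabling it explicitly, assuming conf is the
same SparkConf object shown in the configuration further down the thread:

    // Hash-based shuffle is the default on Spark 1.1.x; "sort" selects the
    // sort-based shuffle that becomes the default in 1.2.
    conf.set("spark.shuffle.manager", "sort")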
>
> On Tue, Jun 2, 2015 at 1:09 PM, Richard Marscher wrote:
>
>> Are you sure it's memory related? What is the disk utilization and IO
>> performance on the workers? The error you posted looks to be related to
>> shuffle trying to obtain block data from another worker node and failing to
>> do so in reasonable amount of time. It may still be memory related, but I'm
>> not sure that other resources are ruled out yet.
>>
>> On Tue, Jun 2, 2015 at 5:10 AM, octavian.ganea <
>> octavian.ga...@inf.ethz.ch> wrote:
>>
>>> I tried using reduceByKey, without success.
>>>
>>> I also tried this: rdd.persist(MEMORY_AND_DISK).flatMap(...).reduceByKey.
>>> However, I got the same error as before, namely the error described here:
>>>
>>> http://apache-spark-user-list.1001560.n3.nabble.com/flatMap-output-on-disk-flatMap-memory-overhead-td23098.html
>>>
>>> My task is to count the frequencies of pairs of words that occur in a
>>> set of
>>> documents at least 5 times. I know that this final output is sparse and
>>> should comfortably fit in memory. However, the intermediate pairs that
>>> are
>>> spilled by flatMap might need to be stored on the disk, but I don't
>>> understand why the persist option does not work and my job fails.
>>>
>>> My code:
>>>
>>> rdd.persist(StorageLevel.MEMORY_AND_DISK)
>>>   .flatMap(x => outputPairsOfWords(x)) // outputs pairs of type ((word1, word2), 1)
>>>   .reduceByKey((a, b) => (a + b).toShort)
>>>   .filter({ case ((x, y), count) => count >= 5 })
>>>
>>>
>>> My cluster has 8 nodes, each with 129 GB of RAM and 16 cores. I keep one
>>> node for the master and 7 nodes for the workers.
>>>
>>> my conf:
>>>
>>> conf.set("spark.cores.max", "128")
>>> conf.set("spark.akka.frameSize", "1024")
>>> conf.set("spark.executor.memory", "115g")
>>> conf.set("spark.shuffle.file.buffer.kb", "1000")
>>>
>>> my spark-env.sh:
>>>  ulimit -n 20
>>>  SPARK_JAVA_OPTS="-Xss1g -Xmx129g -d64 -XX:-UseGCOverheadLimit
>>> -XX:-UseCompressedOops"
>>>  SPARK_DRIVER_MEMORY=129G
>>>
>>> spark version: 1.1.1
>>>
>>> Thanks a lot for your help!
>>>
>>>
>>>


Re: SaveAsTextFile brings down data nodes with IO Exceptions

2015-05-15 Thread Puneet Kapoor
I am seeing this on Hadoop 2.4.0.

Thanks for your suggestions, I will try those and let you know if they help!

On Sat, May 16, 2015 at 1:57 AM, Steve Loughran wrote:

>  What version of Hadoop are you seeing this on?
>
>
>  On 15 May 2015, at 20:03, Puneet Kapoor wrote:
>
>  Hey,
>
>  Did you find any solution for this issue, we are seeing similar logs in
> our Data node logs. Appreciate any help.
>
>
>
>
>
>  2015-05-15 10:51:43,615 ERROR
> org.apache.hadoop.hdfs.server.datanode.DataNode:
> NttUpgradeDN1:50010:DataXceiver error processing WRITE_BLOCK operation
>  src: /192.168.112.190:46253 dst: /192.168.151.104:50010
> java.net.SocketTimeoutException: 6 millis timeout while waiting for
> channel to be ready for read. ch :
> java.nio.channels.SocketChannel[connected local=/192.168.151.104:50010
> remote=/192.168.112.190:46253]
> at
> org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:164)
> at
> org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:161)
> at
> org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:131)
> at java.io.BufferedInputStream.fill(Unknown Source)
> at java.io.BufferedInputStream.read1(Unknown Source)
> at java.io.BufferedInputStream.read(Unknown Source)
> at java.io.DataInputStream.read(Unknown Source)
> at org.apache.hadoop.io.IOUtils.readFully(IOUtils.java:192)
> at
> org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.doReadFully(PacketReceiver.java:213)
> at
> org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.doRead(PacketReceiver.java:134)
> at
> org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.receiveNextPacket(PacketReceiver.java:109)
> at
> org.apache.hadoop.hdfs.server.datanode.BlockReceiver.receivePacket(BlockReceiver.java:446)
> at
> org.apache.hadoop.hdfs.server.datanode.BlockReceiver.receiveBlock(BlockReceiver.java:702)
> at
> org.apache.hadoop.hdfs.server.datanode.DataXceiver.writeBlock(DataXceiver.java:742)
> at
> org.apache.hadoop.hdfs.protocol.datatransfer.Receiver.opWriteBlock(Receiver.java:124)
> at
> org.apache.hadoop.hdfs.protocol.datatransfer.Receiver.processOp(Receiver.java:71)
> at
> org.apache.hadoop.hdfs.server.datanode.DataXceiver.run(DataXceiver.java:232)
> at java.lang.Thread.run(Unknown Source)
>
>
>  That's being logged @ error level in DN. It doesn't mean the DN has
> crashed, only that it timed out waiting for data: something has gone wrong
> elsewhere.
>
>  https://issues.apache.org/jira/browse/HDFS-693
>
>
> There are a couple of properties you can set to extend the timeouts:
>
>   <property>
>     <name>dfs.socket.timeout</name>
>     <value>2</value>
>   </property>
>
>   <property>
>     <name>dfs.datanode.socket.write.timeout</name>
>     <value>2</value>
>   </property>
>
>
>
> You can also increase the number of datanode transceiver threads that handle
> data IO across the network:
>
>
>   <property>
>     <name>dfs.datanode.max.xcievers</name>
>     <value>4096</value>
>   </property>
>
> Yes, that property really does have that exact spelling; it's easy to get wrong.
>
>


Re: SaveAsTextFile brings down data nodes with IO Exceptions

2015-05-15 Thread Puneet Kapoor
Hey,

Did you find any solution for this issue, we are seeing similar logs in our
Data node logs. Appreciate any help.


2015-05-15 10:51:43,615 ERROR
org.apache.hadoop.hdfs.server.datanode.DataNode:
NttUpgradeDN1:50010:DataXceiver error processing WRITE_BLOCK operation
 src: /192.168.112.190:46253 dst: /192.168.151.104:50010
java.net.SocketTimeoutException: 6 millis timeout while waiting for
channel to be ready for read. ch :
java.nio.channels.SocketChannel[connected local=/192.168.151.104:50010
remote=/192.168.112.190:46253]
at
org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:164)
at
org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:161)
at
org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:131)
at java.io.BufferedInputStream.fill(Unknown Source)
at java.io.BufferedInputStream.read1(Unknown Source)
at java.io.BufferedInputStream.read(Unknown Source)
at java.io.DataInputStream.read(Unknown Source)
at org.apache.hadoop.io.IOUtils.readFully(IOUtils.java:192)
at
org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.doReadFully(PacketReceiver.java:213)
at
org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.doRead(PacketReceiver.java:134)
at
org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.receiveNextPacket(PacketReceiver.java:109)
at
org.apache.hadoop.hdfs.server.datanode.BlockReceiver.receivePacket(BlockReceiver.java:446)
at
org.apache.hadoop.hdfs.server.datanode.BlockReceiver.receiveBlock(BlockReceiver.java:702)
at
org.apache.hadoop.hdfs.server.datanode.DataXceiver.writeBlock(DataXceiver.java:742)
at
org.apache.hadoop.hdfs.protocol.datatransfer.Receiver.opWriteBlock(Receiver.java:124)
at
org.apache.hadoop.hdfs.protocol.datatransfer.Receiver.processOp(Receiver.java:71)
at
org.apache.hadoop.hdfs.server.datanode.DataXceiver.run(DataXceiver.java:232)
at java.lang.Thread.run(Unknown Source)

Thanks
Puneet

On Wed, Dec 3, 2014 at 2:50 AM, Ganelin, Ilya wrote:

> Hi all, as the last stage of execution, I am writing out a dataset to disk.
> Before I do this, I force the DAG to resolve so this is the only job left in
> the pipeline. The dataset in question is not especially large (a few
> gigabytes). During this step, however, HDFS will inevitably crash. I lose
> connection to data nodes and get stuck in a loop of death: failure causes
> job restarts, eventually causing the overall job to fail. On the data node
> logs I see the errors below. Does anyone have any ideas as to what is going
> on here? Thanks!
>
>
> java.io.IOException: Premature EOF from inputStream
>   at org.apache.hadoop.io.IOUtils.readFully(IOUtils.java:194)
>   at 
> org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.doReadFully(PacketReceiver.java:213)
>   at 
> org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.doRead(PacketReceiver.java:134)
>   at 
> org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.receiveNextPacket(PacketReceiver.java:109)
>   at 
> org.apache.hadoop.hdfs.server.datanode.BlockReceiver.receivePacket(BlockReceiver.java:455)
>   at 
> org.apache.hadoop.hdfs.server.datanode.BlockReceiver.receiveBlock(BlockReceiver.java:741)
>   at 
> org.apache.hadoop.hdfs.server.datanode.DataXceiver.writeBlock(DataXceiver.java:718)
>   at 
> org.apache.hadoop.hdfs.protocol.datatransfer.Receiver.opWriteBlock(Receiver.java:126)
>   at 
> org.apache.hadoop.hdfs.protocol.datatransfer.Receiver.processOp(Receiver.java:72)
>   at 
> org.apache.hadoop.hdfs.server.datanode.DataXceiver.run(DataXceiver.java:225)
>   at java.lang.Thread.run(Thread.java:745)
>
>
>
>
> innovationdatanode03.cof.ds.capitalone.com:1004:DataXceiver error processing 
> WRITE_BLOCK operation  src: /10.37.248.60:44676 dst: /10.37.248.59:1004
> java.net.SocketTimeoutException: 65000 millis timeout while waiting for 
> channel to be ready for read. ch : java.nio.channels.SocketChannel[connected 
> local=/10.37.248.59:43692 remote=/10.37.248.63:1004]
>   at 
> org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:164)
>   at 
> org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:161)
>   at 
> org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:131)
>   at 
> org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:118)
>   at java.io.FilterInputStream.read(FilterInputStream.java:83)
>   at java.io.FilterInputStream.read(FilterInputStream.java:83)
>   at 
> org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed(PBHelper.java:2101)
>   at 
> org.apache.hadoop.hdfs.server.datanode.DataXceiver.writeBlock(DataXceiver.java:660)
>   at 
> org.apache.hadoop.hdfs.protocol.datatransfer.Receiver.opWriteBlock(Receiver.java:126)
>   at 
> org.apache.hadoop.hdfs.protocol.datatransfer.Re

Save RDD with partition information

2015-01-13 Thread Puneet Kapoor
Hi,

I have a use case in which an hourly Spark job creates hourly RDDs, which
are partitioned by keys.

At the end of the day I need to access all of these RDDs and combine the
Key/Value pairs over the day.

If there is a key K1 in RDD0 (1st hour of the day), RDD1, ..., RDD23 (last
hour of the day), we need to combine all the values of this K1 using some
logic.

What I want to do is avoid the shuffling at the end of the day, since the
data is huge (~ hundreds of GB).

Questions
---
1.) Is there a way that I can persist the hourly RDDs along with their
partition information, so that the partition information is restored when
the RDDs are read back?
2.) Can I ensure that the partitioning is the same for different hours? For
example, if K1 goes to container_X, it would go to the same container in the
next hour and so on. (See the sketch below.)
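
For illustration, a minimal sketch of the consistent partitioning that
question 2 describes. The RDD element type, the helper name, and the
partition count of 128 are assumptions for the example; the key point is
that reusing one fixed HashPartitioner across the hourly jobs maps a given
key to the same partition index every hour:

    import org.apache.spark.HashPartitioner
    import org.apache.spark.SparkContext._ // pair-RDD functions on Spark 1.x
    import org.apache.spark.rdd.RDD

    object ConsistentHourlyPartitioning {
      // One partitioner shared by all hourly jobs: the same key always
      // hashes to the same partition index.
      val dayPartitioner = new HashPartitioner(128) // 128 is illustrative

      // Hypothetical helper: partition an hourly (key, value) RDD
      // consistently before saving it.
      def partitionHour(hourly: RDD[(String, Long)]): RDD[(String, Long)] =
        hourly.partitionBy(dayPartitioner)
    }

Note that saving an RDD to HDFS (saveAsTextFile / saveAsObjectFile) does not
store the partitioner itself, so it has to be re-applied, or tracked
separately, after the hourly RDDs are read back.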

Regards
Puneet