Re: flatMap output on disk / flatMap memory overhead
Hi Octavian,

Just out of curiosity, did you try persisting your RDD in a serialized format, i.e. "MEMORY_AND_DISK_SER" or "MEMORY_ONLY_SER"? In other words, changing your "rdd.persist(MEMORY_AND_DISK)" to "rdd.persist(MEMORY_ONLY_SER)"?

Regards

On Wed, Jun 10, 2015 at 7:27 AM, Imran Rashid wrote:

> I agree with Richard. It looks like the issue here is shuffling, and shuffle
> data is always written to disk, so the issue is definitely not that all the
> output of flatMap has to be stored in memory.
>
> If at all possible, I'd first suggest upgrading to a newer version of Spark --
> even in 1.2 there were big improvements to shuffle, with sort-based shuffle as
> the default.
>
> On Tue, Jun 2, 2015 at 1:09 PM, Richard Marscher wrote:
>
>> Are you sure it's memory related? What is the disk utilization and IO
>> performance on the workers? The error you posted looks to be related to the
>> shuffle trying to obtain block data from another worker node and failing to
>> do so in a reasonable amount of time. It may still be memory related, but I'm
>> not sure that other resources are ruled out yet.
>>
>> On Tue, Jun 2, 2015 at 5:10 AM, octavian.ganea <octavian.ga...@inf.ethz.ch> wrote:
>>
>>> I tried using reduceByKey, without success.
>>>
>>> I also tried this: rdd.persist(MEMORY_AND_DISK).flatMap(...).reduceByKey.
>>> However, I got the same error as before, namely the error described here:
>>>
>>> http://apache-spark-user-list.1001560.n3.nabble.com/flatMap-output-on-disk-flatMap-memory-overhead-td23098.html
>>>
>>> My task is to count the frequencies of pairs of words that occur in a set of
>>> documents at least 5 times. I know that this final output is sparse and
>>> should comfortably fit in memory. However, the intermediate pairs that are
>>> spilled by flatMap might need to be stored on disk, and I don't understand
>>> why the persist option does not work and my job fails.
>>>
>>> My code:
>>>
>>> rdd.persist(StorageLevel.MEMORY_AND_DISK)
>>>    .flatMap(x => outputPairsOfWords(x)) // outputs pairs of type ((word1,word2), 1)
>>>    .reduceByKey((a,b) => (a + b).toShort)
>>>    .filter({case((x,y),count) => count >= 5})
>>>
>>> My cluster has 8 nodes, each with 129 GB of RAM and 16 cores. One node I keep
>>> for the master, 7 nodes for the workers.
>>>
>>> My conf:
>>>
>>> conf.set("spark.cores.max", "128")
>>> conf.set("spark.akka.frameSize", "1024")
>>> conf.set("spark.executor.memory", "115g")
>>> conf.set("spark.shuffle.file.buffer.kb", "1000")
>>>
>>> My spark-env.sh:
>>>
>>> ulimit -n 20
>>> SPARK_JAVA_OPTS="-Xss1g -Xmx129g -d64 -XX:-UseGCOverheadLimit -XX:-UseCompressedOops"
>>> SPARK_DRIVER_MEMORY=129G
>>>
>>> Spark version: 1.1.1
>>>
>>> Thank you very much for your help!
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/flatMap-output-on-disk-flatMap-memory-overhead-tp23098p23108.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>> ---------------------------------------------------------------------
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
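For reference, a minimal sketch of the change suggested above, using Spark's Scala API. The storage level is switched to a serialized one and reduceByKey is given an explicit partition count; the object name, app name, argument-based paths, the 2000-partition figure, and the body of outputPairsOfWords (whose implementation was not posted in the thread) are all assumptions for illustration, not the original poster's code.

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.SparkContext._   // PairRDDFunctions implicits on older Spark versions
    import org.apache.spark.storage.StorageLevel

    object PairFrequencies {
      // Stand-in for the poster's outputPairsOfWords helper: emits one
      // ((word1, word2), 1) pair per distinct word pair in a document.
      def outputPairsOfWords(doc: String): Seq[((String, String), Short)] = {
        val words = doc.split("\\s+").distinct
        (for (w1 <- words; w2 <- words if w1 < w2) yield ((w1, w2), 1.toShort)).toSeq
      }

      def main(args: Array[String]): Unit = {
        val sc  = new SparkContext(new SparkConf().setAppName("pair-frequencies"))
        val rdd = sc.textFile(args(0))

        val counts = rdd
          .persist(StorageLevel.MEMORY_ONLY_SER)   // serialized caching, per the suggestion above
          .flatMap(outputPairsOfWords)
          .reduceByKey((a: Short, b: Short) => (a + b).toShort, 2000) // explicit partition count keeps individual shuffle blocks smaller
          .filter { case (_, count) => count >= 5 }

        counts.saveAsTextFile(args(1))
        sc.stop()
      }
    }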
Re: SaveAsTextFile brings down data nodes with IO Exceptions
I am seeing this on Hadoop 2.4.0. Thanks for your suggestions, I will try those and let you know if they help!

On Sat, May 16, 2015 at 1:57 AM, Steve Loughran wrote:

> What version of Hadoop are you seeing this on?
>
> On 15 May 2015, at 20:03, Puneet Kapoor wrote:
>
> Hey,
>
> Did you find any solution for this issue? We are seeing similar entries in our
> data node logs. Appreciate any help.
>
> 2015-05-15 10:51:43,615 ERROR org.apache.hadoop.hdfs.server.datanode.DataNode:
> NttUpgradeDN1:50010:DataXceiver error processing WRITE_BLOCK operation
> src: /192.168.112.190:46253 dst: /192.168.151.104:50010
> java.net.SocketTimeoutException: 6 millis timeout while waiting for channel to
> be ready for read. ch : java.nio.channels.SocketChannel[connected
> local=/192.168.151.104:50010 remote=/192.168.112.190:46253]
>         at org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:164)
>         at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:161)
>         at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:131)
>         at java.io.BufferedInputStream.fill(Unknown Source)
>         at java.io.BufferedInputStream.read1(Unknown Source)
>         at java.io.BufferedInputStream.read(Unknown Source)
>         at java.io.DataInputStream.read(Unknown Source)
>         at org.apache.hadoop.io.IOUtils.readFully(IOUtils.java:192)
>         at org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.doReadFully(PacketReceiver.java:213)
>         at org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.doRead(PacketReceiver.java:134)
>         at org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.receiveNextPacket(PacketReceiver.java:109)
>         at org.apache.hadoop.hdfs.server.datanode.BlockReceiver.receivePacket(BlockReceiver.java:446)
>         at org.apache.hadoop.hdfs.server.datanode.BlockReceiver.receiveBlock(BlockReceiver.java:702)
>         at org.apache.hadoop.hdfs.server.datanode.DataXceiver.writeBlock(DataXceiver.java:742)
>         at org.apache.hadoop.hdfs.protocol.datatransfer.Receiver.opWriteBlock(Receiver.java:124)
>         at org.apache.hadoop.hdfs.protocol.datatransfer.Receiver.processOp(Receiver.java:71)
>         at org.apache.hadoop.hdfs.server.datanode.DataXceiver.run(DataXceiver.java:232)
>         at java.lang.Thread.run(Unknown Source)
>
> That's being logged at ERROR level in the DN. It doesn't mean the DN has
> crashed, only that it timed out waiting for data: something has gone wrong
> elsewhere.
>
> https://issues.apache.org/jira/browse/HDFS-693
>
> There are a couple of properties you can set to extend the timeouts:
>
> <property>
>   <name>dfs.socket.timeout</name>
>   <value>2</value>
> </property>
>
> <property>
>   <name>dfs.datanode.socket.write.timeout</name>
>   <value>2</value>
> </property>
>
> You can also increase the number of data node transceiver threads to handle
> data IO across the network:
>
> <property>
>   <name>dfs.datanode.max.xcievers</name>
>   <value>4096</value>
> </property>
>
> Yes, that property has that explicit spelling; it's easy to get wrong.
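As a rough, untested sketch for anyone hitting this from a Spark job: the two timeout properties above are also read on the HDFS client side, so they could be tried on the job's Hadoop configuration. The values below are placeholders, not tuned recommendations, and nothing here replaces fixing the underlying slowness.

    // Sketch only: assumes an existing SparkContext named sc (e.g. in spark-shell).
    // Timeout values are illustrative, in milliseconds.
    sc.hadoopConfiguration.set("dfs.socket.timeout", "120000")
    sc.hadoopConfiguration.set("dfs.datanode.socket.write.timeout", "120000")
    // dfs.datanode.max.xcievers is a datanode-side limit: set it in hdfs-site.xml on the
    // datanodes themselves (and restart them) rather than from the client.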
Re: SaveAsTextFile brings down data nodes with IO Exceptions
Hey,

Did you find any solution for this issue? We are seeing similar entries in our data node logs. Appreciate any help.

2015-05-15 10:51:43,615 ERROR org.apache.hadoop.hdfs.server.datanode.DataNode:
NttUpgradeDN1:50010:DataXceiver error processing WRITE_BLOCK operation
src: /192.168.112.190:46253 dst: /192.168.151.104:50010
java.net.SocketTimeoutException: 6 millis timeout while waiting for channel to
be ready for read. ch : java.nio.channels.SocketChannel[connected
local=/192.168.151.104:50010 remote=/192.168.112.190:46253]
        at org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:164)
        at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:161)
        at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:131)
        at java.io.BufferedInputStream.fill(Unknown Source)
        at java.io.BufferedInputStream.read1(Unknown Source)
        at java.io.BufferedInputStream.read(Unknown Source)
        at java.io.DataInputStream.read(Unknown Source)
        at org.apache.hadoop.io.IOUtils.readFully(IOUtils.java:192)
        at org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.doReadFully(PacketReceiver.java:213)
        at org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.doRead(PacketReceiver.java:134)
        at org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.receiveNextPacket(PacketReceiver.java:109)
        at org.apache.hadoop.hdfs.server.datanode.BlockReceiver.receivePacket(BlockReceiver.java:446)
        at org.apache.hadoop.hdfs.server.datanode.BlockReceiver.receiveBlock(BlockReceiver.java:702)
        at org.apache.hadoop.hdfs.server.datanode.DataXceiver.writeBlock(DataXceiver.java:742)
        at org.apache.hadoop.hdfs.protocol.datatransfer.Receiver.opWriteBlock(Receiver.java:124)
        at org.apache.hadoop.hdfs.protocol.datatransfer.Receiver.processOp(Receiver.java:71)
        at org.apache.hadoop.hdfs.server.datanode.DataXceiver.run(DataXceiver.java:232)
        at java.lang.Thread.run(Unknown Source)

Thanks
Puneet

On Wed, Dec 3, 2014 at 2:50 AM, Ganelin, Ilya wrote:

> Hi all, as the last stage of execution, I am writing out a dataset to disk.
> Before I do this, I force the DAG to resolve so this is the only job left in
> the pipeline. The dataset in question is not especially large (a few
> gigabytes). During this step, however, HDFS will inevitably crash. I lose
> connection to data nodes and get stuck in the loop of death: failure causes a
> job restart, eventually causing the overall job to fail. On the data node
> logs I see the errors below. Does anyone have any ideas as to what is going
> on here? Thanks!
> java.io.IOException: Premature EOF from inputStream
>         at org.apache.hadoop.io.IOUtils.readFully(IOUtils.java:194)
>         at org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.doReadFully(PacketReceiver.java:213)
>         at org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.doRead(PacketReceiver.java:134)
>         at org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.receiveNextPacket(PacketReceiver.java:109)
>         at org.apache.hadoop.hdfs.server.datanode.BlockReceiver.receivePacket(BlockReceiver.java:455)
>         at org.apache.hadoop.hdfs.server.datanode.BlockReceiver.receiveBlock(BlockReceiver.java:741)
>         at org.apache.hadoop.hdfs.server.datanode.DataXceiver.writeBlock(DataXceiver.java:718)
>         at org.apache.hadoop.hdfs.protocol.datatransfer.Receiver.opWriteBlock(Receiver.java:126)
>         at org.apache.hadoop.hdfs.protocol.datatransfer.Receiver.processOp(Receiver.java:72)
>         at org.apache.hadoop.hdfs.server.datanode.DataXceiver.run(DataXceiver.java:225)
>         at java.lang.Thread.run(Thread.java:745)
>
> innovationdatanode03.cof.ds.capitalone.com:1004:DataXceiver error processing
> WRITE_BLOCK operation src: /10.37.248.60:44676 dst: /10.37.248.59:1004
> java.net.SocketTimeoutException: 65000 millis timeout while waiting for
> channel to be ready for read. ch : java.nio.channels.SocketChannel[connected
> local=/10.37.248.59:43692 remote=/10.37.248.63:1004]
>         at org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:164)
>         at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:161)
>         at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:131)
>         at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:118)
>         at java.io.FilterInputStream.read(FilterInputStream.java:83)
>         at java.io.FilterInputStream.read(FilterInputStream.java:83)
>         at org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed(PBHelper.java:2101)
>         at org.apache.hadoop.hdfs.server.datanode.DataXceiver.writeBlock(DataXceiver.java:660)
>         at org.apache.hadoop.hdfs.protocol.datatransfer.Receiver.opWriteBlock(Receiver.java:126)
>         at org.apache.hadoop.hdfs.protocol.datatransfer.Re
Save RDD with partition information
Hi,

I have a use case in which an hourly Spark job creates hourly RDDs that are partitioned by key. At the end of the day I need to access all of these RDDs and combine the key/value pairs over the day: if a key K1 appears in RDD0 (first hour of the day), RDD1, ... RDD23 (last hour of the day), I need to combine all of K1's values using some logic. I want to avoid the shuffle at the end of the day, since the data is huge (hundreds of GB).

Questions:

1.) Is there a way to persist the hourly RDDs together with their partition information, so that the partitioning is restored when the RDDs are read back?
2.) Can I ensure that the partitioning is the same across hours, i.e. if K1 goes to container_X in one hour, it goes to the same container in the next hour, and so on? A sketch of one possible approach follows below.

Regards
Puneet
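One way both points are commonly handled (an untested sketch; the key/value types, HDFS paths, and 128-partition count below are placeholders): use the same HashPartitioner for every hourly RDD so that a given key always hashes to the same partition id, and re-apply that partitioner when the saved RDDs are read back, since Spark does not store partitioner metadata with saveAsObjectFile output. The partitionBy on load still moves data once per hourly RDD, but the final day-level combine can then reuse the shared partitioning.

    import org.apache.spark.{HashPartitioner, SparkContext}
    import org.apache.spark.SparkContext._ // PairRDDFunctions implicits on older Spark versions
    import org.apache.spark.rdd.RDD

    object HourlyCombine {
      // One shared partitioner: a given key hashes to the same partition id in every hour.
      val partitioner = new HashPartitioner(128) // partition count is illustrative

      // Hourly job: partition by key before writing (paths and types are placeholders).
      def saveHour(pairs: RDD[(String, Long)], hour: Int): Unit =
        pairs.partitionBy(partitioner).saveAsObjectFile(s"hdfs:///daily/hour=$hour")

      // End-of-day job: the partitioner is not stored on disk, so it is re-asserted on load.
      def combineDay(sc: SparkContext): RDD[(String, Long)] = {
        val hours = (0 to 23).map { h =>
          sc.objectFile[(String, Long)](s"hdfs:///daily/hour=$h").partitionBy(partitioner)
        }
        // With every input exposing the same partitioner, reduceByKey with that partitioner
        // can combine values without repartitioning keys that are already co-located.
        sc.union(hours).reduceByKey(partitioner, _ + _)
      }
    }

An alternative that avoids the re-partitioning on load entirely is to keep the hourly RDDs cached (memory or disk) inside one long-running application, since partitioner information survives only as long as the RDD objects themselves.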