compression behaviour inconsistency between 1.3 and 1.4
Hi,

I've observed an inconsistent behaviour in .saveAsTextFile. Up until version 1.3 it was possible to save RDDs as Snappy-compressed files with:

    rdd.saveAsTextFile(targetFile)

but after upgrading to 1.4 this no longer works; I need to specify a codec explicitly:

    rdd.saveAsTextFile(targetFile, classOf[SnappyCodec])

As I understand it, I should be able to either pass the appropriate codec class or set those options globally on the cluster using properties. I have the following settings in /etc/hadoop/conf/core-site.xml:

    mapred.map.output.compression.codec = org.apache.hadoop.io.compress.SnappyCodec
    mapred.compress.map.output         = false
    mapred.output.compression.codec    = org.apache.hadoop.io.compress.SnappyCodec

The config hasn't changed across the upgrade from 1.3 to 1.4. What is the proper behaviour? Am I doing something strange here, or has this recently changed?

Regards
Marcin

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
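One thing worth checking: for final job output (as opposed to map output), Hadoop's old-style configuration typically requires both a compression switch and a codec. The core-site.xml above sets mapred.output.compression.codec but not mapred.output.compress, so a hedged guess is that enabling the switch explicitly may restore the old behaviour. A possible fragment (assuming these legacy mapred.* keys are the ones your Hadoop version reads):

```xml
<!-- Sketch: explicitly enable compression of final job output.
     Assumption: this Hadoop version honours the legacy mapred.* keys. -->
<property>
  <name>mapred.output.compress</name>
  <value>true</value>
</property>
<property>
  <name>mapred.output.compression.codec</name>
  <value>org.apache.hadoop.io.compress.SnappyCodec</value>
</property>
```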
Re: Using Neo4j with Apache Spark
On Thu, 12 Mar 2015 00:48:12 -0700 d34th4ck3r wrote:
> I'm trying to use Neo4j with Apache Spark Streaming but I am finding
> serializability as an issue.
>
> Basically, I want Apache Spark to parse and bundle my data in real
> time. After, the data has been bundled it should be stored in the
> database, Neo4j. However, I am getting this error:

Hi,

It seems some things in your task aren't serializable. A quick look at the code suggests graphDB as a potential problem. If you want to create it in one place (the driver) and fetch it later in the step, you can do something like this:

- create a container class that you will broadcast:

    class LazyGraphDB extends Serializable {
      @transient override lazy val graphDB = new GraphDatabase()
    }

- then in the driver code:

    val graphDbBc = sc.broadcast(new LazyGraphDB)

- and in the task where you'd like to use it, just write:

    graphDbBc.value.graphDB...

Just remember about all the "transient, lazy" modifiers.

Regards
Marcin
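The transient-lazy trick above can be sanity-checked without Spark at all: Java serialization skips a @transient field, and the lazy val re-creates it on first access after deserialization. A minimal, self-contained sketch (FakeGraphDatabase is a hypothetical stand-in for the real, non-serializable Neo4j class):

```scala
import java.io._

// Hypothetical stand-in for an expensive, non-serializable resource.
class FakeGraphDatabase {
  val name: String = "graph-db-instance"
}

// Serializable container: the @transient lazy val is skipped during
// serialization and re-created on first access after deserialization,
// so the wrapper can cross the driver/executor boundary safely.
class LazyGraphDB extends Serializable {
  @transient lazy val graphDB: FakeGraphDatabase = new FakeGraphDatabase()
}

object TransientLazyDemo {
  // Serialize and deserialize through an in-memory buffer.
  def roundTrip[T <: Serializable](obj: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(obj)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    in.readObject().asInstanceOf[T]
  }

  def main(args: Array[String]): Unit = {
    // Works even though FakeGraphDatabase is not Serializable,
    // because the transient field is never written out.
    val copy = roundTrip(new LazyGraphDB)
    println(copy.graphDB.name)
  }
}
```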
Re: skewed outer join with spark 1.2.0 - memory consumption
On Wed, 11 Mar 2015 11:19:56 +0100 Marcin Cylke wrote:
> Hi
>
> I'm trying to do a join of two datasets: 800GB with ~50MB.

The job finishes if I set spark.yarn.executor.memoryOverhead to 2048MB. If it is around 1000MB it fails with "executor lost" errors.

My spark settings are:
- executor cores - 8
- num executors - 32
- executor memory - 4g

Regards
Marcin
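For reference, the settings above translate to a submission roughly like the following sketch (flag names assume a Spark 1.2-era YARN deployment; the jar name is illustrative):

```shell
# Sketch: submission with the memory settings discussed above.
# 2048 MB of YARN off-heap overhead was the value that let the join complete.
spark-submit \
  --master yarn-cluster \
  --num-executors 32 \
  --executor-cores 8 \
  --executor-memory 4g \
  --conf spark.yarn.executor.memoryOverhead=2048 \
  my-join-job.jar
```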
skewed outer join with spark 1.2.0 - memory consumption
Hi,

I'm trying to do a join of two datasets: 800GB with ~50MB. My code looks like this:

    private def parseClickEventLine(line: String, jsonFormatBC: Broadcast[LazyJsonFormat]): ClickEvent = {
      val json = line.parseJson.asJsObject
      val eventJson =
        if (json.fields.contains("recommendationId")) json
        else json.fields("message").asJsObject
      jsonFormatBC.value.clickEventJsonFormat.read(eventJson)
    }

    val jsonFormatBc: Broadcast[LazyJsonFormat] = sc.broadcast(new LazyJsonFormat)

    val views = sc.recoLogRdd(jobConfig.viewsDirectory)
      .map(view => (view.id.toString, view))

    val clicks = sc.textFile(s"${jobConfig.clicksDirectory}/*")
      .map(parseClickEventLine(_, jsonFormatBc))
      .map(click => (click.recommendationId, click))

    val clicksCounts = views.leftOuterJoin(clicks).map({ case (recommendationId, (view, click)) =>
      val metaItemType = click.flatMap(c => view.itemDetailsById.get(c.itemIdStr).map(_.metaItemType))
      (view, metaItemType) -> click.map(_ => 1).getOrElse(0)
    })

    clicksCounts.reduceByKey(_ + _).map(toCSV).saveAsTextFile(jobConfig.outputDirectory)

I'm using Spark 1.2.0 and have the following options set:

    spark.default.parallelism              = 24
    spark.serializer                       = org.apache.spark.serializer.KryoSerializer
    spark.test.disableBlockManagerHeartBeat = true
    spark.shuffle.netty.connect.timeout    = 3
    spark.storage.blockManagerSlaveTimeoutMs = 3
    spark.yarn.user.classpath.first        = true
    spark.yarn.executor.memoryOverhead     = 1536

The job is run on YARN, and I see errors in the container logs:

    2015-03-11 09:16:56,629 WARN org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl: Container [pid=24500,containerID=container_1425476483191_402083_01_19] is running beyond physical memory limits. Current usage: 6.0 GB of 6 GB physical memory used; 6.9 GB of 12.6 GB virtual memory used. Killing container.

So the problem is related to excessive use of memory. Could you advise me what I should fix in my code to make it work for my use case?
The strange thing is that the code worked earlier, with versions around 1.0.0. Is it possible that changes between 1.0.0 and 1.2.0 caused that kind of regression?

Regards
Marcin
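Since the small side of the join above is only ~50MB, one standard way to sidestep skew in a shuffle join is to broadcast the small side and do the left outer join map-side. A minimal sketch of the idea on plain collections (in Spark you would wrap the index in sc.broadcast and apply the same lookup inside mapPartitions on the large RDD; all names here are illustrative):

```scala
// Sketch of a map-side (broadcast-style) left outer join on plain collections.
object MapSideJoin {
  def leftOuterJoin[K, V, W](large: Seq[(K, V)], small: Seq[(K, W)]): Seq[(K, (V, Option[W]))] = {
    // Index the small side once; skewed keys on the large side then cost
    // only a hash lookup per record instead of a shuffle of all values.
    val index: Map[K, W] = small.toMap
    large.map { case (k, v) => (k, (v, index.get(k))) }
  }
}
```

This avoids co-locating all records of a hot key on one reducer, which is the usual cause of a single container blowing past its memory limit during a skewed join.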
spark 1.2 slower than 1.0 in unit tests
Hi,

We're using Spark in our app's unit tests. The tests start a Spark context with "local[*]", and test time is now 178 seconds on Spark 1.2 instead of 41 seconds on 1.0. We are using the Spark version from Cloudera CDH (1.2.0-cdh5.3.1).

Could you give some hints as to what could cause that, and where to search for a solution?

Regards
Marcin
Re: problem with hdfs access in spark job
On Thu, 15 May 2014 09:44:35 -0700 Marcelo Vanzin wrote:
> These are actually not worrisome; that's just the HDFS client doing
> its own thing to support HA. It probably picked the "wrong" NN to try
> first, and got the "NN in standby" exception, which it logs. Then it
> tries the other NN and things just work as expected. Business as
> usual.
>
> Not sure about the other exceptions you mention. I've seen the second
> one before, but it didn't seem to affect my jobs - maybe some race
> during cleanup.

Ok, great to hear that these errors are not that serious.

Thanks
Marcin
problem with hdfs access in spark job
Hi,

I'm running Spark 0.9.1 on a hadoop cluster - cdh4.2.1, with YARN. I have a job that performs a few transformations on a given file and joins that file with some other. The job itself finishes with success, however some tasks fail and then succeed after a rerun.

During the development process I've been experimenting with different settings and have these now in the code:

- additional hadoop config:

    "fs.hdfs.impl.disable.cache", "true"

- spark config set on SparkContext:

    .set("spark.test.disableBlockManagerHeartBeat", "true")
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .set("spark.default.parallelism", "1000")
    .set("spark.shuffle.netty.connect.timeout", "30")
    .set("spark.storage.blockManagerSlaveTimeoutMs", "30")

When I look into the logs I see lots of error messages:

- This looks like some problems with HA - but I've checked the namenodes while the job was running, and there was no switch between master and slave namenode:

    14/05/14 15:25:44 ERROR security.UserGroupInformation: PriviledgedActionException as:hc_client_reco_dev (auth:SIMPLE) cause:org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException): Operation category READ is not supported in state standby
    14/05/14 15:25:44 WARN ipc.Client: Exception encountered while connecting to the server : org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException): Operation category READ is not supported in state standby
    14/05/14 15:25:44 ERROR security.UserGroupInformation: PriviledgedActionException as:hc_client_reco_dev (auth:SIMPLE) cause:org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException): Operation category READ is not supported in state standby

- there are multiple exceptions logged as INFO; I don't know if this is serious:

    14/05/14 15:30:06 ERROR network.ConnectionManager: Corresponding SendingConnectionManagerId not found
    14/05/14 15:30:06 INFO network.ConnectionManager: key already cancelled ? sun.nio.ch.SelectionKeyImpl@2c34bc84
    java.nio.channels.CancelledKeyException
        at org.apache.spark.network.ConnectionManager.run(ConnectionManager.scala:341)
        at org.apache.spark.network.ConnectionManager$$anon$3.run(ConnectionManager.scala:98)

- I also see a few of those - which seems strange:

    14/05/14 15:26:45 ERROR executor.Executor: Exception in task ID 2081
    java.io.FileNotFoundException: /data/storage/1/yarn/local/usercache/hc_client_reco_dev/appcache/application_1398268932983_1221792/spark-local-20140514152006-9c62/38/shuffle_5_121_395 (No such file or directory)
        at java.io.RandomAccessFile.open(Native Method)
        at java.io.RandomAccessFile.<init>(RandomAccessFile.java:233)
        at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:87)
        at org.apache.spark.storage.DiskStore.getValues(DiskStore.scala:105)
        at org.apache.spark.storage.BlockManager.getLocalFromDisk(BlockManager.scala:265)
        at org.apache.spark.storage.BlockFetcherIterator$BasicBlockFetcherIterator$$anonfun$getLocalBlocks$1.apply(BlockFetcherIterator.scala:205)
        at org.apache.spark.storage.BlockFetcherIterator$BasicBlockFetcherIterator$$anonfun$getLocalBlocks$1.apply(BlockFetcherIterator.scala:204)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.storage.BlockFetcherIterator$BasicBlockFetcherIterator.getLocalBlocks(BlockFetcherIterator.scala:204)
        at org.apache.spark.storage.BlockFetcherIterator$BasicBlockFetcherIterator.initialize(BlockFetcherIterator.scala:235)
        at org.apache.spark.storage.BlockManager.getMultiple(BlockManager.scala:452)
        at org.apache.spark.BlockStoreShuffleFetcher.fetch(BlockStoreShuffleFetcher.scala:77)
        at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:125)
        at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:115)
        at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
        at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)

Could someone suggest any solutions to that?

Regards
Marcin
Re: 'Filesystem closed' while running spark job
On Tue, 22 Apr 2014 12:28:15 +0200 Marcin Cylke wrote:
> Hi
>
> I have a Spark job that reads files from HDFS, does some pretty basic
> transformations, then writes it to some other location on hdfs.
>
> I'm running this job with spark-0.9.1-rc3, on Hadoop Yarn with
> Kerberos security enabled.
>
> One of my approaches to fixing this issue was changing SparkConf, so
> I've added:
>
> "spark.test.disableBlockManagerHeartBeat", "true"
> "spark.serializer", "org.apache.spark.serializer.KryoSerializer"
> "spark.default.parallelism", "1000"
>
> This did not help.
>
> My Spark Job is failing with the following error. Is this a known
> issue? Should I provide more details, if so, about which parts of my
> configuration?

I'm also getting this exception, which may be related to that "Filesystem closed":

    java.io.FileNotFoundException (java.io.FileNotFoundException: /data/storage/1/yarn/local/usercache/client_dev/appcache/application_1397472748075_221075/spark-local-20140422145148-82a6/09/shuffle_5_15_265 (No such file or directory))
        at java.io.RandomAccessFile.open(Native Method)
        at java.io.RandomAccessFile.<init>(RandomAccessFile.java:233)
        at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:87)
        at org.apache.spark.storage.DiskStore.getValues(DiskStore.scala:105)
        at org.apache.spark.storage.BlockManager.getLocalFromDisk(BlockManager.scala:265)
        at org.apache.spark.storage.BlockFetcherIterator$BasicBlockFetcherIterator$$anonfun$getLocalBlocks$1.apply(BlockFetcherIterator.scala:205)
        at org.apache.spark.storage.BlockFetcherIterator$BasicBlockFetcherIterator$$anonfun$getLocalBlocks$1.apply(BlockFetcherIterator.scala:204)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.storage.BlockFetcherIterator$BasicBlockFetcherIterator.getLocalBlocks(BlockFetcherIterator.scala:204)
        at org.apache.spark.storage.BlockFetcherIterator$BasicBlockFetcherIterator.initialize(BlockFetcherIterator.scala:235)
        at org.apache.spark.storage.BlockManager.getMultiple(BlockManager.scala:452)
        at org.apache.spark.BlockStoreShuffleFetcher.fetch(BlockStoreShuffleFetcher.scala:77)
        at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:125)
        at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:115)
        at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
        at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
        at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:115)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
        at org.apache.spark.rdd.MappedValuesRDD.compute(MappedValuesRDD.scala:32)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
        at org.apache.spark.rdd.FlatMappedValuesRDD.compute(FlatMappedValuesRDD.scala:32)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
        at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
        at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
        at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
        at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:109)
        at org.apache.spark.scheduler.Task.run(Task.scala:53)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:211)
        at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:42)
        at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:41)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:415)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1408)
        at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:41)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:176)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:724)
'Filesystem closed' while running spark job
Hi,

I have a Spark job that reads files from HDFS, does some pretty basic transformations, then writes the result to some other location on HDFS.

I'm running this job with spark-0.9.1-rc3, on Hadoop YARN with Kerberos security enabled.

One of my approaches to fixing this issue was changing SparkConf, so I've added:

    "spark.test.disableBlockManagerHeartBeat", "true"
    "spark.serializer", "org.apache.spark.serializer.KryoSerializer"
    "spark.default.parallelism", "1000"

This did not help.

My Spark job is failing with the following error. Is this a known issue? Should I provide more details, and if so, about which parts of my configuration?

    14/04/22 11:59:58 ERROR executor.Executor: Exception in task ID 2866
    java.io.IOException: Filesystem closed
        at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:565)
        at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:648)
        at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:706)
        at java.io.DataInputStream.read(DataInputStream.java:100)
        at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:209)
        at org.apache.hadoop.util.LineReader.readLine(LineReader.java:173)
        at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:206)
        at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:45)
        at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:164)
        at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:149)
        at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:27)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
        at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:161)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:102)
        at org.apache.spark.scheduler.Task.run(Task.scala:53)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:211)
        at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:42)
        at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:41)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:415)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1408)
        at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:41)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:176)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:724)
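One workaround often suggested for "Filesystem closed" errors of this era is disabling the HDFS client's shared FileSystem cache, so each consumer gets its own instance and a close() in one thread cannot invalidate a handle still in use elsewhere. A hedged configuration sketch (fs.hdfs.impl.disable.cache is a real Hadoop key; whether it cures this particular failure is an assumption):

```xml
<!-- Sketch: give each FileSystem.get() caller its own HDFS client instance
     instead of a shared cached one. Trades safety for some extra overhead. -->
<property>
  <name>fs.hdfs.impl.disable.cache</name>
  <value>true</value>
</property>
```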