Re: Error from reading S3 in Scala
Thanks everyone. One reason to use "s3a://" is that I use it in my development environment (Eclipse) on a desktop: I debug and test on the desktop, then put the jar file on the EMR cluster, and I do not think "s3://" will work on a desktop.

With help from AWS support, it turned out this bug was caused by the Joda-Time version in my pom file not matching the aws-sdk jar, because AWS authentication requires a valid Date or x-amz-date header. It works after updating to Joda-Time 2.8.1, AWS SDK 1.10.x and amazon-hadoop 2.6.1. However, it still throws an exception on amazon-hadoop 2.7.2, and the reason for wanting amazon-hadoop 2.7.2 is that in EMR 4.6.0 the supported versions are Hadoop 2.7.2 and Spark 1.6.1.

Please let me know if you have a better idea for setting up a development environment for debugging and testing.

Regards,

Jingyu

On 4 May 2016 at 20:32, James Hammerton <ja...@gluru.co> wrote:
>
> On 3 May 2016 at 17:22, Gourav Sengupta <gourav.sengu...@gmail.com> wrote:
>
>> Hi,
>>
>> The best thing to do is start the EMR clusters with proper permissions in
>> the roles; that way you do not need to worry about the keys at all.
>>
>> Another thing: why are we using s3a:// instead of s3:// ?
>>
>
> Probably because of what's said about s3:// and s3n:// here (which is why
> I use s3a://):
>
> https://wiki.apache.org/hadoop/AmazonS3
>
> Regards,
>
> James
>
>> Besides that, you can increase S3 speeds using the instructions mentioned
>> here:
>> https://aws.amazon.com/blogs/aws/aws-storage-update-amazon-s3-transfer-acceleration-larger-snowballs-in-more-regions/
>>
>> Regards,
>> Gourav
>>
>> On Tue, May 3, 2016 at 12:04 PM, Steve Loughran <ste...@hortonworks.com> wrote:
>>
>>> don't put your secret in the URI, it'll only creep out in the logs.
>>>
>>> Use the specific properties covered in
>>> http://hadoop.apache.org/docs/current/hadoop-aws/tools/hadoop-aws/index.html,
>>> which you can set in your spark context by prefixing them with spark.hadoop.
>>>
>>> you can also set the env vars, AWS_ACCESS_KEY_ID and
>>> AWS_SECRET_ACCESS_KEY; SparkEnv will pick these up and set the relevant
>>> spark context keys for you
>>>
>>> On 3 May 2016, at 01:53, Zhang, Jingyu <jingyu.zh...@news.com.au> wrote:
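Steve's advice above, sketched in Java. This is a configuration sketch only (it needs Spark and hadoop-aws on the classpath); the property names are from the hadoop-aws documentation, and the variable names are illustrative:

```java
// Sketch: keep credentials out of the URI (where they leak into logs) and
// pass them as Hadoop properties instead, prefixed with "spark.hadoop.".
SparkConf conf = new SparkConf()
        .set("spark.hadoop.fs.s3a.access.key", accessKey)   // loaded from a safe place, not hard-coded
        .set("spark.hadoop.fs.s3a.secret.key", secretKey);
JavaSparkContext ctx = new JavaSparkContext(conf);

Path pt = new Path("s3a://graphclustering/config.properties"); // no key:secret@ in the URI
FileSystem fs = FileSystem.get(pt.toUri(), ctx.hadoopConfiguration());
InputStream in = fs.open(pt);
```

Alternatively, export AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY in the environment as Steve describes, and leave the Spark configuration untouched.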
Error from reading S3 in Scala
Hi All,

I am using Eclipse with Maven for developing Spark applications. I get an error when reading from S3 in Scala, but it works fine in Java when I run them in the same project in Eclipse. The Scala code and the error follow.

Scala:

val uri = URI.create("s3a://" + key + ":" + seckey + "@" + "graphclustering/config.properties");
val pt = new Path("s3a://" + key + ":" + seckey + "@" + "graphclustering/config.properties");
val fs = FileSystem.get(uri, ctx.hadoopConfiguration);
val inputStream: InputStream = fs.open(pt);

Exception: on aws-java-1.7.4 and hadoop-aws-2.6.1

Exception in thread "main" com.amazonaws.services.s3.model.AmazonS3Exception: Forbidden (Service: Amazon S3; Status Code: 403; Error Code: 403 Forbidden; Request ID: 8A56DC7BF0BFF09A), S3 Extended Request ID
    at com.amazonaws.http.AmazonHttpClient.handleErrorResponse(AmazonHttpClient.java:1160)
    at com.amazonaws.http.AmazonHttpClient.executeOneRequest(AmazonHttpClient.java:748)
    at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:467)
    at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:302)
    at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3785)
    at com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1050)
    at com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1027)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:688)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.open(S3AFileSystem.java:222)
    at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:766)
    at com.news.report.graph.GraphCluster$.main(GraphCluster.scala:53)
    at com.news.report.graph.GraphCluster.main(GraphCluster.scala)
16/05/03 10:49:17 INFO SparkContext: Invoking stop() from shutdown hook
16/05/03 10:49:17 INFO SparkUI: Stopped Spark web UI at http://10.65.80.125:4040
16/05/03 10:49:17 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
16/05/03 10:49:17 INFO MemoryStore: MemoryStore cleared
16/05/03 10:49:17 INFO BlockManager: BlockManager stopped

Exception: on aws-java-1.7.4 and hadoop-aws-2.7.2

16/05/03 10:23:40 INFO Slf4jLogger: Slf4jLogger started
16/05/03 10:23:40 INFO Remoting: Starting remoting
16/05/03 10:23:40 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriverActorSystem@10.65.80.125:61860]
16/05/03 10:23:40 INFO Utils: Successfully started service 'sparkDriverActorSystem' on port 61860.
16/05/03 10:23:40 INFO SparkEnv: Registering MapOutputTracker
16/05/03 10:23:40 INFO SparkEnv: Registering BlockManagerMaster
16/05/03 10:23:40 INFO DiskBlockManager: Created local directory at /private/var/folders/sc/tdmkbvr1705b8p70xqj1kqks5l9p
16/05/03 10:23:40 INFO MemoryStore: MemoryStore started with capacity 1140.4 MB
16/05/03 10:23:40 INFO SparkEnv: Registering OutputCommitCoordinator
16/05/03 10:23:40 INFO Utils: Successfully started service 'SparkUI' on port 4040.
16/05/03 10:23:40 INFO SparkUI: Started SparkUI at http://10.65.80.125:4040
16/05/03 10:23:40 INFO Executor: Starting executor ID driver on host localhost
16/05/03 10:23:40 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 61861.
16/05/03 10:23:40 INFO NettyBlockTransferService: Server created on 61861
16/05/03 10:23:40 INFO BlockManagerMaster: Trying to register BlockManager
16/05/03 10:23:40 INFO BlockManagerMasterEndpoint: Registering block manager localhost:61861 with 1140.4 MB RAM, BlockManagerId(driver, localhost, 61861)
16/05/03 10:23:40 INFO BlockManagerMaster: Registered BlockManager

Exception in thread "main" java.lang.NoSuchMethodError: com.amazonaws.services.s3.transfer.TransferManagerConfiguration.setMultipartUploadThreshold(I)V
    at org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:285)
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2596)
    at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)
    at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2630)
    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2612)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:370)
    at com.news.report.graph.GraphCluster$.main(GraphCluster.scala:52)
    at com.news.report.graph.GraphCluster.main(GraphCluster.scala)
16/05/03 10:23:51 INFO SparkContext: Invoking stop() from shutdown hook
16/05/03 10:23:51 INFO SparkUI: Stopped Spark web UI at http://10.65.80.125:4040
16/05/03 10:23:51 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
16/05/03 10:23:51 INFO MemoryStore: MemoryStore cleared
16/05/03 10:23:51 INFO BlockManager: BlockManager stopped
16/05/03 10:23:51 INFO BlockManagerMaster: BlockManagerMaster stopped
16/05/03 10:23:51 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
16/05/03 10:23:51 INFO SparkContext: Successfully stopped SparkContext
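For later readers: the NoSuchMethodError above is the classic symptom of mixing hadoop-aws with an AWS SDK it was not built against. setMultipartUploadThreshold took an int in aws-java-sdk 1.7.4 and a long in later SDKs, and hadoop-aws 2.7.x was built against 1.7.4. A pom fragment that pins the pair together is sketched below; verify the versions against your own distribution's dependency tree:

```xml
<!-- Sketch: keep hadoop-aws and the AWS SDK at the versions they were built
     with; hadoop-aws 2.7.x pairs with aws-java-sdk 1.7.4. Mixing in a newer
     SDK changes setMultipartUploadThreshold(int) to (long) and triggers the
     NoSuchMethodError shown above. -->
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-aws</artifactId>
  <version>2.7.2</version>
</dependency>
<dependency>
  <groupId>com.amazonaws</groupId>
  <artifactId>aws-java-sdk</artifactId>
  <version>1.7.4</version>
</dependency>
```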
Re: Scala vs Python for Spark ecosystem
GraphX does not support Python yet:
http://spark.apache.org/docs/latest/graphx-programming-guide.html
A workaround is to use graphframes (a third-party API),
https://issues.apache.org/jira/browse/SPARK-3789
but some features in Python are not the same as in Scala:
https://github.com/graphframes/graphframes/issues/57

Jingyu

On 20 April 2016 at 16:52, sujeet jog wrote:
> It depends on the trade-offs you wish to make.
>
> Python being an interpreted language, speed of execution will be lower,
> but since it is a very commonly used language, people can get hands-on
> quickly.
>
> Scala programs run in the Java environment, so you will get good
> execution speed, although it's less common for people to know this
> language readily.
>
> The PySpark APIs, I believe, will eventually have everything the Scala
> Spark APIs offer.
>
> On Wed, Apr 20, 2016 at 12:14 PM, berkerkozan wrote:
>
>> I know Scala better than Python, but my team (2 other friends) knows only
>> Python. We want to use GraphX or maybe try graphframes.
>> What will be the future of these 2 languages for the Spark ecosystem?
>> Will Python cover everything Scala can in a short time period? What do
>> you advise?
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Scala-vs-Python-for-Spark-ecosystem-tp26805.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>

-- This message and its attachments may contain legally privileged or confidential information. It is intended solely for the named addressee. If you are not the addressee indicated in this message or responsible for delivery of the message to the addressee, you may not copy or deliver this message or its attachments to anyone.
Rather, you should permanently delete this message and its attachments and kindly notify the sender by reply e-mail. Any content of this message and its attachments which does not relate to the official business of the sending company must be taken not to have been sent or endorsed by that company or any of its related entities. No warranty is made that the e-mail or attachments are free from computer virus or other defect.
How many threads will be used to read an RDBMS after setting numPartitions = 10 in Spark JDBC
Hi All,

I want to read MySQL from Spark. Please let me know how many threads will be used to read the RDBMS after setting numPartitions = 10 in Spark JDBC. What is the best practice for reading a large dataset from an RDBMS into Spark?

Thanks,
Jingyu
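For the archives: with the DataFrameReader JDBC API, numPartitions only takes effect together with partitionColumn, lowerBound and upperBound; Spark then runs one task, and hence one JDBC connection, per partition, so numPartitions = 10 means up to 10 concurrent reads. A simplified sketch (hypothetical names, not Spark's internal code) of how the ten range predicates are derived:

```java
import java.util.ArrayList;
import java.util.List;

// Simplified sketch of how a numeric partition column is split into
// numPartitions range predicates; Spark issues one query per predicate.
public class JdbcPartitionSketch {
    static List<String> partitionPredicates(String col, long lower, long upper, int numPartitions) {
        long stride = (upper - lower) / numPartitions;
        List<String> preds = new ArrayList<>();
        long start = lower;
        for (int i = 0; i < numPartitions; i++) {
            long end = start + stride;
            if (i == 0) {
                preds.add(col + " < " + end);        // first partition also catches rows below lowerBound
            } else if (i == numPartitions - 1) {
                preds.add(col + " >= " + start);     // last partition also catches rows above upperBound
            } else {
                preds.add(col + " >= " + start + " AND " + col + " < " + end);
            }
            start = end;
        }
        return preds;
    }

    public static void main(String[] args) {
        // 10 partitions over id in [0, 100) -> 10 WHERE clauses
        for (String p : partitionPredicates("id", 0, 100, 10)) {
            System.out.println(p);
        }
    }
}
```

As a practical note, keep numPartitions modest so the concurrent connections do not overwhelm MySQL, and pick a partition column with a roughly uniform distribution so the ranges carry similar row counts.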
Re: Memory issues on spark
You may set "maximizeResourceAllocation"; EMR will then work out the best configuration for you.
http://docs.aws.amazon.com/ElasticMapReduce/latest/ReleaseGuide/emr-spark-configure.html

Jingyu

On 18 February 2016 at 12:02, wrote:
> Hi All,
>
> I have been facing memory issues in Spark. I'm using spark-sql on AWS EMR.
> I have an around 50 GB file in AWS S3. I want to read this file in a BI
> tool connected to spark-sql on the thrift server over ODBC. I'm executing
> select * from table in the BI tool (QlikView, Tableau).
> I sometimes run into an OOM error and sometimes LOST_EXECUTOR. I'm really
> confused. Spark runs fine for smaller data sets.
>
> I have a 3-node EMR cluster with m3.2xlarge.
>
> I have set the below conf on Spark:
>
> export SPARK_EXECUTOR_INSTANCES=16
> export SPARK_EXECUTOR_CORES=16
> export SPARK_EXECUTOR_MEMORY=15G
> export SPARK_DRIVER_MEMORY=12G
> spark.kryoserializer.buffer.max 1024m
>
> Even after setting SPARK_EXECUTOR_INSTANCES to 16, only 2 executors come
> up.
>
> This has been a road block for a long time. Any help would be appreciated.
>
> Thanks
> Arun.
>
> This e-mail and any files transmitted with it are for the sole use of the
> intended recipient(s) and may contain confidential and privileged
> information. If you are not the intended recipient(s), please reply to the
> sender and destroy all copies of the original message. Any unauthorized
> review, use, disclosure, dissemination, forwarding, printing or copying of
> this email, and/or any action taken in reliance on the contents of this
> e-mail is strictly prohibited and may be unlawful. Where permitted by
> applicable law, this e-mail and other e-mail communications sent to and
> from Cognizant e-mail addresses may be monitored.
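The setting Jingyu mentions is applied through an EMR configuration classification at cluster creation. A sketch of the JSON (the "spark" classification and property name come from the EMR documentation linked above; this is a fragment, not a complete cluster definition):

```json
[
  {
    "Classification": "spark",
    "Properties": {
      "maximizeResourceAllocation": "true"
    }
  }
]
```

With this enabled, EMR derives spark.executor.memory, spark.executor.cores and spark.executor.instances from the instance type of the core nodes, which usually works better than hand-set SPARK_EXECUTOR_* environment exports.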
Failed to remove broadcast 2 with removeFromMaster = true in Graphx
I am running a Pregel function with 37 nodes in EMR Hadoop. After an hour the logs show the following. Can anyone please tell me what the problem is and how to solve it? Thanks.

16/02/05 14:02:46 WARN BlockManagerMaster: Failed to remove broadcast 2 with removeFromMaster = true - Cannot receive any reply in 120 seconds. This timeout is controlled by spark.rpc.askTimeout
org.apache.spark.rpc.RpcTimeoutException: Cannot receive any reply in 120 seconds. This timeout is controlled by spark.rpc.askTimeout
    at org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:48)
    at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:63)
    at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:33)
    at scala.util.Failure$$anonfun$recover$1.apply(Try.scala:185)
    at scala.util.Try$.apply(Try.scala:161)
    at scala.util.Failure.recover(Try.scala:185)
    at scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:324)
    at scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:324)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
    at org.spark-project.guava.util.concurrent.MoreExecutors$SameThreadExecutorService.execute(MoreExecutors.java:293)
    at scala.concurrent.impl.ExecutionContextImpl$$anon$1.execute(ExecutionContextImpl.scala:133)
    at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
    at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
    at scala.concurrent.Promise$class.complete(Promise.scala:55)
    at scala.concurrent.impl.Promise$DefaultPromise.complete(Promise.scala:153)
    at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:235)
    at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:235)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
    at scala.concurrent.Future$InternalCallbackExecutor$Batch$$anonfun$run$1.processBatch$1(Future.scala:643)
    at scala.concurrent.Future$InternalCallbackExecutor$Batch$$anonfun$run$1.apply$mcV$sp(Future.scala:658)
    at scala.concurrent.Future$InternalCallbackExecutor$Batch$$anonfun$run$1.apply(Future.scala:635)
    at scala.concurrent.Future$InternalCallbackExecutor$Batch$$anonfun$run$1.apply(Future.scala:635)
    at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
    at scala.concurrent.Future$InternalCallbackExecutor$Batch.run(Future.scala:634)
    at scala.concurrent.Future$InternalCallbackExecutor$.scala$concurrent$Future$InternalCallbackExecutor$$unbatchedExecute(Future.scala:694)
    at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:685)
    at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
    at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
    at scala.concurrent.Promise$class.tryFailure(Promise.scala:112)
    at scala.concurrent.impl.Promise$DefaultPromise.tryFailure(Promise.scala:153)
    at org.apache.spark.rpc.netty.NettyRpcEnv$$anon$1.run(NettyRpcEnv.scala:241)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
    at java.util.concurrent.FutureTask.run(FutureTask.java:262)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:178)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:292)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.util.concurrent.TimeoutException: Cannot receive any reply in 120 seconds
    at org.apache.spark.rpc.netty.NettyRpcEnv$$anon$1.run(NettyRpcEnv.scala:242)
    ... 7 more
16/02/05 14:02:46 ERROR ContextCleaner: Error cleaning broadcast 2
org.apache.spark.rpc.RpcTimeoutException: Cannot receive any reply in 120 seconds. This timeout is controlled by spark.rpc.askTimeout
    at org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:48)
    at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:63)
    at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:33)
    at scala.util.Failure$$anonfun$recover$1.apply(Try.scala:185)
    at scala.util.Try$.apply(Try.scala:161)
    at scala.util.Failure.recover(Try.scala:185)
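Not a root-cause fix, but the timeout in the warning is configurable; raising it gives the context cleaner more headroom while GC or network pressure is investigated. A configuration sketch (requires Spark on the classpath; the 300s value is illustrative):

```java
// Sketch: the warning names spark.rpc.askTimeout explicitly. Its default
// falls back to spark.network.timeout (120s), so raise both together.
SparkConf conf = new SparkConf()
        .set("spark.rpc.askTimeout", "300s")
        .set("spark.network.timeout", "300s");
```

If the timeouts only delay the failure, the usual underlying causes are long GC pauses on executors or an overloaded driver; checkpointing the graph periodically inside long Pregel runs also keeps lineage and cleanup work bounded.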
Unpersist RDD in Graphx
Hi,

What is the best way to unpersist RDDs in GraphX to release memory: graph.unpersist, or graph.unpersistVertices together with graph.edges.unpersist?

I studied the source code of Pregel.scala; both of the above are used between line 148 and line 150. Can anyone please tell me what the difference is? In addition, what is the difference between setting blocking = false and blocking = true?

oldMessages.unpersist(blocking = false)
prevG.unpersistVertices(blocking = false)
prevG.edges.unpersist(blocking = false)

Thanks,
Jingyu
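For the archives: Graph.unpersist uncaches both the vertex and edge RDDs, while unpersistVertices uncaches only the vertices, which is why Pregel pairs it with edges.unpersist when the edge data is replaced too. On the flag: blocking = true waits until every cached block is actually removed before returning, while blocking = false triggers removal asynchronously, which is what an iterative loop like Pregel wants so each iteration is not stalled. A two-line sketch (requires Spark on the classpath):

```java
// Sketch: asynchronous vs synchronous uncaching of an RDD.
rdd.unpersist(false);  // returns immediately; blocks are removed in the background
rdd.unpersist(true);   // blocks until all cached partitions of this RDD are gone
```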
How to filter the isolated vertexes in Graphx
I am trying to filter out vertices that do not have any links to other vertices. How can I filter those isolated vertices in GraphX?

Thanks,
Jingyu
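One common GraphX approach is to join graph.degrees back onto the graph and subgraph on degree > 0 (isolated vertices never appear in graph.degrees). The underlying idea, sketched on plain Java collections rather than the GraphX API (names are hypothetical):

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Sketch of the idea behind dropping isolated vertices: a vertex is isolated
// iff it appears in no edge, i.e. its degree is zero.
public class IsolatedVertices {
    static Set<Long> nonIsolated(Set<Long> vertices, List<long[]> edges) {
        Set<Long> touched = new HashSet<>();
        for (long[] e : edges) {        // each edge is {src, dst}
            touched.add(e[0]);
            touched.add(e[1]);
        }
        Set<Long> result = new HashSet<>(vertices);
        result.retainAll(touched);      // keep only vertices some edge touches
        return result;
    }

    public static void main(String[] args) {
        Set<Long> vertices = new HashSet<>(Arrays.asList(1L, 2L, 3L, 4L));
        List<long[]> edges = Arrays.asList(new long[]{1, 2});
        System.out.println(nonIsolated(vertices, edges)); // vertices 3 and 4 are dropped
    }
}
```

In GraphX itself the same filter is expressed by joining the degrees onto the vertex attributes and calling subgraph with a vertex predicate on the joined degree.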
Graphx: How to print the group of connected components one by one
Can anyone please let me know how to print all nodes in the connected components, one component at a time?

graph.connectedComponents()

e.g.

Connected component ID | Node IDs
1                      | 1, 2, 3
6                      | 6, 7, 8, 9

Thanks
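For the archives: connectedComponents() returns a graph whose vertex attribute is the smallest vertex id in that vertex's component, so grouping the vertices by that attribute gives one group per component (in Scala, roughly graph.connectedComponents().vertices.map(_.swap).groupByKey()). The grouping step itself, sketched in plain Java on (vertexId, componentId) pairs:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Sketch: group (vertexId, componentId) pairs by component id, reproducing
// the example in the question (component 1 -> 1,2,3; component 6 -> 6,7,8,9).
public class ComponentGroups {
    static Map<Long, List<Long>> groupByComponent(List<long[]> vertexToComponent) {
        Map<Long, List<Long>> groups = new TreeMap<>();
        for (long[] pair : vertexToComponent) {      // pair = {vertexId, componentId}
            groups.computeIfAbsent(pair[1], k -> new ArrayList<>()).add(pair[0]);
        }
        return groups;
    }

    public static void main(String[] args) {
        List<long[]> cc = Arrays.asList(
                new long[]{1, 1}, new long[]{2, 1}, new long[]{3, 1},
                new long[]{6, 6}, new long[]{7, 6}, new long[]{8, 6}, new long[]{9, 6});
        System.out.println(groupByComponent(cc)); // {1=[1, 2, 3], 6=[6, 7, 8, 9]}
    }
}
```

In Spark the equivalent groupByKey produces an RDD of (componentId, Iterable[vertexId]) that can be collected or saved per component.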
Size exceeds Integer.MAX_VALUE on EMR 4.0.0 Spark 1.4.1
I am using spark-csv to save files to S3, and it throws "Size exceeds Integer.MAX_VALUE". Please let me know how to fix it. Thanks.

df.write()
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .save("s3://newcars.csv");

java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
    at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:860)
    at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:125)
    at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:113)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1285)
    at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:127)
    at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:134)
    at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:511)
    at org.apache.spark.storage.BlockManager.getLocal(BlockManager.scala:429)
    at org.apache.spark.storage.BlockManager.get(BlockManager.scala:617)
    at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:154)
    at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:242)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:70)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
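For the archives: the trace shows FileChannelImpl.map, which memory-maps a single cached/shuffled partition into one ByteBuffer, and ByteBuffers cap at Integer.MAX_VALUE (2 GB). The usual workaround is more, smaller partitions before caching or writing. A sketch against the Spark 1.4-era Java API (partition count, bucket and key are illustrative):

```java
// Sketch: split the data into more partitions so no single block
// approaches 2 GB.
DataFrame smaller = df.repartition(200);   // tune so each partition stays well under 2 GB
smaller.write()
       .format("com.databricks.spark.csv")
       .option("header", "true")
       .save("s3a://my-bucket/newcars.csv");
```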
Re: Size exceeds Integer.MAX_VALUE (SparkSQL$TreeNodeException: sort, tree) on EMR 4.0.0 Spark 1.4.1
false, [net_site#530,device#449], [net_site#530,device#449,Coalesce(SUM(PartialCount#717L),0) AS unique_nk_count#109L]
 Exchange (HashPartitioning 200)
  Aggregate true, [net_site#530,device#449], [net_site#530,device#449,COUNT(device#449) AS PartialCount#717L]
   Project [net_site#530,device#449]
    Filter (cnt#108L = 1)
     Aggregate false, [net_site#530,device#449,news_key#459], [net_site#530,device#449,news_key#459,Coalesce(SUM(PartialCount#719L),0) AS cnt#108L]
      Exchange (HashPartitioning 200)
       Aggregate true, [net_site#530,device#449,news_key#459], [net_site#530,device#449,news_key#459,COUNT(news_key#459) AS PartialCount#719L]
        Project [net_site#530,device#449,news_key#459]
         Filter (CAST(et#451, DoubleType) = 3.0)
          InMemoryColumnarTableScan [net_site#530,device#449,news_key#459,et#451], [(CAST(et#451, DoubleType) = 3.0)], (InMemoryRelation [net_site#530,device#449,cbd#448,et#451,news_key#459,underscore_et#478], true, 1, StorageLevel(true, true, false, true, 1), (Project [net_site#50,device#6,cbd#5,et#8,news_key#16,underscore_et#35]), None)

    at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:49)
    at org.apache.spark.sql.execution.Exchange.doExecute(Exchange.scala:171)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:88)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:88)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:87)
    at org.apache.spark.sql.execution.Sort$$anonfun$doExecute$5.apply(basicOperators.scala:190)
    at org.apache.spark.sql.execution.Sort$$anonfun$doExecute$5.apply(basicOperators.scala:190)
    at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:48)
    ... 21 more

On 16 November 2015 at 21:16, Zhang, Jingyu <jingyu.zh...@news.com.au> wrote:

> I am using spark-csv to save files in s3, it shown Size exceeds. Please let
> me know how to fix it. Thanks.
Re: How to pass parameters to another Java class
Thanks Fengdong. The startTime and endTime are null in the call(Iterator lines) method. Java does not allow a top-level class to be static. From the Spark docs, I can broadcast them, but I don't know how to receive them from another class.

On 16 November 2015 at 16:16, Fengdong Yu <fengdo...@everstring.com> wrote:

> If you got a "cannot serialize" exception, then you need to make
> PixelGenerator a static class.
>
> On Nov 16, 2015, at 1:10 PM, Zhang, Jingyu <jingyu.zh...@news.com.au> wrote:
>
> Thanks, that worked for the local environment but not in the Spark cluster.
>
> On 16 November 2015 at 16:05, Fengdong Yu <fengdo...@everstring.com> wrote:
>
>> Can you try: new PixelGenerator(startTime, endTime)?
>>
>> On Nov 16, 2015, at 12:47 PM, Zhang, Jingyu <jingyu.zh...@news.com.au> wrote:
>>
>> I want to pass two parameters into a new Java class from
>> rdd.mapPartitions(); the code is like the following.
>>
>> --- Source Code
>>
>> Main method:
>>
>> /* The parameters that I want to pass into PixelGenerator.class for
>>    selecting any items between the startTime and the endTime. */
>>
>> int startTime, endTime;
>>
>> JavaRDD pixelsObj = pixelsStr.mapPartitions(new PixelGenerator());
>>
>> PixelGenerator.java:
>>
>> public class PixelGenerator implements FlatMapFunction<Iterator, PixelObject> {
>>
>>     public Iterable call(Iterator lines) {
>>         ...
>>     }
>>
>> Can anyone tell me how to pass the startTime and endTime
>> into the PixelGenerator class?
>>
>> Many Thanks

--
This message and its attachments may contain legally privileged or confidential information. It is intended solely for the named addressee. If you are not the addressee indicated in this message or responsible for delivery of the message to the addressee, you may not copy or deliver this message or its attachments to anyone. Rather, you should permanently delete this message and its attachments and kindly notify the sender by reply e-mail. Any content of this message and its attachments which does not relate to the official business of the sending company must be taken not to have been sent or endorsed by that company or any of its related entities. No warranty is made that the e-mail or attachments are free from computer virus or other defect.
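Following up on Fengdong's suggestion above: the constructor is the usual way to hand parameters to a Spark function class, and the serialization error goes away once the class does not capture a non-serializable enclosing driver object. Below is a minimal plain-Java sketch of that pattern, a stand-in for PixelGenerator with no Spark dependency, so the class name RangeFilter and the Integer element type are illustrative assumptions:

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Stand-in for PixelGenerator: the parameters travel with the serialized
// function object via its constructor, so every executor sees them.
class RangeFilter implements Serializable {
    private final int startTime;
    private final int endTime;

    // Pass the values here instead of reading fields of the driver class.
    RangeFilter(int startTime, int endTime) {
        this.startTime = startTime;
        this.endTime = endTime;
    }

    // Mirrors FlatMapFunction.call(Iterator): keep items in [startTime, endTime].
    public Iterable<Integer> call(Iterator<Integer> times) {
        List<Integer> kept = new ArrayList<>();
        while (times.hasNext()) {
            int t = times.next();
            if (t >= startTime && t <= endTime) {
                kept.add(t);
            }
        }
        return kept;
    }
}

public class RangeFilterDemo {
    public static void main(String[] args) {
        RangeFilter f = new RangeFilter(10, 20);
        System.out.println(f.call(Arrays.asList(5, 12, 18, 25).iterator())); // [12, 18]
    }
}
```

In the real job the same shape applies: give PixelGenerator a (startTime, endTime) constructor, store the values in final fields, and call pixelsStr.mapPartitions(new PixelGenerator(startTime, endTime)). Declaring the function as a top-level or static nested class keeps the outer class out of the serialized closure.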
How to pass parameters to another Java class
I want to pass two parameters into a new Java class from rdd.mapPartitions(); the code is like the following.

--- Source Code

Main method:

/* The parameters that I want to pass into PixelGenerator.class for
   selecting any items between the startTime and the endTime. */

int startTime, endTime;

JavaRDD pixelsObj = pixelsStr.mapPartitions(new PixelGenerator());

PixelGenerator.java:

public class PixelGenerator implements FlatMapFunction<Iterator, PixelObject> {

    public Iterable call(Iterator lines) {
        ...
    }

Can anyone tell me how to pass the startTime and endTime into the PixelGenerator class?

Many Thanks
Re: How to pass parameters to another Java class
Thanks, that worked for the local environment but not in the Spark cluster.

On 16 November 2015 at 16:05, Fengdong Yu <fengdo...@everstring.com> wrote:

> Can you try: new PixelGenerator(startTime, endTime)?
>
> On Nov 16, 2015, at 12:47 PM, Zhang, Jingyu <jingyu.zh...@news.com.au> wrote:
>
> I want to pass two parameters into a new Java class from
> rdd.mapPartitions(); the code is like the following.
>
> --- Source Code
>
> Main method:
>
> /* The parameters that I want to pass into PixelGenerator.class for
>    selecting any items between the startTime and the endTime. */
>
> int startTime, endTime;
>
> JavaRDD pixelsObj = pixelsStr.mapPartitions(new PixelGenerator());
>
> PixelGenerator.java:
>
> public class PixelGenerator implements FlatMapFunction<Iterator, PixelObject> {
>
>     public Iterable call(Iterator lines) {
>         ...
>     }
>
> Can anyone tell me how to pass the startTime and endTime
> into the PixelGenerator class?
>
> Many Thanks
Spark-csv error on reading from AWS s3a in Spark 1.4.1
A small CSV file in S3. I use s3a://key:seckey@bucketname/a.csv

It works for SparkContext:

val pixelsStr = ctx.textFile(s3pathOrg);

It works for Java Spark-csv as well. Java code:

DataFrame careerOneDF = sqlContext.read().format("com.databricks.spark.csv")
    .option("inferSchema", "true")
    .option("header", "true")
    .load(s3pathOrg);

However, it does not work for Scala; the error message is shown below.

val careerOneDF: DataFrame = sqlContext.read
    .format("com.databricks.spark.csv")
    .option("inferSchema", "true")
    .option("header", "true")
    .load(s3pathOrg);

com.amazonaws.services.s3.model.AmazonS3Exception: Forbidden (Service: Amazon S3; Status Code: 403; Error Code: 403 Forbidden; Request ID: F2E11C10E6D35BF3), S3 Extended Request ID: 0tdESZAHmROgSJem6P3gYnEZs86rrt4PByrTYbxzCw0xyM9KUMCHEAX3x4lcoy5O3A8qccgHraQ=
at com.amazonaws.http.AmazonHttpClient.handleErrorResponse(AmazonHttpClient.java:1160)
at com.amazonaws.http.AmazonHttpClient.executeOneRequest(AmazonHttpClient.java:748)
at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:467)
at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:302)
at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3785)
at com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1050)
at com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1027)
at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:688)
at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:71)
at org.apache.hadoop.fs.Globber.getFileStatus(Globber.java:57)
at org.apache.hadoop.fs.Globber.glob(Globber.java:252)
at org.apache.hadoop.fs.FileSystem.globStatus(FileSystem.java:1644)
at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:257)
at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:228)
at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:313)
at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:207)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
at org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.scala:1255)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:286)
at org.apache.spark.rdd.RDD.take(RDD.scala:1250)
at org.apache.spark.rdd.RDD$$anonfun$first$1.apply(RDD.scala:1290)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:286)
at org.apache.spark.rdd.RDD.first(RDD.scala:1289)
at com.databricks.spark.csv.CsvRelation.firstLine$lzycompute(CsvRelation.scala:129)
at com.databricks.spark.csv.CsvRelation.firstLine(CsvRelation.scala:127)
at com.databricks.spark.csv.CsvRelation.inferSchema(CsvRelation.scala:109)
at com.databricks.spark.csv.CsvRelation.<init>(CsvRelation.scala:62)
at com.databricks.spark.csv.DefaultSource.createRelation(DefaultSource.scala:115)
at com.databricks.spark.csv.DefaultSource.createRelation(DefaultSource.scala:40)
at com.databricks.spark.csv.DefaultSource.createRelation(DefaultSource.scala:28)
at org.apache.spark.sql.sources.ResolvedDataSource$.apply(ddl.scala:269)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:114)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:104)

Thanks
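As Steve Loughran notes earlier in this thread, keys embedded in the URI also leak into logs; the s3a credentials can instead be supplied as Hadoop properties, prefixed with spark.hadoop. when set through Spark. A sketch of spark-defaults.conf; the property names are the standard hadoop-aws s3a ones, and the values are placeholders:

```
# spark-defaults.conf: spark.hadoop.* entries are copied into the Hadoop configuration
spark.hadoop.fs.s3a.access.key    YOUR_ACCESS_KEY
spark.hadoop.fs.s3a.secret.key    YOUR_SECRET_KEY
```

With these set, the load can use a credential-free URI such as s3a://bucketname/a.csv. The AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY environment variables, also mentioned above, are another option.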
key not found: sportingpulse.com in Spark SQL 1.5.0
There is no problem in Spark SQL 1.5.1, but the error "key not found: sportingpulse.com" shows up when I use 1.5.0.

I have to use 1.5.0 because that is the version AWS EMR supports. Can anyone tell me why Spark uses "sportingpulse.com" and how to fix it?

Thanks.

Caused by: java.util.NoSuchElementException: key not found: sportingpulse.com
at scala.collection.MapLike$class.default(MapLike.scala:228)
at scala.collection.AbstractMap.default(Map.scala:58)
at scala.collection.mutable.HashMap.apply(HashMap.scala:64)
at org.apache.spark.sql.columnar.compression.DictionaryEncoding$Encoder.compress(compressionSchemes.scala:258)
at org.apache.spark.sql.columnar.compression.CompressibleColumnBuilder$class.build(CompressibleColumnBuilder.scala:110)
at org.apache.spark.sql.columnar.NativeColumnBuilder.build(ColumnBuilder.scala:87)
at org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$3$$anon$1$$anonfun$next$2.apply(InMemoryColumnarTableScan.scala:152)
at org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$3$$anon$1$$anonfun$next$2.apply(InMemoryColumnarTableScan.scala:152)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
at org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$3$$anon$1.next(InMemoryColumnarTableScan.scala:152)
at org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$3$$anon$1.next(InMemoryColumnarTableScan.scala:120)
at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:278)
at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:171)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:262)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.rdd.MapPartitionsWithPreparationRDD.compute(MapPartitionsWithPreparationRDD.scala:63)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
Re: key not found: sportingpulse.com in Spark SQL 1.5.0
Thanks Silvio and Ted. Can you please let me know how to fix this intermittent issue? Should I wait for EMR to upgrade to Spark 1.5.1, or change my code from DataFrames to plain Spark map-reduce?

Regards,
Jingyu

On 31 October 2015 at 09:40, Silvio Fiorito <silvio.fior...@granturing.com> wrote:

> It's something due to the columnar compression. I've seen similar
> intermittent issues when caching DataFrames. "sportingpulse.com" is a
> value in one of the columns of the DF.
>
> From: Ted Yu <yuzhih...@gmail.com>
> Sent: 10/30/2015 6:33 PM
> To: Zhang, Jingyu <jingyu.zh...@news.com.au>
> Cc: user <user@spark.apache.org>
> Subject: Re: key not found: sportingpulse.com in Spark SQL 1.5.0
>
> I searched for sportingpulse in *.scala and *.java files under the 1.5 branch.
> There was no hit.
>
> mvn dependency doesn't show sportingpulse either.
>
> Is it possible this is specific to EMR?
>
> Cheers
>
> On Fri, Oct 30, 2015 at 2:57 PM, Zhang, Jingyu <jingyu.zh...@news.com.au> wrote:
>
>> There is no problem in Spark SQL 1.5.1, but the error "key not found:
>> sportingpulse.com" shows up when I use 1.5.0.
>>
>> I have to use 1.5.0 because that is the version AWS EMR supports.
>> Can anyone tell me why Spark uses "sportingpulse.com" and how to fix it?
>>
>> Thanks.
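The frames in the trace above point into the in-memory columnar cache's dictionary encoder (DictionaryEncoding$Encoder.compress), which matches Silvio's diagnosis. If staying on 1.5.0 is unavoidable, one possible workaround is to disable columnar cache compression so that encoder never runs. The option itself is a real Spark SQL setting, but whether it sidesteps this particular 1.5.0 bug is an unverified assumption:

```
# spark-defaults.conf: trade cache memory for skipping the compression codepath
spark.sql.inMemoryColumnarStorage.compressed    false
```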
Re: Save data to different S3
Try s3://aws_key:aws_secret@bucketName/folderName with your access key and secret to save the data.

On 30 October 2015 at 10:55, William Li wrote:

> Hi - I have a simple app running fine with Spark; it reads data from S3
> and performs calculations.
>
> When reading data from S3, I use hadoopConfiguration.set for both
> fs.s3n.awsAccessKeyId and fs.s3n.awsSecretAccessKey so it has
> permission to load the data from customer sources.
>
> However, after I complete the analysis, how do I save the results (an
> org.apache.spark.rdd.RDD[String]) into my own S3 bucket, which requires a
> different access key and secret? It seems one option is that I could save
> the results as a local file on the Spark cluster, then create a new
> SQLContext with the different access, then load the data from the local
> file.
>
> Are there any other options that do not require saving and re-loading files?
>
> Thanks,
>
> William.
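One gotcha with the s3://key:secret@bucket form suggested above: AWS secret keys often contain '/' or '+', which must be percent-encoded before being placed in a URI or the request fails to authenticate. A small sketch; the credentials are fake placeholders:

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

// Builds the s3://key:secret@bucket/path form suggested above.
// The secret is percent-encoded so characters such as '/' and '+'
// do not break URI parsing. (Note that credentials embedded in paths
// can end up in logs; Hadoop configuration properties avoid that.)
public class S3UriBuilder {
    static String s3Uri(String key, String secret, String bucket, String path) {
        try {
            String encSecret = URLEncoder.encode(secret, "UTF-8");
            return "s3://" + key + ":" + encSecret + "@" + bucket + "/" + path;
        } catch (UnsupportedEncodingException e) {
            throw new RuntimeException(e); // UTF-8 is always available
        }
    }

    public static void main(String[] args) {
        // hypothetical credentials, for illustration only
        System.out.println(s3Uri("AKIAEXAMPLE", "ab/cd+ef", "my-bucket", "out/part"));
        // prints s3://AKIAEXAMPLE:ab%2Fcd%2Bef@my-bucket/out/part
    }
}
```

Embedding credentials this way lets the output bucket use different keys from the input, at the cost of the secret appearing wherever the path is logged.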
Re: NullPointerException when caching a DataFrame in Java (Spark 1.5.1)
Thanks Romi. I resized the dataset to 7 MB; however, the code throws the NullPointerException as well.

Did you try to cache a DataFrame with just a single row?
Yes, I tried. But, same problem.

Do your rows have any columns with null values?
No, I had filtered out null values before caching the DataFrame.

Can you post a code snippet here on how you load/generate the dataframe?
Sure. Here is working code 1:

JavaRDD pixels = pixelsStr.map(new PixelGenerator()).cache();
System.out.println(pixels.count()); // 3000-4000 rows

Working code 2:

JavaRDD pixels = pixelsStr.map(new PixelGenerator());
DataFrame schemaPixel = sqlContext.createDataFrame(pixels, PixelObject.class);
DataFrame totalDF1 = schemaPixel.select(schemaPixel.col("domain"))
    .filter("'domain' is not null").limit(500);
System.out.println(totalDF1.count());

BUT, after changing limit(500) to limit(1000), the code reports a NullPointerException:

JavaRDD pixels = pixelsStr.map(new PixelGenerator());
DataFrame schemaPixel = sqlContext.createDataFrame(pixels, PixelObject.class);
DataFrame totalDF = schemaPixel.select(schemaPixel.col("domain"))
    .filter("'domain' is not null").limit(1000);
System.out.println(totalDF.count()); // problem at this line

15/10/29 18:56:28 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
15/10/29 18:56:28 INFO TaskSchedulerImpl: Cancelling stage 0
15/10/29 18:56:28 INFO DAGScheduler: ShuffleMapStage 0 (count at X.java:113) failed in 3.764 s
15/10/29 18:56:28 INFO DAGScheduler: Job 0 failed: count at XXX.java:113, took 3.862207 s
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost): java.lang.NullPointerException
at sun.reflect.GeneratedMethodAccessor7.invoke(Unknown Source)

Does dataframe.rdd.cache work?
No, I tried but got the same exception.
Thanks,
Jingyu

On 29 October 2015 at 17:38, Romi Kuntsman <r...@totango.com> wrote:

> Did you try to cache a DataFrame with just a single row?
> Do your rows have any columns with null values?
> Can you post a code snippet here on how you load/generate the dataframe?
> Does dataframe.rdd.cache work?
>
> *Romi Kuntsman*, *Big Data Engineer*
> http://www.totango.com
>
> On Thu, Oct 29, 2015 at 4:33 AM, Zhang, Jingyu <jingyu.zh...@news.com.au> wrote:
>
>> It is not a problem to use JavaRDD.cache() for 200M of data (all objects
>> read from JSON format), but when I try to use DataFrame.cache(), it shows
>> the exception below.
>>
>> My machine can cache 1 G of data in Avro format without any problem.
>>
>> 15/10/29 13:26:23 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1)
>>
>> java.lang.NullPointerException
>>
>> ...
NullPointerException when caching a DataFrame in Java (Spark 1.5.1)
It is not a problem to use JavaRDD.cache() for 200M of data (all objects read from JSON format), but when I try to use DataFrame.cache(), it shows the exception below.

My machine can cache 1 G of data in Avro format without any problem.

15/10/29 13:26:23 INFO GeneratePredicate: Code generated in 154.531827 ms
15/10/29 13:26:23 INFO GenerateUnsafeProjection: Code generated in 27.832369 ms
15/10/29 13:26:23 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1)
java.lang.NullPointerException
at sun.reflect.GeneratedMethodAccessor11.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.apache.spark.sql.SQLContext$$anonfun$9$$anonfun$apply$1$$anonfun$apply$2.apply(SQLContext.scala:500)
at org.apache.spark.sql.SQLContext$$anonfun$9$$anonfun$apply$1$$anonfun$apply$2.apply(SQLContext.scala:500)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
at org.apache.spark.sql.SQLContext$$anonfun$9$$anonfun$apply$1.apply(SQLContext.scala:500)
at org.apache.spark.sql.SQLContext$$anonfun$9$$anonfun$apply$1.apply(SQLContext.scala:498)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:389)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$3$$anon$1.next(InMemoryColumnarTableScan.scala:127)
at org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$3$$anon$1.next(InMemoryColumnarTableScan.scala:120)
at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:278)
at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:171)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:262)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

15/10/29 13:26:23 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 1, localhost): java.lang.NullPointerException
at sun.reflect.GeneratedMethodAccessor11.invoke(Unknown Source)

Thanks,
Jingyu
Exception on saving s3n file (Spark 1.4.1, Hadoop 2.6)
I got following exception when I run JavPairRDD.values().saveAsTextFile("s3n://bucket); Can anyone help me out? thanks 15/09/25 12:24:32 INFO SparkContext: Successfully stopped SparkContext Exception in thread "main" java.lang.NoClassDefFoundError: org/jets3t/service/ServiceException at org.apache.hadoop.fs.s3native.NativeS3FileSystem.createDefaultStore( NativeS3FileSystem.java:338) at org.apache.hadoop.fs.s3native.NativeS3FileSystem.initialize( NativeS3FileSystem.java:328) at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2596) at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91) at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2630) at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2612) at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:370) at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296) at org.apache.spark.SparkHadoopWriter$.createPathFromString(SparkHadoopWriter.scala:170) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply$mcV$sp(PairRDDFunctions.scala:988) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply(PairRDDFunctions.scala:965) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply(PairRDDFunctions.scala:965) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108) at org.apache.spark.rdd.RDD.withScope(RDD.scala:286) at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:965) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$1.apply$mcV$sp(PairRDDFunctions.scala:897) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$1.apply(PairRDDFunctions.scala:897) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$1.apply(PairRDDFunctions.scala:897) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147) at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108) at org.apache.spark.rdd.RDD.withScope(RDD.scala:286) at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:896) at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply$mcV$sp(RDD.scala:1404) at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply(RDD.scala:1383) at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply(RDD.scala:1383) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108) at org.apache.spark.rdd.RDD.withScope(RDD.scala:286) at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1383) at org.apache.spark.api.java.JavaRDDLike$class.saveAsTextFile(JavaRDDLike.scala:519) at org.apache.spark.api.java.AbstractJavaRDDLike.saveAsTextFile(JavaRDDLike.scala:47) at com.news.report.section.SectionSubSection.run(SectionSubSection.java:184) at com.news.report.section.SectionSubSection.main(SectionSubSection.java:67) Caused by: java.lang.ClassNotFoundException: org.jets3t.service.ServiceException at java.net.URLClassLoader.findClass(URLClassLoader.java:381) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) ... 34 more -- This message and its attachments may contain legally privileged or confidential information. It is intended solely for the named addressee. If you are not the addressee indicated in this message or responsible for delivery of the message to the addressee, you may not copy or deliver this message or its attachments to anyone. Rather, you should permanently delete this message and its attachments and kindly notify the sender by reply e-mail. 
Any content of this message and its attachments which does not relate to the official business of the sending company must be taken not to have been sent or endorsed by that company or any of its related entities. No warranty is made that the e-mail or attachments are free from computer virus or other defect.
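The ClassNotFoundException above means the JetS3t library is missing from the runtime classpath; the s3n:// filesystem in Hadoop 2.x depends on it. A hedged sketch of the Maven dependency that usually resolves this — the version shown is illustrative and must match what your Hadoop release was built against:

```xml
<!-- Illustrative version: use the JetS3t version your Hadoop release expects -->
<dependency>
  <groupId>net.java.dev.jets3t</groupId>
  <artifactId>jets3t</artifactId>
  <version>0.9.0</version>
</dependency>
```

Alternatively, placing the jets3t jar on the driver and executor classpath directly also works, and s3a:// sidesteps JetS3t entirely since it uses the AWS SDK instead.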
How to get RDD from PairRDD<key,value> in Java
Hi All, I want to extract the "value" RDD from a JavaPairRDD<key, value> in Java. Please let me know how I can get it easily. Thanks, Jingyu
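The Spark Java API has this built in: JavaPairRDD<K, V>.values() returns a JavaRDD<V>. The extraction itself just drops the key of each pair, which can be sketched on plain Scala collections (the data here is made up):

```scala
// In Spark: val valuesRdd = pairRDD.values   (Java API: pairRDD.values())
// The same per-element operation on an ordinary sequence of pairs:
val pairs = Seq(("a", 1), ("b", 2), ("c", 3))
val values = pairs.map(_._2)   // keep only the second element of each pair
println(values)                // List(1, 2, 3)
```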
caching DataFrames
I have two DataFrames, A and B.
A has columns a11, a12, a21, a22; B has columns b11, b12, b21, b22.

I persist them in cache:
1. A.cache()
2. B.cache()

Then I persist subsets in cache later:
3. DataFrame A1 (a11, a12).cache()
4. DataFrame B1 (b11, b12).cache()
5. DataFrame AB1 (a11, a12, b11, b12).cache()

Can you please tell me what happens for caching cases 3, 4, and 5 after A and B are cached? How much more memory do I need compared with caching only 1 and 2?

Thanks, Jingyu
Re: Java Heap Space Error
Does your SQL work if it does not run a regex on the strings and concatenate them, i.e. if you just select the columns without any string operations? On 24 September 2015 at 10:11, java8964 wrote: > Try to increase the partition count; that will make each partition hold less > data. > > Yong > > -- > Subject: Re: Java Heap Space Error > From: yu...@useinsider.com > Date: Thu, 24 Sep 2015 00:32:47 +0300 > CC: user@spark.apache.org > To: java8...@hotmail.com > > > Yes, it's possible. I use S3 as the data source. My external tables are > partitioned. The failing task is 193/200: the job has 2 stages, and it is task 193 of > 200 in the 2nd stage because of sql.shuffle.partitions. > > How can I avoid this situation? This is my query: > > select userid,concat_ws(' ',collect_list(concat_ws(' ',if(productname is not > NULL,lower(productname),''),lower(regexp_replace(regexp_replace(substr(productcategory,2,length(productcategory)-2),'\"',''),\",\",' > ') inputlist from landing where dt='2015-9' and userid != '' and userid > is not null and userid is not NULL and pagetype = 'productDetail' group by > userid > > > On 23 Sep 2015, at 23:55, java8964 wrote: > > Based on your description, your job shouldn't have any shuffle then, as you > just apply a regex and concatenation on the column, but there is one > partition with 4.3M records to be read, vs. less than 1M records for the other > partitions. > > Is that possible? It depends on what the source of your data is. > > If there is a shuffle in your query (more than 2 stages generated by your > query, and this is my guess of what is happening), then it simply means that > one partition has far more data than the rest of the partitions. > > Yong > > -- > From: yu...@useinsider.com > Subject: Java Heap Space Error > Date: Wed, 23 Sep 2015 23:07:17 +0300 > To: user@spark.apache.org > > What can cause the issue in the attached picture? I'm running an SQL > query which runs a regex on strings and concatenates them. Because of this > task, my job gives a java heap space error.
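One way to apply Yong's suggestion (more partitions, so each holds less data) is to raise spark.sql.shuffle.partitions, which the poster already identified as controlling the 200-task stage. The values and the jar name below are placeholders, not recommendations; the right count depends on data volume and executor memory:

```shell
spark-submit \
  --conf spark.sql.shuffle.partitions=400 \
  --conf spark.executor.memory=4g \
  your-job.jar
```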
Re: caching DataFrames
Thanks Hemant. I will generate a total report (dfA) with many columns from log data. After report A is done, I will generate many detail reports (dfA1-dfAi) based on subsets of the total report (dfA); those detail reports use aggregate and window functions according to different rules. However, some information is lost after the aggregate or window functions. In the end, a few of the detail reports can be generated directly from the subset DataFrames, but many of the reports need to get some information back from the total report. Thus, I am considering whether there is any performance benefit if I cache both dfA and its subsets, and if so, how much memory I should prepare for them. On 24 September 2015 at 14:56, Hemant Bhanawat <hemant9...@gmail.com> wrote: > hit send button too early... > > However, why would you want to cache a dataFrame that is a subset of an already > cached dataFrame? > > If dfA is cached, and dfA1 is created by applying some transformation on > dfA, actions on dfA1 will use the cache of dfA. > > > val dfA1 = dfA.filter($"_1" > 50) > > // this will run on the cached data of A. > > dfA1.count() > > > > On Thu, Sep 24, 2015 at 10:20 AM, Hemant Bhanawat <hemant9...@gmail.com> > wrote: >> Two dataframes do not share cache storage in Spark. Hence it's immaterial >> how the two dataFrames are related to each other. Both of them are going >> to consume memory based on the data that they have. So for your A1 and B1 >> you would need extra memory that would be equivalent to half the memory of >> A/B. >> >> You can check the storage that a dataFrame is consuming in the Spark UI's >> Storage tab. http://host:4040/storage/ >> >> >> >> On Thu, Sep 24, 2015 at 5:37 AM, Zhang, Jingyu <jingyu.zh...@news.com.au> >> wrote: >> >>> I have A and B DataFrames >>> A has columns a11,a12, a21,a22 >>> B has columns b11,b12, b21,b22 >>> >>> I persist them in cache >>> 1. A.cache(), >>> 2. B.cache() >>> >>> Then, I persist the subset in cache later >>> >>> 3. 
DataFrame A1 (a11,a12).cache() >>> >>> 4. DataFrame B1 (b11,b12).cache() >>> >>> 5. DataFrame AB1 (a11,a12,b11,b12).cache() >>> >>> Can you please tell me what happens for caching cases 3, 4, and 5 after A >>> and B are cached? >>> How much more memory do I need compared with caching only 1 and 2? >>> >>> Thanks >>> >>> Jingyu
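To make the memory question concrete without a cluster, here is a purely illustrative plain-Scala sketch, not Spark semantics: once a derived subset is materialized separately, it occupies its own storage on top of the parent, which is roughly the extra memory cases 3-5 cost. Actual sizes are whatever the Spark UI Storage tab reports.

```scala
// Illustrative only: holding a filtered subset keeps a second copy in memory.
val parent = (1 to 100).toVector     // stands in for the cached dfA
val subset = parent.filter(_ > 50)   // stands in for dfA1, materialized separately
println(subset.length)               // 50 extra elements now resident
```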
Re: word count (group by users) in spark
Spark spills data to disk when there is more data shuffled onto a single executor machine than can fit in memory. However, it flushes the data out to disk one key at a time, so if a single key has more key-value pairs than can fit in memory, an out-of-memory exception occurs. Cheers, Jingyu On 21 September 2015 at 16:39, Aniket Bhatnagar wrote: > Unless I am mistaken, in a group-by operation, it spills to disk in case > values for a key don't fit in memory. > > Thanks, > Aniket > > On Mon, Sep 21, 2015 at 10:43 AM Huy Banh wrote: > >> Hi, >> >> If your input format is user -> comment, then you could: >> >> val comments = sc.parallelize(List(("u1", "one two one"), ("u2", "three >> four three"))) >> val wordCounts = comments. >>    flatMap({case (user, comment) => >> for (word <- comment.split(" ")) yield(((user, word), 1)) }). >>    reduceByKey(_ + _) >> >> val output = wordCounts. >>    map({case ((user, word), count) => (user, (word, count))}). >>    groupByKey() >> >> Aniket, if we group by user first, it could run out of memory when >> Spark tries to put all words in a single sequence, couldn't it? >> >> On Sat, Sep 19, 2015 at 11:05 PM Aniket Bhatnagar < >> aniket.bhatna...@gmail.com> wrote: >> >>> Using the Scala API, you can first group by user and then use combineByKey. >>> >>> Thanks, >>> Aniket >>> >>> On Sat, Sep 19, 2015, 6:41 PM kali.tumm...@gmail.com < >>> kali.tumm...@gmail.com> wrote: Hi All, I would like to achieve the output below using Spark. I managed to write it in Hive and call it from Spark, but not in pure Spark (Scala). How do I group word counts by a particular user (column)? For example, imagine users and their tweets: I want to do a word count based on user name. 
Input:-

kali A,B,A,B,B
james B,A,A,A,B

Output:-

kali A [Count] B [Count]
james A [Count] B [Count]

My Hive answer:-

CREATE EXTERNAL TABLE TEST ( user_name string , COMMENTS STRING ) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\001' STORED AS TEXTFILE LOCATION '/data/kali/test';

HDFS folder (create the HDFS folder and create a text file with the data mentioned in the email)

use default;
select user_name,COLLECT_SET(text) from (select user_name,concat(sub,' ',count(comments)) as text from test LATERAL VIEW explode(split(comments,',')) subView AS sub group by user_name,sub)w group by user_name;

Spark with Hive:-

package com.examples

/**
 * Created by kalit_000 on 17/09/2015.
 */
import org.apache.log4j.Logger
import org.apache.log4j.Level
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.SparkContext._

object HiveWordCount {

  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.WARN)
    Logger.getLogger("akka").setLevel(Level.WARN)

    val conf = new SparkConf().setMaster("local").setAppName("HiveWordCount").set("spark.executor.memory", "1g")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    val hc = new HiveContext(sc)

    hc.sql("CREATE EXTERNAL TABLE IF NOT EXISTS default.TEST (user_name string ,COMMENTS STRING )ROW FORMAT DELIMITED FIELDS TERMINATED BY '001' STORED AS TEXTFILE LOCATION '/data/kali/test' ")

    val op = hc.sql("select user_name,COLLECT_SET(text) from (select user_name,concat(sub,' ',count(comments)) as text from default.test LATERAL VIEW explode(split(comments,',')) subView AS sub group by user_name,sub)w group by user_name")

    op.collect.foreach(println)
  }
}

Thanks

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/word-count-group-by-users-in-spark-tp24748.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
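The flatMap -> reduceByKey pipeline from Huy Banh's answer can be checked on plain Scala collections with no Spark needed; a local groupBy plus a sum plays the role of reduceByKey here, and the sample input matches the kali/james data above:

```scala
// Per-(user, word) counts, mirroring flatMap + reduceByKey from the thread.
val comments = Seq(("kali", "A,B,A,B,B"), ("james", "B,A,A,A,B"))
val pairCounts = comments
  .flatMap { case (user, text) => text.split(",").map(word => ((user, word), 1)) }
  .groupBy(_._1)                                          // stands in for reduceByKey...
  .map { case (key, ones) => (key, ones.map(_._2).sum) }  // ...with a local sum
// Regroup by user, mirroring the groupByKey step.
val byUser = pairCounts.groupBy { case ((user, _), _) => user }
  .map { case (user, m) => user -> m.map { case ((_, word), c) => word -> c } }
println(byUser("kali"))   // kali's counts: A twice, B three times
```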