Re: Storing spark processed output to Database asynchronously.
That is completely alright, as the system will make sure the works get done. My major concern is, the data drop. Will using async stop data loss? On Thu, May 21, 2015 at 4:55 PM, Tathagata Das t...@databricks.com wrote: If you cannot push data as fast as you are generating it, then async isnt going to help either. The work is just going to keep piling up as many many async jobs even though your batch processing times will be low as that processing time is not going to reflect how much of overall work is pending in the system. On Wed, May 20, 2015 at 10:28 PM, Gautam Bajaj gautam1...@gmail.com wrote: Hi, From my understanding of Spark Streaming, I created a spark entry point, for continuous UDP data, using: SparkConf conf = new SparkConf().setMaster(local[2]).setAppName(NetworkWordCount);JavaStreamingContext jssc = new JavaStreamingContext(conf, new Duration(1));JavaReceiverInputDStreamString lines = jssc.receiverStream(new CustomReceiver(8060)); Now, when I process this input stream using: JavaDStream hash=lines.flatMap(my-code)JavaPairDStream tuple= hash.mapToPair(my-code)JavaPairDStream output= tuple.reduceByKey(my-code) output.foreachRDD( new Function2JavaPairRDDString,ArrayListString,Time,Void(){ @Override public Void call( JavaPairRDDString, ArrayListString arg0, Time arg1) throws Exception { // TODO Auto-generated method stub new AsyncRDDActions(arg0.rdd(), null); arg0.foreachPartition( new VoidFunctionIteratorTuple2String,ArrayListString(){ @Override public void call( IteratorTuple2String, ArrayListString arg0) throws Exception { // TODO Auto-generated method stub GraphDatabaseService graphDb = new GraphDatabaseFactory().newEmbeddedDatabaseBuilder(/dev/shm/Advertisement/data/) .setConfig(remote_shell_enabled, true) .newGraphDatabase(); try (Transaction tx = graphDb.beginTx()) { while (arg0.hasNext()) { Tuple2 String, ArrayList String tuple = arg0.next(); Node HMac=Neo4jOperations.getHMacFromValue(graphDb, tuple._1); boolean oldHMac=false; if (HMac!= null){ System.out.println(Alread in Database: + tuple._1); oldHMac=true; } else HMac=Neo4jOperations.createHMac(graphDb, tuple._1); ArrayListString zipcodes=tuple._2; for(String zipcode : zipcodes){ Node Zipcode=Neo4jOperations.getZipcodeFromValue(graphDb, zipcode); if(Zipcode!=null){ System.out.println(Already in Database: + zipcode); if(oldHMac==true Neo4jOperations.getRelationshipBetween(HMac, Zipcode)!=null) Neo4jOperations.updateToCurrentTime(HMac, Zipcode); else Neo4jOperations.travelTo(HMac, Zipcode); } else{ Zipcode=Neo4jOperations.createZipcode(graphDb, zipcode); Neo4jOperations.travelTo(HMac, Zipcode); } } } tx.success(); } graphDb.shutdown(); } }); return null; } }); The part of
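For reference, the usual way to structure this kind of output operation is one connection or transaction per partition rather than per record, inside foreachRDD/foreachPartition. A minimal Scala sketch of that pattern, where the saveBatch helper is hypothetical and stands in for the Neo4j transaction logic above:

    import org.apache.spark.streaming.dstream.DStream

    // Sketch only: saveBatch is a placeholder for the per-partition database write
    // (open a transaction, upsert the records, commit).
    def writeOutput(output: DStream[(String, Seq[String])],
                    saveBatch: Iterator[(String, Seq[String])] => Unit): Unit = {
      output.foreachRDD { rdd =>
        rdd.foreachPartition { partition =>
          saveBatch(partition) // one transaction per partition, not per record
        }
      }
    }

This reduces per-record connection overhead, but as noted above it does not change the total amount of work: if the database cannot absorb writes as fast as data arrives, batches will still pile up.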
Re: [Streaming] Non-blocking recommendation in custom receiver documentation and KinesisReceiver's worker.run blocking call
Thanks for the JIRA. I will look into this issue. TD On Thu, May 21, 2015 at 1:31 AM, Aniket Bhatnagar aniket.bhatna...@gmail.com wrote: I ran into one of the issues that are potentially caused because of this and have logged a JIRA bug - https://issues.apache.org/jira/browse/SPARK-7788 Thanks, Aniket On Wed, Sep 24, 2014 at 12:59 PM Aniket Bhatnagar aniket.bhatna...@gmail.com wrote: Hi all Reading through Spark streaming's custom receiver documentation, it is recommended that onStart and onStop methods should not block indefinitely. However, looking at the source code of KinesisReceiver, the onStart method calls worker.run that blocks until worker is shutdown (via a call to onStop). So, my question is what are the ramifications of making a blocking call in onStart and whether this is something that should be addressed in KinesisReceiver implementation. Thanks, Aniket
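For context, the custom receiver guide recommends that onStart() kick off the receiving loop on its own thread and return immediately, since the receiver supervisor calls onStart() directly. A minimal Scala sketch of that non-blocking pattern (readOnePacket is a hypothetical blocking read from the source):

    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.receiver.Receiver

    class NonBlockingReceiver extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

      override def onStart(): Unit = {
        // Start the blocking receive loop on a separate thread and return immediately.
        new Thread("receiver-loop") {
          override def run(): Unit = {
            while (!isStopped()) {
              store(readOnePacket())
            }
          }
        }.start()
      }

      override def onStop(): Unit = {
        // Nothing to do: the loop above checks isStopped() and exits on its own.
      }

      private def readOnePacket(): String = ??? // placeholder for the real source
    }

KinesisReceiver's worker.run, by contrast, does not return from onStart until onStop is called, which is what the JIRA above is about.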
Re: spark mllib kmeans
I want to evaluate some different distance measures for time-space clustering, so I need an API to implement my own function in Java. 2015-05-19 22:08 GMT+02:00 Xiangrui Meng men...@gmail.com: Just curious, what distance measure do you need? -Xiangrui On Mon, May 11, 2015 at 8:28 AM, Jaonary Rabarisoa jaon...@gmail.com wrote: take a look at this https://github.com/derrickburns/generalized-kmeans-clustering Best, Jao On Mon, May 11, 2015 at 3:55 PM, Driesprong, Fokko fo...@driesprong.frl wrote: Hi Paul, I would say that it should be possible, but you'll need a different distance measure which conforms to your coordinate system. 2015-05-11 14:59 GMT+02:00 Pa Rö paul.roewer1...@googlemail.com: hi, is it possible to use a custom distance measure and another data type as vector? I want to cluster temporal geo data. best regards paul
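For what it's worth, MLlib's KMeans is tied to squared Euclidean distance, so besides the generalized-kmeans-clustering project linked above, another route is to hand-roll the assignment step with a pluggable distance function. A rough Scala sketch, where the distance function and the centers are supplied by the caller:

    import org.apache.spark.SparkContext
    import org.apache.spark.rdd.RDD

    // Assign every point to its nearest center under an arbitrary distance measure.
    def assign(sc: SparkContext,
               points: RDD[Array[Double]],
               centers: Array[Array[Double]],
               distance: (Array[Double], Array[Double]) => Double): RDD[(Int, Array[Double])] = {
      val bcCenters = sc.broadcast(centers)
      points.map { p =>
        val nearest = bcCenters.value.indices.minBy(i => distance(bcCenters.value(i), p))
        (nearest, p)
      }
    }

The update step (recomputing centers from the assigned points) depends on the measure: the mean is only the right center update for squared Euclidean distance, which is part of why MLlib does not expose the measure as a parameter.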
Re: java program Get Stuck at broadcasting
Can you share the code, may be i/someone can help you out Thanks Best Regards On Thu, May 21, 2015 at 1:45 PM, Allan Jie allanmcgr...@gmail.com wrote: Hi, Just check the logs of datanode, it looks like this: 2015-05-20 11:42:14,605 INFO org.apache.hadoop.hdfs.server.datanode.DataNode.clienttrace: src: / 10.9.0.48:50676, dest: /10.9.0.17:50010, bytes: 134217728, op: HDFS_WRITE, cliID: DFSClient_NONMAPREDUCE_804680172_54, offset: 0, srvID: 39fb78d5-828a-4319-8303-c704fab526e3, blockid: BP-436159032-10.9.0.16-1431330007172:blk_1073742096_1273, duration: 16994466261 2015-05-20 11:42:14,606 INFO org.apache.hadoop.hdfs.server.datanode.DataNode: PacketResponder: BP-436159032-10.9.0.16-1431330007172:blk_1073742096_1273, type=LAST_IN_PIPELINE, downstreams=0:[] terminating 2015-05-20 11:42:17,788 INFO org.apache.hadoop.hdfs.server.datanode.DataNode.clienttrace: src: / 10.9.0.17:49046, dest: /10.9.0.17:50010, bytes: 134217728, op: HDFS_WRITE, cliID: DFSClient_NONMAPREDUCE_102926009_54, offset: 0, srvID: 39fb78d5-828a-4319-8303-c704fab526e3, blockid: BP-436159032-10.9.0.16-1431330007172:blk_1073742099_1276, duration: 17829554438 2015-05-20 11:42:17,788 INFO org.apache.hadoop.hdfs.server.datanode.DataNode: PacketResponder: BP-436159032-10.9.0.16-1431330007172:blk_1073742099_1276, type=HAS_DOWNSTREAM_IN_PIPELINE terminating 2015-05-20 11:42:17,904 INFO org.apache.hadoop.hdfs.server.datanode.DataNode: Receiving BP-436159032-10.9.0.16-1431330007172:blk_1073742103_1280 src: / 10.9.0.17:49049 dest: /10.9.0.17:50010 2015-05-20 11:42:17,904 WARN org.apache.hadoop.hdfs.server.datanode.DataNode: IOException in BlockReceiver constructor. Cause is 2015-05-20 11:42:17,904 INFO org.apache.hadoop.hdfs.server.datanode.DataNode: opWriteBlock BP-436159032-10.9.0.16-1431330007172:blk_1073742103_1280 received exception org.apache.hadoop.util.DiskChecker$DiskOutOfSpaceException: Out of space: The volume with the most available space (=114409472 B) is less than the block size (=134217728 B). 2015-05-20 11:42:17,905 ERROR org.apache.hadoop.hdfs.server.datanode.DataNode: HadoopV26Slave1:50010:DataXceiver error processing WRITE_BLOCK operation src: /10.9.0.17:49049 dst: /10.9.0.17:50010 org.apache.hadoop.util.DiskChecker$DiskOutOfSpaceException: Out of space: The volume with the most available space (=114409472 B) is less than the block size (=134217728 B). 
at org.apache.hadoop.hdfs.server.datanode.fsdataset.RoundRobinVolumeChoosingPolicy.chooseVolume(RoundRobinVolumeChoosingPolicy.java:67) at org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeList.getNextVolume(FsVolumeList.java:69) at org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetImpl.createRbw(FsDatasetImpl.java:1084) at org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetImpl.createRbw(FsDatasetImpl.java:114) at org.apache.hadoop.hdfs.server.datanode.BlockReceiver.init(BlockReceiver.java:183) at org.apache.hadoop.hdfs.server.datanode.DataXceiver.writeBlock(DataXceiver.java:615) at org.apache.hadoop.hdfs.protocol.datatransfer.Receiver.opWriteBlock(Receiver.java:137) at org.apache.hadoop.hdfs.protocol.datatransfer.Receiver.processOp(Receiver.java:74) at org.apache.hadoop.hdfs.server.datanode.DataXceiver.run(DataXceiver.java:235) at java.lang.Thread.run(Thread.java:745) 2015-05-20 11:43:59,669 INFO org.apache.hadoop.hdfs.server.datanode.BlockPoolSliceScanner: Verification succeeded for BP-436159032-10.9.0.16-1431330007172:blk_1073741999_1176 2015-05-20 11:46:10,214 INFO org.apache.hadoop.hdfs.server.datanode.BlockPoolSliceScanner: Verification succeeded for BP-436159032-10.9.0.16-1431330007172:blk_1073742000_1177 2015-05-20 11:48:35,445 INFO org.apache.hadoop.hdfs.server.datanode.BlockPoolSliceScanner: Verification succeeded for BP-436159032-10.9.0.16-1431330007172:blk_1073741990_1167 2015-05-20 11:50:04,043 INFO org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetAsyncDiskService: Scheduling blk_1073742080_1257 file /tmp/hadoop-hduser/dfs/data/current/BP-436159032-10.9.0.16-1431330007172/current/finalized/subdir0/subdir1/blk_1073742080 for deletion 2015-05-20 11:50:04,136 INFO org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetAsyncDiskService: Scheduling blk_1073742081_1258 file /tmp/hadoop-hduser/dfs/data/current/BP-436159032-10.9.0.16-1431330007172/current/finalized/subdir0/subdir1/blk_1073742081 for deletion 2015-05-20 11:50:04,136 INFO org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetAsyncDiskService: Scheduling blk_1073742082_1259 file /tmp/hadoop-hduser/dfs/data/current/BP-436159032-10.9.0.16-1431330007172/current/finalized/subdir0/subdir1/blk_1073742082 for deletion 2015-05-20 11:50:04,136 INFO org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetAsyncDiskService: Scheduling blk_1073742083_1260 file
Re: Storing spark processed output to Database asynchronously.
If you cannot push data as fast as you are generating it, then async isnt going to help either. The work is just going to keep piling up as many many async jobs even though your batch processing times will be low as that processing time is not going to reflect how much of overall work is pending in the system. On Wed, May 20, 2015 at 10:28 PM, Gautam Bajaj gautam1...@gmail.com wrote: Hi, From my understanding of Spark Streaming, I created a spark entry point, for continuous UDP data, using: SparkConf conf = new SparkConf().setMaster(local[2]).setAppName(NetworkWordCount);JavaStreamingContext jssc = new JavaStreamingContext(conf, new Duration(1));JavaReceiverInputDStreamString lines = jssc.receiverStream(new CustomReceiver(8060)); Now, when I process this input stream using: JavaDStream hash=lines.flatMap(my-code)JavaPairDStream tuple= hash.mapToPair(my-code)JavaPairDStream output= tuple.reduceByKey(my-code) output.foreachRDD( new Function2JavaPairRDDString,ArrayListString,Time,Void(){ @Override public Void call( JavaPairRDDString, ArrayListString arg0, Time arg1) throws Exception { // TODO Auto-generated method stub new AsyncRDDActions(arg0.rdd(), null); arg0.foreachPartition( new VoidFunctionIteratorTuple2String,ArrayListString(){ @Override public void call( IteratorTuple2String, ArrayListString arg0) throws Exception { // TODO Auto-generated method stub GraphDatabaseService graphDb = new GraphDatabaseFactory().newEmbeddedDatabaseBuilder(/dev/shm/Advertisement/data/) .setConfig(remote_shell_enabled, true) .newGraphDatabase(); try (Transaction tx = graphDb.beginTx()) { while (arg0.hasNext()) { Tuple2 String, ArrayList String tuple = arg0.next(); Node HMac=Neo4jOperations.getHMacFromValue(graphDb, tuple._1); boolean oldHMac=false; if (HMac!= null){ System.out.println(Alread in Database: + tuple._1); oldHMac=true; } else HMac=Neo4jOperations.createHMac(graphDb, tuple._1); ArrayListString zipcodes=tuple._2; for(String zipcode : zipcodes){ Node Zipcode=Neo4jOperations.getZipcodeFromValue(graphDb, zipcode); if(Zipcode!=null){ System.out.println(Already in Database: + zipcode); if(oldHMac==true Neo4jOperations.getRelationshipBetween(HMac, Zipcode)!=null) Neo4jOperations.updateToCurrentTime(HMac, Zipcode); else Neo4jOperations.travelTo(HMac, Zipcode); } else{ Zipcode=Neo4jOperations.createZipcode(graphDb, zipcode); Neo4jOperations.travelTo(HMac, Zipcode); } } } tx.success(); } graphDb.shutdown(); } }); return null; } }); The part of code in output.foreachRDD pushes the output of spark into Neo4j Database. Checking for duplicates values. This part of code is very time consuming because of which my processing time exceeds batch time. Because of that, it
Question about Serialization in Storage Level
Hi there, This question may seem to be kind of naïve, but what's the difference between MEMORY_AND_DISK and MEMORY_AND_DISK_SER? If I call rdd.persist(StorageLevel.MEMORY_AND_DISK), the BlockManager won't serialize the rdd? Thanks, Zhipeng
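For the record: MEMORY_AND_DISK keeps the cached partitions in memory as deserialized Java objects (the BlockManager only serializes whatever spills to disk), while MEMORY_AND_DISK_SER keeps even the in-memory partitions as serialized bytes, giving a smaller footprint at the cost of CPU to deserialize on access. A quick Scala illustration, assuming the shell's sc and an arbitrary dataset:

    import org.apache.spark.storage.StorageLevel

    val rdd = sc.parallelize(1 to 1000000)

    // Deserialized objects in memory, serialized spill to disk:
    rdd.persist(StorageLevel.MEMORY_AND_DISK)

    // Alternatively, serialized bytes in memory as well (pick one level per RDD):
    // rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)

    rdd.count()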
Re: Spark and Flink
thanks a lot for ur help, now i split my project, it's works. 2015-05-19 15:44 GMT+02:00 Alexander Alexandrov alexander.s.alexand...@gmail.com: Sorry, we're using a forked version which changed groupID. 2015-05-19 15:15 GMT+02:00 Till Rohrmann trohrm...@apache.org: I guess it's a typo: eu.stratosphere should be replaced by org.apache.flink On Tue, May 19, 2015 at 1:13 PM, Alexander Alexandrov alexander.s.alexand...@gmail.com wrote: We managed to do this with the following config: // properties !-- Hadoop -- hadoop.version2.2.0/hadoop.version !-- Flink -- flink.version0.9-SNAPSHOT/flink.version !-- Spark -- spark.version1.2.1/spark.version // form the dependency management !-- Hadoop -- dependency groupIdorg.apache.hadoop/groupId artifactIdhadoop-common/artifactId version${hadoop.version}/version scopeprovided/scope /dependency dependency groupIdorg.apache.hadoop/groupId artifactIdhadoop-hdfs/artifactId version${hadoop.version}/version scopeprovided/scope /dependency !-- Flink -- dependency groupIdeu.stratosphere/groupId artifactIdflink-scala/artifactId version${flink.version}/version scopeprovided/scope /dependency dependency groupIdeu.stratosphere/groupId artifactIdflink-java/artifactId version${flink.version}/version scopeprovided/scope /dependency dependency groupIdeu.stratosphere/groupId artifactIdflink-clients/artifactId version${flink.version}/version scopeprovided/scope /dependency !-- Spark -- dependency groupIdorg.apache.spark/groupId artifactIdspark-core_${scala.tools.version}/artifactId version${spark.version}/version scopeprovided/scope /dependency !-- Jetty -- dependency groupIdorg.eclipse.jetty/groupId artifactIdjetty-util/artifactId version${jetty.version}/version /dependency dependency groupIdorg.eclipse.jetty/groupId artifactIdjetty-servlet/artifactId version${jetty.version}/version /dependency // actual dependencies !-- Spark -- dependency groupIdorg.apache.spark/groupId artifactIdspark-core_${scala.tools.version}/artifactId /dependency !-- Flink -- dependency groupIdeu.stratosphere/groupId artifactIdflink-scala/artifactId /dependency dependency groupIdeu.stratosphere/groupId artifactIdflink-java/artifactId /dependency dependency groupIdeu.stratosphere/groupId artifactIdflink-clients/artifactId /dependency !-- FIXME: this is a hacky solution for a Flink issue with the Jackson deps-- dependency groupIdcom.fasterxml.jackson.core/groupId artifactIdjackson-core/artifactId version2.2.1/version scopeprovided/scope /dependency dependency groupIdcom.fasterxml.jackson.core/groupId artifactIdjackson-databind/artifactId version2.2.1/version scopeprovided/scope /dependency dependency groupIdcom.fasterxml.jackson.core/groupId artifactIdjackson-annotations/artifactId version2.2.1/version scopeprovided/scope /dependency 2015-05-19 10:06 GMT+02:00 Pa Rö paul.roewer1...@googlemail.com: it's sound good, maybe you can send me pseudo structure, that is my fist maven project. best regards, paul 2015-05-18 14:05 GMT+02:00 Robert Metzger rmetz...@apache.org: Hi, I would really recommend you to put your Flink and Spark dependencies into different maven modules. Having them both in the same project will be very hard, if not impossible. Both projects depend on similar projects with slightly different versions. 
I would suggest a maven module structure like this: yourproject-parent (a pom module) -- yourproject-common -- yourproject-flink -- yourproject-spark On Mon, May 18, 2015 at 10:00 AM, Pa Rö paul.roewer1...@googlemail.com wrote: hi, if i add your dependency i get over 100 errors, now i change the version number: dependencies dependency groupIdcom.fasterxml.jackson.module/groupId
Re: How to process data in chronological order
Would partitioning your data based on the key and then running mapPartitions help? Best Regards, Sonal Founder, Nube Technologies http://www.nubetech.co http://in.linkedin.com/in/sonalgoyal On Thu, May 21, 2015 at 4:33 AM, roy rp...@njit.edu wrote: I have a key-value RDD, key is a timestamp (femto-second resolution, so grouping buys me nothing) and I want to reduce it in the chronological order. How do I do that in spark? I am fine with reducing contiguous sections of the set separately and then aggregating the resulting objects locally. Thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-process-data-in-chronological-order-tp22966.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
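Expanding on that a little: a range partitioner keyed on the timestamp gives each partition a contiguous time slice, repartitionAndSortWithinPartitions keeps every slice internally ordered, and collecting the per-partition results preserves partition order. A rough Scala sketch, assuming a caller-supplied combine function:

    import scala.reflect.ClassTag
    import org.apache.spark.RangePartitioner
    import org.apache.spark.rdd.RDD

    // Reduce a timestamp-keyed RDD in chronological order, one contiguous slice per partition.
    def reduceChronologically[V: ClassTag](events: RDD[(Long, V)],
                                           numSlices: Int,
                                           combine: (V, V) => V): Seq[V] = {
      val partitioner = new RangePartitioner(numSlices, events)
      events
        .repartitionAndSortWithinPartitions(partitioner) // sorted, contiguous time slices
        .mapPartitions(it => it.map(_._2).reduceLeftOption(combine).iterator)
        .collect()                                       // partition order == time order
        .toSeq
    }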
Unable to use hive queries with constants in predicates
Hi, I was testing spark to read data from hive using HiveContext. I got the following error, when I used a simple query with constants in predicates. I am using spark 1.3*. *Anyone encountered error like this ?? *Error:* Exception in thread main org.apache.spark.sql.AnalysisException: Unsupported language features in query: SELECT * from test_table where daily_partition='20150101' TOK_QUERY 1, 0,20, 81 TOK_FROM 1, 10,14, 81 TOK_TABREF 1, 12,14, 81 TOK_TABNAME 1, 12,14, 81 everest_marts_test 1, 12,12, 81 voice_cdr 1, 14,14, 100 TOK_INSERT 0, -1,-1, 0 TOK_DESTINATION 0, -1,-1, 0 TOK_DIR 0, -1,-1, 0 TOK_TMP_FILE 0, -1,-1, 0 TOK_SELECT 1, 0,8, 7 TOK_SELEXPR 1, 2,2, 7 TOK_TABLE_OR_COL 1, 2,2, 7 callingpartynumber 1, 2,2, 7 TOK_SELEXPR 1, 4,4, 26 TOK_TABLE_OR_COL 1, 4,4, 26 calledpartynumber 1, 4,4, 26 TOK_SELEXPR 1, 6,6, 44 TOK_TABLE_OR_COL 1, 6,6, 44 chargingtime 1, 6,6, 44 TOK_SELEXPR 1, 8,8, 57 TOK_TABLE_OR_COL 1, 8,8, 57 call_direction_key 1, 8,8, 57 TOK_WHERE 1, 16,20, 131 = 1, 18,20, 131 TOK_TABLE_OR_COL 1, 18,18, 116 daily_partition 1, 18,18, 116 '20150101' 1, 20,20, 132 scala.NotImplementedError: No parse rules for ASTNode type: 294, text: '20150101' : '20150101' 1, 20,20, 132 + org.apache.spark.sql.hive.HiveQl$.nodeToExpr(HiveQl.scala:1261) ; at org.apache.spark.sql.hive.HiveQl$.createPlan(HiveQl.scala:261) at org.apache.spark.sql.hive.ExtendedHiveQlParser$$anonfun$ hiveQl$1.apply(ExtendedHiveQlParser.scala:41) at org.apache.spark.sql.hive.ExtendedHiveQlParser$$anonfun$ hiveQl$1.apply(ExtendedHiveQlParser.scala:40) at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:136) at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:135) at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1. apply(Parsers.scala:242) at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1. apply(Parsers.scala:242) at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers. scala:222) at scala.util.parsing.combinator.Parsers$Parser$$anonfun$ append$1$$anonfun$apply$2.apply(Parsers.scala:254) at scala.util.parsing.combinator.Parsers$Parser$$anonfun$ append$1$$anonfun$apply$2.apply(Parsers.scala:254) at scala.util.parsing.combinator.Parsers$Failure.append( Parsers.scala:202) at scala.util.parsing.combinator.Parsers$Parser$$anonfun$ append$1.apply(Parsers.scala:254) at scala.util.parsing.combinator.Parsers$Parser$$anonfun$ append$1.apply(Parsers.scala:254) at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers. scala:222) at scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$ apply$14.apply(Parsers.scala:891) at scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$ apply$14.apply(Parsers.scala:891) at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) at scala.util.parsing.combinator.Parsers$$anon$2.apply(Parsers. 
scala:890) at scala.util.parsing.combinator.PackratParsers$$anon$1.apply( PackratParsers.scala:110) at org.apache.spark.sql.catalyst.AbstractSparkSQLParser.apply( AbstractSparkSQLParser.scala:38) at org.apache.spark.sql.hive.HiveQl$$anonfun$3.apply(HiveQl.scala:138) at org.apache.spark.sql.hive.HiveQl$$anonfun$3.apply(HiveQl.scala:138) at org.apache.spark.sql.SparkSQLParser$$anonfun$org$apache$spark$sql$ SparkSQLParser$$others$1.apply(SparkSQLParser.scala:96) at org.apache.spark.sql.SparkSQLParser$$anonfun$org$apache$spark$sql$ SparkSQLParser$$others$1.apply(SparkSQLParser.scala:95) at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:136) at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:135) at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1. apply(Parsers.scala:242) at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1. apply(Parsers.scala:242) at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers. scala:222) at scala.util.parsing.combinator.Parsers$Parser$$anonfun$ append$1$$anonfun$apply$2.apply(Parsers.scala:254) at scala.util.parsing.combinator.Parsers$Parser$$anonfun$ append$1$$anonfun$apply$2.apply(Parsers.scala:254) at scala.util.parsing.combinator.Parsers$Failure.append( Parsers.scala:202) at scala.util.parsing.combinator.Parsers$Parser$$anonfun$ append$1.apply(Parsers.scala:254) at scala.util.parsing.combinator.Parsers$Parser$$anonfun$ append$1.apply(Parsers.scala:254) at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers. scala:222) at scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$ apply$14.apply(Parsers.scala:891) at scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$ apply$14.apply(Parsers.scala:891) at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) at
Re: How to set the file size for parquet Part
How many part files are you having? Did you try re-partitioning to a smaller number of partitions so that you end up with fewer, bigger files? Thanks Best Regards On Wed, May 20, 2015 at 3:06 AM, Richard Grossman richie...@gmail.com wrote: Hi I'm using spark 1.3.1 and now I can't set the size of the part files generated for parquet. The size is only 512Kb, which is really too small; I need to make them bigger. How can I set this? Thanks
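In other words, the number of partitions at write time determines the number (and therefore the size) of the part files. A short Scala sketch of that suggestion for Spark 1.3.x, where the paths and partition count are made up and sc is the shell's context:

    import org.apache.spark.sql.SQLContext

    val sqlContext = new SQLContext(sc)
    val df = sqlContext.parquetFile("hdfs:///tmp/input_parquet")

    // Fewer partitions at write time -> fewer, larger parquet part files.
    df.repartition(8).saveAsParquetFile("hdfs:///tmp/output_parquet")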
Re: Read multiple files from S3
textFile does reads all files in a directory. We have modified the sparkstreaming code base to read nested files from S3, you can check this function https://github.com/sigmoidanalytics/spark-modified/blob/8074620414df6bbed81ac855067600573a7b22ca/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala#L206 which does that and implement something similar for your usecase. Or if your job is just a batch job and you don't bother processing file by file, then may be you can iterate over your list and create a sc.textFile for each file entry and do the computing too. something like: for(file - fileNames){ // Create sparkContext // do sc.textFile(file) // do your computing // sc.stop } Thanks Best Regards On Thu, May 21, 2015 at 1:45 AM, lovelylavs lxn130...@utdallas.edu wrote: Hi, I am trying to get a collection of files according to LastModifiedDate from S3 List String FileNames = new ArrayListString(); ListObjectsRequest listObjectsRequest = new ListObjectsRequest() .withBucketName(s3_bucket) .withPrefix(logs_dir); ObjectListing objectListing; do { objectListing = s3Client.listObjects(listObjectsRequest); for (S3ObjectSummary objectSummary : objectListing.getObjectSummaries()) { if ((objectSummary.getLastModified().compareTo(dayBefore) 0) (objectSummary.getLastModified().compareTo(dayAfter) 1) objectSummary.getKey().contains(.log)) FileNames.add(objectSummary.getKey()); } listObjectsRequest.setMarker(objectListing.getNextMarker()); } while (objectListing.isTruncated()); I would like to process these files using Spark I understand that textFile reads a single text file. Is there any way to read all these files that are part of the List? Thanks for your help. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Read-multiple-files-from-S3-tp22965.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
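Another option that avoids spinning up a context per file: sc.textFile accepts a comma-separated list of paths, so the filtered key list can be read as a single RDD. A small Scala sketch, where the bucket name and keys are placeholders for the list built above:

    // Read only the selected S3 objects as one RDD.
    val fileNames: Seq[String] = Seq("logs/2015-05-20/a.log", "logs/2015-05-20/b.log")
    val paths = fileNames.map(key => s"s3n://my-bucket/$key").mkString(",")
    val logs = sc.textFile(paths)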
Re: rdd.saveAsTextFile problem
Hi , I had tried the workaround shared here, but still facing the same issue... Thanks. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/rdd-saveAsTextFile-problem-tp176p22970.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: FP Growth saveAsTextFile
+user If this was in cluster mode, you should provide a path on a shared file system, e.g., HDFS, instead of a local path. If this is in local model, I'm not sure what went wrong. On Wed, May 20, 2015 at 2:09 PM, Eric Tanner eric.tan...@justenough.com wrote: Here is the stack trace. Thanks for looking at this. scala model.freqItemsets.saveAsTextFile(c:///repository/trunk/Scala_210_wspace/fpGrowth/modelText1) 15/05/20 14:07:47 INFO SparkContext: Starting job: saveAsTextFile at console:33 15/05/20 14:07:47 INFO DAGScheduler: Got job 15 (saveAsTextFile at console:33) with 2 output partitions (allowLocal=false) 15/05/20 14:07:47 INFO DAGScheduler: Final stage: Stage 30(saveAsTextFile at console:33) 15/05/20 14:07:47 INFO DAGScheduler: Parents of final stage: List(Stage 29) 15/05/20 14:07:47 INFO DAGScheduler: Missing parents: List() 15/05/20 14:07:47 INFO DAGScheduler: Submitting Stage 30 (MapPartitionsRDD[21] at saveAsTextFile at console:33), which has no missing parents 15/05/20 14:07:47 INFO MemoryStore: ensureFreeSpace(131288) called with curMem=724428, maxMem=278302556 15/05/20 14:07:47 INFO MemoryStore: Block broadcast_18 stored as values in memory (estimated size 128.2 KB, free 264.6 MB) 15/05/20 14:07:47 INFO MemoryStore: ensureFreeSpace(78995) called with curMem=855716, maxMem=278302556 15/05/20 14:07:47 INFO MemoryStore: Block broadcast_18_piece0 stored as bytes in memory (estimated size 77.1 KB, free 264.5 MB) 15/05/20 14:07:47 INFO BlockManagerInfo: Added broadcast_18_piece0 in memory on localhost:52396 (size: 77.1 KB, free: 265.1 MB) 15/05/20 14:07:47 INFO BlockManagerMaster: Updated info of block broadcast_18_piece0 15/05/20 14:07:47 INFO SparkContext: Created broadcast 18 from broadcast at DAGScheduler.scala:839 15/05/20 14:07:47 INFO DAGScheduler: Submitting 2 missing tasks from Stage 30 (MapPartitionsRDD[21] at saveAsTextFile at console:33) 15/05/20 14:07:47 INFO TaskSchedulerImpl: Adding task set 30.0 with 2 tasks 15/05/20 14:07:47 INFO BlockManager: Removing broadcast 17 15/05/20 14:07:47 INFO TaskSetManager: Starting task 0.0 in stage 30.0 (TID 33, localhost, PROCESS_LOCAL, 1056 bytes) 15/05/20 14:07:47 INFO BlockManager: Removing block broadcast_17_piece0 15/05/20 14:07:47 INFO MemoryStore: Block broadcast_17_piece0 of size 4737 dropped from memory (free 277372582) 15/05/20 14:07:47 INFO TaskSetManager: Starting task 1.0 in stage 30.0 (TID 34, localhost, PROCESS_LOCAL, 1056 bytes) 15/05/20 14:07:47 INFO BlockManagerInfo: Removed broadcast_17_piece0 on localhost:52396 in memory (size: 4.6 KB, free: 265.1 MB) 15/05/20 14:07:47 INFO Executor: Running task 1.0 in stage 30.0 (TID 34) 15/05/20 14:07:47 INFO Executor: Running task 0.0 in stage 30.0 (TID 33) 15/05/20 14:07:47 INFO BlockManagerMaster: Updated info of block broadcast_17_piece0 15/05/20 14:07:47 INFO BlockManager: Removing block broadcast_17 15/05/20 14:07:47 INFO MemoryStore: Block broadcast_17 of size 6696 dropped from memory (free 277379278) 15/05/20 14:07:47 INFO ContextCleaner: Cleaned broadcast 17 15/05/20 14:07:47 INFO BlockManager: Removing broadcast 16 15/05/20 14:07:47 INFO BlockManager: Removing block broadcast_16_piece0 15/05/20 14:07:47 INFO MemoryStore: Block broadcast_16_piece0 of size 4737 dropped from memory (free 277384015) 15/05/20 14:07:47 INFO BlockManagerInfo: Removed broadcast_16_piece0 on localhost:52396 in memory (size: 4.6 KB, free: 265.1 MB) 15/05/20 14:07:47 INFO BlockManagerMaster: Updated info of block broadcast_16_piece0 15/05/20 14:07:47 INFO BlockManager: Removing block broadcast_16 
15/05/20 14:07:47 INFO MemoryStore: Block broadcast_16 of size 6696 dropped from memory (free 277390711) 15/05/20 14:07:47 INFO ContextCleaner: Cleaned broadcast 16 15/05/20 14:07:47 INFO ShuffleBlockFetcherIterator: Getting 2 non-empty blocks out of 2 blocks 15/05/20 14:07:47 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 1 ms 15/05/20 14:07:47 INFO ShuffleBlockFetcherIterator: Getting 2 non-empty blocks out of 2 blocks 15/05/20 14:07:47 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms 15/05/20 14:07:47 ERROR Executor: Exception in task 1.0 in stage 30.0 (TID 34) java.lang.NullPointerException at java.lang.ProcessBuilder.start(ProcessBuilder.java:1010) at org.apache.hadoop.util.Shell.runCommand(Shell.java:482) at org.apache.hadoop.util.Shell.run(Shell.java:455) at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:715) at org.apache.hadoop.util.Shell.execCommand(Shell.java:808) at org.apache.hadoop.util.Shell.execCommand(Shell.java:791) at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:656) at org.apache.hadoop.fs.FilterFileSystem.setPermission(FilterFileSystem.java:490) at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:462) at
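One thing worth checking, since the failing call is Hadoop's Shell.runCommand against a local c:/ path: this NullPointerException is commonly reported on Windows when winutils.exe cannot be located. A frequently suggested workaround (untested here, and the path below is just an example) is to install winutils.exe under a Hadoop home directory and point hadoop.home.dir at it before creating the SparkContext:

    // Windows workaround sketch: winutils.exe must exist at %HADOOP_HOME%\bin\winutils.exe.
    System.setProperty("hadoop.home.dir", "C:\\hadoop")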
Re: rdd.saveAsTextFile problem
This thread happened a year back, can you please share what issue you are facing? which version of spark you are using? What is your system environment? Exception stack-trace? Thanks Best Regards On Thu, May 21, 2015 at 12:19 PM, Keerthi keerthi.reddy1...@gmail.com wrote: Hi , I had tried the workaround shared here, but still facing the same issue... Thanks. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/rdd-saveAsTextFile-problem-tp176p22970.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: java program Get Stuck at broadcasting
Sure, the code is very simple. I think u guys can understand from the main function. public class Test1 { public static double[][] createBroadcastPoints(String localPointPath, int row, int col) throws IOException{ BufferedReader br = RAWF.reader(localPointPath); String line = null; int rowIndex = 0; double[][] pointFeatures = new double[row][col]; while((line = br.readLine())!=null){ ListString point = Arrays.asList(line.split(,)); int colIndex = 0; for(String pointFeature: point){ pointFeatures[rowIndex][colIndex] = Double.valueOf(pointFeature); colIndex++; } rowIndex++; } br.close(); return pointFeatures; } public static void main(String[] args) throws IOException{ /**Parameter Setting***/ String localPointPath = /home/hduser/skyrock/skyrockImageFeatures.csv; String remoteFilePath = hdfs://HadoopV26Master:9000/user/skyrock/skyrockImageIndexedFeatures.csv; //this csv file is only 468MB final int row = 133433; final int col = 458; /**/ SparkConf conf = new SparkConf(). setAppName(distance). setMaster(spark://HadoopV26Master:7077). set(spark.executor.memory, 4g). set(spark.eventLog.enabled, true) .set(spark.eventLog.dir, /usr/local/spark/logs/spark-events) .set(spark.local.dir, /tmp/spark-temp); JavaSparkContext sc = new JavaSparkContext(conf); JavaRDDString textFile = sc.textFile(remoteFilePath); //Broadcast variable //double[][] xx =; final Broadcastdouble[][] broadcastPoints = sc.broadcast(createBroadcastPoints(localPointPath,row,col)); //final Broadcastdouble[][] broadcastPoints = sc.broadcast(xx); /** * Compute the distance in terms of each point on each instance. * distance list: index = n(i-1)- i*(i-1)/2 + j-i-1 */ JavaPairRDDPair,Double distance = textFile.flatMapToPair(new PairFlatMapFunctionString, Pair, Double(){ public IterableTuple2Pair, Double call(String v1) throws Exception{ ListString al = Arrays.asList(v1.split(,)); double[] featureVals = new double[al.size()]; for(int j=0;jal.size()-1;j++) featureVals[j] = Double.valueOf(al.get(j+1)); int jIndex = Integer.valueOf(al.get(0)); double[][] allPoints = broadcastPoints.getValue(); double sum = 0; ListTuple2Pair, Double list = new ArrayListTuple2Pair, Double(); for(int i=0;irow; i++){ sum = 0; for(int j=0;jal.size()-1;j++){ sum += (allPoints[i][j]-featureVals[j])*(allPoints[i][j]-featureVals[j]); } list.add(new Tuple2Pair,Double(new Pair(i,jIndex),Math.sqrt(sum))); } return list; } }); distance.saveAsTextFile(hdfs://HadoopV26Master:9000/user/+args[0]); } } On 21 May 2015 at 16:44, Akhil Das ak...@sigmoidanalytics.com wrote: Can you share the code, may be i/someone can help you out Thanks Best Regards On Thu, May 21, 2015 at 1:45 PM, Allan Jie allanmcgr...@gmail.com wrote: Hi, Just check the logs of datanode, it looks like this: 2015-05-20 11:42:14,605 INFO org.apache.hadoop.hdfs.server.datanode.DataNode.clienttrace: src: / 10.9.0.48:50676, dest: /10.9.0.17:50010, bytes: 134217728, op: HDFS_WRITE, cliID: DFSClient_NONMAPREDUCE_804680172_54, offset: 0, srvID: 39fb78d5-828a-4319-8303-c704fab526e3, blockid: BP-436159032-10.9.0.16-1431330007172:blk_1073742096_1273, duration: 16994466261 2015-05-20 11:42:14,606 INFO org.apache.hadoop.hdfs.server.datanode.DataNode: PacketResponder: BP-436159032-10.9.0.16-1431330007172:blk_1073742096_1273, type=LAST_IN_PIPELINE, downstreams=0:[] terminating 2015-05-20 11:42:17,788 INFO org.apache.hadoop.hdfs.server.datanode.DataNode.clienttrace: src: / 10.9.0.17:49046, dest: /10.9.0.17:50010, bytes: 134217728, op: HDFS_WRITE, cliID: DFSClient_NONMAPREDUCE_102926009_54, offset: 0, srvID: 
39fb78d5-828a-4319-8303-c704fab526e3, blockid: BP-436159032-10.9.0.16-1431330007172:blk_1073742099_1276, duration: 17829554438 2015-05-20 11:42:17,788 INFO org.apache.hadoop.hdfs.server.datanode.DataNode: PacketResponder: BP-436159032-10.9.0.16-1431330007172:blk_1073742099_1276, type=HAS_DOWNSTREAM_IN_PIPELINE terminating 2015-05-20 11:42:17,904 INFO org.apache.hadoop.hdfs.server.datanode.DataNode: Receiving BP-436159032-10.9.0.16-1431330007172:blk_1073742103_1280 src: / 10.9.0.17:49049 dest: /10.9.0.17:50010 2015-05-20 11:42:17,904 WARN org.apache.hadoop.hdfs.server.datanode.DataNode: IOException in BlockReceiver constructor. Cause is 2015-05-20 11:42:17,904 INFO org.apache.hadoop.hdfs.server.datanode.DataNode: opWriteBlock BP-436159032-10.9.0.16-1431330007172:blk_1073742103_1280 received exception org.apache.hadoop.util.DiskChecker$DiskOutOfSpaceException: Out of space: The volume with the most available space (=114409472 B) is less than the block size (=134217728 B). 2015-05-20 11:42:17,905 ERROR org.apache.hadoop.hdfs.server.datanode.DataNode: HadoopV26Slave1:50010:DataXceiver error processing WRITE_BLOCK operation src: /10.9.0.17:49049 dst: /10.9.0.17:50010 org.apache.hadoop.util.DiskChecker$DiskOutOfSpaceException: Out of space: The volume with the most available space (=114409472 B) is less than the block size
Re: java program got Stuck at broadcasting
Sure, the code is very simple. I think u guys can understand from the main function. public class Test1 { public static double[][] createBroadcastPoints(String localPointPath, int row, int col) throws IOException{ BufferedReader br = RAWF.reader(localPointPath); String line = null; int rowIndex = 0; double[][] pointFeatures = new double[row][col]; while((line = br.readLine())!=null){ ListString point = Arrays.asList(line.split(,)); int colIndex = 0; for(String pointFeature: point){ pointFeatures[rowIndex][colIndex] = Double.valueOf(pointFeature); colIndex++; } rowIndex++; } br.close(); return pointFeatures; } public static void main(String[] args) throws IOException{ /**Parameter Setting***/ String localPointPath = /home/hduser/skyrock/skyrockImageFeatures.csv; String remoteFilePath = hdfs://HadoopV26Master:9000/user/skyrock/skyrockImageIndexedFeatures.csv; //this csv file is only 468MB final int row = 133433; final int col = 458; /**/ SparkConf conf = new SparkConf(). setAppName(distance). setMaster(spark://HadoopV26Master:7077). set(spark.executor.memory, 4g). set(spark.eventLog.enabled, true) .set(spark.eventLog.dir, /usr/local/spark/logs/spark-events) .set(spark.local.dir, /tmp/spark-temp); JavaSparkContext sc = new JavaSparkContext(conf); JavaRDDString textFile = sc.textFile(remoteFilePath); //Broadcast variable //double[][] xx =; final Broadcastdouble[][] broadcastPoints = sc.broadcast(createBroadcastPoints(localPointPath,row,col)); //final Broadcastdouble[][] broadcastPoints = sc.broadcast(xx); /** * Compute the distance in terms of each point on each instance. * distance list: index = n(i-1)- i*(i-1)/2 + j-i-1 */ JavaPairRDDPair,Double distance = textFile.flatMapToPair(new PairFlatMapFunctionString, Pair, Double(){ public IterableTuple2lt;Pair, Double call(String v1) throws Exception{ ListString al = Arrays.asList(v1.split(,)); double[] featureVals = new double[al.size()]; for(int j=0;jal.size()-1;j++) featureVals[j] = Double.valueOf(al.get(j+1)); int jIndex = Integer.valueOf(al.get(0)); double[][] allPoints = broadcastPoints.getValue(); double sum = 0; Listlt;Tuple2lt;Pair, Double list = new ArrayListTuple2lt;Pair, Double(); for(int i=0;irow; i++){ sum = 0; for(int j=0;jlt;al.size()-1;j++){ sum += (allPoints[i][j]-featureVals[j])*(allPoints[i][j]-featureVals[j]); } list.add(new Tuple2lt;Pair,Double(new Pair(i,jIndex),Math.sqrt(sum))); } return list; } }); distance.saveAsTextFile(hdfs://HadoopV26Master:9000/user/+args[0]); } } -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/java-program-got-Stuck-at-broadcasting-tp22953p22973.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: How to use spark to access HBase with Security enabled
Hi, Many thanks for the help. My Spark version is 1.3.0 too and I run it on Yarn. According to your advice I have changed the configuration. Now my program can read the hbase-site.xml correctly. And it can also authenticate with zookeeper successfully. But I meet a new problem that is my program still can not pass the authentication of HBase. Did you or anybody else ever meet such kind of situation ? I used a keytab file to provide the principal. Since it can pass the authentication of the Zookeeper, I am sure the keytab file is OK. But it jsut can not pass the authentication of HBase. The exception is listed below and could you or anybody else help me ? Still many many thanks! Exception*** 15/05/21 16:03:18 INFO zookeeper.ZooKeeper: Initiating client connection, connectString=bgdt02.dev.hrb:2181,bgdt01.dev.hrb:2181,bgdt03.dev.hrb:2181 sessionTimeout=9 watcher=hconnection-0x4e142a710x0, quorum=bgdt02.dev.hrb:2181,bgdt01.dev.hrb:2181,bgdt03.dev.hrb:2181, baseZNode=/hbase 15/05/21 16:03:18 INFO zookeeper.Login: successfully logged in. 15/05/21 16:03:18 INFO zookeeper.Login: TGT refresh thread started. 15/05/21 16:03:18 INFO client.ZooKeeperSaslClient: Client will use GSSAPI as SASL mechanism. 15/05/21 16:03:18 INFO zookeeper.ClientCnxn: Opening socket connection to server bgdt02.dev.hrb/130.1.9.98:2181. Will attempt to SASL-authenticate using Login Context section 'Client' 15/05/21 16:03:18 INFO zookeeper.ClientCnxn: Socket connection established to bgdt02.dev.hrb/130.1.9.98:2181, initiating session 15/05/21 16:03:18 INFO zookeeper.Login: TGT valid starting at:Thu May 21 16:03:18 CST 2015 15/05/21 16:03:18 INFO zookeeper.Login: TGT expires: Fri May 22 16:03:18 CST 2015 15/05/21 16:03:18 INFO zookeeper.Login: TGT refresh sleeping until: Fri May 22 11:43:32 CST 2015 15/05/21 16:03:18 INFO zookeeper.ClientCnxn: Session establishment complete on server bgdt02.dev.hrb/130.1.9.98:2181, sessionid = 0x24d46cb0ffd0020, negotiated timeout = 4 15/05/21 16:03:18 WARN mapreduce.TableInputFormatBase: initializeTable called multiple times. Overwriting connection and table reference; TableInputFormatBase will not close these old references when done. 15/05/21 16:03:19 INFO util.RegionSizeCalculator: Calculating region sizes for table ns_dev1:hd01. 15/05/21 16:03:19 WARN ipc.AbstractRpcClient: Exception encountered while connecting to the server : javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)] 15/05/21 16:03:19 ERROR ipc.AbstractRpcClient: SASL authentication failed. The most likely cause is missing or invalid credentials. Consider 'kinit'. 
javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)] at com.sun.security.sasl.gsskerb.GssKrb5Client.evaluateChallenge(GssKrb5Client.java:212) at org.apache.hadoop.hbase.security.HBaseSaslRpcClient.saslConnect(HBaseSaslRpcClient.java:179) at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.setupSaslConnection(RpcClientImpl.java:604) at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.access$600(RpcClientImpl.java:153) at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection$2.run(RpcClientImpl.java:730) at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection$2.run(RpcClientImpl.java:727) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:415) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548) at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.setupIOstreams(RpcClientImpl.java:727) at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.writeRequest(RpcClientImpl.java:880) at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.tracedWriteRequest(RpcClientImpl.java:849) at org.apache.hadoop.hbase.ipc.RpcClientImpl.call(RpcClientImpl.java:1173) at org.apache.hadoop.hbase.ipc.AbstractRpcClient.callBlockingMethod(AbstractRpcClient.java:216) at org.apache.hadoop.hbase.ipc.AbstractRpcClient$BlockingRpcChannelImplementation.callBlockingMethod(AbstractRpcClient.java:300) at org.apache.hadoop.hbase.protobuf.generated.ClientProtos$ClientService$BlockingStub.scan(ClientProtos.java:31751) at org.apache.hadoop.hbase.client.ScannerCallable.openScanner(ScannerCallable.java:332) at org.apache.hadoop.hbase.client.ScannerCallable.call(ScannerCallable.java:187) at org.apache.hadoop.hbase.client.ScannerCallable.call(ScannerCallable.java:62)
Re: Spark Streaming - Design considerations/Knobs
Honestly, given the length of my email, I didn't expect a reply. :-) Thanks for reading and replying. However, I have a follow-up question: I don't think if I understand the block replication completely. Are the blocks replicated immediately after they are received by the receiver? Or are they kept on the receiver node only and are moved only on shuffle? Has the replication something to do with locality.wait? Thanks, Hemant On Thu, May 21, 2015 at 2:21 AM, Tathagata Das t...@databricks.com wrote: Correcting the ones that are incorrect or incomplete. BUT this is good list for things to remember about Spark Streaming. On Wed, May 20, 2015 at 3:40 AM, Hemant Bhanawat hemant9...@gmail.com wrote: Hi, I have compiled a list (from online sources) of knobs/design considerations that need to be taken care of by applications running on spark streaming. Is my understanding correct? Any other important design consideration that I should take care of? - A DStream is associated with a single receiver. For attaining read parallelism multiple receivers i.e. multiple DStreams need to be created. - A receiver is run within an executor. It occupies one core. Ensure that there are enough cores for processing after receiver slots are booked i.e. spark.cores.max should take the receiver slots into account. - The receivers are allocated to executors in a round robin fashion. - When data is received from a stream source, receiver creates blocks of data. A new block of data is generated every blockInterval milliseconds. N blocks of data are created during the batchInterval where N = batchInterval/blockInterval. - These blocks are distributed by the BlockManager of the current executor to the block managers of other executors. After that, the Network Input Tracker running on the driver is informed about the block locations for further processing. - A RDD is created on the driver for the blocks created during the batchInterval. The blocks generated during the batchInterval are partitions of the RDD. Each partition is a task in spark. blockInterval== batchinterval would mean that a single partition is created and probably it is processed locally. The map tasks on the blocks are processed in the executors (one that received the block, and another where the block was replicated) that has the blocks irrespective of block interval, unless non-local scheduling kicks in (as you observed next). - Having bigger blockinterval means bigger blocks. A high value of spark.locality.wait increases the chance of processing a block on the local node. A balance needs to be found out between these two parameters to ensure that the bigger blocks are processed locally. - Instead of relying on batchInterval and blockInterval, you can define the number of partitions by calling dstream.repartition(n). This reshuffles the data in RDD randomly to create n number of partitions. Yes, for greater parallelism. Though comes at the cost of a shuffle. - An RDD's processing is scheduled by driver's jobscheduler as a job. At a given point of time only one job is active. So, if one job is executing the other jobs are queued. - If you have two dstreams there will be two RDDs formed and there will be two jobs created which will be scheduled one after the another. - To avoid this, you can union two dstreams. This will ensure that a single unionRDD is formed for the two RDDs of the dstreams. This unionRDD is then considered as a single job. However the partitioning of the RDDs is not impacted. 
To further clarify, the jobs depend on the number of output operations (print, foreachRDD, saveAsXFiles) and the number of RDD actions in those output operations. dstream1.union(dstream2).foreachRDD { rdd = rdd.count() }// one Spark job per batch dstream1.union(dstream2).foreachRDD { rdd = { rdd.count() ; rdd.count() } }// TWO Spark jobs per batch dstream1.foreachRDD { rdd = rdd.count } ; dstream2.foreachRDD { rdd = rdd.count } // TWO Spark jobs per batch - - If the batch processing time is more than batchinterval then obviously the receiver's memory will start filling up and will end up in throwing exceptions (most probably BlockNotFoundException). Currently there is no way to pause the receiver. You can limit the rate of receiver using SparkConf config spark.streaming.receiver.maxRate - - For being fully fault tolerant, spark streaming needs to enable checkpointing. Checkpointing increases the batch processing time. Incomplete. There are two types of checkpointing - data and metadata. Only data checkpointing, needed by only some operations, increase batch processing time. Read - http://spark.apache.org/docs/latest/streaming-programming-guide.html#checkpointing Furthemore, with checkpoint you can recover computation, but you
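A small Scala sketch pulling a few of these knobs together (the values are purely illustrative):

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    val conf = new SparkConf()
      .setAppName("streaming-knobs")
      .setMaster("local[4]")
      .set("spark.streaming.blockInterval", "200")      // ms; blocks per batch = batchInterval / blockInterval
      .set("spark.streaming.receiver.maxRate", "10000") // cap on records/sec per receiver
      .set("spark.locality.wait", "500")                // ms to wait for a local slot before going non-local

    val ssc = new StreamingContext(conf, Seconds(2))    // the batch interval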
Re: Query a Dataframe in rdd.map()
never mind... i didnt realize you were referring to the first table as df. so you want to do a join between the first table and an RDD? the right way to do it within the data frame construct is to think of it as a join... map the second RDD to a data frame and do an inner join on ip On Thu, May 21, 2015 at 10:54 AM, Ram Sriharsha sriharsha@gmail.com wrote: Your original code snippet seems incomplete and there isn't enough information to figure out what problem you actually ran into from your original code snippet there is an rdd variable which is well defined and a df variable that is not defined in the snippet of code you sent one way to make this work is as below (until the last line is executed you are actually not collecting anything on the driver, and if your dataset is too big to collect on the driver for inspection just do a take(n) on the result from pyspark.sql import Row,SQLContext from pyspark.sql.functions import count sqlContext = SQLContext(sc) # convert list of ip into a data frame with column ip Record = Row(ip) df = sc.parallelize(map(lambda x: Record(x), ['208.51.22.18', '31.207.6.173', '208.51.22.18'])).toDF() # obtain ip - frequency and inspect df.groupBy(df.ip).agg(count(df.ip)).show() ++-+ | ip|COUNT(ip)| ++-+ |208.51.22.18|2| |31.207.6.173|1| ++-+ what exactly is the issue you are running into when you say it doesn't get through? On Thu, May 21, 2015 at 10:47 AM, ping yan sharon...@gmail.com wrote: Thanks. I suspected that, but figured that df query inside a map sounds so intuitive that I don't just want to give up. I've tried join and even better with a DStream.transform() and it works! freqs = testips.transform(lambda rdd: rdd.join(kvrdd).map(lambda (x,y): y[1])) Thank you for the help! Ping On Thu, May 21, 2015 at 10:40 AM, Holden Karau hol...@pigscanfly.ca wrote: So DataFrames, like RDDs, can only be accused from the driver. If your IP Frequency table is small enough you could collect it and distribute it as a hashmap with broadcast or you could also join your rdd with the ip frequency table. Hope that helps :) On Thursday, May 21, 2015, ping yan sharon...@gmail.com wrote: I have a dataframe as a reference table for IP frequencies. e.g., ip freq 10.226.93.67 1 10.226.93.69 1 161.168.251.101 4 10.236.70.2 1 161.168.251.105 14 All I need is to query the df in a map. rdd = sc.parallelize(['208.51.22.18', '31.207.6.173', '208.51.22.18']) freqs = rdd.map(lambda x: df.where(df.ip ==x ).first()) It doesn't get through.. would appreciate any help. Thanks! Ping -- Ping Yan Ph.D. in Management Dept. of Management Information Systems University of Arizona Tucson, AZ 85721 -- Cell : 425-233-8271 Twitter: https://twitter.com/holdenkarau Linked In: https://www.linkedin.com/in/holdenkarau -- Ping Yan Ph.D. in Management Dept. of Management Information Systems University of Arizona Tucson, AZ 85721
Re: Spark Streaming with Tachyon : Data Loss on Receiver Failure due to WAL error
Looks like somehow the file size reported by the FSInputDStream of Tachyon's FileSystem interface, is returning zero. On Mon, May 11, 2015 at 4:38 AM, Dibyendu Bhattacharya dibyendu.bhattach...@gmail.com wrote: Just to follow up this thread further . I was doing some fault tolerant testing of Spark Streaming with Tachyon as OFF_HEAP block store. As I said in earlier email, I could able to solve the BlockNotFound exception when I used Hierarchical Storage of Tachyon , which is good. I continue doing some testing around storing the Spark Streaming WAL and CheckPoint files also in Tachyon . Here is few finding .. When I store the Spark Streaming Checkpoint location in Tachyon , the throughput is much higher . I tested the Driver and Receiver failure cases , and Spark Streaming is able to recover without any Data Loss on Driver failure. *But on Receiver failure , Spark Streaming looses data* as I see Exception while reading the WAL file from Tachyon receivedData location for the same Receiver id which just failed. If I change the Checkpoint location back to HDFS , Spark Streaming can recover from both Driver and Receiver failure . Here is the Log details when Spark Streaming receiver failed ...I raised a JIRA for the same issue : https://issues.apache.org/jira/browse/SPARK-7525 INFO : org.apache.spark.scheduler.DAGScheduler - *Executor lost: 2 (epoch 1)* INFO : org.apache.spark.storage.BlockManagerMasterEndpoint - Trying to remove executor 2 from BlockManagerMaster. INFO : org.apache.spark.storage.BlockManagerMasterEndpoint - Removing block manager BlockManagerId(2, 10.252.5.54, 45789) INFO : org.apache.spark.storage.BlockManagerMaster - Removed 2 successfully in removeExecutor INFO : org.apache.spark.streaming.scheduler.ReceiverTracker - *Registered receiver for stream 2 from 10.252.5.62*:47255 WARN : org.apache.spark.scheduler.TaskSetManager - Lost task 2.1 in stage 103.0 (TID 421, 10.252.5.62): org.apache.spark.SparkException: *Could not read data from write ahead log record FileBasedWriteAheadLogSegment(tachyon-ft://10.252.5.113:19998/tachyon/checkpoint/receivedData/2/log-1431341091711-1431341151711,645603894,10891919 http://10.252.5.113:19998/tachyon/checkpoint/receivedData/2/log-1431341091711-1431341151711,645603894,10891919)* at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.org $apache$spark$streaming$rdd$WriteAheadLogBackedBlockRDD$$getBlockFromWriteAheadLog$1(WriteAheadLogBackedBlockRDD.scala:144) at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD$$anonfun$compute$1.apply(WriteAheadLogBackedBlockRDD.scala:168) at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD$$anonfun$compute$1.apply(WriteAheadLogBackedBlockRDD.scala:168) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.compute(WriteAheadLogBackedBlockRDD.scala:168) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63) at org.apache.spark.scheduler.Task.run(Task.scala:70) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at 
java.lang.Thread.run(Thread.java:744) Caused by: java.lang.IllegalArgumentException:* Seek position is past EOF: 645603894, fileSize = 0* at tachyon.hadoop.HdfsFileInputStream.seek(HdfsFileInputStream.java:239) at org.apache.hadoop.fs.FSDataInputStream.seek(FSDataInputStream.java:37) at org.apache.spark.streaming.util.FileBasedWriteAheadLogRandomReader.read(FileBasedWriteAheadLogRandomReader.scala:37) at org.apache.spark.streaming.util.FileBasedWriteAheadLog.read(FileBasedWriteAheadLog.scala:104) at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.org $apache$spark$streaming$rdd$WriteAheadLogBackedBlockRDD$$getBlockFromWriteAheadLog$1(WriteAheadLogBackedBlockRDD.scala:141) ... 15 more INFO : org.apache.spark.scheduler.TaskSetManager - Starting task 2.2 in stage 103.0 (TID 422, 10.252.5.61, ANY, 1909 bytes) INFO : org.apache.spark.scheduler.TaskSetManager - Lost task 2.2 in stage 103.0 (TID 422) on executor 10.252.5.61: org.apache.spark.SparkException (Could not read data from write ahead log record FileBasedWriteAheadLogSegment(tachyon-ft:// 10.252.5.113:19998/tachyon/checkpoint/receivedData/2/log-1431341091711-1431341151711,645603894,10891919)) [duplicate 1] INFO : org.apache.spark.scheduler.TaskSetManager - Starting task 2.3 in stage 103.0 (TID 423, 10.252.5.62, ANY, 1909 bytes) INFO : org.apache.spark.deploy.client.AppClient$ClientActor -
Re: Query a Dataframe in rdd.map()
Thanks. I suspected that, but figured that df query inside a map sounds so intuitive that I don't just want to give up. I've tried join and even better with a DStream.transform() and it works! freqs = testips.transform(lambda rdd: rdd.join(kvrdd).map(lambda (x,y): y[1])) Thank you for the help! Ping On Thu, May 21, 2015 at 10:40 AM, Holden Karau hol...@pigscanfly.ca wrote: So DataFrames, like RDDs, can only be accused from the driver. If your IP Frequency table is small enough you could collect it and distribute it as a hashmap with broadcast or you could also join your rdd with the ip frequency table. Hope that helps :) On Thursday, May 21, 2015, ping yan sharon...@gmail.com wrote: I have a dataframe as a reference table for IP frequencies. e.g., ip freq 10.226.93.67 1 10.226.93.69 1 161.168.251.101 4 10.236.70.2 1 161.168.251.105 14 All I need is to query the df in a map. rdd = sc.parallelize(['208.51.22.18', '31.207.6.173', '208.51.22.18']) freqs = rdd.map(lambda x: df.where(df.ip ==x ).first()) It doesn't get through.. would appreciate any help. Thanks! Ping -- Ping Yan Ph.D. in Management Dept. of Management Information Systems University of Arizona Tucson, AZ 85721 -- Cell : 425-233-8271 Twitter: https://twitter.com/holdenkarau Linked In: https://www.linkedin.com/in/holdenkarau -- Ping Yan Ph.D. in Management Dept. of Management Information Systems University of Arizona Tucson, AZ 85721
RE: rdd.sample() methods very slow
I don't need to be 100% randome. How about randomly pick a few partitions and return all docs in those partitions? Is rdd.mapPartitionsWithIndex() the right method to use to just process a small portion of partitions? Ningjun -Original Message- From: Sean Owen [mailto:so...@cloudera.com] Sent: Thursday, May 21, 2015 11:30 AM To: Wang, Ningjun (LNG-NPV) Cc: user@spark.apache.org Subject: Re: rdd.sample() methods very slow I guess the fundamental issue is that these aren't stored in a way that allows random access to a Document. Underneath, Hadoop has a concept of a MapFile which is like a SequenceFile with an index of offsets into the file where records being. Although Spark doesn't use it, you could maybe create some custom RDD that takes advantage of this format to grab random elements efficiently. Other things come to mind but I think they're all slower -- like hashing all the docs and taking the smallest n in each of k partitions to get a pretty uniform random sample of kn docs. On Thu, May 21, 2015 at 4:04 PM, Wang, Ningjun (LNG-NPV) ningjun.w...@lexisnexis.com wrote: Is there any other way to solve the problem? Let me state the use case I have an RDD[Document] contains over 7 millions items. The RDD need to be save on a persistent storage (currently I save it as object file on disk). Then I need to get a small random sample of Document objects (e.g. 10,000 document). How can I do this quickly? The rdd.sample() methods does not help because it need to read the entire RDD of 7 million Document from disk which take very long time. Ningjun From: Sean Owen [mailto:so...@cloudera.com] Sent: Tuesday, May 19, 2015 4:51 PM To: Wang, Ningjun (LNG-NPV) Cc: user@spark.apache.org Subject: Re: rdd.sample() methods very slow The way these files are accessed is inherently sequential-access. There isn't a way to in general know where record N is in a file like this and jump to it. So they must be read to be sampled. On Tue, May 19, 2015 at 9:44 PM, Wang, Ningjun (LNG-NPV) ningjun.w...@lexisnexis.com wrote: Hi I have an RDD[Document] that contains 7 million objects and it is saved in file system as object file. I want to get a random sample of about 70 objects from it using rdd.sample() method. It is ver slow val rdd : RDD[Document] = sc.objectFile[Document](C:/temp/docs.obj).sample(false, 0.1D, 0L).cache() val count = rdd.count() From Spark UI, I see spark is try to read the entire object files at the folder “C:/temp/docs.obj” which is about 29.7 GB. Of course this is very slow. Why does Spark try to read entire 7 million objects while I only need to return a random sample of 70 objects? Is there any efficient way to get a random sample of 70 objects without reading through the entire object files? Ningjun - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
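mapPartitionsWithIndex can indeed be used that way: pick a handful of partition indices up front and emit only their contents. A Scala sketch, with the number of partitions to keep left to the caller:

    import scala.reflect.ClassTag
    import scala.util.Random
    import org.apache.spark.rdd.RDD

    // Return every element from a random subset of partitions.
    def samplePartitions[T: ClassTag](rdd: RDD[T], partitionsToKeep: Int, seed: Long = 0L): RDD[T] = {
      val chosen = new Random(seed).shuffle(rdd.partitions.indices.toList)
        .take(partitionsToKeep)
        .toSet
      rdd.mapPartitionsWithIndex { (idx, it) =>
        if (chosen.contains(idx)) it else Iterator.empty
      }
    }

Spark still schedules a task per partition, but the unchosen partitions' iterators are never consumed, so the bulk of the object file is not deserialized.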
Pipelining with Spark
From the performance and scalability standpoint, is it better to plug in, say a multi-threaded pipeliner into a Spark job, or implement pipelining via Spark's own transformation mechanisms such as e.g. map or filter? I'm seeing some reference architectures where things like 'morphlines' are plugged into Spark but it'd seem that Spark may yield better performance and scalability if each stage within a pipeline is a function in a Spark job - ? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Pipelining-with-Spark-tp22976.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Unable to use hive queries with constants in predicates
I have not seen this error but have seen another user have weird parser issues before: http://mail-archives.us.apache.org/mod_mbox/spark-user/201501.mbox/%3ccag6lhyed_no6qrutwsxeenrbqjuuzvqtbpxwx4z-gndqoj3...@mail.gmail.com%3E I would attach a debugger and see what is going on -- if I'm looking at the right place ( http://grepcode.com/file/repo1.maven.org/maven2/org.apache.hive/hive-exec/0.13.1/org/apache/hadoop/hive/ql/parse/HiveParser.java#HiveParser) token 294 is RCURLY...which doesnt make much sense... On Thu, May 21, 2015 at 2:10 AM, Devarajan Srinivasan devathecool1...@gmail.com wrote: Hi, I was testing spark to read data from hive using HiveContext. I got the following error, when I used a simple query with constants in predicates. I am using spark 1.3*. *Anyone encountered error like this ?? *Error:* Exception in thread main org.apache.spark.sql.AnalysisException: Unsupported language features in query: SELECT * from test_table where daily_partition='20150101' TOK_QUERY 1, 0,20, 81 TOK_FROM 1, 10,14, 81 TOK_TABREF 1, 12,14, 81 TOK_TABNAME 1, 12,14, 81 everest_marts_test 1, 12,12, 81 voice_cdr 1, 14,14, 100 TOK_INSERT 0, -1,-1, 0 TOK_DESTINATION 0, -1,-1, 0 TOK_DIR 0, -1,-1, 0 TOK_TMP_FILE 0, -1,-1, 0 TOK_SELECT 1, 0,8, 7 TOK_SELEXPR 1, 2,2, 7 TOK_TABLE_OR_COL 1, 2,2, 7 callingpartynumber 1, 2,2, 7 TOK_SELEXPR 1, 4,4, 26 TOK_TABLE_OR_COL 1, 4,4, 26 calledpartynumber 1, 4,4, 26 TOK_SELEXPR 1, 6,6, 44 TOK_TABLE_OR_COL 1, 6,6, 44 chargingtime 1, 6,6, 44 TOK_SELEXPR 1, 8,8, 57 TOK_TABLE_OR_COL 1, 8,8, 57 call_direction_key 1, 8,8, 57 TOK_WHERE 1, 16,20, 131 = 1, 18,20, 131 TOK_TABLE_OR_COL 1, 18,18, 116 daily_partition 1, 18,18, 116 '20150101' 1, 20,20, 132 scala.NotImplementedError: No parse rules for ASTNode type: 294, text: '20150101' : '20150101' 1, 20,20, 132 + org.apache.spark.sql.hive.HiveQl$.nodeToExpr(HiveQl.scala:1261) ; at org.apache.spark.sql.hive.HiveQl$.createPlan(HiveQl.scala:261) at org.apache.spark.sql.hive.ExtendedHiveQlParser$$anonfun$ hiveQl$1.apply(ExtendedHiveQlParser.scala:41) at org.apache.spark.sql.hive.ExtendedHiveQlParser$$anonfun$ hiveQl$1.apply(ExtendedHiveQlParser.scala:40) at scala.util.parsing.combinator.Parsers$Success.map(Parsers. scala:136) at scala.util.parsing.combinator.Parsers$Success.map(Parsers. scala:135) at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1. apply(Parsers.scala:242) at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1. apply(Parsers.scala:242) at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers. scala:222) at scala.util.parsing.combinator.Parsers$Parser$$anonfun$ append$1$$anonfun$apply$2.apply(Parsers.scala:254) at scala.util.parsing.combinator.Parsers$Parser$$anonfun$ append$1$$anonfun$apply$2.apply(Parsers.scala:254) at scala.util.parsing.combinator.Parsers$Failure.append( Parsers.scala:202) at scala.util.parsing.combinator.Parsers$Parser$$anonfun$ append$1.apply(Parsers.scala:254) at scala.util.parsing.combinator.Parsers$Parser$$anonfun$ append$1.apply(Parsers.scala:254) at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers. scala:222) at scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$ apply$14.apply(Parsers.scala:891) at scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$ apply$14.apply(Parsers.scala:891) at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) at scala.util.parsing.combinator.Parsers$$anon$2.apply(Parsers. 
scala:890) at scala.util.parsing.combinator.PackratParsers$$anon$1.apply( PackratParsers.scala:110) at org.apache.spark.sql.catalyst.AbstractSparkSQLParser.apply( AbstractSparkSQLParser.scala:38) at org.apache.spark.sql.hive.HiveQl$$anonfun$3.apply(HiveQl.scala:138) at org.apache.spark.sql.hive.HiveQl$$anonfun$3.apply(HiveQl.scala:138) at org.apache.spark.sql.SparkSQLParser$$anonfun$org$apache$spark$sql$ SparkSQLParser$$others$1.apply(SparkSQLParser.scala:96) at org.apache.spark.sql.SparkSQLParser$$anonfun$org$apache$spark$sql$ SparkSQLParser$$others$1.apply(SparkSQLParser.scala:95) at scala.util.parsing.combinator.Parsers$Success.map(Parsers. scala:136) at scala.util.parsing.combinator.Parsers$Success.map(Parsers. scala:135) at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1. apply(Parsers.scala:242) at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1. apply(Parsers.scala:242) at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers. scala:222) at scala.util.parsing.combinator.Parsers$Parser$$anonfun$ append$1$$anonfun$apply$2.apply(Parsers.scala:254) at
Re: Question about Serialization in Storage Level
From the docs, https://spark.apache.org/docs/latest/programming-guide.html#rdd-persistence:

- MEMORY_ONLY: Store RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory, some partitions will not be cached and will be recomputed on the fly each time they're needed. This is the default level.
- MEMORY_AND_DISK: Store RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory, store the partitions that don't fit on disk, and read them from there when they're needed.
- MEMORY_ONLY_SER: Store RDD as serialized Java objects (one byte array per partition). This is generally more space-efficient than deserialized objects, especially when using a fast serializer (https://spark.apache.org/docs/latest/tuning.html), but more CPU-intensive to read.
- MEMORY_AND_DISK_SER: Similar to MEMORY_ONLY_SER, but spill partitions that don't fit in memory to disk instead of recomputing them on the fly each time they're needed.

On Thu, May 21, 2015 at 3:52 AM, Jiang, Zhipeng zhipeng.ji...@intel.com wrote: Hi there, This question may seem to be kind of naïve, but what's the difference between MEMORY_AND_DISK and MEMORY_AND_DISK_SER? If I call rdd.persist(StorageLevel.MEMORY_AND_DISK), the BlockManager won't serialize the rdd? Thanks, Zhipeng
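To make the difference concrete, a small sketch (assuming an existing SparkContext sc; the input paths are hypothetical). With MEMORY_AND_DISK the in-memory partitions stay as plain Java objects; the _SER variant keeps one serialized byte array per partition, trading CPU on read for less memory:

    import org.apache.spark.storage.StorageLevel

    val a = sc.textFile("hdfs:///data/some-input")        // hypothetical path
    a.persist(StorageLevel.MEMORY_AND_DISK)               // deserialized objects in memory, overflow to disk

    val b = sc.textFile("hdfs:///data/other-input")       // hypothetical path
    b.persist(StorageLevel.MEMORY_AND_DISK_SER)           // serialized byte arrays in memory, overflow to disk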
Re: Spark Streaming on top of Cassandra?
Can some one provide example of Spark Streaming using Java? I have cassandra running but did not configure spark but would like to create Dstream. Thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-on-top-of-Cassandra-tp1283p22978.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark Streaming on top of Cassandra?
hi. I have a spark streaming - cassandra application which you can probably borrow pretty easily. You can always rewrite a part of it in java if you need to , or else, you can just use scala (see the blog post below if you want a java style dev workflow w/ scala using intellij)/ This application implements a spark stream w twitter and ETLs it into either a file queue or cassandra (see the commented out cassandra snippet). https://github.com/jayunit100/SparkStreamingApps/blob/master/src/main/scala/sparkapps/ctakes/CTakesTwitterStreamingApp.scala Cassandra sink works really well with the spark context compile time bindings . Maybe just clone this repo down and use it as a blueprint :) There is a blog post here about how to set up your IDE so that the dev workflow is very similar to that of standard java http://jayunit100.blogspot.com/2014/07/set-up-spark-application-devleopment.html good luck !. On Thu, May 21, 2015 at 4:24 PM, tshah77 tejasrs...@gmail.com wrote: Can some one provide example of Spark Streaming using Java? I have cassandra running but did not configure spark but would like to create Dstream. Thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-on-top-of-Cassandra-tp1283p22978.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- jay vyas
Re: Connecting to an inmemory database from Spark
Doesn't seem like a Cassandra-specific issue. Could you give us more information (code, errors, stack traces)? On Thu, May 21, 2015 at 1:33 PM, tshah77 tejasrs...@gmail.com wrote: TD, Do you have any example about reading from cassandra using spark streaming in java? I am trying to connect to cassandra using spark streaming and it is throwing an error as could not parse master url. Thanks Tejas -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Connecting-to-an-inmemory-database-from-Spark-tp1343p22979.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
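For what it's worth, the "could not parse master url" message usually points at the SparkConf rather than at Cassandra. A hedged sketch, shown in Scala (the Java API is analogous via JavaStreamingContext); the host value and batch interval are assumptions, and the connector property applies only if the DataStax spark-cassandra-connector is on the classpath:

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    // The error typically means the master was never set (or was empty/invalid)
    // before the streaming context was created.
    val conf = new SparkConf()
      .setAppName("CassandraStreamingExample")
      .setMaster("local[2]")                                 // or spark://host:7077
      .set("spark.cassandra.connection.host", "127.0.0.1")   // DataStax connector setting (assumed host)

    val ssc = new StreamingContext(conf, Seconds(10))
    // ... create DStreams here and write them out with the connector ...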
foreach vs foreachPartitions
I would like to know whether foreachPartitions results in better performance, due to a higher level of parallelism, compared to the foreach method, for the case in which I'm iterating over an RDD in order to perform some sums into an accumulator variable. Thank you, Beniamino. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/foreach-vs-foreachPartitions-tp22983.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
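For context, a small sketch of the two variants being compared (assuming an existing SparkContext sc). Both run with the same task parallelism; foreachPartition mainly saves per-element overhead, for example when expensive setup can be done once per partition:

    val nums = sc.parallelize(1 to 1000000, 8)

    // Per-element callback.
    val total = sc.accumulator(0L)
    nums.foreach(n => total += n.toLong)

    // Per-partition callback: same work, but any costly setup could be done
    // once here before iterating the partition's elements.
    val total2 = sc.accumulator(0L)
    nums.foreachPartition { iter =>
      var local = 0L
      iter.foreach(local += _)
      total2 += local
    }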
Re: Connecting to an inmemory database from Spark
TD, Do you have any example about reading from cassandra using spark streaming in java? I am trying to connect to cassandra using spark streaming and it is throwing an error as could not parse master url. Thanks Tejas -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Connecting-to-an-inmemory-database-from-Spark-tp1343p22979.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: How to use spark to access HBase with Security enabled
Are the worker nodes colocated with HBase region servers ? Were you running as hbase super user ? You may need to login, using code similar to the following: if (isSecurityEnabled()) { SecurityUtil.login(conf, fileConfKey, principalConfKey, localhost); } SecurityUtil is hadoop class. Cheers On Thu, May 21, 2015 at 1:58 AM, donhoff_h 165612...@qq.com wrote: Hi, Many thanks for the help. My Spark version is 1.3.0 too and I run it on Yarn. According to your advice I have changed the configuration. Now my program can read the hbase-site.xml correctly. And it can also authenticate with zookeeper successfully. But I meet a new problem that is my program still can not pass the authentication of HBase. Did you or anybody else ever meet such kind of situation ? I used a keytab file to provide the principal. Since it can pass the authentication of the Zookeeper, I am sure the keytab file is OK. But it jsut can not pass the authentication of HBase. The exception is listed below and could you or anybody else help me ? Still many many thanks! Exception*** 15/05/21 16:03:18 INFO zookeeper.ZooKeeper: Initiating client connection, connectString=bgdt02.dev.hrb:2181,bgdt01.dev.hrb:2181,bgdt03.dev.hrb:2181 sessionTimeout=9 watcher=hconnection-0x4e142a710x0, quorum=bgdt02.dev.hrb:2181,bgdt01.dev.hrb:2181,bgdt03.dev.hrb:2181, baseZNode=/hbase 15/05/21 16:03:18 INFO zookeeper.Login: successfully logged in. 15/05/21 16:03:18 INFO zookeeper.Login: TGT refresh thread started. 15/05/21 16:03:18 INFO client.ZooKeeperSaslClient: Client will use GSSAPI as SASL mechanism. 15/05/21 16:03:18 INFO zookeeper.ClientCnxn: Opening socket connection to server bgdt02.dev.hrb/130.1.9.98:2181. Will attempt to SASL-authenticate using Login Context section 'Client' 15/05/21 16:03:18 INFO zookeeper.ClientCnxn: Socket connection established to bgdt02.dev.hrb/130.1.9.98:2181, initiating session 15/05/21 16:03:18 INFO zookeeper.Login: TGT valid starting at:Thu May 21 16:03:18 CST 2015 15/05/21 16:03:18 INFO zookeeper.Login: TGT expires: Fri May 22 16:03:18 CST 2015 15/05/21 16:03:18 INFO zookeeper.Login: TGT refresh sleeping until: Fri May 22 11:43:32 CST 2015 15/05/21 16:03:18 INFO zookeeper.ClientCnxn: Session establishment complete on server bgdt02.dev.hrb/130.1.9.98:2181, sessionid = 0x24d46cb0ffd0020, negotiated timeout = 4 15/05/21 16:03:18 WARN mapreduce.TableInputFormatBase: initializeTable called multiple times. Overwriting connection and table reference; TableInputFormatBase will not close these old references when done. 15/05/21 16:03:19 INFO util.RegionSizeCalculator: Calculating region sizes for table ns_dev1:hd01. 15/05/21 16:03:19 WARN ipc.AbstractRpcClient: Exception encountered while connecting to the server : javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)] 15/05/21 16:03:19 ERROR ipc.AbstractRpcClient: SASL authentication failed. The most likely cause is missing or invalid credentials. Consider 'kinit'. 
javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)] at com.sun.security.sasl.gsskerb.GssKrb5Client.evaluateChallenge(GssKrb5Client.java:212) at org.apache.hadoop.hbase.security.HBaseSaslRpcClient.saslConnect(HBaseSaslRpcClient.java:179) at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.setupSaslConnection(RpcClientImpl.java:604) at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.access$600(RpcClientImpl.java:153) at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection$2.run(RpcClientImpl.java:730) at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection$2.run(RpcClientImpl.java:727) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:415) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548) at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.setupIOstreams(RpcClientImpl.java:727) at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.writeRequest(RpcClientImpl.java:880) at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.tracedWriteRequest(RpcClientImpl.java:849) at org.apache.hadoop.hbase.ipc.RpcClientImpl.call(RpcClientImpl.java:1173) at org.apache.hadoop.hbase.ipc.AbstractRpcClient.callBlockingMethod(AbstractRpcClient.java:216) at org.apache.hadoop.hbase.ipc.AbstractRpcClient$BlockingRpcChannelImplementation.callBlockingMethod(AbstractRpcClient.java:300)
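A hedged sketch of the explicit keytab login Ted suggests, done inside the code that runs on the executors (for example at the top of a foreachPartition or mapPartitions block, before any HBase connection is created). The principal and keytab path are hypothetical, and the keytab must be readable on every worker node; whether this alone is sufficient depends on how the HBase connection is set up:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.security.UserGroupInformation

    // Run once per executor so each JVM has its own Kerberos TGT instead of
    // relying on the driver's credentials.
    def loginToKerberos(): Unit = {
      val conf = new Configuration()
      conf.set("hadoop.security.authentication", "kerberos")
      UserGroupInformation.setConfiguration(conf)
      UserGroupInformation.loginUserFromKeytab(
        "sparkuser@EXAMPLE.COM",                       // hypothetical principal
        "/etc/security/keytabs/sparkuser.keytab")      // hypothetical keytab path on the worker
    }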
foreach plus accumulator Vs mapPartitions performance
Hi, everybody. There are some cases in which I can obtain the same results by using either mapPartitions or the foreach method. For example, in a typical MapReduce approach one would perform a reduceByKey immediately after a mapPartitions that transforms the original RDD into a collection of (key, value) tuples. I think it is possible to achieve the same result by using, for instance, an array of accumulators, where each index plays the role of a key and every executor adds its value at that index. Since reduceByKey will perform a shuffle on disk, I think that, when possible, the foreach approach should be better, even though foreach has the side effect of adding a value to an accumulator. I am asking to check whether my reasoning is correct. I hope I was clear. Thank you, Beniamino -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/foreach-plus-accumulator-Vs-mapPartitions-performance-tp22982.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
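A sketch of the two approaches being compared, for the case where the keys are small integer indices known in advance (assuming an existing SparkContext sc; the data is made up). Note that accumulator values are only visible on the driver, so this only replaces reduceByKey when the final result is needed on the driver and the key space is small:

    val pairs = sc.parallelize(Seq((0, 1.0), (1, 2.0), (0, 3.0), (2, 4.0)))

    // 1) Shuffle-based: the result stays distributed as an RDD.
    val byKey = pairs.reduceByKey(_ + _)

    // 2) Accumulator-based: one accumulator per key index, summed in foreach;
    //    the totals exist only on the driver afterwards.
    val numKeys = 3
    val sums = Array.fill(numKeys)(sc.accumulator(0.0))
    pairs.foreach { case (k, v) => sums(k) += v }
    val totals = sums.map(_.value)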
Spark MOOC - early access
Hi Spark Devs and Users,

BerkeleyX and Databricks are currently developing two Spark-related MOOCs on edX (intro: https://www.edx.org/course/introduction-big-data-apache-spark-uc-berkeleyx-cs100-1x, ML: https://www.edx.org/course/scalable-machine-learning-uc-berkeleyx-cs190-1x), the first of which starts on June 1st. Together these courses have over 75K enrolled students! To help students work through the exercises and course content, we have created a Vagrant box that contains Spark and IPython (running on Ubuntu 32-bit). This simplifies user setup and helps us support them. We are writing to give you early access to the VM environment and the first assignment, and to request your help in testing the VM/assignment before we unleash it to 75K people (see instructions below). We're happy to help if you have any difficulties getting the VM set up; please feel free to contact me (marco.s...@gmail.com) with any issues, comments, or questions.

Sincerely,
Marco Shaw
Spark MOOC TA

(This was sent as an HTML-formatted email; some links were duplicated in the original just in case.)

1. Install VirtualBox (https://www.virtualbox.org/wiki/Downloads) on your OS (Windows tutorial: https://www.youtube.com/watch?v=06Sf-m64fcY).
2. Install Vagrant (https://www.vagrantup.com/downloads.html) on your OS (Windows tutorial: https://www.youtube.com/watch?v=LZVS23BaA1I).
3. Install the virtual machine using the following steps (Windows tutorial: https://www.youtube.com/watch?v=ZuJCqHC7IYc):
   a. Create a custom directory (e.g. c:\users\marco\myvagrant or /home/marco/myvagrant).
   b. Download the file https://raw.githubusercontent.com/spark-mooc/mooc-setup/master/Vagrantfile to the custom directory (NOTE: it must be named exactly Vagrantfile, with no extension).
   c. Open a DOS prompt (Windows) or terminal (Mac/Linux) in the custom directory and issue the command "vagrant up".
4. Perform basic commands in the VM as described below (Windows tutorial: https://www.youtube.com/watch?v=bkteLH77IR0):
   a. To start the VM, from a DOS prompt (Windows) or terminal (Mac/Linux), issue the command "vagrant up".
   b. To stop the VM, issue the command "vagrant halt".
   c. To erase or delete the VM, issue the command "vagrant destroy".
   d. Once the VM is running, to access the notebook, open a web browser to http://localhost:8001.
5. Use the test notebook as described below (Windows tutorial: https://www.youtube.com/watch?v=mlfAmyF3Q-s):
   a. Start the VM with "vagrant up".
   b. Once the VM is running, open a web browser to http://localhost:8001.
   c. Upload this IPython notebook: https://raw.githubusercontent.com/spark-mooc/mooc-setup/master/vm_test_student.ipynb.
   d. Run through the notebook.
6. Play around with the first MOOC assignment (email Marco for details when you get to this point).
7. Please answer the following questions:
   a. What machine are you using (OS, RAM, CPU, age)?
   b. How long did the entire process take?
   c. How long did the VM download take? Relatedly, where are you located?
   d. Do you have any other comments/suggestions?
Re: How to use spark to access HBase with Security enabled
What I found with the CDH-5.4.1 Spark 1.3, the spark.executor.extraClassPath setting is not working. Had to use SPARK_CLASSPATH instead. On Thursday, May 21, 2015, Ted Yu yuzhih...@gmail.com wrote: Are the worker nodes colocated with HBase region servers ? Were you running as hbase super user ? You may need to login, using code similar to the following: if (isSecurityEnabled()) { SecurityUtil.login(conf, fileConfKey, principalConfKey, localhost); } SecurityUtil is hadoop class. Cheers On Thu, May 21, 2015 at 1:58 AM, donhoff_h 165612...@qq.com javascript:_e(%7B%7D,'cvml','165612...@qq.com'); wrote: Hi, Many thanks for the help. My Spark version is 1.3.0 too and I run it on Yarn. According to your advice I have changed the configuration. Now my program can read the hbase-site.xml correctly. And it can also authenticate with zookeeper successfully. But I meet a new problem that is my program still can not pass the authentication of HBase. Did you or anybody else ever meet such kind of situation ? I used a keytab file to provide the principal. Since it can pass the authentication of the Zookeeper, I am sure the keytab file is OK. But it jsut can not pass the authentication of HBase. The exception is listed below and could you or anybody else help me ? Still many many thanks! Exception*** 15/05/21 16:03:18 INFO zookeeper.ZooKeeper: Initiating client connection, connectString=bgdt02.dev.hrb:2181,bgdt01.dev.hrb:2181,bgdt03.dev.hrb:2181 sessionTimeout=9 watcher=hconnection-0x4e142a710x0, quorum=bgdt02.dev.hrb:2181,bgdt01.dev.hrb:2181,bgdt03.dev.hrb:2181, baseZNode=/hbase 15/05/21 16:03:18 INFO zookeeper.Login: successfully logged in. 15/05/21 16:03:18 INFO zookeeper.Login: TGT refresh thread started. 15/05/21 16:03:18 INFO client.ZooKeeperSaslClient: Client will use GSSAPI as SASL mechanism. 15/05/21 16:03:18 INFO zookeeper.ClientCnxn: Opening socket connection to server bgdt02.dev.hrb/130.1.9.98:2181. Will attempt to SASL-authenticate using Login Context section 'Client' 15/05/21 16:03:18 INFO zookeeper.ClientCnxn: Socket connection established to bgdt02.dev.hrb/130.1.9.98:2181, initiating session 15/05/21 16:03:18 INFO zookeeper.Login: TGT valid starting at:Thu May 21 16:03:18 CST 2015 15/05/21 16:03:18 INFO zookeeper.Login: TGT expires: Fri May 22 16:03:18 CST 2015 15/05/21 16:03:18 INFO zookeeper.Login: TGT refresh sleeping until: Fri May 22 11:43:32 CST 2015 15/05/21 16:03:18 INFO zookeeper.ClientCnxn: Session establishment complete on server bgdt02.dev.hrb/130.1.9.98:2181, sessionid = 0x24d46cb0ffd0020, negotiated timeout = 4 15/05/21 16:03:18 WARN mapreduce.TableInputFormatBase: initializeTable called multiple times. Overwriting connection and table reference; TableInputFormatBase will not close these old references when done. 15/05/21 16:03:19 INFO util.RegionSizeCalculator: Calculating region sizes for table ns_dev1:hd01. 15/05/21 16:03:19 WARN ipc.AbstractRpcClient: Exception encountered while connecting to the server : javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)] 15/05/21 16:03:19 ERROR ipc.AbstractRpcClient: SASL authentication failed. The most likely cause is missing or invalid credentials. Consider 'kinit'. 
javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)] at com.sun.security.sasl.gsskerb.GssKrb5Client.evaluateChallenge(GssKrb5Client.java:212) at org.apache.hadoop.hbase.security.HBaseSaslRpcClient.saslConnect(HBaseSaslRpcClient.java:179) at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.setupSaslConnection(RpcClientImpl.java:604) at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.access$600(RpcClientImpl.java:153) at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection$2.run(RpcClientImpl.java:730) at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection$2.run(RpcClientImpl.java:727) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:415) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548) at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.setupIOstreams(RpcClientImpl.java:727) at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.writeRequest(RpcClientImpl.java:880) at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.tracedWriteRequest(RpcClientImpl.java:849) at org.apache.hadoop.hbase.ipc.RpcClientImpl.call(RpcClientImpl.java:1173) at
Re: Spark HistoryServer not coming up
This got resolved after cleaning /user/spark/applicationHistory/* -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-HistoryServer-not-coming-up-tp22975p22981.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
running spark on yarn
Hello, folks. We just recently switched to using Yarn on our cluster (when upgrading to cloudera 5.4.1) I'm trying to run a spark job from within a broader application (a web service running on Jetty), so I can't just start it using spark-submit. Does anyone know of an instructions page on how to do that under yarn? I've managed to get it mostly there by including all spark, yarn, hadoop, and hdfs config files in my SparkConf (somewhat indirectly, and that is a bit of a short-hand), but while the job shows up now under yarn, and has its own applications web ui page, it's not showing up under the main spark page, and it's still missing some things (like it can't find the native library for snappy compression), so I still think I'm doing something wrong. Any help or hints would be much appreciated. Thanks, -Nathan
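One way to approach this, as a hedged sketch for the Spark 1.3/1.4 era: create the context programmatically in yarn-client mode from inside the long-running JVM. The Hadoop/YARN client configuration must be visible to the service (on its classpath or via HADOOP_CONF_DIR/YARN_CONF_DIR), and the jar and native-library paths below are hypothetical placeholders:

    import org.apache.spark.{SparkConf, SparkContext}

    // yarn-client mode keeps the driver inside the web service's JVM,
    // while executors run in YARN containers.
    val conf = new SparkConf()
      .setMaster("yarn-client")
      .setAppName("jetty-embedded-spark")
      // Ship the application jar(s) so executors can load your classes.
      .setJars(Seq("/opt/myservice/lib/my-spark-job.jar"))                     // hypothetical path
      // Native libraries such as snappy usually need to be on the executors' library path.
      .set("spark.executor.extraLibraryPath", "/opt/cloudera/parcels/CDH/lib/hadoop/lib/native")  // hypothetical path

    val sc = new SparkContext(conf)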
Re: PySpark Logs location
This doesn't work for me so far; I ran the command but got the following output. What should I check to fix the issue? Any configuration parameters ...

[root@sdo-hdp-bd-master1 ~]# yarn logs -applicationId application_1426424283508_0048
15/05/21 13:25:09 INFO impl.TimelineClientImpl: Timeline service address: http://hdp-bd-node1.development.c4i:8188/ws/v1/timeline/
15/05/21 13:25:09 INFO client.RMProxy: Connecting to ResourceManager at hdp-bd-node1.development.c4i/12.23.45.253:8050
/app-logs/root/logs/application_1426424283508_0048 does not exist.
Log aggregation has not completed or is not enabled.

Thanks Oleg. On Wed, May 20, 2015 at 11:33 PM, Ruslan Dautkhanov dautkha...@gmail.com wrote: Oleg, You can see the applicationId in your Spark History Server. Go to http://historyserver:18088/ Also check https://spark.apache.org/docs/1.1.0/running-on-yarn.html#debugging-your-application It should be no different with PySpark. -- Ruslan Dautkhanov On Wed, May 20, 2015 at 2:12 PM, Oleg Ruchovets oruchov...@gmail.com wrote: Hi Ruslan. Could you add more details, please? Where do I get the applicationId? In case I have a lot of log files, would it make sense to view them from a single point? How can I actually configure / manage the log location of PySpark? Thanks Oleg. On Wed, May 20, 2015 at 10:24 PM, Ruslan Dautkhanov dautkha...@gmail.com wrote: You could use yarn logs -applicationId application_1383601692319_0008 -- Ruslan Dautkhanov On Wed, May 20, 2015 at 5:37 AM, Oleg Ruchovets oruchov...@gmail.com wrote: Hi, I am executing a PySpark job on YARN (Hortonworks distribution). Could someone point me to where the log locations are? Thanks Oleg.
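If I'm reading that message right, "Log aggregation has not completed or is not enabled" suggests YARN log aggregation is switched off on the cluster. As a hedged pointer, the relevant yarn-site.xml property looks like the snippet below (retention and remote-dir settings are up to your cluster admin); once finished applications' logs are aggregated to HDFS, "yarn logs -applicationId <appId>" can read them:

    <!-- yarn-site.xml on the NodeManagers and ResourceManager -->
    <property>
      <name>yarn.log-aggregation-enable</name>
      <value>true</value>
    </property>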
DataFrame Column Alias problem
Hi Spark Users Group, I'm doing groupBy operations on my DataFrame df as follows, to get the count for each value of col1:

df.groupBy("col1").agg("col1" -> "count").show // I don't know if I should write it like this.

col1  COUNT(col1#347)
aaa   2
bbb   4
ccc   4
... and more ...

I'd like to sort by the resulting count, e.g. with .sort("COUNT(col1#347)"), but the column name of the count result obviously cannot be known in advance. Intuitively one might consider getting a column by its index, as in R's data frames, but Spark doesn't support that. I have Googled "spark agg alias" and so forth, and checked DataFrame.as in the Spark API, but neither helped with this. Am I the only one who has ever got stuck on this, or is there something I have missed? REGARDS, Todd Leo
Re: rdd.saveAsTextFile problem
On Thu, May 21, 2015 at 4:17 PM, Howard Yang howardyang2...@gmail.com wrote: follow http://www.srccodes.com/p/article/38/build-install-configure-run-apache-hadoop-2.2.0-microsoft-windows-os to build latest version Hadoop in my windows machine, and Add Environment Variable *HADOOP_HOME* and edit *Path* Variable to add *bin* directory of *HADOOP_HOME* (say*C:\hadoop\bin*). fix this issue in my env 2015-05-21 9:55 GMT+03:00 Akhil Das ak...@sigmoidanalytics.com: This thread happened a year back, can you please share what issue you are facing? which version of spark you are using? What is your system environment? Exception stack-trace? Thanks Best Regards On Thu, May 21, 2015 at 12:19 PM, Keerthi keerthi.reddy1...@gmail.com wrote: Hi , I had tried the workaround shared here, but still facing the same issue... Thanks. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/rdd-saveAsTextFile-problem-tp176p22970.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
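For completeness, a hedged sketch of the same workaround applied from code rather than via environment variables (the directory is hypothetical and must contain bin\winutils.exe from a Windows Hadoop build such as the one linked above):

    // Point Hadoop's Windows shims at a local directory before the SparkContext
    // is created; this is equivalent to setting the HADOOP_HOME environment variable.
    System.setProperty("hadoop.home.dir", "C:\\hadoop")   // expects C:\hadoop\bin\winutils.exe

    val conf = new org.apache.spark.SparkConf().setAppName("save-test").setMaster("local[2]")
    val sc = new org.apache.spark.SparkContext(conf)

    sc.parallelize(Seq("a", "b", "c")).saveAsTextFile("C:\\tmp\\out")   // hypothetical output path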
Re: Multi user setup and saving a DataFrame / RDD to a network exported file system
Hi, thanks for answer, I'll open a ticket. In the meantime - I have found a workaround. The recipe is the following: 1. Create a new account/group on all machines (lets call it sparkuser). Run spark from this account. 2. Add your user to group sparkuser. 3. If you decide to write RDD/parquet file under workdir directory you need to execute the following (just once, before running spark-submit): chgrp sparkuser workdir chmod g+s workdir setfacl -d -m g::rwx workdir (first two steps can be replaced also by newgrp sparkuser, but this way all your files will be created with sparkuser group) than calls like rdd.saveAsPickleFile(workdir+/somename) work just fine. The above solution has one serious problem - any other user from sparkuser group will be able to overwrite your saved data. cheers, Tomasz W dniu 20.05.2015 o 23:08, Davies Liu pisze: Could you file a JIRA for this? The executor should run under the user who submit a job, I think. On Wed, May 20, 2015 at 2:40 AM, Tomasz Fruboes tomasz.frub...@fuw.edu.pl wrote: Thanks for a suggestion. I have tried playing with it, sc.sparkUser() gives me expected user name, but it doesnt solve the problem. From a quick search through the spark code it seems to me, that this setting is effective only for yarn and mesos. I think the workaround for the problem could be using --deploy-mode cluster (not 100% convenient, since disallows any interactive work), but this is not supported for python based programs. Cheers, Tomasz W dniu 20.05.2015 o 10:57, Iulian Dragoș pisze: You could try setting `SPARK_USER` to the user under which your workers are running. I couldn't find many references to this variable, but at least Yarn and Mesos take it into account when spawning executors. Chances are that standalone mode also does it. iulian On Wed, May 20, 2015 at 9:29 AM, Tomasz Fruboes tomasz.frub...@fuw.edu.pl mailto:tomasz.frub...@fuw.edu.pl wrote: Hi, thanks for answer. The rights are drwxr-xr-x 3 tfruboes all 5632 05-19 15 tel:5632%2005-19%2015:40 test19EE/ I have tried setting the rights to 777 for this directory prior to execution. This does not get propagated down the chain, ie the directory created as a result of the save call (namesAndAges.parquet2 in the path in the dump [1] below) is created with the drwxr-xr-x rights (owned by the user submitting the job, ie tfruboes). The temp directories created inside namesAndAges.parquet2/_temporary/0/ (e.g. task_201505200920_0009_r_01) are owned by root, again with drwxr-xr-x access rights Cheers, Tomasz W dniu 19.05.2015 o 23:56, Davies Liu pisze: It surprises me, could you list the owner information of /mnt/lustre/bigdata/med_home/tmp/test19EE/ ? On Tue, May 19, 2015 at 8:15 AM, Tomasz Fruboes tomasz.frub...@fuw.edu.pl mailto:tomasz.frub...@fuw.edu.pl wrote: Dear Experts, we have a spark cluster (standalone mode) in which master and workers are started from root account. Everything runs correctly to the point when we try doing operations such as dataFrame.select(name, age).save(ofile, parquet) or rdd.saveAsPickleFile(ofile) , where ofile is path on a network exported filesystem (visible on all nodes, in our case this is lustre, I guess on nfs effect would be similar). Unsurprisingly temp files created on workers are owned by root, which then leads to a crash (see [1] below). Is there a solution/workaround for this (e.g. controlling file creation mode of the temporary files)? 
Cheers, Tomasz ps I've tried to google this problem, couple of similar reports, but no clear answer/solution found ps2 For completeness - running master/workers as a regular user solves the problem only for the given user. For other users submitting to this master the result is given in [2] below [0] Cluster details: Master/workers: centos 6.5 Spark 1.3.1 prebuilt for hadoop 2.4 (same behaviour for the 2.6 build) [1] ## File /mnt/home/tfruboes/2015.05.SparkLocal/spark-1.3.1-bin-hadoop2.4/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py, line 300, in get_return_value py4j.protocol.Py4JJavaError: An error occurred while calling o27.save. : java.io.IOException: Failed to rename
Re: DataFrame Column Alias problem
df.groupBy($col1).agg(count($col1).as(c)).show On Thu, May 21, 2015 at 3:09 AM, SLiZn Liu sliznmail...@gmail.com wrote: Hi Spark Users Group, I’m doing groupby operations on my DataFrame *df* as following, to get count for each value of col1: df.groupBy(col1).agg(col1 - count).show // I don't know if I should write like this. col1 COUNT(col1#347) aaa2 bbb4 ccc4 ... and more... As I ‘d like to sort by the resulting count, with .sort(COUNT(col1#347)), but the column name of the count result obviously cannot be retrieved in advance. Intuitively one might consider acquire column name by column index in a fashion of R’s DataFrame, except Spark doesn’t support. I have Googled *spark agg alias* and so forth, and checked DataFrame.as in Spark API, neither helped on this. Am I the only one who had ever got stuck on this issue or anything I have missed? REGARDS, Todd Leo
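Extending Michael's answer to the sorting part of the question, a small sketch (assuming the Spark 1.3+ DataFrame API and an existing DataFrame df): the alias gives the count column a stable name that sort can refer to.

    import org.apache.spark.sql.functions.count

    val counts = df.groupBy(df("col1")).agg(count(df("col1")).as("cnt"))
    val sorted = counts.sort(counts("cnt").desc)   // the alias makes the column name predictable
    sorted.show()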
Re: [Spark SQL 1.3.1] data frame saveAsTable returns exception
Hi, is this fixed in master? Grega On Thu, May 14, 2015 at 7:50 PM, Michael Armbrust mich...@databricks.com wrote: End of the month is the target: https://cwiki.apache.org/confluence/display/SPARK/Wiki+Homepage On Thu, May 14, 2015 at 3:45 AM, Ishwardeep Singh ishwardeep.si...@impetus.co.in wrote: Hi Michael Ayan, Thank you for your response to my problem. Michael do we have a tentative release date for Spark version 1.4? Regards, Ishwardeep *From:* Michael Armbrust [mailto:mich...@databricks.com] *Sent:* Wednesday, May 13, 2015 10:54 PM *To:* ayan guha *Cc:* Ishwardeep Singh; user *Subject:* Re: [Spark SQL 1.3.1] data frame saveAsTable returns exception I think this is a bug in our date handling that should be fixed in Spark 1.4. On Wed, May 13, 2015 at 8:23 AM, ayan guha guha.a...@gmail.com wrote: Your stack trace says it can't convert date to integer. You sure about column positions? On 13 May 2015 21:32, Ishwardeep Singh ishwardeep.si...@impetus.co.in wrote: Hi , I am using Spark SQL 1.3.1. I have created a dataFrame using jdbc data source and am using saveAsTable() method but got the following 2 exceptions: java.lang.RuntimeException: Unsupported datatype DecimalType() at scala.sys.package$.error(package.scala:27) at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$fromDataType$2.apply(ParquetTypes.scala:372) at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$fromDataType$2.apply(ParquetTypes.scala:316) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.sql.parquet.ParquetTypesConverter$.fromDataType(ParquetTypes.scala:315) at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$4.apply(ParquetTypes.scala:395) at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$4.apply(ParquetTypes.scala:394) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.immutable.List.foreach(List.scala:318) at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) at scala.collection.AbstractTraversable.map(Traversable.scala:105) at org.apache.spark.sql.parquet.ParquetTypesConverter$.convertFromAttributes(ParquetTypes.scala:393) at org.apache.spark.sql.parquet.ParquetTypesConverter$.writeMetaData(ParquetTypes.scala:440) at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache.prepareMetadata(newParquet.scala:260) at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$6.apply(newParquet.scala:276) at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$6.apply(newParquet.scala:269) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.immutable.List.foreach(List.scala:318) at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) at scala.collection.AbstractTraversable.map(Traversable.scala:105) at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache.refresh(newParquet.scala:269) at org.apache.spark.sql.parquet.ParquetRelation2.init(newParquet.scala:391) at org.apache.spark.sql.parquet.DefaultSource.createRelation(newParquet.scala:98) at org.apache.spark.sql.parquet.DefaultSource.createRelation(newParquet.scala:128) at org.apache.spark.sql.sources.ResolvedDataSource$.apply(ddl.scala:240) at org.apache.spark.sql.hive.execution.CreateMetastoreDataSourceAsSelect.run(commands.scala:218) at 
org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:54) at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:54) at org.apache.spark.sql.execution.ExecutedCommand.execute(commands.scala:64) at org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:1099) at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:1099) at org.apache.spark.sql.DataFrame.saveAsTable(DataFrame.scala:1121) at org.apache.spark.sql.DataFrame.saveAsTable(DataFrame.scala:1071) at org.apache.spark.sql.DataFrame.saveAsTable(DataFrame.scala:1037) at org.apache.spark.sql.DataFrame.saveAsTable(DataFrame.scala:1015) java.lang.ClassCastException: java.sql.Date cannot be cast to java.lang.Integer at scala.runtime.BoxesRunTime.unboxToInt(BoxesRunTime.java:106) at org.apache.spark.sql.parquet.RowWriteSupport.writePrimitive(ParquetTableSupport.scala:215) at
saveAsTextFile() part- files are missing
Hello! I just started with Spark. I have an application which counts words in a file (a 1 MB file). The file is stored locally. I loaded the file using native code and then created the RDD from it:

JavaRDD<String> rddFromFile = context.parallelize(myFile, 2);
JavaRDD<String> words = rddFromFile.flatMap(...);
JavaPairRDD<String, Integer> pairs = words.mapToPair(...);
JavaPairRDD<String, Integer> counter = pairs.reduceByKey(...);
counter.saveAsTextFile("file:///root/output");
context.close();

I have one master and 2 slaves. I run the program from the master node. The output directory is created on the master node and on the 2 worker nodes. On the master node I have only one file, _SUCCESS (empty), and on the worker nodes I have _temporary files. I printed the counter at the console and the result seems OK. What am I doing wrong? Thank you! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/saveAsTextFile-part-files-are-missing-tp22974.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
RE: rdd.sample() methods very slow
Is there any other way to solve the problem? Let me state the use case. I have an RDD[Document] that contains over 7 million items. The RDD needs to be saved on persistent storage (currently I save it as an object file on disk). Then I need to get a small random sample of Document objects (e.g. 10,000 documents). How can I do this quickly? The rdd.sample() method does not help because it needs to read the entire RDD of 7 million Documents from disk, which takes a very long time. Ningjun From: Sean Owen [mailto:so...@cloudera.com] Sent: Tuesday, May 19, 2015 4:51 PM To: Wang, Ningjun (LNG-NPV) Cc: user@spark.apache.org Subject: Re: rdd.sample() methods very slow The way these files are accessed is inherently sequential-access. There isn't a way in general to know where record N is in a file like this and jump to it. So they must be read to be sampled. On Tue, May 19, 2015 at 9:44 PM, Wang, Ningjun (LNG-NPV) ningjun.w...@lexisnexis.com wrote: Hi, I have an RDD[Document] that contains 7 million objects and it is saved in the file system as an object file. I want to get a random sample of about 70 objects from it using the rdd.sample() method. It is very slow. val rdd : RDD[Document] = sc.objectFile[Document]("C:/temp/docs.obj").sample(false, 0.1D, 0L).cache() val count = rdd.count() From the Spark UI, I see Spark is trying to read the entire object files in the folder “C:/temp/docs.obj”, which is about 29.7 GB. Of course this is very slow. Why does Spark try to read the entire 7 million objects while I only need to return a random sample of 70 objects? Is there any efficient way to get a random sample of 70 objects without reading through the entire object files? Ningjun
Re: rdd.sample() methods very slow
I guess the fundamental issue is that these aren't stored in a way that allows random access to a Document. Underneath, Hadoop has a concept of a MapFile, which is like a SequenceFile with an index of offsets into the file where records begin. Although Spark doesn't use it, you could maybe create some custom RDD that takes advantage of this format to grab random elements efficiently. Other things come to mind but I think they're all slower -- like hashing all the docs and taking the smallest n in each of k partitions to get a pretty uniform random sample of kn docs. On Thu, May 21, 2015 at 4:04 PM, Wang, Ningjun (LNG-NPV) ningjun.w...@lexisnexis.com wrote: Is there any other way to solve the problem? Let me state the use case. I have an RDD[Document] that contains over 7 million items. The RDD needs to be saved on persistent storage (currently I save it as an object file on disk). Then I need to get a small random sample of Document objects (e.g. 10,000 documents). How can I do this quickly? The rdd.sample() method does not help because it needs to read the entire RDD of 7 million Documents from disk, which takes a very long time. Ningjun From: Sean Owen [mailto:so...@cloudera.com] Sent: Tuesday, May 19, 2015 4:51 PM To: Wang, Ningjun (LNG-NPV) Cc: user@spark.apache.org Subject: Re: rdd.sample() methods very slow The way these files are accessed is inherently sequential-access. There isn't a way in general to know where record N is in a file like this and jump to it. So they must be read to be sampled. On Tue, May 19, 2015 at 9:44 PM, Wang, Ningjun (LNG-NPV) ningjun.w...@lexisnexis.com wrote: Hi, I have an RDD[Document] that contains 7 million objects and it is saved in the file system as an object file. I want to get a random sample of about 70 objects from it using the rdd.sample() method. It is very slow. val rdd : RDD[Document] = sc.objectFile[Document]("C:/temp/docs.obj").sample(false, 0.1D, 0L).cache() val count = rdd.count() From the Spark UI, I see Spark is trying to read the entire object files in the folder “C:/temp/docs.obj”, which is about 29.7 GB. Of course this is very slow. Why does Spark try to read the entire 7 million objects while I only need to return a random sample of 70 objects? Is there any efficient way to get a random sample of 70 objects without reading through the entire object files? Ningjun - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
map reduce ?
Hi, I have a JavaPairRDD<String, List<Integer>>, and here is an example of what I want to get:

user_id  cat1  cat2  cat3  cat4
522      0     1     2     0
62       1     0     3     0
661      1     2     0     1

query: the users who have a number (except 0) in the cat1 and cat3 columns
answer: cat2 - 522,611 cat3 - 522,62 = user 522

How can I get this solution? I think that at first I should have a JavaRDD<List<String>> of the users who are in that column. Thank you Best, yasemin -- hiç ender hiç
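One way to read the question: keep the users whose cat1 and cat3 entries are both non-zero. A hedged Scala sketch over the same pair-RDD shape (in Java the equivalent would be a filter on the JavaPairRDD); the column positions are assumed to be 0-based indices into the value list:

    // pairs: RDD[(String, List[Int])], i.e. user_id -> [cat1, cat2, cat3, cat4]
    val pairs = sc.parallelize(Seq(
      ("522", List(0, 1, 2, 0)),
      ("62",  List(1, 0, 3, 0)),
      ("661", List(1, 2, 0, 1))))

    // Users with a non-zero entry in both cat1 (index 0) and cat3 (index 2).
    val matching = pairs
      .filter { case (_, cats) => cats(0) != 0 && cats(2) != 0 }
      .keys

    matching.collect().foreach(println)   // only "62" for this sample data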
Re: java program Get Stuck at broadcasting
Hey, I think I found out the problem. Turns out that the file I saved is too large. On 21 May 2015 at 16:44, Akhil Das ak...@sigmoidanalytics.com wrote: Can you share the code, may be i/someone can help you out Thanks Best Regards On Thu, May 21, 2015 at 1:45 PM, Allan Jie allanmcgr...@gmail.com wrote: Hi, Just check the logs of datanode, it looks like this: 2015-05-20 11:42:14,605 INFO org.apache.hadoop.hdfs.server.datanode.DataNode.clienttrace: src: / 10.9.0.48:50676, dest: /10.9.0.17:50010, bytes: 134217728, op: HDFS_WRITE, cliID: DFSClient_NONMAPREDUCE_804680172_54, offset: 0, srvID: 39fb78d5-828a-4319-8303-c704fab526e3, blockid: BP-436159032-10.9.0.16-1431330007172:blk_1073742096_1273, duration: 16994466261 2015-05-20 11:42:14,606 INFO org.apache.hadoop.hdfs.server.datanode.DataNode: PacketResponder: BP-436159032-10.9.0.16-1431330007172:blk_1073742096_1273, type=LAST_IN_PIPELINE, downstreams=0:[] terminating 2015-05-20 11:42:17,788 INFO org.apache.hadoop.hdfs.server.datanode.DataNode.clienttrace: src: / 10.9.0.17:49046, dest: /10.9.0.17:50010, bytes: 134217728, op: HDFS_WRITE, cliID: DFSClient_NONMAPREDUCE_102926009_54, offset: 0, srvID: 39fb78d5-828a-4319-8303-c704fab526e3, blockid: BP-436159032-10.9.0.16-1431330007172:blk_1073742099_1276, duration: 17829554438 2015-05-20 11:42:17,788 INFO org.apache.hadoop.hdfs.server.datanode.DataNode: PacketResponder: BP-436159032-10.9.0.16-1431330007172:blk_1073742099_1276, type=HAS_DOWNSTREAM_IN_PIPELINE terminating 2015-05-20 11:42:17,904 INFO org.apache.hadoop.hdfs.server.datanode.DataNode: Receiving BP-436159032-10.9.0.16-1431330007172:blk_1073742103_1280 src: / 10.9.0.17:49049 dest: /10.9.0.17:50010 2015-05-20 11:42:17,904 WARN org.apache.hadoop.hdfs.server.datanode.DataNode: IOException in BlockReceiver constructor. Cause is 2015-05-20 11:42:17,904 INFO org.apache.hadoop.hdfs.server.datanode.DataNode: opWriteBlock BP-436159032-10.9.0.16-1431330007172:blk_1073742103_1280 received exception org.apache.hadoop.util.DiskChecker$DiskOutOfSpaceException: Out of space: The volume with the most available space (=114409472 B) is less than the block size (=134217728 B). 2015-05-20 11:42:17,905 ERROR org.apache.hadoop.hdfs.server.datanode.DataNode: HadoopV26Slave1:50010:DataXceiver error processing WRITE_BLOCK operation src: /10.9.0.17:49049 dst: /10.9.0.17:50010 org.apache.hadoop.util.DiskChecker$DiskOutOfSpaceException: Out of space: The volume with the most available space (=114409472 B) is less than the block size (=134217728 B). 
at org.apache.hadoop.hdfs.server.datanode.fsdataset.RoundRobinVolumeChoosingPolicy.chooseVolume(RoundRobinVolumeChoosingPolicy.java:67) at org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeList.getNextVolume(FsVolumeList.java:69) at org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetImpl.createRbw(FsDatasetImpl.java:1084) at org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetImpl.createRbw(FsDatasetImpl.java:114) at org.apache.hadoop.hdfs.server.datanode.BlockReceiver.init(BlockReceiver.java:183) at org.apache.hadoop.hdfs.server.datanode.DataXceiver.writeBlock(DataXceiver.java:615) at org.apache.hadoop.hdfs.protocol.datatransfer.Receiver.opWriteBlock(Receiver.java:137) at org.apache.hadoop.hdfs.protocol.datatransfer.Receiver.processOp(Receiver.java:74) at org.apache.hadoop.hdfs.server.datanode.DataXceiver.run(DataXceiver.java:235) at java.lang.Thread.run(Thread.java:745) 2015-05-20 11:43:59,669 INFO org.apache.hadoop.hdfs.server.datanode.BlockPoolSliceScanner: Verification succeeded for BP-436159032-10.9.0.16-1431330007172:blk_1073741999_1176 2015-05-20 11:46:10,214 INFO org.apache.hadoop.hdfs.server.datanode.BlockPoolSliceScanner: Verification succeeded for BP-436159032-10.9.0.16-1431330007172:blk_1073742000_1177 2015-05-20 11:48:35,445 INFO org.apache.hadoop.hdfs.server.datanode.BlockPoolSliceScanner: Verification succeeded for BP-436159032-10.9.0.16-1431330007172:blk_1073741990_1167 2015-05-20 11:50:04,043 INFO org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetAsyncDiskService: Scheduling blk_1073742080_1257 file /tmp/hadoop-hduser/dfs/data/current/BP-436159032-10.9.0.16-1431330007172/current/finalized/subdir0/subdir1/blk_1073742080 for deletion 2015-05-20 11:50:04,136 INFO org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetAsyncDiskService: Scheduling blk_1073742081_1258 file /tmp/hadoop-hduser/dfs/data/current/BP-436159032-10.9.0.16-1431330007172/current/finalized/subdir0/subdir1/blk_1073742081 for deletion 2015-05-20 11:50:04,136 INFO org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetAsyncDiskService: Scheduling blk_1073742082_1259 file /tmp/hadoop-hduser/dfs/data/current/BP-436159032-10.9.0.16-1431330007172/current/finalized/subdir0/subdir1/blk_1073742082 for deletion 2015-05-20 11:50:04,136 INFO
Pandas timezone problems
After deserialization, something seems to be wrong with my pandas DataFrames. It looks like the timezone information is lost, and subsequent errors ensue. Serializing and deserializing a timezone-aware DataFrame tests just fine, so it must be Spark that somehow changes the data. My program runs timezone-unaware data without problems. Anybody have any ideas on what causes this, or how to solve it? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Pandas-timezone-problems-tp22985.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Pandas timezone problems
These are relevant: JIRA: https://issues.apache.org/jira/browse/SPARK-6411 PR: https://github.com/apache/spark/pull/6250 On Thu, May 21, 2015 at 3:16 PM, Def_Os njde...@gmail.com wrote: After deserialization, something seems to be wrong with my pandas DataFrames. It looks like the timezone information is lost, and subsequent errors ensue. Serializing and deserializing a timezone-aware DataFrame tests just fine, so it must be Spark that somehow changes the data. My program runs timezone-unaware data without problems. Anybody have any ideas on what causes this, or how to solve it? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Pandas-timezone-problems-tp22985.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: S3NativeFileSystem inefficient implementation when calling sc.textFile
I stumbled upon this thread and I conjecture that this may affect restoring a checkpointed RDD as well: http://apache-spark-user-list.1001560.n3.nabble.com/Union-of-checkpointed-RDD-in-Apache-Spark-has-long-gt-10-hour-between-stage-latency-td22925.html#a22928 In my case I have 1600+ fragmented checkpoint files, and reading all the metadata takes a staggering 11 hours. If this is really the cause then it's an obvious handicap, as a checkpointed RDD already has all file partition information available and doesn't need to read it from S3 into the driver again (which causes a single point of failure and a bottleneck). -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/S3NativeFileSystem-inefficient-implementation-when-calling-sc-textFile-tp19841p22984.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark MOOC - early access
Awesome, Thanks a ton for helping us all and futuristic planning, Much appreciate it, Regards, Kartik On May 21, 2015 4:41 PM, Marco Shaw marco.s...@gmail.com wrote: *Hi Spark Devs and Users,BerkeleyX and Databricks are currently developing two Spark-related MOOC on edX (intro https://www.edx.org/course/introduction-big-data-apache-spark-uc-berkeleyx-cs100-1x, ml https://www.edx.org/course/scalable-machine-learning-uc-berkeleyx-cs190-1x), the first of which starts on June 1st. Together these courses have over 75K enrolled students!To help students perform exercises course content, we have created a Vagrant box that contains Spark and IPython (running on Ubuntu 32-bit). This will simplify user setup and helps us support them. We are writing to give you early access to the VM environment and the first assignment, and to request your help to test out the VM/assignment before we unleash it to 75K people (see instructions below). We’ve provided instructions below. We’re happy to help if you have any difficulties getting the VM setup; please feel free to contact me (marco.s...@gmail.com marco.s...@gmail.com) with any issues, comments, or questions.Sincerely,Marco ShawSpark MOOC TA_(This is being sent as an HTML formatted email. Some of the links have been duplicated just in case.)1. Install VirtualBox here https://www.virtualbox.org/wiki/Downloads on your OS (see Windows tutorial here https://www.youtube.com/watch?v=06Sf-m64fcY (https://www.youtube.com/watch?v=06Sf-m64fcY https://www.youtube.com/watch?v=06Sf-m64fcY))2. Install Vagrant here https://www.vagrantup.com/downloads.html on your OS (see Windows tutorial here https://www.youtube.com/watch?v=LZVS23BaA1I (https://www.youtube.com/watch?v=LZVS23BaA1I https://www.youtube.com/watch?v=LZVS23BaA1I))3) Install virtual machine using the following steps: (see Windows tutorial here https://www.youtube.com/watch?v=ZuJCqHC7IYc (https://www.youtube.com/watch?v=ZuJCqHC7IYc https://www.youtube.com/watch?v=ZuJCqHC7IYc))a. Create a custom directory (e.g. c:\users\marco\myvagrant or /home/marco/myvagrant)b. Download the file https://raw.githubusercontent.com/spark-mooc/mooc-setup/master/Vagrantfile to the custom directory (NOTE: It must be named exactly Vagrantfile with no extension)c. Open a DOS prompt (Windows) or terminal (Mac/Linux) to the custom directory and issue the command vagrant up4) Perform basic commands in VM as described below: (see Windows tutorial here https://www.youtube.com/watch?v=bkteLH77IR0 (https://www.youtube.com/watch?v=bkteLH77IR0 https://www.youtube.com/watch?v=bkteLH77IR0))a. To start the VM, from a DOS prompt (Windows) or terminal (Mac/Linux), issue the command vagrant up.b. To stop the VM, from a DOS prompt (Windows) or terminal (Mac/Linux), issue the command vagrant halt.c. To erase or delete the VM, from a DOS prompt (Windows) or terminal (Mac/Linux), issue the command vagrant destroy.d. Once the VM is running, to access the notebook, open a web browser to http://localhost:8001 http://localhost:8001/.5) Using test notebook as described below: (see Windows tutorial here https://www.youtube.com/watch?v=mlfAmyF3Q-s (https://www.youtube.com/watch?v=mlfAmyF3Q-s https://www.youtube.com/watch?v=mlfAmyF3Q-s))a. To start the VM, from a DOS prompt (Windows) or terminal (Mac/Linux), issue the command vagrant up.b. Once the VM is running, to access the notebook, open a web browser to http://localhost:8001 http://localhost:8001/.c. 
Upload this IPython notebook: https://raw.githubusercontent.com/spark-mooc/mooc-setup/master/vm_test_student.ipynb https://raw.githubusercontent.com/spark-mooc/mooc-setup/master/vm_test_student.ipynb.d. Run through the notebook.6) Play around with the first MOOC assignment (email Marco for details when you get to this point).7) Please answer the following questionsa. What machine are you using (OS, RAM, CPU, age)?b. How long did the entire process take?c. How long did the VM download take? Relatedly, where are you located?d. Do you have any other comments/suggestions?*
Re: [pyspark] Starting workers in a virtualenv
Could you try specifying PYSPARK_PYTHON as the path to the Python binary in your virtualenv? For example: PYSPARK_PYTHON=/path/to/env/bin/python bin/spark-submit xx.py On Mon, Apr 20, 2015 at 12:51 AM, Karlson ksonsp...@siberie.de wrote: Hi all, I am running the Python process that communicates with Spark in a virtualenv. Is there any way I can make sure that the Python processes of the workers are also started in a virtualenv? Currently I am getting ImportErrors when the worker tries to unpickle stuff that is not installed system-wide. For now both the worker and the driver run on the same machine in local mode. Thanks in advance! - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
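For illustration, a quick way to confirm which interpreter the worker processes actually pick up (a minimal sketch assuming a live SparkContext named sc; the env path is a placeholder):

    import sys
    # Each task reports the Python binary it runs under; expect the virtualenv path.
    print(sc.parallelize(range(4), 2).map(lambda _: sys.executable).distinct().collect())
    # e.g. ['/path/to/env/bin/python'] when PYSPARK_PYTHON is honoured by the workers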
Tasks all finished, but the stage marked finished much later
Hi all, We are running Spark Streaming with version 1.1.1. Recently we found an odd problem. In stage 44554, all the tasks finished, but the stage was marked finished much later, as you can see in the log below: the last task finished at 15/05/21 21:17:36, and the TaskSet was removed at that point, but only about 7 seconds later, at 15/05/21 21:17:43, was the stage finished at the DAGScheduler. So all the stages depending on 44554 launched after 15/05/21 21:17:43 (see image below). I know there is a config, spark.akka.threads; the default is 4 and we did not change it. Does this config affect it? Does anyone have any idea about this? Driver log:
15/05/21 21:17:36 INFO [org.apache.spark.scheduler.TaskSetManager---task-result-getter-3]: Finished task 180.0 in stage 44554.0 (TID 1291943) in 3512 ms on gd6-mercury-spark-010.idc.vip.com (191/191)
15/05/21 21:17:36 INFO [org.apache.spark.scheduler.TaskSchedulerImpl---task-result-getter-3]: Removed TaskSet 44554.0, whose tasks have all completed, from pool default
15/05/21 21:17:43 INFO [org.apache.spark.scheduler.DAGScheduler---sparkDriver-akka.actor.default-dispatcher-44]: Stage 44554 (mapToPair at TraceCalculator.java:88) finished in 14.582 s
Stage image: [attachment not preserved]
Kmeans Labeled Point RDD
Hello, New to Spark. I wanted to know if it is possible to use a Labeled Point RDD in org.apache.spark.mllib.clustering.KMeans. After I cluster my data I would like to be able to identify which observations were grouped with each centroid. Thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Kmeans-Labeled-Point-RDD-tp22989.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Kmeans Labeled Point RDD
You can run predict and then zip the result with the points RDD, which gives you approximately the same pairing as a LabeledPoint. Cheers k/ On Thu, May 21, 2015 at 6:19 PM, anneywarlord anneywarl...@gmail.com wrote: Hello, New to Spark. I wanted to know if it is possible to use a Labeled Point RDD in org.apache.spark.mllib.clustering.KMeans. After I cluster my data I would like to be able to identify which observations were grouped with each centroid. Thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Kmeans-Labeled-Point-RDD-tp22989.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
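For illustration, a minimal PySpark sketch of the predict-then-zip idea (the tiny input data and the choice of k are placeholders; it assumes the label field identifies each observation):

    from pyspark.mllib.clustering import KMeans
    from pyspark.mllib.linalg import Vectors
    from pyspark.mllib.regression import LabeledPoint

    points = sc.parallelize([
        LabeledPoint(1.0, Vectors.dense(0.0, 0.0)),
        LabeledPoint(2.0, Vectors.dense(0.1, 0.1)),
        LabeledPoint(3.0, Vectors.dense(9.0, 9.0)),
    ])

    # KMeans.train expects plain feature vectors, not LabeledPoint.
    model = KMeans.train(points.map(lambda p: p.features), k=2, maxIterations=10)

    # Predict cluster ids and zip them back with the original labels.
    assignments = model.predict(points.map(lambda p: p.features))
    labeled_clusters = points.map(lambda p: p.label).zip(assignments)
    print(labeled_clusters.collect())   # e.g. [(1.0, 0), (2.0, 0), (3.0, 1)]

Since both sides of the zip are derived from the same RDD without repartitioning, the element order lines up.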
Re: foreach plus accumulator Vs mapPartitions performance
Or you can simply use `reduceByKeyLocally` if you don't want to worry about implementing accumulators and such, assuming that the reduced values will fit in the driver's memory (which you are assuming anyway by using accumulators). Best, Burak On Thu, May 21, 2015 at 2:46 PM, ben delpizz...@gmail.com wrote: Hi, everybody. There are some cases in which I can obtain the same results by using either the mapPartitions or the foreach method. For example, in a typical MapReduce approach one would perform a reduceByKey immediately after a mapPartitions that transforms the original RDD into a collection of (key, value) tuples. I think it is possible to achieve the same result by using, for instance, an array of accumulators, where at each index an executor sums a value and the index itself serves as a key. Since reduceByKey will perform a shuffle on disk, I think that, when possible, the foreach approach should be better, even though foreach has the side effect of summing a value into an accumulator. I am asking to see if my reasoning is correct. I hope I was clear. Thank you, Beniamino -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/foreach-plus-accumulator-Vs-mapPartitions-performance-tp22982.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
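For illustration, a minimal sketch of the reduceByKeyLocally alternative (toy data; this is PySpark, but the Scala/Java API is analogous):

    pairs = sc.parallelize([("a", 1), ("b", 2), ("a", 3)])

    # Returns a plain dict on the driver, so the reduced values must fit in
    # driver memory -- the same constraint the accumulator approach carries.
    as_dict = pairs.reduceByKeyLocally(lambda x, y: x + y)
    print(as_dict)                                          # {'a': 4, 'b': 3}

    # reduceByKey keeps the result distributed, at the cost of a shuffle.
    print(pairs.reduceByKey(lambda x, y: x + y).collect())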
Re: PySpark Logs location
https://spark.apache.org/docs/latest/running-on-yarn.html#debugging-your-application When log aggregation isn’t turned on, logs are retained locally on each machine under YARN_APP_LOGS_DIR, which is usually configured to /tmp/logs or $HADOOP_HOME/logs/userlogs depending on the Hadoop version and installation. Viewing logs for a container requires going to the host that contains them and looking in this directory. Subdirectories organize log files by application ID and container ID. You can enable log aggregation by changing yarn.log-aggregation-enable to true so it'll be easier to see yarn application logs. -- Ruslan Dautkhanov On Thu, May 21, 2015 at 5:08 AM, Oleg Ruchovets oruchov...@gmail.com wrote: It doesn't work for me so far; I used the command but got the output below. What should I check to fix the issue? Any configuration parameters? [root@sdo-hdp-bd-master1 ~]# yarn logs -applicationId application_1426424283508_0048 15/05/21 13:25:09 INFO impl.TimelineClientImpl: Timeline service address: http://hdp-bd-node1.development.c4i:8188/ws/v1/timeline/ 15/05/21 13:25:09 INFO client.RMProxy: Connecting to ResourceManager at hdp-bd-node1.development.c4i/12.23.45.253:8050 /app-logs/root/logs/application_1426424283508_0048 does not exist. Log aggregation has not completed or is not enabled. Thanks Oleg. On Wed, May 20, 2015 at 11:33 PM, Ruslan Dautkhanov dautkha...@gmail.com wrote: Oleg, You can see the applicationId in your Spark History Server. Go to http://historyserver:18088/ Also check https://spark.apache.org/docs/1.1.0/running-on-yarn.html#debugging-your-application It should be no different with PySpark. -- Ruslan Dautkhanov On Wed, May 20, 2015 at 2:12 PM, Oleg Ruchovets oruchov...@gmail.com wrote: Hi Ruslan. Could you add more details, please? Where do I get the applicationId? In case I have a lot of log files, would it make sense to view them from a single point? How can I actually configure / manage the log location for PySpark? Thanks Oleg. On Wed, May 20, 2015 at 10:24 PM, Ruslan Dautkhanov dautkha...@gmail.com wrote: You could use yarn logs -applicationId application_1383601692319_0008 -- Ruslan Dautkhanov On Wed, May 20, 2015 at 5:37 AM, Oleg Ruchovets oruchov...@gmail.com wrote: Hi, I am executing a PySpark job on YARN (Hortonworks distribution). Could someone point me to where the log locations are? Thanks Oleg.
Re: Spark Streaming with Tachyon : Data Loss on Receiver Failure due to WAL error
Hi Tathagata, Thanks for looking into this. Investigating further, I found that the issue is that Tachyon does not support file append. When the streaming receiver that writes to the WAL fails and is restarted, it is not able to append to the same WAL file after the restart. I raised this with the Tachyon user group, and Haoyuan said that Tachyon file append should be ready within about 3 months. Will revisit this issue then. Regards, Dibyendu On Fri, May 22, 2015 at 12:24 AM, Tathagata Das t...@databricks.com wrote: Looks like the file size reported by the FSInputDStream of Tachyon's FileSystem interface is somehow zero. On Mon, May 11, 2015 at 4:38 AM, Dibyendu Bhattacharya dibyendu.bhattach...@gmail.com wrote: Just to follow up on this thread further. I was doing some fault-tolerance testing of Spark Streaming with Tachyon as the OFF_HEAP block store. As I said in an earlier email, I was able to solve the BlockNotFound exception when I used Tachyon's hierarchical storage, which is good. I continued testing with the Spark Streaming WAL and checkpoint files also stored in Tachyon. Here are a few findings: When I store the Spark Streaming checkpoint location in Tachyon, the throughput is much higher. I tested the driver and receiver failure cases, and Spark Streaming is able to recover without any data loss on driver failure. But on receiver failure, Spark Streaming loses data, as I see an exception while reading the WAL file from the Tachyon receivedData location for the receiver id that just failed. If I change the checkpoint location back to HDFS, Spark Streaming can recover from both driver and receiver failure. Here are the log details from when the Spark Streaming receiver failed. I raised a JIRA for the same issue: https://issues.apache.org/jira/browse/SPARK-7525 INFO : org.apache.spark.scheduler.DAGScheduler - Executor lost: 2 (epoch 1) INFO : org.apache.spark.storage.BlockManagerMasterEndpoint - Trying to remove executor 2 from BlockManagerMaster. 
INFO : org.apache.spark.storage.BlockManagerMasterEndpoint - Removing block manager BlockManagerId(2, 10.252.5.54, 45789) INFO : org.apache.spark.storage.BlockManagerMaster - Removed 2 successfully in removeExecutor INFO : org.apache.spark.streaming.scheduler.ReceiverTracker - *Registered receiver for stream 2 from 10.252.5.62*:47255 WARN : org.apache.spark.scheduler.TaskSetManager - Lost task 2.1 in stage 103.0 (TID 421, 10.252.5.62): org.apache.spark.SparkException: *Could not read data from write ahead log record FileBasedWriteAheadLogSegment(tachyon-ft://10.252.5.113:19998/tachyon/checkpoint/receivedData/2/log-1431341091711-1431341151711,645603894,10891919 http://10.252.5.113:19998/tachyon/checkpoint/receivedData/2/log-1431341091711-1431341151711,645603894,10891919)* at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.org $apache$spark$streaming$rdd$WriteAheadLogBackedBlockRDD$$getBlockFromWriteAheadLog$1(WriteAheadLogBackedBlockRDD.scala:144) at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD$$anonfun$compute$1.apply(WriteAheadLogBackedBlockRDD.scala:168) at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD$$anonfun$compute$1.apply(WriteAheadLogBackedBlockRDD.scala:168) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.compute(WriteAheadLogBackedBlockRDD.scala:168) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63) at org.apache.spark.scheduler.Task.run(Task.scala:70) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:744) Caused by: java.lang.IllegalArgumentException:* Seek position is past EOF: 645603894, fileSize = 0* at tachyon.hadoop.HdfsFileInputStream.seek(HdfsFileInputStream.java:239) at org.apache.hadoop.fs.FSDataInputStream.seek(FSDataInputStream.java:37) at org.apache.spark.streaming.util.FileBasedWriteAheadLogRandomReader.read(FileBasedWriteAheadLogRandomReader.scala:37) at org.apache.spark.streaming.util.FileBasedWriteAheadLog.read(FileBasedWriteAheadLog.scala:104) at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.org $apache$spark$streaming$rdd$WriteAheadLogBackedBlockRDD$$getBlockFromWriteAheadLog$1(WriteAheadLogBackedBlockRDD.scala:141) ... 15 more INFO : org.apache.spark.scheduler.TaskSetManager - Starting task 2.2 in stage 103.0 (TID 422, 10.252.5.61, ANY, 1909 bytes) INFO :
LDA prediction on new document
Hi guys, I'm pretty new to LDA. I notice that Spark 1.3.0 MLlib provides an EM-based LDA implementation. It returns both the topics and the topic distributions. My question is: how can I use these parameters to predict on a new document? I also notice there is an online LDA implementation in the Spark master branch that only returns topics; how can I use that to do prediction on a new document (and on the trained documents)? Thanks
Re: Query a Dataframe in rdd.map()
Your original code snippet seems incomplete, and there isn't enough information to figure out what problem you actually ran into. In your original snippet there is an rdd variable which is well defined, and a df variable that is not defined in the code you sent. One way to make this work is as below (until the last line is executed you are actually not collecting anything on the driver, and if your dataset is too big to collect on the driver for inspection, just do a take(n) on the result):

    from pyspark.sql import Row, SQLContext
    from pyspark.sql.functions import count
    sqlContext = SQLContext(sc)
    # convert a list of ip strings into a data frame with column ip
    Record = Row("ip")
    df = sc.parallelize(map(lambda x: Record(x), ['208.51.22.18', '31.207.6.173', '208.51.22.18'])).toDF()
    # obtain ip -> frequency and inspect
    df.groupBy(df.ip).agg(count(df.ip)).show()

    +------------+---------+
    |          ip|COUNT(ip)|
    +------------+---------+
    |208.51.22.18|        2|
    |31.207.6.173|        1|
    +------------+---------+

What exactly is the issue you are running into when you say it doesn't get through? On Thu, May 21, 2015 at 10:47 AM, ping yan sharon...@gmail.com wrote: Thanks. I suspected that, but figured that a df query inside a map sounds so intuitive that I didn't want to just give up. I've tried join, and even better, with a DStream.transform() it works! freqs = testips.transform(lambda rdd: rdd.join(kvrdd).map(lambda (x,y): y[1])) Thank you for the help! Ping On Thu, May 21, 2015 at 10:40 AM, Holden Karau hol...@pigscanfly.ca wrote: So DataFrames, like RDDs, can only be accessed from the driver. If your IP frequency table is small enough you could collect it and distribute it as a hashmap with broadcast, or you could also join your rdd with the ip frequency table. Hope that helps :) On Thursday, May 21, 2015, ping yan sharon...@gmail.com wrote: I have a dataframe as a reference table for IP frequencies. e.g., ip freq 10.226.93.67 1 10.226.93.69 1 161.168.251.101 4 10.236.70.2 1 161.168.251.105 14 All I need is to query the df in a map. rdd = sc.parallelize(['208.51.22.18', '31.207.6.173', '208.51.22.18']) freqs = rdd.map(lambda x: df.where(df.ip == x).first()) It doesn't get through.. would appreciate any help. Thanks! Ping -- Ping Yan Ph.D. in Management Dept. of Management Information Systems University of Arizona Tucson, AZ 85721 -- Cell : 425-233-8271 Twitter: https://twitter.com/holdenkarau Linked In: https://www.linkedin.com/in/holdenkarau -- Ping Yan Ph.D. in Management Dept. of Management Information Systems University of Arizona Tucson, AZ 85721
Re: rdd.sample() methods very slow
If sampling whole partitions is sufficient (or a part of a partition), sure, you could mapPartitionsWithIndex and decide whether to process a partition at all based on its index, skipping the rest. That's much faster. On Thu, May 21, 2015 at 7:07 PM, Wang, Ningjun (LNG-NPV) ningjun.w...@lexisnexis.com wrote: I don't need it to be 100% random. How about randomly picking a few partitions and returning all docs in those partitions? Is rdd.mapPartitionsWithIndex() the right method to use to process just a small portion of the partitions? Ningjun - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
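For illustration, a minimal PySpark sketch of skipping partitions rather than sampling every record (the RDD and the keep-1-in-10 rule are just placeholders):

    docs = sc.parallelize(range(1000), 20)   # stand-in for the documents RDD

    def keep_some_partitions(index, iterator):
        # Only partitions whose index is selected are processed; the rest are skipped.
        return iterator if index % 10 == 0 else iter([])

    sampled = docs.mapPartitionsWithIndex(keep_some_partitions)
    print(sampled.count())   # roughly 1/10 of the records, with no per-record coin flips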
Official Docker container for Spark
Hi, I am using Spark 1.2.0. Can you suggest Docker containers which can be deployed in production? I found a lot of Spark images on https://registry.hub.docker.com/, but could not figure out which one to use. None of them seems like an official image. Does anybody have any recommendation? Thanks Tridib -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Official-Docker-container-for-Spark-tp22977.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: java program got Stuck at broadcasting
Can you try commenting out the saveAsTextFile and doing a simple count()? If it's a broadcast issue, then it would throw up the same error. On 21 May 2015 14:21, allanjie allanmcgr...@gmail.com wrote: Sure, the code is very simple. I think you guys can understand it from the main function.

    public class Test1 {

        public static double[][] createBroadcastPoints(String localPointPath, int row, int col) throws IOException {
            BufferedReader br = RAWF.reader(localPointPath);
            String line = null;
            int rowIndex = 0;
            double[][] pointFeatures = new double[row][col];
            while ((line = br.readLine()) != null) {
                List<String> point = Arrays.asList(line.split(","));
                int colIndex = 0;
                for (String pointFeature : point) {
                    pointFeatures[rowIndex][colIndex] = Double.valueOf(pointFeature);
                    colIndex++;
                }
                rowIndex++;
            }
            br.close();
            return pointFeatures;
        }

        public static void main(String[] args) throws IOException {
            /** Parameter Setting ***/
            String localPointPath = "/home/hduser/skyrock/skyrockImageFeatures.csv";
            String remoteFilePath = "hdfs://HadoopV26Master:9000/user/skyrock/skyrockImageIndexedFeatures.csv"; // this csv file is only 468MB
            final int row = 133433;
            final int col = 458;

            SparkConf conf = new SparkConf()
                    .setAppName("distance")
                    .setMaster("spark://HadoopV26Master:7077")
                    .set("spark.executor.memory", "4g")
                    .set("spark.eventLog.enabled", "true")
                    .set("spark.eventLog.dir", "/usr/local/spark/logs/spark-events")
                    .set("spark.local.dir", "/tmp/spark-temp");
            JavaSparkContext sc = new JavaSparkContext(conf);

            JavaRDD<String> textFile = sc.textFile(remoteFilePath);

            // Broadcast variable
            //double[][] xx = ;
            final Broadcast<double[][]> broadcastPoints = sc.broadcast(createBroadcastPoints(localPointPath, row, col));
            //final Broadcast<double[][]> broadcastPoints = sc.broadcast(xx);

            /**
             * Compute the distance in terms of each point on each instance.
             * distance list: index = n(i-1) - i*(i-1)/2 + j-i-1
             */
            JavaPairRDD<Pair, Double> distance = textFile.flatMapToPair(new PairFlatMapFunction<String, Pair, Double>() {
                public Iterable<Tuple2<Pair, Double>> call(String v1) throws Exception {
                    List<String> al = Arrays.asList(v1.split(","));
                    double[] featureVals = new double[al.size()];
                    for (int j = 0; j < al.size() - 1; j++)
                        featureVals[j] = Double.valueOf(al.get(j + 1));
                    int jIndex = Integer.valueOf(al.get(0));
                    double[][] allPoints = broadcastPoints.getValue();
                    double sum = 0;
                    List<Tuple2<Pair, Double>> list = new ArrayList<Tuple2<Pair, Double>>();
                    for (int i = 0; i < row; i++) {
                        sum = 0;
                        for (int j = 0; j < al.size() - 1; j++) {
                            sum += (allPoints[i][j] - featureVals[j]) * (allPoints[i][j] - featureVals[j]);
                        }
                        list.add(new Tuple2<Pair, Double>(new Pair(i, jIndex), Math.sqrt(sum)));
                    }
                    return list;
                }
            });

            distance.saveAsTextFile("hdfs://HadoopV26Master:9000/user/" + args[0]);
        }
    }

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/java-program-got-Stuck-at-broadcasting-tp22953p22973.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Spark HistoryServer not coming up
Hi, After restarting the Spark HistoryServer, it failed to come up. I checked the Spark HistoryServer logs and found the following messages: 2015-05-21 11:38:03,790 WARN org.apache.spark.scheduler.ReplayListenerBus: Log path provided contains no log files. 2015-05-21 11:38:52,319 INFO org.apache.spark.deploy.history.HistoryServer: Registered signal handlers for [TERM, HUP, INT] 2015-05-21 11:38:52,328 WARN org.apache.spark.deploy.history.HistoryServerArguments: Setting log directory through the command line is deprecated as of Spark 1.1.0. Please set this through spark.history.fs.logDirectory instead. 2015-05-21 11:38:52,461 INFO org.apache.spark.SecurityManager: Changing view acls to: spark 2015-05-21 11:38:52,462 INFO org.apache.spark.SecurityManager: Changing modify acls to: spark 2015-05-21 11:38:52,463 INFO org.apache.spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(spark); users with modify permissions: Set(spark) 2015-05-21 11:41:24,893 ERROR org.apache.spark.deploy.history.HistoryServer: RECEIVED SIGNAL 15: SIGTERM 2015-05-21 11:41:33,439 INFO org.apache.spark.deploy.history.HistoryServer: Registered signal handlers for [TERM, HUP, INT] 2015-05-21 11:41:33,447 WARN org.apache.spark.deploy.history.HistoryServerArguments: Setting log directory through the command line is deprecated as of Spark 1.1.0. Please set this through spark.history.fs.logDirectory instead. 2015-05-21 11:41:33,578 INFO org.apache.spark.SecurityManager: Changing view acls to: spark 2015-05-21 11:41:33,579 INFO org.apache.spark.SecurityManager: Changing modify acls to: spark 2015-05-21 11:41:33,579 INFO org.apache.spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(spark); users with modify permissions: Set(spark) 2015-05-21 11:44:07,147 WARN org.apache.hadoop.hdfs.BlockReaderFactory: I/O error constructing remote block reader. 
java.io.EOFException: Premature EOF: no length prefix available at org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed(PBHelper.java:2109) at org.apache.hadoop.hdfs.RemoteBlockReader2.newBlockReader(RemoteBlockReader2.java:408) at org.apache.hadoop.hdfs.BlockReaderFactory.getRemoteBlockReader(BlockReaderFactory.java:785) at org.apache.hadoop.hdfs.BlockReaderFactory.getRemoteBlockReaderFromTcp(BlockReaderFactory.java:663) at org.apache.hadoop.hdfs.BlockReaderFactory.build(BlockReaderFactory.java:327) at org.apache.hadoop.hdfs.DFSInputStream.blockSeekTo(DFSInputStream.java:574) at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:797) at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:844) at java.io.DataInputStream.read(DataInputStream.java:149) at java.io.BufferedInputStream.read1(BufferedInputStream.java:273) at java.io.BufferedInputStream.read(BufferedInputStream.java:334) at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:283) at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:325) at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:177) at java.io.InputStreamReader.read(InputStreamReader.java:184) at java.io.BufferedReader.fill(BufferedReader.java:154) at java.io.BufferedReader.readLine(BufferedReader.java:317) at java.io.BufferedReader.readLine(BufferedReader.java:382) at scala.io.BufferedSource$BufferedLineIterator.hasNext(BufferedSource.scala:67) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at org.apache.spark.scheduler.ReplayListenerBus$$anonfun$replay$2.apply(ReplayListenerBus.scala:69) at org.apache.spark.scheduler.ReplayListenerBus$$anonfun$replay$2.apply(ReplayListenerBus.scala:55) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34) at org.apache.spark.scheduler.ReplayListenerBus.replay(ReplayListenerBus.scala:55) at org.apache.spark.deploy.history.FsHistoryProvider$$anonfun$5.apply(FsHistoryProvider.scala:175) at org.apache.spark.deploy.history.FsHistoryProvider$$anonfun$5.apply(FsHistoryProvider.scala:172) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34) at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251) at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105) at
Re: Spark Streaming graceful shutdown in Spark 1.4
My 2 cents: As per the javadoc: https://docs.oracle.com/javase/7/docs/api/java/lang/Runtime.html#addShutdownHook(java.lang.Thread) Shutdown hooks should also finish their work quickly. When a program invokes exit the expectation is that the virtual machine will promptly shut down and exit. When the virtual machine is terminated due to user logoff or system shutdown the underlying operating system may only allow a fixed amount of time in which to shut down and exit. It is therefore inadvisable to attempt any user interaction or to perform a long-running computation in a shutdown hook. The shutdown hook should not do any long-running work and may exit before stop returns. It means we cannot implement the stopGracefully = true semantics correctly, which the user will expect to stop gracefully by waiting for the processing of all received data to be completed. So I agree that we can add `ssc.stop` as the shutdown hook. But stopGracefully should be false. Best Regards, Shixiong Zhu 2015-05-20 21:59 GMT-07:00 Dibyendu Bhattacharya dibyendu.bhattach...@gmail.com: Thanks Tathagata for making this change.. Dibyendu On Thu, May 21, 2015 at 8:24 AM, Tathagata Das t...@databricks.com wrote: If you are talking about handling driver crash failures, then all bets are off anyways! Adding a shutdown hook in the hope of handling driver process failure handles only some cases (Ctrl-C), but does not handle cases like SIGKILL (which does not run JVM shutdown hooks) or driver machine crash. So it's not a good idea to rely on that. Nonetheless I have opened a PR to handle the shutdown of the StreamingContext in the same way as SparkContext. https://github.com/apache/spark/pull/6307 On Tue, May 19, 2015 at 12:51 AM, Dibyendu Bhattacharya dibyendu.bhattach...@gmail.com wrote: Thanks Sean, you are right. If the driver program is running then I can handle shutdown in the main exit path. But if the driver machine crashes (or you just stop the application, for example by killing the driver process), then a shutdown hook is the only option, isn't it? What I'm trying to say is, just doing ssc.stop in sys.ShutdownHookThread or Runtime.getRuntime().addShutdownHook (in Java) won't work anymore. I need to use Utils.addShutdownHook with a priority. So I am just checking whether Spark Streaming can make graceful shutdown the default shutdown mechanism. Dibyendu On Tue, May 19, 2015 at 1:03 PM, Sean Owen so...@cloudera.com wrote: I don't think you should rely on a shutdown hook. Ideally you try to stop it in the main exit path of your program, even in case of an exception. On Tue, May 19, 2015 at 7:59 AM, Dibyendu Bhattacharya dibyendu.bhattach...@gmail.com wrote: You mean to say within Runtime.getRuntime().addShutdownHook I call ssc.stop(stopSparkContext = true, stopGracefully = true)? This won't work anymore in 1.4. The SparkContext got stopped before the Receiver processed all received blocks, and I see the exception below in the logs. But if I add Utils.addShutdownHook with the priority as I mentioned, then graceful shutdown works. In that case shutdown hooks run in priority order.
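For illustration, a minimal PySpark sketch of the stop-in-the-main-exit-path pattern Sean describes (the thread is about the Scala/Java API, so this is only an analogous Python illustration; the app name and batch interval are placeholders):

    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext

    sc = SparkContext(appName="GracefulStopExample")
    ssc = StreamingContext(sc, batchDuration=1)
    # ... define the DStreams and output operations here ...

    try:
        ssc.start()
        ssc.awaitTermination()
    finally:
        # Runs on the normal exit path and on exceptions; stopGraceFully=True
        # waits for received data to finish processing before shutting down.
        ssc.stop(stopSparkContext=True, stopGraceFully=True)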
Re: saveAsTextFile() part- files are missing
Hi, it looks like you are writing to a local filesystem. Could you try writing to a location visible to all nodes (master and workers), e.g. an NFS share? HTH, Tomasz On 21.05.2015 at 17:16, rroxanaioana wrote: Hello! I just started with Spark. I have an application which counts words in a file (1 MB file). The file is stored locally. I loaded the file using native code and then created the RDD from it.

    JavaRDD<String> rddFromFile = context.parallelize(myFile, 2);
    JavaRDD<String> words = rddFromFile.flatMap(...);
    JavaPairRDD<String, Integer> pairs = words.mapToPair(...);
    JavaPairRDD<String, Integer> counter = pairs.reduceByKey(..);
    counter.saveAsTextFile("file:///root/output");
    context.close();

I have one master and 2 slaves. I run the program from the master node. The output directory is created on the master node and on the 2 nodes. On the master node I have only one file, _SUCCESS (empty), and on the nodes I have a _temporary file. I printed the counter at the console; the result seems ok. What am I doing wrong? Thank you! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/saveAsTextFile-part-files-are-missing-tp22974.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
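For illustration, a minimal PySpark sketch of the same idea: write to storage every node can see, or, for a file this small, just collect the result on the driver (the HDFS URL and paths are placeholders):

    counts = sc.textFile("hdfs://master:9000/input/words.txt") \
               .flatMap(lambda line: line.split()) \
               .map(lambda w: (w, 1)) \
               .reduceByKey(lambda a, b: a + b)

    # Option 1: a path visible to the master and all workers, e.g. HDFS or an NFS mount.
    counts.saveAsTextFile("hdfs://master:9000/user/root/output")

    # Option 2: for a ~1 MB input, collecting on the driver and writing locally is fine too.
    with open("/root/output.txt", "w") as f:
        for word, n in counts.collect():
            f.write("%s\t%d\n" % (word, n))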
Re: Spark HistoryServer not coming up
Seems like there might be a mismatch between your Spark jars and your cluster's HDFS version. Make sure you're using the Spark jar that matches the hadoop version of your cluster. On Thu, May 21, 2015 at 8:48 AM, roy rp...@njit.edu wrote: Hi, After restarting Spark HistoryServer, it failed to come up, I checked logs for Spark HistoryServer found following messages :' 2015-05-21 11:38:03,790 WARN org.apache.spark.scheduler.ReplayListenerBus: Log path provided contains no log files. 2015-05-21 11:38:52,319 INFO org.apache.spark.deploy.history.HistoryServer: Registered signal handlers for [TERM, HUP, INT] 2015-05-21 11:38:52,328 WARN org.apache.spark.deploy.history.HistoryServerArguments: Setting log directory through the command line is deprecated as of Spark 1.1.0. Please set this through spark.history.fs.logDirectory instead. 2015-05-21 11:38:52,461 INFO org.apache.spark.SecurityManager: Changing view acls to: spark 2015-05-21 11:38:52,462 INFO org.apache.spark.SecurityManager: Changing modify acls to: spark 2015-05-21 11:38:52,463 INFO org.apache.spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(spark); users with modify permissions: Set(spark) 2015-05-21 11:41:24,893 ERROR org.apache.spark.deploy.history.HistoryServer: RECEIVED SIGNAL 15: SIGTERM 2015-05-21 11:41:33,439 INFO org.apache.spark.deploy.history.HistoryServer: Registered signal handlers for [TERM, HUP, INT] 2015-05-21 11:41:33,447 WARN org.apache.spark.deploy.history.HistoryServerArguments: Setting log directory through the command line is deprecated as of Spark 1.1.0. Please set this through spark.history.fs.logDirectory instead. 2015-05-21 11:41:33,578 INFO org.apache.spark.SecurityManager: Changing view acls to: spark 2015-05-21 11:41:33,579 INFO org.apache.spark.SecurityManager: Changing modify acls to: spark 2015-05-21 11:41:33,579 INFO org.apache.spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(spark); users with modify permissions: Set(spark) 2015-05-21 11:44:07,147 WARN org.apache.hadoop.hdfs.BlockReaderFactory: I/O error constructing remote block reader. 
java.io.EOFException: Premature EOF: no length prefix available at org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed(PBHelper.java:2109) at org.apache.hadoop.hdfs.RemoteBlockReader2.newBlockReader(RemoteBlockReader2.java:408) at org.apache.hadoop.hdfs.BlockReaderFactory.getRemoteBlockReader(BlockReaderFactory.java:785) at org.apache.hadoop.hdfs.BlockReaderFactory.getRemoteBlockReaderFromTcp(BlockReaderFactory.java:663) at org.apache.hadoop.hdfs.BlockReaderFactory.build(BlockReaderFactory.java:327) at org.apache.hadoop.hdfs.DFSInputStream.blockSeekTo(DFSInputStream.java:574) at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:797) at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:844) at java.io.DataInputStream.read(DataInputStream.java:149) at java.io.BufferedInputStream.read1(BufferedInputStream.java:273) at java.io.BufferedInputStream.read(BufferedInputStream.java:334) at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:283) at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:325) at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:177) at java.io.InputStreamReader.read(InputStreamReader.java:184) at java.io.BufferedReader.fill(BufferedReader.java:154) at java.io.BufferedReader.readLine(BufferedReader.java:317) at java.io.BufferedReader.readLine(BufferedReader.java:382) at scala.io.BufferedSource$BufferedLineIterator.hasNext(BufferedSource.scala:67) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at org.apache.spark.scheduler.ReplayListenerBus$$anonfun$replay$2.apply(ReplayListenerBus.scala:69) at org.apache.spark.scheduler.ReplayListenerBus$$anonfun$replay$2.apply(ReplayListenerBus.scala:55) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34) at org.apache.spark.scheduler.ReplayListenerBus.replay(ReplayListenerBus.scala:55) at org.apache.spark.deploy.history.FsHistoryProvider$$anonfun$5.apply(FsHistoryProvider.scala:175) at org.apache.spark.deploy.history.FsHistoryProvider$$anonfun$5.apply(FsHistoryProvider.scala:172) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) at
Query a Dataframe in rdd.map()
I have a dataframe as a reference table for IP frequencies, e.g.,

    ip               freq
    10.226.93.67     1
    10.226.93.69     1
    161.168.251.101  4
    10.236.70.2      1
    161.168.251.105  14

All I need is to query the df in a map:

    rdd = sc.parallelize(['208.51.22.18', '31.207.6.173', '208.51.22.18'])
    freqs = rdd.map(lambda x: df.where(df.ip == x).first())

It doesn't get through... I would appreciate any help. Thanks! Ping -- Ping Yan Ph.D. in Management Dept. of Management Information Systems University of Arizona Tucson, AZ 85721
Re: Spark Streaming + Kafka failure recovery
Hi Cody, That is clear. Thanks! Bill On Tue, May 19, 2015 at 1:27 PM, Cody Koeninger c...@koeninger.org wrote: If you checkpoint, the job will start from the successfully consumed offsets. If you don't checkpoint, by default it will start from the highest available offset, and you will potentially lose data. Is the link I posted, or for that matter the scaladoc, really not clear on that point? The scaladoc says: To recover from driver failures, you have to enable checkpointing in the StreamingContext http://spark.apache.org/docs/latest/api/scala/org/apache/spark/streaming/StreamingContext.html. The information on consumed offsets can be recovered from the checkpoint. On Tue, May 19, 2015 at 2:38 PM, Bill Jay bill.jaypeter...@gmail.com wrote: If a Spark Streaming job stops at 12:01 and I resume the job at 12:02, will it still consume the data that were produced to Kafka at 12:01, or will it just start consuming from the current time? On Tue, May 19, 2015 at 10:58 AM, Cody Koeninger c...@koeninger.org wrote: Have you read https://github.com/koeninger/kafka-exactly-once/blob/master/blogpost.md ? 1. There's nothing preventing that. 2. Checkpointing will give you at-least-once semantics, provided you have sufficient Kafka retention. Be aware that checkpoints aren't recoverable if you upgrade code. On Tue, May 19, 2015 at 12:42 PM, Bill Jay bill.jaypeter...@gmail.com wrote: Hi all, I am currently using Spark Streaming to consume and save logs every hour in our production pipeline. The current setting is to run a crontab job to check every minute whether the job is still there and, if not, resubmit a Spark Streaming job. I am currently using the direct approach for the Kafka consumer. I have two questions: 1. In the direct approach, no offset is stored in ZooKeeper and no group id is specified. Can two consumers (one being Spark Streaming and the other a Kafka console consumer from the Kafka package) read from the same topic from the brokers together (I would like both of them to get all messages, i.e. publish-subscribe mode)? What about two Spark Streaming jobs reading from the same topic? 2. How to avoid data loss if a Spark job is killed? Does checkpointing serve this purpose? The default behavior of Spark Streaming is to read the latest logs. However, if a job is killed, can the new job resume from what was left to avoid losing logs? Thanks! Bill
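For illustration, a minimal PySpark sketch of the checkpoint-based recovery described above (broker, topic, and checkpoint path are placeholders; the Python direct Kafka API and StreamingContext.getOrCreate are not available in every 1.x release, so treat this as a sketch of the pattern rather than version-specific guidance):

    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext
    from pyspark.streaming.kafka import KafkaUtils

    CHECKPOINT = "hdfs:///user/spark/checkpoints/my-stream"

    def create_context():
        sc = SparkContext(appName="KafkaDirectWithCheckpoint")
        ssc = StreamingContext(sc, 60)
        ssc.checkpoint(CHECKPOINT)
        stream = KafkaUtils.createDirectStream(
            ssc, ["my-topic"], {"metadata.broker.list": "broker1:9092"})
        stream.map(lambda kv: kv[1]).count().pprint()   # per-batch record count
        return ssc

    # On restart this recovers offsets and pending batches from the checkpoint
    # instead of starting from the latest available offset.
    ssc = StreamingContext.getOrCreate(CHECKPOINT, create_context)
    ssc.start()
    ssc.awaitTermination()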
Re: Query a Dataframe in rdd.map()
So DataFrames, like RDDs, can only be accessed from the driver. If your IP frequency table is small enough you could collect it and distribute it as a hashmap with broadcast, or you could also join your rdd with the ip frequency table. Hope that helps :) On Thursday, May 21, 2015, ping yan sharon...@gmail.com wrote: I have a dataframe as a reference table for IP frequencies. e.g., ip freq 10.226.93.67 1 10.226.93.69 1 161.168.251.101 4 10.236.70.2 1 161.168.251.105 14 All I need is to query the df in a map. rdd = sc.parallelize(['208.51.22.18', '31.207.6.173', '208.51.22.18']) freqs = rdd.map(lambda x: df.where(df.ip == x).first()) It doesn't get through.. would appreciate any help. Thanks! Ping -- Ping Yan Ph.D. in Management Dept. of Management Information Systems University of Arizona Tucson, AZ 85721 -- Cell : 425-233-8271 Twitter: https://twitter.com/holdenkarau Linked In: https://www.linkedin.com/in/holdenkarau
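For illustration, a minimal PySpark sketch of the broadcast approach (it assumes df is the small ip/freq DataFrame from the question):

    # Collect the small frequency table as a dict on the driver and broadcast it.
    freq_map = sc.broadcast(dict(df.rdd.map(lambda r: (r.ip, r.freq)).collect()))

    rdd = sc.parallelize(['208.51.22.18', '31.207.6.173', '208.51.22.18'])
    freqs = rdd.map(lambda ip: freq_map.value.get(ip, 0))
    print(freqs.collect())

    # Join alternative: turn both sides into (ip, value) pairs and join them.
    kv_freq = df.rdd.map(lambda r: (r.ip, r.freq))
    joined = rdd.map(lambda ip: (ip, None)).join(kv_freq).map(lambda kv: kv[1][1])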