Using Spark, SparkR and Ranger, please help.
Hello,

I have been able to use Spark with Apache Ranger: I added the right configuration files to Spark's conf directory, I added the Ranger jars to the classpath, and it works; Spark complies with the Ranger rules when I access Hive tables. However, with SparkR it does not work, which is rather surprising considering SparkR is supposed to be just a layer over Spark. I don't understand why SparkR seems to behave differently; maybe I am just missing something.

In Spark, when I do:

    sqlContext.sql("show databases").collect()

it works, and I get all my Hive databases. But SparkR does not behave the same way. When I do:

    sql(sqlContext, "show databases")

    Error in invokeJava(isStatic = FALSE, objId$id, methodName, ...) :
      java.lang.RuntimeException: [1.1] failure: ``with'' expected but identifier show found
    ...

From the documentation it seems that I need to instantiate a HiveContext:

    hiveContext <- sparkRHive.init(sc)
    sql(hiveContext, "show databases")

    16/01/20 18:37:20 ERROR RBackendHandler: sql on 2 failed
    Error in invokeJava(isStatic = FALSE, objId$id, methodName, ...) :
      java.lang.AssertionError: Authorization plugins not initialized!
        at org.apache.hadoop.hive.ql.session.SessionState.getAuthorizationMode(SessionState.java:1511)
        at org.apache.hadoop.hive.ql.session.SessionState.isAuthorizationModeV2(SessionState.java:1515)
        at org.apache.hadoop.hive.ql.Driver.doAuthorization(Driver.java:566)
        at org.apache.hadoop.hive.ql.Driver.compile(Driver.java:468)
        at org.apache.hadoop.hive.ql.Driver.compile(Driver.java:308)
        at org.apache.hadoop.hive.ql.Driver.compileInternal(Driver.java:1122)
        at org.apache.hadoop.hive.ql.Driver.runInternal(Driver.java:1170)
        at org.apache.hadoop.hive.ql.Driver.run(Driver.java:1059)
        at org.apache.hadoop.hive.ql.Driver.run(Driver.java:1049)
        at org.apache.spark.sql.hive.client.ClientWrapper$$anonfun$runHive$1.apply(ClientWrapper.scala:484)
        at org.apache.spark.sql.hive.client.ClientWrapper$$anonfun$runHive$1.apply(ClientWrapper.scala:473)
        at org.apache.spark.sql.hive.client.ClientWrapper$$anonfun$withHiveState$1.appl
    ...

Any help would be appreciated.

Regards,
Julien
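PS: for reference, a minimal sketch of the Scala side that works for me (assuming a spark-shell built with Hive support, hive-site.xml plus the Ranger configuration files in Spark's conf directory, and the Ranger jars on the classpath):

    // Scala, in spark-shell; sc is the SparkContext the shell provides.
    import org.apache.spark.sql.hive.HiveContext

    val hiveContext = new HiveContext(sc)
    // Goes through the Hive code path, so the Ranger rules are enforced:
    hiveContext.sql("show databases").collect().foreach(println)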
Issues with partitionBy: FetchFailed
Hello,

I am facing an issue with partitionBy; it is not clear whether it is a problem with my code or with my Spark setup. I am using Spark 1.1, standalone, and my other Spark projects work fine.

I have to repartition a relatively large file (about 70 million lines). Here is a minimal version of what is not working:

    val myRDD = sc.textFile(...).map { line => (extractKey(line), line) }
    val myRepartitionedRDD = myRDD.partitionBy(new HashPartitioner(100))
    myRepartitionedRDD.saveAsTextFile(...)

It runs for quite some time, until I get some errors and it retries. The errors are:

    FetchFailed(BlockManagerId(3, myWorker2, 52082, 0), shuffleId=1, mapId=1, reduceId=5)

The logs are not much more informative. I get:

    java.io.IOException: sendMessageReliably failed because ack was not received within 60 sec

I get similar errors with all my workers. Do you have some kind of explanation for this behaviour? What could be wrong?

Thanks,
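PS: here is a fully self-contained version of the snippet above, in case it helps reproduce (the paths and extractKey are placeholders for my real ones):

    import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
    import org.apache.spark.SparkContext._  // pair-RDD functions in Spark 1.x

    object RepartitionJob {
      // Placeholder for my real key extraction; assumes tab-separated input.
      def extractKey(line: String): String = line.split('\t').head

      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("repartition"))
        val myRDD = sc.textFile("hdfs:///path/to/input") // placeholder path
          .map(line => (extractKey(line), line))
        // Shuffle into 100 partitions, co-locating identical keys:
        val myRepartitionedRDD = myRDD.partitionBy(new HashPartitioner(100))
        myRepartitionedRDD.saveAsTextFile("hdfs:///path/to/output") // placeholder path
        sc.stop()
      }
    }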
Re: Saving RDD with array of strings
Just use flatMap, it does exactly what you need:

    newLines.flatMap { lines => lines }.saveAsTextFile(...)

2014-09-21 11:26 GMT+02:00 Sarath Chandra sarathchandra.jos...@algofusiontech.com:

Hi All,

If my RDD contains an array/sequence of strings, how can I save it as an HDFS file with each string on a separate line? For example, if I write code as below, the output should get saved as an HDFS file having one string per line:

    ...
    var newLines = lines.map(line => myfunc(line));
    newLines.saveAsTextFile(hdfsPath);
    ...

    def myfunc(line: String): Array[String] = {
      line.split(";");
    }

Thanks,
~Sarath.
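For completeness, a self-contained version of the flatMap suggestion (paths are placeholders); note that the map followed by flatMap over identity can also collapse into a single flatMap(myfunc):

    import org.apache.spark.{SparkConf, SparkContext}

    object SaveSplitLines {
      def myfunc(line: String): Array[String] = line.split(";")

      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("save-split-lines"))
        val lines = sc.textFile("hdfs:///path/to/input") // placeholder path
        // flatMap both applies myfunc and flattens the resulting arrays,
        // so each string ends up on its own line in the output files.
        lines.flatMap(line => myfunc(line))
          .saveAsTextFile("hdfs:///path/to/output") // placeholder path
        sc.stop()
      }
    }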
Strange exception while accessing hdfs from spark.
Hello,

I have been using Spark for quite some time, and I now get this error (stderr output below) when accessing HDFS. It seems to come from Hadoop; however, I can access HDFS from the command line without any problem. The WARN on the first lines seems to be key, because it never appeared previously. My HADOOP_CONF_DIR is correctly set, so I don't know why libhadoop cannot be loaded.

Please note that this java.net.UnknownHostException: crm is rather strange, because crm is not a host: it is the name of my Hadoop cluster (the HDFS nameservice), as specified by the dfs.nameservices (hdfs-site.xml) and fs.defaultFS (core-site.xml) Hadoop properties.

Thanks for your help.

    14/09/17 22:33:49 WARN BlockReaderLocal: The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
    14/09/17 22:33:49 WARN BlockReaderLocal: The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
    14/09/17 22:34:01 WARN TaskSetManager: Lost TID 20 (task 1.0:20)
    14/09/17 22:34:01 WARN TaskSetManager: Loss was due to java.lang.IllegalArgumentException
    java.lang.IllegalArgumentException: java.net.UnknownHostException: crm
        at org.apache.hadoop.security.SecurityUtil.buildTokenService(SecurityUtil.java:418)
        at org.apache.hadoop.hdfs.NameNodeProxies.createNonHAProxy(NameNodeProxies.java:231)
        at org.apache.hadoop.hdfs.NameNodeProxies.createProxy(NameNodeProxies.java:139)
        at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:510)
        at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:453)
        at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:136)
        at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2433)
        at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:88)
        at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2467)
        at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2449)
        at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:367)
        at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:166)
        at org.apache.hadoop.mapred.JobConf.getWorkingDirectory(JobConf.java:653)
        at org.apache.hadoop.mapred.FileInputFormat.setInputPaths(FileInputFormat.java:389)
        at org.apache.hadoop.mapred.FileInputFormat.setInputPaths(FileInputFormat.java:362)
        at org.apache.spark.SparkContext$$anonfun$22.apply(SparkContext.scala:546)
        at org.apache.spark.SparkContext$$anonfun$22.apply(SparkContext.scala:546)
        at org.apache.spark.rdd.HadoopRDD$$anonfun$getJobConf$1.apply(HadoopRDD.scala
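PS: in case it is relevant, here is the quick diagnostic I can run from spark-shell to see which Hadoop configuration the driver actually picks up (just a sketch; my assumption is that if these come back null or without crm, the cluster's core-site.xml / hdfs-site.xml are not visible to Spark):

    // Print the HDFS settings the Spark driver sees; "crm" should
    // appear here if the HA nameservice configuration is on the classpath.
    val hadoopConf = sc.hadoopConfiguration
    println(hadoopConf.get("fs.defaultFS"))
    println(hadoopConf.get("dfs.nameservices"))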
ReduceByKey performance optimisation
Hello,

I am facing performance issues with reduceByKey. I know that this topic has already been covered, but I did not really find answers to my question.

I am using reduceByKey to remove entries with identical keys, using (a, b) => a as the reduce function. It seems to be a relatively straightforward use of reduceByKey, but performance on moderately big RDDs (some tens of millions of lines) is very low, far from what you can reach with single-server computing packages like R, for example.

I have read in other threads on the topic that reduceByKey always shuffles the whole data set. Is that true? That would mean that custom partitioning could not help, right?

In my case, I could relatively easily guarantee that two identical keys are always in the same partition, so an option could be to use mapPartitions and reimplement the reduce locally, but I would like to know if there are simpler / more elegant alternatives.

Thanks for your help,
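PS: for concreteness, a minimal self-contained version of the pattern I am describing (toy data; the key extraction is a placeholder for my real one):

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.SparkContext._  // pair-RDD functions in Spark 1.x

    object DedupByKey {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("dedup-by-key"))
        val lines = sc.parallelize(Seq("k1;x", "k1;y", "k2;z")) // toy data
        val deduped = lines
          .map(line => (line.split(';').head, line)) // placeholder key extraction
          .reduceByKey((a, b) => a) // keep an arbitrary representative per key
          .values
        deduped.collect().foreach(println)
        sc.stop()
      }
    }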
Re: ReduceByKey performance optimisation
I need to remove objects with duplicate keys, but I need the whole object. Objects which have the same key are not necessarily equal, though (but I can drop any of the ones that have an identical key).

2014-09-13 12:50 GMT+02:00 Sean Owen so...@cloudera.com:

If you are just looking for distinct keys, .keys.distinct() should be much better.

On Sat, Sep 13, 2014 at 10:46 AM, Julien Carme julien.ca...@gmail.com wrote:
...
Re: ReduceByKey performance optimisation
OK, mapPartitions seems to be the way to go. Thanks for the help!

On 13 Sep 2014 16:41, Sean Owen so...@cloudera.com wrote:

This is more concise:

    x.groupBy(_.fieldtobekey).values.map(_.head)

... but I doubt it's faster.

If all objects with the same fieldtobekey are within the same partition, then yes, I imagine your biggest speedup comes from exploiting that. How about:

    x.mapPartitions(_.map(obj => (obj.fieldtobekey, obj)).toMap.valuesIterator)

This does require that all keys, plus a representative object each, fit in memory. I bet you can make it faster than this example too.

On Sat, Sep 13, 2014 at 1:15 PM, Gary Malouf malouf.g...@gmail.com wrote:

You need something like:

    val x: RDD[MyAwesomeObject]
    x.map(obj => obj.fieldtobekey -> obj).reduceByKey { case (l, _) => l }

Does that make sense?

On Sat, Sep 13, 2014 at 7:28 AM, Julien Carme julien.ca...@gmail.com wrote:
...
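For reference, a self-contained sketch of the mapPartitions approach from this thread (the Record type and its fieldtobekey are hypothetical stand-ins; it assumes, as in the setting above, that objects with the same key are co-located in one partition and that one representative per key fits in memory per partition):

    import org.apache.spark.{SparkConf, SparkContext}

    case class Record(fieldtobekey: String, payload: String) // hypothetical type

    object PartitionLocalDedup {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("partition-dedup"))
        // A single partition here, so duplicates are co-located as assumed.
        val x = sc.parallelize(Seq(
          Record("a", "first"), Record("a", "second"), Record("b", "only")
        ), numSlices = 1)
        // Deduplicate within each partition, without any shuffle:
        // toMap keeps one object per key, valuesIterator returns the
        // Iterator that mapPartitions expects.
        val deduped = x.mapPartitions(
          _.map(obj => (obj.fieldtobekey, obj)).toMap.valuesIterator)
        deduped.collect().foreach(println)
        sc.stop()
      }
    }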