Re: how to use cluster sparkSession like localSession
Hi,

I think what you need is a long-running Spark cluster to which you can submit jobs dynamically.

For SQL, you can start Spark's HiveServer2: https://spark.apache.org/docs/latest/sql-programming-guide.html#distributed-sql-engine
This starts a long-running Spark cluster with a fixed configuration (executors, cores etc.) and allows Spark to act more like a regular database. You can then create jdbc:hive2:// JDBC connections from your app and run SQL queries/DDL (a sketch appears below, after the quoted thread).

For other components (or even SQL), you can start a Spark jobserver: https://github.com/spark-jobserver/spark-jobserver
This again starts a long-running Spark cluster. It also allows you to create new SparkContexts on the fly, though that should not be done from a web app; rather, it should be configured separately by an admin if required. It requires you to implement your job as a SparkJob/SparkSessionJob that will be handed a pre-created SparkContext/SparkSession, and these take parameters that can be read dynamically in your implementation (a second sketch appears below). You register your classes in jars separately beforehand. Then you can call those methods from your application using the REST API, passing the required parameters, like a remote procedure call.

Or you can try SnappyData, which provides both of these (and much more) out of the box.

Regards,
Sumedh Wale
SnappyData (http://www.snappydata.io)

On 02/11/18 11:22, 崔苗 (Data & AI Product Development Dept.) wrote:

> then how about Spark SQL and Spark MLlib? we use them most of the time
>
> 0049003208 0049003...@znv.com

On 11/2/2018 11:58, Daniel de Oliveira Mantovani wrote:

> Please read about Spark Streaming or Spark Structured Streaming. Your web application can easily communicate through some API and you won't have the overhead of starting a new Spark job, which is pretty heavy.

On Thu, Nov 1, 2018 at 23:01 崔苗 (Data & AI Product Development Dept.) <0049003...@znv.com> wrote:

> Hi,
> we want to execute Spark code without submitting an application jar, like this code:
>
>     public static void main(String args[]) throws Exception {
>         SparkSession spark = SparkSession
>             .builder()
>             .master("local[*]")
>             .appName("spark test")
>             .getOrCreate();
>         Dataset<Row> testData =
>             spark.read().csv(".\\src\\main\\java\\Resources\\no_schema_iris.csv");
>         testData.printSchema();
>         testData.show();
>     }
>
> the above code works well in the IDE and needs no jar file to be generated and submitted, but if we replace master("local[*]") with master("yarn") it doesn't work. So is there a way to use a cluster SparkSession like a local SparkSession? We need to dynamically execute Spark code in the web server according to the different requests; for example a filter request will call dataset.filter(), so there i...
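For illustration, here is a minimal sketch of the HiveServer2 JDBC route described above (the host, credentials and query are placeholder assumptions; 10000 is the default HiveServer2 port):

    import java.sql.DriverManager

    // connect to the long-running Thrift server started via sbin/start-thriftserver.sh
    val conn = DriverManager.getConnection(
      "jdbc:hive2://thrift-server-host:10000", "user", "password")
    try {
      val stmt = conn.createStatement()
      // any SQL/DDL supported by Spark SQL can be sent over this connection
      val rs = stmt.executeQuery("SELECT name, age FROM people WHERE age > 21")
      while (rs.next()) {
        println(s"${rs.getString(1)}, ${rs.getInt(2)}")
      }
    } finally {
      conn.close()
    }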
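And a rough sketch of the jobserver programming model (the trait names and signatures below follow the classic spark-jobserver SparkJob API; exact signatures vary across jobserver versions, so treat this as an assumption and check its github page):

    import com.typesafe.config.Config
    import org.apache.spark.SparkContext
    import spark.jobserver.{SparkJob, SparkJobValid, SparkJobInvalid, SparkJobValidation}

    object FilterJob extends SparkJob {
      // validate the dynamic parameters sent with the REST call
      override def validate(sc: SparkContext, config: Config): SparkJobValidation =
        if (config.hasPath("input.path")) SparkJobValid
        else SparkJobInvalid("missing parameter: input.path")

      // runJob receives the pre-created, long-lived SparkContext;
      // parameters arrive in the Config from the REST request
      override def runJob(sc: SparkContext, config: Config): Any =
        sc.textFile(config.getString("input.path")).count()
    }

The jar containing FilterJob is registered once with the jobserver; each subsequent HTTP POST to its /jobs endpoint then runs it with fresh parameters.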
Re: Is there a way to run Spark SQL through REST?
Yes. Using the new Spark Structured Streaming you can keep submitting streaming jobs against the same SparkContext in different requests (or you can create a new SparkContext in a request, if required). The SparkJob implementation will get a handle to the SparkContext, which will be the existing one or a new one depending on the REST API calls -- see the jobserver's github page for details on transient vs. persistent SparkContexts. A sketch of running multiple structured streaming queries on one context follows this message.

With the old Spark Streaming model you cannot add new DStreams once the StreamingContext has started (a long-standing limitation of the old streaming model), so you can submit against the same context, but only until one last job starts the StreamingContext.

regards
sumedh

On Monday 24 July 2017 06:09 AM, kant kodali wrote:

> @Sumedh Can I run streaming jobs on the same context with spark-jobserver? So there is no waiting for results, since the Spark SQL job is expected to stream forever and the results of each streaming job are captured through a message queue. In my case each Spark SQL query will be a streaming job.

On Sat, Jul 22, 2017 at 6:19 AM, Sumedh Wale <sw...@snappydata.io> wrote:

> On Saturday 22 July 2017 01:31 PM, kant kodali wrote:
>> Is there a way to run Spark SQL through REST?
>
> There is spark-jobserver (https://github.com/spark-jobserver/spark-jobserver). It does more than just a REST API (like long-running SparkContexts).
>
> regards
> --
> Sumedh Wale
> SnappyData (http://www.snappydata.io)
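To illustrate the structured streaming point, a minimal sketch (the socket source and console sink are placeholder choices): several independent streaming queries can be started on the same SparkSession at any time, unlike DStreams on a started StreamingContext.

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.{col, explode, split}

    val spark = SparkSession.builder().appName("streaming-jobs").getOrCreate()

    // placeholder socket source; any streaming source behaves the same way
    val lines = spark.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", 9999)
      .load()

    // first streaming query
    val q1 = lines.writeStream.format("console").start()

    // a second, independent query started later on the same session
    val words = lines.select(explode(split(col("value"), " ")).as("word"))
    val q2 = words.writeStream.format("console").start()

    spark.streams.awaitAnyTermination()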
Re: custom joins on dataframe
The Dataset.join(right: Dataset[_], joinExprs: Column) API can use an arbitrary expression, so you can use a UDF for the join (a sketch follows this message). The problem with all non-equality joins is that they use BroadcastNestedLoopJoin or equivalent, i.e. an (M x N) nested loop, which is unusable for medium/large tables. At least one of the tables should be small for this to work with acceptable performance. For example, if one table has 100M rows after the filter and the other 1M rows, then the NLJ will scan 100 trillion row pairs, which will take very long under normal circumstances; but if one of the sides is much smaller after the filter, say a few thousand rows, then it can be fine.

What you probably need for large tables is to implement your own optimized join operator and use some join structure that can do the join efficiently without nested loops (i.e. some fancy structure for efficient fuzzy joins). It's possible to do that using internal Spark APIs, but it's not easy and you have to implement an efficient join structure first. Or perhaps some existing libraries out there could work for you (like https://github.com/soundcloud/cosine-lsh-join-spark?).

--
Sumedh Wale
SnappyData (http://www.snappydata.io)

On Saturday 22 July 2017 09:09 PM, Stephen Fletcher wrote:

> Normally a family of joins (left, right outer, inner) is performed on two dataframes using columns for the comparison, i.e. left("acol") === right("acol"). The comparison operator of the "left" dataframe does something internally and produces a column that I assume is used by the join. What I want is to create my own comparison operation (I have a case where I want to use some fuzzy matching between rows, and if they fall within some threshold we allow the join to happen), so it would look something like:
>
>     left.join(right, my_fuzzy_udf(left("cola"), right("cola")))
>
> where my_fuzzy_udf is my defined UDF. My main concern is the column that would have to be output: what would its value be, i.e. what would the function need to return that the UDF subsystem would then turn into a column to be evaluated by the join?
>
> Thanks in advance for any advice
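A minimal sketch of such a UDF-based join condition (the data and the crude prefix-based similarity test are illustrative assumptions; any Boolean-returning UDF works the same way):

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.udf

    val spark = SparkSession.builder().appName("fuzzy-join").getOrCreate()
    import spark.implicits._

    val left = Seq("alpha", "alphonse", "beta").toDF("cola")
    val right = Seq("alphabet", "betamax").toDF("cola")

    // the UDF only needs to return Boolean: the join emits a row pair
    // whenever the join expression evaluates to true
    val myFuzzyUdf = udf { (a: String, b: String) =>
      a != null && b != null && a.take(3) == b.take(3) // placeholder fuzzy match
    }

    // this plans as a BroadcastNestedLoopJoin/cartesian variant,
    // so keep at least one side small (as discussed above)
    val joined = left.join(right, myFuzzyUdf(left("cola"), right("cola")))
    joined.show()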
Re: about aggregateByKey of pairrdd.
On Wednesday 19 July 2017 06:20 PM, qihuagao wrote:

> The Java pair RDD has aggregateByKey, which can avoid a full shuffle and so has impressive performance. The aggregateByKey function requires 3 parameters:
>
> # An initial 'zero' value that will not affect the total values to be collected.
> # A combining function accepting two parameters; the second parameter is merged into the first. This function combines/merges values within a partition.
> # A merging function accepting two parameters; here the parameters are merged into one. This step merges values across partitions.
>
> For DataFrames I noticed groupByKey, which could do the same as aggregateByKey but without the merge functions, so I assumed it would trigger a full shuffle. Is this true?

No. For built-in aggregates (like avg, sum, ...) it will already do the partition-wise partial aggregates and then shuffle only the partial results to merge. Usually it should give better performance than the corresponding RDD APIs, due to code generation and the like. Only Hive user-defined aggregate functions do not support partial aggregation (SPARK-10992). For reference, see the comments: https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala#L151

> If true, should we have a function with performance like aggregateByKey for DataFrames? Thanks.

regards
--
Sumedh Wale
SnappyData (http://www.snappydata.io)
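A small sketch contrasting the two paths (the data and names are illustrative; an existing SparkContext/SparkSession is assumed):

    import org.apache.spark.sql.functions.sum

    // RDD: aggregateByKey combines within each partition first, then
    // merges the per-partition results across the shuffle
    val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
    val totals = pairs.aggregateByKey(0)(
      (acc, v) => acc + v, // combine within a partition
      (a, b) => a + b)     // merge across partitions

    // DataFrame: a built-in aggregate plans a partial aggregate before the
    // shuffle and a final aggregate after it, so only partial sums move
    import spark.implicits._
    val df = pairs.toDF("key", "value")
    val totalsDF = df.groupBy("key").agg(sum("value"))
    totalsDF.show()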
Re: reduceByKey as Action or Transformation
On Monday 25 April 2016 11:28 PM, Weiping Qu wrote:

> Dear Ted,
> You are right. reduceByKey is a transformation. My fault. I would rephrase my question using the following code snippet.
>
>     import org.apache.spark.{SparkConf, SparkContext}
>
>     object ScalaApp {
>       def main(args: Array[String]): Unit = {
>         val conf = new SparkConf().setAppName("ScalaApp").setMaster("local")
>         val sc = new SparkContext(conf)
>         val file = sc.textFile("/home/usr/test.dat")
>         val output = file.flatMap(line => line.split(" "))
>           .map(word => (word, 1))
>           .reduceByKey(_ + _)
>         output.persist()
>         output.count()
>         output.collect()
>       }
>     }
>
> It's a simple code snippet. I realize that the first action, count(), would trigger the execution based on the HadoopRDD and MapPartitionsRDD, and the count takes the ShuffledRDD produced by reduceByKey as input. The count() will trigger both the execution and the persistence of the output RDD (as each partition is iterated). The second action, collect, just performs the collect over the same ShuffledRDD; it will use the persisted ShuffledRDD blocks. I think the re-calculation will also be carried out over the ShuffledRDD, instead of re-executing the preceding HadoopRDD and MapPartitionsRDD, in case one partition of the persisted output is missing. Am I right?

No. Since it is a partition of the persisted ShuffledRDD that is missing, that partition will have to be recreated from the base HadoopRDD. To avoid this, one can checkpoint the ShuffledRDD if required (a sketch follows this message).

> Thanks and Regards,
> Weiping

regards
--
Sumedh Wale
SnappyData (http://www.snappydata.io)
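A short sketch of that checkpointing suggestion (the checkpoint directory is a placeholder; it reuses the word-count pipeline from the question):

    // checkpoint the shuffled RDD so that a lost persisted partition is
    // restored from the checkpoint files instead of being recomputed
    // all the way from the base HadoopRDD
    sc.setCheckpointDir("/tmp/spark-checkpoints") // placeholder path

    val output = sc.textFile("/home/usr/test.dat")
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)

    output.persist()
    output.checkpoint() // must be called before the first action on this RDD
    output.count()      // materializes the cache and writes the checkpoint
    output.collect()    // served from cache; falls back to the checkpoint if lost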
Re: SSL support for Spark Thrift Server
On Saturday 05 March 2016 02:46 AM, Sourav Mazumder wrote:

> Hi All,
> While starting the Spark Thrift Server I don't see any option to start it with SSL support. Is that support currently there?
> Regards,
> Sourav

It uses HiveServer2, so the SSL settings in hive-site.xml should work: https://cwiki.apache.org/confluence/display/Hive/Setting+Up+HiveServer2#SettingUpHiveServer2-SSLEncryption

thanks
--
Sumedh Wale
SnappyData (http://www.snappydata.io)
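For illustration, a hive-site.xml fragment along the lines of that page (the keystore path and password are placeholders; verify the property names against your Hive version):

    <property>
      <name>hive.server2.use.SSL</name>
      <value>true</value>
    </property>
    <property>
      <name>hive.server2.keystore.path</name>
      <value>/path/to/keystore.jks</value>
    </property>
    <property>
      <name>hive.server2.keystore.password</name>
      <value>keystore-password</value>
    </property>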
Re: Best way to merge files from streaming jobs
On Saturday 05 March 2016 02:39 AM, Jelez Raditchkov wrote:

> My streaming job is creating files on S3. The problem is that those files end up very small if I just write them to S3 directly. This is why I use coalesce() to reduce the number of files and make them larger.

RDD.coalesce, right? It accepts whether or not to shuffle as an argument. If you are reducing the number of partitions, it should not cause a shuffle:

    dstream.foreachRDD { rdd =>
      val numParts = rdd.partitions.length
      // halve the partitions without a shuffle, then write the result
      val coalesced = rdd.coalesce(numParts / 2, shuffle = false)
      // ... write coalesced to S3 ...
    }

Another option can be to combine multiple RDDs. Set an appropriate remember duration (StreamingContext.remember), store the RDDs in a fixed-size list/array, and then process all the cached RDDs in one go periodically when the list is full (combining with RDD.zipPartitions). You may have to keep the remember duration somewhat larger than the duration corresponding to the list size, to account for processing time. (A sketch of this idea follows this message.)

> However, coalesce shuffles data and my job processing time ends up higher than sparkBatchIntervalMilliseconds. I have observed that if I coalesce the number of partitions to be equal to the cores in the cluster I get less shuffling -- but that is unsubstantiated. Is there any dependency/rule between number of executors, number of cores etc. that I can use to minimize shuffling and at the same time achieve a minimum number of output files per batch? What is the best practice?

I think most DStreams (Kafka streams can be exceptions) will create the same number of partitions as the total number of executor cores (spark.default.parallelism). Perhaps that is why you are seeing the above behaviour. It looks like the shuffle should be avoidable in your case, but if using coalesce it will likely not use the full processing power.

thanks
--
Sumedh Wale
SnappyData (http://www.snappydata.io)
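A rough sketch of the combine-multiple-RDDs idea (SparkContext.union is used instead of RDD.zipPartitions for brevity; the batch count, partition count, S3 path, and the existing ssc/dstream are placeholder assumptions):

    import scala.collection.mutable.ArrayBuffer
    import org.apache.spark.rdd.RDD
    import org.apache.spark.streaming.Minutes

    // keep RDDs around longer than the combining window so the cached
    // references stay valid across batches
    ssc.remember(Minutes(10))

    val buffer = ArrayBuffer[RDD[String]]()
    val batchesToCombine = 6

    dstream.foreachRDD { rdd =>
      buffer += rdd
      if (buffer.size >= batchesToCombine) {
        // merge several small micro-batches into one larger write
        val combined = rdd.sparkContext.union(buffer)
        combined.coalesce(4, shuffle = false)
          .saveAsTextFile(s"s3a://bucket/out-${System.currentTimeMillis()}")
        buffer.clear()
      }
    }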
Re: Spark sql query taking long time
On Thursday 03 March 2016 09:15 PM, Gourav Sengupta wrote:

> Hi,
> why not read the table into a dataframe directly using the Spark CSV package? You are trying to solve the problem the roundabout way.
> Regards,
> Gourav Sengupta

Yes, that will simplify and avoid the explicit split/map a bit (though the code below is simple enough as is -- see the spark-csv sketch after this message). However, the basic performance problem is not due to that. Note that a DataFrame, whether using the spark-csv package or otherwise, is just an access point into the underlying database.txt file, so multiple scans of the DataFrame as in the code below lead to multiple tokenization/parse passes over the database.txt file, which is quite expensive. The join approach reduces this to a single scan for the case below, which should definitely be done if possible; but if more queries need to be executed on the DataFrame, then saving it to parquet/orc (or cacheTable if possible) is faster in my experience.

thanks
--
Sumedh Wale
SnappyData (http://www.snappydata.io)
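For reference, a sketch of the spark-csv route Gourav suggests (using the Databricks spark-csv package coordinates of the Spark 1.x era; the options shown are assumptions about the file layout):

    // replaces the manual textFile + split(",") + case-class mapping
    val dfCustomers1 = sqlContext.read
      .format("com.databricks.spark.csv")
      .option("header", "false")
      .option("inferSchema", "true")
      .load("/root/Desktop/database.txt")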
Re: Spark sql query taking long time
On Thursday 03 March 2016 11:03 AM, Angel Angel wrote:

> Hello Sir/Madam,
> I am writing one application using Spark SQL. I made a very big table using the following command:
>
>     val dfCustomers1 = sc.textFile("/root/Desktop/database.txt")
>       .map(_.split(","))
>       .map(p => Customer1(p(0), p(1).trim.toInt, p(2).trim.toInt, p(3)))
>       .toDF()
>
> Now I want to search the address (many addresses) fields in the table and then extend the new table as per the search:
>
>     var k = dfCustomers1.filter(dfCustomers1("Address").equalTo(lines(0)))
>     for (a <- 1 until 1500) {
>       var temp = dfCustomers1.filter(dfCustomers1("Address").equalTo(lines(a)))
>       k = temp.unionAll(k)
>     }
>     k.show

For the above case, one approach that can help a lot is to convert the lines to a table and then do a join on it instead of individual searches. Something like:

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.types.{StringType, StructField, StructType}

    // the number of lines is small, so 1 partition should be fine
    val linesRDD = sc.parallelize(lines, 1)
    val schema = StructType(Array(StructField("Address", StringType)))
    val linesDF = sqlContext.createDataFrame(linesRDD.map(Row(_)), schema)
    val result = dfCustomers1.join(linesDF, "Address")

If you do need to scan the DataFrame multiple times, then this will end up scanning the csv file, formatting etc. in every loop. I would suggest caching it in memory, or saving it to parquet/orc format for faster access. If there is enough memory then the SQLContext.cacheTable API can be used (a sketch follows this message); otherwise save to a parquet file:

    dfCustomers1.write.parquet("database.parquet")
    val dfCustomers2 = sqlContext.read.parquet("database.parquet")

Normally parquet file scanning should be much faster than CSV scan+format, so use dfCustomers2 everywhere. You can also try various values of "spark.sql.parquet.compression.codec" (lzo, snappy, uncompressed) instead of the default gzip, and see if this reduces the runtime. Fastest will be if there is enough memory for sqlContext.cacheTable, but I doubt that will be possible since you say it is a big table.

> But this is taking a very long time. So can you suggest any optimized way, so I can reduce the execution time?
> My cluster has 3 slaves and 1 master.
> Thanks.

thanks
--
Sumedh Wale
SnappyData (http://www.snappydata.io)
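A short sketch of the cacheTable alternative (the temp-table name is a placeholder; the API shown is the Spark 1.x SQLContext style used in this thread):

    // register the DataFrame and pin it in memory as a columnar cache
    dfCustomers1.registerTempTable("customers")
    sqlContext.cacheTable("customers")

    // the first action materializes the cache; later scans read from memory
    val result = dfCustomers1.join(linesDF, "Address")
    result.count()

    // release the memory when done
    sqlContext.uncacheTable("customers")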
Re: SFTP Compressed CSV into Dataframe
On Thursday 03 March 2016 12:47 AM, Benjamin Kim wrote:

> I wonder if anyone has opened an SFTP connection to open a remote gzipped CSV file? I am able to download the file locally first using the SFTP client in the spark-sftp package. Then I load the file into a dataframe using the spark-csv package, which automatically decompresses the file. I just want to remove the "download file to local" step and directly have the remote file decompressed, read, and loaded. Can someone give me any hints?
> Thanks,
> Ben

One easy way on Linux, of course, is to use sshfs (https://github.com/libfuse/sshfs) and mount the remote directory locally. Since this uses FUSE, it works fine with normal user privileges.

Thanks
--
Sumedh Wale
SnappyData (http://www.snappydata.io)
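Putting the two steps together, a sketch (the host, mount point, file path and header option are placeholder assumptions):

    // mount once, outside Spark; FUSE means normal user privileges suffice:
    //   sshfs user@sftp-host:/remote/data /mnt/sftp
    //
    // then read the gzipped CSV straight off the mount -- spark-csv picks up
    // the .gz compression transparently, with no local download step
    val df = sqlContext.read
      .format("com.databricks.spark.csv")
      .option("header", "true")
      .load("file:///mnt/sftp/data.csv.gz")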
Re: Add Jars to Master/Worker classpath
On Wednesday 02 March 2016 09:39 PM, Matthias Niehoff wrote:

> no, not to driver and executor but to the master and worker instances of the spark standalone cluster

Why exactly does adding jars to driver/executor extraClassPath not work?

The classpath of master/worker is set up by AbstractCommandBuilder, which explicitly adds the following: jars named "datanucleus-*", plus the environment variables _SPARK_ASSEMBLY (for the assembly jar), SPARK_DIST_CLASSPATH, HADOOP_CONF_DIR and YARN_CONF_DIR. So you can set SPARK_DIST_CLASSPATH in conf/spark-env.sh to add the required jars (separated by the platform's File.pathSeparator), as in the fragment after this message.

thanks
--
Sumedh Wale
SnappyData (http://www.snappydata.io)

On 2 March 2016 at 17:05, Igor Berman <igor.ber...@gmail.com> wrote:

> spark.driver.extraClassPath
> spark.executor.extraClassPath

On 2016-03-02 at 18:01 GMT+02:00, Matthias Niehoff <matthias.nieh...@codecentric.de> wrote:

> Hi,
> we want to add jars to the master and worker classpath, mainly for logging reasons (we have a redis appender to send logs to redis -> logstash -> elasticsearch). While it works by setting SPARK_CLASSPATH, this solution is AFAIK deprecated and should not be used. Furthermore, we are also using --driver-java-options and spark.executor.extraClassPath, which leads to exceptions when running our apps in standalone cluster mode. So what is the best way to add jars to the master and worker classpath?
> Thank you
> --
> Matthias Niehoff | IT-Consultant | Agile Software Factory | Consulting
> codecentric AG | Zeppelinstr 2 | 76185 Karlsruhe | Deutschland
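For illustration, a conf/spark-env.sh fragment (the jar names and paths are placeholders):

    # conf/spark-env.sh on every master and worker node;
    # ':' is File.pathSeparator on Linux
    export SPARK_DIST_CLASSPATH="/opt/ext-jars/redis-log4j-appender.jar:/opt/ext-jars/jedis.jar"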