Re: how to use cluster sparkSession like localSession

2018-11-04 Thread Sumedh Wale

  
  
Hi,

I think what you need is to have a long running Spark cluster to
which you can submit jobs dynamically.

For SQL, you can start Spark's Thrift JDBC/ODBC server (HiveServer2):
https://spark.apache.org/docs/latest/sql-programming-guide.html#distributed-sql-engine
This starts a long-running Spark cluster with a fixed configuration
(executors, cores etc.) and allows Spark to act more like a regular
database. You can then open jdbc:hive2:// JDBC connections from your
app and run SQL queries/DDLs.
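For example, something like this works from any JVM app once the server
is up (host, port and table name below are placeholders; the Hive JDBC
driver needs to be on your classpath):

    import java.sql.DriverManager

    // start the server first, e.g.: ./sbin/start-thriftserver.sh --master yarn
    // it listens on port 10000 by default (hive.server2.thrift.port)
    val conn = DriverManager.getConnection("jdbc:hive2://thrift-host:10000", "user", "")
    val stmt = conn.createStatement()
    val rs = stmt.executeQuery("SELECT count(*) FROM some_table")
    while (rs.next()) {
      println(rs.getLong(1))
    }
    rs.close(); stmt.close(); conn.close()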

For other components (or even SQL), you can start a Spark jobserver:
https://github.com/spark-jobserver/spark-jobserver
This again starts a long-running Spark cluster. It also allows you to
create new SparkContexts on the fly, though that should not be done
from a web app; rather it should be configured separately by an admin
if required. You will need to implement your job as a
SparkJob/SparkSessionJob that is handed a pre-created
SparkContext/SparkSession, and these jobs take parameters that your
implementation can read dynamically. You register your classes in jars
separately beforehand. Then you can invoke those jobs through the REST
API from your application, passing the required parameters like a
remote procedure call.
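To give an idea, a job for the classic SparkJob API looks roughly like
the sketch below (WordCountJob and input.string are just illustrative
names, and the exact traits have changed across jobserver versions, so
check its README for the current API):

    import com.typesafe.config.Config
    import org.apache.spark.SparkContext
    import spark.jobserver.{SparkJob, SparkJobValid, SparkJobValidation}

    object WordCountJob extends SparkJob {
      // parameters arrive in the typesafe Config built from the REST request body
      override def validate(sc: SparkContext, config: Config): SparkJobValidation =
        SparkJobValid

      override def runJob(sc: SparkContext, config: Config): Any =
        sc.parallelize(config.getString("input.string").split(" ").toSeq)
          .countByValue()
    }

The jar containing such classes is uploaded once, and each subsequent
REST call runs the job with new parameters against the shared context.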

Or you can try SnappyData that provides both of these (and much more)
out of the box.

Regards,
Sumedh Wale
SnappyData (http://www.snappydata.io)

On 02/11/18 11:22, 崔苗 (Data & AI Product Development Dept.) wrote:

  then how about spark sql and spark MLlib, we use them most of the time

  On 11/2/2018 11:58, Daniel de Oliveira Mantovani wrote:

    Please, read about Spark Streaming or Spark Structured Streaming. Your
    web application can easily communicate through some API and you won't
    have the overhead of starting a new Spark job, which is pretty heavy.

    On Thu, Nov 1, 2018 at 23:01 崔苗 (Data & AI Product Development Dept.)
    <0049003...@znv.com> wrote:
  
  

  


 
      Hi,
      we want to execute spark code without submitting an application.jar,
      like this code:

      public static void main(String args[]) throws Exception {
          SparkSession spark = SparkSession
                  .builder()
                  .master("local[*]")
                  .appName("spark test")
                  .getOrCreate();

          Dataset<Row> testData =
              spark.read().csv(".\\src\\main\\java\\Resources\\no_schema_iris.scv");
          testData.printSchema();
          testData.show();
      }

      the above code can work well in IDEA and does not need to generate a
      jar file and submit, but if we replace master("local[*]") with
      master("yarn") it can't work. So is there a way to use a cluster
      SparkSession like a local SparkSession? We need to dynamically
      execute spark code in a web server according to the different
      request, such as a filter request will call dataset.filter(), so there i
Re: Is there a way to run Spark SQL through REST?

2017-07-24 Thread Sumedh Wale

  
  
Yes, using the new Spark structured streaming you can keep submitting
streaming jobs against the same SparkContext in different requests (or
you can create a new SparkContext in a request if required). The
SparkJob implementation will get a handle to the SparkContext, which
will be an existing one or a new one depending on the REST API calls --
see the project's github page for details on transient vs persistent
SparkContexts.
With the old Spark streaming model, you cannot add new DStreams once the
StreamingContext has started (a long-standing limitation of that model),
so you can submit against the same context only until one job starts the
StreamingContext.
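For instance, with structured streaming something like the sketch below
keeps one SparkSession alive and starts independent queries at different
times (the built-in "rate" source and the query names are only for
illustration):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().appName("multi-stream").master("local[*]").getOrCreate()

    // each call like this can come from a separate REST request
    // against the same long-running SparkSession/SparkContext
    def startQuery(name: String) =
      spark.readStream.format("rate").load()
        .writeStream
        .format("console")
        .queryName(name)
        .start()

    val q1 = startQuery("query1")
    val q2 = startQuery("query2")   // added later, no restart needed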

regards
sumedh

On Monday 24 July 2017 06:09 AM, kant kodali wrote:

  @Sumedh Can I run streaming jobs on the same context with
  spark-jobserver? so there is no waiting for results since the spark sql
  job is expected to stream forever and results of each streaming job are
  captured through a message queue.

  In my case each spark sql query will be a streaming job.

  On Sat, Jul 22, 2017 at 6:19 AM, Sumedh Wale <sw...@snappydata.io> wrote:

    On Saturday 22 July 2017 01:31 PM, kant kodali wrote:

      Is there a way to run Spark SQL through REST?

    There is spark-jobserver
    (https://github.com/spark-jobserver/spark-jobserver). It does more
    than just REST API (like long running SparkContext).

    regards

    --
    Sumedh Wale
    SnappyData (http://www.snappydata.io)

  


  


  





Re: custom joins on dataframe

2017-07-22 Thread Sumedh Wale
The Dataset.join(right: Dataset[_], joinExprs: Column) API accepts an
arbitrary expression, so you can use a UDF for the join.
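A rough sketch of how that looks (the DataFrames and the prefix check
inside the UDF are just placeholders for real fuzzy matching logic):

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.udf

    val spark = SparkSession.builder().appName("fuzzy-join").master("local[*]").getOrCreate()
    import spark.implicits._

    val left = Seq("alpha", "beta").toDF("cola")
    val right = Seq("alphonse", "gamma").toDF("cola")

    // placeholder similarity check; replace with real fuzzy matching within a threshold
    val myFuzzyUdf = udf { (a: String, b: String) =>
      a != null && b != null && a.take(3) == b.take(3)
    }

    // any boolean Column works as the join condition, including a UDF;
    // note this still executes as a nested-loop join as described below
    left.join(right, myFuzzyUdf(left("cola"), right("cola"))).show()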


The problem with all non-equality joins is that they use
BroadcastNestedLoopJoin or equivalent, i.e. an (M x N) nested loop,
which is unusable for medium/large tables. At least one of the tables
should be small for this to work with acceptable performance. For
example, if one table has 100M rows after filtering and the other 1M
rows, then the NLJ will scan 100 trillion row pairs, which will take
very long under normal circumstances; but if one of the sides is much
smaller after filtering, say a few thousand rows, then it can be fine.


What you probably need for large tables is to implement your own
optimized join operator and use some join structure that can do the
join efficiently without nested loops (i.e. some fancy structure for
efficient fuzzy joins). It's possible to do that using internal Spark
APIs, but it's not easy and you have to implement an efficient join
structure first. Or perhaps some existing library out there could work
for you (like https://github.com/soundcloud/cosine-lsh-join-spark?).


--
Sumedh Wale
SnappyData (http://www.snappydata.io)

On Saturday 22 July 2017 09:09 PM, Stephen Fletcher wrote:
Normally a family of joins (left, right outer, inner) are performed on
two dataframes using columns for the comparison, i.e. left("acol") ===
right("acol"). The comparison operator of the "left" dataframe does
something internally and produces a column that I assume is used by the
join.


What I want is to create my own comparison operation (I have a case
where I want to use some fuzzy matching between rows, and if they fall
within some threshold we allow the join to happen).

so it would look something like

left.join(right, my_fuzzy_udf(left("cola"), right("cola")))

Where my_fuzzy_udf is my defined UDF. My main concern is the column that
would have to be output: what would its value be, i.e. what would the
function need to return that the udf subsystem would then turn into a
column to be evaluated by the join?



Thanks in advance for any advice






Re: Is there a way to run Spark SQL through REST?

2017-07-22 Thread Sumedh Wale

On Saturday 22 July 2017 01:31 PM, kant kodali wrote:

Is there a way to run Spark SQL through REST?


There is spark-jobserver 
(https://github.com/spark-jobserver/spark-jobserver). It does more than 
just REST API (like long running SparkContext).


regards

--
Sumedh Wale
SnappyData (http://www.snappydata.io)





Re: about aggregateByKey of pairrdd.

2017-07-19 Thread Sumedh Wale

  
  

On Wednesday 19 July 2017 06:20 PM, qihuagao wrote:


  java pair rdd has aggregateByKey, which can avoid a full shuffle, so it
  has impressive performance. The aggregateByKey function requires 3
  parameters:
  # An initial 'zero' value that will not affect the total values to be
  collected.
  # A combining function accepting two parameters. The second parameter
  is merged into the first parameter. This function combines/merges
  values within a partition.
  # A merging function accepting two parameters. In this case the
  parameters are merged into one. This step merges values across
  partitions.

  With DataFrame, I noticed groupByKey, which could do the same job as
  aggregateByKey but without the merge functions, so I assumed it should
  trigger a shuffle operation. Is this true?


No; for inbuilt aggregates (like avg, sum, ...) it will already do the
partition-wise partial aggregation and then shuffle only the partial
results to merge them. Usually it should give better performance than
the corresponding RDD APIs due to code generation and all. Only Hive
user-defined aggregate functions do not support partial aggregation
(SPARK-10992).
For reference see the comments:
https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala#L151
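For example, both of the following do a per-partition partial
aggregation followed by a merge after the shuffle (column and key names
below are made up):

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.sum

    val spark = SparkSession.builder().appName("partial-agg").master("local[*]").getOrCreate()
    import spark.implicits._

    // DataFrame version: the plan shows a partial HashAggregate per partition
    // and a final HashAggregate after the exchange
    val df = Seq(("a", 1), ("a", 2), ("b", 3)).toDF("k", "v")
    df.groupBy("k").agg(sum("v")).explain()

    // RDD version with explicit partial (seqOp) and merge (combOp) functions
    val rdd = spark.sparkContext.parallelize(Seq(("a", 1), ("a", 2), ("b", 3)))
    rdd.aggregateByKey(0)(_ + _, _ + _).collect()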


  If true, should we have a function for DataFrame with performance like
  aggregateByKey?

  Thanks.



regards
--
Sumedh Wale
SnappyData (http://www.snappydata.io)
  





Re: reduceByKey as Action or Transformation

2016-04-25 Thread Sumedh Wale

  
  
On Monday 25 April 2016 11:28 PM, Weiping Qu wrote:


  
  Dear Ted,

  You are right. ReduceByKey is a transformation. My fault.
  I would rephrase my question using the following code snippet:

  object ScalaApp {
    def main(args: Array[String]): Unit = {
      val conf = new SparkConf().setAppName("ScalaApp").setMaster("local")
      val sc = new SparkContext(conf)
      val file = sc.textFile("/home/usr/test.dat")
      val output = file.flatMap(line => line.split(" "))
        .map(word => (word, 1))
        .reduceByKey(_ + _)

      output.persist()
      output.count()
      output.collect()
    }
  }

  It's a simple code snippet.
  I realize that the first action count() would trigger the execution
  based on HadoopRDD and MapPartitionsRDD, and the reduceByKey will take
  the ShuffleRDD as input to perform the count.


The count() will trigger both the execution and the persistence of the
output RDD (as each partition is iterated).

  The second action collect just performs the collect over the same
  ShuffleRDD.


It will use the persisted ShuffleRDD blocks.

  I think the re-calculation will also be carried out over the ShuffleRDD
  instead of re-executing the preceding HadoopRDD and MapPartitionsRDD in
  case one partition of the persisted output is missing.
  Am I right?


Since it is a partition of the persisted ShuffleRDD that is missing, that
partition will have to be recreated from the base HadoopRDD. To avoid
this, one can checkpoint the ShuffleRDD if required.
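For instance, a minimal sketch of checkpointing the shuffle output so
that a lost persisted partition does not force recomputation from the
source file (the checkpoint directory is a placeholder and should be
reliable storage like HDFS on a real cluster):

    sc.setCheckpointDir("/tmp/spark-checkpoints")

    val output = sc.textFile("/home/usr/test.dat")
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)

    output.persist()
    output.checkpoint()   // must be called before the first action
    output.count()        // triggers execution, persistence and the checkpoint
    output.collect()      // served from persisted/checkpointed data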

 
  Thanks and Regards,
  Weiping
  
  
  
  


regards
-- 
Sumedh Wale
SnappyData (http://www.snappydata.io)
  





Re: SSL support for Spark Thrift Server

2016-03-08 Thread Sumedh Wale

On Saturday 05 March 2016 02:46 AM, Sourav Mazumder wrote:

Hi All,

While starting the Spark Thrift Server I don't see any option to start 
it with SSL support.


Is that support currently there ?



It uses HiveServer2 so the SSL settings in hive-site.xml should work: 
https://cwiki.apache.org/confluence/display/Hive/Setting+Up+HiveServer2#SettingUpHiveServer2-SSLEncryption
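The relevant properties there look something like this (paths and
password below are placeholders; see the linked page for the
authoritative list):

    <property>
      <name>hive.server2.use.SSL</name>
      <value>true</value>
    </property>
    <property>
      <name>hive.server2.keystore.path</name>
      <value>/path/to/keystore.jks</value>
    </property>
    <property>
      <name>hive.server2.keystore.password</name>
      <value>keystore-password</value>
    </property>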



Regards,
Sourav


thanks

--
Sumedh Wale
SnappyData (http://www.snappydata.io)





Re: Best way to merge files from streaming jobs

2016-03-08 Thread Sumedh Wale

  
  
On Saturday 05 March 2016 02:39 AM, Jelez Raditchkov wrote:

  My streaming job is creating files on S3.
  The problem is that those files end up very small if I just write them
  to S3 directly.
  This is why I use coalesce() to reduce the number of files and make
  them larger.


  


RDD.coalesce, right? It accepts whether or not to shuffle as an
argument. If you are reducing the number of partitions, it should not
cause a shuffle.

dstream.foreachRDD { rdd =>
  val numParts = rdd.partitions.length
  // halve the number of partitions; no shuffle since partitions are reduced
  val coalesced = rdd.coalesce(numParts / 2, shuffle = false)
  // coalesce is lazy: use `coalesced` here (e.g. write it out to S3)
}

Another option can be to combine multiple RDDs. Set an appropriate
remember duration (StreamingContext.remember), store the RDDs in a
fixed-size list/array and then process all the cached RDDs in one go
periodically when the list is full (combining with RDD.zipPartitions).
You may have to keep the remember duration somewhat larger than the
duration corresponding to the list size to account for processing time.


  
  However, coalesce shuffles data and my job processing time ends up
  higher than sparkBatchIntervalMilliseconds.

  I have observed that if I coalesce the number of partitions to be equal
  to the cores in the cluster I get less shuffling - but that is
  unsubstantiated.
  Is there any dependency/rule between the number of executors, number of
  cores etc. that I can use to minimize shuffling and at the same time
  achieve a minimum number of output files per batch?
  What is the best practice?


  


I think most DStreams (Kafka streams can be exceptions) will create the
number of partitions to be the same as the total number of executor
cores (spark.default.parallelism). Perhaps that is why you are seeing
the above behaviour. It looks like the shuffle should be avoidable in
your case, but if using coalesce it will likely not use the full
processing power.


thanks
-- 
Sumedh Wale
SnappyData (http://www.snappydata.io)
  





Re: Spark sql query taking long time

2016-03-03 Thread Sumedh Wale

  
  
On Thursday 03 March 2016 09:15 PM, Gourav Sengupta wrote:


  

  
  Hi,

  why not read the table into a dataframe directly using the SPARK CSV
  package. You are trying to solve the problem the roundabout way.

  

  


Yes, that will simplify and avoid the explicit split/map a bit (though
the code below is simple enough as is). However, the basic problem with
performance is not due to that. Note that a DataFrame, whether using the
spark-csv package or otherwise, is just an access point into the
underlying database.txt file, so multiple scans of the DataFrame as in
the code below will lead to multiple tokenization/parse passes over the
database.txt file, which is quite expensive. The join approach will
reduce this to a single scan for the case below, which should definitely
be done if possible, but if more queries need to be executed on the
DataFrame then saving it to parquet/orc (or cacheTable if possible) is
faster in my experience.
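For reference, the spark-csv read that Gourav suggests would look
something like this (the options are assumptions about the file layout):

    val dfCustomers1 = sqlContext.read
      .format("com.databricks.spark.csv")
      .option("header", "false")      // database.txt appears to have no header row
      .option("inferSchema", "true")  // let the package infer the column types
      .load("/root/Desktop/database.txt")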


  

  
  
  Regards,
  Gourav Sengupta
  
  
  


thanks
-- 
Sumedh Wale
SnappyData (http://www.snappydata.io)



  
On Thu, Mar 3, 2016 at 12:33 PM, Sumedh Wale <sw...@snappydata.io> wrote:

  On Thursday 03 March 2016 11:03 AM, Angel Angel wrote:

    Hello Sir/Madam,

    I am writing one application using spark sql.

    I made a very big table using the following command:

    val dfCustomers1 = sc.textFile("/root/Desktop/database.txt")
      .map(_.split(","))
      .map(p => Customer1(p(0), p(1).trim.toInt, p(2).trim.toInt, p(3))).toDF

    Now I want to search the address (many address) fields in the table
    and then extend the new table as per the search.

    var k = dfCustomers1.filter(dfCustomers1("Address").equalTo(lines(0)))

    for (a <- 1 until 1500) {
      var temp = dfCustomers1.filter(dfCustomers1("Address").equalTo(lines(a)))
      k = temp.unionAll(k)
    }
    k.show

  For the above case one approach that can help a lot is to convert the
  lines array to a table and then do a join on it instead of individual
  searches. Something like:

    val linesRDD = sc.parallelize(lines, 1) // number of lines is small, so 1 partition is fine
    val schema = StructType(Array(StructField("Address", StringType)))
    val linesDF = sqlContext.createDataFrame(linesRDD.map(Row(_)), schema)
    val result = dfCustomers1.join(linesDF, "Address")

  If you do need to scan the DataFrame multiple times, then this will end
  up scanning the csv file, formatting etc. in every loop. I would
  suggest caching in memory or saving to parquet/orc format for faster
  access. If there is enough memory then the SQLContext.cacheTable API
  can be used, else you can save to a parquet file:

    dfCustomers1.write.parquet("database.parquet")
    val dfCustomers2 = sqlContext.read.parquet("database.parquet")

  Normally parquet file scanning should be much faster than CSV
  scan+format, so use dfCustomers2 everywhere. You can also try various
  values of "spark.sql.parquet.compression.codec" (lzo, snappy,
  uncompressed) instead of the default gzip. Try if this reduces the
  runtime. Fastest will be if there is enough memory for

Re: Spark sql query taking long time

2016-03-03 Thread Sumedh Wale

  
  
On Thursday 03 March 2016 11:03 AM, Angel Angel wrote:


  Hello Sir/Madam,

  I am writing one application using spark sql.

  I made a very big table using the following command:

  val dfCustomers1 = sc.textFile("/root/Desktop/database.txt")
    .map(_.split(","))
    .map(p => Customer1(p(0), p(1).trim.toInt, p(2).trim.toInt, p(3))).toDF

  Now I want to search the address (many address) fields in the table and
  then extend the new table as per the search.

  var k = dfCustomers1.filter(dfCustomers1("Address").equalTo(lines(0)))

  for (a <- 1 until 1500) {
    var temp = dfCustomers1.filter(dfCustomers1("Address").equalTo(lines(a)))
    k = temp.unionAll(k)
  }
  k.show
  

  


For the above case one approach that can help a lot is to convert the
lines array to a table and then do a join on it instead of individual
searches. Something like:


val linesRDD = sc.parallelize(lines, 1) // number of lines is small, so 1 partition is fine
val schema = StructType(Array(StructField("Address", StringType)))
val linesDF = sqlContext.createDataFrame(linesRDD.map(Row(_)), schema)
val result = dfCustomers1.join(linesDF, "Address")


If you do need to scan the DataFrame multiple times, then this will end
up scanning the csv file, formatting etc. in every loop. I would suggest
caching in memory or saving to parquet/orc format for faster access. If
there is enough memory then the SQLContext.cacheTable API can be used,
else you can save to a parquet file:

dfCustomers1.write.parquet("database.parquet")
val dfCustomers2 = sqlContext.read.parquet("database.parquet")


Normally parquet file scanning should be much faster than CSV
scan+format, so use dfCustomers2 everywhere. You can also try various
values of "spark.sql.parquet.compression.codec" (lzo, snappy,
uncompressed) instead of the default gzip. Try if this reduces the
runtime. Fastest will be if there is enough memory for
sqlContext.cacheTable, but I doubt that will be possible since you say
it is a big table.
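As a small sketch of the two options mentioned above (the temp table
name and codec choice are just examples):

    // option 1: cache the table in memory if it fits
    dfCustomers1.registerTempTable("customers")
    sqlContext.cacheTable("customers")
    sqlContext.sql("SELECT count(*) FROM customers").show()

    // option 2: write parquet with a different compression codec and reuse that
    sqlContext.setConf("spark.sql.parquet.compression.codec", "snappy")
    dfCustomers1.write.parquet("database.parquet")
    val dfCustomers2 = sqlContext.read.parquet("database.parquet")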



  

  But this is taking a very long time. So can you suggest any optimized
  way, so I can reduce the execution time?

  My cluster has 3 slaves and 1 master.

  Thanks.

  



thanks
-- 
Sumedh Wale
SnappyData (http://www.snappydata.io)
  





Re: SFTP Compressed CSV into Dataframe

2016-03-02 Thread Sumedh Wale

On Thursday 03 March 2016 12:47 AM, Benjamin Kim wrote:

I wonder if anyone has opened an SFTP connection to open a remote GZIP
CSV file? I am able to download the file first locally using the SFTP
client in the spark-sftp package. Then, I load the file into a dataframe
using the spark-csv package, which automatically decompresses the file.
I just want to remove the "downloading file to local" step and directly
have the remote file decompressed, read, and loaded. Can someone give me
any hints?


One easy way on Linux, of course, is to use sshfs
(https://github.com/libfuse/sshfs) and mount the remote directory
locally. Since this uses FUSE, it works fine with normal user privileges.
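A rough sketch of that approach (host, paths and the CSV options are
placeholders, and it assumes sshfs is installed and spark-csv is on the
classpath):

    // first mount the remote SFTP directory locally, e.g.:
    //   sshfs user@sftp-host:/remote/data /mnt/sftp
    // then read the gzipped CSV through the mount; the .gz file is
    // decompressed transparently based on its extension
    val df = sqlContext.read
      .format("com.databricks.spark.csv")
      .option("header", "true")
      .load("file:///mnt/sftp/myfile.csv.gz")
    df.show()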



Thanks,
Ben


Thanks

--
Sumedh Wale
SnappyData (http://www.snappydata.io)





Re: Add Jars to Master/Worker classpath

2016-03-02 Thread Sumedh Wale

  
  
On Wednesday 02 March 2016 09:39 PM, Matthias Niehoff wrote:


  no, not to driver and executor but to the master and worker instances
  of the spark standalone cluster
  
  


Why exactly does adding jars to the driver/executor extraClassPath not
work?

The classpath of master/worker is set up by AbstractCommandBuilder,
which explicitly adds the following: jars named "datanucleus-*", and the
environment variables _SPARK_ASSEMBLY (for the assembly jar),
SPARK_DIST_CLASSPATH, HADOOP_CONF_DIR, YARN_CONF_DIR.

So you can set SPARK_DIST_CLASSPATH in conf/spark-env.sh to add the
required jars (separated by the platform's File.pathSeparator).
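For example, something like this in conf/spark-env.sh on each
master/worker node (the jar paths are placeholders for whatever your
logging appender needs):

    export SPARK_DIST_CLASSPATH="$SPARK_DIST_CLASSPATH:/opt/libs/redis-appender.jar:/opt/libs/appender-deps.jar"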


thanks
-- 
Sumedh Wale
SnappyData (http://www.snappydata.io)


  
On 2 March 2016 at 17:05, Igor Berman <igor.ber...@gmail.com> wrote:

  spark.driver.extraClassPath
  spark.executor.extraClassPath

  2016-03-02 18:01 GMT+02:00 Matthias Niehoff <matthias.nieh...@codecentric.de>:

    Hi,

    we want to add jars to the Master and Worker class path mainly for
    logging reasons (we have a redis appender to send logs to
    redis -> logstash -> elasticsearch).

    While it is working with setting SPARK_CLASSPATH, this solution is
    afaik deprecated and should not be used. Furthermore we are also
    using --driver-java-options and spark.executor.extraClassPath, which
    leads to exceptions when running our apps in standalone cluster mode.

    So what is the best way to add jars to the master and worker
    classpath?

    Thank you
  
  
  
    --
    Matthias Niehoff | IT-Consultant | Agile Software Factory | Consulting
    codecentric AG | Zeppelinstr 2 | 76185 Karlsruhe | Deutschland
    tel: +49 (0) 721.9595-681 | fax: +49 (0) 721.9595-666 | mobil: +49 (0) 172.1702676
    www.codecentric.de | blog.codecentric.de | www.meettheexperts.de | www.more4fi.de

    Registered office: Solingen | HRB 25917 | Amtsgericht Wuppertal
    Management board: Michael Hochgürtel . Mirko Novakovic . Rainer Vehns
    Supervisory board: Patric Fedlmeier (chairman) . Klaus Jäger . Jürgen Schütz
  
  