Hi All,

I rewrote my code to use sqlContext.read.jdbc, which lets me specify a
lower bound, an upper bound, the number of partitions, etc., so the read
should be able to run in parallel. I still need to try it on a cluster,
which I will do when I have time. In the meantime, can someone please
confirm that read.jdbc does parallel reads?
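A quick local check (a minimal sketch, reusing the same sqlContext, url
and props as in the full code below) is to count the partitions of the
resulting DataFrame's RDD:

// Assuming the same sqlContext, url and props as in the full code below.
// Should print 8 if read.jdbc really splits the read into 8 partitions.
val df = sqlContext.read.jdbc(url, "customer", "CustomerID", 1, 100, 8, props)
println("partitions: " + df.rdd.partitions.length)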

Spark code:-


package com.kali.db

/**
 * Created by kalit_000 on 06/12/2015.
 */

import java.util.Properties

import org.apache.log4j.{Level, Logger}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.DataFrame
import org.apache.spark.{SparkConf, SparkContext}
import org.springframework.context.support.ClassPathXmlApplicationContext

// All settings arrive as Strings from the Spring context file; the
// numeric ones are converted with .toInt at the point of use.
case class SparkSqlValueClassMPP(driver: String, url: String, username: String,
  password: String, sql: String, table: String, opdelimiter: String,
  lowerbound: String, upperbound: String, numberofpartitions: String,
  parallelizecolumn: String)

object SparkDBExtractorMPP {

  def main(args: Array[String]) {

    Logger.getLogger("org").setLevel(Level.WARN)
    Logger.getLogger("akka").setLevel(Level.WARN)

    val conf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("SparkDBExtractorMPP")
      .set("spark.hadoop.validateOutputSpecs", "false")
    val sc = new SparkContext(conf)

    // Flatten each Row into a delimited line, e.g. [1,2,3] -> 1~2~3.
    def opfile(value: DataFrame, delimiter: String): RDD[String] =
      value.map(x => x.toString.replace("[", "").replace("]", "").replace(",", delimiter))

    // Read the wiring from the Spring application context file.
    val ctx = new ClassPathXmlApplicationContext("sparkDBExtractorMpp.xml")
    val DBinfo = ctx.getBean("SparkSQLDBExtractorMPP").asInstanceOf[SparkSqlValueClassMPP]

    val driver = DBinfo.driver
    val url = DBinfo.url
    val username = DBinfo.username
    val password = DBinfo.password
    val query = DBinfo.sql
    val table = DBinfo.table
    val opdelimiter = DBinfo.opdelimiter
    val lowerbound = DBinfo.lowerbound.toInt
    val upperbound = DBinfo.upperbound.toInt
    val numberofpartitions = DBinfo.numberofpartitions.toInt
    val parallelizecolumn = DBinfo.parallelizecolumn


    println("DB Driver:-%s".format(driver))
    println("DB Url:-%s".format(url))
    println("Username:-%s".format(username))
    println("Password:-%s".format(password))
    println("Query:-%s".format(query))
    println("Table:-%s".format(table))
    println("Opdelimeter:-%s".format(opdelimeter))
    println("Lowerbound:-%s".format(lowerbound))
    println("Upperbound:-%s".format(upperbound))
    println("Numberofpartitions:-%s".format(numberofpartitions))
    println("Parallelizecolumn:-%s".format(parallelizecolumn))

    try {
      val props = new Properties()
      props.put("user", username)
      props.put("password", password)
      props.put("driver", driver)

      val sqlContext = new org.apache.spark.sql.SQLContext(sc)

      // Column-partitioned JDBC read: one partition (and one DB query)
      // per slice of [lowerbound, upperbound] on parallelizecolumn.
      val df = sqlContext.read.jdbc(url, table, parallelizecolumn,
        lowerbound, upperbound, numberofpartitions, props)

      df.show(10)

      // Re-run the configured SQL over the extracted data, then write
      // it out as delimited text.
      df.registerTempTable(table)
      val OP = sqlContext.sql(query)

      opfile(OP, opdelimiter).saveAsTextFile("C:\\Users\\kalit_000\\Desktop\\typesafe\\scaladbop\\op.txt")

    } catch {
      case e: Exception => e.printStackTrace
    }
    sc.stop()
  }
}
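
For context, my understanding of the column-partitioned read (an
assumption on my part, not checked against the Spark source) is that it
splits [lowerbound, upperbound] into numberofpartitions slices and issues
one JDBC query per slice with a WHERE clause on the partition column; the
bounds only decide the stride and do not filter rows, so all rows in the
table are still returned. A rough illustration of the idea:

// Rough illustration only (not necessarily Spark's exact predicates):
// how a 1..100 range over CustomerID could be cut into 8 query slices.
val (lower, upper, parts) = (1L, 100L, 8)
val stride = (upper - lower) / parts
(0 until parts).foreach { i =>
  val lo = lower + i * stride
  val whereClause =
    if (i == 0) s"CustomerID < ${lo + stride} OR CustomerID IS NULL"
    else if (i == parts - 1) s"CustomerID >= $lo"
    else s"CustomerID >= $lo AND CustomerID < ${lo + stride}"
  println(s"partition $i: SELECT ... FROM customer WHERE $whereClause")
}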

Spring:-
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE beans PUBLIC "-//SPRING//DTD BEAN//EN"
        "http://www.springframework.org/dtd/spring-beans.dtd">
<beans>
    <bean id="queryProps"
          class="org.springframework.beans.factory.config.PropertiesFactoryBean">
    </bean>

    <bean id="SparkSQLDBExtractorMPP" class="com.kali.db.SparkSqlValueClassMPP">
        <constructor-arg value="com.microsoft.sqlserver.jdbc.SQLServerDriver" />
        <constructor-arg value="jdbc:sqlserver://localhost;user=admin;password=oracle;database=AdventureWorks2014" />
        <constructor-arg value="admin" />
        <constructor-arg value="oracle" />
        <constructor-arg value="select CustomerID,StoreID,TerritoryID,AccountNumber,ModifiedDate from customer limit 10" />
        <constructor-arg value="customer" />
        <constructor-arg value="~" />
        <constructor-arg value="1" />
        <constructor-arg value="100" />
        <constructor-arg value="8" />
        <constructor-arg value="CustomerID" />
    </bean>
</beans>
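
The constructor-args are positional and map onto the case class fields in
order; spelled out with named arguments (values copied from the XML
above), the bean is equivalent to:

// Equivalent of the Spring bean, with each positional arg named.
val bean = SparkSqlValueClassMPP(
  driver = "com.microsoft.sqlserver.jdbc.SQLServerDriver",
  url = "jdbc:sqlserver://localhost;user=admin;password=oracle;database=AdventureWorks2014",
  username = "admin",
  password = "oracle",
  sql = "select CustomerID,StoreID,TerritoryID,AccountNumber,ModifiedDate from customer limit 10",
  table = "customer",
  opdelimiter = "~",
  lowerbound = "1",
  upperbound = "100",
  numberofpartitions = "8",
  parallelizecolumn = "CustomerID")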
