
Just create a BIGINT column with numeric values in Phoenix and use sequences
to populate it automatically.

I included the setup below in case someone starts from scratch

- export JAVA_HOME and SCALA_HOME, and install sbt
- install HBase in standalone mode
- add the Phoenix jar to the HBase lib directory
- start HBase and create a table in Phoenix to verify everything is working
- install Spark in standalone mode, and verify that it works using the Spark
shell

1. create a sequence in Phoenix:
$PHOENIX_HOME/hadoop1/bin/ localhost

 > CREATE SEQUENCE IF NOT EXISTS my_schema.my_sequence;

2. add a BIGINT column called e.g. "id" to your table in Phoenix:

> CREATE TABLE test.orders ( id BIGINT not null primary key, name VARCHAR);

3. add some values:

> UPSERT INTO test.orders (id, name) VALUES (NEXT VALUE FOR my_schema.my_sequence, 'foo');
> UPSERT INTO test.orders (id, name) VALUES (NEXT VALUE FOR my_schema.my_sequence, 'bar');

4. create jdbc adapter (following SimpleApp setup in


import java.sql.{DriverManager, ResultSet}

import org.apache.spark.SparkContext
import org.apache.spark.rdd.JdbcRDD

object SparkToJDBC {

  def main(args: Array[String]) {
    val sc = new SparkContext("local", "phoenix")
    try {
      val rdd = new JdbcRDD(sc,
        // called once per partition to open a connection
        () => DriverManager.getConnection("jdbc:phoenix:localhost", "", ""),
        // the two ?s are filled with each partition's lower and upper bound
        "SELECT id, name FROM test.orders WHERE id >= ? AND id <= ?",
        1, 100, 3,
        (r: ResultSet) => processResultSet(r))
      // nothing is executed until an action is called on the RDD
      println(rdd.count())
    } catch {
      case _: Throwable => println("Could not connect to database")
    } finally {
      sc.stop()
    }
  }

  // JdbcRDD calls this once per row, after it has already advanced the
  // cursor, so only the current row is read here (no rs.next() loop)
  def processResultSet(rs: ResultSet) {
    val rsmd = rs.getMetaData()
    val numberOfColumns = rsmd.getColumnCount()

    // print the column metadata
    var i = 1
    while (i <= numberOfColumns) {
      val colName = rsmd.getColumnName(i)
      val tableName = rsmd.getTableName(i)
      val name = rsmd.getColumnTypeName(i)
      println("Information for column " + colName)
      println("    Column is in table " + tableName)
      println("    column type is " + name)
      i += 1
    }

    // print the values of the current row
    i = 1
    while (i <= numberOfColumns) {
      val s = rs.getString(i)
      System.out.print(s + "  ")
      i += 1
    }
    println()
  }
}
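
To make the "1, 100, 3" arguments in step 4 concrete, here is a minimal
sketch of how a lower bound, an upper bound and a partition count can be
turned into one id range per partition. It is meant to approximate the
arithmetic JdbcRDD applies, not to reproduce Spark's source; PartitionBounds
and split are hypothetical names used only for illustration.

```scala
// Hypothetical helper, not part of Spark: approximates how JdbcRDD divides
// (lowerBound, upperBound, numPartitions) into per-partition ranges.
object PartitionBounds {

  // Returns inclusive (start, end) pairs, one per partition.
  def split(lower: Long, upper: Long, numPartitions: Int): Seq[(Long, Long)] = {
    // both bounds are inclusive, hence the + 1
    val length = BigInt(upper) - BigInt(lower) + 1
    (0 until numPartitions).map { i =>
      val start = BigInt(lower) + (BigInt(i) * length) / numPartitions
      val end = BigInt(lower) + (BigInt(i + 1) * length) / numPartitions - 1
      (start.toLong, end.toLong)
    }
  }

  def main(args: Array[String]): Unit = {
    // each pair fills the two ?s of one sub-query
    split(1, 100, 3).foreach { case (s, e) =>
      println("SELECT id, name FROM test.orders WHERE id >= " + s + " AND id <= " + e)
    }
  }
}
```

With the bounds from step 4, this gives the ranges 1 to 33, 34 to 66 and
67 to 100. The two rows inserted in step 3 would both fall into the first
range, so the bounds should be chosen to match the actual spread of ids.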


5. build SparkToJDBC.scala
sbt package
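
sbt package needs a build definition next to the source. A minimal sketch of
a build.sbt, assuming the layout of the Spark quick start: the project name
and version are inferred from the jar name used in step 6 and the Spark
version from the spark-1.1.0 path, while the Scala patch version (2.10.4) is
an assumption.

```scala
// build.sbt (sketch; values inferred from the jar name and paths in this mail)
name := "Simple Project"

version := "1.0"

// assumption: use the 2.10.x release that matches your Spark build
scalaVersion := "2.10.4"

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.1.0"
```

The Phoenix client jar is not listed as a dependency here because step 4 only
uses java.sql classes at compile time; the jar is supplied at run time in
step 6.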

6. execute spark job:
note: don't forget to add phoenix jar using --jars option like this:

../spark-1.1.0/bin/spark-submit --jars ../phoenix-3.1.0-bin/hadoop2/phoenix-3.1.0-client-hadoop2.jar --class "SparkToJDBC" --master local[4] target/scala-2.10/simple-project_2.10-1.0.jar


On Fri, Nov 21, 2014 at 4:34 PM, Josh Mahonin wrote:

> Hi Alaa Ali,
> In order for Spark to split the JDBC query in parallel, it expects an
> upper and lower bound for your input data, as well as a number of
> partitions so that it can split the query across multiple tasks.
> For example, depending on your data distribution, you could set an upper
> and lower bound on your timestamp range, and spark should be able to create
> new sub-queries to split up the data.
> Another option is to load up the whole table using the PhoenixInputFormat
> as a NewHadoopRDD. It doesn't yet support many of Phoenix's aggregate
> functions, but it does let you load up whole tables as RDDs.
> I've previously posted example code here:
> There's also an example library implementation here, although I haven't
> had a chance to test it yet:
> Josh
> On Fri, Nov 21, 2014 at 4:14 PM, Alaa Ali wrote:
>> I want to run queries on Apache Phoenix which has a JDBC driver. The
>> query that I want to run is:
>>     select ts,ename from random_data_date limit 10
>> But I'm having issues with the JdbcRDD upper and lowerBound parameters
>> (that I don't actually understand).
>> Here's what I have so far:
>> import org.apache.spark.rdd.JdbcRDD
>> import java.sql.{Connection, DriverManager, ResultSet}
>> val url="jdbc:phoenix:zookeeper"
>> val sql = "select ts,ename from random_data_date limit ?"
>> val myRDD = new JdbcRDD(sc, () => DriverManager.getConnection(url), sql,
>> 5, 10, 2, r => r.getString("ts") + ", " + r.getString("ename"))
>> But this doesn't work because the sql expression that the JdbcRDD expects
>> has to have two ?s to represent the lower and upper bound.
>> How can I run my query through the JdbcRDD?
>> Regards,
>> Alaa Ali
