Ali,

Just create a BIGINT column in Phoenix and use sequences
<http://phoenix.apache.org/sequences.html> to populate it automatically.
That gives JdbcRDD a numeric column to use for its lower and upper bounds.

I have included the full setup below in case someone is starting from scratch.

Prerequisites:
- export JAVA_HOME, SCALA_HOME and install sbt
- install hbase in standalone mode
<http://hbase.apache.org/book/quickstart.html>
- add phoenix jar <http://phoenix.apache.org/download.html> to hbase lib
directory
- start hbase and create a table in phoenix
<http://phoenix.apache.org/Phoenix-in-15-minutes-or-less.html> to verify
everything is working
- install spark in standalone mode, and verify that it works using spark
shell <http://spark.apache.org/docs/latest/quick-start.html>

1. create a sequence <http://phoenix.apache.org/sequences.html> in phoenix:
$PHOENIX_HOME/hadoop1/bin/sqlline.py localhost

> CREATE SEQUENCE IF NOT EXISTS my_schema.my_sequence;

2. add a BIGINT column called e.g. "id" to your table in phoenix:

> CREATE TABLE test.orders (id BIGINT NOT NULL PRIMARY KEY, name VARCHAR);

3. add some values:

> UPSERT INTO test.orders (id, name) VALUES (NEXT VALUE FOR my_schema.my_sequence, 'foo');
> UPSERT INTO test.orders (id, name) VALUES (NEXT VALUE FOR my_schema.my_sequence, 'bar');

4. create jdbc adapter (following the SimpleApp setup in Spark -> Getting
Started -> Standalone Applications
<https://spark.apache.org/docs/latest/quick-start.html#Standalone_Applications>):

//SparkToJDBC.scala

import java.sql.{DriverManager, ResultSet}

import org.apache.spark.SparkContext
import org.apache.spark.rdd.JdbcRDD

object SparkToJDBC {

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local", "phoenix")
    try {
      val rdd = new JdbcRDD(
        sc,
        () => {
          // register the Phoenix driver, then open a connection for this partition
          Class.forName("org.apache.phoenix.jdbc.PhoenixDriver").newInstance()
          DriverManager.getConnection("jdbc:phoenix:localhost", "", "")
        },
        // JdbcRDD binds each partition's lower and upper id to the two ?s
        "SELECT id, name FROM test.orders WHERE id >= ? AND id <= ?",
        1, 100, 3, // lowerBound, upperBound, numPartitions
        (r: ResultSet) => processResultSet(r)
      ).cache()

      println("#####################")
      println(rdd.count())
      println("#####################")
    } catch {
      // print the actual cause instead of assuming a connection problem
      case e: Exception => println("Spark job failed: " + e)
    } finally {
      sc.stop()
    }
  }

  // JdbcRDD calls this once per row with the cursor already on that row,
  // so it reads the current row only and never calls rs.next() itself
  // (doing so would swallow the remaining rows of the partition).
  def processResultSet(rs: ResultSet): String = {
    val rsmd = rs.getMetaData()
    // describe every column of the current row as "name (type) = value"
    val row = (1 to rsmd.getColumnCount()).map { i =>
      rsmd.getColumnName(i) + " (" + rsmd.getColumnTypeName(i) + ") = " + rs.getString(i)
    }.mkString("  ")
    println(row)
    row
  }

}

5. build SparkToJDBC.scala
sbt package

6. execute spark job:
note: don't forget to add the phoenix client jar using the --jars option, like this:

../spark-1.1.0/bin/spark-submit \
  --jars ../phoenix-3.1.0-bin/hadoop2/phoenix-3.1.0-client-hadoop2.jar \
  --class "SparkToJDBC" --master local[4] \
  target/scala-2.10/simple-project_2.10-1.0.jar
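
In case the 1, 100, 3 arguments in step 4 look arbitrary: as far as I can tell
from the JdbcRDD source, it cuts the inclusive range [lowerBound, upperBound]
into numPartitions contiguous slices and binds the ends of each slice to the
two ?s in the query. The sketch below reproduces that arithmetic in plain
Scala so you can try it without Spark or Phoenix. BoundsSketch and
partitionBounds are names I made up for illustration, they are not part of the
Spark API, and the real implementation may differ between versions.

```scala
// BoundsSketch.scala
// Illustration only: partitionBounds is a hypothetical helper, not a Spark API.
object BoundsSketch {

  // Split the inclusive range [lowerBound, upperBound] into numPartitions
  // contiguous, non-overlapping (start, end) pairs.
  def partitionBounds(lowerBound: Long, upperBound: Long, numPartitions: Int): Seq[(Long, Long)] = {
    // bounds are inclusive, hence the + 1 here and the - 1 on end;
    // BigInt avoids overflow when the bounds are far apart
    val length = BigInt(1) + BigInt(upperBound) - BigInt(lowerBound)
    (0 until numPartitions).map { i =>
      val start = BigInt(lowerBound) + (BigInt(i) * length) / BigInt(numPartitions)
      val end = BigInt(lowerBound) + (BigInt(i + 1) * length) / BigInt(numPartitions) - 1
      (start.toLong, end.toLong)
    }
  }

  def main(args: Array[String]): Unit = {
    // print the sub-query each partition would effectively run
    partitionBounds(1, 100, 3).foreach { case (lo, hi) =>
      println("SELECT id, name FROM test.orders WHERE id >= " + lo + " AND id <= " + hi)
    }
  }
}
```

With 1, 100, 3 this gives the id ranges 1 to 33, 34 to 66 and 67 to 100. If
your ids are spread unevenly, some partitions will come back nearly empty, so
pick bounds that match the values your sequence has actually produced.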

regards
Alex


On Fri, Nov 21, 2014 at 4:34 PM, Josh Mahonin <jmaho...@interset.com> wrote:

> Hi Alaa Ali,
>
> In order for Spark to split the JDBC query in parallel, it expects an
> upper and lower bound for your input data, as well as a number of
> partitions so that it can split the query across multiple tasks.
>
> For example, depending on your data distribution, you could set an upper
> and lower bound on your timestamp range, and spark should be able to create
> new sub-queries to split up the data.
>
> Another option is to load up the whole table using the PhoenixInputFormat
> as a NewHadoopRDD. It doesn't yet support many of Phoenix's aggregate
> functions, but it does let you load up whole tables as RDDs.
>
> I've previously posted example code here:
>
> http://mail-archives.apache.org/mod_mbox/spark-user/201404.mbox/%3CCAJ6CGtA1DoTdadRtT5M0+75rXTyQgu5gexT+uLccw_8Ppzyt=q...@mail.gmail.com%3E
>
> There's also an example library implementation here, although I haven't
> had a chance to test it yet:
> https://github.com/simplymeasured/phoenix-spark
>
> Josh
>
> On Fri, Nov 21, 2014 at 4:14 PM, Alaa Ali <contact.a...@gmail.com> wrote:
>
>> I want to run queries on Apache Phoenix which has a JDBC driver. The
>> query that I want to run is:
>>
>>     select ts,ename from random_data_date limit 10
>>
>> But I'm having issues with the JdbcRDD upper and lowerBound parameters
>> (that I don't actually understand).
>>
>> Here's what I have so far:
>>
>> import org.apache.spark.rdd.JdbcRDD
>> import java.sql.{Connection, DriverManager, ResultSet}
>>
>> val url="jdbc:phoenix:zookeeper"
>> val sql = "select ts,ename from random_data_date limit ?"
>> val myRDD = new JdbcRDD(sc, () => DriverManager.getConnection(url), sql,
>> 5, 10, 2, r => r.getString("ts") + ", " + r.getString("ename"))
>>
>> But this doesn't work because the sql expression that the JdbcRDD expects
>> has to have two ?s to represent the lower and upper bound.
>>
>> How can I run my query through the JdbcRDD?
>>
>> Regards,
>> Alaa Ali
>>
>
>
