Ali, just create a BIGINT column with numeric values in Phoenix and use sequences <http://phoenix.apache.org/sequences.html> to populate it automatically; that gives the JdbcRDD a numeric column to put the lower and upper bounds on.
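For your original query specifically: once random_data_date has such an id column, a rough, untested sketch of the JdbcRDD call (in spark-shell, with the Phoenix client jar on the classpath) could look like the following. The LIMIT 10 is replaced by the id range, and 1, 10, 2 are just placeholder lower bound / upper bound / partition values:

import java.sql.{DriverManager, ResultSet}
import org.apache.spark.rdd.JdbcRDD

val url = "jdbc:phoenix:zookeeper"   // your ZooKeeper quorum
// Two "?" placeholders on the numeric id column give JdbcRDD its lower and upper bounds.
val sql = "SELECT ts, ename FROM random_data_date WHERE id >= ? AND id <= ?"
val myRDD = new JdbcRDD(sc,
  () => {
    Class.forName("org.apache.phoenix.jdbc.PhoenixDriver").newInstance()
    DriverManager.getConnection(url)
  },
  sql,
  1, 10, 2,   // lowerBound, upperBound, numPartitions: placeholders, adjust to your data
  (r: ResultSet) => r.getString("ts") + ", " + r.getString("ename"))
myRDD.collect().foreach(println)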
I included the setup below in case someone starts from scratch.

Prerequisites:
- export JAVA_HOME and SCALA_HOME, and install sbt
- install HBase in standalone mode <http://hbase.apache.org/book/quickstart.html>
- add the Phoenix jar <http://phoenix.apache.org/download.html> to the HBase lib directory
- start HBase and create a table in Phoenix <http://phoenix.apache.org/Phoenix-in-15-minutes-or-less.html> to verify everything is working
- install Spark in standalone mode and verify that it works using the Spark shell <http://spark.apache.org/docs/latest/quick-start.html>

1. Create a sequence <http://phoenix.apache.org/sequences.html> in Phoenix:

$PHOENIX_HOME/hadoop1/bin/sqlline.py localhost
> CREATE SEQUENCE IF NOT EXISTS my_schema.my_sequence;

2. Add a BIGINT column called e.g. "id" to your table in Phoenix (here the example just creates a fresh table with one):

> CREATE TABLE test.orders (id BIGINT NOT NULL PRIMARY KEY, name VARCHAR);

3. Add some values:

> UPSERT INTO test.orders (id, name) VALUES (NEXT VALUE FOR my_schema.my_sequence, 'foo');
> UPSERT INTO test.orders (id, name) VALUES (NEXT VALUE FOR my_schema.my_sequence, 'bar');

4. Create the JDBC adapter (following the SimpleApp setup in the Spark quick-start's Standalone Applications section <https://spark.apache.org/docs/latest/quick-start.html#Standalone_Applications>):

// SparkToJDBC.scala
import java.sql.{DriverManager, ResultSet}

import org.apache.spark.SparkContext
import org.apache.spark.rdd.JdbcRDD

object SparkToJDBC {

  def main(args: Array[String]) {
    val sc = new SparkContext("local", "phoenix")
    try {
      // lowerBound = 1, upperBound = 100, numPartitions = 3; the two "?"
      // placeholders receive the per-partition id ranges at runtime.
      val rdd = new JdbcRDD(sc,
        () => {
          Class.forName("org.apache.phoenix.jdbc.PhoenixDriver").newInstance()
          DriverManager.getConnection("jdbc:phoenix:localhost", "", "")
        },
        "SELECT id, name FROM test.orders WHERE id >= ? AND id <= ?",
        1, 100, 3,
        (r: ResultSet) => processResultSet(r)
      ).cache()

      println("#####################")
      println(rdd.count())
      println("#####################")
    } catch {
      case _: Throwable => println("Could not connect to database")
    }
    sc.stop()
  }

  // Prints the column metadata followed by every row of the result set.
  def processResultSet(rs: ResultSet) {
    val rsmd = rs.getMetaData()
    val numberOfColumns = rsmd.getColumnCount()

    var i = 1
    while (i <= numberOfColumns) {
      val colName = rsmd.getColumnName(i)
      val tableName = rsmd.getTableName(i)
      val typeName = rsmd.getColumnTypeName(i)
      println("Information for column " + colName)
      println("    Column is in table " + tableName)
      println("    column type is " + typeName)
      println("")
      i += 1
    }

    while (rs.next()) {
      var j = 1
      while (j <= numberOfColumns) {
        print(rs.getString(j) + " ")
        j += 1
      }
      println("")
    }
  }
}

5. Build SparkToJDBC.scala (see the PS below for a minimal sbt build file):

sbt package

6. Execute the Spark job. Note: don't forget to add the Phoenix client jar using the --jars option, like this:

../spark-1.1.0/bin/spark-submit --jars ../phoenix-3.1.0-bin/hadoop2/phoenix-3.1.0-client-hadoop2.jar --class "SparkToJDBC" --master local[4] target/scala-2.10/simple-project_2.10-1.0.jar

regards
Alex
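PS: for step 5 to work, sbt also needs a build file next to SparkToJDBC.scala. A minimal sketch, assuming Spark 1.1.0 and Scala 2.10 (the versions implied by the jar name in step 6); adjust them to your installation:

name := "Simple Project"

version := "1.0"

scalaVersion := "2.10.4"

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.1.0"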
On Fri, Nov 21, 2014 at 4:34 PM, Josh Mahonin <jmaho...@interset.com> wrote:

> Hi Alaa Ali,
>
> In order for Spark to split the JDBC query in parallel, it expects an
> upper and lower bound for your input data, as well as a number of
> partitions so that it can split the query across multiple tasks.
>
> For example, depending on your data distribution, you could set an upper
> and lower bound on your timestamp range, and Spark should be able to create
> new sub-queries to split up the data.
>
> Another option is to load up the whole table using the PhoenixInputFormat
> as a NewHadoopRDD. It doesn't yet support many of Phoenix's aggregate
> functions, but it does let you load up whole tables as RDDs.
>
> I've previously posted example code here:
> http://mail-archives.apache.org/mod_mbox/spark-user/201404.mbox/%3CCAJ6CGtA1DoTdadRtT5M0+75rXTyQgu5gexT+uLccw_8Ppzyt=q...@mail.gmail.com%3E
>
> There's also an example library implementation here, although I haven't
> had a chance to test it yet:
> https://github.com/simplymeasured/phoenix-spark
>
> Josh
>
> On Fri, Nov 21, 2014 at 4:14 PM, Alaa Ali <contact.a...@gmail.com> wrote:
>
>> I want to run queries on Apache Phoenix, which has a JDBC driver. The
>> query that I want to run is:
>>
>> select ts,ename from random_data_date limit 10
>>
>> But I'm having issues with the JdbcRDD upper and lowerBound parameters
>> (that I don't actually understand).
>>
>> Here's what I have so far:
>>
>> import org.apache.spark.rdd.JdbcRDD
>> import java.sql.{Connection, DriverManager, ResultSet}
>>
>> val url = "jdbc:phoenix:zookeeper"
>> val sql = "select ts,ename from random_data_date limit ?"
>> val myRDD = new JdbcRDD(sc, () => DriverManager.getConnection(url), sql,
>>   5, 10, 2, r => r.getString("ts") + ", " + r.getString("ename"))
>>
>> But this doesn't work, because the SQL expression that the JdbcRDD expects
>> has to have two ?s to represent the lower and upper bound.
>>
>> How can I run my query through the JdbcRDD?
>>
>> Regards,
>> Alaa Ali
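For anyone else landing on this thread and wondering, like Alaa, what the lowerBound/upperBound/numPartitions arguments actually do: JdbcRDD splits the inclusive range [lowerBound, upperBound] into numPartitions sub-ranges and binds one sub-range per task to the two "?" placeholders. A small sketch of that splitting logic, written here only as an illustration of the idea (it is my own approximation, not the actual Spark source):

// Approximate illustration of how JdbcRDD derives the values bound to the two "?" placeholders.
def partitionRanges(lowerBound: Long, upperBound: Long, numPartitions: Int): Seq[(Long, Long)] = {
  val length = BigInt(1) + upperBound - lowerBound   // bounds are inclusive
  (0 until numPartitions).map { i =>
    val start = lowerBound + ((i * length) / numPartitions)
    val end   = lowerBound + (((i + 1) * length) / numPartitions) - 1
    (start.toLong, end.toLong)
  }
}

// With the values from the SparkToJDBC example above this prints
// (1,33), (34,66), (67,100): each pair becomes one "WHERE id >= ? AND id <= ?" query.
partitionRanges(1, 100, 3).foreach(println)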