At the beginning of the code, do a query to find the current maximum ID. Don't just put in an arbitrarily large value, or all of your rows will end up in one Spark partition at the beginning of the range.
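For illustration, here is a minimal sketch of the bound discovery described above: the driver queries MAX(ID) once and passes the result to JdbcRDD.create as the upper bound, so the ID range splits evenly across partitions. The Derby URL, the EMPLOYEES table, the ID/NAME columns, and the class name are placeholders, not something prescribed in this thread.

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.SQLException;
    import java.sql.Statement;

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.rdd.JdbcRDD;

    public class MaxIdBoundExample {
      public static void main(String[] args) throws Exception {
        JavaSparkContext jsc =
            new JavaSparkContext(new SparkConf().setAppName("jdbc-rdd-bounds"));
        final String url = "jdbc:derby:target/ExampleDb"; // placeholder connection string

        // Look up the real maximum ID once on the driver instead of guessing a huge upper bound.
        long upperBound = 0L;
        try (Connection c = DriverManager.getConnection(url);
             Statement s = c.createStatement();
             ResultSet rs = s.executeQuery("SELECT MAX(ID) FROM EMPLOYEES")) {
          if (rs.next()) {
            upperBound = rs.getLong(1);
          }
        }

        JavaRDD<String> names = JdbcRDD.create(
            jsc,
            new JdbcRDD.ConnectionFactory() {
              public Connection getConnection() throws SQLException {
                return DriverManager.getConnection(url);
              }
            },
            "SELECT NAME FROM EMPLOYEES WHERE ? <= ID AND ID <= ?", // the ?'s receive each partition's ID range
            1L,         // lowerBound; could likewise come from SELECT MIN(ID)
            upperBound, // the actual max, so rows spread across all partitions
            10,         // numPartitions: each scans roughly a tenth of the ID range
            new Function<ResultSet, String>() {
              public String call(ResultSet r) throws Exception {
                return r.getString(1);
              }
            });

        System.out.println("row count: " + names.count());
        jsc.stop();
      }
    }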
The question of keys is up to you... all that you need to be able to do is write a SQL statement that takes two numbers to specify the bounds. Of course, a numeric primary key is going to be the most efficient way to do that.

On Thu, Feb 19, 2015 at 8:57 AM, Dmitry Goldenberg <dgoldenberg...@gmail.com> wrote:

Yup, I did see that. Good point though, Cody. The mismatch was happening for me when I was trying to get the 'new JdbcRDD' approach going. Once I switched to the 'create' method, things are working just fine. I was just able to refactor the 'get connection' logic into a 'DbConnection implements JdbcRDD.ConnectionFactory', and my 'map row' class is still 'MapRow implements org.apache.spark.api.java.function.Function<ResultSet, Row>'.

This works fine and makes the driver program tighter. Of course, my next question is how to work with the lower and upper bound parameters. As in, what if I don't know what the min and max ID values are and just want to extract all data from the table -- what should the params be, if that's even supported? And furthermore, what if the primary key on the table is not numeric, or if there's no primary key altogether?

The method works fine with lowerBound=0 and upperBound=1000000, for example, but it doesn't seem to have a way to say 'no upper bound' (-1 didn't work).

On Wed, Feb 18, 2015 at 11:59 PM, Cody Koeninger <c...@koeninger.org> wrote:

Look at the definition of JdbcRDD.create:

    def create[T](
        sc: JavaSparkContext,
        connectionFactory: ConnectionFactory,
        sql: String,
        lowerBound: Long,
        upperBound: Long,
        numPartitions: Int,
        mapRow: JFunction[ResultSet, T]): JavaRDD[T] = {

JFunction here is the interface org.apache.spark.api.java.function.Function, not scala.Function0. Likewise, ConnectionFactory is an interface defined inside JdbcRDD, not scala.Function0.

On Wed, Feb 18, 2015 at 4:50 PM, Dmitry Goldenberg <dgoldenberg...@gmail.com> wrote:

That's exactly what I was doing. However, I ran into runtime issues with doing that. For instance, I had a

    public class DbConnection extends AbstractFunction0<Connection>
        implements Serializable

and I got a runtime error from Spark complaining that DbConnection wasn't an instance of scala.Function0.

I also had a

    public class MapRow extends scala.runtime.AbstractFunction1<java.sql.ResultSet, Row>
        implements Serializable

with which I seemed to have more luck.

On Wed, Feb 18, 2015 at 5:32 PM, Cody Koeninger <c...@koeninger.org> wrote:

Can't you implement the

    org.apache.spark.api.java.function.Function

interface and pass an instance of that to JdbcRDD.create?

On Wed, Feb 18, 2015 at 3:48 PM, Dmitry Goldenberg <dgoldenberg...@gmail.com> wrote:

Cody, you were right, I had a copy-and-paste snag where I ended up with a vanilla SparkContext rather than a Java one. I also had to *not* use my function subclasses and instead use anonymous inner classes for the Function stuff, and that got things working. I'm now following the JdbcRDD.create approach from JavaJdbcRDDSuite.java basically verbatim.

Is there a clean way to refactor out the custom Function classes, such as the one for getting a db connection or mapping ResultSet data to your own POJOs, rather than doing it all inline?
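For reference, a rough sketch of the refactor Dmitry describes above (a DbConnection class implementing JdbcRDD.ConnectionFactory and a MapRow class implementing the Java Function interface), wired into JdbcRDD.create with a JavaSparkContext. The connection URL, the query, the bounds, and the plain-String mapping target are assumptions made for the sketch; the thread itself maps rows to Row.

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.SQLException;

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.rdd.JdbcRDD;

    public class JdbcRddRefactorExample {

      // Connection logic pulled out of the driver code into a named class.
      public static class DbConnection implements JdbcRDD.ConnectionFactory {
        private final String url;
        public DbConnection(String url) { this.url = url; }
        public Connection getConnection() throws SQLException {
          return DriverManager.getConnection(url);
        }
      }

      // Row-mapping logic pulled out as well; mapping to a plain String here
      // keeps the sketch self-contained (the thread maps to Row).
      public static class MapRow implements Function<ResultSet, String> {
        public String call(ResultSet r) throws Exception {
          return r.getString(1);
        }
      }

      public static void main(String[] args) {
        // Note: JdbcRDD.create wants a JavaSparkContext, not a plain SparkContext.
        JavaSparkContext jsc =
            new JavaSparkContext(new SparkConf().setAppName("jdbc-rdd-refactor"));

        JavaRDD<String> data = JdbcRDD.create(
            jsc,
            new DbConnection("jdbc:derby:target/ExampleDb"),  // placeholder URL
            "SELECT DATA FROM FOO WHERE ? <= ID AND ID <= ?", // the ?'s are bound per partition
            1L, 100L,                                         // lower/upper bound on ID
            3,                                                // numPartitions
            new MapRow());

        System.out.println(data.collect());
        jsc.stop();
      }
    }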
On Wed, Feb 18, 2015 at 1:52 PM, Cody Koeninger <c...@koeninger.org> wrote:

Is sc there a SparkContext or a JavaSparkContext? The compilation error seems to indicate the former, but JdbcRDD.create expects the latter.

On Wed, Feb 18, 2015 at 12:30 PM, Dmitry Goldenberg <dgoldenberg...@gmail.com> wrote:

I have tried that as well; I get a compile error:

    [ERROR] ...SparkProto.java:[105,39] error: no suitable method found
    for create(SparkContext,<anonymous ConnectionFactory>,String,int,int,int,
    <anonymous Function<ResultSet,Integer>>)

The code is a copy and paste:

    JavaRDD<Integer> jdbcRDD = JdbcRDD.create(
        sc,
        new JdbcRDD.ConnectionFactory() {
          public Connection getConnection() throws SQLException {
            return DriverManager.getConnection("jdbc:derby:target/JavaJdbcRDDSuiteDb");
          }
        },
        "SELECT DATA FROM FOO WHERE ? <= ID AND ID <= ?",
        1, 100, 1,
        new Function<ResultSet, Integer>() {
          public Integer call(ResultSet r) throws Exception {
            return r.getInt(1);
          }
        });

The other thing I've tried was to define a static class locally for GetConnection and use the JdbcRDD constructor. This got around the compile issues but blew up at runtime with "NoClassDefFoundError: scala/runtime/AbstractFunction0"!

    JdbcRDD<Row> jdbcRDD = new JdbcRDD<Row>(
        sc,
        (AbstractFunction0<Connection>) new DbConn(), // had to cast or a compile error
        SQL_QUERY,
        0L,
        1000L,
        10,
        new MapRow(),
        ROW_CLASS_TAG);
    // DbConn is defined as:
    // public static class DbConn extends AbstractFunction0<Connection> implements Serializable

On Wed, Feb 18, 2015 at 1:20 PM, Cody Koeninger <c...@koeninger.org> wrote:

That test I linked,

    https://github.com/apache/spark/blob/v1.2.1/core/src/test/java/org/apache/spark/JavaJdbcRDDSuite.java#L90

is calling a static method JdbcRDD.create, not new JdbcRDD. Is that what you tried doing?

On Wed, Feb 18, 2015 at 12:00 PM, Dmitry Goldenberg <dgoldenberg...@gmail.com> wrote:

Thanks, Cody. Yes, I originally started off by looking at that, but I get a compile error if I try and use that approach: constructor JdbcRDD in class JdbcRDD<T> cannot be applied to given types. Not to mention that JavaJdbcRDDSuite somehow manages to not pass in the class tag (the last argument).

I wonder if it's a JDK version issue; I'm using 1.7.
So I've got this, which doesn't compile:

    JdbcRDD<Row> jdbcRDD = new JdbcRDD<Row>(
        new SparkContext(conf),
        new JdbcRDD.ConnectionFactory() {
          public Connection getConnection() throws SQLException {
            Connection conn = null;
            try {
              Class.forName(JDBC_DRIVER);
              conn = DriverManager.getConnection(JDBC_URL, JDBC_USER, JDBC_PASSWORD);
            } catch (ClassNotFoundException ex) {
              throw new RuntimeException("Error while loading JDBC driver.", ex);
            }
            return conn;
          }
        },
        "SELECT * FROM EMPLOYEES",
        0L,
        1000L,
        10,
        new Function<ResultSet, Row>() {
          public Row call(ResultSet r) throws Exception {
            return null; // have some actual logic here...
          }
        },
        scala.reflect.ClassManifestFactory$.MODULE$.fromClass(Row.class));

The other approach was mimicking the DbConnection class from this post: http://www.sparkexpert.com/2015/01/02/load-database-data-into-spark-using-jdbcrdd-in-java/. It got around any of the compilation issues, but then I got the runtime error where Spark wouldn't recognize the db connection class as a scala.Function0.
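For contrast, a rough sketch of how the direct-constructor route discussed in the last few messages is spelled from Java, including the ClassTag argument that JdbcRDD.create hides. The class names, query, and connection details are placeholders; per this thread, this path proved brittle at runtime for the original poster (the scala.Function0 ClassCastException quoted below), which is why the replies steer toward JdbcRDD.create.

    import java.io.Serializable;
    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.SQLException;

    import org.apache.spark.SparkConf;
    import org.apache.spark.SparkContext;
    import org.apache.spark.rdd.JdbcRDD;

    import scala.reflect.ClassTag;
    import scala.reflect.ClassTag$;
    import scala.runtime.AbstractFunction0;
    import scala.runtime.AbstractFunction1;

    public class JdbcRddConstructorSketch {

      // Scala-function subclasses, along the lines described earlier in the thread.
      public static class DbConn extends AbstractFunction0<Connection> implements Serializable {
        public Connection apply() {
          try {
            return DriverManager.getConnection("jdbc:derby:target/ExampleDb"); // placeholder URL
          } catch (SQLException e) {
            throw new RuntimeException(e);
          }
        }
      }

      public static class MapRow extends AbstractFunction1<ResultSet, String> implements Serializable {
        public String apply(ResultSet r) {
          try {
            return r.getString(1);
          } catch (SQLException e) {
            throw new RuntimeException(e);
          }
        }
      }

      public static void main(String[] args) {
        SparkContext sc = new SparkContext(new SparkConf().setAppName("jdbc-rdd-ctor"));

        // The Scala constructor also takes a ClassTag for the element type as its last argument.
        ClassTag<String> tag = ClassTag$.MODULE$.<String>apply(String.class);

        JdbcRDD<String> rdd = new JdbcRDD<String>(
            sc,
            new DbConn(),
            "SELECT DATA FROM FOO WHERE ? <= ID AND ID <= ?", // the two bound placeholders are required
            1L, 100L, 3,
            new MapRow(),
            tag);

        System.out.println(rdd.count());
        sc.stop();
      }
    }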
On Wed, Feb 18, 2015 at 12:37 PM, Cody Koeninger <c...@koeninger.org> wrote:

Take a look at

    https://github.com/apache/spark/blob/v1.2.1/core/src/test/java/org/apache/spark/JavaJdbcRDDSuite.java

On Wed, Feb 18, 2015 at 11:14 AM, dgoldenberg <dgoldenberg...@gmail.com> wrote:

I'm reading data from a database using JdbcRDD, in Java, and I have an implementation of Function0<Connection> whose instance I supply as the 'getConnection' parameter into the JdbcRDD constructor. Compiles fine.

The definition of the class/function is as follows:

    public class GetDbConnection extends AbstractFunction0<Connection>
        implements Serializable

where scala.runtime.AbstractFunction0 extends scala.Function0.

At runtime, I get an exception as below. Does anyone have an idea as to how to resolve this/work around it? Thanks.

I'm running Spark 1.2.1 built for Hadoop 2.4.

    Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure:
    Task 3 in stage 0.0 failed 1 times, most recent failure: Lost task 3.0 in stage 0.0 (TID 3, localhost):
    java.lang.ClassCastException: cannot assign instance of com.kona.motivis.spark.proto.GetDbConnection
    to field org.apache.spark.rdd.JdbcRDD.org$apache$spark$rdd$JdbcRDD$$getConnection
    of type scala.Function0 in instance of org.apache.spark.rdd.JdbcRDD
        at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2083)
        at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1261)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1996)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
        at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
        at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:57)
        at org.apache.spark.scheduler.Task.run(Task.scala:56)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:744)

    Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
        at scala.Option.foreach(Option.scala:236)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1420)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessActor.aroundReceive(DAGScheduler.scala:1375)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
        at akka.actor.ActorCell.invoke(ActorCell.scala:487)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
        at akka.dispatch.Mailbox.run(Mailbox.scala:220)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/JdbcRDD-ClassCastException-with-scala-Function0-tp21707.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.