At the beginning of your code, run a query to find the current maximum ID
and use that as the upper bound.

Don't just put in an arbitrarily large value: JdbcRDD splits the
[lowerBound, upperBound] range evenly across the partitions, so if the
upper bound is far beyond the actual maximum ID, all of your rows will end
up in one Spark partition at the beginning of the range.

The question of keys is up to you... all you need to be able to do is
write a SQL statement that takes two numbers to specify the bounds. Of
course, a numeric primary key is going to be the most efficient way to do
that.
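
Something along these lines, for example (just a sketch; the table, column,
and variable names are placeholders, and it assumes the same imports and
ConnectionFactory / mapRow setup as the JavaJdbcRDDSuite example further
down the thread):

  // find the real bounds before building the RDD
  long upperBound;
  Connection c = DriverManager.getConnection(JDBC_URL);
  try {
    ResultSet rs = c.createStatement().executeQuery("SELECT MAX(ID) FROM FOO");
    rs.next();
    upperBound = rs.getLong(1);   // current max ID in the table
  } finally {
    c.close();
  }

  JavaRDD<Integer> rdd = JdbcRDD.create(
      sc,                 // your JavaSparkContext
      connectionFactory,  // your JdbcRDD.ConnectionFactory
      "SELECT DATA FROM FOO WHERE ? <= ID AND ID <= ?",
      1L,                 // lowerBound (or query MIN(ID) the same way)
      upperBound,         // actual max ID, so partitions cover the real range
      numPartitions,
      mapRow);            // your Function<ResultSet, Integer>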

On Thu, Feb 19, 2015 at 8:57 AM, Dmitry Goldenberg <dgoldenberg...@gmail.com
> wrote:

> Yup, I did see that. Good point though, Cody. The mismatch was happening
> for me when I was trying to get the 'new JdbcRDD' approach going. Once I
> switched to the 'create' method, things are working just fine. I was also
> able to refactor the 'get connection' logic into a 'DbConnection implements
> JdbcRDD.ConnectionFactory' class, and my 'map row' class is still 'MapRow
> implements org.apache.spark.api.java.function.Function<ResultSet, Row>'.
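>
> In outline, the refactored classes look something like this (a simplified
> sketch; JDBC_URL and friends stand in for my actual settings, and the row
> mapping is stubbed out):
>
>   public class DbConnection implements JdbcRDD.ConnectionFactory {
>     public Connection getConnection() throws SQLException {
>       return DriverManager.getConnection(JDBC_URL, JDBC_USER, JDBC_PASSWORD);
>     }
>   }
>
>   public class MapRow implements org.apache.spark.api.java.function.Function<ResultSet, Row> {
>     public Row call(ResultSet rs) throws Exception {
>       return null; // build a Row (or your own POJO) from the ResultSet here
>     }
>   }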
>
> This works fine and makes the driver program tighter. Of course, my next
> question is how to work with the lower and upper bound parameters. What if
> I don't know what the min and max ID values are and just want to extract
> all of the data from the table? What should the params be, if that's even
> supported? And furthermore, what if the primary key on the table is not
> numeric, or if there's no primary key at all?
>
> The method works fine with lowerBound=0 and upperBound=1000000, for
> example, but there doesn't seem to be a way to say 'no upper bound' (-1
> didn't work).
>
> On Wed, Feb 18, 2015 at 11:59 PM, Cody Koeninger <c...@koeninger.org>
> wrote:
>
>> Look at the definition of JdbcRDD.create:
>>
>>   def create[T](
>>       sc: JavaSparkContext,
>>       connectionFactory: ConnectionFactory,
>>       sql: String,
>>       lowerBound: Long,
>>       upperBound: Long,
>>       numPartitions: Int,
>>       mapRow: JFunction[ResultSet, T]): JavaRDD[T] = {
>>
>> JFunction here is the interface
>> org.apache.spark.api.java.function.Function, not Scala's Function0.
>>
>> Likewise, ConnectionFactory is an interface defined inside JdbcRDD, not
>> Scala's Function0.
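>>
>> Roughly (paraphrasing from memory, not copied from the Spark source):
>>
>>   // org.apache.spark.api.java.function.Function
>>   public interface Function<T1, R> extends Serializable {
>>     R call(T1 v1) throws Exception;
>>   }
>>
>>   // org.apache.spark.rdd.JdbcRDD.ConnectionFactory
>>   public interface ConnectionFactory extends Serializable {
>>     Connection getConnection() throws Exception;
>>   }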
>>
>> On Wed, Feb 18, 2015 at 4:50 PM, Dmitry Goldenberg <
>> dgoldenberg...@gmail.com> wrote:
>>
>>> That's exactly what I was doing. However, I ran into runtime issues with
>>> that approach. For instance, I had a
>>>
>>>   public class DbConnection extends AbstractFunction0<Connection>
>>> implements Serializable
>>>
>>> I got a runtime error from Spark complaining that DbConnection wasn't an
>>> instance of scala.Function0.
>>>
>>> I also had a
>>>
>>>   public class MapRow extends
>>> scala.runtime.AbstractFunction1<java.sql.ResultSet, Row> implements
>>> Serializable
>>>
>>> with which I seemed to have more luck.
>>>
>>> On Wed, Feb 18, 2015 at 5:32 PM, Cody Koeninger <c...@koeninger.org>
>>> wrote:
>>>
>>>> Can't you implement the
>>>>
>>>> org.apache.spark.api.java.function.Function
>>>>
>>>> interface and pass an instance of that to JdbcRDD.create?
>>>>
>>>> On Wed, Feb 18, 2015 at 3:48 PM, Dmitry Goldenberg <
>>>> dgoldenberg...@gmail.com> wrote:
>>>>
>>>>> Cody, you were right, I had a copy-and-paste snag where I ended up
>>>>> with a vanilla SparkContext rather than a Java one. I also had to *not*
>>>>> use my function subclasses and instead use anonymous inner classes for
>>>>> the Function arguments, and that got things working. I'm following
>>>>> the JdbcRDD.create approach from JavaJdbcRDDSuite.java basically verbatim.
>>>>>
>>>>> Is there a clean way to refactor out the custom Function classes, such
>>>>> as the one for getting a db connection or the one mapping ResultSet data
>>>>> to your own POJOs, rather than doing it all inline?
>>>>>
>>>>>
>>>>> On Wed, Feb 18, 2015 at 1:52 PM, Cody Koeninger <c...@koeninger.org>
>>>>> wrote:
>>>>>
>>>>>> Is sc there a SparkContext or a JavaSparkContext? The compilation
>>>>>> error seems to indicate the former, but JdbcRDD.create expects the latter.
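>>>>>>
>>>>>> In other words, something like this (just a sketch; the app name is
>>>>>> arbitrary):
>>>>>>
>>>>>>   SparkConf conf = new SparkConf().setAppName("jdbc-test");
>>>>>>   JavaSparkContext jsc = new JavaSparkContext(conf);
>>>>>>   // pass jsc (not a plain SparkContext) to JdbcRDD.create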
>>>>>>
>>>>>> On Wed, Feb 18, 2015 at 12:30 PM, Dmitry Goldenberg <
>>>>>> dgoldenberg...@gmail.com> wrote:
>>>>>>
>>>>>>> I have tried that as well, but I get a compile error:
>>>>>>>
>>>>>>> [ERROR] ...SparkProto.java:[105,39] error: no suitable method found
>>>>>>> for create(SparkContext,<anonymous
>>>>>>> ConnectionFactory>,String,int,int,int,<anonymous
>>>>>>> Function<ResultSet,Integer>>)
>>>>>>>
>>>>>>> The code is a copy and paste:
>>>>>>>
>>>>>>>     JavaRDD<Integer> jdbcRDD = JdbcRDD.create(
>>>>>>>           sc,
>>>>>>>           new JdbcRDD.ConnectionFactory() {
>>>>>>>             public Connection getConnection() throws SQLException {
>>>>>>>               return
>>>>>>> DriverManager.getConnection("jdbc:derby:target/JavaJdbcRDDSuiteDb");
>>>>>>>             }
>>>>>>>           },
>>>>>>>           "SELECT DATA FROM FOO WHERE ? <= ID AND ID <= ?",
>>>>>>>           1, 100, 1,
>>>>>>>           new Function<ResultSet, Integer>() {
>>>>>>>             public Integer call(ResultSet r) throws Exception {
>>>>>>>               return r.getInt(1);
>>>>>>>             }
>>>>>>>           }
>>>>>>>         );
>>>>>>>
>>>>>>> The other thing I tried was to define a static class locally for
>>>>>>> GetConnection and use the JdbcRDD constructor directly. This got around
>>>>>>> the compile issues but blew up at runtime with "NoClassDefFoundError:
>>>>>>> scala/runtime/AbstractFunction0"!
>>>>>>>
>>>>>>> JdbcRDD<Row> jdbcRDD = new JdbcRDD<Row>(
>>>>>>>     sc,
>>>>>>>     (AbstractFunction0<Connection>) new DbConn(), // had to cast or I'd get a compile error
>>>>>>>     SQL_QUERY,
>>>>>>>     0L,
>>>>>>>     1000L,
>>>>>>>     10,
>>>>>>>     new MapRow(),
>>>>>>>     ROW_CLASS_TAG);
>>>>>>> // DbConn is defined as: public static class DbConn extends
>>>>>>> // AbstractFunction0<Connection> implements Serializable
>>>>>>>
>>>>>>> On Wed, Feb 18, 2015 at 1:20 PM, Cody Koeninger <c...@koeninger.org>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> That test I linked,
>>>>>>>>
>>>>>>>> https://github.com/apache/spark/blob/v1.2.1/core/src/test/java/org/apache/spark/JavaJdbcRDDSuite.java#L90
>>>>>>>>
>>>>>>>> is calling the static method JdbcRDD.create, not new JdbcRDD. Is
>>>>>>>> that what you tried doing?
>>>>>>>>
>>>>>>>> On Wed, Feb 18, 2015 at 12:00 PM, Dmitry Goldenberg <
>>>>>>>> dgoldenberg...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Thanks, Cody. Yes, I originally started off by looking at that, but
>>>>>>>>> I get a compile error if I try to use that approach: "constructor
>>>>>>>>> JdbcRDD in class JdbcRDD<T> cannot be applied to given types". Not to
>>>>>>>>> mention that JavaJdbcRDDSuite somehow manages to not pass in the class
>>>>>>>>> tag (the last argument).
>>>>>>>>>
>>>>>>>>> I wonder if it's a JDK version issue; I'm using 1.7.
>>>>>>>>>
>>>>>>>>> So I've got this, which doesn't compile
>>>>>>>>>
>>>>>>>>> JdbcRDD<Row> jdbcRDD = new JdbcRDD<Row>(
>>>>>>>>>     new SparkContext(conf),
>>>>>>>>>     new JdbcRDD.ConnectionFactory() {
>>>>>>>>>       public Connection getConnection() throws SQLException {
>>>>>>>>>         Connection conn = null;
>>>>>>>>>         try {
>>>>>>>>>           Class.forName(JDBC_DRIVER);
>>>>>>>>>           conn = DriverManager.getConnection(JDBC_URL, JDBC_USER, JDBC_PASSWORD);
>>>>>>>>>         } catch (ClassNotFoundException ex) {
>>>>>>>>>           throw new RuntimeException("Error while loading JDBC driver.", ex);
>>>>>>>>>         }
>>>>>>>>>         return conn;
>>>>>>>>>       }
>>>>>>>>>     },
>>>>>>>>>     "SELECT * FROM EMPLOYEES",
>>>>>>>>>     0L,
>>>>>>>>>     1000L,
>>>>>>>>>     10,
>>>>>>>>>     new Function<ResultSet, Row>() {
>>>>>>>>>       public Row call(ResultSet r) throws Exception {
>>>>>>>>>         return null; // have some actual logic here...
>>>>>>>>>       }
>>>>>>>>>     },
>>>>>>>>>     scala.reflect.ClassManifestFactory$.MODULE$.fromClass(Row.class));
>>>>>>>>>
>>>>>>>>> The other approach was mimicking the DbConnection class from this
>>>>>>>>> post:
>>>>>>>>> http://www.sparkexpert.com/2015/01/02/load-database-data-into-spark-using-jdbcrdd-in-java/.
>>>>>>>>> It got around the compilation issues, but then I got the runtime
>>>>>>>>> error where Spark wouldn't recognize the db connection class as a
>>>>>>>>> scala.Function0.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Wed, Feb 18, 2015 at 12:37 PM, Cody Koeninger <
>>>>>>>>> c...@koeninger.org> wrote:
>>>>>>>>>
>>>>>>>>>> Take a look at
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> https://github.com/apache/spark/blob/v1.2.1/core/src/test/java/org/apache/spark/JavaJdbcRDDSuite.java
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Wed, Feb 18, 2015 at 11:14 AM, dgoldenberg <
>>>>>>>>>> dgoldenberg...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> I'm reading data from a database using JdbcRDD, in Java, and I have an
>>>>>>>>>>> implementation of Function0<Connection> whose instance I supply as the
>>>>>>>>>>> 'getConnection' parameter to the JdbcRDD constructor. It compiles fine.
>>>>>>>>>>>
>>>>>>>>>>> The definition of the class/function is as follows:
>>>>>>>>>>>
>>>>>>>>>>>   public class GetDbConnection extends AbstractFunction0<Connection>
>>>>>>>>>>>       implements Serializable
>>>>>>>>>>>
>>>>>>>>>>> where scala.runtime.AbstractFunction0 extends scala.Function0.
>>>>>>>>>>>
>>>>>>>>>>> At runtime, I get the exception below. Does anyone have an idea as to
>>>>>>>>>>> how to resolve this or work around it? Thanks.
>>>>>>>>>>>
>>>>>>>>>>> I'm running Spark 1.2.1 built for Hadoop 2.4.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Exception in thread "main" org.apache.spark.SparkException: Job aborted due
>>>>>>>>>>> to stage failure: Task 3 in stage 0.0 failed 1 times, most recent failure:
>>>>>>>>>>> Lost task 3.0 in stage 0.0 (TID 3, localhost): java.lang.ClassCastException:
>>>>>>>>>>> cannot assign instance of com.kona.motivis.spark.proto.GetDbConnection to field
>>>>>>>>>>> org.apache.spark.rdd.JdbcRDD.org$apache$spark$rdd$JdbcRDD$$getConnection of
>>>>>>>>>>> type scala.Function0 in instance of org.apache.spark.rdd.JdbcRDD
>>>>>>>>>>>   at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2083)
>>>>>>>>>>>   at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1261)
>>>>>>>>>>>   at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1996)
>>>>>>>>>>>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>>>>>>>>>>>   at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>>>>>>>>>>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>>>>>>>>>>   at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>>>>>>>>>>>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>>>>>>>>>>>   at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>>>>>>>>>>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>>>>>>>>>>   at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>>>>>>>>>>>   at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
>>>>>>>>>>>   at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87)
>>>>>>>>>>>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:57)
>>>>>>>>>>>   at org.apache.spark.scheduler.Task.run(Task.scala:56)
>>>>>>>>>>>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
>>>>>>>>>>>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>>>>>>>>>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>>>>>>>>>   at java.lang.Thread.run(Thread.java:744)
>>>>>>>>>>>
>>>>>>>>>>> Driver stacktrace:
>>>>>>>>>>>   at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214)
>>>>>>>>>>>   at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203)
>>>>>>>>>>>   at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202)
>>>>>>>>>>>   at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>>>>>>>>>>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>>>>>>>>>>   at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202)
>>>>>>>>>>>   at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
>>>>>>>>>>>   at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
>>>>>>>>>>>   at scala.Option.foreach(Option.scala:236)
>>>>>>>>>>>   at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696)
>>>>>>>>>>>   at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1420)
>>>>>>>>>>>   at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
>>>>>>>>>>>   at org.apache.spark.scheduler.DAGSchedulerEventProcessActor.aroundReceive(DAGScheduler.scala:1375)
>>>>>>>>>>>   at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>>>>>>>>>>>   at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>>>>>>>>>>>   at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
>>>>>>>>>>>   at akka.dispatch.Mailbox.run(Mailbox.scala:220)
>>>>>>>>>>>   at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
>>>>>>>>>>>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>>>>>>>>>   at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>>>>>>>>>   at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>>>>>>>>>   at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
