Yup, I did see that. Good point though, Cody. The mismatch was happening
for me when I was trying to get the 'new JdbcRDD' approach going. Once I
switched to the 'create' method, things are working just fine. I was also
able to refactor the 'get connection' logic into a 'DbConnection implements
JdbcRDD.ConnectionFactory' class, and my 'map row' class is still 'MapRow
implements org.apache.spark.api.java.function.Function<ResultSet, Row>'.
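
In case it helps anyone following along, here's roughly what the refactored
pieces look like. This is only a sketch: the JDBC URL, the query, and the
Integer mapping (borrowed from JavaJdbcRDDSuite rather than my actual Row
mapping) are placeholders for my real setup.

  import java.sql.Connection;
  import java.sql.DriverManager;
  import java.sql.ResultSet;
  import java.sql.SQLException;

  import org.apache.spark.SparkConf;
  import org.apache.spark.api.java.JavaRDD;
  import org.apache.spark.api.java.JavaSparkContext;
  import org.apache.spark.api.java.function.Function;
  import org.apache.spark.rdd.JdbcRDD;

  public class JdbcRddExample {

    // The 'get connection' logic, refactored out of the driver program.
    // JdbcRDD.ConnectionFactory is itself Serializable as far as I can tell,
    // so no extra 'implements Serializable' is needed here.
    public static class DbConnection implements JdbcRDD.ConnectionFactory {
      public Connection getConnection() throws SQLException {
        // Placeholder URL; substitute your own driver/credentials.
        return DriverManager.getConnection("jdbc:derby:target/exampledb");
      }
    }

    // The 'map row' logic, refactored out of the driver program.
    public static class MapRow implements Function<ResultSet, Integer> {
      public Integer call(ResultSet rs) throws Exception {
        return rs.getInt(1);
      }
    }

    public static void main(String[] args) {
      // Local master just for the sketch.
      JavaSparkContext sc = new JavaSparkContext(
          new SparkConf().setAppName("JdbcRddExample").setMaster("local[*]"));

      JavaRDD<Integer> rdd = JdbcRDD.create(
          sc,
          new DbConnection(),
          "SELECT DATA FROM FOO WHERE ? <= ID AND ID <= ?",
          1, 100, 1,   // lowerBound, upperBound, numPartitions
          new MapRow());

      System.out.println("count = " + rdd.count());
      sc.stop();
    }
  }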

This works fine and makes the driver program tighter. Of course, my next
question is how to work with the lower and upper bound parameters. What if
I don't know the min and max ID values and just want to extract all of the
data from the table? What should the params be then, if that's even
supported? And furthermore, what if the primary key on the table is not
numeric, or if there's no primary key at all?

The method works fine with lowerBound=0 and upperBound=1000000, for
example, but there doesn't seem to be a way to say 'no upper bound' (-1
didn't work).
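
One workaround I'm considering (not sure if it's the intended way) is to
look up the actual bounds with a quick MIN/MAX query over the key column
before building the RDD, assuming the key is numeric. A rough fragment,
reusing the DbConnection/MapRow classes from the sketch above, assuming
'sc' is a JavaSparkContext, java.sql.Statement is imported, and the
surrounding method throws SQLException; table and column names are
placeholders:

  // Determine the real bounds first with plain JDBC.
  long lowerBound = 0L;
  long upperBound = 0L;
  try (Connection conn = new DbConnection().getConnection();
       Statement stmt = conn.createStatement();
       ResultSet rs = stmt.executeQuery("SELECT MIN(ID), MAX(ID) FROM FOO")) {
    if (rs.next()) {
      lowerBound = rs.getLong(1);
      upperBound = rs.getLong(2);
    }
  }

  // Then hand those bounds to JdbcRDD.create so the whole table is covered.
  // numPartitions of 10 is arbitrary here.
  JavaRDD<Integer> rdd = JdbcRDD.create(
      sc,
      new DbConnection(),
      "SELECT DATA FROM FOO WHERE ? <= ID AND ID <= ?",
      lowerBound, upperBound, 10,
      new MapRow());

That still leaves the non-numeric / no-primary-key case open, though.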

On Wed, Feb 18, 2015 at 11:59 PM, Cody Koeninger <c...@koeninger.org> wrote:

> Look at the definition of JdbcRDD.create:
>
>   def create[T](
>       sc: JavaSparkContext,
>       connectionFactory: ConnectionFactory,
>       sql: String,
>       lowerBound: Long,
>       upperBound: Long,
>       numPartitions: Int,
>       mapRow: JFunction[ResultSet, T]): JavaRDD[T] = {
>
> JFunction here is the interface org.apache.spark.api.java.function.Function,
> not a Scala function type.
>
> Likewise, ConnectionFactory is an interface defined inside JdbcRDD, not
> scala.Function0.
>
> On Wed, Feb 18, 2015 at 4:50 PM, Dmitry Goldenberg <
> dgoldenberg...@gmail.com> wrote:
>
>> That's exactly what I was doing. However, I ran into runtime issues with
>> doing that. For instance, I had a
>>
>>   public class DbConnection extends AbstractFunction0<Connection>
>>       implements Serializable
>>
>> I got a runtime error from Spark complaining that DbConnection wasn't an
>> instance of scala.Function0.
>>
>> I also had a
>>
>>   public class MapRow extends scala.runtime.AbstractFunction1<java.sql.ResultSet, Row>
>>       implements Serializable
>>
>> with which I seemed to have more luck.
>>
>> On Wed, Feb 18, 2015 at 5:32 PM, Cody Koeninger <c...@koeninger.org>
>> wrote:
>>
>>> Can't you implement the
>>>
>>> org.apache.spark.api.java.function.Function
>>>
>>> interface and pass an instance of that to JdbcRDD.create ?
>>>
>>> On Wed, Feb 18, 2015 at 3:48 PM, Dmitry Goldenberg <
>>> dgoldenberg...@gmail.com> wrote:
>>>
>>>> Cody, you were right, I had a copy-and-paste snag where I ended up with
>>>> a vanilla SparkContext rather than a Java one. I also had to *not* use my
>>>> function subclasses and instead use anonymous inner classes for the
>>>> Function stuff, and that got things working. I'm now following
>>>> the JdbcRDD.create approach from JavaJdbcRDDSuite.java basically verbatim.
>>>>
>>>> Is there a clean way to refactor out the custom Function classes, such
>>>> as the one for getting a db connection or mapping ResultSet data to your
>>>> own POJOs, rather than doing it all inline?
>>>>
>>>>
>>>> On Wed, Feb 18, 2015 at 1:52 PM, Cody Koeninger <c...@koeninger.org>
>>>> wrote:
>>>>
>>>>> Is sc there a SparkContext or a JavaSparkContext?  The compilation
>>>>> error seems to indicate the former, but JdbcRDD.create expects the latter.
>>>>>
>>>>> On Wed, Feb 18, 2015 at 12:30 PM, Dmitry Goldenberg <
>>>>> dgoldenberg...@gmail.com> wrote:
>>>>>
>>>>>> I have tried that as well; I get a compile error --
>>>>>>
>>>>>> [ERROR] ...SparkProto.java:[105,39] error: no suitable method found
>>>>>> for create(SparkContext,<anonymous
>>>>>> ConnectionFactory>,String,int,int,int,<anonymous
>>>>>> Function<ResultSet,Integer>>)
>>>>>>
>>>>>> The code is a copy and paste:
>>>>>>
>>>>>>     JavaRDD<Integer> jdbcRDD = JdbcRDD.create(
>>>>>>           sc,
>>>>>>           new JdbcRDD.ConnectionFactory() {
>>>>>>             public Connection getConnection() throws SQLException {
>>>>>>               return
>>>>>> DriverManager.getConnection("jdbc:derby:target/JavaJdbcRDDSuiteDb");
>>>>>>             }
>>>>>>           },
>>>>>>           "SELECT DATA FROM FOO WHERE ? <= ID AND ID <= ?",
>>>>>>           1, 100, 1,
>>>>>>           new Function<ResultSet, Integer>() {
>>>>>>             public Integer call(ResultSet r) throws Exception {
>>>>>>               return r.getInt(1);
>>>>>>             }
>>>>>>           }
>>>>>>         );
>>>>>>
>>>>>> The other thing I tried was to define a static class locally for
>>>>>> GetConnection and use the JdbcRDD constructor. That got around the
>>>>>> compile issues but blew up at runtime with "NoClassDefFoundError:
>>>>>> scala/runtime/AbstractFunction0"!
>>>>>>
>>>>>> JdbcRDD<Row> jdbcRDD = new JdbcRDD<Row>(
>>>>>>     sc,
>>>>>>     (AbstractFunction0<Connection>) new DbConn(), // had to cast to avoid a compile error
>>>>>>     SQL_QUERY,
>>>>>>     0L,
>>>>>>     1000L,
>>>>>>     10,
>>>>>>     new MapRow(),
>>>>>>     ROW_CLASS_TAG);
>>>>>> // DbConn is defined as: public static class DbConn extends
>>>>>> //   AbstractFunction0<Connection> implements Serializable
>>>>>>
>>>>>> On Wed, Feb 18, 2015 at 1:20 PM, Cody Koeninger <c...@koeninger.org>
>>>>>> wrote:
>>>>>>
>>>>>>> That test I linked
>>>>>>>
>>>>>>>
>>>>>>> https://github.com/apache/spark/blob/v1.2.1/core/src/test/java/org/apache/spark/JavaJdbcRDDSuite.java#L90
>>>>>>>
>>>>>>> is calling a static method JdbcRDD.create, not new JdbcRDD.  Is that
>>>>>>> what you tried doing?
>>>>>>>
>>>>>>> On Wed, Feb 18, 2015 at 12:00 PM, Dmitry Goldenberg <
>>>>>>> dgoldenberg...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Thanks, Cody. Yes, I originally started off by looking at that, but
>>>>>>>> I get a compile error if I try to use that approach: constructor JdbcRDD
>>>>>>>> in class JdbcRDD<T> cannot be applied to given types. Not to mention that
>>>>>>>> JavaJdbcRDDSuite somehow manages not to pass in the class tag (the last
>>>>>>>> argument).
>>>>>>>>
>>>>>>>> I wonder if it's a JDK version issue; I'm using 1.7.
>>>>>>>>
>>>>>>>> So I've got this, which doesn't compile:
>>>>>>>>
>>>>>>>> JdbcRDD<Row> jdbcRDD = new JdbcRDD<Row>(
>>>>>>>>     new SparkContext(conf),
>>>>>>>>     new JdbcRDD.ConnectionFactory() {
>>>>>>>>       public Connection getConnection() throws SQLException {
>>>>>>>>         Connection conn = null;
>>>>>>>>         try {
>>>>>>>>           Class.forName(JDBC_DRIVER);
>>>>>>>>           conn = DriverManager.getConnection(JDBC_URL, JDBC_USER, JDBC_PASSWORD);
>>>>>>>>         } catch (ClassNotFoundException ex) {
>>>>>>>>           throw new RuntimeException("Error while loading JDBC driver.", ex);
>>>>>>>>         }
>>>>>>>>         return conn;
>>>>>>>>       }
>>>>>>>>     },
>>>>>>>>     "SELECT * FROM EMPLOYEES",
>>>>>>>>     0L,
>>>>>>>>     1000L,
>>>>>>>>     10,
>>>>>>>>     new Function<ResultSet, Row>() {
>>>>>>>>       public Row call(ResultSet r) throws Exception {
>>>>>>>>         return null; // have some actual logic here...
>>>>>>>>       }
>>>>>>>>     },
>>>>>>>>     scala.reflect.ClassManifestFactory$.MODULE$.fromClass(Row.class));
>>>>>>>>
>>>>>>>> The other approach was mimicking the DbConnection class from this
>>>>>>>> post:
>>>>>>>> http://www.sparkexpert.com/2015/01/02/load-database-data-into-spark-using-jdbcrdd-in-java/.
>>>>>>>> It got around the compilation issues, but then I got the runtime
>>>>>>>> error where Spark wouldn't recognize the db connection class as a
>>>>>>>> scala.Function0.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Wed, Feb 18, 2015 at 12:37 PM, Cody Koeninger <
>>>>>>>> c...@koeninger.org> wrote:
>>>>>>>>
>>>>>>>>> Take a look at
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> https://github.com/apache/spark/blob/v1.2.1/core/src/test/java/org/apache/spark/JavaJdbcRDDSuite.java
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Wed, Feb 18, 2015 at 11:14 AM, dgoldenberg <
>>>>>>>>> dgoldenberg...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> I'm reading data from a database using JdbcRDD, in Java, and I have
>>>>>>>>>> an implementation of Function0<Connection> whose instance I supply as
>>>>>>>>>> the 'getConnection' parameter to the JdbcRDD constructor. It compiles
>>>>>>>>>> fine.
>>>>>>>>>>
>>>>>>>>>> The definition of the class/function is as follows:
>>>>>>>>>>
>>>>>>>>>>   public class GetDbConnection extends AbstractFunction0<Connection>
>>>>>>>>>>       implements Serializable
>>>>>>>>>>
>>>>>>>>>> where scala.runtime.AbstractFunction0 extends scala.Function0.
>>>>>>>>>>
>>>>>>>>>> At runtime, I get the exception below. Does anyone have an idea as to
>>>>>>>>>> how to resolve this or work around it? Thanks.
>>>>>>>>>>
>>>>>>>>>> I'm running Spark 1.2.1 built for Hadoop 2.4.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Exception in thread "main" org.apache.spark.SparkException: Job
>>>>>>>>>> aborted due
>>>>>>>>>> to stage failure: Task 3 in stage 0.0 failed 1 times, most recent
>>>>>>>>>> failure:
>>>>>>>>>> Lost task 3.0 in stage 0.0 (TID 3, localhost):
>>>>>>>>>> java.lang.ClassCastException:
>>>>>>>>>> cannot assign instance of
>>>>>>>>>> com.kona.motivis.spark.proto.GetDbConnection to
>>>>>>>>>> field
>>>>>>>>>> org.apache.spark.rdd.JdbcRDD.org$apache$spark$rdd$JdbcRDD$$getConnection
>>>>>>>>>> of
>>>>>>>>>> type scala.Function0 in instance of org.apache.spark.rdd.JdbcRDD
>>>>>>>>>>         at
>>>>>>>>>>
>>>>>>>>>> java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2083)
>>>>>>>>>>         at
>>>>>>>>>>
>>>>>>>>>> java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1261)
>>>>>>>>>>         at
>>>>>>>>>>
>>>>>>>>>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1996)
>>>>>>>>>>         at
>>>>>>>>>>
>>>>>>>>>> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>>>>>>>>>>         at
>>>>>>>>>>
>>>>>>>>>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>>>>>>>>>         at
>>>>>>>>>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>>>>>>>>>         at
>>>>>>>>>>
>>>>>>>>>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>>>>>>>>>>         at
>>>>>>>>>>
>>>>>>>>>> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>>>>>>>>>>         at
>>>>>>>>>>
>>>>>>>>>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>>>>>>>>>         at
>>>>>>>>>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>>>>>>>>>         at
>>>>>>>>>> java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>>>>>>>>>>         at
>>>>>>>>>>
>>>>>>>>>> org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
>>>>>>>>>>         at
>>>>>>>>>>
>>>>>>>>>> org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87)
>>>>>>>>>>         at
>>>>>>>>>> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:57)
>>>>>>>>>>         at org.apache.spark.scheduler.Task.run(Task.scala:56)
>>>>>>>>>>         at
>>>>>>>>>>
>>>>>>>>>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
>>>>>>>>>>         at
>>>>>>>>>>
>>>>>>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>>>>>>>>         at
>>>>>>>>>>
>>>>>>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>>>>>>>>         at java.lang.Thread.run(Thread.java:744)
>>>>>>>>>>
>>>>>>>>>> Driver stacktrace:
>>>>>>>>>>         at
>>>>>>>>>> org.apache.spark.scheduler.DAGScheduler.org
>>>>>>>>>> $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214)
>>>>>>>>>>         at
>>>>>>>>>>
>>>>>>>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203)
>>>>>>>>>>         at
>>>>>>>>>>
>>>>>>>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202)
>>>>>>>>>>         at
>>>>>>>>>>
>>>>>>>>>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>>>>>>>>>         at
>>>>>>>>>> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>>>>>>>>>         at
>>>>>>>>>>
>>>>>>>>>> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202)
>>>>>>>>>>         at
>>>>>>>>>>
>>>>>>>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
>>>>>>>>>>         at
>>>>>>>>>>
>>>>>>>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
>>>>>>>>>>         at scala.Option.foreach(Option.scala:236)
>>>>>>>>>>         at
>>>>>>>>>>
>>>>>>>>>> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696)
>>>>>>>>>>         at
>>>>>>>>>>
>>>>>>>>>> org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1420)
>>>>>>>>>>         at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
>>>>>>>>>>         at
>>>>>>>>>>
>>>>>>>>>> org.apache.spark.scheduler.DAGSchedulerEventProcessActor.aroundReceive(DAGScheduler.scala:1375)
>>>>>>>>>>         at
>>>>>>>>>> akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>>>>>>>>>>         at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>>>>>>>>>>         at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
>>>>>>>>>>         at akka.dispatch.Mailbox.run(Mailbox.scala:220)
>>>>>>>>>>         at
>>>>>>>>>>
>>>>>>>>>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
>>>>>>>>>>         at
>>>>>>>>>>
>>>>>>>>>> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>>>>>>>>         at
>>>>>>>>>>
>>>>>>>>>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>>>>>>>>         at
>>>>>>>>>>
>>>>>>>>>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>>>>>>>>         at
>>>>>>>>>>
>>>>>>>>>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
