Hi Josh,

That worked! Thank you so much! (I can't believe it was something so obvious 
;) )

If you care about such a thing, you could post your answer to my question here and claim the bounty: 
http://stackoverflow.com/questions/30639659/apache-phoenix-4-3-1-and-4-4-0-hbase-0-98-on-spark-1-3-1-classnotfoundexceptio

Have a great day!

Cheers,
Jeroen

On Wednesday 10 June 2015 08:58:02 Josh Mahonin wrote:
> Hi Jeroen,
> 
> Rather than bundling the Phoenix client JAR with your app, are you able to
> include it from a static location, either via SPARK_CLASSPATH or by setting
> the conf values below? (I use SPARK_CLASSPATH myself, though it's deprecated.)
> 
>   spark.driver.extraClassPath
>   spark.executor.extraClassPath
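> 
> For example, a minimal sketch (the JAR path below is purely illustrative of
> wherever the Phoenix client JAR happens to live on your nodes):
> 
>   # spark-defaults.conf
>   spark.driver.extraClassPath     /opt/phoenix/phoenix-4.4.0-HBase-0.98-client.jar
>   spark.executor.extraClassPath   /opt/phoenix/phoenix-4.4.0-HBase-0.98-client.jar
> 
>   # or the deprecated route, e.g. in spark-env.sh:
>   export SPARK_CLASSPATH=/opt/phoenix/phoenix-4.4.0-HBase-0.98-client.jar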
> 
> Josh
> 
> On Wed, Jun 10, 2015 at 4:11 AM, Jeroen Vlek <j.v...@anchormen.nl> wrote:
> > Hi Josh,
> > 
> > Thank you for your effort. Looking at your code, I feel that mine is
> > semantically the same, just written in Java. The dependencies in the pom.xml
> > all have the 'provided' scope. The job is submitted as follows:
> > 
> > $ rm spark.log && MASTER=spark://maprdemo:7077 \
> >   /opt/mapr/spark/spark-1.3.1/bin/spark-submit \
> >   --jars /home/mapr/projects/customer/lib/spark-streaming-kafka_2.10-1.3.1.jar,/home/mapr/projects/customer/lib/kafka_2.10-0.8.1.1.jar,/home/mapr/projects/customer/lib/zkclient-0.3.jar,/home/mapr/projects/customer/lib/metrics-core-3.1.0.jar,/home/mapr/projects/customer/lib/metrics-core-2.2.0.jar,lib/spark-sql_2.10-1.3.1.jar,/opt/mapr/phoenix/phoenix-4.4.0-HBase-0.98-bin/phoenix-4.4.0-HBase-0.98-client.jar \
> >   --class nl.work.kafkastreamconsumer.phoenix.KafkaPhoenixConnector \
> >   KafkaStreamConsumer.jar maprdemo:5181 0 topic jdbc:phoenix:maprdemo:5181 true
> > 
> > The spark-defaults.conf has been reverted to its defaults (i.e. no
> > userClassPathFirst). In the catch block of the Phoenix connection setup, the
> > class path is printed by recursively iterating over the class loaders. The
> > first one already lists the phoenix-client JAR [1]. It also seems very
> > unlikely to be a bug in Spark or Phoenix, given that your proof of concept
> > just works.
> > 
> > So if the JAR that contains the offending class is known to the class loader,
> > that might indicate there's a second JAR providing the same class but with a
> > different version, right? Yet the only Phoenix JAR in the whole class path
> > hierarchy is the aforementioned phoenix-client JAR. Furthermore, I googled the
> > class in question, ClientRpcControllerFactory, and it really only exists in
> > the Phoenix project. We're not talking about some low-level AOP Alliance stuff
> > here ;)
> > 
> > Maybe I'm missing some fundamental class loading knowledge, in which case I'd
> > be very happy to be enlightened. This all seems very strange.
> > 
> > Cheers,
> > Jeroen
> > 
> > [1]
> > [file:/opt/mapr/spark/spark-1.3.1/tmp/app-20150610010512-0001/0/./spark-streaming-kafka_2.10-1.3.1.jar,
> >  file:/opt/mapr/spark/spark-1.3.1/tmp/app-20150610010512-0001/0/./kafka_2.10-0.8.1.1.jar,
> >  file:/opt/mapr/spark/spark-1.3.1/tmp/app-20150610010512-0001/0/./zkclient-0.3.jar,
> >  file:/opt/mapr/spark/spark-1.3.1/tmp/app-20150610010512-0001/0/./phoenix-4.4.0-HBase-0.98-client.jar,
> >  file:/opt/mapr/spark/spark-1.3.1/tmp/app-20150610010512-0001/0/./spark-sql_2.10-1.3.1.jar,
> >  file:/opt/mapr/spark/spark-1.3.1/tmp/app-20150610010512-0001/0/./metrics-core-3.1.0.jar,
> >  file:/opt/mapr/spark/spark-1.3.1/tmp/app-20150610010512-0001/0/./KafkaStreamConsumer.jar,
> >  file:/opt/mapr/spark/spark-1.3.1/tmp/app-20150610010512-0001/0/./metrics-core-2.2.0.jar]
> > 
> > On Tuesday, June 09, 2015 11:18:08 AM Josh Mahonin wrote:
> > > This may or may not be helpful for your classpath issues, but I wanted
> > > to
> > > verify that basic functionality worked, so I made a sample app here:
> > > 
> > > https://github.com/jmahonin/spark-streaming-phoenix
> > > 
> > > This consumes events off a Kafka topic using Spark Streaming, and writes
> > > out event counts to Phoenix using the new phoenix-spark functionality:
> > > http://phoenix.apache.org/phoenix_spark.html
> > > 
> > > It's definitely overkill, and it would probably be more efficient to use
> > > the JDBC driver directly, but it serves as a proof of concept.
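> > > 
> > > For what it's worth, the "JDBC driver directly" variant might look roughly
> > > like the sketch below (this is not code from the sample app; the table,
> > > column and ZooKeeper quorum names are placeholders, and it assumes the
> > > Phoenix client JAR is already on the executor classpath):
> > > 
> > >     import java.sql.Connection;
> > >     import java.sql.DriverManager;
> > >     import java.sql.PreparedStatement;
> > >     import java.util.Iterator;
> > > 
> > >     import org.apache.spark.api.java.JavaRDD;
> > >     import org.apache.spark.api.java.function.Function;
> > >     import org.apache.spark.api.java.function.VoidFunction;
> > > 
> > >     // stream is the JavaDStream<String> coming off KafkaUtils.createStream(...)
> > >     stream.foreachRDD(new Function<JavaRDD<String>, Void>() {
> > >         @Override
> > >         public Void call(JavaRDD<String> rdd) throws Exception {
> > >             rdd.foreachPartition(new VoidFunction<Iterator<String>>() {
> > >                 @Override
> > >                 public void call(Iterator<String> events) throws Exception {
> > >                     // One connection per partition, one UPSERT per record.
> > >                     try (Connection conn =
> > >                              DriverManager.getConnection("jdbc:phoenix:zkhost:2181");
> > >                          PreparedStatement upsert = conn.prepareStatement(
> > >                              "UPSERT INTO EVENTS (LINE) VALUES (?)")) {
> > >                         while (events.hasNext()) {
> > >                             upsert.setString(1, events.next());
> > >                             upsert.executeUpdate();
> > >                         }
> > >                         conn.commit(); // Phoenix buffers mutations until commit
> > >                     }
> > >                 }
> > >             });
> > >             return null;
> > >         }
> > >     });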
> > > 
> > > I've only tested this in local mode. To convert it to a full job JAR, I
> > > suspect that keeping all of the Spark and Phoenix dependencies marked as
> > > 'provided' and including the Phoenix client JAR in the Spark classpath
> > > would work as well.
> > > 
> > > Good luck,
> > > 
> > > Josh
> > > 
> > > On Tue, Jun 9, 2015 at 4:40 AM, Jeroen Vlek <j.v...@work.nl> wrote:
> > > > Hi,
> > > > 
> > > > I posted a question regarding Phoenix and Spark Streaming on
> > > > StackOverflow [1]. Please find a copy of the question below the first
> > > > stack trace in this email. I also already contacted the Phoenix mailing
> > > > list and tried the suggestion of setting spark.driver.userClassPathFirst.
> > > > Unfortunately, that only pushed me further into dependency hell, which I
> > > > tried to resolve until I hit a wall with an UnsatisfiedLinkError on Snappy.
> > > > 
> > > > What I am trying to achieve: to save a stream from Kafka into
> > > > Phoenix/HBase via Spark Streaming. I'm using MapR as a platform, and the
> > > > original exception happens both on a 3-node cluster and on the MapR
> > > > Sandbox (a VM for experimentation), in both YARN and stand-alone mode.
> > > > Further experimentation (like the saveAsNewAPIHadoopFile approach below)
> > > > was done only on the sandbox in stand-alone mode.
> > > > 
> > > > Phoenix only supports Spark from 4.4.0 onwards, but I thought I could get
> > > > away with a naive implementation on 4.3.1 that creates a new connection
> > > > for every RDD from the DStream. This resulted in the
> > > > ClassNotFoundException described in [1], so I switched to 4.4.0.
> > > > 
> > > > Unfortunately, the saveToPhoenix method is only available in Scala. So I
> > > > followed the suggestion to try it via the saveAsNewAPIHadoopFile method [2]
> > > > and an example implementation [3], which I adapted to my own needs.
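> > > > 
> > > > For reference, the general shape of that approach (as in [3]) is roughly
> > > > the sketch below. It is simplified: EventWritable stands in for a class
> > > > implementing both Writable and DBWritable for the target table, the table
> > > > and column names are placeholders, and I'm quoting PhoenixMapReduceUtil
> > > > from memory.
> > > > 
> > > >     import org.apache.hadoop.conf.Configuration;
> > > >     import org.apache.hadoop.hbase.HBaseConfiguration;
> > > >     import org.apache.hadoop.hbase.HConstants;
> > > >     import org.apache.hadoop.io.NullWritable;
> > > >     import org.apache.hadoop.mapreduce.Job;
> > > >     import org.apache.phoenix.mapreduce.PhoenixOutputFormat;
> > > >     import org.apache.phoenix.mapreduce.util.PhoenixMapReduceUtil;
> > > > 
> > > >     Configuration conf = HBaseConfiguration.create();
> > > >     conf.set(HConstants.ZOOKEEPER_QUORUM, "maprdemo");
> > > >     conf.set("hbase.zookeeper.property.clientPort", "5181");
> > > >     Job job = Job.getInstance(conf);
> > > >     // Tell Phoenix which table/columns the upserts go to.
> > > >     PhoenixMapReduceUtil.setOutput(job, "MAIL_EVENTS", "ID,SUBJECT,RECEIVED_AT");
> > > > 
> > > >     // upserts: a JavaPairRDD<NullWritable, EventWritable> built from the parsed events
> > > >     upserts.saveAsNewAPIHadoopFile(
> > > >             "/tmp/ignored",   // a path is required by the API; PhoenixOutputFormat writes via JDBC
> > > >             NullWritable.class,
> > > >             EventWritable.class,
> > > >             PhoenixOutputFormat.class,
> > > >             job.getConfiguration());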
> > > > 
> > > > However, 4.4.0 + saveAsNewAPIHadoopFile raises the same
> > > > ClassNotFoundException, just with a slightly different stack trace:
> > > >   java.lang.RuntimeException: java.sql.SQLException: ERROR 103 (08004): Unable to establish connection.
> > > >         at org.apache.phoenix.mapreduce.PhoenixOutputFormat.getRecordWriter(PhoenixOutputFormat.java:58)
> > > >         at org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:995)
> > > >         at org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:979)
> > > >         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
> > > >         at org.apache.spark.scheduler.Task.run(Task.scala:64)
> > > >         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
> > > >         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> > > >         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> > > >         at java.lang.Thread.run(Thread.java:745)
> > > > 
> > > > Caused by: java.sql.SQLException: ERROR 103 (08004): Unable to establish connection.
> > > >         at org.apache.phoenix.exception.SQLExceptionCode$Factory$1.newException(SQLExceptionCode.java:386)
> > > >         at org.apache.phoenix.exception.SQLExceptionInfo.buildException(SQLExceptionInfo.java:145)
> > > >         at org.apache.phoenix.query.ConnectionQueryServicesImpl.openConnection(ConnectionQueryServicesImpl.java:288)
> > > >         at org.apache.phoenix.query.ConnectionQueryServicesImpl.access$300(ConnectionQueryServicesImpl.java:171)
> > > >         at org.apache.phoenix.query.ConnectionQueryServicesImpl$12.call(ConnectionQueryServicesImpl.java:1881)
> > > >         at org.apache.phoenix.query.ConnectionQueryServicesImpl$12.call(ConnectionQueryServicesImpl.java:1860)
> > > >         at org.apache.phoenix.util.PhoenixContextExecutor.call(PhoenixContextExecutor.java:77)
> > > >         at org.apache.phoenix.query.ConnectionQueryServicesImpl.init(ConnectionQueryServicesImpl.java:1860)
> > > >         at org.apache.phoenix.jdbc.PhoenixDriver.getConnectionQueryServices(PhoenixDriver.java:162)
> > > >         at org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.connect(PhoenixEmbeddedDriver.java:131)
> > > >         at org.apache.phoenix.jdbc.PhoenixDriver.connect(PhoenixDriver.java:133)
> > > >         at java.sql.DriverManager.getConnection(DriverManager.java:571)
> > > >         at java.sql.DriverManager.getConnection(DriverManager.java:187)
> > > >         at org.apache.phoenix.mapreduce.util.ConnectionUtil.getConnection(ConnectionUtil.java:92)
> > > >         at org.apache.phoenix.mapreduce.util.ConnectionUtil.getOutputConnection(ConnectionUtil.java:80)
> > > >         at org.apache.phoenix.mapreduce.util.ConnectionUtil.getOutputConnection(ConnectionUtil.java:68)
> > > >         at org.apache.phoenix.mapreduce.PhoenixRecordWriter.<init>(PhoenixRecordWriter.java:49)
> > > >         at org.apache.phoenix.mapreduce.PhoenixOutputFormat.getRecordWriter(PhoenixOutputFormat.java:55)
> > > >         ... 8 more
> > > > 
> > > > Caused by: java.io.IOException: java.lang.reflect.InvocationTargetException
> > > >         at org.apache.hadoop.hbase.client.HConnectionManager.createConnection(HConnectionManager.java:457)
> > > >         at org.apache.hadoop.hbase.client.HConnectionManager.createConnection(HConnectionManager.java:350)
> > > >         at org.apache.phoenix.query.HConnectionFactory$HConnectionFactoryImpl.createConnection(HConnectionFactory.java:47)
> > > >         at org.apache.phoenix.query.ConnectionQueryServicesImpl.openConnection(ConnectionQueryServicesImpl.java:286)
> > > >         ... 23 more
> > > > 
> > > > Caused by: java.lang.reflect.InvocationTargetException
> > > >         at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> > > >         at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
> > > >         at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> > > >         at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
> > > >         at org.apache.hadoop.hbase.client.HConnectionManager.createConnection(HConnectionManager.java:455)
> > > >         ... 26 more
> > > > 
> > > > Caused by: java.lang.UnsupportedOperationException: Unable to find org.apache.hadoop.hbase.ipc.controller.ClientRpcControllerFactory
> > > >         at org.apache.hadoop.hbase.util.ReflectionUtils.instantiateWithCustomCtor(ReflectionUtils.java:36)
> > > >         at org.apache.hadoop.hbase.ipc.RpcControllerFactory.instantiate(RpcControllerFactory.java:56)
> > > >         at org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.<init>(HConnectionManager.java:769)
> > > >         at org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.<init>(HConnectionManager.java:689)
> > > >         ... 31 more
> > > > 
> > > > Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.hbase.ipc.controller.ClientRpcControllerFactory
> > > >         at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
> > > >         at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
> > > >         at java.security.AccessController.doPrivileged(Native Method)
> > > >         at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
> > > >         at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
> > > >         at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
> > > >         at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
> > > >         at java.lang.Class.forName0(Native Method)
> > > >         at java.lang.Class.forName(Class.java:191)
> > > >         at org.apache.hadoop.hbase.util.ReflectionUtils.instantiateWithCustomCtor(ReflectionUtils.java:32)
> > > >         ... 34 more
> > > > 
> > > > Driver stacktrace:
> > > >         at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1204)
> > > >         at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1193)
> > > >         at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
> > > >         at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> > > >         at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> > > >         at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1192)
> > > >         at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
> > > >         at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
> > > >         at scala.Option.foreach(Option.scala:236)
> > > >         at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
> > > >         at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393)
> > > >         at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
> > > >         at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
> > > > 
> > > > 
> > > > ====== Below is my question from StackOverflow ==========
> > > > 
> > > > I'm trying to connect to Phoenix via Spark and I keep getting the
> > > > following exception when opening a connection via the JDBC driver (cut
> > > > 
> > > > for brevity, full stacktrace below):
> > > >     Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.hbase.ipc.controller.ClientRpcControllerFactory
> > > >             at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
> > > >             at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
> > > >             at java.security.AccessController.doPrivileged(Native Method)
> > > > 
> > > > The class in question is provided by the jar called phoenix-
> > > > core-4.3.1.jar (despite it being in the HBase package namespace, I
> > > > guess they need it to integrate with HBase).
> > > > 
> > > > There are numerous questions on SO about ClassNotFoundExceptions
> > > > on Spark and I've tried the fat-jar approach (both with Maven's
> > > > assembly and shade plugins; I've inspected the jars, they **do**
> > > > contain ClientRpcControllerFactory), and I've tried a lean jar while
> > > > specifying the jars on the command line. For the latter, the command
> > > > 
> > > > I used is as follows:
> > > >     /opt/mapr/spark/spark-1.3.1/bin/spark-submit \
> > > >       --jars lib/spark-streaming-kafka_2.10-1.3.1.jar,lib/kafka_2.10-0.8.1.1.jar,lib/zkclient-0.3.jar,lib/metrics-core-3.1.0.jar,lib/metrics-core-2.2.0.jar,lib/phoenix-core-4.3.1.jar \
> > > >       --class nl.work.kafkastreamconsumer.phoenix.KafkaPhoenixConnector \
> > > >       KafkaStreamConsumer.jar node1:5181 0 topic jdbc:phoenix:node1:5181 true
> > > > 
> > > > I've also done a classpath dump from within the code and the first
> > > > classloader in the hierarchy already knows the Phoenix jar:
> > > > 
> > > >     2015-06-04 10:52:34,323 [Executor task launch worker-1] INFO
> > > > nl.work.kafkastreamconsumer.phoenix.LinePersister -
> > > > [file:/home/work/projects/customer/KafkaStreamConsumer.jar,
> > > > file:/home/work/projects/customer/lib/spark-streaming-kafka_2.10-1.3.1.jar,
> > > > file:/home/work/projects/customer/lib/kafka_2.10-0.8.1.1.jar,
> > > > file:/home/work/projects/customer/lib/zkclient-0.3.jar,
> > > > file:/home/work/projects/customer/lib/metrics-core-3.1.0.jar,
> > > > file:/home/work/projects/customer/lib/metrics-core-2.2.0.jar,
> > > > file:/home/work/projects/customer/lib/phoenix-core-4.3.1.jar]
> > > > 
> > > > So the question is: What am I missing here? Why can't Spark load the
> > > > correct class? There should be only one version of the class flying
> > > > around (namely the one from phoenix-core), so I doubt it's a
> > > > versioning conflict.
> > > > 
> > > >     [Executor task launch worker-3] ERROR nl.work.kafkastreamconsumer.phoenix.LinePersister - Error while processing line
> > > >     java.lang.RuntimeException: java.sql.SQLException: ERROR 103 (08004): Unable to establish connection.
> > > >             at nl.work.kafkastreamconsumer.phoenix.PhoenixConnection.<init>(PhoenixConnection.java:41)
> > > >             at nl.work.kafkastreamconsumer.phoenix.LinePersister$1.call(LinePersister.java:40)
> > > >             at nl.work.kafkastreamconsumer.phoenix.LinePersister$1.call(LinePersister.java:32)
> > > >             at org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1.apply(JavaPairRDD.scala:999)
> > > >             at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
> > > >             at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> > > >             at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> > > >             at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
> > > >             at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
> > > >             at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
> > > >             at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
> > > >             at scala.collection.AbstractIterator.to(Iterator.scala:1157)
> > > >             at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
> > > >             at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
> > > >             at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
> > > >             at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
> > > >             at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:813)
> > > >             at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:813)
> > > >             at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1498)
> > > >             at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1498)
> > > >             at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
> > > >             at org.apache.spark.scheduler.Task.run(Task.scala:64)
> > > >             at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
> > > >             at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> > > >             at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> > > >             at java.lang.Thread.run(Thread.java:745)
> > > > 
> > > >     Caused by: java.sql.SQLException: ERROR 103 (08004): Unable to establish connection.
> > > >             at org.apache.phoenix.exception.SQLExceptionCode$Factory$1.newException(SQLExceptionCode.java:362)
> > > >             at org.apache.phoenix.exception.SQLExceptionInfo.buildException(SQLExceptionInfo.java:133)
> > > >             at org.apache.phoenix.query.ConnectionQueryServicesImpl.openConnection(ConnectionQueryServicesImpl.java:282)
> > > >             at org.apache.phoenix.query.ConnectionQueryServicesImpl.access$300(ConnectionQueryServicesImpl.java:166)
> > > >             at org.apache.phoenix.query.ConnectionQueryServicesImpl$11.call(ConnectionQueryServicesImpl.java:1831)
> > > >             at org.apache.phoenix.query.ConnectionQueryServicesImpl$11.call(ConnectionQueryServicesImpl.java:1810)
> > > >             at org.apache.phoenix.util.PhoenixContextExecutor.call(PhoenixContextExecutor.java:77)
> > > >             at org.apache.phoenix.query.ConnectionQueryServicesImpl.init(ConnectionQueryServicesImpl.java:1810)
> > > >             at org.apache.phoenix.jdbc.PhoenixDriver.getConnectionQueryServices(PhoenixDriver.java:162)
> > > >             at org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.connect(PhoenixEmbeddedDriver.java:126)
> > > >             at org.apache.phoenix.jdbc.PhoenixDriver.connect(PhoenixDriver.java:133)
> > > >             at java.sql.DriverManager.getConnection(DriverManager.java:571)
> > > >             at java.sql.DriverManager.getConnection(DriverManager.java:233)
> > > >             at nl.work.kafkastreamconsumer.phoenix.PhoenixConnection.<init>(PhoenixConnection.java:39)
> > > >             ... 25 more
> > > > 
> > > >     Caused by: java.io.IOException: java.lang.reflect.InvocationTargetException
> > > >             at org.apache.hadoop.hbase.client.HConnectionManager.createConnection(HConnectionManager.java:457)
> > > >             at org.apache.hadoop.hbase.client.HConnectionManager.createConnection(HConnectionManager.java:350)
> > > >             at org.apache.phoenix.query.HConnectionFactory$HConnectionFactoryImpl.createConnection(HConnectionFactory.java:47)
> > > >             at org.apache.phoenix.query.ConnectionQueryServicesImpl.openConnection(ConnectionQueryServicesImpl.java:280)
> > > >             ... 36 more
> > > > 
> > > >     Caused by: java.lang.reflect.InvocationTargetException
> > > >             at sun.reflect.GeneratedConstructorAccessor8.newInstance(Unknown Source)
> > > >             at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> > > >             at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
> > > >             at org.apache.hadoop.hbase.client.HConnectionManager.createConnection(HConnectionManager.java:455)
> > > >             ... 39 more
> > > > 
> > > >     Caused by: java.lang.UnsupportedOperationException: Unable to find org.apache.hadoop.hbase.ipc.controller.ClientRpcControllerFactory
> > > >             at org.apache.hadoop.hbase.util.ReflectionUtils.instantiateWithCustomCtor(ReflectionUtils.java:36)
> > > >             at org.apache.hadoop.hbase.ipc.RpcControllerFactory.instantiate(RpcControllerFactory.java:56)
> > > >             at org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.<init>(HConnectionManager.java:769)
> > > >             at org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.<init>(HConnectionManager.java:689)
> > > >             ... 43 more
> > > > 
> > > >     Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.hbase.ipc.controller.ClientRpcControllerFactory
> > > >             at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
> > > >             at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
> > > >             at java.security.AccessController.doPrivileged(Native Method)
> > > >             at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
> > > >             at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
> > > >             at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
> > > >             at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
> > > >             at java.lang.Class.forName0(Native Method)
> > > >             at java.lang.Class.forName(Class.java:191)
> > > >             at org.apache.hadoop.hbase.util.ReflectionUtils.instantiateWithCustomCtor(ReflectionUtils.java:32)
> > > >             ... 46 more
> > > > 
> > > > **/edit**
> > > > 
> > > > Unfortunately the issue remains with 4.4.0-HBase-0.98. Below are the
> > > > classes in question. Since the saveToPhoenix() method is not yet
> > > > available for the Java API and since this is just a POC, my idea was
> > > > to
> > > > simply use the JDBC driver for each mini-batch.
> > > > 
> > > >     public class PhoenixConnection implements AutoCloseable, Serializable {
> > > > 
> > > >         private static final long serialVersionUID = -4491057264383873689L;
> > > >         private static final String PHOENIX_DRIVER = "org.apache.phoenix.jdbc.PhoenixDriver";
> > > > 
> > > >         static {
> > > >                 // Make sure the Phoenix JDBC driver is registered before the first getConnection() call.
> > > >                 try {
> > > >                         Class.forName(PHOENIX_DRIVER);
> > > >                 } catch (ClassNotFoundException e) {
> > > >                         throw new RuntimeException(e);
> > > >                 }
> > > >         }
> > > > 
> > > >         private Connection connection;
> > > > 
> > > >         public PhoenixConnection(final String jdbcUri) {
> > > >                 try {
> > > >                         connection = DriverManager.getConnection(jdbcUri);
> > > >                 } catch (SQLException e) {
> > > >                         throw new RuntimeException(e);
> > > >                 }
> > > >         }
> > > > 
> > > >         public List<Map<String, Object>> executeQuery(final String sql) throws SQLException {
> > > >                 ArrayList<Map<String, Object>> resultList = new ArrayList<>();
> > > > 
> > > >                 try (PreparedStatement statement = connection.prepareStatement(sql);
> > > >                      ResultSet resultSet = statement.executeQuery()) {
> > > >                         ResultSetMetaData metaData = resultSet.getMetaData();
> > > >                         while (resultSet.next()) {
> > > >                                 Map<String, Object> row = new HashMap<>(metaData.getColumnCount());
> > > >                                 // JDBC column indices are 1-based.
> > > >                                 for (int column = 1; column <= metaData.getColumnCount(); ++column) {
> > > >                                         final String columnLabel = metaData.getColumnLabel(column);
> > > >                                         row.put(columnLabel, resultSet.getObject(columnLabel));
> > > >                                 }
> > > >                                 resultList.add(row);
> > > >                         }
> > > >                 }
> > > >                 resultList.trimToSize();
> > > > 
> > > >                 return resultList;
> > > >         }
> > > > 
> > > >         @Override
> > > >         public void close() {
> > > >                 try {
> > > >                         connection.close();
> > > >                 } catch (SQLException e) {
> > > >                         throw new RuntimeException(e);
> > > >                 }
> > > >         }
> > > >     }
> > > >     
> > > >     public class LinePersister implements Function<JavaRDD<String>, Void> {
> > > > 
> > > >         private static final long serialVersionUID = -2529724617108874989L;
> > > >         private static final Logger LOGGER = Logger.getLogger(LinePersister.class);
> > > >         private static final String TABLE_NAME = "mail_events";
> > > > 
> > > >         private final String jdbcUrl;
> > > > 
> > > >         public LinePersister(String jdbcUrl) {
> > > >                 this.jdbcUrl = jdbcUrl;
> > > >         }
> > > > 
> > > >         @Override
> > > >         public Void call(JavaRDD<String> dataSet) throws Exception {
> > > >                 LOGGER.info(String.format("Starting conversion on rdd with %d elements", dataSet.count()));
> > > > 
> > > >                 List<Void> collectResult = dataSet.map(new Function<String, Void>() {
> > > > 
> > > >                         private static final long serialVersionUID = -6651313541439109868L;
> > > > 
> > > >                         @Override
> > > >                         public Void call(String line) throws Exception {
> > > >                                 LOGGER.info("Writing line " + line);
> > > >                                 Event event = EventParser.parseLine(line);
> > > >                                 try (PhoenixConnection connection = new PhoenixConnection(jdbcUrl)) {
> > > >                                         connection.executeQuery(event.createUpsertStatement(TABLE_NAME));
> > > >                                 } catch (Exception e) {
> > > >                                         LOGGER.error("Error while processing line", e);
> > > >                                         dumpClasspath(this.getClass().getClassLoader());
> > > >                                 }
> > > >                                 return null;
> > > >                         }
> > > >                 }).collect();
> > > > 
> > > >                 LOGGER.info(String.format("Got %d results: ", collectResult.size()));
> > > >                 return null;
> > > >         }
> > > > 
> > > >         public static void dumpClasspath(ClassLoader loader) {
> > > >                 LOGGER.info("Classloader " + loader + ":");
> > > > 
> > > >                 if (loader instanceof URLClassLoader) {
> > > >                         URLClassLoader ucl = (URLClassLoader) loader;
> > > >                         LOGGER.info(Arrays.toString(ucl.getURLs()));
> > > >                 } else {
> > > >                         LOGGER.error("cannot display components as not a URLClassLoader");
> > > >                 }
> > > > 
> > > >                 if (loader.getParent() != null) {
> > > >                         dumpClasspath(loader.getParent());
> > > >                 }
> > > >         }
> > > >     }
> > > >     
> > > >     <?xml version="1.0" encoding="UTF-8"?>
> > > >     <project xmlns="http://maven.apache.org/POM/4.0.0"
> > > >              xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
> > > >              xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
> > > >         <modelVersion>4.0.0</modelVersion>
> > > >         <groupId>nl.work</groupId>
> > > >         <artifactId>KafkaStreamConsumer</artifactId>
> > > >         <version>1.0</version>
> > > >         <packaging>jar</packaging>
> > > >         <properties>
> > > >                 <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
> > > >                 <maven.compiler.source>1.7</maven.compiler.source>
> > > >                 <maven.compiler.target>1.7</maven.compiler.target>
> > > >                 <spark.version>1.3.1</spark.version>
> > > >                 <hibernate.version>4.3.10.Final</hibernate.version>
> > > >                 <phoenix.version>4.4.0-HBase-0.98</phoenix.version>
> > > >                 <hbase.version>0.98.9-hadoop2</hbase.version>
> > > >                 <spark-hbase.version>0.0.2-clabs-spark-1.3.1</spark-hbase.version>
> > > >         </properties>
> > > >         <dependencies>
> > > >                 <dependency>
> > > >                         <groupId>org.apache.spark</groupId>
> > > >                         <artifactId>spark-core_2.10</artifactId>
> > > >                         <version>${spark.version}</version>
> > > >                         <scope>provided</scope>
> > > >                 </dependency>
> > > >                 <dependency>
> > > >                         <groupId>org.apache.spark</groupId>
> > > >                         <artifactId>spark-streaming_2.10</artifactId>
> > > >                         <version>${spark.version}</version>
> > > >                         <scope>provided</scope>
> > > >                 </dependency>
> > > >                 <dependency>
> > > >                         <groupId>org.apache.spark</groupId>
> > > >                         <artifactId>spark-streaming-kafka_2.10</artifactId>
> > > >                         <version>${spark.version}</version>
> > > >                         <scope>provided</scope>
> > > >                 </dependency>
> > > >                 <dependency>
> > > >                         <groupId>org.apache.phoenix</groupId>
> > > >                         <artifactId>phoenix-core</artifactId>
> > > >                         <version>${phoenix.version}</version>
> > > >                         <scope>provided</scope>
> > > >                 </dependency>
> > > >                 <dependency>
> > > >                         <groupId>org.apache.phoenix</groupId>
> > > >                         <artifactId>phoenix-spark</artifactId>
> > > >                         <version>${phoenix.version}</version>
> > > >                         <scope>provided</scope>
> > > >                 </dependency>
> > > >                 <dependency>
> > > >                         <groupId>org.apache.hbase</groupId>
> > > >                         <artifactId>hbase-client</artifactId>
> > > >                         <version>${hbase.version}</version>
> > > >                         <scope>provided</scope>
> > > >                 </dependency>
> > > >                 <dependency>
> > > >                         <groupId>com.cloudera</groupId>
> > > >                         <artifactId>spark-hbase</artifactId>
> > > >                         <version>${spark-hbase.version}</version>
> > > >                         <scope>provided</scope>
> > > >                 </dependency>
> > > >                 <dependency>
> > > >                         <groupId>junit</groupId>
> > > >                         <artifactId>junit</artifactId>
> > > >                         <version>4.10</version>
> > > >                         <scope>test</scope>
> > > >                 </dependency>
> > > >         </dependencies>
> > > >         <build>
> > > >                 <plugins>
> > > >                         <plugin>
> > > >                                 <groupId>org.apache.maven.plugins</groupId>
> > > >                                 <artifactId>maven-compiler-plugin</artifactId>
> > > >                                 <version>3.3</version>
> > > >                                 <configuration>
> > > >                                         <source>${maven.compiler.source}</source>
> > > >                                         <target>${maven.compiler.target}</target>
> > > >                                 </configuration>
> > > >                         </plugin>
> > > >                         <!-- <plugin>
> > > >                                 <groupId>org.apache.maven.plugins</groupId>
> > > >                                 <artifactId>maven-shade-plugin</artifactId>
> > > >                                 <version>2.3</version>
> > > >                                 <executions>
> > > >                                         <execution>
> > > >                                                 <phase>package</phase>
> > > >                                                 <goals>
> > > >                                                         <goal>shade</goal>
> > > >                                                 </goals>
> > > >                                                 <configuration>
> > > >                                                         <filters>
> > > >                                                                 <filter>
> > > >                                                                         <artifact>*:*</artifact>
> > > >                                                                         <excludes>
> > > >                                                                                 <exclude>META-INF/*.SF</exclude>
> > > >                                                                                 <exclude>META-INF/*.DSA</exclude>
> > > >                                                                                 <exclude>META-INF/*.RSA</exclude>
> > > >                                                                         </excludes>
> > > >                                                                 </filter>
> > > >                                                         </filters>
> > > >                                                 </configuration>
> > > >                                         </execution>
> > > >                                 </executions>
> > > >                         </plugin> -->
> > > >                 </plugins>
> > > >         </build>
> > > >         <repositories>
> > > >                 <repository>
> > > >                         <id>unknown-jars-temp-repo</id>
> > > >                         <name>A temporary repository created by NetBeans for libraries and jars it could not identify. Please replace the dependencies in this repository with correct ones and delete this repository.</name>
> > > >                         <url>file:${project.basedir}/lib</url>
> > > >                 </repository>
> > > >         </repositories>
> > > >     </project>
> > > > 
> > > > Cheers,
> > > > Jeroen
> > > > 
> > > > 
> > > > [1] http://stackoverflow.com/questions/30639659/apache-phoenix-4-3-1-and-4-4-0-hbase-0-98-on-spark-1-3-1-classnotfoundexceptio
> > > > [2] https://groups.google.com/forum/#!topic/phoenix-hbase-user/pKnvE1pd_K8
> > > > [3] https://gist.github.com/mravi/444afe7f49821819c987#file-phoenixsparkjob-java
> > > > 

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
