Hi Stephen,

I'm using an emr-4.7.1 (Spark 1.6.1) EMR cluster to run pyspark through Zeppelin. I've downloaded the spark-riak-connector jar (version 1.6.0 (16d483), date 2016-09-07, license Apache-2.0, Scala 2.10) from https://spark-packages.org/package/basho/spark-riak-connector onto the EMR master node.
My complete Zeppelin script is as follows:

%dep
z.reset()
z.load("/home/hadoop/spark-riak-connector_2.10-1.6.0.jar")

%pyspark
import riak, datetime, time, random
host = '172.31.00.00'
pb_port = '8087'
hostAndPort = ":".join([host, pb_port])
client = riak.RiakClient(host=host, pb_port=pb_port)
table = client.table("test")

%pyspark
site = 'AA'
species = 'fff'
start_date = int(time.time())
events = []
for i in range(9):
    measurementDate = start_date + i
    value = random.uniform(-20, 110)
    events.append([site, species, measurementDate, value])
end_date = measurementDate
for e in events:
    print e
testRDD = sc.parallelize(events)
df = testRDD.toDF(['site', 'species', 'measurementDate', 'value'])
df.show()

%pyspark
df.write \
    .format('org.apache.spark.sql.riak') \
    .option('spark.riak.connection.host', '172.31.41.86:8087') \
    .mode('Append') \
    .save('test')

If I run pyspark on the master node directly with --jars "/home/hadoop/spark-riak-connector_2.10-1.6.0.jar", it gives me the same error message as above. If I run pyspark on the master node without the --jars argument, it gives the error below (the same happens in Zeppelin), so it seems the package itself is found:

py4j.protocol.Py4JJavaError: An error occurred while calling o61.save.
: java.lang.ClassNotFoundException: Failed to find data source: org.apache.spark.sql.riak. Please find packages at http://spark-packages.org
    at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.lookupDataSource(ResolvedDataSource.scala:77)
    at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:219)
    ...

Thanks for the help!
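[For anyone debugging the same thing: one quick way to tell whether the downloaded connector jar actually bundles the class the save later fails on (com/basho/riak/client/core/util/HostAndPort, per the stack trace further down) is to inspect the jar with Python's zipfile module. This is a diagnostic sketch, not code from the thread; jar_contains_class is a hypothetical helper name.]

```python
import zipfile

def jar_contains_class(jar_path, class_entry):
    """Return True if the jar contains the given .class entry.

    class_entry uses the jar's internal path layout, e.g.
    'com/basho/riak/client/core/util/HostAndPort.class'.
    """
    with zipfile.ZipFile(jar_path) as jar:
        return class_entry in jar.namelist()

# Against the jar from the thread (path as downloaded to the master node):
# jar_contains_class('/home/hadoop/spark-riak-connector_2.10-1.6.0.jar',
#                    'com/basho/riak/client/core/util/HostAndPort.class')
```

If this returns False, the jar is a "thin" artifact and the Riak Java client has to reach the classpath some other way (e.g. additional jars, or a build that bundles its dependencies).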
Kind Regards,
Joris van Agtmaal
+(0) 6 25 39 39 06

From: Stephen Etheridge [mailto:setheri...@basho.com]
Sent: 13 September 2016 14:45
To: Agtmaal, Joris van <joris.vanagtm...@wartsila.com>
Cc: riak-users@lists.basho.com; Manu Marchal <emarc...@basho.com>
Subject: Re: RIAK TS installed nodes not connecting

Hi Joris,

I have looked at the tutorial you have been following, but I confess I am confused. In the example you are following I do not see where the Spark and SQL contexts are created. I use PySpark through the Jupyter notebook, and I have to specify a path to the connector when invoking the Jupyter notebook. Is it possible for you to share all your code (and how you are invoking Zeppelin) with me so I can trace everything through?

regards
Stephen

On Mon, Sep 12, 2016 at 3:27 PM, Agtmaal, Joris van <joris.vanagtm...@wartsila.com> wrote:

Hi

I'm new to Riak and followed the installation instructions to get it working on an AWS cluster (3 nodes). So far I've been able to use Riak in pyspark (Zeppelin) to create/read/write tables, but I would like to use DataFrames directly from Spark, using the Spark-Riak Connector.

I've been following the example found here:
http://docs.basho.com/riak/ts/1.4.0/add-ons/spark-riak-connector/quick-start/#python

But I run into trouble on this last part:

host = my_ip_address_of_riak_node
pb_port = '8087'
hostAndPort = ":".join([host, pb_port])
client = riak.RiakClient(host=host, pb_port=pb_port)

df.write \
    .format('org.apache.spark.sql.riak') \
    .option('spark.riak.connection.host', hostAndPort) \
    .mode('Append') \
    .save('test')

Important to note: I'm using a local download of the jar file, loaded into the pyspark interpreter in Zeppelin through:

%dep
z.reset()
z.load("/home/hadoop/spark-riak-connector_2.10-1.6.0.jar")

Here is the error message I get back:

Py4JJavaError: An error occurred while calling o569.save.
: java.lang.NoClassDefFoundError: com/basho/riak/client/core/util/HostAndPort
    at com.basho.riak.spark.rdd.connector.RiakConnectorConf$.apply(RiakConnectorConf.scala:76)
    at com.basho.riak.spark.rdd.connector.RiakConnectorConf$.apply(RiakConnectorConf.scala:89)
    at org.apache.spark.sql.riak.RiakRelation$.apply(RiakRelation.scala:115)
    at org.apache.spark.sql.riak.DefaultSource.createRelation(DefaultSource.scala:51)
    at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:222)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:148)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:139)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
    at py4j.Gateway.invoke(Gateway.java:259)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:209)
    at java.lang.Thread.run(Thread.java:745)
(<class 'py4j.protocol.Py4JJavaError'>, Py4JJavaError(u'An error occurred while calling o569.save.\n', JavaObject id=o570), <traceback object at 0x7f7021bb0200>)

Hope somebody can help out.
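[A NoClassDefFoundError for com/basho/riak/client/core/util/HostAndPort suggests the standalone connector jar does not include its Riak Java client dependency. One possible workaround, untested here and assuming both that Zeppelin's %dep resolves Maven coordinates with transitive dependencies and that the connector is published under the com.basho.riak group, is to load it by coordinates instead of a local jar path:

%dep
z.reset()
z.load("com.basho.riak:spark-riak-connector_2.10:1.6.0")

The command-line equivalent would be pyspark --packages com.basho.riak:spark-riak-connector_2.10:1.6.0 rather than --jars, since --jars does not pull in transitive dependencies.]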
thanks, joris

_______________________________________________
riak-users mailing list
riak-users@lists.basho.com
http://lists.basho.com/mailman/listinfo/riak-users_lists.basho.com

--
{ "name" : "Stephen Etheridge",
  "title" : "Solution Architect, EMEA",
  "Organisation" : "Basho Technologies, Inc",
  "Telephone" : "07814 406662",
  "email" : "setheri...@basho.com",
  "github" : "http://github.com/datalemming",
  "twitter" : "@datalemming" }