Re: Supporting Hive features in Spark SQL Thrift JDBC server
Hello Shahab, I think CassandraAwareHiveContext https://github.com/tuplejump/calliope/blob/develop/sql/hive/src/main/scala/org/apache/spark/sql/hive/CassandraAwareHiveContext.scala in Calliope is what you are looking for. Create a CAHC instance and you should be able to run Hive functions against the SchemaRDD you create from there. Cheers, Rohit *Founder CEO, **Tuplejump, Inc.* www.tuplejump.com *The Data Engineering Platform*

On Tue, Mar 3, 2015 at 6:03 AM, Cheng, Hao hao.ch...@intel.com wrote: The temp table in the metastore cannot be shared across SQLContext instances. Since HiveContext is a subclass of SQLContext (it inherits all of its functionality), why not use a single HiveContext globally? Is there any specific requirement in your case that you need multiple SQLContext/HiveContext instances?

*From:* shahab [mailto:shahab.mok...@gmail.com] *Sent:* Tuesday, March 3, 2015 9:46 PM *To:* Cheng, Hao *Cc:* user@spark.apache.org *Subject:* Re: Supporting Hive features in Spark SQL Thrift JDBC server

You are right, CassandraAwareSQLContext is a subclass of SQLContext. But I did another experiment: I queried Cassandra using CassandraAwareSQLContext, then I registered the RDD as a temp table, and next I tried to query it using HiveContext. It seems that the HiveContext cannot see the table registered using the SQLContext. Is this a normal case? best, /Shahab

On Tue, Mar 3, 2015 at 1:35 PM, Cheng, Hao hao.ch...@intel.com wrote: Hive UDFs are only applicable for HiveContext and its subclass instances. Is the CassandraAwareSQLContext a direct subclass of HiveContext or of SQLContext?

*From:* shahab [mailto:shahab.mok...@gmail.com] *Sent:* Tuesday, March 3, 2015 5:10 PM *To:* Cheng, Hao *Cc:* user@spark.apache.org *Subject:* Re: Supporting Hive features in Spark SQL Thrift JDBC server

val sc: SparkContext = new SparkContext(conf)
val sqlCassContext = new CassandraAwareSQLContext(sc) // I used the Calliope Cassandra-Spark connector
val rdd: SchemaRDD = sqlCassContext.sql("select * from db.profile")
rdd.cache
rdd.registerTempTable("profile")
rdd.first // enforce caching
val q = "select from_unixtime(floor(createdAt/1000)) from profile where sampling_bucket=0"
val rdd2 = rdd.sqlContext.sql(q)
println("Result: " + rdd2.first)

And I get the following errors:

Exception in thread "main" org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Unresolved attributes: 'from_unixtime('floor(('createdAt / 1000))) AS c0#7, tree: Project ['from_unixtime('floor(('createdAt / 1000))) AS c0#7] Filter (sampling_bucket#10 = 0) Subquery profile Project [company#8,bucket#9,sampling_bucket#10,profileid#11,createdat#12L,modifiedat#13L,version#14] CassandraRelation localhost, 9042, 9160, normaldb_sampling, profile, org.apache.spark.sql.CassandraAwareSQLContext@778b692d, None, None, false, Some(Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml) at org.apache.spark.sql.catalyst.analysis.Analyzer$CheckResolution$$anonfun$apply$1.applyOrElse(Analyzer.scala:72) at org.apache.spark.sql.catalyst.analysis.Analyzer$CheckResolution$$anonfun$apply$1.applyOrElse(Analyzer.scala:70) at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:165) at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:183) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) at 
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) at scala.collection.AbstractIterator.to(Iterator.scala:1157) at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) at scala.collection.AbstractIterator.toArray(Iterator.scala:1157) at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildrenDown(TreeNode.scala:212) at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:168) at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:156) at org.apache.spark.sql.catalyst.analysis.Analyzer$CheckResolution$.apply(Analyzer.scala:70) at org.apache.spark.sql.catalyst.analysis.Analyzer$CheckResolution$.apply(Analyzer.scala:68) at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1$$anonfun$apply$2.apply(RuleExecutor.scala:61) at
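[Editor's note] To make Cheng Hao's single-context suggestion concrete, here is a minimal sketch assuming a plain Spark 1.2-era HiveContext; the table name, UDF and filter come from Shahab's snippet, and the source of the "profile" SchemaRDD is a placeholder (in his setup it would come from the Cassandra-aware context, e.g. CassandraAwareHiveContext). Registering the temp table and running the Hive UDF through the same HiveContext avoids both the cross-context visibility problem and the unresolved from_unixtime error.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

val sc = new SparkContext(new SparkConf().setAppName("single-hive-context"))
// One HiveContext for the whole application; temp tables registered on it
// are visible to every query that goes through this same context.
val hc = new HiveContext(sc)

val profile = hc.sql("select * from db.profile") // placeholder source table
profile.registerTempTable("profile")

// Hive UDFs such as from_unixtime resolve here, because the query is
// analyzed by the same HiveContext that owns the "profile" temp table.
val result = hc.sql(
  "select from_unixtime(floor(createdAt/1000)) from profile where sampling_bucket = 0")
result.take(1).foreach(println)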
Re: Supporting Hive features in Spark SQL Thrift JDBC server
The Hive dependency comes from spark-hive. It does work with Spark 1.1; we will have the 1.2 release later this month. On Mar 3, 2015 8:49 AM, shahab shahab.mok...@gmail.com wrote: Thanks Rohit, I am already using Calliope and quite happy with it, well done! Except for the fact that: 1. It seems that it does not support Hive 0.12 or higher, am I right? For example, you cannot use the current_time() UDF, or those new UDFs added in Hive 0.12. Are they supported? Any plan for supporting them? 2. It does not support Spark 1.1 and 1.2. Any plan for a new release? best, /Shahab
Re: [ANN] SparkSQL support for Cassandra with Calliope
Hi Tian, We have published a build against Hadoop 2.0 with version *1.1.0-CTP-U2-H2*. Let us know how your testing goes. Regards, Rohit *Founder CEO, **Tuplejump, Inc.* www.tuplejump.com *The Data Engineering Platform* On Sat, Oct 4, 2014 at 3:49 AM, tian zhang tzhang...@yahoo.com wrote: Hi, Rohit, Thank you for sharing this good news. I have a relevant issue that I would like to ask your help with. I am using Spark 1.1.0 and I have a Spark application using "com.tuplejump" % "calliope-core_2.10" % "1.1.0-CTP-U2". At runtime there are the following errors, which seem to indicate that the Calliope package is compiled against Hadoop 1.x while Spark is running on Hadoop 2.x. Can you release a new version of Calliope so that it will be compatible with Spark 1.1.0? Thanks. Here are the error details: java.lang.IncompatibleClassChangeError: Found interface (hadoop 2.x) org.apache.hadoop.mapreduce.TaskAttemptContext, but class (hadoop 1.x) was expected com.tuplejump.calliope.hadoop.cql3.CqlRecordReader.initialize(CqlRecordReader.java:82) Tian
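[Editor's note] For anyone else hitting the same IncompatibleClassChangeError, a hedged sketch of the dependency switch Tian would make. The group and artifact ids are taken from Tian's message and the version is the Hadoop 2 build Rohit announced above; I'm assuming the H2 build is published to the same repositories as the earlier releases.

// build.sbt
libraryDependencies += "com.tuplejump" % "calliope-core_2.10" % "1.1.0-CTP-U2-H2"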
[ANN] SparkSQL support for Cassandra with Calliope
Hi All, A year ago we started this journey and laid the path for the Spark + Cassandra stack. We established the groundwork and direction for Spark Cassandra connectors and we have been happy seeing the results. With the Spark 1.1.0 and SparkSQL release, it's time to take Calliope http://tuplejump.github.io/calliope/ to the logical next level, also paving the way for much more advanced functionality to come. Yesterday we released the Calliope 1.1.0 Community Tech Preview https://twitter.com/tuplejump/status/517739186124627968, which brings native SparkSQL support for Cassandra. Further details are available here http://tuplejump.github.io/calliope/tech-preview.html. This release showcases in-core spark-sql http://tuplejump.github.io/calliope/start-with-sql.html, hiveql http://tuplejump.github.io/calliope/start-with-hive.html and HiveThriftServer http://tuplejump.github.io/calliope/calliope-server.html support. I differentiate it as native spark-sql integration as it doesn't rely on Cassandra's Hive connectors (like Cash or DSE) and saves a level of indirection through Hive. It also allows us to harness Spark's analyzer and optimizer in the future to work out the best execution plan, targeting a balance between Cassandra's querying restrictions and Spark's in-memory processing. As far as we know this is the first and only third-party datastore connector for SparkSQL. This is a CTP release as it relies on Spark internals that still don't have a stabilized developer API, and we will work with the Spark community on documenting the requirements and working towards a standard and stable API for third-party data store integration. On another note, we no longer require you to sign up to access the early access code repository. We invite all of you to try it and give us your valuable feedback. Regards, Rohit *Founder CEO, **Tuplejump, Inc.* www.tuplejump.com *The Data Engineering Platform*
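[Editor's note] As a flavor of the native spark-sql route, a minimal sketch. CassandraAwareSQLContext is the Calliope class that appears elsewhere in this digest (package visible in the error logs above); the keyspace and table names are placeholders, and the exact setup is in the linked start-with-sql guide.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.CassandraAwareSQLContext

val sc = new SparkContext(new SparkConf().setAppName("calliope-sql"))
val cassSql = new CassandraAwareSQLContext(sc)
// SparkSQL straight against a Cassandra table: no Hive metastore,
// no Cash/DSE hive connector in the path.
val rows = cassSql.sql("select * from my_keyspace.my_table") // placeholder names
rows.take(10).foreach(println)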
Re: spark streaming actor receiver doesn't play well with kryoserializer
Alan/TD, We are facing the problem in a project going to production. Was there any progress on this? Are we able to confirm that this is a bug/limitation in the current streaming code? Or is there anything wrong in user scope? Regards, Rohit *Founder CEO, **Tuplejump, Inc.* www.tuplejump.com *The Data Engineering Platform* On Sat, Jul 26, 2014 at 6:26 AM, Alan Ngai a...@opsclarity.com wrote: The stack trace was from running the Actor count sample directly, without a spark cluster, so I guess the logs would be from both? I enabled more logging and got this stack trace 14/07/25 17:55:26 [INFO] SecurityManager: Changing view acls to: alan 14/07/25 17:55:26 [INFO] SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(alan) 14/07/25 17:55:26 [DEBUG] AkkaUtils: In createActorSystem, requireCookie is: off 14/07/25 17:55:26 [INFO] Slf4jLogger: Slf4jLogger started 14/07/25 17:55:27 [INFO] Remoting: Starting remoting 14/07/25 17:55:27 [INFO] Remoting: Remoting started; listening on addresses :[akka.tcp://spark@leungshwingchun:52156] 14/07/25 17:55:27 [INFO] Remoting: Remoting now listens on addresses: [ akka.tcp://spark@leungshwingchun:52156] 14/07/25 17:55:27 [INFO] SparkEnv: Registering MapOutputTracker 14/07/25 17:55:27 [INFO] SparkEnv: Registering BlockManagerMaster 14/07/25 17:55:27 [DEBUG] DiskBlockManager: Creating local directories at root dirs '/var/folders/fq/fzcyqkcx7rgbycg4kr8z3m18gn/T/' 14/07/25 17:55:27 [INFO] DiskBlockManager: Created local directory at /var/folders/fq/fzcyqkcx7rgbycg4kr8z3m18gn/T/spark-local-20140725175527-32f2 14/07/25 17:55:27 [INFO] MemoryStore: MemoryStore started with capacity 297.0 MB. 14/07/25 17:55:27 [INFO] ConnectionManager: Bound socket to port 52157 with id = ConnectionManagerId(leungshwingchun,52157) 14/07/25 17:55:27 [INFO] BlockManagerMaster: Trying to register BlockManager 14/07/25 17:55:27 [INFO] BlockManagerInfo: Registering block manager leungshwingchun:52157 with 297.0 MB RAM 14/07/25 17:55:27 [INFO] BlockManagerMaster: Registered BlockManager 14/07/25 17:55:27 [INFO] HttpServer: Starting HTTP Server 14/07/25 17:55:27 [DEBUG] HttpServer: HttpServer is not using security 14/07/25 17:55:27 [INFO] HttpBroadcast: Broadcast server started at http://192.168.1.233:52158 14/07/25 17:55:27 [INFO] HttpFileServer: HTTP File server directory is /var/folders/fq/fzcyqkcx7rgbycg4kr8z3m18gn/T/spark-5254ba11-037b-4761-b92a-3b18d42762de 14/07/25 17:55:27 [INFO] HttpServer: Starting HTTP Server 14/07/25 17:55:27 [DEBUG] HttpServer: HttpServer is not using security 14/07/25 17:55:27 [DEBUG] HttpFileServer: HTTP file server started at: http://192.168.1.233:52159 14/07/25 17:55:27 [INFO] SparkUI: Started SparkUI at http://leungshwingchun:4040 14/07/25 17:55:27 [DEBUG] MutableMetricsFactory: field org.apache.hadoop.metrics2.lib.MutableRate org.apache.hadoop.security.UserGroupInformation$UgiMetrics.loginSuccess with annotation @org.apache.hadoop.metrics2.annotation.Metric(valueName=Time, about=, value=[Rate of successful kerberos logins and latency (milliseconds)], always=false, type=DEFAULT, sampleName=Ops) 14/07/25 17:55:27 [DEBUG] MutableMetricsFactory: field org.apache.hadoop.metrics2.lib.MutableRate org.apache.hadoop.security.UserGroupInformation$UgiMetrics.loginFailure with annotation @org.apache.hadoop.metrics2.annotation.Metric(valueName=Time, about=, value=[Rate of failed kerberos logins and latency (milliseconds)], always=false, type=DEFAULT, sampleName=Ops) 14/07/25 17:55:27 [DEBUG] MetricsSystemImpl: 
UgiMetrics, User and group related metrics 2014-07-25 17:55:27.796 java[79107:1703] Unable to load realm info from SCDynamicStore 14/07/25 17:55:27 [DEBUG] KerberosName: Kerberos krb5 configuration not found, setting default realm to empty 14/07/25 17:55:27 [DEBUG] Groups: Creating new Groups object 14/07/25 17:55:27 [DEBUG] NativeCodeLoader: Trying to load the custom-built native-hadoop library... 14/07/25 17:55:27 [DEBUG] NativeCodeLoader: Failed to load native-hadoop with error: java.lang.UnsatisfiedLinkError: no hadoop in java.library.path 14/07/25 17:55:27 [DEBUG] NativeCodeLoader: java.library.path= 14/07/25 17:55:27 [WARN] NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 14/07/25 17:55:27 [DEBUG] JniBasedUnixGroupsMappingWithFallback: Falling back to shell based 14/07/25 17:55:27 [DEBUG] JniBasedUnixGroupsMappingWithFallback: Group mapping impl=org.apache.hadoop.security.ShellBasedUnixGroupsMapping 14/07/25 17:55:27 [DEBUG] Groups: Group mapping impl=org.apache.hadoop.security.JniBasedUnixGroupsMappingWithFallback; cacheTimeout=30 14/07/25 17:55:28 [INFO] SparkContext: Added JAR file:/Users/alan/dev/spark-dev/examples/target/scala-2.10/spark-examples-1.0.1-hadoop2.2.0.jar at
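[Editor's note] For context, a minimal sketch of the combination under discussion, assuming the standard Spark 0.9/1.0-era streaming APIs; EchoActor is a placeholder for the sample's receiver actor (a real one mixes in Spark's actor-receiver helper trait, whose name varied across versions, and pushes received data into the stream). The point is that enabling Kryo is a one-line conf change, which is what this thread reports as tripping the actor receiver path.

import akka.actor.{Actor, Props}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Stand-in for the sample's receiver actor.
class EchoActor extends Actor {
  def receive = { case msg => () } // a real receiver would forward msg into Spark
}

val conf = new SparkConf()
  .setAppName("actor-receiver-kryo")
  .setMaster("local[2]")
  // The serializer switch this thread reports as problematic with actor receivers:
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

val ssc = new StreamingContext(conf, Seconds(1))
val stream = ssc.actorStream[String](Props[EchoActor], "echo-receiver")
stream.count().print()
ssc.start()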
Re: Using CQLSSTableWriter to batch load data from Spark to Cassandra.
Hi Gerard, This has been on my to-do list for a long time... I just published a Calliope snapshot built against Hadoop 2.2.x. Take it for a spin if you get a chance. You can get the jars from here -
- https://oss.sonatype.org/service/local/repositories/snapshots/content/com/tuplejump/calliope_2.10/0.9.4-H2-SNAPSHOT/calliope_2.10-0.9.4-H2-SNAPSHOT.jar
- https://oss.sonatype.org/service/local/repositories/snapshots/content/com/tuplejump/calliope-macros_2.10/0.9.4-H2-SNAPSHOT/calliope-macros_2.10-0.9.4-H2-SNAPSHOT.jar

Or to use from Maven -

<dependency>
  <groupId>com.tuplejump</groupId>
  <artifactId>calliope_2.10</artifactId>
  <version>0.9.4-H2-SNAPSHOT</version>
</dependency>

and SBT -

libraryDependencies += "com.tuplejump" %% "calliope" % "0.9.4-H2-SNAPSHOT"

It passes all the tests so I am assuming all is fine, but we haven't tested it very extensively. Regards, Rohit *Founder CEO, **Tuplejump, Inc.* www.tuplejump.com *The Data Engineering Platform* On Fri, Jun 27, 2014 at 9:31 PM, Gerard Maas gerard.m...@gmail.com wrote: Hi Rohit, Thanks for your message. We are currently on Spark 0.9.1, Cassandra 2.0.6 and Calliope GA (would love to try the pre-release version if you want beta testers :-). Our Hadoop version is CDH4.4 and of course our Spark assembly is compiled against it. We have got really interesting performance results from using Calliope and will probably try to compile it against Hadoop 2. Compared to the DataStax Java driver, out of the box, the Calliope lib gives us ~4.5x insert performance with higher network and CPU usage (which is what we want in batch insert mode = fast). With additional code optimizations using the DataStax driver, we were able to reduce that gap to 2x, but Calliope was still easier and faster to use. Will you be attending the Spark Summit? I'll be around. We'll be in touch in any case :-) -kr, Gerard.
Re: Using CQLSSTableWriter to batch load data from Spark to Cassandra.
Hi Gerard, What versions of Spark, Hadoop, Cassandra and Calliope are you using? We never built Calliope against Hadoop 2, as we and our clients either don't use Hadoop in their deployments or use it only as the infra component for Spark, in which case H1/H2 doesn't make a difference. I know of at least one case where the user had built Calliope against 2.0 and was using it happily. If you need assistance with it we are here to help. Feel free to reach out to me directly and we can work out a solution for you. Regards, Rohit *Founder CEO, **Tuplejump, Inc.* www.tuplejump.com *The Data Engineering Platform*

On Thu, Jun 26, 2014 at 12:44 AM, Gerard Maas gerard.m...@gmail.com wrote: Thanks Nick. We used the CassandraOutputFormat through Calliope. The Calliope API makes the CassandraOutputFormat quite accessible and is cool to work with. It worked fine at prototype level, but we had Hadoop version conflicts when we put it in our Spark environment (using our Spark assembly compiled with CDH4.4). The conflict seems to be at the cassandra-all lib level, which is compiled against a different Hadoop version (v1). We could not get round that issue. (Any pointers in that direction?) That's why I'm trying the direct CQLSSTableWriter way, but it looks blocked as well. -kr, Gerard.

On Wed, Jun 25, 2014 at 8:57 PM, Nick Pentreath nick.pentre...@gmail.com wrote: Can you not use a Cassandra OutputFormat? Seems they have BulkOutputFormat. An example of using it with Hadoop is here: http://shareitexploreit.blogspot.com/2012/03/bulkloadto-cassandra-with-hadoop.html Using it with Spark will be similar to the examples: https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/CassandraTest.scala and https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala

On Wed, Jun 25, 2014 at 8:44 PM, Gerard Maas gerard.m...@gmail.com wrote: Hi, (My excuses for the cross-post from SO.) I'm trying to create Cassandra SSTables from the results of a batch computation in Spark. Ideally, each partition should create the SSTable for the data it holds, in order to parallelize the process as much as possible (and probably even stream it to the Cassandra ring as well). After the initial hurdles with the CQLSSTableWriter (like requiring the yaml file), I'm confronted now with this issue:

java.lang.RuntimeException: Attempting to load already loaded column family customer.rawts at org.apache.cassandra.config.Schema.load(Schema.java:347) at org.apache.cassandra.config.Schema.load(Schema.java:112) at org.apache.cassandra.io.sstable.CQLSSTableWriter$Builder.forTable(CQLSSTableWriter.java:336)

I'm creating a writer on each parallel partition like this:

def store(rdd: RDD[Message]) = {
  rdd.foreachPartition(msgIterator => {
    val writer = CQLSSTableWriter.builder()
      .inDirectory("/tmp/cass")
      .forTable(schema)
      .using(insertSttmt)
      .build()
    msgIterator.foreach(msg => {...})
  })
}

And if I'm reading the exception correctly, I can only create one writer per table in one JVM. Digging a bit further in the code, it looks like the Schema.load(...) singleton enforces that limitation. I guess writes to the writer will not be thread-safe, and even if they were, the contention created by all parallel tasks trying to dump a few GB of data to disk at the same time would defeat the purpose of using SSTables for bulk upload anyway. So, are there ways to use the CQLSSTableWriter concurrently? 
If not, what is the next best option to load batch data at high throughput into Cassandra? Will the upcoming Spark-Cassandra integration help with this? (i.e. should I just sit back and relax, and the problem will solve itself?) Thanks, Gerard.
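[Editor's note] Not a definitive answer to Gerard's question, but one blunt workaround sketch consistent with the one-writer-per-table-per-JVM limitation he describes. The lazy per-JVM singleton, the temp directory, and the Message/schema/insertSttmt definitions below are my assumptions standing in for his placeholders, not Calliope's or Cassandra's recommended path.

import java.nio.file.Files
import org.apache.cassandra.io.sstable.CQLSSTableWriter
import org.apache.spark.rdd.RDD

case class Message(id: String, ts: Long) // placeholder record type

// Placeholder DDL/DML, standing in for Gerard's schema and insertSttmt.
val schema = "CREATE TABLE ks.messages (id text PRIMARY KEY, ts bigint)"
val insertSttmt = "INSERT INTO ks.messages (id, ts) VALUES (?, ?)"

object JvmWriter {
  // At most one writer per executor JVM, built lazily; this sidesteps the
  // Schema.load singleton's "already loaded column family" complaint.
  lazy val writer: CQLSSTableWriter = CQLSSTableWriter.builder()
    .inDirectory(Files.createTempDirectory("sstables").toFile)
    .forTable(schema)
    .using(insertSttmt)
    .build()
}

def store(rdd: RDD[Message]) = rdd.foreachPartition { msgs =>
  // Serialize access per JVM: CQLSSTableWriter is not documented as thread-safe.
  JvmWriter.synchronized {
    msgs.foreach(msg => JvmWriter.writer.addRow(msg.id, Long.box(msg.ts)))
  }
}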
Re: writing booleans w Calliope
Hello Adrian, Calliope relies on transformers to convert from a given type to ByteBuffer, which is the format required by Cassandra. RichByteBuffer's incompleteness is at fault here. We are working on increasing the types we support out of the box, and will support all types supported in C* in the next release. In the meanwhile, all that is needed is to add implicit transformers for your type. In this case a transformer from Boolean -> ByteBuffer is missing. Cassandra stores a boolean as a byte, so you will need to add these 2 lines to your code -

implicit def ByteBuffer2Boolean(buffer: ByteBuffer): Boolean = buffer.get() == 1
implicit def Boolean2ByteBuffer(bool: Boolean): ByteBuffer = ByteBuffer.wrap(if(bool) Array(1.toByte) else Array(0.toByte))

Cheers, Rohit *Founder CEO, **Tuplejump, Inc.* www.tuplejump.com *The Data Engineering Platform* On Fri, Apr 18, 2014 at 1:41 AM, Adrian Mocanu amoc...@verticalscope.com wrote: Has anyone managed to write Booleans to Cassandra from an RDD with Calliope? My Booleans give compile-time errors: expression of type List[Any] does not conform to expected type Types.CQLRowValues. CQLColumnValue is defined as ByteBuffer: type CQLColumnValue = ByteBuffer. For now I convert them to strings. I tried converting them to bytes; that compiled but gave me a runtime error, since a Scala Byte is not compatible with a Cassandra boolean. -Adrian
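[Editor's note] Spelled out as a self-contained snippet: the two implicits are from Rohit's reply verbatim, and only the round-trip check at the end is illustrative.

import java.nio.ByteBuffer

// Boolean <-> ByteBuffer transformers for Calliope's implicit conversion chain.
// Cassandra encodes a boolean as a single byte: 1 for true, 0 for false.
implicit def ByteBuffer2Boolean(buffer: ByteBuffer): Boolean = buffer.get() == 1
implicit def Boolean2ByteBuffer(bool: Boolean): ByteBuffer =
  ByteBuffer.wrap(if (bool) Array(1.toByte) else Array(0.toByte))

// Illustrative round trip, exercising both implicits:
val encoded: ByteBuffer = true
val decoded: Boolean = encoded
assert(decoded)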
Re: Calliope Frame size larger than max length
Hello Eric, This happens when the data being fetched from Cassandra in a single split is greater than the maximum framesize allowed in thrift (yes, it still uses thrift underneath, until the next release when we will start using native CQL). Generally, we set the thrift framesize in Cassandra to 32MB or larger when using it with Spark/Hadoop, depending on the data model and row size. If you don't want to touch the Cassandra configuration, you will have to reduce the page size in use. The default here is 1000 CQL rows. Given the sizes mentioned in the error message (20MB vs 15MB), I would suggest setting the page size to 700 or less. This can be done using the pageSize method on CasBuilder (a sketch follows at the end of this message): cqlCas.pageSize(700). Cheers, Rohit *Founder CEO, **Tuplejump, Inc.* www.tuplejump.com *The Data Engineering Platform* On Sat, Apr 19, 2014 at 3:02 AM, ericjohnston1989 ericjohnston1...@gmail.com wrote: Hey all, I'm working with Calliope to run jobs on a Cassandra cluster in standalone mode. On some larger jobs I run into the following error: java.lang.RuntimeException: Frame size (20667866) larger than max length (15728640)! at org.apache.cassandra.hadoop.cql3.CqlPagingRecordReader$RowIterator.executeQuery(CqlPagingRecordReader.java:665) at org.apache.cassandra.hadoop.cql3.CqlPagingRecordReader$RowIterator.computeNext(CqlPagingRecordReader.java:322) at org.apache.cassandra.hadoop.cql3.CqlPagingRecordReader$RowIterator.computeNext(CqlPagingRecordReader.java:289) at com.google.common.collect.AbstractIterator.tryToComputeNext(AbstractIterator.java:143) at com.google.common.collect.AbstractIterator.hasNext(AbstractIterator.java:138) at org.apache.cassandra.hadoop.cql3.CqlPagingRecordReader.nextKeyValue(CqlPagingRecordReader.java:205) at com.tuplejump.calliope.cql3.Cql3CassandraRDD$$anon$1.hasNext(Cql3CassandraRDD.scala:73) at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:724) at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:720) at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:884) at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:884) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:109) at org.apache.spark.scheduler.Task.run(Task.scala:53) at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:213) at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:46) at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:45) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:415) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1121) at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:45) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:724) The max frame size (15728640) is 15MB, which is the default frame size Cassandra uses. Has anyone seen this before? Are there common workarounds? Also, I'd much rather not have to poke around changing Cassandra settings, but I can change Spark settings as much as I like. My program itself is extremely simple since I'm testing. I'm just using count() on the RDD I created with CasBuilder. 
Thanks, Eric
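[Editor's note] A hedged sketch of where that pageSize call sits. CasBuilder and pageSize come from Rohit's reply; the cql3/withColumnFamily builder chain, the Implicits import, and the keyspace/table/row-type names are my assumptions about how Eric's RDD is built, and the implicit row-unmarshalling details are elided.

import com.tuplejump.calliope.CasBuilder
import com.tuplejump.calliope.Implicits._

// Placeholder keyspace/table; the point is the pageSize call.
val cqlCas = CasBuilder.cql3.withColumnFamily("my_keyspace", "my_table")
  // Fetch 700 CQL rows per page instead of the default 1000, keeping each
  // thrift response frame under Cassandra's 15MB default frame size.
  .pageSize(700)

val rdd = sc.cql3Cassandra[MyRow](cqlCas) // MyRow: your unmarshalled row type
println(rdd.count())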
Re: Announcing Spark SQL
Thanks Patrick, I was thinking about that... Upon analysis I realized (on date) it would be something similar to the way HiveContext uses the CustomCatalog stuff. I will review it again, along the lines of implementing SchemaRDD with Cassandra. Thanks for the pointer. Upon discussion with a couple of our clients, it seems the reason they would prefer using Hive is that they have already invested a lot in it, mostly in UDFs and HiveQL. 1. Are there any plans to develop the SQL parser to handle more complex queries like HiveQL? Can we just plug in a custom parser instead of bringing in the whole Hive deps? 2. Is there any way we can support UDFs in Catalyst without using Hive? It will be fine if we don't support Hive UDFs as-is and they need minor porting effort. Regards, Rohit *Founder CEO, **Tuplejump, Inc.* www.tuplejump.com *The Data Engineering Platform*

On Fri, Mar 28, 2014 at 12:48 AM, Patrick Wendell pwend...@gmail.com wrote: Hey Rohit, I think external tables based on Cassandra or other datastores will work out-of-the-box if you build Catalyst with Hive support. Michael may have feelings about this, but I'd guess the longer-term design for having schema support for Cassandra/HBase etc. likely wouldn't rely on Hive external tables, because it's an unnecessary layer of indirection. Spark should be able to directly load a SchemaRDD from Cassandra by just letting the user give relevant information about the Cassandra schema. And it should let you write back to Cassandra by giving a mapping of fields to the respective Cassandra columns. I think all of this would be fairly easy to implement on SchemaRDD and likely will make it into Spark 1.1. - Patrick

On Wed, Mar 26, 2014 at 10:59 PM, Rohit Rai ro...@tuplejump.com wrote: Great work guys! Have been looking forward to this . . . In the blog it mentions support for reading from HBase/Avro... What will be the recommended approach for this? Will it be writing custom wrappers for SQLContext like in HiveContext, or using Hive's EXTERNAL TABLE support? I ask this because a few days back (based on your pull request on GitHub) I started analyzing what it would take to support Spark SQL on Cassandra. One obvious approach will be to use Hive external table support with our cassandra-hive handler. But the second approach sounds tempting as it will give more fidelity. Regards, Rohit *Founder CEO, **Tuplejump, Inc.* www.tuplejump.com *The Data Engineering Platform*

On Thu, Mar 27, 2014 at 9:12 AM, Michael Armbrust mich...@databricks.com wrote: Any plans to make the SQL typesafe using something like Slick (http://slick.typesafe.com/)? I would really like to do something like that, and maybe we will in a couple of months. However, in the near term, I think the top priorities are going to be performance and stability. Michael
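[Editor's note] On question 2, for reference: a Hive-free UDF hook did later land on SQLContext itself (registerFunction, around Spark 1.1; not available at the time of this thread). A minimal sketch of that style, with placeholder table and column names:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

val sc = new SparkContext(new SparkConf().setAppName("plain-udf"))
val sqlContext = new SQLContext(sc)

// A plain Scala function registered as a SQL UDF, with no Hive dependency.
sqlContext.registerFunction("strLen", (s: String) => s.length)
val lengths = sqlContext.sql("SELECT strLen(name) FROM people") // placeholder table
lengths.take(5).foreach(println)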
Re: [BLOG] Spark on Cassandra w/ Calliope
We are happy that you found Calliope useful and glad we could help. *Founder CEO, **Tuplejump, Inc.* www.tuplejump.com *The Data Engineering Platform* On Sat, Mar 8, 2014 at 2:18 AM, Brian O'Neill b...@alumni.brown.edu wrote: FWIW - I posted some notes to help people get started quickly with Spark on C*. http://brianoneill.blogspot.com/2014/03/spark-on-cassandra-w-calliope.html (tnx again to Rohit and team for all of their help) -brian -- Brian ONeill CTO, Health Market Science (http://healthmarketscience.com) mobile:215.588.6024 blog: http://brianoneill.blogspot.com/ twitter: @boneill42
Re: [incubating-0.9.0] Too Many Open Files on Workers
Hello Andy, This is a problem we have seen when using the CQL Java driver under heavy read loads, where it uses NIO and waits on many pending responses, which causes too many open sockets and hence too many open files. Are you by any chance using async queries? I am the maintainer of Calliope... Feel free to mail me directly on any issues/queries you have working with Calliope, will be glad to assist. Cheers, Rohit *Founder CEO, **Tuplejump, Inc.* www.tuplejump.com *The Data Engineering Platform*

On Fri, Feb 21, 2014 at 3:34 PM, andy petrella andy.petre...@gmail.com wrote: MMMmmmh good point!! Before answering, I tried to use Calliope but I got an issue, and since the iteration review was near I quickly switched to the DataStax driver. But I'll get to Calliope soon, with some questions maybe ;-). Regarding your point (a very good one, I have to say), actually I'm creating a session and a batch per partition. Now the shameful part... I haven't set any options for the pool :-/. Are there some tuning clues? In my case the C* is local (docker image), so maybe should I do builder.poolingOptions().setMaxConnectionsPerHost(LOCAL, BIGNUMBER)? The point is, what about this BIGNUMBER... can it be really big? (Sounds weird to me, but I don't want to pre-filter options based on feelings.) Thanks for your response, andy

On Fri, Feb 21, 2014 at 10:36 AM, Sourav Chandra sourav.chan...@livestream.com wrote: From the stacktrace it looks like you are using the DataStax Cassandra driver and it tried to create a cluster. How many connections are you creating in poolingOptions(), i.e. builder.poolingOptions().setMaxConnectionsPerHost(...)? Are you creating this per RDD? Maybe there are lots of connections created, and at last it failed to create any more. Thanks, Sourav

On Fri, Feb 21, 2014 at 3:02 PM, andy petrella andy.petre...@gmail.com wrote: Hey guys, I've got this issue (see bottom) with Spark, deployed in standalone mode on a local docker environment. I know that I need to raise the ulimit (only 1024 now), but in the meantime I was just wondering how this could happen. My gut feeling is that it's because I'm mounting a lot in memory and Spark tries to dump some RDDs on the FS, and then boom. Also, I was wondering if it could be a clue that my job is maybe too eager in memory? Or is it something quite normal with such a low ulimit on workers? Thanks a lot (in advance ^^) Cheers, andy 14/02/21 08:32:15 ERROR Executor: Exception in task ID 472 org.jboss.netty.channel.ChannelException: Failed to create a selector. 
at org.jboss.netty.channel.socket.nio.AbstractNioSelector.openSelector(AbstractNioSelector.java:337) at org.jboss.netty.channel.socket.nio.AbstractNioSelector.init(AbstractNioSelector.java:95) at org.jboss.netty.channel.socket.nio.AbstractNioWorker.init(AbstractNioWorker.java:53) at org.jboss.netty.channel.socket.nio.NioWorker.init(NioWorker.java:45) at org.jboss.netty.channel.socket.nio.NioWorkerPool.createWorker(NioWorkerPool.java:45) at org.jboss.netty.channel.socket.nio.NioWorkerPool.createWorker(NioWorkerPool.java:28) at org.jboss.netty.channel.socket.nio.AbstractNioWorkerPool.newWorker(AbstractNioWorkerPool.java:99) at org.jboss.netty.channel.socket.nio.AbstractNioWorkerPool.init(AbstractNioWorkerPool.java:69) at org.jboss.netty.channel.socket.nio.NioWorkerPool.init(NioWorkerPool.java:39) at org.jboss.netty.channel.socket.nio.NioWorkerPool.init(NioWorkerPool.java:33) at org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory.init(NioClientSocketChannelFactory.java:151) at org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory.init(NioClientSocketChannelFactory.java:116) at com.datastax.driver.core.Connection$Factory.init(Connection.java:349) at com.datastax.driver.core.Connection$Factory.init(Connection.java:360) at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:857) at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:806) at com.datastax.driver.core.Cluster.init(Cluster.java:76) at com.datastax.driver.core.Cluster.buildFrom(Cluster.java:132) at com.datastax.driver.core.Cluster$Builder.build(Cluster.java:771) at com.virdata.core.batch.sample.Timeseries$$anonfun$storeInCassandra$1$1$$anonfun$apply$1$$anonfun$apply$2.apply(Timeseries.scala:45) at com.virdata.core.batch.sample.Timeseries$$anonfun$storeInCassandra$1$1$$anonfun$apply$1$$anonfun$apply$2.apply(Timeseries.scala:38) at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:595) at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:595) at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:884) at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:884) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:109) at org.apache.spark.scheduler.Task.run(Task.scala:53) at
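[Editor's note] To make Sourav's pooling question concrete, a hedged sketch with the DataStax Java driver of that era; the contact point and pool size are placeholders, and builder.poolingOptions() mirrors the call named in the thread. The key point is one Cluster/Session per JVM, reused across partitions, instead of one per RDD or per partition as in the stack trace above.

import com.datastax.driver.core.{Cluster, HostDistance, Session}

object CassandraConn {
  // One Cluster and Session per executor JVM; building these per partition
  // leaks sockets and is a classic source of "too many open files".
  lazy val cluster: Cluster = {
    val builder = Cluster.builder().addContactPoint("127.0.0.1")
    // Cap connections per host with a moderate number, not a BIGNUMBER;
    // each pooled connection is an open socket counted against ulimit.
    builder.poolingOptions().setMaxConnectionsPerHost(HostDistance.LOCAL, 8)
    builder.build()
  }
  lazy val session: Session = cluster.connect()
}

// Usage inside a job: rdd.foreachPartition { rows => rows.foreach(row => CassandraConn.session.execute(...)) }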