Spark schema evolution
Hi,

I have a table sourced from two Parquet files, with a few extra columns present in only one of the files. Simple queries work fine, but a query with a predicate on one of the extra columns fails with a "column not found" error. The column resp_party_type exists in just one of the Parquet files.

a) Query that works:

select resp_party_type from operational_analytics

b) Query that doesn't work (complains about the missing column resp_party_type):

select category as Events, resp_party as Team, count(*) as Total
from operational_analytics
where application = 'PeopleMover' and resp_party_type = 'Team'
group by category, resp_party

Query plan for (b):

== Physical Plan ==
TungstenAggregate(key=[category#30986,resp_party#31006], functions=[(count(1),mode=Final,isDistinct=false)], output=[Events#36266,Team#36267,Total#36268L])
 TungstenExchange hashpartitioning(category#30986,resp_party#31006)
  TungstenAggregate(key=[category#30986,resp_party#31006], functions=[(count(1),mode=Partial,isDistinct=false)], output=[category#30986,resp_party#31006,currentCount#36272L])
   Project [category#30986,resp_party#31006]
    Filter ((application#30983 = PeopleMover) && (resp_party_type#31007 = Team))
     Scan ParquetRelation[snackfs://tst:9042/aladdin_data_beta/operational_analytics/operational_analytics_peoplemover.parquet,snackfs://tst:9042/aladdin_data_beta/operational_analytics/operational_analytics_mis.parquet][category#30986,resp_party#31006,application#30983,resp_party_type#31007]

I have set spark.sql.parquet.mergeSchema = true and spark.sql.parquet.filterPushdown = true. When I set spark.sql.parquet.filterPushdown = false, query (b) starts working. Execution plan for query (b) after setting filterPushdown = false:

== Physical Plan ==
TungstenAggregate(key=[category#30986,resp_party#31006], functions=[(count(1),mode=Final,isDistinct=false)], output=[Events#36313,Team#36314,Total#36315L])
 TungstenExchange hashpartitioning(category#30986,resp_party#31006)
  TungstenAggregate(key=[category#30986,resp_party#31006], functions=[(count(1),mode=Partial,isDistinct=false)], output=[category#30986,resp_party#31006,currentCount#36319L])
   Project [category#30986,resp_party#31006]
    Filter ((application#30983 = PeopleMover) && (resp_party_type#31007 = Team))
     Scan ParquetRelation[snackfs://tst:9042/aladdin_data_beta/operational_analytics/operational_analytics_peoplemover.parquet,snackfs://tst:9042/aladdin_data_beta/operational_analytics/operational_analytics_mis.parquet][category#30986,resp_party#31006,application#30983,resp_party_type#31007]
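For anyone hitting the same thing, the workaround described above in code form; a minimal sketch assuming Spark 1.4+ for sqlContext.read, and the directory path is a placeholder derived from the plans:

// workaround sketch: keep schema merging on, turn filter pushdown off so the
// predicate on the missing column is evaluated by Spark rather than pushed
// into Parquet files that lack it
sqlContext.setConf("spark.sql.parquet.mergeSchema", "true")
sqlContext.setConf("spark.sql.parquet.filterPushdown", "false")
val ops = sqlContext.read.parquet("snackfs://tst:9042/aladdin_data_beta/operational_analytics")
ops.registerTempTable("operational_analytics")

The trade-off is that without pushdown every row group is read and the filter runs in Spark, so (b) works at the cost of scanning more data.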
Re: Spark SQL Optimization
More details: execution plans for both queries.

Execution plan for the original query:

select distinct pge.portfolio_code
from table1 pge
join table2 p on p.perm_group = pge.anc_port_group
join table3 uge on p.user_group = uge.anc_user_group
where uge.user_name = 'user' and p.perm_type = 'TEST'

== Physical Plan ==
TungstenAggregate(key=[portfolio_code#14119], functions=[], output=[portfolio_code#14119])
 TungstenExchange hashpartitioning(portfolio_code#14119)
  TungstenAggregate(key=[portfolio_code#14119], functions=[], output=[portfolio_code#14119])
   TungstenProject [portfolio_code#14119]
    BroadcastHashJoin [user_group#13665], [anc_user_group#13658], BuildRight
     TungstenProject [portfolio_code#14119,user_group#13665]
      BroadcastHashJoin [anc_port_group#14117], [perm_group#13667], BuildRight
       ConvertToUnsafe
        Scan ParquetRelation[snackfs://shared:9042/aladdin_data_beta/table1.parquet][portfolio_code#14119,anc_port_group#14117]
       ConvertToUnsafe
        Project [user_group#13665,perm_group#13667]
         Filter (perm_type#13666 = TEST)
          Scan ParquetRelation[snackfs://shared:9042/aladdin_data_beta/table2.parquet][user_group#13665,perm_group#13667,perm_type#13666]
     ConvertToUnsafe
      Project [anc_user_group#13658]
       Filter (user_name#13659 = user)
        Scan ParquetRelation[snackfs://shared:9042/aladdin_data_beta/table3.parquet][anc_user_group#13658,user_name#13659]

Execution plan for the optimized query:

select distinct pge.portfolio_code
from table1 uge, table2 p, table3 pge
where uge.user_name = 'user' and p.perm_type = 'TEST'
  and p.perm_group = pge.anc_port_group
  and p.user_group = uge.anc_user_group

== Physical Plan ==
TungstenAggregate(key=[portfolio_code#14119], functions=[], output=[portfolio_code#14119])
 TungstenExchange hashpartitioning(portfolio_code#14119)
  TungstenAggregate(key=[portfolio_code#14119], functions=[], output=[portfolio_code#14119])
   TungstenProject [portfolio_code#14119]
    BroadcastHashJoin [perm_group#13667], [anc_port_group#14117], BuildRight
     TungstenProject [perm_group#13667]
      BroadcastHashJoin [anc_user_group#13658], [user_group#13665], BuildRight
       ConvertToUnsafe
        Project [anc_user_group#13658]
         Filter (user_name#13659 = user)
          Scan ParquetRelation[snackfs://shared:9042/aladdin_data_beta/table3.parquet][anc_user_group#13658,user_name#13659]
       ConvertToUnsafe
        Project [perm_group#13667,user_group#13665]
         Filter (perm_type#13666 = TEST)
          Scan ParquetRelation[snackfs://shared:9042/aladdin_data_beta/table2.parquet][perm_group#13667,user_group#13665,perm_type#13666]
     ConvertToUnsafe
      Scan ParquetRelation[snackfs://shared:9042/aladdin_data_beta/table1.parquet][portfolio_code#14119,anc_port_group#14117]
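For completeness, these plans can be reproduced from code without running the jobs; explain(true) has been available on DataFrame since Spark 1.3 and prints the parsed, analyzed, optimized, and physical plans:

// prints all plan stages for the original query to stdout
sqlContext.sql("""
  select distinct pge.portfolio_code
  from table1 pge
  join table2 p on p.perm_group = pge.anc_port_group
  join table3 uge on p.user_group = uge.anc_user_group
  where uge.user_name = 'user' and p.perm_type = 'TEST'
""").explain(true)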
Spark SQL Optimization
Hi,

I am trying to execute a simple query that joins 3 tables. When I look at the execution plan, it varies with the position of the tables in the FROM clause: the plan looks more optimized when the table with predicates is listed before the other tables.

Original query:

select distinct pge.portfolio_code
from table1 pge
join table2 p on p.perm_group = pge.anc_port_group
join table3 uge on p.user_group = uge.anc_user_group
where uge.user_name = 'user' and p.perm_type = 'TEST'

Optimized query (table with predicates moved ahead):

select distinct pge.portfolio_code
from table1 uge, table2 p, table3 pge
where uge.user_name = 'user' and p.perm_type = 'TEST'
  and p.perm_group = pge.anc_port_group
  and p.user_group = uge.anc_user_group

The execution plan is more optimized for the second query, and it executes faster. All the tables are sourced from Parquet files.
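Until the optimizer handles this reordering itself, one way to pin the order explicitly is the DataFrame API; a sketch, assuming the three tables are registered as temp tables (table and column names are the placeholders from this thread):

// filter first, then join from the filtered (presumably small) side outward
val t1 = sqlContext.table("table1")   // portfolio_code, anc_port_group
val t2 = sqlContext.table("table2")   // user_group, perm_group, perm_type
val t3 = sqlContext.table("table3")   // anc_user_group, user_name

val result = t3.filter(t3("user_name") === "user")
  .join(t2.filter(t2("perm_type") === "TEST"), t3("anc_user_group") === t2("user_group"))
  .join(t1, t2("perm_group") === t1("anc_port_group"))
  .select(t1("portfolio_code"))
  .distinct()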
Error while saving parquet
Please refer to the code snippet below. I get the following error:

/tmp/temp/trade.parquet/part-r-00036.parquet is not a Parquet file. expected magic number at tail [80, 65, 82, 49] but found [20, -28, -93, 93]
	at parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:418)
	at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$readMetaData$3.apply(ParquetTypes.scala:494)
	at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$readMetaData$3.apply(ParquetTypes.scala:494)
	at scala.Option.map(Option.scala:145)
	at org.apache.spark.sql.parquet.ParquetTypesConverter$.readMetaData(ParquetTypes.scala:494)
	at org.apache.spark.sql.parquet.ParquetTypesConverter$.readSchemaFromFile(ParquetTypes.scala:515)
	at org.apache.spark.sql.parquet.ParquetRelation.<init>(ParquetRelation.scala:67)
	at org.apache.spark.sql.SQLContext.parquetFile(SQLContext.scala:542)
	at com.bfm.spark.data.handlers.input.InputFormatRegistry$.registerTable(InputFormatRegistry.scala:42)
	at com.bfm.spark.data.handlers.input.CassandraInputHandler.handleConfiguration(CassandraInputHandler.scala:43)
	at com.bfm.spark.data.handlers.input.CassandraInputHandler.handleConfiguration(CassandraInputHandler.scala:21)
	at com.bfm.spark.data.services.CompositeConfigurationHandler$$anonfun$handleConfiguration$1.apply(CompositeConfigurationHandler.scala:18)
	at com.bfm.spark.data.services.CompositeConfigurationHandler$$anonfun$handleConfiguration$1.apply(CompositeConfigurationHandler.scala:15)
	at scala.collection.Iterator$class.foreach(Iterator.scala:727)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
	at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
	at com.bfm.spark.data.services.CompositeConfigurationHandler.handleConfiguration(CompositeConfigurationHandler.scala:15)
	at com.bfm.spark.data.services.CompositeConfigurationHandler$$anonfun$handleConfiguration$1.apply(CompositeConfigurationHandler.scala:18)
	at com.bfm.spark.data.services.CompositeConfigurationHandler$$anonfun$handleConfiguration$1.apply(CompositeConfigurationHandler.scala:15)
	at scala.collection.Iterator$class.foreach(Iterator.scala:727)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
	at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
	at com.bfm.spark.data.services.CompositeConfigurationHandler.handleConfiguration(CompositeConfigurationHandler.scala:15)
	at com.bfm.spark.data.services.ConfigurationHandlerService.execute(ConfigurationHandlerService.scala:43)
	at com.bfm.spark.data.scheduler.DataLoadingScheduler$$anonfun$scheduleJobsByHour$2.apply(DataLoadingScheduler.scala:45)
	at com.bfm.spark.data.scheduler.DataLoadingScheduler$$anonfun$scheduleJobsByHour$2.apply(DataLoadingScheduler.scala:45)
	at scala.collection.immutable.Set$Set1.foreach(Set.scala:74)
	at com.bfm.spark.data.scheduler.DataLoadingScheduler.scheduleJobsByHour(DataLoadingScheduler.scala:45)
	at com.bfm.spark.data.scheduler.DataLoadingScheduler.everyThreeHours(DataLoadingScheduler.scala:20)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:483)

Code snippet:

import com.datastax.spark.connector._
import com.google.gson.GsonBuilder
import scala.collection.mutable._
import scala.util._

case class Trade(org_ : String, fund: Int, invnum: Int, touch_count: Int, blob: String)

val rdd = sc.cassandraTable[Trade]("TEST", "trade")
val filteredRDD = rdd.filter(data => data.org_.equals("DEV"))
val cassandraRDD = rdd.map(data => data.blob)
sqlContext.jsonRDD(cassandraRDD, 0.01).registerTempTable("trade")
sqlContext.sql("select * from trade").repartition(1).saveAsParquetFile("/tmp/temp/trade1.parquet")

If I don't do the repartition, this works fine. Am I missing something here? I am using Spark 1.3.1.

Regards,
Gaurav
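An untested workaround sketch for the single-output-file case: coalesce the underlying RDD instead of repartitioning the DataFrame, and write to a fresh directory so stale part files from earlier runs can't be picked up (createDataFrame re-attaches the schema; the output path is a placeholder):

val df = sqlContext.sql("select * from trade")
// coalesce(1) narrows to one partition without a full shuffle
val single = sqlContext.createDataFrame(df.rdd.coalesce(1), df.schema)
single.saveAsParquetFile("/tmp/temp/trade_single.parquet")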
Group by specific key and save as parquet
Hi,

I have a set of data that I need to group by a specific key and then save as Parquet, one output per key. Refer to the code snippet below; I am querying trade and then grouping by date:

val df = sqlContext.sql("SELECT * FROM trade")
val dfSchema = df.schema
val partitionKeyIndex = dfSchema.fieldNames.indexOf("date")

// group by date
val groupedByPartitionKey = df.rdd.groupBy { row => row.getString(partitionKeyIndex) }

// config, DefaultFileSystem, and logError are helpers from my project;
// Path is org.apache.hadoop.fs.Path
val failure = groupedByPartitionKey.map { row =>
  val rowDF = sqlContext.createDataFrame(sc.parallelize(row._2.toSeq), dfSchema)
  val fileName = config.getTempFileName(row._1)
  try {
    val dest = new Path(fileName)
    if (DefaultFileSystem.getFS.exists(dest)) {
      DefaultFileSystem.getFS.delete(dest, true)
    }
    rowDF.saveAsParquetFile(fileName)
    false
  } catch {
    case e: Throwable =>
      logError("Failed to save parquet file")
      true
  }
}

This code doesn't work because of the nested RDD (sc and sqlContext are used inside an RDD closure). What is the best way to solve this problem?

Regards,
Gaurav
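If upgrading is an option: since Spark 1.4 the DataFrameWriter can do this directly, writing one directory per distinct value of "date" with no nested-RDD trickery. A sketch (the output path is a placeholder):

val df = sqlContext.sql("SELECT * FROM trade")
// produces /tmp/trade_by_date/date=<value>/part-* for each distinct date
df.write.partitionBy("date").parquet("/tmp/trade_by_date")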
Spark SQL 1.3 max operation giving wrong results
Hi,

I am playing around with Spark SQL 1.3 and noticed that the max function does not give the correct result, i.e. it doesn't return the maximum value. The same query works fine in Spark SQL 1.2. Is anyone aware of this issue?

Regards,
Gaurav
Re: Integer column in schema RDD from parquet being considered as string
Hi tsingfu,

Thanks for your reply. I tried with other columns, but the problem is the same for the other integer columns.

Regards,
Gaurav
Integer column in schema RDD from parquet being considered as string
Hi,

I am converting a jsonRDD to Parquet by saving it as a Parquet file (saveAsParquetFile):

cacheContext.jsonFile("file:///u1/sample.json").saveAsParquetFile("sample.parquet")

I am then reading the Parquet file and registering it as a table:

val parquet = cacheContext.parquetFile("sample_trades.parquet")
parquet.registerTempTable("sample")

When I do a printSchema, I see:

root
 |-- SAMPLE: struct (nullable = true)
 |    |-- CODE: integer (nullable = true)
 |    |-- DESC: string (nullable = true)

When I query:

cacheContext.sql("select SAMPLE.DESC from sample where SAMPLE.CODE = 1").map(t => t).collect().foreach(println)

I get the error java.lang.IllegalArgumentException: Column [CODE] was not found in schema!

But if I put SAMPLE.CODE in single quotes (forcing it as a string), it works. For example:

cacheContext.sql("select SAMPLE.DESC from sample where SAMPLE.CODE = '1'").map(t => t).collect().foreach(println)

works. What am I missing here? I understand Catalyst will do optimization, so the data type shouldn't matter that much, but something is off here.

Regards,
Gaurav
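A guess, not a confirmed diagnosis: the error text looks like it comes from the Parquet filter pushdown path choking on the nested (struct) column, so one experiment is to disable pushdown and re-run the typed predicate:

// spark.sql.parquet.filterPushdown is a real conf key; whether pushdown is
// the culprit here is an assumption
cacheContext.setConf("spark.sql.parquet.filterPushdown", "false")
cacheContext.sql("select SAMPLE.DESC from sample where SAMPLE.CODE = 1").collect().foreach(println)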
NullPointerException in TaskSetManager
Hi,

I am trying to run a simple Hadoop job (that uses CassandraHadoopInputOutputWriter) on Spark (v1.2, Hadoop v1.x) but I am getting a NullPointerException in TaskSetManager:

WARN 2015-02-26 14:21:43,217 [task-result-getter-0] TaskSetManager - Lost task 14.2 in stage 0.0 (TID 29, devntom003.dev.blackrock.com): java.lang.NullPointerException
	org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopDataset(PairRDDFunctions.scala:1007)
	com.bfm.spark.test.CassandraHadoopMigrator$.main(CassandraHadoopMigrator.scala:77)
	com.bfm.spark.test.CassandraHadoopMigrator.main(CassandraHadoopMigrator.scala)
	sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
	sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	java.lang.reflect.Method.invoke(Method.java:606)
	org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358)
	org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
	org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

The logs don't have any further stack trace. Can someone please help?

Regards,
Gaurav
Thriftserver Beeline
Hi,

I created some Hive tables and am trying to list them through Beeline, but I am not getting any results. I can list the tables through spark-sql. When I connect with Beeline, it starts up with the following message:

Connecting to jdbc:hive2://tst001:10001
Enter username for jdbc:hive2://tst001:10001:
Enter password for jdbc:hive2://tst001:10001:
Connected to: Spark SQL (version 1.2.0)
Driver: null (version null)
Transaction isolation: TRANSACTION_REPEATABLE_READ

"show tables" in Beeline returns no results.

Regards,
Gaurav
SPARK_LOCAL_DIRS and SPARK_WORKER_DIR
Hi,

What is the difference between SPARK_LOCAL_DIRS and SPARK_WORKER_DIR? Also, does Spark clean these up after execution?

Regards,
Gaurav
Re: Web Service + Spark
You can also look at Spark Job Server: https://github.com/spark-jobserver/spark-jobserver

- Gaurav

On Jan 9, 2015, at 10:25 PM, Corey Nolet <cjno...@gmail.com> wrote:

> Cui Lin,
>
> The solution largely depends on how you want your services deployed (Java web container, Spray framework, etc...) and if you are using a cluster manager like Yarn or Mesos vs. just firing up your own executors and master.
>
> I recently worked on an example for deploying Spark services inside of Jetty using Yarn as the cluster manager. It forced me to learn how Spark wires up the dependencies/classpaths. If it helps, the example that resulted from my tinkering is located at [1].
>
> [1] https://github.com/calrissian/spark-jetty-server
>
> On Fri, Jan 9, 2015 at 9:33 PM, Cui Lin <cui@hds.com> wrote:
>> Hello, All,
>> What's the best practice on deploying/publishing spark-based scientific applications into a web service? Similar to Shiny on R.
>> Thanks!
>> Best regards,
>> Cui Lin
Data Locality
Does Spark guarantee to push the processing to the data? Before creating tasks, does Spark always check the data location? For example, if I have 3 Spark nodes (Node1, Node2, Node3) and the data is local to just 2 of them (Node1 and Node2), will Spark always schedule tasks on the nodes where the data is local, i.e. Node1 and Node2 (assuming Node1 and Node2 have enough resources to execute the tasks)?

Gaurav
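As far as I know, locality is best-effort rather than guaranteed: the scheduler prefers data-local slots and waits a configurable interval (delay scheduling) before falling back to less-local ones. A sketch of tuning that wait:

// spark.locality.wait controls how long the scheduler waits for a data-local
// slot before degrading to rack-local / any; raising it makes Spark insist
// harder on Node1/Node2 in the example above. Value is in milliseconds in
// older releases.
val conf = new org.apache.spark.SparkConf()
  .setAppName("locality-demo")            // placeholder app name
  .set("spark.locality.wait", "10000")
val sc = new org.apache.spark.SparkContext(conf)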
unread block data when reading from NFS
Hi,

I am trying to read a CSV file in the following way:

val csvData = sc.textFile("file:///tmp/sample.csv")
csvData.collect().length

This works fine in spark-shell, but when I spark-submit the jar, I get the following exception:

java.lang.IllegalStateException: unread block data
	java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2420)
	java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1380)
	java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1989)
	java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1913)
	java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796)
	java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1348)
	java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
	org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
	org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87)
	org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:159)
	java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
	java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
	java.lang.Thread.run(Thread.java:724)

Can you please help me here?

Regards,
Gaurav
Integrating Spark with other applications
Hi,

I have been working on Spark SQL and want to expose this functionality to other applications. The idea is to let other applications send SQL to be executed on the Spark cluster and get the result back. I looked at Spark Job Server (https://github.com/ooyala/spark-jobserver), but it provides a RESTful interface. I am looking for something similar to spring-hadoop (http://projects.spring.io/spring-hadoop/) so I can do a spark-submit programmatically.

Regards,
Gaurav
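One note: newer Spark releases (1.4+) ship org.apache.spark.launcher.SparkLauncher, which is exactly a programmatic spark-submit. A minimal sketch, with placeholder paths, class names, and master URL:

import org.apache.spark.launcher.SparkLauncher

// launches a child spark-submit process; launch() returns a java.lang.Process
val app = new SparkLauncher()
  .setSparkHome("/opt/spark")
  .setAppResource("/path/to/sql-service.jar")
  .setMainClass("com.example.SqlRunner")
  .setMaster("spark://master:7077")
  .launch()
app.waitFor()  // blocks until the application process exits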
Spark SQL on XML files
Hi,

I have a bunch of XML files and I want to run Spark SQL on them. Is there a recommended approach? I am thinking of converting the XML to JSON and then using jsonRDD. Please let me know your thoughts.

Regards,
Gaurav
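A sketch of the XML-to-JSON route using scala.xml; the input path and the id/name fields are hypothetical, and this assumes one XML document per file:

import scala.xml.XML

val xmlDocs = sc.wholeTextFiles("/data/xml/*.xml").map(_._2)  // (path, content) -> content
val jsonDocs = xmlDocs.map { doc =>
  val root = XML.loadString(doc)
  // hypothetical fields -- pull out whatever your schema actually needs
  s"""{"id": "${(root \ "id").text}", "name": "${(root \ "name").text}"}"""
}
sqlContext.jsonRDD(jsonDocs).registerTempTable("docs")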
Spark SQL CLI
Hi,

I have been using the spark shell to execute all my SQL. I am connecting to Cassandra, converting the data to JSON, and then running queries on it. I am using HiveContext (and not SQLContext) because of the explode functionality in it.

I want to see how I can use the Spark SQL CLI to run queries directly against a saved table. I see metastore and metastore_db getting created in the Spark bin directory (my Hive context is a LocalHiveContext). I tried executing queries in the spark-sql CLI after putting in a hive-site.xml with the metastore and metastore db directories set to the same ones as in the Spark bin directory, but it doesn't seem to be working. I am getting:

org.apache.hadoop.hive.ql.metadata.HiveException: Unable to fetch table test_tbl

Is this possible?

Regards,
Gaurav
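For what it's worth, a sketch of the kind of hive-site.xml this would need (untested; the Derby database path is a placeholder and must be the absolute path of the metastore_db the shell created). Both the shell and the CLI would have to point at the same location:

<configuration>
  <property>
    <name>javax.jdo.option.ConnectionURL</name>
    <value>jdbc:derby:;databaseName=/absolute/path/to/metastore_db;create=true</value>
  </property>
  <property>
    <name>hive.metastore.warehouse.dir</name>
    <value>/absolute/path/to/warehouse</value>
  </property>
</configuration>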
Re: flattening a list in spark sql
Hi,

Thanks, it worked; I really appreciate your help. I have also been trying to do multiple lateral views, but that doesn't seem to be working.

Query:

hiveContext.sql("Select t2 from fav LATERAL VIEW explode(TABS) tabs1 as t1 LATERAL VIEW explode(t1) tabs2 as t2")

Exception:

org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Unresolved attributes: 't2, tree:

Regards,
Gaurav
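In case it helps anyone searching later, an untested workaround sketch: materialize the first explode as its own temp table, then explode again, instead of stacking both LATERAL VIEWs in one statement (names from this thread):

// step 1: explode the outer array and register the intermediate result
hiveContext.sql("SELECT t1 FROM fav LATERAL VIEW explode(TABS) tabs1 AS t1")
  .registerTempTable("fav_level1")
// step 2: explode the inner array from the intermediate table
hiveContext.sql("SELECT t2 FROM fav_level1 LATERAL VIEW explode(t1) tabs2 AS t2")
  .collect().foreach(println)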
Re: Cassandra connector
Are you using Spark 1.1? If yes, you will have to update the DataStax Cassandra connector code and remove the references to the log methods in CassandraConnector.scala.

Regards,
Gaurav
Re: flattening a list in spark sql
My bad, please ignore, it works!!!
Spark SQL on Cassandra
Hi,

I am reading data from Cassandra through the DataStax spark-cassandra connector, converting it into JSON, and then running Spark SQL on it. Refer to the code snippet below:

// step 1
val o_rdd = sc.cassandraTable[CassandraRDDWrapper]("keyspace", "column_family")
// step 2
val tempObjectRDD = sc.parallelize(o_rdd.toArray.map(i => i), 100)
// step 3
val objectRDD = sqlContext.jsonRDD(tempObjectRDD)
// step 4
objectRDD.registerAsTable("objects")

At step (2) I have to explicitly do a toArray because jsonRDD takes an RDD[String]. For me, calling toArray on the Cassandra RDD takes forever, as I have a million records in Cassandra. Is there a better way of doing this? How can I optimize it?
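A sketch that skips the driver-side toArray entirely: map each row to a JSON string on the executors. This assumes CassandraRDDWrapper serializes cleanly with Gson (one Gson instance per partition, since Gson itself isn't serializable):

import com.google.gson.GsonBuilder

val jsonStrings = o_rdd.mapPartitions { rows =>
  val gson = new GsonBuilder().create()
  rows.map(gson.toJson(_))   // RDD[String], never collected to the driver
}
val objectRDD = sqlContext.jsonRDD(jsonStrings)
objectRDD.registerAsTable("objects")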
Re: flattening a list in spark sql
Thanks. I am not using a Hive context; I am loading data from Cassandra, converting it into JSON, and then querying it through the SQL context. Can I use a Hive context to query a jsonRDD?

Gaurav
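HiveContext extends SQLContext, so jsonRDD is available on it too; a sketch, where jsonStrings stands for the RDD[String] of JSON built from Cassandra and TABS is the array column from earlier in this thread:

val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
val fav = hiveContext.jsonRDD(jsonStrings)   // jsonStrings: RDD[String]
fav.registerTempTable("fav")                 // registerAsTable in pre-1.1 releases
hiveContext.sql("SELECT tab FROM fav LATERAL VIEW explode(TABS) t AS tab")
  .collect().foreach(println)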