RDD equivalent of HBase Scan
HBase scans come with the ability to specify filters that make scans very fast and efficient (as they let you seek to the keys that pass the filter). Do RDDs or Spark DataFrames offer anything similar, or would I be required to use a NoSQL db like HBase to do something like this?

-- Stuart Layton
Re: RDD equivalent of HBase Scan
Thanks, but I'm hoping to get away from HBase altogether. I was wondering if there is a way to get similar scan performance directly on cached RDDs or DataFrames.

On Thu, Mar 26, 2015 at 9:54 AM, Ted Yu yuzhih...@gmail.com wrote:
> In examples/src/main/scala/org/apache/spark/examples/HBaseTest.scala, TableInputFormat is used.
> TableInputFormat accepts the parameter
>
>     public static final String SCAN = "hbase.mapreduce.scan";
>
> which, if specified, is used to recreate the Scan object from its String form:
>
>     if (conf.get(SCAN) != null) {
>       try {
>         scan = TableMapReduceUtil.convertStringToScan(conf.get(SCAN));
>
> You can use TableMapReduceUtil#convertScanToString() to convert a Scan that has filter(s)
> and pass it to TableInputFormat.
>
> Cheers
>
> On Thu, Mar 26, 2015 at 6:46 AM, Stuart Layton stuart.lay...@gmail.com wrote:
>> HBase scans come with the ability to specify filters that make scans very fast and
>> efficient (as they let you seek to the keys that pass the filter). Do RDDs or Spark
>> DataFrames offer anything similar, or would I be required to use a NoSQL db like HBase
>> to do something like this?
>>
>> -- Stuart Layton

-- Stuart Layton
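Putting Ted's pointers together, a minimal sketch in Scala might look like the following (assuming an existing SparkContext sc, as in HBaseTest.scala; the table name "my_table" and the PrefixFilter are hypothetical placeholders, and the HBase client jars must be on the classpath):

    // Sketch only: build a filtered Scan, serialize it, and hand it to TableInputFormat.
    // "my_table" and the PrefixFilter below are hypothetical placeholders.
    import org.apache.hadoop.hbase.HBaseConfiguration
    import org.apache.hadoop.hbase.client.{Result, Scan}
    import org.apache.hadoop.hbase.filter.PrefixFilter
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable
    import org.apache.hadoop.hbase.mapreduce.{TableInputFormat, TableMapReduceUtil}
    import org.apache.hadoop.hbase.util.Bytes

    val conf = HBaseConfiguration.create()
    conf.set(TableInputFormat.INPUT_TABLE, "my_table")

    // The filter travels inside the serialized Scan and runs in the region servers,
    // so only matching rows ever reach the Spark executors.
    val scan = new Scan()
    scan.setFilter(new PrefixFilter(Bytes.toBytes("approved_")))
    conf.set(TableInputFormat.SCAN, TableMapReduceUtil.convertScanToString(scan))

    val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
      classOf[ImmutableBytesWritable], classOf[Result])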
What are the best options for quickly filtering a DataFrame on a single column?
I have a SparkSQL DataFrame with a few billion rows that I need to quickly filter down to a few hundred thousand rows, using an operation like (syntax may not be correct)

    df = df[ df.filter(lambda x: x.key_col in approved_keys)]

I was thinking about serializing the data using Parquet and saving it to S3; however, since I want to optimize for filtering speed, I'm not sure this is the best option.

-- Stuart Layton
Re: What are the best options for quickly filtering a DataFrame on a single column?
Thanks for the response. I was using IN as an example of the type of operation I need to do. Is there another way to do this that lines up more naturally with the way things are supposed to be done in SparkSQL?

On Wed, Mar 25, 2015 at 2:29 PM, Michael Armbrust mich...@databricks.com wrote:
> The only way to do this in Python currently is to use the string-based filter API
> (where you pass us an expression as a string, and we parse it using our SQL parser).
>
>     from pyspark.sql import Row
>     from pyspark.sql.functions import *
>
>     df = sc.parallelize([Row(name="test")]).toDF()
>
>     df.filter("name in ('a', 'b')").collect()
>     Out[1]: []
>
>     df.filter("name in ('test')").collect()
>     Out[2]: [Row(name=u'test')]
>
> In general you want to avoid lambda functions whenever you can do the same thing with
> a DataFrame expression. This is because your lambda function is a black box that we
> cannot optimize (though you should certainly use them for the advanced stuff that
> expressions can't handle). I opened SPARK-6536
> (https://issues.apache.org/jira/browse/SPARK-6536) to provide a nicer interface for this.
>
> On Wed, Mar 25, 2015 at 7:41 AM, Stuart Layton stuart.lay...@gmail.com wrote:
>> I have a SparkSQL DataFrame with a few billion rows that I need to quickly filter down
>> to a few hundred thousand rows, using an operation like (syntax may not be correct)
>>
>>     df = df[ df.filter(lambda x: x.key_col in approved_keys)]
>>
>> I was thinking about serializing the data using Parquet and saving it to S3; however,
>> since I want to optimize for filtering speed, I'm not sure this is the best option.
>>
>> -- Stuart Layton

-- Stuart Layton
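For comparison, in the Scala DataFrame API the same IN predicate can be built from Column expressions rather than a closure, which keeps it visible to the optimizer. A minimal sketch (df, key_col, and the key values are hypothetical placeholders):

    // Sketch: express an IN-style predicate as Column expressions instead of a lambda.
    // df, key_col, and the approved keys are hypothetical placeholders.
    import org.apache.spark.sql.functions.col

    val approvedKeys = Seq("k1", "k2", "k3")

    // String expression parsed by the SQL parser, as in the Python example above:
    val viaSql = df.filter("key_col in ('k1', 'k2', 'k3')")

    // Equivalent predicate built programmatically by OR-ing equality expressions:
    val predicate = approvedKeys.map(k => col("key_col") === k).reduce(_ || _)
    val viaColumns = df.filter(predicate)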
Can a DataFrame be saved to s3 directly using Parquet?
I'm trying to save a DataFrame to S3 as a Parquet file, but I'm getting Wrong FS errors:

    df.saveAsParquetFile(parquetFile)
    15/03/25 18:56:10 INFO storage.MemoryStore: ensureFreeSpace(46645) called with curMem=82744, maxMem=278302556
    15/03/25 18:56:10 INFO storage.MemoryStore: Block broadcast_5 stored as values in memory (estimated size 45.6 KB, free 265.3 MB)
    15/03/25 18:56:10 INFO storage.MemoryStore: ensureFreeSpace(7078) called with curMem=129389, maxMem=278302556
    15/03/25 18:56:10 INFO storage.MemoryStore: Block broadcast_5_piece0 stored as bytes in memory (estimated size 6.9 KB, free 265.3 MB)
    15/03/25 18:56:10 INFO storage.BlockManagerInfo: Added broadcast_5_piece0 in memory on ip-172-31-1-219.ec2.internal:58280 (size: 6.9 KB, free: 265.4 MB)
    15/03/25 18:56:10 INFO storage.BlockManagerMaster: Updated info of block broadcast_5_piece0
    15/03/25 18:56:10 INFO spark.SparkContext: Created broadcast 5 from textFile at JSONRelation.scala:98
    Traceback (most recent call last):
      File "<stdin>", line 1, in <module>
      File "/root/spark/python/pyspark/sql/dataframe.py", line 121, in saveAsParquetFile
        self._jdf.saveAsParquetFile(path)
      File "/root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
      File "/root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
    py4j.protocol.Py4JJavaError: An error occurred while calling o22.saveAsParquetFile.
    : java.lang.IllegalArgumentException: Wrong FS: s3n://com.my.bucket/spark-testing/, expected: hdfs://ec2-52-0-159-113.compute-1.amazonaws.com:9000

Is it possible to save a DataFrame to S3 directly using Parquet?

-- Stuart Layton
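For reference, the general pattern for writing Parquet directly to an S3 path is to use a fully qualified s3n:// URI and supply the AWS credentials through the Hadoop configuration. A hedged Scala sketch of that pattern follows (bucket, path, and credential values are placeholders); whether it sidesteps the Wrong FS error above depends on the Spark and Hadoop versions involved, since that error indicates the path is being resolved against the cluster's default HDFS filesystem:

    // Hedged sketch: writing a DataFrame straight to S3 as Parquet (Spark 1.3-era API).
    // The bucket name, path, and credential values are placeholders.
    sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "YOUR_ACCESS_KEY")
    sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", "YOUR_SECRET_KEY")

    // The scheme of the output URI (s3n://) selects the target filesystem for the write.
    df.saveAsParquetFile("s3n://my-bucket/spark-testing/")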