[ https://issues.apache.org/jira/browse/PHOENIX-2328?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14960772#comment-14960772 ]
Josh Mahonin commented on PHOENIX-2328:
---------------------------------------
Thanks for the bug report. I've attached a patch which should fix this (and
prevent future unsupported-filter issues, should Spark introduce them).
[[email protected]] Please review.
Regarding your second query: the Spark "jdbc" format parallelizes queries by
taking in three parameters: lowerBound, upperBound, and numPartitions. The
bounds must be of type LONG and bound to a column in your query (ideally a
primary key!). For some use cases this is entirely appropriate, and it works
across most types of JDBC databases, including Phoenix.
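For illustration, here's a rough sketch of that style of read in Java. The
connection URL, table name, column, and bounds below are placeholders, not
values from this issue:

import java.util.HashMap;
import java.util.Map;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;

public class PhoenixJdbcReadSketch {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("phoenix-jdbc-read");
        JavaSparkContext sc = new JavaSparkContext(conf);
        SQLContext sqlContext = new SQLContext(sc);

        // Placeholder connection settings and table/column names.
        Map<String, String> params = new HashMap<String, String>();
        params.put("url", "jdbc:phoenix:zk-host:2181");
        params.put("driver", "org.apache.phoenix.jdbc.PhoenixDriver");
        params.put("dbtable", "MY_TABLE");
        params.put("partitionColumn", "ID");   // the LONG column Spark splits on
        params.put("lowerBound", "0");
        params.put("upperBound", "1000000");
        params.put("numPartitions", "10");

        // Spark issues numPartitions range queries over [lowerBound, upperBound).
        DataFrame df = sqlContext.read().format("jdbc").options(params).load();
        df.show();

        sc.stop();
    }
}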
The phoenix-spark plugin natively understands the partition splits that Phoenix
provides, and doesn't require a LONG field in the query to use as a partition
key. All you need to do is specify a table, and optionally SELECT columns
and/or a WHERE predicate, and Phoenix does the rest for you. There are
limitations, though, such as not being able to perform Phoenix aggregations
like GROUP BY or SUM. However, once the data is loaded into Spark, those
operations are available to you, at a trade-off in efficiency.
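By contrast, a minimal sketch of the same kind of read through the
phoenix-spark datasource (the "table" and "zkUrl" option names are the ones
documented for phoenix-spark; the table name, ZooKeeper quorum, and filter
value are placeholders):

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;

public class PhoenixSparkReadSketch {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("phoenix-spark-read");
        JavaSparkContext sc = new JavaSparkContext(conf);
        SQLContext sqlContext = new SQLContext(sc);

        // No LONG partition column needed: the splits come from Phoenix itself.
        DataFrame df = sqlContext.read()
                .format("org.apache.phoenix.spark")
                .option("table", "MY_TABLE")      // placeholder table name
                .option("zkUrl", "zk-host:2181")  // placeholder ZooKeeper quorum
                .load();

        // Column pruning and simple predicates are handled by Phoenix; heavier
        // operations (GROUP BY, SUM, joins) run in Spark after the load.
        df.select("ID").filter("ID = 'some-id'").show();

        sc.stop();
    }
}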
The choice of which one to use very much comes down to your own specific use
case.
> "Unsupported filter" error for "like" when using Spark DataFrame API
> --------------------------------------------------------------------
>
> Key: PHOENIX-2328
> URL: https://issues.apache.org/jira/browse/PHOENIX-2328
> Project: Phoenix
> Issue Type: Bug
> Affects Versions: 4.5.3
> Reporter: Suhas Nalapure
> Assignee: Josh Mahonin
> Attachments: PHOENIX-2328.patch
>
>
> Hi, I'm using the Spark DataFrame API to connect to HBase 0.98 through Phoenix
> 4.5.3 and get an "Unsupported filter" error when the filter condition is
> 'like'. The error trail and the relevant lines from the source code are given
> below.
> Also, I have another related query. Given that Phoenix can be accessed using
> the standard Java JDBC API, a Spark DataFrame can also be constructed using
> the "jdbc" format string (e.g. df =
> sqlContext.read().format("jdbc").options(params).load(); where params is a Map
> of the Phoenix JDBC connection URL and other relevant parameters). So of these
> two ways to work with Phoenix from Spark, i.e. 1. as a Spark datasource plugin
> or 2. as just another RDBMS source, which one would be the recommended way,
> and why?
> Exception:
> -------------
> 2015-10-16 17:25:42,944 DEBUG [main] com.dataken.utilities.DFHelper Filtering using expr: ID like 'RrcLog%'
> Exception in thread "main" java.lang.Exception: Unsupported filter
> at org.apache.phoenix.spark.PhoenixRelation$$anonfun$buildFilter$1.apply(PhoenixRelation.scala:83)
> at org.apache.phoenix.spark.PhoenixRelation$$anonfun$buildFilter$1.apply(PhoenixRelation.scala:70)
> at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
> at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
> at org.apache.phoenix.spark.PhoenixRelation.buildFilter(PhoenixRelation.scala:70)
> at org.apache.phoenix.spark.PhoenixRelation.buildScan(PhoenixRelation.scala:42)
> at org.apache.spark.sql.execution.datasources.DataSourceStrategy$$anonfun$3.apply(DataSourceStrategy.scala:53)
> at org.apache.spark.sql.execution.datasources.DataSourceStrategy$$anonfun$3.apply(DataSourceStrategy.scala:53)
> at org.apache.spark.sql.execution.datasources.DataSourceStrategy$$anonfun$pruneFilterProject$1.apply(DataSourceStrategy.scala:279)
> at org.apache.spark.sql.execution.datasources.DataSourceStrategy$$anonfun$pruneFilterProject$1.apply(DataSourceStrategy.scala:278)
> at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.pruneFilterProjectRaw(DataSourceStrategy.scala:310)
> at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.pruneFilterProject(DataSourceStrategy.scala:274)
> at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.apply(DataSourceStrategy.scala:49)
> at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
> at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
> at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
> at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:59)
> at org.apache.spark.sql.catalyst.planning.QueryPlanner.planLater(QueryPlanner.scala:54)
> at org.apache.spark.sql.execution.SparkStrategies$BasicOperators$.apply(SparkStrategies.scala:374)
> at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
> at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
> at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
> at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:59)
> at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan$lzycompute(SQLContext.scala:920)
> at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan(SQLContext.scala:918)
> at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan$lzycompute(SQLContext.scala:924)
> at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan(SQLContext.scala:924)
> at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:53)
> at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:1904)
> at org.apache.spark.sql.DataFrame.collect(DataFrame.scala:1385)
> at org.apache.spark.sql.DataFrame.head(DataFrame.scala:1315)
> at org.apache.spark.sql.DataFrame.take(DataFrame.scala:1378)
> at org.apache.spark.sql.DataFrame.showString(DataFrame.scala:178)
> at org.apache.spark.sql.DataFrame.show(DataFrame.scala:402)
> at org.apache.spark.sql.DataFrame.show(DataFrame.scala:363)
> at org.apache.spark.sql.DataFrame.show(DataFrame.scala:371)
> at com.dataken.designer.analytical.pojo.EvaluableExpressionTest.main(EvaluableExpressionTest.java:177)
> SOURCE CODE
> -----------------------
> DataFrame df = sqlContext.read().format("org.apache.phoenix.spark").options(params).load();
> df.filter("ID like 'RrcLog%'");
> Thanks,
> Suhas