[ https://issues.apache.org/jira/browse/KUDU-2518?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Grant Henke updated KUDU-2518:
------------------------------
    Labels: kudu-roadmap  (was: )

> SparkSQL queries without temporary tables
> -----------------------------------------
>
>                 Key: KUDU-2518
>                 URL: https://issues.apache.org/jira/browse/KUDU-2518
>             Project: Kudu
>          Issue Type: Improvement
>          Components: hms, spark
>    Affects Versions: 1.7.1
>            Reporter: Dan Burkert
>            Priority: Major
>              Labels: kudu-roadmap
>
> One long-standing ergonomic issue with the Kudu/SparkSQL integration is the 
> requirement to register Kudu tables as temp tables before they can be scanned 
> using a SQL string ({{sql("SELECT * FROM my_kudu_table")}}).  Ideally 
> SparkSQL could query Kudu tables that it discovers via the HMS with no 
> additional configuration.  Yesterday I explored what it would take to get 
> there, and I found some interesting things.
>  
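> For context, the temp-table workaround looks roughly like this from 
> {{spark-shell}} (a minimal sketch; the master address and table name are 
> placeholders):
>  
> {code:java}
> // Each Kudu table must be loaded as a DataFrame and registered in
> // SparkSQL's catalog before a SQL string can reference it.
> val df = spark.read
>   .format("org.apache.kudu.spark.kudu")
>   .option("kudu.master", "kudu-master.example.com:7051") // placeholder
>   .option("kudu.table", "my_kudu_table")                 // placeholder
>   .load()
> df.createOrReplaceTempView("my_kudu_table")
> 
> sql("SELECT * FROM my_kudu_table").show()
> {code}
>  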
> If the HMS table contains a {{spark.sql.sources.provider}} table property 
> with a value like {{org.apache.kudu.spark.kudu.DefaultSource}}, SparkSQL 
> will automatically instantiate the corresponding {{RelationProvider}} 
> class, passing it a {{SQLContext}} and a map of parameters filled in with 
> the table's HDFS URI and storage properties.  The current plan for Kudu + 
> HMS integration (KUDU-2191) is not to set any storage properties; instead, 
> attributes like master addresses and table ID will be stored as table 
> properties.  As a result, SparkSQL instantiates a Kudu {{DefaultSource}} 
> but doesn't pass it necessary arguments like the table name or master 
> addresses.  Getting this far required adding a dummy 
> {{org.apache.kudu.hive.KuduStorageHandler}} class to the classpath so that 
> the Hive client wouldn't choke on the bogus class name.  The stacktrace 
> from Spark attempting to instantiate the {{DefaultSource}} is provided 
> below.
>  
> {code:java}
> Spark context Web UI available at http://kudu-hms-1.gce.cloudera.com:4041
> Spark context available as 'sc' (master = local[*], app id = local-1532719985143).
> Spark session available as 'spark'.
> Welcome to
>       ____              __
>      / __/__  ___ _____/ /__
>     _\ \/ _ \/ _ `/ __/  '_/
>    /___/ .__/\_,_/_/ /_/\_\   version 2.3.1
>       /_/
>          
> Using Scala version 2.11.8 (OpenJDK 64-Bit Server VM, Java 1.8.0_181)
> Type in expressions to have them evaluated.
> Type :help for more information.
> scala> sql("DESCRIBE TABLE t1")
> org.spark_project.guava.util.concurrent.UncheckedExecutionException: java.lang.IllegalArgumentException: Kudu table name must be specified in create options using key 'kudu.table'.  parameters: Map(), parameters-size: 0, parameters-keys: Set(), path: None
>   at org.spark_project.guava.cache.LocalCache$Segment.get(LocalCache.java:2263)
>   at org.spark_project.guava.cache.LocalCache.get(LocalCache.java:4000)
>   at org.spark_project.guava.cache.LocalCache$LocalManualCache.get(LocalCache.java:4789)
>   at org.apache.spark.sql.catalyst.catalog.SessionCatalog.getCachedPlan(SessionCatalog.scala:137)
>   at org.apache.spark.sql.execution.datasources.FindDataSourceTable.org$apache$spark$sql$execution$datasources$FindDataSourceTable$$readDataSourceTable(DataSourceStrategy.scala:227)
>   at org.apache.spark.sql.execution.datasources.FindDataSourceTable$$anonfun$apply$2.applyOrElse(DataSourceStrategy.scala:264)
>   at org.apache.spark.sql.execution.datasources.FindDataSourceTable$$anonfun$apply$2.applyOrElse(DataSourceStrategy.scala:255)
>   at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
>   at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
>   at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
>   at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:266)
>   at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
>   at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
>   at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
>   at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
>   at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
>   at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:272)
>   at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:256)
>   at org.apache.spark.sql.execution.datasources.FindDataSourceTable.apply(DataSourceStrategy.scala:255)
>   at org.apache.spark.sql.execution.datasources.FindDataSourceTable.apply(DataSourceStrategy.scala:223)
>   at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:87)
>   at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:84)
>   at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:124)
>   at scala.collection.immutable.List.foldLeft(List.scala:84)
>   at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:84)
>   at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:76)
>   at scala.collection.immutable.List.foreach(List.scala:381)
>   at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:76)
>   at org.apache.spark.sql.catalyst.analysis.Analyzer.org$apache$spark$sql$catalyst$analysis$Analyzer$$executeSameContext(Analyzer.scala:124)
>   at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:118)
>   at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:103)
>   at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:57)
>   at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:55)
>   at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:47)
>   at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:74)
>   at org.apache.spark.sql.SparkSession.table(SparkSession.scala:627)
>   at org.apache.spark.sql.execution.command.DescribeTableCommand.run(tables.scala:548)
>   at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
>   at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
>   at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:79)
>   at org.apache.spark.sql.Dataset$$anonfun$6.apply(Dataset.scala:190)
>   at org.apache.spark.sql.Dataset$$anonfun$6.apply(Dataset.scala:190)
>   at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3254)
>   at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
>   at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3253)
>   at org.apache.spark.sql.Dataset.<init>(Dataset.scala:190)
>   at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:75)
>   at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:641)
>   ... 49 elided
> Caused by: java.lang.IllegalArgumentException: Kudu table name must be specified in create options using key 'kudu.table'.  parameters: Map(), parameters-size: 0, parameters-keys: Set(), path: None
>   at org.apache.kudu.spark.kudu.DefaultSource$$anonfun$11.apply(DefaultSource.scala:82)
>   at org.apache.kudu.spark.kudu.DefaultSource$$anonfun$11.apply(DefaultSource.scala:82)
>   at scala.collection.MapLike$class.getOrElse(MapLike.scala:128)
>   at org.apache.spark.sql.catalyst.util.CaseInsensitiveMap.getOrElse(CaseInsensitiveMap.scala:28)
>   at org.apache.kudu.spark.kudu.DefaultSource.createRelation(DefaultSource.scala:81)
>   at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:340)
>   at org.apache.spark.sql.execution.datasources.FindDataSourceTable$$anon$1.call(DataSourceStrategy.scala:242)
>   at org.apache.spark.sql.execution.datasources.FindDataSourceTable$$anon$1.call(DataSourceStrategy.scala:227)
>   at org.spark_project.guava.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4792)
>   at org.spark_project.guava.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599)
>   at org.spark_project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2379)
>   at org.spark_project.guava.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2342)
>   at org.spark_project.guava.cache.LocalCache$Segment.get(LocalCache.java:2257)
>   ... 96 more
> scala>{code}
>  
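> The failure is inherent to the {{RelationProvider}} contract: Spark hands 
> {{createRelation}} only a {{SQLContext}} plus a parameter map built from 
> the table's storage properties, so when no storage properties are set the 
> map arrives empty.  A minimal sketch of the pattern (not the actual 
> {{DefaultSource}} source, and the error message is abbreviated):
>  
> {code:java}
> import org.apache.spark.sql.SQLContext
> import org.apache.spark.sql.sources.{BaseRelation, RelationProvider}
> 
> class DefaultSource extends RelationProvider {
>   // `parameters` is populated from the HMS entry's storage properties;
>   // the planned HMS integration leaves those empty, so the lookup throws.
>   override def createRelation(
>       sqlContext: SQLContext,
>       parameters: Map[String, String]): BaseRelation = {
>     val tableName = parameters.getOrElse("kudu.table",
>       throw new IllegalArgumentException(
>         "Kudu table name must be specified in create options using key 'kudu.table'"))
>     ??? // build the Kudu-backed BaseRelation from tableName + kudu.master
>   }
> }
> {code}
>  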
> After striking out with the existing interfaces, I looked at the 
> {{DataSourceRegister}} API, which is part of the {{DataSourceV2}} effort 
> underway in Spark.  It's not clear that this API actually provides more 
> context when creating relations: we need the table name and master 
> addresses from the table properties, but options are still just passed as 
> a map in {{DataSourceOptions}}.  More significantly, the 
> {{spark.sql.sources.provider}} property doesn't appear to work correctly 
> with {{DataSourceV2}} implementations at all; Spark rejects the class as 
> a valid data source:
>  
> {code:java}
> Spark context Web UI available at http://kudu-hms-1.gce.cloudera.com:4041
> Spark context available as 'sc' (master = local[*], app id = local-1532720634224).
> Spark session available as 'spark'.
> Welcome to
>       ____              __
>      / __/__  ___ _____/ /__
>     _\ \/ _ \/ _ `/ __/  '_/
>    /___/ .__/\_,_/_/ /_/\_\   version 2.3.1
>       /_/
>          
> Using Scala version 2.11.8 (OpenJDK 64-Bit Server VM, Java 1.8.0_181)
> Type in expressions to have them evaluated.
> Type :help for more information.
> scala> sql("DESCRIBE TABLE t1")
> org.apache.spark.sql.AnalysisException: org.apache.kudu.spark.KuduDataSource is not a valid Spark SQL Data Source.;
>   at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:415)
>   at org.apache.spark.sql.execution.datasources.FindDataSourceTable$$anon$1.call(DataSourceStrategy.scala:242)
>   at org.apache.spark.sql.execution.datasources.FindDataSourceTable$$anon$1.call(DataSourceStrategy.scala:227)
>   at org.spark_project.guava.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4792)
>   at org.spark_project.guava.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599)
>   at org.spark_project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2379)
>   at org.spark_project.guava.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2342)
>   at org.spark_project.guava.cache.LocalCache$Segment.get(LocalCache.java:2257)
>   at org.spark_project.guava.cache.LocalCache.get(LocalCache.java:4000)
>   at org.spark_project.guava.cache.LocalCache$LocalManualCache.get(LocalCache.java:4789)
>   at org.apache.spark.sql.catalyst.catalog.SessionCatalog.getCachedPlan(SessionCatalog.scala:137)
>   at org.apache.spark.sql.execution.datasources.FindDataSourceTable.org$apache$spark$sql$execution$datasources$FindDataSourceTable$$readDataSourceTable(DataSourceStrategy.scala:227)
>   at org.apache.spark.sql.execution.datasources.FindDataSourceTable$$anonfun$apply$2.applyOrElse(DataSourceStrategy.scala:264)
>   at org.apache.spark.sql.execution.datasources.FindDataSourceTable$$anonfun$apply$2.applyOrElse(DataSourceStrategy.scala:255)
>   at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
>   at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
>   at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
>   at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:266)
>   at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
>   at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
>   at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
>   at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
>   at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
>   at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:272)
>   at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:256)
>   at org.apache.spark.sql.execution.datasources.FindDataSourceTable.apply(DataSourceStrategy.scala:255)
>   at org.apache.spark.sql.execution.datasources.FindDataSourceTable.apply(DataSourceStrategy.scala:223)
>   at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:87)
>   at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:84)
>   at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:124)
>   at scala.collection.immutable.List.foldLeft(List.scala:84)
>   at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:84)
>   at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:76)
>   at scala.collection.immutable.List.foreach(List.scala:381)
>   at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:76)
>   at org.apache.spark.sql.catalyst.analysis.Analyzer.org$apache$spark$sql$catalyst$analysis$Analyzer$$executeSameContext(Analyzer.scala:124)
>   at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:118)
>   at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:103)
>   at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:57)
>   at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:55)
>   at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:47)
>   at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:74)
>   at org.apache.spark.sql.SparkSession.table(SparkSession.scala:627)
>   at org.apache.spark.sql.execution.command.DescribeTableCommand.run(tables.scala:548)
>   at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
>   at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
>   at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:79)
>   at org.apache.spark.sql.Dataset$$anonfun$6.apply(Dataset.scala:190)
>   at org.apache.spark.sql.Dataset$$anonfun$6.apply(Dataset.scala:190)
>   at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3254)
>   at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
>   at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3253)
>   at org.apache.spark.sql.Dataset.<init>(Dataset.scala:190)
>   at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:75)
>   at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:641)
>   ... 49 elided
> scala>{code}
>  
> {{org.apache.kudu.spark.KuduDataSource}} is a dummy class I put on the 
> classpath and set as the {{spark.sql.sources.provider}} property on the 
> Hive metastore table:
>  
> {code:java}
> import java.util
> 
> import org.apache.spark.sql.Row
> import org.apache.spark.sql.sources.DataSourceRegister
> import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport}
> import org.apache.spark.sql.sources.v2.reader.{DataReaderFactory, DataSourceReader}
> import org.apache.spark.sql.types.StructType
> 
> // A stub DataSourceV2 implementation; the reader methods are deliberately
> // left unimplemented because Spark fails before ever calling them.
> class KuduDataSource extends DataSourceV2
>     with DataSourceRegister
>     with ReadSupport {
>   override def shortName(): String = "kudu"
>   override def createReader(options: DataSourceOptions): DataSourceReader = {
>     new KuduDataSourceReader(options)
>   }
> }
> 
> class KuduDataSourceReader(val options: DataSourceOptions)
>     extends DataSourceReader {
>   override def readSchema(): StructType = ???
>   override def createDataReaderFactories(): util.List[DataReaderFactory[Row]] = ???
> }
> {code}
>  
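> For anyone reproducing this, one way to attach the provider property is 
> through the HMS client API directly, since Spark's own DDL rejects table 
> property keys starting with {{spark.sql.}} (a sketch of one approach; the 
> database and table names are placeholders):
>  
> {code:java}
> import org.apache.hadoop.hive.conf.HiveConf
> import org.apache.hadoop.hive.metastore.HiveMetaStoreClient
> 
> // Set the property on the existing HMS table entry, bypassing Spark.
> val client = new HiveMetaStoreClient(new HiveConf())
> val table = client.getTable("default", "t1")
> table.getParameters.put("spark.sql.sources.provider",
>   "org.apache.kudu.spark.KuduDataSource")
> client.alter_table("default", "t1", table)
> client.close()
> {code}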



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
