[ https://issues.apache.org/jira/browse/KUDU-2518?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Grant Henke updated KUDU-2518:
------------------------------
    Labels: kudu-roadmap  (was: )

SparkSQL queries without temporary tables
-----------------------------------------

             Key: KUDU-2518
             URL: https://issues.apache.org/jira/browse/KUDU-2518
         Project: Kudu
      Issue Type: Improvement
      Components: hms, spark
Affects Versions: 1.7.1
        Reporter: Dan Burkert
        Priority: Major
          Labels: kudu-roadmap

One long-standing ergonomic issue with the Kudu/SparkSQL integration is the requirement to register Kudu tables as temp tables before they can be scanned using a SQL string ({{sql("SELECT * FROM my_kudu_table")}}). Ideally SparkSQL could query Kudu tables that it discovers via the HMS with no additional configuration. Yesterday I explored what it would take to get there, and I found some interesting things.

If the HMS table contains a {{spark.sql.sources.provider}} table property with a value like {{org.apache.kudu.spark.kudu.DefaultSource}}, SparkSQL will automatically instantiate the corresponding {{RelationProvider}} class, passing it a {{SQLContext}} and a map of parameters, which it fills in with the table's HDFS URI and storage properties. The current plan for Kudu + HMS integration (KUDU-2191) is not to set any storage properties; instead, attributes like the master addresses and table ID will be stored as table properties. As a result, SparkSQL instantiates the Kudu {{DefaultSource}} but doesn't pass it the necessary arguments, such as the table name or master addresses. Getting this far required adding a dummy {{org.apache.kudu.hive.KuduStorageHandler}} class to the classpath so that the Hive client wouldn't choke on the bogus class name. The stack trace from Spark attempting to instantiate the {{DefaultSource}} is provided below.
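For contrast with the traces below, the temp-table workflow in question looks roughly like the following sketch; the master address and table name are placeholders:

{code:java}
// Current workflow (sketch): the table has to be loaded with explicit Kudu
// options and registered as a temp view before SQL can reference it by name.
// "kudu-master:7051" and "my_kudu_table" are placeholders.
val df = spark.read
  .options(Map(
    "kudu.master" -> "kudu-master:7051",
    "kudu.table" -> "my_kudu_table"))
  .format("org.apache.kudu.spark.kudu")
  .load()
df.createOrReplaceTempView("my_kudu_table")
spark.sql("SELECT * FROM my_kudu_table").show()

// Desired: SparkSQL resolves the table via the HMS, making the explicit load
// and temp-view registration above unnecessary.
{code}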
{code:java}
Spark context Web UI available at http://kudu-hms-1.gce.cloudera.com:4041
Spark context available as 'sc' (master = local[*], app id = local-1532719985143).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/ '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.3.1
      /_/

Using Scala version 2.11.8 (OpenJDK 64-Bit Server VM, Java 1.8.0_181)
Type in expressions to have them evaluated.
Type :help for more information.

scala> sql("DESCRIBE TABLE t1")
org.spark_project.guava.util.concurrent.UncheckedExecutionException: java.lang.IllegalArgumentException: Kudu table name must be specified in create options using key 'kudu.table'. parameters: Map(), parameters-size: 0, parameters-keys: Set(), path: None
  at org.spark_project.guava.cache.LocalCache$Segment.get(LocalCache.java:2263)
  at org.spark_project.guava.cache.LocalCache.get(LocalCache.java:4000)
  at org.spark_project.guava.cache.LocalCache$LocalManualCache.get(LocalCache.java:4789)
  at org.apache.spark.sql.catalyst.catalog.SessionCatalog.getCachedPlan(SessionCatalog.scala:137)
  at org.apache.spark.sql.execution.datasources.FindDataSourceTable.org$apache$spark$sql$execution$datasources$FindDataSourceTable$$readDataSourceTable(DataSourceStrategy.scala:227)
  at org.apache.spark.sql.execution.datasources.FindDataSourceTable$$anonfun$apply$2.applyOrElse(DataSourceStrategy.scala:264)
  at org.apache.spark.sql.execution.datasources.FindDataSourceTable$$anonfun$apply$2.applyOrElse(DataSourceStrategy.scala:255)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
  at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:266)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
  at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
  at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:272)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:256)
  at org.apache.spark.sql.execution.datasources.FindDataSourceTable.apply(DataSourceStrategy.scala:255)
  at org.apache.spark.sql.execution.datasources.FindDataSourceTable.apply(DataSourceStrategy.scala:223)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:87)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:84)
  at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:124)
  at scala.collection.immutable.List.foldLeft(List.scala:84)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:84)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:76)
  at scala.collection.immutable.List.foreach(List.scala:381)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:76)
  at org.apache.spark.sql.catalyst.analysis.Analyzer.org$apache$spark$sql$catalyst$analysis$Analyzer$$executeSameContext(Analyzer.scala:124)
  at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:118)
  at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:103)
  at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:57)
  at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:55)
  at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:47)
  at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:74)
  at org.apache.spark.sql.SparkSession.table(SparkSession.scala:627)
  at org.apache.spark.sql.execution.command.DescribeTableCommand.run(tables.scala:548)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:79)
  at org.apache.spark.sql.Dataset$$anonfun$6.apply(Dataset.scala:190)
  at org.apache.spark.sql.Dataset$$anonfun$6.apply(Dataset.scala:190)
  at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3254)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
  at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3253)
  at org.apache.spark.sql.Dataset.<init>(Dataset.scala:190)
  at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:75)
  at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:641)
  ... 49 elided
Caused by: java.lang.IllegalArgumentException: Kudu table name must be specified in create options using key 'kudu.table'. parameters: Map(), parameters-size: 0, parameters-keys: Set(), path: None
  at org.apache.kudu.spark.kudu.DefaultSource$$anonfun$11.apply(DefaultSource.scala:82)
  at org.apache.kudu.spark.kudu.DefaultSource$$anonfun$11.apply(DefaultSource.scala:82)
  at scala.collection.MapLike$class.getOrElse(MapLike.scala:128)
  at org.apache.spark.sql.catalyst.util.CaseInsensitiveMap.getOrElse(CaseInsensitiveMap.scala:28)
  at org.apache.kudu.spark.kudu.DefaultSource.createRelation(DefaultSource.scala:81)
  at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:340)
  at org.apache.spark.sql.execution.datasources.FindDataSourceTable$$anon$1.call(DataSourceStrategy.scala:242)
  at org.apache.spark.sql.execution.datasources.FindDataSourceTable$$anon$1.call(DataSourceStrategy.scala:227)
  at org.spark_project.guava.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4792)
  at org.spark_project.guava.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599)
  at org.spark_project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2379)
  at org.spark_project.guava.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2342)
  at org.spark_project.guava.cache.LocalCache$Segment.get(LocalCache.java:2257)
  ... 96 more

scala>{code}
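The error message above is informative because the Kudu {{DefaultSource}} dumps the parameters map it receives, and the map is empty. A throwaway {{RelationProvider}} like the following (a hypothetical diagnostic sketch, not kudu-spark code; the class name is made up) can confirm exactly what SparkSQL passes when it resolves a table tagged with {{spark.sql.sources.provider}}; with the KUDU-2191 plan of no storage properties, the map arrives empty:

{code:java}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.sources.{BaseRelation, RelationProvider}
import org.apache.spark.sql.types.StructType

// Diagnostic sketch (hypothetical class, not part of kudu-spark): register this
// class as the table's spark.sql.sources.provider to see which parameters
// SparkSQL hands to a V1 source when it resolves the HMS table.
class ParameterDumpingSource extends RelationProvider {
  override def createRelation(ctx: SQLContext,
                              parameters: Map[String, String]): BaseRelation = {
    // With no storage properties on the HMS table, this prints an empty map.
    println(s"parameters passed by SparkSQL: $parameters")
    // Return a do-nothing relation with an empty schema so resolution succeeds.
    new BaseRelation {
      override def sqlContext: SQLContext = ctx
      override def schema: StructType = StructType(Nil)
    }
  }
}
{code}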
> scala> sql("DESCRIBE TABLE t1") > org.apache.spark.sql.AnalysisException: org.apache.kudu.spark.KuduDataSource > is not a valid Spark SQL Data Source.; > at > org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:415) > at > org.apache.spark.sql.execution.datasources.FindDataSourceTable$$anon$1.call(DataSourceStrategy.scala:242) > at > org.apache.spark.sql.execution.datasources.FindDataSourceTable$$anon$1.call(DataSourceStrategy.scala:227) > at > org.spark_project.guava.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4792) > at > org.spark_project.guava.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599) > at > org.spark_project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2379) > at > org.spark_project.guava.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2342) > at > org.spark_project.guava.cache.LocalCache$Segment.get(LocalCache.java:2257) > at org.spark_project.guava.cache.LocalCache.get(LocalCache.java:4000) > at > org.spark_project.guava.cache.LocalCache$LocalManualCache.get(LocalCache.java:4789) > at > org.apache.spark.sql.catalyst.catalog.SessionCatalog.getCachedPlan(SessionCatalog.scala:137) > at > org.apache.spark.sql.execution.datasources.FindDataSourceTable.org$apache$spark$sql$execution$datasources$FindDataSourceTable$$readDataSourceTable(DataSourceStrategy.scala:227) > at > org.apache.spark.sql.execution.datasources.FindDataSourceTable$$anonfun$apply$2.applyOrElse(DataSourceStrategy.scala:264) > at > org.apache.spark.sql.execution.datasources.FindDataSourceTable$$anonfun$apply$2.applyOrElse(DataSourceStrategy.scala:255) > at > org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267) > at > org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267) > at > org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70) > at > org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:266) > at > org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272) > at > org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272) > at > org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306) > at > org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187) > at > org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304) > at > org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:272) > at > org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:256) > at > org.apache.spark.sql.execution.datasources.FindDataSourceTable.apply(DataSourceStrategy.scala:255) > at > org.apache.spark.sql.execution.datasources.FindDataSourceTable.apply(DataSourceStrategy.scala:223) > at > org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:87) > at > org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:84) > at > scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:124) > at scala.collection.immutable.List.foldLeft(List.scala:84) > at > org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:84) > at > org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:76) > at scala.collection.immutable.List.foreach(List.scala:381) > at > 
  at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:76)
  at org.apache.spark.sql.catalyst.analysis.Analyzer.org$apache$spark$sql$catalyst$analysis$Analyzer$$executeSameContext(Analyzer.scala:124)
  at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:118)
  at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:103)
  at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:57)
  at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:55)
  at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:47)
  at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:74)
  at org.apache.spark.sql.SparkSession.table(SparkSession.scala:627)
  at org.apache.spark.sql.execution.command.DescribeTableCommand.run(tables.scala:548)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:79)
  at org.apache.spark.sql.Dataset$$anonfun$6.apply(Dataset.scala:190)
  at org.apache.spark.sql.Dataset$$anonfun$6.apply(Dataset.scala:190)
  at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3254)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
  at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3253)
  at org.apache.spark.sql.Dataset.<init>(Dataset.scala:190)
  at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:75)
  at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:641)
  ... 49 elided

scala>{code}

{{org.apache.kudu.spark.KuduDataSource}} is a dummy class that I put on the classpath and set as the table's {{spark.sql.sources.provider}} attribute in the Hive metastore:

{code:java}
import java.util

import org.apache.spark.sql.Row
import org.apache.spark.sql.sources.DataSourceRegister
import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport}
import org.apache.spark.sql.sources.v2.reader.{DataReaderFactory, DataSourceReader}
import org.apache.spark.sql.types.StructType

// Dummy V2 source: just enough to be discovered and constructed by SparkSQL.
class KuduDataSource extends DataSourceV2
    with DataSourceRegister
    with ReadSupport {

  override def shortName(): String = "kudu"

  override def createReader(options: DataSourceOptions): DataSourceReader = {
    new KuduDataSourceReader(options)
  }
}

// Reader stub; the schema and reader factories are intentionally unimplemented.
class KuduDataSourceReader(val options: DataSourceOptions) extends DataSourceReader {
  override def readSchema(): StructType = ???
  override def createDataReaderFactories(): util.List[DataReaderFactory[Row]] = ???
}
{code}


--
This message was sent by Atlassian JIRA
(v7.6.3#76005)