Github user zzcclp commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/1469#discussion_r149264766

--- Diff: integration/spark2.2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala ---
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.hive
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
+import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry}
+import org.apache.spark.sql.catalyst.catalog.{CatalogFunction, FunctionResourceLoader, GlobalTempViewManager, SessionCatalog}
+import org.apache.spark.sql.catalyst.expressions.{Cast, Expression, ScalarSubquery}
+import org.apache.spark.sql.catalyst.optimizer.Optimizer
+import org.apache.spark.sql.catalyst.parser.ParserInterface
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, SubqueryAlias}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.SparkOptimizer
+import org.apache.spark.sql.execution.datasources.{FindDataSourceTable, LogicalRelation, PreWriteCheck, ResolveSQLOnFile, _}
+import org.apache.spark.sql.execution.strategy.{CarbonLateDecodeStrategy, DDLStrategy}
+import org.apache.spark.sql.internal.{SQLConf, SessionState}
+import org.apache.spark.sql.optimizer.CarbonLateDecodeRule
+import org.apache.spark.sql.parser.CarbonSparkSqlParser
+
+import org.apache.carbondata.core.datamap.DataMapStoreManager
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
+
+/**
+ * This class will have carbon catalog and refresh the relation from cache if the carbontable in
+ * carbon catalog is not same as cached carbon relation's carbon table
+ *
+ * @param externalCatalog
+ * @param globalTempViewManager
+ * @param sparkSession
+ * @param functionResourceLoader
+ * @param functionRegistry
+ * @param conf
+ * @param hadoopConf
+ */
+class CarbonSessionCatalog(
+    externalCatalog: HiveExternalCatalog,
+    globalTempViewManager: GlobalTempViewManager,
+    functionRegistry: FunctionRegistry,
+    sparkSession: SparkSession,
+    conf: SQLConf,
+    hadoopConf: Configuration,
+    parser: ParserInterface,
+    functionResourceLoader: FunctionResourceLoader)
+  extends HiveSessionCatalog(
+    externalCatalog,
+    globalTempViewManager,
+    new HiveMetastoreCatalog(sparkSession),
+    functionRegistry,
+    conf,
+    hadoopConf,
+    parser,
+    functionResourceLoader
+  ) {
+
+  lazy val carbonEnv = {
+    val env = new CarbonEnv
+    env.init(sparkSession)
+    env
+  }
+
+
+  private def refreshRelationFromCache(identifier: TableIdentifier,
+      alias: Option[String],
+      carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation): Boolean = {
+    var isRefreshed = false
+    val storePath = CarbonEnv.getInstance(sparkSession).storePath
+    carbonEnv.carbonMetastore.
+      checkSchemasModifiedTimeAndReloadTables(storePath)
+
+    val tableMeta = carbonEnv.carbonMetastore
+      .getTableFromMetadataCache(carbonDatasourceHadoopRelation.carbonTable.getDatabaseName,
+        carbonDatasourceHadoopRelation.carbonTable.getFactTableName)
+    if (tableMeta.isEmpty || (tableMeta.isDefined &&
+        tableMeta.get.carbonTable.getTableLastUpdatedTime !=
+          carbonDatasourceHadoopRelation.carbonTable.getTableLastUpdatedTime)) {
+      refreshTable(identifier)
+      DataMapStoreManager.getInstance().
+        clearDataMap(AbsoluteTableIdentifier.from(storePath,
+          identifier.database.getOrElse("default"), identifier.table))
+      isRefreshed = true
+      logInfo(s"Schema changes have been detected for table: $identifier")
+    }
+    isRefreshed
+  }
+
+  // override def makeFunctionBuilder(funcName: String, className: String): FunctionBuilder = {
+  //   makeFunctionBuilder(funcName, Utils.classForName(className))
+  // }
+  //
+  // /**
+  //  * Construct a [[FunctionBuilder]] based on the provided class that represents a function.
+  //  */
+  // private def makeFunctionBuilder(name: String, clazz: Class[_]): FunctionBuilder = {
+  //   // When we instantiate hive UDF wrapper class, we may throw exception if the input
+  //   // expressions don't satisfy the hive UDF, such as type mismatch, input number
+  //   // mismatch, etc. Here we catch the exception and throw AnalysisException instead.
+  //   (children: Seq[Expression]) => {
+  //     try {
+  //       if (classOf[UDF].isAssignableFrom(clazz)) {
+  //         val udf = HiveSimpleUDF(name, new HiveFunctionWrapper(clazz.getName), children)
+  //         udf.dataType // Force it to check input data types.
+  //         udf
+  //       } else if (classOf[GenericUDF].isAssignableFrom(clazz)) {
+  //         val udf = HiveGenericUDF(name, new HiveFunctionWrapper(clazz.getName), children)
+  //         udf.dataType // Force it to check input data types.
+  //         udf
+  //       } else if (classOf[AbstractGenericUDAFResolver].isAssignableFrom(clazz)) {
+  //         val udaf = HiveUDAFFunction(name, new HiveFunctionWrapper(clazz.getName), children)
+  //         udaf.dataType // Force it to check input data types.
+  //         udaf
+  //       } else if (classOf[UDAF].isAssignableFrom(clazz)) {
+  //         val udaf = HiveUDAFFunction(
+  //           name,
+  //           new HiveFunctionWrapper(clazz.getName),
+  //           children,
+  //           isUDAFBridgeRequired = true)
+  //         udaf.dataType // Force it to check input data types.
+  //         udaf
+  //       } else if (classOf[GenericUDTF].isAssignableFrom(clazz)) {
+  //         val udtf = HiveGenericUDTF(name, new HiveFunctionWrapper(clazz.getName), children)
+  //         udtf.elementSchema // Force it to check input data types.
+  //         udtf
+  //       } else {
+  //         throw new AnalysisException(s"No handler for Hive UDF '${clazz.getCanonicalName}'")
+  //       }
+  //     } catch {
+  //       case ae: AnalysisException =>
+  //         throw ae
+  //       case NonFatal(e) =>
+  //         val analysisException =
+  //           new AnalysisException(s"No handler for Hive UDF '${clazz.getCanonicalName}': $e")
+  //         analysisException.setStackTrace(e.getStackTrace)
+  //         throw analysisException
+  //     }
+  //   }
+  // }
+  //
+  // override def lookupFunction(name: FunctionIdentifier, children: Seq[Expression]): Expression = {
+  //   try {
+  //     lookupFunction0(name, children)
+  //   } catch {
+  //     case NonFatal(_) =>
+  //       // SPARK-16228 ExternalCatalog may recognize `double`-type only.
+  //       val newChildren = children.map { child =>
+  //         if (child.dataType.isInstanceOf[DecimalType]) Cast(child, DoubleType) else child
+  //       }
+  //       lookupFunction0(name, newChildren)
+  //   }
+  // }
+  //
+  // private def lookupFunction0(name: FunctionIdentifier, children: Seq[Expression]): Expression = {
+  //   val database = name.database.map(formatDatabaseName)
+  //   val funcName = name.copy(database = database)
+  //   Try(super.lookupFunction(funcName, children)) match {
+  //     case Success(expr) => expr
+  //     case Failure(error) =>
+  //       if (functionRegistry.functionExists(funcName.unquotedString)) {
+  //         // If the function actually exists in functionRegistry, it means that there is an
+  //         // error when we create the Expression using the given children.
+  //         // We need to throw the original exception.
+  //         throw error
+  //       } else {
+  //         // This function is not in functionRegistry, let's try to load it as a Hive's
+  //         // built-in function.
+  //         // Hive is case insensitive.
+  //         val functionName = funcName.unquotedString.toLowerCase(Locale.ROOT)
+  //         if (!hiveFunctions.contains(functionName)) {
+  //           failFunctionLookup(funcName)
+  //         }
+  //
+  //         // TODO: Remove this fallback path once we implement the list of fallback functions
+  //         // defined below in hiveFunctions.
+  //         val functionInfo = {
+  //           try {
+  //             Option(HiveFunctionRegistry.getFunctionInfo(functionName)).getOrElse(
+  //               failFunctionLookup(funcName))
+  //           } catch {
+  //             // If HiveFunctionRegistry.getFunctionInfo throws an exception,
+  //             // we are failing to load a Hive builtin function, which means that
+  //             // the given function is not a Hive builtin function.
+  //             case NonFatal(e) => failFunctionLookup(funcName)
+  //           }
+  //         }
+  //         val className = functionInfo.getFunctionClass.getName
+  //         val functionIdentifier =
+  //           FunctionIdentifier(functionName.toLowerCase(Locale.ROOT), database)
+  //         val func = CatalogFunction(functionIdentifier, className, Nil)
+  //         // Put this Hive built-in function to our function registry.
+  //         registerFunction(func, ignoreIfExists = false)
+  //         // Now, we need to create the Expression.
+  //         functionRegistry.lookupFunction(functionName, children)
+  //       }
+  //   }
+  // }
+  //
+  // // TODO Removes this method after implementing Spark native "histogram_numeric".
+  // override def functionExists(name: FunctionIdentifier): Boolean = {
+  //   super.functionExists(name) || hiveFunctions.contains(name.funcName)
+  // }
+  //
+  // /** List of functions we pass over to Hive. Note that over time this list should go to 0. */
+  // // We have a list of Hive built-in functions that we do not support. So, we will check
+  // // Hive's function registry and lazily load needed functions into our own function registry.
+  // // List of functions we are explicitly not supporting are:
+  // // compute_stats, context_ngrams, create_union,
+  // // current_user, ewah_bitmap, ewah_bitmap_and, ewah_bitmap_empty, ewah_bitmap_or, field,
+  // // in_file, index, matchpath, ngrams, noop, noopstreaming, noopwithmap,
+  // // noopwithmapstreaming, parse_url_tuple, reflect2, windowingtablefunction.
+  // // Note: don't forget to update SessionCatalog.isTemporaryFunction
+  // private val hiveFunctions = Seq(
+  //   "histogram_numeric"
+  // )
+}
+
+/**
+ * Session state implementation to override sql parser and adding strategies
+ *
+ * @param sparkSession
+ */
+class CarbonSessionStateBuilder(sparkSession: SparkSession, parentState: Option[SessionState] = None)
+  extends HiveSessionStateBuilder(sparkSession, parentState) {
+
+  override lazy val sqlParser: ParserInterface = new CarbonSparkSqlParser(conf, sparkSession)
+
+  experimentalMethods.extraStrategies =
+    Seq(new CarbonLateDecodeStrategy, new DDLStrategy(sparkSession))
+  experimentalMethods.extraOptimizations = Seq(new CarbonLateDecodeRule)
+
+  /**
+   * Internal catalog for managing table and database states.
+   */
+  override lazy val catalog : CarbonSessionCatalog= {
--- End diff --

white space before '='
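The nit refers to the spacing on the last quoted line. As a purely illustrative sketch (the method body is elided in the quoted diff and is not reproduced here), the declaration with conventional Scala spacing would read:

    // as quoted in the diff:
    override lazy val catalog : CarbonSessionCatalog= {

    // with the spacing normalized: no space before ':', one space on each side of '='
    override lazy val catalog: CarbonSessionCatalog = {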
---