Github user kunal642 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2261#discussion_r186056119
  
    --- Diff: 
integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
 ---
    @@ -604,31 +604,58 @@ abstract class CarbonDDLSqlParser extends 
AbstractCarbonSparkSQLParser {
             
tableProperties.get(CarbonCommonConstants.DICTIONARY_EXCLUDE).get.split(',').map(_.trim)
           dictExcludeCols
             .foreach { dictExcludeCol =>
    -          if (!fields.exists(x => 
x.column.equalsIgnoreCase(dictExcludeCol))) {
    +          if (!checkFields(fields, dictExcludeCol)) {
                 val errormsg = "DICTIONARY_EXCLUDE column: " + dictExcludeCol +
                                " does not exist in table. Please check create 
table statement."
                 throw new MalformedCarbonCommandException(errormsg)
               } else {
    -            val dataType = fields.find(x =>
    -              x.column.equalsIgnoreCase(dictExcludeCol)).get.dataType.get
    -            if (isComplexDimDictionaryExclude(dataType)) {
    -              val errormsg = "DICTIONARY_EXCLUDE is unsupported for 
complex datatype column: " +
    -                             dictExcludeCol
    -              throw new MalformedCarbonCommandException(errormsg)
    -            } else if 
(!isDataTypeSupportedForDictionary_Exclude(dataType)) {
    +            val dataType = findField(fields, 
dictExcludeCol).get.dataType.get
    +            if (!isDataTypeSupportedForDictionary_Exclude(dataType)) {
                   val errorMsg = "DICTIONARY_EXCLUDE is unsupported for " + 
dataType.toLowerCase() +
                                  " data type column: " + dictExcludeCol
                   throw new MalformedCarbonCommandException(errorMsg)
                 }
               }
             }
         }
    +
    +
    +    def checkFields(y: Seq[Field], colToMatch: String): Boolean = {
    +      y.exists { fld =>
    +        if (fld.column.equalsIgnoreCase(colToMatch)) {
    +          true
    +        } else if (fld.children.isDefined && fld.children.get != null) {
    +          checkFields(fld.children.get, colToMatch)
    +        } else {
    +          false
    +        }
    +      }
    +    }
    +
    +    def findField(y: Seq[Field], colToMatch: String): Option[Field] = {
    --- End diff --
    
    Write description for checkFields and findFields


---

Reply via email to