This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 5a3177b  [bug] fix bitmap and hll type column access issue (#218)
5a3177b is described below

commit 5a3177be4b49afd5bb5f8f54a1344ad903c03853
Author: gnehil <adamlee...@gmail.com>
AuthorDate: Mon Jul 29 15:54:23 2024 +0800

    [bug] fix bitmap and hll type column access issue (#218)
---
 .../doris/spark/cfg/ConfigurationOptions.java      |   7 --
 .../org/apache/doris/spark/rest/RestService.java   | 106 +++++++++++----------
 .../apache/doris/spark/serialization/RowBatch.java |  17 ++--
 .../apache/doris/spark/rdd/ScalaValueReader.scala  |   2 +-
 .../org/apache/doris/spark/sql/DorisRelation.scala |   5 +
 .../org/apache/doris/spark/sql/SchemaUtils.scala   |  62 ++++++++----
 .../apache/doris/spark/sql/TestSchemaUtils.scala   |  36 +++----
 7 files changed, 125 insertions(+), 110 deletions(-)

diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
index a0fea83..61d4563 100644
--- a/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
@@ -93,13 +93,6 @@ public interface ConfigurationOptions {
 
     int DORIS_SINK_BATCH_INTERVAL_MS_DEFAULT = 50;
 
-    /**
-     * set types to ignore, split by comma
-     * e.g.
-     * "doris.ignore-type"="bitmap,hll"
-     */
-    String DORIS_IGNORE_TYPE = "doris.ignore-type";
-
     String DORIS_SINK_ENABLE_2PC = "doris.sink.enable-2pc";
     boolean DORIS_SINK_ENABLE_2PC_DEFAULT = false;
 
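
Note on the hunk above: with "doris.ignore-type" removed, BITMAP and HLL columns no longer need to be excluded by hand when loading a table. A minimal, hypothetical reader sketch in Scala (table identifier, FE address, and the DataFrame setup are assumed placeholders; only the commented-out option comes from the removed code):

    // Hypothetical usage; "db.tbl" and "fe_host:8030" are placeholders.
    val df = spark.read.format("doris")
      .option("doris.table.identifier", "db.tbl")
      .option("doris.fenodes", "fe_host:8030")
      // .option("doris.ignore-type", "bitmap,hll")  // removed by this commit
      .load()

After this commit, bitmap/hll columns are instead surfaced as string placeholders (see the SchemaUtils changes below).
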
diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/RestService.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/RestService.java
index 412b6a8..3f3516f 100644
--- a/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/RestService.java
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/RestService.java
@@ -17,56 +17,21 @@
 
 package org.apache.doris.spark.rest;
 
+import static org.apache.doris.spark.cfg.ConfigurationOptions.DORIS_BENODES;
 import static org.apache.doris.spark.cfg.ConfigurationOptions.DORIS_FENODES;
-import static org.apache.doris.spark.cfg.ConfigurationOptions.DORIS_FILTER_QUERY;
-import static org.apache.doris.spark.cfg.ConfigurationOptions.DORIS_READ_FIELD;
 import static org.apache.doris.spark.cfg.ConfigurationOptions.DORIS_REQUEST_AUTH_PASSWORD;
 import static org.apache.doris.spark.cfg.ConfigurationOptions.DORIS_REQUEST_AUTH_USER;
 import static org.apache.doris.spark.cfg.ConfigurationOptions.DORIS_TABLET_SIZE;
 import static org.apache.doris.spark.cfg.ConfigurationOptions.DORIS_TABLET_SIZE_DEFAULT;
 import static org.apache.doris.spark.cfg.ConfigurationOptions.DORIS_TABLET_SIZE_MIN;
 import static org.apache.doris.spark.cfg.ConfigurationOptions.DORIS_TABLE_IDENTIFIER;
-import static org.apache.doris.spark.cfg.ConfigurationOptions.DORIS_BENODES;
-import static org.apache.doris.spark.util.ErrorMessages.CONNECT_FAILED_MESSAGE;
 import static org.apache.doris.spark.util.ErrorMessages.ILLEGAL_ARGUMENT_MESSAGE;
 import static org.apache.doris.spark.util.ErrorMessages.PARSE_NUMBER_FAILED_MESSAGE;
 import static org.apache.doris.spark.util.ErrorMessages.SHOULD_NOT_HAPPEN_MESSAGE;
 
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.PrintWriter;
-import java.io.Serializable;
-import java.net.HttpURLConnection;
-import java.net.URL;
-import java.nio.charset.StandardCharsets;
-import java.util.Map;
-import java.util.HashMap;
-import java.util.Base64;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.ArrayList;
-import java.util.Set;
-import java.util.HashSet;
-import java.util.function.BiFunction;
-import java.util.function.Function;
-import java.util.function.Supplier;
-import java.util.stream.Collectors;
-
-import com.fasterxml.jackson.core.type.TypeReference;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.core.JsonParseException;
-import com.fasterxml.jackson.databind.JsonMappingException;
-import com.fasterxml.jackson.databind.json.JsonMapper;
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.doris.spark.cfg.ConfigurationOptions;
 import org.apache.doris.spark.cfg.Settings;
 import org.apache.doris.spark.cfg.SparkSettings;
-import org.apache.doris.spark.exception.ConnectedFailedException;
 import org.apache.doris.spark.exception.DorisException;
 import org.apache.doris.spark.exception.IllegalArgumentException;
 import org.apache.doris.spark.exception.ShouldNeverHappenException;
@@ -76,23 +41,44 @@ import org.apache.doris.spark.rest.models.BackendV2;
 import org.apache.doris.spark.rest.models.QueryPlan;
 import org.apache.doris.spark.rest.models.Schema;
 import org.apache.doris.spark.rest.models.Tablet;
+import org.apache.doris.spark.sql.SchemaUtils;
 import org.apache.doris.spark.util.HttpUtil;
 import org.apache.doris.spark.util.URLs;
+
+import com.fasterxml.jackson.core.JsonParseException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.JsonMappingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.json.JsonMapper;
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.http.HttpHeaders;
 import org.apache.http.HttpStatus;
-import org.apache.http.client.config.RequestConfig;
+import org.apache.http.StatusLine;
 import org.apache.http.client.methods.CloseableHttpResponse;
 import org.apache.http.client.methods.HttpGet;
 import org.apache.http.client.methods.HttpPost;
-import org.apache.http.client.methods.HttpRequestBase;
 import org.apache.http.client.methods.HttpUriRequest;
 import org.apache.http.entity.StringEntity;
 import org.apache.http.impl.client.CloseableHttpClient;
 import org.apache.http.util.EntityUtils;
 import org.slf4j.Logger;
 
-import com.google.common.annotations.VisibleForTesting;
-import scala.collection.JavaConverters;
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.BiFunction;
+import java.util.stream.Collectors;
 
 /**
  * Service for communicate with Doris FE.
@@ -238,18 +224,33 @@ public class RestService implements Serializable {
      * @throws DorisException throw when find partition failed
      */
     public static List<PartitionDefinition> findPartitions(Settings cfg, Logger logger) throws DorisException {
-        String[] tableIdentifiers = parseIdentifier(cfg.getProperty(DORIS_TABLE_IDENTIFIER), logger);
-        String sql = "select " + cfg.getProperty(DORIS_READ_FIELD, "*") +
-                " from `" + tableIdentifiers[0] + "`.`" + tableIdentifiers[1] + "`";
-        if (!StringUtils.isEmpty(cfg.getProperty(DORIS_FILTER_QUERY))) {
-            sql += " where " + cfg.getProperty(DORIS_FILTER_QUERY);
+        String[] tableIdentifiers =
+                parseIdentifier(cfg.getProperty(ConfigurationOptions.DORIS_TABLE_IDENTIFIER), logger);
+        String readFields = cfg.getProperty(ConfigurationOptions.DORIS_READ_FIELD, "*");
+        if (!"*".equals(readFields)) {
+            String[] readFieldArr = readFields.split(",");
+            String[] bitmapColumns = cfg.getProperty(SchemaUtils.DORIS_BITMAP_COLUMNS(), "").split(",");
+            String[] hllColumns = cfg.getProperty(SchemaUtils.DORIS_HLL_COLUMNS(), "").split(",");
+            for (int i = 0; i < readFieldArr.length; i++) {
+                String readFieldName = readFieldArr[i].replaceAll("`", "");
+                if (ArrayUtils.contains(bitmapColumns, readFieldName)
+                        || ArrayUtils.contains(hllColumns, readFieldName)) {
+                    readFieldArr[i] = "'READ UNSUPPORTED' AS " + readFieldArr[i];
+                }
+            }
+            readFields = StringUtils.join(readFieldArr, ",");
+        }
+        String sql = "select " + readFields + " from `" + tableIdentifiers[0] + "`.`" + tableIdentifiers[1] + "`";
+        if (!StringUtils.isEmpty(cfg.getProperty(ConfigurationOptions.DORIS_FILTER_QUERY))) {
+            sql += " where " + cfg.getProperty(ConfigurationOptions.DORIS_FILTER_QUERY);
         }
         logger.debug("Query SQL Sending to Doris FE is: '{}'.", sql);
 
         String finalSql = sql;
         String response = queryAllFrontends((SparkSettings) cfg, (frontend, enableHttps) -> {
-            HttpPost httpPost = new HttpPost(URLs.queryPlan(frontend, tableIdentifiers[0], tableIdentifiers[1], enableHttps));
-            String entity = "{\"sql\": \""+ finalSql +"\"}";
+            HttpPost httpPost =
+                    new HttpPost(URLs.queryPlan(frontend, tableIdentifiers[0], tableIdentifiers[1], enableHttps));
+            String entity = "{\"sql\": \"" + finalSql + "\"}";
             logger.debug("Post body Sending to Doris FE is: '{}'.", entity);
             StringEntity stringEntity = new StringEntity(entity, StandardCharsets.UTF_8);
             stringEntity.setContentEncoding("UTF-8");
@@ -630,10 +631,11 @@ public class RestService implements Serializable {
                 String user = settings.getProperty(DORIS_REQUEST_AUTH_USER, "");
                 String password = settings.getProperty(DORIS_REQUEST_AUTH_PASSWORD, "");
                 logger.info("Send request to Doris FE '{}' with user '{}'.", request.getURI(), user);
-                request.setHeader(HttpHeaders.AUTHORIZATION, "Basic " +
-                        Base64.getEncoder().encodeToString((user + ":" + password).getBytes(StandardCharsets.UTF_8)));
+                request.setHeader(HttpHeaders.AUTHORIZATION, "Basic "
+                        + Base64.getEncoder().encodeToString((user + ":" + password).getBytes(StandardCharsets.UTF_8)));
                 CloseableHttpResponse response = client.execute(request);
-                if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
+                StatusLine statusLine = response.getStatusLine();
+                if (statusLine.getStatusCode() == HttpStatus.SC_OK) {
                     String resStr = EntityUtils.toString(response.getEntity());
                     Map<String, Object> resMap = MAPPER.readValue(resStr,
                             new TypeReference<Map<String, Object>>() {
@@ -643,6 +645,8 @@
                     }
                     return resStr;
                 }
+                logger.warn("Request for {} get a bad status, code: {}, msg: {}", request.getURI().toString(),
+                        statusLine.getStatusCode(), statusLine.getReasonPhrase());
             } catch (IOException e) {
                 logger.error("Doris FE node {} is unavailable, Request the next Doris FE node. Err: {}", frontend, e.getMessage());
             }
@@ -652,4 +656,4 @@ public class RestService implements Serializable {
         throw new DorisException(errMsg);
     }
 
-}
\ No newline at end of file
+}
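
For readability, a standalone Scala sketch of the projection rewrite that findPartitions now applies (the inputs are assumed examples; the committed implementation is the Java loop above):

    // Bitmap/HLL columns in the projection are replaced with a literal so
    // the query-plan request never tries to materialize them.
    val readFields = Array("k1", "`b1`", "k3")   // assumed doris.read.field value
    val bitmapCols = Array("b1")                 // assumed doris.bitmap.columns value
    val hllCols    = Array.empty[String]         // assumed doris.hll.columns value
    val rewritten = readFields.map { f =>
      val name = f.replaceAll("`", "")
      if (bitmapCols.contains(name) || hllCols.contains(name)) s"'READ UNSUPPORTED' AS $f"
      else f
    }.mkString(",")
    // rewritten == "k1,'READ UNSUPPORTED' AS `b1`,k3"
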
diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/serialization/RowBatch.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/serialization/RowBatch.java
index b38c007..feb9a4f 100644
--- a/spark-doris-connector/src/main/java/org/apache/doris/spark/serialization/RowBatch.java
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/serialization/RowBatch.java
@@ -17,6 +17,11 @@
 
 package org.apache.doris.spark.serialization;
 
+import org.apache.doris.sdk.thrift.TScanBatchResult;
+import org.apache.doris.spark.exception.DorisException;
+import org.apache.doris.spark.rest.models.Schema;
+import org.apache.doris.spark.util.IPUtils;
+
 import com.google.common.base.Preconditions;
 import org.apache.arrow.memory.RootAllocator;
 import org.apache.arrow.vector.BaseIntVector;
@@ -43,10 +48,6 @@ import org.apache.arrow.vector.complex.impl.UnionMapReader;
 import org.apache.arrow.vector.ipc.ArrowStreamReader;
 import org.apache.arrow.vector.types.Types;
 import org.apache.commons.lang3.ArrayUtils;
-import org.apache.doris.sdk.thrift.TScanBatchResult;
-import org.apache.doris.spark.exception.DorisException;
-import org.apache.doris.spark.rest.models.Schema;
-import org.apache.doris.spark.util.IPUtils;
 import org.apache.spark.sql.types.Decimal;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -72,8 +73,6 @@ import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Objects;
 
-import static org.apache.doris.spark.util.IPUtils.convertLongToIPv4Address;
-
 /**
  * row batch data container.
  */
@@ -157,8 +156,7 @@ public class RowBatch {
 
     private void addValueToRow(int rowIndex, Object obj) {
         if (rowIndex > rowCountInOneBatch) {
-            String errMsg = "Get row offset: " + rowIndex + " larger than row size: " +
-                    rowCountInOneBatch;
+            String errMsg = "Get row offset: " + rowIndex + " larger than row size: " + rowCountInOneBatch;
             logger.error(errMsg);
             throw new NoSuchElementException(errMsg);
         }
@@ -261,7 +259,8 @@ public class RowBatch {
                             ipv4Vector = (UInt4Vector) curFieldVector;
                         }
                         for (int rowIndex = 0; rowIndex < rowCountInOneBatch; rowIndex++) {
-                            Object fieldValue = ipv4Vector.isNull(rowIndex) ? null : convertLongToIPv4Address(ipv4Vector.getValueAsLong(rowIndex));
+                            Object fieldValue = ipv4Vector.isNull(rowIndex) ? null :
+                                    IPUtils.convertLongToIPv4Address(ipv4Vector.getValueAsLong(rowIndex));
                             addValueToRow(rowIndex, fieldValue);
                         }
                         break;
diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/rdd/ScalaValueReader.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/rdd/ScalaValueReader.scala
index 719b16b..f9124a6 100644
--- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/rdd/ScalaValueReader.scala
+++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/rdd/ScalaValueReader.scala
@@ -138,7 +138,7 @@ class ScalaValueReader(partition: PartitionDefinition, settings: Settings) exten
   protected val openResult: TScanOpenResult = lockClient(_.openScanner(openParams))
   protected val contextId: String = openResult.getContextId
   protected val schema: Schema =
-    SchemaUtils.convertToSchema(openResult.getSelectedColumns)
+    SchemaUtils.convertToSchema(openResult.getSelectedColumns, settings)
 
   private[this] val asyncThread: Thread = new Thread {
     override def run(): Unit = {
diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisRelation.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisRelation.scala
index db546ce..aa014ff 100644
--- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisRelation.scala
+++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisRelation.scala
@@ -70,6 +70,11 @@ private[sql] class DorisRelation(
           .map(filter => s"($filter)").mkString(" and ")
     }
 
+    val bitmapColumnStr = cfg.getProperty(SchemaUtils.DORIS_BITMAP_COLUMNS, "")
+    paramWithScan += (SchemaUtils.DORIS_BITMAP_COLUMNS -> bitmapColumnStr)
+    val hllColumnStr = cfg.getProperty(SchemaUtils.DORIS_HLL_COLUMNS, "")
+    paramWithScan += (SchemaUtils.DORIS_HLL_COLUMNS -> hllColumnStr)
+
     // required columns for column pruner
     if (requiredColumns != null && requiredColumns.length > 0) {
       paramWithScan += (ConfigurationOptions.DORIS_READ_FIELD ->
diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/SchemaUtils.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/SchemaUtils.scala
index 76d231a..e21c6f2 100644
--- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/SchemaUtils.scala
+++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/SchemaUtils.scala
@@ -19,8 +19,9 @@ package org.apache.doris.spark.sql
 
 import com.fasterxml.jackson.databind.json.JsonMapper
 import com.fasterxml.jackson.module.scala.DefaultScalaModule
-import org.apache.doris.sdk.thrift.TScanColumnDesc
-import org.apache.doris.spark.cfg.ConfigurationOptions.{DORIS_IGNORE_TYPE, DORIS_READ_FIELD}
+import org.apache.commons.lang3.StringUtils
+import org.apache.doris.sdk.thrift.{TPrimitiveType, TScanColumnDesc}
+import org.apache.doris.spark.cfg.ConfigurationOptions.DORIS_READ_FIELD
 import org.apache.doris.spark.cfg.Settings
 import org.apache.doris.spark.exception.DorisException
 import org.apache.doris.spark.rest.RestService
@@ -38,6 +39,9 @@ private[spark] object SchemaUtils {
   private val logger = LoggerFactory.getLogger(SchemaUtils.getClass.getSimpleName.stripSuffix("$"))
   private val MAPPER = JsonMapper.builder().addModule(DefaultScalaModule).build()
 
+  val DORIS_BITMAP_COLUMNS = "doris.bitmap.columns"
+  val DORIS_HLL_COLUMNS = "doris.hll.columns"
+
   /**
    * discover Doris table schema from Doris FE.
    *
@@ -46,9 +50,12 @@ private[spark] object SchemaUtils {
    */
   def discoverSchema(cfg: Settings): StructType = {
     val schema = discoverSchemaFromFe(cfg)
+    val bitmapColumns = schema.getProperties.filter(_.getType.equalsIgnoreCase("BITMAP")).map(_.getName).mkString(",")
+    cfg.setProperty(DORIS_BITMAP_COLUMNS, bitmapColumns)
+    val hllColumns = schema.getProperties.filter(_.getType.equalsIgnoreCase("HLL")).map(_.getName).mkString(",")
+    cfg.setProperty(DORIS_HLL_COLUMNS, hllColumns)
     val dorisReadField = cfg.getProperty(DORIS_READ_FIELD)
-    val ignoreColumnType = cfg.getProperty(DORIS_IGNORE_TYPE)
-    convertToStruct(schema, dorisReadField, ignoreColumnType)
+    convertToStruct(schema, dorisReadField)
   }
 
   /**
@@ -67,20 +74,14 @@ private[spark] object SchemaUtils {
    * @param schema inner schema
    * @return Spark Catalyst StructType
    */
-  def convertToStruct(schema: Schema, dorisReadFields: String, ignoredTypes: String): StructType = {
+  def convertToStruct(schema: Schema, dorisReadFields: String): StructType = {
     val fieldList = if (dorisReadFields != null && dorisReadFields.nonEmpty) {
       dorisReadFields.split(",")
     } else {
       Array.empty[String]
     }
-    val ignoredTypeList = if (ignoredTypes != null && ignoredTypes.nonEmpty) {
-      ignoredTypes.split(",").map(t => t.trim.toUpperCase)
-    } else {
-      Array.empty[String]
-    }
     val fields = schema.getProperties
-      .filter(x => (fieldList.contains(x.getName) || fieldList.isEmpty)
-        && !ignoredTypeList.contains(x.getType))
+      .filter(x => fieldList.contains(x.getName) || fieldList.isEmpty)
       .map(f =>
         DataTypes.createStructField(
           f.getName,
@@ -132,8 +133,8 @@ private[spark] object SchemaUtils {
       case "VARIANT"         => DataTypes.StringType
       case "IPV4"            => DataTypes.StringType
       case "IPV6"            => DataTypes.StringType
-      case "HLL"             =>
-        throw new DorisException("Unsupported type " + dorisType)
+      case "BITMAP"          => DataTypes.StringType // Placeholder only, no 
support for reading
+      case "HLL"             => DataTypes.StringType // Placeholder only, no 
support for reading
       case _                             =>
         throw new DorisException("Unrecognized Doris type " + dorisType)
     }
@@ -145,12 +146,39 @@ private[spark] object SchemaUtils {
    * @param tscanColumnDescs Doris BE return schema
    * @return inner schema struct
    */
-  def convertToSchema(tscanColumnDescs: Seq[TScanColumnDesc]): Schema = {
-    val schema = new Schema(tscanColumnDescs.length)
-    tscanColumnDescs.foreach(desc => schema.put(new Field(desc.getName, desc.getType.name, "", 0, 0, "")))
+  def convertToSchema(tscanColumnDescs: Seq[TScanColumnDesc], settings: Settings): Schema = {
+    val readColumns = settings.getProperty(DORIS_READ_FIELD, "").split(",").filter(_.nonEmpty).map(_.replaceAll("`", ""))
+    val bitmapColumns = settings.getProperty(DORIS_BITMAP_COLUMNS, "").split(",").filter(_.nonEmpty)
+    val hllColumns = settings.getProperty(DORIS_HLL_COLUMNS, "").split(",").filter(_.nonEmpty)
+    val fieldList = fieldUnion(readColumns, bitmapColumns, hllColumns, tscanColumnDescs)
+    val schema = new Schema(fieldList.length)
+    fieldList.foreach(schema.put)
     schema
   }
 
+  private def fieldUnion(readColumns: Array[String], bitmapColumns: Array[String], hllColumns: Array[String],
+                 tScanColumnDescSeq: Seq[TScanColumnDesc]): List[Field] = {
+    val fieldList = mutable.Buffer[Field]()
+    var rcIdx = 0;
+    var tsdIdx = 0;
+    while (rcIdx < readColumns.length || tsdIdx < tScanColumnDescSeq.length) {
+      if (rcIdx < readColumns.length) {
+        if (StringUtils.equals(readColumns(rcIdx), tScanColumnDescSeq(tsdIdx).getName)) {
+          fieldList += new Field(tScanColumnDescSeq(tsdIdx).getName, tScanColumnDescSeq(tsdIdx).getType.name, "", 0, 0, "")
+          rcIdx += 1
+          tsdIdx += 1
+        } else if (bitmapColumns.contains(readColumns(rcIdx)) || hllColumns.contains(readColumns(rcIdx))) {
+          fieldList += new Field(readColumns(rcIdx), TPrimitiveType.VARCHAR.name, "", 0, 0, "")
+          rcIdx += 1
+        }
+      } else {
+        fieldList += new Field(tScanColumnDescSeq(tsdIdx).getName, tScanColumnDescSeq(tsdIdx).getType.name, "", 0, 0, "")
+        tsdIdx += 1
+      }
+    }
+    fieldList.toList
+  }
+
   def rowColumnValue(row: SpecializedGetters, ordinal: Int, dataType: DataType): Any = {
 
     if (row.isNullAt(ordinal)) null
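
The BE scan result omits BITMAP/HLL columns, so convertToSchema above merges the requested read columns back in as VARCHAR placeholders. A simplified, self-contained Scala model of that merge (Col stands in for the connector's Field/TScanColumnDesc types; all data is assumed):

    // Sketch only: re-insert bitmap/hll columns, which the BE does not
    // return, at their original positions in the requested column list.
    case class Col(name: String, tpe: String)

    def union(readCols: Seq[String], special: Set[String], descs: Seq[Col]): Seq[Col] = {
      val out = scala.collection.mutable.Buffer[Col]()
      var r = 0
      var d = 0
      while (r < readCols.length || d < descs.length) {
        if (r < readCols.length && d < descs.length && readCols(r) == descs(d).name) {
          out += descs(d); r += 1; d += 1             // column present in BE result
        } else if (r < readCols.length && special.contains(readCols(r))) {
          out += Col(readCols(r), "VARCHAR"); r += 1  // bitmap/hll placeholder
        } else if (d < descs.length) {
          out += descs(d); d += 1                     // remaining BE columns
        } else {
          r += 1                                      // guard added in this sketch only
        }
      }
      out.toSeq
    }

    // union(Seq("k1", "b1", "k2"), Set("b1"),
    //       Seq(Col("k1", "BOOLEAN"), Col("k2", "DOUBLE")))
    // == Seq(Col("k1", "BOOLEAN"), Col("b1", "VARCHAR"), Col("k2", "DOUBLE"))

The final else branch is a guard this sketch adds so an unknown requested column cannot loop forever; the committed fieldUnion assumes every read column is either returned by the BE or listed in doris.bitmap.columns/doris.hll.columns.
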
diff --git a/spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/TestSchemaUtils.scala b/spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/TestSchemaUtils.scala
index da797e2..5da7534 100644
--- a/spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/TestSchemaUtils.scala
+++ b/spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/TestSchemaUtils.scala
@@ -18,8 +18,10 @@
 package org.apache.doris.spark.sql
 
 import org.apache.doris.sdk.thrift.{TPrimitiveType, TScanColumnDesc}
+import org.apache.doris.spark.cfg.{ConfigurationOptions, SparkSettings}
 import org.apache.doris.spark.exception.DorisException
 import org.apache.doris.spark.rest.models.{Field, Schema}
+import org.apache.spark.SparkConf
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.types._
 import org.hamcrest.core.StringStartsWith.startsWith
@@ -42,7 +44,7 @@ class TestSchemaUtils extends ExpectedExceptionTest {
     fields :+= DataTypes.createStructField("k1", DataTypes.ByteType, true)
     fields :+= DataTypes.createStructField("k5", DataTypes.LongType, true)
     val expected = DataTypes.createStructType(fields.asJava)
-    Assert.assertEquals(expected, SchemaUtils.convertToStruct(schema, "k1,k5", null))
+    Assert.assertEquals(expected, SchemaUtils.convertToStruct(schema, "k1,k5"))
   }
 
   @Test
@@ -68,10 +70,8 @@ class TestSchemaUtils extends ExpectedExceptionTest {
     Assert.assertEquals(DataTypes.StringType, SchemaUtils.getCatalystType("STRING", 0, 0))
     Assert.assertEquals(DataTypes.StringType, SchemaUtils.getCatalystType("JSON", 0, 0))
     Assert.assertEquals(DataTypes.StringType, SchemaUtils.getCatalystType("JSONB", 0, 0))
-
-    thrown.expect(classOf[DorisException])
-    thrown.expectMessage(startsWith("Unsupported type"))
-    SchemaUtils.getCatalystType("HLL", 0, 0)
+    Assert.assertEquals(DataTypes.StringType, SchemaUtils.getCatalystType("BITMAP", 0, 0))
+    Assert.assertEquals(DataTypes.StringType, SchemaUtils.getCatalystType("HLL", 0, 0))
 
     thrown.expect(classOf[DorisException])
     thrown.expectMessage(startsWith("Unrecognized Doris type"))
@@ -80,6 +80,11 @@ class TestSchemaUtils extends ExpectedExceptionTest {
 
   @Test
   def testConvertToSchema(): Unit = {
+
+    val sparkConf = new SparkConf()
+    sparkConf.set(ConfigurationOptions.DORIS_READ_FIELD, "k1,k2")
+    val settings = new SparkSettings(sparkConf)
+
     val k1 = new TScanColumnDesc
     k1.setName("k1")
     k1.setType(TPrimitiveType.BOOLEAN)
@@ -95,26 +100,7 @@ class TestSchemaUtils extends ExpectedExceptionTest {
     expected.put(ek1)
     expected.put(ek2)
 
-    Assert.assertEquals(expected, SchemaUtils.convertToSchema(Seq(k1, k2)))
-  }
-
-  @Test
-  def testIgnoreTypes(): Unit = {
-
-    val schema = new Schema
-    schema.setStatus(200)
-    val col1 = new Field("col1", "TINYINT", "", 0, 0, "")
-    val col2 = new Field("col2", "BITMAP", "", 0, 0, "")
-    val col3 = new Field("col3", "HLL", "", 0, 0, "")
-    schema.put(col1)
-    schema.put(col2)
-    schema.put(col3)
-
-    var fields = List[StructField]()
-    fields :+= DataTypes.createStructField("col1", DataTypes.ByteType, true)
-    val expected = DataTypes.createStructType(fields.asJava)
-    Assert.assertEquals(expected, SchemaUtils.convertToStruct(schema, null, "bitmap,hll"))
-
+    Assert.assertEquals(expected, SchemaUtils.convertToSchema(Seq(k1, k2), settings))
   }
 
   @Test

