This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 8465cfc [fix](connector) fix flight sql read json type (#263)
8465cfc is described below
commit 8465cfc973efc820a8883ceeabd4a6cc322a78f7
Author: gnehil <[email protected]>
AuthorDate: Mon Feb 10 10:45:49 2025 +0800
[fix](connector) fix flight sql read json type (#263)
---
.../doris/spark/client/read/DorisFlightSqlReader.java | 16 ++++++++--------
1 file changed, 8 insertions(+), 8 deletions(-)
diff --git a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/DorisFlightSqlReader.java b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/DorisFlightSqlReader.java
index 7a2d34e..faa462b 100644
--- a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/DorisFlightSqlReader.java
+++ b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/DorisFlightSqlReader.java
@@ -72,12 +72,7 @@ public class DorisFlightSqlReader extends DorisReader {
throw new DorisException("init adbc connection failed", e);
}
}
- String tableIdentifier = config.getValue(DorisOptions.DORIS_TABLE_IDENTIFIER);
- String[] arr = tableIdentifier.split("\\.");
-
- Schema tableSchema = frontendClient.getTableSchema(arr[0], arr[1]);
- this.schema = processDorisSchema(partition, tableSchema);
- log.debug("origin flight sql read Schema: " + tableSchema + ", processed schema: " + schema);
+ this.schema = processDorisSchema(partition);
this.arrowReader = executeQuery();
}
@@ -153,7 +148,7 @@ public class DorisFlightSqlReader extends DorisReader {
return String.format("SELECT %s FROM %s %s%s%s", columns, fullTableName, tablets, predicates, limit);
}
- protected Schema processDorisSchema(DorisReaderPartition partition, final Schema originSchema) throws Exception {
+ protected Schema processDorisSchema(DorisReaderPartition partition) throws Exception {
Schema processedSchema = new Schema();
Schema tableSchema = frontendClient.getTableSchema(partition.getDatabase(), partition.getTable());
Map<String, Field> fieldTypeMap = tableSchema.getProperties().stream()
@@ -170,7 +165,12 @@ public class DorisFlightSqlReader extends DorisReader {
newFieldList.add(new Field(realColumn, TPrimitiveType.VARCHAR.name(), null, 0, 0, null));
}
} else {
- newFieldList.add(fieldTypeMap.get(readColumn.trim().replaceAll("`", "")));
+ String colName = readColumn.trim().replaceAll("`", "");
+ if ("JSON".equalsIgnoreCase(fieldTypeMap.get(colName).getType())) {
+ newFieldList.add(new Field(colName, TPrimitiveType.JSONB.name(), null, 0, 0, null));
+ } else {
+ newFieldList.add(fieldTypeMap.get(colName));
+ }
}
}
processedSchema.setProperties(newFieldList);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]