This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 2533788504 [hive] Improve paimon format table conversion to hive table in hive catalog. (#4522)
2533788504 is described below
commit 253378850431ca97687847199e900d908e04e6df
Author: Kerwin <[email protected]>
AuthorDate: Mon Nov 25 13:31:25 2024 +0800
[hive] Improve paimon format table conversion to hive table in hive catalog. (#4522)
---
.../java/org/apache/paimon/table/FormatTable.java | 14 +++
.../java/org/apache/paimon/hive/HiveCatalog.java | 124 ++++++++++-----------
.../org/apache/paimon/hive/HiveCatalogTest.java | 4 +-
3 files changed, 76 insertions(+), 66 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/FormatTable.java b/paimon-core/src/main/java/org/apache/paimon/table/FormatTable.java
index a53ba545c2..a4c7788c38 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/FormatTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/FormatTable.java
@@ -34,6 +34,7 @@ import org.apache.paimon.utils.SimpleFileReader;
import javax.annotation.Nullable;
import java.time.Duration;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -70,6 +71,19 @@ public interface FormatTable extends Table {
CSV
}
+ /** Parses a file format string to a corresponding {@link Format} enum constant. */
+ static Format parseFormat(String fileFormat) {
+ try {
+ return Format.valueOf(fileFormat.toUpperCase());
+ } catch (IllegalArgumentException e) {
+ throw new UnsupportedOperationException(
+ "Format table unsupported file format: "
+ + fileFormat
+ + ". Supported formats: "
+ + Arrays.toString(Format.values()));
+ }
+ }
+
/** Create a new builder for {@link FormatTable}. */
static FormatTable.Builder builder() {
return new FormatTable.Builder();
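A minimal sketch of the new helper's contract (illustration only, not part of the patch; the demo class is hypothetical, only FormatTable.parseFormat comes from the diff above):

    import org.apache.paimon.table.FormatTable;

    public class ParseFormatDemo {
        public static void main(String[] args) {
            // Lookup is case-insensitive: "orc" resolves to Format.ORC.
            FormatTable.Format format = FormatTable.parseFormat("orc");
            System.out.println(format); // ORC

            // Unknown formats now fail with a descriptive message that lists
            // the supported values, instead of the bare
            // IllegalArgumentException thrown by Enum.valueOf.
            try {
                FormatTable.parseFormat("unknown");
            } catch (UnsupportedOperationException e) {
                System.out.println(e.getMessage());
            }
        }
    }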
diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
index ebd5a1edf8..0ecc78469e 100644
--- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
+++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
@@ -112,7 +112,6 @@ import static org.apache.paimon.table.FormatTableOptions.FIELD_DELIMITER;
import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH;
import static org.apache.paimon.utils.HadoopUtils.addHadoopConfIfFound;
import static org.apache.paimon.utils.Preconditions.checkArgument;
-import static org.apache.paimon.utils.Preconditions.checkNotNull;
import static org.apache.paimon.utils.StringUtils.isNullOrWhitespaceOnly;
/** A catalog implementation for Hive. */
@@ -122,7 +121,7 @@ public class HiveCatalog extends AbstractCatalog {
// Reserved properties
public static final String TABLE_TYPE_PROP = "table_type";
- public static final String PAIMON_TABLE_TYPE_VALUE = "paimon";
+ public static final String PAIMON_TABLE_IDENTIFIER = "PAIMON";
// we don't include paimon-hive-connector as dependencies because it depends on
// hive-exec
@@ -766,33 +765,24 @@ public class HiveCatalog extends AbstractCatalog {
}
}
- Table table =
- newHmsTable(identifier, tblProperties, PAIMON_TABLE_TYPE_VALUE, externalTable);
- updateHmsTable(table, identifier, tableSchema, PAIMON_TABLE_TYPE_VALUE, location);
+ Table table = newHmsTable(identifier, tblProperties, null, externalTable);
+ updateHmsTable(table, identifier, tableSchema, null, location);
return table;
}
private Table createHiveFormatTable(
Identifier identifier, TableSchema tableSchema, Path location, boolean externalTable) {
- Options options = Options.fromMap(tableSchema.options());
- checkArgument(options.get(TYPE) == FORMAT_TABLE);
+ CoreOptions coreOptions = new CoreOptions(tableSchema.options());
+ checkArgument(coreOptions.type() == FORMAT_TABLE);
- String provider = tableSchema.options().get(FILE_FORMAT.key());
- checkNotNull(provider, FILE_FORMAT.key() + " should be configured.");
- // valid supported format
- FormatTable.Format.valueOf(provider.toUpperCase());
+ // file.format option has a default value and cannot be empty.
+ FormatTable.Format provider = FormatTable.parseFormat(coreOptions.formatType());
Map<String, String> tblProperties = new HashMap<>();
Table table = newHmsTable(identifier, tblProperties, provider, externalTable);
updateHmsTable(table, identifier, tableSchema, provider, location);
- if (FormatTable.Format.CSV.toString().equalsIgnoreCase(provider)) {
- table.getSd()
- .getSerdeInfo()
- .getParameters()
- .put(FIELD_DELIM, options.get(FIELD_DELIMITER));
- }
return table;
}
@@ -879,7 +869,8 @@ public class HiveCatalog extends AbstractCatalog {
throws TException, InterruptedException {
updateHmsTablePars(table, newSchema);
Path location = getTableLocation(identifier, table);
- updateHmsTable(table, identifier, newSchema, newSchema.options().get("provider"), location);
+ // file format is null because only data tables support alter table.
+ updateHmsTable(table, identifier, newSchema, null, location);
clients.execute(client -> HiveAlterTableUtils.alterTable(client, identifier, table));
}
@@ -1059,12 +1050,9 @@ public class HiveCatalog extends AbstractCatalog {
private Table newHmsTable(
Identifier identifier,
Map<String, String> tableParameters,
- String provider,
+ @Nullable FormatTable.Format provider,
boolean externalTable) {
long currentTimeMillis = System.currentTimeMillis();
- if (provider == null) {
- provider = PAIMON_TABLE_TYPE_VALUE;
- }
Table table =
new Table(
identifier.getTableName(),
@@ -1082,67 +1070,83 @@ public class HiveCatalog extends AbstractCatalog {
externalTable
? TableType.EXTERNAL_TABLE.name()
: TableType.MANAGED_TABLE.name());
- table.getParameters().put(TABLE_TYPE_PROP, provider.toUpperCase());
- if (PAIMON_TABLE_TYPE_VALUE.equalsIgnoreCase(provider)) {
+
+ if (provider == null) {
+ // normal paimon table
+ table.getParameters().put(TABLE_TYPE_PROP, PAIMON_TABLE_IDENTIFIER);
table.getParameters()
.put(hive_metastoreConstants.META_TABLE_STORAGE, STORAGE_HANDLER_CLASS_NAME);
} else {
- table.getParameters().put(FILE_FORMAT.key(), provider.toLowerCase());
+ // format table
+ table.getParameters().put(TABLE_TYPE_PROP, provider.name());
+ table.getParameters().put(FILE_FORMAT.key(), provider.name().toLowerCase());
table.getParameters().put(TYPE.key(), FORMAT_TABLE.toString());
}
+
if (externalTable) {
table.getParameters().put(HIVE_EXTERNAL_TABLE_PROP, "TRUE");
}
return table;
}
- private String getSerdeClassName(String provider) {
- if (provider == null || provider.equalsIgnoreCase(PAIMON_TABLE_TYPE_VALUE)) {
- return SERDE_CLASS_NAME;
- } else if (provider.equalsIgnoreCase("csv")) {
- return "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe";
- } else if (provider.equalsIgnoreCase("parquet")) {
- return "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe";
- } else if (provider.equalsIgnoreCase("orc")) {
- return "org.apache.hadoop.hive.ql.io.orc.OrcSerde";
- } else {
+ private String getSerdeClassName(@Nullable FormatTable.Format provider) {
+ if (provider == null) {
return SERDE_CLASS_NAME;
}
+ switch (provider) {
+ case CSV:
+ return "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe";
+ case PARQUET:
+ return "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe";
+ case ORC:
+ return "org.apache.hadoop.hive.ql.io.orc.OrcSerde";
+ }
+ return SERDE_CLASS_NAME;
}
- private String getInputFormatName(String provider) {
- if (provider == null || provider.equalsIgnoreCase(PAIMON_TABLE_TYPE_VALUE)) {
- return INPUT_FORMAT_CLASS_NAME;
- } else if (provider.equalsIgnoreCase("csv")) {
- return "org.apache.hadoop.mapred.TextInputFormat";
- } else if (provider.equalsIgnoreCase("parquet")) {
- return "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat";
- } else if (provider.equalsIgnoreCase("orc")) {
- return "org.apache.hadoop.hive.ql.io.orc.OrcInputFormat";
- } else {
+ private String getInputFormatName(@Nullable FormatTable.Format provider) {
+ if (provider == null) {
return INPUT_FORMAT_CLASS_NAME;
}
+ switch (provider) {
+ case CSV:
+ return "org.apache.hadoop.mapred.TextInputFormat";
+ case PARQUET:
+ return "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat";
+ case ORC:
+ return "org.apache.hadoop.hive.ql.io.orc.OrcInputFormat";
+ }
+ return INPUT_FORMAT_CLASS_NAME;
}
- private String getOutputFormatClassName(String provider) {
- if (provider == null || provider.equalsIgnoreCase(PAIMON_TABLE_TYPE_VALUE)) {
- return OUTPUT_FORMAT_CLASS_NAME;
- } else if (provider.equalsIgnoreCase("csv")) {
- return "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat";
- } else if (provider.equalsIgnoreCase("parquet")) {
- return "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat";
- } else if (provider.equalsIgnoreCase("orc")) {
- return "org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat";
- } else {
+ private String getOutputFormatClassName(@Nullable FormatTable.Format provider) {
+ if (provider == null) {
return OUTPUT_FORMAT_CLASS_NAME;
}
+ switch (provider) {
+ case CSV:
+ return "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat";
+ case PARQUET:
+ return "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat";
+ case ORC:
+ return "org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat";
+ }
+ return OUTPUT_FORMAT_CLASS_NAME;
+ }
+
+ private Map<String, String> setSerDeInfoParam(@Nullable FormatTable.Format provider) {
+ Map<String, String> param = new HashMap<>();
+ if (provider == FormatTable.Format.CSV) {
+ param.put(FIELD_DELIM, options.get(FIELD_DELIMITER));
+ }
+ return param;
}
private void updateHmsTable(
Table table,
Identifier identifier,
TableSchema schema,
- String provider,
+ @Nullable FormatTable.Format provider,
Path location) {
StorageDescriptor sd = table.getSd() != null ? table.getSd() : new StorageDescriptor();
@@ -1206,14 +1210,6 @@ public class HiveCatalog extends AbstractCatalog {
locationHelper.specifyTableLocation(table, location.toString());
}
- private Map<String, String> setSerDeInfoParam(String provider) {
- Map<String, String> param = new HashMap<>();
- if (provider != null && provider.equalsIgnoreCase("csv")) {
- param.put(FIELD_DELIM, options.get(FIELD_DELIMITER));
- }
- return param;
- }
-
private void updateHmsTablePars(Table table, TableSchema schema) {
if (syncAllProperties()) {
table.getParameters().putAll(schema.options());
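Net effect of the HiveCatalog changes above: the provider parameter is now a nullable FormatTable.Format, where null marks a regular Paimon table and a non-null value marks a format table, so the old string comparisons against "csv"/"parquet"/"orc" collapse into exhaustive switches over the enum. A condensed sketch of the resulting mapping (illustration only; the Hive class names are copied from the diff, while the sketch class and the placeholder constant are not part of the commit):

    import javax.annotation.Nullable;
    import org.apache.paimon.table.FormatTable;

    final class HiveClassMappingSketch {
        // Placeholder for HiveCatalog.INPUT_FORMAT_CLASS_NAME (Paimon's own
        // input format; its exact value is defined in HiveCatalog and not
        // shown in this diff).
        static final String PAIMON_INPUT_FORMAT = "<INPUT_FORMAT_CLASS_NAME>";

        static String inputFormat(@Nullable FormatTable.Format provider) {
            if (provider == null) {
                return PAIMON_INPUT_FORMAT; // regular Paimon table
            }
            switch (provider) {
                case CSV:
                    return "org.apache.hadoop.mapred.TextInputFormat";
                case PARQUET:
                    return "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat";
                case ORC:
                    return "org.apache.hadoop.hive.ql.io.orc.OrcInputFormat";
                default:
                    throw new UnsupportedOperationException("unexpected format: " + provider);
            }
        }
    }

The serde and output-format mappings follow the same null-vs-enum shape, and newHmsTable uses the same null check to decide between registering Paimon's storage handler and writing the file.format/type parameters of a format table.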
diff --git a/paimon-hive/paimon-hive-catalog/src/test/java/org/apache/paimon/hive/HiveCatalogTest.java b/paimon-hive/paimon-hive-catalog/src/test/java/org/apache/paimon/hive/HiveCatalogTest.java
index 3ba3f89e41..267bdf0c71 100644
--- a/paimon-hive/paimon-hive-catalog/src/test/java/org/apache/paimon/hive/HiveCatalogTest.java
+++ b/paimon-hive/paimon-hive-catalog/src/test/java/org/apache/paimon/hive/HiveCatalogTest.java
@@ -55,7 +55,7 @@ import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTORECONNECTURLKEY;
-import static org.apache.paimon.hive.HiveCatalog.PAIMON_TABLE_TYPE_VALUE;
+import static org.apache.paimon.hive.HiveCatalog.PAIMON_TABLE_IDENTIFIER;
import static org.apache.paimon.hive.HiveCatalog.TABLE_TYPE_PROP;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -218,7 +218,7 @@ public class HiveCatalogTest extends CatalogTestBase {
assertThat(tableProperties).containsEntry("comment", "this is a hive table");
assertThat(tableProperties)
.containsEntry(
- TABLE_TYPE_PROP, PAIMON_TABLE_TYPE_VALUE.toUpperCase(Locale.ROOT));
+ TABLE_TYPE_PROP, PAIMON_TABLE_IDENTIFIER.toUpperCase(Locale.ROOT));
} catch (Exception e) {
fail("Test failed due to exception: " + e.getMessage());
}