This is an automated email from the ASF dual-hosted git repository.

ayushsaxena pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 58dee665872 HIVE-29192: Iceberg: [V3] Add support for native default 
column type during create (#6074)
58dee665872 is described below

commit 58dee6658720997f5ec668201f61fa9fc33b50bf
Author: Ayush Saxena <[email protected]>
AuthorDate: Mon Sep 22 13:27:08 2025 +0530

    HIVE-29192: Iceberg: [V3] Add support for native default column type during 
create (#6074)
---
 .../apache/iceberg/hive/HiveSchemaConverter.java   | 73 +++++++++++++---
 .../org/apache/iceberg/hive/HiveSchemaUtil.java    | 97 ++++++++++++++++++++--
 .../iceberg/hive/client/HiveRESTCatalogClient.java |  2 +-
 .../hive/client/TestHiveRESTCatalogClient.java     |  2 +-
 .../iceberg/mr/hive/BaseHiveIcebergMetaHook.java   | 12 ++-
 .../iceberg/mr/hive/HiveIcebergMetaHook.java       |  3 +-
 .../iceberg/mr/hive/HiveIcebergStorageHandler.java | 10 +++
 .../mr/hive/writer/HiveIcebergRecordWriter.java    | 11 +++
 .../iceberg/mr/hive/writer/WriterBuilder.java      | 16 ++++
 .../mr/mapreduce/IcebergInternalRecordWrapper.java |  8 +-
 .../iceberg_default_column_type_mismatch.q         |  6 ++
 .../test/queries/positive/iceberg_default_column.q | 31 +++++++
 .../iceberg_default_column_type_mismatch.q.out     | 10 +++
 .../results/positive/iceberg_default_column.q.out  | 87 +++++++++++++++++++
 .../apache/hive/TestHiveRESTCatalogClientIT.java   |  6 +-
 .../change/AlterTableChangeColumnAnalyzer.java     |  5 +-
 .../ql/ddl/table/constraint/ConstraintsUtils.java  | 72 +++++++++-------
 .../ql/ddl/table/create/CreateTableAnalyzer.java   | 30 ++++---
 .../hive/ql/metadata/HiveStorageHandler.java       |  5 +-
 .../hadoop/hive/ql/parse/BaseSemanticAnalyzer.java |  8 +-
 .../hadoop/hive/ql/parse/SemanticAnalyzer.java     |  7 ++
 .../hadoop/hive/ql/session/SessionStateUtil.java   |  1 +
 22 files changed, 419 insertions(+), 83 deletions(-)

diff --git 
a/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveSchemaConverter.java
 
b/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveSchemaConverter.java
index 40ed5ee01aa..d7891fe7601 100644
--- 
a/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveSchemaConverter.java
+++ 
b/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveSchemaConverter.java
@@ -21,6 +21,8 @@
 
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
@@ -28,8 +30,12 @@
 import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.expressions.Literal;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.base.Splitter;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Conversions;
 import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Types;
 import org.slf4j.Logger;
@@ -51,29 +57,56 @@ private HiveSchemaConverter(boolean autoConvert) {
     this.id = 1;
   }
 
-  static Schema convert(List<String> names, List<TypeInfo> typeInfos, 
List<String> comments, boolean autoConvert) {
+  static Schema convert(List<String> names, List<TypeInfo> typeInfos, 
List<String> comments, boolean autoConvert,
+      Map<String, String> defaultValues) {
     HiveSchemaConverter converter = new HiveSchemaConverter(autoConvert);
-    return new Schema(converter.convertInternal(names, typeInfos, comments));
+    return new Schema(converter.convertInternal(names, typeInfos, 
defaultValues, comments));
   }
 
   static Type convert(TypeInfo typeInfo, boolean autoConvert) {
     HiveSchemaConverter converter = new HiveSchemaConverter(autoConvert);
-    return converter.convertType(typeInfo);
+    return converter.convertType(typeInfo, null);
   }
 
-  List<Types.NestedField> convertInternal(List<String> names, List<TypeInfo> 
typeInfos, List<String> comments) {
+  List<Types.NestedField> convertInternal(List<String> names, List<TypeInfo> 
typeInfos,
+      Map<String, String> defaultValues, List<String> comments) {
     List<Types.NestedField> result = 
Lists.newArrayListWithExpectedSize(names.size());
     int outerId = id + names.size();
     id = outerId;
     for (int i = 0; i < names.size(); ++i) {
-      result.add(Types.NestedField.optional(outerId - names.size() + i, 
names.get(i), convertType(typeInfos.get(i)),
-          comments.isEmpty() || i >= comments.size() ? null : 
comments.get(i)));
-    }
+      Type type = convertType(typeInfos.get(i), 
defaultValues.get(names.get(i)));
+      String columnName = names.get(i);
+      Types.NestedField.Builder fieldBuilder =
+          Types.NestedField.builder()
+              .asOptional()
+              .withId(outerId - names.size() + i)
+              .withName(columnName)
+              .ofType(type)
+              .withDoc(comments.isEmpty() || i >= comments.size() ? null : 
comments.get(i));
+
+      if (defaultValues.containsKey(columnName)) {
+        if (type.isPrimitiveType()) {
+          Object icebergDefaultValue = 
getDefaultValue(defaultValues.get(columnName), type);
+          fieldBuilder.withWriteDefault(Expressions.lit(icebergDefaultValue));
+        } else if (!type.isStructType()) {
+          throw new UnsupportedOperationException(
+              "Default values for " + columnName + " of type " + type + " are 
not supported");
+        }
+      }
 
+      result.add(fieldBuilder.build());
+    }
     return result;
   }
 
-  Type convertType(TypeInfo typeInfo) {
+  private static Object getDefaultValue(String defaultValue, Type type) {
+    return switch (type.typeId()) {
+      case DATE, TIME, TIMESTAMP, TIMESTAMP_NANO -> 
Literal.of(stripQuotes(defaultValue)).to(type).value();
+      default -> Conversions.fromPartitionString(type, 
stripQuotes(defaultValue));
+    };
+  }
+
+  Type convertType(TypeInfo typeInfo, String defaultValue) {
     switch (typeInfo.getCategory()) {
       case PRIMITIVE:
         switch (((PrimitiveTypeInfo) typeInfo).getPrimitiveCategory()) {
@@ -129,19 +162,19 @@ Type convertType(TypeInfo typeInfo) {
         StructTypeInfo structTypeInfo = (StructTypeInfo) typeInfo;
         List<Types.NestedField> fields =
             convertInternal(structTypeInfo.getAllStructFieldNames(), 
structTypeInfo.getAllStructFieldTypeInfos(),
-                    Collections.emptyList());
+                getDefaultValuesMap(defaultValue), Collections.emptyList());
         return Types.StructType.of(fields);
       case MAP:
         MapTypeInfo mapTypeInfo = (MapTypeInfo) typeInfo;
         int keyId = id++;
-        Type keyType = convertType(mapTypeInfo.getMapKeyTypeInfo());
+        Type keyType = convertType(mapTypeInfo.getMapKeyTypeInfo(), 
defaultValue);
         int valueId = id++;
-        Type valueType = convertType(mapTypeInfo.getMapValueTypeInfo());
+        Type valueType = convertType(mapTypeInfo.getMapValueTypeInfo(), 
defaultValue);
         return Types.MapType.ofOptional(keyId, valueId, keyType, valueType);
       case LIST:
         ListTypeInfo listTypeInfo = (ListTypeInfo) typeInfo;
         int listId = id++;
-        Type listType = convertType(listTypeInfo.getListElementTypeInfo());
+        Type listType = convertType(listTypeInfo.getListElementTypeInfo(), 
defaultValue);
         return Types.ListType.ofOptional(listId, listType);
       case VARIANT:
         return Types.VariantType.get();
@@ -149,4 +182,20 @@ Type convertType(TypeInfo typeInfo) {
         throw new IllegalArgumentException("Unknown type " + 
typeInfo.getCategory());
     }
   }
+
+  private static Map<String, String> getDefaultValuesMap(String defaultValue) {
+    if (StringUtils.isEmpty(defaultValue)) {
+      return Collections.emptyMap();
+    }
+    // For Struct, the default value is expected to be in key:value format
+    return 
Splitter.on(',').trimResults().withKeyValueSeparator(':').split(stripQuotes(defaultValue));
+  }
+
+  public static String stripQuotes(String val) {
+    if (val.charAt(0) == '\'' && val.charAt(val.length() - 1) == '\'' ||
+        val.charAt(0) == '"' && val.charAt(val.length() - 1) == '"') {
+      return val.substring(1, val.length() - 1);
+    }
+    return val;
+  }
 }
diff --git 
a/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveSchemaUtil.java
 
b/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveSchemaUtil.java
index 38388316652..ce563b1e55d 100644
--- 
a/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveSchemaUtil.java
+++ 
b/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveSchemaUtil.java
@@ -20,20 +20,25 @@
 package org.apache.iceberg.hive;
 
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.Set;
 import java.util.stream.Collectors;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.DateTimeUtil;
 import org.apache.iceberg.util.Pair;
 
 
@@ -60,18 +65,20 @@ public static List<FieldSchema> convert(Schema schema) {
    * @return An equivalent Iceberg Schema
    */
   public static Schema convert(List<FieldSchema> fieldSchemas) {
-    return convert(fieldSchemas, false);
+    return convert(fieldSchemas, Collections.emptyMap(), false);
   }
 
   /**
    * Converts a Hive schema (list of FieldSchema objects) to an Iceberg schema.
-   * @param fieldSchemas The list of the columns
-   * @param autoConvert If <code>true</code> then TINYINT and SMALLINT is 
converted to INTEGER and VARCHAR and CHAR is
-   *                    converted to STRING. Otherwise if these types are used 
in the Hive schema then exception is
-   *                    thrown.
+   *
+   * @param fieldSchemas  The list of the columns
+   * @param defaultValues Default values for columns, if any. The map is from 
column name to default value.
+   * @param autoConvert   If <code>true</code> then TINYINT and SMALLINT is 
converted to INTEGER and VARCHAR and CHAR is
+   *                      converted to STRING. Otherwise if these types are 
used in the Hive schema then exception is
+   *                      thrown.
    * @return An equivalent Iceberg Schema
    */
-  public static Schema convert(List<FieldSchema> fieldSchemas, boolean 
autoConvert) {
+  public static Schema convert(List<FieldSchema> fieldSchemas, Map<String, 
String> defaultValues, boolean autoConvert) {
     List<String> names = 
Lists.newArrayListWithExpectedSize(fieldSchemas.size());
     List<TypeInfo> typeInfos = 
Lists.newArrayListWithExpectedSize(fieldSchemas.size());
     List<String> comments = 
Lists.newArrayListWithExpectedSize(fieldSchemas.size());
@@ -81,7 +88,7 @@ public static Schema convert(List<FieldSchema> fieldSchemas, 
boolean autoConvert
       typeInfos.add(TypeInfoUtils.getTypeInfoFromTypeString(col.getType()));
       comments.add(col.getComment());
     }
-    return HiveSchemaConverter.convert(names, typeInfos, comments, 
autoConvert);
+    return HiveSchemaConverter.convert(names, typeInfos, comments, 
autoConvert, defaultValues);
   }
 
   /**
@@ -105,7 +112,7 @@ public static PartitionSpec spec(Schema schema, 
List<FieldSchema> fieldSchemas)
    * @return The Iceberg schema
    */
   public static Schema convert(List<String> names, List<TypeInfo> types, 
List<String> comments) {
-    return HiveSchemaConverter.convert(names, types, comments, false);
+    return HiveSchemaConverter.convert(names, types, comments, false, 
Collections.emptyMap());
   }
 
   /**
@@ -119,7 +126,7 @@ public static Schema convert(List<String> names, 
List<TypeInfo> types, List<Stri
    * @return The Iceberg schema
    */
   public static Schema convert(List<String> names, List<TypeInfo> types, 
List<String> comments, boolean autoConvert) {
-    return HiveSchemaConverter.convert(names, types, comments, autoConvert);
+    return HiveSchemaConverter.convert(names, types, comments, autoConvert, 
Collections.emptyMap());
   }
 
   /**
@@ -329,4 +336,76 @@ public static String convertToTypeString(Type type) {
         throw new UnsupportedOperationException(type + " is not supported");
     }
   }
+
+  public static void setDefaultValues(Record record, List<Types.NestedField> 
fields, Set<String> missingColumns) {
+    for (Types.NestedField field : fields) {
+      Object fieldValue = record.getField(field.name());
+
+      if (fieldValue == null) {
+        boolean isMissing = missingColumns.contains(field.name());
+
+        if (isMissing) {
+          if (field.type().isStructType()) {
+            // Create struct and apply defaults to all nested fields
+            Record nestedRecord = 
GenericRecord.create(field.type().asStructType());
+            record.setField(field.name(), nestedRecord);
+            // For nested fields, we consider ALL fields as "missing" to apply 
defaults
+            setDefaultValuesForNestedStruct(nestedRecord, 
field.type().asStructType().fields());
+          } else if (field.writeDefault() != null) {
+            Object defaultValue = convertToWriteType(field.writeDefault(), 
field.type());
+            record.setField(field.name(), defaultValue);
+          }
+        }
+        // Explicit NULLs remain NULL
+      } else if (field.type().isStructType() && fieldValue instanceof Record) {
+        // For existing structs, apply defaults to any null nested fields
+        setDefaultValuesForNestedStruct((Record) fieldValue, 
field.type().asStructType().fields());
+      }
+    }
+  }
+
+  // Special method for nested structs that always applies defaults to null 
fields
+  private static void setDefaultValuesForNestedStruct(Record record, 
List<Types.NestedField> fields) {
+    for (Types.NestedField field : fields) {
+      Object fieldValue = record.getField(field.name());
+
+      if (fieldValue == null && field.writeDefault() != null) {
+        // Always apply default to null fields in nested structs
+        Object defaultValue = convertToWriteType(field.writeDefault(), 
field.type());
+        record.setField(field.name(), defaultValue);
+      } else if (field.type().isStructType() && fieldValue instanceof Record) {
+        // Recursively process nested structs
+        setDefaultValuesForNestedStruct((Record) fieldValue, 
field.type().asStructType().fields());
+      }
+    }
+  }
+
+  public static Object convertToWriteType(Object value, Type type) {
+    if (value == null) {
+      return null;
+    }
+
+    switch (type.typeId()) {
+      case DATE:
+        // Convert days since epoch (Integer) to LocalDate
+        if (value instanceof Integer) {
+          return DateTimeUtil.dateFromDays((Integer) value);
+        }
+        break;
+      case TIMESTAMP:
+        // Convert microseconds since epoch (Long) to LocalDateTime
+        if (value instanceof Long) {
+          Types.TimestampType timestampType = (Types.TimestampType) type;
+          return timestampType.shouldAdjustToUTC() ?
+              DateTimeUtil.timestamptzFromMicros((Long) value) :
+              DateTimeUtil.timestampFromMicros((Long) value);
+        }
+        break;
+      default:
+        // For other types, no conversion needed
+        return value;
+    }
+
+    return value; // fallback
+  }
 }
diff --git 
a/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/client/HiveRESTCatalogClient.java
 
b/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/client/HiveRESTCatalogClient.java
index c1ffaa4a666..64ab42dc9b6 100644
--- 
a/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/client/HiveRESTCatalogClient.java
+++ 
b/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/client/HiveRESTCatalogClient.java
@@ -195,7 +195,7 @@ public void createTable(CreateTableRequest request) throws 
TException {
       cols.addAll(table.getPartitionKeys());
     }
     Properties catalogProperties = CatalogUtils.getCatalogProperties(table);
-    Schema schema = HiveSchemaUtil.convert(cols, true);
+    Schema schema = HiveSchemaUtil.convert(cols, Collections.emptyMap(), true);
     Map<String, String> envCtxProps = 
Optional.ofNullable(request.getEnvContext())
         .map(EnvironmentContext::getProperties)
         .orElse(Collections.emptyMap());
diff --git 
a/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/client/TestHiveRESTCatalogClient.java
 
b/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/client/TestHiveRESTCatalogClient.java
index f42e7e3775e..1ae7e742774 100644
--- 
a/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/client/TestHiveRESTCatalogClient.java
+++ 
b/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/client/TestHiveRESTCatalogClient.java
@@ -168,7 +168,7 @@ public void testCreatePartitionedTable() throws TException {
     table.setSd(new StorageDescriptor());
     table.getSd().setCols(cols);
 
-    Schema schema = HiveSchemaUtil.convert(cols, false);
+    Schema schema = HiveSchemaUtil.convert(cols, Collections.emptyMap(), 
false);
     PartitionSpec spec = 
PartitionSpec.builderFor(schema).identity("city").build();
     String specString = PartitionSpecParser.toJson(spec);
 
diff --git 
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/BaseHiveIcebergMetaHook.java
 
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/BaseHiveIcebergMetaHook.java
index e160deebf41..58fd7a7bd58 100644
--- 
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/BaseHiveIcebergMetaHook.java
+++ 
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/BaseHiveIcebergMetaHook.java
@@ -21,6 +21,7 @@
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -28,12 +29,14 @@
 import java.util.Properties;
 import java.util.Set;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.metastore.HiveMetaHook;
 import org.apache.hadoop.hive.metastore.api.CreateTableRequest;
 import org.apache.hadoop.hive.metastore.api.EnvironmentContext;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.SQLDefaultConstraint;
 import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.ql.ddl.misc.sortoder.SortFieldDesc;
@@ -161,7 +164,7 @@ public void preCreateTable(CreateTableRequest request) {
             
primaryKeys.stream().map(SQLPrimaryKey::getColumn_name).collect(Collectors.toSet()))
         .orElse(Collections.emptySet());
 
-    Schema schema = schema(catalogProperties, hmsTable, identifierFields);
+    Schema schema = schema(catalogProperties, hmsTable, identifierFields, 
request.getDefaultConstraints());
     PartitionSpec spec = spec(conf, schema, hmsTable);
 
     // If there are partition keys specified remove them from the HMS table 
and add them to the column list
@@ -303,7 +306,10 @@ protected void 
setCommonHmsTablePropertiesForIceberg(org.apache.hadoop.hive.meta
   }
 
   protected Schema schema(Properties properties, 
org.apache.hadoop.hive.metastore.api.Table hmsTable,
-                        Set<String> identifierFields) {
+      Set<String> identifierFields, List<SQLDefaultConstraint> 
sqlDefaultConstraints) {
+
+    Map<String, String> defaultValues = 
Stream.ofNullable(sqlDefaultConstraints).flatMap(Collection::stream)
+        .collect(Collectors.toMap(SQLDefaultConstraint::getColumn_name, 
SQLDefaultConstraint::getDefault_value));
     boolean autoConversion = 
conf.getBoolean(InputFormatConfig.SCHEMA_AUTO_CONVERSION, false);
 
     if (properties.getProperty(InputFormatConfig.TABLE_SCHEMA) != null) {
@@ -313,7 +319,7 @@ protected Schema schema(Properties properties, 
org.apache.hadoop.hive.metastore.
     if (hmsTable.isSetPartitionKeys() && 
!hmsTable.getPartitionKeys().isEmpty()) {
       cols.addAll(hmsTable.getPartitionKeys());
     }
-    Schema schema = HiveSchemaUtil.convert(cols, autoConversion);
+    Schema schema = HiveSchemaUtil.convert(cols, defaultValues, 
autoConversion);
 
     return getSchemaWithIdentifierFields(schema, identifierFields);
   }
diff --git 
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java
 
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java
index aec7ab7b03b..6c1a696f5e2 100644
--- 
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java
+++ 
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java
@@ -310,7 +310,8 @@ private void 
doPreAlterTable(org.apache.hadoop.hive.metastore.api.Table hmsTable
       preAlterTableProperties = new PreAlterTableProperties();
       preAlterTableProperties.tableLocation = sd.getLocation();
       preAlterTableProperties.format = sd.getInputFormat();
-      preAlterTableProperties.schema = schema(catalogProperties, hmsTable, 
Collections.emptySet());
+      preAlterTableProperties.schema =
+          schema(catalogProperties, hmsTable, Collections.emptySet(), 
Collections.emptyList());
       preAlterTableProperties.partitionKeys = hmsTable.getPartitionKeys();
 
       context.getProperties().put(HiveMetaHook.ALLOW_PARTITION_KEY_CHANGE, 
"true");
diff --git 
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
 
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
index 644e201c285..6dc92ff411e 100644
--- 
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
+++ 
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
@@ -28,6 +28,7 @@
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.ListIterator;
 import java.util.Map;
@@ -303,6 +304,9 @@ public void configureOutputJobProperties(TableDesc 
tableDesc, Map<String, String
     // table-level and skipped only for output tables in HiveIcebergSerde. 
Properties from the map will be present in
     // the serde config for all tables in the query, not just the output 
tables, so we can't rely on that in the serde.
     tableDesc.getProperties().put(InputFormatConfig.OPERATION_TYPE_PREFIX + 
tableDesc.getTableName(), opType);
+    SessionStateUtil.getResource(conf, SessionStateUtil.MISSING_COLUMNS)
+        .ifPresent(cols -> map.put(SessionStateUtil.MISSING_COLUMNS, 
String.join(",", (HashSet<String>) cols)));
+
   }
 
   /**
@@ -2206,6 +2210,12 @@ public boolean 
hasUndergonePartitionEvolution(org.apache.hadoop.hive.ql.metadata
     return IcebergTableUtil.hasUndergonePartitionEvolution(table);
   }
 
+  @Override
+  public boolean supportsDefaultColumnValues(Map<String, String> tblProps) {
+    return IcebergTableUtil.formatVersion(tblProps) >= 3;
+  }
+
+
   private static List<FieldSchema> schema(List<VirtualColumn> exprs) {
     return exprs.stream().map(v ->
         new FieldSchema(v.getName(), v.getTypeInfo().getTypeName(), ""))
diff --git 
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveIcebergRecordWriter.java
 
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveIcebergRecordWriter.java
index 0f1989a3ac0..9adfd15dc20 100644
--- 
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveIcebergRecordWriter.java
+++ 
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveIcebergRecordWriter.java
@@ -21,33 +21,44 @@
 
 import java.io.IOException;
 import java.util.List;
+import java.util.Set;
 import org.apache.hadoop.io.Writable;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.data.Record;
+import org.apache.iceberg.hive.HiveSchemaUtil;
 import org.apache.iceberg.io.DataWriteResult;
 import org.apache.iceberg.io.OutputFileFactory;
 import org.apache.iceberg.mr.hive.FilesForCommit;
 import org.apache.iceberg.mr.hive.writer.WriterBuilder.Context;
 import org.apache.iceberg.mr.mapred.Container;
+import org.apache.iceberg.types.Types;
 
 class HiveIcebergRecordWriter extends HiveIcebergWriterBase {
 
   private final int currentSpecId;
+  private final Set<String> missingColumns;
+  private final List<Types.NestedField> missingOrStructFields;
 
   HiveIcebergRecordWriter(Table table, HiveFileWriterFactory fileWriterFactory,
       OutputFileFactory dataFileFactory, Context context) {
     super(table, newDataWriter(table, fileWriterFactory, dataFileFactory, 
context));
 
     this.currentSpecId = table.spec().specId();
+    this.missingColumns = context.missingColumns();
+    this.missingOrStructFields = 
specs.get(currentSpecId).schema().asStruct().fields().stream()
+        .filter(field -> missingColumns.contains(field.name()) || 
field.type().isStructType()).toList();
   }
 
   @Override
   public void write(Writable row) throws IOException {
     Record record = ((Container<Record>) row).get();
+    HiveSchemaUtil.setDefaultValues(record, missingOrStructFields, 
missingColumns);
+
     writer.write(record, specs.get(currentSpecId), partition(record, 
currentSpecId));
   }
 
+
   @Override
   public FilesForCommit files() {
     List<DataFile> dataFiles = ((DataWriteResult) writer.result()).dataFiles();
diff --git 
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/WriterBuilder.java
 
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/WriterBuilder.java
index 5c3851f0412..807dad41328 100644
--- 
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/WriterBuilder.java
+++ 
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/WriterBuilder.java
@@ -21,14 +21,20 @@
 
 import java.io.IOException;
 import java.io.UncheckedIOException;
+import java.util.Arrays;
+import java.util.HashSet;
 import java.util.Locale;
 import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Supplier;
 import java.util.function.UnaryOperator;
+import java.util.stream.Collectors;
 import org.apache.commons.lang3.ObjectUtils;
 import org.apache.hadoop.hive.ql.Context.Operation;
 import 
org.apache.hadoop.hive.ql.security.authorization.HiveCustomStorageHandlerUtils;
+import org.apache.hadoop.hive.ql.session.SessionStateUtil;
 import org.apache.hadoop.mapred.TaskAttemptID;
 import org.apache.iceberg.BatchScan;
 import org.apache.iceberg.DeleteFile;
@@ -47,6 +53,7 @@
 import org.apache.iceberg.mr.InputFormatConfig;
 import org.apache.iceberg.mr.hive.IcebergTableUtil;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.util.ContentFileUtil;
 import org.apache.iceberg.util.DeleteFileSet;
 import org.apache.iceberg.util.PropertyUtil;
@@ -214,6 +221,7 @@ static class Context {
     private final boolean isMergeTask;
     private final boolean skipRowData;
     private final boolean useDVs;
+    private final Set<String> missingColumns;
 
     Context(Map<String, String> properties, UnaryOperator<String> ops, String 
tableName) {
       String dataFileFormatName =
@@ -239,6 +247,10 @@ static class Context {
       this.skipRowData = useDVs ||
           PropertyUtil.propertyAsBoolean(properties,
             ICEBERG_DELETE_SKIPROWDATA, ICEBERG_DELETE_SKIPROWDATA_DEFAULT);
+
+      this.missingColumns = 
Optional.ofNullable(ops.apply(SessionStateUtil.MISSING_COLUMNS))
+          .map(columns -> 
Arrays.stream(columns.split(",")).collect(Collectors.toCollection(HashSet::new)))
+          .orElse(Sets.newHashSet());
     }
 
     FileFormat dataFileFormat() {
@@ -280,5 +292,9 @@ boolean skipRowData() {
     public boolean useDVs() {
       return useDVs;
     }
+
+    public Set<String> missingColumns() {
+      return missingColumns;
+    }
   }
 }
diff --git 
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInternalRecordWrapper.java
 
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInternalRecordWrapper.java
index bf03c917383..a34ad168f18 100644
--- 
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInternalRecordWrapper.java
+++ 
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInternalRecordWrapper.java
@@ -27,11 +27,11 @@
 import java.util.stream.Collectors;
 import org.apache.iceberg.StructLike;
 import org.apache.iceberg.data.Record;
+import org.apache.iceberg.hive.HiveSchemaUtil;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.types.Types.StructType;
-import org.apache.iceberg.util.DateTimeUtil;
 
 public class IcebergInternalRecordWrapper implements Record, StructLike {
 
@@ -137,11 +137,9 @@ private Map<String, Integer> 
buildFieldPositionMap(StructType schema) {
   private static Function<Object, Object> converter(Type type) {
     switch (type.typeId()) {
       case TIMESTAMP:
-        return timestamp -> ((Types.TimestampType) type).shouldAdjustToUTC() ?
-            DateTimeUtil.timestamptzFromMicros((Long) timestamp) :
-            DateTimeUtil.timestampFromMicros((Long) timestamp);
+        return timestamp -> HiveSchemaUtil.convertToWriteType(timestamp, type);
       case DATE:
-        return date -> DateTimeUtil.dateFromDays((Integer) date);
+        return date -> HiveSchemaUtil.convertToWriteType(date, type);
       case STRUCT:
         IcebergInternalRecordWrapper wrapper =
             new IcebergInternalRecordWrapper(type.asStructType());
diff --git 
a/iceberg/iceberg-handler/src/test/queries/negative/iceberg_default_column_type_mismatch.q
 
b/iceberg/iceberg-handler/src/test/queries/negative/iceberg_default_column_type_mismatch.q
new file mode 100644
index 00000000000..24ec3e5d5a5
--- /dev/null
+++ 
b/iceberg/iceberg-handler/src/test/queries/negative/iceberg_default_column_type_mismatch.q
@@ -0,0 +1,6 @@
+CREATE TABLE t3 (
+  id INT,
+  age INT DEFAULT 'twenty five'
+)
+STORED BY ICEBERG
+TBLPROPERTIES ('format-version'='3');
\ No newline at end of file
diff --git 
a/iceberg/iceberg-handler/src/test/queries/positive/iceberg_default_column.q 
b/iceberg/iceberg-handler/src/test/queries/positive/iceberg_default_column.q
new file mode 100644
index 00000000000..66e1b87d559
--- /dev/null
+++ b/iceberg/iceberg-handler/src/test/queries/positive/iceberg_default_column.q
@@ -0,0 +1,31 @@
+CREATE TABLE t3 (
+  id INT,
+  point STRUCT<x:INT, y:INT> DEFAULT 'x:100,y:99',
+  name STRING DEFAULT 'unknown',
+  age INT DEFAULT 25,
+  salary DOUBLE DEFAULT 50000.0,
+  is_active BOOLEAN DEFAULT TRUE,
+  created_date DATE DEFAULT '2024-01-01',
+  created_ts TIMESTAMP DEFAULT '2024-01-01T10:00:00',
+  score DECIMAL(5,2) DEFAULT 100.00,
+  category STRING DEFAULT 'general'
+)
+STORED BY ICEBERG
+TBLPROPERTIES ('format-version'='3');
+
+-- Case 1: Partial struct with explicit null field
+INSERT INTO t3 (id, point) VALUES (2, named_struct('x', CAST(null AS INT), 
'y', 7));
+
+-- Case 2: Only ID specified (all defaults should apply)
+INSERT INTO t3 (id) VALUES (3);
+
+-- Case 3: Explicit NULL for a primitive field (should remain NULL, not get 
default)
+INSERT INTO t3 (id, name) VALUES (4, NULL);
+
+-- Case 4: Mixed scenario - some fields provided, some missing
+INSERT INTO t3 (id, name, age) VALUES (5, 'custom_name', 30);
+
+-- Case 5: Complex struct with nested nulls
+INSERT INTO t3 (id, point) VALUES (6, named_struct('x', CAST(null AS INT), 
'y', CAST(null AS INT)));
+
+SELECT * FROM t3 ORDER BY id;
\ No newline at end of file
diff --git 
a/iceberg/iceberg-handler/src/test/results/negative/iceberg_default_column_type_mismatch.q.out
 
b/iceberg/iceberg-handler/src/test/results/negative/iceberg_default_column_type_mismatch.q.out
new file mode 100644
index 00000000000..7655755e1a9
--- /dev/null
+++ 
b/iceberg/iceberg-handler/src/test/results/negative/iceberg_default_column_type_mismatch.q.out
@@ -0,0 +1,10 @@
+PREHOOK: query: CREATE TABLE t3 (
+  id INT,
+  age INT DEFAULT 'twenty five'
+)
+STORED BY ICEBERG
+TBLPROPERTIES ('format-version'='3')
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@t3
+FAILED: Execution Error, return code 40000 from 
org.apache.hadoop.hive.ql.ddl.DDLTask. java.lang.NumberFormatException: For 
input string: "twenty five"
diff --git 
a/iceberg/iceberg-handler/src/test/results/positive/iceberg_default_column.q.out
 
b/iceberg/iceberg-handler/src/test/results/positive/iceberg_default_column.q.out
new file mode 100644
index 00000000000..856ddf4f823
--- /dev/null
+++ 
b/iceberg/iceberg-handler/src/test/results/positive/iceberg_default_column.q.out
@@ -0,0 +1,87 @@
+PREHOOK: query: CREATE TABLE t3 (
+  id INT,
+  point STRUCT<x:INT, y:INT> DEFAULT 'x:100,y:99',
+  name STRING DEFAULT 'unknown',
+  age INT DEFAULT 25,
+  salary DOUBLE DEFAULT 50000.0,
+  is_active BOOLEAN DEFAULT TRUE,
+  created_date DATE DEFAULT '2024-01-01',
+  created_ts TIMESTAMP DEFAULT '2024-01-01T10:00:00',
+  score DECIMAL(5,2) DEFAULT 100.00,
+  category STRING DEFAULT 'general'
+)
+STORED BY ICEBERG
+TBLPROPERTIES ('format-version'='3')
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@t3
+POSTHOOK: query: CREATE TABLE t3 (
+  id INT,
+  point STRUCT<x:INT, y:INT> DEFAULT 'x:100,y:99',
+  name STRING DEFAULT 'unknown',
+  age INT DEFAULT 25,
+  salary DOUBLE DEFAULT 50000.0,
+  is_active BOOLEAN DEFAULT TRUE,
+  created_date DATE DEFAULT '2024-01-01',
+  created_ts TIMESTAMP DEFAULT '2024-01-01T10:00:00',
+  score DECIMAL(5,2) DEFAULT 100.00,
+  category STRING DEFAULT 'general'
+)
+STORED BY ICEBERG
+TBLPROPERTIES ('format-version'='3')
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@t3
+PREHOOK: query: INSERT INTO t3 (id, point) VALUES (2, named_struct('x', 
CAST(null AS INT), 'y', 7))
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@t3
+POSTHOOK: query: INSERT INTO t3 (id, point) VALUES (2, named_struct('x', 
CAST(null AS INT), 'y', 7))
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@t3
+PREHOOK: query: INSERT INTO t3 (id) VALUES (3)
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@t3
+POSTHOOK: query: INSERT INTO t3 (id) VALUES (3)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@t3
+PREHOOK: query: INSERT INTO t3 (id, name) VALUES (4, NULL)
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@t3
+POSTHOOK: query: INSERT INTO t3 (id, name) VALUES (4, NULL)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@t3
+PREHOOK: query: INSERT INTO t3 (id, name, age) VALUES (5, 'custom_name', 30)
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@t3
+POSTHOOK: query: INSERT INTO t3 (id, name, age) VALUES (5, 'custom_name', 30)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@t3
+PREHOOK: query: INSERT INTO t3 (id, point) VALUES (6, named_struct('x', 
CAST(null AS INT), 'y', CAST(null AS INT)))
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@t3
+POSTHOOK: query: INSERT INTO t3 (id, point) VALUES (6, named_struct('x', 
CAST(null AS INT), 'y', CAST(null AS INT)))
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@t3
+PREHOOK: query: SELECT * FROM t3 ORDER BY id
+PREHOOK: type: QUERY
+PREHOOK: Input: default@t3
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: SELECT * FROM t3 ORDER BY id
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@t3
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+2      {"x":100,"y":7} unknown 25      50000.0 true    2024-01-01      
2024-01-01 10:00:00     100.00  general
+3      {"x":100,"y":99}        unknown 25      50000.0 true    2024-01-01      
2024-01-01 10:00:00     100.00  general
+4      {"x":100,"y":99}        NULL    25      50000.0 true    2024-01-01      
2024-01-01 10:00:00     100.00  general
+5      {"x":100,"y":99}        custom_name     30      50000.0 true    
2024-01-01      2024-01-01 10:00:00     100.00  general
+6      {"x":100,"y":99}        unknown 25      50000.0 true    2024-01-01      
2024-01-01 10:00:00     100.00  general
diff --git 
a/itests/hive-iceberg/src/test/java/org/apache/hive/TestHiveRESTCatalogClientIT.java
 
b/itests/hive-iceberg/src/test/java/org/apache/hive/TestHiveRESTCatalogClientIT.java
index b6386e0ec42..db5329c6431 100644
--- 
a/itests/hive-iceberg/src/test/java/org/apache/hive/TestHiveRESTCatalogClientIT.java
+++ 
b/itests/hive-iceberg/src/test/java/org/apache/hive/TestHiveRESTCatalogClientIT.java
@@ -37,9 +37,7 @@
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler;
 import org.apache.hadoop.hive.ql.metadata.HiveUtils;
-import org.apache.hadoop.hive.ql.stats.StatsUtils;
 import org.apache.hadoop.mapred.TextInputFormat;
-import org.apache.hadoop.util.StringUtils;
 import org.apache.iceberg.CatalogUtil;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.PartitionSpecParser;
@@ -54,6 +52,8 @@
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInstance;
 import org.junit.jupiter.api.extension.RegisterExtension;
+
+import java.util.Collections;
 import java.util.Map;
 
 import java.util.Arrays;
@@ -214,7 +214,7 @@ private static Table 
createPartitionedTable(IMetaStoreClient db, String catName,
     sd.getSerdeInfo().setParameters(new java.util.HashMap<>());
     table.setSd(sd);
     
-    Schema schema = HiveSchemaUtil.convert(cols, false);
+    Schema schema = HiveSchemaUtil.convert(cols, Collections.emptyMap(), 
false);
     PartitionSpec spec = 
PartitionSpec.builderFor(schema).identity("city").build();
     String specString = PartitionSpecParser.toJson(spec);
     table.setParameters(new java.util.HashMap<>());
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/column/change/AlterTableChangeColumnAnalyzer.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/column/change/AlterTableChangeColumnAnalyzer.java
index a8180758bb7..cf017895f44 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/column/change/AlterTableChangeColumnAnalyzer.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/column/change/AlterTableChangeColumnAnalyzer.java
@@ -129,8 +129,9 @@ private Constraints getConstraints(TableName tableName, 
ASTNode command, String
         break;
       case HiveParser.TOK_DEFAULT_VALUE:
         defaultConstraints = new ArrayList<>();
-        ConstraintsUtils.processDefaultConstraints(tableName, constraintChild, 
ImmutableList.of(newColumnName),
-            defaultConstraints, (ASTNode) command.getChild(2), 
this.ctx.getTokenRewriteStream());
+        ConstraintsUtils.constraintInfosToDefaultConstraints(tableName,
+            ConstraintsUtils.processDefaultConstraints(constraintChild, 
ImmutableList.of(newColumnName),
+                (ASTNode) command.getChild(2), 
this.ctx.getTokenRewriteStream()), defaultConstraints, false);
         break;
       case HiveParser.TOK_NOT_NULL:
         notNullConstraints = new ArrayList<>();
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/constraint/ConstraintsUtils.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/constraint/ConstraintsUtils.java
index a405d920cdb..0d6950b7f62 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/constraint/ConstraintsUtils.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/constraint/ConstraintsUtils.java
@@ -54,6 +54,7 @@
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
 
 import com.google.common.collect.ImmutableList;
+import org.apache.commons.lang3.tuple.Pair;
 
 /**
  * Utilities for constraints.
@@ -66,22 +67,26 @@ private ConstraintsUtils() {
     throw new UnsupportedOperationException("ConstraintsUtils should not be 
instantiated!");
   }
 
-  private static class ConstraintInfo {
+  public static class ConstraintInfo {
     final String colName;
     final String constraintName;
     final boolean enable;
     final boolean validate;
     final boolean rely;
     final String defaultValue;
+    final TypeInfo columnType;
+    final TypeInfo defaultValueType;
 
-    ConstraintInfo(String colName, String constraintName, boolean enable, 
boolean validate, boolean rely,
-        String defaultValue) {
+    ConstraintInfo(String colName, TypeInfo columnType, String constraintName, 
boolean enable, boolean validate,
+        boolean rely, String defaultValue, TypeInfo defaultValueType) {
       this.colName = colName;
       this.constraintName = constraintName;
       this.enable = enable;
       this.validate = validate;
       this.rely = rely;
       this.defaultValue = defaultValue;
+      this.columnType = columnType;
+      this.defaultValueType = defaultValueType;
     }
   }
 
@@ -146,19 +151,21 @@ private static void 
constraintInfosToCheckConstraints(TableName tableName, List<
     }
   }
 
-  public static void processDefaultConstraints(TableName tableName, ASTNode 
child, List<String> columnNames,
-      List<SQLDefaultConstraint> defaultConstraints, final ASTNode typeChild, 
TokenRewriteStream tokenRewriteStream)
-          throws SemanticException {
-    List<ConstraintInfo> defaultInfos = generateConstraintInfos(child, 
columnNames, typeChild, tokenRewriteStream);
-    constraintInfosToDefaultConstraints(tableName, defaultInfos, 
defaultConstraints);
+  public static List<ConstraintInfo> processDefaultConstraints(ASTNode child,
+      List<String> columnNames, final ASTNode typeChild, TokenRewriteStream 
tokenRewriteStream)
+      throws SemanticException {
+    return generateConstraintInfos(child, columnNames, typeChild, 
tokenRewriteStream);
   }
 
-  private static void constraintInfosToDefaultConstraints(TableName tableName, 
List<ConstraintInfo> defaultInfos,
-      List<SQLDefaultConstraint> defaultConstraints) {
+  public static void constraintInfosToDefaultConstraints(TableName tableName, 
List<ConstraintInfo> defaultInfos,
+      List<SQLDefaultConstraint> defaultConstraints, boolean 
isNativeColumnDefaultSupported) throws SemanticException {
     for (ConstraintInfo defaultInfo : defaultInfos) {
-      defaultConstraints.add(new SQLDefaultConstraint(tableName.getCat(), 
tableName.getDb(), tableName.getTable(),
-          defaultInfo.colName, defaultInfo.defaultValue, 
defaultInfo.constraintName, defaultInfo.enable,
-          defaultInfo.validate, defaultInfo.rely));
+      ConstraintsUtils.validateDefaultValueType(defaultInfo.columnType, 
defaultInfo.defaultValueType,
+          defaultInfo.defaultValue, isNativeColumnDefaultSupported);
+      defaultConstraints.add(
+          new SQLDefaultConstraint(tableName.getCat(), tableName.getDb(), 
tableName.getTable(), defaultInfo.colName,
+              defaultInfo.defaultValue, defaultInfo.constraintName, 
defaultInfo.enable, defaultInfo.validate,
+              defaultInfo.rely));
     }
   }
 
@@ -215,6 +222,8 @@ private static List<ConstraintInfo> 
generateConstraintInfos(ASTNode child, List<
     boolean enable = true;
     boolean validate = false;
     boolean rely = true;
+    TypeInfo colTypeInfo = null;
+    TypeInfo defaultValueType = null;
     String checkOrDefaultValue = null;
     int childType = child.getToken().getType();
     for (int i = 0; i < child.getChildCount(); i++) {
@@ -243,7 +252,10 @@ private static List<ConstraintInfo> 
generateConstraintInfos(ASTNode child, List<
         rely = false;
       } else if (childType == HiveParser.TOK_DEFAULT_VALUE) {
         // try to get default value only if this is DEFAULT constraint
-        checkOrDefaultValue = getDefaultValue(grandChild, typeChildForDefault, 
tokenRewriteStream);
+        colTypeInfo = 
TypeInfoUtils.getTypeInfoFromTypeString(BaseSemanticAnalyzer.getTypeStringFromAST(typeChildForDefault));
+        Pair<TypeInfo, String> defaultValueTypeAndValue = 
getDefaultValueAndType(grandChild, tokenRewriteStream);
+        defaultValueType = defaultValueTypeAndValue.getKey();
+        checkOrDefaultValue = defaultValueTypeAndValue.getValue();
       } else if (childType == HiveParser.TOK_CHECK_CONSTRAINT) {
         checkOrDefaultValue = HiveUtils.getSqlTextWithQuotedIdentifiers(
                 grandChild, tokenRewriteStream, CHECK_CONSTRAINT_PROGRAM);
@@ -275,11 +287,14 @@ private static List<ConstraintInfo> 
generateConstraintInfos(ASTNode child, List<
 
     List<ConstraintInfo> constraintInfos = new ArrayList<>();
     if (columnNames == null) {
-      constraintInfos.add(new ConstraintInfo(null, constraintName, enable, 
validate, rely, checkOrDefaultValue));
+      constraintInfos.add(
+          new ConstraintInfo(null, colTypeInfo, constraintName, enable, 
validate, rely, checkOrDefaultValue,
+              defaultValueType));
     } else {
       for (String columnName : columnNames) {
-        constraintInfos.add(new ConstraintInfo(columnName, constraintName, 
enable, validate, rely,
-            checkOrDefaultValue));
+        constraintInfos.add(
+            new ConstraintInfo(columnName, colTypeInfo, constraintName, 
enable, validate, rely, checkOrDefaultValue,
+                defaultValueType));
       }
     }
 
@@ -290,10 +305,11 @@ private static List<ConstraintInfo> 
generateConstraintInfos(ASTNode child, List<
 
   /**
    * Validate and get the default value from the AST.
+   *
    * @param node AST node corresponding to default value
    * @return retrieve the default value and return it as string
    */
-  private static String getDefaultValue(ASTNode node, ASTNode typeChild, 
TokenRewriteStream tokenStream)
+  private static Pair<TypeInfo, String> getDefaultValueAndType(ASTNode node, 
TokenRewriteStream tokenStream)
       throws SemanticException{
     // first create expression from defaultValueAST
     TypeCheckCtx typeCheckCtx = new TypeCheckCtx(null);
@@ -311,24 +327,22 @@ private static String getDefaultValue(ASTNode node, 
ASTNode typeChild, TokenRewr
           " .Maximum character length allowed is " + DEFAULT_MAX_LEN +" ."));
     }
 
-    // Make sure the default value expression type is exactly same as column's 
type.
-    TypeInfo defaultValTypeInfo = defaultValExpr.getTypeInfo();
-    TypeInfo colTypeInfo =
-        
TypeInfoUtils.getTypeInfoFromTypeString(BaseSemanticAnalyzer.getTypeStringFromAST(typeChild));
-    if (!defaultValTypeInfo.equals(colTypeInfo)) {
-      throw new SemanticException(ErrorMsg.INVALID_CSTR_SYNTAX.getMsg("Invalid 
type: " +
-          defaultValTypeInfo.getTypeName() + " for default value: " + 
defaultValueText + ". Please make sure that " +
-          "the type is compatible with column type: " + 
colTypeInfo.getTypeName()));
-    }
-
     // throw an error if default value isn't what hive allows
     if (!isDefaultValueAllowed(defaultValExpr)) {
       throw new SemanticException(ErrorMsg.INVALID_CSTR_SYNTAX.getMsg("Invalid 
Default value: " + defaultValueText +
           ". DEFAULT only allows constant or function expressions"));
     }
-    return defaultValueText;
+    return Pair.of(defaultValExpr.getTypeInfo(), defaultValueText);
   }
 
+  public static void validateDefaultValueType(TypeInfo colTypeInfo, TypeInfo 
defaultValTypeInfo,
+      String defaultValueText, boolean isNativeColumnDefaultSupported) throws 
SemanticException {
+    if (!defaultValTypeInfo.equals(colTypeInfo) && 
!isNativeColumnDefaultSupported) {
+      throw new SemanticException(ErrorMsg.INVALID_CSTR_SYNTAX.getMsg(
+          "Invalid type: " + defaultValTypeInfo.getTypeName() + " for default 
value: " + defaultValueText
+              + ". Please make sure that " + "the type is compatible with 
column type: " + colTypeInfo.getTypeName()));
+    }
+  }
 
   private static boolean isDefaultValueAllowed(ExprNodeDesc defaultValExpr) {
     while (FunctionRegistry.isOpCast(defaultValExpr)) {
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/create/CreateTableAnalyzer.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/create/CreateTableAnalyzer.java
index b4c7f4bec38..37858b8af0c 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/create/CreateTableAnalyzer.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/create/CreateTableAnalyzer.java
@@ -74,7 +74,6 @@
 import org.apache.hadoop.hive.ql.parse.QB;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.parse.StorageFormat;
-import org.apache.hadoop.hive.ql.parse.TableMask;
 import org.apache.hadoop.hive.ql.plan.HiveOperation;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.session.SessionStateUtil;
@@ -337,6 +336,7 @@ ASTNode analyzeCreateTable(ASTNode ast, QB qb, 
PlannerContext plannerCtx)
     List<SQLUniqueConstraint> uniqueConstraints = new ArrayList<>();
     List<SQLNotNullConstraint> notNullConstraints = new ArrayList<>();
     List<SQLDefaultConstraint> defaultConstraints = new ArrayList<>();
+    List<ConstraintsUtils.ConstraintInfo> defaultConstraintInfo = new 
ArrayList<>();
     List<SQLCheckConstraint> checkConstraints = new ArrayList<>();
     List<Order> sortCols = new ArrayList<Order>();
     int numBuckets = -1;
@@ -461,14 +461,14 @@ ASTNode analyzeCreateTable(ASTNode ast, QB qb, 
PlannerContext plannerCtx)
           break;
         case HiveParser.TOK_TABCOLLIST:
           cols = getColumns(child, true, ctx.getTokenRewriteStream(), 
primaryKeys, foreignKeys, uniqueConstraints,
-              notNullConstraints, defaultConstraints, checkConstraints, conf);
+              notNullConstraints, defaultConstraintInfo, checkConstraints, 
conf);
           break;
         case HiveParser.TOK_TABLECOMMENT:
           comment = unescapeSQLString(child.getChild(0).getText());
           break;
         case HiveParser.TOK_TABLEPARTCOLS:
           partCols = getColumns(child, false, ctx.getTokenRewriteStream(), 
primaryKeys, foreignKeys, uniqueConstraints,
-              notNullConstraints, defaultConstraints, checkConstraints, conf);
+              notNullConstraints, defaultConstraintInfo, checkConstraints, 
conf);
           if (hasConstraints(partCols, defaultConstraints, notNullConstraints, 
checkConstraints)) {
             //TODO: these constraints should be supported for partition columns
             throw new SemanticException(ErrorMsg.INVALID_CSTR_SYNTAX.getMsg(
@@ -544,15 +544,6 @@ ASTNode analyzeCreateTable(ASTNode ast, QB qb, 
PlannerContext plannerCtx)
       throw new SemanticException("Unrecognized command.");
     }
 
-    if (isExt && 
ConstraintsUtils.hasEnabledOrValidatedConstraints(notNullConstraints, 
defaultConstraints,
-        checkConstraints)) {
-      throw new SemanticException(ErrorMsg.INVALID_CSTR_SYNTAX.getMsg(
-          "Constraints are disallowed with External tables. " + "Only RELY is 
allowed."));
-    }
-    if (checkConstraints != null && !checkConstraints.isEmpty()) {
-      ConstraintsUtils.validateCheckConstraint(cols, checkConstraints, 
ctx.getConf());
-    }
-
     storageFormat.fillDefaultStorageFormat(isExt, false);
 
     // check for existence of table
@@ -648,15 +639,30 @@ ASTNode analyzeCreateTable(ASTNode ast, QB qb, 
PlannerContext plannerCtx)
         } else {
           tblLocation = getDefaultLocation(qualifiedTabName.getDb(), 
qualifiedTabName.getTable(), isExt);
         }
+        boolean isNativeColumnDefaultSupported = false;
         try {
           HiveStorageHandler storageHandler = 
HiveUtils.getStorageHandler(conf, storageFormat.getStorageHandler());
           if (storageHandler != null) {
             storageHandler.addResourcesForCreateTable(tblProps, conf);
+            isNativeColumnDefaultSupported = 
storageHandler.supportsDefaultColumnValues(tblProps);
           }
         } catch (HiveException e) {
           throw new RuntimeException(e);
         }
+
+        ConstraintsUtils.constraintInfosToDefaultConstraints(qualifiedTabName, 
defaultConstraintInfo,
+            crtTblDesc.getDefaultConstraints(), 
isNativeColumnDefaultSupported);
         SessionStateUtil.addResourceOrThrow(conf, META_TABLE_LOCATION, 
tblLocation);
+
+        if (isExt && 
ConstraintsUtils.hasEnabledOrValidatedConstraints(notNullConstraints, 
crtTblDesc.getDefaultConstraints(),
+            checkConstraints)) {
+          throw new SemanticException(ErrorMsg.INVALID_CSTR_SYNTAX.getMsg(
+              "Constraints are disallowed with External tables. " + "Only RELY 
is allowed."));
+        }
+
+        if (checkConstraints != null && !checkConstraints.isEmpty()) {
+          ConstraintsUtils.validateCheckConstraint(cols, checkConstraints, 
ctx.getConf());
+        }
         break;
       case CTT: // CREATE TRANSACTIONAL TABLE
         if (isExt && !isDefaultTableTypeChanged) {
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java 
b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java
index dda0873b6a2..39358ca3d59 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java
@@ -76,7 +76,6 @@
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.OutputCommitter;
 import org.apache.hadoop.mapred.OutputFormat;
-import org.apache.hadoop.mapred.TaskAttemptContext;
 
 import java.util.HashMap;
 import java.util.List;
@@ -1020,4 +1019,8 @@ default MergeTaskProperties 
getMergeTaskProperties(Properties properties) {
   default void setMergeTaskDeleteProperties(TableDesc tableDesc) {
     throw new UnsupportedOperationException("Storage handler does not support 
getting custom delete merge schema.");
   }
+
+  default boolean supportsDefaultColumnValues(Map<String, String> tblProps) {
+    return false;
+  }
 }
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java
index a23502b34bd..f0850a27be6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java
@@ -53,7 +53,6 @@
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.Order;
 import org.apache.hadoop.hive.metastore.api.SQLCheckConstraint;
-import org.apache.hadoop.hive.metastore.api.SQLDefaultConstraint;
 import org.apache.hadoop.hive.metastore.api.SQLForeignKey;
 import org.apache.hadoop.hive.metastore.api.SQLNotNullConstraint;
 import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey;
@@ -866,7 +865,7 @@ public static List<FieldSchema> getColumns(
       ASTNode ast, boolean lowerCase, TokenRewriteStream tokenRewriteStream,
       List<SQLPrimaryKey> primaryKeys, List<SQLForeignKey> foreignKeys,
       List<SQLUniqueConstraint> uniqueConstraints, List<SQLNotNullConstraint> 
notNullConstraints,
-      List<SQLDefaultConstraint> defaultConstraints, List<SQLCheckConstraint> 
checkConstraints,
+      List<ConstraintsUtils.ConstraintInfo> defaultConstraints, 
List<SQLCheckConstraint> checkConstraints,
       Configuration conf) throws SemanticException {
     List<FieldSchema> colList = new ArrayList<>();
     Tree parent = ast.getParent();
@@ -943,8 +942,9 @@ public static List<FieldSchema> getColumns(
                   checkConstraints, typeChild, tokenRewriteStream);
               break;
             case HiveParser.TOK_DEFAULT_VALUE:
-              ConstraintsUtils.processDefaultConstraints(tName, 
constraintChild, ImmutableList.of(col.getName()),
-                  defaultConstraints, typeChild, tokenRewriteStream);
+              defaultConstraints.addAll(
+                  ConstraintsUtils.processDefaultConstraints(constraintChild, 
ImmutableList.of(col.getName()),
+                      typeChild, tokenRewriteStream));
               break;
             case HiveParser.TOK_NOT_NULL:
               ConstraintsUtils.processNotNullConstraints(tName, 
constraintChild, ImmutableList.of(col.getName()),
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index 1eccd4e10e4..14e6df4dca3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -29,6 +29,7 @@
 import static 
org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.TABLE_IS_CTAS;
 import static 
org.apache.hadoop.hive.ql.ddl.view.create.AbstractCreateViewAnalyzer.validateTablesUsed;
 import static 
org.apache.hadoop.hive.ql.optimizer.calcite.translator.ASTConverter.NON_FK_FILTERED;
+import static 
org.apache.hadoop.hive.ql.session.SessionStateUtil.MISSING_COLUMNS;
 
 import java.io.IOException;
 import java.security.AccessControlException;
@@ -823,6 +824,9 @@ protected List<String> getDefaultConstraints(Table tbl, 
List<String> targetSchem
 
   protected Map<String, String> getColNameToDefaultValueMap(Table tbl) throws 
SemanticException {
     Map<String, String> colNameToDefaultVal = null;
+    if (tbl.getStorageHandler() != null && 
tbl.getStorageHandler().supportsDefaultColumnValues(tbl.getParameters())) {
+      return Collections.emptyMap();
+    }
     try {
       DefaultConstraint dc = 
Hive.get().getEnabledDefaultConstraints(tbl.getDbName(), tbl.getTableName());
       colNameToDefaultVal = dc.getColNameToDefaultValueMap();
@@ -5026,6 +5030,7 @@ private RowResolver getColForInsertStmtSpec(Map<String, 
ExprNodeDesc> targetCol2
     if(targetCol2Projection.size() < targetTableColNames.size()) {
       colNameToDefaultVal = getColNameToDefaultValueMap(target);
     }
+    Set<String> missingColumns = new HashSet<>();
     for (int i = 0; i < targetTableColNames.size(); i++) {
       String f = targetTableColNames.get(i);
       if(targetCol2Projection.containsKey(f)) {
@@ -5038,6 +5043,7 @@ private RowResolver getColForInsertStmtSpec(Map<String, 
ExprNodeDesc> targetCol2
       else {
         //add new 'synthetic' columns for projections not provided by Select
         assert(colNameToDefaultVal != null);
+        missingColumns.add(f);
         ExprNodeDesc exp = null;
         if(colNameToDefaultVal.containsKey(f)) {
           // make an expression for default value
@@ -5064,6 +5070,7 @@ private RowResolver getColForInsertStmtSpec(Map<String, 
ExprNodeDesc> targetCol2
       }
       colListPos++;
     }
+    SessionStateUtil.addResource(conf, MISSING_COLUMNS, missingColumns);
     return newOutputRR;
   }
 
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionStateUtil.java 
b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionStateUtil.java
index 84d2aa8f291..32f38bc0126 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionStateUtil.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionStateUtil.java
@@ -38,6 +38,7 @@ public class SessionStateUtil {
   private static final String COMMIT_INFO_PREFIX = "COMMIT_INFO.";
   private static final String CONFLICT_DETECTION_FILTER = 
"conflictDetectionFilter.";
   public static final String DEFAULT_TABLE_LOCATION = "defaultLocation";
+  public static final String MISSING_COLUMNS = "missingColumns";
 
   private SessionStateUtil() {
   }

Reply via email to