[ 
https://issues.apache.org/jira/browse/HIVE-26035?focusedWorklogId=839511&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-839511
 ]

ASF GitHub Bot logged work on HIVE-26035:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 17/Jan/23 05:38
            Start Date: 17/Jan/23 05:38
    Worklog Time Spent: 10m 
      Work Description: VenuReddy2103 commented on code in PR #3905:
URL: https://github.com/apache/hive/pull/3905#discussion_r1071755797


##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java:
##########
@@ -515,6 +529,803 @@ public List<String> getMaterializedViewsForRewriting(String dbName) throws MetaE
     }
   }
 
+  private Long getDataStoreId(Class<?> modelClass) throws MetaException {
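+    // Obtain the next datastore identity value for this JDO model class from the DataNucleus value generator.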
+    ExecutionContext ec = ((JDOPersistenceManager) pm).getExecutionContext();
+    AbstractClassMetaData cmd = ec.getMetaDataManager().getMetaDataForClass(modelClass, ec.getClassLoaderResolver());
+    if (cmd.getIdentityType() == IdentityType.DATASTORE) {
+      return (Long) ec.getStoreManager().getValueGenerationStrategyValue(ec, cmd, -1);
+    } else {
+      throw new MetaException("Identity type is not datastore.");
+    }
+  }
+
+  /**
+   * Interface to execute multi-row insert queries in batches for direct SQL.
+   */
+  interface BatchExecutionContext {
+    void execute(String batchQueryText, int batchRowCount, int batchParamCount) throws MetaException;
+  }
+
+  private void insertInBatch(String tableName, String columns, int columnCount, String rowFormat, int rowCount,
+      BatchExecutionContext bec) throws MetaException {
+    if (rowCount == 0 || columnCount == 0) {
+      return;
+    }
+    int maxParamsCount = maxParamsInInsert;
+    if (maxParamsCount < columnCount) {
+      LOG.error("Maximum number of parameters in the direct SQL batch insert 
query is less than the table: {}"
+          + " columns. Executing single row insert queries.", tableName);
+      maxParamsCount = columnCount;
+    }
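+    // Size each batch so it binds at most maxParamsCount parameters, i.e. maxRowsInBatch rows of columnCount values each.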
+    int maxRowsInBatch = maxParamsCount / columnCount;
+    int maxBatches = rowCount / maxRowsInBatch;
+    int last = rowCount % maxRowsInBatch;
+    String query = "";
+    if (maxBatches > 0) {
+      query = dbType.getBatchInsertQuery(tableName, columns, rowFormat, maxRowsInBatch);
+    }
+    int batchParamCount = maxRowsInBatch * columnCount;
+    for (int batch = 0; batch < maxBatches; batch++) {
+      bec.execute(query, maxRowsInBatch, batchParamCount);
+    }
+    if (last != 0) {
+      query = dbType.getBatchInsertQuery(tableName, columns, rowFormat, last);
+      bec.execute(query, last, last * columnCount);
+    }
+  }
+
+  private void insertSerdeInBatch(Map<Long, MSerDeInfo> serdeIdToSerDeInfo) throws MetaException {
+    int rowCount = serdeIdToSerDeInfo.size();
+    String columns = "(\"SERDE_ID\",\"DESCRIPTION\",\"DESERIALIZER_CLASS\",\"NAME\",\"SERDE_TYPE\",\"SLIB\","
+        + "\"SERIALIZER_CLASS\")";
+    String row = "(?,?,?,?,?,?,?)";
+    int columnCount = 7;
+    BatchExecutionContext bec = new BatchExecutionContext() {
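+      // The iterator is held as an instance field so successive execute() calls resume where the previous batch stopped.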
+      final Iterator<Map.Entry<Long, MSerDeInfo>> it = serdeIdToSerDeInfo.entrySet().iterator();
+      @Override
+      public void execute(String batchQueryText, int batchRowCount, int batchParamCount) throws MetaException {
+        Object[] params = new Object[batchParamCount];
+        int paramIndex = 0;
+        for (int index = 0; index < batchRowCount; index++) {
+          Map.Entry<Long, MSerDeInfo> entry = it.next();
+          MSerDeInfo serdeInfo = entry.getValue();
+          params[paramIndex++] = entry.getKey();
+          params[paramIndex++] = serdeInfo.getDescription();
+          params[paramIndex++] = serdeInfo.getDeserializerClass();
+          params[paramIndex++] = serdeInfo.getName();
+          params[paramIndex++] = serdeInfo.getSerdeType();
+          params[paramIndex++] = serdeInfo.getSerializationLib();
+          params[paramIndex++] = serdeInfo.getSerializerClass();
+        }
+        try (QueryWrapper query = new QueryWrapper(pm.newQuery("javax.jdo.query.SQL", batchQueryText))) {
+          executeWithArray(query.getInnerQuery(), params, batchQueryText);
+        }
+      }
+    };
+    insertInBatch(SERDES, columns, columnCount, row, rowCount, bec);
+  }
+
+  private void insertStorageDescriptorInBatch(Map<Long, MStorageDescriptor> sdIdToStorageDescriptor,
+      Map<Long, Long> sdIdToSerdeId, Map<Long, Long> sdIdToCdId) throws MetaException {
+    int rowCount = sdIdToStorageDescriptor.size();
+    String columns = "(\"SD_ID\",\"CD_ID\",\"INPUT_FORMAT\",\"IS_COMPRESSED\",\"IS_STOREDASSUBDIRECTORIES\","
+        + "\"LOCATION\",\"NUM_BUCKETS\",\"OUTPUT_FORMAT\",\"SERDE_ID\")";
+    String row = "(?,?,?,?,?,?,?,?,?)";
+    int columnCount = 9;
+    BatchExecutionContext bec = new BatchExecutionContext() {
+      final Iterator<Map.Entry<Long, MStorageDescriptor>> it = sdIdToStorageDescriptor.entrySet().iterator();
+      @Override
+      public void execute(String batchQueryText, int batchRowCount, int batchParamCount) throws MetaException {
+        Object[] params = new Object[batchParamCount];
+        int paramIndex = 0;
+        for (int index = 0; index < batchRowCount; index++) {
+          Map.Entry<Long, MStorageDescriptor> entry = it.next();
+          MStorageDescriptor sd = entry.getValue();
+          params[paramIndex++] = entry.getKey();
+          params[paramIndex++] = sdIdToCdId.get(entry.getKey());
+          params[paramIndex++] = sd.getInputFormat();
+          params[paramIndex++] = dbType.getBoolean(sd.isCompressed());
+          params[paramIndex++] = dbType.getBoolean(sd.isStoredAsSubDirectories());
+          params[paramIndex++] = sd.getLocation();
+          params[paramIndex++] = sd.getNumBuckets();
+          params[paramIndex++] = sd.getOutputFormat();
+          params[paramIndex++] = sdIdToSerdeId.get(entry.getKey());
+        }
+        try (QueryWrapper query = new QueryWrapper(pm.newQuery("javax.jdo.query.SQL", batchQueryText))) {
+          executeWithArray(query.getInnerQuery(), params, batchQueryText);
+        }
+      }
+    };
+    insertInBatch(SDS, columns, columnCount, row, rowCount, bec);
+  }
+
+  private void insertPartitionInBatch(Map<Long, MPartition> partIdToPartition, Map<Long, Long> partIdToSdId)
+      throws MetaException {
+    int rowCount = partIdToPartition.size();
+    String columns = "(\"PART_ID\",\"CREATE_TIME\",\"LAST_ACCESS_TIME\",\"PART_NAME\",\"SD_ID\",\"TBL_ID\","
+        + "\"WRITE_ID\")";
+    String row = "(?,?,?,?,?,?,?)";
+    int columnCount = 7;
+    BatchExecutionContext bec = new BatchExecutionContext() {
+      final Iterator<Map.Entry<Long, MPartition>> it = partIdToPartition.entrySet().iterator();
+      @Override
+      public void execute(String batchQueryText, int batchRowCount, int batchParamCount) throws MetaException {
+        Object[] params = new Object[batchParamCount];
+        int paramIndex = 0;
+        for (int index = 0; index < batchRowCount; index++) {
+          Map.Entry<Long, MPartition> entry = it.next();
+          MPartition partition = entry.getValue();
+          params[paramIndex++] = entry.getKey();
+          params[paramIndex++] = partition.getCreateTime();
+          params[paramIndex++] = partition.getLastAccessTime();
+          params[paramIndex++] = partition.getPartitionName();
+          params[paramIndex++] = partIdToSdId.get(entry.getKey());
+          params[paramIndex++] = partition.getTable().getId();
+          params[paramIndex++] = partition.getWriteId();
+        }
+        try (QueryWrapper query = new QueryWrapper(pm.newQuery("javax.jdo.query.SQL", batchQueryText))) {
+          executeWithArray(query.getInnerQuery(), params, batchQueryText);
+        }
+      }
+    };
+    insertInBatch(PARTITIONS, columns, columnCount, row, rowCount, bec);
+  }
+
+  private void insertSerdeParamInBatch(Map<Long, MSerDeInfo> serdeIdToSerDeInfo) throws MetaException {
+    int rowCount = 0;
+    for (MSerDeInfo serDeInfo : serdeIdToSerDeInfo.values()) {
+      rowCount += serDeInfo.getParameters().size();
+    }
+    if (rowCount == 0) {
+      return;
+    }
+    String columns = "(\"SERDE_ID\",\"PARAM_KEY\",\"PARAM_VALUE\")";
+    String row = "(?,?,?)";
+    int columnCount = 3;
+    BatchExecutionContext bec = new BatchExecutionContext() {
+      final Iterator<Map.Entry<Long, MSerDeInfo>> serdeIt = serdeIdToSerDeInfo.entrySet().iterator();
+      Map.Entry<Long, MSerDeInfo> serdeEntry = serdeIt.next();
+      Iterator<Map.Entry<String, String>> it = serdeEntry.getValue().getParameters().entrySet().iterator();
+      @Override
+      public void execute(String batchQueryText, int batchRowCount, int batchParamCount) throws MetaException {
+        Object[] params = new Object[batchParamCount];
+        int index = 0;
+        int paramIndex = 0;
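+        // Fill the batch with (SERDE_ID, PARAM_KEY, PARAM_VALUE) rows, moving to the next serde's parameter map once the current one is exhausted.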
+        do {
+          while (index < batchRowCount && it.hasNext()) {
+            Map.Entry<String, String> entry = it.next();
+            params[paramIndex++] = serdeEntry.getKey();
+            params[paramIndex++] = entry.getKey();
+            params[paramIndex++] = entry.getValue();
+            index++;
+          }
+          if (index < batchRowCount) {
+            serdeEntry = serdeIt.next();
+            it = serdeEntry.getValue().getParameters().entrySet().iterator();
+          }
+        } while (index < batchRowCount);
+        try (QueryWrapper query = new QueryWrapper(pm.newQuery("javax.jdo.query.SQL", batchQueryText))) {
+          executeWithArray(query.getInnerQuery(), params, batchQueryText);
+        }
+      }
+    };
+    insertInBatch(SERDE_PARAMS, columns, columnCount, row, rowCount, bec);
+  }
+
+  private void insertStorageDescriptorParamInBatch(Map<Long, MStorageDescriptor> sdIdToStorageDescriptor)
+      throws MetaException {
+    int rowCount = 0;
+    for (MStorageDescriptor sd : sdIdToStorageDescriptor.values()) {
+      rowCount += sd.getParameters().size();
+    }
+    if (rowCount == 0) {
+      return;
+    }
+    String columns = "(\"SD_ID\",\"PARAM_KEY\",\"PARAM_VALUE\")";
+    String row = "(?,?,?)";
+    int columnCount = 3;
+    BatchExecutionContext bec = new BatchExecutionContext() {
+      final Iterator<Map.Entry<Long, MStorageDescriptor>> sdIt = sdIdToStorageDescriptor.entrySet().iterator();
+      Map.Entry<Long, MStorageDescriptor> sdEntry = sdIt.next();
+      Iterator<Map.Entry<String, String>> it = sdEntry.getValue().getParameters().entrySet().iterator();
+      @Override
+      public void execute(String batchQueryText, int batchRowCount, int batchParamCount) throws MetaException {
+        Object[] params = new Object[batchParamCount];
+        int index = 0;
+        int paramIndex = 0;
+        do {
+          while (index < batchRowCount && it.hasNext()) {
+            Map.Entry<String, String> entry = it.next();
+            params[paramIndex++] = sdEntry.getKey();
+            params[paramIndex++] = entry.getKey();
+            params[paramIndex++] = entry.getValue();
+            index++;
+          }
+          if (index < batchRowCount) {
+            sdEntry = sdIt.next();
+            it = sdEntry.getValue().getParameters().entrySet().iterator();
+          }
+        } while (index < batchRowCount);
+        try (QueryWrapper query = new QueryWrapper(pm.newQuery("javax.jdo.query.SQL", batchQueryText))) {
+          executeWithArray(query.getInnerQuery(), params, batchQueryText);
+        }
+      }
+    };
+    insertInBatch(SD_PARAMS, columns, columnCount, row, rowCount, bec);
+  }
+
+  private void insertPartitionParamInBatch(Map<Long, MPartition> partIdToPartition) throws MetaException {
+    int rowCount = 0;
+    for (MPartition part : partIdToPartition.values()) {
+      rowCount += part.getParameters().size();
+    }
+    if (rowCount == 0) {
+      return;
+    }
+    String columns = "(\"PART_ID\",\"PARAM_KEY\",\"PARAM_VALUE\")";
+    String row = "(?,?,?)";
+    int columnCount = 3;
+    BatchExecutionContext bec = new BatchExecutionContext() {
+      final Iterator<Map.Entry<Long, MPartition>> partIt = partIdToPartition.entrySet().iterator();
+      Map.Entry<Long, MPartition> partEntry = partIt.next();
+      Iterator<Map.Entry<String, String>> it = partEntry.getValue().getParameters().entrySet().iterator();
+      @Override
+      public void execute(String batchQueryText, int batchRowCount, int batchParamCount) throws MetaException {
+        Object[] params = new Object[batchParamCount];
+        int index = 0;
+        int paramIndex = 0;
+        do {
+          while (index < batchRowCount && it.hasNext()) {
+            Map.Entry<String, String> entry = it.next();
+            params[paramIndex++] = partEntry.getKey();
+            params[paramIndex++] = entry.getKey();
+            params[paramIndex++] = entry.getValue();
+            index++;
+          }
+          if (index < batchRowCount) {
+            partEntry = partIt.next();
+            it = partEntry.getValue().getParameters().entrySet().iterator();
+          }
+        } while (index < batchRowCount);
+        try (QueryWrapper query = new QueryWrapper(pm.newQuery("javax.jdo.query.SQL", batchQueryText))) {
+          executeWithArray(query.getInnerQuery(), params, batchQueryText);
+        }
+      }
+    };
+    insertInBatch(PARTITION_PARAMS, columns, columnCount, row, rowCount, bec);
+  }
+
+  private void insertPartitionKeyValInBatch(Map<Long, MPartition> partIdToPartition) throws MetaException {
+    int rowCount = 0;
+    for (MPartition part : partIdToPartition.values()) {
+      rowCount += part.getValues().size();
+    }
+    if (rowCount == 0) {
+      return;
+    }
+    String columns = "(\"PART_ID\",\"PART_KEY_VAL\",\"INTEGER_IDX\")";
+    String row = "(?,?,?)";
+    int columnCount = 3;
+    BatchExecutionContext bec = new BatchExecutionContext() {
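+      // colIndex supplies INTEGER_IDX and is reset to 0 whenever iteration moves on to the next partition's value list.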
+      int colIndex = 0;
+      final Iterator<Map.Entry<Long, MPartition>> partIt = partIdToPartition.entrySet().iterator();
+      Map.Entry<Long, MPartition> partEntry = partIt.next();
+      Iterator<String> it = partEntry.getValue().getValues().iterator();
+      @Override
+      public void execute(String batchQueryText, int batchRowCount, int batchParamCount) throws MetaException {
+        Object[] params = new Object[batchParamCount];
+        int index = 0;
+        int paramIndex = 0;
+        do {
+          while (index < batchRowCount && it.hasNext()) {
+            params[paramIndex++] = partEntry.getKey();
+            params[paramIndex++] = it.next();
+            params[paramIndex++] = colIndex++;
+            index++;
+          }
+          if (index < batchRowCount) {
+            colIndex = 0;
+            partEntry = partIt.next();
+            it = partEntry.getValue().getValues().iterator();
+          }
+        } while (index < batchRowCount);
+        try (QueryWrapper query = new QueryWrapper(pm.newQuery("javax.jdo.query.SQL", batchQueryText))) {
+          executeWithArray(query.getInnerQuery(), params, batchQueryText);
+        }
+      }
+    };
+    insertInBatch(PARTITION_KEY_VALS, columns, columnCount, row, rowCount, bec);
+  }
+
+
+  private void insertColumnDescriptorInBatch(Map<Long, MColumnDescriptor> cdIdToColumnDescriptor) throws MetaException {
+    int rowCount = cdIdToColumnDescriptor.size();
+    String columns = "(\"CD_ID\")";
+    String row = "(?)";
+    int columnCount = 1;
+    BatchExecutionContext bec = new BatchExecutionContext() {
+      final Iterator<Long> it = cdIdToColumnDescriptor.keySet().iterator();
+      @Override
+      public void execute(String batchQueryText, int batchRowCount, int batchParamCount) throws MetaException {
+        Object[] params = new Object[batchParamCount];
+        int paramIndex = 0;
+        for (int index = 0; index < batchRowCount; index++) {
+          params[paramIndex++] = it.next();
+        }
+        try (QueryWrapper query = new QueryWrapper(pm.newQuery("javax.jdo.query.SQL", batchQueryText))) {
+          executeWithArray(query.getInnerQuery(), params, batchQueryText);
+        }
+      }
+    };
+    insertInBatch(CDS, columns, columnCount, row, rowCount, bec);
+  }
+
+  private void insertColumnV2InBatch(Map<Long, MColumnDescriptor> cdIdToColumnDescriptor) throws MetaException {
+    int rowCount = 0;
+    for (MColumnDescriptor cd : cdIdToColumnDescriptor.values()) {
+      rowCount += cd.getCols().size();
+    }
+    if (rowCount == 0) {
+      return;
+    }
+    String columns = "(\"CD_ID\",\"COMMENT\",\"COLUMN_NAME\",\"TYPE_NAME\",\"INTEGER_IDX\")";
+    String row = "(?,?,?,?,?)";
+    int columnCount = 5;
+    BatchExecutionContext bec = new BatchExecutionContext() {
+      int colIndex = 0;
+      final Iterator<Map.Entry<Long, MColumnDescriptor>> cdIt = cdIdToColumnDescriptor.entrySet().iterator();
+      Map.Entry<Long, MColumnDescriptor> cdEntry = cdIt.next();
+      Iterator<MFieldSchema> it = cdEntry.getValue().getCols().iterator();
+      @Override
+      public void execute(String batchQueryText, int batchRowCount, int batchParamCount) throws MetaException {
+        Object[] params = new Object[batchParamCount];
+        int index = 0;
+        int paramIndex = 0;
+        do {
+          while (index < batchRowCount && it.hasNext()) {
+            MFieldSchema fieldSchema = it.next();
+            params[paramIndex++] = cdEntry.getKey();
+            params[paramIndex++] = fieldSchema.getComment();
+            params[paramIndex++] = fieldSchema.getName();
+            params[paramIndex++] = fieldSchema.getType();
+            params[paramIndex++] = colIndex++;
+            index++;
+          }
+          if (index < batchRowCount) {
+            colIndex = 0;
+            cdEntry = cdIt.next();
+            it = cdEntry.getValue().getCols().iterator();
+          }
+        } while (index < batchRowCount);
+        try (QueryWrapper query = new QueryWrapper(pm.newQuery("javax.jdo.query.SQL", batchQueryText))) {
+          executeWithArray(query.getInnerQuery(), params, batchQueryText);
+        }
+      }
+    };
+    insertInBatch(COLUMNS_V2, columns, columnCount, row, rowCount, bec);
+  }
+
+  private void insertBucketColInBatch(Map<Long, MStorageDescriptor> sdIdToStorageDescriptor) throws MetaException {
+    int rowCount = 0;
+    for (MStorageDescriptor sd : sdIdToStorageDescriptor.values()) {
+      rowCount += sd.getBucketCols().size();
+    }
+    if (rowCount == 0) {
+      return;
+    }
+    String columns = "(\"SD_ID\",\"BUCKET_COL_NAME\",\"INTEGER_IDX\")";
+    String row = "(?,?,?)";
+    int columnCount = 3;
+    BatchExecutionContext bec = new BatchExecutionContext() {
+      int colIndex = 0;
+      final Iterator<Map.Entry<Long, MStorageDescriptor>> sdIt = sdIdToStorageDescriptor.entrySet().iterator();
+      Map.Entry<Long, MStorageDescriptor> sdEntry = sdIt.next();
+      Iterator<String> it = sdEntry.getValue().getBucketCols().iterator();
+      @Override
+      public void execute(String batchQueryText, int batchRowCount, int batchParamCount) throws MetaException {
+        Object[] params = new Object[batchParamCount];
+        int index = 0;
+        int paramIndex = 0;
+        do {
+          while (index < batchRowCount && it.hasNext()) {
+            params[paramIndex++] = sdEntry.getKey();
+            params[paramIndex++] = it.next();
+            params[paramIndex++] = colIndex++;
+            index++;
+          }
+          if (index < batchRowCount) {
+            colIndex = 0;
+            sdEntry = sdIt.next();
+            it = sdEntry.getValue().getBucketCols().iterator();
+          }
+        } while (index < batchRowCount);
+        try (QueryWrapper query = new QueryWrapper(pm.newQuery("javax.jdo.query.SQL", batchQueryText))) {
+          executeWithArray(query.getInnerQuery(), params, batchQueryText);
+        }
+      }
+    };
+    insertInBatch(BUCKETING_COLS, columns, columnCount, row, rowCount, bec);
+  }
+
+  private void insertSortColInBatch(Map<Long, MStorageDescriptor> sdIdToStorageDescriptor) throws MetaException {
+    int rowCount = 0;
+    for (MStorageDescriptor sd : sdIdToStorageDescriptor.values()) {
+      rowCount += sd.getSortCols().size();
+    }
+    if (rowCount == 0) {
+      return;
+    }
+    String columns = "(\"SD_ID\",\"COLUMN_NAME\",\"ORDER\",\"INTEGER_IDX\")";
+    String row = "(?,?,?,?)";
+    int columnCount = 4;
+    BatchExecutionContext bec = new BatchExecutionContext() {
+      int colIndex = 0;
+      final Iterator<Map.Entry<Long, MStorageDescriptor>> sdIt = sdIdToStorageDescriptor.entrySet().iterator();
+      Map.Entry<Long, MStorageDescriptor> sdEntry = sdIt.next();
+      Iterator<MOrder> it = sdEntry.getValue().getSortCols().iterator();
+      @Override
+      public void execute(String batchQueryText, int batchRowCount, int batchParamCount) throws MetaException {
+        Object[] params = new Object[batchParamCount];
+        int index = 0;
+        int paramIndex = 0;
+        do {
+          while (index < batchRowCount && it.hasNext()) {
+            MOrder order = it.next();
+            params[paramIndex++] = sdEntry.getKey();
+            params[paramIndex++] = order.getCol();
+            params[paramIndex++] = order.getOrder();
+            params[paramIndex++] = colIndex++;
+            index++;
+          }
+          if (index < batchRowCount) {
+            colIndex = 0;
+            sdEntry = sdIt.next();
+            it = sdEntry.getValue().getSortCols().iterator();
+          }
+        } while (index < batchRowCount);
+        try (QueryWrapper query = new QueryWrapper(pm.newQuery("javax.jdo.query.SQL", batchQueryText))) {
+          executeWithArray(query.getInnerQuery(), params, batchQueryText);
+        }
+      }
+    };
+    insertInBatch(SORT_COLS, columns, columnCount, row, rowCount, bec);
+  }
+
+  private void insertSkewedStringListInBatch(List<Long> stringListIds) throws MetaException {
+    int rowCount = stringListIds.size();
+    String columns = "(\"STRING_LIST_ID\")";
+    String row = "(?)";
+    int columnCount = 1;
+    BatchExecutionContext bec = new BatchExecutionContext() {
+      final Iterator<Long> it = stringListIds.iterator();
+      @Override
+      public void execute(String batchQueryText, int batchRowCount, int batchParamCount) throws MetaException {
+        Object[] params = new Object[batchParamCount];
+        int paramIndex = 0;
+        for (int index = 0; index < batchRowCount; index++) {
+          params[paramIndex++] = it.next();
+        }
+        try (QueryWrapper query = new QueryWrapper(pm.newQuery("javax.jdo.query.SQL", batchQueryText))) {
+          executeWithArray(query.getInnerQuery(), params, batchQueryText);
+        }
+      }
+    };
+    insertInBatch(SKEWED_STRING_LIST, columns, columnCount, row, rowCount, bec);
+  }
+
+  private void insertSkewedStringListValInBatch(Map<Long, List<String>> stringListIdToStringList) throws MetaException {
+    int rowCount = 0;
+    for (List<String> stringList : stringListIdToStringList.values()) {
+      rowCount += stringList.size();
+    }
+    if (rowCount == 0) {
+      return;
+    }
+    String columns = "(\"STRING_LIST_ID\",\"STRING_LIST_VALUE\",\"INTEGER_IDX\")";
+    String row = "(?,?,?)";
+    int columnCount = 3;
+    BatchExecutionContext bec = new BatchExecutionContext() {
+      int colIndex = 0;
+      final Iterator<Map.Entry<Long, List<String>>> stringListIt = stringListIdToStringList.entrySet().iterator();
+      Map.Entry<Long, List<String>> stringListEntry = stringListIt.next();
+      Iterator<String> it = stringListEntry.getValue().iterator();
+      @Override
+      public void execute(String batchQueryText, int batchRowCount, int batchParamCount) throws MetaException {
+        Object[] params = new Object[batchParamCount];
+        int index = 0;
+        int paramIndex = 0;
+        do {
+          while (index < batchRowCount && it.hasNext()) {
+            params[paramIndex++] = stringListEntry.getKey();
+            params[paramIndex++] = it.next();
+            params[paramIndex++] = colIndex++;
+            index++;
+          }
+          if (index < batchRowCount) {
+            colIndex = 0;
+            stringListEntry = stringListIt.next();
+            it = stringListEntry.getValue().iterator();
+          }
+        } while (index < batchRowCount);
+        try (QueryWrapper query = new QueryWrapper(pm.newQuery("javax.jdo.query.SQL", batchQueryText))) {
+          executeWithArray(query.getInnerQuery(), params, batchQueryText);
+        }
+      }
+    };
+    insertInBatch(SKEWED_STRING_LIST_VALUES, columns, columnCount, row, rowCount, bec);
+  }
+
+  private void insertSkewedColInBatch(Map<Long, MStorageDescriptor> sdIdToStorageDescriptor) throws MetaException {
+    int rowCount = 0;
+    for (MStorageDescriptor sd : sdIdToStorageDescriptor.values()) {
+      rowCount += sd.getSkewedColNames().size();
+    }
+    if (rowCount == 0) {
+      return;
+    }
+    String columns = "(\"SD_ID\",\"SKEWED_COL_NAME\",\"INTEGER_IDX\")";
+    String row = "(?,?,?)";
+    int columnCount = 3;
+    BatchExecutionContext bec = new BatchExecutionContext() {
+      int colIndex = 0;
+      final Iterator<Map.Entry<Long, MStorageDescriptor>> sdIt = sdIdToStorageDescriptor.entrySet().iterator();
+      Map.Entry<Long, MStorageDescriptor> sdEntry = sdIt.next();
+      Iterator<String> it = sdEntry.getValue().getSkewedColNames().iterator();
+      @Override
+      public void execute(String batchQueryText, int batchRowCount, int batchParamCount) throws MetaException {
+        Object[] params = new Object[batchParamCount];
+        int index = 0;
+        int paramIndex = 0;
+        do {
+          while (index < batchRowCount && it.hasNext()) {
+            params[paramIndex++] = sdEntry.getKey();
+            params[paramIndex++] = it.next();
+            params[paramIndex++] = colIndex++;
+            index++;
+          }
+          if (index < batchRowCount) {
+            colIndex = 0;
+            sdEntry = sdIt.next();
+            it = sdEntry.getValue().getSkewedColNames().iterator();
+          }
+        } while (index < batchRowCount);
+        try (QueryWrapper query = new QueryWrapper(pm.newQuery("javax.jdo.query.SQL", batchQueryText))) {
+          executeWithArray(query.getInnerQuery(), params, batchQueryText);
+        }
+      }
+    };
+    insertInBatch(SKEWED_COL_NAMES, columns, columnCount, row, rowCount, bec);
+  }
+
+  private void insertSkewedValInBatch(List<Long> stringListIds, Map<Long, Long> stringListIdToSdId)
+      throws MetaException {
+    int rowCount = stringListIds.size();
+    String columns = "(\"SD_ID_OID\",\"STRING_LIST_ID_EID\",\"INTEGER_IDX\")";
+    String row = "(?,?,?)";
+    int columnCount = 3;
+    BatchExecutionContext bec = new BatchExecutionContext() {
+      int colIndex = 0;
+      long prevSdId = -1;
+      final Iterator<Long> it = stringListIds.iterator();
+      @Override
+      public void execute(String batchQueryText, int batchRowCount, int batchParamCount) throws MetaException {
+        Object[] params = new Object[batchParamCount];
+        int paramIndex = 0;
+        for (int index = 0; index < batchRowCount; index++) {
+          Long stringListId = it.next();
+          Long sdId = stringListIdToSdId.get(stringListId);
+          params[paramIndex++] = sdId;
+          params[paramIndex++] = stringListId;
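+          // INTEGER_IDX restarts at 0 whenever the storage descriptor changes between consecutive skewed string lists.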
+          if (prevSdId != sdId) {
+            colIndex = 0;
+          }
+          params[paramIndex++] = colIndex++;
+          prevSdId = sdId;
+        }
+        try (QueryWrapper query = new QueryWrapper(pm.newQuery("javax.jdo.query.SQL", batchQueryText))) {
+          executeWithArray(query.getInnerQuery(), params, batchQueryText);
+        }
+      }
+    };
+    insertInBatch(SKEWED_VALUES, columns, columnCount, row, rowCount, bec);
+  }
+
+  private void insertSkewedLocationInBatch(Map<Long, String> stringListIdToLocation, Map<Long, Long> stringListIdToSdId)
+      throws MetaException {
+    int rowCount = stringListIdToLocation.size();
+    String columns = "(\"SD_ID\",\"STRING_LIST_ID_KID\",\"LOCATION\")";
+    String row = "(?,?,?)";
+    int columnCount = 3;
+    BatchExecutionContext bec = new BatchExecutionContext() {
+      final Iterator<Map.Entry<Long, String>> it = stringListIdToLocation.entrySet().iterator();
+      @Override
+      public void execute(String batchQueryText, int batchRowCount, int batchParamCount) throws MetaException {
+        Object[] params = new Object[batchParamCount];
+        int paramIndex = 0;
+        for (int index = 0; index < batchRowCount; index++) {
+          Map.Entry<Long, String> entry = it.next();
+          params[paramIndex++] = stringListIdToSdId.get(entry.getKey());
+          params[paramIndex++] = entry.getKey();
+          params[paramIndex++] = entry.getValue();
+        }
+        try (QueryWrapper query = new QueryWrapper(pm.newQuery("javax.jdo.query.SQL", batchQueryText))) {
+          executeWithArray(query.getInnerQuery(), params, batchQueryText);
+        }
+      }
+    };
+    insertInBatch(SKEWED_COL_VALUE_LOC_MAP, columns, columnCount, row, rowCount, bec);
+  }
+
+  private void insertPartitionPrivilegeInBatch(Map<Long, MPartitionPrivilege> partGrantIdToPrivilege,
+      Map<Long, Long> partGrantIdToPartId) throws MetaException {
+    int rowCount = partGrantIdToPrivilege.size();
+    String columns = "(\"PART_GRANT_ID\",\"AUTHORIZER\",\"CREATE_TIME\",\"GRANT_OPTION\",\"GRANTOR\",\"GRANTOR_TYPE\","
+        + "\"PART_ID\",\"PRINCIPAL_NAME\",\"PRINCIPAL_TYPE\",\"PART_PRIV\")";
+    String row = "(?,?,?,?,?,?,?,?,?,?)";
+    int columnCount = 10;
+    BatchExecutionContext bec = new BatchExecutionContext() {
+      final Iterator<Map.Entry<Long, MPartitionPrivilege>> it = partGrantIdToPrivilege.entrySet().iterator();
+      @Override
+      public void execute(String batchQueryText, int batchRowCount, int batchParamCount) throws MetaException {
+        Object[] params = new Object[batchParamCount];
+        int paramIndex = 0;
+        for (int index = 0; index < batchRowCount; index++) {
+          Map.Entry<Long, MPartitionPrivilege> entry = it.next();
+          MPartitionPrivilege partPrivilege = entry.getValue();
+          params[paramIndex++] = entry.getKey();
+          params[paramIndex++] = partPrivilege.getAuthorizer();
+          params[paramIndex++] = partPrivilege.getCreateTime();
+          params[paramIndex++] = partPrivilege.getGrantOption();
+          params[paramIndex++] = partPrivilege.getGrantor();
+          params[paramIndex++] = partPrivilege.getGrantorType();
+          params[paramIndex++] = partGrantIdToPartId.get(entry.getKey());
+          params[paramIndex++] = partPrivilege.getPrincipalName();
+          params[paramIndex++] = partPrivilege.getPrincipalType();
+          params[paramIndex++] = partPrivilege.getPrivilege();
+        }
+        try (QueryWrapper query = new QueryWrapper(pm.newQuery("javax.jdo.query.SQL", batchQueryText))) {
+          executeWithArray(query.getInnerQuery(), params, batchQueryText);
+        }
+      }
+    };
+    insertInBatch(PART_PRIVS, columns, columnCount, row, rowCount, bec);
+  }
+
+  private void insertPartitionColPrivilegeInBatch(Map<Long, MPartitionColumnPrivilege> partColumnGrantIdToPrivilege,
+      Map<Long, Long> partColumnGrantIdToPartId) throws MetaException {
+    int rowCount = partColumnGrantIdToPrivilege.size();
+    String columns = "(\"PART_COLUMN_GRANT_ID\",\"AUTHORIZER\",\"COLUMN_NAME\",\"CREATE_TIME\",\"GRANT_OPTION\","
+        + "\"GRANTOR\",\"GRANTOR_TYPE\",\"PART_ID\",\"PRINCIPAL_NAME\",\"PRINCIPAL_TYPE\",\"PART_COL_PRIV\")";
+    String row = "(?,?,?,?,?,?,?,?,?,?,?)";
+    int columnCount = 11;
+    BatchExecutionContext bec = new BatchExecutionContext() {
+      final Iterator<Map.Entry<Long, MPartitionColumnPrivilege>> it
+          = partColumnGrantIdToPrivilege.entrySet().iterator();
+      @Override
+      public void execute(String batchQueryText, int batchRowCount, int batchParamCount) throws MetaException {
+        Object[] params = new Object[batchParamCount];
+        int paramIndex = 0;
+        for (int index = 0; index < batchRowCount; index++) {
+          Map.Entry<Long, MPartitionColumnPrivilege> entry = it.next();
+          MPartitionColumnPrivilege partColumnPrivilege = entry.getValue();
+          params[paramIndex++] = entry.getKey();
+          params[paramIndex++] = partColumnPrivilege.getAuthorizer();
+          params[paramIndex++] = partColumnPrivilege.getColumnName();
+          params[paramIndex++] = partColumnPrivilege.getCreateTime();
+          params[paramIndex++] = partColumnPrivilege.getGrantOption();
+          params[paramIndex++] = partColumnPrivilege.getGrantor();
+          params[paramIndex++] = partColumnPrivilege.getGrantorType();
+          params[paramIndex++] = partColumnGrantIdToPartId.get(entry.getKey());
+          params[paramIndex++] = partColumnPrivilege.getPrincipalName();
+          params[paramIndex++] = partColumnPrivilege.getPrincipalType();
+          params[paramIndex++] = partColumnPrivilege.getPrivilege();
+        }
+        try (QueryWrapper query = new QueryWrapper(pm.newQuery("javax.jdo.query.SQL", batchQueryText))) {
+          executeWithArray(query.getInnerQuery(), params, batchQueryText);
+        }
+      }
+    };
+    insertInBatch(PART_COL_PRIVS, columns, columnCount, row, rowCount, bec);
+  }
+
+  /**
+   * Add partitions in batch using direct SQL
+   * @param parts list of partitions
+   * @param partPrivilegesList list of partition privileges
+   * @param partColPrivilegesList list of partition column privileges
+   * @throws MetaException
+   */
+  public void addPartitions(List<MPartition> parts, List<List<MPartitionPrivilege>> partPrivilegesList,

Review Comment:
   agreed





Issue Time Tracking
-------------------

    Worklog Id:     (was: 839511)
    Time Spent: 2h 20m  (was: 2h 10m)

> Explore moving to directsql for ObjectStore::addPartitions
> ----------------------------------------------------------
>
>                 Key: HIVE-26035
>                 URL: https://issues.apache.org/jira/browse/HIVE-26035
>             Project: Hive
>          Issue Type: Bug
>            Reporter: Rajesh Balamohan
>            Assignee: Venugopal Reddy K
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 2h 20m
>  Remaining Estimate: 0h
>
> Currently {{addPartitions}} uses DataNucleus and is super slow for a large
> number of partitions. It would be good to move to direct SQL. Lots of repeated
> SQLs can be avoided as well (e.g. SDS, SERDE, TABLE_PARAMS).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
