[ https://issues.apache.org/jira/browse/HIVE-26035?focusedWorklogId=843250&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-843250 ]

ASF GitHub Bot logged work on HIVE-26035:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 02/Feb/23 14:30
            Start Date: 02/Feb/23 14:30
    Worklog Time Spent: 10m 
      Work Description: VenuReddy2103 commented on code in PR #3905:
URL: https://github.com/apache/hive/pull/3905#discussion_r1094605604


##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/DirectSqlInsertPart.java:
##########
@@ -0,0 +1,804 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore;
+
+import static org.apache.commons.lang3.StringUtils.repeat;
+import static org.apache.hadoop.hive.metastore.Batchable.NO_BATCHING;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import javax.jdo.PersistenceManager;
+
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.model.MColumnDescriptor;
+import org.apache.hadoop.hive.metastore.model.MFieldSchema;
+import org.apache.hadoop.hive.metastore.model.MOrder;
+import org.apache.hadoop.hive.metastore.model.MPartition;
+import org.apache.hadoop.hive.metastore.model.MPartitionColumnPrivilege;
+import org.apache.hadoop.hive.metastore.model.MPartitionPrivilege;
+import org.apache.hadoop.hive.metastore.model.MSerDeInfo;
+import org.apache.hadoop.hive.metastore.model.MStorageDescriptor;
+import org.apache.hadoop.hive.metastore.model.MStringList;
+import org.datanucleus.ExecutionContext;
+import org.datanucleus.api.jdo.JDOPersistenceManager;
+import org.datanucleus.identity.DatastoreId;
+import org.datanucleus.metadata.AbstractClassMetaData;
+import org.datanucleus.metadata.IdentityType;
+
+/**
+ * This class contains the methods to insert into tables on the underlying database using direct SQL.
+ */
+class DirectSqlInsertPart {
+  private final PersistenceManager pm;
+  private final DatabaseProduct dbType;
+  private final int batchSize;
+
+  public DirectSqlInsertPart(PersistenceManager pm, DatabaseProduct dbType, int batchSize) {
+    this.pm = pm;
+    this.dbType = dbType;
+    this.batchSize = batchSize;
+  }
+
+  /**
+   * Interface to execute a multi-row insert query in batches for direct SQL.
+   */
+  interface BatchExecutionContext {
+    void execute(String batchQueryText, int batchRowCount, int batchParamCount) throws MetaException;
+  }
+
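+  /**
+   * Obtains the next datastore identity value (the generated primary key,
+   * e.g. SERDE_ID or PART_ID) for the given model class from DataNucleus'
+   * value generation strategy.
+   */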
+  private Long getDataStoreId(Class<?> modelClass) throws MetaException {
+    ExecutionContext ec = ((JDOPersistenceManager) pm).getExecutionContext();
+    AbstractClassMetaData cmd = ec.getMetaDataManager().getMetaDataForClass(modelClass, ec.getClassLoaderResolver());
+    if (cmd.getIdentityType() == IdentityType.DATASTORE) {
+      return (Long) ec.getStoreManager().getValueGenerationStrategyValue(ec, cmd, -1);
+    } else {
+      throw new MetaException("Identity type is not datastore.");
+    }
+  }
+
+  private void insertInBatch(String tableName, String columns, int columnCount, int rowCount,
+      BatchExecutionContext batchExecutionContext) throws MetaException {
+    if (rowCount == 0 || columnCount == 0) {
+      return;
+    }
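+    // Split the rows into 'maxBatches' full batches of maxRowsInBatch rows
+    // plus one final batch of 'last' rows, e.g. rowCount = 10 with
+    // batchSize = 3 yields 3 full batches and a last batch of 1 row.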
+    int maxRowsInBatch = (batchSize == NO_BATCHING) ? rowCount : batchSize;
+    int maxBatches = rowCount / maxRowsInBatch;
+    int last = rowCount % maxRowsInBatch;
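+    // One "(?,?,...,?)" group per row, with one placeholder per column.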
+    String rowFormat = "(" + repeat(",?", columnCount).substring(1) + ")";
+    String query = "";
+    if (maxBatches > 0) {
+      query = dbType.getBatchInsertQuery(tableName, columns, rowFormat, maxRowsInBatch);
+    }
+    int batchParamCount = maxRowsInBatch * columnCount;
+    for (int batch = 0; batch < maxBatches; batch++) {
+      batchExecutionContext.execute(query, maxRowsInBatch, batchParamCount);
+    }
+    if (last != 0) {
+      query = dbType.getBatchInsertQuery(tableName, columns, rowFormat, last);
+      batchExecutionContext.execute(query, last, last * columnCount);
+    }
+  }
+
+  private void executeQuery(String queryText, Object[] params) throws MetaException {
+    try (QueryWrapper query = new QueryWrapper(pm.newQuery("javax.jdo.query.SQL", queryText))) {
+      MetastoreDirectSqlUtils.executeWithArray(query.getInnerQuery(), params, queryText);
+    }
+  }
+
+  private void insertSerdeInBatch(Map<Long, MSerDeInfo> serdeIdToSerDeInfo) throws MetaException {
+    int rowCount = serdeIdToSerDeInfo.size();
+    String columns = "(\"SERDE_ID\",\"DESCRIPTION\",\"DESERIALIZER_CLASS\",\"NAME\",\"SERDE_TYPE\",\"SLIB\","
+        + "\"SERIALIZER_CLASS\")";
+    int columnCount = 7;
+    BatchExecutionContext batchExecutionContext = new BatchExecutionContext() {
+      final Iterator<Map.Entry<Long, MSerDeInfo>> it = serdeIdToSerDeInfo.entrySet().iterator();
+      @Override
+      public void execute(String batchQueryText, int batchRowCount, int batchParamCount) throws MetaException {
+        Object[] params = new Object[batchParamCount];
+        int paramIndex = 0;
+        for (int index = 0; index < batchRowCount; index++) {
+          Map.Entry<Long, MSerDeInfo> entry = it.next();
+          MSerDeInfo serdeInfo = entry.getValue();
+          params[paramIndex++] = entry.getKey();
+          params[paramIndex++] = serdeInfo.getDescription();
+          params[paramIndex++] = serdeInfo.getDeserializerClass();
+          params[paramIndex++] = serdeInfo.getName();
+          params[paramIndex++] = serdeInfo.getSerdeType();
+          params[paramIndex++] = serdeInfo.getSerializationLib();
+          params[paramIndex++] = serdeInfo.getSerializerClass();
+        }
+        executeQuery(batchQueryText, params);
+      }
+    };
+    insertInBatch("\"SERDES\"", columns, columnCount, rowCount, 
batchExecutionContext);
+  }
+
+  private void insertStorageDescriptorInBatch(Map<Long, MStorageDescriptor> sdIdToStorageDescriptor,
+      Map<Long, Long> sdIdToSerdeId, Map<Long, Long> sdIdToCdId) throws MetaException {
+    int rowCount = sdIdToStorageDescriptor.size();
+    String columns = "(\"SD_ID\",\"CD_ID\",\"INPUT_FORMAT\",\"IS_COMPRESSED\",\"IS_STOREDASSUBDIRECTORIES\","
+        + "\"LOCATION\",\"NUM_BUCKETS\",\"OUTPUT_FORMAT\",\"SERDE_ID\")";
+    int columnCount = 9;
+    BatchExecutionContext batchExecutionContext = new BatchExecutionContext() {
+      final Iterator<Map.Entry<Long, MStorageDescriptor>> it = sdIdToStorageDescriptor.entrySet().iterator();
+      @Override
+      public void execute(String batchQueryText, int batchRowCount, int batchParamCount) throws MetaException {
+        Object[] params = new Object[batchParamCount];
+        int paramIndex = 0;
+        for (int index = 0; index < batchRowCount; index++) {
+          Map.Entry<Long, MStorageDescriptor> entry = it.next();
+          MStorageDescriptor sd = entry.getValue();
+          params[paramIndex++] = entry.getKey();
+          params[paramIndex++] = sdIdToCdId.get(entry.getKey());
+          params[paramIndex++] = sd.getInputFormat();
+          params[paramIndex++] = dbType.getBoolean(sd.isCompressed());
+          params[paramIndex++] = dbType.getBoolean(sd.isStoredAsSubDirectories());
+          params[paramIndex++] = sd.getLocation();
+          params[paramIndex++] = sd.getNumBuckets();
+          params[paramIndex++] = sd.getOutputFormat();
+          params[paramIndex++] = sdIdToSerdeId.get(entry.getKey());
+        }
+        executeQuery(batchQueryText, params);
+      }
+    };
+    insertInBatch("\"SDS\"", columns, columnCount, rowCount, 
batchExecutionContext);
+  }
+
+  private void insertPartitionInBatch(Map<Long, MPartition> partIdToPartition, Map<Long, Long> partIdToSdId)
+      throws MetaException {
+    int rowCount = partIdToPartition.size();
+    String columns = "(\"PART_ID\",\"CREATE_TIME\",\"LAST_ACCESS_TIME\",\"PART_NAME\",\"SD_ID\",\"TBL_ID\","
+        + "\"WRITE_ID\")";
+    int columnCount = 7;
+    BatchExecutionContext batchExecutionContext = new BatchExecutionContext() {
+      final Iterator<Map.Entry<Long, MPartition>> it = partIdToPartition.entrySet().iterator();
+      @Override
+      public void execute(String batchQueryText, int batchRowCount, int batchParamCount) throws MetaException {
+        Object[] params = new Object[batchParamCount];
+        int paramIndex = 0;
+        for (int index = 0; index < batchRowCount; index++) {
+          Map.Entry<Long, MPartition> entry = it.next();
+          MPartition partition = entry.getValue();
+          params[paramIndex++] = entry.getKey();
+          params[paramIndex++] = partition.getCreateTime();
+          params[paramIndex++] = partition.getLastAccessTime();
+          params[paramIndex++] = partition.getPartitionName();
+          params[paramIndex++] = partIdToSdId.get(entry.getKey());
+          params[paramIndex++] = partition.getTable().getId();
+          params[paramIndex++] = partition.getWriteId();
+        }
+        executeQuery(batchQueryText, params);
+      }
+    };
+    insertInBatch("\"PARTITIONS\"", columns, columnCount, rowCount, 
batchExecutionContext);
+  }
+
+  private void insertSerdeParamInBatch(Map<Long, MSerDeInfo> serdeIdToSerDeInfo) throws MetaException {
+    int rowCount = 0;
+    for (MSerDeInfo serDeInfo : serdeIdToSerDeInfo.values()) {
+      rowCount += serDeInfo.getParameters().size();
+    }
+    if (rowCount == 0) {
+      return;
+    }
+    String columns = "(\"SERDE_ID\",\"PARAM_KEY\",\"PARAM_VALUE\")";
+    int columnCount = 3;
+    BatchExecutionContext batchExecutionContext = new BatchExecutionContext() {
+      final Iterator<Map.Entry<Long, MSerDeInfo>> serdeIt = serdeIdToSerDeInfo.entrySet().iterator();
+      Map.Entry<Long, MSerDeInfo> serdeEntry = serdeIt.next();
+      Iterator<Map.Entry<String, String>> it = serdeEntry.getValue().getParameters().entrySet().iterator();
+      @Override
+      public void execute(String batchQueryText, int batchRowCount, int batchParamCount) throws MetaException {
+        Object[] params = new Object[batchParamCount];
+        int index = 0;
+        int paramIndex = 0;
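+        // A single batch may span several serdes: drain the current serde's
+        // parameter iterator, then advance to the next serde until the batch
+        // is full.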
+        do {
+          while (index < batchRowCount && it.hasNext()) {
+            Map.Entry<String, String> entry = it.next();
+            params[paramIndex++] = serdeEntry.getKey();
+            params[paramIndex++] = entry.getKey();
+            params[paramIndex++] = entry.getValue();
+            index++;
+          }
+          if (index < batchRowCount) {
+            serdeEntry = serdeIt.next();

Review Comment:
   done
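
For context, the statement built by insertInBatch is a plain multi-row
parameterized insert (the exact text comes from
DatabaseProduct.getBatchInsertQuery and varies by dialect). A minimal
hypothetical sketch for the SERDE_PARAMS case with a batch of 2 rows and
3 columns:

    // rowFormat is "(?,?,?)"; the batch carries 2 * 3 = 6 positional params.
    String query = "INSERT INTO \"SERDE_PARAMS\" (\"SERDE_ID\",\"PARAM_KEY\",\"PARAM_VALUE\")"
        + " VALUES (?,?,?),(?,?,?)";
    Object[] params = {1L, "field.delim", ",", 1L, "serialization.format", "1"};
    // executeQuery(query, params) binds the params in order and performs one
    // round trip for the whole batch.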





Issue Time Tracking
-------------------

    Worklog Id:     (was: 843250)
    Time Spent: 8h  (was: 7h 50m)

> Explore moving to directsql for ObjectStore::addPartitions
> ----------------------------------------------------------
>
>                 Key: HIVE-26035
>                 URL: https://issues.apache.org/jira/browse/HIVE-26035
>             Project: Hive
>          Issue Type: Bug
>            Reporter: Rajesh Balamohan
>            Assignee: Venugopal Reddy K
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 8h
>  Remaining Estimate: 0h
>
> Currently {{addPartitions}} uses DataNucleus and is super slow for a large
> number of partitions. It would be good to move to direct SQL. Lots of repeated
> SQLs can be avoided as well (e.g. SDS, SERDES, TABLE_PARAMS).
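
For a rough sense of scale (an estimate, not a measurement): row-at-a-time
inserts for N new partitions issue on the order of N statements for each of
PARTITIONS, SDS, SERDES and their parameter tables, whereas the batched
direct-SQL path issues about ceil(N / batchSize) multi-row INSERTs per table.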


