pgaref commented on a change in pull request #2305:
URL: https://github.com/apache/hive/pull/2305#discussion_r638095151
##########
File path:
ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTableLoader.java
##########
@@ -139,50 +237,72 @@ public void load(MapJoinTableContainer[] mapJoinTables,
LOG.debug("Failed to get value for counter
APPROXIMATE_INPUT_RECORDS", e);
}
long keyCount = Math.max(estKeyCount, inputRecords);
+ initHTLoadingService(keyCount);
- VectorMapJoinFastTableContainer vectorMapJoinFastTableContainer =
- new VectorMapJoinFastTableContainer(desc, hconf, keyCount);
+ VectorMapJoinFastTableContainer tableContainer =
+ new VectorMapJoinFastTableContainer(desc, hconf, keyCount,
numLoadThreads);
LOG.info("Loading hash table for input: {} cacheKey: {}
tableContainer: {} smallTablePos: {} " +
"estKeyCount : {} keyCount : {}", inputName, cacheKey,
- vectorMapJoinFastTableContainer.getClass().getSimpleName(),
pos, estKeyCount, keyCount);
+ tableContainer.getClass().getSimpleName(), pos, estKeyCount,
keyCount);
+
+ tableContainer.setSerde(null, null); // No SerDes here.
+ // Submit parallel loading Threads
+ submitQueueDrainThreads(tableContainer);
- vectorMapJoinFastTableContainer.setSerde(null, null); // No SerDes
here.
+ long receivedEntries = 0;
long startTime = System.currentTimeMillis();
while (kvReader.next()) {
-
vectorMapJoinFastTableContainer.putRow((BytesWritable)kvReader.getCurrentKey(),
- (BytesWritable)kvReader.getCurrentValue());
- numEntries++;
- if (doMemCheck && (numEntries %
memoryMonitorInfo.getMemoryCheckInterval() == 0)) {
- final long estMemUsage =
vectorMapJoinFastTableContainer.getEstimatedMemorySize();
- if (estMemUsage > effectiveThreshold) {
- String msg = "Hash table loading exceeded memory limits for
input: " + inputName +
- " numEntries: " + numEntries + " estimatedMemoryUsage: " +
estMemUsage +
+ BytesWritable currentKey = (BytesWritable) kvReader.getCurrentKey();
+ BytesWritable currentValue = (BytesWritable)
kvReader.getCurrentValue();
+ long hashCode = tableContainer.getHashCode(currentKey);
+ int partitionId = (int) ((numLoadThreads - 1) & hashCode);
+ // call getBytes as copy is called later
+ HashTableElement h = new HashTableElement(hashCode,
currentValue.copyBytes(), currentKey.copyBytes());
+ if (elementBatches[partitionId].addElement(h)) {
+ loadBatchQueues[partitionId].add(elementBatches[partitionId]);
+ elementBatches[partitionId] = new HashTableElementBatch();
+ }
+ receivedEntries++;
+ if (doMemCheck && (receivedEntries %
memoryMonitorInfo.getMemoryCheckInterval() == 0)) {
+ final long estMemUsage = tableContainer.getEstimatedMemorySize();
+ if (estMemUsage > effectiveThreshold) {
+ String msg = "Hash table loading exceeded memory limits for
input: " + inputName +
+ " numEntries: " + receivedEntries + " estimatedMemoryUsage:
" + estMemUsage +
" effectiveThreshold: " + effectiveThreshold + "
memoryMonitorInfo: " + memoryMonitorInfo;
- LOG.error(msg);
- throw new MapJoinMemoryExhaustionError(msg);
- } else {
- if (LOG.isInfoEnabled()) {
- LOG.info("Checking hash table loader memory usage for input:
{} numEntries: {} " +
- "estimatedMemoryUsage: {} effectiveThreshold: {}",
inputName, numEntries, estMemUsage,
+ LOG.error(msg);
+ throw new MapJoinMemoryExhaustionError(msg);
+ } else {
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Checking hash table loader memory usage for input:
{} numEntries: {} " +
+ "estimatedMemoryUsage: {} effectiveThreshold: {}",
inputName, receivedEntries, estMemUsage,
effectiveThreshold);
- }
}
+ }
}
}
+
+ LOG.info("Finished loading the queue for input: {} endTime : {}",
inputName, System.currentTimeMillis());
+ addQueueDoneSentinel();
+ loadExecService.shutdown();
+ loadExecService.awaitTermination(5 * 60, TimeUnit.SECONDS);
+ LOG.info("Total received entries: {} Threads {} HT entries: {}",
receivedEntries, numLoadThreads, totalEntries.get());
+
long delta = System.currentTimeMillis() - startTime;
htLoadCounter.increment(delta);
- vectorMapJoinFastTableContainer.seal();
- mapJoinTables[pos] = vectorMapJoinFastTableContainer;
+ tableContainer.seal();
+ mapJoinTables[pos] = tableContainer;
if (doMemCheck) {
LOG.info("Finished loading hash table for input: {} cacheKey: {}
numEntries: {} " +
- "estimatedMemoryUsage: {} Load Time : {} ", inputName, cacheKey,
numEntries,
- vectorMapJoinFastTableContainer.getEstimatedMemorySize(), delta);
+ "estimatedMemoryUsage: {} Load Time : {} ", inputName, cacheKey,
receivedEntries,
+ tableContainer.getEstimatedMemorySize(), delta);
} else {
LOG.info("Finished loading hash table for input: {} cacheKey: {}
numEntries: {} Load Time : {} ",
- inputName, cacheKey, numEntries, delta);
+ inputName, cacheKey, receivedEntries, delta);
}
+ } catch (InterruptedException e) {
+ throw new HiveException(e);
Review comment:
ack
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]