[GitHub] drill issue #828: DRILL-5229: update kudu-client to 1.3.0
Github user sudheeshkatkam commented on the issue: https://github.com/apache/drill/pull/828 Can you post the complete error message? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] drill pull request #822: DRILL-5457: Spill implementation for Hash Aggregate
Github user Ben-Zvi commented on a diff in the pull request: https://github.com/apache/drill/pull/822#discussion_r117869278 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java --- @@ -64,6 +64,12 @@ String SPOOLING_BUFFER_MEMORY = "drill.exec.buffer.spooling.size"; String BATCH_PURGE_THRESHOLD = "drill.exec.sort.purge.threshold"; + // Spill Options common to all spilling operators --- End diff -- OK, boot-time. I wish Drill had a clear way to distinguish boot-time options from session options.
[GitHub] drill pull request #822: DRILL-5457: Spill implementation for Hash Aggregate
Github user Ben-Zvi commented on a diff in the pull request: https://github.com/apache/drill/pull/822#discussion_r117865877 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java --- @@ -546,44 +1254,204 @@ private void checkGroupAndAggrValues(int incomingRowIdx) { holder.value = vv0.getAccessor().get(incomingRowIdx) ; } */ +/* +if ( handlingSpills && ( incomingRowIdx == 0 ) ) { + // for debugging -- show the first row from a spilled batch + Object tmp0 = (incoming).getValueAccessorById(NullableVarCharVector.class, 0).getValueVector(); + Object tmp1 = (incoming).getValueAccessorById(NullableVarCharVector.class, 1).getValueVector(); + Object tmp2 = (incoming).getValueAccessorById(NullableBigIntVector.class, 2).getValueVector(); + + if (tmp0 != null && tmp1 != null && tmp2 != null) { +NullableVarCharVector vv0 = ((NullableVarCharVector) tmp0); +NullableVarCharVector vv1 = ((NullableVarCharVector) tmp1); +NullableBigIntVector vv2 = ((NullableBigIntVector) tmp2); +logger.debug("The first row = {} , {} , {}", vv0.getAccessor().get(incomingRowIdx), vv1.getAccessor().get(incomingRowIdx), vv2.getAccessor().get(incomingRowIdx)); + } +} +*/ +// The hash code is computed once, then its lower bits are used to determine the +// partition to use, and the higher bits determine the location in the hash table. +int hashCode; +try { + htables[0].updateBatches(); + hashCode = htables[0].getHashCode(incomingRowIdx); +} catch (SchemaChangeException e) { + throw new IllegalStateException("Unexpected schema change", e); +} -htable.put(incomingRowIdx, htIdxHolder, 1 /* retry count */); +// right shift hash code for secondary (or tertiary...) 
spilling +for (int i = 0; i < cycleNum; i++) { hashCode >>>= bitsInMask; } +int currentPartition = hashCode & partitionMask ; +hashCode >>>= bitsInMask; +HashTable.PutStatus putStatus = null; +long allocatedBefore = allocator.getAllocatedMemory(); + +// Insert the key columns into the hash table +try { + putStatus = htables[currentPartition].put(incomingRowIdx, htIdxHolder, hashCode); +} catch (OutOfMemoryException exc) { + throw new OutOfMemoryException(getOOMErrorMsg(), exc); // may happen when can not spill +} catch (SchemaChangeException e) { + throw new IllegalStateException("Unexpected schema change", e); +} int currentIdx = htIdxHolder.value; -// get the batch index and index within the batch -if (currentIdx >= batchHolders.size() * HashTable.BATCH_SIZE) { - addBatchHolder(); +long addedMem = allocator.getAllocatedMemory() - allocatedBefore; +if ( addedMem > 0 ) { + logger.trace("MEMORY CHECK HT: allocated {} added {} partition {}",allocatedBefore,addedMem,currentPartition); } -BatchHolder bh = batchHolders.get((currentIdx >>> 16) & HashTable.BATCH_MASK); + +// Check if put() added a new batch (for the keys) inside the hash table, hence a matching batch +// (for the aggregate columns) needs to be created +if ( putStatus == HashTable.PutStatus.NEW_BATCH_ADDED ) { + try { +long allocatedBeforeAggCol = allocator.getAllocatedMemory(); + +addBatchHolder(currentPartition); + +if ( plannedBatches > 0 ) { plannedBatches--; } // just allocated a planned batch +long totalAddedMem = allocator.getAllocatedMemory() - allocatedBefore; +logger.trace("MEMORY CHECK AGG: added {} total (with HT) added {}",allocator.getAllocatedMemory()-allocatedBeforeAggCol,totalAddedMem); +// resize the batch estimate if needed (e.g., varchars may take more memory than estimated) +if ( totalAddedMem > estMaxBatchSize ) { + logger.trace("Adjusting Batch size estimate from {} to {}",estMaxBatchSize,totalAddedMem); + estMaxBatchSize = totalAddedMem; +} + } catch (OutOfMemoryException exc) { 
+throw new OutOfMemoryException(getOOMErrorMsg(), exc); // may happen when can not spill + } +} +BatchHolder bh = batchHolders[currentPartition].get((currentIdx >>> 16) & HashTable.BATCH_MASK); int idxWithinBatch = currentIdx & HashTable.BATCH_MASK; +if (bh.updateAggrValues(incomingRowIdx, idxWithinBatch)) { + numGroupedRecords++; +} + +// === +// If the last batch just became full - that is the time to check the memory limits !! +
[GitHub] drill pull request #822: DRILL-5457: Spill implementation for Hash Aggregate
Github user Ben-Zvi commented on a diff in the pull request: https://github.com/apache/drill/pull/822#discussion_r117865368 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java --- @@ -546,44 +1254,204 @@ private void checkGroupAndAggrValues(int incomingRowIdx) { holder.value = vv0.getAccessor().get(incomingRowIdx) ; } */ +/* +if ( handlingSpills && ( incomingRowIdx == 0 ) ) { + // for debugging -- show the first row from a spilled batch + Object tmp0 = (incoming).getValueAccessorById(NullableVarCharVector.class, 0).getValueVector(); + Object tmp1 = (incoming).getValueAccessorById(NullableVarCharVector.class, 1).getValueVector(); + Object tmp2 = (incoming).getValueAccessorById(NullableBigIntVector.class, 2).getValueVector(); + + if (tmp0 != null && tmp1 != null && tmp2 != null) { +NullableVarCharVector vv0 = ((NullableVarCharVector) tmp0); +NullableVarCharVector vv1 = ((NullableVarCharVector) tmp1); +NullableBigIntVector vv2 = ((NullableBigIntVector) tmp2); +logger.debug("The first row = {} , {} , {}", vv0.getAccessor().get(incomingRowIdx), vv1.getAccessor().get(incomingRowIdx), vv2.getAccessor().get(incomingRowIdx)); + } +} +*/ +// The hash code is computed once, then its lower bits are used to determine the +// partition to use, and the higher bits determine the location in the hash table. +int hashCode; +try { + htables[0].updateBatches(); + hashCode = htables[0].getHashCode(incomingRowIdx); +} catch (SchemaChangeException e) { + throw new IllegalStateException("Unexpected schema change", e); +} -htable.put(incomingRowIdx, htIdxHolder, 1 /* retry count */); +// right shift hash code for secondary (or tertiary...) 
spilling +for (int i = 0; i < cycleNum; i++) { hashCode >>>= bitsInMask; } +int currentPartition = hashCode & partitionMask ; +hashCode >>>= bitsInMask; +HashTable.PutStatus putStatus = null; +long allocatedBefore = allocator.getAllocatedMemory(); + +// Insert the key columns into the hash table +try { + putStatus = htables[currentPartition].put(incomingRowIdx, htIdxHolder, hashCode); +} catch (OutOfMemoryException exc) { + throw new OutOfMemoryException(getOOMErrorMsg(), exc); // may happen when can not spill +} catch (SchemaChangeException e) { + throw new IllegalStateException("Unexpected schema change", e); +} int currentIdx = htIdxHolder.value; -// get the batch index and index within the batch -if (currentIdx >= batchHolders.size() * HashTable.BATCH_SIZE) { - addBatchHolder(); +long addedMem = allocator.getAllocatedMemory() - allocatedBefore; +if ( addedMem > 0 ) { + logger.trace("MEMORY CHECK HT: allocated {} added {} partition {}",allocatedBefore,addedMem,currentPartition); } -BatchHolder bh = batchHolders.get((currentIdx >>> 16) & HashTable.BATCH_MASK); + +// Check if put() added a new batch (for the keys) inside the hash table, hence a matching batch +// (for the aggregate columns) needs to be created +if ( putStatus == HashTable.PutStatus.NEW_BATCH_ADDED ) { + try { +long allocatedBeforeAggCol = allocator.getAllocatedMemory(); + +addBatchHolder(currentPartition); + +if ( plannedBatches > 0 ) { plannedBatches--; } // just allocated a planned batch +long totalAddedMem = allocator.getAllocatedMemory() - allocatedBefore; +logger.trace("MEMORY CHECK AGG: added {} total (with HT) added {}",allocator.getAllocatedMemory()-allocatedBeforeAggCol,totalAddedMem); +// resize the batch estimate if needed (e.g., varchars may take more memory than estimated) +if ( totalAddedMem > estMaxBatchSize ) { + logger.trace("Adjusting Batch size estimate from {} to {}",estMaxBatchSize,totalAddedMem); + estMaxBatchSize = totalAddedMem; +} + } catch (OutOfMemoryException exc) { --- 
End diff -- addBatchHolder() calls newBatchHolder(), which should allocate the new batch using the allocator, and hence (I think) may hit an OOM if not enough memory is left.
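The comment in the quoted diff says the hash code is computed once, its lower bits pick the partition, and the remaining higher bits locate the entry inside that partition's hash table, with an extra right shift per spill cycle. A minimal sketch of that bit-splitting follows. The class name `HashPartitionSketch`, the helper names, and the definition `partitionMask = (1 << bitsInMask) - 1` are assumptions made for illustration; the diff does not show how `partitionMask` is derived.

```java
// Illustrative only: not the HashAggTemplate code from the pull request.
final class HashPartitionSketch {

    // Partition index for a row: discard the bits already consumed by earlier
    // spill cycles, then keep the lowest bitsInMask bits.
    static int partitionOf(int hashCode, int cycleNum, int bitsInMask) {
        int partitionMask = (1 << bitsInMask) - 1; // assumed mask definition
        for (int i = 0; i < cycleNum; i++) {
            hashCode >>>= bitsInMask;
        }
        return hashCode & partitionMask;
    }

    // Bits left for the hash table itself: everything above the bits used
    // by earlier cycles and by the current partition choice.
    static int hashTableBits(int hashCode, int cycleNum, int bitsInMask) {
        for (int i = 0; i <= cycleNum; i++) {
            hashCode >>>= bitsInMask;
        }
        return hashCode;
    }

    public static void main(String[] args) {
        int hash = 0xABCD1234;
        int bitsInMask = 4; // 16 partitions
        for (int cycle = 0; cycle < 3; cycle++) {
            System.out.printf("cycle %d: partition=%d tableBits=0x%08X%n",
                cycle,
                partitionOf(hash, cycle, bitsInMask),
                hashTableBits(hash, cycle, bitsInMask));
        }
    }
}
```

Using the unsigned shift `>>>` matters here: a negative hash code shifted with `>>` would keep its sign bits and could produce a negative value where a non-negative one is expected.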
[GitHub] drill pull request #822: DRILL-5457: Spill implementation for Hash Aggregate
Github user Ben-Zvi commented on a diff in the pull request: https://github.com/apache/drill/pull/822#discussion_r117863850 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java --- @@ -546,44 +1254,204 @@ private void checkGroupAndAggrValues(int incomingRowIdx) { holder.value = vv0.getAccessor().get(incomingRowIdx) ; } */ +/* +if ( handlingSpills && ( incomingRowIdx == 0 ) ) { + // for debugging -- show the first row from a spilled batch + Object tmp0 = (incoming).getValueAccessorById(NullableVarCharVector.class, 0).getValueVector(); + Object tmp1 = (incoming).getValueAccessorById(NullableVarCharVector.class, 1).getValueVector(); + Object tmp2 = (incoming).getValueAccessorById(NullableBigIntVector.class, 2).getValueVector(); + + if (tmp0 != null && tmp1 != null && tmp2 != null) { +NullableVarCharVector vv0 = ((NullableVarCharVector) tmp0); +NullableVarCharVector vv1 = ((NullableVarCharVector) tmp1); +NullableBigIntVector vv2 = ((NullableBigIntVector) tmp2); +logger.debug("The first row = {} , {} , {}", vv0.getAccessor().get(incomingRowIdx), vv1.getAccessor().get(incomingRowIdx), vv2.getAccessor().get(incomingRowIdx)); + } +} +*/ +// The hash code is computed once, then its lower bits are used to determine the +// partition to use, and the higher bits determine the location in the hash table. +int hashCode; +try { + htables[0].updateBatches(); + hashCode = htables[0].getHashCode(incomingRowIdx); +} catch (SchemaChangeException e) { + throw new IllegalStateException("Unexpected schema change", e); +} -htable.put(incomingRowIdx, htIdxHolder, 1 /* retry count */); +// right shift hash code for secondary (or tertiary...) 
spilling +for (int i = 0; i < cycleNum; i++) { hashCode >>>= bitsInMask; } +int currentPartition = hashCode & partitionMask ; +hashCode >>>= bitsInMask; +HashTable.PutStatus putStatus = null; +long allocatedBefore = allocator.getAllocatedMemory(); + +// Insert the key columns into the hash table +try { + putStatus = htables[currentPartition].put(incomingRowIdx, htIdxHolder, hashCode); +} catch (OutOfMemoryException exc) { + throw new OutOfMemoryException(getOOMErrorMsg(), exc); // may happen when can not spill +} catch (SchemaChangeException e) { + throw new IllegalStateException("Unexpected schema change", e); +} int currentIdx = htIdxHolder.value; -// get the batch index and index within the batch -if (currentIdx >= batchHolders.size() * HashTable.BATCH_SIZE) { - addBatchHolder(); +long addedMem = allocator.getAllocatedMemory() - allocatedBefore; +if ( addedMem > 0 ) { + logger.trace("MEMORY CHECK HT: allocated {} added {} partition {}",allocatedBefore,addedMem,currentPartition); } -BatchHolder bh = batchHolders.get((currentIdx >>> 16) & HashTable.BATCH_MASK); + +// Check if put() added a new batch (for the keys) inside the hash table, hence a matching batch +// (for the aggregate columns) needs to be created +if ( putStatus == HashTable.PutStatus.NEW_BATCH_ADDED ) { + try { +long allocatedBeforeAggCol = allocator.getAllocatedMemory(); + +addBatchHolder(currentPartition); + +if ( plannedBatches > 0 ) { plannedBatches--; } // just allocated a planned batch +long totalAddedMem = allocator.getAllocatedMemory() - allocatedBefore; --- End diff -- totalAddedMem is for both the group-by keys and the aggr columns. allocatedBefore is the initial size of the allocation, then the keys are added to the hash table, (then allocatedBeforeAggCol keeps the size), then a batch holder for the agg-columns is added, and the total is computed. The total is important, like if the incoming batch becomes bigger, and we try to adjust the estimate for the batch size. 
The allocatedBeforeAggCol only gives some tracing refinement to tell which batch grew (the one in the hash table or the one for the agg columns).
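The accounting described in this comment can be sketched as follows: one snapshot before the keys are inserted, a second before the aggregate-column batch is added, and a total that covers both and may raise the batch size estimate. `CountingAllocator` is a hypothetical stand-in that only tracks a byte count; it is not Drill's allocator, and the byte sizes are made up.

```java
// Illustrative only: a toy model of the before/after allocation snapshots.
final class AllocationDeltaSketch {

    // Hypothetical stand-in for an allocator; it only counts bytes.
    static final class CountingAllocator {
        private long allocated;
        void allocate(long bytes) { allocated += bytes; }
        long getAllocatedMemory() { return allocated; }
    }

    // Returns { bytes added for agg columns, total bytes added, new estimate }.
    static long[] measure(CountingAllocator allocator, long keyBatchBytes,
                          long aggBatchBytes, long estMaxBatchSize) {
        long allocatedBefore = allocator.getAllocatedMemory();
        allocator.allocate(keyBatchBytes);   // keys go into the hash table

        long allocatedBeforeAggCol = allocator.getAllocatedMemory();
        allocator.allocate(aggBatchBytes);   // matching batch for agg columns

        long aggAdded = allocator.getAllocatedMemory() - allocatedBeforeAggCol;
        long totalAddedMem = allocator.getAllocatedMemory() - allocatedBefore;

        // Grow the estimate when the observed total exceeds it.
        long newEstimate = Math.max(estMaxBatchSize, totalAddedMem);
        return new long[] { aggAdded, totalAddedMem, newEstimate };
    }

    public static void main(String[] args) {
        long[] r = measure(new CountingAllocator(), 300, 200, 400);
        System.out.println("agg=" + r[0] + " total=" + r[1] + " estimate=" + r[2]);
    }
}
```

The second snapshot only refines the tracing; the estimate adjustment depends on the total alone.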
[GitHub] drill pull request #822: DRILL-5457: Spill implementation for Hash Aggregate
Github user Ben-Zvi commented on a diff in the pull request: https://github.com/apache/drill/pull/822#discussion_r117862000 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java --- @@ -546,44 +1254,204 @@ private void checkGroupAndAggrValues(int incomingRowIdx) { holder.value = vv0.getAccessor().get(incomingRowIdx) ; } */ +/* +if ( handlingSpills && ( incomingRowIdx == 0 ) ) { --- End diff -- This debugging code is written for a specific schema (VarChar,VarChar,Bigint); placing it inside EXTRA_DEBUG_1 may create the impression that it is generic debug code. Leaving it as a comment/sample allows for easy rewriting to print out data of any other schema.
[GitHub] drill pull request #822: DRILL-5457: Spill implementation for Hash Aggregate
Github user Ben-Zvi commented on a diff in the pull request: https://github.com/apache/drill/pull/822#discussion_r117861343 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java --- @@ -400,114 +782,411 @@ public IterOutcome getOutcome() { @Override public int getOutputCount() { -// return outputCount; return lastBatchOutputCount; } @Override public void cleanup() { -if (htable != null) { - htable.clear(); - htable = null; -} + if ( schema == null ) { return; } // not set up; nothing to clean + for ( int i = 0; i < numPartitions; i++) { + if (htables[i] != null) { + htables[i].clear(); + htables[i] = null; + } + if ( batchHolders[i] != null) { + for (BatchHolder bh : batchHolders[i]) { +bh.clear(); + } + batchHolders[i].clear(); + batchHolders[i] = null; + } + + // delete any (still active) output spill file + if ( outputStream[i] != null && spillFiles[i] != null) { +try { + spillSet.delete(spillFiles[i]); +} catch(IOException e) { + logger.warn("Cleanup: Failed to delete spill file {}",spillFiles[i]); +} + } + } + // delete any spill file left in unread spilled partitions + while ( ! 
spilledPartitionsList.isEmpty() ) { +SpilledPartition sp = spilledPartitionsList.remove(0); +try { + spillSet.delete(sp.spillFile); +} catch(IOException e) { + logger.warn("Cleanup: Failed to delete spill file {}",sp.spillFile); +} + } + spillSet.close(); // delete the spill directory(ies) htIdxHolder = null; materializedValueFields = null; outStartIdxHolder = null; outNumRecordsHolder = null; + } -if (batchHolders != null) { - for (BatchHolder bh : batchHolders) { + // First free the memory used by the given (spilled) partition (i.e., hash table plus batches) + // then reallocate them in pristine state to allow the partition to continue receiving rows + private void reinitPartition(int part) throws SchemaChangeException, ClassTransformationException, IOException { +assert htables[part] != null; +htables[part].reset(); +if ( batchHolders[part] != null) { + for (BatchHolder bh : batchHolders[part]) { bh.clear(); } - batchHolders.clear(); - batchHolders = null; + batchHolders[part].clear(); } +batchHolders[part] = new ArrayList(); // First BatchHolder is created when the first put request is received. } -// private final AggOutcome setOkAndReturn() { -//this.outcome = IterOutcome.OK; -//for (VectorWrapper v : outgoing) { -// v.getValueVector().getMutator().setValueCount(outputCount); -//} -//return AggOutcome.RETURN_OUTCOME; -// } private final void incIndex() { underlyingIndex++; if (underlyingIndex >= incoming.getRecordCount()) { currentIndex = Integer.MAX_VALUE; return; } -currentIndex = getVectorIndex(underlyingIndex); +try { currentIndex = getVectorIndex(underlyingIndex); } +catch (SchemaChangeException sc) { throw new DrillRuntimeException(sc);} } private final void resetIndex() { underlyingIndex = -1; incIndex(); } - private void addBatchHolder() { + private boolean isSpilled(int part) { +return outputStream[part] != null; + } + /** + * Which partition to choose for flushing out (i.e. spill or return) ? 
+ * - The current partition (to which a new bach holder is added) has a priority, + * because its last batch holder is full. + * - Also the largest prior spilled partition has some priority, as it is already spilled; + * but spilling too few rows (e.g. a single batch) gets us nothing. + * - So the largest non-spilled partition has some priority, to get more memory freed. + * Need to weigh the above three options. + * + * @param currPart - The partition that hit the memory limit (gets a priority) + * @return The partition (number) chosen to be spilled + */ + private int chooseAPartitionToFlush(int currPart) { +if ( ! is2ndPhase ) { return currPart; } // 1st phase: just use the current partition +int currPartSize = batchHolders[currPart].size(); +if ( currPartSize == 1 ) { currPartSize = -1; } // don't pick current
[jira] [Created] (DRILL-5532) TPCH Query 16 failed when planner.width.max_per_node <10
Dechang Gu created DRILL-5532: - Summary: TPCH Query 16 failed when planner.width.max_per_node <10 Key: DRILL-5532 URL: https://issues.apache.org/jira/browse/DRILL-5532 Project: Apache Drill Issue Type: Bug Components: Functions - Drill Affects Versions: 1.11.0 Environment: 10 node cluster, rhel6.4 Reporter: Dechang Gu Assignee: Jinfeng Ni When planner.width.max_per_node is set below 10 (tried 6, 8, and 9), TPCH query 16 fails with the following error: {code} java.sql.SQLException: SYSTEM ERROR: NullPointerException Fragment 1:1 [Error Id: ba4661b4-2ff6-4eff-889c-fc4604a57418 on ucs-node8.perf.lab:31010] (java.lang.NullPointerException) null org.apache.drill.exec.vector.VarCharVector$Mutator.setSafe():607 org.apache.drill.exec.vector.NullableVarCharVector$Mutator.setSafe():560 org.apache.drill.exec.test.generated.StreamingAggregatorGen250.outputRecordKeysPrev():525 org.apache.drill.exec.test.generated.StreamingAggregatorGen250.outputToBatchPrev():322 org.apache.drill.exec.test.generated.StreamingAggregatorGen250.doWork():182 org.apache.drill.exec.physical.impl.aggregate.StreamingAggBatch.innerNext():170 org.apache.drill.exec.record.AbstractRecordBatch.next():162 org.apache.drill.exec.physical.impl.BaseRootExec.next():105 org.apache.drill.exec.physical.impl.partitionsender.PartitionSenderRootExec.innerNext():144 org.apache.drill.exec.physical.impl.BaseRootExec.next():95 org.apache.drill.exec.work.fragment.FragmentExecutor$1.run():234 org.apache.drill.exec.work.fragment.FragmentExecutor$1.run():227 java.security.AccessController.doPrivileged():-2 javax.security.auth.Subject.doAs():415 org.apache.hadoop.security.UserGroupInformation.doAs():1595 org.apache.drill.exec.work.fragment.FragmentExecutor.run():227 org.apache.drill.common.SelfCleaningRunnable.run():38 java.util.concurrent.ThreadPoolExecutor.runWorker():1145 java.util.concurrent.ThreadPoolExecutor$Worker.run():615 java.lang.Thread.run():745 at 
org.apache.drill.jdbc.impl.DrillCursor.nextRowInternally(DrillCursor.java:489) at org.apache.drill.jdbc.impl.DrillCursor.next(DrillCursor.java:593) at org.apache.calcite.avatica.AvaticaResultSet.next(AvaticaResultSet.java:215) at org.apache.drill.jdbc.impl.DrillResultSetImpl.next(DrillResultSetImpl.java:140) at PipSQueak.fetchRows(PipSQueak.java:420) at PipSQueak.runTest(PipSQueak.java:116) at PipSQueak.main(PipSQueak.java:556) Caused by: org.apache.drill.common.exceptions.UserRemoteException: SYSTEM ERROR: NullPointerException Fragment 1:1 [Error Id: ba4661b4-2ff6-4eff-889c-fc4604a57418 on ucs-node8.perf.lab:31010] (java.lang.NullPointerException) null org.apache.drill.exec.vector.VarCharVector$Mutator.setSafe():607 org.apache.drill.exec.vector.NullableVarCharVector$Mutator.setSafe():560 org.apache.drill.exec.test.generated.StreamingAggregatorGen250.outputRecordKeysPrev():525 org.apache.drill.exec.test.generated.StreamingAggregatorGen250.outputToBatchPrev():322 org.apache.drill.exec.test.generated.StreamingAggregatorGen250.doWork():182 org.apache.drill.exec.physical.impl.aggregate.StreamingAggBatch.innerNext():170 org.apache.drill.exec.record.AbstractRecordBatch.next():162 org.apache.drill.exec.physical.impl.BaseRootExec.next():105 org.apache.drill.exec.physical.impl.partitionsender.PartitionSenderRootExec.innerNext():144 org.apache.drill.exec.physical.impl.BaseRootExec.next():95 org.apache.drill.exec.work.fragment.FragmentExecutor$1.run():234 org.apache.drill.exec.work.fragment.FragmentExecutor$1.run():227 java.security.AccessController.doPrivileged():-2 javax.security.auth.Subject.doAs():415 org.apache.hadoop.security.UserGroupInformation.doAs():1595 org.apache.drill.exec.work.fragment.FragmentExecutor.run():227 org.apache.drill.common.SelfCleaningRunnable.run():38 java.util.concurrent.ThreadPoolExecutor.runWorker():1145 java.util.concurrent.ThreadPoolExecutor$Worker.run():615 java.lang.Thread.run():745 at 
org.apache.drill.exec.rpc.user.QueryResultHandler.resultArrived(QueryResultHandler.java:123) at org.apache.drill.exec.rpc.user.UserClient.handle(UserClient.java:343) at org.apache.drill.exec.rpc.user.UserClient.handle(UserClient.java:88) at org.apache.drill.exec.rpc.RpcBus$InboundHandler.decode(RpcBus.java:274) at org.apache.drill.exec.rpc.RpcBus$InboundHandler.decode(RpcBus.java:244) at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:89) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339) at
[GitHub] drill issue #828: DRILL-5229: update kudu-client to 1.3.0
Github user sudheeshkatkam commented on the issue: https://github.com/apache/drill/pull/828 @eskabetxe thank you for the pull request! How did you test the patch? I have not looked into the Kudu plugin before, but I noticed that the unit tests are [ignored by default](https://github.com/apache/drill/commit/aa39c66d92ec03c90f3715df7cc4fde49d76a334).
[GitHub] drill pull request #822: DRILL-5457: Spill implementation for Hash Aggregate
Github user Ben-Zvi commented on a diff in the pull request: https://github.com/apache/drill/pull/822#discussion_r117806316 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java --- @@ -266,17 +508,138 @@ public void setup(HashAggregate hashAggrConfig, HashTableConfig htConfig, Fragme } } -ChainedHashTable ht = +spillSet = new SpillSet(context,hashAggrConfig, UserBitShared.CoreOperatorType.HASH_AGGREGATE); +baseHashTable = new ChainedHashTable(htConfig, context, allocator, incoming, null /* no incoming probe */, outgoing); -this.htable = ht.createAndSetupHashTable(groupByOutFieldIds); - +this.groupByOutFieldIds = groupByOutFieldIds; // retain these for delayedSetup, and to allow recreating hash tables (after a spill) numGroupByOutFields = groupByOutFieldIds.length; -batchHolders = new ArrayList(); -// First BatchHolder is created when the first put request is received. doSetup(incoming); } + /** + * Delayed setup are the parts from setup() that can only be set after actual data arrives in incoming + * This data is used to compute the number of partitions. + */ + private void delayedSetup() { + +// Set the number of partitions from the configuration (raise to a power of two, if needed) +numPartitions = context.getConfig().getInt(ExecConstants.HASHAGG_NUM_PARTITIONS_KEY); +if ( numPartitions == 1 ) { + canSpill = false; + logger.warn("Spilling was disabled"); --- End diff -- There was a user intervention: the setting of num_partitions to 1! For clarity, though, the message has now been changed to "Spilling was disabled due to configuration setting of num_partitions to 1".
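The quoted `delayedSetup()` comment says the configured partition count is raised to a power of two if needed, and that a count of 1 disables spilling. A minimal sketch of those two rules, assuming positive inputs no larger than 2^30; the class and method names are illustrative and the rounding here is not necessarily how the pull request implements it.

```java
// Illustrative only: rounding a configured partition count.
final class PartitionCountSketch {

    // Smallest power of two that is >= n, for 1 <= n <= 2^30.
    static int roundUpToPowerOf2(int n) {
        if (n <= 1) {
            return 1;
        }
        return Integer.highestOneBit(n - 1) << 1;
    }

    // With a single partition there is nothing to spill selectively.
    static boolean canSpill(int numPartitions) {
        return numPartitions > 1;
    }

    public static void main(String[] args) {
        for (int configured : new int[] { 1, 3, 32, 33 }) {
            int numPartitions = roundUpToPowerOf2(configured);
            System.out.println(configured + " -> " + numPartitions
                + " partitions, canSpill=" + canSpill(numPartitions));
        }
    }
}
```

A power-of-two count lets the partition be selected with a bit mask on the hash code rather than a modulo.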
[GitHub] drill pull request #822: DRILL-5457: Spill implementation for Hash Aggregate
Github user Ben-Zvi commented on a diff in the pull request: https://github.com/apache/drill/pull/822#discussion_r117804950 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java --- @@ -497,6 +510,21 @@ public int numResizing() { return numResizing; } + /** + * + * @return Size of extra memory needed if the HT (i.e. startIndices) is doubled + */ + @Override + public int extraMemoryNeededForResize() { --- End diff -- Good observation!! Fixed.
[GitHub] drill pull request #822: DRILL-5457: Spill implementation for Hash Aggregate
Github user Ben-Zvi commented on a diff in the pull request: https://github.com/apache/drill/pull/822#discussion_r117801284 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java --- @@ -544,86 +572,69 @@ private static int roundUpToPowerOf2(int number) { return rounded; } - @Override - public void put(int incomingRowIdx, IndexPointer htIdxHolder, int retryCount) { -put(incomingRowIdx, htIdxHolder); + public int getHashCode(int incomingRowIdx) throws SchemaChangeException { +return getHashBuild(incomingRowIdx); } - private PutStatus put(int incomingRowIdx, IndexPointer htIdxHolder) { + /** put() uses the hash code (from gethashCode() above) to insert the key(s) from the incoming + * row into the hash table. The code selects the bucket in the startIndices, then the keys are + * placed into the chained list - by storing the key values into a batch, and updating its + * "links" member. Last it modifies the index holder to the batch offset so that the caller + * can store the remaining parts of the row into a matching batch (outside the hash table). 
+ * Returning + * + * @param incomingRowIdx - position of the incoming row + * @param htIdxHolder - to return batch + batch-offset (for caller to manage a matching batch) + * @param hashCode - computed over the key(s) by calling getHashCode() + * @return Status - the key(s) was ADDED or was already PRESENT + */ + @Override + public PutStatus put(int incomingRowIdx, IndexPointer htIdxHolder, int hashCode) throws SchemaChangeException { -int hash = getHashBuild(incomingRowIdx); -int i = getBucketIndex(hash, numBuckets()); -int startIdx = startIndices.getAccessor().get(i); +int bucketIndex = getBucketIndex(hashCode, numBuckets()); +int startIdx = startIndices.getAccessor().get(bucketIndex); int currentIdx; -int currentIdxWithinBatch; -BatchHolder bh; BatchHolder lastEntryBatch = null; int lastEntryIdxWithinBatch = EMPTY_SLOT; +// if startIdx is non-empty, follow the hash chain links until we find a matching +// key or reach the end of the chain (and remember the last link there) +for ( currentIdxHolder.value = startIdx; + currentIdxHolder.value != EMPTY_SLOT; + /* isKeyMatch() below also advances the currentIdxHolder to the next link */) { -if (startIdx == EMPTY_SLOT) { - // this is the first entry in this bucket; find the first available slot in the - // container of keys and values - currentIdx = freeIndex++; - addBatchIfNeeded(currentIdx); + // remember the current link, which would be the last when the next link is empty + lastEntryBatch = batchHolders.get((currentIdxHolder.value >>> 16) & HashTable.BATCH_MASK); + lastEntryIdxWithinBatch = currentIdxHolder.value & BATCH_MASK; - if (EXTRA_DEBUG) { -logger.debug("Empty bucket index = {}. 
incomingRowIdx = {}; inserting new entry at currentIdx = {}.", i, -incomingRowIdx, currentIdx); + if (lastEntryBatch.isKeyMatch(incomingRowIdx, currentIdxHolder, false)) { +htIdxHolder.value = currentIdxHolder.value; +return PutStatus.KEY_PRESENT; } - - insertEntry(incomingRowIdx, currentIdx, hash, lastEntryBatch, lastEntryIdxWithinBatch); - // update the start index array - startIndices.getMutator().setSafe(getBucketIndex(hash, numBuckets()), currentIdx); - htIdxHolder.value = currentIdx; - return PutStatus.KEY_ADDED; } -currentIdx = startIdx; -boolean found = false; - -bh = batchHolders.get((currentIdx >>> 16) & BATCH_MASK); -currentIdxHolder.value = currentIdx; - -// if startIdx is non-empty, follow the hash chain links until we find a matching -// key or reach the end of the chain -while (true) { - currentIdxWithinBatch = currentIdxHolder.value & BATCH_MASK; +// no match was found, so insert a new entry +currentIdx = freeIndex++; +boolean addedBatch = addBatchIfNeeded(currentIdx); - if (bh.isKeyMatch(incomingRowIdx, currentIdxHolder, false)) { -htIdxHolder.value = currentIdxHolder.value; -found = true; -break; - } else if (currentIdxHolder.value == EMPTY_SLOT) { -lastEntryBatch = bh; -lastEntryIdxWithinBatch = currentIdxWithinBatch; -break; - } else { -bh = batchHolders.get((currentIdxHolder.value >>> 16) & HashTable.BATCH_MASK); -lastEntryBatch = bh; - } +if (EXTRA_DEBUG) { +
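The reworked `put()` in the quoted diff walks the bucket's chain once, returns early on a key match, and otherwise appends a new entry after the last link it saw. A much simplified sketch of that control flow is below. It uses plain `int` keys and in-memory lists instead of value vectors and batch holders, so `ChainedPutSketch`, its fields, and the two-value `PutStatus` are stand-ins, not the HashTableTemplate types.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustrative only: a toy chained hash table with the same put() shape.
final class ChainedPutSketch {
    enum PutStatus { KEY_PRESENT, KEY_ADDED }

    static final int EMPTY_SLOT = -1;

    private final int[] startIndices;                     // head of each chain
    private final List<Integer> keys = new ArrayList<>();  // stored keys
    private final List<Integer> links = new ArrayList<>(); // next entry in chain

    ChainedPutSketch(int numBuckets) { // numBuckets must be a power of two
        startIndices = new int[numBuckets];
        Arrays.fill(startIndices, EMPTY_SLOT);
    }

    // idxHolder[0] receives the entry index so the caller can update a
    // matching structure kept outside the hash table.
    PutStatus put(int key, int hashCode, int[] idxHolder) {
        int bucket = hashCode & (startIndices.length - 1);
        int last = EMPTY_SLOT;
        for (int cur = startIndices[bucket]; cur != EMPTY_SLOT; cur = links.get(cur)) {
            if (keys.get(cur) == key) {
                idxHolder[0] = cur;
                return PutStatus.KEY_PRESENT;
            }
            last = cur; // remember the tail in case no match is found
        }
        int idx = keys.size(); // next free slot
        keys.add(key);
        links.add(EMPTY_SLOT);
        if (last == EMPTY_SLOT) {
            startIndices[bucket] = idx; // first entry in this bucket
        } else {
            links.set(last, idx);       // append to the end of the chain
        }
        idxHolder[0] = idx;
        return PutStatus.KEY_ADDED;
    }

    public static void main(String[] args) {
        ChainedPutSketch table = new ChainedPutSketch(8);
        int[] holder = new int[1];
        System.out.println(table.put(7, 1, holder) + " at " + holder[0]);
        System.out.println(table.put(9, 1, holder) + " at " + holder[0]);
        System.out.println(table.put(7, 1, holder) + " at " + holder[0]);
    }
}
```

Folding the empty-bucket case into the loop, as the diff does, removes the separate first-entry branch: an empty bucket simply means the loop body never runs.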
Re: [ANNOUNCE] New Committer: Paul Rogers
Congrats Paul! On Fri, May 19, 2017 at 11:36 AM, Vitalii Diravka wrote: > Congratulations Paul! Really well deserved! > > Kind regards > Vitalii > > On Fri, May 19, 2017 at 6:31 PM, Parth Chandra wrote: > > > I think it's time to put a link to Paul's wiki in the Apache Drill web > > site. > > > > On Fri, May 19, 2017 at 11:16 AM, Sudheesh Katkam > > wrote: > > > > > Forgot to mention, not many developers know about this: > > > https://github.com/paul-rogers/drill/wiki > > > > > > So thank you Paul, for that informative wiki, and all your > contributions. > > > > > > On May 19, 2017, at 10:50 AM, Paul Rogers > > r...@mapr.com>> wrote: > > > > > > Thanks everyone! > > > > > > - Paul > > > > > > On May 19, 2017, at 10:30 AM, Kunal Khatua kkhat > > > u...@mapr.com>> wrote: > > > > > > Congratulations, Paul!! Thank you for your contributions! > > > > > > > > > From: Khurram Faraaz > > > > Sent: Friday, May 19, 2017 10:07:09 AM > > > To: dev > > > Subject: Re: [ANNOUNCE] New Committer: Paul Rogers > > > > > > Congratulations, Paul! > > > > > > > > > From: Bridget Bevens > > > > Sent: Friday, May 19, 2017 10:29:29 PM > > > To: dev > > > Subject: Re: [ANNOUNCE] New Committer: Paul Rogers > > > > > > Congratulations, Paul! > > > > > > > > > From: Jinfeng Ni > > > > Sent: Friday, May 19, 2017 9:57:35 AM > > > To: dev > > > Subject: Re: [ANNOUNCE] New Committer: Paul Rogers > > > > > > Congratulations, Paul! > > > > > > > > > On Fri, May 19, 2017 at 9:36 AM, Aman Bawa > > mapr.com>> wrote: > > > > > > Congratulations, Paul! > > > > > > On 5/19/17, 8:22 AM, "Aman Sinha" > mansi...@apache.org>> wrote: > > > > > > The Project Management Committee (PMC) for Apache Drill has invited > > > Paul > > > Rogers to become a committer, and we are pleased to announce that he > > > has > > > accepted. > > > > > > Paul has a long list of contributions that have touched many aspects > > > of the > > > product. > > > > > > Welcome Paul, and thank you for your contributions. 
Keep up the good > > > work! > > > > > > - Aman > > > > > > (on behalf of the Apache Drill PMC)
[GitHub] drill pull request #818: DRILL-5140: Fix CompileException in run-time genera...
Github user vvysotskyi commented on a diff in the pull request:

    https://github.com/apache/drill/pull/818#discussion_r117708028

    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/expr/ClassGenerator.java ---
    @@ -77,10 +81,43 @@ private final CodeGenerator codeGenerator; public final JDefinedClass clazz; - private final LinkedList[] blocks; + private final JCodeModel model; private final OptionSet optionManager; + private ClassGenerator innerClassGenerator; + private LinkedList[] blocks; + private LinkedList[] oldBlocks; + + /** + * Assumed that field has 3 indexes within the constant pull: index of the CONSTANT_Fieldref_info +
    --- End diff --

    This calculation takes into account that CONSTANT_NameAndType_info.descriptor has a limited range of values, so three entries are counted for each class field and method. The formula assumes that each class field and local variable uses a different literal value, and that each literal takes two entries. I agree with you that there may be cases this formula does not cover. The formula is needed at least to account for the number of generated methods and for the difference in entry counts between class fields and local variables. The 'magic' number 1000 reserves constant pool space for class references and unaccounted cases.
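The estimate described in the comment above can be sketched roughly as follows. This is a minimal illustration under stated assumptions, not the code from the pull request: the class name, method names, and the exact way fields, methods, and local variables are combined are all hypothetical. Only the per-item counts (3 entries per field or method, 2 entries per literal, 1000 reserved) come from the comment, and the upper bound follows from constant_pool_count being a 16-bit value in the class file format.

```java
/**
 * Hypothetical sketch of a constant pool size estimate.
 * Names and structure are assumptions, not Drill's ClassGenerator code.
 */
final class ConstantPoolEstimate {

    /** constant_pool_count is an unsigned 16-bit field, so it cannot exceed 65535. */
    static final int MAX_CONSTANT_POOL_COUNT = 65535;

    /** Entries counted per class field or method (from the review comment). */
    static final int ENTRIES_PER_MEMBER = 3;

    /** Entries counted per distinct literal value (from the review comment). */
    static final int ENTRIES_PER_LITERAL = 2;

    /** Reserve for class references and unaccounted cases (the "magic" number). */
    static final int RESERVED_ENTRIES = 1000;

    private ConstantPoolEstimate() { }

    /**
     * Upper estimate of constant pool entries, assuming every field and every
     * local variable uses its own literal value.
     */
    public static long estimateEntries(int fields, int methods, int localVariables) {
        long memberEntries = (long) ENTRIES_PER_MEMBER * ((long) fields + methods);
        long literalEntries = (long) ENTRIES_PER_LITERAL * ((long) fields + localVariables);
        return memberEntries + literalEntries + RESERVED_ENTRIES;
    }

    /** True when the estimate no longer fits in a single class file. */
    public static boolean exceedsLimit(int fields, int methods, int localVariables) {
        return estimateEntries(fields, methods, localVariables) > MAX_CONSTANT_POOL_COUNT;
    }

    public static void main(String[] args) {
        System.out.println(estimateEntries(100, 20, 500));   // 2560
        System.out.println(exceedsLimit(100, 20, 500));      // false
        System.out.println(exceedsLimit(12000, 50, 5000));   // true
    }
}
```

A generator could consult a check like `exceedsLimit` before adding more members to a class and move further code into an inner class once the estimate gets close to the limit. Whether the pull request does exactly this is not shown in the quoted diff.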
[jira] [Resolved] (DRILL-5397) Random Error : Unable to get holder type for minor type [LATE] and mode [OPTIONAL]
[ https://issues.apache.org/jira/browse/DRILL-5397?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Volodymyr Vysotskyi resolved DRILL-5397.
    Resolution: Fixed
    Fix Version/s: 1.11.0

Fixed in [9972669|https://github.com/apache/drill/commit/9972669df1a78c3d4e60f93b4295ad35b43207f0]

> Random Error : Unable to get holder type for minor type [LATE] and mode [OPTIONAL]
> ----------------------------------------------------------------------------------
>
>     Key: DRILL-5397
>     URL: https://issues.apache.org/jira/browse/DRILL-5397
>     Project: Apache Drill
>     Issue Type: Bug
>     Components: Execution - Data Types, Storage - JSON
>     Affects Versions: 1.10.0
>     Reporter: Rahul Challapalli
>     Fix For: 1.11.0
>
> git.commit.id.abbrev=38ef562
> The below query did not fail when running sequentially. However when I ran the test suite at [1], which contains the below query, by using 50 threads submitting queries concurrently, I hit the below error
> {code}
> select kvgen(bldgs[0]) from (select kvgen(geo.features[0].location.bldgs) bldgs from `json_kvgenflatten/nested.json` geo)
> Failed with exception
> java.sql.SQLException: SYSTEM ERROR: UnsupportedOperationException: Unable to get holder type for minor type [LATE] and mode [OPTIONAL]
> Fragment 0:0
> [Error Id: 67223a94-b24b-4bde-a87a-743b093b23a6 on qa-node183.qa.lab:31010]
>   (java.lang.UnsupportedOperationException) Unable to get holder type for minor type [LATE] and mode [OPTIONAL]
>     org.apache.drill.exec.expr.TypeHelper.getHolderType():602
>     org.apache.drill.exec.expr.ClassGenerator.getHolderType():666
>     org.apache.drill.exec.expr.ClassGenerator.declare():368
>     org.apache.drill.exec.expr.ClassGenerator.declare():364
>     org.apache.drill.exec.expr.EvaluationVisitor$EvalVisitor.visitUnknown():349
>     org.apache.drill.exec.expr.EvaluationVisitor$ConstantFilter.visitUnknown():1320
>     org.apache.drill.exec.expr.EvaluationVisitor$CSEFilter.visitUnknown():1026
>     org.apache.drill.exec.expr.EvaluationVisitor$CSEFilter.visitUnknown():795
>     org.apache.drill.common.expression.visitors.AbstractExprVisitor.visitNullConstant():162
>     org.apache.drill.exec.expr.EvaluationVisitor$CSEFilter.visitNullConstant():1003
>     org.apache.drill.exec.expr.EvaluationVisitor$CSEFilter.visitNullConstant():795
>     org.apache.drill.common.expression.TypedNullConstant.accept():46
>     org.apache.drill.exec.expr.EvaluationVisitor$EvalVisitor.visitFunctionHolderExpression():193
>     org.apache.drill.exec.expr.EvaluationVisitor$ConstantFilter.visitFunctionHolderExpression():1077
>     org.apache.drill.exec.expr.EvaluationVisitor$CSEFilter.visitFunctionHolderExpression():815
>     org.apache.drill.exec.expr.EvaluationVisitor$CSEFilter.visitFunctionHolderExpression():795
>     org.apache.drill.common.expression.FunctionHolderExpression.accept():47
>     org.apache.drill.exec.expr.EvaluationVisitor.addExpr():104
>     org.apache.drill.exec.expr.ClassGenerator.addExpr():261
>     org.apache.drill.exec.physical.impl.project.ProjectRecordBatch.setupNewSchema():458
>     org.apache.drill.exec.record.AbstractSingleRecordBatch.innerNext():78
>     org.apache.drill.exec.physical.impl.project.ProjectRecordBatch.innerNext():135
>     org.apache.drill.exec.record.AbstractRecordBatch.next():162
>     org.apache.drill.exec.physical.impl.validate.IteratorValidatorBatchIterator.next():215
>     org.apache.drill.exec.physical.impl.BaseRootExec.next():104
>     org.apache.drill.exec.physical.impl.ScreenCreator$ScreenRoot.innerNext():81
>     org.apache.drill.exec.physical.impl.BaseRootExec.next():94
>     org.apache.drill.exec.work.fragment.FragmentExecutor$1.run():232
>     org.apache.drill.exec.work.fragment.FragmentExecutor$1.run():226
>     java.security.AccessController.doPrivileged():-2
>     javax.security.auth.Subject.doAs():422
>     org.apache.hadoop.security.UserGroupInformation.doAs():1595
>     org.apache.drill.exec.work.fragment.FragmentExecutor.run():226
>     org.apache.drill.common.SelfCleaningRunnable.run():38
>     java.util.concurrent.ThreadPoolExecutor.runWorker():1142
>     java.util.concurrent.ThreadPoolExecutor$Worker.run():617
>     java.lang.Thread.run():745
>   at org.apache.drill.jdbc.impl.DrillCursor.nextRowInternally(DrillCursor.java:489)
>   at org.apache.drill.jdbc.impl.DrillCursor.loadInitialSchema(DrillCursor.java:561)
>   at org.apache.drill.jdbc.impl.DrillResultSetImpl.execute(DrillResultSetImpl.java:1895)
>   at org.apache.drill.jdbc.impl.DrillResultSetImpl.execute(DrillResultSetImpl.java:61)
>   at oadd.org.apache.calcite.avatica.AvaticaConnection$1.execute(AvaticaConnection.java:473)
>   at org.apache.drill.jdbc.impl.DrillMetaImpl.prepareAndExecute(DrillMetaImpl.java:1100)
>