Github user paul-rogers commented on a diff in the pull request: https://github.com/apache/drill/pull/717#discussion_r99066052 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/CopierHolder.java --- @@ -0,0 +1,294 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.physical.impl.xsort.managed; + +import java.util.List; +import java.util.concurrent.TimeUnit; + +import org.apache.drill.common.exceptions.UserException; +import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.exec.exception.SchemaChangeException; +import org.apache.drill.exec.expr.TypeHelper; +import org.apache.drill.exec.memory.BufferAllocator; +import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.physical.impl.xsort.managed.ExternalSortBatch.SortResults; +import org.apache.drill.exec.record.BatchSchema; +import org.apache.drill.exec.record.MaterializedField; +import org.apache.drill.exec.record.VectorAccessible; +import org.apache.drill.exec.record.VectorContainer; +import org.apache.drill.exec.record.VectorWrapper; +import org.apache.drill.exec.vector.ValueVector; + +import com.google.common.base.Stopwatch; + +/** + * Manages a {@link PriorityQueueCopier} instance produced from code generation. + * Provides a wrapper around a copier "session" to simplify reading batches + * from the copier. + */ + +public class CopierHolder { + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(CopierHolder.class); + + private PriorityQueueCopier copier; + + private final FragmentContext context; + private final BufferAllocator allocator; + private OperatorCodeGenerator opCodeGen; + + public CopierHolder(FragmentContext context, BufferAllocator allocator, OperatorCodeGenerator opCodeGen) { + this.context = context; + this.allocator = allocator; + this.opCodeGen = opCodeGen; + } + + /** + * Start a merge operation using a temporary vector container. Used for + * intermediate merges. + * + * @param schema + * @param batchGroupList + * @param targetRecordCount + * @return + */ + + public CopierHolder.BatchMerger startMerge(BatchSchema schema, List<? extends BatchGroup> batchGroupList, int targetRecordCount) { + return new BatchMerger(this, schema, batchGroupList, targetRecordCount); + } + + /** + * Start a merge operation using the specified vector container. Used for + * the final merge operation. + * + * @param schema + * @param batchGroupList + * @param outputContainer + * @param targetRecordCount + * @return + */ + public CopierHolder.BatchMerger startFinalMerge(BatchSchema schema, List<? extends BatchGroup> batchGroupList, VectorContainer outputContainer, int targetRecordCount) { + return new BatchMerger(this, schema, batchGroupList, outputContainer, targetRecordCount); + } + + /** + * Prepare a copier which will write a collection of vectors to disk. The copier + * uses generated code to to the actual writes. If the copier has not yet been + * created, generated code and create it. If it has been created, close it and + * prepare it for a new collection of batches. + * + * @param batch the (hyper) batch of vectors to be copied + * @param batchGroupList same batches as above, but represented as a list + * of individual batches + * @param outputContainer the container into which to copy the batches + * @param allocator allocator to use to allocate memory in the operation + */ + + @SuppressWarnings("unchecked") + private void createCopier(VectorAccessible batch, List<? extends BatchGroup> batchGroupList, VectorContainer outputContainer) { + if (copier != null) { + opCodeGen.closeCopier(); + } else { + copier = opCodeGen.getCopier(batch); + } + + // Initialize the value vectors for the output container using the + // allocator provided + + for (VectorWrapper<?> i : batch) { + @SuppressWarnings("resource") + ValueVector v = TypeHelper.getNewVector(i.getField(), allocator); + outputContainer.add(v); + } + try { + copier.setup(context, allocator, batch, (List<BatchGroup>) batchGroupList, outputContainer); + } catch (SchemaChangeException e) { + throw UserException.unsupportedError(e) + .message("Unexpected schema change - likely code error.") + .build(logger); + } + } + + public BufferAllocator getAllocator() { return allocator; } + + public void close() { + opCodeGen.closeCopier(); + copier = null; + } + + /** + * We've gathered a set of batches, each of which has been sorted. The batches + * may have passed through a filter and thus may have "holes" where rows have + * been filtered out. We will spill records in blocks of targetRecordCount. + * To prepare, copy that many records into an outputContainer as a set of + * contiguous values in new vectors. The result is a single batch with + * vectors that combine a collection of input batches up to the + * given threshold. + * <p> + * Input (selection vector, data vector):<pre> + * [3 7 4 8 0 6 1] [5 3 6 8 2 0] + * [eh_ad_ibf] [r_qm_kn_p]</pre> + * <p> + * Output (assuming blocks of 5 records, data vectors only):<pre> + * [abcde] [fhikm] [npqr]</pre> + * <p> + * The copying operation does a merge as well: copying + * values from the sources in ordered fashion. + * <pre> + * Input: [aceg] [bdfh] + * Output: [abcdefgh]</pre> + * <p> + * Here we bind the copier to the batchGroupList of sorted, buffered batches + * to be merged. We bind the copier output to outputContainer: the copier will write its + * merged "batches" of records to that container. + * <p> + * Calls to the {@link #next()} method sequentially return merged batches + * of the desired row count. + */ + + public static class BatchMerger implements SortResults, AutoCloseable { + + private CopierHolder holder; + private VectorContainer hyperBatch; + private VectorContainer outputContainer; + private int targetRecordCount; + private int copyCount; + private int batchCount; + + /** + * Creates a merger with an temporary output container. + * + * @param holder --- End diff -- Fixed.
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---