[ https://issues.apache.org/jira/browse/DRILL-5080?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15854934#comment-15854934 ]
ASF GitHub Bot commented on DRILL-5080: --------------------------------------- Github user paul-rogers commented on a diff in the pull request: https://github.com/apache/drill/pull/717#discussion_r99246295 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/InMemorySorter.java --- @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.physical.impl.xsort.managed; + +import java.util.LinkedList; + +import org.apache.drill.common.exceptions.UserException; +import org.apache.drill.exec.exception.SchemaChangeException; +import org.apache.drill.exec.memory.BufferAllocator; +import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.physical.impl.sort.RecordBatchData; +import org.apache.drill.exec.physical.impl.sort.SortRecordBatchBuilder; +import org.apache.drill.exec.physical.impl.xsort.managed.ExternalSortBatch.SortResults; +import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; +import org.apache.drill.exec.record.VectorAccessible; +import org.apache.drill.exec.record.VectorContainer; +import org.apache.drill.exec.record.selection.SelectionVector4; + +public class InMemorySorter implements SortResults { + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(InMemorySorter.class); + + private SortRecordBatchBuilder builder; + private MSorter mSorter; + private final FragmentContext context; + private final BufferAllocator oAllocator; + private SelectionVector4 sv4; + private final OperatorCodeGenerator opCg; + private int batchCount; + + public InMemorySorter(FragmentContext context, BufferAllocator allocator, OperatorCodeGenerator opCg) { + this.context = context; + this.oAllocator = allocator; + this.opCg = opCg; + } + + public SelectionVector4 sort(LinkedList<BatchGroup.InputBatch> batchGroups, VectorAccessible batch, + VectorContainer destContainer) { + if (builder != null) { + builder.clear(); + builder.close(); + } + builder = new SortRecordBatchBuilder(oAllocator); + + for (BatchGroup.InputBatch group : batchGroups) { + RecordBatchData rbd = new RecordBatchData(group.getContainer(), oAllocator); + rbd.setSv2(group.getSv2()); + builder.add(rbd); + } + batchGroups.clear(); + + try { + builder.build(context, destContainer); + sv4 = builder.getSv4(); + mSorter = opCg.createNewMSorter(batch); + 
mSorter.setup(context, oAllocator, sv4, destContainer, sv4.getCount()); + } catch (SchemaChangeException e) { + throw UserException.unsupportedError(e) + .message("Unexpected schema change - likely code error.") + .build(logger); + } + + // For testing memory-leaks, inject exception after mSorter finishes setup + ExternalSortBatch.injector.injectUnchecked(context.getExecutionControls(), ExternalSortBatch.INTERRUPTION_AFTER_SETUP); + mSorter.sort(destContainer); + + // sort may have prematurely exited due to should continue returning false. + if (!context.shouldContinue()) { + return null; + } + + // For testing memory-leak purpose, inject exception after mSorter finishes sorting + ExternalSortBatch.injector.injectUnchecked(context.getExecutionControls(), ExternalSortBatch.INTERRUPTION_AFTER_SORT); + sv4 = mSorter.getSV4(); + + destContainer.buildSchema(SelectionVectorMode.FOUR_BYTE); + return sv4; + } + + @Override + public boolean next() { + boolean more = sv4.next(); + if (more) { batchCount++; } + return more; + } + + @Override + public void close() { + if (builder != null) { + builder.clear(); + builder.close(); + } + if (mSorter != null) { + mSorter.clear(); + } + } + + @Override + public int getBatchCount() { + return batchCount; + } + + @Override + public int getRecordCount() { + return sv4.getTotalCount(); + } +} --- End diff -- Fixed. > Create a memory-managed version of the External Sort operator > ------------------------------------------------------------- > > Key: DRILL-5080 > URL: https://issues.apache.org/jira/browse/DRILL-5080 > Project: Apache Drill > Issue Type: Improvement > Affects Versions: 1.8.0 > Reporter: Paul Rogers > Assignee: Paul Rogers > Fix For: 1.10.0 > > Attachments: ManagedExternalSortDesign.pdf > > > We propose to create a "managed" version of the external sort operator that > works to a clearly-defined memory limit. Attached is a design specification > for the work. 
> The project will include fixing a number of bugs related to the external > sort, included as sub-tasks of this umbrella task. -- This message was sent by Atlassian JIRA (v6.3.15#6346)