[ https://issues.apache.org/jira/browse/PHOENIX-2405?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15363646#comment-15363646 ]
ASF GitHub Bot commented on PHOENIX-2405: ----------------------------------------- Github user maryannxue commented on a diff in the pull request: https://github.com/apache/phoenix/pull/175#discussion_r69668023 --- Diff: phoenix-core/src/main/java/org/apache/phoenix/iterate/SpoolingByteBufferSegmentQueue.java --- @@ -0,0 +1,357 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.phoenix.iterate; + +import org.apache.commons.io.input.CountingInputStream; +import org.apache.commons.io.output.DeferredFileOutputStream; + +import org.apache.phoenix.memory.MemoryManager; +import org.apache.phoenix.memory.MemoryManager.MemoryChunk; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.query.QueryServicesOptions; + + +import java.io.*; +import java.util.AbstractQueue; +import java.util.Iterator; + +import java.util.concurrent.atomic.AtomicInteger; + +import static org.apache.phoenix.monitoring.GlobalClientMetrics.*; + + +public abstract class SpoolingByteBufferSegmentQueue<T> extends AbstractQueue<T> { + + private ResultQueue<T> spoolFrom; + + private boolean closed ; + private boolean flushed; + private DeferredFileOutputStream spoolTo; + private MemoryChunk chunk; + private int size = 0; + private long inMemByteSize = 0L; + private int index; + + + + SpoolingByteBufferSegmentQueue(int index, MemoryManager mm, final int thresholdBytes, String spoolDirectory) { + + long startTime = System.currentTimeMillis(); + chunk = mm.allocate(0, thresholdBytes); + long waitTime = System.currentTimeMillis() - startTime; + GLOBAL_MEMORY_WAIT_TIME.update(waitTime); + + int size = (int)chunk.getSize(); + spoolTo = new DeferredFileOutputStream(size, "ResultSpooler",".bin", new File(spoolDirectory)) { + @Override + protected void thresholdReached() throws IOException { + try { + super.thresholdReached(); + } finally { + chunk.close(); + } + } + }; + + + } + + public int index() { + return this.index; + } + + + + protected abstract InMemoryResultQueue<T> createInMemoryResultQueue(byte[] bytes, MemoryChunk memoryChunk); + + protected abstract OnDiskResultQueue<T> createOnDiskResultQueue(File file); + + @Override + public boolean offer(T t) { + if (closed || flushed){ + return false; + } + boolean result = writeRecord(t, spoolTo); + if(result){ + if(!spoolTo.isInMemory()){ + flushToDisk(); + } + size++; + } + + + return 
result; + } + + protected abstract boolean writeRecord(T t, OutputStream outputStream); + + private void flushToMemory(){ + byte[] data = spoolTo.getData(); + chunk.resize(data.length); + spoolFrom = createInMemoryResultQueue(data, chunk); + GLOBAL_MEMORY_CHUNK_BYTES.update(data.length); + flushed = true; + } + + + private void flushToDisk(){ + long sizeOfSpoolFile = spoolTo.getFile().length(); + GLOBAL_SPOOL_FILE_SIZE.update(sizeOfSpoolFile); + GLOBAL_SPOOL_FILE_COUNTER.increment(); + spoolFrom = createOnDiskResultQueue(spoolTo.getFile()); + if (spoolTo.getFile() != null) { + spoolTo.getFile().deleteOnExit(); + } + inMemByteSize = 0; + flushed = true; + } + + + public boolean isFlushed(){ + return flushed; + } + + public T peek() { + if(!flushed){ + flushToMemory(); + } + return spoolFrom.peek(); + } + + @Override + public T poll() { + if(!flushed){ + flushToMemory(); + } + return spoolFrom.poll(); + } + + public void close() throws IOException { + if(spoolFrom != null){ + spoolFrom.close(); + } + } + + @Override + public Iterator<T> iterator() { + if(!flushed){ + flushToMemory(); + } + return spoolFrom.iterator(); + } + + @Override + public int size() { + return size ; + } + + public long getInMemByteSize(){ + return inMemByteSize; + }; + + private static abstract class ResultQueue<T> extends AbstractQueue<T> implements Closeable{} + + protected static abstract class InMemoryResultQueue<T> extends ResultQueue<T> { + private final MemoryChunk memoryChunk; + protected final byte[] bytes; + private T next; + private AtomicInteger offset = new AtomicInteger(0); + + protected InMemoryResultQueue(byte[] bytes, MemoryChunk memoryChunk) { + this.bytes = bytes; + this.memoryChunk = memoryChunk; + advance(offset); + } + + protected abstract T advance(AtomicInteger offset); + + @Override + public boolean offer(T t) { + return false; + } + + @Override + public T peek(){ + return next; + } + + @Override + public T poll() { + T current = next; + next = advance(offset); + 
return current; + } + + + public void close() { + memoryChunk.close(); + } + + + @Override + public Iterator<T> iterator() { + return new Iterator<T>(){ + AtomicInteger iteratorOffset = new AtomicInteger(offset.get()); + private T next = advance(iteratorOffset); --- End diff -- One reminder: I said in my previous review that we'd better avoid discarding an old XXXSegmentQueue and starting a new one all the time when we switch between these segment queues. So we should ultimately remove this offset thing. > Improve performance and stability of server side sort for ORDER BY > ------------------------------------------------------------------ > > Key: PHOENIX-2405 > URL: https://issues.apache.org/jira/browse/PHOENIX-2405 > Project: Phoenix > Issue Type: Bug > Reporter: James Taylor > Assignee: Haoran Zhang > Labels: gsoc2016 > Fix For: 4.8.0 > > > We currently use memory mapped files to buffer data as it's being sorted in > an ORDER BY (see MappedByteBufferQueue). The following types of exceptions > have been seen to occur: > {code} > Caused by: java.lang.OutOfMemoryError: Map failed > at sun.nio.ch.FileChannelImpl.map0(Native Method) > at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:904) > {code} > [~apurtell] has read that memory mapped files are not cleaned up after very > well in Java: > {quote} > "Map failed" means the JVM ran out of virtual address space. If you search > around stack overflow for suggestions on what to do when your app (in this > case Phoenix) encounters this issue when using mapped buffers, the answers > tend toward manually cleaning up the mapped buffers or explicitly triggering > a full GC. See > http://stackoverflow.com/questions/8553158/prevent-outofmemory-when-using-java-nio-mappedbytebuffer > for example. There are apparently long standing JVM/JRE problems with > reclamation of mapped buffers. I think we may want to explore in Phoenix a > different way to achieve what the current code is doing. 
> {quote} > Instead of using memory mapped files, we could use heap memory, or perhaps > there are other mechanisms too. -- This message was sent by Atlassian JIRA (v6.3.4#6332)