[ 
https://issues.apache.org/jira/browse/PHOENIX-2405?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15363646#comment-15363646
 ] 

ASF GitHub Bot commented on PHOENIX-2405:
-----------------------------------------

Github user maryannxue commented on a diff in the pull request:

    https://github.com/apache/phoenix/pull/175#discussion_r69668023
  
    --- Diff: 
phoenix-core/src/main/java/org/apache/phoenix/iterate/SpoolingByteBufferSegmentQueue.java
 ---
    @@ -0,0 +1,357 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.phoenix.iterate;
    +
    +import org.apache.commons.io.input.CountingInputStream;
    +import org.apache.commons.io.output.DeferredFileOutputStream;
    +
    +import org.apache.phoenix.memory.MemoryManager;
    +import org.apache.phoenix.memory.MemoryManager.MemoryChunk;
    +import org.apache.phoenix.query.QueryServices;
    +import org.apache.phoenix.query.QueryServicesOptions;
    +
    +
    +import java.io.*;
    +import java.util.AbstractQueue;
    +import java.util.Iterator;
    +
    +import java.util.concurrent.atomic.AtomicInteger;
    +
    +import static org.apache.phoenix.monitoring.GlobalClientMetrics.*;
    +
    +
    +public abstract class SpoolingByteBufferSegmentQueue<T>  extends 
AbstractQueue<T> {
    +
    +    private ResultQueue<T> spoolFrom;
    +
    +    private boolean closed ;
    +    private boolean flushed;
    +    private DeferredFileOutputStream spoolTo;
    +    private MemoryChunk chunk;
    +    private int size = 0;
    +    private long inMemByteSize = 0L;
    +    private int index;
    +
    +
    +
    +    SpoolingByteBufferSegmentQueue(int index, MemoryManager mm, final int 
thresholdBytes, String spoolDirectory)  {
    +
    +        long startTime = System.currentTimeMillis();
    +        chunk  = mm.allocate(0, thresholdBytes);
    +        long waitTime = System.currentTimeMillis() - startTime;
    +        GLOBAL_MEMORY_WAIT_TIME.update(waitTime);
    +
    +        int size = (int)chunk.getSize();
    +        spoolTo = new DeferredFileOutputStream(size, 
"ResultSpooler",".bin", new File(spoolDirectory)) {
    +            @Override
    +            protected void thresholdReached() throws IOException {
    +                try {
    +                    super.thresholdReached();
    +                } finally {
    +                    chunk.close();
    +                }
    +            }
    +        };
    +
    +
    +    }
    +
    +    public int index() {
    +        return this.index;
    +    }
    +
    +
    +
    +    protected abstract InMemoryResultQueue<T> 
createInMemoryResultQueue(byte[] bytes, MemoryChunk memoryChunk);
    +
    +    protected abstract OnDiskResultQueue<T> createOnDiskResultQueue(File 
file);
    +
    +    @Override
    +    public boolean offer(T t) {
    +        if (closed || flushed){
    +            return false;
    +        }
    +        boolean result = writeRecord(t, spoolTo);
    +        if(result){
    +            if(!spoolTo.isInMemory()){
    +                flushToDisk();
    +            }
    +            size++;
    +        }
    +
    +
    +        return result;
    +    }
    +
    +    protected abstract boolean writeRecord(T t, OutputStream outputStream);
    +
    +    private void flushToMemory(){
    +        byte[] data = spoolTo.getData();
    +        chunk.resize(data.length);
    +        spoolFrom = createInMemoryResultQueue(data, chunk);
    +        GLOBAL_MEMORY_CHUNK_BYTES.update(data.length);
    +        flushed = true;
    +    }
    +
    +
    +    private void flushToDisk(){
    +        long sizeOfSpoolFile = spoolTo.getFile().length();
    +        GLOBAL_SPOOL_FILE_SIZE.update(sizeOfSpoolFile);
    +        GLOBAL_SPOOL_FILE_COUNTER.increment();
    +        spoolFrom = createOnDiskResultQueue(spoolTo.getFile());
    +        if (spoolTo.getFile() != null) {
    +            spoolTo.getFile().deleteOnExit();
    +        }
    +        inMemByteSize = 0;
    +        flushed = true;
    +    }
    +
    +
    +    public boolean isFlushed(){
    +        return flushed;
    +    }
    +
    +    public T peek() {
    +        if(!flushed){
    +            flushToMemory();
    +        }
    +        return spoolFrom.peek();
    +    }
    +
    +    @Override
    +    public T poll() {
    +        if(!flushed){
    +            flushToMemory();
    +        }
    +        return spoolFrom.poll();
    +    }
    +
    +    public void close() throws IOException {
    +        if(spoolFrom != null){
    +            spoolFrom.close();
    +        }
    +    }
    +
    +    @Override
    +    public Iterator<T> iterator() {
    +        if(!flushed){
    +            flushToMemory();
    +        }
    +        return spoolFrom.iterator();
    +    }
    +
    +    @Override
    +    public int size() {
    +        return size ;
    +    }
    +
    +    public long getInMemByteSize(){
    +        return inMemByteSize;
    +    };
    +
    +    private static abstract class ResultQueue<T> extends  AbstractQueue<T> 
implements  Closeable{}
    +
    +    protected static abstract class InMemoryResultQueue<T> extends 
ResultQueue<T> {
    +        private final MemoryChunk memoryChunk;
    +        protected final byte[] bytes;
    +        private T next;
    +        private AtomicInteger offset = new AtomicInteger(0);
    +
    +        protected InMemoryResultQueue(byte[] bytes, MemoryChunk 
memoryChunk) {
    +            this.bytes = bytes;
    +            this.memoryChunk = memoryChunk;
    +            advance(offset);
    +        }
    +
    +        protected abstract T advance(AtomicInteger offset);
    +
    +        @Override
    +        public boolean offer(T t) {
    +            return false;
    +        }
    +
    +        @Override
    +        public T peek(){
    +            return next;
    +        }
    +
    +        @Override
    +        public T poll() {
    +            T current = next;
    +            next = advance(offset);
    +            return current;
    +        }
    +
    +
    +        public void close() {
    +            memoryChunk.close();
    +        }
    +
    +
    +        @Override
    +        public Iterator<T> iterator() {
    +            return new Iterator<T>(){
    +                AtomicInteger iteratorOffset = new 
AtomicInteger(offset.get());
    +                private T next = advance(iteratorOffset);
    --- End diff --
    
    One reminder: as I said in my previous review, we should avoid 
discarding an old XXXSegmentQueue and creating a new one every time we 
switch between these segment queues. So ultimately we should remove this offset 
mechanism.


> Improve performance and stability of server side sort for ORDER BY
> ------------------------------------------------------------------
>
>                 Key: PHOENIX-2405
>                 URL: https://issues.apache.org/jira/browse/PHOENIX-2405
>             Project: Phoenix
>          Issue Type: Bug
>            Reporter: James Taylor
>            Assignee: Haoran Zhang
>              Labels: gsoc2016
>             Fix For: 4.8.0
>
>
> We currently use memory mapped files to buffer data as it's being sorted in 
> an ORDER BY (see MappedByteBufferQueue). The following types of exceptions 
> have been seen to occur:
> {code}
> Caused by: java.lang.OutOfMemoryError: Map failed
>         at sun.nio.ch.FileChannelImpl.map0(Native Method)
>         at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:904)
> {code}
> [~apurtell] has read that memory mapped files are not cleaned up after very 
> well in Java:
> {quote}
> "Map failed" means the JVM ran out of virtual address space. If you search 
> around stack overflow for suggestions on what to do when your app (in this 
> case Phoenix) encounters this issue when using mapped buffers, the answers 
> tend toward manually cleaning up the mapped buffers or explicitly triggering 
> a full GC. See 
> http://stackoverflow.com/questions/8553158/prevent-outofmemory-when-using-java-nio-mappedbytebuffer
>  for example. There are apparently long standing JVM/JRE problems with 
> reclamation of mapped buffers. I think we may want to explore in Phoenix a 
> different way to achieve what the current code is doing.
> {quote}
> Instead of using memory mapped files, we could use heap memory, or perhaps 
> there are other mechanisms too.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to