TAJO-1046: Remove hadoop native dependency of pullserver. (jinho) Closes #143
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/469820db Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/469820db Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/469820db Branch: refs/heads/block_iteration Commit: 469820db19aedcf10ea41807e4cbc7ade48d780c Parents: 1cff979 Author: jhkim <[email protected]> Authored: Sat Sep 20 15:22:05 2014 +0900 Committer: jhkim <[email protected]> Committed: Sat Sep 20 15:22:05 2014 +0900 ---------------------------------------------------------------------- CHANGES | 2 + .../pullserver/FadvisedFileRegionWrapper.java | 35 ---- .../pullserver/listener/FileCloseListener.java | 53 ------ .../pullserver/FadvisedFileRegionWrapper.java | 34 ---- .../pullserver/listener/FileCloseListener.java | 55 ------ .../pullserver/FadvisedFileRegionWrapper.java | 36 ---- .../pullserver/listener/FileCloseListener.java | 55 ------ .../tajo/pullserver/FadvisedChunkedFile.java | 81 +++++++++ .../tajo/pullserver/FadvisedFileRegion.java | 170 +++++++++++++++++++ .../tajo/pullserver/FileCloseListener.java | 53 ++++++ .../tajo/pullserver/PullServerAuxService.java | 4 +- .../apache/tajo/pullserver/PullServerUtil.java | 90 ++++++++++ .../tajo/pullserver/TajoPullServerService.java | 9 +- 13 files changed, 400 insertions(+), 277 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/469820db/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index 026f020..a5f31f9 100644 --- a/CHANGES +++ b/CHANGES @@ -31,6 +31,8 @@ Release 0.9.0 - unreleased IMPROVEMENT + TAJO-1046: Remove hadoop native dependency of pullserver. (jinho) + TAJO-1040: Misuse netty HashedWheelTimer. (jinho) TAJO-1034: Reduce Explicit Use of JVM Internal Class. (Jihun Kang via hyunsik) http://git-wip-us.apache.org/repos/asf/tajo/blob/469820db/tajo-yarn-pullserver/src/main/hadoop-2.2.0/org/apache/tajo/pullserver/FadvisedFileRegionWrapper.java ---------------------------------------------------------------------- diff --git a/tajo-yarn-pullserver/src/main/hadoop-2.2.0/org/apache/tajo/pullserver/FadvisedFileRegionWrapper.java b/tajo-yarn-pullserver/src/main/hadoop-2.2.0/org/apache/tajo/pullserver/FadvisedFileRegionWrapper.java deleted file mode 100644 index 335dff0..0000000 --- a/tajo-yarn-pullserver/src/main/hadoop-2.2.0/org/apache/tajo/pullserver/FadvisedFileRegionWrapper.java +++ /dev/null @@ -1,35 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.pullserver; - -import org.apache.hadoop.io.ReadaheadPool; - -import java.io.IOException; -import java.io.RandomAccessFile; - -@Deprecated -public class FadvisedFileRegionWrapper extends org.apache.hadoop.mapred.FadvisedFileRegion { - - - public FadvisedFileRegionWrapper(RandomAccessFile file, long position, long count, - boolean manageOsCache, int readaheadLength, ReadaheadPool readaheadPool, - String identifier) throws IOException { - super(file, position, count, manageOsCache, readaheadLength, readaheadPool, identifier); - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/469820db/tajo-yarn-pullserver/src/main/hadoop-2.2.0/org/apache/tajo/pullserver/listener/FileCloseListener.java ---------------------------------------------------------------------- diff --git a/tajo-yarn-pullserver/src/main/hadoop-2.2.0/org/apache/tajo/pullserver/listener/FileCloseListener.java b/tajo-yarn-pullserver/src/main/hadoop-2.2.0/org/apache/tajo/pullserver/listener/FileCloseListener.java deleted file mode 100644 index 2e36644..0000000 --- a/tajo-yarn-pullserver/src/main/hadoop-2.2.0/org/apache/tajo/pullserver/listener/FileCloseListener.java +++ /dev/null @@ -1,53 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.pullserver.listener; - -import org.apache.hadoop.mapred.FadvisedFileRegion; -import org.jboss.netty.channel.ChannelFuture; -import org.jboss.netty.channel.ChannelFutureListener; -import org.apache.tajo.pullserver.TajoPullServerService; - -@Deprecated -public class FileCloseListener implements ChannelFutureListener { - - private FadvisedFileRegion filePart; - private String requestUri; - private TajoPullServerService pullServerService; - private long startTime; - - public FileCloseListener(FadvisedFileRegion filePart, - String requestUri, - long startTime, - TajoPullServerService pullServerService) { - this.filePart = filePart; - this.requestUri = requestUri; - this.pullServerService = pullServerService; - this.startTime = startTime; - } - - // TODO error handling; distinguish IO/connection failures, - // attribute to appropriate spill output - @Override - public void operationComplete(ChannelFuture future) { - filePart.releaseExternalResources(); - if (pullServerService != null) { - pullServerService.completeFileChunk(filePart, requestUri, startTime); - } - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/469820db/tajo-yarn-pullserver/src/main/hadoop-2.3.0/org/apache/tajo/pullserver/FadvisedFileRegionWrapper.java ---------------------------------------------------------------------- diff --git a/tajo-yarn-pullserver/src/main/hadoop-2.3.0/org/apache/tajo/pullserver/FadvisedFileRegionWrapper.java b/tajo-yarn-pullserver/src/main/hadoop-2.3.0/org/apache/tajo/pullserver/FadvisedFileRegionWrapper.java deleted file mode 100644 index 42d1cd8..0000000 --- a/tajo-yarn-pullserver/src/main/hadoop-2.3.0/org/apache/tajo/pullserver/FadvisedFileRegionWrapper.java +++ /dev/null @@ -1,34 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.pullserver; - -import org.apache.hadoop.io.ReadaheadPool; - -import java.io.IOException; -import java.io.RandomAccessFile; - -public class FadvisedFileRegionWrapper extends org.apache.hadoop.mapred.FadvisedFileRegion { - - - public FadvisedFileRegionWrapper(RandomAccessFile file, long position, long count, - boolean manageOsCache, int readaheadLength, ReadaheadPool readaheadPool, - String identifier) throws IOException { - super(file, position, count, manageOsCache, readaheadLength, readaheadPool, identifier); - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/469820db/tajo-yarn-pullserver/src/main/hadoop-2.3.0/org/apache/tajo/pullserver/listener/FileCloseListener.java ---------------------------------------------------------------------- diff --git a/tajo-yarn-pullserver/src/main/hadoop-2.3.0/org/apache/tajo/pullserver/listener/FileCloseListener.java b/tajo-yarn-pullserver/src/main/hadoop-2.3.0/org/apache/tajo/pullserver/listener/FileCloseListener.java deleted file mode 100644 index be599c3..0000000 --- a/tajo-yarn-pullserver/src/main/hadoop-2.3.0/org/apache/tajo/pullserver/listener/FileCloseListener.java +++ /dev/null @@ -1,55 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.pullserver.listener; - -import org.apache.hadoop.mapred.FadvisedFileRegion; -import org.apache.tajo.pullserver.TajoPullServerService; -import org.jboss.netty.channel.ChannelFuture; -import org.jboss.netty.channel.ChannelFutureListener; - -public class FileCloseListener implements ChannelFutureListener { - - private FadvisedFileRegion filePart; - private String requestUri; - private TajoPullServerService pullServerService; - private long startTime; - - public FileCloseListener(FadvisedFileRegion filePart, - String requestUri, - long startTime, - TajoPullServerService pullServerService) { - this.filePart = filePart; - this.requestUri = requestUri; - this.pullServerService = pullServerService; - this.startTime = startTime; - } - - // TODO error handling; distinguish IO/connection failures, - // attribute to appropriate spill output - @Override - public void operationComplete(ChannelFuture future) { - if(future.isSuccess()){ - filePart.transferSuccessful(); - } - filePart.releaseExternalResources(); - if (pullServerService != null) { - pullServerService.completeFileChunk(filePart, requestUri, startTime); - } - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/469820db/tajo-yarn-pullserver/src/main/hadoop-2.4.0/org/apache/tajo/pullserver/FadvisedFileRegionWrapper.java ---------------------------------------------------------------------- diff --git a/tajo-yarn-pullserver/src/main/hadoop-2.4.0/org/apache/tajo/pullserver/FadvisedFileRegionWrapper.java b/tajo-yarn-pullserver/src/main/hadoop-2.4.0/org/apache/tajo/pullserver/FadvisedFileRegionWrapper.java deleted file mode 100644 index ee53bc6..0000000 --- a/tajo-yarn-pullserver/src/main/hadoop-2.4.0/org/apache/tajo/pullserver/FadvisedFileRegionWrapper.java +++ /dev/null @@ -1,36 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.pullserver; - -import org.apache.hadoop.io.ReadaheadPool; - -import java.io.IOException; -import java.io.RandomAccessFile; - -/* tajo is not yet supported on Windows */ -public class FadvisedFileRegionWrapper extends org.apache.hadoop.mapred.FadvisedFileRegion { - public static final int DEFAULT_SHUFFLE_BUFFER_SIZE = 128 * 1024; - - public FadvisedFileRegionWrapper(RandomAccessFile file, long position, long count, - boolean manageOsCache, int readaheadLength, ReadaheadPool readaheadPool, - String identifier) throws IOException { - super(file, position, count, manageOsCache, readaheadLength, readaheadPool, - identifier, DEFAULT_SHUFFLE_BUFFER_SIZE, true); - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/469820db/tajo-yarn-pullserver/src/main/hadoop-2.4.0/org/apache/tajo/pullserver/listener/FileCloseListener.java ---------------------------------------------------------------------- diff --git a/tajo-yarn-pullserver/src/main/hadoop-2.4.0/org/apache/tajo/pullserver/listener/FileCloseListener.java b/tajo-yarn-pullserver/src/main/hadoop-2.4.0/org/apache/tajo/pullserver/listener/FileCloseListener.java deleted file mode 100644 index 7d4ca3a..0000000 --- a/tajo-yarn-pullserver/src/main/hadoop-2.4.0/org/apache/tajo/pullserver/listener/FileCloseListener.java +++ /dev/null @@ -1,55 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.pullserver.listener; - -import org.apache.hadoop.mapred.FadvisedFileRegion; -import org.jboss.netty.channel.ChannelFuture; -import org.jboss.netty.channel.ChannelFutureListener; -import org.apache.tajo.pullserver.TajoPullServerService; - -public class FileCloseListener implements ChannelFutureListener { - - private FadvisedFileRegion filePart; - private String requestUri; - private TajoPullServerService pullServerService; - private long startTime; - - public FileCloseListener(FadvisedFileRegion filePart, - String requestUri, - long startTime, - TajoPullServerService pullServerService) { - this.filePart = filePart; - this.requestUri = requestUri; - this.pullServerService = pullServerService; - this.startTime = startTime; - } - - // TODO error handling; distinguish IO/connection failures, - // attribute to appropriate spill output - @Override - public void operationComplete(ChannelFuture future) { - if(future.isSuccess()){ - filePart.transferSuccessful(); - } - filePart.releaseExternalResources(); - if (pullServerService != null) { - pullServerService.completeFileChunk(filePart, requestUri, startTime); - } - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/469820db/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/FadvisedChunkedFile.java ---------------------------------------------------------------------- diff --git a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/FadvisedChunkedFile.java b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/FadvisedChunkedFile.java new file mode 100644 index 0000000..b0b8d18 --- /dev/null +++ b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/FadvisedChunkedFile.java @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.pullserver; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.io.ReadaheadPool; +import org.apache.hadoop.io.nativeio.NativeIO; +import org.jboss.netty.handler.stream.ChunkedFile; + +import java.io.FileDescriptor; +import java.io.IOException; +import java.io.RandomAccessFile; + +public class FadvisedChunkedFile extends ChunkedFile { + + private static final Log LOG = LogFactory.getLog(FadvisedChunkedFile.class); + + private final boolean manageOsCache; + private final int readaheadLength; + private final ReadaheadPool readaheadPool; + private final FileDescriptor fd; + private final String identifier; + + private ReadaheadPool.ReadaheadRequest readaheadRequest; + + public FadvisedChunkedFile(RandomAccessFile file, long position, long count, + int chunkSize, boolean manageOsCache, int readaheadLength, + ReadaheadPool readaheadPool, String identifier) throws IOException { + super(file, position, count, chunkSize); + this.manageOsCache = manageOsCache; + this.readaheadLength = readaheadLength; + this.readaheadPool = readaheadPool; + this.fd = file.getFD(); + this.identifier = identifier; + } + + @Override + public Object nextChunk() throws Exception { + if (PullServerUtil.isNativeIOPossible() && manageOsCache && readaheadPool != null) { + readaheadRequest = readaheadPool + .readaheadStream(identifier, fd, getCurrentOffset(), readaheadLength, + getEndOffset(), readaheadRequest); + } + return super.nextChunk(); + } + + @Override + public void close() throws Exception { + if (readaheadRequest != null) { + readaheadRequest.cancel(); + } + if (PullServerUtil.isNativeIOPossible() && manageOsCache && getEndOffset() - getStartOffset() > 0) { + try { + PullServerUtil.posixFadviseIfPossible(identifier, + fd, + getStartOffset(), getEndOffset() - getStartOffset(), + NativeIO.POSIX.POSIX_FADV_DONTNEED); + } catch (Throwable t) { + LOG.warn("Failed to manage OS cache for " + identifier, t); + } + } + super.close(); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/469820db/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/FadvisedFileRegion.java ---------------------------------------------------------------------- diff --git a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/FadvisedFileRegion.java b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/FadvisedFileRegion.java new file mode 100644 index 0000000..18cf4b6 --- /dev/null +++ b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/FadvisedFileRegion.java @@ -0,0 +1,170 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.pullserver; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.io.ReadaheadPool; +import org.apache.hadoop.io.nativeio.NativeIO; +import org.jboss.netty.channel.DefaultFileRegion; + +import java.io.FileDescriptor; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; +import java.nio.channels.WritableByteChannel; + +public class FadvisedFileRegion extends DefaultFileRegion { + + private static final Log LOG = LogFactory.getLog(FadvisedFileRegion.class); + + private final boolean manageOsCache; + private final int readaheadLength; + private final ReadaheadPool readaheadPool; + private final FileDescriptor fd; + private final String identifier; + private final long count; + private final long position; + private final int shuffleBufferSize; + private final boolean shuffleTransferToAllowed; + private final FileChannel fileChannel; + + private ReadaheadPool.ReadaheadRequest readaheadRequest; + public static final int DEFAULT_SHUFFLE_BUFFER_SIZE = 128 * 1024; + + public FadvisedFileRegion(RandomAccessFile file, long position, long count, + boolean manageOsCache, int readaheadLength, ReadaheadPool readaheadPool, + String identifier) throws IOException { + this(file, position, count, manageOsCache, readaheadLength, readaheadPool, + identifier, DEFAULT_SHUFFLE_BUFFER_SIZE, true); + } + + public FadvisedFileRegion(RandomAccessFile file, long position, long count, + boolean manageOsCache, int readaheadLength, ReadaheadPool readaheadPool, + String identifier, int shuffleBufferSize, + boolean shuffleTransferToAllowed) throws IOException { + super(file.getChannel(), position, count); + this.manageOsCache = manageOsCache; + this.readaheadLength = readaheadLength; + this.readaheadPool = readaheadPool; + this.fd = file.getFD(); + this.identifier = identifier; + this.fileChannel = file.getChannel(); + this.count = count; + this.position = position; + this.shuffleBufferSize = shuffleBufferSize; + this.shuffleTransferToAllowed = shuffleTransferToAllowed; + } + + @Override + public long transferTo(WritableByteChannel target, long position) + throws IOException { + if (PullServerUtil.isNativeIOPossible() && manageOsCache && readaheadPool != null) { + readaheadRequest = readaheadPool.readaheadStream(identifier, fd, + getPosition() + position, readaheadLength, + getPosition() + getCount(), readaheadRequest); + } + + if(this.shuffleTransferToAllowed) { + return super.transferTo(target, position); + } else { + return customShuffleTransfer(target, position); + } + } + + /** + * This method transfers data using local buffer. It transfers data from + * a disk to a local buffer in memory, and then it transfers data from the + * buffer to the target. This is used only if transferTo is disallowed in + * the configuration file. super.TransferTo does not perform well on Windows + * due to a small IO request generated. customShuffleTransfer can control + * the size of the IO requests by changing the size of the intermediate + * buffer. + */ + @VisibleForTesting + long customShuffleTransfer(WritableByteChannel target, long position) + throws IOException { + long actualCount = this.count - position; + if (actualCount < 0 || position < 0) { + throw new IllegalArgumentException( + "position out of range: " + position + + " (expected: 0 - " + (this.count - 1) + ')'); + } + if (actualCount == 0) { + return 0L; + } + + long trans = actualCount; + int readSize; + ByteBuffer byteBuffer = ByteBuffer.allocate(this.shuffleBufferSize); + + while(trans > 0L && + (readSize = fileChannel.read(byteBuffer, this.position+position)) > 0) { + //adjust counters and buffer limit + if(readSize < trans) { + trans -= readSize; + position += readSize; + byteBuffer.flip(); + } else { + //We can read more than we need if the actualCount is not multiple + //of the byteBuffer size and file is big enough. In that case we cannot + //use flip method but we need to set buffer limit manually to trans. + byteBuffer.limit((int)trans); + byteBuffer.position(0); + position += trans; + trans = 0; + } + + //write data to the target + while(byteBuffer.hasRemaining()) { + target.write(byteBuffer); + } + + byteBuffer.clear(); + } + + return actualCount - trans; + } + + + @Override + public void releaseExternalResources() { + if (readaheadRequest != null) { + readaheadRequest.cancel(); + } + super.releaseExternalResources(); + } + + /** + * Call when the transfer completes successfully so we can advise the OS that + * we don't need the region to be cached anymore. + */ + public void transferSuccessful() { + if (PullServerUtil.isNativeIOPossible() && manageOsCache && getCount() > 0) { + try { + PullServerUtil.posixFadviseIfPossible(identifier, fd, getPosition(), getCount(), + NativeIO.POSIX.POSIX_FADV_DONTNEED); + } catch (Throwable t) { + LOG.warn("Failed to manage OS cache for " + identifier, t); + } + } + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/469820db/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/FileCloseListener.java ---------------------------------------------------------------------- diff --git a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/FileCloseListener.java b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/FileCloseListener.java new file mode 100644 index 0000000..236db89 --- /dev/null +++ b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/FileCloseListener.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.pullserver; + +import org.jboss.netty.channel.ChannelFuture; +import org.jboss.netty.channel.ChannelFutureListener; + +public class FileCloseListener implements ChannelFutureListener { + + private FadvisedFileRegion filePart; + private String requestUri; + private TajoPullServerService pullServerService; + private long startTime; + + public FileCloseListener(FadvisedFileRegion filePart, + String requestUri, + long startTime, + TajoPullServerService pullServerService) { + this.filePart = filePart; + this.requestUri = requestUri; + this.pullServerService = pullServerService; + this.startTime = startTime; + } + + // TODO error handling; distinguish IO/connection failures, + // attribute to appropriate spill output + @Override + public void operationComplete(ChannelFuture future) { + if(future.isSuccess()){ + filePart.transferSuccessful(); + } + filePart.releaseExternalResources(); + if (pullServerService != null) { + pullServerService.completeFileChunk(filePart, requestUri, startTime); + } + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/469820db/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerAuxService.java ---------------------------------------------------------------------- diff --git a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerAuxService.java b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerAuxService.java index e6e7ce3..5f9f9e8 100644 --- a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerAuxService.java +++ b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerAuxService.java @@ -31,7 +31,6 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.DataInputByteBuffer; import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.ReadaheadPool; -import org.apache.hadoop.mapred.FadvisedChunkedFile; import org.apache.hadoop.metrics2.MetricsSystem; import org.apache.hadoop.metrics2.annotation.Metric; import org.apache.hadoop.metrics2.annotation.Metrics; @@ -48,7 +47,6 @@ import org.apache.tajo.QueryId; import org.apache.tajo.catalog.Schema; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.conf.TajoConf.ConfVars; -import org.apache.tajo.pullserver.listener.FileCloseListener; import org.apache.tajo.pullserver.retriever.FileChunk; import org.apache.tajo.storage.RowStoreUtil; import org.apache.tajo.storage.RowStoreUtil.RowStoreDecoder; @@ -489,7 +487,7 @@ public class PullServerAuxService extends AuxiliaryService { } ChannelFuture writeFuture; if (ch.getPipeline().get(SslHandler.class) == null) { - final FadvisedFileRegionWrapper partition = new FadvisedFileRegionWrapper(spill, + final FadvisedFileRegion partition = new FadvisedFileRegion(spill, file.startOffset, file.length(), manageOsCache, readaheadLength, readaheadPool, file.getFile().getAbsolutePath()); writeFuture = ch.write(partition); http://git-wip-us.apache.org/repos/asf/tajo/blob/469820db/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerUtil.java ---------------------------------------------------------------------- diff --git a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerUtil.java b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerUtil.java new file mode 100644 index 0000000..564950f --- /dev/null +++ b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerUtil.java @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.pullserver; + +import org.apache.commons.lang.reflect.MethodUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.io.nativeio.NativeIO; + +import java.io.FileDescriptor; +import java.lang.reflect.Method; + +public class PullServerUtil { + private static final Log LOG = LogFactory.getLog(PullServerUtil.class); + + private static boolean nativeIOPossible = false; + private static Method posixFadviseIfPossible; + + static { + if (NativeIO.isAvailable() && loadNativeIO()) { + nativeIOPossible = true; + } else { + LOG.warn("Unable to load hadoop nativeIO"); + } + } + + public static boolean isNativeIOPossible() { + return nativeIOPossible; + } + + /** + * Call posix_fadvise on the given file descriptor. See the manpage + * for this syscall for more information. On systems where this + * call is not available, does nothing. + */ + public static void posixFadviseIfPossible(String identifier, java.io.FileDescriptor fd, + long offset, long len, int flags) { + if (nativeIOPossible) { + try { + posixFadviseIfPossible.invoke(null, identifier, fd, offset, len, flags); + } catch (Throwable t) { + nativeIOPossible = false; + LOG.warn("Failed to manage OS cache for " + identifier, t); + } + } + } + + /* load hadoop native method if possible */ + private static boolean loadNativeIO() { + boolean loaded = true; + if (nativeIOPossible) return loaded; + + Class[] parameters = {String.class, FileDescriptor.class, Long.TYPE, Long.TYPE, Integer.TYPE}; + try { + Method getCacheManipulator = MethodUtils.getAccessibleMethod(NativeIO.POSIX.class, "getCacheManipulator", new Class[0]); + Class posixClass; + if (getCacheManipulator != null) { + Object posix = MethodUtils.invokeStaticMethod(NativeIO.POSIX.class, "getCacheManipulator", null); + posixClass = posix.getClass(); + } else { + posixClass = NativeIO.POSIX.class; + } + posixFadviseIfPossible = MethodUtils.getAccessibleMethod(posixClass, "posixFadviseIfPossible", parameters); + } catch (Throwable e) { + loaded = false; + LOG.warn("Failed to access posixFadviseIfPossible :" + e.getMessage()); + } + + if (posixFadviseIfPossible == null) { + loaded = false; + } + return loaded; + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/469820db/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java ---------------------------------------------------------------------- diff --git a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java index f7bc489..720f0ca 100644 --- a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java +++ b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java @@ -32,8 +32,6 @@ import org.apache.hadoop.io.DataInputByteBuffer; import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.ReadaheadPool; -import org.apache.hadoop.mapred.FadvisedChunkedFile; -import org.apache.hadoop.mapred.FadvisedFileRegion; import org.apache.hadoop.metrics2.MetricsSystem; import org.apache.hadoop.metrics2.annotation.Metric; import org.apache.hadoop.metrics2.annotation.Metrics; @@ -47,7 +45,6 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.tajo.catalog.Schema; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.conf.TajoConf.ConfVars; -import org.apache.tajo.pullserver.listener.FileCloseListener; import org.apache.tajo.pullserver.retriever.FileChunk; import org.apache.tajo.rpc.RpcChannelFactory; import org.apache.tajo.storage.HashShuffleAppenderManager; @@ -384,7 +381,7 @@ public class TajoPullServerService extends AbstractService { Map<String, ProcessingStatus> processingStatusMap = new ConcurrentHashMap<String, ProcessingStatus>(); - public void completeFileChunk(FadvisedFileRegion filePart, + public void completeFileChunk(FileRegion filePart, String requestUri, long startTime) { ProcessingStatus status = processingStatusMap.get(requestUri); @@ -412,7 +409,7 @@ public class TajoPullServerService extends AbstractService { this.numFiles = numFiles; this.remainFiles = new AtomicInteger(numFiles); } - public void decrementRemainFiles(FadvisedFileRegion filePart, long fileStartTime) { + public void decrementRemainFiles(FileRegion filePart, long fileStartTime) { synchronized(remainFiles) { long fileSendTime = System.currentTimeMillis() - fileStartTime; if (fileSendTime > 20 * 1000) { @@ -649,7 +646,7 @@ public class TajoPullServerService extends AbstractService { try { spill = new RandomAccessFile(file.getFile(), "r"); if (ch.getPipeline().get(SslHandler.class) == null) { - final FadvisedFileRegionWrapper filePart = new FadvisedFileRegionWrapper(spill, + final FadvisedFileRegion filePart = new FadvisedFileRegion(spill, file.startOffset, file.length(), manageOsCache, readaheadLength, readaheadPool, file.getFile().getAbsolutePath()); writeFuture = ch.write(filePart);
