http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockReader.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockReader.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockReader.java
new file mode 100644
index 0000000..4a9313f
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockReader.java
@@ -0,0 +1,63 @@
+/***
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.tajo.tuple.offheap; + +import org.apache.tajo.tuple.RowBlockReader; +import org.apache.tajo.util.UnsafeUtil; +import sun.misc.Unsafe; + +public class OffHeapRowBlockReader implements RowBlockReader<ZeroCopyTuple> { + private static final Unsafe UNSAFE = UnsafeUtil.unsafe; + OffHeapRowBlock rowBlock; + + // Read States + private int curRowIdxForRead; + private int curPosForRead; + + public OffHeapRowBlockReader(OffHeapRowBlock rowBlock) { + this.rowBlock = rowBlock; + } + + public long remainForRead() { + return rowBlock.memorySize - curPosForRead; + } + + @Override + public boolean next(ZeroCopyTuple tuple) { + if (curRowIdxForRead < rowBlock.rows()) { + + long recordStartPtr = rowBlock.address() + curPosForRead; + int recordLen = UNSAFE.getInt(recordStartPtr); + tuple.set(rowBlock.buffer, curPosForRead, recordLen, rowBlock.dataTypes); + + curPosForRead += recordLen; + curRowIdxForRead++; + + return true; + } else { + return false; + } + } + + @Override + public void reset() { + curPosForRead = 0; + curRowIdxForRead = 0; + } +}
http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockUtils.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockUtils.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockUtils.java
new file mode 100644
index 0000000..147f7be
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockUtils.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.tajo.tuple.offheap; + +import com.google.common.collect.Lists; +import org.apache.tajo.storage.Tuple; + +import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; + +public class OffHeapRowBlockUtils { + + public static List<Tuple> sort(OffHeapRowBlock rowBlock, Comparator<Tuple> comparator) { + List<Tuple> tupleList = Lists.newArrayList(); + ZeroCopyTuple zcTuple = new ZeroCopyTuple(); + OffHeapRowBlockReader reader = new OffHeapRowBlockReader(rowBlock); + while(reader.next(zcTuple)) { + tupleList.add(zcTuple); + zcTuple = new ZeroCopyTuple(); + } + Collections.sort(tupleList, comparator); + return tupleList; + } + + public static Tuple [] sortToArray(OffHeapRowBlock rowBlock, Comparator<Tuple> comparator) { + Tuple [] tuples = new Tuple[rowBlock.rows()]; + ZeroCopyTuple zcTuple = new ZeroCopyTuple(); + OffHeapRowBlockReader reader = new OffHeapRowBlockReader(rowBlock); + for (int i = 0; i < rowBlock.rows() && reader.next(zcTuple); i++) { + tuples[i] = zcTuple; + zcTuple = new ZeroCopyTuple(); + } + Arrays.sort(tuples, comparator); + return tuples; + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockWriter.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockWriter.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockWriter.java new file mode 100644 index 0000000..d177e0c --- /dev/null +++ b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockWriter.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.tuple.offheap; + +import org.apache.tajo.common.TajoDataTypes; + +public class OffHeapRowBlockWriter extends OffHeapRowWriter { + OffHeapRowBlock rowBlock; + + OffHeapRowBlockWriter(OffHeapRowBlock rowBlock) { + super(rowBlock.dataTypes); + this.rowBlock = rowBlock; + } + + public long address() { + return rowBlock.address(); + } + + public int position() { + return rowBlock.position(); + } + + @Override + public void forward(int length) { + rowBlock.position(position() + length); + } + + public void ensureSize(int size) { + rowBlock.ensureSize(size); + } + + @Override + public void endRow() { + super.endRow(); + rowBlock.setRows(rowBlock.rows() + 1); + } + + @Override + public TajoDataTypes.DataType[] dataTypes() { + return rowBlock.dataTypes; + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowWriter.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowWriter.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowWriter.java new file mode 100644 index 0000000..31bc206 --- /dev/null +++ b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowWriter.java @@ -0,0 +1,230 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or 
more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.tuple.offheap; + +import org.apache.tajo.common.TajoDataTypes; +import org.apache.tajo.datum.IntervalDatum; +import org.apache.tajo.datum.ProtobufDatum; +import org.apache.tajo.datum.TextDatum; +import org.apache.tajo.util.SizeOf; +import sun.misc.Unsafe; + +/** + * + * Row Record Structure + * + * | row length (4 bytes) | field 1 offset | field 2 offset | ... | field N offset| field 1 | field 2| ... 
| field N | + * 4 bytes 4 bytes 4 bytes + * + */ +public abstract class OffHeapRowWriter implements RowWriter { + /** record size + offset list */ + private final int headerSize; + /** field offsets */ + private final int [] fieldOffsets; + private final TajoDataTypes.DataType [] dataTypes; + + private int curFieldIdx; + private int curOffset; + + public OffHeapRowWriter(final TajoDataTypes.DataType [] dataTypes) { + this.dataTypes = dataTypes; + fieldOffsets = new int[dataTypes.length]; + headerSize = SizeOf.SIZE_OF_INT * (dataTypes.length + 1); + } + + public void clear() { + curOffset = 0; + curFieldIdx = 0; + } + + public long recordStartAddr() { + return address() + position(); + } + + public abstract long address(); + + public abstract void ensureSize(int size); + + public int offset() { + return curOffset; + } + + /** + * Current position + * + * @return The position + */ + public abstract int position(); + + /** + * Forward the address; + * + * @param length Length to be forwarded + */ + public abstract void forward(int length); + + @Override + public TajoDataTypes.DataType[] dataTypes() { + return dataTypes; + } + + public boolean startRow() { + curOffset = headerSize; + curFieldIdx = 0; + return true; + } + + public void endRow() { + long rowHeaderPos = address() + position(); + OffHeapMemory.UNSAFE.putInt(rowHeaderPos, curOffset); + rowHeaderPos += SizeOf.SIZE_OF_INT; + + for (int i = 0; i < curFieldIdx; i++) { + OffHeapMemory.UNSAFE.putInt(rowHeaderPos, fieldOffsets[i]); + rowHeaderPos += SizeOf.SIZE_OF_INT; + } + for (int i = curFieldIdx; i < dataTypes.length; i++) { + OffHeapMemory.UNSAFE.putInt(rowHeaderPos, OffHeapRowBlock.NULL_FIELD_OFFSET); + rowHeaderPos += SizeOf.SIZE_OF_INT; + } + + // rowOffset is equivalent to a byte length of this row. 
+ forward(curOffset); + } + + public void skipField() { + fieldOffsets[curFieldIdx++] = OffHeapRowBlock.NULL_FIELD_OFFSET; + } + + private void forwardField() { + fieldOffsets[curFieldIdx++] = curOffset; + } + + public void putBool(boolean val) { + ensureSize(SizeOf.SIZE_OF_BOOL); + forwardField(); + + OffHeapMemory.UNSAFE.putByte(recordStartAddr() + curOffset, (byte) (val ? 0x01 : 0x00)); + + curOffset += SizeOf.SIZE_OF_BOOL; + } + + public void putInt2(short val) { + ensureSize(SizeOf.SIZE_OF_SHORT); + forwardField(); + + OffHeapMemory.UNSAFE.putShort(recordStartAddr() + curOffset, val); + curOffset += SizeOf.SIZE_OF_SHORT; + } + + public void putInt4(int val) { + ensureSize(SizeOf.SIZE_OF_INT); + forwardField(); + + OffHeapMemory.UNSAFE.putInt(recordStartAddr() + curOffset, val); + curOffset += SizeOf.SIZE_OF_INT; + } + + public void putInt8(long val) { + ensureSize(SizeOf.SIZE_OF_LONG); + forwardField(); + + OffHeapMemory.UNSAFE.putLong(recordStartAddr() + curOffset, val); + curOffset += SizeOf.SIZE_OF_LONG; + } + + public void putFloat4(float val) { + ensureSize(SizeOf.SIZE_OF_FLOAT); + forwardField(); + + OffHeapMemory.UNSAFE.putFloat(recordStartAddr() + curOffset, val); + curOffset += SizeOf.SIZE_OF_FLOAT; + } + + public void putFloat8(double val) { + ensureSize(SizeOf.SIZE_OF_DOUBLE); + forwardField(); + + OffHeapMemory.UNSAFE.putDouble(recordStartAddr() + curOffset, val); + curOffset += SizeOf.SIZE_OF_DOUBLE; + } + + public void putText(String val) { + byte[] bytes = val.getBytes(TextDatum.DEFAULT_CHARSET); + putText(bytes); + } + + public void putText(byte[] val) { + int bytesLen = val.length; + + ensureSize(SizeOf.SIZE_OF_INT + bytesLen); + forwardField(); + + OffHeapMemory.UNSAFE.putInt(recordStartAddr() + curOffset, bytesLen); + curOffset += SizeOf.SIZE_OF_INT; + + OffHeapMemory.UNSAFE.copyMemory(val, Unsafe.ARRAY_BYTE_BASE_OFFSET, null, recordStartAddr() + curOffset, bytesLen); + curOffset += bytesLen; + } + + public void putBlob(byte[] val) { + int 
bytesLen = val.length; + + ensureSize(SizeOf.SIZE_OF_INT + bytesLen); + forwardField(); + + OffHeapMemory.UNSAFE.putInt(recordStartAddr() + curOffset, bytesLen); + curOffset += SizeOf.SIZE_OF_INT; + + OffHeapMemory.UNSAFE.copyMemory(val, Unsafe.ARRAY_BYTE_BASE_OFFSET, null, recordStartAddr() + curOffset, bytesLen); + curOffset += bytesLen; + } + + public void putTimestamp(long val) { + putInt8(val); + } + + public void putDate(int val) { + putInt4(val); + } + + public void putTime(long val) { + putInt8(val); + } + + public void putInterval(IntervalDatum val) { + ensureSize(SizeOf.SIZE_OF_INT + SizeOf.SIZE_OF_LONG); + forwardField(); + + long offset = recordStartAddr() + curOffset; + OffHeapMemory.UNSAFE.putInt(offset, val.getMonths()); + offset += SizeOf.SIZE_OF_INT; + OffHeapMemory.UNSAFE.putLong(offset, val.getMilliSeconds()); + curOffset += SizeOf.SIZE_OF_INT + SizeOf.SIZE_OF_LONG; + } + + public void putInet4(int val) { + putInt4(val); + } + + public void putProtoDatum(ProtobufDatum val) { + putBlob(val.asByteArray()); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/ResizableLimitSpec.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/ResizableLimitSpec.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/ResizableLimitSpec.java new file mode 100644 index 0000000..14e67b2 --- /dev/null +++ b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/ResizableLimitSpec.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.tuple.offheap; + +import com.google.common.base.Preconditions; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.tajo.util.FileUtil; + +/** + * It specifies the maximum size or increasing ratio. In addition, + * it guarantees that all numbers are less than or equal to Integer.MAX_VALUE 2^31 + * due to ByteBuffer. + */ +public class ResizableLimitSpec { + private final Log LOG = LogFactory.getLog(ResizableLimitSpec.class); + + public static final int MAX_SIZE_BYTES = Integer.MAX_VALUE; + public static final ResizableLimitSpec DEFAULT_LIMIT = new ResizableLimitSpec(Integer.MAX_VALUE); + + private final long initSize; + private final long limitBytes; + private final float incRatio; + private final float allowedOVerflowRatio; + private final static float DEFAULT_ALLOWED_OVERFLOW_RATIO = 0.1f; + private final static float DEFAULT_INCREASE_RATIO = 1.0f; + + public ResizableLimitSpec(long initSize) { + this(initSize, MAX_SIZE_BYTES, DEFAULT_ALLOWED_OVERFLOW_RATIO); + } + + public ResizableLimitSpec(long initSize, long limitBytes) { + this(initSize, limitBytes, DEFAULT_ALLOWED_OVERFLOW_RATIO); + } + + public ResizableLimitSpec(long initSize, long limitBytes, float allowedOverflow) { + this(initSize, limitBytes, allowedOverflow, DEFAULT_INCREASE_RATIO); + } + + public ResizableLimitSpec(long initSize, long limitBytes, float allowedOverflowRatio, float incRatio) { + Preconditions.checkArgument(initSize > 0, "initial size must be greater than 0 bytes."); + 
Preconditions.checkArgument(initSize <= MAX_SIZE_BYTES, "The maximum initial size is 2GB."); + Preconditions.checkArgument(limitBytes > 0, "The limit size must be greater than 0 bytes."); + Preconditions.checkArgument(limitBytes <= MAX_SIZE_BYTES, "The maximum limit size is 2GB."); + Preconditions.checkArgument(incRatio > 0.0f, "Increase Ratio must be greater than 0."); + + if (initSize == limitBytes) { + long overflowedSize = (long) (initSize + (initSize * allowedOverflowRatio)); + + if (overflowedSize > Integer.MAX_VALUE) { + overflowedSize = Integer.MAX_VALUE; + } + + this.initSize = overflowedSize; + this.limitBytes = overflowedSize; + } else { + this.initSize = initSize; + limitBytes = (long) (limitBytes + (limitBytes * allowedOverflowRatio)); + + if (limitBytes > Integer.MAX_VALUE) { + this.limitBytes = Integer.MAX_VALUE; + } else { + this.limitBytes = limitBytes; + } + } + + this.allowedOVerflowRatio = allowedOverflowRatio; + this.incRatio = incRatio; + } + + public long initialSize() { + return initSize; + } + + public long limit() { + return limitBytes; + } + + public float remainRatio(long currentSize) { + Preconditions.checkArgument(currentSize > 0, "Size must be greater than 0 bytes."); + if (currentSize > Integer.MAX_VALUE) { + currentSize = Integer.MAX_VALUE; + } + return (float)currentSize / (float)limitBytes; + } + + public boolean canIncrease(long currentSize) { + return remain(currentSize) > 0; + } + + public long remain(long currentSize) { + Preconditions.checkArgument(currentSize > 0, "Size must be greater than 0 bytes."); + return limitBytes > Integer.MAX_VALUE ? 
Integer.MAX_VALUE - currentSize : limitBytes - currentSize; + } + + public int increasedSize(int currentSize) { + if (currentSize < initSize) { + return (int) initSize; + } + + if (currentSize > Integer.MAX_VALUE) { + LOG.warn("Current size already exceeds the maximum size (" + Integer.MAX_VALUE + " bytes)"); + return Integer.MAX_VALUE; + } + long nextSize = (long) (currentSize + ((float) currentSize * incRatio)); + + if (nextSize > limitBytes) { + LOG.info("Increasing reaches size limit (" + FileUtil.humanReadableByteCount(limitBytes, false) + ")"); + nextSize = limitBytes; + } + + if (nextSize > Integer.MAX_VALUE) { + LOG.info("Increasing reaches maximum size (" + FileUtil.humanReadableByteCount(Integer.MAX_VALUE, false) + ")"); + nextSize = Integer.MAX_VALUE; + } + + return (int) nextSize; + } + + @Override + public String toString() { + return "init=" + FileUtil.humanReadableByteCount(initSize, false) + ",limit=" + + FileUtil.humanReadableByteCount(limitBytes, false) + ",overflow_ratio=" + allowedOVerflowRatio + + ",inc_ratio=" + incRatio; + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/RowWriter.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/RowWriter.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/RowWriter.java new file mode 100644 index 0000000..59f8d1b --- /dev/null +++ b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/RowWriter.java @@ -0,0 +1,73 @@ +/*** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.tuple.offheap; + +import org.apache.tajo.common.TajoDataTypes; +import org.apache.tajo.datum.IntervalDatum; +import org.apache.tajo.datum.ProtobufDatum; + +/** + * The call sequence should be as follows: + * + * <pre> + * startRow() --> skipField() or putXXX --> endRow() + * </pre> + * + * The total number of skipField and putXXX invocations must be equivalent to the number of fields. + */ +public interface RowWriter { + + public TajoDataTypes.DataType [] dataTypes(); + + public boolean startRow(); + + public void endRow(); + + public void skipField(); + + public void putBool(boolean val); + + public void putInt2(short val); + + public void putInt4(int val); + + public void putInt8(long val); + + public void putFloat4(float val); + + public void putFloat8(double val); + + public void putText(String val); + + public void putText(byte [] val); + + public void putBlob(byte[] val); + + public void putTimestamp(long val); + + public void putTime(long val); + + public void putDate(int val); + + public void putInterval(IntervalDatum val); + + public void putInet4(int val); + + public void putProtoDatum(ProtobufDatum datum); +} http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTuple.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTuple.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTuple.java new file mode 100644 index 
0000000..7ba753b --- /dev/null +++ b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTuple.java @@ -0,0 +1,308 @@ +/*** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.tuple.offheap; + +import com.google.common.base.Preconditions; +import com.google.protobuf.InvalidProtocolBufferException; +import com.google.protobuf.Message; +import com.sun.tools.javac.util.Convert; +import org.apache.tajo.storage.VTuple; +import org.apache.tajo.util.SizeOf; +import org.apache.tajo.util.UnsafeUtil; +import org.apache.tajo.datum.*; +import org.apache.tajo.exception.UnsupportedException; +import org.apache.tajo.storage.Tuple; +import sun.misc.Unsafe; +import sun.nio.ch.DirectBuffer; + +import java.nio.ByteBuffer; +import java.nio.ByteOrder; + +import static org.apache.tajo.common.TajoDataTypes.DataType; + +public abstract class UnSafeTuple implements Tuple { + private static final Unsafe UNSAFE = UnsafeUtil.unsafe; + + private DirectBuffer bb; + private int relativePos; + private int length; + private DataType [] types; + + protected void set(ByteBuffer bb, int relativePos, int length, DataType [] types) { + this.bb = (DirectBuffer) bb; + this.relativePos = relativePos; + this.length = 
length; + this.types = types; + } + + void set(ByteBuffer bb, DataType [] types) { + set(bb, 0, bb.limit(), types); + } + + @Override + public int size() { + return types.length; + } + + public ByteBuffer nioBuffer() { + return ((ByteBuffer)((ByteBuffer)bb).duplicate().position(relativePos).limit(relativePos + length)).slice(); + } + + public HeapTuple toHeapTuple() { + byte [] bytes = new byte[length]; + UNSAFE.copyMemory(null, bb.address() + relativePos, bytes, Unsafe.ARRAY_BYTE_BASE_OFFSET, length); + return new HeapTuple(bytes, types); + } + + public void copyFrom(UnSafeTuple tuple) { + Preconditions.checkNotNull(tuple); + + ((ByteBuffer) bb).clear(); + if (length < tuple.length) { + UnsafeUtil.free((ByteBuffer) bb); + bb = (DirectBuffer) ByteBuffer.allocateDirect(tuple.length).order(ByteOrder.nativeOrder()); + this.relativePos = 0; + this.length = tuple.length; + } + + ((ByteBuffer) bb).put(tuple.nioBuffer()); + } + + private int getFieldOffset(int fieldId) { + return UNSAFE.getInt(bb.address() + relativePos + SizeOf.SIZE_OF_INT + (fieldId * SizeOf.SIZE_OF_INT)); + } + + public long getFieldAddr(int fieldId) { + int fieldOffset = getFieldOffset(fieldId); + if (fieldOffset == -1) { + throw new RuntimeException("Invalid Field Access: " + fieldId); + } + return bb.address() + relativePos + fieldOffset; + } + + @Override + public boolean contains(int fieldid) { + return getFieldOffset(fieldid) > OffHeapRowBlock.NULL_FIELD_OFFSET; + } + + @Override + public boolean isNull(int fieldid) { + return getFieldOffset(fieldid) == OffHeapRowBlock.NULL_FIELD_OFFSET; + } + + @Override + public boolean isNotNull(int fieldid) { + return getFieldOffset(fieldid) > OffHeapRowBlock.NULL_FIELD_OFFSET; + } + + @Override + public void clear() { + // nothing to do + } + + @Override + public void put(int fieldId, Datum value) { + throw new UnsupportedException("UnSafeTuple does not support put(int, Datum)."); + } + + @Override + public void put(int fieldId, Datum[] values) { + throw new 
UnsupportedException("UnSafeTuple does not support put(int, Datum [])."); + } + + @Override + public void put(int fieldId, Tuple tuple) { + throw new UnsupportedException("UnSafeTuple does not support put(int, Tuple)."); + } + + @Override + public void put(Datum[] values) { + throw new UnsupportedException("UnSafeTuple does not support put(Datum [])."); + } + + @Override + public Datum get(int fieldId) { + if (isNull(fieldId)) { + return NullDatum.get(); + } + + switch (types[fieldId].getType()) { + case BOOLEAN: + return DatumFactory.createBool(getBool(fieldId)); + case INT1: + case INT2: + return DatumFactory.createInt2(getInt2(fieldId)); + case INT4: + return DatumFactory.createInt4(getInt4(fieldId)); + case INT8: + return DatumFactory.createInt8(getInt4(fieldId)); + case FLOAT4: + return DatumFactory.createFloat4(getFloat4(fieldId)); + case FLOAT8: + return DatumFactory.createFloat8(getFloat8(fieldId)); + case TEXT: + return DatumFactory.createText(getText(fieldId)); + case TIMESTAMP: + return DatumFactory.createTimestamp(getInt8(fieldId)); + case DATE: + return DatumFactory.createDate(getInt4(fieldId)); + case TIME: + return DatumFactory.createTime(getInt8(fieldId)); + case INTERVAL: + return getInterval(fieldId); + case INET4: + return DatumFactory.createInet4(getInt4(fieldId)); + case PROTOBUF: + return getProtobufDatum(fieldId); + default: + throw new UnsupportedException("Unknown type: " + types[fieldId]); + } + } + + @Override + public void setOffset(long offset) { + } + + @Override + public long getOffset() { + return 0; + } + + @Override + public boolean getBool(int fieldId) { + return UNSAFE.getByte(getFieldAddr(fieldId)) == 0x01; + } + + @Override + public byte getByte(int fieldId) { + return UNSAFE.getByte(getFieldAddr(fieldId)); + } + + @Override + public char getChar(int fieldId) { + return UNSAFE.getChar(getFieldAddr(fieldId)); + } + + @Override + public byte[] getBytes(int fieldId) { + long pos = getFieldAddr(fieldId); + int len = 
UNSAFE.getInt(pos); + pos += SizeOf.SIZE_OF_INT; + + byte [] bytes = new byte[len]; + UNSAFE.copyMemory(null, pos, bytes, UNSAFE.ARRAY_BYTE_BASE_OFFSET, len); + return bytes; + } + + @Override + public short getInt2(int fieldId) { + long addr = getFieldAddr(fieldId); + return UNSAFE.getShort(addr); + } + + @Override + public int getInt4(int fieldId) { + return UNSAFE.getInt(getFieldAddr(fieldId)); + } + + @Override + public long getInt8(int fieldId) { + return UNSAFE.getLong(getFieldAddr(fieldId)); + } + + @Override + public float getFloat4(int fieldId) { + return UNSAFE.getFloat(getFieldAddr(fieldId)); + } + + @Override + public double getFloat8(int fieldId) { + return UNSAFE.getDouble(getFieldAddr(fieldId)); + } + + @Override + public String getText(int fieldId) { + long pos = getFieldAddr(fieldId); + int len = UNSAFE.getInt(pos); + pos += SizeOf.SIZE_OF_INT; + + byte [] bytes = new byte[len]; + UNSAFE.copyMemory(null, pos, bytes, UNSAFE.ARRAY_BYTE_BASE_OFFSET, len); + return new String(bytes); + } + + public IntervalDatum getInterval(int fieldId) { + long pos = getFieldAddr(fieldId); + int months = UNSAFE.getInt(pos); + pos += SizeOf.SIZE_OF_INT; + long millisecs = UNSAFE.getLong(pos); + return new IntervalDatum(months, millisecs); + } + + @Override + public Datum getProtobufDatum(int fieldId) { + byte [] bytes = getBytes(fieldId); + + ProtobufDatumFactory factory = ProtobufDatumFactory.get(types[fieldId].getCode()); + Message.Builder builder = factory.newBuilder(); + try { + builder.mergeFrom(bytes); + } catch (InvalidProtocolBufferException e) { + return NullDatum.get(); + } + + return new ProtobufDatum(builder.build()); + } + + @Override + public char[] getUnicodeChars(int fieldId) { + long pos = getFieldAddr(fieldId); + int len = UNSAFE.getInt(pos); + pos += SizeOf.SIZE_OF_INT; + + byte [] bytes = new byte[len]; + UNSAFE.copyMemory(null, pos, bytes, UNSAFE.ARRAY_BYTE_BASE_OFFSET, len); + return Convert.utf2chars(bytes); + } + + @Override + public Tuple 
clone() throws CloneNotSupportedException { + return toHeapTuple(); + } + + @Override + public Datum[] getValues() { + Datum [] datums = new Datum[size()]; + for (int i = 0; i < size(); i++) { + if (contains(i)) { + datums[i] = get(i); + } else { + datums[i] = NullDatum.get(); + } + } + return datums; + } + + @Override + public String toString() { + return VTuple.toDisplayString(getValues()); + } + + public abstract void release(); +} http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTupleBytesComparator.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTupleBytesComparator.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTupleBytesComparator.java new file mode 100644 index 0000000..73e1e2f --- /dev/null +++ b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTupleBytesComparator.java @@ -0,0 +1,99 @@ +/*** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.tuple.offheap; + +import com.google.common.primitives.Longs; +import com.google.common.primitives.UnsignedLongs; +import org.apache.tajo.util.SizeOf; +import org.apache.tajo.util.UnsafeUtil; +import sun.misc.Unsafe; + +import java.nio.ByteOrder; + +/** + * It directly access UTF bytes in UnSafeTuple without any copy. It is used by compiled TupleComparator. + */ +public class UnSafeTupleBytesComparator { + private static final Unsafe UNSAFE = UnsafeUtil.unsafe; + + static final boolean littleEndian = + ByteOrder.nativeOrder().equals(ByteOrder.LITTLE_ENDIAN); + + public static int compare(long ptr1, long ptr2) { + int lstrLen = UNSAFE.getInt(ptr1); + int rstrLen = UNSAFE.getInt(ptr2); + + ptr1 += SizeOf.SIZE_OF_INT; + ptr2 += SizeOf.SIZE_OF_INT; + + int minLength = Math.min(lstrLen, rstrLen); + int minWords = minLength / Longs.BYTES; + + /* + * Compare 8 bytes at a time. Benchmarking shows comparing 8 bytes at a + * time is no slower than comparing 4 bytes at a time even on 32-bit. + * On the other hand, it is substantially faster on 64-bit. + */ + for (int i = 0; i < minWords * Longs.BYTES; i += Longs.BYTES) { + long lw = UNSAFE.getLong(ptr1); + long rw = UNSAFE.getLong(ptr2); + long diff = lw ^ rw; + + if (diff != 0) { + if (!littleEndian) { + return UnsignedLongs.compare(lw, rw); + } + + // Use binary search + int n = 0; + int y; + int x = (int) diff; + if (x == 0) { + x = (int) (diff >>> 32); + n = 32; + } + + y = x << 16; + if (y == 0) { + n += 16; + } else { + x = y; + } + + y = x << 8; + if (y == 0) { + n += 8; + } + return (int) (((lw >>> n) & 0xFFL) - ((rw >>> n) & 0xFFL)); + } + + ptr1 += SizeOf.SIZE_OF_LONG; + ptr2 += SizeOf.SIZE_OF_LONG; + } + + // The epilogue to cover the last (minLength % 8) elements. 
+ for (int i = minWords * Longs.BYTES; i < minLength; i++) { + int result = UNSAFE.getByte(ptr1++) - UNSAFE.getByte(ptr2++); + if (result != 0) { + return result; + } + } + return lstrLen - rstrLen; + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/ZeroCopyTuple.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/ZeroCopyTuple.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/ZeroCopyTuple.java new file mode 100644 index 0000000..51dbb29 --- /dev/null +++ b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/ZeroCopyTuple.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.tuple.offheap; + +import java.nio.ByteBuffer; + +import static org.apache.tajo.common.TajoDataTypes.DataType; + +public class ZeroCopyTuple extends UnSafeTuple { + + public void set(ByteBuffer bb, int relativePos, int length, DataType [] types) { + super.set(bb, relativePos, length, types); + } + + @Override + public void release() { + // nothing to do + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-storage/src/main/proto/IndexProtos.proto ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/proto/IndexProtos.proto b/tajo-storage/src/main/proto/IndexProtos.proto index bcb0cbe..f5c8a08 100644 --- a/tajo-storage/src/main/proto/IndexProtos.proto +++ b/tajo-storage/src/main/proto/IndexProtos.proto @@ -25,5 +25,7 @@ option java_generate_equals_and_hash = true; import "CatalogProtos.proto"; message TupleComparatorProto { - repeated TupleComparatorSpecProto compSpecs = 1; + required SchemaProto schema = 1; + repeated SortSpecProto sortSpecs = 2; + repeated TupleComparatorSpecProto compSpecs = 3; } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-storage/src/main/resources/storage-default.xml ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/resources/storage-default.xml b/tajo-storage/src/main/resources/storage-default.xml index 4669477..b227e9d 100644 --- a/tajo-storage/src/main/resources/storage-default.xml +++ b/tajo-storage/src/main/resources/storage-default.xml @@ -40,7 +40,7 @@ <!--- Registered Scanner Handler --> <property> <name>tajo.storage.scanner-handler</name> - <value>csv,raw,rcfile,row,trevni,parquet,sequencefile,avro</value> + <value>csv,raw,rcfile,row,trevni,parquet,sequencefile,avro,directraw</value> </property> <!--- Fragment Class Configurations --> @@ -53,6 +53,10 @@ <value>org.apache.tajo.storage.fragment.FileFragment</value> </property> <property> + 
<name>tajo.storage.fragment.directraw.class</name> + <value>org.apache.tajo.storage.fragment.FileFragment</value> + </property> + <property> <name>tajo.storage.fragment.rcfile.class</name> <value>org.apache.tajo.storage.fragment.FileFragment</value> </property> @@ -94,6 +98,11 @@ </property> <property> + <name>tajo.storage.scanner-handler.directraw.class</name> + <value>org.apache.tajo.storage.rawfile.DirectRawFileScanner</value> + </property> + + <property> <name>tajo.storage.scanner-handler.v2.raw.class</name> <value>org.apache.tajo.storage.RawFile$RawFileScanner</value> </property> @@ -175,6 +184,11 @@ </property> <property> + <name>tajo.storage.appender-handler.directraw.class</name> + <value>org.apache.tajo.storage.rawfile.DirectRawFileWriter</value> + </property> + + <property> <name>tajo.storage.appender-handler.rcfile.class</name> <value>org.apache.tajo.storage.rcfile.RCFile$RCFileAppender</value> </property> http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-storage/src/test/java/org/apache/tajo/storage/TestTupleComparator.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/TestTupleComparator.java b/tajo-storage/src/test/java/org/apache/tajo/storage/TestTupleComparator.java index ab7c2b2..639ca04 100644 --- a/tajo-storage/src/test/java/org/apache/tajo/storage/TestTupleComparator.java +++ b/tajo-storage/src/test/java/org/apache/tajo/storage/TestTupleComparator.java @@ -69,7 +69,7 @@ public class TestTupleComparator { SortSpec sortKey1 = new SortSpec(schema.getColumn("col4"), true, false); SortSpec sortKey2 = new SortSpec(schema.getColumn("col5"), true, false); - TupleComparator tc = new TupleComparator(schema, + BaseTupleComparator tc = new BaseTupleComparator(schema, new SortSpec[] {sortKey1, sortKey2}); assertEquals(-1, tc.compare(tuple1, tuple2)); assertEquals(1, tc.compare(tuple2, tuple1)); 
http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-storage/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java b/tajo-storage/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java index 54798a4..29ce226 100644 --- a/tajo-storage/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java +++ b/tajo-storage/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java @@ -74,7 +74,8 @@ public class TestBSTIndex { public static Collection<Object[]> generateParameters() { return Arrays.asList(new Object[][]{ {StoreType.CSV}, - {StoreType.RAW} + {StoreType.RAW}, + {StoreType.DIRECTRAW} }); } @@ -91,15 +92,15 @@ public class TestBSTIndex { Path tablePath = new Path(testDir, "testFindValue_" + storeType); Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema, tablePath); appender.init(); - Tuple tuple; + Tuple key; for (int i = 0; i < TUPLE_NUM; i++) { - tuple = new VTuple(5); - tuple.put(0, DatumFactory.createInt4(i)); - tuple.put(1, DatumFactory.createInt8(i)); - tuple.put(2, DatumFactory.createFloat8(i)); - tuple.put(3, DatumFactory.createFloat4(i)); - tuple.put(4, DatumFactory.createText("field_" + i)); - appender.addTuple(tuple); + key = new VTuple(5); + key.put(0, DatumFactory.createInt4(i)); + key.put(1, DatumFactory.createInt8(i)); + key.put(2, DatumFactory.createFloat8(i)); + key.put(3, DatumFactory.createFloat4(i)); + key.put(4, DatumFactory.createText("field_" + i)); + appender.addTuple(key); } appender.close(); @@ -115,7 +116,7 @@ public class TestBSTIndex { keySchema.addColumn(new Column("long", Type.INT8)); keySchema.addColumn(new Column("double", Type.FLOAT8)); - TupleComparator comp = new TupleComparator(keySchema, sortKeys); + BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortKeys); BSTIndex bst = new 
BSTIndex(conf); BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "testFindValue_" + storeType + ".idx"), @@ -132,11 +133,11 @@ public class TestBSTIndex { while (true) { keyTuple = new VTuple(2); offset = scanner.getNextOffset(); - tuple = scanner.next(); - if (tuple == null) break; + key = scanner.next(); + if (key == null) break; - keyTuple.put(0, tuple.get(1)); - keyTuple.put(1, tuple.get(2)); + keyTuple.put(0, key.get(1)); + keyTuple.put(1, key.get(2)); creater.write(keyTuple, offset); } @@ -144,29 +145,29 @@ public class TestBSTIndex { creater.close(); scanner.close(); - tuple = new VTuple(keySchema.size()); + key = new VTuple(keySchema.size()); BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testFindValue_" + storeType + ".idx"), keySchema, comp); reader.open(); scanner = StorageManagerFactory.getSeekableScanner(conf, meta, schema, tablet, schema); scanner.init(); for (int i = 0; i < TUPLE_NUM - 1; i++) { - tuple.put(0, DatumFactory.createInt8(i)); - tuple.put(1, DatumFactory.createFloat8(i)); - long offsets = reader.find(tuple); + key.put(0, DatumFactory.createInt8(i)); + key.put(1, DatumFactory.createFloat8(i)); + long offsets = reader.find(key); scanner.seek(offsets); - tuple = scanner.next(); - assertTrue("seek check [" + (i) + " ," + (tuple.get(1).asInt8()) + "]", (i) == (tuple.get(1).asInt8())); - assertTrue("seek check [" + (i) + " ," + (tuple.get(2).asFloat8()) + "]", (i) == (tuple.get(2).asFloat8())); + Tuple found = scanner.next(); + assertTrue("seek check [" + (i) + " ," + (found.get(1).asInt8()) + "]", (i) == (found.get(1).asInt8())); + assertTrue("seek check [" + (i) + " ," + (found.get(2).asFloat8()) + "]", (i) == (found.get(2).asFloat8())); offsets = reader.next(); if (offsets == -1) { continue; } scanner.seek(offsets); - tuple = scanner.next(); - assertTrue("[seek check " + (i + 1) + " ]", (i + 1) == (tuple.get(0).asInt4())); - assertTrue("[seek check " + (i + 1) + " ]", (i + 1) == (tuple.get(1).asInt8())); + found = 
scanner.next(); + assertTrue("[seek check " + (i + 1) + " ]", (i + 1) == (found.get(0).asInt4())); + assertTrue("[seek check " + (i + 1) + " ]", (i + 1) == (found.get(1).asInt8())); } reader.close(); scanner.close(); @@ -189,7 +190,7 @@ public class TestBSTIndex { keySchema.addColumn(new Column("long", Type.INT8)); keySchema.addColumn(new Column("double", Type.FLOAT8)); - TupleComparator comp = new TupleComparator(keySchema, sortKeys); + BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortKeys); BSTIndex bst = new BSTIndex(conf); BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "testBuildIndexWithAppender_" + storeType + ".idx"), @@ -197,19 +198,19 @@ public class TestBSTIndex { creater.setLoadNum(LOAD_NUM); creater.open(); - Tuple tuple; + Tuple key; long offset; for (int i = 0; i < TUPLE_NUM; i++) { - tuple = new VTuple(5); - tuple.put(0, DatumFactory.createInt4(i)); - tuple.put(1, DatumFactory.createInt8(i)); - tuple.put(2, DatumFactory.createFloat8(i)); - tuple.put(3, DatumFactory.createFloat4(i)); - tuple.put(4, DatumFactory.createText("field_" + i)); + key = new VTuple(5); + key.put(0, DatumFactory.createInt4(i)); + key.put(1, DatumFactory.createInt8(i)); + key.put(2, DatumFactory.createFloat8(i)); + key.put(3, DatumFactory.createFloat4(i)); + key.put(4, DatumFactory.createText("field_" + i)); offset = appender.getOffset(); - appender.addTuple(tuple); - creater.write(tuple, offset); + appender.addTuple(key); + creater.write(key, offset); } appender.flush(); appender.close(); @@ -222,7 +223,7 @@ public class TestBSTIndex { long fileLen = status.getLen(); FileFragment tablet = new FileFragment("table1_1", status.getPath(), 0, fileLen); - tuple = new VTuple(keySchema.size()); + key = new VTuple(keySchema.size()); BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testBuildIndexWithAppender_" + storeType + ".idx"), keySchema, comp); reader.open(); @@ -230,22 +231,23 @@ public class TestBSTIndex { scanner.init(); for (int i = 
0; i < TUPLE_NUM - 1; i++) { - tuple.put(0, DatumFactory.createInt8(i)); - tuple.put(1, DatumFactory.createFloat8(i)); - long offsets = reader.find(tuple); + key.put(0, DatumFactory.createInt8(i)); + key.put(1, DatumFactory.createFloat8(i)); + long offsets = reader.find(key); scanner.seek(offsets); - tuple = scanner.next(); - assertTrue("[seek check " + (i) + " ]", (i) == (tuple.get(1).asInt8())); - assertTrue("[seek check " + (i) + " ]", (i) == (tuple.get(2).asFloat8())); + + Tuple found = scanner.next(); + assertTrue("[seek check " + (i) + " ]", (i) == (found.get(1).asInt8())); + assertTrue("[seek check " + (i) + " ]", (i) == (found.get(2).asFloat8())); offsets = reader.next(); if (offsets == -1) { continue; } scanner.seek(offsets); - tuple = scanner.next(); - assertTrue("[seek check " + (i + 1) + " ]", (i + 1) == (tuple.get(0).asInt4())); - assertTrue("[seek check " + (i + 1) + " ]", (i + 1) == (tuple.get(1).asInt8())); + found = scanner.next(); + assertTrue("[seek check " + (i + 1) + " ]", (i + 1) == (found.get(0).asInt4())); + assertTrue("[seek check " + (i + 1) + " ]", (i + 1) == (found.get(1).asInt8())); } reader.close(); scanner.close(); @@ -281,7 +283,7 @@ public class TestBSTIndex { keySchema.addColumn(new Column("long", Type.INT8)); keySchema.addColumn(new Column("double", Type.FLOAT8)); - TupleComparator comp = new TupleComparator(keySchema, sortKeys); + BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortKeys); BSTIndex bst = new BSTIndex(conf); BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "testFindOmittedValue_" + storeType + ".idx"), @@ -336,6 +338,7 @@ public class TestBSTIndex { tuple.put(2, DatumFactory.createFloat8(i)); tuple.put(3, DatumFactory.createFloat4(i)); tuple.put(4, DatumFactory.createText("field_" + i)); + appender.addTuple(tuple); } appender.close(); @@ -352,7 +355,7 @@ public class TestBSTIndex { keySchema.addColumn(new Column("int", Type.INT4)); keySchema.addColumn(new Column("long", Type.INT8)); - 
TupleComparator comp = new TupleComparator(keySchema, sortKeys); + BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortKeys); BSTIndex bst = new BSTIndex(conf); BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "testFindNextKeyValue_" + storeType + ".idx"), @@ -442,7 +445,7 @@ public class TestBSTIndex { keySchema.addColumn(new Column("int", Type.INT4)); keySchema.addColumn(new Column("long", Type.INT8)); - TupleComparator comp = new TupleComparator(keySchema, sortKeys); + BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortKeys); BSTIndex bst = new BSTIndex(conf); BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "testFindNextKeyOmittedValue_" + storeType + ".idx"), @@ -522,7 +525,7 @@ public class TestBSTIndex { keySchema.addColumn(new Column("long", Type.INT8)); keySchema.addColumn(new Column("double", Type.FLOAT8)); - TupleComparator comp = new TupleComparator(keySchema, sortKeys); + BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortKeys); BSTIndex bst = new BSTIndex(conf); BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "testFindMinValue_" + storeType + ".idx"), BSTIndex.TWO_LEVEL_INDEX, keySchema, comp); @@ -604,7 +607,7 @@ public class TestBSTIndex { keySchema.addColumn(new Column("int", Type.INT4)); keySchema.addColumn(new Column("long", Type.INT8)); - TupleComparator comp = new TupleComparator(keySchema, sortKeys); + BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortKeys); BSTIndex bst = new BSTIndex(conf); BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "testMinMax_" + storeType + ".idx"), @@ -709,7 +712,7 @@ public class TestBSTIndex { keySchema.addColumn(new Column("int", Type.INT4)); keySchema.addColumn(new Column("long", Type.INT8)); - TupleComparator comp = new TupleComparator(keySchema, sortKeys); + BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortKeys); BSTIndex bst = new BSTIndex(conf); BSTIndexWriter creater = 
bst.getIndexWriter(new Path(testDir, "testConcurrentAccess_" + storeType + ".idx"), @@ -765,15 +768,15 @@ public class TestBSTIndex { Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema, tablePath); appender.init(); - Tuple tuple; + Tuple key; for (int i = (TUPLE_NUM - 1); i >= 0; i--) { - tuple = new VTuple(5); - tuple.put(0, DatumFactory.createInt4(i)); - tuple.put(1, DatumFactory.createInt8(i)); - tuple.put(2, DatumFactory.createFloat8(i)); - tuple.put(3, DatumFactory.createFloat4(i)); - tuple.put(4, DatumFactory.createText("field_" + i)); - appender.addTuple(tuple); + key = new VTuple(5); + key.put(0, DatumFactory.createInt4(i)); + key.put(1, DatumFactory.createInt8(i)); + key.put(2, DatumFactory.createFloat8(i)); + key.put(3, DatumFactory.createFloat4(i)); + key.put(4, DatumFactory.createText("field_" + i)); + appender.addTuple(key); } appender.close(); @@ -789,7 +792,7 @@ public class TestBSTIndex { keySchema.addColumn(new Column("long", Type.INT8)); keySchema.addColumn(new Column("double", Type.FLOAT8)); - TupleComparator comp = new TupleComparator(keySchema, sortKeys); + BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortKeys); BSTIndex bst = new BSTIndex(conf); @@ -806,11 +809,11 @@ public class TestBSTIndex { while (true) { keyTuple = new VTuple(2); offset = scanner.getNextOffset(); - tuple = scanner.next(); - if (tuple == null) break; + key = scanner.next(); + if (key == null) break; - keyTuple.put(0, tuple.get(1)); - keyTuple.put(1, tuple.get(2)); + keyTuple.put(0, key.get(1)); + keyTuple.put(1, key.get(2)); creater.write(keyTuple, offset); } @@ -818,7 +821,7 @@ public class TestBSTIndex { creater.close(); scanner.close(); - tuple = new VTuple(keySchema.size()); + key = new VTuple(keySchema.size()); BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testFindValueDescOrder_" + storeType + ".idx"), keySchema, comp); @@ -827,22 +830,22 @@ public class TestBSTIndex { scanner.init(); for (int i 
= (TUPLE_NUM - 1); i > 0; i--) { - tuple.put(0, DatumFactory.createInt8(i)); - tuple.put(1, DatumFactory.createFloat8(i)); - long offsets = reader.find(tuple); + key.put(0, DatumFactory.createInt8(i)); + key.put(1, DatumFactory.createFloat8(i)); + long offsets = reader.find(key); scanner.seek(offsets); - tuple = scanner.next(); - assertTrue("seek check [" + (i) + " ," + (tuple.get(1).asInt8()) + "]", (i) == (tuple.get(1).asInt8())); - assertTrue("seek check [" + (i) + " ," + (tuple.get(2).asFloat8()) + "]", (i) == (tuple.get(2).asFloat8())); + Tuple found = scanner.next(); + assertTrue("seek check [" + (i) + " ," + (found.get(1).asInt8()) + "]", (i) == (found.get(1).asInt8())); + assertTrue("seek check [" + (i) + " ," + (found.get(2).asFloat8()) + "]", (i) == (found.get(2).asFloat8())); offsets = reader.next(); if (offsets == -1) { continue; } scanner.seek(offsets); - tuple = scanner.next(); - assertTrue("[seek check " + (i - 1) + " ]", (i - 1) == (tuple.get(0).asInt4())); - assertTrue("[seek check " + (i - 1) + " ]", (i - 1) == (tuple.get(1).asInt8())); + found = scanner.next(); + assertTrue("[seek check " + (i - 1) + " ]", (i - 1) == (found.get(0).asInt4())); + assertTrue("[seek check " + (i - 1) + " ]", (i - 1) == (found.get(1).asInt8())); } reader.close(); scanner.close(); @@ -880,7 +883,7 @@ public class TestBSTIndex { keySchema.addColumn(new Column("int", Type.INT4)); keySchema.addColumn(new Column("long", Type.INT8)); - TupleComparator comp = new TupleComparator(keySchema, sortKeys); + BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortKeys); BSTIndex bst = new BSTIndex(conf); BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-storage/src/test/java/org/apache/tajo/storage/index/TestSingleCSVFileBSTIndex.java ---------------------------------------------------------------------- diff --git 
a/tajo-storage/src/test/java/org/apache/tajo/storage/index/TestSingleCSVFileBSTIndex.java b/tajo-storage/src/test/java/org/apache/tajo/storage/index/TestSingleCSVFileBSTIndex.java index 53a2531..85c0334 100644 --- a/tajo-storage/src/test/java/org/apache/tajo/storage/index/TestSingleCSVFileBSTIndex.java +++ b/tajo-storage/src/test/java/org/apache/tajo/storage/index/TestSingleCSVFileBSTIndex.java @@ -104,7 +104,7 @@ public class TestSingleCSVFileBSTIndex { keySchema.addColumn(new Column("long", Type.INT8)); keySchema.addColumn(new Column("double", Type.FLOAT8)); - TupleComparator comp = new TupleComparator(keySchema, sortKeys); + BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortKeys); BSTIndex bst = new BSTIndex(conf); BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, @@ -193,7 +193,7 @@ public class TestSingleCSVFileBSTIndex { keySchema.addColumn(new Column("int", Type.INT4)); keySchema.addColumn(new Column("long", Type.INT8)); - TupleComparator comp = new TupleComparator(keySchema, sortKeys); + BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortKeys); BSTIndex bst = new BSTIndex(conf); BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "FindNextKeyValueInCSV.idx"), http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-storage/src/test/java/org/apache/tajo/storage/raw/TestDirectRawFile.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/raw/TestDirectRawFile.java b/tajo-storage/src/test/java/org/apache/tajo/storage/raw/TestDirectRawFile.java new file mode 100644 index 0000000..8d2f00d --- /dev/null +++ b/tajo-storage/src/test/java/org/apache/tajo/storage/raw/TestDirectRawFile.java @@ -0,0 +1,318 @@ +/*** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage.raw; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.tajo.catalog.CatalogUtil; +import org.apache.tajo.catalog.TableMeta; +import org.apache.tajo.catalog.proto.CatalogProtos; +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.storage.Tuple; +import org.apache.tajo.tuple.offheap.OffHeapRowBlock; +import org.apache.tajo.tuple.offheap.OffHeapRowBlockReader; +import org.apache.tajo.tuple.offheap.TestOffHeapRowBlock; +import org.apache.tajo.tuple.offheap.ZeroCopyTuple; +import org.apache.tajo.storage.rawfile.DirectRawFileScanner; +import org.apache.tajo.storage.rawfile.DirectRawFileWriter; +import org.apache.tajo.unit.StorageUnit; +import org.apache.tajo.util.CommonTestingUtil; +import org.apache.tajo.util.FileUtil; +import org.junit.Test; + +import java.io.IOException; + +import static org.apache.tajo.tuple.offheap.TestOffHeapRowBlock.*; +import static org.junit.Assert.*; + +public class TestDirectRawFile { + private static final Log LOG = LogFactory.getLog(TestDirectRawFile.class); + + public static Path writeRowBlock(TajoConf conf, TableMeta meta, 
OffHeapRowBlock rowBlock, Path outputFile) + throws IOException { + DirectRawFileWriter writer = new DirectRawFileWriter(conf, schema, meta, outputFile); + writer.init(); + writer.writeRowBlock(rowBlock); + writer.close(); + + FileSystem fs = FileSystem.getLocal(conf); + FileStatus status = fs.getFileStatus(outputFile); + assertTrue(status.getLen() > 0); + LOG.info("Written file size: " + FileUtil.humanReadableByteCount(status.getLen(), false)); + + return outputFile; + } + + public static Path writeRowBlock(TajoConf conf, TableMeta meta, OffHeapRowBlock rowBlock) throws IOException { + Path testDir = CommonTestingUtil.getTestDir(); + Path outputFile = new Path(testDir, "output.draw"); + return writeRowBlock(conf, meta, rowBlock, outputFile); + } + + //@Test + public void testRWForAllTypes() throws IOException { + int rowNum = 50000; + + OffHeapRowBlock rowBlock = TestOffHeapRowBlock.createRowBlock(rowNum); + TajoConf conf = new TajoConf(); + TableMeta meta = CatalogUtil.newTableMeta(CatalogProtos.StoreType.DIRECTRAW); + Path outputFile = writeRowBlock(conf, meta, rowBlock); + rowBlock.release(); + + + FileSystem fs = FileSystem.getLocal(conf); + FileStatus status = fs.getFileStatus(outputFile); + assertTrue(status.getLen() > 0); + LOG.info("Written file size: " + FileUtil.humanReadableByteCount(status.getLen(), false)); + + + OffHeapRowBlock readBlock = new OffHeapRowBlock(schema, StorageUnit.MB * 1); + OffHeapRowBlockReader blockReader = new OffHeapRowBlockReader(readBlock); + DirectRawFileScanner reader = new DirectRawFileScanner(conf, schema, meta, outputFile); + reader.init(); + + long readStart = System.currentTimeMillis(); + ZeroCopyTuple tuple = new ZeroCopyTuple(); + int j = 0; + + while(reader.next(readBlock)) { + blockReader.reset(); + while (blockReader.next(tuple)) { + TestOffHeapRowBlock.validateTupleResult(j, tuple); + j++; + } + } + LOG.info("Total read rows: " + j); + long readEnd = System.currentTimeMillis(); + LOG.info("reading takes " + (readEnd 
- readStart) + " msec"); + reader.close(); + readBlock.release(); + + assertEquals(rowNum, j); + } + + @Test + public void testRWForAllTypesWithNextTuple() throws IOException { + int rowNum = 10000; + + OffHeapRowBlock rowBlock = TestOffHeapRowBlock.createRowBlock(rowNum); + + TajoConf conf = new TajoConf(); + TableMeta meta = CatalogUtil.newTableMeta(CatalogProtos.StoreType.DIRECTRAW); + Path outputFile = writeRowBlock(conf, meta, rowBlock); + rowBlock.release(); + + DirectRawFileScanner reader = new DirectRawFileScanner(conf, schema, meta, outputFile); + reader.init(); + + long readStart = System.currentTimeMillis(); + int j = 0; + Tuple tuple; + while ((tuple = reader.next()) != null) { + TestOffHeapRowBlock.validateTupleResult(j, tuple); + j++; + } + + LOG.info("Total read rows: " + j); + long readEnd = System.currentTimeMillis(); + LOG.info("reading takes " + (readEnd - readStart) + " msec"); + reader.close(); + assertEquals(rowNum, j); + } + + @Test + public void testRepeatedScan() throws IOException { + int rowNum = 2; + + OffHeapRowBlock rowBlock = TestOffHeapRowBlock.createRowBlock(rowNum); + + TajoConf conf = new TajoConf(); + TableMeta meta = CatalogUtil.newTableMeta(CatalogProtos.StoreType.DIRECTRAW); + Path outputFile = writeRowBlock(conf, meta, rowBlock); + + rowBlock.release(); + + DirectRawFileScanner reader = new DirectRawFileScanner(conf, schema, meta, outputFile); + reader.init(); + + int j = 0; + while (reader.next() != null) { + j++; + } + assertEquals(rowNum, j); + + for (int i = 0; i < 5; i++) { + assertNull(reader.next()); + } + + reader.close(); + } + + @Test + public void testReset() throws IOException { + int rowNum = 2; + + OffHeapRowBlock rowBlock = TestOffHeapRowBlock.createRowBlock(rowNum); + + TajoConf conf = new TajoConf(); + TableMeta meta = CatalogUtil.newTableMeta(CatalogProtos.StoreType.DIRECTRAW); + Path outputFile = writeRowBlock(conf, meta, rowBlock); + rowBlock.release(); + + DirectRawFileScanner reader = new 
DirectRawFileScanner(conf, schema, meta, outputFile);
+    reader.init();
+
+    int j = 0;
+    while (reader.next() != null) {
+      j++;
+    }
+    assertEquals(rowNum, j);
+
+    for (int i = 0; i < 5; i++) {
+      assertNull(reader.next());
+    }
+
+    reader.reset();
+
+    j = 0;
+    while (reader.next() != null) {
+      j++;
+    }
+    assertEquals(rowNum, j);
+
+    for (int i = 0; i < 5; i++) {
+      assertNull(reader.next());
+    }
+    reader.close();
+  }
+
+  //@Test
+  public void testRWWithAddTupleForAllTypes() throws IOException {
+    int rowNum = 10;
+
+    OffHeapRowBlock rowBlock = TestOffHeapRowBlock.createRowBlock(rowNum);
+    OffHeapRowBlockReader blockReader = new OffHeapRowBlockReader(rowBlock);
+
+    Path testDir = CommonTestingUtil.getTestDir();
+    Path outputFile = new Path(testDir, "output.draw");
+    TajoConf conf = new TajoConf();
+    TableMeta meta = CatalogUtil.newTableMeta(CatalogProtos.StoreType.DIRECTRAW);
+    DirectRawFileWriter writer = new DirectRawFileWriter(conf, schema, meta, outputFile);
+    writer.init();
+
+    blockReader.reset();
+    int i = 0;
+    ZeroCopyTuple tuple = new ZeroCopyTuple();
+    while(blockReader.next(tuple)) {
+      writer.addTuple(tuple);
+    }
+    writer.close();
+    rowBlock.release();
+
+
+    FileSystem fs = FileSystem.getLocal(conf);
+    FileStatus status = fs.getFileStatus(outputFile);
+    assertTrue(status.getLen() > 0);
+    LOG.info("Written file size: " + FileUtil.humanReadableByteCount(status.getLen(), false));
+
+    OffHeapRowBlock readBlock = new OffHeapRowBlock(schema, StorageUnit.MB * 1);
+    OffHeapRowBlockReader blockReader2 = new OffHeapRowBlockReader(readBlock);
+    DirectRawFileScanner reader = new DirectRawFileScanner(conf, schema, meta, outputFile);
+    reader.init();
+
+    long readStart = System.currentTimeMillis();
+    tuple = new ZeroCopyTuple();
+    int j = 0;
+    while(reader.next(readBlock)) {
+      blockReader2.reset();
+      while (blockReader2.next(tuple)) {
+        TestOffHeapRowBlock.validateTupleResult(j, tuple);
+        j++;
+      }
+    }
+    LOG.info("Total read rows: " + j);
+    long readEnd = System.currentTimeMillis();
+    LOG.info("reading takes " + (readEnd - readStart) + " msec");
+    reader.close();
+    readBlock.release();
+
+    assertEquals(rowNum, j);
+  }
+
+  //@Test
+  public void testNullityValidation() throws IOException {
+    int rowNum = 1000;
+
+    long allocateStart = System.currentTimeMillis();
+    OffHeapRowBlock rowBlock = new OffHeapRowBlock(schema, 1024);
+    long allocatedEnd = System.currentTimeMillis();
+    LOG.info(FileUtil.humanReadableByteCount(rowBlock.size(), true) + " bytes allocated "
+        + (allocatedEnd - allocateStart) + " msec");
+
+    long writeStart = System.currentTimeMillis();
+    for (int i = 0; i < rowNum; i++) {
+      fillRowBlockWithNull(i, rowBlock.getWriter());
+    }
+    long writeEnd = System.currentTimeMillis();
+    LOG.info("writing and nullity validating take " + (writeEnd - writeStart) +" msec");
+
+    Path testDir = CommonTestingUtil.getTestDir();
+    Path outputFile = new Path(testDir, "output.draw");
+    TajoConf conf = new TajoConf();
+    TableMeta meta = CatalogUtil.newTableMeta(CatalogProtos.StoreType.DIRECTRAW);
+    DirectRawFileWriter writer = new DirectRawFileWriter(conf, schema, meta, outputFile);
+    writer.init();
+    writer.writeRowBlock(rowBlock);
+    writer.close();
+    rowBlock.release();
+
+
+    FileSystem fs = FileSystem.getLocal(conf);
+    FileStatus status = fs.getFileStatus(outputFile);
+    assertTrue(status.getLen() > 0);
+    LOG.info("Written file size: " + FileUtil.humanReadableByteCount(status.getLen(), false));
+
+    OffHeapRowBlock readBlock = new OffHeapRowBlock(schema, StorageUnit.MB * 1);
+    OffHeapRowBlockReader blockReader = new OffHeapRowBlockReader(readBlock); // rowBlock was released above; read what the scanner loads
+    DirectRawFileScanner reader = new DirectRawFileScanner(conf, schema, meta, outputFile);
+    reader.init();
+
+    long readStart = System.currentTimeMillis();
+    ZeroCopyTuple tuple = new ZeroCopyTuple();
+    int j = 0;
+
+    while (reader.next(readBlock)) { // load a block from the file before iterating its rows
+      blockReader.reset();
+      while (blockReader.next(tuple)) {
+        validateNullity(j, tuple);
+        j++;
+      }
+    }
+    LOG.info("Total read rows: " + j);
+    long readEnd = System.currentTimeMillis();
+    LOG.info("reading takes " + (readEnd - readStart) + " msec");
+    reader.close();
+    readBlock.release();
+
+    assertEquals(rowNum, j);
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-storage/src/test/java/org/apache/tajo/storage/v2/TestStorages.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/v2/TestStorages.java b/tajo-storage/src/test/java/org/apache/tajo/storage/v2/TestStorages.java
index 357dadb..6366141 100644
--- a/tajo-storage/src/test/java/org/apache/tajo/storage/v2/TestStorages.java
+++ b/tajo-storage/src/test/java/org/apache/tajo/storage/v2/TestStorages.java
@@ -219,7 +219,7 @@ public class TestStorages {
             || storeType == StoreType.CSV
             || storeType == StoreType.PARQUET
             || storeType == StoreType.AVRO) {
-          assertTrue(tuple.get(0) == null || tuple.get(0) instanceof NullDatum);
+          assertTrue(tuple.get(0) == null || tuple.get(0).isNull());
         }
         assertTrue(tupleCnt + 2 == tuple.get(1).asInt8());
         assertTrue(tupleCnt + 3 == tuple.get(2).asFloat4());
http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-storage/src/test/java/org/apache/tajo/tuple/TestBaseTupleBuilder.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/tuple/TestBaseTupleBuilder.java b/tajo-storage/src/test/java/org/apache/tajo/tuple/TestBaseTupleBuilder.java
new file mode 100644
index 0000000..b332364
--- /dev/null
+++ b/tajo-storage/src/test/java/org/apache/tajo/tuple/TestBaseTupleBuilder.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.tuple;
+
+import org.apache.tajo.storage.RowStoreUtil;
+import org.apache.tajo.tuple.offheap.*;
+import org.junit.Test;
+
+public class TestBaseTupleBuilder {
+
+  @Test
+  public void testBuild() {
+    BaseTupleBuilder builder = new BaseTupleBuilder(TestOffHeapRowBlock.schema);
+
+    OffHeapRowBlock rowBlock = TestOffHeapRowBlock.createRowBlock(10248);
+    OffHeapRowBlockReader reader = rowBlock.getReader();
+
+    ZeroCopyTuple inputTuple = new ZeroCopyTuple();
+
+    int i = 0;
+    while (reader.next(inputTuple)) {
+      RowStoreUtil.convert(inputTuple, builder);
+
+      HeapTuple heapTuple = builder.buildToHeapTuple();
+      TestOffHeapRowBlock.validateTupleResult(i, heapTuple);
+
+      ZeroCopyTuple zcTuple = builder.buildToZeroCopyTuple();
+      TestOffHeapRowBlock.validateTupleResult(i, zcTuple);
+
+      i++;
+    }
+
+    rowBlock.release(); // free the off-heap row block so it does not outlive the test
+  }
+
+  @Test
+  public void testBuildWithNull() {
+    BaseTupleBuilder builder = new BaseTupleBuilder(TestOffHeapRowBlock.schema);
+
+    OffHeapRowBlock rowBlock = TestOffHeapRowBlock.createRowBlockWithNull(10248);
+    OffHeapRowBlockReader reader = rowBlock.getReader();
+
+    ZeroCopyTuple inputTuple = new ZeroCopyTuple();
+
+    int i = 0;
+    while (reader.next(inputTuple)) {
+      RowStoreUtil.convert(inputTuple, builder);
+
+      HeapTuple heapTuple = builder.buildToHeapTuple();
+      TestOffHeapRowBlock.validateNullity(i, heapTuple);
+
+      ZeroCopyTuple zcTuple = builder.buildToZeroCopyTuple();
+      TestOffHeapRowBlock.validateNullity(i, zcTuple);
+
+      i++;
+    }
+
+    rowBlock.release(); // free the off-heap row block so it does not outlive the test
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-storage/src/test/java/org/apache/tajo/tuple/offheap/TestHeapTuple.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/tuple/offheap/TestHeapTuple.java b/tajo-storage/src/test/java/org/apache/tajo/tuple/offheap/TestHeapTuple.java
new file mode 100644
index 0000000..96f465a
--- /dev/null
+++ b/tajo-storage/src/test/java/org/apache/tajo/tuple/offheap/TestHeapTuple.java
@@ -0,0 +1,45 @@
+/***
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.tuple.offheap;
+
+import org.apache.tajo.catalog.SchemaUtil;
+import org.junit.Test;
+
+public class TestHeapTuple {
+
+  @Test
+  public void testHeapTuple() {
+    OffHeapRowBlock rowBlock = TestOffHeapRowBlock.createRowBlock(1024);
+
+    OffHeapRowBlockReader reader = new OffHeapRowBlockReader(rowBlock);
+
+    ZeroCopyTuple zcTuple = new ZeroCopyTuple();
+    int i = 0;
+    while (reader.next(zcTuple)) {
+      byte [] bytes = new byte[zcTuple.nioBuffer().limit()];
+      zcTuple.nioBuffer().get(bytes);
+
+      HeapTuple heapTuple = new HeapTuple(bytes, SchemaUtil.toDataTypes(TestOffHeapRowBlock.schema));
+      TestOffHeapRowBlock.validateTupleResult(i, heapTuple);
+      i++;
+    }
+
+    rowBlock.release();
+  }
+}
\ No newline at end of file
