http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-storage/src/main/java/org/apache/tajo/storage/BaseTupleComparator.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/BaseTupleComparator.java b/tajo-storage/src/main/java/org/apache/tajo/storage/BaseTupleComparator.java
new file mode 100644
index 0000000..c3f77e7
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/BaseTupleComparator.java
@@ -0,0 +1,207 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import com.google.common.base.Objects;
+import com.google.common.base.Preconditions;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.SortSpec;
+import org.apache.tajo.common.ProtoObject;
+import org.apache.tajo.datum.Datum;
+
+import java.util.Arrays;
+
+import static org.apache.tajo.catalog.proto.CatalogProtos.TupleComparatorSpecProto;
+import static org.apache.tajo.index.IndexProtos.TupleComparatorProto;
+
+/**
+ * The Comparator class for Tuples
+ *
+ * <p>Tuples are compared key by key in sort-key order; the first key whose
+ * comparison is non-zero decides the result. When a null datum meets a
+ * different datum, the null one is ordered last unless the key's null-first
+ * flag is set; the key's ascending flag is not applied to that case.
+ *
+ * @see Tuple
+ */
+public class BaseTupleComparator extends TupleComparator implements ProtoObject<TupleComparatorProto> {
+  private final Schema schema;
+  private final SortSpec [] sortSpecs;
+  private final int[] sortKeyIds;
+  private final boolean[] asc;
+  private final boolean[] nullFirsts;
+
+  /**
+   * Builds a comparator by resolving every sort key to its column id in the schema.
+   *
+   * @param schema The schema of input tuples
+   * @param sortKeys The description of sort keys
+   */
+  public BaseTupleComparator(Schema schema, SortSpec[] sortKeys) {
+    Preconditions.checkArgument(sortKeys.length > 0,
+        "At least one sort key must be specified.");
+
+    this.schema = schema;
+    this.sortSpecs = sortKeys;
+    this.sortKeyIds = new int[sortKeys.length];
+    this.asc = new boolean[sortKeys.length];
+    this.nullFirsts = new boolean[sortKeys.length];
+    for (int i = 0; i < sortKeys.length; i++) {
+      if (sortKeys[i].getSortKey().hasQualifier()) {
+        this.sortKeyIds[i] = schema.getColumnId(sortKeys[i].getSortKey().getQualifiedName());
+      } else {
+        this.sortKeyIds[i] = schema.getColumnIdByName(sortKeys[i].getSortKey().getSimpleName());
+      }
+
+      this.asc[i] = sortKeys[i].isAscending();
+      this.nullFirsts[i] = sortKeys[i].isNullFirst();
+    }
+  }
+
+  /**
+   * Restores a comparator from its protobuf form.
+   *
+   * @param proto The serialized comparator, in the layout written by {@link #getProto()}
+   */
+  public BaseTupleComparator(TupleComparatorProto proto) {
+    this.schema = new Schema(proto.getSchema());
+
+    this.sortSpecs = new SortSpec[proto.getSortSpecsCount()];
+    for (int i = 0; i < proto.getSortSpecsCount(); i++) {
+      sortSpecs[i] = new SortSpec(proto.getSortSpecs(i));
+    }
+
+    this.sortKeyIds = new int[proto.getCompSpecsCount()];
+    this.asc = new boolean[proto.getCompSpecsCount()];
+    this.nullFirsts = new boolean[proto.getCompSpecsCount()];
+
+    for (int i = 0; i < proto.getCompSpecsCount(); i++) {
+      TupleComparatorSpecProto specProto = proto.getCompSpecs(i);
+      sortKeyIds[i] = specProto.getColumnId();
+      asc[i] = specProto.getAscending();
+      nullFirsts[i] = specProto.getNullFirst();
+    }
+  }
+
+  public Schema getSchema() {
+    return schema;
+  }
+
+  public SortSpec [] getSortSpecs() {
+    return sortSpecs;
+  }
+
+  public int [] getSortKeyIds() {
+    return sortKeyIds;
+  }
+
+  /** Returns whether the first sort key is ascending. */
+  public boolean isAscendingFirstKey() {
+    return this.asc[0];
+  }
+
+  @Override
+  public int compare(Tuple tuple1, Tuple tuple2) {
+    for (int i = 0; i < sortKeyIds.length; i++) {
+      // Scratch values are locals, not fields, so that compare() keeps no state
+      // between calls and one instance can be used by several threads at once.
+      Datum left = tuple1.get(sortKeyIds[i]);
+      Datum right = tuple2.get(sortKeyIds[i]);
+      int compVal;
+
+      if (left.isNull() || right.isNull()) {
+        if (left.equals(right)) {
+          compVal = 0;
+        } else {
+          // At least one side is null and the two differ: the null side goes
+          // last by default, and the null-first flag reverses that.
+          compVal = left.isNull() ? 1 : -1;
+          if (nullFirsts[i]) {
+            compVal = -compVal;
+          }
+        }
+      } else if (asc[i]) {
+        compVal = left.compareTo(right);
+      } else {
+        compVal = right.compareTo(left);
+      }
+
+      if (compVal != 0) {
+        return compVal;
+      }
+    }
+    return 0;
+  }
+
+  /**
+   * Hashes the contents of the arrays that {@link #equals(Object)} compares.
+   * Handing the int[] itself to Objects.hashCode(Object...) would hash the
+   * array's identity, so equal comparators would get different hash codes.
+   */
+  @Override
+  public int hashCode() {
+    return Objects.hashCode(
+        Arrays.hashCode(sortKeyIds), Arrays.hashCode(asc), Arrays.hashCode(nullFirsts));
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (!(obj instanceof BaseTupleComparator)) {
+      return false;
+    }
+    BaseTupleComparator other = (BaseTupleComparator) obj;
+    return Arrays.equals(sortKeyIds, other.sortKeyIds)
+        && Arrays.equals(asc, other.asc)
+        && Arrays.equals(nullFirsts, other.nullFirsts);
+  }
+
+  @Override
+  public TupleComparatorProto getProto() {
+    TupleComparatorProto.Builder builder = TupleComparatorProto.newBuilder();
+    builder.setSchema(schema.getProto());
+    for (int i = 0; i < sortSpecs.length; i++) {
+      builder.addSortSpecs(sortSpecs[i].getProto());
+    }
+
+    TupleComparatorSpecProto.Builder sortSpecBuilder;
+    for (int i = 0; i < sortKeyIds.length; i++) {
+      sortSpecBuilder = TupleComparatorSpecProto.newBuilder();
+      sortSpecBuilder.setColumnId(sortKeyIds[i]);
+      sortSpecBuilder.setAscending(asc[i]);
+      sortSpecBuilder.setNullFirst(nullFirsts[i]);
+      builder.addCompSpecs(sortSpecBuilder);
+    }
+
+    return builder.build();
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+
+    String prefix = "";
+    for (int i = 0; i < sortKeyIds.length; i++) {
+      sb.append(prefix).append("SortKeyId=").append(sortKeyIds[i])
+          .append(",Asc=").append(asc[i])
+          .append(",NullFirst=").append(nullFirsts[i]);
+      prefix = " ,";
+    }
+    return sb.toString();
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-storage/src/main/java/org/apache/tajo/storage/BinarySerializerDeserializer.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/BinarySerializerDeserializer.java b/tajo-storage/src/main/java/org/apache/tajo/storage/BinarySerializerDeserializer.java index 42b49a8..609a3df 100644 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/BinarySerializerDeserializer.java +++ b/tajo-storage/src/main/java/org/apache/tajo/storage/BinarySerializerDeserializer.java @@ -36,7 +36,7 @@ public class BinarySerializerDeserializer implements SerializerDeserializer { throws IOException { byte[] bytes; int length = 0; - if (datum == null || datum instanceof NullDatum) { + if (datum == null || datum.isNull()) { return 0; } http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-storage/src/main/java/org/apache/tajo/storage/FrameTuple.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/FrameTuple.java b/tajo-storage/src/main/java/org/apache/tajo/storage/FrameTuple.java index e0f8a2e..8b7e2e0 100644 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/FrameTuple.java +++ b/tajo-storage/src/main/java/org/apache/tajo/storage/FrameTuple.java @@ -23,7 +23,7 @@ package org.apache.tajo.storage; import com.google.common.base.Preconditions; import org.apache.tajo.datum.Datum; -import org.apache.tajo.datum.NullDatum; +import org.apache.tajo.datum.IntervalDatum; import org.apache.tajo.datum.ProtobufDatum; import org.apache.tajo.exception.UnsupportedException; @@ -71,7 +71,12 @@ public class FrameTuple implements Tuple, Cloneable { @Override public boolean isNull(int fieldid) { - return get(fieldid) instanceof NullDatum; + return get(fieldid).isNull(); + } + + @Override + public boolean isNotNull(int fieldid) { + return !isNull(fieldid); } @Override @@ 
-177,6 +182,11 @@ public class FrameTuple implements Tuple, Cloneable { } @Override + public IntervalDatum getInterval(int fieldId) { + return (IntervalDatum) get(fieldId); + } + + @Override public char [] getUnicodeChars(int fieldId) { return get(fieldId).asUnicodeChars(); } http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-storage/src/main/java/org/apache/tajo/storage/LazyTuple.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/LazyTuple.java b/tajo-storage/src/main/java/org/apache/tajo/storage/LazyTuple.java index 167e4a8..bfbe478 100644 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/LazyTuple.java +++ b/tajo-storage/src/main/java/org/apache/tajo/storage/LazyTuple.java @@ -20,6 +20,7 @@ package org.apache.tajo.storage; import org.apache.tajo.catalog.Schema; import org.apache.tajo.datum.Datum; +import org.apache.tajo.datum.IntervalDatum; import org.apache.tajo.datum.NullDatum; import org.apache.tajo.datum.ProtobufDatum; import org.apache.tajo.exception.UnsupportedException; @@ -68,7 +69,12 @@ public class LazyTuple implements Tuple, Cloneable { @Override public boolean isNull(int fieldid) { - return get(fieldid) instanceof NullDatum; + return get(fieldid).isNull(); + } + + @Override + public boolean isNotNull(int fieldid) { + return !isNull(fieldid); } @Override @@ -199,6 +205,11 @@ public class LazyTuple implements Tuple, Cloneable { } @Override + public IntervalDatum getInterval(int fieldId) { + return (IntervalDatum) get(fieldId); + } + + @Override public char[] getUnicodeChars(int fieldId) { return get(fieldId).asUnicodeChars(); } http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-storage/src/main/java/org/apache/tajo/storage/MemoryUtil.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/MemoryUtil.java 
b/tajo-storage/src/main/java/org/apache/tajo/storage/MemoryUtil.java deleted file mode 100644 index f19b61f..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/MemoryUtil.java +++ /dev/null @@ -1,163 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.tajo.storage; - -import org.apache.tajo.datum.*; -import org.apache.tajo.util.ClassSize; - -public class MemoryUtil { - - /** Overhead for an NullDatum */ - public static final long NULL_DATUM; - - /** Overhead for an BoolDatum */ - public static final long BOOL_DATUM; - - /** Overhead for an CharDatum */ - public static final long CHAR_DATUM; - - /** Overhead for an BitDatum */ - public static final long BIT_DATUM; - - /** Overhead for an Int2Datum */ - public static final long INT2_DATUM; - - /** Overhead for an Int4Datum */ - public static final long INT4_DATUM; - - /** Overhead for an Int8Datum */ - public static final long INT8_DATUM; - - /** Overhead for an Float4Datum */ - public static final long FLOAT4_DATUM; - - /** Overhead for an Float8Datum */ - public static final long FLOAT8_DATUM; - - /** Overhead for an TextDatum */ - public static final long TEXT_DATUM; - - /** Overhead for an BlobDatum */ - public static final long BLOB_DATUM; - - /** Overhead for an DateDatum */ - public static final long DATE_DATUM; - - /** Overhead for an TimeDatum */ - public static final long TIME_DATUM; - - /** Overhead for an TimestampDatum */ - public static final long TIMESTAMP_DATUM; - - static { - NULL_DATUM = ClassSize.estimateBase(NullDatum.class, false); - - CHAR_DATUM = ClassSize.estimateBase(CharDatum.class, false); - - BOOL_DATUM = ClassSize.estimateBase(BooleanDatum.class, false); - - BIT_DATUM = ClassSize.estimateBase(BitDatum.class, false); - - INT2_DATUM = ClassSize.estimateBase(Int2Datum.class, false); - - INT4_DATUM = ClassSize.estimateBase(Int4Datum.class, false); - - INT8_DATUM = ClassSize.estimateBase(Int8Datum.class, false); - - FLOAT4_DATUM = ClassSize.estimateBase(Float4Datum.class, false); - - FLOAT8_DATUM = ClassSize.estimateBase(Float8Datum.class, false); - - TEXT_DATUM = ClassSize.estimateBase(TextDatum.class, false); - - BLOB_DATUM = ClassSize.estimateBase(BlobDatum.class, false); - - DATE_DATUM = 
ClassSize.estimateBase(DateDatum.class, false); - - TIME_DATUM = ClassSize.estimateBase(TimeDatum.class, false); - - TIMESTAMP_DATUM = ClassSize.estimateBase(TimestampDatum.class, false); - } - - public static long calculateMemorySize(Tuple tuple) { - long total = ClassSize.OBJECT; - for (Datum datum : tuple.getValues()) { - switch (datum.type()) { - - case NULL_TYPE: - total += NULL_DATUM; - break; - - case BOOLEAN: - total += BOOL_DATUM; - break; - - case BIT: - total += BIT_DATUM; - break; - - case CHAR: - total += CHAR_DATUM + datum.size(); - break; - - case INT1: - case INT2: - total += INT2_DATUM; - break; - - case INT4: - total += INT4_DATUM; - break; - - case INT8: - total += INT8_DATUM; - break; - - case FLOAT4: - total += FLOAT4_DATUM; - break; - - case FLOAT8: - total += FLOAT4_DATUM; - break; - - case TEXT: - total += TEXT_DATUM + datum.size(); - break; - - case DATE: - total += DATE_DATUM; - break; - - case TIME: - total += TIME_DATUM; - break; - - case TIMESTAMP: - total += TIMESTAMP_DATUM; - break; - - default: - break; - } - } - - return total; - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-storage/src/main/java/org/apache/tajo/storage/RawFile.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/RawFile.java b/tajo-storage/src/main/java/org/apache/tajo/storage/RawFile.java index 1f57675..7f729e1 100644 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/RawFile.java +++ b/tajo-storage/src/main/java/org/apache/tajo/storage/RawFile.java @@ -31,6 +31,7 @@ import org.apache.tajo.common.TajoDataTypes.DataType; import org.apache.tajo.datum.DatumFactory; import org.apache.tajo.datum.NullDatum; import org.apache.tajo.datum.ProtobufDatumFactory; +import org.apache.tajo.util.UnsafeUtil; import org.apache.tajo.storage.fragment.FileFragment; import org.apache.tajo.util.BitArray; @@ -42,6 +43,8 @@ import java.nio.ByteBuffer; import 
java.nio.channels.FileChannel; public class RawFile { + public static final String FILE_EXTENSION = "raw"; + private static final Log LOG = LogFactory.getLog(RawFile.class); public static class RawFileScanner extends FileScanner implements SeekableScanner { @@ -380,7 +383,7 @@ public class RawFile { tableStats.setNumRows(recordCount); } - StorageUtil.closeBuffer(buffer); + UnsafeUtil.free(buffer); IOUtils.cleanup(LOG, channel, fis); } @@ -722,7 +725,7 @@ public class RawFile { LOG.debug("RawFileAppender written: " + getOffset() + " bytes, path: " + path); } - StorageUtil.closeBuffer(buffer); + UnsafeUtil.free(buffer); IOUtils.cleanup(LOG, channel, randomAccessFile); } http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-storage/src/main/java/org/apache/tajo/storage/RowStoreUtil.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/RowStoreUtil.java b/tajo-storage/src/main/java/org/apache/tajo/storage/RowStoreUtil.java index 70044ca..24b6280 100644 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/RowStoreUtil.java +++ b/tajo-storage/src/main/java/org/apache/tajo/storage/RowStoreUtil.java @@ -23,8 +23,10 @@ import org.apache.tajo.catalog.Schema; import org.apache.tajo.common.TajoDataTypes; import org.apache.tajo.datum.DatumFactory; import org.apache.tajo.datum.IntervalDatum; +import org.apache.tajo.datum.ProtobufDatum; import org.apache.tajo.exception.UnsupportedException; import org.apache.tajo.storage.exception.UnknownDataTypeException; +import org.apache.tajo.tuple.offheap.RowWriter; import org.apache.tajo.util.BitArray; import java.nio.ByteBuffer; @@ -177,7 +179,8 @@ public class RowStoreUtil { nullFlags = new BitArray(schema.size()); headerSize = nullFlags.bytesLength(); } - public byte [] toBytes(Tuple tuple) { + + public byte[] toBytes(Tuple tuple) { nullFlags.clear(); int size = estimateTupleDataSize(tuple); ByteBuffer bb = ByteBuffer.allocate(size + 
headerSize); @@ -191,42 +194,64 @@ public class RowStoreUtil { col = schema.getColumn(i); switch (col.getDataType().getType()) { - case NULL_TYPE: nullFlags.set(i); break; - case BOOLEAN: bb.put(tuple.get(i).asByte()); break; - case BIT: bb.put(tuple.get(i).asByte()); break; - case CHAR: bb.put(tuple.get(i).asByte()); break; - case INT2: bb.putShort(tuple.get(i).asInt2()); break; - case INT4: bb.putInt(tuple.get(i).asInt4()); break; - case INT8: bb.putLong(tuple.get(i).asInt8()); break; - case FLOAT4: bb.putFloat(tuple.get(i).asFloat4()); break; - case FLOAT8: bb.putDouble(tuple.get(i).asFloat8()); break; - case TEXT: - byte [] _string = tuple.get(i).asByteArray(); - bb.putInt(_string.length); - bb.put(_string); - break; - case DATE: bb.putInt(tuple.get(i).asInt4()); break; - case TIME: - case TIMESTAMP: - bb.putLong(tuple.get(i).asInt8()); - break; - case INTERVAL: - IntervalDatum interval = (IntervalDatum) tuple.get(i); - bb.putInt(interval.getMonths()); - bb.putLong(interval.getMilliSeconds()); - break; - case BLOB: - byte [] bytes = tuple.get(i).asByteArray(); - bb.putInt(bytes.length); - bb.put(bytes); - break; - case INET4: - byte [] ipBytes = tuple.get(i).asByteArray(); - bb.put(ipBytes); - break; - case INET6: bb.put(tuple.get(i).asByteArray()); break; - default: - throw new RuntimeException(new UnknownDataTypeException(col.getDataType().getType().name())); + case NULL_TYPE: + nullFlags.set(i); + break; + case BOOLEAN: + bb.put(tuple.get(i).asByte()); + break; + case BIT: + bb.put(tuple.get(i).asByte()); + break; + case CHAR: + bb.put(tuple.get(i).asByte()); + break; + case INT2: + bb.putShort(tuple.get(i).asInt2()); + break; + case INT4: + bb.putInt(tuple.get(i).asInt4()); + break; + case INT8: + bb.putLong(tuple.get(i).asInt8()); + break; + case FLOAT4: + bb.putFloat(tuple.get(i).asFloat4()); + break; + case FLOAT8: + bb.putDouble(tuple.get(i).asFloat8()); + break; + case TEXT: + byte[] _string = tuple.get(i).asByteArray(); + bb.putInt(_string.length); + 
bb.put(_string); + break; + case DATE: + bb.putInt(tuple.get(i).asInt4()); + break; + case TIME: + case TIMESTAMP: + bb.putLong(tuple.get(i).asInt8()); + break; + case INTERVAL: + IntervalDatum interval = (IntervalDatum) tuple.get(i); + bb.putInt(interval.getMonths()); + bb.putLong(interval.getMilliSeconds()); + break; + case BLOB: + byte[] bytes = tuple.get(i).asByteArray(); + bb.putInt(bytes.length); + bb.put(bytes); + break; + case INET4: + byte[] ipBytes = tuple.get(i).asByteArray(); + bb.put(ipBytes); + break; + case INET6: + bb.put(tuple.get(i).asByteArray()); + break; + default: + throw new RuntimeException(new UnknownDataTypeException(col.getDataType().getType().name())); } } @@ -237,7 +262,7 @@ public class RowStoreUtil { bb.position(finalPosition); bb.flip(); - byte [] buf = new byte [bb.limit()]; + byte[] buf = new byte[bb.limit()]; bb.get(buf); return buf; } @@ -254,24 +279,38 @@ public class RowStoreUtil { col = schema.getColumn(i); switch (col.getDataType().getType()) { - case BOOLEAN: - case BIT: - case CHAR: size += 1; break; - case INT2: size += 2; break; - case DATE: - case INT4: - case FLOAT4: size += 4; break; - case TIME: - case TIMESTAMP: - case INT8: - case FLOAT8: size += 8; break; - case INTERVAL: size += 12; break; - case TEXT: - case BLOB: size += (4 + tuple.get(i).asByteArray().length); break; - case INET4: - case INET6: size += tuple.get(i).asByteArray().length; break; - default: - throw new RuntimeException(new UnknownDataTypeException(col.getDataType().getType().name())); + case BOOLEAN: + case BIT: + case CHAR: + size += 1; + break; + case INT2: + size += 2; + break; + case DATE: + case INT4: + case FLOAT4: + size += 4; + break; + case TIME: + case TIMESTAMP: + case INT8: + case FLOAT8: + size += 8; + break; + case INTERVAL: + size += 12; + break; + case TEXT: + case BLOB: + size += (4 + tuple.get(i).asByteArray().length); + break; + case INET4: + case INET6: + size += tuple.get(i).asByteArray().length; + break; + default: + throw 
new RuntimeException(new UnknownDataTypeException(col.getDataType().getType().name())); } } @@ -284,4 +323,55 @@ public class RowStoreUtil { return schema; } } + + public static void convert(Tuple tuple, RowWriter writer) { + writer.startRow(); + + for (int i = 0; i < writer.dataTypes().length; i++) { + if (tuple.isNull(i)) { + writer.skipField(); + continue; + } + switch (writer.dataTypes()[i].getType()) { + case BOOLEAN: + writer.putBool(tuple.getBool(i)); + break; + case INT1: + case INT2: + writer.putInt2(tuple.getInt2(i)); + break; + case INT4: + case DATE: + case INET4: + writer.putInt4(tuple.getInt4(i)); + break; + case INT8: + case TIMESTAMP: + case TIME: + writer.putInt8(tuple.getInt8(i)); + break; + case FLOAT4: + writer.putFloat4(tuple.getFloat4(i)); + break; + case FLOAT8: + writer.putFloat8(tuple.getFloat8(i)); + break; + case TEXT: + writer.putText(tuple.getBytes(i)); + break; + case INTERVAL: + writer.putInterval((IntervalDatum) tuple.getInterval(i)); + break; + case PROTOBUF: + writer.putProtoDatum((ProtobufDatum) tuple.getProtobufDatum(i)); + break; + case NULL_TYPE: + writer.skipField(); + break; + default: + throw new UnsupportedException("Unknown data type: " + writer.dataTypes()[i]); + } + } + writer.endRow(); + } } http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-storage/src/main/java/org/apache/tajo/storage/StorageUtil.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/StorageUtil.java b/tajo-storage/src/main/java/org/apache/tajo/storage/StorageUtil.java index 07fa16b..f35c9ee 100644 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/StorageUtil.java +++ b/tajo-storage/src/main/java/org/apache/tajo/storage/StorageUtil.java @@ -30,13 +30,11 @@ import org.apache.tajo.catalog.proto.CatalogProtos; import org.apache.tajo.util.FileUtil; import org.apache.tajo.util.KeyValueSet; import parquet.hadoop.ParquetOutputFormat; -import 
sun.nio.ch.DirectBuffer; import java.io.DataInput; import java.io.EOFException; import java.io.IOException; import java.io.InputStream; -import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; @@ -193,16 +191,6 @@ public class StorageUtil extends StorageConstants { } } - public static void closeBuffer(ByteBuffer buffer) { - if (buffer != null) { - if (buffer.isDirect()) { - ((DirectBuffer) buffer).cleaner().clean(); - } else { - buffer.clear(); - } - } - } - public static int readFully(InputStream is, byte[] buffer, int offset, int length) throws IOException { int nread = 0; http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-storage/src/main/java/org/apache/tajo/storage/TableStatistics.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/TableStatistics.java b/tajo-storage/src/main/java/org/apache/tajo/storage/TableStatistics.java index a2c08de..64e62ba 100644 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/TableStatistics.java +++ b/tajo-storage/src/main/java/org/apache/tajo/storage/TableStatistics.java @@ -26,7 +26,6 @@ import org.apache.tajo.catalog.statistics.TableStats; import org.apache.tajo.common.TajoDataTypes.DataType; import org.apache.tajo.common.TajoDataTypes.Type; import org.apache.tajo.datum.Datum; -import org.apache.tajo.datum.NullDatum; /** * This class is not thread-safe. 
@@ -69,6 +68,11 @@ public class TableStatistics { numRows++; } + public void incrementRows(long num) { + numRows += num; + } + + public long getNumRows() { return this.numRows; } @@ -82,7 +86,7 @@ public class TableStatistics { } public void analyzeField(int idx, Datum datum) { - if (datum instanceof NullDatum) { + if (datum.isNull()) { numNulls[idx]++; return; } http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-storage/src/main/java/org/apache/tajo/storage/TextSerializerDeserializer.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/TextSerializerDeserializer.java b/tajo-storage/src/main/java/org/apache/tajo/storage/TextSerializerDeserializer.java index d2ccdc7..b42c1b5 100644 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/TextSerializerDeserializer.java +++ b/tajo-storage/src/main/java/org/apache/tajo/storage/TextSerializerDeserializer.java @@ -45,7 +45,7 @@ public class TextSerializerDeserializer implements SerializerDeserializer { int length = 0; TajoDataTypes.DataType dataType = col.getDataType(); - if (datum == null || datum instanceof NullDatum) { + if (datum == null || datum.isNull()) { switch (dataType.getType()) { case CHAR: case TEXT: http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-storage/src/main/java/org/apache/tajo/storage/Tuple.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/Tuple.java b/tajo-storage/src/main/java/org/apache/tajo/storage/Tuple.java index c183171..53e68c7 100644 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/Tuple.java +++ b/tajo-storage/src/main/java/org/apache/tajo/storage/Tuple.java @@ -19,7 +19,6 @@ package org.apache.tajo.storage; import org.apache.tajo.datum.Datum; -import org.apache.tajo.datum.ProtobufDatum; public interface Tuple extends Cloneable { @@ -28,6 +27,9 @@ public interface Tuple extends 
Cloneable { public boolean contains(int fieldid); public boolean isNull(int fieldid); + + @SuppressWarnings("unused") + public boolean isNotNull(int fieldid); public void clear(); @@ -65,7 +67,9 @@ public interface Tuple extends Cloneable { public String getText(int fieldId); - public ProtobufDatum getProtobufDatum(int fieldId); + public Datum getProtobufDatum(int fieldId); + + public Datum getInterval(int fieldId); public char [] getUnicodeChars(int fieldId); http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-storage/src/main/java/org/apache/tajo/storage/TupleComparator.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/TupleComparator.java b/tajo-storage/src/main/java/org/apache/tajo/storage/TupleComparator.java index 51388a4..720226b 100644 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/TupleComparator.java +++ b/tajo-storage/src/main/java/org/apache/tajo/storage/TupleComparator.java @@ -1,4 +1,4 @@ -/** +/*** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -18,164 +18,8 @@ package org.apache.tajo.storage; -import com.google.common.base.Objects; -import com.google.common.base.Preconditions; -import org.apache.tajo.catalog.Schema; -import org.apache.tajo.catalog.SortSpec; -import org.apache.tajo.common.ProtoObject; -import org.apache.tajo.datum.Datum; -import org.apache.tajo.datum.NullDatum; - import java.util.Comparator; -import static org.apache.tajo.catalog.proto.CatalogProtos.TupleComparatorSpecProto; -import static org.apache.tajo.index.IndexProtos.TupleComparatorProto; - -/** - * The Comparator class for Tuples - * - * @see Tuple - */ -public class TupleComparator implements Comparator<Tuple>, ProtoObject<TupleComparatorProto> { - private final int[] sortKeyIds; - private final boolean[] asc; - @SuppressWarnings("unused") - private final boolean[] nullFirsts; - - private Datum left; - private Datum right; - private int compVal; - - /** - * @param schema The schema of input tuples - * @param sortKeys The description of sort keys - */ - public TupleComparator(Schema schema, SortSpec[] sortKeys) { - Preconditions.checkArgument(sortKeys.length > 0, - "At least one sort key must be specified."); - - this.sortKeyIds = new int[sortKeys.length]; - this.asc = new boolean[sortKeys.length]; - this.nullFirsts = new boolean[sortKeys.length]; - for (int i = 0; i < sortKeys.length; i++) { - if (sortKeys[i].getSortKey().hasQualifier()) { - this.sortKeyIds[i] = schema.getColumnId(sortKeys[i].getSortKey().getQualifiedName()); - } else { - this.sortKeyIds[i] = schema.getColumnIdByName(sortKeys[i].getSortKey().getSimpleName()); - } - - this.asc[i] = sortKeys[i].isAscending(); - this.nullFirsts[i]= sortKeys[i].isNullFirst(); - } - } - - public TupleComparator(TupleComparatorProto proto) { - this.sortKeyIds = new int[proto.getCompSpecsCount()]; - this.asc = new boolean[proto.getCompSpecsCount()]; - this.nullFirsts = new boolean[proto.getCompSpecsCount()]; - 
- for (int i = 0; i < proto.getCompSpecsCount(); i++) { - TupleComparatorSpecProto sortSepcProto = proto.getCompSpecs(i); - sortKeyIds[i] = sortSepcProto.getColumnId(); - asc[i] = sortSepcProto.getAscending(); - nullFirsts[i] = sortSepcProto.getNullFirst(); - } - } - - public boolean isAscendingFirstKey() { - return this.asc[0]; - } - - @Override - public int compare(Tuple tuple1, Tuple tuple2) { - for (int i = 0; i < sortKeyIds.length; i++) { - left = tuple1.get(sortKeyIds[i]); - right = tuple2.get(sortKeyIds[i]); - - if (left instanceof NullDatum || right instanceof NullDatum) { - if (!left.equals(right)) { - if (left instanceof NullDatum) { - compVal = 1; - } else if (right instanceof NullDatum) { - compVal = -1; - } - if (nullFirsts[i]) { - if (compVal != 0) { - compVal *= -1; - } - } - } else { - compVal = 0; - } - } else { - if (asc[i]) { - compVal = left.compareTo(right); - } else { - compVal = right.compareTo(left); - } - } - - if (compVal < 0 || compVal > 0) { - return compVal; - } - } - return 0; - } - - @Override - public int hashCode() { - return Objects.hashCode(sortKeyIds); - } - - @Override - public boolean equals(Object obj) { - if (obj instanceof TupleComparator) { - TupleComparator other = (TupleComparator) obj; - if (sortKeyIds.length != other.sortKeyIds.length) { - return false; - } - - for (int i = 0; i < sortKeyIds.length; i++) { - if (sortKeyIds[i] != other.sortKeyIds[i] || - asc[i] != other.asc[i] || - nullFirsts[i] != other.nullFirsts[i]) { - return false; - } - } - - return true; - } else { - return false; - } - } - - @Override - public TupleComparatorProto getProto() { - TupleComparatorProto.Builder builder = TupleComparatorProto.newBuilder(); - TupleComparatorSpecProto.Builder sortSpecBuilder; - - for (int i = 0; i < sortKeyIds.length; i++) { - sortSpecBuilder = TupleComparatorSpecProto.newBuilder(); - sortSpecBuilder.setColumnId(sortKeyIds[i]); - sortSpecBuilder.setAscending(asc[i]); - sortSpecBuilder.setNullFirst(nullFirsts[i]); - 
builder.addCompSpecs(sortSpecBuilder); - } - - return builder.build(); - } - - @Override - public String toString() { - StringBuilder sb = new StringBuilder(); - - String prefix = ""; - for (int i = 0; i < sortKeyIds.length; i++) { - sb.append(prefix).append("SortKeyId=").append(sortKeyIds[i]) - .append(",Asc=").append(asc[i]) - .append(",NullFirst=").append(nullFirsts[i]); - prefix = " ,"; - } - return sb.toString(); - } -} \ No newline at end of file +public abstract class TupleComparator implements Comparator<Tuple> { + public abstract int compare(Tuple o1, Tuple o2); +} http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-storage/src/main/java/org/apache/tajo/storage/TupleRange.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/TupleRange.java b/tajo-storage/src/main/java/org/apache/tajo/storage/TupleRange.java index 6cc09d4..dba02f7 100644 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/TupleRange.java +++ b/tajo-storage/src/main/java/org/apache/tajo/storage/TupleRange.java @@ -30,10 +30,10 @@ import java.util.Comparator; public class TupleRange implements Comparable<TupleRange>, Cloneable { private Tuple start; private Tuple end; - private final TupleComparator comp; + private final BaseTupleComparator comp; public TupleRange(final SortSpec[] sortSpecs, final Tuple start, final Tuple end) { - this.comp = new TupleComparator(sortSpecsToSchema(sortSpecs), sortSpecs); + this.comp = new BaseTupleComparator(sortSpecsToSchema(sortSpecs), sortSpecs); // if there is only one value, start == end this.start = start; this.end = end; http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-storage/src/main/java/org/apache/tajo/storage/VTuple.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/VTuple.java b/tajo-storage/src/main/java/org/apache/tajo/storage/VTuple.java 
index 4fb35f9..0e2560c 100644 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/VTuple.java +++ b/tajo-storage/src/main/java/org/apache/tajo/storage/VTuple.java @@ -19,10 +19,7 @@ package org.apache.tajo.storage; import com.google.gson.annotations.Expose; -import org.apache.tajo.datum.Datum; -import org.apache.tajo.datum.Inet4Datum; -import org.apache.tajo.datum.NullDatum; -import org.apache.tajo.datum.ProtobufDatum; +import org.apache.tajo.datum.*; import org.apache.tajo.exception.UnimplementedException; import java.net.InetAddress; @@ -38,7 +35,6 @@ public class VTuple implements Tuple, Cloneable { public VTuple(Tuple tuple) { this.values = tuple.getValues().clone(); - this.offset = ((VTuple)tuple).offset; } public VTuple(Datum [] datum) { @@ -57,7 +53,12 @@ public class VTuple implements Tuple, Cloneable { @Override public boolean isNull(int fieldid) { - return values[fieldid] instanceof NullDatum; + return values[fieldid].isNull(); + } + + @Override + public boolean isNotNull(int fieldid) { + return !isNull(fieldid); } @Override @@ -179,6 +180,11 @@ public class VTuple implements Tuple, Cloneable { } @Override + public IntervalDatum getInterval(int fieldId) { + return (IntervalDatum) values[fieldId]; + } + + @Override public char[] getUnicodeChars(int fieldId) { return values[fieldId].asUnicodeChars(); } @@ -193,23 +199,7 @@ public class VTuple implements Tuple, Cloneable { } public String toString() { - boolean first = true; - StringBuilder str = new StringBuilder(); - str.append("("); - for(int i=0; i < values.length; i++) { - if(values[i] != null) { - if(first) { - first = false; - } else { - str.append(", "); - } - str.append(i) - .append("=>") - .append(values[i]); - } - } - str.append(")"); - return str.toString(); + return toDisplayString(getValues()); } @Override @@ -230,4 +220,24 @@ public class VTuple implements Tuple, Cloneable { } return false; } + + public static String toDisplayString(Datum [] values) { + boolean first = true; + 
StringBuilder str = new StringBuilder(); + str.append("("); + for(int i=0; i < values.length; i++) { + if(values[i] != null) { + if(first) { + first = false; + } else { + str.append(", "); + } + str.append(i) + .append("=>") + .append(values[i]); + } + } + str.append(")"); + return str.toString(); + } } http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-storage/src/main/java/org/apache/tajo/storage/avro/AvroAppender.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/avro/AvroAppender.java b/tajo-storage/src/main/java/org/apache/tajo/storage/avro/AvroAppender.java index 6af8da0..6aca8d7 100644 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/avro/AvroAppender.java +++ b/tajo-storage/src/main/java/org/apache/tajo/storage/avro/AvroAppender.java @@ -18,30 +18,27 @@ package org.apache.tajo.storage.avro; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.List; - import org.apache.avro.Schema; import org.apache.avro.file.DataFileWriter; -import org.apache.avro.generic.GenericDatumWriter; import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericDatumWriter; import org.apache.avro.generic.GenericRecord; import org.apache.avro.io.DatumWriter; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.tajo.catalog.Column; -import org.apache.tajo.catalog.statistics.TableStats; import org.apache.tajo.catalog.TableMeta; -import org.apache.tajo.datum.NullDatum; +import org.apache.tajo.catalog.statistics.TableStats; import org.apache.tajo.storage.FileAppender; import org.apache.tajo.storage.TableStatistics; import org.apache.tajo.storage.Tuple; +import java.io.FileNotFoundException; import java.io.IOException; +import 
java.nio.ByteBuffer; +import java.util.List; /** * FileAppender for writing to Avro files. @@ -102,7 +99,7 @@ public class AvroAppender extends FileAppender { } private Object getPrimitive(Tuple tuple, int i, Schema.Type avroType) { - if (tuple.get(i) instanceof NullDatum) { + if (tuple.get(i).isNull()) { return null; } switch (avroType) { http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-storage/src/main/java/org/apache/tajo/storage/index/IndexMethod.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/index/IndexMethod.java b/tajo-storage/src/main/java/org/apache/tajo/storage/index/IndexMethod.java index 74be7ff..7024bdc 100644 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/index/IndexMethod.java +++ b/tajo-storage/src/main/java/org/apache/tajo/storage/index/IndexMethod.java @@ -20,13 +20,13 @@ package org.apache.tajo.storage.index; import org.apache.hadoop.fs.Path; import org.apache.tajo.catalog.Schema; -import org.apache.tajo.storage.TupleComparator; +import org.apache.tajo.storage.BaseTupleComparator; import java.io.IOException; public interface IndexMethod { IndexWriter getIndexWriter(final Path fileName, int level, Schema keySchema, - TupleComparator comparator) throws IOException; + BaseTupleComparator comparator) throws IOException; IndexReader getIndexReader(final Path fileName, Schema keySchema, - TupleComparator comparator) throws IOException; + BaseTupleComparator comparator) throws IOException; } http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-storage/src/main/java/org/apache/tajo/storage/index/bst/BSTIndex.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/index/bst/BSTIndex.java b/tajo-storage/src/main/java/org/apache/tajo/storage/index/bst/BSTIndex.java index 5d43bd5..d24d474 100644 --- 
a/tajo-storage/src/main/java/org/apache/tajo/storage/index/bst/BSTIndex.java +++ b/tajo-storage/src/main/java/org/apache/tajo/storage/index/bst/BSTIndex.java @@ -27,12 +27,12 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.proto.CatalogProtos.SchemaProto; +import org.apache.tajo.storage.BaseTupleComparator; import org.apache.tajo.storage.RowStoreUtil; import org.apache.tajo.storage.RowStoreUtil.RowStoreDecoder; import org.apache.tajo.storage.RowStoreUtil.RowStoreEncoder; import org.apache.tajo.storage.StorageUtil; import org.apache.tajo.storage.Tuple; -import org.apache.tajo.storage.TupleComparator; import org.apache.tajo.storage.index.IndexMethod; import org.apache.tajo.storage.index.IndexWriter; import org.apache.tajo.storage.index.OrderIndexReader; @@ -67,13 +67,13 @@ public class BSTIndex implements IndexMethod { @Override public BSTIndexWriter getIndexWriter(final Path fileName, int level, Schema keySchema, - TupleComparator comparator) throws IOException { + BaseTupleComparator comparator) throws IOException { return new BSTIndexWriter(fileName, level, keySchema, comparator); } @Override public BSTIndexReader getIndexReader(Path fileName, Schema keySchema, - TupleComparator comparator) throws IOException { + BaseTupleComparator comparator) throws IOException { return new BSTIndexReader(fileName, keySchema, comparator); } @@ -89,7 +89,7 @@ public class BSTIndex implements IndexMethod { private Path fileName; private final Schema keySchema; - private final TupleComparator compartor; + private final BaseTupleComparator compartor; private final KeyOffsetCollector collector; private KeyOffsetCollector rootCollector; @@ -108,7 +108,7 @@ public class BSTIndex implements IndexMethod { * @throws IOException */ public BSTIndexWriter(final Path fileName, int level, Schema keySchema, - TupleComparator comparator) throws IOException { + BaseTupleComparator comparator) throws 
IOException { this.fileName = fileName; this.level = level; this.keySchema = keySchema; @@ -141,7 +141,7 @@ public class BSTIndex implements IndexMethod { collector.put(key, offset); } - public TupleComparator getComparator() { + public BaseTupleComparator getComparator() { return this.compartor; } @@ -253,7 +253,7 @@ public class BSTIndex implements IndexMethod { private class KeyOffsetCollector { private TreeMap<Tuple, LinkedList<Long>> map; - public KeyOffsetCollector(TupleComparator comparator) { + public KeyOffsetCollector(BaseTupleComparator comparator) { map = new TreeMap<Tuple, LinkedList<Long>>(comparator); } @@ -283,7 +283,7 @@ public class BSTIndex implements IndexMethod { public class BSTIndexReader implements OrderIndexReader , Closeable{ private Path fileName; private Schema keySchema; - private TupleComparator comparator; + private BaseTupleComparator comparator; private FileSystem fs; private FSDataInputStream indexIn; @@ -312,7 +312,7 @@ public class BSTIndex implements IndexMethod { * @param comparator * @throws IOException */ - public BSTIndexReader(final Path fileName, Schema keySchema, TupleComparator comparator) throws IOException { + public BSTIndexReader(final Path fileName, Schema keySchema, BaseTupleComparator comparator) throws IOException { this.fileName = fileName; this.keySchema = keySchema; this.comparator = comparator; @@ -327,7 +327,7 @@ public class BSTIndex implements IndexMethod { return this.keySchema; } - public TupleComparator getComparator() { + public BaseTupleComparator getComparator() { return this.comparator; } @@ -350,7 +350,7 @@ public class BSTIndex implements IndexMethod { TupleComparatorProto.Builder compProto = TupleComparatorProto.newBuilder(); compProto.mergeFrom(compBytes); - this.comparator = new TupleComparator(compProto.build()); + this.comparator = new BaseTupleComparator(compProto.build()); // level this.level = indexIn.readInt(); 
http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-storage/src/main/java/org/apache/tajo/storage/rawfile/DirectRawFileScanner.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/rawfile/DirectRawFileScanner.java b/tajo-storage/src/main/java/org/apache/tajo/storage/rawfile/DirectRawFileScanner.java new file mode 100644 index 0000000..e0c7c97 --- /dev/null +++ b/tajo-storage/src/main/java/org/apache/tajo/storage/rawfile/DirectRawFileScanner.java @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.storage.rawfile; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IOUtils; +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.catalog.TableMeta; +import org.apache.tajo.common.TajoDataTypes; +import org.apache.tajo.storage.FileScanner; +import org.apache.tajo.storage.SeekableScanner; +import org.apache.tajo.storage.Tuple; +import org.apache.tajo.storage.fragment.FileFragment; +import org.apache.tajo.tuple.offheap.OffHeapRowBlock; +import org.apache.tajo.tuple.offheap.OffHeapRowBlockReader; +import org.apache.tajo.tuple.offheap.ZeroCopyTuple; +import org.apache.tajo.unit.StorageUnit; + +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.nio.channels.FileChannel; + +public class DirectRawFileScanner extends FileScanner implements SeekableScanner { + private static final Log LOG = LogFactory.getLog(DirectRawFileScanner.class); + + private FileChannel channel; + private TajoDataTypes.DataType[] columnTypes; + private Path path; + + private boolean eof = false; + private long fileSize; + private FileInputStream fis; + private long recordCount; + + private ZeroCopyTuple unSafeTuple = new ZeroCopyTuple(); + private OffHeapRowBlock tupleBuffer; + private OffHeapRowBlockReader reader = new OffHeapRowBlockReader(tupleBuffer); + + public DirectRawFileScanner(Configuration conf, Schema schema, TableMeta meta, Path path) throws IOException { + super(conf, schema, meta, null); + this.path = path; + } + + @SuppressWarnings("unused") + public DirectRawFileScanner(Configuration conf, Schema schema, TableMeta meta, FileFragment fragment) throws IOException { + this(conf, schema, meta, fragment.getPath()); + } + + public void init() throws IOException { + File file; + try { + if (path.toUri().getScheme() != null) { + file = new File(path.toUri()); 
+ } else { + file = new File(path.toString()); + } + } catch (IllegalArgumentException iae) { + throw new IOException(iae); + } + fis = new FileInputStream(file); + channel = fis.getChannel(); + fileSize = channel.size(); + + if (tableStats != null) { + tableStats.setNumBytes(fileSize); + } + if (LOG.isDebugEnabled()) { + LOG.debug("RawFileScanner open:" + path + "," + channel.position() + ", size :" + channel.size()); + } + + columnTypes = new TajoDataTypes.DataType[schema.size()]; + for (int i = 0; i < schema.size(); i++) { + columnTypes[i] = schema.getColumn(i).getDataType(); + } + + tupleBuffer = new OffHeapRowBlock(schema, 64 * StorageUnit.KB); + reader = new OffHeapRowBlockReader(tupleBuffer); + + fetchNeeded = !next(tupleBuffer); + + super.init(); + } + + @Override + public long getNextOffset() throws IOException { + return channel.position() - reader.remainForRead(); + } + + @Override + public void seek(long offset) throws IOException { + channel.position(offset); + fetchNeeded = true; + } + + public boolean next(OffHeapRowBlock rowblock) throws IOException { + return rowblock.copyFromChannel(channel, tableStats); + } + + private boolean fetchNeeded = true; + + @Override + public Tuple next() throws IOException { + if(eof) { + return null; + } + + while(true) { + if (fetchNeeded) { + if (!next(tupleBuffer)) { + return null; + } + reader.reset(); + } + + fetchNeeded = !reader.next(unSafeTuple); + + if (!fetchNeeded) { + recordCount++; + return unSafeTuple; + } + } + } + + @Override + public void reset() throws IOException { + // reload initial buffer + fetchNeeded = true; + channel.position(0); + eof = false; + reader.reset(); + } + + @Override + public void close() throws IOException { + if (tableStats != null) { + tableStats.setReadBytes(fileSize); + tableStats.setNumRows(recordCount); + } + tupleBuffer.release(); + tupleBuffer = null; + reader = null; + IOUtils.cleanup(LOG, channel, fis); + } + + @Override + public boolean isProjectable() { + return 
false; + } + + @Override + public boolean isSelectable() { + return false; + } + + @Override + public boolean isSplittable(){ + return false; + } + + @Override + public float getProgress() { + try { + tableStats.setNumRows(recordCount); + long filePos = 0; + if (channel != null) { + filePos = channel.position(); + tableStats.setReadBytes(filePos); + } + + if(eof || channel == null) { + tableStats.setReadBytes(fileSize); + return 1.0f; + } + + if (filePos == 0) { + return 0.0f; + } else { + return Math.min(1.0f, ((float)filePos / (float)fileSize)); + } + } catch (IOException e) { + LOG.error(e.getMessage(), e); + return 0.0f; + } + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-storage/src/main/java/org/apache/tajo/storage/rawfile/DirectRawFileWriter.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/rawfile/DirectRawFileWriter.java b/tajo-storage/src/main/java/org/apache/tajo/storage/rawfile/DirectRawFileWriter.java new file mode 100644 index 0000000..1108163 --- /dev/null +++ b/tajo-storage/src/main/java/org/apache/tajo/storage/rawfile/DirectRawFileWriter.java @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage.rawfile; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IOUtils; +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.catalog.TableMeta; +import org.apache.tajo.catalog.statistics.TableStats; +import org.apache.tajo.common.TajoDataTypes; +import org.apache.tajo.storage.FileAppender; +import org.apache.tajo.storage.RowStoreUtil; +import org.apache.tajo.storage.TableStatistics; +import org.apache.tajo.storage.Tuple; +import org.apache.tajo.tuple.BaseTupleBuilder; +import org.apache.tajo.tuple.offheap.OffHeapRowBlock; +import org.apache.tajo.tuple.offheap.UnSafeTuple; +import org.apache.tajo.unit.StorageUnit; + +import java.io.File; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; + +public class DirectRawFileWriter extends FileAppender { + public static final String FILE_EXTENSION = "draw"; + private static final Log LOG = LogFactory.getLog(DirectRawFileWriter.class); + + private FileChannel channel; + private RandomAccessFile randomAccessFile; + private TajoDataTypes.DataType[] columnTypes; + private long pos; + + private TableStatistics stats; + + private BaseTupleBuilder builder; + + public DirectRawFileWriter(Configuration conf, Schema schema, TableMeta meta, Path path) throws IOException { + super(conf, schema, meta, path); + } + + public void init() throws IOException { + File file; + try { + if (path.toUri().getScheme() != null) { + file = new File(path.toUri()); + } else { + file = new File(path.toString()); + } + } catch (IllegalArgumentException iae) { + throw new IOException(iae); + } + + randomAccessFile = new RandomAccessFile(file, "rw"); + channel = 
randomAccessFile.getChannel(); + pos = 0; + + columnTypes = new TajoDataTypes.DataType[schema.size()]; + for (int i = 0; i < schema.size(); i++) { + columnTypes[i] = schema.getColumn(i).getDataType(); + } + + if (enabledStats) { + this.stats = new TableStatistics(this.schema); + } + + builder = new BaseTupleBuilder(schema); + + super.init(); + } + + @Override + public long getOffset() throws IOException { + return pos; + } + + public void writeRowBlock(OffHeapRowBlock rowBlock) throws IOException { + channel.write(rowBlock.nioBuffer()); + if (enabledStats) { + stats.incrementRows(rowBlock.rows()); + } + + pos = channel.position(); + } + + private ByteBuffer buffer; + private void ensureSize(int size) throws IOException { + if (buffer.remaining() < size) { + + buffer.limit(buffer.position()); + buffer.flip(); + channel.write(buffer); + + buffer.clear(); + } + } + + @Override + public void addTuple(Tuple t) throws IOException { + if (enabledStats) { + for (int i = 0; i < schema.size(); i++) { + stats.analyzeField(i, t.get(i)); + } + } + + if (buffer == null) { + buffer = ByteBuffer.allocateDirect(64 * StorageUnit.KB); + } + + UnSafeTuple unSafeTuple; + + if (!(t instanceof UnSafeTuple)) { + RowStoreUtil.convert(t, builder); + unSafeTuple = builder.buildToZeroCopyTuple(); + } else { + unSafeTuple = (UnSafeTuple) t; + } + + ByteBuffer bb = unSafeTuple.nioBuffer(); + ensureSize(bb.limit()); + buffer.put(bb); + + pos = channel.position() + (buffer.limit() - buffer.remaining()); + + if (enabledStats) { + stats.incrementRow(); + } + } + + @Override + public void flush() throws IOException { + if (buffer != null) { + buffer.limit(buffer.position()); + buffer.flip(); + channel.write(buffer); + buffer.clear(); + } + } + + @Override + public void close() throws IOException { + flush(); + if (enabledStats) { + stats.setNumBytes(getOffset()); + } + if (LOG.isDebugEnabled()) { + LOG.debug("RawFileAppender written: " + getOffset() + " bytes, path: " + path); + } + + 
IOUtils.cleanup(LOG, channel, randomAccessFile); + } + + @Override + public TableStats getStats() { + if (enabledStats) { + stats.setNumBytes(pos); + return stats.getTableStat(); + } else { + return null; + } + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-storage/src/main/java/org/apache/tajo/tuple/BaseTupleBuilder.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/BaseTupleBuilder.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/BaseTupleBuilder.java new file mode 100644 index 0000000..ec08250 --- /dev/null +++ b/tajo-storage/src/main/java/org/apache/tajo/tuple/BaseTupleBuilder.java @@ -0,0 +1,110 @@ +/*** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.tuple; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.catalog.SchemaUtil; +import org.apache.tajo.storage.Tuple; +import org.apache.tajo.tuple.offheap.*; +import org.apache.tajo.unit.StorageUnit; +import org.apache.tajo.util.Deallocatable; +import org.apache.tajo.util.FileUtil; +import org.apache.tajo.util.UnsafeUtil; +import sun.misc.Unsafe; +import sun.nio.ch.DirectBuffer; + +import java.nio.ByteBuffer; +import java.nio.ByteOrder; + +public class BaseTupleBuilder extends OffHeapRowWriter implements TupleBuilder, Deallocatable { + private static final Log LOG = LogFactory.getLog(BaseTupleBuilder.class); + + private static final Unsafe UNSAFE = UnsafeUtil.unsafe; + + // buffer + private ByteBuffer buffer; + private long address; + + public BaseTupleBuilder(Schema schema) { + super(SchemaUtil.toDataTypes(schema)); + buffer = ByteBuffer.allocateDirect(64 * StorageUnit.KB).order(ByteOrder.nativeOrder()); + address = UnsafeUtil.getAddress(buffer); + } + + @Override + public long address() { + return address; + } + + public void ensureSize(int size) { + if (buffer.remaining() - size < 0) { // check the remain size + // enlarge new buffer and copy writing data + int newBlockSize = UnsafeUtil.alignedSize(buffer.capacity() * 2); + ByteBuffer newByteBuf = ByteBuffer.allocateDirect(newBlockSize); + long newAddress = ((DirectBuffer)newByteBuf).address(); + UNSAFE.copyMemory(this.address, newAddress, buffer.limit()); + LOG.debug("Increase the buffer size to " + FileUtil.humanReadableByteCount(newBlockSize, false)); + + // release existing buffer and replace variables + UnsafeUtil.free(buffer); + buffer = newByteBuf; + address = newAddress; + } + } + + @Override + public int position() { + return 0; + } + + @Override + public void forward(int length) { + } + + @Override + public void endRow() { + super.endRow(); + 
buffer.position(0).limit(offset()); + } + + @Override + public Tuple build() { + return buildToHeapTuple(); + } + + public HeapTuple buildToHeapTuple() { + byte [] bytes = new byte[buffer.limit()]; + UNSAFE.copyMemory(null, address, bytes, Unsafe.ARRAY_BYTE_BASE_OFFSET, buffer.limit()); // destination is a byte[], so use the byte-array base offset + return new HeapTuple(bytes, dataTypes()); + } + + public ZeroCopyTuple buildToZeroCopyTuple() { + ZeroCopyTuple zcTuple = new ZeroCopyTuple(); + zcTuple.set(buffer, 0, buffer.limit(), dataTypes()); + return zcTuple; + } + + public void release() { + UnsafeUtil.free(buffer); + buffer = null; + address = 0; + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-storage/src/main/java/org/apache/tajo/tuple/RowBlockReader.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/RowBlockReader.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/RowBlockReader.java new file mode 100644 index 0000000..be734e1 --- /dev/null +++ b/tajo-storage/src/main/java/org/apache/tajo/tuple/RowBlockReader.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.tuple; + +import org.apache.tajo.storage.Tuple; + +public interface RowBlockReader<T extends Tuple> { + + /** + * Advances to the next tuple of the row block and exposes it through the given tuple. + * @param tuple the tuple instance that receives the next row + * @return true if a tuple was made available; false if the block has no more tuples to read. + */ + public boolean next(T tuple); + + public void reset(); +} http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-storage/src/main/java/org/apache/tajo/tuple/TupleBuilder.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/TupleBuilder.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/TupleBuilder.java new file mode 100644 index 0000000..c43c018 --- /dev/null +++ b/tajo-storage/src/main/java/org/apache/tajo/tuple/TupleBuilder.java @@ -0,0 +1,26 @@ +/*** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.tuple; + +import org.apache.tajo.storage.Tuple; +import org.apache.tajo.tuple.offheap.RowWriter; + +public interface TupleBuilder extends RowWriter { + public Tuple build(); +} http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/DirectBufTuple.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/DirectBufTuple.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/DirectBufTuple.java new file mode 100644 index 0000000..9662d5a --- /dev/null +++ b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/DirectBufTuple.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.tuple.offheap; + +import org.apache.tajo.util.Deallocatable; +import org.apache.tajo.util.UnsafeUtil; + +import java.nio.ByteBuffer; +import java.nio.ByteOrder; + +import static org.apache.tajo.common.TajoDataTypes.DataType; + +public class DirectBufTuple extends UnSafeTuple implements Deallocatable { + private ByteBuffer bb; + + public DirectBufTuple(int length, DataType[] types) { + bb = ByteBuffer.allocateDirect(length).order(ByteOrder.nativeOrder()); + set(bb, 0, length, types); + } + + @Override + public void release() { + UnsafeUtil.free(bb); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/FixedSizeLimitSpec.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/FixedSizeLimitSpec.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/FixedSizeLimitSpec.java new file mode 100644 index 0000000..a327123 --- /dev/null +++ b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/FixedSizeLimitSpec.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.tuple.offheap; + +/** + * Fixed size limit specification + */ +public class FixedSizeLimitSpec extends ResizableLimitSpec { + public FixedSizeLimitSpec(long size) { + super(size, size); + } + + public FixedSizeLimitSpec(long size, float allowedOverflowRatio) { + super(size, size, allowedOverflowRatio); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/HeapTuple.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/HeapTuple.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/HeapTuple.java new file mode 100644 index 0000000..7c7d8a1 --- /dev/null +++ b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/HeapTuple.java @@ -0,0 +1,269 @@ +/*** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.tuple.offheap; + +import com.google.protobuf.InvalidProtocolBufferException; +import com.google.protobuf.Message; +import com.sun.tools.javac.util.Convert; +import org.apache.tajo.datum.*; +import org.apache.tajo.exception.UnsupportedException; +import org.apache.tajo.storage.Tuple; +import org.apache.tajo.storage.VTuple; +import org.apache.tajo.util.SizeOf; +import org.apache.tajo.util.UnsafeUtil; +import sun.misc.Unsafe; + +import java.nio.ByteBuffer; + +import static org.apache.tajo.common.TajoDataTypes.DataType; + +public class HeapTuple implements Tuple { + private static final Unsafe UNSAFE = UnsafeUtil.unsafe; + private static final long BASE_OFFSET = Unsafe.ARRAY_BYTE_BASE_OFFSET; + + private final byte [] data; + private final DataType [] types; + + public HeapTuple(final byte [] bytes, final DataType [] types) { + this.data = bytes; + this.types = types; + } + + @Override + public int size() { + return data.length; + } + + public ByteBuffer nioBuffer() { + return ByteBuffer.wrap(data); + } + + private int getFieldOffset(int fieldId) { + return UNSAFE.getInt(data, BASE_OFFSET + SizeOf.SIZE_OF_INT + (fieldId * SizeOf.SIZE_OF_INT)); + } + + private int checkNullAndGetOffset(int fieldId) { + int offset = getFieldOffset(fieldId); + if (offset == OffHeapRowBlock.NULL_FIELD_OFFSET) { + throw new RuntimeException("Invalid Field Access: " + fieldId); + } + return offset; + } + + @Override + public boolean contains(int fieldid) { + return getFieldOffset(fieldid) > OffHeapRowBlock.NULL_FIELD_OFFSET; + } + + @Override + public boolean isNull(int fieldid) { + return getFieldOffset(fieldid) == OffHeapRowBlock.NULL_FIELD_OFFSET; + } + + @Override + public boolean isNotNull(int fieldid) { + return getFieldOffset(fieldid) > OffHeapRowBlock.NULL_FIELD_OFFSET; + } + + @Override + public void clear() { + // nothing to do + } + + @Override + public void put(int fieldId, Datum value) { + throw new UnsupportedException("UnSafeTuple does not 
support put(int, Datum)."); + } + + @Override + public void put(int fieldId, Datum[] values) { + throw new UnsupportedException("UnSafeTuple does not support put(int, Datum [])."); + } + + @Override + public void put(int fieldId, Tuple tuple) { + throw new UnsupportedException("UnSafeTuple does not support put(int, Tuple)."); + } + + @Override + public void put(Datum[] values) { + throw new UnsupportedException("UnSafeTuple does not support put(Datum [])."); + } + + @Override + public Datum get(int fieldId) { + if (isNull(fieldId)) { + return NullDatum.get(); + } + + switch (types[fieldId].getType()) { + case BOOLEAN: + return DatumFactory.createBool(getBool(fieldId)); + case INT1: + case INT2: + return DatumFactory.createInt2(getInt2(fieldId)); + case INT4: + return DatumFactory.createInt4(getInt4(fieldId)); + case INT8: + return DatumFactory.createInt8(getInt4(fieldId)); + case FLOAT4: + return DatumFactory.createFloat4(getFloat4(fieldId)); + case FLOAT8: + return DatumFactory.createFloat8(getFloat8(fieldId)); + case TEXT: + return DatumFactory.createText(getText(fieldId)); + case TIMESTAMP: + return DatumFactory.createTimestamp(getInt8(fieldId)); + case DATE: + return DatumFactory.createDate(getInt4(fieldId)); + case TIME: + return DatumFactory.createTime(getInt8(fieldId)); + case INTERVAL: + return getInterval(fieldId); + case INET4: + return DatumFactory.createInet4(getInt4(fieldId)); + case PROTOBUF: + return getProtobufDatum(fieldId); + default: + throw new UnsupportedException("Unknown type: " + types[fieldId]); + } + } + + @Override + public void setOffset(long offset) { + } + + @Override + public long getOffset() { + return 0; + } + + @Override + public boolean getBool(int fieldId) { + return UNSAFE.getByte(data, BASE_OFFSET + checkNullAndGetOffset(fieldId)) == 0x01; + } + + @Override + public byte getByte(int fieldId) { + return UNSAFE.getByte(data, BASE_OFFSET + checkNullAndGetOffset(fieldId)); + } + + @Override + public char getChar(int fieldId) { + 
return UNSAFE.getChar(data, BASE_OFFSET + checkNullAndGetOffset(fieldId)); + } + + @Override + public byte[] getBytes(int fieldId) { + long pos = checkNullAndGetOffset(fieldId); + int len = UNSAFE.getInt(data, BASE_OFFSET + pos); + pos += SizeOf.SIZE_OF_INT; + + byte [] bytes = new byte[len]; + UNSAFE.copyMemory(data, BASE_OFFSET + pos, bytes, UNSAFE.ARRAY_BYTE_BASE_OFFSET, len); + return bytes; + } + + @Override + public short getInt2(int fieldId) { + return UNSAFE.getShort(data, BASE_OFFSET + checkNullAndGetOffset(fieldId)); + } + + @Override + public int getInt4(int fieldId) { + return UNSAFE.getInt(data, BASE_OFFSET + checkNullAndGetOffset(fieldId)); + } + + @Override + public long getInt8(int fieldId) { + return UNSAFE.getLong(data, BASE_OFFSET + checkNullAndGetOffset(fieldId)); + } + + @Override + public float getFloat4(int fieldId) { + return UNSAFE.getFloat(data, BASE_OFFSET + checkNullAndGetOffset(fieldId)); + } + + @Override + public double getFloat8(int fieldId) { + return UNSAFE.getDouble(data, BASE_OFFSET + checkNullAndGetOffset(fieldId)); + } + + @Override + public String getText(int fieldId) { + return new String(getBytes(fieldId)); + } + + public IntervalDatum getInterval(int fieldId) { + long pos = checkNullAndGetOffset(fieldId); + int months = UNSAFE.getInt(data, BASE_OFFSET + pos); + pos += SizeOf.SIZE_OF_INT; + long millisecs = UNSAFE.getLong(data, BASE_OFFSET + pos); + return new IntervalDatum(months, millisecs); + } + + @Override + public Datum getProtobufDatum(int fieldId) { + byte [] bytes = getBytes(fieldId); + + ProtobufDatumFactory factory = ProtobufDatumFactory.get(types[fieldId].getCode()); + Message.Builder builder = factory.newBuilder(); + try { + builder.mergeFrom(bytes); + } catch (InvalidProtocolBufferException e) { + return NullDatum.get(); + } + + return new ProtobufDatum(builder.build()); + } + + @Override + public char[] getUnicodeChars(int fieldId) { + long pos = checkNullAndGetOffset(fieldId); + int len = UNSAFE.getInt(data, 
BASE_OFFSET + pos); + pos += SizeOf.SIZE_OF_INT; + + byte [] bytes = new byte[len]; + UNSAFE.copyMemory(data, BASE_OFFSET + pos, bytes, UNSAFE.ARRAY_BYTE_BASE_OFFSET, len); + return Convert.utf2chars(bytes); + } + + @Override + public Tuple clone() throws CloneNotSupportedException { + return this; + } + + @Override + public Datum[] getValues() { + Datum [] datums = new Datum[size()]; + for (int i = 0; i < size(); i++) { + if (contains(i)) { + datums[i] = get(i); + } else { + datums[i] = NullDatum.get(); + } + } + return datums; + } + + @Override + public String toString() { + return VTuple.toDisplayString(getValues()); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapMemory.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapMemory.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapMemory.java new file mode 100644 index 0000000..2f8e349 --- /dev/null +++ b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapMemory.java @@ -0,0 +1,102 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.tuple.offheap; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.tajo.util.Deallocatable; +import org.apache.tajo.util.FileUtil; +import org.apache.tajo.util.UnsafeUtil; +import sun.misc.Unsafe; +import sun.nio.ch.DirectBuffer; + +import java.nio.ByteBuffer; +import java.nio.ByteOrder; + +public class OffHeapMemory implements Deallocatable { + private static final Log LOG = LogFactory.getLog(OffHeapMemory.class); + + protected static final Unsafe UNSAFE = UnsafeUtil.unsafe; + + protected ByteBuffer buffer; + protected int memorySize; + protected ResizableLimitSpec limitSpec; + protected long address; + + @VisibleForTesting + protected OffHeapMemory(ByteBuffer buffer, ResizableLimitSpec limitSpec) { + this.buffer = buffer; + this.address = ((DirectBuffer) buffer).address(); + this.memorySize = buffer.limit(); + this.limitSpec = limitSpec; + } + + public OffHeapMemory(ResizableLimitSpec limitSpec) { + this(ByteBuffer.allocateDirect((int) limitSpec.initialSize()).order(ByteOrder.nativeOrder()), limitSpec); + } + + public long address() { + return address; + } + + public long size() { + return memorySize; + } + + public void resize(int newSize) { + Preconditions.checkArgument(newSize > 0, "Size must be greater than 0 bytes"); + + if (newSize > limitSpec.limit()) { + throw new RuntimeException("Resize cannot exceed the size limit"); + } + + if (newSize < memorySize) { + LOG.warn("The size reduction is ignored."); + } + + int newBlockSize = UnsafeUtil.alignedSize(newSize); + ByteBuffer newByteBuf = ByteBuffer.allocateDirect(newBlockSize); + long newAddress = ((DirectBuffer)newByteBuf).address(); + + UNSAFE.copyMemory(this.address, newAddress, memorySize); + + UnsafeUtil.free(buffer); + this.memorySize = newSize; + this.buffer = newByteBuf; + this.address = newAddress; + } + + 
public java.nio.Buffer nioBuffer() { + return (ByteBuffer) buffer.position(0).limit(memorySize); + } + + @Override + public void release() { + UnsafeUtil.free(this.buffer); + this.buffer = null; + this.address = 0; + this.memorySize = 0; + } + + public String toString() { + return "memory=" + FileUtil.humanReadableByteCount(memorySize, false) + "," + limitSpec; + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlock.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlock.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlock.java new file mode 100644 index 0000000..689efb7 --- /dev/null +++ b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlock.java @@ -0,0 +1,176 @@ +/*** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.tuple.offheap; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.catalog.SchemaUtil; +import org.apache.tajo.catalog.statistics.TableStats; +import org.apache.tajo.util.Deallocatable; +import org.apache.tajo.util.FileUtil; +import org.apache.tajo.util.SizeOf; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; + +import static org.apache.tajo.common.TajoDataTypes.DataType; + +public class OffHeapRowBlock extends OffHeapMemory implements Deallocatable { + private static final Log LOG = LogFactory.getLog(OffHeapRowBlock.class); + + public static final int NULL_FIELD_OFFSET = -1; + + DataType [] dataTypes; + + // Basic States + private int maxRowNum = Integer.MAX_VALUE; // optional + private int rowNum; + protected int position = 0; + + private OffHeapRowBlockWriter builder; + + private OffHeapRowBlock(ByteBuffer buffer, Schema schema, ResizableLimitSpec limitSpec) { + super(buffer, limitSpec); + initialize(schema); + } + + public OffHeapRowBlock(Schema schema, ResizableLimitSpec limitSpec) { + super(limitSpec); + initialize(schema); + } + + private void initialize(Schema schema) { + dataTypes = SchemaUtil.toDataTypes(schema); + + this.builder = new OffHeapRowBlockWriter(this); + } + + @VisibleForTesting + public OffHeapRowBlock(Schema schema, int bytes) { + this(schema, new ResizableLimitSpec(bytes)); + } + + @VisibleForTesting + public OffHeapRowBlock(Schema schema, ByteBuffer buffer) { + this(buffer, schema, ResizableLimitSpec.DEFAULT_LIMIT); + } + + public void position(int pos) { + this.position = pos; + } + + public void clear() { + this.position = 0; + this.rowNum = 0; + + builder.clear(); + } + + @Override + public ByteBuffer nioBuffer() { + return (ByteBuffer) buffer.position(0).limit(position); + } + + public int position() 
{ + return position; + } + + public long usedMem() { + return position; + } + + /** + * Ensure that this buffer has enough remaining space to add the size. + * Creates and copies to a new buffer if necessary + * + * @param size Size to add + */ + public void ensureSize(int size) { + if (remain() - size < 0) { + if (!limitSpec.canIncrease(memorySize)) { + throw new RuntimeException("Cannot increase RowBlock anymore."); + } + + int newBlockSize = limitSpec.increasedSize(memorySize); + resize(newBlockSize); + LOG.info("Increase DirectRowBlock to " + FileUtil.humanReadableByteCount(newBlockSize, false)); + } + } + + public long remain() { + return memorySize - position - builder.offset(); + } + + public int maxRowNum() { + return maxRowNum; + } + public int rows() { + return rowNum; + } + + public void setRows(int rowNum) { + this.rowNum = rowNum; + } + + public boolean copyFromChannel(FileChannel channel, TableStats stats) throws IOException { + if (channel.position() < channel.size()) { + clear(); + + buffer.clear(); + channel.read(buffer); + memorySize = buffer.position(); + + while (position < memorySize) { + long recordPtr = address + position; + + if (remain() < SizeOf.SIZE_OF_INT) { + channel.position(channel.position() - remain()); + memorySize = (int) (memorySize - remain()); + return true; + } + + int recordSize = UNSAFE.getInt(recordPtr); + + if (remain() < recordSize) { + channel.position(channel.position() - remain()); + memorySize = (int) (memorySize - remain()); + return true; + } + + position += recordSize; + rowNum++; + } + + return true; + } else { + return false; + } + } + + public RowWriter getWriter() { + return builder; + } + + public OffHeapRowBlockReader getReader() { + return new OffHeapRowBlockReader(this); + } +}
