IGNITE-1803: Introduced field descriptor and optimized schema lookups for common cases.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/00c3a433 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/00c3a433 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/00c3a433 Branch: refs/heads/ignite-950-new Commit: 00c3a4335669ff7a7c4d18ce1d57a04acd3aa894 Parents: bcb237a Author: vozerov-gridgain <[email protected]> Authored: Fri Oct 30 17:16:35 2015 +0300 Committer: vozerov-gridgain <[email protected]> Committed: Fri Oct 30 17:16:35 2015 +0300 ---------------------------------------------------------------------- .../internal/portable/PortableContext.java | 100 +++------ .../internal/portable/PortableFieldImpl.java | 82 +++++++ .../internal/portable/PortableObjectEx.java | 24 +- .../internal/portable/PortableObjectImpl.java | 132 ++++++++++- .../portable/PortableObjectOffheapImpl.java | 38 ++++ .../internal/portable/PortableObjectSchema.java | 63 ------ .../internal/portable/PortableReaderExImpl.java | 126 +++++++++-- .../internal/portable/PortableSchema.java | 223 +++++++++++++++++++ .../portable/PortableSchemaRegistry.java | 172 ++++++++++++++ .../apache/ignite/portable/PortableField.java | 39 ++++ .../apache/ignite/portable/PortableObject.java | 9 + 11 files changed, 851 insertions(+), 157 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/00c3a433/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableContext.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableContext.java index 3c08df6..e61cba7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableContext.java @@ 
-156,7 +156,7 @@ public class PortableContext implements Externalizable { private boolean keepDeserialized; /** Object schemas. */ - private volatile Map<Integer, Object> schemas; + private volatile Map<Integer, PortableSchemaRegistry> schemas; /** * For {@link Externalizable}. @@ -857,95 +857,51 @@ public class PortableContext implements Externalizable { } /** - * Get schema for the given schema ID. + * Get schema registry for type ID. * - * @param schemaId Schema ID. - * @return Schema or {@code null} if there are no such schema. + * @param typeId Type ID. + * @return Schema registry for type ID. */ - @SuppressWarnings("unchecked") - @Nullable public PortableObjectSchema schema(int typeId, int schemaId) { - Map<Integer, Object> schemas0 = schemas; - - if (schemas0 != null) { - Object typeSchemas = schemas0.get(typeId); - - if (typeSchemas instanceof IgniteBiTuple) { - // The most common case goes first. - IgniteBiTuple<Integer, PortableObjectSchema> schema = - (IgniteBiTuple<Integer, PortableObjectSchema>)typeSchemas; + public PortableSchemaRegistry schemaRegistry(int typeId) { + Map<Integer, PortableSchemaRegistry> schemas0 = schemas; - if (schema.get1() == schemaId) - return schema.get2(); - } - else if (typeSchemas instanceof Map) { - Map<Integer, PortableObjectSchema> curSchemas = (Map<Integer, PortableObjectSchema>)typeSchemas; - - return curSchemas.get(schemaId); - } - } - - return null; - } - - /** - * Add schema. - * - * @param schemaId Schema ID. - * @param newTypeSchema New schema. - */ - @SuppressWarnings("unchecked") - public void addSchema(int typeId, int schemaId, PortableObjectSchema newTypeSchema) { - synchronized (this) { - if (schemas == null) { - // This is the very first schema recorded. 
- Map<Integer, Object> newSchemas = new HashMap<>(); + if (schemas0 == null) { + synchronized (this) { + schemas0 = schemas; - newSchemas.put(typeId, new IgniteBiTuple<>(schemaId, newTypeSchema)); + if (schemas0 == null) { + schemas0 = new HashMap<>(); - schemas = newSchemas; - } - else { - Object typeSchemas = schemas.get(typeId); + PortableSchemaRegistry reg = new PortableSchemaRegistry(); - if (typeSchemas == null) { - // This is the very first object schema. - Map<Integer, Object> newSchemas = new HashMap<>(schemas); + schemas0.put(typeId, reg); - newSchemas.put(typeId, new IgniteBiTuple<>(schemaId, newTypeSchema)); + schemas = schemas0; - schemas = newSchemas; + return reg; } - else if (typeSchemas instanceof IgniteBiTuple) { - IgniteBiTuple<Integer, PortableObjectSchema> typeSchema = - (IgniteBiTuple<Integer, PortableObjectSchema>)typeSchemas; - - if (typeSchema.get1() != schemaId) { - Map<Integer, PortableObjectSchema> newTypeSchemas = new HashMap(); - - newTypeSchemas.put(typeSchema.get1(), typeSchema.get2()); - newTypeSchemas.put(schemaId, newTypeSchema); - - Map<Integer, Object> newSchemas = new HashMap<>(schemas); + } + } - newSchemas.put(typeId, newTypeSchemas); + PortableSchemaRegistry reg = schemas0.get(typeId); - schemas = newSchemas; - } - } - else { - Map<Integer, PortableObjectSchema> newTypeSchemas = - new HashMap((Map<Integer, PortableObjectSchema>)typeSchemas); + if (reg == null) { + synchronized (this) { + reg = schemas.get(typeId); - newTypeSchemas.put(schemaId, newTypeSchema); + if (reg == null) { + reg = new PortableSchemaRegistry(); - Map<Integer, Object> newSchemas = new HashMap<>(schemas); + schemas0 = new HashMap<>(schemas); - newSchemas.put(typeId, newTypeSchemas); + schemas0.put(typeId, reg); - schemas = newSchemas; + schemas = schemas0; } } } + + return reg; } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/00c3a433/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableFieldImpl.java 
---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableFieldImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableFieldImpl.java new file mode 100644 index 0000000..5780b76 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableFieldImpl.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.portable; + +import org.apache.ignite.portable.PortableField; +import org.apache.ignite.portable.PortableObject; + +/** + * Implementation of portable field descriptor. + */ +public class PortableFieldImpl implements PortableField { + /** Well-known object schemas. */ + private final PortableSchemaRegistry schemas; + + /** Pre-calculated field ID. */ + private final int fieldId; + + /** + * Constructor. + * + * @param schemas Schemas. + * @param fieldId Field ID. 
+ */ + public PortableFieldImpl(PortableSchemaRegistry schemas, int fieldId) { + this.schemas = schemas; + this.fieldId = fieldId; + } + + /** {@inheritDoc} */ + @Override public boolean exists(PortableObject obj) { + PortableObjectEx obj0 = (PortableObjectEx)obj; + + return fieldOffset(obj0) != 0; + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public <T> T value(PortableObject obj) { + PortableObjectEx obj0 = (PortableObjectEx)obj; + + int offset = fieldOffset(obj0); + + return offset != 0 ? (T)obj0.fieldByOffset(offset) : null; + } + + /** + * Get relative field offset. + * + * @param obj Object. + * @return Field offset. + */ + private int fieldOffset(PortableObjectEx obj) { + int schemaId = obj.schemaId(); + + PortableSchema schema = schemas.schema(schemaId); + + if (schema == null) { + schema = obj.createSchema(); + + schemas.addSchema(schemaId, schema); + } + + assert schema != null; + + return schema.offset(fieldId); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/00c3a433/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableObjectEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableObjectEx.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableObjectEx.java index ef9ee24..42c973b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableObjectEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableObjectEx.java @@ -67,12 +67,34 @@ public abstract class PortableObjectEx implements PortableObject { @Nullable public abstract <F> F field(int fieldId) throws PortableException; /** + * Get field by offset. + * + * @param fieldOffset Field offset. + * @return Field value. + */ + @Nullable protected abstract <F> F fieldByOffset(int fieldOffset); + + /** * @param ctx Reader context. * @param fieldName Field name. 
- * @return Field name. + * @return Field value. */ @Nullable protected abstract <F> F field(PortableReaderContext ctx, String fieldName); + /** + * Get schema ID. + * + * @return Schema ID. + */ + protected abstract int schemaId(); + + /** + * Create schema for object. + * + * @return Schema. + */ + protected abstract PortableSchema createSchema(); + /** {@inheritDoc} */ @Override public PortableObject clone() throws CloneNotSupportedException { return (PortableObject)super.clone(); http://git-wip-us.apache.org/repos/asf/ignite/blob/00c3a433/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableObjectImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableObjectImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableObjectImpl.java index f1868b1..dfe6d88 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableObjectImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableObjectImpl.java @@ -17,11 +17,6 @@ package org.apache.ignite.internal.portable; -import java.io.Externalizable; -import java.io.IOException; -import java.io.ObjectInput; -import java.io.ObjectOutput; -import java.nio.ByteBuffer; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.GridDirectTransient; import org.apache.ignite.internal.IgniteCodeGeneratingFail; @@ -35,10 +30,26 @@ import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.plugin.extensions.communication.MessageReader; import org.apache.ignite.plugin.extensions.communication.MessageWriter; import org.apache.ignite.portable.PortableException; +import org.apache.ignite.portable.PortableField; import org.apache.ignite.portable.PortableMetadata; import org.apache.ignite.portable.PortableObject; import org.jetbrains.annotations.Nullable; +import java.io.Externalizable; 
+import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.nio.ByteBuffer; + +import static org.apache.ignite.internal.portable.GridPortableMarshaller.BOOLEAN; +import static org.apache.ignite.internal.portable.GridPortableMarshaller.BYTE; +import static org.apache.ignite.internal.portable.GridPortableMarshaller.CHAR; +import static org.apache.ignite.internal.portable.GridPortableMarshaller.DOUBLE; +import static org.apache.ignite.internal.portable.GridPortableMarshaller.FLOAT; +import static org.apache.ignite.internal.portable.GridPortableMarshaller.INT; +import static org.apache.ignite.internal.portable.GridPortableMarshaller.LONG; +import static org.apache.ignite.internal.portable.GridPortableMarshaller.SHORT; + /** * Portable object implementation. */ @@ -248,6 +259,94 @@ public final class PortableObjectImpl extends PortableObjectEx implements Extern /** {@inheritDoc} */ @SuppressWarnings("unchecked") + @Nullable @Override protected <F> F fieldByOffset(int fieldOffset) { + Object val; + + int schemaOffset = PRIM.readInt(arr, start + GridPortableMarshaller.SCHEMA_OR_RAW_OFF_POS); + int fieldPos = PRIM.readInt(arr, start + schemaOffset + fieldOffset); + + // Read header and try performing fast lookup for well-known types (the most common types go first). 
+ byte hdr = PRIM.readByte(arr, fieldPos); + + switch (hdr) { + case INT: + val = PRIM.readInt(arr, fieldPos + 1); + + break; + + case LONG: + val = PRIM.readLong(arr, fieldPos + 1); + + break; + + case BOOLEAN: + val = PRIM.readBoolean(arr, fieldPos + 1); + + break; + + case SHORT: + val = PRIM.readShort(arr, fieldPos + 1); + + break; + + case BYTE: + val = PRIM.readByte(arr, fieldPos + 1); + + break; + + case CHAR: + val = PRIM.readChar(arr, fieldPos + 1); + + break; + + case FLOAT: + val = PRIM.readFloat(arr, fieldPos + 1); + + break; + + case DOUBLE: + val = PRIM.readDouble(arr, fieldPos + 1); + + break; + +// case DECIMAL: +// val = doReadDecimal(); +// +// break; +// +// case STRING: +// val = doReadString(); +// +// break; +// +// case UUID: +// val = doReadUuid(); +// +// break; +// +// case DATE: +// val = doReadDate(); +// +// break; +// +// case TIMESTAMP: +// val = doReadTimestamp(); +// +// break; + + default: { + // TODO: Pass absolute offset, not relative. + PortableReaderExImpl reader = new PortableReaderExImpl(ctx, arr, start, null); + + val = reader.unmarshalFieldByOffset(fieldOffset); + } + } + + return (F)val; + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") @Nullable @Override protected <F> F field(PortableReaderContext rCtx, String fieldName) { PortableReaderExImpl reader = new PortableReaderExImpl(ctx, new PortableHeapInputStream(arr), @@ -298,6 +397,29 @@ public final class PortableObjectImpl extends PortableObjectEx implements Extern } /** {@inheritDoc} */ + @Override protected int schemaId() { + return PRIM.readInt(arr, start + GridPortableMarshaller.SCHEMA_ID_POS); + } + + /** {@inheritDoc} */ + @Override protected PortableSchema createSchema() { + PortableReaderExImpl reader = new PortableReaderExImpl(ctx, arr, start, null); + + return reader.createSchema(); + } + + /** {@inheritDoc} */ + @Override public PortableField fieldDescriptor(String fieldName) throws PortableException { + int typeId = typeId(); + + 
PortableSchemaRegistry schemaReg = ctx.schemaRegistry(typeId); + + int fieldId = ctx.userTypeIdMapper(typeId).fieldId(typeId, fieldName); + + return new PortableFieldImpl(schemaReg, fieldId); + } + + /** {@inheritDoc} */ @Override public void writeExternal(ObjectOutput out) throws IOException { out.writeObject(ctx); http://git-wip-us.apache.org/repos/asf/ignite/blob/00c3a433/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableObjectOffheapImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableObjectOffheapImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableObjectOffheapImpl.java index 0b3e3ea..f023f2e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableObjectOffheapImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableObjectOffheapImpl.java @@ -31,6 +31,7 @@ import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.plugin.extensions.communication.MessageReader; import org.apache.ignite.plugin.extensions.communication.MessageWriter; import org.apache.ignite.portable.PortableException; +import org.apache.ignite.portable.PortableField; import org.apache.ignite.portable.PortableMetadata; import org.apache.ignite.portable.PortableObject; import org.jetbrains.annotations.Nullable; @@ -101,6 +102,32 @@ public class PortableObjectOffheapImpl extends PortableObjectEx implements Exter } /** {@inheritDoc} */ + @Override protected int schemaId() { + return UNSAFE.getInt(ptr + start + GridPortableMarshaller.SCHEMA_ID_POS); + } + + /** {@inheritDoc} */ + @Override protected PortableSchema createSchema() { + PortableReaderExImpl reader = new PortableReaderExImpl(ctx, + new PortableOffheapInputStream(ptr, size, false), + start, + null); + + return reader.createSchema(); + } + + /** {@inheritDoc} */ + @Override public PortableField 
fieldDescriptor(String fieldName) throws PortableException { + int typeId = typeId(); + + PortableSchemaRegistry schemaReg = ctx.schemaRegistry(typeId); + + int fieldId = ctx.userTypeIdMapper(typeId).fieldId(typeId, fieldName); + + return new PortableFieldImpl(schemaReg, fieldId); + } + + /** {@inheritDoc} */ @Override public int start() { return start; } @@ -152,6 +179,17 @@ public class PortableObjectOffheapImpl extends PortableObjectEx implements Exter /** {@inheritDoc} */ @SuppressWarnings("unchecked") + @Nullable @Override protected <F> F fieldByOffset(int fieldOffset) { + PortableReaderExImpl reader = new PortableReaderExImpl(ctx, + new PortableOffheapInputStream(ptr, size, false), + start, + null); + + return (F)reader.unmarshalFieldByOffset(fieldOffset); + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") @Nullable @Override protected <F> F field(PortableReaderContext rCtx, String fieldName) { PortableReaderExImpl reader = new PortableReaderExImpl(ctx, new PortableOffheapInputStream(ptr, size, false), http://git-wip-us.apache.org/repos/asf/ignite/blob/00c3a433/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableObjectSchema.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableObjectSchema.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableObjectSchema.java deleted file mode 100644 index 917ee73..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableObjectSchema.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.portable; - -import java.util.Map; - -/** - * Portable object schema. - */ -public class PortableObjectSchema { - /** Schema ID. */ - private final int schemaId; - - /** Fields. */ - private final Map<Integer, Integer> fields; - - /** - * Constructor. - * - * @param schemaId Schema ID. - * @param fields Fields. - */ - public PortableObjectSchema(int schemaId, Map<Integer, Integer> fields) { - this.schemaId = schemaId; - this.fields = fields; - } - - /** - * Get schema ID. - * - * @return Schema ID. - */ - public int schemaId() { - return schemaId; - } - - /** - * Get field offset position. - * - * @param fieldId Field ID. - * @return Field offset position. - */ - public int fieldOffsetPosition(int fieldId) { - Integer pos = fields.get(fieldId); - - return pos != null ? 
pos : 0; - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/00c3a433/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableReaderExImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableReaderExImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableReaderExImpl.java index 3d47cb0..aa1519d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableReaderExImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableReaderExImpl.java @@ -47,6 +47,7 @@ import java.sql.Timestamp; import java.util.ArrayList; import java.util.Collection; import java.util.Date; +import java.util.LinkedHashMap; import java.util.LinkedList; import java.util.Map; import java.util.Properties; @@ -117,6 +118,9 @@ import static org.apache.ignite.internal.portable.GridPortableMarshaller.UUID_AR */ @SuppressWarnings("unchecked") public class PortableReaderExImpl implements PortableReader, PortableRawReaderEx, ObjectInput { + /** Length of a single field descriptor. */ + private static final int FIELD_DESC_LEN = 16; + /** */ private final PortableContext ctx; @@ -151,7 +155,7 @@ public class PortableReaderExImpl implements PortableReader, PortableRawReaderEx private int footerStart; /** Footer end. */ - private int footerEnd; + private int footerLen; /** ID mapper. */ private PortableIdMapper idMapper; @@ -160,7 +164,7 @@ public class PortableReaderExImpl implements PortableReader, PortableRawReaderEx private int schemaId; /** Object schema. */ - private PortableObjectSchema schema; + private PortableSchema schema; /** * @param ctx Context. 
@@ -224,7 +228,7 @@ public class PortableReaderExImpl implements PortableReader, PortableRawReaderEx IgniteBiTuple<Integer, Integer> footer = PortableUtils.footerAbsolute(in, start); footerStart = footer.get1(); - footerEnd = footer.get2(); + footerLen = footer.get2() - footerStart; schemaId = in.readIntPositioned(start + GridPortableMarshaller.SCHEMA_ID_POS); @@ -303,6 +307,21 @@ public class PortableReaderExImpl implements PortableReader, PortableRawReaderEx } /** + * @param fieldOffset Field offset. + * @return Unmarshalled value. + * @throws PortableException In case of error. + */ + @Nullable Object unmarshalFieldByOffset(int fieldOffset) throws PortableException { + assert fieldOffset != 0; + + parseHeaderIfNeeded(); + + in.position(start + in.readIntPositioned(footerStart + fieldOffset)); + + return unmarshal(); + } + + /** * @param fieldId Field ID. * @return Value. * @throws PortableException In case of error. @@ -1418,12 +1437,20 @@ public class PortableReaderExImpl implements PortableReader, PortableRawReaderEx /** * @param fieldName Field name. - * @return {@code true} if field is set. + * @return {@code True} if field is set. */ public boolean hasField(String fieldName) { return hasField(fieldId(fieldName)); } + /** + * @param fieldId Field ID. + * @return {@code True} if field is set. + */ + private boolean hasField(int fieldId) { + return fieldOffset(fieldId) != 0; + } + /** {@inheritDoc} */ @Override public PortableRawReader rawReader() { in.position(rawOff); @@ -2528,33 +2555,100 @@ public class PortableReaderExImpl implements PortableReader, PortableRawReaderEx } /** + * Create schema. + * + * @return Schema. 
+ */ + public PortableSchema createSchema() { + parseHeaderIfNeeded(); + + LinkedHashMap<Integer, Integer> fields = new LinkedHashMap<>(); + + int searchPos = footerStart; + int searchEnd = searchPos + footerLen; + + while (searchPos < searchEnd) { + int fieldId = in.readIntPositioned(searchPos); + + fields.put(fieldId, searchPos + 4 - footerStart); + + searchPos += 8; + } + + return new PortableSchema(fields); + } + + /** * @param id Field ID. * @return Field offset. */ - private boolean hasField(int id) { + private int fieldOffset(int id) { assert hdrLen != 0; - int searchHead = footerStart; - int searchTail = footerEnd; + if (footerLen == 0) + return 0; + + int searchPos = footerStart; + int searchTail = searchPos + footerLen; - while (true) { - if (searchHead >= searchTail) - return false; + if (hasLowFieldsCount(footerLen)) { + while (true) { + if (searchPos >= searchTail) + return 0; - int id0 = in.readIntPositioned(searchHead); + int id0 = in.readIntPositioned(searchPos); - if (id0 == id) { - int offset = in.readIntPositioned(searchHead + 4); + if (id0 == id) { + int pos = start + in.readIntPositioned(searchPos + 4); - in.position(start + offset); + in.position(pos); - return true; + return pos; + } + + searchPos += 8; } + } + else { + PortableSchema schema0 = schema; + + if (schema0 == null) { + schema0 = ctx.schemaRegistry(typeId).schema(schemaId); + + if (schema0 == null) { + schema0 = createSchema(); - searchHead += 8; + ctx.schemaRegistry(typeId).addSchema(schemaId, schema0); + } + + schema = schema0; + } + + int fieldOffsetPos = schema.offset(id); + + if (fieldOffsetPos != 0) { + int pos = start + in.readIntPositioned(footerStart + fieldOffsetPos); + + in.position(pos); + + return pos; + } + else + return 0; } } + /** + * Check whether object has low amount of fields. + * + * @param footerLen Footer length. 
+ */ + private boolean hasLowFieldsCount(int footerLen) { + assert hdrParsed; + + return footerLen < (FIELD_DESC_LEN << 4); + } + /** {@inheritDoc} */ @Override public int readUnsignedByte() throws IOException { return readByte() & 0xff; http://git-wip-us.apache.org/repos/asf/ignite/blob/00c3a433/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableSchema.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableSchema.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableSchema.java new file mode 100644 index 0000000..09bfe35 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableSchema.java @@ -0,0 +1,223 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.portable; + +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.Map; + +/** + * Schema describing portable object content. 
We rely on the following assumptions: + * - When the number of fields in the object is low, it is better to inline these values into int fields thus allowing + * for quick comparisons performed within an already fetched L1 cache line. + * - When there are more fields, we store them inside a hash map. + */ +public class PortableSchema { + /** Inline flag. */ + private final boolean inline; + + /** Map with offsets. */ + private final HashMap<Integer, Integer> map; + + /** ID 1. */ + private final int id1; + + /** Offset 1. */ + private final int offset1; + + /** ID 2. */ + private final int id2; + + /** Offset 2. */ + private final int offset2; + + /** ID 3. */ + private final int id3; + + /** Offset 3. */ + private final int offset3; + + /** ID 4. */ + private final int id4; + + /** Offset 4. */ + private final int offset4; + + /** ID 5. */ + private final int id5; + + /** Offset 5. */ + private final int offset5; + + /** ID 6. */ + private final int id6; + + /** Offset 6. */ + private final int offset6; + + /** ID 7. */ + private final int id7; + + /** Offset 7. */ + private final int offset7; + + /** ID 8. */ + private final int id8; + + /** Offset 8. */ + private final int offset8; + + /** + * Constructor. + * + * @param vals Values. + */ + public PortableSchema(LinkedHashMap<Integer, Integer> vals) { + if (vals.size() <= 8) { + inline = true; + + Iterator<Map.Entry<Integer, Integer>> iter = vals.entrySet().iterator(); + + Map.Entry<Integer, Integer> entry = iter.hasNext() ? iter.next() : null; + + if (entry != null) { + id1 = entry.getKey(); + offset1 = entry.getValue(); + } + else{ + id1 = 0; + offset1 = 0; + } + + if ((entry = iter.hasNext() ? iter.next() : null) != null) { + id2 = entry.getKey(); + offset2 = entry.getValue(); + } + else{ + id2 = 0; + offset2 = 0; + } + + if ((entry = iter.hasNext() ? iter.next() : null) != null) { + id3 = entry.getKey(); + offset3 = entry.getValue(); + } + else{ + id3 = 0; + offset3 = 0; + } + + if ((entry = iter.hasNext() ? 
iter.next() : null) != null) { + id4 = entry.getKey(); + offset4 = entry.getValue(); + } + else{ + id4 = 0; + offset4 = 0; + } + + if ((entry = iter.hasNext() ? iter.next() : null) != null) { + id5 = entry.getKey(); + offset5 = entry.getValue(); + } + else{ + id5 = 0; + offset5 = 0; + } + + if ((entry = iter.hasNext() ? iter.next() : null) != null) { + id6 = entry.getKey(); + offset6 = entry.getValue(); + } + else{ + id6 = 0; + offset6 = 0; + } + + if ((entry = iter.hasNext() ? iter.next() : null) != null) { + id7 = entry.getKey(); + offset7 = entry.getValue(); + } + else{ + id7 = 0; + offset7 = 0; + } + + if ((entry = iter.hasNext() ? iter.next() : null) != null) { + id8 = entry.getKey(); + offset8 = entry.getValue(); + } + else{ + id8 = 0; + offset8 = 0; + } + + map = null; + } + else { + inline = false; + + id1 = id2 = id3 = id4 = id5 = id6 = id7 = id8 = 0; + offset1 = offset2 = offset3 = offset4 = offset5 = offset6 = offset7 = offset8 = 0; + + map = new HashMap<>(vals); + } + } + + /** + * Get offset for the given field ID. + * + * @param id Field ID. + * @return Offset or {@code 0} if there is no such field. + */ + public int offset(int id) { + if (inline) { + if (id == id1) + return offset1; + + if (id == id2) + return offset2; + + if (id == id3) + return offset3; + + if (id == id4) + return offset4; + + if (id == id5) + return offset5; + + if (id == id6) + return offset6; + + if (id == id7) + return offset7; + + if (id == id8) + return offset8; + + return 0; + } + else { + Integer off = map.get(id); + + return off != null ? 
off : 0; + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/00c3a433/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableSchemaRegistry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableSchemaRegistry.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableSchemaRegistry.java new file mode 100644 index 0000000..e442b1f --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableSchemaRegistry.java @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.portable; + +import org.jetbrains.annotations.Nullable; + +import java.util.HashMap; + +/** + * Portable schema registry. Contains all well-known object schemas. + * <p> + * We rely on the fact that an object usually has only a few different schemas. For this reason we inline up to four + * of them, with a fallback to hash map lookup once a fifth schema is added. + * Note: ID {@code 0} marks an empty inline slot, so a schema with that ID is ignored while a free slot remains. + */ +public class PortableSchemaRegistry { + /** Empty schema ID. */ + private static final int EMPTY = 0; + + /** Whether registry still works in inline mode. 
*/ + private volatile boolean inline = true; + + /** First schema ID. */ + private int schemaId1; + + /** Second schema ID. */ + private int schemaId2; + + /** Third schema ID. */ + private int schemaId3; + + /** Fourth schema ID. */ + private int schemaId4; + + /** First schema. */ + private PortableSchema schema1; + + /** Second schema. */ + private PortableSchema schema2; + + /** Third schema. */ + private PortableSchema schema3; + + /** Fourth schema. */ + private PortableSchema schema4; + + /** Schemas with copy-on-write semantics; stays {@code null} while in inline mode. */ + private volatile HashMap<Integer, PortableSchema> schemas; + + /** + * Get schema for the given ID. Reads take no lock and rely on very relaxed memory semantics, assuming that it is + * not critical to occasionally return {@code null} for a schema that has already been added. + * + * @param schemaId Schema ID. + * @return Schema or {@code null} if it is not found. + */ + @Nullable public PortableSchema schema(int schemaId) { + if (inline) { + if (schemaId == schemaId1) + return schema1; + else if (schemaId == schemaId2) + return schema2; + else if (schemaId == schemaId3) + return schema3; + else if (schemaId == schemaId4) + return schema4; + } + else { + HashMap<Integer, PortableSchema> schemas0 = schemas; + + // Null can be observed here due to either data race or race condition when switching to non-inlined mode. + // Both of them are benign for us because they lead only to unnecessary schema re-calc. + if (schemas0 != null) + return schemas0.get(schemaId); + } + + return null; + } + + /** + * Add schema. Uses a free inline slot if there is one, otherwise publishes a newly built hash map. + * + * @param schemaId Schema ID. + * @param schema Schema. + */ + public void addSchema(int schemaId, PortableSchema schema) { + synchronized (this) { + if (inline) { + // Check if this is an already known schema. + if (schemaId == schemaId1 || schemaId == schemaId2 || schemaId == schemaId3 || schemaId == schemaId4) + return; + + // Try positioning new schema in inline mode. + if (schemaId1 == EMPTY) { + schemaId1 = schemaId; + + schema1 = schema; + + inline = true; // Volatile write: forcing happens-before edge just in case. 
+ + return; + } + + if (schemaId2 == EMPTY) { + schemaId2 = schemaId; + + schema2 = schema; + + inline = true; // Volatile write: forcing happens-before edge just in case. + + return; + } + + if (schemaId3 == EMPTY) { + schemaId3 = schemaId; + + schema3 = schema; + + inline = true; // Volatile write: forcing happens-before edge just in case. + + return; + } + + if (schemaId4 == EMPTY) { + schemaId4 = schemaId; + + schema4 = schema; + + inline = true; // Volatile write: forcing happens-before edge just in case. + + return; + } + + // No luck, switching to hash map mode. + HashMap<Integer, PortableSchema> newSchemas = new HashMap<>(); + + newSchemas.put(schemaId1, schema1); + newSchemas.put(schemaId2, schema2); + newSchemas.put(schemaId3, schema3); + newSchemas.put(schemaId4, schema4); + + newSchemas.put(schemaId, schema); + + schemas = newSchemas; + + inline = false; + } + else { + HashMap<Integer, PortableSchema> newSchemas = new HashMap<>(schemas); + + newSchemas.put(schemaId, schema); + + schemas = newSchemas; + } + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/00c3a433/modules/core/src/main/java/org/apache/ignite/portable/PortableField.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/portable/PortableField.java b/modules/core/src/main/java/org/apache/ignite/portable/PortableField.java new file mode 100644 index 0000000..f8851ee --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/portable/PortableField.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.portable; + +/** + * Portable object field. Can be used to speed up object field lookup. + */ +public interface PortableField { + /** + * Check whether field exists in the object. + * + * @param obj Object. + * @return {@code True} if the field exists in the given object. + */ + public boolean exists(PortableObject obj); + + /** + * Get field's value from the given object. + * + * @param obj Object. + * @return Field value. + */ + public <T> T value(PortableObject obj); +} http://git-wip-us.apache.org/repos/asf/ignite/blob/00c3a433/modules/core/src/main/java/org/apache/ignite/portable/PortableObject.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/portable/PortableObject.java b/modules/core/src/main/java/org/apache/ignite/portable/PortableObject.java index 92b25aa..ce7063a 100644 --- a/modules/core/src/main/java/org/apache/ignite/portable/PortableObject.java +++ b/modules/core/src/main/java/org/apache/ignite/portable/PortableObject.java @@ -137,6 +137,15 @@ public interface PortableObject extends IgniteObject, Cloneable { @Override public boolean hasField(String fieldName); /** + * Gets field descriptor. + * + * @param fieldName Field name. + * @return Field descriptor. + * @throws PortableException If failed. + */ + public PortableField fieldDescriptor(String fieldName) throws PortableException; + + /** * Gets fully deserialized instance of portable object. * * @return Fully deserialized instance of portable object.
