http://git-wip-us.apache.org/repos/asf/ignite/blob/2de3b19b/modules/core/src/main/java/org/apache/ignite/internal/direct/stream/v1/DirectByteBufferStreamImplV1.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/direct/stream/v1/DirectByteBufferStreamImplV1.java b/modules/core/src/main/java/org/apache/ignite/internal/direct/stream/v1/DirectByteBufferStreamImplV1.java new file mode 100644 index 0000000..ad8671d --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/direct/stream/v1/DirectByteBufferStreamImplV1.java @@ -0,0 +1,1360 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.direct.stream.v1; + +import java.lang.reflect.Array; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.BitSet; +import java.util.Collection; +import java.util.Iterator; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.UUID; +import org.apache.ignite.internal.direct.stream.DirectByteBufferStream; +import org.apache.ignite.internal.util.GridUnsafe; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteUuid; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType; +import org.apache.ignite.plugin.extensions.communication.MessageFactory; +import org.apache.ignite.plugin.extensions.communication.MessageReader; +import org.apache.ignite.plugin.extensions.communication.MessageWriter; +import sun.misc.Unsafe; +import sun.nio.ch.DirectBuffer; + +/** + * Direct marshalling I/O stream (version 1). 
+ */ +public class DirectByteBufferStreamImplV1 implements DirectByteBufferStream { + /** Unsafe instance used for all raw memory reads, writes and copies. */ + private static final Unsafe UNSAFE = GridUnsafe.unsafe(); + + /** Base offset of the elements of a {@code byte[]}. */ + private static final long BYTE_ARR_OFF = UNSAFE.arrayBaseOffset(byte[].class); + + /** Base offset of the elements of a {@code short[]}. */ + private static final long SHORT_ARR_OFF = UNSAFE.arrayBaseOffset(short[].class); + + /** Base offset of the elements of an {@code int[]}. */ + private static final long INT_ARR_OFF = UNSAFE.arrayBaseOffset(int[].class); + + /** Base offset of the elements of a {@code long[]}. */ + private static final long LONG_ARR_OFF = UNSAFE.arrayBaseOffset(long[].class); + + /** Base offset of the elements of a {@code float[]}. */ + private static final long FLOAT_ARR_OFF = UNSAFE.arrayBaseOffset(float[].class); + + /** Base offset of the elements of a {@code double[]}. */ + private static final long DOUBLE_ARR_OFF = UNSAFE.arrayBaseOffset(double[].class); + + /** Base offset of the elements of a {@code char[]}. */ + private static final long CHAR_ARR_OFF = UNSAFE.arrayBaseOffset(char[].class); + + /** Base offset of the elements of a {@code boolean[]}. */ + private static final long BOOLEAN_ARR_OFF = UNSAFE.arrayBaseOffset(boolean[].class); + + /** Shared empty {@code byte[]} returned for zero-length arrays. */ + private static final byte[] BYTE_ARR_EMPTY = new byte[0]; + + /** Shared empty {@code short[]} returned for zero-length arrays. */ + private static final short[] SHORT_ARR_EMPTY = new short[0]; + + /** Shared empty {@code int[]} returned for zero-length arrays. */ + private static final int[] INT_ARR_EMPTY = U.EMPTY_INTS; + + /** Shared empty {@code long[]} returned for zero-length arrays. */ + private static final long[] LONG_ARR_EMPTY = U.EMPTY_LONGS; + + /** Shared empty {@code float[]} returned for zero-length arrays. */ + private static final float[] FLOAT_ARR_EMPTY = new float[0]; + + /** Shared empty {@code double[]} returned for zero-length arrays. */ + private static final double[] DOUBLE_ARR_EMPTY = new double[0]; + + /** Shared empty {@code char[]} returned for zero-length arrays. */ + private static final char[] CHAR_ARR_EMPTY = new char[0]; + + /** Shared empty {@code boolean[]} returned for zero-length arrays. */ + private static final boolean[] BOOLEAN_ARR_EMPTY = new boolean[0]; + + /** Creates {@code byte[]} of a given non-negative length; length 0 yields the shared empty array. */ + private static final ArrayCreator<byte[]> BYTE_ARR_CREATOR = new ArrayCreator<byte[]>() { + @Override public byte[] create(int len) { + assert len >= 0; + + switch (len) { + case 0: + return BYTE_ARR_EMPTY; + + default: + return new byte[len]; + } + } + }; + + /** Creates {@code short[]} of a given non-negative length; length 0 yields the shared empty array. */ + private static final ArrayCreator<short[]> SHORT_ARR_CREATOR = new ArrayCreator<short[]>() { + @Override public short[] create(int len) { + assert len >= 0; + + switch (len) { + case 0: + return SHORT_ARR_EMPTY; + + default: 
+ return new short[len]; + } + } + }; + + /** Creates {@code int[]} of a given non-negative length; length 0 yields the shared empty array. */ + private static final ArrayCreator<int[]> INT_ARR_CREATOR = new ArrayCreator<int[]>() { + @Override public int[] create(int len) { + assert len >= 0; + + switch (len) { + case 0: + return INT_ARR_EMPTY; + + default: + return new int[len]; + } + } + }; + + /** Creates {@code long[]} of a given non-negative length; length 0 yields the shared empty array. */ + private static final ArrayCreator<long[]> LONG_ARR_CREATOR = new ArrayCreator<long[]>() { + @Override public long[] create(int len) { + assert len >= 0; + + switch (len) { + case 0: + return LONG_ARR_EMPTY; + + default: + return new long[len]; + } + } + }; + + /** Creates {@code float[]} of a given non-negative length; length 0 yields the shared empty array. */ + private static final ArrayCreator<float[]> FLOAT_ARR_CREATOR = new ArrayCreator<float[]>() { + @Override public float[] create(int len) { + assert len >= 0; + + switch (len) { + case 0: + return FLOAT_ARR_EMPTY; + + default: + return new float[len]; + } + } + }; + + /** Creates {@code double[]} of a given non-negative length; length 0 yields the shared empty array. */ + private static final ArrayCreator<double[]> DOUBLE_ARR_CREATOR = new ArrayCreator<double[]>() { + @Override public double[] create(int len) { + assert len >= 0; + + switch (len) { + case 0: + return DOUBLE_ARR_EMPTY; + + default: + return new double[len]; + } + } + }; + + /** Creates {@code char[]} of a given non-negative length; length 0 yields the shared empty array. */ + private static final ArrayCreator<char[]> CHAR_ARR_CREATOR = new ArrayCreator<char[]>() { + @Override public char[] create(int len) { + assert len >= 0; + + switch (len) { + case 0: + return CHAR_ARR_EMPTY; + + default: + return new char[len]; + } + } + }; + + /** Creates {@code boolean[]} of a given non-negative length; length 0 yields the shared empty array. */ + private static final ArrayCreator<boolean[]> BOOLEAN_ARR_CREATOR = new ArrayCreator<boolean[]>() { + @Override public boolean[] create(int len) { + assert len >= 0; + + switch (len) { + case 0: + return BOOLEAN_ARR_EMPTY; + + default: + return new boolean[len]; + } + } + }; + + /** Sentinel meaning "no item is pending"; distinct from a pending item that is itself {@code null}. */ + private static final Object NULL = new Object(); + + /** Factory used by {@code readMessage} to create a message from its type byte. */ + private final MessageFactory msgFactory; + + /** Buffer currently being read from or written to; assigned in {@code setBuffer}. */ + private ByteBuffer buf; + + /** Backing array of a heap buffer, or {@code null} when the buffer is direct. */ + private byte[] heapArr; + + /** Address of a direct buffer, or {@code BYTE_ARR_OFF} for a heap buffer. */ + private long baseOff; + + /** Bytes of the current primitive array already written, or {@code -1} when no array write is in progress. */ + private int arrOff = -1; + + /** Primitive array being filled by {@code readArray}, or {@code null} when no array read is in progress. */ + private Object tmpArr; + + /** Number of bytes already copied into the array being read by {@code readArray}. */ + 
private int tmpArrOff; + + /** Total size in bytes of the array being read by {@code readArray}. */ + private int tmpArrBytes; + + /** Whether the type byte of the message being read has already been consumed. */ + private boolean msgTypeDone; + + /** Message currently being read, or {@code null}. */ + private Message msg; + + /** Iterator over the entries of the map being written, or {@code null} when no map write is in progress. */ + private Iterator<?> mapIt; + + /** Iterator over the collection being written, or {@code null} when no collection write is in progress. */ + private Iterator<?> it; + + /** Iterator over the object array being written, or {@code null} when no array write is in progress. */ + private Iterator<?> arrIt; + + /** Object array item whose write is pending, or {@code NULL} if none. */ + private Object arrCur = NULL; + + /** Map entry whose write is pending ({@code NULL} if none); while reading a map, holds the last read key. */ + private Object mapCur = NULL; + + /** Collection item whose write is pending, or {@code NULL} if none. */ + private Object cur = NULL; + + /** Whether the key of the current map entry has already been written (or read). */ + private boolean keyDone; + + /** Size of the array, collection or map being read, or {@code -1} if the size has not been read yet. */ + private int readSize = -1; + + /** Number of items of the current array, collection or map already read. */ + private int readItems; + + /** Object array being read, or {@code null}. */ + private Object[] objArr; + + /** Collection being read, or {@code null}. */ + private Collection<Object> col; + + /** Map being read, or {@code null}. */ + private Map<Object, Object> map; + + /** Whether the last read or write operation completed fully. */ + private boolean lastFinished; + + /** + * @param msgFactory Message factory. + */ + public DirectByteBufferStreamImplV1(MessageFactory msgFactory) { + this.msgFactory = msgFactory; + } + + /** + * {@inheritDoc} + * <p> + * When the buffer instance changes, caches its backing array ({@code null} for a direct buffer) and the base offset + * used by the Unsafe accessors. + * <p> + * NOTE(review): for heap buffers the base offset is {@code BYTE_ARR_OFF} only and {@code buf.arrayOffset()} is not + * added; presumably callers never pass a sliced heap buffer - confirm. + */ + @Override public void setBuffer(ByteBuffer buf) { + assert buf != null; + + if (this.buf != buf) { + this.buf = buf; + + heapArr = buf.isDirect() ? null : buf.array(); + baseOff = buf.isDirect() ? 
((DirectBuffer)buf).address() : BYTE_ARR_OFF; + } + } + + /** {@inheritDoc} */ + @Override public int remaining() { + return buf.remaining(); + } + + /** {@inheritDoc} */ + @Override public boolean lastFinished() { + return lastFinished; + } + + /** + * {@inheritDoc} + * <p> + * Like every primitive write in this class, writes nothing and sets {@code lastFinished} to {@code false} when the + * buffer has too few bytes remaining. + */ + @Override public void writeByte(byte val) { + lastFinished = buf.remaining() >= 1; + + if (lastFinished) { + int pos = buf.position(); + + UNSAFE.putByte(heapArr, baseOff + pos, val); + + buf.position(pos + 1); + } + } + + /** {@inheritDoc} */ + @Override public void writeShort(short val) { + lastFinished = buf.remaining() >= 2; + + if (lastFinished) { + int pos = buf.position(); + + UNSAFE.putShort(heapArr, baseOff + pos, val); + + buf.position(pos + 2); + } + } + + /** {@inheritDoc} */ + @Override public void writeInt(int val) { + lastFinished = buf.remaining() >= 4; + + if (lastFinished) { + int pos = buf.position(); + + UNSAFE.putInt(heapArr, baseOff + pos, val); + + buf.position(pos + 4); + } + } + + /** {@inheritDoc} */ + @Override public void writeLong(long val) { + lastFinished = buf.remaining() >= 8; + + if (lastFinished) { + int pos = buf.position(); + + UNSAFE.putLong(heapArr, baseOff + pos, val); + + buf.position(pos + 8); + } + } + + /** + * {@inheritDoc} + * + * @param val Value. 
+ */ + @Override public void writeFloat(float val) { + lastFinished = buf.remaining() >= 4; + + if (lastFinished) { + int pos = buf.position(); + + UNSAFE.putFloat(heapArr, baseOff + pos, val); + + buf.position(pos + 4); + } + } + + /** {@inheritDoc} */ + @Override public void writeDouble(double val) { + lastFinished = buf.remaining() >= 8; + + if (lastFinished) { + int pos = buf.position(); + + UNSAFE.putDouble(heapArr, baseOff + pos, val); + + buf.position(pos + 8); + } + } + + /** {@inheritDoc} */ + @Override public void writeChar(char val) { + lastFinished = buf.remaining() >= 2; + + if (lastFinished) { + int pos = buf.position(); + + UNSAFE.putChar(heapArr, baseOff + pos, val); + + buf.position(pos + 2); + } + } + + /** {@inheritDoc} */ + @Override public void writeBoolean(boolean val) { + lastFinished = buf.remaining() >= 1; + + if (lastFinished) { + int pos = buf.position(); + + UNSAFE.putBoolean(heapArr, baseOff + pos, val); + + buf.position(pos + 1); + } + } + + /** + * {@inheritDoc} + * <p> + * A {@code null} array is written as the length marker {@code -1}; the same convention is used by every array + * writer below. + */ + @Override public void writeByteArray(byte[] val) { + if (val != null) + lastFinished = writeArray(val, BYTE_ARR_OFF, val.length, val.length); + else + writeInt(-1); + } + + /** + * {@inheritDoc} + * <p> + * {@code off} is added to the byte array base offset, and {@code len} is used both as the written length and as + * the number of bytes to copy. + */ + @Override public void writeByteArray(byte[] val, long off, int len) { + if (val != null) + lastFinished = writeArray(val, BYTE_ARR_OFF + off, len, len); + else + writeInt(-1); + } + + /** + * {@inheritDoc} + * <p> + * The byte count passed to {@code writeArray} is the element count shifted left by 1. + */ + @Override public void writeShortArray(short[] val) { + if (val != null) + lastFinished = writeArray(val, SHORT_ARR_OFF, val.length, val.length << 1); + else + writeInt(-1); + } + + /** + * {@inheritDoc} + * <p> + * The byte count passed to {@code writeArray} is the element count shifted left by 2. + */ + @Override public void writeIntArray(int[] val) { + if (val != null) + lastFinished = writeArray(val, INT_ARR_OFF, val.length, val.length << 2); + else + writeInt(-1); + } + + /** + * {@inheritDoc} + * <p> + * The byte count passed to {@code writeArray} is the element count shifted left by 3. + */ + @Override public void writeLongArray(long[] val) { + if (val != null) + lastFinished = writeArray(val, LONG_ARR_OFF, val.length, val.length << 3); + else + writeInt(-1); + } + + /** 
{@inheritDoc} */ + @Override public void writeFloatArray(float[] val) { + if (val != null) + lastFinished = writeArray(val, FLOAT_ARR_OFF, val.length, val.length << 2); + else + writeInt(-1); + } + + /** {@inheritDoc} */ + @Override public void writeDoubleArray(double[] val) { + if (val != null) + lastFinished = writeArray(val, DOUBLE_ARR_OFF, val.length, val.length << 3); + else + writeInt(-1); + } + + /** {@inheritDoc} */ + @Override public void writeCharArray(char[] val) { + if (val != null) + lastFinished = writeArray(val, CHAR_ARR_OFF, val.length, val.length << 1); + else + writeInt(-1); + } + + /** {@inheritDoc} */ + @Override public void writeBooleanArray(boolean[] val) { + if (val != null) + lastFinished = writeArray(val, BOOLEAN_ARR_OFF, val.length, val.length); + else + writeInt(-1); + } + + /** + * {@inheritDoc} + * <p> + * The string is written as a byte array. + * <p> + * NOTE(review): {@code getBytes()} encodes with the platform default charset and {@code readString} decodes with + * {@code new String(byte[])}; presumably sender and receiver must share the same default charset - confirm. + */ + @Override public void writeString(String val) { + writeByteArray(val != null ? val.getBytes() : null); + } + + /** + * {@inheritDoc} + * <p> + * The bit set is written as the long array returned by {@code toLongArray()}. + */ + @Override public void writeBitSet(BitSet val) { + writeLongArray(val != null ? val.toLongArray() : null); + } + + /** + * {@inheritDoc} + * <p> + * The UUID is written as the byte array produced by {@code U.uuidToBytes}. + */ + @Override public void writeUuid(UUID val) { + writeByteArray(val != null ? U.uuidToBytes(val) : null); + } + + /** + * {@inheritDoc} + * <p> + * The Ignite UUID is written as the byte array produced by {@code U.igniteUuidToBytes}. + */ + @Override public void writeIgniteUuid(IgniteUuid val) { + writeByteArray(val != null ? 
U.igniteUuidToBytes(val) : null); + } + + /** + * {@inheritDoc} + * <p> + * A {@code null} message is written as the single byte {@code Byte.MIN_VALUE}. A non-null message is delegated to + * {@code msg.writeTo} between the writer's before/after inner-message callbacks; if the buffer has no space left, + * nothing is written and {@code lastFinished} is set to {@code false}. + */ + @Override public void writeMessage(Message msg, MessageWriter writer) { + if (msg != null) { + if (buf.hasRemaining()) { + try { + writer.beforeInnerMessageWrite(); + + lastFinished = msg.writeTo(buf, writer); + } + finally { + writer.afterInnerMessageWrite(lastFinished); + } + } + else + lastFinished = false; + } + else + writeByte(Byte.MIN_VALUE); + } + + /** + * {@inheritDoc} + * <p> + * Writes the length, then each item. Progress is kept in {@code arrIt} and {@code arrCur}, so a call that returns + * with {@code lastFinished == false} resumes from the unfinished item when invoked again. A {@code null} array is + * written as length {@code -1}. + */ + @Override public <T> void writeObjectArray(T[] arr, MessageCollectionItemType itemType, MessageWriter writer) { + if (arr != null) { + if (arrIt == null) { + writeInt(arr.length); + + if (!lastFinished) + return; + + arrIt = arrayIterator(arr); + } + + while (arrIt.hasNext() || arrCur != NULL) { + if (arrCur == NULL) + arrCur = arrIt.next(); + + write(itemType, arrCur, writer); + + if (!lastFinished) + return; + + arrCur = NULL; + } + + arrIt = null; + } + else + writeInt(-1); + } + + /** + * {@inheritDoc} + * <p> + * Writes the size, then each item. Progress is kept in {@code it} and {@code cur}, so a call that returns with + * {@code lastFinished == false} resumes from the unfinished item when invoked again. A {@code null} collection is + * written as size {@code -1}. + */ + @Override public <T> void writeCollection(Collection<T> col, MessageCollectionItemType itemType, + MessageWriter writer) { + if (col != null) { + if (it == null) { + writeInt(col.size()); + + if (!lastFinished) + return; + + it = col.iterator(); + } + + while (it.hasNext() || cur != NULL) { + if (cur == NULL) + cur = it.next(); + + write(itemType, cur, writer); + + if (!lastFinished) + return; + + cur = NULL; + } + + it = null; + } + else + writeInt(-1); + } + + /** + * {@inheritDoc} + * <p> + * Writes the size, then each key followed by its value. Progress is kept in {@code mapIt}, {@code mapCur} and + * {@code keyDone}, so a call that returns with {@code lastFinished == false} resumes from the unfinished key or + * value when invoked again. A {@code null} map is written as size {@code -1}. + */ + @SuppressWarnings("unchecked") + @Override public <K, V> void writeMap(Map<K, V> map, MessageCollectionItemType keyType, + MessageCollectionItemType valType, MessageWriter writer) { + if (map != null) { + if (mapIt == null) { + writeInt(map.size()); + + if (!lastFinished) + return; + + mapIt = map.entrySet().iterator(); + } + + while (mapIt.hasNext() || mapCur != NULL) { + Map.Entry<K, V> e; + + if (mapCur == NULL) + mapCur = mapIt.next(); + + e = (Map.Entry<K, V>)mapCur; + + if (!keyDone) { + write(keyType, e.getKey(), writer); + + if 
(!lastFinished) + return; + + keyDone = true; + } + + write(valType, e.getValue(), writer); + + if (!lastFinished) + return; + + mapCur = NULL; + keyDone = false; + } + + mapIt = null; + } + else + writeInt(-1); + } + + /** + * {@inheritDoc} + * <p> + * Like every primitive read in this class, returns a zero value and sets {@code lastFinished} to {@code false} + * when the buffer has too few bytes remaining. + */ + @Override public byte readByte() { + lastFinished = buf.remaining() >= 1; + + if (lastFinished) { + int pos = buf.position(); + + buf.position(pos + 1); + + return UNSAFE.getByte(heapArr, baseOff + pos); + } + else + return 0; + } + + /** {@inheritDoc} */ + @Override public short readShort() { + lastFinished = buf.remaining() >= 2; + + if (lastFinished) { + int pos = buf.position(); + + buf.position(pos + 2); + + return UNSAFE.getShort(heapArr, baseOff + pos); + } + else + return 0; + } + + /** {@inheritDoc} */ + @Override public int readInt() { + lastFinished = buf.remaining() >= 4; + + if (lastFinished) { + int pos = buf.position(); + + buf.position(pos + 4); + + return UNSAFE.getInt(heapArr, baseOff + pos); + } + else + return 0; + } + + /** {@inheritDoc} */ + @Override public long readLong() { + lastFinished = buf.remaining() >= 8; + + if (lastFinished) { + int pos = buf.position(); + + buf.position(pos + 8); + + return UNSAFE.getLong(heapArr, baseOff + pos); + } + else + return 0; + } + + /** {@inheritDoc} */ + @Override public float readFloat() { + lastFinished = buf.remaining() >= 4; + + if (lastFinished) { + int pos = buf.position(); + + buf.position(pos + 4); + + return UNSAFE.getFloat(heapArr, baseOff + pos); + } + else + return 0; + } + + /** {@inheritDoc} */ + @Override public double readDouble() { + lastFinished = buf.remaining() >= 8; + + if (lastFinished) { + int pos = buf.position(); + + buf.position(pos + 8); + + return UNSAFE.getDouble(heapArr, baseOff + pos); + } + else + return 0; + } + + /** {@inheritDoc} */ + @Override public char readChar() { + lastFinished = buf.remaining() >= 2; + + if (lastFinished) { + int pos = buf.position(); + + buf.position(pos + 2); + + return UNSAFE.getChar(heapArr, baseOff + 
pos); + } + else + return 0; + } + + /** {@inheritDoc} */ + @Override public boolean readBoolean() { + lastFinished = buf.hasRemaining(); + + if (lastFinished) { + int pos = buf.position(); + + buf.position(pos + 1); + + return UNSAFE.getBoolean(heapArr, baseOff + pos); + } + else + return false; + } + + /** + * {@inheritDoc} + * <p> + * The second argument of {@code readArray} is the shift that converts an element count to a byte count (0 here). + */ + @Override public byte[] readByteArray() { + return readArray(BYTE_ARR_CREATOR, 0, BYTE_ARR_OFF); + } + + /** {@inheritDoc} */ + @Override public short[] readShortArray() { + return readArray(SHORT_ARR_CREATOR, 1, SHORT_ARR_OFF); + } + + /** {@inheritDoc} */ + @Override public int[] readIntArray() { + return readArray(INT_ARR_CREATOR, 2, INT_ARR_OFF); + } + + /** {@inheritDoc} */ + @Override public long[] readLongArray() { + return readArray(LONG_ARR_CREATOR, 3, LONG_ARR_OFF); + } + + /** {@inheritDoc} */ + @Override public float[] readFloatArray() { + return readArray(FLOAT_ARR_CREATOR, 2, FLOAT_ARR_OFF); + } + + /** {@inheritDoc} */ + @Override public double[] readDoubleArray() { + return readArray(DOUBLE_ARR_CREATOR, 3, DOUBLE_ARR_OFF); + } + + /** {@inheritDoc} */ + @Override public char[] readCharArray() { + return readArray(CHAR_ARR_CREATOR, 1, CHAR_ARR_OFF); + } + + /** {@inheritDoc} */ + @Override public boolean[] readBooleanArray() { + return readArray(BOOLEAN_ARR_CREATOR, 0, BOOLEAN_ARR_OFF); + } + + /** + * {@inheritDoc} + * <p> + * Returns {@code null} when the byte array read is {@code null}, which also happens while the array is not fully + * read yet; check {@code lastFinished()} to tell the cases apart. + * <p> + * NOTE(review): {@code new String(byte[])} decodes with the platform default charset, mirroring + * {@code writeString}; presumably both sides must share the same default charset - confirm. + */ + @Override public String readString() { + byte[] arr = readByteArray(); + + return arr != null ? new String(arr) : null; + } + + /** {@inheritDoc} */ + @Override public BitSet readBitSet() { + long[] arr = readLongArray(); + + return arr != null ? BitSet.valueOf(arr) : null; + } + + /** {@inheritDoc} */ + @Override public UUID readUuid() { + byte[] arr = readByteArray(); + + return arr != null ? U.bytesToUuid(arr, 0) : null; + } + + /** {@inheritDoc} */ + @Override public IgniteUuid readIgniteUuid() { + byte[] arr = readByteArray(); + + return arr != null ? 
U.bytesToIgniteUuid(arr, 0) : null; + } + + /** + * {@inheritDoc} + * <p> + * First reads the type byte ({@code Byte.MIN_VALUE} denotes a {@code null} message) and creates the message via + * the factory, then delegates to {@code msg.readFrom}. The partially read message is kept in {@code msg} between + * calls. Returns {@code null} both for a {@code null} message and when the message is not fully read yet; check + * {@code lastFinished()} to tell the cases apart. + */ + @SuppressWarnings("unchecked") + @Override public <T extends Message> T readMessage(MessageReader reader) { + if (!msgTypeDone) { + if (!buf.hasRemaining()) { + lastFinished = false; + + return null; + } + + byte type = readByte(); + + msg = type == Byte.MIN_VALUE ? null : msgFactory.create(type); + + msgTypeDone = true; + } + + if (msg != null) { + try { + reader.beforeInnerMessageRead(); + + reader.setCurrentReadClass(msg.getClass()); + + lastFinished = msg.readFrom(buf, reader); + } + finally { + reader.afterInnerMessageRead(lastFinished); + } + } + else + lastFinished = true; + + if (lastFinished) { + Message msg0 = msg; + + msgTypeDone = false; + msg = null; + + return (T)msg0; + } + else + return null; + } + + /** + * {@inheritDoc} + * <p> + * Reads the size, then the items. Progress is kept in {@code readSize}, {@code readItems} and {@code objArr}, so a + * call that returns {@code null} with {@code lastFinished == false} resumes when invoked again. A negative size + * yields a {@code null} result. + */ + @SuppressWarnings("unchecked") + @Override public <T> T[] readObjectArray(MessageCollectionItemType itemType, Class<T> itemCls, + MessageReader reader) { + if (readSize == -1) { + int size = readInt(); + + if (!lastFinished) + return null; + + readSize = size; + } + + if (readSize >= 0) { + if (objArr == null) + objArr = itemCls != null ? 
(Object[])Array.newInstance(itemCls, readSize) : new Object[readSize]; + + for (int i = readItems; i < readSize; i++) { + Object item = read(itemType, reader); + + if (!lastFinished) + return null; + + objArr[i] = item; + + readItems++; + } + } + + readSize = -1; + readItems = 0; + /* NOTE(review): resets cur to null rather than to the NULL sentinel that writeCollection compares against; presumably a stream instance is never used for both reading and writing - confirm. */ + cur = null; + + T[] objArr0 = (T[])objArr; + + objArr = null; + + return objArr0; + } + + /** + * {@inheritDoc} + * <p> + * Reads the size, then the items into an {@code ArrayList}. Progress is kept in {@code readSize}, + * {@code readItems} and {@code col}, so a call that returns {@code null} with {@code lastFinished == false} + * resumes when invoked again. A negative size yields a {@code null} result. + */ + @SuppressWarnings("unchecked") + @Override public <C extends Collection<?>> C readCollection(MessageCollectionItemType itemType, + MessageReader reader) { + if (readSize == -1) { + int size = readInt(); + + if (!lastFinished) + return null; + + readSize = size; + } + + if (readSize >= 0) { + if (col == null) + col = new ArrayList<>(readSize); + + for (int i = readItems; i < readSize; i++) { + Object item = read(itemType, reader); + + if (!lastFinished) + return null; + + col.add(item); + + readItems++; + } + } + + readSize = -1; + readItems = 0; + /* NOTE(review): same null-vs-NULL sentinel remark as in readObjectArray - confirm. */ + cur = null; + + C col0 = (C)col; + + col = null; + + return col0; + } + + /** + * {@inheritDoc} + * <p> + * Reads the size, then key/value pairs into a linked or plain hash map depending on {@code linked}. The key of a + * partially read entry is kept in {@code mapCur} (with {@code keyDone} set), so a call that returns {@code null} + * with {@code lastFinished == false} resumes when invoked again. A negative size yields a {@code null} result. + */ + @SuppressWarnings("unchecked") + @Override public <M extends Map<?, ?>> M readMap(MessageCollectionItemType keyType, + MessageCollectionItemType valType, boolean linked, MessageReader reader) { + if (readSize == -1) { + int size = readInt(); + + if (!lastFinished) + return null; + + readSize = size; + } + + if (readSize >= 0) { + if (map == null) + map = linked ? U.newLinkedHashMap(readSize) : U.newHashMap(readSize); + + for (int i = readItems; i < readSize; i++) { + if (!keyDone) { + Object key = read(keyType, reader); + + if (!lastFinished) + return null; + + mapCur = key; + keyDone = true; + } + + Object val = read(valType, reader); + + if (!lastFinished) + return null; + + map.put(mapCur, val); + + keyDone = false; + + readItems++; + } + } + + readSize = -1; + readItems = 0; + /* NOTE(review): resets mapCur to null rather than to the NULL sentinel that writeMap compares against - confirm this stream is never reused for writing. */ + mapCur = null; + + M map0 = (M)map; + + map = null; + + return map0; + } + + /** + * Writes a primitive array as a 4-byte length followed by its raw bytes, resuming a partially written array on + * subsequent calls. + * + * @param arr Array. + * @param off Offset. + * @param len Length. 
+ * @param bytes Length in bytes. + * @return {@code true} if the array was fully written; {@code false} if the buffer filled up first, in which case + * the progress is kept in {@code arrOff} and the next call with the same array continues from there. + */ + private boolean writeArray(Object arr, long off, int len, int bytes) { + assert arr != null; + assert arr.getClass().isArray() && arr.getClass().getComponentType().isPrimitive(); + assert off > 0; + assert len >= 0; + assert bytes >= 0; + assert bytes >= arrOff; + + /* arrOff == -1 means the length prefix has not been written yet. */ + if (arrOff == -1) { + if (buf.remaining() < 4) + return false; + + writeInt(len); + + arrOff = 0; + } + + int toWrite = bytes - arrOff; + int pos = buf.position(); + int remaining = buf.remaining(); + + if (toWrite <= remaining) { + /* The rest of the array fits: copy it and reset the progress marker. */ + UNSAFE.copyMemory(arr, off + arrOff, heapArr, baseOff + pos, toWrite); + + pos += toWrite; + + buf.position(pos); + + arrOff = -1; + + return true; + } + else { + /* Fill what is left of the buffer and remember how many bytes were written. */ + UNSAFE.copyMemory(arr, off + arrOff, heapArr, baseOff + pos, remaining); + + pos += remaining; + + buf.position(pos); + + arrOff += remaining; + + return false; + } + } + + /** + * Reads a primitive array written as a 4-byte length followed by raw bytes, resuming a partially read array on + * subsequent calls. + * + * @param creator Array creator. + * @param lenShift Array length shift size. + * @param off Base offset. + * @return Array, or {@code null} if a null array (length {@code -1}) was read or if the array was not fully read; in the latter case {@code lastFinished} is {@code false}. 
+ */ + @SuppressWarnings("unchecked") + private <T> T readArray(ArrayCreator<T> creator, int lenShift, long off) { + assert creator != null; + + /* tmpArr == null means no array read is in progress, so the length prefix comes first. */ + if (tmpArr == null) { + if (buf.remaining() < 4) { + lastFinished = false; + + return null; + } + + int len = readInt(); + + switch (len) { + case -1: + /* Length -1 denotes a null array. */ + lastFinished = true; + + return null; + + case 0: + lastFinished = true; + + return creator.create(0); + + default: + /* Byte size is the element count shifted left by lenShift. */ + tmpArr = creator.create(len); + tmpArrBytes = len << lenShift; + } + } + + int toRead = tmpArrBytes - tmpArrOff; + int remaining = buf.remaining(); + int pos = buf.position(); + + lastFinished = toRead <= remaining; + + if (lastFinished) { + /* The rest of the array is available: copy it and reset the progress state. */ + UNSAFE.copyMemory(heapArr, baseOff + pos, tmpArr, off + tmpArrOff, toRead); + + buf.position(pos + toRead); + + T arr = (T)tmpArr; + + tmpArr = null; + tmpArrBytes = 0; + tmpArrOff = 0; + + return arr; + } + else { + /* Consume everything left in the buffer and remember how many bytes were read. */ + UNSAFE.copyMemory(heapArr, baseOff + pos, tmpArr, off + tmpArrOff, remaining); + + buf.position(pos + remaining); + + tmpArrOff += remaining; + + return null; + } + } + + /** + * Writes a single array, collection or map item by dispatching on its declared item type. + * + * @param type Type. + * @param val Value. + * @param writer Writer. 
+ * @throws IllegalArgumentException If the item type is not one of the handled types. + */ + private void write(MessageCollectionItemType type, Object val, MessageWriter writer) { + /* Primitive item types are unboxed by the casts below, so a null value of such a type fails with a NullPointerException. */ + switch (type) { + case BYTE: + writeByte((Byte)val); + + break; + + case SHORT: + writeShort((Short)val); + + break; + + case INT: + writeInt((Integer)val); + + break; + + case LONG: + writeLong((Long)val); + + break; + + case FLOAT: + writeFloat((Float)val); + + break; + + case DOUBLE: + writeDouble((Double)val); + + break; + + case CHAR: + writeChar((Character)val); + + break; + + case BOOLEAN: + writeBoolean((Boolean)val); + + break; + + case BYTE_ARR: + writeByteArray((byte[])val); + + break; + + case SHORT_ARR: + writeShortArray((short[])val); + + break; + + case INT_ARR: + writeIntArray((int[])val); + + break; + + case LONG_ARR: + writeLongArray((long[])val); + + break; + + case FLOAT_ARR: + writeFloatArray((float[])val); + + break; + + case DOUBLE_ARR: + writeDoubleArray((double[])val); + + break; + + case CHAR_ARR: + writeCharArray((char[])val); + + break; + + case BOOLEAN_ARR: + writeBooleanArray((boolean[])val); + + break; + + case STRING: + writeString((String)val); + + break; + + case BIT_SET: + writeBitSet((BitSet)val); + + break; + + case UUID: + writeUuid((UUID)val); + + break; + + case IGNITE_UUID: + writeIgniteUuid((IgniteUuid)val); + + break; + + case MSG: + /* For a non-null message the before/after callbacks are invoked here in addition to the pair invoked inside writeMessage. */ + try { + if (val != null) + writer.beforeInnerMessageWrite(); + + writeMessage((Message)val, writer); + } + finally { + if (val != null) + writer.afterInnerMessageWrite(lastFinished); + } + + break; + + default: + throw new IllegalArgumentException("Unknown type: " + type); + } + } + + /** + * Reads a single array, collection or map item by dispatching on its declared item type. + * + * @param type Type. + * @param reader Reader. + * @return Value. 
+ * @throws IllegalArgumentException If the item type is not one of the handled types. + */ + private Object read(MessageCollectionItemType type, MessageReader reader) { + /* Primitive results are boxed on return; callers check lastFinished to see whether the value is complete. */ + switch (type) { + case BYTE: + return readByte(); + + case SHORT: + return readShort(); + + case INT: + return readInt(); + + case LONG: + return readLong(); + + case FLOAT: + return readFloat(); + + case DOUBLE: + return readDouble(); + + case CHAR: + return readChar(); + + case BOOLEAN: + return readBoolean(); + + case BYTE_ARR: + return readByteArray(); + + case SHORT_ARR: + return readShortArray(); + + case INT_ARR: + return readIntArray(); + + case LONG_ARR: + return readLongArray(); + + case FLOAT_ARR: + return readFloatArray(); + + case DOUBLE_ARR: + return readDoubleArray(); + + case CHAR_ARR: + return readCharArray(); + + case BOOLEAN_ARR: + return readBooleanArray(); + + case STRING: + return readString(); + + case BIT_SET: + return readBitSet(); + + case UUID: + return readUuid(); + + case IGNITE_UUID: + return readIgniteUuid(); + + case MSG: + return readMessage(reader); + + default: + throw new IllegalArgumentException("Unknown type: " + type); + } + } + + /** + * Creates a read-only iterator over an object array; {@code remove()} is not supported. + * + * @param arr Array. + * @return Array iterator. + */ + private Iterator<?> arrayIterator(final Object[] arr) { + return new Iterator<Object>() { + private int idx; + + @Override public boolean hasNext() { + return idx < arr.length; + } + + @Override public Object next() { + if (!hasNext()) + throw new NoSuchElementException(); + + return arr[idx++]; + } + + @Override public void remove() { + throw new UnsupportedOperationException(); + } + }; + } + + /** + * Array creator: allocates a primitive array of a requested length for {@code readArray}. + */ + private static interface ArrayCreator<T> { + /** + * @param len Array length; every creator defined in this class asserts that it is non-negative. + * @return New array. + */ + public T create(int len); + } +}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2de3b19b/modules/core/src/main/java/org/apache/ignite/internal/direct/stream/v2/DirectByteBufferStreamImplV2.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/direct/stream/v2/DirectByteBufferStreamImplV2.java b/modules/core/src/main/java/org/apache/ignite/internal/direct/stream/v2/DirectByteBufferStreamImplV2.java new file mode 100644 index 0000000..89c9cc6 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/direct/stream/v2/DirectByteBufferStreamImplV2.java @@ -0,0 +1,1583 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.direct.stream.v2; + +import java.lang.reflect.Array; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.BitSet; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.RandomAccess; +import java.util.UUID; +import org.apache.ignite.internal.direct.stream.DirectByteBufferStream; +import org.apache.ignite.internal.util.GridUnsafe; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteUuid; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType; +import org.apache.ignite.plugin.extensions.communication.MessageFactory; +import org.apache.ignite.plugin.extensions.communication.MessageReader; +import org.apache.ignite.plugin.extensions.communication.MessageWriter; +import sun.misc.Unsafe; +import sun.nio.ch.DirectBuffer; + +/** + * Direct marshalling I/O stream (version 2). 
+ */ +public class DirectByteBufferStreamImplV2 implements DirectByteBufferStream { + /** */ + private static final Unsafe UNSAFE = GridUnsafe.unsafe(); + + /** */ + private static final long BYTE_ARR_OFF = UNSAFE.arrayBaseOffset(byte[].class); + + /** */ + private static final long SHORT_ARR_OFF = UNSAFE.arrayBaseOffset(short[].class); + + /** */ + private static final long INT_ARR_OFF = UNSAFE.arrayBaseOffset(int[].class); + + /** */ + private static final long LONG_ARR_OFF = UNSAFE.arrayBaseOffset(long[].class); + + /** */ + private static final long FLOAT_ARR_OFF = UNSAFE.arrayBaseOffset(float[].class); + + /** */ + private static final long DOUBLE_ARR_OFF = UNSAFE.arrayBaseOffset(double[].class); + + /** */ + private static final long CHAR_ARR_OFF = UNSAFE.arrayBaseOffset(char[].class); + + /** */ + private static final long BOOLEAN_ARR_OFF = UNSAFE.arrayBaseOffset(boolean[].class); + + /** */ + private static final byte[] BYTE_ARR_EMPTY = new byte[0]; + + /** */ + private static final short[] SHORT_ARR_EMPTY = new short[0]; + + /** */ + private static final int[] INT_ARR_EMPTY = U.EMPTY_INTS; + + /** */ + private static final long[] LONG_ARR_EMPTY = U.EMPTY_LONGS; + + /** */ + private static final float[] FLOAT_ARR_EMPTY = new float[0]; + + /** */ + private static final double[] DOUBLE_ARR_EMPTY = new double[0]; + + /** */ + private static final char[] CHAR_ARR_EMPTY = new char[0]; + + /** */ + private static final boolean[] BOOLEAN_ARR_EMPTY = new boolean[0]; + + /** */ + private static final ArrayCreator<byte[]> BYTE_ARR_CREATOR = new ArrayCreator<byte[]>() { + @Override public byte[] create(int len) { + assert len >= 0; + + switch (len) { + case 0: + return BYTE_ARR_EMPTY; + + default: + return new byte[len]; + } + } + }; + + /** */ + private static final ArrayCreator<short[]> SHORT_ARR_CREATOR = new ArrayCreator<short[]>() { + @Override public short[] create(int len) { + assert len >= 0; + + switch (len) { + case 0: + return SHORT_ARR_EMPTY; + + default: 
+ return new short[len]; + } + } + }; + + /** */ + private static final ArrayCreator<int[]> INT_ARR_CREATOR = new ArrayCreator<int[]>() { + @Override public int[] create(int len) { + assert len >= 0; + + switch (len) { + case 0: + return INT_ARR_EMPTY; + + default: + return new int[len]; + } + } + }; + + /** */ + private static final ArrayCreator<long[]> LONG_ARR_CREATOR = new ArrayCreator<long[]>() { + @Override public long[] create(int len) { + assert len >= 0; + + switch (len) { + case 0: + return LONG_ARR_EMPTY; + + default: + return new long[len]; + } + } + }; + + /** */ + private static final ArrayCreator<float[]> FLOAT_ARR_CREATOR = new ArrayCreator<float[]>() { + @Override public float[] create(int len) { + assert len >= 0; + + switch (len) { + case 0: + return FLOAT_ARR_EMPTY; + + default: + return new float[len]; + } + } + }; + + /** */ + private static final ArrayCreator<double[]> DOUBLE_ARR_CREATOR = new ArrayCreator<double[]>() { + @Override public double[] create(int len) { + assert len >= 0; + + switch (len) { + case 0: + return DOUBLE_ARR_EMPTY; + + default: + return new double[len]; + } + } + }; + + /** */ + private static final ArrayCreator<char[]> CHAR_ARR_CREATOR = new ArrayCreator<char[]>() { + @Override public char[] create(int len) { + assert len >= 0; + + switch (len) { + case 0: + return CHAR_ARR_EMPTY; + + default: + return new char[len]; + } + } + }; + + /** */ + private static final ArrayCreator<boolean[]> BOOLEAN_ARR_CREATOR = new ArrayCreator<boolean[]>() { + @Override public boolean[] create(int len) { + assert len >= 0; + + switch (len) { + case 0: + return BOOLEAN_ARR_EMPTY; + + default: + return new boolean[len]; + } + } + }; + + /** */ + private static final Object NULL = new Object(); + + /** */ + private final MessageFactory msgFactory; + + /** */ + private ByteBuffer buf; + + /** */ + private byte[] heapArr; + + /** */ + private long baseOff; + + /** */ + private int arrOff = -1; + + /** */ + private Object tmpArr; + + /** */ + 
private int tmpArrOff; + + /** */ + private int tmpArrBytes; + + /** */ + private boolean msgTypeDone; + + /** */ + private Message msg; + + /** */ + private Iterator<?> mapIt; + + /** */ + private Iterator<?> it; + + /** */ + private int arrPos = -1; + + /** */ + private Object arrCur = NULL; + + /** */ + private Object mapCur = NULL; + + /** */ + private Object cur = NULL; + + /** */ + private boolean keyDone; + + /** */ + private int readSize = -1; + + /** */ + private int readItems; + + /** */ + private Object[] objArr; + + /** */ + private Collection<Object> col; + + /** */ + private Map<Object, Object> map; + + /** */ + private long prim; + + /** */ + private int primShift; + + /** */ + private int uuidState; + + /** */ + private long uuidMost; + + /** */ + private long uuidLeast; + + /** */ + private long uuidLocId; + + /** */ + private boolean lastFinished; + + /** + * @param msgFactory Message factory. + */ + public DirectByteBufferStreamImplV2(MessageFactory msgFactory) { + this.msgFactory = msgFactory; + } + + /** {@inheritDoc} */ + @Override public void setBuffer(ByteBuffer buf) { + assert buf != null; + + if (this.buf != buf) { + this.buf = buf; + + heapArr = buf.isDirect() ? null : buf.array(); + baseOff = buf.isDirect() ? 
((DirectBuffer)buf).address() : BYTE_ARR_OFF; + } + } + + /** {@inheritDoc} */ + @Override public int remaining() { + return buf.remaining(); + } + + /** {@inheritDoc} */ + @Override public boolean lastFinished() { + return lastFinished; + } + + /** {@inheritDoc} */ + @Override public void writeByte(byte val) { + lastFinished = buf.remaining() >= 1; + + if (lastFinished) { + int pos = buf.position(); + + UNSAFE.putByte(heapArr, baseOff + pos, val); + + buf.position(pos + 1); + } + } + + /** {@inheritDoc} */ + @Override public void writeShort(short val) { + lastFinished = buf.remaining() >= 2; + + if (lastFinished) { + int pos = buf.position(); + + UNSAFE.putShort(heapArr, baseOff + pos, val); + + buf.position(pos + 2); + } + } + + /** {@inheritDoc} */ + @Override public void writeInt(int val) { + lastFinished = buf.remaining() >= 5; + + if (lastFinished) { + if (val == Integer.MAX_VALUE) + val = Integer.MIN_VALUE; + else + val++; + + int pos = buf.position(); + + while ((val & 0xFFFF_FF80) != 0) { + byte b = (byte)(val | 0x80); + + UNSAFE.putByte(heapArr, baseOff + pos++, b); + + val >>>= 7; + } + + UNSAFE.putByte(heapArr, baseOff + pos++, (byte)val); + + buf.position(pos); + } + } + + /** {@inheritDoc} */ + @Override public void writeLong(long val) { + lastFinished = buf.remaining() >= 10; + + if (lastFinished) { + if (val == Long.MAX_VALUE) + val = Long.MIN_VALUE; + else + val++; + + int pos = buf.position(); + + while ((val & 0xFFFF_FFFF_FFFF_FF80L) != 0) { + byte b = (byte)(val | 0x80); + + UNSAFE.putByte(heapArr, baseOff + pos++, b); + + val >>>= 7; + } + + UNSAFE.putByte(heapArr, baseOff + pos++, (byte)val); + + buf.position(pos); + } + } + + /** {@inheritDoc} */ + @Override public void writeFloat(float val) { + lastFinished = buf.remaining() >= 4; + + if (lastFinished) { + int pos = buf.position(); + + UNSAFE.putFloat(heapArr, baseOff + pos, val); + + buf.position(pos + 4); + } + } + + /** {@inheritDoc} */ + @Override public void writeDouble(double val) { + 
lastFinished = buf.remaining() >= 8; + + if (lastFinished) { + int pos = buf.position(); + + UNSAFE.putDouble(heapArr, baseOff + pos, val); + + buf.position(pos + 8); + } + } + + /** {@inheritDoc} */ + @Override public void writeChar(char val) { + lastFinished = buf.remaining() >= 2; + + if (lastFinished) { + int pos = buf.position(); + + UNSAFE.putChar(heapArr, baseOff + pos, val); + + buf.position(pos + 2); + } + } + + /** {@inheritDoc} */ + @Override public void writeBoolean(boolean val) { + lastFinished = buf.remaining() >= 1; + + if (lastFinished) { + int pos = buf.position(); + + UNSAFE.putBoolean(heapArr, baseOff + pos, val); + + buf.position(pos + 1); + } + } + + /** {@inheritDoc} */ + @Override public void writeByteArray(byte[] val) { + if (val != null) + lastFinished = writeArray(val, BYTE_ARR_OFF, val.length, val.length); + else + writeInt(-1); + } + + /** {@inheritDoc} */ + @Override public void writeByteArray(byte[] val, long off, int len) { + if (val != null) + lastFinished = writeArray(val, BYTE_ARR_OFF + off, len, len); + else + writeInt(-1); + } + + /** {@inheritDoc} */ + @Override public void writeShortArray(short[] val) { + if (val != null) + lastFinished = writeArray(val, SHORT_ARR_OFF, val.length, val.length << 1); + else + writeInt(-1); + } + + /** {@inheritDoc} */ + @Override public void writeIntArray(int[] val) { + if (val != null) + lastFinished = writeArray(val, INT_ARR_OFF, val.length, val.length << 2); + else + writeInt(-1); + } + + /** {@inheritDoc} */ + @Override public void writeLongArray(long[] val) { + if (val != null) + lastFinished = writeArray(val, LONG_ARR_OFF, val.length, val.length << 3); + else + writeInt(-1); + } + + /** {@inheritDoc} */ + @Override public void writeFloatArray(float[] val) { + if (val != null) + lastFinished = writeArray(val, FLOAT_ARR_OFF, val.length, val.length << 2); + else + writeInt(-1); + } + + /** {@inheritDoc} */ + @Override public void writeDoubleArray(double[] val) { + if (val != null) + 
lastFinished = writeArray(val, DOUBLE_ARR_OFF, val.length, val.length << 3); + else + writeInt(-1); + } + + /** {@inheritDoc} */ + @Override public void writeCharArray(char[] val) { + if (val != null) + lastFinished = writeArray(val, CHAR_ARR_OFF, val.length, val.length << 1); + else + writeInt(-1); + } + + /** {@inheritDoc} */ + @Override public void writeBooleanArray(boolean[] val) { + if (val != null) + lastFinished = writeArray(val, BOOLEAN_ARR_OFF, val.length, val.length); + else + writeInt(-1); + } + + /** {@inheritDoc} */ + @Override public void writeString(String val) { + writeByteArray(val != null ? val.getBytes() : null); + } + + /** {@inheritDoc} */ + @Override public void writeBitSet(BitSet val) { + writeLongArray(val != null ? val.toLongArray() : null); + } + + /** {@inheritDoc} */ + @Override public void writeUuid(UUID val) { + switch (uuidState) { + case 0: + writeBoolean(val == null); + + if (!lastFinished || val == null) + return; + + uuidState++; + + case 1: + writeLong(val.getMostSignificantBits()); + + if (!lastFinished) + return; + + uuidState++; + + case 2: + writeLong(val.getLeastSignificantBits()); + + if (!lastFinished) + return; + + uuidState = 0; + } + } + + /** {@inheritDoc} */ + @Override public void writeIgniteUuid(IgniteUuid val) { + switch (uuidState) { + case 0: + writeBoolean(val == null); + + if (!lastFinished || val == null) + return; + + uuidState++; + + case 1: + writeLong(val.globalId().getMostSignificantBits()); + + if (!lastFinished) + return; + + uuidState++; + + case 2: + writeLong(val.globalId().getLeastSignificantBits()); + + if (!lastFinished) + return; + + uuidState++; + + case 3: + writeLong(val.localId()); + + if (!lastFinished) + return; + + uuidState = 0; + } + } + + /** {@inheritDoc} */ + @Override public void writeMessage(Message msg, MessageWriter writer) { + if (msg != null) { + if (buf.hasRemaining()) { + try { + writer.beforeInnerMessageWrite(); + + lastFinished = msg.writeTo(buf, writer); + } + finally { + 
writer.afterInnerMessageWrite(lastFinished); + } + } + else + lastFinished = false; + } + else + writeByte(Byte.MIN_VALUE); + } + + /** {@inheritDoc} */ + @Override public <T> void writeObjectArray(T[] arr, MessageCollectionItemType itemType, + MessageWriter writer) { + if (arr != null) { + int len = arr.length; + + if (arrPos == -1) { + writeInt(len); + + if (!lastFinished) + return; + + arrPos = 0; + } + + while (arrPos < len || arrCur != NULL) { + if (arrCur == NULL) + arrCur = arr[arrPos++]; + + write(itemType, arrCur, writer); + + if (!lastFinished) + return; + + arrCur = NULL; + } + + arrPos = -1; + } + else + writeInt(-1); + } + + /** {@inheritDoc} */ + @Override public <T> void writeCollection(Collection<T> col, MessageCollectionItemType itemType, + MessageWriter writer) { + if (col != null) { + if (col instanceof List && col instanceof RandomAccess) + writeRandomAccessList((List<T>)col, itemType, writer); + else { + if (it == null) { + writeInt(col.size()); + + if (!lastFinished) + return; + + it = col.iterator(); + } + + while (it.hasNext() || cur != NULL) { + if (cur == NULL) + cur = it.next(); + + write(itemType, cur, writer); + + if (!lastFinished) + return; + + cur = NULL; + } + + it = null; + } + } + else + writeInt(-1); + } + + /** + * @param list List. + * @param itemType Component type. + * @param writer Writer. 
+ */ + private <T> void writeRandomAccessList(List<T> list, MessageCollectionItemType itemType, MessageWriter writer) { + assert list instanceof RandomAccess; + + int size = list.size(); + + if (arrPos == -1) { + writeInt(size); + + if (!lastFinished) + return; + + arrPos = 0; + } + + while (arrPos < size || arrCur != NULL) { + if (arrCur == NULL) + arrCur = list.get(arrPos++); + + write(itemType, arrCur, writer); + + if (!lastFinished) + return; + + arrCur = NULL; + } + + arrPos = -1; + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public <K, V> void writeMap(Map<K, V> map, MessageCollectionItemType keyType, + MessageCollectionItemType valType, MessageWriter writer) { + if (map != null) { + if (mapIt == null) { + writeInt(map.size()); + + if (!lastFinished) + return; + + mapIt = map.entrySet().iterator(); + } + + while (mapIt.hasNext() || mapCur != NULL) { + Map.Entry<K, V> e; + + if (mapCur == NULL) + mapCur = mapIt.next(); + + e = (Map.Entry<K, V>)mapCur; + + if (!keyDone) { + write(keyType, e.getKey(), writer); + + if (!lastFinished) + return; + + keyDone = true; + } + + write(valType, e.getValue(), writer); + + if (!lastFinished) + return; + + mapCur = NULL; + keyDone = false; + } + + mapIt = null; + } + else + writeInt(-1); + } + + /** {@inheritDoc} */ + @Override public byte readByte() { + lastFinished = buf.remaining() >= 1; + + if (lastFinished) { + int pos = buf.position(); + + buf.position(pos + 1); + + return UNSAFE.getByte(heapArr, baseOff + pos); + } + else + return 0; + } + + /** {@inheritDoc} */ + @Override public short readShort() { + lastFinished = buf.remaining() >= 2; + + if (lastFinished) { + int pos = buf.position(); + + buf.position(pos + 2); + + return UNSAFE.getShort(heapArr, baseOff + pos); + } + else + return 0; + } + + /** {@inheritDoc} */ + @Override public int readInt() { + lastFinished = false; + + int val = 0; + + while (buf.hasRemaining()) { + int pos = buf.position(); + + byte b = UNSAFE.getByte(heapArr, 
baseOff + pos); + + buf.position(pos + 1); + + prim |= ((long)b & 0x7F) << (7 * primShift); + + if ((b & 0x80) == 0) { + lastFinished = true; + + val = (int)prim; + + if (val == Integer.MIN_VALUE) + val = Integer.MAX_VALUE; + else + val--; + + prim = 0; + primShift = 0; + + break; + } + else + primShift++; + } + + return val; + } + + /** {@inheritDoc} */ + @Override public long readLong() { + lastFinished = false; + + long val = 0; + + while (buf.hasRemaining()) { + int pos = buf.position(); + + byte b = UNSAFE.getByte(heapArr, baseOff + pos); + + buf.position(pos + 1); + + prim |= ((long)b & 0x7F) << (7 * primShift); + + if ((b & 0x80) == 0) { + lastFinished = true; + + val = prim; + + if (val == Long.MIN_VALUE) + val = Long.MAX_VALUE; + else + val--; + + prim = 0; + primShift = 0; + + break; + } + else + primShift++; + } + + return val; + } + + /** {@inheritDoc} */ + @Override public float readFloat() { + lastFinished = buf.remaining() >= 4; + + if (lastFinished) { + int pos = buf.position(); + + buf.position(pos + 4); + + return UNSAFE.getFloat(heapArr, baseOff + pos); + } + else + return 0; + } + + /** {@inheritDoc} */ + @Override public double readDouble() { + lastFinished = buf.remaining() >= 8; + + if (lastFinished) { + int pos = buf.position(); + + buf.position(pos + 8); + + return UNSAFE.getDouble(heapArr, baseOff + pos); + } + else + return 0; + } + + /** {@inheritDoc} */ + @Override public char readChar() { + lastFinished = buf.remaining() >= 2; + + if (lastFinished) { + int pos = buf.position(); + + buf.position(pos + 2); + + return UNSAFE.getChar(heapArr, baseOff + pos); + } + else + return 0; + } + + /** {@inheritDoc} */ + @Override public boolean readBoolean() { + lastFinished = buf.hasRemaining(); + + if (lastFinished) { + int pos = buf.position(); + + buf.position(pos + 1); + + return UNSAFE.getBoolean(heapArr, baseOff + pos); + } + else + return false; + } + + /** {@inheritDoc} */ + @Override public byte[] readByteArray() { + return 
readArray(BYTE_ARR_CREATOR, 0, BYTE_ARR_OFF); + } + + /** {@inheritDoc} */ + @Override public short[] readShortArray() { + return readArray(SHORT_ARR_CREATOR, 1, SHORT_ARR_OFF); + } + + /** {@inheritDoc} */ + @Override public int[] readIntArray() { + return readArray(INT_ARR_CREATOR, 2, INT_ARR_OFF); + } + + /** {@inheritDoc} */ + @Override public long[] readLongArray() { + return readArray(LONG_ARR_CREATOR, 3, LONG_ARR_OFF); + } + + /** {@inheritDoc} */ + @Override public float[] readFloatArray() { + return readArray(FLOAT_ARR_CREATOR, 2, FLOAT_ARR_OFF); + } + + /** {@inheritDoc} */ + @Override public double[] readDoubleArray() { + return readArray(DOUBLE_ARR_CREATOR, 3, DOUBLE_ARR_OFF); + } + + /** {@inheritDoc} */ + @Override public char[] readCharArray() { + return readArray(CHAR_ARR_CREATOR, 1, CHAR_ARR_OFF); + } + + /** {@inheritDoc} */ + @Override public boolean[] readBooleanArray() { + return readArray(BOOLEAN_ARR_CREATOR, 0, BOOLEAN_ARR_OFF); + } + + /** {@inheritDoc} */ + @Override public String readString() { + byte[] arr = readByteArray(); + + return arr != null ? new String(arr) : null; + } + + /** {@inheritDoc} */ + @Override public BitSet readBitSet() { + long[] arr = readLongArray(); + + return arr != null ? 
BitSet.valueOf(arr) : null; + } + + /** {@inheritDoc} */ + @Override public UUID readUuid() { + switch (uuidState) { + case 0: + boolean isNull = readBoolean(); + + if (!lastFinished || isNull) + return null; + + uuidState++; + + case 1: + uuidMost = readLong(); + + if (!lastFinished) + return null; + + uuidState++; + + case 2: + uuidLeast = readLong(); + + if (!lastFinished) + return null; + + uuidState = 0; + } + + UUID val = new UUID(uuidMost, uuidLeast); + + uuidMost = 0; + uuidLeast = 0; + + return val; + } + + /** {@inheritDoc} */ + @Override public IgniteUuid readIgniteUuid() { + switch (uuidState) { + case 0: + boolean isNull = readBoolean(); + + if (!lastFinished || isNull) + return null; + + uuidState++; + + case 1: + uuidMost = readLong(); + + if (!lastFinished) + return null; + + uuidState++; + + case 2: + uuidLeast = readLong(); + + if (!lastFinished) + return null; + + uuidState++; + + case 3: + uuidLocId = readLong(); + + if (!lastFinished) + return null; + + uuidState = 0; + } + + IgniteUuid val = new IgniteUuid(new UUID(uuidMost, uuidLeast), uuidLocId); + + uuidMost = 0; + uuidLeast = 0; + uuidLocId = 0; + + return val; + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public <T extends Message> T readMessage(MessageReader reader) { + if (!msgTypeDone) { + if (!buf.hasRemaining()) { + lastFinished = false; + + return null; + } + + byte type = readByte(); + + msg = type == Byte.MIN_VALUE ? 
null : msgFactory.create(type); + + msgTypeDone = true; + } + + if (msg != null) { + try { + reader.beforeInnerMessageRead(); + + reader.setCurrentReadClass(msg.getClass()); + + lastFinished = msg.readFrom(buf, reader); + } + finally { + reader.afterInnerMessageRead(lastFinished); + } + } + else + lastFinished = true; + + if (lastFinished) { + Message msg0 = msg; + + msgTypeDone = false; + msg = null; + + return (T)msg0; + } + else + return null; + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public <T> T[] readObjectArray(MessageCollectionItemType itemType, Class<T> itemCls, + MessageReader reader) { + if (readSize == -1) { + int size = readInt(); + + if (!lastFinished) + return null; + + readSize = size; + } + + if (readSize >= 0) { + if (objArr == null) + objArr = itemCls != null ? (Object[])Array.newInstance(itemCls, readSize) : new Object[readSize]; + + for (int i = readItems; i < readSize; i++) { + Object item = read(itemType, reader); + + if (!lastFinished) + return null; + + objArr[i] = item; + + readItems++; + } + } + + readSize = -1; + readItems = 0; + cur = null; + + T[] objArr0 = (T[])objArr; + + objArr = null; + + return objArr0; + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public <C extends Collection<?>> C readCollection(MessageCollectionItemType itemType, + MessageReader reader) { + if (readSize == -1) { + int size = readInt(); + + if (!lastFinished) + return null; + + readSize = size; + } + + if (readSize >= 0) { + if (col == null) + col = new ArrayList<>(readSize); + + for (int i = readItems; i < readSize; i++) { + Object item = read(itemType, reader); + + if (!lastFinished) + return null; + + col.add(item); + + readItems++; + } + } + + readSize = -1; + readItems = 0; + cur = null; + + C col0 = (C)col; + + col = null; + + return col0; + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public <M extends Map<?, ?>> M readMap(MessageCollectionItemType keyType, + 
MessageCollectionItemType valType, boolean linked, MessageReader reader) { + if (readSize == -1) { + int size = readInt(); + + if (!lastFinished) + return null; + + readSize = size; + } + + if (readSize >= 0) { + if (map == null) + map = linked ? U.newLinkedHashMap(readSize) : U.newHashMap(readSize); + + for (int i = readItems; i < readSize; i++) { + if (!keyDone) { + Object key = read(keyType, reader); + + if (!lastFinished) + return null; + + mapCur = key; + keyDone = true; + } + + Object val = read(valType, reader); + + if (!lastFinished) + return null; + + map.put(mapCur, val); + + keyDone = false; + + readItems++; + } + } + + readSize = -1; + readItems = 0; + mapCur = null; + + M map0 = (M)map; + + map = null; + + return map0; + } + + /** + * @param arr Array. + * @param off Offset. + * @param len Length. + * @param bytes Length in bytes. + * @return Whether array was fully written. + */ + private boolean writeArray(Object arr, long off, int len, int bytes) { + assert arr != null; + assert arr.getClass().isArray() && arr.getClass().getComponentType().isPrimitive(); + assert off > 0; + assert len >= 0; + assert bytes >= 0; + assert bytes >= arrOff; + + if (arrOff == -1) { + writeInt(len); + + if (!lastFinished) + return false; + + arrOff = 0; + } + + int toWrite = bytes - arrOff; + int pos = buf.position(); + int remaining = buf.remaining(); + + if (toWrite <= remaining) { + if (toWrite > 0) { + UNSAFE.copyMemory(arr, off + arrOff, heapArr, baseOff + pos, toWrite); + + buf.position(pos + toWrite); + } + + arrOff = -1; + + return true; + } + else { + if (remaining > 0) { + UNSAFE.copyMemory(arr, off + arrOff, heapArr, baseOff + pos, remaining); + + buf.position(pos + remaining); + + arrOff += remaining; + } + + return false; + } + } + + /** + * @param creator Array creator. + * @param lenShift Array length shift size. + * @param off Base offset. + * @return Array or special value if it was not fully read. 
+ */ + @SuppressWarnings("unchecked") + private <T> T readArray(ArrayCreator<T> creator, int lenShift, long off) { + assert creator != null; + + if (tmpArr == null) { + int len = readInt(); + + if (!lastFinished) + return null; + + switch (len) { + case -1: + lastFinished = true; + + return null; + + case 0: + lastFinished = true; + + return creator.create(0); + + default: + tmpArr = creator.create(len); + tmpArrBytes = len << lenShift; + } + } + + int toRead = tmpArrBytes - tmpArrOff; + int remaining = buf.remaining(); + int pos = buf.position(); + + lastFinished = toRead <= remaining; + + if (lastFinished) { + UNSAFE.copyMemory(heapArr, baseOff + pos, tmpArr, off + tmpArrOff, toRead); + + buf.position(pos + toRead); + + T arr = (T)tmpArr; + + tmpArr = null; + tmpArrBytes = 0; + tmpArrOff = 0; + + return arr; + } + else { + UNSAFE.copyMemory(heapArr, baseOff + pos, tmpArr, off + tmpArrOff, remaining); + + buf.position(pos + remaining); + + tmpArrOff += remaining; + + return null; + } + } + + /** + * @param type Type. + * @param val Value. + * @param writer Writer. 
+ */ + private void write(MessageCollectionItemType type, Object val, MessageWriter writer) { + switch (type) { + case BYTE: + writeByte((Byte)val); + + break; + + case SHORT: + writeShort((Short)val); + + break; + + case INT: + writeInt((Integer)val); + + break; + + case LONG: + writeLong((Long)val); + + break; + + case FLOAT: + writeFloat((Float)val); + + break; + + case DOUBLE: + writeDouble((Double)val); + + break; + + case CHAR: + writeChar((Character)val); + + break; + + case BOOLEAN: + writeBoolean((Boolean)val); + + break; + + case BYTE_ARR: + writeByteArray((byte[])val); + + break; + + case SHORT_ARR: + writeShortArray((short[])val); + + break; + + case INT_ARR: + writeIntArray((int[])val); + + break; + + case LONG_ARR: + writeLongArray((long[])val); + + break; + + case FLOAT_ARR: + writeFloatArray((float[])val); + + break; + + case DOUBLE_ARR: + writeDoubleArray((double[])val); + + break; + + case CHAR_ARR: + writeCharArray((char[])val); + + break; + + case BOOLEAN_ARR: + writeBooleanArray((boolean[])val); + + break; + + case STRING: + writeString((String)val); + + break; + + case BIT_SET: + writeBitSet((BitSet)val); + + break; + + case UUID: + writeUuid((UUID)val); + + break; + + case IGNITE_UUID: + writeIgniteUuid((IgniteUuid)val); + + break; + + case MSG: + try { + if (val != null) + writer.beforeInnerMessageWrite(); + + writeMessage((Message)val, writer); + } + finally { + if (val != null) + writer.afterInnerMessageWrite(lastFinished); + } + + break; + + default: + throw new IllegalArgumentException("Unknown type: " + type); + } + } + + /** + * @param type Type. + * @param reader Reader. + * @return Value. 
+ */ + private Object read(MessageCollectionItemType type, MessageReader reader) { + switch (type) { + case BYTE: + return readByte(); + + case SHORT: + return readShort(); + + case INT: + return readInt(); + + case LONG: + return readLong(); + + case FLOAT: + return readFloat(); + + case DOUBLE: + return readDouble(); + + case CHAR: + return readChar(); + + case BOOLEAN: + return readBoolean(); + + case BYTE_ARR: + return readByteArray(); + + case SHORT_ARR: + return readShortArray(); + + case INT_ARR: + return readIntArray(); + + case LONG_ARR: + return readLongArray(); + + case FLOAT_ARR: + return readFloatArray(); + + case DOUBLE_ARR: + return readDoubleArray(); + + case CHAR_ARR: + return readCharArray(); + + case BOOLEAN_ARR: + return readBooleanArray(); + + case STRING: + return readString(); + + case BIT_SET: + return readBitSet(); + + case UUID: + return readUuid(); + + case IGNITE_UUID: + return readIgniteUuid(); + + case MSG: + return readMessage(reader); + + default: + throw new IllegalArgumentException("Unknown type: " + type); + } + } + + /** + * Array creator. + */ + private static interface ArrayCreator<T> { + /** + * @param len Array length or {@code -1} if array was not fully read. + * @return New array. 
+ */ + public T create(int len); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/2de3b19b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java index b8af8da..ea82d7f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java @@ -17,6 +17,26 @@ package org.apache.ignite.internal.managers.communication; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Queue; +import java.util.UUID; +import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.cluster.ClusterNode; @@ -64,27 +84,6 @@ import org.jetbrains.annotations.Nullable; import org.jsr166.ConcurrentHashMap8; import org.jsr166.ConcurrentLinkedDeque8; -import java.io.Serializable; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.List; -import 
java.util.Map; -import java.util.Map.Entry; -import java.util.Queue; -import java.util.UUID; -import java.util.concurrent.ConcurrentLinkedDeque; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.Executor; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; - import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; import static org.apache.ignite.events.EventType.EVT_NODE_JOINED; import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; @@ -113,6 +112,12 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa /** Max closed topics to store. */ public static final int MAX_CLOSED_TOPICS = 10240; + /** Direct protocol version attribute name. */ + public static final String DIRECT_PROTO_VER_ATTR = "comm.direct.proto.ver"; + + /** Direct protocol version. */ + public static final byte DIRECT_PROTO_VER = 2; + /** Listeners by topic. 
*/ private final ConcurrentMap<Object, GridMessageListener> lsnrMap = new ConcurrentHashMap8<>(); @@ -266,6 +271,8 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa } }); + ctx.addNodeAttribute(DIRECT_PROTO_VER_ATTR, DIRECT_PROTO_VER); + MessageFormatter[] formatterExt = ctx.plugins().extensions(MessageFormatter.class); if (formatterExt != null && formatterExt.length > 0) { @@ -277,12 +284,17 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa } else { formatter = new MessageFormatter() { - @Override public MessageWriter writer() { - return new DirectMessageWriter(); + @Override public MessageWriter writer(UUID rmtNodeId) throws IgniteCheckedException { + assert rmtNodeId != null; + + return new DirectMessageWriter(U.directProtocolVersion(ctx, rmtNodeId)); } - @Override public MessageReader reader(MessageFactory factory, Class<? extends Message> msgCls) { - return new DirectMessageReader(msgFactory, this); + @Override public MessageReader reader(UUID rmtNodeId, MessageFactory msgFactory) + throws IgniteCheckedException { + assert rmtNodeId != null; + + return new DirectMessageReader(msgFactory, U.directProtocolVersion(ctx, rmtNodeId)); } }; } @@ -2432,4 +2444,4 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa return S.toString(DelayedMessage.class, this, super.toString()); } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/2de3b19b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java index 3c1913a..ced0c2a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java +++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java @@ -173,6 +173,7 @@ import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.compute.ComputeTaskCancelledCheckedException; import org.apache.ignite.internal.compute.ComputeTaskTimeoutCheckedException; import org.apache.ignite.internal.events.DiscoveryCustomEvent; +import org.apache.ignite.internal.managers.communication.GridIoManager; import org.apache.ignite.internal.managers.deployment.GridDeploymentInfo; import org.apache.ignite.internal.mxbean.IgniteStandardMXBean; import org.apache.ignite.internal.processors.cache.GridCacheAttributes; @@ -9302,4 +9303,34 @@ public abstract class IgniteUtils { throw new IgniteInterruptedCheckedException(e); } } + + /** + * Defines which protocol version to use for + * communication with the provided node. + * + * @param ctx Context. + * @param nodeId Node ID. + * @return Protocol version. + * @throws IgniteCheckedException If node doesn't exist. + */ + public static byte directProtocolVersion(GridKernalContext ctx, UUID nodeId) throws IgniteCheckedException { + assert nodeId != null; + + ClusterNode node = ctx.discovery().node(nodeId); + + if (node == null) + throw new IgniteCheckedException("Failed to define communication protocol version " + + "(has node left topology?): " + nodeId); + + assert !node.isLocal(); + + Byte attr = node.attribute(GridIoManager.DIRECT_PROTO_VER_ATTR); + + byte rmtProtoVer = attr != null ? 
attr : 1; + + if (rmtProtoVer < GridIoManager.DIRECT_PROTO_VER) + return rmtProtoVer; + else + return GridIoManager.DIRECT_PROTO_VER; + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/2de3b19b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcToNioAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcToNioAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcToNioAdapter.java index e732a79..6820dc7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcToNioAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcToNioAdapter.java @@ -30,13 +30,13 @@ import org.apache.ignite.internal.util.nio.GridNioFilterAdapter; import org.apache.ignite.internal.util.nio.GridNioFilterChain; import org.apache.ignite.internal.util.nio.GridNioFinishedFuture; import org.apache.ignite.internal.util.nio.GridNioFuture; +import org.apache.ignite.internal.util.nio.GridNioMessageWriterFactory; import org.apache.ignite.internal.util.nio.GridNioMetricsListener; import org.apache.ignite.internal.util.nio.GridNioServerListener; import org.apache.ignite.internal.util.nio.GridNioSession; import org.apache.ignite.internal.util.nio.GridNioSessionImpl; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.plugin.extensions.communication.Message; -import org.apache.ignite.plugin.extensions.communication.MessageFormatter; /** * Allows to re-use existing {@link GridNioFilter}s on IPC (specifically shared memory IPC) @@ -65,23 +65,23 @@ public class IpcToNioAdapter<T> { private final GridNioMetricsListener metricsLsnr; /** */ - private final MessageFormatter formatter; + private final GridNioMessageWriterFactory writerFactory; /** * @param metricsLsnr Metrics listener. * @param log Log. * @param endp Endpoint. * @param lsnr Listener. 
- * @param formatter Message formatter. + * @param writerFactory Writer factory. * @param filters Filters. */ public IpcToNioAdapter(GridNioMetricsListener metricsLsnr, IgniteLogger log, IpcEndpoint endp, - GridNioServerListener<T> lsnr, MessageFormatter formatter, GridNioFilter... filters) { + GridNioServerListener<T> lsnr, GridNioMessageWriterFactory writerFactory, GridNioFilter... filters) { assert metricsLsnr != null; this.metricsLsnr = metricsLsnr; this.endp = endp; - this.formatter = formatter; + this.writerFactory = writerFactory; chain = new GridNioFilterChain<>(log, lsnr, new HeadFilter(), filters); ses = new GridNioSessionImpl(chain, null, null, true); @@ -163,7 +163,7 @@ public class IpcToNioAdapter<T> { assert writeBuf.hasArray(); try { - int cnt = U.writeMessageFully(msg, endp.outputStream(), writeBuf, formatter.writer()); + int cnt = U.writeMessageFully(msg, endp.outputStream(), writeBuf, writerFactory.writer(ses)); metricsLsnr.onBytesSent(cnt); } @@ -255,4 +255,4 @@ public class IpcToNioAdapter<T> { proceedSessionWriteTimeout(ses); } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/2de3b19b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java index a933916..0de54e9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java @@ -94,7 +94,7 @@ public interface GridCommunicationClient { public void sendMessage(byte[] data, int len) throws IgniteCheckedException; /** - * @param nodeId Node ID (provided only if versions of local and remote nodes are different). 
+ * @param nodeId Remote node ID. Provided only for sync clients. * @param msg Message to send. * @param closure Ack closure. * @throws IgniteCheckedException If failed. @@ -107,4 +107,4 @@ public interface GridCommunicationClient { * @return {@code True} if send is asynchronous. */ public boolean async(); -} \ No newline at end of file +}