http://git-wip-us.apache.org/repos/asf/ignite/blob/1dbf20e0/modules/core/src/main/java/org/apache/ignite/internal/binary/builder/PortableLazySet.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/builder/PortableLazySet.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/builder/PortableLazySet.java new file mode 100644 index 0000000..3548f1f --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/builder/PortableLazySet.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */

package org.apache.ignite.internal.binary.builder;

import java.util.Collection;
import java.util.Set;
import org.apache.ignite.internal.binary.BinaryWriterExImpl;
import org.apache.ignite.internal.binary.GridPortableMarshaller;
import org.apache.ignite.internal.binary.GridPortableMarshaller;
import org.apache.ignite.internal.binary.PortableUtils;
import org.apache.ignite.internal.binary.BinaryWriterExImpl;
import org.apache.ignite.internal.util.typedef.internal.U;

/**
 * Lazy value backed by a serialized collection that is materialized as a linked hash set on demand.
 * While the value has not been materialized, it is re-serialized from the reader's underlying array.
 */
class PortableLazySet extends PortableAbstractLazyValue {
    /** Offset of the collection flag byte inside the reader's array. */
    private final int off;

    /**
     * Remembers where the collection header starts and skips over all of its elements.
     *
     * @param reader Reader. NOTE(review): appears to be positioned right after the collection type byte - confirm.
     * @param size Number of elements to skip.
     */
    PortableLazySet(PortableBuilderReader reader, int size) {
        super(reader, reader.position() - 1);

        // Step back over the header bytes to find the start of the collection.
        off = reader.position() - 1/* flag */ - 4/* size */ - 1/* col type */;

        assert size >= 0;

        int left = size;

        while (left-- > 0)
            reader.skipValue();
    }

    /** {@inheritDoc} */
    @Override public void writeTo(BinaryWriterExImpl writer, PortableBuilderSerializer ctx) {
        if (val != null) {
            // The set has been materialized: write a fresh header followed by the current elements.
            Collection<Object> items = (Collection<Object>)val;

            writer.writeByte(GridPortableMarshaller.COL);
            writer.writeInt(items.size());

            // The collection type byte is carried over from the original bytes.
            writer.writeByte(reader.array()[off + 1 /* flag */ + 4 /* size */]);

            for (Object item : items)
                ctx.writeValue(writer, item);

            return;
        }

        // Not materialized: copy the original header and re-write each parsed element.
        int cnt = reader.readIntPositioned(off + 1);

        int hdrSize = 1 /* flag */ + 4 /* size */ + 1 /* col type */;

        writer.write(reader.array(), off, hdrSize);

        reader.position(off + hdrSize);

        for (int i = 0; i < cnt; i++)
            ctx.writeValue(writer, reader.parseValue());
    }

    /** {@inheritDoc} */
    @Override protected Object init() {
        int cnt = reader.readIntPositioned(off + 1);

        // Move to the first element, right after the header.
        reader.position(off + 1/* flag */ + 4/* size */ + 1/* col type */);

        Set<Object> set = U.newLinkedHashSet(cnt);

        for (int read = 0; read < cnt; read++) {
            Object parsed = reader.parseValue();

            set.add(PortableUtils.unwrapLazy(parsed));
        }

        return set;
    }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/1dbf20e0/modules/core/src/main/java/org/apache/ignite/internal/binary/builder/PortableLazyValue.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/builder/PortableLazyValue.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/builder/PortableLazyValue.java new file mode 100644 index 0000000..51c6d7e --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/builder/PortableLazyValue.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */

package org.apache.ignite.internal.binary.builder;

/**
 * A builder value that can both serialize itself and hand out the object it represents.
 */
public interface PortableLazyValue extends PortableBuilderSerializationAware {
    /**
     * Gets the object represented by this lazy value.
     *
     * @return Value.
     */
    Object value();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/1dbf20e0/modules/core/src/main/java/org/apache/ignite/internal/binary/builder/PortableModifiableLazyValue.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/builder/PortableModifiableLazyValue.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/builder/PortableModifiableLazyValue.java new file mode 100644 index 0000000..b00157e --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/builder/PortableModifiableLazyValue.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */

package org.apache.ignite.internal.binary.builder;

import org.apache.ignite.internal.binary.BinaryWriterExImpl;

/**
 * Lazy value that is unmarshalled on demand and, once materialized, is written back as an object
 * instead of being copied from the original bytes.
 */
public class PortableModifiableLazyValue extends PortableAbstractLazyValue {
    /** Number of bytes copied from the reader's array when the value was never materialized. */
    protected final int len;

    /**
     * @param reader Reader that owns the serialized bytes.
     * @param valOff Offset of the value inside the reader's array.
     * @param len Length of the serialized value in bytes.
     */
    public PortableModifiableLazyValue(PortableBuilderReader reader, int valOff, int len) {
        super(reader, valOff);

        this.len = len;
    }

    /** {@inheritDoc} */
    @Override protected Object init() {
        Object unmarshalled = reader.reader().unmarshal(valOff);

        return unmarshalled;
    }

    /** {@inheritDoc} */
    @Override public void writeTo(BinaryWriterExImpl writer, PortableBuilderSerializer ctx) {
        if (val != null) {
            // The value was materialized, so it is serialized again as an object.
            writer.writeObject(val);

            return;
        }

        // Never materialized: the original bytes are copied as they are.
        writer.write(reader.array(), valOff, len);
    }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/1dbf20e0/modules/core/src/main/java/org/apache/ignite/internal/binary/builder/PortableObjectArrayLazyValue.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/builder/PortableObjectArrayLazyValue.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/builder/PortableObjectArrayLazyValue.java new file mode 100644 index 0000000..537a25f --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/builder/PortableObjectArrayLazyValue.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */

package org.apache.ignite.internal.binary.builder;

import org.apache.ignite.internal.binary.GridPortableMarshaller;
import org.apache.ignite.internal.binary.BinaryWriterExImpl;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.binary.BinaryInvalidTypeException;

/**
 * Lazy value for an object array. Elements are parsed eagerly by the constructor (each may itself be a
 * lazy value) and are unwrapped in place on first access.
 */
class PortableObjectArrayLazyValue extends PortableAbstractLazyValue {
    /** Parsed array elements; nested lazy values are replaced by their values in {@link #init()}. */
    private Object[] lazyValsArr;

    /** Component type ID. */
    private int compTypeId;

    /** Component class name; set only when the array was written with an unregistered type ID. */
    private String clsName;

    /**
     * Reads the component type, the array size and all elements from the reader.
     *
     * @param reader Reader.
     * @throws BinaryInvalidTypeException If the component class of an unregistered type cannot be loaded.
     */
    protected PortableObjectArrayLazyValue(PortableBuilderReader reader) {
        super(reader, reader.position() - 1);

        int typeId = reader.readInt();

        if (typeId == GridPortableMarshaller.UNREGISTERED_TYPE_ID) {
            clsName = reader.readString();

            Class<?> cls;

            try {
                // Load the class whose name was just read. Reading a second string here would consume
                // the bytes of the array size that follows the class name.
                // TODO: IGNITE-1272 - Is class loader needed here?
                cls = U.forName(clsName, null);
            }
            catch (ClassNotFoundException e) {
                throw new BinaryInvalidTypeException("Failed to load the class: " + clsName, e);
            }

            compTypeId = reader.portableContext().descriptorForClass(cls, true).typeId();
        }
        else {
            compTypeId = typeId;
            clsName = null;
        }

        int size = reader.readInt();

        lazyValsArr = new Object[size];

        for (int i = 0; i < size; i++)
            lazyValsArr[i] = reader.parseValue();
    }

    /** {@inheritDoc} */
    @Override protected Object init() {
        // Unwrap nested lazy values in place; the same array instance is returned.
        for (int i = 0; i < lazyValsArr.length; i++) {
            if (lazyValsArr[i] instanceof PortableLazyValue)
                lazyValsArr[i] = ((PortableLazyValue)lazyValsArr[i]).value();
        }

        return lazyValsArr;
    }

    /** {@inheritDoc} */
    @Override public void writeTo(BinaryWriterExImpl writer, PortableBuilderSerializer ctx) {
        // Registered component types are written by ID, unregistered ones by class name.
        if (clsName == null)
            ctx.writeArray(writer, GridPortableMarshaller.OBJ_ARR, lazyValsArr, compTypeId);
        else
            ctx.writeArray(writer, GridPortableMarshaller.OBJ_ARR, lazyValsArr, clsName);
    }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/1dbf20e0/modules/core/src/main/java/org/apache/ignite/internal/binary/builder/PortablePlainLazyValue.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/builder/PortablePlainLazyValue.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/builder/PortablePlainLazyValue.java new file mode 100644 index 0000000..f572415 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/builder/PortablePlainLazyValue.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */

package org.apache.ignite.internal.binary.builder;

import org.apache.ignite.internal.binary.BinaryWriterExImpl;

/**
 * Lazy value that is unmarshalled on demand and is always written back by copying its original bytes.
 */
class PortablePlainLazyValue extends PortableAbstractLazyValue {
    /** Length of the serialized value in bytes. */
    protected final int len;

    /**
     * @param reader Reader that owns the serialized bytes.
     * @param valOff Offset of the value inside the reader's array.
     * @param len Length of the serialized value in bytes.
     */
    protected PortablePlainLazyValue(PortableBuilderReader reader, int valOff, int len) {
        super(reader, valOff);

        this.len = len;
    }

    /** {@inheritDoc} */
    @Override protected Object init() {
        Object unmarshalled = reader.reader().unmarshal(valOff);

        return unmarshalled;
    }

    /** {@inheritDoc} */
    @Override public void writeTo(BinaryWriterExImpl writer, PortableBuilderSerializer ctx) {
        // The raw bytes are copied regardless of whether the value has been materialized.
        int from = valOff;
        int cnt = len;

        writer.write(reader.array(), from, cnt);
    }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/1dbf20e0/modules/core/src/main/java/org/apache/ignite/internal/binary/builder/PortablePlainPortableObject.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/builder/PortablePlainPortableObject.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/builder/PortablePlainPortableObject.java new file mode 100644 index 0000000..3b77a52 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/builder/PortablePlainPortableObject.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache 
Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */

package org.apache.ignite.internal.binary.builder;

import org.apache.ignite.internal.binary.BinaryObjectImpl;
import org.apache.ignite.internal.binary.BinaryObjectOffheapImpl;
import org.apache.ignite.internal.binary.BinaryWriterExImpl;
import org.apache.ignite.internal.binary.BinaryObjectImpl;
import org.apache.ignite.internal.binary.BinaryObjectOffheapImpl;
import org.apache.ignite.internal.binary.BinaryWriterExImpl;
import org.apache.ignite.binary.BinaryObject;

/**
 * Lazy value wrapping an already built binary object; the wrapped object is returned as is.
 */
public class PortablePlainPortableObject implements PortableLazyValue {
    /** Wrapped binary object. */
    private final BinaryObject portableObj;

    /**
     * @param portableObj Portable object.
     */
    public PortablePlainPortableObject(BinaryObject portableObj) {
        this.portableObj = portableObj;
    }

    /** {@inheritDoc} */
    @Override public Object value() {
        return portableObj;
    }

    /** {@inheritDoc} */
    @Override public void writeTo(BinaryWriterExImpl writer, PortableBuilderSerializer ctx) {
        // An off-heap object is copied to the heap first; the result is then written as a heap object.
        BinaryObject heapObj = portableObj instanceof BinaryObjectOffheapImpl
            ? ((BinaryObjectOffheapImpl)portableObj).heapCopy()
            : portableObj;

        writer.doWritePortableObject((BinaryObjectImpl)heapObj);
    }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/1dbf20e0/modules/core/src/main/java/org/apache/ignite/internal/binary/builder/PortableValueWithType.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/builder/PortableValueWithType.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/builder/PortableValueWithType.java new file mode 100644 index 0000000..6c5ddfe --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/builder/PortableValueWithType.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */

package org.apache.ignite.internal.binary.builder;

import org.apache.ignite.internal.binary.BinaryWriterExImpl;
import org.apache.ignite.internal.binary.BinaryWriterExImpl;
import org.apache.ignite.internal.util.typedef.internal.S;

/**
 * Pairs a value with a type byte. The value may itself be a lazy or serialization-aware wrapper.
 */
class PortableValueWithType implements PortableLazyValue {
    /** Type byte supplied at construction. */
    private byte type;

    /** Current value; can be replaced through {@link #value(Object)}. */
    private Object val;

    /**
     * @param type Type.
     * @param val Value.
     */
    PortableValueWithType(byte type, Object val) {
        this.type = type;
        this.val = val;
    }

    /** {@inheritDoc} */
    @Override public void writeTo(BinaryWriterExImpl writer, PortableBuilderSerializer ctx) {
        if (!(val instanceof PortableBuilderSerializationAware)) {
            // Plain value: the serializer decides how to write it.
            ctx.writeValue(writer, val);

            return;
        }

        // Serialization-aware value: it writes itself.
        ((PortableBuilderSerializationAware)val).writeTo(writer, ctx);
    }

    /**
     * @return Type ID (the stored type byte widened to {@code int}).
     */
    public int typeId() {
        return type;
    }

    /** {@inheritDoc} */
    @Override public Object value() {
        // A nested lazy value is resolved; anything else is returned unchanged.
        return val instanceof PortableLazyValue ? ((PortableLazyValue)val).value() : val;
    }

    /**
     * Replaces the stored value.
     *
     * @param val New value.
     */
    public void value(Object val) {
        this.val = val;
    }

    /** {@inheritDoc} */
    @Override public String toString() {
        return S.toString(PortableValueWithType.class, this);
    }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/1dbf20e0/modules/core/src/main/java/org/apache/ignite/internal/binary/builder/package-info.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/builder/package-info.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/builder/package-info.java new file mode 100644 index 0000000..f2c4b55 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/builder/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * <!-- Package description. --> + * Contains classes specific to portable builder API. + */ +package org.apache.ignite.internal.binary.builder; http://git-wip-us.apache.org/repos/asf/ignite/blob/1dbf20e0/modules/core/src/main/java/org/apache/ignite/internal/binary/package-info.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/package-info.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/package-info.java new file mode 100644 index 0000000..4bb0fb1 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * <!-- Package description. --> + * Contains portable APIs internal implementation. + */ +package org.apache.ignite.internal.binary; http://git-wip-us.apache.org/repos/asf/ignite/blob/1dbf20e0/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/PortableAbstractInputStream.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/PortableAbstractInputStream.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/PortableAbstractInputStream.java new file mode 100644 index 0000000..9d36b47 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/PortableAbstractInputStream.java @@ -0,0 +1,379 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */

package org.apache.ignite.internal.binary.streams;

import org.apache.ignite.binary.BinaryObjectException;

/**
 * Portable abstract input stream. Implements the typed read operations on top of a small set of
 * abstract primitives; values are byte-swapped when {@code LITTLE_ENDIAN} is {@code false}.
 */
public abstract class PortableAbstractInputStream extends PortableAbstractStream
    implements PortableInputStream {
    /** Length of data inside array. */
    protected int len;

    /** {@inheritDoc} */
    @Override public byte readByte() {
        ensureEnoughData(1);

        return readByteAndShift();
    }

    /** {@inheritDoc} */
    @Override public byte[] readByteArray(int cnt) {
        ensureEnoughData(cnt);

        byte[] vals = new byte[cnt];

        copyAndShift(vals, BYTE_ARR_OFF, cnt);

        return vals;
    }

    /** {@inheritDoc} */
    @Override public boolean readBoolean() {
        byte b = readByte();

        return b == BYTE_ONE;
    }

    /** {@inheritDoc} */
    @Override public boolean[] readBooleanArray(int cnt) {
        ensureEnoughData(cnt);

        boolean[] vals = new boolean[cnt];

        copyAndShift(vals, BOOLEAN_ARR_OFF, cnt);

        return vals;
    }

    /** {@inheritDoc} */
    @Override public short readShort() {
        ensureEnoughData(2);

        short raw = readShortFast();

        shift(2);

        return LITTLE_ENDIAN ? raw : Short.reverseBytes(raw);
    }

    /** {@inheritDoc} */
    @Override public short[] readShortArray(int cnt) {
        int bytes = cnt << 1;

        ensureEnoughData(bytes);

        short[] vals = new short[cnt];

        copyAndShift(vals, SHORT_ARR_OFF, bytes);

        if (!LITTLE_ENDIAN) {
            for (int i = 0; i < cnt; i++)
                vals[i] = Short.reverseBytes(vals[i]);
        }

        return vals;
    }

    /** {@inheritDoc} */
    @Override public char readChar() {
        ensureEnoughData(2);

        char raw = readCharFast();

        shift(2);

        return LITTLE_ENDIAN ? raw : Character.reverseBytes(raw);
    }

    /** {@inheritDoc} */
    @Override public char[] readCharArray(int cnt) {
        int bytes = cnt << 1;

        ensureEnoughData(bytes);

        char[] vals = new char[cnt];

        copyAndShift(vals, CHAR_ARR_OFF, bytes);

        if (!LITTLE_ENDIAN) {
            for (int i = 0; i < cnt; i++)
                vals[i] = Character.reverseBytes(vals[i]);
        }

        return vals;
    }

    /** {@inheritDoc} */
    @Override public int readInt() {
        ensureEnoughData(4);

        int raw = readIntFast();

        shift(4);

        return LITTLE_ENDIAN ? raw : Integer.reverseBytes(raw);
    }

    /** {@inheritDoc} */
    @Override public int[] readIntArray(int cnt) {
        int bytes = cnt << 2;

        ensureEnoughData(bytes);

        int[] vals = new int[cnt];

        copyAndShift(vals, INT_ARR_OFF, bytes);

        if (!LITTLE_ENDIAN) {
            for (int i = 0; i < cnt; i++)
                vals[i] = Integer.reverseBytes(vals[i]);
        }

        return vals;
    }

    /** {@inheritDoc} */
    @Override public byte readBytePositioned(int pos) {
        // Bytes needed beyond the current position to cover the requested one.
        int missing = pos + 1 - this.pos;

        if (missing > 0)
            ensureEnoughData(missing);

        return readBytePositioned0(pos);
    }

    /** {@inheritDoc} */
    @Override public short readShortPositioned(int pos) {
        int missing = pos + 2 - this.pos;

        if (missing > 0)
            ensureEnoughData(missing);

        return readShortPositioned0(pos);
    }

    /** {@inheritDoc} */
    @Override public int readIntPositioned(int pos) {
        int missing = pos + 4 - this.pos;

        if (missing > 0)
            ensureEnoughData(missing);

        return readIntPositioned0(pos);
    }

    /** {@inheritDoc} */
    @Override public float readFloat() {
        int bits = readInt();

        return Float.intBitsToFloat(bits);
    }

    /** {@inheritDoc} */
    @Override public float[] readFloatArray(int cnt) {
        int bytes = cnt << 2;

        ensureEnoughData(bytes);

        float[] vals = new float[cnt];

        if (!LITTLE_ENDIAN) {
            // Read element by element so that each value can be byte-swapped before conversion.
            for (int i = 0; i < cnt; i++) {
                int bits = readIntFast();

                shift(4);

                vals[i] = Float.intBitsToFloat(Integer.reverseBytes(bits));
            }
        }
        else
            copyAndShift(vals, FLOAT_ARR_OFF, bytes);

        return vals;
    }

    /** {@inheritDoc} */
    @Override public long readLong() {
        ensureEnoughData(8);

        long raw = readLongFast();

        shift(8);

        return LITTLE_ENDIAN ? raw : Long.reverseBytes(raw);
    }

    /** {@inheritDoc} */
    @Override public long[] readLongArray(int cnt) {
        int bytes = cnt << 3;

        ensureEnoughData(bytes);

        long[] vals = new long[cnt];

        copyAndShift(vals, LONG_ARR_OFF, bytes);

        if (!LITTLE_ENDIAN) {
            for (int i = 0; i < cnt; i++)
                vals[i] = Long.reverseBytes(vals[i]);
        }

        return vals;
    }

    /** {@inheritDoc} */
    @Override public double readDouble() {
        long bits = readLong();

        return Double.longBitsToDouble(bits);
    }

    /** {@inheritDoc} */
    @Override public double[] readDoubleArray(int cnt) {
        int bytes = cnt << 3;

        ensureEnoughData(bytes);

        double[] vals = new double[cnt];

        if (!LITTLE_ENDIAN) {
            // Read element by element so that each value can be byte-swapped before conversion.
            for (int i = 0; i < cnt; i++) {
                long bits = readLongFast();

                shift(8);

                vals[i] = Double.longBitsToDouble(Long.reverseBytes(bits));
            }
        }
        else
            copyAndShift(vals, DOUBLE_ARR_OFF, bytes);

        return vals;
    }

    /** {@inheritDoc} */
    @Override public int read(byte[] arr, int off, int len) {
        // Never copy more than what is left in the stream.
        int avail = remaining();
        int cnt = len > avail ? avail : len;

        copyAndShift(arr, BYTE_ARR_OFF + off, cnt);

        return cnt;
    }

    /** {@inheritDoc} */
    @Override public void position(int pos) {
        if (remaining() + this.pos < pos)
            throw new BinaryObjectException("Position is out of bounds: " + pos);

        this.pos = pos;
    }

    /** {@inheritDoc} */
    @Override public long offheapPointer() {
        return 0;
    }

    /**
     * Ensure that there is enough data.
     *
     * @param cnt Number of bytes that must still be available.
     * @throws BinaryObjectException If fewer than {@code cnt} bytes remain.
     */
    protected void ensureEnoughData(int cnt) {
        if (remaining() >= cnt)
            return;

        throw new BinaryObjectException("Not enough data to read the value [position=" + pos +
            ", requiredBytes=" + cnt + ", remainingBytes=" + remaining() + ']');
    }

    /**
     * Read next byte from the stream and perform shift.
     *
     * @return Next byte.
     */
    protected abstract byte readByteAndShift();

    /**
     * Copy data to the target object and shift position afterwards.
     *
     * @param target Target.
     * @param off Offset.
     * @param len Length.
     */
    protected abstract void copyAndShift(Object target, long off, int len);

    /**
     * Read short value (fast path).
     *
     * @return Short value.
     */
    protected abstract short readShortFast();

    /**
     * Read char value (fast path).
     *
     * @return Char value.
     */
    protected abstract char readCharFast();

    /**
     * Read int value (fast path).
     *
     * @return Int value.
     */
    protected abstract int readIntFast();

    /**
     * Read long value (fast path).
     *
     * @return Long value.
     */
    protected abstract long readLongFast();

    /**
     * Internal routine for positioned byte value read.
     *
     * @param pos Position.
     * @return Byte value.
     */
    protected abstract byte readBytePositioned0(int pos);

    /**
     * Internal routine for positioned short value read.
     *
     * @param pos Position.
     * @return Short value.
     */
    protected abstract short readShortPositioned0(int pos);

    /**
     * Internal routine for positioned int value read.
     *
     * @param pos Position.
     * @return Int value.
     */
    protected abstract int readIntPositioned0(int pos);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/1dbf20e0/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/PortableAbstractOutputStream.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/PortableAbstractOutputStream.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/PortableAbstractOutputStream.java new file mode 100644 index 0000000..85064c5 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/PortableAbstractOutputStream.java @@ -0,0 +1,347 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.binary.streams; + +/** + * Base portable output stream. + */ +public abstract class PortableAbstractOutputStream extends PortableAbstractStream + implements PortableOutputStream { + /** Minimal capacity when it is reasonable to start doubling resize. */ + private static final int MIN_CAP = 256; + + /** {@inheritDoc} */ + @Override public void writeByte(byte val) { + ensureCapacity(pos + 1); + + writeByteAndShift(val); + } + + /** {@inheritDoc} */ + @Override public void writeByteArray(byte[] val) { + ensureCapacity(pos + val.length); + + copyAndShift(val, BYTE_ARR_OFF, val.length); + } + + /** {@inheritDoc} */ + @Override public void writeBoolean(boolean val) { + writeByte(val ? 
BYTE_ONE : BYTE_ZERO); + } + + /** {@inheritDoc} */ + @Override public void writeBooleanArray(boolean[] val) { + ensureCapacity(pos + val.length); + + copyAndShift(val, BOOLEAN_ARR_OFF, val.length); + } + + /** {@inheritDoc} */ + @Override public void writeShort(short val) { + ensureCapacity(pos + 2); + + if (!LITTLE_ENDIAN) + val = Short.reverseBytes(val); + + writeShortFast(val); + + shift(2); + } + + /** {@inheritDoc} */ + @Override public void writeShortArray(short[] val) { + int cnt = val.length << 1; + + ensureCapacity(pos + cnt); + + if (LITTLE_ENDIAN) + copyAndShift(val, SHORT_ARR_OFF, cnt); + else { + for (short item : val) + writeShortFast(Short.reverseBytes(item)); + + shift(cnt); + } + } + + /** {@inheritDoc} */ + @Override public void writeChar(char val) { + ensureCapacity(pos + 2); + + if (!LITTLE_ENDIAN) + val = Character.reverseBytes(val); + + writeCharFast(val); + + shift(2); + } + + /** {@inheritDoc} */ + @Override public void writeCharArray(char[] val) { + int cnt = val.length << 1; + + ensureCapacity(pos + cnt); + + if (LITTLE_ENDIAN) + copyAndShift(val, CHAR_ARR_OFF, cnt); + else { + for (char item : val) + writeCharFast(Character.reverseBytes(item)); + + shift(cnt); + } + } + + /** {@inheritDoc} */ + @Override public void writeInt(int val) { + ensureCapacity(pos + 4); + + if (!LITTLE_ENDIAN) + val = Integer.reverseBytes(val); + + writeIntFast(val); + + shift(4); + } + + /** {@inheritDoc} */ + @Override public void writeShort(int pos, short val) { + ensureCapacity(pos + 2); + + unsafeWriteShort(pos, val); + } + + /** {@inheritDoc} */ + @Override public void writeInt(int pos, int val) { + ensureCapacity(pos + 4); + + unsafeWriteInt(pos, val); + } + + /** {@inheritDoc} */ + @Override public void writeIntArray(int[] val) { + int cnt = val.length << 2; + + ensureCapacity(pos + cnt); + + if (LITTLE_ENDIAN) + copyAndShift(val, INT_ARR_OFF, cnt); + else { + for (int item : val) + writeIntFast(Integer.reverseBytes(item)); + + shift(cnt); + } + } + + 
/** {@inheritDoc} */ + @Override public void writeFloat(float val) { + writeInt(Float.floatToIntBits(val)); + } + + /** {@inheritDoc} */ + @Override public void writeFloatArray(float[] val) { + int cnt = val.length << 2; + + ensureCapacity(pos + cnt); + + if (LITTLE_ENDIAN) + copyAndShift(val, FLOAT_ARR_OFF, cnt); + else { + for (float item : val) { + writeIntFast(Integer.reverseBytes(Float.floatToIntBits(item))); + + shift(4); + } + } + } + + /** {@inheritDoc} */ + @Override public void writeLong(long val) { + ensureCapacity(pos + 8); + + if (!LITTLE_ENDIAN) + val = Long.reverseBytes(val); + + writeLongFast(val); + + shift(8); + } + + /** {@inheritDoc} */ + @Override public void writeLongArray(long[] val) { + int cnt = val.length << 3; + + ensureCapacity(pos + cnt); + + if (LITTLE_ENDIAN) + copyAndShift(val, LONG_ARR_OFF, cnt); + else { + for (long item : val) { + // Fast write does not move the position, so advance after each item. + writeLongFast(Long.reverseBytes(item)); + + shift(8); + } + } + } + + /** {@inheritDoc} */ + @Override public void writeDouble(double val) { + writeLong(Double.doubleToLongBits(val)); + } + + /** {@inheritDoc} */ + @Override public void writeDoubleArray(double[] val) { + int cnt = val.length << 3; + + ensureCapacity(pos + cnt); + + if (LITTLE_ENDIAN) + copyAndShift(val, DOUBLE_ARR_OFF, cnt); + else { + for (double item : val) { + writeLongFast(Long.reverseBytes(Double.doubleToLongBits(item))); + + shift(8); + } + } + } + + /** {@inheritDoc} */ + @Override public void write(byte[] arr, int off, int len) { + ensureCapacity(pos + len); + + copyAndShift(arr, BYTE_ARR_OFF + off, len); + } + + /** {@inheritDoc} */ + @Override public void write(long addr, int cnt) { + ensureCapacity(pos + cnt); + + copyAndShift(null, addr, cnt); + } + + /** {@inheritDoc} */ + @Override public void position(int pos) { + ensureCapacity(pos); + + unsafePosition(pos); + } + + /** {@inheritDoc} */ + @Override public long offheapPointer() { + return 0; + } + + /** {@inheritDoc} */ + @Override public void unsafeEnsure(int cap) { + 
ensureCapacity(pos + cap); + } + + /** {@inheritDoc} */ + @Override public void unsafePosition(int pos) { + this.pos = pos; + } + + /** {@inheritDoc} */ + @Override public void unsafeWriteBoolean(boolean val) { + unsafeWriteByte(val ? BYTE_ONE : BYTE_ZERO); + } + + /** {@inheritDoc} */ + @Override public void unsafeWriteFloat(float val) { + unsafeWriteInt(Float.floatToIntBits(val)); + } + + /** {@inheritDoc} */ + @Override public void unsafeWriteDouble(double val) { + unsafeWriteLong(Double.doubleToLongBits(val)); + } + + /** + * Calculate new capacity. + * + * @param curCap Current capacity. + * @param reqCap Required capacity. + * @return New capacity. + */ + protected static int capacity(int curCap, int reqCap) { + int newCap; + + if (reqCap < MIN_CAP) + newCap = MIN_CAP; + else { + newCap = curCap << 1; + + if (newCap < reqCap) + newCap = reqCap; + } + + return newCap; + } + + /** + * Write next byte to the stream. + * + * @param val Value. + */ + protected abstract void writeByteAndShift(byte val); + + /** + * Copy source object to the stream shift position afterwards. + * + * @param src Source. + * @param off Offset. + * @param len Length. + */ + protected abstract void copyAndShift(Object src, long off, int len); + + /** + * Write short value (fast path). + * + * @param val Short value. + */ + protected abstract void writeShortFast(short val); + + /** + * Write char value (fast path). + * + * @param val Char value. + */ + protected abstract void writeCharFast(char val); + + /** + * Write int value (fast path). + * + * @param val Int value. + */ + protected abstract void writeIntFast(int val); + + /** + * Write long value (fast path). + * + * @param val Long value. + */ + protected abstract void writeLongFast(long val); + + /** + * Ensure capacity. + * + * @param cnt Required byte count. 
+ */ + protected abstract void ensureCapacity(int cnt); +} http://git-wip-us.apache.org/repos/asf/ignite/blob/1dbf20e0/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/PortableAbstractStream.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/PortableAbstractStream.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/PortableAbstractStream.java new file mode 100644 index 0000000..fcc32cb --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/PortableAbstractStream.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.binary.streams; + +import java.nio.ByteOrder; +import org.apache.ignite.internal.util.GridUnsafe; +import sun.misc.Unsafe; + +/** + * Portable abstract stream. + */ +public abstract class PortableAbstractStream implements PortableStream { + /** Byte: zero. */ + protected static final byte BYTE_ZERO = 0; + + /** Byte: one. */ + protected static final byte BYTE_ONE = 1; + + /** Whether little endian is used on the platform. 
*/ + protected static final boolean LITTLE_ENDIAN = ByteOrder.nativeOrder() == ByteOrder.LITTLE_ENDIAN; + + /** Unsafe instance. */ + protected static final Unsafe UNSAFE = GridUnsafe.unsafe(); + + /** Array offset: boolean. */ + protected static final long BOOLEAN_ARR_OFF = UNSAFE.arrayBaseOffset(boolean[].class); + + /** Array offset: byte. */ + protected static final long BYTE_ARR_OFF = UNSAFE.arrayBaseOffset(byte[].class); + + /** Array offset: short. */ + protected static final long SHORT_ARR_OFF = UNSAFE.arrayBaseOffset(short[].class); + + /** Array offset: char. */ + protected static final long CHAR_ARR_OFF = UNSAFE.arrayBaseOffset(char[].class); + + /** Array offset: int. */ + protected static final long INT_ARR_OFF = UNSAFE.arrayBaseOffset(int[].class); + + /** Array offset: float. */ + protected static final long FLOAT_ARR_OFF = UNSAFE.arrayBaseOffset(float[].class); + + /** Array offset: long. */ + protected static final long LONG_ARR_OFF = UNSAFE.arrayBaseOffset(long[].class); + + /** Array offset: double. */ + protected static final long DOUBLE_ARR_OFF = UNSAFE.arrayBaseOffset(double[].class); + + /** Position. */ + protected int pos; + + /** {@inheritDoc} */ + @Override public int position() { + return pos; + } + + /** + * Shift position. + * + * @param cnt Byte count. 
+ */ + protected void shift(int cnt) { + pos += cnt; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/1dbf20e0/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/PortableHeapInputStream.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/PortableHeapInputStream.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/PortableHeapInputStream.java new file mode 100644 index 0000000..d8717ce --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/PortableHeapInputStream.java @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.binary.streams; + +import java.util.Arrays; + +/** + * Portable heap input stream. + */ +public final class PortableHeapInputStream extends PortableAbstractInputStream { + /** + * Create stream with pointer set at the given position. + * + * @param data Data. + * @param pos Position. + * @return Stream. 
+ */ + public static PortableHeapInputStream create(byte[] data, int pos) { + assert pos < data.length; + + PortableHeapInputStream stream = new PortableHeapInputStream(data); + + stream.pos = pos; + + return stream; + } + + /** Data. */ + private byte[] data; + + /** + * Constructor. + * + * @param data Data. + */ + public PortableHeapInputStream(byte[] data) { + this.data = data; + + len = data.length; + } + + /** + * @return Copy of this stream. + */ + public PortableHeapInputStream copy() { + PortableHeapInputStream in = new PortableHeapInputStream(Arrays.copyOf(data, data.length)); + + in.position(pos); + + return in; + } + + /** + * Method called from JNI to resize stream. + * + * @param len Required length. + * @return Underlying byte array. + */ + public byte[] resize(int len) { + if (data.length < len) { + byte[] data0 = new byte[len]; + + UNSAFE.copyMemory(data, BYTE_ARR_OFF, data0, BYTE_ARR_OFF, data.length); + + data = data0; + } + + return data; + } + + /** {@inheritDoc} */ + @Override public int remaining() { + return data.length - pos; + } + + /** {@inheritDoc} */ + @Override public byte[] array() { + return data; + } + + /** {@inheritDoc} */ + @Override public byte[] arrayCopy() { + byte[] res = new byte[len]; + + UNSAFE.copyMemory(data, BYTE_ARR_OFF, res, BYTE_ARR_OFF, res.length); + + return res; + } + + /** {@inheritDoc} */ + @Override public boolean hasArray() { + return true; + } + + /** {@inheritDoc} */ + @Override protected byte readByteAndShift() { + return data[pos++]; + } + + /** {@inheritDoc} */ + @Override protected void copyAndShift(Object target, long off, int len) { + UNSAFE.copyMemory(data, BYTE_ARR_OFF + pos, target, off, len); + + shift(len); + } + + /** {@inheritDoc} */ + @Override protected short readShortFast() { + return UNSAFE.getShort(data, BYTE_ARR_OFF + pos); + } + + /** {@inheritDoc} */ + @Override protected char readCharFast() { + return UNSAFE.getChar(data, BYTE_ARR_OFF + pos); + } + + /** {@inheritDoc} */ + @Override 
protected int readIntFast() { + return UNSAFE.getInt(data, BYTE_ARR_OFF + pos); + } + + /** {@inheritDoc} */ + @Override protected long readLongFast() { + return UNSAFE.getLong(data, BYTE_ARR_OFF + pos); + } + + /** {@inheritDoc} */ + @Override protected byte readBytePositioned0(int pos) { + return UNSAFE.getByte(data, BYTE_ARR_OFF + pos); + } + + /** {@inheritDoc} */ + @Override protected short readShortPositioned0(int pos) { + short res = UNSAFE.getShort(data, BYTE_ARR_OFF + pos); + + if (!LITTLE_ENDIAN) + res = Short.reverseBytes(res); + + return res; + } + + /** {@inheritDoc} */ + @Override protected int readIntPositioned0(int pos) { + int res = UNSAFE.getInt(data, BYTE_ARR_OFF + pos); + + if (!LITTLE_ENDIAN) + res = Integer.reverseBytes(res); + + return res; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/1dbf20e0/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/PortableHeapOutputStream.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/PortableHeapOutputStream.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/PortableHeapOutputStream.java new file mode 100644 index 0000000..8f9ca4a --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/PortableHeapOutputStream.java @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.binary.streams; + +/** + * Portable heap output stream. + */ +public final class PortableHeapOutputStream extends PortableAbstractOutputStream { + /** Allocator. */ + private final PortableMemoryAllocatorChunk chunk; + + /** Data. */ + private byte[] data; + + /** + * Constructor. + * + * @param cap Initial capacity. + */ + public PortableHeapOutputStream(int cap) { + this(cap, PortableMemoryAllocator.INSTANCE.chunk()); + } + + /** + * Constructor. + * + * @param cap Capacity. + * @param chunk Chunk. + */ + public PortableHeapOutputStream(int cap, PortableMemoryAllocatorChunk chunk) { + this.chunk = chunk; + + data = chunk.allocate(cap); + } + + /** {@inheritDoc} */ + @Override public void close() { + chunk.release(data, pos); + } + + /** {@inheritDoc} */ + @Override public void ensureCapacity(int cnt) { + if (cnt > data.length) { + int newCap = capacity(data.length, cnt); + + data = chunk.reallocate(data, newCap); + } + } + + /** {@inheritDoc} */ + @Override public byte[] array() { + return data; + } + + /** {@inheritDoc} */ + @Override public byte[] arrayCopy() { + byte[] res = new byte[pos]; + + UNSAFE.copyMemory(data, BYTE_ARR_OFF, res, BYTE_ARR_OFF, pos); + + return res; + } + + /** {@inheritDoc} */ + @Override public boolean hasArray() { + return true; + } + + /** {@inheritDoc} */ + @Override protected void writeByteAndShift(byte val) { + data[pos++] = val; + } + + /** {@inheritDoc} */ + @Override protected void copyAndShift(Object src, long off, int len) { + UNSAFE.copyMemory(src, off, data, 
BYTE_ARR_OFF + pos, len); + + shift(len); + } + + /** {@inheritDoc} */ + @Override protected void writeShortFast(short val) { + UNSAFE.putShort(data, BYTE_ARR_OFF + pos, val); + } + + /** {@inheritDoc} */ + @Override protected void writeCharFast(char val) { + UNSAFE.putChar(data, BYTE_ARR_OFF + pos, val); + } + + /** {@inheritDoc} */ + @Override protected void writeIntFast(int val) { + UNSAFE.putInt(data, BYTE_ARR_OFF + pos, val); + } + + /** {@inheritDoc} */ + @Override protected void writeLongFast(long val) { + UNSAFE.putLong(data, BYTE_ARR_OFF + pos, val); + } + + /** {@inheritDoc} */ + @Override public void unsafeWriteByte(byte val) { + UNSAFE.putByte(data, BYTE_ARR_OFF + pos++, val); + } + + /** {@inheritDoc} */ + @Override public void unsafeWriteShort(short val) { + if (!LITTLE_ENDIAN) + val = Short.reverseBytes(val); + + UNSAFE.putShort(data, BYTE_ARR_OFF + pos, val); + + shift(2); + } + + /** {@inheritDoc} */ + @Override public void unsafeWriteShort(int pos, short val) { + if (!LITTLE_ENDIAN) + val = Short.reverseBytes(val); + + UNSAFE.putShort(data, BYTE_ARR_OFF + pos, val); + } + + /** {@inheritDoc} */ + @Override public void unsafeWriteChar(char val) { + if (!LITTLE_ENDIAN) + val = Character.reverseBytes(val); + + UNSAFE.putChar(data, BYTE_ARR_OFF + pos, val); + + shift(2); + } + + /** {@inheritDoc} */ + @Override public void unsafeWriteInt(int val) { + if (!LITTLE_ENDIAN) + val = Integer.reverseBytes(val); + + UNSAFE.putInt(data, BYTE_ARR_OFF + pos, val); + + shift(4); + } + + /** {@inheritDoc} */ + @Override public void unsafeWriteInt(int pos, int val) { + if (!LITTLE_ENDIAN) + val = Integer.reverseBytes(val); + + UNSAFE.putInt(data, BYTE_ARR_OFF + pos, val); + } + + /** {@inheritDoc} */ + @Override public void unsafeWriteLong(long val) { + if (!LITTLE_ENDIAN) + val = Long.reverseBytes(val); + + UNSAFE.putLong(data, BYTE_ARR_OFF + pos, val); + + shift(8); + } +} 
http://git-wip-us.apache.org/repos/asf/ignite/blob/1dbf20e0/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/PortableInputStream.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/PortableInputStream.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/PortableInputStream.java new file mode 100644 index 0000000..cf71dc7 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/PortableInputStream.java @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.binary.streams; + +import org.apache.ignite.internal.binary.PortablePositionReadable; +import org.apache.ignite.internal.binary.PortablePositionReadable; + +/** + * Portable input stream. + */ +public interface PortableInputStream extends PortableStream, PortablePositionReadable { + /** + * Read byte value. + * + * @return Byte value. + */ + public byte readByte(); + + /** + * Read byte array. + * + * @param cnt Expected item count. + * @return Byte array. 
+ */ + public byte[] readByteArray(int cnt); + + /** + * Reads {@code cnt} bytes into byte array. + * + * @param arr Destination byte array. + * @param off Offset. + * @param cnt Number of bytes to read. + * @return Actual length read. + */ + public int read(byte[] arr, int off, int cnt); + + /** + * Read boolean value. + * + * @return Boolean value. + */ + public boolean readBoolean(); + + /** + * Read boolean array. + * + * @param cnt Expected item count. + * @return Boolean array. + */ + public boolean[] readBooleanArray(int cnt); + + /** + * Read short value. + * + * @return Short value. + */ + public short readShort(); + + /** + * Read short array. + * + * @param cnt Expected item count. + * @return Short array. + */ + public short[] readShortArray(int cnt); + + /** + * Read char value. + * + * @return Char value. + */ + public char readChar(); + + /** + * Read char array. + * + * @param cnt Expected item count. + * @return Char array. + */ + public char[] readCharArray(int cnt); + + /** + * Read int value. + * + * @return Int value. + */ + public int readInt(); + + /** + * Read int array. + * + * @param cnt Expected item count. + * @return Int array. + */ + public int[] readIntArray(int cnt); + + /** + * Read float value. + * + * @return Float value. + */ + public float readFloat(); + + /** + * Read float array. + * + * @param cnt Expected item count. + * @return Float array. + */ + public float[] readFloatArray(int cnt); + + /** + * Read long value. + * + * @return Long value. + */ + public long readLong(); + + /** + * Read long array. + * + * @param cnt Expected item count. + * @return Long array. + */ + public long[] readLongArray(int cnt); + + /** + * Read double value. + * + * @return Double value. + */ + public double readDouble(); + + /** + * Read double array. + * + * @param cnt Expected item count. + * @return Double array. + */ + public double[] readDoubleArray(int cnt); + + /** + * Gets amount of remaining data in bytes. 
+ * + * @return Remaining data. + */ + public int remaining(); +} http://git-wip-us.apache.org/repos/asf/ignite/blob/1dbf20e0/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/PortableMemoryAllocator.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/PortableMemoryAllocator.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/PortableMemoryAllocator.java new file mode 100644 index 0000000..f20a7bc --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/PortableMemoryAllocator.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.binary.streams; + +/** + * Thread-local memory allocator. + */ +public final class PortableMemoryAllocator { + /** Memory allocator instance. */ + public static final PortableMemoryAllocator INSTANCE = new PortableMemoryAllocator(); + + /** Holders. */ + private static final ThreadLocal<PortableMemoryAllocatorChunk> holders = new ThreadLocal<>(); + + /** + * Ensures singleton. + */ + private PortableMemoryAllocator() { + // No-op. 
+ } + + public PortableMemoryAllocatorChunk chunk() { + PortableMemoryAllocatorChunk holder = holders.get(); + + if (holder == null) + holders.set(holder = new PortableMemoryAllocatorChunk()); + + return holder; + } + + /** + * Checks whether a thread-local array is acquired or not. + * The function is used by Unit tests. + * + * @return {@code true} if acquired {@code false} otherwise. + */ + public boolean isAcquired() { + PortableMemoryAllocatorChunk holder = holders.get(); + + return holder != null && holder.isAcquired(); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/1dbf20e0/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/PortableMemoryAllocatorChunk.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/PortableMemoryAllocatorChunk.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/PortableMemoryAllocatorChunk.java new file mode 100644 index 0000000..749a0b4 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/PortableMemoryAllocatorChunk.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.binary.streams; + +import org.apache.ignite.internal.util.GridUnsafe; +import org.apache.ignite.internal.util.typedef.internal.U; +import sun.misc.Unsafe; + +import static org.apache.ignite.IgniteSystemProperties.IGNITE_MARSHAL_BUFFERS_RECHECK; + +/** + * Memory allocator chunk. + */ +public class PortableMemoryAllocatorChunk { + /** Unsafe instance. */ + protected static final Unsafe UNSAFE = GridUnsafe.unsafe(); + + /** Array offset: byte. */ + protected static final long BYTE_ARR_OFF = UNSAFE.arrayBaseOffset(byte[].class); + + /** Buffer size re-check frequency. */ + private static final Long CHECK_FREQ = Long.getLong(IGNITE_MARSHAL_BUFFERS_RECHECK, 10000); + + /** Data array */ + private byte[] data; + + /** Max message size detected between checks. */ + private int maxMsgSize; + + /** Last time array size is checked. */ + private long lastCheck = U.currentTimeMillis(); + + /** Whether the holder is acquired or not. */ + private boolean acquired; + + /** + * Allocate. + * + * @param size Desired size. + * @return Data. + */ + public byte[] allocate(int size) { + if (acquired) + return new byte[size]; + + acquired = true; + + if (data == null || size > data.length) + data = new byte[size]; + + return data; + } + + /** + * Reallocate. + * + * @param data Old data. + * @param size Size. + * @return New data. + */ + public byte[] reallocate(byte[] data, int size) { + byte[] newData = new byte[size]; + + if (this.data == data) + this.data = newData; + + UNSAFE.copyMemory(data, BYTE_ARR_OFF, newData, BYTE_ARR_OFF, data.length); + + return newData; + } + + /** + * Shrinks array size if needed. 
+ */ + public void release(byte[] data, int maxMsgSize) { + if (this.data != data) + return; + + if (maxMsgSize > this.maxMsgSize) + this.maxMsgSize = maxMsgSize; + + this.acquired = false; + + long now = U.currentTimeMillis(); + + if (now - this.lastCheck >= CHECK_FREQ) { + int halfSize = data.length >> 1; + + if (this.maxMsgSize < halfSize) + this.data = new byte[halfSize]; + + this.lastCheck = now; + } + } + + /** + * @return {@code True} if acquired. + */ + public boolean isAcquired() { + return acquired; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/1dbf20e0/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/PortableOffheapInputStream.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/PortableOffheapInputStream.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/PortableOffheapInputStream.java new file mode 100644 index 0000000..2a4d7d7 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/PortableOffheapInputStream.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.binary.streams; + +/** + * Portable off-heap input stream. + */ +public class PortableOffheapInputStream extends PortableAbstractInputStream { + /** Pointer. */ + private final long ptr; + + /** Capacity. */ + private final int cap; + + /** */ + private boolean forceHeap; + + /** + * Constructor. + * + * @param ptr Pointer. + * @param cap Capacity. + */ + public PortableOffheapInputStream(long ptr, int cap) { + this(ptr, cap, false); + } + + /** + * Constructor. + * + * @param ptr Pointer. + * @param cap Capacity. + * @param forceHeap If {@code true} method {@link #offheapPointer} returns 0 and unmarshalling will + * create heap-based objects. + */ + public PortableOffheapInputStream(long ptr, int cap, boolean forceHeap) { + this.ptr = ptr; + this.cap = cap; + this.forceHeap = forceHeap; + + len = cap; + } + + /** {@inheritDoc} */ + @Override public int remaining() { + return cap - pos; + } + + /** {@inheritDoc} */ + @Override public byte[] array() { + return arrayCopy(); + } + + /** {@inheritDoc} */ + @Override public byte[] arrayCopy() { + byte[] res = new byte[len]; + + UNSAFE.copyMemory(null, ptr, res, BYTE_ARR_OFF, res.length); + + return res; + } + + /** {@inheritDoc} */ + @Override public boolean hasArray() { + return false; + } + + /** {@inheritDoc} */ + @Override protected byte readByteAndShift() { + return UNSAFE.getByte(ptr + pos++); + } + + /** {@inheritDoc} */ + @Override protected void copyAndShift(Object target, long off, int len) { + UNSAFE.copyMemory(null, ptr + pos, target, off, len); + + shift(len); + } + + /** {@inheritDoc} */ + @Override protected short readShortFast() { + return UNSAFE.getShort(ptr + pos); + } + + /** {@inheritDoc} */ + @Override protected char readCharFast() { + return UNSAFE.getChar(ptr + pos); + } + + /** {@inheritDoc} */ + @Override protected int readIntFast() { + return UNSAFE.getInt(ptr + pos); + } + + /** {@inheritDoc} */ + @Override protected long readLongFast() { + return 
UNSAFE.getLong(ptr + pos); + } + + /** {@inheritDoc} */ + @Override protected byte readBytePositioned0(int pos) { + return UNSAFE.getByte(ptr + pos); + } + + /** {@inheritDoc} */ + @Override protected short readShortPositioned0(int pos) { + short res = UNSAFE.getShort(ptr + pos); + + if (!LITTLE_ENDIAN) + res = Short.reverseBytes(res); + + return res; + } + + /** {@inheritDoc} */ + @Override protected int readIntPositioned0(int pos) { + int res = UNSAFE.getInt(ptr + pos); + + if (!LITTLE_ENDIAN) + res = Integer.reverseBytes(res); + + return res; + } + + /** {@inheritDoc} */ + @Override public long offheapPointer() { + return forceHeap ? 0 : ptr; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/1dbf20e0/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/PortableOffheapOutputStream.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/PortableOffheapOutputStream.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/PortableOffheapOutputStream.java new file mode 100644 index 0000000..9bcb1f4 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/PortableOffheapOutputStream.java @@ -0,0 +1,222 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.binary.streams; + +/** + * Portable offheap output stream. + */ +public class PortableOffheapOutputStream extends PortableAbstractOutputStream { + /** Pointer. */ + private long ptr; + + /** Length of bytes that can be used before resize is necessary. */ + private int cap; + + /** + * Constructor; passes {@code 0} as the pointer, so memory is obtained through {@link #allocate(int)}. + * + * @param cap Capacity. + */ + public PortableOffheapOutputStream(int cap) { + this(0, cap); + } + + /** + * Constructor. + * + * @param ptr Pointer to existing address, or {@code 0} to obtain memory through {@link #allocate(int)}. + * @param cap Capacity. + */ + public PortableOffheapOutputStream(long ptr, int cap) { + this.ptr = ptr == 0 ? allocate(cap) : ptr; + + this.cap = cap; + } + + /** + * {@inheritDoc} + * <p> + * NOTE(review): the current pointer is released even when it was supplied by the caller through the + * constructor, and it is not reset afterwards - confirm ownership and that close happens only once. + */ + @Override public void close() { + release(ptr); + } + + /** + * {@inheritDoc} + * <p> + * When {@code cnt} exceeds the current capacity the memory is reallocated and both the pointer and the + * capacity are updated. + */ + @Override public void ensureCapacity(int cnt) { + if (cnt > cap) { + int newCap = capacity(cap, cnt); + + ptr = reallocate(ptr, newCap); + + cap = newCap; + } + } + + /** + * {@inheritDoc} + * <p> + * Delegates to {@link #arrayCopy()}, so every call returns a newly allocated array. + */ + @Override public byte[] array() { + return arrayCopy(); + } + + /** + * {@inheritDoc} + * <p> + * Copies the first {@code pos} bytes starting at the pointer into a new array. + */ + @Override public byte[] arrayCopy() { + byte[] res = new byte[pos]; + + UNSAFE.copyMemory(null, ptr, res, BYTE_ARR_OFF, pos); + + return res; + } + + /** + * @return Pointer. + */ + public long pointer() { + return ptr; + } + + /** + * @return Capacity. 
+ */ + public int capacity() { + return cap; + } + + /** {@inheritDoc} */ + @Override protected void writeByteAndShift(byte val) { + UNSAFE.putByte(ptr + pos++, val); + } + + /** {@inheritDoc} */ + @Override protected void copyAndShift(Object src, long offset, int len) { + UNSAFE.copyMemory(src, offset, null, ptr + pos, len); + + shift(len); + } + + /** + * {@inheritDoc} + * <p> + * NOTE(review): this and the other {@code write*Fast} methods neither advance {@code pos} nor swap bytes, + * while the {@code unsafeWrite*} methods swap when {@code LITTLE_ENDIAN} is {@code false} - presumably + * the caller does both here; confirm. + */ + @Override protected void writeShortFast(short val) { + UNSAFE.putShort(ptr + pos, val); + } + + /** {@inheritDoc} */ + @Override protected void writeCharFast(char val) { + UNSAFE.putChar(ptr + pos, val); + } + + /** {@inheritDoc} */ + @Override protected void writeIntFast(int val) { + UNSAFE.putInt(ptr + pos, val); + } + + /** {@inheritDoc} */ + @Override protected void writeLongFast(long val) { + UNSAFE.putLong(ptr + pos, val); + } + + /** {@inheritDoc} */ + @Override public boolean hasArray() { + return false; + } + + /** {@inheritDoc} */ + @Override public void unsafeWriteByte(byte val) { + UNSAFE.putByte(ptr + pos++, val); + } + + /** {@inheritDoc} */ + @Override public void unsafeWriteShort(short val) { + if (!LITTLE_ENDIAN) + val = Short.reverseBytes(val); + + UNSAFE.putShort(ptr + pos, val); + + shift(2); + } + + /** + * {@inheritDoc} + * <p> + * The {@code pos} parameter shadows the stream position field, which is left unchanged. + */ + @Override public void unsafeWriteShort(int pos, short val) { + if (!LITTLE_ENDIAN) + val = Short.reverseBytes(val); + + UNSAFE.putShort(ptr + pos, val); + } + + /** {@inheritDoc} */ + @Override public void unsafeWriteChar(char val) { + if (!LITTLE_ENDIAN) + val = Character.reverseBytes(val); + + UNSAFE.putChar(ptr + pos, val); + + shift(2); + } + + /** {@inheritDoc} */ + @Override public void unsafeWriteInt(int val) { + if (!LITTLE_ENDIAN) + val = Integer.reverseBytes(val); + + UNSAFE.putInt(ptr + pos, val); + + shift(4); + } + + /** + * {@inheritDoc} + * <p> + * The {@code pos} parameter shadows the stream position field, which is left unchanged. + */ + @Override public void unsafeWriteInt(int pos, int val) { + if (!LITTLE_ENDIAN) + val = Integer.reverseBytes(val); + + UNSAFE.putInt(ptr + pos, val); + } + + /** {@inheritDoc} */ + @Override public void 
unsafeWriteLong(long val) { + if (!LITTLE_ENDIAN) + val = Long.reverseBytes(val); + + UNSAFE.putLong(ptr + pos, val); + + shift(8); + } + + /** + * Allocate memory. + * + * @param cap Capacity. + * @return Pointer. + */ + protected long allocate(int cap) { + return UNSAFE.allocateMemory(cap); + } + + /** + * Reallocate memory. + * + * @param ptr Old pointer. + * @param cap Capacity. + * @return New pointer. + */ + protected long reallocate(long ptr, int cap) { + return UNSAFE.reallocateMemory(ptr, cap); + } + + /** + * Release memory. + * + * @param ptr Pointer. + */ + protected void release(long ptr) { + UNSAFE.freeMemory(ptr); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/1dbf20e0/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/PortableOutputStream.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/PortableOutputStream.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/PortableOutputStream.java new file mode 100644 index 0000000..a686e54 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/PortableOutputStream.java @@ -0,0 +1,259 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.binary.streams; + +/** + * Portable output stream: sequential and positioned writes of primitive values and primitive arrays, + * plus a set of {@code unsafe*} write methods. + */ +public interface PortableOutputStream extends PortableStream, AutoCloseable { + /** + * Write byte value. + * + * @param val Byte value. + */ + public void writeByte(byte val); + + /** + * Write byte array. + * + * @param val Byte array. + */ + public void writeByteArray(byte[] val); + + /** + * Write boolean value. + * + * @param val Boolean value. + */ + public void writeBoolean(boolean val); + + /** + * Write boolean array. + * + * @param val Boolean array. + */ + public void writeBooleanArray(boolean[] val); + + /** + * Write short value. + * + * @param val Short value. + */ + public void writeShort(short val); + + /** + * Write short array. + * + * @param val Short array. + */ + public void writeShortArray(short[] val); + + /** + * Write char value. + * + * @param val Char value. + */ + public void writeChar(char val); + + /** + * Write char array. + * + * @param val Char array. + */ + public void writeCharArray(char[] val); + + /** + * Write int value. + * + * @param val Int value. + */ + public void writeInt(int val); + + /** + * Write short value at the given position. + * + * @param pos Position. + * @param val Value. + */ + public void writeShort(int pos, short val); + + /** + * Write int value at the given position. + * + * @param pos Position. + * @param val Value. + */ + public void writeInt(int pos, int val); + + /** + * Write int array. + * + * @param val Int array. + */ + public void writeIntArray(int[] val); + + /** + * Write float value. + * + * @param val Float value. + */ + public void writeFloat(float val); + + /** + * Write float array. + * + * @param val Float array. + */ + public void writeFloatArray(float[] val); + + /** + * Write long value. + * + * @param val Long value. 
+ */ + public void writeLong(long val); + + /** + * Write long array. + * + * @param val Long array. + */ + public void writeLongArray(long[] val); + + /** + * Write double value. + * + * @param val Double value. + */ + public void writeDouble(double val); + + /** + * Write double array. + * + * @param val Double array. + */ + public void writeDoubleArray(double[] val); + + /** + * Write part of a byte array. + * + * @param arr Array. + * @param off Offset in the array. + * @param len Length. + */ + public void write(byte[] arr, int off, int len); + + /** + * Write data from unmanaged memory. + * + * @param addr Address. + * @param cnt Count. + */ + public void write(long addr, int cnt); + + /** + * Close the stream releasing resources. Redeclared without a {@code throws} clause, so callers do not + * have to handle the checked exception declared by {@link AutoCloseable#close()}. + */ + @Override public void close(); + + /** + * Set position in unsafe mode. + * + * @param pos Position. + */ + public void unsafePosition(int pos); + + /** + * Ensure capacity for unsafe writes. + * + * @param cap Capacity. + */ + public void unsafeEnsure(int cap); + + /** + * Write byte in unsafe mode. + * + * @param val Value. + */ + public void unsafeWriteByte(byte val); + + /** + * Write boolean in unsafe mode. + * + * @param val Value. + */ + public void unsafeWriteBoolean(boolean val); + + /** + * Write short in unsafe mode. + * + * @param val Value. + */ + public void unsafeWriteShort(short val); + + /** + * Write short in unsafe mode at the given position. + * + * @param pos Position. + * @param val Value. + */ + public void unsafeWriteShort(int pos, short val); + + /** + * Write char in unsafe mode. + * + * @param val Value. + */ + public void unsafeWriteChar(char val); + + /** + * Write int in unsafe mode. + * + * @param val Value. + */ + public void unsafeWriteInt(int val); + + /** + * Write int in unsafe mode at the given position. + * + * @param pos Position. + * @param val Value. + */ + public void unsafeWriteInt(int pos, int val); + + /** + * Write long in unsafe mode. + * + * @param val Value. + */ + public void unsafeWriteLong(long val); + + /** + * Write float in unsafe mode. 
+ * + * @param val Value. + */ + public void unsafeWriteFloat(float val); + + /** + * Write double in unsafe mode. + * + * @param val Value. + */ + public void unsafeWriteDouble(double val); +} http://git-wip-us.apache.org/repos/asf/ignite/blob/1dbf20e0/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/PortableStream.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/PortableStream.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/PortableStream.java new file mode 100644 index 0000000..18d4609 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/PortableStream.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.binary.streams; + +/** + * Portable stream. + */ +public interface PortableStream { + /** + * @return Position. + */ + public int position(); + + /** + * @param pos Position. + */ + public void position(int pos); + + /** + * @return Underlying array. + */ + public byte[] array(); + + /** + * @return Copy of data in the stream. 
+ */ + public byte[] arrayCopy(); + + /** + * @return Offheap pointer if stream is offheap based, otherwise {@code 0}. + */ + public long offheapPointer(); + + /** + * @return {@code True} if stream is array based. + */ + public boolean hasArray(); +} http://git-wip-us.apache.org/repos/asf/ignite/blob/1dbf20e0/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/package-info.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/package-info.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/package-info.java new file mode 100644 index 0000000..2a6ad62 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * <!-- Package description. --> + * Contains portable APIs implementation for streams. 
+ */ +package org.apache.ignite.internal.binary.streams; http://git-wip-us.apache.org/repos/asf/ignite/blob/1dbf20e0/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java index b0ef3f4..3c7f378 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java @@ -31,8 +31,8 @@ import org.apache.ignite.internal.managers.deployment.GridDeploymentInfoBean; import org.apache.ignite.internal.managers.deployment.GridDeploymentRequest; import org.apache.ignite.internal.managers.deployment.GridDeploymentResponse; import org.apache.ignite.internal.managers.eventstorage.GridEventStorageMessage; -import org.apache.ignite.internal.portable.BinaryEnumObjectImpl; -import org.apache.ignite.internal.portable.BinaryObjectImpl; +import org.apache.ignite.internal.binary.BinaryEnumObjectImpl; +import org.apache.ignite.internal.binary.BinaryObjectImpl; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheEntryInfoCollection; import org.apache.ignite.internal.processors.cache.CacheEntryPredicateContainsValue;
