http://git-wip-us.apache.org/repos/asf/ignite/blob/2225f8d2/modules/platform/src/main/java/org/apache/ignite/internal/platform/memory/PlatformMemoryManagerImpl.java ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/platform/memory/PlatformMemoryManagerImpl.java b/modules/platform/src/main/java/org/apache/ignite/internal/platform/memory/PlatformMemoryManagerImpl.java deleted file mode 100644 index 53a3e19..0000000 --- a/modules/platform/src/main/java/org/apache/ignite/internal/platform/memory/PlatformMemoryManagerImpl.java +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.platform.memory; - -import org.apache.ignite.internal.platform.callback.*; -import org.jetbrains.annotations.*; - -import static org.apache.ignite.internal.platform.memory.PlatformMemoryUtils.*; - -/** - * Interop memory manager implementation. - */ -public class PlatformMemoryManagerImpl implements PlatformMemoryManager { - /** Native gateway. */ - private final PlatformCallbackGateway gate; - - /** Default allocation capacity. */ - private final int dfltCap; - - /** Thread-local pool. */ - private final ThreadLocal<PlatformMemoryPool> threadLocPool = new ThreadLocal<>(); - - /** - * Constructor. - * - * @param gate Native gateway. - * @param dfltCap Default memory chunk capacity. - */ - public PlatformMemoryManagerImpl(@Nullable PlatformCallbackGateway gate, int dfltCap) { - this.gate = gate; - this.dfltCap = dfltCap; - } - - /** {@inheritDoc} */ - @Override public PlatformMemory allocate() { - return allocate(dfltCap); - } - - /** {@inheritDoc} */ - @Override public PlatformMemory allocate(int cap) { - return pool().allocate(cap); - } - - /** {@inheritDoc} */ - @Override public PlatformMemory get(long memPtr) { - int flags = flags(memPtr); - - return isExternal(flags) ? new PlatformExternalMemory(gate, memPtr) : - isPooled(flags) ? pool().get(memPtr) : new PlatformUnpooledMemory(memPtr); - } - - /** - * Gets or creates thread-local memory pool. - * - * @return Memory pool. - */ - private PlatformMemoryPool pool() { - PlatformMemoryPool pool = threadLocPool.get(); - - if (pool == null) { - pool = new PlatformMemoryPool(); - - threadLocPool.set(pool); - } - - return pool; - } -}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2225f8d2/modules/platform/src/main/java/org/apache/ignite/internal/platform/memory/PlatformMemoryPool.java ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/platform/memory/PlatformMemoryPool.java b/modules/platform/src/main/java/org/apache/ignite/internal/platform/memory/PlatformMemoryPool.java deleted file mode 100644 index a012b5c..0000000 --- a/modules/platform/src/main/java/org/apache/ignite/internal/platform/memory/PlatformMemoryPool.java +++ /dev/null @@ -1,133 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.platform.memory; - -import static org.apache.ignite.internal.platform.memory.PlatformMemoryUtils.*; - -/** - * Memory pool associated with a thread. - */ -public class PlatformMemoryPool { - /** base pointer. */ - private final long poolPtr; - - /** First pooled memory chunk. */ - private PlatformPooledMemory mem1; - - /** Second pooled memory chunk. */ - private PlatformPooledMemory mem2; - - /** Third pooled memory chunk. */ - private PlatformPooledMemory mem3; - - /** - * Constructor. - */ - public PlatformMemoryPool() { - poolPtr = allocatePool(); - - sun.misc.Cleaner.create(this, new CleanerRunnable(poolPtr)); - } - - /** - * Allocate memory chunk, optionally pooling it. - * - * @param cap Minimum capacity. - * @return Memory chunk. - */ - public PlatformMemory allocate(int cap) { - long memPtr = allocatePooled(poolPtr, cap); - - // memPtr == 0 means that we failed to acquire thread-local memory chunk, so fallback to unpooled memory. - return memPtr != 0 ? get(memPtr) : new PlatformUnpooledMemory(allocateUnpooled(cap)); - } - - /** - * Re-allocate existing pool memory chunk. - * - * @param memPtr Memory pointer. - * @param cap Minimum capacity. - */ - void reallocate(long memPtr, int cap) { - reallocatePooled(memPtr, cap); - } - - /** - * Release pooled memory chunk. - * - * @param memPtr Memory pointer. - */ - void release(long memPtr) { - releasePooled(memPtr); - } - - /** - * Get pooled memory chunk. - * - * @param memPtr Memory pointer. - * @return Memory chunk. - */ - public PlatformMemory get(long memPtr) { - long delta = memPtr - poolPtr; - - if (delta == POOL_HDR_OFF_MEM_1) { - if (mem1 == null) - mem1 = new PlatformPooledMemory(this, memPtr); - - return mem1; - } - else if (delta == POOL_HDR_OFF_MEM_2) { - if (mem2 == null) - mem2 = new PlatformPooledMemory(this, memPtr); - - return mem2; - } - else { - assert delta == POOL_HDR_OFF_MEM_3; - - if (mem3 == null) - mem3 = new PlatformPooledMemory(this, memPtr); - - return mem3; - } - } - - /** - * Cleaner runnable. - */ - private static class CleanerRunnable implements Runnable { - /** Pointer. */ - private final long poolPtr; - - /** - * Constructor. - * - * @param poolPtr Pointer. - */ - private CleanerRunnable(long poolPtr) { - assert poolPtr != 0; - - this.poolPtr = poolPtr; - } - - /** {@inheritDoc} */ - @Override public void run() { - PlatformMemoryUtils.releasePool(poolPtr); - } - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/2225f8d2/modules/platform/src/main/java/org/apache/ignite/internal/platform/memory/PlatformMemoryUtils.java ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/platform/memory/PlatformMemoryUtils.java b/modules/platform/src/main/java/org/apache/ignite/internal/platform/memory/PlatformMemoryUtils.java deleted file mode 100644 index d820ca6..0000000 --- a/modules/platform/src/main/java/org/apache/ignite/internal/platform/memory/PlatformMemoryUtils.java +++ /dev/null @@ -1,468 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.platform.memory; - -import org.apache.ignite.internal.util.*; -import sun.misc.*; - -import java.nio.*; - -/** - * Utility classes for memory management. - */ -public class PlatformMemoryUtils { - /** Unsafe instance. */ - public static final Unsafe UNSAFE = GridUnsafe.unsafe(); - - /** Array offset: boolean. */ - public static final long BOOLEAN_ARR_OFF = UNSAFE.arrayBaseOffset(boolean[].class); - - /** Array offset: byte. */ - public static final long BYTE_ARR_OFF = UNSAFE.arrayBaseOffset(byte[].class); - - /** Array offset: short. */ - public static final long SHORT_ARR_OFF = UNSAFE.arrayBaseOffset(short[].class); - - /** Array offset: char. */ - public static final long CHAR_ARR_OFF = UNSAFE.arrayBaseOffset(char[].class); - - /** Array offset: int. */ - public static final long INT_ARR_OFF = UNSAFE.arrayBaseOffset(int[].class); - - /** Array offset: float. */ - public static final long FLOAT_ARR_OFF = UNSAFE.arrayBaseOffset(float[].class); - - /** Array offset: long. */ - public static final long LONG_ARR_OFF = UNSAFE.arrayBaseOffset(long[].class); - - /** Array offset: double. */ - public static final long DOUBLE_ARR_OFF = UNSAFE.arrayBaseOffset(double[].class); - - /** Whether little endian is used on the platform. */ - public static final boolean LITTLE_ENDIAN = ByteOrder.nativeOrder() == ByteOrder.LITTLE_ENDIAN; - - /** Header length. */ - public static final int POOL_HDR_LEN = 64; - - /** Pool header offset: first memory chunk. */ - public static final int POOL_HDR_OFF_MEM_1 = 0; - - /** Pool header offset: second memory chunk. */ - public static final int POOL_HDR_OFF_MEM_2 = 20; - - /** Pool header offset: third memory chunk. */ - public static final int POOL_HDR_OFF_MEM_3 = 40; - - /** Memory chunk header length. */ - public static final int MEM_HDR_LEN = 20; - - /** Offset: capacity. */ - public static final int MEM_HDR_OFF_CAP = 8; - - /** Offset: length. */ - public static final int MEM_HDR_OFF_LEN = 12; - - /** Offset: flags. */ - public static final int MEM_HDR_OFF_FLAGS = 16; - - /** Flag: external. */ - public static final int FLAG_EXT = 0x1; - - /** Flag: pooled. */ - public static final int FLAG_POOLED = 0x2; - - /** Flag: whether this pooled memory chunk is acquired. */ - public static final int FLAG_ACQUIRED = 0x4; - - /** --- COMMON METHODS. --- */ - - /** - * Gets data pointer for the given memory chunk. - * - * @param memPtr Memory pointer. - * @return Data pointer. - */ - public static long data(long memPtr) { - return UNSAFE.getLong(memPtr); - } - - /** - * Gets capacity for the given memory chunk. - * - * @param memPtr Memory pointer. - * @return Capacity. - */ - public static int capacity(long memPtr) { - return UNSAFE.getInt(memPtr + MEM_HDR_OFF_CAP); - } - - /** - * Sets capacity for the given memory chunk. - * - * @param memPtr Memory pointer. - * @param cap Capacity. - */ - public static void capacity(long memPtr, int cap) { - assert !isExternal(memPtr) : "Attempt to update external memory chunk capacity: " + memPtr; - - UNSAFE.putInt(memPtr + MEM_HDR_OFF_CAP, cap); - } - - /** - * Gets length for the given memory chunk. - * - * @param memPtr Memory pointer. - * @return Length. - */ - public static int length(long memPtr) { - return UNSAFE.getInt(memPtr + MEM_HDR_OFF_LEN); - } - - /** - * Sets length for the given memory chunk. - * - * @param memPtr Memory pointer. - * @param len Length. - */ - public static void length(long memPtr, int len) { - UNSAFE.putInt(memPtr + MEM_HDR_OFF_LEN, len); - } - - /** - * Gets flags for the given memory chunk. - * - * @param memPtr Memory pointer. - * @return Flags. - */ - public static int flags(long memPtr) { - return UNSAFE.getInt(memPtr + MEM_HDR_OFF_FLAGS); - } - - /** - * Sets flags for the given memory chunk. - * - * @param memPtr Memory pointer. - * @param flags Flags. - */ - public static void flags(long memPtr, int flags) { - assert !isExternal(memPtr) : "Attempt to update external memory chunk flags: " + memPtr; - - UNSAFE.putInt(memPtr + MEM_HDR_OFF_FLAGS, flags); - } - - /** - * Check whether this memory chunk is external. - * - * @param memPtr Memory pointer. - * @return {@code True} if owned by native platform. - */ - public static boolean isExternal(long memPtr) { - return isExternal(flags(memPtr)); - } - - /** - * Check whether flags denote that this memory chunk is external. - * - * @param flags Flags. - * @return {@code True} if owned by native platform. - */ - public static boolean isExternal(int flags) { - return (flags & FLAG_EXT) == FLAG_EXT; - } - - /** - * Check whether this memory chunk is pooled. - * - * @param memPtr Memory pointer. - * @return {@code True} if pooled. - */ - public static boolean isPooled(long memPtr) { - return isPooled(flags(memPtr)); - } - - /** - * Check whether flags denote pooled memory chunk. - * - * @param flags Flags. - * @return {@code True} if pooled. - */ - public static boolean isPooled(int flags) { - return (flags & FLAG_POOLED) != 0; - } - - /** - * Check whether this memory chunk is pooled and acquired. - * - * @param memPtr Memory pointer. - * @return {@code True} if pooled and acquired. - */ - public static boolean isAcquired(long memPtr) { - return isAcquired(flags(memPtr)); - } - - /** - * Check whether flags denote pooled and acquired memory chunk. - * - * @param flags Flags. - * @return {@code True} if acquired. - */ - public static boolean isAcquired(int flags) { - assert isPooled(flags); - - return (flags & FLAG_ACQUIRED) != 0; - } - - /** --- UNPOOLED MEMORY MANAGEMENT. --- */ - - /** - * Allocate unpooled memory chunk. - * - * @param cap Minimum capacity. - * @return New memory pointer. - */ - public static long allocateUnpooled(int cap) { - assert cap > 0; - - long memPtr = UNSAFE.allocateMemory(MEM_HDR_LEN); - long dataPtr = UNSAFE.allocateMemory(cap); - - UNSAFE.putLong(memPtr, dataPtr); // Write address. - UNSAFE.putInt(memPtr + MEM_HDR_OFF_CAP, cap); // Write capacity. - UNSAFE.putInt(memPtr + MEM_HDR_OFF_LEN, 0); // Write length. - UNSAFE.putInt(memPtr + MEM_HDR_OFF_FLAGS, 0); // Write flags. - - return memPtr; - } - - /** - * Reallocate unpooled memory chunk. - * - * @param memPtr Memory pointer. - * @param cap Minimum capacity. - */ - public static void reallocateUnpooled(long memPtr, int cap) { - assert cap > 0; - - assert !isExternal(memPtr) : "Attempt to reallocate external memory chunk directly: " + memPtr; - assert !isPooled(memPtr) : "Attempt to reallocate pooled memory chunk directly: " + memPtr; - - long dataPtr = data(memPtr); - - long newDataPtr = UNSAFE.reallocateMemory(dataPtr, cap); - - if (dataPtr != newDataPtr) - UNSAFE.putLong(memPtr, newDataPtr); // Write new data address if needed. - - UNSAFE.putInt(memPtr + MEM_HDR_OFF_CAP, cap); // Write new capacity. - } - - /** - * Release unpooled memory chunk. - * - * @param memPtr Memory pointer. - */ - public static void releaseUnpooled(long memPtr) { - assert !isExternal(memPtr) : "Attempt to release external memory chunk directly: " + memPtr; - assert !isPooled(memPtr) : "Attempt to release pooled memory chunk directly: " + memPtr; - - UNSAFE.freeMemory(data(memPtr)); - UNSAFE.freeMemory(memPtr); - } - - /** --- POOLED MEMORY MANAGEMENT. --- */ - - /** - * Allocate pool memory. - * - * @return Pool pointer. - */ - public static long allocatePool() { - long poolPtr = UNSAFE.allocateMemory(POOL_HDR_LEN); - - UNSAFE.setMemory(poolPtr, POOL_HDR_LEN, (byte)0); - - flags(poolPtr + POOL_HDR_OFF_MEM_1, FLAG_POOLED); - flags(poolPtr + POOL_HDR_OFF_MEM_2, FLAG_POOLED); - flags(poolPtr + POOL_HDR_OFF_MEM_3, FLAG_POOLED); - - return poolPtr; - } - - /** - * Release pool memory. - * - * @param poolPtr Pool pointer. - */ - public static void releasePool(long poolPtr) { - // Clean predefined memory chunks. - long mem = UNSAFE.getLong(poolPtr + POOL_HDR_OFF_MEM_1); - - if (mem != 0) - UNSAFE.freeMemory(mem); - - mem = UNSAFE.getLong(poolPtr + POOL_HDR_OFF_MEM_2); - - if (mem != 0) - UNSAFE.freeMemory(mem); - - mem = UNSAFE.getLong(poolPtr + POOL_HDR_OFF_MEM_3); - - if (mem != 0) - UNSAFE.freeMemory(mem); - - // Clean pool chunk. - UNSAFE.freeMemory(poolPtr); - } - - /** - * Allocate pooled memory chunk. - * - * @param poolPtr Pool pointer. - * @param cap Capacity. - * @return Cross-platform memory pointer or {@code 0} in case there are no free memory chunks in the pool. - */ - public static long allocatePooled(long poolPtr, int cap) { - long memPtr1 = poolPtr + POOL_HDR_OFF_MEM_1; - - if (isAcquired(memPtr1)) { - long memPtr2 = poolPtr + POOL_HDR_OFF_MEM_2; - - if (isAcquired(memPtr2)) { - long memPtr3 = poolPtr + POOL_HDR_OFF_MEM_3; - - if (isAcquired(memPtr3)) - return 0L; - else { - allocatePooled0(memPtr3, cap); - - return memPtr3; - } - } - else { - allocatePooled0(memPtr2, cap); - - return memPtr2; - } - } - else { - allocatePooled0(memPtr1, cap); - - return memPtr1; - } - } - - /** - * Internal pooled memory chunk allocation routine. - * - * @param memPtr Memory pointer. - * @param cap Capacity. - */ - private static void allocatePooled0(long memPtr, int cap) { - assert !isExternal(memPtr); - assert isPooled(memPtr); - assert !isAcquired(memPtr); - - long data = UNSAFE.getLong(memPtr); - - if (data == 0) { - // First allocation of the chunk. - data = UNSAFE.allocateMemory(cap); - - UNSAFE.putLong(memPtr, data); - UNSAFE.putInt(memPtr + MEM_HDR_OFF_CAP, cap); - } - else { - // Ensure that we have enough capacity. - int curCap = capacity(memPtr); - - if (cap > curCap) { - data = UNSAFE.reallocateMemory(data, cap); - - UNSAFE.putLong(memPtr, data); - UNSAFE.putInt(memPtr + MEM_HDR_OFF_CAP, cap); - } - } - - flags(memPtr, FLAG_POOLED | FLAG_ACQUIRED); - } - - /** - * Reallocate pooled memory chunk. - * - * @param memPtr Memory pointer. - * @param cap Minimum capacity. - */ - public static void reallocatePooled(long memPtr, int cap) { - assert !isExternal(memPtr); - assert isPooled(memPtr); - assert isAcquired(memPtr); - - long data = UNSAFE.getLong(memPtr); - - assert data != 0; - - int curCap = capacity(memPtr); - - if (cap > curCap) { - data = UNSAFE.reallocateMemory(data, cap); - - UNSAFE.putLong(memPtr, data); - UNSAFE.putInt(memPtr + MEM_HDR_OFF_CAP, cap); - } - } - - /** - * Release pooled memory chunk. - * - * @param memPtr Memory pointer. - */ - public static void releasePooled(long memPtr) { - assert !isExternal(memPtr); - assert isPooled(memPtr); - assert isAcquired(memPtr); - - flags(memPtr, flags(memPtr) ^ FLAG_ACQUIRED); - } - - /** --- UTILITY STUFF. --- */ - - /** - * Reallocate arbitrary memory chunk. - * - * @param memPtr Memory pointer. - * @param cap Capacity. - */ - public static void reallocate(long memPtr, int cap) { - int flags = flags(memPtr); - - if (isPooled(flags)) - reallocatePooled(memPtr, cap); - else { - assert !isExternal(flags); - - reallocateUnpooled(memPtr, cap); - } - } - - /** - * Constructor. - */ - private PlatformMemoryUtils() { - // No-op. - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/2225f8d2/modules/platform/src/main/java/org/apache/ignite/internal/platform/memory/PlatformOutputStream.java ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/platform/memory/PlatformOutputStream.java b/modules/platform/src/main/java/org/apache/ignite/internal/platform/memory/PlatformOutputStream.java deleted file mode 100644 index 89527ce..0000000 --- a/modules/platform/src/main/java/org/apache/ignite/internal/platform/memory/PlatformOutputStream.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.platform.memory; - -import org.apache.ignite.internal.portable.streams.*; - -/** - * Interop output stream. - */ -public interface PlatformOutputStream extends PortableOutputStream { - /** - * Synchronize output stream with underlying memory - */ - public void synchronize(); -} http://git-wip-us.apache.org/repos/asf/ignite/blob/2225f8d2/modules/platform/src/main/java/org/apache/ignite/internal/platform/memory/PlatformOutputStreamImpl.java ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/platform/memory/PlatformOutputStreamImpl.java b/modules/platform/src/main/java/org/apache/ignite/internal/platform/memory/PlatformOutputStreamImpl.java deleted file mode 100644 index 13492eb..0000000 --- a/modules/platform/src/main/java/org/apache/ignite/internal/platform/memory/PlatformOutputStreamImpl.java +++ /dev/null @@ -1,259 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.platform.memory; - -import static org.apache.ignite.internal.platform.memory.PlatformMemoryUtils.*; - -/** - * Interop output stream implementation. - */ -public class PlatformOutputStreamImpl implements PlatformOutputStream { - /** Underlying memory chunk. */ - protected final PlatformMemory mem; - - /** Pointer. */ - protected long data; - - /** Maximum capacity. */ - protected int cap; - - /** Current position. */ - protected int pos; - - /** - * Constructor. - * - * @param mem Underlying memory chunk. - */ - public PlatformOutputStreamImpl(PlatformMemory mem) { - this.mem = mem; - - data = mem.data(); - cap = mem.capacity(); - } - - /** {@inheritDoc} */ - @Override public void writeByte(byte val) { - ensureCapacity(pos + 1); - - UNSAFE.putByte(data + pos++, val); - } - - /** {@inheritDoc} */ - @Override public void writeByteArray(byte[] val) { - copyAndShift(val, BYTE_ARR_OFF, val.length); - } - - /** {@inheritDoc} */ - @Override public void writeBoolean(boolean val) { - writeByte(val ? (byte) 1 : (byte) 0); - } - - /** {@inheritDoc} */ - @Override public void writeBooleanArray(boolean[] val) { - copyAndShift(val, BOOLEAN_ARR_OFF, val.length); - } - - /** {@inheritDoc} */ - @Override public void writeShort(short val) { - ensureCapacity(pos + 2); - - UNSAFE.putShort(data + pos, val); - - shift(2); - } - - /** {@inheritDoc} */ - @Override public void writeShortArray(short[] val) { - copyAndShift(val, SHORT_ARR_OFF, val.length << 1); - } - - /** {@inheritDoc} */ - @Override public void writeChar(char val) { - ensureCapacity(pos + 2); - - UNSAFE.putChar(data + pos, val); - - shift(2); - } - - /** {@inheritDoc} */ - @Override public void writeCharArray(char[] val) { - copyAndShift(val, CHAR_ARR_OFF, val.length << 1); - } - - /** {@inheritDoc} */ - @Override public void writeInt(int val) { - ensureCapacity(pos + 4); - - UNSAFE.putInt(data + pos, val); - - shift(4); - } - - /** {@inheritDoc} */ - @Override public void writeIntArray(int[] val) { - copyAndShift(val, INT_ARR_OFF, val.length << 2); - } - - /** {@inheritDoc} */ - @Override public void writeInt(int pos, int val) { - ensureCapacity(pos + 4); - - UNSAFE.putInt(data + pos, val); - } - - /** {@inheritDoc} */ - @Override public void writeFloat(float val) { - writeInt(Float.floatToIntBits(val)); - } - - /** {@inheritDoc} */ - @Override public void writeFloatArray(float[] val) { - copyAndShift(val, FLOAT_ARR_OFF, val.length << 2); - } - - /** {@inheritDoc} */ - @Override public void writeLong(long val) { - ensureCapacity(pos + 8); - - UNSAFE.putLong(data + pos, val); - - shift(8); - } - - /** {@inheritDoc} */ - @Override public void writeLongArray(long[] val) { - copyAndShift(val, LONG_ARR_OFF, val.length << 3); - } - - /** {@inheritDoc} */ - @Override public void writeDouble(double val) { - writeLong(Double.doubleToLongBits(val)); - } - - /** {@inheritDoc} */ - @Override public void writeDoubleArray(double[] val) { - copyAndShift(val, DOUBLE_ARR_OFF, val.length << 3); - } - - /** {@inheritDoc} */ - @Override public void write(byte[] arr, int off, int len) { - copyAndShift(arr, BYTE_ARR_OFF + off, len); - } - - /** {@inheritDoc} */ - @Override public void write(long addr, int cnt) { - copyAndShift(null, addr, cnt); - } - - /** {@inheritDoc} */ - @Override public int position() { - return pos; - } - - /** {@inheritDoc} */ - @Override public void position(int pos) { - ensureCapacity(pos); - - this.pos = pos; - } - - /** {@inheritDoc} */ - @Override public void close() { - // No-op. - } - - /** {@inheritDoc} */ - @Override public byte[] array() { - assert false; - - throw new UnsupportedOperationException("Should not be called."); - } - - /** {@inheritDoc} */ - @Override public byte[] arrayCopy() { - assert false; - - throw new UnsupportedOperationException("Should not be called."); - } - - /** {@inheritDoc} */ - @Override public long offheapPointer() { - assert false; - - throw new UnsupportedOperationException("Should not be called."); - } - - /** {@inheritDoc} */ - @Override public boolean hasArray() { - assert false; - - throw new UnsupportedOperationException("Should not be called."); - } - - /** {@inheritDoc} */ - @Override public void synchronize() { - PlatformMemoryUtils.length(mem.pointer(), pos); - } - - /** - * Ensure capacity. - * - * @param reqCap Required byte count. - */ - protected void ensureCapacity(int reqCap) { - if (reqCap > cap) { - int newCap = cap << 1; - - if (newCap < reqCap) - newCap = reqCap; - - mem.reallocate(newCap); - - assert mem.capacity() >= newCap; - - data = mem.data(); - cap = newCap; - } - } - - /** - * Shift position. - * - * @param cnt Byte count. - */ - protected void shift(int cnt) { - pos += cnt; - } - - /** - * Copy source object to the stream shifting position afterwards. - * - * @param src Source. - * @param off Offset. - * @param len Length. - */ - private void copyAndShift(Object src, long off, int len) { - ensureCapacity(pos + len); - - UNSAFE.copyMemory(src, off, null, data + pos, len); - - shift(len); - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/2225f8d2/modules/platform/src/main/java/org/apache/ignite/internal/platform/memory/PlatformPooledMemory.java ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/platform/memory/PlatformPooledMemory.java b/modules/platform/src/main/java/org/apache/ignite/internal/platform/memory/PlatformPooledMemory.java deleted file mode 100644 index 5043fd1..0000000 --- a/modules/platform/src/main/java/org/apache/ignite/internal/platform/memory/PlatformPooledMemory.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.platform.memory; - -import static org.apache.ignite.internal.platform.memory.PlatformMemoryUtils.*; - -/** - * Interop pooled memory chunk. - */ -public class PlatformPooledMemory extends PlatformAbstractMemory { - /** Owning memory pool. */ - private final PlatformMemoryPool pool; - - /** - * Constructor. - * - * @param pool Owning memory pool. - * @param memPtr Cross-platform memory pointer. - */ - public PlatformPooledMemory(PlatformMemoryPool pool, long memPtr) { - super(memPtr); - - assert isPooled(memPtr); - assert isAcquired(memPtr); - - this.pool = pool; - } - - /** {@inheritDoc} */ - @Override public void reallocate(int cap) { - assert isAcquired(memPtr); - - // Try doubling capacity to avoid excessive allocations. - int doubledCap = PlatformMemoryUtils.capacity(memPtr) << 1; - - if (doubledCap > cap) - cap = doubledCap; - - pool.reallocate(memPtr, cap); - } - - /** {@inheritDoc} */ - @Override public void close() { - assert isAcquired(memPtr); - - pool.release(memPtr); // Return to the pool. - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/2225f8d2/modules/platform/src/main/java/org/apache/ignite/internal/platform/memory/PlatformUnpooledMemory.java ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/platform/memory/PlatformUnpooledMemory.java b/modules/platform/src/main/java/org/apache/ignite/internal/platform/memory/PlatformUnpooledMemory.java deleted file mode 100644 index f3fe227..0000000 --- a/modules/platform/src/main/java/org/apache/ignite/internal/platform/memory/PlatformUnpooledMemory.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.platform.memory; - -import static org.apache.ignite.internal.platform.memory.PlatformMemoryUtils.*; - -/** - * Interop un-pooled memory chunk. - */ -public class PlatformUnpooledMemory extends PlatformAbstractMemory { - /** - * Constructor. - * - * @param memPtr Cross-platform memory pointer. - */ - public PlatformUnpooledMemory(long memPtr) { - super(memPtr); - } - - /** {@inheritDoc} */ - @Override public void reallocate(int cap) { - // Try doubling capacity to avoid excessive allocations. - int doubledCap = PlatformMemoryUtils.capacity(memPtr) << 1; - - if (doubledCap > cap) - cap = doubledCap; - - reallocateUnpooled(memPtr, cap); - } - - /** {@inheritDoc} */ - @Override public void close() { - releaseUnpooled(memPtr); - } -} - http://git-wip-us.apache.org/repos/asf/ignite/blob/2225f8d2/modules/platform/src/main/java/org/apache/ignite/internal/platform/utils/PlatformReaderBiClosure.java ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/platform/utils/PlatformReaderBiClosure.java b/modules/platform/src/main/java/org/apache/ignite/internal/platform/utils/PlatformReaderBiClosure.java deleted file mode 100644 index 0280ba8..0000000 --- a/modules/platform/src/main/java/org/apache/ignite/internal/platform/utils/PlatformReaderBiClosure.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.platform.utils; - -import org.apache.ignite.internal.portable.*; -import org.apache.ignite.lang.*; - -/** - * Reader bi-closure. - */ -public interface PlatformReaderBiClosure<T1, T2> { - /** - * Read object from reader. - * - * @param reader Reader. - * @return Object. - */ - IgniteBiTuple<T1, T2> read(PortableRawReaderEx reader); -} http://git-wip-us.apache.org/repos/asf/ignite/blob/2225f8d2/modules/platform/src/main/java/org/apache/ignite/internal/platform/utils/PlatformReaderClosure.java ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/platform/utils/PlatformReaderClosure.java b/modules/platform/src/main/java/org/apache/ignite/internal/platform/utils/PlatformReaderClosure.java deleted file mode 100644 index 73a24d1..0000000 --- a/modules/platform/src/main/java/org/apache/ignite/internal/platform/utils/PlatformReaderClosure.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.platform.utils; - -import org.apache.ignite.internal.portable.*; - -/** - * Reader closure. - */ -public interface PlatformReaderClosure<T> { - - /** - * Read object from reader. - * - * @param reader Reader. - * @return Object. - */ - T read(PortableRawReaderEx reader); -} http://git-wip-us.apache.org/repos/asf/ignite/blob/2225f8d2/modules/platform/src/main/java/org/apache/ignite/internal/platform/utils/PlatformWriterBiClosure.java ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/platform/utils/PlatformWriterBiClosure.java b/modules/platform/src/main/java/org/apache/ignite/internal/platform/utils/PlatformWriterBiClosure.java deleted file mode 100644 index cbd34fa..0000000 --- a/modules/platform/src/main/java/org/apache/ignite/internal/platform/utils/PlatformWriterBiClosure.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.platform.utils; - -import org.apache.ignite.internal.portable.*; - -/** - * Interop writer bi-closure. - */ -public interface PlatformWriterBiClosure<T1, T2> { - /** - * Write values. - * - * @param writer Writer. - * @param val1 Value 1. - * @param val2 Value 2. - */ - public void write(PortableRawWriterEx writer, T1 val1, T2 val2); -} http://git-wip-us.apache.org/repos/asf/ignite/blob/2225f8d2/modules/platform/src/main/java/org/apache/ignite/internal/platform/utils/PlatformWriterClosure.java ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/platform/utils/PlatformWriterClosure.java b/modules/platform/src/main/java/org/apache/ignite/internal/platform/utils/PlatformWriterClosure.java deleted file mode 100644 index d9953ca..0000000 --- a/modules/platform/src/main/java/org/apache/ignite/internal/platform/utils/PlatformWriterClosure.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.platform.utils; - -import org.apache.ignite.internal.portable.*; - -/** - * Interop writer closure. - */ -public interface PlatformWriterClosure<T> { - /** - * Write value. - * - * @param writer Writer. - * @param val Value. - */ - public void write(PortableRawWriterEx writer, T val); -} http://git-wip-us.apache.org/repos/asf/ignite/blob/2225f8d2/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/PlatformBootstrap.java ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/PlatformBootstrap.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/PlatformBootstrap.java new file mode 100644 index 0000000..319c670 --- /dev/null +++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/PlatformBootstrap.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.platform; + +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.processors.platform.*; + +/** + * Platform bootstrap. Responsible for starting Ignite node with non-Java platform. + */ +public interface PlatformBootstrap { + /** + * Start Ignite node. + * + * @param cfg Configuration. + * @param envPtr Environment pointer. + * @param dataPtr Optional pointer to additional data required for startup. + * @return Platform processor. + */ + public PlatformProcessor start(IgniteConfiguration cfg, long envPtr, long dataPtr); +} http://git-wip-us.apache.org/repos/asf/ignite/blob/2225f8d2/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/PlatformBootstrapFactory.java ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/PlatformBootstrapFactory.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/PlatformBootstrapFactory.java new file mode 100644 index 0000000..f5b3adf --- /dev/null +++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/PlatformBootstrapFactory.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.platform; + +/** + * Platform bootstrap factory. + */ +public interface PlatformBootstrapFactory { + /** + * Get bootstrap factory ID. + * + * @return ID. + */ + public int id(); + + /** + * Create bootstrap instance. + * + * @return Bootstrap instance. + */ + public PlatformBootstrap create(); +} http://git-wip-us.apache.org/repos/asf/ignite/blob/2225f8d2/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/PlatformException.java ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/PlatformException.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/PlatformException.java new file mode 100644 index 0000000..479d533 --- /dev/null +++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/PlatformException.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.platform; + +import org.apache.ignite.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.jetbrains.annotations.*; + +/** + * Interop checked exception. + */ +public class PlatformException extends IgniteCheckedException { + /** */ + private static final long serialVersionUID = 0L; + + /** + * Create empty exception. + */ + public PlatformException() { + // No-op. + } + + /** + * Creates new exception with given error message. + * + * @param msg Error message. + */ + public PlatformException(String msg) { + super(msg); + } + + /** + * Creates new grid exception with given throwable as a cause and + * source of error message. + * + * @param cause Non-null throwable cause. + */ + public PlatformException(Throwable cause) { + this(cause.getMessage(), cause); + } + + /** + * Creates new exception with given error message and optional nested exception. + * + * @param msg Error message. + * @param cause Optional nested exception (can be {@code null}). + */ + public PlatformException(String msg, @Nullable Throwable cause) { + super(msg, cause); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(PlatformException.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/2225f8d2/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/PlatformIgnition.java ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/PlatformIgnition.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/PlatformIgnition.java new file mode 100644 index 0000000..93cf8b4 --- /dev/null +++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/PlatformIgnition.java @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.platform; + +import org.apache.ignite.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.resource.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.lang.*; +import org.jetbrains.annotations.*; + +import java.net.*; +import java.security.*; +import java.util.*; + +/** + * Entry point for platform nodes. + */ +@SuppressWarnings("UnusedDeclaration") +public class PlatformIgnition { + /** Map with active instances. */ + private static final HashMap<String, PlatformProcessor> instances = new HashMap<>(); + + /** + * Start Ignite node in platform mode. + * + * @param springCfgPath Spring configuration path. + * @param gridName Grid name. + * @param factoryId Factory ID. + * @param envPtr Environment pointer. + * @param dataPtr Optional pointer to additional data required for startup. + * @return Ignite instance. + */ + public static synchronized PlatformProcessor start(@Nullable String springCfgPath, @Nullable String gridName, + int factoryId, long envPtr, long dataPtr) { + if (envPtr <= 0) + throw new IgniteException("Environment pointer must be positive."); + + ClassLoader oldClsLdr = Thread.currentThread().getContextClassLoader(); + + Thread.currentThread().setContextClassLoader(PlatformProcessor.class.getClassLoader()); + + try { + IgniteConfiguration cfg = configuration(springCfgPath); + + if (gridName != null) + cfg.setGridName(gridName); + else + gridName = cfg.getGridName(); + + PlatformBootstrap bootstrap = bootstrap(factoryId); + + PlatformProcessor proc = bootstrap.start(cfg, envPtr, dataPtr); + + PlatformProcessor old = instances.put(gridName, proc); + + assert old == null; + + return proc; + } + finally { + Thread.currentThread().setContextClassLoader(oldClsLdr); + } + } + + /** + * Get instance by environment pointer. + * + * @param gridName Grid name. + * @return Instance or {@code null} if it doesn't exist (never started or stopped). + */ + @Nullable public static synchronized PlatformProcessor instance(@Nullable String gridName) { + return instances.get(gridName); + } + + /** + * Get environment pointer of the given instance. + * + * @param gridName Grid name. + * @return Environment pointer or {@code 0} in case grid with such name doesn't exist. + */ + public static synchronized long environmentPointer(@Nullable String gridName) { + PlatformProcessor proc = instance(gridName); + + return proc != null ? proc.environmentPointer() : 0; + } + + /** + * Stop single instance. + * + * @param gridName Grid name, + * @param cancel Cancel flag. + * @return {@code True} if instance was found and stopped. + */ + public static synchronized boolean stop(@Nullable String gridName, boolean cancel) { + if (Ignition.stop(gridName, cancel)) { + PlatformProcessor old = instances.remove(gridName); + + assert old != null; + + return true; + } + else + return false; + } + + /** + * Stop all instances. + * + * @param cancel Cancel flag. + */ + public static synchronized void stopAll(boolean cancel) { + for (PlatformProcessor proc : instances.values()) + Ignition.stop(proc.ignite().name(), cancel); + + instances.clear(); + } + + /** + * Create configuration. + * + * @param springCfgPath Path to Spring XML. + * @return Configuration. + */ + private static IgniteConfiguration configuration(@Nullable String springCfgPath) { + try { + URL url = springCfgPath == null ? U.resolveIgniteUrl(IgnitionEx.DFLT_CFG) : + U.resolveSpringUrl(springCfgPath); + + IgniteBiTuple<IgniteConfiguration, GridSpringResourceContext> t = IgnitionEx.loadConfiguration(url); + + return t.get1(); + } + catch (IgniteCheckedException e) { + throw new IgniteException("Failed to instantiate configuration from Spring XML: " + springCfgPath, e); + } + } + + /** + * Create bootstrap for the given factory ID. + * + * @param factoryId Factory ID. + * @return Bootstrap. + */ + private static PlatformBootstrap bootstrap(final int factoryId) { + PlatformBootstrapFactory factory = AccessController.doPrivileged( + new PrivilegedAction<PlatformBootstrapFactory>() { + @Override public PlatformBootstrapFactory run() { + for (PlatformBootstrapFactory factory : ServiceLoader.load(PlatformBootstrapFactory.class)) { + if (factory.id() == factoryId) + return factory; + } + + return null; + } + }); + + if (factory == null) + throw new IgniteException("Interop factory is not found (did you put into the classpath?): " + factoryId); + + return factory.create(); + } + + /** + * Private constructor. + */ + private PlatformIgnition() { + // No-op. + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/2225f8d2/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/PlatformNoCallbackException.java ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/PlatformNoCallbackException.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/PlatformNoCallbackException.java new file mode 100644 index 0000000..ad61719 --- /dev/null +++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/PlatformNoCallbackException.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.platform; + +import org.apache.ignite.internal.util.typedef.internal.*; + +/** + * Exception raised when interop callback is not set in native platform. + */ +@SuppressWarnings("UnusedDeclaration") +public class PlatformNoCallbackException extends PlatformException { + /** */ + private static final long serialVersionUID = 0L; + + /** + * Constructor. + */ + public PlatformNoCallbackException() { + // No-op. + } + + /** + * Constructor. + * + * @param msg Message. + */ + public PlatformNoCallbackException(String msg) { + super(msg); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(PlatformNoCallbackException.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/2225f8d2/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackGateway.java ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackGateway.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackGateway.java new file mode 100644 index 0000000..a8e7879 --- /dev/null +++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackGateway.java @@ -0,0 +1,869 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.platform.callback; + +import org.apache.ignite.*; +import org.apache.ignite.internal.util.*; + +/** + * Gateway to all platform-dependent callbacks. Implementers might extend this class and provide additional callbacks. + */ +@SuppressWarnings({"FieldCanBeLocal", "UnusedDeclaration"}) +public class PlatformCallbackGateway { + /** Environment pointer. */ + protected final long envPtr; + + /** Lock. */ + private final GridSpinReadWriteLock lock = new GridSpinReadWriteLock(); + + /** + * Native gateway. + * + * @param envPtr Environment pointer. + */ + public PlatformCallbackGateway(long envPtr) { + this.envPtr = envPtr; + } + + /** + * Get environment pointer. + * + * @return Environment pointer. + */ + public long environmentPointer() { + return envPtr; + } + + /** + * Create cache store. + * + * @param memPtr Memory pointer. + * @return Pointer. + */ + public long cacheStoreCreate(long memPtr) { + enter(); + + try { + return PlatformCallbackUtils.cacheStoreCreate(envPtr, memPtr); + } + finally { + leave(); + } + } + + /** + * @param objPtr Object pointer. + * @param memPtr Memory pointer. + * @param cb Callback. + * @return Result. + */ + public int cacheStoreInvoke(long objPtr, long memPtr, Object cb) { + enter(); + + try { + return PlatformCallbackUtils.cacheStoreInvoke(envPtr, objPtr, memPtr, cb); + } + finally { + leave(); + } + } + + /** + * @param objPtr Object pointer. + */ + public void cacheStoreDestroy(long objPtr) { + enter(); + + try { + PlatformCallbackUtils.cacheStoreDestroy(envPtr, objPtr); + } + finally { + leave(); + } + } + + /** + * Creates cache store session. + * + * @param storePtr Store instance pointer. + * @return Session instance pointer. + */ + public long cacheStoreSessionCreate(long storePtr) { + enter(); + + try { + return PlatformCallbackUtils.cacheStoreSessionCreate(envPtr, storePtr); + } + finally { + leave(); + } + } + + /** + * Creates cache entry filter and returns a pointer. + * + * @param memPtr Memory pointer. + * @return Pointer. + */ + public long cacheEntryFilterCreate(long memPtr) { + enter(); + + try { + return PlatformCallbackUtils.cacheEntryFilterCreate(envPtr, memPtr); + } + finally { + leave(); + } + } + + /** + * @param ptr Pointer. + * @param memPtr Memory pointer. + * @return Result. + */ + public int cacheEntryFilterApply(long ptr, long memPtr) { + enter(); + + try { + return PlatformCallbackUtils.cacheEntryFilterApply(envPtr, ptr, memPtr); + } + finally { + leave(); + } + } + + /** + * @param ptr Pointer. + */ + public void cacheEntryFilterDestroy(long ptr) { + enter(); + + try { + PlatformCallbackUtils.cacheEntryFilterDestroy(envPtr, ptr); + } + finally { + leave(); + } + } + + /** + * Invoke cache entry processor. + * + * @param outMemPtr Output memory pointer. + * @param inMemPtr Input memory pointer. + */ + public void cacheInvoke(long outMemPtr, long inMemPtr) { + enter(); + + try { + PlatformCallbackUtils.cacheInvoke(envPtr, outMemPtr, inMemPtr); + } + finally { + leave(); + } + } + + /** + * Perform native task map. Do not throw exceptions, serializing them to the output stream instead. + * + * @param taskPtr Task pointer. + * @param outMemPtr Output memory pointer (exists if topology changed, otherwise {@code 0}). + * @param inMemPtr Input memory pointer. + */ + public void computeTaskMap(long taskPtr, long outMemPtr, long inMemPtr) { + enter(); + + try { + PlatformCallbackUtils.computeTaskMap(envPtr, taskPtr, outMemPtr, inMemPtr); + } + finally { + leave(); + } + } + + /** + * Perform native task job result notification. + * + * @param taskPtr Task pointer. + * @param jobPtr Job pointer. + * @param memPtr Memory pointer (always zero for local job execution). + * @return Job result enum ordinal. + */ + public int computeTaskJobResult(long taskPtr, long jobPtr, long memPtr) { + enter(); + + try { + return PlatformCallbackUtils.computeTaskJobResult(envPtr, taskPtr, jobPtr, memPtr); + } + finally { + leave(); + } + } + + /** + * Perform native task reduce. + * + * @param taskPtr Task pointer. + */ + public void computeTaskReduce(long taskPtr) { + enter(); + + try { + PlatformCallbackUtils.computeTaskReduce(envPtr, taskPtr); + } + finally { + leave(); + } + } + + /** + * Complete task with native error. + * + * @param taskPtr Task pointer. + * @param memPtr Memory pointer with exception data or {@code 0} in case of success. + */ + public void computeTaskComplete(long taskPtr, long memPtr) { + enter(); + + try { + PlatformCallbackUtils.computeTaskComplete(envPtr, taskPtr, memPtr); + } + finally { + leave(); + } + } + + /** + * Serialize native job. + * + * @param jobPtr Job pointer. + * @param memPtr Memory pointer. + * @return {@code True} if serialization succeeded. + */ + public int computeJobSerialize(long jobPtr, long memPtr) { + enter(); + + try { + return PlatformCallbackUtils.computeJobSerialize(envPtr, jobPtr, memPtr); + } + finally { + leave(); + } + } + + /** + * Create job in native platform. + * + * @param memPtr Memory pointer. + * @return Pointer to job. + */ + public long computeJobCreate(long memPtr) { + enter(); + + try { + return PlatformCallbackUtils.computeJobCreate(envPtr, memPtr); + } + finally { + leave(); + } + } + + /** + * Execute native job on a node other than where it was created. + * + * @param jobPtr Job pointer. + * @param cancel Cancel flag. + * @param memPtr Memory pointer to write result to for remote job execution or {@code 0} for local job execution. + */ + public void computeJobExecute(long jobPtr, int cancel, long memPtr) { + enter(); + + try { + PlatformCallbackUtils.computeJobExecute(envPtr, jobPtr, cancel, memPtr); + } + finally { + leave(); + } + } + + /** + * Cancel the job. + * + * @param jobPtr Job pointer. + */ + public void computeJobCancel(long jobPtr) { + enter(); + + try { + PlatformCallbackUtils.computeJobCancel(envPtr, jobPtr); + } + finally { + leave(); + } + } + + /** + * Destroy the job. + * + * @param ptr Pointer. + */ + public void computeJobDestroy(long ptr) { + enter(); + + try { + PlatformCallbackUtils.computeJobDestroy(envPtr, ptr); + } + finally { + leave(); + } + } + + /** + * Invoke local callback. + * + * @param cbPtr Callback pointer. + * @param memPtr Memory pointer. + */ + public void continuousQueryListenerApply(long cbPtr, long memPtr) { + enter(); + + try { + PlatformCallbackUtils.continuousQueryListenerApply(envPtr, cbPtr, memPtr); + } + finally { + leave(); + } + } + + /** + * Create filter in native platform. + * + * @param memPtr Memory pointer. + * @return Pointer to created filter. + */ + public long continuousQueryFilterCreate(long memPtr) { + enter(); + + try { + return PlatformCallbackUtils.continuousQueryFilterCreate(envPtr, memPtr); + } + finally { + leave(); + } + } + + /** + * Invoke remote filter. + * + * @param filterPtr Filter pointer. + * @param memPtr Memory pointer. + * @return Result. + */ + public int continuousQueryFilterApply(long filterPtr, long memPtr) { + enter(); + + try { + return PlatformCallbackUtils.continuousQueryFilterApply(envPtr, filterPtr, memPtr); + } + finally { + leave(); + } + } + + /** + * Release remote filter. + * + * @param filterPtr Filter pointer. + */ + public void continuousQueryFilterRelease(long filterPtr) { + enter(); + + try { + PlatformCallbackUtils.continuousQueryFilterRelease(envPtr, filterPtr); + } + finally { + leave(); + } + } + + /** + * Notify native data streamer about topology update. + * + * @param ptr Data streamer native pointer. + * @param topVer Topology version. + * @param topSize Topology size. + */ + public void dataStreamerTopologyUpdate(long ptr, long topVer, int topSize) { + enter(); + + try { + PlatformCallbackUtils.dataStreamerTopologyUpdate(envPtr, ptr, topVer, topSize); + } + finally { + leave(); + } + } + + /** + * Invoke stream receiver. + * + * @param ptr Receiver native pointer. + * @param cache Cache object. + * @param memPtr Stream pointer. + * @param keepPortable Portable flag. + */ + public void dataStreamerStreamReceiverInvoke(long ptr, Object cache, long memPtr, boolean keepPortable) { + enter(); + + try { + PlatformCallbackUtils.dataStreamerStreamReceiverInvoke(envPtr, ptr, cache, memPtr, keepPortable); + } + finally { + leave(); + } + } + + /** + * Notify future with byte result. + * + * @param futPtr Future pointer. + * @param res Result. + */ + public void futureByteResult(long futPtr, int res) { + enter(); + + try { + PlatformCallbackUtils.futureByteResult(envPtr, futPtr, res); + } + finally { + leave(); + } + } + + /** + * Notify future with boolean result. + * + * @param futPtr Future pointer. + * @param res Result. + */ + public void futureBoolResult(long futPtr, int res) { + enter(); + + try { + PlatformCallbackUtils.futureBoolResult(envPtr, futPtr, res); + } + finally { + leave(); + } + } + + /** + * Notify future with short result. + * + * @param futPtr Future pointer. + * @param res Result. + */ + public void futureShortResult(long futPtr, int res) { + enter(); + + try { + PlatformCallbackUtils.futureShortResult(envPtr, futPtr, res); + } + finally { + leave(); + } + } + + /** + * Notify future with byte result. + * + * @param futPtr Future pointer. + * @param res Result. + */ + public void futureCharResult(long futPtr, int res) { + enter(); + + try { + PlatformCallbackUtils.futureCharResult(envPtr, futPtr, res); + } + finally { + leave(); + } + } + + /** + * Notify future with int result. + * + * @param futPtr Future pointer. + * @param res Result. + */ + public void futureIntResult(long futPtr, int res) { + enter(); + + try { + PlatformCallbackUtils.futureIntResult(envPtr, futPtr, res); + } + finally { + leave(); + } + } + + /** + * Notify future with float result. + * + * @param futPtr Future pointer. + * @param res Result. + */ + public void futureFloatResult(long futPtr, float res) { + enter(); + + try { + PlatformCallbackUtils.futureFloatResult(envPtr, futPtr, res); + } + finally { + leave(); + } + } + + /** + * Notify future with long result. + * + * @param futPtr Future pointer. + * @param res Result. + */ + public void futureLongResult(long futPtr, long res) { + enter(); + + try { + PlatformCallbackUtils.futureLongResult(envPtr, futPtr, res); + } + finally { + leave(); + } + } + + /** + * Notify future with double result. + * + * @param futPtr Future pointer. + * @param res Result. + */ + public void futureDoubleResult(long futPtr, double res) { + enter(); + + try { + PlatformCallbackUtils.futureDoubleResult(envPtr, futPtr, res); + } + finally { + leave(); + } + } + + /** + * Notify future with object result. + * + * @param futPtr Future pointer. + * @param memPtr Memory pointer. + */ + public void futureObjectResult(long futPtr, long memPtr) { + enter(); + + try { + PlatformCallbackUtils.futureObjectResult(envPtr, futPtr, memPtr); + } + finally { + leave(); + } + } + + /** + * Notify future with null result. + * + * @param futPtr Future pointer. + */ + public void futureNullResult(long futPtr) { + enter(); + + try { + PlatformCallbackUtils.futureNullResult(envPtr, futPtr); + } + finally { + leave(); + } + } + + /** + * Notify future with error. + * + * @param futPtr Future pointer. + * @param memPtr Pointer to memory with error information. + */ + public void futureError(long futPtr, long memPtr) { + enter(); + + try { + PlatformCallbackUtils.futureError(envPtr, futPtr, memPtr); + } + finally { + leave(); + } + } + + /** + * Creates message filter and returns a pointer. + * + * @param memPtr Memory pointer. + * @return Pointer. + */ + public long messagingFilterCreate(long memPtr) { + enter(); + + try { + return PlatformCallbackUtils.messagingFilterCreate(envPtr, memPtr); + } + finally { + leave(); + } + } + + /** + * @param ptr Pointer. + * @param memPtr Memory pointer. + * @return Result. + */ + public int messagingFilterApply(long ptr, long memPtr) { + enter(); + + try { + return PlatformCallbackUtils.messagingFilterApply(envPtr, ptr, memPtr); + } + finally { + leave(); + }} + + /** + * @param ptr Pointer. + */ + public void messagingFilterDestroy(long ptr) { + enter(); + + try { + PlatformCallbackUtils.messagingFilterDestroy(envPtr, ptr); + } + finally { + leave(); + } + } + + /** + * Creates event filter and returns a pointer. + * + * @param memPtr Memory pointer. + * @return Pointer. + */ + public long eventFilterCreate(long memPtr) { + enter(); + + try { + return PlatformCallbackUtils.eventFilterCreate(envPtr, memPtr); + } + finally { + leave(); + } + } + + /** + * @param ptr Pointer. + * @param memPtr Memory pointer. + * @return Result. + */ + public int eventFilterApply(long ptr, long memPtr) { + enter(); + + try { + return PlatformCallbackUtils.eventFilterApply(envPtr, ptr, memPtr); + } + finally { + leave(); + } + } + + /** + * @param ptr Pointer. + */ + public void eventFilterDestroy(long ptr) { + enter(); + + try { + PlatformCallbackUtils.eventFilterDestroy(envPtr, ptr); + } + finally { + leave(); + } + } + + /** + * Sends node info to native target. + * + * @param memPtr Ptr to a stream with serialized node. + */ + public void nodeInfo(long memPtr) { + enter(); + + try { + PlatformCallbackUtils.nodeInfo(envPtr, memPtr); + } + finally { + leave(); + } + } + + /** + * Kernal start callback. + * + * @param memPtr Memory pointer. + */ + public void onStart(long memPtr) { + enter(); + + try { + PlatformCallbackUtils.onStart(envPtr, memPtr); + } + finally { + leave(); + } + } + + /** + * Lifecycle event callback. + * + * @param ptr Holder pointer. + * @param evt Event. + */ + public void lifecycleEvent(long ptr, int evt) { + enter(); + + try { + PlatformCallbackUtils.lifecycleEvent(envPtr, ptr, evt); + } + finally { + leave(); + } + } + + /** + * Re-allocate external memory chunk. + * + * @param memPtr Cross-platform pointer. + * @param cap Capacity. + */ + public void memoryReallocate(long memPtr, int cap) { + enter(); + + try { + PlatformCallbackUtils.memoryReallocate(envPtr, memPtr, cap); + } + finally { + leave(); + } + } + + /** + * Initializes native service. + * + * @param memPtr Pointer. + * @throws org.apache.ignite.IgniteCheckedException In case of error. + */ + public long serviceInit(long memPtr) throws IgniteCheckedException { + return PlatformCallbackUtils.serviceInit(envPtr, memPtr); + } + + /** + * Executes native service. + * + * @param svcPtr Pointer to the service in the native platform. + * @param memPtr Stream pointer. + * @throws org.apache.ignite.IgniteCheckedException In case of error. + */ + public void serviceExecute(long svcPtr, long memPtr) throws IgniteCheckedException { + PlatformCallbackUtils.serviceExecute(envPtr, svcPtr, memPtr); + } + + /** + * Cancels native service. + * + * @param svcPtr Pointer to the service in the native platform. + * @param memPtr Stream pointer. + * @throws org.apache.ignite.IgniteCheckedException In case of error. + */ + public void serviceCancel(long svcPtr, long memPtr) throws IgniteCheckedException { + PlatformCallbackUtils.serviceCancel(envPtr, svcPtr, memPtr); + } + + /** + * Invokes service method. + * + * @param svcPtr Pointer to the service in the native platform. + * @param outMemPtr Output memory pointer. + * @param inMemPtr Input memory pointer. + * @throws org.apache.ignite.IgniteCheckedException In case of error. + */ + public void serviceInvokeMethod(long svcPtr, long outMemPtr, long inMemPtr) throws IgniteCheckedException { + PlatformCallbackUtils.serviceInvokeMethod(envPtr, svcPtr, outMemPtr, inMemPtr); + } + + /** + * Invokes cluster node filter. + * + * @param memPtr Stream pointer. + */ + public int clusterNodeFilterApply(long memPtr) { + return PlatformCallbackUtils.clusterNodeFilterApply(envPtr, memPtr); + } + + /** + * Kernal stop callback. + */ + public void onStop() { + block(); + + PlatformCallbackUtils.onStop(envPtr); + } + + /** + * Enter gateway. + */ + protected void enter() { + if (!lock.tryReadLock()) + throw new IgniteException("Failed to execute native callback because grid is stopping."); + } + + /** + * Leave gateway. + */ + protected void leave() { + lock.readUnlock(); + } + + /** + * Block gateway. + */ + protected void block() { + lock.writeLock(); + } +}