abdullah alamoudi has uploaded a new change for review.
https://asterix-gerrit.ics.uci.edu/866
Change subject: Add Unit Tests for Feed Runtime Input Handler
......................................................................
Add Unit Tests for Feed Runtime Input Handler
Change-Id: I7088f489a7d53dee8cf6cdbf5baa7cd8d3884f55
---
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameAction.java
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameSpiller.java
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/ConcurrentFramePool.java
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/FeedRuntimeId.java
R
asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/ConcurrentFramePoolUnitTest.java
A
asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/InputHandlerTest.java
M hyracks-fullstack/hyracks/hyracks-api/pom.xml
A
hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/CountAndThrowError.java
A
hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/CountAndThrowException.java
A
hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/CountAnswer.java
A
hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/FrameWriterTestUtils.java
A
hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/TestControlledFrameWriter.java
A
hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/TestFrameWriter.java
M
hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/test/java/org/apache/hyracks/storage/am/btree/test/FramewriterTest.java
15 files changed, 1,768 insertions(+), 110 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/66/866/1
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java
index 3920a03..63c1f01 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java
@@ -48,8 +48,9 @@
public class FeedRuntimeInputHandler extends
AbstractUnaryInputUnaryOutputOperatorNodePushable {
private static final Logger LOGGER =
Logger.getLogger(FeedRuntimeInputHandler.class.getName());
- private static final ByteBuffer POISON_PILL = ByteBuffer.allocate(0);
private static final double MAX_SPILL_USED_BEFORE_RESUME = 0.8;
+ private static final boolean DEBUG = false;
+ private final Object mutex = new Object();
private final FeedExceptionHandler exceptionHandler;
private final FrameSpiller spiller;
private final FeedPolicyAccessor fpa;
@@ -58,15 +59,16 @@
private final FrameTransporter consumer;
private final Thread consumerThread;
private final LinkedBlockingDeque<ByteBuffer> inbox;
- private final ConcurrentFramePool memoryManager;
+ private final ConcurrentFramePool framePool;
private Mode mode = Mode.PROCESS;
+ private int total = 0;
private int numDiscarded = 0;
private int numSpilled = 0;
private int numProcessedInMemory = 0;
private int numStalled = 0;
public FeedRuntimeInputHandler(IHyracksTaskContext ctx, FeedConnectionId
connectionId, FeedRuntimeId runtimeId,
- IFrameWriter writer, FeedPolicyAccessor fpa, FrameTupleAccessor
fta, ConcurrentFramePool feedMemoryManager)
+ IFrameWriter writer, FeedPolicyAccessor fpa, FrameTupleAccessor
fta, ConcurrentFramePool framePool)
throws HyracksDataException {
this.writer = writer;
this.spiller =
@@ -76,13 +78,17 @@
fpa.getMaxSpillOnDisk());
this.exceptionHandler = new FeedExceptionHandler(ctx, fta);
this.fpa = fpa;
- this.memoryManager = feedMemoryManager;
+ this.framePool = framePool;
this.inbox = new LinkedBlockingDeque<>();
this.consumer = new FrameTransporter();
- this.consumerThread = new Thread();
+ this.consumerThread = new Thread(consumer);
this.consumerThread.start();
this.initialFrameSize = ctx.getInitialFrameSize();
- this.frameAction = new FrameAction(inbox);
+ this.frameAction = new FrameAction();
+ }
+
+ public LinkedBlockingDeque<ByteBuffer> getInternalBuffer() {
+ return inbox;
}
@Override
@@ -101,15 +107,20 @@
@Override
public void close() throws HyracksDataException {
- inbox.add(POISON_PILL);
- notify();
+ consumer.die();
+ synchronized (mutex) {
+ if (DEBUG) {
+ System.err.println("Producer is waking up consumer");
+ }
+ mutex.notify();
+ }
try {
consumerThread.join();
} catch (InterruptedException e) {
LOGGER.log(Level.WARNING, e.getMessage(), e);
}
try {
- memoryManager.release(inbox);
+ framePool.release(inbox);
} catch (Throwable th) {
LOGGER.log(Level.WARNING, th.getMessage(), th);
}
@@ -124,8 +135,12 @@
@Override
public void nextFrame(ByteBuffer frame) throws HyracksDataException {
try {
+ total++;
if (consumer.cause() != null) {
throw consumer.cause();
+ }
+ if (DEBUG) {
+ System.err.println("nextFrame() called. inputHandler is in
mode: " + mode.toString());
}
switch (mode) {
case PROCESS:
@@ -148,25 +163,42 @@
}
}
+ // For unit testing purposes
+ public int framesOnDisk() {
+ return spiller.remaining();
+ }
+
private ByteBuffer getFreeBuffer(int frameSize) throws
HyracksDataException {
int numFrames = frameSize / initialFrameSize;
if (numFrames == 1) {
- return memoryManager.get();
+ return framePool.get();
} else {
- return memoryManager.get(frameSize);
+ return framePool.get(frameSize);
}
}
private void discard(ByteBuffer frame) throws HyracksDataException {
+ if (DEBUG) {
+ System.err.println("starting discard(frame)");
+ }
if (fpa.spillToDiskOnCongestion()) {
+ if (DEBUG) {
+ System.err.println("Spilling to disk is enabled. Will try
that");
+ }
if (spiller.spill(frame)) {
numSpilled++;
mode = Mode.SPILL;
return;
}
} else {
+ if (DEBUG) {
+ System.err.println("Spilling to disk is disabled. Will try to
get a buffer");
+ }
ByteBuffer next = getFreeBuffer(frame.capacity());
if (next != null) {
+ if (DEBUG) {
+ System.err.println("Was able to get a buffer");
+ }
numProcessedInMemory++;
next.put(frame);
inbox.offer(next);
@@ -174,54 +206,93 @@
return;
}
}
- numDiscarded++;
+ if (((numDiscarded + 1.0) / total) > fpa.getMaxFractionDiscard()) {
+ if (DEBUG) {
+ System.err.println("in discard(frame). Discard allowance has
been consumed. --> Stalling");
+ }
+ stall(frame);
+ } else {
+ if (DEBUG) {
+ System.err.println("in discard(frame). So far, I have
discarded " + numDiscarded);
+ }
+ numDiscarded++;
+ }
}
- private synchronized void exitProcessState(ByteBuffer frame) throws
HyracksDataException {
+ private void exitProcessState(ByteBuffer frame) throws
HyracksDataException {
if (fpa.spillToDiskOnCongestion()) {
mode = Mode.SPILL;
spiller.open();
spill(frame);
} else {
+ if (DEBUG) {
+ System.err.println("Spilling is disabled -->
discardOrStall(frame)");
+ }
discardOrStall(frame);
}
}
private void discardOrStall(ByteBuffer frame) throws HyracksDataException {
if (fpa.discardOnCongestion()) {
- numDiscarded++;
mode = Mode.DISCARD;
discard(frame);
} else {
+ if (DEBUG) {
+ System.err.println("Discard is disabled --> stall(frame)");
+ }
stall(frame);
}
}
private void stall(ByteBuffer frame) throws HyracksDataException {
try {
+ if (DEBUG) {
+ System.err.println("in stall(frame). So far, I have stalled "
+ numStalled);
+ }
numStalled++;
// If spilling is enabled, we wait on the spiller
if (fpa.spillToDiskOnCongestion()) {
+ if (DEBUG) {
+ System.err.println("in stall(frame). Spilling is enabled
so we will attempt to spill");
+ }
synchronized (spiller) {
while (spiller.usedBudget() >
MAX_SPILL_USED_BEFORE_RESUME) {
+ if (DEBUG) {
+ System.err.println(
+ "in stall(frame). Spilling has been
consumed. We will wait for it to be less than "
+ + MAX_SPILL_USED_BEFORE_RESUME + "
consumed. Current consumption = "
+ + spiller.usedBudget());
+ }
spiller.wait();
}
}
spiller.spill(frame);
- synchronized (this) {
- notify();
+ numSpilled++;
+ synchronized (mutex) {
+ if (DEBUG) {
+ System.err.println("Producer is waking up consumer");
+ }
+ mutex.notify();
}
return;
}
+ if (DEBUG) {
+ System.err.println("in stall(frame). Spilling is disabled. We
will subscribe to frame pool");
+ }
// Spilling is disabled, we subscribe to feedMemoryManager
frameAction.setFrame(frame);
- synchronized (frameAction) {
- if (memoryManager.subscribe(frameAction)) {
- frameAction.wait();
- }
+ framePool.subscribe(frameAction);
+ ByteBuffer temp = frameAction.getAllocated();
+ inbox.put(temp);
+ numProcessedInMemory++;
+ if (DEBUG) {
+ System.err.println("stall(frame) has been completed. Notifying
the consumer that a frame is ready");
}
- synchronized (this) {
- notify();
+ synchronized (mutex) {
+ if (DEBUG) {
+ System.err.println("Producer is waking up consumer");
+ }
+ mutex.notify();
}
} catch (InterruptedException e) {
throw new HyracksDataException(e);
@@ -229,47 +300,81 @@
}
private void process(ByteBuffer frame) throws HyracksDataException {
- // Get a page from
- ByteBuffer next = getFreeBuffer(frame.capacity());
+ // Get a page from frame pool
+ ByteBuffer next = null;
+ if (frame.capacity() <= framePool.getMaxFrameSize()) {
+ next = getFreeBuffer(frame.capacity());
+ }
if (next != null) {
numProcessedInMemory++;
next.put(frame);
- inbox.offer(next);
- if (inbox.size() == 1) {
- synchronized (this) {
- notify();
+ try {
+ inbox.put(next);
+ if (inbox.size() == 1) {
+ synchronized (mutex) {
+ if (DEBUG) {
+ System.err.println("Producer is waking up
consumer");
+ }
+ mutex.notify();
+ }
}
+ } catch (InterruptedException e) {
+ throw new HyracksDataException(e);
}
} else {
- // out of memory. we switch to next mode as per policy --
synchronized method
+ if (DEBUG) {
+ System.err.println("Couldn't allocate memory -->
exitProcessState(frame)");
+ }
+ // out of memory. we switch to next mode as per policy
exitProcessState(frame);
}
}
private void spill(ByteBuffer frame) throws HyracksDataException {
if (spiller.switchToMemory()) {
- synchronized (this) {
+ synchronized (mutex) {
// Check if there is memory
- ByteBuffer next = getFreeBuffer(frame.capacity());
+ ByteBuffer next = null;
+ if (frame.capacity() <= framePool.getMaxFrameSize()) {
+ next = getFreeBuffer(frame.capacity());
+ }
if (next != null) {
spiller.close();
numProcessedInMemory++;
next.put(frame);
inbox.offer(next);
+ if (inbox.size() == 1) {
+ if (DEBUG) {
+ System.err.println("Producer is waking up consumer
(size of inbox = 1)");
+ }
+ mutex.notify();
+ }
mode = Mode.PROCESS;
} else {
- // spill. This will always succeed since spilled = 0 (must
verify that budget can't be 0)
+ // spill. This will always succeed since spilled = 0 (TODO
must verify that budget can't be 0)
spiller.spill(frame);
numSpilled++;
- notify();
+ if (DEBUG) {
+ System.err.println("Producer is waking up consumer");
+ }
+ mutex.notify();
}
}
} else {
// try to spill. If failed switch to either discard or stall
if (spiller.spill(frame)) {
+ if (spiller.remaining() == 1) {
+ synchronized (mutex) {
+ if (DEBUG) {
+ System.err.println("Producer is waking up
consumer");
+ }
+ mutex.notify();
+ }
+ }
numSpilled++;
} else {
if (fpa.discardOnCongestion()) {
+ mode = Mode.DISCARD;
discard(frame);
} else {
stall(frame);
@@ -282,7 +387,7 @@
return mode;
}
- public synchronized void setMode(Mode mode) {
+ public void setMode(Mode mode) {
this.mode = mode;
}
@@ -311,15 +416,22 @@
private class FrameTransporter implements Runnable {
private volatile Throwable cause;
+ private int consumed = 0;
+ private boolean done = false;
public Throwable cause() {
return cause;
+ }
+
+ public void die() {
+ done = true;
}
private Throwable consume(ByteBuffer frame) {
while (frame != null) {
try {
writer.nextFrame(frame);
+ consumed++;
frame = null;
} catch (HyracksDataException e) {
// It is fine to catch throwable here since this thread is
always expected to terminate gracefully
@@ -340,7 +452,7 @@
public void run() {
try {
ByteBuffer frame = inbox.poll();
- while (frame != POISON_PILL) {
+ while (true) {
if (frame != null) {
try {
if (consume(frame) != null) {
@@ -348,7 +460,7 @@
}
} finally {
// Done with frame.
- memoryManager.release(frame);
+ framePool.release(frame);
}
}
frame = inbox.poll();
@@ -366,13 +478,22 @@
writer.flush();
// At this point. We consumed all memory and spilled
// We can't assume the next will be in memory. what if
there is 0 memory?
- synchronized (FeedRuntimeInputHandler.this) {
+ synchronized (mutex) {
frame = inbox.poll();
if (frame == null) {
// Nothing in memory
if (spiller.switchToMemory()) {
+ if (done) {
+ break;
+ }
+ if (DEBUG) {
+ System.err.println("Consumer is going
to sleep");
+ }
// Nothing in disk
- FeedRuntimeInputHandler.this.wait();
+ mutex.wait();
+ if (DEBUG) {
+ System.err.println("Consumer is waking
up");
+ }
}
}
}
@@ -383,5 +504,14 @@
}
// cleanup will always be done through the close() call
}
+
+ @Override
+ public String toString() {
+ return "consumed: " + consumed;
+ }
+ }
+
+ public int getTotal() {
+ return total;
}
}
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameAction.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameAction.java
index 4a2120a..f2ffeea 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameAction.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameAction.java
@@ -19,25 +19,46 @@
package org.apache.asterix.external.feed.dataflow;
import java.nio.ByteBuffer;
-import java.util.concurrent.LinkedBlockingDeque;
-import rx.functions.Action1;
-
-public class FrameAction implements Action1<ByteBuffer> {
- private final LinkedBlockingDeque<ByteBuffer> inbox;
+public class FrameAction {
+ private ByteBuffer allocated;
private ByteBuffer frame;
+ private static final boolean DEBUG = true;
- public FrameAction(LinkedBlockingDeque<ByteBuffer> inbox) {
- this.inbox = inbox;
+ public FrameAction() {
}
- @Override
public void call(ByteBuffer freeFrame) {
- freeFrame.put(frame);
- inbox.add(freeFrame);
- synchronized (this) {
- notify();
+ if (DEBUG) {
+ System.err.println("FrameAction: My subscription is being
answered");
}
+ freeFrame.clear();
+ freeFrame.put(frame);
+ synchronized (this) {
+ allocated = freeFrame;
+ if (DEBUG) {
+ System.err.println("FrameAction: Waking up waiting threads");
+ }
+ notifyAll();
+ }
+ }
+
+ public synchronized ByteBuffer getAllocated() throws InterruptedException {
+ if (DEBUG) {
+ System.err.println("FrameAction: Attempting to get allocated
buffer");
+ }
+ while (allocated == null) {
+ if (DEBUG) {
+ System.err.println("FrameAction: Allocated buffer is not ready
yet. I will wait for it");
+ }
+ wait();
+ if (DEBUG) {
+ System.err.println("FrameAction: Awoken Up");
+ }
+ }
+ ByteBuffer temp = allocated;
+ allocated = null;
+ return temp;
}
public ByteBuffer getFrame() {
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameSpiller.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameSpiller.java
index f0d226a..f3f1664 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameSpiller.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameSpiller.java
@@ -44,6 +44,7 @@
public class FrameSpiller {
private static final Logger LOGGER =
Logger.getLogger(FrameSpiller.class.getName());
private static final int FRAMES_PER_FILE = 1024;
+ public static final double MAX_SPILL_USED_BEFORE_RESUME = 0.8;
private final String fileNamePrefix;
private final ArrayDeque<File> files = new ArrayDeque<>();
@@ -126,6 +127,10 @@
return frame.getBuffer();
} catch (Exception e) {
throw new HyracksDataException(e);
+ } finally {
+ synchronized (this) {
+ notify();
+ }
}
}
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/ConcurrentFramePool.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/ConcurrentFramePool.java
index 25aa86a..5a3e786 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/ConcurrentFramePool.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/ConcurrentFramePool.java
@@ -28,10 +28,15 @@
import org.apache.asterix.external.feed.dataflow.FrameAction;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.log4j.Logger;
public class ConcurrentFramePool {
+ private static final boolean DEBUG = false;
private static final String ERROR_INVALID_FRAME_SIZE =
"The size should be an integral multiple of the default frame
size";
+ private static final String ERROR_LARGER_THAN_BUDGET_REQUEST =
+ "The requested frame size must not be greater than the allocated
budget";
+ private static final Logger LOGGER =
Logger.getLogger(ConcurrentFramePool.class.getName());
private final String nodeId;
private final int budget;
private final int defaultFrameSize;
@@ -49,10 +54,30 @@
this.largeFramesPools = new HashMap<>();
}
+ public int getMaxFrameSize() {
+ return budget * defaultFrameSize;
+ }
+
public synchronized ByteBuffer get() {
+ // Subscribers have higher priority
+ if (subscribers.isEmpty()) {
+ return getIgnoreSubscribers();
+ }
+ if (DEBUG) {
+ System.err.println("Unable to allocate buffer since subscribers
are in-line. Number of subscribers = "
+ + subscribers.size());
+ }
+ return null;
+ }
+
+ private ByteBuffer getIgnoreSubscribers() {
if (handedOut < budget) {
handedOut++;
return allocate();
+ }
+ if (DEBUG) {
+ System.err.println("Unable to allocate buffer without exceeding
budget. Remaining = " + remaining()
+ + ", Requested = 1");
}
return null;
}
@@ -61,11 +86,15 @@
return budget - handedOut;
}
- public synchronized ByteBuffer get(int bufferSize) throws
HyracksDataException {
+ private ByteBuffer getIgnoreSubscribers(int bufferSize) throws
HyracksDataException {
+ // Subscribers have higher priority
if (bufferSize % defaultFrameSize != 0) {
throw new HyracksDataException(ERROR_INVALID_FRAME_SIZE);
}
int multiplier = bufferSize / defaultFrameSize;
+ if (multiplier > budget) {
+ throw new HyracksDataException(ERROR_LARGER_THAN_BUDGET_REQUEST);
+ }
if (handedOut + multiplier <= budget) {
handedOut += multiplier;
ArrayDeque<ByteBuffer> largeFramesPool =
largeFramesPools.get(multiplier);
@@ -79,6 +108,21 @@
return largeFramesPool.poll();
}
// Not enough budget
+ if (DEBUG) {
+ System.err.println("Unable to allocate buffer without exceeding
budget. Remaining = " + remaining()
+ + ", Requested = " + multiplier);
+ }
+ return null;
+ }
+
+ public synchronized ByteBuffer get(int bufferSize) throws
HyracksDataException {
+ if (subscribers.isEmpty()) {
+ return getIgnoreSubscribers(bufferSize);
+ }
+ if (DEBUG) {
+ System.err.println("Unable to allocate buffer since subscribers
are in-line. Number of subscribers = "
+ + subscribers.size());
+ }
return null;
}
@@ -150,6 +194,9 @@
public synchronized void release(ByteBuffer buffer) throws
HyracksDataException {
int multiples = buffer.capacity() / defaultFrameSize;
handedOut -= multiples;
+ if (DEBUG) {
+ System.err.println("Releasing " + multiples + " frames. Remaining
frames = " + remaining());
+ }
if (multiples == 1) {
pool.add(buffer);
} else {
@@ -165,19 +212,40 @@
FrameAction frameAction = subscribers.peek();
// check if we have enough and answer immediately.
if (frameAction.getSize() == defaultFrameSize) {
- buffer = get();
+ if (DEBUG) {
+ System.err.println("Attempting to callback a subscriber
that requested 1 frame");
+ }
+ buffer = getIgnoreSubscribers();
} else {
- buffer = get(frameAction.getSize());
+ if (DEBUG) {
+ System.err.println("Attempting to callback a subscriber
that requested "
+ + frameAction.getSize() / defaultFrameSize + "
frame");
+ }
+ buffer = getIgnoreSubscribers(frameAction.getSize());
}
if (buffer != null) {
try {
frameAction.call(buffer);
+ } catch (Exception e) {
+ LOGGER.error("Error while attempting to answer a
subscription. Buffer will be reclaimed", e);
+ // TODO(amoudi): Add test cases and get rid of recursion
+ release(buffer);
} finally {
subscribers.remove();
+ if (DEBUG) {
+ System.err.println(
+ "A subscription has been satisfied. " +
subscribers.size() + " remaining subscribers");
+ }
}
} else {
+ if (DEBUG) {
+ System.err.println("Failed to allocate requested frames");
+ }
break;
}
+ }
+ if (DEBUG) {
+ System.err.println(subscribers.size() + " remaining subscribers");
}
}
@@ -187,18 +255,30 @@
ByteBuffer buffer;
// check if we have enough and answer immediately.
if (frameAction.getSize() == defaultFrameSize) {
- buffer = get();
+ buffer = getIgnoreSubscribers();
} else {
- buffer = get(frameAction.getSize());
+ buffer = getIgnoreSubscribers(frameAction.getSize());
}
if (buffer != null) {
frameAction.call(buffer);
// There is no need to subscribe. perform action and return
false
return false;
}
+ } else {
+ int multiplier = frameAction.getSize() / defaultFrameSize;
+ if (multiplier > budget) {
+ throw new
HyracksDataException(ERROR_LARGER_THAN_BUDGET_REQUEST);
+ }
}
// none of the above, add to subscribers and return true
subscribers.add(frameAction);
return true;
}
+
+ /*
+ * For unit testing purposes
+ */
+ public Collection<FrameAction> getSubscribers() {
+ return subscribers;
+ }
}
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/FeedRuntimeId.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/FeedRuntimeId.java
index f888542..18d4cff 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/FeedRuntimeId.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/FeedRuntimeId.java
@@ -35,11 +35,11 @@
private final String targetId;
private final int hashCode;
- public FeedRuntimeId(FeedId feedId, FeedRuntimeType runtimeType, int
partition, String operandId) {
+ public FeedRuntimeId(FeedId feedId, FeedRuntimeType runtimeType, int
partition, String targetId) {
this.feedId = feedId;
this.runtimeType = runtimeType;
this.partition = partition;
- this.targetId = operandId;
+ this.targetId = targetId;
this.hashCode = toString().hashCode();
}
diff --git
a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/FeedMemoryManagerUnitTest.java
b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/ConcurrentFramePoolUnitTest.java
similarity index 76%
rename from
asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/FeedMemoryManagerUnitTest.java
rename to
asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/ConcurrentFramePoolUnitTest.java
index 8a6d1b6..74863a2 100644
---
a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/FeedMemoryManagerUnitTest.java
+++
b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/ConcurrentFramePoolUnitTest.java
@@ -22,8 +22,10 @@
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Random;
+import java.util.concurrent.LinkedBlockingDeque;
import org.apache.asterix.common.config.AsterixFeedProperties;
+import org.apache.asterix.external.feed.dataflow.FrameAction;
import org.apache.asterix.external.feed.management.ConcurrentFramePool;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.junit.Assert;
@@ -36,7 +38,7 @@
import junit.framework.TestSuite;
@RunWith(PowerMockRunner.class)
-public class FeedMemoryManagerUnitTest extends TestCase {
+public class ConcurrentFramePoolUnitTest extends TestCase {
private static final int DEFAULT_FRAME_SIZE = 32768;
private static final int NUM_FRAMES = 2048;
@@ -45,7 +47,7 @@
private static final int MAX_SIZE = 52;
private static final double RELEASE_PROBABILITY = 0.20;
- public FeedMemoryManagerUnitTest(String testName) {
+ public ConcurrentFramePoolUnitTest(String testName) {
super(testName);
}
@@ -53,7 +55,7 @@
* @return the suite of tests being tested
*/
public static Test suite() {
- return new TestSuite(FeedMemoryManagerUnitTest.class);
+ return new TestSuite(ConcurrentFramePoolUnitTest.class);
}
@org.junit.Test
@@ -318,6 +320,117 @@
}
}
+ @org.junit.Test
+ public void testFixedSizeSubscribtion() {
+ try {
+ AsterixFeedProperties afp =
Mockito.mock(AsterixFeedProperties.class);
+
Mockito.when(afp.getMemoryComponentGlobalBudget()).thenReturn(FEED_MEM_BUDGET);
+ ConcurrentFramePool fmm =
+ new ConcurrentFramePool("TestNode",
afp.getMemoryComponentGlobalBudget(), DEFAULT_FRAME_SIZE);
+ int i = 0;
+ ByteBuffer buffer = ByteBuffer.allocate(DEFAULT_FRAME_SIZE);
+ LinkedBlockingDeque<ByteBuffer> buffers = new
LinkedBlockingDeque<>();
+ FrameAction frameAction = new FrameAction();
+ frameAction.setFrame(buffer);
+ while (!fmm.subscribe(frameAction)) {
+ buffers.put(frameAction.getAllocated());
+ i++;
+ }
+ // One subscriber.
+ // Check that all frames have been consumed
+ Assert.assertEquals(i, NUM_FRAMES);
+ // Release a frame (That will be handed out to the subscriber)
+ fmm.release(buffers.take());
+ // Check that all frames have been consumed (since the released
frame have been handed to the consumer)
+ Assert.assertEquals(0, fmm.remaining());
+ } catch (Throwable th) {
+ th.printStackTrace();
+ Assert.fail(th.getMessage());
+ }
+ }
+
+ @org.junit.Test
+ public void testLargerThanBudgetRequests() {
+ HyracksDataException hde = null;
+ try {
+ ConcurrentFramePool fmm = new ConcurrentFramePool("TestNode",
DEFAULT_FRAME_SIZE * 16, DEFAULT_FRAME_SIZE);
+ fmm.get(32 * DEFAULT_FRAME_SIZE);
+ } catch (HyracksDataException e) {
+ hde = e;
+ } catch (Throwable th) {
+ th.printStackTrace();
+ Assert.fail(th.getMessage());
+ }
+ Assert.assertTrue(hde != null);
+ }
+
+ @org.junit.Test
+ public void testLargerThanBudgetSubscribe() {
+ HyracksDataException hde = null;
+ try {
+ ConcurrentFramePool fmm = new ConcurrentFramePool("TestNode",
DEFAULT_FRAME_SIZE * 16, DEFAULT_FRAME_SIZE);
+ ByteBuffer buffer = ByteBuffer.allocate(DEFAULT_FRAME_SIZE * 32);
+ FrameAction frameAction = new FrameAction();
+ frameAction.setFrame(buffer);
+ fmm.subscribe(frameAction);
+ } catch (HyracksDataException e) {
+ hde = e;
+ } catch (Throwable th) {
+ th.printStackTrace();
+ Assert.fail(th.getMessage());
+ }
+ Assert.assertTrue(hde != null);
+ }
+
+ @org.junit.Test
+ public void testgetWhileSubscribersExist() {
+ try {
+ AsterixFeedProperties afp =
Mockito.mock(AsterixFeedProperties.class);
+
Mockito.when(afp.getMemoryComponentGlobalBudget()).thenReturn(FEED_MEM_BUDGET);
+ ConcurrentFramePool fmm =
+ new ConcurrentFramePool("TestNode",
afp.getMemoryComponentGlobalBudget(), DEFAULT_FRAME_SIZE);
+ int i = 0;
+ ByteBuffer buffer = ByteBuffer.allocate(DEFAULT_FRAME_SIZE);
+ LinkedBlockingDeque<ByteBuffer> buffers = new
LinkedBlockingDeque<>();
+ FrameAction frameAction = new FrameAction();
+ frameAction.setFrame(buffer);
+ while (!fmm.subscribe(frameAction)) {
+ buffers.put(frameAction.getAllocated());
+ i++;
+ }
+ // One subscriber.
+ // Check that all frames have been consumed
+ Assert.assertEquals(i, NUM_FRAMES);
+ // Release a frame (That will be handed out to the subscriber)
+ fmm.release(buffers.take());
+ // Check that all frames have been consumed (since the released
frame have been handed to the consumer)
+ Assert.assertEquals(fmm.remaining(), 0);
+ buffers.put(frameAction.getAllocated());
+ // Create another subscriber that takes frames of double the size
+ ByteBuffer bufferTimes2 = ByteBuffer.allocate(DEFAULT_FRAME_SIZE *
2);
+ LinkedBlockingDeque<ByteBuffer> buffersTimes2 = new
LinkedBlockingDeque<>();
+ FrameAction frameActionTimes2 = new FrameAction();
+ frameActionTimes2.setFrame(bufferTimes2);
+ Assert.assertEquals(true, fmm.subscribe(frameActionTimes2));
+ // release a small one
+ fmm.release(buffers.take());
+ Assert.assertEquals(fmm.remaining(), 1);
+ // Check that a small get fails
+ Assert.assertEquals(null, fmm.get());
+ // release another small one
+ fmm.release(buffers.take());
+ // Check that no small frames exists in the pool since subscriber
request was satisfied
+ Assert.assertEquals(fmm.remaining(), 0);
+ buffersTimes2.add(frameActionTimes2.getAllocated());
+ fmm.release(buffers);
+ fmm.release(bufferTimes2);
+ Assert.assertEquals(fmm.remaining(), NUM_FRAMES);
+ } catch (Throwable th) {
+ th.printStackTrace();
+ Assert.fail(th.getMessage());
+ }
+ }
+
/*
* Runnables used for unit tests
*/
diff --git
a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/InputHandlerTest.java
b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/InputHandlerTest.java
new file mode 100644
index 0000000..c465721
--- /dev/null
+++
b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/InputHandlerTest.java
@@ -0,0 +1,884 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.test;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.Random;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.asterix.external.feed.api.IFeedRuntime.FeedRuntimeType;
+import org.apache.asterix.external.feed.dataflow.FeedRuntimeInputHandler;
+import org.apache.asterix.external.feed.management.ConcurrentFramePool;
+import org.apache.asterix.external.feed.management.FeedConnectionId;
+import org.apache.asterix.external.feed.management.FeedId;
+import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
+import org.apache.asterix.external.feed.runtime.FeedRuntimeId;
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.comm.VSizeFrame;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.test.FrameWriterTestUtils;
+import org.apache.hyracks.api.test.TestControlledFrameWriter;
+import org.apache.hyracks.api.test.TestFrameWriter;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import org.apache.hyracks.test.support.TestUtils;
+import org.junit.Assert;
+import org.junit.runner.RunWith;
+import org.mockito.Mockito;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import junit.framework.Test;
+import junit.framework.TestCase;
+import junit.framework.TestSuite;
+
+@RunWith(PowerMockRunner.class)
+public class InputHandlerTest extends TestCase {
+
+ private static final int DEFAULT_FRAME_SIZE = 32768;
+ private static final int NUM_FRAMES = 128;
+ private static final long FEED_MEM_BUDGET = DEFAULT_FRAME_SIZE *
NUM_FRAMES;
+ private static final String DATAVERSE = "dataverse";
+ private static final String DATASET = "dataset";
+ private static final String FEED = "feed";
+ private static final String NODE_ID = "NodeId";
+ private static final long WAIT_DURATION = 50;
+ private static final int WAIT_CYCLES = 5;
+ private static final float DISCARD_ALLOWANCE = 0.15f;
+ private static final ExecutorService EXECUTOR =
Executors.newFixedThreadPool(1);
+
+ public InputHandlerTest(String testName) {
+ super(testName);
+ }
+
+ public static Test suite() {
+ return new TestSuite(InputHandlerTest.class);
+ }
+
+ private FeedRuntimeInputHandler createInputHandler(IHyracksTaskContext
ctx, IFrameWriter writer,
+ FeedPolicyAccessor fpa, ConcurrentFramePool framePool) throws
HyracksDataException {
+ FrameTupleAccessor fta = Mockito.mock(FrameTupleAccessor.class);
+ FeedId feedId = new FeedId(DATAVERSE, FEED);
+ FeedConnectionId connectionId = new FeedConnectionId(feedId, DATASET);
+ FeedRuntimeId runtimeId =
+ new FeedRuntimeId(feedId, FeedRuntimeType.COLLECT, 0,
FeedRuntimeId.DEFAULT_TARGET_ID);
+ return new FeedRuntimeInputHandler(ctx, connectionId, runtimeId,
writer, fpa, fta, framePool);
+ }
+
+ /*
+ * Testing the following scenarios
+ * 01. Positive Frames memory budget with fixed size frames, no spill, no
discard.
+ * 02. Positive Frames memory budget with variable size frames, no spill,
no discard.
+ * 03. Positive Frames memory budget with fixed size frames, with spill,
no discard.
+ * 04. Positive Frames memory budget with variable size frames, with
spill, no discard.
+ * 05. Positive Frames memory budget with fixed size frames, no spill,
with discard.
+ * 06. Positive Frames memory budget with variable size frames, no spill,
with discard.
+ * 07. Positive Frames memory budget with fixed size frames, with spill,
with discard.
+ * 08. Positive Frames memory budget with variable size frames, with
spill, with discard.
+ * 09. 0 Frames memory budget with fixed size frames, with spill, no
discard.
+ * 10. 0 Frames memory budget with variable size frames, with spill, no
discard.
+ * 11. TODO 0 Frames memory budget with fixed size frames, with spill,
with discard.
+ * 12. TODO 0 Frames memory budget with variable size frames, with spill,
with discard.
+ * 13. TODO Test exception handling with Open, NextFrame,Flush,Close,Fail
exception throwing FrameWriter
+ * 14. TODO Test exception while waiting for subscription
+ */
+
+ private static FeedPolicyAccessor createFeedPolicyAccessor(boolean spill,
boolean discard, long spillBudget,
+ float discardFraction) {
+ FeedPolicyAccessor fpa = Mockito.mock(FeedPolicyAccessor.class);
+ Mockito.when(fpa.bufferingEnabled()).thenReturn(true);
+ Mockito.when(fpa.spillToDiskOnCongestion()).thenReturn(spill);
+ Mockito.when(fpa.getMaxSpillOnDisk()).thenReturn(spillBudget);
+ Mockito.when(fpa.discardOnCongestion()).thenReturn(discard);
+ Mockito.when(fpa.getMaxFractionDiscard()).thenReturn(discardFraction);
+ return fpa;
+ }
+
+ @org.junit.Test
+ public void testZeroMemoryVarSizeFrameWithDiskNoDiscard() {
+ try {
+ int numRounds = 5;
+ Random random = new Random();
+ IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE);
+ // No spill, No discard
+ FeedPolicyAccessor fpa =
+ createFeedPolicyAccessor(true, false, NUM_FRAMES *
DEFAULT_FRAME_SIZE, DISCARD_ALLOWANCE);
+ // Non-Active Writer
+ TestFrameWriter writer =
FrameWriterTestUtils.create(Collections.emptyList(), Collections.emptyList());
+ // FramePool
+ ConcurrentFramePool framePool = new ConcurrentFramePool(NODE_ID,
0, DEFAULT_FRAME_SIZE);
+ FeedRuntimeInputHandler handler = createInputHandler(ctx, writer,
fpa, framePool);
+ handler.open();
+ ByteBuffer buffer = ByteBuffer.allocate(DEFAULT_FRAME_SIZE);
+ handler.nextFrame(buffer);
+ Assert.assertEquals(0, handler.getNumProcessedInMemory());
+ Assert.assertEquals(1, handler.getNumSpilled());
+ // add NUM_FRAMES times
+ for (int i = 0; i < NUM_FRAMES * numRounds; i++) {
+ int multiplier = random.nextInt(10) + 1;
+ buffer = ByteBuffer.allocate(DEFAULT_FRAME_SIZE * multiplier);
+ handler.nextFrame(buffer);
+ }
+ // Check that no records were discarded
+ Assert.assertEquals(handler.getNumDiscarded(), 0);
+ // Check that no records were spilled
+ Assert.assertEquals(NUM_FRAMES * numRounds + 1,
handler.getNumSpilled());
+ writer.validate(false);
+ handler.close();
+ // Check that nextFrame was called
+ Assert.assertEquals(NUM_FRAMES * numRounds + 1,
writer.nextFrameCount());
+ writer.validate(true);
+ } catch (Throwable th) {
+ th.printStackTrace();
+ Assert.fail();
+ }
+ }
+
+ @org.junit.Test
+ public void testZeroMemoryFixedSizeFrameWithDiskNoDiscard() {
+ try {
+ int numRounds = 10;
+ IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE);
+ // No spill, No discard
+ FeedPolicyAccessor fpa =
+ createFeedPolicyAccessor(true, false, NUM_FRAMES *
DEFAULT_FRAME_SIZE, DISCARD_ALLOWANCE);
+ // Non-Active Writer
+ TestFrameWriter writer =
FrameWriterTestUtils.create(Collections.emptyList(), Collections.emptyList());
+ // FramePool
+ ConcurrentFramePool framePool = new ConcurrentFramePool(NODE_ID,
0, DEFAULT_FRAME_SIZE);
+ FeedRuntimeInputHandler handler = createInputHandler(ctx, writer,
fpa, framePool);
+ handler.open();
+ VSizeFrame frame = new VSizeFrame(ctx);
+ handler.nextFrame(frame.getBuffer());
+ Assert.assertEquals(0, handler.getNumProcessedInMemory());
+ Assert.assertEquals(1, handler.getNumSpilled());
+ // add NUM_FRAMES times
+ for (int i = 0; i < NUM_FRAMES * numRounds; i++) {
+ handler.nextFrame(frame.getBuffer());
+ }
+ // Check that no records were discarded
+ Assert.assertEquals(handler.getNumDiscarded(), 0);
+ // Check that no records were spilled
+ Assert.assertEquals(NUM_FRAMES * numRounds + 1,
handler.getNumSpilled());
+ writer.validate(false);
+ handler.close();
+ // Check that nextFrame was called
+ Assert.assertEquals(NUM_FRAMES * numRounds + 1,
writer.nextFrameCount());
+ writer.validate(true);
+ } catch (Throwable th) {
+ th.printStackTrace();
+ Assert.fail();
+ }
+
+ }
+
+ /*
+ * Spill = false;
+ * Discard = true; discard only 5%
+ * Fixed size frames
+ */
+ @org.junit.Test
+ public void testMemoryVarSizeFrameWithSpillWithDiscard() {
+ try {
+ int numberOfMemoryFrames = 50;
+ int numberOfSpillFrames = 50;
+ int notDiscarded = 0;
+ int totalMinFrames = 0;
+ IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE);
+ // Spill budget = Memory budget, No discard
+ FeedPolicyAccessor fpa =
+ createFeedPolicyAccessor(true, true, DEFAULT_FRAME_SIZE *
numberOfSpillFrames, DISCARD_ALLOWANCE);
+ // Non-Active Writer
+ TestControlledFrameWriter writer =
FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE);
+ writer.freeze();
+ // FramePool
+ ConcurrentFramePool framePool =
+ new ConcurrentFramePool(NODE_ID, numberOfMemoryFrames *
DEFAULT_FRAME_SIZE, DEFAULT_FRAME_SIZE);
+ FeedRuntimeInputHandler handler = createInputHandler(ctx, writer,
fpa, framePool);
+ handler.open();
+ ByteBuffer buffer1 = ByteBuffer.allocate(DEFAULT_FRAME_SIZE);
+ ByteBuffer buffer2 = ByteBuffer.allocate(DEFAULT_FRAME_SIZE * 2);
+ ByteBuffer buffer3 = ByteBuffer.allocate(DEFAULT_FRAME_SIZE * 3);
+ ByteBuffer buffer4 = ByteBuffer.allocate(DEFAULT_FRAME_SIZE * 4);
+ ByteBuffer buffer5 = ByteBuffer.allocate(DEFAULT_FRAME_SIZE * 5);
+ while (true) {
+ if (totalMinFrames + 1 < numberOfMemoryFrames) {
+ handler.nextFrame(buffer1);
+ notDiscarded++;
+ totalMinFrames++;
+ } else {
+ break;
+ }
+ if (totalMinFrames + 2 < numberOfMemoryFrames) {
+ notDiscarded++;
+ totalMinFrames += 2;
+ handler.nextFrame(buffer2);
+ } else {
+ break;
+ }
+ if (totalMinFrames + 3 < numberOfMemoryFrames) {
+ notDiscarded++;
+ totalMinFrames += 3;
+ handler.nextFrame(buffer3);
+ } else {
+ break;
+ }
+ }
+ // Now we need to verify that the frame pool memory has been
consumed!
+ Assert.assertTrue(framePool.remaining() < 3);
+ Assert.assertEquals(0, handler.getNumSpilled());
+ Assert.assertEquals(0, handler.getNumStalled());
+ Assert.assertEquals(0, handler.getNumDiscarded());
+ while (true) {
+ if (handler.getNumSpilled() < numberOfSpillFrames) {
+ notDiscarded++;
+ handler.nextFrame(buffer3);
+ } else {
+ break;
+ }
+ if (handler.getNumSpilled() < numberOfSpillFrames) {
+ notDiscarded++;
+ handler.nextFrame(buffer4);
+ } else {
+ break;
+ }
+ if (handler.getNumSpilled() < numberOfSpillFrames) {
+ notDiscarded++;
+ handler.nextFrame(buffer5);
+ } else {
+ break;
+ }
+ }
+ Assert.assertTrue(framePool.remaining() < 3);
+ Assert.assertEquals(handler.framesOnDisk(),
handler.getNumSpilled());
+ Assert.assertEquals(handler.framesOnDisk(), numberOfSpillFrames);
+ Assert.assertEquals(0, handler.getNumStalled());
+ Assert.assertEquals(0, handler.getNumDiscarded());
+ // We can only discard one frame
+ double numDiscarded = 0;
+ boolean nextShouldDiscard =
+ ((numDiscarded + 1.0) / (handler.getTotal() + 1.0)) <=
fpa.getMaxFractionDiscard();
+ while (nextShouldDiscard) {
+ handler.nextFrame(buffer5);
+ numDiscarded++;
+ nextShouldDiscard = ((numDiscarded + 1.0) /
(handler.getTotal() + 1.0)) <= fpa.getMaxFractionDiscard();
+ }
+ Assert.assertTrue(framePool.remaining() < 3);
+ Assert.assertEquals(handler.framesOnDisk(),
handler.getNumSpilled());
+ Assert.assertEquals(0, handler.getNumStalled());
+ Assert.assertEquals((int) numDiscarded, handler.getNumDiscarded());
+ // Next Call should block since we're exceeding the discard
allowance
+ Future<?> result = EXECUTOR.submit(new Pusher(buffer5, handler));
+ int counter = 0;
+ while (!result.isDone() && counter < WAIT_CYCLES) {
+ Thread.sleep(WAIT_DURATION);
+ counter++;
+ }
+ if (result.isDone()) {
+ Assert.fail("The producer should switch to stall mode since it
is exceeding the discard allowance");
+ }
+ // consume memory frames
+ writer.unfreeze();
+ result.get();
+ handler.close();
+ Assert.assertEquals(writer.nextFrameCount(), notDiscarded + 1);
+ } catch (Throwable th) {
+ th.printStackTrace();
+ Assert.fail();
+ }
+
+ }
+
+ /*
+ * Spill = true;
+ * Discard = true
+ * Fixed size frames
+ */
+ @org.junit.Test
+ public void testMemoryFixedSizeFrameWithSpillWithDiscard() {
+ try {
+ int numberOfMemoryFrames = 50;
+ int numberOfSpillFrames = 50;
+ IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE);
+ // Spill budget = Memory budget, No discard
+ FeedPolicyAccessor fpa =
+ createFeedPolicyAccessor(true, true, DEFAULT_FRAME_SIZE *
numberOfSpillFrames, DISCARD_ALLOWANCE);
+ // Non-Active Writer
+ TestControlledFrameWriter writer =
FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE);
+ writer.freeze();
+ // FramePool
+ ConcurrentFramePool framePool =
+ new ConcurrentFramePool(NODE_ID, numberOfMemoryFrames *
DEFAULT_FRAME_SIZE, DEFAULT_FRAME_SIZE);
+ FeedRuntimeInputHandler handler = createInputHandler(ctx, writer,
fpa, framePool);
+ handler.open();
+ VSizeFrame frame = new VSizeFrame(ctx);
+ for (int i = 0; i < numberOfMemoryFrames; i++) {
+ handler.nextFrame(frame.getBuffer());
+ }
+ // Now we need to verify that the frame pool memory has been
consumed!
+ Assert.assertEquals(0, framePool.remaining());
+ Assert.assertEquals(numberOfMemoryFrames, handler.getTotal());
+ Assert.assertEquals(0, handler.getNumSpilled());
+ Assert.assertEquals(0, handler.getNumStalled());
+ Assert.assertEquals(0, handler.getNumDiscarded());
+ for (int i = 0; i < numberOfSpillFrames; i++) {
+ handler.nextFrame(frame.getBuffer());
+ }
+ Assert.assertEquals(0, framePool.remaining());
+ Assert.assertEquals(numberOfMemoryFrames + numberOfSpillFrames,
handler.getTotal());
+ Assert.assertEquals(numberOfSpillFrames, handler.getNumSpilled());
+ Assert.assertEquals(0, handler.getNumStalled());
+ Assert.assertEquals(0, handler.getNumDiscarded());
+ // We can only discard one frame
+ double numDiscarded = 0;
+ boolean nextShouldDiscard =
+ ((numDiscarded + 1.0) / (handler.getTotal() + 1.0)) <=
fpa.getMaxFractionDiscard();
+ while (nextShouldDiscard) {
+ handler.nextFrame(frame.getBuffer());
+ numDiscarded++;
+ nextShouldDiscard = ((numDiscarded + 1.0) /
(handler.getTotal() + 1.0)) <= fpa.getMaxFractionDiscard();
+ }
+ Assert.assertEquals(0, framePool.remaining());
+ Assert.assertEquals((int) (numberOfMemoryFrames +
numberOfSpillFrames + numDiscarded), handler.getTotal());
+ Assert.assertEquals(numberOfSpillFrames, handler.getNumSpilled());
+ Assert.assertEquals(0, handler.getNumStalled());
+ Assert.assertEquals((int) numDiscarded, handler.getNumDiscarded());
+ // Next Call should block since we're exceeding the discard
allowance
+ Future<?> result = EXECUTOR.submit(new Pusher(frame.getBuffer(),
handler));
+ int counter = 0;
+ while (!result.isDone() && counter < WAIT_CYCLES) {
+ Thread.sleep(WAIT_DURATION);
+ counter++;
+ }
+ if (result.isDone()) {
+ Assert.fail("The producer should switch to stall mode since it
is exceeding the discard allowance");
+ } else {
+ Assert.assertEquals((int) numDiscarded,
handler.getNumDiscarded());
+ }
+ // consume memory frames
+ writer.unfreeze();
+ counter = 0;
+ while (handler.getInternalBuffer().size() > 0 && counter <
WAIT_CYCLES) {
+ Thread.sleep(WAIT_DURATION);
+ counter++;
+ }
+ counter = 0;
+ while (!result.isDone() && counter < WAIT_CYCLES) {
+ Thread.sleep(WAIT_DURATION);
+ counter++;
+ }
+ Assert.assertTrue(result.isDone());
+ Assert.assertEquals(writer.nextFrameCount(), numberOfMemoryFrames
+ numberOfSpillFrames + 1);
+ } catch (Throwable th) {
+ th.printStackTrace();
+ Assert.fail();
+ }
+
+ }
+
+ /*
+ * Spill = false;
+ * Discard = true; discard only 5%
+ * Fixed size frames
+ */
+ @org.junit.Test
+ public void testMemoryVariableSizeFrameNoSpillWithDiscard() {
+ try {
+ int discardTestFrames = 100;
+ Random random = new Random();
+ IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE);
+ // Spill budget = Memory budget, No discard
+ FeedPolicyAccessor fpa = createFeedPolicyAccessor(false, true,
DEFAULT_FRAME_SIZE, DISCARD_ALLOWANCE);
+ // Non-Active Writer
+ TestControlledFrameWriter writer =
FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE);
+ writer.freeze();
+ // FramePool
+ ConcurrentFramePool framePool =
+ new ConcurrentFramePool(NODE_ID, discardTestFrames *
DEFAULT_FRAME_SIZE, DEFAULT_FRAME_SIZE);
+ FeedRuntimeInputHandler handler = createInputHandler(ctx, writer,
fpa, framePool);
+ handler.open();
+ // add NUM_FRAMES times
+ ByteBuffer buffer = ByteBuffer.allocate(DEFAULT_FRAME_SIZE);
+ int multiplier = 1;
+ int numFrames = 0;
+ // add NUM_FRAMES times
+ while ((multiplier <= framePool.remaining())) {
+ numFrames++;
+ handler.nextFrame(buffer);
+ multiplier = random.nextInt(10) + 1;
+ buffer = ByteBuffer.allocate(DEFAULT_FRAME_SIZE * multiplier);
+ }
+ // Next call should NOT block but should discard.
+ double numDiscarded = 0.0;
+ boolean nextShouldDiscard =
+ ((numDiscarded + 1.0) / (handler.getTotal() + 1.0)) <=
fpa.getMaxFractionDiscard();
+ while (nextShouldDiscard) {
+ handler.nextFrame(buffer);
+ numDiscarded++;
+ nextShouldDiscard = ((numDiscarded + 1.0) /
(handler.getTotal() + 1.0)) <= fpa.getMaxFractionDiscard();
+ }
+ Future<?> result = EXECUTOR.submit(new Pusher(buffer, handler));
+ int counter = 0;
+ while (!result.isDone() && counter < WAIT_CYCLES) {
+ Thread.sleep(WAIT_DURATION);
+ counter++;
+ }
+ if (result.isDone()) {
+ Assert.fail("The producer should switch to stall mode since it
is exceeding the discard allowance");
+ } else {
+ // Check that no records were discarded
+ assertEquals((int) numDiscarded, handler.getNumDiscarded());
+ // Check that one frame is spilled
+ assertEquals(handler.getNumSpilled(), 0);
+ }
+ // consume memory frames
+ writer.unfreeze();
+ result.get();
+ handler.close();
+ Assert.assertEquals(writer.nextFrameCount(), numFrames + 1);
+ // exit
+ } catch (Throwable th) {
+ th.printStackTrace();
+ Assert.fail();
+ }
+
+ }
+
+ /*
+ * Spill = false;
+ * Discard = true; discard only 5%
+ * Fixed size frames
+ */
+ @org.junit.Test
+ public void testMemoryFixedSizeFrameNoSpillWithDiscard() {
+ try {
+ int discardTestFrames = 100;
+ IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE);
+ // Spill budget = Memory budget, No discard
+ FeedPolicyAccessor fpa = createFeedPolicyAccessor(false, true,
DEFAULT_FRAME_SIZE, DISCARD_ALLOWANCE);
+ // Non-Active Writer
+ TestControlledFrameWriter writer =
FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE);
+ writer.freeze();
+ // FramePool
+ ConcurrentFramePool framePool =
+ new ConcurrentFramePool(NODE_ID, discardTestFrames *
DEFAULT_FRAME_SIZE, DEFAULT_FRAME_SIZE);
+ FeedRuntimeInputHandler handler = createInputHandler(ctx, writer,
fpa, framePool);
+ handler.open();
+ VSizeFrame frame = new VSizeFrame(ctx);
+ // add NUM_FRAMES times
+ for (int i = 0; i < discardTestFrames; i++) {
+ handler.nextFrame(frame.getBuffer());
+ }
+ // Next 5 calls call should NOT block but should discard.
+ double numDiscarded = 0.0;
+ boolean nextShouldDiscard =
+ ((numDiscarded + 1.0) / (handler.getTotal() + 1.0)) <=
fpa.getMaxFractionDiscard();
+ while (nextShouldDiscard) {
+ handler.nextFrame(frame.getBuffer());
+ numDiscarded++;
+ nextShouldDiscard = ((numDiscarded + 1.0) /
(handler.getTotal() + 1.0)) <= fpa.getMaxFractionDiscard();
+ }
+ // Next Call should block since we're exceeding the discard
allowance
+ Future<?> result = EXECUTOR.submit(new Pusher(frame.getBuffer(),
handler));
+ int counter = 0;
+ while (!result.isDone() && counter < WAIT_CYCLES) {
+ Thread.sleep(WAIT_DURATION);
+ counter++;
+ }
+ if (result.isDone()) {
+ Assert.fail("The producer should switch to stall mode since it
is exceeding the discard allowance");
+ } else {
+ // Check that no records were discarded
+ assertEquals((int) numDiscarded, handler.getNumDiscarded());
+ // Check that one frame is spilled
+ assertEquals(handler.getNumSpilled(), 0);
+ }
+ // consume memory frames
+ writer.unfreeze();
+ counter = 0;
+ while (handler.getInternalBuffer().size() > 0 && counter <
WAIT_CYCLES) {
+ Thread.sleep(WAIT_DURATION);
+ counter++;
+ }
+ Assert.assertTrue(writer.nextFrameCount() == discardTestFrames +
1);
+ // exit
+ } catch (Throwable th) {
+ th.printStackTrace();
+ Assert.fail();
+ }
+ }
+
+ /*
+ * Spill = true;
+ * Discard = false;
+ * Fixed size frames
+ */
+ @org.junit.Test
+ public void testMemoryFixedSizeFrameWithSpillNoDiscard() {
+ try {
+ IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE);
+ // Spill budget = Memory budget, No discard
+ FeedPolicyAccessor fpa =
+ createFeedPolicyAccessor(true, false, DEFAULT_FRAME_SIZE *
NUM_FRAMES, DISCARD_ALLOWANCE);
+ // Non-Active Writer
+ TestControlledFrameWriter writer =
FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE);
+ writer.freeze();
+ // FramePool
+ ConcurrentFramePool framePool = new ConcurrentFramePool(NODE_ID,
FEED_MEM_BUDGET, DEFAULT_FRAME_SIZE);
+ FeedRuntimeInputHandler handler = createInputHandler(ctx, writer,
fpa, framePool);
+ handler.open();
+ VSizeFrame frame = new VSizeFrame(ctx);
+ // add NUM_FRAMES times
+ for (int i = 0; i < NUM_FRAMES; i++) {
+ handler.nextFrame(frame.getBuffer());
+ }
+ // Next call should NOT block. we will do it in a different thread
+ Future<?> result = EXECUTOR.submit(new Pusher(frame.getBuffer(),
handler));
+ int counter = 0;
+ while (!result.isDone() && counter < WAIT_CYCLES) {
+ Thread.sleep(WAIT_DURATION);
+ counter++;
+ }
+ if (!result.isDone()) {
+ Assert.fail("The producer shouldn't wait for an empty frame
since Spilling is enabled");
+ } else {
+ // Check that no records were discarded
+ assertEquals(handler.getNumDiscarded(), 0);
+ // Check that one frame is spilled
+ assertEquals(handler.getNumSpilled(), 1);
+ }
+ // consume memory frames
+ for (int i = 0; i < NUM_FRAMES; i++) {
+ writer.kick();
+ }
+ counter = 0;
+ while ((!(handler.framesOnDisk() == 0)) && counter < WAIT_CYCLES) {
+ Thread.sleep(WAIT_DURATION);
+ counter++;
+ }
+ Assert.assertTrue(handler.framesOnDisk() == 0);
+ // exit
+ } catch (Throwable th) {
+ th.printStackTrace();
+ Assert.fail();
+ }
+ }
+
+ /*
+ * Spill = false;
+ * Discard = false;
+ * Fixed size frames
+ * Very fast next operator
+ */
+ @org.junit.Test
+ public void testMemoryFixedSizeFrameNoDiskNoDiscardFastConsumer() {
+ try {
+ int numRounds = 10;
+ IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE);
+ // No spill, No discard
+ FeedPolicyAccessor fpa = createFeedPolicyAccessor(false, false,
0L, DISCARD_ALLOWANCE);
+ // Non-Active Writer
+ TestFrameWriter writer =
FrameWriterTestUtils.create(Collections.emptyList(), Collections.emptyList());
+ // FramePool
+ ConcurrentFramePool framePool = new ConcurrentFramePool(NODE_ID,
FEED_MEM_BUDGET, DEFAULT_FRAME_SIZE);
+ FeedRuntimeInputHandler handler = createInputHandler(ctx, writer,
fpa, framePool);
+ handler.open();
+ VSizeFrame frame = new VSizeFrame(ctx);
+ // add NUM_FRAMES times
+ for (int i = 0; i < NUM_FRAMES * numRounds; i++) {
+ handler.nextFrame(frame.getBuffer());
+ }
+ // Check that no records were discarded
+ Assert.assertEquals(handler.getNumDiscarded(), 0);
+ // Check that no records were spilled
+ Assert.assertEquals(handler.getNumSpilled(), 0);
+ writer.validate(false);
+ handler.close();
+ // Check that nextFrame was called
+ Assert.assertEquals(NUM_FRAMES * numRounds,
writer.nextFrameCount());
+ writer.validate(true);
+ } catch (Throwable th) {
+ th.printStackTrace();
+ Assert.fail();
+ }
+ }
+
+ /*
+ * Spill = false;
+ * Discard = false;
+ * Fixed size frames
+ * Slow next operator
+ */
+ @org.junit.Test
+ public void testMemoryFixedSizeFrameNoDiskNoDiscardSlowConsumer() {
+ try {
+ int numRounds = 10;
+ IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE);
+ // No spill, No discard
+ FeedPolicyAccessor fpa = createFeedPolicyAccessor(false, false,
0L, DISCARD_ALLOWANCE);
+ // Non-Active Writer
+ TestFrameWriter writer =
FrameWriterTestUtils.create(Collections.emptyList(), Collections.emptyList());
+ // FramePool
+ ConcurrentFramePool framePool = new ConcurrentFramePool(NODE_ID,
FEED_MEM_BUDGET, DEFAULT_FRAME_SIZE);
+ FeedRuntimeInputHandler handler = createInputHandler(ctx, writer,
fpa, framePool);
+ handler.open();
+ VSizeFrame frame = new VSizeFrame(ctx);
+ writer.setNextDuration(1);
+ // add NUM_FRAMES times
+ for (int i = 0; i < NUM_FRAMES * numRounds; i++) {
+ handler.nextFrame(frame.getBuffer());
+ }
+ // Check that no records were discarded
+ Assert.assertEquals(handler.getNumDiscarded(), 0);
+ // Check that no records were spilled
+ Assert.assertEquals(handler.getNumSpilled(), 0);
+ int counter = 0;
+ while (writer.nextFrameCount() != (NUM_FRAMES * numRounds)) {
+ Thread.sleep(WAIT_DURATION);
+ counter++;
+ Assert.assertTrue(counter < WAIT_CYCLES);
+ }
+ // Check that nextFrame was called
+ writer.validate(false);
+ handler.close();
+ writer.validate(true);
+ } catch (Throwable th) {
+ th.printStackTrace();
+ Assert.fail();
+ }
+ }
+
+ /*
+ * Spill = false
+ * Discard = false
+ * VarSizeFrame
+ */
+ public void testMemoryVarSizeFrameNoDiskNoDiscard() {
+ try {
+ Random random = new Random();
+ IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE);
+ // No spill, No discard
+ FeedPolicyAccessor fpa = createFeedPolicyAccessor(false, false,
0L, DISCARD_ALLOWANCE);
+ // Non-Active Writer
+ TestControlledFrameWriter writer =
FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE);
+ writer.freeze();
+ // FramePool
+ ConcurrentFramePool framePool = new ConcurrentFramePool(NODE_ID,
FEED_MEM_BUDGET, DEFAULT_FRAME_SIZE);
+ FeedRuntimeInputHandler handler = createInputHandler(ctx, writer,
fpa, framePool);
+ handler.open();
+ ByteBuffer buffer = ByteBuffer.allocate(DEFAULT_FRAME_SIZE);
+ int multiplier = 1;
+ // add NUM_FRAMES times
+ while ((multiplier <= framePool.remaining())) {
+ handler.nextFrame(buffer);
+ multiplier = random.nextInt(10) + 1;
+ buffer = ByteBuffer.allocate(DEFAULT_FRAME_SIZE * multiplier);
+ }
+ // we can't satisfy the next request
+ // get the needed frames
+ int diff = multiplier - framePool.remaining();
+ // Next call should block we will do it in a different thread
+ Future<?> result = EXECUTOR.submit(new Pusher(buffer, handler));
+ Thread.sleep(WAIT_DURATION);
+ // Check that the nextFrame didn't return
+ if (result.isDone()) {
+ Assert.fail();
+ } else {
+ // Check that no records were discarded
+ assertEquals(handler.getNumDiscarded(), 0);
+ // Check that no records were spilled
+ assertEquals(handler.getNumSpilled(), 0);
+ // Check that no records were discarded
+ // Check that the inputHandler subscribed to the framePool
+ int count = 0;
+ while (framePool.getSubscribers().isEmpty()) {
+ if (count > WAIT_CYCLES) {
+ Assert.fail("Input Handler didn't subscribe to
framePool");
+ }
+ Thread.sleep(WAIT_DURATION);
+ count++;
+ }
+ // Check that number of stalled = 1
+ assertEquals(handler.getNumStalled(), 1);
+ while (diff > 0) {
+ diff -= writer.getCurrentMultiplier();
+ writer.kick();
+ }
+ count = 0;
+ while (!framePool.getSubscribers().isEmpty()) {
+ if (count > WAIT_CYCLES) {
+ Assert.fail("Input Handler didn't subscribe to
framePool");
+ }
+ Thread.sleep(WAIT_DURATION);
+ count++;
+ }
+ }
+ // Check that the pool answered the subscription
+ result.get(WAIT_DURATION * WAIT_CYCLES, TimeUnit.MILLISECONDS);
+ } catch (Throwable th) {
+ th.printStackTrace();
+ Assert.fail();
+ }
+ }
+
+ /*
+ * Spill = true;
+ * Discard = false;
+ * Variable size frames
+ */
+ @org.junit.Test
+ public void testMemoryVarSizeFrameWithSpillNoDiscard() {
+ try {
+ Random random = new Random();
+ IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE);
+ // Spill budget = Memory budget, No discard
+ FeedPolicyAccessor fpa =
+ createFeedPolicyAccessor(true, false, DEFAULT_FRAME_SIZE *
NUM_FRAMES, DISCARD_ALLOWANCE);
+ // Non-Active Writer
+ TestControlledFrameWriter writer =
FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE);
+ writer.freeze();
+ // FramePool
+ ConcurrentFramePool framePool = new ConcurrentFramePool(NODE_ID,
FEED_MEM_BUDGET, DEFAULT_FRAME_SIZE);
+ FeedRuntimeInputHandler handler = createInputHandler(ctx, writer,
fpa, framePool);
+ handler.open();
+ ByteBuffer buffer = ByteBuffer.allocate(DEFAULT_FRAME_SIZE);
+ int multiplier = 1;
+ // add NUM_FRAMES times
+ while ((multiplier <= framePool.remaining())) {
+ handler.nextFrame(buffer);
+ multiplier = random.nextInt(10) + 1;
+ buffer = ByteBuffer.allocate(DEFAULT_FRAME_SIZE * multiplier);
+ }
+ // Next call should Not block. we will do it in a different thread
+ Future<?> result = EXECUTOR.submit(new Pusher(buffer, handler));
+ int counter = 0;
+ while (!result.isDone() && counter < WAIT_CYCLES) {
+ Thread.sleep(WAIT_DURATION);
+ counter++;
+ }
+ if (!result.isDone()) {
+ Assert.fail("The producer shouldn't wait for an empty frame
since Spilling is enabled");
+ } else {
+ // Check that no records were discarded
+ assertEquals(handler.getNumDiscarded(), 0);
+ // Check that one frame is spilled
+ assertEquals(handler.getNumSpilled(), 1);
+ }
+ // consume memory frames
+ while (!handler.getInternalBuffer().isEmpty()) {
+ writer.kick();
+ }
+ // There should be 1 frame on disk
+ Assert.assertEquals(1, handler.framesOnDisk());
+ writer.kick();
+ counter = 0;
+ while ((!(handler.framesOnDisk() == 0)) && counter < WAIT_CYCLES) {
+ Thread.sleep(WAIT_DURATION);
+ counter++;
+ }
+ Assert.assertTrue(handler.framesOnDisk() == 0);
+ // exit
+ writer.unfreeze();
+ result.get();
+ } catch (Throwable th) {
+ th.printStackTrace();
+ Assert.fail();
+ }
+ }
+
+ /*
+ * Spill = false;
+ * Discard = false;
+ * Fixed size frames
+ */
+ @org.junit.Test
+ public void testMemoryFixedSizeFrameNoDiskNoDiscard() {
+ try {
+ IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE);
+ // No spill, No discard
+ FeedPolicyAccessor fpa = createFeedPolicyAccessor(false, false,
0L, DISCARD_ALLOWANCE);
+ // Non-Active Writer
+ TestControlledFrameWriter writer =
FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE);
+ writer.freeze();
+ // FramePool
+ ConcurrentFramePool framePool = new ConcurrentFramePool(NODE_ID,
FEED_MEM_BUDGET, DEFAULT_FRAME_SIZE);
+
+ FeedRuntimeInputHandler handler = createInputHandler(ctx, writer,
fpa, framePool);
+ handler.open();
+ VSizeFrame frame = new VSizeFrame(ctx);
+ // add NUM_FRAMES times
+ for (int i = 0; i < NUM_FRAMES; i++) {
+ handler.nextFrame(frame.getBuffer());
+ }
+ // Next call should block we will do it in a different thread
+ Future<?> result = EXECUTOR.submit(new Pusher(frame.getBuffer(),
handler));
+ Thread.sleep(WAIT_DURATION);
+ // Check that the nextFrame didn't return
+ if (result.isDone()) {
+ Assert.fail();
+ } else {
+ // Check that no records were discarded
+ assertEquals(handler.getNumDiscarded(), 0);
+ // Check that no records were spilled
+ assertEquals(handler.getNumSpilled(), 0);
+ // Check that no records were discarded
+ // Check that the inputHandler subscribed to the framePool
+ int count = 0;
+ while (framePool.getSubscribers().isEmpty()) {
+ if (count > WAIT_CYCLES) {
+ Assert.fail("Input Handler didn't subscribe to
framePool");
+ }
+ Thread.sleep(WAIT_DURATION);
+ count++;
+ }
+ // Check that number of stalled = 1
+ assertEquals(handler.getNumStalled(), 1);
+ writer.kick();
+ }
+ // Check that the pool answered the subscription
+ result.get(WAIT_DURATION * WAIT_CYCLES, TimeUnit.MILLISECONDS);
+ } catch (Throwable th) {
+ th.printStackTrace();
+ Assert.fail();
+ }
+ }
+
+ private class Pusher implements Runnable {
+ private final ByteBuffer buffer;
+ private final IFrameWriter writer;
+
+ public Pusher(ByteBuffer buffer, IFrameWriter writer) {
+ this.buffer = buffer;
+ this.writer = writer;
+ }
+
+ @Override
+ public void run() {
+ try {
+ writer.nextFrame(buffer);
+ } catch (HyracksDataException e) {
+ e.printStackTrace();
+ Assert.fail();
+ }
+ }
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/pom.xml
b/hyracks-fullstack/hyracks/hyracks-api/pom.xml
index 3961921..628c0b6 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-api/pom.xml
@@ -77,5 +77,22 @@
<artifactId>hyracks-util</artifactId>
<version>0.2.18-SNAPSHOT</version>
</dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
+ <version>2.0.2-beta</version>
+ </dependency>
+ <dependency>
+ <groupId>org.powermock</groupId>
+ <artifactId>powermock-api-mockito</artifactId>
+ <version>1.6.2</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.powermock</groupId>
+ <artifactId>powermock-module-junit4</artifactId>
+ <version>1.6.2</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
diff --git
a/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/CountAndThrowError.java
b/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/CountAndThrowError.java
new file mode 100644
index 0000000..e0bbfe8
--- /dev/null
+++
b/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/CountAndThrowError.java
@@ -0,0 +1,24 @@
+package org.apache.hyracks.api.test;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.mockito.invocation.InvocationOnMock;
+
+public class CountAndThrowError extends CountAnswer {
+ private String errorMessage;
+
+ public CountAndThrowError(String errorMessage) {
+ this.errorMessage = errorMessage;
+ }
+
+ @Override
+ public Object call() throws HyracksDataException {
+ count++;
+ throw new UnknownError(errorMessage);
+ }
+
+ @Override
+ public Object answer(InvocationOnMock invocation) throws Throwable {
+ count++;
+ throw new UnknownError(errorMessage);
+ }
+}
\ No newline at end of file
diff --git
a/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/CountAndThrowException.java
b/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/CountAndThrowException.java
new file mode 100644
index 0000000..5a5ad59
--- /dev/null
+++
b/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/CountAndThrowException.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.test;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.mockito.invocation.InvocationOnMock;
+
+public class CountAndThrowException extends CountAnswer {
+ private String errorMessage;
+
+ public CountAndThrowException(String errorMessage) {
+ this.errorMessage = errorMessage;
+ }
+
+ @Override
+ public Object call() throws HyracksDataException {
+ count++;
+ throw new HyracksDataException(errorMessage);
+ }
+
+ @Override
+ public Object answer(InvocationOnMock invocation) throws Throwable {
+ count++;
+ throw new HyracksDataException(errorMessage);
+ }
+}
\ No newline at end of file
diff --git
a/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/CountAnswer.java
b/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/CountAnswer.java
new file mode 100644
index 0000000..e8a6654
--- /dev/null
+++
b/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/CountAnswer.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.test;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+public class CountAnswer implements Answer<Object> {
+ protected int count = 0;
+
+ @Override
+ public Object answer(InvocationOnMock invocation) throws Throwable {
+ count++;
+ return null;
+ }
+
+ public Object call() throws HyracksDataException {
+ count++;
+ return null;
+ }
+
+ public int getCallCount() {
+ return count;
+ }
+
+ public void reset() {
+ count = 0;
+ }
+}
\ No newline at end of file
diff --git
a/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/FrameWriterTestUtils.java
b/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/FrameWriterTestUtils.java
new file mode 100644
index 0000000..4bddfa9
--- /dev/null
+++
b/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/FrameWriterTestUtils.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.test;
+
+import java.util.Collection;
+
+public class FrameWriterTestUtils {
+ public static final String EXCEPTION_MESSAGE = "IFrameWriter Exception in
the call to the method ";
+ public static final String ERROR_MESSAGE = "IFrameWriter Error in the call
to the method ";
+
+ public enum FrameWriterOperation {
+ Open,
+ NextFrame,
+ Fail,
+ Flush,
+ Close
+ }
+
+ public static TestFrameWriter create(Collection<FrameWriterOperation>
exceptionThrowingOperations,
+ Collection<FrameWriterOperation> errorThrowingOperations) {
+ CountAnswer openAnswer =
+ createAnswer(FrameWriterOperation.Open,
exceptionThrowingOperations, errorThrowingOperations);
+ CountAnswer nextAnswer =
+ createAnswer(FrameWriterOperation.NextFrame,
exceptionThrowingOperations, errorThrowingOperations);
+ CountAnswer flushAnswer =
+ createAnswer(FrameWriterOperation.Flush,
exceptionThrowingOperations, errorThrowingOperations);
+ CountAnswer failAnswer =
+ createAnswer(FrameWriterOperation.Fail,
exceptionThrowingOperations, errorThrowingOperations);
+ CountAnswer closeAnswer =
+ createAnswer(FrameWriterOperation.Close,
exceptionThrowingOperations, errorThrowingOperations);
+ return new TestFrameWriter(openAnswer, nextAnswer, flushAnswer,
failAnswer, closeAnswer);
+ }
+
+ public static CountAnswer createAnswer(FrameWriterOperation operation,
+ Collection<FrameWriterOperation> exceptionThrowingOperations,
+ Collection<FrameWriterOperation> errorThrowingOperations) {
+ if (exceptionThrowingOperations.contains(operation)) {
+ return new CountAndThrowException(EXCEPTION_MESSAGE +
operation.toString());
+ } else if (exceptionThrowingOperations.contains(operation)) {
+ return new CountAndThrowError(ERROR_MESSAGE +
operation.toString());
+ } else {
+ return new CountAnswer();
+ }
+ }
+
+ public static TestControlledFrameWriter create(int initialFrameSize) {
+ return new TestControlledFrameWriter(initialFrameSize);
+ }
+}
diff --git
a/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/TestControlledFrameWriter.java
b/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/TestControlledFrameWriter.java
new file mode 100644
index 0000000..2a3f70d
--- /dev/null
+++
b/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/TestControlledFrameWriter.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.test;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class TestControlledFrameWriter extends TestFrameWriter {
+ private boolean frozen = false;
+ private boolean timed = false;
+ private long duration = Long.MAX_VALUE;
+ private final int initialFrameSize;
+ private volatile int currentMultiplier = 0;
+ private volatile int kicks = 0;
+
+ public TestControlledFrameWriter(int initialFrameSize) {
+ super(new CountAnswer(), new CountAnswer(), new CountAnswer(), new
CountAnswer(), new CountAnswer());
+ this.initialFrameSize = initialFrameSize;
+ }
+
+ public int getCurrentMultiplier() {
+ return currentMultiplier;
+ }
+
+ public synchronized void freeze() {
+ frozen = true;
+ }
+
+ public synchronized void time(long ms) {
+ frozen = true;
+ timed = true;
+ duration = ms;
+ }
+
+ public synchronized void unfreeze() {
+ frozen = false;
+ notify();
+ }
+
+ public synchronized void kick() {
+ kicks++;
+ notify();
+ }
+
+ @Override
+ public synchronized void nextFrame(ByteBuffer buffer) throws
HyracksDataException {
+ super.nextFrame(buffer);
+ currentMultiplier = buffer.capacity() / initialFrameSize;
+ if (frozen) {
+ try {
+ if (timed) {
+ wait(duration);
+ } else {
+ while (frozen && kicks == 0) {
+ wait();
+ }
+ kicks--;
+ }
+ } catch (InterruptedException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+ currentMultiplier = 0;
+ }
+}
diff --git
a/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/TestFrameWriter.java
b/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/TestFrameWriter.java
new file mode 100644
index 0000000..dce2c78
--- /dev/null
+++
b/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/TestFrameWriter.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.test;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class TestFrameWriter implements IFrameWriter {
+ private final CountAnswer openAnswer;
+ private final CountAnswer nextAnswer;
+ private final CountAnswer flushAnswer;
+ private final CountAnswer failAnswer;
+ private final CountAnswer closeAnswer;
+ private long openDuration = 0L;
+ private long nextDuration = 0L;
+ private long flushDuration = 0L;
+ private long failDuration = 0L;
+ private long closeDuration = 0L;
+
+ public TestFrameWriter(CountAnswer openAnswer, CountAnswer nextAnswer,
CountAnswer flushAnswer,
+ CountAnswer failAnswer, CountAnswer closeAnswer) {
+ this.openAnswer = openAnswer;
+ this.nextAnswer = nextAnswer;
+ this.closeAnswer = closeAnswer;
+ this.flushAnswer = flushAnswer;
+ this.failAnswer = failAnswer;
+ }
+
+ @Override
+ public void open() throws HyracksDataException {
+ if (openDuration > 0) {
+ try {
+ Thread.sleep(openDuration);
+ } catch (InterruptedException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+ openAnswer.call();
+ }
+
+ public int openCount() {
+ return openAnswer.getCallCount();
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ if (nextDuration > 0) {
+ try {
+ Thread.sleep(nextDuration);
+ } catch (InterruptedException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+ nextAnswer.call();
+ }
+
+ public int nextFrameCount() {
+ return nextAnswer.getCallCount();
+ }
+
+ @Override
+ public void flush() throws HyracksDataException {
+ if (flushDuration > 0) {
+ try {
+ Thread.sleep(flushDuration);
+ } catch (InterruptedException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+ flushAnswer.call();
+ }
+
+ public int flushCount() {
+ return flushAnswer.getCallCount();
+ }
+
+ @Override
+ public void fail() throws HyracksDataException {
+ if (failDuration > 0) {
+ try {
+ Thread.sleep(failDuration);
+ } catch (InterruptedException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+ failAnswer.call();
+ }
+
+ public int failCount() {
+ return failAnswer.getCallCount();
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ if (closeDuration > 0) {
+ try {
+ Thread.sleep(closeDuration);
+ } catch (InterruptedException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+ closeAnswer.call();
+ }
+
+ public int closeCount() {
+ return closeAnswer.getCallCount();
+ }
+
+ public synchronized boolean validate(boolean finished) {
+ if (failAnswer.getCallCount() > 1 || closeAnswer.getCallCount() > 1 ||
openAnswer.getCallCount() > 1) {
+ return false;
+ }
+ if (openAnswer.getCallCount() == 0
+ && (nextAnswer.getCallCount() > 0 || failAnswer.getCallCount()
> 0 || closeAnswer.getCallCount() > 0)) {
+ return false;
+ }
+ if (finished) {
+ if (closeAnswer.getCallCount() == 0 && (nextAnswer.getCallCount()
> 0 || failAnswer.getCallCount() > 0
+ || openAnswer.getCallCount() > 0)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ public void reset() {
+ openAnswer.reset();
+ nextAnswer.reset();
+ flushAnswer.reset();
+ failAnswer.reset();
+ closeAnswer.reset();
+ }
+
+ public long getOpenDuration() {
+ return openDuration;
+ }
+
+ public void setOpenDuration(long openDuration) {
+ this.openDuration = openDuration;
+ }
+
+ public long getNextDuration() {
+ return nextDuration;
+ }
+
+ public void setNextDuration(long nextDuration) {
+ this.nextDuration = nextDuration;
+ }
+
+ public long getFlushDuration() {
+ return flushDuration;
+ }
+
+ public void setFlushDuration(long flushDuration) {
+ this.flushDuration = flushDuration;
+ }
+
+ public long getFailDuration() {
+ return failDuration;
+ }
+
+ public void setFailDuration(long failDuration) {
+ this.failDuration = failDuration;
+ }
+
+ public long getCloseDuration() {
+ return closeDuration;
+ }
+
+ public void setCloseDuration(long closeDuration) {
+ this.closeDuration = closeDuration;
+ }
+}
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/test/java/org/apache/hyracks/storage/am/btree/test/FramewriterTest.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/test/java/org/apache/hyracks/storage/am/btree/test/FramewriterTest.java
index df3a211..d3e7a3a 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/test/java/org/apache/hyracks/storage/am/btree/test/FramewriterTest.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/test/java/org/apache/hyracks/storage/am/btree/test/FramewriterTest.java
@@ -30,6 +30,9 @@
import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.test.CountAndThrowError;
+import org.apache.hyracks.api.test.CountAndThrowException;
+import org.apache.hyracks.api.test.CountAnswer;
import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
@@ -92,8 +95,8 @@
public boolean validate(boolean finished) {
// get number of open calls
int openCount = openException.getCallCount() +
openNormal.getCallCount() + openError.getCallCount();
- int nextFrameCount = nextFrameException.getCallCount() +
nextFrameNormal.getCallCount()
- + nextFrameError.getCallCount();
+ int nextFrameCount =
+ nextFrameException.getCallCount() +
nextFrameNormal.getCallCount() + nextFrameError.getCallCount();
int failCount = failException.getCallCount() +
failNormal.getCallCount() + failError.getCallCount();
int closeCount = closeException.getCallCount() +
closeNormal.getCallCount() + closeError.getCallCount();
@@ -422,8 +425,9 @@
public AbstractTreeIndexOperatorDescriptor[] mockIndexOpDesc() throws
HyracksDataException, IndexException {
IIndexDataflowHelperFactory[] indexDataflowHelperFactories =
mockIndexHelperFactories();
ISearchOperationCallbackFactory[] searchOpCallbackFactories =
mockSearchOpCallbackFactories();
- AbstractTreeIndexOperatorDescriptor[] opDescs = new
AbstractTreeIndexOperatorDescriptor[indexDataflowHelperFactories.length
- * searchOpCallbackFactories.length];
+ AbstractTreeIndexOperatorDescriptor[] opDescs =
+ new
AbstractTreeIndexOperatorDescriptor[indexDataflowHelperFactories.length
+ * searchOpCallbackFactories.length];
int k = 0;
for (int i = 0; i < indexDataflowHelperFactories.length; i++) {
for (int j = 0; j < searchOpCallbackFactories.length; j++) {
@@ -450,52 +454,6 @@
private ISearchOperationCallback mockSearchOpCallback() {
ISearchOperationCallback opCallback =
Mockito.mock(ISearchOperationCallback.class);
return opCallback;
- }
-
- public class CountAnswer implements Answer<Object> {
- protected int count = 0;
-
- @Override
- public Object answer(InvocationOnMock invocation) throws Throwable {
- count++;
- return null;
- }
-
- public int getCallCount() {
- return count;
- }
-
- public void reset() {
- count = 0;
- }
- }
-
- public class CountAndThrowException extends CountAnswer {
- private String errorMessage;
-
- public CountAndThrowException(String errorMessage) {
- this.errorMessage = errorMessage;
- }
-
- @Override
- public Object answer(InvocationOnMock invocation) throws Throwable {
- count++;
- throw new HyracksDataException(errorMessage);
- }
- }
-
- public class CountAndThrowError extends CountAnswer {
- private String errorMessage;
-
- public CountAndThrowError(String errorMessage) {
- this.errorMessage = errorMessage;
- }
-
- @Override
- public Object answer(InvocationOnMock invocation) throws Throwable {
- count++;
- throw new UnknownError(errorMessage);
- }
}
public IFrameWriter[] createOutputWriters() throws Exception {
--
To view, visit https://asterix-gerrit.ics.uci.edu/866
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I7088f489a7d53dee8cf6cdbf5baa7cd8d3884f55
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <[email protected]>