abdullah alamoudi has submitted this change and it was merged. Change subject: Add Unit Tests for Feed Runtime Input Handler ......................................................................
Add Unit Tests for Feed Runtime Input Handler Change-Id: I7088f489a7d53dee8cf6cdbf5baa7cd8d3884f55 Reviewed-on: https://asterix-gerrit.ics.uci.edu/866 Tested-by: Jenkins <[email protected]> Reviewed-by: Michael Blow <[email protected]> --- M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameAction.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameSpiller.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/ConcurrentFramePool.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/FeedRuntimeId.java R asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/ConcurrentFramePoolUnitTest.java A asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/InputHandlerTest.java M hyracks-fullstack/hyracks/hyracks-api/pom.xml A hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/CountAndThrowError.java A hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/CountAndThrowException.java A hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/CountAnswer.java A hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/FrameWriterTestUtils.java A hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/TestControlledFrameWriter.java A hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/TestFrameWriter.java M hyracks-fullstack/hyracks/hyracks-storage-am-btree/pom.xml M hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/test/java/org/apache/hyracks/storage/am/btree/test/FramewriterTest.java 16 files changed, 1,744 insertions(+), 127 deletions(-) Approvals: Michael Blow: Looks good to me, approved Jenkins: Verified diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java index 3920a03..d201a6a 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java @@ -35,7 +35,7 @@ import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable; /** - * TODO: Add unit test cases for this class + * TODO: Add Failure cases unit tests for this class * Provides for error-handling and input-side buffering for a feed runtime. * .............______............. * ............|......|............ @@ -48,8 +48,9 @@ public class FeedRuntimeInputHandler extends AbstractUnaryInputUnaryOutputOperatorNodePushable { private static final Logger LOGGER = Logger.getLogger(FeedRuntimeInputHandler.class.getName()); - private static final ByteBuffer POISON_PILL = ByteBuffer.allocate(0); private static final double MAX_SPILL_USED_BEFORE_RESUME = 0.8; + private static final boolean DEBUG = false; + private final Object mutex = new Object(); private final FeedExceptionHandler exceptionHandler; private final FrameSpiller spiller; private final FeedPolicyAccessor fpa; @@ -58,15 +59,16 @@ private final FrameTransporter consumer; private final Thread consumerThread; private final LinkedBlockingDeque<ByteBuffer> inbox; - private final ConcurrentFramePool memoryManager; + private final ConcurrentFramePool framePool; private Mode mode = Mode.PROCESS; + private int total = 0; private int numDiscarded = 0; private int numSpilled = 0; private int numProcessedInMemory = 0; private int numStalled = 0; public FeedRuntimeInputHandler(IHyracksTaskContext ctx, FeedConnectionId connectionId, FeedRuntimeId runtimeId, - IFrameWriter writer, FeedPolicyAccessor fpa, FrameTupleAccessor fta, ConcurrentFramePool feedMemoryManager) + IFrameWriter writer, FeedPolicyAccessor fpa, FrameTupleAccessor fta, ConcurrentFramePool framePool) throws HyracksDataException { this.writer = writer; this.spiller = @@ -76,13 +78,13 @@ fpa.getMaxSpillOnDisk()); this.exceptionHandler = new FeedExceptionHandler(ctx, fta); this.fpa = fpa; - this.memoryManager = feedMemoryManager; + this.framePool = framePool; this.inbox = new LinkedBlockingDeque<>(); this.consumer = new FrameTransporter(); - this.consumerThread = new Thread(); + this.consumerThread = new Thread(consumer); this.consumerThread.start(); this.initialFrameSize = ctx.getInitialFrameSize(); - this.frameAction = new FrameAction(inbox); + this.frameAction = new FrameAction(); } @Override @@ -101,15 +103,20 @@ @Override public void close() throws HyracksDataException { - inbox.add(POISON_PILL); - notify(); + consumer.poison(); + synchronized (mutex) { + if (DEBUG) { + LOGGER.info("Producer is waking up consumer"); + } + mutex.notify(); + } try { consumerThread.join(); } catch (InterruptedException e) { LOGGER.log(Level.WARNING, e.getMessage(), e); } try { - memoryManager.release(inbox); + framePool.release(inbox); } catch (Throwable th) { LOGGER.log(Level.WARNING, th.getMessage(), th); } @@ -124,8 +131,12 @@ @Override public void nextFrame(ByteBuffer frame) throws HyracksDataException { try { + total++; if (consumer.cause() != null) { throw consumer.cause(); + } + if (DEBUG) { + LOGGER.info("nextFrame() called. inputHandler is in mode: " + mode.toString()); } switch (mode) { case PROCESS: @@ -148,25 +159,42 @@ } } + // For unit testing purposes + public int framesOnDisk() { + return spiller.remaining(); + } + private ByteBuffer getFreeBuffer(int frameSize) throws HyracksDataException { int numFrames = frameSize / initialFrameSize; if (numFrames == 1) { - return memoryManager.get(); + return framePool.get(); } else { - return memoryManager.get(frameSize); + return framePool.get(frameSize); } } private void discard(ByteBuffer frame) throws HyracksDataException { + if (DEBUG) { + LOGGER.info("starting discard(frame)"); + } if (fpa.spillToDiskOnCongestion()) { + if (DEBUG) { + LOGGER.info("Spilling to disk is enabled. Will try that"); + } if (spiller.spill(frame)) { numSpilled++; mode = Mode.SPILL; return; } } else { + if (DEBUG) { + LOGGER.info("Spilling to disk is disabled. Will try to get a buffer"); + } ByteBuffer next = getFreeBuffer(frame.capacity()); if (next != null) { + if (DEBUG) { + LOGGER.info("Was able to get a buffer"); + } numProcessedInMemory++; next.put(frame); inbox.offer(next); @@ -174,102 +202,168 @@ return; } } - numDiscarded++; + if (((numDiscarded + 1.0) / total) > fpa.getMaxFractionDiscard()) { + if (DEBUG) { + LOGGER.info("in discard(frame). Discard allowance has been consumed. --> Stalling"); + } + stall(frame); + } else { + if (DEBUG) { + LOGGER.info("in discard(frame). So far, I have discarded " + numDiscarded); + } + numDiscarded++; + } } - private synchronized void exitProcessState(ByteBuffer frame) throws HyracksDataException { + private void exitProcessState(ByteBuffer frame) throws HyracksDataException { if (fpa.spillToDiskOnCongestion()) { mode = Mode.SPILL; spiller.open(); spill(frame); } else { + if (DEBUG) { + LOGGER.info("Spilling is disabled --> discardOrStall(frame)"); + } discardOrStall(frame); } } private void discardOrStall(ByteBuffer frame) throws HyracksDataException { if (fpa.discardOnCongestion()) { - numDiscarded++; mode = Mode.DISCARD; discard(frame); } else { + if (DEBUG) { + LOGGER.info("Discard is disabled --> stall(frame)"); + } stall(frame); } } private void stall(ByteBuffer frame) throws HyracksDataException { try { + if (DEBUG) { + LOGGER.info("in stall(frame). So far, I have stalled " + numStalled); + } numStalled++; // If spilling is enabled, we wait on the spiller if (fpa.spillToDiskOnCongestion()) { - synchronized (spiller) { - while (spiller.usedBudget() > MAX_SPILL_USED_BEFORE_RESUME) { - spiller.wait(); - } + if (DEBUG) { + LOGGER.info("in stall(frame). Spilling is enabled so we will attempt to spill"); } + waitforSpillSpace(); spiller.spill(frame); - synchronized (this) { - notify(); + numSpilled++; + synchronized (mutex) { + if (DEBUG) { + LOGGER.info("Producer is waking up consumer"); + } + mutex.notify(); } return; } + if (DEBUG) { + LOGGER.info("in stall(frame). Spilling is disabled. We will subscribe to frame pool"); + } // Spilling is disabled, we subscribe to feedMemoryManager frameAction.setFrame(frame); - synchronized (frameAction) { - if (memoryManager.subscribe(frameAction)) { - frameAction.wait(); - } + framePool.subscribe(frameAction); + ByteBuffer temp = frameAction.retrieve(); + inbox.put(temp); + numProcessedInMemory++; + if (DEBUG) { + LOGGER.info("stall(frame) has been completed. Notifying the consumer that a frame is ready"); } - synchronized (this) { - notify(); + synchronized (mutex) { + if (DEBUG) { + LOGGER.info("Producer is waking up consumer"); + } + mutex.notify(); } } catch (InterruptedException e) { throw new HyracksDataException(e); } } + private void waitforSpillSpace() throws InterruptedException { + synchronized (spiller) { + while (spiller.usedBudget() > MAX_SPILL_USED_BEFORE_RESUME) { + if (DEBUG) { + LOGGER.info("in stall(frame). Spilling has been consumed. We will wait for it to be less than " + + MAX_SPILL_USED_BEFORE_RESUME + " consumed. Current consumption = " + + spiller.usedBudget()); + } + spiller.wait(); + } + } + } + private void process(ByteBuffer frame) throws HyracksDataException { - // Get a page from - ByteBuffer next = getFreeBuffer(frame.capacity()); + // Get a page from frame pool + ByteBuffer next = (frame.capacity() <= framePool.getMaxFrameSize()) ? getFreeBuffer(frame.capacity()) : null; if (next != null) { + // Got a page from memory pool numProcessedInMemory++; next.put(frame); - inbox.offer(next); - if (inbox.size() == 1) { - synchronized (this) { - notify(); - } + try { + inbox.put(next); + notifyMemoryConsumer(); + } catch (InterruptedException e) { + throw new HyracksDataException(e); } } else { - // out of memory. we switch to next mode as per policy -- synchronized method + if (DEBUG) { + LOGGER.info("Couldn't allocate memory --> exitProcessState(frame)"); + } + // Out of memory. we switch to next mode as per policy exitProcessState(frame); + } + } + + private void notifyMemoryConsumer() { + if (inbox.size() == 1) { + synchronized (mutex) { + if (DEBUG) { + LOGGER.info("Producer is waking up consumer"); + } + mutex.notify(); + } } } private void spill(ByteBuffer frame) throws HyracksDataException { if (spiller.switchToMemory()) { - synchronized (this) { + synchronized (mutex) { // Check if there is memory - ByteBuffer next = getFreeBuffer(frame.capacity()); + ByteBuffer next = null; + if (frame.capacity() <= framePool.getMaxFrameSize()) { + next = getFreeBuffer(frame.capacity()); + } if (next != null) { spiller.close(); numProcessedInMemory++; next.put(frame); inbox.offer(next); + notifyMemoryConsumer(); mode = Mode.PROCESS; } else { - // spill. This will always succeed since spilled = 0 (must verify that budget can't be 0) + // spill. This will always succeed since spilled = 0 (TODO must verify that budget can't be 0) spiller.spill(frame); numSpilled++; - notify(); + if (DEBUG) { + LOGGER.info("Producer is waking up consumer"); + } + mutex.notify(); } } } else { // try to spill. If failed switch to either discard or stall if (spiller.spill(frame)) { + notifyDiskConsumer(); numSpilled++; } else { if (fpa.discardOnCongestion()) { + mode = Mode.DISCARD; discard(frame); } else { stall(frame); @@ -278,11 +372,22 @@ } } + private void notifyDiskConsumer() { + if (spiller.remaining() == 1) { + synchronized (mutex) { + if (DEBUG) { + LOGGER.info("Producer is waking up consumer"); + } + mutex.notify(); + } + } + } + public Mode getMode() { return mode; } - public synchronized void setMode(Mode mode) { + public void setMode(Mode mode) { this.mode = mode; } @@ -311,15 +416,22 @@ private class FrameTransporter implements Runnable { private volatile Throwable cause; + private int consumed = 0; + private boolean poisoned = false; public Throwable cause() { return cause; + } + + public void poison() { + poisoned = true; } private Throwable consume(ByteBuffer frame) { while (frame != null) { try { writer.nextFrame(frame); + consumed++; frame = null; } catch (HyracksDataException e) { // It is fine to catch throwable here since this thread is always expected to terminate gracefully @@ -340,7 +452,7 @@ public void run() { try { ByteBuffer frame = inbox.poll(); - while (frame != POISON_PILL) { + while (true) { if (frame != null) { try { if (consume(frame) != null) { @@ -348,7 +460,7 @@ } } finally { // Done with frame. - memoryManager.release(frame); + framePool.release(frame); } } frame = inbox.poll(); @@ -366,13 +478,22 @@ writer.flush(); // At this point. We consumed all memory and spilled // We can't assume the next will be in memory. what if there is 0 memory? - synchronized (FeedRuntimeInputHandler.this) { + synchronized (mutex) { frame = inbox.poll(); if (frame == null) { // Nothing in memory if (spiller.switchToMemory()) { + if (poisoned) { + break; + } + if (DEBUG) { + LOGGER.info("Consumer is going to sleep"); + } // Nothing in disk - FeedRuntimeInputHandler.this.wait(); + mutex.wait(); + if (DEBUG) { + LOGGER.info("Consumer is waking up"); + } } } } @@ -383,5 +504,18 @@ } // cleanup will always be done through the close() call } + + @Override + public String toString() { + return "consumed: " + consumed; + } + } + + public int getTotal() { + return total; + } + + public LinkedBlockingDeque<ByteBuffer> getInternalBuffer() { + return inbox; } } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameAction.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameAction.java index 4a2120a..f02b4aa 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameAction.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameAction.java @@ -19,29 +19,45 @@ package org.apache.asterix.external.feed.dataflow; import java.nio.ByteBuffer; -import java.util.concurrent.LinkedBlockingDeque; -import rx.functions.Action1; +import org.apache.log4j.Logger; -public class FrameAction implements Action1<ByteBuffer> { - private final LinkedBlockingDeque<ByteBuffer> inbox; +public class FrameAction { + private static final boolean DEBUG = false; + private static final Logger LOGGER = Logger.getLogger(FrameAction.class.getName()); + private ByteBuffer allocated; private ByteBuffer frame; - public FrameAction(LinkedBlockingDeque<ByteBuffer> inbox) { - this.inbox = inbox; - } - - @Override public void call(ByteBuffer freeFrame) { + if (DEBUG) { + LOGGER.info("FrameAction: My subscription is being answered"); + } freeFrame.put(frame); - inbox.add(freeFrame); synchronized (this) { - notify(); + allocated = freeFrame; + if (DEBUG) { + LOGGER.info("FrameAction: Waking up waiting threads"); + } + notifyAll(); } } - public ByteBuffer getFrame() { - return frame; + public synchronized ByteBuffer retrieve() throws InterruptedException { + if (DEBUG) { + LOGGER.info("FrameAction: Attempting to get allocated buffer"); + } + while (allocated == null) { + if (DEBUG) { + LOGGER.info("FrameAction: Allocated buffer is not ready yet. I will wait for it"); + } + wait(); + if (DEBUG) { + LOGGER.info("FrameAction: Awoken Up"); + } + } + ByteBuffer temp = allocated; + allocated = null; + return temp; } public void setFrame(ByteBuffer frame) { diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameSpiller.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameSpiller.java index f0d226a..a2f19bb 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameSpiller.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameSpiller.java @@ -44,6 +44,7 @@ public class FrameSpiller { private static final Logger LOGGER = Logger.getLogger(FrameSpiller.class.getName()); private static final int FRAMES_PER_FILE = 1024; + public static final double MAX_SPILL_USED_BEFORE_RESUME = 0.8; private final String fileNamePrefix; private final ArrayDeque<File> files = new ArrayDeque<>(); @@ -88,11 +89,11 @@ } public synchronized ByteBuffer next() throws HyracksDataException { + frame.reset(); + if (totalReadCount == totalWriteCount) { + return null; + } try { - frame.reset(); - if (totalReadCount == totalWriteCount) { - return null; - } if (currentReadFile == null) { if (!files.isEmpty()) { currentReadFile = files.pop(); @@ -126,6 +127,10 @@ return frame.getBuffer(); } catch (Exception e) { throw new HyracksDataException(e); + } finally { + synchronized (this) { + notify(); + } } } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/ConcurrentFramePool.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/ConcurrentFramePool.java index 25aa86a..e5543d6 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/ConcurrentFramePool.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/ConcurrentFramePool.java @@ -28,10 +28,15 @@ import org.apache.asterix.external.feed.dataflow.FrameAction; import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.log4j.Logger; public class ConcurrentFramePool { + private static final boolean DEBUG = false; private static final String ERROR_INVALID_FRAME_SIZE = "The size should be an integral multiple of the default frame size"; + private static final String ERROR_LARGER_THAN_BUDGET_REQUEST = + "The requested frame size must not be greater than the allocated budget"; + private static final Logger LOGGER = Logger.getLogger(ConcurrentFramePool.class.getName()); private final String nodeId; private final int budget; private final int defaultFrameSize; @@ -49,10 +54,30 @@ this.largeFramesPools = new HashMap<>(); } + public int getMaxFrameSize() { + return budget * defaultFrameSize; + } + public synchronized ByteBuffer get() { + // Subscribers have higher priority + if (subscribers.isEmpty()) { + return doGet(); + } + if (DEBUG) { + LOGGER.info("Unable to allocate buffer since subscribers are in-line. Number of subscribers = " + + subscribers.size()); + } + return null; + } + + private ByteBuffer doGet() { if (handedOut < budget) { handedOut++; return allocate(); + } + if (DEBUG) { + LOGGER.info("Unable to allocate buffer without exceeding budget. Remaining = " + remaining() + + ", Requested = 1"); } return null; } @@ -61,11 +86,15 @@ return budget - handedOut; } - public synchronized ByteBuffer get(int bufferSize) throws HyracksDataException { + private ByteBuffer doGet(int bufferSize) throws HyracksDataException { + // Subscribers have higher priority if (bufferSize % defaultFrameSize != 0) { throw new HyracksDataException(ERROR_INVALID_FRAME_SIZE); } int multiplier = bufferSize / defaultFrameSize; + if (multiplier > budget) { + throw new HyracksDataException(ERROR_LARGER_THAN_BUDGET_REQUEST); + } if (handedOut + multiplier <= budget) { handedOut += multiplier; ArrayDeque<ByteBuffer> largeFramesPool = largeFramesPools.get(multiplier); @@ -76,9 +105,26 @@ created += multiplier; return ByteBuffer.allocate(bufferSize); } - return largeFramesPool.poll(); + ByteBuffer buffer = largeFramesPool.poll(); + buffer.clear(); + return buffer; } // Not enough budget + if (DEBUG) { + LOGGER.info("Unable to allocate buffer without exceeding budget. Remaining = " + remaining() + + ", Requested = " + multiplier); + } + return null; + } + + public synchronized ByteBuffer get(int bufferSize) throws HyracksDataException { + if (subscribers.isEmpty()) { + return doGet(bufferSize); + } + if (DEBUG) { + LOGGER.info("Unable to allocate buffer since subscribers are in-line. Number of subscribers = " + + subscribers.size()); + } return null; } @@ -121,7 +167,9 @@ created++; return ByteBuffer.allocate(defaultFrameSize); } else { - return pool.pop(); + ByteBuffer buffer = pool.pop(); + buffer.clear(); + return buffer; } } @@ -150,6 +198,9 @@ public synchronized void release(ByteBuffer buffer) throws HyracksDataException { int multiples = buffer.capacity() / defaultFrameSize; handedOut -= multiples; + if (DEBUG) { + LOGGER.info("Releasing " + multiples + " frames. Remaining frames = " + remaining()); + } if (multiples == 1) { pool.add(buffer); } else { @@ -163,21 +214,47 @@ // check subscribers while (!subscribers.isEmpty()) { FrameAction frameAction = subscribers.peek(); + ByteBuffer freeBuffer; // check if we have enough and answer immediately. if (frameAction.getSize() == defaultFrameSize) { - buffer = get(); + if (DEBUG) { + LOGGER.info("Attempting to callback a subscriber that requested 1 frame"); + } + freeBuffer = doGet(); } else { - buffer = get(frameAction.getSize()); + if (DEBUG) { + LOGGER.info("Attempting to callback a subscriber that requested " + + frameAction.getSize() / defaultFrameSize + " frames"); + } + freeBuffer = doGet(frameAction.getSize()); } - if (buffer != null) { + if (freeBuffer != null) { + int handedOutBeforeCall = handedOut; try { - frameAction.call(buffer); + frameAction.call(freeBuffer); + } catch (Exception e) { + LOGGER.error("Error while attempting to answer a subscription. Buffer will be reclaimed", e); + // TODO(amoudi): Add test cases and get rid of recursion + if (handedOut == handedOutBeforeCall) { + release(freeBuffer); + } + throw e; } finally { subscribers.remove(); + if (DEBUG) { + LOGGER.info( + "A subscription has been satisfied. " + subscribers.size() + " remaining subscribers"); + } } } else { + if (DEBUG) { + LOGGER.info("Failed to allocate requested frames"); + } break; } + } + if (DEBUG) { + LOGGER.info(subscribers.size() + " remaining subscribers"); } } @@ -187,18 +264,30 @@ ByteBuffer buffer; // check if we have enough and answer immediately. if (frameAction.getSize() == defaultFrameSize) { - buffer = get(); + buffer = doGet(); } else { - buffer = get(frameAction.getSize()); + buffer = doGet(frameAction.getSize()); } if (buffer != null) { frameAction.call(buffer); // There is no need to subscribe. perform action and return false return false; } + } else { + int multiplier = frameAction.getSize() / defaultFrameSize; + if (multiplier > budget) { + throw new HyracksDataException(ERROR_LARGER_THAN_BUDGET_REQUEST); + } } // none of the above, add to subscribers and return true subscribers.add(frameAction); return true; } + + /* + * For unit testing purposes + */ + public Collection<FrameAction> getSubscribers() { + return subscribers; + } } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/FeedRuntimeId.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/FeedRuntimeId.java index f888542..18d4cff 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/FeedRuntimeId.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/FeedRuntimeId.java @@ -35,11 +35,11 @@ private final String targetId; private final int hashCode; - public FeedRuntimeId(FeedId feedId, FeedRuntimeType runtimeType, int partition, String operandId) { + public FeedRuntimeId(FeedId feedId, FeedRuntimeType runtimeType, int partition, String targetId) { this.feedId = feedId; this.runtimeType = runtimeType; this.partition = partition; - this.targetId = operandId; + this.targetId = targetId; this.hashCode = toString().hashCode(); } diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/FeedMemoryManagerUnitTest.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/ConcurrentFramePoolUnitTest.java similarity index 74% rename from asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/FeedMemoryManagerUnitTest.java rename to asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/ConcurrentFramePoolUnitTest.java index 8a6d1b6..444d8a5 100644 --- a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/FeedMemoryManagerUnitTest.java +++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/ConcurrentFramePoolUnitTest.java @@ -22,8 +22,10 @@ import java.util.ArrayDeque; import java.util.Arrays; import java.util.Random; +import java.util.concurrent.LinkedBlockingDeque; import org.apache.asterix.common.config.AsterixFeedProperties; +import org.apache.asterix.external.feed.dataflow.FrameAction; import org.apache.asterix.external.feed.management.ConcurrentFramePool; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.junit.Assert; @@ -36,7 +38,7 @@ import junit.framework.TestSuite; @RunWith(PowerMockRunner.class) -public class FeedMemoryManagerUnitTest extends TestCase { +public class ConcurrentFramePoolUnitTest extends TestCase { private static final int DEFAULT_FRAME_SIZE = 32768; private static final int NUM_FRAMES = 2048; @@ -44,8 +46,9 @@ private static final int NUM_THREADS = 8; private static final int MAX_SIZE = 52; private static final double RELEASE_PROBABILITY = 0.20; + private volatile static HyracksDataException cause = null; - public FeedMemoryManagerUnitTest(String testName) { + public ConcurrentFramePoolUnitTest(String testName) { super(testName); } @@ -53,7 +56,7 @@ * @return the suite of tests being tested */ public static Test suite() { - return new TestSuite(FeedMemoryManagerUnitTest.class); + return new TestSuite(ConcurrentFramePoolUnitTest.class); } @org.junit.Test @@ -67,6 +70,7 @@ i++; } Assert.assertEquals(i, NUM_FRAMES); + Assert.assertNull(cause); } @org.junit.Test @@ -97,6 +101,7 @@ th.printStackTrace(); Assert.fail(th.getMessage()); } + Assert.assertNull(cause); } @org.junit.Test @@ -131,6 +136,7 @@ th.printStackTrace(); Assert.fail(th.getMessage()); } + Assert.assertNull(cause); } @org.junit.Test @@ -170,6 +176,7 @@ th.printStackTrace(); Assert.fail(th.getMessage()); } + Assert.assertNull(cause); } @org.junit.Test @@ -201,6 +208,7 @@ } stack.clear(); Assert.assertEquals(fmm.remaining(), NUM_FRAMES); + Assert.assertNull(cause); } @org.junit.Test @@ -231,6 +239,7 @@ th.printStackTrace(); Assert.fail(th.getMessage()); } + Assert.assertNull(cause); } @org.junit.Test @@ -281,6 +290,8 @@ } catch (Throwable th) { th.printStackTrace(); Assert.fail(th.getMessage()); + } finally { + Assert.assertNull(cause); } } @@ -315,6 +326,125 @@ } catch (Throwable th) { th.printStackTrace(); Assert.fail(th.getMessage()); + } finally { + Assert.assertNull(cause); + } + } + + @org.junit.Test + public void testFixedSizeSubscribtion() { + try { + AsterixFeedProperties afp = Mockito.mock(AsterixFeedProperties.class); + Mockito.when(afp.getMemoryComponentGlobalBudget()).thenReturn(FEED_MEM_BUDGET); + ConcurrentFramePool fmm = + new ConcurrentFramePool("TestNode", afp.getMemoryComponentGlobalBudget(), DEFAULT_FRAME_SIZE); + int i = 0; + ByteBuffer buffer = ByteBuffer.allocate(DEFAULT_FRAME_SIZE); + LinkedBlockingDeque<ByteBuffer> buffers = new LinkedBlockingDeque<>(); + FrameAction frameAction = new FrameAction(); + frameAction.setFrame(buffer); + while (!fmm.subscribe(frameAction)) { + buffers.put(frameAction.retrieve()); + i++; + } + // One subscriber. + // Check that all frames have been consumed + Assert.assertEquals(i, NUM_FRAMES); + // Release a frame (That will be handed out to the subscriber) + fmm.release(buffers.take()); + // Check that all frames have been consumed (since the released frame have been handed to the consumer) + Assert.assertEquals(0, fmm.remaining()); + } catch (Throwable th) { + th.printStackTrace(); + Assert.fail(th.getMessage()); + } finally { + Assert.assertNull(cause); + } + } + + @org.junit.Test + public void testLargerThanBudgetRequests() { + HyracksDataException hde = null; + try { + ConcurrentFramePool fmm = new ConcurrentFramePool("TestNode", DEFAULT_FRAME_SIZE * 16, DEFAULT_FRAME_SIZE); + fmm.get(32 * DEFAULT_FRAME_SIZE); + } catch (HyracksDataException e) { + hde = e; + } catch (Throwable th) { + th.printStackTrace(); + Assert.fail(th.getMessage()); + } + Assert.assertNotNull(hde); + Assert.assertNull(cause); + } + + @org.junit.Test + public void testLargerThanBudgetSubscribe() { + HyracksDataException hde = null; + try { + ConcurrentFramePool fmm = new ConcurrentFramePool("TestNode", DEFAULT_FRAME_SIZE * 16, DEFAULT_FRAME_SIZE); + ByteBuffer buffer = ByteBuffer.allocate(DEFAULT_FRAME_SIZE * 32); + FrameAction frameAction = new FrameAction(); + frameAction.setFrame(buffer); + fmm.subscribe(frameAction); + } catch (HyracksDataException e) { + hde = e; + } catch (Throwable th) { + th.printStackTrace(); + Assert.fail(th.getMessage()); + } + Assert.assertNotNull(hde); + Assert.assertNull(cause); + } + + @org.junit.Test + public void testgetWhileSubscribersExist() { + try { + AsterixFeedProperties afp = Mockito.mock(AsterixFeedProperties.class); + Mockito.when(afp.getMemoryComponentGlobalBudget()).thenReturn(FEED_MEM_BUDGET); + ConcurrentFramePool fmm = + new ConcurrentFramePool("TestNode", afp.getMemoryComponentGlobalBudget(), DEFAULT_FRAME_SIZE); + int i = 0; + ByteBuffer buffer = ByteBuffer.allocate(DEFAULT_FRAME_SIZE); + LinkedBlockingDeque<ByteBuffer> buffers = new LinkedBlockingDeque<>(); + FrameAction frameAction = new FrameAction(); + frameAction.setFrame(buffer); + while (!fmm.subscribe(frameAction)) { + buffers.put(frameAction.retrieve()); + i++; + } + // One subscriber. + // Check that all frames have been consumed + Assert.assertEquals(i, NUM_FRAMES); + // Release a frame (That will be handed out to the subscriber) + fmm.release(buffers.take()); + // Check that all frames have been consumed (since the released frame have been handed to the consumer) + Assert.assertEquals(fmm.remaining(), 0); + buffers.put(frameAction.retrieve()); + // Create another subscriber that takes frames of double the size + ByteBuffer bufferTimes2 = ByteBuffer.allocate(DEFAULT_FRAME_SIZE * 2); + LinkedBlockingDeque<ByteBuffer> buffersTimes2 = new LinkedBlockingDeque<>(); + FrameAction frameActionTimes2 = new FrameAction(); + frameActionTimes2.setFrame(bufferTimes2); + Assert.assertEquals(true, fmm.subscribe(frameActionTimes2)); + // release a small one + fmm.release(buffers.take()); + Assert.assertEquals(fmm.remaining(), 1); + // Check that a small get fails + Assert.assertEquals(null, fmm.get()); + // release another small one + fmm.release(buffers.take()); + // Check that no small frames exists in the pool since subscriber request was satisfied + Assert.assertEquals(fmm.remaining(), 0); + buffersTimes2.add(frameActionTimes2.retrieve()); + fmm.release(buffers); + fmm.release(bufferTimes2); + Assert.assertEquals(fmm.remaining(), NUM_FRAMES); + } catch (Throwable th) { + th.printStackTrace(); + Assert.fail(th.getMessage()); + } finally { + Assert.assertNull(cause); } } @@ -362,7 +492,8 @@ try { fmm.release(stack.pop()); } catch (HyracksDataException e) { - Assert.fail(); + e.printStackTrace(); + cause = e; } } } else { diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/InputHandlerTest.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/InputHandlerTest.java new file mode 100644 index 0000000..705d5e3 --- /dev/null +++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/InputHandlerTest.java @@ -0,0 +1,794 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.external.feed.test; + +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.Random; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +import org.apache.asterix.external.feed.api.IFeedRuntime.FeedRuntimeType; +import org.apache.asterix.external.feed.dataflow.FeedRuntimeInputHandler; +import org.apache.asterix.external.feed.management.ConcurrentFramePool; +import org.apache.asterix.external.feed.management.FeedConnectionId; +import org.apache.asterix.external.feed.management.FeedId; +import org.apache.asterix.external.feed.policy.FeedPolicyAccessor; +import org.apache.asterix.external.feed.runtime.FeedRuntimeId; +import org.apache.hyracks.api.comm.IFrameWriter; +import org.apache.hyracks.api.comm.VSizeFrame; +import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.test.FrameWriterTestUtils; +import org.apache.hyracks.api.test.TestControlledFrameWriter; +import org.apache.hyracks.api.test.TestFrameWriter; +import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor; +import org.apache.hyracks.test.support.TestUtils; +import org.junit.Assert; +import org.junit.runner.RunWith; +import org.mockito.Mockito; +import org.powermock.modules.junit4.PowerMockRunner; + +import junit.framework.Test; +import junit.framework.TestCase; +import junit.framework.TestSuite; + +@RunWith(PowerMockRunner.class) +public class InputHandlerTest extends TestCase { + + private static final int DEFAULT_FRAME_SIZE = 32768; + private static final int NUM_FRAMES = 128; + private static final long FEED_MEM_BUDGET = DEFAULT_FRAME_SIZE * NUM_FRAMES; + private static final String DATAVERSE = "dataverse"; + private static final String DATASET = "dataset"; + private static final String FEED = "feed"; + private static final String NODE_ID = "NodeId"; + private static final float DISCARD_ALLOWANCE = 0.15f; + private static final ExecutorService EXECUTOR = Executors.newFixedThreadPool(1); + private volatile static HyracksDataException cause = null; + + public InputHandlerTest(String testName) { + super(testName); + } + + public static Test suite() { + return new TestSuite(InputHandlerTest.class); + } + + private FeedRuntimeInputHandler createInputHandler(IHyracksTaskContext ctx, IFrameWriter writer, + FeedPolicyAccessor fpa, ConcurrentFramePool framePool) throws HyracksDataException { + FrameTupleAccessor fta = Mockito.mock(FrameTupleAccessor.class); + FeedId feedId = new FeedId(DATAVERSE, FEED); + FeedConnectionId connectionId = new FeedConnectionId(feedId, DATASET); + FeedRuntimeId runtimeId = + new FeedRuntimeId(feedId, FeedRuntimeType.COLLECT, 0, FeedRuntimeId.DEFAULT_TARGET_ID); + return new FeedRuntimeInputHandler(ctx, connectionId, runtimeId, writer, fpa, fta, framePool); + } + + /* + * Testing the following scenarios + * 01. Positive Frames memory budget with fixed size frames, no spill, no discard. + * 02. Positive Frames memory budget with variable size frames, no spill, no discard. + * 03. Positive Frames memory budget with fixed size frames, with spill, no discard. + * 04. Positive Frames memory budget with variable size frames, with spill, no discard. + * 05. Positive Frames memory budget with fixed size frames, no spill, with discard. + * 06. Positive Frames memory budget with variable size frames, no spill, with discard. + * 07. Positive Frames memory budget with fixed size frames, with spill, with discard. + * 08. Positive Frames memory budget with variable size frames, with spill, with discard. + * 09. 0 Frames memory budget with fixed size frames, with spill, no discard. + * 10. 0 Frames memory budget with variable size frames, with spill, no discard. + * 11. TODO 0 Frames memory budget with fixed size frames, with spill, with discard. + * 12. TODO 0 Frames memory budget with variable size frames, with spill, with discard. + * 13. TODO Test exception handling with Open, NextFrame,Flush,Close,Fail exception throwing FrameWriter + * 14. TODO Test exception while waiting for subscription + */ + + private static FeedPolicyAccessor createFeedPolicyAccessor(boolean spill, boolean discard, long spillBudget, + float discardFraction) { + FeedPolicyAccessor fpa = Mockito.mock(FeedPolicyAccessor.class); + Mockito.when(fpa.bufferingEnabled()).thenReturn(true); + Mockito.when(fpa.spillToDiskOnCongestion()).thenReturn(spill); + Mockito.when(fpa.getMaxSpillOnDisk()).thenReturn(spillBudget); + Mockito.when(fpa.discardOnCongestion()).thenReturn(discard); + Mockito.when(fpa.getMaxFractionDiscard()).thenReturn(discardFraction); + return fpa; + } + + @org.junit.Test + public void testZeroMemoryVarSizeFrameWithDiskNoDiscard() { + try { + int numRounds = 5; + Random random = new Random(); + IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE); + // No spill, No discard + FeedPolicyAccessor fpa = + createFeedPolicyAccessor(true, false, NUM_FRAMES * DEFAULT_FRAME_SIZE, DISCARD_ALLOWANCE); + // Non-Active Writer + TestFrameWriter writer = FrameWriterTestUtils.create(Collections.emptyList(), Collections.emptyList()); + // FramePool + ConcurrentFramePool framePool = new ConcurrentFramePool(NODE_ID, 0, DEFAULT_FRAME_SIZE); + FeedRuntimeInputHandler handler = createInputHandler(ctx, writer, fpa, framePool); + handler.open(); + ByteBuffer buffer = ByteBuffer.allocate(DEFAULT_FRAME_SIZE); + handler.nextFrame(buffer); + Assert.assertEquals(0, handler.getNumProcessedInMemory()); + Assert.assertEquals(1, handler.getNumSpilled()); + // add NUM_FRAMES times + for (int i = 0; i < NUM_FRAMES * numRounds; i++) { + int multiplier = random.nextInt(10) + 1; + buffer = ByteBuffer.allocate(DEFAULT_FRAME_SIZE * multiplier); + handler.nextFrame(buffer); + } + // Check that no records were discarded + Assert.assertEquals(handler.getNumDiscarded(), 0); + // Check that no records were spilled + Assert.assertEquals(NUM_FRAMES * numRounds + 1, handler.getNumSpilled()); + writer.validate(false); + handler.close(); + // Check that nextFrame was called + Assert.assertEquals(NUM_FRAMES * numRounds + 1, writer.nextFrameCount()); + writer.validate(true); + } catch (Throwable th) { + th.printStackTrace(); + Assert.fail(); + } finally { + Assert.assertNull(cause); + } + } + + @org.junit.Test + public void testZeroMemoryFixedSizeFrameWithDiskNoDiscard() { + try { + int numRounds = 10; + IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE); + // No spill, No discard + FeedPolicyAccessor fpa = + createFeedPolicyAccessor(true, false, NUM_FRAMES * DEFAULT_FRAME_SIZE, DISCARD_ALLOWANCE); + // Non-Active Writer + TestFrameWriter writer = FrameWriterTestUtils.create(Collections.emptyList(), Collections.emptyList()); + // FramePool + ConcurrentFramePool framePool = new ConcurrentFramePool(NODE_ID, 0, DEFAULT_FRAME_SIZE); + FeedRuntimeInputHandler handler = createInputHandler(ctx, writer, fpa, framePool); + handler.open(); + VSizeFrame frame = new VSizeFrame(ctx); + handler.nextFrame(frame.getBuffer()); + Assert.assertEquals(0, handler.getNumProcessedInMemory()); + Assert.assertEquals(1, handler.getNumSpilled()); + // add NUM_FRAMES times + for (int i = 0; i < NUM_FRAMES * numRounds; i++) { + handler.nextFrame(frame.getBuffer()); + } + // Check that no records were discarded + Assert.assertEquals(handler.getNumDiscarded(), 0); + // Check that no records were spilled + Assert.assertEquals(NUM_FRAMES * numRounds + 1, handler.getNumSpilled()); + writer.validate(false); + handler.close(); + // Check that nextFrame was called + Assert.assertEquals(NUM_FRAMES * numRounds + 1, writer.nextFrameCount()); + writer.validate(true); + } catch (Throwable th) { + th.printStackTrace(); + Assert.fail(); + } finally { + Assert.assertNull(cause); + } + + } + + /* + * Spill = false; + * Discard = true; discard only 5% + * Fixed size frames + */ + @org.junit.Test + public void testMemoryVarSizeFrameWithSpillWithDiscard() { + try { + int numberOfMemoryFrames = 50; + int numberOfSpillFrames = 50; + int notDiscarded = 0; + int totalMinFrames = 0; + IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE); + // Spill budget = Memory budget, No discard + FeedPolicyAccessor fpa = + createFeedPolicyAccessor(true, true, DEFAULT_FRAME_SIZE * numberOfSpillFrames, DISCARD_ALLOWANCE); + // Non-Active Writer + TestControlledFrameWriter writer = FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE); + writer.freeze(); + // FramePool + ConcurrentFramePool framePool = + new ConcurrentFramePool(NODE_ID, numberOfMemoryFrames * DEFAULT_FRAME_SIZE, DEFAULT_FRAME_SIZE); + FeedRuntimeInputHandler handler = createInputHandler(ctx, writer, fpa, framePool); + handler.open(); + ByteBuffer buffer1 = ByteBuffer.allocate(DEFAULT_FRAME_SIZE); + ByteBuffer buffer2 = ByteBuffer.allocate(DEFAULT_FRAME_SIZE * 2); + ByteBuffer buffer3 = ByteBuffer.allocate(DEFAULT_FRAME_SIZE * 3); + ByteBuffer buffer4 = ByteBuffer.allocate(DEFAULT_FRAME_SIZE * 4); + ByteBuffer buffer5 = ByteBuffer.allocate(DEFAULT_FRAME_SIZE * 5); + while (true) { + if (totalMinFrames + 1 < numberOfMemoryFrames) { + handler.nextFrame(buffer1); + notDiscarded++; + totalMinFrames++; + } else { + break; + } + if (totalMinFrames + 2 < numberOfMemoryFrames) { + notDiscarded++; + totalMinFrames += 2; + handler.nextFrame(buffer2); + } else { + break; + } + if (totalMinFrames + 3 < numberOfMemoryFrames) { + notDiscarded++; + totalMinFrames += 3; + handler.nextFrame(buffer3); + } else { + break; + } + } + // Now we need to verify that the frame pool memory has been consumed! + Assert.assertTrue(framePool.remaining() < 3); + Assert.assertEquals(0, handler.getNumSpilled()); + Assert.assertEquals(0, handler.getNumStalled()); + Assert.assertEquals(0, handler.getNumDiscarded()); + while (true) { + if (handler.getNumSpilled() < numberOfSpillFrames) { + notDiscarded++; + handler.nextFrame(buffer3); + } else { + break; + } + if (handler.getNumSpilled() < numberOfSpillFrames) { + notDiscarded++; + handler.nextFrame(buffer4); + } else { + break; + } + if (handler.getNumSpilled() < numberOfSpillFrames) { + notDiscarded++; + handler.nextFrame(buffer5); + } else { + break; + } + } + Assert.assertTrue(framePool.remaining() < 3); + Assert.assertEquals(handler.framesOnDisk(), handler.getNumSpilled()); + Assert.assertEquals(handler.framesOnDisk(), numberOfSpillFrames); + Assert.assertEquals(0, handler.getNumStalled()); + Assert.assertEquals(0, handler.getNumDiscarded()); + // We can only discard one frame + double numDiscarded = 0; + boolean nextShouldDiscard = + ((numDiscarded + 1.0) / (handler.getTotal() + 1.0)) <= fpa.getMaxFractionDiscard(); + while (nextShouldDiscard) { + handler.nextFrame(buffer5); + numDiscarded++; + nextShouldDiscard = ((numDiscarded + 1.0) / (handler.getTotal() + 1.0)) <= fpa.getMaxFractionDiscard(); + } + Assert.assertTrue(framePool.remaining() < 3); + Assert.assertEquals(handler.framesOnDisk(), handler.getNumSpilled()); + Assert.assertEquals(0, handler.getNumStalled()); + Assert.assertEquals((int) numDiscarded, handler.getNumDiscarded()); + // Next Call should block since we're exceeding the discard allowance + Future<?> result = EXECUTOR.submit(new Pusher(buffer5, handler)); + if (result.isDone()) { + Assert.fail("The producer should switch to stall mode since it is exceeding the discard allowance"); + } + // consume memory frames + writer.unfreeze(); + result.get(); + handler.close(); + Assert.assertEquals(writer.nextFrameCount(), notDiscarded + 1); + } catch (Throwable th) { + th.printStackTrace(); + Assert.fail(); + } + Assert.assertNull(cause); + } + + /* + * Spill = true; + * Discard = true + * Fixed size frames + */ + @org.junit.Test + public void testMemoryFixedSizeFrameWithSpillWithDiscard() { + try { + int numberOfMemoryFrames = 50; + int numberOfSpillFrames = 50; + IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE); + // Spill budget = Memory budget, No discard + FeedPolicyAccessor fpa = + createFeedPolicyAccessor(true, true, DEFAULT_FRAME_SIZE * numberOfSpillFrames, DISCARD_ALLOWANCE); + // Non-Active Writer + TestControlledFrameWriter writer = FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE); + writer.freeze(); + // FramePool + ConcurrentFramePool framePool = + new ConcurrentFramePool(NODE_ID, numberOfMemoryFrames * DEFAULT_FRAME_SIZE, DEFAULT_FRAME_SIZE); + FeedRuntimeInputHandler handler = createInputHandler(ctx, writer, fpa, framePool); + handler.open(); + VSizeFrame frame = new VSizeFrame(ctx); + for (int i = 0; i < numberOfMemoryFrames; i++) { + handler.nextFrame(frame.getBuffer()); + } + // Now we need to verify that the frame pool memory has been consumed! + Assert.assertEquals(0, framePool.remaining()); + Assert.assertEquals(numberOfMemoryFrames, handler.getTotal()); + Assert.assertEquals(0, handler.getNumSpilled()); + Assert.assertEquals(0, handler.getNumStalled()); + Assert.assertEquals(0, handler.getNumDiscarded()); + for (int i = 0; i < numberOfSpillFrames; i++) { + handler.nextFrame(frame.getBuffer()); + } + Assert.assertEquals(0, framePool.remaining()); + Assert.assertEquals(numberOfMemoryFrames + numberOfSpillFrames, handler.getTotal()); + Assert.assertEquals(numberOfSpillFrames, handler.getNumSpilled()); + Assert.assertEquals(0, handler.getNumStalled()); + Assert.assertEquals(0, handler.getNumDiscarded()); + // We can only discard one frame + double numDiscarded = 0; + boolean nextShouldDiscard = + ((numDiscarded + 1.0) / (handler.getTotal() + 1.0)) <= fpa.getMaxFractionDiscard(); + while (nextShouldDiscard) { + handler.nextFrame(frame.getBuffer()); + numDiscarded++; + nextShouldDiscard = ((numDiscarded + 1.0) / (handler.getTotal() + 1.0)) <= fpa.getMaxFractionDiscard(); + } + Assert.assertEquals(0, framePool.remaining()); + Assert.assertEquals((int) (numberOfMemoryFrames + numberOfSpillFrames + numDiscarded), handler.getTotal()); + Assert.assertEquals(numberOfSpillFrames, handler.getNumSpilled()); + Assert.assertEquals(0, handler.getNumStalled()); + Assert.assertEquals((int) numDiscarded, handler.getNumDiscarded()); + // Next Call should block since we're exceeding the discard allowance + Future<?> result = EXECUTOR.submit(new Pusher(frame.getBuffer(), handler)); + if (result.isDone()) { + Assert.fail("The producer should switch to stall mode since it is exceeding the discard allowance"); + } else { + Assert.assertEquals((int) numDiscarded, handler.getNumDiscarded()); + } + // consume memory frames + writer.unfreeze(); + result.get(); + handler.close(); + Assert.assertTrue(result.isDone()); + Assert.assertEquals(writer.nextFrameCount(), numberOfMemoryFrames + numberOfSpillFrames + 1); + } catch (Throwable th) { + th.printStackTrace(); + Assert.fail(); + } + Assert.assertNull(cause); + } + + /* + * Spill = false; + * Discard = true; discard only 5% + * Fixed size frames + */ + @org.junit.Test + public void testMemoryVariableSizeFrameNoSpillWithDiscard() { + try { + int discardTestFrames = 100; + Random random = new Random(); + IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE); + // Spill budget = Memory budget, No discard + FeedPolicyAccessor fpa = createFeedPolicyAccessor(false, true, DEFAULT_FRAME_SIZE, DISCARD_ALLOWANCE); + // Non-Active Writer + TestControlledFrameWriter writer = FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE); + writer.freeze(); + // FramePool + ConcurrentFramePool framePool = + new ConcurrentFramePool(NODE_ID, discardTestFrames * DEFAULT_FRAME_SIZE, DEFAULT_FRAME_SIZE); + FeedRuntimeInputHandler handler = createInputHandler(ctx, writer, fpa, framePool); + handler.open(); + // add NUM_FRAMES times + ByteBuffer buffer = ByteBuffer.allocate(DEFAULT_FRAME_SIZE); + int multiplier = 1; + int numFrames = 0; + // add NUM_FRAMES times + while ((multiplier <= framePool.remaining())) { + numFrames++; + handler.nextFrame(buffer); + multiplier = random.nextInt(10) + 1; + buffer = ByteBuffer.allocate(DEFAULT_FRAME_SIZE * multiplier); + } + // Next call should NOT block but should discard. + double numDiscarded = 0.0; + boolean nextShouldDiscard = + ((numDiscarded + 1.0) / (handler.getTotal() + 1.0)) <= fpa.getMaxFractionDiscard(); + while (nextShouldDiscard) { + handler.nextFrame(buffer); + numDiscarded++; + nextShouldDiscard = ((numDiscarded + 1.0) / (handler.getTotal() + 1.0)) <= fpa.getMaxFractionDiscard(); + } + Future<?> result = EXECUTOR.submit(new Pusher(buffer, handler)); + if (result.isDone()) { + Assert.fail("The producer should switch to stall mode since it is exceeding the discard allowance"); + } else { + // Check that no records were discarded + assertEquals((int) numDiscarded, handler.getNumDiscarded()); + // Check that one frame is spilled + assertEquals(handler.getNumSpilled(), 0); + } + // consume memory frames + writer.unfreeze(); + result.get(); + handler.close(); + Assert.assertEquals(writer.nextFrameCount(), numFrames + 1); + // exit + } catch (Throwable th) { + th.printStackTrace(); + Assert.fail(); + } + Assert.assertNull(cause); + } + + /* + * Spill = false; + * Discard = true; discard only 5% + * Fixed size frames + */ + @org.junit.Test + public void testMemoryFixedSizeFrameNoSpillWithDiscard() { + try { + int discardTestFrames = 100; + IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE); + // Spill budget = Memory budget, No discard + FeedPolicyAccessor fpa = createFeedPolicyAccessor(false, true, DEFAULT_FRAME_SIZE, DISCARD_ALLOWANCE); + // Non-Active Writer + TestControlledFrameWriter writer = FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE); + writer.freeze(); + // FramePool + ConcurrentFramePool framePool = + new ConcurrentFramePool(NODE_ID, discardTestFrames * DEFAULT_FRAME_SIZE, DEFAULT_FRAME_SIZE); + FeedRuntimeInputHandler handler = createInputHandler(ctx, writer, fpa, framePool); + handler.open(); + VSizeFrame frame = new VSizeFrame(ctx); + // add NUM_FRAMES times + for (int i = 0; i < discardTestFrames; i++) { + handler.nextFrame(frame.getBuffer()); + } + // Next 5 calls call should NOT block but should discard. + double numDiscarded = 0.0; + boolean nextShouldDiscard = + ((numDiscarded + 1.0) / (handler.getTotal() + 1.0)) <= fpa.getMaxFractionDiscard(); + while (nextShouldDiscard) { + handler.nextFrame(frame.getBuffer()); + numDiscarded++; + nextShouldDiscard = ((numDiscarded + 1.0) / (handler.getTotal() + 1.0)) <= fpa.getMaxFractionDiscard(); + } + // Next Call should block since we're exceeding the discard allowance + Future<?> result = EXECUTOR.submit(new Pusher(frame.getBuffer(), handler)); + if (result.isDone()) { + Assert.fail("The producer should switch to stall mode since it is exceeding the discard allowance"); + } else { + // Check that no records were discarded + assertEquals((int) numDiscarded, handler.getNumDiscarded()); + // Check that one frame is spilled + assertEquals(handler.getNumSpilled(), 0); + } + // consume memory frames + writer.unfreeze(); + result.get(); + handler.close(); + Assert.assertEquals(writer.nextFrameCount(), discardTestFrames + 1); + // exit + } catch (Throwable th) { + th.printStackTrace(); + Assert.fail(); + } + Assert.assertNull(cause); + } + + /* + * Spill = true; + * Discard = false; + * Fixed size frames + */ + @org.junit.Test + public void testMemoryFixedSizeFrameWithSpillNoDiscard() { + try { + IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE); + // Spill budget = Memory budget, No discard + FeedPolicyAccessor fpa = + createFeedPolicyAccessor(true, false, DEFAULT_FRAME_SIZE * NUM_FRAMES, DISCARD_ALLOWANCE); + // Non-Active Writer + TestControlledFrameWriter writer = FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE); + writer.freeze(); + // FramePool + ConcurrentFramePool framePool = new ConcurrentFramePool(NODE_ID, FEED_MEM_BUDGET, DEFAULT_FRAME_SIZE); + FeedRuntimeInputHandler handler = createInputHandler(ctx, writer, fpa, framePool); + handler.open(); + VSizeFrame frame = new VSizeFrame(ctx); + // add NUM_FRAMES times + for (int i = 0; i < NUM_FRAMES; i++) { + handler.nextFrame(frame.getBuffer()); + } + // Next call should NOT block. we will do it in a different thread + Future<?> result = EXECUTOR.submit(new Pusher(frame.getBuffer(), handler)); + result.get(); + // Check that no records were discarded + assertEquals(handler.getNumDiscarded(), 0); + // Check that one frame is spilled + assertEquals(handler.getNumSpilled(), 1); + // consume memory frames + writer.unfreeze(); + handler.close(); + Assert.assertEquals(handler.framesOnDisk(), 0); + // exit + } catch (Throwable th) { + th.printStackTrace(); + Assert.fail(); + } + Assert.assertNull(cause); + } + + /* + * Spill = false; + * Discard = false; + * Fixed size frames + * Very fast next operator + */ + @org.junit.Test + public void testMemoryFixedSizeFrameNoDiskNoDiscardFastConsumer() { + try { + int numRounds = 10; + IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE); + // No spill, No discard + FeedPolicyAccessor fpa = createFeedPolicyAccessor(false, false, 0L, DISCARD_ALLOWANCE); + // Non-Active Writer + TestFrameWriter writer = FrameWriterTestUtils.create(Collections.emptyList(), Collections.emptyList()); + // FramePool + ConcurrentFramePool framePool = new ConcurrentFramePool(NODE_ID, FEED_MEM_BUDGET, DEFAULT_FRAME_SIZE); + FeedRuntimeInputHandler handler = createInputHandler(ctx, writer, fpa, framePool); + handler.open(); + VSizeFrame frame = new VSizeFrame(ctx); + // add NUM_FRAMES times + for (int i = 0; i < NUM_FRAMES * numRounds; i++) { + handler.nextFrame(frame.getBuffer()); + } + // Check that no records were discarded + Assert.assertEquals(handler.getNumDiscarded(), 0); + // Check that no records were spilled + Assert.assertEquals(handler.getNumSpilled(), 0); + writer.validate(false); + handler.close(); + // Check that nextFrame was called + Assert.assertEquals(NUM_FRAMES * numRounds, writer.nextFrameCount()); + writer.validate(true); + } catch (Throwable th) { + th.printStackTrace(); + Assert.fail(); + } + Assert.assertNull(cause); + } + + /* + * Spill = false; + * Discard = false; + * Fixed size frames + * Slow next operator + */ + @org.junit.Test + public void testMemoryFixedSizeFrameNoDiskNoDiscardSlowConsumer() { + try { + int numRounds = 10; + IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE); + // No spill, No discard + FeedPolicyAccessor fpa = createFeedPolicyAccessor(false, false, 0L, DISCARD_ALLOWANCE); + // Non-Active Writer + TestFrameWriter writer = FrameWriterTestUtils.create(Collections.emptyList(), Collections.emptyList()); + // FramePool + ConcurrentFramePool framePool = new ConcurrentFramePool(NODE_ID, FEED_MEM_BUDGET, DEFAULT_FRAME_SIZE); + FeedRuntimeInputHandler handler = createInputHandler(ctx, writer, fpa, framePool); + handler.open(); + VSizeFrame frame = new VSizeFrame(ctx); + writer.setNextDuration(1); + // add NUM_FRAMES times + for (int i = 0; i < NUM_FRAMES * numRounds; i++) { + handler.nextFrame(frame.getBuffer()); + } + // Check that no records were discarded + Assert.assertEquals(handler.getNumDiscarded(), 0); + // Check that no records were spilled + Assert.assertEquals(handler.getNumSpilled(), 0); + // Check that nextFrame was called + writer.validate(false); + handler.close(); + Assert.assertEquals(writer.nextFrameCount(), (NUM_FRAMES * numRounds)); + writer.validate(true); + } catch (Throwable th) { + th.printStackTrace(); + Assert.fail(); + } + Assert.assertNull(cause); + } + + /* + * Spill = false + * Discard = false + * VarSizeFrame + */ + public void testMemoryVarSizeFrameNoDiskNoDiscard() { + try { + Random random = new Random(); + IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE); + // No spill, No discard + FeedPolicyAccessor fpa = createFeedPolicyAccessor(false, false, 0L, DISCARD_ALLOWANCE); + // Non-Active Writer + TestControlledFrameWriter writer = FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE); + writer.freeze(); + // FramePool + ConcurrentFramePool framePool = new ConcurrentFramePool(NODE_ID, FEED_MEM_BUDGET, DEFAULT_FRAME_SIZE); + FeedRuntimeInputHandler handler = createInputHandler(ctx, writer, fpa, framePool); + handler.open(); + ByteBuffer buffer = ByteBuffer.allocate(DEFAULT_FRAME_SIZE); + int multiplier = 1; + // add NUM_FRAMES times + while ((multiplier <= framePool.remaining())) { + handler.nextFrame(buffer); + multiplier = random.nextInt(10) + 1; + buffer = ByteBuffer.allocate(DEFAULT_FRAME_SIZE * multiplier); + } + // we can't satisfy the next request + // Next call should block we will do it in a different thread + Future<?> result = EXECUTOR.submit(new Pusher(buffer, handler)); + // Check that the nextFrame didn't return + if (result.isDone()) { + Assert.fail(); + } + // Check that no records were discarded + assertEquals(handler.getNumDiscarded(), 0); + // Check that no records were spilled + assertEquals(handler.getNumSpilled(), 0); + // Check that number of stalled is not greater than 1 + Assert.assertTrue(handler.getNumStalled() <= 1); + writer.unfreeze(); + result.get(); + } catch (Throwable th) { + th.printStackTrace(); + Assert.fail(); + } + Assert.assertNull(cause); + } + + /* + * Spill = true; + * Discard = false; + * Variable size frames + */ + @org.junit.Test + public void testMemoryVarSizeFrameWithSpillNoDiscard() { + try { + Random random = new Random(); + IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE); + // Spill budget = Memory budget, No discard + FeedPolicyAccessor fpa = + createFeedPolicyAccessor(true, false, DEFAULT_FRAME_SIZE * NUM_FRAMES, DISCARD_ALLOWANCE); + // Non-Active Writer + TestControlledFrameWriter writer = FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE); + writer.freeze(); + // FramePool + ConcurrentFramePool framePool = new ConcurrentFramePool(NODE_ID, FEED_MEM_BUDGET, DEFAULT_FRAME_SIZE); + FeedRuntimeInputHandler handler = createInputHandler(ctx, writer, fpa, framePool); + handler.open(); + ByteBuffer buffer = ByteBuffer.allocate(DEFAULT_FRAME_SIZE); + int multiplier = 1; + // add NUM_FRAMES times + while ((multiplier <= framePool.remaining())) { + handler.nextFrame(buffer); + multiplier = random.nextInt(10) + 1; + buffer = ByteBuffer.allocate(DEFAULT_FRAME_SIZE * multiplier); + } + // Next call should Not block. we will do it in a different thread + Future<?> result = EXECUTOR.submit(new Pusher(buffer, handler)); + result.get(); + // Check that no records were discarded + assertEquals(handler.getNumDiscarded(), 0); + // Check that one frame is spilled + assertEquals(handler.getNumSpilled(), 1); + // consume memory frames + while (!handler.getInternalBuffer().isEmpty()) { + writer.kick(); + } + // There should be 1 frame on disk + Assert.assertEquals(1, handler.framesOnDisk()); + writer.unfreeze(); + result.get(); + Assert.assertEquals(0, handler.framesOnDisk()); + } catch (Throwable th) { + th.printStackTrace(); + Assert.fail(); + } + Assert.assertNull(cause); + } + + /* + * Spill = false; + * Discard = false; + * Fixed size frames + */ + @org.junit.Test + public void testMemoryFixedSizeFrameNoDiskNoDiscard() { + try { + IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE); + // No spill, No discard + FeedPolicyAccessor fpa = createFeedPolicyAccessor(false, false, 0L, DISCARD_ALLOWANCE); + // Non-Active Writer + TestControlledFrameWriter writer = FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE); + writer.freeze(); + // FramePool + ConcurrentFramePool framePool = new ConcurrentFramePool(NODE_ID, FEED_MEM_BUDGET, DEFAULT_FRAME_SIZE); + + FeedRuntimeInputHandler handler = createInputHandler(ctx, writer, fpa, framePool); + handler.open(); + VSizeFrame frame = new VSizeFrame(ctx); + // add NUM_FRAMES times + for (int i = 0; i < NUM_FRAMES; i++) { + handler.nextFrame(frame.getBuffer()); + } + // Next call should block we will do it in a different thread + Future<?> result = EXECUTOR.submit(new Pusher(frame.getBuffer(), handler)); + // Check that the nextFrame didn't return + if (result.isDone()) { + Assert.fail(); + } else { + // Check that no records were discarded + Assert.assertEquals(handler.getNumDiscarded(), 0); + // Check that no records were spilled + Assert.assertEquals(handler.getNumSpilled(), 0); + // Check that no records were discarded + // Check that the inputHandler subscribed to the framePool + // Check that number of stalled is not greater than 1 + Assert.assertTrue(handler.getNumStalled() <= 1); + writer.kick(); + } + result.get(); + writer.unfreeze(); + handler.close(); + } catch (Throwable th) { + th.printStackTrace(); + Assert.fail(); + } + Assert.assertNull(cause); + } + + private class Pusher implements Runnable { + private final ByteBuffer buffer; + private final IFrameWriter writer; + + public Pusher(ByteBuffer buffer, IFrameWriter writer) { + this.buffer = buffer; + this.writer = writer; + } + + @Override + public void run() { + try { + writer.nextFrame(buffer); + } catch (HyracksDataException e) { + e.printStackTrace(); + cause = e; + } + } + } +} diff --git a/hyracks-fullstack/hyracks/hyracks-api/pom.xml b/hyracks-fullstack/hyracks/hyracks-api/pom.xml index 3961921..9336921 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/pom.xml +++ b/hyracks-fullstack/hyracks/hyracks-api/pom.xml @@ -39,7 +39,23 @@ <properties> <root.dir>${basedir}/../..</root.dir> </properties> - + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + <version>2.4</version> + <executions> + <execution> + <goals> + <goal>test-jar</goal> + <goal>jar</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + </build> <dependencies> <dependency> <groupId>org.json</groupId> @@ -77,5 +93,22 @@ <artifactId>hyracks-util</artifactId> <version>0.2.18-SNAPSHOT</version> </dependency> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-all</artifactId> + <version>2.0.2-beta</version> + </dependency> + <dependency> + <groupId>org.powermock</groupId> + <artifactId>powermock-api-mockito</artifactId> + <version>1.6.2</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.powermock</groupId> + <artifactId>powermock-module-junit4</artifactId> + <version>1.6.2</version> + <scope>test</scope> + </dependency> </dependencies> </project> diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/CountAndThrowError.java b/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/CountAndThrowError.java new file mode 100644 index 0000000..19998a7 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/CountAndThrowError.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.api.test; + +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.mockito.invocation.InvocationOnMock; + +public class CountAndThrowError extends CountAnswer { + private String errorMessage; + + public CountAndThrowError(String errorMessage) { + this.errorMessage = errorMessage; + } + + @Override + public Object call() throws HyracksDataException { + count++; + throw new UnknownError(errorMessage); + } + + @Override + public Object answer(InvocationOnMock invocation) throws Throwable { + count++; + throw new UnknownError(errorMessage); + } +} \ No newline at end of file diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/CountAndThrowException.java b/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/CountAndThrowException.java new file mode 100644 index 0000000..5a5ad59 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/CountAndThrowException.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.api.test; + +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.mockito.invocation.InvocationOnMock; + +public class CountAndThrowException extends CountAnswer { + private String errorMessage; + + public CountAndThrowException(String errorMessage) { + this.errorMessage = errorMessage; + } + + @Override + public Object call() throws HyracksDataException { + count++; + throw new HyracksDataException(errorMessage); + } + + @Override + public Object answer(InvocationOnMock invocation) throws Throwable { + count++; + throw new HyracksDataException(errorMessage); + } +} \ No newline at end of file diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/CountAnswer.java b/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/CountAnswer.java new file mode 100644 index 0000000..e8a6654 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/CountAnswer.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.api.test; + +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +public class CountAnswer implements Answer<Object> { + protected int count = 0; + + @Override + public Object answer(InvocationOnMock invocation) throws Throwable { + count++; + return null; + } + + public Object call() throws HyracksDataException { + count++; + return null; + } + + public int getCallCount() { + return count; + } + + public void reset() { + count = 0; + } +} \ No newline at end of file diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/FrameWriterTestUtils.java b/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/FrameWriterTestUtils.java new file mode 100644 index 0000000..4bddfa9 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/FrameWriterTestUtils.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.api.test; + +import java.util.Collection; + +public class FrameWriterTestUtils { + public static final String EXCEPTION_MESSAGE = "IFrameWriter Exception in the call to the method "; + public static final String ERROR_MESSAGE = "IFrameWriter Error in the call to the method "; + + public enum FrameWriterOperation { + Open, + NextFrame, + Fail, + Flush, + Close + } + + public static TestFrameWriter create(Collection<FrameWriterOperation> exceptionThrowingOperations, + Collection<FrameWriterOperation> errorThrowingOperations) { + CountAnswer openAnswer = + createAnswer(FrameWriterOperation.Open, exceptionThrowingOperations, errorThrowingOperations); + CountAnswer nextAnswer = + createAnswer(FrameWriterOperation.NextFrame, exceptionThrowingOperations, errorThrowingOperations); + CountAnswer flushAnswer = + createAnswer(FrameWriterOperation.Flush, exceptionThrowingOperations, errorThrowingOperations); + CountAnswer failAnswer = + createAnswer(FrameWriterOperation.Fail, exceptionThrowingOperations, errorThrowingOperations); + CountAnswer closeAnswer = + createAnswer(FrameWriterOperation.Close, exceptionThrowingOperations, errorThrowingOperations); + return new TestFrameWriter(openAnswer, nextAnswer, flushAnswer, failAnswer, closeAnswer); + } + + public static CountAnswer createAnswer(FrameWriterOperation operation, + Collection<FrameWriterOperation> exceptionThrowingOperations, + Collection<FrameWriterOperation> errorThrowingOperations) { + if (exceptionThrowingOperations.contains(operation)) { + return new CountAndThrowException(EXCEPTION_MESSAGE + operation.toString()); + } else if (exceptionThrowingOperations.contains(operation)) { + return new CountAndThrowError(ERROR_MESSAGE + operation.toString()); + } else { + return new CountAnswer(); + } + } + + public static TestControlledFrameWriter create(int initialFrameSize) { + return new TestControlledFrameWriter(initialFrameSize); + } +} diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/TestControlledFrameWriter.java b/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/TestControlledFrameWriter.java new file mode 100644 index 0000000..2a3f70d --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/TestControlledFrameWriter.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.api.test; + +import java.nio.ByteBuffer; + +import org.apache.hyracks.api.exceptions.HyracksDataException; + +public class TestControlledFrameWriter extends TestFrameWriter { + private boolean frozen = false; + private boolean timed = false; + private long duration = Long.MAX_VALUE; + private final int initialFrameSize; + private volatile int currentMultiplier = 0; + private volatile int kicks = 0; + + public TestControlledFrameWriter(int initialFrameSize) { + super(new CountAnswer(), new CountAnswer(), new CountAnswer(), new CountAnswer(), new CountAnswer()); + this.initialFrameSize = initialFrameSize; + } + + public int getCurrentMultiplier() { + return currentMultiplier; + } + + public synchronized void freeze() { + frozen = true; + } + + public synchronized void time(long ms) { + frozen = true; + timed = true; + duration = ms; + } + + public synchronized void unfreeze() { + frozen = false; + notify(); + } + + public synchronized void kick() { + kicks++; + notify(); + } + + @Override + public synchronized void nextFrame(ByteBuffer buffer) throws HyracksDataException { + super.nextFrame(buffer); + currentMultiplier = buffer.capacity() / initialFrameSize; + if (frozen) { + try { + if (timed) { + wait(duration); + } else { + while (frozen && kicks == 0) { + wait(); + } + kicks--; + } + } catch (InterruptedException e) { + throw new HyracksDataException(e); + } + } + currentMultiplier = 0; + } +} diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/TestFrameWriter.java b/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/TestFrameWriter.java new file mode 100644 index 0000000..b3492fe --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/TestFrameWriter.java @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.api.test; + +import java.nio.ByteBuffer; + +import org.apache.hyracks.api.comm.IFrameWriter; +import org.apache.hyracks.api.exceptions.HyracksDataException; + +public class TestFrameWriter implements IFrameWriter { + private final CountAnswer openAnswer; + private final CountAnswer nextAnswer; + private final CountAnswer flushAnswer; + private final CountAnswer failAnswer; + private final CountAnswer closeAnswer; + private long openDuration = 0L; + private long nextDuration = 0L; + private long flushDuration = 0L; + private long failDuration = 0L; + private long closeDuration = 0L; + + public TestFrameWriter(CountAnswer openAnswer, CountAnswer nextAnswer, CountAnswer flushAnswer, + CountAnswer failAnswer, CountAnswer closeAnswer) { + this.openAnswer = openAnswer; + this.nextAnswer = nextAnswer; + this.closeAnswer = closeAnswer; + this.flushAnswer = flushAnswer; + this.failAnswer = failAnswer; + } + + @Override + public void open() throws HyracksDataException { + delay(openDuration); + openAnswer.call(); + } + + public int openCount() { + return openAnswer.getCallCount(); + } + + @Override + public void nextFrame(ByteBuffer buffer) throws HyracksDataException { + delay(nextDuration); + nextAnswer.call(); + } + + public int nextFrameCount() { + return nextAnswer.getCallCount(); + } + + @Override + public void flush() throws HyracksDataException { + delay(flushDuration); + flushAnswer.call(); + } + + public int flushCount() { + return flushAnswer.getCallCount(); + } + + @Override + public void fail() throws HyracksDataException { + delay(failDuration); + failAnswer.call(); + } + + public int failCount() { + return failAnswer.getCallCount(); + } + + @Override + public void close() throws HyracksDataException { + delay(closeDuration); + closeAnswer.call(); + } + + public int closeCount() { + return closeAnswer.getCallCount(); + } + + public synchronized boolean validate(boolean finished) { + if (failAnswer.getCallCount() > 1 || closeAnswer.getCallCount() > 1 || openAnswer.getCallCount() > 1) { + return false; + } + if (openAnswer.getCallCount() == 0 + && (nextAnswer.getCallCount() > 0 || failAnswer.getCallCount() > 0 || closeAnswer.getCallCount() > 0)) { + return false; + } + if (finished) { + if (closeAnswer.getCallCount() == 0 && (nextAnswer.getCallCount() > 0 || failAnswer.getCallCount() > 0 + || openAnswer.getCallCount() > 0)) { + return false; + } + } + return true; + } + + public void reset() { + openAnswer.reset(); + nextAnswer.reset(); + flushAnswer.reset(); + failAnswer.reset(); + closeAnswer.reset(); + } + + public long getOpenDuration() { + return openDuration; + } + + public void setOpenDuration(long openDuration) { + this.openDuration = openDuration; + } + + public long getNextDuration() { + return nextDuration; + } + + public void setNextDuration(long nextDuration) { + this.nextDuration = nextDuration; + } + + public long getFlushDuration() { + return flushDuration; + } + + public void setFlushDuration(long flushDuration) { + this.flushDuration = flushDuration; + } + + public long getFailDuration() { + return failDuration; + } + + public void setFailDuration(long failDuration) { + this.failDuration = failDuration; + } + + public long getCloseDuration() { + return closeDuration; + } + + public void setCloseDuration(long closeDuration) { + this.closeDuration = closeDuration; + } + + private void delay(long duration) throws HyracksDataException { + if (duration > 0) { + try { + synchronized (this) { + wait(duration); + } + } catch (InterruptedException e) { + throw new HyracksDataException(e); + } + } + } +} diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/pom.xml b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/pom.xml index 581fde4..d07d633 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/pom.xml +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/pom.xml @@ -39,6 +39,13 @@ <dependencies> <dependency> <groupId>org.apache.hyracks</groupId> + <artifactId>hyracks-api</artifactId> + <version>0.2.18-SNAPSHOT</version> + <type>test-jar</type> + <scope>compile</scope> + </dependency> + <dependency> + <groupId>org.apache.hyracks</groupId> <artifactId>hyracks-storage-common</artifactId> <version>0.2.18-SNAPSHOT</version> <type>jar</type> diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/test/java/org/apache/hyracks/storage/am/btree/test/FramewriterTest.java b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/test/java/org/apache/hyracks/storage/am/btree/test/FramewriterTest.java index df3a211..d3e7a3a 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/test/java/org/apache/hyracks/storage/am/btree/test/FramewriterTest.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/test/java/org/apache/hyracks/storage/am/btree/test/FramewriterTest.java @@ -30,6 +30,9 @@ import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer; import org.apache.hyracks.api.dataflow.value.RecordDescriptor; import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.test.CountAndThrowError; +import org.apache.hyracks.api.test.CountAndThrowException; +import org.apache.hyracks.api.test.CountAnswer; import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder; import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor; import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender; @@ -92,8 +95,8 @@ public boolean validate(boolean finished) { // get number of open calls int openCount = openException.getCallCount() + openNormal.getCallCount() + openError.getCallCount(); - int nextFrameCount = nextFrameException.getCallCount() + nextFrameNormal.getCallCount() - + nextFrameError.getCallCount(); + int nextFrameCount = + nextFrameException.getCallCount() + nextFrameNormal.getCallCount() + nextFrameError.getCallCount(); int failCount = failException.getCallCount() + failNormal.getCallCount() + failError.getCallCount(); int closeCount = closeException.getCallCount() + closeNormal.getCallCount() + closeError.getCallCount(); @@ -422,8 +425,9 @@ public AbstractTreeIndexOperatorDescriptor[] mockIndexOpDesc() throws HyracksDataException, IndexException { IIndexDataflowHelperFactory[] indexDataflowHelperFactories = mockIndexHelperFactories(); ISearchOperationCallbackFactory[] searchOpCallbackFactories = mockSearchOpCallbackFactories(); - AbstractTreeIndexOperatorDescriptor[] opDescs = new AbstractTreeIndexOperatorDescriptor[indexDataflowHelperFactories.length - * searchOpCallbackFactories.length]; + AbstractTreeIndexOperatorDescriptor[] opDescs = + new AbstractTreeIndexOperatorDescriptor[indexDataflowHelperFactories.length + * searchOpCallbackFactories.length]; int k = 0; for (int i = 0; i < indexDataflowHelperFactories.length; i++) { for (int j = 0; j < searchOpCallbackFactories.length; j++) { @@ -450,52 +454,6 @@ private ISearchOperationCallback mockSearchOpCallback() { ISearchOperationCallback opCallback = Mockito.mock(ISearchOperationCallback.class); return opCallback; - } - - public class CountAnswer implements Answer<Object> { - protected int count = 0; - - @Override - public Object answer(InvocationOnMock invocation) throws Throwable { - count++; - return null; - } - - public int getCallCount() { - return count; - } - - public void reset() { - count = 0; - } - } - - public class CountAndThrowException extends CountAnswer { - private String errorMessage; - - public CountAndThrowException(String errorMessage) { - this.errorMessage = errorMessage; - } - - @Override - public Object answer(InvocationOnMock invocation) throws Throwable { - count++; - throw new HyracksDataException(errorMessage); - } - } - - public class CountAndThrowError extends CountAnswer { - private String errorMessage; - - public CountAndThrowError(String errorMessage) { - this.errorMessage = errorMessage; - } - - @Override - public Object answer(InvocationOnMock invocation) throws Throwable { - count++; - throw new UnknownError(errorMessage); - } } public IFrameWriter[] createOutputWriters() throws Exception { -- To view, visit https://asterix-gerrit.ics.uci.edu/866 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: merged Gerrit-Change-Id: I7088f489a7d53dee8cf6cdbf5baa7cd8d3884f55 Gerrit-PatchSet: 9 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: abdullah alamoudi <[email protected]> Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Michael Blow <[email protected]> Gerrit-Reviewer: Till Westmann <[email protected]> Gerrit-Reviewer: abdullah alamoudi <[email protected]>
