From Ali Alsuliman <[email protected]>:
Ali Alsuliman has uploaded this change for review.
(https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19561)
Change subject: [ASTERIXDB-3587][RT] Re-use frame in NL join cache activity
......................................................................
[ASTERIXDB-3587][RT] Re-use frame in NL join cache activity
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
Instead of creating a frame for each incoming frame, create
one frame and re-use it in the cache activity of the nested
loop join operator.
- Reduce the logging level for some logs from debug to trace.
Ext-ref: MB-66027
Change-Id: I267d227f6a5e435e08221cb94895cedf60764e4c
---
M
hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VariableDeletableTupleMemoryManager.java
M
hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java
M
hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/HashSpillableTableFactory.java
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
M
hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
5 files changed, 77 insertions(+), 55 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/61/19561/1
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
index bedcd9c..e0e3cbc 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
@@ -97,13 +97,15 @@
try {
responseMsg = (ExecuteStatementResponseMessage)
responseFuture.get(timeout, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
- cancelQuery(ncMb, ncCtx.getNodeId(),
requestReference.getUuid(), param.getClientContextID(), e, false);
+ cancelQuery(ncMb, ncCtx.getNodeId(),
requestReference.getUuid(), param.getClientContextID(), e, false,
+ "interrupt");
throw e;
} catch (TimeoutException exception) {
RuntimeDataException hde = new
RuntimeDataException(ErrorCode.REQUEST_TIMEOUT);
hde.addSuppressed(exception);
// cancel query
- cancelQuery(ncMb, ncCtx.getNodeId(),
requestReference.getUuid(), param.getClientContextID(), hde, true);
+ cancelQuery(ncMb, ncCtx.getNodeId(),
requestReference.getUuid(), param.getClientContextID(), hde, true,
+ "timeout");
throw hde;
}
executionState.end();
@@ -156,7 +158,7 @@
}
private void cancelQuery(INCMessageBroker messageBroker, String nodeId,
String uuid, String clientContextID,
- Exception exception, boolean wait) {
+ Exception exception, boolean wait, String reason) {
if (uuid == null && clientContextID == null) {
return;
}
@@ -165,8 +167,7 @@
CancelQueryRequest cancelQueryMessage =
new CancelQueryRequest(nodeId,
cancelQueryFuture.getFutureId(), uuid, clientContextID);
// TODO(mblow): multicc -- need to send cancellation to the
correct cc
- LOGGER.info("Cancelling query with uuid:{}, clientContextID:{} due
to {}", uuid, clientContextID,
- exception.getClass().getSimpleName());
+ LOGGER.info("Cancelling query with uuid:{}, clientContextID:{} due
to {}", uuid, clientContextID, reason);
messageBroker.sendMessageToPrimaryCC(cancelQueryMessage);
if (wait) {
cancelQueryFuture.get(ExecuteStatementRequestMessage.DEFAULT_QUERY_CANCELLATION_WAIT_MILLIS,
diff --git
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VariableDeletableTupleMemoryManager.java
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VariableDeletableTupleMemoryManager.java
index b053cac..3f5de62 100644
---
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VariableDeletableTupleMemoryManager.java
+++
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VariableDeletableTupleMemoryManager.java
@@ -161,8 +161,8 @@
policy.close();
frames.clear();
numTuples = 0;
- if (LOG.isDebugEnabled()) {
- LOG.debug("VariableTupleMemoryManager has reorganized " +
statsReOrg + " times");
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("VariableTupleMemoryManager has reorganized {} times",
statsReOrg);
}
statsReOrg = 0;
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/HashSpillableTableFactory.java
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/HashSpillableTableFactory.java
index 1e5c121..d9b473b 100644
---
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/HashSpillableTableFactory.java
+++
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/HashSpillableTableFactory.java
@@ -131,9 +131,9 @@
final int numPartitions = getNumOfPartitions(inputDataBytesSize /
ctx.getInitialFrameSize(), memoryBudget);
final int entriesPerPartition = (int) Math.ceil(1.0 * tableSize /
numPartitions);
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("created hashtable, table size:" + tableSize + " file
size:" + inputDataBytesSize
- + " #partitions:" + numPartitions);
+ if (LOGGER.isTraceEnabled()) {
+ LOGGER.trace("created hashtable, table size:{} file size:{}
#partitions:{}", tableSize, inputDataBytesSize,
+ numPartitions);
}
final ArrayTupleBuilder outputTupleBuilder = new
ArrayTupleBuilder(outRecordDescriptor.getFields().length);
@@ -185,9 +185,9 @@
if (force ||
hashTableForTuplePointer.isGarbageCollectionNeeded()) {
int numberOfFramesReclaimed =
hashTableForTuplePointer.collectGarbage(bufferAccessor, tpcIntermediate);
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("Garbage Collection on Hash table is
done. Deallocated frames:"
- + numberOfFramesReclaimed);
+ if (LOGGER.isTraceEnabled()) {
+ LOGGER.trace("Garbage Collection on Hash table is
done. Deallocated frames:{}",
+ numberOfFramesReclaimed);
}
return numberOfFramesReclaimed != -1;
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java
index 52dc241..831753f 100644
---
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java
+++
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java
@@ -21,6 +21,7 @@
import java.nio.ByteBuffer;
+import org.apache.hyracks.api.comm.VSizeFrame;
import org.apache.hyracks.api.context.IHyracksJobletContext;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.ActivityId;
@@ -116,17 +117,20 @@
return new AbstractUnaryInputSinkOperatorNodePushable() {
private JoinCacheTaskState state;
+ private VSizeFrame inFrame;
@Override
public void open() throws HyracksDataException {
state = new JoinCacheTaskState(jobletCtx.getJobId(), new
TaskId(getActivityId(), partition));
state.joiner = new NestedLoopJoin(jobletCtx, new
FrameTupleAccessor(rd0),
new FrameTupleAccessor(rd1), memSize, isLeftOuter,
nullWriters1);
+ inFrame = new VSizeFrame(ctx);
}
@Override
public void nextFrame(ByteBuffer buffer) throws
HyracksDataException {
- ByteBuffer copyBuffer =
jobletCtx.allocateFrame(buffer.capacity());
+ inFrame.resize(buffer.capacity());
+ ByteBuffer copyBuffer = inFrame.getBuffer();
FrameUtils.copyAndFlip(buffer, copyBuffer);
state.joiner.cache(copyBuffer);
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
index 91e94dc..e4a5c86 100644
---
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
+++
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
@@ -140,11 +140,6 @@
private final boolean isLeftOuter;
private final IMissingWriterFactory[] nonMatchWriterFactories;
- //Flags added for test purpose
- private boolean skipInMemoryHJ = false;
- private boolean forceNLJ = false;
- private boolean forceRoleReversal = false;
-
private static final Logger LOGGER = LogManager.getLogger();
public
OptimizedHybridHashJoinOperatorDescriptor(IOperatorDescriptorRegistry spec, int
memSizeInFrames,
@@ -233,9 +228,6 @@
private int numOfPartitions;
private OptimizedHybridHashJoin hybridHJ;
- public BuildAndPartitionTaskState() {
- }
-
private BuildAndPartitionTaskState(JobId jobId, TaskId taskId) {
super(jobId, taskId);
}
@@ -296,7 +288,7 @@
@Override
public void open() throws HyracksDataException {
if (memSizeInFrames <= 2) { //Dedicated buffers: One
buffer to read and two buffers for output
- throw new HyracksDataException("Not enough memory is
assigend for Hybrid Hash Join.");
+ throw new HyracksDataException("Not enough memory is
assigned for Hybrid Hash Join.");
}
state.memForJoin = memSizeInFrames - 2;
state.numOfPartitions =
@@ -308,8 +300,9 @@
state.hybridHJ.initBuild();
if (LOGGER.isTraceEnabled()) {
- LOGGER.trace("OptimizedHybridHashJoin is starting the
build phase with " + state.numOfPartitions
- + " partitions using " + state.memForJoin + "
frames for memory.");
+ LOGGER.trace(
+ "OptimizedHybridHashJoin is starting the build
phase with {} partitions using {} frames for memory.",
+ state.numOfPartitions, state.memForJoin);
}
}
@@ -513,11 +506,11 @@
stats.getLevel().set(level);
}
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("\n>>>Joining Partition Pairs (thread_id
" + Thread.currentThread().getId()
- + ") (pid " + ") - (level " + level + ")" + "
- BuildSize:\t" + buildPartSize
- + "\tProbeSize:\t" + probePartSize + " -
MemForJoin " + (state.memForJoin)
- + " - LeftOuter is " + isLeftOuter);
+ if (LOGGER.isTraceEnabled()) {
+ LOGGER.trace(
+ "\n>>>Joining Partition Pairs (thread_id {})
(pid ) - (level {}) - BuildSize:\t{}\tProbeSize:\t{} - MemForJoin {} -
LeftOuter is {}",
+ Thread.currentThread().getId(), level,
buildPartSize, probePartSize, state.memForJoin,
+ isLeftOuter);
}
// Calculate the expected hash table size for the both
side.
@@ -527,16 +520,17 @@
SerializableHashTable.getExpectedTableFrameCount(probeSizeInTuple, frameSize);
//Apply in-Mem HJ if possible
- if (!skipInMemoryHJ && ((buildPartSize +
expectedHashTableSizeForBuildInFrame < state.memForJoin)
+ if (((buildPartSize + expectedHashTableSizeForBuildInFrame
< state.memForJoin)
|| (probePartSize +
expectedHashTableSizeForProbeInFrame < state.memForJoin
&& !isLeftOuter))) {
int tabSize = -1;
- if (!forceRoleReversal && (isLeftOuter ||
(buildPartSize < probePartSize))) {
+ if ((isLeftOuter || (buildPartSize < probePartSize))) {
//Case 1.1 - InMemHJ (without Role-Reversal)
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("\t>>>Case 1.1 (IsLeftOuter ||
buildSize<probe) AND ApplyInMemHJ - [Level "
- + level + "]");
+ if (LOGGER.isTraceEnabled()) {
+ LOGGER.trace(
+ "\t>>>Case 1.1 (IsLeftOuter ||
buildSize<probe) AND ApplyInMemHJ - [Level {}]",
+ level);
}
tabSize = buildSizeInTuple;
if (tabSize == 0) {
@@ -547,9 +541,10 @@
applyInMemHashJoin(buildKeys, probeKeys, tabSize,
buildRd, probeRd, buildHpc, probeHpc,
buildSideReader, probeSideReader,
probComp); // checked-confirmed
} else { //Case 1.2 - InMemHJ with Role Reversal
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("\t>>>Case 1.2. (NoIsLeftOuter ||
probe<build) AND ApplyInMemHJ"
- + "WITH RoleReversal - [Level " +
level + "]");
+ if (LOGGER.isTraceEnabled()) {
+ LOGGER.trace(
+ "\t>>>Case 1.2. (NoIsLeftOuter ||
probe<build) AND ApplyInMemHJWITH RoleReversal - [Level {}]",
+ level);
}
tabSize = probeSizeInTuple;
if (tabSize == 0) {
@@ -563,24 +558,23 @@
}
//Apply (Recursive) HHJ
else {
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("\t>>>Case 2. ApplyRecursiveHHJ -
[Level " + level + "]");
+ if (LOGGER.isTraceEnabled()) {
+ LOGGER.trace("\t>>>Case 2. ApplyRecursiveHHJ -
[Level {}]", level);
}
- if (!forceRoleReversal && (isLeftOuter ||
buildPartSize < probePartSize)) {
+ if ((isLeftOuter || buildPartSize < probePartSize)) {
//Case 2.1 - Recursive HHJ (without Role-Reversal)
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug(
- "\t\t>>>Case 2.1 - RecursiveHHJ WITH
(isLeftOuter || build<probe) - [Level "
- + level + "]");
+ if (LOGGER.isTraceEnabled()) {
+ LOGGER.trace(
+ "\t\t>>>Case 2.1 - RecursiveHHJ WITH
(isLeftOuter || build<probe) - [Level {}]",
+ level);
}
applyHybridHashJoin((int) buildPartSize,
PROBE_REL, BUILD_REL, probeKeys, buildKeys,
probeRd, buildRd, probeHpc, buildHpc,
probeSideReader, buildSideReader, level,
beforeMax, probComp);
} else { //Case 2.2 - Recursive HHJ (with
Role-Reversal)
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug(
- "\t\t>>>Case 2.2. - RecursiveHHJ WITH
RoleReversal - [Level " + level + "]");
+ if (LOGGER.isTraceEnabled()) {
+ LOGGER.trace("\t\t>>>Case 2.2. - RecursiveHHJ
WITH RoleReversal - [Level {}]", level);
}
applyHybridHashJoin((int) probePartSize,
BUILD_REL, PROBE_REL, buildKeys, probeKeys,
@@ -645,11 +639,12 @@
int afterMax = Math.max(maxAfterBuildSize,
maxAfterProbeSize);
BitSet rPStatus = rHHj.getPartitionStatus();
- if (!forceNLJ && (afterMax < (NLJ_SWITCH_THRESHOLD *
beforeMax))) {
+ if ((afterMax < (NLJ_SWITCH_THRESHOLD * beforeMax))) {
//Case 2.1.1 - Keep applying HHJ
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("\t\t>>>Case 2.1.1 - KEEP
APPLYING RecursiveHHJ WITH "
- + "(isLeftOuter || build<probe) -
[Level " + level + "]");
+ if (LOGGER.isTraceEnabled()) {
+ LOGGER.trace(
+ "\t\t>>>Case 2.1.1 - KEEP APPLYING
RecursiveHHJ WITH (isLeftOuter || build<probe) - [Level {}]",
+ level);
}
for (int rPid = rPStatus.nextSetBit(0); rPid >= 0;
rPid = rPStatus.nextSetBit(rPid + 1)) {
RunFileReader rbrfw =
rHHj.getBuildRFReader(rPid);
@@ -679,9 +674,10 @@
}
} else { //Case 2.1.2 - Switch to NLJ
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("\t\t>>>Case 2.1.2 - SWITCHED to
NLJ RecursiveHHJ WITH "
- + "(isLeftOuter || build<probe) -
[Level " + level + "]");
+ if (LOGGER.isTraceEnabled()) {
+ LOGGER.trace(
+ "\t\t>>>Case 2.1.2 - SWITCHED to NLJ
RecursiveHHJ WITH (isLeftOuter || build<probe) - [Level {}]",
+ level);
}
for (int rPid = rPStatus.nextSetBit(0); rPid >= 0;
rPid = rPStatus.nextSetBit(rPid + 1)) {
RunFileReader rbrfw =
rHHj.getBuildRFReader(rPid);
--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19561
To unsubscribe, or for help writing mail filters, visit
https://asterix-gerrit.ics.uci.edu/settings
Gerrit-Project: asterixdb
Gerrit-Branch: ionic
Gerrit-Change-Id: I267d227f6a5e435e08221cb94895cedf60764e4c
Gerrit-Change-Number: 19561
Gerrit-PatchSet: 1
Gerrit-Owner: Ali Alsuliman <[email protected]>
Gerrit-MessageType: newchange