Xikui Wang has submitted this change and it was merged.

Change subject: [NO ISSUE][RT] Replace HybridHashJoin with OptimizedHybridHashJoin
......................................................................
[NO ISSUE][RT] Replace HybridHashJoin with OptimizedHybridHashJoin

- user model changes: no
- storage format changes: no
- interface changes: no

Details: The HybridHashJoinOperatorDescriptor is an old implementation that hasn't been used in the runtime, and it lacks the necessary documentation and memory management. The OptimizedHybridHashJoinOperatorDescriptor serves the same purpose; we should use it instead and avoid maintaining the old one.

Change-Id: I6ed612cc233af1b78d453c7b711077b82e721e82
Reviewed-on: https://asterix-gerrit.ics.uci.edu/3023
Sonar-Qube: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Reviewed-by: Xikui Wang <xkk...@gmail.com>
---
M hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HybridHashJoinPOperator.java
D hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
D hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOptimizedHybridHashJoinTest.java
M hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
4 files changed, 247 insertions(+), 1,164 deletions(-)

Approvals:
  Jenkins: Verified; No violations found; ; Verified
  Xikui Wang: Looks good to me, approved

diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HybridHashJoinPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HybridHashJoinPOperator.java index 45ec44b..091cc44 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HybridHashJoinPOperator.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HybridHashJoinPOperator.java @@ -45,7 +45,6 @@ import org.apache.hyracks.api.dataflow.IOperatorDescriptor; import org.apache.hyracks.api.dataflow.value.IBinaryComparator; import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; -import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFactory; import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFamily; import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory; import org.apache.hyracks.api.dataflow.value.IPredicateEvaluatorFactory; @@ -55,7 +54,6 @@ import org.apache.hyracks.api.dataflow.value.RecordDescriptor; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.job.IOperatorDescriptorRegistry; -import org.apache.hyracks.dataflow.std.join.HybridHashJoinOperatorDescriptor; import org.apache.hyracks.dataflow.std.join.OptimizedHybridHashJoinOperatorDescriptor; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -65,7 +63,6 @@ // The maximum number of in-memory frames that this hash join can use.
private final int memSizeInFrames; private final int maxInputBuildSizeInFrames; - private final int aveRecordsPerFrame; private final double fudgeFactor; private static final Logger LOGGER = LogManager.getLogger(); @@ -76,7 +73,6 @@ super(kind, partitioningType, sideLeftOfEqualities, sideRightOfEqualities); this.memSizeInFrames = memSizeInFrames; this.maxInputBuildSizeInFrames = maxInputSizeInFrames; - this.aveRecordsPerFrame = aveRecordsPerFrame; this.fudgeFactor = fudgeFactor; if (LOGGER.isTraceEnabled()) { LOGGER.trace("HybridHashJoinPOperator constructed with: JoinKind=" + kind + ", JoinPartitioningType=" @@ -117,8 +113,6 @@ int[] keysLeft = JobGenHelper.variablesToFieldIndexes(keysLeftBranch, inputSchemas[0]); int[] keysRight = JobGenHelper.variablesToFieldIndexes(keysRightBranch, inputSchemas[1]); IVariableTypeEnvironment env = context.getTypeEnvironment(op); - IBinaryHashFunctionFactory[] hashFunFactories = - JobGenHelper.variablesToBinaryHashFunctionFactories(keysLeftBranch, env, context); IBinaryHashFunctionFamily[] hashFunFamilies = JobGenHelper.variablesToBinaryHashFunctionFamilies(keysLeftBranch, env, context); IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[keysLeft.length]; @@ -138,21 +132,9 @@ JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), propagatedSchema, context); IOperatorDescriptorRegistry spec = builder.getJobSpec(); IOperatorDescriptor opDesc; - boolean optimizedHashJoin = true; - for (IBinaryHashFunctionFamily family : hashFunFamilies) { - if (family == null) { - optimizedHashJoin = false; - break; - } - } - if (optimizedHashJoin) { - opDesc = generateOptimizedHashJoinRuntime(context, inputSchemas, keysLeft, keysRight, hashFunFamilies, - comparatorFactories, predEvaluatorFactory, recDescriptor, spec); - } else { - opDesc = generateHashJoinRuntime(context, inputSchemas, keysLeft, keysRight, hashFunFactories, - comparatorFactories, predEvaluatorFactory, recDescriptor, spec); - } + opDesc = generateOptimizedHashJoinRuntime(context, inputSchemas, keysLeft, keysRight, hashFunFamilies, + comparatorFactories, predEvaluatorFactory, recDescriptor, spec); opDesc.setSourceLocation(op.getSourceLocation()); contributeOpDesc(builder, (AbstractLogicalOperator) op, opDesc); @@ -160,28 +142,6 @@ builder.contributeGraphEdge(src1, 0, op, 0); ILogicalOperator src2 = op.getInputs().get(1).getValue(); builder.contributeGraphEdge(src2, 0, op, 1); - } - - private IOperatorDescriptor generateHashJoinRuntime(JobGenContext context, IOperatorSchema[] inputSchemas, - int[] keysLeft, int[] keysRight, IBinaryHashFunctionFactory[] hashFunFactories, - IBinaryComparatorFactory[] comparatorFactories, IPredicateEvaluatorFactory predEvaluatorFactory, - RecordDescriptor recDescriptor, IOperatorDescriptorRegistry spec) throws AlgebricksException { - switch (kind) { - case INNER: - return new HybridHashJoinOperatorDescriptor(spec, getMemSizeInFrames(), maxInputBuildSizeInFrames, - aveRecordsPerFrame, getFudgeFactor(), keysLeft, keysRight, hashFunFactories, - comparatorFactories, recDescriptor, predEvaluatorFactory, false, null); - case LEFT_OUTER: - IMissingWriterFactory[] nonMatchWriterFactories = new IMissingWriterFactory[inputSchemas[1].getSize()]; - for (int j = 0; j < nonMatchWriterFactories.length; j++) { - nonMatchWriterFactories[j] = context.getMissingWriterFactory(); - } - return new HybridHashJoinOperatorDescriptor(spec, getMemSizeInFrames(), maxInputBuildSizeInFrames, - aveRecordsPerFrame, getFudgeFactor(), keysLeft, keysRight, hashFunFactories, - 
comparatorFactories, recDescriptor, predEvaluatorFactory, true, nonMatchWriterFactories); - default: - throw new NotImplementedException(); - } } private IOperatorDescriptor generateOptimizedHashJoinRuntime(JobGenContext context, IOperatorSchema[] inputSchemas, diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java deleted file mode 100644 index 034b054..0000000 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java +++ /dev/null @@ -1,577 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.hyracks.dataflow.std.join; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.nio.ByteBuffer; - -import org.apache.hyracks.api.comm.IFrame; -import org.apache.hyracks.api.comm.VSizeFrame; -import org.apache.hyracks.api.context.IHyracksTaskContext; -import org.apache.hyracks.api.dataflow.ActivityId; -import org.apache.hyracks.api.dataflow.IActivityGraphBuilder; -import org.apache.hyracks.api.dataflow.IOperatorNodePushable; -import org.apache.hyracks.api.dataflow.TaskId; -import org.apache.hyracks.api.dataflow.value.IBinaryComparator; -import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; -import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFactory; -import org.apache.hyracks.api.dataflow.value.IMissingWriter; -import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory; -import org.apache.hyracks.api.dataflow.value.IPredicateEvaluator; -import org.apache.hyracks.api.dataflow.value.IPredicateEvaluatorFactory; -import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider; -import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputer; -import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputerFactory; -import org.apache.hyracks.api.dataflow.value.RecordDescriptor; -import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.api.io.FileReference; -import org.apache.hyracks.api.job.IOperatorDescriptorRegistry; -import org.apache.hyracks.api.job.JobId; -import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor; -import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender; -import org.apache.hyracks.dataflow.common.comm.util.FrameUtils; -import org.apache.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory; -import org.apache.hyracks.dataflow.common.data.partition.RepartitionComputerFactory; -import 
org.apache.hyracks.dataflow.common.io.RunFileReader; -import org.apache.hyracks.dataflow.common.io.RunFileWriter; -import org.apache.hyracks.dataflow.std.base.AbstractActivityNode; -import org.apache.hyracks.dataflow.std.base.AbstractOperatorDescriptor; -import org.apache.hyracks.dataflow.std.base.AbstractStateObject; -import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable; -import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable; -import org.apache.hyracks.dataflow.std.structures.ISerializableTable; -import org.apache.hyracks.dataflow.std.structures.SimpleSerializableHashTable; -import org.apache.hyracks.dataflow.std.util.FrameTuplePairComparator; - -public class HybridHashJoinOperatorDescriptor extends AbstractOperatorDescriptor { - private static final int BUILD_AND_PARTITION_ACTIVITY_ID = 0; - private static final int PARTITION_AND_JOIN_ACTIVITY_ID = 1; - - private final int memsize; - private static final long serialVersionUID = 1L; - private final int inputsize0; - private final double factor; - private final int recordsPerFrame; - private final int[] keys0; - private final int[] keys1; - private final IBinaryHashFunctionFactory[] hashFunctionFactories; - private final IBinaryComparatorFactory[] comparatorFactories; - private final IPredicateEvaluatorFactory predEvaluatorFactory; - private final boolean isLeftOuter; - private final IMissingWriterFactory[] nonMatchWriterFactories1; - - /** - * @param spec - * @param memsize - * in frames - * @param inputsize0 - * in frames - * @param recordsPerFrame - * @param factor - * @param keys0 - * @param keys1 - * @param hashFunctionFactories - * @param comparatorFactories - * @param recordDescriptor - */ - public HybridHashJoinOperatorDescriptor(IOperatorDescriptorRegistry spec, int memsize, int inputsize0, - int recordsPerFrame, double factor, int[] keys0, int[] keys1, - IBinaryHashFunctionFactory[] hashFunctionFactories, IBinaryComparatorFactory[] comparatorFactories, - RecordDescriptor recordDescriptor, IPredicateEvaluatorFactory predEvalFactory, boolean isLeftOuter, - IMissingWriterFactory[] nullWriterFactories1) { - super(spec, 2, 1); - this.memsize = memsize; - this.inputsize0 = inputsize0; - this.factor = factor; - this.recordsPerFrame = recordsPerFrame; - this.keys0 = keys0; - this.keys1 = keys1; - this.hashFunctionFactories = hashFunctionFactories; - this.comparatorFactories = comparatorFactories; - this.predEvaluatorFactory = predEvalFactory; - this.isLeftOuter = isLeftOuter; - this.nonMatchWriterFactories1 = nullWriterFactories1; - outRecDescs[0] = recordDescriptor; - } - - @Override - public void contributeActivities(IActivityGraphBuilder builder) { - ActivityId p1Aid = new ActivityId(odId, BUILD_AND_PARTITION_ACTIVITY_ID); - ActivityId p2Aid = new ActivityId(odId, PARTITION_AND_JOIN_ACTIVITY_ID); - BuildAndPartitionActivityNode phase1 = new BuildAndPartitionActivityNode(p1Aid, p2Aid); - PartitionAndJoinActivityNode phase2 = new PartitionAndJoinActivityNode(p2Aid, p1Aid); - - builder.addActivity(this, phase1); - builder.addSourceEdge(1, phase1, 0); - - builder.addActivity(this, phase2); - builder.addSourceEdge(0, phase2, 0); - - builder.addBlockingEdge(phase1, phase2); - - builder.addTargetEdge(0, phase2, 0); - } - - public static class BuildAndPartitionTaskState extends AbstractStateObject { - private RunFileWriter[] fWriters; - private InMemoryHashJoin joiner; - private int nPartitions; - private int memoryForHashtable; - - public BuildAndPartitionTaskState() { 
- } - - private BuildAndPartitionTaskState(JobId jobId, TaskId taskId) { - super(jobId, taskId); - } - - @Override - public void toBytes(DataOutput out) throws IOException { - - } - - @Override - public void fromBytes(DataInput in) throws IOException { - - } - - } - - private class BuildAndPartitionActivityNode extends AbstractActivityNode { - private static final long serialVersionUID = 1L; - - private final ActivityId joinAid; - - public BuildAndPartitionActivityNode(ActivityId id, ActivityId joinAid) { - super(id); - this.joinAid = joinAid; - } - - @Override - public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx, - IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions) - throws HyracksDataException { - final RecordDescriptor rd0 = recordDescProvider.getInputRecordDescriptor(joinAid, 0); - final RecordDescriptor rd1 = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0); - final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length]; - for (int i = 0; i < comparatorFactories.length; ++i) { - comparators[i] = comparatorFactories[i].createBinaryComparator(); - } - final IMissingWriter[] nullWriters1 = - isLeftOuter ? new IMissingWriter[nonMatchWriterFactories1.length] : null; - if (isLeftOuter) { - for (int i = 0; i < nonMatchWriterFactories1.length; i++) { - nullWriters1[i] = nonMatchWriterFactories1[i].createMissingWriter(); - } - } - final IPredicateEvaluator predEvaluator = - (predEvaluatorFactory == null ? null : predEvaluatorFactory.createPredicateEvaluator()); - - IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() { - private BuildAndPartitionTaskState state = new BuildAndPartitionTaskState( - ctx.getJobletContext().getJobId(), new TaskId(getActivityId(), partition)); - private final FrameTupleAccessor accessorBuild = new FrameTupleAccessor(rd1); - private final ITuplePartitionComputer hpcBuild = - new FieldHashPartitionComputerFactory(keys1, hashFunctionFactories).createPartitioner(ctx); - private final FrameTupleAppender appender = new FrameTupleAppender(); - private final FrameTupleAppender ftappender = new FrameTupleAppender(); - private IFrame[] bufferForPartitions; - private final IFrame inBuffer = new VSizeFrame(ctx); - - @Override - public void close() throws HyracksDataException { - if (state.memoryForHashtable != 0) { - build(inBuffer.getBuffer()); - } - - for (int i = 0; i < state.nPartitions; i++) { - ByteBuffer buf = bufferForPartitions[i].getBuffer(); - accessorBuild.reset(buf); - if (accessorBuild.getTupleCount() > 0) { - write(i, buf); - } - closeWriter(i); - } - - ctx.setStateObject(state); - } - - @Override - public void nextFrame(ByteBuffer buffer) throws HyracksDataException { - - if (state.memoryForHashtable != memsize - 2) { - accessorBuild.reset(buffer); - int tCount = accessorBuild.getTupleCount(); - for (int i = 0; i < tCount; ++i) { - int entry; - if (state.memoryForHashtable == 0) { - entry = hpcBuild.partition(accessorBuild, i, state.nPartitions); - boolean newBuffer = false; - IFrame bufBi = bufferForPartitions[entry]; - while (true) { - appender.reset(bufBi, newBuffer); - if (appender.append(accessorBuild, i)) { - break; - } else { - write(entry, bufBi.getBuffer()); - bufBi.reset(); - newBuffer = true; - } - } - } else { - entry = hpcBuild.partition(accessorBuild, i, (int) (inputsize0 * factor / nPartitions)); - if (entry < state.memoryForHashtable) { - while (true) { - if (!ftappender.append(accessorBuild, i)) { - 
build(inBuffer.getBuffer()); - - ftappender.reset(inBuffer, true); - } else { - break; - } - } - } else { - entry %= state.nPartitions; - boolean newBuffer = false; - IFrame bufBi = bufferForPartitions[entry]; - while (true) { - appender.reset(bufBi, newBuffer); - if (appender.append(accessorBuild, i)) { - break; - } else { - write(entry, bufBi.getBuffer()); - bufBi.reset(); - newBuffer = true; - } - } - } - } - - } - } else { - build(buffer); - } - - } - - private void build(ByteBuffer inBuffer) throws HyracksDataException { - ByteBuffer copyBuffer = ctx.allocateFrame(inBuffer.capacity()); - FrameUtils.copyAndFlip(inBuffer, copyBuffer); - state.joiner.build(copyBuffer); - } - - @Override - public void open() throws HyracksDataException { - if (memsize > 1) { - if (memsize > inputsize0) { - state.nPartitions = 0; - } else { - state.nPartitions = - (int) (Math.ceil((inputsize0 * factor / nPartitions - memsize) / (memsize - 1))); - } - if (state.nPartitions <= 0) { - // becomes in-memory HJ - state.memoryForHashtable = memsize - 2; - state.nPartitions = 0; - } else { - state.memoryForHashtable = memsize - state.nPartitions - 2; - if (state.memoryForHashtable < 0) { - state.memoryForHashtable = 0; - state.nPartitions = (int) Math.ceil(Math.sqrt(inputsize0 * factor / nPartitions)); - } - } - } else { - throw new HyracksDataException("not enough memory"); - } - - ITuplePartitionComputer hpc0 = - new FieldHashPartitionComputerFactory(keys0, hashFunctionFactories).createPartitioner(ctx); - ITuplePartitionComputer hpc1 = - new FieldHashPartitionComputerFactory(keys1, hashFunctionFactories).createPartitioner(ctx); - int tableSize = (int) (state.memoryForHashtable * recordsPerFrame * factor); - ISerializableTable table = new SimpleSerializableHashTable(tableSize, ctx); - state.joiner = - new InMemoryHashJoin(ctx, new FrameTupleAccessor(rd0), hpc0, new FrameTupleAccessor(rd1), - rd1, hpc1, new FrameTuplePairComparator(keys0, keys1, comparators), isLeftOuter, - nullWriters1, table, predEvaluator, null); - bufferForPartitions = new IFrame[state.nPartitions]; - state.fWriters = new RunFileWriter[state.nPartitions]; - for (int i = 0; i < state.nPartitions; i++) { - bufferForPartitions[i] = new VSizeFrame(ctx); - } - - ftappender.reset(inBuffer, true); - } - - @Override - public void fail() throws HyracksDataException { - } - - private void closeWriter(int i) throws HyracksDataException { - RunFileWriter writer = state.fWriters[i]; - if (writer != null) { - writer.close(); - } - } - - private void write(int i, ByteBuffer head) throws HyracksDataException { - RunFileWriter writer = state.fWriters[i]; - if (writer == null) { - FileReference file = ctx.getJobletContext() - .createManagedWorkspaceFile(BuildAndPartitionActivityNode.class.getSimpleName()); - writer = new RunFileWriter(file, ctx.getIoManager()); - writer.open(); - state.fWriters[i] = writer; - } - writer.nextFrame(head); - } - }; - return op; - } - } - - private class PartitionAndJoinActivityNode extends AbstractActivityNode { - private static final long serialVersionUID = 1L; - - private final ActivityId buildAid; - - public PartitionAndJoinActivityNode(ActivityId id, ActivityId buildAid) { - super(id); - this.buildAid = buildAid; - } - - @Override - public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx, - IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions) - throws HyracksDataException { - final RecordDescriptor rd0 = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0); - 
final RecordDescriptor rd1 = recordDescProvider.getInputRecordDescriptor(buildAid, 0); - final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length]; - for (int i = 0; i < comparatorFactories.length; ++i) { - comparators[i] = comparatorFactories[i].createBinaryComparator(); - } - final IMissingWriter[] nullWriters1 = - isLeftOuter ? new IMissingWriter[nonMatchWriterFactories1.length] : null; - if (isLeftOuter) { - for (int i = 0; i < nonMatchWriterFactories1.length; i++) { - nullWriters1[i] = nonMatchWriterFactories1[i].createMissingWriter(); - } - } - final IPredicateEvaluator predEvaluator = - (predEvaluatorFactory == null ? null : predEvaluatorFactory.createPredicateEvaluator()); - - IOperatorNodePushable op = new AbstractUnaryInputUnaryOutputOperatorNodePushable() { - private BuildAndPartitionTaskState state; - private final FrameTupleAccessor accessorProbe = new FrameTupleAccessor(rd0); - private final ITuplePartitionComputerFactory hpcf0 = - new FieldHashPartitionComputerFactory(keys0, hashFunctionFactories); - private final ITuplePartitionComputerFactory hpcf1 = - new FieldHashPartitionComputerFactory(keys1, hashFunctionFactories); - private final ITuplePartitionComputer hpcProbe = hpcf0.createPartitioner(ctx); - - private final FrameTupleAppender appender = new FrameTupleAppender(); - private final FrameTupleAppender ftap = new FrameTupleAppender(); - private final IFrame inBuffer = new VSizeFrame(ctx); - private final IFrame outBuffer = new VSizeFrame(ctx); - private RunFileWriter[] buildWriters; - private RunFileWriter[] probeWriters; - private IFrame[] bufferForPartitions; - - @Override - public void open() throws HyracksDataException { - writer.open(); - state = (BuildAndPartitionTaskState) ctx.getStateObject( - new TaskId(new ActivityId(getOperatorId(), BUILD_AND_PARTITION_ACTIVITY_ID), partition)); - buildWriters = state.fWriters; - probeWriters = new RunFileWriter[state.nPartitions]; - bufferForPartitions = new IFrame[state.nPartitions]; - for (int i = 0; i < state.nPartitions; i++) { - bufferForPartitions[i] = new VSizeFrame(ctx); - } - appender.reset(outBuffer, true); - ftap.reset(inBuffer, true); - } - - @Override - public void nextFrame(ByteBuffer buffer) throws HyracksDataException { - if (state.memoryForHashtable != memsize - 2) { - accessorProbe.reset(buffer); - int tupleCount0 = accessorProbe.getTupleCount(); - for (int i = 0; i < tupleCount0; ++i) { - - int entry; - if (state.memoryForHashtable == 0) { - entry = hpcProbe.partition(accessorProbe, i, state.nPartitions); - boolean newBuffer = false; - IFrame outbuf = bufferForPartitions[entry]; - while (true) { - appender.reset(outbuf, newBuffer); - if (appender.append(accessorProbe, i)) { - break; - } else { - write(entry, outbuf.getBuffer()); - outbuf.reset(); - newBuffer = true; - } - } - } else { - entry = hpcProbe.partition(accessorProbe, i, (int) (inputsize0 * factor / nPartitions)); - if (entry < state.memoryForHashtable) { - while (true) { - if (!ftap.append(accessorProbe, i)) { - state.joiner.join(inBuffer.getBuffer(), writer); - ftap.reset(inBuffer, true); - } else { - break; - } - } - - } else { - entry %= state.nPartitions; - boolean newBuffer = false; - IFrame outbuf = bufferForPartitions[entry]; - while (true) { - appender.reset(outbuf, newBuffer); - if (appender.append(accessorProbe, i)) { - break; - } else { - write(entry, outbuf.getBuffer()); - outbuf.reset(); - newBuffer = true; - } - } - } - } - } - } else { - state.joiner.join(buffer, writer); - } - } - - @Override - 
public void close() throws HyracksDataException { - try { - try { - state.joiner.join(inBuffer.getBuffer(), writer); - state.joiner.completeJoin(writer); - } finally { - state.joiner.releaseMemory(); - } - ITuplePartitionComputer hpcRep0 = - new RepartitionComputerFactory(state.nPartitions, hpcf0).createPartitioner(ctx); - ITuplePartitionComputer hpcRep1 = - new RepartitionComputerFactory(state.nPartitions, hpcf1).createPartitioner(ctx); - if (state.memoryForHashtable != memsize - 2) { - for (int i = 0; i < state.nPartitions; i++) { - ByteBuffer buf = bufferForPartitions[i].getBuffer(); - accessorProbe.reset(buf); - if (accessorProbe.getTupleCount() > 0) { - write(i, buf); - } - closeWriter(i); - } - - inBuffer.reset(); - int tableSize = -1; - if (state.memoryForHashtable == 0) { - tableSize = (int) (state.nPartitions * recordsPerFrame * factor); - } else { - tableSize = (int) (memsize * recordsPerFrame * factor); - } - ISerializableTable table = new SimpleSerializableHashTable(tableSize, ctx); - for (int partitionid = 0; partitionid < state.nPartitions; partitionid++) { - RunFileWriter buildWriter = buildWriters[partitionid]; - RunFileWriter probeWriter = probeWriters[partitionid]; - if ((buildWriter == null && !isLeftOuter) || probeWriter == null) { - continue; - } - table.reset(); - InMemoryHashJoin joiner = new InMemoryHashJoin(ctx, new FrameTupleAccessor(rd0), - hpcRep0, new FrameTupleAccessor(rd1), rd1, hpcRep1, - new FrameTuplePairComparator(keys0, keys1, comparators), isLeftOuter, - nullWriters1, table, predEvaluator, null); - - if (buildWriter != null) { - RunFileReader buildReader = buildWriter.createDeleteOnCloseReader(); - try { - buildReader.open(); - while (buildReader.nextFrame(inBuffer)) { - ByteBuffer copyBuffer = ctx.allocateFrame(inBuffer.getFrameSize()); - FrameUtils.copyAndFlip(inBuffer.getBuffer(), copyBuffer); - joiner.build(copyBuffer); - inBuffer.reset(); - } - } finally { - buildReader.close(); - } - } - - // probe - RunFileReader probeReader = probeWriter.createDeleteOnCloseReader(); - try { - probeReader.open(); - try { - while (probeReader.nextFrame(inBuffer)) { - joiner.join(inBuffer.getBuffer(), writer); - inBuffer.reset(); - } - joiner.completeJoin(writer); - } finally { - joiner.releaseMemory(); - } - } finally { - probeReader.close(); - } - } - } - } finally { - writer.close(); - } - } - - private void closeWriter(int i) throws HyracksDataException { - RunFileWriter writer = probeWriters[i]; - if (writer != null) { - writer.close(); - } - } - - private void write(int i, ByteBuffer head) throws HyracksDataException { - RunFileWriter writer = probeWriters[i]; - if (writer == null) { - FileReference file = - ctx.createManagedWorkspaceFile(PartitionAndJoinActivityNode.class.getSimpleName()); - writer = new RunFileWriter(file, ctx.getIoManager()); - writer.open(); - probeWriters[i] = writer; - } - writer.nextFrame(head); - } - - @Override - public void fail() throws HyracksDataException { - writer.fail(); - } - }; - return op; - } - } -} diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOptimizedHybridHashJoinTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOptimizedHybridHashJoinTest.java deleted file mode 100644 index 289f8ae..0000000 --- 
a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOptimizedHybridHashJoinTest.java +++ /dev/null @@ -1,245 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.hyracks.tests.integration; - -import java.io.File; -import java.util.Arrays; - -import org.apache.hyracks.api.constraints.PartitionConstraintHelper; -import org.apache.hyracks.api.dataflow.IConnectorDescriptor; -import org.apache.hyracks.api.dataflow.IOperatorDescriptor; -import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; -import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFamily; -import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer; -import org.apache.hyracks.api.dataflow.value.RecordDescriptor; -import org.apache.hyracks.api.io.FileSplit; -import org.apache.hyracks.api.io.ManagedFileSplit; -import org.apache.hyracks.api.job.JobSpecification; -import org.apache.hyracks.data.std.accessors.PointableBinaryComparatorFactory; -import org.apache.hyracks.data.std.accessors.UTF8StringBinaryHashFunctionFamily; -import org.apache.hyracks.data.std.primitive.UTF8StringPointable; -import org.apache.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer; -import org.apache.hyracks.dataflow.common.data.parsers.IValueParserFactory; -import org.apache.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory; -import org.apache.hyracks.dataflow.std.connectors.MToNBroadcastConnectorDescriptor; -import org.apache.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor; -import org.apache.hyracks.dataflow.std.file.ConstantFileSplitProvider; -import org.apache.hyracks.dataflow.std.file.DelimitedDataTupleParserFactory; -import org.apache.hyracks.dataflow.std.file.FileScanOperatorDescriptor; -import org.apache.hyracks.dataflow.std.file.IFileSplitProvider; -import org.apache.hyracks.dataflow.std.file.PlainFileWriterOperatorDescriptor; -import org.apache.hyracks.dataflow.std.join.JoinComparatorFactory; -import org.apache.hyracks.dataflow.std.join.OptimizedHybridHashJoinOperatorDescriptor; -import org.apache.hyracks.dataflow.std.misc.NullSinkOperatorDescriptor; -import org.junit.Test; - -public class TPCHCustomerOptimizedHybridHashJoinTest extends AbstractIntegrationTest { - - private static boolean DEBUG = false; - - static RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] { - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer() }); - - static RecordDescriptor ordersDesc = - new RecordDescriptor(new ISerializerDeserializer[] { new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer() }); - - static RecordDescriptor custOrderJoinDesc = - new RecordDescriptor(new ISerializerDeserializer[] { new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer() }); - - static IValueParserFactory[] custValueParserFactories = new IValueParserFactory[custDesc.getFieldCount()]; - static IValueParserFactory[] orderValueParserFactories = new IValueParserFactory[ordersDesc.getFieldCount()]; - - static { - Arrays.fill(custValueParserFactories, UTF8StringParserFactory.INSTANCE); - Arrays.fill(orderValueParserFactories, UTF8StringParserFactory.INSTANCE); - } - - private IOperatorDescriptor getPrinter(JobSpecification spec, String path) { - IFileSplitProvider outputSplitProvider = - new ConstantFileSplitProvider(new FileSplit[] { new ManagedFileSplit(NC1_ID, path) }); - - return DEBUG ? 
new PlainFileWriterOperatorDescriptor(spec, outputSplitProvider, "|") - : new NullSinkOperatorDescriptor(spec); - } - - @Test - public void customerOrderCIDHybridHashJoin_Case1() throws Exception { - JobSpecification spec = new JobSpecification(); - FileSplit[] custSplits = new FileSplit[] { new ManagedFileSplit(NC1_ID, - "data" + File.separator + "tpch0.001" + File.separator + "customer4.tbl") }; - IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(custSplits); - - FileSplit[] ordersSplits = new FileSplit[] { - new ManagedFileSplit(NC2_ID, "data" + File.separator + "tpch0.001" + File.separator + "orders4.tbl") }; - - IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits); - FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider, - new DelimitedDataTupleParserFactory(orderValueParserFactories, '|'), ordersDesc); - PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC2_ID); - - FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider, - new DelimitedDataTupleParserFactory(custValueParserFactories, '|'), custDesc); - PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID); - - OptimizedHybridHashJoinOperatorDescriptor join = new OptimizedHybridHashJoinOperatorDescriptor(spec, 15, 243, - 1.2, new int[] { 0 }, new int[] { 1 }, - new IBinaryHashFunctionFamily[] { UTF8StringBinaryHashFunctionFamily.INSTANCE }, - new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) }, - custOrderJoinDesc, - new JoinComparatorFactory(PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 0, 1), - new JoinComparatorFactory(PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 1, 0), - null); - - PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID); - - String path = getClass().getName() + File.separator + "case1"; - IOperatorDescriptor printer = getPrinter(spec, path); - PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID); - - IConnectorDescriptor custJoinConn = new OneToOneConnectorDescriptor(spec); - spec.connect(custJoinConn, custScanner, 0, join, 0); - - IConnectorDescriptor ordJoinConn = new MToNBroadcastConnectorDescriptor(spec); - spec.connect(ordJoinConn, ordScanner, 0, join, 1); - - IConnectorDescriptor joinPrinterConn = new OneToOneConnectorDescriptor(spec); - spec.connect(joinPrinterConn, join, 0, printer, 0); - - spec.addRoot(printer); - runTest(spec); - System.out.println("output to " + path); - } - - @Test - public void customerOrderCIDHybridHashJoin_Case2() throws Exception { - JobSpecification spec = new JobSpecification(); - - FileSplit[] custSplits = new FileSplit[] { new ManagedFileSplit(NC1_ID, - "data" + File.separator + "tpch0.001" + File.separator + "customer3.tbl") }; - IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(custSplits); - - FileSplit[] ordersSplits = new FileSplit[] { - new ManagedFileSplit(NC2_ID, "data" + File.separator + "tpch0.001" + File.separator + "orders4.tbl") }; - - IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits); - - FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider, - new DelimitedDataTupleParserFactory(orderValueParserFactories, '|'), ordersDesc); - PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC2_ID); - - FileScanOperatorDescriptor 
custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider, - new DelimitedDataTupleParserFactory(custValueParserFactories, '|'), custDesc); - PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID); - - OptimizedHybridHashJoinOperatorDescriptor join = new OptimizedHybridHashJoinOperatorDescriptor(spec, 15, 122, - 1.2, new int[] { 0 }, new int[] { 1 }, - new IBinaryHashFunctionFamily[] { UTF8StringBinaryHashFunctionFamily.INSTANCE }, - new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) }, - custOrderJoinDesc, - new JoinComparatorFactory(PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 0, 1), - new JoinComparatorFactory(PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 1, 0), - null); - - PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID); - - String path = getClass().getName() + File.separator + "case2"; - IOperatorDescriptor printer = getPrinter(spec, path); - PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID); - - IConnectorDescriptor custJoinConn = new OneToOneConnectorDescriptor(spec); - spec.connect(custJoinConn, custScanner, 0, join, 0); - - IConnectorDescriptor ordJoinConn = new MToNBroadcastConnectorDescriptor(spec); - spec.connect(ordJoinConn, ordScanner, 0, join, 1); - - IConnectorDescriptor joinPrinterConn = new OneToOneConnectorDescriptor(spec); - spec.connect(joinPrinterConn, join, 0, printer, 0); - - spec.addRoot(printer); - runTest(spec); - System.out.println("output to " + path); - } - - @Test - public void customerOrderCIDHybridHashJoin_Case3() throws Exception { - - JobSpecification spec = new JobSpecification(); - - FileSplit[] custSplits = new FileSplit[] { new ManagedFileSplit(NC1_ID, - "data" + File.separator + "tpch0.001" + File.separator + "customer3.tbl") }; - IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(custSplits); - - FileSplit[] ordersSplits = new FileSplit[] { - new ManagedFileSplit(NC2_ID, "data" + File.separator + "tpch0.001" + File.separator + "orders1.tbl") }; - - IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits); - - FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider, - new DelimitedDataTupleParserFactory(orderValueParserFactories, '|'), ordersDesc); - PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC2_ID); - - FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider, - new DelimitedDataTupleParserFactory(custValueParserFactories, '|'), custDesc); - PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID); - - OptimizedHybridHashJoinOperatorDescriptor join = new OptimizedHybridHashJoinOperatorDescriptor(spec, 6, 122, - 1.2, new int[] { 0 }, new int[] { 1 }, - new IBinaryHashFunctionFamily[] { UTF8StringBinaryHashFunctionFamily.INSTANCE }, - new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) }, - custOrderJoinDesc, - new JoinComparatorFactory(PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 0, 1), - new JoinComparatorFactory(PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 1, 0), - null); - - PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID); - - String path = getClass().getName() + File.separator + "case3"; - IOperatorDescriptor printer = getPrinter(spec, path); - 
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID); - - IConnectorDescriptor custJoinConn = new OneToOneConnectorDescriptor(spec); - spec.connect(custJoinConn, custScanner, 0, join, 0); - - IConnectorDescriptor ordJoinConn = new MToNBroadcastConnectorDescriptor(spec); - spec.connect(ordJoinConn, ordScanner, 0, join, 1); - - IConnectorDescriptor joinPrinterConn = new OneToOneConnectorDescriptor(spec); - spec.connect(joinPrinterConn, join, 0, printer, 0); - - spec.addRoot(printer); - runTest(spec); - System.out.println("output to " + path); - } - -} diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java index 6154e28..a87dc1e 100644 --- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java +++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java @@ -19,12 +19,14 @@ package org.apache.hyracks.tests.integration; import java.io.File; +import java.util.Arrays; import org.apache.hyracks.api.constraints.PartitionConstraintHelper; import org.apache.hyracks.api.dataflow.IConnectorDescriptor; import org.apache.hyracks.api.dataflow.IOperatorDescriptor; import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFactory; +import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFamily; import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory; import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer; import org.apache.hyracks.api.dataflow.value.RecordDescriptor; @@ -32,8 +34,10 @@ import org.apache.hyracks.api.io.ManagedFileSplit; import org.apache.hyracks.api.job.JobSpecification; import org.apache.hyracks.api.result.ResultSetId; +import org.apache.hyracks.data.std.accessors.MurmurHash3BinaryHashFunctionFamily; import org.apache.hyracks.data.std.accessors.PointableBinaryComparatorFactory; import org.apache.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory; +import org.apache.hyracks.data.std.accessors.UTF8StringBinaryHashFunctionFamily; import org.apache.hyracks.data.std.primitive.UTF8StringPointable; import org.apache.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer; import org.apache.hyracks.dataflow.common.data.parsers.IValueParserFactory; @@ -46,9 +50,12 @@ import org.apache.hyracks.dataflow.std.file.DelimitedDataTupleParserFactory; import org.apache.hyracks.dataflow.std.file.FileScanOperatorDescriptor; import org.apache.hyracks.dataflow.std.file.IFileSplitProvider; -import org.apache.hyracks.dataflow.std.join.HybridHashJoinOperatorDescriptor; +import org.apache.hyracks.dataflow.std.file.PlainFileWriterOperatorDescriptor; import org.apache.hyracks.dataflow.std.join.InMemoryHashJoinOperatorDescriptor; +import org.apache.hyracks.dataflow.std.join.JoinComparatorFactory; +import org.apache.hyracks.dataflow.std.join.OptimizedHybridHashJoinOperatorDescriptor; import org.apache.hyracks.dataflow.std.misc.MaterializingOperatorDescriptor; +import org.apache.hyracks.dataflow.std.misc.NullSinkOperatorDescriptor; import 
org.apache.hyracks.dataflow.std.result.ResultWriterOperatorDescriptor; import org.apache.hyracks.tests.util.NoopMissingWriterFactory; import org.apache.hyracks.tests.util.ResultSerializerFactoryProvider; @@ -68,6 +75,40 @@ * NULL, O_COMMENT VARCHAR(79) NOT NULL ); */ + private static boolean DEBUG = false; + + static RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] { + new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), + new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), + new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), + new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer() }); + + static RecordDescriptor ordersDesc = + new RecordDescriptor(new ISerializerDeserializer[] { new UTF8StringSerializerDeserializer(), + new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), + new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), + new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), + new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer() }); + + static RecordDescriptor custOrderJoinDesc = + new RecordDescriptor(new ISerializerDeserializer[] { new UTF8StringSerializerDeserializer(), + new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), + new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), + new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), + new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), + new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), + new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), + new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), + new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer() }); + + static IValueParserFactory[] custValueParserFactories = new IValueParserFactory[custDesc.getFieldCount()]; + static IValueParserFactory[] orderValueParserFactories = new IValueParserFactory[ordersDesc.getFieldCount()]; + + static { + Arrays.fill(custValueParserFactories, UTF8StringParserFactory.INSTANCE); + Arrays.fill(orderValueParserFactories, UTF8StringParserFactory.INSTANCE); + } + @Test public void customerOrderCIDJoin() throws Exception { JobSpecification spec = new JobSpecification(); @@ -75,50 +116,17 @@ FileSplit[] custSplits = new FileSplit[] { new ManagedFileSplit(NC1_ID, "data" + File.separator + "tpch0.001" + File.separator + "customer.tbl") }; IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(custSplits); - RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] { - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer() }); FileSplit[] ordersSplits = new FileSplit[] { new ManagedFileSplit(NC2_ID, "data" + File.separator + "tpch0.001" + File.separator + "orders.tbl") }; IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits); - RecordDescriptor ordersDesc = - new RecordDescriptor(new ISerializerDeserializer[] { new UTF8StringSerializerDeserializer(), - new 
UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer() }); - - RecordDescriptor custOrderJoinDesc = - new RecordDescriptor(new ISerializerDeserializer[] { new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer() }); FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider, - new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE, - UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, - UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, - UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, - UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), - ordersDesc); + new DelimitedDataTupleParserFactory(orderValueParserFactories, '|'), ordersDesc); PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC2_ID); - FileScanOperatorDescriptor custScanner = - new FileScanOperatorDescriptor(spec, custSplitsProvider, - new DelimitedDataTupleParserFactory(new IValueParserFactory[] { - UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, - UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, - UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, - UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), - custDesc); + FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider, + new DelimitedDataTupleParserFactory(custValueParserFactories, '|'), custDesc); PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID); InMemoryHashJoinOperatorDescriptor join = new InMemoryHashJoinOperatorDescriptor(spec, new int[] { 1 }, @@ -155,57 +163,28 @@ FileSplit[] custSplits = new FileSplit[] { new ManagedFileSplit(NC1_ID, "data" + File.separator + "tpch0.001" + File.separator + "customer.tbl") }; IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(custSplits); - RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] { - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer() }); FileSplit[] ordersSplits = new FileSplit[] { new ManagedFileSplit(NC2_ID, "data" + File.separator + "tpch0.001" + File.separator + "orders.tbl") }; IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits); - RecordDescriptor 
ordersDesc = - new RecordDescriptor(new ISerializerDeserializer[] { new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer() }); - - RecordDescriptor custOrderJoinDesc = - new RecordDescriptor(new ISerializerDeserializer[] { new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer() }); FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider, - new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE, - UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, - UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, - UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, - UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), - ordersDesc); + new DelimitedDataTupleParserFactory(orderValueParserFactories, '|'), ordersDesc); PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC2_ID); - FileScanOperatorDescriptor custScanner = - new FileScanOperatorDescriptor(spec, custSplitsProvider, - new DelimitedDataTupleParserFactory(new IValueParserFactory[] { - UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, - UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, - UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, - UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), - custDesc); + FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider, + new DelimitedDataTupleParserFactory(custValueParserFactories, '|'), custDesc); PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID); - HybridHashJoinOperatorDescriptor join = new HybridHashJoinOperatorDescriptor(spec, 32, 20, 200, 1.2, - new int[] { 1 }, new int[] { 0 }, - new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }, + OptimizedHybridHashJoinOperatorDescriptor join = new OptimizedHybridHashJoinOperatorDescriptor(spec, 32, 20, + 1.2, new int[] { 1 }, new int[] { 0 }, + new IBinaryHashFunctionFamily[] { MurmurHash3BinaryHashFunctionFamily.INSTANCE }, new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) }, - custOrderJoinDesc, null, false, null); + custOrderJoinDesc, + new JoinComparatorFactory(PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 1, 0), + new JoinComparatorFactory(PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 0, 1), null, + false, null); + 
 PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID);
 
         ResultSetId rsId = new ResultSetId(1);
@@ -235,50 +214,17 @@
         FileSplit[] custSplits = new FileSplit[] { new ManagedFileSplit(NC1_ID,
                 "data" + File.separator + "tpch0.001" + File.separator + "customer.tbl") };
         IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(custSplits);
-        RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer() });
 
         FileSplit[] ordersSplits = new FileSplit[] {
                 new ManagedFileSplit(NC2_ID, "data" + File.separator + "tpch0.001" + File.separator + "orders.tbl") };
         IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits);
-        RecordDescriptor ordersDesc =
-                new RecordDescriptor(new ISerializerDeserializer[] { new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer() });
-
-        RecordDescriptor custOrderJoinDesc =
-                new RecordDescriptor(new ISerializerDeserializer[] { new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer() });
 
         FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,
-                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'),
-                ordersDesc);
+                new DelimitedDataTupleParserFactory(orderValueParserFactories, '|'), ordersDesc);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC2_ID);
 
-        FileScanOperatorDescriptor custScanner =
-                new FileScanOperatorDescriptor(spec, custSplitsProvider,
-                        new DelimitedDataTupleParserFactory(new IValueParserFactory[] {
-                                UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                                UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                                UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                                UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'),
-                        custDesc);
+        FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider,
+                new DelimitedDataTupleParserFactory(custValueParserFactories, '|'), custDesc);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID);
 
         IMissingWriterFactory[] nonMatchWriterFactories = new IMissingWriterFactory[ordersDesc.getFieldCount()];
@@ -320,50 +266,17 @@
         FileSplit[] custSplits = new FileSplit[] { new ManagedFileSplit(NC1_ID,
                 "data" + File.separator + "tpch0.001" + File.separator + "customer.tbl") };
         IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(custSplits);
-        RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer() });
 
         FileSplit[] ordersSplits = new FileSplit[] {
                 new ManagedFileSplit(NC2_ID, "data" + File.separator + "tpch0.001" + File.separator + "orders.tbl") };
         IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits);
-        RecordDescriptor ordersDesc =
-                new RecordDescriptor(new ISerializerDeserializer[] { new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer() });
-
-        RecordDescriptor custOrderJoinDesc =
-                new RecordDescriptor(new ISerializerDeserializer[] { new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer() });
 
         FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,
-                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'),
-                ordersDesc);
+                new DelimitedDataTupleParserFactory(orderValueParserFactories, '|'), ordersDesc);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC2_ID);
 
-        FileScanOperatorDescriptor custScanner =
-                new FileScanOperatorDescriptor(spec, custSplitsProvider,
-                        new DelimitedDataTupleParserFactory(new IValueParserFactory[] {
-                                UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                                UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                                UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                                UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'),
-                        custDesc);
+        FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider,
+                new DelimitedDataTupleParserFactory(custValueParserFactories, '|'), custDesc);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID);
 
         IMissingWriterFactory[] nonMatchWriterFactories = new IMissingWriterFactory[ordersDesc.getFieldCount()];
@@ -371,11 +284,14 @@
             nonMatchWriterFactories[j] = NoopMissingWriterFactory.INSTANCE;
         }
 
-        HybridHashJoinOperatorDescriptor join = new HybridHashJoinOperatorDescriptor(spec, 32, 20, 200, 1.2,
-                new int[] { 0 }, new int[] { 1 },
-                new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) },
+        OptimizedHybridHashJoinOperatorDescriptor join = new OptimizedHybridHashJoinOperatorDescriptor(spec, 32, 20,
+                1.2, new int[] { 0 }, new int[] { 1 },
+                new IBinaryHashFunctionFamily[] { MurmurHash3BinaryHashFunctionFamily.INSTANCE },
                 new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
-                custOrderJoinDesc, null, true, nonMatchWriterFactories);
+                custOrderJoinDesc,
+                new JoinComparatorFactory(PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 0, 1),
+                new JoinComparatorFactory(PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 1, 0), null,
+                true, nonMatchWriterFactories);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID);
 
         ResultSetId rsId = new ResultSetId(1);
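The two call sites so far make the argument mapping between the old and new descriptors easy to read off: the old constructor's aveRecordsPerFrame argument is dropped, hash function factories become hash function families, and two tuple-pair comparator factories are added after the output descriptor. The sketch below annotates the left-outer call shape; the comments are inferred roles, not the declared parameter names in hyracks-dataflow-std, and the helper wrapper itself is hypothetical.

    // Illustrative wrapper only; parameter roles inferred from the call sites
    // in this change. Uses the imports already present in this test file.
    static OptimizedHybridHashJoinOperatorDescriptor leftOuterJoinSketch(JobSpecification spec,
            IBinaryHashFunctionFamily[] hashFamilies, IBinaryComparatorFactory[] comparators,
            RecordDescriptor outputDesc, ITuplePairComparatorFactory cmp01, ITuplePairComparatorFactory cmp10,
            IMissingWriterFactory[] missingWriters) {
        return new OptimizedHybridHashJoinOperatorDescriptor(spec,
                32,                // memory budget for the join, in frames
                20,                // estimated size of the build input, in frames
                1.2,               // fudge factor padding the build-size estimate
                new int[] { 0 },   // key fields of input 0
                new int[] { 1 },   // key fields of input 1
                hashFamilies,      // hash families, re-seedable per recursion level
                comparators,       // key comparator factories
                outputDesc,        // record descriptor of the joined output
                cmp01,             // tuple-pair comparator, input 0 vs. input 1
                cmp10,             // tuple-pair comparator, input 1 vs. input 0
                null,              // predicate evaluator factory (none in these tests)
                true,              // left outer join
                missingWriters);   // missing-value writers for non-matching tuples
    }

The same mapping reads onto the inner-join call sites, which pass false and null for the last two arguments.
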
@@ -408,11 +324,6 @@
         new ManagedFileSplit(NC2_ID,
                 "data" + File.separator + "tpch0.001" + File.separator + "customer-part2.tbl") };
         IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(custSplits);
-        RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer() });
 
         FileSplit[] ordersSplits = new FileSplit[] {
                 new ManagedFileSplit(NC1_ID,
@@ -420,41 +331,13 @@
                 new ManagedFileSplit(NC2_ID, "data" + File.separator + "tpch0.001" + File.separator + "orders-part2.tbl") };
         IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits);
-        RecordDescriptor ordersDesc =
-                new RecordDescriptor(new ISerializerDeserializer[] { new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer() });
-
-        RecordDescriptor custOrderJoinDesc =
-                new RecordDescriptor(new ISerializerDeserializer[] { new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer() });
 
         FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,
-                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'),
-                ordersDesc);
+                new DelimitedDataTupleParserFactory(orderValueParserFactories, '|'), ordersDesc);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC1_ID, NC2_ID);
 
-        FileScanOperatorDescriptor custScanner =
-                new FileScanOperatorDescriptor(spec, custSplitsProvider,
-                        new DelimitedDataTupleParserFactory(new IValueParserFactory[] {
-                                UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                                UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                                UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                                UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'),
-                        custDesc);
+        FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider,
+                new DelimitedDataTupleParserFactory(custValueParserFactories, '|'), custDesc);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID, NC2_ID);
 
         InMemoryHashJoinOperatorDescriptor join = new InMemoryHashJoinOperatorDescriptor(spec, new int[] { 1 },
@@ -498,11 +381,6 @@
         new ManagedFileSplit(NC2_ID,
                 "data" + File.separator + "tpch0.001" + File.separator + "customer-part2.tbl") };
         IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(custSplits);
-        RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer() });
 
         FileSplit[] ordersSplits = new FileSplit[] {
                 new ManagedFileSplit(NC1_ID,
@@ -510,48 +388,24 @@
                 new ManagedFileSplit(NC2_ID, "data" + File.separator + "tpch0.001" + File.separator + "orders-part2.tbl") };
         IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits);
-        RecordDescriptor ordersDesc =
-                new RecordDescriptor(new ISerializerDeserializer[] { new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer() });
-
-        RecordDescriptor custOrderJoinDesc =
-                new RecordDescriptor(new ISerializerDeserializer[] { new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer() });
 
         FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,
-                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'),
-                ordersDesc);
+                new DelimitedDataTupleParserFactory(orderValueParserFactories, '|'), ordersDesc);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC1_ID, NC2_ID);
 
-        FileScanOperatorDescriptor custScanner =
-                new FileScanOperatorDescriptor(spec, custSplitsProvider,
-                        new DelimitedDataTupleParserFactory(new IValueParserFactory[] {
-                                UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                                UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                                UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                                UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'),
-                        custDesc);
+        FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider,
+                new DelimitedDataTupleParserFactory(custValueParserFactories, '|'), custDesc);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID, NC2_ID);
 
-        HybridHashJoinOperatorDescriptor join = new HybridHashJoinOperatorDescriptor(spec, 5, 20, 100, 1.2,
+        OptimizedHybridHashJoinOperatorDescriptor join = new OptimizedHybridHashJoinOperatorDescriptor(spec, 5, 20, 1.2,
                 new int[] { 1 }, new int[] { 0 },
-                new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) },
+                new IBinaryHashFunctionFamily[] { MurmurHash3BinaryHashFunctionFamily.INSTANCE },
                 new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
-                custOrderJoinDesc, null, false, null);
+                custOrderJoinDesc,
+                new JoinComparatorFactory(PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 1, 0),
+                new JoinComparatorFactory(PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 0, 1), null,
+                false, null);
+
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID, NC2_ID);
 
         ResultSetId rsId = new ResultSetId(1);
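This hunk is the variant that actually exercises spilling. A rough sanity check, assuming the conventional reading of the fudge factor as padding on the build-size estimate (the exact partition-count formula inside OptimizedHybridHashJoin may differ): the padded build size is 20 frames x 1.2 = 24 frames. Under the 32-frame budget of the earlier hunks, 24 <= 32 and the build side can stay memory-resident; under the 5-frame budget here, 24 > 5, so the join must partition both inputs to disk and process them partition by partition.
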
@@ -588,11 +442,6 @@
         new ManagedFileSplit(NC2_ID,
                 "data" + File.separator + "tpch0.001" + File.separator + "customer-part2.tbl") };
         IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(custSplits);
-        RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer() });
 
         FileSplit[] ordersSplits = new FileSplit[] {
                 new ManagedFileSplit(NC1_ID,
@@ -600,41 +449,13 @@
                 new ManagedFileSplit(NC2_ID, "data" + File.separator + "tpch0.001" + File.separator + "orders-part2.tbl") };
         IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits);
-        RecordDescriptor ordersDesc =
-                new RecordDescriptor(new ISerializerDeserializer[] { new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer() });
-
-        RecordDescriptor custOrderJoinDesc =
-                new RecordDescriptor(new ISerializerDeserializer[] { new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer() });
 
         FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,
-                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'),
-                ordersDesc);
+                new DelimitedDataTupleParserFactory(orderValueParserFactories, '|'), ordersDesc);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC1_ID, NC2_ID);
 
-        FileScanOperatorDescriptor custScanner =
-                new FileScanOperatorDescriptor(spec, custSplitsProvider,
-                        new DelimitedDataTupleParserFactory(new IValueParserFactory[] {
-                                UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                                UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                                UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                                UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'),
-                        custDesc);
+        FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider,
+                new DelimitedDataTupleParserFactory(custValueParserFactories, '|'), custDesc);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID, NC2_ID);
 
         InMemoryHashJoinOperatorDescriptor join = new InMemoryHashJoinOperatorDescriptor(spec, new int[] { 1 },
@@ -678,11 +499,6 @@
         new ManagedFileSplit(NC2_ID,
                 "data" + File.separator + "tpch0.001" + File.separator + "customer-part2.tbl") };
         IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(custSplits);
-        RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer() });
 
         FileSplit[] ordersSplits = new FileSplit[] {
                 new ManagedFileSplit(NC1_ID,
@@ -690,41 +506,13 @@
                 new ManagedFileSplit(NC2_ID, "data" + File.separator + "tpch0.001" + File.separator + "orders-part2.tbl") };
         IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits);
-        RecordDescriptor ordersDesc =
-                new RecordDescriptor(new ISerializerDeserializer[] { new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer() });
-
-        RecordDescriptor custOrderJoinDesc =
-                new RecordDescriptor(new ISerializerDeserializer[] { new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer() });
 
         FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,
-                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'),
-                ordersDesc);
+                new DelimitedDataTupleParserFactory(orderValueParserFactories, '|'), ordersDesc);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC1_ID, NC2_ID);
 
-        FileScanOperatorDescriptor custScanner =
-                new FileScanOperatorDescriptor(spec, custSplitsProvider,
-                        new DelimitedDataTupleParserFactory(new IValueParserFactory[] {
-                                UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                                UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                                UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                                UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'),
-                        custDesc);
+        FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider,
+                new DelimitedDataTupleParserFactory(custValueParserFactories, '|'), custDesc);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID, NC2_ID);
 
         MaterializingOperatorDescriptor ordMat = new MaterializingOperatorDescriptor(spec, ordersDesc);
@@ -769,4 +557,161 @@
         spec.addRoot(printer);
         runTest(spec);
     }
+
+    @Test
+    public void customerOrderCIDHybridHashJoin_Case1() throws Exception {
+        JobSpecification spec = new JobSpecification();
+        FileSplit[] custSplits = new FileSplit[] { new ManagedFileSplit(NC1_ID,
+                "data" + File.separator + "tpch0.001" + File.separator + "customer4.tbl") };
+        IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(custSplits);
+
+        FileSplit[] ordersSplits = new FileSplit[] {
+                new ManagedFileSplit(NC2_ID, "data" + File.separator + "tpch0.001" + File.separator + "orders4.tbl") };
+
+        IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits);
+        FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,
+                new DelimitedDataTupleParserFactory(orderValueParserFactories, '|'), ordersDesc);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC2_ID);
+
+        FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider,
+                new DelimitedDataTupleParserFactory(custValueParserFactories, '|'), custDesc);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID);
+
+        OptimizedHybridHashJoinOperatorDescriptor join = new OptimizedHybridHashJoinOperatorDescriptor(spec, 15, 243,
+                1.2, new int[] { 0 }, new int[] { 1 },
+                new IBinaryHashFunctionFamily[] { UTF8StringBinaryHashFunctionFamily.INSTANCE },
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+                custOrderJoinDesc,
+                new JoinComparatorFactory(PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 0, 1),
+                new JoinComparatorFactory(PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 1, 0),
+                null);
+
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID);
+
+        String path = getClass().getName() + File.separator + "case1";
+        IOperatorDescriptor printer = getPrinter(spec, path);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
+
+        IConnectorDescriptor custJoinConn = new OneToOneConnectorDescriptor(spec);
+        spec.connect(custJoinConn, custScanner, 0, join, 0);
+
+        IConnectorDescriptor ordJoinConn = new MToNBroadcastConnectorDescriptor(spec);
+        spec.connect(ordJoinConn, ordScanner, 0, join, 1);
+
+        IConnectorDescriptor joinPrinterConn = new OneToOneConnectorDescriptor(spec);
+        spec.connect(joinPrinterConn, join, 0, printer, 0);
+
+        spec.addRoot(printer);
+        runTest(spec);
+        System.out.println("output to " + path);
+    }
+
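Each rewritten call site supplies two JoinComparatorFactory instances, one per comparison direction, where the old descriptor took none. The class itself is not part of this diff; it wraps a single-field binary comparator into the tuple-pair comparator the optimized join uses to confirm candidate matches after hashing. Below is a minimal sketch of that role, assuming the ITuplePairComparator contract from hyracks-api; the shipped JoinComparatorFactory in hyracks-dataflow-std is the real implementation.

    import org.apache.hyracks.api.comm.IFrameTupleAccessor;
    import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
    import org.apache.hyracks.api.dataflow.value.ITuplePairComparator;
    import org.apache.hyracks.api.exceptions.HyracksDataException;

    // Sketch only: offsets follow the standard Hyracks frame layout accessors.
    final class JoinComparatorSketch implements ITuplePairComparator {
        private final IBinaryComparator bComparator;
        private final int field0; // key field index in the left tuple
        private final int field1; // key field index in the right tuple

        JoinComparatorSketch(IBinaryComparator bComparator, int field0, int field1) {
            this.bComparator = bComparator;
            this.field0 = field0;
            this.field1 = field1;
        }

        @Override
        public int compare(IFrameTupleAccessor accessor0, int tIndex0, IFrameTupleAccessor accessor1, int tIndex1)
                throws HyracksDataException {
            // Locate the raw bytes of field0 in the left tuple and field1 in
            // the right tuple, then delegate to the wrapped binary comparator.
            int start0 = accessor0.getTupleStartOffset(tIndex0) + accessor0.getFieldSlotsLength()
                    + accessor0.getFieldStartOffset(tIndex0, field0);
            int len0 = accessor0.getFieldLength(tIndex0, field0);
            int start1 = accessor1.getTupleStartOffset(tIndex1) + accessor1.getFieldSlotsLength()
                    + accessor1.getFieldStartOffset(tIndex1, field1);
            int len1 = accessor1.getFieldLength(tIndex1, field1);
            return bComparator.compare(accessor0.getBuffer().array(), start0, len0,
                    accessor1.getBuffer().array(), start1, len1);
        }
    }

Passing JoinComparatorFactory(cmp, 0, 1) and JoinComparatorFactory(cmp, 1, 0) therefore gives the join one comparator for each probe direction over the same key pair.
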
+    @Test
+    public void customerOrderCIDHybridHashJoin_Case2() throws Exception {
+        JobSpecification spec = new JobSpecification();
+
+        FileSplit[] custSplits = new FileSplit[] { new ManagedFileSplit(NC1_ID,
+                "data" + File.separator + "tpch0.001" + File.separator + "customer3.tbl") };
+        IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(custSplits);
+
+        FileSplit[] ordersSplits = new FileSplit[] {
+                new ManagedFileSplit(NC2_ID, "data" + File.separator + "tpch0.001" + File.separator + "orders4.tbl") };
+
+        IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits);
+
+        FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,
+                new DelimitedDataTupleParserFactory(orderValueParserFactories, '|'), ordersDesc);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC2_ID);
+
+        FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider,
+                new DelimitedDataTupleParserFactory(custValueParserFactories, '|'), custDesc);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID);
+
+        OptimizedHybridHashJoinOperatorDescriptor join = new OptimizedHybridHashJoinOperatorDescriptor(spec, 15, 122,
+                1.2, new int[] { 0 }, new int[] { 1 },
+                new IBinaryHashFunctionFamily[] { UTF8StringBinaryHashFunctionFamily.INSTANCE },
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+                custOrderJoinDesc,
+                new JoinComparatorFactory(PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 0, 1),
+                new JoinComparatorFactory(PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 1, 0),
+                null);
+
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID);
+
+        String path = getClass().getName() + File.separator + "case2";
+        IOperatorDescriptor printer = getPrinter(spec, path);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
+
+        IConnectorDescriptor custJoinConn = new OneToOneConnectorDescriptor(spec);
+        spec.connect(custJoinConn, custScanner, 0, join, 0);
+
+        IConnectorDescriptor ordJoinConn = new MToNBroadcastConnectorDescriptor(spec);
+        spec.connect(ordJoinConn, ordScanner, 0, join, 1);
+
+        IConnectorDescriptor joinPrinterConn = new OneToOneConnectorDescriptor(spec);
+        spec.connect(joinPrinterConn, join, 0, printer, 0);
+
+        spec.addRoot(printer);
+        runTest(spec);
+        System.out.println("output to " + path);
+    }
+
+    @Test
+    public void customerOrderCIDHybridHashJoin_Case3() throws Exception {
+
+        JobSpecification spec = new JobSpecification();
+
+        FileSplit[] custSplits = new FileSplit[] { new ManagedFileSplit(NC1_ID,
+                "data" + File.separator + "tpch0.001" + File.separator + "customer3.tbl") };
+        IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(custSplits);
+
+        FileSplit[] ordersSplits = new FileSplit[] {
+                new ManagedFileSplit(NC2_ID, "data" + File.separator + "tpch0.001" + File.separator + "orders1.tbl") };
+
+        IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits);
+
+        FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,
+                new DelimitedDataTupleParserFactory(orderValueParserFactories, '|'), ordersDesc);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC2_ID);
+
+        FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider,
+                new DelimitedDataTupleParserFactory(custValueParserFactories, '|'), custDesc);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID);
+
+        OptimizedHybridHashJoinOperatorDescriptor join = new OptimizedHybridHashJoinOperatorDescriptor(spec, 6, 122,
+                1.2, new int[] { 0 }, new int[] { 1 },
+                new IBinaryHashFunctionFamily[] { UTF8StringBinaryHashFunctionFamily.INSTANCE },
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+                custOrderJoinDesc,
+                new JoinComparatorFactory(PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 0, 1),
+                new JoinComparatorFactory(PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 1, 0),
+                null);
+
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID);
+
+        String path = getClass().getName() + File.separator + "case3";
+        IOperatorDescriptor printer = getPrinter(spec, path);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
+
+        IConnectorDescriptor custJoinConn = new OneToOneConnectorDescriptor(spec);
+        spec.connect(custJoinConn, custScanner, 0, join, 0);
+
+        IConnectorDescriptor ordJoinConn = new MToNBroadcastConnectorDescriptor(spec);
+        spec.connect(ordJoinConn, ordScanner, 0, join, 1);
+
+        IConnectorDescriptor joinPrinterConn = new OneToOneConnectorDescriptor(spec);
+        spec.connect(joinPrinterConn, join, 0, printer, 0);
+
+        spec.addRoot(printer);
+        runTest(spec);
+        System.out.println("output to " + path);
+    }
+
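The three new cases appear to be staged by memory pressure, with UTF8StringBinaryHashFunctionFamily swapped in for MurmurHash3 to vary the hash family under test. Reading the arguments the same way as above: Case 1 declares a 243-frame build input against a 15-frame budget, Case 2 shrinks the declared input to 122 frames on the same budget, and Case 3 keeps 122 frames but cuts the budget to 6. With the 1.2 fudge factor, the padded estimates are roughly 292, 146 and 146 frames, so all three should spill, presumably with different partition counts and recursion depths. This reading is an inference from the constants; the change itself does not document why these values were chosen.
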
+    private IOperatorDescriptor getPrinter(JobSpecification spec, String path) {
+        IFileSplitProvider outputSplitProvider =
+                new ConstantFileSplitProvider(new FileSplit[] { new ManagedFileSplit(NC1_ID, path) });
+
+        return DEBUG ? new PlainFileWriterOperatorDescriptor(spec, outputSplitProvider, "|")
+                : new NullSinkOperatorDescriptor(spec);
+    }
 }

--
To view, visit https://asterix-gerrit.ics.uci.edu/3023
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: I6ed612cc233af1b78d453c7b711077b82e721e82
Gerrit-PatchSet: 6
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Xikui Wang <xkk...@gmail.com>
Gerrit-Reviewer: Anon. E. Moose #1000171
Gerrit-Reviewer: Dmitry Lychagin <dmitry.lycha...@couchbase.com>
Gerrit-Reviewer: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Gerrit-Reviewer: Murtadha Hubail <mhub...@apache.org>
Gerrit-Reviewer: Xikui Wang <xkk...@gmail.com>